diff --git a/plugins/autopeering/protocol/incoming_response_processor.go b/plugins/autopeering/protocol/incoming_response_processor.go index ea33fa60d5c7f00411f8315065692014d022a1e6..ea1af9c5733b45840e2675c0315b9ec956807d9a 100644 --- a/plugins/autopeering/protocol/incoming_response_processor.go +++ b/plugins/autopeering/protocol/incoming_response_processor.go @@ -17,7 +17,9 @@ func createIncomingResponseProcessor(plugin *node.Plugin) *events.Closure { func processIncomingResponse(plugin *node.Plugin, peeringResponse *response.Response) { plugin.LogDebug("received peering response from " + peeringResponse.Issuer.String()) - _ = peeringResponse.Issuer.Conn.Close() + if conn := peeringResponse.Issuer.GetConn(); conn != nil { + _ = conn.Close() + } knownpeers.INSTANCE.AddOrUpdate(peeringResponse.Issuer) for _, peer := range peeringResponse.Peers { diff --git a/plugins/autopeering/protocol/outgoing_request_processor.go b/plugins/autopeering/protocol/outgoing_request_processor.go index ea2f37459d7f3876105d30a8b2b3461bac99ca22..118a2ffc907d6eec9232b71af12e7d2efc10f329 100644 --- a/plugins/autopeering/protocol/outgoing_request_processor.go +++ b/plugins/autopeering/protocol/outgoing_request_processor.go @@ -1,10 +1,11 @@ package protocol import ( + "time" + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/outgoingrequest" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/types" "github.com/iotaledger/goshimmer/plugins/autopeering/server/tcp" - "time" "github.com/iotaledger/goshimmer/packages/timeutil" @@ -55,7 +56,7 @@ func sendOutgoingRequests(plugin *node.Plugin) { plugin.LogDebug("sent peering request to " + chosenNeighborCandidate.String()) if dialed { - tcp.HandleConnection(chosenNeighborCandidate.Conn) + tcp.HandleConnection(chosenNeighborCandidate.GetConn()) } } @@ -63,10 +64,10 @@ func sendOutgoingRequests(plugin *node.Plugin) { }(doneChan) select { - case <-daemon.ShutdownSignal: - return - case <-doneChan: - continue + case <-daemon.ShutdownSignal: + return + case <-doneChan: + continue } } } diff --git a/plugins/autopeering/server/tcp/server.go b/plugins/autopeering/server/tcp/server.go index 165c098e56290c712655513414017c99c53f3a5b..3c6f11e82ea7d707935252c9afd5f4a7b5a44d52 100644 --- a/plugins/autopeering/server/tcp/server.go +++ b/plugins/autopeering/server/tcp/server.go @@ -140,11 +140,11 @@ func processIncomingRequestPacket(connectionState *byte, receiveBuffer *[]byte, return } else { - req.Issuer.Conn = conn + req.Issuer.SetConn(conn) req.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP - req.Issuer.Conn.Events.Close.Attach(events.NewClosure(func() { - req.Issuer.Conn = nil + conn.Events.Close.Attach(events.NewClosure(func() { + req.Issuer.SetConn(nil) })) Events.ReceiveRequest.Trigger(req) @@ -173,11 +173,11 @@ func processIncomingResponsePacket(connectionState *byte, receiveBuffer *[]byte, return } else { - res.Issuer.Conn = conn + res.Issuer.SetConn(conn) res.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP - res.Issuer.Conn.Events.Close.Attach(events.NewClosure(func() { - res.Issuer.Conn = nil + conn.Events.Close.Attach(events.NewClosure(func() { + res.Issuer.SetConn(nil) })) Events.ReceiveResponse.Trigger(res) @@ -206,11 +206,11 @@ func processIncomingPingPacket(connectionState *byte, receiveBuffer *[]byte, con return } else { - ping.Issuer.Conn = conn + ping.Issuer.SetConn(conn) ping.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP - ping.Issuer.Conn.Events.Close.Attach(events.NewClosure(func() { - ping.Issuer.Conn = nil + conn.Events.Close.Attach(events.NewClosure(func() { + ping.Issuer.SetConn(nil) })) Events.ReceivePing.Trigger(ping) diff --git a/plugins/autopeering/types/peer/peer.go b/plugins/autopeering/types/peer/peer.go index 1428ed80e675b4c8cb60ff63836cb476bb8ba929..cecf6e10e132d89566d1e1e79229e1bf44918248 100644 --- a/plugins/autopeering/types/peer/peer.go +++ b/plugins/autopeering/types/peer/peer.go @@ -21,14 +21,28 @@ type Peer struct { PeeringPort uint16 GossipPort uint16 Salt *salt.Salt - Conn *network.ManagedConnection - connectMutex sync.Mutex + conn *network.ManagedConnection + connectMutex sync.RWMutex firstSeen time.Time firstSeenMutex sync.RWMutex lastSeen time.Time lastSeenMutex sync.RWMutex } +func (peer *Peer) GetConn() (result *network.ManagedConnection) { + peer.connectMutex.RLock() + result = peer.conn + peer.connectMutex.RUnlock() + + return +} + +func (peer *Peer) SetConn(conn *network.ManagedConnection) { + peer.connectMutex.Lock() + peer.conn = conn + peer.connectMutex.Unlock() +} + func Unmarshal(data []byte) (*Peer, error) { if len(data) < MARSHALED_TOTAL_SIZE { return nil, errors.New("size of marshaled peer is too small") @@ -76,27 +90,31 @@ func (peer *Peer) Send(data []byte, protocol types.ProtocolType, responseExpecte } func (peer *Peer) ConnectTCP() (*network.ManagedConnection, bool, error) { - if peer.Conn == nil { + peer.connectMutex.RLock() + + if peer.conn == nil { + peer.connectMutex.RUnlock() peer.connectMutex.Lock() defer peer.connectMutex.Unlock() - if peer.Conn == nil { + if peer.conn == nil { conn, err := net.Dial("tcp", peer.Address.String()+":"+strconv.Itoa(int(peer.PeeringPort))) if err != nil { return nil, false, errors.New("error when connecting to " + peer.String() + ": " + err.Error()) } else { - peer.Conn = network.NewManagedConnection(conn) - - peer.Conn.Events.Close.Attach(events.NewClosure(func() { - peer.Conn = nil + peer.conn = network.NewManagedConnection(conn) + peer.conn.Events.Close.Attach(events.NewClosure(func() { + peer.SetConn(nil) })) - return peer.Conn, true, nil + return peer.conn, true, nil } } + } else { + peer.connectMutex.RUnlock() } - return peer.Conn, false, nil + return peer.conn, false, nil } func (peer *Peer) ConnectUDP() (*network.ManagedConnection, bool, error) {