From 9eaead509e7f1b49b0018ba58f38b7a9c12f2cde Mon Sep 17 00:00:00 2001 From: Hans Moog <hm@mkjc.net> Date: Thu, 1 Aug 2019 13:03:31 +0200 Subject: [PATCH] Fix: made peer thread safe + checking for close conn --- .../protocol/incoming_response_processor.go | 4 +- .../protocol/outgoing_request_processor.go | 13 ++++--- plugins/autopeering/server/tcp/server.go | 18 ++++----- plugins/autopeering/types/peer/peer.go | 38 ++++++++++++++----- 4 files changed, 47 insertions(+), 26 deletions(-) diff --git a/plugins/autopeering/protocol/incoming_response_processor.go b/plugins/autopeering/protocol/incoming_response_processor.go index ea33fa60..ea1af9c5 100644 --- a/plugins/autopeering/protocol/incoming_response_processor.go +++ b/plugins/autopeering/protocol/incoming_response_processor.go @@ -17,7 +17,9 @@ func createIncomingResponseProcessor(plugin *node.Plugin) *events.Closure { func processIncomingResponse(plugin *node.Plugin, peeringResponse *response.Response) { plugin.LogDebug("received peering response from " + peeringResponse.Issuer.String()) - _ = peeringResponse.Issuer.Conn.Close() + if conn := peeringResponse.Issuer.GetConn(); conn != nil { + _ = conn.Close() + } knownpeers.INSTANCE.AddOrUpdate(peeringResponse.Issuer) for _, peer := range peeringResponse.Peers { diff --git a/plugins/autopeering/protocol/outgoing_request_processor.go b/plugins/autopeering/protocol/outgoing_request_processor.go index ea2f3745..118a2ffc 100644 --- a/plugins/autopeering/protocol/outgoing_request_processor.go +++ b/plugins/autopeering/protocol/outgoing_request_processor.go @@ -1,10 +1,11 @@ package protocol import ( + "time" + "github.com/iotaledger/goshimmer/plugins/autopeering/instances/outgoingrequest" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/types" "github.com/iotaledger/goshimmer/plugins/autopeering/server/tcp" - "time" "github.com/iotaledger/goshimmer/packages/timeutil" @@ -55,7 +56,7 @@ func sendOutgoingRequests(plugin *node.Plugin) { plugin.LogDebug("sent peering request to " + chosenNeighborCandidate.String()) if dialed { - tcp.HandleConnection(chosenNeighborCandidate.Conn) + tcp.HandleConnection(chosenNeighborCandidate.GetConn()) } } @@ -63,10 +64,10 @@ func sendOutgoingRequests(plugin *node.Plugin) { }(doneChan) select { - case <-daemon.ShutdownSignal: - return - case <-doneChan: - continue + case <-daemon.ShutdownSignal: + return + case <-doneChan: + continue } } } diff --git a/plugins/autopeering/server/tcp/server.go b/plugins/autopeering/server/tcp/server.go index 165c098e..3c6f11e8 100644 --- a/plugins/autopeering/server/tcp/server.go +++ b/plugins/autopeering/server/tcp/server.go @@ -140,11 +140,11 @@ func processIncomingRequestPacket(connectionState *byte, receiveBuffer *[]byte, return } else { - req.Issuer.Conn = conn + req.Issuer.SetConn(conn) req.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP - req.Issuer.Conn.Events.Close.Attach(events.NewClosure(func() { - req.Issuer.Conn = nil + conn.Events.Close.Attach(events.NewClosure(func() { + req.Issuer.SetConn(nil) })) Events.ReceiveRequest.Trigger(req) @@ -173,11 +173,11 @@ func processIncomingResponsePacket(connectionState *byte, receiveBuffer *[]byte, return } else { - res.Issuer.Conn = conn + res.Issuer.SetConn(conn) res.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP - res.Issuer.Conn.Events.Close.Attach(events.NewClosure(func() { - res.Issuer.Conn = nil + conn.Events.Close.Attach(events.NewClosure(func() { + res.Issuer.SetConn(nil) })) Events.ReceiveResponse.Trigger(res) @@ -206,11 +206,11 @@ func processIncomingPingPacket(connectionState *byte, receiveBuffer *[]byte, con return } else { - ping.Issuer.Conn = conn + ping.Issuer.SetConn(conn) ping.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP - ping.Issuer.Conn.Events.Close.Attach(events.NewClosure(func() { - ping.Issuer.Conn = nil + conn.Events.Close.Attach(events.NewClosure(func() { + ping.Issuer.SetConn(nil) })) Events.ReceivePing.Trigger(ping) diff --git a/plugins/autopeering/types/peer/peer.go b/plugins/autopeering/types/peer/peer.go index 1428ed80..cecf6e10 100644 --- a/plugins/autopeering/types/peer/peer.go +++ b/plugins/autopeering/types/peer/peer.go @@ -21,14 +21,28 @@ type Peer struct { PeeringPort uint16 GossipPort uint16 Salt *salt.Salt - Conn *network.ManagedConnection - connectMutex sync.Mutex + conn *network.ManagedConnection + connectMutex sync.RWMutex firstSeen time.Time firstSeenMutex sync.RWMutex lastSeen time.Time lastSeenMutex sync.RWMutex } +func (peer *Peer) GetConn() (result *network.ManagedConnection) { + peer.connectMutex.RLock() + result = peer.conn + peer.connectMutex.RUnlock() + + return +} + +func (peer *Peer) SetConn(conn *network.ManagedConnection) { + peer.connectMutex.Lock() + peer.conn = conn + peer.connectMutex.Unlock() +} + func Unmarshal(data []byte) (*Peer, error) { if len(data) < MARSHALED_TOTAL_SIZE { return nil, errors.New("size of marshaled peer is too small") @@ -76,27 +90,31 @@ func (peer *Peer) Send(data []byte, protocol types.ProtocolType, responseExpecte } func (peer *Peer) ConnectTCP() (*network.ManagedConnection, bool, error) { - if peer.Conn == nil { + peer.connectMutex.RLock() + + if peer.conn == nil { + peer.connectMutex.RUnlock() peer.connectMutex.Lock() defer peer.connectMutex.Unlock() - if peer.Conn == nil { + if peer.conn == nil { conn, err := net.Dial("tcp", peer.Address.String()+":"+strconv.Itoa(int(peer.PeeringPort))) if err != nil { return nil, false, errors.New("error when connecting to " + peer.String() + ": " + err.Error()) } else { - peer.Conn = network.NewManagedConnection(conn) - - peer.Conn.Events.Close.Attach(events.NewClosure(func() { - peer.Conn = nil + peer.conn = network.NewManagedConnection(conn) + peer.conn.Events.Close.Attach(events.NewClosure(func() { + peer.SetConn(nil) })) - return peer.Conn, true, nil + return peer.conn, true, nil } } + } else { + peer.connectMutex.RUnlock() } - return peer.Conn, false, nil + return peer.conn, false, nil } func (peer *Peer) ConnectUDP() (*network.ManagedConnection, bool, error) { -- GitLab