Skip to content
Snippets Groups Projects
Commit 9eaead50 authored by Hans Moog's avatar Hans Moog
Browse files

Fix: made peer thread safe + checking for close conn

parent 324cf785
No related branches found
No related tags found
No related merge requests found
...@@ -17,7 +17,9 @@ func createIncomingResponseProcessor(plugin *node.Plugin) *events.Closure { ...@@ -17,7 +17,9 @@ func createIncomingResponseProcessor(plugin *node.Plugin) *events.Closure {
func processIncomingResponse(plugin *node.Plugin, peeringResponse *response.Response) { func processIncomingResponse(plugin *node.Plugin, peeringResponse *response.Response) {
plugin.LogDebug("received peering response from " + peeringResponse.Issuer.String()) plugin.LogDebug("received peering response from " + peeringResponse.Issuer.String())
_ = peeringResponse.Issuer.Conn.Close() if conn := peeringResponse.Issuer.GetConn(); conn != nil {
_ = conn.Close()
}
knownpeers.INSTANCE.AddOrUpdate(peeringResponse.Issuer) knownpeers.INSTANCE.AddOrUpdate(peeringResponse.Issuer)
for _, peer := range peeringResponse.Peers { for _, peer := range peeringResponse.Peers {
......
package protocol package protocol
import ( import (
"time"
"github.com/iotaledger/goshimmer/plugins/autopeering/instances/outgoingrequest" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/outgoingrequest"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/types" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/types"
"github.com/iotaledger/goshimmer/plugins/autopeering/server/tcp" "github.com/iotaledger/goshimmer/plugins/autopeering/server/tcp"
"time"
"github.com/iotaledger/goshimmer/packages/timeutil" "github.com/iotaledger/goshimmer/packages/timeutil"
...@@ -55,7 +56,7 @@ func sendOutgoingRequests(plugin *node.Plugin) { ...@@ -55,7 +56,7 @@ func sendOutgoingRequests(plugin *node.Plugin) {
plugin.LogDebug("sent peering request to " + chosenNeighborCandidate.String()) plugin.LogDebug("sent peering request to " + chosenNeighborCandidate.String())
if dialed { if dialed {
tcp.HandleConnection(chosenNeighborCandidate.Conn) tcp.HandleConnection(chosenNeighborCandidate.GetConn())
} }
} }
...@@ -63,10 +64,10 @@ func sendOutgoingRequests(plugin *node.Plugin) { ...@@ -63,10 +64,10 @@ func sendOutgoingRequests(plugin *node.Plugin) {
}(doneChan) }(doneChan)
select { select {
case <-daemon.ShutdownSignal: case <-daemon.ShutdownSignal:
return return
case <-doneChan: case <-doneChan:
continue continue
} }
} }
} }
......
...@@ -140,11 +140,11 @@ func processIncomingRequestPacket(connectionState *byte, receiveBuffer *[]byte, ...@@ -140,11 +140,11 @@ func processIncomingRequestPacket(connectionState *byte, receiveBuffer *[]byte,
return return
} else { } else {
req.Issuer.Conn = conn req.Issuer.SetConn(conn)
req.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP req.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP
req.Issuer.Conn.Events.Close.Attach(events.NewClosure(func() { conn.Events.Close.Attach(events.NewClosure(func() {
req.Issuer.Conn = nil req.Issuer.SetConn(nil)
})) }))
Events.ReceiveRequest.Trigger(req) Events.ReceiveRequest.Trigger(req)
...@@ -173,11 +173,11 @@ func processIncomingResponsePacket(connectionState *byte, receiveBuffer *[]byte, ...@@ -173,11 +173,11 @@ func processIncomingResponsePacket(connectionState *byte, receiveBuffer *[]byte,
return return
} else { } else {
res.Issuer.Conn = conn res.Issuer.SetConn(conn)
res.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP res.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP
res.Issuer.Conn.Events.Close.Attach(events.NewClosure(func() { conn.Events.Close.Attach(events.NewClosure(func() {
res.Issuer.Conn = nil res.Issuer.SetConn(nil)
})) }))
Events.ReceiveResponse.Trigger(res) Events.ReceiveResponse.Trigger(res)
...@@ -206,11 +206,11 @@ func processIncomingPingPacket(connectionState *byte, receiveBuffer *[]byte, con ...@@ -206,11 +206,11 @@ func processIncomingPingPacket(connectionState *byte, receiveBuffer *[]byte, con
return return
} else { } else {
ping.Issuer.Conn = conn ping.Issuer.SetConn(conn)
ping.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP ping.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP
ping.Issuer.Conn.Events.Close.Attach(events.NewClosure(func() { conn.Events.Close.Attach(events.NewClosure(func() {
ping.Issuer.Conn = nil ping.Issuer.SetConn(nil)
})) }))
Events.ReceivePing.Trigger(ping) Events.ReceivePing.Trigger(ping)
......
...@@ -21,14 +21,28 @@ type Peer struct { ...@@ -21,14 +21,28 @@ type Peer struct {
PeeringPort uint16 PeeringPort uint16
GossipPort uint16 GossipPort uint16
Salt *salt.Salt Salt *salt.Salt
Conn *network.ManagedConnection conn *network.ManagedConnection
connectMutex sync.Mutex connectMutex sync.RWMutex
firstSeen time.Time firstSeen time.Time
firstSeenMutex sync.RWMutex firstSeenMutex sync.RWMutex
lastSeen time.Time lastSeen time.Time
lastSeenMutex sync.RWMutex lastSeenMutex sync.RWMutex
} }
func (peer *Peer) GetConn() (result *network.ManagedConnection) {
peer.connectMutex.RLock()
result = peer.conn
peer.connectMutex.RUnlock()
return
}
func (peer *Peer) SetConn(conn *network.ManagedConnection) {
peer.connectMutex.Lock()
peer.conn = conn
peer.connectMutex.Unlock()
}
func Unmarshal(data []byte) (*Peer, error) { func Unmarshal(data []byte) (*Peer, error) {
if len(data) < MARSHALED_TOTAL_SIZE { if len(data) < MARSHALED_TOTAL_SIZE {
return nil, errors.New("size of marshaled peer is too small") return nil, errors.New("size of marshaled peer is too small")
...@@ -76,27 +90,31 @@ func (peer *Peer) Send(data []byte, protocol types.ProtocolType, responseExpecte ...@@ -76,27 +90,31 @@ func (peer *Peer) Send(data []byte, protocol types.ProtocolType, responseExpecte
} }
func (peer *Peer) ConnectTCP() (*network.ManagedConnection, bool, error) { func (peer *Peer) ConnectTCP() (*network.ManagedConnection, bool, error) {
if peer.Conn == nil { peer.connectMutex.RLock()
if peer.conn == nil {
peer.connectMutex.RUnlock()
peer.connectMutex.Lock() peer.connectMutex.Lock()
defer peer.connectMutex.Unlock() defer peer.connectMutex.Unlock()
if peer.Conn == nil { if peer.conn == nil {
conn, err := net.Dial("tcp", peer.Address.String()+":"+strconv.Itoa(int(peer.PeeringPort))) conn, err := net.Dial("tcp", peer.Address.String()+":"+strconv.Itoa(int(peer.PeeringPort)))
if err != nil { if err != nil {
return nil, false, errors.New("error when connecting to " + peer.String() + ": " + err.Error()) return nil, false, errors.New("error when connecting to " + peer.String() + ": " + err.Error())
} else { } else {
peer.Conn = network.NewManagedConnection(conn) peer.conn = network.NewManagedConnection(conn)
peer.conn.Events.Close.Attach(events.NewClosure(func() {
peer.Conn.Events.Close.Attach(events.NewClosure(func() { peer.SetConn(nil)
peer.Conn = nil
})) }))
return peer.Conn, true, nil return peer.conn, true, nil
} }
} }
} else {
peer.connectMutex.RUnlock()
} }
return peer.Conn, false, nil return peer.conn, false, nil
} }
func (peer *Peer) ConnectUDP() (*network.ManagedConnection, bool, error) { func (peer *Peer) ConnectUDP() (*network.ManagedConnection, bool, error) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment