diff --git a/plugins/analysis/client/plugin.go b/plugins/analysis/client/plugin.go index 5233feedf15d02b9c0686516265a983648cefa9f..c4c3970d009ca403711c37c286b5968427b09133 100644 --- a/plugins/analysis/client/plugin.go +++ b/plugins/analysis/client/plugin.go @@ -61,7 +61,7 @@ func setupHooks(conn *network.ManagedConnection, eventDispatchers *EventDispatch // define hooks //////////////////////////////////////////////////////////////////////////////////////////////////// onDiscoverPeer := events.NewClosure(func(p *peer.Peer) { - eventDispatchers.AddNode(p.Identity.Identifier) + go eventDispatchers.AddNode(p.Identity.Identifier) }) onAddAcceptedNeighbor := events.NewClosure(func(p *peer.Peer) { diff --git a/plugins/autopeering/plugin.go b/plugins/autopeering/plugin.go index 9717eb36637b9340600cfca9f415d9cd1534ecd8..d2c46c4de425f57985c3079afa705ae264d65e28 100644 --- a/plugins/autopeering/plugin.go +++ b/plugins/autopeering/plugin.go @@ -37,6 +37,11 @@ func run(plugin *node.Plugin) { } func configureLogging(plugin *node.Plugin) { + gossip.Events.RemoveNeighbor.Attach(events.NewClosure(func(peer *gossip.Peer) { + chosenneighbors.INSTANCE.Remove(peer.Identity.StringIdentifier) + acceptedneighbors.INSTANCE.Remove(peer.Identity.StringIdentifier) + })) + acceptedneighbors.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) { plugin.LogDebug("accepted neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier) diff --git a/plugins/autopeering/types/peerregister/peer_register.go b/plugins/autopeering/types/peerregister/peer_register.go index e2ba382315fff7b35b2a4674f772ede140c2c832..6502155effa683ec470b7047ea1731656de7ac71 100644 --- a/plugins/autopeering/types/peerregister/peer_register.go +++ b/plugins/autopeering/types/peerregister/peer_register.go @@ -62,14 +62,20 @@ func (this *PeerRegister) Lock() func() { } func (this *PeerRegister) Remove(key string, lock... bool) { - if len(lock) == 0 || lock[0] { - defer this.Lock()() - } - if peerEntry, exists := this.Peers[key]; exists { - delete(this.Peers, key) + if len(lock) == 0 || lock[0] { + defer this.Lock()() + + if peerEntry, exists := this.Peers[key]; exists { + delete(this.Peers, key) + + this.Events.Remove.Trigger(peerEntry) + } + } else { + delete(this.Peers, key) - this.Events.Remove.Trigger(peerEntry) + this.Events.Remove.Trigger(peerEntry) + } } } diff --git a/plugins/gossip/events.go b/plugins/gossip/events.go index faf649765b82b82242f63fa2aec3e2cf5624e162..e22b01170def0d3c26eec906393805c27f2382e9 100644 --- a/plugins/gossip/events.go +++ b/plugins/gossip/events.go @@ -2,6 +2,7 @@ package gossip import ( "github.com/iotaledger/goshimmer/packages/events" + "github.com/iotaledger/goshimmer/packages/identity" "github.com/iotaledger/goshimmer/packages/network" "github.com/iotaledger/goshimmer/packages/transaction" ) @@ -61,6 +62,8 @@ type protocolEvents struct { func intCaller(handler interface{}, params ...interface{}) { handler.(func(int))(params[0].(int)) } +func identityCaller(handler interface{}, params ...interface{}) { handler.(func(*identity.Identity))(params[0].(*identity.Identity)) } + func connectionCaller(handler interface{}, params ...interface{}) { handler.(func(*network.ManagedConnection))(params[0].(*network.ManagedConnection)) } func peerCaller(handler interface{}, params ...interface{}) { handler.(func(*Peer))(params[0].(*Peer)) } diff --git a/plugins/gossip/neighbors.go b/plugins/gossip/neighbors.go index 414dd48962acfeaf9b2349dc1bf604dbef40f693..c27c5f8b4559f2196cdcb9c85d497fc57fcff6b2 100644 --- a/plugins/gossip/neighbors.go +++ b/plugins/gossip/neighbors.go @@ -7,9 +7,11 @@ import ( "github.com/iotaledger/goshimmer/packages/identity" "github.com/iotaledger/goshimmer/packages/network" "github.com/iotaledger/goshimmer/packages/node" + "math" "net" "strconv" "sync" + "time" ) func configureNeighbors(plugin *node.Plugin) { @@ -30,6 +32,9 @@ func runNeighbors(plugin *node.Plugin) { plugin.LogInfo("Starting Neighbor Connection Manager ...") neighborLock.RLock() + for _, neighbor := range GetNeighbors() { + manageConnection(plugin, neighbor) + } neighborLock.RUnlock() Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Peer) { @@ -39,21 +44,46 @@ func runNeighbors(plugin *node.Plugin) { plugin.LogSuccess("Starting Neighbor Connection Manager ... done") } -func manageConnection(plugin *node.Plugin, neighbor *Peer, initiated bool) { +func manageConnection(plugin *node.Plugin, neighbor *Peer) { daemon.BackgroundWorker(func() { failedConnectionAttempts := 0 - for failedConnectionAttempts < MAX_CONNECTION_ATTEMPTS { + for failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS { if neighbor, exists := GetNeighbor(neighbor.Identity.StringIdentifier); !exists { return } else { if conn, newConnection, err := neighbor.Connect(); err != nil { failedConnectionAttempts++ - plugin.LogFailure("connection attempt [" + strconv.Itoa(int(failedConnectionAttempts)) + "/" + strconv.Itoa(MAX_CONNECTION_ATTEMPTS) + "] " + err.Error()) + plugin.LogFailure("connection attempt [" + strconv.Itoa(int(failedConnectionAttempts)) + "/" + strconv.Itoa(CONNECTION_MAX_ATTEMPTS) + "] " + err.Error()) + + if failedConnectionAttempts <= CONNECTION_MAX_ATTEMPTS { + select { + case <-daemon.ShutdownSignal: + return + + case <-time.After(time.Duration(int(math.Pow(2, float64(failedConnectionAttempts-1)))) * CONNECTION_BASE_TIMEOUT): + // continue + } + } } else { + failedConnectionAttempts = 0 + + disconnectChan := make(chan int, 1) + conn.Events.Close.Attach(events.NewClosure(func() { + close(disconnectChan) + })) + if newConnection { - newProtocol(conn).init() + go newProtocol(conn).init() + } + + select { + case <-daemon.ShutdownSignal: + return + + case <-disconnectChan: + break } } } @@ -69,8 +99,8 @@ type Peer struct { Port uint16 InitiatedConn *network.ManagedConnection AcceptedConn *network.ManagedConnection - initiatedConnMutex sync.Mutex - acceptedConnMutex sync.Mutex + initiatedConnMutex sync.RWMutex + acceptedConnMutex sync.RWMutex } func UnmarshalPeer(data []byte) (*Peer, error) { @@ -78,41 +108,42 @@ func UnmarshalPeer(data []byte) (*Peer, error) { } func (peer *Peer) Connect() (*network.ManagedConnection, bool, errors.IdentifiableError) { + peer.initiatedConnMutex.Lock() + defer peer.initiatedConnMutex.Unlock() + + // return existing connections first + if peer.InitiatedConn != nil { + return peer.InitiatedConn, false, nil + } + // if we already have an accepted connection -> use it instead if peer.AcceptedConn != nil { - peer.acceptedConnMutex.Lock() + peer.acceptedConnMutex.RLock() if peer.AcceptedConn != nil { - defer peer.acceptedConnMutex.Unlock() + defer peer.acceptedConnMutex.RUnlock() return peer.AcceptedConn, false, nil } - peer.acceptedConnMutex.Unlock() + peer.acceptedConnMutex.RUnlock() } // otherwise try to dial - peer.initiatedConnMutex.Lock() - defer peer.initiatedConnMutex.Unlock() - - if peer.InitiatedConn != nil { - return peer.InitiatedConn, false, nil - } else { - conn, err := net.Dial("tcp", peer.Address.String()+":"+strconv.Itoa(int(peer.Port))) - if err != nil { - return nil, false, ErrConnectionFailed.Derive(err, "error when connecting to neighbor "+ - peer.Identity.StringIdentifier+"@"+peer.Address.String()+":"+strconv.Itoa(int(peer.Port))) - } + conn, err := net.Dial("tcp", peer.Address.String()+":"+strconv.Itoa(int(peer.Port))) + if err != nil { + return nil, false, ErrConnectionFailed.Derive(err, "error when connecting to neighbor "+ + peer.Identity.StringIdentifier+"@"+peer.Address.String()+":"+strconv.Itoa(int(peer.Port))) + } - peer.InitiatedConn = network.NewManagedConnection(conn) + peer.InitiatedConn = network.NewManagedConnection(conn) - peer.InitiatedConn.Events.Close.Attach(events.NewClosure(func() { - peer.initiatedConnMutex.Lock() - defer peer.initiatedConnMutex.Unlock() + peer.InitiatedConn.Events.Close.Attach(events.NewClosure(func() { + peer.initiatedConnMutex.Lock() + defer peer.initiatedConnMutex.Unlock() - peer.InitiatedConn = nil - })) + peer.InitiatedConn = nil + })) - return peer.InitiatedConn, true, nil - } + return peer.InitiatedConn, true, nil } func (peer *Peer) Marshal() []byte { @@ -144,13 +175,15 @@ func AddNeighbor(newNeighbor *Peer) { } func RemoveNeighbor(identifier string) { - neighborLock.Lock() - defer neighborLock.Lock() + if _, exists := neighbors[identifier]; exists { + neighborLock.Lock() + defer neighborLock.Unlock() - if neighbor, exists := neighbors[identifier]; exists { - delete(neighbors, identifier) + if neighbor, exists := neighbors[identifier]; exists { + delete(neighbors, identifier) - Events.RemoveNeighbor.Trigger(neighbor) + Events.RemoveNeighbor.Trigger(neighbor) + } } } @@ -176,7 +209,8 @@ func GetNeighbors() map[string]*Peer { } const ( - MAX_CONNECTION_ATTEMPTS = 5 + CONNECTION_MAX_ATTEMPTS = 5 + CONNECTION_BASE_TIMEOUT = 10 * time.Second MARSHALLED_NEIGHBOR_TOTAL_SIZE = 1 ) diff --git a/plugins/gossip/protocol.go b/plugins/gossip/protocol.go index bc5dbfd83328aef30f94cdc00848457ae74961fa..7ba2e3223074ee43ae24b7979f86768249da3840 100644 --- a/plugins/gossip/protocol.go +++ b/plugins/gossip/protocol.go @@ -1,7 +1,6 @@ package gossip import ( - "fmt" "github.com/iotaledger/goshimmer/packages/accountability" "github.com/iotaledger/goshimmer/packages/errors" "github.com/iotaledger/goshimmer/packages/events" @@ -33,7 +32,7 @@ func newProtocol(conn *network.ManagedConnection) *protocol { CurrentState: &versionState{}, Events: protocolEvents{ ReceiveVersion: events.NewEvent(intCaller), - ReceiveIdentification: events.NewEvent(peerCaller), + ReceiveIdentification: events.NewEvent(identityCaller), }, } @@ -43,8 +42,6 @@ func newProtocol(conn *network.ManagedConnection) *protocol { func (protocol *protocol) init() { var onClose, onReceiveData *events.Closure - fmt.Println("INIT") - onReceiveData = events.NewClosure(protocol.parseData) onClose = events.NewClosure(func() { protocol.Conn.Events.ReceiveData.Detach(onReceiveData) @@ -53,26 +50,15 @@ func (protocol *protocol) init() { protocol.Conn.Events.ReceiveData.Attach(onReceiveData) protocol.Conn.Events.Close.Attach(onClose) - protocol.Events.ReceiveVersion.Attach(events.NewClosure(func(version int) { - fmt.Println(version) - })) - protocol.Events.ReceiveIdentification.Attach(events.NewClosure(func(neighbor *Peer) { - fmt.Println(neighbor) - })) protocol.Conn.Write([]byte{1}) - fmt.Println("SENT VERSION") protocol.Conn.Write(accountability.OWN_ID.Identifier) - fmt.Println(len(accountability.OWN_ID.Identifier)) - if signature, err := accountability.OWN_ID.Sign(accountability.OWN_ID.Identifier); err == nil { protocol.Conn.Write(signature) - fmt.Println(len(signature)) } - fmt.Println("SENTSIGNATURE") - go protocol.Conn.Read(make([]byte, 1000)) + protocol.Conn.Read(make([]byte, 1000)) } func (protocol *protocol) parseData(data []byte) { diff --git a/plugins/gossip/protocol_v1.go b/plugins/gossip/protocol_v1.go index c033775c74d579f65de4b7ecdfb715352804a01f..09495280de65cb881058e2760b66ada8024dc21b 100644 --- a/plugins/gossip/protocol_v1.go +++ b/plugins/gossip/protocol_v1.go @@ -32,18 +32,12 @@ func (state *indentificationStateV1) Consume(protocol *protocol, data []byte, of return bytesRead, ErrInvalidAuthenticationMessage.Derive(err, "invalid authentication message") } else { if neighbor, exists := GetNeighbor(receivedIdentity.StringIdentifier); exists { - neighbor.initiatedConnMutex.Lock() - if neighbor.InitiatedConn == nil { - neighbor.InitiatedConn = protocol.Conn - } - neighbor.initiatedConnMutex.Unlock() - protocol.Neighbor = neighbor } else { protocol.Neighbor = nil } - protocol.Events.ReceiveIdentification.Trigger(protocol.Neighbor) + protocol.Events.ReceiveIdentification.Trigger(receivedIdentity) protocol.CurrentState = newacceptanceStateV1() state.offset = 0 diff --git a/plugins/gossip/server.go b/plugins/gossip/server.go index 1002389bb5e2d96488727875f869836444e3ef9f..d323c9b699a111cb35b8adf5badf167598083902 100644 --- a/plugins/gossip/server.go +++ b/plugins/gossip/server.go @@ -3,6 +3,7 @@ package gossip import ( "github.com/iotaledger/goshimmer/packages/daemon" "github.com/iotaledger/goshimmer/packages/events" + "github.com/iotaledger/goshimmer/packages/identity" "github.com/iotaledger/goshimmer/packages/network" "github.com/iotaledger/goshimmer/packages/network/tcp" "github.com/iotaledger/goshimmer/packages/node" @@ -13,7 +14,26 @@ var TCPServer = tcp.NewServer() func configureServer(plugin *node.Plugin) { TCPServer.Events.Connect.Attach(events.NewClosure(func(conn *network.ManagedConnection) { - newProtocol(conn).init() + protocol := newProtocol(conn) + + protocol.Events.ReceiveIdentification.Attach(events.NewClosure(func(identity *identity.Identity) { + if protocol.Neighbor != nil { + protocol.Neighbor.acceptedConnMutex.Lock() + if protocol.Neighbor.AcceptedConn == nil { + protocol.Neighbor.AcceptedConn = protocol.Conn + + protocol.Neighbor.AcceptedConn.Events.Close.Attach(events.NewClosure(func() { + protocol.Neighbor.acceptedConnMutex.Lock() + defer protocol.Neighbor.acceptedConnMutex.Unlock() + + protocol.Neighbor.AcceptedConn = nil + })) + } + protocol.Neighbor.acceptedConnMutex.Unlock() + } + })) + + go protocol.init() })) daemon.Events.Shutdown.Attach(events.NewClosure(func() {