From 39237b7ce78c2f16f70bd72a7ed0fe7373f95bea Mon Sep 17 00:00:00 2001
From: Hans Moog <hm@mkjc.net>
Date: Wed, 15 May 2019 13:50:23 +0200
Subject: [PATCH] Feat: gossip protocol nearly done

---
 plugins/analysis/client/plugin.go             |   2 +-
 plugins/autopeering/plugin.go                 |   5 +
 .../types/peerregister/peer_register.go       |  18 ++--
 plugins/gossip/events.go                      |   3 +
 plugins/gossip/neighbors.go                   | 102 ++++++++++++------
 plugins/gossip/protocol.go                    |  18 +---
 plugins/gossip/protocol_v1.go                 |   8 +-
 plugins/gossip/server.go                      |  22 +++-
 8 files changed, 113 insertions(+), 65 deletions(-)

diff --git a/plugins/analysis/client/plugin.go b/plugins/analysis/client/plugin.go
index 5233feed..c4c3970d 100644
--- a/plugins/analysis/client/plugin.go
+++ b/plugins/analysis/client/plugin.go
@@ -61,7 +61,7 @@ func setupHooks(conn *network.ManagedConnection, eventDispatchers *EventDispatch
     // define hooks ////////////////////////////////////////////////////////////////////////////////////////////////////
 
     onDiscoverPeer := events.NewClosure(func(p *peer.Peer) {
-        eventDispatchers.AddNode(p.Identity.Identifier)
+        go eventDispatchers.AddNode(p.Identity.Identifier)
     })
 
     onAddAcceptedNeighbor := events.NewClosure(func(p *peer.Peer) {
diff --git a/plugins/autopeering/plugin.go b/plugins/autopeering/plugin.go
index 9717eb36..d2c46c4d 100644
--- a/plugins/autopeering/plugin.go
+++ b/plugins/autopeering/plugin.go
@@ -37,6 +37,11 @@ func run(plugin *node.Plugin) {
 }
 
 func configureLogging(plugin *node.Plugin) {
+    gossip.Events.RemoveNeighbor.Attach(events.NewClosure(func(peer *gossip.Peer) {
+        chosenneighbors.INSTANCE.Remove(peer.Identity.StringIdentifier)
+        acceptedneighbors.INSTANCE.Remove(peer.Identity.StringIdentifier)
+    }))
+
     acceptedneighbors.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) {
         plugin.LogDebug("accepted neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
 
diff --git a/plugins/autopeering/types/peerregister/peer_register.go b/plugins/autopeering/types/peerregister/peer_register.go
index e2ba3823..6502155e 100644
--- a/plugins/autopeering/types/peerregister/peer_register.go
+++ b/plugins/autopeering/types/peerregister/peer_register.go
@@ -62,14 +62,20 @@ func (this *PeerRegister) Lock() func() {
 }
 
 func (this *PeerRegister) Remove(key string, lock... bool) {
-    if len(lock) == 0 || lock[0] {
-        defer this.Lock()()
-    }
-
     if peerEntry, exists := this.Peers[key]; exists {
-        delete(this.Peers, key)
+        if len(lock) == 0 || lock[0] {
+            defer this.Lock()()
+
+            if peerEntry, exists := this.Peers[key]; exists {
+                delete(this.Peers, key)
+
+                this.Events.Remove.Trigger(peerEntry)
+            }
+        } else {
+            delete(this.Peers, key)
 
-        this.Events.Remove.Trigger(peerEntry)
+            this.Events.Remove.Trigger(peerEntry)
+        }
     }
 }
 
diff --git a/plugins/gossip/events.go b/plugins/gossip/events.go
index faf64976..e22b0117 100644
--- a/plugins/gossip/events.go
+++ b/plugins/gossip/events.go
@@ -2,6 +2,7 @@ package gossip
 
 import (
     "github.com/iotaledger/goshimmer/packages/events"
+    "github.com/iotaledger/goshimmer/packages/identity"
     "github.com/iotaledger/goshimmer/packages/network"
     "github.com/iotaledger/goshimmer/packages/transaction"
 )
@@ -61,6 +62,8 @@ type protocolEvents struct {
 
 func intCaller(handler interface{}, params ...interface{}) { handler.(func(int))(params[0].(int)) }
 
+func identityCaller(handler interface{}, params ...interface{}) { handler.(func(*identity.Identity))(params[0].(*identity.Identity)) }
+
 func connectionCaller(handler interface{}, params ...interface{}) { handler.(func(*network.ManagedConnection))(params[0].(*network.ManagedConnection)) }
 
 func peerCaller(handler interface{}, params ...interface{}) { handler.(func(*Peer))(params[0].(*Peer)) }
diff --git a/plugins/gossip/neighbors.go b/plugins/gossip/neighbors.go
index 414dd489..c27c5f8b 100644
--- a/plugins/gossip/neighbors.go
+++ b/plugins/gossip/neighbors.go
@@ -7,9 +7,11 @@ import (
     "github.com/iotaledger/goshimmer/packages/identity"
     "github.com/iotaledger/goshimmer/packages/network"
     "github.com/iotaledger/goshimmer/packages/node"
+    "math"
     "net"
     "strconv"
     "sync"
+    "time"
 )
 
 func configureNeighbors(plugin *node.Plugin) {
@@ -30,6 +32,9 @@ func runNeighbors(plugin *node.Plugin) {
     plugin.LogInfo("Starting Neighbor Connection Manager ...")
 
     neighborLock.RLock()
+    for _, neighbor := range GetNeighbors() {
+        manageConnection(plugin, neighbor)
+    }
     neighborLock.RUnlock()
 
     Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Peer) {
@@ -39,21 +44,46 @@ func runNeighbors(plugin *node.Plugin) {
     plugin.LogSuccess("Starting Neighbor Connection Manager ... done")
 }
 
-func manageConnection(plugin *node.Plugin, neighbor *Peer, initiated bool) {
+func manageConnection(plugin *node.Plugin, neighbor *Peer) {
     daemon.BackgroundWorker(func() {
         failedConnectionAttempts := 0
 
-        for failedConnectionAttempts < MAX_CONNECTION_ATTEMPTS {
+        for failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS {
             if neighbor, exists := GetNeighbor(neighbor.Identity.StringIdentifier); !exists {
                 return
             } else {
                 if conn, newConnection, err := neighbor.Connect(); err != nil {
                     failedConnectionAttempts++
 
-                    plugin.LogFailure("connection attempt [" + strconv.Itoa(int(failedConnectionAttempts)) + "/" + strconv.Itoa(MAX_CONNECTION_ATTEMPTS) + "] " + err.Error())
+                    plugin.LogFailure("connection attempt [" + strconv.Itoa(int(failedConnectionAttempts)) + "/" + strconv.Itoa(CONNECTION_MAX_ATTEMPTS) + "] " + err.Error())
+
+                    if failedConnectionAttempts <= CONNECTION_MAX_ATTEMPTS {
+                        select {
+                        case <-daemon.ShutdownSignal:
+                            return
+
+                        case <-time.After(time.Duration(int(math.Pow(2, float64(failedConnectionAttempts-1)))) * CONNECTION_BASE_TIMEOUT):
+                            // continue
+                        }
+                    }
                 } else {
+                    failedConnectionAttempts = 0
+
+                    disconnectChan := make(chan int, 1)
+                    conn.Events.Close.Attach(events.NewClosure(func() {
+                        close(disconnectChan)
+                    }))
+
                     if newConnection {
-                        newProtocol(conn).init()
+                        go newProtocol(conn).init()
+                    }
+
+                    select {
+                    case <-daemon.ShutdownSignal:
+                        return
+
+                    case <-disconnectChan:
+                        break
                     }
                 }
             }
@@ -69,8 +99,8 @@ type Peer struct {
     Port               uint16
     InitiatedConn      *network.ManagedConnection
     AcceptedConn       *network.ManagedConnection
-    initiatedConnMutex sync.Mutex
-    acceptedConnMutex  sync.Mutex
+    initiatedConnMutex sync.RWMutex
+    acceptedConnMutex  sync.RWMutex
 }
 
 func UnmarshalPeer(data []byte) (*Peer, error) {
@@ -78,41 +108,42 @@ func UnmarshalPeer(data []byte) (*Peer, error) {
 }
 
 func (peer *Peer) Connect() (*network.ManagedConnection, bool, errors.IdentifiableError) {
+    peer.initiatedConnMutex.Lock()
+    defer peer.initiatedConnMutex.Unlock()
+
+    // return existing connections first
+    if peer.InitiatedConn != nil {
+        return peer.InitiatedConn, false, nil
+    }
+
     // if we already have an accepted connection -> use it instead
     if peer.AcceptedConn != nil {
-        peer.acceptedConnMutex.Lock()
+        peer.acceptedConnMutex.RLock()
         if peer.AcceptedConn != nil {
-            defer peer.acceptedConnMutex.Unlock()
+            defer peer.acceptedConnMutex.RUnlock()
 
             return peer.AcceptedConn, false, nil
         }
-        peer.acceptedConnMutex.Unlock()
+        peer.acceptedConnMutex.RUnlock()
     }
 
     // otherwise try to dial
-    peer.initiatedConnMutex.Lock()
-    defer peer.initiatedConnMutex.Unlock()
-
-    if peer.InitiatedConn != nil {
-        return peer.InitiatedConn, false, nil
-    } else {
-        conn, err := net.Dial("tcp", peer.Address.String()+":"+strconv.Itoa(int(peer.Port)))
-        if err != nil {
-            return nil, false, ErrConnectionFailed.Derive(err, "error when connecting to neighbor "+
-                peer.Identity.StringIdentifier+"@"+peer.Address.String()+":"+strconv.Itoa(int(peer.Port)))
-        }
+    conn, err := net.Dial("tcp", peer.Address.String()+":"+strconv.Itoa(int(peer.Port)))
+    if err != nil {
+        return nil, false, ErrConnectionFailed.Derive(err, "error when connecting to neighbor "+
+            peer.Identity.StringIdentifier+"@"+peer.Address.String()+":"+strconv.Itoa(int(peer.Port)))
+    }
 
-        peer.InitiatedConn = network.NewManagedConnection(conn)
+    peer.InitiatedConn = network.NewManagedConnection(conn)
 
-        peer.InitiatedConn.Events.Close.Attach(events.NewClosure(func() {
-            peer.initiatedConnMutex.Lock()
-            defer peer.initiatedConnMutex.Unlock()
+    peer.InitiatedConn.Events.Close.Attach(events.NewClosure(func() {
+        peer.initiatedConnMutex.Lock()
+        defer peer.initiatedConnMutex.Unlock()
 
-            peer.InitiatedConn = nil
-        }))
+        peer.InitiatedConn = nil
+    }))
 
-        return peer.InitiatedConn, true, nil
-    }
+    return peer.InitiatedConn, true, nil
 }
 
 func (peer *Peer) Marshal() []byte {
@@ -144,13 +175,15 @@ func AddNeighbor(newNeighbor *Peer) {
 }
 
 func RemoveNeighbor(identifier string) {
-    neighborLock.Lock()
-    defer neighborLock.Lock()
+    if _, exists := neighbors[identifier]; exists {
+        neighborLock.Lock()
+        defer neighborLock.Unlock()
 
-    if neighbor, exists := neighbors[identifier]; exists {
-        delete(neighbors, identifier)
+        if neighbor, exists := neighbors[identifier]; exists {
+            delete(neighbors, identifier)
 
-        Events.RemoveNeighbor.Trigger(neighbor)
+            Events.RemoveNeighbor.Trigger(neighbor)
+        }
     }
 }
 
@@ -176,7 +209,8 @@ func GetNeighbors() map[string]*Peer {
 }
 
 const (
-    MAX_CONNECTION_ATTEMPTS        = 5
+    CONNECTION_MAX_ATTEMPTS        = 5
+    CONNECTION_BASE_TIMEOUT        = 10 * time.Second
     MARSHALLED_NEIGHBOR_TOTAL_SIZE = 1
 )
 
diff --git a/plugins/gossip/protocol.go b/plugins/gossip/protocol.go
index bc5dbfd8..7ba2e322 100644
--- a/plugins/gossip/protocol.go
+++ b/plugins/gossip/protocol.go
@@ -1,7 +1,6 @@
 package gossip
 
 import (
-    "fmt"
     "github.com/iotaledger/goshimmer/packages/accountability"
     "github.com/iotaledger/goshimmer/packages/errors"
     "github.com/iotaledger/goshimmer/packages/events"
@@ -33,7 +32,7 @@ func newProtocol(conn *network.ManagedConnection) *protocol {
         CurrentState: &versionState{},
         Events: protocolEvents{
             ReceiveVersion:        events.NewEvent(intCaller),
-            ReceiveIdentification: events.NewEvent(peerCaller),
+            ReceiveIdentification: events.NewEvent(identityCaller),
         },
     }
 
@@ -43,8 +42,6 @@ func newProtocol(conn *network.ManagedConnection) *protocol {
 func (protocol *protocol) init() {
     var onClose, onReceiveData *events.Closure
 
-    fmt.Println("INIT")
-
     onReceiveData = events.NewClosure(protocol.parseData)
     onClose = events.NewClosure(func() {
         protocol.Conn.Events.ReceiveData.Detach(onReceiveData)
@@ -53,26 +50,15 @@ func (protocol *protocol) init() {
 
     protocol.Conn.Events.ReceiveData.Attach(onReceiveData)
     protocol.Conn.Events.Close.Attach(onClose)
-    protocol.Events.ReceiveVersion.Attach(events.NewClosure(func(version int) {
-        fmt.Println(version)
-    }))
-    protocol.Events.ReceiveIdentification.Attach(events.NewClosure(func(neighbor *Peer) {
-        fmt.Println(neighbor)
-    }))
 
     protocol.Conn.Write([]byte{1})
-    fmt.Println("SENT VERSION")
     protocol.Conn.Write(accountability.OWN_ID.Identifier)
 
-    fmt.Println(len(accountability.OWN_ID.Identifier))
-
     if signature, err := accountability.OWN_ID.Sign(accountability.OWN_ID.Identifier); err == nil {
         protocol.Conn.Write(signature)
-        fmt.Println(len(signature))
     }
-    fmt.Println("SENTSIGNATURE")
 
-    go protocol.Conn.Read(make([]byte, 1000))
+    protocol.Conn.Read(make([]byte, 1000))
 }
 
 func (protocol *protocol) parseData(data []byte) {
diff --git a/plugins/gossip/protocol_v1.go b/plugins/gossip/protocol_v1.go
index c033775c..09495280 100644
--- a/plugins/gossip/protocol_v1.go
+++ b/plugins/gossip/protocol_v1.go
@@ -32,18 +32,12 @@ func (state *indentificationStateV1) Consume(protocol *protocol, data []byte, of
             return bytesRead, ErrInvalidAuthenticationMessage.Derive(err, "invalid authentication message")
         } else {
             if neighbor, exists := GetNeighbor(receivedIdentity.StringIdentifier); exists {
-                neighbor.initiatedConnMutex.Lock()
-                if neighbor.InitiatedConn == nil {
-                    neighbor.InitiatedConn = protocol.Conn
-                }
-                neighbor.initiatedConnMutex.Unlock()
-
                 protocol.Neighbor = neighbor
             } else {
                 protocol.Neighbor = nil
             }
 
-            protocol.Events.ReceiveIdentification.Trigger(protocol.Neighbor)
+            protocol.Events.ReceiveIdentification.Trigger(receivedIdentity)
 
             protocol.CurrentState = newacceptanceStateV1()
             state.offset = 0
diff --git a/plugins/gossip/server.go b/plugins/gossip/server.go
index 1002389b..d323c9b6 100644
--- a/plugins/gossip/server.go
+++ b/plugins/gossip/server.go
@@ -3,6 +3,7 @@ package gossip
 import (
     "github.com/iotaledger/goshimmer/packages/daemon"
     "github.com/iotaledger/goshimmer/packages/events"
+    "github.com/iotaledger/goshimmer/packages/identity"
     "github.com/iotaledger/goshimmer/packages/network"
     "github.com/iotaledger/goshimmer/packages/network/tcp"
     "github.com/iotaledger/goshimmer/packages/node"
@@ -13,7 +14,26 @@ var TCPServer = tcp.NewServer()
 
 func configureServer(plugin *node.Plugin) {
     TCPServer.Events.Connect.Attach(events.NewClosure(func(conn *network.ManagedConnection) {
-        newProtocol(conn).init()
+        protocol := newProtocol(conn)
+
+        protocol.Events.ReceiveIdentification.Attach(events.NewClosure(func(identity *identity.Identity) {
+            if protocol.Neighbor != nil {
+                protocol.Neighbor.acceptedConnMutex.Lock()
+                if protocol.Neighbor.AcceptedConn == nil {
+                    protocol.Neighbor.AcceptedConn = protocol.Conn
+
+                    protocol.Neighbor.AcceptedConn.Events.Close.Attach(events.NewClosure(func() {
+                        protocol.Neighbor.acceptedConnMutex.Lock()
+                        defer protocol.Neighbor.acceptedConnMutex.Unlock()
+
+                        protocol.Neighbor.AcceptedConn = nil
+                    }))
+                }
+                protocol.Neighbor.acceptedConnMutex.Unlock()
+            }
+        }))
+
+        go protocol.init()
     }))
 
     daemon.Events.Shutdown.Attach(events.NewClosure(func() {
-- 
GitLab