From e07fd604a47394b10df7185147f904a3f35e4324 Mon Sep 17 00:00:00 2001
From: Hans Moog <hm@mkjc.net>
Date: Tue, 21 May 2019 02:23:03 +0200
Subject: [PATCH] Feat: added a sendqueue to the gossip module

---
 plugins/autopeering/plugin.go           |  26 +---
 plugins/gossip/events.go                |  17 ++-
 plugins/gossip/neighbors.go             | 107 ++++++++++------
 plugins/gossip/plugin.go                |   2 +
 plugins/gossip/protocol.go              |  31 +++--
 plugins/gossip/protocol_v1.go           |   8 ++
 plugins/gossip/send_queue.go            | 155 ++++++++++++++++++++++++
 plugins/gossip/server.go                |  21 +++-
 plugins/gossip/transaction_processor.go |  41 +------
 9 files changed, 297 insertions(+), 111 deletions(-)
 create mode 100644 plugins/gossip/send_queue.go

diff --git a/plugins/autopeering/plugin.go b/plugins/autopeering/plugin.go
index d2c46c4d..769c37ee 100644
--- a/plugins/autopeering/plugin.go
+++ b/plugins/autopeering/plugin.go
@@ -37,7 +37,7 @@ func run(plugin *node.Plugin) {
 }
 
 func configureLogging(plugin *node.Plugin) {
-    gossip.Events.RemoveNeighbor.Attach(events.NewClosure(func(peer *gossip.Peer) {
+    gossip.Events.RemoveNeighbor.Attach(events.NewClosure(func(peer *gossip.Neighbor) {
         chosenneighbors.INSTANCE.Remove(peer.Identity.StringIdentifier)
         acceptedneighbors.INSTANCE.Remove(peer.Identity.StringIdentifier)
     }))
@@ -45,11 +45,7 @@ func configureLogging(plugin *node.Plugin) {
     acceptedneighbors.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) {
         plugin.LogDebug("accepted neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
 
-        gossip.AddNeighbor(&gossip.Peer{
-            Identity: p.Identity,
-            Address: p.Address,
-            Port: p.GossipPort,
-        })
+        gossip.AddNeighbor(gossip.NewNeighbor(p.Identity, p.Address, p.GossipPort))
     }))
     acceptedneighbors.INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) {
         plugin.LogDebug("accepted neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
@@ -60,11 +56,7 @@ func configureLogging(plugin *node.Plugin) {
     chosenneighbors.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) {
         plugin.LogDebug("chosen neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
 
-        gossip.AddNeighbor(&gossip.Peer{
-            Identity: p.Identity,
-            Address: p.Address,
-            Port: p.GossipPort,
-        })
+        gossip.AddNeighbor(gossip.NewNeighbor(p.Identity, p.Address, p.GossipPort))
     }))
     chosenneighbors.INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) {
         plugin.LogDebug("chosen neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
@@ -76,22 +68,14 @@ func configureLogging(plugin *node.Plugin) {
         plugin.LogInfo("new peer discovered: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
 
         if _, exists := gossip.GetNeighbor(p.Identity.StringIdentifier); exists {
-            gossip.AddNeighbor(&gossip.Peer{
-                Identity: p.Identity,
-                Address: p.Address,
-                Port: p.GossipPort,
-            })
+            gossip.AddNeighbor(gossip.NewNeighbor(p.Identity, p.Address, p.GossipPort))
         }
     }))
     knownpeers.INSTANCE.Events.Update.Attach(events.NewClosure(func(p *peer.Peer) {
         plugin.LogDebug("peer updated: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
 
         if _, exists := gossip.GetNeighbor(p.Identity.StringIdentifier); exists {
-            gossip.AddNeighbor(&gossip.Peer{
-                Identity: p.Identity,
-                Address: p.Address,
-                Port: p.GossipPort,
-            })
+            gossip.AddNeighbor(gossip.NewNeighbor(p.Identity, p.Address, p.GossipPort))
         }
     }))
 }
diff --git a/plugins/gossip/events.go b/plugins/gossip/events.go
index 8930ab5f..b354280f 100644
--- a/plugins/gossip/events.go
+++ b/plugins/gossip/events.go
@@ -10,15 +10,15 @@ import (
 
 var Events = pluginEvents{
     // neighbor events
-    AddNeighbor:    events.NewEvent(peerCaller),
-    UpdateNeighbor: events.NewEvent(peerCaller),
-    RemoveNeighbor: events.NewEvent(peerCaller),
+    AddNeighbor:    events.NewEvent(neighborCaller),
+    UpdateNeighbor: events.NewEvent(neighborCaller),
+    RemoveNeighbor: events.NewEvent(neighborCaller),
 
     // low level network events
     IncomingConnection: events.NewEvent(connectionCaller),
 
     // high level protocol events
-    DropNeighbor:              events.NewEvent(peerCaller),
+    DropNeighbor:              events.NewEvent(neighborCaller),
     SendTransaction:           events.NewEvent(transactionCaller),
     SendTransactionRequest:    events.NewEvent(transactionCaller), // TODO
     ReceiveTransaction:        events.NewEvent(transactionCaller),
@@ -58,16 +58,23 @@ type protocolEvents struct {
     ReceiveDropConnection     *events.Event
     ReceiveTransactionData    *events.Event
     ReceiveRequestData        *events.Event
+    HandshakeCompleted        *events.Event
     Error                     *events.Event
 }
 
+type neighborEvents struct {
+    ProtocolConnectionEstablished *events.Event
+}
+
 func intCaller(handler interface{}, params ...interface{}) { handler.(func(int))(params[0].(int)) }
 
 func identityCaller(handler interface{}, params ...interface{}) { handler.(func(*identity.Identity))(params[0].(*identity.Identity)) }
 
 func connectionCaller(handler interface{}, params ...interface{}) { handler.(func(*network.ManagedConnection))(params[0].(*network.ManagedConnection)) }
 
-func peerCaller(handler interface{}, params ...interface{}) { handler.(func(*Peer))(params[0].(*Peer)) }
+func protocolCaller(handler interface{}, params ...interface{}) { handler.(func(*protocol))(params[0].(*protocol)) }
+
+func neighborCaller(handler interface{}, params ...interface{}) { handler.(func(*Neighbor))(params[0].(*Neighbor)) }
 
 func errorCaller(handler interface{}, params ...interface{}) { handler.(func(errors.IdentifiableError))(params[0].(errors.IdentifiableError)) }
 
diff --git a/plugins/gossip/neighbors.go b/plugins/gossip/neighbors.go
index 97a5d6f2..bfa3e055 100644
--- a/plugins/gossip/neighbors.go
+++ b/plugins/gossip/neighbors.go
@@ -1,6 +1,7 @@
 package gossip
 
 import (
+    "github.com/iotaledger/goshimmer/packages/accountability"
     "github.com/iotaledger/goshimmer/packages/daemon"
     "github.com/iotaledger/goshimmer/packages/errors"
     "github.com/iotaledger/goshimmer/packages/events"
@@ -15,15 +16,15 @@ import (
 )
 
 func configureNeighbors(plugin *node.Plugin) {
-    Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Peer) {
+    Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
         plugin.LogSuccess("new neighbor added " + neighbor.Identity.StringIdentifier + "@" + neighbor.Address.String() + ":" + strconv.Itoa(int(neighbor.Port)))
     }))
 
-    Events.UpdateNeighbor.Attach(events.NewClosure(func(neighbor *Peer) {
+    Events.UpdateNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
         plugin.LogSuccess("existing neighbor updated " + neighbor.Identity.StringIdentifier + "@" + neighbor.Address.String() + ":" + strconv.Itoa(int(neighbor.Port)))
     }))
 
-    Events.RemoveNeighbor.Attach(events.NewClosure(func(neighbor *Peer) {
+    Events.RemoveNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
         plugin.LogSuccess("existing neighbor removed " + neighbor.Identity.StringIdentifier + "@" + neighbor.Address.String() + ":" + strconv.Itoa(int(neighbor.Port)))
     }))
 }
@@ -37,14 +38,14 @@ func runNeighbors(plugin *node.Plugin) {
     }
     neighborLock.RUnlock()
 
-    Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Peer) {
+    Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
         manageConnection(plugin, neighbor)
     }))
 
     plugin.LogSuccess("Starting Neighbor Connection Manager ... done")
 }
 
-func manageConnection(plugin *node.Plugin, neighbor *Peer) {
+func manageConnection(plugin *node.Plugin, neighbor *Neighbor) {
     daemon.BackgroundWorker(func() {
         failedConnectionAttempts := 0
 
@@ -91,69 +92,99 @@ func manageConnection(plugin *node.Plugin, neighbor *Peer) {
     })
 }
 
-type Peer struct {
+type Neighbor struct {
     Identity               *identity.Identity
     Address                net.IP
     Port                   uint16
     InitiatedProtocol      *protocol
     AcceptedProtocol       *protocol
+    Events                 neighborEvents
     initiatedProtocolMutex sync.RWMutex
     acceptedProtocolMutex  sync.RWMutex
 }
 
-func UnmarshalPeer(data []byte) (*Peer, error) {
-    return &Peer{}, nil
+func NewNeighbor(identity *identity.Identity, address net.IP, port uint16) *Neighbor {
+    return &Neighbor{
+        Identity: identity,
+        Address:  address,
+        Port:     port,
+        Events: neighborEvents{
+            ProtocolConnectionEstablished: events.NewEvent(protocolCaller),
+        },
+    }
+}
+
+func UnmarshalPeer(data []byte) (*Neighbor, error) {
+    return &Neighbor{}, nil
 }
 
-func (peer *Peer) Connect() (*protocol, bool, errors.IdentifiableError) {
-    peer.initiatedProtocolMutex.Lock()
-    defer peer.initiatedProtocolMutex.Unlock()
+func (neighbor *Neighbor) Connect() (*protocol, bool, errors.IdentifiableError) {
+    neighbor.initiatedProtocolMutex.Lock()
+    defer neighbor.initiatedProtocolMutex.Unlock()
 
     // return existing connections first
-    if peer.InitiatedProtocol != nil {
-        return peer.InitiatedProtocol, false, nil
+    if neighbor.InitiatedProtocol != nil {
+        return neighbor.InitiatedProtocol, false, nil
     }
 
     // if we already have an accepted connection -> use it instead
-    if peer.AcceptedProtocol != nil {
-        peer.acceptedProtocolMutex.RLock()
-        if peer.AcceptedProtocol != nil {
-            defer peer.acceptedProtocolMutex.RUnlock()
+    if neighbor.AcceptedProtocol != nil {
+        neighbor.acceptedProtocolMutex.RLock()
+        if neighbor.AcceptedProtocol != nil {
+            defer neighbor.acceptedProtocolMutex.RUnlock()
 
-            return peer.AcceptedProtocol, false, nil
+            return neighbor.AcceptedProtocol, false, nil
         }
-        peer.acceptedProtocolMutex.RUnlock()
+        neighbor.acceptedProtocolMutex.RUnlock()
     }
 
     // otherwise try to dial
-    conn, err := net.Dial("tcp", peer.Address.String()+":"+strconv.Itoa(int(peer.Port)))
+    conn, err := net.Dial("tcp", neighbor.Address.String()+":"+strconv.Itoa(int(neighbor.Port)))
     if err != nil {
         return nil, false, ErrConnectionFailed.Derive(err, "error when connecting to neighbor "+
-            peer.Identity.StringIdentifier+"@"+peer.Address.String()+":"+strconv.Itoa(int(peer.Port)))
+            neighbor.Identity.StringIdentifier+"@"+neighbor.Address.String()+":"+strconv.Itoa(int(neighbor.Port)))
     }
 
-    peer.InitiatedProtocol = newProtocol(network.NewManagedConnection(conn))
+    neighbor.InitiatedProtocol = newProtocol(network.NewManagedConnection(conn))
+
+    neighbor.InitiatedProtocol.Conn.Events.Close.Attach(events.NewClosure(func() {
+        neighbor.initiatedProtocolMutex.Lock()
+        defer neighbor.initiatedProtocolMutex.Unlock()
+
+        neighbor.InitiatedProtocol = nil
+    }))
+
+    // drop the "secondary" connection upon successful handshake
+    neighbor.InitiatedProtocol.Events.HandshakeCompleted.Attach(events.NewClosure(func() {
+        if accountability.OWN_ID.StringIdentifier <= neighbor.Identity.StringIdentifier {
+            neighbor.acceptedProtocolMutex.Lock()
+            var acceptedProtocolConn *network.ManagedConnection
+            if neighbor.AcceptedProtocol != nil {
+                acceptedProtocolConn = neighbor.AcceptedProtocol.Conn
+            }
+            neighbor.acceptedProtocolMutex.Unlock()
 
-    peer.InitiatedProtocol.Conn.Events.Close.Attach(events.NewClosure(func() {
-        peer.initiatedProtocolMutex.Lock()
-        defer peer.initiatedProtocolMutex.Unlock()
+            if acceptedProtocolConn != nil {
+                _ = acceptedProtocolConn.Close()
+            }
+        }
 
-        peer.InitiatedProtocol = nil
+        neighbor.Events.ProtocolConnectionEstablished.Trigger(neighbor.InitiatedProtocol)
     }))
 
-    return peer.InitiatedProtocol, true, nil
+    return neighbor.InitiatedProtocol, true, nil
 }
 
-func (peer *Peer) Marshal() []byte {
+func (neighbor *Neighbor) Marshal() []byte {
     return nil
 }
 
-func (peer *Peer) Equals(other *Peer) bool {
-    return peer.Identity.StringIdentifier == peer.Identity.StringIdentifier &&
-        peer.Port == other.Port && peer.Address.String() == other.Address.String()
+func (neighbor *Neighbor) Equals(other *Neighbor) bool {
+    return neighbor.Identity.StringIdentifier == neighbor.Identity.StringIdentifier &&
+        neighbor.Port == other.Port && neighbor.Address.String() == other.Address.String()
 }
 
-func AddNeighbor(newNeighbor *Peer) {
+func AddNeighbor(newNeighbor *Neighbor) {
     neighborLock.Lock()
     defer neighborLock.Unlock()
 
@@ -185,7 +216,7 @@ func RemoveNeighbor(identifier string) {
     }
 }
 
-func GetNeighbor(identifier string) (*Peer, bool) {
+func GetNeighbor(identifier string) (*Neighbor, bool) {
     neighborLock.RLock()
     defer neighborLock.RUnlock()
 
@@ -194,11 +225,11 @@ func GetNeighbor(identifier string) (*Peer, bool) {
     return neighbor, exists
 }
 
-func GetNeighbors() map[string]*Peer {
+func GetNeighbors() map[string]*Neighbor {
     neighborLock.RLock()
     defer neighborLock.RUnlock()
 
-    result := make(map[string]*Peer)
+    result := make(map[string]*Neighbor)
     for id, neighbor := range neighbors {
         result[id] = neighbor
     }
@@ -207,10 +238,10 @@ func GetNeighbors() map[string]*Peer {
 }
 
 const (
-    CONNECTION_MAX_ATTEMPTS        = 5
-    CONNECTION_BASE_TIMEOUT        = 10 * time.Second
+    CONNECTION_MAX_ATTEMPTS = 5
+    CONNECTION_BASE_TIMEOUT = 10 * time.Second
 )
 
-var neighbors = make(map[string]*Peer)
+var neighbors = make(map[string]*Neighbor)
 
 var neighborLock sync.RWMutex
diff --git a/plugins/gossip/plugin.go b/plugins/gossip/plugin.go
index 73488f03..5262b9e4 100644
--- a/plugins/gossip/plugin.go
+++ b/plugins/gossip/plugin.go
@@ -11,6 +11,7 @@ var PLUGIN = node.NewPlugin("Gossip", configure, run)
 func configure(plugin *node.Plugin) {
     configureNeighbors(plugin)
     configureServer(plugin)
+    configureSendQueue(plugin)
     configureTransactionProcessor(plugin)
 
     Events.ReceiveTransaction.Attach(events.NewClosure(func(transaction *transaction.Transaction) {
@@ -21,5 +22,6 @@ func configure(plugin *node.Plugin) {
 func run(plugin *node.Plugin) {
     runNeighbors(plugin)
     runServer(plugin)
+    runSendQueue(plugin)
     runTransactionProcessor(plugin)
 }
diff --git a/plugins/gossip/protocol.go b/plugins/gossip/protocol.go
index e8b9d56c..f05e6c8c 100644
--- a/plugins/gossip/protocol.go
+++ b/plugins/gossip/protocol.go
@@ -20,13 +20,16 @@ var DEFAULT_PROTOCOL = protocolDefinition{
 // region protocol /////////////////////////////////////////////////////////////////////////////////////////////////////
 
 type protocol struct {
-    Conn           *network.ManagedConnection
-    Neighbor       *Peer
-    Version        byte
-    SendState      protocolState
-    ReceivingState protocolState
-    Events         protocolEvents
-    sendMutex      sync.Mutex
+    Conn                      *network.ManagedConnection
+    Neighbor                  *Neighbor
+    Version                   byte
+    sendHandshakeCompleted    bool
+    receiveHandshakeCompleted bool
+    SendState                 protocolState
+    ReceivingState            protocolState
+    Events                    protocolEvents
+    sendMutex                 sync.Mutex
+    handshakeMutex            sync.Mutex
 }
 
 func newProtocol(conn *network.ManagedConnection) *protocol {
@@ -37,8 +40,11 @@ func newProtocol(conn *network.ManagedConnection) *protocol {
             ReceiveIdentification:     events.NewEvent(identityCaller),
             ReceiveConnectionAccepted: events.NewEvent(events.CallbackCaller),
             ReceiveConnectionRejected: events.NewEvent(events.CallbackCaller),
+            HandshakeCompleted:        events.NewEvent(events.CallbackCaller),
             Error:                     events.NewEvent(errorCaller),
         },
+        sendHandshakeCompleted:    false,
+        receiveHandshakeCompleted: false,
     }
 
     protocol.SendState = &versionState{protocol: protocol}
@@ -50,15 +56,26 @@ func newProtocol(conn *network.ManagedConnection) *protocol {
 func (protocol *protocol) Init() {
     // setup event handlers
     onReceiveData := events.NewClosure(protocol.Receive)
+    onConnectionAccepted := events.NewClosure(func() {
+        protocol.handshakeMutex.Lock()
+        defer protocol.handshakeMutex.Unlock()
+
+        protocol.receiveHandshakeCompleted = true
+        if protocol.sendHandshakeCompleted {
+            protocol.Events.HandshakeCompleted.Trigger()
+        }
+    })
     var onClose *events.Closure
     onClose = events.NewClosure(func() {
         protocol.Conn.Events.ReceiveData.Detach(onReceiveData)
         protocol.Conn.Events.Close.Detach(onClose)
+        protocol.Events.ReceiveConnectionAccepted.Detach(onConnectionAccepted)
     })
 
     // region register event handlers
     protocol.Conn.Events.ReceiveData.Attach(onReceiveData)
     protocol.Conn.Events.Close.Attach(onClose)
+    protocol.Events.ReceiveConnectionAccepted.Attach(onConnectionAccepted)
 
     // send protocol version
     if err := protocol.Send(DEFAULT_PROTOCOL.version); err != nil {
diff --git a/plugins/gossip/protocol_v1.go b/plugins/gossip/protocol_v1.go
index bfd3f32d..1b841f6b 100644
--- a/plugins/gossip/protocol_v1.go
+++ b/plugins/gossip/protocol_v1.go
@@ -27,6 +27,14 @@ func protocolV1(protocol *protocol) errors.IdentifiableError {
             if err := protocol.Send(CONNECTION_ACCEPT); err != nil {
                 return
             }
+
+            protocol.handshakeMutex.Lock()
+            defer protocol.handshakeMutex.Unlock()
+
+            protocol.sendHandshakeCompleted = true
+            if protocol.receiveHandshakeCompleted {
+                protocol.Events.HandshakeCompleted.Trigger()
+            }
         }
     })
 
diff --git a/plugins/gossip/send_queue.go b/plugins/gossip/send_queue.go
new file mode 100644
index 00000000..23443a1d
--- /dev/null
+++ b/plugins/gossip/send_queue.go
@@ -0,0 +1,155 @@
+package gossip
+
+import (
+    "github.com/iotaledger/goshimmer/packages/daemon"
+    "github.com/iotaledger/goshimmer/packages/events"
+    "github.com/iotaledger/goshimmer/packages/node"
+    "github.com/iotaledger/goshimmer/packages/transaction"
+    "sync"
+)
+
+// region plugin module setup //////////////////////////////////////////////////////////////////////////////////////////
+
+func configureSendQueue(plugin *node.Plugin) {
+    for _, neighbor := range GetNeighbors() {
+        setupEventHandlers(neighbor)
+    }
+
+    Events.AddNeighbor.Attach(events.NewClosure(setupEventHandlers))
+
+    daemon.Events.Shutdown.Attach(events.NewClosure(func() {
+        plugin.LogInfo("Stopping Send Queue Dispatcher ...")
+    }))
+}
+
+func runSendQueue(plugin *node.Plugin) {
+    plugin.LogInfo("Starting Send Queue Dispatcher ...")
+
+    daemon.BackgroundWorker(func() {
+        plugin.LogSuccess("Starting Send Queue Dispatcher ... done")
+
+        for {
+            select {
+            case <-daemon.ShutdownSignal:
+                plugin.LogSuccess("Stopping Send Queue Dispatcher ... done")
+
+                return
+
+            case tx := <-sendQueue:
+                connectedNeighborsMutex.RLock()
+                for _, neighborQueue := range neighborQueues {
+                    select {
+                    case neighborQueue.queue <- tx:
+                        return
+
+                    default:
+                        return
+                    }
+                }
+                connectedNeighborsMutex.RUnlock()
+            }
+        }
+    })
+
+    connectedNeighborsMutex.Lock()
+    for _, neighborQueue := range neighborQueues {
+        startNeighborSendQueue(neighborQueue)
+    }
+    connectedNeighborsMutex.Unlock()
+}
+
+// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+// region public api ///////////////////////////////////////////////////////////////////////////////////////////////////
+
+func SendTransaction(transaction *transaction.Transaction) {
+    sendQueue <- transaction
+}
+
+func (neighbor *Neighbor) SendTransaction(transaction *transaction.Transaction) {
+    if queue, exists := neighborQueues[neighbor.Identity.StringIdentifier]; exists {
+        select {
+        case queue.queue <- transaction:
+            return
+
+        default:
+            return
+        }
+    }
+}
+
+// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+// region utility methods //////////////////////////////////////////////////////////////////////////////////////////////
+
+func setupEventHandlers(neighbor *Neighbor) {
+    neighbor.Events.ProtocolConnectionEstablished.Attach(events.NewClosure(func(protocol *protocol) {
+        queue := &neighborQueue{
+            protocol:       protocol,
+            queue:          make(chan *transaction.Transaction, SEND_QUEUE_SIZE),
+            disconnectChan: make(chan int, 1),
+        }
+
+        connectedNeighborsMutex.Lock()
+        neighborQueues[neighbor.Identity.StringIdentifier] = queue
+        connectedNeighborsMutex.Unlock()
+
+        protocol.Conn.Events.Close.Attach(events.NewClosure(func() {
+            close(queue.disconnectChan)
+
+            connectedNeighborsMutex.Lock()
+            delete(neighborQueues, neighbor.Identity.StringIdentifier)
+            connectedNeighborsMutex.Unlock()
+        }))
+
+        if(daemon.IsRunning()) {
+            startNeighborSendQueue(queue)
+        }
+    }))
+}
+
+func startNeighborSendQueue(neighborQueue *neighborQueue) {
+    daemon.BackgroundWorker(func() {
+        for {
+            select {
+            case <-daemon.ShutdownSignal:
+                return
+
+            case <-neighborQueue.disconnectChan:
+                return
+
+            case tx := <-neighborQueue.queue:
+                switch neighborQueue.protocol.Version {
+                case VERSION_1:
+                    sendTransactionV1(neighborQueue.protocol, tx)
+                }
+            }
+        }
+    })
+}
+
+// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+// region types and interfaces /////////////////////////////////////////////////////////////////////////////////////////
+
+type neighborQueue struct {
+    protocol       *protocol
+    queue          chan *transaction.Transaction
+    disconnectChan chan int
+}
+
+// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+// region constants and variables //////////////////////////////////////////////////////////////////////////////////////
+
+var neighborQueues = make(map[string]*neighborQueue)
+
+var connectedNeighborsMutex sync.RWMutex
+
+var sendQueue = make(chan *transaction.Transaction, SEND_QUEUE_SIZE)
+
+const (
+    SEND_QUEUE_SIZE = 500
+)
+
+// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/plugins/gossip/server.go b/plugins/gossip/server.go
index 1d9f08e1..264fd0b0 100644
--- a/plugins/gossip/server.go
+++ b/plugins/gossip/server.go
@@ -1,6 +1,7 @@
 package gossip
 
 import (
+    "github.com/iotaledger/goshimmer/packages/accountability"
     "github.com/iotaledger/goshimmer/packages/daemon"
     "github.com/iotaledger/goshimmer/packages/errors"
     "github.com/iotaledger/goshimmer/packages/events"
@@ -22,7 +23,7 @@ func configureServer(plugin *node.Plugin) {
             plugin.LogFailure(err.Error())
         }))
 
-        // store connection in neighbor if its a neighbor calling
+        // store protocol in neighbor if its a neighbor calling
         protocol.Events.ReceiveIdentification.Attach(events.NewClosure(func(identity *identity.Identity) {
             if protocol.Neighbor != nil {
                 protocol.Neighbor.acceptedProtocolMutex.Lock()
@@ -40,6 +41,24 @@ func configureServer(plugin *node.Plugin) {
             }
         }))
 
+        // drop the "secondary" connection upon successful handshake
+        protocol.Events.HandshakeCompleted.Attach(events.NewClosure(func() {
+            if protocol.Neighbor.Identity.StringIdentifier <= accountability.OWN_ID.StringIdentifier {
+                protocol.Neighbor.initiatedProtocolMutex.Lock()
+                var initiatedProtocolConn *network.ManagedConnection
+                if protocol.Neighbor.InitiatedProtocol != nil {
+                    initiatedProtocolConn = protocol.Neighbor.InitiatedProtocol.Conn
+                }
+                protocol.Neighbor.initiatedProtocolMutex.Unlock()
+
+                if initiatedProtocolConn != nil {
+                    _ = initiatedProtocolConn.Close()
+                }
+            }
+
+            protocol.Neighbor.Events.ProtocolConnectionEstablished.Trigger(protocol)
+        }))
+
         go protocol.Init()
     }))
 
diff --git a/plugins/gossip/transaction_processor.go b/plugins/gossip/transaction_processor.go
index aae8362a..2dae5050 100644
--- a/plugins/gossip/transaction_processor.go
+++ b/plugins/gossip/transaction_processor.go
@@ -1,8 +1,6 @@
 package gossip
 
 import (
-    "github.com/iotaledger/goshimmer/packages/daemon"
-    "github.com/iotaledger/goshimmer/packages/events"
     "github.com/iotaledger/goshimmer/packages/filter"
     "github.com/iotaledger/goshimmer/packages/node"
     "github.com/iotaledger/goshimmer/packages/transaction"
@@ -10,55 +8,20 @@ import (
 
 var transactionFilter = filter.NewByteArrayFilter(TRANSACTION_FILTER_SIZE)
 
-var sendQueue = make(chan *transaction.Transaction, TRANSACTION_SEND_QUEUE_SIZE)
-
-func SendTransaction(transaction *transaction.Transaction) {
-    sendQueue <- transaction
-}
-
-func SendTransactionToNeighbor(neighbor *Peer, transaction *transaction.Transaction) {
-    switch neighbor.AcceptedProtocol.Version {
-    case VERSION_1:
-        sendTransactionV1(neighbor.AcceptedProtocol, transaction)
-    }
-}
-
 func processIncomingTransactionData(transactionData []byte) {
     if transactionFilter.Add(transactionData) {
         Events.ReceiveTransaction.Trigger(transaction.FromBytes(transactionData))
     }
 }
 
+
+
 func configureTransactionProcessor(plugin *node.Plugin) {
-    daemon.Events.Shutdown.Attach(events.NewClosure(func() {
-        plugin.LogInfo("Stopping Transaction Processor ...")
-    }))
 }
 
 func runTransactionProcessor(plugin *node.Plugin) {
-    plugin.LogInfo("Starting Transaction Processor ...")
-
-    daemon.BackgroundWorker(func() {
-        plugin.LogSuccess("Starting Transaction Processor ... done")
-
-        for {
-            select {
-            case <- daemon.ShutdownSignal:
-                plugin.LogSuccess("Stopping Transaction Processor ... done")
-
-                return
-
-            case tx := <-sendQueue:
-                for _, neighbor := range GetNeighbors() {
-                    SendTransactionToNeighbor(neighbor, tx)
-                }
-            }
-        }
-    })
 }
 
 const (
     TRANSACTION_FILTER_SIZE = 5000
-
-    TRANSACTION_SEND_QUEUE_SIZE = 500
 )
-- 
GitLab