From 0df591f2ef47032756477d7eaa4abe3ef00b83ef Mon Sep 17 00:00:00 2001
From: Jonas Theis <mail@jonastheis.de>
Date: Thu, 18 Jun 2020 16:44:08 +0200
Subject: [PATCH] Decouple communication layer message processing (#497)

* Introduce worker pool to decouple receiving of messages and processing them

* Adjust shutdown/close order

* copy data before submitting it to the packet worker pool

Co-authored-by: Luca Moser <moser.luca@gmail.com>
---
 go.mod                                |  2 +-
 go.sum                                |  4 ++--
 packages/gossip/manager.go            | 16 +++++++++++++---
 tools/integration-tests/tester/go.mod |  2 +-
 tools/integration-tests/tester/go.sum |  4 ++--
 5 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/go.mod b/go.mod
index 4b05e611..c2392c60 100644
--- a/go.mod
+++ b/go.mod
@@ -12,7 +12,7 @@ require (
 	github.com/golang/protobuf v1.3.5
 	github.com/google/go-cmp v0.4.0
 	github.com/gorilla/websocket v1.4.1
-	github.com/iotaledger/hive.go v0.0.0-20200610104211-d603429af242
+	github.com/iotaledger/hive.go v0.0.0-20200617164933-c48b4401b814
 	github.com/iotaledger/iota.go v1.0.0-beta.14
 	github.com/labstack/echo v3.3.10+incompatible
 	github.com/labstack/gommon v0.3.0
diff --git a/go.sum b/go.sum
index 9c42f993..57583113 100644
--- a/go.sum
+++ b/go.sum
@@ -139,8 +139,8 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T
 github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
-github.com/iotaledger/hive.go v0.0.0-20200610104211-d603429af242 h1:uHMFmfrP6O6lp1lCHT6lpFwHFWYk77V0nUlGbhneQHI=
-github.com/iotaledger/hive.go v0.0.0-20200610104211-d603429af242/go.mod h1:zwZhaE4ZeglpTrbmbwdnVPMI5XdRu2RmByi3Qn0ztmU=
+github.com/iotaledger/hive.go v0.0.0-20200617164933-c48b4401b814 h1:9Cg6q13ngg3/UxBPQjdDREYr+792HO8SN8KzI1t7xFY=
+github.com/iotaledger/hive.go v0.0.0-20200617164933-c48b4401b814/go.mod h1:zwZhaE4ZeglpTrbmbwdnVPMI5XdRu2RmByi3Qn0ztmU=
 github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8=
 github.com/iotaledger/iota.go v1.0.0-beta.14 h1:Oeb28MfBuJEeXcGrLhTCJFtbsnc8y1u7xidsAmiOD5A=
 github.com/iotaledger/iota.go v1.0.0-beta.14/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8=
diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go
index 128a323d..7e9f693c 100644
--- a/packages/gossip/manager.go
+++ b/packages/gossip/manager.go
@@ -9,6 +9,7 @@ import (
 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
 	pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
 	"github.com/iotaledger/goshimmer/packages/gossip/server"
+	"github.com/iotaledger/hive.go/async"
 	"github.com/iotaledger/hive.go/autopeering/peer"
 	"github.com/iotaledger/hive.go/events"
 	"github.com/iotaledger/hive.go/identity"
@@ -34,6 +35,9 @@ type Manager struct {
 	mu        sync.Mutex
 	srv       *server.TCP
 	neighbors map[identity.ID]*Neighbor
+
+	// inboxWorkerPool defines a worker pool where all incoming messages are processed.
+	inboxWorkerPool async.WorkerPool
 }
 
 // NewManager creates a new Manager.
@@ -65,6 +69,8 @@ func (m *Manager) Start(srv *server.TCP) {
 func (m *Manager) Close() {
 	m.stop()
 	m.wg.Wait()
+
+	m.inboxWorkerPool.ShutdownGracefully()
 }
 
 // Events returns the events related to the gossip protocol.
@@ -206,9 +212,13 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n
 		m.events.NeighborRemoved.Trigger(peer)
 	}))
 	n.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) {
-		if err := m.handlePacket(data, peer); err != nil {
-			m.log.Debugw("error handling packet", "err", err)
-		}
+		dataCopy := make([]byte, len(data))
+		copy(dataCopy, data)
+		m.inboxWorkerPool.Submit(func() {
+			if err := m.handlePacket(dataCopy, peer); err != nil {
+				m.log.Debugw("error handling packet", "err", err)
+			}
+		})
 	}))
 
 	m.neighbors[peer.ID()] = n
diff --git a/tools/integration-tests/tester/go.mod b/tools/integration-tests/tester/go.mod
index 81fe8f1d..3d3c6637 100644
--- a/tools/integration-tests/tester/go.mod
+++ b/tools/integration-tests/tester/go.mod
@@ -10,7 +10,7 @@ require (
 	github.com/docker/go-units v0.4.0 // indirect
 	github.com/drand/drand v0.8.1
 	github.com/iotaledger/goshimmer v0.1.3
-	github.com/iotaledger/hive.go v0.0.0-20200610104211-d603429af242
+	github.com/iotaledger/hive.go v0.0.0-20200617164933-c48b4401b814
 	github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
 	github.com/stretchr/testify v1.5.1
 )
diff --git a/tools/integration-tests/tester/go.sum b/tools/integration-tests/tester/go.sum
index dcd5ef8f..949a92ac 100644
--- a/tools/integration-tests/tester/go.sum
+++ b/tools/integration-tests/tester/go.sum
@@ -138,8 +138,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
 github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
-github.com/iotaledger/hive.go v0.0.0-20200610104211-d603429af242 h1:uHMFmfrP6O6lp1lCHT6lpFwHFWYk77V0nUlGbhneQHI=
-github.com/iotaledger/hive.go v0.0.0-20200610104211-d603429af242/go.mod h1:zwZhaE4ZeglpTrbmbwdnVPMI5XdRu2RmByi3Qn0ztmU=
+github.com/iotaledger/hive.go v0.0.0-20200617164933-c48b4401b814 h1:9Cg6q13ngg3/UxBPQjdDREYr+792HO8SN8KzI1t7xFY=
+github.com/iotaledger/hive.go v0.0.0-20200617164933-c48b4401b814/go.mod h1:zwZhaE4ZeglpTrbmbwdnVPMI5XdRu2RmByi3Qn0ztmU=
 github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8=
 github.com/iotaledger/iota.go v1.0.0-beta.14/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8=
 github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
-- 
GitLab