From 19ad2e0159f9122da7adc71446628bf0b23a5c69 Mon Sep 17 00:00:00 2001
From: capossele <angelocapossele@gmail.com>
Date: Fri, 17 Jul 2020 10:36:13 +0100
Subject: [PATCH] =?UTF-8?q?=F0=9F=94=8D=20Add=20workerpools=20load=20on=20?=
 =?UTF-8?q?Prometheus?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 dapps/valuetransfers/dapp.go                  |  5 +-
 .../packages/tangle/signature_filter.go       |  5 ++
 .../message_signature_filter.go               |  5 ++
 .../builtinfilters/pow_filter.go              |  5 ++
 .../recently_seen_bytes_filter.go             |  5 ++
 .../messageparser/message_parser.go           | 11 ++-
 packages/binary/messagelayer/tangle/tangle.go | 13 +++-
 packages/gossip/manager.go                    |  5 ++
 plugins/pow/plugin.go                         |  3 +-
 plugins/prometheus/parameters.go              |  3 +
 plugins/prometheus/workerpool.go              | 73 +++++++++++++++++++
 11 files changed, 127 insertions(+), 6 deletions(-)
 create mode 100644 plugins/prometheus/workerpool.go

diff --git a/dapps/valuetransfers/dapp.go b/dapps/valuetransfers/dapp.go
index 60adff59..021e8806 100644
--- a/dapps/valuetransfers/dapp.go
+++ b/dapps/valuetransfers/dapp.go
@@ -72,6 +72,8 @@ var (
 
 	valueObjectFactory     *tangle.ValueObjectFactory
 	valueObjectFactoryOnce sync.Once
+
+	SignatureFilter *tangle.SignatureFilter
 )
 
 // App gets the plugin instance.
@@ -169,7 +171,8 @@ func configure(_ *node.Plugin) {
 	}))
 
 	// register SignatureFilter in Parser
-	messagelayer.MessageParser().AddMessageFilter(tangle.NewSignatureFilter())
+	SignatureFilter = tangle.NewSignatureFilter()
+	messagelayer.MessageParser().AddMessageFilter(SignatureFilter)
 
 	// subscribe to message-layer
 	messagelayer.Tangle().Events.MessageSolid.Attach(events.NewClosure(onReceiveMessageFromMessageLayer))
diff --git a/dapps/valuetransfers/packages/tangle/signature_filter.go b/dapps/valuetransfers/packages/tangle/signature_filter.go
index 3720dab3..8e02c332 100644
--- a/dapps/valuetransfers/packages/tangle/signature_filter.go
+++ b/dapps/valuetransfers/packages/tangle/signature_filter.go
@@ -94,5 +94,10 @@ func (filter *SignatureFilter) getRejectCallback() func(message *message.Message
 	return filter.onRejectCallback
 }
 
+// WorkerPoolStatus returns the name and the load of the workerpool.
+func (filter *SignatureFilter) WorkerPoolStatus() (name string, load int) {
+	return "SignatureFilter", filter.workerPool.RunningWorkers()
+}
+
 // interface contract (allow the compiler to check if the implementation has all of the required methods).
 var _ messageparser.MessageFilter = &SignatureFilter{}
diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go
index a7195d61..25220254 100644
--- a/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go
+++ b/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go
@@ -67,3 +67,8 @@ func (filter *MessageSignatureFilter) getRejectCallback() (result func(msg *mess
 	filter.onRejectCallbackMutex.RUnlock()
 	return
 }
+
+// WorkerPoolStatus returns the name and the load of the workerpool.
+func (filter *MessageSignatureFilter) WorkerPoolStatus() (name string, load int) {
+	return "MessageSignatureFilter", filter.workerPool.RunningWorkers()
+}
diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go
index 38f708cb..8b53d340 100644
--- a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go
+++ b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go
@@ -98,6 +98,11 @@ func (f *PowFilter) validate(msgBytes []byte) error {
 	return nil
 }
 
+// WorkerPoolStatus returns the name and the load of the workerpool.
+func (f *PowFilter) WorkerPoolStatus() (name string, load int) {
+	return "PowFilter", f.workerPool.RunningWorkers()
+}
+
 // powData returns the bytes over which PoW should be computed.
 func powData(msgBytes []byte) ([]byte, error) {
 	contentLength := len(msgBytes) - ed25519.SignatureSize
diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go
index 45b3bfee..db1d9686 100644
--- a/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go
+++ b/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go
@@ -69,3 +69,8 @@ func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes []
 func (filter *RecentlySeenBytesFilter) Shutdown() {
 	filter.workerPool.ShutdownGracefully()
 }
+
+// WorkerPoolStatus returns the name and the load of the workerpool.
+func (filter *RecentlySeenBytesFilter) WorkerPoolStatus() (name string, load int) {
+	return "RecentlySeenBytesFilter", filter.workerPool.RunningWorkers()
+}
diff --git a/packages/binary/messagelayer/messageparser/message_parser.go b/packages/binary/messagelayer/messageparser/message_parser.go
index 8e491be4..fe4fd08a 100644
--- a/packages/binary/messagelayer/messageparser/message_parser.go
+++ b/packages/binary/messagelayer/messageparser/message_parser.go
@@ -21,6 +21,11 @@ type MessageParser struct {
 	messageFiltersModified typeutils.AtomicBool
 	bytesFiltersMutex      sync.Mutex
 	messageFiltersMutex    sync.Mutex
+
+	// only used for workerpool debugging
+	MessageSignatureFilter  *builtinfilters.MessageSignatureFilter
+	PowFilter               *builtinfilters.PowFilter
+	RecentlySeenBytesFilter *builtinfilters.RecentlySeenBytesFilter
 }
 
 // New creates a new message parser.
@@ -42,8 +47,10 @@ func New() (result *MessageParser) {
 	}
 
 	// add builtin filters
-	result.AddBytesFilter(builtinfilters.NewRecentlySeenBytesFilter())
-	result.AddMessageFilter(builtinfilters.NewMessageSignatureFilter())
+	result.RecentlySeenBytesFilter = builtinfilters.NewRecentlySeenBytesFilter()
+	result.AddBytesFilter(result.RecentlySeenBytesFilter)
+	result.MessageSignatureFilter = builtinfilters.NewMessageSignatureFilter()
+	result.AddMessageFilter(result.MessageSignatureFilter)
 	return
 }
 
diff --git a/packages/binary/messagelayer/tangle/tangle.go b/packages/binary/messagelayer/tangle/tangle.go
index d750ecbd..8947996f 100644
--- a/packages/binary/messagelayer/tangle/tangle.go
+++ b/packages/binary/messagelayer/tangle/tangle.go
@@ -2,7 +2,6 @@ package tangle
 
 import (
 	"container/list"
-	"runtime"
 	"time"
 
 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
@@ -57,7 +56,7 @@ func New(store kvstore.KVStore) (result *Tangle) {
 		Events: *newEvents(),
 	}
 
-	result.solidifierWorkerPool.Tune(runtime.GOMAXPROCS(0))
+	result.solidifierWorkerPool.Tune(1024)
 	return
 }
 
@@ -294,3 +293,13 @@ func (tangle *Tangle) deleteFutureCone(messageId message.Id) {
 		})
 	}
 }
+
+// SolidifierWorkerPoolStatus returns the name and the load of the workerpool.
+func (tangle *Tangle) SolidifierWorkerPoolStatus() (name string, load int) {
+	return "Solidifier", tangle.solidifierWorkerPool.RunningWorkers()
+}
+
+// StoreMessageWorkerPoolStatus returns the name and the load of the workerpool.
+func (tangle *Tangle) StoreMessageWorkerPoolStatus() (name string, load int) {
+	return "StoreMessage", tangle.storeMessageWorkerPool.RunningWorkers()
+}
diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go
index 8ee8f0eb..fbfb5f49 100644
--- a/packages/gossip/manager.go
+++ b/packages/gossip/manager.go
@@ -280,3 +280,8 @@ func marshal(packet pb.Packet) []byte {
 	}
 	return append([]byte{byte(packetType)}, data...)
 }
+
+// WorkerPoolStatus returns the name and the load of the workerpool.
+func (m *Manager) WorkerPoolStatus() (name string, load int) {
+	return "InboxWorkerPool", m.inboxWorkerPool.RunningWorkers()
+}
diff --git a/plugins/pow/plugin.go b/plugins/pow/plugin.go
index d8056d6d..74da7d9d 100644
--- a/plugins/pow/plugin.go
+++ b/plugins/pow/plugin.go
@@ -30,6 +30,7 @@ func run(*node.Plugin) {
 
 	log.Infof("%s started: difficult=%d", PluginName, difficulty)
 
-	messagelayer.MessageParser().AddBytesFilter(builtinfilters.NewPowFilter(worker, difficulty))
+	messagelayer.MessageParser().PowFilter = builtinfilters.NewPowFilter(worker, difficulty)
+	messagelayer.MessageParser().AddBytesFilter(messagelayer.MessageParser().PowFilter)
 	messagelayer.MessageFactory().SetWorker(messagefactory.WorkerFunc(DoPOW))
 }
diff --git a/plugins/prometheus/parameters.go b/plugins/prometheus/parameters.go
index dbc77cb0..5960760c 100644
--- a/plugins/prometheus/parameters.go
+++ b/plugins/prometheus/parameters.go
@@ -11,6 +11,8 @@ const (
 	CfgPrometheusProcessMetrics = "prometheus.processMetrics"
 	// CfgPrometheusPromhttpMetrics defines the config flag to enable/disable promhttp metrics.
 	CfgPrometheusPromhttpMetrics = "prometheus.promhttpMetrics"
+	// CfgPrometheusWorkerpoolMetrics defines the config flag to enable/disable workerpool metrics.
+	CfgPrometheusWorkerpoolMetrics = "prometheus.workerpoolMetrics"
 	// CfgPrometheusBindAddress defines the config flag of the bind address on which the Prometheus exporter listens on.
 	CfgPrometheusBindAddress = "prometheus.bindAddress"
 )
@@ -20,4 +22,5 @@ func init() {
 	flag.Bool(CfgPrometheusGoMetrics, false, "include go metrics")
 	flag.Bool(CfgPrometheusProcessMetrics, false, "include process metrics")
 	flag.Bool(CfgPrometheusPromhttpMetrics, false, "include promhttp metrics")
+	flag.Bool(CfgPrometheusWorkerpoolMetrics, false, "include workerpool metrics")
 }
diff --git a/plugins/prometheus/workerpool.go b/plugins/prometheus/workerpool.go
new file mode 100644
index 00000000..30dbfcf3
--- /dev/null
+++ b/plugins/prometheus/workerpool.go
@@ -0,0 +1,73 @@
+package prometheus
+
+import (
+	"github.com/iotaledger/goshimmer/dapps/valuetransfers"
+	"github.com/iotaledger/goshimmer/plugins/gossip"
+	"github.com/iotaledger/goshimmer/plugins/messagelayer"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+	workerpools *prometheus.GaugeVec
+)
+
+func registerWorkerpoolMetrics() {
+	workerpools = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "workerpools_load",
+			Help: "Info about workerpools load",
+		},
+		[]string{
+			"name",
+		},
+	)
+
+	registry.MustRegister(workerpools)
+
+	addCollect(collectWorkerpoolMetrics)
+}
+
+func collectWorkerpoolMetrics() {
+	name, load := gossip.Manager().WorkerPoolStatus()
+	workerpools.WithLabelValues(
+		name,
+	).Set(float64(load))
+
+	name, load = messagelayer.Tangle().SolidifierWorkerPoolStatus()
+	workerpools.WithLabelValues(
+		name,
+	).Set(float64(load))
+
+	name, load = messagelayer.Tangle().StoreMessageWorkerPoolStatus()
+	workerpools.WithLabelValues(
+		name,
+	).Set(float64(load))
+
+	if messagelayer.MessageParser().MessageSignatureFilter != nil {
+		name, load = messagelayer.MessageParser().MessageSignatureFilter.WorkerPoolStatus()
+		workerpools.WithLabelValues(
+			name,
+		).Set(float64(load))
+	}
+
+	if valuetransfers.SignatureFilter != nil {
+		name, load = valuetransfers.SignatureFilter.WorkerPoolStatus()
+		workerpools.WithLabelValues(
+			name,
+		).Set(float64(load))
+	}
+
+	if messagelayer.MessageParser().RecentlySeenBytesFilter != nil {
+		name, load = messagelayer.MessageParser().RecentlySeenBytesFilter.WorkerPoolStatus()
+		workerpools.WithLabelValues(
+			name,
+		).Set(float64(load))
+	}
+
+	if messagelayer.MessageParser().PowFilter != nil {
+		name, load = messagelayer.MessageParser().PowFilter.WorkerPoolStatus()
+		workerpools.WithLabelValues(
+			name,
+		).Set(float64(load))
+	}
+}
-- 
GitLab