diff --git a/dapps/valuetransfers/dapp.go b/dapps/valuetransfers/dapp.go index 60adff59bb18cc52bec8690144c53a0dd2654bb8..021e88065982b08524bbf96d12a4f11fb46f90a3 100644 --- a/dapps/valuetransfers/dapp.go +++ b/dapps/valuetransfers/dapp.go @@ -72,6 +72,8 @@ var ( valueObjectFactory *tangle.ValueObjectFactory valueObjectFactoryOnce sync.Once + + SignatureFilter *tangle.SignatureFilter ) // App gets the plugin instance. @@ -169,7 +171,8 @@ func configure(_ *node.Plugin) { })) // register SignatureFilter in Parser - messagelayer.MessageParser().AddMessageFilter(tangle.NewSignatureFilter()) + SignatureFilter = tangle.NewSignatureFilter() + messagelayer.MessageParser().AddMessageFilter(SignatureFilter) // subscribe to message-layer messagelayer.Tangle().Events.MessageSolid.Attach(events.NewClosure(onReceiveMessageFromMessageLayer)) diff --git a/dapps/valuetransfers/packages/tangle/signature_filter.go b/dapps/valuetransfers/packages/tangle/signature_filter.go index 3720dab3a1923799579df560a72378d9e609541a..8e02c33247f05c2a86cdfc0c669f8278b0561f9c 100644 --- a/dapps/valuetransfers/packages/tangle/signature_filter.go +++ b/dapps/valuetransfers/packages/tangle/signature_filter.go @@ -94,5 +94,10 @@ func (filter *SignatureFilter) getRejectCallback() func(message *message.Message return filter.onRejectCallback } +// WorkerPoolStatus returns the name and the load of the workerpool. +func (filter *SignatureFilter) WorkerPoolStatus() (name string, load int) { + return "SignatureFilter", filter.workerPool.RunningWorkers() +} + // interface contract (allow the compiler to check if the implementation has all of the required methods). var _ messageparser.MessageFilter = &SignatureFilter{} diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go index a7195d61e1d026f49be76cc1833cb4ca930c5028..25220254fc04240e314a3bba6d11599d3f681185 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go @@ -67,3 +67,8 @@ func (filter *MessageSignatureFilter) getRejectCallback() (result func(msg *mess filter.onRejectCallbackMutex.RUnlock() return } + +// WorkerPoolStatus returns the name and the load of the workerpool. +func (filter *MessageSignatureFilter) WorkerPoolStatus() (name string, load int) { + return "MessageSignatureFilter", filter.workerPool.RunningWorkers() +} diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go index 38f708cb4cbe22ffc51d8c0d63774025071f3617..8b53d3403066716e6198537cfc8994e7f935141f 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go @@ -98,6 +98,11 @@ func (f *PowFilter) validate(msgBytes []byte) error { return nil } +// WorkerPoolStatus returns the name and the load of the workerpool. +func (f *PowFilter) WorkerPoolStatus() (name string, load int) { + return "PowFilter", f.workerPool.RunningWorkers() +} + // powData returns the bytes over which PoW should be computed. func powData(msgBytes []byte) ([]byte, error) { contentLength := len(msgBytes) - ed25519.SignatureSize diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go index 45b3bfee8ba4b3e3bd6d428c896fecd7241646df..db1d96866c779e81749831014d9b95ffc87359bf 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go @@ -69,3 +69,8 @@ func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes [] func (filter *RecentlySeenBytesFilter) Shutdown() { filter.workerPool.ShutdownGracefully() } + +// WorkerPoolStatus returns the name and the load of the workerpool. +func (filter *RecentlySeenBytesFilter) WorkerPoolStatus() (name string, load int) { + return "RecentlySeenBytesFilter", filter.workerPool.RunningWorkers() +} diff --git a/packages/binary/messagelayer/messageparser/message_parser.go b/packages/binary/messagelayer/messageparser/message_parser.go index 8e491be4949927c1eb26e1bb05b1a6632b740108..fe4fd08a22aa26e6e1513e876f90af26e146078e 100644 --- a/packages/binary/messagelayer/messageparser/message_parser.go +++ b/packages/binary/messagelayer/messageparser/message_parser.go @@ -21,6 +21,11 @@ type MessageParser struct { messageFiltersModified typeutils.AtomicBool bytesFiltersMutex sync.Mutex messageFiltersMutex sync.Mutex + + // only used for workerpool debugging + MessageSignatureFilter *builtinfilters.MessageSignatureFilter + PowFilter *builtinfilters.PowFilter + RecentlySeenBytesFilter *builtinfilters.RecentlySeenBytesFilter } // New creates a new message parser. @@ -42,8 +47,10 @@ func New() (result *MessageParser) { } // add builtin filters - result.AddBytesFilter(builtinfilters.NewRecentlySeenBytesFilter()) - result.AddMessageFilter(builtinfilters.NewMessageSignatureFilter()) + result.RecentlySeenBytesFilter = builtinfilters.NewRecentlySeenBytesFilter() + result.AddBytesFilter(result.RecentlySeenBytesFilter) + result.MessageSignatureFilter = builtinfilters.NewMessageSignatureFilter() + result.AddMessageFilter(result.MessageSignatureFilter) return } diff --git a/packages/binary/messagelayer/tangle/tangle.go b/packages/binary/messagelayer/tangle/tangle.go index d750ecbd59f1fd63af673b32456681aaef36e2b9..8947996fb5a6694430bfc58fb7242553ca0a4cc9 100644 --- a/packages/binary/messagelayer/tangle/tangle.go +++ b/packages/binary/messagelayer/tangle/tangle.go @@ -2,7 +2,6 @@ package tangle import ( "container/list" - "runtime" "time" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" @@ -57,7 +56,7 @@ func New(store kvstore.KVStore) (result *Tangle) { Events: *newEvents(), } - result.solidifierWorkerPool.Tune(runtime.GOMAXPROCS(0)) + result.solidifierWorkerPool.Tune(1024) return } @@ -294,3 +293,13 @@ func (tangle *Tangle) deleteFutureCone(messageId message.Id) { }) } } + +// SolidifierWorkerPoolStatus returns the name and the load of the workerpool. +func (tangle *Tangle) SolidifierWorkerPoolStatus() (name string, load int) { + return "Solidifier", tangle.solidifierWorkerPool.RunningWorkers() +} + +// StoreMessageWorkerPoolStatus returns the name and the load of the workerpool. +func (tangle *Tangle) StoreMessageWorkerPoolStatus() (name string, load int) { + return "StoreMessage", tangle.storeMessageWorkerPool.RunningWorkers() +} diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index 8ee8f0ebadbda4aa1938951fe14fc9dbcb430ed8..fbfb5f49ca735f8aaf3314027d7a72872bc8dd0b 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -280,3 +280,8 @@ func marshal(packet pb.Packet) []byte { } return append([]byte{byte(packetType)}, data...) } + +// WorkerPoolStatus returns the name and the load of the workerpool. +func (m *Manager) WorkerPoolStatus() (name string, load int) { + return "InboxWorkerPool", m.inboxWorkerPool.RunningWorkers() +} diff --git a/plugins/pow/plugin.go b/plugins/pow/plugin.go index d8056d6dc213d5ae053b483a43a19886fff8eb35..74da7d9d3509308192410f5aa73fd12524d750cd 100644 --- a/plugins/pow/plugin.go +++ b/plugins/pow/plugin.go @@ -30,6 +30,7 @@ func run(*node.Plugin) { log.Infof("%s started: difficult=%d", PluginName, difficulty) - messagelayer.MessageParser().AddBytesFilter(builtinfilters.NewPowFilter(worker, difficulty)) + messagelayer.MessageParser().PowFilter = builtinfilters.NewPowFilter(worker, difficulty) + messagelayer.MessageParser().AddBytesFilter(messagelayer.MessageParser().PowFilter) messagelayer.MessageFactory().SetWorker(messagefactory.WorkerFunc(DoPOW)) } diff --git a/plugins/prometheus/parameters.go b/plugins/prometheus/parameters.go index dbc77cb0d17fd0c1e1c1544425225e4dd72d1f99..5960760ca3ac7d9cab0ffc8e189d0e6749e9ef1b 100644 --- a/plugins/prometheus/parameters.go +++ b/plugins/prometheus/parameters.go @@ -11,6 +11,8 @@ const ( CfgPrometheusProcessMetrics = "prometheus.processMetrics" // CfgPrometheusPromhttpMetrics defines the config flag to enable/disable promhttp metrics. CfgPrometheusPromhttpMetrics = "prometheus.promhttpMetrics" + // CfgPrometheusWorkerpoolMetrics defines the config flag to enable/disable workerpool metrics. + CfgPrometheusWorkerpoolMetrics = "prometheus.workerpoolMetrics" // CfgPrometheusBindAddress defines the config flag of the bind address on which the Prometheus exporter listens on. CfgPrometheusBindAddress = "prometheus.bindAddress" ) @@ -20,4 +22,5 @@ func init() { flag.Bool(CfgPrometheusGoMetrics, false, "include go metrics") flag.Bool(CfgPrometheusProcessMetrics, false, "include process metrics") flag.Bool(CfgPrometheusPromhttpMetrics, false, "include promhttp metrics") + flag.Bool(CfgPrometheusWorkerpoolMetrics, false, "include workerpool metrics") } diff --git a/plugins/prometheus/workerpool.go b/plugins/prometheus/workerpool.go new file mode 100644 index 0000000000000000000000000000000000000000..30dbfcf3f2c7cc6742969a534460caff2fad2520 --- /dev/null +++ b/plugins/prometheus/workerpool.go @@ -0,0 +1,73 @@ +package prometheus + +import ( + "github.com/iotaledger/goshimmer/dapps/valuetransfers" + "github.com/iotaledger/goshimmer/plugins/gossip" + "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + workerpools *prometheus.GaugeVec +) + +func registerWorkerpoolMetrics() { + workerpools = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "workerpools_load", + Help: "Info about workerpools load", + }, + []string{ + "name", + }, + ) + + registry.MustRegister(workerpools) + + addCollect(collectWorkerpoolMetrics) +} + +func collectWorkerpoolMetrics() { + name, load := gossip.Manager().WorkerPoolStatus() + workerpools.WithLabelValues( + name, + ).Set(float64(load)) + + name, load = messagelayer.Tangle().SolidifierWorkerPoolStatus() + workerpools.WithLabelValues( + name, + ).Set(float64(load)) + + name, load = messagelayer.Tangle().StoreMessageWorkerPoolStatus() + workerpools.WithLabelValues( + name, + ).Set(float64(load)) + + if messagelayer.MessageParser().MessageSignatureFilter != nil { + name, load = messagelayer.MessageParser().MessageSignatureFilter.WorkerPoolStatus() + workerpools.WithLabelValues( + name, + ).Set(float64(load)) + } + + if valuetransfers.SignatureFilter != nil { + name, load = valuetransfers.SignatureFilter.WorkerPoolStatus() + workerpools.WithLabelValues( + name, + ).Set(float64(load)) + } + + if messagelayer.MessageParser().RecentlySeenBytesFilter != nil { + name, load = messagelayer.MessageParser().RecentlySeenBytesFilter.WorkerPoolStatus() + workerpools.WithLabelValues( + name, + ).Set(float64(load)) + } + + if messagelayer.MessageParser().PowFilter != nil { + name, load = messagelayer.MessageParser().PowFilter.WorkerPoolStatus() + workerpools.WithLabelValues( + name, + ).Set(float64(load)) + } +}