Skip to content
Snippets Groups Projects
Unverified Commit 19ad2e01 authored by capossele's avatar capossele
Browse files

:mag: Add workerpools load on Prometheus

parent 4554a1ff
Branches
No related tags found
No related merge requests found
...@@ -72,6 +72,8 @@ var ( ...@@ -72,6 +72,8 @@ var (
valueObjectFactory *tangle.ValueObjectFactory valueObjectFactory *tangle.ValueObjectFactory
valueObjectFactoryOnce sync.Once valueObjectFactoryOnce sync.Once
SignatureFilter *tangle.SignatureFilter
) )
// App gets the plugin instance. // App gets the plugin instance.
...@@ -169,7 +171,8 @@ func configure(_ *node.Plugin) { ...@@ -169,7 +171,8 @@ func configure(_ *node.Plugin) {
})) }))
// register SignatureFilter in Parser // register SignatureFilter in Parser
messagelayer.MessageParser().AddMessageFilter(tangle.NewSignatureFilter()) SignatureFilter = tangle.NewSignatureFilter()
messagelayer.MessageParser().AddMessageFilter(SignatureFilter)
// subscribe to message-layer // subscribe to message-layer
messagelayer.Tangle().Events.MessageSolid.Attach(events.NewClosure(onReceiveMessageFromMessageLayer)) messagelayer.Tangle().Events.MessageSolid.Attach(events.NewClosure(onReceiveMessageFromMessageLayer))
......
...@@ -94,5 +94,10 @@ func (filter *SignatureFilter) getRejectCallback() func(message *message.Message ...@@ -94,5 +94,10 @@ func (filter *SignatureFilter) getRejectCallback() func(message *message.Message
return filter.onRejectCallback return filter.onRejectCallback
} }
// WorkerPoolStatus returns the name and the load of the workerpool.
func (filter *SignatureFilter) WorkerPoolStatus() (name string, load int) {
return "SignatureFilter", filter.workerPool.RunningWorkers()
}
// interface contract (allow the compiler to check if the implementation has all of the required methods). // interface contract (allow the compiler to check if the implementation has all of the required methods).
var _ messageparser.MessageFilter = &SignatureFilter{} var _ messageparser.MessageFilter = &SignatureFilter{}
...@@ -67,3 +67,8 @@ func (filter *MessageSignatureFilter) getRejectCallback() (result func(msg *mess ...@@ -67,3 +67,8 @@ func (filter *MessageSignatureFilter) getRejectCallback() (result func(msg *mess
filter.onRejectCallbackMutex.RUnlock() filter.onRejectCallbackMutex.RUnlock()
return return
} }
// WorkerPoolStatus returns the name and the load of the workerpool.
func (filter *MessageSignatureFilter) WorkerPoolStatus() (name string, load int) {
return "MessageSignatureFilter", filter.workerPool.RunningWorkers()
}
...@@ -98,6 +98,11 @@ func (f *PowFilter) validate(msgBytes []byte) error { ...@@ -98,6 +98,11 @@ func (f *PowFilter) validate(msgBytes []byte) error {
return nil return nil
} }
// WorkerPoolStatus returns the name and the load of the workerpool.
func (f *PowFilter) WorkerPoolStatus() (name string, load int) {
return "PowFilter", f.workerPool.RunningWorkers()
}
// powData returns the bytes over which PoW should be computed. // powData returns the bytes over which PoW should be computed.
func powData(msgBytes []byte) ([]byte, error) { func powData(msgBytes []byte) ([]byte, error) {
contentLength := len(msgBytes) - ed25519.SignatureSize contentLength := len(msgBytes) - ed25519.SignatureSize
......
...@@ -69,3 +69,8 @@ func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes [] ...@@ -69,3 +69,8 @@ func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes []
func (filter *RecentlySeenBytesFilter) Shutdown() { func (filter *RecentlySeenBytesFilter) Shutdown() {
filter.workerPool.ShutdownGracefully() filter.workerPool.ShutdownGracefully()
} }
// WorkerPoolStatus returns the name and the load of the workerpool.
func (filter *RecentlySeenBytesFilter) WorkerPoolStatus() (name string, load int) {
return "RecentlySeenBytesFilter", filter.workerPool.RunningWorkers()
}
...@@ -21,6 +21,11 @@ type MessageParser struct { ...@@ -21,6 +21,11 @@ type MessageParser struct {
messageFiltersModified typeutils.AtomicBool messageFiltersModified typeutils.AtomicBool
bytesFiltersMutex sync.Mutex bytesFiltersMutex sync.Mutex
messageFiltersMutex sync.Mutex messageFiltersMutex sync.Mutex
// only used for workerpool debugging
MessageSignatureFilter *builtinfilters.MessageSignatureFilter
PowFilter *builtinfilters.PowFilter
RecentlySeenBytesFilter *builtinfilters.RecentlySeenBytesFilter
} }
// New creates a new message parser. // New creates a new message parser.
...@@ -42,8 +47,10 @@ func New() (result *MessageParser) { ...@@ -42,8 +47,10 @@ func New() (result *MessageParser) {
} }
// add builtin filters // add builtin filters
result.AddBytesFilter(builtinfilters.NewRecentlySeenBytesFilter()) result.RecentlySeenBytesFilter = builtinfilters.NewRecentlySeenBytesFilter()
result.AddMessageFilter(builtinfilters.NewMessageSignatureFilter()) result.AddBytesFilter(result.RecentlySeenBytesFilter)
result.MessageSignatureFilter = builtinfilters.NewMessageSignatureFilter()
result.AddMessageFilter(result.MessageSignatureFilter)
return return
} }
......
...@@ -2,7 +2,6 @@ package tangle ...@@ -2,7 +2,6 @@ package tangle
import ( import (
"container/list" "container/list"
"runtime"
"time" "time"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
...@@ -57,7 +56,7 @@ func New(store kvstore.KVStore) (result *Tangle) { ...@@ -57,7 +56,7 @@ func New(store kvstore.KVStore) (result *Tangle) {
Events: *newEvents(), Events: *newEvents(),
} }
result.solidifierWorkerPool.Tune(runtime.GOMAXPROCS(0)) result.solidifierWorkerPool.Tune(1024)
return return
} }
...@@ -294,3 +293,13 @@ func (tangle *Tangle) deleteFutureCone(messageId message.Id) { ...@@ -294,3 +293,13 @@ func (tangle *Tangle) deleteFutureCone(messageId message.Id) {
}) })
} }
} }
// SolidifierWorkerPoolStatus returns the name and the load of the workerpool.
func (tangle *Tangle) SolidifierWorkerPoolStatus() (name string, load int) {
return "Solidifier", tangle.solidifierWorkerPool.RunningWorkers()
}
// StoreMessageWorkerPoolStatus returns the name and the load of the workerpool.
func (tangle *Tangle) StoreMessageWorkerPoolStatus() (name string, load int) {
return "StoreMessage", tangle.storeMessageWorkerPool.RunningWorkers()
}
...@@ -280,3 +280,8 @@ func marshal(packet pb.Packet) []byte { ...@@ -280,3 +280,8 @@ func marshal(packet pb.Packet) []byte {
} }
return append([]byte{byte(packetType)}, data...) return append([]byte{byte(packetType)}, data...)
} }
// WorkerPoolStatus returns the name and the load of the workerpool.
func (m *Manager) WorkerPoolStatus() (name string, load int) {
return "InboxWorkerPool", m.inboxWorkerPool.RunningWorkers()
}
...@@ -30,6 +30,7 @@ func run(*node.Plugin) { ...@@ -30,6 +30,7 @@ func run(*node.Plugin) {
log.Infof("%s started: difficult=%d", PluginName, difficulty) log.Infof("%s started: difficult=%d", PluginName, difficulty)
messagelayer.MessageParser().AddBytesFilter(builtinfilters.NewPowFilter(worker, difficulty)) messagelayer.MessageParser().PowFilter = builtinfilters.NewPowFilter(worker, difficulty)
messagelayer.MessageParser().AddBytesFilter(messagelayer.MessageParser().PowFilter)
messagelayer.MessageFactory().SetWorker(messagefactory.WorkerFunc(DoPOW)) messagelayer.MessageFactory().SetWorker(messagefactory.WorkerFunc(DoPOW))
} }
...@@ -11,6 +11,8 @@ const ( ...@@ -11,6 +11,8 @@ const (
CfgPrometheusProcessMetrics = "prometheus.processMetrics" CfgPrometheusProcessMetrics = "prometheus.processMetrics"
// CfgPrometheusPromhttpMetrics defines the config flag to enable/disable promhttp metrics. // CfgPrometheusPromhttpMetrics defines the config flag to enable/disable promhttp metrics.
CfgPrometheusPromhttpMetrics = "prometheus.promhttpMetrics" CfgPrometheusPromhttpMetrics = "prometheus.promhttpMetrics"
// CfgPrometheusWorkerpoolMetrics defines the config flag to enable/disable workerpool metrics.
CfgPrometheusWorkerpoolMetrics = "prometheus.workerpoolMetrics"
// CfgPrometheusBindAddress defines the config flag of the bind address on which the Prometheus exporter listens on. // CfgPrometheusBindAddress defines the config flag of the bind address on which the Prometheus exporter listens on.
CfgPrometheusBindAddress = "prometheus.bindAddress" CfgPrometheusBindAddress = "prometheus.bindAddress"
) )
...@@ -20,4 +22,5 @@ func init() { ...@@ -20,4 +22,5 @@ func init() {
flag.Bool(CfgPrometheusGoMetrics, false, "include go metrics") flag.Bool(CfgPrometheusGoMetrics, false, "include go metrics")
flag.Bool(CfgPrometheusProcessMetrics, false, "include process metrics") flag.Bool(CfgPrometheusProcessMetrics, false, "include process metrics")
flag.Bool(CfgPrometheusPromhttpMetrics, false, "include promhttp metrics") flag.Bool(CfgPrometheusPromhttpMetrics, false, "include promhttp metrics")
flag.Bool(CfgPrometheusWorkerpoolMetrics, false, "include workerpool metrics")
} }
package prometheus
import (
"github.com/iotaledger/goshimmer/dapps/valuetransfers"
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/prometheus/client_golang/prometheus"
)
var (
workerpools *prometheus.GaugeVec
)
func registerWorkerpoolMetrics() {
workerpools = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "workerpools_load",
Help: "Info about workerpools load",
},
[]string{
"name",
},
)
registry.MustRegister(workerpools)
addCollect(collectWorkerpoolMetrics)
}
func collectWorkerpoolMetrics() {
name, load := gossip.Manager().WorkerPoolStatus()
workerpools.WithLabelValues(
name,
).Set(float64(load))
name, load = messagelayer.Tangle().SolidifierWorkerPoolStatus()
workerpools.WithLabelValues(
name,
).Set(float64(load))
name, load = messagelayer.Tangle().StoreMessageWorkerPoolStatus()
workerpools.WithLabelValues(
name,
).Set(float64(load))
if messagelayer.MessageParser().MessageSignatureFilter != nil {
name, load = messagelayer.MessageParser().MessageSignatureFilter.WorkerPoolStatus()
workerpools.WithLabelValues(
name,
).Set(float64(load))
}
if valuetransfers.SignatureFilter != nil {
name, load = valuetransfers.SignatureFilter.WorkerPoolStatus()
workerpools.WithLabelValues(
name,
).Set(float64(load))
}
if messagelayer.MessageParser().RecentlySeenBytesFilter != nil {
name, load = messagelayer.MessageParser().RecentlySeenBytesFilter.WorkerPoolStatus()
workerpools.WithLabelValues(
name,
).Set(float64(load))
}
if messagelayer.MessageParser().PowFilter != nil {
name, load = messagelayer.MessageParser().PowFilter.WorkerPoolStatus()
workerpools.WithLabelValues(
name,
).Set(float64(load))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment