diff --git a/dapps/valuetransfers/packages/tangle/signature_filter.go b/dapps/valuetransfers/packages/tangle/signature_filter.go index a8e77e1fe9acda6b3a00f7c570a340f2cf9ea277..de79193ae32dd98288bf844946072903becf00d4 100644 --- a/dapps/valuetransfers/packages/tangle/signature_filter.go +++ b/dapps/valuetransfers/packages/tangle/signature_filter.go @@ -7,7 +7,6 @@ import ( "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/messageparser" - "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/autopeering/peer" ) @@ -17,7 +16,6 @@ type SignatureFilter struct { onRejectCallback func(message *message.Message, err error, peer *peer.Peer) onAcceptCallbackMutex sync.RWMutex onRejectCallbackMutex sync.RWMutex - workerPool async.WorkerPool } // NewSignatureFilter is the constructor of the MessageFilter. @@ -28,7 +26,6 @@ func NewSignatureFilter() *SignatureFilter { // Filter get's called whenever a new message is received. It rejects the message, if the message is not a valid value // message. func (filter *SignatureFilter) Filter(message *message.Message, peer *peer.Peer) { - // filter.workerPool.Submit(func() { // accept message if the message is not a value message (it will be checked by other filters) valuePayload := message.Payload() if valuePayload.Type() != payload.Type { @@ -54,7 +51,6 @@ func (filter *SignatureFilter) Filter(message *message.Message, peer *peer.Peer) // if all previous checks passed: accept message filter.getAcceptCallback()(message, peer) - // }) } // OnAccept registers the given callback as the acceptance function of the filter. @@ -74,9 +70,7 @@ func (filter *SignatureFilter) OnReject(callback func(message *message.Message, } // Shutdown shuts down the filter. -func (filter *SignatureFilter) Shutdown() { - filter.workerPool.ShutdownGracefully() -} +func (filter *SignatureFilter) Shutdown() {} // getAcceptCallback returns the callback that is executed when a message passes the filter. func (filter *SignatureFilter) getAcceptCallback() func(message *message.Message, peer *peer.Peer) { @@ -94,10 +88,5 @@ func (filter *SignatureFilter) getRejectCallback() func(message *message.Message return filter.onRejectCallback } -// WorkerPoolStatus returns the name and the load of the workerpool. -func (filter *SignatureFilter) WorkerPoolStatus() (name string, load int) { - return "SignatureFilter", filter.workerPool.RunningWorkers() -} - // interface contract (allow the compiler to check if the implementation has all of the required methods). var _ messageparser.MessageFilter = &SignatureFilter{} diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go index 887673dd25b0a19d11d5d1dda1403ee85da432b7..554cc4ccd6651d637005279070b37a968131cc06 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go @@ -4,10 +4,8 @@ import ( "fmt" "sync" - "github.com/iotaledger/hive.go/async" - "github.com/iotaledger/hive.go/autopeering/peer" - "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" + "github.com/iotaledger/hive.go/autopeering/peer" ) // ErrInvalidSignature is returned when a message contains an invalid signature. @@ -17,7 +15,6 @@ var ErrInvalidSignature = fmt.Errorf("invalid signature") type MessageSignatureFilter struct { onAcceptCallback func(msg *message.Message, peer *peer.Peer) onRejectCallback func(msg *message.Message, err error, peer *peer.Peer) - workerPool async.WorkerPool onAcceptCallbackMutex sync.RWMutex onRejectCallbackMutex sync.RWMutex @@ -29,13 +26,11 @@ func NewMessageSignatureFilter() *MessageSignatureFilter { } func (filter *MessageSignatureFilter) Filter(msg *message.Message, peer *peer.Peer) { - // filter.workerPool.Submit(func() { if msg.VerifySignature() { filter.getAcceptCallback()(msg, peer) return } filter.getRejectCallback()(msg, ErrInvalidSignature, peer) - // }) } func (filter *MessageSignatureFilter) OnAccept(callback func(msg *message.Message, peer *peer.Peer)) { @@ -50,9 +45,7 @@ func (filter *MessageSignatureFilter) OnReject(callback func(msg *message.Messag filter.onRejectCallbackMutex.Unlock() } -func (filter *MessageSignatureFilter) Shutdown() { - filter.workerPool.ShutdownGracefully() -} +func (filter *MessageSignatureFilter) Shutdown() {} func (filter *MessageSignatureFilter) getAcceptCallback() (result func(msg *message.Message, peer *peer.Peer)) { filter.onAcceptCallbackMutex.RLock() @@ -67,8 +60,3 @@ func (filter *MessageSignatureFilter) getRejectCallback() (result func(msg *mess filter.onRejectCallbackMutex.RUnlock() return } - -// WorkerPoolStatus returns the name and the load of the workerpool. -func (filter *MessageSignatureFilter) WorkerPoolStatus() (name string, load int) { - return "MessageSignatureFilter", filter.workerPool.RunningWorkers() -} diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go index c27e82cfae2f32facc0308b9ddcbcd969421be38..5de27b283457db365f8803571a1f487075ac7f7b 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/iotaledger/goshimmer/packages/pow" - "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/autopeering/peer" ) @@ -22,7 +21,6 @@ var ( type PowFilter struct { worker *pow.Worker difficulty int - workerPool async.WorkerPool mu sync.Mutex acceptCallback func([]byte, *peer.Peer) @@ -39,13 +37,11 @@ func NewPowFilter(worker *pow.Worker, difficulty int) *PowFilter { // Filter checks whether the given bytes pass the PoW validation and calls the corresponding callback. func (f *PowFilter) Filter(msgBytes []byte, p *peer.Peer) { - // f.workerPool.Submit(func() { if err := f.validate(msgBytes); err != nil { f.reject(msgBytes, err, p) return } f.accept(msgBytes, p) - // }) } // OnAccept registers the given callback as the acceptance function of the filter. @@ -63,9 +59,7 @@ func (f *PowFilter) OnReject(callback func([]byte, error, *peer.Peer)) { } // Shutdown shuts down the filter. -func (f *PowFilter) Shutdown() { - f.workerPool.ShutdownGracefully() -} +func (f *PowFilter) Shutdown() {} func (f *PowFilter) accept(msgBytes []byte, p *peer.Peer) { f.mu.Lock() @@ -98,11 +92,6 @@ func (f *PowFilter) validate(msgBytes []byte) error { return nil } -// WorkerPoolStatus returns the name and the load of the workerpool. -func (f *PowFilter) WorkerPoolStatus() (name string, load int) { - return "PowFilter", f.workerPool.RunningWorkers() -} - // powData returns the bytes over which PoW should be computed. func powData(msgBytes []byte) ([]byte, error) { contentLength := len(msgBytes) - ed25519.SignatureSize diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go index 42b44ede3328b3dd3ab5abdf0ea855e533bd3fd8..6cbeec505062967ff21f79e82f7f9813f94c65fd 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go @@ -4,7 +4,6 @@ import ( "fmt" "sync" - "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/bytesfilter" ) @@ -17,7 +16,6 @@ type RecentlySeenBytesFilter struct { bytesFilter *bytesfilter.BytesFilter onAcceptCallback func(bytes []byte, peer *peer.Peer) onRejectCallback func(bytes []byte, err error, peer *peer.Peer) - workerPool async.WorkerPool onAcceptCallbackMutex sync.RWMutex onRejectCallbackMutex sync.RWMutex @@ -31,13 +29,11 @@ func NewRecentlySeenBytesFilter() *RecentlySeenBytesFilter { } func (filter *RecentlySeenBytesFilter) Filter(bytes []byte, peer *peer.Peer) { - // filter.workerPool.Submit(func() { if filter.bytesFilter.Add(bytes) { filter.getAcceptCallback()(bytes, peer) return } filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes, peer) - // }) } func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte, peer *peer.Peer)) { @@ -66,11 +62,4 @@ func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes [] return } -func (filter *RecentlySeenBytesFilter) Shutdown() { - filter.workerPool.ShutdownGracefully() -} - -// WorkerPoolStatus returns the name and the load of the workerpool. -func (filter *RecentlySeenBytesFilter) WorkerPoolStatus() (name string, load int) { - return "RecentlySeenBytesFilter", filter.workerPool.RunningWorkers() -} +func (filter *RecentlySeenBytesFilter) Shutdown() {} diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index bfefb13cc201e87dd19a617f8aaeba760e9b83e1..db4e1e85925d7cc5c73c453997533314b8badd88 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -292,9 +292,14 @@ func marshal(packet pb.Packet) []byte { return append([]byte{byte(packetType)}, data...) } -// WorkerPoolStatus returns the name and the load of the workerpool. -func (m *Manager) WorkerPoolStatus() (name string, load int) { - return "InboxWorkerPool", m.messageWorkerPool.GetPendingQueueSize() + m.messageRequestWorkerPool.GetPendingQueueSize() +// MessageWorkerPoolStatus returns the name and the load of the workerpool. +func (m *Manager) MessageWorkerPoolStatus() (name string, load int) { + return "messageWorkerPool", m.messageWorkerPool.GetPendingQueueSize() +} + +// MessageRequestWorkerPoolStatus returns the name and the load of the workerpool. +func (m *Manager) MessageRequestWorkerPoolStatus() (name string, load int) { + return "messageRequestWorkerPool", m.messageRequestWorkerPool.GetPendingQueueSize() } func (m *Manager) processPacketMessage(data []byte, nbr *Neighbor) { diff --git a/plugins/prometheus/workerpool.go b/plugins/prometheus/workerpool.go index 30dbfcf3f2c7cc6742969a534460caff2fad2520..7bca48993451a702586e7f9169a56d80fea18c52 100644 --- a/plugins/prometheus/workerpool.go +++ b/plugins/prometheus/workerpool.go @@ -1,7 +1,6 @@ package prometheus import ( - "github.com/iotaledger/goshimmer/dapps/valuetransfers" "github.com/iotaledger/goshimmer/plugins/gossip" "github.com/iotaledger/goshimmer/plugins/messagelayer" "github.com/prometheus/client_golang/prometheus" @@ -28,7 +27,12 @@ func registerWorkerpoolMetrics() { } func collectWorkerpoolMetrics() { - name, load := gossip.Manager().WorkerPoolStatus() + name, load := gossip.Manager().MessageWorkerPoolStatus() + workerpools.WithLabelValues( + name, + ).Set(float64(load)) + + name, load = gossip.Manager().MessageRequestWorkerPoolStatus() workerpools.WithLabelValues( name, ).Set(float64(load)) @@ -42,32 +46,4 @@ func collectWorkerpoolMetrics() { workerpools.WithLabelValues( name, ).Set(float64(load)) - - if messagelayer.MessageParser().MessageSignatureFilter != nil { - name, load = messagelayer.MessageParser().MessageSignatureFilter.WorkerPoolStatus() - workerpools.WithLabelValues( - name, - ).Set(float64(load)) - } - - if valuetransfers.SignatureFilter != nil { - name, load = valuetransfers.SignatureFilter.WorkerPoolStatus() - workerpools.WithLabelValues( - name, - ).Set(float64(load)) - } - - if messagelayer.MessageParser().RecentlySeenBytesFilter != nil { - name, load = messagelayer.MessageParser().RecentlySeenBytesFilter.WorkerPoolStatus() - workerpools.WithLabelValues( - name, - ).Set(float64(load)) - } - - if messagelayer.MessageParser().PowFilter != nil { - name, load = messagelayer.MessageParser().PowFilter.WorkerPoolStatus() - workerpools.WithLabelValues( - name, - ).Set(float64(load)) - } }