From 871d4bf6935521df54a88ea9ff5bbf4ba86731e1 Mon Sep 17 00:00:00 2001 From: capossele <angelocapossele@gmail.com> Date: Fri, 17 Jul 2020 15:41:51 +0100 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Clean=20up=20code?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../packages/tangle/signature_filter.go | 13 +------ .../message_signature_filter.go | 16 ++------- .../builtinfilters/pow_filter.go | 13 +------ .../recently_seen_bytes_filter.go | 13 +------ packages/gossip/manager.go | 11 ++++-- plugins/prometheus/workerpool.go | 36 ++++--------------- 6 files changed, 19 insertions(+), 83 deletions(-) diff --git a/dapps/valuetransfers/packages/tangle/signature_filter.go b/dapps/valuetransfers/packages/tangle/signature_filter.go index a8e77e1f..de79193a 100644 --- a/dapps/valuetransfers/packages/tangle/signature_filter.go +++ b/dapps/valuetransfers/packages/tangle/signature_filter.go @@ -7,7 +7,6 @@ import ( "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/messageparser" - "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/autopeering/peer" ) @@ -17,7 +16,6 @@ type SignatureFilter struct { onRejectCallback func(message *message.Message, err error, peer *peer.Peer) onAcceptCallbackMutex sync.RWMutex onRejectCallbackMutex sync.RWMutex - workerPool async.WorkerPool } // NewSignatureFilter is the constructor of the MessageFilter. @@ -28,7 +26,6 @@ func NewSignatureFilter() *SignatureFilter { // Filter get's called whenever a new message is received. It rejects the message, if the message is not a valid value // message. func (filter *SignatureFilter) Filter(message *message.Message, peer *peer.Peer) { - // filter.workerPool.Submit(func() { // accept message if the message is not a value message (it will be checked by other filters) valuePayload := message.Payload() if valuePayload.Type() != payload.Type { @@ -54,7 +51,6 @@ func (filter *SignatureFilter) Filter(message *message.Message, peer *peer.Peer) // if all previous checks passed: accept message filter.getAcceptCallback()(message, peer) - // }) } // OnAccept registers the given callback as the acceptance function of the filter. @@ -74,9 +70,7 @@ func (filter *SignatureFilter) OnReject(callback func(message *message.Message, } // Shutdown shuts down the filter. -func (filter *SignatureFilter) Shutdown() { - filter.workerPool.ShutdownGracefully() -} +func (filter *SignatureFilter) Shutdown() {} // getAcceptCallback returns the callback that is executed when a message passes the filter. func (filter *SignatureFilter) getAcceptCallback() func(message *message.Message, peer *peer.Peer) { @@ -94,10 +88,5 @@ func (filter *SignatureFilter) getRejectCallback() func(message *message.Message return filter.onRejectCallback } -// WorkerPoolStatus returns the name and the load of the workerpool. -func (filter *SignatureFilter) WorkerPoolStatus() (name string, load int) { - return "SignatureFilter", filter.workerPool.RunningWorkers() -} - // interface contract (allow the compiler to check if the implementation has all of the required methods). var _ messageparser.MessageFilter = &SignatureFilter{} diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go index 887673dd..554cc4cc 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go @@ -4,10 +4,8 @@ import ( "fmt" "sync" - "github.com/iotaledger/hive.go/async" - "github.com/iotaledger/hive.go/autopeering/peer" - "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" + "github.com/iotaledger/hive.go/autopeering/peer" ) // ErrInvalidSignature is returned when a message contains an invalid signature. @@ -17,7 +15,6 @@ var ErrInvalidSignature = fmt.Errorf("invalid signature") type MessageSignatureFilter struct { onAcceptCallback func(msg *message.Message, peer *peer.Peer) onRejectCallback func(msg *message.Message, err error, peer *peer.Peer) - workerPool async.WorkerPool onAcceptCallbackMutex sync.RWMutex onRejectCallbackMutex sync.RWMutex @@ -29,13 +26,11 @@ func NewMessageSignatureFilter() *MessageSignatureFilter { } func (filter *MessageSignatureFilter) Filter(msg *message.Message, peer *peer.Peer) { - // filter.workerPool.Submit(func() { if msg.VerifySignature() { filter.getAcceptCallback()(msg, peer) return } filter.getRejectCallback()(msg, ErrInvalidSignature, peer) - // }) } func (filter *MessageSignatureFilter) OnAccept(callback func(msg *message.Message, peer *peer.Peer)) { @@ -50,9 +45,7 @@ func (filter *MessageSignatureFilter) OnReject(callback func(msg *message.Messag filter.onRejectCallbackMutex.Unlock() } -func (filter *MessageSignatureFilter) Shutdown() { - filter.workerPool.ShutdownGracefully() -} +func (filter *MessageSignatureFilter) Shutdown() {} func (filter *MessageSignatureFilter) getAcceptCallback() (result func(msg *message.Message, peer *peer.Peer)) { filter.onAcceptCallbackMutex.RLock() @@ -67,8 +60,3 @@ func (filter *MessageSignatureFilter) getRejectCallback() (result func(msg *mess filter.onRejectCallbackMutex.RUnlock() return } - -// WorkerPoolStatus returns the name and the load of the workerpool. -func (filter *MessageSignatureFilter) WorkerPoolStatus() (name string, load int) { - return "MessageSignatureFilter", filter.workerPool.RunningWorkers() -} diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go index c27e82cf..5de27b28 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/iotaledger/goshimmer/packages/pow" - "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/autopeering/peer" ) @@ -22,7 +21,6 @@ var ( type PowFilter struct { worker *pow.Worker difficulty int - workerPool async.WorkerPool mu sync.Mutex acceptCallback func([]byte, *peer.Peer) @@ -39,13 +37,11 @@ func NewPowFilter(worker *pow.Worker, difficulty int) *PowFilter { // Filter checks whether the given bytes pass the PoW validation and calls the corresponding callback. func (f *PowFilter) Filter(msgBytes []byte, p *peer.Peer) { - // f.workerPool.Submit(func() { if err := f.validate(msgBytes); err != nil { f.reject(msgBytes, err, p) return } f.accept(msgBytes, p) - // }) } // OnAccept registers the given callback as the acceptance function of the filter. @@ -63,9 +59,7 @@ func (f *PowFilter) OnReject(callback func([]byte, error, *peer.Peer)) { } // Shutdown shuts down the filter. -func (f *PowFilter) Shutdown() { - f.workerPool.ShutdownGracefully() -} +func (f *PowFilter) Shutdown() {} func (f *PowFilter) accept(msgBytes []byte, p *peer.Peer) { f.mu.Lock() @@ -98,11 +92,6 @@ func (f *PowFilter) validate(msgBytes []byte) error { return nil } -// WorkerPoolStatus returns the name and the load of the workerpool. -func (f *PowFilter) WorkerPoolStatus() (name string, load int) { - return "PowFilter", f.workerPool.RunningWorkers() -} - // powData returns the bytes over which PoW should be computed. func powData(msgBytes []byte) ([]byte, error) { contentLength := len(msgBytes) - ed25519.SignatureSize diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go index 42b44ede..6cbeec50 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go @@ -4,7 +4,6 @@ import ( "fmt" "sync" - "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/bytesfilter" ) @@ -17,7 +16,6 @@ type RecentlySeenBytesFilter struct { bytesFilter *bytesfilter.BytesFilter onAcceptCallback func(bytes []byte, peer *peer.Peer) onRejectCallback func(bytes []byte, err error, peer *peer.Peer) - workerPool async.WorkerPool onAcceptCallbackMutex sync.RWMutex onRejectCallbackMutex sync.RWMutex @@ -31,13 +29,11 @@ func NewRecentlySeenBytesFilter() *RecentlySeenBytesFilter { } func (filter *RecentlySeenBytesFilter) Filter(bytes []byte, peer *peer.Peer) { - // filter.workerPool.Submit(func() { if filter.bytesFilter.Add(bytes) { filter.getAcceptCallback()(bytes, peer) return } filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes, peer) - // }) } func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte, peer *peer.Peer)) { @@ -66,11 +62,4 @@ func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes [] return } -func (filter *RecentlySeenBytesFilter) Shutdown() { - filter.workerPool.ShutdownGracefully() -} - -// WorkerPoolStatus returns the name and the load of the workerpool. -func (filter *RecentlySeenBytesFilter) WorkerPoolStatus() (name string, load int) { - return "RecentlySeenBytesFilter", filter.workerPool.RunningWorkers() -} +func (filter *RecentlySeenBytesFilter) Shutdown() {} diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index bfefb13c..db4e1e85 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -292,9 +292,14 @@ func marshal(packet pb.Packet) []byte { return append([]byte{byte(packetType)}, data...) } -// WorkerPoolStatus returns the name and the load of the workerpool. -func (m *Manager) WorkerPoolStatus() (name string, load int) { - return "InboxWorkerPool", m.messageWorkerPool.GetPendingQueueSize() + m.messageRequestWorkerPool.GetPendingQueueSize() +// MessageWorkerPoolStatus returns the name and the load of the workerpool. +func (m *Manager) MessageWorkerPoolStatus() (name string, load int) { + return "messageWorkerPool", m.messageWorkerPool.GetPendingQueueSize() +} + +// MessageRequestWorkerPoolStatus returns the name and the load of the workerpool. +func (m *Manager) MessageRequestWorkerPoolStatus() (name string, load int) { + return "messageRequestWorkerPool", m.messageRequestWorkerPool.GetPendingQueueSize() } func (m *Manager) processPacketMessage(data []byte, nbr *Neighbor) { diff --git a/plugins/prometheus/workerpool.go b/plugins/prometheus/workerpool.go index 30dbfcf3..7bca4899 100644 --- a/plugins/prometheus/workerpool.go +++ b/plugins/prometheus/workerpool.go @@ -1,7 +1,6 @@ package prometheus import ( - "github.com/iotaledger/goshimmer/dapps/valuetransfers" "github.com/iotaledger/goshimmer/plugins/gossip" "github.com/iotaledger/goshimmer/plugins/messagelayer" "github.com/prometheus/client_golang/prometheus" @@ -28,7 +27,12 @@ func registerWorkerpoolMetrics() { } func collectWorkerpoolMetrics() { - name, load := gossip.Manager().WorkerPoolStatus() + name, load := gossip.Manager().MessageWorkerPoolStatus() + workerpools.WithLabelValues( + name, + ).Set(float64(load)) + + name, load = gossip.Manager().MessageRequestWorkerPoolStatus() workerpools.WithLabelValues( name, ).Set(float64(load)) @@ -42,32 +46,4 @@ func collectWorkerpoolMetrics() { workerpools.WithLabelValues( name, ).Set(float64(load)) - - if messagelayer.MessageParser().MessageSignatureFilter != nil { - name, load = messagelayer.MessageParser().MessageSignatureFilter.WorkerPoolStatus() - workerpools.WithLabelValues( - name, - ).Set(float64(load)) - } - - if valuetransfers.SignatureFilter != nil { - name, load = valuetransfers.SignatureFilter.WorkerPoolStatus() - workerpools.WithLabelValues( - name, - ).Set(float64(load)) - } - - if messagelayer.MessageParser().RecentlySeenBytesFilter != nil { - name, load = messagelayer.MessageParser().RecentlySeenBytesFilter.WorkerPoolStatus() - workerpools.WithLabelValues( - name, - ).Set(float64(load)) - } - - if messagelayer.MessageParser().PowFilter != nil { - name, load = messagelayer.MessageParser().PowFilter.WorkerPoolStatus() - workerpools.WithLabelValues( - name, - ).Set(float64(load)) - } } -- GitLab