Skip to content
Snippets Groups Projects
Unverified Commit 871d4bf6 authored by capossele's avatar capossele
Browse files

:recycle: Clean up code

parent 53e16226
Branches
Tags
No related merge requests found
......@@ -7,7 +7,6 @@ import (
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/messageparser"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/autopeering/peer"
)
......@@ -17,7 +16,6 @@ type SignatureFilter struct {
onRejectCallback func(message *message.Message, err error, peer *peer.Peer)
onAcceptCallbackMutex sync.RWMutex
onRejectCallbackMutex sync.RWMutex
workerPool async.WorkerPool
}
// NewSignatureFilter is the constructor of the MessageFilter.
......@@ -28,7 +26,6 @@ func NewSignatureFilter() *SignatureFilter {
// Filter get's called whenever a new message is received. It rejects the message, if the message is not a valid value
// message.
func (filter *SignatureFilter) Filter(message *message.Message, peer *peer.Peer) {
// filter.workerPool.Submit(func() {
// accept message if the message is not a value message (it will be checked by other filters)
valuePayload := message.Payload()
if valuePayload.Type() != payload.Type {
......@@ -54,7 +51,6 @@ func (filter *SignatureFilter) Filter(message *message.Message, peer *peer.Peer)
// if all previous checks passed: accept message
filter.getAcceptCallback()(message, peer)
// })
}
// OnAccept registers the given callback as the acceptance function of the filter.
......@@ -74,9 +70,7 @@ func (filter *SignatureFilter) OnReject(callback func(message *message.Message,
}
// Shutdown shuts down the filter.
func (filter *SignatureFilter) Shutdown() {
filter.workerPool.ShutdownGracefully()
}
func (filter *SignatureFilter) Shutdown() {}
// getAcceptCallback returns the callback that is executed when a message passes the filter.
func (filter *SignatureFilter) getAcceptCallback() func(message *message.Message, peer *peer.Peer) {
......@@ -94,10 +88,5 @@ func (filter *SignatureFilter) getRejectCallback() func(message *message.Message
return filter.onRejectCallback
}
// WorkerPoolStatus returns the name and the load of the workerpool.
func (filter *SignatureFilter) WorkerPoolStatus() (name string, load int) {
return "SignatureFilter", filter.workerPool.RunningWorkers()
}
// interface contract (allow the compiler to check if the implementation has all of the required methods).
var _ messageparser.MessageFilter = &SignatureFilter{}
......@@ -4,10 +4,8 @@ import (
"fmt"
"sync"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/hive.go/autopeering/peer"
)
// ErrInvalidSignature is returned when a message contains an invalid signature.
......@@ -17,7 +15,6 @@ var ErrInvalidSignature = fmt.Errorf("invalid signature")
type MessageSignatureFilter struct {
onAcceptCallback func(msg *message.Message, peer *peer.Peer)
onRejectCallback func(msg *message.Message, err error, peer *peer.Peer)
workerPool async.WorkerPool
onAcceptCallbackMutex sync.RWMutex
onRejectCallbackMutex sync.RWMutex
......@@ -29,13 +26,11 @@ func NewMessageSignatureFilter() *MessageSignatureFilter {
}
func (filter *MessageSignatureFilter) Filter(msg *message.Message, peer *peer.Peer) {
// filter.workerPool.Submit(func() {
if msg.VerifySignature() {
filter.getAcceptCallback()(msg, peer)
return
}
filter.getRejectCallback()(msg, ErrInvalidSignature, peer)
// })
}
func (filter *MessageSignatureFilter) OnAccept(callback func(msg *message.Message, peer *peer.Peer)) {
......@@ -50,9 +45,7 @@ func (filter *MessageSignatureFilter) OnReject(callback func(msg *message.Messag
filter.onRejectCallbackMutex.Unlock()
}
func (filter *MessageSignatureFilter) Shutdown() {
filter.workerPool.ShutdownGracefully()
}
func (filter *MessageSignatureFilter) Shutdown() {}
func (filter *MessageSignatureFilter) getAcceptCallback() (result func(msg *message.Message, peer *peer.Peer)) {
filter.onAcceptCallbackMutex.RLock()
......@@ -67,8 +60,3 @@ func (filter *MessageSignatureFilter) getRejectCallback() (result func(msg *mess
filter.onRejectCallbackMutex.RUnlock()
return
}
// WorkerPoolStatus returns the name and the load of the workerpool.
func (filter *MessageSignatureFilter) WorkerPoolStatus() (name string, load int) {
return "MessageSignatureFilter", filter.workerPool.RunningWorkers()
}
......@@ -7,7 +7,6 @@ import (
"sync"
"github.com/iotaledger/goshimmer/packages/pow"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/autopeering/peer"
)
......@@ -22,7 +21,6 @@ var (
type PowFilter struct {
worker *pow.Worker
difficulty int
workerPool async.WorkerPool
mu sync.Mutex
acceptCallback func([]byte, *peer.Peer)
......@@ -39,13 +37,11 @@ func NewPowFilter(worker *pow.Worker, difficulty int) *PowFilter {
// Filter checks whether the given bytes pass the PoW validation and calls the corresponding callback.
func (f *PowFilter) Filter(msgBytes []byte, p *peer.Peer) {
// f.workerPool.Submit(func() {
if err := f.validate(msgBytes); err != nil {
f.reject(msgBytes, err, p)
return
}
f.accept(msgBytes, p)
// })
}
// OnAccept registers the given callback as the acceptance function of the filter.
......@@ -63,9 +59,7 @@ func (f *PowFilter) OnReject(callback func([]byte, error, *peer.Peer)) {
}
// Shutdown shuts down the filter.
func (f *PowFilter) Shutdown() {
f.workerPool.ShutdownGracefully()
}
func (f *PowFilter) Shutdown() {}
func (f *PowFilter) accept(msgBytes []byte, p *peer.Peer) {
f.mu.Lock()
......@@ -98,11 +92,6 @@ func (f *PowFilter) validate(msgBytes []byte) error {
return nil
}
// WorkerPoolStatus returns the name and the load of the workerpool.
func (f *PowFilter) WorkerPoolStatus() (name string, load int) {
return "PowFilter", f.workerPool.RunningWorkers()
}
// powData returns the bytes over which PoW should be computed.
func powData(msgBytes []byte) ([]byte, error) {
contentLength := len(msgBytes) - ed25519.SignatureSize
......
......@@ -4,7 +4,6 @@ import (
"fmt"
"sync"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/bytesfilter"
)
......@@ -17,7 +16,6 @@ type RecentlySeenBytesFilter struct {
bytesFilter *bytesfilter.BytesFilter
onAcceptCallback func(bytes []byte, peer *peer.Peer)
onRejectCallback func(bytes []byte, err error, peer *peer.Peer)
workerPool async.WorkerPool
onAcceptCallbackMutex sync.RWMutex
onRejectCallbackMutex sync.RWMutex
......@@ -31,13 +29,11 @@ func NewRecentlySeenBytesFilter() *RecentlySeenBytesFilter {
}
func (filter *RecentlySeenBytesFilter) Filter(bytes []byte, peer *peer.Peer) {
// filter.workerPool.Submit(func() {
if filter.bytesFilter.Add(bytes) {
filter.getAcceptCallback()(bytes, peer)
return
}
filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes, peer)
// })
}
func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte, peer *peer.Peer)) {
......@@ -66,11 +62,4 @@ func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes []
return
}
func (filter *RecentlySeenBytesFilter) Shutdown() {
filter.workerPool.ShutdownGracefully()
}
// WorkerPoolStatus returns the name and the load of the workerpool.
func (filter *RecentlySeenBytesFilter) WorkerPoolStatus() (name string, load int) {
return "RecentlySeenBytesFilter", filter.workerPool.RunningWorkers()
}
func (filter *RecentlySeenBytesFilter) Shutdown() {}
......@@ -292,9 +292,14 @@ func marshal(packet pb.Packet) []byte {
return append([]byte{byte(packetType)}, data...)
}
// WorkerPoolStatus returns the name and the load of the workerpool.
func (m *Manager) WorkerPoolStatus() (name string, load int) {
return "InboxWorkerPool", m.messageWorkerPool.GetPendingQueueSize() + m.messageRequestWorkerPool.GetPendingQueueSize()
// MessageWorkerPoolStatus returns the name and the load of the workerpool.
func (m *Manager) MessageWorkerPoolStatus() (name string, load int) {
return "messageWorkerPool", m.messageWorkerPool.GetPendingQueueSize()
}
// MessageRequestWorkerPoolStatus returns the name and the load of the workerpool.
func (m *Manager) MessageRequestWorkerPoolStatus() (name string, load int) {
return "messageRequestWorkerPool", m.messageRequestWorkerPool.GetPendingQueueSize()
}
func (m *Manager) processPacketMessage(data []byte, nbr *Neighbor) {
......
package prometheus
import (
"github.com/iotaledger/goshimmer/dapps/valuetransfers"
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/prometheus/client_golang/prometheus"
......@@ -28,7 +27,12 @@ func registerWorkerpoolMetrics() {
}
func collectWorkerpoolMetrics() {
name, load := gossip.Manager().WorkerPoolStatus()
name, load := gossip.Manager().MessageWorkerPoolStatus()
workerpools.WithLabelValues(
name,
).Set(float64(load))
name, load = gossip.Manager().MessageRequestWorkerPoolStatus()
workerpools.WithLabelValues(
name,
).Set(float64(load))
......@@ -42,32 +46,4 @@ func collectWorkerpoolMetrics() {
workerpools.WithLabelValues(
name,
).Set(float64(load))
if messagelayer.MessageParser().MessageSignatureFilter != nil {
name, load = messagelayer.MessageParser().MessageSignatureFilter.WorkerPoolStatus()
workerpools.WithLabelValues(
name,
).Set(float64(load))
}
if valuetransfers.SignatureFilter != nil {
name, load = valuetransfers.SignatureFilter.WorkerPoolStatus()
workerpools.WithLabelValues(
name,
).Set(float64(load))
}
if messagelayer.MessageParser().RecentlySeenBytesFilter != nil {
name, load = messagelayer.MessageParser().RecentlySeenBytesFilter.WorkerPoolStatus()
workerpools.WithLabelValues(
name,
).Set(float64(load))
}
if messagelayer.MessageParser().PowFilter != nil {
name, load = messagelayer.MessageParser().PowFilter.WorkerPoolStatus()
workerpools.WithLabelValues(
name,
).Set(float64(load))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment