Unverified Commit 439e8ab8 authored by Angelo Capossele, committed by GitHub

Various sync related fixes (#650)

* :mag: Add workerpools load on Prometheus

* :bug: Fix bug

* :zap: Change unlock order in MessageRequester StartRequest

* :recycle: Refactor gossip manager workerpools

* :recycle: Disable filter workerpools

* :recycle: Increase WorkerPools size

* :recycle: Increase docker shutdown timeout

* :recycle: Clean up code

* :recycle: Refactor

* :heavy_minus_sign: Remove filter shutdown

* :zap: Increase messageRequestWorkerCount
parent 70cfe4ee
Showing changed files with 199 additions and 139 deletions
@@ -7,7 +7,6 @@ import (
 	"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload"
 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/messageparser"
-	"github.com/iotaledger/hive.go/async"
 	"github.com/iotaledger/hive.go/autopeering/peer"
 )

@@ -17,7 +16,6 @@ type SignatureFilter struct {
 	onRejectCallback      func(message *message.Message, err error, peer *peer.Peer)
 	onAcceptCallbackMutex sync.RWMutex
 	onRejectCallbackMutex sync.RWMutex
-	workerPool            async.WorkerPool
 }
 // NewSignatureFilter is the constructor of the MessageFilter.

@@ -28,33 +26,31 @@ func NewSignatureFilter() *SignatureFilter {
 // Filter get's called whenever a new message is received. It rejects the message, if the message is not a valid value
 // message.
 func (filter *SignatureFilter) Filter(message *message.Message, peer *peer.Peer) {
-	filter.workerPool.Submit(func() {
-		// accept message if the message is not a value message (it will be checked by other filters)
-		valuePayload := message.Payload()
-		if valuePayload.Type() != payload.Type {
-			filter.getAcceptCallback()(message, peer)
-			return
-		}
-
-		// reject if the payload can not be casted to a ValuePayload (invalid payload)
-		typeCastedValuePayload, ok := valuePayload.(*payload.Payload)
-		if !ok {
-			filter.getRejectCallback()(message, errors.New("invalid value message"), peer)
-			return
-		}
-
-		// reject message if it contains a transaction with invalid signatures
-		if !typeCastedValuePayload.Transaction().SignaturesValid() {
-			filter.getRejectCallback()(message, errors.New("invalid transaction signatures"), peer)
-			return
-		}
-
-		// if all previous checks passed: accept message
-		filter.getAcceptCallback()(message, peer)
-	})
+	// accept message if the message is not a value message (it will be checked by other filters)
+	valuePayload := message.Payload()
+	if valuePayload.Type() != payload.Type {
+		filter.getAcceptCallback()(message, peer)
+		return
+	}
+
+	// reject if the payload can not be casted to a ValuePayload (invalid payload)
+	typeCastedValuePayload, ok := valuePayload.(*payload.Payload)
+	if !ok {
+		filter.getRejectCallback()(message, errors.New("invalid value message"), peer)
+		return
+	}
+
+	// reject message if it contains a transaction with invalid signatures
+	if !typeCastedValuePayload.Transaction().SignaturesValid() {
+		filter.getRejectCallback()(message, errors.New("invalid transaction signatures"), peer)
+		return
+	}
+
+	// if all previous checks passed: accept message
+	filter.getAcceptCallback()(message, peer)
 }
 // OnAccept registers the given callback as the acceptance function of the filter.

@@ -73,11 +69,6 @@ func (filter *SignatureFilter) OnReject(callback func(message *message.Message,
 	filter.onRejectCallback = callback
 }

-// Shutdown shuts down the filter.
-func (filter *SignatureFilter) Shutdown() {
-	filter.workerPool.ShutdownGracefully()
-}
-
 // getAcceptCallback returns the callback that is executed when a message passes the filter.
 func (filter *SignatureFilter) getAcceptCallback() func(message *message.Message, peer *peer.Peer) {
 	filter.onAcceptCallbackMutex.RLock()
...
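The same unwrapping repeats in every filter this commit touches: with the per-filter async.WorkerPool removed, a filter runs inline in its caller, so a slow check now blocks the producer and exerts backpressure instead of piling closures into an unbounded queue. A minimal, self-contained sketch of that difference (illustrative names, not code from this commit):

package main

import (
	"fmt"
	"time"
)

// slowFilter stands in for signature or PoW validation work.
func slowFilter(msg int) { time.Sleep(10 * time.Millisecond); _ = msg }

func main() {
	// Asynchronous hand-off: a Submit-style dispatch returns immediately,
	// so producers can outrun the worker and queued work grows.
	queue := make(chan int, 4)
	done := make(chan struct{})
	go func() {
		for msg := range queue {
			slowFilter(msg)
		}
		close(done)
	}()
	for i := 0; i < 4; i++ {
		queue <- i // returns immediately while the buffer has room
	}
	fmt.Println("queued while the worker is busy:", len(queue))
	close(queue)
	<-done

	// Inline call: the caller pays for the work, which propagates
	// backpressure all the way back to the network read loop.
	start := time.Now()
	for i := 0; i < 4; i++ {
		slowFilter(i)
	}
	fmt.Println("inline filtering took:", time.Since(start))
}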
@@ -4,10 +4,8 @@ import (
 	"fmt"
 	"sync"

-	"github.com/iotaledger/hive.go/async"
-	"github.com/iotaledger/hive.go/autopeering/peer"
-
 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
+	"github.com/iotaledger/hive.go/autopeering/peer"
 )

 // ErrInvalidSignature is returned when a message contains an invalid signature.

@@ -17,7 +15,6 @@ var ErrInvalidSignature = fmt.Errorf("invalid signature")
 type MessageSignatureFilter struct {
 	onAcceptCallback func(msg *message.Message, peer *peer.Peer)
 	onRejectCallback func(msg *message.Message, err error, peer *peer.Peer)
-	workerPool       async.WorkerPool

 	onAcceptCallbackMutex sync.RWMutex
 	onRejectCallbackMutex sync.RWMutex

@@ -28,32 +25,30 @@ func NewMessageSignatureFilter() *MessageSignatureFilter {
 	return &MessageSignatureFilter{}
 }

+// Filter filters up on the given bytes and peer and calls the acceptance callback
+// if the input passes or the rejection callback if the input is rejected.
 func (filter *MessageSignatureFilter) Filter(msg *message.Message, peer *peer.Peer) {
-	filter.workerPool.Submit(func() {
-		if msg.VerifySignature() {
-			filter.getAcceptCallback()(msg, peer)
-			return
-		}
-		filter.getRejectCallback()(msg, ErrInvalidSignature, peer)
-	})
+	if msg.VerifySignature() {
+		filter.getAcceptCallback()(msg, peer)
+		return
+	}
+	filter.getRejectCallback()(msg, ErrInvalidSignature, peer)
 }

+// OnAccept registers the given callback as the acceptance function of the filter.
 func (filter *MessageSignatureFilter) OnAccept(callback func(msg *message.Message, peer *peer.Peer)) {
 	filter.onAcceptCallbackMutex.Lock()
 	filter.onAcceptCallback = callback
 	filter.onAcceptCallbackMutex.Unlock()
 }

+// OnReject registers the given callback as the rejection function of the filter.
 func (filter *MessageSignatureFilter) OnReject(callback func(msg *message.Message, err error, peer *peer.Peer)) {
 	filter.onRejectCallbackMutex.Lock()
 	filter.onRejectCallback = callback
 	filter.onRejectCallbackMutex.Unlock()
 }

-func (filter *MessageSignatureFilter) Shutdown() {
-	filter.workerPool.ShutdownGracefully()
-}
-
 func (filter *MessageSignatureFilter) getAcceptCallback() (result func(msg *message.Message, peer *peer.Peer)) {
 	filter.onAcceptCallbackMutex.RLock()
 	result = filter.onAcceptCallback
...
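One detail these filters keep: getAcceptCallback and getRejectCallback copy the callback under a read lock, and the caller invokes it outside the lock, so OnAccept/OnReject never block behind a slow callback. A generic sketch of the pattern (hypothetical names, not code from this repository):

package main

import (
	"fmt"
	"sync"
)

// callbackHolder models the read-copy pattern used by the filters: the
// callback is read under RLock and invoked after the lock is released.
type callbackHolder struct {
	mu sync.RWMutex
	cb func(string)
}

func (h *callbackHolder) set(cb func(string)) {
	h.mu.Lock()
	h.cb = cb
	h.mu.Unlock()
}

func (h *callbackHolder) get() func(string) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.cb
}

func main() {
	h := &callbackHolder{}
	h.set(func(msg string) { fmt.Println("accepted:", msg) })
	h.get()("hello") // the lock is held only while reading the pointer
}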
@@ -7,7 +7,6 @@ import (
 	"sync"

 	"github.com/iotaledger/goshimmer/packages/pow"
-	"github.com/iotaledger/hive.go/async"
 	"github.com/iotaledger/hive.go/autopeering/peer"
 )

@@ -22,7 +21,6 @@ var (
 type PowFilter struct {
 	worker     *pow.Worker
 	difficulty int
-	workerPool async.WorkerPool

 	mu             sync.Mutex
 	acceptCallback func([]byte, *peer.Peer)

@@ -39,13 +37,11 @@ func NewPowFilter(worker *pow.Worker, difficulty int) *PowFilter {
 // Filter checks whether the given bytes pass the PoW validation and calls the corresponding callback.
 func (f *PowFilter) Filter(msgBytes []byte, p *peer.Peer) {
-	f.workerPool.Submit(func() {
-		if err := f.validate(msgBytes); err != nil {
-			f.reject(msgBytes, err, p)
-			return
-		}
-		f.accept(msgBytes, p)
-	})
+	if err := f.validate(msgBytes); err != nil {
+		f.reject(msgBytes, err, p)
+		return
+	}
+	f.accept(msgBytes, p)
 }

 // OnAccept registers the given callback as the acceptance function of the filter.

@@ -62,11 +58,6 @@ func (f *PowFilter) OnReject(callback func([]byte, error, *peer.Peer)) {
 	f.rejectCallback = callback
 }

-// Shutdown shuts down the filter.
-func (f *PowFilter) Shutdown() {
-	f.workerPool.ShutdownGracefully()
-}
-
 func (f *PowFilter) accept(msgBytes []byte, p *peer.Peer) {
 	f.mu.Lock()
 	defer f.mu.Unlock()
...
@@ -18,15 +18,14 @@ import (
 )

 var (
 	testPayload    = payload.NewData([]byte("test"))
-	testPeer       *peer.Peer = nil
+	testPeer       *peer.Peer
 	testWorker     = pow.New(crypto.BLAKE2b_512, 1)
 	testDifficulty = 10
 )

 func TestPowFilter_Filter(t *testing.T) {
 	filter := NewPowFilter(testWorker, testDifficulty)
-	defer filter.Shutdown()

 	// set callbacks
 	m := &callbackMock{}

@@ -61,7 +60,6 @@ func TestPowFilter_Filter(t *testing.T) {
 		filter.Filter(msgPOWBytes, testPeer)
 	})

-	filter.Shutdown()
 	m.AssertExpectations(t)
 }
...
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"sync"

-	"github.com/iotaledger/hive.go/async"
 	"github.com/iotaledger/hive.go/autopeering/peer"
 	"github.com/iotaledger/hive.go/bytesfilter"
 )

@@ -17,7 +16,6 @@ type RecentlySeenBytesFilter struct {
 	bytesFilter      *bytesfilter.BytesFilter
 	onAcceptCallback func(bytes []byte, peer *peer.Peer)
 	onRejectCallback func(bytes []byte, err error, peer *peer.Peer)
-	workerPool       async.WorkerPool

 	onAcceptCallbackMutex sync.RWMutex
 	onRejectCallbackMutex sync.RWMutex

@@ -30,22 +28,24 @@ func NewRecentlySeenBytesFilter() *RecentlySeenBytesFilter {
 	}
 }

+// Filter filters up on the given bytes and peer and calls the acceptance callback
+// if the input passes or the rejection callback if the input is rejected.
 func (filter *RecentlySeenBytesFilter) Filter(bytes []byte, peer *peer.Peer) {
-	filter.workerPool.Submit(func() {
-		if filter.bytesFilter.Add(bytes) {
-			filter.getAcceptCallback()(bytes, peer)
-			return
-		}
-		filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes, peer)
-	})
+	if filter.bytesFilter.Add(bytes) {
+		filter.getAcceptCallback()(bytes, peer)
+		return
+	}
+	filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes, peer)
 }

+// OnAccept registers the given callback as the acceptance function of the filter.
 func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte, peer *peer.Peer)) {
 	filter.onAcceptCallbackMutex.Lock()
 	filter.onAcceptCallback = callback
 	filter.onAcceptCallbackMutex.Unlock()
 }

+// OnReject registers the given callback as the rejection function of the filter.
 func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte, err error, peer *peer.Peer)) {
 	filter.onRejectCallbackMutex.Lock()
 	filter.onRejectCallback = callback

@@ -65,7 +65,3 @@ func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes []
 	filter.onRejectCallbackMutex.Unlock()
 	return
 }
-
-func (filter *RecentlySeenBytesFilter) Shutdown() {
-	filter.workerPool.ShutdownGracefully()
-}
@@ -13,6 +13,4 @@ type BytesFilter interface {
 	OnAccept(callback func(bytes []byte, peer *peer.Peer))
 	// OnReject registers the given callback as the rejection function of the filter.
 	OnReject(callback func(bytes []byte, err error, peer *peer.Peer))
-	// Shutdown shuts down the filter.
-	Shutdown()
 }
@@ -15,6 +15,4 @@ type MessageFilter interface {
 	OnAccept(callback func(msg *message.Message, peer *peer.Peer))
 	// OnAccept registers the given callback as the rejection function of the filter.
 	OnReject(callback func(msg *message.Message, err error, peer *peer.Peer))
-	// Shutdown shuts down the filter.
-	Shutdown()
 }
@@ -70,21 +70,6 @@ func (messageParser *MessageParser) AddMessageFilter(filter MessageFilter) {
 	messageParser.messageFiltersModified.Set()
 }

-// Shutdown shut downs the message parser and its corresponding registered filters.
-func (messageParser *MessageParser) Shutdown() {
-	messageParser.bytesFiltersMutex.Lock()
-	for _, bytesFilter := range messageParser.bytesFilters {
-		bytesFilter.Shutdown()
-	}
-	messageParser.bytesFiltersMutex.Unlock()
-
-	messageParser.messageFiltersMutex.Lock()
-	for _, messageFilter := range messageParser.messageFilters {
-		messageFilter.Shutdown()
-	}
-	messageParser.messageFiltersMutex.Unlock()
-}
-
 // sets up the byte filter data flow chain.
 func (messageParser *MessageParser) setupBytesFilterDataFlow() {
 	if !messageParser.byteFiltersModified.IsSet() {
...
@@ -22,8 +22,6 @@ func BenchmarkMessageParser_ParseBytesSame(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		msgParser.Parse(msgBytes, nil)
 	}
-
-	msgParser.Shutdown()
 }

 func BenchmarkMessageParser_ParseBytesDifferent(b *testing.B) {

@@ -39,8 +37,6 @@ func BenchmarkMessageParser_ParseBytesDifferent(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		msgParser.Parse(messageBytes[i], nil)
 	}
-
-	msgParser.Shutdown()
 }

 func TestMessageParser_ParseMessage(t *testing.T) {

@@ -52,8 +48,6 @@ func TestMessageParser_ParseMessage(t *testing.T) {
 	msgParser.Events.MessageParsed.Attach(events.NewClosure(func(msg *message.Message) {
 		log.Infof("parsed message")
 	}))
-
-	msgParser.Shutdown()
 }

 func newTestMessage(payloadString string) *message.Message {
...
@@ -33,17 +33,17 @@ func New(optionalOptions ...Option) *MessageRequester {
 // StartRequest initiates a regular triggering of the StartRequest event until it has been stopped using StopRequest.
 func (requester *MessageRequester) StartRequest(id message.Id) {
 	requester.scheduledRequestsMutex.Lock()
-	defer requester.scheduledRequestsMutex.Unlock()

 	// ignore already scheduled requests
 	if _, exists := requester.scheduledRequests[id]; exists {
+		requester.scheduledRequestsMutex.Unlock()
 		return
 	}

-	// trigger the event and schedule the next request
-	// make this atomic to be sure that a successive call of StartRequest does not trigger again
-	requester.Events.SendRequest.Trigger(id)
+	// schedule the next request and trigger the event
 	requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id) })
+	requester.scheduledRequestsMutex.Unlock()
+
+	requester.Events.SendRequest.Trigger(id)
 }

 // StopRequest stops requests for the given message to further happen.
...
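This hunk is the ":zap: Change unlock order in MessageRequester StartRequest" fix. The request is still recorded in scheduledRequests while the mutex is held, so a racing StartRequest for the same id cannot trigger twice, but SendRequest now fires only after the mutex is released: an event handler that re-enters the requester would otherwise self-deadlock on the non-reentrant mutex. A hypothetical minimal model of the hazard:

package main

import "sync"

// requester is a stripped-down, hypothetical model of MessageRequester,
// showing why the event is triggered only after the mutex is released.
type requester struct {
	mu        sync.Mutex
	scheduled map[string]bool
	onRequest func(id string)
}

func (r *requester) StartRequest(id string) {
	r.mu.Lock()
	if r.scheduled[id] {
		r.mu.Unlock()
		return
	}
	// record the request while still holding the lock, so a concurrent
	// StartRequest for the same id cannot trigger a second time
	r.scheduled[id] = true
	r.mu.Unlock()

	r.onRequest(id) // safe: the handler may re-enter StartRequest or StopRequest
}

func (r *requester) StopRequest(id string) {
	r.mu.Lock()
	delete(r.scheduled, id)
	r.mu.Unlock()
}

func main() {
	r := &requester{scheduled: make(map[string]bool)}
	// under the old order (Trigger before Unlock) this callback would deadlock
	r.onRequest = func(id string) { r.StopRequest(id) }
	r.StartRequest("msg-1")
}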
@@ -2,7 +2,6 @@ package tangle

 import (
 	"container/list"
-	"runtime"
 	"time"

 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"

@@ -57,7 +56,8 @@ func New(store kvstore.KVStore) (result *Tangle) {
 		Events: *newEvents(),
 	}

-	result.solidifierWorkerPool.Tune(runtime.GOMAXPROCS(0))
+	result.solidifierWorkerPool.Tune(1024)
+	result.storeMessageWorkerPool.Tune(1024)

 	return
 }

@@ -294,3 +294,13 @@ func (tangle *Tangle) deleteFutureCone(messageId message.Id) {
 		})
 	}
 }
+
+// SolidifierWorkerPoolStatus returns the name and the load of the workerpool.
+func (tangle *Tangle) SolidifierWorkerPoolStatus() (name string, load int) {
+	return "Solidifier", tangle.solidifierWorkerPool.RunningWorkers()
+}
+
+// StoreMessageWorkerPoolStatus returns the name and the load of the workerpool.
+func (tangle *Tangle) StoreMessageWorkerPoolStatus() (name string, load int) {
+	return "StoreMessage", tangle.storeMessageWorkerPool.RunningWorkers()
+}
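The Tune calls implement the ":recycle: Increase WorkerPools size" item from the commit message: the solidifier and store-message pools grow from GOMAXPROCS(0) workers to 1024, so many blocking tasks can be in flight while the node syncs. A standalone sketch of the async.WorkerPool calls as they appear in this commit (pool size and task body are illustrative; this assumes hive.go's async API behaves as the surrounding diff uses it):

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/iotaledger/hive.go/async"
)

func main() {
	// the zero value is usable, as in the filter structs this commit removes it from
	var pool async.WorkerPool
	pool.Tune(8) // cap the pool at 8 concurrent worker goroutines

	var wg sync.WaitGroup
	for i := 0; i < 32; i++ {
		wg.Add(1)
		pool.Submit(func() {
			defer wg.Done()
			time.Sleep(50 * time.Millisecond) // stand-in for a blocking task
		})
	}
	wg.Wait()

	// RunningWorkers is the same figure the Tangle status methods report as load
	fmt.Println("running workers:", pool.RunningWorkers())
	pool.ShutdownGracefully()
}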
@@ -3,17 +3,18 @@ package gossip

 import (
 	"fmt"
 	"net"
+	"runtime"
 	"sync"

 	"github.com/golang/protobuf/proto"
 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
 	pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
 	"github.com/iotaledger/goshimmer/packages/gossip/server"
-	"github.com/iotaledger/hive.go/async"
 	"github.com/iotaledger/hive.go/autopeering/peer"
 	"github.com/iotaledger/hive.go/events"
 	"github.com/iotaledger/hive.go/identity"
 	"github.com/iotaledger/hive.go/logger"
+	"github.com/iotaledger/hive.go/workerpool"
 )

 const (

@@ -21,6 +22,14 @@ const (
 	maxPacketSize = 65 * 1024
 )

+var (
+	messageWorkerCount            = runtime.GOMAXPROCS(0) * 4
+	messageWorkerQueueSize        = 1000
+	messageRequestWorkerCount     = runtime.GOMAXPROCS(0)
+	messageRequestWorkerQueueSize = 100
+)
+
 // LoadMessageFunc defines a function that returns the message for the given id.
 type LoadMessageFunc func(messageId message.Id) ([]byte, error)

@@ -37,8 +46,10 @@ type Manager struct {
 	srv       *server.TCP
 	neighbors map[identity.ID]*Neighbor

-	// inboxWorkerPool defines a worker pool where all incoming messages are processed.
-	inboxWorkerPool async.WorkerPool
+	// messageWorkerPool defines a worker pool where all incoming messages are processed.
+	messageWorkerPool *workerpool.WorkerPool
+
+	messageRequestWorkerPool *workerpool.WorkerPool
 }

 // NewManager creates a new Manager.

@@ -56,6 +67,21 @@ func NewManager(local *peer.Local, f LoadMessageFunc, log *logger.Logger) *Manag
 		srv:       nil,
 		neighbors: make(map[identity.ID]*Neighbor),
 	}
+
+	m.messageWorkerPool = workerpool.New(func(task workerpool.Task) {
+		m.processPacketMessage(task.Param(0).([]byte), task.Param(1).(*Neighbor))
+
+		task.Return(nil)
+	}, workerpool.WorkerCount(messageWorkerCount), workerpool.QueueSize(messageWorkerQueueSize))
+
+	m.messageRequestWorkerPool = workerpool.New(func(task workerpool.Task) {
+		m.processMessageRequest(task.Param(0).([]byte), task.Param(1).(*Neighbor))
+
+		task.Return(nil)
+	}, workerpool.WorkerCount(messageRequestWorkerCount), workerpool.QueueSize(messageRequestWorkerQueueSize))
+
 	return m
 }

@@ -65,6 +91,9 @@ func (m *Manager) Start(srv *server.TCP) {
 	defer m.mu.Unlock()

 	m.srv = srv
+
+	m.messageWorkerPool.Start()
+	m.messageRequestWorkerPool.Start()
 }

 // Close stops the manager and closes all established connections.

@@ -72,7 +101,8 @@ func (m *Manager) Close() {
 	m.stop()
 	m.wg.Wait()

-	m.inboxWorkerPool.ShutdownGracefully()
+	m.messageWorkerPool.Stop()
+	m.messageRequestWorkerPool.Stop()
 }

 // Events returns the events related to the gossip protocol.

@@ -214,11 +244,9 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n
 	nbr.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) {
 		dataCopy := make([]byte, len(data))
 		copy(dataCopy, data)
-		m.inboxWorkerPool.Submit(func() {
-			if err := m.handlePacket(dataCopy, nbr); err != nil {
-				m.log.Debugw("error handling packet", "err", err)
-			}
-		})
+		if err := m.handlePacket(dataCopy, nbr); err != nil {
+			m.log.Debugw("error handling packet", "err", err)
+		}
 	}))

 	m.neighbors[peer.ID()] = nbr

@@ -237,31 +265,14 @@ func (m *Manager) handlePacket(data []byte, nbr *Neighbor) error {
 	switch pb.PacketType(data[0]) {
 	case pb.PacketMessage:
-		packet := new(pb.Message)
-		if err := proto.Unmarshal(data[1:], packet); err != nil {
-			return fmt.Errorf("invalid packet: %w", err)
-		}
-		m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: packet.GetData(), Peer: nbr.Peer})
+		if _, added := m.messageWorkerPool.TrySubmit(data, nbr); !added {
+			return fmt.Errorf("messageWorkerPool full: packet message discarded")
+		}

 	case pb.PacketMessageRequest:
-		packet := new(pb.MessageRequest)
-		if err := proto.Unmarshal(data[1:], packet); err != nil {
-			return fmt.Errorf("invalid packet: %w", err)
-		}
-
-		msgID, _, err := message.IdFromBytes(packet.GetId())
-		if err != nil {
-			return fmt.Errorf("invalid message id: %w", err)
-		}
-
-		msgBytes, err := m.loadMessageFunc(msgID)
-		if err != nil {
-			m.log.Debugw("error loading message", "msg-id", msgID, "err", err)
-			return nil
-		}
-
-		// send the loaded message directly to the neighbor
-		_, _ = nbr.Write(marshal(&pb.Message{Data: msgBytes}))
+		if _, added := m.messageRequestWorkerPool.TrySubmit(data, nbr); !added {
+			return fmt.Errorf("messageRequestWorkerPool full: message request discarded")
+		}

 	default:
 		return ErrInvalidPacket
 	}

@@ -281,3 +292,41 @@ func marshal(packet pb.Packet) []byte {
 	}
 	return append([]byte{byte(packetType)}, data...)
 }
+
+// MessageWorkerPoolStatus returns the name and the load of the workerpool.
+func (m *Manager) MessageWorkerPoolStatus() (name string, load int) {
+	return "messageWorkerPool", m.messageWorkerPool.GetPendingQueueSize()
+}
+
+// MessageRequestWorkerPoolStatus returns the name and the load of the workerpool.
+func (m *Manager) MessageRequestWorkerPoolStatus() (name string, load int) {
+	return "messageRequestWorkerPool", m.messageRequestWorkerPool.GetPendingQueueSize()
+}
+
+func (m *Manager) processPacketMessage(data []byte, nbr *Neighbor) {
+	packet := new(pb.Message)
+	if err := proto.Unmarshal(data[1:], packet); err != nil {
+		m.log.Debugw("error processing packet", "err", err)
+	}
+	m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: packet.GetData(), Peer: nbr.Peer})
+}
+
+func (m *Manager) processMessageRequest(data []byte, nbr *Neighbor) {
+	packet := new(pb.MessageRequest)
+	if err := proto.Unmarshal(data[1:], packet); err != nil {
+		m.log.Debugw("invalid packet", "err", err)
+	}
+
+	msgID, _, err := message.IdFromBytes(packet.GetId())
+	if err != nil {
+		m.log.Debugw("invalid message id:", "err", err)
+	}
+
+	msgBytes, err := m.loadMessageFunc(msgID)
+	if err != nil {
+		m.log.Debugw("error loading message", "msg-id", msgID, "err", err)
+	}
+
+	// send the loaded message directly to the neighbor
+	_, _ = nbr.Write(marshal(&pb.Message{Data: msgBytes}))
+}
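Taken together, the gossip hunks replace the single unbounded inbox pool with two bounded workerpool.WorkerPool instances: handlePacket now only classifies the packet, and TrySubmit either enqueues it or reports the queue as full, so an overloaded node sheds packets instead of stalling the read loop; GetPendingQueueSize is the figure the Prometheus plugin below exports as load. A self-contained usage sketch of this hive.go API (worker count, queue size, and payloads are illustrative):

package main

import (
	"fmt"
	"time"

	"github.com/iotaledger/hive.go/workerpool"
)

func main() {
	// one worker and a queue of two, so the drop path is easy to hit
	wp := workerpool.New(func(task workerpool.Task) {
		time.Sleep(100 * time.Millisecond) // stand-in for packet processing
		fmt.Println("processed", task.Param(0).(string))
		task.Return(nil)
	}, workerpool.WorkerCount(1), workerpool.QueueSize(2))

	wp.Start()
	defer wp.Stop()

	for i := 0; i < 5; i++ {
		if _, added := wp.TrySubmit(fmt.Sprintf("packet-%d", i)); !added {
			// this is where handlePacket returns its "pool full" error
			fmt.Println("queue full, dropped packet", i)
		}
	}
	fmt.Println("pending:", wp.GetPendingQueueSize()) // exported as load
	time.Sleep(time.Second)                           // let the worker drain before Stop
}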
@@ -132,7 +132,6 @@ func run(*node.Plugin) {
 	if err := daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) {
 		<-shutdownSignal
 		messageFactory.Shutdown()
-		messageParser.Shutdown()
 		_tangle.Shutdown()
 	}, shutdown.PriorityTangle); err != nil {
 		log.Panicf("Failed to start as daemon: %s", err)
...
@@ -11,6 +11,8 @@ const (
 	CfgPrometheusProcessMetrics = "prometheus.processMetrics"
 	// CfgPrometheusPromhttpMetrics defines the config flag to enable/disable promhttp metrics.
 	CfgPrometheusPromhttpMetrics = "prometheus.promhttpMetrics"
+	// CfgPrometheusWorkerpoolMetrics defines the config flag to enable/disable workerpool metrics.
+	CfgPrometheusWorkerpoolMetrics = "prometheus.workerpoolMetrics"
 	// CfgPrometheusBindAddress defines the config flag of the bind address on which the Prometheus exporter listens on.
 	CfgPrometheusBindAddress = "prometheus.bindAddress"
 )

@@ -20,4 +22,5 @@ func init() {
 	flag.Bool(CfgPrometheusGoMetrics, false, "include go metrics")
 	flag.Bool(CfgPrometheusProcessMetrics, false, "include process metrics")
 	flag.Bool(CfgPrometheusPromhttpMetrics, false, "include promhttp metrics")
+	flag.Bool(CfgPrometheusWorkerpoolMetrics, false, "include workerpool metrics")
 }
@@ -29,6 +29,10 @@ var (
 func configure(plugin *node.Plugin) {
 	log = logger.NewLogger(plugin.Name)

+	if config.Node().GetBool(CfgPrometheusWorkerpoolMetrics) {
+		registerWorkerpoolMetrics()
+	}
+
 	if config.Node().GetBool(CfgPrometheusGoMetrics) {
 		registry.MustRegister(prometheus.NewGoCollector())
 	}
...
package prometheus

import (
	"github.com/iotaledger/goshimmer/plugins/gossip"
	"github.com/iotaledger/goshimmer/plugins/messagelayer"
	"github.com/prometheus/client_golang/prometheus"
)

var (
	workerpools *prometheus.GaugeVec
)

func registerWorkerpoolMetrics() {
	workerpools = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "workerpools_load",
			Help: "Info about workerpools load",
		},
		[]string{
			"name",
		},
	)

	registry.MustRegister(workerpools)
	addCollect(collectWorkerpoolMetrics)
}

func collectWorkerpoolMetrics() {
	name, load := gossip.Manager().MessageWorkerPoolStatus()
	workerpools.WithLabelValues(name).Set(float64(load))

	name, load = gossip.Manager().MessageRequestWorkerPoolStatus()
	workerpools.WithLabelValues(name).Set(float64(load))

	name, load = messagelayer.Tangle().SolidifierWorkerPoolStatus()
	workerpools.WithLabelValues(name).Set(float64(load))

	name, load = messagelayer.Tangle().StoreMessageWorkerPoolStatus()
	workerpools.WithLabelValues(name).Set(float64(load))
}
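With this collector registered (it is only active when the prometheus.workerpoolMetrics flag above is enabled), each pool surfaces as one series of the workerpools_load gauge, keyed by the name label returned from the status methods. The /metrics exposition would then contain lines of roughly this shape; the numeric values here are purely illustrative:

# HELP workerpools_load Info about workerpools load
# TYPE workerpools_load gauge
workerpools_load{name="messageWorkerPool"} 42
workerpools_load{name="messageRequestWorkerPool"} 3
workerpools_load{name="Solidifier"} 17
workerpools_load{name="StoreMessage"} 8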
@@ -214,7 +214,7 @@ func (d *DockerContainer) Remove() error {
 // Stop stops a container without terminating the process.
 // The process is blocked until the container stops or the timeout expires.
 func (d *DockerContainer) Stop() error {
-	duration := 30 * time.Second
+	duration := 3 * time.Minute
 	return d.client.ContainerStop(context.Background(), d.id, &duration)
 }
...