From 439e8ab8c4b7f1f25125d227fd12db749b441453 Mon Sep 17 00:00:00 2001 From: Angelo Capossele <angelocapossele@gmail.com> Date: Mon, 20 Jul 2020 19:04:56 +0100 Subject: [PATCH] Various sync related fixes (#650) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 🔍 Add workerpools load on Prometheus * 🐛 Fix bug * ⚡️ Change unlock order in MessageRequester StartRequest * ♻️ Refactor gossip manager workerpools * ♻️ Disable filter workerpools * ♻️ Increase WorkerPools size * ♻️ Increase docker shutdown timeout * ♻️ Clean up code * ♻️ Refactor * ➖ Remove filter shutdown * ⚡️ Increase messageRequestWorkerCount --- .../packages/tangle/signature_filter.go | 47 +++----- .../message_signature_filter.go | 25 ++-- .../builtinfilters/pow_filter.go | 19 +-- .../builtinfilters/pow_filter_test.go | 10 +- .../recently_seen_bytes_filter.go | 22 ++-- .../messageparser/bytes_filter.go | 2 - .../messageparser/message_filter.go | 2 - .../messageparser/message_parser.go | 15 --- .../messageparser/message_parser_test.go | 6 - .../messagerequester/messagerequester.go | 8 +- packages/binary/messagelayer/tangle/tangle.go | 14 ++- packages/gossip/manager.go | 109 +++++++++++++----- plugins/messagelayer/plugin.go | 1 - plugins/prometheus/parameters.go | 3 + plugins/prometheus/plugin.go | 4 + plugins/prometheus/workerpool.go | 49 ++++++++ .../tester/framework/docker.go | 2 +- 17 files changed, 199 insertions(+), 139 deletions(-) create mode 100644 plugins/prometheus/workerpool.go diff --git a/dapps/valuetransfers/packages/tangle/signature_filter.go b/dapps/valuetransfers/packages/tangle/signature_filter.go index 3720dab3..a7e7e2cc 100644 --- a/dapps/valuetransfers/packages/tangle/signature_filter.go +++ b/dapps/valuetransfers/packages/tangle/signature_filter.go @@ -7,7 +7,6 @@ import ( "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/messageparser" - "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/autopeering/peer" ) @@ -17,7 +16,6 @@ type SignatureFilter struct { onRejectCallback func(message *message.Message, err error, peer *peer.Peer) onAcceptCallbackMutex sync.RWMutex onRejectCallbackMutex sync.RWMutex - workerPool async.WorkerPool } // NewSignatureFilter is the constructor of the MessageFilter. @@ -28,33 +26,31 @@ func NewSignatureFilter() *SignatureFilter { // Filter gets called whenever a new message is received. It rejects the message if the message is not a valid value // message.
func (filter *SignatureFilter) Filter(message *message.Message, peer *peer.Peer) { - filter.workerPool.Submit(func() { - // accept message if the message is not a value message (it will be checked by other filters) - valuePayload := message.Payload() - if valuePayload.Type() != payload.Type { - filter.getAcceptCallback()(message, peer) + // accept message if the message is not a value message (it will be checked by other filters) + valuePayload := message.Payload() + if valuePayload.Type() != payload.Type { + filter.getAcceptCallback()(message, peer) - return - } + return + } - // reject if the payload can not be casted to a ValuePayload (invalid payload) - typeCastedValuePayload, ok := valuePayload.(*payload.Payload) - if !ok { - filter.getRejectCallback()(message, errors.New("invalid value message"), peer) + // reject if the payload cannot be cast to a ValuePayload (invalid payload) + typeCastedValuePayload, ok := valuePayload.(*payload.Payload) + if !ok { + filter.getRejectCallback()(message, errors.New("invalid value message"), peer) - return - } + return + } - // reject message if it contains a transaction with invalid signatures - if !typeCastedValuePayload.Transaction().SignaturesValid() { - filter.getRejectCallback()(message, errors.New("invalid transaction signatures"), peer) + // reject message if it contains a transaction with invalid signatures + if !typeCastedValuePayload.Transaction().SignaturesValid() { + filter.getRejectCallback()(message, errors.New("invalid transaction signatures"), peer) - return - } + return + } - // if all previous checks passed: accept message - filter.getAcceptCallback()(message, peer) - }) + // if all previous checks passed: accept message + filter.getAcceptCallback()(message, peer) } // OnAccept registers the given callback as the acceptance function of the filter. @@ -73,11 +69,6 @@ func (filter *SignatureFilter) OnReject(callback func(message *message.Message, filter.onRejectCallback = callback } -// Shutdown shuts down the filter. -func (filter *SignatureFilter) Shutdown() { - filter.workerPool.ShutdownGracefully() -} - // getAcceptCallback returns the callback that is executed when a message passes the filter. func (filter *SignatureFilter) getAcceptCallback() func(message *message.Message, peer *peer.Peer) { filter.onAcceptCallbackMutex.RLock() diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go index a7195d61..02ba960c 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go @@ -4,10 +4,8 @@ import ( "fmt" "sync" - "github.com/iotaledger/hive.go/async" - "github.com/iotaledger/hive.go/autopeering/peer" - "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" + "github.com/iotaledger/hive.go/autopeering/peer" ) // ErrInvalidSignature is returned when a message contains an invalid signature.
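Note on the filter refactor above: with the per-filter async.WorkerPool removed, Filter now runs its checks inline on the caller's goroutine and dispatches to whichever callback was registered via OnAccept/OnReject; the parallelism moves upstream into the gossip manager's worker pools (see manager.go further down). A minimal sketch of the resulting synchronous accept/reject contract — the toy filter below is illustrative only, not code from this repository:

    package main

    import (
        "errors"
        "fmt"
    )

    type Peer struct{ ID string }

    // toyFilter mirrors the synchronous contract: Filter runs inline and
    // calls exactly one of the two registered callbacks before returning.
    type toyFilter struct {
        accept func(b []byte, p *Peer)
        reject func(b []byte, err error, p *Peer)
    }

    func (f *toyFilter) OnAccept(cb func([]byte, *Peer))        { f.accept = cb }
    func (f *toyFilter) OnReject(cb func([]byte, error, *Peer)) { f.reject = cb }

    func (f *toyFilter) Filter(b []byte, p *Peer) {
        if len(b) == 0 {
            f.reject(b, errors.New("empty payload"), p)
            return
        }
        f.accept(b, p)
    }

    func main() {
        f := &toyFilter{}
        f.OnAccept(func(b []byte, p *Peer) { fmt.Println("accepted", len(b), "bytes from", p.ID) })
        f.OnReject(func(b []byte, err error, p *Peer) { fmt.Println("rejected:", err) })
        f.Filter([]byte("hello"), &Peer{ID: "n1"}) // runs synchronously, no Shutdown needed
    }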
@@ -17,7 +15,6 @@ var ErrInvalidSignature = fmt.Errorf("invalid signature") type MessageSignatureFilter struct { onAcceptCallback func(msg *message.Message, peer *peer.Peer) onRejectCallback func(msg *message.Message, err error, peer *peer.Peer) - workerPool async.WorkerPool onAcceptCallbackMutex sync.RWMutex onRejectCallbackMutex sync.RWMutex @@ -28,32 +25,30 @@ func NewMessageSignatureFilter() *MessageSignatureFilter { return &MessageSignatureFilter{} } +// Filter filters the given message and peer and calls the acceptance callback +// if the input passes, or the rejection callback if the input is rejected. func (filter *MessageSignatureFilter) Filter(msg *message.Message, peer *peer.Peer) { - filter.workerPool.Submit(func() { - if msg.VerifySignature() { - filter.getAcceptCallback()(msg, peer) - return - } - filter.getRejectCallback()(msg, ErrInvalidSignature, peer) - }) + if msg.VerifySignature() { + filter.getAcceptCallback()(msg, peer) + return + } + filter.getRejectCallback()(msg, ErrInvalidSignature, peer) } +// OnAccept registers the given callback as the acceptance function of the filter. func (filter *MessageSignatureFilter) OnAccept(callback func(msg *message.Message, peer *peer.Peer)) { filter.onAcceptCallbackMutex.Lock() filter.onAcceptCallback = callback filter.onAcceptCallbackMutex.Unlock() } +// OnReject registers the given callback as the rejection function of the filter. func (filter *MessageSignatureFilter) OnReject(callback func(msg *message.Message, err error, peer *peer.Peer)) { filter.onRejectCallbackMutex.Lock() filter.onRejectCallback = callback filter.onRejectCallbackMutex.Unlock() } -func (filter *MessageSignatureFilter) Shutdown() { - filter.workerPool.ShutdownGracefully() -} - func (filter *MessageSignatureFilter) getAcceptCallback() (result func(msg *message.Message, peer *peer.Peer)) { filter.onAcceptCallbackMutex.RLock() result = filter.onAcceptCallback diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go index 38f708cb..0147ec37 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/iotaledger/goshimmer/packages/pow" - "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/autopeering/peer" ) @@ -22,7 +21,6 @@ var ( type PowFilter struct { worker *pow.Worker difficulty int - workerPool async.WorkerPool mu sync.Mutex acceptCallback func([]byte, *peer.Peer) @@ -39,13 +37,11 @@ func NewPowFilter(worker *pow.Worker, difficulty int) *PowFilter { // Filter checks whether the given bytes pass the PoW validation and calls the corresponding callback. func (f *PowFilter) Filter(msgBytes []byte, p *peer.Peer) { - f.workerPool.Submit(func() { - if err := f.validate(msgBytes); err != nil { - f.reject(msgBytes, err, p) - return - } - f.accept(msgBytes, p) - }) + if err := f.validate(msgBytes); err != nil { + f.reject(msgBytes, err, p) + return + } + f.accept(msgBytes, p) } // OnAccept registers the given callback as the acceptance function of the filter. @@ -62,11 +58,6 @@ func (f *PowFilter) OnReject(callback func([]byte, error, *peer.Peer)) { f.rejectCallback = callback } -// Shutdown shuts down the filter.
-func (f *PowFilter) Shutdown() { - f.workerPool.ShutdownGracefully() -} - func (f *PowFilter) accept(msgBytes []byte, p *peer.Peer) { f.mu.Lock() defer f.mu.Unlock() diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter_test.go b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter_test.go index fe53a9d7..b348247b 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter_test.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter_test.go @@ -18,15 +18,14 @@ import ( ) var ( - testPayload = payload.NewData([]byte("test")) - testPeer *peer.Peer = nil - testWorker = pow.New(crypto.BLAKE2b_512, 1) - testDifficulty = 10 + testPayload = payload.NewData([]byte("test")) + testPeer *peer.Peer + testWorker = pow.New(crypto.BLAKE2b_512, 1) + testDifficulty = 10 ) func TestPowFilter_Filter(t *testing.T) { filter := NewPowFilter(testWorker, testDifficulty) - defer filter.Shutdown() // set callbacks m := &callbackMock{} @@ -61,7 +60,6 @@ func TestPowFilter_Filter(t *testing.T) { filter.Filter(msgPOWBytes, testPeer) }) - filter.Shutdown() m.AssertExpectations(t) } diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go index 45b3bfee..855d7942 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go @@ -4,7 +4,6 @@ import ( "fmt" "sync" - "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/bytesfilter" ) @@ -17,7 +16,6 @@ type RecentlySeenBytesFilter struct { bytesFilter *bytesfilter.BytesFilter onAcceptCallback func(bytes []byte, peer *peer.Peer) onRejectCallback func(bytes []byte, err error, peer *peer.Peer) - workerPool async.WorkerPool onAcceptCallbackMutex sync.RWMutex onRejectCallbackMutex sync.RWMutex @@ -30,22 +28,24 @@ func NewRecentlySeenBytesFilter() *RecentlySeenBytesFilter { } } +// Filter filters the given bytes and peer and calls the acceptance callback +// if the input passes, or the rejection callback if the input is rejected. func (filter *RecentlySeenBytesFilter) Filter(bytes []byte, peer *peer.Peer) { - filter.workerPool.Submit(func() { - if filter.bytesFilter.Add(bytes) { - filter.getAcceptCallback()(bytes, peer) - return - } - filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes, peer) - }) + if filter.bytesFilter.Add(bytes) { + filter.getAcceptCallback()(bytes, peer) + return + } + filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes, peer) } +// OnAccept registers the given callback as the acceptance function of the filter. func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte, peer *peer.Peer)) { filter.onAcceptCallbackMutex.Lock() filter.onAcceptCallback = callback filter.onAcceptCallbackMutex.Unlock() } +// OnReject registers the given callback as the rejection function of the filter.
func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte, err error, peer *peer.Peer)) { filter.onRejectCallbackMutex.Lock() filter.onRejectCallback = callback @@ -65,7 +65,3 @@ func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes [] filter.onRejectCallbackMutex.Unlock() return } - -func (filter *RecentlySeenBytesFilter) Shutdown() { - filter.workerPool.ShutdownGracefully() -} diff --git a/packages/binary/messagelayer/messageparser/bytes_filter.go b/packages/binary/messagelayer/messageparser/bytes_filter.go index 96d928e5..7c62a459 100644 --- a/packages/binary/messagelayer/messageparser/bytes_filter.go +++ b/packages/binary/messagelayer/messageparser/bytes_filter.go @@ -13,6 +13,4 @@ type BytesFilter interface { OnAccept(callback func(bytes []byte, peer *peer.Peer)) // OnReject registers the given callback as the rejection function of the filter. OnReject(callback func(bytes []byte, err error, peer *peer.Peer)) - // Shutdown shuts down the filter. - Shutdown() } diff --git a/packages/binary/messagelayer/messageparser/message_filter.go b/packages/binary/messagelayer/messageparser/message_filter.go index f303a5b6..b883f3d1 100644 --- a/packages/binary/messagelayer/messageparser/message_filter.go +++ b/packages/binary/messagelayer/messageparser/message_filter.go @@ -15,6 +15,4 @@ type MessageFilter interface { OnAccept(callback func(msg *message.Message, peer *peer.Peer)) // OnReject registers the given callback as the rejection function of the filter. OnReject(callback func(msg *message.Message, err error, peer *peer.Peer)) - // Shutdown shuts down the filter. - Shutdown() } diff --git a/packages/binary/messagelayer/messageparser/message_parser.go b/packages/binary/messagelayer/messageparser/message_parser.go index 8e491be4..9c590e63 100644 --- a/packages/binary/messagelayer/messageparser/message_parser.go +++ b/packages/binary/messagelayer/messageparser/message_parser.go @@ -70,21 +70,6 @@ func (messageParser *MessageParser) AddMessageFilter(filter MessageFilter) { messageParser.messageFiltersModified.Set() } -// Shutdown shut downs the message parser and its corresponding registered filters. -func (messageParser *MessageParser) Shutdown() { - messageParser.bytesFiltersMutex.Lock() - for _, bytesFilter := range messageParser.bytesFilters { - bytesFilter.Shutdown() - } - messageParser.bytesFiltersMutex.Unlock() - - messageParser.messageFiltersMutex.Lock() - for _, messageFilter := range messageParser.messageFilters { - messageFilter.Shutdown() - } - messageParser.messageFiltersMutex.Unlock() -} - // sets up the byte filter data flow chain.
func (messageParser *MessageParser) setupBytesFilterDataFlow() { if !messageParser.byteFiltersModified.IsSet() { diff --git a/packages/binary/messagelayer/messageparser/message_parser_test.go b/packages/binary/messagelayer/messageparser/message_parser_test.go index 2e33aad3..e85cf4ce 100644 --- a/packages/binary/messagelayer/messageparser/message_parser_test.go +++ b/packages/binary/messagelayer/messageparser/message_parser_test.go @@ -22,8 +22,6 @@ func BenchmarkMessageParser_ParseBytesSame(b *testing.B) { for i := 0; i < b.N; i++ { msgParser.Parse(msgBytes, nil) } - - msgParser.Shutdown() } func BenchmarkMessageParser_ParseBytesDifferent(b *testing.B) { @@ -39,8 +37,6 @@ func BenchmarkMessageParser_ParseBytesDifferent(b *testing.B) { for i := 0; i < b.N; i++ { msgParser.Parse(messageBytes[i], nil) } - - msgParser.Shutdown() } func TestMessageParser_ParseMessage(t *testing.T) { @@ -52,8 +48,6 @@ func TestMessageParser_ParseMessage(t *testing.T) { msgParser.Events.MessageParsed.Attach(events.NewClosure(func(msg *message.Message) { log.Infof("parsed message") })) - - msgParser.Shutdown() } func newTestMessage(payloadString string) *message.Message { diff --git a/packages/binary/messagelayer/messagerequester/messagerequester.go b/packages/binary/messagelayer/messagerequester/messagerequester.go index 06845164..39ce0f98 100644 --- a/packages/binary/messagelayer/messagerequester/messagerequester.go +++ b/packages/binary/messagelayer/messagerequester/messagerequester.go @@ -33,17 +33,17 @@ func New(optionalOptions ...Option) *MessageRequester { // StartRequest initiates a regular triggering of the SendRequest event until it has been stopped using StopRequest. func (requester *MessageRequester) StartRequest(id message.Id) { requester.scheduledRequestsMutex.Lock() - defer requester.scheduledRequestsMutex.Unlock() // ignore already scheduled requests if _, exists := requester.scheduledRequests[id]; exists { + requester.scheduledRequestsMutex.Unlock() return } - // trigger the event and schedule the next request - // make this atomic to be sure that a successive call of StartRequest does not trigger again - requester.Events.SendRequest.Trigger(id) + // schedule the next request and trigger the event requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id) }) + requester.scheduledRequestsMutex.Unlock() + requester.Events.SendRequest.Trigger(id) } // StopRequest stops further requests for the given message. diff --git a/packages/binary/messagelayer/tangle/tangle.go b/packages/binary/messagelayer/tangle/tangle.go index d750ecbd..9d7e8642 100644 --- a/packages/binary/messagelayer/tangle/tangle.go +++ b/packages/binary/messagelayer/tangle/tangle.go @@ -2,7 +2,6 @@ package tangle import ( "container/list" - "runtime" "time" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" @@ -57,7 +56,8 @@ func New(store kvstore.KVStore) (result *Tangle) { Events: *newEvents(), } - result.solidifierWorkerPool.Tune(runtime.GOMAXPROCS(0)) + result.solidifierWorkerPool.Tune(1024) + result.storeMessageWorkerPool.Tune(1024) return } @@ -294,3 +294,13 @@ func (tangle *Tangle) deleteFutureCone(messageId message.Id) { }) } } + +// SolidifierWorkerPoolStatus returns the name and the load of the workerpool. +func (tangle *Tangle) SolidifierWorkerPoolStatus() (name string, load int) { + return "Solidifier", tangle.solidifierWorkerPool.RunningWorkers() +} + +// StoreMessageWorkerPoolStatus returns the name and the load of the workerpool.
+func (tangle *Tangle) StoreMessageWorkerPoolStatus() (name string, load int) { + return "StoreMessage", tangle.storeMessageWorkerPool.RunningWorkers() +} diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index ced57670..d6489199 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -3,17 +3,18 @@ package gossip import ( "fmt" "net" + "runtime" "sync" "github.com/golang/protobuf/proto" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" pb "github.com/iotaledger/goshimmer/packages/gossip/proto" "github.com/iotaledger/goshimmer/packages/gossip/server" - "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/identity" "github.com/iotaledger/hive.go/logger" + "github.com/iotaledger/hive.go/workerpool" ) const ( @@ -21,6 +22,14 @@ const ( maxPacketSize = 65 * 1024 ) +var ( + messageWorkerCount = runtime.GOMAXPROCS(0) * 4 + messageWorkerQueueSize = 1000 + + messageRequestWorkerCount = runtime.GOMAXPROCS(0) + messageRequestWorkerQueueSize = 100 +) + // LoadMessageFunc defines a function that returns the message for the given id. type LoadMessageFunc func(messageId message.Id) ([]byte, error) @@ -37,8 +46,10 @@ type Manager struct { srv *server.TCP neighbors map[identity.ID]*Neighbor - // inboxWorkerPool defines a worker pool where all incoming messages are processed. - inboxWorkerPool async.WorkerPool + // messageWorkerPool defines a worker pool where all incoming messages are processed. + messageWorkerPool *workerpool.WorkerPool + + messageRequestWorkerPool *workerpool.WorkerPool } // NewManager creates a new Manager. @@ -56,6 +67,21 @@ func NewManager(local *peer.Local, f LoadMessageFunc, log *logger.Logger) *Manag srv: nil, neighbors: make(map[identity.ID]*Neighbor), } + + m.messageWorkerPool = workerpool.New(func(task workerpool.Task) { + + m.processPacketMessage(task.Param(0).([]byte), task.Param(1).(*Neighbor)) + + task.Return(nil) + }, workerpool.WorkerCount(messageWorkerCount), workerpool.QueueSize(messageWorkerQueueSize)) + + m.messageRequestWorkerPool = workerpool.New(func(task workerpool.Task) { + + m.processMessageRequest(task.Param(0).([]byte), task.Param(1).(*Neighbor)) + + task.Return(nil) + }, workerpool.WorkerCount(messageRequestWorkerCount), workerpool.QueueSize(messageRequestWorkerQueueSize)) + return m } @@ -65,6 +91,9 @@ func (m *Manager) Start(srv *server.TCP) { defer m.mu.Unlock() m.srv = srv + + m.messageWorkerPool.Start() + m.messageRequestWorkerPool.Start() } // Close stops the manager and closes all established connections. @@ -72,7 +101,8 @@ func (m *Manager) Close() { m.stop() m.wg.Wait() - m.inboxWorkerPool.ShutdownGracefully() + m.messageWorkerPool.Stop() + m.messageRequestWorkerPool.Stop() } // Events returns the events related to the gossip protocol. 
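The two pools created in NewManager above follow hive.go's workerpool API: the task closure receives its arguments positionally via task.Param(i), and the non-blocking TrySubmit reports whether the task was enqueued, so a full queue sheds load instead of stalling the neighbor's read loop. A minimal standalone sketch using only the calls that appear in this patch (the payload and pool sizing are illustrative):

    package main

    import (
        "fmt"
        "runtime"

        "github.com/iotaledger/hive.go/workerpool"
    )

    func main() {
        wp := workerpool.New(func(task workerpool.Task) {
            // parameters arrive positionally, exactly as passed to TrySubmit
            fmt.Println("processing", len(task.Param(0).([]byte)), "bytes")
            task.Return(nil)
        }, workerpool.WorkerCount(runtime.GOMAXPROCS(0)*4), workerpool.QueueSize(1000))

        wp.Start()
        defer wp.Stop()

        // TrySubmit never blocks: when the queue is full the packet is simply
        // dropped, mirroring the discard branches in handlePacket below.
        if _, added := wp.TrySubmit([]byte("packet")); !added {
            fmt.Println("queue full: packet discarded")
        }
    }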
@@ -214,11 +244,9 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n nbr.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) { dataCopy := make([]byte, len(data)) copy(dataCopy, data) - m.inboxWorkerPool.Submit(func() { - if err := m.handlePacket(dataCopy, nbr); err != nil { - m.log.Debugw("error handling packet", "err", err) - } - }) + if err := m.handlePacket(dataCopy, nbr); err != nil { + m.log.Debugw("error handling packet", "err", err) + } })) m.neighbors[peer.ID()] = nbr @@ -237,31 +265,14 @@ func (m *Manager) handlePacket(data []byte, nbr *Neighbor) error { switch pb.PacketType(data[0]) { case pb.PacketMessage: - packet := new(pb.Message) - if err := proto.Unmarshal(data[1:], packet); err != nil { - return fmt.Errorf("invalid packet: %w", err) + if _, added := m.messageWorkerPool.TrySubmit(data, nbr); !added { + return fmt.Errorf("messageWorkerPool full: packet message discarded") } - m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: packet.GetData(), Peer: nbr.Peer}) case pb.PacketMessageRequest: - packet := new(pb.MessageRequest) - if err := proto.Unmarshal(data[1:], packet); err != nil { - return fmt.Errorf("invalid packet: %w", err) - } - - msgID, _, err := message.IdFromBytes(packet.GetId()) - if err != nil { - return fmt.Errorf("invalid message id: %w", err) - } - - msgBytes, err := m.loadMessageFunc(msgID) - if err != nil { - m.log.Debugw("error loading message", "msg-id", msgID, "err", err) - return nil + if _, added := m.messageRequestWorkerPool.TrySubmit(data, nbr); !added { + return fmt.Errorf("messageRequestWorkerPool full: message request discarded") } - // send the loaded message directly to the neighbor - _, _ = nbr.Write(marshal(&pb.Message{Data: msgBytes})) default: return ErrInvalidPacket } @@ -281,3 +292,41 @@ func marshal(packet pb.Packet) []byte { } return append([]byte{byte(packetType)}, data...) } + +// MessageWorkerPoolStatus returns the name and the load of the workerpool. +func (m *Manager) MessageWorkerPoolStatus() (name string, load int) { + return "messageWorkerPool", m.messageWorkerPool.GetPendingQueueSize() +} + +// MessageRequestWorkerPoolStatus returns the name and the load of the workerpool.
+func (m *Manager) MessageRequestWorkerPoolStatus() (name string, load int) { + return "messageRequestWorkerPool", m.messageRequestWorkerPool.GetPendingQueueSize() +} + +func (m *Manager) processPacketMessage(data []byte, nbr *Neighbor) { + packet := new(pb.Message) + if err := proto.Unmarshal(data[1:], packet); err != nil { + m.log.Debugw("error processing packet", "err", err) + return + } + m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: packet.GetData(), Peer: nbr.Peer}) +} + +func (m *Manager) processMessageRequest(data []byte, nbr *Neighbor) { + packet := new(pb.MessageRequest) + if err := proto.Unmarshal(data[1:], packet); err != nil { + m.log.Debugw("invalid packet", "err", err) + return + } + + msgID, _, err := message.IdFromBytes(packet.GetId()) + if err != nil { + m.log.Debugw("invalid message id", "err", err) + return + } + + msgBytes, err := m.loadMessageFunc(msgID) + if err != nil { + m.log.Debugw("error loading message", "msg-id", msgID, "err", err) + return + } + + // send the loaded message directly to the neighbor + _, _ = nbr.Write(marshal(&pb.Message{Data: msgBytes})) +} diff --git a/plugins/messagelayer/plugin.go b/plugins/messagelayer/plugin.go index 084df96c..23967033 100644 --- a/plugins/messagelayer/plugin.go +++ b/plugins/messagelayer/plugin.go @@ -132,7 +132,6 @@ func run(*node.Plugin) { if err := daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) { <-shutdownSignal messageFactory.Shutdown() - messageParser.Shutdown() _tangle.Shutdown() }, shutdown.PriorityTangle); err != nil { log.Panicf("Failed to start as daemon: %s", err) diff --git a/plugins/prometheus/parameters.go b/plugins/prometheus/parameters.go index dbc77cb0..5960760c 100644 --- a/plugins/prometheus/parameters.go +++ b/plugins/prometheus/parameters.go @@ -11,6 +11,8 @@ const ( CfgPrometheusProcessMetrics = "prometheus.processMetrics" // CfgPrometheusPromhttpMetrics defines the config flag to enable/disable promhttp metrics. CfgPrometheusPromhttpMetrics = "prometheus.promhttpMetrics" + // CfgPrometheusWorkerpoolMetrics defines the config flag to enable/disable workerpool metrics. + CfgPrometheusWorkerpoolMetrics = "prometheus.workerpoolMetrics" // CfgPrometheusBindAddress defines the config flag of the bind address on which the Prometheus exporter listens.
CfgPrometheusBindAddress = "prometheus.bindAddress" ) @@ -20,4 +22,5 @@ func init() { flag.Bool(CfgPrometheusGoMetrics, false, "include go metrics") flag.Bool(CfgPrometheusProcessMetrics, false, "include process metrics") flag.Bool(CfgPrometheusPromhttpMetrics, false, "include promhttp metrics") + flag.Bool(CfgPrometheusWorkerpoolMetrics, false, "include workerpool metrics") } diff --git a/plugins/prometheus/plugin.go b/plugins/prometheus/plugin.go index 5f32a260..1a612d87 100644 --- a/plugins/prometheus/plugin.go +++ b/plugins/prometheus/plugin.go @@ -29,6 +29,10 @@ var ( func configure(plugin *node.Plugin) { log = logger.NewLogger(plugin.Name) + if config.Node().GetBool(CfgPrometheusWorkerpoolMetrics) { + registerWorkerpoolMetrics() + } + if config.Node().GetBool(CfgPrometheusGoMetrics) { registry.MustRegister(prometheus.NewGoCollector()) } diff --git a/plugins/prometheus/workerpool.go b/plugins/prometheus/workerpool.go new file mode 100644 index 00000000..7bca4899 --- /dev/null +++ b/plugins/prometheus/workerpool.go @@ -0,0 +1,49 @@ +package prometheus + +import ( + "github.com/iotaledger/goshimmer/plugins/gossip" + "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + workerpools *prometheus.GaugeVec +) + +func registerWorkerpoolMetrics() { + workerpools = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "workerpools_load", + Help: "Info about workerpools load", + }, + []string{ + "name", + }, + ) + + registry.MustRegister(workerpools) + + addCollect(collectWorkerpoolMetrics) +} + +func collectWorkerpoolMetrics() { + name, load := gossip.Manager().MessageWorkerPoolStatus() + workerpools.WithLabelValues( + name, + ).Set(float64(load)) + + name, load = gossip.Manager().MessageRequestWorkerPoolStatus() + workerpools.WithLabelValues( + name, + ).Set(float64(load)) + + name, load = messagelayer.Tangle().SolidifierWorkerPoolStatus() + workerpools.WithLabelValues( + name, + ).Set(float64(load)) + + name, load = messagelayer.Tangle().StoreMessageWorkerPoolStatus() + workerpools.WithLabelValues( + name, + ).Set(float64(load)) +} diff --git a/tools/integration-tests/tester/framework/docker.go b/tools/integration-tests/tester/framework/docker.go index 0b5d688f..5fd40921 100644 --- a/tools/integration-tests/tester/framework/docker.go +++ b/tools/integration-tests/tester/framework/docker.go @@ -214,7 +214,7 @@ func (d *DockerContainer) Remove() error { // Stop stops a container without terminating the process. // The process is blocked until the container stops or the timeout expires. func (d *DockerContainer) Stop() error { - duration := 30 * time.Second + duration := 3 * time.Minute return d.client.ContainerStop(context.Background(), d.id, &duration) } -- GitLab
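A closing note on the StartRequest change in messagerequester.go above: the SendRequest event is now triggered only after the mutex has been released, so a handler that blocks or re-enters the requester can no longer stall other callers, while inserting into scheduledRequests before unlocking still guarantees that a concurrent StartRequest for the same id returns early rather than triggering a second request. The locking pattern in isolation — a sketch with illustrative names, not the repository's types:

    package main

    import (
        "fmt"
        "sync"
    )

    type requester struct {
        mu        sync.Mutex
        scheduled map[string]struct{}
        send      func(id string)
    }

    func (r *requester) StartRequest(id string) {
        r.mu.Lock()
        if _, exists := r.scheduled[id]; exists {
            r.mu.Unlock() // already scheduled: nothing to do
            return
        }
        r.scheduled[id] = struct{}{} // record first so racing calls deduplicate
        r.mu.Unlock()

        // trigger outside the lock: a slow or re-entrant handler
        // cannot hold up the mutex for other message ids
        r.send(id)
    }

    func main() {
        r := &requester{
            scheduled: make(map[string]struct{}),
            send:      func(id string) { fmt.Println("requesting", id) },
        }
        r.StartRequest("msg-1")
        r.StartRequest("msg-1") // second call returns early
    }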