diff --git a/dapps/valuetransfers/packages/tangle/signature_filter.go b/dapps/valuetransfers/packages/tangle/signature_filter.go
index 3720dab3a1923799579df560a72378d9e609541a..a7e7e2cc0c254f4248237d4d44a9d5f39bd8d065 100644
--- a/dapps/valuetransfers/packages/tangle/signature_filter.go
+++ b/dapps/valuetransfers/packages/tangle/signature_filter.go
@@ -7,7 +7,6 @@ import (
 	"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload"
 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/messageparser"
-	"github.com/iotaledger/hive.go/async"
 	"github.com/iotaledger/hive.go/autopeering/peer"
 )

@@ -17,7 +16,6 @@ type SignatureFilter struct {
 	onRejectCallback      func(message *message.Message, err error, peer *peer.Peer)
 	onAcceptCallbackMutex sync.RWMutex
 	onRejectCallbackMutex sync.RWMutex
-	workerPool            async.WorkerPool
 }

 // NewSignatureFilter is the constructor of the SignatureFilter.
@@ -28,33 +26,31 @@ func NewSignatureFilter() *SignatureFilter {
 // Filter gets called whenever a new message is received. It rejects the message if it is not a valid value
 // message.
 func (filter *SignatureFilter) Filter(message *message.Message, peer *peer.Peer) {
-	filter.workerPool.Submit(func() {
-		// accept message if the message is not a value message (it will be checked by other filters)
-		valuePayload := message.Payload()
-		if valuePayload.Type() != payload.Type {
-			filter.getAcceptCallback()(message, peer)
+	// accept message if the message is not a value message (it will be checked by other filters)
+	valuePayload := message.Payload()
+	if valuePayload.Type() != payload.Type {
+		filter.getAcceptCallback()(message, peer)

-			return
-		}
+		return
+	}

-		// reject if the payload can not be casted to a ValuePayload (invalid payload)
-		typeCastedValuePayload, ok := valuePayload.(*payload.Payload)
-		if !ok {
-			filter.getRejectCallback()(message, errors.New("invalid value message"), peer)
+	// reject if the payload cannot be cast to a ValuePayload (invalid payload)
+	typeCastedValuePayload, ok := valuePayload.(*payload.Payload)
+	if !ok {
+		filter.getRejectCallback()(message, errors.New("invalid value message"), peer)

-			return
-		}
+		return
+	}

-		// reject message if it contains a transaction with invalid signatures
-		if !typeCastedValuePayload.Transaction().SignaturesValid() {
-			filter.getRejectCallback()(message, errors.New("invalid transaction signatures"), peer)
+	// reject message if it contains a transaction with invalid signatures
+	if !typeCastedValuePayload.Transaction().SignaturesValid() {
+		filter.getRejectCallback()(message, errors.New("invalid transaction signatures"), peer)

-			return
-		}
+		return
+	}

-		// if all previous checks passed: accept message
-		filter.getAcceptCallback()(message, peer)
-	})
+	// if all previous checks passed: accept message
+	filter.getAcceptCallback()(message, peer)
 }

 // OnAccept registers the given callback as the acceptance function of the filter.
@@ -73,11 +69,6 @@ func (filter *SignatureFilter) OnReject(callback func(message *message.Message,
 	filter.onRejectCallback = callback
 }

-// Shutdown shuts down the filter.
-func (filter *SignatureFilter) Shutdown() {
-	filter.workerPool.ShutdownGracefully()
-}
-
 // getAcceptCallback returns the callback that is executed when a message passes the filter.
 func (filter *SignatureFilter) getAcceptCallback() func(message *message.Message, peer *peer.Peer) {
 	filter.onAcceptCallbackMutex.RLock()
diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go
index a7195d61e1d026f49be76cc1833cb4ca930c5028..02ba960c4e183af9dd7b084c798accaf1d14e458 100644
--- a/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go
+++ b/packages/binary/messagelayer/messageparser/builtinfilters/message_signature_filter.go
@@ -4,10 +4,8 @@ import (
 	"fmt"
 	"sync"

-	"github.com/iotaledger/hive.go/async"
-	"github.com/iotaledger/hive.go/autopeering/peer"
-
 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
+	"github.com/iotaledger/hive.go/autopeering/peer"
 )

 // ErrInvalidSignature is returned when a message contains an invalid signature.
@@ -17,7 +15,6 @@ var ErrInvalidSignature = fmt.Errorf("invalid signature")
 type MessageSignatureFilter struct {
 	onAcceptCallback func(msg *message.Message, peer *peer.Peer)
 	onRejectCallback func(msg *message.Message, err error, peer *peer.Peer)
-	workerPool       async.WorkerPool

 	onAcceptCallbackMutex sync.RWMutex
 	onRejectCallbackMutex sync.RWMutex
@@ -28,32 +25,30 @@ func NewMessageSignatureFilter() *MessageSignatureFilter {
 	return &MessageSignatureFilter{}
 }

+// Filter checks the signature of the given message and calls the acceptance callback
+// if the signature is valid or the rejection callback otherwise.
 func (filter *MessageSignatureFilter) Filter(msg *message.Message, peer *peer.Peer) {
-	filter.workerPool.Submit(func() {
-		if msg.VerifySignature() {
-			filter.getAcceptCallback()(msg, peer)
-			return
-		}
-		filter.getRejectCallback()(msg, ErrInvalidSignature, peer)
-	})
+	if msg.VerifySignature() {
+		filter.getAcceptCallback()(msg, peer)
+		return
+	}
+	filter.getRejectCallback()(msg, ErrInvalidSignature, peer)
 }

+// OnAccept registers the given callback as the acceptance function of the filter.
 func (filter *MessageSignatureFilter) OnAccept(callback func(msg *message.Message, peer *peer.Peer)) {
 	filter.onAcceptCallbackMutex.Lock()
 	filter.onAcceptCallback = callback
 	filter.onAcceptCallbackMutex.Unlock()
 }

+// OnReject registers the given callback as the rejection function of the filter.
func (filter *MessageSignatureFilter) OnReject(callback func(msg *message.Message, err error, peer *peer.Peer)) { filter.onRejectCallbackMutex.Lock() filter.onRejectCallback = callback filter.onRejectCallbackMutex.Unlock() } -func (filter *MessageSignatureFilter) Shutdown() { - filter.workerPool.ShutdownGracefully() -} - func (filter *MessageSignatureFilter) getAcceptCallback() (result func(msg *message.Message, peer *peer.Peer)) { filter.onAcceptCallbackMutex.RLock() result = filter.onAcceptCallback diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go index 38f708cb4cbe22ffc51d8c0d63774025071f3617..0147ec3730374158de774f3623c90ac24c8faeca 100644 --- a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go +++ b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/iotaledger/goshimmer/packages/pow" - "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/autopeering/peer" ) @@ -22,7 +21,6 @@ var ( type PowFilter struct { worker *pow.Worker difficulty int - workerPool async.WorkerPool mu sync.Mutex acceptCallback func([]byte, *peer.Peer) @@ -39,13 +37,11 @@ func NewPowFilter(worker *pow.Worker, difficulty int) *PowFilter { // Filter checks whether the given bytes pass the PoW validation and calls the corresponding callback. func (f *PowFilter) Filter(msgBytes []byte, p *peer.Peer) { - f.workerPool.Submit(func() { - if err := f.validate(msgBytes); err != nil { - f.reject(msgBytes, err, p) - return - } - f.accept(msgBytes, p) - }) + if err := f.validate(msgBytes); err != nil { + f.reject(msgBytes, err, p) + return + } + f.accept(msgBytes, p) } // OnAccept registers the given callback as the acceptance function of the filter. @@ -62,11 +58,6 @@ func (f *PowFilter) OnReject(callback func([]byte, error, *peer.Peer)) { f.rejectCallback = callback } -// Shutdown shuts down the filter. 
-func (f *PowFilter) Shutdown() {
-	f.workerPool.ShutdownGracefully()
-}
-
 func (f *PowFilter) accept(msgBytes []byte, p *peer.Peer) {
 	f.mu.Lock()
 	defer f.mu.Unlock()
diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter_test.go b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter_test.go
index fe53a9d71cb715dec6a9ad5e26fc87365a680493..b348247bb28c6c5b4f7cf533492b5fa23b21af54 100644
--- a/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter_test.go
+++ b/packages/binary/messagelayer/messageparser/builtinfilters/pow_filter_test.go
@@ -18,15 +18,14 @@ import (
 )

 var (
-	testPayload               = payload.NewData([]byte("test"))
-	testPeer       *peer.Peer = nil
-	testWorker                = pow.New(crypto.BLAKE2b_512, 1)
-	testDifficulty            = 10
+	testPayload            = payload.NewData([]byte("test"))
+	testPeer    *peer.Peer
+	testWorker             = pow.New(crypto.BLAKE2b_512, 1)
+	testDifficulty         = 10
 )

 func TestPowFilter_Filter(t *testing.T) {
 	filter := NewPowFilter(testWorker, testDifficulty)
-	defer filter.Shutdown()

 	// set callbacks
 	m := &callbackMock{}
@@ -61,7 +60,6 @@ func TestPowFilter_Filter(t *testing.T) {
 		filter.Filter(msgPOWBytes, testPeer)
 	})

-	filter.Shutdown()
 	m.AssertExpectations(t)
 }

diff --git a/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go b/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go
index 45b3bfee8ba4b3e3bd6d428c896fecd7241646df..855d79426db2487b2badb2c2eab690db85fbf81b 100644
--- a/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go
+++ b/packages/binary/messagelayer/messageparser/builtinfilters/recently_seen_bytes_filter.go
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"sync"

-	"github.com/iotaledger/hive.go/async"
 	"github.com/iotaledger/hive.go/autopeering/peer"
 	"github.com/iotaledger/hive.go/bytesfilter"
 )
@@ -17,7 +16,6 @@ type RecentlySeenBytesFilter struct {
 	bytesFilter      *bytesfilter.BytesFilter
 	onAcceptCallback func(bytes []byte, peer *peer.Peer)
 	onRejectCallback func(bytes []byte, err error, peer *peer.Peer)
-	workerPool       async.WorkerPool

 	onAcceptCallbackMutex sync.RWMutex
 	onRejectCallbackMutex sync.RWMutex
@@ -30,22 +28,24 @@ func NewRecentlySeenBytesFilter() *RecentlySeenBytesFilter {
 	}
 }

+// Filter checks whether the given bytes were received before and calls the acceptance
+// callback if they are new or the rejection callback if they are a duplicate.
 func (filter *RecentlySeenBytesFilter) Filter(bytes []byte, peer *peer.Peer) {
-	filter.workerPool.Submit(func() {
-		if filter.bytesFilter.Add(bytes) {
-			filter.getAcceptCallback()(bytes, peer)
-			return
-		}
-		filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes, peer)
-	})
+	if filter.bytesFilter.Add(bytes) {
+		filter.getAcceptCallback()(bytes, peer)
+		return
+	}
+	filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes, peer)
 }

+// OnAccept registers the given callback as the acceptance function of the filter.
 func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte, peer *peer.Peer)) {
 	filter.onAcceptCallbackMutex.Lock()
 	filter.onAcceptCallback = callback
 	filter.onAcceptCallbackMutex.Unlock()
 }

+// OnReject registers the given callback as the rejection function of the filter.
 func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte, err error, peer *peer.Peer)) {
 	filter.onRejectCallbackMutex.Lock()
 	filter.onRejectCallback = callback
@@ -65,7 +65,3 @@ func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes []
 	filter.onRejectCallbackMutex.Unlock()
 	return
 }
-
-func (filter *RecentlySeenBytesFilter) Shutdown() {
-	filter.workerPool.ShutdownGracefully()
-}
diff --git a/packages/binary/messagelayer/messageparser/bytes_filter.go b/packages/binary/messagelayer/messageparser/bytes_filter.go
index 96d928e582ee330dfc732c002b4119c561e6f19b..7c62a45982127604f7e4bfb84ca94fda93e4d10c 100644
--- a/packages/binary/messagelayer/messageparser/bytes_filter.go
+++ b/packages/binary/messagelayer/messageparser/bytes_filter.go
@@ -13,6 +13,4 @@ type BytesFilter interface {
 	OnAccept(callback func(bytes []byte, peer *peer.Peer))
 	// OnReject registers the given callback as the rejection function of the filter.
 	OnReject(callback func(bytes []byte, err error, peer *peer.Peer))
-	// Shutdown shuts down the filter.
-	Shutdown()
 }
diff --git a/packages/binary/messagelayer/messageparser/message_filter.go b/packages/binary/messagelayer/messageparser/message_filter.go
index f303a5b6ffc3833391377f69dc8b37224ceefeda..b883f3d12025fb5b2b3c6d54cc7c502b28e291c0 100644
--- a/packages/binary/messagelayer/messageparser/message_filter.go
+++ b/packages/binary/messagelayer/messageparser/message_filter.go
@@ -15,6 +15,4 @@ type MessageFilter interface {
 	OnAccept(callback func(msg *message.Message, peer *peer.Peer))
 	// OnReject registers the given callback as the rejection function of the filter.
 	OnReject(callback func(msg *message.Message, err error, peer *peer.Peer))
-	// Shutdown shuts down the filter.
-	Shutdown()
 }
diff --git a/packages/binary/messagelayer/messageparser/message_parser.go b/packages/binary/messagelayer/messageparser/message_parser.go
index 8e491be4949927c1eb26e1bb05b1a6632b740108..9c590e63ff6fe81dcb30925ee0abd7498bc58d0f 100644
--- a/packages/binary/messagelayer/messageparser/message_parser.go
+++ b/packages/binary/messagelayer/messageparser/message_parser.go
@@ -70,21 +70,6 @@ func (messageParser *MessageParser) AddMessageFilter(filter MessageFilter) {
 	messageParser.messageFiltersModified.Set()
 }

-// Shutdown shut downs the message parser and its corresponding registered filters.
-func (messageParser *MessageParser) Shutdown() {
-	messageParser.bytesFiltersMutex.Lock()
-	for _, bytesFilter := range messageParser.bytesFilters {
-		bytesFilter.Shutdown()
-	}
-	messageParser.bytesFiltersMutex.Unlock()
-
-	messageParser.messageFiltersMutex.Lock()
-	for _, messageFilter := range messageParser.messageFilters {
-		messageFilter.Shutdown()
-	}
-	messageParser.messageFiltersMutex.Unlock()
-}
-
 // sets up the byte filter data flow chain.
 func (messageParser *MessageParser) setupBytesFilterDataFlow() {
 	if !messageParser.byteFiltersModified.IsSet() {
diff --git a/packages/binary/messagelayer/messageparser/message_parser_test.go b/packages/binary/messagelayer/messageparser/message_parser_test.go
index 2e33aad3d231fb15852b182fbdbe7453891052b4..e85cf4cee9881f6902a2ff6f4de3e0e3c6e2a060 100644
--- a/packages/binary/messagelayer/messageparser/message_parser_test.go
+++ b/packages/binary/messagelayer/messageparser/message_parser_test.go
@@ -22,8 +22,6 @@ func BenchmarkMessageParser_ParseBytesSame(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		msgParser.Parse(msgBytes, nil)
 	}
-
-	msgParser.Shutdown()
 }

 func BenchmarkMessageParser_ParseBytesDifferent(b *testing.B) {
@@ -39,8 +37,6 @@ func BenchmarkMessageParser_ParseBytesDifferent(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		msgParser.Parse(messageBytes[i], nil)
 	}
-
-	msgParser.Shutdown()
 }

 func TestMessageParser_ParseMessage(t *testing.T) {
@@ -52,8 +48,6 @@ func TestMessageParser_ParseMessage(t *testing.T) {
 	msgParser.Events.MessageParsed.Attach(events.NewClosure(func(msg *message.Message) {
 		log.Infof("parsed message")
 	}))
-
-	msgParser.Shutdown()
 }

 func newTestMessage(payloadString string) *message.Message {
diff --git a/packages/binary/messagelayer/messagerequester/messagerequester.go b/packages/binary/messagelayer/messagerequester/messagerequester.go
index 068451643a9ea3d239e52006e6bf63f9a9bd4733..39ce0f9870dcf31c9db0377201442641bfdefd58 100644
--- a/packages/binary/messagelayer/messagerequester/messagerequester.go
+++ b/packages/binary/messagelayer/messagerequester/messagerequester.go
@@ -33,17 +33,17 @@ func New(optionalOptions ...Option) *MessageRequester {
 // StartRequest initiates a regular triggering of the SendRequest event until it has been stopped using StopRequest.
 func (requester *MessageRequester) StartRequest(id message.Id) {
 	requester.scheduledRequestsMutex.Lock()
-	defer requester.scheduledRequestsMutex.Unlock()

 	// ignore already scheduled requests
 	if _, exists := requester.scheduledRequests[id]; exists {
+		requester.scheduledRequestsMutex.Unlock()
 		return
 	}

-	// trigger the event and schedule the next request
-	// make this atomic to be sure that a successive call of StartRequest does not trigger again
-	requester.Events.SendRequest.Trigger(id)
+	// schedule the next request and trigger the event
 	requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id) })
+	requester.scheduledRequestsMutex.Unlock()
+	requester.Events.SendRequest.Trigger(id)
 }

 // StopRequest stops further requests for the given message.
diff --git a/packages/binary/messagelayer/tangle/tangle.go b/packages/binary/messagelayer/tangle/tangle.go
index d750ecbd59f1fd63af673b32456681aaef36e2b9..9d7e86427a5877141c84c785e096c4ed07697c96 100644
--- a/packages/binary/messagelayer/tangle/tangle.go
+++ b/packages/binary/messagelayer/tangle/tangle.go
@@ -2,7 +2,6 @@ package tangle

 import (
 	"container/list"
-	"runtime"
 	"time"

 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
@@ -57,7 +56,8 @@ func New(store kvstore.KVStore) (result *Tangle) {
 		Events: *newEvents(),
 	}

-	result.solidifierWorkerPool.Tune(runtime.GOMAXPROCS(0))
+	result.solidifierWorkerPool.Tune(1024)
+	result.storeMessageWorkerPool.Tune(1024)

 	return
 }
@@ -294,3 +294,13 @@ func (tangle *Tangle) deleteFutureCone(messageId message.Id) {
 		})
 	}
 }
+
+// SolidifierWorkerPoolStatus returns the name and the load of the workerpool.
+func (tangle *Tangle) SolidifierWorkerPoolStatus() (name string, load int) { + return "Solidifier", tangle.solidifierWorkerPool.RunningWorkers() +} + +// StoreMessageWorkerPoolStatus returns the name and the load of the workerpool. +func (tangle *Tangle) StoreMessageWorkerPoolStatus() (name string, load int) { + return "StoreMessage", tangle.storeMessageWorkerPool.RunningWorkers() +} diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index ced57670d4ffa9a79e1365f68921ae5df2b1a39c..d6489199ea21567ffa7d2526569bee9c32601926 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -3,17 +3,18 @@ package gossip import ( "fmt" "net" + "runtime" "sync" "github.com/golang/protobuf/proto" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" pb "github.com/iotaledger/goshimmer/packages/gossip/proto" "github.com/iotaledger/goshimmer/packages/gossip/server" - "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/identity" "github.com/iotaledger/hive.go/logger" + "github.com/iotaledger/hive.go/workerpool" ) const ( @@ -21,6 +22,14 @@ const ( maxPacketSize = 65 * 1024 ) +var ( + messageWorkerCount = runtime.GOMAXPROCS(0) * 4 + messageWorkerQueueSize = 1000 + + messageRequestWorkerCount = runtime.GOMAXPROCS(0) + messageRequestWorkerQueueSize = 100 +) + // LoadMessageFunc defines a function that returns the message for the given id. type LoadMessageFunc func(messageId message.Id) ([]byte, error) @@ -37,8 +46,10 @@ type Manager struct { srv *server.TCP neighbors map[identity.ID]*Neighbor - // inboxWorkerPool defines a worker pool where all incoming messages are processed. - inboxWorkerPool async.WorkerPool + // messageWorkerPool defines a worker pool where all incoming messages are processed. + messageWorkerPool *workerpool.WorkerPool + + messageRequestWorkerPool *workerpool.WorkerPool } // NewManager creates a new Manager. @@ -56,6 +67,21 @@ func NewManager(local *peer.Local, f LoadMessageFunc, log *logger.Logger) *Manag srv: nil, neighbors: make(map[identity.ID]*Neighbor), } + + m.messageWorkerPool = workerpool.New(func(task workerpool.Task) { + + m.processPacketMessage(task.Param(0).([]byte), task.Param(1).(*Neighbor)) + + task.Return(nil) + }, workerpool.WorkerCount(messageWorkerCount), workerpool.QueueSize(messageWorkerQueueSize)) + + m.messageRequestWorkerPool = workerpool.New(func(task workerpool.Task) { + + m.processMessageRequest(task.Param(0).([]byte), task.Param(1).(*Neighbor)) + + task.Return(nil) + }, workerpool.WorkerCount(messageRequestWorkerCount), workerpool.QueueSize(messageRequestWorkerQueueSize)) + return m } @@ -65,6 +91,9 @@ func (m *Manager) Start(srv *server.TCP) { defer m.mu.Unlock() m.srv = srv + + m.messageWorkerPool.Start() + m.messageRequestWorkerPool.Start() } // Close stops the manager and closes all established connections. @@ -72,7 +101,8 @@ func (m *Manager) Close() { m.stop() m.wg.Wait() - m.inboxWorkerPool.ShutdownGracefully() + m.messageWorkerPool.Stop() + m.messageRequestWorkerPool.Stop() } // Events returns the events related to the gossip protocol. 
@@ -214,11 +244,9 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n
 	nbr.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) {
 		dataCopy := make([]byte, len(data))
 		copy(dataCopy, data)
-		m.inboxWorkerPool.Submit(func() {
-			if err := m.handlePacket(dataCopy, nbr); err != nil {
-				m.log.Debugw("error handling packet", "err", err)
-			}
-		})
+		if err := m.handlePacket(dataCopy, nbr); err != nil {
+			m.log.Debugw("error handling packet", "err", err)
+		}
 	}))

 	m.neighbors[peer.ID()] = nbr
@@ -237,31 +265,14 @@ func (m *Manager) handlePacket(data []byte, nbr *Neighbor) error {
 	switch pb.PacketType(data[0]) {
 	case pb.PacketMessage:
-		packet := new(pb.Message)
-		if err := proto.Unmarshal(data[1:], packet); err != nil {
-			return fmt.Errorf("invalid packet: %w", err)
+		if _, added := m.messageWorkerPool.TrySubmit(data, nbr); !added {
+			return fmt.Errorf("messageWorkerPool full: packet message discarded")
 		}
-		m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: packet.GetData(), Peer: nbr.Peer})
-
 	case pb.PacketMessageRequest:
-		packet := new(pb.MessageRequest)
-		if err := proto.Unmarshal(data[1:], packet); err != nil {
-			return fmt.Errorf("invalid packet: %w", err)
-		}
-
-		msgID, _, err := message.IdFromBytes(packet.GetId())
-		if err != nil {
-			return fmt.Errorf("invalid message id: %w", err)
-		}
-
-		msgBytes, err := m.loadMessageFunc(msgID)
-		if err != nil {
-			m.log.Debugw("error loading message", "msg-id", msgID, "err", err)
-			return nil
+		if _, added := m.messageRequestWorkerPool.TrySubmit(data, nbr); !added {
+			return fmt.Errorf("messageRequestWorkerPool full: message request discarded")
 		}
-
-		// send the loaded message directly to the neighbor
-		_, _ = nbr.Write(marshal(&pb.Message{Data: msgBytes}))
 	default:
 		return ErrInvalidPacket
 	}
@@ -281,3 +292,45 @@
 	}
 	return append([]byte{byte(packetType)}, data...)
 }
+
+// MessageWorkerPoolStatus returns the name and the load of the workerpool.
+func (m *Manager) MessageWorkerPoolStatus() (name string, load int) {
+	return "messageWorkerPool", m.messageWorkerPool.GetPendingQueueSize()
+}
+
+// MessageRequestWorkerPoolStatus returns the name and the load of the workerpool.
+func (m *Manager) MessageRequestWorkerPoolStatus() (name string, load int) {
+	return "messageRequestWorkerPool", m.messageRequestWorkerPool.GetPendingQueueSize()
+}
+
+func (m *Manager) processPacketMessage(data []byte, nbr *Neighbor) {
+	packet := new(pb.Message)
+	if err := proto.Unmarshal(data[1:], packet); err != nil {
+		m.log.Debugw("error processing packet", "err", err)
+		return
+	}
+	m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: packet.GetData(), Peer: nbr.Peer})
+}
+
+func (m *Manager) processMessageRequest(data []byte, nbr *Neighbor) {
+	packet := new(pb.MessageRequest)
+	if err := proto.Unmarshal(data[1:], packet); err != nil {
+		m.log.Debugw("invalid packet", "err", err)
+		return
+	}
+
+	msgID, _, err := message.IdFromBytes(packet.GetId())
+	if err != nil {
+		m.log.Debugw("invalid message id", "err", err)
+		return
+	}
+
+	msgBytes, err := m.loadMessageFunc(msgID)
+	if err != nil {
+		m.log.Debugw("error loading message", "msg-id", msgID, "err", err)
+		return
+	}
+
+	// send the loaded message directly to the neighbor
+	_, _ = nbr.Write(marshal(&pb.Message{Data: msgBytes}))
+}
diff --git a/plugins/messagelayer/plugin.go b/plugins/messagelayer/plugin.go
index 084df96ce53f45d156a0122cb868eac6ea1cc871..23967033e30475adf6cfb8bb3c09e80f3a97a0f4 100644
--- a/plugins/messagelayer/plugin.go
+++ b/plugins/messagelayer/plugin.go
@@ -132,7 +132,6 @@ func run(*node.Plugin) {
 	if err := daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) {
 		<-shutdownSignal
 		messageFactory.Shutdown()
-		messageParser.Shutdown()
 		_tangle.Shutdown()
 	}, shutdown.PriorityTangle); err != nil {
 		log.Panicf("Failed to start as daemon: %s", err)
diff --git a/plugins/prometheus/parameters.go b/plugins/prometheus/parameters.go
index dbc77cb0d17fd0c1e1c1544425225e4dd72d1f99..5960760ca3ac7d9cab0ffc8e189d0e6749e9ef1b 100644
--- a/plugins/prometheus/parameters.go
+++ b/plugins/prometheus/parameters.go
@@ -11,6 +11,8 @@ const (
 	CfgPrometheusProcessMetrics = "prometheus.processMetrics"
 	// CfgPrometheusPromhttpMetrics defines the config flag to enable/disable promhttp metrics.
 	CfgPrometheusPromhttpMetrics = "prometheus.promhttpMetrics"
+	// CfgPrometheusWorkerpoolMetrics defines the config flag to enable/disable workerpool metrics.
+	CfgPrometheusWorkerpoolMetrics = "prometheus.workerpoolMetrics"
 	// CfgPrometheusBindAddress defines the config flag of the bind address on which the Prometheus exporter listens.
 	CfgPrometheusBindAddress = "prometheus.bindAddress"
 )

@@ -20,4 +22,5 @@ func init() {
 	flag.Bool(CfgPrometheusGoMetrics, false, "include go metrics")
 	flag.Bool(CfgPrometheusProcessMetrics, false, "include process metrics")
 	flag.Bool(CfgPrometheusPromhttpMetrics, false, "include promhttp metrics")
+	flag.Bool(CfgPrometheusWorkerpoolMetrics, false, "include workerpool metrics")
 }
diff --git a/plugins/prometheus/plugin.go b/plugins/prometheus/plugin.go
index 5f32a260f1d88e7b54efab6614f7709b0e71c207..1a612d8700318c883c42bdadd21a56334a478be6 100644
--- a/plugins/prometheus/plugin.go
+++ b/plugins/prometheus/plugin.go
@@ -29,6 +29,10 @@ var (
 func configure(plugin *node.Plugin) {
 	log = logger.NewLogger(plugin.Name)

+	if config.Node().GetBool(CfgPrometheusWorkerpoolMetrics) {
+		registerWorkerpoolMetrics()
+	}
+
 	if config.Node().GetBool(CfgPrometheusGoMetrics) {
 		registry.MustRegister(prometheus.NewGoCollector())
 	}
diff --git a/plugins/prometheus/workerpool.go b/plugins/prometheus/workerpool.go
new file mode 100644
index 0000000000000000000000000000000000000000..7bca48993451a702586e7f9169a56d80fea18c52
--- /dev/null
+++ b/plugins/prometheus/workerpool.go
@@ -0,0 +1,49 @@
+package prometheus
+
+import (
+	"github.com/iotaledger/goshimmer/plugins/gossip"
+	"github.com/iotaledger/goshimmer/plugins/messagelayer"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+	workerpools *prometheus.GaugeVec
+)
+
+func registerWorkerpoolMetrics() {
+	workerpools = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "workerpools_load",
+			Help: "Info about the workerpools' load",
+		},
+		[]string{
+			"name",
+		},
+	)
+
+	registry.MustRegister(workerpools)
+
+	addCollect(collectWorkerpoolMetrics)
+}
+
+func collectWorkerpoolMetrics() {
+	name, load := gossip.Manager().MessageWorkerPoolStatus()
+	workerpools.WithLabelValues(
+		name,
+	).Set(float64(load))
+
+	name, load = gossip.Manager().MessageRequestWorkerPoolStatus()
+	workerpools.WithLabelValues(
+		name,
+	).Set(float64(load))
+
+	name, load = messagelayer.Tangle().SolidifierWorkerPoolStatus()
+	workerpools.WithLabelValues(
+		name,
+	).Set(float64(load))
+
+	name, load = messagelayer.Tangle().StoreMessageWorkerPoolStatus()
+	workerpools.WithLabelValues(
+		name,
+	).Set(float64(load))
+}
diff --git a/tools/integration-tests/tester/framework/docker.go b/tools/integration-tests/tester/framework/docker.go
index 0b5d688f84d72d10b7cd092840df917c0923f83b..5fd40921b80e60e7a981bc2b492e45fc38d97d27 100644
--- a/tools/integration-tests/tester/framework/docker.go
+++ b/tools/integration-tests/tester/framework/docker.go
@@ -214,7 +214,7 @@ func (d *DockerContainer) Remove() error {
 // Stop stops a container without terminating the process.
 // The process is blocked until the container stops or the timeout expires.
 func (d *DockerContainer) Stop() error {
-	duration := 30 * time.Second
+	duration := 3 * time.Minute
 	return d.client.ContainerStop(context.Background(), d.id, &duration)
 }
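
The recurring change in this diff is the removal of one unbounded async.WorkerPool per filter (each of which needed its own Shutdown) in favor of running the cheap filter checks inline and funneling only the gossip packet handling through two shared, bounded workerpool.WorkerPools that shed load instead of queueing without limit. What follows is a minimal, self-contained sketch of that pattern, using only the hive.go/workerpool calls that appear in the diff (New, WorkerCount, QueueSize, Start, TrySubmit, GetPendingQueueSize, Stop); the handlePacket function and the submitted payload are placeholder assumptions for illustration, not code from this change:

package main

import (
	"fmt"
	"runtime"

	"github.com/iotaledger/hive.go/workerpool"
)

// handlePacket stands in for the real handlers wired up above
// (processPacketMessage / processMessageRequest); placeholder only.
func handlePacket(data []byte) {
	fmt.Printf("handled %d bytes\n", len(data))
}

func main() {
	// A bounded pool: fixed worker count and fixed queue size, mirroring
	// messageWorkerCount / messageWorkerQueueSize in gossip/manager.go.
	pool := workerpool.New(func(task workerpool.Task) {
		handlePacket(task.Param(0).([]byte))
		task.Return(nil)
	}, workerpool.WorkerCount(runtime.GOMAXPROCS(0)*4), workerpool.QueueSize(1000))

	pool.Start()

	// TrySubmit does not block: when the queue is full, the caller is told
	// and can drop the packet, unlike the removed async.WorkerPool.Submit,
	// which accepted work without any bound.
	if _, added := pool.TrySubmit([]byte("hello")); !added {
		fmt.Println("pool full: packet discarded")
	}

	// The pending queue size is what the new workerpools_load Prometheus
	// gauge reports for the gossip pools.
	fmt.Println("pending:", pool.GetPendingQueueSize())

	pool.Stop()
}

Note the design choice visible in handlePacket above: the raw bytes are submitted and the protobuf unmarshalling happens inside the pool workers, so decoding cost is moved off the network read loop while memory stays bounded by the queue size.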