diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index fbfb5f49ca735f8aaf3314027d7a72872bc8dd0b..69cfab5bae9a347fa53d1f83a59ad3cf7a46918f 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -3,23 +3,32 @@ package gossip import ( "fmt" "net" + "runtime" "sync" "github.com/golang/protobuf/proto" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" pb "github.com/iotaledger/goshimmer/packages/gossip/proto" "github.com/iotaledger/goshimmer/packages/gossip/server" - "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/identity" "github.com/iotaledger/hive.go/logger" + "github.com/iotaledger/hive.go/workerpool" ) const ( maxPacketSize = 64 * 1024 ) +var ( + messageWorkerCount = runtime.GOMAXPROCS(0) + messageWorkerQueueSize = 500 + + messageRequestWorkerCount = 1 + messageRequestWorkerQueueSize = 100 +) + // LoadMessageFunc defines a function that returns the message for the given id. type LoadMessageFunc func(messageId message.Id) ([]byte, error) @@ -36,8 +45,10 @@ type Manager struct { srv *server.TCP neighbors map[identity.ID]*Neighbor - // inboxWorkerPool defines a worker pool where all incoming messages are processed. - inboxWorkerPool async.WorkerPool + // messageWorkerPool defines a worker pool where all incoming messages are processed. + messageWorkerPool *workerpool.WorkerPool + + messageRequestWorkerPool *workerpool.WorkerPool } // NewManager creates a new Manager. @@ -55,6 +66,21 @@ func NewManager(local *peer.Local, f LoadMessageFunc, log *logger.Logger) *Manag srv: nil, neighbors: make(map[identity.ID]*Neighbor), } + + m.messageWorkerPool = workerpool.New(func(task workerpool.Task) { + + m.processPacketMessage(task.Param(0).([]byte), task.Param(1).(*Neighbor)) + + task.Return(nil) + }, workerpool.WorkerCount(messageWorkerCount), workerpool.QueueSize(messageWorkerQueueSize)) + + m.messageRequestWorkerPool = workerpool.New(func(task workerpool.Task) { + + m.processMessageRequest(task.Param(0).([]byte), task.Param(1).(*Neighbor)) + + task.Return(nil) + }, workerpool.WorkerCount(messageRequestWorkerCount), workerpool.QueueSize(messageRequestWorkerQueueSize)) + return m } @@ -64,6 +90,9 @@ func (m *Manager) Start(srv *server.TCP) { defer m.mu.Unlock() m.srv = srv + + m.messageWorkerPool.Start() + m.messageRequestWorkerPool.Start() } // Close stops the manager and closes all established connections. @@ -71,7 +100,8 @@ func (m *Manager) Close() { m.stop() m.wg.Wait() - m.inboxWorkerPool.ShutdownGracefully() + m.messageWorkerPool.Stop() + m.messageRequestWorkerPool.Stop() } // Events returns the events related to the gossip protocol. @@ -213,11 +243,9 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n nbr.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) { dataCopy := make([]byte, len(data)) copy(dataCopy, data) - m.inboxWorkerPool.Submit(func() { - if err := m.handlePacket(dataCopy, nbr); err != nil { - m.log.Debugw("error handling packet", "err", err) - } - }) + if err := m.handlePacket(dataCopy, nbr); err != nil { + m.log.Debugw("error handling packet", "err", err) + } })) m.neighbors[peer.ID()] = nbr @@ -236,31 +264,14 @@ func (m *Manager) handlePacket(data []byte, nbr *Neighbor) error { switch pb.PacketType(data[0]) { case pb.PacketMessage: - packet := new(pb.Message) - if err := proto.Unmarshal(data[1:], packet); err != nil { - return fmt.Errorf("invalid packet: %w", err) + if _, added := m.messageWorkerPool.TrySubmit(data, nbr); !added { + return fmt.Errorf("messageWorkerPool full: packet message discarded") } - m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: packet.GetData(), Peer: nbr.Peer}) - case pb.PacketMessageRequest: - packet := new(pb.MessageRequest) - if err := proto.Unmarshal(data[1:], packet); err != nil { - return fmt.Errorf("invalid packet: %w", err) - } - - msgID, _, err := message.IdFromBytes(packet.GetId()) - if err != nil { - return fmt.Errorf("invalid message id: %w", err) + if _, added := m.messageRequestWorkerPool.TrySubmit(data, nbr); !added { + return fmt.Errorf("messageRequestWorkerPool full: message request discarded") } - msgBytes, err := m.loadMessageFunc(msgID) - if err != nil { - m.log.Debugw("error loading message", "msg-id", msgID, "err", err) - return nil - } - - // send the loaded message directly to the neighbor - _, _ = nbr.Write(marshal(&pb.Message{Data: msgBytes})) default: return ErrInvalidPacket } @@ -283,5 +294,33 @@ func marshal(packet pb.Packet) []byte { // WorkerPoolStatus returns the name and the load of the workerpool. func (m *Manager) WorkerPoolStatus() (name string, load int) { - return "InboxWorkerPool", m.inboxWorkerPool.RunningWorkers() + return "InboxWorkerPool", m.messageWorkerPool.GetPendingQueueSize() + m.messageRequestWorkerPool.GetPendingQueueSize() +} + +func (m *Manager) processPacketMessage(data []byte, nbr *Neighbor) { + packet := new(pb.Message) + if err := proto.Unmarshal(data[1:], packet); err != nil { + m.log.Debugw("error processing packet", "err", err) + } + m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: packet.GetData(), Peer: nbr.Peer}) +} + +func (m *Manager) processMessageRequest(data []byte, nbr *Neighbor) { + packet := new(pb.MessageRequest) + if err := proto.Unmarshal(data[1:], packet); err != nil { + m.log.Debugw("invalid packet", "err", err) + } + + msgID, _, err := message.IdFromBytes(packet.GetId()) + if err != nil { + m.log.Debugw("invalid message id:", "err", err) + } + + msgBytes, err := m.loadMessageFunc(msgID) + if err != nil { + m.log.Debugw("error loading message", "msg-id", msgID, "err", err) + } + + // send the loaded message directly to the neighbor + _, _ = nbr.Write(marshal(&pb.Message{Data: msgBytes})) }