Skip to content
Snippets Groups Projects
Unverified Commit f6c3ead1 authored by capossele's avatar capossele
Browse files

:recycle: Refactor gossip manager workerpools

parent 516a1875
No related branches found
No related tags found
No related merge requests found
...@@ -3,23 +3,32 @@ package gossip ...@@ -3,23 +3,32 @@ package gossip
import ( import (
"fmt" "fmt"
"net" "net"
"runtime"
"sync" "sync"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
pb "github.com/iotaledger/goshimmer/packages/gossip/proto" pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
"github.com/iotaledger/goshimmer/packages/gossip/server" "github.com/iotaledger/goshimmer/packages/gossip/server"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/identity" "github.com/iotaledger/hive.go/identity"
"github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/workerpool"
) )
const ( const (
maxPacketSize = 64 * 1024 maxPacketSize = 64 * 1024
) )
var (
messageWorkerCount = runtime.GOMAXPROCS(0)
messageWorkerQueueSize = 500
messageRequestWorkerCount = 1
messageRequestWorkerQueueSize = 100
)
// LoadMessageFunc defines a function that returns the message for the given id. // LoadMessageFunc defines a function that returns the message for the given id.
type LoadMessageFunc func(messageId message.Id) ([]byte, error) type LoadMessageFunc func(messageId message.Id) ([]byte, error)
...@@ -36,8 +45,10 @@ type Manager struct { ...@@ -36,8 +45,10 @@ type Manager struct {
srv *server.TCP srv *server.TCP
neighbors map[identity.ID]*Neighbor neighbors map[identity.ID]*Neighbor
// inboxWorkerPool defines a worker pool where all incoming messages are processed. // messageWorkerPool defines a worker pool where all incoming messages are processed.
inboxWorkerPool async.WorkerPool messageWorkerPool *workerpool.WorkerPool
messageRequestWorkerPool *workerpool.WorkerPool
} }
// NewManager creates a new Manager. // NewManager creates a new Manager.
...@@ -55,6 +66,21 @@ func NewManager(local *peer.Local, f LoadMessageFunc, log *logger.Logger) *Manag ...@@ -55,6 +66,21 @@ func NewManager(local *peer.Local, f LoadMessageFunc, log *logger.Logger) *Manag
srv: nil, srv: nil,
neighbors: make(map[identity.ID]*Neighbor), neighbors: make(map[identity.ID]*Neighbor),
} }
m.messageWorkerPool = workerpool.New(func(task workerpool.Task) {
m.processPacketMessage(task.Param(0).([]byte), task.Param(1).(*Neighbor))
task.Return(nil)
}, workerpool.WorkerCount(messageWorkerCount), workerpool.QueueSize(messageWorkerQueueSize))
m.messageRequestWorkerPool = workerpool.New(func(task workerpool.Task) {
m.processMessageRequest(task.Param(0).([]byte), task.Param(1).(*Neighbor))
task.Return(nil)
}, workerpool.WorkerCount(messageRequestWorkerCount), workerpool.QueueSize(messageRequestWorkerQueueSize))
return m return m
} }
...@@ -64,6 +90,9 @@ func (m *Manager) Start(srv *server.TCP) { ...@@ -64,6 +90,9 @@ func (m *Manager) Start(srv *server.TCP) {
defer m.mu.Unlock() defer m.mu.Unlock()
m.srv = srv m.srv = srv
m.messageWorkerPool.Start()
m.messageRequestWorkerPool.Start()
} }
// Close stops the manager and closes all established connections. // Close stops the manager and closes all established connections.
...@@ -71,7 +100,8 @@ func (m *Manager) Close() { ...@@ -71,7 +100,8 @@ func (m *Manager) Close() {
m.stop() m.stop()
m.wg.Wait() m.wg.Wait()
m.inboxWorkerPool.ShutdownGracefully() m.messageWorkerPool.Stop()
m.messageRequestWorkerPool.Stop()
} }
// Events returns the events related to the gossip protocol. // Events returns the events related to the gossip protocol.
...@@ -213,11 +243,9 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n ...@@ -213,11 +243,9 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n
nbr.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) { nbr.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) {
dataCopy := make([]byte, len(data)) dataCopy := make([]byte, len(data))
copy(dataCopy, data) copy(dataCopy, data)
m.inboxWorkerPool.Submit(func() { if err := m.handlePacket(dataCopy, nbr); err != nil {
if err := m.handlePacket(dataCopy, nbr); err != nil { m.log.Debugw("error handling packet", "err", err)
m.log.Debugw("error handling packet", "err", err) }
}
})
})) }))
m.neighbors[peer.ID()] = nbr m.neighbors[peer.ID()] = nbr
...@@ -236,31 +264,14 @@ func (m *Manager) handlePacket(data []byte, nbr *Neighbor) error { ...@@ -236,31 +264,14 @@ func (m *Manager) handlePacket(data []byte, nbr *Neighbor) error {
switch pb.PacketType(data[0]) { switch pb.PacketType(data[0]) {
case pb.PacketMessage: case pb.PacketMessage:
packet := new(pb.Message) if _, added := m.messageWorkerPool.TrySubmit(data, nbr); !added {
if err := proto.Unmarshal(data[1:], packet); err != nil { return fmt.Errorf("messageWorkerPool full: packet message discarded")
return fmt.Errorf("invalid packet: %w", err)
} }
m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: packet.GetData(), Peer: nbr.Peer})
case pb.PacketMessageRequest: case pb.PacketMessageRequest:
packet := new(pb.MessageRequest) if _, added := m.messageRequestWorkerPool.TrySubmit(data, nbr); !added {
if err := proto.Unmarshal(data[1:], packet); err != nil { return fmt.Errorf("messageRequestWorkerPool full: message request discarded")
return fmt.Errorf("invalid packet: %w", err)
}
msgID, _, err := message.IdFromBytes(packet.GetId())
if err != nil {
return fmt.Errorf("invalid message id: %w", err)
} }
msgBytes, err := m.loadMessageFunc(msgID)
if err != nil {
m.log.Debugw("error loading message", "msg-id", msgID, "err", err)
return nil
}
// send the loaded message directly to the neighbor
_, _ = nbr.Write(marshal(&pb.Message{Data: msgBytes}))
default: default:
return ErrInvalidPacket return ErrInvalidPacket
} }
...@@ -283,5 +294,33 @@ func marshal(packet pb.Packet) []byte { ...@@ -283,5 +294,33 @@ func marshal(packet pb.Packet) []byte {
// WorkerPoolStatus returns the name and the load of the workerpool. // WorkerPoolStatus returns the name and the load of the workerpool.
func (m *Manager) WorkerPoolStatus() (name string, load int) { func (m *Manager) WorkerPoolStatus() (name string, load int) {
return "InboxWorkerPool", m.inboxWorkerPool.RunningWorkers() return "InboxWorkerPool", m.messageWorkerPool.GetPendingQueueSize() + m.messageRequestWorkerPool.GetPendingQueueSize()
}
// processPacketMessage unmarshals a received message packet and triggers the
// MessageReceived event so that upper layers can process the message.
// data is the raw packet including the packet-type byte at index 0, which is
// skipped when unmarshaling.
func (m *Manager) processPacketMessage(data []byte, nbr *Neighbor) {
	packet := new(pb.Message)
	if err := proto.Unmarshal(data[1:], packet); err != nil {
		m.log.Debugw("error processing packet", "err", err)
		// bail out: do not trigger the event with a half-decoded packet
		return
	}
	m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: packet.GetData(), Peer: nbr.Peer})
}
func (m *Manager) processMessageRequest(data []byte, nbr *Neighbor) {
packet := new(pb.MessageRequest)
if err := proto.Unmarshal(data[1:], packet); err != nil {
m.log.Debugw("invalid packet", "err", err)
}
msgID, _, err := message.IdFromBytes(packet.GetId())
if err != nil {
m.log.Debugw("invalid message id:", "err", err)
}
msgBytes, err := m.loadMessageFunc(msgID)
if err != nil {
m.log.Debugw("error loading message", "msg-id", msgID, "err", err)
}
// send the loaded message directly to the neighbor
_, _ = nbr.Write(marshal(&pb.Message{Data: msgBytes}))
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment