diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index 7e9f693c18e2f820b36444957b6091f190136057..40d96b28410e2162aa365eb93e0d7053a5e44c01 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -42,7 +42,7 @@ type Manager struct { // NewManager creates a new Manager. func NewManager(local *peer.Local, f LoadMessageFunc, log *logger.Logger) *Manager { - return &Manager{ + m := &Manager{ local: local, loadMessageFunc: f, log: log, @@ -55,6 +55,8 @@ func NewManager(local *peer.Local, f LoadMessageFunc, log *logger.Logger) *Manag srv: nil, neighbors: make(map[identity.ID]*Neighbor), } + m.inboxWorkerPool.Tune(2) + return m } // Start starts the manager for the given TCP server. @@ -215,7 +217,7 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n dataCopy := make([]byte, len(data)) copy(dataCopy, data) m.inboxWorkerPool.Submit(func() { - if err := m.handlePacket(dataCopy, peer); err != nil { + if err := m.handlePacket(dataCopy, n); err != nil { m.log.Debugw("error handling packet", "err", err) } }) @@ -228,7 +230,7 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n return nil } -func (m *Manager) handlePacket(data []byte, p *peer.Peer) error { +func (m *Manager) handlePacket(data []byte, n *Neighbor) error { // ignore empty packages if len(data) == 0 { return nil @@ -241,8 +243,8 @@ func (m *Manager) handlePacket(data []byte, p *peer.Peer) error { if err := proto.Unmarshal(data[1:], protoMsg); err != nil { return fmt.Errorf("invalid packet: %w", err) } - m.log.Debugw("received packet", "type", protoMsg.Type(), "peer-id", p.ID()) - m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: protoMsg.GetData(), Peer: p}) + m.log.Debugw("received packet", "type", protoMsg.Type(), "peer-id", n.Peer.ID()) + m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: protoMsg.GetData(), Peer: n.Peer}) case pb.PacketMessageRequest: protoMsgReq := new(pb.MessageRequest) @@ -250,20 +252,20 @@ func (m *Manager) handlePacket(data []byte, p *peer.Peer) error { return fmt.Errorf("invalid packet: %w", err) } - m.log.Debugw("received packet", "type", protoMsgReq.Type(), "peer-id", p.ID()) + m.log.Debugw("received packet", "type", protoMsgReq.Type(), "peer-id", n.Peer.ID()) msgId, _, err := message.IdFromBytes(protoMsgReq.GetId()) if err != nil { - m.log.Debugw("couldn't compute message id from bytes", "peer-id", p.ID(), "err", err) + m.log.Debugw("couldn't compute message id from bytes", "peer-id", n.Peer.ID(), "err", err) return nil } msg, err := m.loadMessageFunc(msgId) if err != nil { - m.log.Debugw("error getting message", "peer-id", p.ID(), "msg-id", msgId, "err", err) + m.log.Debugw("error getting message", "peer-id", n.Peer.ID(), "msg-id", msgId, "err", err) return nil } - m.SendMessage(msg, p.ID()) + n.Write(marshal(&pb.Message{Data: msg})) default: return ErrInvalidPacket } diff --git a/plugins/profiling/plugin.go b/plugins/profiling/plugin.go index 2aeb0f6439b3baddea1b679117059e16ef0315cd..388da6453312ed18e7a030241dafbe03eb7af109 100644 --- a/plugins/profiling/plugin.go +++ b/plugins/profiling/plugin.go @@ -2,6 +2,8 @@ package profiling import ( "net/http" + "runtime" + // import required to profile _ "net/http/pprof" @@ -33,6 +35,10 @@ func configure(_ *node.Plugin) { func run(_ *node.Plugin) { bindAddr := config.Node.GetString(CfgProfilingBindAddress) + + runtime.SetMutexProfileFraction(5) + runtime.SetBlockProfileRate(5) + log.Infof("%s started, bind-address=%s", PluginName, bindAddr) go http.ListenAndServe(bindAddr, nil) }