Skip to content
Snippets Groups Projects
Unverified Commit 0b68bca0 authored by Luca Moser's avatar Luca Moser Committed by GitHub
Browse files

unblock message processor from acquiring neighbor manager locks (#499)

parent e8ba423d
No related branches found
No related tags found
No related merge requests found
...@@ -42,7 +42,7 @@ type Manager struct { ...@@ -42,7 +42,7 @@ type Manager struct {
// NewManager creates a new Manager. // NewManager creates a new Manager.
func NewManager(local *peer.Local, f LoadMessageFunc, log *logger.Logger) *Manager { func NewManager(local *peer.Local, f LoadMessageFunc, log *logger.Logger) *Manager {
return &Manager{ m := &Manager{
local: local, local: local,
loadMessageFunc: f, loadMessageFunc: f,
log: log, log: log,
...@@ -55,6 +55,8 @@ func NewManager(local *peer.Local, f LoadMessageFunc, log *logger.Logger) *Manag ...@@ -55,6 +55,8 @@ func NewManager(local *peer.Local, f LoadMessageFunc, log *logger.Logger) *Manag
srv: nil, srv: nil,
neighbors: make(map[identity.ID]*Neighbor), neighbors: make(map[identity.ID]*Neighbor),
} }
m.inboxWorkerPool.Tune(2)
return m
} }
// Start starts the manager for the given TCP server. // Start starts the manager for the given TCP server.
...@@ -215,7 +217,7 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n ...@@ -215,7 +217,7 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n
dataCopy := make([]byte, len(data)) dataCopy := make([]byte, len(data))
copy(dataCopy, data) copy(dataCopy, data)
m.inboxWorkerPool.Submit(func() { m.inboxWorkerPool.Submit(func() {
if err := m.handlePacket(dataCopy, peer); err != nil { if err := m.handlePacket(dataCopy, n); err != nil {
m.log.Debugw("error handling packet", "err", err) m.log.Debugw("error handling packet", "err", err)
} }
}) })
...@@ -228,7 +230,7 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n ...@@ -228,7 +230,7 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n
return nil return nil
} }
func (m *Manager) handlePacket(data []byte, p *peer.Peer) error { func (m *Manager) handlePacket(data []byte, n *Neighbor) error {
// ignore empty packages // ignore empty packages
if len(data) == 0 { if len(data) == 0 {
return nil return nil
...@@ -241,8 +243,8 @@ func (m *Manager) handlePacket(data []byte, p *peer.Peer) error { ...@@ -241,8 +243,8 @@ func (m *Manager) handlePacket(data []byte, p *peer.Peer) error {
if err := proto.Unmarshal(data[1:], protoMsg); err != nil { if err := proto.Unmarshal(data[1:], protoMsg); err != nil {
return fmt.Errorf("invalid packet: %w", err) return fmt.Errorf("invalid packet: %w", err)
} }
m.log.Debugw("received packet", "type", protoMsg.Type(), "peer-id", p.ID()) m.log.Debugw("received packet", "type", protoMsg.Type(), "peer-id", n.Peer.ID())
m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: protoMsg.GetData(), Peer: p}) m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: protoMsg.GetData(), Peer: n.Peer})
case pb.PacketMessageRequest: case pb.PacketMessageRequest:
protoMsgReq := new(pb.MessageRequest) protoMsgReq := new(pb.MessageRequest)
...@@ -250,20 +252,20 @@ func (m *Manager) handlePacket(data []byte, p *peer.Peer) error { ...@@ -250,20 +252,20 @@ func (m *Manager) handlePacket(data []byte, p *peer.Peer) error {
return fmt.Errorf("invalid packet: %w", err) return fmt.Errorf("invalid packet: %w", err)
} }
m.log.Debugw("received packet", "type", protoMsgReq.Type(), "peer-id", p.ID()) m.log.Debugw("received packet", "type", protoMsgReq.Type(), "peer-id", n.Peer.ID())
msgId, _, err := message.IdFromBytes(protoMsgReq.GetId()) msgId, _, err := message.IdFromBytes(protoMsgReq.GetId())
if err != nil { if err != nil {
m.log.Debugw("couldn't compute message id from bytes", "peer-id", p.ID(), "err", err) m.log.Debugw("couldn't compute message id from bytes", "peer-id", n.Peer.ID(), "err", err)
return nil return nil
} }
msg, err := m.loadMessageFunc(msgId) msg, err := m.loadMessageFunc(msgId)
if err != nil { if err != nil {
m.log.Debugw("error getting message", "peer-id", p.ID(), "msg-id", msgId, "err", err) m.log.Debugw("error getting message", "peer-id", n.Peer.ID(), "msg-id", msgId, "err", err)
return nil return nil
} }
m.SendMessage(msg, p.ID()) n.Write(marshal(&pb.Message{Data: msg}))
default: default:
return ErrInvalidPacket return ErrInvalidPacket
} }
......
...@@ -2,6 +2,8 @@ package profiling ...@@ -2,6 +2,8 @@ package profiling
import ( import (
"net/http" "net/http"
"runtime"
// import required to profile // import required to profile
_ "net/http/pprof" _ "net/http/pprof"
...@@ -33,6 +35,10 @@ func configure(_ *node.Plugin) { ...@@ -33,6 +35,10 @@ func configure(_ *node.Plugin) {
func run(_ *node.Plugin) { func run(_ *node.Plugin) {
bindAddr := config.Node.GetString(CfgProfilingBindAddress) bindAddr := config.Node.GetString(CfgProfilingBindAddress)
runtime.SetMutexProfileFraction(5)
runtime.SetBlockProfileRate(5)
log.Infof("%s started, bind-address=%s", PluginName, bindAddr) log.Infof("%s started, bind-address=%s", PluginName, bindAddr)
go http.ListenAndServe(bindAddr, nil) go http.ListenAndServe(bindAddr, nil)
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment