Skip to content
Snippets Groups Projects
Unverified Commit 865ffd74 authored by Wolfgang Welz's avatar Wolfgang Welz Committed by GitHub
Browse files

Improve gossip logging (#537)

* improve gossip logging

* fix naming
parent 0b83c7ec
No related branches found
No related tags found
No related merge requests found
......@@ -137,7 +137,6 @@ func (m *Manager) DropNeighbor(id identity.ID) error {
// If no peer is provided, all neighbors are queried.
func (m *Manager) RequestMessage(messageId []byte, to ...identity.ID) {
msgReq := &pb.MessageRequest{Id: messageId}
m.log.Debugw("send packet", "type", msgReq.Type(), "to", to)
m.send(marshal(msgReq), to...)
}
......@@ -145,7 +144,6 @@ func (m *Manager) RequestMessage(messageId []byte, to ...identity.ID) {
// The actual send then happens asynchronously. If no peer is provided, it is send to all neighbors.
func (m *Manager) SendMessage(msgData []byte, to ...identity.ID) {
msg := &pb.Message{Data: msgData}
m.log.Debugw("send packet", "type", msg.Type(), "to", to)
m.send(marshal(msg), to...)
}
......@@ -187,7 +185,7 @@ func (m *Manager) send(b []byte, to ...identity.ID) {
for _, nbr := range neighbors {
if _, err := nbr.Write(b); err != nil {
m.log.Warnw("send error", "err", err, "neighbor", nbr.Peer.Address())
m.log.Warnw("send error", "peer-id", nbr.ID(), "err", err)
}
}
}
......@@ -206,30 +204,30 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n
}
// create and add the neighbor
n := NewNeighbor(peer, conn, m.log)
n.Events.Close.Attach(events.NewClosure(func() {
nbr := NewNeighbor(peer, conn, m.log)
nbr.Events.Close.Attach(events.NewClosure(func() {
// assure that the neighbor is removed and notify
_ = m.DropNeighbor(peer.ID())
m.events.NeighborRemoved.Trigger(n)
m.events.NeighborRemoved.Trigger(nbr)
}))
n.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) {
nbr.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) {
dataCopy := make([]byte, len(data))
copy(dataCopy, data)
m.inboxWorkerPool.Submit(func() {
if err := m.handlePacket(dataCopy, n); err != nil {
if err := m.handlePacket(dataCopy, nbr); err != nil {
m.log.Debugw("error handling packet", "err", err)
}
})
}))
m.neighbors[peer.ID()] = n
n.Listen()
m.events.NeighborAdded.Trigger(n)
m.neighbors[peer.ID()] = nbr
nbr.Listen()
m.events.NeighborAdded.Trigger(nbr)
return nil
}
func (m *Manager) handlePacket(data []byte, n *Neighbor) error {
func (m *Manager) handlePacket(data []byte, nbr *Neighbor) error {
// ignore empty packages
if len(data) == 0 {
return nil
......@@ -238,33 +236,33 @@ func (m *Manager) handlePacket(data []byte, n *Neighbor) error {
switch pb.PacketType(data[0]) {
case pb.PacketMessage:
protoMsg := new(pb.Message)
if err := proto.Unmarshal(data[1:], protoMsg); err != nil {
packet := new(pb.Message)
if err := proto.Unmarshal(data[1:], packet); err != nil {
return fmt.Errorf("invalid packet: %w", err)
}
m.log.Debugw("received packet", "type", protoMsg.Type(), "peer-id", n.Peer.ID())
m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: protoMsg.GetData(), Peer: n.Peer})
m.log.Debugw("received packet", "type", packet.Name(), "peer-id", nbr.ID())
m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: packet.GetData(), Peer: nbr.Peer})
case pb.PacketMessageRequest:
protoMsgReq := new(pb.MessageRequest)
if err := proto.Unmarshal(data[1:], protoMsgReq); err != nil {
packet := new(pb.MessageRequest)
if err := proto.Unmarshal(data[1:], packet); err != nil {
return fmt.Errorf("invalid packet: %w", err)
}
m.log.Debugw("received packet", "type", protoMsgReq.Type(), "peer-id", n.Peer.ID())
msgId, _, err := message.IdFromBytes(protoMsgReq.GetId())
m.log.Debugw("received packet", "type", packet.Name(), "peer-id", nbr.ID())
msgID, _, err := message.IdFromBytes(packet.GetId())
if err != nil {
m.log.Debugw("couldn't compute message id from bytes", "peer-id", n.Peer.ID(), "err", err)
return nil
return fmt.Errorf("invalid message id: %w", err)
}
msg, err := m.loadMessageFunc(msgId)
msgBytes, err := m.loadMessageFunc(msgID)
if err != nil {
m.log.Debugw("error getting message", "peer-id", n.Peer.ID(), "msg-id", msgId, "err", err)
m.log.Debugw("error loading message", "msg-id", msgID, "err", err)
return nil
}
n.Write(marshal(&pb.Message{Data: msg}))
// send the loaded message directly to the neighbor
_, _ = nbr.Write(marshal(&pb.Message{Data: msgBytes}))
default:
return ErrInvalidPacket
}
......
package gossip
import (
"fmt"
"errors"
"net"
"strconv"
"sync"
......@@ -18,6 +18,11 @@ import (
"github.com/iotaledger/hive.go/netutil"
)
var (
// ErrMessageNotFound is returned when a message could not be found in the Tangle.
ErrMessageNotFound = errors.New("message not found")
)
var (
mgr *gossip.Manager
mgrOnce sync.Once
......@@ -87,11 +92,10 @@ func start(shutdownSignal <-chan struct{}) {
// loads the given message from the message layer or an error if not found.
func loadMessage(messageID message.Id) (bytes []byte, err error) {
log.Debugw("load message from db", "id", messageID.String())
if !messagelayer.Tangle.Message(messageID).Consume(func(message *message.Message) {
bytes = message.Bytes()
}) {
err = fmt.Errorf("message not found: hash=%s", messageID)
err = ErrMessageNotFound
}
return
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment