diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index 80b04494fb078332042ac72a659ed03c5be09968..eba4d5e63d11cee193336f1337e1e053a4c0db2a 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -137,7 +137,6 @@ func (m *Manager) DropNeighbor(id identity.ID) error { // If no peer is provided, all neighbors are queried. func (m *Manager) RequestMessage(messageId []byte, to ...identity.ID) { msgReq := &pb.MessageRequest{Id: messageId} - m.log.Debugw("send packet", "type", msgReq.Type(), "to", to) m.send(marshal(msgReq), to...) } @@ -145,7 +144,6 @@ func (m *Manager) RequestMessage(messageId []byte, to ...identity.ID) { // The actual send then happens asynchronously. If no peer is provided, it is send to all neighbors. func (m *Manager) SendMessage(msgData []byte, to ...identity.ID) { msg := &pb.Message{Data: msgData} - m.log.Debugw("send packet", "type", msg.Type(), "to", to) m.send(marshal(msg), to...) } @@ -187,7 +185,7 @@ func (m *Manager) send(b []byte, to ...identity.ID) { for _, nbr := range neighbors { if _, err := nbr.Write(b); err != nil { - m.log.Warnw("send error", "err", err, "neighbor", nbr.Peer.Address()) + m.log.Warnw("send error", "peer-id", nbr.ID(), "err", err) } } } @@ -206,30 +204,30 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n } // create and add the neighbor - n := NewNeighbor(peer, conn, m.log) - n.Events.Close.Attach(events.NewClosure(func() { + nbr := NewNeighbor(peer, conn, m.log) + nbr.Events.Close.Attach(events.NewClosure(func() { // assure that the neighbor is removed and notify _ = m.DropNeighbor(peer.ID()) - m.events.NeighborRemoved.Trigger(n) + m.events.NeighborRemoved.Trigger(nbr) })) - n.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) { + nbr.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) { dataCopy := make([]byte, len(data)) copy(dataCopy, data) m.inboxWorkerPool.Submit(func() { - if err := m.handlePacket(dataCopy, n); err != nil { + if err := m.handlePacket(dataCopy, nbr); err != nil { m.log.Debugw("error handling packet", "err", err) } }) })) - m.neighbors[peer.ID()] = n - n.Listen() - m.events.NeighborAdded.Trigger(n) + m.neighbors[peer.ID()] = nbr + nbr.Listen() + m.events.NeighborAdded.Trigger(nbr) return nil } -func (m *Manager) handlePacket(data []byte, n *Neighbor) error { +func (m *Manager) handlePacket(data []byte, nbr *Neighbor) error { // ignore empty packages if len(data) == 0 { return nil @@ -238,33 +236,33 @@ func (m *Manager) handlePacket(data []byte, n *Neighbor) error { switch pb.PacketType(data[0]) { case pb.PacketMessage: - protoMsg := new(pb.Message) - if err := proto.Unmarshal(data[1:], protoMsg); err != nil { + packet := new(pb.Message) + if err := proto.Unmarshal(data[1:], packet); err != nil { return fmt.Errorf("invalid packet: %w", err) } - m.log.Debugw("received packet", "type", protoMsg.Type(), "peer-id", n.Peer.ID()) - m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: protoMsg.GetData(), Peer: n.Peer}) + m.log.Debugw("received packet", "type", packet.Name(), "peer-id", nbr.ID()) + m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: packet.GetData(), Peer: nbr.Peer}) case pb.PacketMessageRequest: - protoMsgReq := new(pb.MessageRequest) - if err := proto.Unmarshal(data[1:], protoMsgReq); err != nil { + packet := new(pb.MessageRequest) + if err := proto.Unmarshal(data[1:], packet); err != nil { return fmt.Errorf("invalid packet: %w", err) } - m.log.Debugw("received packet", "type", protoMsgReq.Type(), "peer-id", n.Peer.ID()) - msgId, _, err := message.IdFromBytes(protoMsgReq.GetId()) + m.log.Debugw("received packet", "type", packet.Name(), "peer-id", nbr.ID()) + msgID, _, err := message.IdFromBytes(packet.GetId()) if err != nil { - m.log.Debugw("couldn't compute message id from bytes", "peer-id", n.Peer.ID(), "err", err) - return nil + return fmt.Errorf("invalid message id: %w", err) } - msg, err := m.loadMessageFunc(msgId) + msgBytes, err := m.loadMessageFunc(msgID) if err != nil { - m.log.Debugw("error getting message", "peer-id", n.Peer.ID(), "msg-id", msgId, "err", err) + m.log.Debugw("error loading message", "msg-id", msgID, "err", err) return nil } - n.Write(marshal(&pb.Message{Data: msg})) + // send the loaded message directly to the neighbor + _, _ = nbr.Write(marshal(&pb.Message{Data: msgBytes})) default: return ErrInvalidPacket } diff --git a/plugins/gossip/gossip.go b/plugins/gossip/gossip.go index 17e19e180f39d8c7a59a722586791e02f099f616..06162245c0e1f2d51740df2420e6b3b325483225 100644 --- a/plugins/gossip/gossip.go +++ b/plugins/gossip/gossip.go @@ -1,7 +1,7 @@ package gossip import ( - "fmt" + "errors" "net" "strconv" "sync" @@ -18,6 +18,11 @@ import ( "github.com/iotaledger/hive.go/netutil" ) +var ( + // ErrMessageNotFound is returned when a message could not be found in the Tangle. + ErrMessageNotFound = errors.New("message not found") +) + var ( mgr *gossip.Manager mgrOnce sync.Once @@ -87,11 +92,10 @@ func start(shutdownSignal <-chan struct{}) { // loads the given message from the message layer or an error if not found. func loadMessage(messageID message.Id) (bytes []byte, err error) { - log.Debugw("load message from db", "id", messageID.String()) if !messagelayer.Tangle.Message(messageID).Consume(func(message *message.Message) { bytes = message.Bytes() }) { - err = fmt.Errorf("message not found: hash=%s", messageID) + err = ErrMessageNotFound } return }