diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index 49c0e6b56da51c35cef66d8c27fb08b7b6439088..8a8c1587e14acc098edcf0c31812f42a0bb0c49e 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -151,13 +151,13 @@ func (m *Manager) send(msg []byte, to ...peer.ID) { } } -func (m *Manager) addNeighbor(peer *peer.Peer, handshake func(*peer.Peer) (*transport.Connection, error)) error { +func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (*transport.Connection, error)) error { var ( err error conn *transport.Connection ) for i := 0; i < maxConnectionAttempts; i++ { - conn, err = handshake(peer) + conn, err = connectorFunc(peer) if err == nil { break } @@ -210,9 +210,10 @@ func (m *Manager) readLoop(nbr *neighbor) { if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") { m.log.Warnw("read error", "err", err) } + + m.log.Debug("connection closed", "id", nbr.peer.ID(), "addr", nbr.conn.RemoteAddr().String()) _ = nbr.conn.Close() // just make sure that the connection is closed as fast as possible _ = m.DropNeighbor(nbr.peer.ID()) - m.log.Debug("reading stopped") return } diff --git a/packages/gossip/transport/transport.go b/packages/gossip/transport/transport.go index 54794a631f89017ceb5fe12803d851f0b6e35fda..ef651bb300374e607d7394a12a18592d0d946050 100644 --- a/packages/gossip/transport/transport.go +++ b/packages/gossip/transport/transport.go @@ -141,15 +141,18 @@ func (t *TCP) DialPeer(p *peer.Peer) (*Connection, error) { conn, err := net.DialTimeout(gossipAddr.Network(), gossipAddr.String(), acceptTimeout) if err != nil { - return nil, err + return nil, errors.Wrap(err, "dial peer failed") } err = t.doHandshake(p.PublicKey(), gossipAddr.String(), conn) if err != nil { - return nil, err + return nil, errors.Wrap(err, "outgoing handshake failed") } - t.log.Debugw("connected", "id", p.ID(), "addr", conn.RemoteAddr(), "direction", "out") + t.log.Debugw("outgoing connection established", + "id", p.ID(), + "addr", conn.RemoteAddr(), + ) return newConnection(conn, p), nil } @@ -159,13 +162,17 @@ func (t *TCP) AcceptPeer(p *peer.Peer) (*Connection, error) { if p.Services().Get(service.GossipKey) == nil { return nil, ErrNoGossip } + // wait for the connection connected := <-t.acceptPeer(p) if connected.err != nil { - return nil, connected.err + return nil, errors.Wrap(connected.err, "accept peer failed") } - t.log.Debugw("connected", "id", p.ID(), "addr", connected.c.RemoteAddr(), "direction", "in") + t.log.Debugw("incoming connection established", + "id", p.ID(), + "addr", connected.c.RemoteAddr(), + ) return connected.c, nil } @@ -262,8 +269,7 @@ func (t *TCP) matchAccept(m *acceptMatcher, req []byte, conn net.Conn) { defer t.wg.Done() if err := t.writeHandshakeResponse(req, conn); err != nil { - t.log.Warnw("failed handshake", "addr", conn.RemoteAddr(), "err", err) - m.connected <- connect{nil, err} + m.connected <- connect{nil, errors.Wrap(err, "incoming handshake failed")} t.closeConnection(conn) return } @@ -320,7 +326,7 @@ func (t *TCP) doHandshake(key peer.PublicKey, remoteAddr string, conn net.Conn) } b, err := proto.Marshal(pkt) if err != nil { - return errors.Wrap(err, ErrInvalidHandshake.Error()) + return err } if l := len(b); l > maxHandshakePacketSize { return fmt.Errorf("handshake size too large: %d, max %d", l, maxHandshakePacketSize) @@ -345,7 +351,7 @@ func (t *TCP) doHandshake(key peer.PublicKey, remoteAddr string, conn net.Conn) pkt = new(pb.Packet) if err := proto.Unmarshal(b[:n], pkt); err != nil { - return errors.Wrap(err, ErrInvalidHandshake.Error()) + return err } signer, err := peer.RecoverKeyFromSignedData(pkt)