package gossip import ( "errors" "io" "net" "strings" "sync" "github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/netutil" "github.com/iotaledger/hive.go/netutil/buffconn" "go.uber.org/atomic" ) var ( // ErrNeighborQueueFull is returned when the send queue is already full. ErrNeighborQueueFull = errors.New("send queue is full") ) const ( neighborQueueSize = 5000 maxNumReadErrors = 10 droppedMessagesThreshold = 1000 ) type Neighbor struct { *peer.Peer *buffconn.BufferedConnection log *logger.Logger queue chan []byte messagesDropped atomic.Int32 wg sync.WaitGroup closing chan struct{} disconnectOnce sync.Once } // NewNeighbor creates a new neighbor from the provided peer and connection. func NewNeighbor(peer *peer.Peer, conn net.Conn, log *logger.Logger) *Neighbor { if !IsSupported(peer) { panic("peer does not support gossip") } // always include ID and address with every log message log = log.With( "id", peer.ID(), "network", conn.LocalAddr().Network(), "addr", conn.RemoteAddr().String(), ) return &Neighbor{ Peer: peer, BufferedConnection: buffconn.NewBufferedConnection(conn), log: log, queue: make(chan []byte, neighborQueueSize), closing: make(chan struct{}), } } // Listen starts the communication to the neighbor. func (n *Neighbor) Listen() { n.wg.Add(2) go n.readLoop() go n.writeLoop() n.log.Info("Connection established") } // Close closes the connection to the neighbor and stops all communication. func (n *Neighbor) Close() error { err := n.disconnect() // wait for everything to finish n.wg.Wait() n.log.Info("Connection closed") return err } // IsOutbound returns true if the neighbor is an outbound neighbor. func (n *Neighbor) IsOutbound() bool { return GetAddress(n.Peer) == n.RemoteAddr().String() } func (n *Neighbor) disconnect() (err error) { n.disconnectOnce.Do(func() { close(n.closing) err = n.BufferedConnection.Close() }) return } func (n *Neighbor) writeLoop() { defer n.wg.Done() for { select { case msg := <-n.queue: if len(msg) == 0 { continue } if _, err := n.BufferedConnection.Write(msg); err != nil { n.log.Warnw("Write error", "err", err) _ = n.BufferedConnection.Close() return } case <-n.closing: return } } } func (n *Neighbor) readLoop() { defer n.wg.Done() var numReadErrors uint for { err := n.Read() if netutil.IsTemporaryError(err) { // ignore temporary read errors. n.log.Debugw("temporary read error", "err", err) numReadErrors++ if numReadErrors > maxNumReadErrors { n.log.Warnw("Too many read errors", "err", err) _ = n.BufferedConnection.Close() return } continue } if err != nil { // return from the loop on all other errors if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") { n.log.Warnw("Permanent error", "err", err) } _ = n.BufferedConnection.Close() return } } } func (n *Neighbor) Write(b []byte) (int, error) { l := len(b) if l > maxPacketSize { n.log.Panicw("message too large", "len", l, "max", maxPacketSize) } // add to queue select { case n.queue <- b: return l, nil case <-n.closing: return 0, nil default: if n.messagesDropped.Inc() >= droppedMessagesThreshold { n.messagesDropped.Store(0) return 0, ErrNeighborQueueFull } return 0, nil } }