diff --git a/packages/gossip/neighbor.go b/packages/gossip/neighbor.go index cbf39e91ab2b22d7b1093d69ee733adb6737f021..a529aab2af7ede70439dd696a3d15f2e7a1cd8c4 100644 --- a/packages/gossip/neighbor.go +++ b/packages/gossip/neighbor.go @@ -11,6 +11,7 @@ import ( "github.com/iotaledger/goshimmer/packages/netutil/buffconn" "github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/logger" + "go.uber.org/atomic" ) var ( @@ -19,16 +20,18 @@ var ( ) const ( - neighborQueueSize = 5000 - maxNumReadErrors = 10 + neighborQueueSize = 5000 + maxNumReadErrors = 10 + droppedMessagesThreshold = 1000 ) type Neighbor struct { *peer.Peer *buffconn.BufferedConnection - log *logger.Logger - queue chan []byte + log *logger.Logger + queue chan []byte + messagesDropped atomic.Int32 wg sync.WaitGroup closing chan struct{} @@ -150,6 +153,10 @@ func (n *Neighbor) Write(b []byte) (int, error) { case <-n.closing: return 0, nil default: - return 0, ErrNeighborQueueFull + if n.messagesDropped.Inc() >= droppedMessagesThreshold { + n.messagesDropped.Store(0) + return 0, ErrNeighborQueueFull + } + return 0, nil } } diff --git a/packages/gossip/neighbor_test.go b/packages/gossip/neighbor_test.go index 21d02cf9341e8694ca96cd461373a33d33272d1d..36c61534baf178c3fd637406302661e6b75687bd 100644 --- a/packages/gossip/neighbor_test.go +++ b/packages/gossip/neighbor_test.go @@ -87,8 +87,8 @@ func TestNeighborParallelWrite(t *testing.T) { go func() { defer wg.Done() for i := 0; i < neighborQueueSize; i++ { - _, err := neighborA.Write(testData) - if err == ErrNeighborQueueFull { + l, err := neighborA.Write(testData) + if err == ErrNeighborQueueFull || l == 0 { continue } assert.NoError(t, err) @@ -99,8 +99,8 @@ func TestNeighborParallelWrite(t *testing.T) { go func() { defer wg.Done() for i := 0; i < neighborQueueSize; i++ { - _, err := neighborA.Write(testData) - if err == ErrNeighborQueueFull { + l, err := neighborA.Write(testData) + if err == ErrNeighborQueueFull || l == 0 { continue } assert.NoError(t, err)