Skip to content
Snippets Groups Projects
Unverified Commit 203aec45 authored by jkrvivian's avatar jkrvivian Committed by GitHub
Browse files

Fix: Log dropping packets every 1000 drops (#255)


* Fix: Log dropping packets every 1000 drops

* use an atomic counter for dropped message because Write() could be called concurrently

* removes redundant zero

* Add external check in test

Co-authored-by: default avatarLuca Moser <moser.luca@gmail.com>
parent 9eb72cb0
No related branches found
No related tags found
No related merge requests found
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"github.com/iotaledger/goshimmer/packages/netutil/buffconn" "github.com/iotaledger/goshimmer/packages/netutil/buffconn"
"github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/logger"
"go.uber.org/atomic"
) )
var ( var (
...@@ -19,16 +20,18 @@ var ( ...@@ -19,16 +20,18 @@ var (
) )
const ( const (
neighborQueueSize = 5000 neighborQueueSize = 5000
maxNumReadErrors = 10 maxNumReadErrors = 10
droppedMessagesThreshold = 1000
) )
type Neighbor struct { type Neighbor struct {
*peer.Peer *peer.Peer
*buffconn.BufferedConnection *buffconn.BufferedConnection
log *logger.Logger log *logger.Logger
queue chan []byte queue chan []byte
messagesDropped atomic.Int32
wg sync.WaitGroup wg sync.WaitGroup
closing chan struct{} closing chan struct{}
...@@ -150,6 +153,10 @@ func (n *Neighbor) Write(b []byte) (int, error) { ...@@ -150,6 +153,10 @@ func (n *Neighbor) Write(b []byte) (int, error) {
case <-n.closing: case <-n.closing:
return 0, nil return 0, nil
default: default:
return 0, ErrNeighborQueueFull if n.messagesDropped.Inc() >= droppedMessagesThreshold {
n.messagesDropped.Store(0)
return 0, ErrNeighborQueueFull
}
return 0, nil
} }
} }
...@@ -87,8 +87,8 @@ func TestNeighborParallelWrite(t *testing.T) { ...@@ -87,8 +87,8 @@ func TestNeighborParallelWrite(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
for i := 0; i < neighborQueueSize; i++ { for i := 0; i < neighborQueueSize; i++ {
_, err := neighborA.Write(testData) l, err := neighborA.Write(testData)
if err == ErrNeighborQueueFull { if err == ErrNeighborQueueFull || l == 0 {
continue continue
} }
assert.NoError(t, err) assert.NoError(t, err)
...@@ -99,8 +99,8 @@ func TestNeighborParallelWrite(t *testing.T) { ...@@ -99,8 +99,8 @@ func TestNeighborParallelWrite(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
for i := 0; i < neighborQueueSize; i++ { for i := 0; i < neighborQueueSize; i++ {
_, err := neighborA.Write(testData) l, err := neighborA.Write(testData)
if err == ErrNeighborQueueFull { if err == ErrNeighborQueueFull || l == 0 {
continue continue
} }
assert.NoError(t, err) assert.NoError(t, err)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment