diff --git a/go.mod b/go.mod index 7b6391201ffdd2f037ffee6bdf8d5f52432d243a..8629f7239d23ae4d793dcd7b9e8ad1b9939a3b31 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,7 @@ require ( github.com/stretchr/objx v0.2.0 // indirect github.com/stretchr/testify v1.4.0 github.com/valyala/fasttemplate v1.1.0 // indirect + go.uber.org/atomic v1.5.1 go.uber.org/zap v1.13.0 golang.org/x/crypto v0.0.0-20200128174031-69ecbb4d6d5d golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 diff --git a/packages/netutil/buffconn/buffconn.go b/packages/netutil/buffconn/buffconn.go index 70f51d2feb22849e162034f904b4ee784c202c5d..c1ad9eaa28544f3091a1a3788798a4d2cdf6ae83 100644 --- a/packages/netutil/buffconn/buffconn.go +++ b/packages/netutil/buffconn/buffconn.go @@ -9,6 +9,7 @@ import ( "time" "github.com/iotaledger/hive.go/events" + "go.uber.org/atomic" ) const ( @@ -40,8 +41,8 @@ type BufferedConnection struct { incomingHeaderBuffer []byte closeOnce sync.Once - BytesRead int - BytesWritten int + bytesRead *atomic.Uint32 + bytesWritten *atomic.Uint32 } // NewBufferedConnection creates a new BufferedConnection from a net.Conn. @@ -53,6 +54,8 @@ func NewBufferedConnection(conn net.Conn) *BufferedConnection { }, conn: conn, incomingHeaderBuffer: make([]byte, headerSize), + bytesRead: atomic.NewUint32(0), + bytesWritten: atomic.NewUint32(0), } } @@ -77,6 +80,16 @@ func (c *BufferedConnection) RemoteAddr() net.Addr { return c.conn.RemoteAddr() } +// BytesRead returns the total number of bytes read. +func (c *BufferedConnection) BytesRead() uint32 { + return c.bytesRead.Load() +} + +// BytesWritten returns the total number of bytes written. +func (c *BufferedConnection) BytesWritten() uint32 { + return c.bytesWritten.Load() +} + // Read starts reading on the connection, it only returns when an error occurred or when Close has been called. // If a complete message has been received and ReceiveMessage event is triggered with its complete payload. // If read leads to an error, the loop will be stopped and that error returned. @@ -114,7 +127,7 @@ func (c *BufferedConnection) Write(msg []byte) (int, error) { for bytesWritten := 0; bytesWritten < toWrite; { n, err := c.conn.Write(buffer[bytesWritten:]) bytesWritten += n - c.BytesWritten += n + c.bytesWritten.Add(uint32(n)) if err != nil { return bytesWritten, err } @@ -127,7 +140,7 @@ func (c *BufferedConnection) read(buffer []byte) (int, error) { for bytesRead := 0; bytesRead < toRead; { n, err := c.conn.Read(buffer[bytesRead:]) bytesRead += n - c.BytesRead += n + c.bytesRead.Add(uint32(n)) if err != nil { return bytesRead, err } diff --git a/plugins/spa/plugin.go b/plugins/spa/plugin.go index fd3215d8de6a81ef237b7ece696856f50028e1c9..829d3609a9bee5d4dac0b822494697fdb5170561 100644 --- a/plugins/spa/plugin.go +++ b/plugins/spa/plugin.go @@ -161,8 +161,8 @@ type neighbormetric struct { ID string `json:"id"` Address string `json:"address"` ConnectionOrigin string `json:"connection_origin"` - BytesRead int `json:"bytes_read"` - BytesWritten int `json:"bytes_written"` + BytesRead uint32 `json:"bytes_read"` + BytesWritten uint32 `json:"bytes_written"` } func neighborMetrics() []neighbormetric { @@ -186,8 +186,8 @@ func neighborMetrics() []neighbormetric { stats = append(stats, neighbormetric{ ID: neighbor.Peer.ID().String(), Address: neighbor.Peer.Services().Get(service.GossipKey).String(), - BytesRead: neighbor.BytesRead, - BytesWritten: neighbor.BytesWritten, + BytesRead: neighbor.BytesRead(), + BytesWritten: neighbor.BytesWritten(), ConnectionOrigin: origin, }) }