Unverified commit 3b62158d authored by Wolfgang Welz, committed by GitHub

Fix: Buffer gossip messages before passing them to ProtoBuf (#180)

* Use buffered connection for gossip

* Fix wrong timeout

* Encode length in big endian

* :loud_sound: Improve debug logging
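For readers skimming the diff: the core of the change is that gossip messages are now framed with a fixed 4-byte, big-endian length header before they hit the wire, and the receiver reassembles a complete message before handing it to ProtoBuf. The stand-alone sketch below only illustrates that framing idea; the actual implementation is the buffconn package added further down, and the helper names here are made up for illustration.

package main

import (
	"encoding/binary"
	"fmt"
)

// headerSize mirrors the 4-byte uint32 length header used by the new buffconn package.
const headerSize = 4

// frame prepends a big-endian length header to a message (illustration only).
func frame(msg []byte) []byte {
	buf := make([]byte, headerSize+len(msg))
	binary.BigEndian.PutUint32(buf, uint32(len(msg)))
	copy(buf[headerSize:], msg)
	return buf
}

// payloadLength recovers the message length from a received header.
func payloadLength(header []byte) int {
	return int(binary.BigEndian.Uint32(header))
}

func main() {
	framed := frame([]byte("tx"))
	fmt.Println(framed, payloadLength(framed[:headerSize])) // [0 0 0 2 116 120] 2
}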
parent da5e87ba
@@ -123,6 +123,7 @@ func (m *Manager) RequestTransaction(txHash []byte, to ...peer.ID) {
     req := &pb.TransactionRequest{
         Hash: txHash,
     }
+    m.log.Debugw("send message", "type", "TRANSACTION_REQUEST", "to", to)
     m.send(marshal(req), to...)
 }
@@ -132,6 +133,7 @@ func (m *Manager) SendTransaction(txData []byte, to ...peer.ID) {
     tx := &pb.Transaction{
         Data: txData,
     }
+    m.log.Debugw("send message", "type", "TRANSACTION", "to", to)
     m.send(marshal(tx), to...)
 }
@@ -200,7 +202,7 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n
     // create and add the neighbor
     n := NewNeighbor(peer, conn, m.log)
     n.Events.Close.Attach(events.NewClosure(func() { _ = m.DropNeighbor(peer.ID()) }))
-    n.Events.ReceiveData.Attach(events.NewClosure(func(data []byte) {
+    n.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) {
         if err := m.handlePacket(data, peer); err != nil {
             m.log.Debugw("error handling packet", "err", err)
         }
@@ -227,22 +229,22 @@ func (m *Manager) handlePacket(data []byte, p *peer.Peer) error {
         if err := proto.Unmarshal(data[1:], msg); err != nil {
             return fmt.Errorf("invalid packet: %w", err)
         }
-        m.log.Debugw("Received Transaction", "data", msg.GetData())
+        m.log.Debugw("received message", "type", "TRANSACTION", "id", p.ID())
         Events.TransactionReceived.Trigger(&TransactionReceivedEvent{Data: msg.GetData(), Peer: p})
     // Incoming Transaction request
     case pb.MTransactionRequest:
         msg := new(pb.TransactionRequest)
         if err := proto.Unmarshal(data[1:], msg); err != nil {
             return fmt.Errorf("invalid packet: %w", err)
         }
-        m.log.Debugw("Received Tx Req", "data", msg.GetHash())
+        m.log.Debugw("received message", "type", "TRANSACTION_REQUEST", "id", p.ID())
         // do something
         tx, err := m.getTransaction(msg.GetHash())
         if err != nil {
-            m.log.Debugw("Tx not available", "tx", msg.GetHash())
+            m.log.Debugw("error getting transaction", "hash", msg.GetHash(), "err", err)
         } else {
-            m.log.Debugw("Tx found", "tx", tx)
             m.SendTransaction(tx, p.ID())
         }
...
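Aside, not part of the diff: handlePacket above unmarshals data[1:], i.e. every gossip packet starts with a one-byte message type followed by the ProtoBuf payload. The fragment below is a hypothetical sketch of that layout; the constants and the packPacket helper are assumptions for illustration, and the real type values live in the generated pb package.

package sketch

// Illustration only: handlePacket in the diff above switches on data[0] and
// unmarshals data[1:], so a packet is one type byte followed by the ProtoBuf payload.
const (
	mTransaction        byte = 20 // stand-in for pb.MTransaction
	mTransactionRequest byte = 21 // stand-in for pb.MTransactionRequest
)

// packPacket prepends the message-type byte to an already-marshaled payload.
func packPacket(msgType byte, payload []byte) []byte {
	return append([]byte{msgType}, payload...)
}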
@@ -9,19 +9,23 @@ import (
     "github.com/iotaledger/goshimmer/packages/autopeering/peer"
     "github.com/iotaledger/goshimmer/packages/netutil"
+    "github.com/iotaledger/goshimmer/packages/netutil/buffconn"
     "github.com/iotaledger/hive.go/logger"
-    "github.com/iotaledger/hive.go/network"
 )

 var (
+    // ErrNeighborQueueFull is returned when the send queue is already full.
     ErrNeighborQueueFull = errors.New("send queue is full")
 )

-const neighborQueueSize = 1000
+const (
+    neighborQueueSize = 1000
+    maxNumReadErrors  = 10
+)

 type Neighbor struct {
     *peer.Peer
-    *network.ManagedConnection
+    *buffconn.BufferedConnection

     log   *logger.Logger
     queue chan []byte
@@ -45,11 +49,11 @@ func NewNeighbor(peer *peer.Peer, conn net.Conn, log *logger.Logger) *Neighbor {
     )
     return &Neighbor{
         Peer:               peer,
-        ManagedConnection:  network.NewManagedConnection(conn),
+        BufferedConnection: buffconn.NewBufferedConnection(conn),
         log:                log,
         queue:              make(chan []byte, neighborQueueSize),
         closing:            make(chan struct{}),
     }
 }
@@ -68,23 +72,20 @@ func (n *Neighbor) Close() error {
     // wait for everything to finish
     n.wg.Wait()
-    n.log.Infow("Connection closed",
-        "read", n.BytesRead,
-        "written", n.BytesWritten,
-    )
+    n.log.Info("Connection closed")
     return err
 }

 // IsOutbound returns true if the neighbor is an outbound neighbor.
 func (n *Neighbor) IsOutbound() bool {
-    return GetAddress(n.Peer) == n.Conn.RemoteAddr().String()
+    return GetAddress(n.Peer) == n.RemoteAddr().String()
 }

 func (n *Neighbor) disconnect() (err error) {
     n.disconnectOnce.Do(func() {
         close(n.closing)
         close(n.queue)
-        err = n.ManagedConnection.Close()
+        err = n.BufferedConnection.Close()
     })
     return
 }
@@ -98,8 +99,9 @@ func (n *Neighbor) writeLoop() {
             if len(msg) == 0 {
                 continue
             }
-            if _, err := n.ManagedConnection.Write(msg); err != nil {
-                n.log.Warn("write error", "err", err)
+            if _, err := n.BufferedConnection.Write(msg); err != nil {
+                // ignore write errors
+                n.log.Warn("Write error", "err", err)
             }
         case <-n.closing:
             return
@@ -110,22 +112,26 @@ func (n *Neighbor) writeLoop() {
 func (n *Neighbor) readLoop() {
     defer n.wg.Done()
-    // create a buffer for the packages
-    b := make([]byte, maxPacketSize)
+    var numReadErrors uint
     for {
-        _, err := n.ManagedConnection.Read(b)
+        err := n.Read()
         if netutil.IsTemporaryError(err) {
             // ignore temporary read errors.
             n.log.Debugw("temporary read error", "err", err)
+            numReadErrors++
+            if numReadErrors > maxNumReadErrors {
+                n.log.Warnw("Too many read errors", "err", err)
+                _ = n.BufferedConnection.Close()
+                return
+            }
             continue
         }
         if err != nil {
             // return from the loop on all other errors
             if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") {
-                n.log.Warnw("read error", "err", err)
+                n.log.Warnw("Permanent error", "err", err)
             }
-            _ = n.ManagedConnection.Close()
+            _ = n.BufferedConnection.Close()
             return
         }
     }
...
@@ -60,7 +60,7 @@ func TestNeighborWrite(t *testing.T) {
     defer neighborB.Close()

     var count uint32
-    neighborB.Events.ReceiveData.Attach(events.NewClosure(func(data []byte) {
+    neighborB.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) {
         assert.Equal(t, testData, data)
         atomic.AddUint32(&count, 1)
     }))
@@ -84,7 +84,7 @@ func TestNeighborParallelWrite(t *testing.T) {
     defer neighborB.Close()

     var count uint32
-    neighborB.Events.ReceiveData.Attach(events.NewClosure(func(data []byte) {
+    neighborB.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) {
         assert.Equal(t, testData, data)
         atomic.AddUint32(&count, 1)
     }))
...
package buffconn
import (
"encoding/binary"
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/iotaledger/hive.go/events"
)
const (
// MaxMessageSize is the maximum message size in bytes.
MaxMessageSize = 4096
// IOTimeout specifies the timeout for sending and receiving multi-packet messages.
IOTimeout = 4 * time.Second
headerSize = 4 // size of the header: uint32
)
// Errors returned by the BufferedConnection.
var (
ErrInvalidHeader = errors.New("invalid message header")
ErrInsufficientBuffer = errors.New("insufficient buffer")
)
// BufferedConnectionEvents contains all the events that are triggered by a BufferedConnection.
type BufferedConnectionEvents struct {
ReceiveMessage *events.Event
Close *events.Event
}
// BufferedConnection is a wrapper for sending and reading messages with a buffer.
type BufferedConnection struct {
Events BufferedConnectionEvents
conn net.Conn
incomingHeaderBuffer []byte
closeOnce sync.Once
}
// NewBufferedConnection creates a new BufferedConnection from a net.Conn.
func NewBufferedConnection(conn net.Conn) *BufferedConnection {
return &BufferedConnection{
Events: BufferedConnectionEvents{
ReceiveMessage: events.NewEvent(events.ByteSliceCaller),
Close: events.NewEvent(events.CallbackCaller),
},
conn: conn,
incomingHeaderBuffer: make([]byte, headerSize),
}
}
// Close closes the connection.
// Any blocked Read or Write operations will be unblocked and return errors.
func (c *BufferedConnection) Close() (err error) {
c.closeOnce.Do(func() {
err = c.conn.Close()
// close in a separate goroutine to avoid deadlocks
go c.Events.Close.Trigger()
})
return err
}
// LocalAddr returns the local network address.
func (c *BufferedConnection) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}
// RemoteAddr returns the remote network address.
func (c *BufferedConnection) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
// Read starts reading from the connection; it only returns when an error occurs or when Close has been called.
// Whenever a complete message has been received, the ReceiveMessage event is triggered with its payload.
// If a read fails, the loop is stopped and that error is returned.
func (c *BufferedConnection) Read() error {
buffer := make([]byte, MaxMessageSize)
for {
n, err := c.readMessage(buffer)
if err != nil {
return err
}
if n > 0 {
c.Events.ReceiveMessage.Trigger(buffer[:n])
}
}
}
// Write sends a stream of bytes as messages.
// Each slice of bytes you pass in is prepended with its size. If the
// connection is not open, an error is returned. If not all bytes can be
// written, Write keeps trying until the full message is delivered or the
// connection is broken.
func (c *BufferedConnection) Write(msg []byte) (int, error) {
if l := len(msg); l > MaxMessageSize {
panic(fmt.Sprintf("invalid message length: %d", l))
}
buffer := append(newHeader(len(msg)), msg...)
if err := c.conn.SetWriteDeadline(time.Now().Add(IOTimeout)); err != nil {
return 0, fmt.Errorf("error while setting timeout: %w", err)
}
toWrite := len(buffer)
for bytesWritten := 0; bytesWritten < toWrite; {
n, err := c.conn.Write(buffer[bytesWritten:])
bytesWritten += n
if err != nil {
return bytesWritten, err
}
}
return toWrite - headerSize, nil
}
func (c *BufferedConnection) read(buffer []byte) (int, error) {
toRead := len(buffer)
for bytesRead := 0; bytesRead < toRead; {
n, err := c.conn.Read(buffer[bytesRead:])
bytesRead += n
if err != nil {
return bytesRead, err
}
}
return toRead, nil
}
func (c *BufferedConnection) readMessage(buffer []byte) (int, error) {
if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
return 0, fmt.Errorf("error while unsetting timeout: %w", err)
}
_, err := c.read(c.incomingHeaderBuffer)
if err != nil {
return 0, err
}
msgLength, err := parseHeader(c.incomingHeaderBuffer)
if err != nil {
return 0, err
}
if msgLength > len(buffer) {
return 0, ErrInsufficientBuffer
}
if err := c.conn.SetReadDeadline(time.Now().Add(IOTimeout)); err != nil {
return 0, fmt.Errorf("error while setting timeout: %w", err)
}
return c.read(buffer[:msgLength])
}
func newHeader(msgLength int) []byte {
// the header only consists of the message length
header := make([]byte, headerSize)
binary.BigEndian.PutUint32(header, uint32(msgLength))
return header
}
func parseHeader(header []byte) (int, error) {
if len(header) != headerSize {
return 0, ErrInvalidHeader
}
msgLength := int(binary.BigEndian.Uint32(header))
if msgLength > MaxMessageSize {
return 0, ErrInvalidHeader
}
return msgLength, nil
}
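For orientation, here is one way the package above is meant to be driven (the test file below exercises the same pattern): attach a handler to ReceiveMessage, run the blocking Read loop in its own goroutine, and send length-prefixed messages with Write. The wire helper and its logging are a hypothetical sketch, not part of the commit.

package buffconnexample

import (
	"log"
	"net"

	"github.com/iotaledger/goshimmer/packages/netutil/buffconn"
	"github.com/iotaledger/hive.go/events"
)

// wire shows the intended call pattern of a BufferedConnection.
func wire(conn net.Conn) *buffconn.BufferedConnection {
	c := buffconn.NewBufferedConnection(conn)
	// every complete, length-prefixed message triggers ReceiveMessage
	c.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) {
		log.Printf("received %d bytes", len(data))
	}))
	// Read blocks until the connection fails or is closed
	go func() {
		if err := c.Read(); err != nil {
			log.Printf("read loop terminated: %s", err)
		}
		_ = c.Close()
	}()
	return c
}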
package buffconn
import (
"errors"
"io"
"net"
"sync"
"testing"
"time"
"github.com/iotaledger/hive.go/events"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const graceTime = 10 * time.Millisecond
var testMsg = []byte("test")
func TestBufferedConnection(t *testing.T) {
t.Run("Close", func(t *testing.T) {
conn1, conn2 := net.Pipe()
buffConn1 := NewBufferedConnection(conn1)
defer buffConn1.Close()
buffConn2 := NewBufferedConnection(conn2)
defer buffConn2.Close()
var wg sync.WaitGroup
wg.Add(2)
buffConn2.Events.Close.Attach(events.NewClosure(func() { wg.Done() }))
go func() {
err := buffConn2.Read()
assert.True(t, errors.Is(err, io.ErrClosedPipe), "unexpected error: %s", err)
require.NoError(t, buffConn2.Close())
wg.Done()
}()
err := buffConn1.Close()
require.NoError(t, err)
wg.Wait()
})
t.Run("Write", func(t *testing.T) {
conn1, conn2 := net.Pipe()
buffConn1 := NewBufferedConnection(conn1)
defer buffConn1.Close()
buffConn2 := NewBufferedConnection(conn2)
defer buffConn2.Close()
go func() {
_ = buffConn2.Read()
}()
n, err := buffConn1.Write(testMsg)
require.NoError(t, err)
assert.EqualValues(t, len(testMsg), n)
})
t.Run("ReceiveMessage", func(t *testing.T) {
conn1, conn2 := net.Pipe()
buffConn1 := NewBufferedConnection(conn1)
defer buffConn1.Close()
buffConn2 := NewBufferedConnection(conn2)
defer buffConn2.Close()
var wg sync.WaitGroup
wg.Add(2)
buffConn2.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) {
assert.EqualValues(t, testMsg, data)
wg.Done()
}))
go func() {
err := buffConn2.Read()
assert.True(t, errors.Is(err, io.EOF), "unexpected error: %s", err)
wg.Done()
}()
n, err := buffConn1.Write(testMsg)
require.NoError(t, err)
assert.EqualValues(t, len(testMsg), n)
time.Sleep(graceTime)
err = buffConn1.Close()
require.NoError(t, err)
wg.Wait()
})
t.Run("ReceiveMany", func(t *testing.T) {
conn1, conn2 := net.Pipe()
buffConn1 := NewBufferedConnection(conn1)
defer buffConn1.Close()
buffConn2 := NewBufferedConnection(conn2)
defer buffConn2.Close()
const numWrites = 3
var wg sync.WaitGroup
wg.Add(numWrites)
buffConn2.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) {
assert.Equal(t, testMsg, data)
wg.Done()
}))
go func() {
_ = buffConn2.Read()
}()
for i := 1; i <= numWrites; i++ {
_, err := buffConn1.Write(testMsg)
require.NoError(t, err)
if i < numWrites {
time.Sleep(IOTimeout + graceTime)
}
}
wg.Wait()
})
t.Run("InvalidHeader", func(t *testing.T) {
conn1, conn2 := net.Pipe()
buffConn1 := NewBufferedConnection(conn1)
defer buffConn1.Close()
defer conn2.Close()
var wg sync.WaitGroup
wg.Add(1)
go func() {
err := buffConn1.Read()
assert.True(t, errors.Is(err, ErrInvalidHeader), "unexpected error: %s", err)
wg.Done()
}()
_, err := conn2.Write([]byte{0xff, 0xff, 0xff, 0xff})
require.NoError(t, err)
wg.Wait()
})
}
@@ -106,7 +106,7 @@ func start(shutdownSignal <-chan struct{}) {
     }
     log.Infof(name+" started: Address=%s/%s", peeringAddr.String(), peeringAddr.Network())
-    log.Infof(name+" started: PubKey=%s", base64.StdEncoding.EncodeToString(lPeer.PublicKey()))
+    log.Infof(name+" started: ID=%s PublicKey=%s", lPeer.ID(), base64.StdEncoding.EncodeToString(lPeer.PublicKey()))

     <-shutdownSignal
     log.Info("Stopping " + name + " ...")
...
@@ -40,7 +40,7 @@ func configureGossip() {
         log.Fatalf("could not update services: %s", err)
     }

-    mgr = gp.NewManager(lPeer, loadTransaction, log)
+    mgr = gp.NewManager(lPeer, getTransaction, log)
 }

 func start(shutdownSignal <-chan struct{}) {
@@ -105,10 +105,13 @@ func checkConnection(srv *server.TCP, self *peer.Peer) {
     wg.Wait()
 }

-func loadTransaction(hash []byte) ([]byte, error) {
-    log.Infof("Retrieving tx: hash=%s", hash)
+func getTransaction(hash []byte) ([]byte, error) {
     tx, err := tangle.GetTransaction(typeutils.BytesToString(hash))
+    log.Debugw("get tx from db",
+        "hash", hash,
+        "tx", tx,
+        "err", err,
+    )
     if err != nil {
         return nil, fmt.Errorf("could not get transaction: %w", err)
     }
...
@@ -177,15 +177,19 @@ func propagateSolidity(transactionHash trinary.Trytes) error {
 func processMetaTransaction(metaTransaction *meta_transaction.MetaTransaction) {
     var newTransaction bool
-    if tx, err := GetTransaction(metaTransaction.GetHash(), func(transactionHash trinary.Trytes) *value_transaction.ValueTransaction {
+    tx, err := GetTransaction(metaTransaction.GetHash(), func(transactionHash trinary.Trytes) *value_transaction.ValueTransaction {
         newTransaction = true
         tx := value_transaction.FromMetaTransaction(metaTransaction)
         tx.SetModified(true)
         return tx
-    }); err != nil {
-        log.Errorf("Unable to load transaction %s: %s", metaTransaction.GetHash(), err.Error())
-    } else if newTransaction {
+    })
+    if err != nil {
+        log.Errorf("Unable to process transaction %s: %s", metaTransaction.GetHash(), err.Error())
+        return
+    }
+    if newTransaction {
+        log.Debugw("process new transaction", "hash", tx.GetHash())
         updateUnsolidTxs(tx)
         processTransaction(tx)
     }
...