From 3b62158d2050b809788799b4b8108d7f392c60ef Mon Sep 17 00:00:00 2001 From: Wolfgang Welz <welzwo@gmail.com> Date: Mon, 27 Jan 2020 12:50:25 +0100 Subject: [PATCH] Fix: Buffer gossip messages before passing them to ProtoBuf (#180) * Use buffered connection for gossip * fix wrong timeout * Encode length in big endian * :loud_sound: Improve debug logging --- packages/gossip/manager.go | 12 +- packages/gossip/neighbor.go | 50 +++--- packages/gossip/neighbor_test.go | 4 +- packages/netutil/buffconn/buffconn.go | 172 +++++++++++++++++++++ packages/netutil/buffconn/buffconn_test.go | 136 ++++++++++++++++ plugins/autopeering/autopeering.go | 2 +- plugins/gossip/gossip.go | 11 +- plugins/tangle/solidifier.go | 12 +- 8 files changed, 361 insertions(+), 38 deletions(-) create mode 100644 packages/netutil/buffconn/buffconn.go create mode 100644 packages/netutil/buffconn/buffconn_test.go diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index 19933333..fd373838 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -123,6 +123,7 @@ func (m *Manager) RequestTransaction(txHash []byte, to ...peer.ID) { req := &pb.TransactionRequest{ Hash: txHash, } + m.log.Debugw("send message", "type", "TRANSACTION_REQUEST", "to", to) m.send(marshal(req), to...) } @@ -132,6 +133,7 @@ func (m *Manager) SendTransaction(txData []byte, to ...peer.ID) { tx := &pb.Transaction{ Data: txData, } + m.log.Debugw("send message", "type", "TRANSACTION", "to", to) m.send(marshal(tx), to...) } @@ -200,7 +202,7 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n // create and add the neighbor n := NewNeighbor(peer, conn, m.log) n.Events.Close.Attach(events.NewClosure(func() { _ = m.DropNeighbor(peer.ID()) })) - n.Events.ReceiveData.Attach(events.NewClosure(func(data []byte) { + n.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) { if err := m.handlePacket(data, peer); err != nil { m.log.Debugw("error handling packet", "err", err) } @@ -227,22 +229,22 @@ func (m *Manager) handlePacket(data []byte, p *peer.Peer) error { if err := proto.Unmarshal(data[1:], msg); err != nil { return fmt.Errorf("invalid packet: %w", err) } - m.log.Debugw("Received Transaction", "data", msg.GetData()) + m.log.Debugw("received message", "type", "TRANSACTION", "id", p.ID()) Events.TransactionReceived.Trigger(&TransactionReceivedEvent{Data: msg.GetData(), Peer: p}) // Incoming Transaction request case pb.MTransactionRequest: + msg := new(pb.TransactionRequest) if err := proto.Unmarshal(data[1:], msg); err != nil { return fmt.Errorf("invalid packet: %w", err) } - m.log.Debugw("Received Tx Req", "data", msg.GetHash()) + m.log.Debugw("received message", "type", "TRANSACTION_REQUEST", "id", p.ID()) // do something tx, err := m.getTransaction(msg.GetHash()) if err != nil { - m.log.Debugw("Tx not available", "tx", msg.GetHash()) + m.log.Debugw("error getting transaction", "hash", msg.GetHash(), "err", err) } else { - m.log.Debugw("Tx found", "tx", tx) m.SendTransaction(tx, p.ID()) } diff --git a/packages/gossip/neighbor.go b/packages/gossip/neighbor.go index 6fa7566f..6674275b 100644 --- a/packages/gossip/neighbor.go +++ b/packages/gossip/neighbor.go @@ -9,19 +9,23 @@ import ( "github.com/iotaledger/goshimmer/packages/autopeering/peer" "github.com/iotaledger/goshimmer/packages/netutil" + "github.com/iotaledger/goshimmer/packages/netutil/buffconn" "github.com/iotaledger/hive.go/logger" - "github.com/iotaledger/hive.go/network" ) var ( + // ErrNeighborQueueFull is returned when the send queue is already full. ErrNeighborQueueFull = errors.New("send queue is full") ) -const neighborQueueSize = 1000 +const ( + neighborQueueSize = 1000 + maxNumReadErrors = 10 +) type Neighbor struct { *peer.Peer - *network.ManagedConnection + *buffconn.BufferedConnection log *logger.Logger queue chan []byte @@ -45,11 +49,11 @@ func NewNeighbor(peer *peer.Peer, conn net.Conn, log *logger.Logger) *Neighbor { ) return &Neighbor{ - Peer: peer, - ManagedConnection: network.NewManagedConnection(conn), - log: log, - queue: make(chan []byte, neighborQueueSize), - closing: make(chan struct{}), + Peer: peer, + BufferedConnection: buffconn.NewBufferedConnection(conn), + log: log, + queue: make(chan []byte, neighborQueueSize), + closing: make(chan struct{}), } } @@ -68,23 +72,20 @@ func (n *Neighbor) Close() error { // wait for everything to finish n.wg.Wait() - n.log.Infow("Connection closed", - "read", n.BytesRead, - "written", n.BytesWritten, - ) + n.log.Info("Connection closed") return err } // IsOutbound returns true if the neighbor is an outbound neighbor. func (n *Neighbor) IsOutbound() bool { - return GetAddress(n.Peer) == n.Conn.RemoteAddr().String() + return GetAddress(n.Peer) == n.RemoteAddr().String() } func (n *Neighbor) disconnect() (err error) { n.disconnectOnce.Do(func() { close(n.closing) close(n.queue) - err = n.ManagedConnection.Close() + err = n.BufferedConnection.Close() }) return } @@ -98,8 +99,9 @@ func (n *Neighbor) writeLoop() { if len(msg) == 0 { continue } - if _, err := n.ManagedConnection.Write(msg); err != nil { - n.log.Warn("write error", "err", err) + if _, err := n.BufferedConnection.Write(msg); err != nil { + // ignore write errors + n.log.Warn("Write error", "err", err) } case <-n.closing: return @@ -110,22 +112,26 @@ func (n *Neighbor) writeLoop() { func (n *Neighbor) readLoop() { defer n.wg.Done() - // create a buffer for the packages - b := make([]byte, maxPacketSize) - + var numReadErrors uint for { - _, err := n.ManagedConnection.Read(b) + err := n.Read() if netutil.IsTemporaryError(err) { // ignore temporary read errors. n.log.Debugw("temporary read error", "err", err) + numReadErrors++ + if numReadErrors > maxNumReadErrors { + n.log.Warnw("Too many read errors", "err", err) + _ = n.BufferedConnection.Close() + return + } continue } if err != nil { // return from the loop on all other errors if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") { - n.log.Warnw("read error", "err", err) + n.log.Warnw("Permanent error", "err", err) } - _ = n.ManagedConnection.Close() + _ = n.BufferedConnection.Close() return } } diff --git a/packages/gossip/neighbor_test.go b/packages/gossip/neighbor_test.go index 3ad65b09..72a0f895 100644 --- a/packages/gossip/neighbor_test.go +++ b/packages/gossip/neighbor_test.go @@ -60,7 +60,7 @@ func TestNeighborWrite(t *testing.T) { defer neighborB.Close() var count uint32 - neighborB.Events.ReceiveData.Attach(events.NewClosure(func(data []byte) { + neighborB.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) { assert.Equal(t, testData, data) atomic.AddUint32(&count, 1) })) @@ -84,7 +84,7 @@ func TestNeighborParallelWrite(t *testing.T) { defer neighborB.Close() var count uint32 - neighborB.Events.ReceiveData.Attach(events.NewClosure(func(data []byte) { + neighborB.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) { assert.Equal(t, testData, data) atomic.AddUint32(&count, 1) })) diff --git a/packages/netutil/buffconn/buffconn.go b/packages/netutil/buffconn/buffconn.go new file mode 100644 index 00000000..e5541400 --- /dev/null +++ b/packages/netutil/buffconn/buffconn.go @@ -0,0 +1,172 @@ +package buffconn + +import ( + "encoding/binary" + "errors" + "fmt" + "net" + "sync" + "time" + + "github.com/iotaledger/hive.go/events" +) + +const ( + // MaxMessageSize is the maximum message size in bytes. + MaxMessageSize = 4096 + // IOTimeout specifies the timeout for sending and receiving multi packet messages. + IOTimeout = 4 * time.Second + + headerSize = 4 // size of the header: uint32 +) + +// Errors returned by the BufferedConnection. +var ( + ErrInvalidHeader = errors.New("invalid message header") + ErrInsufficientBuffer = errors.New("insufficient buffer") +) + +// BufferedConnectionEvents contains all the events that are triggered during the peer discovery. +type BufferedConnectionEvents struct { + ReceiveMessage *events.Event + Close *events.Event +} + +// BufferedConnection is a wrapper for sending and reading messages with a buffer. +type BufferedConnection struct { + Events BufferedConnectionEvents + + conn net.Conn + incomingHeaderBuffer []byte + closeOnce sync.Once +} + +// NewBufferedConnection creates a new BufferedConnection from a net.Conn. +func NewBufferedConnection(conn net.Conn) *BufferedConnection { + return &BufferedConnection{ + Events: BufferedConnectionEvents{ + ReceiveMessage: events.NewEvent(events.ByteSliceCaller), + Close: events.NewEvent(events.CallbackCaller), + }, + conn: conn, + incomingHeaderBuffer: make([]byte, headerSize), + } +} + +// Close closes the connection. +// Any blocked Read or Write operations will be unblocked and return errors. +func (c *BufferedConnection) Close() (err error) { + c.closeOnce.Do(func() { + err = c.conn.Close() + // close in separate go routine to avoid deadlocks + go c.Events.Close.Trigger() + }) + return err +} + +// LocalAddr returns the local network address. +func (c *BufferedConnection) LocalAddr() net.Addr { + return c.conn.LocalAddr() +} + +// RemoteAddr returns the remote network address. +func (c *BufferedConnection) RemoteAddr() net.Addr { + return c.conn.RemoteAddr() +} + +// Read starts reading on the connection, it only returns when an error occurred or when Close has been called. +// If a complete message has been received and ReceiveMessage event is triggered with its complete payload. +// If read leads to an error, the loop will be stopped and that error returned. +func (c *BufferedConnection) Read() error { + buffer := make([]byte, MaxMessageSize) + + for { + n, err := c.readMessage(buffer) + if err != nil { + return err + } + if n > 0 { + c.Events.ReceiveMessage.Trigger(buffer[:n]) + } + } +} + +// Write sends a stream of bytes as messages. +// Each array of bytes you pass in will be pre-pended with it's size. If the +// connection isn't open you will receive an error. If not all bytes can be +// written, Write will keep trying until the full message is delivered, or the +// connection is broken. +func (c *BufferedConnection) Write(msg []byte) (int, error) { + if l := len(msg); l > MaxMessageSize { + panic(fmt.Sprintf("invalid message length: %d", l)) + } + + buffer := append(newHeader(len(msg)), msg...) + + if err := c.conn.SetWriteDeadline(time.Now().Add(IOTimeout)); err != nil { + return 0, fmt.Errorf("error while setting timeout: %w", err) + } + + toWrite := len(buffer) + for bytesWritten := 0; bytesWritten < toWrite; { + n, err := c.conn.Write(buffer[bytesWritten:]) + bytesWritten += n + if err != nil { + return bytesWritten, err + } + } + return toWrite - headerSize, nil +} + +func (c *BufferedConnection) read(buffer []byte) (int, error) { + toRead := len(buffer) + for bytesRead := 0; bytesRead < toRead; { + n, err := c.conn.Read(buffer[bytesRead:]) + bytesRead += n + if err != nil { + return bytesRead, err + } + } + return toRead, nil +} + +func (c *BufferedConnection) readMessage(buffer []byte) (int, error) { + if err := c.conn.SetReadDeadline(time.Time{}); err != nil { + return 0, fmt.Errorf("error while unsetting timeout: %w", err) + } + _, err := c.read(c.incomingHeaderBuffer) + if err != nil { + return 0, err + } + + msgLength, err := parseHeader(c.incomingHeaderBuffer) + if err != nil { + return 0, err + } + if msgLength > len(buffer) { + return 0, ErrInsufficientBuffer + } + + if err := c.conn.SetReadDeadline(time.Now().Add(IOTimeout)); err != nil { + return 0, fmt.Errorf("error while setting timeout: %w", err) + } + return c.read(buffer[:msgLength]) +} + +func newHeader(msgLength int) []byte { + // the header only consists of the message length + header := make([]byte, headerSize) + binary.BigEndian.PutUint32(header, uint32(msgLength)) + return header +} + +func parseHeader(header []byte) (int, error) { + if len(header) != headerSize { + return 0, ErrInvalidHeader + } + msgLength := int(binary.BigEndian.Uint32(header)) + if msgLength > MaxMessageSize { + return 0, ErrInvalidHeader + } + return msgLength, nil +} diff --git a/packages/netutil/buffconn/buffconn_test.go b/packages/netutil/buffconn/buffconn_test.go new file mode 100644 index 00000000..e24506b4 --- /dev/null +++ b/packages/netutil/buffconn/buffconn_test.go @@ -0,0 +1,136 @@ +package buffconn + +import ( + "errors" + "io" + "net" + "sync" + "testing" + "time" + + "github.com/iotaledger/hive.go/events" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const graceTime = 10 * time.Millisecond + +var testMsg = []byte("test") + +func TestBufferedConnection(t *testing.T) { + t.Run("Close", func(t *testing.T) { + conn1, conn2 := net.Pipe() + buffConn1 := NewBufferedConnection(conn1) + defer buffConn1.Close() + buffConn2 := NewBufferedConnection(conn2) + defer buffConn2.Close() + + var wg sync.WaitGroup + wg.Add(2) + buffConn2.Events.Close.Attach(events.NewClosure(func() { wg.Done() })) + go func() { + err := buffConn2.Read() + assert.True(t, errors.Is(err, io.ErrClosedPipe), "unexpected error: %s", err) + require.NoError(t, buffConn2.Close()) + wg.Done() + }() + + err := buffConn1.Close() + require.NoError(t, err) + wg.Wait() + }) + + t.Run("Write", func(t *testing.T) { + conn1, conn2 := net.Pipe() + buffConn1 := NewBufferedConnection(conn1) + defer buffConn1.Close() + buffConn2 := NewBufferedConnection(conn2) + defer buffConn2.Close() + + go func() { + _ = buffConn2.Read() + }() + + n, err := buffConn1.Write(testMsg) + require.NoError(t, err) + assert.EqualValues(t, len(testMsg), n) + }) + + t.Run("ReceiveMessage", func(t *testing.T) { + conn1, conn2 := net.Pipe() + buffConn1 := NewBufferedConnection(conn1) + defer buffConn1.Close() + buffConn2 := NewBufferedConnection(conn2) + defer buffConn2.Close() + + var wg sync.WaitGroup + wg.Add(2) + buffConn2.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) { + assert.EqualValues(t, testMsg, data) + wg.Done() + })) + go func() { + err := buffConn2.Read() + assert.True(t, errors.Is(err, io.EOF), "unexpected error: %s", err) + wg.Done() + }() + + n, err := buffConn1.Write(testMsg) + require.NoError(t, err) + assert.EqualValues(t, len(testMsg), n) + + time.Sleep(graceTime) + + err = buffConn1.Close() + require.NoError(t, err) + wg.Wait() + }) + + t.Run("ReceiveMany", func(t *testing.T) { + conn1, conn2 := net.Pipe() + buffConn1 := NewBufferedConnection(conn1) + defer buffConn1.Close() + buffConn2 := NewBufferedConnection(conn2) + defer buffConn2.Close() + + const numWrites = 3 + + var wg sync.WaitGroup + wg.Add(numWrites) + buffConn2.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) { + assert.Equal(t, testMsg, data) + wg.Done() + })) + go func() { + _ = buffConn2.Read() + }() + + for i := 1; i <= numWrites; i++ { + _, err := buffConn1.Write(testMsg) + require.NoError(t, err) + if i < numWrites { + time.Sleep(IOTimeout + graceTime) + } + } + wg.Wait() + }) + + t.Run("InvalidHeader", func(t *testing.T) { + conn1, conn2 := net.Pipe() + buffConn1 := NewBufferedConnection(conn1) + defer buffConn1.Close() + defer conn2.Close() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + err := buffConn1.Read() + assert.True(t, errors.Is(err, ErrInvalidHeader), "unexpected error: %s", err) + wg.Done() + }() + + _, err := conn2.Write([]byte{0xff, 0xff, 0xff, 0xff}) + require.NoError(t, err) + wg.Wait() + }) +} diff --git a/plugins/autopeering/autopeering.go b/plugins/autopeering/autopeering.go index 18d04fc0..5f2ee94b 100644 --- a/plugins/autopeering/autopeering.go +++ b/plugins/autopeering/autopeering.go @@ -106,7 +106,7 @@ func start(shutdownSignal <-chan struct{}) { } log.Infof(name+" started: Address=%s/%s", peeringAddr.String(), peeringAddr.Network()) - log.Infof(name+" started: PubKey=%s", base64.StdEncoding.EncodeToString(lPeer.PublicKey())) + log.Infof(name+" started: ID=%s PublicKey=%s", lPeer.ID(), base64.StdEncoding.EncodeToString(lPeer.PublicKey())) <-shutdownSignal log.Info("Stopping " + name + " ...") diff --git a/plugins/gossip/gossip.go b/plugins/gossip/gossip.go index 63536632..db2c609d 100644 --- a/plugins/gossip/gossip.go +++ b/plugins/gossip/gossip.go @@ -40,7 +40,7 @@ func configureGossip() { log.Fatalf("could not update services: %s", err) } - mgr = gp.NewManager(lPeer, loadTransaction, log) + mgr = gp.NewManager(lPeer, getTransaction, log) } func start(shutdownSignal <-chan struct{}) { @@ -105,10 +105,13 @@ func checkConnection(srv *server.TCP, self *peer.Peer) { wg.Wait() } -func loadTransaction(hash []byte) ([]byte, error) { - log.Infof("Retrieving tx: hash=%s", hash) - +func getTransaction(hash []byte) ([]byte, error) { tx, err := tangle.GetTransaction(typeutils.BytesToString(hash)) + log.Debugw("get tx from db", + "hash", hash, + "tx", tx, + "err", err, + ) if err != nil { return nil, fmt.Errorf("could not get transaction: %w", err) } diff --git a/plugins/tangle/solidifier.go b/plugins/tangle/solidifier.go index 098b30ba..7d89db6e 100644 --- a/plugins/tangle/solidifier.go +++ b/plugins/tangle/solidifier.go @@ -177,15 +177,19 @@ func propagateSolidity(transactionHash trinary.Trytes) error { func processMetaTransaction(metaTransaction *meta_transaction.MetaTransaction) { var newTransaction bool - if tx, err := GetTransaction(metaTransaction.GetHash(), func(transactionHash trinary.Trytes) *value_transaction.ValueTransaction { + tx, err := GetTransaction(metaTransaction.GetHash(), func(transactionHash trinary.Trytes) *value_transaction.ValueTransaction { newTransaction = true tx := value_transaction.FromMetaTransaction(metaTransaction) tx.SetModified(true) return tx - }); err != nil { - log.Errorf("Unable to load transaction %s: %s", metaTransaction.GetHash(), err.Error()) - } else if newTransaction { + }) + if err != nil { + log.Errorf("Unable to process transaction %s: %s", metaTransaction.GetHash(), err.Error()) + return + } + if newTransaction { + log.Debugw("process new transaction", "hash", tx.GetHash()) updateUnsolidTxs(tx) processTransaction(tx) } -- GitLab