From e7e4f19ee56f47b3fa5a89444fe2d7bff610f2ca Mon Sep 17 00:00:00 2001 From: Wolfgang Welz <welzwo@gmail.com> Date: Fri, 10 Jan 2020 11:08:56 +0100 Subject: [PATCH] Fix potential race conditions in gossip (#94) * fix: use queue for neighbor connection * refactor: do not request transactions via event * fix: remove unused connection * chore: fix linter warnings * feat: improve gossip events and logs * fix: log after connection is closed --- .../autopeering/discover/protocol_test.go | 8 +- .../autopeering/selection/protocol_test.go | 18 +- packages/autopeering/server/server_test.go | 6 +- packages/gossip/common.go | 20 ++ packages/gossip/errors.go | 1 + packages/gossip/events.go | 32 +-- packages/gossip/manager.go | 143 ++++------ packages/gossip/manager_test.go | 262 +++++++++++------- packages/gossip/neighbor.go | 145 ++++++++++ packages/gossip/neighbor_test.go | 152 ++++++++++ packages/gossip/server/connection.go | 29 -- packages/gossip/server/server.go | 10 +- packages/gossip/server/server_test.go | 15 +- plugins/autopeering/plugin.go | 12 +- plugins/gossip/gossip.go | 13 +- plugins/gossip/plugin.go | 47 ++-- plugins/statusscreen/plugin.go | 10 +- plugins/tangle/events.go | 10 +- plugins/tangle/plugin.go | 10 + plugins/tangle/solidifier.go | 23 +- plugins/tangle/solidifier_test.go | 20 +- plugins/webapi/routes.go | 9 - 22 files changed, 652 insertions(+), 343 deletions(-) create mode 100644 packages/gossip/common.go create mode 100644 packages/gossip/neighbor.go create mode 100644 packages/gossip/neighbor_test.go delete mode 100644 packages/gossip/server/connection.go delete mode 100644 plugins/webapi/routes.go diff --git a/packages/autopeering/discover/protocol_test.go b/packages/autopeering/discover/protocol_test.go index f7c5d328..7f6432f7 100644 --- a/packages/autopeering/discover/protocol_test.go +++ b/packages/autopeering/discover/protocol_test.go @@ -28,17 +28,17 @@ func init() { // newTest creates a new discovery server and also returns the teardown. func newTest(t require.TestingT, trans transport.Transport, logger *logger.Logger, masters ...*peer.Peer) (*server.Server, *Protocol, func()) { - log := logger.Named(trans.LocalAddr().String()) - db := peer.NewMemoryDB(log.Named("db")) + l := logger.Named(trans.LocalAddr().String()) + db := peer.NewMemoryDB(l.Named("db")) local, err := peer.NewLocal(trans.LocalAddr().Network(), trans.LocalAddr().String(), db) require.NoError(t, err) cfg := Config{ - Log: log, + Log: l, MasterPeers: masters, } prot := New(local, cfg) - srv := server.Listen(local, trans, log.Named("srv"), prot) + srv := server.Listen(local, trans, l.Named("srv"), prot) prot.Start(srv) teardown := func() { diff --git a/packages/autopeering/selection/protocol_test.go b/packages/autopeering/selection/protocol_test.go index 37871352..8caf9174 100644 --- a/packages/autopeering/selection/protocol_test.go +++ b/packages/autopeering/selection/protocol_test.go @@ -30,8 +30,8 @@ func (d dummyDiscovery) GetVerifiedPeers() []*peer.Peer { retur // newTest creates a new neighborhood server and also returns the teardown. func newTest(t require.TestingT, trans transport.Transport) (*server.Server, *Protocol, func()) { - log := log.Named(trans.LocalAddr().String()) - db := peer.NewMemoryDB(log.Named("db")) + l := log.Named(trans.LocalAddr().String()) + db := peer.NewMemoryDB(l.Named("db")) local, err := peer.NewLocal(trans.LocalAddr().Network(), trans.LocalAddr().String(), db) require.NoError(t, err) @@ -39,10 +39,10 @@ func newTest(t require.TestingT, trans transport.Transport) (*server.Server, *Pr peerMap[local.ID()] = &local.Peer cfg := Config{ - Log: log, + Log: l, } prot := New(local, dummyDiscovery{}, cfg) - srv := server.Listen(local, trans, log.Named("srv"), prot) + srv := server.Listen(local, trans, l.Named("srv"), prot) prot.Start(srv) teardown := func() { @@ -130,20 +130,20 @@ func TestProtDropPeer(t *testing.T) { // newTest creates a new server handling discover as well as neighborhood and also returns the teardown. func newFullTest(t require.TestingT, trans transport.Transport, masterPeers ...*peer.Peer) (*server.Server, *Protocol, func()) { - log := log.Named(trans.LocalAddr().String()) - db := peer.NewMemoryDB(log.Named("db")) + l := log.Named(trans.LocalAddr().String()) + db := peer.NewMemoryDB(l.Named("db")) local, err := peer.NewLocal(trans.LocalAddr().Network(), trans.LocalAddr().String(), db) require.NoError(t, err) discovery := discover.New(local, discover.Config{ - Log: log.Named("disc"), + Log: l.Named("disc"), MasterPeers: masterPeers, }) selection := New(local, discovery, Config{ - Log: log.Named("sel"), + Log: l.Named("sel"), }) - srv := server.Listen(local, trans, log.Named("srv"), discovery, selection) + srv := server.Listen(local, trans, l.Named("srv"), discovery, selection) discovery.Start(srv) selection.Start(srv) diff --git a/packages/autopeering/server/server_test.go b/packages/autopeering/server/server_test.go index 3e784af5..f3cff2c8 100644 --- a/packages/autopeering/server/server_test.go +++ b/packages/autopeering/server/server_test.go @@ -113,8 +113,8 @@ func TestSrvEncodeDecodePing(t *testing.T) { } func newTestServer(t require.TestingT, name string, trans transport.Transport) (*Server, func()) { - log := log.Named(name) - db := peer.NewMemoryDB(log.Named("db")) + l := log.Named(name) + db := peer.NewMemoryDB(l.Named("db")) local, err := peer.NewLocal(trans.LocalAddr().Network(), trans.LocalAddr().String(), db) require.NoError(t, err) @@ -123,7 +123,7 @@ func newTestServer(t require.TestingT, name string, trans transport.Transport) ( s, _ = salt.NewSalt(100 * time.Second) local.SetPublicSalt(s) - srv := Listen(local, trans, log, HandlerFunc(handle)) + srv := Listen(local, trans, l, HandlerFunc(handle)) teardown := func() { srv.Close() diff --git a/packages/gossip/common.go b/packages/gossip/common.go new file mode 100644 index 00000000..6f48f378 --- /dev/null +++ b/packages/gossip/common.go @@ -0,0 +1,20 @@ +package gossip + +import ( + "github.com/iotaledger/goshimmer/packages/autopeering/peer" + "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" +) + +// IsSupported returns whether the peer supports the gossip service. +func IsSupported(p *peer.Peer) bool { + return p.Services().Get(service.GossipKey) != nil +} + +// GetAddress returns the address of the gossip service. +func GetAddress(p *peer.Peer) string { + gossip := p.Services().Get(service.GossipKey) + if gossip == nil { + panic("peer does not support gossip") + } + return gossip.String() +} diff --git a/packages/gossip/errors.go b/packages/gossip/errors.go index 160ed551..e32a1530 100644 --- a/packages/gossip/errors.go +++ b/packages/gossip/errors.go @@ -7,4 +7,5 @@ var ( ErrClosed = errors.New("manager closed") ErrNotANeighbor = errors.New("peer is not a neighbor") ErrDuplicateNeighbor = errors.New("peer already connected") + ErrInvalidPacket = errors.New("invalid packet") ) diff --git a/packages/gossip/events.go b/packages/gossip/events.go index b7f82934..169ec219 100644 --- a/packages/gossip/events.go +++ b/packages/gossip/events.go @@ -3,25 +3,23 @@ package gossip import ( "github.com/iotaledger/goshimmer/packages/autopeering/peer" "github.com/iotaledger/hive.go/events" - "github.com/iotaledger/iota.go/trinary" ) // Events contains all the events related to the gossip protocol. var Events = struct { - // A NeighborDropped event is triggered when a neighbor has been dropped. - NeighborDropped *events.Event + // A ConnectionFailed event is triggered when a neighbor connection to a peer could not be established. + ConnectionFailed *events.Event + // A NeighborAdded event is triggered when a connection to a new neighbor has been established. + NeighborAdded *events.Event + // A NeighborRemoved event is triggered when a neighbor has been dropped. + NeighborRemoved *events.Event // A TransactionReceived event is triggered when a new transaction is received by the gossip protocol. TransactionReceived *events.Event - // A RequestTransaction should be triggered for a transaction to be requested through the gossip protocol. - RequestTransaction *events.Event }{ - NeighborDropped: events.NewEvent(neighborDropped), + ConnectionFailed: events.NewEvent(peerCaller), + NeighborAdded: events.NewEvent(neighborCaller), + NeighborRemoved: events.NewEvent(peerCaller), TransactionReceived: events.NewEvent(transactionReceived), - RequestTransaction: events.NewEvent(requestTransaction), -} - -type NeighborDroppedEvent struct { - Peer *peer.Peer } type TransactionReceivedEvent struct { @@ -29,18 +27,14 @@ type TransactionReceivedEvent struct { Peer *peer.Peer // peer that send the transaction } -type RequestTransactionEvent struct { - Hash trinary.Trytes // hash of the transaction to request +func peerCaller(handler interface{}, params ...interface{}) { + handler.(func(*peer.Peer))(params[0].(*peer.Peer)) } -func neighborDropped(handler interface{}, params ...interface{}) { - handler.(func(*NeighborDroppedEvent))(params[0].(*NeighborDroppedEvent)) +func neighborCaller(handler interface{}, params ...interface{}) { + handler.(func(*Neighbor))(params[0].(*Neighbor)) } func transactionReceived(handler interface{}, params ...interface{}) { handler.(func(*TransactionReceivedEvent))(params[0].(*TransactionReceivedEvent)) } - -func requestTransaction(handler interface{}, params ...interface{}) { - handler.(func(*RequestTransactionEvent))(params[0].(*RequestTransactionEvent)) -} diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index 6e600939..1050243f 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -1,17 +1,15 @@ package gossip import ( - "io" "net" - "strings" "sync" - "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" - "github.com/golang/protobuf/proto" "github.com/iotaledger/goshimmer/packages/autopeering/peer" + "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" pb "github.com/iotaledger/goshimmer/packages/gossip/proto" "github.com/iotaledger/goshimmer/packages/gossip/server" + "github.com/iotaledger/hive.go/events" "github.com/pkg/errors" "go.uber.org/zap" ) @@ -32,22 +30,17 @@ type Manager struct { mu sync.RWMutex srv *server.TCP - neighbors map[peer.ID]*neighbor + neighbors map[peer.ID]*Neighbor running bool } -type neighbor struct { - peer *peer.Peer - conn *server.Connection -} - func NewManager(local *peer.Local, f GetTransaction, log *zap.SugaredLogger) *Manager { return &Manager{ local: local, getTransaction: f, log: log, srv: nil, - neighbors: make(map[peer.ID]*neighbor), + neighbors: make(map[peer.ID]*Neighbor), running: false, } } @@ -62,15 +55,20 @@ func (m *Manager) Start(srv *server.TCP) { // Close stops the manager and closes all established connections. func (m *Manager) Close() { + m.stop() + m.wg.Wait() +} + +func (m *Manager) stop() { m.mu.Lock() + defer m.mu.Unlock() + m.running = false - // close all connections - for _, n := range m.neighbors { - _ = n.conn.Close() - } - m.mu.Unlock() - m.wg.Wait() + // close all neighbor connections + for _, nbr := range m.neighbors { + _ = nbr.Close() + } } // LocalAddr returns the public address of the gossip service. @@ -104,7 +102,7 @@ func (m *Manager) AddInbound(p *peer.Peer) error { return m.addNeighbor(p, srv.AcceptPeer) } -// NeighborDropped disconnects the neighbor with the given ID. +// NeighborRemoved disconnects the neighbor with the given ID. func (m *Manager) DropNeighbor(id peer.ID) error { m.mu.Lock() defer m.mu.Unlock() @@ -113,9 +111,10 @@ func (m *Manager) DropNeighbor(id peer.ID) error { } n := m.neighbors[id] delete(m.neighbors, id) - disconnect(n.conn) - return nil + err := n.Close() + Events.NeighborRemoved.Trigger(n.Peer) + return err } // RequestTransaction requests the transaction with the given hash from the neighbors. @@ -127,8 +126,8 @@ func (m *Manager) RequestTransaction(txHash []byte, to ...peer.ID) { m.send(marshal(req), to...) } -// SendTransaction sends the given transaction data to the neighbors. -// If no peer is provided, it is send to all neighbors. +// SendTransaction adds the given transaction data to the send queue of the neighbors. +// The actual send then happens asynchronously. If no peer is provided, it is send to all neighbors. func (m *Manager) SendTransaction(txData []byte, to ...peer.ID) { tx := &pb.Transaction{ Data: txData, @@ -136,16 +135,16 @@ func (m *Manager) SendTransaction(txData []byte, to ...peer.ID) { m.send(marshal(tx), to...) } -func (m *Manager) getNeighbors(ids ...peer.ID) []*neighbor { +func (m *Manager) getNeighbors(ids ...peer.ID) []*Neighbor { if len(ids) > 0 { return m.getNeighborsById(ids) } return m.getAllNeighbors() } -func (m *Manager) getAllNeighbors() []*neighbor { +func (m *Manager) getAllNeighbors() []*Neighbor { m.mu.RLock() - result := make([]*neighbor, 0, len(m.neighbors)) + result := make([]*Neighbor, 0, len(m.neighbors)) for _, n := range m.neighbors { result = append(result, n) } @@ -154,8 +153,8 @@ func (m *Manager) getAllNeighbors() []*neighbor { return result } -func (m *Manager) getNeighborsById(ids []peer.ID) []*neighbor { - result := make([]*neighbor, 0, len(ids)) +func (m *Manager) getNeighborsById(ids []peer.ID) []*Neighbor { + result := make([]*Neighbor, 0, len(ids)) m.mu.RLock() for _, id := range ids { @@ -168,102 +167,74 @@ func (m *Manager) getNeighborsById(ids []peer.ID) []*neighbor { return result } -func (m *Manager) send(msg []byte, to ...peer.ID) { - if l := len(msg); l > maxPacketSize { - m.log.Errorw("message too large", "len", l, "max", maxPacketSize) - } +func (m *Manager) send(b []byte, to ...peer.ID) { neighbors := m.getNeighbors(to...) for _, nbr := range neighbors { - m.log.Debugw("Sending", "to", nbr.peer.ID(), "msg", msg) - _, err := nbr.conn.Write(msg) - if err != nil { - m.log.Debugw("send error", "err", err) + if _, err := nbr.Write(b); err != nil { + m.log.Warnw("send error", "err", err) } } } -func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (*server.Connection, error)) error { +func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (net.Conn, error)) error { conn, err := connectorFunc(peer) if err != nil { - m.log.Debugw("addNeighbor failed", "peer", peer.ID(), "err", err) - Events.NeighborDropped.Trigger(&NeighborDroppedEvent{Peer: peer}) + Events.ConnectionFailed.Trigger(peer) return err } m.mu.Lock() defer m.mu.Unlock() if !m.running { - disconnect(conn) + _ = conn.Close() + Events.ConnectionFailed.Trigger(peer) return ErrClosed } if _, ok := m.neighbors[peer.ID()]; ok { - disconnect(conn) + _ = conn.Close() + Events.ConnectionFailed.Trigger(peer) return ErrDuplicateNeighbor } - // add the neighbor - n := &neighbor{ - peer: peer, - conn: conn, - } + // create and add the neighbor + n := NewNeighbor(peer, conn, m.log) + n.Events.Close.Attach(events.NewClosure(func() { _ = m.DropNeighbor(peer.ID()) })) + n.Events.ReceiveData.Attach(events.NewClosure(func(data []byte) { + if err := m.handlePacket(data, peer); err != nil { + m.log.Debugw("error handling packet", "err", err) + } + })) + m.neighbors[peer.ID()] = n - go m.readLoop(n) + n.Listen() + Events.NeighborAdded.Trigger(n) return nil } -func (m *Manager) readLoop(nbr *neighbor) { - m.wg.Add(1) - defer m.wg.Done() - - // create a buffer for the packages - b := make([]byte, maxPacketSize) - - for { - n, err := nbr.conn.Read(b) - if nerr, ok := err.(net.Error); ok && nerr.Temporary() { - // ignore temporary read errors. - m.log.Debugw("temporary read error", "err", err) - continue - } else if err != nil { - // return from the loop on all other errors - if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") { - m.log.Warnw("read error", "err", err) - } - - m.log.Debugw("connection closed", - "id", nbr.peer.ID(), - "addr", nbr.conn.RemoteAddr().String(), - ) - _ = nbr.conn.Close() // just make sure that the connection is closed as fast as possible - _ = m.DropNeighbor(nbr.peer.ID()) - return - } - - if err := m.handlePacket(b[:n], nbr); err != nil { - m.log.Warnw("failed to handle packet", "id", nbr.peer.ID(), "err", err) - } +func (m *Manager) handlePacket(data []byte, p *peer.Peer) error { + // ignore empty packages + if len(data) == 0 { + return nil } -} -func (m *Manager) handlePacket(data []byte, n *neighbor) error { switch pb.MType(data[0]) { // Incoming Transaction case pb.MTransaction: msg := new(pb.Transaction) if err := proto.Unmarshal(data[1:], msg); err != nil { - return errors.Wrap(err, "invalid message") + return errors.Wrap(err, "invalid packet") } m.log.Debugw("Received Transaction", "data", msg.GetData()) - Events.TransactionReceived.Trigger(&TransactionReceivedEvent{Data: msg.GetData(), Peer: n.peer}) + Events.TransactionReceived.Trigger(&TransactionReceivedEvent{Data: msg.GetData(), Peer: p}) // Incoming Transaction request case pb.MTransactionRequest: msg := new(pb.TransactionRequest) if err := proto.Unmarshal(data[1:], msg); err != nil { - return errors.Wrap(err, "invalid message") + return errors.Wrap(err, "invalid packet") } m.log.Debugw("Received Tx Req", "data", msg.GetHash()) // do something @@ -272,10 +243,11 @@ func (m *Manager) handlePacket(data []byte, n *neighbor) error { m.log.Debugw("Tx not available", "tx", msg.GetHash()) } else { m.log.Debugw("Tx found", "tx", tx) - m.SendTransaction(tx, n.peer.ID()) + m.SendTransaction(tx, p.ID()) } default: + return ErrInvalidPacket } return nil @@ -293,8 +265,3 @@ func marshal(msg pb.Message) []byte { } return append([]byte{byte(mType)}, data...) } - -func disconnect(conn *server.Connection) { - _ = conn.Close() - Events.NeighborDropped.Trigger(&NeighborDroppedEvent{Peer: conn.Peer()}) -} diff --git a/packages/gossip/manager_test.go b/packages/gossip/manager_test.go index 29a5634c..eb86bf01 100644 --- a/packages/gossip/manager_test.go +++ b/packages/gossip/manager_test.go @@ -1,7 +1,6 @@ package gossip import ( - "log" "net" "sync" "testing" @@ -13,54 +12,25 @@ import ( pb "github.com/iotaledger/goshimmer/packages/gossip/proto" "github.com/iotaledger/goshimmer/packages/gossip/server" "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/logger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "go.uber.org/zap" ) const graceTime = 10 * time.Millisecond var ( - logger *zap.SugaredLogger - eventMock mock.Mock - + log = logger.NewExampleLogger("gossip") testTxData = []byte("testTx") ) -func transactionReceivedEvent(ev *TransactionReceivedEvent) { eventMock.Called(ev) } -func neighborDroppedEvent(ev *NeighborDroppedEvent) { eventMock.Called(ev) } - -// assertEvents initializes the mock and asserts the expectations -func assertEvents(t *testing.T) func() { - eventMock = mock.Mock{} - return func() { - if !t.Failed() { - eventMock.AssertExpectations(t) - } - } -} - -func init() { - l, err := zap.NewDevelopment() - if err != nil { - log.Fatalf("cannot initialize logger: %v", err) - } - logger = l.Sugar() - - // mock the events triggered by the gossip - Events.TransactionReceived.Attach(events.NewClosure(transactionReceivedEvent)) - Events.NeighborDropped.Attach(events.NewClosure(neighborDroppedEvent)) -} - -func getTestTransaction([]byte) ([]byte, error) { - return testTxData, nil -} +func getTestTransaction([]byte) ([]byte, error) { return testTxData, nil } func getTCPAddress(t require.TestingT) string { - laddr, err := net.ResolveTCPAddr("tcp", "localhost:0") + tcpAddr, err := net.ResolveTCPAddr("tcp", "localhost:0") require.NoError(t, err) - lis, err := net.ListenTCP("tcp", laddr) + lis, err := net.ListenTCP("tcp", tcpAddr) require.NoError(t, err) addr := lis.Addr().String() @@ -69,8 +39,8 @@ func getTCPAddress(t require.TestingT) string { return addr } -func newTest(t require.TestingT, name string) (*Manager, func(), *peer.Peer) { - l := logger.Named(name) +func newTestManager(t require.TestingT, name string) (*Manager, func(), *peer.Peer) { + l := log.Named(name) db := peer.NewMemoryDB(l.Named("db")) local, err := peer.NewLocal("peering", name, db) require.NoError(t, err) @@ -89,31 +59,36 @@ func newTest(t require.TestingT, name string) (*Manager, func(), *peer.Peer) { // start the actual gossipping mgr.Start(srv) - teardown := func() { + detach := func() { mgr.Close() srv.Close() db.Close() } - return mgr, teardown, &local.Peer + return mgr, detach, &local.Peer } func TestClose(t *testing.T) { - defer assertEvents(t)() + _, detach := newEventMock(t) + defer detach() - _, teardown, _ := newTest(t, "A") + _, teardown, _ := newTestManager(t, "A") teardown() } func TestClosedConnection(t *testing.T) { - defer assertEvents(t)() + e, detach := newEventMock(t) + defer detach() - mgrA, closeA, peerA := newTest(t, "A") + mgrA, closeA, peerA := newTestManager(t, "A") defer closeA() - mgrB, closeB, peerB := newTest(t, "B") + mgrB, closeB, peerB := newTestManager(t, "B") defer closeB() + connections := 2 + e.On("neighborAdded", mock.Anything).Times(connections) + var wg sync.WaitGroup - wg.Add(2) + wg.Add(connections) // connect in the following way // B -> A @@ -132,8 +107,8 @@ func TestClosedConnection(t *testing.T) { // wait for the connections to establish wg.Wait() - eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Once() - eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once() + e.On("neighborRemoved", peerA).Once() + e.On("neighborRemoved", peerB).Once() // A drops B err := mgrA.DropNeighbor(peerB.ID()) @@ -141,19 +116,21 @@ func TestClosedConnection(t *testing.T) { time.Sleep(graceTime) // the events should be there even before we close - eventMock.AssertExpectations(t) + e.AssertExpectations(t) } func TestP2PSend(t *testing.T) { - defer assertEvents(t)() + e, detach := newEventMock(t) + defer detach() - mgrA, closeA, peerA := newTest(t, "A") - defer closeA() - mgrB, closeB, peerB := newTest(t, "B") - defer closeB() + mgrA, closeA, peerA := newTestManager(t, "A") + mgrB, closeB, peerB := newTestManager(t, "B") + + connections := 2 + e.On("neighborAdded", mock.Anything).Times(connections) var wg sync.WaitGroup - wg.Add(2) + wg.Add(connections) // connect in the following way // B -> A @@ -172,27 +149,36 @@ func TestP2PSend(t *testing.T) { // wait for the connections to establish wg.Wait() - eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{ + e.On("transactionReceived", &TransactionReceivedEvent{ Data: testTxData, Peer: peerA, }).Once() - eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Once() - eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once() mgrA.SendTransaction(testTxData) time.Sleep(graceTime) + + e.On("neighborRemoved", peerA).Once() + e.On("neighborRemoved", peerB).Once() + + closeA() + closeB() + time.Sleep(graceTime) + + e.AssertExpectations(t) } func TestP2PSendTwice(t *testing.T) { - defer assertEvents(t)() + e, detach := newEventMock(t) + defer detach() - mgrA, closeA, peerA := newTest(t, "A") - defer closeA() - mgrB, closeB, peerB := newTest(t, "B") - defer closeB() + mgrA, closeA, peerA := newTestManager(t, "A") + mgrB, closeB, peerB := newTestManager(t, "B") + + connections := 2 + e.On("neighborAdded", mock.Anything).Times(connections) var wg sync.WaitGroup - wg.Add(2) + wg.Add(connections) // connect in the following way // B -> A @@ -211,31 +197,39 @@ func TestP2PSendTwice(t *testing.T) { // wait for the connections to establish wg.Wait() - eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{ + e.On("transactionReceived", &TransactionReceivedEvent{ Data: testTxData, Peer: peerA, }).Twice() - eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Once() - eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once() mgrA.SendTransaction(testTxData) time.Sleep(1 * time.Second) // wait a bit between the sends, to test timeouts mgrA.SendTransaction(testTxData) time.Sleep(graceTime) + + e.On("neighborRemoved", peerA).Once() + e.On("neighborRemoved", peerB).Once() + + closeA() + closeB() + time.Sleep(graceTime) + + e.AssertExpectations(t) } func TestBroadcast(t *testing.T) { - defer assertEvents(t)() + e, detach := newEventMock(t) + defer detach() - mgrA, closeA, peerA := newTest(t, "A") - defer closeA() - mgrB, closeB, peerB := newTest(t, "B") - defer closeB() - mgrC, closeC, peerC := newTest(t, "C") - defer closeC() + mgrA, closeA, peerA := newTestManager(t, "A") + mgrB, closeB, peerB := newTestManager(t, "B") + mgrC, closeC, peerC := newTestManager(t, "C") + + connections := 4 + e.On("neighborAdded", mock.Anything).Times(connections) var wg sync.WaitGroup - wg.Add(4) + wg.Add(connections) // connect in the following way // B -> A <- C @@ -264,30 +258,39 @@ func TestBroadcast(t *testing.T) { // wait for the connections to establish wg.Wait() - eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{ + e.On("transactionReceived", &TransactionReceivedEvent{ Data: testTxData, Peer: peerA, }).Twice() - eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Twice() - eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once() - eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerC}).Once() mgrA.SendTransaction(testTxData) time.Sleep(graceTime) + + e.On("neighborRemoved", peerA).Twice() + e.On("neighborRemoved", peerB).Once() + e.On("neighborRemoved", peerC).Once() + + closeA() + closeB() + closeC() + time.Sleep(graceTime) + + e.AssertExpectations(t) } func TestSingleSend(t *testing.T) { - defer assertEvents(t)() + e, detach := newEventMock(t) + defer detach() - mgrA, closeA, peerA := newTest(t, "A") - defer closeA() - mgrB, closeB, peerB := newTest(t, "B") - defer closeB() - mgrC, closeC, peerC := newTest(t, "C") - defer closeC() + mgrA, closeA, peerA := newTestManager(t, "A") + mgrB, closeB, peerB := newTestManager(t, "B") + mgrC, closeC, peerC := newTestManager(t, "C") + + connections := 4 + e.On("neighborAdded", mock.Anything).Times(connections) var wg sync.WaitGroup - wg.Add(4) + wg.Add(connections) // connect in the following way // B -> A <- C @@ -316,45 +319,56 @@ func TestSingleSend(t *testing.T) { // wait for the connections to establish wg.Wait() - eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{ + e.On("transactionReceived", &TransactionReceivedEvent{ Data: testTxData, Peer: peerA, }).Once() - eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Twice() - eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once() - eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerC}).Once() // A sends the transaction only to B mgrA.SendTransaction(testTxData, peerB.ID()) time.Sleep(graceTime) + + e.On("neighborRemoved", peerA).Twice() + e.On("neighborRemoved", peerB).Once() + e.On("neighborRemoved", peerC).Once() + + closeA() + closeB() + closeC() + time.Sleep(graceTime) + + e.AssertExpectations(t) } func TestDropUnsuccessfulAccept(t *testing.T) { - defer assertEvents(t)() + e, detach := newEventMock(t) + defer detach() - mgrA, closeA, _ := newTest(t, "A") + mgrA, closeA, _ := newTestManager(t, "A") defer closeA() - _, closeB, peerB := newTest(t, "B") + _, closeB, peerB := newTestManager(t, "B") defer closeB() - eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{ - Peer: peerB, - }).Once() + e.On("connectionFailed", peerB).Once() err := mgrA.AddInbound(peerB) assert.Error(t, err) + + e.AssertExpectations(t) } func TestTxRequest(t *testing.T) { - defer assertEvents(t)() + e, detach := newEventMock(t) + defer detach() - mgrA, closeA, peerA := newTest(t, "A") - defer closeA() - mgrB, closeB, peerB := newTest(t, "B") - defer closeB() + mgrA, closeA, peerA := newTestManager(t, "A") + mgrB, closeB, peerB := newTestManager(t, "B") + + connections := 2 + e.On("neighborAdded", mock.Anything).Times(connections) var wg sync.WaitGroup - wg.Add(2) + wg.Add(connections) // connect in the following way // B -> A @@ -375,15 +389,53 @@ func TestTxRequest(t *testing.T) { txHash := []byte("Hello!") - eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{ + e.On("transactionReceived", &TransactionReceivedEvent{ Data: testTxData, Peer: peerB, }).Once() - eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Once() - eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once() b, err := proto.Marshal(&pb.TransactionRequest{Hash: txHash}) require.NoError(t, err) mgrA.RequestTransaction(b) time.Sleep(graceTime) + + e.On("neighborRemoved", peerA).Once() + e.On("neighborRemoved", peerB).Once() + + closeA() + closeB() + time.Sleep(graceTime) + + e.AssertExpectations(t) } + +func newEventMock(t mock.TestingT) (*eventMock, func()) { + e := &eventMock{} + e.Test(t) + + connectionFailedC := events.NewClosure(e.connectionFailed) + neighborAddedC := events.NewClosure(e.neighborAdded) + neighborRemoved := events.NewClosure(e.neighborRemoved) + transactionReceivedC := events.NewClosure(e.transactionReceived) + + Events.ConnectionFailed.Attach(connectionFailedC) + Events.NeighborAdded.Attach(neighborAddedC) + Events.NeighborRemoved.Attach(neighborRemoved) + Events.TransactionReceived.Attach(transactionReceivedC) + + return e, func() { + Events.ConnectionFailed.Detach(connectionFailedC) + Events.NeighborAdded.Detach(neighborAddedC) + Events.NeighborRemoved.Detach(neighborRemoved) + Events.TransactionReceived.Detach(transactionReceivedC) + } +} + +type eventMock struct { + mock.Mock +} + +func (e *eventMock) connectionFailed(p *peer.Peer) { e.Called(p) } +func (e *eventMock) neighborAdded(n *Neighbor) { e.Called(n) } +func (e *eventMock) neighborRemoved(p *peer.Peer) { e.Called(p) } +func (e *eventMock) transactionReceived(ev *TransactionReceivedEvent) { e.Called(ev) } diff --git a/packages/gossip/neighbor.go b/packages/gossip/neighbor.go new file mode 100644 index 00000000..178105a4 --- /dev/null +++ b/packages/gossip/neighbor.go @@ -0,0 +1,145 @@ +package gossip + +import ( + "errors" + "io" + "net" + "strings" + "sync" + + "github.com/iotaledger/goshimmer/packages/autopeering/peer" + "github.com/iotaledger/hive.go/logger" + "github.com/iotaledger/hive.go/network" +) + +var ( + ErrNeighborQueueFull = errors.New("send queue is full") +) + +const neighborQueueSize = 1000 + +type Neighbor struct { + *peer.Peer + *network.ManagedConnection + + log *logger.Logger + queue chan []byte + + wg sync.WaitGroup + closing chan struct{} + disconnectOnce sync.Once +} + +// NewNeighbor creates a new neighbor from the provided peer and connection. +func NewNeighbor(peer *peer.Peer, conn net.Conn, log *logger.Logger) *Neighbor { + if !IsSupported(peer) { + panic("peer does not support gossip") + } + + // always include ID and address with every log message + log = log.With( + "id", peer.ID(), + "network", conn.LocalAddr().Network(), + "addr", conn.RemoteAddr().String(), + ) + + return &Neighbor{ + Peer: peer, + ManagedConnection: network.NewManagedConnection(conn), + log: log, + queue: make(chan []byte, neighborQueueSize), + closing: make(chan struct{}), + } +} + +// Listen starts the communication to the neighbor. +func (n *Neighbor) Listen() { + n.wg.Add(2) + go n.readLoop() + go n.writeLoop() + + n.log.Info("Connection established") +} + +// Close closes the connection to the neighbor and stops all communication. +func (n *Neighbor) Close() error { + err := n.disconnect() + // wait for everything to finish + n.wg.Wait() + + n.log.Infow("Connection closed", + "read", n.BytesRead, + "written", n.BytesWritten, + ) + return err +} + +// IsOutbound returns true if the neighbor is an outbound neighbor. +func (n *Neighbor) IsOutbound() bool { + return GetAddress(n.Peer) == n.Conn.RemoteAddr().String() +} + +func (n *Neighbor) disconnect() (err error) { + n.disconnectOnce.Do(func() { + close(n.closing) + close(n.queue) + err = n.ManagedConnection.Close() + }) + return +} + +func (n *Neighbor) writeLoop() { + defer n.wg.Done() + + for { + select { + case msg := <-n.queue: + if len(msg) == 0 { + continue + } + if _, err := n.ManagedConnection.Write(msg); err != nil { + n.log.Warn("write error", "err", err) + } + case <-n.closing: + return + } + } +} + +func (n *Neighbor) readLoop() { + defer n.wg.Done() + + // create a buffer for the packages + b := make([]byte, maxPacketSize) + + for { + _, err := n.ManagedConnection.Read(b) + if nerr, ok := err.(net.Error); ok && nerr.Temporary() { + // ignore temporary read errors. + n.log.Debugw("temporary read error", "err", err) + continue + } else if err != nil { + // return from the loop on all other errors + if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") { + n.log.Warnw("read error", "err", err) + } + _ = n.ManagedConnection.Close() + return + } + } +} + +func (n *Neighbor) Write(b []byte) (int, error) { + l := len(b) + if l > maxPacketSize { + n.log.Errorw("message too large", "len", l, "max", maxPacketSize) + } + + // add to queue + select { + case n.queue <- b: + return l, nil + default: + return 0, ErrNeighborQueueFull + } +} diff --git a/packages/gossip/neighbor_test.go b/packages/gossip/neighbor_test.go new file mode 100644 index 00000000..3ad65b09 --- /dev/null +++ b/packages/gossip/neighbor_test.go @@ -0,0 +1,152 @@ +package gossip + +import ( + "net" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/iotaledger/goshimmer/packages/autopeering/peer" + "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" + "github.com/iotaledger/hive.go/events" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var testData = []byte("foobar") + +func TestNeighborClose(t *testing.T) { + a, _, teardown := newPipe() + defer teardown() + + n := newTestNeighbor("A", a) + n.Listen() + require.NoError(t, n.Close()) +} + +func TestNeighborCloseTwice(t *testing.T) { + a, _, teardown := newPipe() + defer teardown() + + n := newTestNeighbor("A", a) + n.Listen() + require.NoError(t, n.Close()) + require.NoError(t, n.Close()) +} + +func TestNeighborWriteToClosed(t *testing.T) { + a, _, teardown := newPipe() + defer teardown() + + n := newTestNeighbor("A", a) + n.Listen() + require.NoError(t, n.Close()) + + assert.Panics(t, func() { + _, _ = n.Write(testData) + }) +} + +func TestNeighborWrite(t *testing.T) { + a, b, teardown := newPipe() + defer teardown() + + neighborA := newTestNeighbor("A", a) + defer neighborA.Close() + neighborA.Listen() + + neighborB := newTestNeighbor("B", b) + defer neighborB.Close() + + var count uint32 + neighborB.Events.ReceiveData.Attach(events.NewClosure(func(data []byte) { + assert.Equal(t, testData, data) + atomic.AddUint32(&count, 1) + })) + neighborB.Listen() + + _, err := neighborA.Write(testData) + require.NoError(t, err) + + assert.Eventually(t, func() bool { return atomic.LoadUint32(&count) == 1 }, time.Second, 10*time.Millisecond) +} + +func TestNeighborParallelWrite(t *testing.T) { + a, b, teardown := newPipe() + defer teardown() + + neighborA := newTestNeighbor("A", a) + defer neighborA.Close() + neighborA.Listen() + + neighborB := newTestNeighbor("B", b) + defer neighborB.Close() + + var count uint32 + neighborB.Events.ReceiveData.Attach(events.NewClosure(func(data []byte) { + assert.Equal(t, testData, data) + atomic.AddUint32(&count, 1) + })) + neighborB.Listen() + + var ( + wg sync.WaitGroup + expected uint32 + ) + wg.Add(2) + + // Writer 1 + go func() { + defer wg.Done() + for i := 0; i < neighborQueueSize; i++ { + _, err := neighborA.Write(testData) + if err == ErrNeighborQueueFull { + continue + } + assert.NoError(t, err) + atomic.AddUint32(&expected, 1) + } + }() + // Writer 2 + go func() { + defer wg.Done() + for i := 0; i < neighborQueueSize; i++ { + _, err := neighborA.Write(testData) + if err == ErrNeighborQueueFull { + continue + } + assert.NoError(t, err) + atomic.AddUint32(&expected, 1) + } + }() + + wg.Wait() + + done := func() bool { + actual := atomic.LoadUint32(&count) + return expected == actual + } + assert.Eventually(t, done, time.Second, 10*time.Millisecond) +} + +func newTestNeighbor(name string, conn net.Conn) *Neighbor { + return NewNeighbor(newTestPeer(name, conn.LocalAddr()), conn, log.Named(name)) +} + +func newTestPeer(name string, addr net.Addr) *peer.Peer { + services := service.New() + services.Update(service.PeeringKey, addr.Network(), addr.String()) + services.Update(service.GossipKey, addr.Network(), addr.String()) + + return peer.NewPeer([]byte(name), services) +} + +func newPipe() (net.Conn, net.Conn, func()) { + a, b := net.Pipe() + teardown := func() { + _ = a.Close() + _ = b.Close() + } + return a, b, teardown +} diff --git a/packages/gossip/server/connection.go b/packages/gossip/server/connection.go deleted file mode 100644 index 9c087ecb..00000000 --- a/packages/gossip/server/connection.go +++ /dev/null @@ -1,29 +0,0 @@ -package server - -import ( - "net" - "time" - - "github.com/iotaledger/goshimmer/packages/autopeering/peer" -) - -// Connection represents a network connection to a neighbor peer. -type Connection struct { - net.Conn - peer *peer.Peer -} - -func newConnection(c net.Conn, p *peer.Peer) *Connection { - // make sure the connection has no timeouts - _ = c.SetDeadline(time.Time{}) - - return &Connection{ - Conn: c, - peer: p, - } -} - -// Peer returns the peer associated with that connection. -func (c *Connection) Peer() *peer.Peer { - return c.peer -} diff --git a/packages/gossip/server/server.go b/packages/gossip/server/server.go index ed210257..ade291bd 100644 --- a/packages/gossip/server/server.go +++ b/packages/gossip/server/server.go @@ -55,7 +55,7 @@ type TCP struct { // connect contains the result of an incoming connection. type connect struct { - c *Connection + c net.Conn err error } @@ -133,7 +133,7 @@ func (t *TCP) LocalAddr() net.Addr { // DialPeer establishes a gossip connection to the given peer. // If the peer does not accept the connection or the handshake fails, an error is returned. -func (t *TCP) DialPeer(p *peer.Peer) (*Connection, error) { +func (t *TCP) DialPeer(p *peer.Peer) (net.Conn, error) { gossipAddr := p.Services().Get(service.GossipKey) if gossipAddr == nil { return nil, ErrNoGossip @@ -153,12 +153,12 @@ func (t *TCP) DialPeer(p *peer.Peer) (*Connection, error) { "id", p.ID(), "addr", conn.RemoteAddr(), ) - return newConnection(conn, p), nil + return conn, nil } // AcceptPeer awaits an incoming connection from the given peer. // If the peer does not establish the connection or the handshake fails, an error is returned. -func (t *TCP) AcceptPeer(p *peer.Peer) (*Connection, error) { +func (t *TCP) AcceptPeer(p *peer.Peer) (net.Conn, error) { if p.Services().Get(service.GossipKey) == nil { return nil, ErrNoGossip } @@ -273,7 +273,7 @@ func (t *TCP) matchAccept(m *acceptMatcher, req []byte, conn net.Conn) { t.closeConnection(conn) return } - m.connected <- connect{newConnection(conn, m.peer), nil} + m.connected <- connect{conn, nil} } func (t *TCP) listenLoop() { diff --git a/packages/gossip/server/server_test.go b/packages/gossip/server/server_test.go index e7e88d83..9aa398fb 100644 --- a/packages/gossip/server/server_test.go +++ b/packages/gossip/server/server_test.go @@ -1,7 +1,6 @@ package server import ( - "log" "net" "sync" "testing" @@ -9,22 +8,14 @@ import ( "github.com/iotaledger/goshimmer/packages/autopeering/peer" "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" + "github.com/iotaledger/hive.go/logger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" ) const graceTime = 5 * time.Millisecond -var logger *zap.SugaredLogger - -func init() { - l, err := zap.NewDevelopment() - if err != nil { - log.Fatalf("cannot initialize logger: %v", err) - } - logger = l.Sugar() -} +var log = logger.NewExampleLogger("server") func getTCPAddress(t require.TestingT) string { laddr, err := net.ResolveTCPAddr("tcp", "localhost:0") @@ -39,7 +30,7 @@ func getTCPAddress(t require.TestingT) string { } func newTest(t require.TestingT, name string) (*TCP, func()) { - l := logger.Named(name) + l := log.Named(name) db := peer.NewMemoryDB(l.Named("db")) local, err := peer.NewLocal("peering", name, db) require.NoError(t, err) diff --git a/plugins/autopeering/plugin.go b/plugins/autopeering/plugin.go index da2fd920..dd93e398 100644 --- a/plugins/autopeering/plugin.go +++ b/plugins/autopeering/plugin.go @@ -2,6 +2,7 @@ package autopeering import ( "github.com/iotaledger/goshimmer/packages/autopeering/discover" + "github.com/iotaledger/goshimmer/packages/autopeering/peer" "github.com/iotaledger/goshimmer/packages/gossip" "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/events" @@ -27,11 +28,12 @@ func run(*node.Plugin) { } func configureEvents() { - gossip.Events.NeighborDropped.Attach(events.NewClosure(func(ev *gossip.NeighborDroppedEvent) { - log.Info("neighbor dropped: " + ev.Peer.Address() + " / " + ev.Peer.ID().String()) - if Selection != nil { - Selection.DropPeer(ev.Peer) - } + // notify the selection when a connection is closed or failed. + gossip.Events.ConnectionFailed.Attach(events.NewClosure(func(p *peer.Peer) { + Selection.DropPeer(p) + })) + gossip.Events.NeighborRemoved.Attach(events.NewClosure(func(p *peer.Peer) { + Selection.DropPeer(p) })) discover.Events.PeerDiscovered.Attach(events.NewClosure(func(ev *discover.DiscoveredEvent) { diff --git a/plugins/gossip/gossip.go b/plugins/gossip/gossip.go index aff925c7..0c793d55 100644 --- a/plugins/gossip/gossip.go +++ b/plugins/gossip/gossip.go @@ -14,6 +14,7 @@ import ( "github.com/iotaledger/goshimmer/plugins/tangle" "github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/typeutils" + "github.com/iotaledger/iota.go/trinary" ) var ( @@ -35,7 +36,7 @@ func configureGossip() { log.Fatalf("could not update services: %v", err) } - mgr = gp.NewManager(lPeer, getTransaction, log) + mgr = gp.NewManager(lPeer, loadTransaction, log) } func start(shutdownSignal <-chan struct{}) { @@ -56,7 +57,7 @@ func start(shutdownSignal <-chan struct{}) { log.Info("Stopping Gossip ...") } -func getTransaction(hash []byte) ([]byte, error) { +func loadTransaction(hash []byte) ([]byte, error) { log.Infof("Retrieving tx: hash=%s", hash) tx, err := tangle.GetTransaction(typeutils.BytesToString(hash)) @@ -68,3 +69,11 @@ func getTransaction(hash []byte) ([]byte, error) { } return tx.GetBytes(), nil } + +func requestTransaction(hash trinary.Hash) { + if contains, _ := tangle.ContainsTransaction(hash); contains { + // Do not request tx that we already know + return + } + mgr.RequestTransaction(typeutils.StringToBytes(hash)) +} diff --git a/plugins/gossip/plugin.go b/plugins/gossip/plugin.go index 77436467..0a4f61e4 100644 --- a/plugins/gossip/plugin.go +++ b/plugins/gossip/plugin.go @@ -1,7 +1,7 @@ package gossip import ( - "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" + "github.com/iotaledger/goshimmer/packages/autopeering/peer" "github.com/iotaledger/goshimmer/packages/autopeering/selection" "github.com/iotaledger/goshimmer/packages/gossip" "github.com/iotaledger/goshimmer/packages/model/value_transaction" @@ -10,7 +10,6 @@ import ( "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/node" - "github.com/iotaledger/hive.go/typeutils" ) const name = "Gossip" // name of the plugin @@ -32,33 +31,37 @@ func run(*node.Plugin) { func configureEvents() { selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) { - log.Info("neighbor removed: " + ev.DroppedID.String()) - go mgr.DropNeighbor(ev.DroppedID) + go func() { + if err := mgr.DropNeighbor(ev.DroppedID); err != nil { + log.Debugw("error dropping neighbor", "id", ev.DroppedID, "err", err) + } + }() })) - selection.Events.IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { - gossipService := ev.Peer.Services().Get(service.GossipKey) - if gossipService != nil { - log.Info("accepted neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) - go mgr.AddInbound(ev.Peer) - } + go func() { + if err := mgr.AddInbound(ev.Peer); err != nil { + log.Debugw("error adding inbound", "id", ev.Peer.ID(), "err", err) + } + }() })) - selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { - gossipService := ev.Peer.Services().Get(service.GossipKey) - if gossipService != nil { - log.Info("chosen neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) - go mgr.AddOutbound(ev.Peer) - } + go func() { + if err := mgr.AddOutbound(ev.Peer); err != nil { + log.Debugw("error adding outbound", "id", ev.Peer.ID(), "err", err) + } + }() })) - tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) { - log.Debugf("gossip solid tx: hash=%s", tx.GetHash()) - go mgr.SendTransaction(tx.GetBytes()) + gossip.Events.NeighborAdded.Attach(events.NewClosure(func(n *gossip.Neighbor) { + log.Infof("Neighbor added: %s / %s", gossip.GetAddress(n.Peer), n.ID()) + })) + gossip.Events.NeighborRemoved.Attach(events.NewClosure(func(p *peer.Peer) { + log.Infof("Neighbor removed: %s / %s", gossip.GetAddress(p), p.ID()) })) - gossip.Events.RequestTransaction.Attach(events.NewClosure(func(ev *gossip.RequestTransactionEvent) { - log.Debugf("gossip tx request: hash=%s", ev.Hash) - go mgr.RequestTransaction(typeutils.StringToBytes(ev.Hash)) + // gossip transactions on solidification + tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) { + mgr.SendTransaction(tx.GetBytes()) })) + tangle.SetRequester(tangle.RequesterFunc(requestTransaction)) } diff --git a/plugins/statusscreen/plugin.go b/plugins/statusscreen/plugin.go index 544e1900..f79ca05e 100644 --- a/plugins/statusscreen/plugin.go +++ b/plugins/statusscreen/plugin.go @@ -44,7 +44,7 @@ func run(*node.Plugin) { } stopped := make(chan struct{}) - err := daemon.BackgroundWorker(name+" Refresher", func(shutdown <-chan struct{}) { + if err := daemon.BackgroundWorker(name+" Refresher", func(shutdown <-chan struct{}) { for { select { case <-time.After(repaintInterval): @@ -57,13 +57,12 @@ func run(*node.Plugin) { return } } - }) - if err != nil { + }); err != nil { log.Errorf("Failed to start as daemon: %s", err) return } - err = daemon.BackgroundWorker(name+" App", func(<-chan struct{}) { + if err := daemon.BackgroundWorker(name+" App", func(<-chan struct{}) { defer close(stopped) // switch logging to status screen @@ -73,8 +72,7 @@ func run(*node.Plugin) { if err := app.SetRoot(frame, true).SetFocus(frame).Run(); err != nil { log.Errorf("Error running application: %s", err) } - }) - if err != nil { + }); err != nil { log.Errorf("Failed to start as daemon: %s", err) close(stopped) } diff --git a/plugins/tangle/events.go b/plugins/tangle/events.go index 3cc90ba4..2a89c724 100644 --- a/plugins/tangle/events.go +++ b/plugins/tangle/events.go @@ -5,14 +5,12 @@ import ( "github.com/iotaledger/hive.go/events" ) -var Events = pluginEvents{ - TransactionStored: events.NewEvent(transactionCaller), - TransactionSolid: events.NewEvent(transactionCaller), -} - -type pluginEvents struct { +var Events = struct { TransactionStored *events.Event TransactionSolid *events.Event +}{ + TransactionStored: events.NewEvent(transactionCaller), + TransactionSolid: events.NewEvent(transactionCaller), } func transactionCaller(handler interface{}, params ...interface{}) { diff --git a/plugins/tangle/plugin.go b/plugins/tangle/plugin.go index 6684986c..50cbafaa 100644 --- a/plugins/tangle/plugin.go +++ b/plugins/tangle/plugin.go @@ -3,6 +3,7 @@ package tangle import ( "github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/node" + "github.com/iotaledger/iota.go/trinary" ) // region plugin module setup ////////////////////////////////////////////////////////////////////////////////////////// @@ -24,4 +25,13 @@ func run(*node.Plugin) { runSolidifier() } +// Requester provides the functionality to request a transaction from the network. +type Requester interface { + RequestTransaction(hash trinary.Hash) +} + +type RequesterFunc func(hash trinary.Hash) + +func (f RequesterFunc) RequestTransaction(hash trinary.Hash) { f(hash) } + // endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/plugins/tangle/solidifier.go b/plugins/tangle/solidifier.go index e30ae82e..4b3dd663 100644 --- a/plugins/tangle/solidifier.go +++ b/plugins/tangle/solidifier.go @@ -21,16 +21,23 @@ import ( const UnsolidInterval = 30 var ( - workerPool *workerpool.WorkerPool - unsolidTxs *UnsolidTxs + workerCount = runtime.NumCPU() + workerPool *workerpool.WorkerPool + unsolidTxs *UnsolidTxs + + requester Requester ) +func SetRequester(req Requester) { + requester = req +} + func configureSolidifier() { workerPool = workerpool.New(func(task workerpool.Task) { processMetaTransaction(task.Param(0).(*meta_transaction.MetaTransaction)) task.Return(nil) - }, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(10000)) + }, workerpool.WorkerCount(workerCount), workerpool.QueueSize(10000)) unsolidTxs = NewUnsolidTxs() @@ -221,8 +228,10 @@ func updateUnsolidTxs(tx *value_transaction.ValueTransaction) { } func requestTransaction(hash trinary.Trytes) { - log.Infof("Requesting hash: hash=%s", hash) - gossip.Events.RequestTransaction.Trigger(&gossip.RequestTransactionEvent{Hash: hash}) -} + if requester == nil { + return + } -var WORKER_COUNT = runtime.NumCPU() + log.Infow("Requesting tx", "hash", hash) + requester.RequestTransaction(hash) +} diff --git a/plugins/tangle/solidifier_test.go b/plugins/tangle/solidifier_test.go index e93cfdce..14573d43 100644 --- a/plugins/tangle/solidifier_test.go +++ b/plugins/tangle/solidifier_test.go @@ -11,23 +11,20 @@ import ( "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/node" - "github.com/spf13/viper" + "github.com/iotaledger/iota.go/trinary" "github.com/stretchr/testify/require" ) func init() { - err := parameter.LoadDefaultConfig(false) - if err != nil { + if err := parameter.LoadDefaultConfig(false); err != nil { + log.Fatalf("Failed to initialize config: %s", err) + } + if err := logger.InitGlobalLogger(parameter.NodeConfig); err != nil { log.Fatalf("Failed to initialize config: %s", err) } - - logger.InitGlobalLogger(&viper.Viper{}) } func TestSolidifier(t *testing.T) { - // show all error messages for tests - // TODO: adjust logger package - // start a test node node.Start(node.Plugins(PLUGIN)) @@ -54,12 +51,12 @@ func TestSolidifier(t *testing.T) { // setup event handlers var wg sync.WaitGroup Events.TransactionSolid.Attach(events.NewClosure(func(transaction *value_transaction.ValueTransaction) { - t.Log("Tx solidified", transaction.GetValue()) wg.Done() })) - gossip.Events.RequestTransaction.Attach(events.NewClosure(func(ev *gossip.RequestTransactionEvent) { - require.Equal(t, transaction3.GetHash(), ev.Hash) + // only transaction3 should be requested + SetRequester(RequesterFunc(func(hash trinary.Hash) { + require.Equal(t, transaction3.GetHash(), hash) // return the transaction data gossip.Events.TransactionReceived.Trigger(&gossip.TransactionReceivedEvent{Data: transaction3.GetBytes()}) })) @@ -77,5 +74,4 @@ func TestSolidifier(t *testing.T) { // shutdown test node node.Shutdown() - } diff --git a/plugins/webapi/routes.go b/plugins/webapi/routes.go deleted file mode 100644 index 2f5e3754..00000000 --- a/plugins/webapi/routes.go +++ /dev/null @@ -1,9 +0,0 @@ -package webapi - -import ( - "github.com/labstack/echo" -) - -func setupRoutes(e *echo.Echo) { - -} -- GitLab