diff --git a/packages/gossip/events.go b/packages/gossip/events.go index 5215d054a9aece8cb18ecc9b975570a4c1a3b60b..93875d68c290387327e4411b2c2dab86c798d1af 100644 --- a/packages/gossip/events.go +++ b/packages/gossip/events.go @@ -5,8 +5,8 @@ import ( "github.com/iotaledger/hive.go/events" ) -// Events contains all the events related to the gossip protocol. -var Events = struct { +// Events defines all the events related to the gossip protocol. +type Events struct { // Fired when an attempt to build a connection to a neighbor has failed. ConnectionFailed *events.Event // Fired when a neighbor connection has been established. @@ -15,11 +15,6 @@ var Events = struct { NeighborRemoved *events.Event // Fired when a new message was received via the gossip protocol. MessageReceived *events.Event -}{ - ConnectionFailed: events.NewEvent(peerAndErrorCaller), - NeighborAdded: events.NewEvent(neighborCaller), - NeighborRemoved: events.NewEvent(peerCaller), - MessageReceived: events.NewEvent(messageReceived), } // MessageReceivedEvent holds data about a message received event. diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index 04886f2bd4d76703352b780f32abac3cf75eac36..eddc0ff4e9e70347945fc1e82d5a15192872caf1 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -12,7 +12,7 @@ import ( "github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/identity" - "go.uber.org/zap" + "github.com/iotaledger/hive.go/logger" ) const ( @@ -26,7 +26,8 @@ type LoadMessageFunc func(messageId message.Id) ([]byte, error) type Manager struct { local *peer.Local loadMessageFunc LoadMessageFunc - log *zap.SugaredLogger + log *logger.Logger + events Events wg sync.WaitGroup @@ -36,13 +37,19 @@ type Manager struct { } // NewManager creates a new Manager. -func NewManager(local *peer.Local, f LoadMessageFunc, log *zap.SugaredLogger) *Manager { +func NewManager(local *peer.Local, f LoadMessageFunc, log *logger.Logger) *Manager { return &Manager{ local: local, loadMessageFunc: f, log: log, - srv: nil, - neighbors: make(map[identity.ID]*Neighbor), + events: Events{ + ConnectionFailed: events.NewEvent(peerAndErrorCaller), + NeighborAdded: events.NewEvent(neighborCaller), + NeighborRemoved: events.NewEvent(peerCaller), + MessageReceived: events.NewEvent(messageReceived), + }, + srv: nil, + neighbors: make(map[identity.ID]*Neighbor), } } @@ -60,6 +67,11 @@ func (m *Manager) Close() { m.wg.Wait() } +// Events returns the events related to the gossip protocol. +func (m *Manager) Events() Events { + return m.events +} + func (m *Manager) stop() { m.mu.Lock() defer m.mu.Unlock() @@ -176,13 +188,13 @@ func (m *Manager) send(b []byte, to ...identity.ID) { func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (net.Conn, error)) error { conn, err := connectorFunc(peer) if err != nil { - Events.ConnectionFailed.Trigger(peer, err) + m.events.ConnectionFailed.Trigger(peer, err) return err } if _, ok := m.neighbors[peer.ID()]; ok { _ = conn.Close() - Events.ConnectionFailed.Trigger(peer, ErrDuplicateNeighbor) + m.events.ConnectionFailed.Trigger(peer, ErrDuplicateNeighbor) return ErrDuplicateNeighbor } @@ -191,7 +203,7 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n n.Events.Close.Attach(events.NewClosure(func() { // assure that the neighbor is removed and notify _ = m.DropNeighbor(peer.ID()) - Events.NeighborRemoved.Trigger(peer) + m.events.NeighborRemoved.Trigger(peer) })) n.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) { if err := m.handlePacket(data, peer); err != nil { @@ -201,7 +213,7 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n m.neighbors[peer.ID()] = n n.Listen() - Events.NeighborAdded.Trigger(n) + m.events.NeighborAdded.Trigger(n) return nil } @@ -220,7 +232,7 @@ func (m *Manager) handlePacket(data []byte, p *peer.Peer) error { return fmt.Errorf("invalid packet: %w", err) } m.log.Debugw("received packet", "type", protoMsg.Type(), "peer-id", p.ID()) - Events.MessageReceived.Trigger(&MessageReceivedEvent{Data: protoMsg.GetData(), Peer: p}) + m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: protoMsg.GetData(), Peer: p}) case pb.PacketMessageRequest: protoMsgReq := new(pb.MessageRequest) diff --git a/packages/gossip/manager_test.go b/packages/gossip/manager_test.go index 53fe088906d16a982750777e59e04944f344ebcb..399fa1759670c9b14c84ce65b304de1334be5888 100644 --- a/packages/gossip/manager_test.go +++ b/packages/gossip/manager_test.go @@ -7,6 +7,9 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" + pb "github.com/iotaledger/goshimmer/packages/gossip/proto" + "github.com/iotaledger/goshimmer/packages/gossip/server" "github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/autopeering/peer/service" "github.com/iotaledger/hive.go/database/mapdb" @@ -15,10 +18,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - - "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" - pb "github.com/iotaledger/goshimmer/packages/gossip/proto" - "github.com/iotaledger/goshimmer/packages/gossip/server" ) const graceTime = 10 * time.Millisecond @@ -31,30 +30,24 @@ var ( func loadTestMessage(message.Id) ([]byte, error) { return testMessageData, nil } func TestClose(t *testing.T) { - _, detach := newEventMock(t) - defer detach() - - _, teardown, _ := newTestManager(t, "A") + _, teardown, _ := newMockedManager(t, "A") teardown() } func TestClosedConnection(t *testing.T) { - e, detach := newEventMock(t) - defer detach() - - mgrA, closeA, peerA := newTestManager(t, "A") + mgrA, closeA, peerA := newMockedManager(t, "A") defer closeA() - mgrB, closeB, peerB := newTestManager(t, "B") + mgrB, closeB, peerB := newMockedManager(t, "B") defer closeB() - connections := 2 - e.On("neighborAdded", mock.Anything).Times(connections) - var wg sync.WaitGroup - wg.Add(connections) + wg.Add(2) // connect in the following way // B -> A + mgrA.On("neighborAdded", mock.Anything).Once() + mgrB.On("neighborAdded", mock.Anything).Once() + go func() { defer wg.Done() err := mgrA.AddInbound(peerB) @@ -70,8 +63,8 @@ func TestClosedConnection(t *testing.T) { // wait for the connections to establish wg.Wait() - e.On("neighborRemoved", peerA).Once() - e.On("neighborRemoved", peerB).Once() + mgrA.On("neighborRemoved", peerB).Once() + mgrB.On("neighborRemoved", peerA).Once() // A drops B err := mgrA.DropNeighbor(peerB.ID()) @@ -79,24 +72,22 @@ func TestClosedConnection(t *testing.T) { time.Sleep(graceTime) // the events should be there even before we close - e.AssertExpectations(t) + mgrA.AssertExpectations(t) + mgrB.AssertExpectations(t) } func TestP2PSend(t *testing.T) { - e, detach := newEventMock(t) - defer detach() - - mgrA, closeA, peerA := newTestManager(t, "A") - mgrB, closeB, peerB := newTestManager(t, "B") - - connections := 2 - e.On("neighborAdded", mock.Anything).Times(connections) + mgrA, closeA, peerA := newMockedManager(t, "A") + mgrB, closeB, peerB := newMockedManager(t, "B") var wg sync.WaitGroup - wg.Add(connections) + wg.Add(2) // connect in the following way // B -> A + mgrA.On("neighborAdded", mock.Anything).Once() + mgrB.On("neighborAdded", mock.Anything).Once() + go func() { defer wg.Done() err := mgrA.AddInbound(peerB) @@ -112,7 +103,7 @@ func TestP2PSend(t *testing.T) { // wait for the connections to establish wg.Wait() - e.On("messageReceived", &MessageReceivedEvent{ + mgrB.On("messageReceived", &MessageReceivedEvent{ Data: testMessageData, Peer: peerA, }).Once() @@ -120,31 +111,30 @@ func TestP2PSend(t *testing.T) { mgrA.SendMessage(testMessageData) time.Sleep(graceTime) - e.On("neighborRemoved", peerA).Once() - e.On("neighborRemoved", peerB).Once() + mgrA.On("neighborRemoved", peerB).Once() + mgrB.On("neighborRemoved", peerA).Once() closeA() closeB() time.Sleep(graceTime) - e.AssertExpectations(t) + // the events should be there even before we close + mgrA.AssertExpectations(t) + mgrB.AssertExpectations(t) } func TestP2PSendTwice(t *testing.T) { - e, detach := newEventMock(t) - defer detach() - - mgrA, closeA, peerA := newTestManager(t, "A") - mgrB, closeB, peerB := newTestManager(t, "B") - - connections := 2 - e.On("neighborAdded", mock.Anything).Times(connections) + mgrA, closeA, peerA := newMockedManager(t, "A") + mgrB, closeB, peerB := newMockedManager(t, "B") var wg sync.WaitGroup - wg.Add(connections) + wg.Add(2) // connect in the following way // B -> A + mgrA.On("neighborAdded", mock.Anything).Once() + mgrB.On("neighborAdded", mock.Anything).Once() + go func() { defer wg.Done() err := mgrA.AddInbound(peerB) @@ -160,7 +150,7 @@ func TestP2PSendTwice(t *testing.T) { // wait for the connections to establish wg.Wait() - e.On("messageReceived", &MessageReceivedEvent{ + mgrB.On("messageReceived", &MessageReceivedEvent{ Data: testMessageData, Peer: peerA, }).Twice() @@ -170,32 +160,32 @@ func TestP2PSendTwice(t *testing.T) { mgrA.SendMessage(testMessageData) time.Sleep(graceTime) - e.On("neighborRemoved", peerA).Once() - e.On("neighborRemoved", peerB).Once() + mgrA.On("neighborRemoved", peerB).Once() + mgrB.On("neighborRemoved", peerA).Once() closeA() closeB() time.Sleep(graceTime) - e.AssertExpectations(t) + // the events should be there even before we close + mgrA.AssertExpectations(t) + mgrB.AssertExpectations(t) } func TestBroadcast(t *testing.T) { - e, detach := newEventMock(t) - defer detach() - - mgrA, closeA, peerA := newTestManager(t, "A") - mgrB, closeB, peerB := newTestManager(t, "B") - mgrC, closeC, peerC := newTestManager(t, "C") - - connections := 4 - e.On("neighborAdded", mock.Anything).Times(connections) + mgrA, closeA, peerA := newMockedManager(t, "A") + mgrB, closeB, peerB := newMockedManager(t, "B") + mgrC, closeC, peerC := newMockedManager(t, "C") var wg sync.WaitGroup - wg.Add(connections) + wg.Add(4) // connect in the following way // B -> A <- C + mgrA.On("neighborAdded", mock.Anything).Twice() + mgrB.On("neighborAdded", mock.Anything).Once() + mgrC.On("neighborAdded", mock.Anything).Once() + go func() { defer wg.Done() err := mgrA.AddInbound(peerB) @@ -221,42 +211,42 @@ func TestBroadcast(t *testing.T) { // wait for the connections to establish wg.Wait() - e.On("messageReceived", &MessageReceivedEvent{ - Data: testMessageData, - Peer: peerA, - }).Twice() + event := &MessageReceivedEvent{Data: testMessageData, Peer: peerA} + mgrB.On("messageReceived", event).Once() + mgrC.On("messageReceived", event).Once() mgrA.SendMessage(testMessageData) time.Sleep(graceTime) - e.On("neighborRemoved", peerA).Twice() - e.On("neighborRemoved", peerB).Once() - e.On("neighborRemoved", peerC).Once() + mgrA.On("neighborRemoved", peerB).Once() + mgrA.On("neighborRemoved", peerC).Once() + mgrB.On("neighborRemoved", peerA).Once() + mgrC.On("neighborRemoved", peerA).Once() closeA() closeB() closeC() time.Sleep(graceTime) - e.AssertExpectations(t) + mgrA.AssertExpectations(t) + mgrB.AssertExpectations(t) + mgrC.AssertExpectations(t) } func TestSingleSend(t *testing.T) { - e, detach := newEventMock(t) - defer detach() - - mgrA, closeA, peerA := newTestManager(t, "A") - mgrB, closeB, peerB := newTestManager(t, "B") - mgrC, closeC, peerC := newTestManager(t, "C") - - connections := 4 - e.On("neighborAdded", mock.Anything).Times(connections) + mgrA, closeA, peerA := newMockedManager(t, "A") + mgrB, closeB, peerB := newMockedManager(t, "B") + mgrC, closeC, peerC := newMockedManager(t, "C") var wg sync.WaitGroup - wg.Add(connections) + wg.Add(4) // connect in the following way // B -> A <- C + mgrA.On("neighborAdded", mock.Anything).Twice() + mgrB.On("neighborAdded", mock.Anything).Once() + mgrC.On("neighborAdded", mock.Anything).Once() + go func() { defer wg.Done() err := mgrA.AddInbound(peerB) @@ -282,59 +272,55 @@ func TestSingleSend(t *testing.T) { // wait for the connections to establish wg.Wait() - e.On("messageReceived", &MessageReceivedEvent{ - Data: testMessageData, - Peer: peerA, - }).Once() + // only mgr should receive the message + mgrB.On("messageReceived", &MessageReceivedEvent{Data: testMessageData, Peer: peerA}).Once() // A sends the message only to B mgrA.SendMessage(testMessageData, peerB.ID()) time.Sleep(graceTime) - e.On("neighborRemoved", peerA).Twice() - e.On("neighborRemoved", peerB).Once() - e.On("neighborRemoved", peerC).Once() + mgrA.On("neighborRemoved", peerB).Once() + mgrA.On("neighborRemoved", peerC).Once() + mgrB.On("neighborRemoved", peerA).Once() + mgrC.On("neighborRemoved", peerA).Once() closeA() closeB() closeC() time.Sleep(graceTime) - e.AssertExpectations(t) + mgrA.AssertExpectations(t) + mgrB.AssertExpectations(t) + mgrC.AssertExpectations(t) } func TestDropUnsuccessfulAccept(t *testing.T) { - e, detach := newEventMock(t) - defer detach() - - mgrA, closeA, _ := newTestManager(t, "A") + mgrA, closeA, _ := newMockedManager(t, "A") defer closeA() - _, closeB, peerB := newTestManager(t, "B") + mgrB, closeB, peerB := newMockedManager(t, "B") defer closeB() - e.On("connectionFailed", peerB, mock.Anything).Once() + mgrA.On("connectionFailed", peerB, mock.Anything).Once() err := mgrA.AddInbound(peerB) assert.Error(t, err) - e.AssertExpectations(t) + mgrA.AssertExpectations(t) + mgrB.AssertExpectations(t) } func TestMessageRequest(t *testing.T) { - e, detach := newEventMock(t) - defer detach() - - mgrA, closeA, peerA := newTestManager(t, "A") - mgrB, closeB, peerB := newTestManager(t, "B") - - connections := 2 - e.On("neighborAdded", mock.Anything).Times(connections) + mgrA, closeA, peerA := newMockedManager(t, "A") + mgrB, closeB, peerB := newMockedManager(t, "B") var wg sync.WaitGroup - wg.Add(connections) + wg.Add(2) // connect in the following way // B -> A + mgrA.On("neighborAdded", mock.Anything).Once() + mgrB.On("neighborAdded", mock.Anything).Once() + go func() { defer wg.Done() err := mgrA.AddInbound(peerB) @@ -352,24 +338,23 @@ func TestMessageRequest(t *testing.T) { messageId := message.Id{} - e.On("messageReceived", &MessageReceivedEvent{ - Data: testMessageData, - Peer: peerB, - }).Once() + // mgrA should eventually receive the message + mgrA.On("messageReceived", &MessageReceivedEvent{Data: testMessageData, Peer: peerB}).Once() b, err := proto.Marshal(&pb.MessageRequest{Id: messageId[:]}) require.NoError(t, err) mgrA.RequestMessage(b) time.Sleep(graceTime) - e.On("neighborRemoved", peerA).Once() - e.On("neighborRemoved", peerB).Once() + mgrA.On("neighborRemoved", peerB).Once() + mgrB.On("neighborRemoved", peerA).Once() closeA() closeB() time.Sleep(graceTime) - e.AssertExpectations(t) + mgrA.AssertExpectations(t) + mgrB.AssertExpectations(t) } func TestDropNeighbor(t *testing.T) { @@ -378,23 +363,35 @@ func TestDropNeighbor(t *testing.T) { mgrB, closeB, peerB := newTestManager(t, "B") defer closeB() + // establish connection connect := func() { var wg sync.WaitGroup - closure := events.NewClosure(func(_ *Neighbor) { wg.Done() }) + signal := events.NewClosure(func(_ *Neighbor) { wg.Done() }) + // we are expecting two signals wg.Add(2) - Events.NeighborAdded.Attach(closure) - defer Events.NeighborAdded.Detach(closure) + + // signal as soon as the neighbor is added + mgrA.Events().NeighborAdded.Attach(signal) + defer mgrA.Events().NeighborAdded.Detach(signal) + mgrB.Events().NeighborAdded.Attach(signal) + defer mgrB.Events().NeighborAdded.Detach(signal) go func() { assert.NoError(t, mgrA.AddInbound(peerB)) }() go func() { assert.NoError(t, mgrB.AddOutbound(peerA)) }() wg.Wait() // wait until the events were triggered and the peers are connected } - disc := func() { + // close connection + disconnect := func() { var wg sync.WaitGroup - closure := events.NewClosure(func(_ *peer.Peer) { wg.Done() }) + signal := events.NewClosure(func(_ *peer.Peer) { wg.Done() }) + // we are expecting two signals wg.Add(2) - Events.NeighborRemoved.Attach(closure) - defer Events.NeighborRemoved.Detach(closure) + + // signal as soon as the neighbor is added + mgrA.Events().NeighborRemoved.Attach(signal) + defer mgrA.Events().NeighborRemoved.Detach(signal) + mgrB.Events().NeighborRemoved.Attach(signal) + defer mgrB.Events().NeighborRemoved.Detach(signal) // assure that no DropNeighbor calls are leaking wg.Add(2) @@ -414,7 +411,7 @@ func TestDropNeighbor(t *testing.T) { connect() assert.NotEmpty(t, mgrA.AllNeighbors()) assert.NotEmpty(t, mgrB.AllNeighbors()) - disc() + disconnect() assert.Empty(t, mgrA.AllNeighbors()) assert.Empty(t, mgrB.AllNeighbors()) } @@ -455,33 +452,29 @@ func newTestManager(t require.TestingT, name string) (*Manager, func(), *peer.Pe return mgr, detach, local.Peer } -func newEventMock(t mock.TestingT) (*eventMock, func()) { - e := &eventMock{} +func newMockedManager(t *testing.T, name string) (*mockedManager, func(), *peer.Peer) { + mgr, detach, p := newTestManager(t, name) + return mockManager(t, mgr), detach, p +} + +func mockManager(t mock.TestingT, mgr *Manager) *mockedManager { + e := &mockedManager{Manager: mgr} e.Test(t) - connectionFailedC := events.NewClosure(e.connectionFailed) - neighborAddedC := events.NewClosure(e.neighborAdded) - neighborRemoved := events.NewClosure(e.neighborRemoved) - messageReceivedC := events.NewClosure(e.messageReceived) - - Events.ConnectionFailed.Attach(connectionFailedC) - Events.NeighborAdded.Attach(neighborAddedC) - Events.NeighborRemoved.Attach(neighborRemoved) - Events.MessageReceived.Attach(messageReceivedC) - - return e, func() { - Events.ConnectionFailed.Detach(connectionFailedC) - Events.NeighborAdded.Detach(neighborAddedC) - Events.NeighborRemoved.Detach(neighborRemoved) - Events.MessageReceived.Detach(messageReceivedC) - } + e.Events().ConnectionFailed.Attach(events.NewClosure(e.connectionFailed)) + e.Events().NeighborAdded.Attach(events.NewClosure(e.neighborAdded)) + e.Events().NeighborRemoved.Attach(events.NewClosure(e.neighborRemoved)) + e.Events().MessageReceived.Attach(events.NewClosure(e.messageReceived)) + + return e } -type eventMock struct { +type mockedManager struct { mock.Mock + *Manager } -func (e *eventMock) connectionFailed(p *peer.Peer, err error) { e.Called(p, err) } -func (e *eventMock) neighborAdded(n *Neighbor) { e.Called(n) } -func (e *eventMock) neighborRemoved(p *peer.Peer) { e.Called(p) } -func (e *eventMock) messageReceived(ev *MessageReceivedEvent) { e.Called(ev) } +func (e *mockedManager) connectionFailed(p *peer.Peer, err error) { e.Called(p, err) } +func (e *mockedManager) neighborAdded(n *Neighbor) { e.Called(n) } +func (e *mockedManager) neighborRemoved(p *peer.Peer) { e.Called(p) } +func (e *mockedManager) messageReceived(ev *MessageReceivedEvent) { e.Called(ev) } diff --git a/plugins/autopeering/plugin.go b/plugins/autopeering/plugin.go index 102a7c44e6b44df95a649ac03d39abb6a64a8038..102c05a55428d042dbface00d252cd25bba3394a 100644 --- a/plugins/autopeering/plugin.go +++ b/plugins/autopeering/plugin.go @@ -3,8 +3,8 @@ package autopeering import ( "time" - "github.com/iotaledger/goshimmer/packages/gossip" "github.com/iotaledger/goshimmer/packages/shutdown" + "github.com/iotaledger/goshimmer/plugins/gossip" "github.com/iotaledger/hive.go/autopeering/discover" "github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/autopeering/selection" @@ -37,10 +37,10 @@ func run(*node.Plugin) { func configureEvents() { // notify the selection when a connection is closed or failed. - gossip.Events.ConnectionFailed.Attach(events.NewClosure(func(p *peer.Peer, _ error) { + gossip.Manager().Events().ConnectionFailed.Attach(events.NewClosure(func(p *peer.Peer, _ error) { Selection.RemoveNeighbor(p.ID()) })) - gossip.Events.NeighborRemoved.Attach(events.NewClosure(func(p *peer.Peer) { + gossip.Manager().Events().NeighborRemoved.Attach(events.NewClosure(func(p *peer.Peer) { Selection.RemoveNeighbor(p.ID()) })) diff --git a/plugins/dashboard/plugin.go b/plugins/dashboard/plugin.go index 2fab6b08b8298685d3a009c4d430e7f36184aedb..b2b8e09f2a997ec62e7c0ce375daf7cd8265a60b 100644 --- a/plugins/dashboard/plugin.go +++ b/plugins/dashboard/plugin.go @@ -198,7 +198,7 @@ func neighborMetrics() []neighbormetric { stats := []neighbormetric{} // gossip plugin might be disabled - neighbors := gossip.Neighbors() + neighbors := gossip.Manager().AllNeighbors() if neighbors == nil { return stats } diff --git a/plugins/gossip/gossip.go b/plugins/gossip/gossip.go index 8214d7fafcdcb8c553b60774099ca9dc0f02bf2e..7dd18f5563d0fc7a4d271ed9c62da2857b43c83f 100644 --- a/plugins/gossip/gossip.go +++ b/plugins/gossip/gossip.go @@ -4,9 +4,10 @@ import ( "fmt" "net" "strconv" + "sync" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" - gp "github.com/iotaledger/goshimmer/packages/gossip" + "github.com/iotaledger/goshimmer/packages/gossip" "github.com/iotaledger/goshimmer/packages/gossip/server" "github.com/iotaledger/goshimmer/plugins/autopeering/local" "github.com/iotaledger/goshimmer/plugins/config" @@ -17,12 +18,19 @@ import ( ) var ( - log *logger.Logger - mgr *gp.Manager - srv *server.TCP + log *logger.Logger + mgr *gossip.Manager + mgrOnce sync.Once ) -func configureGossip() { +// Manager returns the manager instance of the gossip plugin. +func Manager() *gossip.Manager { + mgrOnce.Do(createManager) + return mgr +} + +func createManager() { + log = logger.NewLogger(PluginName) lPeer := local.GetInstance() // announce the gossip service @@ -34,7 +42,7 @@ func configureGossip() { if err := lPeer.UpdateService(service.GossipKey, "tcp", gossipPort); err != nil { log.Fatalf("could not update services: %s", err) } - mgr = gp.NewManager(lPeer, loadMessage, log) + mgr = gossip.NewManager(lPeer, loadMessage, log) } func start(shutdownSignal <-chan struct{}) { @@ -58,7 +66,7 @@ func start(shutdownSignal <-chan struct{}) { } defer listener.Close() - srv = server.ServeTCP(lPeer, listener, log) + srv := server.ServeTCP(lPeer, listener, log) defer srv.Close() mgr.Start(srv) @@ -80,11 +88,3 @@ func loadMessage(messageID message.Id) (bytes []byte, err error) { } return } - -// Neighbors returns the list of the neighbors. -func Neighbors() []*gp.Neighbor { - if mgr == nil { - return nil - } - return mgr.AllNeighbors() -} diff --git a/plugins/gossip/plugin.go b/plugins/gossip/plugin.go index 49c42e85c847a1e50d3c0d3b520f977e17a5bf20..9d457f8b5230f21cabfa5a4f50c4a3747baab830 100644 --- a/plugins/gossip/plugin.go +++ b/plugins/gossip/plugin.go @@ -10,7 +10,6 @@ import ( "github.com/iotaledger/hive.go/autopeering/selection" "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/events" - "github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/node" ) @@ -21,19 +20,10 @@ const PluginName = "Gossip" var Plugin = node.NewPlugin(PluginName, node.Enabled, configure, run) func configure(*node.Plugin) { - log = logger.NewLogger(PluginName) + // assure that the Manager is instantiated + mgr := Manager() - configureGossip() - configureEvents() -} - -func run(*node.Plugin) { - if err := daemon.BackgroundWorker(PluginName, start, shutdown.PriorityGossip); err != nil { - log.Errorf("Failed to start as daemon: %s", err) - } -} - -func configureEvents() { + // link to the auto peering selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) { go func() { if err := mgr.DropNeighbor(ev.DroppedID); err != nil { @@ -62,18 +52,19 @@ func configureEvents() { }() })) - gossip.Events.ConnectionFailed.Attach(events.NewClosure(func(p *peer.Peer, err error) { + // log neighbor changes + mgr.Events().ConnectionFailed.Attach(events.NewClosure(func(p *peer.Peer, err error) { log.Infof("Connection to neighbor %s / %s failed: %s", gossip.GetAddress(p), p.ID(), err) })) - gossip.Events.NeighborAdded.Attach(events.NewClosure(func(n *gossip.Neighbor) { + mgr.Events().NeighborAdded.Attach(events.NewClosure(func(n *gossip.Neighbor) { log.Infof("Neighbor added: %s / %s", gossip.GetAddress(n.Peer), n.ID()) })) - gossip.Events.NeighborRemoved.Attach(events.NewClosure(func(p *peer.Peer) { + mgr.Events().NeighborRemoved.Attach(events.NewClosure(func(p *peer.Peer) { log.Infof("Neighbor removed: %s / %s", gossip.GetAddress(p), p.ID()) })) // configure flow of incoming messages - gossip.Events.MessageReceived.Attach(events.NewClosure(func(event *gossip.MessageReceivedEvent) { + mgr.Events().MessageReceived.Attach(events.NewClosure(func(event *gossip.MessageReceivedEvent) { messagelayer.MessageParser.Parse(event.Data, event.Peer) })) @@ -90,3 +81,9 @@ func configureEvents() { mgr.RequestMessage(messageId[:]) })) } + +func run(*node.Plugin) { + if err := daemon.BackgroundWorker(PluginName, start, shutdown.PriorityGossip); err != nil { + log.Errorf("Failed to start as daemon: %s", err) + } +}