Skip to content
Snippets Groups Projects
Unverified Commit aba33d58 authored by Wolfgang Welz's avatar Wolfgang Welz Committed by GitHub
Browse files

Remove global gossip events (#364)

* assure that DropNeighbor is not leaking

* bind events to gossip plugin

* make the gossip manager a singleton

* add comment

* fix manager instantiation
parent ae9985e7
No related branches found
No related tags found
No related merge requests found
...@@ -5,8 +5,8 @@ import ( ...@@ -5,8 +5,8 @@ import (
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
) )
// Events contains all the events related to the gossip protocol. // Events defines all the events related to the gossip protocol.
var Events = struct { type Events struct {
// Fired when an attempt to build a connection to a neighbor has failed. // Fired when an attempt to build a connection to a neighbor has failed.
ConnectionFailed *events.Event ConnectionFailed *events.Event
// Fired when a neighbor connection has been established. // Fired when a neighbor connection has been established.
...@@ -15,11 +15,6 @@ var Events = struct { ...@@ -15,11 +15,6 @@ var Events = struct {
NeighborRemoved *events.Event NeighborRemoved *events.Event
// Fired when a new message was received via the gossip protocol. // Fired when a new message was received via the gossip protocol.
MessageReceived *events.Event MessageReceived *events.Event
}{
ConnectionFailed: events.NewEvent(peerAndErrorCaller),
NeighborAdded: events.NewEvent(neighborCaller),
NeighborRemoved: events.NewEvent(peerCaller),
MessageReceived: events.NewEvent(messageReceived),
} }
// MessageReceivedEvent holds data about a message received event. // MessageReceivedEvent holds data about a message received event.
......
...@@ -12,7 +12,7 @@ import ( ...@@ -12,7 +12,7 @@ import (
"github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/identity" "github.com/iotaledger/hive.go/identity"
"go.uber.org/zap" "github.com/iotaledger/hive.go/logger"
) )
const ( const (
...@@ -26,7 +26,8 @@ type LoadMessageFunc func(messageId message.Id) ([]byte, error) ...@@ -26,7 +26,8 @@ type LoadMessageFunc func(messageId message.Id) ([]byte, error)
type Manager struct { type Manager struct {
local *peer.Local local *peer.Local
loadMessageFunc LoadMessageFunc loadMessageFunc LoadMessageFunc
log *zap.SugaredLogger log *logger.Logger
events Events
wg sync.WaitGroup wg sync.WaitGroup
...@@ -36,11 +37,17 @@ type Manager struct { ...@@ -36,11 +37,17 @@ type Manager struct {
} }
// NewManager creates a new Manager. // NewManager creates a new Manager.
func NewManager(local *peer.Local, f LoadMessageFunc, log *zap.SugaredLogger) *Manager { func NewManager(local *peer.Local, f LoadMessageFunc, log *logger.Logger) *Manager {
return &Manager{ return &Manager{
local: local, local: local,
loadMessageFunc: f, loadMessageFunc: f,
log: log, log: log,
events: Events{
ConnectionFailed: events.NewEvent(peerAndErrorCaller),
NeighborAdded: events.NewEvent(neighborCaller),
NeighborRemoved: events.NewEvent(peerCaller),
MessageReceived: events.NewEvent(messageReceived),
},
srv: nil, srv: nil,
neighbors: make(map[identity.ID]*Neighbor), neighbors: make(map[identity.ID]*Neighbor),
} }
...@@ -60,6 +67,11 @@ func (m *Manager) Close() { ...@@ -60,6 +67,11 @@ func (m *Manager) Close() {
m.wg.Wait() m.wg.Wait()
} }
// Events returns the events related to the gossip protocol.
func (m *Manager) Events() Events {
return m.events
}
func (m *Manager) stop() { func (m *Manager) stop() {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
...@@ -176,13 +188,13 @@ func (m *Manager) send(b []byte, to ...identity.ID) { ...@@ -176,13 +188,13 @@ func (m *Manager) send(b []byte, to ...identity.ID) {
func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (net.Conn, error)) error { func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (net.Conn, error)) error {
conn, err := connectorFunc(peer) conn, err := connectorFunc(peer)
if err != nil { if err != nil {
Events.ConnectionFailed.Trigger(peer, err) m.events.ConnectionFailed.Trigger(peer, err)
return err return err
} }
if _, ok := m.neighbors[peer.ID()]; ok { if _, ok := m.neighbors[peer.ID()]; ok {
_ = conn.Close() _ = conn.Close()
Events.ConnectionFailed.Trigger(peer, ErrDuplicateNeighbor) m.events.ConnectionFailed.Trigger(peer, ErrDuplicateNeighbor)
return ErrDuplicateNeighbor return ErrDuplicateNeighbor
} }
...@@ -191,7 +203,7 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n ...@@ -191,7 +203,7 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n
n.Events.Close.Attach(events.NewClosure(func() { n.Events.Close.Attach(events.NewClosure(func() {
// assure that the neighbor is removed and notify // assure that the neighbor is removed and notify
_ = m.DropNeighbor(peer.ID()) _ = m.DropNeighbor(peer.ID())
Events.NeighborRemoved.Trigger(peer) m.events.NeighborRemoved.Trigger(peer)
})) }))
n.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) { n.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) {
if err := m.handlePacket(data, peer); err != nil { if err := m.handlePacket(data, peer); err != nil {
...@@ -201,7 +213,7 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n ...@@ -201,7 +213,7 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n
m.neighbors[peer.ID()] = n m.neighbors[peer.ID()] = n
n.Listen() n.Listen()
Events.NeighborAdded.Trigger(n) m.events.NeighborAdded.Trigger(n)
return nil return nil
} }
...@@ -220,7 +232,7 @@ func (m *Manager) handlePacket(data []byte, p *peer.Peer) error { ...@@ -220,7 +232,7 @@ func (m *Manager) handlePacket(data []byte, p *peer.Peer) error {
return fmt.Errorf("invalid packet: %w", err) return fmt.Errorf("invalid packet: %w", err)
} }
m.log.Debugw("received packet", "type", protoMsg.Type(), "peer-id", p.ID()) m.log.Debugw("received packet", "type", protoMsg.Type(), "peer-id", p.ID())
Events.MessageReceived.Trigger(&MessageReceivedEvent{Data: protoMsg.GetData(), Peer: p}) m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: protoMsg.GetData(), Peer: p})
case pb.PacketMessageRequest: case pb.PacketMessageRequest:
protoMsgReq := new(pb.MessageRequest) protoMsgReq := new(pb.MessageRequest)
......
...@@ -7,6 +7,9 @@ import ( ...@@ -7,6 +7,9 @@ import (
"time" "time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
"github.com/iotaledger/goshimmer/packages/gossip/server"
"github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/autopeering/peer/service" "github.com/iotaledger/hive.go/autopeering/peer/service"
"github.com/iotaledger/hive.go/database/mapdb" "github.com/iotaledger/hive.go/database/mapdb"
...@@ -15,10 +18,6 @@ import ( ...@@ -15,10 +18,6 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
"github.com/iotaledger/goshimmer/packages/gossip/server"
) )
const graceTime = 10 * time.Millisecond const graceTime = 10 * time.Millisecond
...@@ -31,30 +30,24 @@ var ( ...@@ -31,30 +30,24 @@ var (
func loadTestMessage(message.Id) ([]byte, error) { return testMessageData, nil } func loadTestMessage(message.Id) ([]byte, error) { return testMessageData, nil }
func TestClose(t *testing.T) { func TestClose(t *testing.T) {
_, detach := newEventMock(t) _, teardown, _ := newMockedManager(t, "A")
defer detach()
_, teardown, _ := newTestManager(t, "A")
teardown() teardown()
} }
func TestClosedConnection(t *testing.T) { func TestClosedConnection(t *testing.T) {
e, detach := newEventMock(t) mgrA, closeA, peerA := newMockedManager(t, "A")
defer detach()
mgrA, closeA, peerA := newTestManager(t, "A")
defer closeA() defer closeA()
mgrB, closeB, peerB := newTestManager(t, "B") mgrB, closeB, peerB := newMockedManager(t, "B")
defer closeB() defer closeB()
connections := 2
e.On("neighborAdded", mock.Anything).Times(connections)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(connections) wg.Add(2)
// connect in the following way // connect in the following way
// B -> A // B -> A
mgrA.On("neighborAdded", mock.Anything).Once()
mgrB.On("neighborAdded", mock.Anything).Once()
go func() { go func() {
defer wg.Done() defer wg.Done()
err := mgrA.AddInbound(peerB) err := mgrA.AddInbound(peerB)
...@@ -70,8 +63,8 @@ func TestClosedConnection(t *testing.T) { ...@@ -70,8 +63,8 @@ func TestClosedConnection(t *testing.T) {
// wait for the connections to establish // wait for the connections to establish
wg.Wait() wg.Wait()
e.On("neighborRemoved", peerA).Once() mgrA.On("neighborRemoved", peerB).Once()
e.On("neighborRemoved", peerB).Once() mgrB.On("neighborRemoved", peerA).Once()
// A drops B // A drops B
err := mgrA.DropNeighbor(peerB.ID()) err := mgrA.DropNeighbor(peerB.ID())
...@@ -79,24 +72,22 @@ func TestClosedConnection(t *testing.T) { ...@@ -79,24 +72,22 @@ func TestClosedConnection(t *testing.T) {
time.Sleep(graceTime) time.Sleep(graceTime)
// the events should be there even before we close // the events should be there even before we close
e.AssertExpectations(t) mgrA.AssertExpectations(t)
mgrB.AssertExpectations(t)
} }
func TestP2PSend(t *testing.T) { func TestP2PSend(t *testing.T) {
e, detach := newEventMock(t) mgrA, closeA, peerA := newMockedManager(t, "A")
defer detach() mgrB, closeB, peerB := newMockedManager(t, "B")
mgrA, closeA, peerA := newTestManager(t, "A")
mgrB, closeB, peerB := newTestManager(t, "B")
connections := 2
e.On("neighborAdded", mock.Anything).Times(connections)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(connections) wg.Add(2)
// connect in the following way // connect in the following way
// B -> A // B -> A
mgrA.On("neighborAdded", mock.Anything).Once()
mgrB.On("neighborAdded", mock.Anything).Once()
go func() { go func() {
defer wg.Done() defer wg.Done()
err := mgrA.AddInbound(peerB) err := mgrA.AddInbound(peerB)
...@@ -112,7 +103,7 @@ func TestP2PSend(t *testing.T) { ...@@ -112,7 +103,7 @@ func TestP2PSend(t *testing.T) {
// wait for the connections to establish // wait for the connections to establish
wg.Wait() wg.Wait()
e.On("messageReceived", &MessageReceivedEvent{ mgrB.On("messageReceived", &MessageReceivedEvent{
Data: testMessageData, Data: testMessageData,
Peer: peerA, Peer: peerA,
}).Once() }).Once()
...@@ -120,31 +111,30 @@ func TestP2PSend(t *testing.T) { ...@@ -120,31 +111,30 @@ func TestP2PSend(t *testing.T) {
mgrA.SendMessage(testMessageData) mgrA.SendMessage(testMessageData)
time.Sleep(graceTime) time.Sleep(graceTime)
e.On("neighborRemoved", peerA).Once() mgrA.On("neighborRemoved", peerB).Once()
e.On("neighborRemoved", peerB).Once() mgrB.On("neighborRemoved", peerA).Once()
closeA() closeA()
closeB() closeB()
time.Sleep(graceTime) time.Sleep(graceTime)
e.AssertExpectations(t) // the events should be there even before we close
mgrA.AssertExpectations(t)
mgrB.AssertExpectations(t)
} }
func TestP2PSendTwice(t *testing.T) { func TestP2PSendTwice(t *testing.T) {
e, detach := newEventMock(t) mgrA, closeA, peerA := newMockedManager(t, "A")
defer detach() mgrB, closeB, peerB := newMockedManager(t, "B")
mgrA, closeA, peerA := newTestManager(t, "A")
mgrB, closeB, peerB := newTestManager(t, "B")
connections := 2
e.On("neighborAdded", mock.Anything).Times(connections)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(connections) wg.Add(2)
// connect in the following way // connect in the following way
// B -> A // B -> A
mgrA.On("neighborAdded", mock.Anything).Once()
mgrB.On("neighborAdded", mock.Anything).Once()
go func() { go func() {
defer wg.Done() defer wg.Done()
err := mgrA.AddInbound(peerB) err := mgrA.AddInbound(peerB)
...@@ -160,7 +150,7 @@ func TestP2PSendTwice(t *testing.T) { ...@@ -160,7 +150,7 @@ func TestP2PSendTwice(t *testing.T) {
// wait for the connections to establish // wait for the connections to establish
wg.Wait() wg.Wait()
e.On("messageReceived", &MessageReceivedEvent{ mgrB.On("messageReceived", &MessageReceivedEvent{
Data: testMessageData, Data: testMessageData,
Peer: peerA, Peer: peerA,
}).Twice() }).Twice()
...@@ -170,32 +160,32 @@ func TestP2PSendTwice(t *testing.T) { ...@@ -170,32 +160,32 @@ func TestP2PSendTwice(t *testing.T) {
mgrA.SendMessage(testMessageData) mgrA.SendMessage(testMessageData)
time.Sleep(graceTime) time.Sleep(graceTime)
e.On("neighborRemoved", peerA).Once() mgrA.On("neighborRemoved", peerB).Once()
e.On("neighborRemoved", peerB).Once() mgrB.On("neighborRemoved", peerA).Once()
closeA() closeA()
closeB() closeB()
time.Sleep(graceTime) time.Sleep(graceTime)
e.AssertExpectations(t) // the events should be there even before we close
mgrA.AssertExpectations(t)
mgrB.AssertExpectations(t)
} }
func TestBroadcast(t *testing.T) { func TestBroadcast(t *testing.T) {
e, detach := newEventMock(t) mgrA, closeA, peerA := newMockedManager(t, "A")
defer detach() mgrB, closeB, peerB := newMockedManager(t, "B")
mgrC, closeC, peerC := newMockedManager(t, "C")
mgrA, closeA, peerA := newTestManager(t, "A")
mgrB, closeB, peerB := newTestManager(t, "B")
mgrC, closeC, peerC := newTestManager(t, "C")
connections := 4
e.On("neighborAdded", mock.Anything).Times(connections)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(connections) wg.Add(4)
// connect in the following way // connect in the following way
// B -> A <- C // B -> A <- C
mgrA.On("neighborAdded", mock.Anything).Twice()
mgrB.On("neighborAdded", mock.Anything).Once()
mgrC.On("neighborAdded", mock.Anything).Once()
go func() { go func() {
defer wg.Done() defer wg.Done()
err := mgrA.AddInbound(peerB) err := mgrA.AddInbound(peerB)
...@@ -221,42 +211,42 @@ func TestBroadcast(t *testing.T) { ...@@ -221,42 +211,42 @@ func TestBroadcast(t *testing.T) {
// wait for the connections to establish // wait for the connections to establish
wg.Wait() wg.Wait()
e.On("messageReceived", &MessageReceivedEvent{ event := &MessageReceivedEvent{Data: testMessageData, Peer: peerA}
Data: testMessageData, mgrB.On("messageReceived", event).Once()
Peer: peerA, mgrC.On("messageReceived", event).Once()
}).Twice()
mgrA.SendMessage(testMessageData) mgrA.SendMessage(testMessageData)
time.Sleep(graceTime) time.Sleep(graceTime)
e.On("neighborRemoved", peerA).Twice() mgrA.On("neighborRemoved", peerB).Once()
e.On("neighborRemoved", peerB).Once() mgrA.On("neighborRemoved", peerC).Once()
e.On("neighborRemoved", peerC).Once() mgrB.On("neighborRemoved", peerA).Once()
mgrC.On("neighborRemoved", peerA).Once()
closeA() closeA()
closeB() closeB()
closeC() closeC()
time.Sleep(graceTime) time.Sleep(graceTime)
e.AssertExpectations(t) mgrA.AssertExpectations(t)
mgrB.AssertExpectations(t)
mgrC.AssertExpectations(t)
} }
func TestSingleSend(t *testing.T) { func TestSingleSend(t *testing.T) {
e, detach := newEventMock(t) mgrA, closeA, peerA := newMockedManager(t, "A")
defer detach() mgrB, closeB, peerB := newMockedManager(t, "B")
mgrC, closeC, peerC := newMockedManager(t, "C")
mgrA, closeA, peerA := newTestManager(t, "A")
mgrB, closeB, peerB := newTestManager(t, "B")
mgrC, closeC, peerC := newTestManager(t, "C")
connections := 4
e.On("neighborAdded", mock.Anything).Times(connections)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(connections) wg.Add(4)
// connect in the following way // connect in the following way
// B -> A <- C // B -> A <- C
mgrA.On("neighborAdded", mock.Anything).Twice()
mgrB.On("neighborAdded", mock.Anything).Once()
mgrC.On("neighborAdded", mock.Anything).Once()
go func() { go func() {
defer wg.Done() defer wg.Done()
err := mgrA.AddInbound(peerB) err := mgrA.AddInbound(peerB)
...@@ -282,59 +272,55 @@ func TestSingleSend(t *testing.T) { ...@@ -282,59 +272,55 @@ func TestSingleSend(t *testing.T) {
// wait for the connections to establish // wait for the connections to establish
wg.Wait() wg.Wait()
e.On("messageReceived", &MessageReceivedEvent{ // only mgr should receive the message
Data: testMessageData, mgrB.On("messageReceived", &MessageReceivedEvent{Data: testMessageData, Peer: peerA}).Once()
Peer: peerA,
}).Once()
// A sends the message only to B // A sends the message only to B
mgrA.SendMessage(testMessageData, peerB.ID()) mgrA.SendMessage(testMessageData, peerB.ID())
time.Sleep(graceTime) time.Sleep(graceTime)
e.On("neighborRemoved", peerA).Twice() mgrA.On("neighborRemoved", peerB).Once()
e.On("neighborRemoved", peerB).Once() mgrA.On("neighborRemoved", peerC).Once()
e.On("neighborRemoved", peerC).Once() mgrB.On("neighborRemoved", peerA).Once()
mgrC.On("neighborRemoved", peerA).Once()
closeA() closeA()
closeB() closeB()
closeC() closeC()
time.Sleep(graceTime) time.Sleep(graceTime)
e.AssertExpectations(t) mgrA.AssertExpectations(t)
mgrB.AssertExpectations(t)
mgrC.AssertExpectations(t)
} }
func TestDropUnsuccessfulAccept(t *testing.T) { func TestDropUnsuccessfulAccept(t *testing.T) {
e, detach := newEventMock(t) mgrA, closeA, _ := newMockedManager(t, "A")
defer detach()
mgrA, closeA, _ := newTestManager(t, "A")
defer closeA() defer closeA()
_, closeB, peerB := newTestManager(t, "B") mgrB, closeB, peerB := newMockedManager(t, "B")
defer closeB() defer closeB()
e.On("connectionFailed", peerB, mock.Anything).Once() mgrA.On("connectionFailed", peerB, mock.Anything).Once()
err := mgrA.AddInbound(peerB) err := mgrA.AddInbound(peerB)
assert.Error(t, err) assert.Error(t, err)
e.AssertExpectations(t) mgrA.AssertExpectations(t)
mgrB.AssertExpectations(t)
} }
func TestMessageRequest(t *testing.T) { func TestMessageRequest(t *testing.T) {
e, detach := newEventMock(t) mgrA, closeA, peerA := newMockedManager(t, "A")
defer detach() mgrB, closeB, peerB := newMockedManager(t, "B")
mgrA, closeA, peerA := newTestManager(t, "A")
mgrB, closeB, peerB := newTestManager(t, "B")
connections := 2
e.On("neighborAdded", mock.Anything).Times(connections)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(connections) wg.Add(2)
// connect in the following way // connect in the following way
// B -> A // B -> A
mgrA.On("neighborAdded", mock.Anything).Once()
mgrB.On("neighborAdded", mock.Anything).Once()
go func() { go func() {
defer wg.Done() defer wg.Done()
err := mgrA.AddInbound(peerB) err := mgrA.AddInbound(peerB)
...@@ -352,24 +338,23 @@ func TestMessageRequest(t *testing.T) { ...@@ -352,24 +338,23 @@ func TestMessageRequest(t *testing.T) {
messageId := message.Id{} messageId := message.Id{}
e.On("messageReceived", &MessageReceivedEvent{ // mgrA should eventually receive the message
Data: testMessageData, mgrA.On("messageReceived", &MessageReceivedEvent{Data: testMessageData, Peer: peerB}).Once()
Peer: peerB,
}).Once()
b, err := proto.Marshal(&pb.MessageRequest{Id: messageId[:]}) b, err := proto.Marshal(&pb.MessageRequest{Id: messageId[:]})
require.NoError(t, err) require.NoError(t, err)
mgrA.RequestMessage(b) mgrA.RequestMessage(b)
time.Sleep(graceTime) time.Sleep(graceTime)
e.On("neighborRemoved", peerA).Once() mgrA.On("neighborRemoved", peerB).Once()
e.On("neighborRemoved", peerB).Once() mgrB.On("neighborRemoved", peerA).Once()
closeA() closeA()
closeB() closeB()
time.Sleep(graceTime) time.Sleep(graceTime)
e.AssertExpectations(t) mgrA.AssertExpectations(t)
mgrB.AssertExpectations(t)
} }
func TestDropNeighbor(t *testing.T) { func TestDropNeighbor(t *testing.T) {
...@@ -378,23 +363,35 @@ func TestDropNeighbor(t *testing.T) { ...@@ -378,23 +363,35 @@ func TestDropNeighbor(t *testing.T) {
mgrB, closeB, peerB := newTestManager(t, "B") mgrB, closeB, peerB := newTestManager(t, "B")
defer closeB() defer closeB()
// establish connection
connect := func() { connect := func() {
var wg sync.WaitGroup var wg sync.WaitGroup
closure := events.NewClosure(func(_ *Neighbor) { wg.Done() }) signal := events.NewClosure(func(_ *Neighbor) { wg.Done() })
// we are expecting two signals
wg.Add(2) wg.Add(2)
Events.NeighborAdded.Attach(closure)
defer Events.NeighborAdded.Detach(closure) // signal as soon as the neighbor is added
mgrA.Events().NeighborAdded.Attach(signal)
defer mgrA.Events().NeighborAdded.Detach(signal)
mgrB.Events().NeighborAdded.Attach(signal)
defer mgrB.Events().NeighborAdded.Detach(signal)
go func() { assert.NoError(t, mgrA.AddInbound(peerB)) }() go func() { assert.NoError(t, mgrA.AddInbound(peerB)) }()
go func() { assert.NoError(t, mgrB.AddOutbound(peerA)) }() go func() { assert.NoError(t, mgrB.AddOutbound(peerA)) }()
wg.Wait() // wait until the events were triggered and the peers are connected wg.Wait() // wait until the events were triggered and the peers are connected
} }
disc := func() { // close connection
disconnect := func() {
var wg sync.WaitGroup var wg sync.WaitGroup
closure := events.NewClosure(func(_ *peer.Peer) { wg.Done() }) signal := events.NewClosure(func(_ *peer.Peer) { wg.Done() })
// we are expecting two signals
wg.Add(2) wg.Add(2)
Events.NeighborRemoved.Attach(closure)
defer Events.NeighborRemoved.Detach(closure) // signal as soon as the neighbor is added
mgrA.Events().NeighborRemoved.Attach(signal)
defer mgrA.Events().NeighborRemoved.Detach(signal)
mgrB.Events().NeighborRemoved.Attach(signal)
defer mgrB.Events().NeighborRemoved.Detach(signal)
// assure that no DropNeighbor calls are leaking // assure that no DropNeighbor calls are leaking
wg.Add(2) wg.Add(2)
...@@ -414,7 +411,7 @@ func TestDropNeighbor(t *testing.T) { ...@@ -414,7 +411,7 @@ func TestDropNeighbor(t *testing.T) {
connect() connect()
assert.NotEmpty(t, mgrA.AllNeighbors()) assert.NotEmpty(t, mgrA.AllNeighbors())
assert.NotEmpty(t, mgrB.AllNeighbors()) assert.NotEmpty(t, mgrB.AllNeighbors())
disc() disconnect()
assert.Empty(t, mgrA.AllNeighbors()) assert.Empty(t, mgrA.AllNeighbors())
assert.Empty(t, mgrB.AllNeighbors()) assert.Empty(t, mgrB.AllNeighbors())
} }
...@@ -455,33 +452,29 @@ func newTestManager(t require.TestingT, name string) (*Manager, func(), *peer.Pe ...@@ -455,33 +452,29 @@ func newTestManager(t require.TestingT, name string) (*Manager, func(), *peer.Pe
return mgr, detach, local.Peer return mgr, detach, local.Peer
} }
func newEventMock(t mock.TestingT) (*eventMock, func()) { func newMockedManager(t *testing.T, name string) (*mockedManager, func(), *peer.Peer) {
e := &eventMock{} mgr, detach, p := newTestManager(t, name)
return mockManager(t, mgr), detach, p
}
func mockManager(t mock.TestingT, mgr *Manager) *mockedManager {
e := &mockedManager{Manager: mgr}
e.Test(t) e.Test(t)
connectionFailedC := events.NewClosure(e.connectionFailed) e.Events().ConnectionFailed.Attach(events.NewClosure(e.connectionFailed))
neighborAddedC := events.NewClosure(e.neighborAdded) e.Events().NeighborAdded.Attach(events.NewClosure(e.neighborAdded))
neighborRemoved := events.NewClosure(e.neighborRemoved) e.Events().NeighborRemoved.Attach(events.NewClosure(e.neighborRemoved))
messageReceivedC := events.NewClosure(e.messageReceived) e.Events().MessageReceived.Attach(events.NewClosure(e.messageReceived))
Events.ConnectionFailed.Attach(connectionFailedC) return e
Events.NeighborAdded.Attach(neighborAddedC)
Events.NeighborRemoved.Attach(neighborRemoved)
Events.MessageReceived.Attach(messageReceivedC)
return e, func() {
Events.ConnectionFailed.Detach(connectionFailedC)
Events.NeighborAdded.Detach(neighborAddedC)
Events.NeighborRemoved.Detach(neighborRemoved)
Events.MessageReceived.Detach(messageReceivedC)
}
} }
type eventMock struct { type mockedManager struct {
mock.Mock mock.Mock
*Manager
} }
func (e *eventMock) connectionFailed(p *peer.Peer, err error) { e.Called(p, err) } func (e *mockedManager) connectionFailed(p *peer.Peer, err error) { e.Called(p, err) }
func (e *eventMock) neighborAdded(n *Neighbor) { e.Called(n) } func (e *mockedManager) neighborAdded(n *Neighbor) { e.Called(n) }
func (e *eventMock) neighborRemoved(p *peer.Peer) { e.Called(p) } func (e *mockedManager) neighborRemoved(p *peer.Peer) { e.Called(p) }
func (e *eventMock) messageReceived(ev *MessageReceivedEvent) { e.Called(ev) } func (e *mockedManager) messageReceived(ev *MessageReceivedEvent) { e.Called(ev) }
...@@ -3,8 +3,8 @@ package autopeering ...@@ -3,8 +3,8 @@ package autopeering
import ( import (
"time" "time"
"github.com/iotaledger/goshimmer/packages/gossip"
"github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/hive.go/autopeering/discover" "github.com/iotaledger/hive.go/autopeering/discover"
"github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/autopeering/selection" "github.com/iotaledger/hive.go/autopeering/selection"
...@@ -37,10 +37,10 @@ func run(*node.Plugin) { ...@@ -37,10 +37,10 @@ func run(*node.Plugin) {
func configureEvents() { func configureEvents() {
// notify the selection when a connection is closed or failed. // notify the selection when a connection is closed or failed.
gossip.Events.ConnectionFailed.Attach(events.NewClosure(func(p *peer.Peer, _ error) { gossip.Manager().Events().ConnectionFailed.Attach(events.NewClosure(func(p *peer.Peer, _ error) {
Selection.RemoveNeighbor(p.ID()) Selection.RemoveNeighbor(p.ID())
})) }))
gossip.Events.NeighborRemoved.Attach(events.NewClosure(func(p *peer.Peer) { gossip.Manager().Events().NeighborRemoved.Attach(events.NewClosure(func(p *peer.Peer) {
Selection.RemoveNeighbor(p.ID()) Selection.RemoveNeighbor(p.ID())
})) }))
......
...@@ -198,7 +198,7 @@ func neighborMetrics() []neighbormetric { ...@@ -198,7 +198,7 @@ func neighborMetrics() []neighbormetric {
stats := []neighbormetric{} stats := []neighbormetric{}
// gossip plugin might be disabled // gossip plugin might be disabled
neighbors := gossip.Neighbors() neighbors := gossip.Manager().AllNeighbors()
if neighbors == nil { if neighbors == nil {
return stats return stats
} }
......
...@@ -4,9 +4,10 @@ import ( ...@@ -4,9 +4,10 @@ import (
"fmt" "fmt"
"net" "net"
"strconv" "strconv"
"sync"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
gp "github.com/iotaledger/goshimmer/packages/gossip" "github.com/iotaledger/goshimmer/packages/gossip"
"github.com/iotaledger/goshimmer/packages/gossip/server" "github.com/iotaledger/goshimmer/packages/gossip/server"
"github.com/iotaledger/goshimmer/plugins/autopeering/local" "github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/config" "github.com/iotaledger/goshimmer/plugins/config"
...@@ -18,11 +19,18 @@ import ( ...@@ -18,11 +19,18 @@ import (
var ( var (
log *logger.Logger log *logger.Logger
mgr *gp.Manager mgr *gossip.Manager
srv *server.TCP mgrOnce sync.Once
) )
func configureGossip() { // Manager returns the manager instance of the gossip plugin.
func Manager() *gossip.Manager {
mgrOnce.Do(createManager)
return mgr
}
func createManager() {
log = logger.NewLogger(PluginName)
lPeer := local.GetInstance() lPeer := local.GetInstance()
// announce the gossip service // announce the gossip service
...@@ -34,7 +42,7 @@ func configureGossip() { ...@@ -34,7 +42,7 @@ func configureGossip() {
if err := lPeer.UpdateService(service.GossipKey, "tcp", gossipPort); err != nil { if err := lPeer.UpdateService(service.GossipKey, "tcp", gossipPort); err != nil {
log.Fatalf("could not update services: %s", err) log.Fatalf("could not update services: %s", err)
} }
mgr = gp.NewManager(lPeer, loadMessage, log) mgr = gossip.NewManager(lPeer, loadMessage, log)
} }
func start(shutdownSignal <-chan struct{}) { func start(shutdownSignal <-chan struct{}) {
...@@ -58,7 +66,7 @@ func start(shutdownSignal <-chan struct{}) { ...@@ -58,7 +66,7 @@ func start(shutdownSignal <-chan struct{}) {
} }
defer listener.Close() defer listener.Close()
srv = server.ServeTCP(lPeer, listener, log) srv := server.ServeTCP(lPeer, listener, log)
defer srv.Close() defer srv.Close()
mgr.Start(srv) mgr.Start(srv)
...@@ -80,11 +88,3 @@ func loadMessage(messageID message.Id) (bytes []byte, err error) { ...@@ -80,11 +88,3 @@ func loadMessage(messageID message.Id) (bytes []byte, err error) {
} }
return return
} }
// Neighbors returns the list of the neighbors.
func Neighbors() []*gp.Neighbor {
if mgr == nil {
return nil
}
return mgr.AllNeighbors()
}
...@@ -10,7 +10,6 @@ import ( ...@@ -10,7 +10,6 @@ import (
"github.com/iotaledger/hive.go/autopeering/selection" "github.com/iotaledger/hive.go/autopeering/selection"
"github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node" "github.com/iotaledger/hive.go/node"
) )
...@@ -21,19 +20,10 @@ const PluginName = "Gossip" ...@@ -21,19 +20,10 @@ const PluginName = "Gossip"
var Plugin = node.NewPlugin(PluginName, node.Enabled, configure, run) var Plugin = node.NewPlugin(PluginName, node.Enabled, configure, run)
func configure(*node.Plugin) { func configure(*node.Plugin) {
log = logger.NewLogger(PluginName) // assure that the Manager is instantiated
mgr := Manager()
configureGossip() // link to the auto peering
configureEvents()
}
func run(*node.Plugin) {
if err := daemon.BackgroundWorker(PluginName, start, shutdown.PriorityGossip); err != nil {
log.Errorf("Failed to start as daemon: %s", err)
}
}
func configureEvents() {
selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) { selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) {
go func() { go func() {
if err := mgr.DropNeighbor(ev.DroppedID); err != nil { if err := mgr.DropNeighbor(ev.DroppedID); err != nil {
...@@ -62,18 +52,19 @@ func configureEvents() { ...@@ -62,18 +52,19 @@ func configureEvents() {
}() }()
})) }))
gossip.Events.ConnectionFailed.Attach(events.NewClosure(func(p *peer.Peer, err error) { // log neighbor changes
mgr.Events().ConnectionFailed.Attach(events.NewClosure(func(p *peer.Peer, err error) {
log.Infof("Connection to neighbor %s / %s failed: %s", gossip.GetAddress(p), p.ID(), err) log.Infof("Connection to neighbor %s / %s failed: %s", gossip.GetAddress(p), p.ID(), err)
})) }))
gossip.Events.NeighborAdded.Attach(events.NewClosure(func(n *gossip.Neighbor) { mgr.Events().NeighborAdded.Attach(events.NewClosure(func(n *gossip.Neighbor) {
log.Infof("Neighbor added: %s / %s", gossip.GetAddress(n.Peer), n.ID()) log.Infof("Neighbor added: %s / %s", gossip.GetAddress(n.Peer), n.ID())
})) }))
gossip.Events.NeighborRemoved.Attach(events.NewClosure(func(p *peer.Peer) { mgr.Events().NeighborRemoved.Attach(events.NewClosure(func(p *peer.Peer) {
log.Infof("Neighbor removed: %s / %s", gossip.GetAddress(p), p.ID()) log.Infof("Neighbor removed: %s / %s", gossip.GetAddress(p), p.ID())
})) }))
// configure flow of incoming messages // configure flow of incoming messages
gossip.Events.MessageReceived.Attach(events.NewClosure(func(event *gossip.MessageReceivedEvent) { mgr.Events().MessageReceived.Attach(events.NewClosure(func(event *gossip.MessageReceivedEvent) {
messagelayer.MessageParser.Parse(event.Data, event.Peer) messagelayer.MessageParser.Parse(event.Data, event.Peer)
})) }))
...@@ -90,3 +81,9 @@ func configureEvents() { ...@@ -90,3 +81,9 @@ func configureEvents() {
mgr.RequestMessage(messageId[:]) mgr.RequestMessage(messageId[:])
})) }))
} }
func run(*node.Plugin) {
if err := daemon.BackgroundWorker(PluginName, start, shutdown.PriorityGossip); err != nil {
log.Errorf("Failed to start as daemon: %s", err)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment