diff --git a/packages/gossip/errors.go b/packages/gossip/errors.go index 554e38618c97aff2cda1b3c97cc3cbd805a6888d..3c3228e695892fc05e8bac127dc42b0f87a713af 100644 --- a/packages/gossip/errors.go +++ b/packages/gossip/errors.go @@ -3,8 +3,7 @@ package gossip import "errors" var ( - ErrNotStarted = errors.New("manager not started") - ErrClosed = errors.New("manager closed") + ErrNotRunning = errors.New("manager not running") ErrNotANeighbor = errors.New("peer is not a neighbor") ErrLoopback = errors.New("loopback connection not allowed") ErrDuplicateNeighbor = errors.New("peer already connected") diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index e35fb747c6f56231c13cba71fa394583f69317a7..04886f2bd4d76703352b780f32abac3cf75eac36 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -1,35 +1,28 @@ package gossip import ( - "errors" "fmt" "net" "sync" "github.com/golang/protobuf/proto" - "go.uber.org/zap" - - "github.com/iotaledger/hive.go/autopeering/peer" - "github.com/iotaledger/hive.go/events" - "github.com/iotaledger/hive.go/identity" - "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" pb "github.com/iotaledger/goshimmer/packages/gossip/proto" "github.com/iotaledger/goshimmer/packages/gossip/server" + "github.com/iotaledger/hive.go/autopeering/peer" + "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/identity" + "go.uber.org/zap" ) const ( maxPacketSize = 2048 ) -var ( - ErrNeighborManagerNotRunning = errors.New("neighbor manager is not running") - ErrNeighborAlreadyConnected = errors.New("neighbor is already connected") -) - // LoadMessageFunc defines a function that returns the message for the given id. type LoadMessageFunc func(messageId message.Id) ([]byte, error) +// The Manager handles the connected neighbors. type Manager struct { local *peer.Local loadMessageFunc LoadMessageFunc @@ -37,12 +30,12 @@ type Manager struct { wg sync.WaitGroup - mu sync.RWMutex + mu sync.Mutex srv *server.TCP neighbors map[identity.ID]*Neighbor - running bool } +// NewManager creates a new Manager. func NewManager(local *peer.Local, f LoadMessageFunc, log *zap.SugaredLogger) *Manager { return &Manager{ local: local, @@ -50,16 +43,15 @@ func NewManager(local *peer.Local, f LoadMessageFunc, log *zap.SugaredLogger) *M log: log, srv: nil, neighbors: make(map[identity.ID]*Neighbor), - running: false, } } +// Start starts the manager for the given TCP server. func (m *Manager) Start(srv *server.TCP) { m.mu.Lock() defer m.mu.Unlock() m.srv = srv - m.running = true } // Close stops the manager and closes all established connections. @@ -72,7 +64,7 @@ func (m *Manager) stop() { m.mu.Lock() defer m.mu.Unlock() - m.running = false + m.srv = nil // close all neighbor connections for _, nbr := range m.neighbors { @@ -82,47 +74,44 @@ func (m *Manager) stop() { // AddOutbound tries to add a neighbor by connecting to that peer. func (m *Manager) AddOutbound(p *peer.Peer) error { + m.mu.Lock() + defer m.mu.Unlock() + if p.ID() == m.local.ID() { return ErrLoopback } - var srv *server.TCP - m.mu.RLock() if m.srv == nil { - m.mu.RUnlock() - return ErrNotStarted + return ErrNotRunning } - srv = m.srv - m.mu.RUnlock() - - return m.addNeighbor(p, srv.DialPeer) + return m.addNeighbor(p, m.srv.DialPeer) } // AddInbound tries to add a neighbor by accepting an incoming connection from that peer. func (m *Manager) AddInbound(p *peer.Peer) error { + m.mu.Lock() + defer m.mu.Unlock() + if p.ID() == m.local.ID() { return ErrLoopback } - var srv *server.TCP - m.mu.RLock() if m.srv == nil { - m.mu.RUnlock() - return ErrNotStarted + return ErrNotRunning } - srv = m.srv - m.mu.RUnlock() - - return m.addNeighbor(p, srv.AcceptPeer) + return m.addNeighbor(p, m.srv.AcceptPeer) } -// NeighborRemoved disconnects the neighbor with the given ID. +// DropNeighbor disconnects the neighbor with the given ID. func (m *Manager) DropNeighbor(id identity.ID) error { - n, err := m.removeNeighbor(id) - if err != nil { - return err + m.mu.Lock() + defer m.mu.Unlock() + + if _, ok := m.neighbors[id]; !ok { + return ErrNotANeighbor } - err = n.Close() - Events.NeighborRemoved.Trigger(n.Peer) - return err + n := m.neighbors[id] + delete(m.neighbors, id) + + return n.Close() } // RequestMessage requests the message with the given id from the neighbors. @@ -141,9 +130,11 @@ func (m *Manager) SendMessage(msgData []byte, to ...identity.ID) { m.send(marshal(msg), to...) } -func (m *Manager) GetAllNeighbors() []*Neighbor { - m.mu.RLock() - defer m.mu.RUnlock() +// AllNeighbors returns all the neighbors that are currently connected. +func (m *Manager) AllNeighbors() []*Neighbor { + m.mu.Lock() + defer m.mu.Unlock() + result := make([]*Neighbor, 0, len(m.neighbors)) for _, n := range m.neighbors { result = append(result, n) @@ -155,14 +146,15 @@ func (m *Manager) getNeighbors(ids ...identity.ID) []*Neighbor { if len(ids) > 0 { return m.getNeighborsById(ids) } - return m.GetAllNeighbors() + return m.AllNeighbors() } func (m *Manager) getNeighborsById(ids []identity.ID) []*Neighbor { result := make([]*Neighbor, 0, len(ids)) - m.mu.RLock() - defer m.mu.RUnlock() + m.mu.Lock() + defer m.mu.Unlock() + for _, id := range ids { if n, ok := m.neighbors[id]; ok { result = append(result, n) @@ -188,22 +180,19 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n return err } - m.mu.Lock() - defer m.mu.Unlock() - if !m.running { - _ = conn.Close() - Events.ConnectionFailed.Trigger(peer, ErrNeighborManagerNotRunning) - return ErrClosed - } if _, ok := m.neighbors[peer.ID()]; ok { _ = conn.Close() - Events.ConnectionFailed.Trigger(peer, ErrNeighborAlreadyConnected) + Events.ConnectionFailed.Trigger(peer, ErrDuplicateNeighbor) return ErrDuplicateNeighbor } // create and add the neighbor n := NewNeighbor(peer, conn, m.log) - n.Events.Close.Attach(events.NewClosure(func() { _ = m.DropNeighbor(peer.ID()) })) + n.Events.Close.Attach(events.NewClosure(func() { + // assure that the neighbor is removed and notify + _ = m.DropNeighbor(peer.ID()) + Events.NeighborRemoved.Trigger(peer) + })) n.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) { if err := m.handlePacket(data, peer); err != nil { m.log.Debugw("error handling packet", "err", err) @@ -217,17 +206,6 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n return nil } -func (m *Manager) removeNeighbor(id identity.ID) (*Neighbor, error) { - m.mu.Lock() - defer m.mu.Unlock() - if _, ok := m.neighbors[id]; !ok { - return nil, ErrNotANeighbor - } - n := m.neighbors[id] - delete(m.neighbors, id) - return n, nil -} - func (m *Manager) handlePacket(data []byte, p *peer.Peer) error { // ignore empty packages if len(data) == 0 { diff --git a/packages/gossip/manager_test.go b/packages/gossip/manager_test.go index 3c9ede2bfc10f7ab889968e77394984908ee3938..4988650699bc850d9f3f82224b5378b06a4f1d3a 100644 --- a/packages/gossip/manager_test.go +++ b/packages/gossip/manager_test.go @@ -372,6 +372,46 @@ func TestMessageRequest(t *testing.T) { e.AssertExpectations(t) } +func TestDropNeighbor(t *testing.T) { + mgrA, closeA, peerA := newTestManager(t, "A") + defer closeA() + mgrB, closeB, peerB := newTestManager(t, "B") + defer closeB() + + connect := func() { + var wg sync.WaitGroup + closure := events.NewClosure(func(_ *Neighbor) { wg.Done() }) + wg.Add(2) + Events.NeighborAdded.Attach(closure) + defer Events.NeighborAdded.Detach(closure) + + go func() { assert.NoError(t, mgrA.AddInbound(peerB)) }() + go func() { assert.NoError(t, mgrB.AddOutbound(peerA)) }() + wg.Wait() // wait until the events were triggered + } + disc := func() { + var wg sync.WaitGroup + closure := events.NewClosure(func(_ *peer.Peer) { wg.Done() }) + wg.Add(2) + Events.NeighborRemoved.Attach(closure) + defer Events.NeighborRemoved.Detach(closure) + + go func() { _ = mgrA.DropNeighbor(peerB.ID()) }() + go func() { _ = mgrB.DropNeighbor(peerA.ID()) }() + wg.Wait() // wait until the events were triggered + } + + // drop and connect many many times + for i := 0; i < 100; i++ { + connect() + assert.NotEmpty(t, mgrA.AllNeighbors()) + assert.NotEmpty(t, mgrB.AllNeighbors()) + disc() + assert.Empty(t, mgrA.AllNeighbors()) + assert.Empty(t, mgrB.AllNeighbors()) + } +} + func newTestDB(t require.TestingT) *peer.DB { db, err := peer.NewDB(mapdb.NewMapDB()) require.NoError(t, err) diff --git a/plugins/gossip/gossip.go b/plugins/gossip/gossip.go index 1cbc1852ffa8c6449847c19d30003c4e8af88ff2..6255d8e33a7e1ffff6f2aa26eb296a8a590d03dc 100644 --- a/plugins/gossip/gossip.go +++ b/plugins/gossip/gossip.go @@ -86,5 +86,5 @@ func GetAllNeighbors() []*gp.Neighbor { if mgr == nil { return nil } - return mgr.GetAllNeighbors() + return mgr.AllNeighbors() }