Skip to content
Snippets Groups Projects
Unverified Commit 02e044d0 authored by Wolfgang Welz's avatar Wolfgang Welz Committed by GitHub
Browse files

Fix: Gossip race condition (#347)


* trigger event while locked

* add dropNeighbor test

* Fix name

Co-Authored-By: default avatarLuca Moser <moser.luca@gmail.com>

* Rename AllNeighbors

* fix linter warnings

Co-authored-by: default avatarLuca Moser <moser.luca@gmail.com>
parent 08b51ce5
No related branches found
No related tags found
No related merge requests found
......@@ -3,8 +3,7 @@ package gossip
import "errors"
var (
ErrNotStarted = errors.New("manager not started")
ErrClosed = errors.New("manager closed")
ErrNotRunning = errors.New("manager not running")
ErrNotANeighbor = errors.New("peer is not a neighbor")
ErrLoopback = errors.New("loopback connection not allowed")
ErrDuplicateNeighbor = errors.New("peer already connected")
......
package gossip
import (
"errors"
"fmt"
"net"
"sync"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/identity"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
"github.com/iotaledger/goshimmer/packages/gossip/server"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/identity"
"go.uber.org/zap"
)
const (
maxPacketSize = 2048
)
var (
ErrNeighborManagerNotRunning = errors.New("neighbor manager is not running")
ErrNeighborAlreadyConnected = errors.New("neighbor is already connected")
)
// LoadMessageFunc defines a function that returns the message for the given id.
type LoadMessageFunc func(messageId message.Id) ([]byte, error)
// The Manager handles the connected neighbors.
type Manager struct {
local *peer.Local
loadMessageFunc LoadMessageFunc
......@@ -37,12 +30,12 @@ type Manager struct {
wg sync.WaitGroup
mu sync.RWMutex
mu sync.Mutex
srv *server.TCP
neighbors map[identity.ID]*Neighbor
running bool
}
// NewManager creates a new Manager.
func NewManager(local *peer.Local, f LoadMessageFunc, log *zap.SugaredLogger) *Manager {
return &Manager{
local: local,
......@@ -50,16 +43,15 @@ func NewManager(local *peer.Local, f LoadMessageFunc, log *zap.SugaredLogger) *M
log: log,
srv: nil,
neighbors: make(map[identity.ID]*Neighbor),
running: false,
}
}
// Start starts the manager for the given TCP server.
func (m *Manager) Start(srv *server.TCP) {
m.mu.Lock()
defer m.mu.Unlock()
m.srv = srv
m.running = true
}
// Close stops the manager and closes all established connections.
......@@ -72,7 +64,7 @@ func (m *Manager) stop() {
m.mu.Lock()
defer m.mu.Unlock()
m.running = false
m.srv = nil
// close all neighbor connections
for _, nbr := range m.neighbors {
......@@ -82,47 +74,44 @@ func (m *Manager) stop() {
// AddOutbound tries to add a neighbor by connecting to that peer.
func (m *Manager) AddOutbound(p *peer.Peer) error {
m.mu.Lock()
defer m.mu.Unlock()
if p.ID() == m.local.ID() {
return ErrLoopback
}
var srv *server.TCP
m.mu.RLock()
if m.srv == nil {
m.mu.RUnlock()
return ErrNotStarted
return ErrNotRunning
}
srv = m.srv
m.mu.RUnlock()
return m.addNeighbor(p, srv.DialPeer)
return m.addNeighbor(p, m.srv.DialPeer)
}
// AddInbound tries to add a neighbor by accepting an incoming connection from that peer.
func (m *Manager) AddInbound(p *peer.Peer) error {
m.mu.Lock()
defer m.mu.Unlock()
if p.ID() == m.local.ID() {
return ErrLoopback
}
var srv *server.TCP
m.mu.RLock()
if m.srv == nil {
m.mu.RUnlock()
return ErrNotStarted
return ErrNotRunning
}
srv = m.srv
m.mu.RUnlock()
return m.addNeighbor(p, srv.AcceptPeer)
return m.addNeighbor(p, m.srv.AcceptPeer)
}
// NeighborRemoved disconnects the neighbor with the given ID.
// DropNeighbor disconnects the neighbor with the given ID.
func (m *Manager) DropNeighbor(id identity.ID) error {
n, err := m.removeNeighbor(id)
if err != nil {
return err
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.neighbors[id]; !ok {
return ErrNotANeighbor
}
err = n.Close()
Events.NeighborRemoved.Trigger(n.Peer)
return err
n := m.neighbors[id]
delete(m.neighbors, id)
return n.Close()
}
// RequestMessage requests the message with the given id from the neighbors.
......@@ -141,9 +130,11 @@ func (m *Manager) SendMessage(msgData []byte, to ...identity.ID) {
m.send(marshal(msg), to...)
}
func (m *Manager) GetAllNeighbors() []*Neighbor {
m.mu.RLock()
defer m.mu.RUnlock()
// AllNeighbors returns all the neighbors that are currently connected.
func (m *Manager) AllNeighbors() []*Neighbor {
m.mu.Lock()
defer m.mu.Unlock()
result := make([]*Neighbor, 0, len(m.neighbors))
for _, n := range m.neighbors {
result = append(result, n)
......@@ -155,14 +146,15 @@ func (m *Manager) getNeighbors(ids ...identity.ID) []*Neighbor {
if len(ids) > 0 {
return m.getNeighborsById(ids)
}
return m.GetAllNeighbors()
return m.AllNeighbors()
}
func (m *Manager) getNeighborsById(ids []identity.ID) []*Neighbor {
result := make([]*Neighbor, 0, len(ids))
m.mu.RLock()
defer m.mu.RUnlock()
m.mu.Lock()
defer m.mu.Unlock()
for _, id := range ids {
if n, ok := m.neighbors[id]; ok {
result = append(result, n)
......@@ -188,22 +180,19 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n
return err
}
m.mu.Lock()
defer m.mu.Unlock()
if !m.running {
_ = conn.Close()
Events.ConnectionFailed.Trigger(peer, ErrNeighborManagerNotRunning)
return ErrClosed
}
if _, ok := m.neighbors[peer.ID()]; ok {
_ = conn.Close()
Events.ConnectionFailed.Trigger(peer, ErrNeighborAlreadyConnected)
Events.ConnectionFailed.Trigger(peer, ErrDuplicateNeighbor)
return ErrDuplicateNeighbor
}
// create and add the neighbor
n := NewNeighbor(peer, conn, m.log)
n.Events.Close.Attach(events.NewClosure(func() { _ = m.DropNeighbor(peer.ID()) }))
n.Events.Close.Attach(events.NewClosure(func() {
// assure that the neighbor is removed and notify
_ = m.DropNeighbor(peer.ID())
Events.NeighborRemoved.Trigger(peer)
}))
n.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) {
if err := m.handlePacket(data, peer); err != nil {
m.log.Debugw("error handling packet", "err", err)
......@@ -217,17 +206,6 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n
return nil
}
func (m *Manager) removeNeighbor(id identity.ID) (*Neighbor, error) {
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.neighbors[id]; !ok {
return nil, ErrNotANeighbor
}
n := m.neighbors[id]
delete(m.neighbors, id)
return n, nil
}
func (m *Manager) handlePacket(data []byte, p *peer.Peer) error {
// ignore empty packages
if len(data) == 0 {
......
......@@ -372,6 +372,46 @@ func TestMessageRequest(t *testing.T) {
e.AssertExpectations(t)
}
func TestDropNeighbor(t *testing.T) {
mgrA, closeA, peerA := newTestManager(t, "A")
defer closeA()
mgrB, closeB, peerB := newTestManager(t, "B")
defer closeB()
connect := func() {
var wg sync.WaitGroup
closure := events.NewClosure(func(_ *Neighbor) { wg.Done() })
wg.Add(2)
Events.NeighborAdded.Attach(closure)
defer Events.NeighborAdded.Detach(closure)
go func() { assert.NoError(t, mgrA.AddInbound(peerB)) }()
go func() { assert.NoError(t, mgrB.AddOutbound(peerA)) }()
wg.Wait() // wait until the events were triggered
}
disc := func() {
var wg sync.WaitGroup
closure := events.NewClosure(func(_ *peer.Peer) { wg.Done() })
wg.Add(2)
Events.NeighborRemoved.Attach(closure)
defer Events.NeighborRemoved.Detach(closure)
go func() { _ = mgrA.DropNeighbor(peerB.ID()) }()
go func() { _ = mgrB.DropNeighbor(peerA.ID()) }()
wg.Wait() // wait until the events were triggered
}
// drop and connect many many times
for i := 0; i < 100; i++ {
connect()
assert.NotEmpty(t, mgrA.AllNeighbors())
assert.NotEmpty(t, mgrB.AllNeighbors())
disc()
assert.Empty(t, mgrA.AllNeighbors())
assert.Empty(t, mgrB.AllNeighbors())
}
}
func newTestDB(t require.TestingT) *peer.DB {
db, err := peer.NewDB(mapdb.NewMapDB())
require.NoError(t, err)
......
......@@ -86,5 +86,5 @@ func GetAllNeighbors() []*gp.Neighbor {
if mgr == nil {
return nil
}
return mgr.GetAllNeighbors()
return mgr.AllNeighbors()
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment