From 002ee26f163ed1ad52edae395fcecb3dedbf0c72 Mon Sep 17 00:00:00 2001 From: Wolfgang Welz <welzwo@gmail.com> Date: Thu, 16 Jan 2020 12:42:36 +0100 Subject: [PATCH] Feat: Improve peer selection logic to avoid parallel connections (#101) * Log failed connection attempts * Only trigger drop event, when a neighbor was removed * Remove unused dropNeighbors * Use only one loop to process channels in selection * Update out neighbors less often * feat: introduce SaltUpdated event * fix: use global parameters * Update salt log messages * fix: do not close public channels to prevent panics during shutdown * fix: assure correct order of events * feat: add selection event tests * Set update interval to 1s and full updat interval to 1m * Use subtests for the protocol tests * fix: send peering drop message only when event is triggered * Update packages/autopeering/selection/manager.go Co-Authored-By: jkrvivian <jkrvivian@gmail.com> Co-authored-by: jkrvivian <jkrvivian@gmail.com> --- packages/autopeering/discover/common.go | 74 ++- packages/autopeering/discover/manager.go | 40 +- packages/autopeering/discover/manager_test.go | 2 +- packages/autopeering/discover/protocol.go | 8 +- .../autopeering/discover/protocol_test.go | 11 +- packages/autopeering/peer/mapdb.go | 6 +- packages/autopeering/peer/mapdb_test.go | 35 +- packages/autopeering/peer/peerdb.go | 6 +- packages/autopeering/salt/salt.go | 2 +- packages/autopeering/selection/common.go | 79 ++- packages/autopeering/selection/events.go | 19 +- packages/autopeering/selection/manager.go | 537 ++++++++---------- .../autopeering/selection/manager_test.go | 239 +++++--- .../autopeering/selection/neighborhood.go | 124 ++-- .../selection/neighborhood_test.go | 70 +-- packages/autopeering/selection/protocol.go | 31 +- .../autopeering/selection/protocol_test.go | 165 +++--- .../autopeering/selection/selection_test.go | 16 +- packages/autopeering/server/server.go | 6 +- plugins/autopeering/autopeering.go | 7 +- plugins/autopeering/plugin.go | 9 +- plugins/gossip/plugin.go | 3 + 22 files changed, 816 insertions(+), 673 deletions(-) diff --git a/packages/autopeering/discover/common.go b/packages/autopeering/discover/common.go index 7249d1be..826ad2cd 100644 --- a/packages/autopeering/discover/common.go +++ b/packages/autopeering/discover/common.go @@ -4,47 +4,35 @@ import ( "time" "github.com/iotaledger/goshimmer/packages/autopeering/peer" - "go.uber.org/zap" + "github.com/iotaledger/hive.go/logger" ) +// Default values for the global parameters const ( - // PingExpiration is the time until a peer verification expires. - PingExpiration = 12 * time.Hour - // MaxPeersInResponse is the maximum number of peers returned in DiscoveryResponse. - MaxPeersInResponse = 6 - // MaxServices is the maximum number of services a peer can support. - MaxServices = 5 - - // VersionNum specifies the expected version number for this Protocol. - VersionNum = 0 + DefaultReverifyInterval = 10 * time.Second + DefaultReverifyTries = 2 + DefaultQueryInterval = 60 * time.Second + DefaultMaxManaged = 1000 + DefaultMaxReplacements = 10 +) + +var ( + reverifyInterval = DefaultReverifyInterval // time interval after which the next peer is reverified + reverifyTries = DefaultReverifyTries // number of times a peer is pinged before it is removed + queryInterval = DefaultQueryInterval // time interval after which peers are queried for new peers + maxManaged = DefaultMaxManaged // maximum number of peers that can be managed + maxReplacements = DefaultMaxReplacements // maximum number of peers kept in the replacement list ) // Config holds discovery related settings. type Config struct { // These settings are required and configure the listener: - Log *zap.SugaredLogger + Log *logger.Logger // These settings are optional: MasterPeers []*peer.Peer // list of master peers used for bootstrapping - Param *Parameters // parameters } -// default parameter values -const ( - // DefaultReverifyInterval is the default time interval after which a new peer is reverified. - DefaultReverifyInterval = 10 * time.Second - // DefaultReverifyTries is the default number of times a peer is pinged before it is removed. - DefaultReverifyTries = 2 - - // DefaultQueryInterval is the default time interval after which known peers are queried for new peers. - DefaultQueryInterval = 60 * time.Second - - // DefaultMaxManaged is the default maximum number of peers that can be managed. - DefaultMaxManaged = 1000 - // DefaultMaxReplacements is the default maximum number of peers kept in the replacement list. - DefaultMaxReplacements = 10 -) - // Parameters holds the parameters that can be configured. type Parameters struct { ReverifyInterval time.Duration // time interval after which the next peer is reverified @@ -53,3 +41,33 @@ type Parameters struct { MaxManaged int // maximum number of peers that can be managed MaxReplacements int // maximum number of peers kept in the replacement list } + +// SetParameters sets the global parameters for this package. +// This function cannot be used concurrently. +func SetParameter(param Parameters) { + if param.ReverifyInterval > 0 { + reverifyInterval = param.ReverifyInterval + } else { + reverifyInterval = DefaultReverifyInterval + } + if param.ReverifyTries > 0 { + reverifyTries = param.ReverifyTries + } else { + reverifyTries = DefaultReverifyTries + } + if param.QueryInterval > 0 { + queryInterval = param.QueryInterval + } else { + queryInterval = DefaultQueryInterval + } + if param.MaxManaged > 0 { + maxManaged = param.MaxManaged + } else { + maxManaged = DefaultMaxManaged + } + if param.MaxReplacements > 0 { + maxReplacements = param.MaxReplacements + } else { + maxReplacements = DefaultMaxReplacements + } +} diff --git a/packages/autopeering/discover/manager.go b/packages/autopeering/discover/manager.go index d7697aed..055dd9bb 100644 --- a/packages/autopeering/discover/manager.go +++ b/packages/autopeering/discover/manager.go @@ -7,15 +7,19 @@ import ( "github.com/iotaledger/goshimmer/packages/autopeering/peer" "github.com/iotaledger/goshimmer/packages/autopeering/server" - "go.uber.org/zap" + "github.com/iotaledger/hive.go/logger" ) -var ( - reverifyInterval = DefaultReverifyInterval // time interval after which the next peer is reverified - reverifyTries = DefaultReverifyTries // number of times a peer is pinged before it is removed - queryInterval = DefaultQueryInterval // time interval after which peers are queried for new peers - maxManaged = DefaultMaxManaged // maximum number of peers that can be managed - maxReplacements = DefaultMaxReplacements // maximum number of peers kept in the replacement list +const ( + // PingExpiration is the time until a peer verification expires. + PingExpiration = 12 * time.Hour + // MaxPeersInResponse is the maximum number of peers returned in DiscoveryResponse. + MaxPeersInResponse = 6 + // MaxServices is the maximum number of services a peer can support. + MaxServices = 5 + + // VersionNum specifies the expected version number for this Protocol. + VersionNum = 0 ) type network interface { @@ -31,31 +35,13 @@ type manager struct { replacements []*mpeer net network - log *zap.SugaredLogger + log *logger.Logger wg sync.WaitGroup closing chan struct{} } -func newManager(net network, masters []*peer.Peer, log *zap.SugaredLogger, param *Parameters) *manager { - if param != nil { - if param.ReverifyInterval > 0 { - reverifyInterval = param.ReverifyInterval - } - if param.ReverifyTries > 0 { - reverifyTries = param.ReverifyTries - } - if param.QueryInterval > 0 { - queryInterval = param.QueryInterval - } - if param.MaxManaged > 0 { - maxManaged = param.MaxManaged - } - if param.MaxReplacements > 0 { - maxReplacements = param.MaxReplacements - } - } - +func newManager(net network, masters []*peer.Peer, log *logger.Logger) *manager { m := &manager{ active: make([]*mpeer, 0, maxManaged), replacements: make([]*mpeer, 0, maxReplacements), diff --git a/packages/autopeering/discover/manager_test.go b/packages/autopeering/discover/manager_test.go index ddd25565..fba880e4 100644 --- a/packages/autopeering/discover/manager_test.go +++ b/packages/autopeering/discover/manager_test.go @@ -49,7 +49,7 @@ func newDummyPeer(name string) *peer.Peer { func newTestManager() (*manager, *NetworkMock, func()) { networkMock := newNetworkMock() - mgr := newManager(networkMock, nil, log, nil) + mgr := newManager(networkMock, nil, log) teardown := func() { mgr.close() } diff --git a/packages/autopeering/discover/protocol.go b/packages/autopeering/discover/protocol.go index 7202c66d..242016f7 100644 --- a/packages/autopeering/discover/protocol.go +++ b/packages/autopeering/discover/protocol.go @@ -11,8 +11,8 @@ import ( peerpb "github.com/iotaledger/goshimmer/packages/autopeering/peer/proto" "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" "github.com/iotaledger/goshimmer/packages/autopeering/server" + "github.com/iotaledger/hive.go/logger" "github.com/pkg/errors" - "go.uber.org/zap" ) // The Protocol handles the peer discovery. @@ -20,8 +20,8 @@ import ( type Protocol struct { server.Protocol - loc *peer.Local // local peer that runs the protocol - log *zap.SugaredLogger // logging + loc *peer.Local // local peer that runs the protocol + log *logger.Logger // logging mgr *manager // the manager handles the actual peer discovery and re-verification closeOnce sync.Once @@ -34,7 +34,7 @@ func New(local *peer.Local, cfg Config) *Protocol { loc: local, log: cfg.Log, } - p.mgr = newManager(p, cfg.MasterPeers, cfg.Log.Named("mgr"), cfg.Param) + p.mgr = newManager(p, cfg.MasterPeers, cfg.Log.Named("mgr")) return p } diff --git a/packages/autopeering/discover/protocol_test.go b/packages/autopeering/discover/protocol_test.go index 7f6432f7..223c2c3e 100644 --- a/packages/autopeering/discover/protocol_test.go +++ b/packages/autopeering/discover/protocol_test.go @@ -20,10 +20,13 @@ var log = logger.NewExampleLogger("discover") func init() { // decrease parameters to simplify and speed up tests - reverifyInterval = 500 * time.Millisecond - queryInterval = 1000 * time.Millisecond - maxManaged = 10 - maxReplacements = 2 + SetParameter(Parameters{ + ReverifyInterval: 500 * time.Millisecond, + ReverifyTries: 1, + QueryInterval: 1000 * time.Millisecond, + MaxManaged: 10, + MaxReplacements: 2, + }) } // newTest creates a new discovery server and also returns the teardown. diff --git a/packages/autopeering/peer/mapdb.go b/packages/autopeering/peer/mapdb.go index 5baa569d..03719614 100644 --- a/packages/autopeering/peer/mapdb.go +++ b/packages/autopeering/peer/mapdb.go @@ -5,7 +5,7 @@ import ( "time" "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" - "go.uber.org/zap" + "github.com/iotaledger/hive.go/logger" ) // mapDB is a simple implementation of DB using a map. @@ -15,7 +15,7 @@ type mapDB struct { key PrivateKey services *service.Record - log *zap.SugaredLogger + log *logger.Logger wg sync.WaitGroup closeOnce sync.Once @@ -32,7 +32,7 @@ type peerPropEntry struct { } // NewMemoryDB creates a new DB that uses a GO map. -func NewMemoryDB(log *zap.SugaredLogger) DB { +func NewMemoryDB(log *logger.Logger) DB { db := &mapDB{ m: make(map[string]peerEntry), services: service.New(), diff --git a/packages/autopeering/peer/mapdb_test.go b/packages/autopeering/peer/mapdb_test.go index 516d7351..5c7580b7 100644 --- a/packages/autopeering/peer/mapdb_test.go +++ b/packages/autopeering/peer/mapdb_test.go @@ -2,50 +2,41 @@ package peer import ( "crypto/ed25519" - "log" "testing" "time" + "github.com/iotaledger/hive.go/logger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" ) -var logger *zap.SugaredLogger - -func init() { - l, err := zap.NewDevelopment() - if err != nil { - log.Fatalf("cannot initialize logger: %v", err) - } - logger = l.Sugar() -} +var log = logger.NewExampleLogger("peer") func TestMapDBPing(t *testing.T) { p := newTestPeer() - db := NewMemoryDB(logger) + db := NewMemoryDB(log) - time := time.Now() - err := db.UpdateLastPing(p.ID(), p.Address(), time) + now := time.Now() + err := db.UpdateLastPing(p.ID(), p.Address(), now) require.NoError(t, err) - assert.Equal(t, time.Unix(), db.LastPing(p.ID(), p.Address()).Unix()) + assert.Equal(t, now.Unix(), db.LastPing(p.ID(), p.Address()).Unix()) } func TestMapDBPong(t *testing.T) { p := newTestPeer() - db := NewMemoryDB(logger) + db := NewMemoryDB(log) - time := time.Now() - err := db.UpdateLastPong(p.ID(), p.Address(), time) + now := time.Now() + err := db.UpdateLastPong(p.ID(), p.Address(), now) require.NoError(t, err) - assert.Equal(t, time.Unix(), db.LastPong(p.ID(), p.Address()).Unix()) + assert.Equal(t, now.Unix(), db.LastPong(p.ID(), p.Address()).Unix()) } func TestMapDBPeer(t *testing.T) { p := newTestPeer() - db := NewMemoryDB(logger) + db := NewMemoryDB(log) err := db.UpdatePeer(p) require.NoError(t, err) @@ -55,7 +46,7 @@ func TestMapDBPeer(t *testing.T) { func TestMapDBSeedPeers(t *testing.T) { p := newTestPeer() - db := NewMemoryDB(logger) + db := NewMemoryDB(log) require.NoError(t, db.UpdatePeer(p)) require.NoError(t, db.UpdateLastPong(p.ID(), p.Address(), time.Now())) @@ -65,7 +56,7 @@ func TestMapDBSeedPeers(t *testing.T) { } func TestMapDBLocal(t *testing.T) { - db := NewMemoryDB(logger) + db := NewMemoryDB(log) l1, err := NewLocal(testNetwork, testAddress, db) require.NoError(t, err) diff --git a/packages/autopeering/peer/peerdb.go b/packages/autopeering/peer/peerdb.go index 3947f3af..b5cfcdfb 100644 --- a/packages/autopeering/peer/peerdb.go +++ b/packages/autopeering/peer/peerdb.go @@ -9,7 +9,7 @@ import ( "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" "github.com/iotaledger/goshimmer/packages/database" - "go.uber.org/zap" + "github.com/iotaledger/hive.go/logger" ) const ( @@ -57,7 +57,7 @@ type DB interface { type persistentDB struct { db database.Database - log *zap.SugaredLogger + log *logger.Logger closeOnce sync.Once } @@ -77,7 +77,7 @@ const ( ) // NewPersistentDB creates a new persistent DB. -func NewPersistentDB(log *zap.SugaredLogger) DB { +func NewPersistentDB(log *logger.Logger) DB { db, err := database.Get("peer") if err != nil { panic(err) diff --git a/packages/autopeering/salt/salt.go b/packages/autopeering/salt/salt.go index 917f5a91..07afb2ea 100644 --- a/packages/autopeering/salt/salt.go +++ b/packages/autopeering/salt/salt.go @@ -37,7 +37,7 @@ func NewSalt(lifetime time.Duration) (salt *Salt, err error) { func (s *Salt) GetBytes() []byte { s.mutex.RLock() defer s.mutex.RUnlock() - return s.bytes + return append([]byte{}, s.bytes...) } func (s *Salt) GetExpiration() time.Time { diff --git a/packages/autopeering/selection/common.go b/packages/autopeering/selection/common.go index e31c8416..679de027 100644 --- a/packages/autopeering/selection/common.go +++ b/packages/autopeering/selection/common.go @@ -4,38 +4,71 @@ import ( "time" "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" - "go.uber.org/zap" + "github.com/iotaledger/hive.go/logger" +) + +// Default values for the global parameters +const ( + DefaultInboundNeighborSize = 4 + DefaultOutboundNeighborSize = 4 + DefaultSaltLifetime = 30 * time.Minute + DefaultOutboundUpdateInterval = 1 * time.Second + DefaultFullOutboundUpdateInterval = 1 * time.Minute +) + +var ( + inboundNeighborSize = DefaultInboundNeighborSize // number of inbound neighbors + outboundNeighborSize = DefaultOutboundNeighborSize // number of outbound neighbors + saltLifetime = DefaultSaltLifetime // lifetime of the private and public local salt + outboundUpdateInterval = DefaultOutboundUpdateInterval // time after which out neighbors are updated + fullOutboundUpdateInterval = DefaultFullOutboundUpdateInterval // time after which full out neighbors are updated ) // Config holds settings for the peer selection. type Config struct { // Logger - Log *zap.SugaredLogger + Log *logger.Logger // These settings are optional: - Param *Parameters // parameters + DropOnUpdate bool // set true to drop all neighbors when the salt is updated + RequiredServices []service.Key // services required in order to select/be selected during autopeering } -// default parameter values -const ( - // DefaultInboundNeighborSize is the default number of inbound neighbors. - DefaultInboundNeighborSize = 4 - // DefaultOutboundNeighborSize is the default number of outbound neighbors. - DefaultOutboundNeighborSize = 4 - - // DefaultSaltLifetime is the default lifetime of the private and public local salt. - DefaultSaltLifetime = 30 * time.Minute - - // DefaultUpdateOutboundInterval is the default time interval after which the outbound neighbors are checked. - DefaultUpdateOutboundInterval = 200 * time.Millisecond -) - // Parameters holds the parameters that can be configured. type Parameters struct { - InboundNeighborSize int // number of inbound neighbors - OutboundNeighborSize int // number of outbound neighbors - SaltLifetime time.Duration // lifetime of the private and public local salt - UpdateOutboundInterval time.Duration // time interval after which the outbound neighbors are checked - DropNeighborsOnUpdate bool // set true to drop all neighbors when the distance is updated - RequiredService []service.Key // services required in order to select/be selected during autopeering + InboundNeighborSize int // number of inbound neighbors + OutboundNeighborSize int // number of outbound neighbors + SaltLifetime time.Duration // lifetime of the private and public local salt + OutboundUpdateInterval time.Duration // time interval after which the outbound neighbors are checked + FullOutboundUpdateInterval time.Duration // time after which the full outbound neighbors are updated +} + +// SetParameters sets the global parameters for this package. +// This function cannot be used concurrently. +func SetParameters(param Parameters) { + if param.InboundNeighborSize > 0 { + inboundNeighborSize = param.InboundNeighborSize + } else { + inboundNeighborSize = DefaultInboundNeighborSize + } + if param.OutboundNeighborSize > 0 { + outboundNeighborSize = param.OutboundNeighborSize + } else { + outboundNeighborSize = DefaultOutboundNeighborSize + } + if param.SaltLifetime > 0 { + saltLifetime = param.SaltLifetime + } else { + saltLifetime = DefaultSaltLifetime + } + if param.OutboundUpdateInterval > 0 { + outboundUpdateInterval = param.OutboundUpdateInterval + } else { + outboundUpdateInterval = DefaultOutboundUpdateInterval + } + if param.FullOutboundUpdateInterval > 0 { + fullOutboundUpdateInterval = param.FullOutboundUpdateInterval + } else { + fullOutboundUpdateInterval = DefaultFullOutboundUpdateInterval + } } diff --git a/packages/autopeering/selection/events.go b/packages/autopeering/selection/events.go index 9a312b2f..bc927e91 100644 --- a/packages/autopeering/selection/events.go +++ b/packages/autopeering/selection/events.go @@ -2,25 +2,34 @@ package selection import ( "github.com/iotaledger/goshimmer/packages/autopeering/peer" + "github.com/iotaledger/goshimmer/packages/autopeering/salt" "github.com/iotaledger/hive.go/events" ) // Events contains all the events that are triggered during the neighbor selection. var Events = struct { + // A SaltUpdated event is triggered, when the private and public salt were updated. + SaltUpdated *events.Event // An OutgoingPeering event is triggered, when a valid response of PeeringRequest has been received. OutgoingPeering *events.Event // An IncomingPeering event is triggered, when a valid PeerRequest has been received. IncomingPeering *events.Event - - // A Dropped event is triggered, when a neigbhor is dropped or when a drop message is received. + // A Dropped event is triggered, when a neighbor is dropped or when a drop message is received. Dropped *events.Event }{ + SaltUpdated: events.NewEvent(saltUpdatedCaller), OutgoingPeering: events.NewEvent(peeringCaller), IncomingPeering: events.NewEvent(peeringCaller), Dropped: events.NewEvent(droppedCaller), } -// PeeringEvent bundles the information sent in a peering event. +// SaltUpdatedEvent bundles the information sent in the SaltUpdated event. +type SaltUpdatedEvent struct { + Self peer.ID // ID of the peer triggering the event. + Public, Private *salt.Salt // the updated salt +} + +// PeeringEvent bundles the information sent in the OutgoingPeering and IncomingPeering event. type PeeringEvent struct { Self peer.ID // ID of the peer triggering the event. Peer *peer.Peer // peering partner @@ -33,6 +42,10 @@ type DroppedEvent struct { DroppedID peer.ID // ID of the peer that gets dropped. } +func saltUpdatedCaller(handler interface{}, params ...interface{}) { + handler.(func(*SaltUpdatedEvent))(params[0].(*SaltUpdatedEvent)) +} + func peeringCaller(handler interface{}, params ...interface{}) { handler.(func(*PeeringEvent))(params[0].(*PeeringEvent)) } diff --git a/packages/autopeering/selection/manager.go b/packages/autopeering/selection/manager.go index c30ef6ae..0ac30f10 100644 --- a/packages/autopeering/selection/manager.go +++ b/packages/autopeering/selection/manager.go @@ -1,14 +1,13 @@ package selection import ( - "math/rand" "sync" "time" "github.com/iotaledger/goshimmer/packages/autopeering/peer" "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" "github.com/iotaledger/goshimmer/packages/autopeering/salt" - "go.uber.org/zap" + "github.com/iotaledger/hive.go/logger" ) const ( @@ -16,20 +15,7 @@ const ( reject = false // buffer size of the channels handling inbound requests and drops. - queueSize = 100 -) - -var ( - // number of neighbors - inboundNeighborSize = DefaultInboundNeighborSize - outboundNeighborSize = DefaultOutboundNeighborSize - - // lifetime of the private and public local salt - saltLifetime = DefaultSaltLifetime - // time interval after which the outbound neighbors are checked - updateOutboundInterval = DefaultUpdateOutboundInterval - - dropNeighborsOnUpdate bool // whether all neighbors are dropped on distance update + queueSize = 10 ) // A network represents the communication layer for the manager. @@ -37,7 +23,7 @@ type network interface { local() *peer.Local RequestPeering(*peer.Peer, *salt.Salt) (bool, error) - DropPeer(*peer.Peer) + SendPeeringDrop(*peer.Peer) } type peeringRequest struct { @@ -46,367 +32,342 @@ type peeringRequest struct { } type manager struct { - net network - peersFunc func() []*peer.Peer - - log *zap.SugaredLogger - dropNeighbors bool + net network + getPeersToConnect func() []*peer.Peer + log *logger.Logger + dropOnUpdate bool // set true to drop all neighbors when the salt is updated + requiredServices []service.Key // services required in order to select/be selected during autopeering inbound *Neighborhood outbound *Neighborhood - inboundRequestChan chan peeringRequest - inboundReplyChan chan bool - inboundDropChan chan peer.ID - outboundDropChan chan peer.ID - rejectionFilter *Filter - wg sync.WaitGroup - inboundClosing chan struct{} - outboundClosing chan struct{} + dropChan chan peer.ID + requestChan chan peeringRequest + replyChan chan bool - requiredService []service.Key + wg sync.WaitGroup + closing chan struct{} } -func newManager(net network, peersFunc func() []*peer.Peer, log *zap.SugaredLogger, param *Parameters) *manager { - var requiredService []service.Key - - if param != nil { - if param.InboundNeighborSize > 0 { - inboundNeighborSize = param.InboundNeighborSize - } - if param.OutboundNeighborSize > 0 { - outboundNeighborSize = param.OutboundNeighborSize - } - if param.SaltLifetime > 0 { - saltLifetime = param.SaltLifetime - } - if param.UpdateOutboundInterval > 0 { - updateOutboundInterval = param.UpdateOutboundInterval - } - requiredService = param.RequiredService - dropNeighborsOnUpdate = param.DropNeighborsOnUpdate - } - +func newManager(net network, peersFunc func() []*peer.Peer, log *logger.Logger, config Config) *manager { return &manager{ - net: net, - peersFunc: peersFunc, - log: log, - dropNeighbors: dropNeighborsOnUpdate, - inboundClosing: make(chan struct{}), - outboundClosing: make(chan struct{}), - rejectionFilter: NewFilter(), - inboundRequestChan: make(chan peeringRequest, queueSize), - inboundReplyChan: make(chan bool), - inboundDropChan: make(chan peer.ID, queueSize), - outboundDropChan: make(chan peer.ID, queueSize), - inbound: &Neighborhood{ - Neighbors: []peer.PeerDistance{}, - Size: inboundNeighborSize}, - outbound: &Neighborhood{ - Neighbors: []peer.PeerDistance{}, - Size: outboundNeighborSize}, - requiredService: requiredService, + net: net, + getPeersToConnect: peersFunc, + log: log, + dropOnUpdate: config.DropOnUpdate, + requiredServices: config.RequiredServices, + inbound: NewNeighborhood(inboundNeighborSize), + outbound: NewNeighborhood(outboundNeighborSize), + rejectionFilter: NewFilter(), + dropChan: make(chan peer.ID, queueSize), + requestChan: make(chan peeringRequest, queueSize), + replyChan: make(chan bool, 1), + closing: make(chan struct{}), } } func (m *manager) start() { - // create valid salts - if m.net.local().GetPublicSalt() == nil || m.net.local().GetPrivateSalt() == nil { + if m.getPublicSalt() == nil || m.getPrivateSalt() == nil { m.updateSalt() } - - m.wg.Add(2) - go m.loopOutbound() - go m.loopInbound() + m.wg.Add(1) + go m.loop() } func (m *manager) close() { - close(m.inboundClosing) - close(m.outboundClosing) + close(m.closing) m.wg.Wait() } -func (m *manager) self() peer.ID { +func (m *manager) getID() peer.ID { return m.net.local().ID() } -func (m *manager) loopOutbound() { - defer m.wg.Done() - - var ( - updateOutboundDone chan struct{} - updateOutbound = time.NewTimer(0) // setting this to 0 will cause a trigger right away - backoff = 10 - ) - defer updateOutbound.Stop() +func (m *manager) getPublicSalt() *salt.Salt { + return m.net.local().GetPublicSalt() +} -Loop: - for { - select { - case <-updateOutbound.C: - // if there is no updateOutbound, this means doUpdateOutbound is not running - if updateOutboundDone == nil { - // check salt and update if necessary (this will drop the whole neighborhood) - if m.net.local().GetPublicSalt().Expired() { - m.updateSalt() - } +func (m *manager) getPrivateSalt() *salt.Salt { + return m.net.local().GetPrivateSalt() +} - //remove potential duplicates - dup := m.getDuplicates() - for _, peerToDrop := range dup { - toDrop := m.inbound.GetPeerFromID(peerToDrop) - time.Sleep(time.Duration(rand.Intn(backoff)) * time.Millisecond) - m.outbound.RemovePeer(peerToDrop) - m.inbound.RemovePeer(peerToDrop) - if toDrop != nil { - m.dropPeer(toDrop) - } - } +func (m *manager) getNeighbors() []*peer.Peer { + var neighbors []*peer.Peer + neighbors = append(neighbors, m.inbound.GetPeers()...) + neighbors = append(neighbors, m.outbound.GetPeers()...) - updateOutboundDone = make(chan struct{}) - go m.updateOutbound(updateOutboundDone) - } + return neighbors +} - case <-updateOutboundDone: - updateOutboundDone = nil - updateOutbound.Reset(updateOutboundInterval) // updateOutbound again after the given interval +func (m *manager) getInNeighbors() []*peer.Peer { + return m.inbound.GetPeers() +} - case peerToDrop := <-m.outboundDropChan: - if containsPeer(m.outbound.GetPeers(), peerToDrop) { - m.outbound.RemovePeer(peerToDrop) - m.rejectionFilter.AddPeer(peerToDrop) - m.log.Debug("Outbound Dropped BY ", peerToDrop, " (", len(m.outbound.GetPeers()), ",", len(m.inbound.GetPeers()), ")") - } +func (m *manager) getOutNeighbors() []*peer.Peer { + return m.outbound.GetPeers() +} - // on close, exit the loop - case <-m.outboundClosing: - break Loop - } +func (m *manager) requestPeering(p *peer.Peer, s *salt.Salt) bool { + var status bool + select { + case m.requestChan <- peeringRequest{p, s}: + status = <-m.replyChan + default: + // a full queue should count as a failed request + status = false } + return status +} - // wait for the updateOutbound to finish - if updateOutboundDone != nil { - <-updateOutboundDone - } +func (m *manager) removeNeighbor(id peer.ID) { + m.dropChan <- id } -func (m *manager) loopInbound() { +func (m *manager) loop() { defer m.wg.Done() + var updateOutResultChan chan peer.PeerDistance + updateTimer := time.NewTimer(0) // setting this to 0 will cause a trigger right away + defer updateTimer.Stop() + Loop: for { select { - case req := <-m.inboundRequestChan: - m.updateInbound(req.peer, req.salt) - case peerToDrop := <-m.inboundDropChan: - if containsPeer(m.inbound.GetPeers(), peerToDrop) { - m.inbound.RemovePeer(peerToDrop) - m.log.Debug("Inbound Dropped BY ", peerToDrop, " (", len(m.outbound.GetPeers()), ",", len(m.inbound.GetPeers()), ")") + // update the outbound neighbors + case <-updateTimer.C: + updateOutResultChan = make(chan peer.PeerDistance) + // check salt and update if necessary + if m.getPublicSalt().Expired() { + m.updateSalt() } + // check for new peers to connect to in a separate go routine + go m.updateOutbound(updateOutResultChan) + + // handle the result of updateOutbound + case req := <-updateOutResultChan: + if req.Remote != nil { + // if the peer is already in inbound, do not add it and remove it from inbound + if p := m.inbound.RemovePeer(req.Remote.ID()); p != nil { + m.triggerPeeringEvent(true, req.Remote, false) + m.dropPeering(p) + } else { + m.addNeighbor(m.outbound, req) + m.triggerPeeringEvent(true, req.Remote, true) + } + } + // call updateOutbound again after the given interval + updateOutResultChan = nil + updateTimer.Reset(m.getUpdateTimeout()) + + // handle a drop request + case id := <-m.dropChan: + droppedPeer := m.inbound.RemovePeer(id) + if p := m.outbound.RemovePeer(id); p != nil { + droppedPeer = p + m.rejectionFilter.AddPeer(id) + // if not yet updating, trigger an immediate update + if updateOutResultChan == nil && updateTimer.Stop() { + updateTimer.Reset(0) + } + } + if droppedPeer != nil { + m.dropPeering(droppedPeer) + } + + // handle an inbound request + case req := <-m.requestChan: + status := m.handleInRequest(req) + // trigger in the main loop to guarantee order of events + m.triggerPeeringEvent(false, req.peer, status) // on close, exit the loop - case <-m.inboundClosing: + case <-m.closing: break Loop } } + + // wait for the updateOutbound to finish + if updateOutResultChan != nil { + <-updateOutResultChan + } +} + +func (m *manager) getUpdateTimeout() time.Duration { + result := outboundUpdateInterval + if m.outbound.IsFull() { + result = fullOutboundUpdateInterval + } + saltExpiration := time.Until(m.getPublicSalt().GetExpiration()) + if saltExpiration < result { + result = saltExpiration + } + return result } // updateOutbound updates outbound neighbors. -func (m *manager) updateOutbound(done chan<- struct{}) { - defer func() { - done <- struct{}{} - }() // always signal, when the function returns +func (m *manager) updateOutbound(resultChan chan<- peer.PeerDistance) { + var result peer.PeerDistance + defer func() { resultChan <- result }() // assure that a result is always sent to the channel // sort verified peers by distance - distList := peer.SortBySalt(m.self().Bytes(), m.net.local().GetPublicSalt().GetBytes(), m.peersFunc()) - - filter := NewFilter() - filter.AddPeer(m.self()) //set filter for ourself - filter.AddPeers(m.inbound.GetPeers()) // set filter for inbound neighbors - filter.AddPeers(m.outbound.GetPeers()) // set filter for outbound neighbors - - filteredList := filter.Apply(distList) // filter out current neighbors - filteredList = m.rejectionFilter.Apply(filteredList) // filter out previous rejection + distList := peer.SortBySalt(m.getID().Bytes(), m.getPublicSalt().GetBytes(), m.getPeersToConnect()) + + // filter out current neighbors + filter := m.getConnectedFilter() + filteredList := filter.Apply(distList) + // filter out previous rejections + filteredList = m.rejectionFilter.Apply(filteredList) + if len(filteredList) == 0 { + return + } // select new candidate candidate := m.outbound.Select(filteredList) + if candidate.Remote == nil { + return + } - if candidate.Remote != nil { - // reject if required services are missing - for _, reqService := range m.requiredService { - if candidate.Remote.Services().Get(reqService) == nil { - m.rejectionFilter.AddPeer(candidate.Remote.ID()) - return - } - } - - furthest, _ := m.outbound.getFurthest() - - // send peering request - mySalt := m.net.local().GetPublicSalt() - status, err := m.net.RequestPeering(candidate.Remote, mySalt) - if err != nil { - m.rejectionFilter.AddPeer(candidate.Remote.ID()) // TODO: add retries - return - } - - // add candidate to the outbound neighborhood - if status { - //m.acceptedFilter.AddPeer(candidate.Remote.ID()) - if furthest.Remote != nil { - m.outbound.RemovePeer(furthest.Remote.ID()) - m.dropPeer(furthest.Remote) - m.log.Debug("Outbound furthest removed ", furthest.Remote.ID()) - } - m.outbound.Add(candidate) - m.log.Debug("Peering request TO ", candidate.Remote.ID(), " status ACCEPTED (", len(m.outbound.GetPeers()), ",", len(m.inbound.GetPeers()), ")") - } else { - m.log.Debug("Peering request TO ", candidate.Remote.ID(), " status REJECTED (", len(m.outbound.GetPeers()), ",", len(m.inbound.GetPeers()), ")") - m.rejectionFilter.AddPeer(candidate.Remote.ID()) - //m.log.Debug("Rejection Filter ", candidate.Remote.ID()) - } - - // signal the result of the outgoing peering request - Events.OutgoingPeering.Trigger(&PeeringEvent{Self: m.self(), Peer: candidate.Remote, Status: status}) + // send peering request + status, err := m.net.RequestPeering(candidate.Remote, m.getPublicSalt()) + if err != nil { + m.rejectionFilter.AddPeer(candidate.Remote.ID()) + m.log.Debugw("error requesting peering", + "id", candidate.Remote.ID(), + "addr", candidate.Remote.Address(), "err", err, + ) + return } + if !status { + m.rejectionFilter.AddPeer(candidate.Remote.ID()) + m.triggerPeeringEvent(true, candidate.Remote, false) + return + } + + result = candidate } -func (m *manager) updateInbound(requester *peer.Peer, salt *salt.Salt) { - // TODO: check request legitimacy - //m.log.Debug("Evaluating peering request FROM ", requester.ID()) - reqDistance := peer.NewPeerDistance(m.self().Bytes(), m.net.local().GetPrivateSalt().GetBytes(), requester) +func (m *manager) handleInRequest(req peeringRequest) (resp bool) { + resp = reject + defer func() { m.replyChan <- resp }() // assure that a response is always issued - candidateList := []peer.PeerDistance{reqDistance} + if !m.isValidNeighbor(req.peer) { + return + } - filter := NewFilter() - filter.AddPeers(m.outbound.GetPeers()) // set filter for outbound neighbors - filteredList := filter.Apply(candidateList) // filter out current neighbors + reqDistance := peer.NewPeerDistance(m.getID().Bytes(), m.getPrivateSalt().GetBytes(), req.peer) + filter := m.getConnectedFilter() + filteredList := filter.Apply([]peer.PeerDistance{reqDistance}) + if len(filteredList) == 0 { + return + } - // make decision toAccept := m.inbound.Select(filteredList) - for _, reqService := range m.requiredService { - if requester.Services().Get(reqService) == nil { - toAccept.Remote = nil // reject if required services are missing - break - } - } - // reject request if toAccept.Remote == nil { - m.log.Debug("Peering request FROM ", requester.ID(), " status REJECTED (", len(m.outbound.GetPeers()), ",", len(m.inbound.GetPeers()), ")") - m.inboundReplyChan <- reject return } - // accept request - m.inboundReplyChan <- accept - furthest, _ := m.inbound.getFurthest() - // drop furthest neighbor - if furthest.Remote != nil { - m.inbound.RemovePeer(furthest.Remote.ID()) - m.dropPeer(furthest.Remote) - m.log.Debug("Inbound furthest removed ", furthest.Remote.ID()) - } - // update inbound neighborhood - m.inbound.Add(toAccept) - m.log.Debug("Peering request FROM ", toAccept.Remote.ID(), " status ACCEPTED (", len(m.outbound.GetPeers()), ",", len(m.inbound.GetPeers()), ")") + + m.addNeighbor(m.inbound, toAccept) + resp = accept + return } -func (m *manager) updateSalt() (*salt.Salt, *salt.Salt) { - pubSalt, _ := salt.NewSalt(saltLifetime) - m.net.local().SetPublicSalt(pubSalt) +func (m *manager) addNeighbor(nh *Neighborhood, toAdd peer.PeerDistance) { + // drop furthest neighbor if necessary + if furthest, _ := nh.getFurthest(); furthest.Remote != nil { + if p := nh.RemovePeer(furthest.Remote.ID()); p != nil { + m.dropPeering(p) + } + } + nh.Add(toAdd) +} - privSalt, _ := salt.NewSalt(saltLifetime) - m.net.local().SetPrivateSalt(privSalt) +func (m *manager) updateSalt() { + public, _ := salt.NewSalt(saltLifetime) + m.net.local().SetPublicSalt(public) + private, _ := salt.NewSalt(saltLifetime) + m.net.local().SetPrivateSalt(private) + // clean the rejection filter m.rejectionFilter.Clean() - if !dropNeighborsOnUpdate { // update distance without dropping neighbors - m.outbound.UpdateDistance(m.self().Bytes(), m.net.local().GetPublicSalt().GetBytes()) - m.inbound.UpdateDistance(m.self().Bytes(), m.net.local().GetPrivateSalt().GetBytes()) + if !m.dropOnUpdate { // update distance without dropping neighbors + m.outbound.UpdateDistance(m.getID().Bytes(), m.getPublicSalt().GetBytes()) + m.inbound.UpdateDistance(m.getID().Bytes(), m.getPrivateSalt().GetBytes()) } else { // drop all the neighbors m.dropNeighborhood(m.inbound) m.dropNeighborhood(m.outbound) } - return pubSalt, privSalt -} - -func (m *manager) dropNeighbor(peerToDrop peer.ID) { - m.inboundDropChan <- peerToDrop - m.outboundDropChan <- peerToDrop - - // signal the dropped peer - Events.Dropped.Trigger(&DroppedEvent{Self: m.self(), DroppedID: peerToDrop}) + m.log.Debugw("salt updated", + "public", saltLifetime, + "private", saltLifetime, + ) + Events.SaltUpdated.Trigger(&SaltUpdatedEvent{Self: m.getID(), Public: public, Private: private}) } -// containsPeer returns true if a peer with the given ID is in the list. -func containsPeer(list []*peer.Peer, id peer.ID) bool { - for _, p := range list { - if p.ID() == id { - return true - } +func (m *manager) dropNeighborhood(nh *Neighborhood) { + for _, p := range nh.GetPeers() { + nh.RemovePeer(p.ID()) + m.dropPeering(p) } - return false } -func (m *manager) acceptRequest(p *peer.Peer, s *salt.Salt) bool { - m.inboundRequestChan <- peeringRequest{p, s} - status := <-m.inboundReplyChan +// dropPeering sends the peering drop over the network and triggers the corresponding event. +func (m *manager) dropPeering(p *peer.Peer) { + m.net.SendPeeringDrop(p) - // signal the received request - Events.IncomingPeering.Trigger(&PeeringEvent{Self: m.self(), Peer: p, Status: status}) - - return status -} - -func (m *manager) getNeighbors() []*peer.Peer { - var neighbors []*peer.Peer - - neighbors = append(neighbors, m.inbound.GetPeers()...) - neighbors = append(neighbors, m.outbound.GetPeers()...) - - return neighbors -} - -func (m *manager) getIncomingNeighbors() []*peer.Peer { - var neighbors []*peer.Peer - - neighbors = append(neighbors, m.inbound.GetPeers()...) - - return neighbors + m.log.Debugw("peering dropped", + "id", p.ID(), + "#out", m.outbound, + "#in", m.inbound, + ) + Events.Dropped.Trigger(&DroppedEvent{Self: m.getID(), DroppedID: p.ID()}) } -func (m *manager) getOutgoingNeighbors() []*peer.Peer { - var neighbors []*peer.Peer - - neighbors = append(neighbors, m.outbound.GetPeers()...) - - return neighbors +func (m *manager) getConnectedFilter() *Filter { + filter := NewFilter() + filter.AddPeer(m.getID()) //set filter for oneself + filter.AddPeers(m.inbound.GetPeers()) // set filter for inbound neighbors + filter.AddPeers(m.outbound.GetPeers()) // set filter for outbound neighbors + return filter } -func (m *manager) getDuplicates() []peer.ID { - var d []peer.ID - - for _, p := range m.inbound.GetPeers() { - if containsPeer(m.outbound.GetPeers(), p.ID()) { - d = append(d, p.ID()) +// isValidNeighbor returns whether the given peer is a valid neighbor candidate. +func (m *manager) isValidNeighbor(p *peer.Peer) bool { + // do not connect to oneself + if m.getID() == p.ID() { + return false + } + // reject if required services are missing + for _, reqService := range m.requiredServices { + if p.Services().Get(reqService) == nil { + return false } } - return d + return true } -func (m *manager) dropNeighborhood(nh *Neighborhood) { - for _, p := range nh.GetPeers() { - nh.RemovePeer(p.ID()) - m.dropPeer(p) +func (m *manager) triggerPeeringEvent(isOut bool, p *peer.Peer, status bool) { + if isOut { + m.log.Debugw("peering requested", + "direction", "out", + "status", status, + "to", p.ID(), + "#out", m.outbound, + "#in", m.inbound, + ) + Events.OutgoingPeering.Trigger(&PeeringEvent{Self: m.getID(), Peer: p, Status: status}) + } else { + m.log.Debugw("peering requested", + "direction", "in", + "status", status, + "from", p.ID(), + "#out", m.outbound, + "#in", m.inbound, + ) + Events.IncomingPeering.Trigger(&PeeringEvent{Self: m.getID(), Peer: p, Status: status}) } } - -func (m *manager) dropPeer(p *peer.Peer) { - // send the drop request over the network - m.net.DropPeer(p) - // signal the drop - Events.Dropped.Trigger(&DroppedEvent{Self: m.self(), DroppedID: p.ID()}) -} diff --git a/packages/autopeering/selection/manager_test.go b/packages/autopeering/selection/manager_test.go index daffe4b0..15e472a2 100644 --- a/packages/autopeering/selection/manager_test.go +++ b/packages/autopeering/selection/manager_test.go @@ -2,47 +2,105 @@ package selection import ( "fmt" - "math/rand" + "sync" "testing" "time" "github.com/iotaledger/goshimmer/packages/autopeering/peer" "github.com/iotaledger/goshimmer/packages/autopeering/salt" + "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/logger" "github.com/stretchr/testify/assert" ) -var ( - allPeers []*peer.Peer +const ( + testSaltLifetime = time.Hour // disable salt updates + testUpdateInterval = 2 * graceTime // very short update interval to speed up tests ) -type testPeer struct { - local *peer.Local - peer *peer.Peer - db peer.DB - log *logger.Logger - rand *rand.Rand // random number generator +func TestMgrNoDuplicates(t *testing.T) { + const ( + nNeighbors = 4 + nNodes = 2*nNeighbors + 1 + ) + SetParameters(Parameters{ + OutboundNeighborSize: nNeighbors, + InboundNeighborSize: nNeighbors, + SaltLifetime: testSaltLifetime, + OutboundUpdateInterval: testUpdateInterval, + }) + + mgrMap := make(map[peer.ID]*manager) + runTestNetwork(nNodes, mgrMap) + + for _, mgr := range mgrMap { + assert.NotEmpty(t, mgr.getOutNeighbors()) + assert.NotEmpty(t, mgr.getInNeighbors()) + assert.Empty(t, getDuplicates(mgr.getNeighbors())) + } +} + +func TestEvents(t *testing.T) { + // we want many drops/connects + const ( + nNeighbors = 2 + nNodes = 10 + ) + SetParameters(Parameters{ + OutboundNeighborSize: nNeighbors, + InboundNeighborSize: nNeighbors, + SaltLifetime: 3 * testUpdateInterval, + OutboundUpdateInterval: testUpdateInterval, + }) + + e, teardown := newEventMock(t) + defer teardown() + mgrMap := make(map[peer.ID]*manager) + runTestNetwork(nNodes, mgrMap) + + // the events should lead to exactly the same neighbors + for _, mgr := range mgrMap { + nc := e.m[mgr.getID()] + assert.ElementsMatchf(t, mgr.getOutNeighbors(), getValues(nc.out), + "out neighbors of %s do not match", mgr.getID()) + assert.ElementsMatch(t, mgr.getInNeighbors(), getValues(nc.in), + "in neighbors of %s do not match", mgr.getID()) + } } -func newPeer(name string) testPeer { - log := log.Named(name) - db := peer.NewMemoryDB(log.Named("db")) - local, _ := peer.NewLocal("", name, db) - s, _ := salt.NewSalt(100 * time.Second) - local.SetPrivateSalt(s) - s, _ = salt.NewSalt(100 * time.Second) - local.SetPublicSalt(s) - p := &local.Peer - return testPeer{local, p, db, log, rand.New(rand.NewSource(time.Now().UnixNano()))} +func getValues(m map[peer.ID]*peer.Peer) []*peer.Peer { + result := make([]*peer.Peer, 0, len(m)) + for _, p := range m { + result = append(result, p) + } + return result } -func removeDuplicatePeers(peers []*peer.Peer) []*peer.Peer { +func runTestNetwork(n int, mgrMap map[peer.ID]*manager) { + for i := 0; i < n; i++ { + _ = newTestManager(fmt.Sprintf("%d", i), mgrMap) + } + for _, mgr := range mgrMap { + mgr.start() + } + + // give the managers time to potentially connect all other peers + time.Sleep((time.Duration(n) - 1) * (outboundUpdateInterval + graceTime)) + + // close all the managers + for _, mgr := range mgrMap { + mgr.close() + } +} + +func getDuplicates(peers []*peer.Peer) []*peer.Peer { seen := make(map[peer.ID]bool, len(peers)) result := make([]*peer.Peer, 0, len(peers)) for _, p := range peers { if !seen[p.ID()] { seen[p.ID()] = true + } else { result = append(result, p) } } @@ -50,74 +108,115 @@ func removeDuplicatePeers(peers []*peer.Peer) []*peer.Peer { return result } -type testNet struct { - loc *peer.Local - self *peer.Peer - mgr map[peer.ID]*manager - rand *rand.Rand +type neighbors struct { + out, in map[peer.ID]*peer.Peer } -func (n testNet) local() *peer.Local { - return n.loc +type eventMock struct { + t *testing.T + lock sync.Mutex + m map[peer.ID]neighbors } -func (n testNet) DropPeer(p *peer.Peer) { - n.mgr[p.ID()].dropNeighbor(n.local().ID()) -} +func newEventMock(t *testing.T) (*eventMock, func()) { + e := &eventMock{ + t: t, + m: make(map[peer.ID]neighbors), + } -func (n testNet) RequestPeering(p *peer.Peer, s *salt.Salt) (bool, error) { - return n.mgr[p.ID()].acceptRequest(n.self, s), nil -} + outgoingPeeringC := events.NewClosure(e.outgoingPeering) + incomingPeeringC := events.NewClosure(e.incomingPeering) + droppedC := events.NewClosure(e.dropped) -func (n testNet) GetKnownPeers() []*peer.Peer { - list := make([]*peer.Peer, len(allPeers)-1) - i := 0 - for _, p := range allPeers { - if p.ID() == n.self.ID() { - continue - } + Events.OutgoingPeering.Attach(outgoingPeeringC) + Events.IncomingPeering.Attach(incomingPeeringC) + Events.Dropped.Attach(droppedC) - list[i] = p - i++ + teardown := func() { + Events.OutgoingPeering.Detach(outgoingPeeringC) + Events.IncomingPeering.Detach(incomingPeeringC) + Events.Dropped.Detach(droppedC) } - return list + return e, teardown } -func TestSimManager(t *testing.T) { - N := 9 // number of peers to generate +func (e *eventMock) outgoingPeering(ev *PeeringEvent) { + if !ev.Status { + return + } + e.lock.Lock() + defer e.lock.Unlock() + s, ok := e.m[ev.Self] + if !ok { + s = neighbors{out: make(map[peer.ID]*peer.Peer), in: make(map[peer.ID]*peer.Peer)} + e.m[ev.Self] = s + } + assert.NotContains(e.t, s.out, ev.Peer) + s.out[ev.Peer.ID()] = ev.Peer +} - allPeers = make([]*peer.Peer, N) +func (e *eventMock) incomingPeering(ev *PeeringEvent) { + if !ev.Status { + return + } + e.lock.Lock() + defer e.lock.Unlock() + s, ok := e.m[ev.Self] + if !ok { + s = neighbors{out: make(map[peer.ID]*peer.Peer), in: make(map[peer.ID]*peer.Peer)} + e.m[ev.Self] = s + } + assert.NotContains(e.t, s.in, ev.Peer) + s.in[ev.Peer.ID()] = ev.Peer +} - mgrMap := make(map[peer.ID]*manager) - for i := range allPeers { - p := newPeer(fmt.Sprintf("%d", i)) - allPeers[i] = p.peer - - net := testNet{ - mgr: mgrMap, - loc: p.local, - self: p.peer, - rand: p.rand, - } - mgrMap[p.local.ID()] = newManager(net, net.GetKnownPeers, p.log, &Parameters{SaltLifetime: 100 * time.Second}) +func (e *eventMock) dropped(ev *DroppedEvent) { + e.lock.Lock() + defer e.lock.Unlock() + if assert.Contains(e.t, e.m, ev.Self) { + s := e.m[ev.Self] + delete(s.out, ev.DroppedID) + delete(s.in, ev.DroppedID) } +} - // start all the managers - for _, mgr := range mgrMap { - mgr.start() +type networkMock struct { + loc *peer.Local + mgr map[peer.ID]*manager +} + +func newNetworkMock(name string, mgrMap map[peer.ID]*manager, log *logger.Logger) *networkMock { + local, _ := peer.NewLocal("mock", name, peer.NewMemoryDB(log)) + return &networkMock{ + loc: local, + mgr: mgrMap, } +} - time.Sleep(6 * time.Second) +func (n *networkMock) local() *peer.Local { + return n.loc +} - for i, p := range allPeers { - neighbors := mgrMap[p.ID()].getNeighbors() +func (n *networkMock) SendPeeringDrop(p *peer.Peer) { + n.mgr[p.ID()].removeNeighbor(n.local().ID()) +} - assert.NotEmpty(t, neighbors, "Peer %d has no neighbors", i) - assert.Equal(t, removeDuplicatePeers(neighbors), neighbors, "Peer %d has non unique neighbors", i) - } +func (n *networkMock) RequestPeering(p *peer.Peer, s *salt.Salt) (bool, error) { + return n.mgr[p.ID()].requestPeering(&n.local().Peer, s), nil +} - // close all the managers - for _, mgr := range mgrMap { - mgr.close() +func (n *networkMock) GetKnownPeers() []*peer.Peer { + peers := make([]*peer.Peer, 0, len(n.mgr)) + for _, m := range n.mgr { + peers = append(peers, &m.net.local().Peer) } + return peers +} + +func newTestManager(name string, mgrMap map[peer.ID]*manager) *manager { + l := log.Named(name) + net := newNetworkMock(name, mgrMap, l) + m := newManager(net, net.GetKnownPeers, l, Config{}) + mgrMap[m.getID()] = m + return m } diff --git a/packages/autopeering/selection/neighborhood.go b/packages/autopeering/selection/neighborhood.go index e3e138b2..638874b0 100644 --- a/packages/autopeering/selection/neighborhood.go +++ b/packages/autopeering/selection/neighborhood.go @@ -1,6 +1,7 @@ package selection import ( + "fmt" "sync" "github.com/iotaledger/goshimmer/packages/autopeering/distance" @@ -8,26 +9,37 @@ import ( ) type Neighborhood struct { - Neighbors []peer.PeerDistance - Size int - mutex sync.RWMutex + neighbors []peer.PeerDistance + size int + mu sync.RWMutex +} + +func NewNeighborhood(size int) *Neighborhood { + return &Neighborhood{ + neighbors: []peer.PeerDistance{}, + size: size, + } +} + +func (nh *Neighborhood) String() string { + return fmt.Sprintf("%d/%d", nh.GetNumPeers(), nh.size) } func (nh *Neighborhood) getFurthest() (peer.PeerDistance, int) { - nh.mutex.RLock() - defer nh.mutex.RUnlock() - if len(nh.Neighbors) < nh.Size { + nh.mu.RLock() + defer nh.mu.RUnlock() + if len(nh.neighbors) < nh.size { return peer.PeerDistance{ Remote: nil, Distance: distance.Max, - }, len(nh.Neighbors) + }, len(nh.neighbors) } index := 0 - furthest := nh.Neighbors[index] - for i, neighbor := range nh.Neighbors { - if neighbor.Distance > furthest.Distance { - furthest = neighbor + furthest := nh.neighbors[index] + for i, n := range nh.neighbors { + if n.Distance > furthest.Distance { + furthest = n index = i } } @@ -46,63 +58,75 @@ func (nh *Neighborhood) Select(candidates []peer.PeerDistance) peer.PeerDistance return peer.PeerDistance{} } -func (nh *Neighborhood) Add(toAdd peer.PeerDistance) { - nh.mutex.Lock() - defer nh.mutex.Unlock() - if len(nh.Neighbors) < nh.Size { - nh.Neighbors = append(nh.Neighbors, toAdd) +// Add tries to add a new peer with distance to the neighborhood. +// It returns true, if the peer was added, or false if the neighborhood was full. +func (nh *Neighborhood) Add(toAdd peer.PeerDistance) bool { + nh.mu.Lock() + defer nh.mu.Unlock() + if len(nh.neighbors) >= nh.size { + return false } + nh.neighbors = append(nh.neighbors, toAdd) + return true } -func (nh *Neighborhood) RemovePeer(toRemove peer.ID) { - index := nh.getPeerIndex(toRemove) - nh.mutex.Lock() - defer nh.mutex.Unlock() - if index < 0 || len(nh.Neighbors) == 0 || len(nh.Neighbors) < index+1 { - return +// RemovePeer removes the peer with the given ID from the neighborhood. +// It returns the peer that was removed or nil of no such peer exists. +func (nh *Neighborhood) RemovePeer(id peer.ID) *peer.Peer { + nh.mu.Lock() + defer nh.mu.Unlock() + + index := nh.getPeerIndex(id) + if index < 0 { + return nil + } + n := nh.neighbors[index] + + // remove index from slice + if index < len(nh.neighbors)-1 { + copy(nh.neighbors[index:], nh.neighbors[index+1:]) } - nh.Neighbors[index] = peer.PeerDistance{} - copy(nh.Neighbors[index:], nh.Neighbors[index+1:]) - nh.Neighbors = nh.Neighbors[:len(nh.Neighbors)-1] + nh.neighbors[len(nh.neighbors)-1] = peer.PeerDistance{} + nh.neighbors = nh.neighbors[:len(nh.neighbors)-1] + + return n.Remote } -func (nh *Neighborhood) getPeerIndex(target peer.ID) int { - nh.mutex.RLock() - defer nh.mutex.RUnlock() - for i, peer := range nh.Neighbors { - if peer.Remote.ID() == target { +func (nh *Neighborhood) getPeerIndex(id peer.ID) int { + for i, p := range nh.neighbors { + if p.Remote.ID() == id { return i } } return -1 - } func (nh *Neighborhood) UpdateDistance(anchor, salt []byte) { - nh.mutex.Lock() - defer nh.mutex.Unlock() - for i, peer := range nh.Neighbors { - nh.Neighbors[i].Distance = distance.BySalt(anchor, peer.Remote.ID().Bytes(), salt) + nh.mu.Lock() + defer nh.mu.Unlock() + for i, n := range nh.neighbors { + nh.neighbors[i].Distance = distance.BySalt(anchor, n.Remote.ID().Bytes(), salt) } } +func (nh *Neighborhood) IsFull() bool { + nh.mu.RLock() + defer nh.mu.RUnlock() + return len(nh.neighbors) >= nh.size +} + func (nh *Neighborhood) GetPeers() []*peer.Peer { - nh.mutex.RLock() - defer nh.mutex.RUnlock() - list := make([]*peer.Peer, len(nh.Neighbors)) - for i, peer := range nh.Neighbors { - list[i] = peer.Remote + nh.mu.RLock() + defer nh.mu.RUnlock() + result := make([]*peer.Peer, len(nh.neighbors)) + for i, n := range nh.neighbors { + result[i] = n.Remote } - return list + return result } -func (nh *Neighborhood) GetPeerFromID(id peer.ID) *peer.Peer { - nh.mutex.RLock() - defer nh.mutex.RUnlock() - for _, peer := range nh.Neighbors { - if peer.Remote.ID() == id { - return peer.Remote - } - } - return nil +func (nh *Neighborhood) GetNumPeers() int { + nh.mu.RLock() + defer nh.mu.RUnlock() + return len(nh.neighbors) } diff --git a/packages/autopeering/selection/neighborhood_test.go b/packages/autopeering/selection/neighborhood_test.go index 40fcf597..19dc5211 100644 --- a/packages/autopeering/selection/neighborhood_test.go +++ b/packages/autopeering/selection/neighborhood_test.go @@ -26,8 +26,8 @@ func TestGetFurthest(t *testing.T) { tests := []testCase{ { input: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0]}, + size: 4}, expected: peer.PeerDistance{ Remote: nil, Distance: distance.Max}, @@ -35,15 +35,15 @@ func TestGetFurthest(t *testing.T) { }, { input: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0], d[1], d[2], d[3]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0], d[1], d[2], d[3]}, + size: 4}, expected: d[3], index: 3, }, { input: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0], d[1], d[4], d[2]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0], d[1], d[4], d[2]}, + size: 4}, expected: d[4], index: 2, }, @@ -72,22 +72,22 @@ func TestGetPeerIndex(t *testing.T) { tests := []testCase{ { input: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0]}, + size: 4}, target: d[0].Remote, index: 0, }, { input: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0], d[1], d[2], d[3]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0], d[1], d[2], d[3]}, + size: 4}, target: d[3].Remote, index: 3, }, { input: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0], d[1], d[4], d[2]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0], d[1], d[4], d[2]}, + size: 4}, target: d[3].Remote, index: -1, }, @@ -115,22 +115,22 @@ func TestRemove(t *testing.T) { tests := []testCase{ { input: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0]}, + size: 4}, toRemove: d[0].Remote, expected: []peer.PeerDistance{}, }, { input: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0], d[1], d[3]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0], d[1], d[3]}, + size: 4}, toRemove: d[1].Remote, expected: []peer.PeerDistance{d[0], d[3]}, }, { input: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0], d[1], d[4], d[2]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0], d[1], d[4], d[2]}, + size: 4}, toRemove: d[2].Remote, expected: []peer.PeerDistance{d[0], d[1], d[4]}, }, @@ -138,7 +138,7 @@ func TestRemove(t *testing.T) { for _, test := range tests { test.input.RemovePeer(test.toRemove.ID()) - assert.Equal(t, test.expected, test.input.Neighbors, "Remove") + assert.Equal(t, test.expected, test.input.neighbors, "Remove") } } @@ -158,22 +158,22 @@ func TestAdd(t *testing.T) { tests := []testCase{ { input: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0]}, + size: 4}, toAdd: d[2], expected: []peer.PeerDistance{d[0], d[2]}, }, { input: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0], d[1], d[3]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0], d[1], d[3]}, + size: 4}, toAdd: d[2], expected: []peer.PeerDistance{d[0], d[1], d[3], d[2]}, }, { input: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0], d[1], d[4], d[2]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0], d[1], d[4], d[2]}, + size: 4}, toAdd: d[3], expected: []peer.PeerDistance{d[0], d[1], d[4], d[2]}, }, @@ -181,7 +181,7 @@ func TestAdd(t *testing.T) { for _, test := range tests { test.input.Add(test.toAdd) - assert.Equal(t, test.expected, test.input.Neighbors, "Add") + assert.Equal(t, test.expected, test.input.neighbors, "Add") } } @@ -201,12 +201,12 @@ func TestUpdateDistance(t *testing.T) { } neighborhood := Neighborhood{ - Neighbors: d[1:], - Size: 4} + neighbors: d[1:], + size: 4} neighborhood.UpdateDistance(d[0].Remote.ID().Bytes(), s.GetBytes()) - assert.Equal(t, d2, neighborhood.Neighbors, "UpdateDistance") + assert.Equal(t, d2, neighborhood.neighbors, "UpdateDistance") } func TestGetPeers(t *testing.T) { @@ -227,20 +227,20 @@ func TestGetPeers(t *testing.T) { tests := []testCase{ { input: &Neighborhood{ - Neighbors: []peer.PeerDistance{}, - Size: 4}, + neighbors: []peer.PeerDistance{}, + size: 4}, expected: []*peer.Peer{}, }, { input: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0]}, + size: 4}, expected: []*peer.Peer{peers[0]}, }, { input: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0], d[1], d[3], d[2]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0], d[1], d[3], d[2]}, + size: 4}, expected: []*peer.Peer{peers[0], peers[1], peers[3], peers[2]}, }, } diff --git a/packages/autopeering/selection/protocol.go b/packages/autopeering/selection/protocol.go index 3b4b36c1..dd92c1f2 100644 --- a/packages/autopeering/selection/protocol.go +++ b/packages/autopeering/selection/protocol.go @@ -10,8 +10,8 @@ import ( "github.com/iotaledger/goshimmer/packages/autopeering/salt" pb "github.com/iotaledger/goshimmer/packages/autopeering/selection/proto" "github.com/iotaledger/goshimmer/packages/autopeering/server" + "github.com/iotaledger/hive.go/logger" "github.com/pkg/errors" - "go.uber.org/zap" ) // DiscoverProtocol specifies the methods from the peer discovery that are required. @@ -28,9 +28,9 @@ type DiscoverProtocol interface { type Protocol struct { server.Protocol - disc DiscoverProtocol // reference to the peer discovery to query verified peers - loc *peer.Local // local peer that runs the protocol - log *zap.SugaredLogger // logging + disc DiscoverProtocol // reference to the peer discovery to query verified peers + loc *peer.Local // local peer that runs the protocol + log *logger.Logger // logging mgr *manager // the manager handles the actual neighbor selection closeOnce sync.Once @@ -44,7 +44,7 @@ func New(local *peer.Local, disc DiscoverProtocol, cfg Config) *Protocol { disc: disc, log: cfg.Log, } - p.mgr = newManager(p, disc.GetVerifiedPeers, cfg.Log.Named("mgr"), cfg.Param) + p.mgr = newManager(p, disc.GetVerifiedPeers, cfg.Log.Named("mgr"), cfg) return p } @@ -78,12 +78,19 @@ func (p *Protocol) GetNeighbors() []*peer.Peer { // GetIncomingNeighbors returns the current incoming neighbors. func (p *Protocol) GetIncomingNeighbors() []*peer.Peer { - return p.mgr.getIncomingNeighbors() + return p.mgr.getInNeighbors() } // GetOutgoingNeighbors returns the current outgoing neighbors. func (p *Protocol) GetOutgoingNeighbors() []*peer.Peer { - return p.mgr.getOutgoingNeighbors() + return p.mgr.getOutNeighbors() +} + +// RemoveNeighbor removes the peer with the given id from the incoming and outgoing neighbors. +// If such a peer was actually contained in anyone of the neighbor sets, the corresponding event is triggered +// and the corresponding peering drop is sent. Otherwise the call is ignored. +func (p *Protocol) RemoveNeighbor(id peer.ID) { + p.mgr.removeNeighbor(id) } // HandleMessage responds to incoming neighbor selection messages. @@ -160,10 +167,8 @@ func (p *Protocol) RequestPeering(to *peer.Peer, salt *salt.Salt) (bool, error) return status, err } -// DropPeer sends a PeeringDrop to the given peer. -func (p *Protocol) DropPeer(to *peer.Peer) { - p.mgr.dropNeighbor(to.ID()) - +// SendPeeringDrop sends a peering drop to the given peer and does not wait for any responses. +func (p *Protocol) SendPeeringDrop(to *peer.Peer) { toAddr := to.Address() drop := newPeeringDrop(toAddr) @@ -282,7 +287,7 @@ func (p *Protocol) handlePeeringRequest(s *server.Server, fromAddr string, fromI return } - res := newPeeringResponse(rawData, p.mgr.acceptRequest(from, salt)) + res := newPeeringResponse(rawData, p.mgr.requestPeering(from, salt)) p.log.Debugw("send message", "type", res.Name(), @@ -334,5 +339,5 @@ func (p *Protocol) validatePeeringDrop(s *server.Server, fromAddr string, m *pb. } func (p *Protocol) handlePeeringDrop(fromID peer.ID) { - p.mgr.dropNeighbor(fromID) + p.mgr.removeNeighbor(fromID) } diff --git a/packages/autopeering/selection/protocol_test.go b/packages/autopeering/selection/protocol_test.go index 8caf9174..d43820f8 100644 --- a/packages/autopeering/selection/protocol_test.go +++ b/packages/autopeering/selection/protocol_test.go @@ -38,10 +38,7 @@ func newTest(t require.TestingT, trans transport.Transport) (*server.Server, *Pr // add the new peer to the global map for dummyDiscovery peerMap[local.ID()] = &local.Peer - cfg := Config{ - Log: l, - } - prot := New(local, dummyDiscovery{}, cfg) + prot := New(local, dummyDiscovery{}, Config{Log: l}) srv := server.Listen(local, trans, l.Named("srv"), prot) prot.Start(srv) @@ -57,75 +54,102 @@ func getPeer(s *server.Server) *peer.Peer { return &s.Local().Peer } -func TestProtPeeringRequest(t *testing.T) { - p2p := transport.P2P() - defer p2p.Close() - - srvA, protA, closeA := newTest(t, p2p.A) - defer closeA() - srvB, protB, closeB := newTest(t, p2p.B) - defer closeB() - - peerA := getPeer(srvA) - saltA, _ := salt.NewSalt(100 * time.Second) - peerB := getPeer(srvB) - saltB, _ := salt.NewSalt(100 * time.Second) - - // request peering to peer B - t.Run("A->B", func(t *testing.T) { - if services, err := protA.RequestPeering(peerB, saltA); assert.NoError(t, err) { - assert.NotEmpty(t, services) - } +func TestProtocol(t *testing.T) { + // assure that the default test parameters are used for all protocol tests + SetParameters(Parameters{ + SaltLifetime: testSaltLifetime, + OutboundUpdateInterval: testUpdateInterval, }) - // request peering to peer A - t.Run("B->A", func(t *testing.T) { - if services, err := protB.RequestPeering(peerA, saltB); assert.NoError(t, err) { - assert.NotEmpty(t, services) - } + + t.Run("PeeringRequest", func(t *testing.T) { + p2p := transport.P2P() + defer p2p.Close() + + srvA, protA, closeA := newTest(t, p2p.A) + defer closeA() + srvB, protB, closeB := newTest(t, p2p.B) + defer closeB() + + peerA := getPeer(srvA) + saltA, _ := salt.NewSalt(100 * time.Second) + peerB := getPeer(srvB) + saltB, _ := salt.NewSalt(100 * time.Second) + + // request peering to peer B + t.Run("A->B", func(t *testing.T) { + if services, err := protA.RequestPeering(peerB, saltA); assert.NoError(t, err) { + assert.NotEmpty(t, services) + } + }) + // request peering to peer A + t.Run("B->A", func(t *testing.T) { + if services, err := protB.RequestPeering(peerA, saltB); assert.NoError(t, err) { + assert.NotEmpty(t, services) + } + }) }) -} -func TestProtExpiredSalt(t *testing.T) { - p2p := transport.P2P() - defer p2p.Close() + t.Run("ExpiredSalt", func(t *testing.T) { + p2p := transport.P2P() + defer p2p.Close() - _, protA, closeA := newTest(t, p2p.A) - defer closeA() - srvB, _, closeB := newTest(t, p2p.B) - defer closeB() + _, protA, closeA := newTest(t, p2p.A) + defer closeA() + srvB, _, closeB := newTest(t, p2p.B) + defer closeB() - saltA, _ := salt.NewSalt(-1 * time.Second) - peerB := getPeer(srvB) + saltA, _ := salt.NewSalt(-1 * time.Second) + peerB := getPeer(srvB) - // request peering to peer B - _, err := protA.RequestPeering(peerB, saltA) - assert.EqualError(t, err, server.ErrTimeout.Error()) -} + // request peering to peer B + _, err := protA.RequestPeering(peerB, saltA) + assert.EqualError(t, err, server.ErrTimeout.Error()) + }) -func TestProtDropPeer(t *testing.T) { - p2p := transport.P2P() - defer p2p.Close() + t.Run("PeeringDrop", func(t *testing.T) { + p2p := transport.P2P() + defer p2p.Close() - srvA, protA, closeA := newTest(t, p2p.A) - defer closeA() - srvB, protB, closeB := newTest(t, p2p.B) - defer closeB() + srvA, protA, closeA := newTest(t, p2p.A) + defer closeA() + srvB, protB, closeB := newTest(t, p2p.B) + defer closeB() - peerA := getPeer(srvA) - saltA, _ := salt.NewSalt(100 * time.Second) - peerB := getPeer(srvB) + peerA := getPeer(srvA) + saltA, _ := salt.NewSalt(100 * time.Second) + peerB := getPeer(srvB) - // request peering to peer B - services, err := protA.RequestPeering(peerB, saltA) - require.NoError(t, err) - assert.NotEmpty(t, services) + // request peering to peer B + services, err := protA.RequestPeering(peerB, saltA) + require.NoError(t, err) + assert.NotEmpty(t, services) + + require.Contains(t, protB.GetNeighbors(), peerA) + + // drop peer A + protA.SendPeeringDrop(peerB) + time.Sleep(graceTime) + require.NotContains(t, protB.GetNeighbors(), peerA) + }) + + t.Run("FullTest", func(t *testing.T) { + p2p := transport.P2P() + defer p2p.Close() + + srvA, protA, closeA := newFullTest(t, p2p.A) + defer closeA() - require.Contains(t, protB.GetNeighbors(), peerA) + time.Sleep(graceTime) // wait for the master to initialize - // drop peer A - protA.DropPeer(peerB) - time.Sleep(graceTime) - require.NotContains(t, protB.GetNeighbors(), peerA) + srvB, protB, closeB := newFullTest(t, p2p.B, getPeer(srvA)) + defer closeB() + + time.Sleep(outboundUpdateInterval + graceTime) // wait for the next outbound cycle + + // the two peers should be peered + assert.ElementsMatch(t, []*peer.Peer{getPeer(srvB)}, protA.GetNeighbors()) + assert.ElementsMatch(t, []*peer.Peer{getPeer(srvA)}, protB.GetNeighbors()) + }) } // newTest creates a new server handling discover as well as neighborhood and also returns the teardown. @@ -156,22 +180,3 @@ func newFullTest(t require.TestingT, trans transport.Transport, masterPeers ...* } return srv, selection, teardown } - -func TestProtFull(t *testing.T) { - p2p := transport.P2P() - defer p2p.Close() - - srvA, protA, closeA := newFullTest(t, p2p.A) - defer closeA() - - time.Sleep(graceTime) // wait for the master to initialize - - srvB, protB, closeB := newFullTest(t, p2p.B, getPeer(srvA)) - defer closeB() - - time.Sleep(updateOutboundInterval + graceTime) // wait for the next outbound cycle - - // the two peers should be peered - assert.ElementsMatch(t, []*peer.Peer{getPeer(srvB)}, protA.GetNeighbors()) - assert.ElementsMatch(t, []*peer.Peer{getPeer(srvA)}, protB.GetNeighbors()) -} diff --git a/packages/autopeering/selection/selection_test.go b/packages/autopeering/selection/selection_test.go index cdd89874..cc2235e8 100644 --- a/packages/autopeering/selection/selection_test.go +++ b/packages/autopeering/selection/selection_test.go @@ -146,26 +146,26 @@ func TestSelection(t *testing.T) { tests := []testCase{ { nh: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0]}, + size: 4}, expCandidate: d[1].Remote, }, { nh: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0], d[1], d[3]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0], d[1], d[3]}, + size: 4}, expCandidate: d[2].Remote, }, { nh: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0], d[1], d[4], d[2]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0], d[1], d[4], d[2]}, + size: 4}, expCandidate: d[3].Remote, }, { nh: &Neighborhood{ - Neighbors: []peer.PeerDistance{d[0], d[1], d[2], d[3]}, - Size: 4}, + neighbors: []peer.PeerDistance{d[0], d[1], d[2], d[3]}, + size: 4}, expCandidate: nil, }, } diff --git a/packages/autopeering/server/server.go b/packages/autopeering/server/server.go index d403b523..2c6303e5 100644 --- a/packages/autopeering/server/server.go +++ b/packages/autopeering/server/server.go @@ -11,8 +11,8 @@ import ( "github.com/iotaledger/goshimmer/packages/autopeering/peer" pb "github.com/iotaledger/goshimmer/packages/autopeering/server/proto" "github.com/iotaledger/goshimmer/packages/autopeering/transport" + "github.com/iotaledger/hive.go/logger" "github.com/pkg/errors" - "go.uber.org/zap" ) const ( @@ -25,7 +25,7 @@ type Server struct { local *peer.Local trans transport.Transport handlers []Handler - log *zap.SugaredLogger + log *logger.Logger network string address string @@ -70,7 +70,7 @@ type reply struct { // Listen starts a new peer server using the given transport layer for communication. // Sent data is signed using the identity of the local peer, // received data with a valid peer signature is handled according to the provided Handler. -func Listen(local *peer.Local, t transport.Transport, log *zap.SugaredLogger, h ...Handler) *Server { +func Listen(local *peer.Local, t transport.Transport, log *logger.Logger, h ...Handler) *Server { srv := &Server{ local: local, trans: t, diff --git a/plugins/autopeering/autopeering.go b/plugins/autopeering/autopeering.go index 95b2b8e4..ff21679c 100644 --- a/plugins/autopeering/autopeering.go +++ b/plugins/autopeering/autopeering.go @@ -44,11 +44,8 @@ func configureAP() { // enable peer selection only when gossip is enabled if !node.IsSkipped(gossip.PLUGIN) { Selection = selection.New(local.GetInstance(), Discovery, selection.Config{ - Log: log.Named("sel"), - Param: &selection.Parameters{ - SaltLifetime: selection.DefaultSaltLifetime, - RequiredService: []service.Key{service.GossipKey}, - }, + Log: log.Named("sel"), + RequiredServices: []service.Key{service.GossipKey}, }) } } diff --git a/plugins/autopeering/plugin.go b/plugins/autopeering/plugin.go index c6e9ec88..f40c3746 100644 --- a/plugins/autopeering/plugin.go +++ b/plugins/autopeering/plugin.go @@ -1,6 +1,8 @@ package autopeering import ( + "time" + "github.com/iotaledger/goshimmer/packages/autopeering/discover" "github.com/iotaledger/goshimmer/packages/autopeering/peer" "github.com/iotaledger/goshimmer/packages/autopeering/selection" @@ -32,10 +34,10 @@ func run(*node.Plugin) { func configureEvents() { // notify the selection when a connection is closed or failed. gossip.Events.ConnectionFailed.Attach(events.NewClosure(func(p *peer.Peer) { - Selection.DropPeer(p) + Selection.RemoveNeighbor(p.ID()) })) gossip.Events.NeighborRemoved.Attach(events.NewClosure(func(p *peer.Peer) { - Selection.DropPeer(p) + Selection.RemoveNeighbor(p.ID()) })) discover.Events.PeerDiscovered.Attach(events.NewClosure(func(ev *discover.DiscoveredEvent) { @@ -45,6 +47,9 @@ func configureEvents() { log.Infof("Removed offline: %s / %s", ev.Peer.Address(), ev.Peer.ID()) })) + selection.Events.SaltUpdated.Attach(events.NewClosure(func(ev *selection.SaltUpdatedEvent) { + log.Infof("Salt updated; expires=%s", ev.Public.GetExpiration().Format(time.RFC822)) + })) selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { log.Infof("Peering chosen: %s / %s", ev.Peer.Address(), ev.Peer.ID()) })) diff --git a/plugins/gossip/plugin.go b/plugins/gossip/plugin.go index 3829565b..6d84cd4f 100644 --- a/plugins/gossip/plugin.go +++ b/plugins/gossip/plugin.go @@ -53,6 +53,9 @@ func configureEvents() { }() })) + gossip.Events.ConnectionFailed.Attach(events.NewClosure(func(p *peer.Peer) { + log.Infof("Connection to neighbor failed: %s / %s", gossip.GetAddress(p), p.ID()) + })) gossip.Events.NeighborAdded.Attach(events.NewClosure(func(n *gossip.Neighbor) { log.Infof("Neighbor added: %s / %s", gossip.GetAddress(n.Peer), n.ID()) })) -- GitLab