manager.go
package discover
import (
"math/rand"
"sync"
"time"
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/autopeering/server"
"go.uber.org/zap"
)
var (
reverifyInterval = DefaultReverifyInterval // time interval after which the next peer is reverified
reverifyTries = DefaultReverifyTries // number of times a peer is pinged before it is removed
queryInterval = DefaultQueryInterval // time interval after which peers are queried for new peers
maxManaged = DefaultMaxManaged // maximum number of peers that can be managed
maxReplacements = DefaultMaxReplacements // maximum number of peers kept in the replacement list
)
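// network defines the subset of the discovery server used by the manager:
// access to the local peer, sending pings, and sending discovery requests.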
type network interface {
local() *peer.Local
ping(*peer.Peer) error
discoveryRequest(*peer.Peer) ([]*peer.Peer, error)
}
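// manager maintains the list of actively managed peers as well as a list of replacement candidates.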
type manager struct {
mutex sync.Mutex // protects active and replacement
active []*mpeer
replacements []*mpeer
net network
log *zap.SugaredLogger
wg sync.WaitGroup
closing chan struct{}
}
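// newManager creates a new manager that sends messages via net and logs to log.
// Non-zero values in param override the package defaults; the given master peers and
// any seed peers stored in the local database are loaded as initial peers.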
func newManager(net network, masters []*peer.Peer, log *zap.SugaredLogger, param *Parameters) *manager {
if param != nil {
if param.ReverifyInterval > 0 {
reverifyInterval = param.ReverifyInterval
}
if param.ReverifyTries > 0 {
reverifyTries = param.ReverifyTries
}
if param.QueryInterval > 0 {
queryInterval = param.QueryInterval
}
if param.MaxManaged > 0 {
maxManaged = param.MaxManaged
}
if param.MaxReplacements > 0 {
maxReplacements = param.MaxReplacements
}
}
m := &manager{
active: make([]*mpeer, 0, maxManaged),
replacements: make([]*mpeer, 0, maxReplacements),
net: net,
log: log,
closing: make(chan struct{}),
}
m.loadInitialPeers(masters)
return m
}
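// start launches the background loop of the manager.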
func (m *manager) start() {
m.wg.Add(1)
go m.loop()
}
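// self returns the ID of the local peer.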
func (m *manager) self() peer.ID {
return m.net.local().ID()
}
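// close signals the background loop to stop and waits for it to finish.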
func (m *manager) close() {
close(m.closing)
m.wg.Wait()
}
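// loop runs the main event loop: it schedules reverification of the oldest active peer
// and periodic queries for new peers until close is called.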
func (m *manager) loop() {
defer m.wg.Done()
var (
reverify = time.NewTimer(0) // setting this to 0 will cause a trigger right away
reverifyDone chan struct{}
query = time.NewTimer(server.ResponseTimeout) // trigger the first query after the reverify
queryNext chan time.Duration
)
defer reverify.Stop()
defer query.Stop()
Loop:
for {
select {
// start verification, if not yet running
case <-reverify.C:
// if there is no reverifyDone, this means doReverify is not running
if reverifyDone == nil {
reverifyDone = make(chan struct{})
go m.doReverify(reverifyDone)
}
// reset verification
case <-reverifyDone:
reverifyDone = nil
reverify.Reset(reverifyInterval) // reverify again after the given interval
// start requesting new peers, if not yet running
case <-query.C:
if queryNext == nil {
queryNext = make(chan time.Duration)
go m.doQuery(queryNext)
}
// on query done, reset the query timer to the returned duration
case d := <-queryNext:
queryNext = nil
query.Reset(d)
// on close, exit the loop
case <-m.closing:
break Loop
}
}
// wait for spawned goroutines to finish
if reverifyDone != nil {
<-reverifyDone
}
if queryNext != nil {
<-queryNext
}
}
// doReverify pings the oldest active peer; if it does not reply after reverifyTries attempts,
// the peer is removed from the active list and replaced by a random peer from the replacement list.
func (m *manager) doReverify(done chan<- struct{}) {
defer close(done)
p := m.peerToReverify()
if p == nil {
return // nothing can be reverified
}
m.log.Debugw("reverifying",
"id", p.ID(),
"addr", p.Address(),
)
var err error
for i := 0; i < reverifyTries; i++ {
err = m.net.ping(unwrapPeer(p))
if err == nil {
break
} else {
m.log.Debugw("ping failed",
"id", p.ID(),
"addr", p.Address(),
"err", err,
)
time.Sleep(1 * time.Second)
}
}
// could not verify the peer
if err != nil {
m.mutex.Lock()
defer m.mutex.Unlock()
m.active, _ = deletePeerByID(m.active, p.ID())
m.log.Debugw("remove dead",
"peer", p,
)
Events.PeerDeleted.Trigger(&DeletedEvent{Peer: unwrapPeer(p)})
// add a random replacement, if available
if len(m.replacements) > 0 {
var r *mpeer
m.replacements, r = deletePeer(m.replacements, rand.Intn(len(m.replacements)))
m.active = pushPeer(m.active, r, maxManaged)
}
return
}
// no need to do anything here, as the peer is bumped when handling the pong
}
// peerToReverify returns the oldest active peer, or nil if the list is empty.
func (m *manager) peerToReverify() *mpeer {
m.mutex.Lock()
defer m.mutex.Unlock()
if len(m.active) == 0 {
return nil
}
// the last peer is the oldest
return m.active[len(m.active)-1]
}
// updatePeer moves the peer with the given ID to the front of the list of managed peers.
// It returns true if a peer was bumped, or false if there was no peer with that ID.
func (m *manager) updatePeer(update *peer.Peer) bool {
id := update.ID()
for i, p := range m.active {
if p.ID() == id {
if i > 0 {
// move i-th peer to the front
copy(m.active[1:], m.active[:i])
}
// replace first mpeer with a wrap of the updated peer
m.active[0] = &mpeer{
Peer: *update,
verifiedCount: p.verifiedCount + 1,
lastNewPeers: p.lastNewPeers,
}
return true
}
}
return false
}
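// addReplacement adds p to the front of the replacement list and reports whether it was added;
// peers already contained in the list are ignored.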
func (m *manager) addReplacement(p *mpeer) bool {
if containsPeer(m.replacements, p.ID()) {
return false // already in the list
}
m.replacements = unshiftPeer(m.replacements, p, maxReplacements)
return true
}
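// loadInitialPeers adds the seed peers from the local database (if present) and the given
// master peers as discovered peers.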
func (m *manager) loadInitialPeers(masters []*peer.Peer) {
var peers []*peer.Peer
db := m.net.local().Database()
if db != nil {
peers = db.SeedPeers()
}
peers = append(peers, masters...)
for _, p := range peers {
m.addDiscoveredPeer(p)
}
}
// addDiscoveredPeer adds a newly discovered peer that has never been verified or pinged yet.
// It returns true if the given peer was new and added, false otherwise.
func (m *manager) addDiscoveredPeer(p *peer.Peer) bool {
// never add the local peer
if p.ID() == m.self() {
return false
}
m.mutex.Lock()
defer m.mutex.Unlock()
if containsPeer(m.active, p.ID()) {
return false
}
m.log.Debugw("discovered",
"peer", p,
)
mp := wrapPeer(p)
if len(m.active) >= maxManaged {
return m.addReplacement(mp)
}
m.active = pushPeer(m.active, mp, maxManaged)
return true
}
// addVerifiedPeer adds a new peer that has just been successfully pinged.
// It returns true if the given peer was new and added, false otherwise.
func (m *manager) addVerifiedPeer(p *peer.Peer) bool {
// never add the local peer
if p.ID() == m.self() {
return false
}
m.log.Debugw("verified",
"peer", p,
"services", p.Services(),
)
m.mutex.Lock()
defer m.mutex.Unlock()
// if already in the list, move it to the front
if m.updatePeer(p) {
return false
}
mp := wrapPeer(p)
mp.verifiedCount = 1
if len(m.active) >= maxManaged {
return m.addReplacement(mp)
}
// trigger the event only when the peer is added to active
Events.PeerDiscovered.Trigger(&DiscoveredEvent{Peer: p})
// new nodes are added to the front
m.active = unshiftPeer(m.active, mp, maxManaged)
return true
}
// getRandomPeers returns a list of randomly selected peers.
func (m *manager) getRandomPeers(n int, minVerified uint) []*peer.Peer {
m.mutex.Lock()
defer m.mutex.Unlock()
if n > len(m.active) {
n = len(m.active)
}
peers := make([]*peer.Peer, 0, n)
for _, i := range rand.Perm(len(m.active)) {
if len(peers) == n {
break
}
mp := m.active[i]
if mp.verifiedCount < minVerified {
continue
}
peers = append(peers, unwrapPeer(mp))
}
return peers
}
// getVerifiedPeers returns all the currently managed peers that have been verified at least once.
func (m *manager) getVerifiedPeers() []*mpeer {
m.mutex.Lock()
defer m.mutex.Unlock()
peers := make([]*mpeer, 0, len(m.active))
for _, mp := range m.active {
if mp.verifiedCount == 0 {
continue
}
peers = append(peers, mp)
}
return peers
}
// isKnown returns true if the manager is keeping track of that peer.
func (m *manager) isKnown(id peer.ID) bool {
if id == m.self() {
return true
}
m.mutex.Lock()
defer m.mutex.Unlock()
return containsPeer(m.active, id) || containsPeer(m.replacements, id)
}
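// Usage sketch (an assumption for illustration, not part of this file): the discovery
// protocol is expected to create and drive the manager roughly as follows, where net is
// its implementation of the network interface above, masterPeers are the configured
// entry nodes, and the parameter values are placeholders:
//
//	m := newManager(net, masterPeers, log, &Parameters{
//		QueryInterval: time.Minute, // override only the defaults that should differ
//	})
//	m.start()
//	defer m.close()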