Wolfgang Welz authored:

* fix: use netutil to check UDP connection
* Make ping private
* use public address from local
* add missing import
* update hive.go version
* repeat sends directly in protocol
* retry ping to ensure verified
* Apply suggestions from code review

Co-authored-by: Luca Moser <moser.luca@gmail.com>
manager.go
package discover

import (
	"math/rand"
	"sync"
	"time"

	"github.com/iotaledger/goshimmer/packages/autopeering/peer"
	"github.com/iotaledger/goshimmer/packages/autopeering/server"
	"github.com/iotaledger/hive.go/logger"
)

const (
	// PingExpiration is the time until a peer verification expires.
	PingExpiration = 12 * time.Hour
	// MaxPeersInResponse is the maximum number of peers returned in DiscoveryResponse.
	MaxPeersInResponse = 6
	// MaxServices is the maximum number of services a peer can support.
	MaxServices = 5
	// VersionNum specifies the expected version number for this Protocol.
	VersionNum = 0
)

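// network provides the operations the manager needs from the protocol: access
// to the local peer, pinging a peer, and requesting new peers from a peer.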
type network interface {
	local() *peer.Local
	Ping(*peer.Peer) error
	DiscoveryRequest(*peer.Peer) ([]*peer.Peer, error)
}

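// manager keeps track of the active (managed) peers and a list of replacement
// candidates that are promoted when an active peer is removed.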
type manager struct {
	mutex        sync.Mutex // protects active and replacements
	active       []*mpeer
	replacements []*mpeer
	net          network
	log          *logger.Logger
	wg           sync.WaitGroup
	closing      chan struct{}
}

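// newManager creates a new manager for the given network, seeded with the
// provided master peers and any peers stored in the local database.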
func newManager(net network, masters []*peer.Peer, log *logger.Logger) *manager {
	m := &manager{
		active:       make([]*mpeer, 0, maxManaged),
		replacements: make([]*mpeer, 0, maxReplacements),
		net:          net,
		log:          log,
		closing:      make(chan struct{}),
	}
	m.loadInitialPeers(masters)
	return m
}

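// start launches the manager's event loop in a separate goroutine.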
func (m *manager) start() {
	m.wg.Add(1)
	go m.loop()
}

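// self returns the ID of the local peer.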
func (m *manager) self() peer.ID {
	return m.net.local().ID()
}

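// close stops the event loop and blocks until it has terminated.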
func (m *manager) close() {
	close(m.closing)
	m.wg.Wait()
}

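// loop repeatedly triggers the reverification of the oldest peer and the query
// for new peers until the manager is closed.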
func (m *manager) loop() {
	defer m.wg.Done()

	var (
		reverify     = time.NewTimer(0) // setting this to 0 will cause a trigger right away
		reverifyDone chan struct{}
		query        = time.NewTimer(server.ResponseTimeout) // trigger the first query after the reverify
		queryNext    chan time.Duration
	)
	defer reverify.Stop()
	defer query.Stop()

Loop:
	for {
		select {
		// start verification, if not yet running
		case <-reverify.C:
			// if there is no reverifyDone, this means doReverify is not running
			if reverifyDone == nil {
				reverifyDone = make(chan struct{})
				go m.doReverify(reverifyDone)
			}

		// reset verification
		case <-reverifyDone:
			reverifyDone = nil
			reverify.Reset(reverifyInterval) // reverify again after the given interval

		// start requesting new peers, if not yet running
		case <-query.C:
			if queryNext == nil {
				queryNext = make(chan time.Duration)
				go m.doQuery(queryNext)
			}

		// on query done, reset time to given duration
		case d := <-queryNext:
			queryNext = nil
			query.Reset(d)

		// on close, exit the loop
		case <-m.closing:
			break Loop
		}
	}

	// wait for spawned goroutines to finish
	if reverifyDone != nil {
		<-reverifyDone
	}
	if queryNext != nil {
		<-queryNext
	}
}

// doReverify pings the oldest active peer.
func (m *manager) doReverify(done chan<- struct{}) {
	defer close(done)

	p := m.peerToReverify()
	if p == nil {
		return // nothing can be reverified
	}
	m.log.Debugw("reverifying",
		"id", p.ID(),
		"addr", p.Address(),
	)

	// could not verify the peer
	if m.net.Ping(unwrapPeer(p)) != nil {
		m.mutex.Lock()
		defer m.mutex.Unlock()

		m.active, _ = deletePeerByID(m.active, p.ID())
		m.log.Debugw("remove dead",
			"peer", p,
		)
		Events.PeerDeleted.Trigger(&DeletedEvent{Peer: unwrapPeer(p)})

		// add a random replacement, if available
		if len(m.replacements) > 0 {
			var r *mpeer
			m.replacements, r = deletePeer(m.replacements, rand.Intn(len(m.replacements)))
			m.active = pushPeer(m.active, r, maxManaged)
		}
		return
	}

	// no need to do anything here, as the peer is bumped when handling the pong
}

// peerToReverify returns the oldest peer, or nil if empty.
func (m *manager) peerToReverify() *mpeer {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if len(m.active) == 0 {
		return nil
	}
	// the last peer is the oldest
	return m.active[len(m.active)-1]
}

// updatePeer moves the peer with the given ID to the front of the list of managed peers.
// It returns 0 if there was no peer with that id, otherwise the verifiedCount of the updated peer is returned.
func (m *manager) updatePeer(update *peer.Peer) uint {
	id := update.ID()
	for i, p := range m.active {
		if p.ID() == id {
			if i > 0 {
				// move i-th peer to the front
				copy(m.active[1:], m.active[:i])
			}
			// replace first mpeer with a wrap of the updated peer
			m.active[0] = &mpeer{
				Peer:          *update,
				verifiedCount: p.verifiedCount + 1,
				lastNewPeers:  p.lastNewPeers,
			}
			return p.verifiedCount + 1
		}
	}
	return 0
}

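// addReplacement adds p to the list of replacement candidates.
// It returns false if the peer is already in the list, true otherwise.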
func (m *manager) addReplacement(p *mpeer) bool {
	if containsPeer(m.replacements, p.ID()) {
		return false // already in the list
	}
	m.replacements = unshiftPeer(m.replacements, p, maxReplacements)
	return true
}

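// loadInitialPeers adds the seed peers from the local database as well as the
// given master peers as discovered peers.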
func (m *manager) loadInitialPeers(masters []*peer.Peer) {
	var peers []*peer.Peer
	db := m.net.local().Database()
	if db != nil {
		peers = db.SeedPeers()
	}
	peers = append(peers, masters...)
	for _, p := range peers {
		m.addDiscoveredPeer(p)
	}
}

// addDiscoveredPeer adds a newly discovered peer that has never been verified or pinged yet.
// It returns true if the given peer was new and added, false otherwise.
func (m *manager) addDiscoveredPeer(p *peer.Peer) bool {
	// never add the local peer
	if p.ID() == m.self() {
		return false
	}

	m.mutex.Lock()
	defer m.mutex.Unlock()

	if containsPeer(m.active, p.ID()) {
		return false
	}
	m.log.Debugw("discovered",
		"peer", p,
	)

	mp := wrapPeer(p)
	if len(m.active) >= maxManaged {
		return m.addReplacement(mp)
	}

	m.active = pushPeer(m.active, mp, maxManaged)
	return true
}

// addVerifiedPeer adds a new peer that has just been successfully pinged.
// It returns true if the given peer was new and added, false otherwise.
func (m *manager) addVerifiedPeer(p *peer.Peer) bool {
	// never add the local peer
	if p.ID() == m.self() {
		return false
	}
	m.log.Debugw("verified",
		"peer", p,
		"services", p.Services(),
	)

	m.mutex.Lock()
	defer m.mutex.Unlock()

	// if already in the list, move it to the front
	if v := m.updatePeer(p); v > 0 {
		// trigger the event only for the first time the peer is updated
		if v == 1 {
			Events.PeerDiscovered.Trigger(&DiscoveredEvent{Peer: p})
		}
		return false
	}

	mp := wrapPeer(p)
	mp.verifiedCount = 1

	if len(m.active) >= maxManaged {
		return m.addReplacement(mp)
	}
	// trigger the event only when the peer is added to active
	Events.PeerDiscovered.Trigger(&DiscoveredEvent{Peer: p})

	// new nodes are added to the front
	m.active = unshiftPeer(m.active, mp, maxManaged)
	return true
}

// getRandomPeers returns a list of randomly selected peers.
func (m *manager) getRandomPeers(n int, minVerified uint) []*peer.Peer {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if n > len(m.active) {
		n = len(m.active)
	}

	peers := make([]*peer.Peer, 0, n)
	for _, i := range rand.Perm(len(m.active)) {
		if len(peers) == n {
			break
		}

		mp := m.active[i]
		if mp.verifiedCount < minVerified {
			continue
		}
		peers = append(peers, unwrapPeer(mp))
	}
	return peers
}

// getVerifiedPeers returns all the currently managed peers that have been verified at least once.
func (m *manager) getVerifiedPeers() []*mpeer {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	peers := make([]*mpeer, 0, len(m.active))
	for _, mp := range m.active {
		if mp.verifiedCount == 0 {
			continue
		}
		peers = append(peers, mp)
	}
	return peers
}

// isKnown returns true if the manager is keeping track of that peer.
func (m *manager) isKnown(id peer.ID) bool {
	if id == m.self() {
		return true
	}

	m.mutex.Lock()
	defer m.mutex.Unlock()

	return containsPeer(m.active, id) || containsPeer(m.replacements, id)
}