Skip to content
Snippets Groups Projects
Select Git revision
  • 2d72324b12a6d78175989f25050aadc465fe8fb0
  • develop default protected
  • congestioncontrol
  • merge-v-data-collection-spammer-0.8.2
  • WIP-merge-v-data-collection-spammer-0.8.2
  • merge-v-data-collection-spammer-0.7.7
  • tmp
  • test-masterpow-fixing
  • test-masterpow
  • test-echo
  • v-data-collection
  • v-data-collection-spammer
  • tmp-dump-spam-info
  • dump-msg-info-0.3.1
  • test-dump-message-info
  • spammer-exprandom
  • extra/tutorial
  • without_tipselection
  • hacking-docker-network
  • hacking-docker-network-0.2.3
  • master
  • v0.2.3
22 results

manager.go

Blame
  • user avatar
    Wolfgang Welz authored
    2d72324b
    History
    manager.go 8.17 KiB
    package discover
    
    import (
    	"math/rand"
    	"sync"
    	"time"
    
    	"github.com/iotaledger/goshimmer/packages/autopeering/peer"
    	"github.com/iotaledger/goshimmer/packages/autopeering/server"
    	"go.uber.org/zap"
    )
    
    // Package-level settings used by every manager. They start at the package
    // defaults and are overwritten by newManager for each positive value found in
    // the Parameters passed to it.
    var (
    	// reverifyInterval is the time interval after which the next peer is reverified.
    	reverifyInterval = DefaultReverifyInterval
    	// reverifyTries is the number of times a peer is pinged before it is removed.
    	reverifyTries = DefaultReverifyTries
    	// queryInterval is the time interval after which peers are queried for new peers.
    	queryInterval = DefaultQueryInterval
    	// maxManaged is the maximum number of peers that can be managed.
    	maxManaged = DefaultMaxManaged
    	// maxReplacements is the maximum number of peers kept in the replacement list.
    	maxReplacements = DefaultMaxReplacements
    )
    
    // network abstracts the networking operations the manager depends on.
    type network interface {
    	// local returns the local peer; the manager reads its ID and its database.
    	local() *peer.Local
    	// ping pings the given peer; a non-nil error is treated as a failed ping.
    	ping(*peer.Peer) error
    	// discoveryRequest returns a list of peers obtained via the given peer.
    	// NOTE(review): presumably these are the peers known to the queried peer — confirm
    	// against the implementation.
    	discoveryRequest(*peer.Peer) ([]*peer.Peer, error)
    }
    
    // manager keeps track of the discovered peers: a bounded list of active peers
    // and a bounded list of replacements that can take the place of removed ones.
    type manager struct {
    	// mutex protects active and replacements.
    	mutex sync.Mutex
    	// active holds the managed peers; the last entry is treated as the oldest.
    	active []*mpeer
    	// replacements holds peers that did not fit into active.
    	replacements []*mpeer
    
    	// net provides the local peer and the ping operation used for reverification.
    	net network
    	// log receives the debug output of the manager.
    	log *zap.SugaredLogger
    
    	// wg tracks the goroutine started by start.
    	wg sync.WaitGroup
    	// closing is closed by close to make the background loop exit.
    	closing chan struct{}
    }
    
    // newManager creates a manager that uses net for networking and log for logging,
    // and seeds it with the initial peers (see loadInitialPeers).
    //
    // If param is non-nil, each of its positive fields overwrites the corresponding
    // package-level setting; zero or negative fields leave the current value untouched.
    // Note that these settings are package-level variables, so they are shared by all
    // managers. The background loop is not started; call start for that.
    func newManager(net network, masters []*peer.Peer, log *zap.SugaredLogger, param *Parameters) *manager {
    	if param != nil {
    		if v := param.ReverifyInterval; v > 0 {
    			reverifyInterval = v
    		}
    		if v := param.ReverifyTries; v > 0 {
    			reverifyTries = v
    		}
    		if v := param.QueryInterval; v > 0 {
    			queryInterval = v
    		}
    		if v := param.MaxManaged; v > 0 {
    			maxManaged = v
    		}
    		if v := param.MaxReplacements; v > 0 {
    			maxReplacements = v
    		}
    	}
    
    	// the lists are sized only after the settings above have been applied
    	mgr := &manager{
    		net:          net,
    		log:          log,
    		closing:      make(chan struct{}),
    		active:       make([]*mpeer, 0, maxManaged),
    		replacements: make([]*mpeer, 0, maxReplacements),
    	}
    	mgr.loadInitialPeers(masters)
    
    	return mgr
    }
    
    // start launches the background loop in its own goroutine.
    func (m *manager) start() {
    	// register the goroutine before it is spawned, so that close can wait for it
    	m.wg.Add(1)
    	go m.loop()
    }
    
    // self returns the ID of the local peer as reported by the network.
    func (m *manager) self() peer.ID {
    	local := m.net.local()
    	return local.ID()
    }
    
    // close signals the background loop to exit and blocks until it has finished.
    // It must be called at most once: closing the already closed channel would panic.
    func (m *manager) close() {
    	close(m.closing)
    	m.wg.Wait()
    }
    
    // loop is the background routine of the manager. It periodically starts
    // doReverify and doQuery in separate goroutines, making sure that at most one
    // instance of each is running at any time, and exits once m.closing is closed
    // and the goroutines it spawned have finished.
    func (m *manager) loop() {
    	defer m.wg.Done()
    
    	// reverifyDone and queryNext are nil while the corresponding goroutine is not
    	// running; receiving from a nil channel blocks forever, so their select cases
    	// are effectively disabled in that state.
    	var (
    		reverify     = time.NewTimer(0) // setting this to 0 will cause a trigger right away
    		reverifyDone chan struct{}
    
    		query     = time.NewTimer(server.ResponseTimeout) // trigger the first query after the reverify
    		queryNext chan time.Duration
    	)
    	defer reverify.Stop()
    	defer query.Stop()
    
    Loop:
    	for {
    		select {
    		// start verification, if not yet running
    		case <-reverify.C:
    			// if there is no reverifyDone, this means doReverify is not running
    			if reverifyDone == nil {
    				reverifyDone = make(chan struct{})
    				go m.doReverify(reverifyDone)
    			}
    
    		// reset verification
    		case <-reverifyDone:
    			reverifyDone = nil
    			reverify.Reset(reverifyInterval) // reverify again after the given interval
    
    		// start requesting new peers, if not yet running
    		case <-query.C:
    			if queryNext == nil {
    				queryNext = make(chan time.Duration)
    				go m.doQuery(queryNext)
    			}
    
    		// on query done, reset time to given duration
    		// NOTE(review): doQuery is not visible here; presumably it reports the delay
    		// until the next query on this channel — confirm.
    		case d := <-queryNext:
    			queryNext = nil
    			query.Reset(d)
    
    		// on close, exit the loop
    		case <-m.closing:
    			break Loop
    		}
    	}
    
    	// wait for spawned goroutines to finish
    	if reverifyDone != nil {
    		<-reverifyDone
    	}
    	if queryNext != nil {
    		<-queryNext
    	}
    }
    
    // doReverify pings the oldest active peer and removes it if it does not respond.
    //
    // The peer returned by peerToReverify is pinged up to reverifyTries times, with a
    // pause of one second between failed attempts. The pause is skipped after the
    // final attempt and is aborted as soon as the manager is closing, in which case
    // the peer is left untouched.
    //
    // If every attempt fails, the peer is deleted from the active list, PeerDeleted
    // is triggered, and a randomly chosen replacement (if any) is moved into the
    // active list. PeerDiscovered is triggered for that replacement when it has been
    // verified before: such a peer never had the event triggered, since
    // addVerifiedPeer only triggers it for peers entering the active list.
    //
    // done is closed when the function returns, which signals to the caller that the
    // reverification has finished.
    func (m *manager) doReverify(done chan<- struct{}) {
    	defer close(done)
    
    	p := m.peerToReverify()
    	if p == nil {
    		return // nothing can be reverified
    	}
    	m.log.Debugw("reverifying",
    		"id", p.ID(),
    		"addr", p.Address(),
    	)
    
    	var err error
    	for i := 0; i < reverifyTries; i++ {
    		err = m.net.ping(unwrapPeer(p))
    		if err == nil {
    			break
    		}
    		m.log.Debugw("ping failed",
    			"id", p.ID(),
    			"addr", p.Address(),
    			"err", err,
    		)
    
    		// there is nothing left to wait for after the final attempt
    		if i == reverifyTries-1 {
    			break
    		}
    		// pause before the next attempt, but do not delay a shutdown
    		select {
    		case <-time.After(1 * time.Second):
    		case <-m.closing:
    			return
    		}
    	}
    
    	if err == nil {
    		// no need to do anything here, as the peer is bumped when handling the pong
    		return
    	}
    
    	// could not verify the peer
    	m.mutex.Lock()
    	defer m.mutex.Unlock()
    
    	m.active, _ = deletePeerByID(m.active, p.ID())
    	m.log.Debugw("remove dead",
    		"peer", p,
    	)
    	Events.PeerDeleted.Trigger(&DeletedEvent{Peer: unwrapPeer(p)})
    
    	// add a random replacement, if available
    	if len(m.replacements) > 0 {
    		var r *mpeer
    		m.replacements, r = deletePeer(m.replacements, rand.Intn(len(m.replacements)))
    		m.active = pushPeer(m.active, r, maxManaged)
    
    		// an already verified replacement becomes visible only now; without this its
    		// discovery would never be announced, as later updates see verifiedCount > 1
    		if r.verifiedCount > 0 {
    			Events.PeerDiscovered.Trigger(&DiscoveredEvent{Peer: unwrapPeer(r)})
    		}
    	}
    }
    
    // peerToReverify returns the oldest active peer, i.e. the last entry of the
    // active list, or nil if that list is empty. It acquires the mutex itself.
    func (m *manager) peerToReverify() *mpeer {
    	m.mutex.Lock()
    	defer m.mutex.Unlock()
    
    	if last := len(m.active) - 1; last >= 0 {
    		return m.active[last]
    	}
    	return nil
    }
    
    // updatePeer moves the active peer with the ID of update to the front of the
    // active list, replacing it with a fresh wrap of update whose verifiedCount is
    // incremented by one and whose lastNewPeers is carried over.
    // It returns the new verifiedCount, or 0 if no active peer has that ID.
    // The mutex is not acquired here; addVerifiedPeer calls it with the mutex held.
    func (m *manager) updatePeer(update *peer.Peer) uint {
    	id := update.ID()
    	for idx, current := range m.active {
    		if current.ID() != id {
    			continue
    		}
    
    		count := current.verifiedCount + 1
    		// shift the peers in front of idx one slot towards the back, overwriting
    		// slot idx; for idx == 0 nothing is copied
    		copy(m.active[1:], m.active[:idx])
    		m.active[0] = &mpeer{
    			Peer:          *update,
    			verifiedCount: count,
    			lastNewPeers:  current.lastNewPeers,
    		}
    		return count
    	}
    	return 0
    }
    
    // addReplacement puts p into the replacement list, bounded by maxReplacements,
    // unless a peer with the same ID is already in that list.
    // It returns true if p was added, false if it was already present.
    // The mutex is not acquired here; the callers in this file hold it.
    func (m *manager) addReplacement(p *mpeer) bool {
    	isNew := !containsPeer(m.replacements, p.ID())
    	if isNew {
    		m.replacements = unshiftPeer(m.replacements, p, maxReplacements)
    	}
    	return isNew
    }
    
    // loadInitialPeers adds the initial set of peers as discovered peers: first the
    // seed peers from the database of the local peer (if it has one), then masters.
    func (m *manager) loadInitialPeers(masters []*peer.Peer) {
    	initial := masters
    	if db := m.net.local().Database(); db != nil {
    		initial = append(db.SeedPeers(), masters...)
    	}
    
    	for _, candidate := range initial {
    		m.addDiscoveredPeer(candidate)
    	}
    }
    
    // addDiscoveredPeer adds a newly discovered peer that has never been verified or
    // pinged yet. The peer goes to the active list while there is room, otherwise to
    // the replacement list.
    // It returns true if the given peer was new and added, false otherwise; the local
    // peer and peers that are already active are never added.
    func (m *manager) addDiscoveredPeer(p *peer.Peer) bool {
    	id := p.ID()
    	// never add the local peer
    	if id == m.self() {
    		return false
    	}
    
    	m.mutex.Lock()
    	defer m.mutex.Unlock()
    
    	if containsPeer(m.active, id) {
    		return false
    	}
    	m.log.Debugw("discovered",
    		"peer", p,
    	)
    
    	candidate := wrapPeer(p)
    	if len(m.active) < maxManaged {
    		m.active = pushPeer(m.active, candidate, maxManaged)
    		return true
    	}
    	// the active list is full, keep the peer as a replacement instead
    	return m.addReplacement(candidate)
    }
    
    // addVerifiedPeer adds a new peer that has just been successfully pinged.
    // A peer that is already active is only moved to the front of the active list;
    // a new peer is put to the front of the active list, or into the replacement
    // list when the active list is full.
    // It returns true if the given peer was new and added, false otherwise.
    func (m *manager) addVerifiedPeer(p *peer.Peer) bool {
    	// never add the local peer
    	if p.ID() == m.self() {
    		return false
    	}
    
    	m.log.Debugw("verified",
    		"peer", p,
    		"services", p.Services(),
    	)
    
    	m.mutex.Lock()
    	defer m.mutex.Unlock()
    
    	// if already in the list, the peer is moved to the front by updatePeer
    	switch count := m.updatePeer(p); {
    	case count == 1:
    		// trigger the event only for the first time the peer is updated
    		Events.PeerDiscovered.Trigger(&DiscoveredEvent{Peer: p})
    		return false
    	case count > 1:
    		return false
    	}
    
    	verified := wrapPeer(p)
    	verified.verifiedCount = 1
    
    	if len(m.active) >= maxManaged {
    		return m.addReplacement(verified)
    	}
    
    	// trigger the event only when the peer is added to active
    	Events.PeerDiscovered.Trigger(&DiscoveredEvent{Peer: p})
    
    	// new nodes are added to the front
    	m.active = unshiftPeer(m.active, verified, maxManaged)
    	return true
    }
    
    // getRandomPeers returns up to n randomly selected active peers whose
    // verifiedCount is at least minVerified. Fewer than n peers are returned when
    // not enough active peers qualify.
    func (m *manager) getRandomPeers(n int, minVerified uint) []*peer.Peer {
    	m.mutex.Lock()
    	defer m.mutex.Unlock()
    
    	// there cannot be more results than active peers
    	limit := n
    	if total := len(m.active); total < limit {
    		limit = total
    	}
    
    	selected := make([]*peer.Peer, 0, limit)
    	// walk the active peers in random order until enough have been collected
    	for _, idx := range rand.Perm(len(m.active)) {
    		if len(selected) == limit {
    			break
    		}
    
    		if candidate := m.active[idx]; candidate.verifiedCount >= minVerified {
    			selected = append(selected, unwrapPeer(candidate))
    		}
    	}
    
    	return selected
    }
    
    // getVerifiedPeers returns all the currently active peers that have been verified
    // at least once, in the order in which they appear in the active list.
    func (m *manager) getVerifiedPeers() []*mpeer {
    	m.mutex.Lock()
    	defer m.mutex.Unlock()
    
    	verified := make([]*mpeer, 0, len(m.active))
    	for _, candidate := range m.active {
    		if candidate.verifiedCount > 0 {
    			verified = append(verified, candidate)
    		}
    	}
    
    	return verified
    }
    
    // isKnown returns true if the manager is keeping track of the peer with the given
    // ID, either as an active peer or as a replacement. The ID of the local peer is
    // always reported as known.
    func (m *manager) isKnown(id peer.ID) bool {
    	if id == m.self() {
    		return true
    	}
    
    	m.mutex.Lock()
    	defer m.mutex.Unlock()
    
    	if containsPeer(m.active, id) {
    		return true
    	}
    	return containsPeer(m.replacements, id)
    }