
manager.go

    Wolfgang Welz authored and GitHub committed
    * :sparkles: GetNextCandidate selection added
    
    * :bug: Recompile salt.proto
    
    * :recycle: Refactor peer IDs completely
    
    * :bug: Recompile proto sources
    
    * :construction: WIP
    
    * :ok_hand: renamed to sort
    
    * :construction: Return requested peers on request
    
* :sparkles: Keep track of how often a managed peer was verified
    
    * :wrench: sort adapted to the new peer
    
    * :construction: Query verified peers for new peers
    
    * :construction: WIP
    
    * :white_check_mark: Add manager tests
    
    * :bento: Add peering messages to protobuf
    
    * :art: Make Salt methods consistent with Peer
    
    * :sparkles: Handle peering messages
    
    * :construction: WIP
    
* :bug: Add To field to PeersRequest
    
    * :white_check_mark: Add PeersRequest test
    
    * :zap: Adding PeersRequest benchmark
    
    * :art: Move mpeer and helper functions to own file
    
    * :bug: Bump peer only once on reverification
    
    * :construction: WIP
    
    * :bug:
    
    * :construction: WIP
    
    * :bug: Increase buffer to prevent deadlocks
    
    * :zap: Remove unnecessary lock
    
    * :white_check_mark: Add peering request test
    
    * :white_check_mark: Make tests deterministic by triggering reverify
    
    * :construction: WIP
    
    * :lock: fixing concurrency issues
    
    * :white_check_mark: testManager improved
    
    * :bug: Don't send PeeringResponse for invalid salt
    
    * :construction: WIP
    
    * :construction: WIP
    
    * :pencil: removed commented code
    
    * :construction: WIP
    
    * :construction: WIP
    
    * Neighbourhood manager (#11)
    
    * :sparkles: GetNextCandidate selection added
    
    * :lock: fixing concurrency issues
    
    * :white_check_mark: testManager improved
    
    * :pencil: removed commented code
    
    * :construction: WIP
    
    * :construction: WIP
    
    * :construction: WIP
    
    * :construction: WIP
    
    * :heavy_check_mark: Make TestAdd pass in neighborhood_test
    
    `Add` should do nothing if the neighborhood list is full.
    
    * :white_check_mark: Improve TestSrvVerifyBoot
    
    * :arrow_up: Upgrade to go 1.13
    
    * :heavy_minus_sign: Use testify instead of magiconair/properties
    
    * :construction:
    
    * :construction: WIP
    
    * :construction: WIP
    
    * :construction: WIP
    
    * :construction: WIP
    
    * Simulation (#14)
    
    * :sparkles: clear rejection filter after refreshing the public salt
    
    * :sparkles: clean rejection filter
    
    * :white_check_mark: Add mpeer test
    
    * :art: gofmt test
    
* :art: Remove ineffectual assignments
    
    * :hammer: Trigger panic when index is out of bound in deletePeer
    
    * :white_check_mark: Add selection test
    
    * :white_check_mark: Add TestGetFurtherest and TestGetPeerIndex
    
* :sparkles: both salts updated at the same time and neighborhood dropped
    
    * :art: Add node peering history for simulation
    
    * :construction: WIP
    
    * :sparkles: added sim visualization
    
    * :fire: removed root handler
    
    * :pencil: added README
    
    * :pencil: updated README
    
    * :heavy_minus_sign: removed unused dependencies
    
    * :art: Tidy go.mod
    
    * :construction: Work in progress
    
    * :construction: WIP
    
    * :construction: WIP
    
    * :construction: WIP
    
    * :lipstick: improved start button
    
    * :construction: WIP
    
    * :sparkles: added keyboard support for start
    
    * :construction: WIP
    
    * :construction: WIP
    
    * :construction: WIP
    
    * :construction: WIP
    
    * :construction: WIP
    
    * :construction: WIP
    
    * :sparkles: added input parameters
    
    * :pencil: README updated
    
    * :lipstick: gif updated
    
    * :lipstick: figure updated
    
    * :pencil: updated
    
    * :lipstick: updated gif
    
    * removed simulation metrics
    
    * :pencil: updated
    
    * :pencil: updated
    
    * :recycle: Extract server from discovery and peering
    
    * :bug: Use golang/protobuf
    
    * Update README.md
    
    * :recycle: Rename PeersRequest to DiscoveryRequest
    
    * :pencil2: Fixing typos
    
    * :art: Move unused function to tests
    
    * :recycle: The selection protocol depends on the discovery
    
    * :white_check_mark: Make tests more robust when using -race
    
    * :loud_sound: Improve logging
    
    * :art: Remove unnecessary arguments
    
    * :bug: Fix data races
    
    * :bug: added timeout for simulated network
    
    * :art: added loop to animation
    
    * :recycle: rename neighborhood to selection
    
    * :sparkles: adds initial salt and fixes simulation end
    
    * :pencil: visualizer enabled by default
    
    * :sparkles: new parameter dropAll and improved python script
    
    * :pencil: updated README
    
    * :pencil: updated README
    
    * :bug: fix salt initialization
    
    * :pencil: added blogpost link
    
    * :pencil: Add badges to README
    
    * :construction_worker: Add Travis CI for tests
    
* :rotating_light: Correct formatting
    
    * :construction_worker: Running golangci on Travis
    
    * :rotating_light: Ignore return value of 'logger.Sync'
    
    * :rotating_light: Remove unused functions
    
    * :pencil: Add link to license
    
    * :art: Move simnetwork to transport
    
    * :art: Use the complete network protocol in simulations
    
    * :recycle: Do not export selection/manager
    
    * :fire: Remove gRPC transport layer
    
    * :white_check_mark: Add UDP connection transport test
    
    * :construction: Implement the peer DB using Goshimmer database
    
    * :heavy_plus_sign: Use the local GoShimmer version
    
    * :sparkles: Add support for a persistent database
    
    * :sparkles: Persist private key of local peer in database
    
    * :bug: Set TTL for bootstrap peers
    
    * :construction: Use GoShimmer events
    
    * :sparkles: Store the supported services in the local peer
    
    * :pushpin: Use most current GoShimmer git version as a dep
    
    * :heavy_plus_sign: Switch to hive.go event package
    
    * :art: Use correct module iotaledger/autopeering-sim
    
    * :bug: Provide dummy service in autopeering simulation
    
    * :sparkles: adds service support in the selection
    
    * :sparkles: adds peer discovered events
    
    * :sparkles: adds GetIncomingNeighbors and GetOutgoingNeighbors
    
    * :bug: fixes out of bound error in splitNodeKey
    
    * :sparkles: adds public IP support
    
    * :bug: fixes localhost
    
    * :bug: fixes localhost parsing
    
    * :wrench: changes selection loop to 1s
    
    * :loud_sound: switches from fmt.Println to log.Debug
    
    * :wrench: increases maxKnown to 1000 and always triggers discovered peer
    
    * :sparkles: adds PeerDeleted event
    
    * :construction: moves PeerDeleted event into deletePeer
    
* :sparkles: adds config parameters for the peer discovery
    
    * :sparkles: adds config parameters to neighbor selection
    
    * :sparkles: enable/disable inbound/outbound selection
    
    * :bulb: Improve Godoc comments
    
    * :sparkles: modifies disabled outbound selection
    
    * :bug: fixes bug with disabling the selection
    
    * :heavy_minus_sign: removes getMyIP() from server
    
    * :mute: removes some debugging logs
    
    * :construction: Introduce services
    
    - Each peer now comes with a set of services
    - Local peer is a proper peer
- Services are exchanged during Pong and are available for all verified peers
    
    * :bug: fixes GetVerifiedPeer
    
    * :sparkles: adds gossip key to the service
    
    * :mute: removes debugging logs
    
    * :white_check_mark: Add test for GetVerifiedPeer
    
    * :bug: Fix main
    
    * :art: Add localNetwork to Protocol
    
    * :bug: Add new but verified peers to the manager
    
    * :wrench: changes configurable parameters
    
    * :bug: fixes DiscoveryRequest field sequence
    
    * :bento: Regenerate proto files
    
    * :art: Cleanup parameters
    
    * :bug: Fix potential race condition
    
    * :mute: Reduce logging verbosity
    
    * :white_check_mark: Add test for selection+discover
    
    * :sparkles: Return net.Addr in Transport
    
    * :bug: Remove inbound/outbound switches completely
    
    * :loud_sound: Improve logging
    
    * :loud_sound: Fix peerdb logs
    
    * :bug: Fix peer updating
    
    * :white_check_mark: Make TestProtFull more robust
    
    * :art: Make queryInterval a parameter
    
* :loud_sound: Improve logging during querying
    
    * :bug: Trigger PeerDiscovered only for active peers
    
    * :art: Cleanup protocol tests
    
    * :white_check_mark: Add discovery test on protocol level
    
    * :art: Rename maxVerified to maxManaged
    
    * :wrench: Increase default query interval
    
    * :art: Improve discover benchmarks
    
    * :white_check_mark: Fix manager tests
    
    * :art: Do not use bugged assert.Eventually
    
    * :rotating_light: Fix linter warnings in server
    
    * :rotating_light: Remove unused parameters
    
    * :art: Make transport work on slices of bytes
    
    * :pencil2: Fix typo in comments
    
    * :rotating_light: Fix linter warnings
    
    * :art: UpdateService accepts two strings
    
    * :white_check_mark: Add test that services are received in discover
    
    * :sparkles: adds required services
    
    * :art: Handle closed connections consistently
    
    * :art: Code cleanup
    
    * :bug: fixes DropPeer
    
    * :art: improves debug messages
    
    * :loud_sound: Log errors during reverification
    
* :loud_sound: Log packet size
    
    * refactor: remove unused files
    
    * refactor: use internal autopeering package
    
Co-authored-by: Angelo Capossele <angelocapossele@gmail.com>
Co-authored-by: jkrvivian <jkrvivian@gmail.com>
manager.go (7.98 KiB)
    package discover
    
    import (
    	"math/rand"
    	"sync"
    	"time"
    
    	"github.com/iotaledger/goshimmer/packages/autopeering/peer"
    	"github.com/iotaledger/goshimmer/packages/autopeering/server"
    	"go.uber.org/zap"
    )
    
    var (
    	reverifyInterval = DefaultReverifyInterval // time interval after which the next peer is reverified
    	reverifyTries    = DefaultReverifyTries    // number of times a peer is pinged before it is removed
    	queryInterval    = DefaultQueryInterval    // time interval after which peers are queried for new peers
    	maxManaged       = DefaultMaxManaged       // maximum number of peers that can be managed
    	maxReplacements  = DefaultMaxReplacements  // maximum number of peers kept in the replacement list
    )
    
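// network abstracts the server functionality the manager needs: access to the
// local peer, pinging a peer, and requesting new peers from an already known peer.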
    type network interface {
    	local() *peer.Local
    
    	ping(*peer.Peer) error
    	discoveryRequest(*peer.Peer) ([]*peer.Peer, error)
    }
    
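// manager maintains the list of active, verified peers and a bounded list of
// replacement candidates. It periodically reverifies the oldest active peer and
// queries known peers for new peers.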
    type manager struct {
    	mutex        sync.Mutex // protects active and replacement
    	active       []*mpeer
    	replacements []*mpeer
    
    	net network
    	log *zap.SugaredLogger
    
    	wg      sync.WaitGroup
    	closing chan struct{}
    }
    
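// newManager creates a new manager for the given network, overriding the
// package-level default parameters with the values set in param and preloading
// the database seed peers as well as the given master peers.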
    func newManager(net network, masters []*peer.Peer, log *zap.SugaredLogger, param *Parameters) *manager {
    	if param != nil {
    		if param.ReverifyInterval > 0 {
    			reverifyInterval = param.ReverifyInterval
    		}
    		if param.ReverifyTries > 0 {
    			reverifyTries = param.ReverifyTries
    		}
    		if param.QueryInterval > 0 {
    			queryInterval = param.QueryInterval
    		}
    		if param.MaxManaged > 0 {
    			maxManaged = param.MaxManaged
    		}
    		if param.MaxReplacements > 0 {
    			maxReplacements = param.MaxReplacements
    		}
    	}
    
    	m := &manager{
    		active:       make([]*mpeer, 0, maxManaged),
    		replacements: make([]*mpeer, 0, maxReplacements),
    		net:          net,
    		log:          log,
    		closing:      make(chan struct{}),
    	}
    	m.loadInitialPeers(masters)
    
    	return m
    }
    
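// start runs the manager's main loop in its own goroutine.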
    func (m *manager) start() {
    	m.wg.Add(1)
    	go m.loop()
    }
    
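// self returns the ID of the local peer.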
    func (m *manager) self() peer.ID {
    	return m.net.local().ID()
    }
    
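// close stops the main loop and blocks until all spawned goroutines have finished.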
    func (m *manager) close() {
    	close(m.closing)
    	m.wg.Wait()
    }
    
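// loop multiplexes the reverify and query timers until close is called. A done
// channel is non-nil only while its task is running; receiving from a nil channel
// blocks forever, so the completion case is disabled whenever no task is in flight.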
    func (m *manager) loop() {
    	defer m.wg.Done()
    
    	var (
    		reverify     = time.NewTimer(0) // setting this to 0 will cause a trigger right away
    		reverifyDone chan struct{}
    
    		query     = time.NewTimer(server.ResponseTimeout) // trigger the first query after the reverify
    		queryNext chan time.Duration
    	)
    	defer reverify.Stop()
    	defer query.Stop()
    
    Loop:
    	for {
    		select {
    		// start verification, if not yet running
    		case <-reverify.C:
    			// if there is no reverifyDone, this means doReverify is not running
    			if reverifyDone == nil {
    				reverifyDone = make(chan struct{})
    				go m.doReverify(reverifyDone)
    			}
    
    		// reset verification
    		case <-reverifyDone:
    			reverifyDone = nil
    			reverify.Reset(reverifyInterval) // reverify again after the given interval
    
		// start requesting new peers, if not yet running
    		case <-query.C:
    			if queryNext == nil {
    				queryNext = make(chan time.Duration)
    				go m.doQuery(queryNext)
    			}
    
		// on query done, reset the query timer to the returned interval
    		case d := <-queryNext:
    			queryNext = nil
    			query.Reset(d)
    
    		// on close, exit the loop
    		case <-m.closing:
    			break Loop
    		}
    	}
    
    	// wait for spawned goroutines to finish
    	if reverifyDone != nil {
    		<-reverifyDone
    	}
    	if queryNext != nil {
    		<-queryNext
    	}
    }
    
    // doReverify pings the oldest active peer.
    func (m *manager) doReverify(done chan<- struct{}) {
    	defer close(done)
    
    	p := m.peerToReverify()
    	if p == nil {
    		return // nothing can be reverified
    	}
    	m.log.Debugw("reverifying",
    		"id", p.ID(),
    		"addr", p.Address(),
    	)
    
    	var err error
    	for i := 0; i < reverifyTries; i++ {
    		err = m.net.ping(unwrapPeer(p))
    		if err == nil {
    			break
    		} else {
    			m.log.Debugw("ping failed",
    				"id", p.ID(),
    				"addr", p.Address(),
    				"err", err,
    			)
    			time.Sleep(1 * time.Second)
    		}
    	}
    
    	// could not verify the peer
    	if err != nil {
    		m.mutex.Lock()
    		defer m.mutex.Unlock()
    
    		m.active, _ = deletePeerByID(m.active, p.ID())
    		m.log.Debugw("remove dead",
    			"peer", p,
    		)
    		Events.PeerDeleted.Trigger(&DeletedEvent{Peer: unwrapPeer(p)})
    
    		// add a random replacement, if available
    		if len(m.replacements) > 0 {
    			var r *mpeer
    			m.replacements, r = deletePeer(m.replacements, rand.Intn(len(m.replacements)))
    			m.active = pushPeer(m.active, r, maxManaged)
    		}
    		return
    	}
    
    	// no need to do anything here, as the peer is bumped when handling the pong
    }
    
    // peerToReverify returns the oldest peer, or nil if empty.
    func (m *manager) peerToReverify() *mpeer {
    	m.mutex.Lock()
    	defer m.mutex.Unlock()
    
    	if len(m.active) == 0 {
    		return nil
    	}
    	// the last peer is the oldest
    	return m.active[len(m.active)-1]
    }
    
// updatePeer moves the peer with the given ID to the front of the list of managed peers
// and increments its verified counter.
// It returns true if a peer was bumped, or false if there is no peer with that ID.
// The caller must hold m.mutex.
    func (m *manager) updatePeer(update *peer.Peer) bool {
    	id := update.ID()
    	for i, p := range m.active {
    		if p.ID() == id {
    			if i > 0 {
    				//  move i-th peer to the front
    				copy(m.active[1:], m.active[:i])
    			}
    			// replace first mpeer with a wrap of the updated peer
    			m.active[0] = &mpeer{
    				Peer:          *update,
    				verifiedCount: p.verifiedCount + 1,
    				lastNewPeers:  p.lastNewPeers,
    			}
    
    			return true
    		}
    	}
    	return false
    }
    
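// addReplacement adds the peer to the front of the replacement list, unless it is
// already contained. The caller must hold m.mutex.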
    func (m *manager) addReplacement(p *mpeer) bool {
    	if containsPeer(m.replacements, p.ID()) {
    		return false // already in the list
    	}
    	m.replacements = unshiftPeer(m.replacements, p, maxReplacements)
    	return true
    }
    
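// loadInitialPeers adds the seed peers stored in the local database (if one is
// configured) and the given master peers as newly discovered peers.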
    func (m *manager) loadInitialPeers(masters []*peer.Peer) {
    	var peers []*peer.Peer
    
    	db := m.net.local().Database()
    	if db != nil {
    		peers = db.SeedPeers()
    	}
    	peers = append(peers, masters...)
    	for _, p := range peers {
    		m.addDiscoveredPeer(p)
    	}
    }
    
// addDiscoveredPeer adds a newly discovered peer that has never been verified or pinged yet.
// It returns true if the given peer was new and added, false otherwise.
    func (m *manager) addDiscoveredPeer(p *peer.Peer) bool {
    	// never add the local peer
    	if p.ID() == m.self() {
    		return false
    	}
    
    	m.mutex.Lock()
    	defer m.mutex.Unlock()
    
    	if containsPeer(m.active, p.ID()) {
    		return false
    	}
    	m.log.Debugw("discovered",
    		"peer", p,
    	)
    
    	mp := wrapPeer(p)
    	if len(m.active) >= maxManaged {
    		return m.addReplacement(mp)
    	}
    
    	m.active = pushPeer(m.active, mp, maxManaged)
    	return true
    }
    
// addVerifiedPeer adds a new peer that has just been successfully pinged.
// It returns true if the given peer was new and added, false otherwise.
    func (m *manager) addVerifiedPeer(p *peer.Peer) bool {
    	// never add the local peer
    	if p.ID() == m.self() {
    		return false
    	}
    
    	m.log.Debugw("verified",
    		"peer", p,
    		"services", p.Services(),
    	)
    
    	m.mutex.Lock()
    	defer m.mutex.Unlock()
    
    	// if already in the list, move it to the front
    	if m.updatePeer(p) {
    		return false
    	}
    
    	mp := wrapPeer(p)
    	mp.verifiedCount = 1
    
    	if len(m.active) >= maxManaged {
    		return m.addReplacement(mp)
    	}
    	// trigger the event only when the peer is added to active
    	Events.PeerDiscovered.Trigger(&DiscoveredEvent{Peer: p})
    
    	// new nodes are added to the front
    	m.active = unshiftPeer(m.active, mp, maxManaged)
    	return true
    }
    
// getRandomPeers returns up to n randomly selected active peers that have been
// verified at least minVerified times.
    func (m *manager) getRandomPeers(n int, minVerified uint) []*peer.Peer {
    	m.mutex.Lock()
    	defer m.mutex.Unlock()
    
    	if n > len(m.active) {
    		n = len(m.active)
    	}
    
    	peers := make([]*peer.Peer, 0, n)
    	for _, i := range rand.Perm(len(m.active)) {
    		if len(peers) == n {
    			break
    		}
    
    		mp := m.active[i]
    		if mp.verifiedCount < minVerified {
    			continue
    		}
    		peers = append(peers, unwrapPeer(mp))
    	}
    
    	return peers
    }
    
    // getVerifiedPeers returns all the currently managed peers that have been verified at least once.
    func (m *manager) getVerifiedPeers() []*mpeer {
    	m.mutex.Lock()
    	defer m.mutex.Unlock()
    
    	peers := make([]*mpeer, 0, len(m.active))
    	for _, mp := range m.active {
    		if mp.verifiedCount == 0 {
    			continue
    		}
    		peers = append(peers, mp)
    	}
    
    	return peers
    }
    
// isKnown returns true if the manager is keeping track of that peer; the local peer is always known.
    func (m *manager) isKnown(id peer.ID) bool {
    	if id == m.self() {
    		return true
    	}
    
    	m.mutex.Lock()
    	defer m.mutex.Unlock()
    
    	return containsPeer(m.active, id) || containsPeer(m.replacements, id)
    }
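
The select-based scheduling in loop relies on a standard Go idiom: receiving from a nil channel blocks forever, so setting a done channel to nil disables its select case while no task is in flight. The following standalone sketch is a minimal illustration of that timer-plus-done-channel pattern; it is not part of manager.go, and the helper runTask is purely illustrative:

package main

import (
	"fmt"
	"time"
)

// runTask simulates a background job that signals completion by closing done.
func runTask(done chan<- struct{}) {
	time.Sleep(10 * time.Millisecond)
	close(done)
}

func main() {
	timer := time.NewTimer(0) // a zero duration fires right away, like the reverify timer
	defer timer.Stop()

	var done chan struct{} // nil: the <-done case below is disabled

	for finished := 0; finished < 3; {
		select {
		case <-timer.C:
			if done == nil { // only start the task if it is not already running
				done = make(chan struct{})
				go runTask(done)
			}
		case <-done: // only reachable while done is non-nil, i.e. a task is running
			done = nil                        // disable this case again
			timer.Reset(5 * time.Millisecond) // schedule the next run
			finished++
			fmt.Println("tasks finished:", finished)
		}
	}
}

This mirrors the reverify/reverifyDone and query/queryNext pairs in loop, except that loop additionally drains an in-flight done channel before returning, so close never leaves a goroutine blocked.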