Unverified commit 1134f3fd authored by Luca Moser, committed by GitHub

Adds changes for v0.1.2 (#266)


* New binary tangle using atomic transactions (#239)

* Feat: started porting the new binary stuff

* Refactor: removed unnecessary folder

* Refactor: cleaned up go.mod files

* Fix: removed objectsdb files

* Feat: added transactionrequester package

* Adds the transactionParser as a filter and pre-processing mechanism of the tangle. (#242)

* Feat: started porting the new binary stuff

* Refactor: removed unnecessary folder

* Refactor: cleaned up go.mod files

* Fix: removed objectsdb files

* Feat: added transactionrequester package

* Feat: added new transactionparser as the filter for the tangle

* Feat: Use hive.go autopeering (#250)

* use autopeering from hive.go
* update hive.go

* update hive.go

* Adds the TransactionRequester and some refactors (#256)

* Feat: started porting the new binary stuff

* Refactor: removed unnecessary folder

* Refactor: cleaned up go.mod files

* Fix: removed objectsdb files

* Feat: added transactionrequester package

* Feat: added new transactionparser as the filter for the tangle

* Refactor: removed duplicate code

* Fix: Log dropping packets every 1000 drops (#255)

* Fix: Log dropping packets every 1000 drops

* use an atomic counter for dropped messages because Write() could be called concurrently (see the sketch below)

* removes redundant zero

* Add external check in test
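As an illustration of the fix above, here is a minimal, self-contained sketch of the pattern, using hypothetical names (droppingWriter, dropLogInterval) rather than GoShimmer's actual types: Write() can be called concurrently, so the drop counter is updated atomically and a log line is emitted only on every 1000th drop.

package main

import (
	"fmt"
	"sync/atomic"
)

const dropLogInterval = 1000 // log only every 1000th dropped packet

// droppingWriter is a hypothetical stand-in for the plugin's writer;
// Write may be called concurrently, so the drop counter is atomic.
type droppingWriter struct {
	queue   chan []byte
	dropped uint64 // accessed atomically
}

func (w *droppingWriter) Write(p []byte) (int, error) {
	select {
	case w.queue <- p:
	default:
		// queue full: drop the packet, log every dropLogInterval drops
		if n := atomic.AddUint64(&w.dropped, 1); n%dropLogInterval == 0 {
			fmt.Printf("dropped %d packets\n", n)
		}
	}
	return len(p), nil
}

func main() {
	w := &droppingWriter{queue: make(chan []byte, 1)}
	for i := 0; i < 2500; i++ {
		_, _ = w.Write([]byte("msg"))
	}
}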

Co-authored-by: Luca Moser <moser.luca@gmail.com>

* Fix: Update pprof port (#244)

Since both Hornet and Goshimmer might be running on the same machine, update the port so that we can get debug information for both.

* Remove docker specific config (#245)

* :arrow_up: upgrades hive.go

* :bug: checks meta_tx size

* :heavy_minus_sign: removes dependencies

* :sparkles: enables local testing

* :sparkles: adds config plugin

* Extend remote log message (#260)

* Bump up Logstash and ElasticSearch memory #251

* Add version and if available GIT information to remote log message #251

* :bug: fixes flag parsing before loading config file

* Feat: Add --version to cli (#264)

* Adds changelog entry for v0.1.2 and bumps version number (#265)

* updates changelog for v0.1.2 release

* bumps version number to v0.1.2

Co-authored-by: Hans Moog <hm@mkjc.net>
Co-authored-by: Wolfgang Welz <welzwo@gmail.com>
Co-authored-by: jkrvivian <jkrvivian@gmail.com>
Co-authored-by: Dave <44786846+centercirclesolutions@users.noreply.github.com>
Co-authored-by: Angelo Capossele <angelocapossele@gmail.com>
Co-authored-by: Jonas Theis <mail@jonastheis.de>
@@ -21,6 +21,7 @@ testNodes/*
# Database directory
mainnetdb/
objectsdb/
# OSX related files
.DS_Store
# v0.1.2 - 2020-02-24
* Adds `--version` flag to retrieve the GoShimmer version
* Adds the version and commit hash to the remote log messages
* Replaces the autopeering module with the one from hive.go
* Changes the pprof listen port to `6061` to avoid conflicts with Hornet
* Fixes `invalid stored peer` messages
* Fixes master nodes getting removed if they were offline
* Fixes the `-c` and `-d` flags for defining the config file/directory
* Fixes drop messages about full queues appearing too many times
* Fixes a crash due to an incompatible transaction size
* Changes the salt lifetime from 30 minutes to 2 hours
# v0.1.1 - 2020-02-07
This release contains a series of fixes:
@@ -27,11 +27,10 @@ VOLUME /app/mainnetdb
EXPOSE 14666/tcp
EXPOSE 14626/udp
EXPOSE 14626/tcp
# Copy the Pre-built binary file from the previous stage
COPY --from=build /go/bin/goshimmer .
# Copy the docker config
COPY docker.config.json config.json
COPY config.json config.json
ENTRYPOINT ["./goshimmer"]
{
"analysis": {
"serveraddress": "ressims.iota.cafe:188",
"serverport": 0
},
"autopeering": {
"address": "0.0.0.0",
"entrynodes": [
"V8LYtWWcPYYDTTXLeIEFjJEuWlsjDiI0+Pq/Cx9ai6g=@116.202.49.178:14626"
],
"port": 14626
},
"database": {
"directory": "mainnetdb"
},
"gossip": {
"port": 14666
},
"logger": {
"Level": "info",
"DisableCaller": true,
"DisableStacktrace": false,
"Encoding": "console",
"OutputPaths": [
"stdout",
"goshimmer.log"
],
"DisableEvents": true
},
"node": {
"disablePlugins": [],
"enablePlugins": []
}
}
@@ -13,31 +13,36 @@ require (
github.com/googollee/go-engine.io v1.4.3-0.20190924125625-798118fc0dd2
github.com/googollee/go-socket.io v1.4.3-0.20191204093753-683f8725b6d0
github.com/gorilla/websocket v1.4.1
github.com/iotaledger/hive.go v0.0.0-20200207144536-27b18f10f09e
github.com/iotaledger/hive.go v0.0.0-20200219224037-2d5f5238c0de
github.com/iotaledger/iota.go v1.0.0-beta.14
github.com/kr/pretty v0.2.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/labstack/echo v3.3.10+incompatible
github.com/labstack/gommon v0.3.0 // indirect
github.com/magiconair/properties v1.8.1
github.com/mattn/go-colorable v0.1.4 // indirect
github.com/mattn/go-isatty v0.0.11 // indirect
github.com/mr-tron/base58 v1.1.3
github.com/oasislabs/ed25519 v0.0.0-20200206134218-2893bee822a3
github.com/panjf2000/ants/v2 v2.2.2
github.com/pelletier/go-toml v1.6.0 // indirect
github.com/pkg/errors v0.9.1
github.com/rogpeppe/go-internal v1.5.2 // indirect
github.com/sergi/go-diff v1.1.0 // indirect
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cast v1.3.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.6.1
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.4.0
github.com/valyala/fasttemplate v1.1.0 // indirect
go.uber.org/atomic v1.5.1
go.uber.org/zap v1.13.0
golang.org/x/crypto v0.0.0-20200128174031-69ecbb4d6d5d // indirect
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553
golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9 // indirect
golang.org/x/text v0.3.2 // indirect
golang.org/x/crypto v0.0.0-20200214034016-1d94cc7ab1c6
golang.org/x/net v0.0.0-20200202094626-16171245cfb2
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4 // indirect
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7 // indirect
gopkg.in/ini.v1 v1.51.1 // indirect
gopkg.in/src-d/go-git.v4 v4.13.1
gopkg.in/yaml.v2 v2.2.7 // indirect
)
@@ -29,9 +29,10 @@ import (
)
func main() {
cli.PrintVersion()
cli.LoadConfig()
go http.ListenAndServe("localhost:6060", nil) // pprof Server for Debbuging Mutexes
go http.ListenAndServe("localhost:6061", nil) // pprof Server for Debbuging Mutexes
node.Run(
node.Plugins(
package discover
import (
"time"
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/hive.go/logger"
)
// Default values for the global parameters
const (
DefaultReverifyInterval = 10 * time.Second
DefaultQueryInterval = 60 * time.Second
DefaultMaxManaged = 1000
DefaultMaxReplacements = 10
)
var (
reverifyInterval = DefaultReverifyInterval // time interval after which the next peer is reverified
queryInterval = DefaultQueryInterval // time interval after which peers are queried for new peers
maxManaged = DefaultMaxManaged // maximum number of peers that can be managed
maxReplacements = DefaultMaxReplacements // maximum number of peers kept in the replacement list
)
// Config holds discovery related settings.
type Config struct {
// These settings are required and configure the listener:
Log *logger.Logger
// These settings are optional:
MasterPeers []*peer.Peer // list of master peers used for bootstrapping
}
// Parameters holds the parameters that can be configured.
type Parameters struct {
ReverifyInterval time.Duration // time interval after which the next peer is reverified
QueryInterval time.Duration // time interval after which peers are queried for new peers
MaxManaged int // maximum number of peers that can be managed
MaxReplacements int // maximum number of peers kept in the replacement list
}
// SetParameter sets the global parameters for this package.
// This function cannot be used concurrently.
func SetParameter(param Parameters) {
if param.ReverifyInterval > 0 {
reverifyInterval = param.ReverifyInterval
} else {
reverifyInterval = DefaultReverifyInterval
}
if param.QueryInterval > 0 {
queryInterval = param.QueryInterval
} else {
queryInterval = DefaultQueryInterval
}
if param.MaxManaged > 0 {
maxManaged = param.MaxManaged
} else {
maxManaged = DefaultMaxManaged
}
if param.MaxReplacements > 0 {
maxReplacements = param.MaxReplacements
} else {
maxReplacements = DefaultMaxReplacements
}
}
package discover
import (
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/hive.go/events"
)
// Events contains all the events that are triggered during the peer discovery.
var Events = struct {
// A PeerDiscovered event is triggered when a new peer has been discovered and verified.
PeerDiscovered *events.Event
// A PeerDeleted event is triggered when a discovered and verified peer could not be reverified.
PeerDeleted *events.Event
}{
PeerDiscovered: events.NewEvent(peerDiscovered),
PeerDeleted: events.NewEvent(peerDeleted),
}
// DiscoveredEvent bundles the information of the discovered peer.
type DiscoveredEvent struct {
Peer *peer.Peer // discovered peer
}
// DeletedEvent bundles the information of the deleted peer.
type DeletedEvent struct {
Peer *peer.Peer // deleted peer
}
func peerDiscovered(handler interface{}, params ...interface{}) {
handler.(func(*DiscoveredEvent))(params[0].(*DiscoveredEvent))
}
func peerDeleted(handler interface{}, params ...interface{}) {
handler.(func(*DeletedEvent))(params[0].(*DeletedEvent))
}
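As a usage note for the events above (not part of this diff): a consumer would typically subscribe through the hive.go events API, roughly as in this sketch (imports of the discover, events, and log packages assumed):

// Sketch: reacting to discovery events from another package.
discover.Events.PeerDiscovered.Attach(events.NewClosure(func(ev *discover.DiscoveredEvent) {
	log.Printf("discovered peer %s", ev.Peer.ID())
}))
discover.Events.PeerDeleted.Attach(events.NewClosure(func(ev *discover.DeletedEvent) {
	log.Printf("deleted peer %s", ev.Peer.ID())
}))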
package discover
import (
"math/rand"
"sync"
"time"
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/autopeering/server"
"github.com/iotaledger/hive.go/logger"
)
const (
// PingExpiration is the time until a peer verification expires.
PingExpiration = 12 * time.Hour
// MaxPeersInResponse is the maximum number of peers returned in DiscoveryResponse.
MaxPeersInResponse = 6
// MaxServices is the maximum number of services a peer can support.
MaxServices = 5
// VersionNum specifies the expected version number for this Protocol.
VersionNum = 0
)
type network interface {
local() *peer.Local
Ping(*peer.Peer) error
DiscoveryRequest(*peer.Peer) ([]*peer.Peer, error)
}
type manager struct {
mutex sync.Mutex // protects active and replacement
active []*mpeer
replacements []*mpeer
net network
log *logger.Logger
wg sync.WaitGroup
closing chan struct{}
}
func newManager(net network, masters []*peer.Peer, log *logger.Logger) *manager {
m := &manager{
active: make([]*mpeer, 0, maxManaged),
replacements: make([]*mpeer, 0, maxReplacements),
net: net,
log: log,
closing: make(chan struct{}),
}
m.loadInitialPeers(masters)
return m
}
func (m *manager) start() {
m.wg.Add(1)
go m.loop()
}
func (m *manager) self() peer.ID {
return m.net.local().ID()
}
func (m *manager) close() {
close(m.closing)
m.wg.Wait()
}
func (m *manager) loop() {
defer m.wg.Done()
var (
reverify = time.NewTimer(0) // setting this to 0 will cause a trigger right away
reverifyDone chan struct{}
query = time.NewTimer(server.ResponseTimeout) // trigger the first query after the reverify
queryNext chan time.Duration
)
defer reverify.Stop()
defer query.Stop()
Loop:
for {
select {
// start verification, if not yet running
case <-reverify.C:
// if there is no reverifyDone, this means doReverify is not running
if reverifyDone == nil {
reverifyDone = make(chan struct{})
go m.doReverify(reverifyDone)
}
// reset verification
case <-reverifyDone:
reverifyDone = nil
reverify.Reset(reverifyInterval) // reverify again after the given interval
// start requesting new peers, if not yet running
case <-query.C:
if queryNext == nil {
queryNext = make(chan time.Duration)
go m.doQuery(queryNext)
}
// on query done, reset time to given duration
case d := <-queryNext:
queryNext = nil
query.Reset(d)
// on close, exit the loop
case <-m.closing:
break Loop
}
}
// wait for spawned goroutines to finish
if reverifyDone != nil {
<-reverifyDone
}
if queryNext != nil {
<-queryNext
}
}
// doReverify pings the oldest active peer.
func (m *manager) doReverify(done chan<- struct{}) {
defer close(done)
p := m.peerToReverify()
if p == nil {
return // nothing can be reverified
}
m.log.Debugw("reverifying",
"id", p.ID(),
"addr", p.Address(),
)
// could not verify the peer
if m.net.Ping(unwrapPeer(p)) != nil {
m.mutex.Lock()
defer m.mutex.Unlock()
m.active, _ = deletePeerByID(m.active, p.ID())
m.log.Debugw("remove dead",
"peer", p,
)
Events.PeerDeleted.Trigger(&DeletedEvent{Peer: unwrapPeer(p)})
// add a random replacement, if available
if len(m.replacements) > 0 {
var r *mpeer
m.replacements, r = deletePeer(m.replacements, rand.Intn(len(m.replacements)))
m.active = pushPeer(m.active, r, maxManaged)
}
return
}
// no need to do anything here, as the peer is bumped when handling the pong
}
// peerToReverify returns the oldest peer, or nil if empty.
func (m *manager) peerToReverify() *mpeer {
m.mutex.Lock()
defer m.mutex.Unlock()
if len(m.active) == 0 {
return nil
}
// the last peer is the oldest
return m.active[len(m.active)-1]
}
// updatePeer moves the peer with the given ID to the front of the list of managed peers.
// It returns 0 if there was no peer with that id, otherwise the verifiedCount of the updated peer is returned.
func (m *manager) updatePeer(update *peer.Peer) uint {
id := update.ID()
for i, p := range m.active {
if p.ID() == id {
if i > 0 {
// move i-th peer to the front
copy(m.active[1:], m.active[:i])
}
// replace first mpeer with a wrap of the updated peer
m.active[0] = &mpeer{
Peer: *update,
verifiedCount: p.verifiedCount + 1,
lastNewPeers: p.lastNewPeers,
}
return p.verifiedCount + 1
}
}
return 0
}
func (m *manager) addReplacement(p *mpeer) bool {
if containsPeer(m.replacements, p.ID()) {
return false // already in the list
}
m.replacements = unshiftPeer(m.replacements, p, maxReplacements)
return true
}
func (m *manager) loadInitialPeers(masters []*peer.Peer) {
var peers []*peer.Peer
db := m.net.local().Database()
if db != nil {
peers = db.SeedPeers()
}
peers = append(peers, masters...)
for _, p := range peers {
m.addDiscoveredPeer(p)
}
}
// addDiscoveredPeer adds a newly discovered peer that has never been verified or pinged yet.
// It returns true, if the given peer was new and added, false otherwise.
func (m *manager) addDiscoveredPeer(p *peer.Peer) bool {
// never add the local peer
if p.ID() == m.self() {
return false
}
m.mutex.Lock()
defer m.mutex.Unlock()
if containsPeer(m.active, p.ID()) {
return false
}
m.log.Debugw("discovered",
"peer", p,
)
mp := wrapPeer(p)
if len(m.active) >= maxManaged {
return m.addReplacement(mp)
}
m.active = pushPeer(m.active, mp, maxManaged)
return true
}
// addVerifiedPeer adds a new peer that has just been successfully pinged.
// It returns true, if the given peer was new and added, false otherwise.
func (m *manager) addVerifiedPeer(p *peer.Peer) bool {
// never add the local peer
if p.ID() == m.self() {
return false
}
m.log.Debugw("verified",
"peer", p,
"services", p.Services(),
)
m.mutex.Lock()
defer m.mutex.Unlock()
// if already in the list, move it to the front
if v := m.updatePeer(p); v > 0 {
// trigger the event only for the first time the peer is updated
if v == 1 {
Events.PeerDiscovered.Trigger(&DiscoveredEvent{Peer: p})
}
return false
}
mp := wrapPeer(p)
mp.verifiedCount = 1
if len(m.active) >= maxManaged {
return m.addReplacement(mp)
}
// trigger the event only when the peer is added to active
Events.PeerDiscovered.Trigger(&DiscoveredEvent{Peer: p})
// new nodes are added to the front
m.active = unshiftPeer(m.active, mp, maxManaged)
return true
}
// getRandomPeers returns a list of randomly selected peers.
func (m *manager) getRandomPeers(n int, minVerified uint) []*peer.Peer {
m.mutex.Lock()
defer m.mutex.Unlock()
if n > len(m.active) {
n = len(m.active)
}
peers := make([]*peer.Peer, 0, n)
for _, i := range rand.Perm(len(m.active)) {
if len(peers) == n {
break
}
mp := m.active[i]
if mp.verifiedCount < minVerified {
continue
}
peers = append(peers, unwrapPeer(mp))
}
return peers
}
// getVerifiedPeers returns all the currently managed peers that have been verified at least once.
func (m *manager) getVerifiedPeers() []*mpeer {
m.mutex.Lock()
defer m.mutex.Unlock()
peers := make([]*mpeer, 0, len(m.active))
for _, mp := range m.active {
if mp.verifiedCount == 0 {
continue
}
peers = append(peers, mp)
}
return peers
}
// isKnown returns true if the manager is keeping track of that peer.
func (m *manager) isKnown(id peer.ID) bool {
if id == m.self() {
return true
}
m.mutex.Lock()
defer m.mutex.Unlock()
return containsPeer(m.active, id) || containsPeer(m.replacements, id)
}
package discover
import (
"fmt"
"testing"
"time"
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/autopeering/peer/peertest"
"github.com/iotaledger/goshimmer/packages/autopeering/server"
"github.com/iotaledger/goshimmer/packages/database/mapdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestMgrClose(t *testing.T) {
_, _, teardown := newManagerTest(t)
defer teardown()
time.Sleep(graceTime)
}
func TestMgrVerifyDiscoveredPeer(t *testing.T) {
mgr, m, teardown := newManagerTest(t)
defer teardown()
p := peertest.NewPeer(testNetwork, "p")
// expect Ping of peer p
m.On("Ping", p).Return(nil).Once()
// ignore DiscoveryRequest calls
m.On("DiscoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe()
// let the manager initialize
time.Sleep(graceTime)
mgr.addDiscoveredPeer(p)
mgr.doReverify(make(chan struct{})) // manually trigger a verify
m.AssertExpectations(t)
}
func TestMgrReverifyPeer(t *testing.T) {
mgr, m, teardown := newManagerTest(t)
defer teardown()
p := peertest.NewPeer(testNetwork, "p")
// expect Ping of peer p
m.On("Ping", p).Return(nil).Once()
// ignore DiscoveryRequest calls
m.On("DiscoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe()
// let the manager initialize
time.Sleep(graceTime)
mgr.addVerifiedPeer(p)
mgr.doReverify(make(chan struct{})) // manually trigger a verify
m.AssertExpectations(t)
}
func TestMgrRequestDiscoveredPeer(t *testing.T) {
mgr, m, teardown := newManagerTest(t)
defer teardown()
p1 := peertest.NewPeer(testNetwork, "verified")
p2 := peertest.NewPeer(testNetwork, "discovered")
// expect DiscoveryRequest on the discovered peer
m.On("DiscoveryRequest", p1).Return([]*peer.Peer{p2}, nil).Once()
// ignore any Ping
m.On("Ping", mock.Anything).Return(nil).Maybe()
mgr.addVerifiedPeer(p1)
mgr.addDiscoveredPeer(p2)
mgr.doQuery(make(chan time.Duration, 1)) // manually trigger a query
m.AssertExpectations(t)
}
func TestMgrAddManyVerifiedPeers(t *testing.T) {
mgr, m, teardown := newManagerTest(t)
defer teardown()
p := peertest.NewPeer(testNetwork, "p")
// expect Ping of peer p
m.On("Ping", p).Return(nil).Once()
// ignore DiscoveryRequest calls
m.On("DiscoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe()
// let the manager initialize
time.Sleep(graceTime)
mgr.addVerifiedPeer(p)
for i := 0; i < maxManaged+maxReplacements; i++ {
mgr.addVerifiedPeer(peertest.NewPeer(testNetwork, fmt.Sprintf("p%d", i)))
}
mgr.doReverify(make(chan struct{})) // manually trigger a verify
ps := unwrapPeers(mgr.getVerifiedPeers())
assert.Equal(t, maxManaged, len(ps))
assert.Contains(t, ps, p)
m.AssertExpectations(t)
}
func TestMgrDeleteUnreachablePeer(t *testing.T) {
mgr, m, teardown := newManagerTest(t)
defer teardown()
p := peertest.NewPeer(testNetwork, "p")
// expect Ping of peer p, but return error
m.On("Ping", p).Return(server.ErrTimeout).Times(1)
// ignore DiscoveryRequest calls
m.On("DiscoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe()
// let the manager initialize
time.Sleep(graceTime)
mgr.addVerifiedPeer(p)
for i := 0; i < maxManaged; i++ {
mgr.addVerifiedPeer(peertest.NewPeer(testNetwork, fmt.Sprintf("p%d", i)))
}
mgr.doReverify(make(chan struct{})) // manually trigger a verify
ps := unwrapPeers(mgr.getVerifiedPeers())
assert.Equal(t, maxManaged, len(ps))
assert.NotContains(t, ps, p)
m.AssertExpectations(t)
}
type NetworkMock struct {
mock.Mock
loc *peer.Local
}
func newManagerTest(t require.TestingT) (*manager, *NetworkMock, func()) {
db, err := peer.NewDB(mapdb.NewMapDB())
require.NoError(t, err)
local := peertest.NewLocal(testNetwork, testAddress, db)
networkMock := &NetworkMock{
loc: local,
}
mgr := newManager(networkMock, nil, log)
return mgr, networkMock, mgr.close
}
func (m *NetworkMock) local() *peer.Local {
return m.loc
}
func (m *NetworkMock) Ping(p *peer.Peer) error {
args := m.Called(p)
return args.Error(0)
}
func (m *NetworkMock) DiscoveryRequest(p *peer.Peer) ([]*peer.Peer, error) {
args := m.Called(p)
return args.Get(0).([]*peer.Peer), args.Error(1)
}
package discover
import (
"fmt"
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
)
// mpeer represents a discovered peer with additional data.
// The fields of Peer may not be modified.
type mpeer struct {
peer.Peer
verifiedCount uint // how often that peer has been reverified
lastNewPeers uint // number of returned new peers when queried the last time
}
func wrapPeer(p *peer.Peer) *mpeer {
return &mpeer{Peer: *p}
}
func unwrapPeer(p *mpeer) *peer.Peer {
return &p.Peer
}
func unwrapPeers(ps []*mpeer) []*peer.Peer {
result := make([]*peer.Peer, len(ps))
for i, n := range ps {
result[i] = unwrapPeer(n)
}
return result
}
// containsPeer returns true if a peer with the given ID is in the list.
func containsPeer(list []*mpeer, id peer.ID) bool {
for _, p := range list {
if p.ID() == id {
return true
}
}
return false
}
// unshiftPeer adds a new peer to the front of the list.
// If the list already contains max peers, the last is discarded.
func unshiftPeer(list []*mpeer, p *mpeer, max int) []*mpeer {
if len(list) > max {
panic(fmt.Sprintf("mpeer: invalid max value %d", max))
}
if len(list) < max {
list = append(list, nil)
}
copy(list[1:], list)
list[0] = p
return list
}
// deletePeer is a helper that deletes the peer with the given index from the list.
func deletePeer(list []*mpeer, i int) ([]*mpeer, *mpeer) {
if i >= len(list) {
panic("mpeer: invalid index or empty mpeer list")
}
p := list[i]
copy(list[i:], list[i+1:])
list[len(list)-1] = nil
return list[:len(list)-1], p
}
// deletePeerByID deletes the peer with the given ID from the list.
func deletePeerByID(list []*mpeer, id peer.ID) ([]*mpeer, *mpeer) {
for i, p := range list {
if p.ID() == id {
return deletePeer(list, i)
}
}
panic("mpeer: id not contained in list")
}
// pushPeer adds the given peer to the back of the list.
// If the list already contains max peers, the first is discarded.
func pushPeer(list []*mpeer, p *mpeer, max int) []*mpeer {
if len(list) > max {
panic(fmt.Sprintf("mpeer: invalid max value %d", max))
}
if len(list) == max {
copy(list, list[1:])
list[len(list)-1] = p
return list
}
return append(list, p)
}
package discover
import (
"fmt"
"testing"
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/autopeering/peer/peertest"
"github.com/stretchr/testify/assert"
)
func TestUnwrapPeers(t *testing.T) {
m := make([]*mpeer, 5)
p := make([]*peer.Peer, 5)
for i := range m {
p[i] = peertest.NewPeer(testNetwork, fmt.Sprintf("%d", i))
m[i] = &mpeer{Peer: *p[i]}
}
unwrapP := unwrapPeers(m)
assert.Equal(t, p, unwrapP, "unwrapPeers")
}
func TestContainsPeer(t *testing.T) {
m := make([]*mpeer, 5)
p := make([]*peer.Peer, 5)
k := peertest.NewPeer(testNetwork, "k")
for i := range m {
p[i] = peertest.NewPeer(testNetwork, fmt.Sprintf("%d", i))
m[i] = &mpeer{Peer: *p[i]}
}
for i := range m {
assert.Equal(t, true, containsPeer(m, p[i].ID()), "Contains")
}
assert.Equal(t, false, containsPeer(m, k.ID()), "Contains")
}
func TestUnshiftPeer(t *testing.T) {
m := make([]*mpeer, 5)
for i := range m {
m[i] = &mpeer{Peer: *peertest.NewPeer(testNetwork, fmt.Sprintf("%d", i))}
}
type testCase struct {
input []*mpeer
toAdd *mpeer
expected []*mpeer
}
tests := []testCase{
{
input: []*mpeer{},
toAdd: m[0],
expected: []*mpeer{m[0]},
},
{
input: []*mpeer{m[0]},
toAdd: m[1],
expected: []*mpeer{m[1], m[0]},
},
{
input: []*mpeer{m[0], m[1]},
toAdd: m[2],
expected: []*mpeer{m[2], m[0], m[1]},
},
{
input: []*mpeer{m[0], m[1], m[2], m[3]},
toAdd: m[4],
expected: []*mpeer{m[4], m[0], m[1], m[2]},
},
}
for _, test := range tests {
test.input = unshiftPeer(test.input, test.toAdd, len(m)-1)
assert.Equal(t, test.expected, test.input, "unshiftPeer")
}
}
func TestDeletePeer(t *testing.T) {
m := make([]*mpeer, 5)
for i := range m {
m[i] = &mpeer{Peer: *peertest.NewPeer(testNetwork, fmt.Sprintf("%d", i))}
}
type testCase struct {
input []*mpeer
toRemove int
expected []*mpeer
deleted *mpeer
}
tests := []testCase{
{
input: []*mpeer{m[0]},
toRemove: 0,
expected: []*mpeer{},
deleted: m[0],
},
{
input: []*mpeer{m[0], m[1], m[2], m[3]},
toRemove: 2,
expected: []*mpeer{m[0], m[1], m[3]},
deleted: m[2],
},
}
for _, test := range tests {
var deleted *mpeer
test.input, deleted = deletePeer(test.input, test.toRemove)
assert.Equal(t, test.expected, test.input, "deletePeer_list")
assert.Equal(t, test.deleted, deleted, "deletePeer_peer")
}
}
func TestDeletePeerByID(t *testing.T) {
m := make([]*mpeer, 5)
p := make([]*peer.Peer, 5)
for i := range m {
p[i] = peertest.NewPeer(testNetwork, fmt.Sprintf("%d", i))
m[i] = &mpeer{Peer: *p[i]}
}
type testCase struct {
input []*mpeer
toRemove peer.ID
expected []*mpeer
deleted *mpeer
}
tests := []testCase{
{
input: []*mpeer{m[0]},
toRemove: p[0].ID(),
expected: []*mpeer{},
deleted: m[0],
},
{
input: []*mpeer{m[0], m[1], m[2], m[3]},
toRemove: p[2].ID(),
expected: []*mpeer{m[0], m[1], m[3]},
deleted: m[2],
},
}
for _, test := range tests {
var deleted *mpeer
test.input, deleted = deletePeerByID(test.input, test.toRemove)
assert.Equal(t, test.expected, test.input, "deletePeerByID_list")
assert.Equal(t, test.deleted, deleted, "deletePeerByID_peer")
}
}
func TestPushPeer(t *testing.T) {
m := make([]*mpeer, 5)
max := len(m) - 1
for i := range m {
m[i] = &mpeer{Peer: *peertest.NewPeer(testNetwork, fmt.Sprintf("%d", i))}
}
type testCase struct {
input []*mpeer
toPush *mpeer
expected []*mpeer
}
tests := []testCase{
{
input: []*mpeer{},
toPush: m[0],
expected: []*mpeer{m[0]},
},
{
input: []*mpeer{m[0], m[1]},
toPush: m[2],
expected: []*mpeer{m[0], m[1], m[2]},
},
{
input: []*mpeer{m[0], m[1], m[2], m[3]},
toPush: m[4],
expected: []*mpeer{m[1], m[2], m[3], m[4]},
},
}
for _, test := range tests {
test.input = pushPeer(test.input, test.toPush, max)
assert.Equal(t, test.expected, test.input, "pushPeer")
}
}
package proto
import (
"github.com/golang/protobuf/proto"
"github.com/iotaledger/goshimmer/packages/autopeering/server"
)
// MType is the type of message type enum.
type MType = server.MType
// An enum for the different message types.
const (
MPing MType = 10 + iota
MPong
MDiscoveryRequest
MDiscoveryResponse
)
// Message extends the proto.Message interface with additional util functions.
type Message interface {
proto.Message
// Name returns the name of the corresponding message type for debugging.
Name() string
// Type returns the type of the corresponding message as an enum.
Type() MType
}
func (m *Ping) Name() string { return "PING" }
func (m *Ping) Type() MType { return MPing }
func (m *Pong) Name() string { return "PONG" }
func (m *Pong) Type() MType { return MPong }
func (m *DiscoveryRequest) Name() string { return "DISCOVERY_REQUEST" }
func (m *DiscoveryRequest) Type() MType { return MDiscoveryRequest }
func (m *DiscoveryResponse) Name() string { return "DISCOVERY_RESPONSE" }
func (m *DiscoveryResponse) Type() MType { return MDiscoveryResponse }
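For context, these types imply a simple wire framing: one leading type byte, then the protobuf payload; marshal() and HandleMessage() in protocol.go below implement exactly this. A minimal encode/decode sketch, written from the discover package's point of view (pb is the alias it uses for this proto package):

// Encode: prepend the message type byte to the protobuf payload.
ping := &pb.Ping{Version: VersionNum, From: "a:14626", To: "b:14626"}
payload, err := proto.Marshal(ping)
if err != nil {
	panic(err)
}
packet := append([]byte{byte(ping.Type())}, payload...)

// Decode: dispatch on the first byte and unmarshal the rest.
switch pb.MType(packet[0]) {
case pb.MPing:
	m := new(pb.Ping)
	_ = proto.Unmarshal(packet[1:], m)
}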
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: discover/proto/message.proto
package proto
import (
fmt "fmt"
math "math"
proto "github.com/golang/protobuf/proto"
proto2 "github.com/iotaledger/goshimmer/packages/autopeering/peer/proto"
proto1 "github.com/iotaledger/goshimmer/packages/autopeering/peer/service/proto"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type Ping struct {
// protocol version number
Version uint32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
// string form of the return address (e.g. "192.0.2.1:25", "[2001:db8::1]:80")
From string `protobuf:"bytes,2,opt,name=from,proto3" json:"from,omitempty"`
// string form of the recipient address
To string `protobuf:"bytes,3,opt,name=to,proto3" json:"to,omitempty"`
// unix time
Timestamp int64 `protobuf:"varint,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Ping) Reset() { *m = Ping{} }
func (m *Ping) String() string { return proto.CompactTextString(m) }
func (*Ping) ProtoMessage() {}
func (*Ping) Descriptor() ([]byte, []int) {
return fileDescriptor_43f14146485f66eb, []int{0}
}
func (m *Ping) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Ping.Unmarshal(m, b)
}
func (m *Ping) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Ping.Marshal(b, m, deterministic)
}
func (m *Ping) XXX_Merge(src proto.Message) {
xxx_messageInfo_Ping.Merge(m, src)
}
func (m *Ping) XXX_Size() int {
return xxx_messageInfo_Ping.Size(m)
}
func (m *Ping) XXX_DiscardUnknown() {
xxx_messageInfo_Ping.DiscardUnknown(m)
}
var xxx_messageInfo_Ping proto.InternalMessageInfo
func (m *Ping) GetVersion() uint32 {
if m != nil {
return m.Version
}
return 0
}
func (m *Ping) GetFrom() string {
if m != nil {
return m.From
}
return ""
}
func (m *Ping) GetTo() string {
if m != nil {
return m.To
}
return ""
}
func (m *Ping) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
return 0
}
type Pong struct {
// hash of the ping packet
PingHash []byte `protobuf:"bytes,1,opt,name=ping_hash,json=pingHash,proto3" json:"ping_hash,omitempty"`
// string form of the recipient address
To string `protobuf:"bytes,2,opt,name=to,proto3" json:"to,omitempty"`
// services supported by the sender
Services *proto1.ServiceMap `protobuf:"bytes,3,opt,name=services,proto3" json:"services,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Pong) Reset() { *m = Pong{} }
func (m *Pong) String() string { return proto.CompactTextString(m) }
func (*Pong) ProtoMessage() {}
func (*Pong) Descriptor() ([]byte, []int) {
return fileDescriptor_43f14146485f66eb, []int{1}
}
func (m *Pong) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Pong.Unmarshal(m, b)
}
func (m *Pong) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Pong.Marshal(b, m, deterministic)
}
func (m *Pong) XXX_Merge(src proto.Message) {
xxx_messageInfo_Pong.Merge(m, src)
}
func (m *Pong) XXX_Size() int {
return xxx_messageInfo_Pong.Size(m)
}
func (m *Pong) XXX_DiscardUnknown() {
xxx_messageInfo_Pong.DiscardUnknown(m)
}
var xxx_messageInfo_Pong proto.InternalMessageInfo
func (m *Pong) GetPingHash() []byte {
if m != nil {
return m.PingHash
}
return nil
}
func (m *Pong) GetTo() string {
if m != nil {
return m.To
}
return ""
}
func (m *Pong) GetServices() *proto1.ServiceMap {
if m != nil {
return m.Services
}
return nil
}
type DiscoveryRequest struct {
// string form of the recipient address
To string `protobuf:"bytes,1,opt,name=to,proto3" json:"to,omitempty"`
// unix time
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *DiscoveryRequest) Reset() { *m = DiscoveryRequest{} }
func (m *DiscoveryRequest) String() string { return proto.CompactTextString(m) }
func (*DiscoveryRequest) ProtoMessage() {}
func (*DiscoveryRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_43f14146485f66eb, []int{2}
}
func (m *DiscoveryRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DiscoveryRequest.Unmarshal(m, b)
}
func (m *DiscoveryRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_DiscoveryRequest.Marshal(b, m, deterministic)
}
func (m *DiscoveryRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_DiscoveryRequest.Merge(m, src)
}
func (m *DiscoveryRequest) XXX_Size() int {
return xxx_messageInfo_DiscoveryRequest.Size(m)
}
func (m *DiscoveryRequest) XXX_DiscardUnknown() {
xxx_messageInfo_DiscoveryRequest.DiscardUnknown(m)
}
var xxx_messageInfo_DiscoveryRequest proto.InternalMessageInfo
func (m *DiscoveryRequest) GetTo() string {
if m != nil {
return m.To
}
return ""
}
func (m *DiscoveryRequest) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
return 0
}
type DiscoveryResponse struct {
// hash of the corresponding request
ReqHash []byte `protobuf:"bytes,1,opt,name=req_hash,json=reqHash,proto3" json:"req_hash,omitempty"`
// list of peers
Peers []*proto2.Peer `protobuf:"bytes,2,rep,name=peers,proto3" json:"peers,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *DiscoveryResponse) Reset() { *m = DiscoveryResponse{} }
func (m *DiscoveryResponse) String() string { return proto.CompactTextString(m) }
func (*DiscoveryResponse) ProtoMessage() {}
func (*DiscoveryResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_43f14146485f66eb, []int{3}
}
func (m *DiscoveryResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DiscoveryResponse.Unmarshal(m, b)
}
func (m *DiscoveryResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_DiscoveryResponse.Marshal(b, m, deterministic)
}
func (m *DiscoveryResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_DiscoveryResponse.Merge(m, src)
}
func (m *DiscoveryResponse) XXX_Size() int {
return xxx_messageInfo_DiscoveryResponse.Size(m)
}
func (m *DiscoveryResponse) XXX_DiscardUnknown() {
xxx_messageInfo_DiscoveryResponse.DiscardUnknown(m)
}
var xxx_messageInfo_DiscoveryResponse proto.InternalMessageInfo
func (m *DiscoveryResponse) GetReqHash() []byte {
if m != nil {
return m.ReqHash
}
return nil
}
func (m *DiscoveryResponse) GetPeers() []*proto2.Peer {
if m != nil {
return m.Peers
}
return nil
}
func init() {
proto.RegisterType((*Ping)(nil), "proto.Ping")
proto.RegisterType((*Pong)(nil), "proto.Pong")
proto.RegisterType((*DiscoveryRequest)(nil), "proto.DiscoveryRequest")
proto.RegisterType((*DiscoveryResponse)(nil), "proto.DiscoveryResponse")
}
func init() { proto.RegisterFile("discover/proto/message.proto", fileDescriptor_43f14146485f66eb) }
var fileDescriptor_43f14146485f66eb = []byte{
// 309 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x6c, 0x91, 0xc1, 0x4b, 0xc3, 0x30,
0x14, 0xc6, 0x59, 0xd7, 0xb9, 0xf6, 0x4d, 0xc5, 0x05, 0x84, 0x3a, 0x77, 0x98, 0x3b, 0x79, 0x59,
0x0b, 0x2a, 0x9e, 0x45, 0x3c, 0x78, 0x11, 0x66, 0xbc, 0x79, 0x91, 0xb4, 0x8b, 0x6d, 0xc0, 0x36,
0x59, 0x93, 0x0e, 0xfc, 0xef, 0x7d, 0x4d, 0x63, 0x9d, 0xe2, 0x29, 0xef, 0x7d, 0xef, 0xf1, 0x7d,
0xbf, 0x24, 0x30, 0xdf, 0x08, 0x9d, 0xc9, 0x1d, 0xaf, 0x13, 0x55, 0x4b, 0x23, 0x93, 0x92, 0x6b,
0xcd, 0x72, 0x1e, 0xdb, 0x8e, 0x8c, 0xec, 0x31, 0x3b, 0x55, 0xbc, 0x5f, 0x68, 0xcb, 0x6e, 0x3a,
0x5b, 0x58, 0x59, 0xf3, 0x7a, 0x27, 0x32, 0xee, 0xc6, 0xae, 0xeb, 0x36, 0x96, 0x29, 0xf8, 0x6b,
0x51, 0xe5, 0x24, 0x82, 0x31, 0x46, 0x68, 0x21, 0xab, 0x68, 0xb0, 0x18, 0x5c, 0x1e, 0xd1, 0xef,
0x96, 0x10, 0xf0, 0xdf, 0x6b, 0x59, 0x46, 0x1e, 0xca, 0x21, 0xb5, 0x35, 0x39, 0x06, 0xcf, 0xc8,
0x68, 0x68, 0x15, 0xac, 0xc8, 0x1c, 0x42, 0x23, 0x10, 0xcc, 0xb0, 0x52, 0x45, 0x3e, 0xca, 0x43,
0xfa, 0x23, 0xd8, 0x0c, 0x89, 0x19, 0xe7, 0x10, 0x2a, 0xcc, 0x7a, 0x2b, 0x98, 0x2e, 0x6c, 0xca,
0x21, 0x0d, 0x5a, 0xe1, 0x11, 0x7b, 0x67, 0xe9, 0xf5, 0x96, 0x2b, 0x08, 0x1c, 0xa9, 0xb6, 0x41,
0x93, 0xab, 0x69, 0x87, 0x1c, 0xbf, 0x74, 0xf2, 0x13, 0x53, 0xb4, 0x5f, 0x59, 0xde, 0xc1, 0xc9,
0x83, 0x7b, 0xa7, 0x4f, 0xca, 0xb7, 0x0d, 0x46, 0x3b, 0xcb, 0xc1, 0xff, 0x94, 0xde, 0x5f, 0xca,
0x67, 0x98, 0xee, 0x39, 0x68, 0x25, 0x2b, 0xcd, 0xc9, 0x19, 0x04, 0x35, 0xdf, 0xee, 0x13, 0x8f,
0xb1, 0xb7, 0xc0, 0x17, 0x30, 0x6a, 0x5f, 0x57, 0xa3, 0xd3, 0x10, 0xe9, 0x26, 0x8e, 0x6e, 0x8d,
0x1a, 0xed, 0x26, 0xf7, 0xb7, 0xaf, 0x37, 0xb9, 0x30, 0x45, 0x93, 0xc6, 0x99, 0x2c, 0x13, 0x21,
0x0d, 0xfb, 0xe0, 0x9b, 0x1c, 0x7f, 0x84, 0x35, 0x46, 0xb6, 0x2b, 0x78, 0xf9, 0x95, 0x16, 0x65,
0xf2, 0xfb, 0x8b, 0xd3, 0x03, 0x7b, 0x5c, 0x7f, 0x05, 0x00, 0x00, 0xff, 0xff, 0x4d, 0xa6, 0x1f,
0xce, 0xfb, 0x01, 0x00, 0x00,
}
syntax = "proto3";
option go_package = "github.com/iotaledger/goshimmer/packages/autopeering/discover/proto";
package proto;
import "peer/proto/peer.proto";
import "peer/service/proto/service.proto";
message Ping {
// protocol version number
uint32 version = 1;
// string form of the return address (e.g. "192.0.2.1:25", "[2001:db8::1]:80")
string from = 2;
// string form of the recipient address
string to = 3;
// unix time
int64 timestamp = 4;
}
message Pong {
// hash of the ping packet
bytes ping_hash = 1;
// string form of the recipient address
string to = 2;
// services supported by the sender
ServiceMap services = 3;
}
message DiscoveryRequest {
// string form of the recipient address
string to = 1;
// unix time
int64 timestamp = 2;
}
message DiscoveryResponse {
// hash of the corresponding request
bytes req_hash = 1;
// list of peers
repeated Peer peers = 2;
}
package discover
import (
"bytes"
"errors"
"fmt"
"sync"
"time"
"github.com/golang/protobuf/proto"
pb "github.com/iotaledger/goshimmer/packages/autopeering/discover/proto"
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
peerpb "github.com/iotaledger/goshimmer/packages/autopeering/peer/proto"
"github.com/iotaledger/goshimmer/packages/autopeering/peer/service"
"github.com/iotaledger/goshimmer/packages/autopeering/server"
"github.com/iotaledger/hive.go/backoff"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/typeutils"
)
const (
maxRetries = 2
logSends = true
)
// policy for retrying failed network calls
var retryPolicy = backoff.ExponentialBackOff(500*time.Millisecond, 1.5).With(
backoff.Jitter(0.5), backoff.MaxRetries(maxRetries))
// The Protocol handles the peer discovery.
// It responds to incoming messages and sends its own requests when needed.
type Protocol struct {
server.Protocol
loc *peer.Local // local peer that runs the protocol
log *logger.Logger // logging
mgr *manager // the manager handles the actual peer discovery and re-verification
running *typeutils.AtomicBool
closeOnce sync.Once
}
// New creates a new discovery protocol.
func New(local *peer.Local, cfg Config) *Protocol {
p := &Protocol{
Protocol: server.Protocol{},
loc: local,
log: cfg.Log,
running: typeutils.NewAtomicBool(),
}
p.mgr = newManager(p, cfg.MasterPeers, cfg.Log.Named("mgr"))
return p
}
// Start starts the actual peer discovery over the provided Sender.
func (p *Protocol) Start(s server.Sender) {
p.Protocol.Sender = s
p.mgr.start()
p.log.Debug("discover started")
p.running.Set()
}
// Close finalizes the protocol.
func (p *Protocol) Close() {
p.closeOnce.Do(func() {
p.running.UnSet()
p.mgr.close()
})
}
// IsVerified checks whether the given peer has recently been verified.
func (p *Protocol) IsVerified(id peer.ID, addr string) bool {
return time.Since(p.loc.Database().LastPong(id, addr)) < PingExpiration
}
// EnsureVerified checks if the given peer has recently sent a Ping;
// if not, we send a Ping to trigger a verification.
func (p *Protocol) EnsureVerified(peer *peer.Peer) error {
if !p.hasVerified(peer.ID(), peer.Address()) {
if err := p.Ping(peer); err != nil {
return err
}
// Wait for them to Ping back and process our pong
time.Sleep(server.ResponseTimeout)
}
return nil
}
// GetVerifiedPeer returns the verified peer with the given ID, or nil if no such peer exists.
func (p *Protocol) GetVerifiedPeer(id peer.ID, addr string) *peer.Peer {
for _, verified := range p.mgr.getVerifiedPeers() {
if verified.ID() == id && verified.Address() == addr {
return unwrapPeer(verified)
}
}
return nil
}
// GetVerifiedPeers returns all the currently managed peers that have been verified at least once.
func (p *Protocol) GetVerifiedPeers() []*peer.Peer {
return unwrapPeers(p.mgr.getVerifiedPeers())
}
// HandleMessage responds to incoming peer discovery messages.
func (p *Protocol) HandleMessage(s *server.Server, fromAddr string, fromID peer.ID, fromKey peer.PublicKey, data []byte) (bool, error) {
if !p.running.IsSet() {
return false, nil
}
switch pb.MType(data[0]) {
// Ping
case pb.MPing:
m := new(pb.Ping)
if err := proto.Unmarshal(data[1:], m); err != nil {
return true, fmt.Errorf("invalid message: %w", err)
}
if p.validatePing(fromAddr, m) {
p.handlePing(s, fromAddr, fromID, fromKey, data)
}
// Pong
case pb.MPong:
m := new(pb.Pong)
if err := proto.Unmarshal(data[1:], m); err != nil {
return true, fmt.Errorf("invalid message: %w", err)
}
if p.validatePong(s, fromAddr, fromID, m) {
p.handlePong(fromAddr, fromID, fromKey, m)
}
// DiscoveryRequest
case pb.MDiscoveryRequest:
m := new(pb.DiscoveryRequest)
if err := proto.Unmarshal(data[1:], m); err != nil {
return true, fmt.Errorf("invalid message: %w", err)
}
if p.validateDiscoveryRequest(fromAddr, fromID, m) {
p.handleDiscoveryRequest(s, fromAddr, data)
}
// DiscoveryResponse
case pb.MDiscoveryResponse:
m := new(pb.DiscoveryResponse)
if err := proto.Unmarshal(data[1:], m); err != nil {
return true, fmt.Errorf("invalid message: %w", err)
}
p.validateDiscoveryResponse(s, fromAddr, fromID, m)
// DiscoveryResponse messages are handled in the handleReply function of the validation
default:
return false, nil
}
return true, nil
}
// local returns the associated local peer of the neighbor selection.
func (p *Protocol) local() *peer.Local {
return p.loc
}
// publicAddr returns the public address of the peering service in string representation.
func (p *Protocol) publicAddr() string {
return p.loc.Services().Get(service.PeeringKey).String()
}
// ------ message senders ------
// Ping sends a Ping to the specified peer and blocks until a reply is received or a timeout occurs.
func (p *Protocol) Ping(to *peer.Peer) error {
return backoff.Retry(retryPolicy, func() error {
err := <-p.sendPing(to.Address(), to.ID())
if err != nil && !errors.Is(err, server.ErrTimeout) {
return backoff.Permanent(err)
}
return err
})
}
// sendPing sends a Ping to the specified address and expects a matching reply.
// This method is non-blocking, but it returns a channel that can be used to query potential errors.
func (p *Protocol) sendPing(toAddr string, toID peer.ID) <-chan error {
ping := newPing(p.publicAddr(), toAddr)
data := marshal(ping)
// compute the message hash
hash := server.PacketHash(data)
hashEqual := func(m interface{}) bool {
return bytes.Equal(m.(*pb.Pong).GetPingHash(), hash)
}
p.logSend(toAddr, ping)
return p.Protocol.SendExpectingReply(toAddr, toID, data, pb.MPong, hashEqual)
}
// DiscoveryRequest requests known peers from the given target. This method blocks
// until a response is received and the peers it contains are returned.
func (p *Protocol) DiscoveryRequest(to *peer.Peer) ([]*peer.Peer, error) {
if err := p.EnsureVerified(to); err != nil {
return nil, err
}
req := newDiscoveryRequest(to.Address())
data := marshal(req)
// compute the message hash
hash := server.PacketHash(data)
peers := make([]*peer.Peer, 0, MaxPeersInResponse)
callback := func(m interface{}) bool {
res := m.(*pb.DiscoveryResponse)
if !bytes.Equal(res.GetReqHash(), hash) {
return false
}
peers = peers[:0]
for _, protoPeer := range res.GetPeers() {
if p, _ := peer.FromProto(protoPeer); p != nil {
peers = append(peers, p)
}
}
return true
}
err := backoff.Retry(retryPolicy, func() error {
p.logSend(to.Address(), req)
err := <-p.Protocol.SendExpectingReply(to.Address(), to.ID(), data, pb.MDiscoveryResponse, callback)
if err != nil && !errors.Is(err, server.ErrTimeout) {
return backoff.Permanent(err)
}
return err
})
return peers, err
}
// ------ helper functions ------
// hasVerified returns whether the given peer has recently verified the local peer.
func (p *Protocol) hasVerified(id peer.ID, addr string) bool {
return time.Since(p.loc.Database().LastPing(id, addr)) < PingExpiration
}
func (p *Protocol) logSend(toAddr string, msg pb.Message) {
if logSends {
p.log.Debugw("send message", "type", msg.Name(), "addr", toAddr)
}
}
func marshal(msg pb.Message) []byte {
mType := msg.Type()
if mType > 0xFF {
panic("invalid message")
}
data, err := proto.Marshal(msg)
if err != nil {
panic("invalid message")
}
return append([]byte{byte(mType)}, data...)
}
// newPeer creates a new peer that only has a peering service at the given address.
func newPeer(key peer.PublicKey, network string, address string) *peer.Peer {
services := service.New()
services.Update(service.PeeringKey, network, address)
return peer.NewPeer(key, services)
}
// ------ Message Constructors ------
func newPing(fromAddr string, toAddr string) *pb.Ping {
return &pb.Ping{
Version: VersionNum,
From: fromAddr,
To: toAddr,
Timestamp: time.Now().Unix(),
}
}
func newPong(toAddr string, reqData []byte, services *service.Record) *pb.Pong {
return &pb.Pong{
PingHash: server.PacketHash(reqData),
To: toAddr,
Services: services.ToProto(),
}
}
func newDiscoveryRequest(toAddr string) *pb.DiscoveryRequest {
return &pb.DiscoveryRequest{
To: toAddr,
Timestamp: time.Now().Unix(),
}
}
func newDiscoveryResponse(reqData []byte, list []*peer.Peer) *pb.DiscoveryResponse {
peers := make([]*peerpb.Peer, 0, len(list))
for _, p := range list {
peers = append(peers, p.ToProto())
}
return &pb.DiscoveryResponse{
ReqHash: server.PacketHash(reqData),
Peers: peers,
}
}
// ------ Message Handlers ------
func (p *Protocol) validatePing(fromAddr string, m *pb.Ping) bool {
// check version number
if m.GetVersion() != VersionNum {
p.log.Debugw("invalid message",
"type", m.Name(),
"version", m.GetVersion(),
"want", VersionNum,
)
return false
}
// check that From matches the packet sender address
if m.GetFrom() != fromAddr {
p.log.Debugw("invalid message",
"type", m.Name(),
"from", m.GetFrom(),
"want", fromAddr,
)
return false
}
// check that To matches the local address
if m.GetTo() != p.publicAddr() {
p.log.Debugw("invalid message",
"type", m.Name(),
"to", m.GetTo(),
"want", p.publicAddr(),
)
return false
}
// check Timestamp
if p.Protocol.IsExpired(m.GetTimestamp()) {
p.log.Debugw("invalid message",
"type", m.Name(),
"timestamp", time.Unix(m.GetTimestamp(), 0),
)
return false
}
p.log.Debugw("valid message",
"type", m.Name(),
"addr", fromAddr,
)
return true
}
func (p *Protocol) handlePing(s *server.Server, fromAddr string, fromID peer.ID, fromKey peer.PublicKey, rawData []byte) {
// create and send the pong response
pong := newPong(fromAddr, rawData, p.loc.Services().CreateRecord())
p.logSend(fromAddr, pong)
s.Send(fromAddr, marshal(pong))
// if the peer is new or expired, send a Ping to verify
if !p.IsVerified(fromID, fromAddr) {
p.sendPing(fromAddr, fromID)
} else if !p.mgr.isKnown(fromID) { // add a discovered peer to the manager if it is new
p.mgr.addDiscoveredPeer(newPeer(fromKey, s.LocalAddr().Network(), fromAddr))
}
_ = p.loc.Database().UpdateLastPing(fromID, fromAddr, time.Now())
}
func (p *Protocol) validatePong(s *server.Server, fromAddr string, fromID peer.ID, m *pb.Pong) bool {
// check that To matches the local address
if m.GetTo() != p.publicAddr() {
p.log.Debugw("invalid message",
"type", m.Name(),
"to", m.GetTo(),
"want", p.publicAddr(),
)
return false
}
// there must be a Ping waiting for this pong as a reply
if !s.IsExpectedReply(fromAddr, fromID, m.Type(), m) {
p.log.Debugw("invalid message",
"type", m.Name(),
"unexpected", fromAddr,
)
return false
}
// there must be a valid number of services
numServices := len(m.GetServices().GetMap())
if numServices <= 0 || numServices > MaxServices {
p.log.Debugw("invalid message",
"type", m.Name(),
"#peers", numServices,
)
return false
}
p.log.Debugw("valid message",
"type", m.Name(),
"addr", fromAddr,
)
return true
}
func (p *Protocol) handlePong(fromAddr string, fromID peer.ID, fromKey peer.PublicKey, m *pb.Pong) {
services, _ := service.FromProto(m.GetServices())
peering := services.Get(service.PeeringKey)
if peering == nil || peering.String() != fromAddr {
p.log.Warn("invalid services")
return
}
// create a proper peer with these services
from := peer.NewPeer(fromKey, services)
// a valid pong verifies the peer
p.mgr.addVerifiedPeer(from)
// update peer database
db := p.loc.Database()
_ = db.UpdateLastPong(fromID, fromAddr, time.Now())
_ = db.UpdatePeer(from)
}
func (p *Protocol) validateDiscoveryRequest(fromAddr string, fromID peer.ID, m *pb.DiscoveryRequest) bool {
// check that To matches the local address
if m.GetTo() != p.publicAddr() {
p.log.Debugw("invalid message",
"type", m.Name(),
"to", m.GetTo(),
"want", p.publicAddr(),
)
return false
}
// check Timestamp
if p.Protocol.IsExpired(m.GetTimestamp()) {
p.log.Debugw("invalid message",
"type", m.Name(),
"timestamp", time.Unix(m.GetTimestamp(), 0),
)
return false
}
// check whether the sender is verified
if !p.IsVerified(fromID, fromAddr) {
p.log.Debugw("invalid message",
"type", m.Name(),
"unverified", fromAddr,
)
return false
}
p.log.Debugw("valid message",
"type", m.Name(),
"addr", fromAddr,
)
return true
}
func (p *Protocol) handleDiscoveryRequest(s *server.Server, fromAddr string, rawData []byte) {
// get a random list of verified peers
peers := p.mgr.getRandomPeers(MaxPeersInResponse, 1)
res := newDiscoveryResponse(rawData, peers)
p.logSend(fromAddr, res)
s.Send(fromAddr, marshal(res))
}
func (p *Protocol) validateDiscoveryResponse(s *server.Server, fromAddr string, fromID peer.ID, m *pb.DiscoveryResponse) bool {
// there must not be too many peers
if len(m.GetPeers()) > MaxPeersInResponse {
p.log.Debugw("invalid message",
"type", m.Name(),
"#peers", len(m.GetPeers()),
)
return false
}
// there must be a request waiting for this response
if !s.IsExpectedReply(fromAddr, fromID, m.Type(), m) {
p.log.Debugw("invalid message",
"type", m.Name(),
"unexpected", fromAddr,
)
return false
}
p.log.Debugw("valid message",
"type", m.Name(),
"addr", fromAddr,
)
return true
}
package discover
import (
"testing"
"time"
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/autopeering/peer/peertest"
"github.com/iotaledger/goshimmer/packages/autopeering/peer/service"
"github.com/iotaledger/goshimmer/packages/autopeering/server"
"github.com/iotaledger/goshimmer/packages/autopeering/transport"
"github.com/iotaledger/goshimmer/packages/database/mapdb"
"github.com/iotaledger/hive.go/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
const (
testNetwork = "test"
testAddress = "test"
graceTime = 100 * time.Millisecond
)
var log = logger.NewExampleLogger("discover")
func init() {
// decrease parameters to simplify and speed up tests
SetParameter(Parameters{
ReverifyInterval: 500 * time.Millisecond,
QueryInterval: 1000 * time.Millisecond,
MaxManaged: 10,
MaxReplacements: 2,
})
}
func TestProtVerifyMaster(t *testing.T) {
p2p := transport.P2P()
defer p2p.Close()
protA, closeA := newTestProtocol(p2p.A, log)
defer closeA()
peerA := getPeer(protA)
// use peerA as master peer
protB, closeB := newTestProtocol(p2p.B, log, peerA)
time.Sleep(graceTime) // wait for the packets to ripple through the network
closeB() // close srvB to avoid race conditions when asserting
if assert.EqualValues(t, 1, len(protB.mgr.active)) {
assert.EqualValues(t, peerA, &protB.mgr.active[0].Peer)
assert.EqualValues(t, 1, protB.mgr.active[0].verifiedCount)
}
}
func TestProtPingPong(t *testing.T) {
p2p := transport.P2P()
defer p2p.Close()
protA, closeA := newTestProtocol(p2p.A, log)
defer closeA()
protB, closeB := newTestProtocol(p2p.B, log)
defer closeB()
peerA := getPeer(protA)
peerB := getPeer(protB)
// send a Ping from node A to B
t.Run("A->B", func(t *testing.T) { assert.NoError(t, protA.Ping(peerB)) })
time.Sleep(graceTime)
// send a Ping from node B to A
t.Run("B->A", func(t *testing.T) { assert.NoError(t, protB.Ping(peerA)) })
time.Sleep(graceTime)
}
func TestProtPingTimeout(t *testing.T) {
p2p := transport.P2P()
defer p2p.Close()
protA, closeA := newTestProtocol(p2p.A, log)
defer closeA()
protB, closeB := newTestProtocol(p2p.B, log)
closeB() // close the connection right away to prevent any replies
// send a Ping from node A to B
err := protA.Ping(getPeer(protB))
assert.EqualError(t, err, server.ErrTimeout.Error())
}
func TestProtVerifiedPeers(t *testing.T) {
p2p := transport.P2P()
defer p2p.Close()
protA, closeA := newTestProtocol(p2p.A, log)
defer closeA()
protB, closeB := newTestProtocol(p2p.B, log)
defer closeB()
peerB := getPeer(protB)
// send a Ping from node A to B
assert.NoError(t, protA.Ping(peerB))
time.Sleep(graceTime)
// protA should have peerB as the single verified peer
assert.ElementsMatch(t, []*peer.Peer{peerB}, protA.GetVerifiedPeers())
for _, p := range protA.GetVerifiedPeers() {
assert.Equal(t, p, protA.GetVerifiedPeer(p.ID(), p.Address()))
}
}
func TestProtVerifiedPeer(t *testing.T) {
p2p := transport.P2P()
defer p2p.Close()
protA, closeA := newTestProtocol(p2p.A, log)
defer closeA()
protB, closeB := newTestProtocol(p2p.B, log)
defer closeB()
peerA := getPeer(protA)
peerB := getPeer(protB)
// send a Ping from node A to B
assert.NoError(t, protA.Ping(peerB))
time.Sleep(graceTime)
// we should have peerB as a verified peer
assert.Equal(t, peerB, protA.GetVerifiedPeer(peerB.ID(), peerB.Address()))
// we should not have ourselves as a verified peer
assert.Nil(t, protA.GetVerifiedPeer(peerA.ID(), peerA.Address()))
// the address of peerB should match
assert.Nil(t, protA.GetVerifiedPeer(peerB.ID(), ""))
}
func TestProtDiscoveryRequest(t *testing.T) {
p2p := transport.P2P()
defer p2p.Close()
protA, closeA := newTestProtocol(p2p.A, log)
defer closeA()
protB, closeB := newTestProtocol(p2p.B, log)
defer closeB()
peerA := getPeer(protA)
peerB := getPeer(protB)
// request peers from node A
t.Run("A->B", func(t *testing.T) {
if ps, err := protA.DiscoveryRequest(peerB); assert.NoError(t, err) {
assert.ElementsMatch(t, []*peer.Peer{peerA}, ps)
}
})
// request peers from node B
t.Run("B->A", func(t *testing.T) {
if ps, err := protB.DiscoveryRequest(peerA); assert.NoError(t, err) {
assert.ElementsMatch(t, []*peer.Peer{peerB}, ps)
}
})
}
func TestProtServices(t *testing.T) {
p2p := transport.P2P()
defer p2p.Close()
protA, closeA := newTestProtocol(p2p.A, log)
defer closeA()
err := protA.local().UpdateService(service.FPCKey, "fpc", p2p.A.LocalAddr().String())
require.NoError(t, err)
peerA := getPeer(protA)
// use peerA as master peer
protB, closeB := newTestProtocol(p2p.B, log, peerA)
defer closeB()
time.Sleep(graceTime) // wait for the packets to ripple through the network
ps := protB.GetVerifiedPeers()
if assert.ElementsMatch(t, []*peer.Peer{peerA}, ps) {
assert.Equal(t, protA.local().Services(), ps[0].Services())
}
}
func TestProtDiscovery(t *testing.T) {
net := transport.NewNetwork("M", "A", "B", "C")
defer net.Close()
protM, closeM := newTestProtocol(net.GetTransport("M"), log)
defer closeM()
time.Sleep(graceTime) // wait for the master to initialize
protA, closeA := newTestProtocol(net.GetTransport("A"), log, getPeer(protM))
defer closeA()
protB, closeB := newTestProtocol(net.GetTransport("B"), log, getPeer(protM))
defer closeB()
protC, closeC := newTestProtocol(net.GetTransport("C"), log, getPeer(protM))
defer closeC()
time.Sleep(queryInterval + graceTime) // wait for the next discovery cycle
time.Sleep(reverifyInterval + graceTime) // wait for the next verification cycle
// now the full network should be discovered
assert.ElementsMatch(t, []*peer.Peer{getPeer(protA), getPeer(protB), getPeer(protC)}, protM.GetVerifiedPeers())
assert.ElementsMatch(t, []*peer.Peer{getPeer(protM), getPeer(protB), getPeer(protC)}, protA.GetVerifiedPeers())
assert.ElementsMatch(t, []*peer.Peer{getPeer(protM), getPeer(protA), getPeer(protC)}, protB.GetVerifiedPeers())
assert.ElementsMatch(t, []*peer.Peer{getPeer(protM), getPeer(protA), getPeer(protB)}, protC.GetVerifiedPeers())
}
func BenchmarkPingPong(b *testing.B) {
p2p := transport.P2P()
defer p2p.Close()
log := zap.NewNop().Sugar() // disable logging
// disable query/reverify
reverifyInterval = time.Hour
queryInterval = time.Hour
protA, closeA := newTestProtocol(p2p.A, log)
defer closeA()
protB, closeB := newTestProtocol(p2p.B, log)
defer closeB()
peerB := getPeer(protB)
// send initial Ping to ensure that every peer is verified
err := protA.Ping(peerB)
require.NoError(b, err)
time.Sleep(graceTime)
b.ResetTimer()
for n := 0; n < b.N; n++ {
// send a Ping from node A to B
_ = protA.Ping(peerB)
}
b.StopTimer()
}
func BenchmarkDiscoveryRequest(b *testing.B) {
p2p := transport.P2P()
defer p2p.Close()
log := zap.NewNop().Sugar() // disable logging
// disable query/reverify
reverifyInterval = time.Hour
queryInterval = time.Hour
protA, closeA := newTestProtocol(p2p.A, log)
defer closeA()
protB, closeB := newTestProtocol(p2p.B, log)
defer closeB()
peerB := getPeer(protB)
// send initial DiscoveryRequest to ensure that every peer is verified
_, err := protA.DiscoveryRequest(peerB)
require.NoError(b, err)
time.Sleep(graceTime)
b.ResetTimer()
for n := 0; n < b.N; n++ {
_, _ = protA.DiscoveryRequest(peerB)
}
b.StopTimer()
}
// newTestProtocol creates a new discovery server and also returns the teardown.
func newTestProtocol(trans transport.Transport, logger *logger.Logger, masters ...*peer.Peer) (*Protocol, func()) {
db, _ := peer.NewDB(mapdb.NewMapDB())
local := peertest.NewLocal(trans.LocalAddr().Network(), trans.LocalAddr().String(), db)
log := logger.Named(trans.LocalAddr().String())
prot := New(local, Config{Log: log, MasterPeers: masters})
srv := server.Serve(local, trans, log, prot)
prot.Start(srv)
teardown := func() {
srv.Close()
prot.Close()
}
return prot, teardown
}
func getPeer(p *Protocol) *peer.Peer {
return &p.local().Peer
}
package discover
import (
"container/ring"
"math/rand"
"sync"
"time"
)
// doQuery is the main method of the query strategy.
// It writes the next time this function should be called by the manager to next.
// The current strategy is to always select the latest verified peer and one of
// the peers that returned the most number of peers the last time it was queried.
func (m *manager) doQuery(next chan<- time.Duration) {
defer func() { next <- queryInterval }()
ps := m.peersToQuery()
if len(ps) == 0 {
return
}
m.log.Debugw("querying",
"#peers", len(ps),
)
// request from peers in parallel
var wg sync.WaitGroup
wg.Add(len(ps))
for _, p := range ps {
go m.requestWorker(p, &wg)
}
wg.Wait()
}
func (m *manager) requestWorker(p *mpeer, wg *sync.WaitGroup) {
defer wg.Done()
peers, err := m.net.DiscoveryRequest(unwrapPeer(p))
if err != nil || len(peers) == 0 {
p.lastNewPeers = 0
m.log.Debugw("query failed",
"id", p.ID(),
"addr", p.Address(),
"err", err,
)
return
}
var added uint
for _, rp := range peers {
if m.addDiscoveredPeer(rp) {
added++
}
}
p.lastNewPeers = added
m.log.Debugw("queried",
"id", p.ID(),
"addr", p.Address(),
"#added", added,
)
}
// peersToQuery selects the peers that should be queried.
func (m *manager) peersToQuery() []*mpeer {
ps := m.getVerifiedPeers()
if len(ps) == 0 {
return nil
}
latest := ps[0]
if len(ps) == 1 {
return []*mpeer{latest}
}
// find the 3 heaviest peers
r := ring.New(3)
for i, p := range ps {
if i == 0 {
continue // the latest peer is already included
}
if r.Value == nil {
r.Value = p
} else if p.lastNewPeers >= r.Value.(*mpeer).lastNewPeers {
r = r.Next()
r.Value = p
}
}
// select a random peer from the heaviest ones
r.Move(rand.Intn(r.Len()))
return []*mpeer{latest, r.Value.(*mpeer)}
}
package distance
const (
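// Max is the largest possible distance: math.MaxUint32 (2^32 - 1).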
Max = 4294967295
)