Unverified commit e8b6b9e5 authored by Luca Moser, committed by GitHub

Adds bootstrap/sync/issuer plugin (#390)

* adds bootstrap plugin

* check sync state in sync-integration test

* adds synced and bootstrapping plugin to integration test networks

* fix :dog: comments

* re-introduce go.mod into integration test dir

* adds desynchronization monitor

* adds shutdown priority for bootstrap plugin background worker

* give the dog some pedigree

* adds anchor point cleanup interval

* fix review dog comments

* go mod tidy powered by Marie Kondo
parent 8e22cef5
Showing 496 additions and 25 deletions
@@ -6,24 +6,26 @@ import (
"github.com/iotaledger/hive.go/types"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/messagefactory"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload"
)
// IssuePayloadFunc is a function which issues a payload.
type IssuePayloadFunc = func(payload payload.Payload) (*message.Message, error)
// Spammer spams messages with a static data payload.
type Spammer struct {
messageFactory *messagefactory.MessageFactory
issuePayloadFunc IssuePayloadFunc
processId int64
shutdownSignal chan types.Empty
}
// New creates a new spammer.
func New(messageFactory *messagefactory.MessageFactory) *Spammer {
func New(issuePayloadFunc IssuePayloadFunc) *Spammer {
return &Spammer{
messageFactory: messageFactory,
shutdownSignal: make(chan types.Empty),
issuePayloadFunc: issuePayloadFunc,
shutdownSignal: make(chan types.Empty),
}
}
@@ -46,7 +48,8 @@ func (spammer *Spammer) run(mps int, processId int64) {
return
}
spammer.messageFactory.IssuePayload(payload.NewData([]byte("SPAM")))
// we don't care about errors or the actual issued message
_, _ = spammer.issuePayloadFunc(payload.NewData([]byte("SPAM")))
currentSentCounter++
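The constructor change above decouples the spammer from the message factory: any function matching IssuePayloadFunc can now drive it. A minimal sketch of what that enables, wiring a hypothetical stub issuer (the stub and its counter are illustrative, not part of this change); it assumes Start takes a messages-per-second rate and spawns a background worker, as the bootstrap plugin's usage further down suggests:

package main

import (
	"fmt"
	"sync/atomic"
	"time"

	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload"
	"github.com/iotaledger/goshimmer/packages/binary/spammer"
)

func main() {
	// hypothetical stub issuer: it only counts the payloads handed to it
	// instead of touching the real message layer
	var issued int64
	stub := func(p payload.Payload) (*message.Message, error) {
		atomic.AddInt64(&issued, 1)
		return nil, nil
	}

	s := spammer.New(stub)
	s.Start(10) // assumed: issue roughly 10 messages per second in the background
	time.Sleep(time.Second)
	s.Shutdown()

	fmt.Println("payloads handed to the stub:", atomic.LoadInt64(&issued))
}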
@@ -11,7 +11,8 @@ const (
PriorityGossip
PriorityWebAPI
PriorityDashboard
PriorityGraph
PrioritySynchronization
PriorityBootstrap
PrioritySpammer
PriorityBadgerGarbageCollection
)
@@ -3,17 +3,20 @@ package core
import (
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/banner"
"github.com/iotaledger/goshimmer/plugins/bootstrap"
"github.com/iotaledger/goshimmer/plugins/cli"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/database"
"github.com/iotaledger/goshimmer/plugins/drng"
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/goshimmer/plugins/gracefulshutdown"
"github.com/iotaledger/goshimmer/plugins/issuer"
"github.com/iotaledger/goshimmer/plugins/logger"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/goshimmer/plugins/metrics"
"github.com/iotaledger/goshimmer/plugins/portcheck"
"github.com/iotaledger/goshimmer/plugins/profiling"
"github.com/iotaledger/goshimmer/plugins/sync"
"github.com/iotaledger/hive.go/node"
)
@@ -29,6 +32,9 @@ var PLUGINS = node.Plugins(
autopeering.Plugin,
messagelayer.Plugin,
gossip.Plugin,
issuer.Plugin,
bootstrap.Plugin,
sync.Plugin,
gracefulshutdown.Plugin,
metrics.Plugin,
drng.Plugin,
package bootstrap

import (
	"time"

	"github.com/iotaledger/goshimmer/packages/binary/spammer"
	"github.com/iotaledger/goshimmer/packages/shutdown"
	"github.com/iotaledger/goshimmer/plugins/config"
	"github.com/iotaledger/goshimmer/plugins/issuer"
	"github.com/iotaledger/goshimmer/plugins/sync"
	"github.com/iotaledger/hive.go/daemon"
	"github.com/iotaledger/hive.go/logger"
	"github.com/iotaledger/hive.go/node"
	flag "github.com/spf13/pflag"
)

const (
	// PluginName is the plugin name of the bootstrap plugin.
	PluginName = "Bootstrap"
	// CfgBootstrapInitialIssuanceTimePeriodSec defines the initial time period of how long the node should be
	// issuing messages when started in bootstrapping mode. If the value is set to -1, the issuance is continuous.
	CfgBootstrapInitialIssuanceTimePeriodSec = "bootstrap.initialIssuance.timePeriodSec"
	// the messages per second to issue when in bootstrapping mode.
	initialIssuanceMPS = 1
	// the value which determines a continuous issuance of messages from the bootstrap plugin.
	continuousIssuance = -1
)

func init() {
	flag.Int(CfgBootstrapInitialIssuanceTimePeriodSec, -1, "the initial time period of how long the node should be issuing messages when started in bootstrapping mode. "+
		"if the value is set to -1, the issuance is continuous.")
}

var (
	// Plugin is the plugin instance of the bootstrap plugin.
	Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run)
	log    *logger.Logger
)

func configure(_ *node.Plugin) {
	log = logger.NewLogger(PluginName)
	// we're automatically synced if we start in bootstrapping mode
	sync.OverwriteSyncedState(true)
	log.Infof("starting node in bootstrapping mode")
}

func run(_ *node.Plugin) {
	messageSpammer := spammer.New(issuer.IssuePayload)
	issuancePeriodSec := config.Node.GetInt(CfgBootstrapInitialIssuanceTimePeriodSec)
	issuancePeriod := time.Duration(issuancePeriodSec) * time.Second

	// issue messages on top of the genesis
	_ = daemon.BackgroundWorker("Bootstrapping-Issuer", func(shutdownSignal <-chan struct{}) {
		messageSpammer.Start(initialIssuanceMPS)
		defer messageSpammer.Shutdown()

		// don't stop issuing messages if in continuous mode
		if issuancePeriodSec == continuousIssuance {
			log.Info("continuously issuing bootstrapping messages")
			<-shutdownSignal
			return
		}

		log.Infof("issuing bootstrapping messages for %d seconds", issuancePeriodSec)
		select {
		case <-time.After(issuancePeriod):
		case <-shutdownSignal:
		}
	}, shutdown.PriorityBootstrap)
}
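The configure step above flips the node's sync state before run starts spamming, because the spammer issues through issuer.IssuePayload, which refuses payloads on a desynced node. A minimal standalone sketch of that switch (it only exercises the sync plugin's exported state, not a running message layer):

package main

import (
	"fmt"

	"github.com/iotaledger/goshimmer/plugins/sync"
)

func main() {
	// a node starts desynced by default, so issuing via the issuer plugin would fail
	fmt.Println("synced at start:", sync.Synced()) // false

	// the bootstrap plugin's configure step overwrites the state, so its spammer
	// can immediately issue messages on top of the genesis
	sync.OverwriteSyncedState(true)
	fmt.Println("synced after bootstrap configure:", sync.Synced()) // true
}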
package issuer

import (
	"fmt"

	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload"
	"github.com/iotaledger/goshimmer/plugins/messagelayer"
	"github.com/iotaledger/goshimmer/plugins/sync"
	"github.com/iotaledger/hive.go/node"
)

// PluginName is the name of the issuer plugin.
const PluginName = "Issuer"

var (
	// Plugin is the plugin instance of the issuer plugin.
	Plugin = node.NewPlugin(PluginName, node.Enabled, configure)
)

func configure(_ *node.Plugin) {}

// IssuePayload issues a payload to the message layer.
// If the node is not synchronized an error is returned.
func IssuePayload(payload payload.Payload) (*message.Message, error) {
	if !sync.Synced() {
		return nil, fmt.Errorf("can't issue payload: %w", sync.ErrNodeNotSynchronized)
	}
	return messagelayer.MessageFactory.IssuePayload(payload), nil
}
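The web API handlers changed further down in this diff surface a failed IssuePayload directly as a client error. Because the issuer wraps sync.ErrNodeNotSynchronized with %w, a caller can also branch on that specific condition; a minimal sketch (the back-off comment is illustrative, not part of this change):

package main

import (
	"errors"
	"fmt"

	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload"
	"github.com/iotaledger/goshimmer/plugins/issuer"
	"github.com/iotaledger/goshimmer/plugins/sync"
)

func main() {
	msg, err := issuer.IssuePayload(payload.NewData([]byte("hello tangle")))
	if err != nil {
		if errors.Is(err, sync.ErrNodeNotSynchronized) {
			// the node has not seen its anchor points solidify yet; a caller
			// could back off here and retry once sync.Synced() reports true
			fmt.Println("node not synced yet:", err)
			return
		}
		fmt.Println("issuing failed:", err)
		return
	}
	fmt.Println("issued message", msg.Id().String())
}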
package sync

import (
	"sync"
	"time"

	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
	"github.com/iotaledger/goshimmer/packages/shutdown"
	"github.com/iotaledger/goshimmer/plugins/autopeering/local"
	"github.com/iotaledger/goshimmer/plugins/config"
	"github.com/iotaledger/goshimmer/plugins/gossip"
	"github.com/iotaledger/goshimmer/plugins/messagelayer"
	"github.com/iotaledger/hive.go/autopeering/peer"
	"github.com/iotaledger/hive.go/daemon"
	"github.com/iotaledger/hive.go/events"
	"github.com/iotaledger/hive.go/logger"
	"github.com/iotaledger/hive.go/node"
	"github.com/iotaledger/hive.go/types"
	"github.com/pkg/errors"
	flag "github.com/spf13/pflag"
	"go.uber.org/atomic"
)

const (
	// PluginName is the plugin name of the sync plugin.
	PluginName = "Sync"
	// CfgSyncAnchorPointsCount defines the amount of anchor points to use to determine
	// whether a node is synchronized.
	CfgSyncAnchorPointsCount = "sync.anchorPointsCount"
	// CfgSyncAnchorPointsCleanupAfterSec defines the amount of time which is allowed to pass between setting an anchor
	// point and it not becoming solid (to clean its slot for another anchor point). It basically defines the expectancy
	// of how long it should take for an anchor point to become solid. Even if this value is set too low, usually a node
	// would eventually solidify collected anchor points.
	CfgSyncAnchorPointsCleanupAfterSec = "sync.anchorPointsCleanupAfterSec"
	// CfgSyncAnchorPointsCleanupIntervalSec defines the interval at which it is checked whether anchor points fall
	// into the cleanup window.
	CfgSyncAnchorPointsCleanupIntervalSec = "sync.anchorPointsCleanupIntervalSec"
	// CfgSyncDesyncedIfNoMessageAfterSec defines the time period in which new messages must be received and if not
	// the node is marked as desynced.
	CfgSyncDesyncedIfNoMessageAfterSec = "sync.desyncedIfNoMessagesAfterSec"
)

func init() {
	flag.Int(CfgSyncAnchorPointsCount, 3, "the amount of anchor points to use to determine whether a node is synchronized")
	flag.Int(CfgSyncDesyncedIfNoMessageAfterSec, 300, "the time period in seconds which sets the node as desynced if no new messages are received")
	flag.Int(CfgSyncAnchorPointsCleanupIntervalSec, 10, "the interval at which it is checked whether anchor points fall into the cleanup window")
	flag.Int(CfgSyncAnchorPointsCleanupAfterSec, 60, "the amount of time which is allowed to pass between setting an anchor point and it not becoming solid (to clean its slot for another anchor point)")
}

var (
	// Plugin is the plugin instance of the sync plugin.
	Plugin = node.NewPlugin(PluginName, node.Enabled, configure, run)
	// ErrNodeNotSynchronized is returned when an operation can't be executed because
	// the node is not synchronized.
	ErrNodeNotSynchronized = errors.New("node is not synchronized")
	// tells whether the node is synced or not.
	synced atomic.Bool
	log    *logger.Logger
)

// Synced tells whether the node is in a state we consider synchronized, meaning
// it has the relevant past and present message data.
func Synced() bool {
	return synced.Load()
}

// OverwriteSyncedState overwrites the synced state with the given value.
func OverwriteSyncedState(syncedOverwrite bool) {
	synced.Store(syncedOverwrite)
}

func configure(_ *node.Plugin) {
	log = logger.NewLogger(PluginName)
}
func run(_ *node.Plugin) {
	// per default the node starts in a desynced state
	if !Synced() {
		monitorForSynchronization()
		return
	}

	// however, another plugin might want to overwrite the synced state (i.e. the bootstrap plugin)
	// in order to start issuing messages
	monitorForDesynchronization()
}

// marks the node as synced and spawns the background worker to monitor desynchronization.
func markSynced() {
	synced.Store(true)
	monitorForDesynchronization()
}

// marks the node as desynced and spawns the background worker to monitor synchronization.
func markDesynced() {
	synced.Store(false)
	monitorForSynchronization()
}

// starts a background worker and event handlers to check whether the node is desynchronized by checking
// whether the node has no more peers or didn't receive any message in a given time period.
func monitorForDesynchronization() {
	log.Info("monitoring for desynchronization")

	// monitors the peer count of the manager and sets the node as desynced if it has no more peers.
	noPeers := make(chan types.Empty)
	monitorPeerCountClosure := events.NewClosure(func(_ *peer.Peer) {
		anyPeers := len(gossip.Manager().AllNeighbors()) > 0
		if anyPeers {
			return
		}
		noPeers <- types.Empty{}
	})

	msgReceived := make(chan types.Empty, 1)
	monitorMessageInflowClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
		defer cachedMessage.Release()
		defer cachedMessageMetadata.Release()
		// ignore messages sent by the node itself
		if local.GetInstance().LocalIdentity().PublicKey() == cachedMessage.Unwrap().IssuerPublicKey() {
			return
		}
		select {
		case msgReceived <- types.Empty{}:
		default:
			// the default clause makes sure that a slow desync-monitor select-loop
			// worker does not increase latency, as the send simply falls through
		}
	})

	daemon.BackgroundWorker("Desync-Monitor", func(shutdownSignal <-chan struct{}) {
		gossip.Manager().Events().NeighborRemoved.Attach(monitorPeerCountClosure)
		defer gossip.Manager().Events().NeighborRemoved.Detach(monitorPeerCountClosure)
		messagelayer.Tangle.Events.MessageAttached.Attach(monitorMessageInflowClosure)
		defer messagelayer.Tangle.Events.MessageAttached.Detach(monitorMessageInflowClosure)

		desyncedIfNoMessageInSec := config.Node.GetDuration(CfgSyncDesyncedIfNoMessageAfterSec) * time.Second
		timer := time.NewTimer(desyncedIfNoMessageInSec)
		for {
			select {
			case <-msgReceived:
				// we received a message, therefore reset the timer to check for message receives
				if !timer.Stop() {
					<-timer.C
				}
				// TODO: perhaps find a better way instead of constantly resetting the timer
				timer.Reset(desyncedIfNoMessageInSec)
			case <-timer.C:
				log.Infof("no message received in %v, marking node as desynced", desyncedIfNoMessageInSec)
				markDesynced()
				return
			case <-noPeers:
				log.Info("all peers have been lost, marking node as desynced")
				markDesynced()
				return
			case <-shutdownSignal:
				return
			}
		}
	}, shutdown.PrioritySynchronization)
}
// starts a background worker and event handlers to check whether the node is synchronized by first collecting
// a set of newly received messages and then waiting for them to become solid.
func monitorForSynchronization() {
	wantedAnchorPointsCount := config.Node.GetInt(CfgSyncAnchorPointsCount)
	anchorPoints := newAnchorPoints(wantedAnchorPointsCount)
	log.Infof("monitoring for synchronization, awaiting %d anchor point messages to become solid", wantedAnchorPointsCount)

	synced := make(chan types.Empty)

	initAnchorPointClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
		defer cachedMessage.Release()
		defer cachedMessageMetadata.Release()
		if addedAnchorID := initAnchorPoint(anchorPoints, cachedMessage.Unwrap()); addedAnchorID != nil {
			anchorPoints.Lock()
			defer anchorPoints.Unlock()
			log.Infof("added message %s as anchor point (%d of %d collected)", addedAnchorID.String()[:10], anchorPoints.collectedCount(), anchorPoints.wanted)
		}
	})

	checkAnchorPointSolidityClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
		defer cachedMessage.Release()
		defer cachedMessageMetadata.Release()
		allSolid, newSolidAnchorID := checkAnchorPointSolidity(anchorPoints, cachedMessage.Unwrap())

		if newSolidAnchorID != nil {
			log.Infof("anchor message %s has become solid", newSolidAnchorID.String()[:10])
		}

		if !allSolid {
			return
		}
		synced <- types.Empty{}
	})

	daemon.BackgroundWorker("Sync-Monitor", func(shutdownSignal <-chan struct{}) {
		messagelayer.Tangle.Events.MessageAttached.Attach(initAnchorPointClosure)
		defer messagelayer.Tangle.Events.MessageAttached.Detach(initAnchorPointClosure)
		messagelayer.Tangle.Events.MessageSolid.Attach(checkAnchorPointSolidityClosure)
		defer messagelayer.Tangle.Events.MessageSolid.Detach(checkAnchorPointSolidityClosure)

		cleanupDelta := config.Node.GetDuration(CfgSyncAnchorPointsCleanupAfterSec) * time.Second
		ticker := time.NewTicker(config.Node.GetDuration(CfgSyncAnchorPointsCleanupIntervalSec) * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				anchorPoints.Lock()
				for id, itGotAdded := range anchorPoints.ids {
					if time.Since(itGotAdded) > cleanupDelta {
						log.Infof("freeing anchor point slot of %s as it didn't become solid within %v", id.String()[:10], cleanupDelta)
						delete(anchorPoints.ids, id)
					}
				}
				anchorPoints.Unlock()
			case <-shutdownSignal:
				return
			case <-synced:
				log.Infof("all anchor messages have become solid, marking node as synced")
				markSynced()
				return
			}
		}
	}, shutdown.PrioritySynchronization)
}
// fills up the anchor points with newly attached messages which then are used to determine whether we are synchronized.
func initAnchorPoint(anchorPoints *anchorpoints, msg *message.Message) *message.Id {
	if synced.Load() {
		return nil
	}

	anchorPoints.Lock()
	defer anchorPoints.Unlock()

	// we don't need to add additional anchor points if the set was already filled once
	if anchorPoints.wasFilled() {
		return nil
	}

	// as a rule, we don't consider messages attaching directly to the genesis as anchors
	if msg.TrunkId() == message.EmptyId || msg.BranchId() == message.EmptyId {
		return nil
	}

	// add a new anchor point
	id := msg.Id()
	anchorPoints.add(id)
	return &id
}

// checks whether an anchor point message became solid.
// if all anchor points became solid, it reports this so the node can be marked as synchronized.
func checkAnchorPointSolidity(anchorPoints *anchorpoints, msg *message.Message) (bool, *message.Id) {
	anchorPoints.Lock()
	defer anchorPoints.Unlock()

	if synced.Load() || len(anchorPoints.ids) == 0 {
		return false, nil
	}

	// check whether an anchor message became solid
	msgID := msg.Id()
	if !anchorPoints.has(msgID) {
		return false, nil
	}

	// an anchor became solid
	anchorPoints.markAsSolidified(msgID)
	if !anchorPoints.wereAllSolidified() {
		return false, &msgID
	}

	// all anchor points have become solid
	return true, &msgID
}

func newAnchorPoints(wantedAnchorPointsCount int) *anchorpoints {
	return &anchorpoints{
		ids:    make(map[message.Id]time.Time),
		wanted: wantedAnchorPointsCount,
	}
}

// anchorpoints are a set of messages which we use to determine whether the node has become synchronized.
type anchorpoints struct {
	sync.Mutex
	// the ids of the anchor points with their addition time.
	ids map[message.Id]time.Time
	// the wanted amount of anchor points which should become solid.
	wanted int
	// how many anchor points have been solidified.
	solidified int
}

// adds the given message to the anchor points set.
func (ap *anchorpoints) add(id message.Id) {
	ap.ids[id] = time.Now()
}

func (ap *anchorpoints) has(id message.Id) bool {
	_, has := ap.ids[id]
	return has
}

// marks the given anchor point as solidified which removes it from the set and bumps the solidified count.
func (ap *anchorpoints) markAsSolidified(id message.Id) {
	delete(ap.ids, id)
	ap.solidified++
}

// tells whether the anchor points set was filled at some point.
func (ap *anchorpoints) wasFilled() bool {
	return ap.collectedCount() == ap.wanted
}

// tells whether all anchor points have become solid.
func (ap *anchorpoints) wereAllSolidified() bool {
	return ap.solidified == ap.wanted
}

// tells the number of effectively collected anchor points.
func (ap *anchorpoints) collectedCount() int {
	// since an anchor point potentially was solidified before the set became full,
	// we need to incorporate that count too
	return ap.solidified + len(ap.ids)
}
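The anchorpoints set above drives the synchronization decision: collect the wanted number of freshly attached messages, then wait for each of them to become solid. A hypothetical white-box test sketching that lifecycle; it assumes message.Id is a comparable fixed-size byte array (as its use as a map key and the message.EmptyId sentinel suggest), so literal ids can be constructed:

package sync

import (
	"testing"

	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
)

func TestAnchorPointsLifecycle(t *testing.T) {
	ap := newAnchorPoints(2)

	// collect two anchor points; the set now counts as filled
	// (message.Id{...} literals assume Id is a fixed-size byte array)
	idA, idB := message.Id{1}, message.Id{2}
	ap.add(idA)
	ap.add(idB)
	if !ap.wasFilled() {
		t.Fatal("expected the anchor point set to be filled")
	}

	// solidifying removes an id from the set but keeps it in the collected count
	ap.markAsSolidified(idA)
	if got := ap.collectedCount(); got != 2 {
		t.Fatalf("expected a collected count of 2, got %d", got)
	}
	if ap.wereAllSolidified() {
		t.Fatal("one anchor point should still be pending")
	}

	// once every collected anchor point has solidified, the node can be marked synced
	ap.markAsSolidified(idB)
	if !ap.wereAllSolidified() {
		t.Fatal("expected all anchor points to be solidified")
	}
}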
@@ -4,7 +4,7 @@ import (
"net/http"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/goshimmer/plugins/issuer"
"github.com/iotaledger/goshimmer/plugins/webapi"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
@@ -35,7 +35,10 @@ func broadcastData(c echo.Context) error {
}
// TODO: check the maximum allowed payload size and return an error if it is exceeded
msg := messagelayer.MessageFactory.IssuePayload(payload.NewData(request.Data))
msg, err := issuer.IssuePayload(payload.NewData(request.Data))
if err != nil {
return c.JSON(http.StatusBadRequest, Response{Error: err.Error()})
}
return c.JSON(http.StatusOK, Response{ID: msg.Id().String()})
}
@@ -4,7 +4,7 @@ import (
"net/http"
"github.com/iotaledger/goshimmer/packages/binary/drng/subtypes/collectiveBeacon/payload"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/goshimmer/plugins/issuer"
"github.com/iotaledger/hive.go/marshalutil"
"github.com/labstack/echo"
"github.com/labstack/gommon/log"
@@ -25,7 +25,10 @@ func Handler(c echo.Context) error {
return c.JSON(http.StatusBadRequest, Response{Error: "not a valid Collective Beacon payload"})
}
msg := messagelayer.MessageFactory.IssuePayload(parsedPayload)
msg, err := issuer.IssuePayload(parsedPayload)
if err != nil {
return c.JSON(http.StatusBadRequest, Response{Error: err.Error()})
}
return c.JSON(http.StatusOK, Response{ID: msg.Id().String()})
}
@@ -5,6 +5,7 @@ import (
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/banner"
"github.com/iotaledger/goshimmer/plugins/sync"
"github.com/iotaledger/goshimmer/plugins/webapi"
"github.com/iotaledger/hive.go/node"
"github.com/labstack/echo"
@@ -24,6 +25,7 @@ func configure(_ *node.Plugin) {
// e.g.,
// {
// "version":"v0.2.0",
// "synchronized": true,
// "identityID":"5bf4aa1d6c47e4ce",
// "publickey":"CjUsn86jpFHWnSCx3NhWfU4Lk16mDdy1Hr7ERSTv3xn9",
// "enabledplugins":[
@@ -70,6 +72,7 @@ func getInfo(c echo.Context) error {
return c.JSON(http.StatusOK, Response{
Version: banner.AppVersion,
Synced: sync.Synced(),
IdentityID: local.GetInstance().Identity.ID().String(),
PublicKey: local.GetInstance().PublicKey().String(),
EnabledPlugins: enabledPlugins,
@@ -81,6 +84,8 @@ func getInfo(c echo.Context) error {
type Response struct {
// version of GoShimmer
Version string `json:"version,omitempty"`
// whether the node is synchronized
Synced bool `json:"synced"`
// identity ID of the node encoded in hex and truncated to its first 8 bytes
IdentityID string `json:"identityID,omitempty"`
// public key of the node encoded in base58
@@ -3,7 +3,7 @@ package spammer
import (
"github.com/iotaledger/goshimmer/packages/binary/spammer"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/goshimmer/plugins/issuer"
"github.com/iotaledger/goshimmer/plugins/webapi"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/node"
@@ -18,7 +18,7 @@ const PluginName = "Spammer"
var Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run)
func configure(plugin *node.Plugin) {
messageSpammer = spammer.New(messagelayer.MessageFactory)
messageSpammer = spammer.New(issuer.IssuePayload)
webapi.Server.GET("spammer", handleRequest)
}
@@ -17,6 +17,7 @@ services:
- integration-test
peer_master:
command: --node.enablePlugins=bootstrap
container_name: peer_master
image: iotaledger/goshimmer
build:
@@ -7,7 +7,6 @@ services:
working_dir: /go/src/github.com/iotaledger/goshimmer/tools/integration-tests/tester
entrypoint: go test ./tests -v -mod=readonly
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
- ../../..:/go/src/github.com/iotaledger/goshimmer:ro
- ../logs:/tmp/logs
- /var/run/docker.sock:/var/run/docker.sock:ro
- ../../..:/go/src/github.com/iotaledger/goshimmer:ro
- ../logs:/tmp/logs
\ No newline at end of file
@@ -69,7 +69,7 @@ func (d *DockerContainer) CreateGoShimmerEntryNode(name string, seed string) err
}
// CreateGoShimmerPeer creates a new container with the GoShimmer peer's configuration.
func (d *DockerContainer) CreateGoShimmerPeer(name string, seed string, entryNodeHost string, entryNodePublicKey string) error {
func (d *DockerContainer) CreateGoShimmerPeer(name string, seed string, entryNodeHost string, entryNodePublicKey string, bootstrap bool) error {
// configure GoShimmer container instance
containerConfig := &container.Config{
Image: "iotaledger/goshimmer",
@@ -78,6 +78,12 @@ func (d *DockerContainer) CreateGoShimmerPeer(name string, seed string, entryNod
},
Cmd: strslice.StrSlice{
fmt.Sprintf("--node.disablePlugins=%s", disabledPluginsPeer),
fmt.Sprintf("--node.enablePlugins=%s", func() string {
if bootstrap {
return "Bootstrap"
}
return ""
}()),
"--webapi.bindAddress=0.0.0.0:8080",
fmt.Sprintf("--autopeering.seed=%s", seed),
fmt.Sprintf("--autopeering.entryNodes=%s@%s:14626", entryNodePublicKey, entryNodeHost),
@@ -55,6 +55,7 @@ func newFramework() (*Framework, error) {
// CreateNetwork creates and returns a (Docker) Network that contains `peers` GoShimmer nodes.
// It waits for the peers to autopeer until the minimum neighbors criteria is met for every peer.
// The first peer automatically starts with the bootstrap plugin enabled.
func (f *Framework) CreateNetwork(name string, peers int, minimumNeighbors int) (*Network, error) {
network, err := newNetwork(f.dockerClient, strings.ToLower(name), f.tester)
if err != nil {
@@ -68,8 +69,8 @@ func (f *Framework) CreateNetwork(name string, peers int, minimumNeighbors int)
// create peers/GoShimmer nodes
for i := 0; i < peers; i++ {
_, err = network.CreatePeer()
if err != nil {
bootstrap := i == 0
if _, err = network.CreatePeer(bootstrap); err != nil {
return nil, err
}
}
@@ -82,7 +82,8 @@ func (n *Network) createEntryNode() error {
}
// CreatePeer creates a new peer/GoShimmer node in the network and returns it.
func (n *Network) CreatePeer() (*Peer, error) {
// Passing bootstrap true enables the bootstrap plugin on the given peer.
func (n *Network) CreatePeer(bootstrap bool) (*Peer, error) {
name := n.namePrefix(fmt.Sprintf("%s%d", containerNameReplica, len(n.peers)))
// create identity
@@ -94,7 +95,7 @@ func (n *Network) CreatePeer() (*Peer, error) {
// create Docker container
container := NewDockerContainer(n.dockerClient)
err = container.CreateGoShimmerPeer(name, seed, n.namePrefix(containerNameEntryNode), n.entryNodePublicKey())
err = container.CreateGoShimmerPeer(name, seed, n.namePrefix(containerNameEntryNode), n.entryNodePublicKey(), bootstrap)
if err != nil {
return nil, err
}
@@ -18,6 +18,9 @@ func TestNodeSynchronization(t *testing.T) {
require.NoError(t, err)
defer n.Shutdown()
// wait for peers to change their state to synchronized
time.Sleep(5 * time.Second)
numMessages := 100
idsMap := make(map[string]MessageSent, numMessages)
ids := make([]string, numMessages)
@@ -40,7 +43,7 @@ func TestNodeSynchronization(t *testing.T) {
checkForMessageIds(t, n.Peers(), ids, idsMap)
// spawn peer without knowledge of previous messages
newPeer, err := n.CreatePeer()
newPeer, err := n.CreatePeer(false)
require.NoError(t, err)
err = n.WaitForAutopeering(3)
require.NoError(t, err)
@@ -91,6 +94,11 @@ func sendDataMessage(t *testing.T, peer *framework.Peer, data []byte, number int
func checkForMessageIds(t *testing.T, peers []*framework.Peer, ids []string, idsMap map[string]MessageSent) {
for _, peer := range peers {
// check that the peer sees itself as synchronized
info, err := peer.Info()
require.NoError(t, err)
require.True(t, info.Synced)
resp, err := peer.FindMessageByID(ids)
require.NoError(t, err)