From e8b6b9e50456446004675dfeb61659c610719bb8 Mon Sep 17 00:00:00 2001 From: Luca Moser <moser.luca@gmail.com> Date: Thu, 30 Apr 2020 15:19:47 +0200 Subject: [PATCH] Adds bootstrap/sync/issuer plugin (#390) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * adds bootstrap plugin * check sync state in sync-integration test * adds synced and bootstrapping plugin to integration test networks * fix 🐶comments * re-introduce go.mod into integration test dir * adds desynchronization monitor * adds shutdown priority for bootstrap plugin bk. worker * give the dog some pedigree * adds anchor point cleanup interval * fix review dog comments * go mod tidy powered by Marie Kondo --- go.mod | 2 +- packages/binary/spammer/spammer.go | 17 +- packages/shutdown/order.go | 3 +- pluginmgr/core/plugins.go | 6 + plugins/bootstrap/plugin.go | 69 ++++ plugins/issuer/plugin.go | 30 ++ plugins/sync/plugin.go | 335 ++++++++++++++++++ plugins/webapi/data/plugin.go | 7 +- .../webapi/drng/collectivebeacon/handler.go | 7 +- plugins/webapi/info/plugin.go | 5 + plugins/webapi/spammer/plugin.go | 4 +- tools/docker-network/docker-compose.yml | 1 + .../tester/docker-compose.yml | 7 +- .../tester/framework/docker.go | 8 +- .../tester/framework/framework.go | 5 +- .../tester/framework/network.go | 5 +- .../tester/tests/synchronization_test.go | 10 +- 17 files changed, 496 insertions(+), 25 deletions(-) create mode 100644 plugins/bootstrap/plugin.go create mode 100644 plugins/issuer/plugin.go create mode 100644 plugins/sync/plugin.go diff --git a/go.mod b/go.mod index 19cc19dc..db8945ff 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/magiconair/properties v1.8.1 github.com/mr-tron/base58 v1.1.3 github.com/panjf2000/ants/v2 v2.2.2 - github.com/pkg/errors v0.9.1 // indirect + github.com/pkg/errors v0.9.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.6.2 github.com/stretchr/testify v1.5.1 diff --git a/packages/binary/spammer/spammer.go b/packages/binary/spammer/spammer.go index ae607a9a..df877550 100644 --- a/packages/binary/spammer/spammer.go +++ b/packages/binary/spammer/spammer.go @@ -6,24 +6,26 @@ import ( "github.com/iotaledger/hive.go/types" - "github.com/iotaledger/goshimmer/packages/binary/messagelayer/messagefactory" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload" ) +// IssuePayloadFunc is a function which issues a payload. +type IssuePayloadFunc = func(payload payload.Payload) (*message.Message, error) + // Spammer spams messages with a static data payload. type Spammer struct { - messageFactory *messagefactory.MessageFactory + issuePayloadFunc IssuePayloadFunc processId int64 shutdownSignal chan types.Empty } // New creates a new spammer. -func New(messageFactory *messagefactory.MessageFactory) *Spammer { +func New(issuePayloadFunc IssuePayloadFunc) *Spammer { return &Spammer{ - messageFactory: messageFactory, - - shutdownSignal: make(chan types.Empty), + issuePayloadFunc: issuePayloadFunc, + shutdownSignal: make(chan types.Empty), } } @@ -46,7 +48,8 @@ func (spammer *Spammer) run(mps int, processId int64) { return } - spammer.messageFactory.IssuePayload(payload.NewData([]byte("SPAM"))) + // we don't care about errors or the actual issued message + _, _ = spammer.issuePayloadFunc(payload.NewData([]byte("SPAM"))) currentSentCounter++ diff --git a/packages/shutdown/order.go b/packages/shutdown/order.go index 319a8b06..ff483327 100644 --- a/packages/shutdown/order.go +++ b/packages/shutdown/order.go @@ -11,7 +11,8 @@ const ( PriorityGossip PriorityWebAPI PriorityDashboard - PriorityGraph + PrioritySynchronization + PriorityBootstrap PrioritySpammer PriorityBadgerGarbageCollection ) diff --git a/pluginmgr/core/plugins.go b/pluginmgr/core/plugins.go index 2ad52fcc..e4ae7af0 100644 --- a/pluginmgr/core/plugins.go +++ b/pluginmgr/core/plugins.go @@ -3,17 +3,20 @@ package core import ( "github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/banner" + "github.com/iotaledger/goshimmer/plugins/bootstrap" "github.com/iotaledger/goshimmer/plugins/cli" "github.com/iotaledger/goshimmer/plugins/config" "github.com/iotaledger/goshimmer/plugins/database" "github.com/iotaledger/goshimmer/plugins/drng" "github.com/iotaledger/goshimmer/plugins/gossip" "github.com/iotaledger/goshimmer/plugins/gracefulshutdown" + "github.com/iotaledger/goshimmer/plugins/issuer" "github.com/iotaledger/goshimmer/plugins/logger" "github.com/iotaledger/goshimmer/plugins/messagelayer" "github.com/iotaledger/goshimmer/plugins/metrics" "github.com/iotaledger/goshimmer/plugins/portcheck" "github.com/iotaledger/goshimmer/plugins/profiling" + "github.com/iotaledger/goshimmer/plugins/sync" "github.com/iotaledger/hive.go/node" ) @@ -29,6 +32,9 @@ var PLUGINS = node.Plugins( autopeering.Plugin, messagelayer.Plugin, gossip.Plugin, + issuer.Plugin, + bootstrap.Plugin, + sync.Plugin, gracefulshutdown.Plugin, metrics.Plugin, drng.Plugin, diff --git a/plugins/bootstrap/plugin.go b/plugins/bootstrap/plugin.go new file mode 100644 index 00000000..d6ce2e31 --- /dev/null +++ b/plugins/bootstrap/plugin.go @@ -0,0 +1,69 @@ +package bootstrap + +import ( + "time" + + "github.com/iotaledger/goshimmer/packages/binary/spammer" + "github.com/iotaledger/goshimmer/packages/shutdown" + "github.com/iotaledger/goshimmer/plugins/config" + "github.com/iotaledger/goshimmer/plugins/issuer" + "github.com/iotaledger/goshimmer/plugins/sync" + "github.com/iotaledger/hive.go/daemon" + "github.com/iotaledger/hive.go/logger" + "github.com/iotaledger/hive.go/node" + flag "github.com/spf13/pflag" +) + +const ( + // PluginName is the plugin name of the bootstrap plugin. + PluginName = "Bootstrap" + // CfgBootstrapInitialIssuanceTimePeriodSec defines the initial time period of how long the node should be + // issuing messages when started in bootstrapping mode. If the value is set to -1, the issuance is continuous. + CfgBootstrapInitialIssuanceTimePeriodSec = "bootstrap.initialIssuance.timePeriodSec" + // the messages per second to issue when in bootstrapping mode. + initialIssuanceMPS = 1 + // the value which determines a continuous issuance of messages from the bootstrap plugin. + continuousIssuance = -1 +) + +func init() { + flag.Int(CfgBootstrapInitialIssuanceTimePeriodSec, -1, "the initial time period of how long the node should be issuing messages when started in bootstrapping mode. "+ + "if the value is set to -1, the issuance is continuous.") +} + +var ( + // Plugin is the plugin instance of the bootstrap plugin. + Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run) + log *logger.Logger +) + +func configure(_ *node.Plugin) { + log = logger.NewLogger(PluginName) + // we're auto. synced if we start in bootstrapping mode + sync.OverwriteSyncedState(true) + log.Infof("starting node in bootstrapping mode") +} + +func run(_ *node.Plugin) { + + messageSpammer := spammer.New(issuer.IssuePayload) + issuancePeriodSec := config.Node.GetInt(CfgBootstrapInitialIssuanceTimePeriodSec) + issuancePeriod := time.Duration(issuancePeriodSec) * time.Second + + // issue messages on top of the genesis + _ = daemon.BackgroundWorker("Bootstrapping-Issuer", func(shutdownSignal <-chan struct{}) { + messageSpammer.Start(initialIssuanceMPS) + defer messageSpammer.Shutdown() + // don't stop issuing messages if in continuous mode + if issuancePeriodSec == continuousIssuance { + log.Info("continuously issuing bootstrapping messages") + <-shutdownSignal + return + } + log.Infof("issuing bootstrapping messages for %d seconds", issuancePeriodSec) + select { + case <-time.After(issuancePeriod): + case <-shutdownSignal: + } + }, shutdown.PriorityBootstrap) +} diff --git a/plugins/issuer/plugin.go b/plugins/issuer/plugin.go new file mode 100644 index 00000000..d34e6f36 --- /dev/null +++ b/plugins/issuer/plugin.go @@ -0,0 +1,30 @@ +package issuer + +import ( + "fmt" + + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload" + "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/goshimmer/plugins/sync" + "github.com/iotaledger/hive.go/node" +) + +// PluginName is the name of the issuer plugin. +const PluginName = "Issuer" + +var ( + // Plugin is the plugin instance of the issuer plugin. + Plugin = node.NewPlugin(PluginName, node.Enabled, configure) +) + +func configure(_ *node.Plugin) {} + +// IssuePayload issues a payload to the message layer. +// If the node is not synchronized an error is returned. +func IssuePayload(payload payload.Payload) (*message.Message, error) { + if !sync.Synced() { + return nil, fmt.Errorf("can't issue payload: %w", sync.ErrNodeNotSynchronized) + } + return messagelayer.MessageFactory.IssuePayload(payload), nil +} diff --git a/plugins/sync/plugin.go b/plugins/sync/plugin.go new file mode 100644 index 00000000..c169aa62 --- /dev/null +++ b/plugins/sync/plugin.go @@ -0,0 +1,335 @@ +package sync + +import ( + "sync" + "time" + + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle" + "github.com/iotaledger/goshimmer/packages/shutdown" + "github.com/iotaledger/goshimmer/plugins/autopeering/local" + "github.com/iotaledger/goshimmer/plugins/config" + "github.com/iotaledger/goshimmer/plugins/gossip" + "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/hive.go/autopeering/peer" + "github.com/iotaledger/hive.go/daemon" + "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/logger" + "github.com/iotaledger/hive.go/node" + "github.com/iotaledger/hive.go/types" + "github.com/pkg/errors" + flag "github.com/spf13/pflag" + "go.uber.org/atomic" +) + +const ( + // PluginName is the plugin name of the sync plugin. + PluginName = "Sync" + // CfgSyncAnchorPointsCount defines the amount of anchor points to use to determine + // whether a node is synchronized. + CfgSyncAnchorPointsCount = "sync.anchorPointsCount" + // CfgSyncAnchorPointsCleanupAfterSec defines the amount of time which is allowed to pass between setting an anchor + // point and it not becoming solid (to clean its slot for another anchor point). It basically defines the expectancy + // of how long it should take for an anchor point to become solid. Even if this value is set too low, usually a node + // would eventually solidify collected anchor points. + CfgSyncAnchorPointsCleanupAfterSec = "sync.anchorPointsCleanupAfterSec" + // CfgSyncAnchorPointsCleanupIntervalSec defines the interval at which it is checked whether anchor points fall + // into the cleanup window. + CfgSyncAnchorPointsCleanupIntervalSec = "sync.anchorPointsCleanupIntervalSec" + // CfgSyncDesyncedIfNoMessageAfterSec defines the time period in which new messages must be received and if not + // the node is marked as desynced. + CfgSyncDesyncedIfNoMessageAfterSec = "sync.desyncedIfNoMessagesAfterSec" +) + +func init() { + flag.Int(CfgSyncAnchorPointsCount, 3, "the amount of anchor points to use to determine whether a node is synchronized") + flag.Int(CfgSyncDesyncedIfNoMessageAfterSec, 300, "the time period in seconds which sets the node as desynced if no new messages are received") + flag.Int(CfgSyncAnchorPointsCleanupIntervalSec, 10, "the interval at which it is checked whether anchor points fall into the cleanup window") + flag.Int(CfgSyncAnchorPointsCleanupAfterSec, 60, "the amount of time which is allowed to pass between setting an anchor point and it not becoming solid (to clean its slot for another anchor point)") +} + +var ( + // Plugin is the plugin instance of the sync plugin. + Plugin = node.NewPlugin(PluginName, node.Enabled, configure, run) + // ErrNodeNotSynchronized is returned when an operation can't be executed because + // the node is not synchronized. + ErrNodeNotSynchronized = errors.New("node is not synchronized") + // tells whether the node is synced or not. + synced atomic.Bool + log *logger.Logger +) + +// Synced tells whether the node is in a state we consider synchronized, meaning +// it has the relevant past and present message data. +func Synced() bool { + return synced.Load() +} + +// OverwriteSyncedState overwrites the synced state with the given value. +func OverwriteSyncedState(syncedOverwrite bool) { + synced.Store(syncedOverwrite) +} + +func configure(_ *node.Plugin) { + log = logger.NewLogger(PluginName) +} + +func run(_ *node.Plugin) { + // per default the node starts in a desynced state + if !Synced() { + monitorForSynchronization() + return + } + + // however, another plugin might want to overwrite the synced state (i.e. the bootstrap plugin) + // in order to start issuing messages + monitorForDesynchronization() +} + +// marks the node as synced and spawns the background worker to monitor desynchronization. +func markSynced() { + synced.Store(true) + monitorForDesynchronization() +} + +// marks the node as desynced and spawns the background worker to monitor synchronization. +func markDesynced() { + synced.Store(false) + monitorForSynchronization() +} + +// starts a background worker and event handlers to check whether the node is desynchronized by checking +// whether the node has no more peers or didn't receive any message in a given time period. +func monitorForDesynchronization() { + log.Info("monitoring for desynchronization") + + // monitors the peer count of the manager and sets the node as desynced if it has no more peers. + noPeers := make(chan types.Empty) + monitorPeerCountClosure := events.NewClosure(func(_ *peer.Peer) { + anyPeers := len(gossip.Manager().AllNeighbors()) > 0 + if anyPeers { + return + } + noPeers <- types.Empty{} + }) + + msgReceived := make(chan types.Empty, 1) + + monitorMessageInflowClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) { + defer cachedMessage.Release() + defer cachedMessageMetadata.Release() + // ignore messages sent by the node itself + if local.GetInstance().LocalIdentity().PublicKey() == cachedMessage.Unwrap().IssuerPublicKey() { + return + } + select { + case msgReceived <- types.Empty{}: + default: + // via this default clause, a slow desync-monitor select-loop + // worker should not increase latency as it auto. falls through + } + }) + + daemon.BackgroundWorker("Desync-Monitor", func(shutdownSignal <-chan struct{}) { + gossip.Manager().Events().NeighborRemoved.Attach(monitorPeerCountClosure) + defer gossip.Manager().Events().NeighborRemoved.Detach(monitorPeerCountClosure) + messagelayer.Tangle.Events.MessageAttached.Attach(monitorMessageInflowClosure) + defer messagelayer.Tangle.Events.MessageAttached.Detach(monitorMessageInflowClosure) + + desyncedIfNoMessageInSec := config.Node.GetDuration(CfgSyncDesyncedIfNoMessageAfterSec) * time.Second + timer := time.NewTimer(desyncedIfNoMessageInSec) + for { + select { + + case <-msgReceived: + // we received a message, therefore reset the timer to check for message receives + if !timer.Stop() { + <-timer.C + } + // TODO: perhaps find a better way instead of constantly resetting the timer + timer.Reset(desyncedIfNoMessageInSec) + + case <-timer.C: + log.Infof("no message received in %d seconds, marking node as desynced", desyncedIfNoMessageInSec) + markDesynced() + return + + case <-noPeers: + log.Info("all peers have been lost, marking node as desynced") + markDesynced() + return + + case <-shutdownSignal: + return + } + } + }, shutdown.PrioritySynchronization) +} + +// starts a background worker and event handlers to check whether the node is synchronized by first collecting +// a set of newly received messages and then waiting for them to become solid. +func monitorForSynchronization() { + wantedAnchorPointsCount := config.Node.GetInt(CfgSyncAnchorPointsCount) + anchorPoints := newAnchorPoints(wantedAnchorPointsCount) + log.Infof("monitoring for synchronization, awaiting %d anchor point messages to become solid", wantedAnchorPointsCount) + + synced := make(chan types.Empty) + + initAnchorPointClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) { + defer cachedMessage.Release() + defer cachedMessageMetadata.Release() + if addedAnchorID := initAnchorPoint(anchorPoints, cachedMessage.Unwrap()); addedAnchorID != nil { + anchorPoints.Lock() + defer anchorPoints.Unlock() + log.Infof("added message %s as anchor point (%d of %d collected)", addedAnchorID.String()[:10], anchorPoints.collectedCount(), anchorPoints.wanted) + } + }) + + checkAnchorPointSolidityClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) { + defer cachedMessage.Release() + defer cachedMessageMetadata.Release() + allSolid, newSolidAnchorID := checkAnchorPointSolidity(anchorPoints, cachedMessage.Unwrap()) + + if newSolidAnchorID != nil { + log.Infof("anchor message %s has become solid", newSolidAnchorID.String()[:10]) + } + + if !allSolid { + return + } + synced <- types.Empty{} + }) + + daemon.BackgroundWorker("Sync-Monitor", func(shutdownSignal <-chan struct{}) { + messagelayer.Tangle.Events.MessageAttached.Attach(initAnchorPointClosure) + defer messagelayer.Tangle.Events.MessageAttached.Detach(initAnchorPointClosure) + messagelayer.Tangle.Events.MessageSolid.Attach(checkAnchorPointSolidityClosure) + defer messagelayer.Tangle.Events.MessageSolid.Detach(checkAnchorPointSolidityClosure) + + cleanupDelta := config.Node.GetDuration(CfgSyncAnchorPointsCleanupAfterSec) * time.Second + ticker := time.NewTimer(config.Node.GetDuration(CfgSyncAnchorPointsCleanupIntervalSec) * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + anchorPoints.Lock() + for id, itGotAdded := range anchorPoints.ids { + if time.Since(itGotAdded) > cleanupDelta { + log.Infof("freeing anchor point slot of %s as it didn't become solid within %v", id.String()[:10], cleanupDelta) + delete(anchorPoints.ids, id) + } + } + anchorPoints.Unlock() + case <-shutdownSignal: + return + case <-synced: + log.Infof("all anchor messages have become solid, marking node as synced") + markSynced() + return + } + } + }, shutdown.PrioritySynchronization) +} + +// fills up the anchor points with newly attached messages which then are used to determine whether we are synchronized. +func initAnchorPoint(anchorPoints *anchorpoints, msg *message.Message) *message.Id { + if synced.Load() { + return nil + } + + anchorPoints.Lock() + defer anchorPoints.Unlock() + + // we don't need to add additional anchor points if the set was already filled once + if anchorPoints.wasFilled() { + return nil + } + + // as a rule, we don't consider messages attaching directly to genesis anchors + if msg.TrunkId() == message.EmptyId || msg.BranchId() == message.EmptyId { + return nil + } + + // add a new anchor point + id := msg.Id() + anchorPoints.add(id) + return &id +} + +// checks whether an anchor point message became solid. +// if all anchor points became solid, it sets the node's state to synchronized. +func checkAnchorPointSolidity(anchorPoints *anchorpoints, msg *message.Message) (bool, *message.Id) { + anchorPoints.Lock() + defer anchorPoints.Unlock() + + if synced.Load() || len(anchorPoints.ids) == 0 { + return false, nil + } + + // check whether an anchor message become solid + msgID := msg.Id() + if !anchorPoints.has(msgID) { + return false, nil + } + + // an anchor became solid + anchorPoints.markAsSolidified(msgID) + + if !anchorPoints.wereAllSolidified() { + return false, &msgID + } + + // all anchor points have become solid + return true, &msgID +} + +func newAnchorPoints(wantedAnchorPointsCount int) *anchorpoints { + return &anchorpoints{ + ids: make(map[message.Id]time.Time), + wanted: wantedAnchorPointsCount, + } +} + +// anchorpoints are a set of messages which we use to determine whether the node has become synchronized. +type anchorpoints struct { + sync.Mutex + // the ids of the anchor points with their addition time. + ids map[message.Id]time.Time + // the wanted amount of anchor points which should become solid. + wanted int + // how many anchor points have been solidified. + solidified int +} + +// adds the given message to the anchor points set. +func (ap *anchorpoints) add(id message.Id) { + ap.ids[id] = time.Now() +} + +func (ap *anchorpoints) has(id message.Id) bool { + _, has := ap.ids[id] + return has +} + +// marks the given anchor point as solidified which removes it from the set and bumps the solidified count. +func (ap *anchorpoints) markAsSolidified(id message.Id) { + delete(ap.ids, id) + ap.solidified++ +} + +// tells whether the anchor points set was filled at some point. +func (ap *anchorpoints) wasFilled() bool { + return ap.collectedCount() == ap.wanted +} + +// tells whether all anchor points have become solid. +func (ap *anchorpoints) wereAllSolidified() bool { + return ap.solidified == ap.wanted +} + +// tells the number of effectively collected anchor points. +func (ap *anchorpoints) collectedCount() int { + // since an anchor point potentially was solidified before the set became full, + // we need to incorporate that count too + return ap.solidified + len(ap.ids) +} diff --git a/plugins/webapi/data/plugin.go b/plugins/webapi/data/plugin.go index fdbb5f2f..83d86d26 100644 --- a/plugins/webapi/data/plugin.go +++ b/plugins/webapi/data/plugin.go @@ -4,7 +4,7 @@ import ( "net/http" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload" - "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/goshimmer/plugins/issuer" "github.com/iotaledger/goshimmer/plugins/webapi" "github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/node" @@ -35,7 +35,10 @@ func broadcastData(c echo.Context) error { } //TODO: to check max payload size allowed, if exceeding return an error - msg := messagelayer.MessageFactory.IssuePayload(payload.NewData(request.Data)) + msg, err := issuer.IssuePayload(payload.NewData(request.Data)) + if err != nil { + return c.JSON(http.StatusBadRequest, Response{Error: err.Error()}) + } return c.JSON(http.StatusOK, Response{ID: msg.Id().String()}) } diff --git a/plugins/webapi/drng/collectivebeacon/handler.go b/plugins/webapi/drng/collectivebeacon/handler.go index 11c8c26a..591dc482 100644 --- a/plugins/webapi/drng/collectivebeacon/handler.go +++ b/plugins/webapi/drng/collectivebeacon/handler.go @@ -4,7 +4,7 @@ import ( "net/http" "github.com/iotaledger/goshimmer/packages/binary/drng/subtypes/collectiveBeacon/payload" - "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/goshimmer/plugins/issuer" "github.com/iotaledger/hive.go/marshalutil" "github.com/labstack/echo" "github.com/labstack/gommon/log" @@ -25,7 +25,10 @@ func Handler(c echo.Context) error { return c.JSON(http.StatusBadRequest, Response{Error: "not a valid Collective Beacon payload"}) } - msg := messagelayer.MessageFactory.IssuePayload(parsedPayload) + msg, err := issuer.IssuePayload(parsedPayload) + if err != nil { + return c.JSON(http.StatusBadRequest, Response{Error: err.Error()}) + } return c.JSON(http.StatusOK, Response{ID: msg.Id().String()}) } diff --git a/plugins/webapi/info/plugin.go b/plugins/webapi/info/plugin.go index 2a858e75..b6bf4eb3 100644 --- a/plugins/webapi/info/plugin.go +++ b/plugins/webapi/info/plugin.go @@ -5,6 +5,7 @@ import ( "github.com/iotaledger/goshimmer/plugins/autopeering/local" "github.com/iotaledger/goshimmer/plugins/banner" + "github.com/iotaledger/goshimmer/plugins/sync" "github.com/iotaledger/goshimmer/plugins/webapi" "github.com/iotaledger/hive.go/node" "github.com/labstack/echo" @@ -24,6 +25,7 @@ func configure(_ *node.Plugin) { // e.g., // { // "version":"v0.2.0", +// "synchronized": true, // "identityID":"5bf4aa1d6c47e4ce", // "publickey":"CjUsn86jpFHWnSCx3NhWfU4Lk16mDdy1Hr7ERSTv3xn9", // "enabledplugins":[ @@ -70,6 +72,7 @@ func getInfo(c echo.Context) error { return c.JSON(http.StatusOK, Response{ Version: banner.AppVersion, + Synced: sync.Synced(), IdentityID: local.GetInstance().Identity.ID().String(), PublicKey: local.GetInstance().PublicKey().String(), EnabledPlugins: enabledPlugins, @@ -81,6 +84,8 @@ func getInfo(c echo.Context) error { type Response struct { // version of GoShimmer Version string `json:"version,omitempty"` + // whether the node is synchronized + Synced bool `json:"synced"` // identity ID of the node encoded in hex and truncated to its first 8 bytes IdentityID string `json:"identityID,omitempty"` // public key of the node encoded in base58 diff --git a/plugins/webapi/spammer/plugin.go b/plugins/webapi/spammer/plugin.go index c9f60122..e1dda8b4 100644 --- a/plugins/webapi/spammer/plugin.go +++ b/plugins/webapi/spammer/plugin.go @@ -3,7 +3,7 @@ package spammer import ( "github.com/iotaledger/goshimmer/packages/binary/spammer" "github.com/iotaledger/goshimmer/packages/shutdown" - "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/goshimmer/plugins/issuer" "github.com/iotaledger/goshimmer/plugins/webapi" "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/node" @@ -18,7 +18,7 @@ const PluginName = "Spammer" var Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run) func configure(plugin *node.Plugin) { - messageSpammer = spammer.New(messagelayer.MessageFactory) + messageSpammer = spammer.New(issuer.IssuePayload) webapi.Server.GET("spammer", handleRequest) } diff --git a/tools/docker-network/docker-compose.yml b/tools/docker-network/docker-compose.yml index c1ec3637..70481a19 100644 --- a/tools/docker-network/docker-compose.yml +++ b/tools/docker-network/docker-compose.yml @@ -17,6 +17,7 @@ services: - integration-test peer_master: + command: --node.enablePlugins=bootstrap container_name: peer_master image: iotaledger/goshimmer build: diff --git a/tools/integration-tests/tester/docker-compose.yml b/tools/integration-tests/tester/docker-compose.yml index 62bfbd4c..72438dcd 100644 --- a/tools/integration-tests/tester/docker-compose.yml +++ b/tools/integration-tests/tester/docker-compose.yml @@ -7,7 +7,6 @@ services: working_dir: /go/src/github.com/iotaledger/goshimmer/tools/integration-tests/tester entrypoint: go test ./tests -v -mod=readonly volumes: - - /var/run/docker.sock:/var/run/docker.sock:ro - - ../../..:/go/src/github.com/iotaledger/goshimmer:ro - - ../logs:/tmp/logs - + - /var/run/docker.sock:/var/run/docker.sock:ro + - ../../..:/go/src/github.com/iotaledger/goshimmer:ro + - ../logs:/tmp/logs \ No newline at end of file diff --git a/tools/integration-tests/tester/framework/docker.go b/tools/integration-tests/tester/framework/docker.go index 1851325c..19b37ce2 100644 --- a/tools/integration-tests/tester/framework/docker.go +++ b/tools/integration-tests/tester/framework/docker.go @@ -69,7 +69,7 @@ func (d *DockerContainer) CreateGoShimmerEntryNode(name string, seed string) err } // CreateGoShimmerPeer creates a new container with the GoShimmer peer's configuration. -func (d *DockerContainer) CreateGoShimmerPeer(name string, seed string, entryNodeHost string, entryNodePublicKey string) error { +func (d *DockerContainer) CreateGoShimmerPeer(name string, seed string, entryNodeHost string, entryNodePublicKey string, bootstrap bool) error { // configure GoShimmer container instance containerConfig := &container.Config{ Image: "iotaledger/goshimmer", @@ -78,6 +78,12 @@ func (d *DockerContainer) CreateGoShimmerPeer(name string, seed string, entryNod }, Cmd: strslice.StrSlice{ fmt.Sprintf("--node.disablePlugins=%s", disabledPluginsPeer), + fmt.Sprintf("--node.enablePlugins=%s", func() string { + if bootstrap { + return "Bootstrap" + } + return "" + }()), "--webapi.bindAddress=0.0.0.0:8080", fmt.Sprintf("--autopeering.seed=%s", seed), fmt.Sprintf("--autopeering.entryNodes=%s@%s:14626", entryNodePublicKey, entryNodeHost), diff --git a/tools/integration-tests/tester/framework/framework.go b/tools/integration-tests/tester/framework/framework.go index 71d81786..0770c3a5 100644 --- a/tools/integration-tests/tester/framework/framework.go +++ b/tools/integration-tests/tester/framework/framework.go @@ -55,6 +55,7 @@ func newFramework() (*Framework, error) { // CreateNetwork creates and returns a (Docker) Network that contains `peers` GoShimmer nodes. // It waits for the peers to autopeer until the minimum neighbors criteria is met for every peer. +// The first peer automatically starts with the bootstrap plugin enabled. func (f *Framework) CreateNetwork(name string, peers int, minimumNeighbors int) (*Network, error) { network, err := newNetwork(f.dockerClient, strings.ToLower(name), f.tester) if err != nil { @@ -68,8 +69,8 @@ func (f *Framework) CreateNetwork(name string, peers int, minimumNeighbors int) // create peers/GoShimmer nodes for i := 0; i < peers; i++ { - _, err = network.CreatePeer() - if err != nil { + bootstrap := i == 0 + if _, err = network.CreatePeer(bootstrap); err != nil { return nil, err } } diff --git a/tools/integration-tests/tester/framework/network.go b/tools/integration-tests/tester/framework/network.go index 3b4c9480..0a0c875f 100644 --- a/tools/integration-tests/tester/framework/network.go +++ b/tools/integration-tests/tester/framework/network.go @@ -82,7 +82,8 @@ func (n *Network) createEntryNode() error { } // CreatePeer creates a new peer/GoShimmer node in the network and returns it. -func (n *Network) CreatePeer() (*Peer, error) { +// Passing bootstrap true enables the bootstrap plugin on the given peer. +func (n *Network) CreatePeer(bootstrap bool) (*Peer, error) { name := n.namePrefix(fmt.Sprintf("%s%d", containerNameReplica, len(n.peers))) // create identity @@ -94,7 +95,7 @@ func (n *Network) CreatePeer() (*Peer, error) { // create Docker container container := NewDockerContainer(n.dockerClient) - err = container.CreateGoShimmerPeer(name, seed, n.namePrefix(containerNameEntryNode), n.entryNodePublicKey()) + err = container.CreateGoShimmerPeer(name, seed, n.namePrefix(containerNameEntryNode), n.entryNodePublicKey(), bootstrap) if err != nil { return nil, err } diff --git a/tools/integration-tests/tester/tests/synchronization_test.go b/tools/integration-tests/tester/tests/synchronization_test.go index 35ecc45f..43974cfa 100644 --- a/tools/integration-tests/tester/tests/synchronization_test.go +++ b/tools/integration-tests/tester/tests/synchronization_test.go @@ -18,6 +18,9 @@ func TestNodeSynchronization(t *testing.T) { require.NoError(t, err) defer n.Shutdown() + // wait for peers to change their state to synchronized + time.Sleep(5 * time.Second) + numMessages := 100 idsMap := make(map[string]MessageSent, numMessages) ids := make([]string, numMessages) @@ -40,7 +43,7 @@ func TestNodeSynchronization(t *testing.T) { checkForMessageIds(t, n.Peers(), ids, idsMap) // spawn peer without knowledge of previous messages - newPeer, err := n.CreatePeer() + newPeer, err := n.CreatePeer(false) require.NoError(t, err) err = n.WaitForAutopeering(3) require.NoError(t, err) @@ -91,6 +94,11 @@ func sendDataMessage(t *testing.T, peer *framework.Peer, data []byte, number int func checkForMessageIds(t *testing.T, peers []*framework.Peer, ids []string, idsMap map[string]MessageSent) { for _, peer := range peers { + // check that the peer sees itself as synchronized + info, err := peer.Info() + require.NoError(t, err) + require.True(t, info.Synced) + resp, err := peer.FindMessageByID(ids) require.NoError(t, err) -- GitLab