diff --git a/go.mod b/go.mod index 19cc19dc1878918b49160cc625a34f6202935e48..db8945ff31cdc050226e9970d464c7f3ee210a07 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/magiconair/properties v1.8.1 github.com/mr-tron/base58 v1.1.3 github.com/panjf2000/ants/v2 v2.2.2 - github.com/pkg/errors v0.9.1 // indirect + github.com/pkg/errors v0.9.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.6.2 github.com/stretchr/testify v1.5.1 diff --git a/packages/binary/spammer/spammer.go b/packages/binary/spammer/spammer.go index ae607a9a2c243cea89c8e24651cde5c58b4d6ff4..df8775505c4201ea4d0a3cad917bf9e54261c5ca 100644 --- a/packages/binary/spammer/spammer.go +++ b/packages/binary/spammer/spammer.go @@ -6,24 +6,26 @@ import ( "github.com/iotaledger/hive.go/types" - "github.com/iotaledger/goshimmer/packages/binary/messagelayer/messagefactory" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload" ) +// IssuePayloadFunc is a function which issues a payload. +type IssuePayloadFunc = func(payload payload.Payload) (*message.Message, error) + // Spammer spams messages with a static data payload. type Spammer struct { - messageFactory *messagefactory.MessageFactory + issuePayloadFunc IssuePayloadFunc processId int64 shutdownSignal chan types.Empty } // New creates a new spammer. -func New(messageFactory *messagefactory.MessageFactory) *Spammer { +func New(issuePayloadFunc IssuePayloadFunc) *Spammer { return &Spammer{ - messageFactory: messageFactory, - - shutdownSignal: make(chan types.Empty), + issuePayloadFunc: issuePayloadFunc, + shutdownSignal: make(chan types.Empty), } } @@ -46,7 +48,8 @@ func (spammer *Spammer) run(mps int, processId int64) { return } - spammer.messageFactory.IssuePayload(payload.NewData([]byte("SPAM"))) + // we don't care about errors or the actual issued message + _, _ = spammer.issuePayloadFunc(payload.NewData([]byte("SPAM"))) currentSentCounter++ diff --git a/packages/shutdown/order.go b/packages/shutdown/order.go index 319a8b06ff03b20445c61aca17a5340a55ee8876..ff483327a38a2fa48dcfa7b1417e07ca4c050c2b 100644 --- a/packages/shutdown/order.go +++ b/packages/shutdown/order.go @@ -11,7 +11,8 @@ const ( PriorityGossip PriorityWebAPI PriorityDashboard - PriorityGraph + PrioritySynchronization + PriorityBootstrap PrioritySpammer PriorityBadgerGarbageCollection ) diff --git a/pluginmgr/core/plugins.go b/pluginmgr/core/plugins.go index 2ad52fcc444ef1d886fc21d5cbe944d03ce4a400..e4ae7af0836d83effc2c193b30fe5049c0f1e0b7 100644 --- a/pluginmgr/core/plugins.go +++ b/pluginmgr/core/plugins.go @@ -3,17 +3,20 @@ package core import ( "github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/banner" + "github.com/iotaledger/goshimmer/plugins/bootstrap" "github.com/iotaledger/goshimmer/plugins/cli" "github.com/iotaledger/goshimmer/plugins/config" "github.com/iotaledger/goshimmer/plugins/database" "github.com/iotaledger/goshimmer/plugins/drng" "github.com/iotaledger/goshimmer/plugins/gossip" "github.com/iotaledger/goshimmer/plugins/gracefulshutdown" + "github.com/iotaledger/goshimmer/plugins/issuer" "github.com/iotaledger/goshimmer/plugins/logger" "github.com/iotaledger/goshimmer/plugins/messagelayer" "github.com/iotaledger/goshimmer/plugins/metrics" "github.com/iotaledger/goshimmer/plugins/portcheck" "github.com/iotaledger/goshimmer/plugins/profiling" + "github.com/iotaledger/goshimmer/plugins/sync" "github.com/iotaledger/hive.go/node" ) @@ -29,6 +32,9 @@ var PLUGINS = node.Plugins( autopeering.Plugin, messagelayer.Plugin, gossip.Plugin, + issuer.Plugin, + bootstrap.Plugin, + sync.Plugin, gracefulshutdown.Plugin, metrics.Plugin, drng.Plugin, diff --git a/plugins/bootstrap/plugin.go b/plugins/bootstrap/plugin.go new file mode 100644 index 0000000000000000000000000000000000000000..d6ce2e3108c2e91689eba57e094ec9d843e7d960 --- /dev/null +++ b/plugins/bootstrap/plugin.go @@ -0,0 +1,69 @@ +package bootstrap + +import ( + "time" + + "github.com/iotaledger/goshimmer/packages/binary/spammer" + "github.com/iotaledger/goshimmer/packages/shutdown" + "github.com/iotaledger/goshimmer/plugins/config" + "github.com/iotaledger/goshimmer/plugins/issuer" + "github.com/iotaledger/goshimmer/plugins/sync" + "github.com/iotaledger/hive.go/daemon" + "github.com/iotaledger/hive.go/logger" + "github.com/iotaledger/hive.go/node" + flag "github.com/spf13/pflag" +) + +const ( + // PluginName is the plugin name of the bootstrap plugin. + PluginName = "Bootstrap" + // CfgBootstrapInitialIssuanceTimePeriodSec defines the initial time period of how long the node should be + // issuing messages when started in bootstrapping mode. If the value is set to -1, the issuance is continuous. + CfgBootstrapInitialIssuanceTimePeriodSec = "bootstrap.initialIssuance.timePeriodSec" + // the messages per second to issue when in bootstrapping mode. + initialIssuanceMPS = 1 + // the value which determines a continuous issuance of messages from the bootstrap plugin. + continuousIssuance = -1 +) + +func init() { + flag.Int(CfgBootstrapInitialIssuanceTimePeriodSec, -1, "the initial time period of how long the node should be issuing messages when started in bootstrapping mode. "+ + "if the value is set to -1, the issuance is continuous.") +} + +var ( + // Plugin is the plugin instance of the bootstrap plugin. + Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run) + log *logger.Logger +) + +func configure(_ *node.Plugin) { + log = logger.NewLogger(PluginName) + // we're auto. synced if we start in bootstrapping mode + sync.OverwriteSyncedState(true) + log.Infof("starting node in bootstrapping mode") +} + +func run(_ *node.Plugin) { + + messageSpammer := spammer.New(issuer.IssuePayload) + issuancePeriodSec := config.Node.GetInt(CfgBootstrapInitialIssuanceTimePeriodSec) + issuancePeriod := time.Duration(issuancePeriodSec) * time.Second + + // issue messages on top of the genesis + _ = daemon.BackgroundWorker("Bootstrapping-Issuer", func(shutdownSignal <-chan struct{}) { + messageSpammer.Start(initialIssuanceMPS) + defer messageSpammer.Shutdown() + // don't stop issuing messages if in continuous mode + if issuancePeriodSec == continuousIssuance { + log.Info("continuously issuing bootstrapping messages") + <-shutdownSignal + return + } + log.Infof("issuing bootstrapping messages for %d seconds", issuancePeriodSec) + select { + case <-time.After(issuancePeriod): + case <-shutdownSignal: + } + }, shutdown.PriorityBootstrap) +} diff --git a/plugins/issuer/plugin.go b/plugins/issuer/plugin.go new file mode 100644 index 0000000000000000000000000000000000000000..d34e6f3621499a77771f55653ff9b6f29254b6fb --- /dev/null +++ b/plugins/issuer/plugin.go @@ -0,0 +1,30 @@ +package issuer + +import ( + "fmt" + + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload" + "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/goshimmer/plugins/sync" + "github.com/iotaledger/hive.go/node" +) + +// PluginName is the name of the issuer plugin. +const PluginName = "Issuer" + +var ( + // Plugin is the plugin instance of the issuer plugin. + Plugin = node.NewPlugin(PluginName, node.Enabled, configure) +) + +func configure(_ *node.Plugin) {} + +// IssuePayload issues a payload to the message layer. +// If the node is not synchronized an error is returned. +func IssuePayload(payload payload.Payload) (*message.Message, error) { + if !sync.Synced() { + return nil, fmt.Errorf("can't issue payload: %w", sync.ErrNodeNotSynchronized) + } + return messagelayer.MessageFactory.IssuePayload(payload), nil +} diff --git a/plugins/sync/plugin.go b/plugins/sync/plugin.go new file mode 100644 index 0000000000000000000000000000000000000000..c169aa6228a209ab33b44ad31f58ca6c63adcf75 --- /dev/null +++ b/plugins/sync/plugin.go @@ -0,0 +1,335 @@ +package sync + +import ( + "sync" + "time" + + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle" + "github.com/iotaledger/goshimmer/packages/shutdown" + "github.com/iotaledger/goshimmer/plugins/autopeering/local" + "github.com/iotaledger/goshimmer/plugins/config" + "github.com/iotaledger/goshimmer/plugins/gossip" + "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/hive.go/autopeering/peer" + "github.com/iotaledger/hive.go/daemon" + "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/logger" + "github.com/iotaledger/hive.go/node" + "github.com/iotaledger/hive.go/types" + "github.com/pkg/errors" + flag "github.com/spf13/pflag" + "go.uber.org/atomic" +) + +const ( + // PluginName is the plugin name of the sync plugin. + PluginName = "Sync" + // CfgSyncAnchorPointsCount defines the amount of anchor points to use to determine + // whether a node is synchronized. + CfgSyncAnchorPointsCount = "sync.anchorPointsCount" + // CfgSyncAnchorPointsCleanupAfterSec defines the amount of time which is allowed to pass between setting an anchor + // point and it not becoming solid (to clean its slot for another anchor point). It basically defines the expectancy + // of how long it should take for an anchor point to become solid. Even if this value is set too low, usually a node + // would eventually solidify collected anchor points. + CfgSyncAnchorPointsCleanupAfterSec = "sync.anchorPointsCleanupAfterSec" + // CfgSyncAnchorPointsCleanupIntervalSec defines the interval at which it is checked whether anchor points fall + // into the cleanup window. + CfgSyncAnchorPointsCleanupIntervalSec = "sync.anchorPointsCleanupIntervalSec" + // CfgSyncDesyncedIfNoMessageAfterSec defines the time period in which new messages must be received and if not + // the node is marked as desynced. + CfgSyncDesyncedIfNoMessageAfterSec = "sync.desyncedIfNoMessagesAfterSec" +) + +func init() { + flag.Int(CfgSyncAnchorPointsCount, 3, "the amount of anchor points to use to determine whether a node is synchronized") + flag.Int(CfgSyncDesyncedIfNoMessageAfterSec, 300, "the time period in seconds which sets the node as desynced if no new messages are received") + flag.Int(CfgSyncAnchorPointsCleanupIntervalSec, 10, "the interval at which it is checked whether anchor points fall into the cleanup window") + flag.Int(CfgSyncAnchorPointsCleanupAfterSec, 60, "the amount of time which is allowed to pass between setting an anchor point and it not becoming solid (to clean its slot for another anchor point)") +} + +var ( + // Plugin is the plugin instance of the sync plugin. + Plugin = node.NewPlugin(PluginName, node.Enabled, configure, run) + // ErrNodeNotSynchronized is returned when an operation can't be executed because + // the node is not synchronized. + ErrNodeNotSynchronized = errors.New("node is not synchronized") + // tells whether the node is synced or not. + synced atomic.Bool + log *logger.Logger +) + +// Synced tells whether the node is in a state we consider synchronized, meaning +// it has the relevant past and present message data. +func Synced() bool { + return synced.Load() +} + +// OverwriteSyncedState overwrites the synced state with the given value. +func OverwriteSyncedState(syncedOverwrite bool) { + synced.Store(syncedOverwrite) +} + +func configure(_ *node.Plugin) { + log = logger.NewLogger(PluginName) +} + +func run(_ *node.Plugin) { + // per default the node starts in a desynced state + if !Synced() { + monitorForSynchronization() + return + } + + // however, another plugin might want to overwrite the synced state (i.e. the bootstrap plugin) + // in order to start issuing messages + monitorForDesynchronization() +} + +// marks the node as synced and spawns the background worker to monitor desynchronization. +func markSynced() { + synced.Store(true) + monitorForDesynchronization() +} + +// marks the node as desynced and spawns the background worker to monitor synchronization. +func markDesynced() { + synced.Store(false) + monitorForSynchronization() +} + +// starts a background worker and event handlers to check whether the node is desynchronized by checking +// whether the node has no more peers or didn't receive any message in a given time period. +func monitorForDesynchronization() { + log.Info("monitoring for desynchronization") + + // monitors the peer count of the manager and sets the node as desynced if it has no more peers. + noPeers := make(chan types.Empty) + monitorPeerCountClosure := events.NewClosure(func(_ *peer.Peer) { + anyPeers := len(gossip.Manager().AllNeighbors()) > 0 + if anyPeers { + return + } + noPeers <- types.Empty{} + }) + + msgReceived := make(chan types.Empty, 1) + + monitorMessageInflowClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) { + defer cachedMessage.Release() + defer cachedMessageMetadata.Release() + // ignore messages sent by the node itself + if local.GetInstance().LocalIdentity().PublicKey() == cachedMessage.Unwrap().IssuerPublicKey() { + return + } + select { + case msgReceived <- types.Empty{}: + default: + // via this default clause, a slow desync-monitor select-loop + // worker should not increase latency as it auto. falls through + } + }) + + daemon.BackgroundWorker("Desync-Monitor", func(shutdownSignal <-chan struct{}) { + gossip.Manager().Events().NeighborRemoved.Attach(monitorPeerCountClosure) + defer gossip.Manager().Events().NeighborRemoved.Detach(monitorPeerCountClosure) + messagelayer.Tangle.Events.MessageAttached.Attach(monitorMessageInflowClosure) + defer messagelayer.Tangle.Events.MessageAttached.Detach(monitorMessageInflowClosure) + + desyncedIfNoMessageInSec := config.Node.GetDuration(CfgSyncDesyncedIfNoMessageAfterSec) * time.Second + timer := time.NewTimer(desyncedIfNoMessageInSec) + for { + select { + + case <-msgReceived: + // we received a message, therefore reset the timer to check for message receives + if !timer.Stop() { + <-timer.C + } + // TODO: perhaps find a better way instead of constantly resetting the timer + timer.Reset(desyncedIfNoMessageInSec) + + case <-timer.C: + log.Infof("no message received in %d seconds, marking node as desynced", desyncedIfNoMessageInSec) + markDesynced() + return + + case <-noPeers: + log.Info("all peers have been lost, marking node as desynced") + markDesynced() + return + + case <-shutdownSignal: + return + } + } + }, shutdown.PrioritySynchronization) +} + +// starts a background worker and event handlers to check whether the node is synchronized by first collecting +// a set of newly received messages and then waiting for them to become solid. +func monitorForSynchronization() { + wantedAnchorPointsCount := config.Node.GetInt(CfgSyncAnchorPointsCount) + anchorPoints := newAnchorPoints(wantedAnchorPointsCount) + log.Infof("monitoring for synchronization, awaiting %d anchor point messages to become solid", wantedAnchorPointsCount) + + synced := make(chan types.Empty) + + initAnchorPointClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) { + defer cachedMessage.Release() + defer cachedMessageMetadata.Release() + if addedAnchorID := initAnchorPoint(anchorPoints, cachedMessage.Unwrap()); addedAnchorID != nil { + anchorPoints.Lock() + defer anchorPoints.Unlock() + log.Infof("added message %s as anchor point (%d of %d collected)", addedAnchorID.String()[:10], anchorPoints.collectedCount(), anchorPoints.wanted) + } + }) + + checkAnchorPointSolidityClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) { + defer cachedMessage.Release() + defer cachedMessageMetadata.Release() + allSolid, newSolidAnchorID := checkAnchorPointSolidity(anchorPoints, cachedMessage.Unwrap()) + + if newSolidAnchorID != nil { + log.Infof("anchor message %s has become solid", newSolidAnchorID.String()[:10]) + } + + if !allSolid { + return + } + synced <- types.Empty{} + }) + + daemon.BackgroundWorker("Sync-Monitor", func(shutdownSignal <-chan struct{}) { + messagelayer.Tangle.Events.MessageAttached.Attach(initAnchorPointClosure) + defer messagelayer.Tangle.Events.MessageAttached.Detach(initAnchorPointClosure) + messagelayer.Tangle.Events.MessageSolid.Attach(checkAnchorPointSolidityClosure) + defer messagelayer.Tangle.Events.MessageSolid.Detach(checkAnchorPointSolidityClosure) + + cleanupDelta := config.Node.GetDuration(CfgSyncAnchorPointsCleanupAfterSec) * time.Second + ticker := time.NewTimer(config.Node.GetDuration(CfgSyncAnchorPointsCleanupIntervalSec) * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + anchorPoints.Lock() + for id, itGotAdded := range anchorPoints.ids { + if time.Since(itGotAdded) > cleanupDelta { + log.Infof("freeing anchor point slot of %s as it didn't become solid within %v", id.String()[:10], cleanupDelta) + delete(anchorPoints.ids, id) + } + } + anchorPoints.Unlock() + case <-shutdownSignal: + return + case <-synced: + log.Infof("all anchor messages have become solid, marking node as synced") + markSynced() + return + } + } + }, shutdown.PrioritySynchronization) +} + +// fills up the anchor points with newly attached messages which then are used to determine whether we are synchronized. +func initAnchorPoint(anchorPoints *anchorpoints, msg *message.Message) *message.Id { + if synced.Load() { + return nil + } + + anchorPoints.Lock() + defer anchorPoints.Unlock() + + // we don't need to add additional anchor points if the set was already filled once + if anchorPoints.wasFilled() { + return nil + } + + // as a rule, we don't consider messages attaching directly to genesis anchors + if msg.TrunkId() == message.EmptyId || msg.BranchId() == message.EmptyId { + return nil + } + + // add a new anchor point + id := msg.Id() + anchorPoints.add(id) + return &id +} + +// checks whether an anchor point message became solid. +// if all anchor points became solid, it sets the node's state to synchronized. +func checkAnchorPointSolidity(anchorPoints *anchorpoints, msg *message.Message) (bool, *message.Id) { + anchorPoints.Lock() + defer anchorPoints.Unlock() + + if synced.Load() || len(anchorPoints.ids) == 0 { + return false, nil + } + + // check whether an anchor message become solid + msgID := msg.Id() + if !anchorPoints.has(msgID) { + return false, nil + } + + // an anchor became solid + anchorPoints.markAsSolidified(msgID) + + if !anchorPoints.wereAllSolidified() { + return false, &msgID + } + + // all anchor points have become solid + return true, &msgID +} + +func newAnchorPoints(wantedAnchorPointsCount int) *anchorpoints { + return &anchorpoints{ + ids: make(map[message.Id]time.Time), + wanted: wantedAnchorPointsCount, + } +} + +// anchorpoints are a set of messages which we use to determine whether the node has become synchronized. +type anchorpoints struct { + sync.Mutex + // the ids of the anchor points with their addition time. + ids map[message.Id]time.Time + // the wanted amount of anchor points which should become solid. + wanted int + // how many anchor points have been solidified. + solidified int +} + +// adds the given message to the anchor points set. +func (ap *anchorpoints) add(id message.Id) { + ap.ids[id] = time.Now() +} + +func (ap *anchorpoints) has(id message.Id) bool { + _, has := ap.ids[id] + return has +} + +// marks the given anchor point as solidified which removes it from the set and bumps the solidified count. +func (ap *anchorpoints) markAsSolidified(id message.Id) { + delete(ap.ids, id) + ap.solidified++ +} + +// tells whether the anchor points set was filled at some point. +func (ap *anchorpoints) wasFilled() bool { + return ap.collectedCount() == ap.wanted +} + +// tells whether all anchor points have become solid. +func (ap *anchorpoints) wereAllSolidified() bool { + return ap.solidified == ap.wanted +} + +// tells the number of effectively collected anchor points. +func (ap *anchorpoints) collectedCount() int { + // since an anchor point potentially was solidified before the set became full, + // we need to incorporate that count too + return ap.solidified + len(ap.ids) +} diff --git a/plugins/webapi/data/plugin.go b/plugins/webapi/data/plugin.go index fdbb5f2f2b7fc1f1b44e46a6158af9226c75d00c..83d86d267015c60a4254b340f2796ac6812c5e0e 100644 --- a/plugins/webapi/data/plugin.go +++ b/plugins/webapi/data/plugin.go @@ -4,7 +4,7 @@ import ( "net/http" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload" - "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/goshimmer/plugins/issuer" "github.com/iotaledger/goshimmer/plugins/webapi" "github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/node" @@ -35,7 +35,10 @@ func broadcastData(c echo.Context) error { } //TODO: to check max payload size allowed, if exceeding return an error - msg := messagelayer.MessageFactory.IssuePayload(payload.NewData(request.Data)) + msg, err := issuer.IssuePayload(payload.NewData(request.Data)) + if err != nil { + return c.JSON(http.StatusBadRequest, Response{Error: err.Error()}) + } return c.JSON(http.StatusOK, Response{ID: msg.Id().String()}) } diff --git a/plugins/webapi/drng/collectivebeacon/handler.go b/plugins/webapi/drng/collectivebeacon/handler.go index 11c8c26a6ff33d5eb50669600e695ec0e23815c5..591dc4820ca55c0fa61e6f6ba458ed1f9cc10a03 100644 --- a/plugins/webapi/drng/collectivebeacon/handler.go +++ b/plugins/webapi/drng/collectivebeacon/handler.go @@ -4,7 +4,7 @@ import ( "net/http" "github.com/iotaledger/goshimmer/packages/binary/drng/subtypes/collectiveBeacon/payload" - "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/goshimmer/plugins/issuer" "github.com/iotaledger/hive.go/marshalutil" "github.com/labstack/echo" "github.com/labstack/gommon/log" @@ -25,7 +25,10 @@ func Handler(c echo.Context) error { return c.JSON(http.StatusBadRequest, Response{Error: "not a valid Collective Beacon payload"}) } - msg := messagelayer.MessageFactory.IssuePayload(parsedPayload) + msg, err := issuer.IssuePayload(parsedPayload) + if err != nil { + return c.JSON(http.StatusBadRequest, Response{Error: err.Error()}) + } return c.JSON(http.StatusOK, Response{ID: msg.Id().String()}) } diff --git a/plugins/webapi/info/plugin.go b/plugins/webapi/info/plugin.go index 2a858e75cd0e690dcebc4ddc91ee5e9f016176c6..b6bf4eb3901b6d98e64a39d1d3599e0fbc5f9c2d 100644 --- a/plugins/webapi/info/plugin.go +++ b/plugins/webapi/info/plugin.go @@ -5,6 +5,7 @@ import ( "github.com/iotaledger/goshimmer/plugins/autopeering/local" "github.com/iotaledger/goshimmer/plugins/banner" + "github.com/iotaledger/goshimmer/plugins/sync" "github.com/iotaledger/goshimmer/plugins/webapi" "github.com/iotaledger/hive.go/node" "github.com/labstack/echo" @@ -24,6 +25,7 @@ func configure(_ *node.Plugin) { // e.g., // { // "version":"v0.2.0", +// "synchronized": true, // "identityID":"5bf4aa1d6c47e4ce", // "publickey":"CjUsn86jpFHWnSCx3NhWfU4Lk16mDdy1Hr7ERSTv3xn9", // "enabledplugins":[ @@ -70,6 +72,7 @@ func getInfo(c echo.Context) error { return c.JSON(http.StatusOK, Response{ Version: banner.AppVersion, + Synced: sync.Synced(), IdentityID: local.GetInstance().Identity.ID().String(), PublicKey: local.GetInstance().PublicKey().String(), EnabledPlugins: enabledPlugins, @@ -81,6 +84,8 @@ func getInfo(c echo.Context) error { type Response struct { // version of GoShimmer Version string `json:"version,omitempty"` + // whether the node is synchronized + Synced bool `json:"synced"` // identity ID of the node encoded in hex and truncated to its first 8 bytes IdentityID string `json:"identityID,omitempty"` // public key of the node encoded in base58 diff --git a/plugins/webapi/spammer/plugin.go b/plugins/webapi/spammer/plugin.go index c9f601228a7ff2765bbfffcda0b3cfd6b708d2df..e1dda8b4ad687f402a37b4b205ead151d1675443 100644 --- a/plugins/webapi/spammer/plugin.go +++ b/plugins/webapi/spammer/plugin.go @@ -3,7 +3,7 @@ package spammer import ( "github.com/iotaledger/goshimmer/packages/binary/spammer" "github.com/iotaledger/goshimmer/packages/shutdown" - "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/goshimmer/plugins/issuer" "github.com/iotaledger/goshimmer/plugins/webapi" "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/node" @@ -18,7 +18,7 @@ const PluginName = "Spammer" var Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run) func configure(plugin *node.Plugin) { - messageSpammer = spammer.New(messagelayer.MessageFactory) + messageSpammer = spammer.New(issuer.IssuePayload) webapi.Server.GET("spammer", handleRequest) } diff --git a/tools/docker-network/docker-compose.yml b/tools/docker-network/docker-compose.yml index c1ec36372a23181d623110348c5d642e815d64aa..70481a1983f447d786cbda1d7521e1ac800d3c46 100644 --- a/tools/docker-network/docker-compose.yml +++ b/tools/docker-network/docker-compose.yml @@ -17,6 +17,7 @@ services: - integration-test peer_master: + command: --node.enablePlugins=bootstrap container_name: peer_master image: iotaledger/goshimmer build: diff --git a/tools/integration-tests/tester/docker-compose.yml b/tools/integration-tests/tester/docker-compose.yml index 62bfbd4c4fbd4ca84e468812e4df34ce06d74c63..72438dcdefb644f583e4a660b58cb69707a9c99c 100644 --- a/tools/integration-tests/tester/docker-compose.yml +++ b/tools/integration-tests/tester/docker-compose.yml @@ -7,7 +7,6 @@ services: working_dir: /go/src/github.com/iotaledger/goshimmer/tools/integration-tests/tester entrypoint: go test ./tests -v -mod=readonly volumes: - - /var/run/docker.sock:/var/run/docker.sock:ro - - ../../..:/go/src/github.com/iotaledger/goshimmer:ro - - ../logs:/tmp/logs - + - /var/run/docker.sock:/var/run/docker.sock:ro + - ../../..:/go/src/github.com/iotaledger/goshimmer:ro + - ../logs:/tmp/logs \ No newline at end of file diff --git a/tools/integration-tests/tester/framework/docker.go b/tools/integration-tests/tester/framework/docker.go index 1851325ca5a6e91881734cbd5c047f57caa0cddb..19b37ce21a66485fc58352076accdb92ddab6ac0 100644 --- a/tools/integration-tests/tester/framework/docker.go +++ b/tools/integration-tests/tester/framework/docker.go @@ -69,7 +69,7 @@ func (d *DockerContainer) CreateGoShimmerEntryNode(name string, seed string) err } // CreateGoShimmerPeer creates a new container with the GoShimmer peer's configuration. -func (d *DockerContainer) CreateGoShimmerPeer(name string, seed string, entryNodeHost string, entryNodePublicKey string) error { +func (d *DockerContainer) CreateGoShimmerPeer(name string, seed string, entryNodeHost string, entryNodePublicKey string, bootstrap bool) error { // configure GoShimmer container instance containerConfig := &container.Config{ Image: "iotaledger/goshimmer", @@ -78,6 +78,12 @@ func (d *DockerContainer) CreateGoShimmerPeer(name string, seed string, entryNod }, Cmd: strslice.StrSlice{ fmt.Sprintf("--node.disablePlugins=%s", disabledPluginsPeer), + fmt.Sprintf("--node.enablePlugins=%s", func() string { + if bootstrap { + return "Bootstrap" + } + return "" + }()), "--webapi.bindAddress=0.0.0.0:8080", fmt.Sprintf("--autopeering.seed=%s", seed), fmt.Sprintf("--autopeering.entryNodes=%s@%s:14626", entryNodePublicKey, entryNodeHost), diff --git a/tools/integration-tests/tester/framework/framework.go b/tools/integration-tests/tester/framework/framework.go index 71d81786ac1e6978d752051f5258f9362facc811..0770c3a5f868d961a61d79f6ce160be97755053f 100644 --- a/tools/integration-tests/tester/framework/framework.go +++ b/tools/integration-tests/tester/framework/framework.go @@ -55,6 +55,7 @@ func newFramework() (*Framework, error) { // CreateNetwork creates and returns a (Docker) Network that contains `peers` GoShimmer nodes. // It waits for the peers to autopeer until the minimum neighbors criteria is met for every peer. +// The first peer automatically starts with the bootstrap plugin enabled. func (f *Framework) CreateNetwork(name string, peers int, minimumNeighbors int) (*Network, error) { network, err := newNetwork(f.dockerClient, strings.ToLower(name), f.tester) if err != nil { @@ -68,8 +69,8 @@ func (f *Framework) CreateNetwork(name string, peers int, minimumNeighbors int) // create peers/GoShimmer nodes for i := 0; i < peers; i++ { - _, err = network.CreatePeer() - if err != nil { + bootstrap := i == 0 + if _, err = network.CreatePeer(bootstrap); err != nil { return nil, err } } diff --git a/tools/integration-tests/tester/framework/network.go b/tools/integration-tests/tester/framework/network.go index 3b4c9480940b9c566749fe4adec05bb32f21532a..0a0c875f81d4517f2428bac922c7ce3e9608c42a 100644 --- a/tools/integration-tests/tester/framework/network.go +++ b/tools/integration-tests/tester/framework/network.go @@ -82,7 +82,8 @@ func (n *Network) createEntryNode() error { } // CreatePeer creates a new peer/GoShimmer node in the network and returns it. -func (n *Network) CreatePeer() (*Peer, error) { +// Passing bootstrap true enables the bootstrap plugin on the given peer. +func (n *Network) CreatePeer(bootstrap bool) (*Peer, error) { name := n.namePrefix(fmt.Sprintf("%s%d", containerNameReplica, len(n.peers))) // create identity @@ -94,7 +95,7 @@ func (n *Network) CreatePeer() (*Peer, error) { // create Docker container container := NewDockerContainer(n.dockerClient) - err = container.CreateGoShimmerPeer(name, seed, n.namePrefix(containerNameEntryNode), n.entryNodePublicKey()) + err = container.CreateGoShimmerPeer(name, seed, n.namePrefix(containerNameEntryNode), n.entryNodePublicKey(), bootstrap) if err != nil { return nil, err } diff --git a/tools/integration-tests/tester/tests/synchronization_test.go b/tools/integration-tests/tester/tests/synchronization_test.go index 35ecc45fd646ad08073f6e128037d27d6fc5b9c2..43974cfaeba2451706cbc539295a652cf9b173a1 100644 --- a/tools/integration-tests/tester/tests/synchronization_test.go +++ b/tools/integration-tests/tester/tests/synchronization_test.go @@ -18,6 +18,9 @@ func TestNodeSynchronization(t *testing.T) { require.NoError(t, err) defer n.Shutdown() + // wait for peers to change their state to synchronized + time.Sleep(5 * time.Second) + numMessages := 100 idsMap := make(map[string]MessageSent, numMessages) ids := make([]string, numMessages) @@ -40,7 +43,7 @@ func TestNodeSynchronization(t *testing.T) { checkForMessageIds(t, n.Peers(), ids, idsMap) // spawn peer without knowledge of previous messages - newPeer, err := n.CreatePeer() + newPeer, err := n.CreatePeer(false) require.NoError(t, err) err = n.WaitForAutopeering(3) require.NoError(t, err) @@ -91,6 +94,11 @@ func sendDataMessage(t *testing.T, peer *framework.Peer, data []byte, number int func checkForMessageIds(t *testing.T, peers []*framework.Peer, ids []string, idsMap map[string]MessageSent) { for _, peer := range peers { + // check that the peer sees itself as synchronized + info, err := peer.Info() + require.NoError(t, err) + require.True(t, info.Synced) + resp, err := peer.FindMessageByID(ids) require.NoError(t, err)