package sync

import (
	"errors"
	"sync"
	"time"

	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
	gossipPkg "github.com/iotaledger/goshimmer/packages/gossip"
	"github.com/iotaledger/goshimmer/packages/shutdown"
	"github.com/iotaledger/goshimmer/plugins/autopeering/local"
	"github.com/iotaledger/goshimmer/plugins/config"
	"github.com/iotaledger/goshimmer/plugins/gossip"
	"github.com/iotaledger/goshimmer/plugins/messagelayer"
	"github.com/iotaledger/hive.go/daemon"
	"github.com/iotaledger/hive.go/events"
	"github.com/iotaledger/hive.go/logger"
	"github.com/iotaledger/hive.go/node"
	"github.com/iotaledger/hive.go/types"
	flag "github.com/spf13/pflag"
	"go.uber.org/atomic"
)

const (
	// PluginName is the plugin name of the sync plugin.
	PluginName = "Sync"

	// CfgSyncAnchorPointsCount defines the number of anchor points to use to determine
	// whether a node is synchronized.
	CfgSyncAnchorPointsCount = "sync.anchorPointsCount"

	// CfgSyncAnchorPointsCleanupAfterSec defines the amount of time which is allowed to pass between setting an anchor
	// point and it not becoming solid (to clean its slot for another anchor point). It essentially defines how long
	// an anchor point is expected to take to become solid. Even if this value is set too low, a node would usually
	// still solidify the collected anchor points eventually.
	CfgSyncAnchorPointsCleanupAfterSec = "sync.anchorPointsCleanupAfterSec"

	// CfgSyncAnchorPointsCleanupIntervalSec defines the interval at which it is checked whether anchor points fall
	// into the cleanup window.
	CfgSyncAnchorPointsCleanupIntervalSec = "sync.anchorPointsCleanupIntervalSec"

	// CfgSyncDesyncedIfNoMessageAfterSec defines the time period within which new messages must be received;
	// if none arrive, the node is marked as desynced.
	CfgSyncDesyncedIfNoMessageAfterSec = "sync.desyncedIfNoMessagesAfterSec"

	// defines the max. divergence a potential new anchor point's issuance time can have
	// from the current issuance threshold. Say the current threshold is at 1000 and the boundary at 20:
	// we then allow a new potential anchor point's issuance time to be >= 980, i.e. up to 20 seconds
	// older than the current threshold.
	issuanceThresholdBeforeTimeBoundary = 20 * time.Second
)

func init() {
	flag.Int(CfgSyncAnchorPointsCount, 3, "the number of anchor points to use to determine whether a node is synchronized")
	flag.Int(CfgSyncDesyncedIfNoMessageAfterSec, 300, "the time period in seconds after which the node is marked as desynced if no new messages are received")
	flag.Int(CfgSyncAnchorPointsCleanupIntervalSec, 10, "the interval at which it is checked whether anchor points fall into the cleanup window")
	flag.Int(CfgSyncAnchorPointsCleanupAfterSec, 60, "the amount of time which is allowed to pass between setting an anchor point and it not becoming solid (to clean its slot for another anchor point)")
}

var (
	// plugin is the plugin instance of the sync plugin.
	plugin *node.Plugin
	once   sync.Once

	// ErrNodeNotSynchronized is returned when an operation can't be executed because
	// the node is not synchronized.
	ErrNodeNotSynchronized = errors.New("node is not synchronized")

	// tells whether the node is synced or not.
	synced atomic.Bool

	log *logger.Logger
)
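// The exported state accessors below are how other components are expected to interact
// with this plugin. A minimal sketch of a hypothetical caller (the handler shown here is
// illustrative and not part of this plugin or the GoShimmer codebase) that refuses to
// operate while the node is desynced:
//
//	func handleRequest() error {
//		if !sync.Synced() {
//			return sync.ErrNodeNotSynchronized
//		}
//		// ... proceed with work that requires a synchronized node ...
//		return nil
//	}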
// Plugin gets the plugin instance.
func Plugin() *node.Plugin {
	once.Do(func() {
		plugin = node.NewPlugin(PluginName, node.Enabled, configure, run)
	})
	return plugin
}

// Synced tells whether the node is in a state we consider synchronized, meaning
// it has the relevant past and present message data.
func Synced() bool {
	return synced.Load()
}

// OverwriteSyncedState overwrites the synced state with the given value.
func OverwriteSyncedState(syncedOverwrite bool) {
	synced.Store(syncedOverwrite)
}

func configure(_ *node.Plugin) {
	log = logger.NewLogger(PluginName)
}

func run(_ *node.Plugin) {
	// by default the node starts in a desynced state
	if !Synced() {
		monitorForSynchronization()
		return
	}

	// however, another plugin might want to overwrite the synced state (i.e. the bootstrap plugin)
	// in order to start issuing messages
	monitorForDesynchronization()
}

// MarkSynced marks the node as synced and spawns the background worker to monitor desynchronization.
func MarkSynced() {
	synced.Store(true)

	isRunning := false
	for _, worker := range daemon.GetRunningBackgroundWorkers() {
		if worker == "Desync-Monitor" {
			isRunning = true
			break
		}
	}
	if !isRunning {
		monitorForDesynchronization()
	}
}

// MarkDesynced marks the node as desynced and spawns the background worker to monitor synchronization.
func MarkDesynced() {
	synced.Store(false)

	isRunning := false
	for _, worker := range daemon.GetRunningBackgroundWorkers() {
		if worker == "Sync-Monitor" {
			isRunning = true
			break
		}
	}
	if !isRunning {
		monitorForSynchronization()
	}
}
// starts a background worker and event handlers to check whether the node is desynchronized by checking
// whether the node has no more peers or didn't receive any message in a given time period.
func monitorForDesynchronization() {
	log.Info("monitoring for desynchronization")

	// monitors the peer count of the manager and sets the node as desynced if it has no more peers.
	noPeers := make(chan types.Empty)
	monitorPeerCountClosure := events.NewClosure(func(_ *gossipPkg.Neighbor) {
		anyPeers := len(gossip.Manager().AllNeighbors()) > 0
		if anyPeers {
			return
		}
		noPeers <- types.Empty{}
	})

	msgReceived := make(chan types.Empty, 1)
	monitorMessageInflowClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
		defer cachedMessage.Release()
		defer cachedMessageMetadata.Release()
		// ignore messages sent by the node itself
		if local.GetInstance().LocalIdentity().PublicKey() == cachedMessage.Unwrap().IssuerPublicKey() {
			return
		}
		select {
		case msgReceived <- types.Empty{}:
		default:
			// via this default clause, a slow desync-monitor select-loop
			// worker should not increase latency as it automatically falls through
		}
	})

	if err := daemon.BackgroundWorker("Desync-Monitor", func(shutdownSignal <-chan struct{}) {
		gossip.Manager().Events().NeighborRemoved.Attach(monitorPeerCountClosure)
		defer gossip.Manager().Events().NeighborRemoved.Detach(monitorPeerCountClosure)
		messagelayer.Tangle().Events.MessageAttached.Attach(monitorMessageInflowClosure)
		defer messagelayer.Tangle().Events.MessageAttached.Detach(monitorMessageInflowClosure)

		// the config value is a plain number of seconds, hence the multiplication by time.Second
		timeForDesync := config.Node().GetDuration(CfgSyncDesyncedIfNoMessageAfterSec) * time.Second
		timer := time.NewTimer(timeForDesync)
		for {
			select {
			case <-msgReceived:
				// we received a message, therefore reset the timer to check for message receives
				if !timer.Stop() {
					<-timer.C
				}
				// TODO: perhaps find a better way instead of constantly resetting the timer
				timer.Reset(timeForDesync)
			case <-timer.C:
				log.Infof("no message received in %d seconds, marking node as desynced", int(timeForDesync.Seconds()))
				MarkDesynced()
				return
			case <-noPeers:
				log.Info("all peers have been lost, marking node as desynced")
				MarkDesynced()
				return
			case <-shutdownSignal:
				return
			}
		}
	}, shutdown.PrioritySynchronization); err != nil {
		log.Panicf("Failed to start as daemon: %s", err)
	}
}
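// For orientation, the synchronization check below boils down to the following lifecycle.
// This is a condensed sketch of the logic implemented by monitorForSynchronization,
// initAnchorPoint and checkAnchorPointSolidity further down, not additional behavior:
//
//	// MessageAttached event: collect up to `wanted` freshly attached messages as anchor points
//	anchorPoints.add(msg.Id(), msg.IssuingTime())
//	// MessageSolid event: tick off anchor points as they become solid
//	anchorPoints.markAsSolidified(msg.Id())
//	// once every collected anchor point has solidified, the node is considered synced
//	if anchorPoints.wereAllSolidified() {
//		MarkSynced()
//	}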
// starts a background worker and event handlers to check whether the node is synchronized by first collecting
// a set of newly received messages and then waiting for them to become solid.
func monitorForSynchronization() {
	wantedAnchorPointsCount := config.Node().GetInt(CfgSyncAnchorPointsCount)
	anchorPoints := newAnchorPoints(wantedAnchorPointsCount)
	log.Infof("monitoring for synchronization, awaiting %d anchor point messages to become solid", wantedAnchorPointsCount)

	synced := make(chan types.Empty)

	initAnchorPointClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
		defer cachedMessage.Release()
		defer cachedMessageMetadata.Release()
		if addedAnchorID := initAnchorPoint(anchorPoints, cachedMessage.Unwrap()); addedAnchorID != nil {
			anchorPoints.Lock()
			defer anchorPoints.Unlock()
			log.Infof("added message %s as anchor point (%d of %d collected)", addedAnchorID.String()[:10], anchorPoints.collectedCount(), anchorPoints.wanted)
		}
	})

	checkAnchorPointSolidityClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
		defer cachedMessage.Release()
		defer cachedMessageMetadata.Release()
		allSolid, newSolidAnchorID := checkAnchorPointSolidity(anchorPoints, cachedMessage.Unwrap())

		if newSolidAnchorID != nil {
			log.Infof("anchor message %s has become solid", newSolidAnchorID.String()[:10])
		}

		if !allSolid {
			return
		}
		synced <- types.Empty{}
	})

	if err := daemon.BackgroundWorker("Sync-Monitor", func(shutdownSignal <-chan struct{}) {
		messagelayer.Tangle().Events.MessageAttached.Attach(initAnchorPointClosure)
		defer messagelayer.Tangle().Events.MessageAttached.Detach(initAnchorPointClosure)
		messagelayer.Tangle().Events.MessageSolid.Attach(checkAnchorPointSolidityClosure)
		defer messagelayer.Tangle().Events.MessageSolid.Detach(checkAnchorPointSolidityClosure)

		cleanupDelta := config.Node().GetDuration(CfgSyncAnchorPointsCleanupAfterSec) * time.Second
		ticker := time.NewTicker(config.Node().GetDuration(CfgSyncAnchorPointsCleanupIntervalSec) * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				anchorPoints.Lock()
				for id, itGotAdded := range anchorPoints.ids {
					if time.Since(itGotAdded) > cleanupDelta {
						log.Infof("freeing anchor point slot of %s as it didn't become solid within %v", id.String()[:10], cleanupDelta)
						delete(anchorPoints.ids, id)
					}
				}
				anchorPoints.Unlock()
			case <-shutdownSignal:
				return
			case <-synced:
				log.Infof("all anchor messages have become solid, marking node as synced")
				MarkSynced()
				return
			}
		}
	}, shutdown.PrioritySynchronization); err != nil {
		log.Panicf("Failed to start as daemon: %s", err)
	}
}

// fills up the anchor points with newly attached messages which then are used to determine whether we are synchronized.
func initAnchorPoint(anchorPoints *anchorpoints, msg *message.Message) *message.Id {
	if synced.Load() {
		return nil
	}

	anchorPoints.Lock()
	defer anchorPoints.Unlock()

	// we don't need to add additional anchor points if the set was already filled once
	if anchorPoints.wasFilled() {
		return nil
	}

	// as a rule, we don't consider messages attaching directly to genesis anchors
	if msg.TrunkId() == message.EmptyId || msg.BranchId() == message.EmptyId {
		return nil
	}

	// add a new anchor point if its issuance time is newer than any other anchor point
	id := msg.Id()
	if !anchorPoints.add(id, msg.IssuingTime()) {
		return nil
	}
	return &id
}

// checks whether an anchor point message became solid.
// if all anchor points became solid, it sets the node's state to synchronized.
func checkAnchorPointSolidity(anchorPoints *anchorpoints, msg *message.Message) (bool, *message.Id) {
	anchorPoints.Lock()
	defer anchorPoints.Unlock()

	if synced.Load() || len(anchorPoints.ids) == 0 {
		return false, nil
	}

	// check whether an anchor message became solid
	msgID := msg.Id()
	if !anchorPoints.has(msgID) {
		return false, nil
	}

	// an anchor became solid
	anchorPoints.markAsSolidified(msgID)
	if !anchorPoints.wereAllSolidified() {
		return false, &msgID
	}

	// all anchor points have become solid
	return true, &msgID
}

func newAnchorPoints(wantedAnchorPointsCount int) *anchorpoints {
	return &anchorpoints{
		ids:    make(map[message.Id]time.Time),
		wanted: wantedAnchorPointsCount,
	}
}

// anchorpoints are a set of messages which we use to determine whether the node has become synchronized.
type anchorpoints struct {
	sync.Mutex

	// the ids of the anchor points with their addition time.
	ids map[message.Id]time.Time

	// the wanted amount of anchor points which should become solid.
	wanted int

	// how many anchor points have been solidified.
	solidified int

	// holds the highest issuance time of any message which was an anchor point.
	// this is used to determine whether further attached messages should become an
	// anchor point by matching their issuance time against this time.
	issuanceTimeThreshold time.Time
}

// adds the given message to the anchor points set if its issuance time is newer than
// (or within issuanceThresholdBeforeTimeBoundary of) the current issuance time threshold.
func (ap *anchorpoints) add(id message.Id, issuanceTime time.Time) bool {
	if !ap.issuanceTimeThreshold.IsZero() &&
		ap.issuanceTimeThreshold.Add(-issuanceThresholdBeforeTimeBoundary).After(issuanceTime) {
		return false
	}
	ap.ids[id] = time.Now()
	ap.issuanceTimeThreshold = issuanceTime
	return true
}

func (ap *anchorpoints) has(id message.Id) bool {
	_, has := ap.ids[id]
	return has
}

// marks the given anchor point as solidified which removes it from the set and bumps the solidified count.
func (ap *anchorpoints) markAsSolidified(id message.Id) {
	delete(ap.ids, id)
	ap.solidified++
}

// tells whether the anchor points set was filled at some point.
func (ap *anchorpoints) wasFilled() bool {
	return ap.collectedCount() == ap.wanted
}

// tells whether all anchor points have become solid.
func (ap *anchorpoints) wereAllSolidified() bool {
	return ap.solidified == ap.wanted
}

// tells the number of effectively collected anchor points.
func (ap *anchorpoints) collectedCount() int {
	// since an anchor point potentially was solidified before the set became full,
	// we need to incorporate that count too
	return ap.solidified + len(ap.ids)
}
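// A worked example of the counters above (illustrative only; the numbers besides the
// default wanted = 3 from CfgSyncAnchorPointsCount are assumptions): with wanted = 3,
// two anchor points still pending in `ids` and one already solidified, collectedCount()
// returns 1 + 2 = 3, so wasFilled() reports true and no further anchor points are
// collected, while wereAllSolidified() stays false until the remaining two messages
// become solid.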