Skip to content
Snippets Groups Projects
Unverified Commit b1b78a60 authored by Jonas Theis's avatar Jonas Theis Committed by GitHub
Browse files

Integrate sync beacon (#670)

* Use sync beacon follower plugin instead of sync plugin

* Remove sync and bootstrap plugin

* Refactor sync beacon stuff

* Update Docker network to use sync beacon plugins

* :lipstick: Add detailed sync status to dashboard

* :sparkles: Add detailed sync status to info API

* :rotating_light: Fix linter warning

* :lipstick: Add Explorer support for sync beacon messages

* Initial integration test support for sync beacon plugins

* Fix consensus integration test

* Disable sync beacon follower plugin according to config

* :white_check_mark: Fix dRNG integration-test

* Fix sync beacon test

* :white_check_mark:

 Fix common integration test

* Clean up and add some comments

Co-authored-by: default avatarcapossele <angelocapossele@gmail.com>
parent 7ba94158
No related branches found
No related tags found
No related merge requests found
Showing
with 272 additions and 571 deletions
......@@ -5,7 +5,6 @@ import (
"github.com/iotaledger/goshimmer/dapps/valuetransfers"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/banner"
"github.com/iotaledger/goshimmer/plugins/bootstrap"
"github.com/iotaledger/goshimmer/plugins/cli"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/database"
......@@ -19,7 +18,6 @@ import (
"github.com/iotaledger/goshimmer/plugins/portcheck"
"github.com/iotaledger/goshimmer/plugins/pow"
"github.com/iotaledger/goshimmer/plugins/profiling"
"github.com/iotaledger/goshimmer/plugins/sync"
"github.com/iotaledger/goshimmer/plugins/syncbeacon"
"github.com/iotaledger/goshimmer/plugins/syncbeaconfollower"
......@@ -40,8 +38,6 @@ var PLUGINS = node.Plugins(
messagelayer.Plugin(),
gossip.Plugin(),
issuer.Plugin(),
bootstrap.Plugin(),
sync.Plugin(),
metrics.Plugin(),
drng.Plugin(),
faucet.App(),
......
package bootstrap
import (
goSync "sync"
"time"
"github.com/iotaledger/goshimmer/packages/binary/spammer"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/issuer"
"github.com/iotaledger/goshimmer/plugins/sync"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
flag "github.com/spf13/pflag"
)
const (
// PluginName is the plugin name of the bootstrap plugin.
PluginName = "Bootstrap"
// CfgBootstrapInitialIssuanceTimePeriodSec defines the initial time period of how long the node should be
// issuing messages when started in bootstrapping mode. If the value is set to -1, the issuance is continuous.
CfgBootstrapInitialIssuanceTimePeriodSec = "bootstrap.initialIssuance.timePeriodSec"
// CfgBootstrapTimeUnit defines the time unit (in seconds) of the issuance rate (e.g., 3 messages per 12 seconds).
CfgBootstrapTimeUnit = "bootstrap.timeUnit"
// the messages per period to issue when in bootstrapping mode.
initialIssuanceRate = 1
// the value which determines a continuous issuance of messages from the bootstrap plugin.
continuousIssuance = -1
)
func init() {
flag.Int(CfgBootstrapInitialIssuanceTimePeriodSec, -1, "the initial time period of how long the node should be issuing messages when started in bootstrapping mode. "+
"if the value is set to -1, the issuance is continuous.")
flag.Int(CfgBootstrapTimeUnit, 5, "the time unit (in seconds) of the issuance rate (e.g., 1 messages per 5 seconds).")
}
var (
// plugin is the plugin instance of the bootstrap plugin.
plugin *node.Plugin
once goSync.Once
log *logger.Logger
)
// Plugin gets the plugin instance.
func Plugin() *node.Plugin {
once.Do(func() {
plugin = node.NewPlugin(PluginName, node.Disabled, configure, run)
})
return plugin
}
func configure(_ *node.Plugin) {
log = logger.NewLogger(PluginName)
// we're auto. synced if we start in bootstrapping mode
sync.OverwriteSyncedState(true)
log.Infof("starting node in bootstrapping mode")
}
func run(_ *node.Plugin) {
messageSpammer := spammer.New(issuer.IssuePayload)
issuancePeriodSec := config.Node().GetInt(CfgBootstrapInitialIssuanceTimePeriodSec)
timeUnit := config.Node().GetInt(CfgBootstrapTimeUnit)
if timeUnit <= 0 {
log.Panicf("Invalid Bootsrap time unit: %d seconds", timeUnit)
}
issuancePeriod := time.Duration(issuancePeriodSec) * time.Second
// issue messages on top of the genesis
if err := daemon.BackgroundWorker("Bootstrapping-Issuer", func(shutdownSignal <-chan struct{}) {
messageSpammer.Start(initialIssuanceRate, time.Duration(timeUnit)*time.Second)
defer messageSpammer.Shutdown()
// don't stop issuing messages if in continuous mode
if issuancePeriodSec == continuousIssuance {
log.Info("continuously issuing bootstrapping messages")
<-shutdownSignal
return
}
log.Infof("issuing bootstrapping messages for %d seconds", issuancePeriodSec)
select {
case <-time.After(issuancePeriod):
case <-shutdownSignal:
}
}, shutdown.PriorityBootstrap); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
......@@ -40,16 +40,14 @@ export class Dashboard extends React.Component<Props, any> {
<ListGroup.Item><Version/></ListGroup.Item>
</ListGroup>
</Col>
<Col>
<ListGroup variant={"flush"}>
<ListGroup.Item><Synced/></ListGroup.Item>
</ListGroup>
</Col>
</Row>
</Card.Body>
</Card>
</Col>
</Row>
<Row className={"mb-3"}>
<Col><Synced/></Col>
</Row>
<Row className={"mb-3"}>
<Col><MPSChart/></Col>
</Row>
......
......@@ -13,6 +13,7 @@ import {Link} from 'react-router-dom';
import {BasicPayload} from 'app/components/BasicPayload'
import {DrngPayload} from 'app/components/DrngPayload'
import {ValuePayload} from 'app/components/ValuePayload'
import {SyncBeaconPayload} from 'app/components/SyncBeaconPayload'
import {PayloadType} from 'app/misc/Payload'
interface Props {
......@@ -56,6 +57,8 @@ export class ExplorerMessageQueryResult extends React.Component<Props, any> {
return "Drng"
case PayloadType.Faucet:
return "Faucet"
case PayloadType.SyncBeacon:
return "SyncBeacon"
default:
return "Unknown"
}
......@@ -68,6 +71,8 @@ export class ExplorerMessageQueryResult extends React.Component<Props, any> {
case PayloadType.Value:
return <ValuePayload/>
case PayloadType.Data:
case PayloadType.SyncBeacon:
return <SyncBeaconPayload/>
case PayloadType.Faucet:
default:
return <BasicPayload/>
......
import * as React from 'react';
import Row from "react-bootstrap/Row";
import Col from "react-bootstrap/Col";
import {inject, observer} from "mobx-react";
import {ExplorerStore} from "app/stores/ExplorerStore";
import ListGroup from "react-bootstrap/ListGroup";
import * as dateformat from 'dateformat';
interface Props {
explorerStore?: ExplorerStore;
}
@inject("explorerStore")
@observer
export class SyncBeaconPayload extends React.Component<Props, any> {
render() {
let {payload} = this.props.explorerStore;
return (
payload &&
<React.Fragment>
<Row className={"mb-3"}>
<Col>
<ListGroup>
<ListGroup.Item>Sent Time: {dateformat(new Date(payload.sent_time/1000000), "dd.mm.yyyy HH:MM:ss")} </ListGroup.Item>
</ListGroup>
</Col>
</Row>
</React.Fragment>
);
}
}
import * as React from 'react';
import NodeStore from "app/stores/NodeStore";
import Card from "react-bootstrap/Card";
import {Link} from 'react-router-dom';
import {inject, observer} from "mobx-react";
import * as dateformat from 'dateformat';
interface Props {
nodeStore?: NodeStore;
......@@ -11,9 +14,25 @@ interface Props {
export default class Synced extends React.Component<Props, any> {
render() {
return (
<React.Fragment>
Synced: {this.props.nodeStore.status.synced? "Yes":"No"}
</React.Fragment>
<Card>
<Card.Body>
<Card.Title>Synced: {this.props.nodeStore.status.synced? "Yes":"No"}</Card.Title>
<small>
{Object.keys(this.props.nodeStore.status.beacons).map(nodeID =>
<div>
<hr/>
<div><strong>Public Key: {nodeID}</strong></div>
<div>Synced: {this.props.nodeStore.status.beacons[nodeID].synced? "Yes":"No"}</div>
<div>Beacon: <Link to={`/explorer/message/${this.props.nodeStore.status.beacons[nodeID].msg_id}`}>
{this.props.nodeStore.status.beacons[nodeID].msg_id}
</Link></div>
<div>Time: {dateformat(new Date(this.props.nodeStore.status.beacons[nodeID].sent_time/1000000), "dd.mm.yyyy HH:MM:ss")}</div>
</div>
)}
</small>
</Card.Body>
</Card>
);
}
}
......@@ -3,6 +3,7 @@ export enum PayloadType {
Value = 1,
Faucet = 2,
Drng = 111,
SyncBeacon = 200,
}
export enum DrngSubtype {
......@@ -54,3 +55,8 @@ export class Balance {
value: number;
color: string;
}
// Sync beacon payload
export class SyncBeaconPayload {
sent_time: number;
}
\ No newline at end of file
import {action, computed, observable} from 'mobx';
import {registerHandler, WSMsgType} from "app/misc/WS";
import {BasicPayload, DrngCbPayload, DrngPayload, DrngSubtype, PayloadType, ValuePayload} from "app/misc/Payload";
import {BasicPayload, DrngCbPayload, DrngPayload, DrngSubtype, PayloadType, ValuePayload, SyncBeaconPayload} from "app/misc/Payload";
import * as React from "react";
import {Link} from 'react-router-dom';
import {RouterStore} from "mobx-react-router";
......@@ -189,6 +189,11 @@ export class ExplorerStore {
break;
case PayloadType.Value:
this.payload = msg.payload as ValuePayload
break;
case PayloadType.SyncBeacon:
this.payload = msg.payload as SyncBeaconPayload
console.log(this.payload.sent_time);
break;
case PayloadType.Data:
case PayloadType.Faucet:
default:
......
......@@ -12,9 +12,16 @@ class Status {
version: string;
uptime: number;
synced: boolean;
beacons: Map<string, Beacon>;
mem: MemoryMetrics = new MemoryMetrics();
}
class Beacon {
msg_id: string;
sent_time: number;
synced: boolean;
}
class MemoryMetrics {
sys: number;
heap_sys: number;
......@@ -165,6 +172,7 @@ export class NodeStore {
@observable collecting: boolean = true;
constructor() {
this.status.beacons = new Map<string, Beacon>();
this.registerHandlers();
}
......
This diff is collapsed.
......@@ -9,6 +9,7 @@ import (
drngheader "github.com/iotaledger/goshimmer/packages/binary/drng/payload/header"
cb "github.com/iotaledger/goshimmer/packages/binary/drng/subtypes/collectiveBeacon/payload"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload"
syncbeaconpayload "github.com/iotaledger/goshimmer/plugins/syncbeacon/payload"
"github.com/iotaledger/hive.go/marshalutil"
)
......@@ -25,6 +26,11 @@ type BasicStringPayload struct {
Content string `json:"content"`
}
// SyncBeaconPayload contains sent time of a sync beacon.
type SyncBeaconPayload struct {
SentTime int64 `json:"sent_time"`
}
// DrngPayload contains the subtype of drng payload, instance Id
// and the subpayload
type DrngPayload struct {
......@@ -88,6 +94,9 @@ func ProcessPayload(p payload.Payload) interface{} {
case drngpayload.Type:
// drng payload
return processDrngPayload(p)
case syncbeaconpayload.Type:
// sync beacon payload
return processSyncBeaconPayload(p)
case valuepayload.Type:
return processValuePayload(p)
default:
......@@ -129,6 +138,19 @@ func processDrngPayload(p payload.Payload) (dp DrngPayload) {
}
}
// processDrngPayload handles the subtypes of Drng payload
func processSyncBeaconPayload(p payload.Payload) (dp SyncBeaconPayload) {
syncBeaconPayload, ok := p.(*syncbeaconpayload.Payload)
if !ok {
log.Info("could not cast payload to sync beacon object")
return
}
return SyncBeaconPayload{
SentTime: syncBeaconPayload.SentTime(),
}
}
// processValuePayload handles Value payload
func processValuePayload(p payload.Payload) (vp ValuePayload) {
marshalUtil := marshalutil.New(p.Bytes())
......
......@@ -18,7 +18,9 @@ import (
"github.com/iotaledger/goshimmer/plugins/drng"
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/goshimmer/plugins/metrics"
"github.com/iotaledger/goshimmer/plugins/syncbeaconfollower"
"github.com/iotaledger/hive.go/autopeering/peer/service"
"github.com/iotaledger/hive.go/crypto/ed25519"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
......@@ -163,11 +165,19 @@ type msg struct {
}
type nodestatus struct {
ID string `json:"id"`
Version string `json:"version"`
Uptime int64 `json:"uptime"`
Synced bool `json:"synced"`
Mem *memmetrics `json:"mem"`
ID string `json:"id"`
Version string `json:"version"`
Uptime int64 `json:"uptime"`
Synced bool `json:"synced"`
Beacons map[string]Beacon `json:"beacons"`
Mem *memmetrics `json:"mem"`
}
// Beacon contains a sync beacons detailed status.
type Beacon struct {
MsgID string `json:"msg_id"`
SentTime int64 `json:"sent_time"`
Synced bool `json:"synced"`
}
type memmetrics struct {
......@@ -227,13 +237,25 @@ func neighborMetrics() []neighbormetric {
func currentNodeStatus() *nodestatus {
var m runtime.MemStats
runtime.ReadMemStats(&m)
status := &nodestatus{}
status := &nodestatus{
Beacons: make(map[string]Beacon),
}
status.ID = local.GetInstance().ID().String()
// node status
status.Version = banner.AppVersion
status.Uptime = time.Since(nodeStartAt).Milliseconds()
status.Synced = metrics.Synced()
var beacons map[ed25519.PublicKey]syncbeaconfollower.Status
status.Synced, beacons = syncbeaconfollower.SyncStatus()
for publicKey, s := range beacons {
status.Beacons[publicKey.String()] = Beacon{
MsgID: s.MsgID.String(),
SentTime: s.SentTime,
Synced: s.Synced,
}
}
// memory metrics
status.Mem = &memmetrics{
......
......@@ -7,7 +7,7 @@ import (
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/goshimmer/plugins/sync"
"github.com/iotaledger/goshimmer/plugins/syncbeaconfollower"
"github.com/iotaledger/hive.go/node"
)
......@@ -33,8 +33,8 @@ func configure(_ *node.Plugin) {}
// IssuePayload issues a payload to the message layer.
// If the node is not synchronized an error is returned.
func IssuePayload(payload payload.Payload) (*message.Message, error) {
if !sync.Synced() {
return nil, fmt.Errorf("can't issue payload: %w", sync.ErrNodeNotSynchronized)
if !syncbeaconfollower.Synced() {
return nil, fmt.Errorf("can't issue payload: %w", syncbeaconfollower.ErrNodeNotSynchronized)
}
msg, err := messagelayer.MessageFactory().IssuePayload(payload)
......
......@@ -2,7 +2,7 @@ package metrics
import (
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/plugins/sync"
"github.com/iotaledger/goshimmer/plugins/syncbeaconfollower"
"go.uber.org/atomic"
)
......@@ -11,7 +11,7 @@ var (
)
func measureSynced() {
s := sync.Synced()
s := syncbeaconfollower.Synced()
metrics.Events().Synced.Trigger(s)
}
......
package sync
import (
"errors"
"sync"
"time"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
gossipPkg "github.com/iotaledger/goshimmer/packages/gossip"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/types"
flag "github.com/spf13/pflag"
"go.uber.org/atomic"
)
const (
// PluginName is the plugin name of the sync plugin.
PluginName = "Sync"
// CfgSyncAnchorPointsCount defines the amount of anchor points to use to determine
// whether a node is synchronized.
CfgSyncAnchorPointsCount = "sync.anchorPointsCount"
// CfgSyncAnchorPointsCleanupAfterSec defines the amount of time which is allowed to pass between setting an anchor
// point and it not becoming solid (to clean its slot for another anchor point). It basically defines the expectancy
// of how long it should take for an anchor point to become solid. Even if this value is set too low, usually a node
// would eventually solidify collected anchor points.
CfgSyncAnchorPointsCleanupAfterSec = "sync.anchorPointsCleanupAfterSec"
// CfgSyncAnchorPointsCleanupIntervalSec defines the interval at which it is checked whether anchor points fall
// into the cleanup window.
CfgSyncAnchorPointsCleanupIntervalSec = "sync.anchorPointsCleanupIntervalSec"
// CfgSyncDesyncedIfNoMessageAfterSec defines the time period in which new messages must be received and if not
// the node is marked as desynced.
CfgSyncDesyncedIfNoMessageAfterSec = "sync.desyncedIfNoMessagesAfterSec"
// defines the max. divergence a potential new anchor point's issuance time can have
// from the current issuance threshold. say the current threshold is at 1000, the boundary at 10,
// we allow a new potential anchor point's issuance time to be within >=990 / 10 seconds older
// than the current threshold.
issuanceThresholdBeforeTimeBoundary = 20 * time.Second
)
func init() {
flag.Int(CfgSyncAnchorPointsCount, 3, "the amount of anchor points to use to determine whether a node is synchronized")
flag.Int(CfgSyncDesyncedIfNoMessageAfterSec, 300, "the time period in seconds which sets the node as desynced if no new messages are received")
flag.Int(CfgSyncAnchorPointsCleanupIntervalSec, 10, "the interval at which it is checked whether anchor points fall into the cleanup window")
flag.Int(CfgSyncAnchorPointsCleanupAfterSec, 60, "the amount of time which is allowed to pass between setting an anchor point and it not becoming solid (to clean its slot for another anchor point)")
}
var (
// plugin is the plugin instance of the sync plugin.
plugin *node.Plugin
once sync.Once
// ErrNodeNotSynchronized is returned when an operation can't be executed because
// the node is not synchronized.
ErrNodeNotSynchronized = errors.New("node is not synchronized")
// tells whether the node is synced or not.
synced atomic.Bool
log *logger.Logger
)
// Plugin gets the plugin instance.
func Plugin() *node.Plugin {
once.Do(func() {
plugin = node.NewPlugin(PluginName, node.Enabled, configure, run)
})
return plugin
}
// Synced tells whether the node is in a state we consider synchronized, meaning
// it has the relevant past and present message data.
func Synced() bool {
return synced.Load()
}
// OverwriteSyncedState overwrites the synced state with the given value.
func OverwriteSyncedState(syncedOverwrite bool) {
synced.Store(syncedOverwrite)
}
func configure(_ *node.Plugin) {
log = logger.NewLogger(PluginName)
}
func run(_ *node.Plugin) {
// per default the node starts in a desynced state
if !Synced() {
monitorForSynchronization()
return
}
// however, another plugin might want to overwrite the synced state (i.e. the bootstrap plugin)
// in order to start issuing messages
monitorForDesynchronization()
}
// MarkSynced marks the node as synced and spawns the background worker to monitor desynchronization.
func MarkSynced() {
synced.Store(true)
isRunning := false
for _, worker := range daemon.GetRunningBackgroundWorkers() {
if worker == "Desync-Monitor" {
isRunning = true
break
}
}
if !isRunning {
monitorForDesynchronization()
}
}
// MarkDesynced marks the node as desynced and spawns the background worker to monitor synchronization.
func MarkDesynced() {
synced.Store(false)
isRunning := false
for _, worker := range daemon.GetRunningBackgroundWorkers() {
if worker == "Sync-Monitor" {
isRunning = true
break
}
}
if !isRunning {
monitorForSynchronization()
}
}
// starts a background worker and event handlers to check whether the node is desynchronized by checking
// whether the node has no more peers or didn't receive any message in a given time period.
func monitorForDesynchronization() {
log.Info("monitoring for desynchronization")
// monitors the peer count of the manager and sets the node as desynced if it has no more peers.
noPeers := make(chan types.Empty)
monitorPeerCountClosure := events.NewClosure(func(_ *gossipPkg.Neighbor) {
anyPeers := len(gossip.Manager().AllNeighbors()) > 0
if anyPeers {
return
}
noPeers <- types.Empty{}
})
msgReceived := make(chan types.Empty, 1)
monitorMessageInflowClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
defer cachedMessage.Release()
defer cachedMessageMetadata.Release()
// ignore messages sent by the node itself
if local.GetInstance().LocalIdentity().PublicKey() == cachedMessage.Unwrap().IssuerPublicKey() {
return
}
select {
case msgReceived <- types.Empty{}:
default:
// via this default clause, a slow desync-monitor select-loop
// worker should not increase latency as it auto. falls through
}
})
if err := daemon.BackgroundWorker("Desync-Monitor", func(shutdownSignal <-chan struct{}) {
gossip.Manager().Events().NeighborRemoved.Attach(monitorPeerCountClosure)
defer gossip.Manager().Events().NeighborRemoved.Detach(monitorPeerCountClosure)
messagelayer.Tangle().Events.MessageAttached.Attach(monitorMessageInflowClosure)
defer messagelayer.Tangle().Events.MessageAttached.Detach(monitorMessageInflowClosure)
timeForDesync := config.Node().GetDuration(CfgSyncDesyncedIfNoMessageAfterSec) * time.Second
timer := time.NewTimer(timeForDesync)
for {
select {
case <-msgReceived:
// we received a message, therefore reset the timer to check for message receives
if !timer.Stop() {
<-timer.C
}
// TODO: perhaps find a better way instead of constantly resetting the timer
timer.Reset(timeForDesync)
case <-timer.C:
log.Infof("no message received in %d seconds, marking node as desynced", int(timeForDesync.Seconds()))
MarkDesynced()
return
case <-noPeers:
log.Info("all peers have been lost, marking node as desynced")
MarkDesynced()
return
case <-shutdownSignal:
return
}
}
}, shutdown.PrioritySynchronization); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
// starts a background worker and event handlers to check whether the node is synchronized by first collecting
// a set of newly received messages and then waiting for them to become solid.
func monitorForSynchronization() {
wantedAnchorPointsCount := config.Node().GetInt(CfgSyncAnchorPointsCount)
anchorPoints := newAnchorPoints(wantedAnchorPointsCount)
log.Infof("monitoring for synchronization, awaiting %d anchor point messages to become solid", wantedAnchorPointsCount)
synced := make(chan types.Empty)
initAnchorPointClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
defer cachedMessage.Release()
defer cachedMessageMetadata.Release()
if addedAnchorID := initAnchorPoint(anchorPoints, cachedMessage.Unwrap()); addedAnchorID != nil {
anchorPoints.Lock()
defer anchorPoints.Unlock()
log.Infof("added message %s as anchor point (%d of %d collected)", addedAnchorID.String()[:10], anchorPoints.collectedCount(), anchorPoints.wanted)
}
})
checkAnchorPointSolidityClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
defer cachedMessage.Release()
defer cachedMessageMetadata.Release()
allSolid, newSolidAnchorID := checkAnchorPointSolidity(anchorPoints, cachedMessage.Unwrap())
if newSolidAnchorID != nil {
log.Infof("anchor message %s has become solid", newSolidAnchorID.String()[:10])
}
if !allSolid {
return
}
synced <- types.Empty{}
})
if err := daemon.BackgroundWorker("Sync-Monitor", func(shutdownSignal <-chan struct{}) {
messagelayer.Tangle().Events.MessageAttached.Attach(initAnchorPointClosure)
defer messagelayer.Tangle().Events.MessageAttached.Detach(initAnchorPointClosure)
messagelayer.Tangle().Events.MessageSolid.Attach(checkAnchorPointSolidityClosure)
defer messagelayer.Tangle().Events.MessageSolid.Detach(checkAnchorPointSolidityClosure)
cleanupDelta := config.Node().GetDuration(CfgSyncAnchorPointsCleanupAfterSec) * time.Second
ticker := time.NewTicker(config.Node().GetDuration(CfgSyncAnchorPointsCleanupIntervalSec) * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
anchorPoints.Lock()
for id, itGotAdded := range anchorPoints.ids {
if time.Since(itGotAdded) > cleanupDelta {
log.Infof("freeing anchor point slot of %s as it didn't become solid within %v", id.String()[:10], cleanupDelta)
delete(anchorPoints.ids, id)
}
}
anchorPoints.Unlock()
case <-shutdownSignal:
return
case <-synced:
log.Infof("all anchor messages have become solid, marking node as synced")
MarkSynced()
return
}
}
}, shutdown.PrioritySynchronization); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
// fills up the anchor points with newly attached messages which then are used to determine whether we are synchronized.
func initAnchorPoint(anchorPoints *anchorpoints, msg *message.Message) *message.Id {
if synced.Load() {
return nil
}
anchorPoints.Lock()
defer anchorPoints.Unlock()
// we don't need to add additional anchor points if the set was already filled once
if anchorPoints.wasFilled() {
return nil
}
// as a rule, we don't consider messages attaching directly to genesis anchors
if msg.TrunkId() == message.EmptyId || msg.BranchId() == message.EmptyId {
return nil
}
// add a new anchor point if its issuance time is newer than any other anchor point
id := msg.Id()
if !anchorPoints.add(id, msg.IssuingTime()) {
return nil
}
return &id
}
// checks whether an anchor point message became solid.
// if all anchor points became solid, it sets the node's state to synchronized.
func checkAnchorPointSolidity(anchorPoints *anchorpoints, msg *message.Message) (bool, *message.Id) {
anchorPoints.Lock()
defer anchorPoints.Unlock()
if synced.Load() || len(anchorPoints.ids) == 0 {
return false, nil
}
// check whether an anchor message become solid
msgID := msg.Id()
if !anchorPoints.has(msgID) {
return false, nil
}
// an anchor became solid
anchorPoints.markAsSolidified(msgID)
if !anchorPoints.wereAllSolidified() {
return false, &msgID
}
// all anchor points have become solid
return true, &msgID
}
func newAnchorPoints(wantedAnchorPointsCount int) *anchorpoints {
return &anchorpoints{
ids: make(map[message.Id]time.Time),
wanted: wantedAnchorPointsCount,
}
}
// anchorpoints are a set of messages which we use to determine whether the node has become synchronized.
type anchorpoints struct {
sync.Mutex
// the ids of the anchor points with their addition time.
ids map[message.Id]time.Time
// the wanted amount of anchor points which should become solid.
wanted int
// how many anchor points have been solidified.
solidified int
// holds the highest issuance time of any message which was an anchor point.
// this is used to determine whether further attached messages should become an
// anchor point by matching their issuance time against this time.
issuanceTimeThreshold time.Time
}
// adds the given message to the anchor points set if its issuance time is newer than
// any other existing anchor point's.
func (ap *anchorpoints) add(id message.Id, issuanceTime time.Time) bool {
if !ap.issuanceTimeThreshold.IsZero() &&
ap.issuanceTimeThreshold.Add(-issuanceThresholdBeforeTimeBoundary).After(issuanceTime) {
return false
}
ap.ids[id] = time.Now()
ap.issuanceTimeThreshold = issuanceTime
return true
}
func (ap *anchorpoints) has(id message.Id) bool {
_, has := ap.ids[id]
return has
}
// marks the given anchor point as solidified which removes it from the set and bumps the solidified count.
func (ap *anchorpoints) markAsSolidified(id message.Id) {
delete(ap.ids, id)
ap.solidified++
}
// tells whether the anchor points set was filled at some point.
func (ap *anchorpoints) wasFilled() bool {
return ap.collectedCount() == ap.wanted
}
// tells whether all anchor points have become solid.
func (ap *anchorpoints) wereAllSolidified() bool {
return ap.solidified == ap.wanted
}
// tells the number of effectively collected anchor points.
func (ap *anchorpoints) collectedCount() int {
// since an anchor point potentially was solidified before the set became full,
// we need to incorporate that count too
return ap.solidified + len(ap.ids)
}
package syncbeacon
package payload
import (
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload"
......
package syncbeacon
package payload
import (
"github.com/stretchr/testify/assert"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestPayload(t *testing.T) {
......
package syncbeacon
import (
"sync"
"time"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/goshimmer/plugins/issuer"
"github.com/iotaledger/goshimmer/plugins/syncbeacon/payload"
"github.com/iotaledger/goshimmer/plugins/syncbeaconfollower"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
flag "github.com/spf13/pflag"
"sync"
"time"
)
const (
// PluginName is the plugin name of the sync beacon plugin.
PluginName = "Sync Beacon"
PluginName = "SyncBeacon"
// CfgSyncBeaconBroadcastIntervalSec is the interval in seconds at which the node broadcasts its sync status.
CfgSyncBeaconBroadcastIntervalSec = "syncbeacon.broadcastInterval"
// CfgSyncBeaconStartSynced defines whether to start the sync beacon in synced mode so it can issue an initial sync beacon message.
CfgSyncBeaconStartSynced = "syncbeacon.startSynced"
)
func init() {
flag.Int(CfgSyncBeaconBroadcastIntervalSec, 30, "the interval at which the node will broadcast ist sync status")
flag.Int(CfgSyncBeaconBroadcastIntervalSec, 30, "the interval at which the node will broadcast its sync status")
flag.Bool(CfgSyncBeaconStartSynced, false, "set node to start as synced so it can issue an initial sync beacon message")
}
var (
......@@ -42,17 +49,26 @@ func Plugin() *node.Plugin {
// configure events
func configure(_ *node.Plugin) {
log = logger.NewLogger(PluginName)
log.Infof("starting node as sync beacon")
if config.Node().GetBool(CfgSyncBeaconStartSynced) {
syncbeaconfollower.OverwriteSyncedState(true)
log.Infof("overwriting synced state to 'true'")
}
}
// broadcastSyncBeaconPayload broadcasts a sync beacon via communication layer.
func broadcastSyncBeaconPayload() {
syncBeaconPayload := NewSyncBeaconPayload(time.Now().UnixNano())
msg, err := messagelayer.MessageFactory().IssuePayload(syncBeaconPayload)
syncBeaconPayload := payload.NewSyncBeaconPayload(time.Now().UnixNano())
msg, err := issuer.IssuePayload(syncBeaconPayload)
if err != nil {
log.Infof("error issuing sync beacon. %w", err)
} else {
log.Infof("issued sync beacon %s", msg.Id())
log.Warnf("error issuing sync beacon: %w", err)
return
}
log.Debugf("issued sync beacon %s", msg.Id())
}
func run(_ *node.Plugin) {
......
......@@ -2,13 +2,15 @@ package syncbeaconfollower
import (
"errors"
"sync"
"time"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/goshimmer/plugins/sync"
"github.com/iotaledger/goshimmer/plugins/syncbeacon"
syncbeacon_payload "github.com/iotaledger/goshimmer/plugins/syncbeacon/payload"
"github.com/iotaledger/hive.go/crypto/ed25519"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
......@@ -16,13 +18,12 @@ import (
"github.com/iotaledger/hive.go/node"
"github.com/mr-tron/base58"
flag "github.com/spf13/pflag"
goSync "sync"
"time"
"go.uber.org/atomic"
)
const (
// PluginName is the plugin name of the sync beacon plugin.
PluginName = "Sync Beacon Follower"
PluginName = "SyncBeaconFollower"
// CfgSyncBeaconFollowNodes defines the list of nodes this node should follow to determine its sync status.
CfgSyncBeaconFollowNodes = "syncbeaconfollower.followNodes"
......@@ -36,18 +37,20 @@ const (
// CfgSyncBeaconCleanupInterval defines the interval that old beacon status are cleaned up.
CfgSyncBeaconCleanupInterval = "syncbeaconfollower.cleanupInterval"
// The percentage of following nodes that have to be synced.
syncPercentage = 0.6
// syncPercentage defines the percentage of following nodes that have to be synced.
syncPercentage = 0.5
)
// beaconSync represents the beacon payload and the sync status.
type beaconSync struct {
payload *syncbeacon.Payload
synced bool
// Status represents the status of a beacon node consisting of latest messageID, sentTime and sync status.
type Status struct {
MsgID message.Id
SentTime int64
Synced bool
}
func init() {
flag.StringSlice(CfgSyncBeaconFollowNodes, []string{"Gm7W191NDnqyF7KJycZqK7V6ENLwqxTwoKQN4SmpkB24", "9DB3j9cWYSuEEtkvanrzqkzCQMdH1FGv3TawJdVbDxkd"}, "list of trusted nodes to follow their sync status")
flag.Int(CfgSyncBeaconMaxTimeWindowSec, 10, "the maximum time window for which a sync payload would be considerable")
flag.Int(CfgSyncBeaconMaxTimeOfflineSec, 40, "the maximum time the node should stay synced without receiving updates")
flag.Int(CfgSyncBeaconCleanupInterval, 10, "the interval at which cleanups are done")
......@@ -56,35 +59,80 @@ func init() {
var (
// plugin is the plugin instance of the sync beacon plugin.
plugin *node.Plugin
once goSync.Once
once sync.Once
log *logger.Logger
currentBeacons map[ed25519.PublicKey]*beaconSync
currentBeacons map[ed25519.PublicKey]*Status
currentBeaconPubKeys map[ed25519.PublicKey]string
mutex goSync.RWMutex
mutex sync.RWMutex
beaconMaxTimeOfflineSec float64
)
beaconMaxTimeWindowSec float64
// tells whether the node is synced or not.
synced atomic.Bool
var (
// ErrMissingFollowNodes is returned if the node starts with no follow nodes list
ErrMissingFollowNodes = errors.New("follow nodes list is required")
// ErrNodeNotSynchronized is returned when an operation can't be executed because
// the node is not synchronized.
ErrNodeNotSynchronized = errors.New("node is not synchronized")
)
// Plugin gets the plugin instance.
func Plugin() *node.Plugin {
once.Do(func() {
plugin = node.NewPlugin(PluginName, node.Disabled, configure, run)
plugin = node.NewPlugin(PluginName, node.Enabled, configure, run)
})
return plugin
}
// configure events
// Synced tells whether the node is in a state we consider synchronized, meaning
// it has the relevant past and present message data. The synchronized state is
// defined by following a certain set of sync beacon nodes where for each of these
// beacons a message needs to become solid within bounded time.
func Synced() bool {
return synced.Load()
}
// SyncStatus returns the detailed status per beacon node.
func SyncStatus() (bool, map[ed25519.PublicKey]Status) {
mutex.RLock()
defer mutex.RUnlock()
beacons := make(map[ed25519.PublicKey]Status)
for publicKey, status := range currentBeacons {
beacons[publicKey] = Status{
MsgID: status.MsgID,
SentTime: status.SentTime,
Synced: status.Synced,
}
}
return Synced(), beacons
}
// MarkSynced marks the node as synced.
func MarkSynced() {
synced.Store(true)
}
// MarkDesynced marks the node as desynced.
func MarkDesynced() {
synced.Store(false)
}
// OverwriteSyncedState overwrites the synced state with the given value.
func OverwriteSyncedState(syncedOverwrite bool) {
synced.Store(syncedOverwrite)
}
// configure plugin
func configure(_ *node.Plugin) {
pubKeys := config.Node().GetStringSlice(CfgSyncBeaconFollowNodes)
beaconMaxTimeOfflineSec = float64(config.Node().GetInt(CfgSyncBeaconMaxTimeOfflineSec))
beaconMaxTimeWindowSec = float64(config.Node().GetInt(CfgSyncBeaconMaxTimeWindowSec))
log = logger.NewLogger(PluginName)
currentBeacons = make(map[ed25519.PublicKey]*beaconSync, len(pubKeys))
currentBeaconPubKeys = make(map[ed25519.PublicKey]string, len(pubKeys))
currentBeacons = make(map[ed25519.PublicKey]*Status)
currentBeaconPubKeys = make(map[ed25519.PublicKey]string)
for _, str := range pubKeys {
bytes, err := base58.Decode(str)
......@@ -97,9 +145,10 @@ func configure(_ *node.Plugin) {
log.Warnf("%s is not a valid public key: %w", err)
continue
}
currentBeacons[pubKey] = &beaconSync{
payload: nil,
synced: false,
currentBeacons[pubKey] = &Status{
MsgID: message.EmptyId,
Synced: false,
SentTime: 0,
}
currentBeaconPubKeys[pubKey] = str
}
......@@ -111,17 +160,17 @@ func configure(_ *node.Plugin) {
cachedMessageMetadata.Release()
cachedMessage.Consume(func(msg *message.Message) {
messagePayload := msg.Payload()
if messagePayload.Type() != syncbeacon.Type {
if messagePayload.Type() != syncbeacon_payload.Type {
return
}
payload, ok := messagePayload.(*syncbeacon.Payload)
payload, ok := messagePayload.(*syncbeacon_payload.Payload)
if !ok {
log.Info("could not cast payload to sync beacon object")
return
}
//check if issuer is in configured beacon follow list
// check if issuer is in configured beacon follow list
if _, ok := currentBeaconPubKeys[msg.IssuerPublicKey()]; !ok {
return
}
......@@ -132,56 +181,52 @@ func configure(_ *node.Plugin) {
}
// handlePayload handles the received payload. It does the following checks:
// It checks that the issuer of the payload is a followed node.
// The time that payload was sent is not greater than CfgSyncBeaconMaxTimeWindowSec. If the duration is longer than CfgSyncBeaconMaxTimeWindowSec, we consider that beacon to be out of sync till we receive a newer payload.
// The time that payload was sent is not greater than CfgSyncBeaconMaxTimeWindowSec. If the duration is longer than CfgSyncBeaconMaxTimeWindowSec, we consider that beacon to be out of sync till we receive a newer payload.
// More than syncPercentage of followed nodes are also synced, the node is set to synced. Otherwise, its set as desynced.
func handlePayload(syncBeaconPayload *syncbeacon.Payload, issuerPublicKey ed25519.PublicKey, msgId message.Id) {
func handlePayload(syncBeaconPayload *syncbeacon_payload.Payload, issuerPublicKey ed25519.PublicKey, msgID message.Id) {
mutex.Lock()
defer mutex.Unlock()
synced := true
dur := time.Since(time.Unix(0, syncBeaconPayload.SentTime()))
if dur.Seconds() > float64(config.Node().GetInt(CfgSyncBeaconMaxTimeWindowSec)) {
log.Infof("sync beacon %s, received from %s is too old.", msgId, issuerPublicKey)
if dur.Seconds() > beaconMaxTimeWindowSec {
log.Infof("sync beacon %s, received from %s is too old.", msgID, issuerPublicKey)
synced = false
}
currentBeacons[issuerPublicKey].synced = synced
currentBeacons[issuerPublicKey].payload = syncBeaconPayload
currentBeacons[issuerPublicKey].Synced = synced
currentBeacons[issuerPublicKey].MsgID = msgID
currentBeacons[issuerPublicKey].SentTime = syncBeaconPayload.SentTime()
updateSynced()
}
// updateSynced checks the beacon nodes and update the nodes sync status
func updateSynced() {
beaconNodesSyncedCount := 0.0
for _, beaconSync := range currentBeacons {
if beaconSync.synced {
for _, status := range currentBeacons {
if status.Synced {
beaconNodesSyncedCount++
}
}
synced := true
var globalSynced bool
if len(currentBeacons) > 0 {
synced = beaconNodesSyncedCount/float64(len(currentBeacons)) >= syncPercentage
globalSynced = beaconNodesSyncedCount/float64(len(currentBeacons)) >= syncPercentage
}
// TODO: get rid of sync plugin
if synced {
sync.MarkSynced()
} else {
sync.MarkDesynced()
}
OverwriteSyncedState(globalSynced)
}
// cleanupFollowNodes cleans up offline nodes by setting their sync status to false after a configurable time window.
func cleanupFollowNodes() {
mutex.Lock()
defer mutex.Unlock()
for publicKey, beaconSync := range currentBeacons {
if beaconSync.payload != nil {
dur := time.Since(time.Unix(0, beaconSync.payload.SentTime()))
for publicKey, status := range currentBeacons {
if status.MsgID != message.EmptyId {
dur := time.Since(time.Unix(0, status.SentTime))
if dur.Seconds() > beaconMaxTimeOfflineSec {
beaconSync.synced = false
currentBeacons[publicKey] = beaconSync
currentBeacons[publicKey].Synced = false
}
}
}
......
......@@ -5,7 +5,7 @@ import (
goSync "sync"
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/goshimmer/plugins/sync"
"github.com/iotaledger/goshimmer/plugins/syncbeaconfollower"
"github.com/iotaledger/goshimmer/plugins/webapi"
"github.com/iotaledger/hive.go/node"
"github.com/labstack/echo"
......@@ -43,7 +43,7 @@ func getHealthz(c echo.Context) error {
// IsNodeHealthy returns whether the node is synced, has active neighbors.
func IsNodeHealthy() bool {
// Synced
if !sync.Synced() {
if !syncbeaconfollower.Synced() {
return false
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment