diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 4a5167673b90ecfaae1a2f313254d38490e4f3df..bb1cb633e03c81f4811d14839f9703b34155188f 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -247,3 +247,37 @@ jobs: with: name: ${{ env.TEST_NAME }} path: tools/integration-tests/logs + + syncbeacon: + name: syncbeacon + env: + TEST_NAME: syncbeacon + runs-on: ubuntu-latest + steps: + + - name: Check out code + uses: actions/checkout@v2 + + - name: Build GoShimmer image + run: docker build -t iotaledger/goshimmer . + + - name: Pull additional Docker images + run: | + docker pull angelocapossele/drand:latest + docker pull gaiaadm/pumba:latest + docker pull gaiadocker/iproute2:latest + + - name: Run integration tests + run: docker-compose -f tools/integration-tests/tester/docker-compose.yml up --abort-on-container-exit --exit-code-from tester --build + + - name: Create logs from tester + if: always() + run: | + docker logs tester &> tools/integration-tests/logs/tester.log + + - name: Save logs as artifacts + if: always() + uses: actions/upload-artifact@v1 + with: + name: ${{ env.TEST_NAME }} + path: tools/integration-tests/logs diff --git a/pluginmgr/core/plugins.go b/pluginmgr/core/plugins.go index 03191820233abc9204e3984a79342ad8c5787065..4643599af7822b5774395665463475748a68d16e 100644 --- a/pluginmgr/core/plugins.go +++ b/pluginmgr/core/plugins.go @@ -20,6 +20,8 @@ import ( "github.com/iotaledger/goshimmer/plugins/pow" "github.com/iotaledger/goshimmer/plugins/profiling" "github.com/iotaledger/goshimmer/plugins/sync" + "github.com/iotaledger/goshimmer/plugins/syncbeacon" + "github.com/iotaledger/goshimmer/plugins/syncbeaconfollower" "github.com/iotaledger/hive.go/node" ) @@ -44,4 +46,6 @@ var PLUGINS = node.Plugins( drng.Plugin(), faucet.App(), valuetransfers.App(), + syncbeacon.Plugin(), + syncbeaconfollower.Plugin(), ) diff --git a/plugins/sync/plugin.go b/plugins/sync/plugin.go index d29842dd74e5967daec7784f4638a434c27701a6..afab08e6bddcbaa094c81d0ea479db92442f6987 100644 --- a/plugins/sync/plugin.go +++ b/plugins/sync/plugin.go @@ -101,16 +101,35 @@ func run(_ *node.Plugin) { monitorForDesynchronization() } -// marks the node as synced and spawns the background worker to monitor desynchronization. -func markSynced() { +// MarkSynced marks the node as synced and spawns the background worker to monitor desynchronization. +func MarkSynced() { synced.Store(true) - monitorForDesynchronization() + isRunning := false + for _, worker := range daemon.GetRunningBackgroundWorkers() { + if worker == "Desync-Monitor" { + isRunning = true + break + } + } + if !isRunning { + monitorForDesynchronization() + } } -// marks the node as desynced and spawns the background worker to monitor synchronization. -func markDesynced() { +// MarkDesynced marks the node as desynced and spawns the background worker to monitor synchronization. +func MarkDesynced() { synced.Store(false) - monitorForSynchronization() + isRunning := false + for _, worker := range daemon.GetRunningBackgroundWorkers() { + if worker == "Sync-Monitor" { + isRunning = true + break + } + } + + if !isRunning { + monitorForSynchronization() + } } // starts a background worker and event handlers to check whether the node is desynchronized by checking @@ -166,12 +185,12 @@ func monitorForDesynchronization() { case <-timer.C: log.Infof("no message received in %d seconds, marking node as desynced", int(timeForDesync.Seconds())) - markDesynced() + MarkDesynced() return case <-noPeers: log.Info("all peers have been lost, marking node as desynced") - markDesynced() + MarkDesynced() return case <-shutdownSignal: @@ -241,7 +260,7 @@ func monitorForSynchronization() { return case <-synced: log.Infof("all anchor messages have become solid, marking node as synced") - markSynced() + MarkSynced() return } } diff --git a/plugins/syncbeacon/payload.go b/plugins/syncbeacon/payload.go new file mode 100644 index 0000000000000000000000000000000000000000..fb61e04d1e92306c72d2fe12c23f3fe4d269285d --- /dev/null +++ b/plugins/syncbeacon/payload.go @@ -0,0 +1,117 @@ +package syncbeacon + +import ( + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload" + "github.com/iotaledger/hive.go/marshalutil" + "github.com/iotaledger/hive.go/stringify" +) + +const ( + // ObjectName defines the name of the syncbeacon object. + ObjectName = "syncbeacon" +) + +// Type is the type of the syncbeacon payload. +var Type = payload.Type(200) + +// Payload represents the syncbeacon payload +type Payload struct { + payloadType payload.Type + sentTime int64 +} + +// NewSyncBeaconPayload creates a new syncbeacon payload +func NewSyncBeaconPayload(sentTime int64) *Payload { + return &Payload{ + payloadType: Type, + sentTime: sentTime, + } +} + +// FromBytes parses the marshaled version of a Payload into an object. +// It either returns a new Payload or fills an optionally provided Payload with the parsed information. +func FromBytes(bytes []byte, optionalTargetObject ...*Payload) (result *Payload, err error, consumedBytes int) { + // determine the target object that will hold the unmarshaled information + switch len(optionalTargetObject) { + case 0: + result = &Payload{} + case 1: + result = optionalTargetObject[0] + default: + panic("too many arguments in call to FromBytes") + } + + // initialize helper + marshalUtil := marshalutil.New(bytes) + + // read data + result.payloadType, err = marshalUtil.ReadUint32() + if err != nil { + return + } + _, err = marshalUtil.ReadUint32() + if err != nil { + return + } + result.sentTime, err = marshalUtil.ReadInt64() + if err != nil { + return + } + + // return the number of bytes we processed + consumedBytes = marshalUtil.ReadOffset() + + return +} + +// Type returns the type of the Payload. +func (p *Payload) Type() payload.Type { + return p.payloadType +} + +// SentTime returns the time that payload was sent. +func (p *Payload) SentTime() int64 { + return p.sentTime +} + +// Bytes marshals the syncbeacon payload into a sequence of bytes. +func (p *Payload) Bytes() []byte { + // initialize helper + marshalUtil := marshalutil.New() + objectLength := marshalutil.INT64_SIZE + + // marshal the p specific information + marshalUtil.WriteUint32(Type) + marshalUtil.WriteUint32(uint32(objectLength)) + marshalUtil.WriteInt64(p.sentTime) + + // return result + return marshalUtil.Bytes() +} + +// Unmarshal unmarshals a given slice of bytes and fills the object. +func (p *Payload) Unmarshal(data []byte) (err error) { + _, err, _ = FromBytes(data, p) + + return +} + +// String returns a human readable version of syncbeacon payload (for debug purposes). +func (p *Payload) String() string { + return stringify.Struct("syncBeaconPayload", + stringify.StructField("sentTime", p.sentTime)) +} + +// IsSyncBeaconPayload checks if the message is sync beacon payload. +func IsSyncBeaconPayload(p *Payload) bool { + return p.Type() == Type +} + +func init() { + payload.RegisterType(Type, ObjectName, func(data []byte) (payload payload.Payload, err error) { + payload = &Payload{} + err = payload.Unmarshal(data) + + return + }) +} diff --git a/plugins/syncbeacon/payload_test.go b/plugins/syncbeacon/payload_test.go new file mode 100644 index 0000000000000000000000000000000000000000..e53381c2eda93fff176265527599bb4502231fb7 --- /dev/null +++ b/plugins/syncbeacon/payload_test.go @@ -0,0 +1,31 @@ +package syncbeacon + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestPayload(t *testing.T) { + originalPayload := NewSyncBeaconPayload(time.Now().UnixNano()) + clonedPayload1, err, _ := FromBytes(originalPayload.Bytes()) + if err != nil { + panic(err) + } + + assert.Equal(t, originalPayload.SentTime(), clonedPayload1.SentTime()) + + clonedPayload2, err, _ := FromBytes(clonedPayload1.Bytes()) + if err != nil { + panic(err) + } + + assert.Equal(t, originalPayload.SentTime(), clonedPayload2.SentTime()) +} + +func TestIsSyncBeaconPayload(t *testing.T) { + p := NewSyncBeaconPayload(time.Now().UnixNano()) + + isSyncBeaconPayload := IsSyncBeaconPayload(p) + assert.True(t, isSyncBeaconPayload) +} diff --git a/plugins/syncbeacon/plugin.go b/plugins/syncbeacon/plugin.go new file mode 100644 index 0000000000000000000000000000000000000000..ee6706e9758184325e32c287eaba6394b9f124e1 --- /dev/null +++ b/plugins/syncbeacon/plugin.go @@ -0,0 +1,73 @@ +package syncbeacon + +import ( + "github.com/iotaledger/goshimmer/packages/shutdown" + "github.com/iotaledger/goshimmer/plugins/config" + "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/hive.go/daemon" + "github.com/iotaledger/hive.go/logger" + "github.com/iotaledger/hive.go/node" + flag "github.com/spf13/pflag" + "sync" + "time" +) + +const ( + // PluginName is the plugin name of the sync beacon plugin. + PluginName = "Sync Beacon" + + // CfgSyncBeaconBroadcastIntervalSec is the interval in seconds at which the node broadcasts its sync status. + CfgSyncBeaconBroadcastIntervalSec = "syncbeacon.broadcastInterval" +) + +func init() { + flag.Int(CfgSyncBeaconBroadcastIntervalSec, 30, "the interval at which the node will broadcast ist sync status") +} + +var ( + // plugin is the plugin instance of the sync beacon plugin. + plugin *node.Plugin + once sync.Once + log *logger.Logger +) + +// Plugin gets the plugin instance. +func Plugin() *node.Plugin { + once.Do(func() { + plugin = node.NewPlugin(PluginName, node.Disabled, configure, run) + }) + return plugin +} + +// configure events +func configure(_ *node.Plugin) { + log = logger.NewLogger(PluginName) +} + +// broadcastSyncBeaconPayload broadcasts a sync beacon via communication layer. +func broadcastSyncBeaconPayload() { + syncBeaconPayload := NewSyncBeaconPayload(time.Now().UnixNano()) + msg, err := messagelayer.MessageFactory().IssuePayload(syncBeaconPayload) + if err != nil { + log.Infof("error issuing sync beacon. %w", err) + } else { + log.Infof("issued sync beacon %s", msg.Id()) + } +} + +func run(_ *node.Plugin) { + if err := daemon.BackgroundWorker("Sync-Beacon", func(shutdownSignal <-chan struct{}) { + ticker := time.NewTicker(config.Node().GetDuration(CfgSyncBeaconBroadcastIntervalSec) * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + broadcastSyncBeaconPayload() + case <-shutdownSignal: + return + } + } + }, shutdown.PrioritySynchronization); err != nil { + log.Panicf("Failed to start as daemon: %s", err) + } +} diff --git a/plugins/syncbeaconfollower/plugin.go b/plugins/syncbeaconfollower/plugin.go new file mode 100644 index 0000000000000000000000000000000000000000..037e8403507895fa6df3968a8c35e33f175ad5f0 --- /dev/null +++ b/plugins/syncbeaconfollower/plugin.go @@ -0,0 +1,206 @@ +package syncbeaconfollower + +import ( + "errors" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle" + "github.com/iotaledger/goshimmer/packages/shutdown" + "github.com/iotaledger/goshimmer/plugins/config" + "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/goshimmer/plugins/sync" + "github.com/iotaledger/goshimmer/plugins/syncbeacon" + "github.com/iotaledger/hive.go/crypto/ed25519" + "github.com/iotaledger/hive.go/daemon" + "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/logger" + "github.com/iotaledger/hive.go/node" + "github.com/mr-tron/base58" + flag "github.com/spf13/pflag" + goSync "sync" + "time" +) + +const ( + // PluginName is the plugin name of the sync beacon plugin. + PluginName = "Sync Beacon Follower" + + // CfgSyncBeaconFollowNodes defines the list of nodes this node should follow to determine its sync status. + CfgSyncBeaconFollowNodes = "syncbeaconfollower.followNodes" + + // CfgSyncBeaconMaxTimeWindowSec defines the maximum time window for which a sync payload would be considerable. + CfgSyncBeaconMaxTimeWindowSec = "syncbeaconfollower.maxTimeWindowSec" + + // CfgSyncBeaconMaxTimeOfflineSec defines the maximum time a beacon node can stay without receiving updates. + CfgSyncBeaconMaxTimeOfflineSec = "syncbeaconfollower.maxTimeOffline" + + // CfgSyncBeaconCleanupInterval defines the interval that old beacon status are cleaned up. + CfgSyncBeaconCleanupInterval = "syncbeaconfollower.cleanupInterval" + + // The percentage of following nodes that have to be synced. + syncPercentage = 0.6 +) + +// beaconSync represents the beacon payload and the sync status. +type beaconSync struct { + payload *syncbeacon.Payload + synced bool +} + +func init() { + flag.StringSlice(CfgSyncBeaconFollowNodes, []string{"Gm7W191NDnqyF7KJycZqK7V6ENLwqxTwoKQN4SmpkB24", "9DB3j9cWYSuEEtkvanrzqkzCQMdH1FGv3TawJdVbDxkd"}, "list of trusted nodes to follow their sync status") + flag.Int(CfgSyncBeaconMaxTimeWindowSec, 10, "the maximum time window for which a sync payload would be considerable") + flag.Int(CfgSyncBeaconMaxTimeOfflineSec, 40, "the maximum time the node should stay synced without receiving updates") + flag.Int(CfgSyncBeaconCleanupInterval, 10, "the interval at which cleanups are done") +} + +var ( + // plugin is the plugin instance of the sync beacon plugin. + plugin *node.Plugin + once goSync.Once + log *logger.Logger + currentBeacons map[ed25519.PublicKey]*beaconSync + currentBeaconPubKeys map[ed25519.PublicKey]string + mutex goSync.RWMutex + beaconMaxTimeOfflineSec float64 +) + +var ( + // ErrMissingFollowNodes is returned if the node starts with no follow nodes list + ErrMissingFollowNodes = errors.New("follow nodes list is required") +) + +// Plugin gets the plugin instance. +func Plugin() *node.Plugin { + once.Do(func() { + plugin = node.NewPlugin(PluginName, node.Disabled, configure, run) + }) + return plugin +} + +// configure events +func configure(_ *node.Plugin) { + pubKeys := config.Node().GetStringSlice(CfgSyncBeaconFollowNodes) + beaconMaxTimeOfflineSec = float64(config.Node().GetInt(CfgSyncBeaconMaxTimeOfflineSec)) + log = logger.NewLogger(PluginName) + + currentBeacons = make(map[ed25519.PublicKey]*beaconSync, len(pubKeys)) + currentBeaconPubKeys = make(map[ed25519.PublicKey]string, len(pubKeys)) + + for _, str := range pubKeys { + bytes, err := base58.Decode(str) + if err != nil { + log.Warnf("error decoding public key: %w", err) + continue + } + pubKey, _, err := ed25519.PublicKeyFromBytes(bytes) + if err != nil { + log.Warnf("%s is not a valid public key: %w", err) + continue + } + currentBeacons[pubKey] = &beaconSync{ + payload: nil, + synced: false, + } + currentBeaconPubKeys[pubKey] = str + } + if len(currentBeaconPubKeys) == 0 { + log.Panicf("Follow node list cannot be empty: %w", ErrMissingFollowNodes) + } + + messagelayer.Tangle().Events.MessageSolid.Attach(events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) { + cachedMessageMetadata.Release() + cachedMessage.Consume(func(msg *message.Message) { + messagePayload := msg.Payload() + if messagePayload.Type() != syncbeacon.Type { + return + } + + payload, ok := messagePayload.(*syncbeacon.Payload) + if !ok { + log.Info("could not cast payload to sync beacon object") + return + } + + //check if issuer is in configured beacon follow list + if _, ok := currentBeaconPubKeys[msg.IssuerPublicKey()]; !ok { + return + } + + handlePayload(payload, msg.IssuerPublicKey(), msg.Id()) + }) + })) +} + +// handlePayload handles the received payload. It does the following checks: +// It checks that the issuer of the payload is a followed node. +// The time that payload was sent is not greater than CfgSyncBeaconMaxTimeWindowSec. If the duration is longer than CfgSyncBeaconMaxTimeWindowSec, we consider that beacon to be out of sync till we receive a newer payload. +// More than syncPercentage of followed nodes are also synced, the node is set to synced. Otherwise, its set as desynced. +func handlePayload(syncBeaconPayload *syncbeacon.Payload, issuerPublicKey ed25519.PublicKey, msgId message.Id) { + mutex.Lock() + defer mutex.Unlock() + + synced := true + dur := time.Since(time.Unix(0, syncBeaconPayload.SentTime())) + if dur.Seconds() > float64(config.Node().GetInt(CfgSyncBeaconMaxTimeWindowSec)) { + log.Infof("sync beacon %s, received from %s is too old.", msgId, issuerPublicKey) + synced = false + } + + currentBeacons[issuerPublicKey].synced = synced + currentBeacons[issuerPublicKey].payload = syncBeaconPayload + updateSynced() +} + +// updateSynced checks the beacon nodes and update the nodes sync status +func updateSynced() { + beaconNodesSyncedCount := 0.0 + for _, beaconSync := range currentBeacons { + if beaconSync.synced { + beaconNodesSyncedCount++ + } + } + synced := true + if len(currentBeacons) > 0 { + synced = beaconNodesSyncedCount/float64(len(currentBeacons)) >= syncPercentage + } + + // TODO: get rid of sync plugin + if synced { + sync.MarkSynced() + } else { + sync.MarkDesynced() + } +} + +// cleanupFollowNodes cleans up offline nodes by setting their sync status to false after a configurable time window. +func cleanupFollowNodes() { + mutex.Lock() + defer mutex.Unlock() + for publicKey, beaconSync := range currentBeacons { + if beaconSync.payload != nil { + dur := time.Since(time.Unix(0, beaconSync.payload.SentTime())) + if dur.Seconds() > beaconMaxTimeOfflineSec { + beaconSync.synced = false + currentBeacons[publicKey] = beaconSync + } + } + } + updateSynced() +} + +func run(_ *node.Plugin) { + if err := daemon.BackgroundWorker("Sync-Beacon-Cleanup", func(shutdownSignal <-chan struct{}) { + ticker := time.NewTicker(config.Node().GetDuration(CfgSyncBeaconCleanupInterval) * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + cleanupFollowNodes() + case <-shutdownSignal: + return + } + } + }, shutdown.PrioritySynchronization); err != nil { + log.Panicf("Failed to start as daemon: %s", err) + } +} diff --git a/tools/integration-tests/runTests.sh b/tools/integration-tests/runTests.sh index da9004da397c9c8ca8e07bfa4447bb64636be337..87f56ea3f0706a4863f98bd8bb5cc1ac3541e4be 100755 --- a/tools/integration-tests/runTests.sh +++ b/tools/integration-tests/runTests.sh @@ -1,6 +1,6 @@ #!/bin/bash -TEST_NAMES='autopeering common drng message value consensus faucet' +TEST_NAMES='autopeering common drng message value consensus faucet syncbeacon' echo "Build GoShimmer image" docker build -t iotaledger/goshimmer ../../. diff --git a/tools/integration-tests/tester/framework/docker.go b/tools/integration-tests/tester/framework/docker.go index 3bc9b7cb9b9729f78fdf923cb71728574a87fc0c..e6052c884cc9e89a694436d38cb34683b8b10bcb 100644 --- a/tools/integration-tests/tester/framework/docker.go +++ b/tools/integration-tests/tester/framework/docker.go @@ -95,6 +95,12 @@ func (d *DockerContainer) CreateGoShimmerPeer(config GoShimmerConfig) error { if config.Faucet { plugins = append(plugins, "faucet") } + if config.SyncBeacon { + plugins = append(plugins, "Sync Beacon") + } + if config.SyncBeaconFollower { + plugins = append(plugins, "Sync Beacon Follower") + } return strings.Join(plugins[:], ",") }()), // define the faucet seed in case the faucet dApp is enabled @@ -114,6 +120,8 @@ func (d *DockerContainer) CreateGoShimmerPeer(config GoShimmerConfig) error { fmt.Sprintf("--drng.threshold=%d", config.DRNGThreshold), fmt.Sprintf("--drng.committeeMembers=%s", config.DRNGCommittee), fmt.Sprintf("--drng.distributedPubKey=%s", config.DRNGDistKey), + fmt.Sprintf("--syncbeaconfollower.followNodes=%s", config.SyncBeaconFollowNodes), + fmt.Sprintf("--syncbeacon.broadcastInterval=%d", config.SyncBeaconBroadcastInterval), }, } diff --git a/tools/integration-tests/tester/framework/parameters.go b/tools/integration-tests/tester/framework/parameters.go index 9694ed97ec0c1f404f32583f8fa3305d1807036d..2618c9af2022c02c2f6b2e9a6fab51445a1d0c2b 100644 --- a/tools/integration-tests/tester/framework/parameters.go +++ b/tools/integration-tests/tester/framework/parameters.go @@ -64,7 +64,11 @@ type GoShimmerConfig struct { DRNGInstance int DRNGThreshold int - Faucet bool + Faucet bool + SyncBeacon bool + SyncBeaconFollower bool + SyncBeaconFollowNodes string + SyncBeaconBroadcastInterval int } // NetworkConfig defines the config of a GoShimmer Docker network. diff --git a/tools/integration-tests/tester/tests/syncbeacon/main_test.go b/tools/integration-tests/tester/tests/syncbeacon/main_test.go new file mode 100644 index 0000000000000000000000000000000000000000..721e82efb417c6d108bf2f21b883921ec81ff64b --- /dev/null +++ b/tools/integration-tests/tester/tests/syncbeacon/main_test.go @@ -0,0 +1,24 @@ +package syncbeacon + +import ( + "os" + "testing" + + "github.com/iotaledger/goshimmer/tools/integration-tests/tester/framework" +) + +var f *framework.Framework + +// TestMain gets called by the test utility and is executed before any other test in this package. +// It is therefore used to initialize the integration testing framework. +func TestMain(m *testing.M) { + var err error + f, err = framework.Instance() + if err != nil { + panic(err) + } + + // call the tests + os.Exit(m.Run()) +} + diff --git a/tools/integration-tests/tester/tests/syncbeacon/syncbeacon_test.go b/tools/integration-tests/tester/tests/syncbeacon/syncbeacon_test.go new file mode 100644 index 0000000000000000000000000000000000000000..f416fc85fab49c7d39c4e5b1ac65cd2e81a1ef03 --- /dev/null +++ b/tools/integration-tests/tester/tests/syncbeacon/syncbeacon_test.go @@ -0,0 +1,69 @@ +package syncbeacon + +import ( + "github.com/iotaledger/goshimmer/tools/integration-tests/tester/framework" + "github.com/iotaledger/goshimmer/tools/integration-tests/tester/tests" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "log" + "strings" + "testing" + "time" +) + +// TestSyncBeacon checks that beacon nodes broadcast sync beacons +// and follower nodes use those payloads to determine if they are synced or not. +func TestSyncBeacon(t *testing.T) { + initialPeers := 4 + n, err := f.CreateNetwork("syncbeacon_TestSyncBeacon", 0, 0) + require.NoError(t, err) + defer tests.ShutdownNetwork(t, n) + + // create sync beacon nodes + var beaconPublicKeys []string + for i := 0; i < initialPeers; i++ { + peer, err := n.CreatePeer(framework.GoShimmerConfig{ + SyncBeacon: true, + SyncBeaconBroadcastInterval: 20, + }) + require.NoError(t, err) + beaconPublicKeys = append(beaconPublicKeys, peer.PublicKey().String()) + } + peers := n.Peers() + err = n.WaitForAutopeering(3) + require.NoError(t, err) + + + // beacon follower node to follow all previous nodes + peer, err := n.CreatePeer(framework.GoShimmerConfig{ + SyncBeaconFollower: true, + SyncBeaconFollowNodes: strings.Join(beaconPublicKeys, ","), + }) + require.NoError(t, err) + err = n.WaitForAutopeering(3) + require.NoError(t, err) + + log.Println("Waiting...1/2") + // wait for node to solidify beacon messages + time.Sleep(60 * time.Second) + log.Println("done waiting.") + + resp, err := peer.Info() + require.NoError(t, err) + assert.Truef(t, resp.Synced, "Peer %s should be synced but is desynced!", peer.String()) + + // 2. shutdown all but 2 beacon peers. + for _, p := range peers[:len(peers)-2] { + _ = p.Stop() + } + + // wait for peers to sync and broadcast + log.Println("Waiting...2/2") + time.Sleep(40 * time.Second) + log.Println("done waiting.") + + // expect majority of nodes to not have broadcasted beacons. Hence should be desynced due to cleanup. + resp, err = peer.Info() + require.NoError(t, err) + assert.Falsef(t, resp.Synced, "Peer %s should be desynced but is synced!", peer.String()) +}