Skip to content
Snippets Groups Projects
Unverified Commit 4fb473f2 authored by Acha Bill's avatar Acha Bill Committed by GitHub
Browse files

sync beacon plugin (#644)

* sync beacon

* add integration tests

* save point

* use beacon as anchor point

* review suggestion

* check if monitor is already running before starting

* debug: add logs

* fix tests

* separeate into sync beacon and sync beacon follower

* review fixes

* review changes
parent cf7707db
No related branches found
No related tags found
No related merge requests found
......@@ -247,3 +247,37 @@ jobs:
with:
name: ${{ env.TEST_NAME }}
path: tools/integration-tests/logs
syncbeacon:
name: syncbeacon
env:
TEST_NAME: syncbeacon
runs-on: ubuntu-latest
steps:
- name: Check out code
uses: actions/checkout@v2
- name: Build GoShimmer image
run: docker build -t iotaledger/goshimmer .
- name: Pull additional Docker images
run: |
docker pull angelocapossele/drand:latest
docker pull gaiaadm/pumba:latest
docker pull gaiadocker/iproute2:latest
- name: Run integration tests
run: docker-compose -f tools/integration-tests/tester/docker-compose.yml up --abort-on-container-exit --exit-code-from tester --build
- name: Create logs from tester
if: always()
run: |
docker logs tester &> tools/integration-tests/logs/tester.log
- name: Save logs as artifacts
if: always()
uses: actions/upload-artifact@v1
with:
name: ${{ env.TEST_NAME }}
path: tools/integration-tests/logs
......@@ -20,6 +20,8 @@ import (
"github.com/iotaledger/goshimmer/plugins/pow"
"github.com/iotaledger/goshimmer/plugins/profiling"
"github.com/iotaledger/goshimmer/plugins/sync"
"github.com/iotaledger/goshimmer/plugins/syncbeacon"
"github.com/iotaledger/goshimmer/plugins/syncbeaconfollower"
"github.com/iotaledger/hive.go/node"
)
......@@ -44,4 +46,6 @@ var PLUGINS = node.Plugins(
drng.Plugin(),
faucet.App(),
valuetransfers.App(),
syncbeacon.Plugin(),
syncbeaconfollower.Plugin(),
)
......@@ -101,16 +101,35 @@ func run(_ *node.Plugin) {
monitorForDesynchronization()
}
// marks the node as synced and spawns the background worker to monitor desynchronization.
func markSynced() {
// MarkSynced marks the node as synced and spawns the background worker to monitor desynchronization.
func MarkSynced() {
synced.Store(true)
monitorForDesynchronization()
isRunning := false
for _, worker := range daemon.GetRunningBackgroundWorkers() {
if worker == "Desync-Monitor" {
isRunning = true
break
}
}
if !isRunning {
monitorForDesynchronization()
}
}
// marks the node as desynced and spawns the background worker to monitor synchronization.
func markDesynced() {
// MarkDesynced marks the node as desynced and spawns the background worker to monitor synchronization.
func MarkDesynced() {
synced.Store(false)
monitorForSynchronization()
isRunning := false
for _, worker := range daemon.GetRunningBackgroundWorkers() {
if worker == "Sync-Monitor" {
isRunning = true
break
}
}
if !isRunning {
monitorForSynchronization()
}
}
// starts a background worker and event handlers to check whether the node is desynchronized by checking
......@@ -166,12 +185,12 @@ func monitorForDesynchronization() {
case <-timer.C:
log.Infof("no message received in %d seconds, marking node as desynced", int(timeForDesync.Seconds()))
markDesynced()
MarkDesynced()
return
case <-noPeers:
log.Info("all peers have been lost, marking node as desynced")
markDesynced()
MarkDesynced()
return
case <-shutdownSignal:
......@@ -241,7 +260,7 @@ func monitorForSynchronization() {
return
case <-synced:
log.Infof("all anchor messages have become solid, marking node as synced")
markSynced()
MarkSynced()
return
}
}
......
package syncbeacon
import (
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload"
"github.com/iotaledger/hive.go/marshalutil"
"github.com/iotaledger/hive.go/stringify"
)
const (
// ObjectName defines the name of the syncbeacon object.
ObjectName = "syncbeacon"
)
// Type is the type of the syncbeacon payload.
var Type = payload.Type(200)
// Payload represents the syncbeacon payload
type Payload struct {
payloadType payload.Type
sentTime int64
}
// NewSyncBeaconPayload creates a new syncbeacon payload
func NewSyncBeaconPayload(sentTime int64) *Payload {
return &Payload{
payloadType: Type,
sentTime: sentTime,
}
}
// FromBytes parses the marshaled version of a Payload into an object.
// It either returns a new Payload or fills an optionally provided Payload with the parsed information.
func FromBytes(bytes []byte, optionalTargetObject ...*Payload) (result *Payload, err error, consumedBytes int) {
// determine the target object that will hold the unmarshaled information
switch len(optionalTargetObject) {
case 0:
result = &Payload{}
case 1:
result = optionalTargetObject[0]
default:
panic("too many arguments in call to FromBytes")
}
// initialize helper
marshalUtil := marshalutil.New(bytes)
// read data
result.payloadType, err = marshalUtil.ReadUint32()
if err != nil {
return
}
_, err = marshalUtil.ReadUint32()
if err != nil {
return
}
result.sentTime, err = marshalUtil.ReadInt64()
if err != nil {
return
}
// return the number of bytes we processed
consumedBytes = marshalUtil.ReadOffset()
return
}
// Type returns the type of the Payload.
func (p *Payload) Type() payload.Type {
return p.payloadType
}
// SentTime returns the time that payload was sent.
func (p *Payload) SentTime() int64 {
return p.sentTime
}
// Bytes marshals the syncbeacon payload into a sequence of bytes.
func (p *Payload) Bytes() []byte {
// initialize helper
marshalUtil := marshalutil.New()
objectLength := marshalutil.INT64_SIZE
// marshal the p specific information
marshalUtil.WriteUint32(Type)
marshalUtil.WriteUint32(uint32(objectLength))
marshalUtil.WriteInt64(p.sentTime)
// return result
return marshalUtil.Bytes()
}
// Unmarshal unmarshals a given slice of bytes and fills the object.
func (p *Payload) Unmarshal(data []byte) (err error) {
_, err, _ = FromBytes(data, p)
return
}
// String returns a human readable version of syncbeacon payload (for debug purposes).
func (p *Payload) String() string {
return stringify.Struct("syncBeaconPayload",
stringify.StructField("sentTime", p.sentTime))
}
// IsSyncBeaconPayload checks if the message is sync beacon payload.
func IsSyncBeaconPayload(p *Payload) bool {
return p.Type() == Type
}
func init() {
payload.RegisterType(Type, ObjectName, func(data []byte) (payload payload.Payload, err error) {
payload = &Payload{}
err = payload.Unmarshal(data)
return
})
}
package syncbeacon
import (
"github.com/stretchr/testify/assert"
"testing"
"time"
)
func TestPayload(t *testing.T) {
originalPayload := NewSyncBeaconPayload(time.Now().UnixNano())
clonedPayload1, err, _ := FromBytes(originalPayload.Bytes())
if err != nil {
panic(err)
}
assert.Equal(t, originalPayload.SentTime(), clonedPayload1.SentTime())
clonedPayload2, err, _ := FromBytes(clonedPayload1.Bytes())
if err != nil {
panic(err)
}
assert.Equal(t, originalPayload.SentTime(), clonedPayload2.SentTime())
}
func TestIsSyncBeaconPayload(t *testing.T) {
p := NewSyncBeaconPayload(time.Now().UnixNano())
isSyncBeaconPayload := IsSyncBeaconPayload(p)
assert.True(t, isSyncBeaconPayload)
}
package syncbeacon
import (
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
flag "github.com/spf13/pflag"
"sync"
"time"
)
const (
// PluginName is the plugin name of the sync beacon plugin.
PluginName = "Sync Beacon"
// CfgSyncBeaconBroadcastIntervalSec is the interval in seconds at which the node broadcasts its sync status.
CfgSyncBeaconBroadcastIntervalSec = "syncbeacon.broadcastInterval"
)
func init() {
flag.Int(CfgSyncBeaconBroadcastIntervalSec, 30, "the interval at which the node will broadcast ist sync status")
}
var (
// plugin is the plugin instance of the sync beacon plugin.
plugin *node.Plugin
once sync.Once
log *logger.Logger
)
// Plugin gets the plugin instance.
func Plugin() *node.Plugin {
once.Do(func() {
plugin = node.NewPlugin(PluginName, node.Disabled, configure, run)
})
return plugin
}
// configure events
func configure(_ *node.Plugin) {
log = logger.NewLogger(PluginName)
}
// broadcastSyncBeaconPayload broadcasts a sync beacon via communication layer.
func broadcastSyncBeaconPayload() {
syncBeaconPayload := NewSyncBeaconPayload(time.Now().UnixNano())
msg, err := messagelayer.MessageFactory().IssuePayload(syncBeaconPayload)
if err != nil {
log.Infof("error issuing sync beacon. %w", err)
} else {
log.Infof("issued sync beacon %s", msg.Id())
}
}
func run(_ *node.Plugin) {
if err := daemon.BackgroundWorker("Sync-Beacon", func(shutdownSignal <-chan struct{}) {
ticker := time.NewTicker(config.Node().GetDuration(CfgSyncBeaconBroadcastIntervalSec) * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
broadcastSyncBeaconPayload()
case <-shutdownSignal:
return
}
}
}, shutdown.PrioritySynchronization); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
package syncbeaconfollower
import (
"errors"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/goshimmer/plugins/sync"
"github.com/iotaledger/goshimmer/plugins/syncbeacon"
"github.com/iotaledger/hive.go/crypto/ed25519"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
"github.com/mr-tron/base58"
flag "github.com/spf13/pflag"
goSync "sync"
"time"
)
const (
// PluginName is the plugin name of the sync beacon plugin.
PluginName = "Sync Beacon Follower"
// CfgSyncBeaconFollowNodes defines the list of nodes this node should follow to determine its sync status.
CfgSyncBeaconFollowNodes = "syncbeaconfollower.followNodes"
// CfgSyncBeaconMaxTimeWindowSec defines the maximum time window for which a sync payload would be considerable.
CfgSyncBeaconMaxTimeWindowSec = "syncbeaconfollower.maxTimeWindowSec"
// CfgSyncBeaconMaxTimeOfflineSec defines the maximum time a beacon node can stay without receiving updates.
CfgSyncBeaconMaxTimeOfflineSec = "syncbeaconfollower.maxTimeOffline"
// CfgSyncBeaconCleanupInterval defines the interval that old beacon status are cleaned up.
CfgSyncBeaconCleanupInterval = "syncbeaconfollower.cleanupInterval"
// The percentage of following nodes that have to be synced.
syncPercentage = 0.6
)
// beaconSync represents the beacon payload and the sync status.
type beaconSync struct {
payload *syncbeacon.Payload
synced bool
}
func init() {
flag.StringSlice(CfgSyncBeaconFollowNodes, []string{"Gm7W191NDnqyF7KJycZqK7V6ENLwqxTwoKQN4SmpkB24", "9DB3j9cWYSuEEtkvanrzqkzCQMdH1FGv3TawJdVbDxkd"}, "list of trusted nodes to follow their sync status")
flag.Int(CfgSyncBeaconMaxTimeWindowSec, 10, "the maximum time window for which a sync payload would be considerable")
flag.Int(CfgSyncBeaconMaxTimeOfflineSec, 40, "the maximum time the node should stay synced without receiving updates")
flag.Int(CfgSyncBeaconCleanupInterval, 10, "the interval at which cleanups are done")
}
var (
// plugin is the plugin instance of the sync beacon plugin.
plugin *node.Plugin
once goSync.Once
log *logger.Logger
currentBeacons map[ed25519.PublicKey]*beaconSync
currentBeaconPubKeys map[ed25519.PublicKey]string
mutex goSync.RWMutex
beaconMaxTimeOfflineSec float64
)
var (
// ErrMissingFollowNodes is returned if the node starts with no follow nodes list
ErrMissingFollowNodes = errors.New("follow nodes list is required")
)
// Plugin gets the plugin instance.
func Plugin() *node.Plugin {
once.Do(func() {
plugin = node.NewPlugin(PluginName, node.Disabled, configure, run)
})
return plugin
}
// configure events
func configure(_ *node.Plugin) {
pubKeys := config.Node().GetStringSlice(CfgSyncBeaconFollowNodes)
beaconMaxTimeOfflineSec = float64(config.Node().GetInt(CfgSyncBeaconMaxTimeOfflineSec))
log = logger.NewLogger(PluginName)
currentBeacons = make(map[ed25519.PublicKey]*beaconSync, len(pubKeys))
currentBeaconPubKeys = make(map[ed25519.PublicKey]string, len(pubKeys))
for _, str := range pubKeys {
bytes, err := base58.Decode(str)
if err != nil {
log.Warnf("error decoding public key: %w", err)
continue
}
pubKey, _, err := ed25519.PublicKeyFromBytes(bytes)
if err != nil {
log.Warnf("%s is not a valid public key: %w", err)
continue
}
currentBeacons[pubKey] = &beaconSync{
payload: nil,
synced: false,
}
currentBeaconPubKeys[pubKey] = str
}
if len(currentBeaconPubKeys) == 0 {
log.Panicf("Follow node list cannot be empty: %w", ErrMissingFollowNodes)
}
messagelayer.Tangle().Events.MessageSolid.Attach(events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
cachedMessageMetadata.Release()
cachedMessage.Consume(func(msg *message.Message) {
messagePayload := msg.Payload()
if messagePayload.Type() != syncbeacon.Type {
return
}
payload, ok := messagePayload.(*syncbeacon.Payload)
if !ok {
log.Info("could not cast payload to sync beacon object")
return
}
//check if issuer is in configured beacon follow list
if _, ok := currentBeaconPubKeys[msg.IssuerPublicKey()]; !ok {
return
}
handlePayload(payload, msg.IssuerPublicKey(), msg.Id())
})
}))
}
// handlePayload handles the received payload. It does the following checks:
// It checks that the issuer of the payload is a followed node.
// The time that payload was sent is not greater than CfgSyncBeaconMaxTimeWindowSec. If the duration is longer than CfgSyncBeaconMaxTimeWindowSec, we consider that beacon to be out of sync till we receive a newer payload.
// More than syncPercentage of followed nodes are also synced, the node is set to synced. Otherwise, its set as desynced.
func handlePayload(syncBeaconPayload *syncbeacon.Payload, issuerPublicKey ed25519.PublicKey, msgId message.Id) {
mutex.Lock()
defer mutex.Unlock()
synced := true
dur := time.Since(time.Unix(0, syncBeaconPayload.SentTime()))
if dur.Seconds() > float64(config.Node().GetInt(CfgSyncBeaconMaxTimeWindowSec)) {
log.Infof("sync beacon %s, received from %s is too old.", msgId, issuerPublicKey)
synced = false
}
currentBeacons[issuerPublicKey].synced = synced
currentBeacons[issuerPublicKey].payload = syncBeaconPayload
updateSynced()
}
// updateSynced checks the beacon nodes and update the nodes sync status
func updateSynced() {
beaconNodesSyncedCount := 0.0
for _, beaconSync := range currentBeacons {
if beaconSync.synced {
beaconNodesSyncedCount++
}
}
synced := true
if len(currentBeacons) > 0 {
synced = beaconNodesSyncedCount/float64(len(currentBeacons)) >= syncPercentage
}
// TODO: get rid of sync plugin
if synced {
sync.MarkSynced()
} else {
sync.MarkDesynced()
}
}
// cleanupFollowNodes cleans up offline nodes by setting their sync status to false after a configurable time window.
func cleanupFollowNodes() {
mutex.Lock()
defer mutex.Unlock()
for publicKey, beaconSync := range currentBeacons {
if beaconSync.payload != nil {
dur := time.Since(time.Unix(0, beaconSync.payload.SentTime()))
if dur.Seconds() > beaconMaxTimeOfflineSec {
beaconSync.synced = false
currentBeacons[publicKey] = beaconSync
}
}
}
updateSynced()
}
func run(_ *node.Plugin) {
if err := daemon.BackgroundWorker("Sync-Beacon-Cleanup", func(shutdownSignal <-chan struct{}) {
ticker := time.NewTicker(config.Node().GetDuration(CfgSyncBeaconCleanupInterval) * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
cleanupFollowNodes()
case <-shutdownSignal:
return
}
}
}, shutdown.PrioritySynchronization); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
#!/bin/bash
TEST_NAMES='autopeering common drng message value consensus faucet'
TEST_NAMES='autopeering common drng message value consensus faucet syncbeacon'
echo "Build GoShimmer image"
docker build -t iotaledger/goshimmer ../../.
......
......@@ -95,6 +95,12 @@ func (d *DockerContainer) CreateGoShimmerPeer(config GoShimmerConfig) error {
if config.Faucet {
plugins = append(plugins, "faucet")
}
if config.SyncBeacon {
plugins = append(plugins, "Sync Beacon")
}
if config.SyncBeaconFollower {
plugins = append(plugins, "Sync Beacon Follower")
}
return strings.Join(plugins[:], ",")
}()),
// define the faucet seed in case the faucet dApp is enabled
......@@ -114,6 +120,8 @@ func (d *DockerContainer) CreateGoShimmerPeer(config GoShimmerConfig) error {
fmt.Sprintf("--drng.threshold=%d", config.DRNGThreshold),
fmt.Sprintf("--drng.committeeMembers=%s", config.DRNGCommittee),
fmt.Sprintf("--drng.distributedPubKey=%s", config.DRNGDistKey),
fmt.Sprintf("--syncbeaconfollower.followNodes=%s", config.SyncBeaconFollowNodes),
fmt.Sprintf("--syncbeacon.broadcastInterval=%d", config.SyncBeaconBroadcastInterval),
},
}
......
......@@ -64,7 +64,11 @@ type GoShimmerConfig struct {
DRNGInstance int
DRNGThreshold int
Faucet bool
Faucet bool
SyncBeacon bool
SyncBeaconFollower bool
SyncBeaconFollowNodes string
SyncBeaconBroadcastInterval int
}
// NetworkConfig defines the config of a GoShimmer Docker network.
......
package syncbeacon
import (
"os"
"testing"
"github.com/iotaledger/goshimmer/tools/integration-tests/tester/framework"
)
var f *framework.Framework
// TestMain gets called by the test utility and is executed before any other test in this package.
// It is therefore used to initialize the integration testing framework.
func TestMain(m *testing.M) {
var err error
f, err = framework.Instance()
if err != nil {
panic(err)
}
// call the tests
os.Exit(m.Run())
}
package syncbeacon
import (
"github.com/iotaledger/goshimmer/tools/integration-tests/tester/framework"
"github.com/iotaledger/goshimmer/tools/integration-tests/tester/tests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"log"
"strings"
"testing"
"time"
)
// TestSyncBeacon checks that beacon nodes broadcast sync beacons
// and follower nodes use those payloads to determine if they are synced or not.
func TestSyncBeacon(t *testing.T) {
initialPeers := 4
n, err := f.CreateNetwork("syncbeacon_TestSyncBeacon", 0, 0)
require.NoError(t, err)
defer tests.ShutdownNetwork(t, n)
// create sync beacon nodes
var beaconPublicKeys []string
for i := 0; i < initialPeers; i++ {
peer, err := n.CreatePeer(framework.GoShimmerConfig{
SyncBeacon: true,
SyncBeaconBroadcastInterval: 20,
})
require.NoError(t, err)
beaconPublicKeys = append(beaconPublicKeys, peer.PublicKey().String())
}
peers := n.Peers()
err = n.WaitForAutopeering(3)
require.NoError(t, err)
// beacon follower node to follow all previous nodes
peer, err := n.CreatePeer(framework.GoShimmerConfig{
SyncBeaconFollower: true,
SyncBeaconFollowNodes: strings.Join(beaconPublicKeys, ","),
})
require.NoError(t, err)
err = n.WaitForAutopeering(3)
require.NoError(t, err)
log.Println("Waiting...1/2")
// wait for node to solidify beacon messages
time.Sleep(60 * time.Second)
log.Println("done waiting.")
resp, err := peer.Info()
require.NoError(t, err)
assert.Truef(t, resp.Synced, "Peer %s should be synced but is desynced!", peer.String())
// 2. shutdown all but 2 beacon peers.
for _, p := range peers[:len(peers)-2] {
_ = p.Stop()
}
// wait for peers to sync and broadcast
log.Println("Waiting...2/2")
time.Sleep(40 * time.Second)
log.Println("done waiting.")
// expect majority of nodes to not have broadcasted beacons. Hence should be desynced due to cleanup.
resp, err = peer.Info()
require.NoError(t, err)
assert.Falsef(t, resp.Synced, "Peer %s should be desynced but is synced!", peer.String())
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment