Skip to content
Snippets Groups Projects
Unverified Commit 97fd9862 authored by Angelo Capossele's avatar Angelo Capossele Committed by GitHub
Browse files

minor improvements (#672)

* :wrench: Add syncbeacon to config

* :recycle: Refactor syncbeaconfollower

* Fix syncbeacon integration test

* :bug: Fix double-spend tool

* :lipstick: Fix analysis dashboard title

* :construction: Add Shutdown method to messageRequester

* :poop: Dirty FCOB shutdown fix

* :loud_sound: Add debug logs for message tangle shutdown

* :pencil2: Fix typo

* :bug: Fix message Tangle objects released

* Load all missing messages into message requester on startup

* Fix pointer issue for anonymous function

* Fix consensus integration test

Co-authored-by: default avatarjonastheis <mail@jonastheis.de>
parent a1def20e
No related branches found
No related tags found
No related merge requests found
Showing
with 88 additions and 46 deletions
...@@ -17,6 +17,12 @@ ...@@ -17,6 +17,12 @@
], ],
"port": 14626 "port": 14626
}, },
"syncbeaconfollower": {
"followNodes": [
"Gm7W191NDnqyF7KJycZqK7V6ENLwqxTwoKQN4SmpkB24",
"9DB3j9cWYSuEEtkvanrzqkzCQMdH1FGv3TawJdVbDxkd"
]
},
"dashboard": { "dashboard": {
"bindAddress": "127.0.0.1:8081", "bindAddress": "127.0.0.1:8081",
"dev": false, "dev": false,
......
...@@ -178,6 +178,8 @@ func configure(_ *node.Plugin) { ...@@ -178,6 +178,8 @@ func configure(_ *node.Plugin) {
func run(*node.Plugin) { func run(*node.Plugin) {
if err := daemon.BackgroundWorker("ValueTangle", func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker("ValueTangle", func(shutdownSignal <-chan struct{}) {
<-shutdownSignal <-shutdownSignal
// TODO: make this better
time.Sleep(12 * time.Second)
_tangle.Shutdown() _tangle.Shutdown()
}, shutdown.PriorityTangle); err != nil { }, shutdown.PriorityTangle); err != nil {
log.Panicf("Failed to start as daemon: %s", err) log.Panicf("Failed to start as daemon: %s", err)
......
...@@ -8,7 +8,7 @@ import ( ...@@ -8,7 +8,7 @@ import (
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
) )
const messageExistCheckThreshold = 21 const messageExistCheckThreshold = 10
// MessageRequester takes care of requesting messages. // MessageRequester takes care of requesting messages.
type MessageRequester struct { type MessageRequester struct {
...@@ -23,9 +23,13 @@ type MessageRequester struct { ...@@ -23,9 +23,13 @@ type MessageRequester struct {
// MessageExistsFunc is a function that tells if a message exists. // MessageExistsFunc is a function that tells if a message exists.
type MessageExistsFunc func(messageId message.Id) bool type MessageExistsFunc func(messageId message.Id) bool
// createReRequest returns a parameterless callback suitable for
// time.AfterFunc that re-requests the message identified by msgID,
// carrying the current retry count forward. Binding the id and count as
// function parameters (rather than capturing loop variables directly)
// pins their values at scheduling time.
// NOTE(review): at the call sites that wrap this in an extra closure
// (e.g. `func() { createReRequest(...) }`), the returned function is
// never invoked — confirm the intended usage is to pass the result of
// createReRequest to time.AfterFunc directly.
func createReRequest(requester *MessageRequester, msgID message.Id, count int) func() {
	return func() {
		requester.reRequest(msgID, count)
	}
}
// New creates a new message requester. // New creates a new message requester.
func New(messageExists MessageExistsFunc, optionalOptions ...Option) *MessageRequester { func New(messageExists MessageExistsFunc, missingMessages []message.Id, optionalOptions ...Option) *MessageRequester {
return &MessageRequester{ requester := &MessageRequester{
scheduledRequests: make(map[message.Id]*time.Timer), scheduledRequests: make(map[message.Id]*time.Timer),
options: newOptions(optionalOptions), options: newOptions(optionalOptions),
messageExistsFunc: messageExists, messageExistsFunc: messageExists,
...@@ -38,6 +42,16 @@ func New(messageExists MessageExistsFunc, optionalOptions ...Option) *MessageReq ...@@ -38,6 +42,16 @@ func New(messageExists MessageExistsFunc, optionalOptions ...Option) *MessageReq
}), }),
}, },
} }
// add requests for all missing messages
requester.scheduledRequestsMutex.Lock()
defer requester.scheduledRequestsMutex.Unlock()
for _, id := range missingMessages {
requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, createReRequest(requester, id, 0))
}
return requester
} }
// StartRequest initiates a regular triggering of the StartRequest event until it has been stopped using StopRequest. // StartRequest initiates a regular triggering of the StartRequest event until it has been stopped using StopRequest.
...@@ -51,7 +65,7 @@ func (requester *MessageRequester) StartRequest(id message.Id) { ...@@ -51,7 +65,7 @@ func (requester *MessageRequester) StartRequest(id message.Id) {
} }
// schedule the next request and trigger the event // schedule the next request and trigger the event
requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id, 0) }) requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { createReRequest(requester, id, 0) })
requester.scheduledRequestsMutex.Unlock() requester.scheduledRequestsMutex.Unlock()
requester.Events.SendRequest.Trigger(id) requester.Events.SendRequest.Trigger(id)
} }
...@@ -85,7 +99,7 @@ func (requester *MessageRequester) reRequest(id message.Id, count int) { ...@@ -85,7 +99,7 @@ func (requester *MessageRequester) reRequest(id message.Id, count int) {
requester.Events.MissingMessageAppeared.Trigger(id) requester.Events.MissingMessageAppeared.Trigger(id)
return return
} }
requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id, count) }) requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { createReRequest(requester, id, count) })
return return
} }
} }
......
...@@ -21,8 +21,8 @@ class App extends React.Component<AppProps, unknown> { ...@@ -21,8 +21,8 @@ class App extends React.Component<AppProps, unknown> {
<div className="root"> <div className="root">
<header> <header>
<Link className="brand" to="/"> <Link className="brand" to="/">
<img src="/assets/logo-header.svg" alt="GoShimmer Analyser" /> <img src="/assets/logo-header.svg" alt="Pollen Analyzer" />
<h1>GoShimmer Analyzer</h1> <h1>Pollen Analyzer</h1>
</Link> </Link>
<div className="badge-container"> <div className="badge-container">
{!this.props.autopeeringStore.websocketConnected && {!this.props.autopeeringStore.websocketConnected &&
......
...@@ -22,7 +22,7 @@ export default class Autopeering extends React.Component<AutopeeringProps, unkno ...@@ -22,7 +22,7 @@ export default class Autopeering extends React.Component<AutopeeringProps, unkno
return ( return (
<div className="auto-peering"> <div className="auto-peering">
<div className="header margin-b-m"> <div className="header margin-b-m">
<h2>Pollen Visualizer</h2> <h2>Autopeering Visualizer</h2>
<div className="row"> <div className="row">
<select <select
onChange={(e) => this.props.autopeeringStore.handleVersionSelection(e.target.value)} onChange={(e) => this.props.autopeeringStore.handleVersionSelection(e.target.value)}
......
This diff is collapsed.
...@@ -33,7 +33,10 @@ func runLiveFeed() { ...@@ -33,7 +33,10 @@ func runLiveFeed() {
select { select {
case <-newMsgRateLimiter.C: case <-newMsgRateLimiter.C:
liveFeedWorkerPool.TrySubmit(message) _, ok := liveFeedWorkerPool.TrySubmit(message)
if !ok {
message.Release()
}
default: default:
message.Release() message.Release()
} }
......
...@@ -71,7 +71,11 @@ func runVisualizer() { ...@@ -71,7 +71,11 @@ func runVisualizer() {
notifyNewMsg := events.NewClosure(func(message *message.CachedMessage, metadata *tangle.CachedMessageMetadata) { notifyNewMsg := events.NewClosure(func(message *message.CachedMessage, metadata *tangle.CachedMessageMetadata) {
defer message.Release() defer message.Release()
defer metadata.Release() defer metadata.Release()
visualizerWorkerPool.TrySubmit(message.Retain(), metadata.Retain()) _, ok := visualizerWorkerPool.TrySubmit(message.Retain(), metadata.Retain())
if !ok {
message.Release()
metadata.Release()
}
}) })
notifyNewTip := events.NewClosure(func(messageId message.Id) { notifyNewTip := events.NewClosure(func(messageId message.Id) {
......
...@@ -85,7 +85,8 @@ func MessageFactory() *messagefactory.MessageFactory { ...@@ -85,7 +85,8 @@ func MessageFactory() *messagefactory.MessageFactory {
// MessageRequester gets the messageRequester instance. // MessageRequester gets the messageRequester instance.
func MessageRequester() *messagerequester.MessageRequester { func MessageRequester() *messagerequester.MessageRequester {
msgReqOnce.Do(func() { msgReqOnce.Do(func() {
messageRequester = messagerequester.New(messageExists) // load all missing messages on start up
messageRequester = messagerequester.New(messageExists, Tangle().MissingMessages())
}) })
return messageRequester return messageRequester
} }
......
...@@ -52,7 +52,7 @@ func init() { ...@@ -52,7 +52,7 @@ func init() {
flag.StringSlice(CfgSyncBeaconFollowNodes, []string{"Gm7W191NDnqyF7KJycZqK7V6ENLwqxTwoKQN4SmpkB24", "9DB3j9cWYSuEEtkvanrzqkzCQMdH1FGv3TawJdVbDxkd"}, "list of trusted nodes to follow their sync status") flag.StringSlice(CfgSyncBeaconFollowNodes, []string{"Gm7W191NDnqyF7KJycZqK7V6ENLwqxTwoKQN4SmpkB24", "9DB3j9cWYSuEEtkvanrzqkzCQMdH1FGv3TawJdVbDxkd"}, "list of trusted nodes to follow their sync status")
flag.Int(CfgSyncBeaconMaxTimeWindowSec, 10, "the maximum time window for which a sync payload would be considerable") flag.Int(CfgSyncBeaconMaxTimeWindowSec, 10, "the maximum time window for which a sync payload would be considerable")
flag.Int(CfgSyncBeaconMaxTimeOfflineSec, 40, "the maximum time the node should stay synced without receiving updates") flag.Int(CfgSyncBeaconMaxTimeOfflineSec, 70, "the maximum time the node should stay synced without receiving updates")
flag.Int(CfgSyncBeaconCleanupInterval, 10, "the interval at which cleanups are done") flag.Int(CfgSyncBeaconCleanupInterval, 10, "the interval at which cleanups are done")
} }
...@@ -166,7 +166,7 @@ func configure(_ *node.Plugin) { ...@@ -166,7 +166,7 @@ func configure(_ *node.Plugin) {
payload, ok := messagePayload.(*syncbeacon_payload.Payload) payload, ok := messagePayload.(*syncbeacon_payload.Payload)
if !ok { if !ok {
log.Info("could not cast payload to sync beacon object") log.Debug("could not cast payload to sync beacon object")
return return
} }
...@@ -175,6 +175,14 @@ func configure(_ *node.Plugin) { ...@@ -175,6 +175,14 @@ func configure(_ *node.Plugin) {
return return
} }
// only consider fresh beacons
mutex.RLock()
if payload.SentTime() < currentBeacons[msg.IssuerPublicKey()].SentTime {
mutex.RUnlock()
return
}
mutex.RUnlock()
handlePayload(payload, msg.IssuerPublicKey(), msg.Id()) handlePayload(payload, msg.IssuerPublicKey(), msg.Id())
}) })
})) }))
...@@ -184,16 +192,15 @@ func configure(_ *node.Plugin) { ...@@ -184,16 +192,15 @@ func configure(_ *node.Plugin) {
// The time that payload was sent is not greater than CfgSyncBeaconMaxTimeWindowSec. If the duration is longer than CfgSyncBeaconMaxTimeWindowSec, we consider that beacon to be out of sync till we receive a newer payload. // The time that payload was sent is not greater than CfgSyncBeaconMaxTimeWindowSec. If the duration is longer than CfgSyncBeaconMaxTimeWindowSec, we consider that beacon to be out of sync till we receive a newer payload.
// More than syncPercentage of followed nodes are also synced, the node is set to synced. Otherwise, its set as desynced. // More than syncPercentage of followed nodes are also synced, the node is set to synced. Otherwise, its set as desynced.
func handlePayload(syncBeaconPayload *syncbeacon_payload.Payload, issuerPublicKey ed25519.PublicKey, msgID message.Id) { func handlePayload(syncBeaconPayload *syncbeacon_payload.Payload, issuerPublicKey ed25519.PublicKey, msgID message.Id) {
mutex.Lock()
defer mutex.Unlock()
synced := true synced := true
dur := time.Since(time.Unix(0, syncBeaconPayload.SentTime())) dur := time.Since(time.Unix(0, syncBeaconPayload.SentTime()))
if dur.Seconds() > beaconMaxTimeWindowSec { if dur.Seconds() > beaconMaxTimeWindowSec {
log.Infof("sync beacon %s, received from %s is too old.", msgID, issuerPublicKey) log.Debugf("sync beacon %s, received from %s is too old.", msgID, issuerPublicKey)
synced = false synced = false
} }
mutex.Lock()
defer mutex.Unlock()
currentBeacons[issuerPublicKey].Synced = synced currentBeacons[issuerPublicKey].Synced = synced
currentBeacons[issuerPublicKey].MsgID = msgID currentBeacons[issuerPublicKey].MsgID = msgID
currentBeacons[issuerPublicKey].SentTime = syncBeaconPayload.SentTime() currentBeacons[issuerPublicKey].SentTime = syncBeaconPayload.SentTime()
......
...@@ -7,7 +7,6 @@ import ( ...@@ -7,7 +7,6 @@ import (
"time" "time"
"github.com/iotaledger/goshimmer/client" "github.com/iotaledger/goshimmer/client"
"github.com/iotaledger/goshimmer/client/wallet"
walletseed "github.com/iotaledger/goshimmer/client/wallet/packages/seed" walletseed "github.com/iotaledger/goshimmer/client/wallet/packages/seed"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/address" "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/address"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/address/signaturescheme" "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/address/signaturescheme"
...@@ -30,8 +29,8 @@ func main() { ...@@ -30,8 +29,8 @@ func main() {
clients[0] = client.NewGoShimmerAPI(node1APIURL, http.Client{Timeout: 60 * time.Second}) clients[0] = client.NewGoShimmerAPI(node1APIURL, http.Client{Timeout: 60 * time.Second})
clients[1] = client.NewGoShimmerAPI(node2APIURL, http.Client{Timeout: 60 * time.Second}) clients[1] = client.NewGoShimmerAPI(node2APIURL, http.Client{Timeout: 60 * time.Second})
myWallet := wallet.New() mySeed := walletseed.NewSeed()
myAddr := myWallet.Seed().Address(0) myAddr := mySeed.Address(0)
if _, err := clients[0].SendFaucetRequest(myAddr.String()); err != nil { if _, err := clients[0].SendFaucetRequest(myAddr.String()); err != nil {
fmt.Println(err) fmt.Println(err)
...@@ -88,8 +87,6 @@ func main() { ...@@ -88,8 +87,6 @@ func main() {
go func(i int) { go func(i int) {
defer wg.Done() defer wg.Done()
fmt.Println(i)
// create a new receiver wallet for the given conflict // create a new receiver wallet for the given conflict
receiverSeeds[i] = walletseed.NewSeed() receiverSeeds[i] = walletseed.NewSeed()
destAddr := receiverSeeds[i].Address(0) destAddr := receiverSeeds[i].Address(0)
...@@ -101,7 +98,7 @@ func main() { ...@@ -101,7 +98,7 @@ func main() {
{Value: 1337, Color: balance.ColorIOTA}, {Value: 1337, Color: balance.ColorIOTA},
}, },
})) }))
tx = tx.Sign(signaturescheme.ED25519(*myWallet.Seed().KeyPair(0))) tx = tx.Sign(signaturescheme.ED25519(*mySeed.KeyPair(0)))
conflictingTxs[i] = tx conflictingTxs[i] = tx
valueObject := valuepayload.New(valuepayload.GenesisID, valuepayload.GenesisID, tx) valueObject := valuepayload.New(valuepayload.GenesisID, valuepayload.GenesisID, tx)
......
...@@ -119,6 +119,12 @@ func (d *DockerContainer) CreateGoShimmerPeer(config GoShimmerConfig) error { ...@@ -119,6 +119,12 @@ func (d *DockerContainer) CreateGoShimmerPeer(config GoShimmerConfig) error {
fmt.Sprintf("--syncbeaconfollower.followNodes=%s", config.SyncBeaconFollowNodes), fmt.Sprintf("--syncbeaconfollower.followNodes=%s", config.SyncBeaconFollowNodes),
fmt.Sprintf("--syncbeacon.broadcastInterval=%d", config.SyncBeaconBroadcastInterval), fmt.Sprintf("--syncbeacon.broadcastInterval=%d", config.SyncBeaconBroadcastInterval),
"--syncbeacon.startSynced=true", "--syncbeacon.startSynced=true",
func() string {
if config.SyncBeaconMaxTimeOfflineSec == 0 {
return ""
}
return fmt.Sprintf("--syncbeaconfollower.maxTimeOffline=%d", config.SyncBeaconMaxTimeOfflineSec)
}(),
}, },
} }
......
...@@ -70,6 +70,7 @@ type GoShimmerConfig struct { ...@@ -70,6 +70,7 @@ type GoShimmerConfig struct {
SyncBeaconFollower bool SyncBeaconFollower bool
SyncBeaconFollowNodes string SyncBeaconFollowNodes string
SyncBeaconBroadcastInterval int SyncBeaconBroadcastInterval int
SyncBeaconMaxTimeOfflineSec int
} }
// NetworkConfig defines the config of a GoShimmer Docker network. // NetworkConfig defines the config of a GoShimmer Docker network.
......
...@@ -39,6 +39,7 @@ func TestSyncBeacon(t *testing.T) { ...@@ -39,6 +39,7 @@ func TestSyncBeacon(t *testing.T) {
peer, err := n.CreatePeer(framework.GoShimmerConfig{ peer, err := n.CreatePeer(framework.GoShimmerConfig{
SyncBeaconFollower: true, SyncBeaconFollower: true,
SyncBeaconFollowNodes: strings.Join(beaconPublicKeys, ","), SyncBeaconFollowNodes: strings.Join(beaconPublicKeys, ","),
SyncBeaconMaxTimeOfflineSec: 15,
}) })
require.NoError(t, err) require.NoError(t, err)
err = n.WaitForAutopeering(3) err = n.WaitForAutopeering(3)
...@@ -46,7 +47,7 @@ func TestSyncBeacon(t *testing.T) { ...@@ -46,7 +47,7 @@ func TestSyncBeacon(t *testing.T) {
log.Println("Waiting...1/2") log.Println("Waiting...1/2")
// wait for node to solidify beacon messages // wait for node to solidify beacon messages
time.Sleep(60 * time.Second) time.Sleep(30 * time.Second)
log.Println("done waiting.") log.Println("done waiting.")
resp, err := peer.Info() resp, err := peer.Info()
...@@ -60,7 +61,7 @@ func TestSyncBeacon(t *testing.T) { ...@@ -60,7 +61,7 @@ func TestSyncBeacon(t *testing.T) {
// wait for peers to sync and broadcast // wait for peers to sync and broadcast
log.Println("Waiting...2/2") log.Println("Waiting...2/2")
time.Sleep(60 * time.Second) time.Sleep(30 * time.Second)
log.Println("done waiting.") log.Println("done waiting.")
// expect majority of nodes to not have broadcasted beacons. Hence should be desynced due to cleanup. // expect majority of nodes to not have broadcasted beacons. Hence should be desynced due to cleanup.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment