Select Git revision
fpc_livefeed.go
fpc_livefeed.go 2.60 KiB
package dashboard
import (
"time"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/plugins/analysis/packet"
analysis "github.com/iotaledger/goshimmer/plugins/analysis/server"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/workerpool"
"github.com/mr-tron/base58"
)
// Settings and state of the worker pool used by the FPC live feed.
var (
// fpcLiveFeedWorkerCount is handed to workerpool.WorkerCount when the pool is created.
fpcLiveFeedWorkerCount = 1
// fpcLiveFeedWorkerQueueSize is handed to workerpool.QueueSize when the pool is created.
fpcLiveFeedWorkerQueueSize = 50
// fpcLiveFeedWorkerPool is assigned by configureFPCLiveFeed and is nil until that call.
fpcLiveFeedWorkerPool *workerpool.WorkerPool
)
// Conflict holds the opinions of the nodes regarding a given conflict.
// It is serialized to JSON as part of an FPCMsg.
type Conflict struct {
// NodesView maps a string key to that entry's voteContext.
// NOTE(review): the key is presumably the node ID - confirm against the code that fills the map.
NodesView map[string]voteContext `json:"nodesview"`
}
// voteContext is the per-node entry stored in Conflict.NodesView.
// NOTE(review): the field semantics below are inferred from the names only - confirm
// against the code that builds these values.
type voteContext struct {
// NodeID presumably identifies the node this entry belongs to.
NodeID string `json:"nodeid"`
// Rounds is presumably the number of voting rounds performed so far.
Rounds int `json:"rounds"`
// Opinions presumably lists the node's opinion for each round.
Opinions []int32 `json:"opinions"`
// Like presumably carries the node's resulting opinion on the conflict.
Like int32 `json:"like"`
}
// FPCMsg contains an FPC update. It is the payload type that the FPC live feed
// worker pool expects as its first task parameter.
type FPCMsg struct {
// Nodes is an integer node count.
// NOTE(review): exactly which nodes are counted is not visible here - confirm.
Nodes int `json:"nodes"`
// ConflictSet maps a string key to the Conflict recorded for it.
// NOTE(review): the key is presumably a conflict ID - confirm against the producer.
ConflictSet map[string]Conflict `json:"conflictset"`
}
// configureFPCLiveFeed builds the worker pool for the FPC live feed and stores
// it in fpcLiveFeedWorkerPool. The pool is only created here, not started.
func configureFPCLiveFeed() {
	// handleUpdate processes one queued task: it takes the *FPCMsg from the
	// first task parameter, passes it to broadcastWsMessage wrapped in a wsmsg
	// of type MsgTypeFPC, and then completes the task with a nil result.
	handleUpdate := func(t workerpool.Task) {
		update := t.Param(0).(*FPCMsg)
		broadcastWsMessage(&wsmsg{MsgTypeFPC, update})
		t.Return(nil)
	}

	fpcLiveFeedWorkerPool = workerpool.New(
		handleUpdate,
		workerpool.WorkerCount(fpcLiveFeedWorkerCount),
		workerpool.QueueSize(fpcLiveFeedWorkerQueueSize),
	)
}
func runFPCLiveFeed() {
if err := daemon.BackgroundWorker("Analysis[FPCUpdater]", func(shutdownSignal <-chan struct{}) {
newMsgRateLimiter := time.NewTicker(time.Second / 10)
defer newMsgRateLimiter.Stop()
onFPCHeartbeatReceived := events.NewClosure(func(hb *packet.FPCHeartbeat) {
select {
case <-newMsgRateLimiter.C:
fpcLiveFeedWorkerPool.TrySubmit(createFPCUpdate(hb))
default:
}
})
analysis.Events.FPCHeartbeat.Attach(onFPCHeartbeatReceived)
fpcLiveFeedWorkerPool.Start()
defer fpcLiveFeedWorkerPool.Stop()
<-shutdownSignal
log.Info("Stopping Analysis[FPCUpdater] ...")
analysis.Events.FPCHeartbeat.Detach(onFPCHeartbeatReceived)
log.Info("Stopping Analysis[FPCUpdater] ... done")
}, shutdown.PriorityDashboard); err != nil {
panic(err)