fpc_livefeed.go 4.18 KiB
package dashboard
import (
"time"
"github.com/gorilla/websocket"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/plugins/analysis/packet"
analysis "github.com/iotaledger/goshimmer/plugins/analysis/server"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/workerpool"
"github.com/mr-tron/base58/base58"
)
const (
unfinalized = 0
liked = 1
disliked = 2
)
var (
fpcLiveFeedWorkerCount = 1
fpcLiveFeedWorkerQueueSize = 300
fpcLiveFeedWorkerPool *workerpool.WorkerPool
recordedConflicts *conflictRecord
)
// FPCUpdate contains an FPC update.
type FPCUpdate struct {
Conflicts conflictSet `json:"conflictset" bson:"conflictset"`
}
func configureFPCLiveFeed() {
recordedConflicts = newConflictRecord()
fpcLiveFeedWorkerPool = workerpool.New(func(task workerpool.Task) {
newMsg := task.Param(0).(*FPCUpdate)
//fmt.Println("broadcasting FPC message to websocket clients")
broadcastWsMessage(&wsmsg{MsgTypeFPC, newMsg}, true)
task.Return(nil)
}, workerpool.WorkerCount(fpcLiveFeedWorkerCount), workerpool.QueueSize(fpcLiveFeedWorkerQueueSize))
}
func runFPCLiveFeed() {
if err := daemon.BackgroundWorker("Analysis[FPCUpdater]", func(shutdownSignal <-chan struct{}) {
onFPCHeartbeatReceived := events.NewClosure(func(hb *packet.FPCHeartbeat) {
//fmt.Println("broadcasting FPC live feed")
fpcLiveFeedWorkerPool.Submit(createFPCUpdate(hb, true))
})
analysis.Events.FPCHeartbeat.Attach(onFPCHeartbeatReceived)
fpcLiveFeedWorkerPool.Start()
defer fpcLiveFeedWorkerPool.Stop()
cleanUpTicker := time.NewTicker(1 * time.Minute)
for {
select {
case <-shutdownSignal:
log.Info("Stopping Analysis[FPCUpdater] ...")
analysis.Events.FPCHeartbeat.Detach(onFPCHeartbeatReceived)
cleanUpTicker.Stop()
log.Info("Stopping Analysis[FPCUpdater] ... done")
case <-cleanUpTicker.C:
log.Info("Cleaning up Finalized Conflicts ...")
recordedConflicts.cleanUp()
log.Info("Cleaning up Finalized Conflicts ... done")
}
}
}, shutdown.PriorityDashboard); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
func createFPCUpdate(hb *packet.FPCHeartbeat, recordEvent bool) *FPCUpdate {
// prepare the update
conflicts := make(conflictSet)
nodeID := base58.Encode(hb.OwnID)
for ID, context := range hb.RoundStats.ActiveVoteContexts {
newVoteContext := voteContext{
NodeID: nodeID,
Rounds: context.Rounds,
Opinions: vote.ConvertOpinionsToInts32(context.Opinions),
}
conflicts[ID] = newConflict()
conflicts[ID].NodesView[nodeID] = newVoteContext
if recordEvent {
// update recorded events
recordedConflicts.update(ID, conflict{NodesView: map[string]voteContext{nodeID: newVoteContext}})
}
}
if recordEvent {
// check finalized conflicts
if len(hb.Finalized) > 0 {
finalizedConflicts := make([]FPCRecord, len(hb.Finalized))
i := 0
for ID, finalOpinion := range hb.Finalized {
conflictOverview, ok := recordedConflicts.load(ID)
if !ok {
log.Error("Error: missing conflict with ID:", ID)
continue
}
conflictDetail := conflictOverview.NodesView[nodeID]
conflictDetail.Status = vote.ConvertOpinionToInt32(finalOpinion)
conflicts[ID] = newConflict()
conflicts[ID].NodesView[nodeID] = conflictDetail
recordedConflicts.update(ID, conflicts[ID])
finalizedConflicts[i] = FPCRecord{
ConflictID: ID,
NodeID: conflictDetail.NodeID,
Rounds: conflictDetail.Rounds,
Opinions: conflictDetail.Opinions,
Status: conflictDetail.Status,
}
i++
}
//log.Info("Storing:\n", finalizedConflicts)
err := storeFPCRecords(finalizedConflicts, mongoDB())
if err != nil {
log.Errorf("Error while writing on MongoDB: %s", err)
}
}
}
return &FPCUpdate{
Conflicts: conflicts,
}
}
// replay FPC records (past events).
func replayFPCRecords(ws *websocket.Conn) {
wsMessage := &wsmsg{MsgTypeFPC, recordedConflicts.ToFPCUpdate()}
if err := ws.WriteJSON(wsMessage); err != nil {
log.Info(err)
return
}
if err := ws.SetWriteDeadline(time.Now().Add(webSocketWriteTimeout)); err != nil {
return
}
}