Newer
Older
package dashboard
import (
Angelo Capossele
committed
"runtime"
Angelo Capossele
committed
"github.com/gorilla/websocket"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/plugins/analysis/packet"
analysis "github.com/iotaledger/goshimmer/plugins/analysis/server"
Angelo Capossele
committed
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/workerpool"
"go.mongodb.org/mongo-driver/bson/primitive"
Angelo Capossele
committed
)
const (
liked = 1
disliked = 2
)
var (
fpcLiveFeedWorkerCount = 1
Angelo Capossele
committed
fpcLiveFeedWorkerQueueSize = 300
fpcLiveFeedWorkerPool *workerpool.WorkerPool
fpcStoreFinalizedWorkerCount = runtime.GOMAXPROCS(0)
Angelo Capossele
committed
fpcStoreFinalizedWorkerQueueSize = 300
fpcStoreFinalizedWorkerPool *workerpool.WorkerPool
Angelo Capossele
committed
activeConflicts = newActiveConflictSet()
)
Angelo Capossele
committed
// FPCUpdate contains an FPC update.
type FPCUpdate struct {
Conflicts conflictSet `json:"conflictset" bson:"conflictset"`
}
func configureFPCLiveFeed() {
Angelo Capossele
committed
if config.Node().GetBool(CfgMongoDBEnabled) {
Angelo Capossele
committed
mongoDB()
}
fpcLiveFeedWorkerPool = workerpool.New(func(task workerpool.Task) {
Angelo Capossele
committed
newMsg := task.Param(0).(*FPCUpdate)
broadcastWsMessage(&wsmsg{MsgTypeFPC, newMsg}, true)
task.Return(nil)
}, workerpool.WorkerCount(fpcLiveFeedWorkerCount), workerpool.QueueSize(fpcLiveFeedWorkerQueueSize))
Angelo Capossele
committed
if config.Node().GetBool(CfgMongoDBEnabled) {
Angelo Capossele
committed
fpcStoreFinalizedWorkerPool = workerpool.New(func(task workerpool.Task) {
storeFinalizedVoteContext(task.Param(0).(FPCRecords))
task.Return(nil)
}, workerpool.WorkerCount(fpcStoreFinalizedWorkerCount), workerpool.QueueSize(fpcStoreFinalizedWorkerQueueSize))
}
}
func runFPCLiveFeed() {
if err := daemon.BackgroundWorker("Analysis[FPCUpdater]", func(shutdownSignal <-chan struct{}) {
onFPCHeartbeatReceived := events.NewClosure(func(hb *packet.FPCHeartbeat) {
Angelo Capossele
committed
fpcLiveFeedWorkerPool.Submit(createFPCUpdate(hb))
})
analysis.Events.FPCHeartbeat.Attach(onFPCHeartbeatReceived)
fpcLiveFeedWorkerPool.Start()
defer fpcLiveFeedWorkerPool.Stop()
if config.Node().GetBool(CfgMongoDBEnabled) {
Angelo Capossele
committed
fpcStoreFinalizedWorkerPool.Start()
defer fpcStoreFinalizedWorkerPool.Stop()
}
cleanUpTicker := time.NewTicker(1 * time.Minute)
for {
select {
case <-shutdownSignal:
log.Info("Stopping Analysis[FPCUpdater] ...")
analysis.Events.FPCHeartbeat.Detach(onFPCHeartbeatReceived)
cleanUpTicker.Stop()
log.Info("Stopping Analysis[FPCUpdater] ... done")
return
case <-cleanUpTicker.C:
log.Debug("Cleaning up Finalized Conflicts ...")
activeConflicts.cleanUp()
log.Debug("Cleaning up Finalized Conflicts ... done")
}
}
}, shutdown.PriorityDashboard); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
Angelo Capossele
committed
func createFPCUpdate(hb *packet.FPCHeartbeat) *FPCUpdate {
// prepare the update
conflicts := make(conflictSet)
nodeID := shortNodeIDString(hb.OwnID)
for ID, context := range hb.RoundStats.ActiveVoteContexts {
Angelo Capossele
committed
newVoteContext := voteContext{
NodeID: nodeID,
Rounds: context.Rounds,
Opinions: vote.ConvertOpinionsToInts32(context.Opinions),
}
Angelo Capossele
committed
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
conflicts[ID] = newConflict()
conflicts[ID].NodesView[nodeID] = newVoteContext
// update recorded events
activeConflicts.update(ID, conflict{NodesView: map[string]voteContext{nodeID: newVoteContext}})
}
// check finalized conflicts
if len(hb.Finalized) == 0 {
return &FPCUpdate{Conflicts: conflicts}
}
finalizedConflicts := make(FPCRecords, len(hb.Finalized))
i := 0
for ID, finalOpinion := range hb.Finalized {
conflictOverview, ok := activeConflicts.load(ID)
if !ok {
log.Error("Error: missing conflict with ID:", ID)
continue
}
conflictDetail := conflictOverview.NodesView[nodeID]
conflictDetail.Outcome = vote.ConvertOpinionToInt32(finalOpinion)
conflicts[ID] = newConflict()
conflicts[ID].NodesView[nodeID] = conflictDetail
activeConflicts.update(ID, conflicts[ID])
finalizedConflicts[i] = FPCRecord{
ConflictID: ID,
NodeID: conflictDetail.NodeID,
Rounds: conflictDetail.Rounds,
Opinions: conflictDetail.Opinions,
Outcome: conflictDetail.Outcome,
Time: primitive.NewDateTimeFromTime(time.Now()),
Angelo Capossele
committed
}
i++
metrics.Events().AnalysisFPCFinalized.Trigger(&metrics.AnalysisFPCFinalizedEvent{
ConflictID: ID,
NodeID: conflictDetail.NodeID,
Rounds: conflictDetail.Rounds,
Opinions: vote.ConvertInts32ToOpinions(conflictDetail.Opinions),
Outcome: vote.ConvertInt32Opinion(conflictDetail.Outcome),
})
}
if config.Node().GetBool(CfgMongoDBEnabled) {
Angelo Capossele
committed
fpcStoreFinalizedWorkerPool.TrySubmit(finalizedConflicts)
}
return &FPCUpdate{Conflicts: conflicts}
}
// stores the given finalized vote contexts into the database.
func storeFinalizedVoteContext(finalizedConflicts FPCRecords) {
db := mongoDB()
if err := pingOrReconnectMongoDB(); err != nil {
return
Angelo Capossele
committed
if err := storeFPCRecords(finalizedConflicts, db); err != nil {
log.Errorf("Error while writing on MongoDB: %s", err)
Angelo Capossele
committed
// replay FPC records (past events).
func replayFPCRecords(ws *websocket.Conn) {
wsMessage := &wsmsg{MsgTypeFPC, activeConflicts.ToFPCUpdate()}
if err := ws.WriteJSON(wsMessage); err != nil {
log.Info(err)
return
}
if err := ws.SetWriteDeadline(time.Now().Add(webSocketWriteTimeout)); err != nil {
return