Skip to content
Snippets Groups Projects
fpc_livefeed.go 5.35 KiB
Newer Older
	"github.com/gorilla/websocket"
	"github.com/iotaledger/goshimmer/packages/metrics"
	"github.com/iotaledger/goshimmer/packages/shutdown"
	"github.com/iotaledger/goshimmer/packages/vote"
	"github.com/iotaledger/goshimmer/plugins/analysis/packet"
	analysis "github.com/iotaledger/goshimmer/plugins/analysis/server"
	"github.com/iotaledger/goshimmer/plugins/config"
	"github.com/iotaledger/hive.go/daemon"
	"github.com/iotaledger/hive.go/events"
	"github.com/iotaledger/hive.go/workerpool"
	"go.mongodb.org/mongo-driver/bson/primitive"
	fpcLiveFeedWorkerPool      *workerpool.WorkerPool

	fpcStoreFinalizedWorkerCount     = runtime.GOMAXPROCS(0)
	fpcStoreFinalizedWorkerQueueSize = 300
	fpcStoreFinalizedWorkerPool      *workerpool.WorkerPool
// FPCUpdate contains an FPC update.
type FPCUpdate struct {
	Conflicts conflictSet `json:"conflictset" bson:"conflictset"`
	if config.Node().GetBool(CfgMongoDBEnabled) {
	fpcLiveFeedWorkerPool = workerpool.New(func(task workerpool.Task) {
		newMsg := task.Param(0).(*FPCUpdate)
		broadcastWsMessage(&wsmsg{MsgTypeFPC, newMsg}, true)
		task.Return(nil)
	}, workerpool.WorkerCount(fpcLiveFeedWorkerCount), workerpool.QueueSize(fpcLiveFeedWorkerQueueSize))
	if config.Node().GetBool(CfgMongoDBEnabled) {
		fpcStoreFinalizedWorkerPool = workerpool.New(func(task workerpool.Task) {
			storeFinalizedVoteContext(task.Param(0).(FPCRecords))
			task.Return(nil)
		}, workerpool.WorkerCount(fpcStoreFinalizedWorkerCount), workerpool.QueueSize(fpcStoreFinalizedWorkerQueueSize))
	}
}

func runFPCLiveFeed() {
	if err := daemon.BackgroundWorker("Analysis[FPCUpdater]", func(shutdownSignal <-chan struct{}) {
		onFPCHeartbeatReceived := events.NewClosure(func(hb *packet.FPCHeartbeat) {
			fpcLiveFeedWorkerPool.Submit(createFPCUpdate(hb))
		})
		analysis.Events.FPCHeartbeat.Attach(onFPCHeartbeatReceived)

		fpcLiveFeedWorkerPool.Start()
		defer fpcLiveFeedWorkerPool.Stop()

		if config.Node().GetBool(CfgMongoDBEnabled) {
			fpcStoreFinalizedWorkerPool.Start()
			defer fpcStoreFinalizedWorkerPool.Stop()
		}

		cleanUpTicker := time.NewTicker(1 * time.Minute)

		for {
			select {
			case <-shutdownSignal:
				log.Info("Stopping Analysis[FPCUpdater] ...")
				analysis.Events.FPCHeartbeat.Detach(onFPCHeartbeatReceived)
				cleanUpTicker.Stop()
				log.Info("Stopping Analysis[FPCUpdater] ... done")
				return
			case <-cleanUpTicker.C:
				log.Debug("Cleaning up Finalized Conflicts ...")
				activeConflicts.cleanUp()
				log.Debug("Cleaning up Finalized Conflicts ... done")
			}
		}
	}, shutdown.PriorityDashboard); err != nil {
		log.Panicf("Failed to start as daemon: %s", err)
func createFPCUpdate(hb *packet.FPCHeartbeat) *FPCUpdate {
	// prepare the update
	conflicts := make(conflictSet)
	nodeID := shortNodeIDString(hb.OwnID)
	for ID, context := range hb.RoundStats.ActiveVoteContexts {
			NodeID:   nodeID,
			Rounds:   context.Rounds,
			Opinions: vote.ConvertOpinionsToInts32(context.Opinions),
		}

		conflicts[ID] = newConflict()
		conflicts[ID].NodesView[nodeID] = newVoteContext

		// update recorded events
		activeConflicts.update(ID, conflict{NodesView: map[string]voteContext{nodeID: newVoteContext}})
	}

	// check finalized conflicts
	if len(hb.Finalized) == 0 {
		return &FPCUpdate{Conflicts: conflicts}
	}

	finalizedConflicts := make(FPCRecords, len(hb.Finalized))
	i := 0
	for ID, finalOpinion := range hb.Finalized {
		conflictOverview, ok := activeConflicts.load(ID)
		if !ok {
			log.Error("Error: missing conflict with ID:", ID)
			continue
		}
		conflictDetail := conflictOverview.NodesView[nodeID]
		conflictDetail.Outcome = vote.ConvertOpinionToInt32(finalOpinion)
		conflicts[ID] = newConflict()
		conflicts[ID].NodesView[nodeID] = conflictDetail
		activeConflicts.update(ID, conflicts[ID])
		finalizedConflicts[i] = FPCRecord{
			ConflictID: ID,
			NodeID:     conflictDetail.NodeID,
			Rounds:     conflictDetail.Rounds,
			Opinions:   conflictDetail.Opinions,
			Outcome:    conflictDetail.Outcome,
			Time:       primitive.NewDateTimeFromTime(time.Now()),
		}
		i++

		metrics.Events().AnalysisFPCFinalized.Trigger(&metrics.AnalysisFPCFinalizedEvent{
			ConflictID: ID,
			NodeID:     conflictDetail.NodeID,
			Rounds:     conflictDetail.Rounds,
			Opinions:   vote.ConvertInts32ToOpinions(conflictDetail.Opinions),
			Outcome:    vote.ConvertInt32Opinion(conflictDetail.Outcome),
		})
	}

	if config.Node().GetBool(CfgMongoDBEnabled) {
		fpcStoreFinalizedWorkerPool.TrySubmit(finalizedConflicts)
	}

	return &FPCUpdate{Conflicts: conflicts}
}

// stores the given finalized vote contexts into the database.
func storeFinalizedVoteContext(finalizedConflicts FPCRecords) {
	db := mongoDB()

	if err := pingOrReconnectMongoDB(); err != nil {
		return
	if err := storeFPCRecords(finalizedConflicts, db); err != nil {
		log.Errorf("Error while writing on MongoDB: %s", err)
// replay FPC records (past events).
func replayFPCRecords(ws *websocket.Conn) {
	wsMessage := &wsmsg{MsgTypeFPC, activeConflicts.ToFPCUpdate()}

	if err := ws.WriteJSON(wsMessage); err != nil {
		log.Info(err)
		return
	}
	if err := ws.SetWriteDeadline(time.Now().Add(webSocketWriteTimeout)); err != nil {
		return