Skip to content
Snippets Groups Projects
fpc_livefeed.go 4.18 KiB
package dashboard

import (
	"time"

	"github.com/gorilla/websocket"
	"github.com/iotaledger/goshimmer/packages/shutdown"
	"github.com/iotaledger/goshimmer/packages/vote"
	"github.com/iotaledger/goshimmer/plugins/analysis/packet"
	analysis "github.com/iotaledger/goshimmer/plugins/analysis/server"
	"github.com/iotaledger/hive.go/daemon"
	"github.com/iotaledger/hive.go/events"
	"github.com/iotaledger/hive.go/workerpool"
	"github.com/mr-tron/base58/base58"
)

// Numeric status codes for a conflict.
// NOTE(review): these constants are not referenced in this part of the file;
// presumably they mirror the int32 values written to voteContext.Status by
// vote.ConvertOpinionToInt32 — confirm before relying on that.
const (
	// unfinalized presumably marks a conflict without a final opinion yet
	// (it equals the zero value of an integer status field).
	unfinalized = 0
	// liked presumably marks a conflict finalized with a "like" opinion.
	liked       = 1
	// disliked presumably marks a conflict finalized with a "dislike" opinion.
	disliked    = 2
)

// Package-level state of the FPC live feed.
var (
	// fpcLiveFeedWorkerCount is passed as workerpool.WorkerCount when the
	// pool is created in configureFPCLiveFeed.
	fpcLiveFeedWorkerCount     = 1
	// fpcLiveFeedWorkerQueueSize is passed as workerpool.QueueSize when the
	// pool is created in configureFPCLiveFeed.
	fpcLiveFeedWorkerQueueSize = 300
	// fpcLiveFeedWorkerPool broadcasts FPC updates to websocket clients. It is
	// created by configureFPCLiveFeed and started/stopped by runFPCLiveFeed.
	fpcLiveFeedWorkerPool      *workerpool.WorkerPool

	// recordedConflicts holds the conflicts seen so far. It is initialised by
	// configureFPCLiveFeed, updated by createFPCUpdate, periodically cleaned up
	// by runFPCLiveFeed and replayed to new clients by replayFPCRecords.
	recordedConflicts *conflictRecord
)

// FPCUpdate contains an FPC update.
// It is the payload sent to websocket clients in a MsgTypeFPC message.
type FPCUpdate struct {
	// Conflicts is indexed by conflict ID; each entry carries the view of the
	// reporting node(s) on that conflict.
	Conflicts conflictSet `json:"conflictset" bson:"conflictset"`
}

// configureFPCLiveFeed initialises the conflict record and builds (but does
// not start) the worker pool that broadcasts FPC updates to websocket clients.
func configureFPCLiveFeed() {
	recordedConflicts = newConflictRecord()

	// broadcastFPCUpdate is the pool's task handler: it expects the task's
	// first parameter to be the *FPCUpdate to broadcast.
	broadcastFPCUpdate := func(task workerpool.Task) {
		update := task.Param(0).(*FPCUpdate)
		broadcastWsMessage(&wsmsg{MsgTypeFPC, update}, true)
		task.Return(nil)
	}

	fpcLiveFeedWorkerPool = workerpool.New(
		broadcastFPCUpdate,
		workerpool.WorkerCount(fpcLiveFeedWorkerCount),
		workerpool.QueueSize(fpcLiveFeedWorkerQueueSize),
	)
}

// runFPCLiveFeed registers the "Analysis[FPCUpdater]" background worker.
//
// The worker attaches a closure to analysis.Events.FPCHeartbeat that turns
// every received heartbeat into an FPCUpdate (recording it on the way) and
// submits it to fpcLiveFeedWorkerPool. It also calls recordedConflicts.cleanUp
// once per minute. The worker returns when shutdownSignal fires, at which point
// the closure is detached, the ticker is stopped and the pool is stopped.
//
// A failure to register the worker is reported through log.Panicf.
func runFPCLiveFeed() {
	if err := daemon.BackgroundWorker("Analysis[FPCUpdater]", func(shutdownSignal <-chan struct{}) {
		onFPCHeartbeatReceived := events.NewClosure(func(hb *packet.FPCHeartbeat) {
			fpcLiveFeedWorkerPool.Submit(createFPCUpdate(hb, true))
		})
		analysis.Events.FPCHeartbeat.Attach(onFPCHeartbeatReceived)

		fpcLiveFeedWorkerPool.Start()
		defer fpcLiveFeedWorkerPool.Stop()

		cleanUpTicker := time.NewTicker(1 * time.Minute)

		for {
			select {
			case <-shutdownSignal:
				log.Info("Stopping Analysis[FPCUpdater] ...")
				analysis.Events.FPCHeartbeat.Detach(onFPCHeartbeatReceived)
				cleanUpTicker.Stop()
				log.Info("Stopping Analysis[FPCUpdater] ... done")
				// Leave the loop: without this return the worker would keep
				// running after shutdown and the deferred pool Stop above
				// would never execute.
				return
			case <-cleanUpTicker.C:
				log.Info("Cleaning up Finalized Conflicts ...")
				recordedConflicts.cleanUp()
				log.Info("Cleaning up Finalized Conflicts ... done")
			}
		}
	}, shutdown.PriorityDashboard); err != nil {
		log.Panicf("Failed to start as daemon: %s", err)
	}
}

// createFPCUpdate converts an FPC heartbeat into the FPCUpdate that is
// broadcast to websocket clients.
//
// For every active vote context in hb, the returned update contains the
// reporting node's view (rounds and opinions) keyed by conflict ID. For every
// finalized conflict in hb that is already known to recordedConflicts, the
// update contains the recorded view with its Status set to the final opinion.
//
// When recordEvent is true the function additionally
//   - writes each active vote context into recordedConflicts,
//   - writes each finalized conflict back into recordedConflicts, and
//   - persists the finalized conflicts through storeFPCRecords.
//
// Finalized conflicts are only processed when recordEvent is true. A finalized
// conflict that is missing from recordedConflicts is logged and skipped; a
// storage failure is logged and does not affect the returned update.
func createFPCUpdate(hb *packet.FPCHeartbeat, recordEvent bool) *FPCUpdate {
	// prepare the update
	conflicts := make(conflictSet)
	nodeID := base58.Encode(hb.OwnID)
	for conflictID, activeContext := range hb.RoundStats.ActiveVoteContexts {
		newVoteContext := voteContext{
			NodeID:   nodeID,
			Rounds:   activeContext.Rounds,
			Opinions: vote.ConvertOpinionsToInts32(activeContext.Opinions),
		}

		conflicts[conflictID] = newConflict()
		conflicts[conflictID].NodesView[nodeID] = newVoteContext

		if recordEvent {
			// update recorded events
			recordedConflicts.update(conflictID, conflict{NodesView: map[string]voteContext{nodeID: newVoteContext}})
		}
	}

	// Nothing to finalize or recording disabled: the update is complete.
	if !recordEvent || len(hb.Finalized) == 0 {
		return &FPCUpdate{
			Conflicts: conflicts,
		}
	}

	// Collect by appending so that skipped (unknown) conflicts do not leave
	// zero-valued records in the slice that gets persisted.
	finalizedConflicts := make([]FPCRecord, 0, len(hb.Finalized))
	for conflictID, finalOpinion := range hb.Finalized {
		conflictOverview, ok := recordedConflicts.load(conflictID)
		if !ok {
			log.Error("Error: missing conflict with ID:", conflictID)
			continue
		}

		conflictDetail := conflictOverview.NodesView[nodeID]
		conflictDetail.Status = vote.ConvertOpinionToInt32(finalOpinion)

		conflicts[conflictID] = newConflict()
		conflicts[conflictID].NodesView[nodeID] = conflictDetail
		recordedConflicts.update(conflictID, conflicts[conflictID])

		finalizedConflicts = append(finalizedConflicts, FPCRecord{
			ConflictID: conflictID,
			NodeID:     conflictDetail.NodeID,
			Rounds:     conflictDetail.Rounds,
			Opinions:   conflictDetail.Opinions,
			Status:     conflictDetail.Status,
		})
	}

	// Every finalized conflict may have been skipped above; in that case there
	// is nothing to persist.
	if len(finalizedConflicts) > 0 {
		if err := storeFPCRecords(finalizedConflicts, mongoDB()); err != nil {
			log.Errorf("Error while writing on MongoDB: %s", err)
		}
	}

	return &FPCUpdate{
		Conflicts: conflicts,
	}
}

// replayFPCRecords sends the currently recorded conflicts (past events) to the
// given websocket connection as a single MsgTypeFPC message and then sets the
// connection's write deadline to now + webSocketWriteTimeout.
//
// Errors are logged and not returned; if the write fails the deadline is left
// untouched.
//
// NOTE(review): the deadline is set after the write, so it applies to later
// writes on ws rather than to the replay itself — confirm this is intended.
func replayFPCRecords(ws *websocket.Conn) {
	wsMessage := &wsmsg{MsgTypeFPC, recordedConflicts.ToFPCUpdate()}
	if err := ws.WriteJSON(wsMessage); err != nil {
		log.Info(err)
		return
	}
	if err := ws.SetWriteDeadline(time.Now().Add(webSocketWriteTimeout)); err != nil {
		// Log instead of silently dropping the error, matching the handling
		// of the write failure above.
		log.Info(err)
	}
}