Skip to content
Snippets Groups Projects
Commit 26f5b516 authored by capossele's avatar capossele
Browse files

:sparkles: improve FPC analysis

parent 4ba3c353
No related branches found
No related tags found
No related merge requests found
package dashboard
type ConflictSet = map[string]Conflict
// Conflict defines the struct for the opinions of the nodes regarding a given conflict.
type Conflict struct {
NodesView map[string]voteContext `json:"nodesview"`
}
type voteContext struct {
NodeID string `json:"nodeid"`
Rounds int `json:"rounds"`
Opinions []int32 `json:"opinions"`
Status int32 `json:"status"`
}
func newConflict() Conflict {
return Conflict{
NodesView: make(map[string]voteContext),
}
}
// isFinalized return true if all the nodes have finalized a given conflict.
// It also returns false if the given conflict has an empty nodesView.
func (c Conflict) isFinalized() bool {
if len(c.NodesView) == 0 {
return false
}
count := 0
for _, context := range c.NodesView {
if context.Status == liked || context.Status == disliked {
count++
}
}
return (count == len(c.NodesView))
}
// finalizationStatus returns the ratio of nodes that have finlized a given conflict.
func (c Conflict) finalizationStatus() float64 {
if len(c.NodesView) == 0 {
return 0
}
count := 0
for _, context := range c.NodesView {
if context.Status == liked || context.Status == disliked {
count++
}
}
return (float64(count) / float64(len(c.NodesView)))
}
package dashboard
import "sync"
type conflictRecord struct {
conflictSet ConflictSet
size uint32
buffer []string
lock sync.RWMutex
}
func NewConflictRecord(recordSize uint32) *conflictRecord {
return &conflictRecord{
conflictSet: make(ConflictSet),
size: recordSize,
buffer: []string{},
}
}
func (cr *conflictRecord) Update(ID string, c Conflict) {
lock.Lock()
defer lock.Unlock()
// update the internal state
if _, ok := cr.conflictSet[ID]; !ok {
cr.conflictSet[ID] = newConflict()
cr.buffer = append(cr.buffer, ID)
if len(cr.buffer) > int(cr.size) {
delete(cr.conflictSet, cr.buffer[0])
cr.buffer = cr.buffer[1:]
}
}
for nodeID, context := range c.NodesView {
cr.conflictSet[ID].NodesView[nodeID] = context
}
}
package dashboard
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestConflictRecordUpdate(t *testing.T) {
// test ConflictRecord creation
c := NewConflictRecord(2)
require.Equal(t, 2, int(c.size))
// test first new update
conflictA := Conflict{
NodesView: map[string]voteContext{
"nodeA": {
NodeID: "nodeA",
Rounds: 3,
Opinions: []int32{disliked, liked, disliked},
Status: liked,
},
},
}
c.Update("A", conflictA)
require.Equal(t, conflictA, c.conflictSet["A"])
require.Equal(t, 1, len(c.buffer))
require.Contains(t, c.buffer, "A")
// test second new update
conflictB := Conflict{
NodesView: map[string]voteContext{
"nodeB": {
NodeID: "nodeB",
Rounds: 3,
Opinions: []int32{disliked, liked, disliked},
Status: liked,
},
},
}
c.Update("B", conflictB)
require.Equal(t, conflictB, c.conflictSet["B"])
require.Equal(t, 2, len(c.buffer))
require.Contains(t, c.buffer, "B")
// test modify existing entry
conflictB = Conflict{
NodesView: map[string]voteContext{
"nodeB": {
NodeID: "nodeB",
Rounds: 4,
Opinions: []int32{disliked, liked, disliked, liked},
Status: liked,
},
},
}
c.Update("B", conflictB)
require.Equal(t, conflictB, c.conflictSet["B"])
require.Equal(t, 2, len(c.buffer))
require.Contains(t, c.buffer, "B")
// test last update and first update entry removal
conflictC := Conflict{
NodesView: map[string]voteContext{
"nodeC": {
NodeID: "nodeC",
Rounds: 3,
Opinions: []int32{disliked, liked, disliked},
Status: liked,
},
},
}
c.Update("C", conflictC)
require.Equal(t, conflictC, c.conflictSet["C"])
require.Equal(t, 2, len(c.buffer))
require.Contains(t, c.buffer, "C")
require.NotContains(t, c.conflictSet, "A")
require.NotContains(t, c.buffer, "A")
}
package dashboard
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestIsFinalized checks that for a given conflict, its method isFinalized works ok.
func TestIsFinalized(t *testing.T) {
tests := []struct {
Conflict
want bool
}{
{
Conflict: Conflict{
NodesView: map[string]voteContext{
"one": {Status: liked},
"two": {Status: disliked},
},
},
want: true,
},
{
Conflict: Conflict{
NodesView: map[string]voteContext{
"one": {Status: liked},
"two": {},
},
},
want: false,
},
{
Conflict: Conflict{},
want: false,
},
}
for _, conflictTest := range tests {
require.Equal(t, conflictTest.want, conflictTest.isFinalized())
}
}
// TestFinalizationStatus checks that for a given conflict, its method finalizationStatus works ok.
func TestFinalizationStatus(t *testing.T) {
tests := []struct {
Conflict
want float64
}{
{
Conflict: Conflict{
NodesView: map[string]voteContext{
"one": {Status: liked},
"two": {Status: disliked},
},
},
want: 1,
},
{
Conflict: Conflict{
NodesView: map[string]voteContext{
"one": {Status: liked},
"two": {},
},
},
want: 0.5,
},
{
Conflict: Conflict{},
want: 0,
},
}
for _, conflictTest := range tests {
require.Equal(t, conflictTest.want, conflictTest.finalizationStatus())
}
}
......@@ -11,37 +11,37 @@ import (
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/workerpool"
"github.com/mr-tron/base58/base58"
)
const (
unfinalized = 0
liked = 1
disliked = 2
)
var (
ErrConflictMissing = fmt.Errorf("conflictID missing")
)
var (
fpcLiveFeedWorkerCount = 1
fpcLiveFeedWorkerQueueSize = 200
fpcLiveFeedWorkerQueueSize = 300
fpcLiveFeedWorkerPool *workerpool.WorkerPool
conflicts map[string]Conflict
recordedConflicts *conflictRecord
)
// Conflict defines the struct for the opinions of the nodes regarding a given conflict.
type Conflict struct {
NodesView map[string]voteContext `json:"nodesview"`
}
type voteContext struct {
NodeID string `json:"nodeid"`
Rounds int `json:"rounds"`
Opinions []int32 `json:"opinions"`
Like int32 `json:"like"`
}
// FPCMsg contains an FPC update
type FPCMsg struct {
Nodes int `json:"nodes"`
ConflictSet map[string]Conflict `json:"conflictset"`
// FPCUpdate contains an FPC update.
type FPCUpdate struct {
Conflicts ConflictSet `json:"conflictset"`
}
func configureFPCLiveFeed() {
recordedConflicts = NewConflictRecord(100)
fpcLiveFeedWorkerPool = workerpool.New(func(task workerpool.Task) {
newMsg := task.Param(0).(*FPCMsg)
newMsg := task.Param(0).(*FPCUpdate)
broadcastWsMessage(&wsmsg{MsgTypeFPC, newMsg})
task.Return(nil)
}, workerpool.WorkerCount(fpcLiveFeedWorkerCount), workerpool.QueueSize(fpcLiveFeedWorkerQueueSize))
......@@ -49,7 +49,7 @@ func configureFPCLiveFeed() {
func runFPCLiveFeed() {
daemon.BackgroundWorker("Analysis[FPCUpdater]", func(shutdownSignal <-chan struct{}) {
newMsgRateLimiter := time.NewTicker(time.Second / 100)
newMsgRateLimiter := time.NewTicker(time.Millisecond)
defer newMsgRateLimiter.Stop()
onFPCHeartbeatReceived := events.NewClosure(func(hb *packet.FPCHeartbeat) {
......@@ -71,29 +71,30 @@ func runFPCLiveFeed() {
}, shutdown.PriorityDashboard)
}
func createFPCUpdate(hb *packet.FPCHeartbeat) *FPCMsg {
update := make(map[string]Conflict)
conflictIds := ""
nodeID := fmt.Sprintf("%x", hb.OwnID[:8])
func createFPCUpdate(hb *packet.FPCHeartbeat) *FPCUpdate {
// prepare the update
conflicts := make(map[string]Conflict)
nodeID := base58.Encode(hb.OwnID)
for ID, context := range hb.RoundStats.ActiveVoteContexts {
conflictIds += fmt.Sprintf("%s - ", ID)
update[ID] = newConflict()
update[ID].NodesView[nodeID] = voteContext{
newVoteContext := voteContext{
NodeID: nodeID,
Rounds: context.Rounds,
Opinions: vote.ConvertOpinionsToInts32(context.Opinions),
}
// check conflict has been finalized
if _, ok := hb.Finalized[ID]; ok {
newVoteContext.Status = vote.ConvertOpinionToInt32(hb.Finalized[ID])
}
log.Infow("FPC-hb:", "nodeID", nodeID, "conflicts:", conflictIds)
conflicts[ID] = newConflict()
conflicts[ID].NodesView[nodeID] = newVoteContext
return &FPCMsg{
ConflictSet: update,
}
// update recorded events
recordedConflicts.Update(ID, Conflict{NodesView: map[string]voteContext{nodeID: newVoteContext}})
}
func newConflict() Conflict {
return Conflict{
NodesView: make(map[string]voteContext),
return &FPCUpdate{
Conflicts: conflicts,
}
}
package dashboard
import (
"crypto/sha256"
"fmt"
"testing"
"time"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/plugins/analysis/packet"
"github.com/stretchr/testify/require"
)
// TestCreateFPCUpdate checks that given a FPC heartbeat, the returned FPCUpdate is ok.
func TestCreateFPCUpdate(t *testing.T) {
ownID := sha256.Sum256([]byte{'A'})
shortOwnID := fmt.Sprintf("%x", ownID[:8])
// create a FPCHeartbeat
hbTest := &packet.FPCHeartbeat{
OwnID: ownID[:],
RoundStats: vote.RoundStats{
Duration: time.Second,
RandUsed: 0.5,
ActiveVoteContexts: map[string]*vote.Context{
"one": {
ID: "one",
Liked: 1.,
Rounds: 3,
Opinions: []vote.Opinion{vote.Dislike, vote.Like, vote.Dislike},
}},
},
Finalized: map[string]vote.Opinion{"one": vote.Like},
}
// create a matching FPCUpdate
want := &FPCUpdate{
Conflicts: map[string]Conflict{
"one": {
NodesView: map[string]voteContext{
shortOwnID: {
NodeID: shortOwnID,
Rounds: 3,
Opinions: []int32{disliked, liked, disliked},
Status: liked,
},
},
},
},
}
// check that createFPCUpdate returns a matching FPCMsg
require.Equal(t, want, createFPCUpdate(hbTest))
}
......@@ -26,13 +26,12 @@ class VoteContext {
nodeid: string;
rounds: number;
opinions: number[];
like: number;
status: number;
}
class Conflict {
nodesview: Map<string, VoteContext>
}
export class FPCMessage {
nodes: number;
conflictset: Map<string, Conflict>
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment