Skip to content
Snippets Groups Projects
Unverified Commit 46e28b7f authored by capossele's avatar capossele
Browse files

Merge branch 'merge/fpc-test-value-transfer' into feat/prometheus

parents cbe5fc1d bbe187b2
No related branches found
No related tags found
No related merge requests found
......@@ -4,16 +4,23 @@ import "sync"
type conflictRecord struct {
conflictSet conflictSet
size uint32
buffer []string
lock sync.RWMutex
}
func newConflictRecord(recordSize uint32) *conflictRecord {
func newConflictRecord() *conflictRecord {
return &conflictRecord{
conflictSet: make(conflictSet),
size: recordSize,
buffer: []string{},
}
}
func (cr *conflictRecord) cleanUp() {
cr.lock.Lock()
defer cr.lock.Unlock()
for id, conflict := range cr.conflictSet {
if conflict.isFinalized() {
delete(cr.conflictSet, id)
}
}
}
......@@ -21,32 +28,45 @@ func (cr *conflictRecord) ToFPCUpdate() *FPCUpdate {
cr.lock.RLock()
defer cr.lock.RUnlock()
// log.Info("FPC refresh conflicts: ", len(cr.conflictSet))
// for k, v := range cr.conflictSet {
// log.Info("Conflict ID: ", k, len(v.NodesView))
// }
return &FPCUpdate{
Conflicts: cr.conflictSet,
}
}
func (cr *conflictRecord) load(ID string) (conflict, bool) {
cr.lock.RLock()
defer cr.lock.RUnlock()
// update the internal state
if c, ok := cr.conflictSet[ID]; !ok {
return c, false
}
return cr.conflictSet[ID], true
}
func (cr *conflictRecord) update(ID string, c conflict) {
lock.Lock()
defer lock.Unlock()
cr.lock.Lock()
defer cr.lock.Unlock()
// update the internal state
if _, ok := cr.conflictSet[ID]; !ok {
cr.conflictSet[ID] = newConflict()
cr.buffer = append(cr.buffer, ID)
if len(cr.buffer) > int(cr.size) {
delete(cr.conflictSet, cr.buffer[0])
cr.buffer = cr.buffer[1:]
}
}
for nodeID, context := range c.NodesView {
cr.conflictSet[ID].NodesView[nodeID] = context
}
}
func (cr *conflictRecord) delete(ID string) {
cr.lock.Lock()
defer cr.lock.Unlock()
// update the internal state
if _, ok := cr.conflictSet[ID]; !ok {
return
}
delete(cr.conflictSet, ID)
}
......@@ -7,9 +7,8 @@ import (
)
func TestConflictRecordUpdate(t *testing.T) {
// test ConflictRecord creation
c := newConflictRecord(2)
require.Equal(t, 2, int(c.size))
// ConflictRecord creation
c := newConflictRecord()
// test first new update
conflictA := conflict{
......@@ -25,8 +24,6 @@ func TestConflictRecordUpdate(t *testing.T) {
c.update("A", conflictA)
require.Equal(t, conflictA, c.conflictSet["A"])
require.Equal(t, 1, len(c.buffer))
require.Contains(t, c.buffer, "A")
// test second new update
conflictB := conflict{
......@@ -42,8 +39,6 @@ func TestConflictRecordUpdate(t *testing.T) {
c.update("B", conflictB)
require.Equal(t, conflictB, c.conflictSet["B"])
require.Equal(t, 2, len(c.buffer))
require.Contains(t, c.buffer, "B")
// test modify existing entry
conflictB = conflict{
......@@ -57,29 +52,9 @@ func TestConflictRecordUpdate(t *testing.T) {
},
}
c.update("B", conflictB)
require.Equal(t, conflictB, c.conflictSet["B"])
require.Equal(t, 2, len(c.buffer))
require.Contains(t, c.buffer, "B")
// test last update and first update entry removal
conflictC := conflict{
NodesView: map[string]voteContext{
"nodeC": {
NodeID: "nodeC",
Rounds: 3,
Opinions: []int32{disliked, liked, disliked},
Status: liked,
},
},
}
c.update("C", conflictC)
require.Equal(t, conflictC, c.conflictSet["C"])
require.Equal(t, 2, len(c.buffer))
require.Contains(t, c.buffer, "C")
require.NotContains(t, c.conflictSet, "A")
require.NotContains(t, c.buffer, "A")
// test entry removal
c.delete("B")
require.NotContains(t, c.conflictSet, "B")
}
package dashboard
import (
"fmt"
"time"
"github.com/gorilla/websocket"
......@@ -9,7 +8,6 @@ import (
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/plugins/analysis/packet"
analysis "github.com/iotaledger/goshimmer/plugins/analysis/server"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/workerpool"
......@@ -36,11 +34,11 @@ type FPCUpdate struct {
}
func configureFPCLiveFeed() {
recordedConflicts = newConflictRecord(config.Node.GetUint32(CfgFPCBufferSize))
recordedConflicts = newConflictRecord()
fpcLiveFeedWorkerPool = workerpool.New(func(task workerpool.Task) {
newMsg := task.Param(0).(*FPCUpdate)
fmt.Println("broadcasting FPC message to websocket clients")
//fmt.Println("broadcasting FPC message to websocket clients")
broadcastWsMessage(&wsmsg{MsgTypeFPC, newMsg}, true)
task.Return(nil)
}, workerpool.WorkerCount(fpcLiveFeedWorkerCount), workerpool.QueueSize(fpcLiveFeedWorkerQueueSize))
......@@ -49,7 +47,7 @@ func configureFPCLiveFeed() {
func runFPCLiveFeed() {
if err := daemon.BackgroundWorker("Analysis[FPCUpdater]", func(shutdownSignal <-chan struct{}) {
onFPCHeartbeatReceived := events.NewClosure(func(hb *packet.FPCHeartbeat) {
fmt.Println("broadcasting FPC live feed")
//fmt.Println("broadcasting FPC live feed")
fpcLiveFeedWorkerPool.Submit(createFPCUpdate(hb, true))
})
analysis.Events.FPCHeartbeat.Attach(onFPCHeartbeatReceived)
......@@ -57,10 +55,21 @@ func runFPCLiveFeed() {
fpcLiveFeedWorkerPool.Start()
defer fpcLiveFeedWorkerPool.Stop()
<-shutdownSignal
log.Info("Stopping Analysis[FPCUpdater] ...")
analysis.Events.FPCHeartbeat.Detach(onFPCHeartbeatReceived)
log.Info("Stopping Analysis[FPCUpdater] ... done")
cleanUpTicker := time.NewTicker(1 * time.Minute)
for {
select {
case <-shutdownSignal:
log.Info("Stopping Analysis[FPCUpdater] ...")
analysis.Events.FPCHeartbeat.Detach(onFPCHeartbeatReceived)
cleanUpTicker.Stop()
log.Info("Stopping Analysis[FPCUpdater] ... done")
case <-cleanUpTicker.C:
log.Info("Cleaning up Finalized Conflicts ...")
recordedConflicts.cleanUp()
log.Info("Cleaning up Finalized Conflicts ... done")
}
}
}, shutdown.PriorityDashboard); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
......@@ -92,12 +101,16 @@ func createFPCUpdate(hb *packet.FPCHeartbeat, recordEvent bool) *FPCUpdate {
finalizedConflicts := make([]FPCRecord, len(hb.Finalized))
i := 0
for ID, finalOpinion := range hb.Finalized {
recordedConflicts.lock.Lock()
conflictDetail := recordedConflicts.conflictSet[ID].NodesView[nodeID]
conflictOverview, ok := recordedConflicts.load(ID)
if !ok {
log.Error("Error: missing conflict with ID:", ID)
continue
}
conflictDetail := conflictOverview.NodesView[nodeID]
conflictDetail.Status = vote.ConvertOpinionToInt32(finalOpinion)
conflicts[ID] = newConflict()
conflicts[ID].NodesView[nodeID] = conflictDetail
recordedConflicts.conflictSet[ID].NodesView[nodeID] = conflictDetail
recordedConflicts.update(ID, conflicts[ID])
finalizedConflicts[i] = FPCRecord{
ConflictID: ID,
NodeID: conflictDetail.NodeID,
......@@ -105,10 +118,10 @@ func createFPCUpdate(hb *packet.FPCHeartbeat, recordEvent bool) *FPCUpdate {
Opinions: conflictDetail.Opinions,
Status: conflictDetail.Status,
}
recordedConflicts.lock.Unlock()
i++
}
//log.Info("Storing:\n", finalizedConflicts)
err := storeFPCRecords(finalizedConflicts, mongoDB())
if err != nil {
log.Errorf("Error while writing on MongoDB: %s", err)
......
......@@ -21,20 +21,21 @@ type FPCRecord struct {
}
var (
db *mongo.Database
ctxDB context.Context
cancelDB context.CancelFunc
db *mongo.Database
ctxDisconnectDB context.Context
// cancelDB context.CancelFunc
clientDB *mongo.Client
dbOnce sync.Once
)
func shutdownMongoDB() {
cancelDB()
clientDB.Disconnect(ctxDB)
//cancelDB()
clientDB.Disconnect(ctxDisconnectDB)
}
func mongoDB() *mongo.Database {
dbOnce.Do(func() {
log.Info("ONCEEEEEEE")
username := config.Node.GetString(CfgMongoDBUsername)
password := config.Node.GetString(CfgMongoDBPassword)
bindAddr := config.Node.GetString(CfgMongoDBBindAddress)
......@@ -42,13 +43,17 @@ func mongoDB() *mongo.Database {
if err != nil {
log.Fatal(err)
}
ctxDB, cancelDB = context.WithTimeout(context.Background(), 10*time.Second)
err = client.Connect(ctxDB)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err = client.Connect(ctx)
ctxDisconnectDB = ctx
defer cancel()
if err != nil {
log.Fatal(err)
}
err = client.Ping(ctxDB, readpref.Primary())
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = client.Ping(ctx, readpref.Primary())
if err != nil {
log.Fatal(err)
}
......@@ -62,6 +67,8 @@ func storeFPCRecords(records []FPCRecord, db *mongo.Database) error {
for i := range records {
data[i] = records[i]
}
_, err := db.Collection("FPC").InsertMany(ctxDB, data)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := db.Collection("FPC").InsertMany(ctx, data)
return err
}
......@@ -14,7 +14,7 @@ var Plugin = node.NewPlugin(PluginName, node.Enabled, configure, run)
const (
// AppVersion version number
AppVersion = "v0.666.0"
AppVersion = "v0.2.0"
// AppName app code name
AppName = "GoShimmer"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment