Skip to content
Snippets Groups Projects
Unverified Commit b48b24f3 authored by capossele's avatar capossele
Browse files

:bug: Fix MongoDB ctx bug

parent 6a1ec92f
No related branches found
No related tags found
No related merge requests found
...@@ -4,16 +4,12 @@ import "sync" ...@@ -4,16 +4,12 @@ import "sync"
type conflictRecord struct { type conflictRecord struct {
conflictSet conflictSet conflictSet conflictSet
size uint32
buffer []string
lock sync.RWMutex lock sync.RWMutex
} }
func newConflictRecord(recordSize uint32) *conflictRecord { func newConflictRecord() *conflictRecord {
return &conflictRecord{ return &conflictRecord{
conflictSet: make(conflictSet), conflictSet: make(conflictSet),
size: recordSize,
buffer: []string{},
} }
} }
...@@ -21,32 +17,45 @@ func (cr *conflictRecord) ToFPCUpdate() *FPCUpdate { ...@@ -21,32 +17,45 @@ func (cr *conflictRecord) ToFPCUpdate() *FPCUpdate {
cr.lock.RLock() cr.lock.RLock()
defer cr.lock.RUnlock() defer cr.lock.RUnlock()
// log.Info("FPC refresh conflicts: ", len(cr.conflictSet))
// for k, v := range cr.conflictSet {
// log.Info("Conflict ID: ", k, len(v.NodesView))
// }
return &FPCUpdate{ return &FPCUpdate{
Conflicts: cr.conflictSet, Conflicts: cr.conflictSet,
} }
} }
func (cr *conflictRecord) load(ID string) (conflict, bool) {
cr.lock.RLock()
defer cr.lock.RUnlock()
// update the internal state
if c, ok := cr.conflictSet[ID]; !ok {
return c, false
}
return cr.conflictSet[ID], true
}
func (cr *conflictRecord) update(ID string, c conflict) { func (cr *conflictRecord) update(ID string, c conflict) {
lock.Lock() cr.lock.Lock()
defer lock.Unlock() defer cr.lock.Unlock()
// update the internal state // update the internal state
if _, ok := cr.conflictSet[ID]; !ok { if _, ok := cr.conflictSet[ID]; !ok {
cr.conflictSet[ID] = newConflict() cr.conflictSet[ID] = newConflict()
cr.buffer = append(cr.buffer, ID)
if len(cr.buffer) > int(cr.size) {
delete(cr.conflictSet, cr.buffer[0])
cr.buffer = cr.buffer[1:]
}
} }
for nodeID, context := range c.NodesView { for nodeID, context := range c.NodesView {
cr.conflictSet[ID].NodesView[nodeID] = context cr.conflictSet[ID].NodesView[nodeID] = context
} }
} }
func (cr *conflictRecord) delete(ID string) {
cr.lock.Lock()
defer cr.lock.Unlock()
// update the internal state
if _, ok := cr.conflictSet[ID]; !ok {
return
}
delete(cr.conflictSet, ID)
}
...@@ -7,9 +7,8 @@ import ( ...@@ -7,9 +7,8 @@ import (
) )
func TestConflictRecordUpdate(t *testing.T) { func TestConflictRecordUpdate(t *testing.T) {
// test ConflictRecord creation // ConflictRecord creation
c := newConflictRecord(2) c := newConflictRecord()
require.Equal(t, 2, int(c.size))
// test first new update // test first new update
conflictA := conflict{ conflictA := conflict{
...@@ -25,8 +24,6 @@ func TestConflictRecordUpdate(t *testing.T) { ...@@ -25,8 +24,6 @@ func TestConflictRecordUpdate(t *testing.T) {
c.update("A", conflictA) c.update("A", conflictA)
require.Equal(t, conflictA, c.conflictSet["A"]) require.Equal(t, conflictA, c.conflictSet["A"])
require.Equal(t, 1, len(c.buffer))
require.Contains(t, c.buffer, "A")
// test second new update // test second new update
conflictB := conflict{ conflictB := conflict{
...@@ -42,8 +39,6 @@ func TestConflictRecordUpdate(t *testing.T) { ...@@ -42,8 +39,6 @@ func TestConflictRecordUpdate(t *testing.T) {
c.update("B", conflictB) c.update("B", conflictB)
require.Equal(t, conflictB, c.conflictSet["B"]) require.Equal(t, conflictB, c.conflictSet["B"])
require.Equal(t, 2, len(c.buffer))
require.Contains(t, c.buffer, "B")
// test modify existing entry // test modify existing entry
conflictB = conflict{ conflictB = conflict{
...@@ -57,29 +52,9 @@ func TestConflictRecordUpdate(t *testing.T) { ...@@ -57,29 +52,9 @@ func TestConflictRecordUpdate(t *testing.T) {
}, },
} }
c.update("B", conflictB) c.update("B", conflictB)
require.Equal(t, conflictB, c.conflictSet["B"]) require.Equal(t, conflictB, c.conflictSet["B"])
require.Equal(t, 2, len(c.buffer))
require.Contains(t, c.buffer, "B")
// test last update and first update entry removal
conflictC := conflict{
NodesView: map[string]voteContext{
"nodeC": {
NodeID: "nodeC",
Rounds: 3,
Opinions: []int32{disliked, liked, disliked},
Status: liked,
},
},
}
c.update("C", conflictC)
require.Equal(t, conflictC, c.conflictSet["C"])
require.Equal(t, 2, len(c.buffer))
require.Contains(t, c.buffer, "C")
require.NotContains(t, c.conflictSet, "A")
require.NotContains(t, c.buffer, "A")
// test entry removal
c.delete("B")
require.NotContains(t, c.conflictSet, "B")
} }
package dashboard package dashboard
import ( import (
"fmt"
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
...@@ -9,7 +8,6 @@ import ( ...@@ -9,7 +8,6 @@ import (
"github.com/iotaledger/goshimmer/packages/vote" "github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/plugins/analysis/packet" "github.com/iotaledger/goshimmer/plugins/analysis/packet"
analysis "github.com/iotaledger/goshimmer/plugins/analysis/server" analysis "github.com/iotaledger/goshimmer/plugins/analysis/server"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/workerpool" "github.com/iotaledger/hive.go/workerpool"
...@@ -36,11 +34,11 @@ type FPCUpdate struct { ...@@ -36,11 +34,11 @@ type FPCUpdate struct {
} }
func configureFPCLiveFeed() { func configureFPCLiveFeed() {
recordedConflicts = newConflictRecord(config.Node.GetUint32(CfgFPCBufferSize)) recordedConflicts = newConflictRecord()
fpcLiveFeedWorkerPool = workerpool.New(func(task workerpool.Task) { fpcLiveFeedWorkerPool = workerpool.New(func(task workerpool.Task) {
newMsg := task.Param(0).(*FPCUpdate) newMsg := task.Param(0).(*FPCUpdate)
fmt.Println("broadcasting FPC message to websocket clients") //fmt.Println("broadcasting FPC message to websocket clients")
broadcastWsMessage(&wsmsg{MsgTypeFPC, newMsg}, true) broadcastWsMessage(&wsmsg{MsgTypeFPC, newMsg}, true)
task.Return(nil) task.Return(nil)
}, workerpool.WorkerCount(fpcLiveFeedWorkerCount), workerpool.QueueSize(fpcLiveFeedWorkerQueueSize)) }, workerpool.WorkerCount(fpcLiveFeedWorkerCount), workerpool.QueueSize(fpcLiveFeedWorkerQueueSize))
...@@ -49,7 +47,7 @@ func configureFPCLiveFeed() { ...@@ -49,7 +47,7 @@ func configureFPCLiveFeed() {
func runFPCLiveFeed() { func runFPCLiveFeed() {
if err := daemon.BackgroundWorker("Analysis[FPCUpdater]", func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker("Analysis[FPCUpdater]", func(shutdownSignal <-chan struct{}) {
onFPCHeartbeatReceived := events.NewClosure(func(hb *packet.FPCHeartbeat) { onFPCHeartbeatReceived := events.NewClosure(func(hb *packet.FPCHeartbeat) {
fmt.Println("broadcasting FPC live feed") //fmt.Println("broadcasting FPC live feed")
fpcLiveFeedWorkerPool.Submit(createFPCUpdate(hb, true)) fpcLiveFeedWorkerPool.Submit(createFPCUpdate(hb, true))
}) })
analysis.Events.FPCHeartbeat.Attach(onFPCHeartbeatReceived) analysis.Events.FPCHeartbeat.Attach(onFPCHeartbeatReceived)
...@@ -92,12 +90,16 @@ func createFPCUpdate(hb *packet.FPCHeartbeat, recordEvent bool) *FPCUpdate { ...@@ -92,12 +90,16 @@ func createFPCUpdate(hb *packet.FPCHeartbeat, recordEvent bool) *FPCUpdate {
finalizedConflicts := make([]FPCRecord, len(hb.Finalized)) finalizedConflicts := make([]FPCRecord, len(hb.Finalized))
i := 0 i := 0
for ID, finalOpinion := range hb.Finalized { for ID, finalOpinion := range hb.Finalized {
recordedConflicts.lock.Lock() conflictOverview, ok := recordedConflicts.load(ID)
conflictDetail := recordedConflicts.conflictSet[ID].NodesView[nodeID] if !ok {
log.Error("Error: missing conflict with ID:", ID)
continue
}
conflictDetail := conflictOverview.NodesView[nodeID]
conflictDetail.Status = vote.ConvertOpinionToInt32(finalOpinion) conflictDetail.Status = vote.ConvertOpinionToInt32(finalOpinion)
conflicts[ID] = newConflict() conflicts[ID] = newConflict()
conflicts[ID].NodesView[nodeID] = conflictDetail conflicts[ID].NodesView[nodeID] = conflictDetail
recordedConflicts.conflictSet[ID].NodesView[nodeID] = conflictDetail recordedConflicts.update(ID, conflicts[ID])
finalizedConflicts[i] = FPCRecord{ finalizedConflicts[i] = FPCRecord{
ConflictID: ID, ConflictID: ID,
NodeID: conflictDetail.NodeID, NodeID: conflictDetail.NodeID,
...@@ -105,10 +107,10 @@ func createFPCUpdate(hb *packet.FPCHeartbeat, recordEvent bool) *FPCUpdate { ...@@ -105,10 +107,10 @@ func createFPCUpdate(hb *packet.FPCHeartbeat, recordEvent bool) *FPCUpdate {
Opinions: conflictDetail.Opinions, Opinions: conflictDetail.Opinions,
Status: conflictDetail.Status, Status: conflictDetail.Status,
} }
recordedConflicts.lock.Unlock()
i++ i++
} }
//log.Info("Storing:\n", finalizedConflicts)
err := storeFPCRecords(finalizedConflicts, mongoDB()) err := storeFPCRecords(finalizedConflicts, mongoDB())
if err != nil { if err != nil {
log.Errorf("Error while writing on MongoDB: %s", err) log.Errorf("Error while writing on MongoDB: %s", err)
......
...@@ -21,20 +21,21 @@ type FPCRecord struct { ...@@ -21,20 +21,21 @@ type FPCRecord struct {
} }
var ( var (
db *mongo.Database db *mongo.Database
ctxDB context.Context ctxDisconnectDB context.Context
cancelDB context.CancelFunc // cancelDB context.CancelFunc
clientDB *mongo.Client clientDB *mongo.Client
dbOnce sync.Once dbOnce sync.Once
) )
func shutdownMongoDB() { func shutdownMongoDB() {
cancelDB() //cancelDB()
clientDB.Disconnect(ctxDB) clientDB.Disconnect(ctxDisconnectDB)
} }
func mongoDB() *mongo.Database { func mongoDB() *mongo.Database {
dbOnce.Do(func() { dbOnce.Do(func() {
log.Info("ONCEEEEEEE")
username := config.Node.GetString(CfgMongoDBUsername) username := config.Node.GetString(CfgMongoDBUsername)
password := config.Node.GetString(CfgMongoDBPassword) password := config.Node.GetString(CfgMongoDBPassword)
bindAddr := config.Node.GetString(CfgMongoDBBindAddress) bindAddr := config.Node.GetString(CfgMongoDBBindAddress)
...@@ -42,13 +43,17 @@ func mongoDB() *mongo.Database { ...@@ -42,13 +43,17 @@ func mongoDB() *mongo.Database {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
ctxDB, cancelDB = context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err = client.Connect(ctxDB) err = client.Connect(ctx)
ctxDisconnectDB = ctx
defer cancel()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
err = client.Ping(ctxDB, readpref.Primary()) ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = client.Ping(ctx, readpref.Primary())
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
...@@ -62,6 +67,8 @@ func storeFPCRecords(records []FPCRecord, db *mongo.Database) error { ...@@ -62,6 +67,8 @@ func storeFPCRecords(records []FPCRecord, db *mongo.Database) error {
for i := range records { for i := range records {
data[i] = records[i] data[i] = records[i]
} }
_, err := db.Collection("FPC").InsertMany(ctxDB, data) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := db.Collection("FPC").InsertMany(ctx, data)
return err return err
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment