From b48b24f3fe5f4dbb21a31b5f7d0b458b67610e9d Mon Sep 17 00:00:00 2001 From: capossele <angelocapossele@gmail.com> Date: Wed, 10 Jun 2020 19:27:47 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Fix=20MongoDB=20ctx=20bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../analysis/dashboard/fpc_conflictRecord.go | 45 +++++++++++-------- .../dashboard/fpc_conflictRecord_test.go | 35 +++------------ plugins/analysis/dashboard/fpc_livefeed.go | 20 +++++---- plugins/analysis/dashboard/fpc_storage.go | 25 +++++++---- 4 files changed, 59 insertions(+), 66 deletions(-) diff --git a/plugins/analysis/dashboard/fpc_conflictRecord.go b/plugins/analysis/dashboard/fpc_conflictRecord.go index 847c2479..be7c3487 100644 --- a/plugins/analysis/dashboard/fpc_conflictRecord.go +++ b/plugins/analysis/dashboard/fpc_conflictRecord.go @@ -4,16 +4,12 @@ import "sync" type conflictRecord struct { conflictSet conflictSet - size uint32 - buffer []string lock sync.RWMutex } -func newConflictRecord(recordSize uint32) *conflictRecord { +func newConflictRecord() *conflictRecord { return &conflictRecord{ conflictSet: make(conflictSet), - size: recordSize, - buffer: []string{}, } } @@ -21,32 +17,45 @@ func (cr *conflictRecord) ToFPCUpdate() *FPCUpdate { cr.lock.RLock() defer cr.lock.RUnlock() - // log.Info("FPC refresh conflicts: ", len(cr.conflictSet)) - // for k, v := range cr.conflictSet { - // log.Info("Conflict ID: ", k, len(v.NodesView)) - // } - return &FPCUpdate{ Conflicts: cr.conflictSet, } } +func (cr *conflictRecord) load(ID string) (conflict, bool) { + cr.lock.RLock() + defer cr.lock.RUnlock() + + // update the internal state + if c, ok := cr.conflictSet[ID]; !ok { + return c, false + } + + return cr.conflictSet[ID], true +} + func (cr *conflictRecord) update(ID string, c conflict) { - lock.Lock() - defer lock.Unlock() + cr.lock.Lock() + defer cr.lock.Unlock() // update the internal state if _, ok := cr.conflictSet[ID]; !ok { cr.conflictSet[ID] = newConflict() - - cr.buffer = append(cr.buffer, ID) - if len(cr.buffer) > int(cr.size) { - delete(cr.conflictSet, cr.buffer[0]) - cr.buffer = cr.buffer[1:] - } } for nodeID, context := range c.NodesView { cr.conflictSet[ID].NodesView[nodeID] = context } } + +func (cr *conflictRecord) delete(ID string) { + cr.lock.Lock() + defer cr.lock.Unlock() + + // update the internal state + if _, ok := cr.conflictSet[ID]; !ok { + return + } + + delete(cr.conflictSet, ID) +} diff --git a/plugins/analysis/dashboard/fpc_conflictRecord_test.go b/plugins/analysis/dashboard/fpc_conflictRecord_test.go index 37630e17..e36b053e 100644 --- a/plugins/analysis/dashboard/fpc_conflictRecord_test.go +++ b/plugins/analysis/dashboard/fpc_conflictRecord_test.go @@ -7,9 +7,8 @@ import ( ) func TestConflictRecordUpdate(t *testing.T) { - // test ConflictRecord creation - c := newConflictRecord(2) - require.Equal(t, 2, int(c.size)) + // ConflictRecord creation + c := newConflictRecord() // test first new update conflictA := conflict{ @@ -25,8 +24,6 @@ func TestConflictRecordUpdate(t *testing.T) { c.update("A", conflictA) require.Equal(t, conflictA, c.conflictSet["A"]) - require.Equal(t, 1, len(c.buffer)) - require.Contains(t, c.buffer, "A") // test second new update conflictB := conflict{ @@ -42,8 +39,6 @@ func TestConflictRecordUpdate(t *testing.T) { c.update("B", conflictB) require.Equal(t, conflictB, c.conflictSet["B"]) - require.Equal(t, 2, len(c.buffer)) - require.Contains(t, c.buffer, "B") // test modify existing entry conflictB = conflict{ @@ -57,29 +52,9 @@ func TestConflictRecordUpdate(t *testing.T) { }, } c.update("B", conflictB) - require.Equal(t, conflictB, c.conflictSet["B"]) - require.Equal(t, 2, len(c.buffer)) - require.Contains(t, c.buffer, "B") - - // test last update and first update entry removal - conflictC := conflict{ - NodesView: map[string]voteContext{ - "nodeC": { - NodeID: "nodeC", - Rounds: 3, - Opinions: []int32{disliked, liked, disliked}, - Status: liked, - }, - }, - } - c.update("C", conflictC) - - require.Equal(t, conflictC, c.conflictSet["C"]) - require.Equal(t, 2, len(c.buffer)) - require.Contains(t, c.buffer, "C") - - require.NotContains(t, c.conflictSet, "A") - require.NotContains(t, c.buffer, "A") + // test entry removal + c.delete("B") + require.NotContains(t, c.conflictSet, "B") } diff --git a/plugins/analysis/dashboard/fpc_livefeed.go b/plugins/analysis/dashboard/fpc_livefeed.go index 5c85f159..187004bb 100644 --- a/plugins/analysis/dashboard/fpc_livefeed.go +++ b/plugins/analysis/dashboard/fpc_livefeed.go @@ -1,7 +1,6 @@ package dashboard import ( - "fmt" "time" "github.com/gorilla/websocket" @@ -9,7 +8,6 @@ import ( "github.com/iotaledger/goshimmer/packages/vote" "github.com/iotaledger/goshimmer/plugins/analysis/packet" analysis "github.com/iotaledger/goshimmer/plugins/analysis/server" - "github.com/iotaledger/goshimmer/plugins/config" "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/workerpool" @@ -36,11 +34,11 @@ type FPCUpdate struct { } func configureFPCLiveFeed() { - recordedConflicts = newConflictRecord(config.Node.GetUint32(CfgFPCBufferSize)) + recordedConflicts = newConflictRecord() fpcLiveFeedWorkerPool = workerpool.New(func(task workerpool.Task) { newMsg := task.Param(0).(*FPCUpdate) - fmt.Println("broadcasting FPC message to websocket clients") + //fmt.Println("broadcasting FPC message to websocket clients") broadcastWsMessage(&wsmsg{MsgTypeFPC, newMsg}, true) task.Return(nil) }, workerpool.WorkerCount(fpcLiveFeedWorkerCount), workerpool.QueueSize(fpcLiveFeedWorkerQueueSize)) @@ -49,7 +47,7 @@ func configureFPCLiveFeed() { func runFPCLiveFeed() { if err := daemon.BackgroundWorker("Analysis[FPCUpdater]", func(shutdownSignal <-chan struct{}) { onFPCHeartbeatReceived := events.NewClosure(func(hb *packet.FPCHeartbeat) { - fmt.Println("broadcasting FPC live feed") + //fmt.Println("broadcasting FPC live feed") fpcLiveFeedWorkerPool.Submit(createFPCUpdate(hb, true)) }) analysis.Events.FPCHeartbeat.Attach(onFPCHeartbeatReceived) @@ -92,12 +90,16 @@ func createFPCUpdate(hb *packet.FPCHeartbeat, recordEvent bool) *FPCUpdate { finalizedConflicts := make([]FPCRecord, len(hb.Finalized)) i := 0 for ID, finalOpinion := range hb.Finalized { - recordedConflicts.lock.Lock() - conflictDetail := recordedConflicts.conflictSet[ID].NodesView[nodeID] + conflictOverview, ok := recordedConflicts.load(ID) + if !ok { + log.Error("Error: missing conflict with ID:", ID) + continue + } + conflictDetail := conflictOverview.NodesView[nodeID] conflictDetail.Status = vote.ConvertOpinionToInt32(finalOpinion) conflicts[ID] = newConflict() conflicts[ID].NodesView[nodeID] = conflictDetail - recordedConflicts.conflictSet[ID].NodesView[nodeID] = conflictDetail + recordedConflicts.update(ID, conflicts[ID]) finalizedConflicts[i] = FPCRecord{ ConflictID: ID, NodeID: conflictDetail.NodeID, @@ -105,10 +107,10 @@ func createFPCUpdate(hb *packet.FPCHeartbeat, recordEvent bool) *FPCUpdate { Opinions: conflictDetail.Opinions, Status: conflictDetail.Status, } - recordedConflicts.lock.Unlock() i++ } + //log.Info("Storing:\n", finalizedConflicts) err := storeFPCRecords(finalizedConflicts, mongoDB()) if err != nil { log.Errorf("Error while writing on MongoDB: %s", err) diff --git a/plugins/analysis/dashboard/fpc_storage.go b/plugins/analysis/dashboard/fpc_storage.go index 8b3daff6..e32becb1 100644 --- a/plugins/analysis/dashboard/fpc_storage.go +++ b/plugins/analysis/dashboard/fpc_storage.go @@ -21,20 +21,21 @@ type FPCRecord struct { } var ( - db *mongo.Database - ctxDB context.Context - cancelDB context.CancelFunc + db *mongo.Database + ctxDisconnectDB context.Context + // cancelDB context.CancelFunc clientDB *mongo.Client dbOnce sync.Once ) func shutdownMongoDB() { - cancelDB() - clientDB.Disconnect(ctxDB) + //cancelDB() + clientDB.Disconnect(ctxDisconnectDB) } func mongoDB() *mongo.Database { dbOnce.Do(func() { + log.Info("ONCEEEEEEE") username := config.Node.GetString(CfgMongoDBUsername) password := config.Node.GetString(CfgMongoDBPassword) bindAddr := config.Node.GetString(CfgMongoDBBindAddress) @@ -42,13 +43,17 @@ func mongoDB() *mongo.Database { if err != nil { log.Fatal(err) } - ctxDB, cancelDB = context.WithTimeout(context.Background(), 10*time.Second) - err = client.Connect(ctxDB) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + err = client.Connect(ctx) + ctxDisconnectDB = ctx + defer cancel() if err != nil { log.Fatal(err) } - err = client.Ping(ctxDB, readpref.Primary()) + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err = client.Ping(ctx, readpref.Primary()) if err != nil { log.Fatal(err) } @@ -62,6 +67,8 @@ func storeFPCRecords(records []FPCRecord, db *mongo.Database) error { for i := range records { data[i] = records[i] } - _, err := db.Collection("FPC").InsertMany(ctxDB, data) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err := db.Collection("FPC").InsertMany(ctx, data) return err } -- GitLab