Skip to content
Snippets Groups Projects
Unverified Commit 2fa51899 authored by capossele's avatar capossele
Browse files

:card_box: Add FPC analysis data persistence

parent f69d4bc8
Branches
Tags
No related merge requests found
......@@ -24,6 +24,7 @@ require (
github.com/stretchr/testify v1.5.1
github.com/valyala/fasttemplate v1.1.0 // indirect
go.dedis.ch/kyber/v3 v3.0.12
go.mongodb.org/mongo-driver v1.0.0
go.uber.org/atomic v1.6.0
go.uber.org/zap v1.14.0
golang.org/x/crypto v0.0.0-20200429183012-4b2356b1ed79
......
......@@ -299,6 +299,7 @@ github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
......@@ -311,7 +312,9 @@ github.com/valyala/fasttemplate v1.1.0 h1:RZqt0yGBsps8NGvLSGW804QQqCUYYLsaOjTVHy
github.com/valyala/fasttemplate v1.1.0/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/xanzy/ssh-agent v0.2.1 h1:TCbipTQL2JiiCprBWx9frJ2eJlCYT00NmctrHxVAr70=
github.com/xanzy/ssh-agent v0.2.1/go.mod h1:mLlQY/MoOhWBj+gOGMQkOeiEvkx+8pJSI+0Bx9h2kr4=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
......@@ -331,6 +334,7 @@ go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.4 h1:hi1bXHMVrlQh6WwxAy+qZCV/SYIlqo+Ushwdpa4tAKg=
go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.mongodb.org/mongo-driver v1.0.0 h1:KxPRDyfB2xXnDE2My8acoOWBQkfv3tz0SaWTRZjJR0c=
go.mongodb.org/mongo-driver v1.0.0/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
......
......@@ -43,9 +43,13 @@ var (
log *logger.Logger
managedConn *network.ManagedConnection
connLock sync.Mutex
finalized map[string]vote.Opinion
finalizedMutex sync.RWMutex
)
func run(_ *node.Plugin) {
finalized = make(map[string]vote.Opinion)
log = logger.NewLogger(PluginName)
conn, err := net.Dial("tcp", config.Node.GetString(CfgServerAddress))
......@@ -58,6 +62,9 @@ func run(_ *node.Plugin) {
if err := daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) {
fpctest.Voter().Events().Finalized.Attach(events.NewClosure(onFinalized))
defer fpctest.Voter().Events().Finalized.Detach(events.NewClosure(onFinalized))
fpctest.Voter().Events().RoundExecuted.Attach(events.NewClosure(onRoundExecuted))
defer fpctest.Voter().Events().RoundExecuted.Detach(events.NewClosure(onRoundExecuted))
......@@ -77,6 +84,12 @@ func run(_ *node.Plugin) {
}
}
func onFinalized(id string, opinion vote.Opinion) {
finalizedMutex.Lock()
finalized[id] = opinion
finalizedMutex.Unlock()
}
// EventDispatchers holds the Heartbeat function.
type EventDispatchers struct {
// Heartbeat defines the Heartbeat function.
......@@ -167,6 +180,11 @@ func onRoundExecuted(roundStats *vote.RoundStats) {
RoundStats: rs,
}
finalizedMutex.Lock()
hb.Finalized = finalized
finalized = make(map[string]vote.Opinion)
finalizedMutex.Unlock()
data, err := packet.NewFPCHeartbeatMessage(hb)
if err != nil {
log.Info(err, " - FPC heartbeat message skipped")
......
......@@ -5,14 +5,14 @@ type conflictSet = map[string]conflict
// conflict defines the struct for the opinions of the nodes regarding a given conflict.
type conflict struct {
NodesView map[string]voteContext `json:"nodesview"`
NodesView map[string]voteContext `json:"nodesview" bson:"nodesview"`
}
type voteContext struct {
NodeID string `json:"nodeid"`
Rounds int `json:"rounds"`
Opinions []int32 `json:"opinions"`
Status int32 `json:"status"`
NodeID string `json:"nodeid" bson:"nodeid"`
Rounds int `json:"rounds" bson:"rounds"`
Opinions []int32 `json:"opinions" bson:"opinions"`
Status int32 `json:"status" bson:"status"`
}
func newConflict() conflict {
......
......@@ -32,7 +32,7 @@ var (
// FPCUpdate contains an FPC update.
type FPCUpdate struct {
Conflicts conflictSet `json:"conflictset"`
Conflicts conflictSet `json:"conflictset" bson:"conflictset"`
}
func configureFPCLiveFeed() {
......@@ -77,11 +77,6 @@ func createFPCUpdate(hb *packet.FPCHeartbeat, recordEvent bool) *FPCUpdate {
Opinions: vote.ConvertOpinionsToInts32(context.Opinions),
}
// check conflict has been finalized
if _, ok := hb.Finalized[ID]; ok {
newVoteContext.Status = vote.ConvertOpinionToInt32(hb.Finalized[ID])
}
conflicts[ID] = newConflict()
conflicts[ID].NodesView[nodeID] = newVoteContext
......@@ -91,6 +86,34 @@ func createFPCUpdate(hb *packet.FPCHeartbeat, recordEvent bool) *FPCUpdate {
}
}
// check finalized conflicts
if len(hb.Finalized) > 0 {
finalizedConflicts := make([]FPCRecord, len(hb.Finalized))
i := 0
for ID, finalOpinion := range hb.Finalized {
recordedConflicts.lock.Lock()
conflictDetail := recordedConflicts.conflictSet[ID].NodesView[nodeID]
conflictDetail.Status = vote.ConvertOpinionToInt32(finalOpinion)
conflicts[ID] = newConflict()
conflicts[ID].NodesView[nodeID] = conflictDetail
recordedConflicts.conflictSet[ID].NodesView[nodeID] = conflictDetail
finalizedConflicts[i] = FPCRecord{
ConflictID: ID,
NodeID: conflictDetail.NodeID,
Rounds: conflictDetail.Rounds,
Opinions: conflictDetail.Opinions,
Status: conflictDetail.Status,
}
recordedConflicts.lock.Unlock()
i++
}
err := storeFPCRecords(finalizedConflicts, mongoDB())
if err != nil {
log.Errorf("Error while writing on MongoDB: %s", err)
}
}
return &FPCUpdate{
Conflicts: conflicts,
}
......
package dashboard
import (
"context"
"sync"
"time"
"github.com/iotaledger/goshimmer/plugins/config"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
)
type FPCRecord struct {
ConflictID string `json:"conflictid" bson:"conflictid"`
NodeID string `json:"nodeid" bson:"nodeid"`
Rounds int `json:"rounds" bson:"rounds"`
Opinions []int32 `json:"opinions" bson:"opinions"`
Status int32 `json:"status" bson:"status"`
}
var (
db *mongo.Database
ctxDB context.Context
cancelDB context.CancelFunc
clientDB *mongo.Client
dbOnce sync.Once
)
func shutdownMongoDB() {
cancelDB()
clientDB.Disconnect(ctxDB)
}
func mongoDB() *mongo.Database {
dbOnce.Do(func() {
username := config.Node.GetString(CfgMongoDBUsername)
password := config.Node.GetString(CfgMongoDBPassword)
bindAddr := config.Node.GetString(CfgMongoDBBindAddress)
client, err := mongo.NewClient(options.Client().ApplyURI("mongodb://" + username + ":" + password + "@" + bindAddr))
if err != nil {
log.Fatal(err)
}
ctxDB, cancelDB = context.WithTimeout(context.Background(), 10*time.Second)
err = client.Connect(ctxDB)
if err != nil {
log.Fatal(err)
}
err = client.Ping(ctxDB, readpref.Primary())
if err != nil {
log.Fatal(err)
}
db = client.Database("analysis")
})
return db
}
func storeFPCRecords(records []FPCRecord, db *mongo.Database) error {
data := make([]interface{}, len(records))
for i := range records {
data[i] = records[i]
}
_, err := db.Collection("FPC").InsertMany(ctxDB, data)
return err
}
......@@ -17,6 +17,12 @@ const (
CfgBasicAuthPassword = "analysis.dashboard.basic_auth.password"
// CfgFPCBufferSize defines the config flag of the analysis dashboard FPC (past conflicts) buffer size.
CfgFPCBufferSize = "analysis.dashboard.fpc.buffer_size"
// CfgMongoDBUsername defines the config flag of the analysis dashboard mongoDB username.
CfgMongoDBUsername = "analysis.dashboard.mongodb.username"
// CfgMongoDBPassword defines the config flag of the analysis dashboard mongoDBpassword.
CfgMongoDBPassword = "analysis.dashboard.mongodb.password"
// CfgMongoDBBindAddress defines the config flag of the analysis dashboard mongoDB binding address.
CfgMongoDBBindAddress = "analysis.dashboard.mongodb.bindAddress"
)
func init() {
......@@ -26,4 +32,7 @@ func init() {
flag.String(CfgBasicAuthUsername, "goshimmer", "HTTP basic auth username")
flag.String(CfgBasicAuthPassword, "goshimmer", "HTTP basic auth password")
flag.Uint32(CfgFPCBufferSize, 200, "FPC buffer size")
flag.String(CfgMongoDBUsername, "root", "MongoDB username")
flag.String(CfgMongoDBPassword, "password", "MongoDB username")
flag.String(CfgMongoDBBindAddress, "mongodb_container:27017", "MongoDB bind address")
}
version: "3.5"
services:
mongodb_container:
image: mongo:latest
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: password
ports:
- 27017:27017
volumes:
- mongodb_data_container:/data/db
entry_node:
container_name: entry_node
image: golang:1.14
......@@ -59,3 +69,4 @@ services:
volumes:
goshimmer-cache:
name: goshimmer-cache
mongodb_data_container:
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment