From 2fa51899dc72670b786097a4c8cd437d54a9a998 Mon Sep 17 00:00:00 2001 From: capossele <angelocapossele@gmail.com> Date: Mon, 8 Jun 2020 18:48:03 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=97=83=20Add=20FPC=20analysis=20data=20pe?= =?UTF-8?q?rsistence?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 1 + go.sum | 4 ++ plugins/analysis/client/plugin.go | 18 ++++++ plugins/analysis/dashboard/fpc_conflict.go | 10 ++-- plugins/analysis/dashboard/fpc_livefeed.go | 35 ++++++++++-- plugins/analysis/dashboard/fpc_storage.go | 66 ++++++++++++++++++++++ plugins/analysis/dashboard/parameters.go | 9 +++ tools/docker-network/docker-compose.yml | 11 ++++ 8 files changed, 143 insertions(+), 11 deletions(-) create mode 100644 plugins/analysis/dashboard/fpc_storage.go diff --git a/go.mod b/go.mod index 84853763..7ed8a345 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/stretchr/testify v1.5.1 github.com/valyala/fasttemplate v1.1.0 // indirect go.dedis.ch/kyber/v3 v3.0.12 + go.mongodb.org/mongo-driver v1.0.0 go.uber.org/atomic v1.6.0 go.uber.org/zap v1.14.0 golang.org/x/crypto v0.0.0-20200429183012-4b2356b1ed79 diff --git a/go.sum b/go.sum index 77da261f..613421d5 100644 --- a/go.sum +++ b/go.sum @@ -299,6 +299,7 @@ github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= +github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= @@ -311,7 +312,9 @@ github.com/valyala/fasttemplate v1.1.0 h1:RZqt0yGBsps8NGvLSGW804QQqCUYYLsaOjTVHy github.com/valyala/fasttemplate v1.1.0/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/xanzy/ssh-agent v0.2.1 h1:TCbipTQL2JiiCprBWx9frJ2eJlCYT00NmctrHxVAr70= github.com/xanzy/ssh-agent v0.2.1/go.mod h1:mLlQY/MoOhWBj+gOGMQkOeiEvkx+8pJSI+0Bx9h2kr4= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= @@ -331,6 +334,7 @@ go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.4 h1:hi1bXHMVrlQh6WwxAy+qZCV/SYIlqo+Ushwdpa4tAKg= go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= +go.mongodb.org/mongo-driver v1.0.0 h1:KxPRDyfB2xXnDE2My8acoOWBQkfv3tz0SaWTRZjJR0c= go.mongodb.org/mongo-driver v1.0.0/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/plugins/analysis/client/plugin.go b/plugins/analysis/client/plugin.go index bcbd601a..15fdec67 100644 --- a/plugins/analysis/client/plugin.go +++ b/plugins/analysis/client/plugin.go @@ -43,9 +43,13 @@ var ( log *logger.Logger managedConn *network.ManagedConnection connLock sync.Mutex + + finalized map[string]vote.Opinion + finalizedMutex sync.RWMutex ) func run(_ *node.Plugin) { + finalized = make(map[string]vote.Opinion) log = logger.NewLogger(PluginName) conn, err := net.Dial("tcp", config.Node.GetString(CfgServerAddress)) @@ -58,6 +62,9 @@ func run(_ *node.Plugin) { if err := daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) { + fpctest.Voter().Events().Finalized.Attach(events.NewClosure(onFinalized)) + defer fpctest.Voter().Events().Finalized.Detach(events.NewClosure(onFinalized)) + fpctest.Voter().Events().RoundExecuted.Attach(events.NewClosure(onRoundExecuted)) defer fpctest.Voter().Events().RoundExecuted.Detach(events.NewClosure(onRoundExecuted)) @@ -77,6 +84,12 @@ func run(_ *node.Plugin) { } } +func onFinalized(id string, opinion vote.Opinion) { + finalizedMutex.Lock() + finalized[id] = opinion + finalizedMutex.Unlock() +} + // EventDispatchers holds the Heartbeat function. type EventDispatchers struct { // Heartbeat defines the Heartbeat function. @@ -167,6 +180,11 @@ func onRoundExecuted(roundStats *vote.RoundStats) { RoundStats: rs, } + finalizedMutex.Lock() + hb.Finalized = finalized + finalized = make(map[string]vote.Opinion) + finalizedMutex.Unlock() + data, err := packet.NewFPCHeartbeatMessage(hb) if err != nil { log.Info(err, " - FPC heartbeat message skipped") diff --git a/plugins/analysis/dashboard/fpc_conflict.go b/plugins/analysis/dashboard/fpc_conflict.go index ff662784..beb2bccd 100644 --- a/plugins/analysis/dashboard/fpc_conflict.go +++ b/plugins/analysis/dashboard/fpc_conflict.go @@ -5,14 +5,14 @@ type conflictSet = map[string]conflict // conflict defines the struct for the opinions of the nodes regarding a given conflict. type conflict struct { - NodesView map[string]voteContext `json:"nodesview"` + NodesView map[string]voteContext `json:"nodesview" bson:"nodesview"` } type voteContext struct { - NodeID string `json:"nodeid"` - Rounds int `json:"rounds"` - Opinions []int32 `json:"opinions"` - Status int32 `json:"status"` + NodeID string `json:"nodeid" bson:"nodeid"` + Rounds int `json:"rounds" bson:"rounds"` + Opinions []int32 `json:"opinions" bson:"opinions"` + Status int32 `json:"status" bson:"status"` } func newConflict() conflict { diff --git a/plugins/analysis/dashboard/fpc_livefeed.go b/plugins/analysis/dashboard/fpc_livefeed.go index d6c7ea0e..8a80f817 100644 --- a/plugins/analysis/dashboard/fpc_livefeed.go +++ b/plugins/analysis/dashboard/fpc_livefeed.go @@ -32,7 +32,7 @@ var ( // FPCUpdate contains an FPC update. type FPCUpdate struct { - Conflicts conflictSet `json:"conflictset"` + Conflicts conflictSet `json:"conflictset" bson:"conflictset"` } func configureFPCLiveFeed() { @@ -77,11 +77,6 @@ func createFPCUpdate(hb *packet.FPCHeartbeat, recordEvent bool) *FPCUpdate { Opinions: vote.ConvertOpinionsToInts32(context.Opinions), } - // check conflict has been finalized - if _, ok := hb.Finalized[ID]; ok { - newVoteContext.Status = vote.ConvertOpinionToInt32(hb.Finalized[ID]) - } - conflicts[ID] = newConflict() conflicts[ID].NodesView[nodeID] = newVoteContext @@ -91,6 +86,34 @@ func createFPCUpdate(hb *packet.FPCHeartbeat, recordEvent bool) *FPCUpdate { } } + // check finalized conflicts + if len(hb.Finalized) > 0 { + finalizedConflicts := make([]FPCRecord, len(hb.Finalized)) + i := 0 + for ID, finalOpinion := range hb.Finalized { + recordedConflicts.lock.Lock() + conflictDetail := recordedConflicts.conflictSet[ID].NodesView[nodeID] + conflictDetail.Status = vote.ConvertOpinionToInt32(finalOpinion) + conflicts[ID] = newConflict() + conflicts[ID].NodesView[nodeID] = conflictDetail + recordedConflicts.conflictSet[ID].NodesView[nodeID] = conflictDetail + finalizedConflicts[i] = FPCRecord{ + ConflictID: ID, + NodeID: conflictDetail.NodeID, + Rounds: conflictDetail.Rounds, + Opinions: conflictDetail.Opinions, + Status: conflictDetail.Status, + } + recordedConflicts.lock.Unlock() + i++ + } + + err := storeFPCRecords(finalizedConflicts, mongoDB()) + if err != nil { + log.Errorf("Error while writing on MongoDB: %s", err) + } + } + return &FPCUpdate{ Conflicts: conflicts, } diff --git a/plugins/analysis/dashboard/fpc_storage.go b/plugins/analysis/dashboard/fpc_storage.go new file mode 100644 index 00000000..9fe54069 --- /dev/null +++ b/plugins/analysis/dashboard/fpc_storage.go @@ -0,0 +1,66 @@ +package dashboard + +import ( + "context" + "sync" + "time" + + "github.com/iotaledger/goshimmer/plugins/config" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" +) + +type FPCRecord struct { + ConflictID string `json:"conflictid" bson:"conflictid"` + NodeID string `json:"nodeid" bson:"nodeid"` + Rounds int `json:"rounds" bson:"rounds"` + Opinions []int32 `json:"opinions" bson:"opinions"` + Status int32 `json:"status" bson:"status"` +} + +var ( + db *mongo.Database + ctxDB context.Context + cancelDB context.CancelFunc + clientDB *mongo.Client + dbOnce sync.Once +) + +func shutdownMongoDB() { + cancelDB() + clientDB.Disconnect(ctxDB) +} + +func mongoDB() *mongo.Database { + dbOnce.Do(func() { + username := config.Node.GetString(CfgMongoDBUsername) + password := config.Node.GetString(CfgMongoDBPassword) + bindAddr := config.Node.GetString(CfgMongoDBBindAddress) + client, err := mongo.NewClient(options.Client().ApplyURI("mongodb://" + username + ":" + password + "@" + bindAddr)) + if err != nil { + log.Fatal(err) + } + ctxDB, cancelDB = context.WithTimeout(context.Background(), 10*time.Second) + err = client.Connect(ctxDB) + if err != nil { + log.Fatal(err) + } + + err = client.Ping(ctxDB, readpref.Primary()) + if err != nil { + log.Fatal(err) + } + db = client.Database("analysis") + }) + return db +} + +func storeFPCRecords(records []FPCRecord, db *mongo.Database) error { + data := make([]interface{}, len(records)) + for i := range records { + data[i] = records[i] + } + _, err := db.Collection("FPC").InsertMany(ctxDB, data) + return err +} diff --git a/plugins/analysis/dashboard/parameters.go b/plugins/analysis/dashboard/parameters.go index e7c4b7a8..c1ea6d39 100644 --- a/plugins/analysis/dashboard/parameters.go +++ b/plugins/analysis/dashboard/parameters.go @@ -17,6 +17,12 @@ const ( CfgBasicAuthPassword = "analysis.dashboard.basic_auth.password" // CfgFPCBufferSize defines the config flag of the analysis dashboard FPC (past conflicts) buffer size. CfgFPCBufferSize = "analysis.dashboard.fpc.buffer_size" + // CfgMongoDBUsername defines the config flag of the analysis dashboard mongoDB username. + CfgMongoDBUsername = "analysis.dashboard.mongodb.username" + // CfgMongoDBPassword defines the config flag of the analysis dashboard mongoDBpassword. + CfgMongoDBPassword = "analysis.dashboard.mongodb.password" + // CfgMongoDBBindAddress defines the config flag of the analysis dashboard mongoDB binding address. + CfgMongoDBBindAddress = "analysis.dashboard.mongodb.bindAddress" ) func init() { @@ -26,4 +32,7 @@ func init() { flag.String(CfgBasicAuthUsername, "goshimmer", "HTTP basic auth username") flag.String(CfgBasicAuthPassword, "goshimmer", "HTTP basic auth password") flag.Uint32(CfgFPCBufferSize, 200, "FPC buffer size") + flag.String(CfgMongoDBUsername, "root", "MongoDB username") + flag.String(CfgMongoDBPassword, "password", "MongoDB username") + flag.String(CfgMongoDBBindAddress, "mongodb_container:27017", "MongoDB bind address") } diff --git a/tools/docker-network/docker-compose.yml b/tools/docker-network/docker-compose.yml index 49783070..129c1769 100644 --- a/tools/docker-network/docker-compose.yml +++ b/tools/docker-network/docker-compose.yml @@ -1,6 +1,16 @@ version: "3.5" services: + mongodb_container: + image: mongo:latest + environment: + MONGO_INITDB_ROOT_USERNAME: root + MONGO_INITDB_ROOT_PASSWORD: password + ports: + - 27017:27017 + volumes: + - mongodb_data_container:/data/db + entry_node: container_name: entry_node image: golang:1.14 @@ -59,3 +69,4 @@ services: volumes: goshimmer-cache: name: goshimmer-cache + mongodb_data_container: -- GitLab