Skip to content
Snippets Groups Projects
Commit f6496bc1 authored by capossele's avatar capossele
Browse files

:construction: WIP

parent de15eae0
No related branches found
No related tags found
No related merge requests found
...@@ -39,11 +39,21 @@ var ( ...@@ -39,11 +39,21 @@ var (
// Plugin is the plugin instance of the analysis client plugin. // Plugin is the plugin instance of the analysis client plugin.
Plugin = node.NewPlugin(PluginName, node.Enabled, run) Plugin = node.NewPlugin(PluginName, node.Enabled, run)
log *logger.Logger log *logger.Logger
managedConn *network.ManagedConnection
connLock sync.Mutex connLock sync.Mutex
) )
func run(_ *node.Plugin) { func run(_ *node.Plugin) {
log = logger.NewLogger(PluginName) log = logger.NewLogger(PluginName)
conn, err := net.Dial("tcp", config.Node.GetString(CfgServerAddress))
if err != nil {
log.Debugf("Could not connect to reporting server: %s", err.Error())
return
}
managedConn = network.NewManagedConnection(conn)
if err := daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) {
fpctest.Voter().Events().RoundExecuted.Attach(events.NewClosure(onRoundExecuted)) fpctest.Voter().Events().RoundExecuted.Attach(events.NewClosure(onRoundExecuted))
...@@ -57,14 +67,7 @@ func run(_ *node.Plugin) { ...@@ -57,14 +67,7 @@ func run(_ *node.Plugin) {
return return
case <-ticker.C: case <-ticker.C:
conn, err := net.Dial("tcp", config.Node.GetString(CfgServerAddress)) sendHeartbeat(managedConn, createHeartbeat())
if err != nil {
log.Debugf("Could not connect to reporting server: %s", err.Error())
continue
}
managedConn := network.NewManagedConnection(conn)
eventDispatchers := getEventDispatchers(managedConn)
reportHeartbeat(eventDispatchers)
} }
} }
}, shutdown.PriorityAnalysis); err != nil { }, shutdown.PriorityAnalysis); err != nil {
...@@ -78,9 +81,7 @@ type EventDispatchers struct { ...@@ -78,9 +81,7 @@ type EventDispatchers struct {
Heartbeat func(heartbeat *packet.Heartbeat) Heartbeat func(heartbeat *packet.Heartbeat)
} }
func getEventDispatchers(conn *network.ManagedConnection) *EventDispatchers { func sendHeartbeat(conn *network.ManagedConnection, hb *packet.Heartbeat) {
return &EventDispatchers{
Heartbeat: func(hb *packet.Heartbeat) {
var out strings.Builder var out strings.Builder
for _, value := range hb.OutboundIDs { for _, value := range hb.OutboundIDs {
out.WriteString(hex.EncodeToString(value)) out.WriteString(hex.EncodeToString(value))
...@@ -107,11 +108,9 @@ func getEventDispatchers(conn *network.ManagedConnection) *EventDispatchers { ...@@ -107,11 +108,9 @@ func getEventDispatchers(conn *network.ManagedConnection) *EventDispatchers {
if _, err = conn.Write(data); err != nil { if _, err = conn.Write(data); err != nil {
log.Debugw("Error while writing to connection", "Description", err) log.Debugw("Error while writing to connection", "Description", err)
} }
},
}
} }
func reportHeartbeat(dispatchers *EventDispatchers) { func createHeartbeat() *packet.Heartbeat {
// get own ID // get own ID
var nodeID []byte var nodeID []byte
if local.GetInstance() != nil { if local.GetInstance() != nil {
...@@ -138,8 +137,7 @@ func reportHeartbeat(dispatchers *EventDispatchers) { ...@@ -138,8 +137,7 @@ func reportHeartbeat(dispatchers *EventDispatchers) {
inboundIDs[i] = neighbor.ID().Bytes() inboundIDs[i] = neighbor.ID().Bytes()
} }
hb := &packet.Heartbeat{OwnID: nodeID, OutboundIDs: outboundIDs, InboundIDs: inboundIDs} return &packet.Heartbeat{OwnID: nodeID, OutboundIDs: outboundIDs, InboundIDs: inboundIDs}
dispatchers.Heartbeat(hb)
} }
func onRoundExecuted(roundStats *vote.RoundStats) { func onRoundExecuted(roundStats *vote.RoundStats) {
...@@ -161,14 +159,6 @@ func onRoundExecuted(roundStats *vote.RoundStats) { ...@@ -161,14 +159,6 @@ func onRoundExecuted(roundStats *vote.RoundStats) {
return return
} }
conn, err := net.Dial("tcp", config.Node.GetString(CfgServerAddress))
if err != nil {
log.Debugf("Could not connect to reporting server: %s", err.Error())
return
}
managedConn := network.NewManagedConnection(conn)
connLock.Lock() connLock.Lock()
defer connLock.Unlock() defer connLock.Unlock()
if _, err = managedConn.Write(data); err != nil { if _, err = managedConn.Write(data); err != nil {
......
...@@ -5,31 +5,45 @@ import ( ...@@ -5,31 +5,45 @@ import (
"github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/packages/vote" "github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/plugins/drng" "github.com/iotaledger/goshimmer/plugins/analysis/packet"
analysis "github.com/iotaledger/goshimmer/plugins/analysis/server"
"github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/workerpool" "github.com/iotaledger/hive.go/workerpool"
"github.com/mr-tron/base58"
) )
var fpcLiveFeedWorkerCount = 1 var (
var fpcLiveFeedWorkerQueueSize = 50 fpcLiveFeedWorkerCount = 1
var fpcLiveFeedWorkerPool *workerpool.WorkerPool fpcLiveFeedWorkerQueueSize = 50
fpcLiveFeedWorkerPool *workerpool.WorkerPool
type fpcRoundMsg struct { conflicts map[string]Conflict
ID string `json:"id"` )
Opinion int `json:"opinion"`
type Conflict struct {
NodesView map[string]voteContext
} }
func configureFPCLiveFeed() { type voteContext struct {
fpcLiveFeedWorkerPool = workerpool.New(func(task workerpool.Task) { NodeID string
newMsg := task.Param(0).(vote.RoundStats) Rounds int
Opinions []vote.Opinion
Like vote.Opinion
}
for _, conflict := range newMsg.ActiveVoteContexts { //
broadcastWsMessage(&wsmsg{MsgTypeDrng, &fpcRoundMsg{
ID: conflict.ID, // FPCMsg contains an FPC update
Opinion: int(conflict.LastOpinion()), type FPCMsg struct {
}}) Nodes int `json:"nodes"`
ConflictSet map[string]Conflict `json:"conflictset"`
} }
func configureFPCLiveFeed() {
fpcLiveFeedWorkerPool = workerpool.New(func(task workerpool.Task) {
newMsg := task.Param(0).(*FPCMsg)
broadcastWsMessage(&wsmsg{MsgTypeFPC, newMsg})
task.Return(nil) task.Return(nil)
}, workerpool.WorkerCount(fpcLiveFeedWorkerCount), workerpool.QueueSize(fpcLiveFeedWorkerQueueSize)) }, workerpool.WorkerCount(fpcLiveFeedWorkerCount), workerpool.QueueSize(fpcLiveFeedWorkerQueueSize))
} }
...@@ -39,21 +53,45 @@ func runFPCLiveFeed() { ...@@ -39,21 +53,45 @@ func runFPCLiveFeed() {
newMsgRateLimiter := time.NewTicker(time.Second / 10) newMsgRateLimiter := time.NewTicker(time.Second / 10)
defer newMsgRateLimiter.Stop() defer newMsgRateLimiter.Stop()
notifyNewFPCRound := events.NewClosure(func(message vote.RoundStats) { onFPCHeartbeatReceived := events.NewClosure(func(hb *packet.FPCHeartbeat) {
select { select {
case <-newMsgRateLimiter.C: case <-newMsgRateLimiter.C:
fpcLiveFeedWorkerPool.TrySubmit(message) fpcLiveFeedWorkerPool.TrySubmit(createFPCUpdate(hb))
default: default:
} }
}) })
//drng.Instance().Events.Randomness.Attach(notifyNewFPCRound) analysis.Events.FPCHeartbeat.Attach(events.NewClosure(onFPCHeartbeatReceived))
fpcLiveFeedWorkerPool.Start() fpcLiveFeedWorkerPool.Start()
defer fpcLiveFeedWorkerPool.Stop() defer fpcLiveFeedWorkerPool.Stop()
<-shutdownSignal <-shutdownSignal
log.Info("Stopping Analysis[FPCUpdater] ...") log.Info("Stopping Analysis[FPCUpdater] ...")
drng.Instance().Events.Randomness.Detach(notifyNewFPCRound) analysis.Events.FPCHeartbeat.Detach(events.NewClosure(onFPCHeartbeatReceived))
log.Info("Stopping Analysis[FPCUpdater] ... done") log.Info("Stopping Analysis[FPCUpdater] ... done")
}, shutdown.PriorityDashboard) }, shutdown.PriorityDashboard)
} }
func createFPCUpdate(hb *packet.FPCHeartbeat) *FPCMsg {
update := make(map[string]Conflict)
for ID, context := range hb.RoundStats.ActiveVoteContexts {
update[ID] = newConflict()
nodeID := base58.Encode(hb.OwnID)
update[ID].NodesView[nodeID] = voteContext{
NodeID: nodeID,
Rounds: context.Rounds,
Opinions: context.Opinions,
}
}
return &FPCMsg{
ConflictSet: update,
}
}
func newConflict() Conflict {
return Conflict{
NodesView: make(map[string]voteContext),
}
}
...@@ -140,6 +140,8 @@ const ( ...@@ -140,6 +140,8 @@ const (
MsgTypeVertex MsgTypeVertex
// MsgTypeTipInfo defines a tip info message. // MsgTypeTipInfo defines a tip info message.
MsgTypeTipInfo MsgTypeTipInfo
// MsgTypeFPC defines a FPC update message.
MsgTypeFPC
) )
type wsmsg struct { type wsmsg struct {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment