package client import ( "encoding/hex" "net" "strings" "sync" "time" "github.com/iotaledger/goshimmer/dapps/fpctest" "github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/goshimmer/packages/vote" "github.com/iotaledger/goshimmer/plugins/analysis/packet" "github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/autopeering/local" "github.com/iotaledger/goshimmer/plugins/config" "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/network" "github.com/iotaledger/hive.go/node" flag "github.com/spf13/pflag" ) const ( // PluginName is the name of the analysis client plugin. PluginName = "Analysis-Client" // CfgServerAddress defines the config flag of the analysis server address. CfgServerAddress = "analysis.client.serverAddress" // defines the report interval of the reporting in seconds. reportIntervalSec = 5 ) func init() { flag.String(CfgServerAddress, "ressims.iota.cafe:188", "tcp server for collecting analysis information") } var ( // Plugin is the plugin instance of the analysis client plugin. Plugin = node.NewPlugin(PluginName, node.Enabled, run) log *logger.Logger managedConn *network.ManagedConnection connLock sync.Mutex ) func run(_ *node.Plugin) { log = logger.NewLogger(PluginName) conn, err := net.Dial("tcp", config.Node.GetString(CfgServerAddress)) if err != nil { log.Debugf("Could not connect to reporting server: %s", err.Error()) return } managedConn = network.NewManagedConnection(conn) if err := daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) { fpctest.Voter().Events().RoundExecuted.Attach(events.NewClosure(onRoundExecuted)) defer fpctest.Voter().Events().RoundExecuted.Detach(events.NewClosure(onRoundExecuted)) ticker := time.NewTicker(reportIntervalSec * time.Second) defer ticker.Stop() for { select { case <-shutdownSignal: return case <-ticker.C: sendHeartbeat(managedConn, createHeartbeat()) } } }, shutdown.PriorityAnalysis); err != nil { log.Panic(err) } } // EventDispatchers holds the Heartbeat function. type EventDispatchers struct { // Heartbeat defines the Heartbeat function. Heartbeat func(heartbeat *packet.Heartbeat) } func sendHeartbeat(conn *network.ManagedConnection, hb *packet.Heartbeat) { var out strings.Builder for _, value := range hb.OutboundIDs { out.WriteString(hex.EncodeToString(value)) } var in strings.Builder for _, value := range hb.InboundIDs { in.WriteString(hex.EncodeToString(value)) } log.Debugw( "Heartbeat", "nodeID", hex.EncodeToString(hb.OwnID), "outboundIDs", out.String(), "inboundIDs", in.String(), ) data, err := packet.NewHeartbeatMessage(hb) if err != nil { log.Info(err, " - heartbeat message skipped") return } connLock.Lock() defer connLock.Unlock() if _, err = conn.Write(data); err != nil { log.Debugw("Error while writing to connection", "Description", err) } } func createHeartbeat() *packet.Heartbeat { // get own ID var nodeID []byte if local.GetInstance() != nil { // doesn't copy the ID, take care not to modify underlying bytearray! nodeID = local.GetInstance().ID().Bytes() } var outboundIDs [][]byte var inboundIDs [][]byte // get outboundIDs (chosen neighbors) outgoingNeighbors := autopeering.Selection().GetOutgoingNeighbors() outboundIDs = make([][]byte, len(outgoingNeighbors)) for i, neighbor := range outgoingNeighbors { // doesn't copy the ID, take care not to modify underlying bytearray! outboundIDs[i] = neighbor.ID().Bytes() } // get inboundIDs (accepted neighbors) incomingNeighbors := autopeering.Selection().GetIncomingNeighbors() inboundIDs = make([][]byte, len(incomingNeighbors)) for i, neighbor := range incomingNeighbors { // doesn't copy the ID, take care not to modify underlying bytearray! inboundIDs[i] = neighbor.ID().Bytes() } return &packet.Heartbeat{OwnID: nodeID, OutboundIDs: outboundIDs, InboundIDs: inboundIDs} } func onRoundExecuted(roundStats *vote.RoundStats) { // get own ID var nodeID []byte if local.GetInstance() != nil { // doesn't copy the ID, take care not to modify underlying bytearray! nodeID = local.GetInstance().ID().Bytes() } rs := vote.RoundStats{ Duration: roundStats.Duration, RandUsed: roundStats.RandUsed, ActiveVoteContexts: roundStats.ActiveVoteContexts, } hb := &packet.FPCHeartbeat{ OwnID: nodeID, RoundStats: rs, } data, err := packet.NewFPCHeartbeatMessage(hb) if err != nil { log.Info(err, " - FPC heartbeat message skipped") return } log.Info("Client: onRoundExecuted data size: ", len(data)) connLock.Lock() defer connLock.Unlock() if _, err = managedConn.Write(data); err != nil { log.Debugw("Error while writing to connection", "Description", err) return } }