package metrics import ( "time" "github.com/iotaledger/goshimmer/dapps/valuetransfers" "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload" valuetangle "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/tangle" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle" "github.com/iotaledger/goshimmer/packages/metrics" "github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/goshimmer/packages/vote" "github.com/iotaledger/goshimmer/plugins/analysis/server" "github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/config" "github.com/iotaledger/goshimmer/plugins/gossip" "github.com/iotaledger/goshimmer/plugins/messagelayer" "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/node" "github.com/iotaledger/hive.go/timeutil" ) // PluginName is the name of the metrics plugin. const PluginName = "Metrics" var ( // Plugin is the plugin instance of the metrics plugin. Plugin = node.NewPlugin(PluginName, node.Enabled, configure, run) log *logger.Logger ) func configure(_ *node.Plugin) { log = logger.NewLogger(PluginName) } func run(_ *node.Plugin) { if config.Node.GetBool(CfgMetricsLocal) { registerLocalMetrics() } // Events from analysis server if config.Node.GetBool(CfgMetricsGlobal) { server.Events.MetricHeartbeat.Attach(onMetricHeartbeatReceived) } // create a background worker that update the metrics every second if err := daemon.BackgroundWorker("Metrics Updater", func(shutdownSignal <-chan struct{}) { if config.Node.GetBool(CfgMetricsLocal) { timeutil.Ticker(func() { measureCPUUsage() measureMemUsage() measureSynced() measureMessageTips() measureValueTips() measureReceivedMPS() // gossip network traffic g := gossipCurrentTraffic() gossipCurrentRx.Store(uint64(g.BytesRead)) gossipCurrentTx.Store(uint64(g.BytesWritten)) }, 1*time.Second, shutdownSignal) } if config.Node.GetBool(CfgMetricsGlobal) { timeutil.Ticker(calculateNetworkDiameter, 1*time.Minute, shutdownSignal) } }, shutdown.PriorityMetrics); err != nil { log.Panicf("Failed to start as daemon: %s", err) } } func registerLocalMetrics() { //// Events declared in other packages which we want to listen to here //// // increase received MPS counter whenever we attached a message messagelayer.Tangle.Events.MessageAttached.Attach(events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) { _payloadType := cachedMessage.Unwrap().Payload().Type() cachedMessage.Release() cachedMessageMetadata.Release() increaseReceivedMPSCounter() increasePerPayloadCounter(_payloadType) })) // Value payload attached valuetransfers.Tangle.Events.PayloadAttached.Attach(events.NewClosure(func(cachedPayload *payload.CachedPayload, cachedPayloadMetadata *valuetangle.CachedPayloadMetadata) { cachedPayload.Release() cachedPayloadMetadata.Release() valueTransactionCounter.Inc() })) // FPC round executed valuetransfers.Voter().Events().RoundExecuted.Attach(events.NewClosure(func(roundStats *vote.RoundStats) { processRoundStats(roundStats) })) // a conflict has been finalized valuetransfers.Voter().Events().Finalized.Attach(events.NewClosure(func(ev *vote.OpinionEvent) { processFinalized(ev.Ctx) })) // consensus failure in conflict resolution valuetransfers.Voter().Events().Failed.Attach(events.NewClosure(func(ev *vote.OpinionEvent) { processFailed(ev.Ctx) })) //// Events coming from metrics package //// metrics.Events().FPCInboundBytes.Attach(events.NewClosure(func(amountBytes uint64) { _FPCInboundBytes.Add(amountBytes) })) metrics.Events().FPCOutboundBytes.Attach(events.NewClosure(func(amountBytes uint64) { _FPCOutboundBytes.Add(amountBytes) })) metrics.Events().AnalysisOutboundBytes.Attach(events.NewClosure(func(amountBytes uint64) { analysisOutboundBytes.Add(amountBytes) })) metrics.Events().CPUUsage.Attach(events.NewClosure(func(cpuPercent float64) { cpuUsage.Store(cpuPercent) })) metrics.Events().MemUsage.Attach(events.NewClosure(func(memAllocBytes uint64) { memUsageBytes.Store(memAllocBytes) })) metrics.Events().Synced.Attach(events.NewClosure(func(synced bool) { isSynced.Store(synced) })) gossip.Manager().Events().NeighborRemoved.Attach(onNeighborRemoved) gossip.Manager().Events().NeighborAdded.Attach(onNeighborAdded) autopeering.Selection().Events().IncomingPeering.Attach(onAutopeeringSelection) autopeering.Selection().Events().OutgoingPeering.Attach(onAutopeeringSelection) metrics.Events().MessageTips.Attach(events.NewClosure(func(tipsCount uint64) { messageTips.Store(tipsCount) })) metrics.Events().ValueTips.Attach(events.NewClosure(func(tipsCount uint64) { valueTips.Store(tipsCount) })) metrics.Events().QueryReceived.Attach(events.NewClosure(func(ev *metrics.QueryReceivedEvent) { processQueryReceived(ev) })) metrics.Events().QueryReplyError.Attach(events.NewClosure(func(ev *metrics.QueryReplyErrorEvent) { processQueryReplyError(ev) })) }