Skip to content
Snippets Groups Projects
Select Git revision
  • 8cf070332c63c1b96f4788bcbecc60d080605b30
  • develop default protected
  • congestioncontrol
  • merge-v-data-collection-spammer-0.8.2
  • WIP-merge-v-data-collection-spammer-0.8.2
  • merge-v-data-collection-spammer-0.7.7
  • tmp
  • test-masterpow-fixing
  • test-masterpow
  • test-echo
  • v-data-collection
  • v-data-collection-spammer
  • tmp-dump-spam-info
  • dump-msg-info-0.3.1
  • test-dump-message-info
  • spammer-exprandom
  • extra/tutorial
  • without_tipselection
  • hacking-docker-network
  • hacking-docker-network-0.2.3
  • master
  • v0.2.3
22 results

go.mod

Blame
  • This project manages its dependencies using Go Modules. Learn more
    plugin.go 7.17 KiB
    package metrics
    
    import (
    	"sync"
    	"time"
    
    	"github.com/iotaledger/goshimmer/dapps/valuetransfers"
    	"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload"
    	valuetangle "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/tangle"
    	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
    	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
    	"github.com/iotaledger/goshimmer/packages/metrics"
    	"github.com/iotaledger/goshimmer/packages/shutdown"
    	"github.com/iotaledger/goshimmer/packages/vote"
    	"github.com/iotaledger/goshimmer/plugins/analysis/server"
    	"github.com/iotaledger/goshimmer/plugins/autopeering"
    	"github.com/iotaledger/goshimmer/plugins/config"
    	"github.com/iotaledger/goshimmer/plugins/gossip"
    	"github.com/iotaledger/goshimmer/plugins/messagelayer"
    	"github.com/iotaledger/hive.go/daemon"
    	"github.com/iotaledger/hive.go/events"
    	"github.com/iotaledger/hive.go/logger"
    	"github.com/iotaledger/hive.go/node"
    	"github.com/iotaledger/hive.go/objectstorage"
    	"github.com/iotaledger/hive.go/timeutil"
    )
    
    // PluginName is the name of the metrics plugin.
    const PluginName = "Metrics"
    
    var (
    	// plugin is the plugin instance of the metrics plugin.
    	plugin *node.Plugin
    	once   sync.Once
    	log    *logger.Logger
    )
    
    // Plugin gets the plugin instance.
    func Plugin() *node.Plugin {
    	once.Do(func() {
    		plugin = node.NewPlugin(PluginName, node.Enabled, configure, run)
    	})
    	return plugin
    }
    
    func configure(_ *node.Plugin) {
    	log = logger.NewLogger(PluginName)
    }
    
    func run(_ *node.Plugin) {
    	log.Infof("Starting %s ...", PluginName)
    	if config.Node().GetBool(CfgMetricsLocal) {
    		// initial measurement, since we have to know how many messages are there in the db
    		measureInitialDBStats()
    		registerLocalMetrics()
    	}
    
    	// Events from analysis server
    	if config.Node().GetBool(CfgMetricsGlobal) {
    		server.Events.MetricHeartbeat.Attach(onMetricHeartbeatReceived)
    	}
    
    	// create a background worker that update the metrics every second
    	if err := daemon.BackgroundWorker("Metrics Updater", func(shutdownSignal <-chan struct{}) {
    		defer log.Infof("Stopping Metrics Updater ... done")
    		if config.Node().GetBool(CfgMetricsLocal) {
    			timeutil.Ticker(func() {
    				measureCPUUsage()
    				measureMemUsage()
    				measureSynced()
    				measureMessageTips()
    				measureValueTips()
    				measureReceivedMPS()
    				measureRequestQueueSize()
    
    				// gossip network traffic
    				g := gossipCurrentTraffic()
    				gossipCurrentRx.Store(uint64(g.BytesRead))
    				gossipCurrentTx.Store(uint64(g.BytesWritten))
    			}, 1*time.Second, shutdownSignal)
    		}
    		if config.Node().GetBool(CfgMetricsGlobal) {
    			timeutil.Ticker(calculateNetworkDiameter, 1*time.Minute, shutdownSignal)
    		}
    		log.Infof("Stopping Metrics Updater ...")
    	}, shutdown.PriorityMetrics); err != nil {
    		log.Panicf("Failed to start as daemon: %s", err)
    	}
    }
    
    func registerLocalMetrics() {
    	//// Events declared in other packages which we want to listen to here ////
    
    	// increase received MPS counter whenever we attached a message
    	messagelayer.Tangle().Events.MessageAttached.Attach(events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
    		_payloadType := cachedMessage.Unwrap().Payload().Type()
    		cachedMessage.Release()
    		cachedMessageMetadata.Release()
    		increaseReceivedMPSCounter()
    		increasePerPayloadCounter(_payloadType)
    		// MessageAttached is triggered in storeMessageWorker that saves the msg to database
    		messageTotalCountDB.Inc()
    
    	}))
    
    	messagelayer.Tangle().Events.MessageRemoved.Attach(events.NewClosure(func(messageId message.Id) {
    		// MessageRemoved triggered when the message gets removed from database.
    		messageTotalCountDB.Dec()
    	}))
    
    	// messages can only become solid once, then they stay like that, hence no .Dec() part
    	messagelayer.Tangle().Events.MessageSolid.Attach(events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
    		cachedMessage.Release()
    		solidTimeMutex.Lock()
    		defer solidTimeMutex.Unlock()
    		// Consume should release cachedMessageMetadata
    		cachedMessageMetadata.Consume(func(object objectstorage.StorableObject) {
    			msgMetaData := object.(*tangle.MessageMetadata)
    			if msgMetaData.IsSolid() {
    				messageSolidCountDBInc.Inc()
    				sumSolidificationTime += msgMetaData.SolidificationTime().Sub(msgMetaData.ReceivedTime())
    			}
    		})
    	}))
    
    	// fired when a message gets added to missing message storage
    	messagelayer.Tangle().Events.MessageMissing.Attach(events.NewClosure(func(messageId message.Id) {
    		missingMessageCountDB.Inc()
    	}))
    
    	// fired when a missing message was received and removed from missing message storage
    	messagelayer.Tangle().Events.MissingMessageReceived.Attach(events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
    		cachedMessage.Release()
    		cachedMessageMetadata.Release()
    		missingMessageCountDB.Dec()
    	}))
    
    	// Value payload attached
    	valuetransfers.Tangle().Events.PayloadAttached.Attach(events.NewClosure(func(cachedPayload *payload.CachedPayload, cachedPayloadMetadata *valuetangle.CachedPayloadMetadata) {
    		cachedPayload.Release()
    		cachedPayloadMetadata.Release()
    		valueTransactionCounter.Inc()
    	}))
    
    	// FPC round executed
    	valuetransfers.Voter().Events().RoundExecuted.Attach(events.NewClosure(func(roundStats *vote.RoundStats) {
    		processRoundStats(roundStats)
    	}))
    
    	// a conflict has been finalized
    	valuetransfers.Voter().Events().Finalized.Attach(events.NewClosure(func(ev *vote.OpinionEvent) {
    		processFinalized(ev.Ctx)
    	}))
    
    	// consensus failure in conflict resolution
    	valuetransfers.Voter().Events().Failed.Attach(events.NewClosure(func(ev *vote.OpinionEvent) {
    		processFailed(ev.Ctx)
    	}))
    
    	//// Events coming from metrics package ////
    
    	metrics.Events().FPCInboundBytes.Attach(events.NewClosure(func(amountBytes uint64) {
    		_FPCInboundBytes.Add(amountBytes)
    	}))
    	metrics.Events().FPCOutboundBytes.Attach(events.NewClosure(func(amountBytes uint64) {
    		_FPCOutboundBytes.Add(amountBytes)
    	}))
    	metrics.Events().AnalysisOutboundBytes.Attach(events.NewClosure(func(amountBytes uint64) {
    		analysisOutboundBytes.Add(amountBytes)
    	}))
    	metrics.Events().CPUUsage.Attach(events.NewClosure(func(cpuPercent float64) {
    		cpuUsage.Store(cpuPercent)
    	}))
    	metrics.Events().MemUsage.Attach(events.NewClosure(func(memAllocBytes uint64) {
    		memUsageBytes.Store(memAllocBytes)
    	}))
    	metrics.Events().Synced.Attach(events.NewClosure(func(synced bool) {
    		isSynced.Store(synced)
    	}))
    
    	gossip.Manager().Events().NeighborRemoved.Attach(onNeighborRemoved)
    	gossip.Manager().Events().NeighborAdded.Attach(onNeighborAdded)
    
    	autopeering.Selection().Events().IncomingPeering.Attach(onAutopeeringSelection)
    	autopeering.Selection().Events().OutgoingPeering.Attach(onAutopeeringSelection)
    
    	metrics.Events().MessageTips.Attach(events.NewClosure(func(tipsCount uint64) {
    		messageTips.Store(tipsCount)
    	}))
    	metrics.Events().ValueTips.Attach(events.NewClosure(func(tipsCount uint64) {
    		valueTips.Store(tipsCount)
    	}))
    
    	metrics.Events().QueryReceived.Attach(events.NewClosure(func(ev *metrics.QueryReceivedEvent) {
    		processQueryReceived(ev)
    	}))
    	metrics.Events().QueryReplyError.Attach(events.NewClosure(func(ev *metrics.QueryReplyErrorEvent) {
    		processQueryReplyError(ev)
    	}))
    }