From bf151da5b7357348ba97bf1e7d370475117fdbc1 Mon Sep 17 00:00:00 2001 From: capossele <angelocapossele@gmail.com> Date: Tue, 16 Jun 2020 17:34:39 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=93=88=20Add=20autopeering=20network=20tr?= =?UTF-8?q?affic=20metric?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugins/autopeering/autopeering.go | 6 ++++- plugins/autopeering/netconnmetric.go | 38 ++++++++++++++++++++++++++++ plugins/metrics/network.go | 15 +++++------ plugins/metrics/plugin.go | 7 +++-- plugins/prometheus/local_metrics.go | 23 ++++++++++++++--- 5 files changed, 72 insertions(+), 17 deletions(-) create mode 100644 plugins/autopeering/netconnmetric.go diff --git a/plugins/autopeering/autopeering.go b/plugins/autopeering/autopeering.go index 3fc0bea3..e290ad3b 100644 --- a/plugins/autopeering/autopeering.go +++ b/plugins/autopeering/autopeering.go @@ -35,6 +35,8 @@ var ( // NetworkID specifies the autopeering network identifier NetworkID = hash32([]byte(banner.AppVersion + NetworkVersion)) + + Conn *NetConnMetric ) var ( @@ -142,8 +144,10 @@ func start(shutdownSignal <-chan struct{}) { } defer conn.Close() + Conn = &NetConnMetric{UDPConn: conn} + // start a server doing peerDisc and peering - srv := server.Serve(lPeer, conn, log.Named("srv"), Discovery(), Selection()) + srv := server.Serve(lPeer, Conn, log.Named("srv"), Discovery(), Selection()) defer srv.Close() // start the peer discovery on that connection diff --git a/plugins/autopeering/netconnmetric.go b/plugins/autopeering/netconnmetric.go new file mode 100644 index 00000000..bef457d7 --- /dev/null +++ b/plugins/autopeering/netconnmetric.go @@ -0,0 +1,38 @@ +package autopeering + +import ( + "net" + + "go.uber.org/atomic" +) + +// NetConnMetric is a wrapper of a UDPConn that keeps track of RX and TX bytes. +type NetConnMetric struct { + *net.UDPConn + rxBytes atomic.Uint64 + txBytes atomic.Uint64 +} + +// RXBytes returns the RX bytes. +func (nc *NetConnMetric) RXBytes() uint64 { + return nc.rxBytes.Load() +} + +// RXBytes returns the TX bytes. +func (nc *NetConnMetric) TXBytes() uint64 { + return nc.txBytes.Load() +} + +// ReadFromUDP acts like ReadFrom but returns a UDPAddr. +func (nc *NetConnMetric) ReadFromUDP(b []byte) (int, *net.UDPAddr, error) { + n, addr, err := nc.UDPConn.ReadFromUDP(b) + nc.rxBytes.Add(uint64(n)) + return n, addr, err +} + +// WriteToUDP acts like WriteTo but takes a UDPAddr. +func (nc *NetConnMetric) WriteToUDP(b []byte, addr *net.UDPAddr) (int, error) { + n, err := nc.UDPConn.WriteToUDP(b, addr) + nc.txBytes.Add(uint64(n)) + return n, err +} diff --git a/plugins/metrics/network.go b/plugins/metrics/network.go index 76cb12e6..cf1bfdf4 100644 --- a/plugins/metrics/network.go +++ b/plugins/metrics/network.go @@ -1,33 +1,32 @@ package metrics import ( - "sync/atomic" - "github.com/iotaledger/goshimmer/plugins/gossip" "github.com/iotaledger/hive.go/identity" + "go.uber.org/atomic" ) var ( - _FPCInboundBytes *uint64 - _FPCOutboundBytes *uint64 + _FPCInboundBytes atomic.Uint64 + _FPCOutboundBytes atomic.Uint64 previousNeighbors = make(map[identity.ID]gossipTrafficMetric) gossipOldTx uint32 gossipOldRx uint32 - analysisOutboundBytes *uint64 + analysisOutboundBytes atomic.Uint64 ) func FPCInboundBytes() uint64 { - return atomic.LoadUint64(_FPCInboundBytes) + return _FPCInboundBytes.Load() } func FPCOutboundBytes() uint64 { - return atomic.LoadUint64(_FPCOutboundBytes) + return _FPCOutboundBytes.Load() } func AnalysisOutboundBytes() uint64 { - return atomic.LoadUint64(analysisOutboundBytes) + return analysisOutboundBytes.Load() } type gossipTrafficMetric struct { diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go index cc99bc3f..974ed49d 100644 --- a/plugins/metrics/plugin.go +++ b/plugins/metrics/plugin.go @@ -1,7 +1,6 @@ package metrics import ( - "sync/atomic" "time" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" @@ -35,13 +34,13 @@ func configure(_ *node.Plugin) { increaseReceivedMPSCounter() })) metrics.Events().FPCInboundBytes.Attach(events.NewClosure(func(amountBytes uint64) { - atomic.AddUint64(_FPCInboundBytes, amountBytes) + _FPCInboundBytes.Add(amountBytes) })) metrics.Events().FPCOutboundBytes.Attach(events.NewClosure(func(amountBytes uint64) { - atomic.AddUint64(_FPCOutboundBytes, amountBytes) + _FPCOutboundBytes.Add(amountBytes) })) metrics.Events().AnalysisOutboundBytes.Attach(events.NewClosure(func(amountBytes uint64) { - atomic.AddUint64(analysisOutboundBytes, amountBytes) + analysisOutboundBytes.Add(amountBytes) })) metrics.Events().CPUUsage.Attach(events.NewClosure(func(cpuPercent float64) { cpuLock.Lock() diff --git a/plugins/prometheus/local_metrics.go b/plugins/prometheus/local_metrics.go index 28956a1a..19d80982 100644 --- a/plugins/prometheus/local_metrics.go +++ b/plugins/prometheus/local_metrics.go @@ -1,15 +1,18 @@ package prometheus import ( + "github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/metrics" "github.com/prometheus/client_golang/prometheus" ) var ( - messagesPerSecond prometheus.Gauge - fpcInboundBytes prometheus.Gauge - fpcOutboundBytes prometheus.Gauge - analysisOutboundBytes prometheus.Gauge + messagesPerSecond prometheus.Gauge + fpcInboundBytes prometheus.Gauge + fpcOutboundBytes prometheus.Gauge + analysisOutboundBytes prometheus.Gauge + autopeeringInboundBytes prometheus.Gauge + autopeeringOutboundBytes prometheus.Gauge ) func init() { @@ -27,6 +30,14 @@ func init() { Name: "fpc_outbound_bytes", Help: "FPC TX network traffic [bytes].", }) + autopeeringInboundBytes = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "autopeering_inbound_bytes", + Help: "autopeering RX network traffic [bytes].", + }) + autopeeringOutboundBytes = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "autopeering_outbound_bytes", + Help: "autopeering TX network traffic [bytes].", + }) analysisOutboundBytes = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "analysis_outbound_bytes", Help: "Analysis client TX network traffic [bytes].", @@ -36,6 +47,8 @@ func init() { registry.MustRegister(fpcInboundBytes) registry.MustRegister(fpcOutboundBytes) registry.MustRegister(analysisOutboundBytes) + registry.MustRegister(autopeeringInboundBytes) + registry.MustRegister(autopeeringOutboundBytes) addCollect(collectLocalMetrics) } @@ -45,4 +58,6 @@ func collectLocalMetrics() { fpcInboundBytes.Set(float64(metrics.FPCInboundBytes())) fpcOutboundBytes.Set(float64(metrics.FPCOutboundBytes())) analysisOutboundBytes.Set(float64(metrics.AnalysisOutboundBytes())) + autopeeringInboundBytes.Set(float64(autopeering.Conn.RXBytes())) + autopeeringOutboundBytes.Set(float64(autopeering.Conn.TXBytes())) } -- GitLab