Skip to content
Snippets Groups Projects
Unverified Commit 8a8a1e53 authored by capossele's avatar capossele
Browse files

:chart_with_upwards_trend: Add clients metrics collection via the analysis server

parent 4cfff8e7
No related branches found
No related tags found
No related merge requests found
package client
import (
"sync"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/plugins/analysis/packet"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
)
var (
finalized map[string]vote.Opinion
finalizedMutex sync.RWMutex
)
func onFinalized(id string, opinion vote.Opinion) {
finalizedMutex.Lock()
finalized[id] = opinion
finalizedMutex.Unlock()
}
func onRoundExecuted(roundStats *vote.RoundStats) {
// get own ID
var nodeID []byte
if local.GetInstance() != nil {
// doesn't copy the ID, take care not to modify underlying bytearray!
nodeID = local.GetInstance().ID().Bytes()
}
chunks := splitFPCVoteContext(roundStats.ActiveVoteContexts)
connLock.Lock()
defer connLock.Unlock()
for _, chunk := range chunks {
// abort if empty round
if len(chunk) == 0 {
return
}
rs := vote.RoundStats{
Duration: roundStats.Duration,
RandUsed: roundStats.RandUsed,
ActiveVoteContexts: chunk,
}
hb := &packet.FPCHeartbeat{
OwnID: nodeID,
RoundStats: rs,
}
finalizedMutex.Lock()
hb.Finalized = finalized
finalized = make(map[string]vote.Opinion)
finalizedMutex.Unlock()
data, err := packet.NewFPCHeartbeatMessage(hb)
if err != nil {
log.Info(err, " - FPC heartbeat message skipped")
return
}
log.Info("Client: onRoundExecuted data size: ", len(data))
if _, err = managedConn.Write(data); err != nil {
log.Debugw("Error while writing to connection", "Description", err)
return
}
// trigger AnalysisOutboundBytes event
metrics.Events().AnalysisOutboundBytes.Trigger(uint64(len(data)))
}
}
func splitFPCVoteContext(ctx map[string]*vote.Context) (chunk []map[string]*vote.Context) {
chunk = make([]map[string]*vote.Context, 1)
i, counter := 0, 0
chunk[i] = make(map[string]*vote.Context)
if len(ctx) < maxVoteContext {
chunk[i] = ctx
return
}
for conflictID, voteCtx := range ctx {
counter++
if counter >= maxVoteContext {
counter = 0
i++
chunk = append(chunk, make(map[string]*vote.Context))
}
chunk[i][conflictID] = voteCtx
}
return
}
package client
import (
"strings"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/plugins/analysis/packet"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/hive.go/network"
"github.com/mr-tron/base58"
)
// EventDispatchers holds the Heartbeat function.
type EventDispatchers struct {
// Heartbeat defines the Heartbeat function.
Heartbeat func(heartbeat *packet.Heartbeat)
}
func sendHeartbeat(conn *network.ManagedConnection, hb *packet.Heartbeat) {
var out strings.Builder
for _, value := range hb.OutboundIDs {
out.WriteString(base58.Encode(value))
}
var in strings.Builder
for _, value := range hb.InboundIDs {
in.WriteString(base58.Encode(value))
}
log.Debugw(
"Heartbeat",
"nodeID", base58.Encode(hb.OwnID),
"outboundIDs", out.String(),
"inboundIDs", in.String(),
)
data, err := packet.NewHeartbeatMessage(hb)
if err != nil {
log.Info(err, " - heartbeat message skipped")
return
}
connLock.Lock()
defer connLock.Unlock()
if _, err = conn.Write(data); err != nil {
log.Debugw("Error while writing to connection", "Description", err)
}
// trigger AnalysisOutboundBytes event
metrics.Events().AnalysisOutboundBytes.Trigger(uint64(len(data)))
}
func createHeartbeat() *packet.Heartbeat {
// get own ID
var nodeID []byte
if local.GetInstance() != nil {
// doesn't copy the ID, take care not to modify underlying bytearray!
nodeID = local.GetInstance().ID().Bytes()
}
var outboundIDs [][]byte
var inboundIDs [][]byte
// get outboundIDs (chosen neighbors)
outgoingNeighbors := autopeering.Selection().GetOutgoingNeighbors()
outboundIDs = make([][]byte, len(outgoingNeighbors))
for i, neighbor := range outgoingNeighbors {
// doesn't copy the ID, take care not to modify underlying bytearray!
outboundIDs[i] = neighbor.ID().Bytes()
}
// get inboundIDs (accepted neighbors)
incomingNeighbors := autopeering.Selection().GetIncomingNeighbors()
inboundIDs = make([][]byte, len(incomingNeighbors))
for i, neighbor := range incomingNeighbors {
// doesn't copy the ID, take care not to modify underlying bytearray!
inboundIDs[i] = neighbor.ID().Bytes()
}
return &packet.Heartbeat{OwnID: nodeID, OutboundIDs: outboundIDs, InboundIDs: inboundIDs}
}
package client
import (
"runtime"
"time"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/plugins/analysis/packet"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/hive.go/network"
"github.com/shirou/gopsutil/cpu"
)
func sendMetricHeartbeat(conn *network.ManagedConnection, hb *packet.MetricHeartbeat) {
data, err := packet.NewMetricHeartbeatMessage(hb)
if err != nil {
log.Info(err, " - metric heartbeat message skipped")
return
}
connLock.Lock()
defer connLock.Unlock()
if _, err = conn.Write(data); err != nil {
log.Debugw("Error while writing to connection", "Description", err)
}
// trigger AnalysisOutboundBytes event
metrics.Events().AnalysisOutboundBytes.Trigger(uint64(len(data)))
}
func createMetricHeartbeat() *packet.MetricHeartbeat {
// get own ID
var nodeID []byte
if local.GetInstance() != nil {
// doesn't copy the ID, take care not to modify underlying bytearray!
nodeID = local.GetInstance().ID().Bytes()
}
return &packet.MetricHeartbeat{
OwnID: nodeID,
OS: runtime.GOOS,
Arch: runtime.GOARCH,
NumCPU: runtime.NumCPU(),
CPUUsage: func() (p float64) {
percent, err := cpu.Percent(time.Second, false)
if err == nil {
p = percent[0]
}
return
}(),
MemoryUsage: func() uint64 {
var m runtime.MemStats
runtime.ReadMemStats(&m)
return m.Alloc
}(),
}
}
......@@ -2,24 +2,18 @@ package client
import (
"net"
"strings"
"sync"
"time"
"github.com/iotaledger/goshimmer/dapps/valuetransfers"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/plugins/analysis/packet"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/network"
"github.com/iotaledger/hive.go/node"
"github.com/mr-tron/base58"
flag "github.com/spf13/pflag"
)
......@@ -44,9 +38,6 @@ var (
log *logger.Logger
managedConn *network.ManagedConnection
connLock sync.Mutex
finalized map[string]vote.Opinion
finalizedMutex sync.RWMutex
)
func run(_ *node.Plugin) {
......@@ -80,157 +71,10 @@ func run(_ *node.Plugin) {
case <-ticker.C:
sendHeartbeat(managedConn, createHeartbeat())
sendMetricHeartbeat(managedConn, createMetricHeartbeat())
}
}
}, shutdown.PriorityAnalysis); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
func onFinalized(id string, opinion vote.Opinion) {
finalizedMutex.Lock()
finalized[id] = opinion
finalizedMutex.Unlock()
}
// EventDispatchers holds the Heartbeat function.
type EventDispatchers struct {
// Heartbeat defines the Heartbeat function.
Heartbeat func(heartbeat *packet.Heartbeat)
}
func sendHeartbeat(conn *network.ManagedConnection, hb *packet.Heartbeat) {
var out strings.Builder
for _, value := range hb.OutboundIDs {
out.WriteString(base58.Encode(value))
}
var in strings.Builder
for _, value := range hb.InboundIDs {
in.WriteString(base58.Encode(value))
}
log.Debugw(
"Heartbeat",
"nodeID", base58.Encode(hb.OwnID),
"outboundIDs", out.String(),
"inboundIDs", in.String(),
)
data, err := packet.NewHeartbeatMessage(hb)
if err != nil {
log.Info(err, " - heartbeat message skipped")
return
}
connLock.Lock()
defer connLock.Unlock()
if _, err = conn.Write(data); err != nil {
log.Debugw("Error while writing to connection", "Description", err)
}
// trigger AnalysisOutboundBytes event
metrics.Events().AnalysisOutboundBytes.Trigger(uint64(len(data)))
}
func createHeartbeat() *packet.Heartbeat {
// get own ID
var nodeID []byte
if local.GetInstance() != nil {
// doesn't copy the ID, take care not to modify underlying bytearray!
nodeID = local.GetInstance().ID().Bytes()
}
var outboundIDs [][]byte
var inboundIDs [][]byte
// get outboundIDs (chosen neighbors)
outgoingNeighbors := autopeering.Selection().GetOutgoingNeighbors()
outboundIDs = make([][]byte, len(outgoingNeighbors))
for i, neighbor := range outgoingNeighbors {
// doesn't copy the ID, take care not to modify underlying bytearray!
outboundIDs[i] = neighbor.ID().Bytes()
}
// get inboundIDs (accepted neighbors)
incomingNeighbors := autopeering.Selection().GetIncomingNeighbors()
inboundIDs = make([][]byte, len(incomingNeighbors))
for i, neighbor := range incomingNeighbors {
// doesn't copy the ID, take care not to modify underlying bytearray!
inboundIDs[i] = neighbor.ID().Bytes()
}
return &packet.Heartbeat{OwnID: nodeID, OutboundIDs: outboundIDs, InboundIDs: inboundIDs}
}
func onRoundExecuted(roundStats *vote.RoundStats) {
// get own ID
var nodeID []byte
if local.GetInstance() != nil {
// doesn't copy the ID, take care not to modify underlying bytearray!
nodeID = local.GetInstance().ID().Bytes()
}
chunks := splitFPCVoteContext(roundStats.ActiveVoteContexts)
connLock.Lock()
defer connLock.Unlock()
for _, chunk := range chunks {
// abort if empty round
if len(chunk) == 0 {
return
}
rs := vote.RoundStats{
Duration: roundStats.Duration,
RandUsed: roundStats.RandUsed,
ActiveVoteContexts: chunk,
}
hb := &packet.FPCHeartbeat{
OwnID: nodeID,
RoundStats: rs,
}
finalizedMutex.Lock()
hb.Finalized = finalized
finalized = make(map[string]vote.Opinion)
finalizedMutex.Unlock()
data, err := packet.NewFPCHeartbeatMessage(hb)
if err != nil {
log.Info(err, " - FPC heartbeat message skipped")
return
}
log.Info("Client: onRoundExecuted data size: ", len(data))
if _, err = managedConn.Write(data); err != nil {
log.Debugw("Error while writing to connection", "Description", err)
return
}
// trigger AnalysisOutboundBytes event
metrics.Events().AnalysisOutboundBytes.Trigger(uint64(len(data)))
}
}
func splitFPCVoteContext(ctx map[string]*vote.Context) (chunk []map[string]*vote.Context) {
chunk = make([]map[string]*vote.Context, 1)
i, counter := 0, 0
chunk[i] = make(map[string]*vote.Context)
if len(ctx) < maxVoteContext {
chunk[i] = ctx
return
}
for conflictID, voteCtx := range ctx {
counter++
if counter >= maxVoteContext {
counter = 0
i++
chunk = append(chunk, make(map[string]*vote.Context))
}
chunk[i][conflictID] = voteCtx
}
return
}
......@@ -21,6 +21,7 @@ func init() {
tlv.HeaderMessageDefinition,
HeartbeatMessageDefinition,
FPCHeartbeatMessageDefinition,
MetricHeartbeatMessageDefinition,
}
AnalysisMsgRegistry = message.NewRegistry(definitions)
}
......@@ -21,6 +21,8 @@ var Events = struct {
Heartbeat *events.Event
// FPCHeartbeat triggers when an FPC heartbeat has been received.
FPCHeartbeat *events.Event
// MetricHeartbeat triggers when an MetricHeartbeat heartbeat has been received.
MetricHeartbeat *events.Event
}{
events.NewEvent(stringCaller),
events.NewEvent(stringCaller),
......@@ -29,6 +31,7 @@ var Events = struct {
events.NewEvent(errorCaller),
events.NewEvent(heartbeatPacketCaller),
events.NewEvent(fpcHeartbeatPacketCaller),
events.NewEvent(metricHeartbeatPacketCaller),
}
func stringCaller(handler interface{}, params ...interface{}) {
......@@ -50,3 +53,7 @@ func heartbeatPacketCaller(handler interface{}, params ...interface{}) {
func fpcHeartbeatPacketCaller(handler interface{}, params ...interface{}) {
handler.(func(hb *packet.FPCHeartbeat))(params[0].(*packet.FPCHeartbeat))
}
func metricHeartbeatPacketCaller(handler interface{}, params ...interface{}) {
handler.(func(hb *packet.MetricHeartbeat))(params[0].(*packet.MetricHeartbeat))
}
......@@ -112,6 +112,9 @@ func wireUp(p *protocol.Protocol) {
p.Events.Received[packet.MessageTypeFPCHeartbeat].Attach(events.NewClosure(func(data []byte) {
processFPCHeartbeatPacket(data, p)
}))
p.Events.Received[packet.MessageTypeMetricHeartbeat].Attach(events.NewClosure(func(data []byte) {
processMetricHeartbeatPacket(data, p)
}))
}
// processHeartbeatPacket parses the serialized data into a Heartbeat packet and triggers its event
......@@ -125,7 +128,7 @@ func processHeartbeatPacket(data []byte, p *protocol.Protocol) {
Events.Heartbeat.Trigger(heartbeatPacket)
}
// processHeartbeatPacket parses the serialized data into a Heartbeat packet and triggers its event
// processHeartbeatPacket parses the serialized data into a FPC Heartbeat packet and triggers its event
func processFPCHeartbeatPacket(data []byte, p *protocol.Protocol) {
hb, err := packet.ParseFPCHeartbeat(data)
if err != nil {
......@@ -135,3 +138,14 @@ func processFPCHeartbeatPacket(data []byte, p *protocol.Protocol) {
}
Events.FPCHeartbeat.Trigger(hb)
}
// processMetricHeartbeatPacket parses the serialized data into a MEtric Heartbeat packet and triggers its event
func processMetricHeartbeatPacket(data []byte, p *protocol.Protocol) {
hb, err := packet.ParseMetricHeartbeat(data)
if err != nil {
Events.Error.Trigger(err)
p.CloseConnection()
return
}
Events.MetricHeartbeat.Trigger(hb)
}
package metrics
import "time"
import (
"time"
flag "github.com/spf13/pflag"
)
const (
// should always be 1 second
......@@ -13,3 +17,15 @@ const (
MemUsageMeasurementInterval = 1 * time.Second
SyncedMeasurementInterval = 1 * time.Second
)
const (
// CfgMetricsLocal defines the config flag to enable/disable local metrics.
CfgMetricsLocal = "metrics.local"
// CfgMetricsGlobal defines the config flag to enable/disable global metrics.
CfgMetricsGlobal = "metrics.global"
)
func init() {
flag.Bool(CfgMetricsLocal, true, "include local metrics")
flag.Bool(CfgMetricsGlobal, false, "include global metrics")
}
......@@ -12,7 +12,9 @@ import (
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/plugins/analysis/server"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/hive.go/daemon"
......@@ -36,6 +38,40 @@ func configure(_ *node.Plugin) {
func run(_ *node.Plugin) {
if config.Node.GetBool(CfgMetricsLocal) {
registerLocalMetrics()
}
// Events from analysis server
if config.Node.GetBool(CfgMetricsGlobal) {
server.Events.MetricHeartbeat.Attach(onMetricHeartbeatReceived)
}
// create a background worker that update the metrics every second
if err := daemon.BackgroundWorker("Metrics Updater", func(shutdownSignal <-chan struct{}) {
if config.Node.GetBool(CfgMetricsLocal) {
timeutil.Ticker(func() {
measureReceivedMPS()
measureCPUUsage()
measureMemUsage()
measureSynced()
// gossip network traffic
g := gossipCurrentTraffic()
gossipCurrentRx.Store(uint64(g.BytesRead))
gossipCurrentTx.Store(uint64(g.BytesWritten))
}, 1*time.Second, shutdownSignal)
timeutil.Ticker(measureMPSPerPayload, MPSMeasurementInterval, shutdownSignal)
timeutil.Ticker(measureMessageTips, MessageTipsMeasurementInterval, shutdownSignal)
timeutil.Ticker(measureReceivedTPS, TPSMeasurementInterval, shutdownSignal)
timeutil.Ticker(measureValueTips, ValueTipsMeasurementInterval, shutdownSignal)
}
}, shutdown.PriorityMetrics); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
func registerLocalMetrics() {
//// Events declared in other packages which we want to listen to here ////
// increase received MPS counter whenever we attached a message
......@@ -110,26 +146,4 @@ func run(_ *node.Plugin) {
metrics.Events().QueryReplyError.Attach(events.NewClosure(func(ev *metrics.QueryReplyErrorEvent) {
processQueryReplyError(ev)
}))
// create a background worker that "measures" the MPS value every second
if err := daemon.BackgroundWorker("Metrics Updater", func(shutdownSignal <-chan struct{}) {
timeutil.Ticker(func() {
measureReceivedMPS()
measureCPUUsage()
measureMemUsage()
measureSynced()
// gossip network traffic
g := gossipCurrentTraffic()
gossipCurrentRx.Store(uint64(g.BytesRead))
gossipCurrentTx.Store(uint64(g.BytesWritten))
}, 1*time.Second, shutdownSignal)
timeutil.Ticker(measureMPSPerPayload, MPSMeasurementInterval, shutdownSignal)
timeutil.Ticker(measureMessageTips, MessageTipsMeasurementInterval, shutdownSignal)
timeutil.Ticker(measureReceivedTPS, TPSMeasurementInterval, shutdownSignal)
timeutil.Ticker(measureValueTips, ValueTipsMeasurementInterval, shutdownSignal)
}, shutdown.PriorityMetrics); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
package metrics
import (
"bytes"
"encoding/gob"
"sync"
"github.com/iotaledger/goshimmer/plugins/analysis/packet"
"github.com/iotaledger/hive.go/events"
"github.com/mr-tron/base58/base58"
)
type ClientInfo struct {
OS string
Arch string
NumCPU int
CPUUsage float64
MemoryUsage uint64
}
var (
clientsMetrics = make(map[string]ClientInfo)
clientsMetricsMutex sync.RWMutex
)
var onMetricHeartbeatReceived = events.NewClosure(func(hb *packet.MetricHeartbeat) {
clientsMetricsMutex.Lock()
defer clientsMetricsMutex.Unlock()
clientsMetrics[base58.Encode(hb.OwnID)] = ClientInfo{
OS: hb.OS,
Arch: hb.Arch,
NumCPU: hb.NumCPU,
CPUUsage: hb.CPUUsage,
MemoryUsage: hb.MemoryUsage,
}
})
func ClientsMetrics() map[string]ClientInfo {
clientsMetricsMutex.RLock()
defer clientsMetricsMutex.RUnlock()
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(clientsMetrics)
if err != nil {
return nil
}
dec := gob.NewDecoder(&buf)
var copy map[string]ClientInfo
err = dec.Decode(&copy)
if err != nil {
return nil
}
return copy
}
package prometheus
......@@ -24,7 +24,9 @@ services:
--analysis.dashboard.bindAddress=0.0.0.0:9000
--node.enablePlugins=analysis-server,analysis-dashboard
--analysis.dashboard.dev=false
--node.disablePlugins=portcheck,dashboard,analysis-client,gossip,drng,issuer,sync,metrics,messagelayer,valuetransfers,webapi,webapibroadcastdataendpoint,webapifindtransactionhashesendpoint,webapigetneighborsendpoint,webapigettransactionobjectsbyhashendpoint,webapigettransactiontrytesbyhashendpoint
--metrics.local=false
--metrics.global=true
--node.disablePlugins=portcheck,dashboard,analysis-client,gossip,drng,issuer,sync,messagelayer,valuetransfers,webapi,webapibroadcastdataendpoint,webapifindtransactionhashesendpoint,webapigetneighborsendpoint,webapigettransactionobjectsbyhashendpoint,webapigettransactiontrytesbyhashendpoint
volumes:
- ./config.docker.json:/tmp/config.json:ro
- goshimmer-cache:/go
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment