Skip to content
Snippets Groups Projects
Unverified Commit a544fc34 authored by Levente Pap's avatar Levente Pap
Browse files

Merge branch 'prometheus_metrics_part2' into prometheus_metrics

parents 664da235 35959853
No related branches found
No related tags found
No related merge requests found
Showing
with 665 additions and 17 deletions
......@@ -89,8 +89,8 @@ func configure(_ *node.Plugin) {
// configure FPC + link to consensus
configureFPC()
voter.Events().Finalized.Attach(events.NewClosure(FCOB.ProcessVoteResult))
voter.Events().Failed.Attach(events.NewClosure(func(id string, lastOpinion vote.Opinion) {
log.Errorf("FPC failed for transaction with id '%s' - last opinion: '%s'", id, lastOpinion)
voter.Events().Failed.Attach(events.NewClosure(func(ev *vote.OpinionEvent) {
log.Errorf("FPC failed for transaction with id '%s' - last opinion: '%s'", ev.ID, ev.Opinion)
}))
// subscribe to message-layer
......
......@@ -187,12 +187,15 @@ func (pog *PeerOpinionGiver) Query(ctx context.Context, ids []string) (vote.Opin
query := &votenet.QueryRequest{Id: ids}
reply, err := client.Opinion(ctx, query)
if err != nil {
metrics.Events().QueryReplyError.Trigger(&metrics.QueryReplyErrorEvent{
ID: pog.p.ID().String(),
OpinionCount: len(ids),
})
return nil, fmt.Errorf("unable to query opinions: %w", err)
}
metrics.Events().FPCInboundBytes.Trigger(proto.Size(reply))
metrics.Events().FPCOutboundBytes.Trigger(proto.Size(query))
// convert int32s in reply to opinions
opinions := make(vote.Opinions, len(reply.Opinion))
for i, intOpn := range reply.Opinion {
......
......@@ -41,15 +41,15 @@ func NewFCOB(tangle *tangle.Tangle, averageNetworkDelay time.Duration) (fcob *FC
}
// ProcessVoteResult allows an external voter to hand in the results of the voting process.
func (fcob *FCOB) ProcessVoteResult(id string, opinion vote.Opinion) {
transactionID, err := transaction.IDFromBase58(id)
func (fcob *FCOB) ProcessVoteResult(ev *vote.OpinionEvent) {
transactionID, err := transaction.IDFromBase58(ev.ID)
if err != nil {
fcob.Events.Error.Trigger(err)
return
}
if _, err := fcob.tangle.SetTransactionPreferred(transactionID, opinion == vote.Like); err != nil {
if _, err := fcob.tangle.SetTransactionPreferred(transactionID, ev.Opinion == vote.Like); err != nil {
fcob.Events.Error.Trigger(err)
}
......
package metrics
import "github.com/iotaledger/hive.go/events"
import (
"github.com/iotaledger/hive.go/events"
)
// CollectionEvents defines the events fot the metrics package
type CollectionEvents struct {
......@@ -11,6 +13,29 @@ type CollectionEvents struct {
MemUsage *events.Event
DBSize *events.Event
Synced *events.Event
ValueTips *events.Event
MessageTips *events.Event
QueryReceived *events.Event
QueryReplyError *events.Event
}
// QueryReceivedEvent is used to pass information through a QueryReceived event.
type QueryReceivedEvent struct {
OpinionCount int
}
// QueryReplyErrorEvent is used to pass information through a QueryReplyError event.
type QueryReplyErrorEvent struct {
ID string
OpinionCount int
}
func queryReceivedEventCaller(handler interface{}, params ...interface{}) {
handler.(func(ev *QueryReceivedEvent))(params[0].(*QueryReceivedEvent))
}
func queryReplyErrorEventCaller(handler interface{}, params ...interface{}) {
handler.(func(ev *QueryReplyErrorEvent))(params[0].(*QueryReplyErrorEvent))
}
func uint64Caller(handler interface{}, params ...interface{}) {
......
......@@ -20,6 +20,10 @@ func new() *CollectionEvents {
MemUsage: events.NewEvent(uint64Caller),
DBSize: events.NewEvent(uint64Caller),
Synced: events.NewEvent(boolCaller),
ValueTips: events.NewEvent(uint64Caller),
MessageTips: events.NewEvent(uint64Caller),
QueryReceived: events.NewEvent(queryReceivedEventCaller),
QueryReplyError: events.NewEvent(queryReplyErrorEventCaller),
}
}
......
......@@ -170,12 +170,12 @@ func (f *FPC) finalizeOpinions() {
defer f.ctxsMu.Unlock()
for id, voteCtx := range f.ctxs {
if voteCtx.IsFinalized(f.paras.CoolingOffPeriod, f.paras.FinalizationThreshold) {
f.events.Finalized.Trigger(id, voteCtx.LastOpinion())
f.events.Finalized.Trigger(&vote.OpinionEvent{ID: id, Opinion: voteCtx.LastOpinion(), Ctx: *voteCtx})
delete(f.ctxs, id)
continue
}
if voteCtx.Rounds >= f.paras.MaxRoundsPerVoteContext {
f.events.Failed.Trigger(id, voteCtx.LastOpinion())
f.events.Failed.Trigger(&vote.OpinionEvent{ID: id, Opinion: voteCtx.LastOpinion(), Ctx: *voteCtx})
delete(f.ctxs, id)
}
}
......
......@@ -96,8 +96,8 @@ func TestFPCFinalizedEvent(t *testing.T) {
paras.QuerySampleSize = 1
voter := fpc.New(opinionGiverFunc, paras)
var finalizedOpinion *vote.Opinion
voter.Events().Finalized.Attach(events.NewClosure(func(id string, opinion vote.Opinion) {
finalizedOpinion = &opinion
voter.Events().Finalized.Attach(events.NewClosure(func(ev *vote.OpinionEvent) {
finalizedOpinion = &ev.Opinion
}))
assert.NoError(t, voter.Vote(id, vote.Like))
......@@ -129,8 +129,8 @@ func TestFPCFailedEvent(t *testing.T) {
paras.FinalizationThreshold = 4
voter := fpc.New(opinionGiverFunc, paras)
var failedOpinion *vote.Opinion
voter.Events().Failed.Attach(events.NewClosure(func(id string, opinion vote.Opinion) {
failedOpinion = &opinion
voter.Events().Failed.Attach(events.NewClosure(func(ev *vote.OpinionEvent) {
failedOpinion = &ev.Opinion
}))
assert.NoError(t, voter.Vote(id, vote.Like))
......@@ -170,8 +170,8 @@ func TestFPCVotingMultipleOpinionGivers(t *testing.T) {
paras.CoolingOffPeriod = 2
voter := fpc.New(opinionGiverFunc, paras)
var finalOpinion *vote.Opinion
voter.Events().Finalized.Attach(events.NewClosure(func(id string, finalizedOpinion vote.Opinion) {
finalOpinion = &finalizedOpinion
voter.Events().Finalized.Attach(events.NewClosure(func(ev *vote.OpinionEvent) {
finalOpinion = &ev.Opinion
}))
assert.NoError(t, voter.Vote(test.id, test.initOpinion))
......
......@@ -5,6 +5,7 @@ import (
"net"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/hive.go/events"
"google.golang.org/grpc"
......@@ -62,6 +63,7 @@ func (vs *VoterServer) Opinion(ctx context.Context, req *QueryRequest) (*QueryRe
vs.netEventTX.Trigger(proto.Size(reply))
}
metrics.Events().QueryReceived.Trigger(&metrics.QueryReceivedEvent{OpinionCount: len(req.Id)})
return reply, nil
}
......
......@@ -57,9 +57,19 @@ type RoundStats struct {
QueriedOpinions []QueriedOpinions `json:"queried_opinions"`
}
// OpinionCaller calls the given handler with an Opinion and its associated ID.
// OpinionEvent is the struct containing data to be passed around with Finalized and Failed events.
type OpinionEvent struct {
// ID is the of the conflict
ID string
// Opinion is an opinion about a conflict
Opinion Opinion
// Ctx contains all relevant infos regarding the conflict.
Ctx Context
}
// OpinionCaller calls the given handler with an OpinionEvent (containing its opinions, its associated ID and context).
func OpinionCaller(handler interface{}, params ...interface{}) {
handler.(func(id string, opinion Opinion))(params[0].(string), params[1].(Opinion))
handler.(func(ev *OpinionEvent))(params[0].(*OpinionEvent))
}
// RoundStats calls the given handler with a RoundStats.
......
......@@ -8,11 +8,14 @@ import (
var Events = pluginEvents{
// ReceivedMPSUpdated triggers upon reception of a MPS update.
ReceivedMPSUpdated: events.NewEvent(uint64EventCaller),
ReceivedTPSUpdated: events.NewEvent(uint64EventCaller),
}
type pluginEvents struct {
// Fired when the messages per second metric is updated.
ReceivedMPSUpdated *events.Event
// Fired when the transactions per second metric is updated.
ReceivedTPSUpdated *events.Event
}
func uint64EventCaller(handler interface{}, params ...interface{}) {
......
package metrics
import (
"sync/atomic"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/hive.go/syncutils"
)
var activeConflicts uint64
var finalizedConflictCount uint64
var failedConflictCount uint64
var averageRoundsToFinalize float64
var avLock syncutils.RWMutex
// QueryReceivedCount is the number of queries received (each query can contain multiple conflicts to give an opinion about)
var QueryReceivedCount uint64
// OpinionQueryReceivedCount is the number of opinion queries received (multiple in one query)
var OpinionQueryReceivedCount uint64
// QueryReplyErrorCount counts how many times we haven't received an answer for our query. (each query reply can contain multiple conflicts to get an opinion about)
var QueryReplyErrorCount uint64
// OpinionQueryReplyErrorCount counts how many opinions we asked for but never heard back (multiple opinions in one query)
var OpinionQueryReplyErrorCount uint64
// ActiveConflicts returns the number of currently active conflicts.
func ActiveConflicts() uint64 {
return atomic.LoadUint64(&activeConflicts)
}
// FinalizedConflict returns the number of finalized conflicts since the start of the node.
func FinalizedConflict() uint64 {
return atomic.LoadUint64(&finalizedConflictCount)
}
// FailedConflicts returns the number of failed conflicts since the start of the node.
func FailedConflicts() uint64 {
return atomic.LoadUint64(&failedConflictCount)
}
// AverageRoundsToFinalize returns the average number fo rounds it takes to finalize conflicts since the start of the node.
func AverageRoundsToFinalize() float64 {
avLock.RLock()
defer avLock.RUnlock()
return averageRoundsToFinalize
}
// FPCQueryReceived returns the number of received voting queries. For an exact number of opinion queries, use FPCOpinionQueryReceived().
func FPCQueryReceived() uint64 {
return atomic.LoadUint64(&QueryReceivedCount)
}
// FPCOpinionQueryReceived returns the number of received opinion queries.
func FPCOpinionQueryReceived() uint64 {
return atomic.LoadUint64(&OpinionQueryReceivedCount)
}
// FPCQueryReplyErrors returns the number of sent but unanswered queries for conflict opinions. For an exact number of failed opinions, use FPCOpinionQueryReplyErrors().
func FPCQueryReplyErrors() uint64 {
return atomic.LoadUint64(&QueryReplyErrorCount)
}
// FPCOpinionQueryReplyErrors returns the number of opinions that the node failed to gather from peers.
func FPCOpinionQueryReplyErrors() uint64 {
return atomic.LoadUint64(&OpinionQueryReplyErrorCount)
}
//// logic broken into "process..." functions to be able to write unit tests ////
func processRoundStats(stats vote.RoundStats) {
// get the number of active conflicts
numActive := (uint64)(len(stats.ActiveVoteContexts))
atomic.StoreUint64(&activeConflicts, numActive)
}
func processFinalized(ctx vote.Context) {
avLock.Lock()
defer avLock.Unlock()
// calculate sum of all rounds, including the currently finalized
sumRounds := averageRoundsToFinalize*(float64)(atomic.LoadUint64(&finalizedConflictCount)) + (float64)(ctx.Rounds)
// increase finalized counter
atomic.AddUint64(&finalizedConflictCount, 1)
// calculate new average
averageRoundsToFinalize = sumRounds / (float64)(atomic.LoadUint64(&finalizedConflictCount))
}
func processFailed(ctx vote.Context) {
atomic.AddUint64(&failedConflictCount, 1)
}
func processQueryReceived(ev *metrics.QueryReceivedEvent) {
// received one query
atomic.AddUint64(&QueryReceivedCount, 1)
// containing this many conflicts to give opinion about
atomic.AddUint64(&OpinionQueryReceivedCount, (uint64)(ev.OpinionCount))
}
func processQueryReplyError(ev *metrics.QueryReplyErrorEvent) {
// received one query
atomic.AddUint64(&QueryReplyErrorCount, 1)
// containing this many conflicts to give opinion about
atomic.AddUint64(&OpinionQueryReplyErrorCount, (uint64)(ev.OpinionCount))
}
package metrics
import (
"testing"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/magiconair/properties/assert"
)
func TestActiveConflicts(t *testing.T) {
// initialized to 0
assert.Equal(t, ActiveConflicts(), (uint64)(0))
stats := vote.RoundStats{
ActiveVoteContexts: map[string]*vote.Context{
"test1": &vote.Context{},
"test2": &vote.Context{},
"test3": &vote.Context{},
},
}
processRoundStats(stats)
assert.Equal(t, ActiveConflicts(), (uint64)(3))
}
func TestFailedConflicts(t *testing.T) {
// initialized to 0
assert.Equal(t, FailedConflicts(), (uint64)(0))
// simulate 10 failed conflicts
for i := 0; i < 10; i++ {
processFailed(vote.Context{})
}
assert.Equal(t, FailedConflicts(), (uint64)(10))
// simulate 10 failed conflicts
for i := 0; i < 10; i++ {
processFailed(vote.Context{})
}
assert.Equal(t, FailedConflicts(), (uint64)(20))
}
func TestFinalize(t *testing.T) {
assert.Equal(t, AverageRoundsToFinalize(), 0.0)
assert.Equal(t, FinalizedConflict(), (uint64)(0))
// simulate 5 finalized conflicts with 5 rounds
for i := 0; i < 5; i++ {
processFinalized(vote.Context{
Rounds: 5,
})
}
assert.Equal(t, FinalizedConflict(), (uint64)(5))
// simulate 5 finalized conflicts with 10 rounds
for i := 0; i < 5; i++ {
processFinalized(vote.Context{
Rounds: 10,
})
}
assert.Equal(t, FinalizedConflict(), (uint64)(10))
// => average should be 7.5
assert.Equal(t, AverageRoundsToFinalize(), 7.5)
}
func TestQueryReceived(t *testing.T) {
assert.Equal(t, FPCQueryReceived(), (uint64)(0))
assert.Equal(t, FPCOpinionQueryReceived(), (uint64)(0))
processQueryReceived(&metrics.QueryReceivedEvent{OpinionCount: 5})
assert.Equal(t, FPCQueryReceived(), (uint64)(1))
assert.Equal(t, FPCOpinionQueryReceived(), (uint64)(5))
processQueryReceived(&metrics.QueryReceivedEvent{OpinionCount: 5})
assert.Equal(t, FPCQueryReceived(), (uint64)(2))
assert.Equal(t, FPCOpinionQueryReceived(), (uint64)(10))
}
func TestQueryReplyError(t *testing.T) {
assert.Equal(t, FPCQueryReplyErrors(), (uint64)(0))
assert.Equal(t, FPCOpinionQueryReplyErrors(), (uint64)(0))
processQueryReplyError(&metrics.QueryReplyErrorEvent{OpinionCount: 5})
assert.Equal(t, FPCQueryReplyErrors(), (uint64)(1))
assert.Equal(t, FPCOpinionQueryReplyErrors(), (uint64)(5))
processQueryReplyError(&metrics.QueryReplyErrorEvent{OpinionCount: 5})
assert.Equal(t, FPCQueryReplyErrors(), (uint64)(2))
assert.Equal(t, FPCOpinionQueryReplyErrors(), (uint64)(10))
}
package metrics
import (
"sync/atomic"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/hive.go/syncutils"
)
// counter for the received MPS
var mpsReceivedSinceLastMeasurement uint64
// measured value of the received MPS
var measuredReceivedMPS uint64
// Total number of processed messages since start of the node.
var messageTotalCount uint64
// current number of message tips
var messageTips uint64
// MPS figures for different payload types
var mpsPerPayloadSinceLastMeasurement = make(map[payload.Type]uint64)
var mpsPerPayloadMeasured = make(map[payload.Type]uint64)
// Number of messages per payload type since start of the node.
var messageCountPerPayload = make(map[payload.Type]uint64)
// protect maps from concurrent read/write
var messageLock syncutils.RWMutex
////// Exported functions to obtain metrics from outside //////
// MessageTotalCount returns the total number of messages seen since the start of the node.
func MessageTotalCount() uint64 {
return atomic.LoadUint64(&messageTotalCount)
}
// MPS retrieves the current messages per second number.
func MPS() uint64 {
return atomic.LoadUint64(&measuredReceivedMPS)
}
// MessageCountPerPayload returns a map of message payload types and their count since the start of the node.
func MessageCountPerPayload() map[payload.Type]uint64 {
messageLock.RLock()
defer messageLock.RUnlock()
// copy the original map
target := make(map[payload.Type]uint64)
for key, element := range messageCountPerPayload {
target[key] = element
}
return target
}
// MPSPerPayload returns a map of message payload types and their corresponding MPS values.
func MPSPerPayload() map[payload.Type]uint64 {
messageLock.RLock()
defer messageLock.RUnlock()
// copy the original map
target := make(map[payload.Type]uint64)
for key, element := range mpsPerPayloadMeasured {
target[key] = element
}
return target
}
// MessageTips returns the actual number of tips in the message tangle.
func MessageTips() uint64 {
return atomic.LoadUint64(&messageTips)
}
////// Handling data updates and measuring //////
// increases the received MPS counter
func increaseReceivedMPSCounter() {
atomic.AddUint64(&mpsReceivedSinceLastMeasurement, 1)
}
// measures the received MPS value
func measureReceivedMPS() {
// sample the current counter value into a measured MPS value
sampledMPS := atomic.LoadUint64(&mpsReceivedSinceLastMeasurement)
// store the measured value
atomic.StoreUint64(&measuredReceivedMPS, sampledMPS)
// reset the counter
atomic.StoreUint64(&mpsReceivedSinceLastMeasurement, 0)
// trigger events for outside listeners
Events.ReceivedMPSUpdated.Trigger(sampledMPS)
}
func increasePerPayloadMPSCounter(p payload.Type) {
messageLock.Lock()
defer messageLock.Unlock()
// init counter with zero value if we see the payload for the first time
if _, exist := mpsPerPayloadSinceLastMeasurement[p]; !exist {
mpsPerPayloadSinceLastMeasurement[p] = 0
mpsPerPayloadMeasured[p] = 0
messageCountPerPayload[p] = 0
}
// else just update value
mpsPerPayloadSinceLastMeasurement[p]++
// increase cumulative metrics
messageCountPerPayload[p]++
atomic.AddUint64(&messageTotalCount, 1)
}
func measureMPSPerPayload() {
messageLock.Lock()
defer messageLock.Unlock()
for payloadType, sinceLast := range mpsPerPayloadSinceLastMeasurement {
// measure
mpsPerPayloadMeasured[payloadType] = sinceLast
// reset counter
mpsPerPayloadSinceLastMeasurement[payloadType] = 0
}
}
func measureMessageTips() {
metrics.Events().MessageTips.Trigger((uint64)(messagelayer.TipSelector.TipCount()))
}
package metrics
import (
"sync"
"sync/atomic"
"testing"
valuepayload "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload"
drngpayload "github.com/iotaledger/goshimmer/packages/binary/drng/payload"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tipselector"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/hive.go/events"
"github.com/magiconair/properties/assert"
)
func TestReceivedMessagesPerSecond(t *testing.T) {
// simulate attaching 10 value payloads in 0s < t < 1s
for i := 0; i < 10; i++ {
increaseReceivedMPSCounter()
}
// first measurement happens at t=1s
measureReceivedMPS()
// simulate 5 TPS for 1s < t < 2s
for i := 0; i < 5; i++ {
increaseReceivedMPSCounter()
}
assert.Equal(t, MPS(), (uint64)(10))
// measure at t=2s
measureReceivedMPS()
assert.Equal(t, MPS(), (uint64)(5))
// measure at t=3s
measureReceivedMPS()
assert.Equal(t, MPS(), (uint64)(0))
}
func TestReceivedMPSUpdatedEvent(t *testing.T) {
var wg sync.WaitGroup
Events.ReceivedMPSUpdated.Attach(events.NewClosure(func(mps uint64) {
assert.Equal(t, mps, (uint64)(10))
wg.Done()
}))
// simulate attaching 10 value payloads in 0s < t < 1s
for i := 0; i < 10; i++ {
increaseReceivedMPSCounter()
}
wg.Add(1)
measureReceivedMPS()
wg.Wait()
}
func TestMPSPerPayload(t *testing.T) {
// it is empty initially
assert.Equal(t, MPSPerPayload(), map[payload.Type]uint64{})
assert.Equal(t, MessageTotalCount(), (uint64)(0))
// simulate attaching 10 value payloads in 0s < t < 1s
for i := 0; i < 10; i++ {
increasePerPayloadMPSCounter(valuepayload.Type)
}
assert.Equal(t, MessageTotalCount(), (uint64)(10))
assert.Equal(t, MessageCountPerPayload(), map[payload.Type]uint64{valuepayload.Type: 10})
// simulate attaching 5 drng payloads
for i := 0; i < 5; i++ {
increasePerPayloadMPSCounter(drngpayload.Type)
}
assert.Equal(t, MessageTotalCount(), (uint64)(15))
assert.Equal(t, MessageCountPerPayload(), map[payload.Type]uint64{valuepayload.Type: 10, drngpayload.Type: 5})
// test measurement
measureMPSPerPayload()
assert.Equal(t, MPSPerPayload(), map[payload.Type]uint64{valuepayload.Type: 10, drngpayload.Type: 5})
// test counter reset on measurement
measureMPSPerPayload()
assert.Equal(t, MPSPerPayload(), map[payload.Type]uint64{valuepayload.Type: 0, drngpayload.Type: 0})
assert.Equal(t, MessageCountPerPayload(), map[payload.Type]uint64{valuepayload.Type: 10, drngpayload.Type: 5})
}
func TestMessageTips(t *testing.T) {
var wg sync.WaitGroup
// messagelayer TipSelector not configured here, so to avoid nil pointer panic, we instantiate it
messagelayer.TipSelector = tipselector.New()
metrics.Events().MessageTips.Attach(events.NewClosure(func(tips uint64) {
atomic.StoreUint64(&messageTips, tips)
wg.Done()
}))
wg.Add(1)
measureMessageTips()
wg.Wait()
assert.Equal(t, MessageTips(), (uint64)(0))
}
package metrics
import "time"
const (
// should always be 1 second
MPSMeasurementInterval = 1 * time.Second
TPSMeasurementInterval = 1 * time.Second
// can be adjusted as wished
MessageTipsMeasurementInterval = 1 * time.Second
ValueTipsMeasurementInterval = 1 * time.Second
CPUUsageMeasurementInterval = 1 * time.Second
MemUsageMeasurementInterval = 1 * time.Second
SyncedMeasurementInterval = 1 * time.Second
)
package metrics
import (
"sync/atomic"
"time"
"github.com/iotaledger/goshimmer/dapps/valuetransfers"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload"
valuetangle "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/tangle"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
......@@ -27,12 +32,42 @@ var log *logger.Logger
func configure(_ *node.Plugin) {
log = logger.NewLogger(PluginName)
//// Events declared in other packages which we want to listen to here ////
// increase received MPS counter whenever we attached a message
messagelayer.Tangle.Events.MessageAttached.Attach(events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
_payloadType := cachedMessage.Unwrap().Payload().Type()
cachedMessage.Release()
cachedMessageMetadata.Release()
increaseReceivedMPSCounter()
increasePerPayloadMPSCounter(_payloadType)
}))
// Value payload attached
valuetransfers.Tangle.Events.PayloadAttached.Attach(events.NewClosure(func(cachedPayload *payload.CachedPayload, cachedPayloadMetadata *valuetangle.CachedPayloadMetadata) {
cachedPayload.Release()
cachedPayloadMetadata.Release()
increaseReceivedTPSCounter()
}))
// FPC round executed
valuetransfers.Voter().Events().RoundExecuted.Attach(events.NewClosure(func(roundStats vote.RoundStats) {
processRoundStats(roundStats)
}))
// a conflict has been finalized
valuetransfers.Voter().Events().Finalized.Attach(events.NewClosure(func(ev *vote.OpinionEvent) {
processFinalized(ev.Ctx)
}))
// consensus failure in conflict resolution
valuetransfers.Voter().Events().Failed.Attach(events.NewClosure(func(ev *vote.OpinionEvent) {
processFailed(ev.Ctx)
}))
//// Events coming from metrics package ////
metrics.Events().FPCInboundBytes.Attach(events.NewClosure(func(amountBytes uint64) {
_FPCInboundBytes.Add(amountBytes)
}))
......@@ -58,6 +93,20 @@ func configure(_ *node.Plugin) {
autopeering.Selection().Events().IncomingPeering.Attach(onAutopeeringSelection)
autopeering.Selection().Events().OutgoingPeering.Attach(onAutopeeringSelection)
metrics.Events().MessageTips.Attach(events.NewClosure(func(tipsCount uint64) {
atomic.StoreUint64(&messageTips, tipsCount)
}))
metrics.Events().ValueTips.Attach(events.NewClosure(func(tipsCount uint64) {
atomic.StoreUint64(&valueTips, tipsCount)
}))
metrics.Events().QueryReceived.Attach(events.NewClosure(func(ev *metrics.QueryReceivedEvent) {
processQueryReceived(ev)
}))
metrics.Events().QueryReplyError.Attach(events.NewClosure(func(ev *metrics.QueryReplyErrorEvent) {
processQueryReplyError(ev)
}))
}
func run(_ *node.Plugin) {
......@@ -75,6 +124,10 @@ func run(_ *node.Plugin) {
gossipCurrentTx.Store(uint64(g.BytesWritten))
}, 1*time.Second, shutdownSignal)
timeutil.Ticker(measureMPSPerPayload, MPSMeasurementInterval, shutdownSignal)
timeutil.Ticker(measureMessageTips, MessageTipsMeasurementInterval, shutdownSignal)
timeutil.Ticker(measureReceivedTPS, TPSMeasurementInterval, shutdownSignal)
timeutil.Ticker(measureValueTips, ValueTipsMeasurementInterval, shutdownSignal)
}, shutdown.PriorityMetrics); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
......
package metrics
import (
"sync/atomic"
"github.com/iotaledger/goshimmer/dapps/valuetransfers"
"github.com/iotaledger/goshimmer/packages/metrics"
)
// TPS retrieves the current transactions (value payloads) per second number.
func TPS() uint64 {
return atomic.LoadUint64(&measuredReceivedTPS)
}
// counter for the received TPS (transactions per second)
var tpsReceivedSinceLastMeasurement uint64
// measured value of the received MPS
var measuredReceivedTPS uint64
// current number of value tips
var valueTips uint64
// increases the received TPS counter
func increaseReceivedTPSCounter() {
atomic.AddUint64(&tpsReceivedSinceLastMeasurement, 1)
}
// measures the received TPS value
func measureReceivedTPS() {
// sample the current counter value into a measured TPS value
sampledTPS := atomic.LoadUint64(&tpsReceivedSinceLastMeasurement)
// store the measured value
atomic.StoreUint64(&measuredReceivedTPS, sampledTPS)
// reset the counter
atomic.StoreUint64(&tpsReceivedSinceLastMeasurement, 0)
// trigger events for outside listeners
Events.ReceivedTPSUpdated.Trigger(sampledTPS)
}
func measureValueTips() {
metrics.Events().ValueTips.Trigger((uint64)(valuetransfers.TipManager().Size()))
}
// ValueTips returns the actual number of tips in the value tangle.
func ValueTips() uint64 {
return atomic.LoadUint64(&valueTips)
}
package metrics
import (
"sync"
"sync/atomic"
"testing"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/hive.go/events"
"github.com/magiconair/properties/assert"
)
func TestReceivedTransactionsPerSecond(t *testing.T) {
// simulate attaching 10 value payloads in 0s < t < 1s
for i := 0; i < 10; i++ {
increaseReceivedTPSCounter()
}
// first measurement happens at t=1s
measureReceivedTPS()
// simulate 5 TPS for 1s < t < 2s
for i := 0; i < 5; i++ {
increaseReceivedTPSCounter()
}
assert.Equal(t, TPS(), (uint64)(10))
// measure at t=2s
measureReceivedTPS()
assert.Equal(t, TPS(), (uint64)(5))
// measure at t=3s
measureReceivedTPS()
assert.Equal(t, TPS(), (uint64)(0))
}
func TestReceivedTPSUpdatedEvent(t *testing.T) {
var wg sync.WaitGroup
Events.ReceivedTPSUpdated.Attach(events.NewClosure(func(tps uint64) {
assert.Equal(t, tps, (uint64)(10))
wg.Done()
}))
// simulate attaching 10 value payloads in 0s < t < 1s
for i := 0; i < 10; i++ {
increaseReceivedTPSCounter()
}
wg.Add(1)
measureReceivedTPS()
wg.Wait()
}
func TestValueTips(t *testing.T) {
var wg sync.WaitGroup
metrics.Events().ValueTips.Attach(events.NewClosure(func(tips uint64) {
atomic.StoreUint64(&valueTips, tips)
wg.Done()
}))
wg.Add(1)
measureValueTips()
wg.Wait()
assert.Equal(t, ValueTips(), (uint64)(0))
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment