Skip to content
Snippets Groups Projects
Unverified Commit 35959853 authored by Levente Pap's avatar Levente Pap
Browse files

Measure voting queries (received, unreplied) and number of opinions

parent d8bd71f6
No related branches found
No related tags found
No related merge requests found
...@@ -3,13 +3,13 @@ package valuetransfers ...@@ -3,13 +3,13 @@ package valuetransfers
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/goshimmer/packages/metrics"
"net" "net"
"strconv" "strconv"
"sync" "sync"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/branchmanager" "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/branchmanager"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/packages/prng" "github.com/iotaledger/goshimmer/packages/prng"
"github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/packages/vote" "github.com/iotaledger/goshimmer/packages/vote"
...@@ -185,7 +185,10 @@ func (pog *PeerOpinionGiver) Query(ctx context.Context, ids []string) (vote.Opin ...@@ -185,7 +185,10 @@ func (pog *PeerOpinionGiver) Query(ctx context.Context, ids []string) (vote.Opin
query := &votenet.QueryRequest{Id: ids} query := &votenet.QueryRequest{Id: ids}
reply, err := client.Opinion(ctx, query) reply, err := client.Opinion(ctx, query)
if err != nil { if err != nil {
// TODO: add an event QueryErrorEvent (metrics) metrics.Events().QueryReplyError.Trigger(&metrics.QueryReplyErrorEvent{
ID: pog.p.ID().String(),
OpinionCount: len(ids),
})
return nil, fmt.Errorf("unable to query opinions: %w", err) return nil, fmt.Errorf("unable to query opinions: %w", err)
} }
......
...@@ -13,6 +13,27 @@ type CollectionEvents struct { ...@@ -13,6 +13,27 @@ type CollectionEvents struct {
Synced *events.Event Synced *events.Event
ValueTips *events.Event ValueTips *events.Event
MessageTips *events.Event MessageTips *events.Event
QueryReceived *events.Event
QueryReplyError *events.Event
}
// QueryReceivedEvent is used to pass information through a QueryReceived event.
type QueryReceivedEvent struct {
OpinionCount int
}
// QueryReplyErrorEvent is used to pass information through a QueryReplyError event.
type QueryReplyErrorEvent struct {
ID string
OpinionCount int
}
func queryReceivedEventCaller(handler interface{}, params ...interface{}) {
handler.(func(ev *QueryReceivedEvent))(params[0].(*QueryReceivedEvent))
}
func queryReplyErrorEventCaller(handler interface{}, params ...interface{}) {
handler.(func(ev *QueryReplyErrorEvent))(params[0].(*QueryReplyErrorEvent))
} }
func uint64Caller(handler interface{}, params ...interface{}) { func uint64Caller(handler interface{}, params ...interface{}) {
......
...@@ -20,6 +20,8 @@ func new() *CollectionEvents { ...@@ -20,6 +20,8 @@ func new() *CollectionEvents {
events.NewEvent(boolCaller), events.NewEvent(boolCaller),
events.NewEvent(uint64Caller), events.NewEvent(uint64Caller),
events.NewEvent(uint64Caller), events.NewEvent(uint64Caller),
events.NewEvent(queryReceivedEventCaller),
events.NewEvent(queryReplyErrorEventCaller),
} }
} }
......
...@@ -2,10 +2,10 @@ package net ...@@ -2,10 +2,10 @@ package net
import ( import (
"context" "context"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/goshimmer/packages/metrics"
"net" "net"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/packages/vote" "github.com/iotaledger/goshimmer/packages/vote"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
...@@ -46,7 +46,7 @@ func (vs *VoterServer) Opinion(ctx context.Context, req *QueryRequest) (*QueryRe ...@@ -46,7 +46,7 @@ func (vs *VoterServer) Opinion(ctx context.Context, req *QueryRequest) (*QueryRe
reply.Opinion[i] = int32(vs.opnRetriever(id)) reply.Opinion[i] = int32(vs.opnRetriever(id))
} }
// trigger metrics event metrics.Events().QueryReceived.Trigger(&metrics.QueryReceivedEvent{OpinionCount: len(req.Id)})
metrics.Events().FPCInboundBytes.Trigger(proto.Size(req)) metrics.Events().FPCInboundBytes.Trigger(proto.Size(req))
metrics.Events().FPCOutboundBytes.Trigger(proto.Size(reply)) metrics.Events().FPCOutboundBytes.Trigger(proto.Size(reply))
return reply, nil return reply, nil
......
...@@ -3,9 +3,9 @@ package metrics ...@@ -3,9 +3,9 @@ package metrics
import ( import (
"sync/atomic" "sync/atomic"
"github.com/iotaledger/hive.go/syncutils" "github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/packages/vote" "github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/hive.go/syncutils"
) )
var activeConflicts uint64 var activeConflicts uint64
...@@ -14,6 +14,18 @@ var failedConflictCount uint64 ...@@ -14,6 +14,18 @@ var failedConflictCount uint64
var averageRoundsToFinalize float64 var averageRoundsToFinalize float64
var avLock syncutils.RWMutex var avLock syncutils.RWMutex
// QueryReceivedCount is the number of queries received (each query can contain multiple conflicts to give an opinion about)
var QueryReceivedCount uint64
// OpinionQueryReceivedCount is the number of opinion queries received (multiple in one query)
var OpinionQueryReceivedCount uint64
// QueryReplyErrorCount counts how many times we haven't received an answer for our query. (each query reply can contain multiple conflicts to get an opinion about)
var QueryReplyErrorCount uint64
// OpinionQueryReplyErrorCount counts how many opinions we asked for but never heard back (multiple opinions in one query)
var OpinionQueryReplyErrorCount uint64
// ActiveConflicts returns the number of currently active conflicts. // ActiveConflicts returns the number of currently active conflicts.
func ActiveConflicts() uint64 { func ActiveConflicts() uint64 {
return atomic.LoadUint64(&activeConflicts) return atomic.LoadUint64(&activeConflicts)
...@@ -36,6 +48,26 @@ func AverageRoundsToFinalize() float64 { ...@@ -36,6 +48,26 @@ func AverageRoundsToFinalize() float64 {
return averageRoundsToFinalize return averageRoundsToFinalize
} }
// FPCQueryReceived returns the number of received voting queries. For an exact number of opinion queries, use FPCOpinionQueryReceived().
func FPCQueryReceived() uint64 {
return atomic.LoadUint64(&QueryReceivedCount)
}
// FPCOpinionQueryReceived returns the number of received opinion queries.
func FPCOpinionQueryReceived() uint64 {
return atomic.LoadUint64(&OpinionQueryReceivedCount)
}
// FPCQueryReplyErrors returns the number of sent but unanswered queries for conflict opinions. For an exact number of failed opinions, use FPCOpinionQueryReplyErrors().
func FPCQueryReplyErrors() uint64 {
return atomic.LoadUint64(&QueryReplyErrorCount)
}
// FPCOpinionQueryReplyErrors returns the number of opinions that the node failed to gather from peers.
func FPCOpinionQueryReplyErrors() uint64 {
return atomic.LoadUint64(&OpinionQueryReplyErrorCount)
}
//// logic broken into "process..." functions to be able to write unit tests //// //// logic broken into "process..." functions to be able to write unit tests ////
func processRoundStats(stats vote.RoundStats) { func processRoundStats(stats vote.RoundStats) {
...@@ -58,3 +90,17 @@ func processFinalized(ctx vote.Context) { ...@@ -58,3 +90,17 @@ func processFinalized(ctx vote.Context) {
func processFailed(ctx vote.Context) { func processFailed(ctx vote.Context) {
atomic.AddUint64(&failedConflictCount, 1) atomic.AddUint64(&failedConflictCount, 1)
} }
func processQueryReceived(ev *metrics.QueryReceivedEvent) {
// received one query
atomic.AddUint64(&QueryReceivedCount, 1)
// containing this many conflicts to give opinion about
atomic.AddUint64(&OpinionQueryReceivedCount, (uint64)(ev.OpinionCount))
}
func processQueryReplyError(ev *metrics.QueryReplyErrorEvent) {
// received one query
atomic.AddUint64(&QueryReplyErrorCount, 1)
// containing this many conflicts to give opinion about
atomic.AddUint64(&OpinionQueryReplyErrorCount, (uint64)(ev.OpinionCount))
}
...@@ -3,8 +3,8 @@ package metrics ...@@ -3,8 +3,8 @@ package metrics
import ( import (
"testing" "testing"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/packages/vote" "github.com/iotaledger/goshimmer/packages/vote"
"github.com/magiconair/properties/assert" "github.com/magiconair/properties/assert"
) )
...@@ -57,3 +57,33 @@ func TestFinalize(t *testing.T) { ...@@ -57,3 +57,33 @@ func TestFinalize(t *testing.T) {
// => average should be 7.5 // => average should be 7.5
assert.Equal(t, AverageRoundsToFinalize(), 7.5) assert.Equal(t, AverageRoundsToFinalize(), 7.5)
} }
func TestQueryReceived(t *testing.T) {
assert.Equal(t, FPCQueryReceived(), (uint64)(0))
assert.Equal(t, FPCOpinionQueryReceived(), (uint64)(0))
processQueryReceived(&metrics.QueryReceivedEvent{OpinionCount: 5})
assert.Equal(t, FPCQueryReceived(), (uint64)(1))
assert.Equal(t, FPCOpinionQueryReceived(), (uint64)(5))
processQueryReceived(&metrics.QueryReceivedEvent{OpinionCount: 5})
assert.Equal(t, FPCQueryReceived(), (uint64)(2))
assert.Equal(t, FPCOpinionQueryReceived(), (uint64)(10))
}
func TestQueryReplyError(t *testing.T) {
assert.Equal(t, FPCQueryReplyErrors(), (uint64)(0))
assert.Equal(t, FPCOpinionQueryReplyErrors(), (uint64)(0))
processQueryReplyError(&metrics.QueryReplyErrorEvent{OpinionCount: 5})
assert.Equal(t, FPCQueryReplyErrors(), (uint64)(1))
assert.Equal(t, FPCOpinionQueryReplyErrors(), (uint64)(5))
processQueryReplyError(&metrics.QueryReplyErrorEvent{OpinionCount: 5})
assert.Equal(t, FPCQueryReplyErrors(), (uint64)(2))
assert.Equal(t, FPCOpinionQueryReplyErrors(), (uint64)(10))
}
...@@ -92,6 +92,13 @@ func configure(_ *node.Plugin) { ...@@ -92,6 +92,13 @@ func configure(_ *node.Plugin) {
metrics.Events().ValueTips.Attach(events.NewClosure(func(tipsCount uint64) { metrics.Events().ValueTips.Attach(events.NewClosure(func(tipsCount uint64) {
atomic.StoreUint64(&valueTips, tipsCount) atomic.StoreUint64(&valueTips, tipsCount)
})) }))
metrics.Events().QueryReceived.Attach(events.NewClosure(func(ev *metrics.QueryReceivedEvent) {
processQueryReceived(ev)
}))
metrics.Events().QueryReplyError.Attach(events.NewClosure(func(ev *metrics.QueryReplyErrorEvent) {
processQueryReplyError(ev)
}))
} }
func run(_ *node.Plugin) { func run(_ *node.Plugin) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment