Skip to content
Snippets Groups Projects
Commit c1993f0e authored by Luca Moser's avatar Luca Moser
Browse files

adds RoundExecuted event with stats about the round

parent bb91e994
No related branches found
No related tags found
No related merge requests found
......@@ -24,13 +24,14 @@ func New(opinionGiverFunc vote.OpinionGiverFunc, paras ...*Parameters) *FPC {
opinionGiverFunc: opinionGiverFunc,
paras: DefaultParameters(),
opinionGiverRng: rand.New(rand.NewSource(time.Now().UnixNano())),
ctxs: make(map[string]*VoteContext),
ctxs: make(map[string]*vote.Context),
queue: list.New(),
queueSet: make(map[string]struct{}),
events: vote.Events{
Finalized: events.NewEvent(vote.OpinionCaller),
Failed: events.NewEvent(vote.OpinionCaller),
Error: events.NewEvent(events.ErrorCaller),
Finalized: events.NewEvent(vote.OpinionCaller),
Failed: events.NewEvent(vote.OpinionCaller),
RoundExecuted: events.NewEvent(vote.RoundStatsCaller),
Error: events.NewEvent(events.ErrorCaller),
},
}
if len(paras) > 0 {
......@@ -50,7 +51,7 @@ type FPC struct {
queueSet map[string]struct{}
queueMu sync.Mutex
// contains the set of current vote contexts.
ctxs map[string]*VoteContext
ctxs map[string]*vote.Context
ctxsMu sync.RWMutex
// parameters to use within FPC.
paras *Parameters
......@@ -71,7 +72,7 @@ func (f *FPC) Vote(id string, initOpn vote.Opinion) error {
if _, alreadyOngoing := f.ctxs[id]; alreadyOngoing {
return fmt.Errorf("%w: %s", ErrVoteAlreadyOngoing, id)
}
f.queue.PushBack(newVoteContext(id, initOpn))
f.queue.PushBack(vote.NewContext(id, initOpn))
f.queueSet[id] = struct{}{}
return nil
}
......@@ -93,6 +94,7 @@ func (f *FPC) Events() vote.Events {
// Round enqueues new items, sets opinions on active vote contexts, finalizes them and then
// queries for opinions.
func (f *FPC) Round(rand float64) error {
start := time.Now()
// enqueue new voting contexts
f.enqueue()
// we can only form opinions when the last round was actually executed successfully
......@@ -104,8 +106,20 @@ func (f *FPC) Round(rand float64) error {
f.finalizeOpinions()
}
// query for opinions on the current vote contexts
err := f.queryOpinions()
f.lastRoundCompletedSuccessfully = err == nil
queriedOpinions, err := f.queryOpinions()
if err == nil {
f.lastRoundCompletedSuccessfully = true
// execute a round executed event
roundStats := &vote.RoundStats{
Duration: time.Since(start),
RandUsed: rand,
ActiveVoteContexts: f.ctxs,
QueriedOpinions: queriedOpinions,
}
// TODO: add possibility to check whether an event handler is registered
// in order to prevent the collection of the round stats data if not needed
f.events.RoundExecuted.Trigger(roundStats)
}
return err
}
......@@ -116,10 +130,10 @@ func (f *FPC) enqueue() {
f.ctxsMu.Lock()
defer f.ctxsMu.Unlock()
for ele := f.queue.Front(); ele != nil; ele = f.queue.Front() {
voteCtx := ele.Value.(*VoteContext)
f.ctxs[voteCtx.id] = voteCtx
voteCtx := ele.Value.(*vote.Context)
f.ctxs[voteCtx.ID] = voteCtx
f.queue.Remove(ele)
delete(f.queueSet, voteCtx.id)
delete(f.queueSet, voteCtx.ID)
}
}
......@@ -168,22 +182,22 @@ func (f *FPC) finalizeOpinions() {
}
// queries the opinions of QuerySampleSize amount of OpinionGivers.
func (f *FPC) queryOpinions() error {
func (f *FPC) queryOpinions() ([]vote.QueriedOpinions, error) {
ids := f.voteContextIDs()
// nothing to vote on
if len(ids) == 0 {
return nil
return nil, nil
}
opinionGivers, err := f.opinionGiverFunc()
if err != nil {
return err
return nil, err
}
// nobody to query
if len(opinionGivers) == 0 {
return ErrNoOpinionGiversAvailable
return nil, ErrNoOpinionGiversAvailable
}
// select a random subset of opinion givers to query.
......@@ -199,6 +213,9 @@ func (f *FPC) queryOpinions() error {
var voteMapMu sync.Mutex
voteMap := map[string]vote.Opinions{}
// holds queried opinions
allQueriedOpinions := []vote.QueriedOpinions{}
// send queries
var wg sync.WaitGroup
for opinionGiverToQuery, selectedCount := range opinionGiversToQuery {
......@@ -216,6 +233,12 @@ func (f *FPC) queryOpinions() error {
return
}
queriedOpinions := vote.QueriedOpinions{
OpinionGiverID: opinionGiverToQuery.ID(),
Opinions: make(map[string]vote.Opinion),
TimesCounted: selectedCount,
}
// add opinions to vote map
voteMapMu.Lock()
defer voteMapMu.Unlock()
......@@ -229,8 +252,10 @@ func (f *FPC) queryOpinions() error {
for j := 0; j < selectedCount; j++ {
votes = append(votes, opinions[i])
}
queriedOpinions.Opinions[id] = opinions[i]
voteMap[id] = votes
}
allQueriedOpinions = append(allQueriedOpinions, queriedOpinions)
}(opinionGiverToQuery, selectedCount)
}
wg.Wait()
......@@ -259,7 +284,7 @@ func (f *FPC) queryOpinions() error {
}
f.ctxs[id].Liked = likedSum / votedCount
}
return nil
return allQueriedOpinions, nil
}
func (f *FPC) voteContextIDs() []string {
......
......@@ -14,16 +14,16 @@ import (
func TestVoteContext_IsFinalized(t *testing.T) {
type testInput struct {
voteCtx fpc.VoteContext
voteCtx vote.Context
coolOffPeriod int
finalizationThreshold int
want bool
}
var tests = []testInput{
{fpc.VoteContext{
{vote.Context{
Opinions: []vote.Opinion{vote.Like, vote.Like, vote.Like, vote.Like, vote.Like},
}, 2, 2, true},
{fpc.VoteContext{
{vote.Context{
Opinions: []vote.Opinion{vote.Like, vote.Like, vote.Like, vote.Like, vote.Dislike},
}, 2, 2, false},
}
......@@ -35,14 +35,14 @@ func TestVoteContext_IsFinalized(t *testing.T) {
func TestVoteContext_LastOpinion(t *testing.T) {
type testInput struct {
voteCtx fpc.VoteContext
voteCtx vote.Context
expected vote.Opinion
}
var tests = []testInput{
{fpc.VoteContext{
{vote.Context{
Opinions: []vote.Opinion{vote.Like, vote.Like, vote.Like, vote.Like},
}, vote.Like},
{fpc.VoteContext{
{vote.Context{
Opinions: []vote.Opinion{vote.Like, vote.Like, vote.Like, vote.Dislike},
}, vote.Dislike},
}
......@@ -64,6 +64,10 @@ type opiniongivermock struct {
roundIndex int
}
func (ogm *opiniongivermock) ID() string {
return ""
}
func (ogm *opiniongivermock) Query(_ context.Context, _ []string) (vote.Opinions, error) {
if ogm.roundIndex >= len(ogm.roundsReplies) {
return ogm.roundsReplies[len(ogm.roundsReplies)-1], nil
......
......@@ -9,12 +9,20 @@ type OpinionGiver interface {
// Query queries the OpinionGiver for its opinions on the given IDs.
// The passed in context can be used to signal cancellation of the query.
Query(ctx context.Context, ids []string) (Opinions, error)
// ID returns the ID of the opinion giver.
ID() string
}
type OpinionQueryFunc func(ctx context.Context, ids []string) (Opinions, error)
func (oqf OpinionQueryFunc) Query(ctx context.Context, ids []string) (Opinions, error) {
return oqf(ctx, ids)
// QueriedOpinions represents queried opinions from a given opinion giver.
type QueriedOpinions struct {
// The ID of the opinion giver.
OpinionGiverID string `json:"opinion_giver_id"`
// The map of IDs to opinions.
Opinions map[string]Opinion `json:"opinions"`
// The amount of times the opinion giver's opinion has counted.
// Usually this number is 1 but due to randomization of the queried opinion givers,
// the same opinion giver's opinions might be taken into account multiple times.
TimesCounted int `json:"times_counted"`
}
// OpinionGiverFunc is a function which gives a slice of OpinionGivers or an error.
......
package fpc
package vote
import "github.com/iotaledger/goshimmer/packages/vote"
func newVoteContext(id string, initOpn vote.Opinion) *VoteContext {
voteCtx := &VoteContext{id: id, Liked: likedInit}
// NewContext creates a new vote context.
func NewContext(id string, initOpn Opinion) *Context {
voteCtx := &Context{ID: id, Liked: likedInit}
voteCtx.AddOpinion(initOpn)
return voteCtx
}
const likedInit = -1
// VoteContext is the context of votes from multiple rounds about a given item.
type VoteContext struct {
id string
// Context is the context of votes from multiple rounds about a given item.
type Context struct {
ID string
// The percentage of OpinionGivers who liked this item on the last query.
Liked float64
// The number of voting rounds performed.
Rounds int
// Append-only list of opinions formed after each round.
// the first opinion is the initial opinion when this vote context was created.
Opinions []vote.Opinion
Opinions []Opinion
}
// AddOpinion adds the given opinion to this vote context.
func (vc *VoteContext) AddOpinion(opn vote.Opinion) {
func (vc *Context) AddOpinion(opn Opinion) {
vc.Opinions = append(vc.Opinions, opn)
}
// LastOpinion returns the last formed opinion.
func (vc *VoteContext) LastOpinion() vote.Opinion {
func (vc *Context) LastOpinion() Opinion {
return vc.Opinions[len(vc.Opinions)-1]
}
// tells whether this vote context is finalized by checking whether the opinion was held
// IsFinalized tells whether this vote context is finalized by checking whether the opinion was held
// for finalizationThreshold number of rounds.
func (vc *VoteContext) IsFinalized(coolingOffPeriod int, finalizationThreshold int) bool {
func (vc *Context) IsFinalized(coolingOffPeriod int, finalizationThreshold int) bool {
// check whether we have enough opinions to say whether this vote context is finalized.
// we start from the 2nd opinion since the first one is the initial opinion.
if len(vc.Opinions[1:]) < coolingOffPeriod+finalizationThreshold {
......@@ -54,11 +53,11 @@ func (vc *VoteContext) IsFinalized(coolingOffPeriod int, finalizationThreshold i
}
// IsNew tells whether the vote context is new.
func (vc *VoteContext) IsNew() bool {
func (vc *Context) IsNew() bool {
return vc.Liked == likedInit
}
// HadFirstRound tells whether the vote context just had its first round.
func (vc *VoteContext) HadFirstRound() bool {
func (vc *Context) HadFirstRound() bool {
return vc.Rounds == 1
}
......@@ -2,6 +2,7 @@ package vote
import (
"errors"
"time"
"github.com/iotaledger/hive.go/events"
)
......@@ -35,11 +36,33 @@ type Events struct {
Finalized *events.Event
// Fired when an Opinion couldn't be finalized.
Failed *events.Event
// Fired when a DRNGRoundBasedVoter has executed a round.
RoundExecuted *events.Event
// Fired when internal errors occur.
Error *events.Event
}
// RoundStats encapsulates data about an executed round.
type RoundStats struct {
// The time it took to complete a round.
Duration time.Duration `json:"duration"`
// The rand number used during the round.
RandUsed float64 `json:"rand_used"`
// The vote contexts on which opinions were formed and queried.
// This list does not include the vote contexts which were finalized/aborted
// during the execution of the round.
// Create a copy of this map if you need to modify any of its elements.
ActiveVoteContexts map[string]*Context `json:"active_vote_contexts"`
// The opinions which were queried during the round per opinion giver.
QueriedOpinions []QueriedOpinions `json:"queried_opinions"`
}
// OpinionCaller calls the given handler with an Opinion and its associated Id.
func OpinionCaller(handler interface{}, params ...interface{}) {
handler.(func(id string, opinion Opinion))(params[0].(string), params[1].(Opinion))
}
// RoundStats calls the given handler with a RoundStats.
func RoundStatsCaller(handler interface{}, params ...interface{}) {
handler.(func(stats *RoundStats))(params[0].(*RoundStats))
}
......@@ -8,6 +8,7 @@ import (
"sync"
"github.com/iotaledger/goshimmer/packages/prng"
"github.com/iotaledger/hive.go/events"
"google.golang.org/grpc"
"github.com/iotaledger/goshimmer/packages/shutdown"
......@@ -36,6 +37,7 @@ var (
roundIntervalSeconds int64 = 5
)
// Voter returns the DRNGRoundBasedVoter instance used by the FPC plugin.
func Voter() vote.DRNGRoundBasedVoter {
voterOnce.Do(func() {
// create a function which gets OpinionGivers
......@@ -46,7 +48,8 @@ func Voter() vote.DRNGRoundBasedVoter {
if fpcService == nil {
continue
}
opinionGivers = append(opinionGivers, peerOpinionGiver(p))
// TODO: maybe cache the PeerOpinionGiver instead of creating a new one every time
opinionGivers = append(opinionGivers, &PeerOpinionGiver{p: p})
}
return opinionGivers, nil
}
......@@ -72,6 +75,12 @@ func configure(_ *node.Plugin) {
if err := lPeer.UpdateService(service.FPCKey, "tcp", port); err != nil {
log.Fatalf("could not update services: %v", err)
}
voter.Events().RoundExecuted.Attach(events.NewClosure(func(roundStats *vote.RoundStats) {
peersQueried := len(roundStats.QueriedOpinions)
voteContextsCount := len(roundStats.ActiveVoteContexts)
log.Infof("executed round with rand %0.4f for %d vote contexts on %d peers, took %v", roundStats.RandUsed, voteContextsCount, peersQueried, roundStats.Duration)
}))
}
func run(_ *node.Plugin) {
......@@ -113,34 +122,40 @@ func run(_ *node.Plugin) {
}, shutdown.ShutdownPriorityFPC)
}
// creates an OpinionQueryFunc which uses the given peer.
func peerOpinionGiver(p *peer.Peer) vote.OpinionQueryFunc {
return func(ctx context.Context, ids []string) (vote.Opinions, error) {
fpcServicePort := p.Services().Get(service.FPCKey).Port()
fpcAddr := net.JoinHostPort(p.IP().String(), strconv.Itoa(fpcServicePort))
// PeerOpinionGiver implements the OpinionGiver interface based on a peer.
type PeerOpinionGiver struct {
p *peer.Peer
}
var opts []grpc.DialOption
opts = append(opts, grpc.WithInsecure())
func (pog *PeerOpinionGiver) Query(ctx context.Context, ids []string) (vote.Opinions, error) {
fpcServicePort := pog.p.Services().Get(service.FPCKey).Port()
fpcAddr := net.JoinHostPort(pog.p.IP().String(), strconv.Itoa(fpcServicePort))
// connect to the FPC service
conn, err := grpc.Dial(fpcAddr, opts...)
if err != nil {
return nil, fmt.Errorf("unable to connect to FPC service: %w", err)
}
defer conn.Close()
var opts []grpc.DialOption
opts = append(opts, grpc.WithInsecure())
client := votenet.NewVoterQueryClient(conn)
reply, err := client.Opinion(ctx, &votenet.QueryRequest{Id: ids})
if err != nil {
return nil, fmt.Errorf("unable to query opinions: %w", err)
}
// connect to the FPC service
conn, err := grpc.Dial(fpcAddr, opts...)
if err != nil {
return nil, fmt.Errorf("unable to connect to FPC service: %w", err)
}
defer conn.Close()
// convert int32s in reply to opinions
opinions := make(vote.Opinions, len(reply.Opinion))
for i, intOpn := range reply.Opinion {
opinions[i] = vote.ConvertInt32Opinion(intOpn)
}
client := votenet.NewVoterQueryClient(conn)
reply, err := client.Opinion(ctx, &votenet.QueryRequest{Id: ids})
if err != nil {
return nil, fmt.Errorf("unable to query opinions: %w", err)
}
return opinions, nil
// convert int32s in reply to opinions
opinions := make(vote.Opinions, len(reply.Opinion))
for i, intOpn := range reply.Opinion {
opinions[i] = vote.ConvertInt32Opinion(intOpn)
}
return opinions, nil
}
func (pog *PeerOpinionGiver) ID() string {
return pog.p.ID().String()
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment