diff --git a/packages/vote/fpc/fpc.go b/packages/vote/fpc/fpc.go index e1cfe0c65c7b373b0dd29cf9441ca02a28b3b355..63deecd3630f520ee976438312bb8dc988c65ae0 100644 --- a/packages/vote/fpc/fpc.go +++ b/packages/vote/fpc/fpc.go @@ -24,13 +24,14 @@ func New(opinionGiverFunc vote.OpinionGiverFunc, paras ...*Parameters) *FPC { opinionGiverFunc: opinionGiverFunc, paras: DefaultParameters(), opinionGiverRng: rand.New(rand.NewSource(time.Now().UnixNano())), - ctxs: make(map[string]*VoteContext), + ctxs: make(map[string]*vote.Context), queue: list.New(), queueSet: make(map[string]struct{}), events: vote.Events{ - Finalized: events.NewEvent(vote.OpinionCaller), - Failed: events.NewEvent(vote.OpinionCaller), - Error: events.NewEvent(events.ErrorCaller), + Finalized: events.NewEvent(vote.OpinionCaller), + Failed: events.NewEvent(vote.OpinionCaller), + RoundExecuted: events.NewEvent(vote.RoundStatsCaller), + Error: events.NewEvent(events.ErrorCaller), }, } if len(paras) > 0 { @@ -50,7 +51,7 @@ type FPC struct { queueSet map[string]struct{} queueMu sync.Mutex // contains the set of current vote contexts. - ctxs map[string]*VoteContext + ctxs map[string]*vote.Context ctxsMu sync.RWMutex // parameters to use within FPC. paras *Parameters @@ -71,7 +72,7 @@ func (f *FPC) Vote(id string, initOpn vote.Opinion) error { if _, alreadyOngoing := f.ctxs[id]; alreadyOngoing { return fmt.Errorf("%w: %s", ErrVoteAlreadyOngoing, id) } - f.queue.PushBack(newVoteContext(id, initOpn)) + f.queue.PushBack(vote.NewContext(id, initOpn)) f.queueSet[id] = struct{}{} return nil } @@ -93,6 +94,7 @@ func (f *FPC) Events() vote.Events { // Round enqueues new items, sets opinions on active vote contexts, finalizes them and then // queries for opinions. func (f *FPC) Round(rand float64) error { + start := time.Now() // enqueue new voting contexts f.enqueue() // we can only form opinions when the last round was actually executed successfully @@ -104,8 +106,20 @@ func (f *FPC) Round(rand float64) error { f.finalizeOpinions() } // query for opinions on the current vote contexts - err := f.queryOpinions() - f.lastRoundCompletedSuccessfully = err == nil + queriedOpinions, err := f.queryOpinions() + if err == nil { + f.lastRoundCompletedSuccessfully = true + // execute a round executed event + roundStats := &vote.RoundStats{ + Duration: time.Since(start), + RandUsed: rand, + ActiveVoteContexts: f.ctxs, + QueriedOpinions: queriedOpinions, + } + // TODO: add possibility to check whether an event handler is registered + // in order to prevent the collection of the round stats data if not needed + f.events.RoundExecuted.Trigger(roundStats) + } return err } @@ -116,10 +130,10 @@ func (f *FPC) enqueue() { f.ctxsMu.Lock() defer f.ctxsMu.Unlock() for ele := f.queue.Front(); ele != nil; ele = f.queue.Front() { - voteCtx := ele.Value.(*VoteContext) - f.ctxs[voteCtx.id] = voteCtx + voteCtx := ele.Value.(*vote.Context) + f.ctxs[voteCtx.ID] = voteCtx f.queue.Remove(ele) - delete(f.queueSet, voteCtx.id) + delete(f.queueSet, voteCtx.ID) } } @@ -168,22 +182,22 @@ func (f *FPC) finalizeOpinions() { } // queries the opinions of QuerySampleSize amount of OpinionGivers. -func (f *FPC) queryOpinions() error { +func (f *FPC) queryOpinions() ([]vote.QueriedOpinions, error) { ids := f.voteContextIDs() // nothing to vote on if len(ids) == 0 { - return nil + return nil, nil } opinionGivers, err := f.opinionGiverFunc() if err != nil { - return err + return nil, err } // nobody to query if len(opinionGivers) == 0 { - return ErrNoOpinionGiversAvailable + return nil, ErrNoOpinionGiversAvailable } // select a random subset of opinion givers to query. @@ -199,6 +213,9 @@ func (f *FPC) queryOpinions() error { var voteMapMu sync.Mutex voteMap := map[string]vote.Opinions{} + // holds queried opinions + allQueriedOpinions := []vote.QueriedOpinions{} + // send queries var wg sync.WaitGroup for opinionGiverToQuery, selectedCount := range opinionGiversToQuery { @@ -216,6 +233,12 @@ func (f *FPC) queryOpinions() error { return } + queriedOpinions := vote.QueriedOpinions{ + OpinionGiverID: opinionGiverToQuery.ID(), + Opinions: make(map[string]vote.Opinion), + TimesCounted: selectedCount, + } + // add opinions to vote map voteMapMu.Lock() defer voteMapMu.Unlock() @@ -229,8 +252,10 @@ func (f *FPC) queryOpinions() error { for j := 0; j < selectedCount; j++ { votes = append(votes, opinions[i]) } + queriedOpinions.Opinions[id] = opinions[i] voteMap[id] = votes } + allQueriedOpinions = append(allQueriedOpinions, queriedOpinions) }(opinionGiverToQuery, selectedCount) } wg.Wait() @@ -259,7 +284,7 @@ func (f *FPC) queryOpinions() error { } f.ctxs[id].Liked = likedSum / votedCount } - return nil + return allQueriedOpinions, nil } func (f *FPC) voteContextIDs() []string { diff --git a/packages/vote/fpc/fpc_test.go b/packages/vote/fpc/fpc_test.go index 86484a47c0441fc6dab8f2352b4766ec2adc5012..3fe0d7f44d5be8090a9f803be267272b0e6f8500 100644 --- a/packages/vote/fpc/fpc_test.go +++ b/packages/vote/fpc/fpc_test.go @@ -14,16 +14,16 @@ import ( func TestVoteContext_IsFinalized(t *testing.T) { type testInput struct { - voteCtx fpc.VoteContext + voteCtx vote.Context coolOffPeriod int finalizationThreshold int want bool } var tests = []testInput{ - {fpc.VoteContext{ + {vote.Context{ Opinions: []vote.Opinion{vote.Like, vote.Like, vote.Like, vote.Like, vote.Like}, }, 2, 2, true}, - {fpc.VoteContext{ + {vote.Context{ Opinions: []vote.Opinion{vote.Like, vote.Like, vote.Like, vote.Like, vote.Dislike}, }, 2, 2, false}, } @@ -35,14 +35,14 @@ func TestVoteContext_IsFinalized(t *testing.T) { func TestVoteContext_LastOpinion(t *testing.T) { type testInput struct { - voteCtx fpc.VoteContext + voteCtx vote.Context expected vote.Opinion } var tests = []testInput{ - {fpc.VoteContext{ + {vote.Context{ Opinions: []vote.Opinion{vote.Like, vote.Like, vote.Like, vote.Like}, }, vote.Like}, - {fpc.VoteContext{ + {vote.Context{ Opinions: []vote.Opinion{vote.Like, vote.Like, vote.Like, vote.Dislike}, }, vote.Dislike}, } @@ -64,6 +64,10 @@ type opiniongivermock struct { roundIndex int } +func (ogm *opiniongivermock) ID() string { + return "" +} + func (ogm *opiniongivermock) Query(_ context.Context, _ []string) (vote.Opinions, error) { if ogm.roundIndex >= len(ogm.roundsReplies) { return ogm.roundsReplies[len(ogm.roundsReplies)-1], nil diff --git a/packages/vote/opinion.go b/packages/vote/opinion.go index 4164c576541d278cebcf706749e30db5c0adab7a..5b79ca52cad2226df685ec6ff5b5ea6c80cbc374 100644 --- a/packages/vote/opinion.go +++ b/packages/vote/opinion.go @@ -9,12 +9,20 @@ type OpinionGiver interface { // Query queries the OpinionGiver for its opinions on the given IDs. // The passed in context can be used to signal cancellation of the query. Query(ctx context.Context, ids []string) (Opinions, error) + // ID returns the ID of the opinion giver. + ID() string } -type OpinionQueryFunc func(ctx context.Context, ids []string) (Opinions, error) - -func (oqf OpinionQueryFunc) Query(ctx context.Context, ids []string) (Opinions, error) { - return oqf(ctx, ids) +// QueriedOpinions represents queried opinions from a given opinion giver. +type QueriedOpinions struct { + // The ID of the opinion giver. + OpinionGiverID string `json:"opinion_giver_id"` + // The map of IDs to opinions. + Opinions map[string]Opinion `json:"opinions"` + // The amount of times the opinion giver's opinion has counted. + // Usually this number is 1 but due to randomization of the queried opinion givers, + // the same opinion giver's opinions might be taken into account multiple times. + TimesCounted int `json:"times_counted"` } // OpinionGiverFunc is a function which gives a slice of OpinionGivers or an error. diff --git a/packages/vote/fpc/vote_context.go b/packages/vote/vote_context.go similarity index 67% rename from packages/vote/fpc/vote_context.go rename to packages/vote/vote_context.go index fb4ae5e841cd3bc8b378de791fd0de7d0ff24d01..5566c9c13f511602acb855395ba82f3cfb9ad27e 100644 --- a/packages/vote/fpc/vote_context.go +++ b/packages/vote/vote_context.go @@ -1,40 +1,39 @@ -package fpc +package vote -import "github.com/iotaledger/goshimmer/packages/vote" - -func newVoteContext(id string, initOpn vote.Opinion) *VoteContext { - voteCtx := &VoteContext{id: id, Liked: likedInit} +// NewContext creates a new vote context. +func NewContext(id string, initOpn Opinion) *Context { + voteCtx := &Context{ID: id, Liked: likedInit} voteCtx.AddOpinion(initOpn) return voteCtx } const likedInit = -1 -// VoteContext is the context of votes from multiple rounds about a given item. -type VoteContext struct { - id string +// Context is the context of votes from multiple rounds about a given item. +type Context struct { + ID string // The percentage of OpinionGivers who liked this item on the last query. Liked float64 // The number of voting rounds performed. Rounds int // Append-only list of opinions formed after each round. // the first opinion is the initial opinion when this vote context was created. - Opinions []vote.Opinion + Opinions []Opinion } // AddOpinion adds the given opinion to this vote context. -func (vc *VoteContext) AddOpinion(opn vote.Opinion) { +func (vc *Context) AddOpinion(opn Opinion) { vc.Opinions = append(vc.Opinions, opn) } // LastOpinion returns the last formed opinion. -func (vc *VoteContext) LastOpinion() vote.Opinion { +func (vc *Context) LastOpinion() Opinion { return vc.Opinions[len(vc.Opinions)-1] } -// tells whether this vote context is finalized by checking whether the opinion was held +// IsFinalized tells whether this vote context is finalized by checking whether the opinion was held // for finalizationThreshold number of rounds. -func (vc *VoteContext) IsFinalized(coolingOffPeriod int, finalizationThreshold int) bool { +func (vc *Context) IsFinalized(coolingOffPeriod int, finalizationThreshold int) bool { // check whether we have enough opinions to say whether this vote context is finalized. // we start from the 2nd opinion since the first one is the initial opinion. if len(vc.Opinions[1:]) < coolingOffPeriod+finalizationThreshold { @@ -54,11 +53,11 @@ func (vc *VoteContext) IsFinalized(coolingOffPeriod int, finalizationThreshold i } // IsNew tells whether the vote context is new. -func (vc *VoteContext) IsNew() bool { +func (vc *Context) IsNew() bool { return vc.Liked == likedInit } // HadFirstRound tells whether the vote context just had its first round. -func (vc *VoteContext) HadFirstRound() bool { +func (vc *Context) HadFirstRound() bool { return vc.Rounds == 1 } diff --git a/packages/vote/voter.go b/packages/vote/voter.go index c50781852c107d7ffbe55fa65089c9af2b738720..12f904a10ab8deb594ef0662d371ff634227fd95 100644 --- a/packages/vote/voter.go +++ b/packages/vote/voter.go @@ -2,6 +2,7 @@ package vote import ( "errors" + "time" "github.com/iotaledger/hive.go/events" ) @@ -35,11 +36,33 @@ type Events struct { Finalized *events.Event // Fired when an Opinion couldn't be finalized. Failed *events.Event + // Fired when a DRNGRoundBasedVoter has executed a round. + RoundExecuted *events.Event // Fired when internal errors occur. Error *events.Event } +// RoundStats encapsulates data about an executed round. +type RoundStats struct { + // The time it took to complete a round. + Duration time.Duration `json:"duration"` + // The rand number used during the round. + RandUsed float64 `json:"rand_used"` + // The vote contexts on which opinions were formed and queried. + // This list does not include the vote contexts which were finalized/aborted + // during the execution of the round. + // Create a copy of this map if you need to modify any of its elements. + ActiveVoteContexts map[string]*Context `json:"active_vote_contexts"` + // The opinions which were queried during the round per opinion giver. + QueriedOpinions []QueriedOpinions `json:"queried_opinions"` +} + // OpinionCaller calls the given handler with an Opinion and its associated Id. func OpinionCaller(handler interface{}, params ...interface{}) { handler.(func(id string, opinion Opinion))(params[0].(string), params[1].(Opinion)) } + +// RoundStats calls the given handler with a RoundStats. +func RoundStatsCaller(handler interface{}, params ...interface{}) { + handler.(func(stats *RoundStats))(params[0].(*RoundStats)) +} diff --git a/plugins/fpc/plugin.go b/plugins/fpc/plugin.go index 7c333bd1af6ae3d184dc0c717612e6e6ad841b13..8d9e55b0a73e0aa3f16f977f73096ebfe1507ab2 100644 --- a/plugins/fpc/plugin.go +++ b/plugins/fpc/plugin.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/iotaledger/goshimmer/packages/prng" + "github.com/iotaledger/hive.go/events" "google.golang.org/grpc" "github.com/iotaledger/goshimmer/packages/shutdown" @@ -36,6 +37,7 @@ var ( roundIntervalSeconds int64 = 5 ) +// Voter returns the DRNGRoundBasedVoter instance used by the FPC plugin. func Voter() vote.DRNGRoundBasedVoter { voterOnce.Do(func() { // create a function which gets OpinionGivers @@ -46,7 +48,8 @@ func Voter() vote.DRNGRoundBasedVoter { if fpcService == nil { continue } - opinionGivers = append(opinionGivers, peerOpinionGiver(p)) + // TODO: maybe cache the PeerOpinionGiver instead of creating a new one every time + opinionGivers = append(opinionGivers, &PeerOpinionGiver{p: p}) } return opinionGivers, nil } @@ -72,6 +75,12 @@ func configure(_ *node.Plugin) { if err := lPeer.UpdateService(service.FPCKey, "tcp", port); err != nil { log.Fatalf("could not update services: %v", err) } + + voter.Events().RoundExecuted.Attach(events.NewClosure(func(roundStats *vote.RoundStats) { + peersQueried := len(roundStats.QueriedOpinions) + voteContextsCount := len(roundStats.ActiveVoteContexts) + log.Infof("executed round with rand %0.4f for %d vote contexts on %d peers, took %v", roundStats.RandUsed, voteContextsCount, peersQueried, roundStats.Duration) + })) } func run(_ *node.Plugin) { @@ -113,34 +122,40 @@ func run(_ *node.Plugin) { }, shutdown.ShutdownPriorityFPC) } -// creates an OpinionQueryFunc which uses the given peer. -func peerOpinionGiver(p *peer.Peer) vote.OpinionQueryFunc { - return func(ctx context.Context, ids []string) (vote.Opinions, error) { - fpcServicePort := p.Services().Get(service.FPCKey).Port() - fpcAddr := net.JoinHostPort(p.IP().String(), strconv.Itoa(fpcServicePort)) +// PeerOpinionGiver implements the OpinionGiver interface based on a peer. +type PeerOpinionGiver struct { + p *peer.Peer +} - var opts []grpc.DialOption - opts = append(opts, grpc.WithInsecure()) +func (pog *PeerOpinionGiver) Query(ctx context.Context, ids []string) (vote.Opinions, error) { + fpcServicePort := pog.p.Services().Get(service.FPCKey).Port() + fpcAddr := net.JoinHostPort(pog.p.IP().String(), strconv.Itoa(fpcServicePort)) - // connect to the FPC service - conn, err := grpc.Dial(fpcAddr, opts...) - if err != nil { - return nil, fmt.Errorf("unable to connect to FPC service: %w", err) - } - defer conn.Close() + var opts []grpc.DialOption + opts = append(opts, grpc.WithInsecure()) - client := votenet.NewVoterQueryClient(conn) - reply, err := client.Opinion(ctx, &votenet.QueryRequest{Id: ids}) - if err != nil { - return nil, fmt.Errorf("unable to query opinions: %w", err) - } + // connect to the FPC service + conn, err := grpc.Dial(fpcAddr, opts...) + if err != nil { + return nil, fmt.Errorf("unable to connect to FPC service: %w", err) + } + defer conn.Close() - // convert int32s in reply to opinions - opinions := make(vote.Opinions, len(reply.Opinion)) - for i, intOpn := range reply.Opinion { - opinions[i] = vote.ConvertInt32Opinion(intOpn) - } + client := votenet.NewVoterQueryClient(conn) + reply, err := client.Opinion(ctx, &votenet.QueryRequest{Id: ids}) + if err != nil { + return nil, fmt.Errorf("unable to query opinions: %w", err) + } - return opinions, nil + // convert int32s in reply to opinions + opinions := make(vote.Opinions, len(reply.Opinion)) + for i, intOpn := range reply.Opinion { + opinions[i] = vote.ConvertInt32Opinion(intOpn) } + + return opinions, nil +} + +func (pog *PeerOpinionGiver) ID() string { + return pog.p.ID().String() }