package fpc import ( "container/list" "context" "errors" "fmt" "math/rand" "sync" "time" "github.com/iotaledger/goshimmer/packages/vote" "github.com/iotaledger/hive.go/events" ) var ( ErrVoteAlreadyOngoing = errors.New("a vote is already ongoing for the given ID") ErrNoOpinionGiversAvailable = errors.New("can't perform round as no opinion givers are available") ) // New creates a new FPC instance. func New(opinionGiverFunc vote.OpinionGiverFunc, paras ...*Parameters) *FPC { f := &FPC{ opinionGiverFunc: opinionGiverFunc, paras: DefaultParameters(), opinionGiverRng: rand.New(rand.NewSource(time.Now().UnixNano())), ctxs: make(map[string]*vote.Context), queue: list.New(), queueSet: make(map[string]struct{}), events: vote.Events{ Finalized: events.NewEvent(vote.OpinionCaller), Failed: events.NewEvent(vote.OpinionCaller), RoundExecuted: events.NewEvent(vote.RoundStatsCaller), Error: events.NewEvent(events.ErrorCaller), }, } if len(paras) > 0 { f.paras = paras[0] } return f } // FPC is a DRNGRoundBasedVoter which uses the Opinion of other entities // in order to finalize an Opinion. type FPC struct { events vote.Events opinionGiverFunc vote.OpinionGiverFunc // the lifo queue of newly enqueued items to vote on. queue *list.List // contains a set of currently queued items. queueSet map[string]struct{} queueMu sync.Mutex // contains the set of current vote contexts. ctxs map[string]*vote.Context ctxsMu sync.RWMutex // parameters to use within FPC. paras *Parameters // indicates whether the last round was performed successfully. lastRoundCompletedSuccessfully bool // used to randomly select opinion givers. opinionGiverRng *rand.Rand } func (f *FPC) Vote(id string, initOpn vote.Opinion) error { f.queueMu.Lock() defer f.queueMu.Unlock() f.ctxsMu.RLock() defer f.ctxsMu.RUnlock() if _, alreadyQueued := f.queueSet[id]; alreadyQueued { return fmt.Errorf("%w: %s", ErrVoteAlreadyOngoing, id) } if _, alreadyOngoing := f.ctxs[id]; alreadyOngoing { return fmt.Errorf("%w: %s", ErrVoteAlreadyOngoing, id) } f.queue.PushBack(vote.NewContext(id, initOpn)) f.queueSet[id] = struct{}{} return nil } func (f *FPC) IntermediateOpinion(id string) (vote.Opinion, error) { f.ctxsMu.RLock() defer f.ctxsMu.RUnlock() voteCtx, has := f.ctxs[id] if !has { return vote.Unknown, fmt.Errorf("%w: %s", vote.ErrVotingNotFound, id) } return voteCtx.LastOpinion(), nil } func (f *FPC) Events() vote.Events { return f.events } // Round enqueues new items, sets opinions on active vote contexts, finalizes them and then // queries for opinions. func (f *FPC) Round(rand float64) error { start := time.Now() // enqueue new voting contexts f.enqueue() // we can only form opinions when the last round was actually executed successfully if f.lastRoundCompletedSuccessfully { // form opinions by using the random number supplied for this new round f.formOpinions(rand) // clean opinions on vote contexts where an opinion was reached in FinalizationThreshold // number of rounds and clear those who failed to be finalized in MaxRoundsPerVoteContext. f.finalizeOpinions() } // query for opinions on the current vote contexts queriedOpinions, err := f.queryOpinions() if err == nil { f.lastRoundCompletedSuccessfully = true // execute a round executed event roundStats := &vote.RoundStats{ Duration: time.Since(start), RandUsed: rand, ActiveVoteContexts: f.ctxs, QueriedOpinions: queriedOpinions, } // TODO: add possibility to check whether an event handler is registered // in order to prevent the collection of the round stats data if not needed f.events.RoundExecuted.Trigger(roundStats) } return err } // enqueues items for voting func (f *FPC) enqueue() { f.queueMu.Lock() defer f.queueMu.Unlock() f.ctxsMu.Lock() defer f.ctxsMu.Unlock() for ele := f.queue.Front(); ele != nil; ele = f.queue.Front() { voteCtx := ele.Value.(*vote.Context) f.ctxs[voteCtx.ID] = voteCtx f.queue.Remove(ele) delete(f.queueSet, voteCtx.ID) } } // formOpinions updates the opinion for ongoing vote contexts by comparing their liked percentage // against the threshold appropriate for their given rounds. func (f *FPC) formOpinions(rand float64) { f.ctxsMu.RLock() defer f.ctxsMu.RUnlock() for _, voteCtx := range f.ctxs { // when the vote context is new there's no opinion to form if voteCtx.IsNew() { continue } lowerThreshold := f.paras.SubsequentRoundsLowerBoundThreshold upperThreshold := f.paras.SubsequentRoundsUpperBoundThreshold if voteCtx.HadFirstRound() { lowerThreshold = f.paras.FirstRoundLowerBoundThreshold upperThreshold = f.paras.FirstRoundUpperBoundThreshold } if voteCtx.Liked >= RandUniformThreshold(rand, lowerThreshold, upperThreshold) { voteCtx.AddOpinion(vote.Like) continue } voteCtx.AddOpinion(vote.Dislike) } } // emits a Voted event for every finalized vote context (or Failed event if failed) and then removes it from FPC. func (f *FPC) finalizeOpinions() { f.ctxsMu.Lock() defer f.ctxsMu.Unlock() for id, voteCtx := range f.ctxs { if voteCtx.IsFinalized(f.paras.CoolingOffPeriod, f.paras.FinalizationThreshold) { f.events.Finalized.Trigger(id, voteCtx.LastOpinion()) delete(f.ctxs, id) continue } if voteCtx.Rounds >= f.paras.MaxRoundsPerVoteContext { f.events.Failed.Trigger(id, voteCtx.LastOpinion()) delete(f.ctxs, id) } } } // queries the opinions of QuerySampleSize amount of OpinionGivers. func (f *FPC) queryOpinions() ([]vote.QueriedOpinions, error) { ids := f.voteContextIDs() // nothing to vote on if len(ids) == 0 { return nil, nil } opinionGivers, err := f.opinionGiverFunc() if err != nil { return nil, err } // nobody to query if len(opinionGivers) == 0 { return nil, ErrNoOpinionGiversAvailable } // select a random subset of opinion givers to query. // if the same opinion giver is selected multiple times, we query it only once // but use its opinion N selected times. opinionGiversToQuery := map[vote.OpinionGiver]int{} for i := 0; i < f.paras.QuerySampleSize; i++ { selected := opinionGivers[f.opinionGiverRng.Intn(len(opinionGivers))] opinionGiversToQuery[selected]++ } // votes per id var voteMapMu sync.Mutex voteMap := map[string]vote.Opinions{} // holds queried opinions allQueriedOpinions := []vote.QueriedOpinions{} // send queries var wg sync.WaitGroup for opinionGiverToQuery, selectedCount := range opinionGiversToQuery { wg.Add(1) go func(opinionGiverToQuery vote.OpinionGiver, selectedCount int) { defer wg.Done() queryCtx, cancel := context.WithTimeout(context.Background(), f.paras.QueryTimeout) defer cancel() // query opinions, err := opinionGiverToQuery.Query(queryCtx, ids) if err != nil || len(opinions) != len(ids) { // ignore opinions return } queriedOpinions := vote.QueriedOpinions{ OpinionGiverID: opinionGiverToQuery.ID(), Opinions: make(map[string]vote.Opinion), TimesCounted: selectedCount, } // add opinions to vote map voteMapMu.Lock() defer voteMapMu.Unlock() for i, id := range ids { votes, has := voteMap[id] if !has { votes = vote.Opinions{} } // reuse the opinion N times selected. // note this is always at least 1. for j := 0; j < selectedCount; j++ { votes = append(votes, opinions[i]) } queriedOpinions.Opinions[id] = opinions[i] voteMap[id] = votes } allQueriedOpinions = append(allQueriedOpinions, queriedOpinions) }(opinionGiverToQuery, selectedCount) } wg.Wait() f.ctxsMu.RLock() defer f.ctxsMu.RUnlock() // compute liked percentage for id, votes := range voteMap { var likedSum float64 votedCount := float64(len(votes)) for _, opinion := range votes { switch opinion { case vote.Unknown: votedCount-- case vote.Like: likedSum++ } } // mark a round being done, even though there's no opinion, // so this voting context will be cleared eventually f.ctxs[id].Rounds++ if votedCount == 0 { continue } f.ctxs[id].Liked = likedSum / votedCount } return allQueriedOpinions, nil } func (f *FPC) voteContextIDs() []string { f.ctxsMu.RLock() defer f.ctxsMu.RUnlock() var i int ids := make([]string, len(f.ctxs)) for id := range f.ctxs { ids[i] = id i++ } return ids }