Skip to content
Snippets Groups Projects
Unverified Commit e4283d33 authored by Angelo Capossele's avatar Angelo Capossele Committed by GitHub
Browse files

Merge pull request #325 from iotaledger/refactor/fpc

Refactors FPC packages
parents 4b34c119 c1993f0e
Branches
Tags
No related merge requests found
......@@ -30,5 +30,6 @@ require (
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073
golang.org/x/net v0.0.0-20200301022130-244492dfa37a
golang.org/x/tools v0.0.0-20200330040139-fa3cc9eebcfe // indirect
google.golang.org/grpc v1.28.1
gopkg.in/src-d/go-git.v4 v4.13.1
)
......@@ -34,6 +34,7 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/bbolt v1.3.3 h1:n6AiVyVRKQFNb6mJlwESEvvLoDyiTzXX7ORAUlkeBdY=
github.com/coreos/bbolt v1.3.3/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
......@@ -77,7 +78,9 @@ github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BHsljHzVlRcyQhjrss6TZTdY2VfCqZPbv5k3iBFa2ZQ=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
......@@ -117,6 +120,7 @@ github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.3.4 h1:87PNWwrRvUSnqS4dlcBU/ftvOIBep4sYuBLlh6rX2wk=
github.com/golang/protobuf v1.3.4/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
......@@ -519,8 +523,11 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi
google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg=
google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.28.1 h1:C1QC6KzgSiLyBabDi87BbjaGreoRgGUF5nOyvfrAZ1k=
google.golang.org/grpc v1.28.1/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
......
......
package prng
import (
"bytes"
"encoding/binary"
"time"
)
// TimeSourceFunc is a function which gets an understanding of time in seconds resolution back.
type TimeSourceFunc func() int64
// NewUnixTimestampPRNG creates a new Unix timestamp based pseudo random number generator
// using the given resolution. The resolution defines at which second interval numbers are generated.
func NewUnixTimestampPRNG(resolution int64, timeSourceFunc ...TimeSourceFunc) *UnixTimestampPrng {
utrng := &UnixTimestampPrng{
c: make(chan float64),
exit: make(chan struct{}),
resolution: resolution,
timeSourceFunc: func() int64 { return time.Now().Unix() },
}
if len(timeSourceFunc) > 0 {
utrng.timeSourceFunc = timeSourceFunc[0]
}
return utrng
}
// UnixTimestampPrng is a pseudo random number generator using the Unix time in seconds to derive
// a random number from.
type UnixTimestampPrng struct {
c chan float64
exit chan struct{}
resolution int64
timeSourceFunc TimeSourceFunc
}
// Start starts the Unix timestamp pseudo random number generator by examining the
// interval and then starting production of numbers after at least interval seconds
// plus delta of the next resolution time have elapsed.
func (utrng *UnixTimestampPrng) Start() {
nowSec := utrng.timeSourceFunc()
nextTimePoint := ResolveNextTimePoint(nowSec, utrng.resolution)
time.AfterFunc(time.Duration(nextTimePoint-nowSec)*time.Second, func() {
// send for the first time right after the timer is executed
utrng.send()
t := time.NewTicker(time.Duration(utrng.resolution) * time.Second)
defer t.Stop()
out:
for {
select {
case <-t.C:
utrng.send()
case <-utrng.exit:
break out
}
}
})
}
// sends the next pseudo random number to the consumer channel.
func (utrng *UnixTimestampPrng) send() {
now := utrng.timeSourceFunc()
// reduce to last resolution
timePoint := now - (now % utrng.resolution)
// convert to float64
buf := bytes.NewBuffer(make([]byte, 0, 8))
if err := binary.Write(buf, binary.LittleEndian, timePoint); err != nil {
panic(err)
}
pseudoR := float64(binary.BigEndian.Uint64(buf.Bytes()[:8])>>11) / (1 << 53)
// skip slow consumers
select {
case utrng.c <- pseudoR:
default:
}
}
// C returns the channel from which random generated numbers can be consumed from.
func (utrng *UnixTimestampPrng) C() <-chan float64 {
return utrng.c
}
// Stop stops the Unix timestamp pseudo random number generator.
func (utrng *UnixTimestampPrng) Stop() {
utrng.exit <- struct{}{}
}
// ResolveNextTimePoint returns the next time point.
func ResolveNextTimePoint(nowSec int64, resolution int64) int64 {
return nowSec + (resolution - nowSec%resolution)
}
package prng_test
import (
"testing"
"github.com/iotaledger/goshimmer/packages/prng"
"github.com/stretchr/testify/assert"
)
func TestResolveNextTimePoint(t *testing.T) {
assert.EqualValues(t, 105, prng.ResolveNextTimePoint(103, 5))
assert.EqualValues(t, 110, prng.ResolveNextTimePoint(105, 5))
assert.EqualValues(t, 105, prng.ResolveNextTimePoint(100, 5))
assert.EqualValues(t, 100, prng.ResolveNextTimePoint(97, 5))
}
func TestUnixTsPrng(t *testing.T) {
unixTsRng := prng.NewUnixTimestampPRNG(1)
unixTsRng.Start()
defer unixTsRng.Stop()
var last float64
for i := 0; i < 3; i++ {
r := <-unixTsRng.C()
assert.Less(t, r, 1.0)
assert.Greater(t, r, 0.0)
assert.NotEqual(t, last, r)
last = r
}
}
......@@ -2,6 +2,7 @@ package shutdown
const (
ShutdownPriorityDatabase = iota
ShutdownPriorityFPC
ShutdownPriorityTangle
ShutdownPriorityRemoteLog
ShutdownPriorityAnalysis
......
......
package fpc
import (
"container/list"
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/hive.go/events"
)
var (
ErrVoteAlreadyOngoing = errors.New("a vote is already ongoing for the given ID")
ErrNoOpinionGiversAvailable = errors.New("can't perform round as no opinion givers are available")
)
// New creates a new FPC instance.
func New(opinionGiverFunc vote.OpinionGiverFunc, paras ...*Parameters) *FPC {
f := &FPC{
opinionGiverFunc: opinionGiverFunc,
paras: DefaultParameters(),
opinionGiverRng: rand.New(rand.NewSource(time.Now().UnixNano())),
ctxs: make(map[string]*vote.Context),
queue: list.New(),
queueSet: make(map[string]struct{}),
events: vote.Events{
Finalized: events.NewEvent(vote.OpinionCaller),
Failed: events.NewEvent(vote.OpinionCaller),
RoundExecuted: events.NewEvent(vote.RoundStatsCaller),
Error: events.NewEvent(events.ErrorCaller),
},
}
if len(paras) > 0 {
f.paras = paras[0]
}
return f
}
// FPC is a DRNGRoundBasedVoter which uses the Opinion of other entities
// in order to finalize an Opinion.
type FPC struct {
events vote.Events
opinionGiverFunc vote.OpinionGiverFunc
// the lifo queue of newly enqueued items to vote on.
queue *list.List
// contains a set of currently queued items.
queueSet map[string]struct{}
queueMu sync.Mutex
// contains the set of current vote contexts.
ctxs map[string]*vote.Context
ctxsMu sync.RWMutex
// parameters to use within FPC.
paras *Parameters
// indicates whether the last round was performed successfully.
lastRoundCompletedSuccessfully bool
// used to randomly select opinion givers.
opinionGiverRng *rand.Rand
}
func (f *FPC) Vote(id string, initOpn vote.Opinion) error {
f.queueMu.Lock()
defer f.queueMu.Unlock()
f.ctxsMu.RLock()
defer f.ctxsMu.RUnlock()
if _, alreadyQueued := f.queueSet[id]; alreadyQueued {
return fmt.Errorf("%w: %s", ErrVoteAlreadyOngoing, id)
}
if _, alreadyOngoing := f.ctxs[id]; alreadyOngoing {
return fmt.Errorf("%w: %s", ErrVoteAlreadyOngoing, id)
}
f.queue.PushBack(vote.NewContext(id, initOpn))
f.queueSet[id] = struct{}{}
return nil
}
func (f *FPC) IntermediateOpinion(id string) (vote.Opinion, error) {
f.ctxsMu.RLock()
defer f.ctxsMu.RUnlock()
voteCtx, has := f.ctxs[id]
if !has {
return vote.Unknown, fmt.Errorf("%w: %s", vote.ErrVotingNotFound, id)
}
return voteCtx.LastOpinion(), nil
}
func (f *FPC) Events() vote.Events {
return f.events
}
// Round enqueues new items, sets opinions on active vote contexts, finalizes them and then
// queries for opinions.
func (f *FPC) Round(rand float64) error {
start := time.Now()
// enqueue new voting contexts
f.enqueue()
// we can only form opinions when the last round was actually executed successfully
if f.lastRoundCompletedSuccessfully {
// form opinions by using the random number supplied for this new round
f.formOpinions(rand)
// clean opinions on vote contexts where an opinion was reached in FinalizationThreshold
// number of rounds and clear those who failed to be finalized in MaxRoundsPerVoteContext.
f.finalizeOpinions()
}
// query for opinions on the current vote contexts
queriedOpinions, err := f.queryOpinions()
if err == nil {
f.lastRoundCompletedSuccessfully = true
// execute a round executed event
roundStats := &vote.RoundStats{
Duration: time.Since(start),
RandUsed: rand,
ActiveVoteContexts: f.ctxs,
QueriedOpinions: queriedOpinions,
}
// TODO: add possibility to check whether an event handler is registered
// in order to prevent the collection of the round stats data if not needed
f.events.RoundExecuted.Trigger(roundStats)
}
return err
}
// enqueues items for voting
func (f *FPC) enqueue() {
f.queueMu.Lock()
defer f.queueMu.Unlock()
f.ctxsMu.Lock()
defer f.ctxsMu.Unlock()
for ele := f.queue.Front(); ele != nil; ele = f.queue.Front() {
voteCtx := ele.Value.(*vote.Context)
f.ctxs[voteCtx.ID] = voteCtx
f.queue.Remove(ele)
delete(f.queueSet, voteCtx.ID)
}
}
// formOpinions updates the opinion for ongoing vote contexts by comparing their liked percentage
// against the threshold appropriate for their given rounds.
func (f *FPC) formOpinions(rand float64) {
f.ctxsMu.RLock()
defer f.ctxsMu.RUnlock()
for _, voteCtx := range f.ctxs {
// when the vote context is new there's no opinion to form
if voteCtx.IsNew() {
continue
}
lowerThreshold := f.paras.SubsequentRoundsLowerBoundThreshold
upperThreshold := f.paras.SubsequentRoundsUpperBoundThreshold
if voteCtx.HadFirstRound() {
lowerThreshold = f.paras.FirstRoundLowerBoundThreshold
upperThreshold = f.paras.FirstRoundUpperBoundThreshold
}
if voteCtx.Liked >= RandUniformThreshold(rand, lowerThreshold, upperThreshold) {
voteCtx.AddOpinion(vote.Like)
continue
}
voteCtx.AddOpinion(vote.Dislike)
}
}
// emits a Voted event for every finalized vote context (or Failed event if failed) and then removes it from FPC.
func (f *FPC) finalizeOpinions() {
f.ctxsMu.Lock()
defer f.ctxsMu.Unlock()
for id, voteCtx := range f.ctxs {
if voteCtx.IsFinalized(f.paras.CoolingOffPeriod, f.paras.FinalizationThreshold) {
f.events.Finalized.Trigger(id, voteCtx.LastOpinion())
delete(f.ctxs, id)
continue
}
if voteCtx.Rounds >= f.paras.MaxRoundsPerVoteContext {
f.events.Failed.Trigger(id, voteCtx.LastOpinion())
delete(f.ctxs, id)
}
}
}
// queries the opinions of QuerySampleSize amount of OpinionGivers.
func (f *FPC) queryOpinions() ([]vote.QueriedOpinions, error) {
ids := f.voteContextIDs()
// nothing to vote on
if len(ids) == 0 {
return nil, nil
}
opinionGivers, err := f.opinionGiverFunc()
if err != nil {
return nil, err
}
// nobody to query
if len(opinionGivers) == 0 {
return nil, ErrNoOpinionGiversAvailable
}
// select a random subset of opinion givers to query.
// if the same opinion giver is selected multiple times, we query it only once
// but use its opinion N selected times.
opinionGiversToQuery := map[vote.OpinionGiver]int{}
for i := 0; i < f.paras.QuerySampleSize; i++ {
selected := opinionGivers[f.opinionGiverRng.Intn(len(opinionGivers))]
opinionGiversToQuery[selected]++
}
// votes per id
var voteMapMu sync.Mutex
voteMap := map[string]vote.Opinions{}
// holds queried opinions
allQueriedOpinions := []vote.QueriedOpinions{}
// send queries
var wg sync.WaitGroup
for opinionGiverToQuery, selectedCount := range opinionGiversToQuery {
wg.Add(1)
go func(opinionGiverToQuery vote.OpinionGiver, selectedCount int) {
defer wg.Done()
queryCtx, cancel := context.WithTimeout(context.Background(), f.paras.QueryTimeout)
defer cancel()
// query
opinions, err := opinionGiverToQuery.Query(queryCtx, ids)
if err != nil || len(opinions) != len(ids) {
// ignore opinions
return
}
queriedOpinions := vote.QueriedOpinions{
OpinionGiverID: opinionGiverToQuery.ID(),
Opinions: make(map[string]vote.Opinion),
TimesCounted: selectedCount,
}
// add opinions to vote map
voteMapMu.Lock()
defer voteMapMu.Unlock()
for i, id := range ids {
votes, has := voteMap[id]
if !has {
votes = vote.Opinions{}
}
// reuse the opinion N times selected.
// note this is always at least 1.
for j := 0; j < selectedCount; j++ {
votes = append(votes, opinions[i])
}
queriedOpinions.Opinions[id] = opinions[i]
voteMap[id] = votes
}
allQueriedOpinions = append(allQueriedOpinions, queriedOpinions)
}(opinionGiverToQuery, selectedCount)
}
wg.Wait()
f.ctxsMu.RLock()
defer f.ctxsMu.RUnlock()
// compute liked percentage
for id, votes := range voteMap {
var likedSum float64
votedCount := float64(len(votes))
for _, opinion := range votes {
switch opinion {
case vote.Unknown:
votedCount--
case vote.Like:
likedSum++
}
}
// mark a round being done, even though there's no opinion,
// so this voting context will be cleared eventually
f.ctxs[id].Rounds++
if votedCount == 0 {
continue
}
f.ctxs[id].Liked = likedSum / votedCount
}
return allQueriedOpinions, nil
}
func (f *FPC) voteContextIDs() []string {
f.ctxsMu.RLock()
defer f.ctxsMu.RUnlock()
var i int
ids := make([]string, len(f.ctxs))
for id := range f.ctxs {
ids[i] = id
i++
}
return ids
}
package fpc_test
import (
"context"
"errors"
"testing"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/packages/vote/fpc"
"github.com/iotaledger/hive.go/events"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestVoteContext_IsFinalized(t *testing.T) {
type testInput struct {
voteCtx vote.Context
coolOffPeriod int
finalizationThreshold int
want bool
}
var tests = []testInput{
{vote.Context{
Opinions: []vote.Opinion{vote.Like, vote.Like, vote.Like, vote.Like, vote.Like},
}, 2, 2, true},
{vote.Context{
Opinions: []vote.Opinion{vote.Like, vote.Like, vote.Like, vote.Like, vote.Dislike},
}, 2, 2, false},
}
for _, test := range tests {
assert.Equal(t, test.want, test.voteCtx.IsFinalized(test.coolOffPeriod, test.finalizationThreshold))
}
}
func TestVoteContext_LastOpinion(t *testing.T) {
type testInput struct {
voteCtx vote.Context
expected vote.Opinion
}
var tests = []testInput{
{vote.Context{
Opinions: []vote.Opinion{vote.Like, vote.Like, vote.Like, vote.Like},
}, vote.Like},
{vote.Context{
Opinions: []vote.Opinion{vote.Like, vote.Like, vote.Like, vote.Dislike},
}, vote.Dislike},
}
for _, test := range tests {
assert.Equal(t, test.expected, test.voteCtx.LastOpinion())
}
}
func TestFPCPreventSameIDMultipleTimes(t *testing.T) {
voter := fpc.New(nil)
assert.NoError(t, voter.Vote("a", vote.Like))
// can't add the same item twice
assert.True(t, errors.Is(voter.Vote("a", vote.Like), fpc.ErrVoteAlreadyOngoing))
}
type opiniongivermock struct {
roundsReplies []vote.Opinions
roundIndex int
}
func (ogm *opiniongivermock) ID() string {
return ""
}
func (ogm *opiniongivermock) Query(_ context.Context, _ []string) (vote.Opinions, error) {
if ogm.roundIndex >= len(ogm.roundsReplies) {
return ogm.roundsReplies[len(ogm.roundsReplies)-1], nil
}
opinions := ogm.roundsReplies[ogm.roundIndex]
ogm.roundIndex++
return opinions, nil
}
func TestFPCFinalizedEvent(t *testing.T) {
opinionGiverMock := &opiniongivermock{
roundsReplies: []vote.Opinions{
// 2 cool-off period, 2 finalization threshold
{vote.Like}, {vote.Like}, {vote.Like}, {vote.Like},
},
}
opinionGiverFunc := func() (givers []vote.OpinionGiver, err error) {
return []vote.OpinionGiver{opinionGiverMock}, nil
}
id := "a"
paras := fpc.DefaultParameters()
paras.FinalizationThreshold = 2
paras.CoolingOffPeriod = 2
paras.QuerySampleSize = 1
voter := fpc.New(opinionGiverFunc, paras)
var finalizedOpinion *vote.Opinion
voter.Events().Finalized.Attach(events.NewClosure(func(id string, opinion vote.Opinion) {
finalizedOpinion = &opinion
}))
assert.NoError(t, voter.Vote(id, vote.Like))
// do 5 rounds of FPC -> 5 because the last one finalizes the vote
for i := 0; i < 5; i++ {
assert.NoError(t, voter.Round(0.5))
}
require.NotNil(t, finalizedOpinion, "finalized event should have been fired")
assert.Equal(t, vote.Like, *finalizedOpinion, "the final opinion should have been 'Like'")
}
func TestFPCFailedEvent(t *testing.T) {
opinionGiverFunc := func() (givers []vote.OpinionGiver, err error) {
return []vote.OpinionGiver{&opiniongivermock{
// doesn't matter what we set here
roundsReplies: []vote.Opinions{{vote.Dislike}},
}}, nil
}
id := "a"
paras := fpc.DefaultParameters()
paras.QuerySampleSize = 1
paras.MaxRoundsPerVoteContext = 3
paras.CoolingOffPeriod = 0
// since the finalization threshold is over max rounds it will
// always fail finalizing an opinion
paras.FinalizationThreshold = 4
voter := fpc.New(opinionGiverFunc, paras)
var failedOpinion *vote.Opinion
voter.Events().Failed.Attach(events.NewClosure(func(id string, opinion vote.Opinion) {
failedOpinion = &opinion
}))
assert.NoError(t, voter.Vote(id, vote.Like))
for i := 0; i < 4; i++ {
assert.NoError(t, voter.Round(0.5))
}
require.NotNil(t, failedOpinion, "failed event should have been fired")
assert.Equal(t, vote.Dislike, *failedOpinion, "the final opinion should have been 'Dislike'")
}
func TestFPCVotingMultipleOpinionGivers(t *testing.T) {
type testInput struct {
id string
initOpinion vote.Opinion
expectedRoundsDone int
expectedOpinion vote.Opinion
}
var tests = []testInput{
{"1", vote.Like, 5, vote.Like},
{"2", vote.Dislike, 5, vote.Dislike},
}
for _, test := range tests {
// note that even though we're defining QuerySampleSize times opinion givers,
// it doesn't mean that FPC will query all of them.
opinionGiverFunc := func() (givers []vote.OpinionGiver, err error) {
opinionGivers := make([]vote.OpinionGiver, fpc.DefaultParameters().QuerySampleSize)
for i := 0; i < len(opinionGivers); i++ {
opinionGivers[i] = &opiniongivermock{roundsReplies: []vote.Opinions{{test.initOpinion}}}
}
return opinionGivers, nil
}
paras := fpc.DefaultParameters()
paras.FinalizationThreshold = 2
paras.CoolingOffPeriod = 2
voter := fpc.New(opinionGiverFunc, paras)
var finalOpinion *vote.Opinion
voter.Events().Finalized.Attach(events.NewClosure(func(id string, finalizedOpinion vote.Opinion) {
finalOpinion = &finalizedOpinion
}))
assert.NoError(t, voter.Vote(test.id, test.initOpinion))
var roundsDone int
for finalOpinion == nil {
assert.NoError(t, voter.Round(0.7))
roundsDone++
}
assert.Equal(t, test.expectedRoundsDone, roundsDone)
require.NotNil(t, finalOpinion)
assert.Equal(t, test.expectedOpinion, *finalOpinion)
}
}
package fpc
import "time"
// Parameters define the parameters of an FPC instance.
type Parameters struct {
// The lower bound liked percentage threshold at the first round. Also called 'a'.
FirstRoundLowerBoundThreshold float64
// The upper bound liked percentage threshold at the first round. Also called 'b'.
FirstRoundUpperBoundThreshold float64
// The lower bound liked percentage threshold used after the first round.
SubsequentRoundsLowerBoundThreshold float64
// The upper bound liked percentage threshold used after the first round.
SubsequentRoundsUpperBoundThreshold float64
// The amount of opinions to query on each round for a given vote context. Also called 'k'.
QuerySampleSize int
// The amount of rounds a vote context's opinion needs to stay the same to be considered final. Also called 'l'.
FinalizationThreshold int
// The amount of rounds for which to ignore any finalization checks for. Also called 'm'.
CoolingOffPeriod int
// The max amount of rounds to execute per vote context before aborting them.
MaxRoundsPerVoteContext int
// The max amount of time a query is allowed to take.
QueryTimeout time.Duration
}
// DefaultParameters returns the default parameters used in FPC.
func DefaultParameters() *Parameters {
return &Parameters{
FirstRoundLowerBoundThreshold: 0.75,
FirstRoundUpperBoundThreshold: 0.75,
SubsequentRoundsLowerBoundThreshold: 0.50,
SubsequentRoundsUpperBoundThreshold: 0.67,
QuerySampleSize: 21,
FinalizationThreshold: 10,
CoolingOffPeriod: 0,
MaxRoundsPerVoteContext: 100,
QueryTimeout: 1500 * time.Millisecond,
}
}
// RandUniformThreshold returns random threshold between the given lower/upper bound values.
func RandUniformThreshold(rand float64, thresholdLowerBound float64, thresholdUpperBound float64) float64 {
return thresholdLowerBound + rand*(thresholdUpperBound-thresholdLowerBound)
}
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: query.proto
package net
import (
context "context"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type QueryRequest struct {
Id []string `protobuf:"bytes,1,rep,name=id,proto3" json:"id,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *QueryRequest) Reset() { *m = QueryRequest{} }
func (m *QueryRequest) String() string { return proto.CompactTextString(m) }
func (*QueryRequest) ProtoMessage() {}
func (*QueryRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_5c6ac9b241082464, []int{0}
}
func (m *QueryRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_QueryRequest.Unmarshal(m, b)
}
func (m *QueryRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_QueryRequest.Marshal(b, m, deterministic)
}
func (m *QueryRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_QueryRequest.Merge(m, src)
}
func (m *QueryRequest) XXX_Size() int {
return xxx_messageInfo_QueryRequest.Size(m)
}
func (m *QueryRequest) XXX_DiscardUnknown() {
xxx_messageInfo_QueryRequest.DiscardUnknown(m)
}
var xxx_messageInfo_QueryRequest proto.InternalMessageInfo
func (m *QueryRequest) GetId() []string {
if m != nil {
return m.Id
}
return nil
}
type QueryReply struct {
Opinion []int32 `protobuf:"varint,1,rep,packed,name=opinion,proto3" json:"opinion,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *QueryReply) Reset() { *m = QueryReply{} }
func (m *QueryReply) String() string { return proto.CompactTextString(m) }
func (*QueryReply) ProtoMessage() {}
func (*QueryReply) Descriptor() ([]byte, []int) {
return fileDescriptor_5c6ac9b241082464, []int{1}
}
func (m *QueryReply) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_QueryReply.Unmarshal(m, b)
}
func (m *QueryReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_QueryReply.Marshal(b, m, deterministic)
}
func (m *QueryReply) XXX_Merge(src proto.Message) {
xxx_messageInfo_QueryReply.Merge(m, src)
}
func (m *QueryReply) XXX_Size() int {
return xxx_messageInfo_QueryReply.Size(m)
}
func (m *QueryReply) XXX_DiscardUnknown() {
xxx_messageInfo_QueryReply.DiscardUnknown(m)
}
var xxx_messageInfo_QueryReply proto.InternalMessageInfo
func (m *QueryReply) GetOpinion() []int32 {
if m != nil {
return m.Opinion
}
return nil
}
func init() {
proto.RegisterType((*QueryRequest)(nil), "net.QueryRequest")
proto.RegisterType((*QueryReply)(nil), "net.QueryReply")
}
func init() {
proto.RegisterFile("query.proto", fileDescriptor_5c6ac9b241082464)
}
var fileDescriptor_5c6ac9b241082464 = []byte{
// 148 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0x2c, 0x4d, 0x2d,
0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0xce, 0x4b, 0x2d, 0x51, 0x92, 0xe3, 0xe2,
0x09, 0x04, 0x89, 0x05, 0xa5, 0x16, 0x96, 0xa6, 0x16, 0x97, 0x08, 0xf1, 0x71, 0x31, 0x65, 0xa6,
0x48, 0x30, 0x2a, 0x30, 0x6b, 0x70, 0x06, 0x31, 0x65, 0xa6, 0x28, 0xa9, 0x71, 0x71, 0x41, 0xe5,
0x0b, 0x72, 0x2a, 0x85, 0x24, 0xb8, 0xd8, 0xf3, 0x0b, 0x32, 0xf3, 0x32, 0xf3, 0xf3, 0xc0, 0x4a,
0x58, 0x83, 0x60, 0x5c, 0x23, 0x5b, 0x2e, 0xae, 0xb0, 0xfc, 0x92, 0xd4, 0x22, 0xb0, 0x62, 0x21,
0x7d, 0x2e, 0x76, 0x7f, 0x88, 0x84, 0x90, 0xa0, 0x5e, 0x5e, 0x6a, 0x89, 0x1e, 0xb2, 0x1d, 0x52,
0xfc, 0xc8, 0x42, 0x05, 0x39, 0x95, 0x4a, 0x0c, 0x4e, 0xec, 0x51, 0xac, 0x7a, 0xd6, 0x79, 0xa9,
0x25, 0x49, 0x6c, 0x60, 0xb7, 0x19, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0xb4, 0x93, 0xeb, 0x61,
0xaa, 0x00, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6
// VoterQueryClient is the client API for VoterQuery service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type VoterQueryClient interface {
Opinion(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryReply, error)
}
type voterQueryClient struct {
cc grpc.ClientConnInterface
}
func NewVoterQueryClient(cc grpc.ClientConnInterface) VoterQueryClient {
return &voterQueryClient{cc}
}
func (c *voterQueryClient) Opinion(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryReply, error) {
out := new(QueryReply)
err := c.cc.Invoke(ctx, "/net.VoterQuery/Opinion", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// VoterQueryServer is the server API for VoterQuery service.
type VoterQueryServer interface {
Opinion(context.Context, *QueryRequest) (*QueryReply, error)
}
// UnimplementedVoterQueryServer can be embedded to have forward compatible implementations.
type UnimplementedVoterQueryServer struct {
}
func (*UnimplementedVoterQueryServer) Opinion(ctx context.Context, req *QueryRequest) (*QueryReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method Opinion not implemented")
}
func RegisterVoterQueryServer(s *grpc.Server, srv VoterQueryServer) {
s.RegisterService(&_VoterQuery_serviceDesc, srv)
}
func _VoterQuery_Opinion_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(QueryRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(VoterQueryServer).Opinion(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/net.VoterQuery/Opinion",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VoterQueryServer).Opinion(ctx, req.(*QueryRequest))
}
return interceptor(ctx, in, info, handler)
}
var _VoterQuery_serviceDesc = grpc.ServiceDesc{
ServiceName: "net.VoterQuery",
HandlerType: (*VoterQueryServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Opinion",
Handler: _VoterQuery_Opinion_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "query.proto",
}
syntax = "proto3";
package net;
option go_package = ".;net";
service VoterQuery {
rpc Opinion (QueryRequest) returns (QueryReply) {}
}
message QueryRequest {
repeated string id = 1;
}
message QueryReply {
repeated int32 opinion = 1;
}
\ No newline at end of file
package net
import (
"context"
"net"
"github.com/iotaledger/goshimmer/packages/vote"
"google.golang.org/grpc"
)
// OpinionRetriever retrieves the opinion for the given ID.
// If there's no opinion, the function should return Unknown.
type OpinionRetriever func(id string) vote.Opinion
// New creates a new VoterServer.
func New(voter vote.Voter, opnRetriever OpinionRetriever, bindAddr string) *VoterServer {
vs := &VoterServer{
voter: voter,
opnRetriever: opnRetriever,
bindAddr: bindAddr,
}
return vs
}
// VoterServer is a server which responds to opinion queries.
type VoterServer struct {
voter vote.Voter
opnRetriever OpinionRetriever
bindAddr string
grpcServer *grpc.Server
}
func (vs *VoterServer) Opinion(ctx context.Context, req *QueryRequest) (*QueryReply, error) {
reply := &QueryReply{
Opinion: make([]int32, len(req.Id)),
}
for i, id := range req.Id {
// check whether there's an ongoing vote
opinion, err := vs.voter.IntermediateOpinion(id)
if err == nil {
reply.Opinion[i] = int32(opinion)
continue
}
reply.Opinion[i] = int32(vs.opnRetriever(id))
}
return reply, nil
}
func (vs *VoterServer) Run() error {
listener, err := net.Listen("tcp", vs.bindAddr)
if err != nil {
return err
}
grpcServer := grpc.NewServer()
RegisterVoterQueryServer(grpcServer, vs)
return grpcServer.Serve(listener)
}
func (vs *VoterServer) Shutdown() {
vs.grpcServer.GracefulStop()
}
package vote
import (
"context"
)
// OpinionGiver gives opinions about the given IDs.
type OpinionGiver interface {
// Query queries the OpinionGiver for its opinions on the given IDs.
// The passed in context can be used to signal cancellation of the query.
Query(ctx context.Context, ids []string) (Opinions, error)
// ID returns the ID of the opinion giver.
ID() string
}
// QueriedOpinions represents queried opinions from a given opinion giver.
type QueriedOpinions struct {
// The ID of the opinion giver.
OpinionGiverID string `json:"opinion_giver_id"`
// The map of IDs to opinions.
Opinions map[string]Opinion `json:"opinions"`
// The amount of times the opinion giver's opinion has counted.
// Usually this number is 1 but due to randomization of the queried opinion givers,
// the same opinion giver's opinions might be taken into account multiple times.
TimesCounted int `json:"times_counted"`
}
// OpinionGiverFunc is a function which gives a slice of OpinionGivers or an error.
type OpinionGiverFunc func() ([]OpinionGiver, error)
// Opinions is a slice of Opinion.
type Opinions []Opinion
// Opinion is an opinion about a given thing.
type Opinion byte
const (
Like Opinion = 1 << 0
Dislike Opinion = 1 << 1
Unknown Opinion = 1 << 2
)
// ConvertInt32Opinion converts the given int32 to an Opinion.
func ConvertInt32Opinion(x int32) Opinion {
switch {
case x == 1<<0:
return Like
case x == 1<<1:
return Dislike
}
return Unknown
}
package vote
// NewContext creates a new vote context.
func NewContext(id string, initOpn Opinion) *Context {
voteCtx := &Context{ID: id, Liked: likedInit}
voteCtx.AddOpinion(initOpn)
return voteCtx
}
const likedInit = -1
// Context is the context of votes from multiple rounds about a given item.
type Context struct {
ID string
// The percentage of OpinionGivers who liked this item on the last query.
Liked float64
// The number of voting rounds performed.
Rounds int
// Append-only list of opinions formed after each round.
// the first opinion is the initial opinion when this vote context was created.
Opinions []Opinion
}
// AddOpinion adds the given opinion to this vote context.
func (vc *Context) AddOpinion(opn Opinion) {
vc.Opinions = append(vc.Opinions, opn)
}
// LastOpinion returns the last formed opinion.
func (vc *Context) LastOpinion() Opinion {
return vc.Opinions[len(vc.Opinions)-1]
}
// IsFinalized tells whether this vote context is finalized by checking whether the opinion was held
// for finalizationThreshold number of rounds.
func (vc *Context) IsFinalized(coolingOffPeriod int, finalizationThreshold int) bool {
// check whether we have enough opinions to say whether this vote context is finalized.
// we start from the 2nd opinion since the first one is the initial opinion.
if len(vc.Opinions[1:]) < coolingOffPeriod+finalizationThreshold {
return false
}
// grab opinion which needs to be held for finalizationThreshold number of rounds
candidateOpinion := vc.Opinions[len(vc.Opinions)-finalizationThreshold]
// check whether it was held for the subsequent rounds
for _, subsequentOpinion := range vc.Opinions[len(vc.Opinions)-finalizationThreshold+1:] {
if subsequentOpinion != candidateOpinion {
return false
}
}
return true
}
// IsNew tells whether the vote context is new.
func (vc *Context) IsNew() bool {
return vc.Liked == likedInit
}
// HadFirstRound tells whether the vote context just had its first round.
func (vc *Context) HadFirstRound() bool {
return vc.Rounds == 1
}
package vote
import (
"errors"
"time"
"github.com/iotaledger/hive.go/events"
)
var (
// ErrVotingNotFound is returned when a voting for a given id wasn't found.
ErrVotingNotFound = errors.New("no voting found")
)
// Voter votes on hashes.
type Voter interface {
// Vote submits the given ID for voting with its initial Opinion.
Vote(id string, initOpn Opinion) error
// IntermediateOpinion gets intermediate Opinion about the given ID.
IntermediateOpinion(id string) (Opinion, error)
// Events returns the Events instance of the given Voter.
Events() Events
}
// DRNGRoundBasedVoter is a Voter which votes in rounds and uses random numbers which
// were generated in a decentralized fashion.
type DRNGRoundBasedVoter interface {
Voter
// Round starts a new round.
Round(rand float64) error
}
// Events defines events which happen on a Voter.
type Events struct {
// Fired when an Opinion has been finalized.
Finalized *events.Event
// Fired when an Opinion couldn't be finalized.
Failed *events.Event
// Fired when a DRNGRoundBasedVoter has executed a round.
RoundExecuted *events.Event
// Fired when internal errors occur.
Error *events.Event
}
// RoundStats encapsulates data about an executed round.
type RoundStats struct {
// The time it took to complete a round.
Duration time.Duration `json:"duration"`
// The rand number used during the round.
RandUsed float64 `json:"rand_used"`
// The vote contexts on which opinions were formed and queried.
// This list does not include the vote contexts which were finalized/aborted
// during the execution of the round.
// Create a copy of this map if you need to modify any of its elements.
ActiveVoteContexts map[string]*Context `json:"active_vote_contexts"`
// The opinions which were queried during the round per opinion giver.
QueriedOpinions []QueriedOpinions `json:"queried_opinions"`
}
// OpinionCaller calls the given handler with an Opinion and its associated Id.
func OpinionCaller(handler interface{}, params ...interface{}) {
handler.(func(id string, opinion Opinion))(params[0].(string), params[1].(Opinion))
}
// RoundStats calls the given handler with a RoundStats.
func RoundStatsCaller(handler interface{}, params ...interface{}) {
handler.(func(stats *RoundStats))(params[0].(*RoundStats))
}
package fpc
import (
flag "github.com/spf13/pflag"
)
const (
CfgFPCQuerySampleSize = "fpc.querySampleSize"
CfgFPCRoundInterval = "fpc.roundInterval"
CfgFPCBindAddress = "fpc.bindAddress"
)
func init() {
flag.Int(CfgFPCQuerySampleSize, 3, "Size of the voting quorum (k)")
flag.Int(CfgFPCRoundInterval, 5, "FPC round interval [s]")
flag.String(CfgFPCBindAddress, "0.0.0.0:10895", "the bind address on which the FPC vote server binds to")
}
package fpc
import (
"context"
"fmt"
"net"
"strconv"
"sync"
"github.com/iotaledger/goshimmer/packages/prng"
"github.com/iotaledger/hive.go/events"
"google.golang.org/grpc"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/packages/vote/fpc"
votenet "github.com/iotaledger/goshimmer/packages/vote/net"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/autopeering/peer/service"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
)
const name = "FPC"
var (
PLUGIN = node.NewPlugin(name, node.Enabled, configure, run)
log *logger.Logger
voter *fpc.FPC
voterOnce sync.Once
voterServer *votenet.VoterServer
roundIntervalSeconds int64 = 5
)
// Voter returns the DRNGRoundBasedVoter instance used by the FPC plugin.
func Voter() vote.DRNGRoundBasedVoter {
voterOnce.Do(func() {
// create a function which gets OpinionGivers
opinionGiverFunc := func() (givers []vote.OpinionGiver, err error) {
opinionGivers := make([]vote.OpinionGiver, 0)
for _, p := range autopeering.Discovery.GetVerifiedPeers() {
fpcService := p.Services().Get(service.FPCKey)
if fpcService == nil {
continue
}
// TODO: maybe cache the PeerOpinionGiver instead of creating a new one every time
opinionGivers = append(opinionGivers, &PeerOpinionGiver{p: p})
}
return opinionGivers, nil
}
voter = fpc.New(opinionGiverFunc)
})
return voter
}
func configure(_ *node.Plugin) {
log = logger.NewLogger(name)
lPeer := local.GetInstance()
bindAddr := config.Node.GetString(CfgFPCBindAddress)
_, portStr, err := net.SplitHostPort(bindAddr)
if err != nil {
log.Fatalf("FPC bind address '%s' is invalid: %s", bindAddr, err)
}
port, err := strconv.Atoi(portStr)
if err != nil {
log.Fatalf("FPC bind address '%s' is invalid: %s", bindAddr, err)
}
if err := lPeer.UpdateService(service.FPCKey, "tcp", port); err != nil {
log.Fatalf("could not update services: %v", err)
}
voter.Events().RoundExecuted.Attach(events.NewClosure(func(roundStats *vote.RoundStats) {
peersQueried := len(roundStats.QueriedOpinions)
voteContextsCount := len(roundStats.ActiveVoteContexts)
log.Infof("executed round with rand %0.4f for %d vote contexts on %d peers, took %v", roundStats.RandUsed, voteContextsCount, peersQueried, roundStats.Duration)
}))
}
func run(_ *node.Plugin) {
daemon.BackgroundWorker("FPCVoterServer", func(shutdownSignal <-chan struct{}) {
voterServer = votenet.New(Voter(), func(id string) vote.Opinion {
// TODO: replace with persistence layer call
return vote.Unknown
}, config.Node.GetString(CfgFPCBindAddress))
go func() {
if err := voterServer.Run(); err != nil {
log.Error(err)
}
}()
log.Infof("Started vote server on %s", config.Node.GetString(CfgFPCBindAddress))
<-shutdownSignal
voterServer.Shutdown()
log.Info("Stopped vote server")
}, shutdown.ShutdownPriorityFPC)
daemon.BackgroundWorker("FPCRoundsInitiator", func(shutdownSignal <-chan struct{}) {
log.Infof("Started FPC round initiator")
unixTsPRNG := prng.NewUnixTimestampPRNG(roundIntervalSeconds)
defer unixTsPRNG.Stop()
exit:
for {
select {
case r := <-unixTsPRNG.C():
if err := voter.Round(r); err != nil {
log.Errorf("unable to execute FPC round: %s", err)
}
case <-shutdownSignal:
break exit
}
}
log.Infof("Stopped FPC round initiator")
}, shutdown.ShutdownPriorityFPC)
}
// PeerOpinionGiver implements the OpinionGiver interface based on a peer.
type PeerOpinionGiver struct {
p *peer.Peer
}
func (pog *PeerOpinionGiver) Query(ctx context.Context, ids []string) (vote.Opinions, error) {
fpcServicePort := pog.p.Services().Get(service.FPCKey).Port()
fpcAddr := net.JoinHostPort(pog.p.IP().String(), strconv.Itoa(fpcServicePort))
var opts []grpc.DialOption
opts = append(opts, grpc.WithInsecure())
// connect to the FPC service
conn, err := grpc.Dial(fpcAddr, opts...)
if err != nil {
return nil, fmt.Errorf("unable to connect to FPC service: %w", err)
}
defer conn.Close()
client := votenet.NewVoterQueryClient(conn)
reply, err := client.Opinion(ctx, &votenet.QueryRequest{Id: ids})
if err != nil {
return nil, fmt.Errorf("unable to query opinions: %w", err)
}
// convert int32s in reply to opinions
opinions := make(vote.Opinions, len(reply.Opinion))
for i, intOpn := range reply.Opinion {
opinions[i] = vote.ConvertInt32Opinion(intOpn)
}
return opinions, nil
}
func (pog *PeerOpinionGiver) ID() string {
return pog.p.ID().String()
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment