From efafcd7b8effe1e1e3a6d2efe7c5734f3f2a87af Mon Sep 17 00:00:00 2001 From: Luca Moser <moser.luca@gmail.com> Date: Tue, 14 Apr 2020 13:41:29 +0200 Subject: [PATCH] refactors FPC --- go.mod | 1 + go.sum | 7 + packages/prng/unix_ts_rng.go | 91 ++++++++++ packages/prng/unix_ts_rng_test.go | 30 ++++ packages/shutdown/order.go | 1 + packages/vote/fpc/fpc.go | 267 ++++++++++++++++++++++++++++++ packages/vote/fpc/fpc_test.go | 180 ++++++++++++++++++++ packages/vote/fpc/parameters.go | 42 +++++ packages/vote/fpc/vote_context.go | 63 +++++++ packages/vote/net/query.pb.go | 206 +++++++++++++++++++++++ packages/vote/net/query.proto | 17 ++ packages/vote/net/server.go | 64 +++++++ packages/vote/opinion.go | 44 +++++ packages/vote/voter.go | 45 +++++ plugins/fpc/parameters.go | 17 ++ plugins/fpc/plugin.go | 146 ++++++++++++++++ 16 files changed, 1221 insertions(+) create mode 100644 packages/prng/unix_ts_rng.go create mode 100644 packages/prng/unix_ts_rng_test.go create mode 100644 packages/vote/fpc/fpc.go create mode 100644 packages/vote/fpc/fpc_test.go create mode 100644 packages/vote/fpc/parameters.go create mode 100644 packages/vote/fpc/vote_context.go create mode 100644 packages/vote/net/query.pb.go create mode 100644 packages/vote/net/query.proto create mode 100644 packages/vote/net/server.go create mode 100644 packages/vote/opinion.go create mode 100644 packages/vote/voter.go create mode 100644 plugins/fpc/parameters.go create mode 100644 plugins/fpc/plugin.go diff --git a/go.mod b/go.mod index a080b8f1..6b55bdb4 100644 --- a/go.mod +++ b/go.mod @@ -30,5 +30,6 @@ require ( golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073 golang.org/x/net v0.0.0-20200301022130-244492dfa37a golang.org/x/tools v0.0.0-20200330040139-fa3cc9eebcfe // indirect + google.golang.org/grpc v1.28.1 gopkg.in/src-d/go-git.v4 v4.13.1 ) diff --git a/go.sum b/go.sum index 667f1daa..b7b8038a 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,7 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/bbolt v1.3.3 h1:n6AiVyVRKQFNb6mJlwESEvvLoDyiTzXX7ORAUlkeBdY= github.com/coreos/bbolt v1.3.3/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= @@ -77,7 +78,9 @@ github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BHsljHzVlRcyQhjrss6TZTdY2VfCqZPbv5k3iBFa2ZQ= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= @@ -117,6 +120,7 @@ github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.3.4 h1:87PNWwrRvUSnqS4dlcBU/ftvOIBep4sYuBLlh6rX2wk= github.com/golang/protobuf v1.3.4/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= @@ -519,8 +523,11 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg= google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.28.1 h1:C1QC6KzgSiLyBabDi87BbjaGreoRgGUF5nOyvfrAZ1k= +google.golang.org/grpc v1.28.1/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/packages/prng/unix_ts_rng.go b/packages/prng/unix_ts_rng.go new file mode 100644 index 00000000..1cfcc683 --- /dev/null +++ b/packages/prng/unix_ts_rng.go @@ -0,0 +1,91 @@ +package prng + +import ( + "bytes" + "encoding/binary" + "time" +) + +// TimeSourceFunc is a function which gets an understanding of time in seconds resolution back. +type TimeSourceFunc func() int64 + +// NewUnixTimestampPRNG creates a new Unix timestamp based pseudo random number generator +// using the given resolution. The resolution defines at which second interval numbers are generated. +func NewUnixTimestampPRNG(resolution int64, timeSourceFunc ...TimeSourceFunc) *UnixTimestampPrng { + utrng := &UnixTimestampPrng{ + c: make(chan float64), + exit: make(chan struct{}), + resolution: resolution, + timeSourceFunc: func() int64 { return time.Now().Unix() }, + } + if len(timeSourceFunc) > 0 { + utrng.timeSourceFunc = timeSourceFunc[0] + } + return utrng +} + +// UnixTimestampPrng is a pseudo random number generator using the Unix time in seconds to derive +// a random number from. +type UnixTimestampPrng struct { + c chan float64 + exit chan struct{} + resolution int64 + timeSourceFunc TimeSourceFunc +} + +// Start starts the Unix timestamp pseudo random number generator by examining the +// interval and then starting production of numbers after at least interval seconds +// plus delta of the next resolution time have elapsed. +func (utrng *UnixTimestampPrng) Start() { + nowSec := utrng.timeSourceFunc() + nextTimePoint := ResolveNextTimePoint(nowSec, utrng.resolution) + time.AfterFunc(time.Duration(nextTimePoint-nowSec)*time.Second, func() { + // send for the first time right after the timer is executed + utrng.send() + + t := time.NewTicker(time.Duration(utrng.resolution) * time.Second) + defer t.Stop() + out: + for { + select { + case <-t.C: + utrng.send() + case <-utrng.exit: + break out + } + } + }) +} + +// sends the next pseudo random number to the consumer channel. +func (utrng *UnixTimestampPrng) send() { + now := utrng.timeSourceFunc() + // reduce to last resolution + timePoint := now - (now % utrng.resolution) + // convert to float64 + buf := bytes.NewBuffer(make([]byte, 0, 8)) + if err := binary.Write(buf, binary.LittleEndian, timePoint); err != nil { + panic(err) + } + pseudoR := float64(binary.BigEndian.Uint64(buf.Bytes()[:8])>>11) / (1 << 53) + // skip slow consumers + select { + case utrng.c <- pseudoR: + default: + } +} + +// C returns the channel from which random generated numbers can be consumed from. +func (utrng *UnixTimestampPrng) C() <-chan float64 { + return utrng.c +} + +// Stop stops the Unix timestamp pseudo random number generator. +func (utrng *UnixTimestampPrng) Stop() { + utrng.exit <- struct{}{} +} + +// ResolveNextTimePoint returns the next time point. +func ResolveNextTimePoint(nowSec int64, resolution int64) int64 { + return nowSec + (resolution - nowSec%resolution) +} diff --git a/packages/prng/unix_ts_rng_test.go b/packages/prng/unix_ts_rng_test.go new file mode 100644 index 00000000..4756f38b --- /dev/null +++ b/packages/prng/unix_ts_rng_test.go @@ -0,0 +1,30 @@ +package prng_test + +import ( + "testing" + + "github.com/iotaledger/goshimmer/packages/prng" + "github.com/stretchr/testify/assert" +) + +func TestResolveNextTimePoint(t *testing.T) { + assert.EqualValues(t, 105, prng.ResolveNextTimePoint(103, 5)) + assert.EqualValues(t, 110, prng.ResolveNextTimePoint(105, 5)) + assert.EqualValues(t, 105, prng.ResolveNextTimePoint(100, 5)) + assert.EqualValues(t, 100, prng.ResolveNextTimePoint(97, 5)) +} + +func TestUnixTsPrng(t *testing.T) { + unixTsRng := prng.NewUnixTimestampPRNG(1) + unixTsRng.Start() + defer unixTsRng.Stop() + + var last float64 + for i := 0; i < 3; i++ { + r := <-unixTsRng.C() + assert.Less(t, r, 1.0) + assert.Greater(t, r, 0.0) + assert.NotEqual(t, last, r) + last = r + } +} diff --git a/packages/shutdown/order.go b/packages/shutdown/order.go index 9815548b..c00add7e 100644 --- a/packages/shutdown/order.go +++ b/packages/shutdown/order.go @@ -2,6 +2,7 @@ package shutdown const ( ShutdownPriorityDatabase = iota + ShutdownPriorityFPC ShutdownPriorityTangle ShutdownPriorityRemoteLog ShutdownPriorityAnalysis diff --git a/packages/vote/fpc/fpc.go b/packages/vote/fpc/fpc.go new file mode 100644 index 00000000..37f06252 --- /dev/null +++ b/packages/vote/fpc/fpc.go @@ -0,0 +1,267 @@ +package fpc + +import ( + "container/list" + "context" + "errors" + "fmt" + "math/rand" + "sync" + "time" + + "github.com/iotaledger/goshimmer/packages/vote" + "github.com/iotaledger/hive.go/events" +) + +var ( + ErrVoteAlreadyOngoing = errors.New("a vote is already ongoing for the given ID") + ErrNoOpinionGiversAvailable = errors.New("can't perform round as no opinion givers are available") +) + +// New creates a new FPC instance. +func New(opinionGiverFunc vote.OpinionGiverFunc, paras ...*Parameters) *FPC { + f := &FPC{ + opinionGiverFunc: opinionGiverFunc, + paras: DefaultParameters(), + rng: rand.New(rand.NewSource(time.Now().UnixNano())), + ctxs: make(map[string]*VoteContext), + queue: list.New(), + queueSet: make(map[string]struct{}), + events: vote.Events{ + Finalized: events.NewEvent(vote.OpinionCaller), + Failed: events.NewEvent(vote.OpinionCaller), + Error: events.NewEvent(events.ErrorCaller), + }, + } + if len(paras) > 0 { + f.paras = paras[0] + } + return f +} + +// FPC is a DRNGRoundBasedVoter which uses the Opinion of other entities +// in order to finalize an Opinion. +type FPC struct { + events vote.Events + opinionGiverFunc vote.OpinionGiverFunc + // the lifo queue of newly enqueued items to vote on. + queue *list.List + // contains a set of currently queued items. + queueSet map[string]struct{} + queueMu sync.Mutex + // contains the set of current vote contexts. + ctxs map[string]*VoteContext + ctxsMu sync.RWMutex + // parameters to use within FPC. + paras *Parameters + // indicates whether the last round was performed successfully + lastRoundCompletedSuccessfully bool + rng *rand.Rand +} + +func (f *FPC) Vote(id string, initOpn vote.Opinion) error { + f.queueMu.Lock() + defer f.queueMu.Unlock() + f.ctxsMu.RLock() + defer f.ctxsMu.RUnlock() + if _, alreadyQueued := f.queueSet[id]; alreadyQueued { + return fmt.Errorf("%w: %s", ErrVoteAlreadyOngoing, id) + } + if _, alreadyOngoing := f.ctxs[id]; alreadyOngoing { + return fmt.Errorf("%w: %s", ErrVoteAlreadyOngoing, id) + } + f.queue.PushBack(newVoteContext(id, initOpn)) + f.queueSet[id] = struct{}{} + return nil +} + +func (f *FPC) IntermediateOpinion(id string) (vote.Opinion, error) { + f.ctxsMu.RLock() + defer f.ctxsMu.RUnlock() + voteCtx, has := f.ctxs[id] + if !has { + return vote.Unknown, fmt.Errorf("%w: %s", vote.ErrVotingNotFound, id) + } + return voteCtx.LastOpinion(), nil +} + +func (f *FPC) Events() vote.Events { + return f.events +} + +// Round enqueues new items, sets opinions on active vote contexts, finalizes them and then +// queries for opinions. +func (f *FPC) Round(rand float64) error { + // enqueue new voting contexts + f.enqueue() + // we can only form opinions when the last round was actually executed successfully + if f.lastRoundCompletedSuccessfully { + // form opinions by using the random number supplied for this new round + f.formOpinions(rand) + // clean opinions on vote contexts where an opinion was reached in FinalizationThreshold + // number of rounds and clear those who failed to be finalized in MaxRoundsPerVoteContext. + f.finalizeOpinions() + } + // query for opinions on the current vote contexts + err := f.queryOpinions() + f.lastRoundCompletedSuccessfully = err == nil + return err +} + +// enqueues items for voting +func (f *FPC) enqueue() { + f.queueMu.Lock() + defer f.queueMu.Unlock() + f.ctxsMu.Lock() + defer f.ctxsMu.Unlock() + for ele := f.queue.Front(); ele != nil; ele = f.queue.Front() { + voteCtx := ele.Value.(*VoteContext) + f.ctxs[voteCtx.id] = voteCtx + f.queue.Remove(ele) + delete(f.queueSet, voteCtx.id) + } +} + +// formOpinions updates the opinion for ongoing vote contexts by comparing their liked percentage +// against the threshold appropriate for their given rounds. +func (f *FPC) formOpinions(rand float64) { + f.ctxsMu.RLock() + defer f.ctxsMu.RUnlock() + for _, voteCtx := range f.ctxs { + // when the vote context is new there's no opinion to form + if voteCtx.IsNew() { + continue + } + + lowerThreshold := f.paras.SubsequentRoundsLowerBoundThreshold + upperThreshold := 1 - f.paras.SubsequentRoundsLowerBoundThreshold + + if voteCtx.HadFirstRound() { + lowerThreshold = f.paras.FirstRoundLowerBoundThreshold + upperThreshold = f.paras.FirstRoundUpperBoundThreshold + } + + if voteCtx.Liked >= RandUniformThreshold(rand, lowerThreshold, upperThreshold) { + voteCtx.AddOpinion(vote.Like) + continue + } + voteCtx.AddOpinion(vote.Dislike) + } +} + +// emits a Voted event for every finalized vote context (or Failed event if failed) and then removes it from FPC. +func (f *FPC) finalizeOpinions() { + f.ctxsMu.Lock() + defer f.ctxsMu.Unlock() + for id, voteCtx := range f.ctxs { + if voteCtx.IsFinalized(f.paras.CoolingOffPeriod, f.paras.FinalizationThreshold) { + f.events.Finalized.Trigger(id, voteCtx.LastOpinion()) + delete(f.ctxs, id) + } + if voteCtx.Rounds >= f.paras.MaxRoundsPerVoteContext { + f.events.Failed.Trigger(id, voteCtx.LastOpinion()) + delete(f.ctxs, id) + continue + } + } +} + +// queries the opinions of QuerySampleSize amount of OpinionGivers. +func (f *FPC) queryOpinions() error { + ids := f.voteContextIDs() + + // nothing to vote on + if len(ids) == 0 { + return nil + } + + opinionGivers, err := f.opinionGiverFunc() + if err != nil { + return err + } + + // nobody to query + if len(opinionGivers) == 0 { + return ErrNoOpinionGiversAvailable + } + + // select random subset to query (the same opinion giver can occur multiple times) + opinionGiversToQuery := make([]vote.OpinionGiver, f.paras.QuerySampleSize) + for i := 0; i < f.paras.QuerySampleSize; i++ { + opinionGiversToQuery[i] = opinionGivers[f.rng.Intn(len(opinionGivers))] + } + + // votes per id + var voteMapMu sync.Mutex + voteMap := map[string]vote.Opinions{} + + // send queries + var wg sync.WaitGroup + for _, opinionGiverToQuery := range opinionGiversToQuery { + wg.Add(1) + go func(opinionGiverToQuery vote.OpinionGiver) { + defer wg.Done() + + queryCtx, cancel := context.WithTimeout(context.Background(), f.paras.QueryTimeout) + defer cancel() + + // query + opinions, err := opinionGiverToQuery.Query(queryCtx, ids) + if err != nil || len(opinions) != len(ids) { + // ignore opinions + return + } + + // add opinions to vote map + voteMapMu.Lock() + defer voteMapMu.Unlock() + for i, id := range ids { + votes, has := voteMap[id] + if !has { + votes = vote.Opinions{} + } + votes = append(votes, opinions[i]) + voteMap[id] = votes + } + }(opinionGiverToQuery) + } + wg.Wait() + + f.ctxsMu.RLock() + defer f.ctxsMu.RUnlock() + // compute liked percentage + for id, votes := range voteMap { + var likedSum float64 + votedCount := float64(len(votes)) + + for _, opinion := range votes { + switch opinion { + case vote.Unknown: + votedCount-- + case vote.Like: + likedSum++ + } + } + + // mark a round being done, even though there's no opinion, + // so this voting context will be cleared eventually + f.ctxs[id].Rounds++ + if votedCount == 0 { + continue + } + f.ctxs[id].Liked = likedSum / votedCount + } + return nil +} + +func (f *FPC) voteContextIDs() []string { + f.ctxsMu.RLock() + defer f.ctxsMu.RUnlock() + var i int + ids := make([]string, len(f.ctxs)) + for id := range f.ctxs { + ids[i] = id + i++ + } + return ids +} diff --git a/packages/vote/fpc/fpc_test.go b/packages/vote/fpc/fpc_test.go new file mode 100644 index 00000000..0c021798 --- /dev/null +++ b/packages/vote/fpc/fpc_test.go @@ -0,0 +1,180 @@ +package fpc_test + +import ( + "context" + "errors" + "testing" + + "github.com/iotaledger/goshimmer/packages/vote" + "github.com/iotaledger/goshimmer/packages/vote/fpc" + "github.com/iotaledger/hive.go/events" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestVoteContext_IsFinalized(t *testing.T) { + type testInput struct { + voteCtx fpc.VoteContext + coolOffPeriod int + finalizationThreshold int + want bool + } + var tests = []testInput{ + {fpc.VoteContext{ + Opinions: []vote.Opinion{vote.Like, vote.Like, vote.Like, vote.Like, vote.Like}, + }, 2, 2, true}, + {fpc.VoteContext{ + Opinions: []vote.Opinion{vote.Like, vote.Like, vote.Like, vote.Like, vote.Dislike}, + }, 2, 2, false}, + } + + for _, test := range tests { + assert.Equal(t, test.want, test.voteCtx.IsFinalized(test.coolOffPeriod, test.finalizationThreshold)) + } +} + +func TestVoteContext_LastOpinion(t *testing.T) { + type testInput struct { + voteCtx fpc.VoteContext + expected vote.Opinion + } + var tests = []testInput{ + {fpc.VoteContext{ + Opinions: []vote.Opinion{vote.Like, vote.Like, vote.Like, vote.Like}, + }, vote.Like}, + {fpc.VoteContext{ + Opinions: []vote.Opinion{vote.Like, vote.Like, vote.Like, vote.Dislike}, + }, vote.Dislike}, + } + + for _, test := range tests { + assert.Equal(t, test.expected, test.voteCtx.LastOpinion()) + } +} + +func TestFPCPreventSameIDMultipleTimes(t *testing.T) { + voter := fpc.New(nil) + assert.NoError(t, voter.Vote("a", vote.Like)) + // can't add the same item twice + assert.True(t, errors.Is(voter.Vote("a", vote.Like), fpc.ErrVoteAlreadyOngoing)) +} + +type opiniongivermock struct { + roundsReplies []vote.Opinions + roundIndex int +} + +func (ogm *opiniongivermock) Query(_ context.Context, _ []string) (vote.Opinions, error) { + if ogm.roundIndex >= len(ogm.roundsReplies) { + return ogm.roundsReplies[len(ogm.roundsReplies)-1], nil + } + opinions := ogm.roundsReplies[ogm.roundIndex] + ogm.roundIndex++ + return opinions, nil +} + +func TestFPCFinalizedEvent(t *testing.T) { + opinionGiverMock := &opiniongivermock{ + roundsReplies: []vote.Opinions{ + // 2 cool-off period, 2 finalization threshold + {vote.Like}, {vote.Like}, {vote.Like}, {vote.Like}, + }, + } + opinionGiverFunc := func() (givers []vote.OpinionGiver, err error) { + return []vote.OpinionGiver{opinionGiverMock}, nil + } + + id := "a" + + paras := fpc.DefaultParameters() + paras.QuerySampleSize = 1 + voter := fpc.New(opinionGiverFunc, paras) + var finalizedOpinion *vote.Opinion + voter.Events().Finalized.Attach(events.NewClosure(func(id string, opinion vote.Opinion) { + finalizedOpinion = &opinion + })) + assert.NoError(t, voter.Vote(id, vote.Like)) + + // do 5 rounds of FPC -> 5 because the last one finalizes the vote + for i := 0; i < 5; i++ { + assert.NoError(t, voter.Round(0.5)) + } + + require.NotNil(t, finalizedOpinion, "finalized event should have been fired") + assert.Equal(t, vote.Like, *finalizedOpinion, "the final opinion should have been 'Like'") +} + +func TestFPCFailedEvent(t *testing.T) { + opinionGiverFunc := func() (givers []vote.OpinionGiver, err error) { + return []vote.OpinionGiver{&opiniongivermock{ + // doesn't matter what we set here + roundsReplies: []vote.Opinions{{vote.Dislike}}, + }}, nil + } + + id := "a" + + paras := fpc.DefaultParameters() + paras.QuerySampleSize = 1 + paras.MaxRoundsPerVoteContext = 3 + paras.CoolingOffPeriod = 0 + // since the finalization threshold is over max rounds it will + // always fail finalizing an opinion + paras.FinalizationThreshold = 4 + voter := fpc.New(opinionGiverFunc, paras) + var failedOpinion *vote.Opinion + voter.Events().Failed.Attach(events.NewClosure(func(id string, opinion vote.Opinion) { + failedOpinion = &opinion + })) + assert.NoError(t, voter.Vote(id, vote.Like)) + + for i := 0; i < 4; i++ { + assert.NoError(t, voter.Round(0.5)) + } + + require.NotNil(t, failedOpinion, "failed event should have been fired") + assert.Equal(t, vote.Dislike, *failedOpinion, "the final opinion should have been 'Dislike'") +} + +func TestFPCVotingMultipleOpinionGivers(t *testing.T) { + type testInput struct { + id string + initOpinion vote.Opinion + expectedRoundsDone int + expectedOpinion vote.Opinion + } + var tests = []testInput{ + {"1", vote.Like, 5, vote.Like}, + {"2", vote.Dislike, 5, vote.Dislike}, + } + + for _, test := range tests { + // note that even though we're defining QuerySampleSize times opinion givers, + // it doesn't mean that FPC will query all of them. + opinionGiverFunc := func() (givers []vote.OpinionGiver, err error) { + opinionGivers := make([]vote.OpinionGiver, fpc.DefaultParameters().QuerySampleSize) + for i := 0; i < len(opinionGivers); i++ { + opinionGivers[i] = &opiniongivermock{roundsReplies: []vote.Opinions{{test.initOpinion}}} + } + return opinionGivers, nil + } + + voter := fpc.New(opinionGiverFunc) + var finalOpinion *vote.Opinion + voter.Events().Finalized.Attach(events.NewClosure(func(id string, finalizedOpinion vote.Opinion) { + finalOpinion = &finalizedOpinion + })) + + assert.NoError(t, voter.Vote(test.id, test.initOpinion)) + + var roundsDone int + for finalOpinion == nil { + assert.NoError(t, voter.Round(0.7)) + roundsDone++ + } + + assert.Equal(t, test.expectedRoundsDone, roundsDone) + require.NotNil(t, finalOpinion) + assert.Equal(t, test.expectedOpinion, *finalOpinion) + } +} diff --git a/packages/vote/fpc/parameters.go b/packages/vote/fpc/parameters.go new file mode 100644 index 00000000..997ca759 --- /dev/null +++ b/packages/vote/fpc/parameters.go @@ -0,0 +1,42 @@ +package fpc + +import "time" + +// Parameters define the parameters of an FPC instance. +type Parameters struct { + // The lower bound liked percentage threshold at the first round. Also called 'a'. + FirstRoundLowerBoundThreshold float64 + // The upper bound liked percentage threshold at the first round. Also called 'b'. + FirstRoundUpperBoundThreshold float64 + // The liked percentage threshold used after the first round. Also called 'beta'. + SubsequentRoundsLowerBoundThreshold float64 + // The amount of opinions to query on each round for a given vote context. Also called 'k'. + QuerySampleSize int + // The amount of rounds a vote context's opinion needs to stay the same to be considered final. Also called 'l'. + FinalizationThreshold int + // The amount of rounds for which to ignore any finalization checks for. Also called 'm'. + CoolingOffPeriod int + // The max amount of rounds to execute per vote context before aborting them. + MaxRoundsPerVoteContext int + // The max amount of time a query is allowed to take. + QueryTimeout time.Duration +} + +// DefaultParameters returns the default parameters used in FPC. +func DefaultParameters() *Parameters { + return &Parameters{ + FirstRoundLowerBoundThreshold: 0.75, + FirstRoundUpperBoundThreshold: 0.85, + SubsequentRoundsLowerBoundThreshold: 0.33, + QuerySampleSize: 10, + FinalizationThreshold: 2, + CoolingOffPeriod: 2, + MaxRoundsPerVoteContext: 100, + QueryTimeout: 1500 * time.Millisecond, + } +} + +// RandUniformThreshold returns random threshold between the given lower/upper bound values. +func RandUniformThreshold(rand float64, thresholdLowerBound float64, thresholdUpperBound float64) float64 { + return thresholdLowerBound + rand*(thresholdUpperBound-thresholdLowerBound) +} diff --git a/packages/vote/fpc/vote_context.go b/packages/vote/fpc/vote_context.go new file mode 100644 index 00000000..862dc769 --- /dev/null +++ b/packages/vote/fpc/vote_context.go @@ -0,0 +1,63 @@ +package fpc + +import "github.com/iotaledger/goshimmer/packages/vote" + +func newVoteContext(id string, initOpn vote.Opinion) *VoteContext { + voteCtx := &VoteContext{id: id, Liked: likedInit} + voteCtx.AddOpinion(initOpn) + return voteCtx +} + +const likedInit = -1 + +// VoteContext is the context of votes from multiple rounds about a given item. +type VoteContext struct { + id string + // The percentage of OpinionGivers who liked this item on the last query. + Liked float64 + // The number of voting rounds performed. + Rounds int + // Append-only list of opinions formed after each round. + // the first opinion is the initial opinion when this vote context was created. + Opinions []vote.Opinion +} + +// adds the given opinion to this vote context +func (vc *VoteContext) AddOpinion(opn vote.Opinion) { + vc.Opinions = append(vc.Opinions, opn) +} + +func (vc *VoteContext) LastOpinion() vote.Opinion { + return vc.Opinions[len(vc.Opinions)-1] +} + +// tells whether this vote context is finalized by checking whether the opinion was held +// for finalizationThreshold number of rounds. +func (vc *VoteContext) IsFinalized(coolingOffPeriod int, finalizationThreshold int) bool { + // check whether we have enough opinions to say whether this vote context is finalized. + // we start from the 2nd opinion since the first one is the initial opinion. + if len(vc.Opinions[1:]) < coolingOffPeriod+finalizationThreshold { + return false + } + + // grab opinion which needs to be held for finalizationThreshold number of rounds + candidateOpinion := vc.Opinions[len(vc.Opinions)-finalizationThreshold] + + // check whether it was held for the subsequent rounds + for _, subsequentOpinion := range vc.Opinions[len(vc.Opinions)-finalizationThreshold+1:] { + if subsequentOpinion != candidateOpinion { + return false + } + } + return true +} + +// tells whether the vote context is new. +func (vc *VoteContext) IsNew() bool { + return vc.Liked == likedInit +} + +// tells whether the vote context just had its first round. +func (vc *VoteContext) HadFirstRound() bool { + return vc.Rounds == 1 +} diff --git a/packages/vote/net/query.pb.go b/packages/vote/net/query.pb.go new file mode 100644 index 00000000..30a964a0 --- /dev/null +++ b/packages/vote/net/query.pb.go @@ -0,0 +1,206 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: query.proto + +package net + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type QueryRequest struct { + Id []string `protobuf:"bytes,1,rep,name=id,proto3" json:"id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *QueryRequest) Reset() { *m = QueryRequest{} } +func (m *QueryRequest) String() string { return proto.CompactTextString(m) } +func (*QueryRequest) ProtoMessage() {} +func (*QueryRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_5c6ac9b241082464, []int{0} +} + +func (m *QueryRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_QueryRequest.Unmarshal(m, b) +} +func (m *QueryRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_QueryRequest.Marshal(b, m, deterministic) +} +func (m *QueryRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_QueryRequest.Merge(m, src) +} +func (m *QueryRequest) XXX_Size() int { + return xxx_messageInfo_QueryRequest.Size(m) +} +func (m *QueryRequest) XXX_DiscardUnknown() { + xxx_messageInfo_QueryRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_QueryRequest proto.InternalMessageInfo + +func (m *QueryRequest) GetId() []string { + if m != nil { + return m.Id + } + return nil +} + +type QueryReply struct { + Opinion []int32 `protobuf:"varint,1,rep,packed,name=opinion,proto3" json:"opinion,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *QueryReply) Reset() { *m = QueryReply{} } +func (m *QueryReply) String() string { return proto.CompactTextString(m) } +func (*QueryReply) ProtoMessage() {} +func (*QueryReply) Descriptor() ([]byte, []int) { + return fileDescriptor_5c6ac9b241082464, []int{1} +} + +func (m *QueryReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_QueryReply.Unmarshal(m, b) +} +func (m *QueryReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_QueryReply.Marshal(b, m, deterministic) +} +func (m *QueryReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_QueryReply.Merge(m, src) +} +func (m *QueryReply) XXX_Size() int { + return xxx_messageInfo_QueryReply.Size(m) +} +func (m *QueryReply) XXX_DiscardUnknown() { + xxx_messageInfo_QueryReply.DiscardUnknown(m) +} + +var xxx_messageInfo_QueryReply proto.InternalMessageInfo + +func (m *QueryReply) GetOpinion() []int32 { + if m != nil { + return m.Opinion + } + return nil +} + +func init() { + proto.RegisterType((*QueryRequest)(nil), "net.QueryRequest") + proto.RegisterType((*QueryReply)(nil), "net.QueryReply") +} + +func init() { + proto.RegisterFile("query.proto", fileDescriptor_5c6ac9b241082464) +} + +var fileDescriptor_5c6ac9b241082464 = []byte{ + // 148 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0x2c, 0x4d, 0x2d, + 0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0xce, 0x4b, 0x2d, 0x51, 0x92, 0xe3, 0xe2, + 0x09, 0x04, 0x89, 0x05, 0xa5, 0x16, 0x96, 0xa6, 0x16, 0x97, 0x08, 0xf1, 0x71, 0x31, 0x65, 0xa6, + 0x48, 0x30, 0x2a, 0x30, 0x6b, 0x70, 0x06, 0x31, 0x65, 0xa6, 0x28, 0xa9, 0x71, 0x71, 0x41, 0xe5, + 0x0b, 0x72, 0x2a, 0x85, 0x24, 0xb8, 0xd8, 0xf3, 0x0b, 0x32, 0xf3, 0x32, 0xf3, 0xf3, 0xc0, 0x4a, + 0x58, 0x83, 0x60, 0x5c, 0x23, 0x5b, 0x2e, 0xae, 0xb0, 0xfc, 0x92, 0xd4, 0x22, 0xb0, 0x62, 0x21, + 0x7d, 0x2e, 0x76, 0x7f, 0x88, 0x84, 0x90, 0xa0, 0x5e, 0x5e, 0x6a, 0x89, 0x1e, 0xb2, 0x1d, 0x52, + 0xfc, 0xc8, 0x42, 0x05, 0x39, 0x95, 0x4a, 0x0c, 0x4e, 0xec, 0x51, 0xac, 0x7a, 0xd6, 0x79, 0xa9, + 0x25, 0x49, 0x6c, 0x60, 0xb7, 0x19, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0xb4, 0x93, 0xeb, 0x61, + 0xaa, 0x00, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConnInterface + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion6 + +// VoterQueryClient is the client API for VoterQuery service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type VoterQueryClient interface { + Opinion(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryReply, error) +} + +type voterQueryClient struct { + cc grpc.ClientConnInterface +} + +func NewVoterQueryClient(cc grpc.ClientConnInterface) VoterQueryClient { + return &voterQueryClient{cc} +} + +func (c *voterQueryClient) Opinion(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryReply, error) { + out := new(QueryReply) + err := c.cc.Invoke(ctx, "/net.VoterQuery/Opinion", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// VoterQueryServer is the server API for VoterQuery service. +type VoterQueryServer interface { + Opinion(context.Context, *QueryRequest) (*QueryReply, error) +} + +// UnimplementedVoterQueryServer can be embedded to have forward compatible implementations. +type UnimplementedVoterQueryServer struct { +} + +func (*UnimplementedVoterQueryServer) Opinion(ctx context.Context, req *QueryRequest) (*QueryReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method Opinion not implemented") +} + +func RegisterVoterQueryServer(s *grpc.Server, srv VoterQueryServer) { + s.RegisterService(&_VoterQuery_serviceDesc, srv) +} + +func _VoterQuery_Opinion_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(QueryRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(VoterQueryServer).Opinion(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/net.VoterQuery/Opinion", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(VoterQueryServer).Opinion(ctx, req.(*QueryRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _VoterQuery_serviceDesc = grpc.ServiceDesc{ + ServiceName: "net.VoterQuery", + HandlerType: (*VoterQueryServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Opinion", + Handler: _VoterQuery_Opinion_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "query.proto", +} diff --git a/packages/vote/net/query.proto b/packages/vote/net/query.proto new file mode 100644 index 00000000..8c594680 --- /dev/null +++ b/packages/vote/net/query.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package net; + +option go_package = ".;net"; + +service VoterQuery { + rpc Opinion (QueryRequest) returns (QueryReply) {} +} + +message QueryRequest { + repeated string id = 1; +} + +message QueryReply { + repeated int32 opinion = 1; +} \ No newline at end of file diff --git a/packages/vote/net/server.go b/packages/vote/net/server.go new file mode 100644 index 00000000..c06f6984 --- /dev/null +++ b/packages/vote/net/server.go @@ -0,0 +1,64 @@ +package net + +import ( + "context" + "net" + + "github.com/iotaledger/goshimmer/packages/vote" + "google.golang.org/grpc" +) + +// OpinionRetriever retrieves the opinion for the given ID. +// If there's no opinion, the function should return Unknown. +type OpinionRetriever func(id string) vote.Opinion + +// New creates a new VoterServer. +func New(voter vote.Voter, opnRetriever OpinionRetriever, bindAddr string) *VoterServer { + vs := &VoterServer{ + voter: voter, + opnRetriever: opnRetriever, + bindAddr: bindAddr, + } + return vs +} + +// VoterServer is a server which responds to opinion queries. +type VoterServer struct { + voter vote.Voter + opnRetriever OpinionRetriever + bindAddr string + grpcServer *grpc.Server +} + +func (vs *VoterServer) Opinion(ctx context.Context, req *QueryRequest) (*QueryReply, error) { + reply := &QueryReply{ + Opinion: make([]int32, len(req.Id)), + } + for i, id := range req.Id { + // check whether there's an ongoing vote + opinion, err := vs.voter.IntermediateOpinion(id) + if err != nil { + reply.Opinion[i] = int32(opinion) + continue + } + reply.Opinion[i] = int32(vs.opnRetriever(id)) + } + + return reply, nil +} + +func (vs *VoterServer) Run() error { + listener, err := net.Listen("tcp", vs.bindAddr) + if err != nil { + return err + } + + grpcServer := grpc.NewServer() + RegisterVoterQueryServer(grpcServer, vs) + + return grpcServer.Serve(listener) +} + +func (vs *VoterServer) Shutdown() { + vs.grpcServer.GracefulStop() +} diff --git a/packages/vote/opinion.go b/packages/vote/opinion.go new file mode 100644 index 00000000..4164c576 --- /dev/null +++ b/packages/vote/opinion.go @@ -0,0 +1,44 @@ +package vote + +import ( + "context" +) + +// OpinionGiver gives opinions about the given IDs. +type OpinionGiver interface { + // Query queries the OpinionGiver for its opinions on the given IDs. + // The passed in context can be used to signal cancellation of the query. + Query(ctx context.Context, ids []string) (Opinions, error) +} + +type OpinionQueryFunc func(ctx context.Context, ids []string) (Opinions, error) + +func (oqf OpinionQueryFunc) Query(ctx context.Context, ids []string) (Opinions, error) { + return oqf(ctx, ids) +} + +// OpinionGiverFunc is a function which gives a slice of OpinionGivers or an error. +type OpinionGiverFunc func() ([]OpinionGiver, error) + +// Opinions is a slice of Opinion. +type Opinions []Opinion + +// Opinion is an opinion about a given thing. +type Opinion byte + +const ( + Like Opinion = 1 << 0 + Dislike Opinion = 1 << 1 + Unknown Opinion = 1 << 2 +) + +// ConvertInt32Opinion converts the given int32 to an Opinion. +func ConvertInt32Opinion(x int32) Opinion { + switch { + case x == 1<<0: + return Like + case x == 1<<1: + return Dislike + } + return Unknown +} diff --git a/packages/vote/voter.go b/packages/vote/voter.go new file mode 100644 index 00000000..c5078185 --- /dev/null +++ b/packages/vote/voter.go @@ -0,0 +1,45 @@ +package vote + +import ( + "errors" + + "github.com/iotaledger/hive.go/events" +) + +var ( + // ErrVotingNotFound is returned when a voting for a given id wasn't found. + ErrVotingNotFound = errors.New("no voting found") +) + +// Voter votes on hashes. +type Voter interface { + // Vote submits the given ID for voting with its initial Opinion. + Vote(id string, initOpn Opinion) error + // IntermediateOpinion gets intermediate Opinion about the given ID. + IntermediateOpinion(id string) (Opinion, error) + // Events returns the Events instance of the given Voter. + Events() Events +} + +// DRNGRoundBasedVoter is a Voter which votes in rounds and uses random numbers which +// were generated in a decentralized fashion. +type DRNGRoundBasedVoter interface { + Voter + // Round starts a new round. + Round(rand float64) error +} + +// Events defines events which happen on a Voter. +type Events struct { + // Fired when an Opinion has been finalized. + Finalized *events.Event + // Fired when an Opinion couldn't be finalized. + Failed *events.Event + // Fired when internal errors occur. + Error *events.Event +} + +// OpinionCaller calls the given handler with an Opinion and its associated Id. +func OpinionCaller(handler interface{}, params ...interface{}) { + handler.(func(id string, opinion Opinion))(params[0].(string), params[1].(Opinion)) +} diff --git a/plugins/fpc/parameters.go b/plugins/fpc/parameters.go new file mode 100644 index 00000000..7bc73424 --- /dev/null +++ b/plugins/fpc/parameters.go @@ -0,0 +1,17 @@ +package fpc + +import ( + flag "github.com/spf13/pflag" +) + +const ( + CfgFPCQuerySampleSize = "fpc.querySampleSize" + CfgFPCRoundInterval = "fpc.roundInterval" + CfgFPCBindAddress = "fpc.bindAddress" +) + +func init() { + flag.Int(CfgFPCQuerySampleSize, 3, "Size of the voting quorum (k)") + flag.Int(CfgFPCRoundInterval, 5, "FPC round interval [s]") + flag.String(CfgFPCBindAddress, "0.0.0.0:14636", "the bind address on which the FPC vote server binds to") +} diff --git a/plugins/fpc/plugin.go b/plugins/fpc/plugin.go new file mode 100644 index 00000000..7c333bd1 --- /dev/null +++ b/plugins/fpc/plugin.go @@ -0,0 +1,146 @@ +package fpc + +import ( + "context" + "fmt" + "net" + "strconv" + "sync" + + "github.com/iotaledger/goshimmer/packages/prng" + "google.golang.org/grpc" + + "github.com/iotaledger/goshimmer/packages/shutdown" + "github.com/iotaledger/goshimmer/packages/vote" + "github.com/iotaledger/goshimmer/packages/vote/fpc" + votenet "github.com/iotaledger/goshimmer/packages/vote/net" + "github.com/iotaledger/goshimmer/plugins/autopeering" + "github.com/iotaledger/goshimmer/plugins/autopeering/local" + "github.com/iotaledger/goshimmer/plugins/config" + + "github.com/iotaledger/hive.go/autopeering/peer" + "github.com/iotaledger/hive.go/autopeering/peer/service" + "github.com/iotaledger/hive.go/daemon" + "github.com/iotaledger/hive.go/logger" + "github.com/iotaledger/hive.go/node" +) + +const name = "FPC" + +var ( + PLUGIN = node.NewPlugin(name, node.Enabled, configure, run) + log *logger.Logger + voter *fpc.FPC + voterOnce sync.Once + voterServer *votenet.VoterServer + roundIntervalSeconds int64 = 5 +) + +func Voter() vote.DRNGRoundBasedVoter { + voterOnce.Do(func() { + // create a function which gets OpinionGivers + opinionGiverFunc := func() (givers []vote.OpinionGiver, err error) { + opinionGivers := make([]vote.OpinionGiver, 0) + for _, p := range autopeering.Discovery.GetVerifiedPeers() { + fpcService := p.Services().Get(service.FPCKey) + if fpcService == nil { + continue + } + opinionGivers = append(opinionGivers, peerOpinionGiver(p)) + } + return opinionGivers, nil + } + voter = fpc.New(opinionGiverFunc) + }) + return voter +} + +func configure(_ *node.Plugin) { + log = logger.NewLogger(name) + lPeer := local.GetInstance() + + bindAddr := config.Node.GetString(CfgFPCBindAddress) + _, portStr, err := net.SplitHostPort(bindAddr) + if err != nil { + log.Fatalf("FPC bind address '%s' is invalid: %s", bindAddr, err) + } + port, err := strconv.Atoi(portStr) + if err != nil { + log.Fatalf("FPC bind address '%s' is invalid: %s", bindAddr, err) + } + + if err := lPeer.UpdateService(service.FPCKey, "tcp", port); err != nil { + log.Fatalf("could not update services: %v", err) + } +} + +func run(_ *node.Plugin) { + + daemon.BackgroundWorker("FPCVoterServer", func(shutdownSignal <-chan struct{}) { + voterServer = votenet.New(Voter(), func(id string) vote.Opinion { + // TODO: replace with persistence layer call + return vote.Unknown + }, config.Node.GetString(CfgFPCBindAddress)) + + go func() { + if err := voterServer.Run(); err != nil { + log.Error(err) + } + }() + + log.Infof("Started vote server on %s", config.Node.GetString(CfgFPCBindAddress)) + <-shutdownSignal + voterServer.Shutdown() + log.Info("Stopped vote server") + }, shutdown.ShutdownPriorityFPC) + + daemon.BackgroundWorker("FPCRoundsInitiator", func(shutdownSignal <-chan struct{}) { + log.Infof("Started FPC round initiator") + unixTsPRNG := prng.NewUnixTimestampPRNG(roundIntervalSeconds) + defer unixTsPRNG.Stop() + exit: + for { + select { + case r := <-unixTsPRNG.C(): + if err := voter.Round(r); err != nil { + log.Errorf("unable to execute FPC round: %s", err) + } + case <-shutdownSignal: + break exit + } + } + log.Infof("Stopped FPC round initiator") + }, shutdown.ShutdownPriorityFPC) +} + +// creates an OpinionQueryFunc which uses the given peer. +func peerOpinionGiver(p *peer.Peer) vote.OpinionQueryFunc { + return func(ctx context.Context, ids []string) (vote.Opinions, error) { + fpcServicePort := p.Services().Get(service.FPCKey).Port() + fpcAddr := net.JoinHostPort(p.IP().String(), strconv.Itoa(fpcServicePort)) + + var opts []grpc.DialOption + opts = append(opts, grpc.WithInsecure()) + + // connect to the FPC service + conn, err := grpc.Dial(fpcAddr, opts...) + if err != nil { + return nil, fmt.Errorf("unable to connect to FPC service: %w", err) + } + defer conn.Close() + + client := votenet.NewVoterQueryClient(conn) + reply, err := client.Opinion(ctx, &votenet.QueryRequest{Id: ids}) + if err != nil { + return nil, fmt.Errorf("unable to query opinions: %w", err) + } + + // convert int32s in reply to opinions + opinions := make(vote.Opinions, len(reply.Opinion)) + for i, intOpn := range reply.Opinion { + opinions[i] = vote.ConvertInt32Opinion(intOpn) + } + + return opinions, nil + } +} -- GitLab