Select Git revision
fpc.go 6.53 KiB
package valuetransfers
import (
"context"
"fmt"
"net"
"strconv"
"sync"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/branchmanager"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/packages/prng"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/packages/vote/fpc"
votenet "github.com/iotaledger/goshimmer/packages/vote/net"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/autopeering/peer/service"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
flag "github.com/spf13/pflag"
"google.golang.org/grpc"
)
const (
// FpcPluginName contains the human readable name of the plugin.
FpcPluginName = "FPC"
// CfgFPCQuerySampleSize defines how many nodes will be queried each round.
CfgFPCQuerySampleSize = "fpc.querySampleSize"
// CfgFPCRoundInterval defines how long a round lasts (in seconds)
CfgFPCRoundInterval = "fpc.roundInterval"
// CfgFPCBindAddress defines on which address the FPC service should listen.
CfgFPCBindAddress = "fpc.bindAddress"
)
func init() {
flag.Int(CfgFPCQuerySampleSize, 21, "Size of the voting quorum (k)")
flag.Int(CfgFPCRoundInterval, 5, "FPC round interval [s]")
flag.String(CfgFPCBindAddress, "0.0.0.0:10895", "the bind address on which the FPC vote server binds to")
}
var (
voter *fpc.FPC
voterOnce sync.Once
voterServer *votenet.VoterServer
roundIntervalSeconds int64 = 5
)
// Voter returns the DRNGRoundBasedVoter instance used by the FPC plugin.
func Voter() vote.DRNGRoundBasedVoter {
voterOnce.Do(func() {
// create a function which gets OpinionGivers
opinionGiverFunc := func() (givers []vote.OpinionGiver, err error) {
opinionGivers := make([]vote.OpinionGiver, 0)
for _, p := range autopeering.Discovery().GetVerifiedPeers() {
fpcService := p.Services().Get(service.FPCKey)
if fpcService == nil {
continue
}
// TODO: maybe cache the PeerOpinionGiver instead of creating a new one every time
opinionGivers = append(opinionGivers, &PeerOpinionGiver{p: p})
}
return opinionGivers, nil
}
voter = fpc.New(opinionGiverFunc)
})
return voter
}
func configureFPC() {
log = logger.NewLogger(FpcPluginName)
lPeer := local.GetInstance()
bindAddr := config.Node.GetString(CfgFPCBindAddress)
_, portStr, err := net.SplitHostPort(bindAddr)
if err != nil {
log.Fatalf("FPC bind address '%s' is invalid: %s", bindAddr, err)
}
port, err := strconv.Atoi(portStr)
if err != nil {
log.Fatalf("FPC bind address '%s' is invalid: %s", bindAddr, err)
}
if err := lPeer.UpdateService(service.FPCKey, "tcp", port); err != nil {
log.Fatalf("could not update services: %v", err)
}
Voter().Events().RoundExecuted.Attach(events.NewClosure(func(roundStats *vote.RoundStats) {
peersQueried := len(roundStats.QueriedOpinions)
voteContextsCount := len(roundStats.ActiveVoteContexts)
log.Infof("executed round with rand %0.4f for %d vote contexts on %d peers, took %v", roundStats.RandUsed, voteContextsCount, peersQueried, roundStats.Duration)
}))
}
func runFPC() {
if err := daemon.BackgroundWorker("FPCVoterServer", func(shutdownSignal <-chan struct{}) {
voterServer = votenet.New(Voter(), func(id string) vote.Opinion {
branchID, err := branchmanager.BranchIDFromBase58(id)
if err != nil {
log.Errorf("received invalid vote request for branch '%s'", id)
return vote.Unknown
}
cachedBranch := Tangle.BranchManager().Branch(branchID)
defer cachedBranch.Release()
branch := cachedBranch.Unwrap()
if branch == nil {
return vote.Unknown
}
if !branch.Preferred() {
return vote.Dislike
}
return vote.Like
}, config.Node.GetString(CfgFPCBindAddress),
metrics.Events().FPCInboundBytes,
metrics.Events().FPCOutboundBytes,
metrics.Events().QueryReceived)
go func() {
if err := voterServer.Run(); err != nil {
log.Error(err)
}
}()
log.Infof("Started vote server on %s", config.Node.GetString(CfgFPCBindAddress))
<-shutdownSignal
voterServer.Shutdown()
log.Info("Stopped vote server")
}, shutdown.PriorityFPC); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
if err := daemon.BackgroundWorker("FPCRoundsInitiator", func(shutdownSignal <-chan struct{}) {
log.Infof("Started FPC round initiator")
unixTsPRNG := prng.NewUnixTimestampPRNG(roundIntervalSeconds)
unixTsPRNG.Start()
defer unixTsPRNG.Stop()
exit:
for {
select {
case r := <-unixTsPRNG.C():
if err := voter.Round(r); err != nil {
log.Errorf("unable to execute FPC round: %s", err)
}
case <-shutdownSignal:
break exit
}
}
log.Infof("Stopped FPC round initiator")
}, shutdown.PriorityFPC); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
// PeerOpinionGiver implements the OpinionGiver interface based on a peer.
type PeerOpinionGiver struct {
p *peer.Peer
}
// Query queries another node for its opinion.
func (pog *PeerOpinionGiver) Query(ctx context.Context, ids []string) (vote.Opinions, error) {
fpcServicePort := pog.p.Services().Get(service.FPCKey).Port()
fpcAddr := net.JoinHostPort(pog.p.IP().String(), strconv.Itoa(fpcServicePort))
var opts []grpc.DialOption
opts = append(opts, grpc.WithInsecure())
// connect to the FPC service
conn, err := grpc.Dial(fpcAddr, opts...)
if err != nil {
return nil, fmt.Errorf("unable to connect to FPC service: %w", err)
}
defer conn.Close()
client := votenet.NewVoterQueryClient(conn)
query := &votenet.QueryRequest{Id: ids}
reply, err := client.Opinion(ctx, query)
if err != nil {
metrics.Events().QueryReplyError.Trigger(&metrics.QueryReplyErrorEvent{
ID: pog.p.ID().String(),
OpinionCount: len(ids),
})
return nil, fmt.Errorf("unable to query opinions: %w", err)
}
metrics.Events().FPCInboundBytes.Trigger(uint64(proto.Size(reply)))
metrics.Events().FPCOutboundBytes.Trigger(uint64(proto.Size(query)))
// convert int32s in reply to opinions
opinions := make(vote.Opinions, len(reply.Opinion))
for i, intOpn := range reply.Opinion {
opinions[i] = vote.ConvertInt32Opinion(intOpn)
}
return opinions, nil
}
// ID returns a string representation of the identifier of the underlying Peer.
func (pog *PeerOpinionGiver) ID() string {
return pog.p.ID().String()
}