Skip to content
Snippets Groups Projects
Unverified Commit 85e92abc authored by jkrvivian's avatar jkrvivian Committed by GitHub
Browse files

fix: Fix bugs while enabling value transfers dapp (#442)

* fix: Fix bugs while enabling value transfer

* :heavy_minus_sign: Remove unused plugin

* feat: Implement value web api & client library (#438)

* feat: Implement value api: Attachment

* feat: Implement value api: UnspentOutputs

* feat: Implement value api: transactionByID

* feat: Implement client lib & value api: sendTransaction

* fix: minor tweak

* fix: minor fix

* refactor: Refactor sendTransaction api

* Refactor: Fix :dog:

* refactor: Fix :dog:

* refactor: Rename api route of testSendTxn to camel case

* fix: Fix bugs in webapi

* Release object storage after retreiving it
* Handle IOTA color specifically

* refactor: Add balances in unspentOutput api

* Fix entry node disabled plugins for integration tests

* Fix grpc server nil

* :bug: Fix FPC round initiator

* :lipstick: Fix backgroundWorker name

* :rotating_light: Adjust imports order

* Wait for autopeering when node starts

* Fix background worker name conflict with message tangle

* :art: Use Voter()

* :bug:

 Re-fix ValueTangle BackgroundWorker name

Co-authored-by: default avatarcapossele <angelocapossele@gmail.com>
Co-authored-by: default avatarjonastheis <mail@jonastheis.de>
parent aa32ab70
No related branches found
No related tags found
No related merge requests found
Showing
with 64 additions and 214 deletions
......@@ -4,22 +4,21 @@ import (
"sync"
"time"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/tipmanager"
"github.com/iotaledger/goshimmer/plugins/database"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/consensus"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload"
valuepayload "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/tangle"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/tipmanager"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
messageTangle "github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/plugins/database"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
)
const (
......@@ -99,7 +98,7 @@ func configure(_ *node.Plugin) {
}
func run(*node.Plugin) {
if err := daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) {
if err := daemon.BackgroundWorker("ValueTangle", func(shutdownSignal <-chan struct{}) {
<-shutdownSignal
Tangle.Shutdown()
}, shutdown.PriorityTangle); err != nil {
......
......@@ -2,16 +2,10 @@ package valuetransfers
import (
"context"
"flag"
"fmt"
"net"
"strconv"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"google.golang.org/grpc"
"sync"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/branchmanager"
"github.com/iotaledger/goshimmer/packages/prng"
......@@ -22,13 +16,19 @@ import (
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/config"
"sync"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/autopeering/peer/service"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
flag "github.com/spf13/pflag"
"google.golang.org/grpc"
)
const (
// FpcPluginName contains the human readable name of the plugin.
FpcPluginName = "FPC"
// CfgFPCQuerySampleSize defines how many nodes will be queried each round.
CfgFPCQuerySampleSize = "fpc.querySampleSize"
......@@ -74,7 +74,7 @@ func Voter() vote.DRNGRoundBasedVoter {
}
func configureFPC() {
log = logger.NewLogger(PluginName)
log = logger.NewLogger(FpcPluginName)
lPeer := local.GetInstance()
bindAddr := config.Node.GetString(CfgFPCBindAddress)
......@@ -91,7 +91,7 @@ func configureFPC() {
log.Fatalf("could not update services: %v", err)
}
voter.Events().RoundExecuted.Attach(events.NewClosure(func(roundStats *vote.RoundStats) {
Voter().Events().RoundExecuted.Attach(events.NewClosure(func(roundStats *vote.RoundStats) {
peersQueried := len(roundStats.QueriedOpinions)
voteContextsCount := len(roundStats.ActiveVoteContexts)
log.Infof("executed round with rand %0.4f for %d vote contexts on %d peers, took %v", roundStats.RandUsed, voteContextsCount, peersQueried, roundStats.Duration)
......@@ -140,6 +140,7 @@ func runFPC() {
if err := daemon.BackgroundWorker("FPCRoundsInitiator", func(shutdownSignal <-chan struct{}) {
log.Infof("Started FPC round initiator")
unixTsPRNG := prng.NewUnixTimestampPRNG(roundIntervalSeconds)
unixTsPRNG.Start()
defer unixTsPRNG.Stop()
exit:
for {
......
......@@ -29,6 +29,7 @@ func NewFCOB(tangle *tangle.Tangle, averageNetworkDelay time.Duration) (fcob *FC
averageNetworkDelay: averageNetworkDelay,
Events: &FCOBEvents{
Error: events.NewEvent(events.ErrorCaller),
Vote: events.NewEvent(voteEvent),
},
}
......@@ -156,3 +157,7 @@ type FCOBEvents struct {
// Vote gets called when FCOB needs to vote on a transaction.
Vote *events.Event
}
func voteEvent(handler interface{}, params ...interface{}) {
handler.(func(id string, initOpn vote.Opinion))(params[0].(string), params[1].(vote.Opinion))
}
......@@ -27,7 +27,7 @@ require (
go.uber.org/atomic v1.6.0
go.uber.org/zap v1.14.0
golang.org/x/crypto v0.0.0-20200429183012-4b2356b1ed79
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect
golang.org/x/tools v0.0.0-20200330040139-fa3cc9eebcfe // indirect
google.golang.org/grpc v1.28.1
gopkg.in/src-d/go-git.v4 v4.13.1
......
......@@ -53,10 +53,10 @@ func (vs *VoterServer) Run() error {
return err
}
grpcServer := grpc.NewServer()
RegisterVoterQueryServer(grpcServer, vs)
vs.grpcServer = grpc.NewServer()
RegisterVoterQueryServer(vs.grpcServer, vs)
return grpcServer.Serve(listener)
return vs.grpcServer.Serve(listener)
}
func (vs *VoterServer) Shutdown() {
......
package core
import (
"github.com/iotaledger/goshimmer/dapps/valuetransfers"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/banner"
"github.com/iotaledger/goshimmer/plugins/bootstrap"
......@@ -38,4 +39,5 @@ var PLUGINS = node.Plugins(
gracefulshutdown.Plugin,
metrics.Plugin,
drng.Plugin,
valuetransfers.App,
)
......@@ -2,8 +2,8 @@ package research
import (
analysisclient "github.com/iotaledger/goshimmer/plugins/analysis/client"
analysisserver "github.com/iotaledger/goshimmer/plugins/analysis/server"
analysisdashboard "github.com/iotaledger/goshimmer/plugins/analysis/dashboard"
analysisserver "github.com/iotaledger/goshimmer/plugins/analysis/server"
"github.com/iotaledger/goshimmer/plugins/remotelog"
"github.com/iotaledger/hive.go/node"
)
......
package fpc
import (
flag "github.com/spf13/pflag"
)
const (
CfgFPCQuerySampleSize = "fpc.querySampleSize"
CfgFPCRoundInterval = "fpc.roundInterval"
CfgFPCBindAddress = "fpc.bindAddress"
)
func init() {
flag.Int(CfgFPCQuerySampleSize, 3, "Size of the voting quorum (k)")
flag.Int(CfgFPCRoundInterval, 5, "FPC round interval [s]")
flag.String(CfgFPCBindAddress, "0.0.0.0:10895", "the bind address on which the FPC vote server binds to")
}
package fpc
import (
"context"
"fmt"
"net"
"strconv"
"sync"
"github.com/iotaledger/goshimmer/packages/prng"
"github.com/iotaledger/hive.go/events"
"google.golang.org/grpc"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/packages/vote/fpc"
votenet "github.com/iotaledger/goshimmer/packages/vote/net"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/autopeering/peer/service"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
)
// PluginName is the name of the FPC plugin.
const PluginName = "FPC"
var (
// Plugin is the plugin instance of the FPC plugin.
Plugin = node.NewPlugin(PluginName, node.Enabled, configure, run)
log *logger.Logger
voter *fpc.FPC
voterOnce sync.Once
voterServer *votenet.VoterServer
roundIntervalSeconds int64 = 5
)
// Voter returns the DRNGRoundBasedVoter instance used by the FPC plugin.
func Voter() vote.DRNGRoundBasedVoter {
voterOnce.Do(func() {
// create a function which gets OpinionGivers
opinionGiverFunc := func() (givers []vote.OpinionGiver, err error) {
opinionGivers := make([]vote.OpinionGiver, 0)
for _, p := range autopeering.Discovery().GetVerifiedPeers() {
fpcService := p.Services().Get(service.FPCKey)
if fpcService == nil {
continue
}
// TODO: maybe cache the PeerOpinionGiver instead of creating a new one every time
opinionGivers = append(opinionGivers, &PeerOpinionGiver{p: p})
}
return opinionGivers, nil
}
voter = fpc.New(opinionGiverFunc)
})
return voter
}
func configure(_ *node.Plugin) {
log = logger.NewLogger(PluginName)
lPeer := local.GetInstance()
bindAddr := config.Node.GetString(CfgFPCBindAddress)
_, portStr, err := net.SplitHostPort(bindAddr)
if err != nil {
log.Fatalf("FPC bind address '%s' is invalid: %s", bindAddr, err)
}
port, err := strconv.Atoi(portStr)
if err != nil {
log.Fatalf("FPC bind address '%s' is invalid: %s", bindAddr, err)
}
if err := lPeer.UpdateService(service.FPCKey, "tcp", port); err != nil {
log.Fatalf("could not update services: %v", err)
}
voter.Events().RoundExecuted.Attach(events.NewClosure(func(roundStats *vote.RoundStats) {
peersQueried := len(roundStats.QueriedOpinions)
voteContextsCount := len(roundStats.ActiveVoteContexts)
log.Infof("executed round with rand %0.4f for %d vote contexts on %d peers, took %v", roundStats.RandUsed, voteContextsCount, peersQueried, roundStats.Duration)
}))
}
func run(_ *node.Plugin) {
daemon.BackgroundWorker("FPCVoterServer", func(shutdownSignal <-chan struct{}) {
voterServer = votenet.New(Voter(), func(id string) vote.Opinion {
// TODO: replace with persistence layer call
return vote.Unknown
}, config.Node.GetString(CfgFPCBindAddress))
go func() {
if err := voterServer.Run(); err != nil {
log.Error(err)
}
}()
log.Infof("Started vote server on %s", config.Node.GetString(CfgFPCBindAddress))
<-shutdownSignal
voterServer.Shutdown()
log.Info("Stopped vote server")
}, shutdown.PriorityFPC)
daemon.BackgroundWorker("FPCRoundsInitiator", func(shutdownSignal <-chan struct{}) {
log.Infof("Started FPC round initiator")
unixTsPRNG := prng.NewUnixTimestampPRNG(roundIntervalSeconds)
defer unixTsPRNG.Stop()
exit:
for {
select {
case r := <-unixTsPRNG.C():
if err := voter.Round(r); err != nil {
log.Errorf("unable to execute FPC round: %s", err)
}
case <-shutdownSignal:
break exit
}
}
log.Infof("Stopped FPC round initiator")
}, shutdown.PriorityFPC)
}
// PeerOpinionGiver implements the OpinionGiver interface based on a peer.
type PeerOpinionGiver struct {
p *peer.Peer
}
func (pog *PeerOpinionGiver) Query(ctx context.Context, ids []string) (vote.Opinions, error) {
fpcServicePort := pog.p.Services().Get(service.FPCKey).Port()
fpcAddr := net.JoinHostPort(pog.p.IP().String(), strconv.Itoa(fpcServicePort))
var opts []grpc.DialOption
opts = append(opts, grpc.WithInsecure())
// connect to the FPC service
conn, err := grpc.Dial(fpcAddr, opts...)
if err != nil {
return nil, fmt.Errorf("unable to connect to FPC service: %w", err)
}
defer conn.Close()
client := votenet.NewVoterQueryClient(conn)
reply, err := client.Opinion(ctx, &votenet.QueryRequest{Id: ids})
if err != nil {
return nil, fmt.Errorf("unable to query opinions: %w", err)
}
// convert int32s in reply to opinions
opinions := make(vote.Opinions, len(reply.Opinion))
for i, intOpn := range reply.Opinion {
opinions[i] = vote.ConvertInt32Opinion(intOpn)
}
return opinions, nil
}
func (pog *PeerOpinionGiver) ID() string {
return pog.p.ID().String()
}
......@@ -22,6 +22,7 @@ func Handler(c echo.Context) error {
// get txn by txn id
txnObj := valuetransfers.Tangle.Transaction(txnID)
defer txnObj.Release()
if !txnObj.Exists() {
return c.JSON(http.StatusNotFound, Response{Error: "Transaction not found"})
}
......@@ -29,6 +30,7 @@ func Handler(c echo.Context) error {
// get attachements by txn id
for _, attachmentObj := range valuetransfers.Tangle.Attachments(txnID) {
defer attachmentObj.Release()
if !attachmentObj.Exists() {
continue
}
......@@ -36,6 +38,7 @@ func Handler(c echo.Context) error {
// get payload by payload id
payloadObj := valuetransfers.Tangle.Payload(attachment.PayloadID())
defer payloadObj.Release()
if !payloadObj.Exists() {
continue
}
......
......@@ -20,6 +20,7 @@ func Handler(c echo.Context) error {
// get txn by txn id
txnObj := valuetransfers.Tangle.Transaction(txnID)
defer txnObj.Release()
if !txnObj.Exists() {
return c.JSON(http.StatusNotFound, Response{Error: "Transaction not found"})
}
......
......@@ -46,6 +46,9 @@ func Handler(c echo.Context) error {
balances := []*balance.Balance{}
for _, b := range out.Balances {
// get token color
if b.Color == "IOTA" {
balances = append(balances, balance.New(balance.ColorIOTA, b.Value))
} else {
color, _, err := balance.ColorFromBytes([]byte(b.Color))
if err != nil {
log.Info(err.Error())
......@@ -53,6 +56,7 @@ func Handler(c echo.Context) error {
}
balances = append(balances, balance.New(color, b.Value))
}
}
outmap[addr] = balances
}
outputs := transaction.NewOutputs(outmap)
......
......@@ -29,12 +29,23 @@ func Handler(c echo.Context) error {
outputids := make([]OutputID, 0)
// get outputids by address
for id, outputObj := range valuetransfers.Tangle.OutputsOnAddress(address) {
defer outputObj.Release()
output := outputObj.Unwrap()
// TODO: get inclusion state
if output.ConsumerCount() == 0 {
// iterate balances
var b []utils.Balance
for _, balance := range output.Balances() {
b = append(b, utils.Balance{
Value: balance.Value(),
Color: balance.Color().String(),
})
}
outputids = append(outputids, OutputID{
ID: id.String(),
Balances: b,
InclusionState: utils.InclusionState{
Confirmed: true,
Conflict: false,
......@@ -74,5 +85,6 @@ type UnspentOutput struct {
// OutputID holds the output id and its inclusion state
type OutputID struct {
ID string `json:"id"`
Balances []utils.Balance `json:"balances"`
InclusionState utils.InclusionState `json:"inclusion_state"`
}
......@@ -13,7 +13,7 @@ services:
--analysis.server.bindAddress=0.0.0.0:1888
--analysis.dashboard.bindAddress=0.0.0.0:9000
--node.enablePlugins=analysis-server,analysis-dashboard
--node.disablePlugins=portcheck,dashboard,analysis-client,gossip,drng,issuer,sync,metrics,messagelayer,webapi,webapibroadcastdataendpoint,webapifindtransactionhashesendpoint,webapigetneighborsendpoint,webapigettransactionobjectsbyhashendpoint,webapigettransactiontrytesbyhashendpoint
--node.disablePlugins=portcheck,dashboard,analysis-client,gossip,drng,issuer,sync,metrics,messagelayer,valuetransfers,webapi,webapibroadcastdataendpoint,webapifindtransactionhashesendpoint,webapigetneighborsendpoint,webapigettransactionobjectsbyhashendpoint,webapigettransactiontrytesbyhashendpoint
volumes:
- ./config.docker.json:/tmp/config.json:ro
- goshimmer-cache:/go
......
......@@ -13,7 +13,7 @@ const (
logsDir = "/tmp/logs/"
disabledPluginsEntryNode = "portcheck,dashboard,analysis-client,profiling,gossip,drng,issuer,sync,metrics,messagelayer,webapi,webapibroadcastdataendpoint,webapifindtransactionhashesendpoint,webapigetneighborsendpoint,webapigettransactionobjectsbyhashendpoint,webapigettransactiontrytesbyhashendpoint"
disabledPluginsEntryNode = "portcheck,dashboard,analysis-client,profiling,gossip,drng,issuer,sync,metrics,valuetransfers,messagelayer,webapi,webapibroadcastdataendpoint,webapifindtransactionhashesendpoint,webapigetneighborsendpoint,webapigettransactionobjectsbyhashendpoint,webapigettransactiontrytesbyhashendpoint"
disabledPluginsPeer = "portcheck,dashboard,analysis-client,profiling"
dockerLogsPrefixLen = 8
......
......@@ -55,6 +55,9 @@ func TestSynchronization(t *testing.T) {
// wait for peer to start
time.Sleep(5 * time.Second)
err = n.WaitForAutopeering(3)
require.NoError(t, err)
// note: this check is too dependent on the initial time a node sends bootstrap messages
// and therefore very error prone. Therefore it's not done for now.
// 7. check that it is in state desynced
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment