diff --git a/dapps/valuetransfers/dapp.go b/dapps/valuetransfers/dapp.go index aa2443049896c0dcb53e837dfd901fbd3e6e9d20..7964e47159906f81943a9410a9af87b603daffc8 100644 --- a/dapps/valuetransfers/dapp.go +++ b/dapps/valuetransfers/dapp.go @@ -4,22 +4,21 @@ import ( "sync" "time" - "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload" - "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/tipmanager" - "github.com/iotaledger/goshimmer/plugins/database" - "github.com/iotaledger/hive.go/daemon" - "github.com/iotaledger/hive.go/events" - "github.com/iotaledger/hive.go/logger" - "github.com/iotaledger/hive.go/node" - "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/consensus" + "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload" valuepayload "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload" "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/tangle" + "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/tipmanager" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" messageTangle "github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle" "github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/goshimmer/packages/vote" + "github.com/iotaledger/goshimmer/plugins/database" "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/hive.go/daemon" + "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/logger" + "github.com/iotaledger/hive.go/node" ) const ( @@ -99,7 +98,7 @@ func configure(_ *node.Plugin) { } func run(*node.Plugin) { - if err := daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) { + if err := daemon.BackgroundWorker("ValueTangle", func(shutdownSignal <-chan struct{}) { <-shutdownSignal Tangle.Shutdown() }, shutdown.PriorityTangle); err != nil { diff --git a/dapps/valuetransfers/fpc.go b/dapps/valuetransfers/fpc.go index dbbaa8d391975fd3fd681f2b10f02bb8f9aea8f1..c6a6d4a0caca5f48f27d67b214ed17f595005199 100644 --- a/dapps/valuetransfers/fpc.go +++ b/dapps/valuetransfers/fpc.go @@ -2,16 +2,10 @@ package valuetransfers import ( "context" - "flag" "fmt" "net" "strconv" - - "github.com/iotaledger/hive.go/autopeering/peer" - "github.com/iotaledger/hive.go/daemon" - "github.com/iotaledger/hive.go/events" - "github.com/iotaledger/hive.go/logger" - "google.golang.org/grpc" + "sync" "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/branchmanager" "github.com/iotaledger/goshimmer/packages/prng" @@ -22,13 +16,19 @@ import ( "github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/autopeering/local" "github.com/iotaledger/goshimmer/plugins/config" - - "sync" - + "github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/autopeering/peer/service" + "github.com/iotaledger/hive.go/daemon" + "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/logger" + flag "github.com/spf13/pflag" + "google.golang.org/grpc" ) const ( + // FpcPluginName contains the human readable name of the plugin. + FpcPluginName = "FPC" + // CfgFPCQuerySampleSize defines how many nodes will be queried each round. CfgFPCQuerySampleSize = "fpc.querySampleSize" @@ -74,7 +74,7 @@ func Voter() vote.DRNGRoundBasedVoter { } func configureFPC() { - log = logger.NewLogger(PluginName) + log = logger.NewLogger(FpcPluginName) lPeer := local.GetInstance() bindAddr := config.Node.GetString(CfgFPCBindAddress) @@ -91,7 +91,7 @@ func configureFPC() { log.Fatalf("could not update services: %v", err) } - voter.Events().RoundExecuted.Attach(events.NewClosure(func(roundStats *vote.RoundStats) { + Voter().Events().RoundExecuted.Attach(events.NewClosure(func(roundStats *vote.RoundStats) { peersQueried := len(roundStats.QueriedOpinions) voteContextsCount := len(roundStats.ActiveVoteContexts) log.Infof("executed round with rand %0.4f for %d vote contexts on %d peers, took %v", roundStats.RandUsed, voteContextsCount, peersQueried, roundStats.Duration) @@ -140,6 +140,7 @@ func runFPC() { if err := daemon.BackgroundWorker("FPCRoundsInitiator", func(shutdownSignal <-chan struct{}) { log.Infof("Started FPC round initiator") unixTsPRNG := prng.NewUnixTimestampPRNG(roundIntervalSeconds) + unixTsPRNG.Start() defer unixTsPRNG.Stop() exit: for { diff --git a/dapps/valuetransfers/packages/consensus/fcob.go b/dapps/valuetransfers/packages/consensus/fcob.go index cb1069c127e11c0589803609ebe39b546a8504ff..7c9d14466e4071ec329de834ef629ef23cbc271c 100644 --- a/dapps/valuetransfers/packages/consensus/fcob.go +++ b/dapps/valuetransfers/packages/consensus/fcob.go @@ -29,6 +29,7 @@ func NewFCOB(tangle *tangle.Tangle, averageNetworkDelay time.Duration) (fcob *FC averageNetworkDelay: averageNetworkDelay, Events: &FCOBEvents{ Error: events.NewEvent(events.ErrorCaller), + Vote: events.NewEvent(voteEvent), }, } @@ -156,3 +157,7 @@ type FCOBEvents struct { // Vote gets called when FCOB needs to vote on a transaction. Vote *events.Event } + +func voteEvent(handler interface{}, params ...interface{}) { + handler.(func(id string, initOpn vote.Opinion))(params[0].(string), params[1].(vote.Opinion)) +} diff --git a/go.mod b/go.mod index e47e995a182b0e00a738f7cdc13c35da911a13aa..8485376324ad4c40826bad1928e1249caa0cc371 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( go.uber.org/atomic v1.6.0 go.uber.org/zap v1.14.0 golang.org/x/crypto v0.0.0-20200429183012-4b2356b1ed79 - golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 + golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect golang.org/x/tools v0.0.0-20200330040139-fa3cc9eebcfe // indirect google.golang.org/grpc v1.28.1 gopkg.in/src-d/go-git.v4 v4.13.1 diff --git a/packages/vote/net/server.go b/packages/vote/net/server.go index ad8b019469fb19d0b508df1b65e78d256bd00005..44f359bb21f1aaf32b331c7958bfb5bdb568f790 100644 --- a/packages/vote/net/server.go +++ b/packages/vote/net/server.go @@ -53,10 +53,10 @@ func (vs *VoterServer) Run() error { return err } - grpcServer := grpc.NewServer() - RegisterVoterQueryServer(grpcServer, vs) + vs.grpcServer = grpc.NewServer() + RegisterVoterQueryServer(vs.grpcServer, vs) - return grpcServer.Serve(listener) + return vs.grpcServer.Serve(listener) } func (vs *VoterServer) Shutdown() { diff --git a/pluginmgr/core/plugins.go b/pluginmgr/core/plugins.go index e4ae7af0836d83effc2c193b30fe5049c0f1e0b7..1eba3f117348d559b68e2f677c8789f23f58a082 100644 --- a/pluginmgr/core/plugins.go +++ b/pluginmgr/core/plugins.go @@ -1,6 +1,7 @@ package core import ( + "github.com/iotaledger/goshimmer/dapps/valuetransfers" "github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/banner" "github.com/iotaledger/goshimmer/plugins/bootstrap" @@ -38,4 +39,5 @@ var PLUGINS = node.Plugins( gracefulshutdown.Plugin, metrics.Plugin, drng.Plugin, + valuetransfers.App, ) diff --git a/pluginmgr/research/plugins.go b/pluginmgr/research/plugins.go index f85a30a6defd2f5e89fe8727f0f8f5363e7a26d3..1b2e701623c77e9a2f77c3ca4531d3efd8a00632 100644 --- a/pluginmgr/research/plugins.go +++ b/pluginmgr/research/plugins.go @@ -2,8 +2,8 @@ package research import ( analysisclient "github.com/iotaledger/goshimmer/plugins/analysis/client" - analysisserver "github.com/iotaledger/goshimmer/plugins/analysis/server" analysisdashboard "github.com/iotaledger/goshimmer/plugins/analysis/dashboard" + analysisserver "github.com/iotaledger/goshimmer/plugins/analysis/server" "github.com/iotaledger/goshimmer/plugins/remotelog" "github.com/iotaledger/hive.go/node" ) diff --git a/plugins/fpc/parameters.go b/plugins/fpc/parameters.go deleted file mode 100644 index 5f1356426367175845f5bfc0c2f50c5af6a68980..0000000000000000000000000000000000000000 --- a/plugins/fpc/parameters.go +++ /dev/null @@ -1,17 +0,0 @@ -package fpc - -import ( - flag "github.com/spf13/pflag" -) - -const ( - CfgFPCQuerySampleSize = "fpc.querySampleSize" - CfgFPCRoundInterval = "fpc.roundInterval" - CfgFPCBindAddress = "fpc.bindAddress" -) - -func init() { - flag.Int(CfgFPCQuerySampleSize, 3, "Size of the voting quorum (k)") - flag.Int(CfgFPCRoundInterval, 5, "FPC round interval [s]") - flag.String(CfgFPCBindAddress, "0.0.0.0:10895", "the bind address on which the FPC vote server binds to") -} diff --git a/plugins/fpc/plugin.go b/plugins/fpc/plugin.go deleted file mode 100644 index c420f3cd2e97cdfdc45548314ebdb407d8faf013..0000000000000000000000000000000000000000 --- a/plugins/fpc/plugin.go +++ /dev/null @@ -1,163 +0,0 @@ -package fpc - -import ( - "context" - "fmt" - "net" - "strconv" - "sync" - - "github.com/iotaledger/goshimmer/packages/prng" - "github.com/iotaledger/hive.go/events" - "google.golang.org/grpc" - - "github.com/iotaledger/goshimmer/packages/shutdown" - "github.com/iotaledger/goshimmer/packages/vote" - "github.com/iotaledger/goshimmer/packages/vote/fpc" - votenet "github.com/iotaledger/goshimmer/packages/vote/net" - "github.com/iotaledger/goshimmer/plugins/autopeering" - "github.com/iotaledger/goshimmer/plugins/autopeering/local" - "github.com/iotaledger/goshimmer/plugins/config" - - "github.com/iotaledger/hive.go/autopeering/peer" - "github.com/iotaledger/hive.go/autopeering/peer/service" - "github.com/iotaledger/hive.go/daemon" - "github.com/iotaledger/hive.go/logger" - "github.com/iotaledger/hive.go/node" -) - -// PluginName is the name of the FPC plugin. -const PluginName = "FPC" - -var ( - // Plugin is the plugin instance of the FPC plugin. - Plugin = node.NewPlugin(PluginName, node.Enabled, configure, run) - log *logger.Logger - voter *fpc.FPC - voterOnce sync.Once - voterServer *votenet.VoterServer - roundIntervalSeconds int64 = 5 -) - -// Voter returns the DRNGRoundBasedVoter instance used by the FPC plugin. -func Voter() vote.DRNGRoundBasedVoter { - voterOnce.Do(func() { - // create a function which gets OpinionGivers - opinionGiverFunc := func() (givers []vote.OpinionGiver, err error) { - opinionGivers := make([]vote.OpinionGiver, 0) - for _, p := range autopeering.Discovery().GetVerifiedPeers() { - fpcService := p.Services().Get(service.FPCKey) - if fpcService == nil { - continue - } - // TODO: maybe cache the PeerOpinionGiver instead of creating a new one every time - opinionGivers = append(opinionGivers, &PeerOpinionGiver{p: p}) - } - return opinionGivers, nil - } - voter = fpc.New(opinionGiverFunc) - }) - return voter -} - -func configure(_ *node.Plugin) { - log = logger.NewLogger(PluginName) - lPeer := local.GetInstance() - - bindAddr := config.Node.GetString(CfgFPCBindAddress) - _, portStr, err := net.SplitHostPort(bindAddr) - if err != nil { - log.Fatalf("FPC bind address '%s' is invalid: %s", bindAddr, err) - } - port, err := strconv.Atoi(portStr) - if err != nil { - log.Fatalf("FPC bind address '%s' is invalid: %s", bindAddr, err) - } - - if err := lPeer.UpdateService(service.FPCKey, "tcp", port); err != nil { - log.Fatalf("could not update services: %v", err) - } - - voter.Events().RoundExecuted.Attach(events.NewClosure(func(roundStats *vote.RoundStats) { - peersQueried := len(roundStats.QueriedOpinions) - voteContextsCount := len(roundStats.ActiveVoteContexts) - log.Infof("executed round with rand %0.4f for %d vote contexts on %d peers, took %v", roundStats.RandUsed, voteContextsCount, peersQueried, roundStats.Duration) - })) -} - -func run(_ *node.Plugin) { - - daemon.BackgroundWorker("FPCVoterServer", func(shutdownSignal <-chan struct{}) { - voterServer = votenet.New(Voter(), func(id string) vote.Opinion { - // TODO: replace with persistence layer call - return vote.Unknown - }, config.Node.GetString(CfgFPCBindAddress)) - - go func() { - if err := voterServer.Run(); err != nil { - log.Error(err) - } - }() - - log.Infof("Started vote server on %s", config.Node.GetString(CfgFPCBindAddress)) - <-shutdownSignal - voterServer.Shutdown() - log.Info("Stopped vote server") - }, shutdown.PriorityFPC) - - daemon.BackgroundWorker("FPCRoundsInitiator", func(shutdownSignal <-chan struct{}) { - log.Infof("Started FPC round initiator") - unixTsPRNG := prng.NewUnixTimestampPRNG(roundIntervalSeconds) - defer unixTsPRNG.Stop() - exit: - for { - select { - case r := <-unixTsPRNG.C(): - if err := voter.Round(r); err != nil { - log.Errorf("unable to execute FPC round: %s", err) - } - case <-shutdownSignal: - break exit - } - } - log.Infof("Stopped FPC round initiator") - }, shutdown.PriorityFPC) -} - -// PeerOpinionGiver implements the OpinionGiver interface based on a peer. -type PeerOpinionGiver struct { - p *peer.Peer -} - -func (pog *PeerOpinionGiver) Query(ctx context.Context, ids []string) (vote.Opinions, error) { - fpcServicePort := pog.p.Services().Get(service.FPCKey).Port() - fpcAddr := net.JoinHostPort(pog.p.IP().String(), strconv.Itoa(fpcServicePort)) - - var opts []grpc.DialOption - opts = append(opts, grpc.WithInsecure()) - - // connect to the FPC service - conn, err := grpc.Dial(fpcAddr, opts...) - if err != nil { - return nil, fmt.Errorf("unable to connect to FPC service: %w", err) - } - defer conn.Close() - - client := votenet.NewVoterQueryClient(conn) - reply, err := client.Opinion(ctx, &votenet.QueryRequest{Id: ids}) - if err != nil { - return nil, fmt.Errorf("unable to query opinions: %w", err) - } - - // convert int32s in reply to opinions - opinions := make(vote.Opinions, len(reply.Opinion)) - for i, intOpn := range reply.Opinion { - opinions[i] = vote.ConvertInt32Opinion(intOpn) - } - - return opinions, nil -} - -func (pog *PeerOpinionGiver) ID() string { - return pog.p.ID().String() -} diff --git a/plugins/webapi/value/attachments/handler.go b/plugins/webapi/value/attachments/handler.go index 71cca4df720c44af2312ca0178c1a61ebe02cef3..9996feec57343fe68a61b70f4f3ff22df2315830 100644 --- a/plugins/webapi/value/attachments/handler.go +++ b/plugins/webapi/value/attachments/handler.go @@ -22,6 +22,7 @@ func Handler(c echo.Context) error { // get txn by txn id txnObj := valuetransfers.Tangle.Transaction(txnID) + defer txnObj.Release() if !txnObj.Exists() { return c.JSON(http.StatusNotFound, Response{Error: "Transaction not found"}) } @@ -29,6 +30,7 @@ func Handler(c echo.Context) error { // get attachements by txn id for _, attachmentObj := range valuetransfers.Tangle.Attachments(txnID) { + defer attachmentObj.Release() if !attachmentObj.Exists() { continue } @@ -36,6 +38,7 @@ func Handler(c echo.Context) error { // get payload by payload id payloadObj := valuetransfers.Tangle.Payload(attachment.PayloadID()) + defer payloadObj.Release() if !payloadObj.Exists() { continue } diff --git a/plugins/webapi/value/gettransactionbyid/handler.go b/plugins/webapi/value/gettransactionbyid/handler.go index 3ae7e4fd029bdb77af3359aa8664099c3c68b3af..ba4d9248bf24eac9cdca043f1c6da2bd2aa3d8c4 100644 --- a/plugins/webapi/value/gettransactionbyid/handler.go +++ b/plugins/webapi/value/gettransactionbyid/handler.go @@ -20,6 +20,7 @@ func Handler(c echo.Context) error { // get txn by txn id txnObj := valuetransfers.Tangle.Transaction(txnID) + defer txnObj.Release() if !txnObj.Exists() { return c.JSON(http.StatusNotFound, Response{Error: "Transaction not found"}) } diff --git a/plugins/webapi/value/testsendtxn/handler.go b/plugins/webapi/value/testsendtxn/handler.go index 927de126357de3f4c84a88dcf8bc94b77a4370a8..222e919ede5d6b2390bd0540475b5f0cde0be4f4 100644 --- a/plugins/webapi/value/testsendtxn/handler.go +++ b/plugins/webapi/value/testsendtxn/handler.go @@ -46,12 +46,16 @@ func Handler(c echo.Context) error { balances := []*balance.Balance{} for _, b := range out.Balances { // get token color - color, _, err := balance.ColorFromBytes([]byte(b.Color)) - if err != nil { - log.Info(err.Error()) - return c.JSON(http.StatusBadRequest, Response{Error: err.Error()}) + if b.Color == "IOTA" { + balances = append(balances, balance.New(balance.ColorIOTA, b.Value)) + } else { + color, _, err := balance.ColorFromBytes([]byte(b.Color)) + if err != nil { + log.Info(err.Error()) + return c.JSON(http.StatusBadRequest, Response{Error: err.Error()}) + } + balances = append(balances, balance.New(color, b.Value)) } - balances = append(balances, balance.New(color, b.Value)) } outmap[addr] = balances } diff --git a/plugins/webapi/value/unspentoutputs/handler.go b/plugins/webapi/value/unspentoutputs/handler.go index 7523fcb52226a1fe00f0e6831125fd398ea6136e..e22c9b0a076be495e2816be8b2c268201e823dd8 100644 --- a/plugins/webapi/value/unspentoutputs/handler.go +++ b/plugins/webapi/value/unspentoutputs/handler.go @@ -29,12 +29,23 @@ func Handler(c echo.Context) error { outputids := make([]OutputID, 0) // get outputids by address for id, outputObj := range valuetransfers.Tangle.OutputsOnAddress(address) { + defer outputObj.Release() output := outputObj.Unwrap() // TODO: get inclusion state if output.ConsumerCount() == 0 { + // iterate balances + var b []utils.Balance + for _, balance := range output.Balances() { + b = append(b, utils.Balance{ + Value: balance.Value(), + Color: balance.Color().String(), + }) + } + outputids = append(outputids, OutputID{ - ID: id.String(), + ID: id.String(), + Balances: b, InclusionState: utils.InclusionState{ Confirmed: true, Conflict: false, @@ -74,5 +85,6 @@ type UnspentOutput struct { // OutputID holds the output id and its inclusion state type OutputID struct { ID string `json:"id"` + Balances []utils.Balance `json:"balances"` InclusionState utils.InclusionState `json:"inclusion_state"` } diff --git a/tools/docker-network/docker-compose.yml b/tools/docker-network/docker-compose.yml index 374d5d9d839cce06170d4859428b8fbb3601d1ee..510364e38fdc19d142c25302361e9e5691ea7414 100644 --- a/tools/docker-network/docker-compose.yml +++ b/tools/docker-network/docker-compose.yml @@ -13,7 +13,7 @@ services: --analysis.server.bindAddress=0.0.0.0:1888 --analysis.dashboard.bindAddress=0.0.0.0:9000 --node.enablePlugins=analysis-server,analysis-dashboard - --node.disablePlugins=portcheck,dashboard,analysis-client,gossip,drng,issuer,sync,metrics,messagelayer,webapi,webapibroadcastdataendpoint,webapifindtransactionhashesendpoint,webapigetneighborsendpoint,webapigettransactionobjectsbyhashendpoint,webapigettransactiontrytesbyhashendpoint + --node.disablePlugins=portcheck,dashboard,analysis-client,gossip,drng,issuer,sync,metrics,messagelayer,valuetransfers,webapi,webapibroadcastdataendpoint,webapifindtransactionhashesendpoint,webapigetneighborsendpoint,webapigettransactionobjectsbyhashendpoint,webapigettransactiontrytesbyhashendpoint volumes: - ./config.docker.json:/tmp/config.json:ro - goshimmer-cache:/go diff --git a/tools/integration-tests/tester/framework/parameters.go b/tools/integration-tests/tester/framework/parameters.go index 481a3fb610127c6d37886e85f3f8b23c0fc69a67..6e2ae8c4719bc7ffe388474f9d3094e6f6cba0bd 100644 --- a/tools/integration-tests/tester/framework/parameters.go +++ b/tools/integration-tests/tester/framework/parameters.go @@ -13,7 +13,7 @@ const ( logsDir = "/tmp/logs/" - disabledPluginsEntryNode = "portcheck,dashboard,analysis-client,profiling,gossip,drng,issuer,sync,metrics,messagelayer,webapi,webapibroadcastdataendpoint,webapifindtransactionhashesendpoint,webapigetneighborsendpoint,webapigettransactionobjectsbyhashendpoint,webapigettransactiontrytesbyhashendpoint" + disabledPluginsEntryNode = "portcheck,dashboard,analysis-client,profiling,gossip,drng,issuer,sync,metrics,valuetransfers,messagelayer,webapi,webapibroadcastdataendpoint,webapifindtransactionhashesendpoint,webapigetneighborsendpoint,webapigettransactionobjectsbyhashendpoint,webapigettransactiontrytesbyhashendpoint" disabledPluginsPeer = "portcheck,dashboard,analysis-client,profiling" dockerLogsPrefixLen = 8 diff --git a/tools/integration-tests/tester/tests/common/common_test.go b/tools/integration-tests/tester/tests/common/common_test.go index 2332e4c5c68bdf191c7d65b7066fb975f4d7fa17..7cdceef806dd0823a35e9e80ccbceadf25c0e15d 100644 --- a/tools/integration-tests/tester/tests/common/common_test.go +++ b/tools/integration-tests/tester/tests/common/common_test.go @@ -55,6 +55,9 @@ func TestSynchronization(t *testing.T) { // wait for peer to start time.Sleep(5 * time.Second) + err = n.WaitForAutopeering(3) + require.NoError(t, err) + // note: this check is too dependent on the initial time a node sends bootstrap messages // and therefore very error prone. Therefore it's not done for now. // 7. check that it is in state desynced