Skip to content
Snippets Groups Projects
Select Git revision
  • fedfad7429621bd588c753346b6a1732ac185ece
  • without_tipselection default
  • develop protected
  • fix/grafana-local-dashboard
  • wasp
  • fix/dashboard-explorer-freeze
  • master
  • feat/timerqueue
  • test/sync_debug_and_650
  • feat/sync_revamp_inv
  • wip/sync
  • tool/db-recovery
  • portcheck/fix
  • fix/synchronization
  • feat/new-dashboard-analysis
  • feat/refactored-analysis-dashboard
  • feat/new-analysis-dashboard
  • test/demo-prometheus-fpc
  • prometheus_metrics
  • wip/analysis-server
  • merge/fpc-test-value-transfer
  • v0.2.2
  • v0.2.1
  • v0.2.0
  • v0.1.3
  • v0.1.2
  • v0.1.1
  • v0.1.0
28 results

constants.go

Blame
  • dapp.go 8.22 KiB
    package valuetransfers
    
    import (
    	"errors"
    	"os"
    	"sync"
    	"time"
    
    	"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/consensus"
    	valuepayload "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload"
    	"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/tangle"
    	"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/tipmanager"
    	"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/transaction"
    	messageTangle "github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
    	"github.com/iotaledger/goshimmer/packages/shutdown"
    	"github.com/iotaledger/goshimmer/packages/vote"
    	"github.com/iotaledger/goshimmer/plugins/config"
    	"github.com/iotaledger/goshimmer/plugins/database"
    	"github.com/iotaledger/goshimmer/plugins/messagelayer"
    	"github.com/iotaledger/hive.go/daemon"
    	"github.com/iotaledger/hive.go/events"
    	"github.com/iotaledger/hive.go/logger"
    	"github.com/iotaledger/hive.go/node"
    	flag "github.com/spf13/pflag"
    )
    
    const (
    	// PluginName contains the human readable name of the plugin.
    	PluginName = "ValueTransfers"
    
    	// DefaultAverageNetworkDelay is the default average network delay, i.e. the average time
    	// a message is expected to need to propagate through the network via gossip.
    	DefaultAverageNetworkDelay = 5 * time.Second
    
    	// CfgValueLayerSnapshotFile is the config key holding the path to the snapshot file.
    	CfgValueLayerSnapshotFile = "valueLayer.snapshot.file"
    
    	// CfgValueLayerFCOBAverageNetworkDelay is the config key holding the avg. network delay
    	// to use for FCoB rules. The value is read as an integer number of seconds.
    	CfgValueLayerFCOBAverageNetworkDelay = "valueLayer.fcob.averageNetworkDelay"
    )
    
    // init registers the command line flags of the value-transfers app together
    // with their default values.
    func init() {
    	const (
    		// snapshotPathDefault is used when no snapshot file path is configured.
    		snapshotPathDefault = "./snapshot.bin"
    		// networkDelayDefault is the default avg. network delay in seconds.
    		networkDelayDefault = 5
    	)
    
    	flag.String(CfgValueLayerSnapshotFile, snapshotPathDefault, "the path to the snapshot file")
    	flag.Int(CfgValueLayerFCOBAverageNetworkDelay, networkDelayDefault, "the avg. network delay to use for FCoB rules")
    }
    
    var (
    	// ErrTransactionWasNotBookedInTime is returned if a transaction did not get booked
    	// within the defined await time.
    	ErrTransactionWasNotBookedInTime = errors.New("transaction could not be booked in time")
    
    	// app is the "plugin" instance of the value-transfers application.
    	// appOnce guards its one-time creation in App().
    	app     *node.Plugin
    	appOnce sync.Once
    
    	// _tangle represents the value tangle that is used to express votes on value transactions.
    	// tangleOnce guards its one-time creation in Tangle().
    	_tangle    *tangle.Tangle
    	tangleOnce sync.Once
    
    	// fcob contains the fcob consensus logic. It is assigned in configure.
    	fcob *consensus.FCOB
    
    	// ledgerState represents the ledger state, that keeps track of the liked branches and offers an API to access funds.
    	// It is assigned in configure.
    	ledgerState *tangle.LedgerState
    
    	// log holds a reference to the logger used by this app.
    	log *logger.Logger
    
    	// tipManager is the tip manager singleton; liked payloads are added to it and
    	// disliked payloads are removed from it (see configure).
    	// tipManagerOnce guards its one-time creation in TipManager().
    	tipManager     *tipmanager.TipManager
    	tipManagerOnce sync.Once
    
    	// valueObjectFactory is the value object factory singleton, built from the tangle
    	// and the tip manager.
    	// valueObjectFactoryOnce guards its one-time creation in ValueObjectFactory().
    	valueObjectFactory     *tangle.ValueObjectFactory
    	valueObjectFactoryOnce sync.Once
    )
    
    // App returns the plugin instance of the value-transfers application.
    // The plugin is created on the first call; later calls return the same instance.
    func App() *node.Plugin {
    	createPlugin := func() {
    		app = node.NewPlugin(PluginName, node.Enabled, configure, run)
    	}
    	appOnce.Do(createPlugin)
    
    	return app
    }
    
    // Tangle returns the value tangle singleton, i.e. the tangle that is used to
    // express votes on value transactions.
    // The tangle is created on top of the database store on the first call.
    func Tangle() *tangle.Tangle {
    	createTangle := func() {
    		_tangle = tangle.New(database.Store())
    	}
    	tangleOnce.Do(createTangle)
    
    	return _tangle
    }
    
    // FCOB returns the fcob instance, which contains the fcob consensus logic.
    // The instance is assigned in configure; as far as this file shows, the
    // returned pointer is nil before configure has run.
    func FCOB() *consensus.FCOB {
    	return fcob
    }
    
    // LedgerState returns the ledgerState instance, which represents the ledger state
    // that keeps track of the liked branches and offers an API to access funds.
    // The instance is assigned in configure; as far as this file shows, the
    // returned pointer is nil before configure has run.
    func LedgerState() *tangle.LedgerState {
    	return ledgerState
    }
    
    // configure sets up the value-transfers app. In order, it:
    //   - creates the logger, the value tangle and the ledger state,
    //   - loads the snapshot from the configured file (if a path is configured),
    //   - initializes the tip manager and the value object factory and keeps the
    //     tips in sync with liked / disliked payloads,
    //   - sets up the FCOB consensus rules and links them to FPC voting,
    //   - registers the signature filter and subscribes to solid messages of the
    //     message layer.
    //
    // It panics (via the logger) if the configured snapshot file can not be
    // opened or read.
    func configure(_ *node.Plugin) {
    	// configure logger
    	log = logger.NewLogger(PluginName)
    
    	// configure Tangle
    	_tangle = Tangle()
    
    	// configure LedgerState
    	ledgerState = tangle.NewLedgerState(Tangle())
    
    	// read snapshot file
    	snapshotFilePath := config.Node().GetString(CfgValueLayerSnapshotFile)
    	if len(snapshotFilePath) != 0 {
    		snapshot := tangle.Snapshot{}
    		f, err := os.Open(snapshotFilePath)
    		if err != nil {
    			log.Panic("can not open snapshot file:", err)
    		}
    		_, err = snapshot.ReadFrom(f)
    		// The handle is only needed while reading: close it before evaluating the
    		// read result so that it is released on the success and the failure path.
    		if closeErr := f.Close(); closeErr != nil {
    			log.Warnf("could not close snapshot file: %s", closeErr)
    		}
    		if err != nil {
    			log.Panic("could not read snapshot file:", err)
    		}
    		_tangle.LoadSnapshot(snapshot)
    		log.Infof("read snapshot from %s", snapshotFilePath)
    	}
    
    	// forward errors of the value tangle to the logger
    	_tangle.Events.Error.Attach(events.NewClosure(func(err error) {
    		log.Error(err)
    	}))
    
    	// initialize tip manager and value object factory
    	tipManager = TipManager()
    	valueObjectFactory = ValueObjectFactory()
    
    	// liked payloads become tips, disliked payloads are removed from the tips
    	_tangle.Events.PayloadLiked.Attach(events.NewClosure(func(cachedPayloadEvent *tangle.CachedPayloadEvent) {
    		cachedPayloadEvent.PayloadMetadata.Release()
    		cachedPayloadEvent.Payload.Consume(tipManager.AddTip)
    	}))
    	_tangle.Events.PayloadDisliked.Attach(events.NewClosure(func(cachedPayloadEvent *tangle.CachedPayloadEvent) {
    		cachedPayloadEvent.PayloadMetadata.Release()
    		cachedPayloadEvent.Payload.Consume(tipManager.RemoveTip)
    	}))
    
    	// configure FCOB consensus rules
    	cfgAvgNetworkDelay := config.Node().GetInt(CfgValueLayerFCOBAverageNetworkDelay)
    	log.Infof("avg. network delay configured to %d seconds", cfgAvgNetworkDelay)
    	fcob = consensus.NewFCOB(_tangle, time.Duration(cfgAvgNetworkDelay)*time.Second)
    	// voter is only dereferenced when the Vote event fires, not while attaching
    	fcob.Events.Vote.Attach(events.NewClosure(func(id string, initOpn vote.Opinion) {
    		if err := voter.Vote(id, initOpn); err != nil {
    			log.Warnf("FPC vote: %s", err)
    		}
    	}))
    	fcob.Events.Error.Attach(events.NewClosure(func(err error) {
    		log.Errorf("FCOB error: %s", err)
    	}))
    
    	// configure FPC + link to consensus
    	configureFPC()
    	voter.Events().Finalized.Attach(events.NewClosure(fcob.ProcessVoteResult))
    	voter.Events().Finalized.Attach(events.NewClosure(func(ev *vote.OpinionEvent) {
    		log.Infof("FPC finalized for transaction with id '%s' - final opinion: '%s'", ev.ID, ev.Opinion)
    	}))
    	voter.Events().Failed.Attach(events.NewClosure(func(ev *vote.OpinionEvent) {
    		log.Warnf("FPC failed for transaction with id '%s' - last opinion: '%s'", ev.ID, ev.Opinion)
    	}))
    
    	// register SignatureFilter in Parser
    	messagelayer.MessageParser().AddMessageFilter(tangle.NewSignatureFilter())
    
    	// subscribe to message-layer
    	messagelayer.Tangle().Events.MessageSolid.Attach(events.NewClosure(onReceiveMessageFromMessageLayer))
    }
    
    // run starts the background worker that shuts the value tangle down once the
    // node's shutdown signal arrives, and then starts FPC.
    // It panics (via the logger) if the background worker can not be registered.
    func run(*node.Plugin) {
    	// blocks until shutdown is signalled, then tears down the value tangle
    	shutdownWorker := func(shutdownSignal <-chan struct{}) {
    		<-shutdownSignal
    		// TODO: make this better
    		time.Sleep(12 * time.Second)
    		_tangle.Shutdown()
    	}
    
    	err := daemon.BackgroundWorker("ValueTangle", shutdownWorker, shutdown.PriorityTangle)
    	if err != nil {
    		log.Panicf("Failed to start as daemon: %s", err)
    	}
    
    	runFPC()
    }
    
    // onReceiveMessageFromMessageLayer handles a solid message of the message layer:
    // if the message carries a value payload, the payload is attached to the value
    // tangle; every other message is ignored. The cached message and its metadata
    // are released when the handler returns.
    func onReceiveMessageFromMessageLayer(cachedMessageEvent *messageTangle.CachedMessageEvent) {
    	defer cachedMessageEvent.Message.Release()
    	defer cachedMessageEvent.MessageMetadata.Release()
    
    	msg := cachedMessageEvent.Message.Unwrap()
    	if msg == nil {
    		log.Debug("failed to unpack solid message from message layer")
    		return
    	}
    
    	// only value payloads are of interest for the value tangle
    	msgPayload := msg.Payload()
    	if msgPayload.Type() != valuepayload.Type {
    		return
    	}
    
    	if valuePayload, isValuePayload := msgPayload.(*valuepayload.Payload); isValuePayload {
    		_tangle.AttachPayload(valuePayload)
    		return
    	}
    
    	log.Debug("could not cast payload to value payload")
    }
    
    // TipManager returns the tip manager singleton.
    // The tip manager is created on the first call; later calls return the same instance.
    func TipManager() *tipmanager.TipManager {
    	createTipManager := func() {
    		tipManager = tipmanager.New()
    	}
    	tipManagerOnce.Do(createTipManager)
    
    	return tipManager
    }
    
    // ValueObjectFactory returns the value object factory singleton.
    // The factory is created from the tangle and tip manager singletons on the
    // first call; later calls return the same instance.
    func ValueObjectFactory() *tangle.ValueObjectFactory {
    	createFactory := func() {
    		valueObjectFactory = tangle.NewValueObjectFactory(Tangle(), TipManager())
    	}
    	valueObjectFactoryOnce.Do(createFactory)
    
    	return valueObjectFactory
    }
    
    // AwaitTransactionToBeBooked blocks until the transaction with the given ID gets
    // booked or until maxAwait has elapsed, whichever happens first.
    //
    // It returns nil if a TransactionBooked event for txID was observed in time and
    // ErrTransactionWasNotBookedInTime otherwise. Only bookings that happen after the
    // call has attached its handler are observed.
    func AwaitTransactionToBeBooked(txID transaction.ID, maxAwait time.Duration) error {
    	// One pending signal is all the waiter needs: with a buffer of one and the
    	// non-blocking send below, the event handler never blocks the goroutine that
    	// triggers the event, even if the same transaction gets booked multiple times.
    	booked := make(chan struct{}, 1)
    	closure := events.NewClosure(func(cachedTransactionBookEvent *tangle.CachedTransactionBookEvent) {
    		defer cachedTransactionBookEvent.Transaction.Release()
    		defer cachedTransactionBookEvent.TransactionMetadata.Release()
    
    		// guard against an event whose transaction can not be unwrapped
    		tx := cachedTransactionBookEvent.Transaction.Unwrap()
    		if tx == nil || tx.ID() != txID {
    			return
    		}
    
    		select {
    		case booked <- struct{}{}:
    		default:
    			// a signal is already pending, nothing more to do
    		}
    	})
    	Tangle().Events.TransactionBooked.Attach(closure)
    	defer Tangle().Events.TransactionBooked.Detach(closure)
    
    	// use an explicit timer so it can be stopped when the booking arrives first
    	timeout := time.NewTimer(maxAwait)
    	defer timeout.Stop()
    
    	select {
    	case <-timeout.C:
    		return ErrTransactionWasNotBookedInTime
    	case <-booked:
    		return nil
    	}
    }