"...goshimmer_without_tipselection.git" did not exist on "2ec8656c272403580e081238b56258aa39347a6a"
Select Git revision
dapp.go 8.22 KiB
package valuetransfers
import (
"errors"
"os"
"sync"
"time"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/consensus"
valuepayload "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/tangle"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/tipmanager"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/transaction"
messageTangle "github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/database"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
flag "github.com/spf13/pflag"
)
const (
// PluginName contains the human readable name of the plugin.
PluginName = "ValueTransfers"
// DefaultAverageNetworkDelay is the default average network delay (5 seconds).
// NOTE(review): the flag default registered in init for
// CfgValueLayerFCOBAverageNetworkDelay is given separately as the integer 5
// (interpreted as seconds in configure); keep the two values in sync.
DefaultAverageNetworkDelay = 5 * time.Second
// CfgValueLayerSnapshotFile is the config key holding the path to the snapshot file.
CfgValueLayerSnapshotFile = "valueLayer.snapshot.file"
// CfgValueLayerFCOBAverageNetworkDelay is the config key holding the average
// network delay used for the FCoB rules. The value is read as an integer
// number of seconds.
CfgValueLayerFCOBAverageNetworkDelay = "valueLayer.fcob.averageNetworkDelay"
)
// init registers the command line flags of the value-transfers app together
// with their default values.
func init() {
	const (
		// snapshotPathDefault is used when no snapshot file path is configured.
		snapshotPathDefault = "./snapshot.bin"
		// networkDelayDefault is the default FCoB average network delay (read as seconds).
		networkDelayDefault = 5
	)

	flag.String(CfgValueLayerSnapshotFile, snapshotPathDefault, "the path to the snapshot file")
	flag.Int(CfgValueLayerFCOBAverageNetworkDelay, networkDelayDefault, "the avg. network delay to use for FCoB rules")
}
var (
// ErrTransactionWasNotBookedInTime is returned if a transaction did not get booked
// within the defined await time.
ErrTransactionWasNotBookedInTime = errors.New("transaction could not be booked in time")
// app is the "plugin" instance of the value-transfers application.
app *node.Plugin
// appOnce guards the one-time creation of app in App.
appOnce sync.Once
// _tangle represents the value tangle that is used to express votes on value transactions.
_tangle *tangle.Tangle
// tangleOnce guards the one-time creation of _tangle in Tangle.
tangleOnce sync.Once
// fcob contains the fcob consensus logic. It is assigned in configure.
fcob *consensus.FCOB
// ledgerState represents the ledger state, that keeps track of the liked branches and offers an API to access funds.
// It is assigned in configure.
ledgerState *tangle.LedgerState
// log holds a reference to the logger used by this app. It is assigned in configure.
log *logger.Logger
// tipManager is the tip manager singleton; liked payloads are added to it and
// disliked payloads removed from it by the handlers attached in configure.
tipManager *tipmanager.TipManager
// tipManagerOnce guards the one-time creation of tipManager in TipManager.
tipManagerOnce sync.Once
// valueObjectFactory is the value object factory singleton, built from the
// tangle and the tip manager.
valueObjectFactory *tangle.ValueObjectFactory
// valueObjectFactoryOnce guards the one-time creation of valueObjectFactory in ValueObjectFactory.
valueObjectFactoryOnce sync.Once
)
// App returns the plugin instance of the value-transfers application.
// The plugin is created on the first call and reused afterwards.
func App() *node.Plugin {
	createPlugin := func() {
		app = node.NewPlugin(PluginName, node.Enabled, configure, run)
	}
	appOnce.Do(createPlugin)

	return app
}
// Tangle returns the value tangle singleton, i.e. the tangle that is used to
// express votes on value transactions. It is created on top of the database
// store on the first call and reused afterwards.
func Tangle() *tangle.Tangle {
	createTangle := func() {
		_tangle = tangle.New(database.Store())
	}
	tangleOnce.Do(createTangle)

	return _tangle
}
// FCOB returns the package-level instance holding the fcob consensus logic.
// The instance is assigned in configure; in the code visible in this file
// nothing else sets it, so a call made before configure has run returns nil.
func FCOB() *consensus.FCOB {
return fcob
}
// LedgerState returns the package-level ledger state, which keeps track of the
// liked branches and offers an API to access funds.
// The instance is assigned in configure; in the code visible in this file
// nothing else sets it, so a call made before configure has run returns nil.
func LedgerState() *tangle.LedgerState {
return ledgerState
}
// configure wires up the value-transfers app: it creates the logger, the value
// tangle and the ledger state, loads the configured snapshot, hooks the tip
// manager into the like/dislike events, sets up the FCOB consensus rules and
// the FPC voter, and finally subscribes to solid messages of the message layer.
//
// It panics (via log.Panic) if the configured snapshot file can not be opened
// or read.
func configure(_ *node.Plugin) {
	// configure logger
	log = logger.NewLogger(PluginName)

	// configure Tangle
	_tangle = Tangle()

	// configure LedgerState
	ledgerState = tangle.NewLedgerState(Tangle())

	// read snapshot file
	snapshotFilePath := config.Node().GetString(CfgValueLayerSnapshotFile)
	if len(snapshotFilePath) != 0 {
		snapshot := tangle.Snapshot{}
		f, err := os.Open(snapshotFilePath)
		if err != nil {
			log.Panic("can not open snapshot file:", err)
		}

		_, readErr := snapshot.ReadFrom(f)
		// The file is only needed while reading; close it right away (before a
		// potential panic below) so the descriptor is not leaked.
		if closeErr := f.Close(); closeErr != nil {
			log.Warnf("could not close snapshot file: %s", closeErr)
		}
		if readErr != nil {
			log.Panic("could not read snapshot file:", readErr)
		}

		_tangle.LoadSnapshot(snapshot)
		log.Infof("read snapshot from %s", snapshotFilePath)
	}

	_tangle.Events.Error.Attach(events.NewClosure(func(err error) {
		log.Error(err)
	}))

	// initialize tip manager and value object factory
	tipManager = TipManager()
	valueObjectFactory = ValueObjectFactory()

	// liked payloads become tips, disliked payloads are removed from the tips;
	// the metadata is not needed by either handler and is released immediately
	_tangle.Events.PayloadLiked.Attach(events.NewClosure(func(cachedPayloadEvent *tangle.CachedPayloadEvent) {
		cachedPayloadEvent.PayloadMetadata.Release()
		cachedPayloadEvent.Payload.Consume(tipManager.AddTip)
	}))
	_tangle.Events.PayloadDisliked.Attach(events.NewClosure(func(cachedPayloadEvent *tangle.CachedPayloadEvent) {
		cachedPayloadEvent.PayloadMetadata.Release()
		cachedPayloadEvent.Payload.Consume(tipManager.RemoveTip)
	}))

	// configure FCOB consensus rules
	cfgAvgNetworkDelay := config.Node().GetInt(CfgValueLayerFCOBAverageNetworkDelay)
	log.Infof("avg. network delay configured to %d seconds", cfgAvgNetworkDelay)
	fcob = consensus.NewFCOB(_tangle, time.Duration(cfgAvgNetworkDelay)*time.Second)
	fcob.Events.Vote.Attach(events.NewClosure(func(id string, initOpn vote.Opinion) {
		if err := voter.Vote(id, initOpn); err != nil {
			log.Warnf("FPC vote: %s", err)
		}
	}))
	fcob.Events.Error.Attach(events.NewClosure(func(err error) {
		log.Errorf("FCOB error: %s", err)
	}))

	// configure FPC + link to consensus
	configureFPC()
	voter.Events().Finalized.Attach(events.NewClosure(fcob.ProcessVoteResult))
	voter.Events().Finalized.Attach(events.NewClosure(func(ev *vote.OpinionEvent) {
		log.Infof("FPC finalized for transaction with id '%s' - final opinion: '%s'", ev.ID, ev.Opinion)
	}))
	voter.Events().Failed.Attach(events.NewClosure(func(ev *vote.OpinionEvent) {
		log.Warnf("FPC failed for transaction with id '%s' - last opinion: '%s'", ev.ID, ev.Opinion)
	}))

	// register SignatureFilter in Parser
	messagelayer.MessageParser().AddMessageFilter(tangle.NewSignatureFilter())

	// subscribe to message-layer
	messagelayer.Tangle().Events.MessageSolid.Attach(events.NewClosure(onReceiveMessageFromMessageLayer))
}
// run starts the background worker that shuts the value tangle down once the
// daemon signals shutdown, and then starts FPC. It panics (via log.Panicf) if
// the background worker can not be registered.
func run(*node.Plugin) {
	shutdownWorker := func(shutdownSignal <-chan struct{}) {
		<-shutdownSignal

		// TODO: make this better
		// waits a fixed 12 seconds before shutting the tangle down
		// (presumably to let in-flight work finish - TODO confirm)
		time.Sleep(12 * time.Second)

		_tangle.Shutdown()
	}

	err := daemon.BackgroundWorker("ValueTangle", shutdownWorker, shutdown.PriorityTangle)
	if err != nil {
		log.Panicf("Failed to start as daemon: %s", err)
	}

	runFPC()
}
// onReceiveMessageFromMessageLayer handles a solid message of the message
// layer: messages carrying a value payload are attached to the value tangle,
// all other messages are ignored. The cached message and its metadata are
// released when the handler returns.
func onReceiveMessageFromMessageLayer(cachedMessageEvent *messageTangle.CachedMessageEvent) {
	defer cachedMessageEvent.Message.Release()
	defer cachedMessageEvent.MessageMetadata.Release()

	msg := cachedMessageEvent.Message.Unwrap()
	if msg == nil {
		log.Debug("failed to unpack solid message from message layer")
		return
	}

	// only value payloads are of interest to this app
	payload := msg.Payload()
	if payload.Type() != valuepayload.Type {
		return
	}

	if valuePayload, isValuePayload := payload.(*valuepayload.Payload); isValuePayload {
		_tangle.AttachPayload(valuePayload)
		return
	}

	// the type ID matched but the concrete Go type did not
	log.Debug("could not cast payload to value payload")
}
// TipManager returns the tip manager singleton. It is created on the first
// call and reused afterwards.
func TipManager() *tipmanager.TipManager {
	createTipManager := func() {
		tipManager = tipmanager.New()
	}
	tipManagerOnce.Do(createTipManager)

	return tipManager
}
// ValueObjectFactory returns the value object factory singleton. It is built
// from the tangle and tip manager singletons on the first call and reused
// afterwards.
func ValueObjectFactory() *tangle.ValueObjectFactory {
	createFactory := func() {
		valueObjectFactory = tangle.NewValueObjectFactory(Tangle(), TipManager())
	}
	valueObjectFactoryOnce.Do(createFactory)

	return valueObjectFactory
}
// AwaitTransactionToBeBooked awaits maxAwait for the given transaction to get booked.
//
// It returns nil as soon as a TransactionBooked event for txID is observed and
// ErrTransactionWasNotBookedInTime if maxAwait elapses first. Only bookings
// that happen after the call has attached its handler are observed.
func AwaitTransactionToBeBooked(txID transaction.ID, maxAwait time.Duration) error {
	// A single pending signal is all the waiter needs, hence capacity 1.
	booked := make(chan struct{}, 1)

	closure := events.NewClosure(func(cachedTransactionBookEvent *tangle.CachedTransactionBookEvent) {
		defer cachedTransactionBookEvent.Transaction.Release()
		defer cachedTransactionBookEvent.TransactionMetadata.Release()

		// guard against an empty cached object before dereferencing it
		tx := cachedTransactionBookEvent.Transaction.Unwrap()
		if tx == nil || tx.ID() != txID {
			return
		}

		// Non-blocking send: if the same transaction gets booked multiple
		// times a signal is already pending and further ones can be dropped.
		// The handler must never block, otherwise it could stall the event
		// dispatcher (and with it the Detach below).
		select {
		case booked <- struct{}{}:
		default:
		}
	})
	Tangle().Events.TransactionBooked.Attach(closure)
	defer Tangle().Events.TransactionBooked.Detach(closure)

	// Use an explicit timer so it can be stopped once the booking was observed
	// instead of lingering until maxAwait expires.
	timeout := time.NewTimer(maxAwait)
	defer timeout.Stop()

	select {
	case <-timeout.C:
		return ErrTransactionWasNotBookedInTime
	case <-booked:
		return nil
	}
}