From 9039716c61394021ce8c77d2040eb4cd76c28b04 Mon Sep 17 00:00:00 2001 From: Hans Moog <hm@mkjc.net> Date: Tue, 2 Jun 2020 18:22:09 +0200 Subject: [PATCH] Feat: Starting to implement the propagation of the confirmed flag (#440) * Fix: fixed marshal/unmarshal of branch regarding finalized flag * Feat: added confirmed propagation to branch dag * Feat: added rejected marshaling * Feat: added comment for SetBranchFinalized * Feat: finished confirmed propagation * Feat: private setters so one can not mess with the tangle * Refactor: refactored some code * Feat: linked branchmanager to tangle * Feat: refactored confirmed propagation * Feat: started to clean up the code / resort methods * Feat: cleaned up more code * Refactor: reordered methods --- .../packages/branchmanager/branch.go | 47 +- .../packages/branchmanager/branchmanager.go | 141 ++- .../packages/branchmanager/events.go | 10 + .../valuetransfers/packages/consensus/fcob.go | 4 +- .../valuetransfers/packages/tangle/events.go | 38 +- .../valuetransfers/packages/tangle/output.go | 4 +- .../packages/tangle/payloadmetadata.go | 86 +- .../packages/tangle/payloadmetadata_test.go | 4 +- .../valuetransfers/packages/tangle/tangle.go | 862 +++++++++++------- .../packages/tangle/tangle_test.go | 4 +- .../packages/test/tangle_test.go | 5 + 11 files changed, 873 insertions(+), 332 deletions(-) diff --git a/dapps/valuetransfers/packages/branchmanager/branch.go b/dapps/valuetransfers/packages/branchmanager/branch.go index 2fde95d8..7ccf9b0c 100644 --- a/dapps/valuetransfers/packages/branchmanager/branch.go +++ b/dapps/valuetransfers/packages/branchmanager/branch.go @@ -23,6 +23,7 @@ type Branch struct { liked bool finalized bool confirmed bool + rejected bool parentBranchesMutex sync.RWMutex conflictsMutex sync.RWMutex @@ -30,6 +31,7 @@ type Branch struct { likedMutex sync.RWMutex finalizedMutex sync.RWMutex confirmedMutex sync.RWMutex + rejectedMutex sync.RWMutex } // NewBranch is the constructor of a Branch and creates a new Branch object from the given details. @@ -318,6 +320,39 @@ func (branch *Branch) setConfirmed(confirmed bool) (modified bool) { return } +// Rejected returns true if the branch has been rejected to be part of the ledger state. +func (branch *Branch) Rejected() bool { + branch.rejectedMutex.RLock() + defer branch.rejectedMutex.RUnlock() + + return branch.rejected +} + +// setRejected is the setter for the rejected flag. It returns true if the value of the flag has been updated. +// A branch is rejected if it is considered to have been rejected to be part of the ledger state. +func (branch *Branch) setRejected(rejected bool) (modified bool) { + branch.rejectedMutex.RLock() + if branch.rejected == rejected { + branch.rejectedMutex.RUnlock() + + return + } + + branch.rejectedMutex.RUnlock() + branch.rejectedMutex.Lock() + defer branch.rejectedMutex.Unlock() + + if branch.rejected == rejected { + return + } + + branch.rejected = rejected + branch.SetModified() + modified = true + + return +} + // Bytes returns a marshaled version of this Branch. func (branch *Branch) Bytes() []byte { return marshalutil.New(). @@ -354,10 +389,12 @@ func (branch *Branch) ObjectStorageValue() []byte { parentBranches := branch.ParentBranches() parentBranchCount := len(parentBranches) - marshalUtil := marshalutil.New(3*marshalutil.BOOL_SIZE + marshalutil.UINT32_SIZE + parentBranchCount*BranchIDLength) + marshalUtil := marshalutil.New(5*marshalutil.BOOL_SIZE + marshalutil.UINT32_SIZE + parentBranchCount*BranchIDLength) marshalUtil.WriteBool(branch.Preferred()) marshalUtil.WriteBool(branch.Liked()) + marshalUtil.WriteBool(branch.Finalized()) marshalUtil.WriteBool(branch.Confirmed()) + marshalUtil.WriteBool(branch.Rejected()) marshalUtil.WriteUint32(uint32(parentBranchCount)) for _, branchID := range parentBranches { marshalUtil.WriteBytes(branchID.Bytes()) @@ -377,10 +414,18 @@ func (branch *Branch) UnmarshalObjectStorageValue(valueBytes []byte) (consumedBy if err != nil { return } + branch.finalized, err = marshalUtil.ReadBool() + if err != nil { + return + } branch.confirmed, err = marshalUtil.ReadBool() if err != nil { return } + branch.rejected, err = marshalUtil.ReadBool() + if err != nil { + return + } parentBranchCount, err := marshalUtil.ReadUint32() if err != nil { return diff --git a/dapps/valuetransfers/packages/branchmanager/branchmanager.go b/dapps/valuetransfers/packages/branchmanager/branchmanager.go index 9e1bafbe..32093550 100644 --- a/dapps/valuetransfers/packages/branchmanager/branchmanager.go +++ b/dapps/valuetransfers/packages/branchmanager/branchmanager.go @@ -53,6 +53,9 @@ func New(store kvstore.KVStore) (branchManager *BranchManager) { BranchUnpreferred: events.NewEvent(branchCaller), BranchLiked: events.NewEvent(branchCaller), BranchDisliked: events.NewEvent(branchCaller), + BranchFinalized: events.NewEvent(branchCaller), + BranchConfirmed: events.NewEvent(branchCaller), + BranchRejected: events.NewEvent(branchCaller), }, } branchManager.init() @@ -346,7 +349,123 @@ func (branchManager *BranchManager) SetBranchLiked(branchID BranchID, liked bool return branchManager.setBranchLiked(branchManager.Branch(branchID), liked) } +// SetBranchFinalized modifies the finalized flag of a branch. It automatically triggers func (branchManager *BranchManager) SetBranchFinalized(branchID BranchID) (modified bool, err error) { + return branchManager.setBranchFinalized(branchManager.Branch(branchID)) +} + +func (branchManager *BranchManager) setBranchFinalized(cachedBranch *CachedBranch) (modified bool, err error) { + defer cachedBranch.Release() + branch := cachedBranch.Unwrap() + if branch == nil { + err = fmt.Errorf("failed to unwrap branch") + + return + } + + if modified = branch.setFinalized(true); !modified { + return + } + + branchManager.Events.BranchFinalized.Trigger(cachedBranch) + + if !branch.Preferred() { + branchManager.propagateRejectedToChildBranches(cachedBranch.Retain()) + + return + } + + // update all other branches that are in the same conflict sets to be not preferred and also finalized + for conflictID := range branch.Conflicts() { + branchManager.ConflictMembers(conflictID).Consume(func(conflictMember *ConflictMember) { + // skip the branch which just got preferred + if conflictMember.BranchID() == branch.ID() { + return + } + + _, err = branchManager.setBranchPreferred(branchManager.Branch(conflictMember.BranchID()), false) + if err != nil { + return + } + _, err = branchManager.setBranchFinalized(branchManager.Branch(conflictMember.BranchID())) + if err != nil { + return + } + }) + } + + err = branchManager.propagateConfirmedToChildBranches(cachedBranch.Retain()) + + return +} + +func (branchManager *BranchManager) propagateRejectedToChildBranches(cachedBranch *CachedBranch) { + branchStack := list.New() + branchStack.PushBack(cachedBranch) + + for branchStack.Len() >= 1 { + currentStackElement := branchStack.Front() + currentCachedBranch := currentStackElement.Value.(*CachedBranch) + branchStack.Remove(currentStackElement) + + currentBranch := currentCachedBranch.Unwrap() + if currentBranch == nil || !currentBranch.setRejected(false) { + currentCachedBranch.Release() + + continue + } + + branchManager.Events.BranchRejected.Trigger(cachedBranch) + + branchManager.ChildBranches(currentBranch.ID()).Consume(func(childBranch *ChildBranch) { + branchStack.PushBack(branchManager.Branch(childBranch.ChildID())) + }) + + currentCachedBranch.Release() + } +} + +func (branchManager *BranchManager) propagateConfirmedToChildBranches(cachedBranch *CachedBranch) (err error) { + // initialize stack with our entry point for the propagation + propagationStack := list.New() + propagationStack.PushBack(cachedBranch) + + // iterate through stack to propagate the changes to child branches + for propagationStack.Len() >= 1 { + stackElement := propagationStack.Front() + stackElement.Value.(*CachedBranch).Consume(func(branch *Branch) { + // abort if the branch does not fulfill the conditions to be confirmed + if !branch.Preferred() || !branch.Finalized() { + return + } + + // abort if not all parents are confirmed + for _, parentBranchID := range branch.ParentBranches() { + cachedParentBranch := branchManager.Branch(parentBranchID) + if parentBranch := cachedParentBranch.Unwrap(); parentBranch == nil || !parentBranch.Confirmed() { + cachedParentBranch.Release() + + return + } + cachedParentBranch.Release() + } + + // abort if the branch was confirmed already + if !branch.setConfirmed(true) { + return + } + + // trigger events + branchManager.Events.BranchConfirmed.Trigger(cachedBranch) + + // schedule confirmed checks of children + for _, cachedChildBranch := range branchManager.ChildBranches(branch.ID()) { + propagationStack.PushBack(cachedChildBranch) + } + }) + propagationStack.Remove(stackElement) + } + return } @@ -515,7 +634,10 @@ func (branchManager *BranchManager) setBranchLiked(cachedBranch *CachedBranch, l return } - _, _ = branchManager.setBranchPreferred(branchManager.Branch(conflictMember.BranchID()), false) + _, err = branchManager.setBranchPreferred(branchManager.Branch(conflictMember.BranchID()), false) + if err != nil { + return + } }) } @@ -553,6 +675,23 @@ func (branchManager *BranchManager) IsBranchLiked(id BranchID) (liked bool) { return } +// IsBranchConfirmed returns true if the Branch is marked as confirmed. +func (branchManager *BranchManager) IsBranchConfirmed(id BranchID) (confirmed bool) { + if id == UndefinedBranchID { + return + } + + if id == MasterBranchID { + return true + } + + branchManager.Branch(id).Consume(func(branch *Branch) { + confirmed = branch.Confirmed() + }) + + return +} + func (branchManager *BranchManager) propagateLike(cachedBranch *CachedBranch) (err error) { // unpack CachedBranch and abort of the branch doesn't exist or isn't preferred defer cachedBranch.Release() diff --git a/dapps/valuetransfers/packages/branchmanager/events.go b/dapps/valuetransfers/packages/branchmanager/events.go index 87aa61a7..f9615e20 100644 --- a/dapps/valuetransfers/packages/branchmanager/events.go +++ b/dapps/valuetransfers/packages/branchmanager/events.go @@ -17,6 +17,16 @@ type Events struct { // BranchLiked gets triggered whenever a Branch becomes preferred that was not preferred before. BranchDisliked *events.Event + + // BranchFinalized gets triggered when a decision on a Branch is finalized and there will be no further state + // changes regarding its preferred state. + BranchFinalized *events.Event + + // BranchConfirmed gets triggered whenever a Branch becomes confirmed that was not confirmed before. + BranchConfirmed *events.Event + + // BranchRejected gets triggered whenever a Branch becomes rejected that was not rejected before. + BranchRejected *events.Event } func branchCaller(handler interface{}, params ...interface{}) { diff --git a/dapps/valuetransfers/packages/consensus/fcob.go b/dapps/valuetransfers/packages/consensus/fcob.go index 7c9d1446..00722a7d 100644 --- a/dapps/valuetransfers/packages/consensus/fcob.go +++ b/dapps/valuetransfers/packages/consensus/fcob.go @@ -126,7 +126,9 @@ func (fcob *FCOB) setFinalized(cachedTransactionMetadata *tangle.CachedTransacti return } - transactionMetadata.SetFinalized(true) + if _, err := fcob.tangle.SetTransactionFinalized(transactionMetadata.ID()); err != nil { + fcob.Events.Error.Trigger(err) + } }) } diff --git a/dapps/valuetransfers/packages/tangle/events.go b/dapps/valuetransfers/packages/tangle/events.go index eaa5f33d..444d80ac 100644 --- a/dapps/valuetransfers/packages/tangle/events.go +++ b/dapps/valuetransfers/packages/tangle/events.go @@ -14,6 +14,8 @@ type Events struct { PayloadAttached *events.Event PayloadSolid *events.Event PayloadLiked *events.Event + PayloadConfirmed *events.Event + PayloadRejected *events.Event PayloadDisliked *events.Event MissingPayloadReceived *events.Event PayloadMissing *events.Event @@ -25,6 +27,12 @@ type Events struct { // TransactionBooked gets triggered whenever a transactions becomes solid and gets booked into a particular branch. TransactionBooked *events.Event + TransactionPreferred *events.Event + + TransactionUnpreferred *events.Event + + TransactionFinalized *events.Event + // Fork gets triggered when a previously un-conflicting transaction get's some of its inputs double spend, so that a // new Branch is created. Fork *events.Event @@ -32,17 +40,38 @@ type Events struct { Error *events.Event } +// EventSource is a type that contains information from where a specific change was triggered (the branch manager or +// the tangle). +type EventSource int + +const ( + // EventSourceTangle indicates that a change was issued by the Tangle. + EventSourceTangle EventSource = iota + + // EventSourceBranchManager indicates that a change was issued by the BranchManager. + EventSourceBranchManager +) + +func (eventSource EventSource) String() string { + return [...]string{"EventSourceTangle", "EventSourceBranchManager"}[eventSource] +} + func newEvents() *Events { return &Events{ PayloadAttached: events.NewEvent(cachedPayloadEvent), PayloadSolid: events.NewEvent(cachedPayloadEvent), PayloadLiked: events.NewEvent(cachedPayloadEvent), + PayloadConfirmed: events.NewEvent(cachedPayloadEvent), + PayloadRejected: events.NewEvent(cachedPayloadEvent), PayloadDisliked: events.NewEvent(cachedPayloadEvent), MissingPayloadReceived: events.NewEvent(cachedPayloadEvent), PayloadMissing: events.NewEvent(payloadIDEvent), PayloadUnsolidifiable: events.NewEvent(payloadIDEvent), - TransactionReceived: events.NewEvent(cachedTransactionEvent), + TransactionReceived: events.NewEvent(cachedTransactionAttachmentEvent), TransactionBooked: events.NewEvent(transactionBookedEvent), + TransactionPreferred: events.NewEvent(cachedTransactionEvent), + TransactionUnpreferred: events.NewEvent(cachedTransactionEvent), + TransactionFinalized: events.NewEvent(cachedTransactionEvent), Fork: events.NewEvent(forkEvent), Error: events.NewEvent(events.ErrorCaller), } @@ -77,6 +106,13 @@ func forkEvent(handler interface{}, params ...interface{}) { } func cachedTransactionEvent(handler interface{}, params ...interface{}) { + handler.(func(*transaction.CachedTransaction, *CachedTransactionMetadata))( + params[0].(*transaction.CachedTransaction).Retain(), + params[1].(*CachedTransactionMetadata).Retain(), + ) +} + +func cachedTransactionAttachmentEvent(handler interface{}, params ...interface{}) { handler.(func(*transaction.CachedTransaction, *CachedTransactionMetadata, *CachedAttachment))( params[0].(*transaction.CachedTransaction).Retain(), params[1].(*CachedTransactionMetadata).Retain(), diff --git a/dapps/valuetransfers/packages/tangle/output.go b/dapps/valuetransfers/packages/tangle/output.go index 54fe2188..b4de9e25 100644 --- a/dapps/valuetransfers/packages/tangle/output.go +++ b/dapps/valuetransfers/packages/tangle/output.go @@ -166,8 +166,8 @@ func (output *Output) Solid() bool { return output.solid } -// SetSolid is the setter of the solid flag. It returns true if the solid flag was modified. -func (output *Output) SetSolid(solid bool) (modified bool) { +// setSolid is the setter of the solid flag. It returns true if the solid flag was modified. +func (output *Output) setSolid(solid bool) (modified bool) { output.solidMutex.RLock() if output.solid != solid { output.solidMutex.RUnlock() diff --git a/dapps/valuetransfers/packages/tangle/payloadmetadata.go b/dapps/valuetransfers/packages/tangle/payloadmetadata.go index 39840272..d60a9494 100644 --- a/dapps/valuetransfers/packages/tangle/payloadmetadata.go +++ b/dapps/valuetransfers/packages/tangle/payloadmetadata.go @@ -21,11 +21,15 @@ type PayloadMetadata struct { solid bool solidificationTime time.Time liked bool + confirmed bool + rejected bool branchID branchmanager.BranchID solidMutex sync.RWMutex solidificationTimeMutex sync.RWMutex likedMutex sync.RWMutex + confirmedMutex sync.RWMutex + rejectedMutex sync.RWMutex branchIDMutex sync.RWMutex } @@ -104,9 +108,9 @@ func (payloadMetadata *PayloadMetadata) IsSolid() (result bool) { return } -// SetSolid marks a payload as either solid or not solid. +// setSolid marks a payload as either solid or not solid. // It returns true if the solid flag was changes and automatically updates the solidificationTime as well. -func (payloadMetadata *PayloadMetadata) SetSolid(solid bool) (modified bool) { +func (payloadMetadata *PayloadMetadata) setSolid(solid bool) (modified bool) { payloadMetadata.solidMutex.RLock() if payloadMetadata.solid != solid { payloadMetadata.solidMutex.RUnlock() @@ -149,8 +153,8 @@ func (payloadMetadata *PayloadMetadata) Liked() bool { return payloadMetadata.liked } -// SetLiked modifies the liked flag of the given Payload. It returns true if the value has been updated. -func (payloadMetadata *PayloadMetadata) SetLiked(liked bool) (modified bool) { +// setLiked modifies the liked flag of the given Payload. It returns true if the value has been updated. +func (payloadMetadata *PayloadMetadata) setLiked(liked bool) (modified bool) { payloadMetadata.likedMutex.RLock() if payloadMetadata.liked == liked { payloadMetadata.likedMutex.RUnlock() @@ -173,6 +177,70 @@ func (payloadMetadata *PayloadMetadata) SetLiked(liked bool) (modified bool) { return } +// Confirmed returns true if the Payload was marked as confirmed. +func (payloadMetadata *PayloadMetadata) Confirmed() bool { + payloadMetadata.confirmedMutex.RLock() + defer payloadMetadata.confirmedMutex.RUnlock() + + return payloadMetadata.confirmed +} + +// setConfirmed modifies the confirmed flag of the given Payload. It returns true if the value has been updated. +func (payloadMetadata *PayloadMetadata) setConfirmed(confirmed bool) (modified bool) { + payloadMetadata.confirmedMutex.RLock() + if payloadMetadata.confirmed == confirmed { + payloadMetadata.confirmedMutex.RUnlock() + + return + } + + payloadMetadata.confirmedMutex.RUnlock() + payloadMetadata.confirmedMutex.Lock() + defer payloadMetadata.confirmedMutex.Unlock() + + if payloadMetadata.confirmed == confirmed { + return + } + + payloadMetadata.confirmed = confirmed + payloadMetadata.SetModified() + modified = true + + return +} + +// Rejected returns true if the Payload was marked as confirmed. +func (payloadMetadata *PayloadMetadata) Rejected() bool { + payloadMetadata.rejectedMutex.RLock() + defer payloadMetadata.rejectedMutex.RUnlock() + + return payloadMetadata.rejected +} + +// setRejected modifies the rejected flag of the given Payload. It returns true if the value has been updated. +func (payloadMetadata *PayloadMetadata) setRejected(rejected bool) (modified bool) { + payloadMetadata.rejectedMutex.RLock() + if payloadMetadata.rejected == rejected { + payloadMetadata.rejectedMutex.RUnlock() + + return + } + + payloadMetadata.rejectedMutex.RUnlock() + payloadMetadata.rejectedMutex.Lock() + defer payloadMetadata.rejectedMutex.Unlock() + + if payloadMetadata.rejected == rejected { + return + } + + payloadMetadata.rejected = rejected + payloadMetadata.SetModified() + modified = true + + return +} + // BranchID returns the identifier of the Branch that this Payload was booked into. func (payloadMetadata *PayloadMetadata) BranchID() branchmanager.BranchID { payloadMetadata.branchIDMutex.RLock() @@ -236,10 +304,12 @@ func (payloadMetadata *PayloadMetadata) Update(other objectstorage.StorableObjec // ObjectStorageValue is required to match the encoding.BinaryMarshaler interface. func (payloadMetadata *PayloadMetadata) ObjectStorageValue() []byte { - return marshalutil.New(marshalutil.TIME_SIZE + 2*marshalutil.BOOL_SIZE). + return marshalutil.New(marshalutil.TIME_SIZE + 4*marshalutil.BOOL_SIZE). WriteTime(payloadMetadata.solidificationTime). WriteBool(payloadMetadata.solid). WriteBool(payloadMetadata.liked). + WriteBool(payloadMetadata.confirmed). + WriteBool(payloadMetadata.rejected). WriteBytes(payloadMetadata.branchID.Bytes()). Bytes() } @@ -256,6 +326,12 @@ func (payloadMetadata *PayloadMetadata) UnmarshalObjectStorageValue(data []byte) if payloadMetadata.liked, err = marshalUtil.ReadBool(); err != nil { return } + if payloadMetadata.confirmed, err = marshalUtil.ReadBool(); err != nil { + return + } + if payloadMetadata.rejected, err = marshalUtil.ReadBool(); err != nil { + return + } if payloadMetadata.branchID, err = branchmanager.ParseBranchID(marshalUtil); err != nil { return } diff --git a/dapps/valuetransfers/packages/tangle/payloadmetadata_test.go b/dapps/valuetransfers/packages/tangle/payloadmetadata_test.go index 0b028341..90bf32bd 100644 --- a/dapps/valuetransfers/packages/tangle/payloadmetadata_test.go +++ b/dapps/valuetransfers/packages/tangle/payloadmetadata_test.go @@ -21,7 +21,7 @@ func TestMarshalUnmarshal(t *testing.T) { assert.Equal(t, originalMetadata.IsSolid(), clonedMetadata.IsSolid()) assert.Equal(t, originalMetadata.SoldificationTime().Round(time.Second), clonedMetadata.SoldificationTime().Round(time.Second)) - originalMetadata.SetSolid(true) + originalMetadata.setSolid(true) clonedMetadata, _, err = PayloadMetadataFromBytes(originalMetadata.Bytes()) if err != nil { @@ -39,7 +39,7 @@ func TestPayloadMetadata_SetSolid(t *testing.T) { assert.Equal(t, false, originalMetadata.IsSolid()) assert.Equal(t, time.Time{}, originalMetadata.SoldificationTime()) - originalMetadata.SetSolid(true) + originalMetadata.setSolid(true) assert.Equal(t, true, originalMetadata.IsSolid()) assert.Equal(t, time.Now().Round(time.Second), originalMetadata.SoldificationTime().Round(time.Second)) diff --git a/dapps/valuetransfers/packages/tangle/tangle.go b/dapps/valuetransfers/packages/tangle/tangle.go index eaa3bba7..c8a63f48 100644 --- a/dapps/valuetransfers/packages/tangle/tangle.go +++ b/dapps/valuetransfers/packages/tangle/tangle.go @@ -43,10 +43,10 @@ type Tangle struct { } // New is the constructor of a Tangle and creates a new Tangle object from the given details. -func New(store kvstore.KVStore) (result *Tangle) { +func New(store kvstore.KVStore) (tangle *Tangle) { osFactory := objectstorage.NewFactory(store, storageprefix.ValueTransfers) - result = &Tangle{ + tangle = &Tangle{ branchManager: branchmanager.New(store), payloadStorage: osFactory.New(osPayload, osPayloadFactory, objectstorage.CacheTime(time.Second)), @@ -61,35 +61,102 @@ func New(store kvstore.KVStore) (result *Tangle) { Events: *newEvents(), } - - result.branchManager.Events.BranchPreferred.Attach(events.NewClosure(func(cachedBranch *branchmanager.CachedBranch) { - result.propagateBranchPreferredChangesToTransaction(cachedBranch, true) - })) - result.branchManager.Events.BranchUnpreferred.Attach(events.NewClosure(func(cachedBranch *branchmanager.CachedBranch) { - result.propagateBranchPreferredChangesToTransaction(cachedBranch, false) - })) + tangle.setupDAGSynchronization() return } -// propagateBranchPreferredChangesToTransaction updates the preferred flag of a transaction, whenever the preferred -// status of its corresponding branch changes. -func (tangle *Tangle) propagateBranchPreferredChangesToTransaction(cachedBranch *branchmanager.CachedBranch, preferred bool) { - cachedBranch.Consume(func(branch *branchmanager.Branch) { - if !branch.IsAggregated() { - transactionID, _, err := transaction.IDFromBytes(branch.ID().Bytes()) - if err != nil { - panic(err) // this should never ever happen - } +// region MAIN PUBLIC API ////////////////////////////////////////////////////////////////////////////////////////////// - _, err = tangle.SetTransactionPreferred(transactionID, preferred) - if err != nil { - tangle.Events.Error.Trigger(err) +// AttachPayload adds a new payload to the value tangle. +func (tangle *Tangle) AttachPayload(payload *payload.Payload) { + tangle.workerPool.Submit(func() { tangle.AttachPayloadSync(payload) }) +} - return - } +// AttachPayloadSync is the worker function that stores the payload and calls the corresponding storage events. +func (tangle *Tangle) AttachPayloadSync(payloadToStore *payload.Payload) { + // store the payload models or abort if we have seen the payload already + cachedPayload, cachedPayloadMetadata, payloadStored := tangle.storePayload(payloadToStore) + if !payloadStored { + return + } + defer cachedPayload.Release() + defer cachedPayloadMetadata.Release() + + // store transaction models or abort if we have seen this attachment already (nil == was not stored) + cachedTransaction, cachedTransactionMetadata, cachedAttachment, transactionIsNew := tangle.storeTransactionModels(payloadToStore) + defer cachedTransaction.Release() + defer cachedTransactionMetadata.Release() + if cachedAttachment == nil { + return + } + defer cachedAttachment.Release() + + // store the references between the different entities (we do this after the actual entities were stored, so that + // all the metadata models exist in the database as soon as the entities are reachable by walks). + tangle.storePayloadReferences(payloadToStore) + + // trigger events + if tangle.missingPayloadStorage.DeleteIfPresent(payloadToStore.ID().Bytes()) { + tangle.Events.MissingPayloadReceived.Trigger(cachedPayload, cachedPayloadMetadata) + } + tangle.Events.PayloadAttached.Trigger(cachedPayload, cachedPayloadMetadata) + if transactionIsNew { + tangle.Events.TransactionReceived.Trigger(cachedTransaction, cachedTransactionMetadata, cachedAttachment) + } + + // check solidity + tangle.solidifyPayload(cachedPayload.Retain(), cachedPayloadMetadata.Retain(), cachedTransaction.Retain(), cachedTransactionMetadata.Retain()) +} + +// SetTransactionPreferred modifies the preferred flag of a transaction. It updates the transactions metadata, +// propagates the changes to the branch DAG and triggers an update of the liked flags in the value tangle. +func (tangle *Tangle) SetTransactionPreferred(transactionID transaction.ID, preferred bool) (modified bool, err error) { + return tangle.setTransactionPreferred(transactionID, preferred, EventSourceTangle) +} + +// SetTransactionFinalized modifies the finalized flag of a transaction. It updates the transactions metadata and +// propagates the changes to the BranchManager if the flag was updated. +func (tangle *Tangle) SetTransactionFinalized(transactionID transaction.ID) (modified bool, err error) { + return tangle.setTransactionFinalized(transactionID, EventSourceTangle) +} + +// ValuePayloadsLiked is checking if the Payloads referenced by the passed in IDs are all liked. +func (tangle *Tangle) ValuePayloadsLiked(payloadIDs ...payload.ID) (liked bool) { + for _, payloadID := range payloadIDs { + if payloadID == payload.GenesisID { + continue } - }) + + payloadMetadataFound := tangle.PayloadMetadata(payloadID).Consume(func(payloadMetadata *PayloadMetadata) { + liked = payloadMetadata.Liked() + }) + + if !payloadMetadataFound || !liked { + return false + } + } + + return true +} + +// ValuePayloadsConfirmed is checking if the Payloads referenced by the passed in IDs are all confirmed. +func (tangle *Tangle) ValuePayloadsConfirmed(payloadIDs ...payload.ID) (confirmed bool) { + for _, payloadID := range payloadIDs { + if payloadID == payload.GenesisID { + continue + } + + payloadMetadataFound := tangle.PayloadMetadata(payloadID).Consume(func(payloadMetadata *PayloadMetadata) { + confirmed = payloadMetadata.Liked() + }) + + if !payloadMetadataFound || !confirmed { + return false + } + } + + return true } // BranchManager is the getter for the manager that takes care of creating and updating branches. @@ -97,6 +164,132 @@ func (tangle *Tangle) BranchManager() *branchmanager.BranchManager { return tangle.branchManager } +// LoadSnapshot creates a set of outputs in the value tangle, that are forming the genesis for future transactions. +func (tangle *Tangle) LoadSnapshot(snapshot map[transaction.ID]map[address.Address][]*balance.Balance) { + for transactionID, addressBalances := range snapshot { + for outputAddress, balances := range addressBalances { + input := NewOutput(outputAddress, transactionID, branchmanager.MasterBranchID, balances) + input.setSolid(true) + input.SetBranchID(branchmanager.MasterBranchID) + + // store output and abort if the snapshot has already been loaded earlier (output exists in the database) + cachedOutput, stored := tangle.outputStorage.StoreIfAbsent(input) + if !stored { + return + } + + cachedOutput.Release() + } + } +} + +// Fork creates a new branch from an existing transaction. +func (tangle *Tangle) Fork(transactionID transaction.ID, conflictingInputs []transaction.OutputID) (forked bool, finalized bool, err error) { + cachedTransaction := tangle.Transaction(transactionID) + cachedTransactionMetadata := tangle.TransactionMetadata(transactionID) + defer cachedTransaction.Release() + defer cachedTransactionMetadata.Release() + + tx := cachedTransaction.Unwrap() + if tx == nil { + err = fmt.Errorf("failed to load transaction '%s'", transactionID) + + return + } + txMetadata := cachedTransactionMetadata.Unwrap() + if txMetadata == nil { + err = fmt.Errorf("failed to load metadata of transaction '%s'", transactionID) + + return + } + + // abort if this transaction was finalized already + if txMetadata.Finalized() { + finalized = true + + return + } + + // update / create new branch + newBranchID := branchmanager.NewBranchID(tx.ID()) + cachedTargetBranch, newBranchCreated := tangle.branchManager.Fork(newBranchID, []branchmanager.BranchID{txMetadata.BranchID()}, conflictingInputs) + defer cachedTargetBranch.Release() + + // set branch to be preferred if the underlying transaction was marked as preferred + if txMetadata.Preferred() { + if _, err = tangle.branchManager.SetBranchPreferred(newBranchID, true); err != nil { + return + } + } + + // abort if the branch existed already + if !newBranchCreated { + return + } + + // move transactions to new branch + if err = tangle.moveTransactionToBranch(cachedTransaction.Retain(), cachedTransactionMetadata.Retain(), cachedTargetBranch.Retain()); err != nil { + return + } + + // trigger events + set result + tangle.Events.Fork.Trigger(cachedTransaction, cachedTransactionMetadata) + forked = true + + return +} + +// Prune resets the database and deletes all objects (for testing or "node resets"). +func (tangle *Tangle) Prune() (err error) { + if err = tangle.branchManager.Prune(); err != nil { + return + } + + for _, storage := range []*objectstorage.ObjectStorage{ + tangle.payloadStorage, + tangle.payloadMetadataStorage, + tangle.missingPayloadStorage, + tangle.approverStorage, + tangle.transactionStorage, + tangle.transactionMetadataStorage, + tangle.attachmentStorage, + tangle.outputStorage, + tangle.consumerStorage, + } { + if err = storage.Prune(); err != nil { + return + } + } + + return +} + +// Shutdown stops the worker pools and shuts down the object storage instances. +func (tangle *Tangle) Shutdown() *Tangle { + tangle.workerPool.ShutdownGracefully() + tangle.cleanupWorkerPool.ShutdownGracefully() + + for _, storage := range []*objectstorage.ObjectStorage{ + tangle.payloadStorage, + tangle.payloadMetadataStorage, + tangle.missingPayloadStorage, + tangle.approverStorage, + tangle.transactionStorage, + tangle.transactionMetadataStorage, + tangle.attachmentStorage, + tangle.outputStorage, + tangle.consumerStorage, + } { + storage.Shutdown() + } + + return tangle +} + +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// region GETTERS/ITERATORS FOR THE STORED MODELS ////////////////////////////////////////////////////////////////////// + // Transaction loads the given transaction from the objectstorage. func (tangle *Tangle) Transaction(transactionID transaction.ID) *transaction.CachedTransaction { return &transaction.CachedTransaction{CachedObject: tangle.transactionStorage.Load(transactionID.Bytes())} @@ -112,6 +305,23 @@ func (tangle *Tangle) TransactionOutput(outputID transaction.OutputID) *CachedOu return &CachedOutput{CachedObject: tangle.outputStorage.Load(outputID.Bytes())} } +// OutputsOnAddress retrieves all the Outputs that are associated with an address. +func (tangle *Tangle) OutputsOnAddress(address address.Address) (result CachedOutputs) { + result = make(CachedOutputs) + tangle.outputStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { + outputID, _, err := transaction.OutputIDFromBytes(key) + if err != nil { + panic(err) + } + + result[outputID] = &CachedOutput{CachedObject: cachedObject} + + return true + }, address.Bytes()) + + return +} + // Consumers retrieves the approvers of a payload from the object storage. func (tangle *Tangle) Consumers(outputID transaction.OutputID) CachedConsumers { consumers := make(CachedConsumers, 0) @@ -136,32 +346,233 @@ func (tangle *Tangle) Attachments(transactionID transaction.ID) CachedAttachment return attachments } -// AttachPayload adds a new payload to the value tangle. -func (tangle *Tangle) AttachPayload(payload *payload.Payload) { - tangle.workerPool.Submit(func() { tangle.AttachPayloadSync(payload) }) +// Payload retrieves a payload from the object storage. +func (tangle *Tangle) Payload(payloadID payload.ID) *payload.CachedPayload { + return &payload.CachedPayload{CachedObject: tangle.payloadStorage.Load(payloadID.Bytes())} } -// SetTransactionFinalized modifies the finalized flag of a transaction. It updates the transactions metadata and -// propagates the changes to the BranchManager if the flag was updated. -func (tangle *Tangle) SetTransactionFinalized(transactionID transaction.ID) (modified bool, err error) { - tangle.TransactionMetadata(transactionID).Consume(func(metadata *TransactionMetadata) { +// PayloadMetadata retrieves the metadata of a value payload from the object storage. +func (tangle *Tangle) PayloadMetadata(payloadID payload.ID) *CachedPayloadMetadata { + return &CachedPayloadMetadata{CachedObject: tangle.payloadMetadataStorage.Load(payloadID.Bytes())} +} + +// Approvers retrieves the approvers of a payload from the object storage. +func (tangle *Tangle) Approvers(payloadID payload.ID) CachedApprovers { + approvers := make(CachedApprovers, 0) + tangle.approverStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { + approvers = append(approvers, &CachedPayloadApprover{CachedObject: cachedObject}) + + return true + }, payloadID.Bytes()) + + return approvers +} + +// ForeachApprovers iterates through the approvers of a payload and calls the passed in consumer function. +func (tangle *Tangle) ForeachApprovers(payloadID payload.ID, consume func(payload *payload.CachedPayload, payloadMetadata *CachedPayloadMetadata, transaction *transaction.CachedTransaction, transactionMetadata *CachedTransactionMetadata)) { + tangle.Approvers(payloadID).Consume(func(approver *PayloadApprover) { + approvingCachedPayload := tangle.Payload(approver.ApprovingPayloadID()) + + approvingCachedPayload.Consume(func(payload *payload.Payload) { + consume(approvingCachedPayload.Retain(), tangle.PayloadMetadata(approver.ApprovingPayloadID()), tangle.Transaction(payload.Transaction().ID()), tangle.TransactionMetadata(payload.Transaction().ID())) + }) + }) +} + +// ForEachConsumers iterates through the transactions that are consuming outputs of the given transactions +func (tangle *Tangle) ForEachConsumers(currentTransaction *transaction.Transaction, consume func(payload *payload.CachedPayload, payloadMetadata *CachedPayloadMetadata, transaction *transaction.CachedTransaction, transactionMetadata *CachedTransactionMetadata)) { + seenTransactions := make(map[transaction.ID]types.Empty) + currentTransaction.Outputs().ForEach(func(address address.Address, balances []*balance.Balance) bool { + tangle.Consumers(transaction.NewOutputID(address, currentTransaction.ID())).Consume(func(consumer *Consumer) { + if _, transactionSeen := seenTransactions[consumer.TransactionID()]; !transactionSeen { + seenTransactions[consumer.TransactionID()] = types.Void + + cachedTransaction := tangle.Transaction(consumer.TransactionID()) + defer cachedTransaction.Release() + + cachedTransactionMetadata := tangle.TransactionMetadata(consumer.TransactionID()) + defer cachedTransactionMetadata.Release() + + tangle.Attachments(consumer.TransactionID()).Consume(func(attachment *Attachment) { + consume(tangle.Payload(attachment.PayloadID()), tangle.PayloadMetadata(attachment.PayloadID()), cachedTransaction.Retain(), cachedTransactionMetadata.Retain()) + }) + } + }) + + return true + }) +} + +// ForEachConsumersAndApprovers calls the passed in consumer for all payloads that either approve the given payload or +// that attach a transaction that spends outputs from the transaction inside the given payload. +func (tangle *Tangle) ForEachConsumersAndApprovers(currentPayload *payload.Payload, consume func(payload *payload.CachedPayload, payloadMetadata *CachedPayloadMetadata, transaction *transaction.CachedTransaction, transactionMetadata *CachedTransactionMetadata)) { + tangle.ForEachConsumers(currentPayload.Transaction(), consume) + tangle.ForeachApprovers(currentPayload.ID(), consume) +} + +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// region DAG SYNCHRONIZATION ////////////////////////////////////////////////////////////////////////////////////////// + +// setupDAGSynchronization sets up the behavior how the branch dag and the value tangle and UTXO dag are connected. +func (tangle *Tangle) setupDAGSynchronization() { + tangle.branchManager.Events.BranchPreferred.Attach(events.NewClosure(tangle.onBranchPreferred)) + tangle.branchManager.Events.BranchUnpreferred.Attach(events.NewClosure(tangle.onBranchUnpreferred)) + tangle.branchManager.Events.BranchLiked.Attach(events.NewClosure(tangle.onBranchLiked)) + tangle.branchManager.Events.BranchDisliked.Attach(events.NewClosure(tangle.onBranchDisliked)) + tangle.branchManager.Events.BranchFinalized.Attach(events.NewClosure(tangle.onBranchFinalized)) + tangle.branchManager.Events.BranchConfirmed.Attach(events.NewClosure(tangle.onBranchConfirmed)) + tangle.branchManager.Events.BranchRejected.Attach(events.NewClosure(tangle.onBranchRejected)) +} + +// onBranchPreferred gets triggered when a branch in the branch DAG is marked as preferred. +func (tangle *Tangle) onBranchPreferred(cachedBranch *branchmanager.CachedBranch) { + tangle.propagateBranchPreferredChangesToTangle(cachedBranch, true) +} + +// onBranchUnpreferred gets triggered when a branch in the branch DAG is marked as NOT preferred. +func (tangle *Tangle) onBranchUnpreferred(cachedBranch *branchmanager.CachedBranch) { + tangle.propagateBranchPreferredChangesToTangle(cachedBranch, false) +} + +// onBranchLiked gets triggered when a branch in the branch DAG is marked as liked. +func (tangle *Tangle) onBranchLiked(cachedBranch *branchmanager.CachedBranch) { + tangle.propagateBranchedLikedChangesToTangle(cachedBranch, true) +} + +// onBranchDisliked gets triggered when a branch in the branch DAG is marked as disliked. +func (tangle *Tangle) onBranchDisliked(cachedBranch *branchmanager.CachedBranch) { + tangle.propagateBranchedLikedChangesToTangle(cachedBranch, false) +} + +// onBranchFinalized gets triggered when a branch in the branch DAG is marked as finalized. +func (tangle *Tangle) onBranchFinalized(cachedBranch *branchmanager.CachedBranch) { + tangle.propagateBranchFinalizedChangesToTangle(cachedBranch) +} + +// onBranchConfirmed gets triggered when a branch in the branch DAG is marked as confirmed. +func (tangle *Tangle) onBranchConfirmed(cachedBranch *branchmanager.CachedBranch) { + tangle.propagateBranchConfirmedRejectedChangesToTangle(cachedBranch) +} + +// onBranchRejected gets triggered when a branch in the branch DAG is marked as rejected. +func (tangle *Tangle) onBranchRejected(cachedBranch *branchmanager.CachedBranch) { + tangle.propagateBranchConfirmedRejectedChangesToTangle(cachedBranch) +} + +// propagateBranchPreferredChangesToTangle triggers the propagation of preferred status changes of a branch to the value +// tangle and its UTXO DAG. +func (tangle *Tangle) propagateBranchPreferredChangesToTangle(cachedBranch *branchmanager.CachedBranch, preferred bool) { + cachedBranch.Consume(func(branch *branchmanager.Branch) { + if !branch.IsAggregated() { + transactionID, _, err := transaction.IDFromBytes(branch.ID().Bytes()) + if err != nil { + panic(err) // this should never ever happen + } + + _, err = tangle.setTransactionPreferred(transactionID, preferred, EventSourceBranchManager) + if err != nil { + tangle.Events.Error.Trigger(err) + + return + } + } + }) +} + +// propagateBranchFinalizedChangesToTangle triggers the propagation of finalized status changes of a branch to the value +// tangle and its UTXO DAG. +func (tangle *Tangle) propagateBranchFinalizedChangesToTangle(cachedBranch *branchmanager.CachedBranch) { + cachedBranch.Consume(func(branch *branchmanager.Branch) { + if !branch.IsAggregated() { + transactionID, _, err := transaction.IDFromBytes(branch.ID().Bytes()) + if err != nil { + panic(err) // this should never ever happen + } + + _, err = tangle.setTransactionFinalized(transactionID, EventSourceBranchManager) + if err != nil { + tangle.Events.Error.Trigger(err) + + return + } + } + }) +} + +// propagateBranchedLikedChangesToTangle triggers the propagation of liked status changes of a branch to the value +// tangle and its UTXO DAG. +func (tangle *Tangle) propagateBranchedLikedChangesToTangle(cachedBranch *branchmanager.CachedBranch, liked bool) { + cachedBranch.Consume(func(branch *branchmanager.Branch) { + if !branch.IsAggregated() { + transactionID, _, err := transaction.IDFromBytes(branch.ID().Bytes()) + if err != nil { + panic(err) // this should never ever happen + } + + // propagate changes to future cone of transaction (value tangle) + tangle.propagateValuePayloadLikeUpdates(transactionID, liked) + } + }) +} + +// propagateBranchConfirmedRejectedChangesToTangle triggers the propagation of confirmed and rejected status changes of +// a branch to the value tangle and its UTXO DAG. +func (tangle *Tangle) propagateBranchConfirmedRejectedChangesToTangle(cachedBranch *branchmanager.CachedBranch) { + cachedBranch.Consume(func(branch *branchmanager.Branch) { + if !branch.IsAggregated() { + transactionID, _, err := transaction.IDFromBytes(branch.ID().Bytes()) + if err != nil { + panic(err) // this should never ever happen + } + + // propagate changes to future cone of transaction (value tangle) + tangle.propagateValuePayloadConfirmedRejectedUpdates(transactionID) + } + }) +} + +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// region PRIVATE UTILITY METHODS ////////////////////////////////////////////////////////////////////////////////////// + +func (tangle *Tangle) setTransactionFinalized(transactionID transaction.ID, eventSource EventSource) (modified bool, err error) { + // retrieve metadata and consume + cachedTransactionMetadata := tangle.TransactionMetadata(transactionID) + cachedTransactionMetadata.Consume(func(metadata *TransactionMetadata) { // update the finalized flag of the transaction modified = metadata.SetFinalized(true) // only propagate the changes if the flag was modified if modified { - // propagate changes to the branches (UTXO DAG) - if metadata.Conflicting() { - _, err = tangle.branchManager.SetBranchFinalized(metadata.BranchID()) - if err != nil { - tangle.Events.Error.Trigger(err) + // retrieve transaction from the database (for the events) + cachedTransaction := tangle.Transaction(transactionID) + defer cachedTransaction.Release() + if !cachedTransaction.Exists() { + return + } - return + // trigger the corresponding event + tangle.Events.TransactionFinalized.Trigger(cachedTransaction, cachedTransactionMetadata) + + // propagate changes to value tangle and branch DAG if we were called from the tangle + // Note: if the update was triggered by a change in the branch DAG then we do not propagate the confirmed + // and rejected changes yet as those require the branch to be liked before (we instead do it in the + // BranchLiked event) + if eventSource == EventSourceTangle { + // propagate changes to the branches (UTXO DAG) + if metadata.Conflicting() { + _, err = tangle.branchManager.SetBranchFinalized(metadata.BranchID()) + if err != nil { + tangle.Events.Error.Trigger(err) + + return + } } - } - // propagate changes to future cone of transaction (value tangle) - tangle.propagateValuePayloadConfirmedUpdates(transactionID) + // propagate changes to future cone of transaction (value tangle) + tangle.propagateValuePayloadConfirmedRejectedUpdates(transactionID) + } } }) @@ -169,31 +580,104 @@ func (tangle *Tangle) SetTransactionFinalized(transactionID transaction.ID) (mod } // TODO: WRITE COMMENT -func (tangle *Tangle) propagateValuePayloadConfirmedUpdates(transactionID transaction.ID) { - panic("not yet implemented") +func (tangle *Tangle) propagateValuePayloadConfirmedRejectedUpdates(transactionID transaction.ID) { + // initiate stack with the attachments of the passed in transaction + propagationStack := list.New() + tangle.Attachments(transactionID).Consume(func(attachment *Attachment) { + propagationStack.PushBack(&valuePayloadPropagationStackEntry{ + CachedPayload: tangle.Payload(attachment.PayloadID()), + CachedPayloadMetadata: tangle.PayloadMetadata(attachment.PayloadID()), + CachedTransaction: tangle.Transaction(transactionID), + CachedTransactionMetadata: tangle.TransactionMetadata(transactionID), + }) + }) + + // keep track of the seen payloads so we do not process them twice + seenPayloads := make(map[payload.ID]types.Empty) + + // iterate through stack (future cone of transactions) + for propagationStack.Len() >= 1 { + currentAttachmentEntry := propagationStack.Front() + tangle.propagateValuePayloadConfirmedRejectedUpdateStackEntry(propagationStack, seenPayloads, currentAttachmentEntry.Value.(*valuePayloadPropagationStackEntry)) + propagationStack.Remove(currentAttachmentEntry) + } } -// SetTransactionPreferred modifies the preferred flag of a transaction. It updates the transactions metadata and -// propagates the changes to the BranchManager if the flag was updated. -func (tangle *Tangle) SetTransactionPreferred(transactionID transaction.ID, preferred bool) (modified bool, err error) { - tangle.TransactionMetadata(transactionID).Consume(func(metadata *TransactionMetadata) { +func (tangle *Tangle) propagateValuePayloadConfirmedRejectedUpdateStackEntry(propagationStack *list.List, processedPayloads map[payload.ID]types.Empty, propagationStackEntry *valuePayloadPropagationStackEntry) { + // release the entry when we are done + defer propagationStackEntry.Release() + + // unpack loaded objects and abort if the entities could not be loaded from the database + currentPayload, currentPayloadMetadata, currentTransaction, currentTransactionMetadata := propagationStackEntry.Unwrap() + if currentPayload == nil || currentPayloadMetadata == nil || currentTransaction == nil || currentTransactionMetadata == nil { + return + } + + // perform different logic depending on the type of the change (liked vs dislike) + switch currentTransactionMetadata.Preferred() { + case true: + // abort if the transaction is not preferred, the branch of the payload is not liked, the referenced value payloads are not liked or the payload was marked as liked before + if !currentTransactionMetadata.Finalized() || !tangle.BranchManager().IsBranchConfirmed(currentPayloadMetadata.BranchID()) || !tangle.ValuePayloadsConfirmed(currentPayload.TrunkID(), currentPayload.BranchID()) || !currentPayloadMetadata.setConfirmed(true) { + return + } + + tangle.Events.PayloadConfirmed.Trigger(propagationStackEntry.CachedPayload, propagationStackEntry.CachedPayloadMetadata) + case false: + // abort if the payload has been marked as disliked before + if !currentTransactionMetadata.Finalized() || !currentPayloadMetadata.setRejected(true) { + return + } + + tangle.Events.PayloadRejected.Trigger(propagationStackEntry.CachedPayload, propagationStackEntry.CachedPayloadMetadata) + } + + // schedule checks of approvers and consumers + tangle.ForEachConsumersAndApprovers(currentPayload, tangle.createValuePayloadFutureConeIterator(propagationStack, processedPayloads)) +} + +// setTransactionPreferred is an internal utility method that updates the preferred flag and triggers changes to the +// branch DAG and triggers an updates of the liked flags in the value tangle +func (tangle *Tangle) setTransactionPreferred(transactionID transaction.ID, preferred bool, eventSource EventSource) (modified bool, err error) { + // retrieve metadata and consume + cachedTransactionMetadata := tangle.TransactionMetadata(transactionID) + cachedTransactionMetadata.Consume(func(metadata *TransactionMetadata) { // update the preferred flag of the transaction modified = metadata.setPreferred(preferred) - // only propagate the changes if the flag was modified + // only do something if the flag was modified if modified { - // propagate changes to the branches (UTXO DAG) - if metadata.Conflicting() { - _, err = tangle.branchManager.SetBranchPreferred(metadata.BranchID(), preferred) - if err != nil { - tangle.Events.Error.Trigger(err) + // retrieve transaction from the database (for the events) + cachedTransaction := tangle.Transaction(transactionID) + defer cachedTransaction.Release() + if !cachedTransaction.Exists() { + return + } - return - } + // trigger the correct event + if preferred { + tangle.Events.TransactionPreferred.Trigger(cachedTransaction, cachedTransactionMetadata) + } else { + tangle.Events.TransactionUnpreferred.Trigger(cachedTransaction, cachedTransactionMetadata) } - // propagate changes to future cone of transaction (value tangle) - tangle.propagateValuePayloadLikeUpdates(transactionID, preferred) + // propagate changes to value tangle and branch DAG if we were called from the tangle + // Note: if the update was triggered by a change in the branch DAG then we do not propagate the value + // payload changes yet as those require the branch to be liked before (we instead do it in the + // BranchLiked event) + if eventSource == EventSourceTangle { + // propagate changes to the branches (UTXO DAG) + if metadata.Conflicting() { + _, err = tangle.branchManager.SetBranchPreferred(metadata.BranchID(), preferred) + if err != nil { + tangle.Events.Error.Trigger(err) + + return + } + } + + // propagate changes to future cone of transaction (value tangle) + tangle.propagateValuePayloadLikeUpdates(transactionID, preferred) + } } }) @@ -234,7 +718,7 @@ func (tangle *Tangle) processValuePayloadLikedUpdateStackEntry(propagationStack // release the entry when we are done defer propagationStackEntry.Release() - // unpack loaded objects and abort if the entities could not be loaded from the database + // unpack loaded objects and abort if the entities could not be loaded from the database currentPayload, currentPayloadMetadata, currentTransaction, currentTransactionMetadata := propagationStackEntry.Unwrap() if currentPayload == nil || currentPayloadMetadata == nil || currentTransaction == nil || currentTransactionMetadata == nil { return @@ -244,14 +728,14 @@ func (tangle *Tangle) processValuePayloadLikedUpdateStackEntry(propagationStack switch liked { case true: // abort if the transaction is not preferred, the branch of the payload is not liked, the referenced value payloads are not liked or the payload was marked as liked before - if !currentTransactionMetadata.Preferred() || !tangle.BranchManager().IsBranchLiked(currentPayloadMetadata.BranchID()) || !tangle.ValuePayloadsLiked(currentPayload.TrunkID(), currentPayload.BranchID()) || !currentPayloadMetadata.SetLiked(liked) { + if !currentTransactionMetadata.Preferred() || !tangle.BranchManager().IsBranchLiked(currentPayloadMetadata.BranchID()) || !tangle.ValuePayloadsLiked(currentPayload.TrunkID(), currentPayload.BranchID()) || !currentPayloadMetadata.setLiked(liked) { return } tangle.Events.PayloadLiked.Trigger(propagationStackEntry.CachedPayload, propagationStackEntry.CachedPayloadMetadata) case false: // abort if the payload has been marked as disliked before - if !currentPayloadMetadata.SetLiked(liked) { + if !currentPayloadMetadata.setLiked(liked) { return } @@ -295,130 +779,6 @@ func (tangle *Tangle) createValuePayloadFutureConeIterator(propagationStack *lis } } -// ValuePayloadsLiked is checking if the Payloads referenced by the passed in IDs are all liked. -func (tangle *Tangle) ValuePayloadsLiked(payloadIDs ...payload.ID) (liked bool) { - for _, payloadID := range payloadIDs { - if payloadID == payload.GenesisID { - continue - } - - payloadMetadataFound := tangle.PayloadMetadata(payloadID).Consume(func(payloadMetadata *PayloadMetadata) { - liked = payloadMetadata.Liked() - }) - - if !payloadMetadataFound || !liked { - return - } - } - - return true -} - -// Payload retrieves a payload from the object storage. -func (tangle *Tangle) Payload(payloadID payload.ID) *payload.CachedPayload { - return &payload.CachedPayload{CachedObject: tangle.payloadStorage.Load(payloadID.Bytes())} -} - -// PayloadMetadata retrieves the metadata of a value payload from the object storage. -func (tangle *Tangle) PayloadMetadata(payloadID payload.ID) *CachedPayloadMetadata { - return &CachedPayloadMetadata{CachedObject: tangle.payloadMetadataStorage.Load(payloadID.Bytes())} -} - -// Approvers retrieves the approvers of a payload from the object storage. -func (tangle *Tangle) Approvers(payloadID payload.ID) CachedApprovers { - approvers := make(CachedApprovers, 0) - tangle.approverStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { - approvers = append(approvers, &CachedPayloadApprover{CachedObject: cachedObject}) - - return true - }, payloadID.Bytes()) - - return approvers -} - -// Shutdown stops the worker pools and shuts down the object storage instances. -func (tangle *Tangle) Shutdown() *Tangle { - tangle.workerPool.ShutdownGracefully() - tangle.cleanupWorkerPool.ShutdownGracefully() - - for _, storage := range []*objectstorage.ObjectStorage{ - tangle.payloadStorage, - tangle.payloadMetadataStorage, - tangle.missingPayloadStorage, - tangle.approverStorage, - tangle.transactionStorage, - tangle.transactionMetadataStorage, - tangle.attachmentStorage, - tangle.outputStorage, - tangle.consumerStorage, - } { - storage.Shutdown() - } - - return tangle -} - -// Prune resets the database and deletes all objects (for testing or "node resets"). -func (tangle *Tangle) Prune() (err error) { - if err = tangle.branchManager.Prune(); err != nil { - return - } - - for _, storage := range []*objectstorage.ObjectStorage{ - tangle.payloadStorage, - tangle.payloadMetadataStorage, - tangle.missingPayloadStorage, - tangle.approverStorage, - tangle.transactionStorage, - tangle.transactionMetadataStorage, - tangle.attachmentStorage, - tangle.outputStorage, - tangle.consumerStorage, - } { - if err = storage.Prune(); err != nil { - return - } - } - - return -} - -// AttachPayloadSync is the worker function that stores the payload and calls the corresponding storage events. -func (tangle *Tangle) AttachPayloadSync(payloadToStore *payload.Payload) { - // store the payload models or abort if we have seen the payload already - cachedPayload, cachedPayloadMetadata, payloadStored := tangle.storePayload(payloadToStore) - if !payloadStored { - return - } - defer cachedPayload.Release() - defer cachedPayloadMetadata.Release() - - // store transaction models or abort if we have seen this attachment already (nil == was not stored) - cachedTransaction, cachedTransactionMetadata, cachedAttachment, transactionIsNew := tangle.storeTransactionModels(payloadToStore) - defer cachedTransaction.Release() - defer cachedTransactionMetadata.Release() - if cachedAttachment == nil { - return - } - defer cachedAttachment.Release() - - // store the references between the different entities (we do this after the actual entities were stored, so that - // all the metadata models exist in the database as soon as the entities are reachable by walks). - tangle.storePayloadReferences(payloadToStore) - - // trigger events - if tangle.missingPayloadStorage.DeleteIfPresent(payloadToStore.ID().Bytes()) { - tangle.Events.MissingPayloadReceived.Trigger(cachedPayload, cachedPayloadMetadata) - } - tangle.Events.PayloadAttached.Trigger(cachedPayload, cachedPayloadMetadata) - if transactionIsNew { - tangle.Events.TransactionReceived.Trigger(cachedTransaction, cachedTransactionMetadata, cachedAttachment) - } - - // check solidity - tangle.solidifyPayload(cachedPayload.Retain(), cachedPayloadMetadata.Retain(), cachedTransaction.Retain(), cachedTransactionMetadata.Retain()) -} - func (tangle *Tangle) storePayload(payloadToStore *payload.Payload) (cachedPayload *payload.CachedPayload, cachedMetadata *CachedPayloadMetadata, payloadStored bool) { storedTransaction, transactionIsNew := tangle.payloadStorage.StoreIfAbsent(payloadToStore) if !transactionIsNew { @@ -665,7 +1025,7 @@ func (tangle *Tangle) bookTransaction(cachedTransaction *transaction.CachedTrans // book outputs into the target branch transactionToBook.Outputs().ForEach(func(address address.Address, balances []*balance.Balance) bool { newOutput := NewOutput(address, transactionToBook.ID(), targetBranch.ID(), balances) - newOutput.SetSolid(true) + newOutput.setSolid(true) tangle.outputStorage.Store(newOutput).Release() return true @@ -724,17 +1084,6 @@ func (tangle *Tangle) bookPayload(cachedPayload *payload.CachedPayload, cachedPa return } -// ForeachApprovers iterates through the approvers of a payload and calls the passed in consumer function. -func (tangle *Tangle) ForeachApprovers(payloadID payload.ID, consume func(payload *payload.CachedPayload, payloadMetadata *CachedPayloadMetadata, transaction *transaction.CachedTransaction, transactionMetadata *CachedTransactionMetadata)) { - tangle.Approvers(payloadID).Consume(func(approver *PayloadApprover) { - approvingCachedPayload := tangle.Payload(approver.ApprovingPayloadID()) - - approvingCachedPayload.Consume(func(payload *payload.Payload) { - consume(approvingCachedPayload.Retain(), tangle.PayloadMetadata(approver.ApprovingPayloadID()), tangle.Transaction(payload.Transaction().ID()), tangle.TransactionMetadata(payload.Transaction().ID())) - }) - }) -} - // payloadBranchID returns the BranchID that the referenced Payload was booked into. func (tangle *Tangle) payloadBranchID(payloadID payload.ID) branchmanager.BranchID { if payloadID == payload.GenesisID { @@ -847,42 +1196,6 @@ func (tangle *Tangle) checkTransactionSolidity(tx *transaction.Transaction, meta return } -// LoadSnapshot creates a set of outputs in the value tangle, that are forming the genesis for future transactions. -func (tangle *Tangle) LoadSnapshot(snapshot map[transaction.ID]map[address.Address][]*balance.Balance) { - for transactionID, addressBalances := range snapshot { - for outputAddress, balances := range addressBalances { - input := NewOutput(outputAddress, transactionID, branchmanager.MasterBranchID, balances) - input.SetSolid(true) - input.SetBranchID(branchmanager.MasterBranchID) - - // store output and abort if the snapshot has already been loaded earlier (output exists in the database) - cachedOutput, stored := tangle.outputStorage.StoreIfAbsent(input) - if !stored { - return - } - - cachedOutput.Release() - } - } -} - -// OutputsOnAddress retrieves all the Outputs that are associated with an address. -func (tangle *Tangle) OutputsOnAddress(address address.Address) (result CachedOutputs) { - result = make(CachedOutputs) - tangle.outputStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { - outputID, _, err := transaction.OutputIDFromBytes(key) - if err != nil { - panic(err) - } - - result[outputID] = &CachedOutput{CachedObject: cachedObject} - - return true - }, address.Bytes()) - - return -} - func (tangle *Tangle) getCachedOutputsFromTransactionInputs(tx *transaction.Transaction) (result CachedOutputs) { result = make(CachedOutputs) tx.Inputs().ForEach(func(inputId transaction.OutputID) bool { @@ -1018,62 +1331,6 @@ func (tangle *Tangle) checkTransactionOutputs(inputBalances map[balance.Color]in return unspentCoins == newlyColoredCoins+uncoloredCoins } -// Fork creates a new branch from an existing transaction. -func (tangle *Tangle) Fork(transactionID transaction.ID, conflictingInputs []transaction.OutputID) (forked bool, finalized bool, err error) { - cachedTransaction := tangle.Transaction(transactionID) - cachedTransactionMetadata := tangle.TransactionMetadata(transactionID) - defer cachedTransaction.Release() - defer cachedTransactionMetadata.Release() - - tx := cachedTransaction.Unwrap() - if tx == nil { - err = fmt.Errorf("failed to load transaction '%s'", transactionID) - - return - } - txMetadata := cachedTransactionMetadata.Unwrap() - if txMetadata == nil { - err = fmt.Errorf("failed to load metadata of transaction '%s'", transactionID) - - return - } - - // abort if this transaction was finalized already - if txMetadata.Finalized() { - finalized = true - - return - } - - // update / create new branch - newBranchID := branchmanager.NewBranchID(tx.ID()) - cachedTargetBranch, newBranchCreated := tangle.branchManager.Fork(newBranchID, []branchmanager.BranchID{txMetadata.BranchID()}, conflictingInputs) - defer cachedTargetBranch.Release() - - // set branch to be preferred if the underlying transaction was marked as preferred - if txMetadata.Preferred() { - if _, err = tangle.branchManager.SetBranchPreferred(newBranchID, true); err != nil { - return - } - } - - // abort if the branch existed already - if !newBranchCreated { - return - } - - // move transactions to new branch - if err = tangle.moveTransactionToBranch(cachedTransaction.Retain(), cachedTransactionMetadata.Retain(), cachedTargetBranch.Retain()); err != nil { - return - } - - // trigger events + set result - tangle.Events.Fork.Trigger(cachedTransaction, cachedTransactionMetadata) - forked = true - - return -} - // TODO: write comment what it does func (tangle *Tangle) moveTransactionToBranch(cachedTransaction *transaction.CachedTransaction, cachedTransactionMetadata *CachedTransactionMetadata, cachedTargetBranch *branchmanager.CachedBranch) (err error) { // push transaction that shall be moved to the stack @@ -1231,36 +1488,7 @@ func (tangle *Tangle) calculateBranchOfTransaction(currentTransaction *transacti return } -// ForEachConsumers iterates through the transactions that are consuming outputs of the given transactions -func (tangle *Tangle) ForEachConsumers(currentTransaction *transaction.Transaction, consume func(payload *payload.CachedPayload, payloadMetadata *CachedPayloadMetadata, transaction *transaction.CachedTransaction, transactionMetadata *CachedTransactionMetadata)) { - seenTransactions := make(map[transaction.ID]types.Empty) - currentTransaction.Outputs().ForEach(func(address address.Address, balances []*balance.Balance) bool { - tangle.Consumers(transaction.NewOutputID(address, currentTransaction.ID())).Consume(func(consumer *Consumer) { - if _, transactionSeen := seenTransactions[consumer.TransactionID()]; !transactionSeen { - seenTransactions[consumer.TransactionID()] = types.Void - - cachedTransaction := tangle.Transaction(consumer.TransactionID()) - defer cachedTransaction.Release() - - cachedTransactionMetadata := tangle.TransactionMetadata(consumer.TransactionID()) - defer cachedTransactionMetadata.Release() - - tangle.Attachments(consumer.TransactionID()).Consume(func(attachment *Attachment) { - consume(tangle.Payload(attachment.PayloadID()), tangle.PayloadMetadata(attachment.PayloadID()), cachedTransaction.Retain(), cachedTransactionMetadata.Retain()) - }) - } - }) - - return true - }) -} - -// ForEachConsumersAndApprovers calls the passed in consumer for all payloads that either approve the given payload or -// that attach a transaction that spends outputs from the transaction inside the given payload. -func (tangle *Tangle) ForEachConsumersAndApprovers(currentPayload *payload.Payload, consume func(payload *payload.CachedPayload, payloadMetadata *CachedPayloadMetadata, transaction *transaction.CachedTransaction, transactionMetadata *CachedTransactionMetadata)) { - tangle.ForEachConsumers(currentPayload.Transaction(), consume) - tangle.ForeachApprovers(currentPayload.ID(), consume) -} +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// // valuePayloadPropagationStackEntry is a container for the elements in the propagation stack of ValuePayloads type valuePayloadPropagationStackEntry struct { diff --git a/dapps/valuetransfers/packages/tangle/tangle_test.go b/dapps/valuetransfers/packages/tangle/tangle_test.go index f335593c..a418b926 100644 --- a/dapps/valuetransfers/packages/tangle/tangle_test.go +++ b/dapps/valuetransfers/packages/tangle/tangle_test.go @@ -29,8 +29,8 @@ func TestNewOutput(t *testing.T) { balance.New(balance.ColorIOTA, 1337), }, output.Balances()) - assert.Equal(t, true, output.SetSolid(true)) - assert.Equal(t, false, output.SetSolid(true)) + assert.Equal(t, true, output.setSolid(true)) + assert.Equal(t, false, output.setSolid(true)) assert.Equal(t, true, output.Solid()) assert.NotEqual(t, time.Time{}, output.SolidificationTime()) diff --git a/dapps/valuetransfers/packages/test/tangle_test.go b/dapps/valuetransfers/packages/test/tangle_test.go index 14728b8f..a4580370 100644 --- a/dapps/valuetransfers/packages/test/tangle_test.go +++ b/dapps/valuetransfers/packages/test/tangle_test.go @@ -1,6 +1,7 @@ package test import ( + "fmt" "testing" "github.com/iotaledger/hive.go/events" @@ -72,6 +73,10 @@ func TestTangle_ValueTransfer(t *testing.T) { )) valueTangle.AttachPayloadSync(attachedPayload1) + valueTangle.PayloadMetadata(attachedPayload1.ID()).Consume(func(payload *tangle.PayloadMetadata) { + fmt.Println(payload.Confirmed()) + }) + // check if old addresses are empty and new addresses are filled assert.Equal(t, map[balance.Color]int64{}, ledgerState.Balances(seed.Address(0))) assert.Equal(t, map[balance.Color]int64{}, ledgerState.Balances(seed.Address(1))) -- GitLab