From 919895a0a30618eaf8260f65574191429490feed Mon Sep 17 00:00:00 2001 From: Hans Moog <hm@mkjc.net> Date: Fri, 7 Feb 2020 14:25:13 +0100 Subject: [PATCH] Refactor: refactored the tangle --- packages/binary/tangle/events.go | 18 +-- .../tangle/model/approver/cached_approver.go | 12 ++ .../model/transaction/cached_transaction.go | 8 +- packages/binary/tangle/tangle.go | 139 +++++++----------- packages/binary/tangle/tangle_test.go | 12 +- 5 files changed, 93 insertions(+), 96 deletions(-) diff --git a/packages/binary/tangle/events.go b/packages/binary/tangle/events.go index 00f33b48..d2bf3552 100644 --- a/packages/binary/tangle/events.go +++ b/packages/binary/tangle/events.go @@ -10,8 +10,9 @@ import ( type Events struct { TransactionAttached *events.Event TransactionSolid *events.Event - TransactionMissing *events.Event MissingTransactionReceived *events.Event + TransactionMissing *events.Event + TransactionUnsolidifiable *events.Event TransactionRemoved *events.Event } @@ -19,21 +20,20 @@ func newEvents() *Events { return &Events{ TransactionAttached: events.NewEvent(cachedTransactionEvent), TransactionSolid: events.NewEvent(cachedTransactionEvent), + MissingTransactionReceived: events.NewEvent(cachedTransactionEvent), TransactionMissing: events.NewEvent(transactionIdEvent), - MissingTransactionReceived: events.NewEvent(transactionIdEvent), + TransactionUnsolidifiable: events.NewEvent(transactionIdEvent), TransactionRemoved: events.NewEvent(transactionIdEvent), } } func transactionIdEvent(handler interface{}, params ...interface{}) { - missingTransactionId := params[0].(transaction.Id) - - handler.(func(transaction.Id))(missingTransactionId) + handler.(func(transaction.Id))(params[0].(transaction.Id)) } func cachedTransactionEvent(handler interface{}, params ...interface{}) { - cachedTransaction := params[0].(*transaction.CachedTransaction) - cachedTransactionMetadata := params[1].(*transactionmetadata.CachedTransactionMetadata) - - handler.(func(*transaction.CachedTransaction, *transactionmetadata.CachedTransactionMetadata))(cachedTransaction.Retain().(*transaction.CachedTransaction), cachedTransactionMetadata.Retain().(*transactionmetadata.CachedTransactionMetadata)) + handler.(func(*transaction.CachedTransaction, *transactionmetadata.CachedTransactionMetadata))( + params[0].(*transaction.CachedTransaction).Retain(), + params[1].(*transactionmetadata.CachedTransactionMetadata).Retain().(*transactionmetadata.CachedTransactionMetadata), + ) } diff --git a/packages/binary/tangle/model/approver/cached_approver.go b/packages/binary/tangle/model/approver/cached_approver.go index ba2793f2..c5284432 100644 --- a/packages/binary/tangle/model/approver/cached_approver.go +++ b/packages/binary/tangle/model/approver/cached_approver.go @@ -19,3 +19,15 @@ func (cachedApprover *CachedApprover) Unwrap() *Approver { } } } + +type CachedApprovers []*CachedApprover + +func (cachedApprovers CachedApprovers) Consume(consumer func(approver *Approver)) (consumed bool) { + for _, cachedApprover := range cachedApprovers { + consumed = consumed || cachedApprover.Consume(func(object objectstorage.StorableObject) { + consumer(object.(*Approver)) + }) + } + + return +} diff --git a/packages/binary/tangle/model/transaction/cached_transaction.go b/packages/binary/tangle/model/transaction/cached_transaction.go index 5b9bc6a2..5f177ea7 100644 --- a/packages/binary/tangle/model/transaction/cached_transaction.go +++ b/packages/binary/tangle/model/transaction/cached_transaction.go @@ -8,10 +8,16 @@ type CachedTransaction struct { objectstorage.CachedObject } -func (cachedTransaction *CachedTransaction) Retain() objectstorage.CachedObject { +func (cachedTransaction *CachedTransaction) Retain() *CachedTransaction { return &CachedTransaction{cachedTransaction.CachedObject.Retain()} } +func (cachedTransaction *CachedTransaction) Consume(consumer func(object *Transaction)) bool { + return cachedTransaction.CachedObject.Consume(func(object objectstorage.StorableObject) { + consumer(object.(*Transaction)) + }) +} + func (cachedTransaction *CachedTransaction) Unwrap() *Transaction { if untypedTransaction := cachedTransaction.Get(); untypedTransaction == nil { return nil diff --git a/packages/binary/tangle/tangle.go b/packages/binary/tangle/tangle.go index 90004c1e..6308042e 100644 --- a/packages/binary/tangle/tangle.go +++ b/packages/binary/tangle/tangle.go @@ -4,7 +4,6 @@ import ( "container/list" "time" - "github.com/iotaledger/hive.go/syncutils" "github.com/iotaledger/hive.go/types" "github.com/iotaledger/goshimmer/packages/binary/tangle/model/approver" @@ -37,8 +36,6 @@ type Tangle struct { storeTransactionsWorkerPool async.WorkerPool solidifierWorkerPool async.WorkerPool cleanupWorkerPool async.WorkerPool - - tangleMutex syncutils.RWMultiMutex } // Constructor for the tangle. @@ -104,18 +101,10 @@ func (tangle *Tangle) GetTransactionMetadata(transactionId transaction.Id) *tran } // Retrieves the approvers of a transaction from the tangle. -func (tangle *Tangle) GetApprover(approvedTransaction transaction.Id, approvingTransaction transaction.Id) *approver.CachedApprover { - keyToLoad := make([]byte, transaction.IdLength+transaction.IdLength) - copy(keyToLoad[:transaction.IdLength], approvedTransaction[:]) - copy(keyToLoad[transaction.IdLength:], approvingTransaction[:]) - - return &approver.CachedApprover{CachedObject: tangle.approverStorage.Load(keyToLoad)} -} - -func (tangle *Tangle) GetApprovers(transactionId transaction.Id) []*approver.CachedApprover { - approvers := make([]*approver.CachedApprover, 0) +func (tangle *Tangle) GetApprovers(transactionId transaction.Id) approver.CachedApprovers { + approvers := make(approver.CachedApprovers, 0) tangle.approverStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { - approvers = append(approvers, &approver.CachedApprover{CachedObject: cachedObject.Retain()}) + approvers = append(approvers, &approver.CachedApprover{CachedObject: cachedObject}) return true }, transactionId[:]) @@ -125,37 +114,20 @@ func (tangle *Tangle) GetApprovers(transactionId transaction.Id) []*approver.Cac // Deletes a transaction from the tangle (i.e. for local snapshots) func (tangle *Tangle) DeleteTransaction(transactionId transaction.Id) { - tangle.tangleMutex.RLock(transactionId) - if !tangle.GetTransaction(transactionId).Consume(func(object objectstorage.StorableObject) { - tangle.tangleMutex.RUnlock(transactionId) - tangle.tangleMutex.Lock(transactionId) - - currentTransaction := object.(*transaction.Transaction) - + tangle.GetTransaction(transactionId).Consume(func(currentTransaction *transaction.Transaction) { trunkTransactionId := currentTransaction.GetTrunkTransactionId() - trunkIdToDelete := make([]byte, transaction.IdLength+transaction.IdLength) - copy(trunkIdToDelete[:transaction.IdLength], transactionId[:]) - copy(trunkIdToDelete[transaction.IdLength:], trunkTransactionId[:]) - tangle.approverStorage.Delete(trunkIdToDelete) + tangle.deleteApprover(trunkTransactionId, transactionId) branchTransactionId := currentTransaction.GetBranchTransactionId() if branchTransactionId != trunkTransactionId { - branchIdToDelete := make([]byte, transaction.IdLength+transaction.IdLength) - copy(branchIdToDelete[:transaction.IdLength], transactionId[:]) - copy(branchIdToDelete[transaction.IdLength:], branchTransactionId[:]) - tangle.approverStorage.Delete(branchIdToDelete) + tangle.deleteApprover(branchTransactionId, transactionId) } - tangle.transactionStorage.Delete(transactionId[:]) tangle.transactionMetadataStorage.Delete(transactionId[:]) - tangle.missingTransactionsStorage.Delete(transactionId[:]) - - tangle.tangleMutex.Unlock(transactionId) + tangle.transactionStorage.Delete(transactionId[:]) tangle.Events.TransactionRemoved.Trigger(transactionId) - }) { - tangle.tangleMutex.RUnlock(transactionId) - } + }) } // Marks the tangle as stopped, so it will not accept any new transactions (waits for all backgroundTasks to finish. @@ -164,6 +136,11 @@ func (tangle *Tangle) Shutdown() *Tangle { tangle.solidifierWorkerPool.ShutdownGracefully() tangle.cleanupWorkerPool.ShutdownGracefully() + tangle.transactionStorage.Shutdown() + tangle.transactionMetadataStorage.Shutdown() + tangle.approverStorage.Shutdown() + tangle.missingTransactionsStorage.Shutdown() + return tangle } @@ -185,6 +162,7 @@ func (tangle *Tangle) Prune() error { // Worker that stores the transactions and calls the corresponding "Storage events" func (tangle *Tangle) storeTransactionWorker(tx *transaction.Transaction) { + // store transaction var cachedTransaction *transaction.CachedTransaction if _tmp, transactionIsNew := tangle.transactionStorage.StoreIfAbsent(tx); !transactionIsNew { return @@ -192,35 +170,26 @@ func (tangle *Tangle) storeTransactionWorker(tx *transaction.Transaction) { cachedTransaction = &transaction.CachedTransaction{CachedObject: _tmp} } + // store transaction metadata transactionId := tx.GetId() - trunkTransactionID := tx.GetTrunkTransactionId() - branchTransactionID := tx.GetBranchTransactionId() - - var lockBuilder syncutils.MultiMutexLockBuilder - if trunkTransactionID != transaction.EmptyId { - lockBuilder.AddLock(trunkTransactionID) - } - if branchTransactionID != transaction.EmptyId && branchTransactionID != trunkTransactionID { - lockBuilder.AddLock(branchTransactionID) - } - locks := lockBuilder.Build() - - tangle.tangleMutex.Lock(locks...) - cachedTransactionMetadata := &transactionmetadata.CachedTransactionMetadata{CachedObject: tangle.transactionMetadataStorage.Store(transactionmetadata.New(transactionId))} + + // store trunk approver + trunkTransactionID := tx.GetTrunkTransactionId() tangle.approverStorage.Store(approver.New(trunkTransactionID, transactionId)).Release() - if branchTransactionID != trunkTransactionID { + + // store branch approver + if branchTransactionID := tx.GetBranchTransactionId(); branchTransactionID != trunkTransactionID { tangle.approverStorage.Store(approver.New(branchTransactionID, transactionId)).Release() } - tangle.tangleMutex.Unlock(locks...) - + // trigger events if tangle.missingTransactionsStorage.DeleteIfPresent(transactionId[:]) { - tangle.Events.MissingTransactionReceived.Trigger(transactionId) + tangle.Events.MissingTransactionReceived.Trigger(cachedTransaction, cachedTransactionMetadata) } - tangle.Events.TransactionAttached.Trigger(cachedTransaction, cachedTransactionMetadata) + // check solidity tangle.solidifierWorkerPool.Submit(func() { tangle.solidifyTransactionWorker(cachedTransaction, cachedTransactionMetadata) }) @@ -268,15 +237,7 @@ func (tangle *Tangle) solidifyTransactionWorker(cachedTransaction *transaction.C return true } - // 1. check tangle solidity - isTrunkSolid := isTransactionMarkedAsSolid(transaction.GetTrunkTransactionId()) - isBranchSolid := isTransactionMarkedAsSolid(transaction.GetBranchTransactionId()) - if isTrunkSolid && isBranchSolid { - // 2. check payload solidity - return true - } - - return false + return isTransactionMarkedAsSolid(transaction.GetTrunkTransactionId()) && isTransactionMarkedAsSolid(transaction.GetBranchTransactionId()) } popElementsFromStack := func(stack *list.List) (*transaction.CachedTransaction, *transactionmetadata.CachedTransactionMetadata) { @@ -309,16 +270,14 @@ func (tangle *Tangle) solidifyTransactionWorker(cachedTransaction *transaction.C if isTransactionSolid(currentTransaction, currentTransactionMetadata) && currentTransactionMetadata.SetSolid(true) { tangle.Events.TransactionSolid.Trigger(currentCachedTransaction, currentCachedTransactionMetadata) - for _, cachedApprover := range tangle.GetApprovers(currentTransaction.GetId()) { - cachedApprover.Consume(func(object objectstorage.StorableObject) { - approverTransactionId := object.(*approver.Approver).GetApprovingTransactionId() + tangle.GetApprovers(currentTransaction.GetId()).Consume(func(approver *approver.Approver) { + approverTransactionId := approver.GetApprovingTransactionId() - solidificationStack.PushBack([2]interface{}{ - tangle.GetTransaction(approverTransactionId), - tangle.GetTransactionMetadata(approverTransactionId), - }) + solidificationStack.PushBack([2]interface{}{ + tangle.GetTransaction(approverTransactionId), + tangle.GetTransactionMetadata(approverTransactionId), }) - } + }) } // release cached results @@ -336,20 +295,34 @@ func (tangle *Tangle) monitorMissingTransactionWorker(transactionId transaction. missingTransaction := object.(*missingtransaction.MissingTransaction) if time.Since(missingTransaction.GetMissingSince()) >= MAX_MISSING_TIME_BEFORE_CLEANUP { - tangle.cleanupWorkerPool.Submit(func() { tangle.cleanupWorker(missingTransaction.GetTransactionId()) }) + tangle.cleanupWorkerPool.Submit(func() { + tangle.Events.TransactionUnsolidifiable.Trigger(transactionId) + + tangle.deleteSubtangle(missingTransaction.GetTransactionId()) + }) } else { + // TRIGGER STILL MISSING EVENT? + scheduleNextMissingCheck(transactionId) } }) }) } + tangle.Events.TransactionMissing.Trigger(transactionId) scheduleNextMissingCheck(transactionId) } -// Worker that recursively cleans up the approvers of a unsolidifiable missing transaction. -func (tangle *Tangle) cleanupWorker(transactionId transaction.Id) { +func (tangle *Tangle) deleteApprover(approvedTransaction transaction.Id, approvingTransaction transaction.Id) { + idToDelete := make([]byte, transaction.IdLength+transaction.IdLength) + copy(idToDelete[:transaction.IdLength], approvedTransaction[:]) + copy(idToDelete[transaction.IdLength:], approvingTransaction[:]) + tangle.approverStorage.Delete(idToDelete) +} + +// Deletes a transaction and all of its approvers (recursively). +func (tangle *Tangle) deleteSubtangle(transactionId transaction.Id) { cleanupStack := list.New() cleanupStack.PushBack(transactionId) @@ -363,16 +336,14 @@ func (tangle *Tangle) cleanupWorker(transactionId transaction.Id) { tangle.DeleteTransaction(currentTransactionId) - for _, cachedApprover := range tangle.GetApprovers(currentTransactionId) { - cachedApprover.Consume(func(object objectstorage.StorableObject) { - approverId := object.(*approver.Approver).GetApprovingTransactionId() + tangle.GetApprovers(currentTransactionId).Consume(func(approver *approver.Approver) { + approverId := approver.GetApprovingTransactionId() - if _, transactionProcessed := processedTransactions[approverId]; !transactionProcessed { - cleanupStack.PushBack(approverId) + if _, transactionProcessed := processedTransactions[approverId]; !transactionProcessed { + cleanupStack.PushBack(approverId) - processedTransactions[approverId] = types.Void - } - }) - } + processedTransactions[approverId] = types.Void + } + }) } } diff --git a/packages/binary/tangle/tangle_test.go b/packages/binary/tangle/tangle_test.go index 0fc69f2e..33420e87 100644 --- a/packages/binary/tangle/tangle_test.go +++ b/packages/binary/tangle/tangle_test.go @@ -47,11 +47,19 @@ func TestTangle_AttachTransaction(t *testing.T) { } tangle.Events.TransactionAttached.Attach(events.NewClosure(func(cachedTransaction *transaction.CachedTransaction, cachedTransactionMetadata *transactionmetadata.CachedTransactionMetadata) { - fmt.Println("ATTACHED:", cachedTransaction.Unwrap().GetId()) + cachedTransaction.Consume(func(transaction *transaction.Transaction) { + fmt.Println("ATTACHED:", transaction.GetId()) + }) })) tangle.Events.TransactionSolid.Attach(events.NewClosure(func(cachedTransaction *transaction.CachedTransaction, cachedTransactionMetadata *transactionmetadata.CachedTransactionMetadata) { - fmt.Println("SOLID:", cachedTransaction.Unwrap().GetId()) + cachedTransaction.Consume(func(transaction *transaction.Transaction) { + fmt.Println("SOLID:", transaction.GetId()) + }) + })) + + tangle.Events.TransactionUnsolidifiable.Attach(events.NewClosure(func(transactionId transaction.Id) { + fmt.Println("UNSOLIDIFIABLE:", transactionId) })) tangle.Events.TransactionMissing.Attach(events.NewClosure(func(transactionId transaction.Id) { -- GitLab