diff --git a/go.mod b/go.mod index a2e098d1b295c97143fc9271537d5767698b47dd..88299566be0133b6abaae2af10942d022c65d83d 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/golang/protobuf v1.3.2 // indirect github.com/google/open-location-code/go v0.0.0-20190903173953-119bc96a3a51 github.com/gorilla/websocket v1.4.1 - github.com/iotaledger/hive.go v0.0.0-20200203224255-313f831d7885 + github.com/iotaledger/hive.go v0.0.0-20200206155421-88b071e6232c github.com/iotaledger/iota.go v1.0.0-beta.9 github.com/kr/text v0.1.0 github.com/labstack/echo v3.3.10+incompatible diff --git a/go.sum b/go.sum index ff2be2016d19ed5c2b0601e999d12b0c3e678219..b385eefc19438eeeef846f1b71f14bb9d004258f 100644 --- a/go.sum +++ b/go.sum @@ -116,6 +116,10 @@ github.com/iotaledger/hive.go v0.0.0-20200131164002-e50853dd9172 h1:QuckesiAtCzf github.com/iotaledger/hive.go v0.0.0-20200131164002-e50853dd9172/go.mod h1:wj3bFHlcX0NiEOWu5+WOg/MI/5N7PKCFnyaziaylB64= github.com/iotaledger/hive.go v0.0.0-20200203224255-313f831d7885 h1:MAD62T49q1AnoybapTCnlqIFfFv2fs0GmKgn7IcR+oA= github.com/iotaledger/hive.go v0.0.0-20200203224255-313f831d7885/go.mod h1:wj3bFHlcX0NiEOWu5+WOg/MI/5N7PKCFnyaziaylB64= +github.com/iotaledger/hive.go v0.0.0-20200206115534-7f680d0055fe h1:e7C1RzKi4fFksYt+rn8sEi9RnUjbV0Od9pkKWb44PhE= +github.com/iotaledger/hive.go v0.0.0-20200206115534-7f680d0055fe/go.mod h1:wj3bFHlcX0NiEOWu5+WOg/MI/5N7PKCFnyaziaylB64= +github.com/iotaledger/hive.go v0.0.0-20200206155421-88b071e6232c h1:dPRoel27yR8okT5vSL2mzmCMOP41ede1kqcVD5Fzu7I= +github.com/iotaledger/hive.go v0.0.0-20200206155421-88b071e6232c/go.mod h1:wj3bFHlcX0NiEOWu5+WOg/MI/5N7PKCFnyaziaylB64= github.com/iotaledger/iota.go v1.0.0-beta.9 h1:c654s9pkdhMBkABUvWg+6k91MEBbdtmZXP1xDfQpajg= github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= diff --git a/packages/binary/tangle/model/approver/approver.go b/packages/binary/tangle/model/approver/approver.go new file mode 100644 index 0000000000000000000000000000000000000000..b08bf342d269616674e883d31791650dcdd29006 --- /dev/null +++ b/packages/binary/tangle/model/approver/approver.go @@ -0,0 +1,59 @@ +package approver + +import ( + "github.com/iotaledger/hive.go/objectstorage" + + "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" +) + +type Approver struct { + objectstorage.StorableObjectFlags + + storageKey []byte + referencedTransaction transaction.Id + approvingTransaction transaction.Id +} + +func New(referencedTransaction transaction.Id, approvingTransaction transaction.Id) *Approver { + approver := &Approver{ + storageKey: make([]byte, transaction.IdLength+transaction.IdLength), + referencedTransaction: referencedTransaction, + approvingTransaction: approvingTransaction, + } + + copy(approver.storageKey[:transaction.IdLength], referencedTransaction[:]) + copy(approver.storageKey[transaction.IdLength:], approvingTransaction[:]) + + return approver +} + +func FromStorage(id []byte) (result objectstorage.StorableObject) { + approver := &Approver{ + storageKey: make([]byte, transaction.IdLength+transaction.IdLength), + } + copy(approver.referencedTransaction[:], id[:transaction.IdLength]) + copy(approver.approvingTransaction[:], id[transaction.IdLength:]) + copy(approver.storageKey, id) + + return approver +} + +func (approver *Approver) GetStorageKey() []byte { + return approver.storageKey +} + +func (approver *Approver) GetApprovingTransactionId() transaction.Id { + return approver.approvingTransaction +} + +func (approver *Approver) Update(other objectstorage.StorableObject) { + panic("approvers should never be overwritten and only stored once to optimize IO") +} + +func (approver *Approver) MarshalBinary() (result []byte, err error) { + return +} + +func (approver *Approver) UnmarshalBinary(data []byte) (err error) { + return +} diff --git a/packages/binary/tangle/model/approver/cached_approver.go b/packages/binary/tangle/model/approver/cached_approver.go new file mode 100644 index 0000000000000000000000000000000000000000..ba2793f2e9bbf399d86212f51710cfc1a1e4b2cd --- /dev/null +++ b/packages/binary/tangle/model/approver/cached_approver.go @@ -0,0 +1,21 @@ +package approver + +import ( + "github.com/iotaledger/hive.go/objectstorage" +) + +type CachedApprover struct { + objectstorage.CachedObject +} + +func (cachedApprover *CachedApprover) Unwrap() *Approver { + if untypedObject := cachedApprover.Get(); untypedObject == nil { + return nil + } else { + if typedObject := untypedObject.(*Approver); typedObject == nil || typedObject.IsDeleted() { + return nil + } else { + return typedObject + } + } +} diff --git a/packages/binary/tangle/model/approvers/approvers.go b/packages/binary/tangle/model/approvers_old/approvers.go similarity index 99% rename from packages/binary/tangle/model/approvers/approvers.go rename to packages/binary/tangle/model/approvers_old/approvers.go index 178faf9157078196515ba231bcea3d8c05ebe24f..cf60ce624878a3474d4e2e85093828bc29d4e40e 100644 --- a/packages/binary/tangle/model/approvers/approvers.go +++ b/packages/binary/tangle/model/approvers_old/approvers.go @@ -1,4 +1,4 @@ -package approvers +package approvers_old import ( "encoding/binary" diff --git a/packages/binary/tangle/model/approvers/cached_approvers.go b/packages/binary/tangle/model/approvers_old/cached_approvers.go similarity index 95% rename from packages/binary/tangle/model/approvers/cached_approvers.go rename to packages/binary/tangle/model/approvers_old/cached_approvers.go index 55e225ba24aa427f2b0252e41519492532e1dd6f..a25526468852d2cced26824b72a66c0fbc388660 100644 --- a/packages/binary/tangle/model/approvers/cached_approvers.go +++ b/packages/binary/tangle/model/approvers_old/cached_approvers.go @@ -1,4 +1,4 @@ -package approvers +package approvers_old import ( "github.com/iotaledger/hive.go/objectstorage" diff --git a/packages/binary/tangle/model/transaction/payload/valuetransfer/valuetransfer.go b/packages/binary/tangle/model/transaction/payload/valuetransfer/valuetransfer.go index b50fc60249b9273ead4f816f767b2c174a0920d5..c84fae5ac4a9f32b208f5eb6037a9a2b702a594d 100644 --- a/packages/binary/tangle/model/transaction/payload/valuetransfer/valuetransfer.go +++ b/packages/binary/tangle/model/transaction/payload/valuetransfer/valuetransfer.go @@ -6,10 +6,10 @@ import ( "github.com/iotaledger/goshimmer/packages/binary/signature/ed25119" "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction/payload" - "github.com/iotaledger/goshimmer/packages/binary/types" "github.com/iotaledger/goshimmer/packages/binary/valuetangle/model/address" "github.com/iotaledger/goshimmer/packages/ledgerstate/coloredcoins" "github.com/iotaledger/goshimmer/packages/ledgerstate/transfer" + "github.com/iotaledger/hive.go/types" ) type ValueTransfer struct { diff --git a/packages/binary/tangle/tangle.go b/packages/binary/tangle/tangle.go index b1941ce24969aed8489c41376145c2a54383ad07..90004c1ea46988f9ed10caecbbb71f588610a410 100644 --- a/packages/binary/tangle/tangle.go +++ b/packages/binary/tangle/tangle.go @@ -4,7 +4,10 @@ import ( "container/list" "time" - "github.com/iotaledger/goshimmer/packages/binary/tangle/model/approvers" + "github.com/iotaledger/hive.go/syncutils" + "github.com/iotaledger/hive.go/types" + + "github.com/iotaledger/goshimmer/packages/binary/tangle/model/approver" "github.com/iotaledger/goshimmer/packages/binary/tangle/model/missingtransaction" "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction/payload/data" @@ -26,7 +29,7 @@ type Tangle struct { transactionStorage *objectstorage.ObjectStorage transactionMetadataStorage *objectstorage.ObjectStorage - approversStorage *objectstorage.ObjectStorage + approverStorage *objectstorage.ObjectStorage missingTransactionsStorage *objectstorage.ObjectStorage Events Events @@ -34,6 +37,8 @@ type Tangle struct { storeTransactionsWorkerPool async.WorkerPool solidifierWorkerPool async.WorkerPool cleanupWorkerPool async.WorkerPool + + tangleMutex syncutils.RWMultiMutex } // Constructor for the tangle. @@ -42,7 +47,7 @@ func New(storageId []byte) (result *Tangle) { storageId: storageId, transactionStorage: objectstorage.New(append(storageId, storageprefix.TangleTransaction...), transaction.FromStorage), transactionMetadataStorage: objectstorage.New(append(storageId, storageprefix.TangleTransactionMetadata...), transactionmetadata.FromStorage), - approversStorage: objectstorage.New(append(storageId, storageprefix.TangleApprovers...), approvers.FromStorage), + approverStorage: objectstorage.New(append(storageId, storageprefix.TangleApprovers...), approver.FromStorage, objectstorage.PartitionKey(transaction.IdLength, transaction.IdLength)), missingTransactionsStorage: objectstorage.New(append(storageId, storageprefix.TangleMissingTransaction...), missingtransaction.FromStorage), Events: *newEvents(), @@ -99,33 +104,58 @@ func (tangle *Tangle) GetTransactionMetadata(transactionId transaction.Id) *tran } // Retrieves the approvers of a transaction from the tangle. -func (tangle *Tangle) GetApprovers(transactionId transaction.Id) *approvers.CachedApprovers { - return &approvers.CachedApprovers{CachedObject: tangle.approversStorage.Load(transactionId[:])} +func (tangle *Tangle) GetApprover(approvedTransaction transaction.Id, approvingTransaction transaction.Id) *approver.CachedApprover { + keyToLoad := make([]byte, transaction.IdLength+transaction.IdLength) + copy(keyToLoad[:transaction.IdLength], approvedTransaction[:]) + copy(keyToLoad[transaction.IdLength:], approvingTransaction[:]) + + return &approver.CachedApprover{CachedObject: tangle.approverStorage.Load(keyToLoad)} +} + +func (tangle *Tangle) GetApprovers(transactionId transaction.Id) []*approver.CachedApprover { + approvers := make([]*approver.CachedApprover, 0) + tangle.approverStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { + approvers = append(approvers, &approver.CachedApprover{CachedObject: cachedObject.Retain()}) + + return true + }, transactionId[:]) + + return approvers } // Deletes a transaction from the tangle (i.e. for local snapshots) func (tangle *Tangle) DeleteTransaction(transactionId transaction.Id) { - tangle.GetTransaction(transactionId).Consume(func(object objectstorage.StorableObject) { + tangle.tangleMutex.RLock(transactionId) + if !tangle.GetTransaction(transactionId).Consume(func(object objectstorage.StorableObject) { + tangle.tangleMutex.RUnlock(transactionId) + tangle.tangleMutex.Lock(transactionId) + currentTransaction := object.(*transaction.Transaction) - tangle.GetApprovers(currentTransaction.GetTrunkTransactionId()).Consume(func(object objectstorage.StorableObject) { - if _tmp := object.(*approvers.Approvers); _tmp.Remove(transactionId) && _tmp.Size() == 0 { - _tmp.Delete() - } - }) + trunkTransactionId := currentTransaction.GetTrunkTransactionId() + trunkIdToDelete := make([]byte, transaction.IdLength+transaction.IdLength) + copy(trunkIdToDelete[:transaction.IdLength], transactionId[:]) + copy(trunkIdToDelete[transaction.IdLength:], trunkTransactionId[:]) + tangle.approverStorage.Delete(trunkIdToDelete) + + branchTransactionId := currentTransaction.GetBranchTransactionId() + if branchTransactionId != trunkTransactionId { + branchIdToDelete := make([]byte, transaction.IdLength+transaction.IdLength) + copy(branchIdToDelete[:transaction.IdLength], transactionId[:]) + copy(branchIdToDelete[transaction.IdLength:], branchTransactionId[:]) + tangle.approverStorage.Delete(branchIdToDelete) + } - tangle.GetApprovers(currentTransaction.GetTrunkTransactionId()).Consume(func(object objectstorage.StorableObject) { - if _tmp := object.(*approvers.Approvers); _tmp.Remove(transactionId) && _tmp.Size() == 0 { - _tmp.Delete() - } - }) - }) + tangle.transactionStorage.Delete(transactionId[:]) + tangle.transactionMetadataStorage.Delete(transactionId[:]) + tangle.missingTransactionsStorage.Delete(transactionId[:]) - tangle.transactionStorage.Delete(transactionId[:]) - tangle.transactionMetadataStorage.Delete(transactionId[:]) - tangle.missingTransactionsStorage.Delete(transactionId[:]) + tangle.tangleMutex.Unlock(transactionId) - tangle.Events.TransactionRemoved.Trigger(transactionId) + tangle.Events.TransactionRemoved.Trigger(transactionId) + }) { + tangle.tangleMutex.RUnlock(transactionId) + } } // Marks the tangle as stopped, so it will not accept any new transactions (waits for all backgroundTasks to finish. @@ -142,7 +172,7 @@ func (tangle *Tangle) Prune() error { for _, storage := range []*objectstorage.ObjectStorage{ tangle.transactionStorage, tangle.transactionMetadataStorage, - tangle.approversStorage, + tangle.approverStorage, tangle.missingTransactionsStorage, } { if err := storage.Prune(); err != nil { @@ -155,39 +185,35 @@ func (tangle *Tangle) Prune() error { // Worker that stores the transactions and calls the corresponding "Storage events" func (tangle *Tangle) storeTransactionWorker(tx *transaction.Transaction) { - addTransactionToApprovers := func(transactionId transaction.Id, approvedTransactionId transaction.Id) { - cachedApprovers := tangle.approversStorage.ComputeIfAbsent(approvedTransactionId[:], func([]byte) objectstorage.StorableObject { - result := approvers.New(approvedTransactionId) - - result.SetModified() - - return result - }) - - if _tmp := cachedApprovers.Get(); _tmp != nil { - if approversObject := _tmp.(*approvers.Approvers); approversObject != nil { - approversObject.Add(transactionId) - - // if the approvers got "cleaned up" while being in cache, we make sure the object gets persisted again - approversObject.Persist() - } - } - - cachedApprovers.Release() - } - var cachedTransaction *transaction.CachedTransaction - if _tmp, transactionIsNew := tangle.transactionStorage.StoreIfAbsent(tx.GetStorageKey(), tx); !transactionIsNew { + if _tmp, transactionIsNew := tangle.transactionStorage.StoreIfAbsent(tx); !transactionIsNew { return } else { cachedTransaction = &transaction.CachedTransaction{CachedObject: _tmp} } transactionId := tx.GetId() + trunkTransactionID := tx.GetTrunkTransactionId() + branchTransactionID := tx.GetBranchTransactionId() + + var lockBuilder syncutils.MultiMutexLockBuilder + if trunkTransactionID != transaction.EmptyId { + lockBuilder.AddLock(trunkTransactionID) + } + if branchTransactionID != transaction.EmptyId && branchTransactionID != trunkTransactionID { + lockBuilder.AddLock(branchTransactionID) + } + locks := lockBuilder.Build() + + tangle.tangleMutex.Lock(locks...) cachedTransactionMetadata := &transactionmetadata.CachedTransactionMetadata{CachedObject: tangle.transactionMetadataStorage.Store(transactionmetadata.New(transactionId))} - addTransactionToApprovers(transactionId, tx.GetTrunkTransactionId()) - addTransactionToApprovers(transactionId, tx.GetBranchTransactionId()) + tangle.approverStorage.Store(approver.New(trunkTransactionID, transactionId)).Release() + if branchTransactionID != trunkTransactionID { + tangle.approverStorage.Store(approver.New(branchTransactionID, transactionId)).Release() + } + + tangle.tangleMutex.Unlock(locks...) if tangle.missingTransactionsStorage.DeleteIfPresent(transactionId[:]) { tangle.Events.MissingTransactionReceived.Trigger(transactionId) @@ -212,7 +238,7 @@ func (tangle *Tangle) solidifyTransactionWorker(cachedTransaction *transaction.C transactionMetadataCached.Release() // if transaction is missing and was not reported as missing, yet - if cachedMissingTransaction, missingTransactionStored := tangle.missingTransactionsStorage.StoreIfAbsent(transactionId[:], missingtransaction.New(transactionId)); missingTransactionStored { + if cachedMissingTransaction, missingTransactionStored := tangle.missingTransactionsStorage.StoreIfAbsent(missingtransaction.New(transactionId)); missingTransactionStored { cachedMissingTransaction.Consume(func(object objectstorage.StorableObject) { tangle.monitorMissingTransactionWorker(object.(*missingtransaction.MissingTransaction).GetTransactionId()) }) @@ -283,14 +309,16 @@ func (tangle *Tangle) solidifyTransactionWorker(cachedTransaction *transaction.C if isTransactionSolid(currentTransaction, currentTransactionMetadata) && currentTransactionMetadata.SetSolid(true) { tangle.Events.TransactionSolid.Trigger(currentCachedTransaction, currentCachedTransactionMetadata) - tangle.GetApprovers(currentTransaction.GetId()).Consume(func(object objectstorage.StorableObject) { - for approverTransactionId := range object.(*approvers.Approvers).Get() { + for _, cachedApprover := range tangle.GetApprovers(currentTransaction.GetId()) { + cachedApprover.Consume(func(object objectstorage.StorableObject) { + approverTransactionId := object.(*approver.Approver).GetApprovingTransactionId() + solidificationStack.PushBack([2]interface{}{ tangle.GetTransaction(approverTransactionId), tangle.GetTransactionMetadata(approverTransactionId), }) - } - }) + }) + } } // release cached results @@ -310,8 +338,6 @@ func (tangle *Tangle) monitorMissingTransactionWorker(transactionId transaction. if time.Since(missingTransaction.GetMissingSince()) >= MAX_MISSING_TIME_BEFORE_CLEANUP { tangle.cleanupWorkerPool.Submit(func() { tangle.cleanupWorker(missingTransaction.GetTransactionId()) }) } else { - tangle.Events.TransactionMissing.Trigger(transactionId) - scheduleNextMissingCheck(transactionId) } }) @@ -327,17 +353,26 @@ func (tangle *Tangle) cleanupWorker(transactionId transaction.Id) { cleanupStack := list.New() cleanupStack.PushBack(transactionId) + processedTransactions := make(map[transaction.Id]types.Empty) + processedTransactions[transactionId] = types.Void + for cleanupStack.Len() >= 1 { currentStackEntry := cleanupStack.Front() currentTransactionId := currentStackEntry.Value.(transaction.Id) cleanupStack.Remove(currentStackEntry) - tangle.GetApprovers(currentTransactionId).Consume(func(object objectstorage.StorableObject) { - for approverTransactionId := range object.(*approvers.Approvers).Get() { - tangle.DeleteTransaction(currentTransactionId) + tangle.DeleteTransaction(currentTransactionId) - cleanupStack.PushBack(approverTransactionId) - } - }) + for _, cachedApprover := range tangle.GetApprovers(currentTransactionId) { + cachedApprover.Consume(func(object objectstorage.StorableObject) { + approverId := object.(*approver.Approver).GetApprovingTransactionId() + + if _, transactionProcessed := processedTransactions[approverId]; !transactionProcessed { + cleanupStack.PushBack(approverId) + + processedTransactions[approverId] = types.Void + } + }) + } } } diff --git a/packages/binary/tangle/tangle_test.go b/packages/binary/tangle/tangle_test.go index 5b629ceb409ab3ae2105af7163f1f337b2d2a248..0fc69f2e0123f61c6c5cc8ab9ffb27e6f9f53e5a 100644 --- a/packages/binary/tangle/tangle_test.go +++ b/packages/binary/tangle/tangle_test.go @@ -10,6 +10,7 @@ import ( "github.com/iotaledger/goshimmer/packages/binary/identity" "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction/payload/data" + "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transactionmetadata" ) func BenchmarkTangle_AttachTransaction(b *testing.B) { @@ -45,6 +46,14 @@ func TestTangle_AttachTransaction(t *testing.T) { return } + tangle.Events.TransactionAttached.Attach(events.NewClosure(func(cachedTransaction *transaction.CachedTransaction, cachedTransactionMetadata *transactionmetadata.CachedTransactionMetadata) { + fmt.Println("ATTACHED:", cachedTransaction.Unwrap().GetId()) + })) + + tangle.Events.TransactionSolid.Attach(events.NewClosure(func(cachedTransaction *transaction.CachedTransaction, cachedTransactionMetadata *transactionmetadata.CachedTransactionMetadata) { + fmt.Println("SOLID:", cachedTransaction.Unwrap().GetId()) + })) + tangle.Events.TransactionMissing.Attach(events.NewClosure(func(transactionId transaction.Id) { fmt.Println("MISSING:", transactionId) })) @@ -56,12 +65,10 @@ func TestTangle_AttachTransaction(t *testing.T) { newTransaction1 := transaction.New(transaction.EmptyId, transaction.EmptyId, identity.Generate(), data.New([]byte("some data"))) newTransaction2 := transaction.New(newTransaction1.GetId(), newTransaction1.GetId(), identity.Generate(), data.New([]byte("some other data"))) - fmt.Println("ATTACH", newTransaction2.GetId()) tangle.AttachTransaction(newTransaction2) - time.Sleep(37 * time.Second) + time.Sleep(7 * time.Second) - fmt.Println("ATTACH", newTransaction1.GetId()) tangle.AttachTransaction(newTransaction1) tangle.Shutdown() diff --git a/packages/binary/types/empty.go b/packages/binary/types/empty.go deleted file mode 100644 index e8be3c8ad5d8d95d719e4aab5aca3c796aa3e2f3..0000000000000000000000000000000000000000 --- a/packages/binary/types/empty.go +++ /dev/null @@ -1,5 +0,0 @@ -package types - -type Empty struct{} - -var Void Empty diff --git a/packages/ledgerstate/reality/id_list.go b/packages/ledgerstate/reality/id_list.go index b200c0e0796890cb7536f27dd089c961f9c7a614..b2ec8e9506de9d4735386e9f4430c2d09b432c6a 100644 --- a/packages/ledgerstate/reality/id_list.go +++ b/packages/ledgerstate/reality/id_list.go @@ -1,7 +1,7 @@ package reality import ( - "github.com/iotaledger/goshimmer/packages/binary/types" + "github.com/iotaledger/hive.go/types" ) type IdList []Id diff --git a/packages/ledgerstate/reality/id_set.go b/packages/ledgerstate/reality/id_set.go index e9dffffce395ad57e5d438be82abb13c1625523c..2ec2e708647ccda9c8c1de2c24dd9e1f8d719d8e 100644 --- a/packages/ledgerstate/reality/id_set.go +++ b/packages/ledgerstate/reality/id_set.go @@ -1,6 +1,8 @@ package reality -import "github.com/iotaledger/goshimmer/packages/binary/types" +import ( + "github.com/iotaledger/hive.go/types" +) type IdSet map[Id]types.Empty