diff --git a/packages/datastructure/lru_cache.go b/packages/datastructure/lru_cache.go index f2701bcc27e51d3b40318d3e9c99a1ae2ba855c4..fa7a71d833e348d95c1499022668da0cbae14844 100644 --- a/packages/datastructure/lru_cache.go +++ b/packages/datastructure/lru_cache.go @@ -18,6 +18,8 @@ type LRUCache struct { size int options *LRUCacheOptions mutex sync.RWMutex + keyMutexes map[interface{}]*sync.RWMutex + keyMutexesMutex sync.RWMutex } func NewLRUCache(capacity int, options ...*LRUCacheOptions) *LRUCache { @@ -33,14 +35,20 @@ func NewLRUCache(capacity int, options ...*LRUCacheOptions) *LRUCache { doublyLinkedList: &DoublyLinkedList{}, capacity: capacity, options: currentOptions, + keyMutexes: make(map[interface{}]*sync.RWMutex), } } func (cache *LRUCache) Set(key interface{}, value interface{}) { - cache.mutex.Lock() - defer cache.mutex.Unlock() + keyMutex := cache.getKeyMutex(key) + keyMutex.Lock() + cache.mutex.Lock() cache.set(key, value) + cache.mutex.Unlock() + + keyMutex.Unlock() + cache.deleteKeyMutex(key) } func (cache *LRUCache) set(key interface{}, value interface{}) { @@ -76,17 +84,31 @@ func (cache *LRUCache) set(key interface{}, value interface{}) { } func (cache *LRUCache) ComputeIfAbsent(key interface{}, callback func() interface{}) (result interface{}) { - cache.mutex.Lock() - defer cache.mutex.Unlock() + keyMutex := cache.getKeyMutex(key) + keyMutex.RLock() + cache.mutex.RLock() if element, exists := cache.directory[key]; exists { + cache.mutex.RUnlock() + cache.mutex.Lock() cache.promoteElement(element) + cache.mutex.Unlock() result = element.GetValue().(*lruCacheElement).value + + keyMutex.RUnlock() } else { - if result = callback(); result != nil { + cache.mutex.RUnlock() + keyMutex.RUnlock() + keyMutex.Lock() + if result = callback(); !typeutils.IsInterfaceNil(result) { + cache.mutex.Lock() cache.set(key, result) + cache.mutex.Unlock() } + keyMutex.Unlock() + + cache.deleteKeyMutex(key) } return @@ -97,31 +119,44 @@ func (cache *LRUCache) ComputeIfAbsent(key interface{}, callback func() interfac // If the callback returns nil the entry is removed from the cache. // Returns the updated entry. func (cache *LRUCache) ComputeIfPresent(key interface{}, callback func(value interface{}) interface{}) (result interface{}) { - cache.mutex.Lock() - defer cache.mutex.Unlock() - + keyMutex := cache.getKeyMutex(key) + keyMutex.RLock() + cache.mutex.RLock() if entry, exists := cache.directory[key]; exists { + cache.mutex.RUnlock() + result = entry.GetValue().(*lruCacheElement).value + keyMutex.RUnlock() + keyMutex.Lock() if callbackResult := callback(result); !typeutils.IsInterfaceNil(callbackResult) { result = callbackResult + cache.mutex.Lock() cache.set(key, callbackResult) + cache.mutex.Unlock() + + keyMutex.Unlock() } else { + cache.mutex.Lock() if err := cache.doublyLinkedList.removeEntry(entry); err != nil { panic(err) } delete(cache.directory, key) - cache.size-- + cache.mutex.Unlock() + + keyMutex.Unlock() if cache.options.EvictionCallback != nil { cache.options.EvictionCallback(key, result) } } } else { - result = nil + cache.mutex.RUnlock() + keyMutex.RUnlock() } + cache.deleteKeyMutex(key) return } @@ -208,3 +243,28 @@ func (cache *LRUCache) promoteElement(element *DoublyLinkedListEntry) { } cache.doublyLinkedList.addFirstEntry(element) } + +func (cache *LRUCache) getKeyMutex(key interface{}) (result *sync.RWMutex) { + cache.keyMutexesMutex.RLock() + if keyMutex, keyMutexExists := cache.keyMutexes[key]; !keyMutexExists { + cache.keyMutexesMutex.RUnlock() + cache.keyMutexesMutex.Lock() + + result = &sync.RWMutex{} + cache.keyMutexes[key] = result + + cache.keyMutexesMutex.Unlock() + } else { + cache.keyMutexesMutex.RUnlock() + + result = keyMutex + } + + return +} + +func (cache *LRUCache) deleteKeyMutex(key interface{}) { + cache.keyMutexesMutex.Lock() + delete(cache.keyMutexes, key) + cache.keyMutexesMutex.Unlock() +} diff --git a/packages/model/bundle/bundle.go b/packages/model/bundle/bundle.go new file mode 100644 index 0000000000000000000000000000000000000000..36c50cb995fee7780add92cb9f280302cc2871ae --- /dev/null +++ b/packages/model/bundle/bundle.go @@ -0,0 +1,76 @@ +package bundle + +import ( + "sync" + + "github.com/iotaledger/goshimmer/packages/errors" + + "github.com/iotaledger/goshimmer/packages/model/value_transaction" + "github.com/iotaledger/goshimmer/packages/ternary" +) + +type Bundle struct { + hash ternary.Trytes + transactionHashes []ternary.Trytes + isValueBundle bool + isValueBundleMutex sync.RWMutex + bundleEssenceHash ternary.Trytes + modified bool + modifiedMutex sync.RWMutex +} + +func New(headTransactionHash ternary.Trytes) (result *Bundle) { + result = &Bundle{ + hash: headTransactionHash, + } + + return +} + +func (bundle *Bundle) GetTransactionHashes() []ternary.Trytes { + return bundle.transactionHashes +} + +func (bundle *Bundle) GetHash() ternary.Trytes { + return bundle.hash +} + +func (bundle *Bundle) IsValueBundle() (result bool) { + bundle.isValueBundleMutex.RLock() + result = bundle.isValueBundle + bundle.isValueBundleMutex.RUnlock() + + return +} + +func (bundle *Bundle) SetValueBundle(valueBundle bool) { + bundle.isValueBundleMutex.Lock() + bundle.isValueBundle = valueBundle + bundle.isValueBundleMutex.Unlock() +} + +func (bundle *Bundle) GetModified() (result bool) { + bundle.modifiedMutex.RLock() + result = bundle.modified + bundle.modifiedMutex.RUnlock() + + return +} + +func (bundle *Bundle) SetModified(modified bool) { + bundle.modifiedMutex.Lock() + bundle.modified = modified + bundle.modifiedMutex.Unlock() +} + +func (bundle *Bundle) Marshal() []byte { + return nil +} + +func (bundle *Bundle) Unmarshal(data []byte) errors.IdentifiableError { + return nil +} + +func CalculateBundleHash(transactions []*value_transaction.ValueTransaction) ternary.Trytes { + return (<-Hasher.Hash(transactions[0].GetBundleEssence())).ToTrytes() +} diff --git a/packages/model/valuebundle/hasher.go b/packages/model/bundle/hasher.go similarity index 90% rename from packages/model/valuebundle/hasher.go rename to packages/model/bundle/hasher.go index bc2f0d99cd8c8566510fee16653d0c7252daa5b1..e1eccd1e1d7320436b9b26c57a4e0c8294f97ea7 100644 --- a/packages/model/valuebundle/hasher.go +++ b/packages/model/bundle/hasher.go @@ -1,4 +1,4 @@ -package valuebundle +package bundle import ( "github.com/iotaledger/goshimmer/packages/curl" diff --git a/packages/model/meta_transaction/meta_transaction.go b/packages/model/meta_transaction/meta_transaction.go index f949915eeac1faec7e7c5746789267125ca111b0..58669c8bfe650ca478be302f05678a3c3e99bc39 100644 --- a/packages/model/meta_transaction/meta_transaction.go +++ b/packages/model/meta_transaction/meta_transaction.go @@ -264,7 +264,7 @@ func (this *MetaTransaction) SetBranchTransactionHash(branchTransactionHash tern } // getter for the head flag (supports concurrency) -func (this *MetaTransaction) GetHead() (result bool) { +func (this *MetaTransaction) IsHead() (result bool) { this.headMutex.RLock() if this.head == nil { this.headMutex.RUnlock() @@ -315,7 +315,7 @@ func (this *MetaTransaction) SetHead(head bool) bool { } // getter for the tail flag (supports concurrency) -func (this *MetaTransaction) GetTail() (result bool) { +func (this *MetaTransaction) IsTail() (result bool) { this.tailMutex.RLock() if this.tail == nil { this.tailMutex.RUnlock() diff --git a/packages/model/meta_transaction/meta_transaction_test.go b/packages/model/meta_transaction/meta_transaction_test.go index 996cc8a1be55db5753cd6961fcfeaee68ed25b94..1b10208dab37deec82bc2407a5fb16c9c62c22d3 100644 --- a/packages/model/meta_transaction/meta_transaction_test.go +++ b/packages/model/meta_transaction/meta_transaction_test.go @@ -29,8 +29,8 @@ func TestMetaTransaction_SettersGetters(t *testing.T) { assert.Equal(t, transaction.GetShardMarker(), shardMarker) assert.Equal(t, transaction.GetTrunkTransactionHash(), trunkTransactionHash) assert.Equal(t, transaction.GetBranchTransactionHash(), branchTransactionHash) - assert.Equal(t, transaction.GetHead(), head) - assert.Equal(t, transaction.GetTail(), tail) + assert.Equal(t, transaction.IsHead(), head) + assert.Equal(t, transaction.IsTail(), tail) assert.Equal(t, transaction.GetTransactionType(), transactionType) assert.Equal(t, transaction.GetHash(), FromBytes(transaction.GetBytes()).GetHash()) diff --git a/packages/model/transactionmetadata/transactionmetadata.go b/packages/model/transactionmetadata/transactionmetadata.go index 8ba50c3c26626f204224c9907bcc167ad515235d..f2518fdd7ece5f6b382ebeea64f3fae4d5fe4e86 100644 --- a/packages/model/transactionmetadata/transactionmetadata.go +++ b/packages/model/transactionmetadata/transactionmetadata.go @@ -15,8 +15,8 @@ import ( type TransactionMetadata struct { hash ternary.Trytes hashMutex sync.RWMutex - bundleTailHash ternary.Trytes - bundleTailHashMutex sync.RWMutex + bundleHeadHash ternary.Trytes + bundleHeadHashMutex sync.RWMutex receivedTime time.Time receivedTimeMutex sync.RWMutex solid bool @@ -67,26 +67,26 @@ func (metadata *TransactionMetadata) SetHash(hash ternary.Trytes) { } } -func (metadata *TransactionMetadata) GetBundleTailHash() ternary.Trytes { - metadata.bundleTailHashMutex.RLock() - defer metadata.bundleTailHashMutex.RUnlock() +func (metadata *TransactionMetadata) GetBundleHeadHash() ternary.Trytes { + metadata.bundleHeadHashMutex.RLock() + defer metadata.bundleHeadHashMutex.RUnlock() - return metadata.bundleTailHash + return metadata.bundleHeadHash } -func (metadata *TransactionMetadata) SetBundleTailHash(bundleTailHash ternary.Trytes) { - metadata.bundleTailHashMutex.RLock() - if metadata.bundleTailHash != bundleTailHash { - metadata.bundleTailHashMutex.RUnlock() - metadata.bundleTailHashMutex.Lock() - defer metadata.bundleTailHashMutex.Unlock() - if metadata.bundleTailHash != bundleTailHash { - metadata.bundleTailHash = bundleTailHash +func (metadata *TransactionMetadata) SetBundleHeadHash(bundleTailHash ternary.Trytes) { + metadata.bundleHeadHashMutex.RLock() + if metadata.bundleHeadHash != bundleTailHash { + metadata.bundleHeadHashMutex.RUnlock() + metadata.bundleHeadHashMutex.Lock() + defer metadata.bundleHeadHashMutex.Unlock() + if metadata.bundleHeadHash != bundleTailHash { + metadata.bundleHeadHash = bundleTailHash metadata.SetModified(true) } } else { - metadata.bundleTailHashMutex.RUnlock() + metadata.bundleHeadHashMutex.RUnlock() } } diff --git a/packages/model/valuebundle/metabundle.go b/packages/model/valuebundle/metabundle.go deleted file mode 100644 index 47ebb409988084ce89a5b87f3ac58eb41a317419..0000000000000000000000000000000000000000 --- a/packages/model/valuebundle/metabundle.go +++ /dev/null @@ -1,31 +0,0 @@ -package valuebundle - -import ( - "github.com/iotaledger/goshimmer/packages/model/value_transaction" - "github.com/iotaledger/goshimmer/packages/ternary" -) - -type MetaBundle struct { - hash ternary.Trytes - transactionHashes []ternary.Trytes -} - -func New(transactions []*value_transaction.ValueTransaction) (result *MetaBundle) { - result = &MetaBundle{ - hash: CalculateBundleHash(transactions), - } - - return -} - -func (bundle *MetaBundle) GetTransactionHashes() []ternary.Trytes { - return bundle.transactionHashes -} - -func (bundle *MetaBundle) GetHash() ternary.Trytes { - return bundle.hash -} - -func CalculateBundleHash(transactions []*value_transaction.ValueTransaction) ternary.Trytes { - return (<-Hasher.Hash(transactions[0].GetBundleEssence())).ToTrytes() -} diff --git a/plugins/bundleprocessor/errors.go b/plugins/bundleprocessor/errors.go new file mode 100644 index 0000000000000000000000000000000000000000..214b5859117f21079a1e1a6e7fad7437e8ca8cbd --- /dev/null +++ b/plugins/bundleprocessor/errors.go @@ -0,0 +1,7 @@ +package bundleprocessor + +import "github.com/iotaledger/goshimmer/packages/errors" + +var ( + ErrProcessBundleFailed = errors.Wrap(errors.New("bundle processing error"), "failed to process bundle") +) diff --git a/plugins/bundleprocessor/events.go b/plugins/bundleprocessor/events.go new file mode 100644 index 0000000000000000000000000000000000000000..19f4ca0424b193c179d092af2bb18ad64b37c919 --- /dev/null +++ b/plugins/bundleprocessor/events.go @@ -0,0 +1,23 @@ +package bundleprocessor + +import ( + "github.com/iotaledger/goshimmer/packages/events" + "github.com/iotaledger/goshimmer/packages/model/bundle" + "github.com/iotaledger/goshimmer/packages/model/value_transaction" +) + +var Events = pluginEvents{ + DataBundleReceived: events.NewEvent(bundleEventCaller), + ValueBundleReceived: events.NewEvent(bundleEventCaller), + InvalidBundleReceived: events.NewEvent(bundleEventCaller), +} + +type pluginEvents struct { + DataBundleReceived *events.Event + ValueBundleReceived *events.Event + InvalidBundleReceived *events.Event +} + +func bundleEventCaller(handler interface{}, params ...interface{}) { + handler.(func(*bundle.Bundle, []*value_transaction.ValueTransaction))(params[0].(*bundle.Bundle), params[1].([]*value_transaction.ValueTransaction)) +} diff --git a/plugins/bundleprocessor/plugin.go b/plugins/bundleprocessor/plugin.go new file mode 100644 index 0000000000000000000000000000000000000000..4139f2336fe4d7cd9e1f79ecfcd76ab6e14825e0 --- /dev/null +++ b/plugins/bundleprocessor/plugin.go @@ -0,0 +1,96 @@ +package bundleprocessor + +import ( + "github.com/iotaledger/goshimmer/packages/errors" + "github.com/iotaledger/goshimmer/packages/events" + "github.com/iotaledger/goshimmer/packages/model/bundle" + "github.com/iotaledger/goshimmer/packages/model/transactionmetadata" + "github.com/iotaledger/goshimmer/packages/model/value_transaction" + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/packages/ternary" + "github.com/iotaledger/goshimmer/plugins/tangle" +) + +var PLUGIN = node.NewPlugin("Bundle Processor", configure) + +func configure(plugin *node.Plugin) { + tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) { + if tx.IsHead() { + if _, err := ProcessSolidBundleHead(tx); err != nil { + plugin.LogFailure(err.Error()) + } + } + })) +} + +func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction) (*bundle.Bundle, errors.IdentifiableError) { + // only process the bundle if we didn't process it, yet + return tangle.GetBundle(headTransaction.GetHash(), func(headTransactionHash ternary.Trytes) (*bundle.Bundle, errors.IdentifiableError) { + // abort if bundle syntax is wrong + if !headTransaction.IsHead() { + return nil, ErrProcessBundleFailed.Derive(errors.New("invalid parameter"), "transaction needs to be head of bundle") + } + + // initialize result variables + processedBundle := bundle.New(headTransactionHash) + bundleTransactions := make([]*value_transaction.ValueTransaction, 0) + + // iterate through trunk transactions until we reach the tail + currentTransaction := headTransaction + for { + // abort if we reached a previous head + if currentTransaction.IsHead() && currentTransaction != headTransaction { + processedBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions)) + + Events.InvalidBundleReceived.Trigger(processedBundle, bundleTransactions) + + return nil, ErrProcessBundleFailed.Derive(errors.New("invalid bundle found"), "missing bundle tail") + } + + // update bundle transactions + bundleTransactions = append(bundleTransactions, currentTransaction) + + // retrieve & update metadata + currentTransactionMetadata, dbErr := tangle.GetTransactionMetadata(currentTransaction.GetHash(), transactionmetadata.New) + if dbErr != nil { + return nil, ErrProcessBundleFailed.Derive(dbErr, "failed to retrieve transaction metadata") + } + currentTransactionMetadata.SetBundleHeadHash(headTransactionHash) + + // update value bundle flag + if !processedBundle.IsValueBundle() && currentTransaction.GetValue() != 0 { + processedBundle.SetValueBundle(true) + } + + // if we are done -> trigger events + if currentTransaction.IsTail() { + processedBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions)) + + if processedBundle.IsValueBundle() { + Events.ValueBundleReceived.Trigger(processedBundle, bundleTransactions) + } else { + Events.DataBundleReceived.Trigger(processedBundle, bundleTransactions) + } + + return processedBundle, nil + } + + // try to iterate to next turn + if nextTransaction, err := tangle.GetTransaction(currentTransaction.GetTrunkTransactionHash()); err != nil { + return nil, ErrProcessBundleFailed.Derive(err, "failed to retrieve trunk while processing bundle") + } else { + currentTransaction = nextTransaction + } + } + }) +} + +func mapTransactionsToTransactionHashes(transactions []*value_transaction.ValueTransaction) []ternary.Trytes { + result := make([]ternary.Trytes, len(transactions)) + + for i, v := range transactions { + result[i] = v.GetHash() + } + + return result +} diff --git a/plugins/tangle/bundle.go b/plugins/tangle/bundle.go new file mode 100644 index 0000000000000000000000000000000000000000..38259be9055c788e3cc53a1a7b06a4644b8be935 --- /dev/null +++ b/plugins/tangle/bundle.go @@ -0,0 +1,128 @@ +package tangle + +import ( + "github.com/dgraph-io/badger" + "github.com/iotaledger/goshimmer/packages/database" + "github.com/iotaledger/goshimmer/packages/datastructure" + "github.com/iotaledger/goshimmer/packages/errors" + "github.com/iotaledger/goshimmer/packages/model/bundle" + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/packages/ternary" +) + +// region global public api //////////////////////////////////////////////////////////////////////////////////////////// + +// GetBundle retrieves bundle from the database. +func GetBundle(headerTransactionHash ternary.Trytes, computeIfAbsent ...func(ternary.Trytes) (*bundle.Bundle, errors.IdentifiableError)) (result *bundle.Bundle, err errors.IdentifiableError) { + if cacheResult := bundleCache.ComputeIfAbsent(headerTransactionHash, func() interface{} { + if dbBundle, dbErr := getBundleFromDatabase(headerTransactionHash); dbErr != nil { + err = dbErr + + return nil + } else if dbBundle != nil { + return dbBundle + } else { + if len(computeIfAbsent) >= 1 { + if computedBundle, computedErr := computeIfAbsent[0](headerTransactionHash); computedErr != nil { + err = computedErr + } else { + return computedBundle + } + } + + return nil + } + }); cacheResult != nil && cacheResult.(*bundle.Bundle) != nil { + result = cacheResult.(*bundle.Bundle) + } + + return +} + +func ContainsBundle(headerTransactionHash ternary.Trytes) (result bool, err errors.IdentifiableError) { + if bundleCache.Contains(headerTransactionHash) { + result = true + } else { + result, err = databaseContainsBundle(headerTransactionHash) + } + + return +} + +func StoreBundle(bundle *bundle.Bundle) { + bundleCache.Set(bundle.GetHash(), bundle) +} + +// region lru cache //////////////////////////////////////////////////////////////////////////////////////////////////// + +var bundleCache = datastructure.NewLRUCache(BUNDLE_CACHE_SIZE, &datastructure.LRUCacheOptions{ + EvictionCallback: onEvictBundle, +}) + +func onEvictBundle(_ interface{}, value interface{}) { + if evictedBundle := value.(*bundle.Bundle); evictedBundle.GetModified() { + go func(evictedBundle *bundle.Bundle) { + if err := storeBundleInDatabase(evictedBundle); err != nil { + panic(err) + } + }(evictedBundle) + } +} + +const ( + BUNDLE_CACHE_SIZE = 50000 +) + +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// region database ///////////////////////////////////////////////////////////////////////////////////////////////////// + +var bundleDatabase database.Database + +func configureBundleDatabase(plugin *node.Plugin) { + if db, err := database.Get("bundle"); err != nil { + panic(err) + } else { + bundleDatabase = db + } +} + +func storeBundleInDatabase(bundle *bundle.Bundle) errors.IdentifiableError { + if bundle.GetModified() { + if err := bundleDatabase.Set(bundle.GetHash().CastToBytes(), bundle.Marshal()); err != nil { + return ErrDatabaseError.Derive(err, "failed to store bundle") + } + + bundle.SetModified(false) + } + + return nil +} + +func getBundleFromDatabase(transactionHash ternary.Trytes) (*bundle.Bundle, errors.IdentifiableError) { + bundleData, err := bundleDatabase.Get(transactionHash.CastToBytes()) + if err != nil { + if err == badger.ErrKeyNotFound { + return nil, nil + } + + return nil, ErrDatabaseError.Derive(err, "failed to retrieve bundle") + } + + var result bundle.Bundle + if err = result.Unmarshal(bundleData); err != nil { + panic(err) + } + + return &result, nil +} + +func databaseContainsBundle(transactionHash ternary.Trytes) (bool, errors.IdentifiableError) { + if contains, err := bundleDatabase.Contains(transactionHash.CastToBytes()); err != nil { + return false, ErrDatabaseError.Derive(err, "failed to check if the bundle exists") + } else { + return contains, nil + } +} + +// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////