Skip to content
Snippets Groups Projects
Commit f4ca1237 authored by Hans Moog's avatar Hans Moog
Browse files

Feat: intermediary commit (bundle processor)

parent 754ace90
No related branches found
No related tags found
No related merge requests found
......@@ -18,6 +18,8 @@ type LRUCache struct {
size int
options *LRUCacheOptions
mutex sync.RWMutex
keyMutexes map[interface{}]*sync.RWMutex
keyMutexesMutex sync.RWMutex
}
func NewLRUCache(capacity int, options ...*LRUCacheOptions) *LRUCache {
......@@ -33,14 +35,20 @@ func NewLRUCache(capacity int, options ...*LRUCacheOptions) *LRUCache {
doublyLinkedList: &DoublyLinkedList{},
capacity: capacity,
options: currentOptions,
keyMutexes: make(map[interface{}]*sync.RWMutex),
}
}
func (cache *LRUCache) Set(key interface{}, value interface{}) {
cache.mutex.Lock()
defer cache.mutex.Unlock()
keyMutex := cache.getKeyMutex(key)
keyMutex.Lock()
cache.mutex.Lock()
cache.set(key, value)
cache.mutex.Unlock()
keyMutex.Unlock()
cache.deleteKeyMutex(key)
}
func (cache *LRUCache) set(key interface{}, value interface{}) {
......@@ -76,17 +84,31 @@ func (cache *LRUCache) set(key interface{}, value interface{}) {
}
func (cache *LRUCache) ComputeIfAbsent(key interface{}, callback func() interface{}) (result interface{}) {
cache.mutex.Lock()
defer cache.mutex.Unlock()
keyMutex := cache.getKeyMutex(key)
keyMutex.RLock()
cache.mutex.RLock()
if element, exists := cache.directory[key]; exists {
cache.mutex.RUnlock()
cache.mutex.Lock()
cache.promoteElement(element)
cache.mutex.Unlock()
result = element.GetValue().(*lruCacheElement).value
keyMutex.RUnlock()
} else {
if result = callback(); result != nil {
cache.mutex.RUnlock()
keyMutex.RUnlock()
keyMutex.Lock()
if result = callback(); !typeutils.IsInterfaceNil(result) {
cache.mutex.Lock()
cache.set(key, result)
cache.mutex.Unlock()
}
keyMutex.Unlock()
cache.deleteKeyMutex(key)
}
return
......@@ -97,31 +119,44 @@ func (cache *LRUCache) ComputeIfAbsent(key interface{}, callback func() interfac
// If the callback returns nil the entry is removed from the cache.
// Returns the updated entry.
func (cache *LRUCache) ComputeIfPresent(key interface{}, callback func(value interface{}) interface{}) (result interface{}) {
cache.mutex.Lock()
defer cache.mutex.Unlock()
keyMutex := cache.getKeyMutex(key)
keyMutex.RLock()
cache.mutex.RLock()
if entry, exists := cache.directory[key]; exists {
cache.mutex.RUnlock()
result = entry.GetValue().(*lruCacheElement).value
keyMutex.RUnlock()
keyMutex.Lock()
if callbackResult := callback(result); !typeutils.IsInterfaceNil(callbackResult) {
result = callbackResult
cache.mutex.Lock()
cache.set(key, callbackResult)
cache.mutex.Unlock()
keyMutex.Unlock()
} else {
cache.mutex.Lock()
if err := cache.doublyLinkedList.removeEntry(entry); err != nil {
panic(err)
}
delete(cache.directory, key)
cache.size--
cache.mutex.Unlock()
keyMutex.Unlock()
if cache.options.EvictionCallback != nil {
cache.options.EvictionCallback(key, result)
}
}
} else {
result = nil
cache.mutex.RUnlock()
keyMutex.RUnlock()
}
cache.deleteKeyMutex(key)
return
}
......@@ -208,3 +243,28 @@ func (cache *LRUCache) promoteElement(element *DoublyLinkedListEntry) {
}
cache.doublyLinkedList.addFirstEntry(element)
}
func (cache *LRUCache) getKeyMutex(key interface{}) (result *sync.RWMutex) {
cache.keyMutexesMutex.RLock()
if keyMutex, keyMutexExists := cache.keyMutexes[key]; !keyMutexExists {
cache.keyMutexesMutex.RUnlock()
cache.keyMutexesMutex.Lock()
result = &sync.RWMutex{}
cache.keyMutexes[key] = result
cache.keyMutexesMutex.Unlock()
} else {
cache.keyMutexesMutex.RUnlock()
result = keyMutex
}
return
}
func (cache *LRUCache) deleteKeyMutex(key interface{}) {
cache.keyMutexesMutex.Lock()
delete(cache.keyMutexes, key)
cache.keyMutexesMutex.Unlock()
}
package bundle
import (
"sync"
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/ternary"
)
type Bundle struct {
hash ternary.Trytes
transactionHashes []ternary.Trytes
isValueBundle bool
isValueBundleMutex sync.RWMutex
bundleEssenceHash ternary.Trytes
modified bool
modifiedMutex sync.RWMutex
}
func New(headTransactionHash ternary.Trytes) (result *Bundle) {
result = &Bundle{
hash: headTransactionHash,
}
return
}
func (bundle *Bundle) GetTransactionHashes() []ternary.Trytes {
return bundle.transactionHashes
}
func (bundle *Bundle) GetHash() ternary.Trytes {
return bundle.hash
}
func (bundle *Bundle) IsValueBundle() (result bool) {
bundle.isValueBundleMutex.RLock()
result = bundle.isValueBundle
bundle.isValueBundleMutex.RUnlock()
return
}
func (bundle *Bundle) SetValueBundle(valueBundle bool) {
bundle.isValueBundleMutex.Lock()
bundle.isValueBundle = valueBundle
bundle.isValueBundleMutex.Unlock()
}
func (bundle *Bundle) GetModified() (result bool) {
bundle.modifiedMutex.RLock()
result = bundle.modified
bundle.modifiedMutex.RUnlock()
return
}
func (bundle *Bundle) SetModified(modified bool) {
bundle.modifiedMutex.Lock()
bundle.modified = modified
bundle.modifiedMutex.Unlock()
}
func (bundle *Bundle) Marshal() []byte {
return nil
}
func (bundle *Bundle) Unmarshal(data []byte) errors.IdentifiableError {
return nil
}
func CalculateBundleHash(transactions []*value_transaction.ValueTransaction) ternary.Trytes {
return (<-Hasher.Hash(transactions[0].GetBundleEssence())).ToTrytes()
}
package valuebundle
package bundle
import (
"github.com/iotaledger/goshimmer/packages/curl"
......
......@@ -264,7 +264,7 @@ func (this *MetaTransaction) SetBranchTransactionHash(branchTransactionHash tern
}
// getter for the head flag (supports concurrency)
func (this *MetaTransaction) GetHead() (result bool) {
func (this *MetaTransaction) IsHead() (result bool) {
this.headMutex.RLock()
if this.head == nil {
this.headMutex.RUnlock()
......@@ -315,7 +315,7 @@ func (this *MetaTransaction) SetHead(head bool) bool {
}
// getter for the tail flag (supports concurrency)
func (this *MetaTransaction) GetTail() (result bool) {
func (this *MetaTransaction) IsTail() (result bool) {
this.tailMutex.RLock()
if this.tail == nil {
this.tailMutex.RUnlock()
......
......@@ -29,8 +29,8 @@ func TestMetaTransaction_SettersGetters(t *testing.T) {
assert.Equal(t, transaction.GetShardMarker(), shardMarker)
assert.Equal(t, transaction.GetTrunkTransactionHash(), trunkTransactionHash)
assert.Equal(t, transaction.GetBranchTransactionHash(), branchTransactionHash)
assert.Equal(t, transaction.GetHead(), head)
assert.Equal(t, transaction.GetTail(), tail)
assert.Equal(t, transaction.IsHead(), head)
assert.Equal(t, transaction.IsTail(), tail)
assert.Equal(t, transaction.GetTransactionType(), transactionType)
assert.Equal(t, transaction.GetHash(), FromBytes(transaction.GetBytes()).GetHash())
......
......@@ -15,8 +15,8 @@ import (
type TransactionMetadata struct {
hash ternary.Trytes
hashMutex sync.RWMutex
bundleTailHash ternary.Trytes
bundleTailHashMutex sync.RWMutex
bundleHeadHash ternary.Trytes
bundleHeadHashMutex sync.RWMutex
receivedTime time.Time
receivedTimeMutex sync.RWMutex
solid bool
......@@ -67,26 +67,26 @@ func (metadata *TransactionMetadata) SetHash(hash ternary.Trytes) {
}
}
func (metadata *TransactionMetadata) GetBundleTailHash() ternary.Trytes {
metadata.bundleTailHashMutex.RLock()
defer metadata.bundleTailHashMutex.RUnlock()
func (metadata *TransactionMetadata) GetBundleHeadHash() ternary.Trytes {
metadata.bundleHeadHashMutex.RLock()
defer metadata.bundleHeadHashMutex.RUnlock()
return metadata.bundleTailHash
return metadata.bundleHeadHash
}
func (metadata *TransactionMetadata) SetBundleTailHash(bundleTailHash ternary.Trytes) {
metadata.bundleTailHashMutex.RLock()
if metadata.bundleTailHash != bundleTailHash {
metadata.bundleTailHashMutex.RUnlock()
metadata.bundleTailHashMutex.Lock()
defer metadata.bundleTailHashMutex.Unlock()
if metadata.bundleTailHash != bundleTailHash {
metadata.bundleTailHash = bundleTailHash
func (metadata *TransactionMetadata) SetBundleHeadHash(bundleTailHash ternary.Trytes) {
metadata.bundleHeadHashMutex.RLock()
if metadata.bundleHeadHash != bundleTailHash {
metadata.bundleHeadHashMutex.RUnlock()
metadata.bundleHeadHashMutex.Lock()
defer metadata.bundleHeadHashMutex.Unlock()
if metadata.bundleHeadHash != bundleTailHash {
metadata.bundleHeadHash = bundleTailHash
metadata.SetModified(true)
}
} else {
metadata.bundleTailHashMutex.RUnlock()
metadata.bundleHeadHashMutex.RUnlock()
}
}
......
package valuebundle
import (
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/ternary"
)
type MetaBundle struct {
hash ternary.Trytes
transactionHashes []ternary.Trytes
}
func New(transactions []*value_transaction.ValueTransaction) (result *MetaBundle) {
result = &MetaBundle{
hash: CalculateBundleHash(transactions),
}
return
}
func (bundle *MetaBundle) GetTransactionHashes() []ternary.Trytes {
return bundle.transactionHashes
}
func (bundle *MetaBundle) GetHash() ternary.Trytes {
return bundle.hash
}
func CalculateBundleHash(transactions []*value_transaction.ValueTransaction) ternary.Trytes {
return (<-Hasher.Hash(transactions[0].GetBundleEssence())).ToTrytes()
}
package bundleprocessor
import "github.com/iotaledger/goshimmer/packages/errors"
var (
ErrProcessBundleFailed = errors.Wrap(errors.New("bundle processing error"), "failed to process bundle")
)
package bundleprocessor
import (
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/model/bundle"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
)
var Events = pluginEvents{
DataBundleReceived: events.NewEvent(bundleEventCaller),
ValueBundleReceived: events.NewEvent(bundleEventCaller),
InvalidBundleReceived: events.NewEvent(bundleEventCaller),
}
type pluginEvents struct {
DataBundleReceived *events.Event
ValueBundleReceived *events.Event
InvalidBundleReceived *events.Event
}
func bundleEventCaller(handler interface{}, params ...interface{}) {
handler.(func(*bundle.Bundle, []*value_transaction.ValueTransaction))(params[0].(*bundle.Bundle), params[1].([]*value_transaction.ValueTransaction))
}
package bundleprocessor
import (
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/model/bundle"
"github.com/iotaledger/goshimmer/packages/model/transactionmetadata"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/packages/ternary"
"github.com/iotaledger/goshimmer/plugins/tangle"
)
var PLUGIN = node.NewPlugin("Bundle Processor", configure)
func configure(plugin *node.Plugin) {
tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) {
if tx.IsHead() {
if _, err := ProcessSolidBundleHead(tx); err != nil {
plugin.LogFailure(err.Error())
}
}
}))
}
func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction) (*bundle.Bundle, errors.IdentifiableError) {
// only process the bundle if we didn't process it, yet
return tangle.GetBundle(headTransaction.GetHash(), func(headTransactionHash ternary.Trytes) (*bundle.Bundle, errors.IdentifiableError) {
// abort if bundle syntax is wrong
if !headTransaction.IsHead() {
return nil, ErrProcessBundleFailed.Derive(errors.New("invalid parameter"), "transaction needs to be head of bundle")
}
// initialize result variables
processedBundle := bundle.New(headTransactionHash)
bundleTransactions := make([]*value_transaction.ValueTransaction, 0)
// iterate through trunk transactions until we reach the tail
currentTransaction := headTransaction
for {
// abort if we reached a previous head
if currentTransaction.IsHead() && currentTransaction != headTransaction {
processedBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions))
Events.InvalidBundleReceived.Trigger(processedBundle, bundleTransactions)
return nil, ErrProcessBundleFailed.Derive(errors.New("invalid bundle found"), "missing bundle tail")
}
// update bundle transactions
bundleTransactions = append(bundleTransactions, currentTransaction)
// retrieve & update metadata
currentTransactionMetadata, dbErr := tangle.GetTransactionMetadata(currentTransaction.GetHash(), transactionmetadata.New)
if dbErr != nil {
return nil, ErrProcessBundleFailed.Derive(dbErr, "failed to retrieve transaction metadata")
}
currentTransactionMetadata.SetBundleHeadHash(headTransactionHash)
// update value bundle flag
if !processedBundle.IsValueBundle() && currentTransaction.GetValue() != 0 {
processedBundle.SetValueBundle(true)
}
// if we are done -> trigger events
if currentTransaction.IsTail() {
processedBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions))
if processedBundle.IsValueBundle() {
Events.ValueBundleReceived.Trigger(processedBundle, bundleTransactions)
} else {
Events.DataBundleReceived.Trigger(processedBundle, bundleTransactions)
}
return processedBundle, nil
}
// try to iterate to next turn
if nextTransaction, err := tangle.GetTransaction(currentTransaction.GetTrunkTransactionHash()); err != nil {
return nil, ErrProcessBundleFailed.Derive(err, "failed to retrieve trunk while processing bundle")
} else {
currentTransaction = nextTransaction
}
}
})
}
func mapTransactionsToTransactionHashes(transactions []*value_transaction.ValueTransaction) []ternary.Trytes {
result := make([]ternary.Trytes, len(transactions))
for i, v := range transactions {
result[i] = v.GetHash()
}
return result
}
package tangle
import (
"github.com/dgraph-io/badger"
"github.com/iotaledger/goshimmer/packages/database"
"github.com/iotaledger/goshimmer/packages/datastructure"
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/model/bundle"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/packages/ternary"
)
// region global public api ////////////////////////////////////////////////////////////////////////////////////////////
// GetBundle retrieves bundle from the database.
func GetBundle(headerTransactionHash ternary.Trytes, computeIfAbsent ...func(ternary.Trytes) (*bundle.Bundle, errors.IdentifiableError)) (result *bundle.Bundle, err errors.IdentifiableError) {
if cacheResult := bundleCache.ComputeIfAbsent(headerTransactionHash, func() interface{} {
if dbBundle, dbErr := getBundleFromDatabase(headerTransactionHash); dbErr != nil {
err = dbErr
return nil
} else if dbBundle != nil {
return dbBundle
} else {
if len(computeIfAbsent) >= 1 {
if computedBundle, computedErr := computeIfAbsent[0](headerTransactionHash); computedErr != nil {
err = computedErr
} else {
return computedBundle
}
}
return nil
}
}); cacheResult != nil && cacheResult.(*bundle.Bundle) != nil {
result = cacheResult.(*bundle.Bundle)
}
return
}
func ContainsBundle(headerTransactionHash ternary.Trytes) (result bool, err errors.IdentifiableError) {
if bundleCache.Contains(headerTransactionHash) {
result = true
} else {
result, err = databaseContainsBundle(headerTransactionHash)
}
return
}
func StoreBundle(bundle *bundle.Bundle) {
bundleCache.Set(bundle.GetHash(), bundle)
}
// region lru cache ////////////////////////////////////////////////////////////////////////////////////////////////////
var bundleCache = datastructure.NewLRUCache(BUNDLE_CACHE_SIZE, &datastructure.LRUCacheOptions{
EvictionCallback: onEvictBundle,
})
func onEvictBundle(_ interface{}, value interface{}) {
if evictedBundle := value.(*bundle.Bundle); evictedBundle.GetModified() {
go func(evictedBundle *bundle.Bundle) {
if err := storeBundleInDatabase(evictedBundle); err != nil {
panic(err)
}
}(evictedBundle)
}
}
const (
BUNDLE_CACHE_SIZE = 50000
)
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
// region database /////////////////////////////////////////////////////////////////////////////////////////////////////
var bundleDatabase database.Database
func configureBundleDatabase(plugin *node.Plugin) {
if db, err := database.Get("bundle"); err != nil {
panic(err)
} else {
bundleDatabase = db
}
}
func storeBundleInDatabase(bundle *bundle.Bundle) errors.IdentifiableError {
if bundle.GetModified() {
if err := bundleDatabase.Set(bundle.GetHash().CastToBytes(), bundle.Marshal()); err != nil {
return ErrDatabaseError.Derive(err, "failed to store bundle")
}
bundle.SetModified(false)
}
return nil
}
func getBundleFromDatabase(transactionHash ternary.Trytes) (*bundle.Bundle, errors.IdentifiableError) {
bundleData, err := bundleDatabase.Get(transactionHash.CastToBytes())
if err != nil {
if err == badger.ErrKeyNotFound {
return nil, nil
}
return nil, ErrDatabaseError.Derive(err, "failed to retrieve bundle")
}
var result bundle.Bundle
if err = result.Unmarshal(bundleData); err != nil {
panic(err)
}
return &result, nil
}
func databaseContainsBundle(transactionHash ternary.Trytes) (bool, errors.IdentifiableError) {
if contains, err := bundleDatabase.Contains(transactionHash.CastToBytes()); err != nil {
return false, ErrDatabaseError.Derive(err, "failed to check if the bundle exists")
} else {
return contains, nil
}
}
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment