diff --git a/packages/binary/bytesfilter/bytes_filter.go b/packages/binary/bytesfilter/bytes_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..e434eef8d58ddd0f7746876ea4ca32c1e1a38962 --- /dev/null +++ b/packages/binary/bytesfilter/bytes_filter.go @@ -0,0 +1,58 @@ +package bytesfilter + +import ( + "sync" + + "github.com/iotaledger/hive.go/typeutils" + + "github.com/iotaledger/goshimmer/packages/binary/types" +) + +type BytesFilter struct { + byteArrays [][]byte + bytesByKey map[string]types.Empty + size int + mutex sync.RWMutex +} + +func New(size int) *BytesFilter { + return &BytesFilter{ + byteArrays: make([][]byte, 0, size), + bytesByKey: make(map[string]types.Empty, size), + size: size, + } +} + +func (bytesFilter *BytesFilter) Add(bytes []byte) bool { + key := typeutils.BytesToString(bytes) + + bytesFilter.mutex.Lock() + + if _, exists := bytesFilter.bytesByKey[key]; !exists { + if len(bytesFilter.byteArrays) == bytesFilter.size { + delete(bytesFilter.bytesByKey, typeutils.BytesToString(bytesFilter.byteArrays[0])) + + bytesFilter.byteArrays = append(bytesFilter.byteArrays[1:], bytes) + } else { + bytesFilter.byteArrays = append(bytesFilter.byteArrays, bytes) + } + + bytesFilter.bytesByKey[key] = types.Void + + bytesFilter.mutex.Unlock() + + return true + } else { + bytesFilter.mutex.Unlock() + + return false + } +} + +func (bytesFilter *BytesFilter) Contains(byteArray []byte) (exists bool) { + bytesFilter.mutex.RLock() + _, exists = bytesFilter.bytesByKey[typeutils.BytesToString(byteArray)] + bytesFilter.mutex.RUnlock() + + return +} diff --git a/packages/binary/bytesfilter/bytes_filter_test.go b/packages/binary/bytesfilter/bytes_filter_test.go new file mode 100644 index 0000000000000000000000000000000000000000..1b928ffeb32647839fd30aae4e47451c534e2f0c --- /dev/null +++ b/packages/binary/bytesfilter/bytes_filter_test.go @@ -0,0 +1,47 @@ +package bytesfilter + +import ( + "testing" +) + +func BenchmarkAdd(b *testing.B) { + filter, bytesFilter := setupTest(15000, 1604) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + filter.Add(bytesFilter) + } +} + +func BenchmarkContains(b *testing.B) { + filter, bytesFilter := setupTest(15000, 1604) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + filter.Contains(bytesFilter) + } +} + +func setupTest(filterSize int, byteArraySize int) (*BytesFilter, []byte) { + filter := New(filterSize) + + for j := 0; j < filterSize; j++ { + byteArray := make([]byte, byteArraySize) + + for i := 0; i < len(byteArray); i++ { + byteArray[(i+j)%byteArraySize] = byte((i + j) % 128) + } + + filter.Add(byteArray) + } + + byteArray := make([]byte, byteArraySize) + + for i := 0; i < len(byteArray); i++ { + byteArray[i] = byte(i % 128) + } + + return filter, byteArray +} diff --git a/packages/binary/tangle/events.go b/packages/binary/tangle/events.go index 0f4a24ebdd80ef8680dafb25f78cbb690837d8a1..5afba2788ccf9899a2abb86c8ea86f7b696a8e6a 100644 --- a/packages/binary/tangle/events.go +++ b/packages/binary/tangle/events.go @@ -5,5 +5,7 @@ import ( ) type tangleEvents struct { - TransactionSolid *events.Event + TransactionSolid *events.Event + TransactionAttached *events.Event + Error *events.Event } diff --git a/packages/binary/tangle/tangle.go b/packages/binary/tangle/tangle.go index edaf55e104902a4057d588851d0f9b31aaeccad0..e584a44267733019e0678067491b11aa65f881b1 100644 --- a/packages/binary/tangle/tangle.go +++ b/packages/binary/tangle/tangle.go @@ -5,9 +5,11 @@ import ( "github.com/iotaledger/goshimmer/packages/binary/transaction" "github.com/iotaledger/goshimmer/packages/binary/transactionmetadata" "github.com/iotaledger/goshimmer/packages/storageprefix" + "github.com/iotaledger/goshimmer/packages/stringify" "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/objectstorage" + "github.com/pkg/errors" ) type Tangle struct { @@ -29,6 +31,15 @@ func New(storageId []byte) (result *Tangle) { approversStorage: objectstorage.New(append(storageId, storageprefix.TangleApprovers...), approversFactory), Events: tangleEvents{ + TransactionAttached: events.NewEvent(func(handler interface{}, params ...interface{}) { + cachedTransaction := params[0].(*objectstorage.CachedObject) + cachedTransactionMetadata := params[1].(*objectstorage.CachedObject) + + cachedTransaction.RegisterConsumer() + cachedTransactionMetadata.RegisterConsumer() + + handler.(func(*objectstorage.CachedObject, *objectstorage.CachedObject))(cachedTransaction, cachedTransactionMetadata) + }), TransactionSolid: events.NewEvent(func(handler interface{}, params ...interface{}) { cachedTransaction := params[0].(*objectstorage.CachedObject) cachedTransactionMetadata := params[1].(*objectstorage.CachedObject) @@ -38,6 +49,9 @@ func New(storageId []byte) (result *Tangle) { handler.(func(*objectstorage.CachedObject, *objectstorage.CachedObject))(cachedTransaction, cachedTransactionMetadata) }), + Error: events.NewEvent(func(handler interface{}, params ...interface{}) { + handler.(func(error))(params[0].(error)) + }), }, } @@ -80,7 +94,7 @@ func (tangle *Tangle) GetApprovers(transactionId transaction.Id) *objectstorage. func (tangle *Tangle) verifyTransaction(transaction *transaction.Transaction) { if !transaction.VerifySignature() { - // err = errors.New("transaction has invalid signature") + tangle.Events.Error.Trigger(errors.New("transaction with id " + stringify.Interface(transaction.GetId()) + " has an invalid signature")) return } diff --git a/packages/binary/transactionparser/builtinfilters/recently_seen_bytes_filter.go b/packages/binary/transactionparser/builtinfilters/recently_seen_bytes_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..1e8f93c033023e4570000189874f8a6a9fe34dcc --- /dev/null +++ b/packages/binary/transactionparser/builtinfilters/recently_seen_bytes_filter.go @@ -0,0 +1,69 @@ +package builtinfilters + +import ( + "sync" + + "github.com/iotaledger/hive.go/async" + + "github.com/iotaledger/goshimmer/packages/binary/bytesfilter" +) + +type RecentlySeenBytesFilter struct { + bytesFilter *bytesfilter.BytesFilter + onAcceptCallback func(bytes []byte) + onRejectCallback func(bytes []byte) + workerPool async.WorkerPool + + onAcceptCallbackMutex sync.RWMutex + onRejectCallbackMutex sync.RWMutex +} + +func NewRecentlySeenBytesFilter() (result *RecentlySeenBytesFilter) { + result = &RecentlySeenBytesFilter{ + bytesFilter: bytesfilter.New(100000), + } + + return +} + +func (filter *RecentlySeenBytesFilter) Filter(bytes []byte) { + filter.workerPool.Submit(func() { + if filter.bytesFilter.Add(bytes) { + filter.getAcceptCallback()(bytes) + } else { + filter.getRejectCallback()(bytes) + } + }) +} + +func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte)) { + filter.onAcceptCallbackMutex.Lock() + filter.onAcceptCallback = callback + filter.onAcceptCallbackMutex.Unlock() +} + +func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte)) { + filter.onRejectCallbackMutex.Lock() + filter.onRejectCallback = callback + filter.onRejectCallbackMutex.Unlock() +} + +func (filter *RecentlySeenBytesFilter) getAcceptCallback() (result func(bytes []byte)) { + filter.onAcceptCallbackMutex.Lock() + result = filter.onAcceptCallback + filter.onAcceptCallbackMutex.Unlock() + + return +} + +func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes []byte)) { + filter.onRejectCallbackMutex.Lock() + result = filter.onRejectCallback + filter.onRejectCallbackMutex.Unlock() + + return +} + +func (filter *RecentlySeenBytesFilter) Shutdown() { + filter.workerPool.ShutdownGracefully() +} diff --git a/packages/binary/transactionparser/builtinfilters/transaction_signature_filter.go b/packages/binary/transactionparser/builtinfilters/transaction_signature_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..6862426cc273a059867fb4a25e6b651d328ff114 --- /dev/null +++ b/packages/binary/transactionparser/builtinfilters/transaction_signature_filter.go @@ -0,0 +1,66 @@ +package builtinfilters + +import ( + "sync" + + "github.com/iotaledger/goshimmer/packages/binary/transaction" + + "github.com/iotaledger/hive.go/async" +) + +type TransactionSignatureFilter struct { + onAcceptCallback func(tx *transaction.Transaction) + onRejectCallback func(tx *transaction.Transaction) + workerPool async.WorkerPool + + onAcceptCallbackMutex sync.RWMutex + onRejectCallbackMutex sync.RWMutex +} + +func NewTransactionSignatureFilter() (result *TransactionSignatureFilter) { + result = &TransactionSignatureFilter{} + + return +} + +func (filter *TransactionSignatureFilter) Filter(tx *transaction.Transaction) { + filter.workerPool.Submit(func() { + if tx.VerifySignature() { + filter.getAcceptCallback()(tx) + } else { + filter.getRejectCallback()(tx) + } + }) +} + +func (filter *TransactionSignatureFilter) OnAccept(callback func(tx *transaction.Transaction)) { + filter.onAcceptCallbackMutex.Lock() + filter.onAcceptCallback = callback + filter.onAcceptCallbackMutex.Unlock() +} + +func (filter *TransactionSignatureFilter) OnReject(callback func(tx *transaction.Transaction)) { + filter.onRejectCallbackMutex.Lock() + filter.onRejectCallback = callback + filter.onRejectCallbackMutex.Unlock() +} + +func (filter *TransactionSignatureFilter) Shutdown() { + filter.workerPool.ShutdownGracefully() +} + +func (filter *TransactionSignatureFilter) getAcceptCallback() (result func(tx *transaction.Transaction)) { + filter.onAcceptCallbackMutex.RLock() + result = filter.onAcceptCallback + filter.onAcceptCallbackMutex.RUnlock() + + return +} + +func (filter *TransactionSignatureFilter) getRejectCallback() (result func(tx *transaction.Transaction)) { + filter.onRejectCallbackMutex.RLock() + result = filter.onRejectCallback + filter.onRejectCallbackMutex.RUnlock() + + return +} diff --git a/packages/binary/transactionparser/builtinfilters/value_transaction_signature_filter.go b/packages/binary/transactionparser/builtinfilters/value_transaction_signature_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..4a0b0cdb5d49888eb0bc8038cc14a8b7d869a2a1 --- /dev/null +++ b/packages/binary/transactionparser/builtinfilters/value_transaction_signature_filter.go @@ -0,0 +1,71 @@ +package builtinfilters + +import ( + "sync" + + "github.com/iotaledger/goshimmer/packages/binary/transaction/payload/valuetransfer" + + "github.com/iotaledger/goshimmer/packages/binary/transaction" + "github.com/iotaledger/hive.go/async" +) + +type ValueTransactionSignatureFilter struct { + onAcceptCallback func(tx *transaction.Transaction) + onRejectCallback func(tx *transaction.Transaction) + workerPool async.WorkerPool + + onAcceptCallbackMutex sync.RWMutex + onRejectCallbackMutex sync.RWMutex +} + +func NewValueTransactionSignatureFilter() (result *ValueTransactionSignatureFilter) { + result = &ValueTransactionSignatureFilter{} + + return +} + +func (filter *ValueTransactionSignatureFilter) Filter(tx *transaction.Transaction) { + filter.workerPool.Submit(func() { + if payload := tx.GetPayload(); payload.GetType() == valuetransfer.Type { + if valueTransfer, ok := payload.(*valuetransfer.ValueTransfer); ok && valueTransfer.VerifySignatures() { + filter.getAcceptCallback()(tx) + } else { + filter.getRejectCallback()(tx) + } + } else { + filter.getAcceptCallback()(tx) + } + }) +} + +func (filter *ValueTransactionSignatureFilter) OnAccept(callback func(tx *transaction.Transaction)) { + filter.onAcceptCallbackMutex.Lock() + filter.onAcceptCallback = callback + filter.onAcceptCallbackMutex.Unlock() +} + +func (filter *ValueTransactionSignatureFilter) OnReject(callback func(tx *transaction.Transaction)) { + filter.onRejectCallbackMutex.Lock() + filter.onRejectCallback = callback + filter.onRejectCallbackMutex.Unlock() +} + +func (filter *ValueTransactionSignatureFilter) Shutdown() { + filter.workerPool.ShutdownGracefully() +} + +func (filter *ValueTransactionSignatureFilter) getAcceptCallback() (result func(tx *transaction.Transaction)) { + filter.onAcceptCallbackMutex.RLock() + result = filter.onAcceptCallback + filter.onAcceptCallbackMutex.RUnlock() + + return +} + +func (filter *ValueTransactionSignatureFilter) getRejectCallback() (result func(tx *transaction.Transaction)) { + filter.onRejectCallbackMutex.RLock() + result = filter.onRejectCallback + filter.onRejectCallbackMutex.RUnlock() + + return +} diff --git a/packages/binary/transactionparser/bytes_filter.go b/packages/binary/transactionparser/bytes_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..c8e2bab61a1ff376f860cd97c7dfa640dda9286d --- /dev/null +++ b/packages/binary/transactionparser/bytes_filter.go @@ -0,0 +1,8 @@ +package transactionparser + +type BytesFilter interface { + Filter(bytes []byte) + OnAccept(callback func(bytes []byte)) + OnReject(callback func(bytes []byte)) + Shutdown() +} diff --git a/packages/binary/transactionparser/events.go b/packages/binary/transactionparser/events.go new file mode 100644 index 0000000000000000000000000000000000000000..9bde1a04963fa72ec3f53bd91cd81947e66e886c --- /dev/null +++ b/packages/binary/transactionparser/events.go @@ -0,0 +1,9 @@ +package transactionparser + +import "github.com/iotaledger/hive.go/events" + +type transactionParserEvents struct { + BytesRejected *events.Event + TransactionParsed *events.Event + TransactionRejected *events.Event +} diff --git a/packages/binary/transactionparser/transaction_filter.go b/packages/binary/transactionparser/transaction_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..39f15788ee33ca4905360013b9ad0fd2e5e9cc5d --- /dev/null +++ b/packages/binary/transactionparser/transaction_filter.go @@ -0,0 +1,12 @@ +package transactionparser + +import ( + "github.com/iotaledger/goshimmer/packages/binary/transaction" +) + +type TransactionFilter interface { + Filter(tx *transaction.Transaction) + OnAccept(callback func(tx *transaction.Transaction)) + OnReject(callback func(tx *transaction.Transaction)) + Shutdown() +} diff --git a/packages/binary/transactionparser/transactionparser.go b/packages/binary/transactionparser/transactionparser.go new file mode 100644 index 0000000000000000000000000000000000000000..3e937f327aa4abdcd199e20179951b1b5ce40f98 --- /dev/null +++ b/packages/binary/transactionparser/transactionparser.go @@ -0,0 +1,139 @@ +package transactionparser + +import ( + "sync" + + "github.com/iotaledger/goshimmer/packages/binary/transactionparser/builtinfilters" + + "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/typeutils" + + "github.com/iotaledger/goshimmer/packages/binary/transaction" +) + +type TransactionParser struct { + bytesFilters []BytesFilter + transactionFilters []TransactionFilter + Events transactionParserEvents + + byteFiltersModified typeutils.AtomicBool + transactionFiltersModified typeutils.AtomicBool + bytesFiltersMutex sync.Mutex + transactionFiltersMutex sync.Mutex +} + +func New() (result *TransactionParser) { + result = &TransactionParser{ + bytesFilters: make([]BytesFilter, 0), + transactionFilters: make([]TransactionFilter, 0), + + Events: transactionParserEvents{ + BytesRejected: events.NewEvent(func(handler interface{}, params ...interface{}) { + handler.(func([]byte))(params[0].([]byte)) + }), + TransactionParsed: events.NewEvent(func(handler interface{}, params ...interface{}) { + handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction)) + }), + TransactionRejected: events.NewEvent(func(handler interface{}, params ...interface{}) { + handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction)) + }), + }, + } + + // add builtin filters + result.AddBytesFilter(builtinfilters.NewRecentlySeenBytesFilter()) + result.AddTransactionsFilter(builtinfilters.NewTransactionSignatureFilter()) + result.AddTransactionsFilter(builtinfilters.NewValueTransactionSignatureFilter()) + + return +} + +func (transactionParser *TransactionParser) Parse(transactionBytes []byte) { + transactionParser.setupBytesFilterDataFlow() + transactionParser.setupTransactionsFilterDataFlow() + + transactionParser.bytesFilters[0].Filter(transactionBytes) +} + +func (transactionParser *TransactionParser) AddBytesFilter(filter BytesFilter) { + transactionParser.bytesFiltersMutex.Lock() + transactionParser.bytesFilters = append(transactionParser.bytesFilters, filter) + transactionParser.bytesFiltersMutex.Unlock() + + transactionParser.byteFiltersModified.Set() +} + +func (transactionParser *TransactionParser) AddTransactionsFilter(filter TransactionFilter) { + transactionParser.transactionFiltersMutex.Lock() + transactionParser.transactionFilters = append(transactionParser.transactionFilters, filter) + transactionParser.transactionFiltersMutex.Unlock() + + transactionParser.transactionFiltersModified.Set() +} + +func (transactionParser *TransactionParser) Shutdown() { + transactionParser.bytesFiltersMutex.Lock() + for _, bytesFilter := range transactionParser.bytesFilters { + bytesFilter.Shutdown() + } + transactionParser.bytesFiltersMutex.Unlock() + + transactionParser.transactionFiltersMutex.Lock() + for _, transactionFilter := range transactionParser.transactionFilters { + transactionFilter.Shutdown() + } + transactionParser.transactionFiltersMutex.Unlock() +} + +func (transactionParser *TransactionParser) setupBytesFilterDataFlow() { + if !transactionParser.byteFiltersModified.IsSet() { + return + } + + transactionParser.bytesFiltersMutex.Lock() + if transactionParser.byteFiltersModified.IsSet() { + transactionParser.byteFiltersModified.SetTo(false) + + numberOfBytesFilters := len(transactionParser.bytesFilters) + for i := 0; i < numberOfBytesFilters; i++ { + if i == numberOfBytesFilters-1 { + transactionParser.bytesFilters[i].OnAccept(transactionParser.parseTransaction) + } else { + transactionParser.bytesFilters[i].OnAccept(transactionParser.bytesFilters[i+1].Filter) + } + transactionParser.bytesFilters[i].OnReject(func(bytes []byte) { transactionParser.Events.BytesRejected.Trigger(bytes) }) + } + } + transactionParser.bytesFiltersMutex.Unlock() +} + +func (transactionParser *TransactionParser) setupTransactionsFilterDataFlow() { + if !transactionParser.transactionFiltersModified.IsSet() { + return + } + + transactionParser.transactionFiltersMutex.Lock() + if transactionParser.transactionFiltersModified.IsSet() { + transactionParser.transactionFiltersModified.SetTo(false) + + numberOfTransactionFilters := len(transactionParser.transactionFilters) + for i := 0; i < numberOfTransactionFilters; i++ { + if i == numberOfTransactionFilters-1 { + transactionParser.transactionFilters[i].OnAccept(func(tx *transaction.Transaction) { transactionParser.Events.TransactionParsed.Trigger(tx) }) + } else { + transactionParser.transactionFilters[i].OnAccept(transactionParser.transactionFilters[i+1].Filter) + } + transactionParser.transactionFilters[i].OnReject(func(tx *transaction.Transaction) { transactionParser.Events.TransactionRejected.Trigger(tx) }) + } + } + transactionParser.transactionFiltersMutex.Unlock() +} + +func (transactionParser *TransactionParser) parseTransaction(bytes []byte) { + if parsedTransaction, err := transaction.FromBytes(bytes); err != nil { + // trigger parsingError + panic(err) + } else { + transactionParser.transactionFilters[0].Filter(parsedTransaction) + } +} diff --git a/packages/binary/transactionparser/transactionparser_test.go b/packages/binary/transactionparser/transactionparser_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8043fc59d4b31fce00aa3e221d26d6cc6d536f9b --- /dev/null +++ b/packages/binary/transactionparser/transactionparser_test.go @@ -0,0 +1,56 @@ +package transactionparser + +import ( + "fmt" + "strconv" + "testing" + + "github.com/iotaledger/hive.go/events" + + "github.com/iotaledger/goshimmer/packages/binary/identity" + "github.com/iotaledger/goshimmer/packages/binary/transaction" + "github.com/iotaledger/goshimmer/packages/binary/transaction/payload/data" +) + +func BenchmarkTransactionParser_ParseBytesSame(b *testing.B) { + txBytes := transaction.New(transaction.EmptyId, transaction.EmptyId, identity.Generate(), data.New([]byte("Test"))).GetBytes() + txParser := New() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + txParser.Parse(txBytes) + } + + txParser.Shutdown() +} + +func BenchmarkTransactionParser_ParseBytesDifferent(b *testing.B) { + transactionBytes := make([][]byte, b.N) + for i := 0; i < b.N; i++ { + transactionBytes[i] = transaction.New(transaction.EmptyId, transaction.EmptyId, identity.Generate(), data.New([]byte("Test"+strconv.Itoa(i)))).GetBytes() + } + + txParser := New() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + txParser.Parse(transactionBytes[i]) + } + + txParser.Shutdown() +} + +func TestTransactionParser_ParseTransaction(t *testing.T) { + tx := transaction.New(transaction.EmptyId, transaction.EmptyId, identity.Generate(), data.New([]byte("Test"))) + + txParser := New() + txParser.Parse(tx.GetBytes()) + + txParser.Events.TransactionParsed.Attach(events.NewClosure(func(tx *transaction.Transaction) { + fmt.Println("PARSED!!!") + })) + + txParser.Shutdown() +} diff --git a/packages/filter/byte_array_filter.go b/packages/filter/byte_array_filter.go index d8cb89fe010d8571331a2f4dbc0e31b328cbf275..87260362a5179758a02eb0acb5165aa3d62d2bc1 100644 --- a/packages/filter/byte_array_filter.go +++ b/packages/filter/byte_array_filter.go @@ -3,12 +3,13 @@ package filter import ( "sync" + "github.com/iotaledger/goshimmer/packages/binary/types" "github.com/iotaledger/goshimmer/packages/typeutils" ) type ByteArrayFilter struct { byteArrays [][]byte - byteArraysByKey map[string]bool + byteArraysByKey map[string]types.Empty size int mutex sync.RWMutex } @@ -16,7 +17,7 @@ type ByteArrayFilter struct { func NewByteArrayFilter(size int) *ByteArrayFilter { return &ByteArrayFilter{ byteArrays: make([][]byte, 0, size), - byteArraysByKey: make(map[string]bool, size), + byteArraysByKey: make(map[string]types.Empty, size), size: size, } } @@ -45,7 +46,7 @@ func (filter *ByteArrayFilter) Add(byteArray []byte) bool { filter.byteArrays = append(filter.byteArrays, byteArray) } - filter.byteArraysByKey[key] = true + filter.byteArraysByKey[key] = types.Void return true } else {