Skip to content
Snippets Groups Projects
Commit 8ac150ef authored by Hans Moog's avatar Hans Moog
Browse files

Feat: added a transactionparser module

To separate the logic of validating transactions from the rest of the code (the tangle + ledger) and further modularize the codebase, I have introduced a separate module that supports the filtering of transactions as early as possible. It supports to be "extended" by additional filters that can the be used to implement the mana based rate control.
parent 1bfec132
No related branches found
No related tags found
No related merge requests found
Showing
with 557 additions and 5 deletions
package bytesfilter
import (
"sync"
"github.com/iotaledger/hive.go/typeutils"
"github.com/iotaledger/goshimmer/packages/binary/types"
)
type BytesFilter struct {
byteArrays [][]byte
bytesByKey map[string]types.Empty
size int
mutex sync.RWMutex
}
func New(size int) *BytesFilter {
return &BytesFilter{
byteArrays: make([][]byte, 0, size),
bytesByKey: make(map[string]types.Empty, size),
size: size,
}
}
func (bytesFilter *BytesFilter) Add(bytes []byte) bool {
key := typeutils.BytesToString(bytes)
bytesFilter.mutex.Lock()
if _, exists := bytesFilter.bytesByKey[key]; !exists {
if len(bytesFilter.byteArrays) == bytesFilter.size {
delete(bytesFilter.bytesByKey, typeutils.BytesToString(bytesFilter.byteArrays[0]))
bytesFilter.byteArrays = append(bytesFilter.byteArrays[1:], bytes)
} else {
bytesFilter.byteArrays = append(bytesFilter.byteArrays, bytes)
}
bytesFilter.bytesByKey[key] = types.Void
bytesFilter.mutex.Unlock()
return true
} else {
bytesFilter.mutex.Unlock()
return false
}
}
func (bytesFilter *BytesFilter) Contains(byteArray []byte) (exists bool) {
bytesFilter.mutex.RLock()
_, exists = bytesFilter.bytesByKey[typeutils.BytesToString(byteArray)]
bytesFilter.mutex.RUnlock()
return
}
package bytesfilter
import (
"testing"
)
func BenchmarkAdd(b *testing.B) {
filter, bytesFilter := setupTest(15000, 1604)
b.ResetTimer()
for i := 0; i < b.N; i++ {
filter.Add(bytesFilter)
}
}
func BenchmarkContains(b *testing.B) {
filter, bytesFilter := setupTest(15000, 1604)
b.ResetTimer()
for i := 0; i < b.N; i++ {
filter.Contains(bytesFilter)
}
}
func setupTest(filterSize int, byteArraySize int) (*BytesFilter, []byte) {
filter := New(filterSize)
for j := 0; j < filterSize; j++ {
byteArray := make([]byte, byteArraySize)
for i := 0; i < len(byteArray); i++ {
byteArray[(i+j)%byteArraySize] = byte((i + j) % 128)
}
filter.Add(byteArray)
}
byteArray := make([]byte, byteArraySize)
for i := 0; i < len(byteArray); i++ {
byteArray[i] = byte(i % 128)
}
return filter, byteArray
}
...@@ -5,5 +5,7 @@ import ( ...@@ -5,5 +5,7 @@ import (
) )
type tangleEvents struct { type tangleEvents struct {
TransactionSolid *events.Event TransactionSolid *events.Event
TransactionAttached *events.Event
Error *events.Event
} }
...@@ -5,9 +5,11 @@ import ( ...@@ -5,9 +5,11 @@ import (
"github.com/iotaledger/goshimmer/packages/binary/transaction" "github.com/iotaledger/goshimmer/packages/binary/transaction"
"github.com/iotaledger/goshimmer/packages/binary/transactionmetadata" "github.com/iotaledger/goshimmer/packages/binary/transactionmetadata"
"github.com/iotaledger/goshimmer/packages/storageprefix" "github.com/iotaledger/goshimmer/packages/storageprefix"
"github.com/iotaledger/goshimmer/packages/stringify"
"github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/objectstorage" "github.com/iotaledger/hive.go/objectstorage"
"github.com/pkg/errors"
) )
type Tangle struct { type Tangle struct {
...@@ -29,6 +31,15 @@ func New(storageId []byte) (result *Tangle) { ...@@ -29,6 +31,15 @@ func New(storageId []byte) (result *Tangle) {
approversStorage: objectstorage.New(append(storageId, storageprefix.TangleApprovers...), approversFactory), approversStorage: objectstorage.New(append(storageId, storageprefix.TangleApprovers...), approversFactory),
Events: tangleEvents{ Events: tangleEvents{
TransactionAttached: events.NewEvent(func(handler interface{}, params ...interface{}) {
cachedTransaction := params[0].(*objectstorage.CachedObject)
cachedTransactionMetadata := params[1].(*objectstorage.CachedObject)
cachedTransaction.RegisterConsumer()
cachedTransactionMetadata.RegisterConsumer()
handler.(func(*objectstorage.CachedObject, *objectstorage.CachedObject))(cachedTransaction, cachedTransactionMetadata)
}),
TransactionSolid: events.NewEvent(func(handler interface{}, params ...interface{}) { TransactionSolid: events.NewEvent(func(handler interface{}, params ...interface{}) {
cachedTransaction := params[0].(*objectstorage.CachedObject) cachedTransaction := params[0].(*objectstorage.CachedObject)
cachedTransactionMetadata := params[1].(*objectstorage.CachedObject) cachedTransactionMetadata := params[1].(*objectstorage.CachedObject)
...@@ -38,6 +49,9 @@ func New(storageId []byte) (result *Tangle) { ...@@ -38,6 +49,9 @@ func New(storageId []byte) (result *Tangle) {
handler.(func(*objectstorage.CachedObject, *objectstorage.CachedObject))(cachedTransaction, cachedTransactionMetadata) handler.(func(*objectstorage.CachedObject, *objectstorage.CachedObject))(cachedTransaction, cachedTransactionMetadata)
}), }),
Error: events.NewEvent(func(handler interface{}, params ...interface{}) {
handler.(func(error))(params[0].(error))
}),
}, },
} }
...@@ -80,7 +94,7 @@ func (tangle *Tangle) GetApprovers(transactionId transaction.Id) *objectstorage. ...@@ -80,7 +94,7 @@ func (tangle *Tangle) GetApprovers(transactionId transaction.Id) *objectstorage.
func (tangle *Tangle) verifyTransaction(transaction *transaction.Transaction) { func (tangle *Tangle) verifyTransaction(transaction *transaction.Transaction) {
if !transaction.VerifySignature() { if !transaction.VerifySignature() {
// err = errors.New("transaction has invalid signature") tangle.Events.Error.Trigger(errors.New("transaction with id " + stringify.Interface(transaction.GetId()) + " has an invalid signature"))
return return
} }
......
package builtinfilters
import (
"sync"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/goshimmer/packages/binary/bytesfilter"
)
type RecentlySeenBytesFilter struct {
bytesFilter *bytesfilter.BytesFilter
onAcceptCallback func(bytes []byte)
onRejectCallback func(bytes []byte)
workerPool async.WorkerPool
onAcceptCallbackMutex sync.RWMutex
onRejectCallbackMutex sync.RWMutex
}
func NewRecentlySeenBytesFilter() (result *RecentlySeenBytesFilter) {
result = &RecentlySeenBytesFilter{
bytesFilter: bytesfilter.New(100000),
}
return
}
func (filter *RecentlySeenBytesFilter) Filter(bytes []byte) {
filter.workerPool.Submit(func() {
if filter.bytesFilter.Add(bytes) {
filter.getAcceptCallback()(bytes)
} else {
filter.getRejectCallback()(bytes)
}
})
}
func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte)) {
filter.onAcceptCallbackMutex.Lock()
filter.onAcceptCallback = callback
filter.onAcceptCallbackMutex.Unlock()
}
func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte)) {
filter.onRejectCallbackMutex.Lock()
filter.onRejectCallback = callback
filter.onRejectCallbackMutex.Unlock()
}
func (filter *RecentlySeenBytesFilter) getAcceptCallback() (result func(bytes []byte)) {
filter.onAcceptCallbackMutex.Lock()
result = filter.onAcceptCallback
filter.onAcceptCallbackMutex.Unlock()
return
}
func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes []byte)) {
filter.onRejectCallbackMutex.Lock()
result = filter.onRejectCallback
filter.onRejectCallbackMutex.Unlock()
return
}
func (filter *RecentlySeenBytesFilter) Shutdown() {
filter.workerPool.ShutdownGracefully()
}
package builtinfilters
import (
"sync"
"github.com/iotaledger/goshimmer/packages/binary/transaction"
"github.com/iotaledger/hive.go/async"
)
type TransactionSignatureFilter struct {
onAcceptCallback func(tx *transaction.Transaction)
onRejectCallback func(tx *transaction.Transaction)
workerPool async.WorkerPool
onAcceptCallbackMutex sync.RWMutex
onRejectCallbackMutex sync.RWMutex
}
func NewTransactionSignatureFilter() (result *TransactionSignatureFilter) {
result = &TransactionSignatureFilter{}
return
}
func (filter *TransactionSignatureFilter) Filter(tx *transaction.Transaction) {
filter.workerPool.Submit(func() {
if tx.VerifySignature() {
filter.getAcceptCallback()(tx)
} else {
filter.getRejectCallback()(tx)
}
})
}
func (filter *TransactionSignatureFilter) OnAccept(callback func(tx *transaction.Transaction)) {
filter.onAcceptCallbackMutex.Lock()
filter.onAcceptCallback = callback
filter.onAcceptCallbackMutex.Unlock()
}
func (filter *TransactionSignatureFilter) OnReject(callback func(tx *transaction.Transaction)) {
filter.onRejectCallbackMutex.Lock()
filter.onRejectCallback = callback
filter.onRejectCallbackMutex.Unlock()
}
func (filter *TransactionSignatureFilter) Shutdown() {
filter.workerPool.ShutdownGracefully()
}
func (filter *TransactionSignatureFilter) getAcceptCallback() (result func(tx *transaction.Transaction)) {
filter.onAcceptCallbackMutex.RLock()
result = filter.onAcceptCallback
filter.onAcceptCallbackMutex.RUnlock()
return
}
func (filter *TransactionSignatureFilter) getRejectCallback() (result func(tx *transaction.Transaction)) {
filter.onRejectCallbackMutex.RLock()
result = filter.onRejectCallback
filter.onRejectCallbackMutex.RUnlock()
return
}
package builtinfilters
import (
"sync"
"github.com/iotaledger/goshimmer/packages/binary/transaction/payload/valuetransfer"
"github.com/iotaledger/goshimmer/packages/binary/transaction"
"github.com/iotaledger/hive.go/async"
)
type ValueTransactionSignatureFilter struct {
onAcceptCallback func(tx *transaction.Transaction)
onRejectCallback func(tx *transaction.Transaction)
workerPool async.WorkerPool
onAcceptCallbackMutex sync.RWMutex
onRejectCallbackMutex sync.RWMutex
}
func NewValueTransactionSignatureFilter() (result *ValueTransactionSignatureFilter) {
result = &ValueTransactionSignatureFilter{}
return
}
func (filter *ValueTransactionSignatureFilter) Filter(tx *transaction.Transaction) {
filter.workerPool.Submit(func() {
if payload := tx.GetPayload(); payload.GetType() == valuetransfer.Type {
if valueTransfer, ok := payload.(*valuetransfer.ValueTransfer); ok && valueTransfer.VerifySignatures() {
filter.getAcceptCallback()(tx)
} else {
filter.getRejectCallback()(tx)
}
} else {
filter.getAcceptCallback()(tx)
}
})
}
func (filter *ValueTransactionSignatureFilter) OnAccept(callback func(tx *transaction.Transaction)) {
filter.onAcceptCallbackMutex.Lock()
filter.onAcceptCallback = callback
filter.onAcceptCallbackMutex.Unlock()
}
func (filter *ValueTransactionSignatureFilter) OnReject(callback func(tx *transaction.Transaction)) {
filter.onRejectCallbackMutex.Lock()
filter.onRejectCallback = callback
filter.onRejectCallbackMutex.Unlock()
}
func (filter *ValueTransactionSignatureFilter) Shutdown() {
filter.workerPool.ShutdownGracefully()
}
func (filter *ValueTransactionSignatureFilter) getAcceptCallback() (result func(tx *transaction.Transaction)) {
filter.onAcceptCallbackMutex.RLock()
result = filter.onAcceptCallback
filter.onAcceptCallbackMutex.RUnlock()
return
}
func (filter *ValueTransactionSignatureFilter) getRejectCallback() (result func(tx *transaction.Transaction)) {
filter.onRejectCallbackMutex.RLock()
result = filter.onRejectCallback
filter.onRejectCallbackMutex.RUnlock()
return
}
package transactionparser
type BytesFilter interface {
Filter(bytes []byte)
OnAccept(callback func(bytes []byte))
OnReject(callback func(bytes []byte))
Shutdown()
}
package transactionparser
import "github.com/iotaledger/hive.go/events"
type transactionParserEvents struct {
BytesRejected *events.Event
TransactionParsed *events.Event
TransactionRejected *events.Event
}
package transactionparser
import (
"github.com/iotaledger/goshimmer/packages/binary/transaction"
)
type TransactionFilter interface {
Filter(tx *transaction.Transaction)
OnAccept(callback func(tx *transaction.Transaction))
OnReject(callback func(tx *transaction.Transaction))
Shutdown()
}
package transactionparser
import (
"sync"
"github.com/iotaledger/goshimmer/packages/binary/transactionparser/builtinfilters"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/typeutils"
"github.com/iotaledger/goshimmer/packages/binary/transaction"
)
type TransactionParser struct {
bytesFilters []BytesFilter
transactionFilters []TransactionFilter
Events transactionParserEvents
byteFiltersModified typeutils.AtomicBool
transactionFiltersModified typeutils.AtomicBool
bytesFiltersMutex sync.Mutex
transactionFiltersMutex sync.Mutex
}
func New() (result *TransactionParser) {
result = &TransactionParser{
bytesFilters: make([]BytesFilter, 0),
transactionFilters: make([]TransactionFilter, 0),
Events: transactionParserEvents{
BytesRejected: events.NewEvent(func(handler interface{}, params ...interface{}) {
handler.(func([]byte))(params[0].([]byte))
}),
TransactionParsed: events.NewEvent(func(handler interface{}, params ...interface{}) {
handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction))
}),
TransactionRejected: events.NewEvent(func(handler interface{}, params ...interface{}) {
handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction))
}),
},
}
// add builtin filters
result.AddBytesFilter(builtinfilters.NewRecentlySeenBytesFilter())
result.AddTransactionsFilter(builtinfilters.NewTransactionSignatureFilter())
result.AddTransactionsFilter(builtinfilters.NewValueTransactionSignatureFilter())
return
}
func (transactionParser *TransactionParser) Parse(transactionBytes []byte) {
transactionParser.setupBytesFilterDataFlow()
transactionParser.setupTransactionsFilterDataFlow()
transactionParser.bytesFilters[0].Filter(transactionBytes)
}
func (transactionParser *TransactionParser) AddBytesFilter(filter BytesFilter) {
transactionParser.bytesFiltersMutex.Lock()
transactionParser.bytesFilters = append(transactionParser.bytesFilters, filter)
transactionParser.bytesFiltersMutex.Unlock()
transactionParser.byteFiltersModified.Set()
}
func (transactionParser *TransactionParser) AddTransactionsFilter(filter TransactionFilter) {
transactionParser.transactionFiltersMutex.Lock()
transactionParser.transactionFilters = append(transactionParser.transactionFilters, filter)
transactionParser.transactionFiltersMutex.Unlock()
transactionParser.transactionFiltersModified.Set()
}
func (transactionParser *TransactionParser) Shutdown() {
transactionParser.bytesFiltersMutex.Lock()
for _, bytesFilter := range transactionParser.bytesFilters {
bytesFilter.Shutdown()
}
transactionParser.bytesFiltersMutex.Unlock()
transactionParser.transactionFiltersMutex.Lock()
for _, transactionFilter := range transactionParser.transactionFilters {
transactionFilter.Shutdown()
}
transactionParser.transactionFiltersMutex.Unlock()
}
func (transactionParser *TransactionParser) setupBytesFilterDataFlow() {
if !transactionParser.byteFiltersModified.IsSet() {
return
}
transactionParser.bytesFiltersMutex.Lock()
if transactionParser.byteFiltersModified.IsSet() {
transactionParser.byteFiltersModified.SetTo(false)
numberOfBytesFilters := len(transactionParser.bytesFilters)
for i := 0; i < numberOfBytesFilters; i++ {
if i == numberOfBytesFilters-1 {
transactionParser.bytesFilters[i].OnAccept(transactionParser.parseTransaction)
} else {
transactionParser.bytesFilters[i].OnAccept(transactionParser.bytesFilters[i+1].Filter)
}
transactionParser.bytesFilters[i].OnReject(func(bytes []byte) { transactionParser.Events.BytesRejected.Trigger(bytes) })
}
}
transactionParser.bytesFiltersMutex.Unlock()
}
func (transactionParser *TransactionParser) setupTransactionsFilterDataFlow() {
if !transactionParser.transactionFiltersModified.IsSet() {
return
}
transactionParser.transactionFiltersMutex.Lock()
if transactionParser.transactionFiltersModified.IsSet() {
transactionParser.transactionFiltersModified.SetTo(false)
numberOfTransactionFilters := len(transactionParser.transactionFilters)
for i := 0; i < numberOfTransactionFilters; i++ {
if i == numberOfTransactionFilters-1 {
transactionParser.transactionFilters[i].OnAccept(func(tx *transaction.Transaction) { transactionParser.Events.TransactionParsed.Trigger(tx) })
} else {
transactionParser.transactionFilters[i].OnAccept(transactionParser.transactionFilters[i+1].Filter)
}
transactionParser.transactionFilters[i].OnReject(func(tx *transaction.Transaction) { transactionParser.Events.TransactionRejected.Trigger(tx) })
}
}
transactionParser.transactionFiltersMutex.Unlock()
}
func (transactionParser *TransactionParser) parseTransaction(bytes []byte) {
if parsedTransaction, err := transaction.FromBytes(bytes); err != nil {
// trigger parsingError
panic(err)
} else {
transactionParser.transactionFilters[0].Filter(parsedTransaction)
}
}
package transactionparser
import (
"fmt"
"strconv"
"testing"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/goshimmer/packages/binary/identity"
"github.com/iotaledger/goshimmer/packages/binary/transaction"
"github.com/iotaledger/goshimmer/packages/binary/transaction/payload/data"
)
func BenchmarkTransactionParser_ParseBytesSame(b *testing.B) {
txBytes := transaction.New(transaction.EmptyId, transaction.EmptyId, identity.Generate(), data.New([]byte("Test"))).GetBytes()
txParser := New()
b.ResetTimer()
for i := 0; i < b.N; i++ {
txParser.Parse(txBytes)
}
txParser.Shutdown()
}
func BenchmarkTransactionParser_ParseBytesDifferent(b *testing.B) {
transactionBytes := make([][]byte, b.N)
for i := 0; i < b.N; i++ {
transactionBytes[i] = transaction.New(transaction.EmptyId, transaction.EmptyId, identity.Generate(), data.New([]byte("Test"+strconv.Itoa(i)))).GetBytes()
}
txParser := New()
b.ResetTimer()
for i := 0; i < b.N; i++ {
txParser.Parse(transactionBytes[i])
}
txParser.Shutdown()
}
func TestTransactionParser_ParseTransaction(t *testing.T) {
tx := transaction.New(transaction.EmptyId, transaction.EmptyId, identity.Generate(), data.New([]byte("Test")))
txParser := New()
txParser.Parse(tx.GetBytes())
txParser.Events.TransactionParsed.Attach(events.NewClosure(func(tx *transaction.Transaction) {
fmt.Println("PARSED!!!")
}))
txParser.Shutdown()
}
...@@ -3,12 +3,13 @@ package filter ...@@ -3,12 +3,13 @@ package filter
import ( import (
"sync" "sync"
"github.com/iotaledger/goshimmer/packages/binary/types"
"github.com/iotaledger/goshimmer/packages/typeutils" "github.com/iotaledger/goshimmer/packages/typeutils"
) )
type ByteArrayFilter struct { type ByteArrayFilter struct {
byteArrays [][]byte byteArrays [][]byte
byteArraysByKey map[string]bool byteArraysByKey map[string]types.Empty
size int size int
mutex sync.RWMutex mutex sync.RWMutex
} }
...@@ -16,7 +17,7 @@ type ByteArrayFilter struct { ...@@ -16,7 +17,7 @@ type ByteArrayFilter struct {
func NewByteArrayFilter(size int) *ByteArrayFilter { func NewByteArrayFilter(size int) *ByteArrayFilter {
return &ByteArrayFilter{ return &ByteArrayFilter{
byteArrays: make([][]byte, 0, size), byteArrays: make([][]byte, 0, size),
byteArraysByKey: make(map[string]bool, size), byteArraysByKey: make(map[string]types.Empty, size),
size: size, size: size,
} }
} }
...@@ -45,7 +46,7 @@ func (filter *ByteArrayFilter) Add(byteArray []byte) bool { ...@@ -45,7 +46,7 @@ func (filter *ByteArrayFilter) Add(byteArray []byte) bool {
filter.byteArrays = append(filter.byteArrays, byteArray) filter.byteArrays = append(filter.byteArrays, byteArray)
} }
filter.byteArraysByKey[key] = true filter.byteArraysByKey[key] = types.Void
return true return true
} else { } else {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment