Skip to content
Snippets Groups Projects
Unverified Commit 69e058fc authored by Hans Moog's avatar Hans Moog Committed by GitHub
Browse files

Adds the transactionParser as a fliter and pre-processing mechanism of the tangle. (#242)

* Feat: started porting the new binary stuff

* Refactor: removed unnecessary folder

* Refactor: cleaned up go.mod files

* Fix: removed objectsdb files

* Feat: added transactionrequester package

* Feat: added new transactionparser as the filter for the tangle
parent fc4b3745
Branches
Tags
No related merge requests found
Showing
with 497 additions and 11 deletions
...@@ -36,7 +36,7 @@ require ( ...@@ -36,7 +36,7 @@ require (
github.com/valyala/fasttemplate v1.1.0 // indirect github.com/valyala/fasttemplate v1.1.0 // indirect
go.uber.org/atomic v1.5.1 go.uber.org/atomic v1.5.1
go.uber.org/zap v1.13.0 go.uber.org/zap v1.13.0
golang.org/x/crypto v0.0.0-20200128174031-69ecbb4d6d5d // indirect golang.org/x/crypto v0.0.0-20200128174031-69ecbb4d6d5d
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553
golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9 // indirect golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9 // indirect
golang.org/x/text v0.3.2 // indirect golang.org/x/text v0.3.2 // indirect
......
...@@ -2,16 +2,16 @@ package storageprefix ...@@ -2,16 +2,16 @@ package storageprefix
var ( var (
TangleTransaction = []byte{0} TangleTransaction = []byte{0}
TangleTransactionMetadata = []byte{6} TangleTransactionMetadata = []byte{1}
TangleApprovers = []byte{1} TangleApprovers = []byte{2}
TangleMissingTransaction = []byte{7} TangleMissingTransaction = []byte{3}
ValueTangleTransferMetadata = []byte{8} ValueTangleTransferMetadata = []byte{4}
ValueTangleConsumers = []byte{9} ValueTangleConsumers = []byte{5}
ValueTangleMissingTransfers = []byte{10} ValueTangleMissingTransfers = []byte{6}
LedgerStateTransferOutput = []byte{2} LedgerStateTransferOutput = []byte{7}
LedgerStateTransferOutputBooking = []byte{3} LedgerStateTransferOutputBooking = []byte{8}
LedgerStateReality = []byte{4} LedgerStateReality = []byte{9}
LedgerStateConflictSet = []byte{5} LedgerStateConflictSet = []byte{10}
) )
package builtinfilters
import (
"sync"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/bytesfilter"
)
type RecentlySeenBytesFilter struct {
bytesFilter *bytesfilter.BytesFilter
onAcceptCallback func(bytes []byte)
onRejectCallback func(bytes []byte)
workerPool async.WorkerPool
onAcceptCallbackMutex sync.RWMutex
onRejectCallbackMutex sync.RWMutex
}
func NewRecentlySeenBytesFilter() (result *RecentlySeenBytesFilter) {
result = &RecentlySeenBytesFilter{
bytesFilter: bytesfilter.New(100000),
}
return
}
func (filter *RecentlySeenBytesFilter) Filter(bytes []byte) {
filter.workerPool.Submit(func() {
if filter.bytesFilter.Add(bytes) {
filter.getAcceptCallback()(bytes)
} else {
filter.getRejectCallback()(bytes)
}
})
}
func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte)) {
filter.onAcceptCallbackMutex.Lock()
filter.onAcceptCallback = callback
filter.onAcceptCallbackMutex.Unlock()
}
func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte)) {
filter.onRejectCallbackMutex.Lock()
filter.onRejectCallback = callback
filter.onRejectCallbackMutex.Unlock()
}
func (filter *RecentlySeenBytesFilter) getAcceptCallback() (result func(bytes []byte)) {
filter.onAcceptCallbackMutex.Lock()
result = filter.onAcceptCallback
filter.onAcceptCallbackMutex.Unlock()
return
}
func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes []byte)) {
filter.onRejectCallbackMutex.Lock()
result = filter.onRejectCallback
filter.onRejectCallbackMutex.Unlock()
return
}
func (filter *RecentlySeenBytesFilter) Shutdown() {
filter.workerPool.ShutdownGracefully()
}
package builtinfilters
import (
"sync"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction"
)
type TransactionSignatureFilter struct {
onAcceptCallback func(tx *transaction.Transaction)
onRejectCallback func(tx *transaction.Transaction)
workerPool async.WorkerPool
onAcceptCallbackMutex sync.RWMutex
onRejectCallbackMutex sync.RWMutex
}
func NewTransactionSignatureFilter() (result *TransactionSignatureFilter) {
result = &TransactionSignatureFilter{}
return
}
func (filter *TransactionSignatureFilter) Filter(tx *transaction.Transaction) {
filter.workerPool.Submit(func() {
if tx.VerifySignature() {
filter.getAcceptCallback()(tx)
} else {
filter.getRejectCallback()(tx)
}
})
}
func (filter *TransactionSignatureFilter) OnAccept(callback func(tx *transaction.Transaction)) {
filter.onAcceptCallbackMutex.Lock()
filter.onAcceptCallback = callback
filter.onAcceptCallbackMutex.Unlock()
}
func (filter *TransactionSignatureFilter) OnReject(callback func(tx *transaction.Transaction)) {
filter.onRejectCallbackMutex.Lock()
filter.onRejectCallback = callback
filter.onRejectCallbackMutex.Unlock()
}
func (filter *TransactionSignatureFilter) Shutdown() {
filter.workerPool.ShutdownGracefully()
}
func (filter *TransactionSignatureFilter) getAcceptCallback() (result func(tx *transaction.Transaction)) {
filter.onAcceptCallbackMutex.RLock()
result = filter.onAcceptCallback
filter.onAcceptCallbackMutex.RUnlock()
return
}
func (filter *TransactionSignatureFilter) getRejectCallback() (result func(tx *transaction.Transaction)) {
filter.onRejectCallbackMutex.RLock()
result = filter.onRejectCallback
filter.onRejectCallbackMutex.RUnlock()
return
}
package transactionparser
type BytesFilter interface {
Filter(bytes []byte)
OnAccept(callback func(bytes []byte))
OnReject(callback func(bytes []byte))
Shutdown()
}
package transactionparser
import "github.com/iotaledger/hive.go/events"
type transactionParserEvents struct {
BytesRejected *events.Event
TransactionParsed *events.Event
TransactionRejected *events.Event
}
package transactionparser
import (
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction"
)
type TransactionFilter interface {
Filter(tx *transaction.Transaction)
OnAccept(callback func(tx *transaction.Transaction))
OnReject(callback func(tx *transaction.Transaction))
Shutdown()
}
package transactionparser
import (
"sync"
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction"
"github.com/iotaledger/goshimmer/packages/binary/tangle/transactionparser/builtinfilters"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/typeutils"
)
type TransactionParser struct {
bytesFilters []BytesFilter
transactionFilters []TransactionFilter
Events transactionParserEvents
byteFiltersModified typeutils.AtomicBool
transactionFiltersModified typeutils.AtomicBool
bytesFiltersMutex sync.Mutex
transactionFiltersMutex sync.Mutex
}
func New() (result *TransactionParser) {
result = &TransactionParser{
bytesFilters: make([]BytesFilter, 0),
transactionFilters: make([]TransactionFilter, 0),
Events: transactionParserEvents{
BytesRejected: events.NewEvent(func(handler interface{}, params ...interface{}) {
handler.(func([]byte))(params[0].([]byte))
}),
TransactionParsed: events.NewEvent(func(handler interface{}, params ...interface{}) {
handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction))
}),
TransactionRejected: events.NewEvent(func(handler interface{}, params ...interface{}) {
handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction))
}),
},
}
// add builtin filters
result.AddBytesFilter(builtinfilters.NewRecentlySeenBytesFilter())
result.AddTransactionsFilter(builtinfilters.NewTransactionSignatureFilter())
return
}
func (transactionParser *TransactionParser) Parse(transactionBytes []byte) {
transactionParser.setupBytesFilterDataFlow()
transactionParser.setupTransactionsFilterDataFlow()
transactionParser.bytesFilters[0].Filter(transactionBytes)
}
func (transactionParser *TransactionParser) AddBytesFilter(filter BytesFilter) {
transactionParser.bytesFiltersMutex.Lock()
transactionParser.bytesFilters = append(transactionParser.bytesFilters, filter)
transactionParser.bytesFiltersMutex.Unlock()
transactionParser.byteFiltersModified.Set()
}
func (transactionParser *TransactionParser) AddTransactionsFilter(filter TransactionFilter) {
transactionParser.transactionFiltersMutex.Lock()
transactionParser.transactionFilters = append(transactionParser.transactionFilters, filter)
transactionParser.transactionFiltersMutex.Unlock()
transactionParser.transactionFiltersModified.Set()
}
func (transactionParser *TransactionParser) Shutdown() {
transactionParser.bytesFiltersMutex.Lock()
for _, bytesFilter := range transactionParser.bytesFilters {
bytesFilter.Shutdown()
}
transactionParser.bytesFiltersMutex.Unlock()
transactionParser.transactionFiltersMutex.Lock()
for _, transactionFilter := range transactionParser.transactionFilters {
transactionFilter.Shutdown()
}
transactionParser.transactionFiltersMutex.Unlock()
}
func (transactionParser *TransactionParser) setupBytesFilterDataFlow() {
if !transactionParser.byteFiltersModified.IsSet() {
return
}
transactionParser.bytesFiltersMutex.Lock()
if transactionParser.byteFiltersModified.IsSet() {
transactionParser.byteFiltersModified.SetTo(false)
numberOfBytesFilters := len(transactionParser.bytesFilters)
for i := 0; i < numberOfBytesFilters; i++ {
if i == numberOfBytesFilters-1 {
transactionParser.bytesFilters[i].OnAccept(transactionParser.parseTransaction)
} else {
transactionParser.bytesFilters[i].OnAccept(transactionParser.bytesFilters[i+1].Filter)
}
transactionParser.bytesFilters[i].OnReject(func(bytes []byte) { transactionParser.Events.BytesRejected.Trigger(bytes) })
}
}
transactionParser.bytesFiltersMutex.Unlock()
}
func (transactionParser *TransactionParser) setupTransactionsFilterDataFlow() {
if !transactionParser.transactionFiltersModified.IsSet() {
return
}
transactionParser.transactionFiltersMutex.Lock()
if transactionParser.transactionFiltersModified.IsSet() {
transactionParser.transactionFiltersModified.SetTo(false)
numberOfTransactionFilters := len(transactionParser.transactionFilters)
for i := 0; i < numberOfTransactionFilters; i++ {
if i == numberOfTransactionFilters-1 {
transactionParser.transactionFilters[i].OnAccept(func(tx *transaction.Transaction) { transactionParser.Events.TransactionParsed.Trigger(tx) })
} else {
transactionParser.transactionFilters[i].OnAccept(transactionParser.transactionFilters[i+1].Filter)
}
transactionParser.transactionFilters[i].OnReject(func(tx *transaction.Transaction) { transactionParser.Events.TransactionRejected.Trigger(tx) })
}
}
transactionParser.transactionFiltersMutex.Unlock()
}
func (transactionParser *TransactionParser) parseTransaction(bytes []byte) {
if parsedTransaction, err := transaction.FromBytes(bytes); err != nil {
// trigger parsingError
panic(err)
} else {
transactionParser.transactionFilters[0].Filter(parsedTransaction)
}
}
package transactionparser
import (
"fmt"
"strconv"
"testing"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/goshimmer/packages/binary/identity"
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction"
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction/payload/data"
)
func BenchmarkTransactionParser_ParseBytesSame(b *testing.B) {
txBytes := transaction.New(transaction.EmptyId, transaction.EmptyId, identity.Generate(), data.New([]byte("Test"))).GetBytes()
txParser := New()
b.ResetTimer()
for i := 0; i < b.N; i++ {
txParser.Parse(txBytes)
}
txParser.Shutdown()
}
func BenchmarkTransactionParser_ParseBytesDifferent(b *testing.B) {
transactionBytes := make([][]byte, b.N)
for i := 0; i < b.N; i++ {
transactionBytes[i] = transaction.New(transaction.EmptyId, transaction.EmptyId, identity.Generate(), data.New([]byte("Test"+strconv.Itoa(i)))).GetBytes()
}
txParser := New()
b.ResetTimer()
for i := 0; i < b.N; i++ {
txParser.Parse(transactionBytes[i])
}
txParser.Shutdown()
}
func TestTransactionParser_ParseTransaction(t *testing.T) {
tx := transaction.New(transaction.EmptyId, transaction.EmptyId, identity.Generate(), data.New([]byte("Test")))
txParser := New()
txParser.Parse(tx.GetBytes())
txParser.Events.TransactionParsed.Attach(events.NewClosure(func(tx *transaction.Transaction) {
fmt.Println("PARSED!!!")
}))
txParser.Shutdown()
}
package transactionrequester
import (
"time"
)
const (
DEFAULT_REQUEST_WORKER_COUNT = 1024
DEFAULT_RETRY_INTERVAL = 10 * time.Second
)
package transactionrequester
import (
"github.com/iotaledger/hive.go/events"
)
type Events struct {
SendRequest *events.Event
}
package transactionrequester
import (
"time"
)
type Options struct {
retryInterval time.Duration
workerCount int
}
func newOptions(optionalOptions []Option) *Options {
result := &Options{
retryInterval: 10 * time.Second,
workerCount: DEFAULT_REQUEST_WORKER_COUNT,
}
for _, optionalOption := range optionalOptions {
optionalOption(result)
}
return result
}
type Option func(*Options)
func RetryInterval(interval time.Duration) Option {
return func(args *Options) {
args.retryInterval = interval
}
}
func WorkerCount(workerCount int) Option {
return func(args *Options) {
args.workerCount = workerCount
}
}
package transactionrequester
import (
"sync"
"time"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction"
)
type TransactionRequester struct {
scheduledRequests map[transaction.Id]*time.Timer
requestWorker async.NonBlockingWorkerPool
options *Options
Events Events
scheduledRequestsMutex sync.RWMutex
}
func New(optionalOptions ...Option) *TransactionRequester {
requester := &TransactionRequester{
scheduledRequests: make(map[transaction.Id]*time.Timer),
options: newOptions(optionalOptions),
Events: Events{
SendRequest: events.NewEvent(func(handler interface{}, params ...interface{}) {
handler.(func(transaction.Id))(params[0].(transaction.Id))
}),
},
}
requester.requestWorker.Tune(requester.options.workerCount)
return requester
}
func (requester *TransactionRequester) ScheduleRequest(transactionId transaction.Id) {
var retryRequest func(bool)
retryRequest = func(initialRequest bool) {
requester.requestWorker.Submit(func() {
requester.scheduledRequestsMutex.RLock()
if _, requestExists := requester.scheduledRequests[transactionId]; !initialRequest && !requestExists {
requester.scheduledRequestsMutex.RUnlock()
return
}
requester.scheduledRequestsMutex.RUnlock()
requester.Events.SendRequest.Trigger(transactionId)
requester.scheduledRequestsMutex.Lock()
requester.scheduledRequests[transactionId] = time.AfterFunc(requester.options.retryInterval, func() { retryRequest(false) })
requester.scheduledRequestsMutex.Unlock()
})
}
retryRequest(true)
}
func (requester *TransactionRequester) StopRequest(transactionId transaction.Id) {
requester.scheduledRequestsMutex.RLock()
if timer, timerExists := requester.scheduledRequests[transactionId]; timerExists {
requester.scheduledRequestsMutex.RUnlock()
timer.Stop()
requester.scheduledRequestsMutex.Lock()
delete(requester.scheduledRequests, transactionId)
requester.scheduledRequestsMutex.Unlock()
} else {
requester.scheduledRequestsMutex.RUnlock()
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment