diff --git a/go.mod b/go.mod index ba050fa6eae140f517bf650b7ab4f4f4198b6d5e..d9a2075bbf2896d0aef6e151671eef33caa40370 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/valyala/fasttemplate v1.1.0 // indirect go.uber.org/atomic v1.5.1 go.uber.org/zap v1.13.0 - golang.org/x/crypto v0.0.0-20200128174031-69ecbb4d6d5d // indirect + golang.org/x/crypto v0.0.0-20200128174031-69ecbb4d6d5d golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9 // indirect golang.org/x/text v0.3.2 // indirect diff --git a/packages/binary/storageprefix/storageprefix.go b/packages/binary/storageprefix/storageprefix.go index c491a24e5d6adfeae617211e290b148a1f5738b3..1e55f2db25ff3bd2703056c8ea61c0aa048fda84 100644 --- a/packages/binary/storageprefix/storageprefix.go +++ b/packages/binary/storageprefix/storageprefix.go @@ -2,16 +2,16 @@ package storageprefix var ( TangleTransaction = []byte{0} - TangleTransactionMetadata = []byte{6} - TangleApprovers = []byte{1} - TangleMissingTransaction = []byte{7} + TangleTransactionMetadata = []byte{1} + TangleApprovers = []byte{2} + TangleMissingTransaction = []byte{3} - ValueTangleTransferMetadata = []byte{8} - ValueTangleConsumers = []byte{9} - ValueTangleMissingTransfers = []byte{10} + ValueTangleTransferMetadata = []byte{4} + ValueTangleConsumers = []byte{5} + ValueTangleMissingTransfers = []byte{6} - LedgerStateTransferOutput = []byte{2} - LedgerStateTransferOutputBooking = []byte{3} - LedgerStateReality = []byte{4} - LedgerStateConflictSet = []byte{5} + LedgerStateTransferOutput = []byte{7} + LedgerStateTransferOutputBooking = []byte{8} + LedgerStateReality = []byte{9} + LedgerStateConflictSet = []byte{10} ) diff --git a/packages/binary/tangle/transactionparser/builtinfilters/recently_seen_bytes_filter.go b/packages/binary/tangle/transactionparser/builtinfilters/recently_seen_bytes_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..c0a6002f0cdadf0290cbab19057bba3f6c3536d4 --- /dev/null +++ b/packages/binary/tangle/transactionparser/builtinfilters/recently_seen_bytes_filter.go @@ -0,0 +1,68 @@ +package builtinfilters + +import ( + "sync" + + "github.com/iotaledger/hive.go/async" + "github.com/iotaledger/hive.go/bytesfilter" +) + +type RecentlySeenBytesFilter struct { + bytesFilter *bytesfilter.BytesFilter + onAcceptCallback func(bytes []byte) + onRejectCallback func(bytes []byte) + workerPool async.WorkerPool + + onAcceptCallbackMutex sync.RWMutex + onRejectCallbackMutex sync.RWMutex +} + +func NewRecentlySeenBytesFilter() (result *RecentlySeenBytesFilter) { + result = &RecentlySeenBytesFilter{ + bytesFilter: bytesfilter.New(100000), + } + + return +} + +func (filter *RecentlySeenBytesFilter) Filter(bytes []byte) { + filter.workerPool.Submit(func() { + if filter.bytesFilter.Add(bytes) { + filter.getAcceptCallback()(bytes) + } else { + filter.getRejectCallback()(bytes) + } + }) +} + +func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte)) { + filter.onAcceptCallbackMutex.Lock() + filter.onAcceptCallback = callback + filter.onAcceptCallbackMutex.Unlock() +} + +func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte)) { + filter.onRejectCallbackMutex.Lock() + filter.onRejectCallback = callback + filter.onRejectCallbackMutex.Unlock() +} + +func (filter *RecentlySeenBytesFilter) getAcceptCallback() (result func(bytes []byte)) { + filter.onAcceptCallbackMutex.Lock() + result = filter.onAcceptCallback + filter.onAcceptCallbackMutex.Unlock() + + return +} + +func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes []byte)) { + filter.onRejectCallbackMutex.Lock() + result = filter.onRejectCallback + filter.onRejectCallbackMutex.Unlock() + + return +} + +func (filter *RecentlySeenBytesFilter) Shutdown() { + filter.workerPool.ShutdownGracefully() +} diff --git a/packages/binary/tangle/transactionparser/builtinfilters/transaction_signature_filter.go b/packages/binary/tangle/transactionparser/builtinfilters/transaction_signature_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..9160d821cfa2ff6c1d1946ca34953010f8ddf38b --- /dev/null +++ b/packages/binary/tangle/transactionparser/builtinfilters/transaction_signature_filter.go @@ -0,0 +1,66 @@ +package builtinfilters + +import ( + "sync" + + "github.com/iotaledger/hive.go/async" + + "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" +) + +type TransactionSignatureFilter struct { + onAcceptCallback func(tx *transaction.Transaction) + onRejectCallback func(tx *transaction.Transaction) + workerPool async.WorkerPool + + onAcceptCallbackMutex sync.RWMutex + onRejectCallbackMutex sync.RWMutex +} + +func NewTransactionSignatureFilter() (result *TransactionSignatureFilter) { + result = &TransactionSignatureFilter{} + + return +} + +func (filter *TransactionSignatureFilter) Filter(tx *transaction.Transaction) { + filter.workerPool.Submit(func() { + if tx.VerifySignature() { + filter.getAcceptCallback()(tx) + } else { + filter.getRejectCallback()(tx) + } + }) +} + +func (filter *TransactionSignatureFilter) OnAccept(callback func(tx *transaction.Transaction)) { + filter.onAcceptCallbackMutex.Lock() + filter.onAcceptCallback = callback + filter.onAcceptCallbackMutex.Unlock() +} + +func (filter *TransactionSignatureFilter) OnReject(callback func(tx *transaction.Transaction)) { + filter.onRejectCallbackMutex.Lock() + filter.onRejectCallback = callback + filter.onRejectCallbackMutex.Unlock() +} + +func (filter *TransactionSignatureFilter) Shutdown() { + filter.workerPool.ShutdownGracefully() +} + +func (filter *TransactionSignatureFilter) getAcceptCallback() (result func(tx *transaction.Transaction)) { + filter.onAcceptCallbackMutex.RLock() + result = filter.onAcceptCallback + filter.onAcceptCallbackMutex.RUnlock() + + return +} + +func (filter *TransactionSignatureFilter) getRejectCallback() (result func(tx *transaction.Transaction)) { + filter.onRejectCallbackMutex.RLock() + result = filter.onRejectCallback + filter.onRejectCallbackMutex.RUnlock() + + return +} diff --git a/packages/binary/tangle/transactionparser/bytes_filter.go b/packages/binary/tangle/transactionparser/bytes_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..c8e2bab61a1ff376f860cd97c7dfa640dda9286d --- /dev/null +++ b/packages/binary/tangle/transactionparser/bytes_filter.go @@ -0,0 +1,8 @@ +package transactionparser + +type BytesFilter interface { + Filter(bytes []byte) + OnAccept(callback func(bytes []byte)) + OnReject(callback func(bytes []byte)) + Shutdown() +} diff --git a/packages/binary/tangle/transactionparser/events.go b/packages/binary/tangle/transactionparser/events.go new file mode 100644 index 0000000000000000000000000000000000000000..9bde1a04963fa72ec3f53bd91cd81947e66e886c --- /dev/null +++ b/packages/binary/tangle/transactionparser/events.go @@ -0,0 +1,9 @@ +package transactionparser + +import "github.com/iotaledger/hive.go/events" + +type transactionParserEvents struct { + BytesRejected *events.Event + TransactionParsed *events.Event + TransactionRejected *events.Event +} diff --git a/packages/binary/tangle/transactionparser/transaction_filter.go b/packages/binary/tangle/transactionparser/transaction_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..39dfc277c5932c6f6ce1ee8cca2d525605c04235 --- /dev/null +++ b/packages/binary/tangle/transactionparser/transaction_filter.go @@ -0,0 +1,12 @@ +package transactionparser + +import ( + "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" +) + +type TransactionFilter interface { + Filter(tx *transaction.Transaction) + OnAccept(callback func(tx *transaction.Transaction)) + OnReject(callback func(tx *transaction.Transaction)) + Shutdown() +} diff --git a/packages/binary/tangle/transactionparser/transactionparser.go b/packages/binary/tangle/transactionparser/transactionparser.go new file mode 100644 index 0000000000000000000000000000000000000000..1d5f53660d3b4b3714e26377a686ea1969f4ab82 --- /dev/null +++ b/packages/binary/tangle/transactionparser/transactionparser.go @@ -0,0 +1,137 @@ +package transactionparser + +import ( + "sync" + + "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" + "github.com/iotaledger/goshimmer/packages/binary/tangle/transactionparser/builtinfilters" + + "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/typeutils" +) + +type TransactionParser struct { + bytesFilters []BytesFilter + transactionFilters []TransactionFilter + Events transactionParserEvents + + byteFiltersModified typeutils.AtomicBool + transactionFiltersModified typeutils.AtomicBool + bytesFiltersMutex sync.Mutex + transactionFiltersMutex sync.Mutex +} + +func New() (result *TransactionParser) { + result = &TransactionParser{ + bytesFilters: make([]BytesFilter, 0), + transactionFilters: make([]TransactionFilter, 0), + + Events: transactionParserEvents{ + BytesRejected: events.NewEvent(func(handler interface{}, params ...interface{}) { + handler.(func([]byte))(params[0].([]byte)) + }), + TransactionParsed: events.NewEvent(func(handler interface{}, params ...interface{}) { + handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction)) + }), + TransactionRejected: events.NewEvent(func(handler interface{}, params ...interface{}) { + handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction)) + }), + }, + } + + // add builtin filters + result.AddBytesFilter(builtinfilters.NewRecentlySeenBytesFilter()) + result.AddTransactionsFilter(builtinfilters.NewTransactionSignatureFilter()) + + return +} + +func (transactionParser *TransactionParser) Parse(transactionBytes []byte) { + transactionParser.setupBytesFilterDataFlow() + transactionParser.setupTransactionsFilterDataFlow() + + transactionParser.bytesFilters[0].Filter(transactionBytes) +} + +func (transactionParser *TransactionParser) AddBytesFilter(filter BytesFilter) { + transactionParser.bytesFiltersMutex.Lock() + transactionParser.bytesFilters = append(transactionParser.bytesFilters, filter) + transactionParser.bytesFiltersMutex.Unlock() + + transactionParser.byteFiltersModified.Set() +} + +func (transactionParser *TransactionParser) AddTransactionsFilter(filter TransactionFilter) { + transactionParser.transactionFiltersMutex.Lock() + transactionParser.transactionFilters = append(transactionParser.transactionFilters, filter) + transactionParser.transactionFiltersMutex.Unlock() + + transactionParser.transactionFiltersModified.Set() +} + +func (transactionParser *TransactionParser) Shutdown() { + transactionParser.bytesFiltersMutex.Lock() + for _, bytesFilter := range transactionParser.bytesFilters { + bytesFilter.Shutdown() + } + transactionParser.bytesFiltersMutex.Unlock() + + transactionParser.transactionFiltersMutex.Lock() + for _, transactionFilter := range transactionParser.transactionFilters { + transactionFilter.Shutdown() + } + transactionParser.transactionFiltersMutex.Unlock() +} + +func (transactionParser *TransactionParser) setupBytesFilterDataFlow() { + if !transactionParser.byteFiltersModified.IsSet() { + return + } + + transactionParser.bytesFiltersMutex.Lock() + if transactionParser.byteFiltersModified.IsSet() { + transactionParser.byteFiltersModified.SetTo(false) + + numberOfBytesFilters := len(transactionParser.bytesFilters) + for i := 0; i < numberOfBytesFilters; i++ { + if i == numberOfBytesFilters-1 { + transactionParser.bytesFilters[i].OnAccept(transactionParser.parseTransaction) + } else { + transactionParser.bytesFilters[i].OnAccept(transactionParser.bytesFilters[i+1].Filter) + } + transactionParser.bytesFilters[i].OnReject(func(bytes []byte) { transactionParser.Events.BytesRejected.Trigger(bytes) }) + } + } + transactionParser.bytesFiltersMutex.Unlock() +} + +func (transactionParser *TransactionParser) setupTransactionsFilterDataFlow() { + if !transactionParser.transactionFiltersModified.IsSet() { + return + } + + transactionParser.transactionFiltersMutex.Lock() + if transactionParser.transactionFiltersModified.IsSet() { + transactionParser.transactionFiltersModified.SetTo(false) + + numberOfTransactionFilters := len(transactionParser.transactionFilters) + for i := 0; i < numberOfTransactionFilters; i++ { + if i == numberOfTransactionFilters-1 { + transactionParser.transactionFilters[i].OnAccept(func(tx *transaction.Transaction) { transactionParser.Events.TransactionParsed.Trigger(tx) }) + } else { + transactionParser.transactionFilters[i].OnAccept(transactionParser.transactionFilters[i+1].Filter) + } + transactionParser.transactionFilters[i].OnReject(func(tx *transaction.Transaction) { transactionParser.Events.TransactionRejected.Trigger(tx) }) + } + } + transactionParser.transactionFiltersMutex.Unlock() +} + +func (transactionParser *TransactionParser) parseTransaction(bytes []byte) { + if parsedTransaction, err := transaction.FromBytes(bytes); err != nil { + // trigger parsingError + panic(err) + } else { + transactionParser.transactionFilters[0].Filter(parsedTransaction) + } +} diff --git a/packages/binary/tangle/transactionparser/transactionparser_test.go b/packages/binary/tangle/transactionparser/transactionparser_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c7dc3b18a91c82260b4196d7794e499cabec13c1 --- /dev/null +++ b/packages/binary/tangle/transactionparser/transactionparser_test.go @@ -0,0 +1,56 @@ +package transactionparser + +import ( + "fmt" + "strconv" + "testing" + + "github.com/iotaledger/hive.go/events" + + "github.com/iotaledger/goshimmer/packages/binary/identity" + "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" + "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction/payload/data" +) + +func BenchmarkTransactionParser_ParseBytesSame(b *testing.B) { + txBytes := transaction.New(transaction.EmptyId, transaction.EmptyId, identity.Generate(), data.New([]byte("Test"))).GetBytes() + txParser := New() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + txParser.Parse(txBytes) + } + + txParser.Shutdown() +} + +func BenchmarkTransactionParser_ParseBytesDifferent(b *testing.B) { + transactionBytes := make([][]byte, b.N) + for i := 0; i < b.N; i++ { + transactionBytes[i] = transaction.New(transaction.EmptyId, transaction.EmptyId, identity.Generate(), data.New([]byte("Test"+strconv.Itoa(i)))).GetBytes() + } + + txParser := New() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + txParser.Parse(transactionBytes[i]) + } + + txParser.Shutdown() +} + +func TestTransactionParser_ParseTransaction(t *testing.T) { + tx := transaction.New(transaction.EmptyId, transaction.EmptyId, identity.Generate(), data.New([]byte("Test"))) + + txParser := New() + txParser.Parse(tx.GetBytes()) + + txParser.Events.TransactionParsed.Attach(events.NewClosure(func(tx *transaction.Transaction) { + fmt.Println("PARSED!!!") + })) + + txParser.Shutdown() +} diff --git a/packages/binary/tangle/transactionrequester/constants.go b/packages/binary/tangle/transactionrequester/constants.go new file mode 100644 index 0000000000000000000000000000000000000000..b9f59769fe276ba7d9ba7b68267aff806599b3a2 --- /dev/null +++ b/packages/binary/tangle/transactionrequester/constants.go @@ -0,0 +1,10 @@ +package transactionrequester + +import ( + "time" +) + +const ( + DEFAULT_REQUEST_WORKER_COUNT = 1024 + DEFAULT_RETRY_INTERVAL = 10 * time.Second +) diff --git a/packages/binary/tangle/transactionrequester/events.go b/packages/binary/tangle/transactionrequester/events.go new file mode 100644 index 0000000000000000000000000000000000000000..e845d790496fc0c849c9ca0d4c81a7bfa2fefba2 --- /dev/null +++ b/packages/binary/tangle/transactionrequester/events.go @@ -0,0 +1,9 @@ +package transactionrequester + +import ( + "github.com/iotaledger/hive.go/events" +) + +type Events struct { + SendRequest *events.Event +} diff --git a/packages/binary/tangle/transactionrequester/options.go b/packages/binary/tangle/transactionrequester/options.go new file mode 100644 index 0000000000000000000000000000000000000000..05db5db4c3c00448dbc6b6ccc805f109d9194ba0 --- /dev/null +++ b/packages/binary/tangle/transactionrequester/options.go @@ -0,0 +1,37 @@ +package transactionrequester + +import ( + "time" +) + +type Options struct { + retryInterval time.Duration + workerCount int +} + +func newOptions(optionalOptions []Option) *Options { + result := &Options{ + retryInterval: 10 * time.Second, + workerCount: DEFAULT_REQUEST_WORKER_COUNT, + } + + for _, optionalOption := range optionalOptions { + optionalOption(result) + } + + return result +} + +type Option func(*Options) + +func RetryInterval(interval time.Duration) Option { + return func(args *Options) { + args.retryInterval = interval + } +} + +func WorkerCount(workerCount int) Option { + return func(args *Options) { + args.workerCount = workerCount + } +} diff --git a/packages/binary/tangle/transactionrequester/transactionrequester.go b/packages/binary/tangle/transactionrequester/transactionrequester.go new file mode 100644 index 0000000000000000000000000000000000000000..6f036e810abd8f17844ec0bd5f1ffb8c06a3c024 --- /dev/null +++ b/packages/binary/tangle/transactionrequester/transactionrequester.go @@ -0,0 +1,74 @@ +package transactionrequester + +import ( + "sync" + "time" + + "github.com/iotaledger/hive.go/async" + "github.com/iotaledger/hive.go/events" + + "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" +) + +type TransactionRequester struct { + scheduledRequests map[transaction.Id]*time.Timer + requestWorker async.NonBlockingWorkerPool + options *Options + Events Events + + scheduledRequestsMutex sync.RWMutex +} + +func New(optionalOptions ...Option) *TransactionRequester { + requester := &TransactionRequester{ + scheduledRequests: make(map[transaction.Id]*time.Timer), + options: newOptions(optionalOptions), + Events: Events{ + SendRequest: events.NewEvent(func(handler interface{}, params ...interface{}) { + handler.(func(transaction.Id))(params[0].(transaction.Id)) + }), + }, + } + + requester.requestWorker.Tune(requester.options.workerCount) + + return requester +} + +func (requester *TransactionRequester) ScheduleRequest(transactionId transaction.Id) { + var retryRequest func(bool) + retryRequest = func(initialRequest bool) { + requester.requestWorker.Submit(func() { + requester.scheduledRequestsMutex.RLock() + if _, requestExists := requester.scheduledRequests[transactionId]; !initialRequest && !requestExists { + requester.scheduledRequestsMutex.RUnlock() + + return + } + requester.scheduledRequestsMutex.RUnlock() + + requester.Events.SendRequest.Trigger(transactionId) + + requester.scheduledRequestsMutex.Lock() + requester.scheduledRequests[transactionId] = time.AfterFunc(requester.options.retryInterval, func() { retryRequest(false) }) + requester.scheduledRequestsMutex.Unlock() + }) + } + + retryRequest(true) +} + +func (requester *TransactionRequester) StopRequest(transactionId transaction.Id) { + requester.scheduledRequestsMutex.RLock() + if timer, timerExists := requester.scheduledRequests[transactionId]; timerExists { + requester.scheduledRequestsMutex.RUnlock() + + timer.Stop() + + requester.scheduledRequestsMutex.Lock() + delete(requester.scheduledRequests, transactionId) + requester.scheduledRequestsMutex.Unlock() + } else { + requester.scheduledRequestsMutex.RUnlock() + } +}