diff --git a/packages/binary/spammer/spammer.go b/packages/binary/spammer/spammer.go index a6636c74fd89625bc1fe595f1e73f61a9cf7ef0f..12d92d4c22a0f9a762f0d69a1cb4255d3169cc1d 100644 --- a/packages/binary/spammer/spammer.go +++ b/packages/binary/spammer/spammer.go @@ -77,6 +77,7 @@ func (spammer *Spammer) run(tps int) { trunkTransactionId, branchTransactionId := spammer.tipSelector.GetTips() spammer.transactionParser.Parse( transaction.New(trunkTransactionId, branchTransactionId, identity.Generate(), data.New([]byte("SPAM"))).GetBytes(), + nil, ) currentSentCounter++ @@ -119,7 +120,7 @@ func (spammer *Spammer) sendBurst(transactions int) { return default: - spammer.transactionParser.Parse(burstBuffer[i]) + spammer.transactionParser.Parse(burstBuffer[i], nil) } } } diff --git a/packages/binary/tangle/transactionparser/builtinfilters/recently_seen_bytes_filter.go b/packages/binary/tangle/transactionparser/builtinfilters/recently_seen_bytes_filter.go index b35a2c9ff3ef44498bd810ec089c7aa57606c16c..8dec31ea41c4575574c161869586d5fbaffa50d7 100644 --- a/packages/binary/tangle/transactionparser/builtinfilters/recently_seen_bytes_filter.go +++ b/packages/binary/tangle/transactionparser/builtinfilters/recently_seen_bytes_filter.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/iotaledger/hive.go/async" + "github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/bytesfilter" ) @@ -12,7 +13,7 @@ var ErrReceivedDuplicateBytes = fmt.Errorf("received duplicate bytes") type RecentlySeenBytesFilter struct { bytesFilter *bytesfilter.BytesFilter - onAcceptCallback func(bytes []byte) + onAcceptCallback func(bytes []byte, peer *peer.Peer) onRejectCallback func(bytes []byte, err error) workerPool async.WorkerPool @@ -28,17 +29,17 @@ func NewRecentlySeenBytesFilter() (result *RecentlySeenBytesFilter) { return } -func (filter *RecentlySeenBytesFilter) Filter(bytes []byte) { +func (filter *RecentlySeenBytesFilter) Filter(bytes []byte, peer *peer.Peer) { filter.workerPool.Submit(func() { if filter.bytesFilter.Add(bytes) { - filter.getAcceptCallback()(bytes) + filter.getAcceptCallback()(bytes, peer) } else { filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes) } }) } -func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte)) { +func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte, peer *peer.Peer)) { filter.onAcceptCallbackMutex.Lock() filter.onAcceptCallback = callback filter.onAcceptCallbackMutex.Unlock() @@ -50,7 +51,7 @@ func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte, err filter.onRejectCallbackMutex.Unlock() } -func (filter *RecentlySeenBytesFilter) getAcceptCallback() (result func(bytes []byte)) { +func (filter *RecentlySeenBytesFilter) getAcceptCallback() (result func(bytes []byte, peer *peer.Peer)) { filter.onAcceptCallbackMutex.Lock() result = filter.onAcceptCallback filter.onAcceptCallbackMutex.Unlock() diff --git a/packages/binary/tangle/transactionparser/builtinfilters/transaction_signature_filter.go b/packages/binary/tangle/transactionparser/builtinfilters/transaction_signature_filter.go index 6dccd3f9254ce12f58b18f18ce6a00d1dc11bf7e..469864ba8b8f7f529df9c8e5db4911f73349b82b 100644 --- a/packages/binary/tangle/transactionparser/builtinfilters/transaction_signature_filter.go +++ b/packages/binary/tangle/transactionparser/builtinfilters/transaction_signature_filter.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/iotaledger/hive.go/async" + "github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" ) @@ -12,7 +13,7 @@ import ( var ErrInvalidSignature = fmt.Errorf("invalid signature") type TransactionSignatureFilter struct { - onAcceptCallback func(tx *transaction.Transaction) + onAcceptCallback func(tx *transaction.Transaction, peer *peer.Peer) onRejectCallback func(tx *transaction.Transaction, err error) workerPool async.WorkerPool @@ -26,17 +27,17 @@ func NewTransactionSignatureFilter() (result *TransactionSignatureFilter) { return } -func (filter *TransactionSignatureFilter) Filter(tx *transaction.Transaction) { +func (filter *TransactionSignatureFilter) Filter(tx *transaction.Transaction, peer *peer.Peer) { filter.workerPool.Submit(func() { if tx.VerifySignature() { - filter.getAcceptCallback()(tx) + filter.getAcceptCallback()(tx, peer) } else { filter.getRejectCallback()(tx, ErrInvalidSignature) } }) } -func (filter *TransactionSignatureFilter) OnAccept(callback func(tx *transaction.Transaction)) { +func (filter *TransactionSignatureFilter) OnAccept(callback func(tx *transaction.Transaction, peer *peer.Peer)) { filter.onAcceptCallbackMutex.Lock() filter.onAcceptCallback = callback filter.onAcceptCallbackMutex.Unlock() @@ -52,7 +53,7 @@ func (filter *TransactionSignatureFilter) Shutdown() { filter.workerPool.ShutdownGracefully() } -func (filter *TransactionSignatureFilter) getAcceptCallback() (result func(tx *transaction.Transaction)) { +func (filter *TransactionSignatureFilter) getAcceptCallback() (result func(tx *transaction.Transaction, peer *peer.Peer)) { filter.onAcceptCallbackMutex.RLock() result = filter.onAcceptCallback filter.onAcceptCallbackMutex.RUnlock() diff --git a/packages/binary/tangle/transactionparser/bytes_filter.go b/packages/binary/tangle/transactionparser/bytes_filter.go index ac541a7b1c8ed2bd883dd2e5a5c6e7cec49234a7..277c2da119fdd8ea355ab18ef7ce599ffe38250a 100644 --- a/packages/binary/tangle/transactionparser/bytes_filter.go +++ b/packages/binary/tangle/transactionparser/bytes_filter.go @@ -1,8 +1,12 @@ package transactionparser +import ( + "github.com/iotaledger/hive.go/autopeering/peer" +) + type BytesFilter interface { - Filter(bytes []byte) - OnAccept(callback func(bytes []byte)) + Filter(bytes []byte, peer *peer.Peer) + OnAccept(callback func(bytes []byte, peer *peer.Peer)) OnReject(callback func(bytes []byte, err error)) Shutdown() } diff --git a/packages/binary/tangle/transactionparser/transaction_filter.go b/packages/binary/tangle/transactionparser/transaction_filter.go index c5b9b93e0932b6fd0dcdcdf4b2b6345589b0cbc2..2898afde8b30468ed2b1041126395436f18686f7 100644 --- a/packages/binary/tangle/transactionparser/transaction_filter.go +++ b/packages/binary/tangle/transactionparser/transaction_filter.go @@ -1,12 +1,14 @@ package transactionparser import ( + "github.com/iotaledger/hive.go/autopeering/peer" + "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" ) type TransactionFilter interface { - Filter(tx *transaction.Transaction) - OnAccept(callback func(tx *transaction.Transaction)) + Filter(tx *transaction.Transaction, peer *peer.Peer) + OnAccept(callback func(tx *transaction.Transaction, peer *peer.Peer)) OnReject(callback func(tx *transaction.Transaction, err error)) Shutdown() } diff --git a/packages/binary/tangle/transactionparser/transactionparser.go b/packages/binary/tangle/transactionparser/transactionparser.go index feb61d0781a66a590e27d551f44fe52000697270..a144746d28f73cf16b9eca88cf5fee6005f7172f 100644 --- a/packages/binary/tangle/transactionparser/transactionparser.go +++ b/packages/binary/tangle/transactionparser/transactionparser.go @@ -3,6 +3,8 @@ package transactionparser import ( "sync" + "github.com/iotaledger/hive.go/autopeering/peer" + "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" "github.com/iotaledger/goshimmer/packages/binary/tangle/transactionparser/builtinfilters" @@ -31,7 +33,7 @@ func New() (result *TransactionParser) { handler.(func([]byte, error))(params[0].([]byte), params[1].(error)) }), TransactionParsed: events.NewEvent(func(handler interface{}, params ...interface{}) { - handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction)) + handler.(func(*transaction.Transaction, *peer.Peer))(params[0].(*transaction.Transaction), params[1].(*peer.Peer)) }), TransactionRejected: events.NewEvent(func(handler interface{}, params ...interface{}) { handler.(func(*transaction.Transaction, error))(params[0].(*transaction.Transaction), params[1].(error)) @@ -46,11 +48,11 @@ func New() (result *TransactionParser) { return } -func (transactionParser *TransactionParser) Parse(transactionBytes []byte) { +func (transactionParser *TransactionParser) Parse(transactionBytes []byte, peer *peer.Peer) { transactionParser.setupBytesFilterDataFlow() transactionParser.setupTransactionsFilterDataFlow() - transactionParser.bytesFilters[0].Filter(transactionBytes) + transactionParser.bytesFilters[0].Filter(transactionBytes, peer) } func (transactionParser *TransactionParser) AddBytesFilter(filter BytesFilter) { @@ -119,8 +121,8 @@ func (transactionParser *TransactionParser) setupTransactionsFilterDataFlow() { numberOfTransactionFilters := len(transactionParser.transactionFilters) for i := 0; i < numberOfTransactionFilters; i++ { if i == numberOfTransactionFilters-1 { - transactionParser.transactionFilters[i].OnAccept(func(tx *transaction.Transaction) { - transactionParser.Events.TransactionParsed.Trigger(tx) + transactionParser.transactionFilters[i].OnAccept(func(tx *transaction.Transaction, peer *peer.Peer) { + transactionParser.Events.TransactionParsed.Trigger(tx, peer) }) } else { transactionParser.transactionFilters[i].OnAccept(transactionParser.transactionFilters[i+1].Filter) @@ -133,10 +135,10 @@ func (transactionParser *TransactionParser) setupTransactionsFilterDataFlow() { transactionParser.transactionFiltersMutex.Unlock() } -func (transactionParser *TransactionParser) parseTransaction(bytes []byte) { +func (transactionParser *TransactionParser) parseTransaction(bytes []byte, peer *peer.Peer) { if parsedTransaction, err := transaction.FromBytes(bytes); err != nil { transactionParser.Events.BytesRejected.Trigger(bytes, err) } else { - transactionParser.transactionFilters[0].Filter(parsedTransaction) + transactionParser.transactionFilters[0].Filter(parsedTransaction, peer) } } diff --git a/packages/binary/tangle/transactionparser/transactionparser_test.go b/packages/binary/tangle/transactionparser/transactionparser_test.go index c7dc3b18a91c82260b4196d7794e499cabec13c1..5741252932ba5a82eb889300aa32e031f4b6ce2b 100644 --- a/packages/binary/tangle/transactionparser/transactionparser_test.go +++ b/packages/binary/tangle/transactionparser/transactionparser_test.go @@ -19,7 +19,7 @@ func BenchmarkTransactionParser_ParseBytesSame(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - txParser.Parse(txBytes) + txParser.Parse(txBytes, nil) } txParser.Shutdown() @@ -36,7 +36,7 @@ func BenchmarkTransactionParser_ParseBytesDifferent(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - txParser.Parse(transactionBytes[i]) + txParser.Parse(transactionBytes[i], nil) } txParser.Shutdown() @@ -46,7 +46,7 @@ func TestTransactionParser_ParseTransaction(t *testing.T) { tx := transaction.New(transaction.EmptyId, transaction.EmptyId, identity.Generate(), data.New([]byte("Test"))) txParser := New() - txParser.Parse(tx.GetBytes()) + txParser.Parse(tx.GetBytes(), nil) txParser.Events.TransactionParsed.Attach(events.NewClosure(func(tx *transaction.Transaction) { fmt.Println("PARSED!!!") diff --git a/plugins/gossip/plugin.go b/plugins/gossip/plugin.go index 4b8bee220239a19ed7facdff06e2f4f311e430d5..dcf615e8af4f4220f6f58f3da88644e061a3c653 100644 --- a/plugins/gossip/plugin.go +++ b/plugins/gossip/plugin.go @@ -73,7 +73,7 @@ func configureEvents() { // configure flow of incoming transactions gossip.Events.TransactionReceived.Attach(events.NewClosure(func(event *gossip.TransactionReceivedEvent) { - tangle.TransactionParser.Parse(event.Data) + tangle.TransactionParser.Parse(event.Data, event.Peer) })) // configure flow of outgoing transactions (gossip on solidification) diff --git a/plugins/spa/plugin.go b/plugins/spa/plugin.go index 89889e96004b56a56a66382a50b037d9bfc10279..9e19c396601b6e11425095c111da8ca8aa8b9f41 100644 --- a/plugins/spa/plugin.go +++ b/plugins/spa/plugin.go @@ -55,7 +55,6 @@ func configure(plugin *node.Plugin) { } func run(plugin *node.Plugin) { - notifyStatus := events.NewClosure(func(tps uint64) { wsSendWorkerPool.TrySubmit(tps) }) diff --git a/plugins/tangle/plugin.go b/plugins/tangle/plugin.go index 0d5c16d94de0e014650a0ed8ca49131153c9e8ea..acc998fe3a0e6801f6337f0a120a91683fb0bab7 100644 --- a/plugins/tangle/plugin.go +++ b/plugins/tangle/plugin.go @@ -1,6 +1,8 @@ package tangle import ( + "github.com/iotaledger/hive.go/autopeering/peer" + "github.com/iotaledger/goshimmer/packages/binary/storageprefix" "github.com/iotaledger/goshimmer/packages/binary/tangle" "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" @@ -39,7 +41,9 @@ func configure(*node.Plugin) { Instance = tangle.New(database.GetBadgerInstance(), storageprefix.MainNet) // setup TransactionParser - TransactionParser.Events.TransactionParsed.Attach(events.NewClosure(func(transaction *transaction.Transaction) { + TransactionParser.Events.TransactionParsed.Attach(events.NewClosure(func(transaction *transaction.Transaction, peer *peer.Peer) { + // TODO: ADD PEER + Instance.AttachTransaction(transaction) }))