Skip to content
Snippets Groups Projects
Commit 11a24985 authored by Hans Moog's avatar Hans Moog
Browse files

Feat: TransactionParser uses Peer

parent 8f5d0e6d
Branches
Tags
No related merge requests found
...@@ -77,6 +77,7 @@ func (spammer *Spammer) run(tps int) { ...@@ -77,6 +77,7 @@ func (spammer *Spammer) run(tps int) {
trunkTransactionId, branchTransactionId := spammer.tipSelector.GetTips() trunkTransactionId, branchTransactionId := spammer.tipSelector.GetTips()
spammer.transactionParser.Parse( spammer.transactionParser.Parse(
transaction.New(trunkTransactionId, branchTransactionId, identity.Generate(), data.New([]byte("SPAM"))).GetBytes(), transaction.New(trunkTransactionId, branchTransactionId, identity.Generate(), data.New([]byte("SPAM"))).GetBytes(),
nil,
) )
currentSentCounter++ currentSentCounter++
...@@ -119,7 +120,7 @@ func (spammer *Spammer) sendBurst(transactions int) { ...@@ -119,7 +120,7 @@ func (spammer *Spammer) sendBurst(transactions int) {
return return
default: default:
spammer.transactionParser.Parse(burstBuffer[i]) spammer.transactionParser.Parse(burstBuffer[i], nil)
} }
} }
} }
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"sync" "sync"
"github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/bytesfilter" "github.com/iotaledger/hive.go/bytesfilter"
) )
...@@ -12,7 +13,7 @@ var ErrReceivedDuplicateBytes = fmt.Errorf("received duplicate bytes") ...@@ -12,7 +13,7 @@ var ErrReceivedDuplicateBytes = fmt.Errorf("received duplicate bytes")
type RecentlySeenBytesFilter struct { type RecentlySeenBytesFilter struct {
bytesFilter *bytesfilter.BytesFilter bytesFilter *bytesfilter.BytesFilter
onAcceptCallback func(bytes []byte) onAcceptCallback func(bytes []byte, peer *peer.Peer)
onRejectCallback func(bytes []byte, err error) onRejectCallback func(bytes []byte, err error)
workerPool async.WorkerPool workerPool async.WorkerPool
...@@ -28,17 +29,17 @@ func NewRecentlySeenBytesFilter() (result *RecentlySeenBytesFilter) { ...@@ -28,17 +29,17 @@ func NewRecentlySeenBytesFilter() (result *RecentlySeenBytesFilter) {
return return
} }
func (filter *RecentlySeenBytesFilter) Filter(bytes []byte) { func (filter *RecentlySeenBytesFilter) Filter(bytes []byte, peer *peer.Peer) {
filter.workerPool.Submit(func() { filter.workerPool.Submit(func() {
if filter.bytesFilter.Add(bytes) { if filter.bytesFilter.Add(bytes) {
filter.getAcceptCallback()(bytes) filter.getAcceptCallback()(bytes, peer)
} else { } else {
filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes) filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes)
} }
}) })
} }
func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte)) { func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte, peer *peer.Peer)) {
filter.onAcceptCallbackMutex.Lock() filter.onAcceptCallbackMutex.Lock()
filter.onAcceptCallback = callback filter.onAcceptCallback = callback
filter.onAcceptCallbackMutex.Unlock() filter.onAcceptCallbackMutex.Unlock()
...@@ -50,7 +51,7 @@ func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte, err ...@@ -50,7 +51,7 @@ func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte, err
filter.onRejectCallbackMutex.Unlock() filter.onRejectCallbackMutex.Unlock()
} }
func (filter *RecentlySeenBytesFilter) getAcceptCallback() (result func(bytes []byte)) { func (filter *RecentlySeenBytesFilter) getAcceptCallback() (result func(bytes []byte, peer *peer.Peer)) {
filter.onAcceptCallbackMutex.Lock() filter.onAcceptCallbackMutex.Lock()
result = filter.onAcceptCallback result = filter.onAcceptCallback
filter.onAcceptCallbackMutex.Unlock() filter.onAcceptCallbackMutex.Unlock()
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"sync" "sync"
"github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction"
) )
...@@ -12,7 +13,7 @@ import ( ...@@ -12,7 +13,7 @@ import (
var ErrInvalidSignature = fmt.Errorf("invalid signature") var ErrInvalidSignature = fmt.Errorf("invalid signature")
type TransactionSignatureFilter struct { type TransactionSignatureFilter struct {
onAcceptCallback func(tx *transaction.Transaction) onAcceptCallback func(tx *transaction.Transaction, peer *peer.Peer)
onRejectCallback func(tx *transaction.Transaction, err error) onRejectCallback func(tx *transaction.Transaction, err error)
workerPool async.WorkerPool workerPool async.WorkerPool
...@@ -26,17 +27,17 @@ func NewTransactionSignatureFilter() (result *TransactionSignatureFilter) { ...@@ -26,17 +27,17 @@ func NewTransactionSignatureFilter() (result *TransactionSignatureFilter) {
return return
} }
func (filter *TransactionSignatureFilter) Filter(tx *transaction.Transaction) { func (filter *TransactionSignatureFilter) Filter(tx *transaction.Transaction, peer *peer.Peer) {
filter.workerPool.Submit(func() { filter.workerPool.Submit(func() {
if tx.VerifySignature() { if tx.VerifySignature() {
filter.getAcceptCallback()(tx) filter.getAcceptCallback()(tx, peer)
} else { } else {
filter.getRejectCallback()(tx, ErrInvalidSignature) filter.getRejectCallback()(tx, ErrInvalidSignature)
} }
}) })
} }
func (filter *TransactionSignatureFilter) OnAccept(callback func(tx *transaction.Transaction)) { func (filter *TransactionSignatureFilter) OnAccept(callback func(tx *transaction.Transaction, peer *peer.Peer)) {
filter.onAcceptCallbackMutex.Lock() filter.onAcceptCallbackMutex.Lock()
filter.onAcceptCallback = callback filter.onAcceptCallback = callback
filter.onAcceptCallbackMutex.Unlock() filter.onAcceptCallbackMutex.Unlock()
...@@ -52,7 +53,7 @@ func (filter *TransactionSignatureFilter) Shutdown() { ...@@ -52,7 +53,7 @@ func (filter *TransactionSignatureFilter) Shutdown() {
filter.workerPool.ShutdownGracefully() filter.workerPool.ShutdownGracefully()
} }
func (filter *TransactionSignatureFilter) getAcceptCallback() (result func(tx *transaction.Transaction)) { func (filter *TransactionSignatureFilter) getAcceptCallback() (result func(tx *transaction.Transaction, peer *peer.Peer)) {
filter.onAcceptCallbackMutex.RLock() filter.onAcceptCallbackMutex.RLock()
result = filter.onAcceptCallback result = filter.onAcceptCallback
filter.onAcceptCallbackMutex.RUnlock() filter.onAcceptCallbackMutex.RUnlock()
......
package transactionparser package transactionparser
import (
"github.com/iotaledger/hive.go/autopeering/peer"
)
type BytesFilter interface { type BytesFilter interface {
Filter(bytes []byte) Filter(bytes []byte, peer *peer.Peer)
OnAccept(callback func(bytes []byte)) OnAccept(callback func(bytes []byte, peer *peer.Peer))
OnReject(callback func(bytes []byte, err error)) OnReject(callback func(bytes []byte, err error))
Shutdown() Shutdown()
} }
package transactionparser package transactionparser
import ( import (
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction"
) )
type TransactionFilter interface { type TransactionFilter interface {
Filter(tx *transaction.Transaction) Filter(tx *transaction.Transaction, peer *peer.Peer)
OnAccept(callback func(tx *transaction.Transaction)) OnAccept(callback func(tx *transaction.Transaction, peer *peer.Peer))
OnReject(callback func(tx *transaction.Transaction, err error)) OnReject(callback func(tx *transaction.Transaction, err error))
Shutdown() Shutdown()
} }
...@@ -3,6 +3,8 @@ package transactionparser ...@@ -3,6 +3,8 @@ package transactionparser
import ( import (
"sync" "sync"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction"
"github.com/iotaledger/goshimmer/packages/binary/tangle/transactionparser/builtinfilters" "github.com/iotaledger/goshimmer/packages/binary/tangle/transactionparser/builtinfilters"
...@@ -31,7 +33,7 @@ func New() (result *TransactionParser) { ...@@ -31,7 +33,7 @@ func New() (result *TransactionParser) {
handler.(func([]byte, error))(params[0].([]byte), params[1].(error)) handler.(func([]byte, error))(params[0].([]byte), params[1].(error))
}), }),
TransactionParsed: events.NewEvent(func(handler interface{}, params ...interface{}) { TransactionParsed: events.NewEvent(func(handler interface{}, params ...interface{}) {
handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction)) handler.(func(*transaction.Transaction, *peer.Peer))(params[0].(*transaction.Transaction), params[1].(*peer.Peer))
}), }),
TransactionRejected: events.NewEvent(func(handler interface{}, params ...interface{}) { TransactionRejected: events.NewEvent(func(handler interface{}, params ...interface{}) {
handler.(func(*transaction.Transaction, error))(params[0].(*transaction.Transaction), params[1].(error)) handler.(func(*transaction.Transaction, error))(params[0].(*transaction.Transaction), params[1].(error))
...@@ -46,11 +48,11 @@ func New() (result *TransactionParser) { ...@@ -46,11 +48,11 @@ func New() (result *TransactionParser) {
return return
} }
func (transactionParser *TransactionParser) Parse(transactionBytes []byte) { func (transactionParser *TransactionParser) Parse(transactionBytes []byte, peer *peer.Peer) {
transactionParser.setupBytesFilterDataFlow() transactionParser.setupBytesFilterDataFlow()
transactionParser.setupTransactionsFilterDataFlow() transactionParser.setupTransactionsFilterDataFlow()
transactionParser.bytesFilters[0].Filter(transactionBytes) transactionParser.bytesFilters[0].Filter(transactionBytes, peer)
} }
func (transactionParser *TransactionParser) AddBytesFilter(filter BytesFilter) { func (transactionParser *TransactionParser) AddBytesFilter(filter BytesFilter) {
...@@ -119,8 +121,8 @@ func (transactionParser *TransactionParser) setupTransactionsFilterDataFlow() { ...@@ -119,8 +121,8 @@ func (transactionParser *TransactionParser) setupTransactionsFilterDataFlow() {
numberOfTransactionFilters := len(transactionParser.transactionFilters) numberOfTransactionFilters := len(transactionParser.transactionFilters)
for i := 0; i < numberOfTransactionFilters; i++ { for i := 0; i < numberOfTransactionFilters; i++ {
if i == numberOfTransactionFilters-1 { if i == numberOfTransactionFilters-1 {
transactionParser.transactionFilters[i].OnAccept(func(tx *transaction.Transaction) { transactionParser.transactionFilters[i].OnAccept(func(tx *transaction.Transaction, peer *peer.Peer) {
transactionParser.Events.TransactionParsed.Trigger(tx) transactionParser.Events.TransactionParsed.Trigger(tx, peer)
}) })
} else { } else {
transactionParser.transactionFilters[i].OnAccept(transactionParser.transactionFilters[i+1].Filter) transactionParser.transactionFilters[i].OnAccept(transactionParser.transactionFilters[i+1].Filter)
...@@ -133,10 +135,10 @@ func (transactionParser *TransactionParser) setupTransactionsFilterDataFlow() { ...@@ -133,10 +135,10 @@ func (transactionParser *TransactionParser) setupTransactionsFilterDataFlow() {
transactionParser.transactionFiltersMutex.Unlock() transactionParser.transactionFiltersMutex.Unlock()
} }
func (transactionParser *TransactionParser) parseTransaction(bytes []byte) { func (transactionParser *TransactionParser) parseTransaction(bytes []byte, peer *peer.Peer) {
if parsedTransaction, err := transaction.FromBytes(bytes); err != nil { if parsedTransaction, err := transaction.FromBytes(bytes); err != nil {
transactionParser.Events.BytesRejected.Trigger(bytes, err) transactionParser.Events.BytesRejected.Trigger(bytes, err)
} else { } else {
transactionParser.transactionFilters[0].Filter(parsedTransaction) transactionParser.transactionFilters[0].Filter(parsedTransaction, peer)
} }
} }
...@@ -19,7 +19,7 @@ func BenchmarkTransactionParser_ParseBytesSame(b *testing.B) { ...@@ -19,7 +19,7 @@ func BenchmarkTransactionParser_ParseBytesSame(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
txParser.Parse(txBytes) txParser.Parse(txBytes, nil)
} }
txParser.Shutdown() txParser.Shutdown()
...@@ -36,7 +36,7 @@ func BenchmarkTransactionParser_ParseBytesDifferent(b *testing.B) { ...@@ -36,7 +36,7 @@ func BenchmarkTransactionParser_ParseBytesDifferent(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
txParser.Parse(transactionBytes[i]) txParser.Parse(transactionBytes[i], nil)
} }
txParser.Shutdown() txParser.Shutdown()
...@@ -46,7 +46,7 @@ func TestTransactionParser_ParseTransaction(t *testing.T) { ...@@ -46,7 +46,7 @@ func TestTransactionParser_ParseTransaction(t *testing.T) {
tx := transaction.New(transaction.EmptyId, transaction.EmptyId, identity.Generate(), data.New([]byte("Test"))) tx := transaction.New(transaction.EmptyId, transaction.EmptyId, identity.Generate(), data.New([]byte("Test")))
txParser := New() txParser := New()
txParser.Parse(tx.GetBytes()) txParser.Parse(tx.GetBytes(), nil)
txParser.Events.TransactionParsed.Attach(events.NewClosure(func(tx *transaction.Transaction) { txParser.Events.TransactionParsed.Attach(events.NewClosure(func(tx *transaction.Transaction) {
fmt.Println("PARSED!!!") fmt.Println("PARSED!!!")
......
...@@ -73,7 +73,7 @@ func configureEvents() { ...@@ -73,7 +73,7 @@ func configureEvents() {
// configure flow of incoming transactions // configure flow of incoming transactions
gossip.Events.TransactionReceived.Attach(events.NewClosure(func(event *gossip.TransactionReceivedEvent) { gossip.Events.TransactionReceived.Attach(events.NewClosure(func(event *gossip.TransactionReceivedEvent) {
tangle.TransactionParser.Parse(event.Data) tangle.TransactionParser.Parse(event.Data, event.Peer)
})) }))
// configure flow of outgoing transactions (gossip on solidification) // configure flow of outgoing transactions (gossip on solidification)
......
...@@ -55,7 +55,6 @@ func configure(plugin *node.Plugin) { ...@@ -55,7 +55,6 @@ func configure(plugin *node.Plugin) {
} }
func run(plugin *node.Plugin) { func run(plugin *node.Plugin) {
notifyStatus := events.NewClosure(func(tps uint64) { notifyStatus := events.NewClosure(func(tps uint64) {
wsSendWorkerPool.TrySubmit(tps) wsSendWorkerPool.TrySubmit(tps)
}) })
......
package tangle package tangle
import ( import (
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/binary/storageprefix" "github.com/iotaledger/goshimmer/packages/binary/storageprefix"
"github.com/iotaledger/goshimmer/packages/binary/tangle" "github.com/iotaledger/goshimmer/packages/binary/tangle"
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction"
...@@ -39,7 +41,9 @@ func configure(*node.Plugin) { ...@@ -39,7 +41,9 @@ func configure(*node.Plugin) {
Instance = tangle.New(database.GetBadgerInstance(), storageprefix.MainNet) Instance = tangle.New(database.GetBadgerInstance(), storageprefix.MainNet)
// setup TransactionParser // setup TransactionParser
TransactionParser.Events.TransactionParsed.Attach(events.NewClosure(func(transaction *transaction.Transaction) { TransactionParser.Events.TransactionParsed.Attach(events.NewClosure(func(transaction *transaction.Transaction, peer *peer.Peer) {
// TODO: ADD PEER
Instance.AttachTransaction(transaction) Instance.AttachTransaction(transaction)
})) }))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment