diff --git a/go.mod b/go.mod index cb09e5a0938d9f34b15aa902eee8826462d4d96c..b648140393b9008b1a2eeef475ae00dae4b1b28b 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/googollee/go-engine.io v1.4.3-0.20190924125625-798118fc0dd2 github.com/googollee/go-socket.io v1.4.3-0.20191204093753-683f8725b6d0 github.com/gorilla/websocket v1.4.1 - github.com/iotaledger/hive.go v0.0.0-20200217140357-8f1ea1f52085 + github.com/iotaledger/hive.go v0.0.0-20200219224037-2d5f5238c0de github.com/iotaledger/iota.go v1.0.0-beta.14 github.com/labstack/echo v3.3.10+incompatible github.com/labstack/gommon v0.3.0 // indirect diff --git a/go.sum b/go.sum index ccf7ed45bbe3db76d7f7e6d29cf5b386c13fb12a..cf167e7e365908b9be6c4b5053957099614351b6 100644 --- a/go.sum +++ b/go.sum @@ -127,6 +127,10 @@ github.com/iotaledger/hive.go v0.0.0-20200212114128-1460dba4a6b0 h1:AXUiVkx3QWQP github.com/iotaledger/hive.go v0.0.0-20200212114128-1460dba4a6b0/go.mod h1:wj3bFHlcX0NiEOWu5+WOg/MI/5N7PKCFnyaziaylB64= github.com/iotaledger/hive.go v0.0.0-20200217140357-8f1ea1f52085 h1:gkxkCRUlAszGx1qgN+QxnVI5VvX7bxH0/2hqz8l8lSo= github.com/iotaledger/hive.go v0.0.0-20200217140357-8f1ea1f52085/go.mod h1:wj3bFHlcX0NiEOWu5+WOg/MI/5N7PKCFnyaziaylB64= +github.com/iotaledger/hive.go v0.0.0-20200219220156-cd331d4393e2 h1:BozQ8rDfaCuUGQfCN6RdbklksQg2idkRN32xyoRLQuk= +github.com/iotaledger/hive.go v0.0.0-20200219220156-cd331d4393e2/go.mod h1:wj3bFHlcX0NiEOWu5+WOg/MI/5N7PKCFnyaziaylB64= +github.com/iotaledger/hive.go v0.0.0-20200219224037-2d5f5238c0de h1:J9G9YWM5q7r3DObMIx/Qc8CUjrpD+c90EPVKjsBrR+E= +github.com/iotaledger/hive.go v0.0.0-20200219224037-2d5f5238c0de/go.mod h1:wj3bFHlcX0NiEOWu5+WOg/MI/5N7PKCFnyaziaylB64= github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= github.com/iotaledger/iota.go v1.0.0-beta.14 h1:Oeb28MfBuJEeXcGrLhTCJFtbsnc8y1u7xidsAmiOD5A= github.com/iotaledger/iota.go v1.0.0-beta.14/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= diff --git a/out.png b/out.png new file mode 100644 index 0000000000000000000000000000000000000000..791e95508b8a43cf8e9c0cc509a1f15ef5a53ed7 Binary files /dev/null and b/out.png differ diff --git a/packages/binary/datastructure/queue/queue.go b/packages/binary/datastructure/queue/queue.go new file mode 100644 index 0000000000000000000000000000000000000000..33cd34f9344607d86596e422cb7c8d539b4a5c66 --- /dev/null +++ b/packages/binary/datastructure/queue/queue.go @@ -0,0 +1,66 @@ +package queue + +import ( + "sync" +) + +type Queue struct { + ringBuffer []interface{} + read int + write int + capacity int + size int + mutex sync.Mutex +} + +func New(capacity int) *Queue { + return &Queue{ + ringBuffer: make([]interface{}, capacity), + capacity: capacity, + } +} + +func (queue *Queue) Size() int { + queue.mutex.Lock() + defer queue.mutex.Unlock() + + return queue.size +} + +func (queue *Queue) Capacity() int { + queue.mutex.Lock() + defer queue.mutex.Unlock() + + return queue.capacity +} + +func (queue *Queue) Offer(element interface{}) bool { + queue.mutex.Lock() + defer queue.mutex.Unlock() + + if queue.size == queue.capacity { + return false + } + + queue.ringBuffer[queue.write] = element + queue.write = (queue.write + 1) % queue.capacity + queue.size++ + + return true +} + +func (queue *Queue) Poll() (element interface{}, success bool) { + queue.mutex.Lock() + defer queue.mutex.Unlock() + + if success = queue.size != 0; !success { + return + } + + element = queue.ringBuffer[queue.read] + queue.ringBuffer[queue.read] = nil + queue.read = (queue.read + 1) % queue.capacity + queue.size-- + + return +} diff --git a/packages/binary/datastructure/queue/queue_test.go b/packages/binary/datastructure/queue/queue_test.go new file mode 100644 index 0000000000000000000000000000000000000000..bc084afc3741331f66a6e4701a794225b72e392a --- /dev/null +++ b/packages/binary/datastructure/queue/queue_test.go @@ -0,0 +1,42 @@ +package queue + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test(t *testing.T) { + queue := New(2) + assert.Equal(t, 0, queue.Size()) + assert.Equal(t, 2, queue.Capacity()) + + assert.Equal(t, true, queue.Offer(1)) + assert.Equal(t, 1, queue.Size()) + + assert.Equal(t, true, queue.Offer(2)) + assert.Equal(t, 2, queue.Size()) + + assert.Equal(t, false, queue.Offer(3)) + + polledValue, ok := queue.Poll() + assert.Equal(t, true, ok) + assert.Equal(t, 1, polledValue) + assert.Equal(t, 1, queue.Size()) + + polledValue, ok = queue.Poll() + assert.Equal(t, true, ok) + assert.Equal(t, 2, polledValue) + assert.Equal(t, 0, queue.Size()) + + polledValue, ok = queue.Poll() + assert.Equal(t, false, ok) + assert.Equal(t, nil, polledValue) + assert.Equal(t, 0, queue.Size()) + + assert.Equal(t, true, queue.Offer(3)) + assert.Equal(t, 1, queue.Size()) + + assert.Equal(t, true, queue.Offer(4)) + assert.Equal(t, 2, queue.Size()) +} diff --git a/packages/binary/spammer/spammer.go b/packages/binary/spammer/spammer.go index 31a812b41355e58b76895a5380ceda16b7778505..a6636c74fd89625bc1fe595f1e73f61a9cf7ef0f 100644 --- a/packages/binary/spammer/spammer.go +++ b/packages/binary/spammer/spammer.go @@ -1,7 +1,6 @@ package spammer import ( - "fmt" "sync" "time" @@ -31,45 +30,25 @@ func New(transactionParser *transactionparser.TransactionParser, tipSelector *ti } } -func (spammer *Spammer) Burst(transactions int) { - spammingIdentity := identity.Generate() - - previousTransactionId := transaction.EmptyId - - fmt.Println("STARTING TO GENERATE") - - burstBuffer := make([][]byte, transactions) - for i := 0; i < transactions; i++ { - spamTransaction := transaction.New(previousTransactionId, previousTransactionId, spammingIdentity, data.New([]byte("SPAM"))) - previousTransactionId = spamTransaction.GetId() - burstBuffer[i] = spamTransaction.GetBytes() - - if i%1000 == 0 { - fmt.Println("GENERATED", i) - } - } - - fmt.Println("STARTING TO SPAM") +func (spammer *Spammer) Start(tps int) { + spammer.startStopMutex.Lock() + defer spammer.startStopMutex.Unlock() - for i := 0; i < transactions; i++ { - spammer.transactionParser.Parse(burstBuffer[i]) + if !spammer.running { + spammer.running = true - if i%1000 == 0 { - fmt.Println("SENT", i) - } + go spammer.run(tps) } - - fmt.Println("SPAMMING DONE") } -func (spammer *Spammer) Start(tps int) { +func (spammer *Spammer) Burst(transactions int) { spammer.startStopMutex.Lock() defer spammer.startStopMutex.Unlock() if !spammer.running { spammer.running = true - go spammer.run(tps) + go spammer.sendBurst(transactions) } } @@ -89,6 +68,7 @@ func (spammer *Spammer) run(tps int) { start := time.Now() for { + select { case <-spammer.shutdownSignal: return @@ -114,3 +94,32 @@ func (spammer *Spammer) run(tps int) { } } } + +func (spammer *Spammer) sendBurst(transactions int) { + spammingIdentity := identity.Generate() + + previousTransactionId := transaction.EmptyId + + burstBuffer := make([][]byte, transactions) + for i := 0; i < transactions; i++ { + select { + case <-spammer.shutdownSignal: + return + + default: + spamTransaction := transaction.New(previousTransactionId, previousTransactionId, spammingIdentity, data.New([]byte("SPAM"))) + previousTransactionId = spamTransaction.GetId() + burstBuffer[i] = spamTransaction.GetBytes() + } + } + + for i := 0; i < transactions; i++ { + select { + case <-spammer.shutdownSignal: + return + + default: + spammer.transactionParser.Parse(burstBuffer[i]) + } + } +} diff --git a/packages/binary/tangle/transactionparser/builtinfilters/recently_seen_bytes_filter.go b/packages/binary/tangle/transactionparser/builtinfilters/recently_seen_bytes_filter.go index c0a6002f0cdadf0290cbab19057bba3f6c3536d4..b35a2c9ff3ef44498bd810ec089c7aa57606c16c 100644 --- a/packages/binary/tangle/transactionparser/builtinfilters/recently_seen_bytes_filter.go +++ b/packages/binary/tangle/transactionparser/builtinfilters/recently_seen_bytes_filter.go @@ -1,16 +1,19 @@ package builtinfilters import ( + "fmt" "sync" "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/bytesfilter" ) +var ErrReceivedDuplicateBytes = fmt.Errorf("received duplicate bytes") + type RecentlySeenBytesFilter struct { bytesFilter *bytesfilter.BytesFilter onAcceptCallback func(bytes []byte) - onRejectCallback func(bytes []byte) + onRejectCallback func(bytes []byte, err error) workerPool async.WorkerPool onAcceptCallbackMutex sync.RWMutex @@ -30,7 +33,7 @@ func (filter *RecentlySeenBytesFilter) Filter(bytes []byte) { if filter.bytesFilter.Add(bytes) { filter.getAcceptCallback()(bytes) } else { - filter.getRejectCallback()(bytes) + filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes) } }) } @@ -41,7 +44,7 @@ func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte)) { filter.onAcceptCallbackMutex.Unlock() } -func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte)) { +func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte, err error)) { filter.onRejectCallbackMutex.Lock() filter.onRejectCallback = callback filter.onRejectCallbackMutex.Unlock() @@ -55,7 +58,7 @@ func (filter *RecentlySeenBytesFilter) getAcceptCallback() (result func(bytes [] return } -func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes []byte)) { +func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes []byte, err error)) { filter.onRejectCallbackMutex.Lock() result = filter.onRejectCallback filter.onRejectCallbackMutex.Unlock() diff --git a/packages/binary/tangle/transactionparser/builtinfilters/transaction_signature_filter.go b/packages/binary/tangle/transactionparser/builtinfilters/transaction_signature_filter.go index 9160d821cfa2ff6c1d1946ca34953010f8ddf38b..6dccd3f9254ce12f58b18f18ce6a00d1dc11bf7e 100644 --- a/packages/binary/tangle/transactionparser/builtinfilters/transaction_signature_filter.go +++ b/packages/binary/tangle/transactionparser/builtinfilters/transaction_signature_filter.go @@ -1,6 +1,7 @@ package builtinfilters import ( + "fmt" "sync" "github.com/iotaledger/hive.go/async" @@ -8,9 +9,11 @@ import ( "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" ) +var ErrInvalidSignature = fmt.Errorf("invalid signature") + type TransactionSignatureFilter struct { onAcceptCallback func(tx *transaction.Transaction) - onRejectCallback func(tx *transaction.Transaction) + onRejectCallback func(tx *transaction.Transaction, err error) workerPool async.WorkerPool onAcceptCallbackMutex sync.RWMutex @@ -28,7 +31,7 @@ func (filter *TransactionSignatureFilter) Filter(tx *transaction.Transaction) { if tx.VerifySignature() { filter.getAcceptCallback()(tx) } else { - filter.getRejectCallback()(tx) + filter.getRejectCallback()(tx, ErrInvalidSignature) } }) } @@ -39,7 +42,7 @@ func (filter *TransactionSignatureFilter) OnAccept(callback func(tx *transaction filter.onAcceptCallbackMutex.Unlock() } -func (filter *TransactionSignatureFilter) OnReject(callback func(tx *transaction.Transaction)) { +func (filter *TransactionSignatureFilter) OnReject(callback func(tx *transaction.Transaction, err error)) { filter.onRejectCallbackMutex.Lock() filter.onRejectCallback = callback filter.onRejectCallbackMutex.Unlock() @@ -57,7 +60,7 @@ func (filter *TransactionSignatureFilter) getAcceptCallback() (result func(tx *t return } -func (filter *TransactionSignatureFilter) getRejectCallback() (result func(tx *transaction.Transaction)) { +func (filter *TransactionSignatureFilter) getRejectCallback() (result func(tx *transaction.Transaction, err error)) { filter.onRejectCallbackMutex.RLock() result = filter.onRejectCallback filter.onRejectCallbackMutex.RUnlock() diff --git a/packages/binary/tangle/transactionparser/bytes_filter.go b/packages/binary/tangle/transactionparser/bytes_filter.go index c8e2bab61a1ff376f860cd97c7dfa640dda9286d..ac541a7b1c8ed2bd883dd2e5a5c6e7cec49234a7 100644 --- a/packages/binary/tangle/transactionparser/bytes_filter.go +++ b/packages/binary/tangle/transactionparser/bytes_filter.go @@ -3,6 +3,6 @@ package transactionparser type BytesFilter interface { Filter(bytes []byte) OnAccept(callback func(bytes []byte)) - OnReject(callback func(bytes []byte)) + OnReject(callback func(bytes []byte, err error)) Shutdown() } diff --git a/packages/binary/tangle/transactionparser/transaction_filter.go b/packages/binary/tangle/transactionparser/transaction_filter.go index 39dfc277c5932c6f6ce1ee8cca2d525605c04235..c5b9b93e0932b6fd0dcdcdf4b2b6345589b0cbc2 100644 --- a/packages/binary/tangle/transactionparser/transaction_filter.go +++ b/packages/binary/tangle/transactionparser/transaction_filter.go @@ -7,6 +7,6 @@ import ( type TransactionFilter interface { Filter(tx *transaction.Transaction) OnAccept(callback func(tx *transaction.Transaction)) - OnReject(callback func(tx *transaction.Transaction)) + OnReject(callback func(tx *transaction.Transaction, err error)) Shutdown() } diff --git a/packages/binary/tangle/transactionparser/transactionparser.go b/packages/binary/tangle/transactionparser/transactionparser.go index 1d5f53660d3b4b3714e26377a686ea1969f4ab82..feb61d0781a66a590e27d551f44fe52000697270 100644 --- a/packages/binary/tangle/transactionparser/transactionparser.go +++ b/packages/binary/tangle/transactionparser/transactionparser.go @@ -28,13 +28,13 @@ func New() (result *TransactionParser) { Events: transactionParserEvents{ BytesRejected: events.NewEvent(func(handler interface{}, params ...interface{}) { - handler.(func([]byte))(params[0].([]byte)) + handler.(func([]byte, error))(params[0].([]byte), params[1].(error)) }), TransactionParsed: events.NewEvent(func(handler interface{}, params ...interface{}) { handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction)) }), TransactionRejected: events.NewEvent(func(handler interface{}, params ...interface{}) { - handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction)) + handler.(func(*transaction.Transaction, error))(params[0].(*transaction.Transaction), params[1].(error)) }), }, } @@ -99,7 +99,9 @@ func (transactionParser *TransactionParser) setupBytesFilterDataFlow() { } else { transactionParser.bytesFilters[i].OnAccept(transactionParser.bytesFilters[i+1].Filter) } - transactionParser.bytesFilters[i].OnReject(func(bytes []byte) { transactionParser.Events.BytesRejected.Trigger(bytes) }) + transactionParser.bytesFilters[i].OnReject(func(bytes []byte, err error) { + transactionParser.Events.BytesRejected.Trigger(bytes, err) + }) } } transactionParser.bytesFiltersMutex.Unlock() @@ -117,11 +119,15 @@ func (transactionParser *TransactionParser) setupTransactionsFilterDataFlow() { numberOfTransactionFilters := len(transactionParser.transactionFilters) for i := 0; i < numberOfTransactionFilters; i++ { if i == numberOfTransactionFilters-1 { - transactionParser.transactionFilters[i].OnAccept(func(tx *transaction.Transaction) { transactionParser.Events.TransactionParsed.Trigger(tx) }) + transactionParser.transactionFilters[i].OnAccept(func(tx *transaction.Transaction) { + transactionParser.Events.TransactionParsed.Trigger(tx) + }) } else { transactionParser.transactionFilters[i].OnAccept(transactionParser.transactionFilters[i+1].Filter) } - transactionParser.transactionFilters[i].OnReject(func(tx *transaction.Transaction) { transactionParser.Events.TransactionRejected.Trigger(tx) }) + transactionParser.transactionFilters[i].OnReject(func(tx *transaction.Transaction, err error) { + transactionParser.Events.TransactionRejected.Trigger(tx, err) + }) } } transactionParser.transactionFiltersMutex.Unlock() @@ -129,8 +135,7 @@ func (transactionParser *TransactionParser) setupTransactionsFilterDataFlow() { func (transactionParser *TransactionParser) parseTransaction(bytes []byte) { if parsedTransaction, err := transaction.FromBytes(bytes); err != nil { - // trigger parsingError - panic(err) + transactionParser.Events.BytesRejected.Trigger(bytes, err) } else { transactionParser.transactionFilters[0].Filter(parsedTransaction) } diff --git a/packages/shutdown/order.go b/packages/shutdown/order.go index b8347d1eaeac6069fbf3c9cc835d5b598987630e..199c0350179c1fb0723905572acb6c06c94c6183 100644 --- a/packages/shutdown/order.go +++ b/packages/shutdown/order.go @@ -3,15 +3,13 @@ package shutdown const ( ShutdownPriorityTangle = iota ShutdownPriorityRemoteLog - ShutdownPrioritySolidifier - ShutdownPriorityBundleProcessor ShutdownPriorityAnalysis ShutdownPriorityMetrics ShutdownPriorityAutopeering ShutdownPriorityGossip ShutdownPriorityWebAPI - ShutdownPriorityGraph - ShutdownPriorityTangleSpammer ShutdownPrioritySPA + ShutdownPriorityGraph + ShutdownPrioritySpammer ShutdownPriorityBadgerGarbageCollection ) diff --git a/plugins/webapi/spammer/plugin.go b/plugins/webapi/spammer/plugin.go index a545fdeb1d14e16bece395fefb6036207c9f1699..dc06b9f23dadf851976514106e39b6cc57830c79 100644 --- a/plugins/webapi/spammer/plugin.go +++ b/plugins/webapi/spammer/plugin.go @@ -1,7 +1,10 @@ package spammer import ( + "github.com/iotaledger/hive.go/daemon" + "github.com/iotaledger/goshimmer/packages/binary/spammer" + "github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/goshimmer/plugins/tangle" "github.com/iotaledger/goshimmer/plugins/webapi" @@ -10,10 +13,18 @@ import ( var transactionSpammer *spammer.Spammer -var PLUGIN = node.NewPlugin("Spammer", node.Disabled, configure) +var PLUGIN = node.NewPlugin("Spammer", node.Disabled, configure, run) func configure(plugin *node.Plugin) { transactionSpammer = spammer.New(tangle.TransactionParser, tangle.TipSelector) webapi.Server.GET("spammer", handleRequest) } + +func run(*node.Plugin) { + _ = daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) { + <-shutdownSignal + + transactionSpammer.Shutdown() + }, shutdown.ShutdownPrioritySpammer) +} diff --git a/plugins/webapi/spammer/webapi.go b/plugins/webapi/spammer/webapi.go index c62d33040a6368587357439dabccb9f7500d7178..f020f8478abbc0edd89f13550c15a88ee416f7be 100644 --- a/plugins/webapi/spammer/webapi.go +++ b/plugins/webapi/spammer/webapi.go @@ -27,6 +27,7 @@ func handleRequest(c echo.Context) error { return c.JSON(http.StatusBadRequest, Response{Error: "burst requires the tps to be set"}) } + transactionSpammer.Shutdown() transactionSpammer.Burst(request.Tps) return c.JSON(http.StatusOK, Response{Message: "sent a burst of " + strconv.Itoa(request.Tps) + " transactions"})