Skip to content
Snippets Groups Projects
Commit 2c5e031a authored by Hans Moog's avatar Hans Moog
Browse files

Feat: reworked TransactionParser to use Errors

parent 498e4d56
No related branches found
No related tags found
No related merge requests found
Showing
with 194 additions and 52 deletions
......@@ -13,7 +13,7 @@ require (
github.com/googollee/go-engine.io v1.4.3-0.20190924125625-798118fc0dd2
github.com/googollee/go-socket.io v1.4.3-0.20191204093753-683f8725b6d0
github.com/gorilla/websocket v1.4.1
github.com/iotaledger/hive.go v0.0.0-20200217140357-8f1ea1f52085
github.com/iotaledger/hive.go v0.0.0-20200219224037-2d5f5238c0de
github.com/iotaledger/iota.go v1.0.0-beta.14
github.com/labstack/echo v3.3.10+incompatible
github.com/labstack/gommon v0.3.0 // indirect
......
......@@ -127,6 +127,10 @@ github.com/iotaledger/hive.go v0.0.0-20200212114128-1460dba4a6b0 h1:AXUiVkx3QWQP
github.com/iotaledger/hive.go v0.0.0-20200212114128-1460dba4a6b0/go.mod h1:wj3bFHlcX0NiEOWu5+WOg/MI/5N7PKCFnyaziaylB64=
github.com/iotaledger/hive.go v0.0.0-20200217140357-8f1ea1f52085 h1:gkxkCRUlAszGx1qgN+QxnVI5VvX7bxH0/2hqz8l8lSo=
github.com/iotaledger/hive.go v0.0.0-20200217140357-8f1ea1f52085/go.mod h1:wj3bFHlcX0NiEOWu5+WOg/MI/5N7PKCFnyaziaylB64=
github.com/iotaledger/hive.go v0.0.0-20200219220156-cd331d4393e2 h1:BozQ8rDfaCuUGQfCN6RdbklksQg2idkRN32xyoRLQuk=
github.com/iotaledger/hive.go v0.0.0-20200219220156-cd331d4393e2/go.mod h1:wj3bFHlcX0NiEOWu5+WOg/MI/5N7PKCFnyaziaylB64=
github.com/iotaledger/hive.go v0.0.0-20200219224037-2d5f5238c0de h1:J9G9YWM5q7r3DObMIx/Qc8CUjrpD+c90EPVKjsBrR+E=
github.com/iotaledger/hive.go v0.0.0-20200219224037-2d5f5238c0de/go.mod h1:wj3bFHlcX0NiEOWu5+WOg/MI/5N7PKCFnyaziaylB64=
github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8=
github.com/iotaledger/iota.go v1.0.0-beta.14 h1:Oeb28MfBuJEeXcGrLhTCJFtbsnc8y1u7xidsAmiOD5A=
github.com/iotaledger/iota.go v1.0.0-beta.14/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8=
......
out.png 0 → 100644
out.png

344 KiB

package queue
import (
"sync"
)
type Queue struct {
ringBuffer []interface{}
read int
write int
capacity int
size int
mutex sync.Mutex
}
func New(capacity int) *Queue {
return &Queue{
ringBuffer: make([]interface{}, capacity),
capacity: capacity,
}
}
func (queue *Queue) Size() int {
queue.mutex.Lock()
defer queue.mutex.Unlock()
return queue.size
}
func (queue *Queue) Capacity() int {
queue.mutex.Lock()
defer queue.mutex.Unlock()
return queue.capacity
}
func (queue *Queue) Offer(element interface{}) bool {
queue.mutex.Lock()
defer queue.mutex.Unlock()
if queue.size == queue.capacity {
return false
}
queue.ringBuffer[queue.write] = element
queue.write = (queue.write + 1) % queue.capacity
queue.size++
return true
}
func (queue *Queue) Poll() (element interface{}, success bool) {
queue.mutex.Lock()
defer queue.mutex.Unlock()
if success = queue.size != 0; !success {
return
}
element = queue.ringBuffer[queue.read]
queue.ringBuffer[queue.read] = nil
queue.read = (queue.read + 1) % queue.capacity
queue.size--
return
}
package queue
import (
"testing"
"github.com/stretchr/testify/assert"
)
func Test(t *testing.T) {
queue := New(2)
assert.Equal(t, 0, queue.Size())
assert.Equal(t, 2, queue.Capacity())
assert.Equal(t, true, queue.Offer(1))
assert.Equal(t, 1, queue.Size())
assert.Equal(t, true, queue.Offer(2))
assert.Equal(t, 2, queue.Size())
assert.Equal(t, false, queue.Offer(3))
polledValue, ok := queue.Poll()
assert.Equal(t, true, ok)
assert.Equal(t, 1, polledValue)
assert.Equal(t, 1, queue.Size())
polledValue, ok = queue.Poll()
assert.Equal(t, true, ok)
assert.Equal(t, 2, polledValue)
assert.Equal(t, 0, queue.Size())
polledValue, ok = queue.Poll()
assert.Equal(t, false, ok)
assert.Equal(t, nil, polledValue)
assert.Equal(t, 0, queue.Size())
assert.Equal(t, true, queue.Offer(3))
assert.Equal(t, 1, queue.Size())
assert.Equal(t, true, queue.Offer(4))
assert.Equal(t, 2, queue.Size())
}
package spammer
import (
"fmt"
"sync"
"time"
......@@ -31,45 +30,25 @@ func New(transactionParser *transactionparser.TransactionParser, tipSelector *ti
}
}
func (spammer *Spammer) Burst(transactions int) {
spammingIdentity := identity.Generate()
previousTransactionId := transaction.EmptyId
fmt.Println("STARTING TO GENERATE")
burstBuffer := make([][]byte, transactions)
for i := 0; i < transactions; i++ {
spamTransaction := transaction.New(previousTransactionId, previousTransactionId, spammingIdentity, data.New([]byte("SPAM")))
previousTransactionId = spamTransaction.GetId()
burstBuffer[i] = spamTransaction.GetBytes()
if i%1000 == 0 {
fmt.Println("GENERATED", i)
}
}
fmt.Println("STARTING TO SPAM")
func (spammer *Spammer) Start(tps int) {
spammer.startStopMutex.Lock()
defer spammer.startStopMutex.Unlock()
for i := 0; i < transactions; i++ {
spammer.transactionParser.Parse(burstBuffer[i])
if !spammer.running {
spammer.running = true
if i%1000 == 0 {
fmt.Println("SENT", i)
}
go spammer.run(tps)
}
fmt.Println("SPAMMING DONE")
}
func (spammer *Spammer) Start(tps int) {
func (spammer *Spammer) Burst(transactions int) {
spammer.startStopMutex.Lock()
defer spammer.startStopMutex.Unlock()
if !spammer.running {
spammer.running = true
go spammer.run(tps)
go spammer.sendBurst(transactions)
}
}
......@@ -89,6 +68,7 @@ func (spammer *Spammer) run(tps int) {
start := time.Now()
for {
select {
case <-spammer.shutdownSignal:
return
......@@ -114,3 +94,32 @@ func (spammer *Spammer) run(tps int) {
}
}
}
func (spammer *Spammer) sendBurst(transactions int) {
spammingIdentity := identity.Generate()
previousTransactionId := transaction.EmptyId
burstBuffer := make([][]byte, transactions)
for i := 0; i < transactions; i++ {
select {
case <-spammer.shutdownSignal:
return
default:
spamTransaction := transaction.New(previousTransactionId, previousTransactionId, spammingIdentity, data.New([]byte("SPAM")))
previousTransactionId = spamTransaction.GetId()
burstBuffer[i] = spamTransaction.GetBytes()
}
}
for i := 0; i < transactions; i++ {
select {
case <-spammer.shutdownSignal:
return
default:
spammer.transactionParser.Parse(burstBuffer[i])
}
}
}
package builtinfilters
import (
"fmt"
"sync"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/bytesfilter"
)
var ErrReceivedDuplicateBytes = fmt.Errorf("received duplicate bytes")
type RecentlySeenBytesFilter struct {
bytesFilter *bytesfilter.BytesFilter
onAcceptCallback func(bytes []byte)
onRejectCallback func(bytes []byte)
onRejectCallback func(bytes []byte, err error)
workerPool async.WorkerPool
onAcceptCallbackMutex sync.RWMutex
......@@ -30,7 +33,7 @@ func (filter *RecentlySeenBytesFilter) Filter(bytes []byte) {
if filter.bytesFilter.Add(bytes) {
filter.getAcceptCallback()(bytes)
} else {
filter.getRejectCallback()(bytes)
filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes)
}
})
}
......@@ -41,7 +44,7 @@ func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte)) {
filter.onAcceptCallbackMutex.Unlock()
}
func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte)) {
func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte, err error)) {
filter.onRejectCallbackMutex.Lock()
filter.onRejectCallback = callback
filter.onRejectCallbackMutex.Unlock()
......@@ -55,7 +58,7 @@ func (filter *RecentlySeenBytesFilter) getAcceptCallback() (result func(bytes []
return
}
func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes []byte)) {
func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes []byte, err error)) {
filter.onRejectCallbackMutex.Lock()
result = filter.onRejectCallback
filter.onRejectCallbackMutex.Unlock()
......
package builtinfilters
import (
"fmt"
"sync"
"github.com/iotaledger/hive.go/async"
......@@ -8,9 +9,11 @@ import (
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction"
)
var ErrInvalidSignature = fmt.Errorf("invalid signature")
type TransactionSignatureFilter struct {
onAcceptCallback func(tx *transaction.Transaction)
onRejectCallback func(tx *transaction.Transaction)
onRejectCallback func(tx *transaction.Transaction, err error)
workerPool async.WorkerPool
onAcceptCallbackMutex sync.RWMutex
......@@ -28,7 +31,7 @@ func (filter *TransactionSignatureFilter) Filter(tx *transaction.Transaction) {
if tx.VerifySignature() {
filter.getAcceptCallback()(tx)
} else {
filter.getRejectCallback()(tx)
filter.getRejectCallback()(tx, ErrInvalidSignature)
}
})
}
......@@ -39,7 +42,7 @@ func (filter *TransactionSignatureFilter) OnAccept(callback func(tx *transaction
filter.onAcceptCallbackMutex.Unlock()
}
func (filter *TransactionSignatureFilter) OnReject(callback func(tx *transaction.Transaction)) {
func (filter *TransactionSignatureFilter) OnReject(callback func(tx *transaction.Transaction, err error)) {
filter.onRejectCallbackMutex.Lock()
filter.onRejectCallback = callback
filter.onRejectCallbackMutex.Unlock()
......@@ -57,7 +60,7 @@ func (filter *TransactionSignatureFilter) getAcceptCallback() (result func(tx *t
return
}
func (filter *TransactionSignatureFilter) getRejectCallback() (result func(tx *transaction.Transaction)) {
func (filter *TransactionSignatureFilter) getRejectCallback() (result func(tx *transaction.Transaction, err error)) {
filter.onRejectCallbackMutex.RLock()
result = filter.onRejectCallback
filter.onRejectCallbackMutex.RUnlock()
......
......@@ -3,6 +3,6 @@ package transactionparser
type BytesFilter interface {
Filter(bytes []byte)
OnAccept(callback func(bytes []byte))
OnReject(callback func(bytes []byte))
OnReject(callback func(bytes []byte, err error))
Shutdown()
}
......@@ -7,6 +7,6 @@ import (
type TransactionFilter interface {
Filter(tx *transaction.Transaction)
OnAccept(callback func(tx *transaction.Transaction))
OnReject(callback func(tx *transaction.Transaction))
OnReject(callback func(tx *transaction.Transaction, err error))
Shutdown()
}
......@@ -28,13 +28,13 @@ func New() (result *TransactionParser) {
Events: transactionParserEvents{
BytesRejected: events.NewEvent(func(handler interface{}, params ...interface{}) {
handler.(func([]byte))(params[0].([]byte))
handler.(func([]byte, error))(params[0].([]byte), params[1].(error))
}),
TransactionParsed: events.NewEvent(func(handler interface{}, params ...interface{}) {
handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction))
}),
TransactionRejected: events.NewEvent(func(handler interface{}, params ...interface{}) {
handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction))
handler.(func(*transaction.Transaction, error))(params[0].(*transaction.Transaction), params[1].(error))
}),
},
}
......@@ -99,7 +99,9 @@ func (transactionParser *TransactionParser) setupBytesFilterDataFlow() {
} else {
transactionParser.bytesFilters[i].OnAccept(transactionParser.bytesFilters[i+1].Filter)
}
transactionParser.bytesFilters[i].OnReject(func(bytes []byte) { transactionParser.Events.BytesRejected.Trigger(bytes) })
transactionParser.bytesFilters[i].OnReject(func(bytes []byte, err error) {
transactionParser.Events.BytesRejected.Trigger(bytes, err)
})
}
}
transactionParser.bytesFiltersMutex.Unlock()
......@@ -117,11 +119,15 @@ func (transactionParser *TransactionParser) setupTransactionsFilterDataFlow() {
numberOfTransactionFilters := len(transactionParser.transactionFilters)
for i := 0; i < numberOfTransactionFilters; i++ {
if i == numberOfTransactionFilters-1 {
transactionParser.transactionFilters[i].OnAccept(func(tx *transaction.Transaction) { transactionParser.Events.TransactionParsed.Trigger(tx) })
transactionParser.transactionFilters[i].OnAccept(func(tx *transaction.Transaction) {
transactionParser.Events.TransactionParsed.Trigger(tx)
})
} else {
transactionParser.transactionFilters[i].OnAccept(transactionParser.transactionFilters[i+1].Filter)
}
transactionParser.transactionFilters[i].OnReject(func(tx *transaction.Transaction) { transactionParser.Events.TransactionRejected.Trigger(tx) })
transactionParser.transactionFilters[i].OnReject(func(tx *transaction.Transaction, err error) {
transactionParser.Events.TransactionRejected.Trigger(tx, err)
})
}
}
transactionParser.transactionFiltersMutex.Unlock()
......@@ -129,8 +135,7 @@ func (transactionParser *TransactionParser) setupTransactionsFilterDataFlow() {
func (transactionParser *TransactionParser) parseTransaction(bytes []byte) {
if parsedTransaction, err := transaction.FromBytes(bytes); err != nil {
// trigger parsingError
panic(err)
transactionParser.Events.BytesRejected.Trigger(bytes, err)
} else {
transactionParser.transactionFilters[0].Filter(parsedTransaction)
}
......
......@@ -3,15 +3,13 @@ package shutdown
const (
ShutdownPriorityTangle = iota
ShutdownPriorityRemoteLog
ShutdownPrioritySolidifier
ShutdownPriorityBundleProcessor
ShutdownPriorityAnalysis
ShutdownPriorityMetrics
ShutdownPriorityAutopeering
ShutdownPriorityGossip
ShutdownPriorityWebAPI
ShutdownPriorityGraph
ShutdownPriorityTangleSpammer
ShutdownPrioritySPA
ShutdownPriorityGraph
ShutdownPrioritySpammer
ShutdownPriorityBadgerGarbageCollection
)
package spammer
import (
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/goshimmer/packages/binary/spammer"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/tangle"
"github.com/iotaledger/goshimmer/plugins/webapi"
......@@ -10,10 +13,18 @@ import (
var transactionSpammer *spammer.Spammer
var PLUGIN = node.NewPlugin("Spammer", node.Disabled, configure)
var PLUGIN = node.NewPlugin("Spammer", node.Disabled, configure, run)
func configure(plugin *node.Plugin) {
transactionSpammer = spammer.New(tangle.TransactionParser, tangle.TipSelector)
webapi.Server.GET("spammer", handleRequest)
}
func run(*node.Plugin) {
_ = daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) {
<-shutdownSignal
transactionSpammer.Shutdown()
}, shutdown.ShutdownPrioritySpammer)
}
......@@ -27,6 +27,7 @@ func handleRequest(c echo.Context) error {
return c.JSON(http.StatusBadRequest, Response{Error: "burst requires the tps to be set"})
}
transactionSpammer.Shutdown()
transactionSpammer.Burst(request.Tps)
return c.JSON(http.StatusOK, Response{Message: "sent a burst of " + strconv.Itoa(request.Tps) + " transactions"})
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment