Skip to content
Snippets Groups Projects
Unverified Commit 8cd10312 authored by Levente Pap's avatar Levente Pap Committed by GitHub
Browse files

Evenly distribute issued msgs in spammer (#838)

parent 59779fdb
No related branches found
No related tags found
No related merge requests found
......@@ -4,10 +4,10 @@ import (
"sync/atomic"
"time"
"github.com/iotaledger/goshimmer/packages/tangle/payload"
"github.com/iotaledger/hive.go/types"
"github.com/iotaledger/goshimmer/packages/tangle"
"github.com/iotaledger/goshimmer/packages/tangle/payload"
"github.com/iotaledger/goshimmer/plugins/syncbeaconfollower"
"golang.org/x/xerrors"
)
// IssuePayloadFunc is a function which issues a payload.
......@@ -16,16 +16,13 @@ type IssuePayloadFunc = func(payload payload.Payload) (*tangle.Message, error)
// Spammer spams messages with a static data payload.
type Spammer struct {
issuePayloadFunc IssuePayloadFunc
processID int64
shutdownSignal chan types.Empty
processID int64
}
// New creates a new spammer.
func New(issuePayloadFunc IssuePayloadFunc) *Spammer {
return &Spammer{
issuePayloadFunc: issuePayloadFunc,
shutdownSignal: make(chan types.Empty),
}
}
......@@ -40,7 +37,8 @@ func (spammer *Spammer) Shutdown() {
}
func (spammer *Spammer) run(rate int, timeUnit time.Duration, processID int64) {
currentSentCounter := 0
// emit messages every msgInterval interval
msgInterval := time.Duration(timeUnit.Nanoseconds() / int64(rate))
start := time.Now()
for {
......@@ -49,19 +47,18 @@ func (spammer *Spammer) run(rate int, timeUnit time.Duration, processID int64) {
}
// we don't care about errors or the actual issued message
_, _ = spammer.issuePayloadFunc(payload.NewGenericDataPayload([]byte("SPAM")))
currentSentCounter++
// rate limit to the specified MPS
if currentSentCounter >= rate {
duration := time.Since(start)
if duration < timeUnit {
time.Sleep(timeUnit - duration)
}
_, err := spammer.issuePayloadFunc(payload.NewGenericDataPayload([]byte("SPAM")))
if xerrors.Is(err, syncbeaconfollower.ErrNodeNotSynchronized) {
// can't issue msg because node not in sync
return
}
start = time.Now()
currentSentCounter = 0
currentInterval := time.Since(start)
if currentInterval < msgInterval {
//there is still time, sleep until msgInterval
time.Sleep(msgInterval - currentInterval)
}
// when currentInterval > msgInterval, the node can't issue msgs as fast as requested, will do as fast as it can
start = time.Now()
}
}
package issuer
import (
"fmt"
goSync "sync"
"github.com/iotaledger/goshimmer/packages/tangle"
......@@ -9,6 +8,7 @@ import (
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/goshimmer/plugins/syncbeaconfollower"
"github.com/iotaledger/hive.go/node"
"golang.org/x/xerrors"
)
// PluginName is the name of the issuer plugin.
......@@ -34,7 +34,7 @@ func configure(_ *node.Plugin) {}
// If the node is not synchronized an error is returned.
func IssuePayload(payload payload.Payload) (*tangle.Message, error) {
if !syncbeaconfollower.Synced() {
return nil, fmt.Errorf("can't issue payload: %w", syncbeaconfollower.ErrNodeNotSynchronized)
return nil, xerrors.Errorf("can't issue payload: %w", syncbeaconfollower.ErrNodeNotSynchronized)
}
msg, err := messagelayer.MessageFactory().IssuePayload(payload)
......
......@@ -21,9 +21,11 @@ func handleRequest(c echo.Context) error {
messageSpammer.Shutdown()
messageSpammer.Start(request.MPM, time.Minute)
log.Infof("Started spamming messages with %d MPM", request.MPM)
return c.JSON(http.StatusOK, Response{Message: "started spamming messages"})
case "stop":
messageSpammer.Shutdown()
log.Info("Stopped spamming messages")
return c.JSON(http.StatusOK, Response{Message: "stopped spamming messages"})
default:
return c.JSON(http.StatusBadRequest, Response{Error: "invalid cmd in request"})
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment