Skip to content
Snippets Groups Projects
Unverified Commit 32ed993a authored by Angelo Capossele's avatar Angelo Capossele Committed by GitHub
Browse files

Add time unit to spammer and bootstrap (#568)

* :clock3: Add time unit to spammer and bootstrap

* :rotating_light: Fix linter warning
parent bf533b56
Branches
Tags
No related merge requests found
...@@ -29,9 +29,9 @@ func New(issuePayloadFunc IssuePayloadFunc) *Spammer { ...@@ -29,9 +29,9 @@ func New(issuePayloadFunc IssuePayloadFunc) *Spammer {
} }
} }
// Start starts the spammer to spam with the given messages per second. // Start starts the spammer to spam with the given messages per time unit.
func (spammer *Spammer) Start(mps int) { func (spammer *Spammer) Start(rate int, timeUnit time.Duration) {
go spammer.run(mps, atomic.AddInt64(&spammer.processId, 1)) go spammer.run(rate, timeUnit, atomic.AddInt64(&spammer.processId, 1))
} }
// Shutdown shuts down the spammer. // Shutdown shuts down the spammer.
...@@ -39,12 +39,12 @@ func (spammer *Spammer) Shutdown() { ...@@ -39,12 +39,12 @@ func (spammer *Spammer) Shutdown() {
atomic.AddInt64(&spammer.processId, 1) atomic.AddInt64(&spammer.processId, 1)
} }
func (spammer *Spammer) run(mps int, processId int64) { func (spammer *Spammer) run(rate int, timeUnit time.Duration, processID int64) {
currentSentCounter := 0 currentSentCounter := 0
start := time.Now() start := time.Now()
for { for {
if atomic.LoadInt64(&spammer.processId) != processId { if atomic.LoadInt64(&spammer.processId) != processID {
return return
} }
...@@ -54,10 +54,10 @@ func (spammer *Spammer) run(mps int, processId int64) { ...@@ -54,10 +54,10 @@ func (spammer *Spammer) run(mps int, processId int64) {
currentSentCounter++ currentSentCounter++
// rate limit to the specified MPS // rate limit to the specified MPS
if currentSentCounter >= mps { if currentSentCounter >= rate {
duration := time.Since(start) duration := time.Since(start)
if duration < time.Second { if duration < timeUnit {
time.Sleep(time.Second - duration) time.Sleep(timeUnit - duration)
} }
start = time.Now() start = time.Now()
......
...@@ -21,8 +21,10 @@ const ( ...@@ -21,8 +21,10 @@ const (
// CfgBootstrapInitialIssuanceTimePeriodSec defines the initial time period of how long the node should be // CfgBootstrapInitialIssuanceTimePeriodSec defines the initial time period of how long the node should be
// issuing messages when started in bootstrapping mode. If the value is set to -1, the issuance is continuous. // issuing messages when started in bootstrapping mode. If the value is set to -1, the issuance is continuous.
CfgBootstrapInitialIssuanceTimePeriodSec = "bootstrap.initialIssuance.timePeriodSec" CfgBootstrapInitialIssuanceTimePeriodSec = "bootstrap.initialIssuance.timePeriodSec"
// the messages per second to issue when in bootstrapping mode. // CfgBootstrapTimeUnit defines the time unit (in seconds) of the issuance rate (e.g., 3 messages per 12 seconds).
initialIssuanceMPS = 1 CfgBootstrapTimeUnit = "bootstrap.timeUnit"
// the messages per period to issue when in bootstrapping mode.
initialIssuanceRate = 1
// the value which determines a continuous issuance of messages from the bootstrap plugin. // the value which determines a continuous issuance of messages from the bootstrap plugin.
continuousIssuance = -1 continuousIssuance = -1
) )
...@@ -30,6 +32,7 @@ const ( ...@@ -30,6 +32,7 @@ const (
func init() { func init() {
flag.Int(CfgBootstrapInitialIssuanceTimePeriodSec, -1, "the initial time period of how long the node should be issuing messages when started in bootstrapping mode. "+ flag.Int(CfgBootstrapInitialIssuanceTimePeriodSec, -1, "the initial time period of how long the node should be issuing messages when started in bootstrapping mode. "+
"if the value is set to -1, the issuance is continuous.") "if the value is set to -1, the issuance is continuous.")
flag.Int(CfgBootstrapTimeUnit, 5, "the time unit (in seconds) of the issuance rate (e.g., 1 messages per 5 seconds).")
} }
var ( var (
...@@ -58,11 +61,15 @@ func run(_ *node.Plugin) { ...@@ -58,11 +61,15 @@ func run(_ *node.Plugin) {
messageSpammer := spammer.New(issuer.IssuePayload) messageSpammer := spammer.New(issuer.IssuePayload)
issuancePeriodSec := config.Node().GetInt(CfgBootstrapInitialIssuanceTimePeriodSec) issuancePeriodSec := config.Node().GetInt(CfgBootstrapInitialIssuanceTimePeriodSec)
timeUnit := config.Node().GetInt(CfgBootstrapTimeUnit)
if timeUnit <= 0 {
log.Panicf("Invalid Bootsrap time unit: %d seconds", timeUnit)
}
issuancePeriod := time.Duration(issuancePeriodSec) * time.Second issuancePeriod := time.Duration(issuancePeriodSec) * time.Second
// issue messages on top of the genesis // issue messages on top of the genesis
if err := daemon.BackgroundWorker("Bootstrapping-Issuer", func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker("Bootstrapping-Issuer", func(shutdownSignal <-chan struct{}) {
messageSpammer.Start(initialIssuanceMPS) messageSpammer.Start(initialIssuanceRate, time.Duration(timeUnit)*time.Second)
defer messageSpammer.Shutdown() defer messageSpammer.Shutdown()
// don't stop issuing messages if in continuous mode // don't stop issuing messages if in continuous mode
if issuancePeriodSec == continuousIssuance { if issuancePeriodSec == continuousIssuance {
......
...@@ -2,6 +2,7 @@ package spammer ...@@ -2,6 +2,7 @@ package spammer
import ( import (
"net/http" "net/http"
"time"
"github.com/labstack/echo" "github.com/labstack/echo"
) )
...@@ -19,7 +20,7 @@ func handleRequest(c echo.Context) error { ...@@ -19,7 +20,7 @@ func handleRequest(c echo.Context) error {
} }
messageSpammer.Shutdown() messageSpammer.Shutdown()
messageSpammer.Start(request.MPS) messageSpammer.Start(request.MPS, time.Second)
return c.JSON(http.StatusOK, Response{Message: "started spamming messages"}) return c.JSON(http.StatusOK, Response{Message: "started spamming messages"})
case "stop": case "stop":
messageSpammer.Shutdown() messageSpammer.Shutdown()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment