diff --git a/main.go b/main.go index 91ec88b8e32820b238b744768ba952c3ab681374..7ab4b3abc41f3f141454f7493c241cce9e1e6d75 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/analysis" "github.com/iotaledger/goshimmer/plugins/autopeering" + "github.com/iotaledger/goshimmer/plugins/bundleprocessor" "github.com/iotaledger/goshimmer/plugins/cli" "github.com/iotaledger/goshimmer/plugins/gossip" gossip_on_solidification "github.com/iotaledger/goshimmer/plugins/gossip-on-solidification" @@ -25,6 +26,7 @@ func main() { gossip.PLUGIN, gossip_on_solidification.PLUGIN, tangle.PLUGIN, + bundleprocessor.PLUGIN, analysis.PLUGIN, gracefulshutdown.PLUGIN, tipselection.PLUGIN, diff --git a/packages/batchworkerpool/batchworkerpool.go b/packages/batchworkerpool/batchworkerpool.go new file mode 100644 index 0000000000000000000000000000000000000000..071015ff28bea5774fda0b0280231ef679fa8051 --- /dev/null +++ b/packages/batchworkerpool/batchworkerpool.go @@ -0,0 +1,163 @@ +package batchworkerpool + +import ( + "sync" + "time" +) + +type BatchWorkerPool struct { + workerFnc func([]Task) + options *Options + + calls chan Task + batchedCalls chan []Task + terminate chan int + + running bool + mutex sync.RWMutex + wait sync.WaitGroup +} + +func New(workerFnc func([]Task), optionalOptions ...Option) (result *BatchWorkerPool) { + options := DEFAULT_OPTIONS.Override(optionalOptions...) + + result = &BatchWorkerPool{ + workerFnc: workerFnc, + options: options, + } + + result.resetChannels() + + return +} + +func (wp *BatchWorkerPool) Submit(params ...interface{}) (result chan interface{}) { + result = make(chan interface{}, 1) + + wp.mutex.RLock() + + if wp.running { + wp.calls <- Task{ + params: params, + resultChan: result, + } + } else { + close(result) + } + + wp.mutex.RUnlock() + + return +} + +func (wp *BatchWorkerPool) Start() { + wp.mutex.Lock() + + if !wp.running { + wp.running = true + + wp.startBatchDispatcher() + wp.startBatchWorkers() + } + + wp.mutex.Unlock() +} + +func (wp *BatchWorkerPool) Run() { + wp.Start() + + wp.wait.Wait() +} + +func (wp *BatchWorkerPool) Stop() { + go wp.StopAndWait() +} + +func (wp *BatchWorkerPool) StopAndWait() { + wp.mutex.Lock() + + if wp.running { + wp.running = false + + close(wp.terminate) + wp.resetChannels() + } + + wp.wait.Wait() + + wp.mutex.Unlock() +} + +func (wp *BatchWorkerPool) resetChannels() { + wp.calls = make(chan Task, wp.options.QueueSize) + wp.batchedCalls = make(chan []Task, 2*wp.options.WorkerCount) + wp.terminate = make(chan int, 1) +} + +func (wp *BatchWorkerPool) startBatchDispatcher() { + calls := wp.calls + terminate := wp.terminate + + wp.wait.Add(1) + + go func() { + for { + select { + case <-terminate: + wp.wait.Done() + + return + case firstCall := <-calls: + batchTask := append(make([]Task, 0), firstCall) + + collectionTimeout := time.After(wp.options.BatchCollectionTimeout) + + // collect additional requests that arrive within the timeout + CollectAdditionalCalls: + for { + select { + case <-terminate: + wp.wait.Done() + + return + case <-collectionTimeout: + break CollectAdditionalCalls + case call := <-wp.calls: + batchTask = append(batchTask, call) + + if len(batchTask) == wp.options.BatchSize { + break CollectAdditionalCalls + } + } + } + + wp.batchedCalls <- batchTask + } + } + }() +} + +func (wp *BatchWorkerPool) startBatchWorkers() { + batchedCalls := wp.batchedCalls + terminate := wp.terminate + + for i := 0; i < wp.options.WorkerCount; i++ { + wp.wait.Add(1) + + go func() { + aborted := false + + for !aborted { + select { + case <-terminate: + aborted = true + + case batchTask := <-batchedCalls: + wp.workerFnc(batchTask) + } + } + + wp.wait.Done() + }() + } +} diff --git a/packages/batchworkerpool/options.go b/packages/batchworkerpool/options.go new file mode 100644 index 0000000000000000000000000000000000000000..47e53b75931c1b7b9c065fd344fac684fb3adccc --- /dev/null +++ b/packages/batchworkerpool/options.go @@ -0,0 +1,55 @@ +package batchworkerpool + +import ( + "runtime" + "time" +) + +var DEFAULT_OPTIONS = &Options{ + WorkerCount: 2 * runtime.NumCPU(), + QueueSize: 2 * runtime.NumCPU() * 64, + BatchSize: 64, + BatchCollectionTimeout: 15 * time.Millisecond, +} + +func WorkerCount(workerCount int) Option { + return func(args *Options) { + args.WorkerCount = workerCount + } +} + +func BatchSize(batchSize int) Option { + return func(args *Options) { + args.BatchSize = batchSize + } +} + +func BatchCollectionTimeout(batchCollectionTimeout time.Duration) Option { + return func(args *Options) { + args.BatchCollectionTimeout = batchCollectionTimeout + } +} + +func QueueSize(queueSize int) Option { + return func(args *Options) { + args.QueueSize = queueSize + } +} + +type Options struct { + WorkerCount int + QueueSize int + BatchSize int + BatchCollectionTimeout time.Duration +} + +func (options Options) Override(optionalOptions ...Option) *Options { + result := &options + for _, option := range optionalOptions { + option(result) + } + + return result +} + +type Option func(*Options) diff --git a/packages/batchworkerpool/task.go b/packages/batchworkerpool/task.go new file mode 100644 index 0000000000000000000000000000000000000000..7d4ea82e80b0ff0e44714bdea19851d6f6ea329f --- /dev/null +++ b/packages/batchworkerpool/task.go @@ -0,0 +1,15 @@ +package batchworkerpool + +type Task struct { + params []interface{} + resultChan chan interface{} +} + +func (task *Task) Return(result interface{}) { + task.resultChan <- result + close(task.resultChan) +} + +func (task *Task) Param(index int) interface{} { + return task.params[index] +} diff --git a/packages/curl/batch_hasher.go b/packages/curl/batch_hasher.go index b9a4059fc8392eb9bc90554a390e6a4568f98178..a04ec2315c9139da51b23fc7eeb672a5148da765 100644 --- a/packages/curl/batch_hasher.go +++ b/packages/curl/batch_hasher.go @@ -3,80 +3,40 @@ package curl import ( "fmt" "strconv" - "time" + "github.com/iotaledger/goshimmer/packages/batchworkerpool" "github.com/iotaledger/goshimmer/packages/ternary" "github.com/iotaledger/iota.go/trinary" ) -type HashRequest struct { - input trinary.Trits - output chan trinary.Trits -} - type BatchHasher struct { - hashRequests chan HashRequest - tasks chan []HashRequest - hashLength int - rounds int + hashLength int + rounds int + workerPool *batchworkerpool.BatchWorkerPool } -func NewBatchHasher(hashLength int, rounds int) *BatchHasher { - this := &BatchHasher{ - hashLength: hashLength, - rounds: rounds, - hashRequests: make(chan HashRequest), - tasks: make(chan []HashRequest, NUMBER_OF_WORKERS), +func NewBatchHasher(hashLength int, rounds int) (result *BatchHasher) { + result = &BatchHasher{ + hashLength: hashLength, + rounds: rounds, } - go this.startDispatcher() - this.startWorkers() + result.workerPool = batchworkerpool.New(result.processHashes, batchworkerpool.BatchSize(strconv.IntSize)) + result.workerPool.Start() - return this + return } -func (this *BatchHasher) startWorkers() { - for i := 0; i < NUMBER_OF_WORKERS; i++ { - go func() { - for { - this.processHashes(<-this.tasks) - } - }() - } +func (this *BatchHasher) Hash(trits ternary.Trits) ternary.Trits { + return (<-this.workerPool.Submit(trits)).(ternary.Trits) } -func (this *BatchHasher) startDispatcher() { - for { - collectedHashRequests := make([]HashRequest, 0) - - // wait for first request to start processing at all - collectedHashRequests = append(collectedHashRequests, <-this.hashRequests) - - // collect additional requests that arrive within the timeout - CollectAdditionalRequests: - for { - select { - case hashRequest := <-this.hashRequests: - collectedHashRequests = append(collectedHashRequests, hashRequest) - - if len(collectedHashRequests) == strconv.IntSize { - break CollectAdditionalRequests - } - case <-time.After(50 * time.Millisecond): - break CollectAdditionalRequests - } - } - - this.tasks <- collectedHashRequests - } -} - -func (this *BatchHasher) processHashes(collectedHashRequests []HashRequest) { - if len(collectedHashRequests) > 1 { +func (this *BatchHasher) processHashes(tasks []batchworkerpool.Task) { + if len(tasks) > 1 { // multiplex the requests multiplexer := ternary.NewBCTernaryMultiplexer() - for _, hashRequest := range collectedHashRequests { - multiplexer.Add(hashRequest.input) + for _, hashRequest := range tasks { + multiplexer.Add(hashRequest.Param(0).(ternary.Trits)) } bcTrits, err := multiplexer.Extract() if err != nil { @@ -90,33 +50,18 @@ func (this *BatchHasher) processHashes(collectedHashRequests []HashRequest) { // extract the results from the demultiplexer demux := ternary.NewBCTernaryDemultiplexer(bctCurl.Squeeze(243)) - for i, hashRequest := range collectedHashRequests { - hashRequest.output <- demux.Get(i) - close(hashRequest.output) + for i, task := range tasks { + task.Return(demux.Get(i)) } } else { var resp = make(trinary.Trits, this.hashLength) + trits := tasks[0].Param(0).(ternary.Trits) + curl := NewCurl(this.hashLength, this.rounds) - curl.Absorb(collectedHashRequests[0].input, 0, len(collectedHashRequests[0].input)) + curl.Absorb(trits, 0, len(trits)) curl.Squeeze(resp, 0, this.hashLength) - collectedHashRequests[0].output <- resp - close(collectedHashRequests[0].output) + tasks[0].Return(resp) } } - -func (this *BatchHasher) Hash(trits trinary.Trits) chan trinary.Trits { - hashRequest := HashRequest{ - input: trits, - output: make(chan trinary.Trits, 1), - } - - this.hashRequests <- hashRequest - - return hashRequest.output -} - -const ( - NUMBER_OF_WORKERS = 1000 -) diff --git a/packages/curl/batch_hasher_test.go b/packages/curl/batch_hasher_test.go new file mode 100644 index 0000000000000000000000000000000000000000..9c9c3eec8f53c77e940f6121dee18291d1eddbef --- /dev/null +++ b/packages/curl/batch_hasher_test.go @@ -0,0 +1,27 @@ +package curl + +import ( + "sync" + "testing" + + "github.com/iotaledger/goshimmer/packages/ternary" +) + +func BenchmarkBatchHasher_Hash(b *testing.B) { + batchHasher := NewBatchHasher(243, 81) + tritsToHash := ternary.Trytes("A999999FF").ToTrits() + + b.ResetTimer() + + var wg sync.WaitGroup + for i := 0; i < b.N; i++ { + wg.Add(1) + + go func() { + batchHasher.Hash(tritsToHash) + + wg.Done() + }() + } + wg.Wait() +} diff --git a/packages/daemon/daemon.go b/packages/daemon/daemon.go index 404fccd00b25674bc39961c87bcd7ffcc6ca7994..d6961424582e40699ebe97b69f56593b5b61d9d8 100644 --- a/packages/daemon/daemon.go +++ b/packages/daemon/daemon.go @@ -5,29 +5,53 @@ import ( ) var ( - running bool - wg sync.WaitGroup - ShutdownSignal = make(chan int, 1) - backgroundWorkers = make([]func(), 0) - lock = sync.Mutex{} + running bool + wg sync.WaitGroup + ShutdownSignal = make(chan int, 1) + backgroundWorkers = make([]func(), 0) + backgroundWorkerNames = make([]string, 0) + runningBackgroundWorkers = make(map[string]bool) + lock = sync.Mutex{} ) -func runBackgroundWorker(backgroundWorker func()) { +func GetRunningBackgroundWorkers() []string { + lock.Lock() + + result := make([]string, 0) + for runningBackgroundWorker := range runningBackgroundWorkers { + result = append(result, runningBackgroundWorker) + } + + lock.Unlock() + + return result +} + +func runBackgroundWorker(name string, backgroundWorker func()) { wg.Add(1) go func() { + lock.Lock() + runningBackgroundWorkers[name] = true + lock.Unlock() + backgroundWorker() + lock.Lock() + delete(runningBackgroundWorkers, name) + lock.Unlock() + wg.Done() }() } -func BackgroundWorker(handler func()) { +func BackgroundWorker(name string, handler func()) { lock.Lock() if IsRunning() { - runBackgroundWorker(handler) + runBackgroundWorker(name, handler) } else { + backgroundWorkerNames = append(backgroundWorkerNames, name) backgroundWorkers = append(backgroundWorkers, handler) } @@ -45,8 +69,8 @@ func Run() { Events.Run.Trigger() - for _, backgroundWorker := range backgroundWorkers { - runBackgroundWorker(backgroundWorker) + for i, backgroundWorker := range backgroundWorkers { + runBackgroundWorker(backgroundWorkerNames[i], backgroundWorker) } } diff --git a/packages/model/meta_transaction/meta_transaction.go b/packages/model/meta_transaction/meta_transaction.go index cd78f4085dd74a162f4ffa4c9f37169e182c6de8..575ea6b435bc775e00cc253fa362ce8f3ee95153 100644 --- a/packages/model/meta_transaction/meta_transaction.go +++ b/packages/model/meta_transaction/meta_transaction.go @@ -120,7 +120,7 @@ func (this *MetaTransaction) GetWeightMagnitude() (result int) { // hashes the transaction using curl (without locking - internal usage) func (this *MetaTransaction) parseHashRelatedDetails() { - hashTrits := <-curl.CURLP81.Hash(this.trits) + hashTrits := curl.CURLP81.Hash(this.trits) hashTrytes := trinary.MustTritsToTrytes(hashTrits) this.hash = &hashTrytes diff --git a/packages/node/plugin.go b/packages/node/plugin.go index a98a22beff880d7b73b210c14e38f0f6730a1ce1..1a6cb527a5eb3d5cd3aa0488e3ce22d2b7fb74e5 100644 --- a/packages/node/plugin.go +++ b/packages/node/plugin.go @@ -55,3 +55,16 @@ func (plugin *Plugin) LogFailure(message string) { func (plugin *Plugin) LogDebug(message string) { plugin.Node.LogDebug(plugin.Name, message) } + +var TestNode = &Node{ + loggers: make([]*Logger, 0), + wg: &sync.WaitGroup{}, + loadedPlugins: make([]*Plugin, 0), +} + +func (plugin *Plugin) InitTest() { + plugin.Node = TestNode + + plugin.Events.Configure.Trigger(plugin) + plugin.Events.Run.Trigger(plugin) +} diff --git a/packages/transactionspammer/transactionspammer.go b/packages/transactionspammer/transactionspammer.go index 503a0f5af72c6f3120b0749ebb30bf5778ba3cd5..c59c6f544f888b83025376fc9b4e0793fb5e83ad 100644 --- a/packages/transactionspammer/transactionspammer.go +++ b/packages/transactionspammer/transactionspammer.go @@ -23,7 +23,7 @@ func Start(tps int64) { shutdownSignal = make(chan int, 1) func(shutdownSignal chan int) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Transaction Spammer", func() { for { start := time.Now() sentCounter := int64(0) diff --git a/packages/workerpool/options.go b/packages/workerpool/options.go new file mode 100644 index 0000000000000000000000000000000000000000..dc6b27e4da9145b0a34101dfd22696c76b63d83e --- /dev/null +++ b/packages/workerpool/options.go @@ -0,0 +1,38 @@ +package workerpool + +import ( + "runtime" +) + +var DEFAULT_OPTIONS = &Options{ + WorkerCount: 2 * runtime.NumCPU(), + QueueSize: 4 * runtime.NumCPU(), +} + +func WorkerCount(workerCount int) Option { + return func(args *Options) { + args.WorkerCount = workerCount + } +} + +func QueueSize(queueSize int) Option { + return func(args *Options) { + args.QueueSize = queueSize + } +} + +type Options struct { + WorkerCount int + QueueSize int +} + +func (options Options) Override(optionalOptions ...Option) *Options { + result := &options + for _, option := range optionalOptions { + option(result) + } + + return result +} + +type Option func(*Options) diff --git a/packages/workerpool/task.go b/packages/workerpool/task.go new file mode 100644 index 0000000000000000000000000000000000000000..8c56b1a9274f7e16df7fa02f47e53553703158cd --- /dev/null +++ b/packages/workerpool/task.go @@ -0,0 +1,15 @@ +package workerpool + +type Task struct { + params []interface{} + resultChan chan interface{} +} + +func (task *Task) Return(result interface{}) { + task.resultChan <- result + close(task.resultChan) +} + +func (task *Task) Param(index int) interface{} { + return task.params[index] +} diff --git a/packages/workerpool/workerpool.go b/packages/workerpool/workerpool.go new file mode 100644 index 0000000000000000000000000000000000000000..95b6a2a7d2814b152b9d8320d2f3873bee0a428b --- /dev/null +++ b/packages/workerpool/workerpool.go @@ -0,0 +1,116 @@ +package workerpool + +import ( + "sync" +) + +type WorkerPool struct { + workerFnc func(Task) + options *Options + + calls chan Task + terminate chan int + + running bool + mutex sync.RWMutex + wait sync.WaitGroup +} + +func New(workerFnc func(Task), optionalOptions ...Option) (result *WorkerPool) { + options := DEFAULT_OPTIONS.Override(optionalOptions...) + + result = &WorkerPool{ + workerFnc: workerFnc, + options: options, + } + + result.resetChannels() + + return +} + +func (wp *WorkerPool) Submit(params ...interface{}) (result chan interface{}) { + result = make(chan interface{}, 1) + + wp.mutex.RLock() + + if wp.running { + wp.calls <- Task{ + params: params, + resultChan: result, + } + } else { + close(result) + } + + wp.mutex.RUnlock() + + return +} + +func (wp *WorkerPool) Start() { + wp.mutex.Lock() + + if !wp.running { + wp.running = true + + wp.startWorkers() + } + + wp.mutex.Unlock() +} + +func (wp *WorkerPool) Run() { + wp.Start() + + wp.wait.Wait() +} + +func (wp *WorkerPool) Stop() { + go wp.StopAndWait() +} + +func (wp *WorkerPool) StopAndWait() { + wp.mutex.Lock() + + if wp.running { + wp.running = false + + close(wp.terminate) + wp.resetChannels() + } + + wp.wait.Wait() + + wp.mutex.Unlock() +} + +func (wp *WorkerPool) resetChannels() { + wp.calls = make(chan Task, wp.options.QueueSize) + wp.terminate = make(chan int, 1) +} + +func (wp *WorkerPool) startWorkers() { + calls := wp.calls + terminate := wp.terminate + + for i := 0; i < wp.options.WorkerCount; i++ { + wp.wait.Add(1) + + go func() { + aborted := false + + for !aborted { + select { + case <-terminate: + aborted = true + + case batchTask := <-calls: + wp.workerFnc(batchTask) + } + } + + wp.wait.Done() + }() + } +} diff --git a/packages/workerpool/workerpool_test.go b/packages/workerpool/workerpool_test.go new file mode 100644 index 0000000000000000000000000000000000000000..fb8f83e38adfe543488cc7170d77776fc74d66ca --- /dev/null +++ b/packages/workerpool/workerpool_test.go @@ -0,0 +1,26 @@ +package workerpool + +import ( + "sync" + "testing" +) + +func Benchmark(b *testing.B) { + pool := New(func(task Task) { + task.Return(task.Param(0)) + }, WorkerCount(10), QueueSize(2000)) + pool.Start() + + var wg sync.WaitGroup + for i := 0; i < b.N; i++ { + wg.Add(1) + + go func() { + <-pool.Submit(i) + + wg.Done() + }() + } + + wg.Wait() +} diff --git a/plugins/analysis/client/plugin.go b/plugins/analysis/client/plugin.go index 3a42c55ae0a0af094e138b446b3ddcadb5f11e73..b73dba38dec66d241a32edf020c8946101b384ed 100644 --- a/plugins/analysis/client/plugin.go +++ b/plugins/analysis/client/plugin.go @@ -21,7 +21,7 @@ import ( ) func Run(plugin *node.Plugin) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Analysis Client", func() { shuttingDown := false for !shuttingDown { diff --git a/plugins/analysis/server/plugin.go b/plugins/analysis/server/plugin.go index 86522f1cda3bd0388c24ab1b0a3a065480b8e39f..cbcbb7259b81acf2e4177bc91fa4a7435b321363 100644 --- a/plugins/analysis/server/plugin.go +++ b/plugins/analysis/server/plugin.go @@ -36,7 +36,7 @@ func Configure(plugin *node.Plugin) { } func Run(plugin *node.Plugin) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Analysis Server", func() { plugin.LogInfo("Starting Server (port " + strconv.Itoa(*SERVER_PORT.Value) + ") ...") server.Listen(*SERVER_PORT.Value) diff --git a/plugins/analysis/webinterface/httpserver/plugin.go b/plugins/analysis/webinterface/httpserver/plugin.go index 6f5edbb8ea736c8be04d5176ce5bf9e24f2b324d..a6b62cbb12f33087fbdf7e58b2b2eb2dfacd8833 100644 --- a/plugins/analysis/webinterface/httpserver/plugin.go +++ b/plugins/analysis/webinterface/httpserver/plugin.go @@ -32,7 +32,7 @@ func Configure(plugin *node.Plugin) { } func Run(plugin *node.Plugin) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Analysis HTTP Server", func() { httpServer.ListenAndServe() }) } diff --git a/plugins/autopeering/instances/neighborhood/instance.go b/plugins/autopeering/instances/neighborhood/instance.go index 5a94afd36cdc4f2906a51593c6b2f5ea06e7aa18..d5791d22dd74e273906d892b2b887ff27491c29a 100644 --- a/plugins/autopeering/instances/neighborhood/instance.go +++ b/plugins/autopeering/instances/neighborhood/instance.go @@ -35,7 +35,7 @@ func Configure(plugin *node.Plugin) { } func Run(plugin *node.Plugin) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Neighborhood Updater", func() { timeutil.Ticker(updateNeighborHood, 1*time.Second) }) } diff --git a/plugins/autopeering/protocol/outgoing_request_processor.go b/plugins/autopeering/protocol/outgoing_request_processor.go index cec618cb4827d68a59239e0d8d4dff916969da4c..31fb40cf38a562cabba789b1a121b32958563c2b 100644 --- a/plugins/autopeering/protocol/outgoing_request_processor.go +++ b/plugins/autopeering/protocol/outgoing_request_processor.go @@ -41,7 +41,11 @@ func createOutgoingRequestProcessor(plugin *node.Plugin) func() { func sendOutgoingRequests(plugin *node.Plugin) { for _, chosenNeighborCandidate := range chosenneighbors.CANDIDATES.Clone() { - time.Sleep(5 * time.Second) + select { + case <-daemon.ShutdownSignal: + return + case <-time.After(5 * time.Second): + } if candidateShouldBeContacted(chosenNeighborCandidate) { if dialed, err := chosenNeighborCandidate.Send(outgoingrequest.INSTANCE.Marshal(), types.PROTOCOL_TYPE_TCP, true); err != nil { diff --git a/plugins/autopeering/protocol/plugin.go b/plugins/autopeering/protocol/plugin.go index fce222652b9d551a6aa84c8c15617795e68ba592..a1b175fc871798bd195e5b63e055f0792611cdf6 100644 --- a/plugins/autopeering/protocol/plugin.go +++ b/plugins/autopeering/protocol/plugin.go @@ -20,8 +20,8 @@ func Configure(plugin *node.Plugin) { } func Run(plugin *node.Plugin) { - daemon.BackgroundWorker(createChosenNeighborDropper(plugin)) - daemon.BackgroundWorker(createAcceptedNeighborDropper(plugin)) - daemon.BackgroundWorker(createOutgoingRequestProcessor(plugin)) - daemon.BackgroundWorker(createOutgoingPingProcessor(plugin)) + daemon.BackgroundWorker("Autopeering Chosen Neighbor Dropper", createChosenNeighborDropper(plugin)) + daemon.BackgroundWorker("Autopeering Accepted Neighbor Dropper", createAcceptedNeighborDropper(plugin)) + daemon.BackgroundWorker("Autopeering Outgoing Request Processor", createOutgoingRequestProcessor(plugin)) + daemon.BackgroundWorker("Autopeering Outgoing Ping Processor", createOutgoingPingProcessor(plugin)) } diff --git a/plugins/autopeering/saltmanager/saltmanager.go b/plugins/autopeering/saltmanager/saltmanager.go index 5127646aeb0c3da71350732861d9fbb70c0a381e..0004b7f68791561edf5833dea36d130c126f0a86 100644 --- a/plugins/autopeering/saltmanager/saltmanager.go +++ b/plugins/autopeering/saltmanager/saltmanager.go @@ -68,7 +68,7 @@ func scheduleUpdateForSalt(saltToUpdate *salt.Salt, settingsKey []byte, lifeSpan if saltToUpdate.ExpirationTime.Before(now) { updatePublicSalt(saltToUpdate, settingsKey, lifeSpan, callback) } else { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Salt Updater", func() { select { case <-time.After(saltToUpdate.ExpirationTime.Sub(now)): updatePublicSalt(saltToUpdate, settingsKey, lifeSpan, callback) diff --git a/plugins/autopeering/server/tcp/server.go b/plugins/autopeering/server/tcp/server.go index 3d4977aa395efac4ca5a0326dc9bc0f54c3b87ff..165c098e56290c712655513414017c99c53f3a5b 100644 --- a/plugins/autopeering/server/tcp/server.go +++ b/plugins/autopeering/server/tcp/server.go @@ -37,7 +37,7 @@ func ConfigureServer(plugin *node.Plugin) { } func RunServer(plugin *node.Plugin) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Autopeering TCP Server", func() { if *parameters.ADDRESS.Value == "0.0.0.0" { plugin.LogInfo("Starting TCP Server (port " + strconv.Itoa(*parameters.PORT.Value) + ") ...") } else { diff --git a/plugins/autopeering/server/udp/server.go b/plugins/autopeering/server/udp/server.go index 01e66fade4fab0bad39baa0785d751511dec4189..9d01a18977262c393608c46bcfdfbc63d34d983d 100644 --- a/plugins/autopeering/server/udp/server.go +++ b/plugins/autopeering/server/udp/server.go @@ -41,7 +41,7 @@ func ConfigureServer(plugin *node.Plugin) { } func RunServer(plugin *node.Plugin) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Autopeering UDP Server", func() { if *parameters.ADDRESS.Value == "0.0.0.0" { plugin.LogInfo("Starting UDP Server (port " + strconv.Itoa(*parameters.PORT.Value) + ") ...") } else { diff --git a/plugins/bundleprocessor/bundleprocessor.go b/plugins/bundleprocessor/bundleprocessor.go new file mode 100644 index 0000000000000000000000000000000000000000..8f49b8d5ee7c51193bf68069127551bb0f9825c6 --- /dev/null +++ b/plugins/bundleprocessor/bundleprocessor.go @@ -0,0 +1,77 @@ +package bundleprocessor + +import ( + "github.com/iotaledger/goshimmer/packages/errors" + "github.com/iotaledger/goshimmer/packages/model/bundle" + "github.com/iotaledger/goshimmer/packages/model/transactionmetadata" + "github.com/iotaledger/goshimmer/packages/model/value_transaction" + "github.com/iotaledger/goshimmer/packages/ternary" + "github.com/iotaledger/goshimmer/plugins/tangle" +) + +func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction) (*bundle.Bundle, errors.IdentifiableError) { + // only process the bundle if we didn't process it, yet + return tangle.GetBundle(headTransaction.GetHash(), func(headTransactionHash ternary.Trytes) (*bundle.Bundle, errors.IdentifiableError) { + // abort if bundle syntax is wrong + if !headTransaction.IsHead() { + return nil, ErrProcessBundleFailed.Derive(errors.New("invalid parameter"), "transaction needs to be head of bundle") + } + + // initialize event variables + newBundle := bundle.New(headTransactionHash) + bundleTransactions := make([]*value_transaction.ValueTransaction, 0) + + // iterate through trunk transactions until we reach the tail + currentTransaction := headTransaction + for { + // abort if we reached a previous head + if currentTransaction.IsHead() && currentTransaction != headTransaction { + newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions)) + + Events.InvalidBundle.Trigger(newBundle, bundleTransactions) + + return nil, ErrProcessBundleFailed.Derive(errors.New("invalid bundle found"), "missing bundle tail") + } + + // update bundle transactions + bundleTransactions = append(bundleTransactions, currentTransaction) + + // retrieve & update metadata + currentTransactionMetadata, dbErr := tangle.GetTransactionMetadata(currentTransaction.GetHash(), transactionmetadata.New) + if dbErr != nil { + return nil, ErrProcessBundleFailed.Derive(dbErr, "failed to retrieve transaction metadata") + } + currentTransactionMetadata.SetBundleHeadHash(headTransactionHash) + + // update value bundle flag + if !newBundle.IsValueBundle() && currentTransaction.GetValue() != 0 { + newBundle.SetValueBundle(true) + } + + // if we are done -> trigger events + if currentTransaction.IsTail() { + newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions)) + + Events.BundleSolid.Trigger(newBundle, bundleTransactions) + + return newBundle, nil + } + + // try to iterate to next turn + if nextTransaction, err := tangle.GetTransaction(currentTransaction.GetTrunkTransactionHash()); err != nil { + return nil, ErrProcessBundleFailed.Derive(err, "failed to retrieve trunk while processing bundle") + } else { + currentTransaction = nextTransaction + } + } + }) +} + +func mapTransactionsToTransactionHashes(transactions []*value_transaction.ValueTransaction) (result []ternary.Trytes) { + result = make([]ternary.Trytes, len(transactions)) + for k, v := range transactions { + result[k] = v.GetHash() + } + + return +} diff --git a/plugins/bundleprocessor/bundleprocessor_test.go b/plugins/bundleprocessor/bundleprocessor_test.go new file mode 100644 index 0000000000000000000000000000000000000000..f2132dc94a610ccb2fd013f0b993e6d5b1bbc05f --- /dev/null +++ b/plugins/bundleprocessor/bundleprocessor_test.go @@ -0,0 +1,41 @@ +package bundleprocessor + +import ( + "fmt" + "testing" + + "github.com/iotaledger/goshimmer/packages/events" + "github.com/iotaledger/goshimmer/packages/model/bundle" + "github.com/iotaledger/goshimmer/packages/model/value_transaction" + "github.com/iotaledger/goshimmer/packages/ternary" + "github.com/iotaledger/goshimmer/plugins/tangle" + "github.com/magiconair/properties/assert" +) + +func TestProcessSolidBundleHead(t *testing.T) { + tangle.PLUGIN.InitTest() + + tx := value_transaction.New() + tx.SetTail(true) + tx.SetValue(3) + + tx1 := value_transaction.New() + tx1.SetTrunkTransactionHash(tx.GetHash()) + tx1.SetHead(true) + + tangle.StoreTransaction(tx) + tangle.StoreTransaction(tx1) + + Events.BundleSolid.Attach(events.NewClosure(func(bundle *bundle.Bundle, transactions []*value_transaction.ValueTransaction) { + fmt.Println("IT HAPPENED") + fmt.Println(bundle.GetHash()) + })) + + result, err := ProcessSolidBundleHead(tx1) + if err != nil { + t.Error(err) + } else { + assert.Equal(t, result.GetHash(), ternary.Trytes("UFWJYEWKMEQDNSQUCUWBGOFRHVBGHVVYEZCLCGRDTRQSMAFALTIPMJEEYFDPMQCNJWLXUWFMBZGHQRO99"), "invalid bundle hash") + assert.Equal(t, result.IsValueBundle(), true, "invalid value bundle status") + } +} diff --git a/plugins/bundleprocessor/events.go b/plugins/bundleprocessor/events.go index 19f4ca0424b193c179d092af2bb18ad64b37c919..2d67f5da2020893b66a6fdcc6bf6df8ff1a01c6b 100644 --- a/plugins/bundleprocessor/events.go +++ b/plugins/bundleprocessor/events.go @@ -7,15 +7,13 @@ import ( ) var Events = pluginEvents{ - DataBundleReceived: events.NewEvent(bundleEventCaller), - ValueBundleReceived: events.NewEvent(bundleEventCaller), - InvalidBundleReceived: events.NewEvent(bundleEventCaller), + BundleSolid: events.NewEvent(bundleEventCaller), + InvalidBundle: events.NewEvent(bundleEventCaller), } type pluginEvents struct { - DataBundleReceived *events.Event - ValueBundleReceived *events.Event - InvalidBundleReceived *events.Event + BundleSolid *events.Event + InvalidBundle *events.Event } func bundleEventCaller(handler interface{}, params ...interface{}) { diff --git a/plugins/bundleprocessor/plugin.go b/plugins/bundleprocessor/plugin.go index 3ff2a85762d23019dd6cc0bb8594ce15a24163c9..89930b024c288180112c5a305b53a0dbff2a138c 100644 --- a/plugins/bundleprocessor/plugin.go +++ b/plugins/bundleprocessor/plugin.go @@ -1,95 +1,49 @@ package bundleprocessor import ( - "github.com/iotaledger/goshimmer/packages/errors" + "github.com/iotaledger/goshimmer/packages/daemon" "github.com/iotaledger/goshimmer/packages/events" - "github.com/iotaledger/goshimmer/packages/model/bundle" - "github.com/iotaledger/goshimmer/packages/model/transactionmetadata" "github.com/iotaledger/goshimmer/packages/model/value_transaction" "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/packages/workerpool" "github.com/iotaledger/goshimmer/plugins/tangle" "github.com/iotaledger/iota.go/trinary" ) -var PLUGIN = node.NewPlugin("Bundle Processor", configure) +var PLUGIN = node.NewPlugin("Bundle Processor", configure, run) func configure(plugin *node.Plugin) { + workerPool = workerpool.New(func(task workerpool.Task) { + if _, err := ProcessSolidBundleHead(task.Param(0).(*value_transaction.ValueTransaction)); err != nil { + plugin.LogFailure(err.Error()) + } + }, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(2*WORKER_COUNT)) + tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) { if tx.IsHead() { - if _, err := ProcessSolidBundleHead(tx); err != nil { - plugin.LogFailure(err.Error()) - } + workerPool.Submit(tx) } })) -} - -func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction) (*bundle.Bundle, errors.IdentifiableError) { - // only process the bundle if we didn't process it, yet - return tangle.GetBundle(headTransaction.GetHash(), func(headTransactionHash trinary.Trytes) (*bundle.Bundle, errors.IdentifiableError) { - // abort if bundle syntax is wrong - if !headTransaction.IsHead() { - return nil, ErrProcessBundleFailed.Derive(errors.New("invalid parameter"), "transaction needs to be head of bundle") - } - - // initialize event variables - newBundle := bundle.New(headTransactionHash) - bundleTransactions := make([]*value_transaction.ValueTransaction, 0) - - // iterate through trunk transactions until we reach the tail - currentTransaction := headTransaction - for { - // abort if we reached a previous head - if currentTransaction.IsHead() && currentTransaction != headTransaction { - newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions)) - Events.InvalidBundleReceived.Trigger(newBundle, bundleTransactions) + daemon.Events.Shutdown.Attach(events.NewClosure(func() { + plugin.LogInfo("Stopping Bundle Processor ...") - return nil, ErrProcessBundleFailed.Derive(errors.New("invalid bundle found"), "missing bundle tail") - } - - // update bundle transactions - bundleTransactions = append(bundleTransactions, currentTransaction) - - // retrieve & update metadata - currentTransactionMetadata, dbErr := tangle.GetTransactionMetadata(currentTransaction.GetHash(), transactionmetadata.New) - if dbErr != nil { - return nil, ErrProcessBundleFailed.Derive(dbErr, "failed to retrieve transaction metadata") - } - currentTransactionMetadata.SetBundleHeadHash(headTransactionHash) - - // update value bundle flag - if !newBundle.IsValueBundle() && currentTransaction.GetValue() != 0 { - newBundle.SetValueBundle(true) - } + workerPool.Stop() + })) +} - // if we are done -> trigger events - if currentTransaction.IsTail() { - newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions)) +func run(plugin *node.Plugin) { + plugin.LogInfo("Starting Bundle Processor ...") - if newBundle.IsValueBundle() { - Events.ValueBundleReceived.Trigger(newBundle, bundleTransactions) - } else { - Events.DataBundleReceived.Trigger(newBundle, bundleTransactions) - } + daemon.BackgroundWorker("Bundle Processor", func() { + plugin.LogSuccess("Starting Bundle Processor ... done") - return newBundle, nil - } + workerPool.Run() - // try to iterate to next turn - if nextTransaction, err := tangle.GetTransaction(currentTransaction.GetTrunkTransactionHash()); err != nil { - return nil, ErrProcessBundleFailed.Derive(err, "failed to retrieve trunk while processing bundle") - } else { - currentTransaction = nextTransaction - } - } + plugin.LogSuccess("Stopping Bundle Processor ... done") }) } -func mapTransactionsToTransactionHashes(transactions []*value_transaction.ValueTransaction) (result []trinary.Trytes) { - result = make([]trinary.Trytes, len(transactions)) - for k, v := range transactions { - result[k] = v.GetHash() - } +var workerPool *workerpool.WorkerPool - return -} +const WORKER_COUNT = 10000 diff --git a/plugins/gossip/neighbors.go b/plugins/gossip/neighbors.go index f61e08d86a6d0eac60eff86fe65df56acb75105c..87577a6669dcab45675e3a787ff4e8ac21f1b64a 100644 --- a/plugins/gossip/neighbors.go +++ b/plugins/gossip/neighbors.go @@ -47,7 +47,7 @@ func runNeighbors(plugin *node.Plugin) { } func manageConnection(plugin *node.Plugin, neighbor *Neighbor) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Connection Manager ("+neighbor.Identity.StringIdentifier+")", func() { failedConnectionAttempts := 0 for _, exists := GetNeighbor(neighbor.Identity.StringIdentifier); exists && failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS; { diff --git a/plugins/gossip/send_queue.go b/plugins/gossip/send_queue.go index 354c0c3bf0557265e7806d249a20b38e294ef667..9894910edde22b7e50e980ff176de408b62984b7 100644 --- a/plugins/gossip/send_queue.go +++ b/plugins/gossip/send_queue.go @@ -26,7 +26,7 @@ func configureSendQueue(plugin *node.Plugin) { func runSendQueue(plugin *node.Plugin) { plugin.LogInfo("Starting Send Queue Dispatcher ...") - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Gossip Send Queue Dispatcher", func() { plugin.LogSuccess("Starting Send Queue Dispatcher ... done") for { @@ -54,7 +54,7 @@ func runSendQueue(plugin *node.Plugin) { connectedNeighborsMutex.Lock() for _, neighborQueue := range neighborQueues { - startNeighborSendQueue(neighborQueue) + startNeighborSendQueue(neighborQueue.protocol.Neighbor, neighborQueue) } connectedNeighborsMutex.Unlock() } @@ -104,13 +104,13 @@ func setupEventHandlers(neighbor *Neighbor) { })) if daemon.IsRunning() { - startNeighborSendQueue(queue) + startNeighborSendQueue(neighbor, queue) } })) } -func startNeighborSendQueue(neighborQueue *neighborQueue) { - daemon.BackgroundWorker(func() { +func startNeighborSendQueue(neighbor *Neighbor, neighborQueue *neighborQueue) { + daemon.BackgroundWorker("Gossip Send Queue ("+neighbor.Identity.StringIdentifier+")", func() { for { select { case <-daemon.ShutdownSignal: diff --git a/plugins/gossip/server.go b/plugins/gossip/server.go index 37ebc20ddd75f018ad016cb641ce9e87816d02c2..07804ad02702be929ec022dbc0e7efe3009fb5c7 100644 --- a/plugins/gossip/server.go +++ b/plugins/gossip/server.go @@ -73,7 +73,7 @@ func configureServer(plugin *node.Plugin) { func runServer(plugin *node.Plugin) { plugin.LogInfo("Starting TCP Server (port " + strconv.Itoa(*PORT.Value) + ") ...") - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Gossip TCP Server", func() { plugin.LogSuccess("Starting TCP Server (port " + strconv.Itoa(*PORT.Value) + ") ... done") TCPServer.Listen(*PORT.Value) diff --git a/plugins/gracefulshutdown/plugin.go b/plugins/gracefulshutdown/plugin.go index babf75718afa7bd702d3dd1e17e29d51f15fa171..ee3a074c18f7b7d078be19689b70c8192994977f 100644 --- a/plugins/gracefulshutdown/plugin.go +++ b/plugins/gracefulshutdown/plugin.go @@ -4,6 +4,7 @@ import ( "os" "os/signal" "strconv" + "strings" "syscall" "time" @@ -31,7 +32,13 @@ var PLUGIN = node.NewPlugin("Graceful Shutdown", func(plugin *node.Plugin) { secondsSinceStart := x.Sub(start).Seconds() if secondsSinceStart <= WAIT_TO_KILL_TIME_IN_SECONDS { - plugin.LogWarning("Received shutdown request - waiting (max " + strconv.Itoa(WAIT_TO_KILL_TIME_IN_SECONDS-int(secondsSinceStart)) + " seconds) to finish processing ...") + processList := "" + runningBackgroundWorkers := daemon.GetRunningBackgroundWorkers() + if len(runningBackgroundWorkers) >= 1 { + processList = "(" + strings.Join(runningBackgroundWorkers, ", ") + ") " + } + + plugin.LogWarning("Received shutdown request - waiting (max " + strconv.Itoa(WAIT_TO_KILL_TIME_IN_SECONDS-int(secondsSinceStart)) + " seconds) to finish processing " + processList + "...") } else { plugin.LogFailure("Background processes did not terminate in time! Forcing shutdown ...") diff --git a/plugins/statusscreen-tps/plugin.go b/plugins/statusscreen-tps/plugin.go index eabb868f15e786ecea795ceac8173f2984d4a31c..9b73392c93acbe5d71a52a9ab6b3b3339cd83a3a 100644 --- a/plugins/statusscreen-tps/plugin.go +++ b/plugins/statusscreen-tps/plugin.go @@ -36,7 +36,7 @@ var PLUGIN = node.NewPlugin("Statusscreen TPS", func(plugin *node.Plugin) { return "TPS", strconv.FormatUint(atomic.LoadUint64(&receivedTps), 10) + " received / " + strconv.FormatUint(atomic.LoadUint64(&solidTps), 10) + " new" }) }, func(plugin *node.Plugin) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Statusscreen TPS Tracker", func() { ticker := time.NewTicker(time.Second) for { diff --git a/plugins/statusscreen/statusscreen.go b/plugins/statusscreen/statusscreen.go index 31b00d08edc337ddb5afccb897e3fc39322c243d..5c6d13e27a7162126eef3a69798110f6758ff059 100644 --- a/plugins/statusscreen/statusscreen.go +++ b/plugins/statusscreen/statusscreen.go @@ -100,7 +100,7 @@ func run(plugin *node.Plugin) { return false }) - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Statusscreen Refresher", func() { for { select { case <-daemon.ShutdownSignal: @@ -111,7 +111,7 @@ func run(plugin *node.Plugin) { } }) - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Statusscreen App", func() { if err := app.SetRoot(frame, true).SetFocus(frame).Run(); err != nil { panic(err) } diff --git a/plugins/tangle/plugin.go b/plugins/tangle/plugin.go index 00f2dfaea6bc8481e11720ed9d8124f11e40ee93..db5d513513b885a1e6c1a6d939c00e3bd751607d 100644 --- a/plugins/tangle/plugin.go +++ b/plugins/tangle/plugin.go @@ -12,11 +12,12 @@ func configure(plugin *node.Plugin) { configureTransactionDatabase(plugin) configureTransactionMetaDataDatabase(plugin) configureApproversDatabase(plugin) + configureBundleDatabase(plugin) configureSolidifier(plugin) } func run(plugin *node.Plugin) { - // this plugin has no background workers + runSolidifier(plugin) } // endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/plugins/tangle/solidifier.go b/plugins/tangle/solidifier.go index 69cd07e90db519dc5330abc41537b4661e110a55..f9636163acc41b843195f17f2e4a1cb32c03901a 100644 --- a/plugins/tangle/solidifier.go +++ b/plugins/tangle/solidifier.go @@ -9,37 +9,44 @@ import ( "github.com/iotaledger/goshimmer/packages/model/transactionmetadata" "github.com/iotaledger/goshimmer/packages/model/value_transaction" "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/packages/ternary" + "github.com/iotaledger/goshimmer/packages/workerpool" "github.com/iotaledger/goshimmer/plugins/gossip" "github.com/iotaledger/iota.go/trinary" ) // region plugin module setup ////////////////////////////////////////////////////////////////////////////////////////// -//var solidifierChan = make(chan *value_transaction.ValueTransaction, 1000) - -const NUMBER_OF_WORKERS = 3000 - -var tasksChan = make(chan *meta_transaction.MetaTransaction, NUMBER_OF_WORKERS) +var workerPool *workerpool.WorkerPool func configureSolidifier(plugin *node.Plugin) { - for i := 0; i < NUMBER_OF_WORKERS; i++ { - go func() { - for { - select { - case <-daemon.ShutdownSignal: - return - case rawTransaction := <-tasksChan: - processMetaTransaction(plugin, rawTransaction) - } - } - }() - } + workerPool = workerpool.New(func(task workerpool.Task) { + processMetaTransaction(plugin, task.Param(0).(*meta_transaction.MetaTransaction)) + }, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(2*WORKER_COUNT)) gossip.Events.ReceiveTransaction.Attach(events.NewClosure(func(rawTransaction *meta_transaction.MetaTransaction) { - tasksChan <- rawTransaction + workerPool.Submit(rawTransaction) + })) + + daemon.Events.Shutdown.Attach(events.NewClosure(func() { + plugin.LogInfo("Stopping Solidifier ...") + + workerPool.Stop() })) } +func runSolidifier(plugin *node.Plugin) { + plugin.LogInfo("Starting Solidifier ...") + + daemon.BackgroundWorker("Tangle Solidifier", func() { + plugin.LogSuccess("Starting Solidifier ... done") + + workerPool.Run() + + plugin.LogSuccess("Stopping Solidifier ... done") + }) +} + // endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// // Checks and updates the solid flag of a single transaction. @@ -115,10 +122,10 @@ func IsSolid(transaction *value_transaction.ValueTransaction) (bool, errors.Iden } func propagateSolidity(transactionHash trinary.Trytes) errors.IdentifiableError { - if approvers, err := GetApprovers(transactionHash, approvers.New); err != nil { + if transactionApprovers, err := GetApprovers(transactionHash, approvers.New); err != nil { return err } else { - for _, approverHash := range approvers.GetHashes() { + for _, approverHash := range transactionApprovers.GetHashes() { if approver, err := GetTransaction(approverHash); err != nil { return err } else if approver != nil { @@ -178,5 +185,6 @@ func processTransaction(plugin *node.Plugin, transaction *value_transaction.Valu return } - //solidifierChan <- transaction } + +const WORKER_COUNT = 400 diff --git a/plugins/webapi/plugin.go b/plugins/webapi/plugin.go index ee2010c4de024d622b2da51705db47604c581e4c..4d80bdad8550809452b03c53e786f0836b0da6b5 100644 --- a/plugins/webapi/plugin.go +++ b/plugins/webapi/plugin.go @@ -33,7 +33,7 @@ func configure(plugin *node.Plugin) { func run(plugin *node.Plugin) { plugin.LogInfo("Starting Web Server ...") - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("WebAPI Server", func() { plugin.LogSuccess("Starting Web Server ... done") if err := Server.Start(":8080"); err != nil { diff --git a/plugins/zeromq/plugin.go b/plugins/zeromq/plugin.go index 4db21355e53a934e01ca1ee69748e76c6ddc80cb..1b7e38af503c813ef397549098f9024964e2262e 100644 --- a/plugins/zeromq/plugin.go +++ b/plugins/zeromq/plugin.go @@ -45,7 +45,7 @@ func run(plugin *node.Plugin) { plugin.LogInfo("Starting ZeroMQ Publisher (port " + strconv.Itoa(*PORT.Value) + ") ...") - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("ZeroMQ Publisher", func() { if err := startPublisher(plugin); err != nil { plugin.LogFailure("Stopping ZeroMQ Publisher: " + err.Error()) } else {