diff --git a/main.go b/main.go index 91ec88b8e32820b238b744768ba952c3ab681374..7ab4b3abc41f3f141454f7493c241cce9e1e6d75 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/analysis" "github.com/iotaledger/goshimmer/plugins/autopeering" + "github.com/iotaledger/goshimmer/plugins/bundleprocessor" "github.com/iotaledger/goshimmer/plugins/cli" "github.com/iotaledger/goshimmer/plugins/gossip" gossip_on_solidification "github.com/iotaledger/goshimmer/plugins/gossip-on-solidification" @@ -25,6 +26,7 @@ func main() { gossip.PLUGIN, gossip_on_solidification.PLUGIN, tangle.PLUGIN, + bundleprocessor.PLUGIN, analysis.PLUGIN, gracefulshutdown.PLUGIN, tipselection.PLUGIN, diff --git a/plugins/bundleprocessor/plugin.go b/plugins/bundleprocessor/plugin.go index 1a1a55c3ae7227db5e07c5066fdcacccf9b39944..44e110035c536d846d4b675841835b4701817921 100644 --- a/plugins/bundleprocessor/plugin.go +++ b/plugins/bundleprocessor/plugin.go @@ -1,20 +1,48 @@ package bundleprocessor import ( + "github.com/iotaledger/goshimmer/packages/daemon" "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/model/value_transaction" "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/packages/workerpool" "github.com/iotaledger/goshimmer/plugins/tangle" ) -var PLUGIN = node.NewPlugin("Bundle Processor", configure) +var PLUGIN = node.NewPlugin("Bundle Processor", configure, run) func configure(plugin *node.Plugin) { + workerPool = workerpool.New(func(task workerpool.Task) { + if _, err := ProcessSolidBundleHead(task.Param(0).(*value_transaction.ValueTransaction)); err != nil { + plugin.LogFailure(err.Error()) + } + }, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(2*WORKER_COUNT)) + tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) { if tx.IsHead() { - if _, err := ProcessSolidBundleHead(tx); err != nil { - plugin.LogFailure(err.Error()) - } + workerPool.Submit(tx) } })) + + daemon.Events.Shutdown.Attach(events.NewClosure(func() { + plugin.LogInfo("Stopping Bundle Processor ...") + + workerPool.Stop() + })) +} + +func run(plugin *node.Plugin) { + plugin.LogInfo("Starting Bundle Processor ...") + + daemon.BackgroundWorker(func() { + plugin.LogSuccess("Starting Bundle Processor ... done") + + workerPool.Run() + + plugin.LogSuccess("Stopping Bundle Processor ... done") + }) } + +var workerPool *workerpool.WorkerPool + +const WORKER_COUNT = 10000