diff --git a/plugins/bundleprocessor/bundleprocessor.go b/plugins/bundleprocessor/bundleprocessor.go index 9dade9b6e6551106f9b672f7cb65fcc72982163a..4e7a80ed44de49a3efd4318ba20a0ce1171314b6 100644 --- a/plugins/bundleprocessor/bundleprocessor.go +++ b/plugins/bundleprocessor/bundleprocessor.go @@ -1,15 +1,23 @@ package bundleprocessor import ( - "github.com/iotaledger/goshimmer/packages/curl" "github.com/iotaledger/goshimmer/packages/errors" "github.com/iotaledger/goshimmer/packages/model/bundle" "github.com/iotaledger/goshimmer/packages/model/transactionmetadata" "github.com/iotaledger/goshimmer/packages/model/value_transaction" + "github.com/iotaledger/goshimmer/packages/workerpool" "github.com/iotaledger/goshimmer/plugins/tangle" "github.com/iotaledger/iota.go/trinary" ) +var workerPool = workerpool.New(func(task workerpool.Task) { + if _, err := ProcessSolidBundleHead(task.Param(0).(*value_transaction.ValueTransaction)); err != nil { + Events.Error.Trigger(err) + } +}, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(2*WORKER_COUNT)) + +const WORKER_COUNT = 10000 + func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction) (*bundle.Bundle, errors.IdentifiableError) { // only process the bundle if we didn't process it, yet return tangle.GetBundle(headTransaction.GetHash(), func(headTransactionHash trinary.Trytes) (*bundle.Bundle, errors.IdentifiableError) { @@ -54,18 +62,9 @@ func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction) newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions)) if newBundle.IsValueBundle() { - var concatenatedBundleEssences = make(trinary.Trits, len(bundleTransactions)*value_transaction.BUNDLE_ESSENCE_SIZE) - for i, bundleTransaction := range bundleTransactions { - copy(concatenatedBundleEssences[value_transaction.BUNDLE_ESSENCE_SIZE*i:value_transaction.BUNDLE_ESSENCE_SIZE*(i+1)], bundleTransaction.GetBundleEssence()) - } - - var resp = make(trinary.Trits, 243) - - hasher := curl.NewCurl(243, 81) - hasher.Absorb(concatenatedBundleEssences, 0, len(concatenatedBundleEssences)) - hasher.Squeeze(resp, 0, 243) + valueBundleProcessorWorkerPool.Submit(newBundle, bundleTransactions) - newBundle.SetBundleEssenceHash(trinary.MustTritsToTrytes(resp)) + return newBundle, nil } Events.BundleSolid.Trigger(newBundle, bundleTransactions) diff --git a/plugins/bundleprocessor/events.go b/plugins/bundleprocessor/events.go index 2d67f5da2020893b66a6fdcc6bf6df8ff1a01c6b..6deb5fbb853361d764101be74f94f15ae0abdaab 100644 --- a/plugins/bundleprocessor/events.go +++ b/plugins/bundleprocessor/events.go @@ -1,21 +1,28 @@ package bundleprocessor import ( + "github.com/iotaledger/goshimmer/packages/errors" "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/model/bundle" "github.com/iotaledger/goshimmer/packages/model/value_transaction" ) var Events = pluginEvents{ + Error: events.NewEvent(errorCaller), BundleSolid: events.NewEvent(bundleEventCaller), InvalidBundle: events.NewEvent(bundleEventCaller), } type pluginEvents struct { + Error *events.Event BundleSolid *events.Event InvalidBundle *events.Event } +func errorCaller(handler interface{}, params ...interface{}) { + handler.(func(errors.IdentifiableError))(params[0].(errors.IdentifiableError)) +} + func bundleEventCaller(handler interface{}, params ...interface{}) { handler.(func(*bundle.Bundle, []*value_transaction.ValueTransaction))(params[0].(*bundle.Bundle), params[1].([]*value_transaction.ValueTransaction)) } diff --git a/plugins/bundleprocessor/plugin.go b/plugins/bundleprocessor/plugin.go index cec87804d3c1fa95ad0e51d025f1e84d1e19ad76..02303b751f4007b8bfd24f7b8e4f3237cfa8e23a 100644 --- a/plugins/bundleprocessor/plugin.go +++ b/plugins/bundleprocessor/plugin.go @@ -2,32 +2,34 @@ package bundleprocessor import ( "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/errors" "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/model/value_transaction" "github.com/iotaledger/goshimmer/packages/node" - "github.com/iotaledger/goshimmer/packages/workerpool" "github.com/iotaledger/goshimmer/plugins/tangle" ) var PLUGIN = node.NewPlugin("Bundle Processor", configure, run) func configure(plugin *node.Plugin) { - workerPool = workerpool.New(func(task workerpool.Task) { - if _, err := ProcessSolidBundleHead(task.Param(0).(*value_transaction.ValueTransaction)); err != nil { - plugin.LogFailure(err.Error()) - } - }, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(2*WORKER_COUNT)) - tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) { if tx.IsHead() { workerPool.Submit(tx) } })) + Events.Error.Attach(events.NewClosure(func(err errors.IdentifiableError) { + plugin.LogFailure(err.Error()) + })) + daemon.Events.Shutdown.Attach(events.NewClosure(func() { plugin.LogInfo("Stopping Bundle Processor ...") workerPool.Stop() + + plugin.LogInfo("Stopping Value Bundle Processor ...") + + valueBundleProcessorWorkerPool.Stop() })) } @@ -41,8 +43,14 @@ func run(plugin *node.Plugin) { plugin.LogSuccess("Stopping Bundle Processor ... done") }) -} -var workerPool *workerpool.WorkerPool + plugin.LogInfo("Starting Value Bundle Processor ...") + + daemon.BackgroundWorker("Value Bundle Processor", func() { + plugin.LogSuccess("Starting Value Bundle Processor ... done") + + valueBundleProcessorWorkerPool.Run() -const WORKER_COUNT = 10000 + plugin.LogSuccess("Stopping Value Bundle Processor ... done") + }) +} diff --git a/plugins/bundleprocessor/valuebundleprocessor.go b/plugins/bundleprocessor/valuebundleprocessor.go new file mode 100644 index 0000000000000000000000000000000000000000..9b606e45470a0a7b58d0f2420d7c9033221e1572 --- /dev/null +++ b/plugins/bundleprocessor/valuebundleprocessor.go @@ -0,0 +1,35 @@ +package bundleprocessor + +import ( + "github.com/iotaledger/goshimmer/packages/curl" + "github.com/iotaledger/goshimmer/packages/errors" + "github.com/iotaledger/goshimmer/packages/model/bundle" + "github.com/iotaledger/goshimmer/packages/model/value_transaction" + "github.com/iotaledger/goshimmer/packages/workerpool" + "github.com/iotaledger/iota.go/trinary" +) + +var valueBundleProcessorWorkerPool = workerpool.New(func(task workerpool.Task) { + if err := ProcessSolidValueBundle(task.Param(0).(*bundle.Bundle), task.Param(1).([]*value_transaction.ValueTransaction)); err != nil { + Events.Error.Trigger(err) + } +}, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(2*WORKER_COUNT)) + +func ProcessSolidValueBundle(bundle *bundle.Bundle, bundleTransactions []*value_transaction.ValueTransaction) errors.IdentifiableError { + var concatenatedBundleEssences = make(trinary.Trits, len(bundleTransactions)*value_transaction.BUNDLE_ESSENCE_SIZE) + for i, bundleTransaction := range bundleTransactions { + copy(concatenatedBundleEssences[value_transaction.BUNDLE_ESSENCE_SIZE*i:value_transaction.BUNDLE_ESSENCE_SIZE*(i+1)], bundleTransaction.GetBundleEssence()) + } + + var resp = make(trinary.Trits, 243) + + hasher := curl.NewCurl(243, 81) + hasher.Absorb(concatenatedBundleEssences, 0, len(concatenatedBundleEssences)) + hasher.Squeeze(resp, 0, 243) + + bundle.SetBundleEssenceHash(trinary.MustTritsToTrytes(resp)) + + Events.BundleSolid.Trigger(bundle, bundleTransactions) + + return nil +}