Skip to content
Snippets Groups Projects
Commit e3104920 authored by Hans Moog's avatar Hans Moog
Browse files

Feat: created worker for valueBundleProcessor

parent 97cfb525
Branches
Tags
No related merge requests found
package bundleprocessor package bundleprocessor
import ( import (
"github.com/iotaledger/goshimmer/packages/curl"
"github.com/iotaledger/goshimmer/packages/errors" "github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/model/bundle" "github.com/iotaledger/goshimmer/packages/model/bundle"
"github.com/iotaledger/goshimmer/packages/model/transactionmetadata" "github.com/iotaledger/goshimmer/packages/model/transactionmetadata"
"github.com/iotaledger/goshimmer/packages/model/value_transaction" "github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/workerpool"
"github.com/iotaledger/goshimmer/plugins/tangle" "github.com/iotaledger/goshimmer/plugins/tangle"
"github.com/iotaledger/iota.go/trinary" "github.com/iotaledger/iota.go/trinary"
) )
var workerPool = workerpool.New(func(task workerpool.Task) {
if _, err := ProcessSolidBundleHead(task.Param(0).(*value_transaction.ValueTransaction)); err != nil {
Events.Error.Trigger(err)
}
}, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(2*WORKER_COUNT))
const WORKER_COUNT = 10000
func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction) (*bundle.Bundle, errors.IdentifiableError) { func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction) (*bundle.Bundle, errors.IdentifiableError) {
// only process the bundle if we didn't process it, yet // only process the bundle if we didn't process it, yet
return tangle.GetBundle(headTransaction.GetHash(), func(headTransactionHash trinary.Trytes) (*bundle.Bundle, errors.IdentifiableError) { return tangle.GetBundle(headTransaction.GetHash(), func(headTransactionHash trinary.Trytes) (*bundle.Bundle, errors.IdentifiableError) {
...@@ -54,18 +62,9 @@ func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction) ...@@ -54,18 +62,9 @@ func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction)
newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions)) newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions))
if newBundle.IsValueBundle() { if newBundle.IsValueBundle() {
var concatenatedBundleEssences = make(trinary.Trits, len(bundleTransactions)*value_transaction.BUNDLE_ESSENCE_SIZE) valueBundleProcessorWorkerPool.Submit(newBundle, bundleTransactions)
for i, bundleTransaction := range bundleTransactions {
copy(concatenatedBundleEssences[value_transaction.BUNDLE_ESSENCE_SIZE*i:value_transaction.BUNDLE_ESSENCE_SIZE*(i+1)], bundleTransaction.GetBundleEssence())
}
var resp = make(trinary.Trits, 243)
hasher := curl.NewCurl(243, 81)
hasher.Absorb(concatenatedBundleEssences, 0, len(concatenatedBundleEssences))
hasher.Squeeze(resp, 0, 243)
newBundle.SetBundleEssenceHash(trinary.MustTritsToTrytes(resp)) return newBundle, nil
} }
Events.BundleSolid.Trigger(newBundle, bundleTransactions) Events.BundleSolid.Trigger(newBundle, bundleTransactions)
......
package bundleprocessor package bundleprocessor
import ( import (
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/model/bundle" "github.com/iotaledger/goshimmer/packages/model/bundle"
"github.com/iotaledger/goshimmer/packages/model/value_transaction" "github.com/iotaledger/goshimmer/packages/model/value_transaction"
) )
var Events = pluginEvents{ var Events = pluginEvents{
Error: events.NewEvent(errorCaller),
BundleSolid: events.NewEvent(bundleEventCaller), BundleSolid: events.NewEvent(bundleEventCaller),
InvalidBundle: events.NewEvent(bundleEventCaller), InvalidBundle: events.NewEvent(bundleEventCaller),
} }
type pluginEvents struct { type pluginEvents struct {
Error *events.Event
BundleSolid *events.Event BundleSolid *events.Event
InvalidBundle *events.Event InvalidBundle *events.Event
} }
func errorCaller(handler interface{}, params ...interface{}) {
handler.(func(errors.IdentifiableError))(params[0].(errors.IdentifiableError))
}
func bundleEventCaller(handler interface{}, params ...interface{}) { func bundleEventCaller(handler interface{}, params ...interface{}) {
handler.(func(*bundle.Bundle, []*value_transaction.ValueTransaction))(params[0].(*bundle.Bundle), params[1].([]*value_transaction.ValueTransaction)) handler.(func(*bundle.Bundle, []*value_transaction.ValueTransaction))(params[0].(*bundle.Bundle), params[1].([]*value_transaction.ValueTransaction))
} }
...@@ -2,32 +2,34 @@ package bundleprocessor ...@@ -2,32 +2,34 @@ package bundleprocessor
import ( import (
"github.com/iotaledger/goshimmer/packages/daemon" "github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/model/value_transaction" "github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/packages/workerpool"
"github.com/iotaledger/goshimmer/plugins/tangle" "github.com/iotaledger/goshimmer/plugins/tangle"
) )
var PLUGIN = node.NewPlugin("Bundle Processor", configure, run) var PLUGIN = node.NewPlugin("Bundle Processor", configure, run)
func configure(plugin *node.Plugin) { func configure(plugin *node.Plugin) {
workerPool = workerpool.New(func(task workerpool.Task) {
if _, err := ProcessSolidBundleHead(task.Param(0).(*value_transaction.ValueTransaction)); err != nil {
plugin.LogFailure(err.Error())
}
}, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(2*WORKER_COUNT))
tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) { tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) {
if tx.IsHead() { if tx.IsHead() {
workerPool.Submit(tx) workerPool.Submit(tx)
} }
})) }))
Events.Error.Attach(events.NewClosure(func(err errors.IdentifiableError) {
plugin.LogFailure(err.Error())
}))
daemon.Events.Shutdown.Attach(events.NewClosure(func() { daemon.Events.Shutdown.Attach(events.NewClosure(func() {
plugin.LogInfo("Stopping Bundle Processor ...") plugin.LogInfo("Stopping Bundle Processor ...")
workerPool.Stop() workerPool.Stop()
plugin.LogInfo("Stopping Value Bundle Processor ...")
valueBundleProcessorWorkerPool.Stop()
})) }))
} }
...@@ -41,8 +43,14 @@ func run(plugin *node.Plugin) { ...@@ -41,8 +43,14 @@ func run(plugin *node.Plugin) {
plugin.LogSuccess("Stopping Bundle Processor ... done") plugin.LogSuccess("Stopping Bundle Processor ... done")
}) })
}
var workerPool *workerpool.WorkerPool plugin.LogInfo("Starting Value Bundle Processor ...")
daemon.BackgroundWorker("Value Bundle Processor", func() {
plugin.LogSuccess("Starting Value Bundle Processor ... done")
valueBundleProcessorWorkerPool.Run()
const WORKER_COUNT = 10000 plugin.LogSuccess("Stopping Value Bundle Processor ... done")
})
}
package bundleprocessor
import (
"github.com/iotaledger/goshimmer/packages/curl"
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/model/bundle"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/workerpool"
"github.com/iotaledger/iota.go/trinary"
)
var valueBundleProcessorWorkerPool = workerpool.New(func(task workerpool.Task) {
if err := ProcessSolidValueBundle(task.Param(0).(*bundle.Bundle), task.Param(1).([]*value_transaction.ValueTransaction)); err != nil {
Events.Error.Trigger(err)
}
}, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(2*WORKER_COUNT))
func ProcessSolidValueBundle(bundle *bundle.Bundle, bundleTransactions []*value_transaction.ValueTransaction) errors.IdentifiableError {
var concatenatedBundleEssences = make(trinary.Trits, len(bundleTransactions)*value_transaction.BUNDLE_ESSENCE_SIZE)
for i, bundleTransaction := range bundleTransactions {
copy(concatenatedBundleEssences[value_transaction.BUNDLE_ESSENCE_SIZE*i:value_transaction.BUNDLE_ESSENCE_SIZE*(i+1)], bundleTransaction.GetBundleEssence())
}
var resp = make(trinary.Trits, 243)
hasher := curl.NewCurl(243, 81)
hasher.Absorb(concatenatedBundleEssences, 0, len(concatenatedBundleEssences))
hasher.Squeeze(resp, 0, 243)
bundle.SetBundleEssenceHash(trinary.MustTritsToTrytes(resp))
Events.BundleSolid.Trigger(bundle, bundleTransactions)
return nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment