Skip to content
Snippets Groups Projects
Commit a48e348d authored by Hans Moog's avatar Hans Moog
Browse files

Feat: added batchworkerpool and modified curl batch hasher

parent 453479bf
No related branches found
No related tags found
No related merge requests found
package batchworkerpool
import (
"sync"
"time"
)
type BatchWorkerPool struct {
workerFnc func([]Task)
options *Options
calls chan Task
batchedCalls chan []Task
terminate chan int
running bool
mutex sync.RWMutex
wait sync.WaitGroup
}
func New(workerFnc func([]Task), optionalOptions ...Option) (result *BatchWorkerPool) {
options := DEFAULT_OPTIONS.Override(optionalOptions...)
result = &BatchWorkerPool{
workerFnc: workerFnc,
options: options,
}
result.resetChannels()
return
}
func (wp *BatchWorkerPool) Submit(params ...interface{}) (result chan interface{}) {
result = make(chan interface{}, 1)
wp.mutex.RLock()
if wp.running {
wp.calls <- Task{
params: params,
resultChan: result,
}
} else {
close(result)
}
wp.mutex.RUnlock()
return
}
func (wp *BatchWorkerPool) Start() {
wp.mutex.Lock()
if !wp.running {
wp.running = true
wp.startBatchDispatcher()
wp.startBatchWorkers()
}
wp.mutex.Unlock()
}
func (wp *BatchWorkerPool) Run() {
wp.Start()
wp.wait.Wait()
}
func (wp *BatchWorkerPool) Stop() {
go wp.StopAndWait()
}
func (wp *BatchWorkerPool) StopAndWait() {
wp.mutex.Lock()
if wp.running {
wp.running = false
close(wp.terminate)
wp.resetChannels()
}
wp.wait.Wait()
wp.mutex.Unlock()
}
func (wp *BatchWorkerPool) resetChannels() {
wp.calls = make(chan Task, wp.options.QueueSize)
wp.batchedCalls = make(chan []Task, 2*wp.options.WorkerCount)
wp.terminate = make(chan int, 1)
}
func (wp *BatchWorkerPool) startBatchDispatcher() {
calls := wp.calls
terminate := wp.terminate
wp.wait.Add(1)
go func() {
for {
select {
case <-terminate:
wp.wait.Done()
return
case firstCall := <-calls:
batchTask := append(make([]Task, 0), firstCall)
collectionTimeout := time.After(wp.options.BatchCollectionTimeout)
// collect additional requests that arrive within the timeout
CollectAdditionalCalls:
for {
select {
case <-terminate:
wp.wait.Done()
return
case <-collectionTimeout:
break CollectAdditionalCalls
case call := <-wp.calls:
batchTask = append(batchTask, call)
if len(batchTask) == wp.options.BatchSize {
break CollectAdditionalCalls
}
}
}
wp.batchedCalls <- batchTask
}
}
}()
}
func (wp *BatchWorkerPool) startBatchWorkers() {
batchedCalls := wp.batchedCalls
terminate := wp.terminate
for i := 0; i < wp.options.WorkerCount; i++ {
wp.wait.Add(1)
go func() {
aborted := false
for !aborted {
select {
case <-terminate:
aborted = true
case batchTask := <-batchedCalls:
wp.workerFnc(batchTask)
}
}
wp.wait.Done()
}()
}
}
package batchworkerpool
import (
"runtime"
"time"
)
var DEFAULT_OPTIONS = &Options{
WorkerCount: 2 * runtime.NumCPU(),
QueueSize: 2 * runtime.NumCPU() * 64,
BatchSize: 64,
BatchCollectionTimeout: 15 * time.Millisecond,
}
func WorkerCount(workerCount int) Option {
return func(args *Options) {
args.WorkerCount = workerCount
}
}
func BatchSize(batchSize int) Option {
return func(args *Options) {
args.BatchSize = batchSize
}
}
func BatchCollectionTimeout(batchCollectionTimeout time.Duration) Option {
return func(args *Options) {
args.BatchCollectionTimeout = batchCollectionTimeout
}
}
func QueueSize(queueSize int) Option {
return func(args *Options) {
args.QueueSize = queueSize
}
}
type Options struct {
WorkerCount int
QueueSize int
BatchSize int
BatchCollectionTimeout time.Duration
}
func (options Options) Override(optionalOptions ...Option) *Options {
result := &options
for _, option := range optionalOptions {
option(result)
}
return result
}
type Option func(*Options)
package batchworkerpool
type Task struct {
params []interface{}
resultChan chan interface{}
}
func (task *Task) Return(result interface{}) {
task.resultChan <- result
close(task.resultChan)
}
func (task *Task) Param(index int) interface{} {
return task.params[index]
}
......@@ -3,79 +3,39 @@ package curl
import (
"fmt"
"strconv"
"time"
"github.com/iotaledger/goshimmer/packages/batchworkerpool"
"github.com/iotaledger/goshimmer/packages/ternary"
)
type HashRequest struct {
input ternary.Trits
output chan ternary.Trits
}
type BatchHasher struct {
hashRequests chan HashRequest
tasks chan []HashRequest
hashLength int
rounds int
hashLength int
rounds int
workerPool *batchworkerpool.BatchWorkerPool
}
func NewBatchHasher(hashLength int, rounds int) *BatchHasher {
this := &BatchHasher{
hashLength: hashLength,
rounds: rounds,
hashRequests: make(chan HashRequest),
tasks: make(chan []HashRequest, NUMBER_OF_WORKERS),
func NewBatchHasher(hashLength int, rounds int) (result *BatchHasher) {
result = &BatchHasher{
hashLength: hashLength,
rounds: rounds,
}
go this.startDispatcher()
this.startWorkers()
result.workerPool = batchworkerpool.New(result.processHashes, batchworkerpool.BatchSize(strconv.IntSize))
result.workerPool.Start()
return this
return
}
func (this *BatchHasher) startWorkers() {
for i := 0; i < NUMBER_OF_WORKERS; i++ {
go func() {
for {
this.processHashes(<-this.tasks)
}
}()
}
func (this *BatchHasher) Hash(trits ternary.Trits) ternary.Trits {
return (<-this.workerPool.Submit(trits)).(ternary.Trits)
}
func (this *BatchHasher) startDispatcher() {
for {
collectedHashRequests := make([]HashRequest, 0)
// wait for first request to start processing at all
collectedHashRequests = append(collectedHashRequests, <-this.hashRequests)
// collect additional requests that arrive within the timeout
CollectAdditionalRequests:
for {
select {
case hashRequest := <-this.hashRequests:
collectedHashRequests = append(collectedHashRequests, hashRequest)
if len(collectedHashRequests) == strconv.IntSize {
break CollectAdditionalRequests
}
case <-time.After(50 * time.Millisecond):
break CollectAdditionalRequests
}
}
this.tasks <- collectedHashRequests
}
}
func (this *BatchHasher) processHashes(collectedHashRequests []HashRequest) {
if len(collectedHashRequests) > 1 {
func (this *BatchHasher) processHashes(tasks []batchworkerpool.Task) {
if len(tasks) > 1 {
// multiplex the requests
multiplexer := ternary.NewBCTernaryMultiplexer()
for _, hashRequest := range collectedHashRequests {
multiplexer.Add(hashRequest.input)
for _, hashRequest := range tasks {
multiplexer.Add(hashRequest.Param(0).(ternary.Trits))
}
bcTrits, err := multiplexer.Extract()
if err != nil {
......@@ -89,33 +49,18 @@ func (this *BatchHasher) processHashes(collectedHashRequests []HashRequest) {
// extract the results from the demultiplexer
demux := ternary.NewBCTernaryDemultiplexer(bctCurl.Squeeze(243))
for i, hashRequest := range collectedHashRequests {
hashRequest.output <- demux.Get(i)
close(hashRequest.output)
for i, task := range tasks {
task.Return(demux.Get(i))
}
} else {
var resp = make(ternary.Trits, this.hashLength)
trits := tasks[0].Param(0).(ternary.Trits)
curl := NewCurl(this.hashLength, this.rounds)
curl.Absorb(collectedHashRequests[0].input, 0, len(collectedHashRequests[0].input))
curl.Absorb(trits, 0, len(trits))
curl.Squeeze(resp, 0, this.hashLength)
collectedHashRequests[0].output <- resp
close(collectedHashRequests[0].output)
tasks[0].Return(resp)
}
}
func (this *BatchHasher) Hash(trits ternary.Trits) chan ternary.Trits {
hashRequest := HashRequest{
input: trits,
output: make(chan ternary.Trits, 1),
}
this.hashRequests <- hashRequest
return hashRequest.output
}
const (
NUMBER_OF_WORKERS = 1000
)
package curl
import (
"sync"
"testing"
"github.com/iotaledger/goshimmer/packages/ternary"
)
func BenchmarkBatchHasher_Hash(b *testing.B) {
batchHasher := NewBatchHasher(243, 81)
tritsToHash := ternary.Trytes("A999999FF").ToTrits()
b.ResetTimer()
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(1)
go func() {
batchHasher.Hash(tritsToHash)
wg.Done()
}()
}
wg.Wait()
}
......@@ -115,7 +115,7 @@ func (this *MetaTransaction) GetWeightMagnitude() (result int) {
// hashes the transaction using curl (without locking - internal usage)
func (this *MetaTransaction) parseHashRelatedDetails() {
hashTrits := <-curl.CURLP81.Hash(this.trits)
hashTrits := curl.CURLP81.Hash(this.trits)
hashTrytes := hashTrits.ToTrytes()
this.hash = &hashTrytes
......
package workerpool
import (
"runtime"
"time"
)
type BatchWorkerPoolOptions struct {
WorkerCount int
QueueSize int
MaxBatchSize int
BatchCollectionTimeout time.Duration
}
type BatchWorkerPool struct {
workerFnc func([]Call)
options BatchWorkerPoolOptions
callsChan chan Call
batchedCallsChan chan []Call
}
func NewBatchWorkerPool(workerFnc func([]Call), options BatchWorkerPoolOptions) *BatchWorkerPool {
return &BatchWorkerPool{
workerFnc: workerFnc,
options: options,
callsChan: make(chan Call, options.QueueSize),
batchedCallsChan: make(chan []Call, 2*options.WorkerCount),
}
}
func (wp *BatchWorkerPool) Submit(params ...interface{}) (result chan interface{}) {
result = make(chan interface{}, 1)
wp.callsChan <- Call{
params: params,
resultChan: result,
}
return
}
func (wp *BatchWorkerPool) Start() {
wp.startBatchDispatcher()
wp.startBatchWorkers()
}
func (wp *BatchWorkerPool) startBatchDispatcher() {
go func() {
for {
// wait for first request to start processing at all
batchTask := append(make([]Call, 0), <-wp.callsChan)
collectionTimeout := time.After(wp.options.BatchCollectionTimeout)
// collect additional requests that arrive within the timeout
CollectAdditionalCalls:
for {
select {
case <-collectionTimeout:
break CollectAdditionalCalls
case call := <-wp.callsChan:
batchTask = append(batchTask, call)
if len(batchTask) == wp.options.MaxBatchSize {
break CollectAdditionalCalls
}
}
}
wp.batchedCallsChan <- batchTask
}
}()
}
func (wp *BatchWorkerPool) startBatchWorkers() {
for i := 0; i < wp.options.WorkerCount; i++ {
go func() {
for {
batchTask := <-wp.batchedCallsChan
wp.workerFnc(batchTask)
}
}()
}
}
var (
DEFAULT_OPTIONS = BatchWorkerPoolOptions{
WorkerCount: 2 * runtime.NumCPU(),
QueueSize: 500,
MaxBatchSize: 64,
BatchCollectionTimeout: 10 * time.Millisecond,
}
)
package workerpool
import (
"fmt"
"sync"
"testing"
"github.com/iotaledger/goshimmer/packages/curl"
"github.com/iotaledger/goshimmer/packages/ternary"
)
func Benchmark(b *testing.B) {
trits := ternary.Trytes("A99999999999999999999999A99999999999999999999999A99999999999999999999999A99999999999999999999999").ToTrits()
options := DEFAULT_OPTIONS
options.QueueSize = 5000
pool := NewBatchWorkerPool(processBatchHashRequests, options)
pool.Start()
b.ResetTimer()
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
resultChan := pool.Submit(trits)
wg.Add(1)
go func() {
<-resultChan
wg.Done()
}()
}
wg.Wait()
}
func TestBatchWorkerPool(t *testing.T) {
pool := NewBatchWorkerPool(processBatchHashRequests, DEFAULT_OPTIONS)
pool.Start()
trits := ternary.Trytes("A99999").ToTrits()
var wg sync.WaitGroup
for i := 0; i < 10007; i++ {
resultChan := pool.Submit(trits)
wg.Add(1)
go func() {
<-resultChan
wg.Done()
}()
}
wg.Wait()
}
func processBatchHashRequests(calls []Call) {
if len(calls) > 1 {
// multiplex the requests
multiplexer := ternary.NewBCTernaryMultiplexer()
for _, hashRequest := range calls {
multiplexer.Add(hashRequest.params[0].(ternary.Trits))
}
bcTrits, err := multiplexer.Extract()
if err != nil {
fmt.Println(err)
}
// calculate the hash
bctCurl := curl.NewBCTCurl(243, 81)
bctCurl.Reset()
bctCurl.Absorb(bcTrits)
// extract the results from the demultiplexer
demux := ternary.NewBCTernaryDemultiplexer(bctCurl.Squeeze(243))
for i, hashRequest := range calls {
hashRequest.resultChan <- demux.Get(i)
close(hashRequest.resultChan)
}
} else {
var resp = make(ternary.Trits, 243)
curl := curl.NewCurl(243, 81)
curl.Absorb(calls[0].params[0].(ternary.Trits), 0, len(calls[0].params[0].(ternary.Trits)))
curl.Squeeze(resp, 0, 81)
calls[0].resultChan <- resp
close(calls[0].resultChan)
}
}
package workerpool
import (
"fmt"
)
type WorkerPool struct {
maxWorkers int
MaxQueueSize int
callsChan chan Call
}
func New(maxWorkers int) *WorkerPool {
return &WorkerPool{
maxWorkers: maxWorkers,
}
}
type Call struct {
params []interface{}
resultChan chan interface{}
}
func (wp *WorkerPool) Submit(params ...interface{}) (result chan interface{}) {
result = make(chan interface{}, 1)
wp.callsChan <- Call{
params: params,
resultChan: result,
}
return
}
func (wp *WorkerPool) startWorkers() {
for i := 0; i < wp.maxWorkers; i++ {
go func() {
for {
batchTasks := <-wp.callsChan
fmt.Println(batchTasks)
}
}()
}
}
func (wp *WorkerPool) Start() {
wp.callsChan = make(chan Call, 2*wp.maxWorkers)
wp.startWorkers()
}
package bundleprocessor
import (
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/model/bundle"
"github.com/iotaledger/goshimmer/packages/model/transactionmetadata"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/ternary"
"github.com/iotaledger/goshimmer/plugins/tangle"
)
func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction) (*bundle.Bundle, errors.IdentifiableError) {
// only process the bundle if we didn't process it, yet
return tangle.GetBundle(headTransaction.GetHash(), func(headTransactionHash ternary.Trytes) (*bundle.Bundle, errors.IdentifiableError) {
// abort if bundle syntax is wrong
if !headTransaction.IsHead() {
return nil, ErrProcessBundleFailed.Derive(errors.New("invalid parameter"), "transaction needs to be head of bundle")
}
// initialize event variables
newBundle := bundle.New(headTransactionHash)
bundleTransactions := make([]*value_transaction.ValueTransaction, 0)
// iterate through trunk transactions until we reach the tail
currentTransaction := headTransaction
for {
// abort if we reached a previous head
if currentTransaction.IsHead() && currentTransaction != headTransaction {
newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions))
Events.InvalidBundleReceived.Trigger(newBundle, bundleTransactions)
return nil, ErrProcessBundleFailed.Derive(errors.New("invalid bundle found"), "missing bundle tail")
}
// update bundle transactions
bundleTransactions = append(bundleTransactions, currentTransaction)
// retrieve & update metadata
currentTransactionMetadata, dbErr := tangle.GetTransactionMetadata(currentTransaction.GetHash(), transactionmetadata.New)
if dbErr != nil {
return nil, ErrProcessBundleFailed.Derive(dbErr, "failed to retrieve transaction metadata")
}
currentTransactionMetadata.SetBundleHeadHash(headTransactionHash)
// update value bundle flag
if !newBundle.IsValueBundle() && currentTransaction.GetValue() != 0 {
newBundle.SetValueBundle(true)
}
// if we are done -> trigger events
if currentTransaction.IsTail() {
newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions))
if newBundle.IsValueBundle() {
Events.ValueBundleReceived.Trigger(newBundle, bundleTransactions)
} else {
Events.DataBundleReceived.Trigger(newBundle, bundleTransactions)
}
return newBundle, nil
}
// try to iterate to next turn
if nextTransaction, err := tangle.GetTransaction(currentTransaction.GetTrunkTransactionHash()); err != nil {
return nil, ErrProcessBundleFailed.Derive(err, "failed to retrieve trunk while processing bundle")
} else {
currentTransaction = nextTransaction
}
}
})
}
func mapTransactionsToTransactionHashes(transactions []*value_transaction.ValueTransaction) (result []ternary.Trytes) {
result = make([]ternary.Trytes, len(transactions))
for k, v := range transactions {
result[k] = v.GetHash()
}
return
}
package bundleprocessor
import (
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/model/bundle"
"github.com/iotaledger/goshimmer/packages/model/transactionmetadata"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/packages/ternary"
"github.com/iotaledger/goshimmer/plugins/tangle"
)
......@@ -22,74 +18,3 @@ func configure(plugin *node.Plugin) {
}
}))
}
func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction) (*bundle.Bundle, errors.IdentifiableError) {
// only process the bundle if we didn't process it, yet
return tangle.GetBundle(headTransaction.GetHash(), func(headTransactionHash ternary.Trytes) (*bundle.Bundle, errors.IdentifiableError) {
// abort if bundle syntax is wrong
if !headTransaction.IsHead() {
return nil, ErrProcessBundleFailed.Derive(errors.New("invalid parameter"), "transaction needs to be head of bundle")
}
// initialize event variables
newBundle := bundle.New(headTransactionHash)
bundleTransactions := make([]*value_transaction.ValueTransaction, 0)
// iterate through trunk transactions until we reach the tail
currentTransaction := headTransaction
for {
// abort if we reached a previous head
if currentTransaction.IsHead() && currentTransaction != headTransaction {
newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions))
Events.InvalidBundleReceived.Trigger(newBundle, bundleTransactions)
return nil, ErrProcessBundleFailed.Derive(errors.New("invalid bundle found"), "missing bundle tail")
}
// update bundle transactions
bundleTransactions = append(bundleTransactions, currentTransaction)
// retrieve & update metadata
currentTransactionMetadata, dbErr := tangle.GetTransactionMetadata(currentTransaction.GetHash(), transactionmetadata.New)
if dbErr != nil {
return nil, ErrProcessBundleFailed.Derive(dbErr, "failed to retrieve transaction metadata")
}
currentTransactionMetadata.SetBundleHeadHash(headTransactionHash)
// update value bundle flag
if !newBundle.IsValueBundle() && currentTransaction.GetValue() != 0 {
newBundle.SetValueBundle(true)
}
// if we are done -> trigger events
if currentTransaction.IsTail() {
newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions))
if newBundle.IsValueBundle() {
Events.ValueBundleReceived.Trigger(newBundle, bundleTransactions)
} else {
Events.DataBundleReceived.Trigger(newBundle, bundleTransactions)
}
return newBundle, nil
}
// try to iterate to next turn
if nextTransaction, err := tangle.GetTransaction(currentTransaction.GetTrunkTransactionHash()); err != nil {
return nil, ErrProcessBundleFailed.Derive(err, "failed to retrieve trunk while processing bundle")
} else {
currentTransaction = nextTransaction
}
}
})
}
func mapTransactionsToTransactionHashes(transactions []*value_transaction.ValueTransaction) (result []ternary.Trytes) {
result = make([]ternary.Trytes, len(transactions))
for k, v := range transactions {
result[k] = v.GetHash()
}
return
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment