Commit f5038034 authored by Hans Moog

Feat: added worker pool

parent a48e348d
package workerpool

import (
	"runtime"
	"time"
)

type BatchWorkerPoolOptions struct {
	WorkerCount            int
	QueueSize              int
	MaxBatchSize           int
	BatchCollectionTimeout time.Duration
}

type BatchWorkerPool struct {
	workerFnc func([]Call)
	options   BatchWorkerPoolOptions

	callsChan        chan Call
	batchedCallsChan chan []Call
}

func NewBatchWorkerPool(workerFnc func([]Call), options BatchWorkerPoolOptions) *BatchWorkerPool {
	return &BatchWorkerPool{
		workerFnc: workerFnc,
		options:   options,

		callsChan:        make(chan Call, options.QueueSize),
		batchedCallsChan: make(chan []Call, 2*options.WorkerCount),
	}
}

func (wp *BatchWorkerPool) Submit(params ...interface{}) (result chan interface{}) {
	result = make(chan interface{}, 1)

	wp.callsChan <- Call{
		params:     params,
		resultChan: result,
	}

	return
}

func (wp *BatchWorkerPool) Start() {
	wp.startBatchDispatcher()
	wp.startBatchWorkers()
}

func (wp *BatchWorkerPool) startBatchDispatcher() {
	go func() {
		for {
			// wait for the first call before starting a new batch
			batchTask := append(make([]Call, 0), <-wp.callsChan)

			collectionTimeout := time.After(wp.options.BatchCollectionTimeout)

			// collect additional calls that arrive within the timeout
		CollectAdditionalCalls:
			for {
				select {
				case <-collectionTimeout:
					break CollectAdditionalCalls

				case call := <-wp.callsChan:
					batchTask = append(batchTask, call)

					// dispatch early once the batch is full
					if len(batchTask) == wp.options.MaxBatchSize {
						break CollectAdditionalCalls
					}
				}
			}

			wp.batchedCallsChan <- batchTask
		}
	}()
}
func (wp *BatchWorkerPool) startBatchWorkers() {
	for i := 0; i < wp.options.WorkerCount; i++ {
		go func() {
			for {
				batchTask := <-wp.batchedCallsChan

				wp.workerFnc(batchTask)
			}
		}()
	}
}

var (
	DEFAULT_OPTIONS = BatchWorkerPoolOptions{
		WorkerCount:            2 * runtime.NumCPU(),
		QueueSize:              500,
		MaxBatchSize:           64,
		BatchCollectionTimeout: 10 * time.Millisecond,
	}
)
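For orientation, a minimal in-package usage sketch (hypothetical; the echo worker, the explicit options, and the fmt call are illustrative and assume fmt is imported alongside time):

	// hypothetical example: a worker that echoes the first parameter of every call
	func exampleBatchEcho() {
		pool := NewBatchWorkerPool(func(calls []Call) {
			for _, call := range calls {
				call.resultChan <- call.params[0]
				close(call.resultChan)
			}
		}, BatchWorkerPoolOptions{
			WorkerCount:            2,
			QueueSize:              64,
			MaxBatchSize:           16,
			BatchCollectionTimeout: 10 * time.Millisecond,
		})

		pool.Start()

		fmt.Println(<-pool.Submit("hello")) // prints: hello
	}

The worker function owns each call's result channel: it must answer every call it receives (as the hashing worker below does), or the corresponding Submit caller blocks forever.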
package workerpool

import (
	"fmt"
	"sync"
	"testing"

	"github.com/iotaledger/goshimmer/packages/curl"
	"github.com/iotaledger/goshimmer/packages/ternary"
)

func Benchmark(b *testing.B) {
	trits := ternary.Trytes("A99999999999999999999999A99999999999999999999999A99999999999999999999999A99999999999999999999999").ToTrits()

	options := DEFAULT_OPTIONS
	options.QueueSize = 5000

	pool := NewBatchWorkerPool(processBatchHashRequests, options)
	pool.Start()

	b.ResetTimer()

	var wg sync.WaitGroup

	for i := 0; i < b.N; i++ {
		resultChan := pool.Submit(trits)

		wg.Add(1)

		go func() {
			<-resultChan

			wg.Done()
		}()
	}

	wg.Wait()
}

func TestBatchWorkerPool(t *testing.T) {
	pool := NewBatchWorkerPool(processBatchHashRequests, DEFAULT_OPTIONS)
	pool.Start()

	trits := ternary.Trytes("A99999").ToTrits()

	var wg sync.WaitGroup

	for i := 0; i < 10007; i++ {
		resultChan := pool.Submit(trits)

		wg.Add(1)

		go func() {
			<-resultChan

			wg.Done()
		}()
	}

	wg.Wait()
}
func processBatchHashRequests(calls []Call) {
	if len(calls) > 1 {
		// multiplex the requests into a single binary-coded-ternary representation
		multiplexer := ternary.NewBCTernaryMultiplexer()
		for _, hashRequest := range calls {
			multiplexer.Add(hashRequest.params[0].(ternary.Trits))
		}

		bcTrits, err := multiplexer.Extract()
		if err != nil {
			fmt.Println(err)
		}

		// calculate the hashes of the whole batch in one pass
		bctCurl := curl.NewBCTCurl(243, 81)
		bctCurl.Reset()
		bctCurl.Absorb(bcTrits)

		// extract the individual results from the demultiplexer
		demux := ternary.NewBCTernaryDemultiplexer(bctCurl.Squeeze(243))
		for i, hashRequest := range calls {
			hashRequest.resultChan <- demux.Get(i)
			close(hashRequest.resultChan)
		}
	} else {
		// a single call: hash directly without the multiplexing overhead
		var resp = make(ternary.Trits, 243)

		hasher := curl.NewCurl(243, 81)
		hasher.Absorb(calls[0].params[0].(ternary.Trits), 0, len(calls[0].params[0].(ternary.Trits)))
		hasher.Squeeze(resp, 0, 81)

		calls[0].resultChan <- resp
		close(calls[0].resultChan)
	}
}
package workerpool

import (
	"runtime"
)

var DEFAULT_OPTIONS = &Options{
	WorkerCount: 2 * runtime.NumCPU(),
	QueueSize:   4 * runtime.NumCPU(),
}

func WorkerCount(workerCount int) Option {
	return func(args *Options) {
		args.WorkerCount = workerCount
	}
}

func QueueSize(queueSize int) Option {
	return func(args *Options) {
		args.QueueSize = queueSize
	}
}

type Options struct {
	WorkerCount int
	QueueSize   int
}

func (options Options) Override(optionalOptions ...Option) *Options {
	result := &options

	for _, option := range optionalOptions {
		option(result)
	}

	return result
}

type Option func(*Options)
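Because Override uses a value receiver, it copies the defaults before applying the functional options, so DEFAULT_OPTIONS itself is never mutated. A small sketch (hypothetical, in-package, assuming fmt is imported):

	// hypothetical example: override only the queue size
	func exampleOptions() {
		options := DEFAULT_OPTIONS.Override(QueueSize(2000))

		fmt.Println(options.QueueSize)   // 2000
		fmt.Println(options.WorkerCount) // unchanged: 2 * runtime.NumCPU()
	}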
package workerpool

type Task struct {
	params     []interface{}
	resultChan chan interface{}
}

func (task *Task) Return(result interface{}) {
	task.resultChan <- result

	close(task.resultChan)
}

func (task *Task) Param(index int) interface{} {
	return task.params[index]
}
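A worker function sees a submitted call only through these two accessors; a minimal sketch (the exampleWorker name and string payload are illustrative, not part of the commit):

	// hypothetical worker: greets the first Submit argument
	func exampleWorker(task Task) {
		// params arrive as interface{} values, so they need a type assertion
		name := task.Param(0).(string)

		// Return sends the result and closes the channel in one step
		task.Return("hello " + name)
	}

Each task must be answered exactly once: a worker that never calls Return leaves its submitter blocked, and calling Return twice would send on a closed channel.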
package workerpool

import (
	"sync"
)

// WorkerPool runs a fixed number of goroutines that execute submitted tasks.
type WorkerPool struct {
	workerFnc func(Task)
	options   *Options

	calls     chan Task
	terminate chan int

	running bool
	mutex   sync.RWMutex
	wait    sync.WaitGroup
}
func New(workerFnc func(Task), optionalOptions ...Option) (result *WorkerPool) {
	options := DEFAULT_OPTIONS.Override(optionalOptions...)

	result = &WorkerPool{
		workerFnc: workerFnc,
		options:   options,
	}

	result.resetChannels()

	return
}

// Call is the unit of work consumed in batches by the BatchWorkerPool above.
type Call struct {
	params     []interface{}
	resultChan chan interface{}
}
// Submit queues a task and returns a buffered channel that will carry its result.
func (wp *WorkerPool) Submit(params ...interface{}) (result chan interface{}) {
	result = make(chan interface{}, 1)

	wp.mutex.RLock()
	if wp.running {
		wp.calls <- Task{
			params:     params,
			resultChan: result,
		}
	} else {
		// a stopped pool answers with a closed channel instead of blocking
		close(result)
	}
	wp.mutex.RUnlock()

	return
}
func (wp *WorkerPool) Start() {
	wp.mutex.Lock()
	if !wp.running {
		wp.running = true
		wp.startWorkers()
	}
	wp.mutex.Unlock()
}

func (wp *WorkerPool) Run() {
	wp.Start()

	wp.wait.Wait()
}

func (wp *WorkerPool) Stop() {
	go wp.StopAndWait()
}

func (wp *WorkerPool) StopAndWait() {
	wp.mutex.Lock()
	if wp.running {
		wp.running = false

		close(wp.terminate)
		wp.resetChannels()
	}
	wp.wait.Wait()
	wp.mutex.Unlock()
}

func (wp *WorkerPool) resetChannels() {
	wp.calls = make(chan Task, wp.options.QueueSize)
	wp.terminate = make(chan int, 1)
}
func (wp *WorkerPool) startWorkers() {
	// capture the current channels so a later reset does not affect running workers
	calls := wp.calls
	terminate := wp.terminate

	for i := 0; i < wp.options.WorkerCount; i++ {
		wp.wait.Add(1)

		go func() {
			aborted := false

			for !aborted {
				select {
				case <-terminate:
					aborted = true

				case task := <-calls:
					wp.workerFnc(task)
				}
			}

			wp.wait.Done()
		}()
	}
}
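Putting the pieces together, a sketch of the full lifecycle (hypothetical, in-package, assuming fmt is imported; the doubling worker is illustrative):

	// hypothetical example: a pool whose worker doubles integers
	func exampleLifecycle() {
		pool := New(func(task Task) {
			task.Return(task.Param(0).(int) * 2)
		}, WorkerCount(4), QueueSize(100))

		pool.Start()

		fmt.Println(<-pool.Submit(21)) // 42

		// signal shutdown and block until every worker goroutine has exited
		pool.StopAndWait()

		// Submit on a stopped pool returns an already-closed channel
		if _, ok := <-pool.Submit(1); !ok {
			fmt.Println("pool stopped")
		}
	}

One shutdown caveat worth noting: the workers select between the terminate channel and the task queue, so tasks still queued when terminate is closed are not guaranteed to run, and their result channels are never closed.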
package workerpool

import (
	"sync"
	"testing"
)

func Benchmark(b *testing.B) {
	pool := New(func(task Task) {
		task.Return(task.Param(0))
	}, WorkerCount(10), QueueSize(2000))

	pool.Start()

	var wg sync.WaitGroup

	for i := 0; i < b.N; i++ {
		wg.Add(1)

		go func(i int) { // pass i by value to avoid racing on the loop variable
			<-pool.Submit(i)

			wg.Done()
		}(i)
	}

	wg.Wait()
}
@@ -12,11 +12,13 @@ func configure(plugin *node.Plugin) {
 	configureTransactionDatabase(plugin)
 	configureTransactionMetaDataDatabase(plugin)
 	configureApproversDatabase(plugin)
 	configureBundleDatabase(plugin)
 	configureSolidifier(plugin)
 }
 
 func run(plugin *node.Plugin) {
-	// this plugin has no background workers
+	runSolidifier(plugin)
 }
 
 // endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -10,36 +10,42 @@ import (
 	"github.com/iotaledger/goshimmer/packages/model/value_transaction"
 	"github.com/iotaledger/goshimmer/packages/node"
 	"github.com/iotaledger/goshimmer/packages/ternary"
+	"github.com/iotaledger/goshimmer/packages/workerpool"
 	"github.com/iotaledger/goshimmer/plugins/gossip"
 )
 
 // region plugin module setup //////////////////////////////////////////////////////////////////////////////////////////
 
 //var solidifierChan = make(chan *value_transaction.ValueTransaction, 1000)
 
-const NUMBER_OF_WORKERS = 3000
-
-var tasksChan = make(chan *meta_transaction.MetaTransaction, NUMBER_OF_WORKERS)
+var workerPool *workerpool.WorkerPool
 
 func configureSolidifier(plugin *node.Plugin) {
-	for i := 0; i < NUMBER_OF_WORKERS; i++ {
-		go func() {
-			for {
-				select {
-				case <-daemon.ShutdownSignal:
-					return
-
-				case rawTransaction := <-tasksChan:
-					processMetaTransaction(plugin, rawTransaction)
-				}
-			}
-		}()
-	}
+	workerPool = workerpool.New(func(task workerpool.Task) {
+		processMetaTransaction(plugin, task.Param(0).(*meta_transaction.MetaTransaction))
+	}, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(2*WORKER_COUNT))
 
 	gossip.Events.ReceiveTransaction.Attach(events.NewClosure(func(rawTransaction *meta_transaction.MetaTransaction) {
-		tasksChan <- rawTransaction
+		workerPool.Submit(rawTransaction)
 	}))
+
+	daemon.Events.Shutdown.Attach(events.NewClosure(func() {
+		plugin.LogInfo("Stopping Solidifier ...")
+
+		workerPool.Stop()
+	}))
 }
 
+func runSolidifier(plugin *node.Plugin) {
+	plugin.LogInfo("Starting Solidifier ...")
+
+	daemon.BackgroundWorker(func() {
+		plugin.LogSuccess("Starting Solidifier ... done")
+
+		workerPool.Run()
+
+		plugin.LogSuccess("Stopping Solidifier ... done")
+	})
+}
+
 // endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
 
 // Checks and updates the solid flag of a single transaction.
@@ -115,10 +121,10 @@ func IsSolid(transaction *value_transaction.ValueTransaction) (bool, errors.Iden
 }
 
 func propagateSolidity(transactionHash ternary.Trytes) errors.IdentifiableError {
-	if approvers, err := GetApprovers(transactionHash, approvers.New); err != nil {
+	if transactionApprovers, err := GetApprovers(transactionHash, approvers.New); err != nil {
 		return err
 	} else {
-		for _, approverHash := range approvers.GetHashes() {
+		for _, approverHash := range transactionApprovers.GetHashes() {
 			if approver, err := GetTransaction(approverHash); err != nil {
 				return err
 			} else if approver != nil {
@@ -178,5 +184,6 @@ func processTransaction(plugin *node.Plugin, transaction *value_transaction.Valu
 		return
 	}
 
 	//solidifierChan <- transaction
 }
+
+const WORKER_COUNT = 400