diff --git a/packages/workerpool/batchworkerpool.go b/packages/workerpool/batchworkerpool.go deleted file mode 100644 index bfd2d03762c6643662dca5c01bc32137a05edec4..0000000000000000000000000000000000000000 --- a/packages/workerpool/batchworkerpool.go +++ /dev/null @@ -1,95 +0,0 @@ -package workerpool - -import ( - "runtime" - "time" -) - -type BatchWorkerPoolOptions struct { - WorkerCount int - QueueSize int - MaxBatchSize int - BatchCollectionTimeout time.Duration -} - -type BatchWorkerPool struct { - workerFnc func([]Call) - options BatchWorkerPoolOptions - callsChan chan Call - batchedCallsChan chan []Call -} - -func NewBatchWorkerPool(workerFnc func([]Call), options BatchWorkerPoolOptions) *BatchWorkerPool { - return &BatchWorkerPool{ - workerFnc: workerFnc, - options: options, - - callsChan: make(chan Call, options.QueueSize), - batchedCallsChan: make(chan []Call, 2*options.WorkerCount), - } -} - -func (wp *BatchWorkerPool) Submit(params ...interface{}) (result chan interface{}) { - result = make(chan interface{}, 1) - - wp.callsChan <- Call{ - params: params, - resultChan: result, - } - - return -} - -func (wp *BatchWorkerPool) Start() { - wp.startBatchDispatcher() - wp.startBatchWorkers() -} - -func (wp *BatchWorkerPool) startBatchDispatcher() { - go func() { - for { - // wait for first request to start processing at all - batchTask := append(make([]Call, 0), <-wp.callsChan) - - collectionTimeout := time.After(wp.options.BatchCollectionTimeout) - - // collect additional requests that arrive within the timeout - CollectAdditionalCalls: - for { - select { - case <-collectionTimeout: - break CollectAdditionalCalls - case call := <-wp.callsChan: - batchTask = append(batchTask, call) - - if len(batchTask) == wp.options.MaxBatchSize { - break CollectAdditionalCalls - } - } - } - - wp.batchedCallsChan <- batchTask - } - }() -} - -func (wp *BatchWorkerPool) startBatchWorkers() { - for i := 0; i < wp.options.WorkerCount; i++ { - go func() { - for { - batchTask := <-wp.batchedCallsChan - - wp.workerFnc(batchTask) - } - }() - } -} - -var ( - DEFAULT_OPTIONS = BatchWorkerPoolOptions{ - WorkerCount: 2 * runtime.NumCPU(), - QueueSize: 500, - MaxBatchSize: 64, - BatchCollectionTimeout: 10 * time.Millisecond, - } -) diff --git a/packages/workerpool/batchworkerpool_test.go b/packages/workerpool/batchworkerpool_test.go deleted file mode 100644 index 737842abf3bfda282b6c340c76a6789fba916f67..0000000000000000000000000000000000000000 --- a/packages/workerpool/batchworkerpool_test.go +++ /dev/null @@ -1,90 +0,0 @@ -package workerpool - -import ( - "fmt" - "sync" - "testing" - - "github.com/iotaledger/goshimmer/packages/curl" - "github.com/iotaledger/goshimmer/packages/ternary" -) - -func Benchmark(b *testing.B) { - trits := ternary.Trytes("A99999999999999999999999A99999999999999999999999A99999999999999999999999A99999999999999999999999").ToTrits() - - options := DEFAULT_OPTIONS - options.QueueSize = 5000 - - pool := NewBatchWorkerPool(processBatchHashRequests, options) - pool.Start() - - b.ResetTimer() - - var wg sync.WaitGroup - for i := 0; i < b.N; i++ { - resultChan := pool.Submit(trits) - - wg.Add(1) - go func() { - <-resultChan - - wg.Done() - }() - } - wg.Wait() -} - -func TestBatchWorkerPool(t *testing.T) { - pool := NewBatchWorkerPool(processBatchHashRequests, DEFAULT_OPTIONS) - pool.Start() - - trits := ternary.Trytes("A99999").ToTrits() - - var wg sync.WaitGroup - for i := 0; i < 10007; i++ { - resultChan := pool.Submit(trits) - - wg.Add(1) - go func() { - <-resultChan - - wg.Done() - }() - } - wg.Wait() -} - -func processBatchHashRequests(calls []Call) { - if len(calls) > 1 { - // multiplex the requests - multiplexer := ternary.NewBCTernaryMultiplexer() - for _, hashRequest := range calls { - multiplexer.Add(hashRequest.params[0].(ternary.Trits)) - } - bcTrits, err := multiplexer.Extract() - if err != nil { - fmt.Println(err) - } - - // calculate the hash - bctCurl := curl.NewBCTCurl(243, 81) - bctCurl.Reset() - bctCurl.Absorb(bcTrits) - - // extract the results from the demultiplexer - demux := ternary.NewBCTernaryDemultiplexer(bctCurl.Squeeze(243)) - for i, hashRequest := range calls { - hashRequest.resultChan <- demux.Get(i) - close(hashRequest.resultChan) - } - } else { - var resp = make(ternary.Trits, 243) - - curl := curl.NewCurl(243, 81) - curl.Absorb(calls[0].params[0].(ternary.Trits), 0, len(calls[0].params[0].(ternary.Trits))) - curl.Squeeze(resp, 0, 81) - - calls[0].resultChan <- resp - close(calls[0].resultChan) - } -} diff --git a/packages/workerpool/options.go b/packages/workerpool/options.go new file mode 100644 index 0000000000000000000000000000000000000000..dc6b27e4da9145b0a34101dfd22696c76b63d83e --- /dev/null +++ b/packages/workerpool/options.go @@ -0,0 +1,38 @@ +package workerpool + +import ( + "runtime" +) + +var DEFAULT_OPTIONS = &Options{ + WorkerCount: 2 * runtime.NumCPU(), + QueueSize: 4 * runtime.NumCPU(), +} + +func WorkerCount(workerCount int) Option { + return func(args *Options) { + args.WorkerCount = workerCount + } +} + +func QueueSize(queueSize int) Option { + return func(args *Options) { + args.QueueSize = queueSize + } +} + +type Options struct { + WorkerCount int + QueueSize int +} + +func (options Options) Override(optionalOptions ...Option) *Options { + result := &options + for _, option := range optionalOptions { + option(result) + } + + return result +} + +type Option func(*Options) diff --git a/packages/workerpool/task.go b/packages/workerpool/task.go new file mode 100644 index 0000000000000000000000000000000000000000..8c56b1a9274f7e16df7fa02f47e53553703158cd --- /dev/null +++ b/packages/workerpool/task.go @@ -0,0 +1,15 @@ +package workerpool + +type Task struct { + params []interface{} + resultChan chan interface{} +} + +func (task *Task) Return(result interface{}) { + task.resultChan <- result + close(task.resultChan) +} + +func (task *Task) Param(index int) interface{} { + return task.params[index] +} diff --git a/packages/workerpool/workerpool.go b/packages/workerpool/workerpool.go index eba10b1f6f2fe1e4609455004ed1351ad629fb54..95b6a2a7d2814b152b9d8320d2f3873bee0a428b 100644 --- a/packages/workerpool/workerpool.go +++ b/packages/workerpool/workerpool.go @@ -1,51 +1,116 @@ package workerpool import ( - "fmt" + "sync" ) type WorkerPool struct { - maxWorkers int - MaxQueueSize int - callsChan chan Call + workerFnc func(Task) + options *Options + + calls chan Task + terminate chan int + + running bool + mutex sync.RWMutex + wait sync.WaitGroup } -func New(maxWorkers int) *WorkerPool { - return &WorkerPool{ - maxWorkers: maxWorkers, +func New(workerFnc func(Task), optionalOptions ...Option) (result *WorkerPool) { + options := DEFAULT_OPTIONS.Override(optionalOptions...) + + result = &WorkerPool{ + workerFnc: workerFnc, + options: options, } -} -type Call struct { - params []interface{} - resultChan chan interface{} + result.resetChannels() + + return } func (wp *WorkerPool) Submit(params ...interface{}) (result chan interface{}) { result = make(chan interface{}, 1) - wp.callsChan <- Call{ - params: params, - resultChan: result, + wp.mutex.RLock() + + if wp.running { + wp.calls <- Task{ + params: params, + resultChan: result, + } + } else { + close(result) } + wp.mutex.RUnlock() + return } +func (wp *WorkerPool) Start() { + wp.mutex.Lock() + + if !wp.running { + wp.running = true + + wp.startWorkers() + } + + wp.mutex.Unlock() +} + +func (wp *WorkerPool) Run() { + wp.Start() + + wp.wait.Wait() +} + +func (wp *WorkerPool) Stop() { + go wp.StopAndWait() +} + +func (wp *WorkerPool) StopAndWait() { + wp.mutex.Lock() + + if wp.running { + wp.running = false + + close(wp.terminate) + wp.resetChannels() + } + + wp.wait.Wait() + + wp.mutex.Unlock() +} + +func (wp *WorkerPool) resetChannels() { + wp.calls = make(chan Task, wp.options.QueueSize) + wp.terminate = make(chan int, 1) +} + func (wp *WorkerPool) startWorkers() { - for i := 0; i < wp.maxWorkers; i++ { + calls := wp.calls + terminate := wp.terminate + + for i := 0; i < wp.options.WorkerCount; i++ { + wp.wait.Add(1) + go func() { - for { - batchTasks := <-wp.callsChan + aborted := false - fmt.Println(batchTasks) + for !aborted { + select { + case <-terminate: + aborted = true + + case batchTask := <-calls: + wp.workerFnc(batchTask) + } } + + wp.wait.Done() }() } } - -func (wp *WorkerPool) Start() { - wp.callsChan = make(chan Call, 2*wp.maxWorkers) - - wp.startWorkers() -} diff --git a/packages/workerpool/workerpool_test.go b/packages/workerpool/workerpool_test.go new file mode 100644 index 0000000000000000000000000000000000000000..fb8f83e38adfe543488cc7170d77776fc74d66ca --- /dev/null +++ b/packages/workerpool/workerpool_test.go @@ -0,0 +1,26 @@ +package workerpool + +import ( + "sync" + "testing" +) + +func Benchmark(b *testing.B) { + pool := New(func(task Task) { + task.Return(task.Param(0)) + }, WorkerCount(10), QueueSize(2000)) + pool.Start() + + var wg sync.WaitGroup + for i := 0; i < b.N; i++ { + wg.Add(1) + + go func() { + <-pool.Submit(i) + + wg.Done() + }() + } + + wg.Wait() +} diff --git a/plugins/tangle/plugin.go b/plugins/tangle/plugin.go index 00f2dfaea6bc8481e11720ed9d8124f11e40ee93..df3862f6a3857cb953672094d123df88c2fe6667 100644 --- a/plugins/tangle/plugin.go +++ b/plugins/tangle/plugin.go @@ -12,11 +12,13 @@ func configure(plugin *node.Plugin) { configureTransactionDatabase(plugin) configureTransactionMetaDataDatabase(plugin) configureApproversDatabase(plugin) + configureBundleDatabase(plugin) configureSolidifier(plugin) } func run(plugin *node.Plugin) { // this plugin has no background workers + runSolidifier(plugin) } // endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/plugins/tangle/solidifier.go b/plugins/tangle/solidifier.go index ce66662a01ff3d8db3535e067bbf30c3daf84207..bffbb046a5e08af0a64341ea9f063969e3c29864 100644 --- a/plugins/tangle/solidifier.go +++ b/plugins/tangle/solidifier.go @@ -10,36 +10,42 @@ import ( "github.com/iotaledger/goshimmer/packages/model/value_transaction" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/packages/ternary" + "github.com/iotaledger/goshimmer/packages/workerpool" "github.com/iotaledger/goshimmer/plugins/gossip" ) // region plugin module setup ////////////////////////////////////////////////////////////////////////////////////////// -//var solidifierChan = make(chan *value_transaction.ValueTransaction, 1000) - -const NUMBER_OF_WORKERS = 3000 - -var tasksChan = make(chan *meta_transaction.MetaTransaction, NUMBER_OF_WORKERS) +var workerPool *workerpool.WorkerPool func configureSolidifier(plugin *node.Plugin) { - for i := 0; i < NUMBER_OF_WORKERS; i++ { - go func() { - for { - select { - case <-daemon.ShutdownSignal: - return - case rawTransaction := <-tasksChan: - processMetaTransaction(plugin, rawTransaction) - } - } - }() - } + workerPool = workerpool.New(func(task workerpool.Task) { + processMetaTransaction(plugin, task.Param(0).(*meta_transaction.MetaTransaction)) + }, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(2*WORKER_COUNT)) gossip.Events.ReceiveTransaction.Attach(events.NewClosure(func(rawTransaction *meta_transaction.MetaTransaction) { - tasksChan <- rawTransaction + workerPool.Submit(rawTransaction) + })) + + daemon.Events.Shutdown.Attach(events.NewClosure(func() { + plugin.LogInfo("Stopping Solidifier ...") + + workerPool.Stop() })) } +func runSolidifier(plugin *node.Plugin) { + plugin.LogInfo("Starting Solidifier ...") + + daemon.BackgroundWorker(func() { + plugin.LogSuccess("Starting Solidifier ... done") + + workerPool.Run() + + plugin.LogSuccess("Stopping Solidifier ... done") + }) +} + // endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// // Checks and updates the solid flag of a single transaction. @@ -115,10 +121,10 @@ func IsSolid(transaction *value_transaction.ValueTransaction) (bool, errors.Iden } func propagateSolidity(transactionHash ternary.Trytes) errors.IdentifiableError { - if approvers, err := GetApprovers(transactionHash, approvers.New); err != nil { + if transactionApprovers, err := GetApprovers(transactionHash, approvers.New); err != nil { return err } else { - for _, approverHash := range approvers.GetHashes() { + for _, approverHash := range transactionApprovers.GetHashes() { if approver, err := GetTransaction(approverHash); err != nil { return err } else if approver != nil { @@ -178,5 +184,6 @@ func processTransaction(plugin *node.Plugin, transaction *value_transaction.Valu return } - //solidifierChan <- transaction } + +const WORKER_COUNT = 400