Skip to content
Snippets Groups Projects
Commit 54340442 authored by Angelo Capossele's avatar Angelo Capossele Committed by Luca Moser
Browse files

Use the workerpool from hive.go (#127)

* :arrow_up: uses workerpool from hive.go

* :heavy_minus_sign: removes  internal workerpool
parent bcbcf50c
No related branches found
No related tags found
No related merge requests found
package workerpool
import (
"runtime"
)
var DEFAULT_OPTIONS = &Options{
WorkerCount: 2 * runtime.NumCPU(),
QueueSize: 4 * runtime.NumCPU(),
}
func WorkerCount(workerCount int) Option {
return func(args *Options) {
args.WorkerCount = workerCount
}
}
func QueueSize(queueSize int) Option {
return func(args *Options) {
args.QueueSize = queueSize
}
}
type Options struct {
WorkerCount int
QueueSize int
}
func (options Options) Override(optionalOptions ...Option) *Options {
result := &options
for _, option := range optionalOptions {
option(result)
}
return result
}
type Option func(*Options)
package workerpool
type Task struct {
params []interface{}
resultChan chan interface{}
}
func (task *Task) Return(result interface{}) {
task.resultChan <- result
close(task.resultChan)
}
func (task *Task) Param(index int) interface{} {
return task.params[index]
}
package workerpool
import (
"sync"
)
type WorkerPool struct {
workerFnc func(Task)
options *Options
calls chan Task
terminate chan int
running bool
mutex sync.RWMutex
wait sync.WaitGroup
}
func New(workerFnc func(Task), optionalOptions ...Option) (result *WorkerPool) {
options := DEFAULT_OPTIONS.Override(optionalOptions...)
result = &WorkerPool{
workerFnc: workerFnc,
options: options,
}
result.resetChannels()
return
}
func (wp *WorkerPool) Submit(params ...interface{}) (result chan interface{}) {
result = make(chan interface{}, 1)
wp.mutex.RLock()
if wp.running {
wp.calls <- Task{
params: params,
resultChan: result,
}
} else {
close(result)
}
wp.mutex.RUnlock()
return
}
func (wp *WorkerPool) Start() {
wp.mutex.Lock()
if !wp.running {
wp.running = true
wp.startWorkers()
}
wp.mutex.Unlock()
}
func (wp *WorkerPool) Run() {
wp.Start()
wp.wait.Wait()
}
func (wp *WorkerPool) Stop() {
go wp.StopAndWait()
}
func (wp *WorkerPool) StopAndWait() {
wp.mutex.Lock()
if wp.running {
wp.running = false
close(wp.terminate)
wp.resetChannels()
}
wp.wait.Wait()
wp.mutex.Unlock()
}
func (wp *WorkerPool) resetChannels() {
wp.calls = make(chan Task, wp.options.QueueSize)
wp.terminate = make(chan int, 1)
}
func (wp *WorkerPool) startWorkers() {
calls := wp.calls
terminate := wp.terminate
for i := 0; i < wp.options.WorkerCount; i++ {
wp.wait.Add(1)
go func() {
aborted := false
for !aborted {
select {
case <-terminate:
aborted = true
case batchTask := <-calls:
wp.workerFnc(batchTask)
}
}
wp.wait.Done()
}()
}
}
package workerpool
import (
"sync"
"testing"
)
func Benchmark(b *testing.B) {
pool := New(func(task Task) {
task.Return(task.Param(0))
}, WorkerCount(10), QueueSize(2000))
pool.Start()
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(1)
go func(i int) {
<-pool.Submit(i)
wg.Done()
}(i)
}
wg.Wait()
}
......@@ -8,8 +8,8 @@ import (
"github.com/iotaledger/goshimmer/packages/model/bundle"
"github.com/iotaledger/goshimmer/packages/model/transactionmetadata"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/workerpool"
"github.com/iotaledger/goshimmer/plugins/tangle"
"github.com/iotaledger/hive.go/workerpool"
"github.com/iotaledger/iota.go/trinary"
)
......
......@@ -4,7 +4,7 @@ import (
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/model/bundle"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/workerpool"
"github.com/iotaledger/hive.go/workerpool"
"github.com/iotaledger/iota.go/curl"
"github.com/iotaledger/iota.go/signing"
"github.com/iotaledger/iota.go/trinary"
......
......@@ -11,9 +11,9 @@ import (
"github.com/iotaledger/goshimmer/packages/model/transactionmetadata"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/packages/workerpool"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/workerpool"
"github.com/iotaledger/iota.go/trinary"
)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment