Newer
Older
package workerpool
import (
)
type WorkerPool struct {
workerFnc func(Task)
options *Options
calls chan Task
terminate chan int
running bool
mutex sync.RWMutex
wait sync.WaitGroup
func New(workerFnc func(Task), optionalOptions ...Option) (result *WorkerPool) {
options := DEFAULT_OPTIONS.Override(optionalOptions...)
result = &WorkerPool{
workerFnc: workerFnc,
options: options,
}
func (wp *WorkerPool) Submit(params ...interface{}) (result chan interface{}) {
result = make(chan interface{}, 1)
wp.mutex.RLock()
if wp.running {
wp.calls <- Task{
params: params,
resultChan: result,
}
} else {
close(result)
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
func (wp *WorkerPool) Start() {
wp.mutex.Lock()
if !wp.running {
wp.running = true
wp.startWorkers()
}
wp.mutex.Unlock()
}
func (wp *WorkerPool) Run() {
wp.Start()
wp.wait.Wait()
}
func (wp *WorkerPool) Stop() {
go wp.StopAndWait()
}
func (wp *WorkerPool) StopAndWait() {
wp.mutex.Lock()
if wp.running {
wp.running = false
close(wp.terminate)
wp.resetChannels()
}
wp.wait.Wait()
wp.mutex.Unlock()
}
func (wp *WorkerPool) resetChannels() {
wp.calls = make(chan Task, wp.options.QueueSize)
wp.terminate = make(chan int, 1)
}
func (wp *WorkerPool) startWorkers() {
calls := wp.calls
terminate := wp.terminate
for i := 0; i < wp.options.WorkerCount; i++ {
wp.wait.Add(1)
for !aborted {
select {
case <-terminate:
aborted = true
case batchTask := <-calls:
wp.workerFnc(batchTask)
}