Skip to content
Snippets Groups Projects
workerpool.go 1.63 KiB
Newer Older
Hans Moog's avatar
Hans Moog committed
	"sync"
Hans Moog's avatar
Hans Moog committed
	workerFnc func(Task)
	options   *Options

	calls     chan Task
	terminate chan int

	running bool
	mutex   sync.RWMutex
	wait    sync.WaitGroup
Hans Moog's avatar
Hans Moog committed
func New(workerFnc func(Task), optionalOptions ...Option) (result *WorkerPool) {
	options := DEFAULT_OPTIONS.Override(optionalOptions...)

	result = &WorkerPool{
		workerFnc: workerFnc,
		options:   options,
Hans Moog's avatar
Hans Moog committed
	result.resetChannels()

	return
}

func (wp *WorkerPool) Submit(params ...interface{}) (result chan interface{}) {
	result = make(chan interface{}, 1)

Hans Moog's avatar
Hans Moog committed
	wp.mutex.RLock()

	if wp.running {
		wp.calls <- Task{
			params:     params,
			resultChan: result,
		}
	} else {
		close(result)
Hans Moog's avatar
Hans Moog committed
	wp.mutex.RUnlock()

Hans Moog's avatar
Hans Moog committed
// Start switches the pool into the running state and launches its workers.
//
// The call is a no-op when the pool is already running. The write lock is
// held for the whole call and released via defer, so it is freed even if
// startWorkers panics.
func (wp *WorkerPool) Start() {
	wp.mutex.Lock()
	defer wp.mutex.Unlock()

	if wp.running {
		return
	}

	wp.running = true
	wp.startWorkers()
}

// Run starts the pool (see Start) and then blocks the calling goroutine on
// the pool's wait group until its counter drops to zero.
func (wp *WorkerPool) Run() {
	// Start is a no-op if the pool is already running.
	wp.Start()
	wp.wait.Wait()
}

// Stop requests a shutdown without blocking the caller: StopAndWait is run
// in a separate goroutine and Stop returns immediately.
func (wp *WorkerPool) Stop() {
	go func() {
		wp.StopAndWait()
	}()
}

// StopAndWait shuts the pool down and blocks until the pool's wait group
// reaches zero.
//
// If the pool is running, it is marked as stopped, the terminate channel is
// closed to signal every receiver on it, and both channels are replaced with
// fresh ones so the pool can be started again. When the pool is not running
// the call only waits on the wait group.
//
// The write lock is held for the entire call, including the wait, so other
// methods that take the mutex block until this call returns. The lock is
// released via defer so that a panic in close or resetChannels does not leave
// the pool locked forever.
//
// NOTE(review): tasks still buffered in the previous calls channel are not
// drained here; it looks like their result channels are left untouched —
// confirm whether callers waiting on them need to be notified.
func (wp *WorkerPool) StopAndWait() {
	wp.mutex.Lock()
	defer wp.mutex.Unlock()

	if wp.running {
		wp.running = false

		// Closing (rather than sending) wakes every goroutine receiving on
		// the terminate channel; the channels are then swapped for new ones.
		close(wp.terminate)
		wp.resetChannels()
	}

	wp.wait.Wait()
}

// resetChannels installs a fresh pair of channels on the pool: a task queue
// buffered to options.QueueSize and a terminate channel with a buffer of one.
// It performs no locking itself.
func (wp *WorkerPool) resetChannels() {
	taskQueue := make(chan Task, wp.options.QueueSize)
	stopSignal := make(chan int, 1)

	wp.calls, wp.terminate = taskQueue, stopSignal
}

func (wp *WorkerPool) startWorkers() {
Hans Moog's avatar
Hans Moog committed
	calls := wp.calls
	terminate := wp.terminate

	for i := 0; i < wp.options.WorkerCount; i++ {
		wp.wait.Add(1)

Hans Moog's avatar
Hans Moog committed
			aborted := false
Hans Moog's avatar
Hans Moog committed
			for !aborted {
				select {
				case <-terminate:
					aborted = true

				case batchTask := <-calls:
					wp.workerFnc(batchTask)
				}
Hans Moog's avatar
Hans Moog committed

			wp.wait.Done()