diff --git a/packages/binary/datastructure/timedqueue.go b/packages/binary/datastructure/timedqueue.go index d38755f0b39b024a1b07875cce79e4cee6709140..0a62bd200785830aaaeb028f3042d1a467280380 100644 --- a/packages/binary/datastructure/timedqueue.go +++ b/packages/binary/datastructure/timedqueue.go @@ -2,7 +2,6 @@ package datastructure import ( "container/heap" - "fmt" "sync" "time" @@ -16,17 +15,19 @@ import ( type TimedQueue struct { heap timedHeap heapMutex sync.RWMutex - nonEmpty sync.WaitGroup - shutdown chan byte + waitForNewElementsWG sync.WaitGroup + queueEmptyWG sync.WaitGroup + shutdownSignal chan byte + shutdownFlag typeutils.AtomicBool returnPendingElementsAfterShutdown typeutils.AtomicBool } // NewTimedQueue is the constructor for the TimedQueue. func NewTimedQueue() (queue *TimedQueue) { queue = &TimedQueue{ - shutdown: make(chan byte), + shutdownSignal: make(chan byte), } - queue.nonEmpty.Add(1) + queue.waitForNewElementsWG.Add(1) return } @@ -44,7 +45,7 @@ func (t *TimedQueue) Add(value interface{}, scheduledTime time.Time) (addedEleme // mark queue as non-empty if len(t.heap) == 0 { - t.nonEmpty.Done() + t.waitForNewElementsWG.Done() } // add new element @@ -76,14 +77,14 @@ func (t *TimedQueue) Poll() interface{} { // mark the queue as empty if last element was polled if len(t.heap) == 0 { - t.nonEmpty.Add(1) + t.waitForNewElementsWG.Add(1) } t.heapMutex.Unlock() // wait for the return value to become due select { // abort if the queue was shutdown - case <-t.shutdown: + case <-t.shutdownSignal: if !t.returnPendingElementsAfterShutdown.IsSet() { if t.Size() != 0 { return t.Poll() @@ -92,8 +93,6 @@ func (t *TimedQueue) Poll() interface{} { return nil } - fmt.Println("RETURN LAST ELEMENTS") - return polledElement.value // abort waiting for this element and wait for the next one instead if it was canceled case <-polledElement.cancel: @@ -113,13 +112,20 @@ func (t *TimedQueue) Size() int { return len(t.heap) } -// WaitForNewElements waits for the queue to be non-empty. This can be used by i.e. schedulers to continuously iterate -// over the queue to process its elements. It returns false, if the queue has been shutdown. -func (t *TimedQueue) WaitForNewElements() bool { - t.nonEmpty.Wait() +// IsProcessingElements returns true if the queue was not shutdown, yet or still has queued elements to process after a +// shutdown. +// +// It will wait for the arrival of new elements if the waitForElements parameter is set to true. This can accordingly be +// used by schedulers that want to continuously iterate over the queue without wasting CPU cycles for checking for new +// elements over and over again. +func (t *TimedQueue) IsProcessingElements(waitForElements bool) bool { + // wait for elements to arrive + if waitForElements { + t.waitForNewElementsWG.Wait() + } select { - case <-t.shutdown: + case <-t.shutdownSignal: return t.returnPendingElementsAfterShutdown.IsSet() && t.Size() != 0 default: return true @@ -127,20 +133,31 @@ func (t *TimedQueue) WaitForNewElements() bool { } // Shutdown terminates the queue and stops the currently waiting Poll requests. -func (t *TimedQueue) Shutdown(returnPendingElements bool) { +func (t *TimedQueue) Shutdown(processQueuedElements bool) { + // acquire lock to be thread safe t.heapMutex.Lock() - select { - // only shutdown once - case <-t.shutdown: - // do nothing - default: - t.returnPendingElementsAfterShutdown.SetTo(returnPendingElements) - close(t.shutdown) + // abort if the queue was shutdown before + if t.shutdownFlag.IsSet() { + t.heapMutex.Unlock() + + return } + + // set the flag that indicates if the currently queued elements should be executed immediately + t.returnPendingElementsAfterShutdown.SetTo(processQueuedElements) + + // mark the queue as shutdown + t.shutdownFlag.Set() + + // close the shutdown channel + close(t.shutdownSignal) + + // release the lock t.heapMutex.Unlock() - t.nonEmpty.Wait() + // wait for the queue to be empty + t.queueEmptyWG.Wait() } // removeElement is an internal utility function that removes the given element from the queue. @@ -155,7 +172,7 @@ func (t *TimedQueue) removeElement(element *TimedQueueElement) { // mark the queue as empty if len(t.heap) == 0 { - t.nonEmpty.Add(1) + t.waitForNewElementsWG.Add(1) } } diff --git a/packages/binary/datastructure/timedqueue_test.go b/packages/binary/datastructure/timedqueue_test.go index f0a20cae3d0845e57bb4e4a468de22802380a2e2..de2cf709890fefd991b8bbc96f625d64c65ed9c3 100644 --- a/packages/binary/datastructure/timedqueue_test.go +++ b/packages/binary/datastructure/timedqueue_test.go @@ -7,28 +7,28 @@ import ( ) func TestTimedQueue_Poll(t *testing.T) { - tq := NewTimedQueue() + timedQueue := NewTimedQueue() go func() { - for tq.WaitForNewElements() { - for currentEntry := tq.Poll(); currentEntry != nil; currentEntry = tq.Poll() { + for timedQueue.IsProcessingElements(true) { + for currentEntry := timedQueue.Poll(); currentEntry != nil; currentEntry = timedQueue.Poll() { fmt.Println(currentEntry) } } }() - tq.Add(2, time.Now().Add(1*time.Second)) - elem := tq.Add(4, time.Now().Add(2*time.Second)) - tq.Add(6, time.Now().Add(3*time.Second)) + timedQueue.Add(2, time.Now().Add(1*time.Second)) + elem := timedQueue.Add(4, time.Now().Add(2*time.Second)) + timedQueue.Add(6, time.Now().Add(3*time.Second)) - elem.Cancel() + time.Sleep(4 * time.Second) - time.Sleep(3 * time.Second) + elem.Cancel() - tq.Add(6, time.Now().Add(time.Second)) - tq.Add(68, time.Now().Add(4*time.Second)) + timedQueue.Add(6, time.Now().Add(time.Second)) + timedQueue.Add(68, time.Now().Add(4*time.Second)) - tq.Shutdown(false) + timedQueue.Shutdown(true) time.Sleep(500 * time.Millisecond) }