Skip to content
Snippets Groups Projects
Commit 8565459d authored by Hans Moog's avatar Hans Moog
Browse files

Feat: intermediary commit

parent eb032da9
No related branches found
No related tags found
No related merge requests found
...@@ -2,7 +2,6 @@ package datastructure ...@@ -2,7 +2,6 @@ package datastructure
import ( import (
"container/heap" "container/heap"
"fmt"
"sync" "sync"
"time" "time"
...@@ -16,17 +15,19 @@ import ( ...@@ -16,17 +15,19 @@ import (
type TimedQueue struct { type TimedQueue struct {
heap timedHeap heap timedHeap
heapMutex sync.RWMutex heapMutex sync.RWMutex
nonEmpty sync.WaitGroup waitForNewElementsWG sync.WaitGroup
shutdown chan byte queueEmptyWG sync.WaitGroup
shutdownSignal chan byte
shutdownFlag typeutils.AtomicBool
returnPendingElementsAfterShutdown typeutils.AtomicBool returnPendingElementsAfterShutdown typeutils.AtomicBool
} }
// NewTimedQueue is the constructor for the TimedQueue. // NewTimedQueue is the constructor for the TimedQueue.
func NewTimedQueue() (queue *TimedQueue) { func NewTimedQueue() (queue *TimedQueue) {
queue = &TimedQueue{ queue = &TimedQueue{
shutdown: make(chan byte), shutdownSignal: make(chan byte),
} }
queue.nonEmpty.Add(1) queue.waitForNewElementsWG.Add(1)
return return
} }
...@@ -44,7 +45,7 @@ func (t *TimedQueue) Add(value interface{}, scheduledTime time.Time) (addedEleme ...@@ -44,7 +45,7 @@ func (t *TimedQueue) Add(value interface{}, scheduledTime time.Time) (addedEleme
// mark queue as non-empty // mark queue as non-empty
if len(t.heap) == 0 { if len(t.heap) == 0 {
t.nonEmpty.Done() t.waitForNewElementsWG.Done()
} }
// add new element // add new element
...@@ -76,14 +77,14 @@ func (t *TimedQueue) Poll() interface{} { ...@@ -76,14 +77,14 @@ func (t *TimedQueue) Poll() interface{} {
// mark the queue as empty if last element was polled // mark the queue as empty if last element was polled
if len(t.heap) == 0 { if len(t.heap) == 0 {
t.nonEmpty.Add(1) t.waitForNewElementsWG.Add(1)
} }
t.heapMutex.Unlock() t.heapMutex.Unlock()
// wait for the return value to become due // wait for the return value to become due
select { select {
// abort if the queue was shutdown // abort if the queue was shutdown
case <-t.shutdown: case <-t.shutdownSignal:
if !t.returnPendingElementsAfterShutdown.IsSet() { if !t.returnPendingElementsAfterShutdown.IsSet() {
if t.Size() != 0 { if t.Size() != 0 {
return t.Poll() return t.Poll()
...@@ -92,8 +93,6 @@ func (t *TimedQueue) Poll() interface{} { ...@@ -92,8 +93,6 @@ func (t *TimedQueue) Poll() interface{} {
return nil return nil
} }
fmt.Println("RETURN LAST ELEMENTS")
return polledElement.value return polledElement.value
// abort waiting for this element and wait for the next one instead if it was canceled // abort waiting for this element and wait for the next one instead if it was canceled
case <-polledElement.cancel: case <-polledElement.cancel:
...@@ -113,13 +112,20 @@ func (t *TimedQueue) Size() int { ...@@ -113,13 +112,20 @@ func (t *TimedQueue) Size() int {
return len(t.heap) return len(t.heap)
} }
// WaitForNewElements waits for the queue to be non-empty. This can be used by i.e. schedulers to continuously iterate // IsProcessingElements returns true if the queue was not shutdown, yet or still has queued elements to process after a
// over the queue to process its elements. It returns false, if the queue has been shutdown. // shutdown.
func (t *TimedQueue) WaitForNewElements() bool { //
t.nonEmpty.Wait() // It will wait for the arrival of new elements if the waitForElements parameter is set to true. This can accordingly be
// used by schedulers that want to continuously iterate over the queue without wasting CPU cycles for checking for new
// elements over and over again.
func (t *TimedQueue) IsProcessingElements(waitForElements bool) bool {
// wait for elements to arrive
if waitForElements {
t.waitForNewElementsWG.Wait()
}
select { select {
case <-t.shutdown: case <-t.shutdownSignal:
return t.returnPendingElementsAfterShutdown.IsSet() && t.Size() != 0 return t.returnPendingElementsAfterShutdown.IsSet() && t.Size() != 0
default: default:
return true return true
...@@ -127,20 +133,31 @@ func (t *TimedQueue) WaitForNewElements() bool { ...@@ -127,20 +133,31 @@ func (t *TimedQueue) WaitForNewElements() bool {
} }
// Shutdown terminates the queue and stops the currently waiting Poll requests. // Shutdown terminates the queue and stops the currently waiting Poll requests.
func (t *TimedQueue) Shutdown(returnPendingElements bool) { func (t *TimedQueue) Shutdown(processQueuedElements bool) {
// acquire lock to be thread safe
t.heapMutex.Lock() t.heapMutex.Lock()
select {
// only shutdown once
case <-t.shutdown:
// do nothing
default:
t.returnPendingElementsAfterShutdown.SetTo(returnPendingElements)
close(t.shutdown) // abort if the queue was shutdown before
if t.shutdownFlag.IsSet() {
t.heapMutex.Unlock()
return
} }
// set the flag that indicates if the currently queued elements should be executed immediately
t.returnPendingElementsAfterShutdown.SetTo(processQueuedElements)
// mark the queue as shutdown
t.shutdownFlag.Set()
// close the shutdown channel
close(t.shutdownSignal)
// release the lock
t.heapMutex.Unlock() t.heapMutex.Unlock()
t.nonEmpty.Wait() // wait for the queue to be empty
t.queueEmptyWG.Wait()
} }
// removeElement is an internal utility function that removes the given element from the queue. // removeElement is an internal utility function that removes the given element from the queue.
...@@ -155,7 +172,7 @@ func (t *TimedQueue) removeElement(element *TimedQueueElement) { ...@@ -155,7 +172,7 @@ func (t *TimedQueue) removeElement(element *TimedQueueElement) {
// mark the queue as empty // mark the queue as empty
if len(t.heap) == 0 { if len(t.heap) == 0 {
t.nonEmpty.Add(1) t.waitForNewElementsWG.Add(1)
} }
} }
......
...@@ -7,28 +7,28 @@ import ( ...@@ -7,28 +7,28 @@ import (
) )
func TestTimedQueue_Poll(t *testing.T) { func TestTimedQueue_Poll(t *testing.T) {
tq := NewTimedQueue() timedQueue := NewTimedQueue()
go func() { go func() {
for tq.WaitForNewElements() { for timedQueue.IsProcessingElements(true) {
for currentEntry := tq.Poll(); currentEntry != nil; currentEntry = tq.Poll() { for currentEntry := timedQueue.Poll(); currentEntry != nil; currentEntry = timedQueue.Poll() {
fmt.Println(currentEntry) fmt.Println(currentEntry)
} }
} }
}() }()
tq.Add(2, time.Now().Add(1*time.Second)) timedQueue.Add(2, time.Now().Add(1*time.Second))
elem := tq.Add(4, time.Now().Add(2*time.Second)) elem := timedQueue.Add(4, time.Now().Add(2*time.Second))
tq.Add(6, time.Now().Add(3*time.Second)) timedQueue.Add(6, time.Now().Add(3*time.Second))
elem.Cancel() time.Sleep(4 * time.Second)
time.Sleep(3 * time.Second) elem.Cancel()
tq.Add(6, time.Now().Add(time.Second)) timedQueue.Add(6, time.Now().Add(time.Second))
tq.Add(68, time.Now().Add(4*time.Second)) timedQueue.Add(68, time.Now().Add(4*time.Second))
tq.Shutdown(false) timedQueue.Shutdown(true)
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment