Skip to content
Snippets Groups Projects
Commit cc038578 authored by Hans Moog's avatar Hans Moog
Browse files

Feat: added key based locking for lrucache + minor fixes

parent f4ca1237
No related branches found
No related tags found
No related merge requests found
...@@ -18,8 +18,7 @@ type LRUCache struct { ...@@ -18,8 +18,7 @@ type LRUCache struct {
size int size int
options *LRUCacheOptions options *LRUCacheOptions
mutex sync.RWMutex mutex sync.RWMutex
keyMutexes map[interface{}]*sync.RWMutex krwMutex KRWMutex
keyMutexesMutex sync.RWMutex
} }
func NewLRUCache(capacity int, options ...*LRUCacheOptions) *LRUCache { func NewLRUCache(capacity int, options ...*LRUCacheOptions) *LRUCache {
...@@ -35,12 +34,12 @@ func NewLRUCache(capacity int, options ...*LRUCacheOptions) *LRUCache { ...@@ -35,12 +34,12 @@ func NewLRUCache(capacity int, options ...*LRUCacheOptions) *LRUCache {
doublyLinkedList: &DoublyLinkedList{}, doublyLinkedList: &DoublyLinkedList{},
capacity: capacity, capacity: capacity,
options: currentOptions, options: currentOptions,
keyMutexes: make(map[interface{}]*sync.RWMutex), krwMutex: KRWMutex{keyMutexConsumers: make(map[interface{}]int), keyMutexes: make(map[interface{}]*sync.RWMutex)},
} }
} }
func (cache *LRUCache) Set(key interface{}, value interface{}) { func (cache *LRUCache) Set(key interface{}, value interface{}) {
keyMutex := cache.getKeyMutex(key) keyMutex := cache.krwMutex.Register(key)
keyMutex.Lock() keyMutex.Lock()
cache.mutex.Lock() cache.mutex.Lock()
...@@ -48,7 +47,7 @@ func (cache *LRUCache) Set(key interface{}, value interface{}) { ...@@ -48,7 +47,7 @@ func (cache *LRUCache) Set(key interface{}, value interface{}) {
cache.mutex.Unlock() cache.mutex.Unlock()
keyMutex.Unlock() keyMutex.Unlock()
cache.deleteKeyMutex(key) cache.krwMutex.Free(key)
} }
func (cache *LRUCache) set(key interface{}, value interface{}) { func (cache *LRUCache) set(key interface{}, value interface{}) {
...@@ -84,7 +83,7 @@ func (cache *LRUCache) set(key interface{}, value interface{}) { ...@@ -84,7 +83,7 @@ func (cache *LRUCache) set(key interface{}, value interface{}) {
} }
func (cache *LRUCache) ComputeIfAbsent(key interface{}, callback func() interface{}) (result interface{}) { func (cache *LRUCache) ComputeIfAbsent(key interface{}, callback func() interface{}) (result interface{}) {
keyMutex := cache.getKeyMutex(key) keyMutex := cache.krwMutex.Register(key)
keyMutex.RLock() keyMutex.RLock()
cache.mutex.RLock() cache.mutex.RLock()
...@@ -107,10 +106,10 @@ func (cache *LRUCache) ComputeIfAbsent(key interface{}, callback func() interfac ...@@ -107,10 +106,10 @@ func (cache *LRUCache) ComputeIfAbsent(key interface{}, callback func() interfac
cache.mutex.Unlock() cache.mutex.Unlock()
} }
keyMutex.Unlock() keyMutex.Unlock()
cache.deleteKeyMutex(key)
} }
cache.krwMutex.Free(key)
return return
} }
...@@ -119,16 +118,17 @@ func (cache *LRUCache) ComputeIfAbsent(key interface{}, callback func() interfac ...@@ -119,16 +118,17 @@ func (cache *LRUCache) ComputeIfAbsent(key interface{}, callback func() interfac
// If the callback returns nil the entry is removed from the cache. // If the callback returns nil the entry is removed from the cache.
// Returns the updated entry. // Returns the updated entry.
func (cache *LRUCache) ComputeIfPresent(key interface{}, callback func(value interface{}) interface{}) (result interface{}) { func (cache *LRUCache) ComputeIfPresent(key interface{}, callback func(value interface{}) interface{}) (result interface{}) {
keyMutex := cache.getKeyMutex(key) keyMutex := cache.krwMutex.Register(key)
keyMutex.RLock() keyMutex.RLock()
cache.mutex.RLock() cache.mutex.RLock()
if entry, exists := cache.directory[key]; exists { if entry, exists := cache.directory[key]; exists {
cache.mutex.RUnlock() cache.mutex.RUnlock()
keyMutex.RUnlock()
keyMutex.Lock()
result = entry.GetValue().(*lruCacheElement).value result = entry.GetValue().(*lruCacheElement).value
keyMutex.RUnlock()
keyMutex.Lock()
if callbackResult := callback(result); !typeutils.IsInterfaceNil(callbackResult) { if callbackResult := callback(result); !typeutils.IsInterfaceNil(callbackResult) {
result = callbackResult result = callbackResult
...@@ -156,42 +156,58 @@ func (cache *LRUCache) ComputeIfPresent(key interface{}, callback func(value int ...@@ -156,42 +156,58 @@ func (cache *LRUCache) ComputeIfPresent(key interface{}, callback func(value int
cache.mutex.RUnlock() cache.mutex.RUnlock()
keyMutex.RUnlock() keyMutex.RUnlock()
} }
cache.deleteKeyMutex(key)
cache.krwMutex.Free(key)
return return
} }
func (cache *LRUCache) Contains(key interface{}) bool { func (cache *LRUCache) Contains(key interface{}) (result bool) {
keyMutex := cache.krwMutex.Register(key)
keyMutex.RLock()
cache.mutex.RLock() cache.mutex.RLock()
if element, exists := cache.directory[key]; exists { if element, exists := cache.directory[key]; exists {
keyMutex.RUnlock()
cache.mutex.RUnlock() cache.mutex.RUnlock()
cache.mutex.Lock() cache.mutex.Lock()
defer cache.mutex.Unlock()
cache.promoteElement(element) cache.promoteElement(element)
cache.mutex.Unlock()
return true result = true
} else { } else {
cache.mutex.RUnlock() cache.mutex.RUnlock()
keyMutex.RUnlock()
return false result = false
} }
cache.krwMutex.Free(key)
return
} }
func (cache *LRUCache) Get(key interface{}) (result interface{}) { func (cache *LRUCache) Get(key interface{}) (result interface{}) {
keyMutex := cache.krwMutex.Register(key)
keyMutex.RLock()
cache.mutex.RLock() cache.mutex.RLock()
if element, exists := cache.directory[key]; exists { if element, exists := cache.directory[key]; exists {
cache.mutex.RUnlock() cache.mutex.RUnlock()
cache.mutex.Lock() cache.mutex.Lock()
defer cache.mutex.Unlock()
cache.promoteElement(element) cache.promoteElement(element)
cache.mutex.Unlock()
result = element.GetValue().(*lruCacheElement).value result = element.GetValue().(*lruCacheElement).value
keyMutex.RUnlock()
} else { } else {
cache.mutex.RUnlock() cache.mutex.RUnlock()
} }
cache.krwMutex.Free(key)
return return
} }
...@@ -210,6 +226,9 @@ func (cache *LRUCache) GetSize() int { ...@@ -210,6 +226,9 @@ func (cache *LRUCache) GetSize() int {
} }
func (cache *LRUCache) Delete(key interface{}) bool { func (cache *LRUCache) Delete(key interface{}) bool {
keyMutex := cache.krwMutex.Register(key)
keyMutex.Lock()
cache.mutex.RLock() cache.mutex.RLock()
entry, exists := cache.directory[key] entry, exists := cache.directory[key]
...@@ -234,6 +253,9 @@ func (cache *LRUCache) Delete(key interface{}) bool { ...@@ -234,6 +253,9 @@ func (cache *LRUCache) Delete(key interface{}) bool {
cache.mutex.RUnlock() cache.mutex.RUnlock()
keyMutex.Unlock()
cache.krwMutex.Free(key)
return false return false
} }
...@@ -243,28 +265,3 @@ func (cache *LRUCache) promoteElement(element *DoublyLinkedListEntry) { ...@@ -243,28 +265,3 @@ func (cache *LRUCache) promoteElement(element *DoublyLinkedListEntry) {
} }
cache.doublyLinkedList.addFirstEntry(element) cache.doublyLinkedList.addFirstEntry(element)
} }
func (cache *LRUCache) getKeyMutex(key interface{}) (result *sync.RWMutex) {
cache.keyMutexesMutex.RLock()
if keyMutex, keyMutexExists := cache.keyMutexes[key]; !keyMutexExists {
cache.keyMutexesMutex.RUnlock()
cache.keyMutexesMutex.Lock()
result = &sync.RWMutex{}
cache.keyMutexes[key] = result
cache.keyMutexesMutex.Unlock()
} else {
cache.keyMutexesMutex.RUnlock()
result = keyMutex
}
return
}
func (cache *LRUCache) deleteKeyMutex(key interface{}) {
cache.keyMutexesMutex.Lock()
delete(cache.keyMutexes, key)
cache.keyMutexesMutex.Unlock()
}
...@@ -11,14 +11,12 @@ type Node struct { ...@@ -11,14 +11,12 @@ type Node struct {
wg *sync.WaitGroup wg *sync.WaitGroup
loggers []*Logger loggers []*Logger
loadedPlugins []*Plugin loadedPlugins []*Plugin
logLevel int
} }
var DisabledPlugins = make(map[string]bool) var DisabledPlugins = make(map[string]bool)
func Load(plugins ...*Plugin) *Node { func Load(plugins ...*Plugin) *Node {
node := &Node{ node := &Node{
logLevel: *LOG_LEVEL.Value,
loggers: make([]*Logger, 0), loggers: make([]*Logger, 0),
wg: &sync.WaitGroup{}, wg: &sync.WaitGroup{},
loadedPlugins: make([]*Plugin, 0), loadedPlugins: make([]*Plugin, 0),
...@@ -42,7 +40,7 @@ func (node *Node) AddLogger(logger *Logger) { ...@@ -42,7 +40,7 @@ func (node *Node) AddLogger(logger *Logger) {
} }
func (node *Node) LogSuccess(pluginName string, message string) { func (node *Node) LogSuccess(pluginName string, message string) {
if node.logLevel >= LOG_LEVEL_SUCCESS { if *LOG_LEVEL.Value >= LOG_LEVEL_SUCCESS {
for _, logger := range node.loggers { for _, logger := range node.loggers {
if logger.Enabled { if logger.Enabled {
logger.LogSuccess(pluginName, message) logger.LogSuccess(pluginName, message)
...@@ -52,7 +50,7 @@ func (node *Node) LogSuccess(pluginName string, message string) { ...@@ -52,7 +50,7 @@ func (node *Node) LogSuccess(pluginName string, message string) {
} }
func (node *Node) LogInfo(pluginName string, message string) { func (node *Node) LogInfo(pluginName string, message string) {
if node.logLevel >= LOG_LEVEL_INFO { if *LOG_LEVEL.Value >= LOG_LEVEL_INFO {
for _, logger := range node.loggers { for _, logger := range node.loggers {
if logger.Enabled { if logger.Enabled {
logger.LogInfo(pluginName, message) logger.LogInfo(pluginName, message)
...@@ -62,7 +60,7 @@ func (node *Node) LogInfo(pluginName string, message string) { ...@@ -62,7 +60,7 @@ func (node *Node) LogInfo(pluginName string, message string) {
} }
func (node *Node) LogDebug(pluginName string, message string) { func (node *Node) LogDebug(pluginName string, message string) {
if node.logLevel >= LOG_LEVEL_DEBUG { if *LOG_LEVEL.Value >= LOG_LEVEL_DEBUG {
for _, logger := range node.loggers { for _, logger := range node.loggers {
if logger.Enabled { if logger.Enabled {
logger.LogDebug(pluginName, message) logger.LogDebug(pluginName, message)
...@@ -72,7 +70,7 @@ func (node *Node) LogDebug(pluginName string, message string) { ...@@ -72,7 +70,7 @@ func (node *Node) LogDebug(pluginName string, message string) {
} }
func (node *Node) LogWarning(pluginName string, message string) { func (node *Node) LogWarning(pluginName string, message string) {
if node.logLevel >= LOG_LEVEL_WARNING { if *LOG_LEVEL.Value >= LOG_LEVEL_WARNING {
for _, logger := range node.loggers { for _, logger := range node.loggers {
if logger.Enabled { if logger.Enabled {
logger.LogWarning(pluginName, message) logger.LogWarning(pluginName, message)
...@@ -82,7 +80,7 @@ func (node *Node) LogWarning(pluginName string, message string) { ...@@ -82,7 +80,7 @@ func (node *Node) LogWarning(pluginName string, message string) {
} }
func (node *Node) LogFailure(pluginName string, message string) { func (node *Node) LogFailure(pluginName string, message string) {
if node.logLevel >= LOG_LEVEL_FAILURE { if *LOG_LEVEL.Value >= LOG_LEVEL_FAILURE {
for _, logger := range node.loggers { for _, logger := range node.loggers {
if logger.Enabled { if logger.Enabled {
logger.LogFailure(pluginName, message) logger.LogFailure(pluginName, message)
...@@ -106,8 +104,6 @@ func (node *Node) Load(plugins ...*Plugin) { ...@@ -106,8 +104,6 @@ func (node *Node) Load(plugins ...*Plugin) {
} }
} }
} }
//node.loadedPlugins = append(node.loadedPlugins, plugins...)
} }
func (node *Node) Run() { func (node *Node) Run() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment