From cc038578da917812e5bd4560e59c6235024a05e7 Mon Sep 17 00:00:00 2001
From: Hans Moog <hm@mkjc.net>
Date: Fri, 5 Jul 2019 20:02:36 +0200
Subject: [PATCH] Feat: added key based locking for lrucache + minor fixes

---
 packages/datastructure/lru_cache.go | 85 ++++++++++++++---------------
 packages/node/node.go               | 14 ++---
 2 files changed, 46 insertions(+), 53 deletions(-)

diff --git a/packages/datastructure/lru_cache.go b/packages/datastructure/lru_cache.go
index fa7a71d8..f70a86ae 100644
--- a/packages/datastructure/lru_cache.go
+++ b/packages/datastructure/lru_cache.go
@@ -18,8 +18,7 @@ type LRUCache struct {
 	size             int
 	options          *LRUCacheOptions
 	mutex            sync.RWMutex
-	keyMutexes       map[interface{}]*sync.RWMutex
-	keyMutexesMutex  sync.RWMutex
+	krwMutex         KRWMutex
 }
 
 func NewLRUCache(capacity int, options ...*LRUCacheOptions) *LRUCache {
@@ -35,12 +34,12 @@ func NewLRUCache(capacity int, options ...*LRUCacheOptions) *LRUCache {
 		doublyLinkedList: &DoublyLinkedList{},
 		capacity:         capacity,
 		options:          currentOptions,
-		keyMutexes:       make(map[interface{}]*sync.RWMutex),
+		krwMutex:         KRWMutex{keyMutexConsumers: make(map[interface{}]int), keyMutexes: make(map[interface{}]*sync.RWMutex)},
 	}
 }
 
 func (cache *LRUCache) Set(key interface{}, value interface{}) {
-	keyMutex := cache.getKeyMutex(key)
+	keyMutex := cache.krwMutex.Register(key)
 	keyMutex.Lock()
 
 	cache.mutex.Lock()
@@ -48,7 +47,7 @@ func (cache *LRUCache) Set(key interface{}, value interface{}) {
 	cache.mutex.Unlock()
 
 	keyMutex.Unlock()
-	cache.deleteKeyMutex(key)
+	cache.krwMutex.Free(key)
 }
 
 func (cache *LRUCache) set(key interface{}, value interface{}) {
@@ -84,7 +83,7 @@ func (cache *LRUCache) set(key interface{}, value interface{}) {
 }
 
 func (cache *LRUCache) ComputeIfAbsent(key interface{}, callback func() interface{}) (result interface{}) {
-	keyMutex := cache.getKeyMutex(key)
+	keyMutex := cache.krwMutex.Register(key)
 
 	keyMutex.RLock()
 	cache.mutex.RLock()
@@ -107,10 +106,10 @@ func (cache *LRUCache) ComputeIfAbsent(key interface{}, callback func() interfac
 			cache.mutex.Unlock()
 		}
 		keyMutex.Unlock()
-
-		cache.deleteKeyMutex(key)
 	}
 
+	cache.krwMutex.Free(key)
+
 	return
 }
 
@@ -119,16 +118,17 @@ func (cache *LRUCache) ComputeIfAbsent(key interface{}, callback func() interfac
 // If the callback returns nil the entry is removed from the cache.
 // Returns the updated entry.
 func (cache *LRUCache) ComputeIfPresent(key interface{}, callback func(value interface{}) interface{}) (result interface{}) {
-	keyMutex := cache.getKeyMutex(key)
+	keyMutex := cache.krwMutex.Register(key)
+
 	keyMutex.RLock()
 	cache.mutex.RLock()
 	if entry, exists := cache.directory[key]; exists {
 		cache.mutex.RUnlock()
+		keyMutex.RUnlock()
+		keyMutex.Lock()
 
 		result = entry.GetValue().(*lruCacheElement).value
 
-		keyMutex.RUnlock()
-		keyMutex.Lock()
 		if callbackResult := callback(result); !typeutils.IsInterfaceNil(callbackResult) {
 			result = callbackResult
 
@@ -156,42 +156,58 @@ func (cache *LRUCache) ComputeIfPresent(key interface{}, callback func(value int
 		cache.mutex.RUnlock()
 		keyMutex.RUnlock()
 	}
-	cache.deleteKeyMutex(key)
+
+	cache.krwMutex.Free(key)
 
 	return
 }
 
-func (cache *LRUCache) Contains(key interface{}) bool {
+func (cache *LRUCache) Contains(key interface{}) (result bool) {
+	keyMutex := cache.krwMutex.Register(key)
+
+	keyMutex.RLock()
 	cache.mutex.RLock()
 	if element, exists := cache.directory[key]; exists {
+		keyMutex.RUnlock()
+
 		cache.mutex.RUnlock()
 		cache.mutex.Lock()
-		defer cache.mutex.Unlock()
-
 		cache.promoteElement(element)
+		cache.mutex.Unlock()
 
-		return true
+		result = true
 	} else {
 		cache.mutex.RUnlock()
+		keyMutex.RUnlock()
 
-		return false
+		result = false
 	}
+
+	cache.krwMutex.Free(key)
+
+	return
 }
 
 func (cache *LRUCache) Get(key interface{}) (result interface{}) {
+	keyMutex := cache.krwMutex.Register(key)
+
+	keyMutex.RLock()
 	cache.mutex.RLock()
 	if element, exists := cache.directory[key]; exists {
 		cache.mutex.RUnlock()
 		cache.mutex.Lock()
-		defer cache.mutex.Unlock()
-
 		cache.promoteElement(element)
+		cache.mutex.Unlock()
 
 		result = element.GetValue().(*lruCacheElement).value
+
+		keyMutex.RUnlock()
 	} else {
 		cache.mutex.RUnlock()
 	}
 
+	cache.krwMutex.Free(key)
+
 	return
 }
 
@@ -210,6 +226,9 @@ func (cache *LRUCache) GetSize() int {
 }
 
 func (cache *LRUCache) Delete(key interface{}) bool {
+	keyMutex := cache.krwMutex.Register(key)
+	keyMutex.Lock()
+
 	cache.mutex.RLock()
 
 	entry, exists := cache.directory[key]
@@ -234,6 +253,9 @@ func (cache *LRUCache) Delete(key interface{}) bool {
 
 	cache.mutex.RUnlock()
 
+	keyMutex.Unlock()
+	cache.krwMutex.Free(key)
+
 	return false
 }
 
@@ -243,28 +265,3 @@ func (cache *LRUCache) promoteElement(element *DoublyLinkedListEntry) {
 	}
 	cache.doublyLinkedList.addFirstEntry(element)
 }
-
-func (cache *LRUCache) getKeyMutex(key interface{}) (result *sync.RWMutex) {
-	cache.keyMutexesMutex.RLock()
-	if keyMutex, keyMutexExists := cache.keyMutexes[key]; !keyMutexExists {
-		cache.keyMutexesMutex.RUnlock()
-		cache.keyMutexesMutex.Lock()
-
-		result = &sync.RWMutex{}
-		cache.keyMutexes[key] = result
-
-		cache.keyMutexesMutex.Unlock()
-	} else {
-		cache.keyMutexesMutex.RUnlock()
-
-		result = keyMutex
-	}
-
-	return
-}
-
-func (cache *LRUCache) deleteKeyMutex(key interface{}) {
-	cache.keyMutexesMutex.Lock()
-	delete(cache.keyMutexes, key)
-	cache.keyMutexesMutex.Unlock()
-}
diff --git a/packages/node/node.go b/packages/node/node.go
index c0b99d37..bedaa490 100644
--- a/packages/node/node.go
+++ b/packages/node/node.go
@@ -11,14 +11,12 @@ type Node struct {
 	wg            *sync.WaitGroup
 	loggers       []*Logger
 	loadedPlugins []*Plugin
-	logLevel      int
 }
 
 var DisabledPlugins = make(map[string]bool)
 
 func Load(plugins ...*Plugin) *Node {
 	node := &Node{
-		logLevel:      *LOG_LEVEL.Value,
 		loggers:       make([]*Logger, 0),
 		wg:            &sync.WaitGroup{},
 		loadedPlugins: make([]*Plugin, 0),
@@ -42,7 +40,7 @@ func (node *Node) AddLogger(logger *Logger) {
 }
 
 func (node *Node) LogSuccess(pluginName string, message string) {
-	if node.logLevel >= LOG_LEVEL_SUCCESS {
+	if *LOG_LEVEL.Value >= LOG_LEVEL_SUCCESS {
 		for _, logger := range node.loggers {
 			if logger.Enabled {
 				logger.LogSuccess(pluginName, message)
@@ -52,7 +50,7 @@ func (node *Node) LogSuccess(pluginName string, message string) {
 }
 
 func (node *Node) LogInfo(pluginName string, message string) {
-	if node.logLevel >= LOG_LEVEL_INFO {
+	if *LOG_LEVEL.Value >= LOG_LEVEL_INFO {
 		for _, logger := range node.loggers {
 			if logger.Enabled {
 				logger.LogInfo(pluginName, message)
@@ -62,7 +60,7 @@ func (node *Node) LogInfo(pluginName string, message string) {
 }
 
 func (node *Node) LogDebug(pluginName string, message string) {
-	if node.logLevel >= LOG_LEVEL_DEBUG {
+	if *LOG_LEVEL.Value >= LOG_LEVEL_DEBUG {
 		for _, logger := range node.loggers {
 			if logger.Enabled {
 				logger.LogDebug(pluginName, message)
@@ -72,7 +70,7 @@ func (node *Node) LogDebug(pluginName string, message string) {
 }
 
 func (node *Node) LogWarning(pluginName string, message string) {
-	if node.logLevel >= LOG_LEVEL_WARNING {
+	if *LOG_LEVEL.Value >= LOG_LEVEL_WARNING {
 		for _, logger := range node.loggers {
 			if logger.Enabled {
 				logger.LogWarning(pluginName, message)
@@ -82,7 +80,7 @@ func (node *Node) LogWarning(pluginName string, message string) {
 }
 
 func (node *Node) LogFailure(pluginName string, message string) {
-	if node.logLevel >= LOG_LEVEL_FAILURE {
+	if *LOG_LEVEL.Value >= LOG_LEVEL_FAILURE {
 		for _, logger := range node.loggers {
 			if logger.Enabled {
 				logger.LogFailure(pluginName, message)
@@ -106,8 +104,6 @@ func (node *Node) Load(plugins ...*Plugin) {
 			}
 		}
 	}
-
-	//node.loadedPlugins = append(node.loadedPlugins, plugins...)
 }
 
 func (node *Node) Run() {
-- 
GitLab