Skip to content
Snippets Groups Projects
Commit 7b5a474d authored by jkerdreu's avatar jkerdreu
Browse files

added description & db change notif.

git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/trunk@2179 b32b6428-25c9-4566-ad07-03861ab6144f
parent a33c548e
No related branches found
No related tags found
No related merge requests found
......@@ -9,6 +9,9 @@ logger = logging.getLogger(__name__)
# how often we force refresh the devices attributes/description/keyvalues
REFRESH_RATE = 300
BOOT_TIMER = 2
REFRESH_TIMER = 7
AUTOWASH_TIMER = 10
def now():
return int(time.time())
......@@ -21,16 +24,19 @@ class TimedDict(dict):
def updated(self):
self.last_update = now()
self.next_update = self.last_update + REFRESH_RATE - random.randint(0,20)
self.next_update = self.last_update + REFRESH_RATE + random.randint(-30,30)
def __setitem__(self, key, item):
super().__setitem__(key,item)
self.updated()
def update(self,_dict):
super().update(_dict)
def update(self,dict_):
changed = False
if dict_!= self:
changed = True
super().update(dict_)
self.updated()
return changed
class Device:
......@@ -39,9 +45,9 @@ class Device:
self.short_address = tools.reduce_addr(addr)
self.devtype = devtype
# device cache
self.attributes = TimedDict(refresh_rate=REFRESH_RATE)
self.attributes = TimedDict(refresh_rate=REFRESH_RATE)
self.description = TimedDict(refresh_rate=REFRESH_RATE*3)
self.db = TimedDict(refresh_rate=REFRESH_RATE*3)
self.db = TimedDict(refresh_rate=REFRESH_RATE*3)
# Alive management
self.last_alive = now()
self.next_alive = 0
......@@ -49,32 +55,23 @@ class Device:
def update_attributes(self, data):
""" rude update attributes. Return true if updated"""
# really not the best comparaison, but we just need a flag
if self.attributes == data:
# TODO Fix this too
self.attributes.update(data)
return False
self.attributes.update(data)
return True
return self.attributes.update(data)
def update_description(self, data):
self.description.update(data)
def set_db(self,data):
#self.db = data
# TODO Fix this
self.update_db(data)
return self.description.update(data)
def update_db(self,data):
self.db.update(data)
return self.db.update(data)
def update_cache_db(self,data):
def set_db(self,data):
purge = []
for k in data:
if data[k]==None:
purge.append(k)
self.db.update(data)
r = self.db.update(data)
for k in purge:
self.db.pop(k)
return r
def alive(self,value):
self.last_alive = int(time.time())
......@@ -204,6 +201,7 @@ class Monitor:
self.engine = device.engine
self.db_server = db_server
self.boot_finished = False
self.devices = Devices()
self.filter = filter_func
self.subscribers = []
......@@ -211,9 +209,9 @@ class Monitor:
# only send isAlive message every 2 expirations
self.engine.add_timer(self.send_isalive, config.alive_timer * 2)
# delete expired device every 10s
self.engine.add_timer(self.auto_wash, 10)
self.engine.add_timer(self.auto_wash, AUTOWASH_TIMER)
# wait 5s for the first isAlive answers before the initial crawl
self.boot_timer = self.engine.add_timer(self.initial_refresh, 5)
self.refresh_timer = self.engine.add_timer(self.refresh_devices, BOOT_TIMER)
def on_parse_msg(self, msg):
# do nothing for some msg
......@@ -229,21 +227,24 @@ class Monitor:
dev.alive(msg.body['timeout'])
elif msg.is_attributes_change() or msg.is_get_attribute_reply():
r = dev.update_attributes(msg.body)
if r:
if dev.update_attributes(msg.body):
self.notify(Notification.attribute_change,dev)
elif msg.is_get_description_reply():
dev.update_description(msg.body)
if dev.update_description(msg.body):
self.notify(Notification.description_change,dev)
elif self.is_from_metadb(msg):
addr = msg.body['device']
target = self.devices.get_with_addr(addr)
changed = False
if target and 'map' in msg.body:
if self.is_reply_metadb(msg):
target.set_db(msg.body['map'])
changed = target.set_db(msg.body['map'])
if self.is_update_metadb(msg):
target.update_db(msg.body['map'])
changed = target.update_db(msg.body['map'])
if changed:
self.notify(Notification.metadata_change,dev)
def subscribe(self,func):
self.subscribers.append(func)
......@@ -265,14 +266,11 @@ class Monitor:
"""call the Auto-wash on devices List"""
self.devices.auto_wash()
def initial_refresh(self):
print("Initial....")
def refresh_devices(self):
now_ = now()
cnt = 0
for dev in self.devices:
# description
old_cnt = cnt
if dev.description.next_update < now_:
self.request_description(dev.address)
dev.description.next_update = now_ + REFRESH_RATE
......@@ -287,27 +285,16 @@ class Monitor:
self.request_attributes(dev.address)
dev.attributes.next_update = now_ + REFRESH_RATE
cnt = cnt +1
if cnt != old_cnt:
dev.dump()
if cnt > 60:
break
#self.engine.add_timer(self.refresh_devices, 5)
#self.engine.remove_timer(self.boot_timer)
"""
def refresh_devices(self):
now_ = now()
for dev in self.devices:
if dev.refresh + REFRESH_RATE < now_:
if dev.refresh_db + REFRESH_RATE < now_:
self.request_metadb(dev.address)
if dev.refresh_attributes + REFRESH_RATE < now_:
self.request_attributes(dev.address)
if dev.refresh_description + REFRESH_RATE < now_:
self.request_description(dev.address)
# to avoid bulk send, we introduce this salt in refresh
dev.refresh = now - random.randint(0,20)
"""
if cnt > 40:
break
# switch to normal timer after boot
if self.boot_finished == False and cnt == 0 and len(self.devices)!=0:
self.refresh_timer.period = REFRESH_TIMER
logger.info("Switching to slow refresh timer")
self.boot_finished = True
elif cnt!=0:
logger.info("request queued: %d" % cnt )
def request_metadb(self,addr):
if self.db_server:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment