Skip to content
Snippets Groups Projects
Commit e0a771d0 authored by jkerdreu's avatar jkerdreu
Browse files

refactoring the monitor to add per dict timer

git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/trunk@2177 b32b6428-25c9-4566-ad07-03861ab6144f
parent ddc79d53
Branches
No related tags found
No related merge requests found
......@@ -2,7 +2,7 @@
import time,random
from enum import Enum
from xaal.lib import tools
from xaal.lib import tools,config
import logging
logger = logging.getLogger(__name__)
......@@ -13,45 +13,59 @@ REFRESH_RATE = 300
def now():
return int(time.time())
class TimedDict(dict):
def __init__(self,refresh_rate=REFRESH_RATE,data={}):
dict.__init__(self,data)
self.last_update = 0
self.next_update = 0
def updated(self):
self.last_update = now()
self.next_update = self.last_update + REFRESH_RATE - random.randint(0,20)
def __setitem__(self, key, item):
super().__setitem__(key,item)
self.updated()
def update(self,_dict):
super().update(_dict)
self.updated()
class Device:
def __init__(self, addr, devtype, version):
def __init__(self, addr, devtype):
self.address = addr
self.short_address = tools.reduce_addr(addr)
self.devtype = devtype
self.version = version
# device cache
self.attributes = {}
self.description = {}
self.db = {}
self.attributes = TimedDict(refresh_rate=REFRESH_RATE)
self.description = TimedDict(refresh_rate=REFRESH_RATE*3)
self.db = TimedDict(refresh_rate=REFRESH_RATE*3)
# Alive management
self.last_alive = int(time.time())
self.last_alive = now()
self.next_alive = 0
# Refresh rate
self.refresh = 0
self.refresh_attributes = 0
self.refresh_description = 0
self.refresh_db = 0
def update_attributes(self, data):
""" rude update attributes. Return true if updated"""
self.refresh_attributes = now()
# really not the best comparaison, but we just need a flag
if self.attributes == data:
# TODO Fix this too
self.attributes.update(data)
return False
self.attributes.update(data)
return True
def update_description(self, data):
self.description.update(data)
self.refresh_description = now()
def set_db(self,data):
self.db = data
self.refresh_db = now()
#self.db = data
# TODO Fix this
self.update_db(data)
def update_db(self,data):
self.db.update(data)
self.refresh_db = now()
def update_cache_db(self,data):
purge = []
......@@ -61,7 +75,6 @@ class Device:
self.db.update(data)
for k in purge:
self.db.pop(k)
self.refresh_db = 0
def alive(self,value):
self.last_alive = int(time.time())
......@@ -89,8 +102,8 @@ class Devices:
self.__devs = {}
self.__list_cache = None
def add(self,addr,devtype,version):
dev = Device(addr,devtype,version)
def add(self,addr,devtype):
dev = Device(addr,devtype)
self.__devs.update({addr : dev})
self.__list_cache = None
return dev
......@@ -162,14 +175,14 @@ class Devices:
return key in self.__devs
def auto_wash(self):
now = int(time.time())
now_ =now()
for dev in self.get():
if dev.next_alive < now:
if dev.next_alive < now_:
logger.info("Auto Washing %s" % dev.address)
del self.__devs[dev.address]
self.__list_cache = None
def display(self):
def dump(self):
for d in self.get():
print("%s %s" % (d.address,d.devtype))
......@@ -179,10 +192,11 @@ class Notification(Enum):
new_device = 0
drop_device = 1 # sending drop_device notif is not implemented yet,
attribute_change = 2
description_change = 3
metadata_change = 4
class Monitor:
"""
class xAAL Monitor:
use this class to monitor a xAAL network
"""
def __init__(self,device,filter_func=None,db_server=None):
......@@ -193,12 +207,15 @@ class Monitor:
self.devices = Devices()
self.filter = filter_func
self.subscribers = []
self.engine.add_rx_handler(self.parse_msg)
self.engine.add_rx_handler(self.on_parse_msg)
# only send isAlive message every 2 expirations
self.engine.add_timer(self.send_isalive, config.alive_timer * 2)
# delete expired device every 10s
self.engine.add_timer(self.auto_wash, 10)
self.engine.add_timer(self.send_isalive, 240)
self.engine.add_timer(self.refresh_devices, 5)
# wait 5s for the first isAlive answers before the initial crawl
self.boot_timer = self.engine.add_timer(self.initial_refresh, 5)
def parse_msg(self, msg):
def on_parse_msg(self, msg):
# do nothing for some msg
if (self.filter!=None) and self.filter(msg)==False:
return
......@@ -215,22 +232,17 @@ class Monitor:
r = dev.update_attributes(msg.body)
if r:
self.notify(Notification.attribute_change,dev)
else:
logger.warning("FAKE UPDATE")
elif msg.is_get_description_reply():
dev.update_description(msg.body)
elif self.is_reply_metadb(msg):
elif self.is_from_metadb(msg):
addr = msg.body['device']
target = self.devices.get_with_addr(addr)
if target and 'map' in msg.body:
if self.is_reply_metadb(msg):
target.set_db(msg.body['map'])
elif self.is_update_metadb(msg):
addr = msg.body['device']
target = self.devices.get_with_addr(addr)
if target and 'map' in msg.body:
if self.is_update_metadb(msg):
target.update_db(msg.body['map'])
def subscribe(self,func):
......@@ -244,7 +256,7 @@ class Monitor:
s(ev_type,device)
def add_device(self,msg):
return self.devices.add(msg.source,msg.devtype,msg.version)
return self.devices.add(msg.source,msg.devtype)
def send_isalive(self):
self.engine.send_isAlive(self.dev, "any.any")
......@@ -253,23 +265,65 @@ class Monitor:
"""call the Auto-wash on devices List"""
self.devices.auto_wash()
def initial_refresh(self):
print("Initial....")
now_ = now()
cnt = 0
for dev in self.devices:
# description
old_cnt = cnt
if dev.description.next_update < now_:
self.request_description(dev.address)
dev.description.next_update = now_ + REFRESH_RATE
cnt = cnt +1
# metadata
if self.db_server and dev.db.next_update < now_:
self.request_metadb(dev.address)
dev.db.next_update = now_ + REFRESH_RATE
cnt = cnt +1
# attributes
if dev.attributes.next_update < now_:
self.request_attributes(dev.address)
dev.attributes.next_update = now_ + REFRESH_RATE
cnt = cnt +1
if cnt != old_cnt:
dev.dump()
if cnt > 60:
break
#self.engine.add_timer(self.refresh_devices, 5)
#self.engine.remove_timer(self.boot_timer)
"""
def refresh_devices(self):
now = int(time.time())
now_ = now()
for dev in self.devices:
if dev.refresh + REFRESH_RATE < now:
if dev.refresh_db + REFRESH_RATE < now:
if dev.refresh + REFRESH_RATE < now_:
if dev.refresh_db + REFRESH_RATE < now_:
self.request_metadb(dev.address)
if dev.refresh_attributes + REFRESH_RATE < now:
self.engine.send_get_attributes(self.dev,[dev.address,])
if dev.refresh_description + REFRESH_RATE < now:
self.engine.send_get_description(self.dev,[dev.address,])
if dev.refresh_attributes + REFRESH_RATE < now_:
self.request_attributes(dev.address)
if dev.refresh_description + REFRESH_RATE < now_:
self.request_description(dev.address)
# to avoid bulk send, we introduce this salt in refresh
dev.refresh = now - random.randint(0,20)
"""
def request_metadb(self,addr):
if self.db_server:
self.engine.send_request(self.dev,[self.db_server,],'getKeysValues',{'device':addr})
def request_attributes(self,addr):
self.engine.send_get_attributes(self.dev,[addr,])
def request_description(self,addr):
self.engine.send_get_description(self.dev,[addr,])
def is_from_metadb(self,msg):
if msg.msgtype in ['reply','notify'] and msg.source == self.db_server:
return True
return False
def is_reply_metadb(self,msg):
if msg.msgtype == 'reply' and msg.action == 'getKeysValues' and msg.source == self.db_server:
return True
......@@ -279,3 +333,7 @@ class Monitor:
if msg.msgtype == 'notify' and msg.action == 'keysValuesChanged' and msg.source == self.db_server:
return True
return False
def debug_timers(self):
for dev in self.devices:
print("%s\t%s\t%d\t%d\t%d" % (dev.address,dev.devtype,dev.description.last_update,dev.db.last_update,dev.attributes.last_update))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment