diff --git a/libs/monitor/xaal/monitor/example.py b/libs/monitor/xaal/monitor/example.py index 7351c365f20337f8f455a0ae1ec0f1cdac0a15df..48ff83a828832aa0766fd927e3feba8748828d9f 100644 --- a/libs/monitor/xaal/monitor/example.py +++ b/libs/monitor/xaal/monitor/example.py @@ -1,5 +1,4 @@ - -from xaal.lib import tools,Engine,Device,helpers +from xaal.lib import tools, Engine, Device, helpers from xaal.monitor import Monitor import platform @@ -9,19 +8,21 @@ PACKAGE_NAME = "xaal.monitorexample" logger = logging.getLogger(PACKAGE_NAME) -def display_event(event,dev): - logger.debug("MonitorExample: %s %s %s" %(event, dev.address, dev.attributes)) + +def display_event(event, dev): + logger.debug("MonitorExample: %s %s %s" % (event, dev.address, dev.attributes)) + def monitor_example(engine): # load config cfg = tools.load_cfg_or_die(PACKAGE_NAME) # create a device & register it - dev = Device("hmi.basic") - dev.address = tools.get_uuid(cfg['config']['addr']) or tools.get_random_uuid() - dev.vendor_id = "IHSEV" + dev = Device("hmi.basic") + dev.address = tools.get_uuid(cfg['config']['addr']) or tools.get_random_uuid() + dev.vendor_id = "IHSEV" dev.product_id = "Monitor Example" - dev.version = 0.1 - dev.info = "%s@%s" % (PACKAGE_NAME,platform.node()) + dev.version = 0.1 + dev.info = "%s@%s" % (PACKAGE_NAME, platform.node()) engine.add_device(dev) db_server = None @@ -30,10 +31,11 @@ def monitor_example(engine): else: logger.info('You can set "db_server" in the config file') # start the monitoring - mon = Monitor(dev,db_server=db_server) + mon = Monitor(dev, db_server=db_server) mon.subscribe(display_event) return mon + def run(): print("Monitor test") helpers.setup_console_logger() @@ -44,8 +46,9 @@ def run(): eng.run() except KeyboardInterrupt: import pdb + pdb.set_trace() + if __name__ == '__main__': run() - diff --git a/libs/monitor/xaal/monitor/monitor.py b/libs/monitor/xaal/monitor/monitor.py index d50ce8bc7c3d040810be34f75f30b6ab21de0874..16db8340cf6de06f00505599efd02aa59ade047e 100644 --- a/libs/monitor/xaal/monitor/monitor.py +++ b/libs/monitor/xaal/monitor/monitor.py @@ -5,13 +5,18 @@ from enum import Enum from xaal.lib import tools, config from xaal.lib import Message +# Typing checker +from xaal.lib.bindings import UUID +from xaal.lib import Device +from typing import Callable + import logging logger = logging.getLogger(__name__) # how often we force refresh the devices attributes/description/keyvalues -REFRESH_RATE = 600 +REFRESH_RATE = 900 BOOT_TIMER = 1 REFRESH_TIMER = 7 AUTOWASH_TIMER = 10 @@ -48,8 +53,8 @@ class TimedDict(dict): return True if (self.last_update != 0) else False -class Device: - def __init__(self, addr, dev_type): +class MonitoredDevice: + def __init__(self, addr: UUID, dev_type: str): self.address = addr self.short_address = tools.reduce_addr(addr) self.dev_type = dev_type @@ -67,11 +72,6 @@ class Device: return self.attributes.update(data) def update_description(self, data): - """ - group_id = data.get('groupId',None) - if group_id: - data['groupId'] = tools.bytes_to_uuid(group_id) - """ return self.description.update(data) def update_db(self, data): @@ -87,13 +87,14 @@ class Device: self.db.pop(k) return r - def is_ready(self): + def is_ready(self) -> bool: + """return True if all cache are ready""" if self.attributes.is_ready() and self.description.is_ready() and self.db.is_ready(): return True return False def alive(self, value): - self.last_alive = int(time.time()) + self.last_alive = now() self.next_alive = self.last_alive + value def get_kv(self, key): @@ -113,24 +114,26 @@ class Device: return result -class Devices: +class MonitoredDevices: """Device List for monitoring""" def __init__(self): self.__devs = {} self.__list_cache = None - def add(self, addr, dev_type): - dev = Device(addr, dev_type) + def add(self, addr: UUID, dev_type: str) -> MonitoredDevice: + """add a new device to the list""" + dev = MonitoredDevice(addr, dev_type) self.__devs.update({addr: dev}) self.__list_cache = None return dev - def remove(self, addr): + def remove(self, addr: UUID): del self.__devs[addr] self.__list_cache = None - def get(self): + def get(self) -> list[MonitoredDevice]: + """return a list of devices""" if not self.__list_cache: # print("Refresh cache") res = list(self.__devs.values()) @@ -138,48 +141,53 @@ class Devices: self.__list_cache = res return self.__list_cache - def get_with_addr(self, addr): + def get_with_addr(self, addr: UUID) -> MonitoredDevice | None: try: return self.__devs[addr] except KeyError: return None - def get_with_group(self, addr): + def get_with_group(self, group_id: UUID) -> list[MonitoredDevice]: + """return a list of devices w/ the same group_id""" r = [] for d in self.get(): - if addr == d.description.get('group_id', None): + if group_id == d.description.get('group_id', None): r.append(d) return r - def get_with_dev_type(self, dev_type): + def get_with_dev_type(self, dev_type: str) -> list[MonitoredDevice]: + """return a list of devices w/ the same dev_type""" r = [] for d in self.get(): if d.dev_type == dev_type: r.append(d) return r - def get_with_key(self, key): + def get_with_key(self, key: str) -> list[MonitoredDevice]: + """return a list of devices w/ a specific key""" r = [] for d in self.get(): if key in d.db: r.append(d) return r - def get_with_key_value(self, key, value): + def get_with_key_value(self, key: str, value) -> list[MonitoredDevice]: + """return a list of devices w/ a specific key and value""" r = [] for d in self.get(): if (key in d.db) and (d.db[key] == value): r.append(d) return r - def fetch_one_kv(self, key, value): + def fetch_one_kv(self, key: str, value) -> MonitoredDevice | None: + """return the first device with a specific key and value""" r = self.get_with_key_value(key, value) try: return r[0] except IndexError: return None - def get_dev_types(self): + def get_dev_types(self) -> list[str]: """return the list of distinct dev_types""" ll = [] for dev in self.__devs.values(): @@ -202,7 +210,8 @@ class Devices: def __contains__(self, key): return key in self.__devs - def auto_wash(self): + def auto_wash(self) -> list[MonitoredDevice]: + """return a list of devices that need to be washed""" now_ = now() result = [] for dev in self.get(): @@ -229,14 +238,16 @@ class Monitor: use this class to monitor a xAAL network """ - def __init__(self, device, filter_func=None, db_server=None): + def __init__(self, device: Device, filter_func: Callable | None = None, db_server: UUID | None = None): self.dev = device + if device.engine is None: + raise ValueError("Device must have an engine") self.engine = device.engine self.db_server = db_server self.boot_finished = False self.last_isalive = 0 - self.devices = Devices() + self.devices = MonitoredDevices() self.filter = filter_func self.subscribers = [] @@ -252,13 +263,15 @@ class Monitor: self.refresh_timer = self.engine.add_timer(self.refresh_devices, BOOT_TIMER) def on_receive_msg(self, msg: Message): - # do nothing for some msg + """We received a message""" + # filter some messages if (self.filter is not None) and not self.filter(msg): return + assert msg.source is not None # type-check if msg.source not in self.devices: dev = self.add_device(msg) - self.notify(Notification.new_device, dev) - + if dev: + self.notify(Notification.new_device, dev) dev = self.devices.get_with_addr(msg.source) if not dev: return @@ -279,6 +292,8 @@ class Monitor: elif self.is_from_metadb(msg): addr = msg.body.get('device') + if addr is None: + return target = self.devices.get_with_addr(addr) changed = False if target and 'map' in msg.body: @@ -289,19 +304,26 @@ class Monitor: if changed: self.notify(Notification.metadata_change, target) - def subscribe(self, func): + def subscribe(self, func: Callable): + """subscribe a function""" self.subscribers.append(func) - def unsubscribe(self, func): + def unsubscribe(self, func: Callable): + """unsubscribe a function""" self.subscribers.remove(func) - def notify(self, ev_type, device): + def notify(self, ev_type: Notification, device: MonitoredDevice): + """notify all subscribers""" for s in self.subscribers: # logger.warning(f"{s} {ev_type}") s(ev_type, device) - def add_device(self, msg): - return self.devices.add(msg.source, msg.dev_type) + def add_device(self, msg: Message) -> MonitoredDevice | None: + """add a new device to the list""" + if msg.source and msg.dev_type: + return self.devices.add(msg.source, msg.dev_type) + else: + logger.warning(f"Invalid message source or dev_type {msg}") def auto_wash(self): """call the Auto-wash on devices List""" @@ -311,6 +333,7 @@ class Monitor: self.devices.remove(d.address) def send_is_alive(self): + """send a isAlive message to all devices (any.any)""" self.engine.send_is_alive(self.dev) self.last_isalive = now() @@ -321,6 +344,13 @@ class Monitor: self.send_is_alive() def refresh_devices(self): + """ + Refresh all devices data : + - but send only 40 requests per call + - used a boot timer to speed up the initial crawl + - switch to slow refresh timer after boot + - device which don't answer will be refreshed at least at REFRESH_RATE + """ now_ = now() cnt = 0 for dev in self.devices: @@ -352,30 +382,13 @@ class Monitor: def request_metadb(self, addr): if self.db_server: - self.engine.send_request( - self.dev, - [ - self.db_server, - ], - 'get_keys_values', - {'device': addr}, - ) + self.engine.send_request(self.dev, [self.db_server], 'get_keys_values', {'device': addr}) def request_attributes(self, addr): - self.engine.send_get_attributes( - self.dev, - [ - addr, - ], - ) + self.engine.send_get_attributes(self.dev, [addr]) def request_description(self, addr): - self.engine.send_get_description( - self.dev, - [ - addr, - ], - ) + self.engine.send_get_description(self.dev, [addr]) def is_from_metadb(self, msg): if (msg.is_notify() or msg.is_reply()) and msg.source == self.db_server: