Skip to content
Snippets Groups Projects
Commit 914bb6a4 authored by KERDREUX Jerome's avatar KERDREUX Jerome
Browse files

Added type-hints, and change timers

- Added a lot of comments
- Added type-hints
- Changed the refresh-rate to lower the rate
parent 58bb1c9e
No related branches found
No related tags found
No related merge requests found
from xaal.lib import tools, Engine, Device, helpers
from xaal.monitor import Monitor
......@@ -9,9 +8,11 @@ PACKAGE_NAME = "xaal.monitorexample"
logger = logging.getLogger(PACKAGE_NAME)
def display_event(event, dev):
logger.debug("MonitorExample: %s %s %s" % (event, dev.address, dev.attributes))
def monitor_example(engine):
# load config
cfg = tools.load_cfg_or_die(PACKAGE_NAME)
......@@ -34,6 +35,7 @@ def monitor_example(engine):
mon.subscribe(display_event)
return mon
def run():
print("Monitor test")
helpers.setup_console_logger()
......@@ -44,8 +46,9 @@ def run():
eng.run()
except KeyboardInterrupt:
import pdb
pdb.set_trace()
if __name__ == '__main__':
run()
......@@ -5,13 +5,18 @@ from enum import Enum
from xaal.lib import tools, config
from xaal.lib import Message
# Typing checker
from xaal.lib.bindings import UUID
from xaal.lib import Device
from typing import Callable
import logging
logger = logging.getLogger(__name__)
# how often we force refresh the devices attributes/description/keyvalues
REFRESH_RATE = 600
REFRESH_RATE = 900
BOOT_TIMER = 1
REFRESH_TIMER = 7
AUTOWASH_TIMER = 10
......@@ -48,8 +53,8 @@ class TimedDict(dict):
return True if (self.last_update != 0) else False
class Device:
def __init__(self, addr, dev_type):
class MonitoredDevice:
def __init__(self, addr: UUID, dev_type: str):
self.address = addr
self.short_address = tools.reduce_addr(addr)
self.dev_type = dev_type
......@@ -67,11 +72,6 @@ class Device:
return self.attributes.update(data)
def update_description(self, data):
"""
group_id = data.get('groupId',None)
if group_id:
data['groupId'] = tools.bytes_to_uuid(group_id)
"""
return self.description.update(data)
def update_db(self, data):
......@@ -87,13 +87,14 @@ class Device:
self.db.pop(k)
return r
def is_ready(self):
def is_ready(self) -> bool:
"""return True if all cache are ready"""
if self.attributes.is_ready() and self.description.is_ready() and self.db.is_ready():
return True
return False
def alive(self, value):
self.last_alive = int(time.time())
self.last_alive = now()
self.next_alive = self.last_alive + value
def get_kv(self, key):
......@@ -113,24 +114,26 @@ class Device:
return result
class Devices:
class MonitoredDevices:
"""Device List for monitoring"""
def __init__(self):
self.__devs = {}
self.__list_cache = None
def add(self, addr, dev_type):
dev = Device(addr, dev_type)
def add(self, addr: UUID, dev_type: str) -> MonitoredDevice:
"""add a new device to the list"""
dev = MonitoredDevice(addr, dev_type)
self.__devs.update({addr: dev})
self.__list_cache = None
return dev
def remove(self, addr):
def remove(self, addr: UUID):
del self.__devs[addr]
self.__list_cache = None
def get(self):
def get(self) -> list[MonitoredDevice]:
"""return a list of devices"""
if not self.__list_cache:
# print("Refresh cache")
res = list(self.__devs.values())
......@@ -138,48 +141,53 @@ class Devices:
self.__list_cache = res
return self.__list_cache
def get_with_addr(self, addr):
def get_with_addr(self, addr: UUID) -> MonitoredDevice | None:
try:
return self.__devs[addr]
except KeyError:
return None
def get_with_group(self, addr):
def get_with_group(self, group_id: UUID) -> list[MonitoredDevice]:
"""return a list of devices w/ the same group_id"""
r = []
for d in self.get():
if addr == d.description.get('group_id', None):
if group_id == d.description.get('group_id', None):
r.append(d)
return r
def get_with_dev_type(self, dev_type):
def get_with_dev_type(self, dev_type: str) -> list[MonitoredDevice]:
"""return a list of devices w/ the same dev_type"""
r = []
for d in self.get():
if d.dev_type == dev_type:
r.append(d)
return r
def get_with_key(self, key):
def get_with_key(self, key: str) -> list[MonitoredDevice]:
"""return a list of devices w/ a specific key"""
r = []
for d in self.get():
if key in d.db:
r.append(d)
return r
def get_with_key_value(self, key, value):
def get_with_key_value(self, key: str, value) -> list[MonitoredDevice]:
"""return a list of devices w/ a specific key and value"""
r = []
for d in self.get():
if (key in d.db) and (d.db[key] == value):
r.append(d)
return r
def fetch_one_kv(self, key, value):
def fetch_one_kv(self, key: str, value) -> MonitoredDevice | None:
"""return the first device with a specific key and value"""
r = self.get_with_key_value(key, value)
try:
return r[0]
except IndexError:
return None
def get_dev_types(self):
def get_dev_types(self) -> list[str]:
"""return the list of distinct dev_types"""
ll = []
for dev in self.__devs.values():
......@@ -202,7 +210,8 @@ class Devices:
def __contains__(self, key):
return key in self.__devs
def auto_wash(self):
def auto_wash(self) -> list[MonitoredDevice]:
"""return a list of devices that need to be washed"""
now_ = now()
result = []
for dev in self.get():
......@@ -229,14 +238,16 @@ class Monitor:
use this class to monitor a xAAL network
"""
def __init__(self, device, filter_func=None, db_server=None):
def __init__(self, device: Device, filter_func: Callable | None = None, db_server: UUID | None = None):
self.dev = device
if device.engine is None:
raise ValueError("Device must have an engine")
self.engine = device.engine
self.db_server = db_server
self.boot_finished = False
self.last_isalive = 0
self.devices = Devices()
self.devices = MonitoredDevices()
self.filter = filter_func
self.subscribers = []
......@@ -252,13 +263,15 @@ class Monitor:
self.refresh_timer = self.engine.add_timer(self.refresh_devices, BOOT_TIMER)
def on_receive_msg(self, msg: Message):
# do nothing for some msg
"""We received a message"""
# filter some messages
if (self.filter is not None) and not self.filter(msg):
return
assert msg.source is not None # type-check
if msg.source not in self.devices:
dev = self.add_device(msg)
if dev:
self.notify(Notification.new_device, dev)
dev = self.devices.get_with_addr(msg.source)
if not dev:
return
......@@ -279,6 +292,8 @@ class Monitor:
elif self.is_from_metadb(msg):
addr = msg.body.get('device')
if addr is None:
return
target = self.devices.get_with_addr(addr)
changed = False
if target and 'map' in msg.body:
......@@ -289,19 +304,26 @@ class Monitor:
if changed:
self.notify(Notification.metadata_change, target)
def subscribe(self, func):
def subscribe(self, func: Callable):
"""subscribe a function"""
self.subscribers.append(func)
def unsubscribe(self, func):
def unsubscribe(self, func: Callable):
"""unsubscribe a function"""
self.subscribers.remove(func)
def notify(self, ev_type, device):
def notify(self, ev_type: Notification, device: MonitoredDevice):
"""notify all subscribers"""
for s in self.subscribers:
# logger.warning(f"{s} {ev_type}")
s(ev_type, device)
def add_device(self, msg):
def add_device(self, msg: Message) -> MonitoredDevice | None:
"""add a new device to the list"""
if msg.source and msg.dev_type:
return self.devices.add(msg.source, msg.dev_type)
else:
logger.warning(f"Invalid message source or dev_type {msg}")
def auto_wash(self):
"""call the Auto-wash on devices List"""
......@@ -311,6 +333,7 @@ class Monitor:
self.devices.remove(d.address)
def send_is_alive(self):
"""send a isAlive message to all devices (any.any)"""
self.engine.send_is_alive(self.dev)
self.last_isalive = now()
......@@ -321,6 +344,13 @@ class Monitor:
self.send_is_alive()
def refresh_devices(self):
"""
Refresh all devices data :
- but send only 40 requests per call
- used a boot timer to speed up the initial crawl
- switch to slow refresh timer after boot
- device which don't answer will be refreshed at least at REFRESH_RATE
"""
now_ = now()
cnt = 0
for dev in self.devices:
......@@ -352,30 +382,13 @@ class Monitor:
def request_metadb(self, addr):
if self.db_server:
self.engine.send_request(
self.dev,
[
self.db_server,
],
'get_keys_values',
{'device': addr},
)
self.engine.send_request(self.dev, [self.db_server], 'get_keys_values', {'device': addr})
def request_attributes(self, addr):
self.engine.send_get_attributes(
self.dev,
[
addr,
],
)
self.engine.send_get_attributes(self.dev, [addr])
def request_description(self, addr):
self.engine.send_get_description(
self.dev,
[
addr,
],
)
self.engine.send_get_description(self.dev, [addr])
def is_from_metadb(self, msg):
if (msg.is_notify() or msg.is_reply()) and msg.source == self.db_server:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment