From 7e279ae2fb7e5a8251339f2db7a5cd141ca9440b Mon Sep 17 00:00:00 2001 From: jkerdreux-imt <jerome.kerdreux@imt-atlantique.fr> Date: Tue, 26 Nov 2024 00:52:19 +0100 Subject: [PATCH] Massive type hinting / Format / import ordering fix a bug in address typed as int, and is_running() missing return type --- libs/lib/xaal/lib/aioengine.py | 232 ++++++++++++++++++--------------- libs/lib/xaal/lib/core.py | 15 +-- libs/lib/xaal/lib/engine.py | 89 +++++++------ 3 files changed, 179 insertions(+), 157 deletions(-) diff --git a/libs/lib/xaal/lib/aioengine.py b/libs/lib/xaal/lib/aioengine.py index db97fb11..d3024051 100644 --- a/libs/lib/xaal/lib/aioengine.py +++ b/libs/lib/xaal/lib/aioengine.py @@ -3,8 +3,11 @@ import logging import signal import sys import time +import typing from enum import Enum from pprint import pprint +from typing import Any, Optional, Union +from uuid import UUID import aioconsole from tabulate import tabulate @@ -14,29 +17,62 @@ from .aionetwork import AsyncNetworkConnector from .exceptions import CallbackError, XAALError from .messages import MessageParserError +if typing.TYPE_CHECKING: + from .devices import Device + from .messages import Message + logger = logging.getLogger(__name__) -class AsyncEngine(core.EngineMixin): - __slots__ = ['__txFifo','_loop','_tasks','_hooks','_watchdog_task','_kill_counter','running_event','watchdog_event','started_event'] +##################################################### +# Hooks +##################################################### +class HookType(Enum): + start = 0 + stop = 1 + + +class Hook(object): + __slots__ = ["type", "func", "args", "kwargs"] - def __init__(self, address=config.address, port=config.port, hops=config.hops, key=config.key): - core.EngineMixin.__init__(self,address,port,hops,key) + def __init__(self, type_, func, *args, **kwargs): + self.type = type_ + self.func = func + self.args = args + self.kwargs = kwargs - self.__txFifo = asyncio.Queue() # tx msg fifo - self._loop = None # event loop - self._hooks = [] # hooks - self._tasks = [] # tasks - self._watchdog_task = None # watchdog task - self._kill_counter = 0 # watchdog counter - self.started_event = asyncio.Event() # engine started event - self.running_event = asyncio.Event() # engine running event - self.watchdog_event = asyncio.Event() # watchdog event +class AsyncEngine(core.EngineMixin): + __slots__ = [ + "__txFifo", + "_loop", + "_tasks", + "_hooks", + "_watchdog_task", + "_kill_counter", + "running_event", + "watchdog_event", + "started_event", + ] + + def __init__( + self, address: str = config.address, port: int = config.port, hops: int = config.hops, key: bytes = config.key): + core.EngineMixin.__init__(self, address, port, hops, key) + + self.__txFifo = asyncio.Queue() # tx msg fifo + self._loop = None # event loop + self._hooks = [] # hooks + self._tasks = [] # tasks + self._watchdog_task = None # watchdog task + self._kill_counter = 0 # watchdog counter + + self.started_event = asyncio.Event() # engine started event + self.running_event = asyncio.Event() # engine running event + self.watchdog_event = asyncio.Event() # watchdog event signal.signal(signal.SIGTERM, self.sigkill_handler) signal.signal(signal.SIGINT, self.sigkill_handler) - + # message receive workflow self.subscribe(self.handle_request) # start network @@ -45,20 +81,20 @@ class AsyncEngine(core.EngineMixin): ##################################################### # Hooks ##################################################### - def on_start(self,func,*args,**kwargs): - hook = Hook(HookType.start,func,*args,**kwargs) + def on_start(self, func, *args, **kwargs): + hook = Hook(HookType.start, func, *args, **kwargs) self._hooks.append(hook) - - def on_stop(self,func,*args,**kwargs): - hook = Hook(HookType.stop,func,*args,**kwargs) + + def on_stop(self, func, *args, **kwargs): + hook = Hook(HookType.stop, func, *args, **kwargs) self._hooks.append(hook) - async def run_hooks(self,hook_type): - hooks = list(filter(lambda hook: hook.type==hook_type,self._hooks)) - if len(hooks)!=0: + async def run_hooks(self, hook_type: HookType): + hooks = list(filter(lambda hook: hook.type == hook_type, self._hooks)) + if len(hooks) != 0: logger.debug(f"Launching {hook_type} hooks") for h in hooks: - await run_func(h.func,*h.args,**h.kwargs) + await run_func(h.func, *h.args, **h.kwargs) ##################################################### # timers @@ -74,7 +110,7 @@ class AsyncEngine(core.EngineMixin): except CallbackError as e: logger.error(e.description) if t.counter != -1: - t.counter-= 1 + t.counter -= 1 if t.counter == 0: expire_list.append(t) t.deadline = now + t.period @@ -85,20 +121,20 @@ class AsyncEngine(core.EngineMixin): ##################################################### # msg send / receive ##################################################### - def queue_msg(self, msg): + def queue_msg(self, msg: bytes): """queue a message""" self.__txFifo.put_nowait(msg) - def send_msg(self, msg): + def send_msg(self, msg: bytes): """Send an encoded message to the bus, use queue_msg instead""" self.network.send(msg) - async def receive_msg(self): + async def receive_msg(self) -> Optional["Message"]: """return new received message or None""" data = await self.network.get_data() if data: try: - msg = self.msg_factory.decode_msg(data,self.msg_filter) + msg = self.msg_factory.decode_msg(data, self.msg_filter) except MessageParserError as e: logger.warning(e) msg = None @@ -110,10 +146,10 @@ class AsyncEngine(core.EngineMixin): msg = await self.receive_msg() if msg: for func in self.subscribers: - await run_func(func,msg) + await run_func(func, msg) self.process_attributes_change() - def handle_request(self, msg): + def handle_request(self, msg: "Message"): """Filter msg for devices according default xAAL API then process the request for each targets identied in the engine """ @@ -121,16 +157,16 @@ class AsyncEngine(core.EngineMixin): return targets = core.filter_msg_for_devices(msg, self.devices) for target in targets: - if msg.action == 'is_alive': + if msg.action == "is_alive": self.send_alive(target) else: self.new_task(self.handle_action_request(msg, target)) - async def handle_action_request(self, msg, target): + async def handle_action_request(self, msg: "Message", target: "Device"): try: result = await run_action(msg, target) if result is not None: - self.send_reply(dev=target,targets=[msg.source],action=msg.action,body=result) + self.send_reply(dev=target, targets=[msg.source], action=msg.action, body=result) except CallbackError as e: self.send_error(target, e.code, e.description) except XAALError as e: @@ -139,25 +175,25 @@ class AsyncEngine(core.EngineMixin): ##################################################### # Asyncio loop & Tasks ##################################################### - def get_loop(self): + def get_loop(self) -> asyncio.AbstractEventLoop: if self._loop is None: - logger.debug('New event loop') + logger.debug("New event loop") self._loop = asyncio.get_event_loop() return self._loop - def new_task(self, coro, name=None): + def new_task(self, coro: Any, name: Optional[str] = None) -> asyncio.Task: # we maintain a task list, to be able to stop/start the engine # on demand. needed by HASS - task = self.get_loop().create_task(coro,name=name) + task = self.get_loop().create_task(coro, name=name) self._tasks.append(task) task.add_done_callback(self.task_callback) return task - def task_callback(self, task): + def task_callback(self, task: asyncio.Task): # called when a task ended self._tasks.remove(task) - def all_tasks(self): + def all_tasks(self) -> typing.List[asyncio.Task]: return self._tasks async def boot_task(self): @@ -191,34 +227,34 @@ class AsyncEngine(core.EngineMixin): async def watchdog_task(self): await self.watchdog_event.wait() await self.stop() - logger.info('Exit') - + logger.info("Exit") + ##################################################### # start / stop / shutdown ##################################################### - def is_running(self): + def is_running(self) -> bool: return self.running_event.is_set() def start(self): if self.is_running(): - logger.warning('Engine already started') + logger.warning("Engine already started") return self.started_event.set() - self.new_task(self.boot_task(), name='Boot') - self.new_task(self.receive_task(), name='RecvQ') - self.new_task(self.send_task(), name='SendQ') - self.new_task(self.timer_task(), name='Timers') - self.new_task(console(locals()), name='Console') + self.new_task(self.boot_task(), name="Boot") + self.new_task(self.receive_task(), name="RecvQ") + self.new_task(self.send_task(), name="SendQ") + self.new_task(self.timer_task(), name="Timers") + self.new_task(console(locals()), name="Console") def setup_alives_timer(self): # needed on stop-start sequence if self.process_alives in [t.func for t in self.timers]: return # process alives every 10 seconds - self.add_timer(self.process_alives,10) + self.add_timer(self.process_alives, 10) async def stop(self): - logger.info('Stopping engine') + logger.info("Stopping engine") await self.run_hooks(HookType.stop) self.running_event.clear() self.started_event.clear() @@ -229,67 +265,66 @@ class AsyncEngine(core.EngineMixin): await asyncio.sleep(0.1) def sigkill_handler(self, signal, frame): - print("", end = "\r") #remove the uggly ^C + print("", end="\r") # remove the uggly ^C if not self.is_running(): - logger.warning('Engine already stopped') + logger.warning("Engine already stopped") self._kill_counter = 1 - self._kill_counter +=1 + self._kill_counter += 1 self.shutdown() if self._kill_counter > 1: - logger.warning('Force quit') + logger.warning("Force quit") sys.exit(-1) else: - logger.warning('Kill requested') + logger.warning("Kill requested") def shutdown(self): self.watchdog_event.set() - + def run(self): if not self.started_event.is_set(): self.start() if self._watchdog_task is None: # start the watchdog task - self._watchdog_task = self.new_task(self.watchdog_task(), name='Watchdog task') + self._watchdog_task = self.new_task(self.watchdog_task(), name="Watchdog task") self.get_loop().run_until_complete(self._watchdog_task) else: - logger.warning('Engine already running') + logger.warning("Engine already running") ##################################################### # Debugging tools ##################################################### def dump_timers(self): - headers = ['Func','Period','Counter','Deadline'] + headers = ["Func", "Period", "Counter", "Deadline"] rows = [] now = time.time() for t in self.timers: - remain = round(t.deadline-now,1) - rows.append([str(t.func),t.period,t.counter,remain]) - print('= Timers') + remain = round(t.deadline - now, 1) + rows.append([str(t.func), t.period, t.counter, remain]) + print("= Timers") print(tabulate(rows, headers=headers, tablefmt="fancy_grid")) - def dump_tasks(self): - headers = ["Name","Coro","Loop ID"] + headers = ["Name", "Coro", "Loop ID"] rows = [] for t in self.all_tasks(): - rows.append([t.get_name(),str(t.get_coro()),id(t.get_loop())]) - print('= Tasks') + rows.append([t.get_name(), str(t.get_coro()), id(t.get_loop())]) + print("= Tasks") print(tabulate(rows, headers=headers, tablefmt="fancy_grid")) def dump_devices(self): - headers = ["addr","dev_type","info"] + headers = ["addr", "dev_type", "info"] rows = [] for d in self.devices: - rows.append([d.address,d.dev_type,d.info]) - print('= Devices') + rows.append([d.address, d.dev_type, d.info]) + print("= Devices") print(tabulate(rows, headers=headers, tablefmt="fancy_grid")) def dump_hooks(self): - headers = ["Type","Hook"] + headers = ["Type", "Hook"] rows = [] for h in self._hooks: - rows.append([h.type,str(h.func)]) - print('= Hooks') + rows.append([h.type, str(h.func)]) + print("= Hooks") print(tabulate(rows, headers=headers, tablefmt="fancy_grid")) def dump(self): @@ -298,7 +333,7 @@ class AsyncEngine(core.EngineMixin): self.dump_timers() self.dump_hooks() - def get_device(self,uuid): + def get_device(self, uuid: Union[str, UUID]) -> Optional["Device"]: uuid = tools.get_uuid(uuid) for dev in self.devices: if dev.address == uuid: @@ -309,26 +344,26 @@ class AsyncEngine(core.EngineMixin): ##################################################### # Utilities functions ##################################################### -async def run_func(func,*args,**kwargs): - """run a function or a coroutine function """ +async def run_func(func, *args, **kwargs): + """run a function or a coroutine function""" if asyncio.iscoroutinefunction(func): - return await func(*args,**kwargs) + return await func(*args, **kwargs) else: - return func(*args,**kwargs) + return func(*args, **kwargs) -async def run_action(msg,device): - """ +async def run_action(msg: "Message", device: "Device"): + """ Extract an action & launch it Return: - action result - None if no result - Notes: + Notes: - If an exception raised, it's logged, and raise an XAALError. - Same API as legacy Engine, but accept coroutine functions """ - method,params = core.search_action(msg,device) + method, params = core.search_action(msg, device) result = None try: if asyncio.iscoroutinefunction(method): @@ -341,45 +376,32 @@ async def run_action(msg,device): return result -##################################################### -# Hooks -##################################################### -class HookType(Enum): - start = 0 - stop = 1 - -class Hook(object): - __slots__ = ['type','func','args','kwargs'] - def __init__(self, type_, func, *args, **kwargs): - self.type = type_ - self.func = func - self.args = args - self.kwargs = kwargs - - ##################################################### # Debugging console ##################################################### -async def console(locals=locals(), port=None): +async def console(locals=locals(), port: Optional[int] = None): """launch a console to enable remote engine inspection""" if port is None: # let's find a free port if not specified def find_free_port(): import socketserver + with socketserver.TCPServer(("localhost", 0), None) as s: return s.server_address[1] + port = find_free_port() - - logger.debug(f'starting debug console on port {port}') - sys.ps1 = '[xAAL] >>> ' - banner = '=' * 78 +"\nxAAL remote console\n" + '=' *78 - locals.update({'pprint':pprint}) + + logger.debug(f"starting debug console on port {port}") + sys.ps1 = "[xAAL] >>> " + banner = "=" * 78 + "\nxAAL remote console\n" + "=" * 78 + locals.update({"pprint": pprint}) def factory(streams): return aioconsole.AsynchronousConsole(locals=locals, streams=streams) + # start the console try: # debian with ipv6 disabled still state that localhost is ::1, which broke aioconsole - await aioconsole.start_interactive_server(host='127.0.0.1', port=port,factory=factory,banner=banner) + await aioconsole.start_interactive_server(host="127.0.0.1", port=port, factory=factory, banner=banner) except OSError: - logger.warning('Unable to run console') + logger.warning("Unable to run console") diff --git a/libs/lib/xaal/lib/core.py b/libs/lib/xaal/lib/core.py index db3ca439..aa449a52 100644 --- a/libs/lib/xaal/lib/core.py +++ b/libs/lib/xaal/lib/core.py @@ -49,7 +49,7 @@ class Timer(object): class EngineMixin(object): __slots__ = ["devices", "timers", "subscribers", "msg_filter", "_attributesChange", "network", "msg_factory"] - def __init__(self, address: int, port: int, hops: int, key: str): + def __init__(self, address: str, port: int, hops: int, key: bytes): self.devices = [] # list of devices / use (un)register_devices() self.timers = [] # functions to call periodic self.subscribers = [] # message receive workflow @@ -128,16 +128,7 @@ class EngineMixin(object): self.queue_msg(msg) dev.update_alive() - def send_is_alive( - self, - dev: "Device", - targets: list = [ - ALIVE_ADDR, - ], - dev_types: list = [ - "any.any", - ], - ): + def send_is_alive(self, dev: "Device", targets: list = [ALIVE_ADDR,], dev_types: list = ["any.any", ] ): """Send a is_alive message, w/ dev_types filtering""" body = {"dev_types": dev_types} self.send_request(dev, targets, MessageAction.IS_ALIVE.value, body) @@ -246,7 +237,7 @@ class EngineMixin(object): def run(self): logger.critical("To be implemented run") - def is_running(self): + def is_running(self) -> bool: logger.critical("To be implemented is_running") return False diff --git a/libs/lib/xaal/lib/engine.py b/libs/lib/xaal/lib/engine.py index 67875d4a..90bd90bb 100644 --- a/libs/lib/xaal/lib/engine.py +++ b/libs/lib/xaal/lib/engine.py @@ -18,33 +18,42 @@ # along with xAAL. If not, see <http://www.gnu.org/licenses/>. # -from . import core -from .network import NetworkConnector -from .exceptions import MessageParserError, CallbackError, XAALError -from . import config - -import time import collections +import logging +import time +import typing from enum import Enum +from typing import Any, Optional + +from . import config, core +from .exceptions import CallbackError, MessageParserError, XAALError +from .network import NetworkConnector +from .messages import MessageAction + +if typing.TYPE_CHECKING: + from .devices import Device + from .messages import Message + -import logging logger = logging.getLogger(__name__) -class EngineState(Enum): - started = 1 - running = 2 - halted = 3 -class Engine(core.EngineMixin): +class EngineState(Enum): + started = 1 + running = 2 + halted = 3 - __slots__ = ['__last_timer','__txFifo','state','network'] - def __init__(self,address=config.address,port=config.port,hops=config.hops,key=config.key): - core.EngineMixin.__init__(self,address,port,hops,key) +class Engine(core.EngineMixin): + __slots__ = ["__last_timer", "__txFifo", "state", "network"] - self.__last_timer = 0 # last timer check - self.__txFifo = collections.deque() # tx msg fifo + def __init__( + self, address: str = config.address, port: int = config.port, hops: int = config.hops, key: bytes = config.key + ): + core.EngineMixin.__init__(self, address, port, hops, key) + self.__last_timer = 0 # last timer check + self.__txFifo = collections.deque() # tx msg fifo # message receive workflow self.subscribe(self.handle_request) # ready to go @@ -56,16 +65,16 @@ class Engine(core.EngineMixin): # xAAL messages Tx handling ##################################################### # Fifo for msg to send - def queue_msg(self, msg): + def queue_msg(self, msg: bytes): """queue an encoded / cyphered message""" self.__txFifo.append(msg) - def send_msg(self, msg): + def send_msg(self, msg: bytes): """Send an encoded message to the bus, use queue_msg instead""" self.network.send(msg) def process_tx_msg(self): - """ Process (send) message in tx queue called from the loop()""" + """Process (send) message in tx queue called from the loop()""" cnt = 0 while self.__txFifo: temp = self.__txFifo.popleft() @@ -79,13 +88,13 @@ class Engine(core.EngineMixin): ##################################################### # xAAL messages subscribers ##################################################### - def receive_msg(self): + def receive_msg(self) -> Optional["Message"]: """return new received message or None""" result = None data = self.network.get_data() if data: try: - msg = self.msg_factory.decode_msg(data,self.msg_filter) + msg = self.msg_factory.decode_msg(data, self.msg_filter) except MessageParserError as e: logger.warning(e) msg = None @@ -99,23 +108,23 @@ class Engine(core.EngineMixin): for func in self.subscribers: func(msg) self.process_attributes_change() - - def handle_request(self, msg): + + def handle_request(self, msg: "Message"): """ Filter msg for devices according default xAAL API then process the request for each targets identied in the engine """ if not msg.is_request(): - return - + return + targets = core.filter_msg_for_devices(msg, self.devices) for target in targets: - if msg.action == 'is_alive': + if msg.action == MessageAction.IS_ALIVE: self.send_alive(target) else: self.handle_action_request(msg, target) - def handle_action_request(self, msg, target): + def handle_action_request(self, msg: "Message", target: "Device"): """ Run method (xAAL exposed method) on device: - None is returned if device method do not return anything @@ -127,7 +136,7 @@ class Engine(core.EngineMixin): try: result = run_action(msg, target) if result is not None: - self.send_reply(dev=target,targets=[msg.source],action=msg.action,body=result) + self.send_reply(dev=target, targets=[msg.source], action=msg.action, body=result) except CallbackError as e: self.send_error(target, e.code, e.description) except XAALError as e: @@ -139,8 +148,8 @@ class Engine(core.EngineMixin): def process_timers(self): """Process all timers to find out which ones should be run""" expire_list = [] - - if len(self.timers)!=0 : + + if len(self.timers) != 0: now = time.time() # little hack to avoid to check timer to often. # w/ this enable timer precision is bad, but far enougth @@ -154,14 +163,14 @@ class Engine(core.EngineMixin): except CallbackError as e: logger.error(e.description) if t.counter != -1: - t.counter-= 1 + t.counter -= 1 if t.counter == 0: expire_list.append(t) t.deadline = now + t.period # delete expired timers for t in expire_list: self.remove_timer(t) - + self.__last_timer = now ##################################################### @@ -188,7 +197,7 @@ class Engine(core.EngineMixin): def start(self): """Start the core engine: send queue alive msg""" - if self.state in [EngineState.started,EngineState.running]: + if self.state in [EngineState.started, EngineState.running]: return self.network.connect() for dev in self.devices: @@ -207,13 +216,14 @@ class Engine(core.EngineMixin): while self.state == EngineState.running: self.loop() - def is_running(self): + def is_running(self) -> bool: if self.state == EngineState.running: return True return False -def run_action(msg,device): - """ + +def run_action(msg: "Message", device: "Device") -> Optional[Any]: + """ Extract an action & launch it Return: - action result @@ -221,12 +231,11 @@ def run_action(msg,device): Note: If an exception raised, it's logged, and raise an XAALError. """ - method,params = core.search_action(msg,device) + method, params = core.search_action(msg, device) result = None try: result = method(**params) except Exception as e: logger.error(e) - raise XAALError("Error in method:%s params:%s" % (msg.action,params)) + raise XAALError("Error in method:%s params:%s" % (msg.action, params)) return result - -- GitLab