Skip to content
Snippets Groups Projects
Commit 7e279ae2 authored by KERDREUX Jerome's avatar KERDREUX Jerome
Browse files

Massive type hinting / Format / import ordering

fix a bug in address typed as int, and is_running() missing return type
parent 82a4c273
No related branches found
No related tags found
1 merge request!1First try of type hints
......@@ -3,8 +3,11 @@ import logging
import signal
import sys
import time
import typing
from enum import Enum
from pprint import pprint
from typing import Any, Optional, Union
from uuid import UUID
import aioconsole
from tabulate import tabulate
......@@ -14,29 +17,62 @@ from .aionetwork import AsyncNetworkConnector
from .exceptions import CallbackError, XAALError
from .messages import MessageParserError
if typing.TYPE_CHECKING:
from .devices import Device
from .messages import Message
logger = logging.getLogger(__name__)
class AsyncEngine(core.EngineMixin):
__slots__ = ['__txFifo','_loop','_tasks','_hooks','_watchdog_task','_kill_counter','running_event','watchdog_event','started_event']
#####################################################
# Hooks
#####################################################
class HookType(Enum):
start = 0
stop = 1
class Hook(object):
__slots__ = ["type", "func", "args", "kwargs"]
def __init__(self, address=config.address, port=config.port, hops=config.hops, key=config.key):
core.EngineMixin.__init__(self,address,port,hops,key)
def __init__(self, type_, func, *args, **kwargs):
self.type = type_
self.func = func
self.args = args
self.kwargs = kwargs
self.__txFifo = asyncio.Queue() # tx msg fifo
self._loop = None # event loop
self._hooks = [] # hooks
self._tasks = [] # tasks
self._watchdog_task = None # watchdog task
self._kill_counter = 0 # watchdog counter
self.started_event = asyncio.Event() # engine started event
self.running_event = asyncio.Event() # engine running event
self.watchdog_event = asyncio.Event() # watchdog event
class AsyncEngine(core.EngineMixin):
__slots__ = [
"__txFifo",
"_loop",
"_tasks",
"_hooks",
"_watchdog_task",
"_kill_counter",
"running_event",
"watchdog_event",
"started_event",
]
def __init__(
self, address: str = config.address, port: int = config.port, hops: int = config.hops, key: bytes = config.key):
core.EngineMixin.__init__(self, address, port, hops, key)
self.__txFifo = asyncio.Queue() # tx msg fifo
self._loop = None # event loop
self._hooks = [] # hooks
self._tasks = [] # tasks
self._watchdog_task = None # watchdog task
self._kill_counter = 0 # watchdog counter
self.started_event = asyncio.Event() # engine started event
self.running_event = asyncio.Event() # engine running event
self.watchdog_event = asyncio.Event() # watchdog event
signal.signal(signal.SIGTERM, self.sigkill_handler)
signal.signal(signal.SIGINT, self.sigkill_handler)
# message receive workflow
self.subscribe(self.handle_request)
# start network
......@@ -45,20 +81,20 @@ class AsyncEngine(core.EngineMixin):
#####################################################
# Hooks
#####################################################
def on_start(self,func,*args,**kwargs):
hook = Hook(HookType.start,func,*args,**kwargs)
def on_start(self, func, *args, **kwargs):
hook = Hook(HookType.start, func, *args, **kwargs)
self._hooks.append(hook)
def on_stop(self,func,*args,**kwargs):
hook = Hook(HookType.stop,func,*args,**kwargs)
def on_stop(self, func, *args, **kwargs):
hook = Hook(HookType.stop, func, *args, **kwargs)
self._hooks.append(hook)
async def run_hooks(self,hook_type):
hooks = list(filter(lambda hook: hook.type==hook_type,self._hooks))
if len(hooks)!=0:
async def run_hooks(self, hook_type: HookType):
hooks = list(filter(lambda hook: hook.type == hook_type, self._hooks))
if len(hooks) != 0:
logger.debug(f"Launching {hook_type} hooks")
for h in hooks:
await run_func(h.func,*h.args,**h.kwargs)
await run_func(h.func, *h.args, **h.kwargs)
#####################################################
# timers
......@@ -74,7 +110,7 @@ class AsyncEngine(core.EngineMixin):
except CallbackError as e:
logger.error(e.description)
if t.counter != -1:
t.counter-= 1
t.counter -= 1
if t.counter == 0:
expire_list.append(t)
t.deadline = now + t.period
......@@ -85,20 +121,20 @@ class AsyncEngine(core.EngineMixin):
#####################################################
# msg send / receive
#####################################################
def queue_msg(self, msg):
def queue_msg(self, msg: bytes):
"""queue a message"""
self.__txFifo.put_nowait(msg)
def send_msg(self, msg):
def send_msg(self, msg: bytes):
"""Send an encoded message to the bus, use queue_msg instead"""
self.network.send(msg)
async def receive_msg(self):
async def receive_msg(self) -> Optional["Message"]:
"""return new received message or None"""
data = await self.network.get_data()
if data:
try:
msg = self.msg_factory.decode_msg(data,self.msg_filter)
msg = self.msg_factory.decode_msg(data, self.msg_filter)
except MessageParserError as e:
logger.warning(e)
msg = None
......@@ -110,10 +146,10 @@ class AsyncEngine(core.EngineMixin):
msg = await self.receive_msg()
if msg:
for func in self.subscribers:
await run_func(func,msg)
await run_func(func, msg)
self.process_attributes_change()
def handle_request(self, msg):
def handle_request(self, msg: "Message"):
"""Filter msg for devices according default xAAL API then process the
request for each targets identied in the engine
"""
......@@ -121,16 +157,16 @@ class AsyncEngine(core.EngineMixin):
return
targets = core.filter_msg_for_devices(msg, self.devices)
for target in targets:
if msg.action == 'is_alive':
if msg.action == "is_alive":
self.send_alive(target)
else:
self.new_task(self.handle_action_request(msg, target))
async def handle_action_request(self, msg, target):
async def handle_action_request(self, msg: "Message", target: "Device"):
try:
result = await run_action(msg, target)
if result is not None:
self.send_reply(dev=target,targets=[msg.source],action=msg.action,body=result)
self.send_reply(dev=target, targets=[msg.source], action=msg.action, body=result)
except CallbackError as e:
self.send_error(target, e.code, e.description)
except XAALError as e:
......@@ -139,25 +175,25 @@ class AsyncEngine(core.EngineMixin):
#####################################################
# Asyncio loop & Tasks
#####################################################
def get_loop(self):
def get_loop(self) -> asyncio.AbstractEventLoop:
if self._loop is None:
logger.debug('New event loop')
logger.debug("New event loop")
self._loop = asyncio.get_event_loop()
return self._loop
def new_task(self, coro, name=None):
def new_task(self, coro: Any, name: Optional[str] = None) -> asyncio.Task:
# we maintain a task list, to be able to stop/start the engine
# on demand. needed by HASS
task = self.get_loop().create_task(coro,name=name)
task = self.get_loop().create_task(coro, name=name)
self._tasks.append(task)
task.add_done_callback(self.task_callback)
return task
def task_callback(self, task):
def task_callback(self, task: asyncio.Task):
# called when a task ended
self._tasks.remove(task)
def all_tasks(self):
def all_tasks(self) -> typing.List[asyncio.Task]:
return self._tasks
async def boot_task(self):
......@@ -191,34 +227,34 @@ class AsyncEngine(core.EngineMixin):
async def watchdog_task(self):
await self.watchdog_event.wait()
await self.stop()
logger.info('Exit')
logger.info("Exit")
#####################################################
# start / stop / shutdown
#####################################################
def is_running(self):
def is_running(self) -> bool:
return self.running_event.is_set()
def start(self):
if self.is_running():
logger.warning('Engine already started')
logger.warning("Engine already started")
return
self.started_event.set()
self.new_task(self.boot_task(), name='Boot')
self.new_task(self.receive_task(), name='RecvQ')
self.new_task(self.send_task(), name='SendQ')
self.new_task(self.timer_task(), name='Timers')
self.new_task(console(locals()), name='Console')
self.new_task(self.boot_task(), name="Boot")
self.new_task(self.receive_task(), name="RecvQ")
self.new_task(self.send_task(), name="SendQ")
self.new_task(self.timer_task(), name="Timers")
self.new_task(console(locals()), name="Console")
def setup_alives_timer(self):
# needed on stop-start sequence
if self.process_alives in [t.func for t in self.timers]:
return
# process alives every 10 seconds
self.add_timer(self.process_alives,10)
self.add_timer(self.process_alives, 10)
async def stop(self):
logger.info('Stopping engine')
logger.info("Stopping engine")
await self.run_hooks(HookType.stop)
self.running_event.clear()
self.started_event.clear()
......@@ -229,67 +265,66 @@ class AsyncEngine(core.EngineMixin):
await asyncio.sleep(0.1)
def sigkill_handler(self, signal, frame):
print("", end = "\r") #remove the uggly ^C
print("", end="\r") # remove the uggly ^C
if not self.is_running():
logger.warning('Engine already stopped')
logger.warning("Engine already stopped")
self._kill_counter = 1
self._kill_counter +=1
self._kill_counter += 1
self.shutdown()
if self._kill_counter > 1:
logger.warning('Force quit')
logger.warning("Force quit")
sys.exit(-1)
else:
logger.warning('Kill requested')
logger.warning("Kill requested")
def shutdown(self):
self.watchdog_event.set()
def run(self):
if not self.started_event.is_set():
self.start()
if self._watchdog_task is None:
# start the watchdog task
self._watchdog_task = self.new_task(self.watchdog_task(), name='Watchdog task')
self._watchdog_task = self.new_task(self.watchdog_task(), name="Watchdog task")
self.get_loop().run_until_complete(self._watchdog_task)
else:
logger.warning('Engine already running')
logger.warning("Engine already running")
#####################################################
# Debugging tools
#####################################################
def dump_timers(self):
headers = ['Func','Period','Counter','Deadline']
headers = ["Func", "Period", "Counter", "Deadline"]
rows = []
now = time.time()
for t in self.timers:
remain = round(t.deadline-now,1)
rows.append([str(t.func),t.period,t.counter,remain])
print('= Timers')
remain = round(t.deadline - now, 1)
rows.append([str(t.func), t.period, t.counter, remain])
print("= Timers")
print(tabulate(rows, headers=headers, tablefmt="fancy_grid"))
def dump_tasks(self):
headers = ["Name","Coro","Loop ID"]
headers = ["Name", "Coro", "Loop ID"]
rows = []
for t in self.all_tasks():
rows.append([t.get_name(),str(t.get_coro()),id(t.get_loop())])
print('= Tasks')
rows.append([t.get_name(), str(t.get_coro()), id(t.get_loop())])
print("= Tasks")
print(tabulate(rows, headers=headers, tablefmt="fancy_grid"))
def dump_devices(self):
headers = ["addr","dev_type","info"]
headers = ["addr", "dev_type", "info"]
rows = []
for d in self.devices:
rows.append([d.address,d.dev_type,d.info])
print('= Devices')
rows.append([d.address, d.dev_type, d.info])
print("= Devices")
print(tabulate(rows, headers=headers, tablefmt="fancy_grid"))
def dump_hooks(self):
headers = ["Type","Hook"]
headers = ["Type", "Hook"]
rows = []
for h in self._hooks:
rows.append([h.type,str(h.func)])
print('= Hooks')
rows.append([h.type, str(h.func)])
print("= Hooks")
print(tabulate(rows, headers=headers, tablefmt="fancy_grid"))
def dump(self):
......@@ -298,7 +333,7 @@ class AsyncEngine(core.EngineMixin):
self.dump_timers()
self.dump_hooks()
def get_device(self,uuid):
def get_device(self, uuid: Union[str, UUID]) -> Optional["Device"]:
uuid = tools.get_uuid(uuid)
for dev in self.devices:
if dev.address == uuid:
......@@ -309,26 +344,26 @@ class AsyncEngine(core.EngineMixin):
#####################################################
# Utilities functions
#####################################################
async def run_func(func,*args,**kwargs):
"""run a function or a coroutine function """
async def run_func(func, *args, **kwargs):
"""run a function or a coroutine function"""
if asyncio.iscoroutinefunction(func):
return await func(*args,**kwargs)
return await func(*args, **kwargs)
else:
return func(*args,**kwargs)
return func(*args, **kwargs)
async def run_action(msg,device):
"""
async def run_action(msg: "Message", device: "Device"):
"""
Extract an action & launch it
Return:
- action result
- None if no result
Notes:
Notes:
- If an exception raised, it's logged, and raise an XAALError.
- Same API as legacy Engine, but accept coroutine functions
"""
method,params = core.search_action(msg,device)
method, params = core.search_action(msg, device)
result = None
try:
if asyncio.iscoroutinefunction(method):
......@@ -341,45 +376,32 @@ async def run_action(msg,device):
return result
#####################################################
# Hooks
#####################################################
class HookType(Enum):
start = 0
stop = 1
class Hook(object):
__slots__ = ['type','func','args','kwargs']
def __init__(self, type_, func, *args, **kwargs):
self.type = type_
self.func = func
self.args = args
self.kwargs = kwargs
#####################################################
# Debugging console
#####################################################
async def console(locals=locals(), port=None):
async def console(locals=locals(), port: Optional[int] = None):
"""launch a console to enable remote engine inspection"""
if port is None:
# let's find a free port if not specified
def find_free_port():
import socketserver
with socketserver.TCPServer(("localhost", 0), None) as s:
return s.server_address[1]
port = find_free_port()
logger.debug(f'starting debug console on port {port}')
sys.ps1 = '[xAAL] >>> '
banner = '=' * 78 +"\nxAAL remote console\n" + '=' *78
locals.update({'pprint':pprint})
logger.debug(f"starting debug console on port {port}")
sys.ps1 = "[xAAL] >>> "
banner = "=" * 78 + "\nxAAL remote console\n" + "=" * 78
locals.update({"pprint": pprint})
def factory(streams):
return aioconsole.AsynchronousConsole(locals=locals, streams=streams)
# start the console
try:
# debian with ipv6 disabled still state that localhost is ::1, which broke aioconsole
await aioconsole.start_interactive_server(host='127.0.0.1', port=port,factory=factory,banner=banner)
await aioconsole.start_interactive_server(host="127.0.0.1", port=port, factory=factory, banner=banner)
except OSError:
logger.warning('Unable to run console')
logger.warning("Unable to run console")
......@@ -49,7 +49,7 @@ class Timer(object):
class EngineMixin(object):
__slots__ = ["devices", "timers", "subscribers", "msg_filter", "_attributesChange", "network", "msg_factory"]
def __init__(self, address: int, port: int, hops: int, key: str):
def __init__(self, address: str, port: int, hops: int, key: bytes):
self.devices = [] # list of devices / use (un)register_devices()
self.timers = [] # functions to call periodic
self.subscribers = [] # message receive workflow
......@@ -128,16 +128,7 @@ class EngineMixin(object):
self.queue_msg(msg)
dev.update_alive()
def send_is_alive(
self,
dev: "Device",
targets: list = [
ALIVE_ADDR,
],
dev_types: list = [
"any.any",
],
):
def send_is_alive(self, dev: "Device", targets: list = [ALIVE_ADDR,], dev_types: list = ["any.any", ] ):
"""Send a is_alive message, w/ dev_types filtering"""
body = {"dev_types": dev_types}
self.send_request(dev, targets, MessageAction.IS_ALIVE.value, body)
......@@ -246,7 +237,7 @@ class EngineMixin(object):
def run(self):
logger.critical("To be implemented run")
def is_running(self):
def is_running(self) -> bool:
logger.critical("To be implemented is_running")
return False
......
......@@ -18,33 +18,42 @@
# along with xAAL. If not, see <http://www.gnu.org/licenses/>.
#
from . import core
from .network import NetworkConnector
from .exceptions import MessageParserError, CallbackError, XAALError
from . import config
import time
import collections
import logging
import time
import typing
from enum import Enum
from typing import Any, Optional
from . import config, core
from .exceptions import CallbackError, MessageParserError, XAALError
from .network import NetworkConnector
from .messages import MessageAction
if typing.TYPE_CHECKING:
from .devices import Device
from .messages import Message
import logging
logger = logging.getLogger(__name__)
class EngineState(Enum):
started = 1
running = 2
halted = 3
class Engine(core.EngineMixin):
class EngineState(Enum):
started = 1
running = 2
halted = 3
__slots__ = ['__last_timer','__txFifo','state','network']
def __init__(self,address=config.address,port=config.port,hops=config.hops,key=config.key):
core.EngineMixin.__init__(self,address,port,hops,key)
class Engine(core.EngineMixin):
__slots__ = ["__last_timer", "__txFifo", "state", "network"]
self.__last_timer = 0 # last timer check
self.__txFifo = collections.deque() # tx msg fifo
def __init__(
self, address: str = config.address, port: int = config.port, hops: int = config.hops, key: bytes = config.key
):
core.EngineMixin.__init__(self, address, port, hops, key)
self.__last_timer = 0 # last timer check
self.__txFifo = collections.deque() # tx msg fifo
# message receive workflow
self.subscribe(self.handle_request)
# ready to go
......@@ -56,16 +65,16 @@ class Engine(core.EngineMixin):
# xAAL messages Tx handling
#####################################################
# Fifo for msg to send
def queue_msg(self, msg):
def queue_msg(self, msg: bytes):
"""queue an encoded / cyphered message"""
self.__txFifo.append(msg)
def send_msg(self, msg):
def send_msg(self, msg: bytes):
"""Send an encoded message to the bus, use queue_msg instead"""
self.network.send(msg)
def process_tx_msg(self):
""" Process (send) message in tx queue called from the loop()"""
"""Process (send) message in tx queue called from the loop()"""
cnt = 0
while self.__txFifo:
temp = self.__txFifo.popleft()
......@@ -79,13 +88,13 @@ class Engine(core.EngineMixin):
#####################################################
# xAAL messages subscribers
#####################################################
def receive_msg(self):
def receive_msg(self) -> Optional["Message"]:
"""return new received message or None"""
result = None
data = self.network.get_data()
if data:
try:
msg = self.msg_factory.decode_msg(data,self.msg_filter)
msg = self.msg_factory.decode_msg(data, self.msg_filter)
except MessageParserError as e:
logger.warning(e)
msg = None
......@@ -99,23 +108,23 @@ class Engine(core.EngineMixin):
for func in self.subscribers:
func(msg)
self.process_attributes_change()
def handle_request(self, msg):
def handle_request(self, msg: "Message"):
"""
Filter msg for devices according default xAAL API then process the
request for each targets identied in the engine
"""
if not msg.is_request():
return
return
targets = core.filter_msg_for_devices(msg, self.devices)
for target in targets:
if msg.action == 'is_alive':
if msg.action == MessageAction.IS_ALIVE:
self.send_alive(target)
else:
self.handle_action_request(msg, target)
def handle_action_request(self, msg, target):
def handle_action_request(self, msg: "Message", target: "Device"):
"""
Run method (xAAL exposed method) on device:
- None is returned if device method do not return anything
......@@ -127,7 +136,7 @@ class Engine(core.EngineMixin):
try:
result = run_action(msg, target)
if result is not None:
self.send_reply(dev=target,targets=[msg.source],action=msg.action,body=result)
self.send_reply(dev=target, targets=[msg.source], action=msg.action, body=result)
except CallbackError as e:
self.send_error(target, e.code, e.description)
except XAALError as e:
......@@ -139,8 +148,8 @@ class Engine(core.EngineMixin):
def process_timers(self):
"""Process all timers to find out which ones should be run"""
expire_list = []
if len(self.timers)!=0 :
if len(self.timers) != 0:
now = time.time()
# little hack to avoid to check timer to often.
# w/ this enable timer precision is bad, but far enougth
......@@ -154,14 +163,14 @@ class Engine(core.EngineMixin):
except CallbackError as e:
logger.error(e.description)
if t.counter != -1:
t.counter-= 1
t.counter -= 1
if t.counter == 0:
expire_list.append(t)
t.deadline = now + t.period
# delete expired timers
for t in expire_list:
self.remove_timer(t)
self.__last_timer = now
#####################################################
......@@ -188,7 +197,7 @@ class Engine(core.EngineMixin):
def start(self):
"""Start the core engine: send queue alive msg"""
if self.state in [EngineState.started,EngineState.running]:
if self.state in [EngineState.started, EngineState.running]:
return
self.network.connect()
for dev in self.devices:
......@@ -207,13 +216,14 @@ class Engine(core.EngineMixin):
while self.state == EngineState.running:
self.loop()
def is_running(self):
def is_running(self) -> bool:
if self.state == EngineState.running:
return True
return False
def run_action(msg,device):
"""
def run_action(msg: "Message", device: "Device") -> Optional[Any]:
"""
Extract an action & launch it
Return:
- action result
......@@ -221,12 +231,11 @@ def run_action(msg,device):
Note: If an exception raised, it's logged, and raise an XAALError.
"""
method,params = core.search_action(msg,device)
method, params = core.search_action(msg, device)
result = None
try:
result = method(**params)
except Exception as e:
logger.error(e)
raise XAALError("Error in method:%s params:%s" % (msg.action,params))
raise XAALError("Error in method:%s params:%s" % (msg.action, params))
return result
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment