Skip to content
Snippets Groups Projects
Commit 92731288 authored by KERDREUX Jerome's avatar KERDREUX Jerome
Browse files

Change FuncT and SubFunct

The long time was short ;)
Not really a big change, just drop the TypeVar (TypeVar sucks really)
parent 49bc28c5
No related branches found
No related tags found
1 merge request!1First try of type hints
......@@ -171,7 +171,7 @@ class AsyncEngine(core.EngineMixin):
try:
result = await run_action(msg, target)
if result is not None:
if result is not None and type(result) is dict:
self.send_reply(dev=target, targets=[msg.source], action=msg.action, body=result)
except CallbackError as e:
self.send_error(target, e.code, e.description)
......
......@@ -22,7 +22,7 @@ import inspect
import logging
import time
import typing
from typing import Any, Awaitable, Optional, List, Callable, TypeVar
from typing import Any, Awaitable, Optional, List, Callable, Union
from .exceptions import EngineError, XAALError
from .messages import ALIVE_ADDR, MessageAction, MessageFactory, MessageType
......@@ -32,14 +32,10 @@ if typing.TYPE_CHECKING:
from .messages import Message
# Function type w/ no argument, and no return (Timer, Hooks)
FuncT = TypeVar("FuncT", Callable[[], None], Callable[[], Awaitable[None]])
FuncT = Union[Callable[[], None], Callable[[], Awaitable[None]]]
# Function type w/ message as argument (subscribers), no return
SubFuncT = TypeVar(
"SubFuncT",
Callable[['Message'], None],
Callable[['Message'], Awaitable[None]]
)
SubFuncT = Union[Callable[['Message'], None], Callable[['Message'], Awaitable[None]]]
logger = logging.getLogger(__name__)
......@@ -60,9 +56,9 @@ class EngineMixin(object):
__slots__ = ['devices', 'timers', 'subscribers', 'msg_filter', '_attributesChange', 'network', 'msg_factory']
def __init__(self, address: str, port: int, hops: int, key: bytes):
self.devices = [] # list of devices / use (un)register_devices()
self.timers = [] # functions to call periodic
self.subscribers = [] # message receive workflow
self.devices:list[Device] = [] # list of devices / use (un)register_devices()
self.timers: list[Timer] = [] # functions to call periodic
self.subscribers: list[SubFuncT] = [] # message receive workflow
self.msg_filter = None # message filter
self._attributesChange = [] # list of XAALAttributes instances
......@@ -217,7 +213,6 @@ class EngineMixin(object):
#####################################################
# timers
#####################################################
from typing import Coroutine
def add_timer(self, func: FuncT, period: int, counter: int = -1):
"""
func: function to call
......@@ -301,6 +296,7 @@ def search_action(msg: 'Message', device: 'Device'):
params = {}
result = None
if msg.action in methods.keys():
assert msg.action
method = methods[msg.action]
body_params = None
if msg.body:
......
......@@ -21,7 +21,7 @@
import logging
import time
import typing
from typing import Any, Optional, Union, Callable, TypeVar, Awaitable
from typing import Any, Optional, Union, Callable, Awaitable
from tabulate import tabulate
......@@ -34,8 +34,7 @@ if typing.TYPE_CHECKING:
from .core import EngineMixin
# Funtion types with any arguments and return a dict or None (device methods)
SyncMethodT = Callable[..., Union[dict, None]] # Fonction synchrone qui retourne un dict ou None
AsyncMethodT = Callable[..., Awaitable[Union[dict, None]]] # Coroutine qui retourne un dict ou None
MethodT = Union[Callable[..., Union[dict, None]], Callable[..., Awaitable[Union[dict, None]]]]
logger = logging.getLogger(__name__)
......@@ -134,7 +133,7 @@ class Device(object):
self.next_alive = 0
# Default attributes & methods
self.__attributes = Attributes()
self.methods: dict[str, Union[SyncMethodT, AsyncMethodT]] = {
self.methods: dict[str, MethodT] = {
'get_attributes': self._get_attributes,
'get_description': self._get_description,
}
......@@ -219,14 +218,14 @@ class Device(object):
else:
raise DeviceError("Invalid attributes list, use class Attributes)")
def add_method(self, name: str, func: Union[SyncMethodT, AsyncMethodT]):
def add_method(self, name: str, func: MethodT):
self.methods.update({name: func})
def del_method(self, name: str):
if name in self.methods:
del self.methods[name]
def get_methods(self) -> dict[str, Union[SyncMethodT, AsyncMethodT]]:
def get_methods(self) -> dict[str, MethodT]:
return self.methods
def update_alive(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment