Skip to content
Snippets Groups Projects
Commit e118f583 authored by jkerdreu's avatar jkerdreu
Browse files

Merge in progress



git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/branches/0.7@2749 b32b6428-25c9-4566-ad07-03861ab6144f
parent d2f1a8af
No related branches found
No related tags found
No related merge requests found
......@@ -7,8 +7,16 @@ from . import tools
from . import config
from . import bindings
from .core import Engine,Timer
from .core import Timer
# sync engine
from .engine import Engine
from .network import NetworkConnector
# async engine
from .aioengine import AsyncEngine
from .aionetwork import AsyncNetworkConnector
from .devices import Device, Attribute, Attributes
from .messages import Message,MessageFactory,MessageType
from .exceptions import *
import asyncio
from enum import Enum
from . import core
from . import config
from .messages import MessageParserError
from .aionetwork import AsyncNetworkConnector
from .exceptions import *
import time
from decorator import decorator
import asyncio
import aioconsole
import signal
import sys
import termtables
from pprint import pprint
import logging
logger = logging.getLogger(__name__)
import socketserver
def find_free_port():
with socketserver.TCPServer(("localhost", 0), None) as s:
return s.server_address[1]
@decorator
def spawn(func,*args,**kwargs):
return asyncio.get_event_loop().run_in_executor(None,func,*args,**kwargs)
async def console(locals=locals(),port=None):
if port == None:
port = find_free_port()
logger.info(f'loading console on port {port}')
import sys
sys.ps1 = '[xAAL] >>> '
locals.update({'pprint':pprint})
def _factory(streams):
return aioconsole.AsynchronousConsole(locals=locals, streams=streams)
banner = '=' * 78 +"\nxAAL remote console\n" + '=' *78
try:
await aioconsole.start_interactive_server(host='localhost', port=port,factory=_factory,banner=banner)
except OSError:
logger.warning('Unable to run console')
class HookType(Enum):
start = 0
stop = 1
class AsyncEngine(core.EngineMixin):
def __init__(self,address=config.address,port=config.port,hops=config.hops,key=config.key):
core.EngineMixin.__init__(self,address,port,hops,key)
self.subscribers =[self.handle_request] # message receive workflow
self.__txFifo = asyncio.Queue() # tx msg fifo
self._loop = None
self._hooks = []
self._watchdog_task = None
self._kill_counter = 0
self.running_event = asyncio.Event()
self.watchdog_event = asyncio.Event()
self.started_event = asyncio.Event()
signal.signal(signal.SIGTERM, self.sigkill_handler)
signal.signal(signal.SIGINT, self.sigkill_handler)
# start network
self.network = AsyncNetworkConnector(address, port, hops)
def on_start(self,func,*args,**kwargs):
hook = Hook(HookType.start,func,*args,**kwargs)
self._hooks.append(hook)
def on_stop(self,func,*args,**kwargs):
hook = Hook(HookType.stop,func,*args,**kwargs)
self._hooks.append(hook)
def is_running(self):
return self.running_event.is_set()
async def run_hooks(self,hook_type):
hooks = list(filter(lambda hook: hook.type==hook_type,self._hooks))
if len(hooks)!=0:
logger.debug(f"Launching {hook_type} hooks")
for h in hooks:
await run_func(h.func,*h.args,**h.kwargs)
def get_loop(self):
if self._loop == None:
logger.warning('New event loop')
self._loop = asyncio.get_event_loop()
return self._loop
def all_tasks(self):
return list(asyncio.all_tasks(self.get_loop()))
async def process_timers(self):
"""Process all timers to find out which ones should be run"""
expire_list = []
now = time.time()
for t in self.timers:
if t.deadline < now:
try:
await run_func(t.func)
except CallbackError as e:
logger.error(e.description)
if t.counter != -1:
t.counter-= 1
if t.counter == 0:
expire_list.append(t)
t.deadline = now + t.period
# delete expired timers
for t in expire_list:
self.remove_timer(t)
async def receive_msg(self):
"""return new received message or None"""
data = await self.network.get_data()
if data:
try:
msg = self.msg_factory.decode_msg(data)
except MessageParserError as e:
logger.warning(e)
msg = None
return msg
return None
def queue_msg(self, msg):
"""queue a message"""
self.__txFifo.put_nowait(msg)
def send_msg(self, msg):
"""Send an encoded message to the bus, use queue_msg instead"""
self.network.send(msg)
async def process_rx_msg(self):
"""process incomming messages"""
msg = await self.receive_msg()
if msg:
for func in self.subscribers:
await run_func(func,msg)
self.process_attributes_change()
def handle_request(self, msg):
"""Filter msg for devices according default xAAL API then process the
request for each targets identied in the engine
"""
if not msg.is_request():
return
targets = core.filter_msg_for_devices(msg, self.devices)
for target in targets:
if msg.action == 'is_alive':
self.send_alive(target)
else:
self.get_loop().create_task(self.handle_action_request(msg, target))
async def handle_action_request(self, msg, target):
try:
result = await run_action(msg, target)
if result != None:
self.send_reply(dev=target,targets=[msg.source],action=msg.action,body=result)
except CallbackError as e:
self.send_error(target, e.code, e.description)
except XAALError as e:
logger.error(e)
#####################################################
# Tasks
#####################################################
async def start_task(self):
self.watchdog_event.clear()
# queue the alive before anything
for dev in self.devices:
self.send_alive(dev)
dev.update_alive()
await self.network.connect()
self.running_event.set()
await self.run_hooks(HookType.start)
async def receive_task(self):
await self.running_event.wait()
while self.is_running():
await self.process_rx_msg()
async def send_task(self):
await self.running_event.wait()
while self.is_running():
temp = await self.__txFifo.get()
self.send_msg(temp)
async def timer_task(self):
await self.running_event.wait()
self.setup_alives_timer()
while self.is_running():
await asyncio.sleep(0.2)
await self.process_timers()
self.process_attributes_change()
async def watchdog_task(self):
await self.watchdog_event.wait()
await self.stop()
logger.info('Exit')
#####################################################
# start / stop / shutdown
#####################################################
def start(self):
if self.is_running():
logger.warning('Engine already started')
return
self.started_event.set()
loop = self.get_loop()
loop.create_task(self.start_task(),name='Boot')
loop.create_task(self.receive_task(),name='RecvQ')
loop.create_task(self.send_task(),name='SendQ')
loop.create_task(self.timer_task(),name='Timers')
loop.create_task(console(locals()),name='Console')
def new_task(self,coro,name=None):
return self.get_loop().create_task(coro,name=name)
def setup_alives_timer(self):
# needed on stop-start sequence
if self.process_alives in [t.func for t in self.timers]:
return
# process alives every 10 seconds
self.add_timer(self.process_alives,10)
async def stop(self):
await self.run_hooks(HookType.stop)
self.running_event.clear()
self.started_event.clear()
# cancel all tasks
for task in self.all_tasks():
if task!=self._watchdog_task:
task.cancel()
def sigkill_handler(self,signal,frame):
self._kill_counter +=1
self.shutdown()
if self._kill_counter > 1:
logger.warning('Force quit')
sys.exit(-1)
else:
print("", end = "\r") #remove the uggly ^C
logger.warning('Kill requested')
def shutdown(self):
self.watchdog_event.set()
def run(self):
if not self.started_event.is_set():
self.start()
if self._watchdog_task == None:
# start the watchdog task
loop = self.get_loop()
self._watchdog_task=loop.create_task(self.watchdog_task(),name='Watchdog task')
loop.run_until_complete(self._watchdog_task)
#loop.run_forever()
else:
logger.warning('Engine already running')
#####################################################
# Debugging tools
#####################################################
def dump_timers(self):
header = ['Func','Period','Counter','Deadline']
rows = []
now = time.time()
for t in self.timers:
remain = round(t.deadline-now,1)
rows.append([str(t.func),t.period,t.counter,remain])
print('= Timers')
if rows:
termtables.print(rows,header=header)
def dump_tasks(self):
header = ["Name","Coro","Loop ID"]
rows = []
for t in self.all_tasks():
rows.append([t.get_name(),str(t.get_coro()),id(t.get_loop())])
print('= Tasks')
if rows:
termtables.print(rows,header=header)
def dump_devices(self):
header = ["addr","dev_type","info"]
rows = []
for d in self.devices:
rows.append([d.address,d.dev_type,d.info])
print('= Devices')
if rows:
termtables.print(rows,header=header)
def dump_hooks(self):
header = ["N","Type","Hook"]
rows = []
for h in self._hooks:
rows.append([h.type,str(h.func)])
print('= Hooks')
if rows:
termtables.print(rows,header=header)
def dump(self):
self.dump_devices()
self.dump_tasks()
self.dump_timers()
self.dump_hooks()
def get_device(self,uuid):
from . import tools
uuid = tools.get_uuid(uuid)
for dev in self.devices:
if dev.address == uuid:
return dev
return None
async def run_func(func,*args,**kwargs):
if asyncio.iscoroutinefunction(func):
return await func(*args,**kwargs)
else:
return func(*args,**kwargs)
async def run_action(msg,device):
""" Extract an action & launch it
Return:
- action result
- None if no result
Note: If an exception raised, it's logged, and raise an XAALError.
"""
method,params = core.search_action(msg,device)
result = None
try:
#print(f"Running method {method}")
if asyncio.iscoroutinefunction(method):
result = await method(**params)
else:
result = method(**params)
except Exception as e:
logger.error(e)
raise XAALError("Error in method:%s params:%s" % (msg.action,params))
return result
class Hook(object):
def __init__(self,type_,func,*args,**kwargs):
self.type = type_
self.func = func
self.args = args
self.kwargs = kwargs
import asyncio
import struct
import socket
from enum import Enum
import logging
logger = logging.getLogger(__name__)
class AsyncNetworkConnector(object):
UDP_MAX_SIZE = 65507
def __init__(self, addr, port, hops,bind_addr='0.0.0.0'):
self.addr = addr
self.port = port
self.hops = hops
self.bind_addr = bind_addr
self._rx_queue = asyncio.Queue()
async def connect(self):
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
self.transport, self.protocol = await loop.create_datagram_endpoint(
lambda: XAALServerProtocol(on_con_lost,self.receive), sock = self.new_sock())
def new_sock(self):
sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,socket.IPPROTO_UDP)
try:
# Linux + MacOS + BSD
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except:
# Windows
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((self.bind_addr, self.port))
mreq = struct.pack('4sl',socket.inet_aton(self.addr),socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP,socket.IP_ADD_MEMBERSHIP,mreq)
sock.setsockopt(socket.IPPROTO_IP,socket.IP_MULTICAST_TTL,10)
sock.setblocking(False)
return sock
def send(self,data):
self.protocol.datagram_send(data,self.addr,self.port)
def receive(self,data):
self._rx_queue.put_nowait(data)
async def get_data(self):
return await self._rx_queue.get()
class XAALServerProtocol(asyncio.Protocol):
def __init__(self,on_con_lost,on_dtg_recv):
self.on_con_lost = on_con_lost
self.on_dtg_recv = on_dtg_recv
def connection_made(self, transport):
logger.info(f"xAAL network connected")
self.transport = transport
def error_received(self, exc):
print('Error received:', exc)
logger.warning(f"Error received: {exc}")
def connection_lost(self, exc):
logger.info(f"Connexion closed: {exc}")
self.on_con_lost.set_result(True)
def datagram_send(self,data,ip,port):
self.transport.sendto(data,(ip,port))
def datagram_received(self, data, addr):
#print(f"pkt from {addr}")
self.on_dtg_recv(data)
from .core import Engine as CoreEngine
import functools
import asyncio
"""
The current xAAL Lib doesn't support asyncio. A full rewrite of Engine and NetworkConnector is needed.
The module provide an temporary hack to warps the Engine mainloop (run() function) in an asyncio loop.
Use this module as temporary fix only.
"""
def spawn(func):
@functools.wraps(func)
def spawn_future(*args,**kwargs):
print(f"Calling {func.__name__}")
asyncio.ensure_future(func(*args,**kwargs))
return spawn_future
class Engine(CoreEngine):
async def run(self):
self.start()
self.running = True
while self.running:
self.loop()
await asyncio.sleep(0.02)
AsyncEngine = Engine
from io import BytesIO
import cbor2
from cbor2 import CBORTag
from cbor2.decoder import CBORDecoder
from . import bindings
......@@ -25,7 +27,16 @@ def dumps(obj, **kwargs):
return cbor2.dumps(obj,default=default_encoder,**kwargs)
def loads(payload, **kwargs):
return cbor2.loads(payload,tag_hook=tag_hook,**kwargs)
return _loads(payload,tag_hook=tag_hook,**kwargs)
def _loads(s, **kwargs):
with BytesIO(s) as fp:
return CustomDecoder(fp, **kwargs).decode()
class CustomDecoder(CBORDecoder):pass
def cleanup(obj):
"""
......
......@@ -18,38 +18,26 @@
# along with xAAL. If not, see <http://www.gnu.org/licenses/>.
#
from .network import NetworkConnector
from .messages import MessageFactory,MessageType
from .messages import MessageType,MessageFactory
from .exceptions import *
from . import config
import time
import inspect
import collections
from enum import Enum
import logging
logger = logging.getLogger(__name__)
class EngineState(Enum):
started = 1
running = 2
halted = 3
class EngineMixin(object):
class Engine(object):
def __init__(self,address=config.address,port=config.port,hops=config.hops,key=config.key):
def __init__(self,address,port,hops,key):
self.devices = [] # list of devices / use (un)register_devices()
self.state = EngineState.halted
self.timers = [] # functions to call periodic
self.__last_timer = 0 # last timer check
self.subscribers =[self.handle_request] # message receive workflow
self.subscribers = [] # message receive workflow
self.__attributesChange = [] # list of XAALAttributes instances
self.__txFifo = collections.deque() # tx msg fifo
# start network
self.network = NetworkConnector(address, port, hops)
# network connector
self.network = None
# start msg worker
self.msg_factory = MessageFactory(key)
......@@ -78,24 +66,7 @@ class Engine(object):
#####################################################
# Fifo for msg to send
def queue_msg(self, msg):
"""queue an encoded / cyphered message"""
self.__txFifo.append(msg)
def process_tx_msg(self):
""" Process (send) message in tx queue called from the loop()"""
cnt = 0
while self.__txFifo:
temp = self.__txFifo.popleft()
self.send_msg(temp)
# try to limit rate
cnt = cnt + 1
if cnt > config.queue_size:
time.sleep(0.2)
break
def send_msg(self, msg):
"""Send an encoded message to the bus, use queue_msg instead"""
self.network.send(msg)
logger.critical("To be implemented queue_msg: %s", msg)
def send_request(self,dev,targets,action,body = None):
"""queue a new request"""
......@@ -124,10 +95,7 @@ class Engine(object):
"""queue a notificaton"""
msg = self.msg_factory.build_msg(dev,[],MessageType.NOTIFY,action,body)
self.queue_msg(msg)
#####################################################
# Alive messages
#####################################################
def send_alive(self, dev):
"""Send a Alive message for a given device"""
timeout = dev.get_timeout()
......@@ -141,6 +109,10 @@ class Engine(object):
msg = self.msg_factory.build_msg(dev, [], MessageType.REQUEST, "is_alive", body)
self.queue_msg(msg)
#####################################################
# Alive messages
#####################################################
def process_alives(self):
"""Periodic sending alive messages"""
now = time.time()
......@@ -176,64 +148,12 @@ class Engine(object):
#####################################################
# xAAL messages subscribers
#####################################################
def receive_msg(self):
"""return new received message or None"""
result = None
data = self.network.get_data()
if data:
try:
msg = self.msg_factory.decode_msg(data)
except MessageParserError as e:
logger.warning(e)
msg = None
result = msg
return result
def subscribe(self,func):
self.subscribers.append(func)
def unsubscribe(self,func):
self.subscribers.remove(func)
def process_subscribers(self):
"""process incomming messages"""
msg = self.receive_msg()
if msg:
for func in self.subscribers:
func(msg)
self.process_attributes_change()
def handle_request(self, msg):
"""Filter msg for devices according default xAAL API then process the
request for each targets identied in the engine
"""
if not msg.is_request():
return
targets = filter_msg_for_devices(msg, self.devices)
for target in targets:
if msg.action == 'is_alive':
self.send_alive(target)
else:
self.handle_action_request(msg, target)
def handle_action_request(self, msg, target):
"""Run method (xAAL exposed method) on device:
- None is returned if device method do not return anything
- result is returned if device method gives a response
- Errors are raised if an error occured:
* Internal error
* error returned on the xAAL bus
"""
try:
result = run_action(msg, target)
if result != None:
self.send_reply(dev=target,targets=[msg.source],action=msg.action,body=result)
except CallbackError as e:
self.send_error(target, e.code, e.description)
except XAALError as e:
logger.error(e)
#####################################################
# timers
#####################################################
......@@ -253,81 +173,38 @@ class Engine(object):
"""remove a given timer from the list"""
self.timers.remove(timer)
def process_timers(self):
"""Process all timers to find out which ones should be run"""
expire_list = []
if len(self.timers)!=0 :
now = time.time()
# little hack to avoid to check timer to often.
# w/ this enable timer precision is bad, but far enougth
if (now - self.__last_timer) < 0.4: return
for t in self.timers:
if t.deadline < now:
try:
t.func()
except CallbackError as e:
logger.error(e.description)
if t.counter != -1:
t.counter-= 1
if t.counter == 0:
expire_list.append(t)
t.deadline = now + t.period
# delete expired timers
for t in expire_list:
self.remove_timer(t)
self.__last_timer = now
#####################################################
# Mainloops & run ..
# start/stop/run API
#####################################################
def loop(self):
"""Process incomming xAAL msg
Process timers
Process attributes change for devices
Process is_alive for device
Send msgs from the Tx Buffer
"""
# Process xAAL msg received, filter msg and process request
self.process_subscribers()
# Process timers
self.process_timers()
# Process attributes change for devices due to timers
self.process_attributes_change()
# Process Alives
self.process_alives()
# Process xAAL msgs to send
self.process_tx_msg()
def start(self):
"""Start the core engine: send queue alive msg"""
if self.state in [EngineState.started,EngineState.running]:
return
self.network.connect()
for dev in self.devices:
self.send_alive(dev)
dev.update_alive()
self.state = EngineState.started
logger.critical("To be implemented start")
def stop(self):
self.state = EngineState.halted
logger.critical("To be implemented stop")
def shutdown(self):
self.stop()
logger.critical("To be implemented shutdown")
def run(self):
self.start()
self.state = EngineState.running
while self.state == EngineState.running:
self.loop()
logger.critical("To be implemented run")
def filter_msg_for_devices(msg, devices):
"""loop throught the devices, to find which are
expected w/ the msg
#####################################################
# Timer class
#####################################################
class Timer(object):
def __init__(self,func,period,counter):
self.func = func
self.period = period
self.counter = counter
self.deadline = time.time() + period
#####################################################
# Usefull functions to Engine developpers
#####################################################
def filter_msg_for_devices(msg, devices):
"""
loop throught the devices, to find which are expected w/ the msg
- Filter on dev_types for is_alive request.
- Filter on device address
"""
......@@ -353,25 +230,9 @@ def filter_msg_for_devices(msg, devices):
results.append(dev)
return results
def run_action(msg,device):
""" Extract an action & launch it
Return:
- action result
- None if no result
Note: If an exception raised, it's logged, and raise an XAALError.
"""
method,params = search_action(msg,device)
result = None
try:
result = method(**params)
except Exception as e:
logger.error(e)
raise XAALError("Error in method:%s params:%s" % (msg.action,params))
return result
def search_action(msg, device):
"""Extract an action (match with methods) from a msg on the device.
"""
Extract an action (match with methods) from a msg on the device.
Return:
- None
- found method & matching parameters
......@@ -409,9 +270,3 @@ def get_args_method(method):
return spec.args
class Timer(object):
def __init__(self,func,period,counter):
self.func = func
self.period = period
self.counter = counter
self.deadline = time.time() + period
......@@ -7,8 +7,8 @@ import logging
import logging.handlers
import os
import time
import signal
import coloredlogs
from decorator import decorator
from . import config,Engine
......@@ -20,24 +20,23 @@ def singleton(class_):
return instances[class_]
return getinstance
def timeit(method):
def timed(*args, **kw):
@decorator
def timeit(method,*args,**kwargs):
logger = logging.getLogger(__name__)
ts = time.time()
result = method(*args, **kw)
result = method(*args, **kwargs)
te = time.time()
logger.debug('%r (%r, %r) %2.6f sec' % (method.__name__, args, kw, te-ts))
logger.debug('%r (%r, %r) %2.6f sec' % (method.__name__, args, kwargs, te-ts))
return result
return timed
def set_console_title(value):
# set xterm title
print("\x1B]0;xAAL => %s\x07" % value )
print("\x1B]0;xAAL => %s\x07" % value,end='\r')
def setup_console_logger(level=config.log_level):
#fmt = '%(asctime)s %(hostname)s %(name)s:%(funcName)s %(levelname)s %(message)s'
fmt = '[%(name)s] %(funcName)s %(levelname)s: %(message)s'
fmt = '%(asctime)s %(name)-25s %(funcName)-18s %(levelname)-8s %(message)s'
#fmt = '[%(name)s] %(funcName)s %(levelname)s: %(message)s'
coloredlogs.install(level=level,fmt=fmt)
def setup_file_logger(name,level=config.log_level,filename = None):
......@@ -64,15 +63,18 @@ def run_package(pkg_name,pkg_setup,console_log = True,file_log=False):
setup_console_logger()
if file_log:
setup_file_logger(pkg_name)
eng = Engine()
logger = logging.getLogger(pkg_name)
logger.info('starting xaal package: %s'% pkg_name )
eng = Engine()
result = pkg_setup(eng)
if result != True:
logger.critical("something goes wrong with package: %s" % pkg_name)
try:
eng.run()
except KeyboardInterrupt:
eng.shutdown()
logger.info("exit")
def run_async_package(pkg_name,pkg_setup,console_log = True,file_log=False):
......@@ -81,23 +83,17 @@ def run_async_package(pkg_name,pkg_setup,console_log = True,file_log=False):
setup_console_logger()
if file_log:
setup_file_logger(pkg_name)
from .asyncio import AsyncEngine
from .aioengine import AsyncEngine
eng = AsyncEngine()
eng.start()
logger = logging.getLogger(pkg_name)
logger.info('starting xaal package: %s'% pkg_name )
result = pkg_setup(eng)
import asyncio
loop = asyncio.get_event_loop()
asyncio.ensure_future(eng.run())
if result != True:
logger.critical("something goes wrong with package: %s" % pkg_name)
for signame in ["SIGINT", "SIGTERM","SIGQUIT"]:
loop.add_signal_handler(getattr(signal, signame), loop.stop)
loop.run_forever()
logger.info("Exit")
try:
eng.run()
except KeyboardInterrupt:
eng.shutdown()
logger.info("Exit")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment