Skip to content
Snippets Groups Projects
Commit 7dc04e6a authored by jkerdreu's avatar jkerdreu
Browse files

Added task



git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/branches/0.7@2728 b32b6428-25c9-4566-ad07-03861ab6144f
parent b54a3e25
Branches
No related tags found
No related merge requests found
import asyncio
from asyncio.events import get_event_loop
from code import InteractiveConsole
from xaal.lib import Engine as CoreEngine
from xaal.lib.core import EngineState
from xaal.lib.core import EngineState,search_method,filter_msg_for_devices
from xaal.lib import MessageFactory,MessageParserError
from xaal.lib import config
from .network import NetworkConnector
......@@ -13,10 +15,22 @@ from decorator import decorator
import asyncio
import aiomonitor
import aioconsole
import logging
logger = logging.getLogger(__name__)
@decorator
def spawn(func,*args,**kwargs):
asyncio.get_event_loop().run_in_executor(None,func,*args,*kwargs)
async def console(locals=locals()):
def _factory(streams):
return aioconsole.AsynchronousConsole(locals=locals, streams=streams)
await aioconsole.start_interactive_server(host='localhost', port=8000,factory=_factory)
class Engine(CoreEngine):
def __init__(self,address=config.address,port=config.port,hops=config.hops,key=config.key):
......@@ -34,6 +48,8 @@ class Engine(CoreEngine):
# start msg worker
self.msg_factory = MessageFactory(key)
self._loop = None
async def process_timers(self):
expire_list = []
......@@ -83,15 +99,48 @@ class Engine(CoreEngine):
func(msg)
self.process_attributesChange()
def handle_request(self, msg):
"""Filter msg for devices according default xAAL API then process the
request for each targets identied in the engine
"""
if msg.is_request():
targets = filter_msg_for_devices(msg, self.devices)
if targets:
self.process_request(msg, targets)
def process_request(self, msg, targets):
"""Processes request by device and add related response
if reply necessary in the Tx fifo
Note: xAAL attributes change are managed separately
"""
for target in targets:
if msg.action == 'is_alive':
self.send_alive(target)
else:
loop = self._loop
loop.create_task(self.handle_method_request(msg, target))
async def handle_method_request(self, msg, target):
"""Run method (xAAL exposed method) on device:
- None is returned if device method do not return anything
- result is returned if device method gives a response
- Errors are raised if an error occured:
* Internal error
* error returned on the xAAL bus
"""
try:
result = await run_method(msg, target)
if result != None:
self.send_reply(dev=target,targets=[msg.source],action=msg.action,body=result)
except CallbackError as e:
self.send_error(target, e.code, e.description)
except XAALError as e:
logger.error(e)
async def loop(self):
logger.warning('Never call me')
#await self.process_rx_msg()
# Process attributes change for devices due to timers
#self.process_attributesChange()
# Process Alives
#self.process_alives()
# Process xAAL msgs to send
#self.process_tx_msg()
async def wait_running(self):
while 1:
......@@ -126,11 +175,12 @@ class Engine(CoreEngine):
await self.process_timers()
self.process_attributesChange()
def start(self,loop=None):
if not loop:
loop = asyncio.get_event_loop()
def start(self):
if not self._loop:
self._loop = asyncio.get_event_loop()
loop = self._loop
loop.set_debug(True)
aiomonitor.start_monitor(loop=loop)
#aiomonitor.start_monitor(loop=loop)
loop.create_task(self.boot_task(),name='Boot')
loop.create_task(self.pull_task(),name='Pull')
loop.create_task(self.push_task(),name='Push')
......@@ -138,13 +188,42 @@ class Engine(CoreEngine):
def stop(self):
self.state = EngineState.halt
for task in self.all_tasks():
task.cancel()
def run(self,loop=None):
def run(self):
if self.state == EngineState.halt:
self.start(loop)
loop = asyncio.get_event_loop()
loop.run_forever()
self.start()
self._loop.run_forever()
@decorator
def spawn(func,*args,**kwargs):
asyncio.get_event_loop().run_in_executor(None,func,*args,*kwargs)
def all_tasks(self):
return list(asyncio.all_tasks(self._loop))
def dump_tasks(self):
for t in self.all_tasks():
print(f"{t.get_name()} \t {t.get_coro()} \t {t.get_loop()}")
def shutdown(self):
loop = self._loop
logger.info('Shutdown in progress')
if not loop.is_running():
return
for task in self.all_tasks():
task.cancel()
loop.stop()
async def run_method(msg,device):
method,params = search_method(msg,device)
result = None
try:
if asyncio.iscoroutinefunction(method):
result = await method(**params)
else:
result = method(**params)
except Exception as e:
logger.error(e)
raise XAALError("Error in method:%s params:%s" % (msg.action,params))
return result
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment