Skip to content
Snippets Groups Projects
Commit bd93a486 authored by jkerdreu's avatar jkerdreu
Browse files

Added bunch of console stuff.

git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/branches/0.7@2731 b32b6428-25c9-4566-ad07-03861ab6144f
parent ca431390
No related branches found
No related tags found
No related merge requests found
......@@ -2,20 +2,19 @@ import asyncio
from asyncio.events import get_event_loop
from code import InteractiveConsole
from xaal.lib import Engine as CoreEngine
from xaal.lib.core import EngineState,search_method,filter_msg_for_devices
from xaal.lib.core import EngineState,search_action,filter_msg_for_devices
from xaal.lib import MessageFactory,MessageParserError
from xaal.lib import config
from .network import NetworkConnector
from xaal.lib.exceptions import *
import time
import inspect
import collections
from decorator import decorator
import asyncio
import aiomonitor
import aioconsole
from prettytable import PrettyTable
import logging
logger = logging.getLogger(__name__)
......@@ -26,10 +25,13 @@ def spawn(func,*args,**kwargs):
asyncio.get_event_loop().run_in_executor(None,func,*args,*kwargs)
async def console(locals=locals()):
import sys
sys.ps1 = '[xAAL] >>> '
def _factory(streams):
return aioconsole.AsynchronousConsole(locals=locals, streams=streams)
await aioconsole.start_interactive_server(host='localhost', port=8000,factory=_factory)
banner = '=' * 78 +"\nxAAL remote console\n" + '=' *78
await aioconsole.start_interactive_server(host='localhost', port=8000,factory=_factory,banner=banner)
class Engine(CoreEngine):
......@@ -56,9 +58,9 @@ class Engine(CoreEngine):
now = time.time()
for t in self.timers:
if t.deadline < now:
try:
try:
if asyncio.iscoroutinefunction(t.func):
await t.func()
self._loop.create_task(t.func())
else:
t.func()
except CallbackError as e:
......@@ -103,34 +105,20 @@ class Engine(CoreEngine):
"""Filter msg for devices according default xAAL API then process the
request for each targets identied in the engine
"""
if msg.is_request():
targets = filter_msg_for_devices(msg, self.devices)
if targets:
self.process_request(msg, targets)
def process_request(self, msg, targets):
"""Processes request by device and add related response
if reply necessary in the Tx fifo
if not msg.is_request():
return
targets = filter_msg_for_devices(msg, self.devices)
Note: xAAL attributes change are managed separately
"""
for target in targets:
if msg.action == 'is_alive':
self.send_alive(target)
else:
loop = self._loop
loop.create_task(self.handle_method_request(msg, target))
async def handle_method_request(self, msg, target):
"""Run method (xAAL exposed method) on device:
- None is returned if device method do not return anything
- result is returned if device method gives a response
- Errors are raised if an error occured:
* Internal error
* error returned on the xAAL bus
"""
loop.create_task(self.on_method(msg, target))
async def on_method(self, msg, target):
try:
result = await run_method(msg, target)
result = await run_action(msg, target)
if result != None:
self.send_reply(dev=target,targets=[msg.source],action=msg.action,body=result)
except CallbackError as e:
......@@ -138,7 +126,6 @@ class Engine(CoreEngine):
except XAALError as e:
logger.error(e)
async def loop(self):
logger.warning('Never call me')
......@@ -157,12 +144,12 @@ class Engine(CoreEngine):
self.send_alive(dev)
dev.update_alive()
async def pull_task(self):
async def receive_task(self):
await self.wait_running()
while self.state == EngineState.run:
await self.process_rx_msg()
async def push_task(self):
async def send_task(self):
await self.wait_running()
while self.state == EngineState.run:
temp = await self.__txFifo.get()
......@@ -176,15 +163,24 @@ class Engine(CoreEngine):
self.process_attributesChange()
def start(self):
self.state = EngineState.start
if not self._loop:
self._loop = asyncio.get_event_loop()
loop = self._loop
loop.set_debug(True)
#aiomonitor.start_monitor(loop=loop)
loop.create_task(self.boot_task(),name='Boot')
loop.create_task(self.pull_task(),name='Pull')
loop.create_task(self.push_task(),name='Push')
loop.create_task(self.receive_task(),name='RecvQ')
loop.create_task(self.send_task(),name='SendQ')
loop.create_task(self.timer_task(),name='Timers')
loop.create_task(console(locals()),name='Console')
self.setup_alives_timer()
def setup_alives_timer(self):
for k in self.timers:
# needed on stop-start sequence
if k.func == self.process_alives:
return
self.add_timer(self.process_alives,10)
def stop(self):
self.state = EngineState.halt
......@@ -198,11 +194,6 @@ class Engine(CoreEngine):
def all_tasks(self):
return list(asyncio.all_tasks(self._loop))
def dump_tasks(self):
for t in self.all_tasks():
print(f"{t.get_name()} \t {t.get_coro()} \t {t.get_loop()}")
def shutdown(self):
loop = self._loop
......@@ -212,13 +203,38 @@ class Engine(CoreEngine):
for task in self.all_tasks():
task.cancel()
loop.stop()
def dump_timers(self):
table = PrettyTable(["Func","Period","Counter","Remaining"])
now = time.time()
for t in self.timers:
remain = round(t.deadline-now,1)
table.add_row([str(t.func),t.period,t.counter,remain])
print(table)
def dump_tasks(self):
table = PrettyTable(["Name","Coro","Loop"])
for t in self.all_tasks():
table.add_row([t.get_name(),str(t.get_coro()),str(t.get_loop())])
print(table)
def dump_devices(self):
table = PrettyTable(["addr","dev_type","info"])
for d in self.devices:
table.add_row([d.address,d.dev_type,d.info])
print(table)
def dump(self):
self.dump_devices()
self.dump_tasks()
self.dump_timers()
async def run_method(msg,device):
method,params = search_method(msg,device)
async def run_action(msg,device):
method,params = search_action(msg,device)
result = None
try:
print(f"Running method {method}")
if asyncio.iscoroutinefunction(method):
result = await method(**params)
else:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment