Skip to content
Snippets Groups Projects
Commit 4600686a authored by jkerdreu's avatar jkerdreu
Browse files

Before merging..

git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/branches/0.7@2743 b32b6428-25c9-4566-ad07-03861ab6144f
parent 2f7df511
Branches
No related tags found
No related merge requests found
import asyncio
from asyncio.events import get_event_loop
from code import InteractiveConsole
from enum import Enum
from xaal.lib import Engine as CoreEngine
from xaal.lib.core import EngineState,search_action,filter_msg_for_devices
from xaal.lib import MessageFactory,MessageParserError
......@@ -13,25 +12,40 @@ from decorator import decorator
import asyncio
import aioconsole
import signal
import sys
from prettytable import PrettyTable
from pprint import pprint
import logging
logger = logging.getLogger(__name__)
@decorator
def spawn(func,*args,**kwargs):
asyncio.get_event_loop().run_in_executor(None,func,*args,*kwargs)
return asyncio.get_event_loop().run_in_executor(None,func,*args,**kwargs)
async def console(locals=locals()):
async def console(locals=locals(),port=9999):
logger.info(f'loading console on port {port}')
import sys
sys.ps1 = '[xAAL] >>> '
locals.update({'pprint':pprint})
def _factory(streams):
return aioconsole.AsynchronousConsole(locals=locals, streams=streams)
banner = '=' * 78 +"\nxAAL remote console\n" + '=' *78
await aioconsole.start_interactive_server(host='localhost', port=8000,factory=_factory,banner=banner)
try:
await aioconsole.start_interactive_server(host='localhost', port=port,factory=_factory,banner=banner)
except OSError:
logger.warning('Unable to run console')
class HookType(Enum):
start = 0
stop = 1
class Engine(CoreEngine):
......@@ -51,18 +65,49 @@ class Engine(CoreEngine):
self.msg_factory = MessageFactory(key)
self._loop = None
self._hooks = []
self._watchdog_task = None
self._kill_counter = 0
self.running_event = asyncio.Event()
self.watchdog_event = asyncio.Event()
signal.signal(signal.SIGTERM, self.sigkill_handler)
signal.signal(signal.SIGINT, self.sigkill_handler)
def on_start(self,func,*args,**kwargs):
hook = Hook(HookType.start,func,*args,**kwargs)
self._hooks.append(hook)
def on_stop(self,func,*args,**kwargs):
hook = Hook(HookType.stop,func,*args,**kwargs)
self._hooks.append(hook)
async def run_hooks(self,hook_type):
hooks = list(filter(lambda hook: hook.type==hook_type,self._hooks))
if len(hooks)!=0:
logger.debug(f"Launching {hook_type} hooks")
for h in hooks:
await run_func(h.func,*h.args,**h.kwargs)
def get_loop(self):
if self._loop == None:
logger.warning('New event loop')
self._loop = asyncio.get_event_loop()
return self._loop
def all_tasks(self):
return list(asyncio.all_tasks(self.get_loop()))
async def process_timers(self):
"""Process all timers to find out which ones should be run"""
expire_list = []
now = time.time()
for t in self.timers:
if t.deadline < now:
try:
if asyncio.iscoroutinefunction(t.func):
self._loop.create_task(t.func())
else:
t.func()
await run_func(t.func)
except CallbackError as e:
logger.error(e.description)
if t.counter != -1:
......@@ -95,11 +140,8 @@ class Engine(CoreEngine):
msg = await self.receive_msg()
if msg:
for func in self.subscribers:
if asyncio.iscoroutinefunction(func):
await func(msg)
else:
func(msg)
self.process_attributesChange()
await run_func(func,msg)
self.process_attributes_change()
def handle_request(self, msg):
"""Filter msg for devices according default xAAL API then process the
......@@ -107,16 +149,15 @@ class Engine(CoreEngine):
"""
if not msg.is_request():
return
targets = filter_msg_for_devices(msg, self.devices)
targets = filter_msg_for_devices(msg, self.devices)
for target in targets:
if msg.action == 'is_alive':
self.send_alive(target)
else:
loop = self._loop
loop.create_task(self.on_method(msg, target))
self.get_loop().create_task(self.handle_action_request(msg, target))
async def on_method(self, msg, target):
async def handle_action_request(self, msg, target):
try:
result = await run_action(msg, target)
if result != None:
......@@ -126,86 +167,105 @@ class Engine(CoreEngine):
except XAALError as e:
logger.error(e)
async def loop(self):
logger.warning('Never call me')
async def wait_running(self):
while 1:
if self.state == EngineState.run:
return
await asyncio.sleep(0.1)
async def boot_task(self):
if self.state == EngineState.run:
return
self.state = EngineState.run
await self.network.connect()
#####################################################
# Tasks
#####################################################
async def start_task(self):
self.watchdog_event.clear()
# queue the alive before anything
for dev in self.devices:
self.send_alive(dev)
dev.update_alive()
await self.network.connect()
self.running_event.set()
self.state = EngineState.run
await self.run_hooks(HookType.start)
async def receive_task(self):
await self.wait_running()
await self.running_event.wait()
while self.state == EngineState.run:
await self.process_rx_msg()
async def send_task(self):
await self.wait_running()
await self.running_event.wait()
while self.state == EngineState.run:
temp = await self.__txFifo.get()
self.send_msg(temp)
async def timer_task(self):
await self.wait_running()
await self.running_event.wait()
self.setup_alives_timer()
while self.state == EngineState.run:
await asyncio.sleep(0.2)
await self.process_timers()
self.process_attributesChange()
self.process_attributes_change()
async def watchdog_task(self):
await self.watchdog_event.wait()
await self.stop()
logger.info('Exit')
#####################################################
# start / stop / shutdown
#####################################################
def start(self):
if self.state == EngineState.run:
logger.warning('Engine already started')
return
self.state = EngineState.start
if not self._loop:
self._loop = asyncio.get_event_loop()
loop = self._loop
loop.set_debug(True)
loop.create_task(self.boot_task(),name='Boot')
loop = self.get_loop()
#loop.set_debug(True)
loop.create_task(self.start_task(),name='Boot')
loop.create_task(self.receive_task(),name='RecvQ')
loop.create_task(self.send_task(),name='SendQ')
loop.create_task(self.timer_task(),name='Timers')
loop.create_task(console(locals()),name='Console')
self.setup_alives_timer()
def new_task(self,coro,name=None):
return self.get_loop().create_task(coro,name=name)
def setup_alives_timer(self):
for k in self.timers:
# needed on stop-start sequence
if k.func == self.process_alives:
if self.process_alives in [t.func for t in self.timers]:
return
self.add_timer(self.process_alives,10)
def stop(self):
async def stop(self):
await self.run_hooks(HookType.stop)
self.running_event.clear()
self.state = EngineState.halt
# cancel all tasks
for task in self.all_tasks():
if task!=self._watchdog_task:
task.cancel()
def run(self):
if self.state == EngineState.halt:
self.start()
self._loop.run_forever()
def all_tasks(self):
return list(asyncio.all_tasks(self._loop))
def sigkill_handler(self,signal,frame):
self._kill_counter +=1
self.shutdown()
if self._kill_counter > 1:
logger.warning('Force quit')
sys.exit(-1)
else:
print("", end = "\r") #remove the uggly ^C
logger.warning('Kill requested')
def shutdown(self):
loop = self._loop
logger.info('Shutdown in progress')
if not loop.is_running():
return
for task in self.all_tasks():
task.cancel()
loop.stop()
self.watchdog_event.set()
def run(self):
if self.state == EngineState.halt:
self.start()
# start the watchdog task
loop = self.get_loop()
self._watchdog_task=loop.create_task(self.watchdog_task(),name='Watchdog task')
loop.run_until_complete(self._watchdog_task)
#loop.run_forever()
#####################################################
# Debugging tools
#####################################################
def dump_timers(self):
table = PrettyTable(["Func","Period","Counter","Remaining"])
table = PrettyTable(["Func","Period","Counter","Remaining"],align='l')
now = time.time()
for t in self.timers:
remain = round(t.deadline-now,1)
......@@ -213,28 +273,49 @@ class Engine(CoreEngine):
print(table)
def dump_tasks(self):
table = PrettyTable(["Name","Coro","Loop"])
table = PrettyTable(["Name","Coro","Loop ID","Loop"],align='l')
for t in self.all_tasks():
table.add_row([t.get_name(),str(t.get_coro()),str(t.get_loop())])
table.add_row([t.get_name(),str(t.get_coro()),id(t.get_loop()), str(t.get_loop()) ])
print(table)
def dump_devices(self):
table = PrettyTable(["addr","dev_type","info"])
table = PrettyTable(["addr","dev_type","info"],align='l')
for d in self.devices:
table.add_row([d.address,d.dev_type,d.info])
print(table)
def dump_hooks(self):
table = PrettyTable(["Type","Hook"],align='l')
for h in self._hooks:
table.add_row([h.type,str(h.func)])
print(table)
def dump(self):
self.dump_devices()
self.dump_tasks()
self.dump_timers()
self.dump_hooks()
async def run_func(func,*args,**kwargs):
if asyncio.iscoroutinefunction(func):
return await func(*args,**kwargs)
else:
return func(*args,**kwargs)
async def run_action(msg,device):
""" Extract an action & launch it
Return:
- action result
- None if no result
Note: If an exception raised, it's logged, and raise an XAALError.
"""
method,params = search_action(msg,device)
result = None
try:
print(f"Running method {method}")
#print(f"Running method {method}")
if asyncio.iscoroutinefunction(method):
result = await method(**params)
else:
......@@ -243,3 +324,12 @@ async def run_action(msg,device):
logger.error(e)
raise XAALError("Error in method:%s params:%s" % (msg.action,params))
return result
class Hook(object):
def __init__(self,type_,func,*args,**kwargs):
self.type = type_
self.func = func
self.args = args
self.kwargs = kwargs
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment