Skip to content
Snippets Groups Projects
Commit 6026c2e9 authored by jkerdreu's avatar jkerdreu
Browse files

- Added slots

- Added comments
- Clean some code


git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/branches/0.7@2755 b32b6428-25c9-4566-ad07-03861ab6144f
parent 75fa932e
No related branches found
No related tags found
No related merge requests found
......@@ -8,7 +8,6 @@ from .aionetwork import AsyncNetworkConnector
from .exceptions import *
import time
from decorator import decorator
import asyncio
import aioconsole
......@@ -21,55 +20,23 @@ from pprint import pprint
import logging
logger = logging.getLogger(__name__)
import socketserver
def find_free_port():
with socketserver.TCPServer(("localhost", 0), None) as s:
return s.server_address[1]
@decorator
def spawn(func,*args,**kwargs):
return asyncio.get_event_loop().run_in_executor(None,func,*args,**kwargs)
async def console(locals=locals(),port=None):
if port == None:
port = find_free_port()
logger.info(f'loading console on port {port}')
import sys
sys.ps1 = '[xAAL] >>> '
locals.update({'pprint':pprint})
def _factory(streams):
return aioconsole.AsynchronousConsole(locals=locals, streams=streams)
banner = '=' * 78 +"\nxAAL remote console\n" + '=' *78
try:
await aioconsole.start_interactive_server(host='localhost', port=port,factory=_factory,banner=banner)
except OSError:
logger.warning('Unable to run console')
class HookType(Enum):
start = 0
stop = 1
class AsyncEngine(core.EngineMixin):
__slots__ = ['__txFifo','_loop','_hooks','_watchdog_task','_kill_counter','running_event','watchdog_event','started_event']
def __init__(self,address=config.address,port=config.port,hops=config.hops,key=config.key):
core.EngineMixin.__init__(self,address,port,hops,key)
self.subscribers =[self.handle_request] # message receive workflow
self.__txFifo = asyncio.Queue() # tx msg fifo
self._loop = None # event loop
self._hooks = [] # hooks
self._watchdog_task = None # watchdog task
self._kill_counter = 0 # watchdog counter
self._loop = None
self._hooks = []
self._watchdog_task = None
self._kill_counter = 0
self.running_event = asyncio.Event()
self.watchdog_event = asyncio.Event()
self.started_event = asyncio.Event()
self.running_event = asyncio.Event() # engine running event
self.watchdog_event = asyncio.Event() # watchdog event
self.started_event = asyncio.Event() # engine started event
signal.signal(signal.SIGTERM, self.sigkill_handler)
signal.signal(signal.SIGINT, self.sigkill_handler)
......@@ -77,8 +44,9 @@ class AsyncEngine(core.EngineMixin):
# start network
self.network = AsyncNetworkConnector(address, port, hops)
#####################################################
# Hooks
#####################################################
def on_start(self,func,*args,**kwargs):
hook = Hook(HookType.start,func,*args,**kwargs)
self._hooks.append(hook)
......@@ -106,6 +74,9 @@ class AsyncEngine(core.EngineMixin):
def all_tasks(self):
return list(asyncio.all_tasks(self.get_loop()))
#####################################################
# timers
#####################################################
async def process_timers(self):
"""Process all timers to find out which ones should be run"""
expire_list = []
......@@ -125,6 +96,17 @@ class AsyncEngine(core.EngineMixin):
for t in expire_list:
self.remove_timer(t)
#####################################################
# msg send / receive
#####################################################
def queue_msg(self, msg):
"""queue a message"""
self.__txFifo.put_nowait(msg)
def send_msg(self, msg):
"""Send an encoded message to the bus, use queue_msg instead"""
self.network.send(msg)
async def receive_msg(self):
"""return new received message or None"""
data = await self.network.get_data()
......@@ -137,14 +119,6 @@ class AsyncEngine(core.EngineMixin):
return msg
return None
def queue_msg(self, msg):
"""queue a message"""
self.__txFifo.put_nowait(msg)
def send_msg(self, msg):
"""Send an encoded message to the bus, use queue_msg instead"""
self.network.send(msg)
async def process_rx_msg(self):
"""process incomming messages"""
msg = await self.receive_msg()
......@@ -165,7 +139,7 @@ class AsyncEngine(core.EngineMixin):
if msg.action == 'is_alive':
self.send_alive(target)
else:
self.get_loop().create_task(self.handle_action_request(msg, target))
self.new_task(self.handle_action_request(msg, target))
async def handle_action_request(self, msg, target):
try:
......@@ -240,6 +214,7 @@ class AsyncEngine(core.EngineMixin):
self.add_timer(self.process_alives,10)
async def stop(self):
logger.info('Stopping engine')
await self.run_hooks(HookType.stop)
self.running_event.clear()
self.started_event.clear()
......@@ -249,13 +224,16 @@ class AsyncEngine(core.EngineMixin):
task.cancel()
def sigkill_handler(self,signal,frame):
print("", end = "\r") #remove the uggly ^C
if not self.is_running():
logger.warning('Engine already stopped')
self._kill_counter = 1
self._kill_counter +=1
self.shutdown()
if self._kill_counter > 1:
logger.warning('Force quit')
sys.exit(-1)
else:
print("", end = "\r") #remove the uggly ^C
logger.warning('Kill requested')
def shutdown(self):
......@@ -269,7 +247,6 @@ class AsyncEngine(core.EngineMixin):
loop = self.get_loop()
self._watchdog_task=loop.create_task(self.watchdog_task(),name='Watchdog task')
loop.run_until_complete(self._watchdog_task)
#loop.run_forever()
else:
logger.warning('Engine already running')
......@@ -306,7 +283,7 @@ class AsyncEngine(core.EngineMixin):
termtables.print(rows,header=header)
def dump_hooks(self):
header = ["N","Type","Hook"]
header = ["Type","Hook"]
rows = []
for h in self._hooks:
rows.append([h.type,str(h.func)])
......@@ -329,7 +306,11 @@ class AsyncEngine(core.EngineMixin):
return None
#####################################################
# Utilities functions
#####################################################
async def run_func(func,*args,**kwargs):
"""run a function or a coroutine function """
if asyncio.iscoroutinefunction(func):
return await func(*args,**kwargs)
else:
......@@ -337,17 +318,19 @@ async def run_func(func,*args,**kwargs):
async def run_action(msg,device):
""" Extract an action & launch it
"""
Extract an action & launch it
Return:
- action result
- None if no result
Note: If an exception raised, it's logged, and raise an XAALError.
Notes:
- If an exception raised, it's logged, and raise an XAALError.
- Same API as legacy Engine, but accept coroutine functions
"""
method,params = core.search_action(msg,device)
result = None
try:
#print(f"Running method {method}")
if asyncio.iscoroutinefunction(method):
result = await method(**params)
else:
......@@ -358,10 +341,44 @@ async def run_action(msg,device):
return result
#####################################################
# Hooks
#####################################################
class HookType(Enum):
start = 0
stop = 1
class Hook(object):
__slots__ = ['type','func','args','kwargs']
def __init__(self,type_,func,*args,**kwargs):
self.type = type_
self.func = func
self.args = args
self.kwargs = kwargs
#####################################################
# Debugging console
#####################################################
async def console(locals=locals(),port=None):
"""launch a console to enable remote engine inspection"""
if port == None:
# let's find a free port if not specified
def find_free_port():
import socketserver
with socketserver.TCPServer(("localhost", 0), None) as s:
return s.server_address[1]
port = find_free_port()
logger.warning(f'starting debug console on port {port}')
sys.ps1 = '[xAAL] >>> '
banner = '=' * 78 +"\nxAAL remote console\n" + '=' *78
locals.update({'pprint':pprint})
def factory(streams):
return aioconsole.AsynchronousConsole(locals=locals, streams=streams)
# start the console
try:
await aioconsole.start_interactive_server(host='localhost', port=port,factory=factory,banner=banner)
except OSError:
logger.warning('Unable to run console')
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment