Skip to content
Snippets Groups Projects
Commit e80270da authored by ptangu01's avatar ptangu01
Browse files

mv receiveMSG to processRxMSG in order to add the mainHandler method which...

mv receiveMSG to processRxMSG in order to add the mainHandler method which will be used to add customize message handlers.

git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/trunk@1347 b32b6428-25c9-4566-ad07-03861ab6144f
parent 2e598872
Branches
Tags
No related merge requests found
......@@ -35,7 +35,7 @@ import logging
logger=logging.getLogger(__name__)
class Engine:
class Engine(object):
def __init__(self,address=config.ADDR,port=config.PORT,hops=config.HOPS,passphrase=config.PASSPHRASE):
cipherKey=tools.pass2key(passphrase)
......@@ -47,6 +47,9 @@ class Engine:
self.__attributesChange = [] # list of XAALAttributes instances
self.__txFifo = deque() # tx msg fifo
self.__alives = [] # list of alive devices
self.__tasks = []
self._periodicCallbacks = []
self._callbacks = []
#####################################################
# Attributes
......@@ -80,26 +83,27 @@ class Engine:
self.delAlive(dev)
# Fifo for msg to send
def queue(self,msg):
def queueTx(self,msg):
self.__txFifo.append(msg)
#####################################################
# xAAL messages Rx/Tx handling
#####################################################
def receiveMSG(self):
result = None
data = self.__nc.getData()
msg = None
if data:
try:
msg = self.__mf.decodeMSG(data)
except MessageFactoryParserError as e:
logger.error(e.message)
return msg
result = msg
return result
def processRxMsg(self,msg):
# Default
if msg.getMsgtype() == 'request':
self.handleRequest(msg)
def processRxMsg(self):
msg = self.receiveMSG()
if msg:
self.mainHandler(msg)
def processTxMsg(self):
if self.__txFifo:
......@@ -111,20 +115,20 @@ class Engine:
def sendReply(self,dev,targets,action,body=None):
msg = self.__mf.buildMSG(dev,targets,'reply',action,body)
self.queue(msg)
self.queueTx(msg)
def sendError(self,dev,errcode,description=None):
msg=self.__mf.buildErrorMSG(dev,errcode,description)
self.queue(msg)
self.queueTx(msg)
#####################################################
# Alive messages
#####################################################
def delAlive(self,dev):
"""remove dev from the alives list of tuples (dev,alives)
:param dev: instance of xAAL device
:type dev: Device"""
:type dev: Device
"""
for i, (d, t) in enumerate(self.__alives):
if d == dev:
del self.__alives[i]
......@@ -133,7 +137,8 @@ class Engine:
def upAlive(self,dev):
"""update alive for device dev from the alives list of tuples (dev,aliveTimeout)
:param dev: instance of xAAL device
:type dev: Device"""
:type dev: Device
"""
for i, (d, t) in enumerate(self.__alives):
if d == dev:
nextAlive = time.time() + dev.getAlivePeriod()
......@@ -141,20 +146,20 @@ class Engine:
break
def sendAlive(self,dev):
""" send a Alive message for a given device """
"""Send a Alive message for a given device"""
timeout = dev.getTimeout()
msg = self.__mf.buildAlivefor(dev,timeout)
self.queue(msg)
self.queueTx(msg)
def sendIsAlive(self,dev,devTypes):
""" send a isAlive message, w/ devTypes filtering"""
"""Send a isAlive message, w/ devTypes filtering"""
body={}
body['devTypes'] = devTypes
msg=self.__mf.buildMSG(dev,[],"request","isAlive",body)
self.queue(msg)
self.queueTx(msg)
def processAlives(self):
""" periodic sending alive messages """
"""Periodic sending alive messages"""
now = time.time()
for i,(dev,t) in enumerate(self.__alives):
if t < now:
......@@ -185,25 +190,28 @@ class Engine:
for dev in devices:
msg = self.__mf.buildMSG(dev,[],"notify","attributesChange",devices[dev])
self.queue(msg)
self.queueTx(msg)
self.__attributesChange = [] # empty array
#####################################################
# Request handling
# xAAL messages handlers
#####################################################
def mainHandler(self,msg):
#default
self.handleRequest(msg)
def handleRequest(self,msg):
"""
filter msg for devices according default xAAL API then process the
"""Filter msg for devices according default xAAL API then process the
request for each targets identied in the engine
"""
if msg.getMsgtype() == 'request':
targets = filterMsgForDevices(msg,self.getDevices())
if targets:
self.processRequest(msg,targets)
def processRequest(self,msg,targets):
"""
Processes request by device and add related response
"""Processes request by device and add related response
if reply necessary in the Tx fifo
Note: xAAL attributes change are managed separately
......@@ -215,8 +223,7 @@ class Engine:
self.handleMethodRequest(msg,target)
def handleMethodRequest(self,msg,target):
"""
Run method (xAAL exposed method) on device:
"""Run method (xAAL exposed method) on device:
- None is returned if device method do not return anything
- result is returned if device method gives a response
- Errors are raised if an error occured:
......@@ -242,29 +249,65 @@ class Engine:
def getCipherKey(self):
return self.getMessageFactory().getCipherKey()
#####################################################
# Callback
#####################################################
def addPeriodicCallback(self,callback,period):
_next = time.time()+period
self._periodicCallbacks.append((callback,period,_next))
def processPeriodicCallbacks(self):
if self._periodicCallbacks:
now = time.time()
for i,(callback,p,t) in enumerate(self._periodicCallbacks):
if t < now:
try:
callback()
except CallbackError as e:
logger.error(e.message)
_next = now + p
self.__callbacks[i]=(callback,p,_next)
def addCallback(self,callback):
self._callbacks.append(callback)
def processCallbacks(self):
if self._callbacks:
for callback in self._callbacks:
try:
callback()
except CallbackError as e:
logger.error(e.message)
#####################################################
# Mainloops & run ..
#####################################################
def loop(self):
"""
Process incomming xAAL msg
"""Process incomming xAAL msg
Process attributes change for device
Process periodic callback
Process callback
Process isAlive for device
Send msg from the Tx Buffer
"""
msg = self.receiveMSG()
# Process xAAL msg received, filter msg and process request
if msg:
self.processRxMsg(msg)
self.processRxMsg()
# Process attributes change for devices
self.processAttributesChange()
# Manage devices alive
# Process Periodic Callbacks
self.processPeriodicCallbacks()
# Process Callbacks
self.processCallbacks()
# Process Alives
self.processAlives()
# Send messages
# Process xAAL msg to send
self.processTxMsg()
def start(self):
......@@ -285,8 +328,7 @@ class Engine:
def filterMsgForDevices(msg,devices):
"""
loop throught the devices, to find which are
"""loop throught the devices, to find which are
expected w/ the msg
- Filter on devTypes for isAlive request.
......@@ -315,8 +357,7 @@ def filterMsgForDevices(msg,devices):
return results
def runAction(msg,device):
"""
Extract & run an action (match with exposed method) from a msg on the selected device.
"""Extract & run an action (match with exposed method) from a msg on the selected device.
Return:
- None
- result from method if method return something
......@@ -351,3 +392,11 @@ def getArgumentsForMethod(method):
pass
return args.args
class CallbackError(Exception):
def __init__(self,code,desc):
self.code = code
self.description = desc
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment