Skip to content
Snippets Groups Projects
Commit 8b688a41 authored by jkerdreu's avatar jkerdreu
Browse files

Added message filter according to 0.7rev2 spec

git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/branches/0.7@2761 b32b6428-25c9-4566-ad07-03861ab6144f
parent f9a89935
Branches
No related tags found
No related merge requests found
......@@ -110,7 +110,7 @@ class AsyncEngine(core.EngineMixin):
data = await self.network.get_data()
if data:
try:
msg = self.msg_factory.decode_msg(data)
msg = self.msg_factory.decode_msg(data,self.filter_func)
except MessageParserError as e:
logger.warning(e)
msg = None
......@@ -131,7 +131,6 @@ class AsyncEngine(core.EngineMixin):
"""
if not msg.is_request():
return
targets = core.filter_msg_for_devices(msg, self.devices)
for target in targets:
if msg.action == 'is_alive':
......
......@@ -18,7 +18,8 @@
# along with xAAL. If not, see <http://www.gnu.org/licenses/>.
#
from .messages import MessageType,MessageFactory
from xaal.lib.bindings import UUID
from .messages import MessageType,MessageFactory,ALIVE_ADDR
from .exceptions import *
import time
......@@ -29,19 +30,22 @@ logger = logging.getLogger(__name__)
class EngineMixin(object):
__slots__ = ['devices','timers','subscribers','__attributesChange','network','msg_factory']
__slots__ = ['devices','timers','subscribers','filter_func','_attributesChange','network','msg_factory']
def __init__(self,address,port,hops,key):
self.devices = [] # list of devices / use (un)register_devices()
self.timers = [] # functions to call periodic
self.subscribers = [] # message receive workflow
self.filter_func = None # message filter
self.__attributesChange = [] # list of XAALAttributes instances
self._attributesChange = [] # list of XAALAttributes instances
# network connector
self.network = None
# start msg worker
self.msg_factory = MessageFactory(key)
# filter function activated
self.enable_msg_filter()
#####################################################
# Devices management
......@@ -108,10 +112,37 @@ class EngineMixin(object):
"""Send a is_alive message, w/ dev_types filtering"""
body = {}
body['dev_types'] = dev_types
msg = self.msg_factory.build_msg(dev, [], MessageType.REQUEST, "is_alive", body)
msg = self.msg_factory.build_msg(dev, [ALIVE_ADDR,], MessageType.REQUEST, "is_alive", body)
self.queue_msg(msg)
#####################################################
# Messages filtering
#####################################################
def enable_msg_filter(self,func=None):
"""enable message filter"""
self.filter_func = func or self.default_msg_filter
def disable_msg_filter(self):
"""disable message filter"""
self.filter_func = None
def default_msg_filter(self,msg):
"""
Filter messages:
- check if message has alive request address
- check if the message is for us
return False, if message should be dropped
"""
# Alive request
if ALIVE_ADDR in msg.targets:
return True
# Managed device ?
for dev in self.devices:
if dev.address in msg.targets:
return True
return False
#####################################################
# Alive messages
#####################################################
......@@ -128,11 +159,11 @@ class EngineMixin(object):
#####################################################
def add_attributes_change(self, attr):
"""add a new attribute change to the list"""
self.__attributesChange.append(attr)
self._attributesChange.append(attr)
def get_attributes_change(self):
"""return the pending attributes changes list"""
return self.__attributesChange
return self._attributesChange
def process_attributes_change(self):
"""Processes (send notify) attributes changes for all devices"""
......@@ -145,7 +176,7 @@ class EngineMixin(object):
for dev in devices:
self.send_notification(dev,"attributes_change",devices[dev])
self.__attributesChange = [] # empty array
self._attributesChange = [] # empty array
#####################################################
# xAAL messages subscribers
......
......@@ -24,6 +24,7 @@ from .bindings import UUID
from .exceptions import MessageError,MessageParserError
from . import cbor
from enum import Enum
from tabulate import tabulate
import datetime
import pysodium
......@@ -33,7 +34,8 @@ import sys
import logging
logger = logging.getLogger(__name__)
from enum import Enum
ALIVE_ADDR=UUID('00000000-0000-0000-0000-000000000000')
class MessageFactory(object):
"""Message Factory:
......@@ -79,10 +81,12 @@ class MessageFactory(object):
pkt = cbor.dumps(result)
return pkt
def decode_msg(self, data):
def decode_msg(self, data,filter_func=None):
"""Decode incoming CBOR data and De-Ciphering
:param data: data received from the multicast bus
:type data: cbor
:filter_func: function to filter incoming messages
:type filter_func: function
:return: xAAL msg
:rtype: Message
"""
......@@ -103,6 +107,11 @@ class MessageFactory(object):
except KeyError:
raise MessageParserError("Bad Message, wrong fields")
# filter some messages
if filter_func is not None:
if not filter_func(msg):
return None # filter out the message
# Replay attack, window fixed to CIPHER_WINDOW in seconds
now = build_timestamp()[0] # test done only on seconds ...
if msg_time < (now - config.cipher_window):
......
......@@ -211,7 +211,10 @@ class Monitor:
self.devices = Devices()
self.filter = filter_func
self.subscribers = []
self.engine.subscribe(self.on_receive_msg)
# disable all engine filtering
self.engine.disable_msg_filter()
# only send isAlive message every 2 expirations
self.send_is_alive()
self.engine.add_timer(self.refresh_alives,REFRESH_TIMER)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment