Skip to content
Snippets Groups Projects
Commit 2b43f647 authored by jkerdreu's avatar jkerdreu
Browse files

Add receive / send queue time to limit rate

git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/trunk@2150 b32b6428-25c9-4566-ad07-03861ab6144f
parent 3be8d600
No related branches found
No related tags found
No related merge requests found
......@@ -13,6 +13,7 @@ DEF_PORT = 1235 # mcast port
DEF_HOPS = 10 # mcast hop
DEF_ALIVE_TIMER = 100 # Time between two alive msg
DEF_CIPHER_WINDOW = 60 * 2 # Time Window in seconds to avoid replay attacks
DEF_QUEUE_SIZE = 10 # How many packet we can send/receive in one loop
DEF_LOG_LEVEL = 'DEBUG' # should be INFO|DEBUG|None
DEF_LOG_PATH = '/var/log/xaal' # where log are
......@@ -39,6 +40,7 @@ def load_config(name='xaal.ini'):
self.hops = int(cfg.get('hops',DEF_HOPS))
self.alive_timer = int(cfg.get('alive_timer',DEF_ALIVE_TIMER))
self.cipher_window = int(cfg.get('ciper_window',DEF_CIPHER_WINDOW))
self.queue_size = int(cfg.get('queue_size',DEF_QUEUE_SIZE))
self.log_level = cfg.get('log_level',DEF_LOG_LEVEL)
self.log_path = cfg.get('log_path',DEF_LOG_PATH)
key = cfg.get('key',None)
......
......@@ -82,9 +82,14 @@ class Engine(object):
def process_tx_msg(self):
""" Process (send) message in tx queue called from the loop()"""
cnt = 0
while self.__txFifo:
temp = self.__txFifo.popleft()
self.send_msg(temp)
# try to limit rate
cnt = cnt + 1
if cnt > config.queue_size: break
def send_msg(self, msg):
"""Send an message to the bus, use queue_msg instead"""
......@@ -160,9 +165,6 @@ class Engine(object):
if attr.device not in devices.keys():
devices[attr.device] = {}
devices[attr.device][attr.name] = attr.value
else:
# ToDo: test if name keys still exist???
devices[attr.device][attr.name] = attr.value
for dev in devices:
self.send_notification(dev,"attributesChange",devices[dev])
......@@ -190,12 +192,29 @@ class Engine(object):
def remove_rx_hanlder(self,func):
self.rx_handlers.remove(func)
def process_rx_msg(self):
# ======== DEPRECATED ===================
def _process_rx_msg(self):
"""process incomming messages"""
msg = self.receive_msg()
if msg:
for func in self.rx_handlers:
func(msg)
# =========================================
def process_rx_msg(self):
"""process incomming messages"""
cnt = 0
msg = self.receive_msg()
while msg:
for func in self.rx_handlers:
func(msg)
self.process_attributesChange()
# only flush some message, to limit rate
cnt = cnt + 1
if cnt > config.queue_size:
time.sleep(0.02)
break
msg = self.receive_msg()
def handle_request(self, msg):
"""Filter msg for devices according default xAAL API then process the
......@@ -288,10 +307,10 @@ class Engine(object):
"""
# Process xAAL msg received, filter msg and process request
self.process_rx_msg()
# Process attributes change for devices
self.process_attributesChange()
# Process timers
self.process_timers()
# Process attributes change for devices due to timers
self.process_attributesChange()
# Process Alives
self.process_alives()
# Process xAAL msgs to send
......
......@@ -81,7 +81,7 @@ class NetworkConnector(object):
return packt
def __get_data(self):
r = select.select([self.__sock, ], [], [], 0.03)
r = select.select([self.__sock, ], [], [], 0.02)
if r[0]:
return self.receive()
return None
......
......@@ -27,7 +27,7 @@ class Device:
self.last_alive = int(time.time())
self.next_alive = 0
# Refresh rate
self.refresh = now() - REFRESH_RATE + random.randint(0,20)
self.refresh = 0
self.refresh_attributes = 0
self.refresh_description = 0
self.refresh_db = 0
......@@ -199,13 +199,12 @@ class Monitor:
self.engine.add_rx_handler(self.parse_msg)
self.engine.add_timer(self.auto_wash, 10)
self.engine.add_timer(self.send_isalive, 240)
self.engine.add_timer(self.refresh_devices, 2)
self.engine.add_timer(self.refresh_devices, 5)
def parse_msg(self, msg):
# do nothing for some msg
if (self.filter!=None) and self.filter(msg)==False:
return
if msg.source not in self.devices:
dev = self.add_device(msg)
self.notify(Notification.new_device,dev)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment