Skip to content
Snippets Groups Projects
Commit 6f7b0ce4 authored by jkerdreu's avatar jkerdreu
Browse files

Better metadb server handling, but still have issue w/ timeout

git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/branches/0.7@2808 b32b6428-25c9-4566-ad07-03861ab6144f
parent 87a21435
No related branches found
No related tags found
No related merge requests found
......@@ -5,6 +5,7 @@ If you're looking for simples examples, please check the legacy tools instead.
"""
import asyncio
import imp
from xaal.lib import AsyncEngine, Device,tools,helpers,config
import functools
import sys
......@@ -140,10 +141,13 @@ class ToolboxHelper(object):
# idle detector / force exit
self.exit_event = asyncio.Event()
self.last_msg_time = 0
# display time
self.start_time = 0
# db server
self.db_server = None
self.db_server_found = asyncio.Event()
# Let's start
self.setup_name()
......@@ -192,6 +196,9 @@ class ToolboxHelper(object):
dev = self.setup_device()
return (eng,dev)
#####################################################
# command line parsing
#####################################################
def setup_msg_parser(self):
self.engine.subscribe(self.parse_msg)
......@@ -199,6 +206,27 @@ class ToolboxHelper(object):
self.options, self.args = self.parser.parse_args()
return self.options,self.args
def get_filter_address(self):
addr = None
value = self.options.filter_address
if value:
addr = tools.get_uuid(value)
if addr == None:
self.error(f"Invalid address: {value}")
return addr
def get_filter_devtype(self):
dev_type = 'any.any'
value = self.options.filter_type
if value:
if not tools.is_valid_dev_type(value):
self.error("Invalid device type: %s" % value)
dev_type = value
return dev_type
#####################################################
# devices
#####################################################
def add_device(self,dev):
if dev.address:
if self.get_device(dev.address):
......@@ -220,6 +248,8 @@ class ToolboxHelper(object):
target.address = msg.source
target.dev_type = msg.dev_type
self.add_device(target)
if target.dev_type==None:
target.dev_type = msg.dev_type
if msg.is_get_attribute_reply():
target.attributes = msg.body
elif msg.is_get_description_reply():
......@@ -228,14 +258,44 @@ class ToolboxHelper(object):
target.alive = True
return target
def get_info(self,addr):
self.engine.send_get_description(self.device,[addr,])
self.engine.send_get_attributes(self.device,[addr,])
#####################################################
# db server
#####################################################
def find_db_callback(self,msg):
if not match_dev_type(msg,db_type):return
# new db server found
if msg.is_alive():
self.db_server = msg.source
self.db_server_found.set()
async def find_db_server(self):
self.engine.subscribe(self.find_db_callback)
self.engine.send_is_alive(self.device,dev_types=[db_type,])
try:
await asyncio.wait_for(self.db_server_found.wait(),timeout=0.3)
except asyncio.exceptions.TimeoutError:
print("No db server found")
else:
print("db server found %s" % self.db_server)
self.engine.unsubscribe(self.find_db_callback)
def query_db(self,addr):
self.engine.send_request(self.device,[self.db_server,],"get_keys_values",{'device':addr})
#####################################################
# start/stop/idle
#####################################################
def engine_wait(self,timeout=3):
""" run the engine until timeout """
self.engine.add_timer(self.quit,timeout)
self.engine.run()
#####################################################
# idle detection
#####################################################
def idle_callback(self,msg):
self.last_msg_time = now()
......@@ -261,24 +321,6 @@ class ToolboxHelper(object):
self.parser.print_help()
exit(1)
def get_filter_address(self):
addr = None
value = self.options.filter_address
if value:
addr = tools.get_uuid(value)
if addr == None:
self.error(f"Invalid address: {value}")
return addr
def get_filter_devtype(self):
dev_type = 'any.any'
value = self.options.filter_type
if value:
if not tools.is_valid_dev_type(value):
self.error("Invalid device type: %s" % value)
dev_type = value
return dev_type
def colorize(color,text):
return f"{color}{text}{style.RESET}"
......@@ -400,10 +442,7 @@ def info():
async def query_db_callback(msg):
if not match_dev_type(msg,db_type):return
if msg.is_alive():
eng.send_request(dev,[msg.source,],"get_keys_values",{'device':target})
# db server reply
elif msg.is_reply() and dev.address in msg.targets:
if msg.is_reply() and dev.address in msg.targets:
found = msg.body.get('device',None)
found_map = msg.body.get('map',None)
tmp = helper.get_device(found)
......@@ -412,20 +451,21 @@ def info():
tmp.address = found
helper.add_device(tmp)
tmp.db = found_map
print(tmp.db)
print("data: found")
display_quit(tmp)
async def run():
eng.send_is_alive(dev,dev_types=[db_type,])
await helper.find_db_server()
if helper.db_server:
helper.query_db(target)
eng.subscribe(query_db_callback)
eng.send_is_alive(dev,[target,])
eng.send_get_description(dev,[target,])
eng.send_get_attributes(dev,[target,])
helper.get_info(target)
# wait for device to complete
await helper.exit_event.wait()
helper.quit()
eng.subscribe(info_callback)
eng.subscribe(query_db_callback)
eng.on_start(run)
helper.engine_wait()
......@@ -448,16 +488,33 @@ def walker():
target = helper.parse_msg(msg)
if msg.is_alive():
if not target.ready():
target.db = {}
eng.send_get_description(target,[msg.source,])
eng.send_get_attributes(target,[msg.source,])
#target.db = {}
helper.get_info(msg.source)
helper.query_db(msg.source)
if target.ready():
target.display(color)
def start():
eng.send_is_alive(dev,dev_types=dev_type)
async def query_db_callback(msg):
if not match_dev_type(msg,db_type):return
if msg.is_reply() and dev.address in msg.targets:
found = msg.body.get('device',None)
found_map = msg.body.get('map',None)
tmp = helper.get_device(found)
if not tmp:
tmp = DeviceInfo()
tmp.address = found
helper.add_device(tmp)
tmp.db = found_map
print("data: found")
async def start():
await helper.find_db_server()
if helper.db_server:
eng.subscribe(query_db_callback)
eng.subscribe(walker_callback)
eng.send_is_alive(dev,dev_types=dev_type)
eng.on_start(start)
# idle detection
eng.new_task(helper.idle_detector())
......@@ -522,12 +579,7 @@ def query_db():
def query_db_callback(msg):
if not match_dev_type(msg,db_type):return
# new db server found
if msg.is_alive():
db_server = helper.parse_msg(msg)
eng.send_request(dev,[db_server.address,],"get_keys_values",{'device':target})
# db server reply
elif msg.is_reply() and dev.address in msg.targets:
if msg.is_reply() and dev.address in msg.targets:
found = msg.body.get('device',None)
found_map = msg.body.get('map',None)
if found == target:
......@@ -546,10 +598,14 @@ def query_db():
helper.exit_event.set()
async def run():
eng.send_is_alive(dev,dev_types=[db_type,])
await helper.find_db_server()
if helper.db_server:
helper.query_db(target)
# wait for device to complete
await helper.exit_event.wait()
helper.quit()
else:
helper.error("No db server found")
eng.subscribe(query_db_callback)
eng.on_start(run)
......@@ -562,31 +618,13 @@ def test():
helper = ToolboxHelper()
(options,args) = helper.parse()
(eng,dev) = helper.setup_basic()
db_server_found = asyncio.Event()
def test_callback(msg):
if not match_dev_type(msg,db_type):return
# new db server found
if msg.is_alive():
helper.db_server = msg.source
db_server_found.set()
async def search_db():
eng.send_is_alive(dev,dev_types=[db_type,])
# wait for device to complete
try:
await asyncio.wait_for(db_server_found.wait(),timeout=0.3)
except asyncio.exceptions.TimeoutError:
print("No db server found")
else:
print("db server found %s" % helper.db_server)
eng.unsubscribe(test_callback)
async def run():
await search_db()
await helper.find_db_server()
print("ready")
eng.subscribe(test_callback)
#eng.subscribe(test_callback)
eng.on_start(run)
eng.new_task(helper.idle_detector())
helper.engine_wait()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment