Skip to content
Snippets Groups Projects
Commit bc0e8d9c authored by jkerdreu's avatar jkerdreu
Browse files

Oups ..



git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/branches/0.7@2861 b32b6428-25c9-4566-ad07-03861ab6144f
parent fe59e940
No related branches found
No related tags found
No related merge requests found
from xaal.lib import NetworkConnector,config,cbor,core
class ParseError(Exception):pass
def incr(value,max_value,increment=1):
tmp = value + increment
if tmp > max_value:
raise ParseError("Unable to go forward, issue w/ packet lenght ?")
return tmp
def hexdump(data):
print("HEX:",end="")
for k in data:
print("0x%x" % k,end=", ")
def parse(data):
i = 0
size = len(data)
print("========= Headers ========")
header_size = data[i]
if header_size != 0x85:
raise ParseError("Wrong array in header: 0x%x" % header_size)
print("Array w/ size=5 (0x85) : 0x%x" %header_size)
i = incr(i,size)
ver = data[i]
if ver != 7:
raise ParseError("Wrong packet version: 0x%x" % ver)
print("Version: 0%x" % ver)
i = incr(i,size)
ts0_size = data[i]
if ts0_size not in [0x1a,0x1b]:
raise ParseError("Wrong timestamp part0 size: 0x%x" % ts0_size)
print("TS0 size (0x1a or 0x1b): 0x%x" % ts0_size )
if ts0_size == 0x1a:
i=incr(i,size,5)
ts0 = list(data[i-4:i])
print("TS0: ",end="")
hexdump(ts0)
print("=> %s" % cbor.loads(bytes(data[i-5:i])))
if ts0_size == 0x1b:
i=incr(i,size,9)
ts0 = list(data[i-8:i])
print("TS0: ",end="")
hexdump(ts0)
print("=> %s" % cbor.loads(bytes(data[i-9:i])))
ts1_size = data[i]
if ts1_size != 0x1a:
raise ParseError("Wrong timestamp part1 size: 0x%x" % ts1_size)
print("TS1 size (0x1a): 0x%x" % ts0_size )
i = incr(i,size,5)
ts1 = list(data[i-4:i])
print("TS1: ",end="")
hexdump(ts1)
print("=> %s" % cbor.loads(bytes(data[i-5:i])))
target_size = data[i]
hexdump(data[i:i+10])
#print("0x%x" % target_size)
print()
def main():
nc = NetworkConnector(config.address,config.port,config.hops)
while 1:
data = nc.get_data()
if data:
try:
parse(data)
except ParseError as e:
print("ERROR ==> %s" % e)
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print("Bye...")
\ No newline at end of file
# -*- coding: utf-8 -*-
#
# Copyright 2016 Jérôme Kerdreux, IMT Atlantique.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from xaal.lib import Engine, Device, MessageType
from xaal.lib import tools
from . import isalive
import sys
import json
import time
from .ansi2 import term
def usage():
print("xaal-querydb xxxx-xxxx-xxxx : display metadata for a given device")
class QueryDB:
def __init__(self,engine,db_servers):
self.eng = engine
self.db_servers = db_servers
# new fake device
self.addr = tools.get_random_uuid()
self.dev = Device("cli.experimental",self.addr)
self.eng.add_device(self.dev)
self.eng.add_rx_handler(self.parse_answer)
print("xAAL DB query [%s]" % self.addr)
def query(self,addr):
self.timer = 0
mf = self.eng.msg_factory
body = {'device': addr,}
msg = mf.build_msg(self.dev,self.db_servers, MessageType.REQUEST,'get_keys_values',body)
self.eng.queue_msg(msg)
while 1:
self.eng.loop()
if self.timer > 40:
print("TimeOut...")
break
self.timer += 1
print('\n')
def parse_answer(self,msg):
""" message parser """
if msg.is_reply():
if (self.addr in msg.targets) and (msg.action == 'get_keys_values'):
term('yellow')
print("%s => " % msg.source,end='')
print(msg.body)
term()
def main():
if len(sys.argv) == 2:
addr = tools.get_uuid(sys.argv[1])
if tools.is_valid_address(addr):
t0 = time.time()
eng = Engine()
eng.start()
db_servers = isalive.search(eng,'metadatadb.basic')
if len(db_servers) == 0:
print("No metadb server found")
return
dev = QueryDB(eng,db_servers)
dev.query(addr)
print("Time : %0.3f sec" % (time.time() -t0))
else:
print("Invalid addr")
else:
usage()
if __name__ == '__main__':
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment