Skip to content
Snippets Groups Projects
Commit d2e97fdf authored by jkerdreu's avatar jkerdreu
Browse files

- added async test.

- minor changes to the CBOR test




git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/branches/0.7@2707 b32b6428-25c9-4566-ad07-03861ab6144f
parent 2267f36e
Branches
Tags
No related merge requests found
from xaal.lib.core import Engine
from xaal.lib.network import NetworkConnector
from xaal.lib.helpers import setup_console_logger
from xaal.schemas import devices
import functools
import asyncio
from decorator import decorator
@decorator
def spawn(func,*args,**kwargs):
print(f"Calling {func.__name__}")
asyncio.ensure_future(func(*args,**kwargs))
def spawn0(func):
@functools.wraps(func)
def spawn_future(*args,**kwargs):
print(f"Calling {func.__name__}")
asyncio.ensure_future(func(*args,**kwargs))
return spawn_future
class AioNetworkConnector(NetworkConnector):
async def receive(self):
data=NetworkConnector.receive(self)
#print(data)
print("receive")
return data
class AioEngine(Engine):
async def run(self):
self.start()
self.running = True
while self.running:
self.loop()
await asyncio.sleep(0)
def handler(data):
#print(data)
print("hanlder")
@spawn
async def toggle(dev):
print("Toggle")
await asyncio.sleep(2)
dev.attributes['light']=False if dev.attributes['light'] else True
@spawn
async def test(dev,_foo):
print(f"test.... {dev} {_foo}")
async def foo():
while 1:
print("Foo")
await asyncio.sleep(30)
async def test_receive():
net = AioNetworkConnector('224.0.29.200',1236,10)
net.connect()
while True:
await net.receive()
def main():
setup_console_logger()
#loop = asyncio.new_event_loop()
eng = AioEngine()
#eng.add_rx_handler(handler)
import sys
from xaal.lib import tools
dev = devices.lamp_toggle(tools.get_uuid(sys.argv[1]))
dev.info = 'FooBar'
eng.add_device(dev)
ptr = functools.partial(toggle,dev)
dev.methods['toggle'] = ptr
eng.add_timer(ptr,10)
ptr = functools.partial(test,dev)
dev.methods['test'] = ptr
tasks = [ asyncio.ensure_future(eng.run()),
# asyncio.ensure_future(foo()),
# asyncio.ensure_future(test_receive()),
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print("Bye bye")
from xaal.lib import bindings, cbor
#import ipdb;ipdb.set_trace()
data = bindings.UUID.random()
tmp = cbor.dumps(data)
print(type(cbor.loads(tmp)))
"""
uuid = bindings.UUID.random()
url = bindings.URL("http://google.fr")
......@@ -14,4 +21,5 @@ dec = cbor.loads(tmp)
print(f"{tmp} => {dec}")
print(f"UUID: {type(dec[2]['device'])}")
print(f"URL: {type(dec[2]['url'])}")
\ No newline at end of file
print(f"URL: {type(dec[2]['url'])}")
"""
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment