Skip to content
Snippets Groups Projects
Commit b1d4de0f authored by jkerdreu's avatar jkerdreu
Browse files

Done with that.


git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/branches/0.7@2934 b32b6428-25c9-4566-ad07-03861ab6144f
parent 0705265f
Branches
No related tags found
No related merge requests found
from xaal.schemas import devices
from xaal.lib import tools,Device
import logging
import colorsys
......@@ -10,17 +9,25 @@ def find_class(meross_dev):
if meross_dev.type == 'msl430':
return RGBLamp
return None
class MerossDev(object):
def __init__(self,meross_dev,base_addr):
self.meross = meross_dev
self.base_addr = base_addr
self.dev = None
self.embs = []
self.setup()
self.setup_xaal()
def setup(self):
logger.warning('Please overide setup()')
def setup_xaal(self):
for dev in self.embs:
dev.vendor_id = 'Meross'
dev.product_id = f"{self.meross.type} HW:{self.meross.hardware_version} FW:{self.meross.firmware_version}"
dev.info = self.meross.name
dev.hw_id = self.meross.uuid
class RGBLamp(MerossDev):
......@@ -36,25 +43,25 @@ class RGBLamp(MerossDev):
dev.methods['set_hsv'] = self.set_hsv
dev.methods['set_white_temperature'] = self.set_white_temperature
dev.methods['set_mode'] = self.set_mode
dev.methods['debug'] = self.debug
#dev.info = 'RGBW / %s' % self.addr
#dev.schema = 'https://redmine.telecom-bretagne.eu/svn/xaal/schemas/branches/schemas-0.7/lamp.color'
dev.attributes['hsv'] = [0,0,0]
dev.unsupported_attributes = ['scene']
dev.unsupported_methods = ['get_scene','set_scene']
self.dev = dev
dev.dump()
def debug(self):
import pdb;pdb.set_trace()
dev.del_attribute(dev.get_attribute('scene'))
self.embs.append(dev)
async def update(self):
self.dev.attributes['light'] = self.meross.is_on()
self.dev.attributes['brightness'] = self.meross.get_luminance()
#await self.meross.async_update()
dev = self.embs[0]
dev.attributes['light'] = self.meross.is_on()
dev.attributes['brightness'] = self.meross.get_luminance()
temp = (self.temp_max - self.temp_min) / 100 * self.meross.get_color_temperature() + self.temp_min
self.dev.attributes['white_temperature'] = round(temp)
dev.attributes['white_temperature'] = round(temp/100,1) * 100
rgb = self.meross.get_rgb_color()
hsv = colorsys.rgb_to_hsv(rgb[0]/255,rgb[1]/255,rgb[2]/255)
h = round(hsv[0] * 360)
s = round(hsv[1],2)
v = round(hsv[2],2)
dev.attributes['hsv'] = (h,s,v)
async def turn_on(self):
await self.meross.async_turn_on()
......@@ -80,19 +87,25 @@ class RGBLamp(MerossDev):
else:
hsv = _hsv
h,s,v = hsv
rgb=tuple(round(i * 255) for i in colorsys.hsv_to_rgb(h,s,v))
rgb=tuple(round(i * 255) for i in colorsys.hsv_to_rgb(h/360.0,s,v))
await self.meross.async_set_light_color(rgb=rgb)
await self.update()
self.embs[0].attributes['mode'] = 'color'
async def set_white_temperature(self,_white_temperature):
temp = int(_white_temperature)
#
value = int((temp - self.temp_min) / (self.temp_max - self.temp_min) * 100)
if value <= 1: value = 1
if value >= 100: value = 100
await self.meross.async_set_light_color(temperature=value)
await self.update()
self.embs[0].attributes['mode'] = 'white'
async def set_mode(self,_mode):
pass
dev = self.embs[0]
if _mode == 'color':
await self.set_hsv(dev.attributes['hsv'])
if _mode == 'white':
await self.set_white_temperature(dev.attributes['white_temperature'])
......@@ -20,12 +20,17 @@ class GW(object):
self.devices = []
self.config()
engine.on_start(self.setup)
engine.on_stop(self._exit)
# refresh devices state
engine.add_timer(self.refresh,60)
# dis
engine.add_timer(self.discover,60)
def config(self):
cfg = tools.load_cfg(PACKAGE_NAME)
if not cfg:
cfg= tools.new_cfg(PACKAGE_NAME)
cfg['config'] = {'addr': tools.get_random_uuid(),'login':'','password':''}
cfg['devices'] = {}
logger.warn("Created an empty config file")
cfg.write()
......@@ -43,32 +48,42 @@ class GW(object):
self.client = await MerossHttpClient.async_from_user_password(email=login, password=passwd)
self.manager = MerossManager(http_client=self.client)
await self.manager.async_init()
await self.discover()
async def discover(self):
# discovery
await self.manager.async_device_discovery()
meross_devices = self.manager.find_devices()
# config
devices_config = self.cfg.get('devices',{})
# devices
uuids = [d.meross.uuid for d in self.devices]
for m_dev in meross_devices:
# skip devices already known
if m_dev.uuid in uuids:
continue
# find a class to handle this device
klass = bindings.find_class(m_dev)
addr = tools.get_random_uuid()
#addr = tools.get_uuid('ca61fd70-b0fd-11ec-8fe3-d6bd5fe18737')
dev = klass(m_dev,addr)
if not klass:
logger.warn(f"No binding for {m_dev.type}")
continue
# search config
conf = devices_config.get(m_dev.uuid,None)
if not conf:
logger.info(f"Found a new device {m_dev.type} {m_dev.uuid}")
base_addr = tools.get_random_base_uuid()
devices_config.update({m_dev.uuid:{'base_addr':base_addr}})
devices_config.inline_comments[m_dev.uuid] = m_dev.type
else:
base_addr = tools.get_uuid(conf.get('base_addr',None))
# create device
dev = klass(m_dev,base_addr)
# poll the current state
await dev.meross.async_update()
await dev.update()
# register device
self.devices.append(dev)
self.engine.add_device(dev.dev)
# cfg = self.cfg['devices']
# for k in cfg:
# tmp = cfg.get(k,None)
# addr = None
# if tmp:
# addr = tools.get_uuid( tmp.get('addr',None) )
# if not addr:
# addr = tools.get_random_uuid()
# cfg[k['ip']] = {'addr':str(addr)}
# bulb = yeelight.Bulb(k)
# dev = devices.RGBW(bulb,cfg[k])
# self.devices.append(dev)
# self.engine.add_device(dev.dev)
self.engine.add_devices(dev.embs)
async def refresh(self):
for d in self.devices:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment