Skip to content
Snippets Groups Projects
Commit c45d897e authored by jkerdreu's avatar jkerdreu
Browse files

- Linting !


git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/branches/0.7@3027 b32b6428-25c9-4566-ad07-03861ab6144f
parent 2b4eb553
No related branches found
No related tags found
No related merge requests found
import colorsys
from xaal.schemas import devices
from xaal.lib import tools,Device
import logging
import copy
import functools
import gevent
from decorator import decorator
from xaal.schemas import devices
from xaal.lib import tools
logger = logging.getLogger(__name__)
def run(func, *args, **kwargs):
self = args[0]
if not self.lock.ready():
logger.warning("LOCKED waiting.. %s" % func)
logger.warning(f"LOCKED waiting.. {func}")
self.lock.wait()
self.lock.acquire()
try:
#logger.debug("Calling %s " % func)
# logger.debug("Calling %s " % func)
func(*args, **kwargs)
except Exception as e:
logger.warning(f"{self.bulb} {e} calling {func}")
......@@ -27,40 +27,40 @@ def run(func, *args, **kwargs):
@decorator
def spawn(func, *args, **kwargs):
ptr = functools.partial(run,func,*args,**kwargs)
ptr = functools.partial(run, func, *args, **kwargs)
gevent.spawn(ptr)
def properties_compare(orig, new):
r = {}
res = {}
for k in new.keys():
if k in orig.keys():
if new[k] != orig[k]:
r.update({k:new[k]})
return r
res.update({k: new[k]})
return res
class YeelightDev(object):
def __init__(self,bulb,cfg):
def __init__(self, bulb, cfg):
self.bulb = bulb
self.cfg = cfg
self.addr = tools.get_uuid(cfg.get('addr'))
self.dev = None
self.setup()
self.set_xaal()
logger.info('New device at %s : %s' % (bulb._ip,self.addr))
logger.info(f"New device at {bulb._ip} : {self.addr}")
# It's safer to use a lock to avoid the socket to be used w/ 2 greenlets at the same time.
# This can occurs on the device refresh
self.lock = gevent.lock.BoundedSemaphore(1)
#self.bulb.start_music()
# self.bulb.start_music()
def debug_properties(self,properties):
if not hasattr(self,'last_properties'):
def debug_properties(self, properties):
if not hasattr(self, 'last_properties'):
self.last_properties = properties
logger.debug(properties)
return
# for debugging only display changes
changes = properties_compare(self.last_properties,properties)
changes = properties_compare(self.last_properties, properties)
if changes:
logger.debug(changes)
self.last_properties = copy.copy(properties)
......@@ -113,7 +113,7 @@ class RGBW(YeelightDev):
dev.methods['set_hsv'] = self.set_hsv
dev.methods['set_white_temperature'] = self.set_white_temperature
dev.methods['set_mode'] = self.set_mode
dev.info = 'RGBW / %s' % self.addr
dev.info = f"RGBW / {self.addr}"
dev.attributes['hsv'] = [0, 0, 0]
dev.unsupported_attributes = ['scene']
dev.unsupported_methods = ['get_scene', 'set_scene']
......@@ -146,8 +146,7 @@ class RGBW(YeelightDev):
duration = int(_smooth)
else:
duration = int(self.cfg.get('smooth_default', 500))
if duration < 50:
duration = 50
duration = max(duration, 50)
self.bulb.turn_on()
self.bulb.duration = duration
self.bulb.set_hsv(h, s, v)
......@@ -159,7 +158,6 @@ class RGBW(YeelightDev):
self.dev.engine.add_timer(self.get_properties, target, 1)
self.dev.engine.add_timer(self.get_properties, target+0.5, 1)
@spawn
def set_hsv(self, _hsv, _smooth=None):
# FIXME
......@@ -173,8 +171,7 @@ class RGBW(YeelightDev):
duration = int(_smooth)
else:
duration = int(self.cfg.get('smooth_default', 500))
if duration < 50:
duration = 50
duration = max(duration, 50)
self.bulb.turn_on()
self.bulb.duration = duration
......@@ -210,12 +207,12 @@ class RGBW(YeelightDev):
self.debug_properties(props)
attrs = self.dev.attributes
# light state
power = props.get('power',None)
power = props.get('power', None)
if power:
if power == 'on' : attrs['light'] = True
if power == 'off': attrs['light'] = False
# color mode ?
mode = props.get('color_mode',None)
mode = props.get('color_mode', None)
if mode:
if mode == '2' : attrs['mode'] = 'white'
if mode == '1' : attrs['mode'] = 'color'
......@@ -229,14 +226,14 @@ class RGBW(YeelightDev):
if bright:
attrs['brightness'] = int(bright)
#hsv = list(attrs['hsv'])
#hsv[2] = round(int(bright)/100.0, 2)
#attrs['hsv'] = hsv
# hsv = list(attrs['hsv'])
# hsv[2] = round(int(bright)/100.0, 2)
# attrs['hsv'] = hsv
# Yeelight Python API provide both rgb and hsv values
# we parse both, even if we don't' issue set_hsv
# sat ?
# sat ?
# sat = props.get('sat',None)
# if sat:
# if sat:
# hsv = attrs['hsv']
# hsv[1] = (int(sat) / 100.0)
# attrs['hsv']=list(hsv)
......@@ -247,7 +244,6 @@ class RGBW(YeelightDev):
# hsv[0] = int(hue)
# attrs['hsv']=hsv
if rgb:
rgb = int(rgb)
r = (rgb >> 16) / 255
......@@ -259,4 +255,4 @@ class RGBW(YeelightDev):
s = hsv[1]
v = hsv[2]
attrs['hsv'] = [h, s, v]
#attrs['brightness'] = int(hsv[2] * 100)
# attrs['brightness'] = int(hsv[2] * 100)
......@@ -14,63 +14,62 @@ logger = logging.getLogger(PACKAGE_NAME)
# disable internal logging
logging.getLogger("yeelight").setLevel(logging.WARNING)
class GW(object):
def __init__(self,engine):
def __init__(self, engine):
self.engine = engine
self.devices = []
atexit.register(self._exit)
self.config()
self.setup()
self.refresh()
self.engine.add_timer(self.refresh,60)
self.engine.add_timer(self.refresh, 60)
def config(self):
cfg = tools.load_cfg(PACKAGE_NAME)
if not cfg:
cfg= tools.new_cfg(PACKAGE_NAME)
cfg = tools.new_cfg(PACKAGE_NAME)
cfg['devices'] = {}
logger.warn("Created an empty config file")
cfg.write()
self.cfg = cfg
def setup_(self):
logger.info("Searching for bulbs")
bulbs = yeelight.discover_bulbs()
cfg = self.cfg['devices']
for k in bulbs:
tmp = cfg.get(k['ip'],None)
tmp = cfg.get(k['ip'], None)
addr = None
if tmp:
addr = tools.get_uuid( tmp.get('addr',None) )
addr = tools.get_uuid(tmp.get('addr', None))
if not addr:
addr = tools.get_random_uuid()
cfg[k['ip']] = {'addr':str(addr)}
bulb = yeelight.Bulb(k['ip'],k['port'])
dev = devices.RGBW(bulb,cfg[k])
cfg[k['ip']] = {'addr': str(addr)}
bulb = yeelight.Bulb(k['ip'], k['port'])
dev = devices.RGBW(bulb, cfg[k])
self.engine.add_device(dev.dev)
def setup(self):
logger.info("Loading bulbs")
cfg = self.cfg['devices']
for k in cfg:
tmp = cfg.get(k,None)
tmp = cfg.get(k, None)
addr = None
if tmp:
addr = tools.get_uuid( tmp.get('addr',None) )
addr = tools.get_uuid(tmp.get('addr', None))
if not addr:
addr = tools.get_random_uuid()
cfg[k['ip']] = {'addr':str(addr)}
cfg[k['ip']] = {'addr': str(addr)}
bulb = yeelight.Bulb(k)
dev = devices.RGBW(bulb,cfg[k])
dev = devices.RGBW(bulb, cfg[k])
self.devices.append(dev)
self.engine.add_device(dev.dev)
def refresh(self):
for d in self.devices:
d.get_properties()
d.set_xaal()
for dev in self.devices:
dev.get_properties()
dev.set_xaal()
def _exit(self):
cfg = tools.load_cfg(PACKAGE_NAME)
......@@ -78,6 +77,7 @@ class GW(object):
logger.info('Saving configuration file')
self.cfg.write()
def setup(eng):
GW(eng)
return True
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment