Skip to content
Snippets Groups Projects
Commit a7d0733c authored by jkerdreu's avatar jkerdreu
Browse files

work in progress. code looks nice now. still have to deal w/ a lot of stuffs

git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/branches/0.7@2472 b32b6428-25c9-4566-ad07-03861ab6144f
parent 1f14847a
Branches
No related tags found
No related merge requests found
......@@ -5,6 +5,8 @@ import time
import logging
import functools
import gevent
from decorator import decorator
import colorsys
# Tuya device aren't really network reliable, I get a bunch of connection reset
# This looks like they use a single socket for local network & cloud (SmartLife APP)
......@@ -17,18 +19,48 @@ logger = logging.getLogger(__name__)
#logging.getLogger('tuyaface').setLevel(logging.INFO)
# number of retry
RETRY_COUNTER=2
def get_dps(data,idx):
"""extract a given datapoint from a tuya dict received"""
if data:
dps = data.get('dps',{})
return dps.get(str(idx),None)
def color_to_hex(hsv):
"hue is 0 to 360, sat & brighness between 0 to 1"
# ensure we received a list
hsv = list(hsv)
hsv[0] = hsv[0] / 360.0
h,s,v = hsv
rgb = [int(i*255) for i in colorsys.hsv_to_rgb(h,s,v)]
# This code from the original pytuya lib
hexvalue = ""
for value in rgb:
temp = str(hex(int(value))).replace("0x","")
if len(temp) == 1:
temp = "0" + temp
hexvalue = hexvalue + temp
hsvarray = [int(hsv[0] * 360), int(hsv[1] * 255), int(hsv[2] * 255)]
hexvalue_hsv = ""
for value in hsvarray:
temp = str(hex(int(value))).replace("0x","")
if len(temp) == 1:
temp = "0" + temp
hexvalue_hsv = hexvalue_hsv + temp
if len(hexvalue_hsv) == 7:
hexvalue = hexvalue + "0" + hexvalue_hsv
else:
hexvalue = hexvalue + "00" + hexvalue_hsv
return hexvalue
@decorator
def spawn(func,*args,**kwargs):
def wrapper(*args,**kwargs):
gevent.spawn(func,*args,**kwargs)
return wrapper
gevent.spawn(func,*args,**kwargs)
def retry(cmd,*args,**kwargs):
"""
......@@ -42,11 +74,13 @@ def retry(cmd,*args,**kwargs):
if result:
return result
except ConnectionResetError:
logger.error('ConnectionResetError: w/ %s (%s)' % (args,cnt))
if cnt == RETRY_COUNTER:
logger.error('ConnectionResetError: w/ %s (%s)' % (args,cnt))
except OSError:
logger.error('OSError: w/ %s (%s)' % (args,cnt))
if cnt == RETRY_COUNTER:
logger.error('OSError: w/ %s (%s)' % (args,cnt))
# error
if cnt == 2: break
if cnt == RETRY_COUNTER: break
cnt = cnt+1
time.sleep(0.3)
......@@ -99,8 +133,17 @@ class TuyaDev:
return retry(tuya.status,self.tuya_info)
def set_state(self,state,idx=1):
return retry(tuya.set_state,self.tuya_info,state,idx)
data = retry(tuya.set_state,self.tuya_info,state,idx)
if data:
self.on_status(data)
return data
def set_status(self,data):
data = retry(tuya.set_status,self.tuya_info,data)
if data:
logger.warning(data)
self.on_status(data)
return data
class PowerRelay(TuyaDev):
......@@ -146,28 +189,63 @@ class RGBLamp(TuyaDev):
dev = devices.lamp_color(self.base_addr)
dev.methods['turn_on'] = self.turn_on
dev.methods['turn_off'] = self.turn_off
dev.methods['set_color_temperature'] = self.set_color_temperature
dev.methods['set_brightness'] = self.set_brightness
dev.methods['set_hsv'] = self.set_hsv
dev.methods['toggle'] = self.toggle
self.devices.append(dev)
def on_status(self,data):
print(data)
# state
result = get_dps(data,1)
if result!=None:
self.devices[0].attributes['light'] = result
# color / white
result = get_dps(data,2)
if result == 'colour':
self.devices[0].attributes['mode'] = 'color'
if result == 'white':
self.devices[0].attributes['mode'] = 'white'
# brightness
result = get_dps(data,3)
if result:
self.devices[0].attributes['brightness'] = result
# color_temperature
result = get_dps(data,4)
if result:
self.devices[0].attributes['color_temperature'] = result
@spawn
def set_color_temperature(self,_color_temperature):
value = int(_color_temperature)
self.set_status({2:'white',4:value})
@spawn
def set_brightness(self,_brightness,_smooth=0):
# smooth is not supported
value = int(_brightness)
self.set_status({2:'white',3:value})
@spawn
def set_hsv(self,_hsv,_smooth=0):
hsv = [int(k) for k in list(_hsv.split(','))]
result = color_to_hex(hsv)
self.set_status({2:'colour',5:result})
@spawn
def turn_on(self):
data = self.set_state(True,1)
self.on_status(data)
self.set_state(True,1)
@spawn
def turn_off(self):
data = self.set_state(False,1)
self.on_status(data)
self.set_state(False,1)
@spawn
def toggle(self):
tmp = self.status()
state = get_dps(tmp,1)
if state != None:
self.set_state(not state,1)
......@@ -79,7 +79,7 @@ class GW:
now = time.time()
for dev in self.devices.values():
if now > (dev.last_update + REFRESH_RATE):
#dev.update_status()
dev.update_status()
break # we only update one dev to avoid blocking
def _exit(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment