Skip to content
Snippets Groups Projects
Commit a7a1c52b authored by jkerdreu's avatar jkerdreu
Browse files

After some pull-request on tuyaface. Everything looks fine now.

The last step is temperature range, and fini !

git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/branches/0.7@2476 b32b6428-25c9-4566-ad07-03861ab6144f
parent 78a36a9d
No related branches found
No related tags found
No related merge requests found
from tuyaface import aescipher
KEY='f598419fef804b7f'
a = b'\x00\x00U\xaa\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x00\xfc\x00\x00\x00\x00\x11\xa6\x9a8\xcdN>L\x0cu\x84\x18\x87\xa6\x0b"\xe4\xfc#{:\x18N>\n\x87\xa1q\xf0*3\r\x17\x8d\xc4\x15\xcc\xc6+\x9e\x80&\xe1">a=\xbe\x8bc\xd6\x1e\xc5\xb4\xdf\xfe\x7f\x9d\x9f\x83\xd3\xb7\xe2\xd5\xbc\xeb\xed\xe9\x00\xbcN]j\x90\xe19\xc9/<\xae\xb8\xe4Dt\xf3]\xc0Kg\x01"\x01*\x11a\x9c\xd3\xac\xf3\x9ah\xd6\x96\xb5\xe4\xd2\xd1\xec\x8b\x04\xe7U\xde\xcd\x88\xf6\xc3nP\xa6\xe6\xf4\xef\x9a\xe1\xe8\xa2!\xb1\xcc\t_\xe5\x03\x1cv\x96E\x9a\x0ccn\x95\x8en\xfd\x9a\xe5\xfc\xe4|U\x07\x8e\xb5U\x04\xd6R\x9e\xfd\x93@\x9e\xcf\'\xd5\xdf3}\xd4A\x9bH|\xc5@\xa5@\x83\xc7\x15\x94\x98!K\x04\r\x08\xf8\xcf\xe0\x9f\xd3\x1c9\xaf\xc6\xa1\xdb\x137u\xdc\x9c\xf9\x19\x0b\x07\xdb\x85Z"\xfe\x8a\xf3\xba\x8f\x0b\xd0NLb\xc9\xeb7\xec\x80.LU5v\xfb\xc8\x1btx\xe6\x0c7\xf4BU\x00\x00\xaaU'
b = b'\x00\x00U\xaa\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x01\x0c\x00\x00\x00\x00\x11\xa6\x9a8\xcdN>L\x0cu\x84\x18\x87\xa6\x0b"\xe4\xfc#{:\x18N>\n\x87\xa1q\xf0*3\r\xc7*\x8a\xc3r\x16\x17+C\x82\xe6\x11K\x07\xdfo\xa7\x0fy\xf8\x1b\x8e\xf2\xfd\x9d\xdd\x9d\xd5\x01\xf4#p%\xb6hPK$j\xec\xaf\x18kJ\x9eE\xfd\xb5\x89\xe3I8\x19\xc7\x84\xf3\x87\xbb\x03d\x05C%\x07qP\xd9\x81\xd87\x18^\xdc\x96#e\xf8\x9f\x94\xe2\x18\xaa\xbc\xf4\xc2\x8b%\xa8q\xe9\xe1\xcd\x07x\x0e\x13\x07\x81[\x12\xf9\xe6\xc8ba\x18l\xaa\xc5\x80~\xb4\xa3\xebH\xef\xea\x99\x92\x07\x9a\xb9s\xbe\xef\xc8e\xd77\xe1G\x97+\xf1\xfbo\xec\x17\xbf\xd3,Q\x95\xecT\xa1\x04\xf0z\x071\xb1\xbb\xa3V\xcaw\xc5!\xd1k\x0eR\x07\x8b\xc2\xde\x8f\xdc\x1fT\xd6\x8fF\xce\x96\xda\x1fyG=\xdd?|\x8b4\x19\x18\xa2\xe3\xd5\xde\x97\x1c,\x12>\x11\xdan\x19\xb1\x8b\x01K\x91\x84\x9e\xde\x0e${\n\xc9Q\x16v4\x818\x83i\'5\x14A\xdc\x86\x00\x00\xaaU'
def data(sbytes):
print(f"sbytes: {len(sbytes)}")
mark = sbytes[15:16]
tmp = 268
print(f"mark: {mark} => {int.from_bytes(mark,byteorder='little')}")
data = sbytes[20:8+tmp]
print(f"data len {len(data)}")
#print(data)
print(aescipher.decrypt(KEY, data, False))
def hl():
print("=" * 78)
hl()
#data(a)
hl()
data(b)
import bpdb;bpdb.set_trace()
\ No newline at end of file
from gevent import monkey;monkey.patch_all()
import gevent
import tuyaface as tuya
import time
import colorsys
import coloredlogs
coloredlogs.install('DEBUG')
import logging
logger = logging.getLogger(__name__)
# lamp LSC
d1 = { 'ip' : '192.168.1.65', 'deviceid' : 'bf017829869d45e15dld7g','localkey' : 'efb36814ce74ee7d', 'protocol' : '3.3' }
# SHP6-test
d2 = { 'ip' : '192.168.1.63','deviceid' : '142200252462ab419539','localkey' : '6eeec6c9eeeff233','protocol' : '3.3'}
# alĥaplug-2
d3 = { 'ip' : '192.168.1.61','deviceid' : '74012545dc4f229f058f','localkey' : 'e3e182a7491f2d44','protocol' : '3.3'}
# shp-7
d4 = { 'ip' : '192.168.1.66', 'deviceid' : '42361312d8f15bd5a25d','localkey' : 'af6430180fd6dde7', 'protocol' : '3.3' }
d0 = { 'ip' : '192.168.1.60', 'deviceid' : '42361312d8f15bd5a25d','localkey' : 'af6430180fd6dde7', 'protocol' : '3.3' }
devices = [d1,d2,d3,d4]
def color_to_hex(hsv):
"hue is 0 to 360, sat & brighness between 0 to 1"
# ensure we received a list
hsv = list(hsv)
hsv[0] = hsv[0] / 360.0
h,s,v = hsv
rgb = [int(i*255) for i in colorsys.hsv_to_rgb(h,s,v)]
# This code from the original pytuya lib
hexvalue = ""
for value in rgb:
temp = str(hex(int(value))).replace("0x","")
if len(temp) == 1:
temp = "0" + temp
hexvalue = hexvalue + temp
hsvarray = [int(hsv[0] * 360), int(hsv[1] * 255), int(hsv[2] * 255)]
hexvalue_hsv = ""
for value in hsvarray:
temp = str(hex(int(value))).replace("0x","")
if len(temp) == 1:
temp = "0" + temp
hexvalue_hsv = hexvalue_hsv + temp
if len(hexvalue_hsv) == 7:
hexvalue = hexvalue + "0" + hexvalue_hsv
else:
hexvalue = hexvalue + "00" + hexvalue_hsv
return hexvalue
def _toggle(dev,idx=1):
status = tuya.status(dev)
logger.info(status)
time.sleep(0.1)
state = get_dps(status,idx)
return tuya.set_state(dev,not state,idx)
def run_cmd(cmd,*args,**kwargs):
cnt = 1
while 1:
try:
result = cmd(*args,**kwargs)
if result:
return result
except ConnectionResetError:
logger.error('ConnectionResetError: w/ %s (%s)' % (args,cnt))
except OSError:
logger.error('OSError: w/ %s (%s)' % (args,cnt))
# error
if cnt == 2: break
cnt = cnt+1
time.sleep(0.3)
def get_dps(data,idx):
if data:
dps = data.get('dps',{})
return dps.get(str(idx),None)
def toggle(dev,idx=1):
result = run_cmd(_toggle,dev,idx)
state = get_dps(result,idx)
print("%s[%s] => %s" % (dev['ip'],idx,state))
return state
def status(dev):
result = run_cmd(tuya.status,dev)
return result
def on(dev,idx=1):
return run_cmd(tuya.set_state,dev,True,idx)
def off(dev,idx=1):
return run_cmd(tuya.set_state,dev,False,idx)
def set_status(dev,data):
return run_cmd(tuya.set_status,dev,data)
def loop():
for i in range (0,10):
logger.warning('Loop')
time.sleep(0.5)
def async_toggle(dev,idx=1):
ev = gevent.spawn(toggle,dev).link(result)
#return result.get_nowait()
# if state:
# tuya.set_status(device,{"1":False})
# else:
# tuya.set_status(device,{"1":True,"2":'white',"3":25,"4":25})
# for i in range(25,255,5):
# tuya.set_status(device,{"1":True,"2":'white',"3":i,"4":100})
# time.sleep(2)
time.sleep(0.3)
#logger.info()
#logger.info(tuya.status(device))
import ac
#import pdb;pdb.set_trace()
import bpdb;bpdb.set_trace()
\ No newline at end of file
......@@ -16,7 +16,7 @@ import colorsys
logger = logging.getLogger(__name__)
# disable tuyaface debug
#logging.getLogger('tuyaface').setLevel(logging.INFO)
#logging.getLogger('tuyaface').setLevel(logging.ERROR)
# number of retry
......@@ -24,13 +24,24 @@ RETRY_COUNTER=2
def get_dps(data,idx):
"""extract a given datapoint from a tuya dict received"""
"""
Extract a given datapoint from a tuya dict received
Args:
data: received dps dict
idx: index
"""
if data:
dps = data.get('dps',{})
return dps.get(str(idx),None)
def color_to_hex(hsv):
"hue is 0 to 360, sat & brighness between 0 to 1"
"""
Converts the hsv list in a hexvalue.
Args:
hue is 0 to 360, sat & brighness between 0 to 1"
"""
# ensure we received a list
hsv = list(hsv)
hsv[0] = hsv[0] / 360.0
......@@ -58,6 +69,20 @@ def color_to_hex(hsv):
hexvalue = hexvalue + "00" + hexvalue_hsv
return hexvalue
def hexvalue_to_hsv(hexvalue):
"""
Converts the hexvalue used by tuya for colour representation into
an HSV value.
Args:
hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue()
"""
h = int(hexvalue[7:10], 16)
s = int(hexvalue[10:12], 16) / 255
v = int(hexvalue[12:14], 16) / 255
return (h, s, v)
@decorator
def spawn(func,*args,**kwargs):
gevent.spawn(func,*args,**kwargs)
......@@ -79,10 +104,14 @@ def retry(cmd,*args,**kwargs):
except OSError:
if cnt == RETRY_COUNTER:
logger.error('OSError: w/ %s (%s)' % (args,cnt))
except ValueError:
if cnt == RETRY_COUNTER:
logger.error('ValueError: w/ %s (%s)' % (args,cnt))
# error
if cnt == RETRY_COUNTER: break
cnt = cnt+1
time.sleep(0.3)
time.sleep(0.2)
return {}
def now():
......@@ -125,15 +154,23 @@ class TuyaDev:
if len(self.devices) > 1:
dev.group_id = self.base_addr + 0xff
#debug method / usefull to start the debugging a device
#dev.methods['debug'] = self.debug
def debug(self):
import pdb;
pdb.set_trace()
def setup(self):
logger.warning('Please override setup()')
@spawn
def update_status(self):
# update last_update early to avoid subsequent calls
self.last_update = now()
data=self.status()
if data:
self.on_status(data)
self.last_update = now()
#========================================================================
# Tuya wrappers
......@@ -141,6 +178,13 @@ class TuyaDev:
def status(self):
return retry(tuya.status,self.tuya_info)
def on_status(self,data):
# My first thought was to call on_status() only when valid dps is found.
# but Tuya protocol is so buggy that some cases may occurs where the dps
# is wrong, and status ~ Ok. So every on_status() handler should take care
# of this
logger.warning('Please implement on_status in your class')
def set_state(self,state,idx=1):
data = retry(tuya.set_state,self.tuya_info,state,idx)
if data:
......@@ -150,7 +194,7 @@ class TuyaDev:
def set_status(self,data):
data = retry(tuya.set_status,self.tuya_info,data)
if data:
logger.warning(data)
#logger.debug(data)
self.on_status(data)
return data
......@@ -194,8 +238,8 @@ class PowerRelay(TuyaDev):
class SmartPlug(PowerRelay):
# I tried to schedule a update after on/off change
# but this doesn't work cause tuya power compute is
# really slow. more than 10 so, not really usefull
# but this doesn't work because tuya power compute is
# really slow. more than 10sec so, not really usefull
def setup(self):
self.pmeter_dps = self.cfg.get('pmeter_dps',['4','5','6'])
......@@ -212,7 +256,7 @@ class SmartPlug(PowerRelay):
def on_status(self,data):
PowerRelay.on_status(self,data)
pmeter_attr = self.devices[0].attributes
# extract current / power / voltage from
current = get_dps(data,self.pmeter_dps[0])
if current!=None:
pmeter_attr['current'] = int(current) / 1000
......@@ -221,7 +265,7 @@ class SmartPlug(PowerRelay):
pmeter_attr['power'] = int(power) / 10
voltage = get_dps(data,self.pmeter_dps[2])
if voltage!=None:
pmeter_attr['voltage'] = int(voltage) / 10
pmeter_attr['voltage'] = round(int(voltage) / 10)
class RGBLamp(TuyaDev):
......@@ -256,6 +300,10 @@ class RGBLamp(TuyaDev):
result = get_dps(data,4)
if result:
attrs['color_temperature'] = result
# color value (hsv)
result = get_dps(data,5)
if result:
attrs['hsv'] = hexvalue_to_hsv(result)
@spawn
def set_color_temperature(self,_color_temperature):
......@@ -265,14 +313,14 @@ class RGBLamp(TuyaDev):
@spawn
def set_brightness(self,_brightness,_smooth=0):
# smooth is not supported
value = int(_brightness) * 255 / 100
value = round(int(_brightness) * 255 / 100)
if value < 25: value = 25
if value > 255: value = 255
self.set_status({2:'white',3:value})
@spawn
def set_hsv(self,_hsv,_smooth=0):
hsv = [int(k) for k in list(_hsv.split(','))]
hsv = [float(k) for k in list(_hsv.split(','))]
result = color_to_hex(hsv)
self.set_status({2:'colour',5:result})
......@@ -288,8 +336,6 @@ class RGBLamp(TuyaDev):
def toggle(self):
tmp = self.status()
state = get_dps(tmp,1)
time.sleep(0.1)
if state != None:
self.set_state(not state,1)
......@@ -13,7 +13,7 @@ PACKAGE_NAME = 'xaal.tuya'
logger = logging.getLogger(__name__)
# poll devices only every n seconds
REFRESH_RATE=30
REFRESH_RATE=45
CFG_MAP = { 'powerrelay': devices.PowerRelay,
'smartplug' : devices.SmartPlug ,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment