Skip to content
Snippets Groups Projects
Commit d0d1112f authored by jkerdreu's avatar jkerdreu
Browse files

After a long refactoring, I managed to get Tuya working on most devices

even buggy one. This should be ok for every day.

git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/branches/0.7@2470 b32b6428-25c9-4566-ad07-03861ab6144f
parent f56a7f81
Branches
No related tags found
No related merge requests found
......@@ -23,15 +23,9 @@ setup(
platforms='any',
packages=find_packages(),
include_package_data=True,
dependency_links=[
"git+https://github.com/belzedaar/python-tuya.git#egg=pytuya"
],
install_requires=[
'xaal.lib',
'gevent==1.5a2',
'pytuya',
'pyaes',
'pycrypto',
'tenacity',
'tuyaface',
]
)
from xaal.schemas import devices
from xaal.lib import tools,Device
import pytuya
import tuyaface as tuya
import time
import logging
import functools
import gevent
# Tuya device aren't really network reliable, I get a bunch of connection reset
# This looks like they use a single socket for local network & cloud (SmartLife APP)
# connection. Everytime time I toggle a outlet in SmartLife I get a connection reset
# on the next update status. I tested w/ delays but not really working, so I switched
# to tenacity once again.
# on the next update status.
from tenacity import retry,stop_after_attempt,RetryError
logger = logging.getLogger(__name__)
# disable pytuya debug
logging.getLogger('pytuya').setLevel(logging.INFO)
# disable tuyaface debug
#logging.getLogger('tuyaface').setLevel(logging.INFO)
def get_dps(data,idx):
"""extract a given datapoint from a tuya dict received"""
if data:
dps = data.get('dps',{})
return dps.get(str(idx),None)
def spawn(func,*args,**kwargs):
def wrapper(*args,**kwargs):
gevent.spawn(func,*args,**kwargs)
return wrapper
def retry(cmd,*args,**kwargs):
"""
Tuya send/receive produce a lot of network error. This function provide shield.
If a network issue occur, this function wait a couple of ms, than resend.
"""
cnt = 1
while 1:
try:
result = cmd(*args,**kwargs)
if result:
return result
except ConnectionResetError:
logger.error('ConnectionResetError: w/ %s (%s)' % (args,cnt))
except OSError:
logger.error('OSError: w/ %s (%s)' % (args,cnt))
# error
if cnt == 2: break
cnt = cnt+1
time.sleep(0.3)
def now():
return time.time()
class TuyaDev:
def __init__(self,tuya_id,tuya_key,tuya_ip,base_addr):
def __init__(self,tuya_id,cfg):
self.is_valid = False
self.tuya_id = tuya_id
self.tuya_key = tuya_key
self.tuya_ip = tuya_ip
self.tuya_dev = None
self.base_addr = base_addr
self.last_update = 0
self.devices = []
self.last_update = 0
self.cfg = cfg
self.load_config()
self.setup()
self.init_properties()
def load_config(self):
cfg = self.cfg
addr = tools.get_uuid(cfg.get('base_addr',None))
ip = cfg.get('ip',None)
key = cfg.get('key',None)
proto = cfg.get('protocol','3.3')
if not ip or not key:
return
if addr == None:
addr = tools.get_random_base_uuid()
cfg['base_addr'] = str(addr)
self.base_addr = addr
# forge the dict needed for tuyaface
self.tuya_info = { 'ip' : ip, 'deviceid' : self.tuya_id, 'localkey' : key,'protocol' : proto }
self.is_valid = True
def setup(self):
logger.warning('Please override setup()')
def init_properties(self):
for dev in self.devices:
dev.vendor_id = 'IHSEV / Tuya'
dev.hw_id = self.tuya_id
dev.info = 'Tuya device: %s @ %s' % (self.tuya_id,self.tuya_ip)
if len(self.devices) > 1:
dev.group_id = self.base_addr + 0xff
@spawn
def update_status(self):
try:
self._update_status()
except RetryError as e:
logger.warn(e)
#@retry(stop=stop_after_attempt(2),wait=wait_fixed(0.15), before_sleep=before_sleep_log(logger, logging.DEBUG))
@retry(stop=stop_after_attempt(2))
def _update_status(self):
time.sleep(0.15)
#logger.info("Updating %s" % self.tuya_id)
data=self.status()
self.on_status(data)
self.last_update = now()
status = self.tuya_dev.status()
#logger.warning(status)
self.on_status(status)
def on_status(self,status):
logger.warning('Please override on_status')
class OnOffMixing(object):
def setup_onoff(self,tuya_dev,xaal_dev):
tuya_dev.set_version(3.3)
self.tuya_dev = tuya_dev
xaal_dev.methods['on'] = self.on
xaal_dev.methods['off'] = self.off
xaal_dev.methods['toggle'] = self.toggle
xaal_dev.methods['update'] = self.update_status
self.xaal_dev = xaal_dev
@retry(stop=stop_after_attempt(3))
def on(self):
self.tuya_dev.turn_on()
self.update_status()
@retry(stop=stop_after_attempt(3))
def off(self):
self.tuya_dev.turn_off()
self.update_status()
def toggle(self):
power = self.xaal_dev.attributes['power']
if power == None:
self.update_status()
power = self.xaal_dev.attributes['power']
if power == True:
self.off()
elif power == False:
self.on()
class PowerRelay(TuyaDev,OnOffMixing):
def setup(self):
relay = devices.powerrelay_toggle(self.base_addr)
relay.product_id = 'Generic Tuya Outlet'
outlet = pytuya.OutletDevice(self.tuya_id,self.tuya_ip,self.tuya_key)
self.setup_onoff(outlet,relay)
self.devices.append(relay)
def on_status(self,status):
self.devices[0].attributes['power'] = status['dps']['1']
#========================================================================
# Tuya wrappers
#========================================================================
def status(self):
return retry(tuya.status,self.tuya_info)
def set_state(self,state,idx=1):
return retry(tuya.set_state,self.tuya_info,state,idx)
class SmartPlug(TuyaDev,OnOffMixing):
class PowerRelay(TuyaDev):
def setup(self):
self.dps = self.cfg.get('dps',[1])
addr = self.base_addr
for k in self.dps:
dev = devices.powerrelay_toggle(addr)
dev.methods['turn_on'] = functools.partial(self.turn_on,k,dev)
dev.methods['turn_off'] = functools.partial(self.turn_off,k,dev)
dev.methods['toggle'] = functools.partial(self.toggle,k,dev)
addr = addr +1
self.devices.append(dev)
@spawn
def turn_on(self,idx,dev):
r = self.set_state(True,idx)
print(r)
@spawn
def turn_off(self,idx,dev):
r = self.set_state(False,idx)
print(r)
@spawn
def toggle(self,idx,dev):
tmp = self.status()
state = get_dps(tmp,idx)
if state != None:
self.set_state(not state,idx)
def on_status(self,data):
logger.warning('TDB')
class SmartPlug(PowerRelay):
def setup(self):
# TBD switch to the right schema, but right now nothing exists
relay = devices.powerrelay_toggle(self.base_addr)
relay.product_id = 'Tuya Smart Plug Outlet'
plug = pytuya.OutletDevice(self.tuya_id,self.tuya_ip,self.tuya_key)
self.setup_onoff(plug,relay)
pmeter = devices.powermeter(self.base_addr + 1)
pmeter.product_id = 'Tuya Smart Plug Power Meter'
pmeter.new_attribute('voltage')
pmeter.new_attribute('current')
pmeter.del_attribute(pmeter.get_attribute('energy'))
pmeter.del_attribute(pmeter.get_attribute('devices'))
pmeter.unsupported_attributes = ['energy','devices']
self.devices.append(relay)
self.devices.append(pmeter)
def on_status(self,status):
dps = status['dps']
self.devices[0].attributes['power'] = dps.get('1',None)
self.devices[1].attributes['current'] = int(dps.get('4',0)) / 1000
self.devices[1].attributes['power'] = int(dps.get('5',0)) / 10
self.devices[1].attributes['voltage'] = int(dps.get('6',0)) / 10
# TODO I don't have this kind of devices right now
class RGBLamp(TuyaDev,OnOffMixing):
PowerRelay.setup(self)
class RGBLamp(TuyaDev):
def setup(self):
lamp = devices.lamp(self.base_addr)
dev = pytuya.OutletDevice(self.tuya_id,self.tuya_ip,self.tuya_key)
self.setup_onoff(dev,lamp)
self.devices.append(lamp)
dev = devices.lamp_color(self.base_addr)
dev.methods['turn_on'] = self.turn_on
dev.methods['turn_off'] = self.turn_off
self.devices.append(dev)
def on_status(self,data):
print(data)
result = get_dps(data,1)
if result!=None:
self.devices[0].attributes['light'] = result
result = get_dps(data,2)
if result == 'colour':
self.devices[0].attributes['mode'] = 'color'
@spawn
def turn_on(self):
data = self.set_state(True,1)
self.on_status(data)
@spawn
def turn_off(self):
data = self.set_state(False,1)
self.on_status(data)
def status(self,status):
logger.warning(status)
class PowerRelayMetering(TuyaDev):pass
from gevent import monkey;monkey.patch_all()
from xaal.lib import tools,Device
from xaal import schemas
from . import devices
......@@ -46,33 +49,27 @@ class GW:
devices = self.cfg.get('devices',[])
for d in devices:
cfg = devices.get(d,{})
base_addr = tools.get_uuid(cfg.get('base_addr',None))
ip = cfg.get('ip',None)
key = cfg.get('key',None)
type_ = cfg.get('type','PowerRelay')
if base_addr == None:
base_addr = tools.get_random_base_uuid()
cfg['base_addr'] = str(base_addr)
if ip and key:
self.add_device(d,key,ip,base_addr,type_)
tmp = cfg.get('type','PowerRelay')
dev_type = CFG_MAP.get(tmp,None)
if dev_type:
dev = dev_type(d,cfg)
if dev.is_valid:
self.add_device(d,dev)
dev.update_status()
else:
logger.warning(f"Config error for {d}")
else:
logger.warn('Error in config file, wrong devices')
logger.warn(f"Unsupported device type {type_} {d}")
# loaded all devices
self.engine.add_device(gw)
def add_device(self,tuya_id,tuya_key,ip,base_addr,type_):
dev_type = CFG_MAP.get(type_,None)
if dev_type:
dev = dev_type(tuya_id,tuya_key,ip,base_addr)
def add_device(self,tuya_id,dev):
self.devices[tuya_id] = dev
for d in dev.devices:
self.engine.add_device(d)
self.gw.attributes['embedded'].append(d.address)
dev.update_status()
else:
logger.warn(f"Unsupported device type {type_} {ip}")
def update(self):
# Tuya protocol doesn't support update broadcasting
......@@ -82,7 +79,7 @@ class GW:
now = time.time()
for dev in self.devices.values():
if now > (dev.last_update + REFRESH_RATE):
dev.update_status()
#dev.update_status()
break # we only update one dev to avoid blocking
def _exit(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment