Skip to content
Snippets Groups Projects
Commit a87a4ea7 authored by jkerdreu's avatar jkerdreu
Browse files

Initial ZWave import

git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/trunk@1916 b32b6428-25c9-4566-ad07-03861ab6144f
parent dae58fb8
Branches
No related tags found
No related merge requests found
Showing
with 578 additions and 0 deletions
from enum import Enum
class COMMAND_CLASS(Enum):
NO_OPERATION = 0x00
BASIC = 0x20
CONTROLLER_REPLICATION = 0x21
APPLICATION_STATUS = 0x22
ZIP_SERVICES = 0x23
ZIP_SERVER = 0x24
SWITCH_BINARY = 0x25
SWITCH_MULTILEVEL = 0x26
SWITCH_MULTILEVEL_V2 = 0x26
SWITCH_ALL = 0x27
SWITCH_TOGGLE_BINARY = 0x28
SWITCH_TOGGLE_MULTILEVEL = 0x29
CHIMNEY_FAN = 0x2A
SCENE_ACTIVATION = 0x2B
SCENE_ACTUATOR_CONF = 0x2C
SCENE_CONTROLLER_CONF = 0x2D
ZIP_CLIENT = 0x2E
ZIP_ADV_SERVICES = 0x2F
SENSOR_BINARY = 0x30
SENSOR_MULTILEVEL = 0x31
SENSOR_MULTILEVEL_V2 = 0x31
METER = 0x32
ZIP_ADV_SERVER = 0x33
ZIP_ADV_CLIENT = 0x34
METER_PULSE = 0x35
METER_TBL_CONFIG = 0x3C
METER_TBL_MONITOR = 0x3D
METER_TBL_PUSH = 0x3E
THERMOSTAT_HEATING = 0x38
THERMOSTAT_MODE = 0x40
THERMOSTAT_OPERATING_STATE = 0x42
THERMOSTAT_SETPOINT = 0x43
THERMOSTAT_FAN_MODE = 0x44
THERMOSTAT_FAN_STATE = 0x45
CLIMATE_CONTROL_SCHEDULE = 0x46
THERMOSTAT_SETBACK = 0x47
DOOR_LOCK_LOGGING = 0x4C
SCHEDULE_ENTRY_LOCK = 0x4E
BASIC_WINDOW_COVERING = 0x50
MTP_WINDOW_COVERING = 0x51
ASSOCIATION_GRP_INFO = 0x59
DEVICE_RESET_LOCALLY = 0x5A
CENTRAL_SCENE = 0x5B
IP_ASSOCIATION = 0x5C
ANTITHEFT = 0x5D
ZWAVEPLUS_INFO = 0x5E
MULTI_CHANNEL_V2 = 0x60
MULTI_INSTANCE = 0x60
DOOR_LOCK = 0x62
USER_CODE = 0x63
BARRIER_OPERATOR = 0x66
CONFIGURATION = 0x70
CONFIGURATION_V2 = 0x70
ALARM = 0x71
MANUFACTURER_SPECIFIC = 0x72
POWERLEVEL = 0x73
PROTECTION = 0x75
PROTECTION_V2 = 0x75
LOCK = 0x76
NODE_NAMING = 0x77
FIRMWARE_UPDATE_MD = 0x7A
GROUPING_NAME = 0x7B
REMOTE_ASSOCIATION_ACTIVATE = 0x7C
REMOTE_ASSOCIATION = 0x7D
BATTERY = 0x80
CLOCK = 0x81
HAIL = 0x82
WAKE_UP = 0x84
WAKE_UP_V2 = 0x84
ASSOCIATION = 0x85
ASSOCIATION_V2 = 0x85
VERSION = 0x86
INDICATOR = 0x87
PROPRIETARY = 0x88
LANGUAGE = 0x89
TIME = 0x8A
TIME_PARAMETERS = 0x8B
GEOGRAPHIC_LOCATION = 0x8C
COMPOSITE = 0x8D
MULTI_CHANNEL_ASSOCIATION_V2 = 0x8E
MULTI_INSTANCE_ASSOCIATION = 0x8E
MULTI_CMD = 0x8F
ENERGY_PRODUCTION = 0x90
MANUFACTURER_PROPRIETARY = 0x91
SCREEN_MD = 0x92
SCREEN_MD_V2 = 0x92
SCREEN_ATTRIBUTES = 0x93
SCREEN_ATTRIBUTES_V2 = 0x93
SIMPLE_AV_CONTROL = 0x94
AV_CONTENT_DIRECTORY_MD = 0x95
AV_RENDERER_STATUS = 0x96
AV_CONTENT_SEARCH_MD = 0x97
SECURITY = 0x98
AV_TAGGING_MD = 0x99
IP_CONFIGURATION = 0x9A
ASSOCIATION_COMMAND_CONFIGURATION = 0x9B
SENSOR_ALARM = 0x9C
SILENCE_ALARM = 0x9D
SENSOR_CONFIGURATION = 0x9E
MARK = 0xEF
NON_INTEROPERABLE = 0xF0
from xaal.lib import Device,tools
from cmdclass import COMMAND_CLASS
class ZDevice(object):
def __init__(self,node,gateway):
self.node = node
self.gw = gateway
self.devices = []
self.values = {}
self.config()
self.setup()
def config(self):
""" read config files, search base_addr & group"""
cfg = self.gw.cfg['products']
key = str(self.node.node_id)
if key in cfg.keys():
tmp = cfg[key]['base_addr']
self.base_addr = tmp[:-2]
else:
self.gw.save_config=True
tmp = tools.get_random_uuid()
self.base_addr = tmp[:-2]
cfg.update({key:{}})
cfg[key]['base_addr'] = self.base_addr + '00'
cfg[key]['group'] = tools.get_random_uuid()
cfg.inline_comments[key] = self.node.product_name
def new_device(self,devtype):
""" embed an new device """
node_id = str(self.node.node_id)
dev = Device(devtype)
dev.vendor_id = "IHSEV/OpenZWave"
dev.product_id = self.node.product_name
dev.hw_id = node_id
dev.url = "http://www.openzwave.com"
dev.address = self.base_addr + '%02x' % (len(self.devices) + 1)
dev.group_id = self.gw.cfg['products'][node_id]['group']
dev.info = '%s/%s' % (self.node.type,node_id)
self.devices.append(dev)
return dev
def start(self):
for k in self.values:
self.handle_value_changed(self.values[k])
self.values[k].refresh()
def monitor_value(self,name,cmd_class,instance=1,idx=0):
val = self.gw.get_value(self.node.node_id,cmd_class,instance,idx)
if val:
self.values[name] = val
def set_value(self,name,val):
self.values[name].data = val
def get_value(self,name):
return self.values[name]
def dump(self):
self.gw.dump_product(self.node.node_id)
def setup(self):
""" You should overide this for your product"""
pass
def handle_value_changed(self,value):
""" You can overide this for your product"""
pass
from __future__ import print_function
import platform
import sys,time
# Zwave imports
from openzwave.option import ZWaveOption
from openzwave.network import ZWaveNetwork
from pydispatch import dispatcher
from xaal.lib import tools,Engine,Device
from cmdclass import COMMAND_CLASS
import products
from prettytable import PrettyTable
import atexit
PACKAGE_NAME = "xaal.zwave"
logger = tools.get_logger(PACKAGE_NAME,'INFO')
class GW(object):
def __init__(self,engine):
self.products = []
self.engine = engine
self.save_config = False
self.cfg = tools.load_cfg_or_die(PACKAGE_NAME)
atexit.register(self._exit)
self.setup_gw()
self.setup_network()
self.setup_products()
def update_value(self,network,node,value):
dev = self.get_product(node.node_id)
if dev:
dev.handle_value_changed(value=value)
else:
print('update_value: {}.'.format(value))
def update_node(self,network,node):
if self.get_product(node.node_id) != None: return
if node.is_ready:
logger.info("new zwave device [%d]: %s" % (node.node_id,node.product_name))
dev = self.add_product(node.node_id)
def setup_network(self):
cfg = self.cfg['config']
options = ZWaveOption(cfg['port'])
options.set_console_output(False)
# comments the following lines if you want a log file.
options.set_save_log_level('None')
options.set_log_file("/dev/null")
options.lock()
self.network = ZWaveNetwork(options, autostart=False)
dispatcher.connect(self.update_node,ZWaveNetwork.SIGNAL_NODE)
dispatcher.connect(self.update_value,ZWaveNetwork.SIGNAL_VALUE)
# searching for device w/ wait loop is mandatory. If you don't do that
# the openzwave lib will segfault.
self.network.start()
logger.info('searching for Zwave devices, this may take a while')
for i in range(0,60):
if self.network.is_ready:
logger.info('Zwave network ready')
break
else:
time.sleep(1.0)
print("\rLoading [%d] " % i,end='')
def setup_products(self):
for k in self.network.nodes:
node = self.network.nodes[k]
if self.get_product(node.node_id):
continue
if (node and node.is_ready):
self.add_product(k)
def get_product(self,node_id):
for dev in self.products:
if dev.node.node_id == node_id:
return dev
def add_product(self,node_id):
node = self.network.nodes[node_id]
sig = "%s:%s:%s" % (node.manufacturer_id,node.product_id,node.product_type)
klass=products.search(sig)
if klass:
dev = klass(node,self)
self.products.append(dev)
self.engine.add_devices(dev.devices)
self.update_embedded()
dev.start()
else:
logger.info('Unknow device %s %s' % (node.product_name,sig))
def setup_gw(self):
# last step build the GW device
gw = Device("gateway.basic")
gw.address = self.cfg['config']['addr']
gw.vendor_id = "IHSEV"
gw.product_id = "OpenZwave gateway"
gw.version = 0.1
gw.url = "http://www.openzwave.com/"
gw.info = "%s@%s" % (PACKAGE_NAME,platform.node())
emb = gw.new_attribute('embedded',[])
self.engine.add_device(gw)
self.gw = gw
def update_embedded(self):
result = []
for prod in self.products:
for dev in prod.devices:
result.append(dev.address)
emb = self.gw.get_attribute('embedded')
emb.value = result
def get_value(self,node_id,cmd_class,instance=1,idx=0):
node = self.network.nodes[node_id]
for k in node.values:
val = node.values[k]
# some devices have broken command_class
if val.command_class == None: continue
if ((cmd_class == COMMAND_CLASS(val.command_class)) and (val.index==idx) and (val.instance == instance)):
return val
return None
def _exit(self):
if self.save_config:
logger.info('Saving configuration file')
self.cfg.write()
def dump_product(self,node_id):
""" dumb method that display a zwave device"""
zdev = self.network.nodes[node_id]
print("***** %s" % zdev.product_name)
print("***** %s:%s:%s" % (zdev.manufacturer_id,zdev.product_id,zdev.product_type))
table = PrettyTable(["value","inst","idx","Label","Data","Units","Command class"])
table.align["Label"] = 'l'
table.align["Data"] = 'l'
table.align["Command class"] = 'l'
for k in zdev.values:
val = zdev.values[k]
if val.command_class:
klass = COMMAND_CLASS(val.command_class)
else:
klass = 'Error'
table.add_row([k,val.instance,val.index,val.label,val.data,val.units,klass])
print(table)
def run():
eng = Engine()
gw=GW(eng)
eng.run()
def main():
try:
run()
except KeyboardInterrupt:
print("Bye Bye...")
if __name__ == '__main__':
main()
import core
class ZW100(core.ZDevice):
""" Aeotec MultiSensor 6"""
MANUFACTURER_ID = '0x0086'
PRODUCTS = ['0x0064:0x0002',]
def setup(self):
# temperature
dev = self.new_device("thermometer.basic")
dev.new_attribute("temperature")
# humidity
dev = self.new_device("hygrometer.basic")
dev.new_attribute('humidity')
# luxmeter
dev = self.new_device("luxmeter.basic")
dev.new_attribute('lux')
dev.new_attribute('ultraviolet')
# motion sensor
dev = self.new_device("motion.basic")
dev.new_attribute("motion")
# shock sensor
dev = self.new_device("shock.basic")
dev.new_attribute("shock")
# battery
dev = self.new_device("battery.basic")
dev.new_attribute("level")
# zwave values mapping
self.monitor_value('temperature',core.COMMAND_CLASS.SENSOR_MULTILEVEL,1,1)
self.monitor_value('humidity',core.COMMAND_CLASS.SENSOR_MULTILEVEL,1,5)
self.monitor_value('lux',core.COMMAND_CLASS.SENSOR_MULTILEVEL,1,3)
self.monitor_value('ultraviolet',core.COMMAND_CLASS.SENSOR_MULTILEVEL,1,27)
self.monitor_value('burglar',core.COMMAND_CLASS.ALARM,1,10)
self.monitor_value('battery',core.COMMAND_CLASS.BATTERY,1,0)
def handle_value_changed(self,value):
# thermometer
if value == self.get_value('temperature'):
self.devices[0].attributes["temperature"] = round(value.data,1)
# hygrometer
if value == self.get_value('humidity'):
self.devices[1].attributes['humidity'] = round(value.data)
# luxmeter
if value == self.get_value('lux'):
self.devices[2].attributes['lux'] = round(value.data)
if value == self.get_value('ultraviolet'):
self.devices[2].attributes['ultraviolet'] = round(value.data)
# motion & shock
if value == self.get_value('burglar'):
if value.data == 8:
self.devices[3].attributes['motion'] = True
if value.data == 3:
self.devices[4].attributes['shock'] = True
if value.data == 0:
self.devices[3].attributes['motion'] = False
self.devices[4].attributes['shock'] = False
# battery
if value == self.get_value('battery'):
self.devices[5].attributes['level'] = value.data
from .ZW100 import ZW100
import products
products.register(ZW100)
import core
class AD142(core.ZDevice):
"""
AD142, the buggy dimmer from Everspring !
Important note :
- This device must be excluded from previous association before inclusion
- Put z-stick in exclusion mode and quickly press 3 times the device button
- Put z-stick in inclusion mode and quickly press 3 times the device button
BUG:
- This device report the state in an wrong way. It always return the previous
value. So parsing message won't work.You have to deal w/ manual refresh of
state.
"""
MANUFACTURER_ID ='0x0060'
PRODUCTS = ['0x0001:0x0003',]
def setup(self):
lamp = self.new_device("lamp.dimmer")
# attributes
lamp.new_attribute("light")
lamp.new_attribute("dimmer")
# methods
lamp.add_method('on',self.on)
lamp.add_method('off',self.off)
lamp.add_method('dim',self.dim)
self.monitor_value('level',core.COMMAND_CLASS.SWITCH_MULTILEVEL)
def set_level(self,value):
self.set_value('level',value)
self.gw.engine.add_timer(self.values['level'].refresh,1.8,2)
def on(self):
self.set_level(0x63)
def off(self):
self.set_level(0)
def dim(self,value):
if (value > 0) and (value <100):
self.set_level(value)
def handle_value_changed(self,value):
if value == self.get_value('level'):
dev = self.devices[0]
dev.attributes["dimmer"] = value.data
if value.data == 0:
dev.attributes["light"] = False
else:
dev.attributes["light"] = True
import products
from .AD142 import AD142
for k in [AD142,]:
products.register(k)
import core
class FGWPE(core.ZDevice):
""" Fibaro FGWPE/F Wall Plug"""
MANUFACTURER_ID = '0x010f'
PRODUCTS = ['0x1000:0x0600','0x1001:0x0602']
def setup(self):
# powerrelay
relay = self.new_device("powerrelay.basic")
relay.add_method('on',self.on)
relay.add_method('off',self.off)
relay.new_attribute("power")
# powermeter
power = self.new_device("powermeter.basic")
power.new_attribute("power")
# map zwave var to
self.monitor_value('relay',core.COMMAND_CLASS.SWITCH_BINARY)
self.monitor_value('power',core.COMMAND_CLASS.SENSOR_MULTILEVEL,1,4)
def on(self):
self.set_value('relay',True)
def off(self):
self.set_value('relay',False)
def handle_value_changed(self,value):
if value == self.get_value('relay'):
self.devices[0].attributes['power']=value.data
if value == self.get_value('power'):
self.devices[1].attributes['power']=round(value.data)
import products
from .FGWPE import FGWPE
for k in [FGWPE,]:
products.register(k)
import core
class RGBBulb(core.ZDevice):
MANUFACTURER_ID = '0x0131'
PRODUCTS = ['0x0002:0x0002',]
def setup(self):
lamp = self.new_device("lamp.rgb")
# attributes
lamp.new_attribute("light")
lamp.new_attribute("dimmer")
# methods
lamp.add_method('on',self.on)
lamp.add_method('off',self.off)
lamp.add_method('dim',self.dim)
self.monitor_value('level',core.COMMAND_CLASS.SWITCH_MULTILEVEL)
def set_level(self,value):
self.set_value('level',value)
self.gw.engine.add_timer(self.get_value('level').refresh,1,2)
def on(self):
self.set_level(0x63)
def off(self):
self.set_level(0)
def dim(self,value):
if (value > 0) and (value <100):
self.set_level(value)
def handle_value_changed(self,value):
if value == self.get_value('level'):
dev = self.devices[0]
dev.attributes["dimmer"] = value.data
if value.data == 0:
dev.attributes["light"] = False
else:
dev.attributes["light"] = True
from .RGBBulb import RGBBulb
import products
products.register(RGBBulb)
product_list = []
def register(klass):
product_list.append(klass)
def get():
r = {}
for klass in product_list:
l = []
for p in klass.PRODUCTS:
l.append('%s:%s' % (klass.MANUFACTURER_ID,p))
r.update({klass:l})
return r
def search(product_id):
prod_map = get()
for k in prod_map :
if product_id in prod_map[k]:
return k
from . import Fibaro,Aeotec,Everspring,Zipato
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment