Skip to content
Snippets Groups Projects
Commit 08d1df7c authored by jkerdreu's avatar jkerdreu
Browse files

Added binding for vars ..

git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/trunk@1981 b32b6428-25c9-4566-ad07-03861ab6144f
parent 75bcdf46
No related branches found
No related tags found
No related merge requests found
from . import dpts
# This file include the functions you can apply to data to
# match and attribute
def bool_(attribute,data,dpt):
val = dpts.decode[dpt](data)
attribute.value = bool(val)
def bool_inv(attribute,data,dpt):
val = dpts.decode[dpt](data)
attribute.value = not bool(val)
def on_off(attribute,data,dpt):
val = bool(dpts.decode[dpt](data))
if val:
attribute.value = 'ON'
else:
attribute.value = 'OFF'
def round_(attribute,data,dpt):
val = dpts.decode[dpt](data)
attribute.value = round(val)
funct = {
"bool" : bool_,
"bool_inv" : bool_inv,
"round" : round_,
"on_off" : on_off,
}
\ No newline at end of file
from xaal.schemas import devices
from .bindings import funct
import logging
from functools import partial
import dpts
logger = logging.getLogger(__name__)
def bool_(attribute,dpt,data):
val = dpts.decode[dpt](data)
attribute.value = bool(val)
def round_(attribute,dpt,data):
val = dpts.decode[dpt](data)
attribute.value = round(val)
class KNXDev:
def __init__(self,gw,addr,w_addr,r_addr,dpt):
def __init__(self,gw,cfg):
self.gateway= gw
self.addr = addr
self.w_addr = w_addr
self.r_addr = r_addr or w_addr
self.dpt = dpt
self.attribute_binding = {}
self.setup(addr)
self.cfg = cfg
self.attributes_binding = {}
self.addr = cfg.get('addr',None)
self.dev = None
self.setup()
def setup(self,addr):
logger.warn("Please define setup() in this device %s" % addr)
def setup(self):
logger.warn("Please define setup() in this device")
def bind_write(self,data):
func = partial(self.gateway.knx.write,self.w_addr,data,self.dpt)
def write(self,group_addr,dpt,data):
func = partial(self.gateway.knx.write,group_addr,data,dpt)
return func
def bind_attribute(self,group_addr,attribute,func=None):
ptr = partial(func,attribute,self.dpt,data=None)
self.attribute_binding.update({group_addr: ptr})
def bind_attribute(self,group_addr,attribute,func,dpt):
ptr = partial(func,attribute,data=None,dpt='1')
self.attributes_binding.update({group_addr: ptr})
def parse(self,cemi):
if cemi.group_addr in self.attribute_binding:
func = self.attribute_binding[cemi.group_addr]
if cemi.group_addr in self.attributes_binding:
func = self.attributes_binding[cemi.group_addr]
func(data=cemi.data)
class PowerRelay(KNXDev):
def setup(self,addr):
dev = devices.powerrelay(self.addr)
dev.add_method('on', self.bind_write(0x1))
dev.add_method('off', self.bind_write(0x0))
self.bind_attribute(self.r_addr,dev.get_attribute("power"),bool_)
self.device = dev
def setup(self):
self.dev = devices.powerrelay(self.addr)
cmd = self.cfg.get('cmd',None)
if cmd:
self.dev.add_method('on', self.write(cmd,'1',1))
self.dev.add_method('off',self.write(cmd,'1',0))
state = self.cfg.get('state',None)
if state == None:
if cmd: state = cmd
if state:
self.bind_attribute(state,self.dev.get_attribute("power"),funct['bool'],'1')
self.dev.info = cmd or state
class Lamp(KNXDev):
def setup(self):
self.dev = devices.lamp(self.addr)
cmd = self.cfg.get('cmd',None)
if cmd:
self.dev.add_method('on', self.write(cmd,'1',1))
self.dev.add_method('off',self.write(cmd,'1',0))
state = self.cfg.get('state',None)
if state == None:
if cmd: state = cmd
if state:
self.bind_attribute(state,self.dev.get_attribute("light"),funct['bool'],'1')
self.dev.info = cmd or state
class PowerMeter(KNXDev):
def setup(self,addr):
dev = devices.powermeter(self.addr)
self.bind_attribute(self.r_addr,dev.get_attribute('power'),round_)
self.device = dev
self.dev = devices.powermeter(self.addr)
state = self.cfg.get('state',None)
if state == None:
self.bind_attribute(state,self.dev.get_attribute('power'),funct['round'],'9')
self.dev.info = state
types = {
'PowerRelay' : PowerRelay,
'Lamp' : Lamp,
'PowerMeter' : PowerMeter
}
\ No newline at end of file
......@@ -8,6 +8,7 @@ from . import devices
PACKAGE_NAME = 'xaal.knx'
logger = tools.get_logger(PACKAGE_NAME,'DEBUG')
class GW(gevent.Greenlet):
def __init__(self,engine):
gevent.Greenlet.__init__(self)
......@@ -22,6 +23,7 @@ class GW(gevent.Greenlet):
cfg = tools.load_cfg(PACKAGE_NAME)
if not cfg:
cfg= tools.new_cfg(PACKAGE_NAME)
cfg['devices'] = []
logger.warn("Created an empty config file, please add your devices")
cfg.write()
self.cfg = cfg
......@@ -32,18 +34,22 @@ class GW(gevent.Greenlet):
self.knx = KNXConnector(addr)
else:
self.knx = KNXConnector()
devs = self.cfg['devices']
for k in devs:
klass = devices.types.get(devs[k]['type'],None)
if klass:
dev = klass(self,devs[k])
xaal_dev = dev.dev
xaal_dev.vendor_id = 'IHSEV'
xaal_dev.version = 0.1
xaal_dev.product_id = 'KNX ' + devs[k]['type']
self.devices.append(dev)
rel1 = devices.PowerRelay(self,'3c22779c-6e80-11e8-a654-408d5c18c8f7','0/0/1','0/1/0','1')
rel2 = devices.PowerRelay(self,'3c22779c-6e80-11e8-a654-408d5c18c8f8','0/0/2','0/1/1','1')
self.devices.append(rel1)
self.devices.append(rel2)
self.engine.add_devices([rel1.device,rel2.device])
def init(self):
"""loop throught the device adress to find initial values"""
for dev in self.devices:
self.knx.read(dev.r_addr)
l = [k.dev for k in self.devices]
self.engine.add_devices(l)
def _run(self):
while 1:
cemi = self.knx.receive()
......@@ -53,9 +59,16 @@ class GW(gevent.Greenlet):
def handle(self,cemi):
for dev in self.devices:
if dev.r_addr == cemi.group_addr:
if cemi.group_addr in dev.attributes_binding :
dev.parse(cemi)
def init(self):
"""loop throught the device adress to find initial values"""
for dev in self.devices:
for ga in dev.attributes_binding:
self.knx.read(ga)
def setup(eng):
logger.info('Starting %s' % PACKAGE_NAME)
GW.spawn(eng)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment