Skip to content
Snippets Groups Projects
Commit a65d058a authored by jkerdreu's avatar jkerdreu
Browse files

Initial release of xaal.ipx800



git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/branches/fork@1546 b32b6428-25c9-4566-ad07-03861ab6144f
parent 77a778a3
No related branches found
No related tags found
No related merge requests found
xaal.ipx800
===========
This package contains a xAAL gateway for IPX-800 Ethernet Control System.
Install
=======
You can use both pip, with pip install xaal.ipx800 or setup.py
You can test the package with :
python setup.py develop (or install) --user
To run the gateway:
- edit your config file (~/.xaal/xaal.ipx800.ini)
- launch python -m xaal.ipx800
TODO
====
Due to time constraint, only channel outputs (relays) are
supported right now. You can add easily analog or digital
inputs, everything is ready.
Perhaps, in future.
from setuptools import setup
with open('README.rst') as f:
long_description = f.read()
VERSION = "0.1"
setup(
name='xaal.ipx800',
version=VERSION,
license='GPL License',
author='Jerome Kerdreux',
author_email='Jerome.Kerdreux@imt-atlantique.fr',
#url='',
description=('xAAL devices for IPX-800 Ethernet Control System' ),
long_description=long_description,
classifiers=[
'Programming Language :: Python',
'Topic :: Software Development :: Libraries :: Python Modules',
],
keywords=['xaal', 'ipx-800'],
platforms='any',
packages=["xaal.ipx800",],
namespace_packages=["xaal"],
include_package_data=True,
install_requires=[
'xaal.lib',
]
)
[config]
host = localhost
port = 9870
addr = xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
[outputs]
base_addr = xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
outputs_type = lamp,relay,None,relay,relay,relay,relay,lamp
# this is a namespace package
try:
import pkg_resources
pkg_resources.declare_namespace(__name__)
except ImportError:
import pkgutil
__path__ = pkgutil.extend_path(__path__, __name__)
from . import gw
gw.main()
from xaal.lib import Device,Attribute
class OutputChannel(object):
def __init__(self,ipx,channel,addr,state_name):
dev = Device("changeme.basic",addr)
dev.vendor_id = "IHSEV"
dev.version = 0.1
dev.hw_id = channel
# attribute
self.state = dev.new_attribute(state_name)
# method
dev.add_method('on',self.on)
dev.add_method('off',self.off)
self.chan = channel
self.ipx = ipx
self.dev = dev
def on(self):
self.ipx.relay_on(self.chan)
self.state.value = True
def off(self):
self.ipx.relay_off(self.chan)
self.state.value = False
def new_lamp(ipx,channel,addr):
lamp = OutputChannel(ipx,channel,addr,'light')
lamp.dev.devtype = 'lamp.basic'
lamp.dev.product_id = 'IPX-800 Lamp'
return lamp
def new_relay(ipx,channel,addr):
relay = OutputChannel(ipx,channel,addr,'power')
relay.dev.devtype = 'powerrelay.basic'
relay.dev.product_id = 'IPX-800 Power Relay'
return relay
from gevent import monkey; monkey.patch_all()
import gevent
from xaal.lib import tools,Engine,Device
from . import devices
import platform
import socket
PACKAGE_NAME = "xaal.ipx800"
logger = tools.get_logger(PACKAGE_NAME,'DEBUG')
class IPX800(gevent.Greenlet):
def __init__(self,engine):
gevent.Greenlet.__init__(self)
self.engine = engine
self.cfg = tools.load_cfg_or_die(PACKAGE_NAME)
self.connect()
self.setup_ouputs()
self.setup_gw()
def connect(self):
""" create the bugnet connector"""
cfg = self.cfg['config']
host = cfg['host']
port = int(cfg['port'])
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((host, port))
def setup_ouputs(self):
""" load nodes from config file"""
cfg = self.cfg['outputs']
self.in_out=[]
i = 0
for t in cfg['outputs_type']:
i = i+1
out = None
if t == 'relay':
out = devices.new_relay(self,i,tools.get_random_uuid())
if t == 'lamp':
out = devices.new_lamp(self,i,tools.get_random_uuid())
if out:
self.engine.add_device(out.dev)
self.in_out.append(out)
def setup_gw(self):
# last step build the GW device
gw = Device("gateway.basic")
gw.address = self.cfg['config']['addr']
gw.vendor_id = "IHSEV"
gw.product_id = "IPX-800 Ethernet Control System"
gw.version = 0.1
gw.url = "http://gce-electronics.com/fr/home/57-module-ip-rail-din-webserver-8-relais-ipx800-v3.html"
gw.info = "%s@%s" % (PACKAGE_NAME,platform.node())
emb = gw.new_attribute('embedded',[])
emb.value.append([io.dev.address for io in self.in_out])
self.engine.add_device(gw)
def _run(self):
fd = self.sock.makefile()
while 1:
line = fd.readline().strip('\r\n')
self.parse_line(line)
def parse_line(self,line):
if line == 'OK': return
data = line.split('&')
if len(data) == 26:
out = data[1][2:]
#print(out)
i = 0
for c in out:
i = i + 1
for io in self.in_out:
if io.chan == i:
io.state.value = bool(int(c))
break
def send(self,data):
data = data + "\r\n"
self.sock.send(data.encode('utf-8'))
def relay_on(self,ID):
msg = 'Set%02d1' % ID
self.send(msg)
def relay_off(self,ID):
msg = 'Set%02d0' % ID
self.send(msg)
def run():
eng = Engine()
ipx = IPX800.spawn(eng)
eng.run()
def main():
try:
run()
except KeyboardInterrupt:
print("Bye Bye...")
if __name__ == '__main__':
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment