Skip to content
Snippets Groups Projects
Commit b6ceee55 authored by jkerdreu's avatar jkerdreu
Browse files

Initial release



git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/trunk@2213 b32b6428-25c9-4566-ad07-03861ab6144f
parent 39621222
No related branches found
No related tags found
No related merge requests found
from xaal.lib import Device,Engine,tools,Message,helpers
from xaal.monitor import Monitor,Notification
import platform
import time
import logging
from enum import Enum
DELAY = 30
ADDR = 'aa4d1cbc-92af-11e8-80cd-408d5c18c801'
PKG_NAME = 'scenario_ensibs_alerte'
DOOR = 'ce91c3a6-20b1-11e9-a250-a4badbf92500'
MVT = '93e09033-708e-11e8-956e-00fec8f7138c'
LIGHTS = ['93e09007-708e-11e8-956e-00fec8f7138c','93e09008-708e-11e8-956e-00fec8f7138c']
BULLET = ['6eb64b73-6e51-11e9-8f96-00fec8f7138c']
BLINKS = ['aa8cd2e4-8c5d-11e9-b0ba-b827ebe99201','980a639c-20b1-11e9-8d70-a4badbf92500']
SIREN = ['980a639c-20b1-11e9-8d70-a4badbf92501',]
MONITORING_DEVICES = [DOOR,LIGHTS,MVT]
class States(Enum):
free = 'free'
busy = 'busy'
fail = 'fail'
no_motion = 'no_motion'
alarm = 'alarm'
class Devices:
def __init__(self):
self.door = None
self.light0 = None
self.light1 = None
self.motion = None
@property
def list(self):
return [self.door,self.light0,self.light1,self.motion]
def check(self):
for k in self.list:
if k == None: return False
if len(k.attributes.keys()) == 0: return False
return True
def used(self,dev):
for k in self.list:
if k==dev: return True
return False
logger = logging.getLogger(PKG_NAME)
device = None
devices = Devices()
state = States.fail
motion_timer = 0
def send(targets,action,body=None):
global device
device.engine.send_request(device,targets,action,body)
def alert():
logger.warning('WARNING !!!!')
send(BULLET,'notify',{'title':'Alarme SDB haut !!','msg':"Personne inerte ou lumière oubliée"})
send(BLINKS,'blink')
send(SIREN,'play')
def update_state():
global state,device
device.attributes['state'] = state.value
def is_light():
if devices.light0.attributes['light']: return True
if devices.light1.attributes['light']: return True
return False
def on_event(event,dev):
global state,motion_timer
if event == Notification.new_device:
if dev.address == DOOR : devices.door = dev
if dev.address == MVT : devices.motion = dev
if dev.address == LIGHTS[0] : devices.light0 = dev
if dev.address == LIGHTS[1] : devices.light1 = dev
if event == Notification.attribute_change:
if devices.check() == False:
state = States.fail
update_state()
return
if state == States.fail:
state = States.free
update_state()
logger.info(dev.attributes)
if dev == devices.door:
# close the door
if dev.attributes['position'] == False:
# mvt + light => busy
if state == States.free and devices.motion.attributes['presence'] == True and is_light() == True:
state = States.busy
# no light => free
if is_light() == False:
state = States.free
else:
# somebody open the door
if state in [States.busy,States.alarm]:
state = States.free
if dev == devices.motion:
# no motion while busy => start timer
if dev.attributes['presence']==False and state == States.busy:
motion_timer = time.time()
state = States.no_motion
# motion while state no_motion => just busy
if dev.attributes['presence'] == True and state == States.no_motion:
state = States.busy
update_state()
def update():
global state,motion_timer,device
now = time.time()
if state == States.no_motion:
if now > (motion_timer + DELAY):
logger.warning('ALARME !!!')
alert()
state = States.alarm
device.attributes['state'] = state.value
def filter_msg(msg):
if msg.source in MONITORING_DEVICES:
return True
return False
def main():
global mon,device
device = Device('scenario.basic',ADDR)
device.new_attribute('state')
device.info = '%s@%s' % (PKG_NAME,platform.node())
engine = Engine()
engine.add_device(device)
engine.add_timer(update,1)
mon = Monitor(device,filter_func = filter_msg)
mon.subscribe(on_event)
engine.run()
if __name__ == '__main__':
try:
helpers.setup_console_logger()
main()
except KeyboardInterrupt:
print('Bye bye')
from xaal.lib import Device,Engine,tools,Message,helpers
from xaal.monitor import Monitor,Notification
import platform
import time
import logging
from enum import Enum
DELAY = 30
ADDR = 'aa4d1cbc-92af-11e8-80cd-408d5c18c800'
PKG_NAME = 'scenario_ensibs_alerte'
DOOR = 'cbdb198c-20b1-11e9-a250-a4badbf92500'
MVT = '93e09031-708e-11e8-956e-00fec8f7138c'
LIGHT = '93e09005-708e-11e8-956e-00fec8f7138c'
BULLET= ['6eb64b73-6e51-11e9-8f96-00fec8f7138c']
BLINKS = ['aa8cd2e4-8c5d-11e9-b0ba-b827ebe99201','980a639c-20b1-11e9-8d70-a4badbf92500']
SIREN = ['980a639c-20b1-11e9-8d70-a4badbf92501',]
MONITORING_DEVICES = [DOOR,LIGHT,MVT]
class States(Enum):
free = 'free'
busy = 'busy'
fail = 'fail'
no_motion = 'no_motion'
alarm = 'alarm'
class Devices:
def __init__(self):
self.door = None
self.light = None
self.motion = None
@property
def list(self):
return [self.door,self.light,self.motion]
def check(self):
for k in self.list:
if k == None: return False
if len(k.attributes.keys()) == 0: return False
return True
def used(self,dev):
for k in self.list:
if k==dev: return True
return False
logger = logging.getLogger(PKG_NAME)
device = None
devices = Devices()
state = States.fail
motion_timer = 0
def send(targets,action,body=None):
global device
device.engine.send_request(device,targets,action,body)
def alert():
logger.warning('WARNING !!!!')
send(BULLET,'notify',{'title':'Alarme WC haut !!','msg':"Personne inerte ou lumière oubliée"})
send(BLINKS,'blink')
send(SIREN,'play')
def update_state():
global state,device
device.attributes['state'] = state.value
def on_event(event,dev):
global state,motion_timer
if event == Notification.new_device:
if dev.address == DOOR: devices.door = dev
if dev.address == MVT : devices.motion = dev
if dev.address == LIGHT: devices.light = dev
if event == Notification.attribute_change:
if devices.check() == False:
state = States.fail
update_state()
return
if state == States.fail:
state = States.free
update_state()
logger.info(dev.attributes)
if dev == devices.door:
# close the door
if dev.attributes['position'] == False:
# mvt + light => busy
if state == States.free and devices.motion.attributes['presence'] == True and devices.light.attributes['light'] == True:
state = States.busy
# no light => free
if devices.light.attributes['light'] == False:
state = States.free
else:
# somebody open the door
if state in [States.busy,States.alarm]:
state = States.free
if dev == devices.motion:
# no motion while busy => start timer
if dev.attributes['presence']==False and state == States.busy:
motion_timer = time.time()
state = States.no_motion
# motion while state no_motion => just busy
if dev.attributes['presence'] == True and state == States.no_motion:
state = States.busy
update_state()
def update():
global state,motion_timer,device
now = time.time()
if state == States.no_motion:
if now > (motion_timer + DELAY):
logger.warning('ALARME !!!')
alert()
state = States.alarm
device.attributes['state'] = state.value
def filter_msg(msg):
if msg.source in MONITORING_DEVICES:
return True
return False
def main():
global mon,device
device = Device('scenario.basic',ADDR)
device.new_attribute('state')
device.info = '%s@%s' % (PKG_NAME,platform.node())
engine = Engine()
engine.add_device(device)
engine.add_timer(update,1)
mon = Monitor(device,filter_func = filter_msg)
mon.subscribe(on_event)
engine.run()
if __name__ == '__main__':
try:
helpers.setup_console_logger()
main()
except KeyboardInterrupt:
print('Bye bye')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment