Skip to content
Snippets Groups Projects
Commit 5c332830 authored by jkerdreu's avatar jkerdreu
Browse files

A Kerpape

git-svn-id: https://redmine.imt-atlantique.fr/svn/xaal/code/Python/trunk@2211 b32b6428-25c9-4566-ad07-03861ab6144f
parent 2f30f3ad
No related branches found
No related tags found
No related merge requests found
from xaal.lib import Device,Engine,tools,Message,helpers
from xaal.monitor import Monitor,Notification
import platform
import time
import logging
from enum import Enum
DELAY = 50
ADDR = 'aa4d1cbc-92af-11e8-80cd-408d5c18c800'
PKG_NAME = 'scenario_ensibs_alerte'
DOOR = '42775a2e-92af-11e8-ae30-408d5c18c800'
MVT = '836b6728-92af-11e8-80cd-408d5c18c800'
LIGHT = '092725ef-ca87-45d5-a619-9d7680046c91'
BULLET= ['bc5bb184-8d16-11e9-b4db-9cebe88e1963']
BLINKS = ['aa8cd2e4-8c5d-11e9-b0ba-b827ebe99201','980a639c-20b1-11e9-8d70-a4badbf92500']
SIREN = ['980a639c-20b1-11e9-8d70-a4badbf92501',]
MONITORING_DEVICES = [DOOR,LIGHT,MVT]
class States(Enum):
free = 'free'
busy = 'busy'
fail = 'fail'
no_motion = 'no_motion'
alarm = 'alarm'
class Devices:
def __init__(self):
self.door = None
self.light = None
self.motion = None
@property
def list(self):
return [self.door,self.light,self.motion]
def check(self):
for k in self.list:
if k == None: return False
if len(k.attributes.keys()) == 0: return False
return True
def used(self,dev):
for k in self.list:
if k==dev: return True
return False
logger = logging.getLogger(PKG_NAME)
device = None
devices = Devices()
state = States.fail
motion_timer = 0
def send(targets,action,body=None):
global device
dev.engine.send_request(device,targets,action,body)
def alert():
logger.warning('WARNING !!!!')
send(BULLET,'notify',{'title':'Alarme !!','msg':"SDB"})
send(BLINKS,'blink')
send(SIREN,'play')
def update_state():
global state,device
device.attributes['state'] = state.value
def on_event(event,dev):
global state,motion_timer
if event == Notification.new_device:
if dev.address == DOOR: devices.door = dev
if dev.address == MVT : devices.motion = dev
if dev.address == LIGHT: devices.light = dev
if event == Notification.attribute_change:
if devices.check() == False:
state = States.fail
update_state()
return
if state == States.fail:
state = States.free
update_state()
logger.info(dev.attributes)
if dev == devices.door:
# close the door
if dev.attributes['position'] == False:
# mvt + light => busy
if state == States.free and devices.motion.attributes['presence'] == True and devices.light.attributes['power'] == True:
state = States.busy
# no light => free
if devices.light.attributes['power'] == False:
state = States.free
else:
# somebody open the door
if state in [States.busy,States.alarm]:
state = States.free
if dev == devices.motion:
# no motion while busy => start timer
if dev.attributes['presence']==False and state == States.busy:
motion_timer = time.time()
state = States.no_motion
# motion while state no_motion => just busy
if dev.attributes['presence'] == True and state == States.no_motion:
state = States.busy
update_state()
def update():
global state,motion_timer,device
now = time.time()
if state == States.no_motion:
if now > (motion_timer + DELAY):
logger.warning('ALARME !!!')
state = States.alarm
device.attributes['state'] = state.value
def filter_msg(msg):
if msg.source in MONITORING_DEVICES:
return True
return False
def main():
global mon,device
device = Device('scenario.basic',ADDR)
device.new_attribute('state')
device.info = '%s@%s' % (PKG_NAME,platform.node())
engine = Engine()
engine.add_device(device)
engine.add_timer(update,1)
mon = Monitor(device,filter_func = filter_msg)
mon.subscribe(on_event)
engine.run()
if __name__ == '__main__':
try:
helpers.setup_console_logger()
main()
except KeyboardInterrupt:
print('Bye bye')
\ No newline at end of file
...@@ -31,9 +31,8 @@ mon = None ...@@ -31,9 +31,8 @@ mon = None
dev = None dev = None
def send(targets,action,body=None): def send(targets,action,body=None):
global mon,dev global dev
engine = mon.dev.engine dev.engine.send_request(dev,targets,action,body)
engine.send_request(dev,targets,action,body)
def search_for_light(lamps): def search_for_light(lamps):
for l in lamps: for l in lamps:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment