Forked from
JEDI / base-intersemestre-jedi
2 commits behind, 10 commits ahead of the upstream repository.
-
ROUE Simon, Claude, Yves authoredROUE Simon, Claude, Yves authored
sensors.py 4.46 KiB
"""
File: sensors.py
Description: Provides interfaces for the distance sensors
"""
import time
from enum import IntEnum
from math import inf
import threading
import RPi.GPIO as GPIO
from pins import RaspiPin, RaspiPinError
class HCSR04Event(IntEnum):
"""
Possible distance event.
"""
DANGER = 0
WALL = 1
NOTHING = 2
def __repr__(self):
return self.name
class HCSR04:
"""
Interface for an HCSR04 distance sensor
"""
def __init__(self,
name,
echo: RaspiPin,
trigger: RaspiPin,
low: float = 10.,
high: float = 100.):
'''
Constructs a HCSR04 object to communicate with a HC-SR04 sensor connected to the given pins
:param string name: Name of the sensor
:param RaspiPin echo: Pin connected to the echo sensor
:param RaspiPin trigger: Pin connected to the trigger
:param float low: Distance threshold used for detecting a danger
:param float high: Distance threshold used for detecting a "not dangerous" wall
'''
self.name = name
if echo is None:
raise RaspiPinError(self.name + " " + "Echo pin not set")
self.echo = echo
if trigger is None:
raise RaspiPinError(self.name + " " + "Trigger pin not set")
self.trigger = trigger
self.low = low
self.high = high
GPIO.setup(self.trigger, GPIO.OUT)
GPIO.setup(self.echo, GPIO.IN)
def __repr__(self):
return self.name
def __str__(self):
return self.name
def _send_trigger_pulse(self, wait_duration: float = 10 * 1e-6):
'''
Trigger a pulse for computing the distance in front of the sensor
:param float wait_duration: Duration of the pulse
'''
GPIO.output(self.trigger, GPIO.LOW)
time.sleep(0.2)
GPIO.output(self.trigger, GPIO.HIGH)
time.sleep(wait_duration)
GPIO.output(self.trigger, GPIO.LOW)
def _wait_for_echo(self, value: bool, timeout: int):
'''
Blocking function that returns if
* the echo sensor state change to the given value,
* or the timeout is reached
:param bool value: Expected value on the echo input
:param int timeout: Timeout value in number of loop
'''
count = timeout
while GPIO.input(self.echo) == value and count > 0:
count -= 1
return count
def get_distance(self, threshold: float = 100.):
"""
Returns the current distance detected by the sensor.
:param float threshold Threshold above which the distance is considered as infinite
"""
# print("Sending pulse", time.time())
self._send_trigger_pulse()
self._wait_for_echo(GPIO.LOW, 10000)
start = time.time()
# print("Got HIGH echo at", start)
count2 = self._wait_for_echo(GPIO.HIGH, 10000)
finish = time.time()
# print("Got LOW echo at", finish)
pulse_len = finish - start
distance_cm = pulse_len * 34300. / 2.
# print(count1, count2, pulse_len, distance_cm)
if distance_cm > self.high:
distance_cm = inf
return distance_cm
class ContinuousSensor:
"""
Provide an interface for an HCSR04 that is regularly polled for the distance
"""
def _update_distance(self):
while self.running:
self.distance = self.sensor.get_distance()
time.sleep(self.sampling_rate)
def __init__(self, sensor, sampling_rate):
self.sampling_rate = sampling_rate
self.distance = float('inf')
self.sensor = sensor
self.thread = threading.Thread(
target=ContinuousSensor._update_distance, args=(self, ))
self.running = False
def __del__(self):
if self.running:
self.stop()
def stop(self):
"""
Stop the thread to read the distance
"""
self.running = False
self.thread.join()
def start(self):
"""
Start the thread to read the distance
"""
self.running = True
self.thread.start()
def get_distance(self):
"""
Returns the last distance pull from the sensor.
Return inf if the distance is bigger than the maximum stated by the spec
"""
if not self.running:
raise RuntimeError("Sensor is not currently polling distance")
return self.distance