Skip to content
Snippets Groups Projects
Forked from JEDI / base-intersemestre-jedi
2 commits behind, 10 commits ahead of the upstream repository.
sensors.py 4.46 KiB
"""
File: sensors.py
Description: Provides interfaces for the distance sensors
"""

import time
from enum import IntEnum
from math import inf
import threading
import RPi.GPIO as GPIO
from pins import RaspiPin, RaspiPinError


class HCSR04Event(IntEnum):
    """
    Possible distance event.
    """
    DANGER = 0
    WALL = 1
    NOTHING = 2

    def __repr__(self):
        return self.name


class HCSR04:
    """
    Interface for an HCSR04 distance sensor
    """

    def __init__(self,
                 name,
                 echo: RaspiPin,
                 trigger: RaspiPin,
                 low: float = 10.,
                 high: float = 100.):
        '''
        Constructs a HCSR04 object to communicate with a HC-SR04 sensor connected to the given pins
        :param string  name: Name of the sensor
        :param RaspiPin echo: Pin connected to the echo sensor
        :param RaspiPin trigger: Pin connected to the trigger
        :param float low: Distance threshold used for detecting a danger
        :param float high: Distance threshold used for detecting a "not dangerous" wall
        '''
        self.name = name
        if echo is None:
            raise RaspiPinError(self.name + " " + "Echo pin not set")
        self.echo = echo
        if trigger is None:
            raise RaspiPinError(self.name + " " + "Trigger pin not set")
        self.trigger = trigger
        self.low = low
        self.high = high
        GPIO.setup(self.trigger, GPIO.OUT)
        GPIO.setup(self.echo, GPIO.IN)

    def __repr__(self):
        return self.name

    def __str__(self):
        return self.name

    def _send_trigger_pulse(self, wait_duration: float = 10 * 1e-6):
        '''
        Trigger a pulse for computing the distance in front of the sensor
        :param float wait_duration: Duration of the pulse
        '''
        GPIO.output(self.trigger, GPIO.LOW)
        time.sleep(0.2)
        GPIO.output(self.trigger, GPIO.HIGH)
        time.sleep(wait_duration)
        GPIO.output(self.trigger, GPIO.LOW)

    def _wait_for_echo(self, value: bool, timeout: int):
        '''
        Blocking function that returns if
            * the echo sensor state change to the given value,
            * or the timeout is reached

        :param bool value: Expected value on the echo input
        :param int timeout: Timeout value in number of loop
        '''
        count = timeout
        while GPIO.input(self.echo) == value and count > 0:
            count -= 1
        return count

    def get_distance(self, threshold: float = 100.):
        """
        Returns the current distance detected by the sensor.
        :param float threshold Threshold above which the distance is considered as infinite
        """
        # print("Sending pulse", time.time())
        self._send_trigger_pulse()
        self._wait_for_echo(GPIO.LOW, 10000)
        start = time.time()
        # print("Got HIGH echo at", start)
        count2 = self._wait_for_echo(GPIO.HIGH, 10000)
        finish = time.time()
        # print("Got LOW echo at", finish)
        pulse_len = finish - start
        distance_cm = pulse_len * 34300. / 2.
        # print(count1, count2, pulse_len, distance_cm)
        if distance_cm > self.high:
            distance_cm = inf
        return distance_cm


class ContinuousSensor:
    """
    Provide an interface for an HCSR04 that is regularly polled for the distance
    """

    def _update_distance(self):
        while self.running:
            self.distance = self.sensor.get_distance()
            time.sleep(self.sampling_rate)

    def __init__(self, sensor, sampling_rate):
        self.sampling_rate = sampling_rate
        self.distance = float('inf')
        self.sensor = sensor
        self.thread = threading.Thread(
            target=ContinuousSensor._update_distance, args=(self, ))
        self.running = False

    def __del__(self):
        if self.running:
            self.stop()

    def stop(self):
        """
        Stop the thread to read the distance
        """
        self.running = False
        self.thread.join()

    def start(self):
        """
        Start the thread to read the distance
        """
        self.running = True
        self.thread.start()

    def get_distance(self):
        """
        Returns the last distance pull from the sensor.
        Return inf if the distance is bigger than the maximum stated by the spec
        """
        if not self.running:
            raise RuntimeError("Sensor is not currently polling distance")
        return self.distance