test_rpi_adapters.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. """
  2. Tests of RPI-specific functionality (adapters and sensors).
  3. """
  4. import asyncio
  5. import unittest
  6. import platform
  7. from utils import ValueListSensor
  8. from thingflow.base import Scheduler, SensorAsOutputThing
  9. import thingflow.filters.map
  10. from thingflow.filters.output import output
  11. from thingflow.filters.combinators import passthrough
  12. from utils import StopAfterN
  13. MACHINE=platform.machine()
  14. # Check whether the library for tsl2591 is installed.
  15. # See https://github.com/maxlklaxl/python-tsl2591.git
  16. try:
  17. import tsl2591
  18. TSL2591_INSTALLED=True
  19. except:
  20. TSL2591_INSTALLED=False
  21. values = [
  22. 0,
  23. 1,
  24. 0,
  25. 1,
  26. 0,
  27. 1,
  28. 0
  29. ]
  30. @unittest.skipUnless(MACHINE=="armv7l",
  31. "Tests are specific to RaspberryPi")
  32. class TestRpi(unittest.TestCase):
  33. def test_gpio(self):
  34. import thingflow.adapters.rpi.gpio
  35. o = thingflow.adapters.rpi.gpio.GpioPinOut()
  36. sensor_thing = SensorAsOutputThing(ValueListSensor("sensor-1", values))
  37. sensor_thing.map(lambda evt: evt.val>0).passthrough(output()).connect(o)
  38. s = Scheduler(asyncio.get_event_loop())
  39. s.schedule_periodic(sensor_thing, 1.0)
  40. s.run_forever()
  41. @unittest.skipUnless(TSL2591_INSTALLED,
  42. "TSL2591 sensor library not installed")
  43. def test_tsl2591(self):
  44. import thingflow.sensors.rpi.lux_sensor
  45. sensor = SensorAsOutputThing(thingflow.sensors.rpi.lux_sensor.LuxSensor())
  46. s = Scheduler(asyncio.get_event_loop())
  47. stop = s.schedule_periodic(sensor, 1.0)
  48. StopAfterN(sensor, stop, N=4).output()
  49. s.run_forever()
  50. if __name__ == '__main__':
  51. unittest.main()