lux_sensor_example.py 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. """Demo of lux sensor and led from raspberry pi
  2. """
  3. import sys
  4. import asyncio
  5. import os.path
  6. from thingflow.base import Scheduler, SensorAsOutputThing
  7. from thingflow.sensors.rpi.lux_sensor import LuxSensor
  8. from thingflow.adapters.rpi.gpio import GpioPinOut
  9. import thingflow.adapters.csv
  10. import thingflow.filters.select
  11. def setup(threshold=25):
  12. lux = SensorAsOutputThing(LuxSensor())
  13. lux.connect(print)
  14. lux.csv_writer(os.path.expanduser('~/lux.csv'))
  15. led = GpioPinOut()
  16. actions = lux.map(lambda event: event.val > threshold)
  17. actions.connect(led)
  18. actions.connect(lambda v: print('ON' if v else 'OFF'))
  19. lux.print_downstream()
  20. return (lux, led)
  21. def main(argv=sys.argv[1:]):
  22. if len(argv)!=2:
  23. print("%s threshold interval" % sys.argv[0])
  24. return 1
  25. threshold = float(argv[0])
  26. interval = float(argv[1])
  27. print("%f seconds interval and an led threshold of %f lux" %
  28. (interval, threshold))
  29. (lux, led) = setup(threshold)
  30. scheduler = Scheduler(asyncio.get_event_loop())
  31. stop = scheduler.schedule_periodic_on_separate_thread(lux, interval)
  32. print("starting run...")
  33. try:
  34. scheduler.run_forever()
  35. except KeyboardInterrupt:
  36. led.on_completed()
  37. stop()
  38. return 0
  39. if __name__ == '__main__':
  40. main()