# dist_lux_rpi.py
  1. """Demo of lux sensor and led from raspberry pi - distributed version.
  2. This file contains the data capture part that runs on the Raspberry Pi.
  3. """
  4. import sys
  5. import asyncio
  6. import argparse
  7. from thingflow.base import Scheduler, SensorAsOutputThing
  8. from thingflow.sensors.rpi.lux_sensor import LuxSensor
  9. from thingflow.adapters.rpi.gpio import GpioPinOut
  10. from thingflow.adapters.mqtt import MQTTWriter
  11. import thingflow.filters.select
  12. import thingflow.filters.json
  13. def setup(broker, threshold):
  14. lux = SensorAsOutputThing(LuxSensor())
  15. lux.connect(print)
  16. led = GpioPinOut()
  17. actions = lux.map(lambda event: event.val > threshold)
  18. actions.connect(led)
  19. actions.connect(lambda v: print('ON' if v else 'OFF'))
  20. lux.to_json().connect(MQTTWriter(broker, topics=[('bogus/bogus', 0)]))
  21. lux.print_downstream()
  22. return (lux, led)
  23. def main(argv=sys.argv[1:]):
  24. parser=argparse.ArgumentParser(description='Distributed lux example, data capture process')
  25. parser.add_argument('-i', '--interval', type=float, default=5.0,
  26. help="Sample interval in seconds")
  27. parser.add_argument('-t', '--threshold', type=float, default=25.0,
  28. help="Threshold lux level above which light should be turned on")
  29. parser.add_argument('broker', metavar="BROKER",
  30. type=str,
  31. help="hostname or ip address of mqtt broker")
  32. parsed_args = parser.parse_args(argv)
  33. (lux, led) = setup(parsed_args.broker, parsed_args.threshold)
  34. scheduler = Scheduler(asyncio.get_event_loop())
  35. stop = scheduler.schedule_periodic_on_separate_thread(lux,
  36. parsed_args.interval)
  37. print("starting run...")
  38. try:
  39. scheduler.run_forever()
  40. except KeyboardInterrupt:
  41. led.on_completed()
  42. stop()
  43. return 0
  44. if __name__ == '__main__':
  45. main()