functional_api_example.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. """This is a demonstration of the fuctional API for filters. It is
  2. based on examples/rpi/lux_sensor_example.py. See docs/functional-api.rst
  3. for details.
  4. """
  5. import asyncio
  6. import random
  7. random.seed()
  8. from thingflow.base import InputThing, Scheduler
  9. from thingflow.filters.output import output
  10. from thingflow.filters.select import map
  11. from thingflow.adapters.csv import csv_writer
  12. from thingflow.filters.combinators import passthrough
  13. class DummyLuxSensor:
  14. def __init__(self, sensor_id, mean=300, stddev=100, stop_after=5):
  15. """Rather than use the real RPI sensor here, we will just
  16. define one that generates random numbers.
  17. """
  18. self.sensor_id = sensor_id
  19. self.mean = mean
  20. self.stddev = stddev
  21. self.events_left = stop_after
  22. def sample(self):
  23. if self.events_left>0:
  24. data = random.gauss(self.mean, self.stddev)
  25. self.events_left -= 1
  26. return data
  27. else:
  28. raise StopIteration
  29. def __repr__(self):
  30. return "DummyLuxSensor(%s, %s, %s)" % \
  31. (self.sensor_id, self.mean, self.stddev)
  32. class DummyLed(InputThing):
  33. def on_next(seelf, x):
  34. if x:
  35. print("LED ON")
  36. else:
  37. print("LED OFF")
  38. def __repr__(self):
  39. return 'DummyLed'
  40. THRESHOLD = 300
  41. # Instantiate the sensor and use the functional API to build a flow
  42. lux = DummyLuxSensor("lux-1")
  43. scheduler = Scheduler(asyncio.get_event_loop())
  44. scheduler.schedule_sensor(lux, 1.0,
  45. passthrough(output()),
  46. passthrough(csv_writer('/tmp/lux.csv')),
  47. map(lambda event:event.val > THRESHOLD),
  48. passthrough(lambda v: print('ON' if v else 'OFF')),
  49. DummyLed(), print_downstream=True)
  50. scheduler.run_forever()
  51. print("That's all folks")