tutorial.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. """
  2. This is an example thingflow program that is described in tutorial.rst.
  3. """
  4. # First, let's define a sensor that generates a random number each time
  5. # it is sampled.
  6. import random
  7. random.seed()
  8. from thingflow.base import SensorAsOutputThing
  9. class RandomSensor:
  10. def __init__(self, sensor_id, mean, stddev, stop_after):
  11. """This sensor will signal it is completed after the
  12. specified number of events have been sampled.
  13. """
  14. self.sensor_id = sensor_id
  15. self.mean = mean
  16. self.stddev = stddev
  17. self.events_left = stop_after
  18. def sample(self):
  19. if self.events_left>0:
  20. data = random.gauss(self.mean, self.stddev)
  21. self.events_left -= 1
  22. return data
  23. else:
  24. raise StopIteration
  25. def __str__(self):
  26. return "RandomSensor(%s, %s, %s)" % \
  27. (self.sensor_id, self.mean, self.stddev)
  28. # Instantiate our sensor
  29. MEAN = 100
  30. STDDEV = 10
  31. sensor = SensorAsOutputThing(RandomSensor(1, MEAN, STDDEV, stop_after=5))
  32. # Now, we will define a pretend LED as a subscriber. Each time is it passed
  33. # True, it will print 'On'. Each time it is passed False, it will print 'Off'.
  34. from thingflow.base import InputThing
  35. class LED(InputThing):
  36. def on_next(self, x):
  37. if x:
  38. print("On")
  39. else:
  40. print("Off")
  41. def on_error(self, e):
  42. print("Got an error: %s" % e)
  43. def on_completed(self):
  44. print("LED Completed")
  45. def __str__(self):
  46. return 'LED'
  47. # instantiate an LED
  48. led = LED()
  49. # Now, build a pipeline to sample events returned from the sensor,
  50. # convert to a boolean based on whether the value is greater than
  51. # the mean, and output to the LED.
  52. import thingflow.filters.map
  53. sensor.map(lambda evt: evt.val > MEAN).connect(led)
  54. # If you want to see the raw value of each sensor, just add the output() element
  55. import thingflow.filters.output
  56. sensor.output()
  57. # Call a debug method on the base output_thing class to see the element tree rooted
  58. # at sensor.
  59. sensor.print_downstream()
  60. # Now, we need to schedule the sensor to be sampled
  61. import asyncio
  62. from thingflow.base import Scheduler
  63. scheduler = Scheduler(asyncio.get_event_loop())
  64. scheduler.schedule_periodic(sensor, 1.0) # sample once a second
  65. scheduler.run_forever() # run until all sensors complete
  66. print("That's all folks!")