| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- """
- This is an example thingflow program that is described in tutorial.rst.
- """
- # First, let's define a sensor that generates a random number each time
- # it is sampled.
- import random
- random.seed()
- from thingflow.base import SensorAsOutputThing
- class RandomSensor:
- def __init__(self, sensor_id, mean, stddev, stop_after):
- """This sensor will signal it is completed after the
- specified number of events have been sampled.
- """
- self.sensor_id = sensor_id
- self.mean = mean
- self.stddev = stddev
- self.events_left = stop_after
- def sample(self):
- if self.events_left>0:
- data = random.gauss(self.mean, self.stddev)
- self.events_left -= 1
- return data
- else:
- raise StopIteration
-
- def __str__(self):
- return "RandomSensor(%s, %s, %s)" % \
- (self.sensor_id, self.mean, self.stddev)
- # Instantiate our sensor
- MEAN = 100
- STDDEV = 10
- sensor = SensorAsOutputThing(RandomSensor(1, MEAN, STDDEV, stop_after=5))
- # Now, we will define a pretend LED as a subscriber. Each time is it passed
- # True, it will print 'On'. Each time it is passed False, it will print 'Off'.
- from thingflow.base import InputThing
- class LED(InputThing):
- def on_next(self, x):
- if x:
- print("On")
- else:
- print("Off")
- def on_error(self, e):
- print("Got an error: %s" % e)
- def on_completed(self):
- print("LED Completed")
- def __str__(self):
- return 'LED'
- # instantiate an LED
- led = LED()
- # Now, build a pipeline to sample events returned from the sensor,
- # convert to a boolean based on whether the value is greater than
- # the mean, and output to the LED.
- import thingflow.filters.map
- sensor.map(lambda evt: evt.val > MEAN).connect(led)
- # If you want to see the raw value of each sensor, just add the output() element
- import thingflow.filters.output
- sensor.output()
- # Call a debug method on the base output_thing class to see the element tree rooted
- # at sensor.
- sensor.print_downstream()
- # Now, we need to schedule the sensor to be sampled
- import asyncio
- from thingflow.base import Scheduler
- scheduler = Scheduler(asyncio.get_event_loop())
- scheduler.schedule_periodic(sensor, 1.0) # sample once a second
- scheduler.run_forever() # run until all sensors complete
- print("That's all folks!")
-
-
|