| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- """This is a demonstration of the fuctional API for filters. It is
- based on examples/rpi/lux_sensor_example.py. See docs/functional-api.rst
- for details.
- """
- import asyncio
- import random
- random.seed()
- from thingflow.base import InputThing, Scheduler
- from thingflow.filters.output import output
- from thingflow.filters.select import map
- from thingflow.adapters.csv import csv_writer
- from thingflow.filters.combinators import passthrough
- class DummyLuxSensor:
- def __init__(self, sensor_id, mean=300, stddev=100, stop_after=5):
- """Rather than use the real RPI sensor here, we will just
- define one that generates random numbers.
- """
- self.sensor_id = sensor_id
- self.mean = mean
- self.stddev = stddev
- self.events_left = stop_after
- def sample(self):
- if self.events_left>0:
- data = random.gauss(self.mean, self.stddev)
- self.events_left -= 1
- return data
- else:
- raise StopIteration
-
- def __repr__(self):
- return "DummyLuxSensor(%s, %s, %s)" % \
- (self.sensor_id, self.mean, self.stddev)
- class DummyLed(InputThing):
- def on_next(seelf, x):
- if x:
- print("LED ON")
- else:
- print("LED OFF")
- def __repr__(self):
- return 'DummyLed'
- THRESHOLD = 300
- # Instantiate the sensor and use the functional API to build a flow
- lux = DummyLuxSensor("lux-1")
- scheduler = Scheduler(asyncio.get_event_loop())
- scheduler.schedule_sensor(lux, 1.0,
- passthrough(output()),
- passthrough(csv_writer('/tmp/lux.csv')),
- map(lambda event:event.val > THRESHOLD),
- passthrough(lambda v: print('ON' if v else 'OFF')),
- DummyLed(), print_downstream=True)
- scheduler.run_forever()
- print("That's all folks")
|