| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- """A simple example to demonstrate a thing with multiple ports. The
- thing samples values from a sensor and sends them on different output
- ports depending on the divisibility of the value. See docs/ports.rst
- for a more detailed explanation.
- """
- import random
- import asyncio
- from thingflow.base import OutputThing, InputThing, Scheduler,\
- SensorAsOutputThing
- class MultiPortOutputThing(OutputThing, InputThing):
- def __init__(self, previous_in_chain):
- super().__init__(ports=['divisible_by_two', 'divisible_by_three',
- 'other'])
- # connect to the previous filter
- self.disconnect_from_upstream = previous_in_chain.connect(self)
-
- def on_next(self, x):
- val = int(round(x.val))
- if (val%2)==0:
- self._dispatch_next(val, port='divisible_by_two')
- if (val%3)==0:
- self._dispatch_next(val, port='divisible_by_three')
- if (val%3)!=0 and (val%2)!=0:
- self._dispatch_next(val, port='other')
- def on_completed(self):
- self._dispatch_completed(port='divisible_by_two')
- self._dispatch_completed(port='divisible_by_three')
- self._dispatch_completed(port='other')
-
- def on_error(self, e):
- self._dispatch_error(e, port='divisible_by_two')
- self._dispatch_error(e, port='divisible_by_three')
- self._dispatch_error(e, port='other')
-
- def __repr__(self):
- return 'MultiPortOutputThing()'
- class RandomSensor:
- def __init__(self, sensor_id, mean=100.0, stddev=20.0, stop_after_events=None):
- self.sensor_id = sensor_id
- self.mean = mean
- self.stddev = stddev
- self.stop_after_events = stop_after_events
- if stop_after_events is not None:
- def generator():
- for i in range(stop_after_events):
- yield random.gauss(mean, stddev)
- else: # go on forever
- def generator():
- while True:
- yield random.gauss(mean, stddev)
- self.generator = generator()
- def sample(self):
- return self.generator.__next__()
- def __repr__(self):
- if self.stop_after_events is None:
- return 'RandomSensor(%s, mean=%s, stddev=%s)' % \
- (self.sensor_id, self.mean, self.stddev)
- else:
- return 'RandomSensor(%s, mean=%s, stddev=%s, stop_after_events=%s)' % \
- (self.sensor_id, self.mean, self.stddev, self.stop_after_events)
- scheduler = Scheduler(asyncio.get_event_loop())
- sensor = SensorAsOutputThing(RandomSensor(1, mean=10, stddev=5,
- stop_after_events=10))
- mtthing = MultiPortOutputThing(sensor)
- mtthing.connect(lambda v: print("even: %s" % v),
- port_mapping=('divisible_by_two', 'default'))
- mtthing.connect(lambda v: print("divisible by three: %s" % v),
- port_mapping=('divisible_by_three', 'default'))
- mtthing.connect(lambda v: print("not divisible: %s" % v),
- port_mapping=('other', 'default'))
- mtthing.print_downstream()
- scheduler.schedule_recurring(sensor)
- scheduler.run_forever()
|