multi_port_example.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. """A simple example to demonstrate a thing with multiple ports. The
  2. thing samples values from a sensor and sends them on different output
  3. ports depending on the divisibility of the value. See docs/ports.rst
  4. for a more detailed explanation.
  5. """
  6. import random
  7. import asyncio
  8. from thingflow.base import OutputThing, InputThing, Scheduler,\
  9. SensorAsOutputThing
  10. class MultiPortOutputThing(OutputThing, InputThing):
  11. def __init__(self, previous_in_chain):
  12. super().__init__(ports=['divisible_by_two', 'divisible_by_three',
  13. 'other'])
  14. # connect to the previous filter
  15. self.disconnect_from_upstream = previous_in_chain.connect(self)
  16. def on_next(self, x):
  17. val = int(round(x.val))
  18. if (val%2)==0:
  19. self._dispatch_next(val, port='divisible_by_two')
  20. if (val%3)==0:
  21. self._dispatch_next(val, port='divisible_by_three')
  22. if (val%3)!=0 and (val%2)!=0:
  23. self._dispatch_next(val, port='other')
  24. def on_completed(self):
  25. self._dispatch_completed(port='divisible_by_two')
  26. self._dispatch_completed(port='divisible_by_three')
  27. self._dispatch_completed(port='other')
  28. def on_error(self, e):
  29. self._dispatch_error(e, port='divisible_by_two')
  30. self._dispatch_error(e, port='divisible_by_three')
  31. self._dispatch_error(e, port='other')
  32. def __repr__(self):
  33. return 'MultiPortOutputThing()'
  34. class RandomSensor:
  35. def __init__(self, sensor_id, mean=100.0, stddev=20.0, stop_after_events=None):
  36. self.sensor_id = sensor_id
  37. self.mean = mean
  38. self.stddev = stddev
  39. self.stop_after_events = stop_after_events
  40. if stop_after_events is not None:
  41. def generator():
  42. for i in range(stop_after_events):
  43. yield random.gauss(mean, stddev)
  44. else: # go on forever
  45. def generator():
  46. while True:
  47. yield random.gauss(mean, stddev)
  48. self.generator = generator()
  49. def sample(self):
  50. return self.generator.__next__()
  51. def __repr__(self):
  52. if self.stop_after_events is None:
  53. return 'RandomSensor(%s, mean=%s, stddev=%s)' % \
  54. (self.sensor_id, self.mean, self.stddev)
  55. else:
  56. return 'RandomSensor(%s, mean=%s, stddev=%s, stop_after_events=%s)' % \
  57. (self.sensor_id, self.mean, self.stddev, self.stop_after_events)
  58. scheduler = Scheduler(asyncio.get_event_loop())
  59. sensor = SensorAsOutputThing(RandomSensor(1, mean=10, stddev=5,
  60. stop_after_events=10))
  61. mtthing = MultiPortOutputThing(sensor)
  62. mtthing.connect(lambda v: print("even: %s" % v),
  63. port_mapping=('divisible_by_two', 'default'))
  64. mtthing.connect(lambda v: print("divisible by three: %s" % v),
  65. port_mapping=('divisible_by_three', 'default'))
  66. mtthing.connect(lambda v: print("not divisible: %s" % v),
  67. port_mapping=('other', 'default'))
  68. mtthing.print_downstream()
  69. scheduler.schedule_recurring(sensor)
  70. scheduler.run_forever()