# test_multiple_output_ports.py (2.3 KB)
# Copyright 2016 by MPI-SWS and Data-Ken Research.
# Licensed under the Apache 2.0 License.
"""
Build a filter that takes an input stream and dispatches each event to one
of several output ports based on the input value.
"""
  7. import asyncio
  8. import unittest
  9. from thingflow.base import OutputThing, InputThing, Scheduler
  10. from utils import make_test_output_thing
  11. import thingflow.filters.where
  12. import thingflow.filters.output
  13. class SplitOutputThing(OutputThing, InputThing):
  14. """Here is a filter that takes a sequence of sensor events as its input
  15. and the splits it into one of three output ports: 'below' if the
  16. value is below one standard deviation from the mean, 'above'
  17. if the value is above one standard deviation from the mean, and
  18. 'within' if the value is within a standard deviation from the mean.
  19. """
  20. def __init__(self, mean=100.0, stddev=20.0):
  21. OutputThing.__init__(self, ports=['above', 'below', 'within'])
  22. self.mean = mean
  23. self.stddev = stddev
  24. def on_next(self, x):
  25. val = x[2]
  26. if val < (self.mean-self.stddev):
  27. #print("split: value=%s dispatching to below" % val)
  28. self._dispatch_next(val, port='below')
  29. elif val > (self.mean+self.stddev):
  30. #print("split: value=%s dispatching to above" % val)
  31. self._dispatch_next(val, port='above')
  32. else:
  33. #print("split: value=%s dispatching to within" % val)
  34. self._dispatch_next(val, port='within')
  35. def __str__(self):
  36. return "SplitOutputThing"
  37. class TestMultiplePubports(unittest.TestCase):
  38. def test_case(self):
  39. sensor = make_test_output_thing(1, stop_after_events=10)
  40. split= SplitOutputThing()
  41. sensor.connect(split)
  42. split.connect(lambda x: print("above:%s" % x),
  43. port_mapping=('above','default'))
  44. split.connect(lambda x: print("below:%s" % x),
  45. port_mapping=('below', 'default'))
  46. split.connect(lambda x: print("within:%s" % x),
  47. port_mapping=('within', 'default'))
  48. scheduler = Scheduler(asyncio.get_event_loop())
  49. scheduler.schedule_periodic(sensor, 1)
  50. sensor.print_downstream()
  51. scheduler.run_forever()
  52. print("that's all")
  53. if __name__ == '__main__':
  54. unittest.main()