test_functional_api.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. """Tests for the functional-style api
  4. """
  5. import asyncio
  6. import unittest
  7. import sys
  8. from utils import ValueListSensor, ValidationInputThing
  9. from thingflow.base import Scheduler, SensorAsOutputThing
  10. from thingflow.filters.output import output
  11. from thingflow.filters.select import map
  12. from thingflow.filters.where import where
  13. from thingflow.filters.combinators import passthrough, compose
  14. lux_data = [100, 200, 300, 450, 100, 200, 600]
  15. class TestFunctionalApi(unittest.TestCase):
  16. """These are tests of the various features of the
  17. functional-style API. We want to make certain they
  18. work as advertised.
  19. """
  20. def test_complex_workflow(self):
  21. THRESHOLD = 300
  22. lux = ValueListSensor('lux-1', lux_data)
  23. scheduler = Scheduler(asyncio.get_event_loop())
  24. vs1 = ValidationInputThing(lux_data, self)
  25. vs2 = ValidationInputThing([False, False, False, True, False, False, True],
  26. self,
  27. extract_value_fn=lambda v: v)
  28. scheduler.schedule_sensor(lux, 0.5,
  29. passthrough(output()),
  30. passthrough(vs1),
  31. map(lambda event:event.val > THRESHOLD),
  32. passthrough(lambda v: print('ON' if v else 'OFF')),
  33. vs2, print_downstream=True)
  34. scheduler.run_forever()
  35. self.assertTrue(vs1.completed)
  36. self.assertTrue(vs2.completed)
  37. print("That's all folks")
  38. def test_thunk_builder_handling(self):
  39. """We have logic where we can pass a thunk builder into a combinator and
  40. it will do the right thing. Check that it works"""
  41. scheduler = Scheduler(asyncio.get_event_loop())
  42. lux = ValueListSensor('lux-2', lux_data)
  43. vs = ValidationInputThing(lux_data, self)
  44. scheduler.schedule_sensor(lux, 0.5,
  45. passthrough(output()), # output() evaluates to a thunk
  46. passthrough(output), # output is a thunk builder
  47. passthrough(output(sys.stdout)), # output can take an arg
  48. vs, print_downstream=True)
  49. scheduler.run_forever()
  50. self.assertTrue(vs.completed)
  51. def test_passthrough_as_a_method(self):
  52. """Verify that, when passthrough is used as a method, it can still take
  53. thunks.
  54. """
  55. scheduler = Scheduler(asyncio.get_event_loop())
  56. luxpub = SensorAsOutputThing(ValueListSensor('lux-2', lux_data))
  57. vs1 = ValidationInputThing([450, 600], self)
  58. vs2 = ValidationInputThing(lux_data, self)
  59. luxpub.passthrough(compose(where(lambda evt: evt.val>300), vs1)).connect(vs2)
  60. scheduler.schedule_periodic(luxpub, 0.5)
  61. scheduler.run_forever()
  62. self.assertTrue(vs1.completed)
  63. self.assertTrue(vs2.completed)
  64. if __name__ == '__main__':
  65. unittest.main()