test_base.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  # Copyright 2016 by MPI-SWS and Data-Ken Research.
  # Licensed under the Apache 2.0 License.
  """Some simple tests for the base layer.
  """
  5. import asyncio
  6. import unittest
  7. from thingflow.base import Scheduler, SensorAsOutputThing, FunctionFilter
  8. from utils import ValueListSensor, ValidationInputThing, CaptureInputThing
  9. from thingflow.filters.where import where
  10. from thingflow.filters.output import output
  11. from thingflow.filters.map import map
  12. from thingflow.filters.combinators import passthrough
  # Readings handed to ValueListSensor by the tests in this module.
  value_stream = [
      20,
      30,
      100,
      120,
      20,
      5,
      2222,
  ]
  # The readings from the input list that are >= 100, i.e. the ones that
  # ``predicate`` accepts; the where-filter tests validate against this list.
  expected_stream = [
      100,
      120,
      2222,
  ]
  def predicate(v):
      """Return True when the value at index 2 of event ``v`` is >= 100.

      The decision is also printed to stdout (``v=<value>, True|False``) so the
      filtering can be followed in the test log.

      :param v: indexable event; only ``v[2]`` is read.
      :returns: ``True`` if ``v[2] >= 100.0``, otherwise ``False``.
      """
      keep = bool(v[2] >= 100.0)
      # Same text as the two literal branches it replaces: "v=<value>, True"
      # or "v=<value>, False".
      print("v=%s, %s" % (v[2], keep))
      return keep
  class TestBaseScenario(unittest.TestCase):
      """Run the where-filter over the sample readings in two composition styles.

      NOTE(review): ``asyncio.get_event_loop()`` is deprecated when no loop is
      running on newer Python versions -- confirm the supported interpreter
      range before changing how the loop is obtained.
      """

      def test_where(self):
          """In this version, we create a publisher and use method chaining to
          compose the filters"""
          s = ValueListSensor(1, value_stream)
          p = SensorAsOutputThing(s)
          w = p.where(predicate)
          w.output()
          # The bound test method is what gets passed as the second argument;
          # presumably the validator reports through it -- verify against utils.
          vo = ValidationInputThing(expected_stream, self.test_where)
          w.connect(vo)
          scheduler = Scheduler(asyncio.get_event_loop())
          scheduler.schedule_periodic(p, 0.5)  # sample twice every second
          p.print_downstream()
          scheduler.run_forever()
          self.assertTrue(
              vo.completed,
              "Schedule exited before validation observer completed")
          print("That's all folks")

      def test_schedule_sensor(self):
          """In this version, we pass the sensor directly to the scheduler and use
          a functional style to compose the filters"""
          s = ValueListSensor(1, value_stream)
          vo = ValidationInputThing(expected_stream, self.test_schedule_sensor)
          scheduler = Scheduler(asyncio.get_event_loop())
          scheduler.schedule_sensor(s, 0.5,
                                    where(predicate),
                                    passthrough(vo),
                                    output())
          scheduler.run_forever()
          self.assertTrue(
              vo.completed,
              "Schedule exited before validation observer completed")
          print("That's all folks")
  class TestFunctionFilter(unittest.TestCase):
      """Exercise FunctionFilter with callbacks defined inside each test.

      NOTE(review): ``asyncio.get_event_loop()`` is deprecated when no loop is
      running on newer Python versions -- confirm the supported interpreter
      range before changing how the loop is obtained.
      """

      def test_function_filter(self):
          """Verify the function filter class
          """
          s = ValueListSensor(1, value_stream)
          st = SensorAsOutputThing(s)
          # One-element lists act as mutable cells that the nested callbacks
          # below can write to.
          captured_list_ref = [[]]
          got_completed_ref = [False,]

          def on_next(self, x):
              captured_list_ref[0].append(x.val)
              self._dispatch_next(x)

          def on_completed(self):
              got_completed_ref[0] = True
              self._dispatch_completed()

          ff = FunctionFilter(st, on_next=on_next, on_completed=on_completed)
          vo = ValidationInputThing(value_stream, self.test_function_filter)
          ff.connect(vo)
          st.print_downstream()
          scheduler = Scheduler(asyncio.get_event_loop())
          scheduler.schedule_periodic(st, 0.5)  # sample twice every second
          scheduler.run_forever()
          self.assertTrue(
              vo.completed,
              "Schedule exited before validation observer completed")
          self.assertEqual(value_stream, captured_list_ref[0])
          self.assertTrue(got_completed_ref[0])
          print("That's all folks")

      def test_function_filter_error_handling(self):
          """Verify the error handling functionality of the function filter. We do
          this by connecting two downstream paths to the sensor. The first includes
          a function filter that throws an error when it encounters a sensor reading
          of 120. This should disconnect the stream at this point. The second is
          a normal validation input thing. It is connected directly to the sensor,
          and thus should not see any errors.
          """
          s = ValueListSensor(1, value_stream)
          st = SensorAsOutputThing(s)
          # One-element lists act as mutable cells that the nested callbacks
          # below can write to.
          captured_list_ref = [[]]
          got_completed_ref = [False,]
          got_on_error_ref = [False,]

          def on_next_throw_exc(self, x):
              if x.val == 120:
                  raise Exception("expected exc")
              else:
                  captured_list_ref[0].append(x.val)
                  self._dispatch_next(x)

          def on_completed(self):
              got_completed_ref[0] = True
              self._dispatch_completed()

          def on_error(self, e):
              got_on_error_ref[0] = True
              self._dispatch_error(e)

          ff = FunctionFilter(st, on_next=on_next_throw_exc,
                              on_completed=on_completed,
                              on_error=on_error)
          # First path: function filter -> map to the bare value -> capture.
          ct = CaptureInputThing(expecting_error=True)
          ff.map(lambda x: x.val).connect(ct)
          # Second path: validator attached directly to the sensor output.
          vo = ValidationInputThing(value_stream,
                                    self.test_function_filter_error_handling)
          st.connect(vo)
          st.print_downstream()
          scheduler = Scheduler(asyncio.get_event_loop())
          scheduler.schedule_periodic(st, 0.5)  # sample twice every second
          scheduler.run_forever()
          self.assertTrue(
              vo.completed,
              "Schedule exited before validation observer completed")
          self.assertFalse(ct.completed,
                           "Capture thing should not have completed")
          self.assertTrue(ct.errored,
                          "Capture thing should have seen an error")
          self.assertFalse(got_completed_ref[0])
          self.assertTrue(got_on_error_ref[0])
          # Only the readings that precede 120 in the input list are expected.
          self.assertEqual([20, 30, 100], ct.events,
                           "Capture thing event mismatch")
          self.assertEqual([20, 30, 100], captured_list_ref[0],
                           "captured_list_ref mismatch")
          print("That's all folks")
  # Run the unittest runner when this module is executed as a script.
  if __name__ == '__main__':
      unittest.main()