test_iterable_as_output_thing.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. """Test of IterableAsOutputThing. This was originally test_base.py, but we then
  4. added the sensor infrastructure and rewrote the test. This test verfies the
  5. specific IterableAsOutputThing code.
  6. """
  7. import asyncio
  8. import unittest
  9. from thingflow.base import Scheduler, IterableAsOutputThing
  10. from utils import make_test_output_thing_from_vallist, ValidationInputThing
  11. import thingflow.filters.where
  12. import thingflow.filters.output
  13. value_stream = [
  14. 20,
  15. 30,
  16. 100,
  17. 120,
  18. 20,
  19. 5,
  20. 2222
  21. ]
  22. expected_stream = [
  23. 100,
  24. 120,
  25. 2222
  26. ]
  27. def predicate(v):
  28. if v[2]>=100.0:
  29. print("v=%s, True" % v[2])
  30. return True
  31. else:
  32. print("v=%s, False" % v[2])
  33. return False
  34. class ErrorIterator:
  35. """An iterator that thows an error after the initial stream
  36. (instead of StopIteration).
  37. """
  38. def __init__(self, expected_stream):
  39. self.expected_stream = expected_stream
  40. self.idx = 0
  41. def __iterator__(self):
  42. return self
  43. def __next__(self):
  44. if self.idx==len(self.expected_stream):
  45. raise Exception("Throwing an exception in ErrorIterator")
  46. else:
  47. v = self.expected_stream[self.idx]
  48. self.idx += 1
  49. return v
  50. class TestIterableAsOutputThing(unittest.TestCase):
  51. def test_where(self):
  52. s = make_test_output_thing_from_vallist(1, value_stream)
  53. w = s.where(predicate)
  54. w.output()
  55. vo = ValidationInputThing(expected_stream, self)
  56. w.connect(vo)
  57. scheduler = Scheduler(asyncio.get_event_loop())
  58. scheduler.schedule_periodic(s, 0.5) # sample twice every second
  59. s.print_downstream()
  60. scheduler.run_forever()
  61. self.assertTrue(vo.completed,
  62. "Schedule exited before validation observer completed")
  63. self.assertTrue(vo.completed)
  64. print("That's all folks")
  65. def test_error_handling(self):
  66. """This is a non-fatal error, so we should just print the error and
  67. exit cleanly without propagating the exception.
  68. """
  69. s = IterableAsOutputThing(ErrorIterator(expected_stream))
  70. s.output()
  71. scheduler = Scheduler(asyncio.get_event_loop())
  72. scheduler.schedule_periodic(s, 0.5) # sample twice every second
  73. s.print_downstream()
  74. scheduler.run_forever()
  75. if __name__ == '__main__':
  76. unittest.main()