# test_timeout.py
# Copyright 2016 by MPI-SWS and Data-Ken Research.
# Licensed under the Apache 2.0 License.
"""Verify that we can set up timeout events.
"""
  5. import asyncio
  6. import unittest
  7. from thingflow.base import Scheduler, FunctionFilter
  8. import thingflow.filters.timeout
  9. import thingflow.filters.output
  10. from utils import make_test_output_thing_from_vallist, ValidationInputThing
  11. def on_next_alternate(self, x):
  12. if self.keep_mode:
  13. print("Sending %s" % x.__repr__())
  14. self._dispatch_next(x)
  15. else:
  16. print("Dropping %s" % x.__repr__())
  17. self.countdown -= 1
  18. if self.countdown==0:
  19. self.countdown = self.N
  20. self.keep_mode = not self.keep_mode
  21. class DropPeriodic(FunctionFilter):
  22. """Allow through N events, drop the next N, and then repeat.
  23. """
  24. def __init__(self, previous_in_chain, N=1):
  25. self.N = N
  26. self.countdown = N
  27. self.keep_mode = True
  28. super().__init__(previous_in_chain, on_next=on_next_alternate,
  29. name='drop_alternate')
  30. class EventWatcher(thingflow.filters.timeout.EventWatcher):
  31. """Repeat the last good event
  32. """
  33. def __init__(self):
  34. self.last_good_event = None
  35. def on_next(self, x):
  36. self.last_good_event = x
  37. def produce_event_for_timeout(self):
  38. print("producing event for timeout: %s" %
  39. self.last_good_event.__repr__())
  40. return self.last_good_event
  41. sensor_values = [
  42. 1,
  43. 2,
  44. 3,
  45. 4,
  46. 5,
  47. 6
  48. ]
  49. expected_values = [
  50. 1,
  51. 1,
  52. 3,
  53. 3,
  54. 5,
  55. 5
  56. ]
  57. expected_values_multiple_timeouts = [
  58. 1,
  59. 2,
  60. 2,
  61. 2,
  62. 5,
  63. 6
  64. ]
  65. class TestTimeouts(unittest.TestCase):
  66. def test_supplying_event_on_timeout(self):
  67. """In this testcase, we drop every other event.
  68. We set the timeout to a bit longer than the event interval of
  69. one second. It then supplies the previous event. The resulting
  70. output stream will show every other value repeated twice.
  71. """
  72. sensor = make_test_output_thing_from_vallist(1, sensor_values)
  73. drop = DropPeriodic(sensor)
  74. scheduler = Scheduler(asyncio.get_event_loop())
  75. vo = ValidationInputThing(expected_values, self)
  76. drop.supply_event_when_timeout(EventWatcher(),
  77. scheduler, 1.1).output().connect(vo)
  78. scheduler.schedule_periodic(sensor, 1)
  79. sensor.print_downstream()
  80. scheduler.run_forever()
  81. self.assertTrue(vo.completed,
  82. "Schedule exited before validation observer completed")
  83. def test_multiple_timeouts(self):
  84. """In this testcase, we pass two events, drop two events, etc.
  85. We set the timeout to a bit longer than the event interval. The last
  86. good value is supplied when the timeout expires. Thus, we should see
  87. two good events, two repeats of the first event, two good events, etc.
  88. """
  89. sensor = make_test_output_thing_from_vallist(1, sensor_values)
  90. drop = DropPeriodic(sensor, N=2)
  91. scheduler = Scheduler(asyncio.get_event_loop())
  92. vo = ValidationInputThing(expected_values_multiple_timeouts, self)
  93. drop.supply_event_when_timeout(EventWatcher(),
  94. scheduler, 1.1).output().connect(vo)
  95. scheduler.schedule_periodic(sensor, 1)
  96. sensor.print_downstream()
  97. scheduler.run_forever()
  98. self.assertTrue(vo.completed,
  99. "Schedule exited before validation observer completed")
  100. if __name__ == '__main__':
  101. unittest.main()