# File: test_blocking_input_thing.py (2.0 KB)
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. """Run a subscriber that blocks in its send call. It gets a separate dedicated thread.
  4. """
  5. import unittest
  6. import asyncio
  7. import time
  8. from thingflow.base import BlockingInputThing, Scheduler
  9. from utils import make_test_output_thing_from_vallist
  10. values = [ 1, 2, 3, 4, 5 ]
  11. class TestInputThing(BlockingInputThing):
  12. def __init__(self, scheduler, expected_sequence, test_case):
  13. self.tc = test_case
  14. self.expected_sequence = expected_sequence
  15. self.idx = 0
  16. self.completed = False
  17. super().__init__(scheduler)
  18. def _on_next(self, port, x):
  19. assert port=='default'
  20. print("TestInputThing._on_next(%s)" % x.__repr__())
  21. self.tc.assertTrue(self.idx < len(self.expected_sequence),
  22. "Received an event %s, but already at end of expected sequence" %
  23. x.__repr__())
  24. self.tc.assertEqual(self.expected_sequence[self.idx], x[2],
  25. "Expected and actual values do not match for item %d" % self.idx)
  26. self.idx += 1
  27. def _on_completed(self, port):
  28. assert port=='default'
  29. self.tc.assertEqual(len(self.expected_sequence), self.idx,
  30. "Received on_completed when not at end of expected sequence")
  31. self.completed = True
  32. def _on_error(self, port, e):
  33. raise Exception("Should not get an on_error event. Got exception %s" % e)
  34. class TestCase(unittest.TestCase):
  35. def test(self):
  36. scheduler = Scheduler(asyncio.get_event_loop())
  37. sensor = make_test_output_thing_from_vallist(1, values)
  38. scheduler.schedule_periodic(sensor, 1)
  39. blocking_subscriber = TestInputThing(scheduler, values, self)
  40. sensor.connect(blocking_subscriber)
  41. scheduler.run_forever()
  42. self.assertTrue(blocking_subscriber.completed)
  43. if __name__ == '__main__':
  44. unittest.main()