test_fatal_error_handling.py 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. """Test that a fatal error causes the scheduler to exit.
  4. """
  5. from thingflow.base import *
  6. from utils import make_test_output_thing
  7. import sys
  8. import asyncio
  9. import unittest
  10. class DieAfter(InputThing):
  11. def __init__(self, num_events):
  12. self.events_left = num_events
  13. def on_next(self, x):
  14. self.events_left -= 1
  15. if self.events_left == 0:
  16. print("throwing fatal error")
  17. raise FatalError("this is a fatal error")
  18. class TestFatalErrorHandling(unittest.TestCase):
  19. def test_case(self):
  20. sensor = make_test_output_thing(1)
  21. sensor.connect(print)
  22. sensor2 = make_test_output_thing(2)
  23. sensor2.connect(print)
  24. s = Scheduler(asyncio.get_event_loop())
  25. s.schedule_periodic(sensor, 1)
  26. s.schedule_periodic(sensor2, 1)
  27. sensor.connect(DieAfter(4))
  28. sensor.print_downstream()
  29. try:
  30. s.run_forever()
  31. except FatalError:
  32. print("got to end with fatal error thrown as expected")
  33. else:
  34. self.assertFalse(1, "Did not get to a fatal error")
  35. if __name__ == '__main__':
  36. unittest.main()