| 1234567891011121314151617181920212223242526272829303132333435363738394041 |
- # Copyright 2016 by MPI-SWS and Data-Ken Research.
- # Licensed under the Apache 2.0 License.
- """Test that a fatal error causes the scheduler to exit.
- """
- from thingflow.base import *
- from utils import make_test_output_thing
- import sys
- import asyncio
- import unittest
- class DieAfter(InputThing):
- def __init__(self, num_events):
- self.events_left = num_events
- def on_next(self, x):
- self.events_left -= 1
- if self.events_left == 0:
- print("throwing fatal error")
- raise FatalError("this is a fatal error")
- class TestFatalErrorHandling(unittest.TestCase):
- def test_case(self):
- sensor = make_test_output_thing(1)
- sensor.connect(print)
- sensor2 = make_test_output_thing(2)
- sensor2.connect(print)
- s = Scheduler(asyncio.get_event_loop())
- s.schedule_periodic(sensor, 1)
- s.schedule_periodic(sensor2, 1)
- sensor.connect(DieAfter(4))
- sensor.print_downstream()
- try:
- s.run_forever()
- except FatalError:
- print("got to end with fatal error thrown as expected")
- else:
- self.assertFalse(1, "Did not get to a fatal error")
- if __name__ == '__main__':
- unittest.main()
|