test_scheduler_cancel.py 1.0 KB

123456789101112131415161718192021222324252627282930313233343536
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. """Cancel an active schedule. Since this is the last active schedule, it
  4. should cleanly stop the scheduler.
  5. """
  6. from thingflow.base import *
  7. from utils import make_test_output_thing
  8. import asyncio
  9. import unittest
  10. class CallAfter(InputThing):
  11. def __init__(self, num_events, fn):
  12. self.events_left = num_events
  13. self.fn = fn
  14. def on_next(self, x):
  15. self.events_left -= 1
  16. if self.events_left == 0:
  17. print("calling fn %s" % self.fn)
  18. self.fn()
  19. class TestSchedulerCancel(unittest.TestCase):
  20. def test_case(self):
  21. sensor = make_test_output_thing(1)
  22. sensor.connect(print)
  23. s = Scheduler(asyncio.get_event_loop())
  24. cancel_schedule = s.schedule_periodic(sensor, 1)
  25. sensor.connect(CallAfter(4, cancel_schedule))
  26. sensor.print_downstream()
  27. s.run_forever()
  28. print("got to end")
  29. if __name__ == '__main__':
  30. unittest.main()