| 123456789101112131415161718192021222324252627282930313233343536 |
- # Copyright 2016 by MPI-SWS and Data-Ken Research.
- # Licensed under the Apache 2.0 License.
- """Cancel an active schedule. Since this is the last active schedule, it
- should cleanly stop the scheduler.
- """
- from thingflow.base import *
- from utils import make_test_output_thing
- import asyncio
- import unittest
class CallAfter(InputThing):
    """Input thing that fires a callback after a fixed number of events.

    Each received event decrements a countdown; when the countdown lands
    exactly on zero the callback is announced on stdout and invoked.
    """

    def __init__(self, num_events, fn):
        """Store the countdown and the callback.

        :param num_events: Number of events to receive before calling ``fn``.
        :param fn: Zero-argument callable to invoke when the countdown hits zero.
        """
        self.fn = fn
        self.events_left = num_events

    def on_next(self, x):
        """Consume one event; the event value ``x`` itself is ignored."""
        self.events_left -= 1
        if self.events_left != 0:
            # Only the event that brings the countdown to exactly zero
            # triggers the callback; earlier and later events do nothing.
            return
        print("calling fn %s" % self.fn)
        self.fn()
class TestSchedulerCancel(unittest.TestCase):
    """Cancel the only periodic schedule from inside the event flow."""

    def test_case(self):
        """Schedule a sensor periodically and cancel it after four events.

        Per the module docstring, cancelling the last active schedule is
        expected to stop the scheduler so that ``run_forever`` returns.
        There are no explicit assertions; reaching the final print is the
        success signal.
        """
        sensor = make_test_output_thing(1)
        sensor.connect(print)

        event_loop = asyncio.get_event_loop()
        scheduler = Scheduler(event_loop)
        # NOTE(review): the second argument is presumably the sampling
        # period -- confirm against Scheduler.schedule_periodic.
        cancel = scheduler.schedule_periodic(sensor, 1)

        # The value returned by schedule_periodic is handed to CallAfter,
        # which calls it once the fourth event arrives.
        stopper = CallAfter(4, cancel)
        sensor.connect(stopper)
        sensor.print_downstream()

        scheduler.run_forever()
        print("got to end")
# Run the test case when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|