test_descheduling.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
"""
Test that, when all downstream input things are disconnected,
the scheduler deschedules the OutputThing.
"""
  5. import asyncio
  6. import unittest
  7. from thingflow.base import InputThing, SensorAsOutputThing, Scheduler
  8. from utils import RandomSensor
  9. class PrintAndDeschedule(InputThing):
  10. def __init__(self, prev_in_chain, num_events=5):
  11. self.disconnect = prev_in_chain.connect(self)
  12. self.num_events = num_events
  13. self.prev_in_chain = prev_in_chain
  14. self.count = 0
  15. def on_next(self, x):
  16. print(x)
  17. self.count+=1
  18. if self.count==self.num_events:
  19. print("Disconnecting from sensor")
  20. self.disconnect()
  21. print(self.prev_in_chain.__connections__)
  22. def on_completed(self):
  23. print("on_completed")
  24. def on_error(self, e):
  25. print("on_error(%s)" % e)
  26. class TestDescheduling(unittest.TestCase):
  27. def test_recurring(self):
  28. s = SensorAsOutputThing(RandomSensor(1))
  29. PrintAndDeschedule(s)
  30. scheduler = Scheduler(asyncio.get_event_loop())
  31. scheduler.schedule_recurring(s)
  32. scheduler.run_forever()
  33. print("Exited successfully")
  34. def test_periodic(self):
  35. s = SensorAsOutputThing(RandomSensor(1))
  36. PrintAndDeschedule(s)
  37. scheduler = Scheduler(asyncio.get_event_loop())
  38. scheduler.schedule_periodic(s, 0.25)
  39. scheduler.run_forever()
  40. print("Exited successfully")
  41. if __name__ == '__main__':
  42. unittest.main()