# test_external_event_stream.py
# Copyright 2016 by MPI-SWS and Data-Ken Research.
# Licensed under the Apache 2.0 License.
"""Run an observable that has its own event loop.

We use the MQTT reader with the mock client to test this
(does not require an MQTT broker).
"""
import asyncio
import unittest

# NOTE(review): not referenced by name in this file; presumably imported for
# its side effect of making ``.output()`` available on the reader -- confirm.
import thingflow.filters.output
from thingflow.adapters.mqtt import MQTTReader, MockMQTTClient
from thingflow.base import Scheduler, InputThing
  12. class StopLoopAfter(InputThing):
  13. def __init__(self, stop_after, cancel_thunk):
  14. self.events_left = stop_after
  15. self.cancel_thunk = cancel_thunk
  16. def on_next(self, x):
  17. self.events_left -= 1
  18. if self.events_left == 0:
  19. print("Requesting stop of event loop")
  20. self.cancel_thunk()
  21. class TestExternalEventStream(unittest.TestCase):
  22. def test_case(self):
  23. """Just run the reader in its own event loop. We stop everything after 4
  24. events.
  25. """
  26. s = Scheduler(asyncio.get_event_loop())
  27. m = MQTTReader("localhost", topics=[('bogus/bogus', 0),],
  28. mock_class=MockMQTTClient)
  29. m.output()
  30. c = s.schedule_on_private_event_loop(m)
  31. m.connect(StopLoopAfter(4, c))
  32. m.print_downstream()
  33. s.run_forever()
  34. print("that's it")
  35. if __name__ == '__main__':
  36. unittest.main()