| 1234567891011121314151617181920212223242526272829303132333435363738394041424344 |
- # Copyright 2016 by MPI-SWS and Data-Ken Research.
- # Licensed under the Apache 2.0 License.
- """Run an observable that has its own event loop.
- We use the MQTT reader with the mock client to test this
- (does not require an mqtt broker).
- """
- import thingflow.filters.output
- from thingflow.base import Scheduler, InputThing
- from thingflow.adapters.mqtt import MQTTReader, MockMQTTClient
- import unittest
- import asyncio
class StopLoopAfter(InputThing):
    """Sink that counts incoming events and fires a callback on the last one.

    Each call to ``on_next`` decrements ``events_left``; when the counter
    reaches exactly zero, ``cancel_thunk`` is invoked with no arguments.
    """

    def __init__(self, stop_after, cancel_thunk):
        """Remember the event budget and the callable to invoke when it runs out.

        stop_after   -- number of events to receive before calling cancel_thunk
        cancel_thunk -- zero-argument callable, called once the count hits zero
        """
        self.cancel_thunk = cancel_thunk
        self.events_left = stop_after

    def on_next(self, x):
        """Consume one event (its value is ignored) and stop when none remain."""
        self.events_left -= 1
        if self.events_left != 0:
            # Either still counting down, or already past zero: nothing to do.
            return
        print("Requesting stop of event loop")
        self.cancel_thunk()
class TestExternalEventStream(unittest.TestCase):
    """Smoke test for a reader that is scheduled on a private event loop."""

    def test_case(self):
        """Run the mock-backed MQTT reader on its own event loop.

        The reader is connected to a StopLoopAfter sink that calls the
        cancel callable returned by the scheduler after 4 events. The test
        makes no assertions; it only has to run to completion.
        """
        subscriptions = [('bogus/bogus', 0)]
        # NOTE(review): asyncio.get_event_loop() is deprecated on recent
        # Python versions when no loop is running -- confirm before upgrading.
        scheduler = Scheduler(asyncio.get_event_loop())
        reader = MQTTReader("localhost", topics=subscriptions,
                            mock_class=MockMQTTClient)
        reader.output()
        # The scheduler hands back the callable used to cancel the reader.
        stop_loop = scheduler.schedule_on_private_event_loop(reader)
        reader.connect(StopLoopAfter(4, stop_loop))
        reader.print_downstream()
        scheduler.run_forever()
        print("that's it")
if __name__ == "__main__":
    # Running this file directly hands control to the unittest runner.
    unittest.main()
-
|