test_blocking_output_thing.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. """Run an output_thing that might block in a separate background thread
  4. """
  5. import time
  6. import unittest
  7. from thingflow.base import OutputThing, DirectOutputThingMixin, InputThing,\
  8. Scheduler
  9. from thingflow.filters.combinators import passthrough
  10. from thingflow.filters.output import output
  11. from utils import ValidationInputThing
  12. import asyncio
  13. EVENTS = 4
  14. class BlockingOutputThing(OutputThing, DirectOutputThingMixin):
  15. def __init__(self):
  16. super().__init__()
  17. self.event_count = 0
  18. def _observe(self):
  19. self.event_count += 1
  20. time.sleep(0.5) # simulate a blocking call
  21. self._dispatch_next(self.event_count)
  22. class StopLoopAfter(InputThing):
  23. def __init__(self, stop_after, cancel_thunk):
  24. self.events_left = stop_after
  25. self.cancel_thunk = cancel_thunk
  26. def on_next(self, x):
  27. self.events_left -= 1
  28. if self.events_left == 0:
  29. print("Requesting stop of event loop")
  30. self.cancel_thunk()
  31. class BlockingSensor:
  32. def __init__(self, sensor_id, stop_after):
  33. self.sensor_id = sensor_id
  34. self.stop_after = stop_after
  35. self.event_count = 0
  36. def sample(self):
  37. if self.event_count==self.stop_after:
  38. raise StopIteration
  39. self.event_count += 1
  40. time.sleep(0.5) # simulate a blocking call
  41. return self.event_count
  42. def __repr__(self):
  43. return "BlockingSensor(%s, stop_after=%s)" % (self.sensor_id,
  44. self.stop_after)
  45. class TestCase(unittest.TestCase):
  46. def test_blocking_output_thing(self):
  47. o = BlockingOutputThing()
  48. o.output()
  49. scheduler = Scheduler(asyncio.get_event_loop())
  50. c = scheduler.schedule_periodic_on_separate_thread(o, 1)
  51. vs = ValidationInputThing([i+1 for i in range(EVENTS)], self,
  52. extract_value_fn=lambda v:v)
  53. o.connect(vs)
  54. o.connect(StopLoopAfter(EVENTS, c))
  55. o.print_downstream()
  56. scheduler.run_forever()
  57. print("that's it")
  58. def test_blocking_sensor(self):
  59. s = BlockingSensor(1, stop_after=EVENTS)
  60. scheduler = Scheduler(asyncio.get_event_loop())
  61. scheduler.schedule_sensor_on_separate_thread(s, 1,
  62. passthrough(output()),
  63. ValidationInputThing([i+1 for i in range(EVENTS)], self,
  64. extract_value_fn=lambda v:v),
  65. make_event_fn=lambda s, v: v)
  66. scheduler.run_forever()
  67. print("that's it")
  68. if __name__ == '__main__':
  69. unittest.main()