===========================
Comparison with Async/Await
===========================

Here, we compare ThingFlow to generic event-driven code using the ``async`` /
``await`` language feature introduced in Python 3.5.

Scenario
--------

We wish to sample a sensor once every half second and:

1. Write the associated event to a CSV file.
2. Once every 5 samples, send the median of the last five samples to
   an MQTT queue for remote processing.

We have an asynchronous interface to the MQTT protocol (via hbmqtt),
which provides coroutines to establish a connection, publish a message, and
disconnect.
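
The median requirement can be pictured as a small stateful object that buffers samples and emits one result per window. The sketch below is an illustrative stand-in, not the repository's ``PeriodicMedianTransducer``: the ``MedianOfN`` class, the ``SensorEvent`` namedtuple, the ``step``/``complete`` method pair (modelled on the calls made by the coroutine version), and the choice to stamp each median with the timestamp of the last sample in its window are all assumptions.

```python
import statistics
from collections import namedtuple

# Hypothetical event type carrying the three fields used in this README.
SensorEvent = namedtuple("SensorEvent", ["sensor_id", "ts", "val"])


class MedianOfN:
    """Toy periodic-median transducer (illustrative, not ThingFlow's class).

    step() buffers events and returns a median event once every n samples,
    otherwise None; complete() flushes a partial window at end of stream.
    """

    def __init__(self, n=5):
        self.n = n
        self.window = []

    def _emit(self):
        last = self.window[-1]
        median = statistics.median([e.val for e in self.window])
        self.window = []  # start a fresh window
        # Assumption: the median is stamped with the last sample's timestamp.
        return SensorEvent(last.sensor_id, last.ts, median)

    def step(self, event):
        self.window.append(event)
        return self._emit() if len(self.window) == self.n else None

    def complete(self):
        return self._emit() if self.window else None
```

Feeding the values 5, 1, 4, 2, 3 yields ``None`` four times and then an event whose value is 3.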

Event Version without Async/Await
---------------------------------

One could write an event-driven solution using the ``asyncio`` library
without the ``async`` and ``await`` keywords. This would involve a large
number of callbacks, and the resulting code would be very difficult to reason
about. See `this paper <http://dl.acm.org/citation.cfm?id=1244403>`__ for
an in-depth discussion of callback vs. coroutine event-driven programs.
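
To make the callback style concrete, here is a minimal sketch that uses only plain ``asyncio`` primitives (``call_soon``, ``call_later``, and futures with ``add_done_callback``) and no ``async``/``await``. ``fake_publish`` is a hypothetical stand-in for an asynchronous MQTT publish; no broker is involved, and the sampling period is shortened so the example finishes quickly.

```python
import asyncio
import statistics


def fake_publish(loop, payload, sent):
    """Hypothetical async publish: returns a Future completed on a later tick."""
    fut = loop.create_future()

    def finish():
        sent.append(payload)
        fut.set_result(None)

    loop.call_soon(finish)
    return fut


def run_callback_version(samples, period=0.01):
    loop = asyncio.new_event_loop()
    window, sent, it = [], [], iter(samples)

    def on_published(fut):
        # Each asynchronous step needs its own callback, and errors
        # have to be checked by hand at every one of them.
        if fut.exception() is not None:
            loop.stop()
            return
        loop.call_later(period, take_sample)

    def take_sample():
        try:
            val = next(it)
        except StopIteration:
            loop.stop()  # end of stream; a partial window is dropped here
            return
        window.append(val)
        if len(window) == 5:
            median = statistics.median(window)
            window.clear()
            fake_publish(loop, median, sent).add_done_callback(on_published)
        else:
            loop.call_later(period, take_sample)

    loop.call_soon(take_sample)
    try:
        loop.run_forever()
    finally:
        loop.close()
    return sent
```

For example, ``run_callback_version(range(10))`` publishes the medians of 0..4 and 5..9, returning ``[2, 7]``. Even in this tiny case, the normal path, the error path and the end-of-stream path are spread across two mutually scheduling callbacks.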

Async/Await Implementation
--------------------------

Assuming we already have a transducer class that computes the median sent to
the queue and a (coroutine-based) adapter to the queue, we can create a
coroutine for sampling and sending the results downstream::

    import time

    # SensorEvent and csv_writer come from the full example (asyncawait.py).

    async def sample_and_process(sensor, mqtt_writer, xducer):
        try:
            sample = sensor.sample()
        except StopIteration:
            # Sensor exhausted: flush the transducer, then disconnect.
            final_event = xducer.complete()
            if final_event:
                await mqtt_writer.send((final_event.sensor_id, final_event.ts,
                                        final_event.val))
            print("disconnecting")
            await mqtt_writer.disconnect()
            return False  # tell the caller to stop rescheduling
        event = SensorEvent(sensor_id=sensor.sensor_id, ts=time.time(), val=sample)
        csv_writer(event)  # record every raw sample
        median_event = xducer.step(event)  # falsy except once per period
        if median_event:
            await mqtt_writer.send((median_event.sensor_id, median_event.ts,
                                    median_event.val))
        return True  # keep sampling

The ``sample_and_process`` function must be a coroutine, as it awaits coroutines
defined by the message queue adapter.
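
Why the coroutine requirement spreads to callers can be seen in a few lines of standard-library Python. ``send``, ``sync_caller`` and ``async_caller`` are hypothetical names; ``send`` merely stands in for a coroutine such as the queue writer's ``send``.

```python
import asyncio


async def send(sent, msg):
    # Hypothetical stand-in for a coroutine like a queue writer's send().
    await asyncio.sleep(0)
    sent.append(msg)


def sync_caller(sent):
    # A plain function can only *create* the coroutine object; its body
    # does not run unless something awaits or schedules it.
    coro = send(sent, "from sync")
    coro.close()  # discard it explicitly to avoid a "never awaited" warning
    return sent


async def async_caller(sent):
    # To actually wait for the send, the caller must itself be async.
    await send(sent, "from async")
    return sent
```

``sync_caller([])`` returns an empty list because the coroutine it created never ran; only ``async_caller``, driven by ``asyncio.run`` (Python 3.7+), performs the send.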

We now define global variables and establish the main sampling loop::

    import asyncio

    sensor = RandomSensor('sensor-2', stop_after_events=12)
    transducer = PeriodicMedianTransducer(5)
    event_loop = asyncio.get_event_loop()
    writer = MqttWriter(URL, sensor.sensor_id, event_loop)

    def loop():
        coro = sample_and_process(sensor, writer, transducer)
        task = event_loop.create_task(coro)

        def done_callback(f):
            exc = f.exception()
            if exc:
                # asyncio only logs exceptions raised in callbacks, so stop
                # the loop first or run_forever() would never return.
                event_loop.stop()
                raise exc
            elif not f.result():
                print("all done, no more callbacks to schedule")
                event_loop.stop()
            else:
                event_loop.call_later(0.5, loop)

        task.add_done_callback(done_callback)

    event_loop.call_soon(loop)
    event_loop.run_forever()

The ``loop`` function schedules our coroutine and sets up a completion
callback (``done_callback``). This callback checks for errors and for the
completion of the sensor. If neither has occurred, it reschedules
the loop to run in half a second.
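
The reschedule-on-completion pattern can be exercised without sensors or a broker. In this sketch ``run_periodic`` is a hypothetical helper (not part of the example code): it repeatedly runs any coroutine function that returns a truthy value to continue or a falsy value to stop, and it records errors instead of re-raising them inside the callback.

```python
import asyncio


def run_periodic(step_coro_fn, period):
    """Run step_coro_fn() every `period` seconds until it returns a falsy
    value or raises; returns the list of errors seen (hypothetical helper)."""
    event_loop = asyncio.new_event_loop()
    errors = []

    def loop():
        task = event_loop.create_task(step_coro_fn())
        task.add_done_callback(done_callback)

    def done_callback(f):
        exc = f.exception()
        if exc is not None:
            errors.append(exc)  # record the error rather than raise in a callback
            event_loop.stop()
        elif not f.result():
            event_loop.stop()  # the step reported end of stream
        else:
            event_loop.call_later(period, loop)  # schedule the next iteration

    event_loop.call_soon(loop)
    try:
        event_loop.run_forever()
    finally:
        event_loop.close()
    return errors
```

For example, a step coroutine that appends to a list and returns ``False`` on its third call runs exactly three times before ``run_periodic`` returns an empty error list.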

ThingFlow Version
-----------------

In the ThingFlow version, state is maintained by each individual component, and
we can rely on the scheduler to call our task periodically.
Boundary events (errors and termination) are handled through a combination of
the scheduler and the individual ThingFlow filters. Assuming we already have a
transducer class and an adapter to the queue, the entire code for this scenario
is::

    import asyncio

    # Scheduler, SensorAsOutputThing, RandomSensor, QueueWriter,
    # PeriodicMedianTransducer, URL and SENSOR_ID come from ThingFlow / things.py.
    scheduler = Scheduler(asyncio.get_event_loop())
    sensor = SensorAsOutputThing(RandomSensor(SENSOR_ID, mean=10, stddev=5,
                                              stop_after_events=12))
    sensor.csv_writer('raw_data.csv')  # first pipeline: raw events to CSV
    q_writer = QueueWriter(URL, SENSOR_ID, scheduler)
    sensor.transduce(PeriodicMedianTransducer()).connect(q_writer)  # second pipeline
    scheduler.schedule_periodic(sensor, 0.5)
    scheduler.run_forever()

After creating our sensor, we connect a CSV file writer to it. We then create a
second pipeline from the sensor, through the transducer, and to the queue.
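
The fan-out here (one sensor feeding both a CSV writer and a transducer pipeline) can be illustrated with a toy push-based model. The ``Thing``, ``EveryNth`` and ``Collector`` classes below are hypothetical and far simpler than ThingFlow's real classes; they only reuse the ``connect``, ``on_next``, ``on_error`` and ``on_completed`` names that appear in this document.

```python
class Thing:
    """Toy push-based component (illustrative only, not ThingFlow's classes)."""

    def __init__(self):
        self.subscribers = []

    def connect(self, downstream):
        self.subscribers.append(downstream)
        return downstream  # allows chaining: a.connect(b).connect(c)

    # Default behaviour: forward each kind of notification downstream.
    def on_next(self, x):
        for s in self.subscribers:
            s.on_next(x)

    def on_error(self, e):
        for s in self.subscribers:
            s.on_error(e)

    def on_completed(self):
        for s in self.subscribers:
            s.on_completed()


class EveryNth(Thing):
    """Passes on every n-th value; errors and completion use the defaults."""

    def __init__(self, n):
        super().__init__()
        self.n, self.count = n, 0

    def on_next(self, x):
        self.count += 1
        if self.count % self.n == 0:
            super().on_next(x)


class Collector(Thing):
    """Sink that records the values and completion it receives."""

    def __init__(self):
        super().__init__()
        self.values, self.completed = [], False

    def on_next(self, x):
        self.values.append(x)

    def on_completed(self):
        self.completed = True
```

Connecting one ``Collector`` directly to a source and a second one behind ``EveryNth(5)`` gives two pipelines from the same source; pushing 1..10 and then ``on_completed()`` leaves the first with all ten values, the second with ``[5, 10]``, and both marked completed. ``EveryNth`` never mentions completion, yet end of stream still reaches its downstream component through the inherited default.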

Evaluation
----------

The async/await implementation largely follows a traditional procedural style. [1]_
Overall, the async/await version has two disadvantages relative to ThingFlow:

1. The async nature of the coroutines is viral: any function or method that
   needs to await a coroutine must itself be async. As a result, the decision
   to use an asynchronous or a synchronous API has a non-local impact. In
   contrast, ThingFlow can support asynchronous APIs as well as components
   running in separate threads, without any application-level changes.
2. Each function in the async/await program's call stack must also have control
   flow to handle three possible situations: a normal event, an error, or the
   end of events from the upstream sensor. In ThingFlow, these are handled
   by the (reusable) components via the ``on_next``, ``on_error``, and
   ``on_completed`` methods. ThingFlow application code only needs to
   be concerned with the overall structure of the data flow.

ThingFlow achieves this simplicity by providing a level of indirection in the
programming model: the ThingFlow code generates the application by
connecting and configuring the requested components. The filter abstraction
used by the application programmer is at a much higher level than the procedural
abstractions used in an async/await application.

.. [1] An exception is the periodic scheduling of the sensor function, which
   requires the mutually recursive ``loop`` and ``done_callback`` callback
   functions.

Code
----

Full working code for both versions is available in this directory:
``asyncawait.py`` implements the scenario using coroutines and ``things.py``
uses ThingFlow.