===========================
Comparison with Async/Await
===========================

Here, we compare ThingFlow to generic event-driven code using the new
``async`` / ``await`` language feature available starting with Python 3.5.

Scenario
--------
We wish to sample a sensor once every half second and:

1. Write the associated event to a CSV file.
2. Once every five samples, send the median of the last five samples to
   an MQTT queue for remote processing.

We have an asynchronous interface to the MQTT protocol (via hbmqtt),
which provides coroutines to establish a connection, publish a message, and
disconnect.
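
The "median of every five samples" step can be sketched as a small stateful
object. This is only an illustration, not the transducer shipped with
ThingFlow: the class name ``PeriodicMedianSketch`` and the ``SensorEvent``
tuple defined here are assumptions, the ``step`` / ``complete`` method names
mirror the calls used later in this document, and flushing a partial window in
``complete`` is a guess about the intended behavior.

```python
from collections import namedtuple
from statistics import median

# Hypothetical event type; the field names mirror those used later in this document.
SensorEvent = namedtuple('SensorEvent', ['sensor_id', 'ts', 'val'])


class PeriodicMedianSketch:
    """Emit one median event for every `period` input events (illustrative only)."""

    def __init__(self, period=5):
        self.period = period
        self.samples = []
        self.last_event = None

    def step(self, event):
        # Buffer the sample; emit a median event once the window is full.
        self.samples.append(event.val)
        self.last_event = event
        if len(self.samples) == self.period:
            return self._flush()
        return None

    def complete(self):
        # Assumption: a partially filled window is flushed at end of stream.
        if self.samples:
            return self._flush()
        return None

    def _flush(self):
        # Reuse the id and timestamp of the most recent event in the window.
        val = median(self.samples)
        self.samples = []
        return SensorEvent(self.last_event.sensor_id, self.last_event.ts, val)
```

Feeding five events through ``step`` returns ``None`` four times and a median
event on the fifth call.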

Event Version without Async/Await
---------------------------------
One could write an event-driven solution using the ``asyncio`` library
without the ``async`` and ``await`` statements. This will involve a large
number of callbacks and the resulting code will be very difficult to reason
about. See `this paper `__ for
an in-depth discussion of callback vs. coroutine event-driven programs.

Async/Await Implementation
--------------------------
Assuming we already have a transducer class for the queue output
and a (coroutine-based) adapter to the queue, we can create a coroutine for
sampling and sending the results downstream::

    async def sample_and_process(sensor, mqtt_writer, xducer):
        try:
            sample = sensor.sample()
        except StopIteration:
            # The sensor is exhausted: flush the transducer, then disconnect.
            final_event = xducer.complete()
            if final_event:
                await mqtt_writer.send((final_event.sensor_id, final_event.ts,
                                        final_event.val),)
            print("disconnecting")
            await mqtt_writer.disconnect()
            return False  # tell the caller to stop scheduling
        event = SensorEvent(sensor_id=sensor.sensor_id, ts=time.time(), val=sample)
        csv_writer(event)
        # step() returns an event only when a median is due
        median_event = xducer.step(event)
        if median_event:
            await mqtt_writer.send((median_event.sensor_id, median_event.ts,
                                    median_event.val),)
        return True  # tell the caller to schedule another sample

The ``sample_and_process`` function must be a coroutine, as it calls coroutines
defined by the message queue adapter.
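
A minimal sketch of why this is so, using only the standard library. The
``publish`` coroutine is a hypothetical stand-in for an adapter method, and
``asyncio.run`` assumes Python 3.7 or later. Calling a coroutine function from
a plain function only creates a coroutine object; the body runs only when the
coroutine is awaited or otherwise scheduled on an event loop.

```python
import asyncio


async def publish(message):
    # Hypothetical stand-in for a coroutine offered by a queue adapter.
    await asyncio.sleep(0)
    return "sent: " + message


def sync_caller():
    # A plain function cannot await: the call below runs none of publish().
    coro = publish("hello")
    is_coro = asyncio.iscoroutine(coro)
    coro.close()  # discard it cleanly to avoid a "never awaited" warning
    return is_coro


async def async_caller():
    # Only a coroutine can await another coroutine and see its result.
    return await publish("hello")


result = asyncio.run(async_caller())
```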

We now define global variables and establish the main sampling loop::

    sensor = RandomSensor('sensor-2', stop_after_events=12)
    transducer = PeriodicMedianTransducer(5)
    event_loop = asyncio.get_event_loop()
    writer = MqttWriter(URL, sensor.sensor_id, event_loop)

    def loop():
        coro = sample_and_process(sensor, writer, transducer)
        task = event_loop.create_task(coro)
        def done_callback(f):
            exc = f.exception()
            if exc:
                raise exc
            elif f.result()==False:
                print("all done, no more callbacks to schedule")
                event_loop.stop()
            else:
                event_loop.call_later(0.5, loop)
        task.add_done_callback(done_callback)

    event_loop.call_soon(loop)
    event_loop.run_forever()

The ``loop`` function schedules our coroutine and sets up a completion
callback (``done_callback``). This callback checks for errors and the
completion of the sensor. If neither of those happened, it reschedules
the loop to run in half a second.
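
The rescheduling pattern can be tried in isolation with a stub in place of the
sensor and the queue. The sketch below is an assumption-laden variant, not the
code in ``asyncawait.py``: ``run_periodically`` and ``make_stub`` are
hypothetical helpers, and a fresh loop from ``asyncio.new_event_loop`` is used
so that the example is self-contained.

```python
import asyncio


def run_periodically(coro_fn, interval):
    """Run coro_fn() every `interval` seconds until it returns False.

    Returns the number of completed runs (hypothetical helper).
    """
    event_loop = asyncio.new_event_loop()
    runs = 0
    error = None

    def loop():
        task = event_loop.create_task(coro_fn())
        task.add_done_callback(done_callback)

    def done_callback(f):
        nonlocal runs, error
        runs += 1
        error = f.exception()
        if error is not None or f.result() is False:
            event_loop.stop()  # error or end of stream: stop scheduling
        else:
            event_loop.call_later(interval, loop)

    event_loop.call_soon(loop)
    event_loop.run_forever()
    event_loop.close()
    if error is not None:
        raise error  # surface the failure to the caller
    return runs


def make_stub(n):
    # Stub coroutine that reports "more data" until it has been awaited n times.
    remaining = n

    async def sample_stub():
        nonlocal remaining
        await asyncio.sleep(0)
        remaining -= 1
        return remaining > 0

    return sample_stub


runs = run_periodically(make_stub(3), 0.01)
```

The sketch stores the exception and re-raises it after the loop has stopped.
An exception raised inside a done callback is reported to the event loop's
exception handler rather than propagated to the code that called
``run_forever``, so raising it there would not stop the loop by itself.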

ThingFlow Version
-----------------
In the ThingFlow version, state is maintained by each individual component, and
we can rely on the scheduler to deal with calling our task periodically.
Boundary events (errors and termination) are handled through a combination of
the scheduler and the individual ThingFlow filters. Assuming we already have a
transducer class and an adapter to the queue, the entire code for this scenario
is::

    scheduler = Scheduler(asyncio.get_event_loop())
    sensor = SensorAsOutputThing(RandomSensor(SENSOR_ID, mean=10, stddev=5, stop_after_events=12))
    sensor.csv_writer('raw_data.csv')
    q_writer = QueueWriter(URL, SENSOR_ID, scheduler)
    sensor.transduce(PeriodicMedianTransducer()).connect(q_writer)
    scheduler.schedule_periodic(sensor, 0.5)
    scheduler.run_forever()

After creating our sensor, we subscribe a CSV file writer. We then create a
second pipeline from the sensor, through the transducer, and to the queue.

Evaluation
----------
The async/await implementation largely follows a traditional procedural style. [1]_
Overall, the async/await approach has two disadvantages relative to ThingFlow:

1. The async nature of the coroutines is viral in the sense that any
   functions/methods calling an async (coroutine) method must also be
   async. This causes implementation decisions about whether to use
   asynchronous or synchronous APIs to have a non-local impact. In contrast,
   ThingFlow can support asynchronous APIs as well as components running
   in separate threads, without any application-level changes.
2. Each function in the async/await program's call stack must also have control
   flow to handle three possible situations: a normal event, an error, or the
   end of events from the upstream sensor. In ThingFlow, these are handled
   by the (reusable) components via the ``on_next``, ``on_error``, and
   ``on_completed`` methods. ThingFlow application code only needs to
   be concerned with the overall structure of the data flow.
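
To illustrate the second point, here is a rough sketch of a component that
exposes the three methods named above. Only the method names come from this
document; the ``RecordingSink`` class and the ``drive`` function are
hypothetical and are not ThingFlow's own classes or scheduler. The three cases
are handled once, inside the component and its driver, rather than in every
function of the application.

```python
class RecordingSink:
    """Hypothetical component that records each of the three situations."""

    def __init__(self):
        self.log = []

    def on_next(self, event):
        # A normal event arrived from upstream.
        self.log.append(('next', event))

    def on_error(self, exc):
        # Upstream failed; no further events will follow.
        self.log.append(('error', exc))

    def on_completed(self):
        # Upstream finished normally.
        self.log.append(('completed', None))


def drive(source, sink):
    """Hypothetical driver: push items from an iterable into a sink."""
    try:
        for item in source:
            sink.on_next(item)
    except Exception as exc:
        sink.on_error(exc)
    else:
        sink.on_completed()


sink = RecordingSink()
drive([1, 2], sink)
```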

ThingFlow achieves this simplicity by providing a level of indirection in the
programming model. The ThingFlow code actually generates the application by
connecting and configuring the requested components. The filter abstraction
used by the application programmer is at a much higher level than the procedural
abstractions used in an async/await application.


.. [1] An exception is the periodic scheduling of the sensor function, which requires
   the mutually recursive ``loop`` and ``done_callback`` callback functions.

Code
----
Full working code for both versions is available in this directory:
``asyncawait.py`` implements the scenario using coroutines and ``things.py``
uses ThingFlow.