| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- .. _functional:
- 5. Functional API
- =================
- Motivation
- ----------
- The primary API that ThingFlow provides for filters is a *fluent* API based
- on the concept of *method chaining*: each filter method on the ``OutputThing``
- base class returns the last thing in the connection chain. This
- result can then be used for subsequent calls. For example, to apply a
- filter followed by a map, we might say::
- thing.filter(lambda evt: evt.val > 300).map(lambda evt:evt.val)
- Underneath the covers, the ``filter()`` call returns a ``Filter`` object
- (a subclass of ``OutputThing``). The ``map()`` method call is then made
- against this object.
- This approach is convenient when your processing pipeline really is a
- straight line. If you have parallel branches, or more complex structures,
- you end up having to break it up with assignment statements. For example,
- consider the following dataflow, based on the code in
- ``examples/rpi/lux_sensor_example.py``:
- .. code-block:: python
- lux = SensorPub(LuxSensor())
- lux.output()
- lux.csv_writer(os.path.expanduser('~/lux.csv'))
- actions = lux.map(lambda event: event.val > threshold)
- actions.subscribe(GpioPinOut())
- actions.subscribe(lambda v: print('ON' if v else 'OFF'))
- scheduler = Scheduler(asyncio.get_event_loop())
- scheduler.schedule_periodic_on_separate_thread(lux, interval)
- scheduler.run_forever()
- In the above code, ``lux`` has three subscribers, and the output of the ``map``
- filter has two subscribers.
- Functional API
- --------------
- To simplfy these cases, we provide a *functional* API that can be used in
- place of (or along with) the *fluent* API. For each method added to the
- thing via the ``@filtermethod`` decorator (in ``thingflow.base``), a
- function with the same name is added to the module containing the definition
- (e.g. ``thingflow.filters.output`` has an ``output`` function and
- ``thingflow.filters.map`` has ``map`` and ``select`` functions). These functions
- take all the parameters of the associated method call (except for the implied
- ``self`` parameter of a bound method) and return what we call a *thunk*.
- In this case, a thunk is a function that accepts exactly one parameter, a
- output thing. The thunk subscribes one or more fitlers to the output thing and, if
- further downstream connections are permitted, returns the last filter in the
- chain. When composing filters, thunks can be used as follows:
- 1. The ``Schedule`` class has ``schedule_sensor()`` and
- ``schedule_sensor_on_separate_thread()`` methods. These take a
- sensor, wrap it in a ``SensorAsOutputThing`` instance, and then connect a sequence
- of filters to the output thing. Each filter can be passed in directly or
- passed indirectly via thunks.
- 2. The module ``thingflow.filters.combinators`` defines several functions that
- can be used to combine filters and thunks. These include ``compose``
- (sequential composition), ``parallel`` (parallel composition), and
- ``passthrough`` (parallel composition of a single spur off the main chain).
- Example
- -------
- Now, let us look at the lux sensor example, using the functional API [1]_:
- .. code-block:: python
-
- scheduler = Scheduler(asyncio.get_event_loop())
- scheduler.schedule_sensor(lux, interval,
- passthrough(output()),
- passthrough(csv_writer('/tmp/lux.csv')),
- map(lambda event:event.val > THRESHOLD),
- passthrouh(lambda v: print('ON' if v else 'OFF')),
- GpioPinOut())
- scheduler.run_forever()
-
- Notice that we do not need to instantiate any intermediate variables. Everything
- happens in the ``schedule_sensor()`` call. The first argument to this call is
- the sensor (without being wrapped in ``SensorAsOutputThing``) and the second argument
- is the sample interval. The rest of the arguments are a sequence of filters
- and thunks to be called. Using a bit of ASCII art, the graph created looks as
- follows::
- output
- /
- LuxSensor - csv_writer
- \
- map - lambda v: print(...)
- \
- GpioPinOut
- The lux sensor has three connections: ``output``, ``csv_writer``, and ``map``.
- We get this fanout by using the ``passthrough`` combinator, which creates a
- spur off the main chain. A ``passthrough`` is then used with the output of
- the ``map``, with the main chain finally ending at ``GpioPinOut``.
- .. [1] A full, self-contained version of this example may be found at
- ``examples/functional_api_example.py``.
- Combining the Fluent and Functional APIs
- ----------------------------------------
- You can use the functional API within a fluent API method chain. For example,
- let us include a sequence of filters in a ``passthrough()``:
- .. code-block:: python
- sensor = SensorAsOutputThing(LuxSensor())
- sensor.passthrough(compose(map(lambda event:event.val>THRESHOLD), output()))\
- .csv_writer('/tmp/lux.csv')
- Here, we used ``compose`` to build a sequence of ``map`` followed by ``output``.
- Note that the final ``csv_writer`` call is run against the original events
- output by the sensor, not on the mapped events. Here is the resuting
- graph::
- map - output
- /
- LuxSensor - csvwriter
- Internals
- ---------
- The linq-style functions of the fluent API are defined to be
- a kind of extension method -- their first parameter, usually named ``this``, is
- the output thing on which the method will eventually be attached (to borrow
- Smalltalk terminology, the "receiver"). The function
- takes zero or more additional parameters and returns a ``Filter`` object to be
- used for further chaining.
- The decorator ``thingflow.base.filtermethod`` adds a linq-function as a method
- on a base class (usually ``OutputThing``), effectively binding the ``this``
- parameter and, thus, the receiver. To support the functional API, the
- ``filtermethod`` decorator also wraps the linq-function in a
- ``_ThunkBuilder`` object. This object, when called with
- the parameters intended for our linq-function, returns a *thunk* -- a function
- that has all parameters bound except the ``this`` receiver. When a thunk is
- called (passing a output thing as a parameter), it calls the original linq-function
- with the output thing as the ``this`` receiver and the rest of the parameters
- coming from the original ``_ThunkBuilder`` call.
- The functional API also needs some special handling in cases where we may make
- ``connect`` calls under the covers (e.g. the ``Scheduler.schedule_sensor()``
- method or the various combinators in ``thingflow.filters.combinators``). Depending
- on whether the input thing being passed in is a filter, a thunk, a thunk-builder,
- or a plain function, we need to handle it differently. For example, if we are
- given a filter ``f``, we can connect it to our receiver ``this`` via
- ``this.connect(f)``. However, if we are given a thunk ``t``, we achieve the
- same thing via ``t(this)``. All of this logic is cenralized in
- ``thingflow.base._subscribe_thunk``.
|