.. _functional:

5. Functional API
=================

Motivation
----------

The primary API that ThingFlow provides for filters is a *fluent* API based
on the concept of *method chaining*: each filter method on the ``OutputThing``
base class returns the last thing in the connection chain. This
result can then be used for subsequent calls. For example, to apply a
filter followed by a map, we might say::

    thing.filter(lambda evt: evt.val > 300).map(lambda evt: evt.val)

Under the covers, the ``filter()`` call returns a ``Filter`` object
(a subclass of ``OutputThing``). The ``map()`` method call is then made
against this object.

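The chaining mechanics can be illustrated with a small, self-contained sketch.
The ``ToyThing`` class below is hypothetical and is not ThingFlow's
implementation; it only shows how each method can return a new downstream
object on which the next call is made.

.. code-block:: python

    class ToyThing:
        """Hypothetical stand-in for an output thing (not ThingFlow's class)."""

        def __init__(self):
            self.downstream = []  # callables invoked for every event

        def emit(self, value):
            for callback in self.downstream:
                callback(value)

        def filter(self, predicate):
            child = ToyThing()
            # Forward only the events that satisfy the predicate.
            self.downstream.append(
                lambda v: child.emit(v) if predicate(v) else None)
            return child  # returning the new thing is what enables chaining

        def map(self, func):
            child = ToyThing()
            self.downstream.append(lambda v: child.emit(func(v)))
            return child


    source = ToyThing()
    results = []
    # filter() returns a new ToyThing, and map() is called on that object.
    chained = source.filter(lambda v: v > 300).map(lambda v: v * 2)
    chained.downstream.append(results.append)
    for reading in (100, 350, 400):
        source.emit(reading)

Only the readings above 300 reach ``results``, and they arrive doubled.
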
This approach is convenient when your processing pipeline really is a
straight line. If you have parallel branches, or more complex structures,
you end up having to break it up with assignment statements. For example,
consider the following dataflow, based on the code in
``examples/rpi/lux_sensor_example.py``:

.. code-block:: python

    # Wrap the sensor so that it can act as an output thing.
    lux = SensorAsOutputThing(LuxSensor())
    lux.output()
    lux.csv_writer(os.path.expanduser('~/lux.csv'))
    # Map each reading to a boolean: is it above the threshold?
    actions = lux.map(lambda event: event.val > threshold)
    actions.connect(GpioPinOut())
    actions.connect(lambda v: print('ON' if v else 'OFF'))
    scheduler = Scheduler(asyncio.get_event_loop())
    scheduler.schedule_periodic_on_separate_thread(lux, interval)
    scheduler.run_forever()

In the above code, ``lux`` has three subscribers, and the output of the ``map``
filter has two subscribers.

Functional API
--------------

To simplify these cases, we provide a *functional* API that can be used in
place of (or along with) the *fluent* API. For each method added to the
thing via the ``@filtermethod`` decorator (in ``thingflow.base``), a
function with the same name is added to the module containing the definition
(e.g. ``thingflow.filters.output`` has an ``output`` function and
``thingflow.filters.map`` has ``map`` and ``select`` functions). These functions
take all the parameters of the associated method call (except for the implied
``self`` parameter of a bound method) and return what we call a *thunk*.

In this case, a thunk is a function that accepts exactly one parameter, an
output thing. The thunk subscribes one or more filters to the output thing and, if
further downstream connections are permitted, returns the last filter in the
chain. When composing filters, thunks can be used as follows:

1. The ``Scheduler`` class has ``schedule_sensor()`` and
   ``schedule_sensor_on_separate_thread()`` methods. These take a
   sensor, wrap it in a ``SensorAsOutputThing`` instance, and then connect a sequence
   of filters to the output thing. Each filter can be passed in directly or
   passed indirectly via thunks.
2. The module ``thingflow.filters.combinators`` defines several functions that
   can be used to combine filters and thunks. These include ``compose``
   (sequential composition), ``parallel`` (parallel composition), and
   ``passthrough`` (parallel composition of a single spur off the main chain).

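To make the idea of a thunk concrete, here is a minimal sketch written in plain
Python. Everything in it (``Stream``, ``map_``, ``collect``, and these toy
versions of ``compose`` and ``passthrough``) is a hypothetical stand-in defined
for illustration; the real combinators in ``thingflow.filters.combinators`` may
behave differently in detail.

.. code-block:: python

    class Stream:
        """Hypothetical minimal output thing used only for this sketch."""

        def __init__(self):
            self.listeners = []

        def connect(self, listener):
            self.listeners.append(listener)

        def emit(self, value):
            for listener in self.listeners:
                listener(value)


    def map_(func):
        """Toy thunk factory: bind func now, receive the upstream thing later."""
        def thunk(upstream):
            out = Stream()
            upstream.connect(lambda v: out.emit(func(v)))
            return out  # the last filter in the chain
        return thunk


    def collect(target):
        """Toy terminal thunk: append every event to the list ``target``."""
        def thunk(upstream):
            upstream.connect(target.append)
            return None  # no further downstream connections
        return thunk


    def compose(*thunks):
        """Sequential composition: feed each thunk the previous result."""
        def thunk(upstream):
            for t in thunks:
                upstream = t(upstream)
            return upstream
        return thunk


    def passthrough(spur):
        """Attach a spur, then hand back the original upstream thing."""
        def thunk(upstream):
            spur(upstream)
            return upstream
        return thunk


    raw, doubled = [], []
    source = Stream()
    pipeline = compose(passthrough(collect(raw)),
                       map_(lambda v: v * 2),
                       collect(doubled))
    pipeline(source)  # nothing is wired up until the thunk is called
    for v in (1, 2, 3):
        source.emit(v)

In this sketch, ``raw`` sees the original events because ``passthrough``
returns the upstream thing unchanged, while ``doubled`` sees the mapped events
from the main chain.
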
Example
-------

Now, let us look at the lux sensor example, using the functional API [1]_:

.. code-block:: python

    scheduler = Scheduler(asyncio.get_event_loop())
    scheduler.schedule_sensor(lux, interval,
                              passthrough(output()),
                              passthrough(csv_writer('/tmp/lux.csv')),
                              map(lambda event: event.val > THRESHOLD),
                              passthrough(lambda v: print('ON' if v else 'OFF')),
                              GpioPinOut())
    scheduler.run_forever()

Notice that we do not need to instantiate any intermediate variables. Everything
happens in the ``schedule_sensor()`` call. The first argument to this call is
the sensor (without being wrapped in ``SensorAsOutputThing``) and the second argument
is the sample interval. The rest of the arguments are a sequence of filters
and thunks to be called. Using a bit of ASCII art, the graph created looks as
follows::

                output
               /
    LuxSensor - csv_writer
               \
                map - lambda v: print(...)
                   \
                    GpioPinOut

The lux sensor has three connections: ``output``, ``csv_writer``, and ``map``.
We get this fanout by using the ``passthrough`` combinator, which creates a
spur off the main chain. A ``passthrough`` is then used with the output of
the ``map``, with the main chain finally ending at ``GpioPinOut``.

.. [1] A full, self-contained version of this example may be found at
       ``examples/functional_api_example.py``.

Combining the Fluent and Functional APIs
----------------------------------------

You can use the functional API within a fluent API method chain. For example,
let us include a sequence of filters in a ``passthrough()``:

.. code-block:: python

    sensor = SensorAsOutputThing(LuxSensor())
    sensor.passthrough(compose(map(lambda event: event.val > THRESHOLD), output()))\
          .csv_writer('/tmp/lux.csv')

Here, we used ``compose`` to build a sequence of ``map`` followed by ``output``.
Note that the final ``csv_writer`` call is run against the original events
output by the sensor, not on the mapped events. Here is the resulting
graph::

               map - output
              /
    LuxSensor - csv_writer

Internals
---------

The linq-style functions of the fluent API are defined to be
a kind of extension method -- their first parameter, usually named ``this``, is
the output thing on which the method will eventually be attached (to borrow
Smalltalk terminology, the "receiver"). The function
takes zero or more additional parameters and returns a ``Filter`` object to be
used for further chaining.

The decorator ``thingflow.base.filtermethod`` adds a linq-function as a method
on a base class (usually ``OutputThing``), effectively binding the ``this``
parameter and, thus, the receiver. To support the functional API, the
``filtermethod`` decorator also wraps the linq-function in a
``_ThunkBuilder`` object. This object, when called with
the parameters intended for our linq-function, returns a *thunk* -- a function
that has all parameters bound except the ``this`` receiver. When a thunk is
called (passing an output thing as a parameter), it calls the original linq-function
with the output thing as the ``this`` receiver and the rest of the parameters
coming from the original ``_ThunkBuilder`` call.

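The mechanism described above can be approximated in a few lines of plain
Python. This is only a sketch under stated assumptions: the decorator below is
a toy with an assumed signature, ``ToyOutputThing`` and ``scale`` are
hypothetical names, and a plain closure stands in for the ``_ThunkBuilder``
object. It is not the code in ``thingflow.base``.

.. code-block:: python

    import functools


    class ToyOutputThing:
        """Hypothetical stand-in that only stores connected callables."""

        def __init__(self):
            self.listeners = []

        def connect(self, listener):
            self.listeners.append(listener)
            return listener

        def emit(self, value):
            for listener in self.listeners:
                listener(value)


    def filtermethod(base):
        """Toy decorator: attach func to ``base`` and return a thunk builder."""
        def decorator(func):
            # Fluent API: as a method, ``this`` is bound to the receiver.
            setattr(base, func.__name__, func)

            @functools.wraps(func)
            def thunk_builder(*args, **kwargs):
                # Functional API: bind everything except the receiver.
                def thunk(this):
                    return func(this, *args, **kwargs)
                return thunk
            return thunk_builder
        return decorator


    @filtermethod(ToyOutputThing)
    def scale(this, factor):
        """Toy linq-style function: multiply every event by ``factor``."""
        out = ToyOutputThing()
        this.connect(lambda v: out.emit(v * factor))
        return out


    fluent, functional = [], []
    source = ToyOutputThing()
    source.scale(10).connect(fluent.append)       # method call on the receiver
    scale(10)(source).connect(functional.append)  # thunk called with the receiver
    source.emit(4)

Both styles end up calling the same underlying function, so ``fluent`` and
``functional`` receive the same scaled event.
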
The functional API also needs some special handling in cases where we may make
``connect`` calls under the covers (e.g. the ``Scheduler.schedule_sensor()``
method or the various combinators in ``thingflow.filters.combinators``). Depending
on whether the input thing being passed in is a filter, a thunk, a thunk-builder,
or a plain function, we need to handle it differently. For example, if we are
given a filter ``f``, we can connect it to our receiver ``this`` via
``this.connect(f)``. However, if we are given a thunk ``t``, we achieve the
same thing via ``t(this)``. All of this logic is centralized in
``thingflow.base._subscribe_thunk``.

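A rough sketch of this kind of dispatch is shown below. It is not the code of
``_subscribe_thunk``: the ``Hub``, ``Recorder``, and ``FuncFilter`` classes,
the ``attach`` function, and the ``is_thunk`` marker attribute are all
assumptions made for illustration, and the thunk-builder case is left out to
keep the example short.

.. code-block:: python

    class Hub:
        """Hypothetical output thing that forwards events to filters."""

        def __init__(self):
            self.filters = []

        def connect(self, f):
            self.filters.append(f)
            return f

        def emit(self, value):
            for f in self.filters:
                f.on_next(value)


    class Recorder:
        """Toy filter: stores every event it receives."""

        def __init__(self):
            self.seen = []

        def on_next(self, value):
            self.seen.append(value)


    class FuncFilter:
        """Wraps a plain function so it can be connected like a filter."""

        def __init__(self, func):
            self.func = func

        def on_next(self, value):
            self.func(value)


    def as_thunk(func):
        """Mark a callable as a thunk (hypothetical marker attribute)."""
        func.is_thunk = True
        return func


    def attach(this, item):
        """Connect ``item`` to ``this`` according to what kind of thing it is."""
        if getattr(item, "is_thunk", False):
            return item(this)                      # thunk: t(this)
        if hasattr(item, "on_next"):
            return this.connect(item)              # filter: this.connect(f)
        if callable(item):
            return this.connect(FuncFilter(item))  # plain function: wrap first
        raise TypeError("cannot attach %r" % (item,))


    source = Hub()
    recorder, via_thunk, printed = Recorder(), Recorder(), []
    attach(source, recorder)
    attach(source, printed.append)
    attach(source, as_thunk(lambda this: this.connect(via_thunk)))
    source.emit(7)

Each of the three inputs ends up receiving the event, even though each was
connected through a different branch of ``attach``.
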