ports.rst 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. .. _ports:
  2. 4. Things with Non-default Ports
  3. ================================
  4. ThingFlow provides a general dataflow architecture. Output things can
  5. output events on different ports and input things can receive messages via
  6. different ports. Each ``connect()`` call can rename ports, allowing the
  7. interconnection of any compatible ports. For example, one might have code like::
  8. output_thing.connect(input_thing,
  9. port_mapping=('out_port_name', 'in_port_name'))
  10. As you know, ThingFlow provides a special ``default`` port that does not need
  11. any mapping. This makes it convenient for building chains of filters and is good
  12. enough most of the time. However, when you need a more complex data flow, the
  13. more general mapping capability can be very helpful. We will now look at it in
  14. more detail.
  15. Multiple Output Ports
  16. ---------------------
  17. To create an output thing which sends messaages on multiple output ports,
  18. one subclasses from ``OutputThing`` or one of its descendents. Here is a simple
  19. thing that accepts events on the default input port and sends values to one or
  20. more of three ports:
  21. .. code-block:: python
  22. class MultiPortOutputThing(OutputThing, InputThing):
  23. def __init__(self, previous_in_chain):
  24. super().__init__(ports=['divisible_by_two', 'divisible_by_three',
  25. 'other'])
  26. # connect to the previous filter
  27. self.disconnect_from_upstream = previous_in_chain.connect(self)
  28. def on_next(self, x):
  29. val = int(round(x.val))
  30. if (val%2)==0:
  31. self._dispatch_next(val, port='divisible_by_two')
  32. if (val%3)==0:
  33. self._dispatch_next(val, port='divisible_by_three')
  34. if (val%3)!=0 and (val%2)!=0:
  35. self._dispatch_next(val, port='other')
  36. def on_completed(self):
  37. self._dispatch_completed(port='divisible_by_two')
  38. self._dispatch_completed(port='divisible_by_three')
  39. self._dispatch_completed(port='other')
  40. def on_error(self, e):
  41. self._dispatch_error(e, port='divisible_by_two')
  42. self._dispatch_error(e, port='divisible_by_three')
  43. self._dispatch_error(e, port='other')
  44. In the ``_init__`` constructor, we must be sure to call the super class's
  45. constructor, passing it the list of ports that will be used. If the list is
  46. not provided, it is initialized to the default port, and sending to any other
  47. port would be a runtime error.
  48. This thing will accept events from the default input port, so we subclass from
  49. ``InputThing`` and process sensor values in the ``on_next()`` method.
  50. We first obtain a value from the event and round it
  51. to the nearest integer. Next, we see if it is divisible by 2. If so, we call
  52. ``_dispatch_next()`` to dispatch the value to the ``divisible_by_two`` port,
  53. passing the port name as the second parameter (it defaults to ``default``).
  54. Next, we check for divisibity by three, and dispatch the value to the
  55. ``divisible_by_three`` port if it is divisible. Note that a number like six
  56. will get dispatched to both ports. Finally, if the value is not divisible by
  57. either two or three, we dispatch it to the ``other`` port.
  58. For the ``on_completed()`` and ``on_error()`` events, we forward the
  59. notifications to each of the output ports, by calling ``_dispatch_completed()``
  60. and ``_dispatch_next()`` three times. In general, each port can be viewed as
  61. a separate event stream with its own state. An output thing might decide to
  62. mark completed a subset of its ports while continuing to send new events
  63. on other ports.
  64. Let us look at how this thing might be called:
  65. .. code-block:: python
  66. sensor = SensorAsOutputThing(RandomSensor(1, mean=10, stddev=5,
  67. stop_after_events=10))
  68. mtthing = MultiPortOutputThing(sensor)
  69. mtthing.connect(lambda v: print("even: %s" % v),
  70. port_mapping=('divisible_by_two', 'default'))
  71. mtthing.connect(lambda v: print("divisible by three: %s" % v),
  72. port_mapping=('divisible_by_three', 'default'))
  73. mtthing.connect(lambda v: print("not divisible: %s" % v),
  74. port_mapping=('other', 'default'))
  75. scheduler.schedule_recurring(sensor)
  76. scheduler.run_forever()
  77. Here, we map a different anonymous print function to each output port of the
  78. thing. Internally, ``connect`` is wrapping the anonymous functions with
  79. ``CallableAsInputThing``. This thing only listens on a default port, so we
  80. have to map the port names in the ``connect()`` calls.
  81. The full code for this example is at ``examples/multi_port_example.py``.
  82. Multiple Input Ports
  83. --------------------
  84. Now, let us consider a thing that supports incoming messages on multiple
  85. ports. Messages on non-default input ports are passed to different methods on an
  86. input thing. Specifically, given a port name ``PORT``, events are dispatched
  87. to the method ``on_PORT_next()``, completion of the port's stream is
  88. dispatched to ``on_PORT_completed()``, and errors are dispatched to
  89. ``on_PORT_error()``. Multiple ports are frequently useful
  90. when implementing state machines or filters that combine multiple inputs.
  91. As an example, assume that we have a state machine that reads data
  92. from two sensors: a *left* sensor and a *right* sensor. Here is how the code
  93. might be structured:
  94. .. code-block:: python
  95. class StateMachine:
  96. def on_left_next(self, x):
  97. ...
  98. def on_left_completed(self):
  99. ...
  100. def on_left_error(self):
  101. ...
  102. def on_right_next(self, x):
  103. ...
  104. def on_right_completed(self):
  105. ...
  106. def on_right_error(self):
  107. ...
  108. Here is how we might set up the connections to the sensors:
  109. .. code-block:: python
  110. left = SensorAsOutputThing(LuxSensor('left'))
  111. right = SensorPsOutputThing(LuxSensor('right'))
  112. state_machine = StateMachine()
  113. left.connect(state_machine, port_mapping=('default', 'left'))
  114. right.connect(state_machine, port_mapping=('default', 'right'))
  115. Each sensor outputs its data on the default port, so we map the connections
  116. to the ``left`` and ``right`` ports on the state machine.
  117. Multi-port Filters
  118. -------------------
  119. A *filter* is an ThingFlow element that has both default input and default
  120. output ports. Filters can be easily connected into pipelines.
  121. Filters usually have a single input port and a single output port, but other
  122. topologies are possible (typically one-to-many or many-to-one). One particularly
  123. useful filter is the *dispatcher*. A dispatcher routes each incoming event
  124. (on the default input port) to one of several output ports, based on some
  125. criteria.
  126. For example, consider the filter ``thingflow.filters.dispatch.Dispatcher``. This
  127. filter is provided a set of routing rules in the form of (predicate function,
  128. output port) pairs. An output port is created for each rule (plus the default
  129. port). In the ``on_next()`` method of the filter's InputThing interface, an
  130. incoming event is tested on each of the predicate functions in order. When a
  131. predicate is found that returns true, the event is dispatched to the associated
  132. port and the rule search stops for that event. If an event fails all the
  133. predicate checks, it is passed to the ``default`` port.
  134. Here is the most relevant parts of the filter code (see ``dispatch.py`` for the
  135. complete code):
  136. .. code-block:: python
  137. class Dispatcher(OutputThing, InputThing):
  138. def __init__(self, previous_in_chain, dispatch_rules):
  139. ports = [port for (pred, port) in dispatch_rules] + ['default']
  140. super().__init__(ports=ports)
  141. self.dispatch_rules = dispatch_rules
  142. self.disconnect = previous_in_chain.connect(self)
  143. def on_next(self, x):
  144. for (pred, port) in self.dispatch_rules:
  145. if pred(x):
  146. self._dispatch_next(x, port=port)
  147. return
  148. self._dispatch_next(x, port='default') # fallthrough case
  149. We will use this dispatcher within a larger example in the subsection :ref:`solar-water-heater`.