more-examples.rst 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. .. _more-examples:
  2. 6. More Examples
  3. ================
  4. This section contains more examples of ThingFlow in action. The code for these
  5. examples may be found in the ``examples`` subdirectory of the ``thingflow-python``
  6. repository.
  7. .. _solar-water-heater:
  8. Solar Water Heater
  9. ------------------
  10. In this scenario, we have a solar water heater that provides hot water for a
  11. residence. There is a water temperature
  12. sensor on the output pipe of the heater. There is also an actuator which
  13. controls a bypass valve: if the actuator is ON, the hot water is redirected to a
  14. spa, instead of going to the house. The spa is acting as a heat sink, taking
  15. up the extra heat, so that the water in the house never gets too hot.
  16. We will implement a state machine which looks at the data
  17. from the temperature sensor and turns on the bypass valve when the heated water
  18. is too hot. To avoid oscillations, we use the following logic:
  19. 1. If the running average of the temperature exceeds T_high, turn on the bypass
  20. 2. When the running average dips below T_low (where T_low<T_high), then turn
  21. off the bypass.
  22. Here is a diagram of the ThingFlow flow which implements this application:
  23. .. image:: images/solar-water-heater.png
  24. We see that the water sensor's output is run through a low pass filter to
  25. reduce noise in the reading. It is then passed to a dispatcher 1[#]_
  26. which sends each event to one of several output ports, depending on how it compares
  27. to T_low and T_high. The control state machine determines when to turn on
  28. the actuator, based on the current state and the port of the input event.
  29. Here is the ThingFlow code connecting everything together:
  30. .. code-block:: python
  31. T_high = 110 # Upper threshold (degrees fahrenheit)
  32. T_low = 90 # Lower threshold
  33. sensor = TempSensor(gpio_port=1)
  34. # The dispatcher converts a sensor reading into
  35. # threshold events
  36. dispatcher = sensor.transduce(RunningAvg(5)) \
  37. .dispatch([(lambda v: v[2]>=T_high, 't_high'),
  38. (lambda v: v[2]<=T_low, 't_low')])
  39. controller = Controller()
  40. dispatcher.connect(controller, port_mapping=('t_high','t_high'))
  41. dispatcher.connect(controller, port_mapping=('t_low', 't_low'))
  42. dispatcher.connect(controller, port_mapping=('default', 'between'))
  43. actuator = Actuator()
  44. controller.connect(actuator)
  45. Here is a the state machine to be implemented by the ``Controller`` class:
  46. .. image:: images/solar-heater-state-machine.png
  47. We start in the state ``Initial``. If the first incoming event is ``T_high``,
  48. we go to the ``Normal`` state. Othewise, we immediately go to ``Too Hot``, which
  49. will turn off the actuator. After the initial event, we move from ``Normal`` to
  50. ``Too Hot`` if we receive a ``T_high`` event and then back to ``Normal`` when we
  51. receive ``T_low``. Here is the code which implements this state machine as a
  52. ThingFlow filter:
  53. .. code-block:: python
  54. class Controller(OutputThing):
  55. def __init__(self):
  56. super().__init__()
  57. self.state = INITIAL
  58. self.completed = False
  59. def _make_event(self, val):
  60. return SensorEvent(ts=time.time(), sensor_id='Controller', val=val)
  61. def on_t_high_next(self, event):
  62. if self.state==NORMAL or self.state==INITIAL:
  63. self._dispatch_next(self._make_event("ON"))
  64. self.state = TOO_HOT
  65. def on_t_high_completed(self):
  66. if not self.completed:
  67. self._dispatch_completed()
  68. self.completed = True
  69. def on_t_high_error(self, e):
  70. pass
  71. def on_t_low_next(self, event):
  72. if self.state==TOO_HOT or self.state==INITIAL:
  73. self._dispatch_next(self._make_event("OFF"))
  74. self.state = NORMAL
  75. def on_t_low_completed(self):
  76. if not self.completed:
  77. self._dispatch_completed()
  78. self.completed = True
  79. def on_t_low_error(self, e):
  80. pass
  81. def on_between_next(self, x):
  82. if self.state==INITIAL:
  83. self.state = NORMAL
  84. self._dispatch_next(self._make_event("OFF"))
  85. else:
  86. pass # stay in current state
  87. def on_between_error(self, e):
  88. pass
  89. def on_between_completed(self):
  90. # don't want to pass this forward,
  91. # as it will happen after the first item
  92. pass
  93. As you can see, we have ``on_next``, ``on_completed``, and ``on_error`` methods
  94. for each of the three input ports. A nice property of this design is that the
  95. state machine logic is isolated to a single class and does not ever deal with
  96. actual sensor readings. This makes it easy to test to test the controller logic
  97. independent of the physical sensor and actuator.
  98. The full code for this example may be found at ``examples/solar_heater_scenario.py``.
  99. .. [#] ``thingflow.filters.dispatch.Dispatcher``
  100. GE Predix Adapters
  101. ------------------
  102. GE Digital's `Predix <https://predix.io>`_ platform is a public cloud service
  103. optimized for building IoT data analyses and applications. The
  104. `Time Series Service <https://docs.predix.io/en-US/content/service/data_management/time_series/>`_
  105. supports the storage and retrieval of cloud sensor event data. ThingFlow events map
  106. very naturally to this service, and adapters are provided in the
  107. ``thingflow.adapters.predix`` module. This allows us to build ThingFlow
  108. applications that run "on the edge" and upload their event data to Predix for
  109. analysis.
  110. Initial Configuration
  111. ~~~~~~~~~~~~~~~~~~~~~
  112. Before using the Predix adapters, you will need to configure on Predix a UAA
  113. (User Authentication and Authorization) service and a Timeseries service. You will
  114. also need to install some client side CLI appliations to query and update Predix
  115. configurations. Instructions and hints on this may be found in a separate Github
  116. repository: https://github.com/jfischer/ge-predix-python-timeseries-example.
  117. PredixWriter
  118. ~~~~~~~~~~~~
  119. The ``PredixWriter`` class is an ``InputThing`` which accepts ThingFlow events
  120. and sends them to the Predix Time Series Service via a websocket API. Here
  121. is some example code which instantiates a writer, connects it to a sensor and
  122. runs the resulting flow:
  123. .. code-block:: python
  124. # The next three values are specific to your install
  125. INGEST_URL = ...
  126. PREDIX_ZONE_ID = ...
  127. TOKEN = ...
  128. # The data comes from a light sensor
  129. sensor1 = SensorAsOutputThing(LuxSensor('test-sensor1'))
  130. writer = PredixWriter(INGEST_URL, PREDIX_ZONE_ID, TOKEN,
  131. batch_size=3)
  132. sensor1.connect(writer)
  133. scheduler = Scheduler(asyncio.get_event_loop())
  134. scheduler.schedule_periodic(sensor1, 0.5)
  135. scheduler.run_forever()
  136. The ``INGEST_URL``, ``PREDIX_ZONE_ID``, and ``TOKEN`` parameters
  137. are described in the example repository's
  138. `README <https://github.com/jfischer/ge-predix-python-timeseries-example/blob/master/README.rst>`_ file.
  139. The ``batch_size`` parameter indicates how many events to buffer in memory before
  140. sending them up within a single message.
  141. By default, ``PredixWriter`` expects to receive instances of ``SensorEvent``,
  142. which each include a Unix timestamp (in seconds as is the Python convention),
  143. a sensor id, and a value. If you want to sent data using a different representation
  144. in ThingFlow, you can subclass from ``EventExtractor``, which defines how to
  145. obtain the Predix event fields from an incoming ThingFlow event. This extractor
  146. is then passed as an ``extractor`` keyword parameter to the ``PredixWriter``
  147. constructor. Note that Predix timestamps are expressed in milliseconds rather
  148. than seconds.
  149. PredixReader
  150. ~~~~~~~~~~~~
  151. The ``PredixReader`` class queries the Predix Time Series service for
  152. events from a specified sensor over a specified time range. These are then
  153. mapped to events in the ThingFlow world and passed to any connected things.
  154. Here is a short example which instantiates a ``PredixReader`` to query
  155. for events in the last minute, connects it ``print``, schedules the reader
  156. to be run once every 60 seconds, and starts the flow.
  157. .. code-block:: python
  158. # The next three values are specific to your install
  159. QUERY_URL = ...
  160. PREDIX_ZONE_ID = ...
  161. TOKEN = ...
  162. reader = PredixReader(QUERY_URL, PREDIX_ZONE_ID, TOKEN,
  163. 'test-sensor1', start_time=time.time()-3600,
  164. one_shot=False)
  165. reader.connect(print)
  166. scheduler.schedule_periodic(reader, 60)
  167. scheduler.run_forever()
  168. As with the writer, you must pass in a Query URL, Predix Zone Id,
  169. and bearer token as determined through your Predix configuration.
  170. The ``start_time`` parameter is the starting timestamp for the query.
  171. By specifying ``one_shot`` as ``False``, we are requesting that the
  172. reader be run until the process is killed. If ``one_shot`` was
  173. set to ``True``, the reader will close its event stream after one
  174. query.
  175. If you want the reader to emit a different type of event, pass in a value for the
  176. ``build_event_fn`` keyword parameter of the ``PredixReader`` constructor.
  177. The function should take as arguments
  178. the sensor id, the predix timestamp (in milliseconds), the sensor value,
  179. and a quality code (which is dropped in the default implementation). The
  180. function should return a corresponding event for use in ThingFlow.
  181. A complete version of this example may be found at ``examples/predix_example.py``.
  182. This script can be executed from the command line.