.. _tutorial:

2. Tutorial
===========
To understand the core concepts in ThingFlow, let us build a simple app with a
dummy sensor that generates random data and feeds it to a dummy LED. The final
code for this example is at ``thingflow-python/examples/tutorial.py``.

Input Things and Output Things
------------------------------
Each ThingFlow "thing" is either an *output thing*, which
emits events and puts them into the workflow, an *input thing*, which consumes
events, accepting event streams from the workflow, or both.

An output thing may create multiple output event streams. Each output stream is
associated with a named *output port*. Likewise, an input thing may accept
input streams via named *input ports*. Input and output ports form the basis
for interconnections in our data flows.

In general, we can connect an output port to an input port via an
output thing's ``connect()`` method like this::

    output_thing.connect(input_thing,
                         port_mapping=('output_port_name', 'input_port_name'))

There also exists a special *default* port, which is used when no port name
is specified on a connection. If you leave off the port mapping
parameter in the ``connect()`` call, it maps the default port of the
output to the default port of the input::

    output_thing.connect(input_thing)

Once connected through the ``connect`` call, an output thing and an input thing
interact through three methods on the input thing:

* ``on_next``, which passes the next event in the stream to the input thing.
* ``on_error``, which should be called at most once, if a fatal error occurs. The
  exception that caused the error is passed as the parameter.
* ``on_completed``, which signals the end of the stream and takes no parameters.

Note that each output port may have multiple connections. The functionality
in the ``thingflow.base.OutputThing`` base class handles dispatching the
events to all downstream consumers.

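
To make the dispatching idea concrete, here is a minimal sketch of an output
thing that fans events out to every connected consumer. It does not use
ThingFlow at all: ``MiniOutputThing`` and ``Collector`` are hypothetical
stand-ins written for this illustration, and the sketch ignores input port
names, so it should not be read as the way ``OutputThing`` is implemented.

```python
class MiniOutputThing:
    """Illustrative stand-in, NOT thingflow.base.OutputThing."""

    def __init__(self):
        # Maps an output port name to the list of connected consumers.
        self._connections = {}

    def connect(self, input_thing, port_mapping=("default", "default")):
        out_port, _in_port = port_mapping  # input port is ignored in this sketch
        self._connections.setdefault(out_port, []).append(input_thing)

    def dispatch_next(self, event, port="default"):
        # Every consumer connected to the port receives the same event.
        for input_thing in self._connections.get(port, []):
            input_thing.on_next(event)

    def dispatch_completed(self, port="default"):
        # The stream is over: notify consumers and drop the connections.
        for input_thing in self._connections.pop(port, []):
            input_thing.on_completed()


class Collector:
    """Hypothetical input thing that records what it receives."""

    def __init__(self):
        self.events = []
        self.completed = False

    def on_next(self, x):
        self.events.append(x)

    def on_error(self, e):
        raise e

    def on_completed(self):
        self.completed = True


source = MiniOutputThing()
first, second = Collector(), Collector()
source.connect(first)
source.connect(second)
source.dispatch_next(42)
source.dispatch_completed()
```

Both collectors end up with the same single event and both see the end of the
stream, which is the behavior the paragraph above describes for multiple
connections on one port.
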
More terms for specialized things
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
We call things which have a default input port and a default output port *filters*.
Filters can be easily composed into pipelines. We talk more about filters
:ref:`below <filters>`. A number of filters are defined by ThingFlow under the
module ``thingflow.filters``.

Some things interface to the outside world, connecting ThingFlow to
transports and data stores like MQTT,
PostgreSQL, and flat CSV files. We call these things *adapters*. Several may be
found under ``thingflow.adapters``. We call an output thing that emits events
coming from an outside source a *reader*. An input thing which accepts events
and conveys them to an outside system is a *writer*.

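
As a rough illustration of the *writer* idea, the sketch below shows an input
thing that conveys each event to an outside system, here a CSV stream. The
class ``CsvWriterSketch`` is hypothetical and uses only the standard library;
it is not one of the adapters shipped under ``thingflow.adapters``.

```python
import csv
import io


class CsvWriterSketch:
    """Hypothetical writer: appends each incoming event as one CSV row."""

    def __init__(self, stream):
        self._writer = csv.writer(stream, lineterminator="\n")
        self.closed = False

    def on_next(self, event):
        # An event is assumed to be a tuple of plain values.
        self._writer.writerow(event)

    def on_error(self, exc):
        self.closed = True

    def on_completed(self):
        self.closed = True


buffer = io.StringIO()  # stands in for a real file
writer = CsvWriterSketch(buffer)
writer.on_next((1, 1466554963.0, 91.8))
writer.on_next((1, 1466554964.0, 105.2))
writer.on_completed()
csv_text = buffer.getvalue()
```

A reader would be the mirror image: an output thing that takes rows from the
outside source and pushes them downstream as events.
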
Sensors
-------
Since ThingFlow is designed for Internet of Things applications, data capture
from sensors is an important part of most applications. To this end, ThingFlow
provides a *sensor* abstraction. A sensor is any Python class that implements
a ``sample()`` method and has a ``sensor_id`` property. The ``sample()`` method
takes no arguments and returns the current value of the sensor. The ``sensor_id``
property is used to identify the sensor in downstream events. Optionally, a
sensor can indicate that there is no more data available by raising a
``StopIteration`` exception.

To plug sensors into the world of input and output things, ThingFlow provides
the ``SensorAsOutputThing`` class. This class wraps any sensor, creating an
output thing. When the thing is called by the scheduler, it calls the sensor's ``sample()``
method, wraps the value in an event (either ``SensorEvent`` or a custom
event type), and pushes it to any connected input things. We will see
``SensorAsOutputThing`` in action below.

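
The sample, wrap, and push cycle described above can be sketched without
ThingFlow. Everything in this block (``Event``, ``CountdownSensor``,
``SensorWrapperSketch``, ``Recorder``) is a hypothetical stand-in chosen for
illustration; the real ``SensorAsOutputThing`` may differ in its details.

```python
import time
from collections import namedtuple

# Assumed event shape, modelled on the sensor id / timestamp / value triple.
Event = namedtuple("Event", ["sensor_id", "ts", "val"])


class CountdownSensor:
    """Toy sensor: returns start, start-1, ..., 1 and then raises StopIteration."""

    def __init__(self, sensor_id, start):
        self.sensor_id = sensor_id
        self._remaining = start

    def sample(self):
        if self._remaining <= 0:
            raise StopIteration
        value = self._remaining
        self._remaining -= 1
        return value


class SensorWrapperSketch:
    """Turns a sensor into something that pushes events to consumers."""

    def __init__(self, sensor):
        self.sensor = sensor
        self.consumers = []

    def connect(self, consumer):
        self.consumers.append(consumer)

    def observe(self):
        """Take one sample; return False once the sensor is exhausted."""
        try:
            value = self.sensor.sample()
        except StopIteration:
            for consumer in self.consumers:
                consumer.on_completed()
            return False
        event = Event(self.sensor.sensor_id, time.time(), value)
        for consumer in self.consumers:
            consumer.on_next(event)
        return True


class Recorder:
    def __init__(self):
        self.values = []
        self.done = False

    def on_next(self, event):
        self.values.append(event.val)

    def on_completed(self):
        self.done = True


wrapper = SensorWrapperSketch(CountdownSensor("s1", 3))
recorder = Recorder()
wrapper.connect(recorder)
while wrapper.observe():  # a scheduler would normally make these calls
    pass
```
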
There are cases where this simple sensor abstraction is not sufficient to model
a real-life sensor or you are outputting events that are not coming directly
from a sensor (e.g. from a file or a message broker). In those situations,
you can just create your own output thing class, subclassing from the base
``OutputThing`` class.

Implementing a Sensor
~~~~~~~~~~~~~~~~~~~~~
Now, we will implement a simple test sensor that generates random values.
There is no base sensor class in ThingFlow; we just need a class that
provides a ``sensor_id`` property and a ``sample()`` method. We'll take
the ``sensor_id`` value as an argument to ``__init__()``. The sample
value will be a random number generated with a Gaussian distribution,
via ``random.gauss``. Here is the code for a simple version of our
sensor:

.. code-block:: python

    import random
    random.seed()

    class RandomSensor:
        def __init__(self, sensor_id, mean, stddev):
            """Generate a random value each time sample() is
            called, using the specified mean and standard
            deviation.
            """
            self.sensor_id = sensor_id
            self.mean = mean
            self.stddev = stddev

        def sample(self):
            return random.gauss(self.mean, self.stddev)

        def __str__(self):
            return "RandomSensor(%s, %s, %s)" % \
                (self.sensor_id, self.mean, self.stddev)

This sensor will generate a new random value each time it is called. If we
run it with a scheduler, it will run forever (at least until the program
is interrupted via Control-C). For testing, it would be helpful to stop
the program after a certain number of events. We can do that by passing
an event limit to the constructor, counting down the events, and raising
a ``StopIteration`` exception when the limit has been reached. Here is
an improved version of our sensor that can signal a stop after the specified
number of events:

.. code-block:: python

    import random
    random.seed()
    import time
    from thingflow.base import SensorAsOutputThing

    class RandomSensor:
        def __init__(self, sensor_id, mean, stddev, stop_after):
            """This sensor will signal it is completed after the
            specified number of events have been sampled.
            """
            self.sensor_id = sensor_id
            self.mean = mean
            self.stddev = stddev
            self.events_left = stop_after

        def sample(self):
            if self.events_left > 0:
                data = random.gauss(self.mean, self.stddev)
                self.events_left -= 1
                return data
            else:
                raise StopIteration

        def __str__(self):
            return "RandomSensor(%s, %s, %s)" % \
                (self.sensor_id, self.mean, self.stddev)

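
Before wiring the sensor into a dataflow, it can help to exercise the event
limit by hand. The sketch below repeats a trimmed copy of the class so that it
runs on its own, without ThingFlow installed; the polling loop is only an
illustration of what a caller has to handle, not how the scheduler is written.

```python
import random


class RandomSensor:
    """Trimmed copy of the limited sensor, repeated so the sketch is standalone."""

    def __init__(self, sensor_id, mean, stddev, stop_after):
        self.sensor_id = sensor_id
        self.mean = mean
        self.stddev = stddev
        self.events_left = stop_after

    def sample(self):
        if self.events_left > 0:
            self.events_left -= 1
            return random.gauss(self.mean, self.stddev)
        raise StopIteration


sensor = RandomSensor(1, 100, 10, stop_after=3)
samples = []
while True:
    try:
        samples.append(sensor.sample())
    except StopIteration:
        # The sensor has signalled that no more data is available.
        break
```

After the loop, ``samples`` holds exactly three floating-point values, and any
further call to ``sample()`` raises ``StopIteration`` again.
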
Now, let's instantiate our sensor:

.. code-block:: python

    from thingflow.base import SensorAsOutputThing
    MEAN = 100
    STDDEV = 10
    sensor = SensorAsOutputThing(RandomSensor(1, MEAN, STDDEV, stop_after=5))

Implementing an Input Thing
---------------------------
Now, let us define a simple input thing -- a dummy LED actuator. The LED will
inherit from the ``thingflow.base.InputThing`` class, which defines the
input thing interface for receiving events on the default port. Here is the code:

.. code-block:: python

    from thingflow.base import InputThing

    class LED(InputThing):
        def on_next(self, x):
            if x:
                print("On")
            else:
                print("Off")

        def on_error(self, e):
            print("Got an error: %s" % e)

        def on_completed(self):
            print("LED Completed")

        def __str__(self):
            return 'LED'

As you can see, the main logic is in ``on_next`` -- if the event looks like a
true value, we just print "On", otherwise we print "Off". We won't do anything
special for the ``on_error`` and ``on_completed`` callbacks. Now, we can
instantiate an LED:

.. code-block:: python

    led = LED()

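
To see what "looks like a true value" means in practice, we can call the
callbacks by hand. The sketch below uses a plain class with the same methods
rather than subclassing ``InputThing``, purely so that it runs without
ThingFlow installed; capturing standard output is just a convenience for
inspecting the result.

```python
import contextlib
import io


class LED:
    """Same callbacks as the LED above, without the ThingFlow base class."""

    def on_next(self, x):
        print("On" if x else "Off")

    def on_error(self, e):
        print("Got an error: %s" % e)

    def on_completed(self):
        print("LED Completed")


led = LED()
captured = io.StringIO()
with contextlib.redirect_stdout(captured):
    # Any truthy value switches the LED on, any falsy value switches it off.
    for value in (True, False, 0, 1):
        led.on_next(value)
    led.on_completed()
lines = captured.getvalue().splitlines()
```
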
.. _filters:

Filters
-------
A *filter* is a thing that has a single default input port and a single default
output port. There is a base class for filters, ``thingflow.base.Filter``,
which subclasses from both ``InputThing`` and ``OutputThing``.
Although you can instantiate
filter classes directly, ThingFlow makes use of some Python metaprogramming
to dynamically add convenience methods to the base ``OutputThing`` class
to create and return filters. This allows filters to be easily chained
together, implementing multi-step query pipelines without any glue code.

Let us now create a series of filters that connect together our dummy light
sensor and our LED. Here is some code to look at each event and send ``True`` to
the LED if the value exceeds the mean (provided to the sensor) and ``False``
otherwise:

.. code-block:: python

    import thingflow.filters.map
    sensor.map(lambda evt: evt.val > MEAN).connect(led)

The ``import`` statement loads the code for the ``map`` filter. By loading
it, it is added as a method to the ``OutputThing`` class. Since the sensor was
wrapped in ``SensorAsOutputThing``, which inherits from ``OutputThing``, it
gets this method as
well. Calling the method creates a filter which runs the supplied
anonymous function on each event. This
filter is automatically connected to the sensor's default output port.
The ``map`` call returns the filter, allowing it to be used
in chained method calls. In this case, we ``connect`` the ``led`` to the
filter's event stream.

Inside the Map filter
~~~~~~~~~~~~~~~~~~~~~
It is important to note that the call to a filter method returns a filter
object and not an event. This call happens at initialization time.

To get a better understanding of what's happening, let's take a look
inside the ``map`` filter.

First, let us create a straightforward implementation of our filter
by subclassing from the base ``Filter`` class and then overriding
the ``on_next`` method:

.. code-block:: python

    from thingflow.base import OutputThing, Filter, filtermethod

    class MapFilter(Filter):
        def __init__(self, previous_in_chain, mapfun):
            super().__init__(previous_in_chain)
            self.mapfun = mapfun

        def on_next(self, x):
            result = self.mapfun(x)
            if result is not None:
                self._dispatch_next(result)

    @filtermethod(OutputThing)
    def map(this, mapfun):
        return MapFilter(this, mapfun)

In this case, the ``on_next`` method applies the provided ``mapfun``
mapping function to each incoming event and, if the result is not ``None``,
passes it on to the default output port via the method ``_dispatch_next``
(whose implementation is inherited from the base ``OutputThing`` class).

In the ``__init__`` method of our filter, we accept a ``previous_in_chain``
argument and pass it to the parent class's constructor. As the name implies,
this argument should be the previous filter in the chain which is acting as
a source of events to this filter. ``Filter.__init__`` will perform a
``previous_in_chain.connect(self)`` call to establish the connection.

We can now wrap our filter in the function ``map``, which takes the previous
filter in the chain and our mapping function as arguments, returning a new
instance of ``MapFilter``. The decorator ``filtermethod`` is used to attach
this function to ``OutputThing`` as a method. We can then make calls
like ``thing.map(mapfun)``.

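
The trick of attaching a function to a class as a method can be sketched in a
few lines of plain Python. The decorator ``attach_as_method`` and the classes
``Source``, ``MapSketch``, and ``Sink`` below are hypothetical names invented
for this illustration; this is one plausible way to get the chaining behavior,
not a copy of how ``filtermethod`` is written.

```python
class Source:
    """Stand-in for an output thing; it remembers what is connected to it."""

    def __init__(self):
        self.downstream = []

    def connect(self, thing):
        self.downstream.append(thing)

    def emit(self, event):
        for thing in self.downstream:
            thing.on_next(event)


def attach_as_method(cls):
    """Hypothetical decorator: install the decorated function on ``cls``."""
    def decorator(func):
        setattr(cls, func.__name__, func)
        return func
    return decorator


class MapSketch(Source):
    def __init__(self, previous_in_chain, mapfun):
        super().__init__()
        self.mapfun = mapfun
        # Connect ourselves to the thing that feeds us events.
        previous_in_chain.connect(self)

    def on_next(self, x):
        y = self.mapfun(x)
        if y is not None:
            self.emit(y)


@attach_as_method(Source)
def map(this, mapfun):  # shadows the builtin in this module, as in the tutorial
    return MapSketch(this, mapfun)


class Sink:
    def __init__(self):
        self.seen = []

    def on_next(self, x):
        self.seen.append(x)


source = Source()
sink = Sink()
# Each call returns a new filter, so calls can be chained.
source.map(lambda v: v * 2).map(lambda v: v + 1).connect(sink)
source.emit(5)
```

Because ``MapSketch`` inherits from ``Source``, the attached method is also
available on the filter returned by the first call, which is what makes the
chain work.
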
The actual code for ``map`` in ThingFlow may be found in the module ``thingflow.filters.map``.
It is written slightly differently, in a more functional style:

.. code-block:: python

    from thingflow.base import OutputThing, FunctionFilter, filtermethod

    @filtermethod(OutputThing, alias="select")
    def map(this, mapfun):
        def on_next(self, x):
            y = mapfun(x)
            if y is not None:
                self._dispatch_next(y)
        return FunctionFilter(this, on_next, name='map')

The ``FunctionFilter`` class is a subclass of ``Filter`` which takes its ``on_next``,
``on_error``, and ``on_completed`` method implementations as function parameters.
In this case, we define ``on_next`` inside of our ``map`` filter. This avoids the
need to even create a ``MapFilter`` class.

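
To illustrate the functional style, here is a small sketch of a filter class
that receives its ``on_next`` behavior as a function. ``FunctionFilterSketch``
is a hypothetical stand-in, and binding the function with ``types.MethodType``
is an assumption about one reasonable way to do it, not a description of
``FunctionFilter`` itself.

```python
import types


class FunctionFilterSketch:
    """Stand-in filter whose on_next behavior is supplied as a function."""

    def __init__(self, previous_in_chain, on_next):
        self.downstream = []
        # Bind the plain function so it receives this instance as ``self``.
        self.on_next = types.MethodType(on_next, self)
        if previous_in_chain is not None:
            previous_in_chain.connect(self)

    def connect(self, thing):
        self.downstream.append(thing)
        return thing

    def _dispatch_next(self, x):
        for thing in self.downstream:
            thing.on_next(x)


def keep_even(previous_in_chain):
    """Build a filter that forwards only even numbers; no subclass needed."""
    def on_next(self, x):
        if x % 2 == 0:
            self._dispatch_next(x)
    return FunctionFilterSketch(previous_in_chain, on_next)


def pass_through(self, x):
    self._dispatch_next(x)


class Sink:
    def __init__(self):
        self.seen = []

    def on_next(self, x):
        self.seen.append(x)


head = FunctionFilterSketch(None, pass_through)
sink = Sink()
keep_even(head).connect(sink)
for n in range(5):
    head.on_next(n)
```
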
Sensor Events
-------------
ThingFlow provides a *namedtuple* called ``thingflow.base.SensorEvent``, to
serve as elements of our data stream. The first member of the tuple, called
``sensor_id``, is the sensor id property of the sensor from which the event
originated. The second member of the event tuple, ``ts``, is a timestamp
of when the event was generated. The third member, ``val``, is the value
returned by the sensor's ``sample()`` method.

The ``SensorAsOutputThing`` wrapper class creates ``SensorEvent`` instances by default.
However, you can provide an optional ``make_sensor_event`` callback to
``SensorAsOutputThing`` to override this behavior and provide your own event types.

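
The following sketch shows what working with such a named tuple looks like,
using only ``collections.namedtuple``. ``SensorEventSketch`` mirrors the three
fields described above, while ``CelsiusEvent`` and ``make_celsius_event`` are
hypothetical; in particular, the argument list of the factory function is an
assumption and may not match what ``SensorAsOutputThing`` passes to its
``make_sensor_event`` callback.

```python
import time
from collections import namedtuple

# Same field names and order as described above.
SensorEventSketch = namedtuple("SensorEventSketch", ["sensor_id", "ts", "val"])

evt = SensorEventSketch(sensor_id=1, ts=1466554963.0, val=95)
by_name = evt.val           # access by field name
by_position = evt[2]        # a named tuple is still a tuple
updated = evt._replace(val=96)  # tuples are immutable; this makes a copy

# A custom event type carrying an extra field (hypothetical).
CelsiusEvent = namedtuple("CelsiusEvent", ["sensor_id", "ts", "celsius", "unit"])


def make_celsius_event(sensor_id, value, now=time.time):
    """Hypothetical factory; the signature is an assumption for illustration."""
    return CelsiusEvent(sensor_id, now(), value, "C")


custom = make_celsius_event(7, 21.5, now=lambda: 0.0)
```
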
Sensor Output Example
---------------------
Imagine that the sensor defined above outputs the following three events,
separated by 10 seconds each::

    SensorEvent(1, 2016-06-21T17:43:25, 95)
    SensorEvent(1, 2016-06-21T17:43:35, 101)
    SensorEvent(1, 2016-06-21T17:43:45, 98)

The ``map`` filter (also registered under the alias ``select``) would output
the following::

    False
    True
    False

The LED would print the following::

    Off
    On
    Off

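
The same result can be checked with a few lines of plain Python that apply the
mapping function to the three events directly. The ``Event`` tuple here is a
stand-in for ``SensorEvent``, and the timestamps are kept as strings for
readability.

```python
from collections import namedtuple

Event = namedtuple("Event", ["sensor_id", "ts", "val"])  # stand-in for SensorEvent
MEAN = 100

events = [
    Event(1, "2016-06-21T17:43:25", 95),
    Event(1, "2016-06-21T17:43:35", 101),
    Event(1, "2016-06-21T17:43:45", 98),
]

# What the map filter computes for each event.
flags = [evt.val > MEAN for evt in events]

# What the LED prints for each value it receives.
led_output = ["On" if flag else "Off" for flag in flags]
```
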
Some Debug Output
-----------------
There are a number of approaches one can take to help understand the behavior of
an event dataflow. First, you can add an ``output`` thing at various points in the
flow. The ``output`` thing just prints each event that it sees. It is another
filter that can be added to the base ``OutputThing`` class by importing the
associated Python package. For example, here is how we add it as a connection to
our sensor, to print out every event the sensor emits::

    import thingflow.filters.output
    sensor.output()

Note that this does not actually print anything yet; we have to run the
*scheduler* to start up our dataflow and begin sampling events from the sensor.

Another useful debugging tool is the ``print_downstream`` method on
``OutputThing``. It can be called on any subclass to see a representation
of the event tree rooted at the given thing. For example, here is what we
get when we call it on the ``sensor`` at this point::

    ***** Dump of all paths from RandomSensor(1, 100, 10) *****
      RandomSensor(1, 100, 10) => select => LED
      RandomSensor(1, 100, 10) => output
    ************************************

Finally, the ``OutputThing`` class also provides a ``trace_downstream`` method.
It will instrument (transitively) all downstream connections. When the scheduler
runs the thing, all events passing over these connections will be printed.

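
A dump like the one above amounts to listing every path from a root to a leaf
of the connection tree. The sketch below shows that idea on a tiny hand-built
tree; ``Node`` and ``downstream_paths`` are hypothetical helpers for this
illustration and say nothing about how ``print_downstream`` is implemented.

```python
class Node:
    """Minimal stand-in for a thing with downstream connections."""

    def __init__(self, name):
        self.name = name
        self.downstream = []

    def connect(self, other):
        self.downstream.append(other)
        return other  # returning the target allows chained connects


def downstream_paths(node):
    """Return every root-to-leaf path below ``node`` as a list of names."""
    if not node.downstream:
        return [[node.name]]
    paths = []
    for child in node.downstream:
        for tail in downstream_paths(child):
            paths.append([node.name] + tail)
    return paths


sensor = Node("RandomSensor(1, 100, 10)")
sensor.connect(Node("select")).connect(Node("LED"))
sensor.connect(Node("output"))

lines = [" => ".join(path) for path in downstream_paths(sensor)]
```
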
The Scheduler
-------------
As you can see, it is easy to create these pipelines. However, this sequence of
things will do nothing until we hook it into the main
event loop. In particular, any output thing that sources events into the system
(e.g. sensors) must be made known to the *scheduler*. Here is an example where
we take the dataflow rooted at the light sensor, tell the scheduler to sample it
once every second, and then start up the event loop:

.. code-block:: python

    import asyncio
    from thingflow.base import Scheduler
    scheduler = Scheduler(asyncio.get_event_loop())
    scheduler.schedule_periodic(sensor, 1.0) # sample once a second
    scheduler.run_forever() # will run until there are no more active sensors
    print("That's all folks!") # reached only once no sensors remain scheduled

The output will look something like this::

    Off
    SensorEvent(sensor_id=1, ts=1466554963.321487, val=91.80221483640152)
    On
    SensorEvent(sensor_id=1, ts=1466554964.325713, val=105.20052817504502)
    Off
    SensorEvent(sensor_id=1, ts=1466554965.330321, val=97.78633493089245)
    Off
    SensorEvent(sensor_id=1, ts=1466554966.333975, val=90.08049816341648)
    Off
    SensorEvent(sensor_id=1, ts=1466554967.338074, val=89.52641383841595)
    On
    SensorEvent(sensor_id=1, ts=1466554968.342416, val=101.35659321534875)
    ...

The scheduler calls the ``_observe`` method of ``SensorAsOutputThing`` once every second.
This method samples the sensor and calls ``_dispatch_next`` to pass the resulting
event to any downstream things connected to the output port.

In the program output above,
we are seeing the On/Off output from the LED interleaved with the original
events printed by the ``output`` element we connected directly to the sensor.
Note that, without an event limit on the sensor, this will keep running until
you use Control-C to stop the program.

Stopping the Scheduler
~~~~~~~~~~~~~~~~~~~~~~
As you saw in the last example, the ``run_forever`` method of the scheduler will
keep on calling things as long as any have been scheduled. If we are just
running a test, it would be nice to stop the program automatically
rather than having to Control-C
out of the running program. Our sensor class addresses this by including a
``stop_after`` parameter on the constructor. When we instantiate our
sensor, we can pass in this additional parameter::

    sensor = SensorAsOutputThing(RandomSensor(1, MEAN, STDDEV, stop_after=5))

The scheduler's ``run_forever()`` method does not really run forever -- it only
runs until there are no more schedulable actions. When our sensor raises the
``StopIteration`` exception, it causes the wrapping ``SensorAsOutputThing`` to deschedule
the sensor. At that point, there are no more output things being managed by
the scheduler, so it exits the loop inside ``run_forever()``.

When we run the example this time, the program stops after five samples::

    Off
    SensorEvent(sensor_id=1, ts=1466570049.852193, val=87.42239337997071)
    On
    SensorEvent(sensor_id=1, ts=1466570050.856118, val=114.47614678277142)
    Off
    SensorEvent(sensor_id=1, ts=1466570051.860044, val=90.26934530230736)
    On
    SensorEvent(sensor_id=1, ts=1466570052.864378, val=102.70094730226809)
    On
    SensorEvent(sensor_id=1, ts=1466570053.868465, val=102.65381015942252)
    LED Completed
    Calling unschedule hook for RandomSensor(1, 100, 10)
    No more active schedules, will exit event loop
    That's all folks!

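
The "run until nothing is left to schedule" behavior can be imitated with a
few lines of ``asyncio``. ``MiniScheduler`` below is a hypothetical stand-in
that only borrows the method names from the text; it assumes each scheduled
callable returns ``False`` when it has no more data, which is a simplification
made for this sketch and not ThingFlow's actual protocol.

```python
import asyncio


class MiniScheduler:
    """Illustrative stand-in, NOT thingflow.base.Scheduler."""

    def __init__(self):
        self._jobs = []

    def schedule_periodic(self, observe, interval):
        # ``observe`` is called once per interval until it returns False.
        self._jobs.append((observe, interval))

    async def _run_job(self, observe, interval):
        while observe():
            await asyncio.sleep(interval)

    def run_forever(self):
        # Despite the name, this returns once every job has finished.
        async def main():
            await asyncio.gather(
                *(self._run_job(observe, interval)
                  for observe, interval in self._jobs))
        asyncio.run(main())


samples = []


def observe():
    """Pretend sensor wrapper: produces three samples, then reports it is done."""
    if len(samples) >= 3:
        return False
    samples.append(len(samples) + 1)
    return True


scheduler = MiniScheduler()
scheduler.schedule_periodic(observe, 0.01)  # short interval keeps the demo fast
scheduler.run_forever()
finished = True  # reached because the only job descheduled itself
```
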
Next Steps
----------
You have reached the end of the tutorial. To learn more, you might:

* Continue with this documentation. In the :ref:`next section <output_things>`,
  we look at implementing output things.
* Take a look at the code under the ``examples`` directory.
* Read through the code in the ``thingflow`` package proper -- a goal of the
  project is to ensure that it is clearly commented.