output_things.rst 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. .. _output_things:
  2. 3. Implementing an OutputThing
  3. ==============================
  4. In most cases, one can simply wrap a sensor in the ``SensorAsOutputThing``
  5. class and not worry about the details of how to implement output things. There
  6. are also several pre-defined *readers* under ``thingflow.adapters`` that can
  7. obtain events from external sources like message brokers, flat files, and
  8. databases.
  9. The most likly reason for implmenting a new OutputThing is that you want to
  10. create a new adapter type that does not exist in the standard ThingFlow
  11. library. We will walk through the details in this document.
  12. Subclassing
  13. -----------
  14. When implmenting an output thing, one subclasses from
  15. ``thingflow.base.OutputThing``. To emit a new event, the subclass calls the
  16. ``_dispatch_next`` method with the event and port name. To signal an error or
  17. completion of the event stream,
  18. one calls ``_dispatch_error`` or ``_dispatch_completed``, respectively. The
  19. base class implementation of these methods are responsible for calling the
  20. ``on_next``, ``on_error``, and ``on_completed`` methods for each of the
  21. connected things.
  22. The code to call these ``_dispatch`` methods goes into a well-known method to be
  23. called by the scheduler. The specific method depends how the output thing will
  24. interact with the scheduler. There are two
  25. cases supported by ThingFlow and three associated mixin-classes that define
  26. the methods:
  27. 1. ``DirectOutputThingMixin`` defines an ``_observe`` method that can be called
  28. directly by the scheduler either in the main thread (via
  29. ``Scheduler.schedule_period()`` or ``Scheduler.schedule_recurring()``) or
  30. in a separate thread (via
  31. ``Scheduler.schedule_periodic_on_separate_thread()``).
  32. 2. ``EventLoopOutputThingMixin`` is used for an output thing that has its own separate
  33. event loop. This is run in a separate thread and the connected input things
  34. are called in the main thread.
  35. Simple CSV Reader
  36. -----------------
  37. OK, with all that out of the way, let us define a simple OutputThing. We will
  38. create a simple CSV-formatted spreadsheet file reader. Each row in the
  39. file corresponds to an event. Here is the class definition (found in
  40. ``examples/simple_csv_reader.py``):
  41. .. code-block:: python
  42. import csv
  43. from thingflow.base import OutputThing, DirectOutputThingMixin,\
  44. SensorEvent, FatalError
  45. class SimpleCsvReader(OutputThing, DirectOutputThingMixin):
  46. def __init__(self, filename, has_header_row=True):
  47. super().__init__() # Make sure the output_thing class is initialized
  48. self.filename = filename
  49. self.file = open(filename, 'r', newline='')
  50. self.reader = csv.reader(self.file)
  51. if has_header_row:
  52. # swallow up the header row so it is not passed as data
  53. try:
  54. self.reader.__next__()
  55. except Exception as e:
  56. raise FatalError("Problem reading header row of csv file %s: %s" %
  57. (filename, e))
  58. def _observe(self):
  59. try:
  60. row = self.reader.__next__()
  61. event = SensorEvent(ts=float(row[0]), sensor_id=row[1],
  62. val=float(row[2]))
  63. self._dispatch_next(event)
  64. except StopIteration:
  65. self.file.close()
  66. self._dispatch_completed()
  67. except FatalError:
  68. self._close()
  69. raise
  70. except Exception as e:
  71. self.file.close()
  72. self._dispatch_error(e)
  73. The ``SimpleCsvReader`` class subclasses from both ``OutputThing`` and
  74. ``DirectOutputThingMixin``. Subclassing from ``OutputThing`` provides the
  75. machinery needed to register connections and propagate events to downstream
  76. input things. ``DirectOutputThingMixin`` defines an empty ``_observe()`` method and
  77. indicates that the scheduler should call ``_observe()`` to dispatch events
  78. whenever the reader has been scheduled.
  79. In the ``__init__()`` constructor, we first make sure that the base class
  80. infrastructure is initialized through ``super().__init__()``. Next, we
  81. open the file, set up the csv reader, and read the header (if needed).
  82. The main action is happening in ``_observe()``. When scheduled, it reads
  83. the next row from the csv file and creates a ``SensorEvent`` from it.
  84. This event is passed on to the output port's connections via
  85. ``_dispatch_next()``. If
  86. the end of the file has been reached (indicated by the ``StopIteration``
  87. exception), we instead call ``_dispatch_completed()``. There are two
  88. error cases:
  89. 1. If a ``FatalError`` exception is thrown, we close our connection and
  90. propagate the error up. This will lead to an early termination of
  91. the event loop.
  92. 2. If any other exception is thrown, we pass it downstream via
  93. ``_dispatch_error()``. It will also close the event stream and
  94. cause the ``SimpleCsvReader`` to be de-scheduled. The main event
  95. loop may continue, assuming that there are other scheduled objects.
  96. We could save some work in implementing our reader by subclassing from
  97. ``thingflow.adapters.generic.DirectReader``. It provides the dispatch
  98. behavior common to most readers.
  99. Reading a File
  100. ~~~~~~~~~~~~~~
  101. Now, let us create a simple data file ``test.csv``::
  102. ts,id,value
  103. 1,1,2
  104. 2,1,3
  105. 3,1,455
  106. 4,1,55
  107. We can instantiate a ``SimpleCsvReader`` to read in the file via::
  108. reader = SimpleCsvReader("test.csv")
  109. Now, let's hook it to an printing input thing and then run it in the event
  110. loop:
  111. .. code-block:: python
  112. import asyncio
  113. from thingflow.base import Scheduler
  114. import thingflow.adapters.output # load the output method
  115. reader.output()
  116. scheduler = Scheduler(asyncio.get_event_loop())
  117. scheduler.schedule_recurring(reader)
  118. scheduler.run_forever()
  119. We use ``schedule_recurring()`` instead of ``schedule_periodic()``, as we
  120. expect all the data to be already present in the file. There is no sense in
  121. taking periodic samples.
  122. The output looks as follows::
  123. SensorEvent(sensor_id='1', ts=1.0, val=2.0)
  124. SensorEvent(sensor_id='1', ts=2.0, val=3.0)
  125. SensorEvent(sensor_id='1', ts=3.0, val=455.0)
  126. SensorEvent(sensor_id='1', ts=4.0, val=55.0)
  127. No more active schedules, will exit event loop
  128. Note that the event loop terminates on its own. This is due to the call to
  129. ``_dispatch_completed()`` when the csv reader throws ``StopIteration``.
  130. Output Things with Private Event Loops
  131. --------------------------------------
  132. There can be cases when the underlying API to be called by the OutputThing
  133. requires its own event loop / event listener. To handle this situation,
  134. use the interface provided by ``EventLoopOutputThingMixin``. Your main
  135. event loop for the output ting is implemented in the ``_observe_event_loop()``.
  136. If you call the scheduler's ``schedule_on_private_event_loop()`` method, it
  137. will run this method in a separate thread and then dispatch any events to
  138. the scheduler's main event loop (running in the main thread).
  139. To see some example code demonstrating an output thing using a private event
  140. loop, see ``thingflow.adapters.mqtt.MQTTReader``.