.. _output_things:

3. Implementing an OutputThing
==============================

In most cases, one can simply wrap a sensor in the ``SensorAsOutputThing``
class and not worry about the details of how to implement output things. There
are also several pre-defined *readers* under ``thingflow.adapters`` that can
obtain events from external sources like message brokers, flat files, and
databases.

The most likely reason for implementing a new OutputThing is that you want to
create a new adapter type that does not exist in the standard ThingFlow
library. We will walk through the details in this document.

Subclassing
-----------

When implementing an output thing, one subclasses from
``thingflow.base.OutputThing``. To emit a new event, the subclass calls the
``_dispatch_next`` method with the event and, optionally, the name of the
output port. To signal an error or completion of the event stream, one calls
``_dispatch_error`` or ``_dispatch_completed``, respectively. The base class
implementation of these methods is responsible for calling the ``on_next``,
``on_error``, and ``on_completed`` methods of each connected input thing.

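To make this division of labor concrete, the following sketch shows roughly what the base class does when a subclass dispatches. It does not use ThingFlow: ``MiniOutputThing`` and ``Recorder`` are hypothetical stand-ins, and the real ``OutputThing`` additionally handles named ports and other bookkeeping omitted here. Dropping the connections after completion or an error is a simplification made for this sketch.

```python
class MiniOutputThing:
    """Hypothetical stand-in for the dispatch part of an output thing."""

    def __init__(self):
        self._connections = []  # input things connected to this output thing

    def connect(self, input_thing):
        self._connections.append(input_thing)

    def _dispatch_next(self, event):
        # fan the event out to every connected input thing
        for it in self._connections:
            it.on_next(event)

    def _dispatch_error(self, e):
        # an error ends the stream; in this sketch we also drop the connections
        for it in self._connections:
            it.on_error(e)
        self._connections = []

    def _dispatch_completed(self):
        # completion ends the stream; in this sketch we also drop the connections
        for it in self._connections:
            it.on_completed()
        self._connections = []


class Recorder:
    """Hypothetical input thing that records every call it receives."""

    def __init__(self):
        self.log = []

    def on_next(self, x):
        self.log.append(("next", x))

    def on_error(self, e):
        self.log.append(("error", str(e)))

    def on_completed(self):
        self.log.append(("completed",))


source = MiniOutputThing()
a, b = Recorder(), Recorder()
source.connect(a)
source.connect(b)
source._dispatch_next(1)
source._dispatch_next(2)
source._dispatch_completed()
```

Both recorders see the same two events followed by the completion notice; in this sketch, events dispatched after completion are not delivered to anyone.
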
The code that calls these ``_dispatch`` methods goes into a well-known method
that is called by the scheduler. Which method that is depends on how the
output thing interacts with the scheduler. ThingFlow supports two cases, each
with an associated mixin class that defines the method:

1. ``DirectOutputThingMixin`` defines an ``_observe`` method that can be called
   directly by the scheduler, either in the main thread (via
   ``Scheduler.schedule_periodic()`` or ``Scheduler.schedule_recurring()``) or
   in a separate thread (via
   ``Scheduler.schedule_periodic_on_separate_thread()``).
2. ``EventLoopOutputThingMixin`` is used for an output thing that has its own
   separate event loop. This loop runs in a separate thread, while the
   connected input things are called in the main thread.

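For the direct case, the contract between the scheduler and ``_observe()`` can be pictured with a toy scheduler built on ``asyncio``. ``ToyScheduler``, ``CountingThing`` and the ``done`` flag are hypothetical and not part of ThingFlow; in particular, ThingFlow's scheduler learns about completion through its own dispatch machinery rather than through a flag. The sketch only illustrates the idea of a recurring schedule: call ``_observe()`` again as soon as possible, and stop the loop once nothing remains scheduled.

```python
import asyncio


class ToyScheduler:
    """Hypothetical, heavily simplified scheduler for direct output things."""

    def __init__(self, loop):
        self.loop = loop
        self.active = 0  # number of things still scheduled

    def schedule_recurring(self, thing):
        self.active += 1
        self.loop.call_soon(self._step, thing)

    def _step(self, thing):
        thing._observe()
        if thing.done:
            # the thing's stream has ended: de-schedule it
            self.active -= 1
            if self.active == 0:
                self.loop.stop()  # nothing left to run
        else:
            self.loop.call_soon(self._step, thing)  # run again as soon as possible

    def run_forever(self):
        self.loop.run_forever()


class CountingThing:
    """Hypothetical direct output thing that emits 0..n-1, then completes."""

    def __init__(self, n):
        self.n, self.i = n, 0
        self.done = False
        self.emitted = []

    def _observe(self):
        if self.i < self.n:
            self.emitted.append(self.i)  # stands in for _dispatch_next()
            self.i += 1
        else:
            self.done = True  # stands in for _dispatch_completed()


loop = asyncio.new_event_loop()
thing = CountingThing(3)
sched = ToyScheduler(loop)
sched.schedule_recurring(thing)
sched.run_forever()
loop.close()
```
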
Simple CSV Reader
-----------------

OK, with all that out of the way, let us define a simple OutputThing. We will
create a reader for simple CSV-formatted spreadsheet files, where each row in
the file corresponds to an event. Here is the class definition (found in
``examples/simple_csv_reader.py``):

.. code-block:: python

    import csv
    from thingflow.base import OutputThing, DirectOutputThingMixin,\
                               SensorEvent, FatalError

    class SimpleCsvReader(OutputThing, DirectOutputThingMixin):
        def __init__(self, filename, has_header_row=True):
            super().__init__() # Make sure the output_thing class is initialized
            self.filename = filename
            self.file = open(filename, 'r', newline='')
            self.reader = csv.reader(self.file)
            if has_header_row:
                # swallow up the header row so it is not passed as data
                try:
                    next(self.reader)
                except Exception as e:
                    self.file.close()
                    raise FatalError("Problem reading header row of csv file %s: %s" %
                                     (filename, e))

        def _observe(self):
            try:
                row = next(self.reader)
                event = SensorEvent(ts=float(row[0]), sensor_id=row[1],
                                    val=float(row[2]))
                self._dispatch_next(event)
            except StopIteration:
                # end of file: release the file and end the event stream
                self.file.close()
                self._dispatch_completed()
            except FatalError:
                # unrecoverable: release the file and stop the event loop
                self.file.close()
                raise
            except Exception as e:
                # e.g. a malformed row: release the file, pass the error downstream
                self.file.close()
                self._dispatch_error(e)

The ``SimpleCsvReader`` class subclasses from both ``OutputThing`` and
``DirectOutputThingMixin``. Subclassing from ``OutputThing`` provides the
machinery needed to register connections and propagate events to downstream
input things. ``DirectOutputThingMixin`` defines an empty ``_observe()`` method
and indicates that the scheduler should call ``_observe()`` to dispatch events
whenever the reader has been scheduled.

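Because the mixins mark which entry point an output thing offers, a scheduler can tell which method to drive. The following sketch uses hypothetical stand-ins (``DirectMixin``, ``EventLoopMixin``, ``entry_point``) and a plain ``isinstance`` check; it illustrates the idea only and is not a description of how ThingFlow's ``Scheduler`` is written.

```python
class DirectMixin:
    """Hypothetical stand-in for a direct output thing mixin."""

    def _observe(self):
        pass  # empty by default; subclasses override it


class EventLoopMixin:
    """Hypothetical stand-in for a private-event-loop mixin."""

    def _observe_event_loop(self):
        raise NotImplementedError


def entry_point(thing):
    """Return the name of the method a scheduler would drive for this thing."""
    if isinstance(thing, DirectMixin):
        return "_observe"
    if isinstance(thing, EventLoopMixin):
        return "_observe_event_loop"
    raise TypeError("%r is not a schedulable output thing" % (thing,))


class Reader(DirectMixin):
    pass


class Listener(EventLoopMixin):
    pass
```
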
In the ``__init__()`` constructor, we first make sure that the base class
infrastructure is initialized through ``super().__init__()``. Next, we
open the file, set up the csv reader, and read the header (if needed).

The main action happens in ``_observe()``. When scheduled, it reads
the next row from the csv file and creates a ``SensorEvent`` from it.
This event is passed on to the connected input things via
``_dispatch_next()``. If the end of the file has been reached (indicated by
the ``StopIteration`` exception), we instead call ``_dispatch_completed()``.
There are two error cases:

1. If a ``FatalError`` exception is raised, we close the file and re-raise
   the exception. This leads to an early termination of the event loop.
2. If any other exception is raised, we close the file and pass the exception
   downstream via ``_dispatch_error()``. This also ends the event stream and
   causes the ``SimpleCsvReader`` to be de-scheduled. The main event loop may
   continue, as long as other objects are still scheduled.

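The normal, end-of-file, and non-fatal error paths of ``_observe()`` can be exercised without ThingFlow or a file on disk. The sketch below reuses the body of ``_observe()`` on a hypothetical ``FakeOutputThing`` that merely records dispatch calls, and reads from an ``io.StringIO``; ``SensorEvent`` is a local namedtuple with the same fields as ThingFlow's. The ``FatalError`` branch is left out, since it only closes the file and re-raises.

```python
import csv
import io
from collections import namedtuple

# Local stand-in with the same fields as thingflow.base.SensorEvent
SensorEvent = namedtuple("SensorEvent", ["sensor_id", "ts", "val"])


class FakeOutputThing:
    """Hypothetical stand-in for OutputThing that only records dispatches."""

    def __init__(self):
        self.log = []

    def _dispatch_next(self, x):
        self.log.append(("next", x))

    def _dispatch_error(self, e):
        self.log.append(("error", type(e).__name__))

    def _dispatch_completed(self):
        self.log.append(("completed",))


class StringCsvReader(FakeOutputThing):
    """Same _observe() logic as SimpleCsvReader, but reading from a string."""

    def __init__(self, text):
        super().__init__()
        self.file = io.StringIO(text, newline="")
        self.reader = csv.reader(self.file)
        next(self.reader)  # skip the header row

    def _observe(self):
        try:
            row = next(self.reader)
            self._dispatch_next(SensorEvent(ts=float(row[0]), sensor_id=row[1],
                                            val=float(row[2])))
        except StopIteration:
            self.file.close()
            self._dispatch_completed()
        except Exception as e:  # e.g. a non-numeric value
            self.file.close()
            self._dispatch_error(e)


good = StringCsvReader("ts,id,value\n1,1,2\n")
good._observe()  # dispatches one event
good._observe()  # end of data: completes the stream

bad = StringCsvReader("ts,id,value\n1,1,oops\n")
bad._observe()   # float('oops') fails: the error goes downstream
```
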
We could save some work in implementing our reader by subclassing from
``thingflow.adapters.generic.DirectReader``, which provides the dispatch
behavior common to most readers.

Reading a File
~~~~~~~~~~~~~~

Now, let us create a simple data file ``test.csv``::

    ts,id,value
    1,1,2
    2,1,3
    3,1,455
    4,1,55

We can instantiate a ``SimpleCsvReader`` to read in the file via::

    reader = SimpleCsvReader("test.csv")

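Before wiring up a scheduler, it can help to check what the reader will produce for this file. The following sketch applies the same row-to-event conversion using only the standard library; ``SensorEvent`` here is a local namedtuple mirroring the fields of ThingFlow's event type, and a temporary file stands in for ``test.csv``.

```python
import csv
import os
import tempfile
from collections import namedtuple

# Local stand-in with the same fields as thingflow.base.SensorEvent
SensorEvent = namedtuple("SensorEvent", ["sensor_id", "ts", "val"])

DATA = "ts,id,value\n1,1,2\n2,1,3\n3,1,455\n4,1,55\n"

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "test.csv")
    with open(path, "w", newline="") as f:
        f.write(DATA)
    with open(path, "r", newline="") as f:
        rows = csv.reader(f)
        next(rows)  # skip the header row, as SimpleCsvReader does by default
        # the same conversion SimpleCsvReader._observe() applies to each row
        events = [SensorEvent(ts=float(r[0]), sensor_id=r[1], val=float(r[2]))
                  for r in rows]

for e in events:
    print(e)
```

The printed lines correspond to the event lines of the reader's output listed later in this section.
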
Now, let's connect it to an input thing that prints each event, and then run
it in the event loop:

.. code-block:: python

    import asyncio
    from thingflow.base import Scheduler
    import thingflow.filters.output  # load the output() method

    reader.output()  # print every event dispatched by the reader
    scheduler = Scheduler(asyncio.get_event_loop())
    scheduler.schedule_recurring(reader)
    scheduler.run_forever()

We use ``schedule_recurring()`` instead of ``schedule_periodic()`` because we
expect all of the data to already be present in the file, so there is no
reason to sample it at fixed intervals.

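The difference between the two scheduling styles can be seen with plain ``asyncio``. The hypothetical ``run_schedule`` helper below mimics a recurring schedule (re-run immediately) and a periodic one (wait a fixed interval between samples); it is an illustration only and says nothing about how ThingFlow's ``Scheduler`` is implemented.

```python
import asyncio


def run_schedule(n_samples, interval=None):
    """Take n_samples samples on a fresh event loop (hypothetical helper).

    interval=None mimics a recurring schedule (reschedule immediately);
    a number mimics a periodic schedule (wait interval seconds in between).
    Returns the loop time at which each sample was taken.
    """
    loop = asyncio.new_event_loop()
    stamps = []

    def sample():
        stamps.append(loop.time())
        if len(stamps) == n_samples:
            loop.stop()
        elif interval is None:
            loop.call_soon(sample)
        else:
            loop.call_later(interval, sample)

    loop.call_soon(sample)
    loop.run_forever()
    loop.close()
    return stamps


recurring = run_schedule(4)
periodic = run_schedule(4, interval=0.05)
```

With four rows to read, the periodic variant spends most of its time waiting between samples, while the recurring one finishes almost immediately.
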
The output looks as follows::

    SensorEvent(sensor_id='1', ts=1.0, val=2.0)
    SensorEvent(sensor_id='1', ts=2.0, val=3.0)
    SensorEvent(sensor_id='1', ts=3.0, val=455.0)
    SensorEvent(sensor_id='1', ts=4.0, val=55.0)
    No more active schedules, will exit event loop

Note that the event loop terminates on its own. This is due to the call to
``_dispatch_completed()`` when the csv reader raises ``StopIteration``: the
reader is de-scheduled, and with no active schedules left, the scheduler exits
the event loop.

Output Things with Private Event Loops
--------------------------------------

There can be cases when the underlying API to be called by the OutputThing
requires its own event loop / event listener. To handle this situation,
use the interface provided by ``EventLoopOutputThingMixin``. Implement the
output thing's main event loop in the ``_observe_event_loop()`` method.
If you call the scheduler's ``schedule_on_private_event_loop()`` method, it
will run this method in a separate thread and then dispatch any events to
the scheduler's main event loop (running in the main thread).

To see some example code demonstrating an output thing using a private event
loop, see ``thingflow.adapters.mqtt.MQTTReader``.

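The thread hand-off described above can be sketched with the standard library alone: a worker thread runs a blocking loop and passes each event to the main ``asyncio`` loop with ``call_soon_threadsafe()``, so downstream code only ever runs in the main thread. ``BackgroundListener`` is hypothetical, and for brevity the output thing performs the hand-off itself, whereas the text above describes the scheduler taking care of it; see ``MQTTReader`` for the real interface.

```python
import asyncio
import threading


class BackgroundListener:
    """Hypothetical output thing whose source has its own blocking loop."""

    def __init__(self, loop, messages):
        self.loop = loop
        self.messages = messages
        self.received = []  # (message, thread name) pairs seen on the main loop

    def _observe_event_loop(self):
        # runs in the worker thread; stands in for a third-party client's loop
        for msg in self.messages:
            self.loop.call_soon_threadsafe(self._on_main_loop, msg)
        # stream completed: ask the main loop to stop after earlier callbacks
        self.loop.call_soon_threadsafe(self.loop.stop)

    def _on_main_loop(self, msg):
        # runs in the thread executing loop.run_forever()
        self.received.append((msg, threading.current_thread().name))


loop = asyncio.new_event_loop()
listener = BackgroundListener(loop, ["a", "b", "c"])
worker = threading.Thread(target=listener._observe_event_loop, name="private-loop")
worker.start()
loop.run_forever()
worker.join()
loop.close()
```

Because ``call_soon_threadsafe()`` queues callbacks in order, the stop request is processed only after every message has been delivered.
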