| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- """
- This is the example from output_things.rst. It reads a CSV-formatted spreadsheet
- file and generates an event from each line. We call output_things that pull
- data from an external source "readers".
- To run this script::
- python simple_csv_reader.py CSV_FILE
- For your csv file, here is some sample data:
- ts,id,value
- 1490576783,sensor-1,1
- 1490576784,sensor-1,1
- 1490576785,sensor-1,3
- 1490576786,sensor-1,4
- There is a more flexible csv reader class defined in
- thingflow.adapters.csv.
- """
- import csv
- import sys
- import asyncio
- from thingflow.base import OutputThing, DirectOutputThingMixin, Scheduler,\
- SensorEvent, FatalError
- import thingflow.filters.output # load the output method on the output_thing
- class SimpleCsvReader(OutputThing, DirectOutputThingMixin):
- """A simple csv file reader. We assume that each row contains
- a timestamp, a sensor id, and a value.
- We could save some work here by subclassing from
- thingflow.generic.DirectReader.
- """
- def __init__(self, filename, has_header_row=True):
- super().__init__() # Make sure the output_thing class is initialized
- self.filename = filename
- self.file = open(filename, 'r', newline='')
- self.reader = csv.reader(self.file)
- if has_header_row:
- # swallow up the header row so it is not passed as data
- try:
- self.reader.__next__()
- except Exception as e:
- raise FatalError("Problem reading header row of csv file %s: %s" %
- (filename, e))
-
- def _observe(self):
- try:
- row = self.reader.__next__()
- event = SensorEvent(ts=float(row[0]), sensor_id=row[1],
- val=float(row[2]))
- self._dispatch_next(event)
- except StopIteration:
- self.file.close()
- self._dispatch_completed()
- except FatalError:
- self._close()
- raise
- except Exception as e:
- self.file.close()
- self._dispatch_error(e)
- # If we are running this as a script, read events from the specified
- # file and print them via output().
- if __name__ == '__main__':
- # check command line arguments
- if len(sys.argv)!=2:
- # did not provide filename or provided too many arguments
- sys.stderr.write("%s FILENAME\n" % sys.argv[0])
- if len(sys.argv)==1:
- sys.stderr.write(" FILENAME is a required parameter\n")
- sys.exit(1)
- reader = SimpleCsvReader(sys.argv[1])
- reader.output()
- scheduler = Scheduler(asyncio.get_event_loop())
- scheduler.schedule_recurring(reader)
- scheduler.run_forever()
- sys.exit(0)
-
|