simple_csv_reader.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. """
  2. This is the example from output_things.rst. It reads a CSV-formatted spreadsheet
  3. file and generates an event from each line. We call output_things that pull
  4. data from an external source "readers".
  5. To run this script::
  6. python simple_csv_reader.py CSV_FILE
  7. For your csv file, here is some sample data:
  8. ts,id,value
  9. 1490576783,sensor-1,1
  10. 1490576784,sensor-1,1
  11. 1490576785,sensor-1,3
  12. 1490576786,sensor-1,4
  13. There is a more flexible csv reader class defined in
  14. thingflow.adapters.csv.
  15. """
  16. import csv
  17. import sys
  18. import asyncio
  19. from thingflow.base import OutputThing, DirectOutputThingMixin, Scheduler,\
  20. SensorEvent, FatalError
  21. import thingflow.filters.output # load the output method on the output_thing
  22. class SimpleCsvReader(OutputThing, DirectOutputThingMixin):
  23. """A simple csv file reader. We assume that each row contains
  24. a timestamp, a sensor id, and a value.
  25. We could save some work here by subclassing from
  26. thingflow.generic.DirectReader.
  27. """
  28. def __init__(self, filename, has_header_row=True):
  29. super().__init__() # Make sure the output_thing class is initialized
  30. self.filename = filename
  31. self.file = open(filename, 'r', newline='')
  32. self.reader = csv.reader(self.file)
  33. if has_header_row:
  34. # swallow up the header row so it is not passed as data
  35. try:
  36. self.reader.__next__()
  37. except Exception as e:
  38. raise FatalError("Problem reading header row of csv file %s: %s" %
  39. (filename, e))
  40. def _observe(self):
  41. try:
  42. row = self.reader.__next__()
  43. event = SensorEvent(ts=float(row[0]), sensor_id=row[1],
  44. val=float(row[2]))
  45. self._dispatch_next(event)
  46. except StopIteration:
  47. self.file.close()
  48. self._dispatch_completed()
  49. except FatalError:
  50. self._close()
  51. raise
  52. except Exception as e:
  53. self.file.close()
  54. self._dispatch_error(e)
  55. # If we are running this as a script, read events from the specified
  56. # file and print them via output().
  57. if __name__ == '__main__':
  58. # check command line arguments
  59. if len(sys.argv)!=2:
  60. # did not provide filename or provided too many arguments
  61. sys.stderr.write("%s FILENAME\n" % sys.argv[0])
  62. if len(sys.argv)==1:
  63. sys.stderr.write(" FILENAME is a required parameter\n")
  64. sys.exit(1)
  65. reader = SimpleCsvReader(sys.argv[1])
  66. reader.output()
  67. scheduler = Scheduler(asyncio.get_event_loop())
  68. scheduler.schedule_recurring(reader)
  69. scheduler.run_forever()
  70. sys.exit(0)