csv.py

# Copyright 2016 by MPI-SWS and Data-Ken Research.
# Licensed under the Apache 2.0 License.
"""Adapters for reading/writing event streams to CSV (spreadsheet) files.
"""
# SensorEvent (imported from thingflow.base below) is the event type used for
# time-series sensor data. It is defined there as a namedtuple:
#
#   # Define a sensor event as a tuple of sensor id, timestamp, and value.
#   # A 'sensor' is just a generator of sensor events.
#   SensorEvent = namedtuple('SensorEvent', ['sensor_id', 'ts', 'val'])
import datetime
import csv as csvlib
import logging
import os.path
logger = logging.getLogger(__name__)

from thingflow.base import InputThing, OutputThing, FatalError, \
                           SensorEvent, filtermethod
from thingflow.adapters.generic import EventRowMapping, DirectReader

class EventSpreadsheetMapping(EventRowMapping):
    """Define the mapping between an event record and a spreadsheet.
    """
    def get_header_row(self):
        """Return a list of header row column names.
        """
        raise NotImplementedError

class SensorEventMapping(EventSpreadsheetMapping):
    """A mapping that works for SensorEvent tuples. We map the time
    values twice - as the raw timestamp and as an iso-formatted datetime.
    """
    def get_header_row(self):
        return ['timestamp', 'datetime', 'sensor_id', 'value']

    def event_to_row(self, event):
        return [event.ts,
                datetime.datetime.utcfromtimestamp(event.ts).isoformat(),
                event.sensor_id,
                event.val]

    def row_to_event(self, row):
        ts = float(row[0])
        try:
            sensor_id = int(row[2])
        except ValueError:
            sensor_id = row[2]  # does not necessarily have to be an int
        val = float(row[3])
        return SensorEvent(ts=ts, sensor_id=sensor_id, val=val)

default_event_mapper = SensorEventMapping()

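# Example (illustrative sketch, not part of the module): round-trip a
# SensorEvent through the default mapper. The event values below are made up.
#
#   evt = SensorEvent(sensor_id='temp-1', ts=1672531200.0, val=21.5)
#   row = default_event_mapper.event_to_row(evt)
#   # row == [1672531200.0, '2023-01-01T00:00:00', 'temp-1', 21.5]
#   back = default_event_mapper.row_to_event([str(c) for c in row])
#   assert back.sensor_id == 'temp-1' and back.val == 21.5
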
class CsvWriter(OutputThing, InputThing):
    def __init__(self, previous_in_chain, filename,
                 mapper=default_event_mapper):
        super().__init__()
        self.filename = filename
        self.mapper = mapper
        self.file = open(filename, 'w', newline='')
        self.writer = csvlib.writer(self.file)
        self.writer.writerow(self.mapper.get_header_row())
        self.file.flush()
        self.dispose = previous_in_chain.connect(self)

    def on_next(self, x):
        self.writer.writerow(self.mapper.event_to_row(x))
        self.file.flush()
        self._dispatch_next(x)

    def on_completed(self):
        self.file.close()
        self._dispatch_completed()

    def on_error(self, e):
        self.file.close()
        self._dispatch_error(e)

    def __str__(self):
        return 'csv_writer(%s)' % self.filename

@filtermethod(OutputThing)
def csv_writer(this, filename, mapper=default_event_mapper):
    """Write an event stream to a csv file. mapper is an
    instance of EventSpreadsheetMapping.
    """
    return CsvWriter(this, filename, mapper)

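# Usage sketch (illustrative only): thanks to @filtermethod, csv_writer() is
# also available as a method on any OutputThing. 'lux_sensor' is a
# hypothetical OutputThing emitting SensorEvents, and the scheduler wiring
# below follows the usual ThingFlow pattern; adapt it to your setup.
#
#   import asyncio
#   from thingflow.base import Scheduler
#   scheduler = Scheduler(asyncio.get_event_loop())
#   lux_sensor.csv_writer('lux.csv')               # append a CsvWriter to the chain
#   scheduler.schedule_periodic(lux_sensor, 1.0)   # sample once per second
#   scheduler.run_forever()
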
def default_get_date_from_event(event):
    return datetime.datetime.utcfromtimestamp(event.ts).date()

class RollingCsvWriter(OutputThing, InputThing):
    """Write an event stream to csv files, rolling to a new file
    daily. The filename is basename-yyyy-mm-dd.csv. Typically,
    basename is the sensor id.

    If sub_port is specified, the writer will subscribe to the specified port
    in the previous filter, rather than the default port. This is helpful
    when connecting to a dispatcher.
    """
    def __init__(self, previous_in_chain, directory,
                 base_name,
                 mapper=default_event_mapper,
                 get_date=default_get_date_from_event,
                 sub_port=None):
        super().__init__()
        self.directory = directory
        self.base_name = base_name
        self.mapper = mapper
        self.get_date = get_date
        self.current_file_date = None
        self.file = None
        self.writer = None
        if sub_port is None:
            self.dispose = previous_in_chain.connect(self)
        else:
            self.dispose = previous_in_chain.connect(self,
                                                     port_mapping=(sub_port, 'default'))

    def _start_file(self, event_date):
        filename = os.path.join(self.directory,
                                self.base_name +
                                ('-%d-%02d-%02d.csv' %
                                 (event_date.year, event_date.month,
                                  event_date.day)))
        if os.path.exists(filename):
            self.file = open(filename, 'a', newline='')
            self.writer = csvlib.writer(self.file)
            # don't write header row for existing file
        else:
            self.file = open(filename, 'w', newline='')
            self.writer = csvlib.writer(self.file)
            self.writer.writerow(self.mapper.get_header_row())
            self.file.flush()
        self.current_file_date = event_date

    def on_next(self, x):
        event_date = self.get_date(x)
        if event_date != self.current_file_date:
            if self.file:
                self.file.close()
            self._start_file(event_date)
        self.writer.writerow(self.mapper.event_to_row(x))
        self.file.flush()
        self._dispatch_next(x)

    def on_completed(self):
        if self.file:
            self.file.close()
        self._dispatch_completed()

    def on_error(self, e):
        if self.file:
            self.file.close()
        self._dispatch_error(e)

    def __str__(self):
        return 'rolling_csv_writer(%s)' % self.base_name

@filtermethod(OutputThing)
def rolling_csv_writer(this, directory, basename, mapper=default_event_mapper,
                       get_date=default_get_date_from_event, sub_port=None):
    """Write an event stream to csv files, rolling to a new file
    daily. The filename is basename-yyyy-mm-dd.csv. Typically,
    basename is the sensor id.

    If sub_port is specified, the writer will subscribe to the specified port
    in the previous filter, rather than the default port. This is helpful
    when connecting to a dispatcher.
    """
    return RollingCsvWriter(this, directory, basename, mapper=mapper,
                            get_date=get_date, sub_port=sub_port)

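# Usage sketch (illustrative only): write each day's events for a sensor to
# its own file under a data directory, e.g. /var/data/temp-1-2023-01-01.csv.
# 'temp_sensor' is a hypothetical OutputThing; scheduling is as in the
# csv_writer example above and is not repeated here.
#
#   temp_sensor.rolling_csv_writer('/var/data', 'temp-1')
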
class CsvReader(DirectReader):
    def __init__(self, filename, mapper=default_event_mapper,
                 has_header_row=True):
        """Creates an output_thing that reads a row at a time from a csv file
        and converts the rows into events using the specified mapping.
        """
        self.filename = filename
        self.file = open(filename, 'r', newline='')
        reader = csvlib.reader(self.file)
        if has_header_row:
            # swallow up the header row so it is not passed as data
            try:
                header_row = next(reader)
                logger.debug("header row of %s: %s", filename, ', '.join(header_row))
            except Exception:
                raise FatalError("Problem in reading header row of csv file %s" % filename)
        super().__init__(reader, mapper, name='CsvReader(%s)' % filename)

    def _close(self):
        self.file.close()
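
# Usage sketch (illustrative only): replay a previously written file as an
# event stream. Per the docstring above, a CsvReader acts as an output thing,
# so downstream filters can be chained onto it; driving it from a scheduler
# follows the usual ThingFlow wiring and is not shown here.
#
#   reader = CsvReader('lux.csv')          # parses rows with default_event_mapper
#   reader.csv_writer('lux-copy.csv')      # e.g. copy the stream to a new file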