generic.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. """
  4. Generic reader and writer classes, to be subclassed for specific adapters.
  5. """
  6. from thingflow.base import OutputThing, DirectOutputThingMixin, FatalError
  7. class EventRowMapping:
  8. """Interface that converts between events and "rows"
  9. """
  10. def event_to_row(self, event):
  11. """Convert an event to the row representation (usually a
  12. list of values).
  13. """
  14. raise NotImplemented
  15. def row_to_event(self, row):
  16. """Convert a row to an event.
  17. """
  18. raise NotImplemented
  19. class DirectReader(OutputThing, DirectOutputThingMixin):
  20. """A reader that can be run in the current thread (does not block
  21. indefinitely). Reads rows from the iterable, converts them to events
  22. using the mapping and passes them on.
  23. """
  24. def __init__(self, iterable, mapper, name=None):
  25. super().__init__()
  26. self.iterable = iterable
  27. self.mapper = mapper
  28. self.name = name
  29. def _observe(self):
  30. try:
  31. row = self.iterable.__next__()
  32. self._dispatch_next(self.mapper.row_to_event(row))
  33. except StopIteration:
  34. self._close()
  35. self._dispatch_completed()
  36. except FatalError:
  37. self._close()
  38. raise
  39. except Exception as e:
  40. self._close()
  41. self._dispatch_error(e)
  42. def _close(self):
  43. """This method is called when we stop the iteration, either due to
  44. reaching the end of the sequence or an error. It can be overridden by
  45. subclasses to clean up any state and release resources (e.g. closing
  46. open files/connections).
  47. """
  48. pass
  49. def __str__(self):
  50. if hasattr(self, 'name') and self.name:
  51. return self.name
  52. else:
  53. return super().__str__()