postgres.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. from thingflow.base import BlockingInputThing, OutputThing, DirectOutputThingMixin,\
  4. FatalError, SensorEvent
  5. from thingflow.adapters.generic import EventRowMapping
  6. import datetime
  7. import psycopg2
  8. class DatabaseMapping(EventRowMapping):
  9. def __init__(self, table_name):
  10. """Define how we map between the event and database world for
  11. a given port. field_to_colname mappings should be a list
  12. of (field_name, column_name) pairs.
  13. """
  14. self.table_name = table_name
  15. self.insert_sql = "insert into %s (%s) values (%s);" % \
  16. (table_name,
  17. ', '.join(self.get_col_names()),
  18. ', '.join(['%s']*len(self.get_col_names())))
  19. self.query_sql = "select %s from %s order by id asc;" % \
  20. (', '.join(self.get_col_names()), table_name)
  21. def get_col_names(self):
  22. """Return the column names for use in sql statements
  23. """
  24. raise NotImplemented
  25. def event_to_row(self, event):
  26. """Convert an event to a tuple of values suitable for use as sql
  27. bind values for a single row.
  28. """
  29. raise NotImplemented
  30. def row_to_event(self, row):
  31. """Given a tuple representing a row of results returned by a sql
  32. select, return an event
  33. """
  34. raise NotImplemented
  35. class SensorEventMapping(DatabaseMapping):
  36. def __init__(self, table_name):
  37. super().__init__(table_name)
  38. def get_col_names(self):
  39. return ['ts', 'sensor_id', 'val']
  40. def event_to_row(self, event):
  41. return (datetime.datetime.fromtimestamp(event.ts), event.sensor_id, event.val)
  42. def row_to_event(self, row):
  43. assert len(row)==3, "Expecting 3 elements, got '%s'" % row.__repr__()
  44. #dt = datetime.datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S")
  45. return SensorEvent(ts=row[0].timestamp(), sensor_id=row[1], val=row[2])
  46. def create_sensor_table(conn, table_name, drop_if_exists=False):
  47. """Utility function to create a sensor event table.
  48. """
  49. seqname = table_name + '_seq'
  50. cur = conn.cursor()
  51. def exe(stmt):
  52. print(stmt)
  53. cur.execute(stmt)
  54. if drop_if_exists:
  55. exe("drop table if exists %s" % table_name)
  56. exe("drop sequence if exists %s;" % seqname)
  57. exe("create sequence %s" % seqname)
  58. exe("create table %s(id bigint NOT NULL DEFAULT nextval('%s'), ts timestamp NOT NULL, sensor_id integer NOT NULL, val double precision NOT NULL);" %
  59. (table_name, seqname))
  60. exe("create unique index on %s (id);" % table_name)
  61. exe("create index on %s (ts);" % table_name)
  62. exe("create index on %s (sensor_id);" % table_name)
  63. conn.commit()
  64. cur.close()
  65. def delete_sensor_table(conn, table_name):
  66. """Utility funtion to delete a sensor event table and its associated sequence.
  67. """
  68. seqname = table_name + '_seq'
  69. cur = conn.cursor()
  70. def exe(stmt):
  71. print(stmt)
  72. cur.execute(stmt)
  73. exe("drop table if exists %s" % table_name)
  74. exe("drop sequence if exists %s;" % seqname)
  75. class PostgresWriter(BlockingInputThing):
  76. """Write the events to the database.
  77. """
  78. def __init__(self, scheduler, connect_string, mapping):
  79. self.mapping = mapping
  80. self.conn = psycopg2.connect(connect_string)
  81. super().__init__(scheduler)
  82. def _on_next(self, port, x):
  83. data = self.mapping.event_to_row(x)
  84. cur = self.conn.cursor()
  85. cur.execute(self.mapping.insert_sql,data)
  86. print("%s %s" % (self.mapping.insert_sql, data.__repr__()))
  87. self.conn.commit()
  88. cur.close()
  89. def _on_completed(self, port):
  90. pass
  91. def _on_error(self, port):
  92. pass
  93. def _close(self):
  94. self.conn.close()
  95. class PostgresReader(OutputThing, DirectOutputThingMixin):
  96. """Read a row from the table to the default port each
  97. time _observe() is called. Note that this output_thing signals
  98. completed when it finishes the query. We could also imagine a
  99. version that keeps looking for new rows, re-running the query
  100. as needed.
  101. """
  102. def __init__(self, connect_string, mapping):
  103. self.conn = psycopg2.connect(connect_string)
  104. self.mapping = mapping
  105. self.cur = None
  106. super().__init__()
  107. def _observe(self):
  108. try:
  109. if not self.cur:
  110. self.cur = self.conn.cursor()
  111. print(self.mapping.query_sql)
  112. self.cur.execute(self.mapping.query_sql)
  113. row = self.cur.fetchone()
  114. if row is not None:
  115. self._dispatch_next(self.mapping.row_to_event(row))
  116. return True
  117. else:
  118. if self.cur:
  119. self.cur.close()
  120. self.conn.close()
  121. self._dispatch_completed()
  122. return False
  123. except FatalError:
  124. raise
  125. except Exception as e:
  126. if self.cur:
  127. self.cur.close()
  128. self.conn.close()
  129. self._dispatch_error(e)
  130. return False