test_postgres_adapters.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. """
  4. Test the postgres adapters
  5. """
  6. try:
  7. import psycopg2
  8. PREREQS_AVAILABLE = True
  9. except ImportError:
  10. PREREQS_AVAILABLE = False
  11. try:
  12. from config_for_tests import POSTGRES_DBNAME, POSTGRES_USER
  13. except ImportError:
  14. POSTGRES_DBNAME=None
  15. POSTGRES_USER=None
  16. # Set the following to True to skip the tearDown(). This is useful when
  17. # trying to debug a failing test but should be left at False in
  18. # production.
  19. DEBUG_MODE = False
  20. import asyncio
  21. import unittest
  22. from utils import ValueListSensor, SensorEventValidationInputThing
  23. from thingflow.base import Scheduler, InputThing
  24. if PREREQS_AVAILABLE:
  25. from thingflow.adapters.postgres import PostgresWriter, SensorEventMapping,\
  26. create_sensor_table, delete_sensor_table, PostgresReader
  27. from thingflow.filters.output import output
  28. from thingflow.filters.combinators import parallel
  29. sensor_values = [1, 2, 3, 4, 5]
  30. class CaptureInputThing(InputThing):
  31. def __init__(self):
  32. self.seq = []
  33. def on_next(self, x):
  34. self.seq.append(x)
  35. @unittest.skipUnless(PREREQS_AVAILABLE, "postgress client library not installed")
  36. @unittest.skipUnless(POSTGRES_DBNAME and POSTGRES_USER,
  37. "POSTGRES_DBNAME and POSTGRES_USER not defined in config_for_tests")
  38. class TestCase(unittest.TestCase):
  39. def setUp(self):
  40. self.mapping = SensorEventMapping('test_events')
  41. self.connect_string = "dbname=%s user=%s" % (POSTGRES_DBNAME,
  42. POSTGRES_USER)
  43. conn = psycopg2.connect(self.connect_string)
  44. create_sensor_table(conn, 'test_events', drop_if_exists=True)
  45. conn.close()
  46. def tearDown(self):
  47. if DEBUG_MODE:
  48. print("DEBUG_MODE=True, SKIPPING tearDown()")
  49. return
  50. self.connect_string = "dbname=%s user=%s" % (POSTGRES_DBNAME,
  51. POSTGRES_USER)
  52. conn = psycopg2.connect(self.connect_string)
  53. delete_sensor_table(conn, 'test_events')
  54. conn.close()
  55. def test_publish_and_subscribe(self):
  56. sensor = ValueListSensor(1, sensor_values)
  57. scheduler = Scheduler(asyncio.get_event_loop())
  58. pg = PostgresWriter(scheduler, self.connect_string, self.mapping)
  59. capture = CaptureInputThing()
  60. scheduler.schedule_sensor(sensor, 0.5,
  61. parallel(pg, output, capture))
  62. scheduler.run_forever()
  63. print("finish writing to the database")
  64. row_source = PostgresReader(self.connect_string, self.mapping)
  65. row_source.output()
  66. validate = SensorEventValidationInputThing(capture.seq, self)
  67. row_source.connect(validate)
  68. scheduler.schedule_recurring(row_source)
  69. scheduler.run_forever()
  70. self.assertTrue(validate.completed)
  71. print("finished reading rows")
  72. if __name__ == '__main__':
  73. unittest.main()