| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- # Copyright 2016 by MPI-SWS and Data-Ken Research.
- # Licensed under the Apache 2.0 License.
- """
- Test the postgres adapters
- """
- try:
- import psycopg2
- PREREQS_AVAILABLE = True
- except ImportError:
- PREREQS_AVAILABLE = False
- try:
- from config_for_tests import POSTGRES_DBNAME, POSTGRES_USER
- except ImportError:
- POSTGRES_DBNAME=None
- POSTGRES_USER=None
- # Set the following to True to skip the tearDown(). This is useful when
- # trying to debug a failing test but should be left at False in
- # production.
- DEBUG_MODE = False
- import asyncio
- import unittest
- from utils import ValueListSensor, SensorEventValidationInputThing
- from thingflow.base import Scheduler, InputThing
- if PREREQS_AVAILABLE:
- from thingflow.adapters.postgres import PostgresWriter, SensorEventMapping,\
- create_sensor_table, delete_sensor_table, PostgresReader
- from thingflow.filters.output import output
- from thingflow.filters.combinators import parallel
- sensor_values = [1, 2, 3, 4, 5]
- class CaptureInputThing(InputThing):
- def __init__(self):
- self.seq = []
- def on_next(self, x):
- self.seq.append(x)
-
-
- @unittest.skipUnless(PREREQS_AVAILABLE, "postgress client library not installed")
- @unittest.skipUnless(POSTGRES_DBNAME and POSTGRES_USER,
- "POSTGRES_DBNAME and POSTGRES_USER not defined in config_for_tests")
- class TestCase(unittest.TestCase):
- def setUp(self):
- self.mapping = SensorEventMapping('test_events')
- self.connect_string = "dbname=%s user=%s" % (POSTGRES_DBNAME,
- POSTGRES_USER)
- conn = psycopg2.connect(self.connect_string)
- create_sensor_table(conn, 'test_events', drop_if_exists=True)
- conn.close()
- def tearDown(self):
- if DEBUG_MODE:
- print("DEBUG_MODE=True, SKIPPING tearDown()")
- return
- self.connect_string = "dbname=%s user=%s" % (POSTGRES_DBNAME,
- POSTGRES_USER)
- conn = psycopg2.connect(self.connect_string)
- delete_sensor_table(conn, 'test_events')
- conn.close()
-
- def test_publish_and_subscribe(self):
- sensor = ValueListSensor(1, sensor_values)
- scheduler = Scheduler(asyncio.get_event_loop())
- pg = PostgresWriter(scheduler, self.connect_string, self.mapping)
- capture = CaptureInputThing()
- scheduler.schedule_sensor(sensor, 0.5,
- parallel(pg, output, capture))
- scheduler.run_forever()
- print("finish writing to the database")
- row_source = PostgresReader(self.connect_string, self.mapping)
- row_source.output()
- validate = SensorEventValidationInputThing(capture.seq, self)
- row_source.connect(validate)
- scheduler.schedule_recurring(row_source)
- scheduler.run_forever()
- self.assertTrue(validate.completed)
- print("finished reading rows")
- if __name__ == '__main__':
- unittest.main()
|