test_influxdb.py 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. # Make sure influxdb is running
  2. try:
  3. from influxdb import InfluxDBClient
  4. from thingflow.adapters.influxdb import InfluxDBWriter,\
  5. InfluxDBReader
  6. PREREQS_AVAILABLE=True
  7. except ImportError:
  8. PREREQS_AVAILABLE = False
  9. try:
  10. from config_for_tests import INFLUXDB_USER, INFLUXDB_PASSWORD
  11. except ImportError:
  12. INFLUXDB_USER=None
  13. INFLUXDB_PASSWORD=None
  14. try:
  15. from config_for_tests import INFLUXDB_DATABASE
  16. except ImportError:
  17. INFLUXDB_DATABASE='thingflow' # the default
  18. import asyncio
  19. import datetime, time
  20. from collections import namedtuple
  21. import unittest
  22. from utils import ValueListSensor
  23. from thingflow.base import Scheduler, SensorAsOutputThing, \
  24. SensorEvent, CallableAsInputThing
  25. Sensor = namedtuple('Sensor', ['series_name', 'fields', 'tags'])
  26. value_stream = [10, 13, 20, 20, 19, 19, 20, 21, 28, 28, 23, 21, 21, 18, 19, 16, 21,
  27. 10, 13, 20, 20, 19, 19, 20, 21, 28, 28, 23, 21, 21, 18, 19, 16, 21]
  28. value_stream2 = [2, 3, 2, 2, 9, 9, 2, 1, 8, 8, 3, 2, 1, 8, 9, 6, 2, 2, 3, 4, 5, 6, 7, 8, 2, 3, 4,
  29. 6, 2, 3, 2, 2, 9, 9, 2, 1, 8, 8, 3, 2, 1, 8, 9, 6, 2, 2, 3, 4, 5, 6, 7, 8, 2, 3, 4, 6,
  30. 6, 2, 3, 2, 2, 9, 9, 2, 1, 8, 8, 3, 2, 1, 8, 9, 6, 2, 2, 3, 4, 5, 6, 7, 8, 2, 3, 4, 6,
  31. 6, 2, 3, 2, 2, 9, 9, 2, 1, 8, 8, 3, 2, 1, 8, 9, 6, 2, 2, 3, 4, 5, 6, 7, 8, 2, 3, 4, 6,
  32. 6, 2, 3, 2, 2, 9, 9, 2, 1, 8, 8, 3, 2, 1, 8, 9, 6, 2, 2, 3, 4, 5, 6, 7, 8, 2, 3, 4, 6]
  33. @unittest.skipUnless(PREREQS_AVAILABLE,
  34. "influxdb client library not installed")
  35. @unittest.skipUnless(INFLUXDB_USER is not None,
  36. "Influxdb not configured in config_for_tests.py")
  37. class TestInflux(unittest.TestCase):
  38. def test_influx_output(self):
  39. loop = asyncio.get_event_loop()
  40. s = ValueListSensor(1, value_stream)
  41. p = SensorAsOutputThing(s)
  42. b = InfluxDBWriter(msg_format=Sensor(series_name='Sensor', fields=['val', 'ts'], tags=['sensor_id']),
  43. generate_timestamp=False,
  44. username=INFLUXDB_USER,
  45. password=INFLUXDB_PASSWORD,
  46. database=INFLUXDB_DATABASE)
  47. p.connect(b)
  48. scheduler = Scheduler(loop)
  49. scheduler.schedule_periodic(p, 0.2) # sample five times every second
  50. scheduler.run_forever()
  51. # Now play back
  52. rs = self.c.query('SELECT * FROM Sensor;').get_points()
  53. for d in rs:
  54. print(d)
  55. # Play back using an output thing
  56. p = InfluxDBReader('SELECT * FROM Sensor;',
  57. database=INFLUXDB_DATABASE,
  58. username=INFLUXDB_USER,
  59. password=INFLUXDB_PASSWORD)
  60. p.connect(CallableAsInputThing(print))
  61. scheduler = Scheduler(loop)
  62. scheduler.schedule_periodic(p, 0.2) # sample five times every second
  63. scheduler.run_forever()
  64. print("That's all folks")
  65. def setUp(self):
  66. self.c = InfluxDBClient(database=INFLUXDB_DATABASE,
  67. username=INFLUXDB_USER,
  68. password=INFLUXDB_PASSWORD)
  69. self.c.delete_series(measurement='Sensor')
  70. #self.c.query('DELETE from Sensor;')
  71. def tearDown(self):
  72. self.c.delete_series(measurement='Sensor')
  73. #self.c.query('DELETE from Sensor;')
  74. if __name__ == '__main__':
  75. unittest.main()