test_csv_adapters.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. """Verify the csv reader/writer through a round trip
  4. """
  5. import unittest
  6. import time
  7. from tempfile import NamedTemporaryFile
  8. import os
  9. import asyncio
  10. import datetime
  11. from thingflow.base import Scheduler, IterableAsOutputThing, SensorEvent
  12. from thingflow.adapters.csv import CsvReader, default_event_mapper
  13. import thingflow.filters.dispatch
  14. from utils import make_test_output_thing, CaptureInputThing, \
  15. SensorEventValidationInputThing
  16. NUM_EVENTS=5
  17. class TestCases(unittest.TestCase):
  18. def test_default_mapper(self):
  19. """Verify the class that maps between an event and a sensor
  20. """
  21. event = SensorEvent(ts=time.time(), sensor_id=1, val=123.456)
  22. row = default_event_mapper.event_to_row(event)
  23. event2 = default_event_mapper.row_to_event(row)
  24. self.assertEqual(event2, event,
  25. "Round-tripped event does not match original event")
  26. def test_file_write_read(self):
  27. tf = NamedTemporaryFile(mode='w', delete=False)
  28. tf.close()
  29. try:
  30. sensor = make_test_output_thing(1, stop_after_events=NUM_EVENTS)
  31. capture = CaptureInputThing()
  32. sensor.connect(capture)
  33. sensor.csv_writer(tf.name)
  34. scheduler = Scheduler(asyncio.get_event_loop())
  35. scheduler.schedule_recurring(sensor)
  36. print("Writing sensor events to temp file")
  37. scheduler.run_forever()
  38. self.assertTrue(capture.completed, "CaptureInputThing did not complete")
  39. self.assertEqual(len(capture.events), NUM_EVENTS,
  40. "number of events captured did not match generated events")
  41. reader = CsvReader(tf.name)
  42. vs = SensorEventValidationInputThing(capture.events, self)
  43. reader.connect(vs)
  44. scheduler.schedule_recurring(reader)
  45. print("reading sensor events back from temp file")
  46. scheduler.run_forever()
  47. self.assertTrue(vs.completed, "ValidationInputThing did not complete")
  48. finally:
  49. os.remove(tf.name)
  50. # data for rollover test
  51. ROLLING_FILE1 = 'dining-room-2015-01-01.csv'
  52. ROLLING_FILE2 = 'dining-room-2015-01-02.csv'
  53. FILES = [ROLLING_FILE1, ROLLING_FILE2]
  54. def make_ts(day, hr, minute):
  55. return (datetime.datetime(2015, 1, day, hr, minute) - datetime.datetime(1970,1,1)).total_seconds()
  56. EVENTS = [SensorEvent('dining-room', make_ts(1, 11, 1), 1),
  57. SensorEvent('dining-room', make_ts(1, 11, 2), 2),
  58. SensorEvent('dining-room', make_ts(2, 11, 1), 3),
  59. SensorEvent('dining-room', make_ts(2, 11, 2), 4)]
  60. # data for dispatch test
  61. sensor_ids = ['dining-room', 'living-room']
  62. ROLLING_FILE3 = 'living-room-2015-01-01.csv'
  63. ROLLING_FILE4 = 'living-room-2015-01-02.csv'
  64. FILES2 = [ROLLING_FILE1, ROLLING_FILE2, ROLLING_FILE3, ROLLING_FILE4]
  65. EVENTS2 = [SensorEvent('dining-room', make_ts(1, 11, 1), 1),
  66. SensorEvent('living-room', make_ts(1, 11, 2), 2),
  67. SensorEvent('living-room', make_ts(2, 11, 1), 3),
  68. SensorEvent('dining-room', make_ts(2, 11, 2), 4)]
  69. def make_rule(sensor_id):
  70. return (lambda evt: evt.sensor_id==sensor_id, sensor_id)
  71. dispatch_rules = [make_rule(s) for s in sensor_ids]
  72. class TestRollingCsvWriter(unittest.TestCase):
  73. def _cleanup(self):
  74. for f in FILES2:
  75. if os.path.exists(f):
  76. os.remove(f)
  77. def setUp(self):
  78. self._cleanup()
  79. def tearDown(self):
  80. self._cleanup()
  81. def test_rollover(self):
  82. def generator():
  83. for e in EVENTS:
  84. yield e
  85. sensor = IterableAsOutputThing(generator(), name='sensor')
  86. sensor.rolling_csv_writer('.', 'dining-room')
  87. vs = SensorEventValidationInputThing(EVENTS, self)
  88. sensor.connect(vs)
  89. scheduler = Scheduler(asyncio.get_event_loop())
  90. scheduler.schedule_recurring(sensor)
  91. scheduler.run_forever()
  92. for f in FILES:
  93. self.assertTrue(os.path.exists(f), 'did not find file %s' % f)
  94. print("found log file %s" % f)
  95. def test_dispatch(self):
  96. """Test a scenario where we dispatch to one of several writers
  97. depending on the sensor id.
  98. """
  99. def generator():
  100. for e in EVENTS2:
  101. yield e
  102. sensor = IterableAsOutputThing(generator(), name='sensor')
  103. dispatcher = sensor.dispatch(dispatch_rules)
  104. for s in sensor_ids:
  105. dispatcher.rolling_csv_writer('.', s, sub_port=s)
  106. dispatcher.connect(lambda x: self.assertTrue(False, "bad dispatch of %s" % x))
  107. scheduler = Scheduler(asyncio.get_event_loop())
  108. scheduler.schedule_recurring(sensor)
  109. scheduler.run_forever()
  110. for f in FILES2:
  111. self.assertTrue(os.path.exists(f), 'did not find file %s' % f)
  112. cnt = 0
  113. with open(f, 'r') as fobj:
  114. for line in fobj:
  115. cnt +=1
  116. self.assertEqual(2, cnt, "File %s did not have 2 lines" % f)
  117. print("found log file %s" % f)
  118. if __name__ == '__main__':
  119. unittest.main()