| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- # Copyright 2016 by MPI-SWS and Data-Ken Research.
- # Licensed under the Apache 2.0 License.
- """Verify the csv reader/writer through a round trip
- """
- import unittest
- import time
- from tempfile import NamedTemporaryFile
- import os
- import asyncio
- import datetime
- from thingflow.base import Scheduler, IterableAsOutputThing, SensorEvent
- from thingflow.adapters.csv import CsvReader, default_event_mapper
- import thingflow.filters.dispatch
- from utils import make_test_output_thing, CaptureInputThing, \
- SensorEventValidationInputThing
- NUM_EVENTS=5
- class TestCases(unittest.TestCase):
- def test_default_mapper(self):
- """Verify the class that maps between an event and a sensor
- """
- event = SensorEvent(ts=time.time(), sensor_id=1, val=123.456)
- row = default_event_mapper.event_to_row(event)
- event2 = default_event_mapper.row_to_event(row)
- self.assertEqual(event2, event,
- "Round-tripped event does not match original event")
- def test_file_write_read(self):
- tf = NamedTemporaryFile(mode='w', delete=False)
- tf.close()
- try:
- sensor = make_test_output_thing(1, stop_after_events=NUM_EVENTS)
- capture = CaptureInputThing()
- sensor.connect(capture)
- sensor.csv_writer(tf.name)
- scheduler = Scheduler(asyncio.get_event_loop())
- scheduler.schedule_recurring(sensor)
- print("Writing sensor events to temp file")
- scheduler.run_forever()
- self.assertTrue(capture.completed, "CaptureInputThing did not complete")
- self.assertEqual(len(capture.events), NUM_EVENTS,
- "number of events captured did not match generated events")
- reader = CsvReader(tf.name)
- vs = SensorEventValidationInputThing(capture.events, self)
- reader.connect(vs)
- scheduler.schedule_recurring(reader)
- print("reading sensor events back from temp file")
- scheduler.run_forever()
- self.assertTrue(vs.completed, "ValidationInputThing did not complete")
- finally:
- os.remove(tf.name)
- # data for rollover test
- ROLLING_FILE1 = 'dining-room-2015-01-01.csv'
- ROLLING_FILE2 = 'dining-room-2015-01-02.csv'
- FILES = [ROLLING_FILE1, ROLLING_FILE2]
- def make_ts(day, hr, minute):
- return (datetime.datetime(2015, 1, day, hr, minute) - datetime.datetime(1970,1,1)).total_seconds()
- EVENTS = [SensorEvent('dining-room', make_ts(1, 11, 1), 1),
- SensorEvent('dining-room', make_ts(1, 11, 2), 2),
- SensorEvent('dining-room', make_ts(2, 11, 1), 3),
- SensorEvent('dining-room', make_ts(2, 11, 2), 4)]
- # data for dispatch test
- sensor_ids = ['dining-room', 'living-room']
- ROLLING_FILE3 = 'living-room-2015-01-01.csv'
- ROLLING_FILE4 = 'living-room-2015-01-02.csv'
- FILES2 = [ROLLING_FILE1, ROLLING_FILE2, ROLLING_FILE3, ROLLING_FILE4]
- EVENTS2 = [SensorEvent('dining-room', make_ts(1, 11, 1), 1),
- SensorEvent('living-room', make_ts(1, 11, 2), 2),
- SensorEvent('living-room', make_ts(2, 11, 1), 3),
- SensorEvent('dining-room', make_ts(2, 11, 2), 4)]
- def make_rule(sensor_id):
- return (lambda evt: evt.sensor_id==sensor_id, sensor_id)
- dispatch_rules = [make_rule(s) for s in sensor_ids]
- class TestRollingCsvWriter(unittest.TestCase):
- def _cleanup(self):
- for f in FILES2:
- if os.path.exists(f):
- os.remove(f)
- def setUp(self):
- self._cleanup()
- def tearDown(self):
- self._cleanup()
-
- def test_rollover(self):
- def generator():
- for e in EVENTS:
- yield e
- sensor = IterableAsOutputThing(generator(), name='sensor')
- sensor.rolling_csv_writer('.', 'dining-room')
- vs = SensorEventValidationInputThing(EVENTS, self)
- sensor.connect(vs)
- scheduler = Scheduler(asyncio.get_event_loop())
- scheduler.schedule_recurring(sensor)
- scheduler.run_forever()
- for f in FILES:
- self.assertTrue(os.path.exists(f), 'did not find file %s' % f)
- print("found log file %s" % f)
- def test_dispatch(self):
- """Test a scenario where we dispatch to one of several writers
- depending on the sensor id.
- """
- def generator():
- for e in EVENTS2:
- yield e
- sensor = IterableAsOutputThing(generator(), name='sensor')
- dispatcher = sensor.dispatch(dispatch_rules)
- for s in sensor_ids:
- dispatcher.rolling_csv_writer('.', s, sub_port=s)
- dispatcher.connect(lambda x: self.assertTrue(False, "bad dispatch of %s" % x))
- scheduler = Scheduler(asyncio.get_event_loop())
- scheduler.schedule_recurring(sensor)
- scheduler.run_forever()
- for f in FILES2:
- self.assertTrue(os.path.exists(f), 'did not find file %s' % f)
- cnt = 0
- with open(f, 'r') as fobj:
- for line in fobj:
- cnt +=1
- self.assertEqual(2, cnt, "File %s did not have 2 lines" % f)
- print("found log file %s" % f)
-
- if __name__ == '__main__':
- unittest.main()
-
-
|