test_transducer.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. """
  4. Tests related to the transducers framework and specific transducers that are
  5. defined in thingflow.filters.transducer
  6. """
  7. import asyncio
  8. import unittest
  9. from utils import ValueListSensor, ValidationInputThing
  10. from thingflow.base import Scheduler
  11. from thingflow.filters.transducer import SensorSlidingMean, PeriodicMedianTransducer, transduce
  12. from thingflow.filters.combinators import parallel
  13. from thingflow.filters.output import output
  14. value_stream = [
  15. 10,
  16. 11,
  17. 9,
  18. 12,
  19. 15,
  20. 6,
  21. 14,
  22. 9
  23. ]
  24. mean_stream = [
  25. 10,
  26. 10.5,
  27. 10,
  28. 10.5,
  29. 11.75,
  30. 10.5,
  31. 11.75,
  32. 11.0
  33. ]
  34. periodic_median_stream = [
  35. 10,
  36. 12,
  37. 11.5
  38. ]
  39. class TestCase(unittest.TestCase):
  40. def setUp(self):
  41. self.scheduler = Scheduler(asyncio.get_event_loop())
  42. self.sensor = ValueListSensor(1, value_stream)
  43. def test_sensor_event_sliding_window(self):
  44. vs = ValidationInputThing(mean_stream, self)
  45. self.scheduler.schedule_sensor(self.sensor, 0.1,
  46. transduce(SensorSlidingMean(4)),
  47. parallel(vs, output()))
  48. self.scheduler.run_forever()
  49. self.assertTrue(vs.completed)
  50. def test_periodic_median_transducer(self):
  51. vs = ValidationInputThing(periodic_median_stream, self)
  52. self.scheduler.schedule_sensor(self.sensor, 0.1,
  53. transduce(PeriodicMedianTransducer(3)),
  54. parallel(vs, output()))
  55. self.scheduler.run_forever()
  56. self.assertTrue(vs.completed)
  57. if __name__ == '__main__':
  58. unittest.main()