test_base.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. """These tests are designed to be run on a desktop. You can use
  4. them to validate the system before deploying to 8266. They use stub
  5. sensors.
  6. Test the core thingflow functionality.
  7. """
  8. import sys
  9. import os
  10. import os.path
  11. import time
  12. try:
  13. from thingflow import *
  14. except ImportError:
  15. sys.path.append(os.path.abspath('../'))
  16. from thingflow import *
  17. import unittest
  18. class DummySensor:
  19. __slots__ = ('value_stream', 'sample_time', 'idx', 'sensor_id')
  20. def __init__(self, value_stream, sample_time=0):
  21. self.value_stream = value_stream
  22. self.idx = 0
  23. self.sample_time = sample_time
  24. self.sensor_id = 1
  25. def sample(self):
  26. if self.idx==len(self.value_stream):
  27. raise StopIteration()
  28. else:
  29. if self.sample_time > 0:
  30. print("Sensor simulating a sample time of %d seconds with a sleep" %
  31. self.sample_time)
  32. time.sleep(self.sample_time)
  33. val = self.value_stream[self.idx]
  34. self.idx += 1
  35. return val
  36. def __str__(self):
  37. return 'DummySensor'
  38. class ValidationInputThing:
  39. """Compare the values in a event stream to the expected values.
  40. Use the test_case for the assertions (for proper error reporting in a unit
  41. test).
  42. """
  43. def __init__(self, expected_stream, test_case,
  44. extract_value_fn=lambda event:event[2]):
  45. self.expected_stream = expected_stream
  46. self.next_idx = 0
  47. self.test_case = test_case
  48. self.extract_value_fn = extract_value_fn
  49. self.completed = False
  50. def on_next(self, x):
  51. tc = self.test_case
  52. tc.assertLess(self.next_idx, len(self.expected_stream),
  53. "Got an event after reaching the end of the expected stream")
  54. expected = self.expected_stream[self.next_idx]
  55. actual = self.extract_value_fn(x)
  56. tc.assertEqual(actual, expected,
  57. "Values for element %d of event stream mismatch" % self.next_idx)
  58. print(x)
  59. self.next_idx += 1
  60. def on_completed(self):
  61. tc = self.test_case
  62. tc.assertEqual(self.next_idx, len(self.expected_stream),
  63. "Got on_completed() before end of stream")
  64. self.completed = True
  65. def on_error(self, exc):
  66. tc = self.test_case
  67. tc.assertTrue(False,
  68. "Got an unexpected on_error call with parameter: %s" % exc)
  69. class TestBase(unittest.TestCase):
  70. def test_base(self):
  71. expected = [1, 2, 3, 4, 5]
  72. sensor = DummySensor(expected)
  73. output_thing = SensorAsOutputThing(sensor)
  74. validator = ValidationInputThing(expected, self)
  75. output_thing.connect(validator)
  76. scheduler = Scheduler()
  77. scheduler.schedule_periodic(output_thing, 1)
  78. scheduler.run_forever()
  79. self.assertTrue(validator.completed)
  80. def test_schedule_sensor(self):
  81. expected = [1, 2, 3, 4, 5]
  82. sensor = DummySensor(expected)
  83. validator = ValidationInputThing(expected, self)
  84. scheduler = Scheduler()
  85. scheduler.schedule_sensor(sensor, 1, validator)
  86. scheduler.run_forever()
  87. self.assertTrue(validator.completed)
  88. def test_nonzero_sample_time(self):
  89. """Sensor sample time is greater than the interval between samples!
  90. """
  91. expected = [1, 2, 3, 4, 5]
  92. sensor = DummySensor(expected, sample_time=2)
  93. output_thing = SensorAsOutputThing(sensor)
  94. validator = ValidationInputThing(expected, self)
  95. output_thing.connect(validator)
  96. scheduler = Scheduler()
  97. scheduler.schedule_periodic(output_thing, 1)
  98. scheduler.run_forever()
  99. self.assertTrue(validator.completed)
  100. def test_subsecond_schedule_interval(self):
  101. expected = [1, 2, 3, 4, 5]
  102. sensor = DummySensor(expected)
  103. output_thing = SensorAsOutputThing(sensor)
  104. validator = ValidationInputThing(expected, self)
  105. output_thing.connect(validator)
  106. scheduler = Scheduler()
  107. scheduler.schedule_periodic(output_thing, 0.25)
  108. start = time.time()
  109. scheduler.run_forever()
  110. stop = time.time()
  111. elapsed = stop - start
  112. print("elapsed was %s" % round(elapsed, 2))
  113. self.assertTrue((elapsed>1.0) and (elapsed<2.0),
  114. "Elapsed time should be between 1 and 2 seconds, was %s" % elapsed)
  115. self.assertTrue(validator.completed)
  116. if __name__ == '__main__':
  117. unittest.main()