test_end_to_end.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. """These tests are designed to be run on a desktop. You can use
  4. them to validate the system before deploying to 8266. They use stub
  5. sensors.
  6. Test end-to-end functionality where we sample a sensor event and write to a
  7. queue. This only works if you have a broker at localhost:1883.
  8. To validate that the messages are being received, go to the commandline and
  9. run:
  10. mosquitto_sub -t test
  11. """
  12. MQTT_PORT=1883
  13. import sys
  14. import os
  15. import os.path
  16. import time
  17. try:
  18. from thingflow import *
  19. except ImportError:
  20. sys.path.append(os.path.abspath('../'))
  21. from thingflow import *
  22. import unittest
  23. from mqtt_writer import MQTTWriter
  24. class DummySensor(object):
  25. def __init__(self, sensor_id, value_stream, sample_time=0):
  26. self.sensor_id = sensor_id
  27. self.value_stream = value_stream
  28. self.idx = 0
  29. self.sample_time = sample_time
  30. def sample(self):
  31. if self.idx==len(self.value_stream):
  32. raise StopIteration()
  33. else:
  34. if self.sample_time > 0:
  35. print("Sensor simulating a sample time of %d seconds with a sleep" %
  36. self.sample_time)
  37. time.sleep(self.sample_time)
  38. val = self.value_stream[self.idx]
  39. self.idx += 1
  40. return val
  41. def __str__(self):
  42. return 'DummySensor'
  43. class ValidationInputThing:
  44. """Compare the values in a event stream to the expected values.
  45. Use the test_case for the assertions (for proper error reporting in a unit
  46. test).
  47. """
  48. def __init__(self, expected_stream, test_case,
  49. extract_value_fn=lambda event:event[2]):
  50. self.expected_stream = expected_stream
  51. self.next_idx = 0
  52. self.test_case = test_case
  53. self.extract_value_fn = extract_value_fn
  54. self.completed = False
  55. def on_next(self, x):
  56. tc = self.test_case
  57. tc.assertLess(self.next_idx, len(self.expected_stream),
  58. "Got an event after reaching the end of the expected stream")
  59. expected = self.expected_stream[self.next_idx]
  60. actual = self.extract_value_fn(x)
  61. tc.assertEqual(actual, expected,
  62. "Values for element %d of event stream mismatch" % self.next_idx)
  63. self.next_idx += 1
  64. def on_completed(self):
  65. tc = self.test_case
  66. tc.assertEqual(self.next_idx, len(self.expected_stream),
  67. "Got on_completed() before end of stream")
  68. self.completed = True
  69. def on_error(self, exc):
  70. tc = self.test_case
  71. tc.assertTrue(False,
  72. "Got an unexpected on_error call with parameter: %s" % exc)
  73. def is_broker_running():
  74. import subprocess
  75. rc = subprocess.call("netstat -an | grep %d" % MQTT_PORT, shell=True)
  76. if rc==0:
  77. return True
  78. else:
  79. return False
  80. @unittest.skipUnless(is_broker_running(),
  81. "Did not find a broker listening on port %d" % MQTT_PORT)
  82. class TestEndToEnd(unittest.TestCase):
  83. def test_publish_sensor(self):
  84. expected = [1, 2, 3, 4, 5]
  85. sensor = DummySensor('lux-1', expected)
  86. output_thing = SensorAsOutputThing(sensor)
  87. validator = ValidationInputThing(expected, self)
  88. output_thing.connect(validator)
  89. self.writer = MQTTWriter('thingflow', 'localhost', MQTT_PORT, 'test')
  90. output_thing.connect(self.writer)
  91. scheduler = Scheduler()
  92. scheduler.schedule_periodic(output_thing, 1)
  93. scheduler.run_forever()
  94. self.assertTrue(validator.completed)
  95. if __name__ == '__main__':
  96. unittest.main()