| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- # Copyright 2016 by MPI-SWS and Data-Ken Research.
- # Licensed under the Apache 2.0 License.
- """These tests are designed to be run on a desktop. You can use
- them to validate the system before deploying to 8266. They use stub
- sensors.
- Test end-to-end functionality where we sample a sensor event and write to a
- queue. This only works if you have a broker at localhost:1883.
- To validate that the messages are being received, go to the commandline and
- run:
- mosquitto_sub -t test
- """
- MQTT_PORT=1883
- import sys
- import os
- import os.path
- import time
- try:
- from thingflow import *
- except ImportError:
- sys.path.append(os.path.abspath('../'))
- from thingflow import *
- import unittest
- from mqtt_writer import MQTTWriter
- class DummySensor(object):
- def __init__(self, sensor_id, value_stream, sample_time=0):
- self.sensor_id = sensor_id
- self.value_stream = value_stream
- self.idx = 0
- self.sample_time = sample_time
- def sample(self):
- if self.idx==len(self.value_stream):
- raise StopIteration()
- else:
- if self.sample_time > 0:
- print("Sensor simulating a sample time of %d seconds with a sleep" %
- self.sample_time)
- time.sleep(self.sample_time)
- val = self.value_stream[self.idx]
- self.idx += 1
- return val
- def __str__(self):
- return 'DummySensor'
- class ValidationInputThing:
- """Compare the values in a event stream to the expected values.
- Use the test_case for the assertions (for proper error reporting in a unit
- test).
- """
- def __init__(self, expected_stream, test_case,
- extract_value_fn=lambda event:event[2]):
- self.expected_stream = expected_stream
- self.next_idx = 0
- self.test_case = test_case
- self.extract_value_fn = extract_value_fn
- self.completed = False
- def on_next(self, x):
- tc = self.test_case
- tc.assertLess(self.next_idx, len(self.expected_stream),
- "Got an event after reaching the end of the expected stream")
- expected = self.expected_stream[self.next_idx]
- actual = self.extract_value_fn(x)
- tc.assertEqual(actual, expected,
- "Values for element %d of event stream mismatch" % self.next_idx)
- self.next_idx += 1
- def on_completed(self):
- tc = self.test_case
- tc.assertEqual(self.next_idx, len(self.expected_stream),
- "Got on_completed() before end of stream")
- self.completed = True
- def on_error(self, exc):
- tc = self.test_case
- tc.assertTrue(False,
- "Got an unexpected on_error call with parameter: %s" % exc)
- def is_broker_running():
- import subprocess
- rc = subprocess.call("netstat -an | grep %d" % MQTT_PORT, shell=True)
- if rc==0:
- return True
- else:
- return False
- @unittest.skipUnless(is_broker_running(),
- "Did not find a broker listening on port %d" % MQTT_PORT)
- class TestEndToEnd(unittest.TestCase):
- def test_publish_sensor(self):
- expected = [1, 2, 3, 4, 5]
- sensor = DummySensor('lux-1', expected)
- output_thing = SensorAsOutputThing(sensor)
- validator = ValidationInputThing(expected, self)
- output_thing.connect(validator)
- self.writer = MQTTWriter('thingflow', 'localhost', MQTT_PORT, 'test')
- output_thing.connect(self.writer)
- scheduler = Scheduler()
- scheduler.schedule_periodic(output_thing, 1)
- scheduler.run_forever()
- self.assertTrue(validator.completed)
-
- if __name__ == '__main__':
- unittest.main()
|