test_mqtt.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. """Test mqtt broker
  4. In addition to testing mqtt publish/subscribe functionality, this runs an
  5. output_thing that has its own event loop.
  6. To run the test, you will need the paho-mqtt client and the mosquitto broker.
  7. You can get the client via:
  8. pip install paho-mqtt
  9. On Debian-based linuxes, you can get the broker via:
  10. sudo apt-get install mosquitto
  11. We assume that the broker is listening on localhost:1883.
  12. """
  13. import unittest
  14. import sys
  15. import thingflow.filters.output
  16. import thingflow.filters.json
  17. import thingflow.filters.select
  18. from thingflow.base import Scheduler, InputThing, SensorEvent, ScheduleError, ExcInDispatch
  19. from thingflow.adapters.mqtt import MQTTReader, MQTTWriter
  20. from utils import make_test_output_thing_from_vallist, ValidationInputThing
  # Probe for the optional paho-mqtt client. The flag below feeds the skip
  # decorator on the test case, so a missing client skips rather than errors.
  try:
      import paho.mqtt
  except ImportError:
      MQTT_CLIENT_AVAILABLE = False
  else:
      MQTT_CLIENT_AVAILABLE = True

  import asyncio

  # Port the broker is assumed to be listening on (localhost).
  MQTT_PORT = 1883

  # Values handed to the test sensor and to the validating input thing.
  sensor_data = [1, 2, 3, 4, 5]
  class StopLoopAfter(InputThing):
      """Input thing that fires a callback after a fixed number of events.

      ``cancel_thunk`` is called with no arguments when the countdown that
      starts at ``stop_after`` reaches exactly zero.
      """

      def __init__(self, stop_after, cancel_thunk):
          self.cancel_thunk = cancel_thunk
          self.events_left = stop_after

      def on_next(self, x):
          """Count one received event; the event value ``x`` is not inspected."""
          remaining = self.events_left - 1
          self.events_left = remaining
          if remaining != 0:
              return
          print("Requesting stop of event loop")
          self.cancel_thunk()
  def mqtt_msg_to_unicode(m):
      """Return the ``payload`` attribute of message ``m`` decoded as UTF-8."""
      return m.payload.decode("utf-8")
  def is_broker_running(host="localhost", port=None):
      """Report whether something accepts TCP connections on the broker port.

      The check opens (and immediately closes) a TCP connection instead of
      grepping ``netstat`` output: a textual match on the port number also
      hits unrelated lines that merely contain the same digits, and it
      requires the ``netstat`` and ``grep`` executables to be present.

      Args:
          host: Host name or address to probe. Defaults to ``"localhost"``.
          port: TCP port to probe. ``None`` (the default) means ``MQTT_PORT``.

      Returns:
          True if the connection attempt succeeds, False otherwise. A status
          line is printed in both cases.
      """
      import socket

      if port is None:
          # Resolved at call time so the default tracks the module constant.
          port = MQTT_PORT
      try:
          probe = socket.create_connection((host, port), timeout=1.0)
      except OSError:
          # Covers refused connections, timeouts and name-resolution failures.
          print("MQTT broker not running")
          return False
      probe.close()
      print("MQTT broker running")
      return True
  @unittest.skipUnless(MQTT_CLIENT_AVAILABLE,
                       "MQTT client not installed for python at %s" % sys.executable)
  @unittest.skipUnless(is_broker_running(),
                       "MQTT broker not running on port %d" % MQTT_PORT)
  class TestCase(unittest.TestCase):
      """Tests for the MQTT reader/writer adapters.

      The whole case is skipped unless the paho client could be imported and
      the broker check succeeded when this module was loaded.
      """

      def test_mqtt(self):
          """Publish ``sensor_data`` through a writer and validate what a reader gets."""
          loop = asyncio.get_event_loop()
          s = Scheduler(loop)
          expected_events = len(sensor_data)

          # Publishing side: sensor -> JSON -> MQTT writer, sampled periodically.
          sensor = make_test_output_thing_from_vallist(1, sensor_data)
          mqtt_writer = MQTTWriter('localhost', topics=[('bogus/bogus', 0),])
          sensor.to_json().connect(mqtt_writer)
          s.schedule_periodic(sensor, 0.5)

          # Subscribing side: reader -> decode -> SensorEvent -> validator.
          mqtt_reader = MQTTReader("localhost", topics=[('bogus/bogus', 0),])
          vs = ValidationInputThing(sensor_data, self)
          mqtt_reader.take(expected_events).select(mqtt_msg_to_unicode) \
                     .from_json(constructor=SensorEvent) \
                     .output().connect(vs)

          # The reader runs on its own event loop; the returned thunk is handed
          # to StopLoopAfter, which calls it once all expected events arrived.
          c = s.schedule_on_private_event_loop(mqtt_reader)
          stop = StopLoopAfter(expected_events, c)
          mqtt_reader.connect(stop)

          mqtt_reader.print_downstream()
          sensor.print_downstream()
          s.run_forever()
          # NOTE(review): run_forever() has already returned at this point;
          # confirm whether this extra stop() is still required.
          loop.stop()
          self.assertTrue(vs.completed)
          print("that's it")

      def test_daniels_bug(self):
          """Test bug reported by Daniel (issue #1). If you call the mqtt writer without
          serializing the message, you should get a fatal error.
          """
          from collections import namedtuple
          from thingflow.base import from_list

          StripEvent = namedtuple('StripEvent', ['strip_id', 'ts', 'val'])
          strip_events = (
              StripEvent('strip-1', 1500000000, 50),
              StripEvent('strip-1', 1500000000, 5),
              StripEvent('strip-1', 1500000000, 50))

          # The events are connected to the writer directly, with no to_json().
          mqtt = MQTTWriter('localhost', topics=[('strip-data', 0),])
          strip = from_list(strip_events)
          strip.connect(mqtt)
          strip.output()
          sched = Scheduler(asyncio.get_event_loop())
          sched.schedule_periodic(strip, 1.0)

          # The error is the point of this test, so its absence must fail the
          # test instead of letting it pass silently.
          with self.assertRaises(ScheduleError) as raised:
              sched.run_forever()
          e = raised.exception

          # verify the cause of the error
          dispatch_error = e.__cause__
          self.assertTrue(isinstance(dispatch_error, ExcInDispatch),
                          "expecting cause to be a dispatch error, instead got %s" % repr(dispatch_error))
          orig_error = dispatch_error.__cause__
          self.assertTrue(isinstance(orig_error, TypeError),
                          "expecting original exception to be a TypeError, intead got %s" % repr(orig_error))
          print("Got expected exception: '%s'" % e)
  # Run the test case when this file is executed directly as a script.
  if __name__ == "__main__":
      unittest.main()