test_mqtt_async.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. # Copyright 2017 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. """Test async version of mqtt libraries. Depends on hbmqtt
  4. (https://github.com/beerfactory/hbmqtt)
  5. """
  6. import unittest
  7. import sys
  8. import asyncio
  9. import string
  10. from random import choice, seed
  11. from thingflow.base import Scheduler, SensorAsOutputThing, SensorEvent
  12. import thingflow.filters.output
  13. import thingflow.filters.combinators
  14. import thingflow.filters.select
  15. from thingflow.filters.transducer import PeriodicMedianTransducer
  16. from utils import ValueListSensor, ValidateAndStopInputThing
  17. seed()
  18. try:
  19. import hbmqtt
  20. from thingflow.adapters.mqtt_async import QueueWriter, QueueReader
  21. HBMQTT_AVAILABLE = True
  22. except ImportError:
  23. HBMQTT_AVAILABLE = False
  24. URL = "mqtt://localhost:1883"
  25. VALUES = [
  26. 1.0,
  27. 2.5,
  28. 3.7,
  29. 4.1,
  30. 8.1,
  31. 0.5,
  32. 6.5,
  33. 4.5,
  34. 3.9,
  35. 6.5
  36. ]
  37. EXPECTED = [
  38. 2.5,
  39. 4.1,
  40. 4.5,
  41. 6.5
  42. ]
  43. def msg_to_event(msg):
  44. return SensorEvent(sensor_id=msg[0], ts=msg[1], val=msg[2])
  45. CHARS=string.ascii_letters+string.digits
  46. def get_topic_name(test_class):
  47. return test_class.__class__.__name__ + ''.join([ choice(CHARS) for i in range(5) ])
  48. @unittest.skipUnless(HBMQTT_AVAILABLE,
  49. "HBMQTT library not installed for python at %s" %
  50. sys.executable)
  51. class TestCase(unittest.TestCase):
  52. def setUp(self):
  53. # Creating a new event loop each test case does not seem to work.
  54. # I think it is due to hbmqtt not cleaning up some state in the asyncio
  55. # layer.
  56. #self.loop = asyncio.new_event_loop()
  57. self.loop = asyncio.get_event_loop()
  58. self.sched = Scheduler(self.loop)
  59. def tearDown(self):
  60. pass
  61. #self.loop.stop()
  62. #self.loop.close()
  63. def test_client_only(self):
  64. SENSOR_ID='sensor-1'
  65. TOPIC=get_topic_name(self)
  66. sensor = SensorAsOutputThing(ValueListSensor(SENSOR_ID, VALUES))
  67. td = sensor.transduce(PeriodicMedianTransducer(period=3))
  68. qw = QueueWriter(td, URL, TOPIC, self.sched)
  69. qw.output()
  70. self.sched.schedule_periodic(sensor, 0.5)
  71. self.sched.run_forever()
  72. self.assertFalse(qw.has_pending_requests(),
  73. "QueueWriter has pending requests: %s" % qw.dump_state())
  74. print("test_client_only completed")
  75. def send_and_recv_body(self, sleep_timeout):
  76. SENSOR_ID='sensor-1'
  77. TOPIC=get_topic_name(self)
  78. sensor = SensorAsOutputThing(ValueListSensor(SENSOR_ID, VALUES))
  79. td = sensor.transduce(PeriodicMedianTransducer(period=3))
  80. qw = QueueWriter(td, URL, TOPIC, self.sched)
  81. qw.output()
  82. qr = QueueReader(URL, TOPIC, self.sched, timeout=sleep_timeout)
  83. self.sched.schedule_periodic(sensor, 0.5)
  84. stop_qr = self.sched.schedule_on_main_event_loop(qr)
  85. vs = ValidateAndStopInputThing(EXPECTED, self, stop_qr)
  86. qr.select(msg_to_event).connect(vs)
  87. self.sched.run_forever()
  88. self.assertFalse(qw.has_pending_requests(),
  89. "QueueWriter has pending requests: %s" % qw.dump_state())
  90. self.assertEqual(qr.state, QueueReader.FINAL_STATE)
  91. self.assertEqual(vs.next_idx, len(EXPECTED))
  92. print("send_and_recv_bod(%s) completed" % sleep_timeout)
  93. def test_short_timeout(self):
  94. self.send_and_recv_body(0.1)
  95. def test_long_timeout(self):
  96. self.send_and_recv_body(3.0)
  97. if __name__ == '__main__':
  98. unittest.main()