mqtt_async.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. """Async adapters to MQTT, using the hbmqtt library.
  2. We can make the send code slightly cleaner by using async/await.
  3. To preserve compatibility with Python 3.4, we explicitly queue
  4. up the connect request instead.
  5. """
  6. import hbmqtt.client
  7. import hbmqtt.session
  8. import json
  9. import asyncio
  10. from collections import deque
  11. from thingflow.base import InputThing, FatalError, OutputThing, \
  12. filtermethod, EventLoopOutputThingMixin
  13. class QueueWriter(OutputThing, InputThing):
  14. def __init__(self, previous_in_chain, uri, topic, scheduler):
  15. super().__init__()
  16. self.uri = uri
  17. self.topic = topic
  18. self.scheduler = scheduler
  19. self.connected = False
  20. self.pending_task = None
  21. self.pending_message = None
  22. self.pending_error = None
  23. self.request_queue = deque()
  24. self.client = hbmqtt.client.MQTTClient(loop=scheduler.event_loop)
  25. self.dispose = previous_in_chain.connect(self)
  26. def has_pending_requests(self):
  27. """Return True if there are pending requests. Useful for tests
  28. without having to expose internal state. Note that, in the event
  29. of a diconnect(), we don't remove the pending task, as we will be
  30. calling on_error() or on_completed() directly intead of _process_queue().
  31. """
  32. return ((self.pending_task is not None) and (not self.pending_task.done())) or \
  33. (self.pending_message is not None) or \
  34. len(self.request_queue)>0
  35. def dump_state(self):
  36. """Return a string representing the internal state
  37. (for debugging).
  38. """
  39. return "QueueWriter(pending_task=%s,pending_message=%s,queue=%s)" %\
  40. (repr(self.pending_task), repr(self.pending_message),
  41. repr(self.request_queue))
  42. def _to_message(self, x):
  43. return bytes(json.dumps((x.sensor_id, x.ts, x.val),), encoding='utf-8')
  44. def _process_queue(self, future):
  45. assert future == self.pending_task
  46. exc = future.exception()
  47. if exc:
  48. raise FatalError("mqtt request failed with exception: %s" % exc)
  49. if self.pending_message:
  50. self._dispatch_next(self.pending_message)
  51. self.pending_message = None
  52. if len(self.request_queue)==0:
  53. self.pending_task = None
  54. #print("Completed last task")
  55. else:
  56. x = self.request_queue.popleft()
  57. if x is not None:
  58. self.pending_message = x
  59. self.pending_task = \
  60. self.scheduler._schedule_coroutine(self.client.publish(self.topic,
  61. self._to_message(x)),
  62. self._process_queue)
  63. #print("enqueuing message %s on %s (from request_q)" %
  64. # (repr(x), self.topic))
  65. else:
  66. e = self.pending_error
  67. self.pending_task = \
  68. self.scheduler._schedule_coroutine(self.client.disconnect(),
  69. lambda f: self._dispatch_error(e) if e is not None
  70. else lambda f: self._dispatch_completed)
  71. #print("initiated disconnect (pending_error=%s)" % e)
  72. def on_next(self, x):
  73. if self.connected == False:
  74. self.request_queue.append(x)
  75. self.pending_task = \
  76. self.scheduler._schedule_coroutine(self.client.connect(self.uri),
  77. self._process_queue)
  78. self.connected = True
  79. #print("connection in progress, put message %s on request_queue"
  80. # % repr(x))
  81. elif self.pending_task is not None:
  82. self.request_queue.append(x)
  83. #print("put message %s on request_queue" % repr(x))
  84. else:
  85. self.pending_message = x
  86. self.pending_task = \
  87. self.scheduler._schedule_coroutine(self.client.publish(self.topic, self._to_message(x)),
  88. self._process_queue)
  89. #print("enqueuing message %s on %s" % (repr(x), self.topic))
  90. def on_error(self, e):
  91. if len(self.request_queue)>0:
  92. # empty the pending request queue, we won't try to
  93. # send these.
  94. self.request_queue = deque()
  95. #print("on_error: dropped pending requests")
  96. if self.pending_task is None:
  97. self.pending_task = \
  98. self.scheduler._schedule_coroutine(self.client.disconnect(),
  99. lambda f: self._dispatch_error(e))
  100. #print("on_error: initiated disconnect")
  101. else:
  102. self.pending_error = e
  103. self.request_queue.append(None)
  104. #print("on_error: queued disconnect")
  105. def on_completed(self):
  106. if self.pending_task is None:
  107. self.pending_task = \
  108. self.scheduler._schedule_coroutine(self.client.disconnect(),
  109. self._process_queue)
  110. #print("on_completed: initiated disconnect")
  111. else:
  112. self.request_queue.append(None)
  113. #print("on_completed: queued disconnect")
  114. @filtermethod(OutputThing)
  115. def mqtt_async_send(this, uri, topic, scheduler):
  116. """
  117. Filter method to send a message on the specified uri and topic. It is
  118. added to the output_thing.
  119. """
  120. return QueueWriter(this, uri, topic, scheduler)
  121. DELIVER_TIMEOUT=2 # seconds
  122. class QueueReader(OutputThing, EventLoopOutputThingMixin):
  123. """Subscribe to a topic, wait for incoming messages,
  124. and push them downstream.
  125. """
  126. # state constants
  127. INITIAL_STATE = "INITIAL"
  128. CONNECTING_STATE = "CONNECTING"
  129. SUBSCRIBING_STATE = "SUBSCRIBING"
  130. ACTIVE_STATE = "ACTIVE"
  131. UNSUBSCRIBING_STATE = "UNSUBSCRIBING"
  132. DISCONNECTING_STATE = "DISCONNECTING"
  133. FINAL_STATE = "FINAL"
  134. def __init__(self, uri, topic, scheduler, qos=hbmqtt.client.QOS_1,
  135. timeout=DELIVER_TIMEOUT):
  136. super().__init__()
  137. self.uri = uri
  138. self.topic = topic
  139. self.qos = qos
  140. self.scheduler = scheduler
  141. self.state = QueueReader.INITIAL_STATE
  142. self.pending_task = None
  143. self.stop_requested = False
  144. self.client = hbmqtt.client.MQTTClient(loop=scheduler.event_loop)
  145. self.timeout = timeout # no need to change, overridable just for testing
  146. def _start_task(self, call, next_state):
  147. #print("_start_task: %s, next_state=%s" % (repr(call), next_state))
  148. self.state = next_state
  149. self.pending_task = self.scheduler._schedule_coroutine(call,
  150. self._process_event)
  151. def _process_stop_request(self):
  152. if self.stop_requested:
  153. #print("stop requested")
  154. self._start_task(self.client.unsubscribe([self.topic,]),
  155. QueueReader.UNSUBSCRIBING_STATE)
  156. return True
  157. else:
  158. return False
  159. def _process_event(self, future):
  160. assert future == self.pending_task
  161. #print("_process_event state=%s" % self.state)
  162. exc = future.exception()
  163. if exc and isinstance(exc, asyncio.TimeoutError) and\
  164. self.state==QueueReader.ACTIVE_STATE:
  165. # we timeout every few seconds to check for stop requests
  166. if not self._process_stop_request():
  167. self._start_task(self.client.deliver_message(self.timeout),
  168. QueueReader.ACTIVE_STATE)
  169. elif exc:
  170. raise FatalError("mqtt request failed with exception: %s" % exc)
  171. elif self.state==QueueReader.CONNECTING_STATE:
  172. if not self._process_stop_request():
  173. self._start_task(self.client.subscribe([(self.topic, self.qos),]),
  174. QueueReader.SUBSCRIBING_STATE)
  175. elif self.state==QueueReader.SUBSCRIBING_STATE:
  176. if not self._process_stop_request():
  177. self._start_task(self.client.deliver_message(self.timeout),
  178. QueueReader.ACTIVE_STATE)
  179. elif self.state==QueueReader.ACTIVE_STATE:
  180. result = future.result()
  181. assert isinstance(result, hbmqtt.session.ApplicationMessage)
  182. message = str(result.data, encoding='utf-8')
  183. self._dispatch_next(json.loads(message))
  184. if not self._process_stop_request():
  185. self._start_task(self.client.deliver_message(self.timeout),
  186. QueueReader.ACTIVE_STATE)
  187. elif self.state==QueueReader.UNSUBSCRIBING_STATE:
  188. self._start_task(self.client.disconnect(),
  189. QueueReader.DISCONNECTING_STATE)
  190. elif self.state==QueueReader.DISCONNECTING_STATE:
  191. self._dispatch_completed()
  192. self.state = QueueReader.FINAL_STATE
  193. self.pending_task = None
  194. self.scheduler._remove_from_active_schedules(self)
  195. print("QueueReader in FINAL state")
  196. elif self.state==QueueReader.FINAL_STATE:
  197. raise Exception("_process_event should not be called in final state")
  198. else:
  199. raise Exception("_process_event: invalidate state %s" % self.state)
  200. def _observe_event_loop(self):
  201. """This gets things kicked off. Most of the real
  202. action will occur in _process_event().
  203. """
  204. assert self.state == QueueReader.INITIAL_STATE,\
  205. "_observe_event_loop called when in state %s" % self.state
  206. self._start_task(self.client.connect(self.uri), QueueReader.CONNECTING_STATE)
  207. def _stop_loop(self):
  208. """Stop listening for new messages, processing any pending messages, and
  209. move to the final state.
  210. """
  211. self.stop_requested = True