simple.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. import usocket as socket
  2. import ustruct as struct
  3. from ubinascii import hexlify
  4. class MQTTException(Exception):
  5. pass
  6. class MQTTClient:
  7. def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,
  8. ssl=False, ssl_params={}):
  9. if port == 0:
  10. port = 8883 if ssl else 1883
  11. self.client_id = client_id
  12. self.sock = None
  13. self.server = server
  14. self.port = port
  15. self.ssl = ssl
  16. self.ssl_params = ssl_params
  17. self.pid = 0
  18. self.cb = None
  19. self.user = user
  20. self.pswd = password
  21. self.keepalive = keepalive
  22. self.lw_topic = None
  23. self.lw_msg = None
  24. self.lw_qos = 0
  25. self.lw_retain = False
  26. def _send_str(self, s):
  27. self.sock.write(struct.pack("!H", len(s)))
  28. self.sock.write(s)
  29. def _recv_len(self):
  30. n = 0
  31. sh = 0
  32. while 1:
  33. b = self.sock.read(1)[0]
  34. n |= (b & 0x7f) << sh
  35. if not b & 0x80:
  36. return n
  37. sh += 7
  38. def set_callback(self, f):
  39. self.cb = f
  40. def set_last_will(self, topic, msg, retain=False, qos=0):
  41. assert 0 <= qos <= 2
  42. assert topic
  43. self.lw_topic = topic
  44. self.lw_msg = msg
  45. self.lw_qos = qos
  46. self.lw_retain = retain
  47. def connect(self, clean_session=True):
  48. self.sock = socket.socket()
  49. addr = socket.getaddrinfo(self.server, self.port)[0][-1]
  50. self.sock.connect(addr)
  51. if self.ssl:
  52. import ussl
  53. self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
  54. premsg = bytearray(b"\x10\0\0\0\0\0")
  55. msg = bytearray(b"\x04MQTT\x04\x02\0\0")
  56. sz = 10 + 2 + len(self.client_id)
  57. msg[6] = clean_session << 1
  58. if self.user is not None:
  59. sz += 2 + len(self.user) + 2 + len(self.pswd)
  60. msg[6] |= 0xC0
  61. if self.keepalive:
  62. assert self.keepalive < 65536
  63. msg[7] |= self.keepalive >> 8
  64. msg[8] |= self.keepalive & 0x00FF
  65. if self.lw_topic:
  66. sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
  67. msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
  68. msg[6] |= self.lw_retain << 5
  69. i = 1
  70. while sz > 0x7f:
  71. premsg[i] = (sz & 0x7f) | 0x80
  72. sz >>= 7
  73. i += 1
  74. premsg[i] = sz
  75. self.sock.write(premsg, i + 2)
  76. self.sock.write(msg)
  77. #print(hex(len(msg)), hexlify(msg, ":"))
  78. self._send_str(self.client_id)
  79. if self.lw_topic:
  80. self._send_str(self.lw_topic)
  81. self._send_str(self.lw_msg)
  82. if self.user is not None:
  83. self._send_str(self.user)
  84. self._send_str(self.pswd)
  85. resp = self.sock.read(4)
  86. assert resp[0] == 0x20 and resp[1] == 0x02
  87. if resp[3] != 0:
  88. raise MQTTException(resp[3])
  89. return resp[2] & 1
  90. def disconnect(self):
  91. self.sock.write(b"\xe0\0")
  92. self.sock.close()
  93. def ping(self):
  94. self.sock.write(b"\xc0\0")
  95. def publish(self, topic, msg, retain=False, qos=0):
  96. pkt = bytearray(b"\x30\0\0\0")
  97. pkt[0] |= qos << 1 | retain
  98. sz = 2 + len(topic) + len(msg)
  99. if qos > 0:
  100. sz += 2
  101. assert sz < 2097152
  102. i = 1
  103. while sz > 0x7f:
  104. pkt[i] = (sz & 0x7f) | 0x80
  105. sz >>= 7
  106. i += 1
  107. pkt[i] = sz
  108. #print(hex(len(pkt)), hexlify(pkt, ":"))
  109. self.sock.write(pkt, i + 1)
  110. self._send_str(topic)
  111. if qos > 0:
  112. self.pid += 1
  113. pid = self.pid
  114. struct.pack_into("!H", pkt, 0, pid)
  115. self.sock.write(pkt, 2)
  116. self.sock.write(msg)
  117. if qos == 1:
  118. while 1:
  119. op = self.wait_msg()
  120. if op == 0x40:
  121. sz = self.sock.read(1)
  122. assert sz == b"\x02"
  123. rcv_pid = self.sock.read(2)
  124. rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
  125. if pid == rcv_pid:
  126. return
  127. elif qos == 2:
  128. assert 0
  129. def subscribe(self, topic, qos=0):
  130. assert self.cb is not None, "Subscribe callback is not set"
  131. pkt = bytearray(b"\x82\0\0\0")
  132. self.pid += 1
  133. struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
  134. #print(hex(len(pkt)), hexlify(pkt, ":"))
  135. self.sock.write(pkt)
  136. self._send_str(topic)
  137. self.sock.write(qos.to_bytes(1, "little"))
  138. while 1:
  139. op = self.wait_msg()
  140. if op == 0x90:
  141. resp = self.sock.read(4)
  142. #print(resp)
  143. assert resp[1] == pkt[2] and resp[2] == pkt[3]
  144. if resp[3] == 0x80:
  145. raise MQTTException(resp[3])
  146. return
  147. # Wait for a single incoming MQTT message and process it.
  148. # Subscribed messages are delivered to a callback previously
  149. # set by .set_callback() method. Other (internal) MQTT
  150. # messages processed internally.
  151. def wait_msg(self):
  152. res = self.sock.read(1)
  153. self.sock.setblocking(True)
  154. if res is None:
  155. return None
  156. if res == b"":
  157. raise OSError(-1)
  158. if res == b"\xd0": # PINGRESP
  159. sz = self.sock.read(1)[0]
  160. assert sz == 0
  161. return None
  162. op = res[0]
  163. if op & 0xf0 != 0x30:
  164. return op
  165. sz = self._recv_len()
  166. topic_len = self.sock.read(2)
  167. topic_len = (topic_len[0] << 8) | topic_len[1]
  168. topic = self.sock.read(topic_len)
  169. sz -= topic_len + 2
  170. if op & 6:
  171. pid = self.sock.read(2)
  172. pid = pid[0] << 8 | pid[1]
  173. sz -= 2
  174. msg = self.sock.read(sz)
  175. self.cb(topic, msg)
  176. if op & 6 == 2:
  177. pkt = bytearray(b"\x40\x02\0\0")
  178. struct.pack_into("!H", pkt, 2, pid)
  179. self.sock.write(pkt)
  180. elif op & 6 == 4:
  181. assert 0
  182. # Checks whether a pending message from server is available.
  183. # If not, returns immediately with None. Otherwise, does
  184. # the same processing as wait_msg.
  185. def check_msg(self):
  186. self.sock.setblocking(False)
  187. return self.wait_msg()