mqtt_writer.py 802 B

12345678910111213141516171819202122232425262728293031
  1. # Writer interface over umqtt API.
  2. from umqtt.robust import MQTTClient
  3. import json
  4. class MQTTWriter:
  5. __slots__ = ('host', 'port', 'topic', 'client')
  6. def __init__(self, name, host, port, topic):
  7. self.topic = topic
  8. self.host = host
  9. self.port = port
  10. self.client = MQTTClient(name, host, port)
  11. self._connect()
  12. def _connect(self):
  13. print("Connecting to %s:%s" % (self.host, self.port))
  14. self.client.connect()
  15. print("Connection successful")
  16. def on_next(self, x):
  17. data = bytes(json.dumps(x), 'utf-8')
  18. self.client.publish(bytes(self.topic, 'utf-8'), data)
  19. def on_completed(self):
  20. print("mqtt_completed, disconnecting")
  21. self.client.disconnect()
  22. def on_error(self, e):
  23. print("mqtt on_error: %s, disconnecting" %e)
  24. self.client.disconnect()