# influxdb.py -- ThingFlow adapters for reading from / writing to an InfluxDB database.
  1. import datetime
  2. from influxdb import InfluxDBClient, SeriesHelper
  3. from thingflow.base import OutputThing, InputThing, FatalError
  4. # class BulkUploader(SeriesHelper):
  5. # def __new__(self, client, msg_format, bulk_size=10):
  6. # self.client = client
  7. # self.msg_format = msg_format
  8. # self.bulk_size = bulk_size
  9. #
  10. # # Meta class stores time series helper configuration.
  11. # class Meta:
  12. # # The client should be an instance of InfluxDBClient.
  13. # client = client
  14. # # The series name must be a string. Add dependent fields/tags in curly brackets.
  15. # series_name = msg_format.series_name
  16. # # Defines all the fields in this time series.
  17. # fields = msg_format.fields
  18. # # Defines all the tags for the series.
  19. # tags = msg_format.tags
  20. # # Defines the number of data points to store prior to writing on the wire.
  21. # bulk_size = bulk_size
  22. # # autocommit must be set to True when using bulk_size
  23. # autocommit = True
  24. class InfluxDBWriter(InputThing):
  25. """Subscribes to events and writes out to an InfluxDB database"""
  26. def __init__(self, msg_format, generate_timestamp=True, host="127.0.0.1", port=8086, database="thingflow",
  27. username="root", password="root",
  28. ssl=False, verify_ssl=False, timeout=None,
  29. use_udp=False, udp_port=4444, proxies=None,
  30. bulk_size=10):
  31. self.dbname = database
  32. self.msg_format = msg_format # a tuple consisting of {series_name, fields, tags}
  33. if not self._validate_msg_format(msg_format):
  34. raise Exception("Message format should contain series_name (string), fields (string list), and tags (string list)")
  35. self.generate_timestamp = generate_timestamp
  36. self.epoch = datetime.datetime.utcfromtimestamp(0)
  37. self.client = InfluxDBClient(host=host, port=port,
  38. username=username, password=password,
  39. database=database,
  40. ssl=ssl, verify_ssl=verify_ssl,
  41. timeout=timeout,
  42. use_udp=use_udp, udp_port=udp_port,
  43. proxies=proxies)
  44. self.bulk_size = bulk_size
  45. print("Message format")
  46. print(msg_format.series_name)
  47. print(msg_format.fields)
  48. print(msg_format.tags)
  49. # self.client.create_database(database, if_not_exists=True)
  50. class BulkUploader(SeriesHelper):
  51. # Meta class stores time series helper configuration.
  52. class Meta:
  53. # The client should be an instance of InfluxDBClient.
  54. client = self.client
  55. # The series name must be a string. Add dependent fields/tags in curly brackets.
  56. series_name = msg_format.series_name
  57. # Defines all the fields in this time series.
  58. fields = msg_format.fields
  59. # Defines all the tags for the series.
  60. tags = msg_format.tags
  61. # Defines the number of data points to store prior to writing on the wire.
  62. bulk_size = self.bulk_size
  63. # autocommit must be set to True when using bulk_size
  64. autocommit = True
  65. print('In Meta')
  66. print(series_name, fields, tags)
  67. # self.bulk_uploader = BulkUploader()
  68. def _validate_msg_format(self, msg_format):
  69. return (hasattr(msg_format, 'series_name') and \
  70. hasattr(msg_format, 'tags') and \
  71. hasattr(msg_format, 'fields') )
  72. def on_next(self, msg):
  73. # write the message on to the database (use the bulk uploader to group writes)
  74. # assume msg is a dictionary-like object with all fields from msg_format
  75. flds = { }
  76. for f in self.msg_format.fields:
  77. flds[f] = getattr(msg, f)
  78. tags = { }
  79. for t in self.msg_format.tags:
  80. tags[t] = getattr(msg, t)
  81. if not (self.generate_timestamp) and hasattr(msg, 'ts'):
  82. time = int(getattr(msg, 'ts') * 1e9)
  83. json_msg = [ { 'measurement' : self.msg_format.series_name,
  84. 'time' : time,
  85. 'fields' : flds,
  86. 'tags' : tags
  87. } ]
  88. else:
  89. json_msg = [ { 'measurement' : self.msg_format.series_name,
  90. 'fields' : flds,
  91. 'tags' : tags
  92. } ]
  93. print(json_msg)
  94. self.client.write_points(json_msg)
  95. # self.BulkUploader(msg)
  96. def on_error(self, e):
  97. # influx does not have a disconnect. This is because the connection is
  98. # through a REST API
  99. # self.BulkUploader.commit()
  100. pass
  101. def on_completed(self):
  102. # self.BulkUploader.commit()
  103. pass
  104. def __str__(self):
  105. return 'InfluxDB Client(msg=%s)' % self.msg_format.__str__()
  106. class InfluxDBReader(OutputThing):
  107. def __init__(self, query, host="127.0.0.1", port=8086, database="thingflow",
  108. username="root", password="root",
  109. ssl=False, verify_ssl=False, timeout=None,
  110. use_udp=False, udp_port=4444, proxies=None,
  111. bulk_size=10):
  112. super().__init__()
  113. self.dbname = database
  114. self.client = InfluxDBClient(host=host, port=port,
  115. username=username, password=password,
  116. database=database,
  117. ssl=ssl, verify_ssl=verify_ssl,
  118. timeout=timeout,
  119. use_udp=use_udp, udp_port=udp_port,
  120. proxies=proxies)
  121. self.query = query
  122. self.points = self.client.query(query).get_points()
  123. def __str__(self):
  124. return 'InfluxDB[%s]: %s' % (self.dbname, self.query)
  125. def _observe(self):
  126. try:
  127. event = self.points.__next__()
  128. self._dispatch_next(event)
  129. except StopIteration:
  130. self._dispatch_completed()
  131. except FatalError:
  132. raise
  133. except Exception as e:
  134. self._close()
  135. self._dispatch_error(e)
  136. def _close(self):
  137. pass