predix.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. """
  2. Adapters to Predix TimeSeries API
  3. Library dependencies: websocket-client requests
  4. """
  5. import logging
  6. import json
  7. import time
  8. import os
  9. from websocket import create_connection
  10. import requests
  11. from thingflow.base import SensorEvent, InputThing, OutputThing, FatalError
  12. logger = logging.getLogger(__name__)
  13. def ts_to_predix_ts(ts):
  14. return int(round(1000*ts))
  15. # we use this to generate unique message ids
  16. _PIDSTR = str(os.getpid())
  17. class EventExtractor:
  18. """Methods to access data from an event for use in a
  19. predix ingestion message. This implementation is for the default SensorEvent
  20. namedtuple. Methods can be overridden or a new class created to handle
  21. other event data types or change functionality.
  22. """
  23. def __init__(self, quality=3, attributes=None):
  24. """Since we do not have a quality value in our default event type,
  25. we set the same quality for all datapoints. Currently the default is 3 (good),
  26. although they seem to recommend 1 (uncertain).
  27. """
  28. self.quality = quality
  29. self.attributes = attributes
  30. def get_message_id(self):
  31. """Not associated with the event, but with the message that will be sent.
  32. """
  33. return _PIDSTR + str(int(round(1000*time.time())))
  34. def get_attributes(self, sensor_id):
  35. """Attributes are a property of the sensor, not the individual event.
  36. Return either None or a dict of key/value pairs.
  37. """
  38. return self.attributes
  39. def get_sensor_id(self, event):
  40. return event.sensor_id
  41. def get_predix_timestamp(self, event):
  42. """Get the timestamp in miliseconds since the epoch
  43. """
  44. return ts_to_predix_ts(event.ts)
  45. def get_value(self, event):
  46. return event.val
  47. def get_quality(self, event):
  48. """See the predix documentation. 3 means good.
  49. """
  50. return self.quality
  51. def _create_ingest_body(events, extractor):
  52. """Create the POST body for an ingest message. Accepts a list of events.
  53. See https://docs.predix.io/en-US/content/service/data_management/time_series/using-the-time-series-service#concept_dc613f2c-bb63-4287-9c95-8aaf2c1ca6f7 for details
  54. """
  55. mid = extractor.get_message_id()
  56. datapoints_by_sensor = {}
  57. for event in events:
  58. sensor_id = extractor.get_sensor_id(event)
  59. if sensor_id not in datapoints_by_sensor:
  60. datapoints_by_sensor[sensor_id] = []
  61. datapoints_by_sensor[sensor_id].append(
  62. [extractor.get_predix_timestamp(event),
  63. extractor.get_value(event),
  64. extractor.get_quality(event)])
  65. body = []
  66. for sensor_id in sorted(datapoints_by_sensor.keys()):
  67. body_data = {'name':str(sensor_id), 'datapoints':datapoints_by_sensor[sensor_id]}
  68. attributes = extractor.get_attributes(sensor_id)
  69. if attributes is not None:
  70. body_data['attributes'] = attributes
  71. body.append(body_data)
  72. return {'messageId':mid,
  73. 'body': body}
  74. class PredixError(Exception):
  75. pass
  76. class PredixWriter(InputThing):
  77. """Adapter that sends mesages to the Predix TimeSeries service via the
  78. Ingest websocket API.
  79. The batch_size is used to determine how many events go into a message.
  80. The extractor parameter specifies a class to map from internal events to
  81. Predix events. The default value maps from thingflow.base.SensorEvent.
  82. """
  83. def __init__(self, ingest_url, predix_zone_id, token, batch_size=1,
  84. extractor=EventExtractor()):
  85. self.ingest_url = ingest_url
  86. self.batch_size = batch_size
  87. self.extractor = extractor
  88. self.headers = {'Predix-Zone-Id': predix_zone_id,
  89. 'Authorization': 'Bearer ' + token,
  90. 'Content-Type': 'application/json'}
  91. self.ws = None
  92. self.pending_events = []
  93. def _send(self):
  94. if self.ws is None:
  95. logging.info("Connecting to Predix...")
  96. self.ws = create_connection(self.ingest_url, header=self.headers)
  97. logging.info("Connected")
  98. body = json.dumps(_create_ingest_body(self.pending_events, extractor=self.extractor))
  99. #print(body)
  100. self.ws.send(body)
  101. result = self.ws.recv()
  102. rdata = json.loads(result)
  103. if rdata['statusCode']!=202:
  104. raise PredixError("Unexpected websocket response: %s" % result)
  105. self.pending_events = []
  106. def on_next(self, msg):
  107. self.pending_events.append(msg)
  108. if len(self.pending_events)==self.batch_size:
  109. self._send()
  110. def on_completed(self):
  111. if len(self.pending_events)>0:
  112. self._send()
  113. if self.ws:
  114. self.ws.close()
  115. def on_error(self, e):
  116. if len(self.pending_events)>0:
  117. try:
  118. self._send()
  119. except Exception as e:
  120. logger.exception("Error in attemping to flush pending messages to predix", e)
  121. if self.ws:
  122. self.ws.close()
  123. def _create_query_body(sensor_id, start_time_ms, end_time_ms):
  124. return {
  125. "cache_time": 0,
  126. "tags": [
  127. {
  128. "name": sensor_id,
  129. "order": "asc"
  130. }
  131. ],
  132. "start": start_time_ms,
  133. "end": end_time_ms
  134. }
  135. def build_sensor_event(sensor_id, predix_timestamp, value, quality):
  136. """Use the data from a predix datapoint to construct a sensor event
  137. """
  138. return SensorEvent(sensor_id, predix_timestamp/1000, value)
  139. def _parse_query_response(resp, build_event_fn=build_sensor_event):
  140. try:
  141. l = resp['tags'][0]
  142. results = l['results']
  143. sensor_id=l['name']
  144. values = [build_sensor_event(sensor_id, v[0], v[1], v[2]) for v in
  145. results[0]['values']]
  146. count = l['stats']['rawCount']
  147. if count>0:
  148. last_timestamp_ms = results[0]['values'][-1][0]
  149. else:
  150. last_timestamp_ms = None
  151. return count, values, last_timestamp_ms
  152. except Exception as e:
  153. logger.exception("Parse error for query response %s" % resp, e)
  154. raise Exception("Parse error for query response %s" % resp)
  155. class PredixReader(OutputThing):
  156. """Query the Predix time series service for events and inject them
  157. into thingflow.
  158. The query API is a little problematic for ongoing event
  159. queries, as it is stateless and does not have a concept of event ids.
  160. Thus, we have to query for events within specific ranges. We can work
  161. around this by querying for one millisecond more than the last event.
  162. """
  163. def __init__(self, query_url, predix_zone_id, token, sensor_id, start_time=None,
  164. one_shot=False, build_event_fn=build_sensor_event):
  165. """start_time is the starting time for the first query. If not specified,
  166. the time that the reader object was constructed is used. The end time
  167. for queries is always the current time.
  168. If one_shot is True, we just do a single query and close the stream.
  169. build_event_fn is a function mapping from fields of a predix timeseries
  170. event to internal events to be used within ThingFlow. By default, it
  171. maps to thingflow.base.SensorEvent tuples.
  172. """
  173. super().__init__()
  174. self.query_url = query_url
  175. self.headers = {'Predix-Zone-Id': predix_zone_id,
  176. 'Authorization': 'Bearer ' + token,
  177. 'Content-Type': 'application/json'}
  178. self.sensor_id = sensor_id
  179. if start_time:
  180. self.start_time_ms = ts_to_predix_ts(start_time)
  181. else:
  182. self.start_time_ms = ts_to_predix_ts(time.time())
  183. self.one_shot = one_shot
  184. self.build_event_fn = build_sensor_event
  185. def _query(self, start_time_ms, end_time_ms):
  186. body = _create_query_body(self.sensor_id, start_time_ms, end_time_ms)
  187. #print(body)
  188. r = requests.post(self.query_url, data=bytes(json.dumps(body), encoding='utf-8'),
  189. headers=self.headers)
  190. resp = r.json()
  191. logger.debug("response: %s", resp)
  192. (count, events, last_timestamp_ms) = \
  193. _parse_query_response(resp, build_event_fn=self.build_event_fn)
  194. assert count==len(events)
  195. r.close()
  196. return events, last_timestamp_ms
  197. def _observe(self):
  198. query_time_ms = ts_to_predix_ts(time.time())
  199. try:
  200. (events, last_timestamp_ms) = self._query(self.start_time_ms, query_time_ms)
  201. for event in events:
  202. self._dispatch_next(event)
  203. except FatalError:
  204. raise
  205. except Exception as e:
  206. logger.exception("Got an error during query", e)
  207. self._dispatch_error(e)
  208. return
  209. if self.one_shot:
  210. logger.info("Done with data from %s" % self.sensor_id)
  211. self._dispatch_completed()
  212. return
  213. # if not one shot, we use the last timestamp time + 1 tick
  214. # as the next start time.
  215. # If no events were found, we leave the same start time
  216. if len(events)>0:
  217. self.start_time_ms = last_timestamp_ms + 1