| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- """
Adapters for the Predix TimeSeries API: websocket ingestion (PredixWriter)
and HTTP queries (PredixReader).
Third-party dependencies: websocket-client, requests.
- """
- import logging
- import json
- import time
- import os
- from websocket import create_connection
- import requests
- from thingflow.base import SensorEvent, InputThing, OutputThing, FatalError
# Logger for this module, named after the module itself.
logger = logging.getLogger(name=__name__)
- def ts_to_predix_ts(ts):
- return int(round(1000*ts))
- # we use this to generate unique message ids
- _PIDSTR = str(os.getpid())
class EventExtractor:
    """Methods to access data from an event for use in a Predix ingestion
    message.

    This implementation is for the default SensorEvent namedtuple (it reads
    the ``sensor_id``, ``ts`` and ``val`` fields). Methods can be overridden,
    or a new class with the same methods written, to handle other event data
    types or to change how messages are built.
    """

    # Millisecond value used by the most recent get_message_id() call. It is
    # shared by all instances (and subclasses) in the process, so message ids
    # stay unique even when several messages are created within the same
    # millisecond. It is not protected by a lock; this assumes ids are
    # generated from a single thread.
    _last_message_ms = 0

    def __init__(self, quality=3, attributes=None):
        """Since the default event type has no quality value, the same
        quality is used for every datapoint. The default is 3 (good),
        although Predix seems to recommend 1 (uncertain).

        attributes is either None or a dict of key/value pairs. It is
        returned by get_attributes() for every sensor.
        """
        self.quality = quality
        self.attributes = attributes

    def get_message_id(self):
        """Return a new id for the next outgoing message.

        The id is not associated with any event, only with the message
        being sent. It is the process id followed by the current time in
        milliseconds. If the clock has not advanced past the previously
        issued value (or went backwards), the millisecond part is bumped to
        one past that value. This keeps ids from repeating within the
        process.
        """
        now_ms = int(round(1000*time.time()))
        last_ms = EventExtractor._last_message_ms
        if now_ms <= last_ms:
            now_ms = last_ms + 1
        EventExtractor._last_message_ms = now_ms
        return _PIDSTR + str(now_ms)

    def get_attributes(self, sensor_id):
        """Return the attributes for sensor_id: None or a dict of key/value
        pairs.

        Attributes are a property of the sensor, not of individual events.
        This implementation returns the same value for every sensor.
        """
        return self.attributes

    def get_sensor_id(self, event):
        """Return the sensor id of the event."""
        return event.sensor_id

    def get_predix_timestamp(self, event):
        """Return the event timestamp in milliseconds since the epoch."""
        return ts_to_predix_ts(event.ts)

    def get_value(self, event):
        """Return the datapoint value of the event."""
        return event.val

    def get_quality(self, event):
        """Return the datapoint quality (see the Predix documentation; 3
        means good). This implementation ignores the event and returns the
        quality given to the constructor.
        """
        return self.quality
-
- def _create_ingest_body(events, extractor):
- """Create the POST body for an ingest message. Accepts a list of events.
- See https://docs.predix.io/en-US/content/service/data_management/time_series/using-the-time-series-service#concept_dc613f2c-bb63-4287-9c95-8aaf2c1ca6f7 for details
- """
- mid = extractor.get_message_id()
- datapoints_by_sensor = {}
- for event in events:
- sensor_id = extractor.get_sensor_id(event)
- if sensor_id not in datapoints_by_sensor:
- datapoints_by_sensor[sensor_id] = []
- datapoints_by_sensor[sensor_id].append(
- [extractor.get_predix_timestamp(event),
- extractor.get_value(event),
- extractor.get_quality(event)])
- body = []
- for sensor_id in sorted(datapoints_by_sensor.keys()):
- body_data = {'name':str(sensor_id), 'datapoints':datapoints_by_sensor[sensor_id]}
- attributes = extractor.get_attributes(sensor_id)
- if attributes is not None:
- body_data['attributes'] = attributes
- body.append(body_data)
- return {'messageId':mid,
- 'body': body}
class PredixError(Exception):
    """Error raised for unexpected responses from the Predix service."""
class PredixWriter(InputThing):
    """Adapter that sends messages to the Predix TimeSeries service via the
    Ingest websocket API.

    batch_size is the number of events collected before a message is sent.
    Any remaining events are flushed when the stream completes or errors.
    The extractor parameter maps internal events to Predix datapoints. The
    default (a new EventExtractor per writer) maps from
    thingflow.base.SensorEvent.
    """

    def __init__(self, ingest_url, predix_zone_id, token, batch_size=1,
                 extractor=None):
        self.ingest_url = ingest_url
        self.batch_size = batch_size
        # Create the default here rather than in the signature, so writers
        # do not share one instance created at definition time.
        self.extractor = (extractor if extractor is not None
                          else EventExtractor())
        self.headers = {'Predix-Zone-Id': predix_zone_id,
                        'Authorization': 'Bearer ' + token,
                        'Content-Type': 'application/json'}
        self.ws = None  # websocket connection, opened lazily by _send()
        self.pending_events = []

    def _close_connection(self):
        """Close the websocket, if one is open, and forget it."""
        ws, self.ws = self.ws, None
        if ws is not None:
            ws.close()

    def _send(self):
        """Send all pending events as one ingest message and check the reply.

        Connects on first use. Raises PredixError unless the service answers
        with statusCode 202. Pending events are only cleared after such a
        reply, so a failed batch is retried on the next send.
        """
        if self.ws is None:
            logger.info("Connecting to Predix...")
            self.ws = create_connection(self.ingest_url, header=self.headers)
            logger.info("Connected")
        body = json.dumps(_create_ingest_body(self.pending_events,
                                              extractor=self.extractor))
        try:
            self.ws.send(body)
            result = self.ws.recv()
        except Exception:
            # The connection is in an unknown state: drop it so the next
            # send reconnects instead of reusing a broken socket.
            self._close_connection()
            raise
        rdata = json.loads(result)
        status = rdata.get('statusCode') if isinstance(rdata, dict) else None
        if status != 202:
            raise PredixError("Unexpected websocket response: %s" % result)
        self.pending_events = []

    def on_next(self, msg):
        """Queue an event and send a message once a full batch is pending."""
        self.pending_events.append(msg)
        # '>=' rather than '==': if an earlier send failed and left extra
        # events queued, the batch is still sent instead of growing forever.
        if len(self.pending_events) >= self.batch_size:
            self._send()

    def on_completed(self):
        """Flush any pending events, then close the connection."""
        try:
            if self.pending_events:
                self._send()
        finally:
            # Close even if the final flush fails.
            self._close_connection()

    def on_error(self, e):
        """Best-effort flush of pending events, then close the connection.

        A failure while flushing is logged, not raised.
        """
        if self.pending_events:
            try:
                self._send()
            except Exception:
                logger.exception("Error in attemping to flush pending messages to predix")
        self._close_connection()
- def _create_query_body(sensor_id, start_time_ms, end_time_ms):
- return {
- "cache_time": 0,
- "tags": [
- {
- "name": sensor_id,
- "order": "asc"
- }
- ],
- "start": start_time_ms,
- "end": end_time_ms
- }
- def build_sensor_event(sensor_id, predix_timestamp, value, quality):
- """Use the data from a predix datapoint to construct a sensor event
- """
- return SensorEvent(sensor_id, predix_timestamp/1000, value)
- def _parse_query_response(resp, build_event_fn=build_sensor_event):
- try:
- l = resp['tags'][0]
- results = l['results']
- sensor_id=l['name']
- values = [build_sensor_event(sensor_id, v[0], v[1], v[2]) for v in
- results[0]['values']]
- count = l['stats']['rawCount']
- if count>0:
- last_timestamp_ms = results[0]['values'][-1][0]
- else:
- last_timestamp_ms = None
- return count, values, last_timestamp_ms
- except Exception as e:
- logger.exception("Parse error for query response %s" % resp, e)
- raise Exception("Parse error for query response %s" % resp)
-
class PredixReader(OutputThing):
    """Query the Predix time series service for events and inject them
    into thingflow.

    The query API is awkward for ongoing event queries: it is stateless and
    has no concept of event ids. We therefore query for events within time
    ranges. After each query, the next one starts one millisecond after the
    last event seen.
    """

    def __init__(self, query_url, predix_zone_id, token, sensor_id,
                 start_time=None, one_shot=False,
                 build_event_fn=build_sensor_event):
        """start_time (seconds since the epoch) is the starting time for the
        first query. If it is None, the time the reader object was
        constructed is used. The end time for queries is always the current
        time.

        If one_shot is True, a single query is made and the stream is then
        completed.

        build_event_fn maps the fields of a Predix datapoint
        (sensor_id, timestamp_ms, value, quality) to an internal event to be
        used within ThingFlow. By default, it builds
        thingflow.base.SensorEvent tuples.
        """
        super().__init__()
        self.query_url = query_url
        self.headers = {'Predix-Zone-Id': predix_zone_id,
                        'Authorization': 'Bearer ' + token,
                        'Content-Type': 'application/json'}
        self.sensor_id = sensor_id
        # Compare with None so that an explicit start_time of 0 (the epoch)
        # is honoured instead of being treated as "not given".
        if start_time is None:
            start_time = time.time()
        self.start_time_ms = ts_to_predix_ts(start_time)
        self.one_shot = one_shot
        self.build_event_fn = build_event_fn

    def _query(self, start_time_ms, end_time_ms):
        """Run one query between start_time_ms and end_time_ms and parse it.

        Returns (events, last_timestamp_ms). Raises on HTTP error statuses,
        on unparseable responses, and when the number of events does not
        match the rawCount reported by the service.
        """
        body = _create_query_body(self.sensor_id, start_time_ms, end_time_ms)
        r = requests.post(self.query_url,
                          data=bytes(json.dumps(body), encoding='utf-8'),
                          headers=self.headers)
        try:
            r.raise_for_status()
            resp = r.json()
        finally:
            # Release the connection even if the status check or decode fails.
            r.close()
        logger.debug("response: %s", resp)
        count, events, last_timestamp_ms = _parse_query_response(
            resp, build_event_fn=self.build_event_fn)
        # Explicit check rather than assert, which is stripped under -O.
        if count != len(events):
            raise PredixError(
                "Query for %s reported %s datapoints but returned %d"
                % (self.sensor_id, count, len(events)))
        return events, last_timestamp_ms

    def _observe(self):
        """Query for new events up to now and dispatch them downstream.

        Errors other than FatalError are logged and dispatched as a stream
        error. In one_shot mode the stream is completed after the query.
        """
        query_time_ms = ts_to_predix_ts(time.time())
        try:
            events, last_timestamp_ms = self._query(self.start_time_ms,
                                                    query_time_ms)
            for event in events:
                self._dispatch_next(event)
        except FatalError:
            raise
        except Exception as e:
            logger.exception("Got an error during query")
            self._dispatch_error(e)
            return
        if self.one_shot:
            logger.info("Done with data from %s", self.sensor_id)
            self._dispatch_completed()
            return
        # Not one-shot: the next query starts one millisecond after the last
        # event. If no events were found, keep the same start time.
        if events:
            self.start_time_ms = last_timestamp_ms + 1
-
-
|