| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- """
- Example of Predix Time Series APIs
- This sends a sequence of data points via PredixWriter and then
- queries then back via PredixReader.
- For hints on configuring Predix, see
- https://github.com/jfischer/ge-predix-python-timeseries-example
- """
- import sys
- import argparse
- import logging
- import time
- import asyncio
- import random
- random.seed()
- from thingflow.base import Scheduler, SensorAsOutputThing
- from thingflow.adapters.predix import PredixWriter, PredixReader, EventExtractor
- logger = logging.getLogger()
- logger.setLevel(logging.DEBUG)
- logger.addHandler(logging.StreamHandler())
- logging.basicConfig(level=logging.DEBUG)
- TEST_SENSOR1 = 'test-sensor-1'
- class TestSensor:
- """Generate a random value for the specified number of samples.
- """
- def __init__(self, sensor_id, num_events):
- self.sensor_id = sensor_id
- self.events_remaining = num_events
- def sample(self):
- if self.events_remaining>0:
- self.events_remaining -= 1
- return random.gauss(100, 5)
- else:
- raise StopIteration
- @staticmethod
- def output_thing(sensor_id, num_events):
- return SensorAsOutputThing(TestSensor(sensor_id, num_events))
- def run(args, token):
- sensor1 = TestSensor.output_thing(TEST_SENSOR1, 5)
- writer = PredixWriter(args.ingest_url, args.predix_zone_id, token,
- extractor=EventExtractor(attributes={'test':True}),
- batch_size=3)
- sensor1.connect(writer)
- sensor1.connect(print) # also print the event
- scheduler = Scheduler(asyncio.get_event_loop())
- scheduler.schedule_periodic(sensor1, 0.5)
-
- start_time = time.time()
- scheduler.run_forever()
-
- print("Reading back events")
- reader1 = PredixReader(args.query_url, args.predix_zone_id, token,
- TEST_SENSOR1,
- start_time=start_time,
- one_shot=True)
- reader1.connect(print)
- scheduler.schedule_recurring(reader1)
- scheduler.run_forever()
- INGEST_URL = 'wss://gateway-predix-data-services.run.aws-usw02-pr.ice.predix.io/v1/stream/messages'
- QUERY_URL='https://time-series-store-predix.run.aws-usw02-pr.ice.predix.io/v1/datapoints'
-
- DESCRIPTION = \
- """Example of Predix Time Series adapters for ThingFlow.
- Sends a sequence of data points via PredixWriter and then
- queries them back via PredixReader."""
- def main(argv=sys.argv[1:]):
- parser = argparse.ArgumentParser(description=DESCRIPTION)
- parser.add_argument("--ingest-url", default=INGEST_URL,
- help="Websockets URL for ingest. Default is for Western US datacenter")
- parser.add_argument("--query-url", default=QUERY_URL,
- help="HTTPS URL for query. Default is for Western US datacenter")
- parser.add_argument("--sensor-id", default="sensor-1",
- help="Sensor id (tag name) to use. Defaults to 'sensor-1'")
- parser.add_argument("predix_zone_id", metavar="PREDIX_ZONE_ID",
- help="Zone Id for authentication")
- parser.add_argument("token_file", metavar="TOKEN_FILE",
- help="Filename of a file containing the bearer token for authentication")
- parsed_args = parser.parse_args(args=argv)
- try:
- with open(parsed_args.token_file, 'r') as tf:
- token = tf.read().rstrip()
- except:
- parser.error("Problem opening/reading token file %s" % parsed_args.token_file)
- run(parsed_args, token)
- print("Test successful.")
- return 0
- if __name__=="__main__":
- sys.exit(main())
|