predix_example.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. """
  2. Example of Predix Time Series APIs
  3. This sends a sequence of data points via PredixWriter and then
  4. queries them back via PredixReader.
  5. For hints on configuring Predix, see
  6. https://github.com/jfischer/ge-predix-python-timeseries-example
  7. """
  8. import sys
  9. import argparse
  10. import logging
  11. import time
  12. import asyncio
  13. import random
  14. random.seed()
  15. from thingflow.base import Scheduler, SensorAsOutputThing
  16. from thingflow.adapters.predix import PredixWriter, PredixReader, EventExtractor
  # Route everything at DEBUG and above from the root logger to a plain
  # StreamHandler so that log output is visible while the example runs.
  logger = logging.getLogger()
  logger.setLevel(logging.DEBUG)
  logger.addHandler(logging.StreamHandler())
  # NOTE: there is deliberately no logging.basicConfig() call here. The root
  # logger already has a handler at this point, and basicConfig() does nothing
  # when the root logger has handlers, so such a call would be a silent no-op.

  # Sensor id (Predix tag name) for the example's test sensor.
  TEST_SENSOR1 = 'test-sensor-1'
  class TestSensor:
      """Sensor stub that hands out a bounded number of random readings.

      Each reading is drawn with ``random.gauss(100, 5)``. Once ``num_events``
      readings have been returned, ``sample()`` raises ``StopIteration``.
      """

      def __init__(self, sensor_id, num_events):
          """Remember the sensor id and how many readings may still be taken."""
          self.sensor_id = sensor_id
          self.events_remaining = num_events

      def sample(self):
          """Return the next random reading.

          Raises:
              StopIteration: when no readings remain.
          """
          # Guard clause: nothing left to hand out.
          if not self.events_remaining > 0:
              raise StopIteration
          self.events_remaining -= 1
          return random.gauss(100, 5)

      @staticmethod
      def output_thing(sensor_id, num_events):
          """Create a TestSensor and wrap it in a SensorAsOutputThing."""
          sensor = TestSensor(sensor_id, num_events)
          return SensorAsOutputThing(sensor)
  def run(args, token):
      """Write a short burst of sensor events to Predix, then query them back.

      Args:
          args: Parsed command-line namespace. ``ingest_url``, ``query_url``
              and ``predix_zone_id`` are read. ``sensor_id`` is used as the
              sensor id / tag name when present and non-empty; otherwise
              ``TEST_SENSOR1`` is used.
          token: Bearer token handed to both PredixWriter and PredixReader.
      """
      # Honor the --sensor-id option instead of always using the hard-coded
      # tag; getattr keeps callers that pass a namespace without it working.
      sensor_id = getattr(args, 'sensor_id', None) or TEST_SENSOR1

      # Phase 1: emit five samples, 0.5 apart (presumably seconds -- confirm
      # against Scheduler.schedule_periodic), to the writer and to stdout.
      sensor1 = TestSensor.output_thing(sensor_id, 5)
      writer = PredixWriter(args.ingest_url, args.predix_zone_id, token,
                            extractor=EventExtractor(attributes={'test': True}),
                            batch_size=3)
      sensor1.connect(writer)
      sensor1.connect(print)  # also print the event
      scheduler = Scheduler(asyncio.get_event_loop())
      scheduler.schedule_periodic(sensor1, 0.5)
      # Captured before the scheduler runs; used as the reader's start_time.
      start_time = time.time()
      # NOTE(review): presumably returns once the scheduled sensor is
      # exhausted -- confirm against Scheduler.run_forever.
      scheduler.run_forever()

      # Phase 2: read the same tag back, starting from start_time.
      print("Reading back events")
      reader1 = PredixReader(args.query_url, args.predix_zone_id, token,
                             sensor_id,
                             start_time=start_time,
                             one_shot=True)
      reader1.connect(print)
      scheduler.schedule_recurring(reader1)
      scheduler.run_forever()
  # Default Predix Time Series endpoints: a websocket URL for ingest and an
  # HTTPS URL for queries.
  INGEST_URL = 'wss://gateway-predix-data-services.run.aws-usw02-pr.ice.predix.io/v1/stream/messages'
  QUERY_URL = 'https://time-series-store-predix.run.aws-usw02-pr.ice.predix.io/v1/datapoints'

  # Program description shown in the command-line help.
  DESCRIPTION = (
      "Example of Predix Time Series adapters for ThingFlow.\n"
      "Sends a sequence of data points via PredixWriter and then\n"
      "queries them back via PredixReader."
  )
  def main(argv=sys.argv[1:]):
      """Parse the command line, load the bearer token and run the example.

      Args:
          argv: Argument list to parse. The default is the process argument
              list as it was when this function was defined.

      Returns:
          0 after ``run()`` completes. Bad arguments or an unreadable token
          file are reported through ``parser.error()``, which exits the
          process instead of returning.
      """
      parser = argparse.ArgumentParser(description=DESCRIPTION)
      parser.add_argument("--ingest-url", default=INGEST_URL,
                          help="Websockets URL for ingest. Default is for Western US datacenter")
      parser.add_argument("--query-url", default=QUERY_URL,
                          help="HTTPS URL for query. Default is for Western US datacenter")
      parser.add_argument("--sensor-id", default="sensor-1",
                          help="Sensor id (tag name) to use. Defaults to 'sensor-1'")
      parser.add_argument("predix_zone_id", metavar="PREDIX_ZONE_ID",
                          help="Zone Id for authentication")
      parser.add_argument("token_file", metavar="TOKEN_FILE",
                          help="Filename of a file containing the bearer token for authentication")
      parsed_args = parser.parse_args(args=argv)

      try:
          with open(parsed_args.token_file, 'r') as tf:
              token = tf.read().rstrip()
      except (OSError, UnicodeDecodeError):
          # Only file-access and decoding failures are reported as token-file
          # problems. A bare except would also catch KeyboardInterrupt and
          # SystemExit and misreport them.
          parser.error("Problem opening/reading token file %s" % parsed_args.token_file)

      run(parsed_args, token)
      print("Test successful.")
      return 0
  # Script entry point: use main()'s return value as the process exit status.
  if __name__ == "__main__":
      raise SystemExit(main())