| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- """
- Test of timeseries ingestion
- See http://predix01.cloud.answerhub.com/questions/21920/time-series-3.html?childToView=21931#answer-21931
- and https://www.predix.io/resources/tutorials/tutorial-details.html?tutorial_id=1549&tag=1613&journey=Exploring%20Security%20services&resources=1594,1593,2105,1544,1549,2255,1951
- """
- import logging
- import time
- import asyncio
- import unittest
- from thingflow.base import InputThing, Scheduler
- from utils import make_test_output_thing_from_vallist
- try:
- from config_for_tests import PREDIX_TOKEN, PREDIX_ZONE_ID, \
- PREDIX_INGEST_URL,PREDIX_QUERY_URL
- except ImportError:
- PREDIX_TOKEN=None
- PREDIX_ZONE_ID=None
- PREDIX_INGEST_URL=None
- PREDIX_QUERY_URL=None
- try:
- import websocket
- import requests
- from thingflow.adapters.predix import *
- PREREQS_AVAILABLE = True
- except ImportError:
- PREREQS_AVAILABLE = False
-
# Send DEBUG-level output from every logger to stderr while the tests run.
# The root logger gets exactly one StreamHandler here. The original followed
# this with logging.basicConfig(level=logging.DEBUG), which is a no-op once the
# root logger already has a handler, so that call has been dropped.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

# Values emitted by each simulated sensor; the tests expect to read back
# exactly this sequence for every sensor.
VALUE_STREAM = [1, 2, 3, 4, 5]
# NOTE(review): these skip decorators are kept from the original. This class is
# a helper (it derives from InputThing, not visibly from unittest.TestCase), so
# they presumably only tag the class -- confirm whether they are needed here.
@unittest.skipUnless(PREREQS_AVAILABLE,
                     "Predix prequisites not available")
@unittest.skipUnless(PREDIX_TOKEN is not None and PREDIX_ZONE_ID is not None and
                     PREDIX_INGEST_URL is not None and PREDIX_QUERY_URL is not None,
                     "Predix not configured in config_for_tests.py")
class TestInput(InputThing):
    """Accept events from the predix reader and print them.

    Each received event is printed and recorded. Once as many values have
    been collected as there are entries in VALUE_STREAM, the input
    disconnects itself from the upstream thing.

    Attributes:
        events: every event object received, in arrival order.
        values: the ``val`` attribute of every event received.
        name: label used in the printed messages.
        disconnect: callable returned by ``prev_in_chain.connect(self)``.
    """

    def __init__(self, prev_in_chain, name):
        """Connect to ``prev_in_chain`` and start with empty collections.

        :param prev_in_chain: upstream thing exposing ``connect()``.
        :param name: label used in the printed messages.
        """
        self.events = []
        self.values = []
        self.name = name
        # Connect last so every attribute on_next() touches already exists,
        # even if the upstream delivers an event during connect().
        self.disconnect = prev_in_chain.connect(self)

    def on_next(self, x):
        """Print and record event ``x``; disconnect once enough have arrived."""
        print(x)
        # Keep the whole event too, so on_completed() reports a real count
        # (the original never filled self.events and always printed 0).
        self.events.append(x)
        self.values.append(x.val)
        if len(self.values) == len(VALUE_STREAM):
            self.disconnect()
            print("TestInput %s disconnected" % self.name)

    def on_completed(self):
        """Print how many events were received before completion."""
        print("Reader %s received %d events" % (self.name, len(self.events)))
# Sensor ids used for the two simulated sensors; the same ids are passed to
# the Predix readers when the events are queried back.
TEST_SENSOR1, TEST_SENSOR2 = 'test-sensor-1', 'test-sensor-2'
@unittest.skipUnless(PREREQS_AVAILABLE,
                     "Predix prequisites not available")
@unittest.skipUnless(PREDIX_TOKEN is not None and PREDIX_ZONE_ID is not None and
                     PREDIX_INGEST_URL is not None and PREDIX_QUERY_URL is not None,
                     "Predix not configured in config_for_tests.py")
class TestPredix(unittest.TestCase):
    """Write events to Predix and read them back, for different batch sizes.

    The tests are skipped unless the Predix prerequisites imported and all
    PREDIX_* settings are present in config_for_tests.py.
    """

    def _write_then_read_back(self, batch_size):
        """Write VALUE_STREAM for two sensors, then verify it reads back.

        Shared body of the public tests; they differ only in ``batch_size``.

        :param batch_size: value passed to ``PredixWriter(batch_size=...)``.
        """
        sensor1 = make_test_output_thing_from_vallist(TEST_SENSOR1, VALUE_STREAM)
        sensor2 = make_test_output_thing_from_vallist(TEST_SENSOR2, VALUE_STREAM)
        writer = PredixWriter(PREDIX_INGEST_URL, PREDIX_ZONE_ID, PREDIX_TOKEN,
                              extractor=EventExtractor(attributes={'test': True}),
                              batch_size=batch_size)
        sensor1.connect(writer)
        sensor2.connect(writer)
        scheduler = Scheduler(asyncio.get_event_loop())
        scheduler.schedule_periodic(sensor1, 0.5)
        scheduler.schedule_periodic(sensor2, 0.5)
        # Captured before the write phase runs; passed to the readers below
        # as their start_time.
        start_time = time.time()
        # NOTE(review): the code relies on run_forever() returning once the
        # sensors are exhausted -- confirm against the thingflow Scheduler.
        scheduler.run_forever()

        # Now we read the events back
        reader1 = PredixReader(PREDIX_QUERY_URL, PREDIX_ZONE_ID, PREDIX_TOKEN,
                               TEST_SENSOR1,
                               start_time=start_time,
                               one_shot=False)
        reader2 = PredixReader(PREDIX_QUERY_URL, PREDIX_ZONE_ID, PREDIX_TOKEN,
                               TEST_SENSOR2,
                               start_time=start_time,
                               one_shot=False)
        # Each TestInput disconnects itself after len(VALUE_STREAM) values.
        ti1 = TestInput(reader1, 'sensor-1')
        ti2 = TestInput(reader2, 'sensor-2')
        scheduler.schedule_periodic(reader1, 2)
        scheduler.schedule_periodic(reader2, 2)
        scheduler.run_forever()
        self.assertListEqual(VALUE_STREAM, ti1.values)
        self.assertListEqual(VALUE_STREAM, ti2.values)

    def test_batching(self):
        """We write out a set of events from two simulated sensors using an odd
        batch size (3). We then read them back and verify that we got all the
        events.
        """
        self._write_then_read_back(batch_size=3)

    def test_individual(self):
        """We write out a set of events from two simulated sensors using a
        batch size of 1. We then read them back and verify that we got all the
        events.
        """
        self._write_then_read_back(batch_size=1)
-
if __name__ == '__main__':
    # The root logger is already set to DEBUG with a StreamHandler by the
    # module-level setup that runs at import time. Repeating that setup here
    # attached a second StreamHandler, so every log record was emitted twice
    # when the file was run as a script.
    unittest.main()
-
|