test_predix.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. """
  2. Test of timeseries ingestion
  3. See http://predix01.cloud.answerhub.com/questions/21920/time-series-3.html?childToView=21931#answer-21931
  4. and https://www.predix.io/resources/tutorials/tutorial-details.html?tutorial_id=1549&tag=1613&journey=Exploring%20Security%20services&resources=1594,1593,2105,1544,1549,2255,1951
  5. """
  6. import logging
  7. import time
  8. import asyncio
  9. import unittest
  10. from thingflow.base import InputThing, Scheduler
  11. from utils import make_test_output_thing_from_vallist
  12. try:
  13. from config_for_tests import PREDIX_TOKEN, PREDIX_ZONE_ID, \
  14. PREDIX_INGEST_URL,PREDIX_QUERY_URL
  15. except ImportError:
  16. PREDIX_TOKEN=None
  17. PREDIX_ZONE_ID=None
  18. PREDIX_INGEST_URL=None
  19. PREDIX_QUERY_URL=None
  20. try:
  21. import websocket
  22. import requests
  23. from thingflow.adapters.predix import *
  24. PREREQS_AVAILABLE = True
  25. except ImportError:
  26. PREREQS_AVAILABLE = False
# Configure the root logger as soon as this module is loaded: DEBUG level,
# with a StreamHandler attached.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
# NOTE(review): the root logger already has a handler at this point, so per
# the logging documentation basicConfig() should do nothing here -- confirm
# before removing.
logging.basicConfig(level=logging.DEBUG)

# Values handed to each simulated sensor. TestInput disconnects from its
# upstream once it has collected len(VALUE_STREAM) values.
VALUE_STREAM = [1, 2, 3, 4, 5]
  32. @unittest.skipUnless(PREREQS_AVAILABLE,
  33. "Predix prequisites not available")
  34. @unittest.skipUnless(PREDIX_TOKEN is not None and PREDIX_ZONE_ID is not None and\
  35. PREDIX_INGEST_URL is not None and PREDIX_QUERY_URL is not None,
  36. "Predix not configured in config_for_tests.py")
  37. class TestInput(InputThing):
  38. """Accept events from the predix reader and print them.
  39. After the specified number, disconnect.
  40. """
  41. def __init__(self, prev_in_chain, name):
  42. self.events = []
  43. self.name = name
  44. self.disconnect = prev_in_chain.connect(self)
  45. self.values = []
  46. def on_next(self, x):
  47. print(x)
  48. self.values.append(x.val)
  49. if len(self.values)==len(VALUE_STREAM):
  50. self.disconnect()
  51. print("TestInput %s disconnected" % self.name)
  52. def on_completed(self):
  53. print("Reader %s received %d events" % (self.name, len(self.events)))
# Identifiers of the two simulated sensors. The tests pass them both to
# make_test_output_thing_from_vallist() when writing and to PredixReader when
# reading the events back.
TEST_SENSOR1 = 'test-sensor-1'
TEST_SENSOR2 = 'test-sensor-2'
  56. @unittest.skipUnless(PREREQS_AVAILABLE,
  57. "Predix prequisites not available")
  58. @unittest.skipUnless(PREDIX_TOKEN is not None and PREDIX_ZONE_ID is not None and\
  59. PREDIX_INGEST_URL is not None and PREDIX_QUERY_URL is not None,
  60. "Predix not configured in config_for_tests.py")
  61. class TestPredix(unittest.TestCase):
  62. def test_batching(self):
  63. """We write out a set of event from two simulated sensors using an odd batch size (3).
  64. We then read them back and verify that we got all the events.
  65. """
  66. sensor1 = make_test_output_thing_from_vallist(TEST_SENSOR1, VALUE_STREAM)
  67. sensor2 = make_test_output_thing_from_vallist(TEST_SENSOR2, VALUE_STREAM)
  68. writer = PredixWriter(PREDIX_INGEST_URL, PREDIX_ZONE_ID, PREDIX_TOKEN,
  69. extractor=EventExtractor(attributes={'test':True}),
  70. batch_size=3)
  71. sensor1.connect(writer)
  72. sensor2.connect(writer)
  73. scheduler = Scheduler(asyncio.get_event_loop())
  74. scheduler.schedule_periodic(sensor1, 0.5)
  75. scheduler.schedule_periodic(sensor2, 0.5)
  76. start_time = time.time()
  77. scheduler.run_forever()
  78. # Now we read the events back
  79. reader1 = PredixReader(PREDIX_QUERY_URL, PREDIX_ZONE_ID, PREDIX_TOKEN, TEST_SENSOR1,
  80. start_time=start_time,
  81. one_shot=False)
  82. reader2 = PredixReader(PREDIX_QUERY_URL, PREDIX_ZONE_ID, PREDIX_TOKEN, TEST_SENSOR2,
  83. start_time=start_time,
  84. one_shot=False)
  85. ti1 = TestInput(reader1, 'sensor-1')
  86. ti2 = TestInput(reader2, 'sensor-2')
  87. scheduler.schedule_periodic(reader1, 2)
  88. scheduler.schedule_periodic(reader2, 2)
  89. scheduler.run_forever()
  90. self.assertListEqual(VALUE_STREAM, ti1.values)
  91. self.assertListEqual(VALUE_STREAM, ti2.values)
  92. def test_individual(self):
  93. """We write out a set of event from two simulated sensors using a batch size of 1.
  94. We then read them back and verify that we got all the events.
  95. """
  96. sensor1 = make_test_output_thing_from_vallist(TEST_SENSOR1, VALUE_STREAM)
  97. sensor2 = make_test_output_thing_from_vallist(TEST_SENSOR2, VALUE_STREAM)
  98. writer = PredixWriter(PREDIX_INGEST_URL, PREDIX_ZONE_ID, PREDIX_TOKEN,
  99. extractor=EventExtractor(attributes={'test':True}),
  100. batch_size=1)
  101. sensor1.connect(writer)
  102. sensor2.connect(writer)
  103. scheduler = Scheduler(asyncio.get_event_loop())
  104. scheduler.schedule_periodic(sensor1, 0.5)
  105. scheduler.schedule_periodic(sensor2, 0.5)
  106. start_time = time.time()
  107. scheduler.run_forever()
  108. # Now we read the events back
  109. reader1 = PredixReader(PREDIX_QUERY_URL, PREDIX_ZONE_ID, PREDIX_TOKEN, TEST_SENSOR1,
  110. start_time=start_time,
  111. one_shot=False)
  112. reader2 = PredixReader(PREDIX_QUERY_URL, PREDIX_ZONE_ID, PREDIX_TOKEN, TEST_SENSOR2,
  113. start_time=start_time,
  114. one_shot=False)
  115. ti1 = TestInput(reader1, 'sensor-1')
  116. ti2 = TestInput(reader2, 'sensor-2')
  117. scheduler.schedule_periodic(reader1, 2)
  118. scheduler.schedule_periodic(reader2, 2)
  119. scheduler.run_forever()
  120. self.assertListEqual(VALUE_STREAM, ti1.values)
  121. self.assertListEqual(VALUE_STREAM, ti2.values)
  122. if __name__ == '__main__':
  123. logger = logging.getLogger()
  124. logger.setLevel(logging.DEBUG)
  125. logger.addHandler(logging.StreamHandler())
  126. logging.basicConfig(level=logging.DEBUG)
  127. unittest.main()