| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- # Copyright 2016 by MPI-SWS and Data-Ken Research.
- # Licensed under the Apache 2.0 License.
- """Test of IterableAsOutputThing. This was originally test_base.py, but we then
- added the sensor infrastructure and rewrote the test. This test verfies the
- specific IterableAsOutputThing code.
- """
- import asyncio
- import unittest
- from thingflow.base import Scheduler, IterableAsOutputThing
- from utils import make_test_output_thing_from_vallist, ValidationInputThing
- import thingflow.filters.where
- import thingflow.filters.output
- value_stream = [
- 20,
- 30,
- 100,
- 120,
- 20,
- 5,
- 2222
- ]
- expected_stream = [
- 100,
- 120,
- 2222
- ]
- def predicate(v):
- if v[2]>=100.0:
- print("v=%s, True" % v[2])
- return True
- else:
- print("v=%s, False" % v[2])
- return False
- class ErrorIterator:
- """An iterator that thows an error after the initial stream
- (instead of StopIteration).
- """
- def __init__(self, expected_stream):
- self.expected_stream = expected_stream
- self.idx = 0
- def __iterator__(self):
- return self
- def __next__(self):
- if self.idx==len(self.expected_stream):
- raise Exception("Throwing an exception in ErrorIterator")
- else:
- v = self.expected_stream[self.idx]
- self.idx += 1
- return v
-
- class TestIterableAsOutputThing(unittest.TestCase):
- def test_where(self):
- s = make_test_output_thing_from_vallist(1, value_stream)
- w = s.where(predicate)
- w.output()
- vo = ValidationInputThing(expected_stream, self)
- w.connect(vo)
- scheduler = Scheduler(asyncio.get_event_loop())
- scheduler.schedule_periodic(s, 0.5) # sample twice every second
- s.print_downstream()
- scheduler.run_forever()
- self.assertTrue(vo.completed,
- "Schedule exited before validation observer completed")
- self.assertTrue(vo.completed)
- print("That's all folks")
- def test_error_handling(self):
- """This is a non-fatal error, so we should just print the error and
- exit cleanly without propagating the exception.
- """
- s = IterableAsOutputThing(ErrorIterator(expected_stream))
- s.output()
- scheduler = Scheduler(asyncio.get_event_loop())
- scheduler.schedule_periodic(s, 0.5) # sample twice every second
- s.print_downstream()
- scheduler.run_forever()
-
- if __name__ == '__main__':
- unittest.main()
|