| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- # Copyright 2016 by MPI-SWS and Data-Ken Research.
- # Licensed under the Apache 2.0 License.
- """Some simple tests for the base layer.
- """
- import asyncio
- import unittest
- from thingflow.base import Scheduler, SensorAsOutputThing, FunctionFilter
- from utils import ValueListSensor, ValidationInputThing, CaptureInputThing
- from thingflow.filters.where import where
- from thingflow.filters.output import output
- from thingflow.filters.map import map
- from thingflow.filters.combinators import passthrough
- value_stream = [
- 20,
- 30,
- 100,
- 120,
- 20,
- 5,
- 2222
- ]
- expected_stream = [
- 100,
- 120,
- 2222
- ]
- def predicate(v):
- if v[2]>=100.0:
- print("v=%s, True" % v[2])
- return True
- else:
- print("v=%s, False" % v[2])
- return False
- class TestBaseScenario(unittest.TestCase):
- def test_where(self):
- """In this version, we create a publisher and use method chaining to
- compose the filters"""
- s = ValueListSensor(1, value_stream)
- p = SensorAsOutputThing(s)
- w = p.where(predicate)
- w.output()
- vo = ValidationInputThing(expected_stream, self.test_where)
- w.connect(vo)
- scheduler = Scheduler(asyncio.get_event_loop())
- scheduler.schedule_periodic(p, 0.5) # sample twice every second
- p.print_downstream()
- scheduler.run_forever()
- self.assertTrue(vo.completed,
- "Schedule exited before validation observer completed")
- print("That's all folks")
- def test_schedule_sensor(self):
- """In this version, we pass the sensor directly to the scheduler and use
- a functional style to compose the filters"""
- s = ValueListSensor(1, value_stream)
- vo = ValidationInputThing(expected_stream, self.test_schedule_sensor)
- scheduler = Scheduler(asyncio.get_event_loop())
- scheduler.schedule_sensor(s, 0.5,
- where(predicate),
- passthrough(vo),
- output())
- scheduler.run_forever()
- self.assertTrue(vo.completed,
- "Schedule exited before validation observer completed")
- print("That's all folks")
- class TestFunctionFilter(unittest.TestCase):
- def test_function_filter(self):
- """Verify the function filter class
- """
- s = ValueListSensor(1, value_stream)
- st = SensorAsOutputThing(s)
- captured_list_ref = [[]]
- got_completed_ref = [False,]
- def on_next(self, x):
- captured_list_ref[0].append(x.val)
- self._dispatch_next(x)
- def on_completed(self):
- got_completed_ref[0] = True
- self._dispatch_completed()
- ff = FunctionFilter(st, on_next=on_next, on_completed=on_completed)
- vo = ValidationInputThing(value_stream, self.test_function_filter)
- ff.connect(vo)
- st.print_downstream()
- scheduler = Scheduler(asyncio.get_event_loop())
- scheduler.schedule_periodic(st, 0.5) # sample twice every second
- scheduler.run_forever()
- self.assertTrue(vo.completed,
- "Schedule exited before validation observer completed")
- self.assertEqual(value_stream, captured_list_ref[0])
- self.assertTrue(got_completed_ref[0])
- print("That's all folks")
- def test_function_filter_error_handling(self):
- """Verify the error handling functionality of the function filter. We do
- this by connecting two downstream paths to the sensor. The first includes
- a function filter that throws an error when it encouters a sensor reading
- of 120. This should disconnect th stream at this point. The second is
- a normal validation input thing. It is connected directly to the sensor,
- and thus should not see any errors.
- """
- s = ValueListSensor(1, value_stream)
- st = SensorAsOutputThing(s)
- captured_list_ref = [[]]
- got_completed_ref = [False,]
- got_on_error_ref = [False,]
- def on_next_throw_exc(self, x):
- if x.val==120:
- raise Exception("expected exc")
- else:
- captured_list_ref[0].append(x.val)
- self._dispatch_next(x)
- def on_completed(self):
- got_completed_ref[0] = True
- self._dispatch_completed()
- def on_error(self, e):
- got_on_error_ref[0] = True
- self._dispatch_error(e)
- ff = FunctionFilter(st, on_next=on_next_throw_exc,
- on_completed=on_completed,
- on_error=on_error)
- ct = CaptureInputThing(expecting_error=True)
- ff.map(lambda x: x.val).connect(ct)
- vo = ValidationInputThing(value_stream, self.test_function_filter_error_handling)
- st.connect(vo)
- st.print_downstream()
- scheduler = Scheduler(asyncio.get_event_loop())
- scheduler.schedule_periodic(st, 0.5) # sample twice every second
- scheduler.run_forever()
- self.assertTrue(vo.completed,
- "Schedule exited before validation observer completed")
- self.assertFalse(ct.completed,
- "Capture thing should not have completed")
- self.assertTrue(ct.errored,
- "Capture thing should have seen an error")
- self.assertFalse(got_completed_ref[0])
- self.assertTrue(got_on_error_ref[0])
- self.assertEqual([20, 30, 100], ct.events, "Capture thing event mismatch")
- self.assertEqual([20, 30, 100], captured_list_ref[0], "captured_list_ref mismatch")
- print("That's all folks")
-
-
- if __name__ == '__main__':
- unittest.main()
|