| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- # Copyright 2016 by MPI-SWS and Data-Ken Research.
- # Licensed under the Apache 2.0 License.
- """Verify that we can set up timeout events
- """
- import asyncio
- import unittest
- from thingflow.base import Scheduler, FunctionFilter
- import thingflow.filters.timeout
- import thingflow.filters.output
- from utils import make_test_output_thing_from_vallist, ValidationInputThing
def on_next_alternate(self, x):
    """Forward or discard ``x`` depending on the filter's current mode.

    Written as a free function taking ``self`` so it can be passed as the
    ``on_next`` callback of a filter. It reads and updates three attributes
    on ``self``: ``keep_mode`` (forward when true, discard otherwise),
    ``countdown`` (events left in the current window) and ``N`` (window
    length). Forwarding goes through ``self._dispatch_next``.
    """
    shown = repr(x)
    if not self.keep_mode:
        print("Dropping %s" % shown)
    else:
        print("Sending %s" % shown)
        self._dispatch_next(x)

    # Every event, kept or dropped, consumes one slot of the current window.
    remaining = self.countdown - 1
    if remaining == 0:
        # Window exhausted: start a fresh one in the opposite mode.
        self.countdown = self.N
        self.keep_mode = not self.keep_mode
    else:
        self.countdown = remaining
class DropPeriodic(FunctionFilter):
    """Filter that lets N events through, discards the next N, and repeats.

    The per-event work is done by ``on_next_alternate``, which is installed
    as this filter's ``on_next`` callback.
    """

    def __init__(self, previous_in_chain, N=1):
        """Create the filter downstream of ``previous_in_chain``.

        ``N`` is the length of each pass/drop window (default 1, i.e. every
        other event is dropped).
        """
        # Set the state read by on_next_alternate before handing the
        # callback to the base class; the filter starts in "keep" mode with
        # a full window.
        self.keep_mode = True
        self.N = N
        self.countdown = N
        super().__init__(
            previous_in_chain,
            on_next=on_next_alternate,
            name='drop_alternate',
        )
class EventWatcher(thingflow.filters.timeout.EventWatcher):
    """Watcher that answers a timeout by replaying the most recent event."""

    def __init__(self):
        # Nothing has been observed yet, so there is nothing to replay.
        self.last_good_event = None

    def on_next(self, x):
        """Remember ``x`` as the latest event seen."""
        self.last_good_event = x

    def produce_event_for_timeout(self):
        """Return the remembered event (``None`` if none was seen yet)."""
        replay = self.last_good_event
        print("producing event for timeout: %s" % repr(replay))
        return replay
# Raw readings emitted by the test sensor, one per scheduled sample.
sensor_values = [1, 2, 3, 4, 5, 6]

# Output expected when every other reading is dropped and each gap is
# filled with the last reading that got through.
expected_values = [1, 1, 3, 3, 5, 5]

# Output expected when readings are passed and dropped in runs of two:
# readings 3 and 4 are dropped and replaced by the last good reading (2).
expected_values_multiple_timeouts = [1, 2, 2, 2, 5, 6]
class TestTimeouts(unittest.TestCase):
    """Check that a timeout filter fills gaps left by dropped events."""

    def _run_timeout_scenario(self, window, expected):
        """Run the sensor -> DropPeriodic -> timeout -> validator pipeline.

        ``window`` is the ``N`` passed to ``DropPeriodic``; ``expected`` is
        the exact sequence the validating observer must receive. Fails the
        test if the scheduler returns before the validator has completed.
        """
        sensor = make_test_output_thing_from_vallist(1, sensor_values)
        drop = DropPeriodic(sensor, N=window)
        scheduler = Scheduler(asyncio.get_event_loop())
        vo = ValidationInputThing(expected, self)
        # The timeout (1.1) is slightly longer than the sampling interval
        # (1), so it only fires when an event has actually been dropped.
        drop.supply_event_when_timeout(EventWatcher(),
                                       scheduler, 1.1).output().connect(vo)
        scheduler.schedule_periodic(sensor, 1)
        sensor.print_downstream()
        scheduler.run_forever()
        self.assertTrue(vo.completed,
                        "Schedule exited before validation observer completed")

    def test_supplying_event_on_timeout(self):
        """In this testcase, we drop every other event.
        We set the timeout to a bit longer than the event interval of
        one second. It then supplies the previous event. The resulting
        output stream will show every other value repeated twice.
        """
        self._run_timeout_scenario(1, expected_values)

    def test_multiple_timeouts(self):
        """In this testcase, we pass two events, drop two events, etc.
        We set the timeout to a bit longer than the event interval. The last
        good value is supplied when the timeout expires. Thus, we should see
        two good events, two repeats of the last good event, two good
        events, etc.
        """
        self._run_timeout_scenario(2, expected_values_multiple_timeouts)
# Run the test cases when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|