| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- # Copyright 2016 by MPI-SWS and Data-Ken Research.
- # Licensed under the Apache 2.0 License.
- """Tests of the linq apis. Pretty much still manually verified, although running
- it as a part of the automated test suite makes a decent regression test.
- """
- import asyncio
- import unittest
- from thingflow.base import *
- from utils import make_test_output_thing, make_test_output_thing_from_vallist,\
- ValidationInputThing
- import thingflow.filters.where
- import thingflow.filters.output
- import thingflow.filters
- from thingflow.filters.never import Never
- def pp_buf(x):
- print("Buffered output: ", x)
- print("\n")
- class TestLinq(unittest.TestCase):
- def test_case(self):
- """Rupak, if you want to test more, just add it here or add additional
- methods starting with test_
- """
- loop = asyncio.get_event_loop()
-
- s = make_test_output_thing(1, stop_after_events=5)
-
- t = s.skip(2).some(lambda x: x[2]>100)
- s.connect(print)
- t.connect(print)
- scheduler = Scheduler(loop)
- scheduler.schedule_periodic(s, 2) # sample once every 2 seconds
- u = s.take_last(3).scan(lambda a, x: a+x[2], 0)
- u.connect(print)
- v = s.take_last(3).reduce(lambda a, x: a+x[2], 0)
- v.connect(print)
- w = s.buffer_with_time(5, scheduler)
- w.connect(pp_buf)
- # w = Never()
- # w.connect(print)
- # scheduler.schedule_periodic(w, 1)
- s.print_downstream()
- loop.call_later(30, scheduler.stop)
- scheduler.run_forever()
- print("That's all folks")
- def test_first(self):
- """Test the first() operator
- """
- p = make_test_output_thing_from_vallist(1, [1, 2, 3, 4, 5, 6])
- vs = ValidationInputThing([1], self)
- p.first().connect(vs)
- scheduler = Scheduler(asyncio.get_event_loop())
- scheduler.schedule_recurring(p)
- scheduler.run_forever()
- self.assertTrue(vs.completed)
-
-
- if __name__ == '__main__':
- unittest.main()
|