test_linq.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
"""Tests of the LINQ-style APIs. The output is still mostly verified by hand,
although running this module as part of the automated test suite makes a
decent regression test.
"""
  6. import asyncio
  7. import unittest
  8. from thingflow.base import *
  9. from utils import make_test_output_thing, make_test_output_thing_from_vallist,\
  10. ValidationInputThing
  11. import thingflow.filters.where
  12. import thingflow.filters.output
  13. import thingflow.filters
  14. from thingflow.filters.never import Never
  15. def pp_buf(x):
  16. print("Buffered output: ", x)
  17. print("\n")
  18. class TestLinq(unittest.TestCase):
  19. def test_case(self):
  20. """Rupak, if you want to test more, just add it here or add additional
  21. methods starting with test_
  22. """
  23. loop = asyncio.get_event_loop()
  24. s = make_test_output_thing(1, stop_after_events=5)
  25. t = s.skip(2).some(lambda x: x[2]>100)
  26. s.connect(print)
  27. t.connect(print)
  28. scheduler = Scheduler(loop)
  29. scheduler.schedule_periodic(s, 2) # sample once every 2 seconds
  30. u = s.take_last(3).scan(lambda a, x: a+x[2], 0)
  31. u.connect(print)
  32. v = s.take_last(3).reduce(lambda a, x: a+x[2], 0)
  33. v.connect(print)
  34. w = s.buffer_with_time(5, scheduler)
  35. w.connect(pp_buf)
  36. # w = Never()
  37. # w.connect(print)
  38. # scheduler.schedule_periodic(w, 1)
  39. s.print_downstream()
  40. loop.call_later(30, scheduler.stop)
  41. scheduler.run_forever()
  42. print("That's all folks")
  43. def test_first(self):
  44. """Test the first() operator
  45. """
  46. p = make_test_output_thing_from_vallist(1, [1, 2, 3, 4, 5, 6])
  47. vs = ValidationInputThing([1], self)
  48. p.first().connect(vs)
  49. scheduler = Scheduler(asyncio.get_event_loop())
  50. scheduler.schedule_recurring(p)
  51. scheduler.run_forever()
  52. self.assertTrue(vs.completed)
  53. if __name__ == '__main__':
  54. unittest.main()