test_tcp_stream.py 503 B

12345678910111213141516171819202122
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. import asyncio
  4. from antevents.tcpstreamer import TcpStreamObserver
  5. from antevents.base import make_test_publisher, Scheduler
  6. loop = asyncio.get_event_loop()
  7. s = make_test_publisher(1, stop_after_events=10)
  8. t = TcpStreamObserver(loop, "localhost", 2991)
  9. s.subscribe(t)
  10. scheduler = Scheduler(loop)
  11. scheduler.schedule_periodic(s, 2) # sample once every 2 seconds
  12. scheduler.run_forever()
  13. scheduler.stop()