tcpstreamer.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. # Copyright 2016 by MPI-SWS and Data-Ken Research.
  2. # Licensed under the Apache 2.0 License.
  3. """TODO: This needs to be updated to match the latest base.py!
  4. """
  5. import asyncio
  6. # log = logging.getLogger(__name__)
  7. # formatter = logging.Formatter("%(asctime)s %(levelname)s " +
  8. # "[%(module)s:%(lineno)d] %(message)s")
  9. # log.setLevel(logging.DEBUG)
  10. from thingflow.base import InputThing
  11. clients = {} # Module-level map of task -> (reader, writer). NOTE(review): TcpStreamInputThing in this file keeps its own per-instance self.clients and never touches this global - confirm whether anything else still uses it.
  12. class TcpStreamInputThing(InputThing):
  13. def __init__(self, loop, host=None, port=2991):
  14. self.server = None
  15. self.task = 0
  16. self.clients = { }
  17. self.host = host
  18. self.port = 2991
  19. self.loop = loop # TODO REMOVE
  20. # set up tcp stream
  21. self.server = loop.run_until_complete(
  22. asyncio.streams.start_server(self._accept_client,
  23. host, port, loop=loop))
  24. def __str__(self):
  25. return "TcpStreamInputThing[{0}, {1}]".format(self.host, self.port)
  26. def _accept_client(self, client_reader, client_writer):
  27. """
  28. This method accepts a new client connection and creates a Task
  29. to handle this client. self.clients is updated to keep track
  30. of the new client.
  31. """
  32. print("Accepting new client")
  33. self.task = self.task + 1
  34. self.clients[self.task] = (client_reader, client_writer)
  35. def stop(self):
  36. """
  37. Stops the TCP server, i.e. closes the listening socket(s).
  38. This method runs the loop until the server sockets are closed.
  39. """
  40. if self.server is not None:
  41. self.server.close()
  42. # TODO: do not have access to loop
  43. self.loop.run_until_complete(self.server.wait_closed())
  44. self.server = None
  45. def on_next(self, msg):
  46. # send message on tcp stream
  47. print("tcp: on_next")
  48. for task, (reader, writer) in self.clients.items():
  49. try:
  50. print("tcp: writing to client")
  51. writer.write(str(msg).encode('utf-8'))
  52. writer.write('\n'.encode('utf-8'))
  53. asyncio.async(writer.drain()) # can this raise exception?
  54. except ConnectionResetError:
  55. print("tcp: client disconnected")
  56. del self.clients[task]
  57. print("tcp: on_next done")
  58. def on_error(self, e):
  59. # close tcp connection
  60. self.stop()
  61. print(e)
  62. def on_completed(self):
  63. # close tcp connection
  64. self.stop()