epc.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. import json
  5. import logging
  6. import ConfigParser
  7. from time import sleep
  8. import time, datetime
  9. import sleekxmpp
  10. from threading import Lock
  11. from kafka.client import KafkaClient
  12. from kafka.consumer import SimpleConsumer
  13. from kafka.producer import SimpleProducer
  14. import threading
  15. class ExternalProcessComm(threading.Thread):
  16. producer = None
  17. consumer = None
  18. def __init__(self, config):
  19. super(ExternalProcessComm, self).__init__()
  20. host = config.get("epc", "kafka.host").strip('"').strip("'")
  21. port = config.get("epc", "kafka.port")
  22. client = KafkaClient('%s:%s' % (host, port))
  23. self.producer = SimpleProducer(client, async=False)
  24. self.consumer = SimpleConsumer(client, "epc.group", "epc.topic")
  25. def post_msg(self, topic, msg):
  26. self.producer.send_messages(topic, msg)
  27. def run(self):
  28. for message in self.consumer:
  29. logging.info(message)
  30. if __name__ == '__main__':
  31. if len(sys.argv) != 2:
  32. print("Usage: %s <config file>" % sys.argv[0])
  33. exit(-1)
  34. cfg_path= sys.argv[1:]
  35. config = ConfigParser.SafeConfigParser()
  36. if len(config.read(cfg_path)) == 0:
  37. raise ValueError, "Failed to open config file"
  38. logging.basicConfig(level=logging.INFO,
  39. format='%(asctime)s %(levelname)s %(message)s')
  40. e = ExternalProcessComm(config)
  41. e.start()
  42. e.post_msg('epc.topic', 'test msg')