epc.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. import json
  5. import logging
  6. import ConfigParser
  7. from time import sleep
  8. import time, datetime
  9. import sleekxmpp
  10. from threading import Lock
  11. from kafka.client import KafkaClient
  12. from kafka import KafkaConsumer
  13. from kafka.producer import SimpleProducer
  14. from kafka.common import LeaderNotAvailableError
  15. import threading
  16. class EPCPub():
  17. producer = None
  18. EPC_TOPICS= {'EPC_PORT_SUMMARY_TOPIC': 'port_summary',
  19. 'EPC_PORT_ITEM_TOPIC': 'port_item'}
  20. def __init__(self, config):
  21. host = config.get("epc", "kafka.host").strip('"').strip("'")
  22. port = config.get("epc", "kafka.port")
  23. client = KafkaClient('%s:%s' % (host, port))
  24. self.producer = SimpleProducer(client, async=False)
  25. def post_msg(self, topic, msg):
  26. self.producer.send_messages(topic, msg)
  27. def post_portfolio_summary(self, dict):
  28. msg= (time.time(), ExternalProcessComm.EPC_TOPICS['EPC_PORT_SUMMARY_TOPIC'], dict)
  29. self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_SUMMARY_TOPIC'], json.dumps(msg))
  30. def post_portfolio_items(self, ldict):
  31. msg= (time.time(), ExternalProcessComm.EPC_TOPICS['EPC_PORT_ITEM_TOPIC'], ldict)
  32. self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_ITEM_TOPIC'], json.dumps(msg))
  33. class ExternalProcessComm(threading.Thread):
  34. producer = None
  35. consumer = None
  36. EPC_TOPICS= {'EPC_PORT_SUMMARY_TOPIC': 'port_summary',
  37. 'EPC_PORT_ITEM_TOPIC': 'port_item'}
  38. def __init__(self, config):
  39. super(ExternalProcessComm, self).__init__()
  40. host = config.get("epc", "kafka.host").strip('"').strip("'")
  41. port = config.get("epc", "kafka.port")
  42. client = KafkaClient('%s:%s' % (host, port))
  43. self.producer = SimpleProducer(client, async=False)
  44. #sleep(1)
  45. print 'create EPC'
  46. # the kafkaConsumer will fail with a no topic error if the topic is not found in the broker
  47. # the next line uses the producer to produce the required topic which will create one
  48. # if it has not been created already
  49. [self.post_msg(v, 'init msg') for k,v in ExternalProcessComm.EPC_TOPICS.iteritems()]
  50. self.consumer = KafkaConsumer( *[(v,0) for k,v in ExternalProcessComm.EPC_TOPICS.iteritems()], \
  51. metadata_broker_list=['%s:%s' % (host, port)],\
  52. group_id = 'epc.group',\
  53. auto_commit_enable=True,\
  54. auto_commit_interval_ms=30 * 1000,\
  55. auto_offset_reset='largest') # discard old ones
  56. # https://kafka.apache.org/08/configuration.html
  57. # What to do when there is no initial offset in Zookeeper or if an offset is out of range:
  58. # * smallest : automatically reset the offset to the smallest offset
  59. # * largest : automatically reset the offset to the largest offset
  60. # * anything else: throw exception to the consumer. If this is set to largest,
  61. # the consumer may lose some messages when the number of partitions, for the topics
  62. # it subscribes to, changes on the broker. To prevent data loss during partition addition, set auto.offset.reset to smallest
  63. def post_msg(self, topic, msg):
  64. self.producer.send_messages(topic, msg)
  65. def post_portfolio_summary(self, dict):
  66. msg= (time.time(), dict)
  67. self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_SUMMARY_TOPIC'], json.dumps(msg))
  68. def post_portfolio_items(self, ldict):
  69. msg= (time.time(), ldict)
  70. self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_ITEM_TOPIC'], json.dumps(msg))
  71. def run(self):
  72. for message in self.consumer:
  73. logging.info("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
  74. message.offset, message.key,
  75. message.value))
  76. print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
  77. message.offset, message.key,
  78. message.value))
  79. if __name__ == '__main__':
  80. if len(sys.argv) != 2:
  81. print("Usage: %s <config file>" % sys.argv[0])
  82. exit(-1)
  83. cfg_path= sys.argv[1:]
  84. config = ConfigParser.SafeConfigParser()
  85. if len(config.read(cfg_path)) == 0:
  86. raise ValueError, "Failed to open config file"
  87. logging.basicConfig(level=logging.INFO,
  88. format='%(asctime)s %(levelname)s %(message)s')
  89. e = ExternalProcessComm(config)
  90. e.start()
  91. e.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_SUMMARY_TOPIC'], 'test msg')