epc.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import json
import logging
import ConfigParser
from time import sleep
import time, datetime
import sleekxmpp
from threading import Lock
from kafka.client import KafkaClient
from kafka import KafkaConsumer
from kafka.producer import SimpleProducer
from kafka.common import LeaderNotAvailableError
import threading


class EPCPub():
    producer = None

    def __init__(self, config):
        host = config.get("epc", "kafka.host").strip('"').strip("'")
        port = config.get("epc", "kafka.port")
        #client = KafkaClient('%s:%s' % (host, port))
        # This version of the Kafka client API (KafkaClient/SimpleProducer) has been
        # deprecated, and the program no longer works with the latest kafka-python.
        # The next line exists only so that the program can start up.
        client = None
        self.producer = SimpleProducer(client, async=False)

    def post_msg(self, topic, msg):
        self.producer.send_messages(topic, msg)

    def post_portfolio_summary(self, dict):
        msg = (time.time(), ExternalProcessComm.EPC_TOPICS['EPC_PORT_SUMMARY_TOPIC'], dict)
        self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_SUMMARY_TOPIC'], json.dumps(msg))

    def post_portfolio_items(self, ldict):
        msg = (time.time(), ExternalProcessComm.EPC_TOPICS['EPC_PORT_ITEM_TOPIC'], ldict)
        self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_ITEM_TOPIC'], json.dumps(msg))

    def post_account_summary(self, dict):
        msg = (time.time(), ExternalProcessComm.EPC_TOPICS['EPC_ACCT_SUMMARY_TOPIC'], dict)
        self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_ACCT_SUMMARY_TOPIC'], json.dumps(msg))
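

# Illustrative sketch only, not wired into the classes in this file: the comments in
# EPCPub.__init__ note that KafkaClient/SimpleProducer are deprecated and the module
# no longer works against a current kafka-python. Under the assumption that
# kafka-python >= 1.0 is installed, roughly the same one-shot publish could be done
# with the newer KafkaProducer API. The helper name below is hypothetical and is not
# referenced anywhere else in this module.
def _post_msg_modern_sketch(host, port, topic, msg):
    """Publish one message with the newer KafkaProducer API (illustration only)."""
    # Imported locally so the module-level (old-API) imports above stay untouched.
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers='%s:%s' % (host, port))
    producer.send(topic, msg)   # send() is asynchronous...
    producer.flush()            # ...so block until the message is actually delivered
    producer.close()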


class ExternalProcessComm(threading.Thread):
    producer = None
    consumer = None
    EPC_TOPICS = {'EPC_PORT_SUMMARY_TOPIC': 'port_summary',
                  'EPC_PORT_ITEM_TOPIC': 'port_item',
                  'EPC_ACCT_SUMMARY_TOPIC': 'acct_summary'}

    def __init__(self, config):
        super(ExternalProcessComm, self).__init__()
        host = config.get("epc", "kafka.host").strip('"').strip("'")
        port = config.get("epc", "kafka.port")
        client = KafkaClient('%s:%s' % (host, port))
        self.producer = SimpleProducer(client, async=False)
        #sleep(1)
        print 'create EPC'
        # KafkaConsumer fails with a "no topic" error if a topic is not found on the
        # broker, so post an init message to each required topic first; producing to a
        # topic creates it if it has not been created already.
        [self.post_msg(v, 'init msg') for k, v in ExternalProcessComm.EPC_TOPICS.iteritems()]
        self.consumer = KafkaConsumer(*[(v, 0) for k, v in ExternalProcessComm.EPC_TOPICS.iteritems()],
                                      metadata_broker_list=['%s:%s' % (host, port)],
                                      group_id='epc.group',
                                      auto_commit_enable=True,
                                      auto_commit_interval_ms=30 * 1000,
                                      auto_offset_reset='largest')  # discard old messages
        # https://kafka.apache.org/08/configuration.html
        # What to do when there is no initial offset in ZooKeeper, or when an offset is
        # out of range:
        #   * smallest: automatically reset the offset to the smallest offset
        #   * largest: automatically reset the offset to the largest offset
        #   * anything else: throw an exception to the consumer
        # If this is set to 'largest', the consumer may lose messages when the number of
        # partitions changes on the broker for the topics it subscribes to. To prevent
        # data loss during partition addition, set auto.offset.reset to 'smallest'.

    def post_msg(self, topic, msg):
        self.producer.send_messages(topic, msg)

    def post_portfolio_summary(self, dict):
        msg = (time.time(), dict)
        self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_SUMMARY_TOPIC'], json.dumps(msg))

    def post_portfolio_items(self, ldict):
        msg = (time.time(), ldict)
        self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_ITEM_TOPIC'], json.dumps(msg))

    def post_account_summary(self, dict):
        msg = (time.time(), dict)
        self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_ACCT_SUMMARY_TOPIC'], json.dumps(msg))

    def run(self):
        for message in self.consumer:
            logging.info("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                        message.offset, message.key,
                                                        message.value))
            print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                  message.offset, message.key,
                                                  message.value))
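

# Illustrative sketch only, not referenced elsewhere in this file: the post_* methods
# above publish json.dumps() of a (timestamp, payload) tuple (EPCPub also inserts the
# topic name between the two), so consumers receive a JSON array on the wire. The
# helper name below is hypothetical; note that the plain-text probe messages such as
# 'init msg' and 'test msg' are not JSON and would raise ValueError here.
def _decode_epc_value(raw_value):
    """Return (timestamp, payload) unpacked from a JSON-encoded EPC message value."""
    decoded = json.loads(raw_value)
    # Two-element form from ExternalProcessComm, three-element form from EPCPub;
    # the payload is always the last element.
    return decoded[0], decoded[-1]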


if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: %s <config file>" % sys.argv[0])
        exit(-1)
    cfg_path = sys.argv[1:]
    config = ConfigParser.SafeConfigParser()
    if len(config.read(cfg_path)) == 0:
        raise ValueError("Failed to open config file")
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s')
    e = ExternalProcessComm(config)
    e.start()
    e.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_SUMMARY_TOPIC'], 'test msg')