epc.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
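"""External process communication (EPC) over Kafka.

EPCPub is a publish-only helper for portfolio and account messages;
ExternalProcessComm is a thread that publishes to and consumes from the same
EPC topics, logging every message it receives.
"""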
import sys
import json
import logging
import ConfigParser
from time import sleep
import time, datetime
import sleekxmpp
from threading import Lock
from kafka.client import KafkaClient
from kafka import KafkaConsumer
from kafka.producer import SimpleProducer
from kafka.common import LeaderNotAvailableError
import threading


class EPCPub():
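    """Publish-only Kafka client for EPC messages.

    Each post_* helper wraps its payload in a (timestamp, topic, payload)
    tuple, JSON-encodes it, and sends it to the matching topic from
    ExternalProcessComm.EPC_TOPICS.
    """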
    producer = None

    def __init__(self, config):
        host = config.get("epc", "kafka.host").strip('"').strip("'")
        port = config.get("epc", "kafka.port")
        client = KafkaClient('%s:%s' % (host, port))
        self.producer = SimpleProducer(client, async=False)

    def post_msg(self, topic, msg):
        self.producer.send_messages(topic, msg)

    def post_portfolio_summary(self, summary):
        msg = (time.time(), ExternalProcessComm.EPC_TOPICS['EPC_PORT_SUMMARY_TOPIC'], summary)
        self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_SUMMARY_TOPIC'], json.dumps(msg))

    def post_portfolio_items(self, ldict):
        msg = (time.time(), ExternalProcessComm.EPC_TOPICS['EPC_PORT_ITEM_TOPIC'], ldict)
        self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_ITEM_TOPIC'], json.dumps(msg))

    def post_account_summary(self, summary):
        msg = (time.time(), ExternalProcessComm.EPC_TOPICS['EPC_ACCT_SUMMARY_TOPIC'], summary)
        self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_ACCT_SUMMARY_TOPIC'], json.dumps(msg))


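# Minimal EPCPub usage sketch (illustrative only; requires a reachable Kafka
# broker, 'epc.cfg' and the payload below are made-up examples, and the config
# only needs an [epc] section with kafka.host and kafka.port):
#
#   config = ConfigParser.SafeConfigParser()
#   config.read('epc.cfg')
#   pub = EPCPub(config)
#   pub.post_portfolio_summary({'net_liquidation': 0.0})

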
class ExternalProcessComm(threading.Thread):
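    """Kafka producer/consumer thread for external process communication.

    The constructor creates a producer, publishes an init message to every
    EPC topic so the broker auto-creates any that are missing, and then
    attaches a consumer to partition 0 of each topic.  run() logs every
    message received.
    """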
    producer = None
    consumer = None
    EPC_TOPICS = {'EPC_PORT_SUMMARY_TOPIC': 'port_summary',
                  'EPC_PORT_ITEM_TOPIC': 'port_item',
                  'EPC_ACCT_SUMMARY_TOPIC': 'acct_summary'}

    def __init__(self, config):
        super(ExternalProcessComm, self).__init__()
        host = config.get("epc", "kafka.host").strip('"').strip("'")
        port = config.get("epc", "kafka.port")
        client = KafkaClient('%s:%s' % (host, port))
        self.producer = SimpleProducer(client, async=False)
        #sleep(1)
        print 'create EPC'
        # KafkaConsumer raises a "no topic" error if a topic does not exist on the
        # broker, so publish one init message per topic; the broker auto-creates
        # any topic that is still missing.
        for topic in ExternalProcessComm.EPC_TOPICS.itervalues():
            self.post_msg(topic, 'init msg')
        self.consumer = KafkaConsumer(*[(topic, 0) for topic in ExternalProcessComm.EPC_TOPICS.itervalues()],
                                      metadata_broker_list=['%s:%s' % (host, port)],
                                      group_id='epc.group',
                                      auto_commit_enable=True,
                                      auto_commit_interval_ms=30 * 1000,
                                      auto_offset_reset='largest')  # discard old messages
        # https://kafka.apache.org/08/configuration.html
        # auto.offset.reset: what to do when there is no initial offset in
        # Zookeeper or the offset is out of range:
        #   * smallest: automatically reset the offset to the smallest offset
        #   * largest:  automatically reset the offset to the largest offset
        #   * anything else: throw an exception to the consumer
        # With 'largest' the consumer may lose messages when the number of
        # partitions for a subscribed topic changes on the broker; to prevent
        # data loss during partition addition, set auto.offset.reset to 'smallest'.
    def post_msg(self, topic, msg):
        self.producer.send_messages(topic, msg)

    def post_portfolio_summary(self, summary):
        msg = (time.time(), summary)
        self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_SUMMARY_TOPIC'], json.dumps(msg))

    def post_portfolio_items(self, ldict):
        msg = (time.time(), ldict)
        self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_ITEM_TOPIC'], json.dumps(msg))

    def post_account_summary(self, summary):
        msg = (time.time(), summary)
        self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_ACCT_SUMMARY_TOPIC'], json.dumps(msg))
    def run(self):
        for message in self.consumer:
            logging.info("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                        message.offset, message.key,
                                                        message.value))
            print("received %s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                          message.offset, message.key,
                                                          message.value))


if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: %s <config file>" % sys.argv[0])
        exit(-1)
    cfg_path = sys.argv[1]
    config = ConfigParser.SafeConfigParser()
    if len(config.read(cfg_path)) == 0:
        raise ValueError("Failed to open config file")
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s')
    e = ExternalProcessComm(config)
    e.start()
    e.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_SUMMARY_TOPIC'], 'test msg')
    # second instance joins the same 'epc.group' consumer group
    e2 = ExternalProcessComm(config)
    e2.start()
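
# Example config file (assumed layout; only the [epc] section with kafka.host
# and kafka.port is read by this module, and the values below are placeholders):
#
#   [epc]
#   kafka.host = "localhost"
#   kafka.port = 9092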