|
@@ -10,17 +10,54 @@ import time, datetime
|
|
|
import sleekxmpp
|
|
import sleekxmpp
|
|
|
from threading import Lock
|
|
from threading import Lock
|
|
|
from kafka.client import KafkaClient
|
|
from kafka.client import KafkaClient
|
|
|
-from kafka.consumer import SimpleConsumer
|
|
|
|
|
|
|
+from kafka import KafkaConsumer
|
|
|
from kafka.producer import SimpleProducer
|
|
from kafka.producer import SimpleProducer
|
|
|
|
|
+from kafka.common import LeaderNotAvailableError
|
|
|
import threading
|
|
import threading
|
|
|
|
|
|
|
|
|
|
+class EPCPub():
|
|
|
|
|
+
|
|
|
|
|
+ producer = None
|
|
|
|
|
+
|
|
|
|
|
+ EPC_TOPICS= {'EPC_PORT_SUMMARY_TOPIC': 'port_summary',
|
|
|
|
|
+ 'EPC_PORT_ITEM_TOPIC': 'port_item'}
|
|
|
|
|
+
|
|
|
|
|
+ def __init__(self, config):
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ host = config.get("epc", "kafka.host").strip('"').strip("'")
|
|
|
|
|
+ port = config.get("epc", "kafka.port")
|
|
|
|
|
+
|
|
|
|
|
+ client = KafkaClient('%s:%s' % (host, port))
|
|
|
|
|
+ self.producer = SimpleProducer(client, async=False)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def post_msg(self, topic, msg):
|
|
|
|
|
+ self.producer.send_messages(topic, msg)
|
|
|
|
|
+
|
|
|
|
|
|
|
|
|
|
+ def post_portfolio_summary(self, dict):
|
|
|
|
|
|
|
|
|
|
+ msg= (time.time(), ExternalProcessComm.EPC_TOPICS['EPC_PORT_SUMMARY_TOPIC'], dict)
|
|
|
|
|
+
|
|
|
|
|
+ self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_SUMMARY_TOPIC'], json.dumps(msg))
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def post_portfolio_items(self, ldict):
|
|
|
|
|
+ msg= (time.time(), ExternalProcessComm.EPC_TOPICS['EPC_PORT_ITEM_TOPIC'], ldict)
|
|
|
|
|
+ self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_ITEM_TOPIC'], json.dumps(msg))
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
|
|
|
class ExternalProcessComm(threading.Thread):
|
|
class ExternalProcessComm(threading.Thread):
|
|
|
|
|
|
|
|
producer = None
|
|
producer = None
|
|
|
consumer = None
|
|
consumer = None
|
|
|
|
|
+ EPC_TOPICS= {'EPC_PORT_SUMMARY_TOPIC': 'port_summary',
|
|
|
|
|
+ 'EPC_PORT_ITEM_TOPIC': 'port_item'}
|
|
|
|
|
+
|
|
|
def __init__(self, config):
|
|
def __init__(self, config):
|
|
|
|
|
|
|
|
super(ExternalProcessComm, self).__init__()
|
|
super(ExternalProcessComm, self).__init__()
|
|
@@ -29,17 +66,56 @@ class ExternalProcessComm(threading.Thread):
|
|
|
|
|
|
|
|
client = KafkaClient('%s:%s' % (host, port))
|
|
client = KafkaClient('%s:%s' % (host, port))
|
|
|
self.producer = SimpleProducer(client, async=False)
|
|
self.producer = SimpleProducer(client, async=False)
|
|
|
- self.consumer = SimpleConsumer(client, "epc.group", "epc.topic")
|
|
|
|
|
|
|
+ #sleep(1)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ print 'create EPC'
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ # the kafkaConsumer will fail with a no topic error if the topic is not found in the broker
|
|
|
|
|
+ # the next line uses the producer to produce the required topic which will create one
|
|
|
|
|
+ # if it has not been created already
|
|
|
|
|
+
|
|
|
|
|
+ [self.post_msg(v, 'init msg') for k,v in ExternalProcessComm.EPC_TOPICS.iteritems()]
|
|
|
|
|
+ self.consumer = KafkaConsumer( *[(v,0) for k,v in ExternalProcessComm.EPC_TOPICS.iteritems()], \
|
|
|
|
|
+ metadata_broker_list=['%s:%s' % (host, port)],\
|
|
|
|
|
+ group_id = 'epc.group',\
|
|
|
|
|
+ auto_commit_enable=True,\
|
|
|
|
|
+ auto_commit_interval_ms=30 * 1000,\
|
|
|
|
|
+ auto_offset_reset='largest') # discard old ones
|
|
|
|
|
+# https://kafka.apache.org/08/configuration.html
|
|
|
|
|
+# What to do when there is no initial offset in Zookeeper or if an offset is out of range:
|
|
|
|
|
+# * smallest : automatically reset the offset to the smallest offset
|
|
|
|
|
+# * largest : automatically reset the offset to the largest offset
|
|
|
|
|
+# * anything else: throw exception to the consumer. If this is set to largest,
|
|
|
|
|
+# the consumer may lose some messages when the number of partitions, for the topics
|
|
|
|
|
+# it subscribes to, changes on the broker. To prevent data loss during partition addition, set auto.offset.reset to smallest
|
|
|
|
|
|
|
|
def post_msg(self, topic, msg):
|
|
def post_msg(self, topic, msg):
|
|
|
self.producer.send_messages(topic, msg)
|
|
self.producer.send_messages(topic, msg)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+ def post_portfolio_summary(self, dict):
|
|
|
|
|
+ msg= (time.time(), dict)
|
|
|
|
|
+
|
|
|
|
|
+ self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_SUMMARY_TOPIC'], json.dumps(msg))
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def post_portfolio_items(self, ldict):
|
|
|
|
|
+ msg= (time.time(), ldict)
|
|
|
|
|
+ self.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_ITEM_TOPIC'], json.dumps(msg))
|
|
|
|
|
+
|
|
|
def run(self):
|
|
def run(self):
|
|
|
|
|
+
|
|
|
for message in self.consumer:
|
|
for message in self.consumer:
|
|
|
|
|
|
|
|
- logging.info(message)
|
|
|
|
|
|
|
+ logging.info("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
|
|
|
|
|
+ message.offset, message.key,
|
|
|
|
|
+ message.value))
|
|
|
|
|
|
|
|
|
|
+ print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
|
|
|
|
|
+ message.offset, message.key,
|
|
|
|
|
+ message.value))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -58,4 +134,5 @@ if __name__ == '__main__':
|
|
|
|
|
|
|
|
e = ExternalProcessComm(config)
|
|
e = ExternalProcessComm(config)
|
|
|
e.start()
|
|
e.start()
|
|
|
- e.post_msg('epc.topic', 'test msg')
|
|
|
|
|
|
|
+
|
|
|
|
|
+ e.post_msg(ExternalProcessComm.EPC_TOPICS['EPC_PORT_SUMMARY_TOPIC'], 'test msg')
|