|
@@ -55,30 +55,40 @@ class BaseProducer(threading.Thread, Subscriber):
|
|
|
|
|
|
|
|
|
|
|
|
|
self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name))
|
|
self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name))
|
|
|
- logging.info('BaseConsumer __init__: name=%s' % self.name)
|
|
|
|
|
|
|
+ logging.info('BaseProducer __init__: name=%s' % self.name)
|
|
|
self.args = args
|
|
self.args = args
|
|
|
self.kwargs = kwargs
|
|
self.kwargs = kwargs
|
|
|
|
|
|
|
|
self.event_q = Queue()
|
|
self.event_q = Queue()
|
|
|
return
|
|
return
|
|
|
|
|
|
|
|
- def send_message(self, topic, message):
|
|
|
|
|
- self.event_q.put((topic, message))
|
|
|
|
|
|
|
+
|
|
|
|
|
+ def send_message(self, topic, plain_text):
|
|
|
|
|
+ self.event_q.put((topic, plain_text))
|
|
|
self.event_q.task_done()
|
|
self.event_q.task_done()
|
|
|
|
|
|
|
|
|
|
+ def set_stop(self):
|
|
|
|
|
+ self.done = True
|
|
|
|
|
+
|
|
|
def run(self):
|
|
def run(self):
|
|
|
try:
|
|
try:
|
|
|
producer = KafkaProducer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']))
|
|
producer = KafkaProducer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']))
|
|
|
|
|
|
|
|
-
|
|
|
|
|
- while True:
|
|
|
|
|
|
|
+ self.done = False
|
|
|
|
|
+ while self.done <> True:
|
|
|
#today = datetime.date.today()
|
|
#today = datetime.date.today()
|
|
|
|
|
|
|
|
if not self.event_q.empty():
|
|
if not self.event_q.empty():
|
|
|
- topic, message = self.event_q.get()
|
|
|
|
|
|
|
+ topic, plain_text = self.event_q.get()
|
|
|
#s = "BaseProducer topic:[%s] msg:[%s]" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
|
|
#s = "BaseProducer topic:[%s] msg:[%s]" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
|
|
|
- logging.debug("BaseProducer topic:[%s] msg:[%s]" % (topic, message))
|
|
|
|
|
- producer.send(topic, message)
|
|
|
|
|
|
|
+ logging.info("BaseProducer topic:[%s] msg:[%s]" % (topic, plain_text))
|
|
|
|
|
+ producer.send(topic, plain_text)
|
|
|
|
|
+
|
|
|
|
|
+ # to prevent excessive CPU use
|
|
|
|
|
+ time.sleep(0.1)
|
|
|
|
|
+
|
|
|
|
|
+ logging.info('completed run')
|
|
|
|
|
+
|
|
|
|
|
|
|
|
except NoBrokersAvailable:
|
|
except NoBrokersAvailable:
|
|
|
logging.error("NoBrokersAvailable: Has kafka started?")
|
|
logging.error("NoBrokersAvailable: Has kafka started?")
|
|
@@ -86,7 +96,7 @@ class BaseProducer(threading.Thread, Subscriber):
|
|
|
|
|
|
|
|
class BaseConsumer(threading.Thread, Publisher):
|
|
class BaseConsumer(threading.Thread, Publisher):
|
|
|
|
|
|
|
|
- KB_EVENT = "on_kb_event"
|
|
|
|
|
|
|
+ #KB_EVENT = "on_kb_event"
|
|
|
KB_REACHED_LAST_OFFSET = "on_kb_reached_last_offset"
|
|
KB_REACHED_LAST_OFFSET = "on_kb_reached_last_offset"
|
|
|
|
|
|
|
|
#my_topics = {'my-topic':{}, 'my-topic2':{}}
|
|
#my_topics = {'my-topic':{}, 'my-topic2':{}}
|
|
@@ -106,6 +116,7 @@ class BaseConsumer(threading.Thread, Publisher):
|
|
|
consumer_id: name
|
|
consumer_id: name
|
|
|
topics: a list of topic strings
|
|
topics: a list of topic strings
|
|
|
session_timeout_ms:
|
|
session_timeout_ms:
|
|
|
|
|
+ consumer_timeout_ms
|
|
|
"""
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
@@ -118,7 +129,8 @@ class BaseConsumer(threading.Thread, Publisher):
|
|
|
for t in self.kwargs['topics']:
|
|
for t in self.kwargs['topics']:
|
|
|
self.my_topics[t]= {}
|
|
self.my_topics[t]= {}
|
|
|
|
|
|
|
|
- self.events = {event: dict() for event in [BaseConsumer.KB_EVENT, BaseConsumer.KB_REACHED_LAST_OFFSET]}
|
|
|
|
|
|
|
+ #self.events = {event: dict() for event in [BaseConsumer.KB_EVENT, BaseConsumer.KB_REACHED_LAST_OFFSET]}
|
|
|
|
|
+ self.events = {event: dict() for event in [BaseConsumer.KB_REACHED_LAST_OFFSET] + self.kwargs['topics']}
|
|
|
return
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
@@ -150,7 +162,13 @@ class BaseConsumer(threading.Thread, Publisher):
|
|
|
#self.rs.set(self.consumer_topic(topic), json.dumps({'partition': partition, 'offset':offset}))
|
|
#self.rs.set(self.consumer_topic(topic), json.dumps({'partition': partition, 'offset':offset}))
|
|
|
self.my_topics[topic][str(partition)] = offset
|
|
self.my_topics[topic][str(partition)] = offset
|
|
|
self.rs.set(self.consumer_topic(topic), json.dumps(self.my_topics[topic]))
|
|
self.rs.set(self.consumer_topic(topic), json.dumps(self.my_topics[topic]))
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
|
|
+ def enrich_message(self, message):
|
|
|
|
|
+ return {'value': message.value, 'partition':message.partition, 'offset': message.offset}
|
|
|
|
|
+
|
|
|
|
|
+ def set_stop(self):
|
|
|
|
|
+ self.done = True
|
|
|
|
|
+
|
|
|
def run(self):
|
|
def run(self):
|
|
|
print '%s:%s started' % (self.kwargs['group_id'], self.name)
|
|
print '%s:%s started' % (self.kwargs['group_id'], self.name)
|
|
|
consumer = KafkaConsumer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']),
|
|
consumer = KafkaConsumer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']),
|
|
@@ -176,8 +194,11 @@ class BaseConsumer(threading.Thread, Publisher):
|
|
|
#
|
|
#
|
|
|
#
|
|
#
|
|
|
#
|
|
#
|
|
|
- partition_assignment_strategy=[RoundRobinPartitionAssignor])
|
|
|
|
|
-
|
|
|
|
|
|
|
+ partition_assignment_strategy=[RoundRobinPartitionAssignor],
|
|
|
|
|
+ #
|
|
|
|
|
+ #
|
|
|
|
|
+ consumer_timeout_ms=1000
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -198,39 +219,54 @@ class BaseConsumer(threading.Thread, Publisher):
|
|
|
|
|
|
|
|
#consumer.seek_to_end(TopicPartition(topic='my-topic', partition=0))
|
|
#consumer.seek_to_end(TopicPartition(topic='my-topic', partition=0))
|
|
|
|
|
|
|
|
- done = False
|
|
|
|
|
- while not done:
|
|
|
|
|
- message = consumer.next()
|
|
|
|
|
- #time.sleep(0.25)
|
|
|
|
|
- if message.offset % 50 == 0:
|
|
|
|
|
- logging.info( "[%s]:highwater:%d offset:%d part:%d <%s>" % (self.name, consumer.highwater(TopicPartition(message.topic, message.partition)),
|
|
|
|
|
- message.offset, message.partition, message.value))
|
|
|
|
|
-
|
|
|
|
|
-# for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
|
|
|
|
|
-# print "t:%s %s" % (t, ','.join('p:%d, offset:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)) # consumer.position(TopicPartition(topic=t, partition=p)))
|
|
|
|
|
-
|
|
|
|
|
- # if this is the first time the consumer is run
|
|
|
|
|
- # it contains no offsets in the redis map, so it has
|
|
|
|
|
- # 0 elements in the map,
|
|
|
|
|
- # then insert a new offset in redis and populate
|
|
|
|
|
- # the local my_topics dict
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- if len(self.my_topics[message.topic]) == 0:
|
|
|
|
|
- self.persist_offsets(message.topic, message.partition, message.offset)
|
|
|
|
|
- self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))
|
|
|
|
|
- continue
|
|
|
|
|
|
|
+ self.done = False
|
|
|
|
|
+ while self.done <> True:
|
|
|
|
|
+ try:
|
|
|
|
|
+ message = consumer.next()
|
|
|
|
|
|
|
|
-
|
|
|
|
|
- if self.my_topics[message.topic][str(message.partition)] > message.offset:
|
|
|
|
|
- print '********************** old message...discarding %s %d' % (message.topic, message.offset)
|
|
|
|
|
- else:
|
|
|
|
|
- if self.my_topics[message.topic][str(message.partition)] == message.offset:
|
|
|
|
|
- self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, {'message': message})
|
|
|
|
|
- logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
|
|
|
|
|
- else:
|
|
|
|
|
|
|
+ #time.sleep(0.25)
|
|
|
|
|
+ if message.offset % 50 == 0:
|
|
|
|
|
+ logging.info( "[%s]:highwater:%d offset:%d part:%d <%s>" % (self.name, consumer.highwater(TopicPartition(message.topic, message.partition)),
|
|
|
|
|
+ message.offset, message.partition, message.value))
|
|
|
|
|
+
|
|
|
|
|
+ # for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
|
|
|
|
|
+ # print "t:%s %s" % (t, ','.join('p:%d, offset:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)) # consumer.position(TopicPartition(topic=t, partition=p)))
|
|
|
|
|
+
|
|
|
|
|
+ # if this is the first time the consumer is run
|
|
|
|
|
+ # it contains no offsets in the redis map, so it has
|
|
|
|
|
+ # 0 elements in the map,
|
|
|
|
|
+ # then insert a new offset in redis and populate
|
|
|
|
|
+ # the local my_topics dict
|
|
|
|
|
+
|
|
|
|
|
+ if len(self.my_topics[message.topic]) == 0:
|
|
|
self.persist_offsets(message.topic, message.partition, message.offset)
|
|
self.persist_offsets(message.topic, message.partition, message.offset)
|
|
|
- self.dispatch(BaseConsumer.KB_EVENT, {'message': message})
|
|
|
|
|
|
|
+ self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))
|
|
|
|
|
+ #continue
|
|
|
|
|
+
|
|
|
|
|
+ """
|
|
|
|
|
+ the message.value received from kafaproducer is expected to contain
|
|
|
|
|
+ plain text encoded as a json string
|
|
|
|
|
+ the content of message.value is not altered. it's content is stored in a dict object
|
|
|
|
|
+ with key = 'value' along with additional kafa metadata
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ it is the subscriber's job to interpret the content stored in the 'value' key. Typically
|
|
|
|
|
+ it means decoding the content by invoking json.loads
|
|
|
|
|
+
|
|
|
|
|
+ """
|
|
|
|
|
+ if self.my_topics[message.topic][str(message.partition)] > message.offset:
|
|
|
|
|
+ print '********************** old message...discarding %s %d' % (message.topic, message.offset)
|
|
|
|
|
+ else:
|
|
|
|
|
+ if self.my_topics[message.topic][str(message.partition)] == message.offset:
|
|
|
|
|
+ self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.enrich_message(message))
|
|
|
|
|
+ logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
|
|
|
|
|
+ else:
|
|
|
|
|
+ self.persist_offsets(message.topic, message.partition, message.offset)
|
|
|
|
|
+ #self.dispatch(BaseConsumer.KB_EVENT, {'message': message})
|
|
|
|
|
+ self.dispatch(message.topic, self.enrich_message(message))
|
|
|
|
|
+ except StopIteration:
|
|
|
|
|
+ logging.debug('BaseConsumer:run StopIteration Caught. No new message arriving...')
|
|
|
|
|
+ continue
|
|
|
|
|
|
|
|
|
|
|
|
|
logging.info ('**********************************************done')
|
|
logging.info ('**********************************************done')
|
|
@@ -256,8 +292,8 @@ class SimpleMessageListener(BaseMessageListener):
|
|
|
def __init__(self, name):
|
|
def __init__(self, name):
|
|
|
BaseMessageListener.__init__(self, name)
|
|
BaseMessageListener.__init__(self, name)
|
|
|
|
|
|
|
|
- def on_kb_event(self, param):
|
|
|
|
|
- print "on_kb_event [%s] %s" % (self.name, param)
|
|
|
|
|
|
|
+# def on_kb_event(self, param):
|
|
|
|
|
+# print "on_kb_event [%s] %s" % (self.name, param)
|
|
|
|
|
|
|
|
def on_kb_reached_last_offset(self, param):
|
|
def on_kb_reached_last_offset(self, param):
|
|
|
print "on_kb_reached_last_offset [%s] %s" % (self.name, param)
|
|
print "on_kb_reached_last_offset [%s] %s" % (self.name, param)
|
|
@@ -265,8 +301,115 @@ class SimpleMessageListener(BaseMessageListener):
|
|
|
|
|
|
|
|
class Prosumer(BaseProducer):
|
|
class Prosumer(BaseProducer):
|
|
|
# wrapper object
|
|
# wrapper object
|
|
|
- pass
|
|
|
|
|
|
|
+ def __init__(self, name, kwargs=None):
|
|
|
|
|
+ BaseProducer.__init__(self, group=None, target=None, name=name,
|
|
|
|
|
+ args=(), kwargs=kwargs, verbose=None)
|
|
|
|
|
+ self.kconsumer = BaseConsumer(name=name, kwargs=kwargs)
|
|
|
|
|
+ self.kwargs = kwargs
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def add_listeners(self, listeners):
|
|
|
|
|
+
|
|
|
|
|
+ for l in listeners:
|
|
|
|
|
+ map(lambda e: self.kconsumer.register(e, l, getattr(l, e)), self.kwargs['topics'])
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def set_stop(self):
|
|
|
|
|
+ BaseProducer.set_stop(self)
|
|
|
|
|
+ self.kconsumer.set_stop()
|
|
|
|
|
+
|
|
|
|
|
+ def start_prosumer(self):
|
|
|
|
|
+ self.kconsumer.start()
|
|
|
|
|
+ self.start()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def message_loads(self, text_msg):
|
|
|
|
|
+ return json.loads(text_msg)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def message_dumps(self, obj_msg):
|
|
|
|
|
+ return json.dumps(obj_msg)
|
|
|
|
|
|
|
|
|
|
+
|
|
|
|
|
+class SubscriptionListener(BaseMessageListener):
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def __init__(self, name, producer):
|
|
|
|
|
+ BaseMessageListener.__init__(self, name)
|
|
|
|
|
+ self.producer = producer
|
|
|
|
|
+ self.i = 0
|
|
|
|
|
+
|
|
|
|
|
+ def gw_subscription_changed(self, event, items):
|
|
|
|
|
+ logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, items))
|
|
|
|
|
+ #print 'SubscriptionListener:gw_subscription_changed %s' % items
|
|
|
|
|
+
|
|
|
|
|
+# def on_kb_event(self, param):
|
|
|
|
|
+# print "on_kb_event [%s] %s" % (self.name, param)
|
|
|
|
|
+ def gw_req_subscriptions(self, event, items):
|
|
|
|
|
+
|
|
|
|
|
+ logging.info("[%s] received gw_req_subscriptions content:[%s]" % (self.name, items))
|
|
|
|
|
+ vars= self.producer.message_loads(items['value'])
|
|
|
|
|
+ self.producer.send_message('gw_subscription_changed', self.producer.message_dumps({'id': self.i, 'reqid': vars['reqid'],
|
|
|
|
|
+ 'response' : "%s" % (time.strftime("%b %d %Y %H:%M:%S"))})
|
|
|
|
|
+ )
|
|
|
|
|
+ self.i = self.i + 1
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def on_kb_reached_last_offset(self, event, items):
|
|
|
|
|
+ logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, items))
|
|
|
|
|
+ print "on_kb_reached_last_offset [%s] %s" % (self.name, items)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def test_prosumer2(mode):
|
|
|
|
|
+
|
|
|
|
|
+ if mode == 'A':
|
|
|
|
|
+
|
|
|
|
|
+ topicsA = ['gw_subscription_changed']
|
|
|
|
|
+
|
|
|
|
|
+ pA = Prosumer(name='A', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
|
|
|
|
|
+ 'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
|
|
|
|
|
+ 'group_id': 'groupA', 'session_timeout_ms':10000,
|
|
|
|
|
+ 'topics': topicsA})
|
|
|
|
|
+ sA = SubscriptionListener('earA', pA)
|
|
|
|
|
+
|
|
|
|
|
+ pA.add_listeners([sA])
|
|
|
|
|
+ pA.start_prosumer()
|
|
|
|
|
+ i = 0
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ while True: #i < 5:
|
|
|
|
|
+
|
|
|
|
|
+ pA.send_message('gw_req_subscriptions', json.dumps({'desc': 'requesting subscription msg counter:%d' % i,
|
|
|
|
|
+ 'reqid': i}))
|
|
|
|
|
+ i= i + 1
|
|
|
|
|
+ time.sleep(.45)
|
|
|
|
|
+
|
|
|
|
|
+ except (KeyboardInterrupt, SystemExit):
|
|
|
|
|
+ logging.error('caught user interrupt')
|
|
|
|
|
+ pA.set_stop()
|
|
|
|
|
+ pA.join()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ else:
|
|
|
|
|
+ topicsB = ['gw_req_subscriptions']
|
|
|
|
|
+
|
|
|
|
|
+ pB = Prosumer(name='B', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
|
|
|
|
|
+ 'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
|
|
|
|
|
+ 'group_id': 'groupB', 'session_timeout_ms':10000,
|
|
|
|
|
+ 'topics': topicsB})
|
|
|
|
|
+ sB = SubscriptionListener('earB', pB)
|
|
|
|
|
+ pB.add_listeners([sB])
|
|
|
|
|
+ pB.start_prosumer()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
|
|
|
class TestProducer(BaseProducer):
|
|
class TestProducer(BaseProducer):
|
|
|
pass
|
|
pass
|
|
@@ -282,6 +425,7 @@ def test_base_proconsumer(mode):
|
|
|
tp.start()
|
|
tp.start()
|
|
|
i = 0
|
|
i = 0
|
|
|
while True:
|
|
while True:
|
|
|
|
|
+
|
|
|
#today = datetime.date.today()
|
|
#today = datetime.date.today()
|
|
|
s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
|
|
s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
|
|
|
logging.info(s)
|
|
logging.info(s)
|
|
@@ -313,7 +457,7 @@ def main():
|
|
|
#
|
|
#
|
|
|
# test cases
|
|
# test cases
|
|
|
#
|
|
#
|
|
|
- tp = [ test_base_proconsumer]
|
|
|
|
|
|
|
+ tp = [ test_base_proconsumer, test_prosumer2]
|
|
|
|
|
|
|
|
if len(sys.argv) != 3:
|
|
if len(sys.argv) != 3:
|
|
|
print("Usage: %s <role(producer or consumer): P|C> <test case #[0..1]>" % sys.argv[0])
|
|
print("Usage: %s <role(producer or consumer): P|C> <test case #[0..1]>" % sys.argv[0])
|
|
@@ -329,13 +473,13 @@ def main():
|
|
|
tp[int(sys.argv[2])](mode)
|
|
tp[int(sys.argv[2])](mode)
|
|
|
|
|
|
|
|
#time.sleep(30)
|
|
#time.sleep(30)
|
|
|
- while 1:
|
|
|
|
|
- try:
|
|
|
|
|
- time.sleep(5)
|
|
|
|
|
- pass
|
|
|
|
|
- except (KeyboardInterrupt, SystemExit):
|
|
|
|
|
- logging.error('caught user interrupt')
|
|
|
|
|
- sys.exit(-1)
|
|
|
|
|
|
|
+# while 1:
|
|
|
|
|
+# try:
|
|
|
|
|
+# time.sleep(5)
|
|
|
|
|
+# pass
|
|
|
|
|
+# except (KeyboardInterrupt, SystemExit):
|
|
|
|
|
+# logging.error('caught user interrupt')
|
|
|
|
|
+# sys.exit(-1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|