|
|
@@ -21,6 +21,7 @@ from types import NoneType
|
|
|
|
|
|
|
|
|
|
|
|
+
|
|
|
class Producer(threading.Thread):
|
|
|
daemon = True
|
|
|
|
|
|
@@ -196,6 +197,7 @@ class BaseConsumer(threading.Thread, Publisher):
|
|
|
return {'value': message.value, 'partition':message.partition, 'offset': message.offset}
|
|
|
|
|
|
def extract_message_content(self, message):
|
|
|
+ logging.info('BaseConsumer: extract_message_content. %s %s' % (type(message), message))
|
|
|
return json.loads(message.value)
|
|
|
|
|
|
def set_stop(self):
|
|
|
@@ -310,7 +312,7 @@ class BaseConsumer(threading.Thread, Publisher):
|
|
|
if gap == 1:
|
|
|
# the message is valid for dispatching and not to be skipped
|
|
|
#self.dispatch(message.topic, self.enrich_message(message))
|
|
|
- self.dispatch(message.topic, **self.extract_message_content(message))
|
|
|
+ self.dispatch(message.topic, self.extract_message_content(message))
|
|
|
logging.debug('*** On first iteration: Gap=%d Dispatch this valid message to the listener <%s>' % (gap, message.value))
|
|
|
else: # gap exists
|
|
|
logging.info("*** On first iteration: [Topic:%s:Part:%d:Offset:%d]: Gap:%d Attempting to seek to latest message ..."
|
|
|
@@ -340,7 +342,7 @@ class BaseConsumer(threading.Thread, Publisher):
|
|
|
# the "and" condition ensures that on a fresh start of kafka server this event is not triggered as
|
|
|
# both saved value in redis and current offset are both 0
|
|
|
if self.my_topics[message.topic][str(message.partition)] == message.offset and message.offset <> 0:
|
|
|
- self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.enrich_message(message))
|
|
|
+ self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.extract_message_content(message))
|
|
|
#self.dispatch(message.topic, self.enrich_message(message))
|
|
|
self.dispatch(message.topic, self.extract_message_content(message))
|
|
|
logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
|