|
|
@@ -196,7 +196,7 @@ class BaseConsumer(threading.Thread, Publisher):
|
|
|
return {'value': message.value, 'partition':message.partition, 'offset': message.offset}
|
|
|
|
|
|
def extract_message_content(self, message):
|
|
|
- return message.value
|
|
|
+ return json.loads(message.value)
|
|
|
|
|
|
def set_stop(self):
|
|
|
self.done = True
|
|
|
@@ -342,13 +342,13 @@ class BaseConsumer(threading.Thread, Publisher):
|
|
|
if self.my_topics[message.topic][str(message.partition)] == message.offset and message.offset <> 0:
|
|
|
self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.enrich_message(message))
|
|
|
#self.dispatch(message.topic, self.enrich_message(message))
|
|
|
- self.dispatch(message.topic, **self.extract_message_content(message))
|
|
|
+ self.dispatch(message.topic, self.extract_message_content(message))
|
|
|
logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
|
|
|
else:
|
|
|
self.persist_offsets(message.topic, message.partition, message.offset)
|
|
|
#self.dispatch(BaseConsumer.KB_EVENT, {'message': message})
|
|
|
#self.dispatch(message.topic, self.enrich_message(message))
|
|
|
- self.dispatch(message.topic, **self.extract_message_content(message))
|
|
|
+ self.dispatch(message.topic, self.extract_message_content(message))
|
|
|
except StopIteration:
|
|
|
logging.debug('BaseConsumer:run StopIteration Caught. No new message arriving...')
|
|
|
continue
|