Parcourir la source

added logic to handle kafka restart by clearing the offsets in redis

bobhk il y a 9 ans
Parent
commit
636500130f
1 fichiers modifiés avec 54 ajouts et 11 suppressions
  1. 54 11
      src/comms/test/base_messaging.py

+ 54 - 11
src/comms/test/base_messaging.py

@@ -1,11 +1,11 @@
-#!/usr/bin/env python
+ #!/usr/bin/env python
 import threading, logging, time
 import threading, logging, time
 import sys
 import sys
 import datetime
 import datetime
 import uuid
 import uuid
 from Queue import Queue
 from Queue import Queue
 from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
 from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
-
+from misc2.observer import NotImplementedException
 from kafka import KafkaConsumer, KafkaProducer
 from kafka import KafkaConsumer, KafkaProducer
 from kafka.structs import TopicPartition
 from kafka.structs import TopicPartition
 from kafka.errors import NoBrokersAvailable
 from kafka.errors import NoBrokersAvailable
@@ -158,6 +158,29 @@ class BaseConsumer(threading.Thread, Publisher):
     def consumer_topic(self, tp):
     def consumer_topic(self, tp):
         return tp + '@' + self.name 
         return tp + '@' + self.name 
     
     
+    
+    def clear_offsets(self):
+        """
+            clear the offsets in redis by removing the value from redis
+            and emptying the internal my_topics dict
+            
+            clear offsets is ncessary when the offset in redis was saved 
+            at a time since kafka manager was shut down
+            when kafka restarts, previously buffered 
+            messages are no longer available and instead it will restart its offset at 0.
+            Reading an old offset by BaseConsumer will cause it to think that it
+            is still receving old buffered messages from Kafa but in fact all the messages 
+            since the last shut down of kafka are all gone
+        """
+        for t in self.kwargs['topics']:
+            self.my_topics[t]= {}
+            logging.info("BaseConsumer:clear_offsets Deleting %s from redis..." % self.consumer_topic(t))
+            self.rs.delete(self.consumer_topic(t))
+            
+             
+        #raise NotImplementedException
+    
+    
     def persist_offsets(self, topic, partition, offset):
     def persist_offsets(self, topic, partition, offset):
         #self.rs.set(self.consumer_topic(topic), json.dumps({'partition': partition, 'offset':offset}))
         #self.rs.set(self.consumer_topic(topic), json.dumps({'partition': partition, 'offset':offset}))
         self.my_topics[topic][str(partition)] = offset
         self.my_topics[topic][str(partition)] = offset
@@ -171,6 +194,10 @@ class BaseConsumer(threading.Thread, Publisher):
     
     
     def run(self):
     def run(self):
         print '%s:%s started' % (self.kwargs['group_id'], self.name)
         print '%s:%s started' % (self.kwargs['group_id'], self.name)
+        
+        if self.kwargs['clear_offsets'] == 1:
+            self.clear_offsets()
+        
         consumer = KafkaConsumer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']),
         consumer = KafkaConsumer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']),
                                  auto_offset_reset='earliest',
                                  auto_offset_reset='earliest',
                                  #
                                  #
@@ -257,7 +284,12 @@ class BaseConsumer(threading.Thread, Publisher):
                 if self.my_topics[message.topic][str(message.partition)] > message.offset:
                 if self.my_topics[message.topic][str(message.partition)] > message.offset:
                     print '********************** old message...discarding %s %d' % (message.topic, message.offset)
                     print '********************** old message...discarding %s %d' % (message.topic, message.offset)
                 else:
                 else:
-                    if self.my_topics[message.topic][str(message.partition)] == message.offset:
+                    #if self.my_topics[message.topic][str(message.partition)] == message.offset:
+                    # if the stored offset in redis equals to the current offset
+                    # notify the observers
+                    # the "and" condition ensures that on a fresh start of kafka server this event is not triggered as
+                    # both saved value in redis and current offset are both 0
+                    if self.my_topics[message.topic][str(message.partition)] == message.offset and message.ofset <> 0:
                         self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.enrich_message(message))
                         self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.enrich_message(message))
                         logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
                         logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
                     else:
                     else:
@@ -308,6 +340,7 @@ class Prosumer(BaseProducer):
         self.kwargs = kwargs
         self.kwargs = kwargs
         
         
     
     
+
     
     
     def add_listeners(self, listeners):
     def add_listeners(self, listeners):
         
         
@@ -315,6 +348,8 @@ class Prosumer(BaseProducer):
             map(lambda e: self.kconsumer.register(e, l, getattr(l, e)), self.kwargs['topics'])
             map(lambda e: self.kconsumer.register(e, l, getattr(l, e)), self.kwargs['topics'])
         
         
     
     
+
+    
     def set_stop(self):
     def set_stop(self):
         BaseProducer.set_stop(self)
         BaseProducer.set_stop(self)
         self.kconsumer.set_stop()
         self.kconsumer.set_stop()
@@ -355,7 +390,14 @@ class SubscriptionListener(BaseMessageListener):
                                    )
                                    )
         self.i = self.i + 1
         self.i = self.i + 1
         
         
-            
+    def reqMktData(self, event, items):
+        logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
+        self.producer.send_message('tickPrice', 
+                        self.producer.message_dumps({'field':4, 'typeName':'tickPrice', 'price':1.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}))
+        
+    
+    def tickPrice(self, event, items):   
+        logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
         
         
     def on_kb_reached_last_offset(self, event, items):
     def on_kb_reached_last_offset(self, event, items):
         logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, items))
         logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, items))
@@ -366,12 +408,12 @@ def test_prosumer2(mode):
     
     
     if mode == 'A':
     if mode == 'A':
                 
                 
-        topicsA = ['gw_subscription_changed']
+        topicsA = ['gw_subscription_changed', 'tickPrice']
         
         
         pA = Prosumer(name='A', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
         pA = Prosumer(name='A', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                         'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                         'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                         'group_id': 'groupA', 'session_timeout_ms':10000,
                                         'group_id': 'groupA', 'session_timeout_ms':10000,
-                                                 'topics': topicsA})
+                                                 'topics': topicsA, 'clear_offsets' : 0})
         sA = SubscriptionListener('earA', pA)
         sA = SubscriptionListener('earA', pA)
         
         
         pA.add_listeners([sA])
         pA.add_listeners([sA])
@@ -379,10 +421,11 @@ def test_prosumer2(mode):
         i = 0
         i = 0
 
 
         try:
         try:
+            pA.send_message('reqMktData', pA.message_dumps({'contract':'dummy'}))
             while True: #i < 5:
             while True: #i < 5:
                 
                 
-                pA.send_message('gw_req_subscriptions', json.dumps({'desc': 'requesting subscription msg counter:%d' % i, 
-                                                                    'reqid': i}))
+                #pA.send_message('gw_req_subscriptions', pA.message_dumps({'desc': 'requesting subscription msg counter:%d' % i, 
+                #                                                    'reqid': i}))
                 i= i + 1
                 i= i + 1
                 time.sleep(.45)
                 time.sleep(.45)
                 
                 
@@ -396,12 +439,12 @@ def test_prosumer2(mode):
 
 
         
         
     else:    
     else:    
-        topicsB = ['gw_req_subscriptions']
+        topicsB = ['gw_req_subscriptions', 'reqMktData']
         
         
         pB = Prosumer(name='B', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
         pB = Prosumer(name='B', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                         'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                         'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                         'group_id': 'groupB', 'session_timeout_ms':10000,
                                         'group_id': 'groupB', 'session_timeout_ms':10000,
-                                                 'topics': topicsB})
+                                                 'topics': topicsB, 'clear_offsets' : 0})
         sB = SubscriptionListener('earB', pB)
         sB = SubscriptionListener('earB', pB)
         pB.add_listeners([sB])
         pB.add_listeners([sB])
         pB.start_prosumer()
         pB.start_prosumer()
@@ -480,7 +523,7 @@ def main():
 #         except (KeyboardInterrupt, SystemExit):
 #         except (KeyboardInterrupt, SystemExit):
 #                 logging.error('caught user interrupt')
 #                 logging.error('caught user interrupt')
 #                 sys.exit(-1)
 #                 sys.exit(-1)
-
+