Sfoglia il codice sorgente

Merge branch 'ironfly' of https://github.com/laxaurus/finopt.git into ironfly

esurfer 9 anni fa
parent
commit
05030a2bfa
1 ha cambiato i file con 54 aggiunte e 11 eliminazioni
  1. 54 11
      src/comms/test/base_messaging.py

+ 54 - 11
src/comms/test/base_messaging.py

@@ -1,11 +1,11 @@
-#!/usr/bin/env python
+ #!/usr/bin/env python
 import threading, logging, time
 import sys
 import datetime
 import uuid
 from Queue import Queue
 from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
-
+from misc2.observer import NotImplementedException
 from kafka import KafkaConsumer, KafkaProducer
 from kafka.structs import TopicPartition
 from kafka.errors import NoBrokersAvailable
@@ -158,6 +158,29 @@ class BaseConsumer(threading.Thread, Publisher):
     def consumer_topic(self, tp):
         return tp + '@' + self.name 
     
+    
+    def clear_offsets(self):
+        """
+            clear the offsets in redis by removing the value from redis
+            and emptying the internal my_topics dict
+            
+            clear offsets is ncessary when the offset in redis was saved 
+            at a time since kafka manager was shut down
+            when kafka restarts, previously buffered 
+            messages are no longer available and instead it will restart its offset at 0.
+            Reading an old offset by BaseConsumer will cause it to think that it
+            is still receving old buffered messages from Kafa but in fact all the messages 
+            since the last shut down of kafka are all gone
+        """
+        for t in self.kwargs['topics']:
+            self.my_topics[t]= {}
+            logging.info("BaseConsumer:clear_offsets Deleting %s from redis..." % self.consumer_topic(t))
+            self.rs.delete(self.consumer_topic(t))
+            
+             
+        #raise NotImplementedException
+    
+    
     def persist_offsets(self, topic, partition, offset):
         #self.rs.set(self.consumer_topic(topic), json.dumps({'partition': partition, 'offset':offset}))
         self.my_topics[topic][str(partition)] = offset
@@ -171,6 +194,10 @@ class BaseConsumer(threading.Thread, Publisher):
     
     def run(self):
         print '%s:%s started' % (self.kwargs['group_id'], self.name)
+        
+        if self.kwargs['clear_offsets'] == 1:
+            self.clear_offsets()
+        
         consumer = KafkaConsumer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']),
                                  auto_offset_reset='earliest',
                                  #
@@ -257,7 +284,12 @@ class BaseConsumer(threading.Thread, Publisher):
                 if self.my_topics[message.topic][str(message.partition)] > message.offset:
                     print '********************** old message...discarding %s %d' % (message.topic, message.offset)
                 else:
-                    if self.my_topics[message.topic][str(message.partition)] == message.offset:
+                    #if self.my_topics[message.topic][str(message.partition)] == message.offset:
+                    # if the stored offset in redis equals to the current offset
+                    # notify the observers
+                    # the "and" condition ensures that on a fresh start of kafka server this event is not triggered as
+                    # both saved value in redis and current offset are both 0
+                    if self.my_topics[message.topic][str(message.partition)] == message.offset and message.ofset <> 0:
                         self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.enrich_message(message))
                         logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
                     else:
@@ -308,6 +340,7 @@ class Prosumer(BaseProducer):
         self.kwargs = kwargs
         
     
+
     
     def add_listeners(self, listeners):
         
@@ -315,6 +348,8 @@ class Prosumer(BaseProducer):
             map(lambda e: self.kconsumer.register(e, l, getattr(l, e)), self.kwargs['topics'])
         
     
+
+    
     def set_stop(self):
         BaseProducer.set_stop(self)
         self.kconsumer.set_stop()
@@ -355,7 +390,14 @@ class SubscriptionListener(BaseMessageListener):
                                    )
         self.i = self.i + 1
         
-            
+    def reqMktData(self, event, items):
+        logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
+        self.producer.send_message('tickPrice', 
+                        self.producer.message_dumps({'field':4, 'typeName':'tickPrice', 'price':1.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}))
+        
+    
+    def tickPrice(self, event, items):   
+        logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
         
     def on_kb_reached_last_offset(self, event, items):
         logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, items))
@@ -366,12 +408,12 @@ def test_prosumer2(mode):
     
     if mode == 'A':
                 
-        topicsA = ['gw_subscription_changed']
+        topicsA = ['gw_subscription_changed', 'tickPrice']
         
         pA = Prosumer(name='A', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                         'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                         'group_id': 'groupA', 'session_timeout_ms':10000,
-                                                 'topics': topicsA})
+                                                 'topics': topicsA, 'clear_offsets' : 0})
         sA = SubscriptionListener('earA', pA)
         
         pA.add_listeners([sA])
@@ -379,10 +421,11 @@ def test_prosumer2(mode):
         i = 0
 
         try:
+            pA.send_message('reqMktData', pA.message_dumps({'contract':'dummy'}))
             while True: #i < 5:
                 
-                pA.send_message('gw_req_subscriptions', json.dumps({'desc': 'requesting subscription msg counter:%d' % i, 
-                                                                    'reqid': i}))
+                #pA.send_message('gw_req_subscriptions', pA.message_dumps({'desc': 'requesting subscription msg counter:%d' % i, 
+                #                                                    'reqid': i}))
                 i= i + 1
                 time.sleep(.45)
                 
@@ -396,12 +439,12 @@ def test_prosumer2(mode):
 
         
     else:    
-        topicsB = ['gw_req_subscriptions']
+        topicsB = ['gw_req_subscriptions', 'reqMktData']
         
         pB = Prosumer(name='B', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                         'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                         'group_id': 'groupB', 'session_timeout_ms':10000,
-                                                 'topics': topicsB})
+                                                 'topics': topicsB, 'clear_offsets' : 0})
         sB = SubscriptionListener('earB', pB)
         pB.add_listeners([sB])
         pB.start_prosumer()
@@ -480,7 +523,7 @@ def main():
 #         except (KeyboardInterrupt, SystemExit):
 #                 logging.error('caught user interrupt')
 #                 sys.exit(-1)
-
+