Просмотр исходного кода

add support for seek_to_latest in baseconsumer

split up classes in tws_client_lib into separate source files.
rework subscription manager internal data structure
minor bug fixes
esurfer 9 лет назад
Родитель
Commit
a1e17e4cf2

BIN
src/comms/__init__.pyc


BIN
src/comms/ibgw/__init__.pyc


+ 113 - 36
src/comms/ibgw/base_messaging.py

@@ -26,7 +26,7 @@ class Producer(threading.Thread):
     def run(self):
         try:
             producer = KafkaProducer(bootstrap_servers='localhost:9092')
-            topics = ['my-topic', 'my-topic2']
+            topics = ['my_topic', 'my_topic2']
             i = 0
             while True:
                 #today = datetime.date.today()
@@ -99,9 +99,11 @@ class BaseProducer(threading.Thread, Subscriber):
 class BaseConsumer(threading.Thread, Publisher):
     
     #KB_EVENT = "on_kb_event"
+    SLOW_CONSUMER_CHECK_EVERY = 50
+    SLOW_CONSUMER_QUALIFY_NUM = 500
     KB_REACHED_LAST_OFFSET = "on_kb_reached_last_offset"
     
-    #my_topics =  {'my-topic':{}, 'my-topic2':{}}    
+    #my_topics =  {'my_topic':{}, 'my_topic2':{}}    
 
     def __init__(self, group=None, target=None, name=None,
                  args=(), kwargs=None, verbose=None):
@@ -116,9 +118,10 @@ class BaseConsumer(threading.Thread, Publisher):
             redis_db
             group_id
             consumer_id: name 
-            topics: a list of topic strings
-            session_timeout_ms: 
+            topics- a list of topic strings
+            session_timeout_ms 
             consumer_timeout_ms
+            seek_to_end- a list of topics that only wants the latest message
         """
         
         
@@ -127,6 +130,10 @@ class BaseConsumer(threading.Thread, Publisher):
         self.args = args
         self.kwargs = kwargs
         self.rs = Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
+        try:
+            self.kwargs['seek_to_end']
+        except KeyError:
+            self.kwargs['seek_to_end'] = []
         self.my_topics = {}
         for t in self.kwargs['topics']:
             self.my_topics[t]= {} 
@@ -144,10 +151,10 @@ class BaseConsumer(threading.Thread, Publisher):
     
     """
      each consumer has its own set of topics offsets stored in redis
-     for consumer A and consumer B (with different group_ids) subscribing to the same my-topic, each 
+     for consumer A and consumer B (with different group_ids) subscribing to the same my_topic, each 
      each of them will need to keep track of its own offsets
      to make the redis key unique by consumer, the key is created by topic + '@' + consumer name
-     example: my-topic@consumerA
+     example: my_topic@consumerA
      
      offsets = { topic-consumer_name:
                      {
@@ -245,38 +252,74 @@ class BaseConsumer(threading.Thread, Publisher):
             
         consumer.subscribe(self.my_topics.keys())
 
-        
-        #consumer.seek_to_end(TopicPartition(topic='my-topic', partition=0))
+        #consumer.seek_to_end(TopicPartition(topic='my_topic', partition=0))
 
         self.done = False
+       
+            
         while self.done <> True:
             try:
                 message = consumer.next()
                 
-                #time.sleep(0.25)
-                if message.offset % 50 == 0:
-                    logging.info( "[%s]:highwater:%d offset:%d part:%d <%s>" % (self.name, consumer.highwater(TopicPartition(message.topic, message.partition)),
-                                                                             message.offset, message.partition, message.value))
                 
-    #             for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
-    #                 print "t:%s %s" % (t,  ','.join('p:%d, offset:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)) # consumer.position(TopicPartition(topic=t, partition=p)))
+                # the next if block is there for informational purposes only
+                # it may be useful to detect slow consumer situation
+                if message.offset % BaseConsumer.SLOW_CONSUMER_QUALIFY_NUM == 0:
+                    highwater = consumer.highwater(TopicPartition(message.topic, message.partition))
+                    logging.info( "BaseConsumer [%s]:highwater:%d offset:%d part:%d <%s>" %  (self.name, highwater, message.offset, message.partition, message.value))
+                    
+                    if highwater - message.offset >= BaseConsumer.SLOW_CONSUMER_QUALIFY_NUM:
+                        logging.warn("BaseConsumer:run Slow consumer detected! current: %d, highwater:%d, gap:%d" %
+                                        (message.offset, highwater, highwater - message.offset))
+                # the next block is designed to handle the first time the
+                # consumer encounters a topic partition it hasn't seen yet
+                try:
+                    # the try catch block ensures that on the first run
+                    # the except block is executed once and only once, thereafter since the 
+                    # dictionary object keys are assigned the exception will never
+                    # be caught again
+                    # 
+                    # the try catch is supposed to be faster than an if-else block...
+                    # 
+                    # the try statement below has no meaning, it is there merely to ensure that 
+                    # the block fails on the first run
+                    self.my_topics[message.topic][str(message.partition)] 
+                except KeyError:
+
+                    highwater = consumer.highwater(TopicPartition(message.topic, message.partition))
+                    logging.info( "*** On first iteration: [Topic:%s:Part:%d:Offset:%d]: Number of messages lagging behind= %d. Highwater:%d" 
+                                  % (message.topic, message.partition, message.offset, highwater - message.offset, highwater))
+                                                                                                
+                    for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
+                        logging.info ("*** On first iteration: Topic Partition Table: topic:[%s] %s" % (t.rjust(20),  
+                                                         ','.join('partition:%d, offset:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)
+                                                         ))
+                        
+                    self.persist_offsets(message.topic, message.partition, message.offset)
+                    self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))
+                        
+                        
+                    if message.topic in self.kwargs['seek_to_end']:
+                        logging.info("*** On first iteration: [Topic:%s:Part:%d:Offset:%d]: Attempting to seek to latest message ..." 
+                                     % (message.topic, message.partition, message.offset))
+                        consumer.seek_to_end((TopicPartition(topic=message.topic, partition= message.partition)))
+                        continue
                 
                 # if this is the first time the consumer is run
                 # it contains no offsets in the redis map, so it has 
                 # 0 elements in the map,
                 # then insert a new offset in redis and populate
-                # the local my_topics dict
-                
-                if len(self.my_topics[message.topic]) == 0:
-                    self.persist_offsets(message.topic, message.partition, message.offset)
-                    self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))
-                    #continue
+                # the local my_topics dict                
+#                 if len(self.my_topics[message.topic]) == 0:
+#                     self.persist_offsets(message.topic, message.partition, message.offset)
+#                     self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))
+#                     #continue
                     
                 """
                     the message.value received from kafaproducer is expected to contain 
                     plain text encoded as a json string
                     the content of message.value is not altered. it's content is stored in a dict object 
-                    with key = 'value' along with additional kafa metadata
+                    with key = 'value' and enriched with additional kafka metadata
                                     
                      
                     it is the subscriber's job to interpret the content stored in the 'value' key. Typically
@@ -325,10 +368,22 @@ class SimpleMessageListener(BaseMessageListener):
     
     def __init__(self, name):
         BaseMessageListener.__init__(self, name)
+        self.cnt_my_topic = 0
+        self.cnt_my_topic2 = 0
     
 #     def on_kb_event(self, param):
 #         print "on_kb_event [%s] %s" % (self.name, param)
-    
+    def my_topic(self, param):
+        if self.cnt_my_topic % 50 == 0:
+            print "SimpleMessageListener:my_topic. %s" % param
+            self.cnt_my_topic += 1
+
+    def my_topic2(self, param):
+        if self.cnt_my_topic2 % 50 == 0:
+            print "SimpleMessageListener:my_topic2. %s" % param
+            self.cnt_my_topic2 += 1
+
+        
     def on_kb_reached_last_offset(self, param):
         print "on_kb_reached_last_offset [%s] %s" % (self.name, param)
 
@@ -525,10 +580,19 @@ class TestProducer(BaseProducer):
     pass
 
 def test_base_proconsumer(mode):
-
+    '''
+        This example demonstrates
+        
+        1) use of consumer_timeout_ms to break out from the consumer.next loop
+        2) how to trap ctrl-c and break out of the running threads
+        3) using Queue to store calls to producer.send_message
+        4) using redis to store the consumer last processed offsets
+        5) use of try-catch block to implement seek_to_latest offset
+        6) inherit and implement MessageListener to subscribe messages dispatched by the consumer 
+    '''
     if mode == 'P':
         #Producer().start()
-        topics = ['my-topic', 'my-topic2']
+        topics = ['my_topic', 'my_topic2']
         tp = TestProducer(name = 'testproducer', kwargs={
                                              'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                              'topics': topics})
@@ -537,26 +601,39 @@ def test_base_proconsumer(mode):
         while True:
             
             #today = datetime.date.today()
-            s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
-            logging.info(s)
-            tp.send_message(topics[i%2], s)
-            
-            time.sleep(.45)
-            i=i+1
-        
+            try:
+                s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
+                logging.info(s)
+                tp.send_message(topics[i%2], s)
+                
+                time.sleep(.25)
+                i=i+1
+            except (KeyboardInterrupt, SystemExit):
+                logging.error('caught user interrupt')
+                tp.set_stop()
+                tp.join()
+                sys.exit(-1)
+                    
         
     else:
         
         bc = BaseConsumer(name='bc', kwargs={'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                              'bootstrap_host':'localhost', 'bootstrap_port':9092,
-                                             'group_id':'gid', 'session_timeout_ms':10000, 'topics': ['my-topic', 'my-topic2']})
+                                             'group_id':'gid', 'session_timeout_ms':10000, 'topics': ['my_topic', 'my_topic2'],
+                                             'clear_offsets': True, 'consumer_timeout_ms':1000,
+                                             # uncomment the next line to process messages from since the program was last shut down
+                                             # if seek_to_end is present, for the topic specified the consumer will begin
+                                             # sending the latest message to the listener
+                                             # note that the list only specifies my_topic to receive the latest
+                                             # but not my_topic2. Observe the different behavior by checking the message offset 
+                                             # in the program log
+                                             'seek_to_end': ['my_topic'],            
+                                             }) 
         #bml = BaseMessageListener('bml')
         sml = SimpleMessageListener('simple')
-        #bc.register(BaseConsumer.KB_EVENT, bml)
-        #bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, bml)
-        bc.register(BaseConsumer.KB_EVENT, sml)
         bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, sml)
-        
+        bc.register('my_topic', sml)
+        bc.register('my_topic2', sml)
         bc.start()
 
 

+ 97 - 65
src/comms/ibgw/subscription_manager.py

@@ -6,13 +6,14 @@ import json
 from misc2.helpers import ContractHelper
 from ib.ext.Contract import Contract
 from comms.ibgw.base_messaging import BaseMessageListener
+from comms.ibgw.tws_event_handler import TWS_event_handler
 
 
 
 class SubscriptionManager(BaseMessageListener):
     
     
-    
+    TICKER_GAP = 1000
     
     def __init__(self, name, tws_connection, producer, rs_conn, subscription_key):
         BaseMessageListener.__init__(self, name)
@@ -22,10 +23,21 @@ class SubscriptionManager(BaseMessageListener):
         self.rs = rs_conn
         self.subscription_key = subscription_key
 
-        
-        self.handle = []
+        #self.handle = []
         # contract key map to contract ID (index of the handle array)
-        self.tickerId = {}   
+        #self.tickerId = {}
+        '''
+            idContractMap has 3 keys
+            
+            next_id keeps track of the next_id to use when subscribing market data from TWS
+            id_contract and contract_id are dict and reverse dict that store the index of id 
+            to contarct and vice versa
+            
+            id_contract: {<int>, <Contract>}
+            contract_id: {<kvs_contract>, <int>}
+            
+        '''
+        self.idContractMap ={'next_id': 0, 'id_contract':{},'contract_id':{}}       
         # flag to indicate whether to save changes when persist_subscriptions is called       
         self.is_dirty = False
 
@@ -37,36 +49,57 @@ class SubscriptionManager(BaseMessageListener):
         
     def load_subscriptions(self):
         
-        def is_outstanding(c):
+        def is_outstanding(ic):
             
+            c = ic[1]
             today = strftime('%Y%m%d') 
             if c.m_expiry < today and (c.m_secType == 'OPT' or c.m_secType == 'FUT'):
                 logging.info('initialize_subscription_mgr: ignoring expired contract %s%s%s' % (c.m_expiry, c.m_strike, c.m_right))
                 return False
             return True
             
-        if self.rs.get(self.subscription_key):
-            contracts = filter(lambda x: is_outstanding(x), 
-                                   map(lambda x: ContractHelper.kvstring2object(x, Contract), json.loads(self.rs.get(self.subscription_key))))        
-            for c in contracts:
-                logging.info('SubscriptionManager:load_subscription. request market data for: %s' % (ContractHelper.printContract(c)))
-                self.reqMktData('internal-dummy-call', {'value': ContractHelper.contract2kvstring(c)}) 
-            
+        # retrieve the id-contract list from db
+        # remap the list by instantiating the string to object
+        # get rid of the already expired contracts
+        saved_iclist = self.get_id_contracts(db=True)
+        if saved_iclist:
+            ic_list= filter(lambda ic:is_outstanding, map(lambda ic: (ic[0], ContractHelper.kvstring2object(ic[1], Contract)), saved_iclist))
             
+            map(lambda ic: self.request_market_data(ic[0], ic[1], snapshot=False), ic_list) 
+            map(lambda ic: self.request_market_data(ic[0], ic[1], snapshot=True), ic_list)         
+            logging.info('SubscriptionManager:load_subscription. request market data for: %s' % (ic_list))
         
     
+    def request_market_data(self, id, contract, snapshot=False):
+        if snapshot:
+            # the call to TWS will return a snapshot followed 
+            # by the subscription being cancelled. Add 1000 to avoid clashing 
+            # with other subscription ids.  
+            self.tws_connect.reqMktData(id + TWS_event_handler.TICKER_GAP, contract, '', True)
+        else:
+            self.tws_connect.reqMktData(id, contract, '', False)
+            
+    
     # returns -1 if not found, else the key id (which could be a zero value)
     def is_subscribed(self, contract):
-        #print self.conId.keys()
-        ckey = ContractHelper.makeRedisKeyEx(contract) 
-        if ckey not in self.tickerId.keys():
+
+        
+        ckey = ContractHelper.makeRedisKeyEx(contract)
+        try:
+            return self.idContractMap['contract_id'][ckey]
+        except KeyError:
             return -1
-        else:
-            # note that 0 can be a key 
-            # be careful when checking the return values
-            # check for true false instead of using implicit comparsion
-            return self.tickerId[ckey]
-    
+
+    def add_subscription(self, contract):
+        #
+        # structure of idContractMap ={'next_id': -1, 'id_contract':{}, 'contract_id':{}}
+        #
+        id = self.idContractMap['next_id']
+        self.idContractMap['id_contract'][id] = contract
+        self.idContractMap['contract_id']['ContractHelper.makeRedisKeyEx(contract)'] = id        
+        self.idContractMap['next_id'] = id + 1
+  
+        return self.idContractMap['next_id']
 
             
     def reqMktData(self, event, message):
@@ -74,28 +107,20 @@ class SubscriptionManager(BaseMessageListener):
         contract = ContractHelper.kvstring2object(message['value'], Contract)
         #logging.info('SubscriptionManager: reqMktData')
   
-        def add_subscription(contract):
-            self.handle.append(contract)
-            newId = len(self.handle) - 1
-            self.tickerId[ContractHelper.makeRedisKeyEx(contract)] = newId 
-            
-            return newId
-  
         id = self.is_subscribed(contract)
         if id == -1: # not found
-            id = add_subscription(contract)
-            
+            id = self.add_subscription(contract)
             #
             # the conId must be set to zero when calling TWS reqMktData
             # otherwise TWS will fail to subscribe the contract
             contract.m_conId = 0
-            self.tws_connect.reqMktData(id, contract, '', False) 
+            self.request_market_data(id, contract, False) 
             self.is_dirty = True
                 
             logging.info('SubscriptionManager:reqMktData. Requesting market data, id = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         
         else:    
-            self.tws_connect.reqMktData(1000 + id, contract, '', True)
+            self.request_market_data(id, contract, True)
             logging.info('SubscriptionManager:reqMktData. contract already subscribed. Request snapshot = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         #self.dump()
 
@@ -109,40 +134,53 @@ class SubscriptionManager(BaseMessageListener):
         
 
     # use only after a broken connection is restored
-    # to re request market data 
-    #>>>> not enhanced yet...old code
     def force_resubscription(self):
-        # starting from index 1 of the contract list, call  reqmktdata, and format the result into a list of tuples
-        for i in range(1, len(self.handle)):
-            self.tws_connect.reqMktData(i, self.handle[i], '', False)
-            logging.info('force_resubscription: %s' % ContractHelper.printContract(self.handle[i]))
-       
+       self.load_subscriptions()
             
-    def itemAt(self, id):
-        if id > 0 and id < len(self.handle):
-            return self.handle[id]
-        return -1
 
+    # return id:contract object
+    def get_id_contracts(self, db=False):
+        if db:
+            try:
+                id_contracts = json.loads(self.rs.get(self.subscription_key))
+                return map(lambda x: (x[0], ContractHelper.kv2object(x[1], Contract), id_contracts))
+            except TypeError:
+                logging.info('SubscriptionManager:get_id_contracts. Exception when trying to get id_contracts from redis ***')
+                return None
+        else:
+            return map(lambda x: (x[0], x[1]), 
+                                list(self.idContractMap['id_contract'].iteritems()))
 
+    # return id:contract_strings
+    def get_id_kvs_contracts(self, db):
+        return map(lambda x:(x[0], ContractHelper.contract2kvstring(x[1])), self.get_id_contracts(db))
+    
     def persist_subscriptions(self):
          
+#         if self.is_dirty:
+#             cs = json.dumps(map(lambda x: ContractHelper.object2kvstring(x) if x <> None else None, self.handle))
+#             logging.info('Tws_gateway:persist_subscriptions. updating subscription table to redis store %s' % cs)
+#             self.dump()
+#             self.rs.set(self.subscription_key, cs)
+#             self.is_dirty = False
+
+        #self.idContractMap ={'next_id': -1, 'id_contract':{}, 'contract_id':{}}
         if self.is_dirty:
-            cs = json.dumps(map(lambda x: ContractHelper.object2kvstring(x) if x <> None else None, self.handle))
-            logging.info('Tws_gateway:persist_subscriptions. updating subscription table to redis store %s' % cs)
-            self.dump()
-            self.rs.set(self.subscription_key, cs)
+            # for each id:contract pair in idContractMap['id_contract'] dict, map to a list of (id, kvs_contract) values
+            ic = json.dumps(self.get_id_kvs_contracts(db=False))
+            self.rs.set(self.subscription_key, ic)
             self.is_dirty = False
 
+            logging.info('Tws_gateway:persist_subscriptions. updating subscription table to redis store %s' % ic)
+            self.dump()
 
     def dump(self):
-        
+
         logging.info('subscription manager table:---------------------\n')
-        logging.info(''.join('%d: {%s},\n' % (i,  ''.join('%s:%s, ' % (k, v) for k, v in self.handle[i].__dict__.iteritems() )\
-                                     if self.handle[i] <> None else ''          ) for i in range(len(self.handle)))\
-                     )
-        
-        #logging.info( ''.join('%s[%d],\n' % (k, v) for k, v in self.conId.iteritems()))
-        logging.info( 'Number of instruments subscribed: %d' % len(self.handle))
+        logging.info(''.join ('\n[%s]:[%s]' % (str(ic[0]).rjust(4), ic[1]) for ic in self.get_id_kvs_contracts(db=False)))
+        logging.info(''.join ('\n[%s]:[%d]' % (k.rjust(20), self.idContractMap['contract_id'][k]) 
+                               for k in sorted(self.idContractMap['contract_id'])))       
+        logging.info( 'Number of instruments subscribed: %d' % self.idContractMap['next_id'])
         logging.info( '------------------------------------------------')
 
 
@@ -151,17 +189,11 @@ class SubscriptionManager(BaseMessageListener):
     """
     def gw_req_subscriptions(self, event, message):
         
-        #subm = map(lambda i: ContractHelper.contract2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
-        #subm = map(lambda i: ContractHelper.object2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
-        subm = map(lambda i: (i, ContractHelper.object2kvstring(self.handle[i])),
-                    range(len(self.handle)))
-        
-        
-        if subm:
-            
-            logging.info('SubscriptionManager:gw_req_subscriptions-------\n%s' % ''.join('\n%s:%s' % (str(v[0]).rjust(6), v[1]) for v in subm))
-            self.producer.send_message('gw_subscriptions', self.producer.message_dumps({'subscriptions': subm}))
-
+        ic = self.get_id_kvs_contracts(db=False)
+        if ic:
+             
+            logging.info('SubscriptionManager:gw_req_subscriptions-------\n%s' % ic)
+            self.producer.send_message('gw_subscriptions', self.producer.message_dumps({'subscriptions': ic}))
         
        
 

+ 0 - 593
src/comms/test/base_messaging.py

@@ -1,593 +0,0 @@
- #!/usr/bin/env python
-import threading, logging, time
-import sys
-import copy
-import datetime
-import uuid
-from Queue import Queue
-from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
-from misc2.observer import NotImplementedException
-from kafka import KafkaConsumer, KafkaProducer
-from kafka.structs import TopicPartition
-from kafka.errors import NoBrokersAvailable
-# 
-# packages required for ConsumerNextIteratorPersist
-import json
-from redis import Redis
-
-from misc2.observer import Subscriber, Publisher
-from numpy.distutils.fcompiler import none
-
-
-
-class Producer(threading.Thread):
-    daemon = True
-
-    def run(self):
-        try:
-            producer = KafkaProducer(bootstrap_servers='localhost:9092')
-            topics = ['my-topic', 'my-topic2']
-            i = 0
-            while True:
-                #today = datetime.date.today()
-                s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
-                logging.info(s)
-                producer.send(topics[i%2], s)
-                
-                time.sleep(.45)
-                i=i+1
-        except NoBrokersAvailable:
-            logging.error("NoBrokersAvailable: Has kafka started?")
-
-
-class BaseProducer(threading.Thread, Subscriber):
-
-    def __init__(self, group=None, target=None, name=None,
-                 args=(), kwargs=None, verbose=None):
-        threading.Thread.__init__(self, group=group, target=target, name=name,
-                                  verbose=verbose)
-        """
-        kwargs:
-            bootstrap_host
-            bootstrap_host
-            redis_host
-            session_timeout_ms: 
-        """
-        
-        
-        self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name)) 
-        logging.info('BaseProducer __init__: name=%s' % self.name)
-        self.args = args
-        self.kwargs = kwargs
-        
-        self.event_q = Queue()
-        return
-
-
-    def send_message(self, topic, plain_text):
-        self.event_q.put((topic, plain_text))
-        self.event_q.task_done()
-
-    def set_stop(self):
-        self.done = True
-        
-    def run(self):
-        try:
-            producer = KafkaProducer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']))
-            
-            self.done = False
-            while self.done <> True:
-                #today = datetime.date.today()
-                
-                if not self.event_q.empty():
-                    topic, plain_text = self.event_q.get()
-                    #s = "BaseProducer topic:[%s] msg:[%s]" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
-                    logging.info("BaseProducer topic:[%s] msg:[%s]" % (topic, plain_text))
-                    producer.send(topic, plain_text)
-                    
-                # to prevent excessive CPU use
-                time.sleep(0.1)
-            
-            
-            logging.info ('******** BaseProducer exit done.')
-            
-                
-        except NoBrokersAvailable:
-            logging.error("NoBrokersAvailable: Has kafka started?")
-    
-
-class BaseConsumer(threading.Thread, Publisher):
-    
-    #KB_EVENT = "on_kb_event"
-    KB_REACHED_LAST_OFFSET = "on_kb_reached_last_offset"
-    
-    #my_topics =  {'my-topic':{}, 'my-topic2':{}}    
-
-    def __init__(self, group=None, target=None, name=None,
-                 args=(), kwargs=None, verbose=None):
-        threading.Thread.__init__(self, group=group, target=target, name=name,
-                                  verbose=verbose)
-        """
-        kwargs:
-            bootstrap_host
-            bootstrap_host
-            redis_host
-            redis_port
-            redis_db
-            group_id
-            consumer_id: name 
-            topics: a list of topic strings
-            session_timeout_ms: 
-            consumer_timeout_ms
-        """
-        
-        
-        self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name)) 
-        logging.info('BaseConsumer __init__: name=%s' % self.name)
-        self.args = args
-        self.kwargs = kwargs
-        self.rs = Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
-        self.my_topics = {}
-        for t in self.kwargs['topics']:
-            self.my_topics[t]= {} 
-            
-        #self.events = {event: dict() for event in [BaseConsumer.KB_EVENT, BaseConsumer.KB_REACHED_LAST_OFFSET]}
-        self.events = {event: dict() for event in [BaseConsumer.KB_REACHED_LAST_OFFSET] + self.kwargs['topics']}
-        return
-    
-    
-    # no use: doesn't work
-    def seek_to_last_read_offset(self, consumer):
-        for t in self.my_topics.keys():
-            po = json.loads(self.rs.get(t))
-            consumer.seek(TopicPartition(topic=t, partition=po['partition']), po['offset'])
-    
-    """
-     each consumer has its own set of topics offsets stored in redis
-     for consumer A and consumer B (with different group_ids) subscribing to the same my-topic, each 
-     each of them will need to keep track of its own offsets
-     to make the redis key unique by consumer, the key is created by topic + '@' + consumer name
-     example: my-topic@consumerA
-     
-     offsets = { topic-consumer_name:
-                     {
-                         partition0: offset0,
-                         partition1: offset1,... 
-                     }
-               }
-    """
-    
-    def consumer_topic(self, tp):
-        return tp + '@' + self.name 
-    
-    
-    def clear_offsets(self):
-        """
-            clear the offsets in redis by removing the value from redis
-            and emptying the internal my_topics dict
-            
-            clear offsets is ncessary when the offset in redis was saved 
-            at a time since kafka manager was shut down
-            when kafka restarts, previously buffered 
-            messages are no longer available and instead it will restart its offset at 0.
-            Reading an old offset by BaseConsumer will cause it to think that it
-            is still receving old buffered messages from Kafa but in fact all the messages 
-            since the last shut down of kafka are all gone
-        """
-        for t in self.kwargs['topics']:
-            self.my_topics[t]= {}
-            logging.info("BaseConsumer:clear_offsets Deleting %s from redis..." % self.consumer_topic(t))
-            self.rs.delete(self.consumer_topic(t))
-            
-             
-        #raise NotImplementedException
-    
-    
-    def persist_offsets(self, topic, partition, offset):
-        #self.rs.set(self.consumer_topic(topic), json.dumps({'partition': partition, 'offset':offset}))
-        self.my_topics[topic][str(partition)] = offset
-        self.rs.set(self.consumer_topic(topic), json.dumps(self.my_topics[topic]))
-    
-    def enrich_message(self, message):
-        return {'value': message.value, 'partition':message.partition, 'offset': message.offset}
-    
-    def set_stop(self):
-        self.done = True
-    
    def run(self):
        """
            Main consumer loop (Thread entry point).

            Creates a KafkaConsumer, restores any per-partition offsets
            persisted in redis, then dispatches every new message to the
            subscribers registered for its topic.  Messages older than the
            persisted offset are discarded; re-reading exactly the persisted
            offset fires KB_REACHED_LAST_OFFSET instead of a normal dispatch.
            The loop exits when set_stop() flips self.done.
        """
        print '%s:%s started' % (self.kwargs['group_id'], self.name)

        # optionally wipe stale redis offsets (see clear_offsets docstring)
        if self.kwargs['clear_offsets'] == True:
            self.clear_offsets()

        consumer = KafkaConsumer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']),
                                 auto_offset_reset='earliest',
                                 #
                                 # consumers having the same group id work as
                                 # a group to seize messages published by the publisher
                                 # (like a queue, each message is consumed exactly once
                                 #  by a consumer)
                                 #
                                 # in a 'pub-sub' environment, give each consumer
                                 # a unique group_id
                                 #
                                 group_id = self.kwargs['group_id'],
                                 client_id = self.name,
                                 #
                                 # session_timeout_ms is the time it takes for another consumer
                                 # that has the same group_id to pick up the work
                                 # to consume messages when this consumer is dead
                                 #
                                 session_timeout_ms = self.kwargs['session_timeout_ms'],
                                 #
                                 # spread partitions evenly across group members
                                 #
                                 partition_assignment_strategy=[RoundRobinPartitionAssignor],
                                 #
                                 # consumer.next() raises StopIteration after this idle
                                 # period, letting the loop re-check self.done
                                 #
                                 consumer_timeout_ms=self.kwargs['consumer_timeout_ms']
                                )

        for topic in self.my_topics.keys():
            #
            # if a topic offset was stored previously (the consumer was run
            # before), load the offset values into the local my_topics map;
            # otherwise self.my_topics[topic] stays empty
            if self.rs.keys(self.consumer_topic(topic)):
                self.my_topics[topic] = json.loads(self.rs.get(self.consumer_topic(topic)))

        print self.my_topics

        consumer.subscribe(self.my_topics.keys())

        self.done = False
        while self.done <> True:
            try:
                message = consumer.next()

                # periodic progress trace (every 50th offset)
                if message.offset % 50 == 0:
                    logging.info( "[%s]:highwater:%d offset:%d part:%d <%s>" % (self.name, consumer.highwater(TopicPartition(message.topic, message.partition)),
                                                                             message.offset, message.partition, message.value))

                # first message ever seen for this topic: no offsets in redis
                # yet, so persist one now and re-load it into the local map
                if len(self.my_topics[message.topic]) == 0:
                    self.persist_offsets(message.topic, message.partition, message.offset)
                    self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))

                """
                    the message.value received from the kafka producer is expected
                    to contain plain text encoded as a json string.
                    the content of message.value is not altered; it is stored in a
                    dict object under the 'value' key along with additional kafka
                    metadata.

                    it is the subscriber's job to interpret the content stored in
                    the 'value' key. Typically it means decoding the content by
                    invoking json.loads
                """
                if self.my_topics[message.topic][str(message.partition)] > message.offset:
                    print '********************** old message...discarding %s %d' % (message.topic, message.offset)
                else:
                    # if the stored offset in redis equals the current offset,
                    # notify the observers with KB_REACHED_LAST_OFFSET.
                    # the "and" condition ensures that on a fresh start of the kafka
                    # server this event is not triggered, as both the saved value in
                    # redis and the current offset are 0
                    if self.my_topics[message.topic][str(message.partition)] == message.offset and message.offset <> 0:
                        self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.enrich_message(message))
                        logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
                    else:
                        self.persist_offsets(message.topic, message.partition, message.offset)
                        self.dispatch(message.topic, self.enrich_message(message))
            except StopIteration:
                # no new message within consumer_timeout_ms -- loop again so
                # self.done gets re-checked
                logging.debug('BaseConsumer:run StopIteration Caught. No new message arriving...')
                continue


        logging.info ('******** BaseConsumer exit done.')
-
-
-
class BaseMessageListener(Subscriber):
    """
        Base class for kafka message listeners.

        Routes an incoming event to the subclass method of the same name:
        update('tickPrice', ...) invokes self.tickPrice(...).  Subclasses
        implement one method per event they care about; missing handlers
        are logged, not raised.
    """

    def update(self, event, param=None):
        """Dispatch *event* to the handler method named after it.

        event: name of the handler method to invoke.
        param: decoded message payload passed to the handler; may be None.

        BUG FIX: the default was the undefined name `none` (lowercase),
        which raised NameError as soon as the module was imported; it is
        now the `None` singleton.
        """
        try:
            event_fn = getattr(self, event)
            event_fn(param)
        except AttributeError:
            # handler not implemented on the subclass -- log and carry on
            err_msg = 'BaseMessageListener:update| function %s not implemented.' % event
            logging.error('BaseMessageListener [%s]:update %s' % (self.name, err_msg))
        logging.debug("BaseMessageListener [%s]:update|Event type:[%s] content:[%s]" % (self.name, event, json.dumps(param) if param != None else "<empty param>"))
-
-
class SimpleMessageListener(BaseMessageListener):
    """Minimal listener used by the test harness; it reacts only to the
    reached-last-offset event raised by BaseConsumer."""

    def __init__(self, name):
        BaseMessageListener.__init__(self, name)

    def on_kb_reached_last_offset(self, param):
        # Fired by BaseConsumer when it re-reads the offset that was
        # persisted before the previous shutdown.
        print("on_kb_reached_last_offset [%s] %s" % (self.name, param))
-
-
class Prosumer(BaseProducer):
    """
        Combined producer/consumer wrapper.

        Acts as a BaseProducer (via inheritance) and owns an internal
        BaseConsumer; both share one merged configuration dict.  Listeners
        register per-topic handler methods against the internal consumer.
    """

    # Defaults mirroring kafka-python's KafkaConsumer configuration; any key
    # supplied by the caller overrides the default.
    PROSUMER_DEFAULT_CONFIG = {
        'bootstrap_servers': 'localhost',
        'client_id': 'kafka-prosumer' ,
        'group_id': 'kafka-prosumer-default-group',
        'key_deserializer': None,
        'value_deserializer': None,
        'fetch_max_wait_ms': 500,
        'fetch_min_bytes': 1,
        'max_partition_fetch_bytes': 1 * 1024 * 1024,
        'request_timeout_ms': 40 * 1000,
        'retry_backoff_ms': 100,
        'reconnect_backoff_ms': 50,
        'max_in_flight_requests_per_connection': 5,
        'auto_offset_reset': 'latest',
        'enable_auto_commit': True,
        'auto_commit_interval_ms': 5000,
        'default_offset_commit_callback': lambda offsets, response: True,
        'check_crcs': True,
        'metadata_max_age_ms': 5 * 60 * 1000,
        # BUG FIX: (RoundRobinPartitionAssignor) was NOT a tuple -- plain
        # parentheses are a no-op, so the "strategy list" was a bare class.
        # A trailing comma makes it a one-element tuple, matching the list
        # form used by BaseConsumer.run().
        'partition_assignment_strategy': (RoundRobinPartitionAssignor,),
        'heartbeat_interval_ms': 3000,
        'session_timeout_ms': 30000,
        'max_poll_records': sys.maxsize,
        'receive_buffer_bytes': None,
        'send_buffer_bytes': None,
        'consumer_timeout_ms': 1000,
        'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
    }


    def __init__(self, name, kwargs=None):
        """
        name: prosumer name, used as the kafka client_id for both threads.
        kwargs: caller configuration merged over PROSUMER_DEFAULT_CONFIG.

        BUG FIX: kwargs=None previously crashed on `key in kwargs`; the
        caller's dict was also mutated via pop().  A defensive copy is
        taken so None is accepted and the caller's dict is left intact.
        """
        caller_cfg = dict(kwargs) if kwargs else {}
        self.kwargs = copy.copy(self.PROSUMER_DEFAULT_CONFIG)
        for key in self.kwargs:
            if key in caller_cfg:
                self.kwargs[key] = caller_cfg.pop(key)
        # keep any extra keys the defaults don't know about (topics, redis_*...)
        self.kwargs.update(caller_cfg)

        logging.info('\nProsumer:init: **** Configurations dump ***')
        logging.info('\n'.join('%s:%s' % (k.ljust(40), self.kwargs[k]) for k in sorted(self.kwargs)))

        BaseProducer.__init__(self, group=None, target=None, name=name,
                 args=(), kwargs=self.kwargs, verbose=None)

        self.kconsumer = BaseConsumer(name=name,  kwargs=self.kwargs)


    def add_listener_topics(self, listener, topics):
        # register one listener for an explicit list of topics; the listener
        # must implement one method per topic name
        map(lambda e: self.kconsumer.register(e, listener, getattr(listener, e)), topics)

    def add_listeners(self, listeners):
        # register each listener for every configured topic
        for l in listeners:
            map(lambda e: self.kconsumer.register(e, l, getattr(l, e)), self.kwargs['topics'])

    def set_stop(self):
        # stop both the producer thread and the internal consumer thread
        BaseProducer.set_stop(self)
        self.kconsumer.set_stop()

    def start_prosumer(self):
        self.kconsumer.start()
        self.start()

    def message_loads(self, text_msg):
        """Decode a message payload (JSON text -> object)."""
        return json.loads(text_msg)

    def message_dumps(self, obj_msg):
        """Encode a message payload (object -> JSON text)."""
        return json.dumps(obj_msg)
-
-    
class SubscriptionListener(BaseMessageListener):
    """
        Test listener that answers subscription requests through a Prosumer.

        Each handler receives (event, items) from the consumer dispatch,
        where items['value'] holds the JSON-encoded message payload.
    """

    def __init__(self, name, producer):
        BaseMessageListener.__init__(self, name)
        self.producer = producer
        # monotonically increasing id stamped on each response
        self.i = 0

    def gw_subscription_changed(self, event, items):
        logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, items))

    def gw_req_subscriptions(self, event, items):
        logging.info("[%s] received gw_req_subscriptions content:[%s]" % (self.name, items))
        # renamed from `vars` -- that shadowed the builtin
        payload = self.producer.message_loads(items['value'])
        # echo the request id back together with a timestamped response
        response = self.producer.message_dumps({'id': self.i, 'reqid': payload['reqid'],
                                                'response' : "%s" % (time.strftime("%b %d %Y %H:%M:%S"))})
        self.producer.send_message('gw_subscription_changed', response)
        self.i += 1

    def reqMktData(self, event, items):
        logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
        tick = self.producer.message_dumps({'field':4, 'typeName':'tickPrice', 'price':1.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0})
        self.producer.send_message('tickPrice', tick)

    def tickPrice(self, event, items):
        logging.info("[%s] received %s content:[%s]" % (self.name, event, items))

    def on_kb_reached_last_offset(self, event, items):
        logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, items))
        print("on_kb_reached_last_offset [%s] %s" % (self.name, items))
-        
-            
def test_prosumer2(mode):
    """Interactive two-process test: run one shell with mode 'A' and a second
    shell with any other mode; the two prosumers exchange messages over a
    local kafka/redis until interrupted."""

    # settings common to both sides
    base_cfg = {'bootstrap_host': 'localhost', 'bootstrap_port': 9092,
                'redis_host': 'localhost', 'redis_port': 6379, 'redis_db': 0,
                'session_timeout_ms': 10000, 'clear_offsets': False}

    if mode == 'A':
        # side A consumes subscription changes and tick prices
        cfg = dict(base_cfg)
        cfg.update({'group_id': 'groupA',
                    'topics': ['gw_subscription_changed', 'tickPrice']})
        prosumer = Prosumer(name='A', kwargs=cfg)
        listener = SubscriptionListener('earA', prosumer)

        prosumer.add_listeners([listener])
        prosumer.start_prosumer()
        counter = 0

        try:
            prosumer.send_message('reqMktData', prosumer.message_dumps({'contract':'dummy'}))
            while True:
                counter += 1
                time.sleep(.45)

        except (KeyboardInterrupt, SystemExit):
                logging.error('caught user interrupt')
                prosumer.set_stop()
                prosumer.join()

    else:
        # side B consumes subscription requests and market-data requests
        cfg = dict(base_cfg)
        cfg.update({'group_id': 'groupB',
                    'topics': ['gw_req_subscriptions', 'reqMktData']})
        prosumer = Prosumer(name='B', kwargs=cfg)
        listener = SubscriptionListener('earB', prosumer)
        prosumer.add_listeners([listener])
        prosumer.start_prosumer()
        try:
            while True:
                # publish a dummy tick at a fixed cadence
                prosumer.send_message('tickPrice',
                        prosumer.message_dumps({'field':5, 'typeName':'tickPrice', 'price':2.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}))
                time.sleep(.45)

        except (KeyboardInterrupt, SystemExit):
                logging.error('caught user interrupt')
                prosumer.set_stop()
                prosumer.join()
-    
-    
-
-    
-
class TestProducer(BaseProducer):
    # Trivial concrete producer used by test_base_proconsumer; inherits all
    # behaviour from BaseProducer unchanged.
    pass
-
def test_base_proconsumer(mode):
    """Manual test for the producer/consumer pair.

    mode 'P': publish a numbered, timestamped message alternating between
    the two test topics every 0.45s (runs until killed).
    Any other mode: start a BaseConsumer with a SimpleMessageListener.
    """

    if mode == 'P':
        topics = ['my-topic', 'my-topic2']
        tp = TestProducer(name = 'testproducer', kwargs={
                                             'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                             'topics': topics})
        tp.start()
        i = 0
        while True:
            # alternate topics, stamping a counter + timestamp on each message
            s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
            logging.info(s)
            tp.send_message(topics[i%2], s)

            time.sleep(.45)
            i=i+1


    else:

        bc = BaseConsumer(name='bc', kwargs={'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                             'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                             'group_id':'gid', 'session_timeout_ms':10000, 'topics': ['my-topic', 'my-topic2']})
        sml = SimpleMessageListener('simple')
        # BUG FIX: BaseConsumer.KB_EVENT is commented out on the class, so
        # registering it raised AttributeError; only the reached-last-offset
        # event is registered now.
        bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, sml)

        bc.start()
-
-
-
-    
-def main():
-    
-    #
-    # test cases
-    #
-    tp = [ test_base_proconsumer, test_prosumer2]
-    
-    if len(sys.argv) != 3:
-        print("Usage: %s <role(producer or consumer): P|C> <test case #[0..1]>" % sys.argv[0])
-        print "\n".join('case #%d: %s' % (i, tp[i].__name__) for i in range(len(tp)))
-        print "example: python %s P 1" % sys.argv[0]
-        print "example: python %s C 1" % sys.argv[0]
-        exit(-1)    
-
-    mode = sys.argv[1] 
-    #gid = sys.argv[2] if sys.argv[2] <> None else "q-group"  
-
-    
-    tp[int(sys.argv[2])](mode)
-
-    #time.sleep(30)
-#     while 1:
-#         try:
-#             time.sleep(5)
-#             pass
-#         except (KeyboardInterrupt, SystemExit):
-#                 logging.error('caught user interrupt')
-#                 sys.exit(-1)
- 
-    
-    
-
-if __name__ == "__main__":
-    logging.basicConfig(
-        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
-        level=logging.INFO
-        )
-    main()

+ 1 - 1
src/config/tws_gateway.cfg

@@ -17,7 +17,7 @@ redis_db: 0
 # 7496 - production larry046, 7496 - development,  8496 production mchan927
 #
 tws_host: 'localhost'
-tws_api_port: 7496
+tws_api_port: 4001
 tws_app_id: 74960
 #
 #

BIN
src/misc2/__init__.pyc


+ 9 - 11
src/sh/base_messaging.sh

@@ -1,14 +1,12 @@
 #!/bin/bash
-<<<<<<< HEAD
-FINOPT_HOME=~/l1304/workspace/finopt-ironfly/finopt/src/
-=======
-#FINOPT_HOME=~/l1304/workspace/finopt/src/
-FINOPT_HOME=~/ironfly-workspace/finopt/src/
->>>>>>> branch 'ironfly' of https://github.com/laxaurus/finopt.git
-ROOT=$FINOPT_HOME
-FINDATA=$ROOT/../data 
-SRC=$ROOT
-export PYTHONPATH=$SRC:$PYTHONPATH
 
-python $FINOPT_HOME/comms/test/base_messaging.py $1 $2
 
+HOST=$(hostname)
+echo $HOST
+if [ $HOST == 'hkc-larryc-vm1' ]; then
+	FINOPT_HOME=~/ironfly-workspace/finopt/src
+else
+	FINOPT_HOME=~/l1304/workspace/finopt-ironfly/finopt/src
+fi
+export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
+python $FINOPT_HOME/comms/ibgw/base_messaging.py $1 $2