Преглед изворни кода

fixed first message not sent in baseconsumer

esurfer пре 9 година
родитељ
комит
03f64b144e

+ 16 - 3
src/comms/ibc/gw_ex1.py

@@ -5,7 +5,7 @@ import logging
 import json
 
 from ib.ext.Contract import Contract
-
+from optparse import OptionParser
 from misc2.helpers import ContractHelper
 from comms.ibgw.base_messaging import Prosumer
 from comms.tws_protocol_helper import TWS_Protocol
@@ -83,7 +83,20 @@ if __name__ == '__main__':
       'topics': ['position', 'positionEnd', 'gw_subscriptions', 'gw_subscription_changed']
       }
 
-   
+    usage = "usage: %prog [options]"
+    parser = OptionParser(usage=usage)
+    parser.add_option("-c", "--clear_offsets", action="store_true", dest="clear_offsets",
+                      help="delete all redis offsets used by this program")
+    parser.add_option("-g", "--group_id",
+                      action="store", dest="group_id", 
+                      help="assign group_id to this running instance")
+    
+    (options, args) = parser.parse_args()
+    for option, value in options.__dict__.iteritems():
+        if value <> None:
+            kwargs[option] = value
+            
+    #print kwargs    
       
     logconfig = kwargs['logconfig']
     logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
@@ -92,4 +105,4 @@ if __name__ == '__main__':
     
     test_client(kwargs)
     
-     
+     

+ 22 - 271
src/comms/ibc/tws_client_lib.py

@@ -4,79 +4,19 @@
 import sys
 import copy
 from time import sleep, strftime
-import ConfigParser
 import logging
 import json
-
+from optparse import OptionParser
 from ib.ext.Contract import Contract
 
-from misc2.helpers import ContractHelper, ExecutionFilterHelper, OrderHelper
+from misc2.helpers import ContractHelper, ExecutionFilterHelper, OrderHelper, ConfigMap
 from comms.ibgw.base_messaging import Prosumer, BaseMessageListener
+from comms.ibc.base_client_messaging import GatewayCommandWrapper, AbstractGatewayListener
 from comms.tws_protocol_helper import TWS_Protocol
 from misc2.observer import NotImplementedException
 import redis
          
          
-class GatewayCommandWrapper():
-
-    def __init__(self, producer):
-        self.producer = producer
-  
-    def reqOpenOrders(self):
-        self.producer.send_message('reqOpenOrders', '')
-    
-    def reqIds(self):
-        self.producer.send_message('reqIds', '')
-    
-    def reqNewsBulletins(self):
-        logging.error('reqNewsBulletins: NOT IMPLEMENTED')
-    
-    def cancelNewsBulletins(self):
-        logging.error('cancelNewsBulletins: NOT IMPLEMENTED')
-    
-    def setServerLogLevel(self):
-        logging.error('setServerLogLevel: NOT IMPLEMENTED')
-  
-    def reqAutoOpenOrders(self):
-        logging.error('reqAutoOpenOrders: NOT IMPLEMENTED')
-    
-    def reqAllOpenOrders(self):
-        logging.error('reqAllOpenOrders: NOT IMPLEMENTED')
-    
-    def reqManagedAccts(self):
-        logging.error('reqManagedAccts: NOT IMPLEMENTED')
-    
-    def requestFA(self):
-        logging.error('requestFA: NOT IMPLEMENTED')
-    
-    def reqPositions(self):
-        self.producer.send_message('reqPositions', '')
-        
-    def reqHistoricalData(self):
-        logging.error('reqHistoricalData: NOT IMPLEMENTED')
-        
-    def reqAccountUpdates(self):
-        self.producer.send_message('reqAccountUpdates', '1')
-
-    def reqExecutions(self, exec_filter=None):
-        self.producer.send_message('reqExecutions', ExecutionFilterHelper.object2kvstring(exec_filter) if exec_filter <> None else '')
-
-    def reqMktData(self, contract):
-        self.producer.send_message('reqMktData', ContractHelper.object2kvstring(contract))
-        
-    def reqAccountSummary(self, reqId, group, tags):
-        self.producer.send_message('reqAccountSummary', self.producer.message_dumps([reqId, group, tags]))
-    
-    def placeOrder(self, id, contract, order):
-        self.producer.send_message('placeOrder', 
-                                   self.producer.message_dumps([id, ContractHelper.contract2kvstring(contract), OrderHelper.object2kvstring(order)]))
-    
-        
-
-    def gw_req_subscriptions(self):
-        
-        self.producer.send_message('gw_req_subscriptions', self.producer.message_dumps(None))
-
 
          
 class TWS_client_manager(GatewayCommandWrapper):
@@ -164,207 +104,9 @@ class TWS_client_manager(GatewayCommandWrapper):
         
 
                                   
-        
-        
 
     
-class AbstractGatewayListener(BaseMessageListener):
-    
-    def __init__(self, name):
-        BaseMessageListener.__init__(self, name)
-        
-    
-    def tickPrice(self, event, message_value):  # tickerId, field, price, canAutoExecute):
-        """ generated source for method tickPrice """
-        raise NotImplementedException
-   
-    def tickSize(self, event, message_value):  # tickerId, field, size):
-        """ generated source for method tickSize """
-        raise NotImplementedException
-   
-    def tickOptionComputation(self, event, message_value):  # tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
-        """ generated source for method tickOptionComputation """
-        raise NotImplementedException
-   
-    def tickGeneric(self, event, message_value):  # tickerId, tickType, value):
-        """ generated source for method tickGeneric """
-        raise NotImplementedException
-   
-    def tickString(self, event, message_value):  # tickerId, tickType, value):
-        """ generated source for method tickString """
-        raise NotImplementedException
-   
-    def tickEFP(self, event, message_value):  # tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
-        """ generated source for method tickEFP """
-        raise NotImplementedException
-   
-    def orderStatus(self, event, message_value):  # orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeld):
-        """ generated source for method orderStatus """
-        raise NotImplementedException
-   
-    def openOrder(self, event, message_value):  # orderId, contract, order, orderState):
-        """ generated source for method openOrder """
-        raise NotImplementedException
-   
-    def openOrderEnd(self, event, message_value):
-        """ generated source for method openOrderEnd """
-        raise NotImplementedException
-   
-    def updateAccountValue(self, event, message_value):  # key, value, currency, accountName):
-        """ generated source for method updateAccountValue """
-        raise NotImplementedException
-   
-    def updatePortfolio(self, event, message_value):  # contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
-        """ generated source for method updatePortfolio """
-        raise NotImplementedException
-   
-    def updateAccountTime(self, event, message_value):  # timeStamp):
-        """ generated source for method updateAccountTime """
-        raise NotImplementedException
-   
-    def accountDownloadEnd(self, event, message_value):  # accountName):
-        """ generated source for method accountDownloadEnd """
-        raise NotImplementedException
-   
-    def nextValidId(self, event, message_value):  # orderId):
-        """ generated source for method nextValidId """
-        raise NotImplementedException
-   
-    def contractDetails(self, event, message_value):  # reqId, contractDetails):
-        """ generated source for method contractDetails """
-        raise NotImplementedException
-   
-    def bondContractDetails(self, event, message_value):  # reqId, contractDetails):
-        """ generated source for method bondContractDetails """
-        raise NotImplementedException
-   
-    def contractDetailsEnd(self, event, message_value):  # reqId):
-        """ generated source for method contractDetailsEnd """
-        raise NotImplementedException
-   
-    def execDetails(self, event, message_value):  # reqId, contract, execution):
-        """ generated source for method execDetails """
-        raise NotImplementedException
-   
-    def execDetailsEnd(self, event, message_value):  # reqId):
-        """ generated source for method execDetailsEnd """
-        raise NotImplementedException
-   
-    def updateMktDepth(self, event, message_value):  # tickerId, position, operation, side, price, size):
-        """ generated source for method updateMktDepth """
-        raise NotImplementedException
-   
-    def updateMktDepthL2(self, event, message_value):  # tickerId, position, marketMaker, operation, side, price, size):
-        """ generated source for method updateMktDepthL2 """
-        raise NotImplementedException
-   
-    def updateNewsBulletin(self, event, message_value):  # msgId, msgType, message, origExchange):
-        """ generated source for method updateNewsBulletin """
-        raise NotImplementedException
-   
-    def managedAccounts(self, event, message_value):  # accountsList):
-        """ generated source for method managedAccounts """
-        raise NotImplementedException
-   
-    def receiveFA(self, event, message_value):  # faDataType, xml):
-        """ generated source for method receiveFA """
-        raise NotImplementedException
-   
-    def historicalData(self, event, message_value):  # reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
-        """ generated source for method historicalData """
-        raise NotImplementedException
-   
-    def scannerParameters(self, event, message_value):  # xml):
-        """ generated source for method scannerParameters """
-        raise NotImplementedException
-   
-    def scannerData(self, event, message_value):  # reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
-        """ generated source for method scannerData """
-        raise NotImplementedException
-   
-    def scannerDataEnd(self, event, message_value):  # reqId):
-        """ generated source for method scannerDataEnd """
-        raise NotImplementedException
-   
-    def realtimeBar(self, event, message_value):  # reqId, time, open, high, low, close, volume, wap, count):
-        """ generated source for method realtimeBar """
-        raise NotImplementedException
-   
-    def currentTime(self, event, message_value):  # time):
-        """ generated source for method currentTime """
-        raise NotImplementedException
-   
-    def fundamentalData(self, event, message_value):  # reqId, data):
-        """ generated source for method fundamentalData """
-        raise NotImplementedException
-   
-    def deltaNeutralValidation(self, event, message_value):  # reqId, underComp):
-        """ generated source for method deltaNeutralValidation """
-        raise NotImplementedException
-   
-    def tickSnapshotEnd(self, event, message_value):  # reqId):
-        """ generated source for method tickSnapshotEnd """
-        raise NotImplementedException
-   
-    def marketDataType(self, event, message_value):  # reqId, marketDataType):
-        """ generated source for method marketDataType """
-        raise NotImplementedException
-   
-    def commissionReport(self, event, message_value):  # commissionReport):
-        """ generated source for method commissionReport """
-        raise NotImplementedException
-   
-    def position(self, event, message_value):  # account, contract, pos, avgCost):
-        """ generated source for method position """
-        raise NotImplementedException
-   
-    def positionEnd(self, event, message_value):
-        """ generated source for method positionEnd """
-        raise NotImplementedException
-   
-    def accountSummary(self, event, message_value):  # reqId, account, tag, value, currency):
-        """ generated source for method accountSummary """
-        raise NotImplementedException
-   
-    def accountSummaryEnd(self, event, message_value):  # reqId):
-        """ generated source for method accountSummaryEnd """
-        raise NotImplementedException
-
-    def gw_subscription_changed(self, event, message_value):  # event, items):
-        raise NotImplementedException        
-#         logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, message_value))
-        
-    def gw_subscriptions(self, event, message_value):
-        raise NotImplementedException        
-      
-    def error(self, event, message_value):
-        raise NotImplementedException
-    
-    def on_kb_reached_last_offset(self, event, message_value):  # event, items):
-        logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, message_value))
-        print "on_kb_reached_last_offset [%s] %s" % (self.name, message_value)
-    
 
-    
-class ConfigMap():
-    
-    def kwargs_from_file(self, path):
-        cfg = ConfigParser.ConfigParser()            
-        if len(cfg.read(path)) == 0: 
-            raise ValueError, "Failed to open config file [%s]" % path 
-
-        kwargs = {}
-        for section in cfg.sections():
-            optval_list = map(lambda o: (o, cfg.get(section, o)), cfg.options(section)) 
-            for ov in optval_list:
-                try:
-                    
-                    kwargs[ov[0]] = eval(ov[1])
-                except:
-                    continue
-                
-        #logging.debug('ConfigMap: %s' % kwargs)
-        return kwargs
        
 class GatewayMessageListener(AbstractGatewayListener):   
     def __init__(self, name):
@@ -380,8 +122,8 @@ class GatewayMessageListener(AbstractGatewayListener):
         logging.info('GatewayMessageListener:%s. val->[%s]' % (event, message_value))  
 
 def test_client(kwargs):
-    contractTuples = [('HSI', 'FUT', 'HKFE', 'HKD', '20170330', 0, ''),
-                      ('USD', 'CASH', 'IDEALPRO', 'JPY', '', 0, ''),]
+    contractTuples = [('HSI', 'FUT', 'HKFE', 'HKD', '20170330', 0, '')]#,
+                      #('USD', 'CASH', 'IDEALPRO', 'JPY', '', 0, ''),]
                       
         
     print kwargs 
@@ -406,19 +148,28 @@ def test_client(kwargs):
         
 if __name__ == '__main__':
     
-    if len(sys.argv) != 2:
-        print("Usage: %s <config file>" % sys.argv[0])
-        exit(-1)    
-
+    usage = "usage: %prog [options]"
+    parser = OptionParser(usage=usage)
+    parser.add_option("-c", "--clear_offsets", action="store_true", dest="clear_offsets",
+                      help="delete all redis offsets used by this program")
+    parser.add_option("-f", "--config_file",
+                      action="store", dest="config_file", 
+                      help="path to the config file")
+    
+    (options, args) = parser.parse_args()
+    
+    kwargs = ConfigMap().kwargs_from_file(options.config_file)
+    for option, value in options.__dict__.iteritems():
+        
+        if value <> None:
+            kwargs[option] = value
 
 
-    cfg_path= sys.argv[1:]
-    kwargs = ConfigMap().kwargs_from_file(cfg_path)
-   
-      
     logconfig = kwargs['logconfig']
     logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
     logging.basicConfig(**logconfig)        
+
+    logging.debug('config settings: %s' % kwargs)
     
     
     test_client(kwargs)

+ 118 - 46
src/comms/ibgw/base_messaging.py

@@ -26,7 +26,7 @@ class Producer(threading.Thread):
     def run(self):
         try:
             producer = KafkaProducer(bootstrap_servers='localhost:9092')
-            topics = ['my-topic', 'my-topic2']
+            topics = ['my_topic', 'my_topic2']
             i = 0
             while True:
                 #today = datetime.date.today()
@@ -99,9 +99,11 @@ class BaseProducer(threading.Thread, Subscriber):
 class BaseConsumer(threading.Thread, Publisher):
     
     #KB_EVENT = "on_kb_event"
+    SLOW_CONSUMER_CHECK_EVERY = 50
+    SLOW_CONSUMER_QUALIFY_NUM = 500
     KB_REACHED_LAST_OFFSET = "on_kb_reached_last_offset"
     
-    #my_topics =  {'my-topic':{}, 'my-topic2':{}}    
+    #my_topics =  {'my_topic':{}, 'my_topic2':{}}    
 
     def __init__(self, group=None, target=None, name=None,
                  args=(), kwargs=None, verbose=None):
@@ -116,9 +118,10 @@ class BaseConsumer(threading.Thread, Publisher):
             redis_db
             group_id
             consumer_id: name 
-            topics: a list of topic strings
-            session_timeout_ms: 
+            topics- a list of topic strings
+            session_timeout_ms 
             consumer_timeout_ms
+            seek_to_end- a list of topics that only want the latest message
         """
         
         
@@ -127,6 +130,10 @@ class BaseConsumer(threading.Thread, Publisher):
         self.args = args
         self.kwargs = kwargs
         self.rs = Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
+        try:
+            self.kwargs['seek_to_end']
+        except KeyError:
+            self.kwargs['seek_to_end'] = []
         self.my_topics = {}
         for t in self.kwargs['topics']:
             self.my_topics[t]= {} 
@@ -136,18 +143,14 @@ class BaseConsumer(threading.Thread, Publisher):
         return
     
     
-    # no use: doesn't work
-    def seek_to_last_read_offset(self, consumer):
-        for t in self.my_topics.keys():
-            po = json.loads(self.rs.get(t))
-            consumer.seek(TopicPartition(topic=t, partition=po['partition']), po['offset'])
+
     
     """
      each consumer has its own set of topics offsets stored in redis
-     for consumer A and consumer B (with different group_ids) subscribing to the same my-topic, each 
+     for consumer A and consumer B (with different group_ids) subscribing to the same my_topic, each 
      each of them will need to keep track of its own offsets
      to make the redis key unique by consumer, the key is created by topic + '@' + consumer name
-     example: my-topic@consumerA
+     example: my_topic@consumerA
      
      offsets = { topic-consumer_name:
                      {
@@ -180,7 +183,6 @@ class BaseConsumer(threading.Thread, Publisher):
             self.rs.delete(self.consumer_topic(t))
             
              
-        #raise NotImplementedException
     
     
     def persist_offsets(self, topic, partition, offset):
@@ -245,38 +247,74 @@ class BaseConsumer(threading.Thread, Publisher):
             
         consumer.subscribe(self.my_topics.keys())
 
-        
-        #consumer.seek_to_end(TopicPartition(topic='my-topic', partition=0))
+        #consumer.seek_to_end(TopicPartition(topic='my_topic', partition=0))
 
         self.done = False
+       
+            
         while self.done <> True:
             try:
                 message = consumer.next()
                 
-                #time.sleep(0.25)
-                if message.offset % 50 == 0:
-                    logging.info( "[%s]:highwater:%d offset:%d part:%d <%s>" % (self.name, consumer.highwater(TopicPartition(message.topic, message.partition)),
-                                                                             message.offset, message.partition, message.value))
-                
-    #             for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
-    #                 print "t:%s %s" % (t,  ','.join('p:%d, offset:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)) # consumer.position(TopicPartition(topic=t, partition=p)))
-                
-                # if this is the first time the consumer is run
-                # it contains no offsets in the redis map, so it has 
-                # 0 elements in the map,
-                # then insert a new offset in redis and populate
-                # the local my_topics dict
                 
-                if len(self.my_topics[message.topic]) == 0:
+                # the next if block is there to serve information purpose only
+                # it may be useful to detect slow consumer situation
+                if message.offset % BaseConsumer.SLOW_CONSUMER_QUALIFY_NUM == 0:
+                    highwater = consumer.highwater(TopicPartition(message.topic, message.partition))
+                    logging.info( "BaseConsumer [%s]:highwater:%d offset:%d part:%d <%s>" %  (self.name, highwater, message.offset, message.partition, message.value))
+                    
+                    if highwater - message.offset >= BaseConsumer.SLOW_CONSUMER_QUALIFY_NUM:
+                        logging.warn("BaseConsumer:run Slow consumer detected! current: %d, highwater:%d, gap:%d" %
+                                        (message.offset, highwater, highwater - message.offset))
+                # the next block is designed to handle the first time the
+                # consumer encounters a topic partition it hasn't seen yet
+                try:
+                    # the try catch block ensures that on the first encounter of a topic/partition
+                    # the except block is executed once and only once, thereafter since the 
+                    # dictionary object keys are assigned the exception will never
+                    # be caught again
+                    # 
+                    # the try catch is supposed to be faster than an if-else block...
+                    # 
+                    # the below statement has no meaning, it is there merely to ensure that 
+                    # the block fails on the first run
+                    self.my_topics[message.topic][str(message.partition)] 
+                except KeyError:
+
+                    highwater = consumer.highwater(TopicPartition(message.topic, message.partition))
+                    gap = highwater - message.offset
+                    logging.info( "*** On first iteration: [Topic:%s:Part:%d:Offset:%d]: Number of messages lagging behind= %d. Highwater:%d" 
+                                  % (message.topic, message.partition, message.offset, gap, highwater))
+                                                                                                
+                    for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
+                        logging.info ("*** On first iteration: Topic Partition Table: topic:[%s] %s" % (t.rjust(20),  
+                                                         ','.join('partition:%d, offset:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)
+                                                         ))
+                        
                     self.persist_offsets(message.topic, message.partition, message.offset)
                     self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))
-                    #continue
+                        
+                        
+                    if message.topic in self.kwargs['seek_to_end']:
+                        
+                        # if there is no gap                          
+                        if gap == 1:
+                            # the message is valid for dispatching and not to be skipped
+                            self.dispatch(message.topic, self.enrich_message(message))
+                            logging.debug('*** On first iteration: Gap=%d Dispatch this valid message to the listener <%s>' % (gap, message.value))
+                        else: # gap exists
+                            logging.info("*** On first iteration: [Topic:%s:Part:%d:Offset:%d]: Gap:%d Attempting to seek to latest message ..." 
+                                         % (message.topic, message.partition, message.offset, gap))                            
+                            consumer.seek_to_end((TopicPartition(topic=message.topic, partition= message.partition)))
+                            # skip this message from dispatching to listeners
+                            continue
+                
                     
                 """
                     the message.value received from kafaproducer is expected to contain 
                     plain text encoded as a json string
                     the content of message.value is not altered. it's content is stored in a dict object 
-                    with key = 'value' along with additional kafa metadata
+                    with key = 'value' and enriched with additional kafka metadata
                                     
                      
                     it is the subscriber's job to interpret the content stored in the 'value' key. Typically
@@ -293,6 +331,7 @@ class BaseConsumer(threading.Thread, Publisher):
                     # both saved value in redis and current offset are both 0
                     if self.my_topics[message.topic][str(message.partition)] == message.offset and message.offset <> 0:
                         self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.enrich_message(message))
+                        self.dispatch(message.topic, self.enrich_message(message))
                         logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
                     else:
                         self.persist_offsets(message.topic, message.partition, message.offset)
@@ -325,10 +364,22 @@ class SimpleMessageListener(BaseMessageListener):
     
     def __init__(self, name):
         BaseMessageListener.__init__(self, name)
+        self.cnt_my_topic = 0
+        self.cnt_my_topic2 = 0
     
 #     def on_kb_event(self, param):
 #         print "on_kb_event [%s] %s" % (self.name, param)
-    
+    def my_topic(self, param):
+        if self.cnt_my_topic % 50 == 0:
+            print "SimpleMessageListener:my_topic. %s" % param
+            self.cnt_my_topic += 1
+
+    def my_topic2(self, param):
+        if self.cnt_my_topic2 % 50 == 0:
+            print "SimpleMessageListener:my_topic2. %s" % param
+            self.cnt_my_topic2 += 1
+
+        
     def on_kb_reached_last_offset(self, param):
         print "on_kb_reached_last_offset [%s] %s" % (self.name, param)
 
@@ -519,17 +570,25 @@ def test_prosumer2(mode):
                 pB.join()    
     
     
-
     
 
 class TestProducer(BaseProducer):
     pass
 
 def test_base_proconsumer(mode):
-
+    '''
+        This example demonstrates
+        
+        1) use of consumer_timeout_ms to break out from the consumer.next loop
+        2) how to trap ctrl-c and break out of the running threads
+        3) using Queue to store calls to producer.send_message
+        4) using redis to store the consumer last processed offsets
+        5) use of try-catch block to implement seek_to_latest offset
+        6) inherit and implement MessageListener to subscribe messages dispatched by the consumer 
+    '''
     if mode == 'P':
         #Producer().start()
-        topics = ['my-topic', 'my-topic2']
+        topics = ['my_topic', 'my_topic2']
         tp = TestProducer(name = 'testproducer', kwargs={
                                              'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                              'topics': topics})
@@ -538,26 +597,39 @@ def test_base_proconsumer(mode):
         while True:
             
             #today = datetime.date.today()
-            s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
-            logging.info(s)
-            tp.send_message(topics[i%2], s)
-            
-            time.sleep(.45)
-            i=i+1
-        
+            try:
+                s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
+                logging.info(s)
+                tp.send_message(topics[i%2], s)
+                
+                time.sleep(.25)
+                i=i+1
+            except (KeyboardInterrupt, SystemExit):
+                logging.error('caught user interrupt')
+                tp.set_stop()
+                tp.join()
+                sys.exit(-1)
+                    
         
     else:
         
         bc = BaseConsumer(name='bc', kwargs={'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                              'bootstrap_host':'localhost', 'bootstrap_port':9092,
-                                             'group_id':'gid', 'session_timeout_ms':10000, 'topics': ['my-topic', 'my-topic2']})
+                                             'group_id':'gid', 'session_timeout_ms':10000, 'topics': ['my_topic', 'my_topic2'],
+                                             'clear_offsets': True, 'consumer_timeout_ms':1000,
+                                             # uncomment the next line to process messages from since the program was last shut down
+                                             # if seek_to_end is present, for the topic specified the consumer will begin
+                                             # sending the latest message to the listener
+                                             # note that the list only specifies my_topic to receive the latest
+                                             # but not my_topic2. Observe the different behavior by checking the message offset 
+                                             # in the program log
+                                             'seek_to_end': ['my_topic'],            
+                                             }) 
         #bml = BaseMessageListener('bml')
         sml = SimpleMessageListener('simple')
-        #bc.register(BaseConsumer.KB_EVENT, bml)
-        #bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, bml)
-        bc.register(BaseConsumer.KB_EVENT, sml)
         bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, sml)
-        
+        bc.register('my_topic', sml)
+        bc.register('my_topic2', sml)
         bc.start()
 
 

+ 173 - 114
src/comms/ibgw/subscription_manager.py

@@ -1,48 +1,136 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 import logging
+from time import strftime
+import json
 from misc2.helpers import ContractHelper
 from ib.ext.Contract import Contract
 from comms.ibgw.base_messaging import BaseMessageListener
+from comms.ibgw.tws_event_handler import TWS_event_handler
 
 
 
 class SubscriptionManager(BaseMessageListener):
     
     
-    persist_f = None
+    TICKER_GAP = 1000
     
-    def __init__(self, name, tws_gateway):
+    def __init__(self, name, tws_connection, producer, rs_conn, subscription_key):
         BaseMessageListener.__init__(self, name)
-        self.tws_connect = tws_gateway.tws_connection
-        self.producer = tws_gateway.gw_message_handler
-        self.handle = []    
+        
+        self.tws_connect = tws_connection
+        self.producer = producer
+        self.rs = rs_conn
+        self.subscription_key = subscription_key
+
+        #self.handle = []
         # contract key map to contract ID (index of the handle array)
-        self.tickerId = {}
-   
+        #self.tickerId = {}
+        '''
+            idContractMap has 3 keys
+            
+            next_id keeps track of the next_id to use when subscribing market data from TWS
+            id_contract and contract_id are dict and reverse dict that store the index of id 
+            to contract and vice versa
+            
+            id_contract: {<int>, <Contract>}
+            contract_id: {<kvs_contract>, <int>}
+            
+        '''
+        self.idContractMap ={'next_id': 0, 'id_contract':{},'contract_id':{}}       
+        # flag to indicate whether to save changes when persist_subscriptions is called       
+        self.is_dirty = False
+
+        self.load_subscriptions()
+        
+            
+
         
         
-    def load_subscription(self, contracts):
-        for c in contracts:
-            #print self.tws_connect.isConnected() 
-            print '%s' % (ContractHelper.printContract(c))
-            self.reqMktData('internal', {'value': ContractHelper.contract2kvstring(c)}) 
+    def load_subscriptions(self):
+        '''
+            the function retrieves a json string representation of a list of {id:contracts}
+            from redis.
+            next, get rid of the contracts that are expired and of type of either fut or opt
+            next, rebuild the internal dict idContractMap['id_contract'] and reverse dict
+            idContractMap['contract_id']
+            gather all the ids in the newly populated dict (which may contain holes due to
+            expired contracts and thus not necessarily a sequence), determine the max id
+            add 1 to it to form the next_id
+            request snapshot and fresh market data from the TWS gateway
             
+        '''
+        def is_outstanding(ic):
+            
+            c = ic[1]
+            today = strftime('%Y%m%d') 
+            if c.m_expiry < today and (c.m_secType == 'OPT' or c.m_secType == 'FUT'):
+                logging.info('initialize_subscription_mgr: ignoring expired contract %s%s%s' % (c.m_expiry, c.m_strike, c.m_right))
+                return False
+            return True
+            
+        # retrieve the id-contract list from db
+        # remap the list by instantiating the string to object
+        # get rid of the already expired contracts
+        saved_iclist = self.get_id_contracts(db=True)
+       
+        if saved_iclist:
+            
+            ic_list= filter(lambda ic:is_outstanding, saved_iclist)
+            # rebuild the internal data map
+            for ic in ic_list:
+                self.idContractMap['id_contract'][ic[0]] = ic[1]
+                self.idContractMap['contract_id'][ContractHelper.makeRedisKeyEx(ic[1])] = ic[0]        
+            
+            # derive the next id by finding the max id
+            max_id = reduce(lambda x,y: max(x,y), self.idContractMap['id_contract'].keys())
+            self.idContractMap['next_id'] = max_id + 1
+            logging.info('SubscriptionManager:load_subscription. the next_id is set to: %d' % (self.idContractMap['next_id']))
+            self.dump()
+            # subscribe market data: the first call requests a one-off snapshot,
+            # the second subscribes for streaming updates
+            logging.info('SubscriptionManager:load_subscription. request market data for: %s' % (ic_list))
+            map(lambda ic: self.request_market_data(ic[0], ic[1], snapshot=True), ic_list)
+            map(lambda ic: self.request_market_data(ic[0], ic[1], snapshot=False), ic_list) 
+            
+        else:
+            logging.warn('SubscriptionManager:load_subscription. No saved id:contracts found in redis.')
+             
+        logging.info('SubscriptionManager:load_subscription. Complete populating stored map into idContract dict.')
+    
+    def request_market_data(self, id, contract, snapshot=False):
+        if snapshot:
+            # the call to TWS will return a snapshot followed
+            # by the subscription being cancelled. Add 1000 to avoid clashing 
+            # with other subscription ids.  
+            self.tws_connect.reqMktData(id + TWS_event_handler.TICKER_GAP, contract, '', True)
+        else:
+            self.tws_connect.reqMktData(id, contract, '', False)
             
-        self.dump()
     
     # returns -1 if not found, else the key id (which could be a zero value)
     def is_subscribed(self, contract):
-        #print self.conId.keys()
-        ckey = ContractHelper.makeRedisKeyEx(contract) 
-        if ckey not in self.tickerId.keys():
+
+        
+        ckey = ContractHelper.makeRedisKeyEx(contract)
+        logging.debug('is_subscribed %s' % ckey)
+        try:
+            return self.idContractMap['contract_id'][ckey]
+        except KeyError:
+            logging.debug('is_subscribed: key not found %s' % ckey)
             return -1
-        else:
-            # note that 0 can be a key 
-            # be careful when checking the return values
-            # check for true false instead of using implicit comparsion
-            return self.tickerId[ckey]
-    
+
+    def add_subscription(self, contract):
+        #
+        # structure of idContractMap = {'next_id': 0, 'id_contract':{}, 'contract_id':{}}
+        #
+        id = self.idContractMap['next_id']
+        self.idContractMap['id_contract'][id] = contract
+        logging.debug('add_subscription %s' % ContractHelper.makeRedisKeyEx(contract))
+        self.idContractMap['contract_id'][ContractHelper.makeRedisKeyEx(contract)] = id        
+        self.idContractMap['next_id'] = id + 1
+  
+        return self.idContractMap['next_id']
 
             
     def reqMktData(self, event, message):
@@ -50,123 +138,94 @@ class SubscriptionManager(BaseMessageListener):
         contract = ContractHelper.kvstring2object(message['value'], Contract)
         #logging.info('SubscriptionManager: reqMktData')
   
-        def add_subscription(contract):
-            self.handle.append(contract)
-            newId = len(self.handle) - 1
-            self.tickerId[ContractHelper.makeRedisKeyEx(contract)] = newId 
-             
-            return newId
-  
         id = self.is_subscribed(contract)
         if id == -1: # not found
-            id = add_subscription(contract)
-
+            
+            id = self.add_subscription(contract)
             #
             # the conId must be set to zero when calling TWS reqMktData
             # otherwise TWS will fail to subscribe the contract
-            
-            self.tws_connect.reqMktData(id, contract, '', False) 
-            
-            
-                   
-            if self.persist_f:
-                logging.debug('SubscriptionManager reqMktData: trigger callback')
-                self.persist_f(self.handle)
+            contract.m_conId = 0
+            self.request_market_data(id, contract, False) 
+            self.is_dirty = True
                 
-            logging.info('SubscriptionManager: reqMktData. Requesting market data, id = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
+            logging.info('SubscriptionManager:reqMktData. Requesting market data, id = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         
         else:    
-            self.tws_connect.reqMktData(1000 + id, contract, '', True)
-            logging.info('SubscriptionManager: reqMktData: contract already subscribed. Request snapshot = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
+            self.request_market_data(id, contract, True)
+            logging.info('SubscriptionManager:reqMktData. contract already subscribed. Request snapshot = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         #self.dump()
 
         #
         # instruct gateway to broadcast new id has been assigned to a new contract
         #
         self.producer.send_message('gw_subscription_changed', self.producer.message_dumps({id: ContractHelper.object2kvstring(contract)}))
-        #>>>self.parent.gw_notify_subscription_changed({id: ContractHelper.object2kvstring(contract)})
-        logging.info('SubscriptionManager reqMktData: gw_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
+        logging.info('SubscriptionManager:reqMktData. Publish gw_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         
-    """
-       Client requests to TWS_gateway
-    """
-    def gw_req_subscriptions(self, event, message):
-        
-        #subm = map(lambda i: ContractHelper.contract2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
-        #subm = map(lambda i: ContractHelper.object2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
-        subm = map(lambda i: (i, ContractHelper.object2kvstring(self.handle[i])),
-                    range(len(self.handle)))
-        
-        
-        if subm:
-            
-            logging.info('SubscriptionManager:gw_req_subscriptions-------\n%s' % ''.join('\n%s:%s' % (str(v[0]).rjust(6), v[1]) for v in subm))
-            self.producer.send_message('gw_subscriptions', self.producer.message_dumps({'subscriptions': subm}))
         
         
 
     # use only after a broken connection is restored
-    # to re request market data 
     def force_resubscription(self):
-        # starting from index 1 of the contract list, call  reqmktdata, and format the result into a list of tuples
-        for i in range(1, len(self.handle)):
-            self.tws_connect.reqMktData(i, self.handle[i], '', False)
-            logging.info('force_resubscription: %s' % ContractHelper.printContract(self.handle[i]))
-       
+       self.load_subscriptions()
             
-    def itemAt(self, id):
-        if id > 0 and id < len(self.handle):
-            return self.handle[id]
-        return -1
+
+    # return id:contract object
+    def get_id_contracts(self, db=False):
+        if db:
+            try:
+                id_contracts = json.loads(self.rs.get(self.subscription_key))
+                
+                def utf2asc(x):
+                    return x if isinstance(x, unicode) else x
+                
+                return map(lambda x: (x[0], ContractHelper.kvstring2contract(utf2asc(x[1]))), id_contracts)
+            except TypeError:
+                logging.error('SubscriptionManager:get_id_contracts. Exception when trying to get id_contracts from redis ***')
+                return None
+        else:
+            return map(lambda x: (x[0], x[1]), 
+                                list(self.idContractMap['id_contract'].iteritems()))
+
+    # return id:contract_strings
+    def get_id_kvs_contracts(self, db):
+        return map(lambda x:(x[0], ContractHelper.contract2kvstring(x[1])), self.get_id_contracts(db))
+    
+    def persist_subscriptions(self):
+         
+
+        if self.is_dirty:
+            # for each id:contract pair in idContractMap['id_contract'] dict, map to a list of (id, kvs_contract) values
+            ic = json.dumps(self.get_id_kvs_contracts(db=False))
+            self.rs.set(self.subscription_key, ic)
+            self.is_dirty = False
+
+            logging.info('Tws_gateway:persist_subscriptions. updating subscription table to redis store %s' % ic)
+            self.dump()
 
     def dump(self):
-        
-        logging.info('subscription manager table:---------------------')
-        logging.info(''.join('%d: {%s},\n' % (i,  ''.join('%s:%s, ' % (k, v) for k, v in self.handle[i].__dict__.iteritems() )\
-                                     if self.handle[i] <> None else ''          ) for i in range(len(self.handle)))\
-                     )
-        
-        #logging.info( ''.join('%s[%d],\n' % (k, v) for k, v in self.conId.iteritems()))
-        logging.info( 'Number of instruments subscribed: %d' % len(self.handle))
+
+        logging.info('subscription manager table:---------------------\n')
+        logging.info(''.join ('\n[%s]:[%s]' % (str(ic[0]).rjust(4), ic[1]) for ic in self.get_id_kvs_contracts(db=False)))
+        logging.info(''.join ('\n[%s]:[%d]' % (k.rjust(20), self.idContractMap['contract_id'][k]) 
+                               for k in sorted(self.idContractMap['contract_id'])))       
+        logging.info( 'Number of instruments subscribed: %d' % self.idContractMap['next_id'])
         logging.info( '------------------------------------------------')
+
+
+    """
+       Client requests to TWS_gateway
+    """
+    def gw_req_subscriptions(self, event, message):
         
-    def register_persistence_callback(self, func):
-        logging.info('subscription manager: registering callback')
-        self.persist_f = func
+        ic = self.get_id_kvs_contracts(db=False)
+        if ic:
+             
+            logging.info('SubscriptionManager:gw_req_subscriptions-------\n%s' % ic)
+            self.producer.send_message('gw_subscriptions', self.producer.message_dumps({'subscriptions': ic}))
         
+       
 
 
-def test_subscription():        
-    s = SubscriptionManager()
-    contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151029', 0, '')
-    c = ContractHelper.makeContract(contractTuple)   
-    print s.is_subscribed(c)
-    print s.add_subscription(c)
-    print s.is_subscribed(c)
-    s.dump()
-    
-    fr = open('/home/larry-13.04/workspace/finopt/data/subscription-hsio.txt')
-    for l in fr.readlines():
-        if l[0] <> '#':
-             
-            s.add_subscription(ContractHelper.makeContract(tuple([t for t in l.strip('\n').split(',')])))    
-    fr.close()
-    s.dump()
-    
-    fr = open('/home/larry-13.04/workspace/finopt/data/subscription-hsio.txt')
-    for l in fr.readlines():
-        if l[0] <> '#':
-             
-            print s.add_subscription(ContractHelper.makeContract(tuple([t for t in l.strip('\n').split(',')])))    
-    s.dump()
-    contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151127', 0, '')
-    c = ContractHelper.makeContract(contractTuple)   
-    print s.is_subscribed(c)
-    print s.add_subscription(c)
-    print s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c))) 
-    
-    print 'test itemAt:'
-    contractTuple = ('HSI', 'OPT', 'HKFE', 'HKD', '20151127', 21400, 'C')
-    c = ContractHelper.makeContract(contractTuple)
-    print s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c)))
-    
+
+    

+ 40 - 60
src/comms/ibgw/tws_gateway.py

@@ -4,14 +4,14 @@
 import sys
 import copy
 from time import sleep, strftime
-import ConfigParser
 import logging
 import json
 
 from ib.ext.Contract import Contract
 from ib.ext.EClientSocket import EClientSocket
 
-from misc2.helpers import ContractHelper
+from misc2.helpers import ContractHelper, ConfigMap
+from optparse import OptionParser
 from comms.ibgw.base_messaging import Prosumer
 from comms.ibgw.tws_event_handler import TWS_event_handler
 from comms.ibgw.client_request_handler import ClientRequestHandler
@@ -119,39 +119,13 @@ class TWS_gateway():
 
     def initialize_subscription_mgr(self):
         
-        self.contract_subscription_mgr = SubscriptionManager(self, self)
-        self.contract_subscription_mgr.register_persistence_callback(self.persist_subscriptions)
         
+        self.contract_subscription_mgr = SubscriptionManager(self.kwargs['name'], self.tws_connection, 
+                                                             self.gw_message_handler, 
+                                                             self.get_redis_conn(), self.kwargs['subscription_manager.subscriptions.redis_key'])
         
-        key = self.kwargs["subscription_manager.subscriptions.redis_key"]
-        if self.rs.get(key):
-            #contracts = map(lambda x: ContractHelper.kvstring2contract(x), json.loads(self.rs.get(key)))
-            
-            def is_outstanding(c):
-                
-                today = strftime('%Y%m%d') 
-                if c.m_expiry < today:
-                    logging.info('initialize_subscription_mgr: ignoring expired contract %s%s%s' % (c.m_expiry, c.m_strike, c.m_right))
-                    return False
-                return True
-            
-            contracts = filter(lambda x: is_outstanding(x), 
-                               map(lambda x: ContractHelper.kvstring2object(x, Contract), json.loads(self.rs.get(key))))
-            
-            
-            
-            
-            self.contract_subscription_mgr.load_subscription(contracts)
         
 
-    def persist_subscriptions(self, contracts):
-         
-        key = self.kwargs["subscription_manager.subscriptions.redis_key"]
-        #cs = json.dumps(map(lambda x: ContractHelper.contract2kvstring(x) if x <> None else None, contracts))
-        cs = json.dumps(map(lambda x: ContractHelper.object2kvstring(x) if x <> None else None, contracts))
-        logging.debug('Tws_gateway: updating subscription table to redis store %s' % cs)
-        self.rs.set(key, cs)
-
 
     def initialize_redis(self):
 
@@ -164,6 +138,8 @@ class TWS_gateway():
             logging.error('aborting...')
             sys.exit(-1)
             
+    def get_redis_conn(self):
+        return self.rs
 
     def connect_tws(self):
         if type(self.kwargs['tws_app_id']) <> int:
@@ -207,13 +183,26 @@ class TWS_gateway():
         finally:
             self.tlock.release()          
         
+        
+    
+    def persist_subscription_table(self):
+        self.pcounter = (self.pcounter + 1) % 10
+        if (self.pcounter >= 8):
+            self.contract_subscription_mgr.persist_subscriptions()
+           
+        
 
     def main_loop(self):
         try:
             logging.info('TWS_gateway:main_loop ***** accepting console input...')
+            
+            
+            self.pcounter = 0
             while True: 
                 
-                sleep(.45)
+                sleep(.5)
+                self.persist_subscription_table()
+                
                 
         except (KeyboardInterrupt, SystemExit):
                 logging.error('TWS_gateway: caught user interrupt. Shutting down...')
@@ -225,43 +214,34 @@ class TWS_gateway():
 
 
     
-class ConfigMap():
-    
-    def kwargs_from_file(self, path):
-        cfg = ConfigParser.ConfigParser()            
-        if len(cfg.read(path)) == 0: 
-            raise ValueError, "Failed to open config file [%s]" % path 
-
-        kwargs = {}
-        for section in cfg.sections():
-            optval_list = map(lambda o: (o, cfg.get(section, o)), cfg.options(section)) 
-            for ov in optval_list:
-                try:
-                    
-                    kwargs[ov[0]] = eval(ov[1])
-                except:
-                    continue
-                
-        #logging.debug('ConfigMap: %s' % kwargs)
-        return kwargs
+
         
     
 if __name__ == '__main__':
-    
-    if len(sys.argv) != 2:
-        print("Usage: %s <config file>" % sys.argv[0])
-        exit(-1)    
 
 
+    usage = "usage: %prog [options]"
+    parser = OptionParser(usage=usage)
+    parser.add_option("-c", "--clear_offsets", action="store_true", dest="clear_offsets",
+                      help="delete all redis offsets used by this program")
+    parser.add_option("-f", "--config_file",
+                      action="store", dest="config_file", 
+                      help="path to the config file")
+    
+    (options, args) = parser.parse_args()
+    
+    kwargs = ConfigMap().kwargs_from_file(options.config_file)
+    for option, value in options.__dict__.iteritems():
+        
+        if value <> None:
+            kwargs[option] = value
+
 
-    cfg_path= sys.argv[1:]
-    kwargs = ConfigMap().kwargs_from_file(cfg_path)
-   
-      
     logconfig = kwargs['logconfig']
     logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
     logging.basicConfig(**logconfig)        
-    
+
+    logging.debug('config settings: %s' % kwargs)
     
     app = TWS_gateway(kwargs)
     

+ 2 - 1
src/config/tws_client_lib.cfg

@@ -17,5 +17,6 @@ redis_db: 0
 group_id: 'TWS_CLI'
 session_timeout_ms: 10000
 topics:['tickSize', 'tickPrice', 'error']
+seek_to_end:['tickSize', 'tickPrice']
 logconfig: { 'filemode': 'w', 'filename': '/tmp/tws_client_lib.log',  'level': logging.INFO}
-#logconfig: {'level': logging.INFO}
+#logconfig: {'level': logging.INFO}

+ 20 - 1
src/misc2/helpers.py

@@ -4,6 +4,7 @@
 import json
 import logging
 import threading
+import ConfigParser
 from ib.ext.Contract import Contract
 from ib.ext.Order import Order
 from ib.ext.ExecutionFilter import ExecutionFilter
@@ -194,4 +195,22 @@ def dict2str(dict):
     return '{'  + ', '.join('"%s" : %s' % (k, '"%s"' % v if type(v) == str else v) for k, v in dict.iteritems()) + '}'   
     
 
-
+class ConfigMap():
+    
+    def kwargs_from_file(self, path):
+        cfg = ConfigParser.ConfigParser()            
+        if len(cfg.read(path)) == 0: 
+            raise ValueError, "Failed to open config file [%s]" % path 
+
+        kwargs = {}
+        for section in cfg.sections():
+            optval_list = map(lambda o: (o, cfg.get(section, o)), cfg.options(section)) 
+            for ov in optval_list:
+                try:
+                    
+                    kwargs[ov[0]] = eval(ov[1])
+                except:
+                    continue
+                
+        #logging.debug('ConfigMap: %s' % kwargs)
+        return kwargs

+ 1 - 1
src/sh/start_twscli.sh

@@ -9,4 +9,4 @@ else
 	FINOPT_HOME=~/l1304/workspace/finopt-ironfly/finopt/src
 fi
 export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
-python $FINOPT_HOME/comms/ibc/tws_client_lib.py $FINOPT_HOME/config/tws_client_lib.cfg 
+python $FINOPT_HOME/comms/ibc/tws_client_lib.py -f "$FINOPT_HOME/config/tws_client_lib.cfg" -c