esurfer 9 лет назад
Родитель
Сommit
80c13194aa

+ 15 - 2
src/comms/ibc/gw_ex1.py

@@ -5,7 +5,7 @@ import logging
 import json
 
 from ib.ext.Contract import Contract
-
+from optparse import OptionParser
 from misc2.helpers import ContractHelper
 from comms.ibgw.base_messaging import Prosumer
 from comms.tws_protocol_helper import TWS_Protocol
@@ -83,7 +83,20 @@ if __name__ == '__main__':
       'topics': ['position', 'positionEnd', 'gw_subscriptions', 'gw_subscription_changed']
       }
 
-   
+    usage = "usage: %prog [options]"
+    parser = OptionParser(usage=usage)
+    parser.add_option("-c", "--clear_offsets", action="store_true", dest="clear_offsets",
+                      help="delete all redis offsets used by this program")
+    parser.add_option("-g", "--group_id",
+                      action="store", dest="group_id", 
+                      help="assign group_id to this running instance")
+    
+    (options, args) = parser.parse_args()
+    for option, value in options.__dict__.iteritems():
+        if value <> None:
+            kwargs[option] = value
+            
+    #print kwargs    
       
     logconfig = kwargs['logconfig']
     logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    

+ 1 - 6
src/comms/ibgw/base_messaging.py

@@ -143,11 +143,7 @@ class BaseConsumer(threading.Thread, Publisher):
         return
     
     
-    # no use: doesn't work
-    def seek_to_last_read_offset(self, consumer):
-        for t in self.my_topics.keys():
-            po = json.loads(self.rs.get(t))
-            consumer.seek(TopicPartition(topic=t, partition=po['partition']), po['offset'])
+
     
     """
      each consumer has its own set of topics offsets stored in redis
@@ -187,7 +183,6 @@ class BaseConsumer(threading.Thread, Publisher):
             self.rs.delete(self.consumer_topic(t))
             
              
-        #raise NotImplementedException
     
     
     def persist_offsets(self, topic, partition, offset):

+ 46 - 49
src/comms/ibgw/subscription_manager.py

@@ -48,7 +48,18 @@ class SubscriptionManager(BaseMessageListener):
         
         
     def load_subscriptions(self):
-        
+        '''
+            the function retrieves a json string representation of a list of {id:contracts}
+            from redis.
+            next, get rid of the contracts that are expired and of type of either fut or opt
+            next, rebuild the internal dict idContractMap['id_contract'] and reverse dict
+            idContractMap['contract_id']
+            gather all the ids in the newly populated dict (which may contain holes due to
+            expired contracts and thus not necessarily a sequence), determine the max id
+            add 1 to it to form the next_id
+            request snapshot and fresh market data from the TWS gateway
+            
+        '''
         def is_outstanding(ic):
             
             c = ic[1]
@@ -62,13 +73,30 @@ class SubscriptionManager(BaseMessageListener):
         # remap the list by instantiating the string to object
         # get rid of the already expired contracts
         saved_iclist = self.get_id_contracts(db=True)
+       
         if saved_iclist:
-            ic_list= filter(lambda ic:is_outstanding, map(lambda ic: (ic[0], ContractHelper.kvstring2object(ic[1], Contract)), saved_iclist))
             
-            map(lambda ic: self.request_market_data(ic[0], ic[1], snapshot=False), ic_list) 
-            map(lambda ic: self.request_market_data(ic[0], ic[1], snapshot=True), ic_list)         
+            ic_list= filter(lambda ic:is_outstanding, saved_iclist)
+            # rebuild the internal data map
+            for ic in ic_list:
+                self.idContractMap['id_contract'][ic[0]] = ic[1]
+                self.idContractMap['contract_id'][ContractHelper.makeRedisKeyEx(ic[1])] = ic[0]        
+            
+            # derive the next id by finding the max id
+            max_id = reduce(lambda x,y: max(x,y), self.idContractMap['id_contract'].keys())
+            self.idContractMap['next_id'] = max_id + 1
+            logging.info('SubscriptionManager:load_subscription. the next_id is set to: %d' % (self.idContractMap['next_id']))
+            self.dump()
+            # subscribe market data, first call is normal subscription,
+            # first for snapshot, then subscribe for the latest
             logging.info('SubscriptionManager:load_subscription. request market data for: %s' % (ic_list))
-        
+            map(lambda ic: self.request_market_data(ic[0], ic[1], snapshot=True), ic_list)
+            map(lambda ic: self.request_market_data(ic[0], ic[1], snapshot=False), ic_list) 
+            
+        else:
+            logging.warn('SubscriptionManager:load_subscription. No saved id:contracts found in redis.')
+             
+        logging.info('SubscriptionManager:load_subscription. Complete populating stored map into idContract dict.')
     
     def request_market_data(self, id, contract, snapshot=False):
         if snapshot:
@@ -85,9 +113,11 @@ class SubscriptionManager(BaseMessageListener):
 
         
         ckey = ContractHelper.makeRedisKeyEx(contract)
+        logging.debug('is_subscribed %s' % ckey)
         try:
             return self.idContractMap['contract_id'][ckey]
         except KeyError:
+            logging.debug('is_subscribed: key not found %s' % ckey)
             return -1
 
     def add_subscription(self, contract):
@@ -96,7 +126,8 @@ class SubscriptionManager(BaseMessageListener):
         #
         id = self.idContractMap['next_id']
         self.idContractMap['id_contract'][id] = contract
-        self.idContractMap['contract_id']['ContractHelper.makeRedisKeyEx(contract)'] = id        
+        logging.debug('add_subscription %s' % ContractHelper.makeRedisKeyEx(contract))
+        self.idContractMap['contract_id'][ContractHelper.makeRedisKeyEx(contract)] = id        
         self.idContractMap['next_id'] = id + 1
   
         return self.idContractMap['next_id']
@@ -109,6 +140,7 @@ class SubscriptionManager(BaseMessageListener):
   
         id = self.is_subscribed(contract)
         if id == -1: # not found
+            
             id = self.add_subscription(contract)
             #
             # the conId must be set to zero when calling TWS reqMktData
@@ -143,9 +175,13 @@ class SubscriptionManager(BaseMessageListener):
         if db:
             try:
                 id_contracts = json.loads(self.rs.get(self.subscription_key))
-                return map(lambda x: (x[0], ContractHelper.kv2object(x[1], Contract), id_contracts))
+                
+                def utf2asc(x):
+                    return x if isinstance(x, unicode) else x
+                
+                return map(lambda x: (x[0], ContractHelper.kvstring2contract(utf2asc(x[1]))), id_contracts)
             except TypeError:
-                logging.info('SubscriptionManager:get_id_contracts. Exception when trying to get id_contracts from redis ***')
+                logging.error('SubscriptionManager:get_id_contracts. Exception when trying to get id_contracts from redis ***')
                 return None
         else:
             return map(lambda x: (x[0], x[1]), 
@@ -157,14 +193,7 @@ class SubscriptionManager(BaseMessageListener):
     
     def persist_subscriptions(self):
          
-#         if self.is_dirty:
-#             cs = json.dumps(map(lambda x: ContractHelper.object2kvstring(x) if x <> None else None, self.handle))
-#             logging.info('Tws_gateway:persist_subscriptions. updating subscription table to redis store %s' % cs)
-#             self.dump()
-#             self.rs.set(self.subscription_key, cs)
-#             self.is_dirty = False
-
-        #self.idContractMap ={'next_id': -1, 'id_contract':{}, 'contract_id':{}}
+
         if self.is_dirty:
             # for each id:contract pair in idContractMap['id_contract'] dict, map to a list of (id, kvs_contract) values
             ic = json.dumps(self.get_id_kvs_contracts(db=False))
@@ -198,37 +227,5 @@ class SubscriptionManager(BaseMessageListener):
        
 
 
-def test_subscription():        
-    s = SubscriptionManager()
-    contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151029', 0, '')
-    c = ContractHelper.makeContract(contractTuple)   
-    print s.is_subscribed(c)
-    print s.add_subscription(c)
-    print s.is_subscribed(c)
-    s.dump()
-    
-    fr = open('/home/larry-13.04/workspace/finopt/data/subscription-hsio.txt')
-    for l in fr.readlines():
-        if l[0] <> '#':
-             
-            s.add_subscription(ContractHelper.makeContract(tuple([t for t in l.strip('\n').split(',')])))    
-    fr.close()
-    s.dump()
-    
-    fr = open('/home/larry-13.04/workspace/finopt/data/subscription-hsio.txt')
-    for l in fr.readlines():
-        if l[0] <> '#':
-             
-            print s.add_subscription(ContractHelper.makeContract(tuple([t for t in l.strip('\n').split(',')])))    
-    s.dump()
-    contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151127', 0, '')
-    c = ContractHelper.makeContract(contractTuple)   
-    print s.is_subscribed(c)
-    print s.add_subscription(c)
-    print s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c))) 
-    
-    print 'test itemAt:'
-    contractTuple = ('HSI', 'OPT', 'HKFE', 'HKD', '20151127', 21400, 'C')
-    c = ContractHelper.makeContract(contractTuple)
-    print s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c)))
+
     

+ 1 - 0
src/config/tws_client_lib.cfg

@@ -17,5 +17,6 @@ redis_db: 0
 group_id: 'TWS_CLI'
 session_timeout_ms: 10000
 topics:['tickSize', 'tickPrice', 'error']
+seek_to_end:['tickSize', 'tickPrice']
 logconfig: { 'filemode': 'w', 'filename': '/tmp/tws_client_lib.log',  'level': logging.INFO}
 #logconfig: {'level': logging.INFO}