Parcourir la source

alpha stable version of tws client lib

esurfer il y a 9 ans
Parent
commit
9dac9f9a40

+ 251 - 0
src/comms/ibc/base_client_messaging.py

@@ -0,0 +1,251 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import logging
+
+
+from ib.ext.Contract import Contract
+from misc2.helpers import ContractHelper, ExecutionFilterHelper, OrderHelper
+from comms.ibgw.base_messaging import  BaseMessageListener
+from misc2.observer import NotImplementedException
+         
+
+class GatewayCommandWrapper():
+
+    def __init__(self, producer):
+        self.producer = producer
+  
+    def reqOpenOrders(self):
+        self.producer.send_message('reqOpenOrders', '')
+    
+    def reqIds(self):
+        self.producer.send_message('reqIds', '')
+    
+    def reqNewsBulletins(self):
+        logging.error('reqNewsBulletins: NOT IMPLEMENTED')
+    
+    def cancelNewsBulletins(self):
+        logging.error('cancelNewsBulletins: NOT IMPLEMENTED')
+    
+    def setServerLogLevel(self):
+        logging.error('setServerLogLevel: NOT IMPLEMENTED')
+  
+    def reqAutoOpenOrders(self):
+        logging.error('reqAutoOpenOrders: NOT IMPLEMENTED')
+    
+    def reqAllOpenOrders(self):
+        logging.error('reqAllOpenOrders: NOT IMPLEMENTED')
+    
+    def reqManagedAccts(self):
+        logging.error('reqManagedAccts: NOT IMPLEMENTED')
+    
+    def requestFA(self):
+        logging.error('requestFA: NOT IMPLEMENTED')
+    
+    def reqPositions(self):
+        self.producer.send_message('reqPositions', '')
+        
+    def reqHistoricalData(self):
+        logging.error('reqHistoricalData: NOT IMPLEMENTED')
+        
+    def reqAccountUpdates(self):
+        self.producer.send_message('reqAccountUpdates', '1')
+
+    def reqExecutions(self, exec_filter=None):
+        self.producer.send_message('reqExecutions', ExecutionFilterHelper.object2kvstring(exec_filter) if exec_filter <> None else '')
+
+    def reqMktData(self, contract):
+        self.producer.send_message('reqMktData', ContractHelper.object2kvstring(contract))
+        
+    def reqAccountSummary(self, reqId, group, tags):
+        self.producer.send_message('reqAccountSummary', self.producer.message_dumps([reqId, group, tags]))
+    
+    def placeOrder(self, id, contract, order):
+        self.producer.send_message('placeOrder', 
+                                   self.producer.message_dumps([id, ContractHelper.contract2kvstring(contract), OrderHelper.object2kvstring(order)]))
+    
+        
+
+    def gw_req_subscriptions(self):
+        
+        self.producer.send_message('gw_req_subscriptions', self.producer.message_dumps(None))
+        
+        
+        
+    
+class AbstractGatewayListener(BaseMessageListener):
+    
+    def __init__(self, name):
+        BaseMessageListener.__init__(self, name)
+        
+    
+    def tickPrice(self, event, message_value):  # tickerId, field, price, canAutoExecute):
+        """ generated source for method tickPrice """
+        raise NotImplementedException
+   
+    def tickSize(self, event, message_value):  # tickerId, field, size):
+        """ generated source for method tickSize """
+        raise NotImplementedException
+   
+    def tickOptionComputation(self, event, message_value):  # tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
+        """ generated source for method tickOptionComputation """
+        raise NotImplementedException
+   
+    def tickGeneric(self, event, message_value):  # tickerId, tickType, value):
+        """ generated source for method tickGeneric """
+        raise NotImplementedException
+   
+    def tickString(self, event, message_value):  # tickerId, tickType, value):
+        """ generated source for method tickString """
+        raise NotImplementedException
+   
+    def tickEFP(self, event, message_value):  # tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
+        """ generated source for method tickEFP """
+        raise NotImplementedException
+   
+    def orderStatus(self, event, message_value):  # orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeld):
+        """ generated source for method orderStatus """
+        raise NotImplementedException
+   
+    def openOrder(self, event, message_value):  # orderId, contract, order, orderState):
+        """ generated source for method openOrder """
+        raise NotImplementedException
+   
+    def openOrderEnd(self, event, message_value):
+        """ generated source for method openOrderEnd """
+        raise NotImplementedException
+   
+    def updateAccountValue(self, event, message_value):  # key, value, currency, accountName):
+        """ generated source for method updateAccountValue """
+        raise NotImplementedException
+   
+    def updatePortfolio(self, event, message_value):  # contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
+        """ generated source for method updatePortfolio """
+        raise NotImplementedException
+   
+    def updateAccountTime(self, event, message_value):  # timeStamp):
+        """ generated source for method updateAccountTime """
+        raise NotImplementedException
+   
+    def accountDownloadEnd(self, event, message_value):  # accountName):
+        """ generated source for method accountDownloadEnd """
+        raise NotImplementedException
+   
+    def nextValidId(self, event, message_value):  # orderId):
+        """ generated source for method nextValidId """
+        raise NotImplementedException
+   
+    def contractDetails(self, event, message_value):  # reqId, contractDetails):
+        """ generated source for method contractDetails """
+        raise NotImplementedException
+   
+    def bondContractDetails(self, event, message_value):  # reqId, contractDetails):
+        """ generated source for method bondContractDetails """
+        raise NotImplementedException
+   
+    def contractDetailsEnd(self, event, message_value):  # reqId):
+        """ generated source for method contractDetailsEnd """
+        raise NotImplementedException
+   
+    def execDetails(self, event, message_value):  # reqId, contract, execution):
+        """ generated source for method execDetails """
+        raise NotImplementedException
+   
+    def execDetailsEnd(self, event, message_value):  # reqId):
+        """ generated source for method execDetailsEnd """
+        raise NotImplementedException
+   
+    def updateMktDepth(self, event, message_value):  # tickerId, position, operation, side, price, size):
+        """ generated source for method updateMktDepth """
+        raise NotImplementedException
+   
+    def updateMktDepthL2(self, event, message_value):  # tickerId, position, marketMaker, operation, side, price, size):
+        """ generated source for method updateMktDepthL2 """
+        raise NotImplementedException
+   
+    def updateNewsBulletin(self, event, message_value):  # msgId, msgType, message, origExchange):
+        """ generated source for method updateNewsBulletin """
+        raise NotImplementedException
+   
+    def managedAccounts(self, event, message_value):  # accountsList):
+        """ generated source for method managedAccounts """
+        raise NotImplementedException
+   
+    def receiveFA(self, event, message_value):  # faDataType, xml):
+        """ generated source for method receiveFA """
+        raise NotImplementedException
+   
+    def historicalData(self, event, message_value):  # reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
+        """ generated source for method historicalData """
+        raise NotImplementedException
+   
+    def scannerParameters(self, event, message_value):  # xml):
+        """ generated source for method scannerParameters """
+        raise NotImplementedException
+   
+    def scannerData(self, event, message_value):  # reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
+        """ generated source for method scannerData """
+        raise NotImplementedException
+   
+    def scannerDataEnd(self, event, message_value):  # reqId):
+        """ generated source for method scannerDataEnd """
+        raise NotImplementedException
+   
+    def realtimeBar(self, event, message_value):  # reqId, time, open, high, low, close, volume, wap, count):
+        """ generated source for method realtimeBar """
+        raise NotImplementedException
+   
+    def currentTime(self, event, message_value):  # time):
+        """ generated source for method currentTime """
+        raise NotImplementedException
+   
+    def fundamentalData(self, event, message_value):  # reqId, data):
+        """ generated source for method fundamentalData """
+        raise NotImplementedException
+   
+    def deltaNeutralValidation(self, event, message_value):  # reqId, underComp):
+        """ generated source for method deltaNeutralValidation """
+        raise NotImplementedException
+   
+    def tickSnapshotEnd(self, event, message_value):  # reqId):
+        """ generated source for method tickSnapshotEnd """
+        raise NotImplementedException
+   
+    def marketDataType(self, event, message_value):  # reqId, marketDataType):
+        """ generated source for method marketDataType """
+        raise NotImplementedException
+   
+    def commissionReport(self, event, message_value):  # commissionReport):
+        """ generated source for method commissionReport """
+        raise NotImplementedException
+   
+    def position(self, event, message_value):  # account, contract, pos, avgCost):
+        """ generated source for method position """
+        raise NotImplementedException
+   
+    def positionEnd(self, event, message_value):
+        """ generated source for method positionEnd """
+        raise NotImplementedException
+   
+    def accountSummary(self, event, message_value):  # reqId, account, tag, value, currency):
+        """ generated source for method accountSummary """
+        raise NotImplementedException
+   
+    def accountSummaryEnd(self, event, message_value):  # reqId):
+        """ generated source for method accountSummaryEnd """
+        raise NotImplementedException
+
+    def gw_subscription_changed(self, event, message_value):  # event, items):
+        raise NotImplementedException        
+#         logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, message_value))
+        
+    def gw_subscriptions(self, event, message_value):
+        raise NotImplementedException        
+      
+    def error(self, event, message_value):
+        raise NotImplementedException
+    
+    def on_kb_reached_last_offset(self, event, message_value):  # event, items):
+        logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, message_value))
+        print "on_kb_reached_last_offset [%s] %s" % (self.name, message_value)
+    
+        

+ 3 - 258
src/comms/ibc/tws_client_lib.py

@@ -4,80 +4,20 @@
 import sys
 import copy
 from time import sleep, strftime
-import ConfigParser
+
 import logging
 import json
 
 from ib.ext.Contract import Contract
 
-from misc2.helpers import ContractHelper, ExecutionFilterHelper, OrderHelper
+from misc2.helpers import ContractHelper, ExecutionFilterHelper, OrderHelper, ConfigMap
 from comms.ibgw.base_messaging import Prosumer, BaseMessageListener
 from comms.tws_protocol_helper import TWS_Protocol
 from misc2.observer import NotImplementedException
+from comms.ibc.base_client_messaging import GatewayCommandWrapper, AbstractGatewayListener
 import redis
          
          
-class GatewayCommandWrapper():
-
-    def __init__(self, producer):
-        self.producer = producer
-  
-    def reqOpenOrders(self):
-        self.producer.send_message('reqOpenOrders', '')
-    
-    def reqIds(self):
-        self.producer.send_message('reqIds', '')
-    
-    def reqNewsBulletins(self):
-        logging.error('reqNewsBulletins: NOT IMPLEMENTED')
-    
-    def cancelNewsBulletins(self):
-        logging.error('cancelNewsBulletins: NOT IMPLEMENTED')
-    
-    def setServerLogLevel(self):
-        logging.error('setServerLogLevel: NOT IMPLEMENTED')
-  
-    def reqAutoOpenOrders(self):
-        logging.error('reqAutoOpenOrders: NOT IMPLEMENTED')
-    
-    def reqAllOpenOrders(self):
-        logging.error('reqAllOpenOrders: NOT IMPLEMENTED')
-    
-    def reqManagedAccts(self):
-        logging.error('reqManagedAccts: NOT IMPLEMENTED')
-    
-    def requestFA(self):
-        logging.error('requestFA: NOT IMPLEMENTED')
-    
-    def reqPositions(self):
-        self.producer.send_message('reqPositions', '')
-        
-    def reqHistoricalData(self):
-        logging.error('reqHistoricalData: NOT IMPLEMENTED')
-        
-    def reqAccountUpdates(self):
-        self.producer.send_message('reqAccountUpdates', '1')
-
-    def reqExecutions(self, exec_filter=None):
-        self.producer.send_message('reqExecutions', ExecutionFilterHelper.object2kvstring(exec_filter) if exec_filter <> None else '')
-
-    def reqMktData(self, contract):
-        self.producer.send_message('reqMktData', ContractHelper.object2kvstring(contract))
-        
-    def reqAccountSummary(self, reqId, group, tags):
-        self.producer.send_message('reqAccountSummary', self.producer.message_dumps([reqId, group, tags]))
-    
-    def placeOrder(self, id, contract, order):
-        self.producer.send_message('placeOrder', 
-                                   self.producer.message_dumps([id, ContractHelper.contract2kvstring(contract), OrderHelper.object2kvstring(order)]))
-    
-        
-
-    def gw_req_subscriptions(self):
-        
-        self.producer.send_message('gw_req_subscriptions', self.producer.message_dumps(None))
-
-
          
 class TWS_client_manager(GatewayCommandWrapper):
 
@@ -167,204 +107,9 @@ class TWS_client_manager(GatewayCommandWrapper):
         
         
 
-    
-class AbstractGatewayListener(BaseMessageListener):
-    
-    def __init__(self, name):
-        BaseMessageListener.__init__(self, name)
-        
-    
-    def tickPrice(self, event, message_value):  # tickerId, field, price, canAutoExecute):
-        """ generated source for method tickPrice """
-        raise NotImplementedException
-   
-    def tickSize(self, event, message_value):  # tickerId, field, size):
-        """ generated source for method tickSize """
-        raise NotImplementedException
-   
-    def tickOptionComputation(self, event, message_value):  # tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
-        """ generated source for method tickOptionComputation """
-        raise NotImplementedException
-   
-    def tickGeneric(self, event, message_value):  # tickerId, tickType, value):
-        """ generated source for method tickGeneric """
-        raise NotImplementedException
-   
-    def tickString(self, event, message_value):  # tickerId, tickType, value):
-        """ generated source for method tickString """
-        raise NotImplementedException
-   
-    def tickEFP(self, event, message_value):  # tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
-        """ generated source for method tickEFP """
-        raise NotImplementedException
-   
-    def orderStatus(self, event, message_value):  # orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeld):
-        """ generated source for method orderStatus """
-        raise NotImplementedException
-   
-    def openOrder(self, event, message_value):  # orderId, contract, order, orderState):
-        """ generated source for method openOrder """
-        raise NotImplementedException
-   
-    def openOrderEnd(self, event, message_value):
-        """ generated source for method openOrderEnd """
-        raise NotImplementedException
-   
-    def updateAccountValue(self, event, message_value):  # key, value, currency, accountName):
-        """ generated source for method updateAccountValue """
-        raise NotImplementedException
-   
-    def updatePortfolio(self, event, message_value):  # contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
-        """ generated source for method updatePortfolio """
-        raise NotImplementedException
-   
-    def updateAccountTime(self, event, message_value):  # timeStamp):
-        """ generated source for method updateAccountTime """
-        raise NotImplementedException
-   
-    def accountDownloadEnd(self, event, message_value):  # accountName):
-        """ generated source for method accountDownloadEnd """
-        raise NotImplementedException
-   
-    def nextValidId(self, event, message_value):  # orderId):
-        """ generated source for method nextValidId """
-        raise NotImplementedException
-   
-    def contractDetails(self, event, message_value):  # reqId, contractDetails):
-        """ generated source for method contractDetails """
-        raise NotImplementedException
-   
-    def bondContractDetails(self, event, message_value):  # reqId, contractDetails):
-        """ generated source for method bondContractDetails """
-        raise NotImplementedException
-   
-    def contractDetailsEnd(self, event, message_value):  # reqId):
-        """ generated source for method contractDetailsEnd """
-        raise NotImplementedException
-   
-    def execDetails(self, event, message_value):  # reqId, contract, execution):
-        """ generated source for method execDetails """
-        raise NotImplementedException
-   
-    def execDetailsEnd(self, event, message_value):  # reqId):
-        """ generated source for method execDetailsEnd """
-        raise NotImplementedException
-   
-    def updateMktDepth(self, event, message_value):  # tickerId, position, operation, side, price, size):
-        """ generated source for method updateMktDepth """
-        raise NotImplementedException
-   
-    def updateMktDepthL2(self, event, message_value):  # tickerId, position, marketMaker, operation, side, price, size):
-        """ generated source for method updateMktDepthL2 """
-        raise NotImplementedException
-   
-    def updateNewsBulletin(self, event, message_value):  # msgId, msgType, message, origExchange):
-        """ generated source for method updateNewsBulletin """
-        raise NotImplementedException
-   
-    def managedAccounts(self, event, message_value):  # accountsList):
-        """ generated source for method managedAccounts """
-        raise NotImplementedException
-   
-    def receiveFA(self, event, message_value):  # faDataType, xml):
-        """ generated source for method receiveFA """
-        raise NotImplementedException
-   
-    def historicalData(self, event, message_value):  # reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
-        """ generated source for method historicalData """
-        raise NotImplementedException
-   
-    def scannerParameters(self, event, message_value):  # xml):
-        """ generated source for method scannerParameters """
-        raise NotImplementedException
-   
-    def scannerData(self, event, message_value):  # reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
-        """ generated source for method scannerData """
-        raise NotImplementedException
-   
-    def scannerDataEnd(self, event, message_value):  # reqId):
-        """ generated source for method scannerDataEnd """
-        raise NotImplementedException
-   
-    def realtimeBar(self, event, message_value):  # reqId, time, open, high, low, close, volume, wap, count):
-        """ generated source for method realtimeBar """
-        raise NotImplementedException
-   
-    def currentTime(self, event, message_value):  # time):
-        """ generated source for method currentTime """
-        raise NotImplementedException
-   
-    def fundamentalData(self, event, message_value):  # reqId, data):
-        """ generated source for method fundamentalData """
-        raise NotImplementedException
-   
-    def deltaNeutralValidation(self, event, message_value):  # reqId, underComp):
-        """ generated source for method deltaNeutralValidation """
-        raise NotImplementedException
-   
-    def tickSnapshotEnd(self, event, message_value):  # reqId):
-        """ generated source for method tickSnapshotEnd """
-        raise NotImplementedException
-   
-    def marketDataType(self, event, message_value):  # reqId, marketDataType):
-        """ generated source for method marketDataType """
-        raise NotImplementedException
-   
-    def commissionReport(self, event, message_value):  # commissionReport):
-        """ generated source for method commissionReport """
-        raise NotImplementedException
-   
-    def position(self, event, message_value):  # account, contract, pos, avgCost):
-        """ generated source for method position """
-        raise NotImplementedException
-   
-    def positionEnd(self, event, message_value):
-        """ generated source for method positionEnd """
-        raise NotImplementedException
-   
-    def accountSummary(self, event, message_value):  # reqId, account, tag, value, currency):
-        """ generated source for method accountSummary """
-        raise NotImplementedException
-   
-    def accountSummaryEnd(self, event, message_value):  # reqId):
-        """ generated source for method accountSummaryEnd """
-        raise NotImplementedException
 
-    def gw_subscription_changed(self, event, message_value):  # event, items):
-        raise NotImplementedException        
-#         logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, message_value))
-        
-    def gw_subscriptions(self, event, message_value):
-        raise NotImplementedException        
-      
-    def error(self, event, message_value):
-        raise NotImplementedException
-    
-    def on_kb_reached_last_offset(self, event, message_value):  # event, items):
-        logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, message_value))
-        print "on_kb_reached_last_offset [%s] %s" % (self.name, message_value)
     
 
-    
-class ConfigMap():
-    
-    def kwargs_from_file(self, path):
-        cfg = ConfigParser.ConfigParser()            
-        if len(cfg.read(path)) == 0: 
-            raise ValueError, "Failed to open config file [%s]" % path 
-
-        kwargs = {}
-        for section in cfg.sections():
-            optval_list = map(lambda o: (o, cfg.get(section, o)), cfg.options(section)) 
-            for ov in optval_list:
-                try:
-                    
-                    kwargs[ov[0]] = eval(ov[1])
-                except:
-                    continue
-                
-        #logging.debug('ConfigMap: %s' % kwargs)
-        return kwargs
        
 class GatewayMessageListener(AbstractGatewayListener):   
     def __init__(self, name):

+ 0 - 1
src/comms/ibgw/base_messaging.py

@@ -519,7 +519,6 @@ def test_prosumer2(mode):
                 pB.join()    
     
     
-
     
 
 class TestProducer(BaseProducer):

+ 75 - 45
src/comms/ibgw/subscription_manager.py

@@ -1,6 +1,8 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 import logging
+from time import strftime
+import json
 from misc2.helpers import ContractHelper
 from ib.ext.Contract import Contract
 from comms.ibgw.base_messaging import BaseMessageListener
@@ -10,26 +12,48 @@ from comms.ibgw.base_messaging import BaseMessageListener
 class SubscriptionManager(BaseMessageListener):
     
     
-    persist_f = None
     
-    def __init__(self, name, tws_gateway):
+    
+    def __init__(self, name, tws_connection, producer, rs_conn, subscription_key):
         BaseMessageListener.__init__(self, name)
-        self.tws_connect = tws_gateway.tws_connection
-        self.producer = tws_gateway.gw_message_handler
-        self.handle = []    
+        
+        self.tws_connect = tws_connection
+        self.producer = producer
+        self.rs = rs_conn
+        self.subscription_key = subscription_key
+
+        
+        self.handle = []
         # contract key map to contract ID (index of the handle array)
-        self.tickerId = {}
-   
+        self.tickerId = {}   
+        # flag to indicate whether to save changes when persist_subscriptions is called       
+        self.is_dirty = False
+
+        self.load_subscriptions()
+        
+            
+
+        
         
+    def load_subscriptions(self):
         
-    def load_subscription(self, contracts):
-        for c in contracts:
-            #print self.tws_connect.isConnected() 
-            print '%s' % (ContractHelper.printContract(c))
-            self.reqMktData('internal', {'value': ContractHelper.contract2kvstring(c)}) 
+        def is_outstanding(c):
+            
+            today = strftime('%Y%m%d') 
+            if c.m_expiry < today and (c.m_secType == 'OPT' or c.m_secType == 'FUT'):
+                logging.info('initialize_subscription_mgr: ignoring expired contract %s%s%s' % (c.m_expiry, c.m_strike, c.m_right))
+                return False
+            return True
+            
+        if self.rs.get(self.subscription_key):
+            contracts = filter(lambda x: is_outstanding(x), 
+                                   map(lambda x: ContractHelper.kvstring2object(x, Contract), json.loads(self.rs.get(self.subscription_key))))        
+            for c in contracts:
+                logging.info('SubscriptionManager:load_subscription. request market data for: %s' % (ContractHelper.printContract(c)))
+                self.reqMktData('internal-dummy-call', {'value': ContractHelper.contract2kvstring(c)}) 
             
             
-        self.dump()
+        
     
     # returns -1 if not found, else the key id (which could be a zero value)
     def is_subscribed(self, contract):
@@ -54,59 +78,39 @@ class SubscriptionManager(BaseMessageListener):
             self.handle.append(contract)
             newId = len(self.handle) - 1
             self.tickerId[ContractHelper.makeRedisKeyEx(contract)] = newId 
-             
+            
             return newId
   
         id = self.is_subscribed(contract)
         if id == -1: # not found
             id = add_subscription(contract)
-
+            
             #
             # the conId must be set to zero when calling TWS reqMktData
             # otherwise TWS will fail to subscribe the contract
-            
+            contract.m_conId = 0
             self.tws_connect.reqMktData(id, contract, '', False) 
-            
-            
-                   
-            if self.persist_f:
-                logging.debug('SubscriptionManager reqMktData: trigger callback')
-                self.persist_f(self.handle)
+            self.is_dirty = True
                 
-            logging.info('SubscriptionManager: reqMktData. Requesting market data, id = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
+            logging.info('SubscriptionManager:reqMktData. Requesting market data, id = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         
         else:    
             self.tws_connect.reqMktData(1000 + id, contract, '', True)
-            logging.info('SubscriptionManager: reqMktData: contract already subscribed. Request snapshot = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
+            logging.info('SubscriptionManager:reqMktData. contract already subscribed. Request snapshot = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         #self.dump()
 
         #
         # instruct gateway to broadcast new id has been assigned to a new contract
         #
         self.producer.send_message('gw_subscription_changed', self.producer.message_dumps({id: ContractHelper.object2kvstring(contract)}))
-        #>>>self.parent.gw_notify_subscription_changed({id: ContractHelper.object2kvstring(contract)})
-        logging.info('SubscriptionManager reqMktData: gw_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
-        
-    """
-       Client requests to TWS_gateway
-    """
-    def gw_req_subscriptions(self, event, message):
+        logging.info('SubscriptionManager:reqMktData. Publish gw_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         
-        #subm = map(lambda i: ContractHelper.contract2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
-        #subm = map(lambda i: ContractHelper.object2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
-        subm = map(lambda i: (i, ContractHelper.object2kvstring(self.handle[i])),
-                    range(len(self.handle)))
-        
-        
-        if subm:
-            
-            logging.info('SubscriptionManager:gw_req_subscriptions-------\n%s' % ''.join('\n%s:%s' % (str(v[0]).rjust(6), v[1]) for v in subm))
-            self.producer.send_message('gw_subscriptions', self.producer.message_dumps({'subscriptions': subm}))
         
         
 
     # use only after a broken connection is restored
     # to re request market data 
+    #>>>> not enhanced yet...old code
     def force_resubscription(self):
         # starting from index 1 of the contract list, call  reqmktdata, and format the result into a list of tuples
         for i in range(1, len(self.handle)):
@@ -119,9 +123,20 @@ class SubscriptionManager(BaseMessageListener):
             return self.handle[id]
         return -1
 
+
+    def persist_subscriptions(self):
+         
+        if self.is_dirty:
+            cs = json.dumps(map(lambda x: ContractHelper.object2kvstring(x) if x <> None else None, self.handle))
+            logging.info('Tws_gateway:persist_subscriptions. updating subscription table to redis store %s' % cs)
+            self.dump()
+            self.rs.set(self.subscription_key, cs)
+            self.is_dirty = False
+
+
     def dump(self):
         
-        logging.info('subscription manager table:---------------------')
+        logging.info('subscription manager table:---------------------\n')
         logging.info(''.join('%d: {%s},\n' % (i,  ''.join('%s:%s, ' % (k, v) for k, v in self.handle[i].__dict__.iteritems() )\
                                      if self.handle[i] <> None else ''          ) for i in range(len(self.handle)))\
                      )
@@ -129,11 +144,26 @@ class SubscriptionManager(BaseMessageListener):
         #logging.info( ''.join('%s[%d],\n' % (k, v) for k, v in self.conId.iteritems()))
         logging.info( 'Number of instruments subscribed: %d' % len(self.handle))
         logging.info( '------------------------------------------------')
+
+
+    """
+       Client requests to TWS_gateway
+    """
+    def gw_req_subscriptions(self, event, message):
+        
+        #subm = map(lambda i: ContractHelper.contract2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
+        #subm = map(lambda i: ContractHelper.object2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
+        subm = map(lambda i: (i, ContractHelper.object2kvstring(self.handle[i])),
+                    range(len(self.handle)))
         
-    def register_persistence_callback(self, func):
-        logging.info('subscription manager: registering callback')
-        self.persist_f = func
         
+        if subm:
+            
+            logging.info('SubscriptionManager:gw_req_subscriptions-------\n%s' % ''.join('\n%s:%s' % (str(v[0]).rjust(6), v[1]) for v in subm))
+            self.producer.send_message('gw_subscriptions', self.producer.message_dumps({'subscriptions': subm}))
+
+        
+       
 
 
 def test_subscription():        

+ 20 - 50
src/comms/ibgw/tws_gateway.py

@@ -11,7 +11,7 @@ import json
 from ib.ext.Contract import Contract
 from ib.ext.EClientSocket import EClientSocket
 
-from misc2.helpers import ContractHelper
+from misc2.helpers import ContractHelper, ConfigMap
 from comms.ibgw.base_messaging import Prosumer
 from comms.ibgw.tws_event_handler import TWS_event_handler
 from comms.ibgw.client_request_handler import ClientRequestHandler
@@ -119,39 +119,13 @@ class TWS_gateway():
 
     def initialize_subscription_mgr(self):
         
-        self.contract_subscription_mgr = SubscriptionManager(self, self)
-        self.contract_subscription_mgr.register_persistence_callback(self.persist_subscriptions)
         
+        self.contract_subscription_mgr = SubscriptionManager(self.kwargs['name'], self.tws_connection, 
+                                                             self.gw_message_handler, 
+                                                             self.get_redis_conn(), self.kwargs['subscription_manager.subscriptions.redis_key'])
         
-        key = self.kwargs["subscription_manager.subscriptions.redis_key"]
-        if self.rs.get(key):
-            #contracts = map(lambda x: ContractHelper.kvstring2contract(x), json.loads(self.rs.get(key)))
-            
-            def is_outstanding(c):
-                
-                today = strftime('%Y%m%d') 
-                if c.m_expiry < today:
-                    logging.info('initialize_subscription_mgr: ignoring expired contract %s%s%s' % (c.m_expiry, c.m_strike, c.m_right))
-                    return False
-                return True
-            
-            contracts = filter(lambda x: is_outstanding(x), 
-                               map(lambda x: ContractHelper.kvstring2object(x, Contract), json.loads(self.rs.get(key))))
-            
-            
-            
-            
-            self.contract_subscription_mgr.load_subscription(contracts)
         
 
-    def persist_subscriptions(self, contracts):
-         
-        key = self.kwargs["subscription_manager.subscriptions.redis_key"]
-        #cs = json.dumps(map(lambda x: ContractHelper.contract2kvstring(x) if x <> None else None, contracts))
-        cs = json.dumps(map(lambda x: ContractHelper.object2kvstring(x) if x <> None else None, contracts))
-        logging.debug('Tws_gateway: updating subscription table to redis store %s' % cs)
-        self.rs.set(key, cs)
-
 
     def initialize_redis(self):
 
@@ -164,6 +138,8 @@ class TWS_gateway():
             logging.error('aborting...')
             sys.exit(-1)
             
+    def get_redis_conn(self):
+        return self.rs
 
     def connect_tws(self):
         if type(self.kwargs['tws_app_id']) <> int:
@@ -207,13 +183,26 @@ class TWS_gateway():
         finally:
             self.tlock.release()          
         
+        
+    
+    def persist_subscription_table(self):
+        self.pcounter = (self.pcounter + 1) % 10
+        if (self.pcounter >= 8):
+            self.contract_subscription_mgr.persist_subscriptions()
+           
+        
 
     def main_loop(self):
         try:
             logging.info('TWS_gateway:main_loop ***** accepting console input...')
+            
+            
+            self.pcounter = 0
             while True: 
                 
-                sleep(.45)
+                sleep(.5)
+                self.persist_subscription_table()
+                
                 
         except (KeyboardInterrupt, SystemExit):
                 logging.error('TWS_gateway: caught user interrupt. Shutting down...')
@@ -224,26 +213,7 @@ class TWS_gateway():
 
 
 
-    
-class ConfigMap():
-    
-    def kwargs_from_file(self, path):
-        cfg = ConfigParser.ConfigParser()            
-        if len(cfg.read(path)) == 0: 
-            raise ValueError, "Failed to open config file [%s]" % path 
 
-        kwargs = {}
-        for section in cfg.sections():
-            optval_list = map(lambda o: (o, cfg.get(section, o)), cfg.options(section)) 
-            for ov in optval_list:
-                try:
-                    
-                    kwargs[ov[0]] = eval(ov[1])
-                except:
-                    continue
-                
-        #logging.debug('ConfigMap: %s' % kwargs)
-        return kwargs
         
     
 if __name__ == '__main__':

+ 20 - 1
src/misc2/helpers.py

@@ -4,6 +4,7 @@
 import json
 import logging
 import threading
+import ConfigParser
 from ib.ext.Contract import Contract
 from ib.ext.Order import Order
 from ib.ext.ExecutionFilter import ExecutionFilter
@@ -194,4 +195,22 @@ def dict2str(dict):
     return '{'  + ', '.join('"%s" : %s' % (k, '"%s"' % v if type(v) == str else v) for k, v in dict.iteritems()) + '}'   
     
 
-
+class ConfigMap():
+    
+    def kwargs_from_file(self, path):
+        cfg = ConfigParser.ConfigParser()            
+        if len(cfg.read(path)) == 0: 
+            raise ValueError, "Failed to open config file [%s]" % path 
+
+        kwargs = {}
+        for section in cfg.sections():
+            optval_list = map(lambda o: (o, cfg.get(section, o)), cfg.options(section)) 
+            for ov in optval_list:
+                try:
+                    
+                    kwargs[ov[0]] = eval(ov[1])
+                except:
+                    continue
+                
+        #logging.debug('ConfigMap: %s' % kwargs)
+        return kwargs