Ver código fonte

tws_client_lib. complete code to connect gateway and send message to TWS

esurfer 9 anos atrás
pai
commit
ed15c4da2a

+ 164 - 154
src/comms/ibc/tws_client_lib.py

@@ -13,111 +13,11 @@ from ib.ext.Contract import Contract
 from misc2.helpers import ContractHelper, ExecutionFilterHelper, OrderHelper
 from comms.ibgw.base_messaging import Prosumer, BaseMessageListener
 from comms.tws_protocol_helper import TWS_Protocol
+from misc2.observer import NotImplementedException
 import redis
          
-class TWS_user():
-
-    
-    # monitor IB connection / heart beat
-#     ibh = None
-#     tlock = None
-#     ib_conn_status = None
-    TWS_CLI_DEFAULT_CONFIG = {
-      'name': 'tws_gateway_client',
-      'bootstrap_host': 'localhost',
-      'bootstrap_port': 9092,
-      'redis_host': 'localhost',
-      'redis_port': 6379,
-      'redis_db': 0,
-      'tws_host': 'localhost',
-      'tws_api_port': 8496,
-      'tws_app_id': 38868,
-      'group_id': 'TWS_CLI',
-      'session_timeout_ms': 10000,
-      'clear_offsets':  False,
-      
-      'topics': list(TWS_Protocol.topicEvents) + list(TWS_Protocol.gatewayEvents)
-      }
-      
-               
-    
-    def __init__(self, kwargs):
-        
-
-             
-        
-        self.kwargs = copy.copy(TWS_user.TWS_CLI_DEFAULT_CONFIG)
-        for key in self.kwargs:
-            if key in kwargs:
-                self.kwargs[key] = kwargs.pop(key)        
-        self.kwargs.update(kwargs)        
-        
-        
-
-
-        '''
-            TWS_user start up sequence
-            
-            1. establish redis connection
-            2. initialize prosumer instance - gateway message handler
-            
-            4. initialize listeners: 
-            5. start the prosumer 
-        
-        '''
-
-        logging.info('starting up TWS_user...')
-        
-        
-        logging.info('establishing redis connection...')
-        self.initialize_redis()
-        
-        logging.info('starting up gateway message handler - kafka Prosumer...')        
-        self.gw_message_handler = Prosumer(name='tws_cli_prosumer', kwargs=self.kwargs)
-        
-        logging.info('start gw_message_handler. Entering processing loop...')
-        self.gw_message_handler.start_prosumer()
-
-        logging.info('instantiating gw_command_proxy')        
-        self.gw_send_command = GatewayCommandProxy('gw_send_command', self.gw_message_handler)
-        logging.info('instantiating listeners subscription manager...')
-
-                
-
-        logging.info('**** Completed initialization sequence. ****')
-        self.main_loop()
-        
-
-
-    def initialize_redis(self):
-
-        self.rs = redis.Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
-        try:
-            self.rs.client_list()
-        except redis.ConnectionError:
-            logging.error('TWS_user: unable to connect to redis server using these settings: %s port:%d db:%d' % 
-                          (self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db']))
-            logging.error('aborting...')
-            sys.exit(-1)
-        
-
-    def main_loop(self):
-        try:
-            logging.info('TWS_user:main_loop ***** accepting console input...')
-            while True: 
-                
-                sleep(.45)
-                
-        except (KeyboardInterrupt, SystemExit):
-                logging.error('TWS_user: caught user interrupt. Shutting down...')
-                self.gw_message_handler.set_stop()
-                self.gw_message_handler.join()
-                logging.info('TWS_user: Service shut down complete...')
-                sys.exit(0)        
-
-
-
-class GatewayCommandProxy():
+         
+class GatewayCommandWrapper():
 
     def __init__(self, producer):
         self.producer = producer
@@ -180,180 +80,261 @@ class GatewayCommandProxy():
         self.producer.send_message('gw_req_subscriptions', self.producer.message_dumps(None))
 
 
-                               
+         
+class TWS_client_manager(GatewayCommandWrapper):
+
+    
+    TWS_CLI_DEFAULT_CONFIG = {
+      'name': 'tws_gateway_client',
+      'bootstrap_host': 'localhost',
+      'bootstrap_port': 9092,
+      'redis_host': 'localhost',
+      'redis_port': 6379,
+      'redis_db': 0,
+      'tws_host': 'localhost',
+      'tws_api_port': 8496,
+      'tws_app_id': 38868,
+      'group_id': 'TWS_CLI',
+      'session_timeout_ms': 10000,
+      'clear_offsets':  False,
+      
+      'topics': list(TWS_Protocol.topicEvents) + list(TWS_Protocol.gatewayEvents)
+      }
+      
+               
+    
+    def __init__(self, kwargs):
+        
+
+        temp_kwargs = copy.copy(kwargs)
+        self.kwargs = copy.copy(TWS_client_manager.TWS_CLI_DEFAULT_CONFIG)
+        for key in self.kwargs:
+            if key in temp_kwargs:
+                self.kwargs[key] = temp_kwargs.pop(key)        
+        self.kwargs.update(temp_kwargs)        
+        
+        '''
+            TWS_client_manager start up sequence
+            
+            1. establish redis connection
+            2. initialize prosumer instance - gateway message handler
+            
+            4. initialize listeners: 
+            5. start the prosumer 
+        
+        '''
+
+        logging.info('starting up TWS_client_manager...')
+        logging.info('establishing redis connection...')
+        self.initialize_redis()
+        
+        logging.info('starting up gateway message handler - kafka Prosumer...')        
+        self.gw_message_handler = Prosumer(name='tws_cli_prosumer', kwargs=self.kwargs)
+        GatewayCommandWrapper.__init__(self, self.gw_message_handler)        
+
+
+
+                
+                
+                    
+        
+        logging.info('**** Completed initialization sequence. ****')
+        
+        
+
+    def start_manager(self):
+        logging.info('start gw_message_handler. Entering processing loop...')
+        self.gw_message_handler.start_prosumer()
+    
+    def add_listener_topics(self, listener, topics):
+        self.gw_message_handler.add_listener_topics(listener, topics)
+
+    def initialize_redis(self):
+
+        self.rs = redis.Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
+        try:
+            self.rs.client_list()
+        except redis.ConnectionError:
+            logging.error('TWS_client_manager: unable to connect to redis server using these settings: %s port:%d db:%d' % 
+                          (self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db']))
+            logging.error('aborting...')
+            sys.exit(-1)
+        
+
+                                  
         
         
 
     
-class GatewayMessageListener(BaseMessageListener):
+class AbstractGatewayListener(BaseMessageListener):
     
-    def __init__(self, name, producer):
+    def __init__(self, name):
         BaseMessageListener.__init__(self, name)
-        self.producer = producer
-  
-    
+        
     
     def tickPrice(self, event, message_value):  # tickerId, field, price, canAutoExecute):
         """ generated source for method tickPrice """
-       
+        raise NotImplementedException
    
     def tickSize(self, event, message_value):  # tickerId, field, size):
         """ generated source for method tickSize """
-
+        raise NotImplementedException
    
     def tickOptionComputation(self, event, message_value):  # tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
         """ generated source for method tickOptionComputation """
-
+        raise NotImplementedException
    
     def tickGeneric(self, event, message_value):  # tickerId, tickType, value):
         """ generated source for method tickGeneric """
-
+        raise NotImplementedException
    
     def tickString(self, event, message_value):  # tickerId, tickType, value):
         """ generated source for method tickString """
-
+        raise NotImplementedException
    
     def tickEFP(self, event, message_value):  # tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
         """ generated source for method tickEFP """
-
+        raise NotImplementedException
    
     def orderStatus(self, event, message_value):  # orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeld):
         """ generated source for method orderStatus """
-
+        raise NotImplementedException
    
     def openOrder(self, event, message_value):  # orderId, contract, order, orderState):
         """ generated source for method openOrder """
-
+        raise NotImplementedException
    
     def openOrderEnd(self, event, message_value):
         """ generated source for method openOrderEnd """
-
+        raise NotImplementedException
    
     def updateAccountValue(self, event, message_value):  # key, value, currency, accountName):
         """ generated source for method updateAccountValue """
-
+        raise NotImplementedException
    
     def updatePortfolio(self, event, message_value):  # contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
         """ generated source for method updatePortfolio """
-
+        raise NotImplementedException
    
     def updateAccountTime(self, event, message_value):  # timeStamp):
         """ generated source for method updateAccountTime """
-
+        raise NotImplementedException
    
     def accountDownloadEnd(self, event, message_value):  # accountName):
         """ generated source for method accountDownloadEnd """
-
+        raise NotImplementedException
    
     def nextValidId(self, event, message_value):  # orderId):
         """ generated source for method nextValidId """
-
+        raise NotImplementedException
    
     def contractDetails(self, event, message_value):  # reqId, contractDetails):
         """ generated source for method contractDetails """
-
+        raise NotImplementedException
    
     def bondContractDetails(self, event, message_value):  # reqId, contractDetails):
         """ generated source for method bondContractDetails """
-
+        raise NotImplementedException
    
     def contractDetailsEnd(self, event, message_value):  # reqId):
         """ generated source for method contractDetailsEnd """
-
+        raise NotImplementedException
    
     def execDetails(self, event, message_value):  # reqId, contract, execution):
         """ generated source for method execDetails """
-
+        raise NotImplementedException
    
     def execDetailsEnd(self, event, message_value):  # reqId):
         """ generated source for method execDetailsEnd """
-
+        raise NotImplementedException
    
     def updateMktDepth(self, event, message_value):  # tickerId, position, operation, side, price, size):
         """ generated source for method updateMktDepth """
-
+        raise NotImplementedException
    
     def updateMktDepthL2(self, event, message_value):  # tickerId, position, marketMaker, operation, side, price, size):
         """ generated source for method updateMktDepthL2 """
-
+        raise NotImplementedException
    
     def updateNewsBulletin(self, event, message_value):  # msgId, msgType, message, origExchange):
         """ generated source for method updateNewsBulletin """
-
+        raise NotImplementedException
    
     def managedAccounts(self, event, message_value):  # accountsList):
         """ generated source for method managedAccounts """
-
+        raise NotImplementedException
    
     def receiveFA(self, event, message_value):  # faDataType, xml):
         """ generated source for method receiveFA """
-
+        raise NotImplementedException
    
     def historicalData(self, event, message_value):  # reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
         """ generated source for method historicalData """
-
+        raise NotImplementedException
    
     def scannerParameters(self, event, message_value):  # xml):
         """ generated source for method scannerParameters """
-
+        raise NotImplementedException
    
     def scannerData(self, event, message_value):  # reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
         """ generated source for method scannerData """
-
+        raise NotImplementedException
    
     def scannerDataEnd(self, event, message_value):  # reqId):
         """ generated source for method scannerDataEnd """
-
+        raise NotImplementedException
    
     def realtimeBar(self, event, message_value):  # reqId, time, open, high, low, close, volume, wap, count):
         """ generated source for method realtimeBar """
-
+        raise NotImplementedException
    
     def currentTime(self, event, message_value):  # time):
         """ generated source for method currentTime """
-
+        raise NotImplementedException
    
     def fundamentalData(self, event, message_value):  # reqId, data):
         """ generated source for method fundamentalData """
-
+        raise NotImplementedException
    
     def deltaNeutralValidation(self, event, message_value):  # reqId, underComp):
         """ generated source for method deltaNeutralValidation """
-
+        raise NotImplementedException
    
     def tickSnapshotEnd(self, event, message_value):  # reqId):
         """ generated source for method tickSnapshotEnd """
-
+        raise NotImplementedException
    
     def marketDataType(self, event, message_value):  # reqId, marketDataType):
         """ generated source for method marketDataType """
-
+        raise NotImplementedException
    
     def commissionReport(self, event, message_value):  # commissionReport):
         """ generated source for method commissionReport """
-
+        raise NotImplementedException
    
     def position(self, event, message_value):  # account, contract, pos, avgCost):
         """ generated source for method position """
-
+        raise NotImplementedException
    
     def positionEnd(self, event, message_value):
         """ generated source for method positionEnd """
-
+        raise NotImplementedException
    
     def accountSummary(self, event, message_value):  # reqId, account, tag, value, currency):
         """ generated source for method accountSummary """
-
+        raise NotImplementedException
    
     def accountSummaryEnd(self, event, message_value):  # reqId):
         """ generated source for method accountSummaryEnd """
-
+        raise NotImplementedException
 
     def gw_subscription_changed(self, event, message_value):  # event, items):
-        logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, message_value))
-        #print 'SubscriptionListener:gw_subscription_changed %s' % items
-        
+        raise NotImplementedException        
+#         logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, message_value))
         
+    def gw_subscriptions(self, event, message_value):
+        raise NotImplementedException        
+      
     def on_kb_reached_last_offset(self, event, message_value):  # event, items):
         logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, message_value))
         print "on_kb_reached_last_offset [%s] %s" % (self.name, message_value)
@@ -379,8 +360,37 @@ class ConfigMap():
                 
         #logging.debug('ConfigMap: %s' % kwargs)
         return kwargs
-        
+       
+class GatewayMessageListener(AbstractGatewayListener):   
+    def __init__(self, name):
+        AbstractGatewayListener.__init__(self, name)
+             
+    def tickPrice(self, event, message_value):  # tickerId, field, price, canAutoExecute):
+        logging.info('GatewayMessageListener:tickPrice. val->[%s]' % message_value)
+
+def test_client(kwargs):
+    contractTuple = ('USO', 'STK', 'SMART', 'USD', '', 0.0, '')
+    contract = ContractHelper.makeContract(contractTuple)    
+    print kwargs 
+    cm = TWS_client_manager(kwargs)
+    cl = AbstractGatewayListener('gw_client_message_listener')
     
+    cm.add_listener_topics(cl, kwargs['topics'])
+    cm.start_manager()
+    cm.reqMktData(contract)
+    try:
+        logging.info('TWS_gateway:main_loop ***** accepting console input...')
+        while True: 
+        
+            sleep(.45)
+        
+    except (KeyboardInterrupt, SystemExit):
+        logging.error('TWS_client_manager: caught user interrupt. Shutting down...')
+        cm.gw_message_handler.set_stop()
+        cm.join()
+        logging.info('TWS_client_manager: Service shut down complete...')
+           
+        
 if __name__ == '__main__':
     
     if len(sys.argv) != 2:
@@ -397,7 +407,7 @@ if __name__ == '__main__':
     logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
     logging.basicConfig(**logconfig)        
     
-    
-    app = TWS_user(kwargs)
+    print ContractHelper.kvstring2object('{"m_conId": 0, "m_symbol": "USO", "m_secType": "STK", "m_includeExpired": false, "m_right": "", "m_expiry": "", "m_currency": "USD", "m_exchange": "SMART", "m_strike": 0.0}', Contract)
+    #test_client(kwargs)
     
      

+ 3 - 3
src/comms/ibgw/base_messaging.py

@@ -195,7 +195,7 @@ class BaseConsumer(threading.Thread, Publisher):
         self.done = True
     
     def run(self):
-        print '%s:%s started' % (self.kwargs['group_id'], self.name)
+        logging.info('BaseConsumer:run. %s:%s started' % (self.kwargs['group_id'], self.name))
         
         if self.kwargs['clear_offsets'] == True:
             self.clear_offsets()
@@ -240,7 +240,7 @@ class BaseConsumer(threading.Thread, Publisher):
             if self.rs.keys(self.consumer_topic(topic)):
                 self.my_topics[topic] = json.loads(self.rs.get(self.consumer_topic(topic)))
 
-        print self.my_topics
+        logging.info('BaseConsumer:run. Topics subscribed: %s' % self.my_topics)
         
             
         consumer.subscribe(self.my_topics.keys())
@@ -284,7 +284,7 @@ class BaseConsumer(threading.Thread, Publisher):
                       
                 """
                 if self.my_topics[message.topic][str(message.partition)] > message.offset:
-                    print '********************** old message...discarding %s %d' % (message.topic, message.offset)
+                    logging.info('BaseConsumer ********************** old message...discarding %s %d' % (message.topic, message.offset))
                 else:
                     #if self.my_topics[message.topic][str(message.partition)] == message.offset:
                     # if the stored offset in redis equals to the current offset

+ 1 - 1
src/comms/ibgw/client_request_handler.py

@@ -80,7 +80,7 @@ class ClientRequestHandler(BaseMessageListener):
         self.tws_connect.requestFA(1)
     
     
-    def reqMktData(self, sm_contract):
+    def reqMktData(self, event, sm_contract):
         logging.info('ClientRequestHandler received reqMktData request: %s' % sm_contract)
         try:
             #self.contract_subscription_mgr.reqMktData(ContractHelper.kvstring2contract(sm_contract))

+ 3 - 2
src/comms/ibgw/subscription_manager.py

@@ -15,6 +15,7 @@ class SubscriptionManager(BaseMessageListener):
     def __init__(self, name, tws_gateway):
         BaseMessageListener.__init__(self, name)
         self.tws_connect = tws_gateway.tws_connection
+        self.producer = tws_gateway.gw_message_handler
         self.handle = []    
         # contract key map to contract ID (index of the handle array)
         self.tickerId = {}
@@ -56,9 +57,9 @@ class SubscriptionManager(BaseMessageListener):
 #             
 
             
-    def reqMktData(self, kvs_contract):
+    def reqMktData(self, event, message):
                   
-        contract = ContractHelper.kvstring2object(kvs_contract, Contract)
+        contract = ContractHelper.kvstring2object(message['value'], Contract)
         #logging.info('SubscriptionManager: reqMktData')
   
         def add_subscription(contract):

+ 4 - 2
src/comms/ibgw/tws_gateway.py

@@ -102,8 +102,6 @@ class TWS_gateway():
 #             self.ibh.register_listener([self.on_ib_conn_broken])
 #             self.ibh.run()  
 
-        logging.info('start TWS_event_handler. Entering processing loop...')
-        self.gw_message_handler.start_prosumer()
 
         logging.info('instantiating listeners...cli_req_handler')        
         self.cli_req_handler = ClientRequestHandler('client_request_handler', self)
@@ -113,6 +111,10 @@ class TWS_gateway():
         self.gw_message_handler.add_listeners([self.cli_req_handler])
         self.gw_message_handler.add_listener_topics(self.contract_subscription_mgr, ['reqMktData'])
 
+        logging.info('start TWS_event_handler. Start prosumer processing loop...')
+        self.gw_message_handler.start_prosumer()
+
+
         logging.info('**** Completed initialization sequence. ****')
         self.main_loop()
         

+ 14 - 7
src/comms/test/quick_test_ib.py

@@ -13,6 +13,7 @@ from threading import Lock
 from ib.ext.Contract import Contract
 from ib.ext.EWrapper import EWrapper
 from ib.ext.EClientSocket import EClientSocket
+from misc2.helpers import ContractHelper
 
 class Wrapger(EWrapper):
     def tickPrice(self, tickerId, field, price, canAutoExecute):
@@ -175,6 +176,7 @@ class Wrapger(EWrapper):
 
     def error(self, id=None, errorCode=None, errorMsg=None):
         """ generated source for method accountSummaryEnd """
+        print errorMsg
 
     def error_0(self, strvalue=None):
         """ generated source for method accountSummaryEnd """
@@ -186,9 +188,14 @@ class Wrapger(EWrapper):
 def test_IB():
     ew = Wrapger()
     es = EClientSocket(ew)
-    es.eConnect('localhost', 7496, 5555)
+    es.eConnect('localhost', 4001, 5555)
     print es.isConnected()
-    sleep(2)
+    
+    contractTuple = ('GOOG', 'STK', 'SMART', 'USD', '', 0.0, '')
+    contract = ContractHelper.makeContract(contractTuple)
+    es.reqMktData(0, contract, '', False) 
+                
+    sleep(5)
     print 'disconnecting...'
     es.eDisconnect()
 
@@ -200,8 +207,8 @@ if __name__ == '__main__':
     
 
 
-    #test_IB()
-    y = x()
-    y.f1(1, 2, {4:5}, [77,88])
-    vv = {'a': 1, 'c': {4: 5}, 'b': 2, 'd': [6,77, 88]}    
-    y.f1(**vv)
+    test_IB()
+#     y = x()
+#     y.f1(1, 2, {4:5}, [77,88])
+#     vv = {'a': 1, 'c': {4: 5}, 'b': 2, 'd': [6,77, 88]}    
+#     y.f1(**vv)

+ 2 - 2
src/config/tws_gateway.cfg

@@ -17,8 +17,8 @@ redis_db: 0
 # 7496 - production larry046, 7496 - development,  8496 production mchan927
 #
 tws_host: 'localhost'
-tws_api_port: 7496
-tws_app_id: 749601 
+tws_api_port: 4001
+tws_app_id: 74001 
 #
 #
 #

+ 1 - 0
src/misc2/observer.py

@@ -37,6 +37,7 @@ class Publisher:
     def dispatch(self, event, params=None):
         
         for subscriber, callback in self.get_subscribers(event).items():
+            print 'observer:: subscriber**** %s' % subscriber
             callback(event, params)
             
 #############################################################