瀏覽代碼

go home changes

bobhk 9 年之前
父節點
當前提交
e7ff5e5b51
共有 3 個文件被更改,包括 61 次插入88 次删除
  1. 53 74
      src/comms/ibc/tws_client_lib.py
  2. 0 13
      src/comms/test/b1.py
  3. 8 1
      src/comms/test/quick_test_ib.py

+ 53 - 74
src/comms/ibc/tws_client_lib.py

@@ -75,29 +75,15 @@ class TWS_user():
         logging.info('starting up gateway message handler - kafka Prosumer...')        
         self.gw_message_handler = Prosumer(name='tws_cli_prosumer', kwargs=self.kwargs)
         
-        
-        logging.info('establishing TWS gateway connectivity...')
-        if not self.connect_tws():
-            logging.error('TWS_user: unable to establish connection to IB %s:%d' % 
-                          (self.kwargs['tws_host'], self.kwargs['tws_api_port']))
-            self.disconnect_tws()
-            sys.exit(-1)
-        else:
-            # start heart beat monitor
-            pass
-#             logging.info('starting up IB heart beat monitor...')
-#             self.tlock = Lock()
-#             self.ibh = IbHeartBeat(config)
-#             self.ibh.register_listener([self.on_ib_conn_broken])
-#             self.ibh.run()  
-
-        logging.info('start TWS_event_handler. Entering processing loop...')
+        logging.info('start gw_message_handler. Entering processing loop...')
         self.gw_message_handler.start_prosumer()
 
         logging.info('instantiating gw_command_proxy')        
-        self.gw_command_proxy = GatewayCommandProxy('gw_command_proxy', self.gw_message_handler)
+        self.gw_send_command = GatewayCommandProxy('gw_send_command', self.gw_message_handler)
         logging.info('instantiating listeners subscription manager...')
 
+                
+
         logging.info('**** Completed initialization sequence. ****')
         self.main_loop()
         
@@ -194,14 +180,9 @@ class GatewayCommandProxy():
         self.producer.send_message('gw_req_subscriptions', self.producer.message_dumps(None))
 
 
-                                
+                               
         
         
-    def reqMktData(self, event, items):
-        logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
-        self.producer.send_message('tickPrice', 
-                        self.producer.message_dumps({'field':4, 'typeName':'tickPrice', 'price':1.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}))
-        
 
     
 class GatewayMessageListener(BaseMessageListener):
@@ -210,175 +191,173 @@ class GatewayMessageListener(BaseMessageListener):
         BaseMessageListener.__init__(self, name)
         self.producer = producer
   
-    def tickPrice(self, tickerId, field, price, canAutoExecute):
-        """ generated source for method tickPrice """
-       
     
     
-    def tickPrice(self, event, items):   
-        logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
-
+    def tickPrice(self, event, message_value):  # tickerId, field, price, canAutoExecute):
+        """ generated source for method tickPrice """
+       
    
-    def tickSize(self, tickerId, field, size):
+    def tickSize(self, event, message_value):  # tickerId, field, size):
         """ generated source for method tickSize """
 
    
-    def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
+    def tickOptionComputation(self, event, message_value):  # tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
         """ generated source for method tickOptionComputation """
 
    
-    def tickGeneric(self, tickerId, tickType, value):
+    def tickGeneric(self, event, message_value):  # tickerId, tickType, value):
         """ generated source for method tickGeneric """
 
    
-    def tickString(self, tickerId, tickType, value):
+    def tickString(self, event, message_value):  # tickerId, tickType, value):
         """ generated source for method tickString """
 
    
-    def tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
+    def tickEFP(self, event, message_value):  # tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
         """ generated source for method tickEFP """
 
    
-    def orderStatus(self, orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeld):
+    def orderStatus(self, event, message_value):  # orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeld):
         """ generated source for method orderStatus """
 
    
-    def openOrder(self, orderId, contract, order, orderState):
+    def openOrder(self, event, message_value):  # orderId, contract, order, orderState):
         """ generated source for method openOrder """
 
    
-    def openOrderEnd(self):
+    def openOrderEnd(self, event, message_value):
         """ generated source for method openOrderEnd """
 
    
-    def updateAccountValue(self, key, value, currency, accountName):
+    def updateAccountValue(self, event, message_value):  # key, value, currency, accountName):
         """ generated source for method updateAccountValue """
 
    
-    def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
+    def updatePortfolio(self, event, message_value):  # contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
         """ generated source for method updatePortfolio """
 
    
-    def updateAccountTime(self, timeStamp):
+    def updateAccountTime(self, event, message_value):  # timeStamp):
         """ generated source for method updateAccountTime """
 
    
-    def accountDownloadEnd(self, accountName):
+    def accountDownloadEnd(self, event, message_value):  # accountName):
         """ generated source for method accountDownloadEnd """
 
    
-    def nextValidId(self, orderId):
+    def nextValidId(self, event, message_value):  # orderId):
         """ generated source for method nextValidId """
 
    
-    def contractDetails(self, reqId, contractDetails):
+    def contractDetails(self, event, message_value):  # reqId, contractDetails):
         """ generated source for method contractDetails """
 
    
-    def bondContractDetails(self, reqId, contractDetails):
+    def bondContractDetails(self, event, message_value):  # reqId, contractDetails):
         """ generated source for method bondContractDetails """
 
    
-    def contractDetailsEnd(self, reqId):
+    def contractDetailsEnd(self, event, message_value):  # reqId):
         """ generated source for method contractDetailsEnd """
 
    
-    def execDetails(self, reqId, contract, execution):
+    def execDetails(self, event, message_value):  # reqId, contract, execution):
         """ generated source for method execDetails """
 
    
-    def execDetailsEnd(self, reqId):
+    def execDetailsEnd(self, event, message_value):  # reqId):
         """ generated source for method execDetailsEnd """
 
    
-    def updateMktDepth(self, tickerId, position, operation, side, price, size):
+    def updateMktDepth(self, event, message_value):  # tickerId, position, operation, side, price, size):
         """ generated source for method updateMktDepth """
 
    
-    def updateMktDepthL2(self, tickerId, position, marketMaker, operation, side, price, size):
+    def updateMktDepthL2(self, event, message_value):  # tickerId, position, marketMaker, operation, side, price, size):
         """ generated source for method updateMktDepthL2 """
 
    
-    def updateNewsBulletin(self, msgId, msgType, message, origExchange):
+    def updateNewsBulletin(self, event, message_value):  # msgId, msgType, message, origExchange):
         """ generated source for method updateNewsBulletin """
 
    
-    def managedAccounts(self, accountsList):
+    def managedAccounts(self, event, message_value):  # accountsList):
         """ generated source for method managedAccounts """
 
    
-    def receiveFA(self, faDataType, xml):
+    def receiveFA(self, event, message_value):  # faDataType, xml):
         """ generated source for method receiveFA """
 
    
-    def historicalData(self, reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
+    def historicalData(self, event, message_value):  # reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
         """ generated source for method historicalData """
 
    
-    def scannerParameters(self, xml):
+    def scannerParameters(self, event, message_value):  # xml):
         """ generated source for method scannerParameters """
 
    
-    def scannerData(self, reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
+    def scannerData(self, event, message_value):  # reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
         """ generated source for method scannerData """
 
    
-    def scannerDataEnd(self, reqId):
+    def scannerDataEnd(self, event, message_value):  # reqId):
         """ generated source for method scannerDataEnd """
 
    
-    def realtimeBar(self, reqId, time, open, high, low, close, volume, wap, count):
+    def realtimeBar(self, event, message_value):  # reqId, time, open, high, low, close, volume, wap, count):
         """ generated source for method realtimeBar """
 
    
-    def currentTime(self, time):
+    def currentTime(self, event, message_value):  # time):
         """ generated source for method currentTime """
 
    
-    def fundamentalData(self, reqId, data):
+    def fundamentalData(self, event, message_value):  # reqId, data):
         """ generated source for method fundamentalData """
 
    
-    def deltaNeutralValidation(self, reqId, underComp):
+    def deltaNeutralValidation(self, event, message_value):  # reqId, underComp):
         """ generated source for method deltaNeutralValidation """
 
    
-    def tickSnapshotEnd(self, reqId):
+    def tickSnapshotEnd(self, event, message_value):  # reqId):
         """ generated source for method tickSnapshotEnd """
 
    
-    def marketDataType(self, reqId, marketDataType):
+    def marketDataType(self, event, message_value):  # reqId, marketDataType):
         """ generated source for method marketDataType """
 
    
-    def commissionReport(self, commissionReport):
+    def commissionReport(self, event, message_value):  # commissionReport):
         """ generated source for method commissionReport """
 
    
-    def position(self, account, contract, pos, avgCost):
+    def position(self, event, message_value):  # account, contract, pos, avgCost):
         """ generated source for method position """
 
    
-    def positionEnd(self):
+    def positionEnd(self, event, message_value):
         """ generated source for method positionEnd """
 
    
-    def accountSummary(self, reqId, account, tag, value, currency):
+    def accountSummary(self, event, message_value):  # reqId, account, tag, value, currency):
         """ generated source for method accountSummary """
 
    
-    def accountSummaryEnd(self, reqId):
+    def accountSummaryEnd(self, event, message_value):  # reqId):
         """ generated source for method accountSummaryEnd """
 
-    def gw_subscription_changed(self, event, items):
-        logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, items))
+
+    def gw_subscription_changed(self, event, message_value):  # event, items):
+        logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, message_value))
         #print 'SubscriptionListener:gw_subscription_changed %s' % items
         
         
-    def on_kb_reached_last_offset(self, event, items):
-        logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, items))
-        print "on_kb_reached_last_offset [%s] %s" % (self.name, items)
-        
+    def on_kb_reached_last_offset(self, event, message_value):  # event, items):
+        logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, message_value))
+        print "on_kb_reached_last_offset [%s] %s" % (self.name, message_value)
+    
 
     
 class ConfigMap():

+ 0 - 13
src/comms/test/b1.py

@@ -1,13 +0,0 @@
-
-
-
-'''
-    create 2 generic communicators
-    
-    the communicator belongs to class Communicator
-    
-    Communicator inherits BaseConsumer and
-    Composes of a BaseProducer
-    
-    
-'''

+ 8 - 1
src/comms/test/quick_test_ib.py

@@ -192,9 +192,16 @@ def test_IB():
     print 'disconnecting...'
     es.eDisconnect()
 
+class x:
+    def f1(self, a, b, c, d):
+        print vars()
 
 if __name__ == '__main__':
     
 
 
-    test_IB()    
+    #test_IB()
+    y = x()
+    y.f1(1, 2, {4:5}, [77,88])
+    vv = {'a': 1, 'c': {4: 5}, 'b': 2, 'd': [6,77, 88]}    
+    y.f1(**vv)