bobhk 9 éve
szülő
commit
cc01cf1bc2

+ 7 - 6
src/comms/ibc/gw_ex_request_exit.py

@@ -23,7 +23,7 @@ class MessageListener(AbstractGatewayListener):
 
    
    
-    def position(self, event, account, contract_key, pos, avg_cost):
+    def position(self, event, account, contract_key, position, average_cost):
         """ generated source for method position """
         logging.info('%s [[ %s ]]' % (event, vars()))
    
@@ -112,8 +112,8 @@ def test_client2(kwargs):
     cm.start_manager()
                           
                               
-    #cm.reqPositions()
-    cm.reqAccountUpdates(True, 'U8379890')
+    cm.reqPositions()
+    #cm.reqAccountUpdates(True, 'U8379890')
     
     try:
         logging.info('TWS_gateway:main_loop ***** accepting console input...')
@@ -148,9 +148,10 @@ if __name__ == '__main__':
       'group_id': 'EX_REQUEST',
       'session_timeout_ms': 10000,
       'clear_offsets':  False,
-      'logconfig': {'level': logging.INFO},
-      'topics': ['tickSize', 'tickPrice',  'position', 'positionEnd'],
-      'seek_to_end': ['tickPrice', 'tickSize','position', 'positionEnd'],
+      'logconfig': {'level': logging.INFO, 'filemode': 'w', 'filename': '/tmp/gw_ex.log'},
+      'topics': ['tickSize', 'tickPrice',  'position', 'positionEnd', 'updateAccountValue', 'updatePortfolio', 'updateAccountTime', 'accountDownloadEnd'],
+      'seek_to_end': ['tickPrice', 'tickSize','position', 'positionEnd', 'updateAccountValue', 
+                      'updatePortfolio', 'updateAccountTime', 'accountDownloadEnd']
       }
 
     usage = "usage: %prog [options]"

+ 1 - 1
src/comms/ibc/tws_client_lib.py

@@ -36,7 +36,7 @@ class TWS_client_manager(GatewayCommandWrapper):
       'session_timeout_ms': 10000,
       'clear_offsets':  False,
       
-      'topics': list(TWS_Protocol.topicEvents) + list(TWS_Protocol.gatewayEvents)
+      'topics': list(TWS_Protocol.topicEvents) 
       }
       
                

+ 4 - 3
src/comms/ibgw/base_messaging.py

@@ -1,5 +1,5 @@
  #!/usr/bin/env python
-import threading, logging, time
+import threading, logging, time, traceback
 import sys
 import copy
 import datetime
@@ -311,7 +311,7 @@ class BaseConsumer(threading.Thread, Publisher):
                         
                         
                     if '*' in self.kwargs['seek_to_end'] or message.topic in self.kwargs['seek_to_end']:
-                        print 'baseconsumer run %s %d' % (message.top, gap)
+                        #print 'baseconsumer run %s %d' % (message.topic, gap)
                         # if there is no gap                          
                         if gap == 1:
                             # the message is valid for dispatching and not to be skipped
@@ -359,7 +359,8 @@ class BaseConsumer(threading.Thread, Publisher):
                 continue
             except TypeError:
                 logging.error('BaseConsumer:run. Caught TypeError Exception while processing a message. Malformat json string? %s: %s' % (message.topic, message.value))
-            
+                logging.error(traceback.format_exc())
+                
         consumer.close()   
         logging.info ('******** BaseConsumer exit done.')
 

+ 23 - 42
src/comms/ibgw/tws_event_handler.py

@@ -1,7 +1,5 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
-from time import strftime
-from datetime import datetime
 from misc2.helpers import ContractHelper
 import logging
 import traceback
@@ -25,7 +23,7 @@ class TWS_event_handler(EWrapper):
     def broadcast_event(self, message, mapping):
 
         try:
-            dict = self.tick_process_message(message, mapping)     
+            dict = self.pre_process_message(message, mapping)     
             logging.info('broadcast_event %s:%s' % (message, dict))
             self.producer.send_message(message, self.producer.message_dumps(dict))    
         except:
@@ -36,21 +34,10 @@ class TWS_event_handler(EWrapper):
             
 
     
-    def tick_process_message(self, message_name, items):
-        #return items
+    def pre_process_message(self, message_name, items):
 
         t = items.copy()
-        # if the tickerId is in the snapshot range
-        # deduct the gap to derive the original tickerId
-        # --- check logic in subscription manager
-#         try:
-#             
-#             if (t['tickerId']  >= TWS_event_handler.TICKER_GAP):
-#                 #print 'tick_process_message************************ SNAPSHOT %d' % t['tickerId']
-#                 t['tickerId'] = t['tickerId']  - TWS_event_handler.TICKER_GAP
-#                 
-#         except (KeyError, ):
-#             pass          
+       
             
         try:
             del(t['self'])
@@ -63,6 +50,8 @@ class TWS_event_handler(EWrapper):
                 #if type(v) in [Contract, Execution, ExecutionFilter, OrderState, Order, CommissionReport]:
             if 'ib.ext.' in str(type(v)):     
                 t[k] = v.__dict__
+            elif 'exceptions.' in str(type(v)):
+                t[k] = '%s:%s' % (str(type(v)), str(v))
             else:
                 t[k] = v
         
@@ -87,7 +76,7 @@ class TWS_event_handler(EWrapper):
     
     def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
         
-        #self.broadcast_event('tickOptionComputation', self.tick_process_message(vars())) #vars())
+        #self.broadcast_event('tickOptionComputation', self.pre_process_message(vars())) #vars())
         pass
 
     def tickGeneric(self, tickerId, tickType, value):
@@ -103,25 +92,14 @@ class TWS_event_handler(EWrapper):
         pass
 
     def orderStatus(self, orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeId):
-        self.broadcast_event('orderStatus', vars())
+        pass
 
     def openOrder(self, orderId, contract, order, state):
-        self.broadcast_event('openOrder', vars())
+        pass
 
     def openOrderEnd(self):
-        self.broadcast_event('openOrderEnd', vars())
-
-#     def updateAccountValue(self, key, value, currency, accountName):
-#         self.broadcast_event('updateAccountValue', vars())
-# 
-#     def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
-#         self.broadcast_event('updatePortfolio', vars())
-# 
-#     def updateAccountTime(self, timeStamp):
-#         self.broadcast_event('updateAccountTime', vars())
-# 
-#     def accountDownloadEnd(self, accountName):
-#         self.broadcast_event('accountDownloadEnd', vars())
+        pass
+
     def updateAccountValue(self, key, value, currency, accountName):
         
         logging.info('TWS_event_handler:updateAccountValue. [%s]:%s' % (key.ljust(40), value))
@@ -144,8 +122,6 @@ class TWS_event_handler(EWrapper):
 
     def updateAccountTime(self, timeStamp):
         
-        logging.info('TWS_event_handler:updateAccountTime. last updated at ts=%d' % 
-                     (datetime.fromtimestamp(timeStamp).strftime('%Y-%m-%d %H:%M:%S')))
         self.broadcast_event('updateAccountTime', {'timestamp': timeStamp})
                 
 
@@ -176,7 +152,7 @@ class TWS_event_handler(EWrapper):
 
     def error(self, id=None, errorCode=None, errorMsg=None):
         try:
-            logging.error(self.tick_process_message('error', vars()))
+            logging.error(self.pre_process_message('error', vars()))
             self.broadcast_event('error', {'id': id, 
                                            'errorCode': errorCode, 'errorMsg': errorMsg})
 
@@ -184,11 +160,11 @@ class TWS_event_handler(EWrapper):
             pass
 
     def error_0(self, strvalue=None):
-        logging.error(self.tick_process_message('error_0', vars()))
+        logging.error(self.pre_process_message('error_0', vars()))
         self.broadcast_event('error_0', vars())
 
     def error_1(self, id=None, errorCode=None, errorMsg=None):
-        logging.error(self.tick_process_message('error_1', vars()))        
+        logging.error(self.pre_process_message('error_1', vars()))        
         self.broadcast_event('error_1', vars())
 
     def updateMktDepth(self, tickerId, position, operation, side, price, size):
@@ -201,7 +177,7 @@ class TWS_event_handler(EWrapper):
         self.broadcast_event('updateNewsBulletin', vars())
 
     def managedAccounts(self, accountsList):
-        logging.info(self.tick_process_message('managedAccounts', vars()))
+        logging.info(self.pre_process_message('managedAccounts', vars()))
         self.broadcast_event('managedAccounts', vars())
 
     def receiveFA(self, faDataType, xml):
@@ -247,12 +223,17 @@ class TWS_event_handler(EWrapper):
 
 
     def position(self, account, contract, pos, avgCost):
-        #self.broadcast_event('position', vars())
-        self.broadcast_event('position', vars())
-        
+        contract_key= ContractHelper.makeRedisKeyEx(contract)
+        logging.info('TWS_event_handler:position. [%s]:position= %d' % (contract_key, pos))
+        self.broadcast_event('position', {
+                                'account': account,
+                                'contract_key': contract_key, 
+                                'position': pos, 'average_cost': avgCost
+                                
+                                })        
 
     def positionEnd(self):
-        self.broadcast_event('positionEnd', vars())
+        self.broadcast_event('positionEnd', {})
 
     def accountSummary(self, reqId, account, tag, value, currency):
         self.broadcast_event('accountSummary', vars())

+ 1 - 1
src/comms/ibgw/tws_gateway.py

@@ -41,7 +41,7 @@ class TWS_gateway():
       'session_timeout_ms': 10000,
       'clear_offsets':  False,
       'order_transmit': False,
-      'topics': list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods),
+      'topics': list(TWS_Protocol.topicMethods),
       'reset_db_subscriptions': False
       }
                

+ 1 - 2
src/comms/tws_protocol_helper.py

@@ -17,8 +17,7 @@ class TWS_Protocol:
     
     
     
-    gatewayMethods = ('gw_req_subscriptions',)
-    gatewayEvents = ('gw_subscriptions', 'gw_subscription_changed')
+
     
     aeMethods = ('ae_req_greeks')
     aeEvents = ('ae_req_ack', 'ae_greeks','ae_greeks_snapshot')