ソースを参照

nightly code - buggy in reqAccountUpdates

esurfer 9 年 前
コミット
eba7a87d39

+ 4 - 5
src/comms/ibc/base_client_messaging.py

@@ -42,13 +42,13 @@ class GatewayCommandWrapper():
         logging.error('requestFA: NOT IMPLEMENTED')
     
     def reqPositions(self):
-        self.producer.send_message('reqPositions', '')
+        self.producer.send_message('reqPositions', '{}')
         
     def reqHistoricalData(self):
         logging.error('reqHistoricalData: NOT IMPLEMENTED')
         
-    def reqAccountUpdates(self):
-        self.producer.send_message('reqAccountUpdates', '1')
+    def reqAccountUpdates(self, subscribe, acct_code):
+        self.producer.send_message('reqAccountUpdates', json.dumps({'subscribe': subscribe, 'acct_code': acct_code}))
 
     def reqExecutions(self, exec_filter=None):
         self.producer.send_message('reqExecutions', ExecutionFilterHelper.object2kvstring(exec_filter) if exec_filter <> None else '')
@@ -72,8 +72,7 @@ class GatewayCommandWrapper():
     
         
 
-    def gw_req_subscriptions(self, sender_id):
-        self.producer.send_message('gw_req_subscriptions', self.producer.message_dumps({'sender_id': sender_id}))
+
         
         
         

+ 5 - 4
src/comms/ibc/gw_ex_request_exit.py

@@ -7,7 +7,7 @@ import sys
 
 from ib.ext.Contract import Contract
 from optparse import OptionParser
-from misc2.helpers import ContractHelper
+from misc2.helpers import ContractHelper, HelperFunctions
 from comms.ibgw.base_messaging import Prosumer
 from comms.tws_protocol_helper import TWS_Protocol
 from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
@@ -112,8 +112,8 @@ def test_client2(kwargs):
     cm.start_manager()
                           
                               
-    cm.reqPositions()
-    cm.reqAccountUpdates()
+    #cm.reqPositions()
+    cm.reqAccountUpdates(True, 'U8379890')
     
     try:
         logging.info('TWS_gateway:main_loop ***** accepting console input...')
@@ -182,6 +182,7 @@ if __name__ == '__main__':
     logging.basicConfig(**logconfig)        
     
     
-    test_client(kwargs)
+    test_client2(kwargs)
+
     
      

+ 4 - 3
src/comms/ibgw/base_messaging.py

@@ -311,7 +311,7 @@ class BaseConsumer(threading.Thread, Publisher):
                         
                         
                     if '*' in self.kwargs['seek_to_end'] or message.topic in self.kwargs['seek_to_end']:
-                        
+                        print 'baseconsumer run %s %d' % (message.top, gap)
                         # if there is no gap                          
                         if gap == 1:
                             # the message is valid for dispatching and not to be skipped
@@ -352,12 +352,13 @@ class BaseConsumer(threading.Thread, Publisher):
                         logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
                     else:
                         self.persist_offsets(message.topic, message.partition, message.offset)
-                        #self.dispatch(BaseConsumer.KB_EVENT, {'message': message})
-                        #self.dispatch(message.topic, self.enrich_message(message))
                         self.dispatch(message.topic, self.extract_message_content(message))
+                        
             except StopIteration:
                 logging.debug('BaseConsumer:run StopIteration Caught. No new message arriving...')
                 continue
+            except TypeError:
+                logging.error('BaseConsumer:run. Caught TypeError Exception while processing a message. Malformat json string? %s: %s' % (message.topic, message.value))
             
         consumer.close()   
         logging.info ('******** BaseConsumer exit done.')

+ 5 - 5
src/comms/ibgw/client_request_handler.py

@@ -4,7 +4,7 @@ import logging
 import json
 import traceback
 from time import strftime 
-from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
+from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper, HelperFunctions
 from comms.ibgw.base_messaging import BaseMessageListener
 
 from ib.ext.Contract import Contract
@@ -25,13 +25,13 @@ class ClientRequestHandler(BaseMessageListener):
     
     def reqAccountUpdates(self, event, subscribe, acct_code):
         logging.info('ClientRequestHandler - reqAccountUpdates value=%s' % vars())
-        self.tws_connect.reqAccountUpdates(subscribe, acct_code)
+        self.tws_connect.reqAccountUpdates(subscribe, HelperFunctions.utf2asc(acct_code))
     
     def reqAccountSummary(self, event, value):
         logging.info('ClientRequestHandler - reqAccountSummary value=%s' % value)
-        
-        vals = map(lambda x: x.encode('ascii') if isinstance(x, unicode) else x, json.loads(value))
-        self.tws_connect.reqAccountSummary(vals[0], vals[1], vals[2])
+        #old stuff
+#         vals = map(lambda x: x.encode('ascii') if isinstance(x, unicode) else x, json.loads(value))
+#         self.tws_connect.reqAccountSummary(vals[0], vals[1], vals[2])
         
     def reqOpenOrders(self, event, value=None):
         self.tws_connect.reqOpenOrders()

+ 47 - 15
src/comms/ibgw/tws_event_handler.py

@@ -1,6 +1,8 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 from time import strftime
+from datetime import datetime
+from misc2.helpers import ContractHelper
 import logging
 import traceback
 from ib.ext.EWrapper import EWrapper
@@ -24,7 +26,7 @@ class TWS_event_handler(EWrapper):
 
         try:
             dict = self.tick_process_message(message, mapping)     
-            logging.info('broadcast_event %s' % dict)
+            logging.info('broadcast_event %s:%s' % (message, dict))
             self.producer.send_message(message, self.producer.message_dumps(dict))    
         except:
             logging.error('broadcast_event: exception while encoding IB event to client:  [%s]' % message)
@@ -35,20 +37,20 @@ class TWS_event_handler(EWrapper):
 
     
     def tick_process_message(self, message_name, items):
-        return items
+        #return items
 
         t = items.copy()
         # if the tickerId is in the snapshot range
         # deduct the gap to derive the original tickerId
         # --- check logic in subscription manager
-        try:
-            
-            if (t['tickerId']  >= TWS_event_handler.TICKER_GAP):
-                #print 'tick_process_message************************ SNAPSHOT %d' % t['tickerId']
-                t['tickerId'] = t['tickerId']  - TWS_event_handler.TICKER_GAP
-                
-        except (KeyError, ):
-            pass          
+#         try:
+#             
+#             if (t['tickerId']  >= TWS_event_handler.TICKER_GAP):
+#                 #print 'tick_process_message************************ SNAPSHOT %d' % t['tickerId']
+#                 t['tickerId'] = t['tickerId']  - TWS_event_handler.TICKER_GAP
+#                 
+#         except (KeyError, ):
+#             pass          
             
         try:
             del(t['self'])
@@ -109,18 +111,48 @@ class TWS_event_handler(EWrapper):
     def openOrderEnd(self):
         self.broadcast_event('openOrderEnd', vars())
 
+#     def updateAccountValue(self, key, value, currency, accountName):
+#         self.broadcast_event('updateAccountValue', vars())
+# 
+#     def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
+#         self.broadcast_event('updatePortfolio', vars())
+# 
+#     def updateAccountTime(self, timeStamp):
+#         self.broadcast_event('updateAccountTime', vars())
+# 
+#     def accountDownloadEnd(self, accountName):
+#         self.broadcast_event('accountDownloadEnd', vars())
     def updateAccountValue(self, key, value, currency, accountName):
-        self.broadcast_event('updateAccountValue', vars())
+        
+        logging.info('TWS_event_handler:updateAccountValue. [%s]:%s' % (key.ljust(40), value))
+        self.broadcast_event('updateAccountValue', {'key': key, 
+                                          'value': value, 'currency': currency, 'account':accountName})
+                
 
     def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
-        self.broadcast_event('updatePortfolio', vars())
+        contract_key= ContractHelper.makeRedisKeyEx(contract)
+        logging.info('TWS_event_handler:updatePortfolio. [%s]:position= %d' % (contract_key, position))
+        self.broadcast_event('updatePortfolio', {
+                                'contract_key': contract_key, 
+                                'position': position, 'market_price': marketPrice,
+                                'market_value': marketValue, 'average_cost': averageCost, 
+                                'unrealized_PNL': unrealizedPNL, 'realized_PNL': realizedPNL, 
+                                'account': accountName
+                                
+                                })
+                
 
     def updateAccountTime(self, timeStamp):
-        self.broadcast_event('updateAccountTime', vars())
+        
+        logging.info('TWS_event_handler:updateAccountTime. last updated at ts=%d' % 
+                     (datetime.fromtimestamp(timeStamp).strftime('%Y-%m-%d %H:%M:%S')))
+        self.broadcast_event('updateAccountTime', {'timestamp': timeStamp})
+                
 
     def accountDownloadEnd(self, accountName):
-        self.broadcast_event('accountDownloadEnd', vars())
-
+        self.broadcast_event('accountDownloadEnd', {'account':accountName})
+        
+        
     def nextValidId(self, orderId):
         self.broadcast_event('nextValidId', vars())
 

+ 2 - 2
src/config/tws_gateway.cfg

@@ -26,11 +26,11 @@ group_id: 'TWS_GW'
 session_timeout_ms: 10000
 topics: ['reqAccountUpdates', 'reqOpenOrders', 'reqExecutions', 'reqIds', 'reqNewsBulletins', 'cancelNewsBulletins', 'setServerLogLevel', 'reqAccountSummary', 'reqPositions', 'reqAutoOpenOrders', 'reqAllOpenOrders', 'reqManagedAccts', 'requestFA', 'reqMktData', 'reqHistoricalData', 'placeOrder', 'gw_req_subscriptions']
 clear_offsets: False
-seek_to_end:['gw_req_subscriptions', 'reqPositions', 'reqMktData']
+seek_to_end:['reqAccountUpdates', 'reqPositions', 'reqMktData']
 #
 #
 subscription_manager.subscriptions.redis_key: 'subscriptions'
-subscription_manager.topics: ['reqMktData', 'gw_req_subscriptions']
+subscription_manager.topics: ['reqMktData']
 #logconfig: {'filename': '/home/larry-13.04/workspace/finopt/log/tws_gateway.log', 'filemode': 'w','level': logging.INFO}
 logconfig: {'level': logging.INFO,  'filemode': 'w', 'filename':'/tmp/tws_gateway.log'}
 order_transmit: False

+ 7 - 0
src/misc2/helpers.py

@@ -211,6 +211,11 @@ class ContractHelper(BaseHelper):
 def dict2str(dict):
     # enclose strings in double quotes
     return '{'  + ', '.join('"%s" : %s' % (k, '"%s"' % v if type(v) == str else v) for k, v in dict.iteritems()) + '}'   
+
+class HelperFunctions():
+    @staticmethod
+    def utf2asc(x):
+        return x.encode('ascii') if isinstance(x, unicode) else x
     
 
 class ConfigMap():
@@ -232,3 +237,5 @@ class ConfigMap():
                 
         #logging.debug('ConfigMap: %s' % kwargs)
         return kwargs
+    
+