Bladeren bron

monday release

add support for getting contract details
fix order validation
minor enhancement in ib_mds
laxaurus 6 jaren geleden
bovenliggende
commit
f3442a56f6

+ 47 - 30
src/cep/ib_mds.py

@@ -16,6 +16,7 @@ from kafka import KafkaProducer
 from misc2.observer import Publisher
 from ib.opt import ibConnection
 import importlib
+import copy
 ## 
 ## to run, start kafka server on vsu-01 <administrator, password>
 ## start ibgw or tws
@@ -31,13 +32,28 @@ class IbKafkaProducer(Publisher):
     
     EVENT_READ_FILE_LINE = 'event_read_file_line'
     PUBLISH_EVENTS = [EVENT_READ_FILE_LINE, IB_TICK_PRICE, IB_TICK_SIZE, IB_TICK_OPTION_COMPUTATION]
-    
-    def __init__(self, config, replay = False):
+    DEFAULT_CONFIG = {
+      'name': 'ib_mds',
+      'group_id': 'mds',
+      'session_timeout_ms': 10000,
+      'clear_offsets':  False,
+      'order_transmit': False
+    }    
+    def __init__(self, config):
         
         Publisher.__init__(self, IbKafkaProducer.PUBLISH_EVENTS)
         
+
+        
         
-        self.config = config
+        temp_kwargs = copy.copy(config)
+        self.config = copy.copy(IbKafkaProducer.DEFAULT_CONFIG)
+        for key in self.config:
+            if key in temp_kwargs:
+                self.config[key] = temp_kwargs.pop(key)        
+        self.config.update(temp_kwargs)            
+        
+
         self.tlock = Lock()
         self.persist = {}
         self.quit = False
@@ -45,30 +61,30 @@ class IbKafkaProducer(Publisher):
         self.id2contract = {'conId': 1, 'id2contracts': {} }
         kafka_host = self.config["kafka.host"]
         kafka_port =  self.config["kafka.port"]
-        self.persist['is_persist'] = config["ib_mds.is_persist"]
-        self.persist['persist_dir'] =config["ib_mds.persist_dir"]
+        self.persist['is_persist'] = self.config["ib_mds.is_persist"]
+        self.persist['persist_dir'] =self.config["ib_mds.persist_dir"]
         self.persist['file_exist'] = False
-        self.persist['spill_over_limit'] = int(config["ib_mds.spill_over_limit"])
-        self.load_processors(config['ib_mds.processors'])       
-        IbKafkaProducer.IB_TICK_PRICE = config["kafka.ib.topic.tick_price"]
-        IbKafkaProducer.IB_TICK_SIZE = config["kafka.ib.topic.tick_size"]
+        self.persist['spill_over_limit'] = int(self.config["ib_mds.spill_over_limit"])
+        #self.load_processors(config['ib_mds.processors'])       
+        IbKafkaProducer.IB_TICK_PRICE = self.config["kafka.ib.topic.tick_price"]
+        IbKafkaProducer.IB_TICK_SIZE = self.config["kafka.ib.topic.tick_size"]
         logging.info('******* Starting IbKafkaProducer')
         logging.info('IbKafkaProducer: connecting to kafka host: %s...' % kafka_host)
         logging.info('IbKafkaProducer: message mode is async')
 
-        kwargs = {
-          'name': 'ib_mds',
-          'bootstrap_host': kafka_host,
-          'bootstrap_port': kafka_port,
-          'group_id': 'mds',
-          'session_timeout_ms': 10000,
-          'clear_offsets':  False,
-          'order_transmit': False
-          }
-
-        self.producer = KafkaProducer(bootstrap_servers='%s:%s' % (kwargs['bootstrap_host'], kwargs['bootstrap_port']))
+
+
+        self.producer = KafkaProducer(bootstrap_servers='%s:%s' % (kafka_host, kafka_port))
+        try:
+            replay = True if self.config['replay_dir'] <> None else False
+        except:
+            replay = False 
         if not replay:
+            
             self.start_ib_connection()
+            self.load_tickers() 
+        else:
+            self.replay(self.config['replay_dir'])
         
 #from cep.ib_mds.processor.us_stkopt import StockOptionsSnapshot
         self.main_loop()
@@ -320,7 +336,7 @@ class IbKafkaProducer(Publisher):
                 s_msg = line.split('|')[1]
                 msg = json.loads(s_msg)
                 msg_ts = datetime.datetime.fromtimestamp(msg['ts'])
-                interval = (msg_ts - (last_record_ts if last_record_ts <> None else msg_ts)).microseconds / 1000000.0
+                interval = (msg_ts - (last_record_ts if last_record_ts <> None else msg_ts)).microseconds / 500000.0   #100000.0
                 
                 print '%s %s %s' % (msg_ts.strftime('%Y-%m-%d %H:%M:%S.%f'), s_msg, fn)
                 self.producer.send(IbKafkaProducer.IB_TICK_PRICE if msg['typeName'] == 'tickPrice' else IbKafkaProducer.IB_TICK_SIZE, s_msg)
@@ -328,7 +344,7 @@ class IbKafkaProducer(Publisher):
                 last_record_ts = msg_ts
                 sleep(interval)
 
-        files = sorted([ join(dir_loc,f) for f in listdir(dir_loc) if isfile(join(dir_loc,f)) ])   
+        files = sorted([ join(dir_loc,f) for f in listdir(dir_loc) if (isfile(join(dir_loc,f)) and f.endswith('.txt')) ])   
                  
         for f in files:
             process_msg(f)
@@ -420,7 +436,8 @@ if __name__ == '__main__':
         
         if value <> None:
             kwargs[option] = value
-    cfg_path= options.config_file
+
+    #cfg_path= options.config_file
 
     if options.symbols_file:
         kwargs['ib_mds.subscription.fileloc']= options.symbols_file
@@ -430,13 +447,13 @@ if __name__ == '__main__':
     logging.basicConfig(**logconfig)        
 
     logging.info('config settings: %s' % kwargs)
-    replay = True if options.replay_dir <> None else False 
-    ik = IbKafkaProducer(kwargs, replay)
+    #replay = True if options.replay_dir <> None else False 
+    ik = IbKafkaProducer(kwargs)
     
-    if not replay:
-        ik.load_tickers()    
-    else:
-        ik.replay(options.replay_dir)
+#     if not replay:
+#         ik.load_tickers()    
+#     else:
+#         ik.replay(options.replay_dir)
         
-    ik.run_forever()
+#    ik.run_forever()
     

+ 3 - 0
src/comms/ibgw/client_request_handler.py

@@ -50,6 +50,9 @@ class ClientRequestHandler(BaseMessageListener):
         self.tws_connect.reqExecutions(0, ef)
     
     
+    def reqContractDetails(self, event, contract_key):
+        pass
+    
     def reqIds(self, event, value=None):
         self.tws_connect.reqIds(1)
     

+ 28 - 3
src/comms/ibgw/tws_event_handler.py

@@ -19,7 +19,7 @@ class TWS_event_handler(EWrapper, Publisher):
     # WebConsole is one such subscriber
     # it is interested in 
     PUBLISH_TWS_EVENTS = ['error', 'openOrder', 'openOrderEnd', 'orderStatus', 'openBound', 'tickPrice', 'tickSize',
-                          'tickOptionComputation', 'position', 'accountSummary'
+                          'tickOptionComputation', 'position', 'accountSummary', 'contractDetails', 'update_portfolio_account'
                           ]
     
     def __init__(self, producer):
@@ -186,10 +186,35 @@ class TWS_event_handler(EWrapper, Publisher):
         
 
     def contractDetails(self, reqId, contractDetails):
-        self.broadcast_event('contractDetails', vars())
+        '''
+            ContractDetails (Contract summary, String marketName, double minTick, String orderTypes, 
+                String validExchanges, int underConId, String longName, String contractMonth, String industry, 
+                String category, String subcategory, String timeZoneId, String tradingHours, 
+                String liquidHours, String evRule, double evMultiplier, int aggGroup)
+        '''
+        # the next few lines serialize the class object into dict
+        cd = deepcopy(contractDetails.__dict__)
+        contract = deepcopy(cd['m_summary'].__dict__)
+        cd.pop('m_summary')
+        cd['m_summary']= contract
+        try:
+            cd.pop('m_secIdList')
+        except:
+            pass
+        self.broadcast_event('contractDetails', {
+                                'req_id': reqId, 
+                                'contract_info': cd, 
+                                'end_batch': False
+                                })  
+
 
     def contractDetailsEnd(self, reqId):
-        self.broadcast_event('contractDetailsEnd', vars())
+        self.broadcast_event('contractDetails', {
+                                'req_id': reqId, 
+                                'contract_info': None, 
+                                'end_batch': True
+                                
+                                })  
 
     def bondContractDetails(self, reqId, contractDetails):
         self.broadcast_event('bondContractDetails', vars())

+ 6 - 1
src/comms/ibgw/tws_gateway.py

@@ -22,6 +22,7 @@ from comms.ibgw.tws_gateway_restapi import WebConsole
 from comms.ibgw.order_manager import OrderManager
 from ormdapi.v2.quote_handler import QuoteRESTHandler
 from ormdapi.v2.position_handler import AccountPositionTracker
+from ormdapi.v2.contract_handler import ContractHandler
 import redis
 import threading
 from threading import Lock
@@ -74,7 +75,7 @@ class TWS_gateway():
             3. establish TWS gateway connectivity
             
             4. initialize listeners: ClientRequestHandler and SubscriptionManager
-            4a. start order_id_manager
+            4a. start a bunch of services for the REST API
             5. start the prosumer 
             6. run web console
         
@@ -164,6 +165,7 @@ class TWS_gateway():
         self.order_manager.start_order_manager()
         self.quote_manager = QuoteRESTHandler('quote_manager', self)
         self.pos_manager = AccountPositionTracker('acctpos_manager', self)
+        self.contract_info_manager = ContractHandler('contract_info_mgr', self)
         
     def initialize_redis(self):
 
@@ -217,6 +219,9 @@ class TWS_gateway():
     def get_pos_manager(self):
         return self.pos_manager
     
+    def get_contract_info_manager(self):
+        return self.contract_info_manager
+    
     def get_redis_conn(self):
         return self.rs
 

+ 2 - 1
src/comms/ibgw/tws_gateway_restapi.py

@@ -70,7 +70,8 @@ class WebConsole(Subscriber):
         WebConsole.api.add_resource(apiv2.OpenOrdersStatus_v2, '/v2/open_orders', resource_class_kwargs={'webconsole': self})
         WebConsole.api.add_resource(apiv2.QuoteRequest_v2, '/v2/quote', resource_class_kwargs={'webconsole': self})
         WebConsole.api.add_resource(apiv2.AcctPosition_v2, '/v2/position', resource_class_kwargs={'webconsole': self})
-        WebConsole.api.add_resource(apiv2.SystemStatus, '/v2/system', resource_class_kwargs={'webconsole': self})
+        WebConsole.api.add_resource(apiv2.SystemStatus_v2, '/v2/system', resource_class_kwargs={'webconsole': self})
+        WebConsole.api.add_resource(apiv2.ContractInfo_v2, '/v2/contract', resource_class_kwargs={'webconsole': self})
         
 
     def set_stop(self):

+ 1 - 1
src/config/mds.cfg

@@ -1,5 +1,5 @@
 [cep]
-kafka.host: 'vorsprung'
+kafka.host: 'localhost'
 kafka.port: 9092
 kafka.ib.topic.tick_price: 'ib_tick_price'
 kafka.ib.topic.tick_size: 'ib_tick_size'

+ 4 - 4
src/misc2/helpers.py

@@ -37,7 +37,7 @@ class OrderValidationException(Exception):
 class OrderHelper(BaseHelper):
     
     @staticmethod
-    def order_validation(account, side, quantity, price):
+    def order_validation(account, side, quantity, price, order_type):
         if account == None or account == '':
             raise OrderValidationException("account must not be blank!")
         elif str(side) not in ['BUY', 'SELL']:
@@ -55,7 +55,7 @@ class OrderHelper(BaseHelper):
         except ValueError:
             raise OrderValidationException("price or quantity must be a numeric value")
         
-        if price == 0:
+        if order_type <> 'MKT' and price == 0:
             raise OrderValidationException("price must not be zero")
         return True
         
@@ -65,9 +65,9 @@ class OrderHelper(BaseHelper):
         if order.m_orderType == None:
             raise OrderValidationException("order type must not be blank")
         elif order.m_orderType in ['LMT', 'STP LMT']:
-            OrderHelper.order_validation(order.m_account, order.m_action, order.m_totalQuantity, order.m_lmtPrice)
+            OrderHelper.order_validation(order.m_account, order.m_action, order.m_totalQuantity, order.m_lmtPrice, order.m_orderType)
         elif order.m_orderType =='MKT':
-            OrderHelper.order_validation(order.m_account, order.m_action, order.m_totalQuantity, 0.0)
+            OrderHelper.order_validation(order.m_account, order.m_action, order.m_totalQuantity, 0.0, order.m_orderType)
         
             
         

+ 15 - 1
src/ormdapi/test/api_test.py

@@ -11,6 +11,19 @@ def api_test(url, params, jdata=None):
     r = requests.post(url, params, jdata)
     print r.content
 
+
+def test_contract_info_v2():
+    urls = [
+    'http://localhost:5001/v2/contract?contract_info={"currency": "USD", "symbol": "BA", "sec_type":"STK"}',
+    'http://localhost:5001/v2/contract?contract_info={"currency": "HKD", "symbol": "700", "sec_type":"STK"}',
+    'http://localhost:5001/v2/contract?contract_info={"right": "P", "exchange": "SMART", "symbol": "GOOG", "expiry": "20190524", "currency": "USD", "sec_type": "OPT", "strike": 1160}'
+    ]
+    for u in urls:
+        
+        r = requests.get(u, {})
+        print r.content
+
+
 if __name__ == '__main__':
 
     kwargs = {
@@ -25,5 +38,6 @@ if __name__ == '__main__':
     params = {'contract': json.dumps({"right": "", "exchange": "HKFE", "symbol": "HSI", "expiry": "20190530", "currency": "HKD", "sec_type": "FUT", "strike": 0}),
               'order_condition': json.dumps({"account": "U5550568", "order_type": "LMT", "price": 29200, "side": "BUY", "quantity": 1})}
     data = {}
-    api_test(url, params, data)
+    #api_test(url, params, data)
+    test_contract_info_v2()
     

+ 200 - 39
src/ormdapi/v2/apiv2.py

@@ -7,28 +7,14 @@ from ormdapi.v2.position_handler import AccountSummaryTags
 import uuid
 import traceback
 import json
-
+from threading import RLock    
 
 
 
 
 class InterestedTags():
     
-    '''
-    
-    order state information is not processed at this time.
-        
-        m_status = ""
-        m_initMargin = ""
-        m_maintMargin = ""
-        m_equityWithLoan = ""
-        m_commission = float()
-        m_minCommission = float()
-        m_maxCommission = float()
-        m_commissionCurrency = ""
-        m_warningText = ""
-    
-    '''
+
     
     OrderStatus_tags = {'order': {'m_orderId': 'order_id',
                                   'm_clientId': 'client_id',
@@ -53,10 +39,10 @@ class InterestedTags():
                         'state':{   'm_initMargin': "init_margin",
                                     'm_maintMargin': "maint_margin",
                                     'm_equityWithLoan': "equity_with_loan",
-                                    'm_commission': "commission",
-                                    'm_minCommission': "min_commission",
-                                    'm_maxCommission': "max_commission",
-                                    'm_commissionCurrency': "commission_currency",
+#                                     'm_commission': "commission",
+#                                     'm_minCommission': "min_commission",
+#                                     'm_maxCommission': "max_commission",
+#                                     'm_commissionCurrency': "commission_currency",
                                     'm_warningText': "warning_text"},
                         'error': {'errorCode': 'error_code',
                                   'errorMsg': 'error_msg'},
@@ -65,16 +51,59 @@ class InterestedTags():
     
     
     
+    ContractDetails_tags = {'contract_info': {
+                                        'm_industry':'industry',
+                                        'm_liquidHours':'liquid_hours',
+                                        'm_marketName':'market_name',
+#                                        'm_evMultiplier':'ev_multiplier',
+#                                        'm_evRule':'ev_rule',
+                                        'm_summary':'summary',
+                                        'm_minTick':'min_tick',
+                                        'm_contractMonth':'contract_month',
+                                        'm_longName':'long_name',
+                                        'm_timeZoneId':'time_zoneId',
+#                                        'm_orderTypes':'order_types',
+#                                        'm_category':'category',
+                                        'm_tradingHours':'trading_hours',
+#                                        'm_validExchanges':'valid_exchanges',
+#                                        'm_underConId':'under_conId',
+#                                        'm_subcategory':'subcategory',
+                                        'm_priceMagnifier':'price_magnifier',   
+                                },
+                            'summary': {
+                                        'm_tradingClass':'trading_class',
+                                        'm_right':'right',
+                                        'm_symbol':'symbol',
+#                                        'm_conId':'con_id',
+                                        'm_secType':'sec_type',
+#                                        'm_includeExpired':'include_expired',
+                                        'm_primaryExch':'primary_exch',
+                                        'm_multiplier':'multiplier',
+                                        'm_expiry':'expiry',
+                                        'm_currency':'currency',
+                                        'm_localSymbol':'local_symbol',
+                                        'm_exchange':'exchange',
+                                        'm_strike':'strike',                                
+                                }
+                                        
+        }
+    
     @staticmethod
     def filter_unwanted_tags(o_status):
         os = {}
-        for k,v in InterestedTags.OrderStatus_tags['order'].iteritems():
-            os[v] = o_status['order'][k]
-        for k,v in InterestedTags.OrderStatus_tags['ord_status'].iteritems():
-            os[v] = o_status['ord_status'][k]
-        for k,v in InterestedTags.OrderStatus_tags['state'].iteritems():
-            os[v] = o_status['state'][k]
-            
+        try:
+            for k,v in InterestedTags.OrderStatus_tags['order'].iteritems():
+                os[v] = o_status['order'][k]
+            for k,v in InterestedTags.OrderStatus_tags['ord_status'].iteritems():
+                os[v] = o_status['ord_status'][k]
+        except:
+            pass
+        try:
+            for k,v in InterestedTags.OrderStatus_tags['state'].iteritems():
+                os[v] = o_status['state'][k]
+        except:
+            pass
+        
         try:
             os['error'] = 'error_code:%d, error_msg:%s' % (o_status['error']['errorCode'], o_status['error']['errorMsg'])
         except KeyError:
@@ -82,6 +111,23 @@ class InterestedTags():
         
         return os
     
+    @staticmethod
+    def filter_unwanted_ci_tags(c_info_list):
+        def process_tags(c_info):
+            ci = {}
+            try:
+                for k,v in InterestedTags.ContractDetails_tags['contract_info'].iteritems():
+                    ci[v] = c_info[k]
+                ci.pop('summary')
+                ci['summary'] = {}
+                for k,v in InterestedTags.ContractDetails_tags['summary'].iteritems():
+                    ci['summary'][v] = c_info['m_summary'][k]
+                
+            except:
+                pass
+            return ci
+        return map(process_tags, c_info_list['contract_info'])
+    
 '''
     function to force tws to return all open orders status
     this function will only cause open orders status to be updated in the 
@@ -168,19 +214,19 @@ class v2_helper():
         js_v2 = json.loads(contract_v2str)
         for k,v in js_v2.iteritems():
             if k in mmap:
-                 cdict[mmap[k]] = v
+                cdict[mmap[k]] = v
         return ContractHelper.kv2contract(cdict)
     
 
     @staticmethod
-    def format_v2_str_to_order(order_v2str):
+    def format_v2_str_to_order(order_v2str, margin_check=False):
         omap = {'order_type': 'm_orderType',
                 'account': 'm_account',
                 'side': 'm_action',
                 'quantity': 'm_totalQuantity', 
                 'price':'m_lmtPrice',
                 'aux_price': 'm_auxPrice',
-                'order_ref': 'm_orderRef'
+                'order_ref': 'm_orderRef',
                 }        
     
         
@@ -188,7 +234,13 @@ class v2_helper():
         js_v2 = json.loads(order_v2str)
         for k,v in js_v2.iteritems():
             if k in omap:
-                 odict[omap[k]] = v
+                odict[omap[k]] = v
+                
+        try:
+            if margin_check:
+                odict['m_whatIf'] = True
+        except:
+            pass
         return OrderHelper.kv2object(odict)
     
 
@@ -200,8 +252,6 @@ class SyncOrderCRUD_v2(Resource):
         self.wc = webconsole
         self.gw_conn = self.wc.get_parent().get_tws_connection()
         
-        
-    
     '''
         
         create order
@@ -367,17 +417,20 @@ class AcctPosition_v2(Resource, Publisher):
         self.reqId = 4567
     
     def get(self):
+        
         parser = reqparse.RequestParser()
         parser.add_argument('account', required=False, help="specify account name or leave blank to return all accounts")
         args = parser.parse_args()        
         try:
             
-            # reqPositions must be called as the get_positions method
+            # reqPositions must be called first as the get_positions method
             # in AccountPositionTracker relies on the positionEnd flag to be 
             # set True 
             self.gw_conn.reqPositions()
             self.gw_conn.reqAccountSummary(self.reqId, "All", AccountSummaryTags.get_all_tags())
-            return self.pm.get_positions(args['account']), 201
+            self.gw_conn.reqAccountUpdates(True, '') #args['account']) 
+#            return self.pm.get_positions(args['account']), 201 
+            return self.pm.get_positions(), 201  ### the API doesn't support passing account as a param
             
         except KeyError:
             return self.pm.get_positions(), 201
@@ -388,12 +441,10 @@ class AcctPosition_v2(Resource, Publisher):
    
    
    
-class SystemStatus(Resource):        
+class SystemStatus_v2(Resource):        
 
     def __init__(self, webconsole):
         self.wc = webconsole
-        
-        
 
     def get(self):
         parser = reqparse.RequestParser()
@@ -412,7 +463,7 @@ class SystemStatus(Resource):
                 '''
                     return connectivity status
                 '''
-                return {'TWS connection status: [%s]': 'Connected' if self.wc.get_parent().get_ib_conn_status() else 'Disconnected. Wait for retry...'}, 200
+                return {'TWS connection status:': 'Connected' if self.wc.get_parent().get_ib_conn_status() else 'Disconnected. Wait for retry...'}, 200
 
         except:
             
@@ -420,4 +471,114 @@ class SystemStatus(Resource):
         
         return self.wc.retrieve_logs(), 200
         
+
+class PreOrderMarginCheck_v2(Resource):
+
+
+    def __init__(self, webconsole):
+        self.wc = webconsole
+        self.gw_conn = self.wc.get_parent().get_tws_connection()
+        self.om = self.wc.get_parent().get_order_manager()
+    '''
+        
+
+    '''    
+    def get(self):
+        parser = reqparse.RequestParser()
+        parser.add_argument('contract', required=True, help="contract is required.")
+        parser.add_argument('order_condition', required=True, help="order_condition is required.")
+        args = parser.parse_args()
+        js_contract = args.get('contract')
+        '''
+            set the margin check flag to true
+        '''
+        contract = v2_helper.format_v2_str_to_contract(js_contract, True)
+        js_order_cond = args.get('order_condition')
+        clordid = str(uuid.uuid4())
+
+        self.wc.get_api_sink().add_message('/order', 'PreOrderMarginCheck_v2:get', 'received new order %s condition: %s' % (js_contract, js_order_cond))
+        
+        done = False
+        iom = self.wc.get_parent().get_order_id_manager()
+        iom.request_id('rest-api', clordid)
+        id = None
+        while not done:
+            id = iom.assigned_id(clordid)
+            if id != None:
+                break
+            sleep(0.1)
+        
+        try:    
+            order = v2_helper.format_v2_str_to_order(js_order_cond)
+            OrderHelper.order_validation_ex(order)
+            self.gw_conn.placeOrder(id['next_valid_id'], contract, order)
+            i = 0
+            while 1:
+                 
+                ob = self.om.get_order_book()
+                status =  ob.get_order_status(id)
+                if status:
+                    return status, 201
+                sleep(0.1)
+                i += 0.5 
+                if i >= 15:
+                    return 'Not getting any margin information from the server after waited 5 seconds! Contact administrator', 404            
+            
+        except OrderValidationException as e:
+            return {'error': e.args[0]}, 409
+        except:
+            return {'error': 'check the format of the margin check message! %s' % traceback.format_exc()}, 409
+        
+            
+   
+
+
+class ContractInfo_v2(Resource):
+    req_id = 0
+    def __init__(self, webconsole):
+        self.wc = webconsole    
+        self.gw_conn = self.wc.get_parent().get_tws_connection()
+        self.contract_info_mgr = self.wc.get_parent().get_contract_info_manager()
+        self.lock = RLock()
+
+    def get_req_id(self):
+        try:
+            dispatch = True
+            self.lock.acquire()
+            ContractInfo_v2.req_id += 1
+        except:
+            pass 
+        finally:            
+            self.lock.release()        
+        return ContractInfo_v2.req_id
+                    
+    
+    
+    def get(self):
+        parser = reqparse.RequestParser()
+        parser.add_argument('contract_info', required=True, help="contract is required.")
+        args = parser.parse_args()
+        js_contract = args.get('contract_info')
+        c = v2_helper.format_v2_str_to_contract(js_contract)
+        id = self.get_req_id()
+        self.gw_conn.reqContractDetails(id, c)
+        done = False
+        i=0
+        try:
+            while not done:
+                sleep(0.1)
+                i += 0.5 
+                if i >= 15:
+                    return 'Not getting any contract information from the server after waited 5 seconds! Contact administrator', 404
+                cd = self.contract_info_mgr.get_contract_details(id)
+                if cd <> None:
+                    return {'contract_info': InterestedTags.filter_unwanted_ci_tags(cd)}, 200
+                
+        except:
+            return {'error': 'check the format of the contract message! %s' % traceback.format_exc()}, 409 
+        
+
+
+        
+        
         

+ 57 - 0
src/ormdapi/v2/contract_handler.py

@@ -0,0 +1,57 @@
+import logging
+import json
+from threading import RLock
+from misc2.observer import Subscriber
+from misc2.observer import NotImplementedException
+from misc2.helpers import ContractHelper
+from copy import deepcopy
+import traceback
+from finopt.instrument import Symbol
+
+class ContractHandler(Subscriber):
+
+    
+    def __init__(self, name, gw_parent):
+        
+        self.contract_details = {}
+        self.name = name
+        self.gw_parent = gw_parent
+        self.tws_event_handler = gw_parent.get_tws_event_handler()
+        self.end = False
+        
+        
+        Subscriber.__init__(self, self.name)
+        
+        '''
+             ask tws_event_handler to forward contract details message to
+             this class
+             
+        '''
+        for e in ['contractDetails']:
+            self.tws_event_handler.register(e, self)           
+        
+
+    def handle_contract_details(self, req_id, contract_info, end_batch):
+        logging.debug('ContractHandler:handle_contract_details')
+        if not end_batch:
+            self.end = False
+            try:
+                cd = self.contract_details[req_id]
+            except KeyError:
+                cd = self.contract_details[req_id] = {'contract_info': []}
+            cd['contract_info'].append(contract_info)
+        else:
+            self.end = True
+        
+    
+        
+    def update(self, event, **param): 
+        if event == 'contractDetails':
+            self.handle_contract_details(**param)
+        
+    
+    def get_contract_details(self, req_id):
+        return self.contract_details.pop(req_id, None)
+        
+        
+

+ 89 - 5
src/ormdapi/v2/position_handler.py

@@ -8,6 +8,7 @@ from misc2.helpers import ContractHelper
 from misc2.observer import Subscriber
 from copy import deepcopy
 import threading
+from urllib3.packages.six import _MovedItems
 
 
 class AccountSummaryTags():
@@ -56,14 +57,18 @@ class AccountPositionTrackerException(Exception):
 
 class AccountPositionTracker(Subscriber):
     
-    POSITION_EVENTS = ['position', 'accountSummary']
+    POSITION_EVENTS = ['position', 'accountSummary', 'update_portfolio_account']
     
     def __init__(self, name, gw_parent):
 
         Subscriber.__init__(self, name)
         self.name = name
+        # indicate end of position download
         self.pos_end = False 
+        # indicate end of acct summary download
         self.acct_end = False
+        # indicate end of port acct download
+        self.port_acct_end = {}
         self.account_position = {}
         self.gw_parent = gw_parent
         self.tws_event_handler = gw_parent.get_tws_event_handler()
@@ -73,10 +78,80 @@ class AccountPositionTracker(Subscriber):
              this class
              
         '''
-        for e in ['position', 'accountSummary']:
+        for e in AccountPositionTracker.POSITION_EVENTS:
             self.tws_event_handler.register(e, self)                  
     
+    def handle_portfolio_account(self, **items):
+        '''
+            
+            in tws_event_handler, the update_portfolio_account message is used for handling the four types 
+            of tws events below, all these events are sent as "update_portfolio_account" event 
+            to find out which event is being sent by update_portfolio_account, look into 
+            the param for the key "tws_event"
+             
+            refer to line 142 in the source file:
+                self.update_portfolio_account(
+                              {'tws_event': 'updatePortfolio'....
+                              
+            Note that each tws event carried different params, as such for different event
+            the **items will contain different things
         
+            def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, account):
+            def updateAccountValue(self, key, value, currency, account):
+            def updateAccountTime(self, timeStamp):
+            def accountDownloadEnd(self, account):
+            
+            the variables are stored as below:
+                retrieve the accountName in the event
+                
+                self.account_position[accountName]['lastUpdated'] = timeStamp
+                
+            
+        '''
+        ev = items['tws_event']
+            
+          
+        if ev == 'accountDownloadEnd':
+            self.port_acct_end[items['account']] = True
+        elif ev == 'updateAccountTime':
+            try:
+                for a in self.account_position.keys():
+                    self.account_position[a]['portfolio_account']['timestamp'] = items['timestamp']
+                #map(lambda x: x['portfolio_account'].__setitem__('timestamp', items['timeStamp']), self.account_position)
+            except:
+                #pass
+                logging.error(traceback.format_exc())
+
+        else:
+            try:
+                account = items['account']
+                _ = self.account_position[account]
+            except:
+                self.account_position[account] = {'acct_position': {}, 
+                                                               'account_summary': {'tags':{}},
+                                                               'portfolio_account': {'tags':{}, 'contracts':[]}
+                                                               }
+                
+            if ev == 'updateAccountValue':
+                self.account_position[account]['portfolio_account']['tags'][items['key']] = items['value'] 
+                self.account_position[account]['portfolio_account']['tags']['currency'] = items['currency']
+            elif ev == 'updatePortfolio':
+                try:
+                    contract_key = items['contract_key']
+                    self.account_position[account]['portfolio_account']
+                    ckv = ContractHelper.contract2kv(ContractHelper.makeContractfromRedisKeyEx(contract_key)) 
+                    self.account_position[account]['portfolio_account'][contract_key] = {'contract': ckv,
+                                                                  'position': items['position'], 
+                                                                  'market_price': items['market_price'],
+                                                                  'market_value': items['market_value'],
+                                                                  'average_cost': items['average_cost'],
+                                                                  'unrealized_PNL': items['unrealized_PNL'],
+                                                                  'realized_PNL': items['realized_PNL'],
+                                                                  
+                                                                  }
+                except:
+                    logging.error(traceback.format_exc())
+       
     def handle_position(self, account, contract_key, position, average_cost, end_batch):
         logging.info('AccountPositionTracker:position account[%s] contract[%s]', account, contract_key)
         if not end_batch:
@@ -87,7 +162,9 @@ class AccountPositionTracker(Subscriber):
             except:
                 #logging.error(traceback.format_exc())
                 self.account_position[account] = {'acct_position': {contract_key: {'contract': ckv, 'position': position, 'average_cost': average_cost}},
-                                                  'account_summary': {'tags':{}}}
+                                                  'account_summary': {'tags':{}},
+                                                  'portfolio_account':{'tags':{}, 'contracts':[]}
+                                                 }
             
             
 
@@ -105,7 +182,10 @@ class AccountPositionTracker(Subscriber):
             try:
                 _ = self.account_position[items['account']]
             except KeyError:
-                self.account_position[items['account']]= {'acct_position': {}, 'account_summary': {'tags':{}}}
+                self.account_position[items['account']]= {'acct_position': {}, 
+                                                          'account_summary': {'tags':{}},
+                                                          'portfolio_account': {'tags':{}, 'contracts':[]}
+                                                           }
                 
             for k, v in items.iteritems():
                 if k == 'tag':
@@ -122,7 +202,8 @@ class AccountPositionTracker(Subscriber):
             self.handle_position(**param)
         elif event == 'accountSummary':
             self.handle_account_summary(**param)
-
+        elif event == 'update_portfolio_account':
+            self.handle_portfolio_account(**param)
    
    
     def get_positions(self, account=None):
@@ -148,4 +229,7 @@ class AccountPositionTracker(Subscriber):
         finally:
             lock.release()
             return result     
+        
+    def get_accounts(self):
+        return self.account_position.keys()
            

+ 0 - 3
src/ormdapi/v2/quote_handler.py

@@ -1,8 +1,5 @@
 import logging
-import json
-from threading import RLock
 from misc2.observer import Subscriber
-from misc2.observer import NotImplementedException
 from misc2.helpers import ContractHelper
 from copy import deepcopy
 import traceback

+ 4 - 2
src/sh/run_mds.sh

@@ -13,7 +13,8 @@ elif [ $HOST == 'astron' ]; then
 	# virtual env
 	FINOPT_HOME=~/workspace/fpydevs/eclipse/finopt/src
 	source /home/laxaurus/workspace/fpydevs/env/bin/activate
-        SYMBOLS_PATH=/home/laxaurus/workspace/fpydevs/dat/symbols/goog.txt
+    SYMBOLS_PATH=/home/laxaurus/workspace/fpydevs/dat/symbols/goog.txt
+    REPLAY_PATH=/home/laxaurus/workspace/fpydevs/dat/mds_files
 
 
 elif [ $HOST == 'vsu-longhorn' ]; then
@@ -33,7 +34,8 @@ export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
 # clear all topic offsets and erased saved subscriptions
 
 
-python $FINOPT_HOME/cep/ib_mds.py -s $SYMBOLS_PATH -f $FINOPT_HOME/config/$MDS_CFG 
+#python $FINOPT_HOME/cep/ib_mds.py -s $SYMBOLS_PATH -f $FINOPT_HOME/config/$MDS_CFG 
+python $FINOPT_HOME/cep/ib_mds.py -r $REPLAY_PATH -f $FINOPT_HOME/config/$MDS_CFG
 
 
 #