Browse Source

support historical data

fix contract_info synchronized call handling
add support for multiple simultaneous REST calls
minor fix to the ib_mds heartbeat sleep interval
add support for historical data retrieval
laxaurus 6 years ago
parent
commit
6e8e6b072a

+ 34 - 26
src/cep/ib_mds.py

@@ -30,6 +30,8 @@ class IbKafkaProducer(Publisher):
     IB_TICK_SIZE = 'tickSize'
     IB_TICK_OPTION_COMPUTATION = 'tickOptionComputation'
     
+
+    
     EVENT_READ_FILE_LINE = 'event_read_file_line'
     PUBLISH_EVENTS = [EVENT_READ_FILE_LINE, IB_TICK_PRICE, IB_TICK_SIZE, IB_TICK_OPTION_COMPUTATION]
     DEFAULT_CONFIG = {
@@ -65,9 +67,17 @@ class IbKafkaProducer(Publisher):
         self.persist['persist_dir'] =self.config["ib_mds.persist_dir"]
         self.persist['file_exist'] = False
         self.persist['spill_over_limit'] = int(self.config["ib_mds.spill_over_limit"])
-        #self.load_processors(config['ib_mds.processors'])       
-        IbKafkaProducer.IB_TICK_PRICE = self.config["kafka.ib.topic.tick_price"]
-        IbKafkaProducer.IB_TICK_SIZE = self.config["kafka.ib.topic.tick_size"]
+        if self.config['ib_mds.load_processor'] == True:
+            self.load_processors(config['ib_mds.processors'])       
+
+        
+        #IbKafkaProducer.IB_TICK_PRICE = self.config["kafka.ib.topic.tick_price"]
+        #IbKafkaProducer.IB_TICK_SIZE = self.config["kafka.ib.topic.tick_size"]
+        self.mds_topic_map = {
+            IbKafkaProducer.IB_TICK_PRICE: self.config["kafka.ib.topic.tick_price"],
+            IbKafkaProducer.IB_TICK_SIZE: self.config["kafka.ib.topic.tick_size"],
+            IbKafkaProducer.IB_TICK_OPTION_COMPUTATION: self.config["kafka.ib.topic.tick_opt"]
+            }
         logging.info('******* Starting IbKafkaProducer')
         logging.info('IbKafkaProducer: connecting to kafka host: %s...' % kafka_host)
         logging.info('IbKafkaProducer: message mode is async')
@@ -82,6 +92,7 @@ class IbKafkaProducer(Publisher):
         if not replay:
             
             self.start_ib_connection()
+            time.sleep(1)
             self.load_tickers() 
         else:
             self.replay(self.config['replay_dir'])
@@ -100,8 +111,8 @@ class IbKafkaProducer(Publisher):
             class_ = getattr(module, class_name)
             return class_(self.config)
                 
-        processors = map(instantiate_processor, plist)
-        for p in processors:
+        self.processors = map(instantiate_processor, plist)
+        for p in self.processors:
             map(lambda e: self.register(e, p, getattr(p, e)), IbKafkaProducer.PUBLISH_EVENTS)
         
         
@@ -192,11 +203,14 @@ class IbKafkaProducer(Publisher):
         def create_tick_kmessage(msg):
             d = {}
             for t in msg.items():
-                d[t[0]] = t[1]
+                if t[0] == 'size':
+                    d['tsize'] = t[1]
+                else:
+                    d[t[0]] = t[1]
             d['ts'] = time.time()
             d['contract'] = self.id2contract['id2contracts'][msg.tickerId]
             d['typeName'] = msg.typeName
-            d['source'] = 'IB'
+            #d['source'] = 'IB'
             return json.dumps(d)
 
         
@@ -206,7 +220,10 @@ class IbKafkaProducer(Publisher):
             logging.debug(t)   
             if self.toggle:
                 print t
-            self.producer.send(IbKafkaProducer.IB_TICK_PRICE if msg.typeName == 'tickPrice' else IbKafkaProducer.IB_TICK_SIZE, t) 
+            
+            #self.producer.send(IbKafkaProducer.IB_TICK_PRICE if msg.typeName == 'tickPrice' else IbKafkaProducer.IB_TICK_SIZE, t)
+            self.producer.send(self.mds_topic_map[msg.typeName], t['contract'], t)
+             
             if self.persist['is_persist']:
                 self.write_message_to_file(t)
                 
@@ -224,8 +241,8 @@ class IbKafkaProducer(Publisher):
         elif msg.typeName in ['error']:
             logging.error('on_ib_message: %s %s' % (msg.errorMsg, self.id2contract['id2contracts'][msg.id] if msg.errorCode == 200 else str(msg.errorCode)) )
         else:
-           if self.toggle:
-               print msg
+            if self.toggle:
+                print msg
                 
          
     def write_message_to_file(self, t):
@@ -298,19 +315,6 @@ class IbKafkaProducer(Publisher):
             
             self.dispatch(IbKafkaProducer.EVENT_READ_FILE_LINE, {'record': l})
 
-    def do_work(self):
-        while not self.quit:
-            sleep(1)
-            pass
-        
-    def run_forever(self):
-        t = threading.Thread(target = self.do_work, args=())
-        t.start()
-        self.console()
-        print 'pending ib connection to shut down...'
-        self.disconnect()
-        t.join()
-        print 'shutdown complete.'
 
         
     def disconnect(self):
@@ -393,8 +397,10 @@ class IbKafkaProducer(Publisher):
                         self.toggle = False if self.toggle else True
                     elif selection == '9': 
                         print 'quit command received...'
-                        self.quit = True                        
-                        sys.exit(0)
+                        self.quit = True        
+                        self.menu_loop_done = True
+                        self.ibh.shutdown()                
+                        
                         break
                     else: 
                         pass                        
@@ -402,9 +408,11 @@ class IbKafkaProducer(Publisher):
                     print_menu()                
                 
         except (KeyboardInterrupt, SystemExit):
+                self.quit = True
                 logging.error('ib_mds: caught user interrupt. Shutting down...')
                 sys.exit(0)   
-            
+        
+        logging.info('quit console thread.')   
 
 if __name__ == '__main__':
            

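The ib_mds.py change replaces the two hard-coded topic class attributes with a per-tick-type map and switches the Kafka send to a keyed form, so all ticks for one contract hash to the same partition. One thing worth double-checking: create_tick_kmessage returns json.dumps(d), so t is already a string when t['contract'] is used as the key in producer.send. Below is a minimal sketch of the intended routing, assuming a producer whose send() takes (topic, key, message) — the concrete producer class is not visible in this diff — with the key taken from the dict before serialization; msg stands for the IbPy tick message.

import json
import time

def route_tick(producer, topic_map, id2contracts, msg):
    # rename 'size' -> 'tsize' as in create_tick_kmessage, copy the rest as-is
    d = dict((('tsize' if k == 'size' else k), v) for k, v in msg.items())
    d['ts'] = time.time()
    d['contract'] = id2contracts[msg.tickerId]   # e.g. the subscribed symbol
    d['typeName'] = msg.typeName                 # 'tickPrice' / 'tickSize' / 'tickOptionComputation'
    key = d['contract']                          # partition key, taken before json.dumps
    producer.send(topic_map[msg.typeName], key, json.dumps(d))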
+ 1 - 2
src/cep/ib_mds/processor/us_stkopt.py

@@ -16,13 +16,12 @@ class StockOptionsSnapshot(threading.Thread, Subscriber):
         threading.Thread.__init__(self)
         Subscriber.__init__(self, 'StockOptionsSnapshot')
         self.symbols = {}
-        self.run_forever()
         self.persist={}
         self.persist['is_persist'] = config["stkopt.is_persist"]
         self.persist['persist_dir'] =config["stkopt.persist_dir"]
         self.persist['file_exist'] = False
         self.persist['spill_over_limit'] = int(config["stkopt.spill_over_limit"])
-        
+        self.run_forever()
         
     def run_forever(self):
         self.start()

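The only functional change in us_stkopt.py is moving run_forever() — which calls self.start() — to the end of __init__, so the worker thread cannot run before self.persist is fully populated. A generic sketch of that ordering rule, not tied to the StockOptionsSnapshot internals:

import threading

class Snapshot(threading.Thread):
    def __init__(self, config):
        threading.Thread.__init__(self)
        # every attribute run() reads must be assigned before start()
        self.persist = {'is_persist': config.get('stkopt.is_persist', 0)}
        self.start()   # safe only as the last statement of __init__

    def run(self):
        if self.persist['is_persist']:   # would raise if the thread started earlier
            pass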
+ 11 - 1
src/comms/ib_heartbeat.py

@@ -54,6 +54,16 @@ class IbHeartBeat():
         suppress_msg_interval = int(self.config["ib_heartbeat.suppress_msg_interval"])
         logging.info('ib gateway->%s:%d, appid->%d, try_interval->%d, suppress msg interval->%d' % \
                      (host, port, appid, try_interval, suppress_msg_interval))
+        def smart_sleep(sleep_duration):
+            num_steps = 20
+            short_break = sleep_duration / num_steps
+            for i in range(num_steps):
+                if self.quit:
+                    break
+                else:
+                    sleep(short_break)
+                    
+                            
         while not self.quit:
             con = ibConnection(host, port, appid)
             rc = con.connect()
@@ -77,7 +87,7 @@ class IbHeartBeat():
                     self.last_broken_time = now 
                     logging.error(msg)
                 
-            sleep(try_interval)
+            smart_sleep(try_interval)
             
 
 

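smart_sleep slices the retry wait into twenty short sleeps, so a self.quit flag set from another thread is noticed within a fraction of try_interval instead of only after the whole interval. Below is a standalone sketch of the same idea; the floor on the step size is my addition (under Python 2 integer division, try_interval / 20 is 0 for intervals below 20 seconds, which would turn the retry loop into a busy spin), and threading.Event is shown as an alternative since Event.wait(timeout) returns as soon as the flag is set.

import threading

class HeartBeatLoop(object):
    def __init__(self):
        self._quit = threading.Event()

    def smart_sleep(self, duration, num_steps=20):
        step = max(float(duration) / num_steps, 0.1)   # never sleep 0s per slice
        for _ in range(num_steps):
            if self._quit.wait(step):                  # True once shutdown() is called
                break

    def shutdown(self):
        self._quit.set()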
+ 2 - 1
src/comms/ibgw/tws_event_handler.py

@@ -19,7 +19,8 @@ class TWS_event_handler(EWrapper, Publisher):
     # WebConsole is one such subscriber
     # it is interested in 
     PUBLISH_TWS_EVENTS = ['error', 'openOrder', 'openOrderEnd', 'orderStatus', 'openBound', 'tickPrice', 'tickSize',
-                          'tickOptionComputation', 'position', 'accountSummary', 'contractDetails', 'update_portfolio_account'
+                          'tickOptionComputation', 'position', 'accountSummary', 'contractDetails', 'update_portfolio_account',
+                          'historicalData'
                           ]
     
     def __init__(self, producer):

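Adding 'historicalData' to PUBLISH_TWS_EVENTS lets the gateway republish IbPy's historicalData callback through the same publish/subscribe path as the existing events; the ContractHandler change later in this diff registers for it. A minimal subscriber sketch, assuming the register(event, subscriber) / update(event, **param) convention visible in contract_handler.py (the wiring line is hypothetical):

class HistDataPrinter(object):
    # receives the IbPy historicalData fields as keyword arguments
    def update(self, event, **param):
        if event == 'historicalData':
            print param['date'], param['close']

# tws_event_handler.register('historicalData', HistDataPrinter())   # hypothetical wiring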
+ 1 - 1
src/comms/ibgw/tws_gateway_restapi.py

@@ -73,7 +73,7 @@ class WebConsole(Subscriber):
         WebConsole.api.add_resource(apiv2.SystemStatus_v2, '/v2/system', resource_class_kwargs={'webconsole': self})
         WebConsole.api.add_resource(apiv2.ContractInfo_v2, '/v2/contract', resource_class_kwargs={'webconsole': self})
         WebConsole.api.add_resource(apiv2.PreOrderMarginCheck_v2, '/v2/margin', resource_class_kwargs={'webconsole': self})
-        
+        WebConsole.api.add_resource(apiv2.HistoricalData_v2, '/v2/historical_data', resource_class_kwargs={'webconsole': self})
         
 
     def set_stop(self):

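With the resource mounted at /v2/historical_data, a client passes two request arguments, contract and range; their shapes are defined by the HistoricalData_v2 parser further down in this diff. A hedged client example — the host, port and contract string are placeholders (the real contract format is whatever v2_helper.format_v2_str_to_contract expects), and the range values are typical TWS API strings:

import json
import requests   # any HTTP client would do

params = {
    'contract': 'AAPL,STK,SMART,USD',             # placeholder format
    'range': json.dumps({
        'end_date': '20190301 16:00:00',          # TWS-style end timestamp
        'duration': '1 D',
        'bar_size': '5 mins',
        'what_to_show': 'TRADES',
        'incl_off_mkt_data': 0,                   # passed through in the useRTH position
    }),
}
r = requests.get('http://localhost:8080/v2/historical_data', params=params)
print r.status_code, r.json()   # 200 -> {'historical_data': [...]}, 404 on timeout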
+ 2 - 0
src/config/mds.cfg

@@ -3,6 +3,7 @@ kafka.host: 'localhost'
 kafka.port: 9092
 kafka.ib.topic.tick_price: 'ib_tick_price'
 kafka.ib.topic.tick_size: 'ib_tick_size'
+kafka.ib.topic.tick_opt: 'ib_tick_opt'
 
 
 [ib_mds]
@@ -21,6 +22,7 @@ ib_mds.is_persist: 1
 ib_mds.persist_dir: '/home/laxaurus/workspace/fpydevs/dat/mds_files'
 ib_mds.subscription.fileloc: '/home/laxaurus/workspace/fpydevs/dat/mds_files/instruments.txt'
 ib_mds.spill_over_limit: 10000
+ib_mds.load_processor: True
 ib_mds.processors: ['cep.ib_mds.processor.us_stkopt.StockOptionsSnapshot']
 
 [StockOptionsSnapshot]

+ 4 - 2
src/config/tws_gateway.cfg

@@ -18,8 +18,10 @@ redis_db: 2
 #
 # paper trade - 7497
 #
-tws_host: 'localhost'
-tws_api_port: 7497
+#tws_host: 'localhost'
+#tws_api_port: 7497
+tws_host: 'vsu-longhorn'
+tws_api_port: 8496
 tws_app_id: 6567
 #
 #

+ 68 - 5
src/ormdapi/v2/apiv2.py

@@ -518,7 +518,7 @@ class PreOrderMarginCheck_v2(Resource):
                 ob = self.om.get_order_book()
                 status =  ob.get_order_status(id['next_valid_id'])
                 if status:
-                    return status, 201
+                    return InterestedTags.filter_unwanted_tags(status), 201
                 sleep(0.1)
                 i += 0.5 
                 if i >= 15:
@@ -534,7 +534,8 @@ class PreOrderMarginCheck_v2(Resource):
 
 
 class ContractInfo_v2(Resource):
-    req_id = 0
+    CONTRACTINFO_REQID_START = 3000
+    req_id = CONTRACTINFO_REQID_START
     def __init__(self, webconsole):
         self.wc = webconsole    
         self.gw_conn = self.wc.get_parent().get_tws_connection()
@@ -568,17 +569,79 @@ class ContractInfo_v2(Resource):
             while not done:
                 sleep(0.1)
                 i += 0.5 
-                if i >= 15:
-                    return 'Not getting any contract information from the server after waited 5 seconds! Contact administrator', 404
+                if i >= 20:
+                    return 'Not getting any contract information from the server after waited 10 seconds! Contact administrator', 404
                 cd = self.contract_info_mgr.get_contract_details(id)
                 if cd <> None:
-                    return {'contract_info': InterestedTags.filter_unwanted_ci_tags(cd)}, 200
+                    try:
+                        _ = cd['contract_info']
+                        return {'contract_info': InterestedTags.filter_unwanted_ci_tags(cd)}, 200
+                    except:
+                        return cd, 409
                 
         except:
             return {'error': 'check the format of the contract message! %s' % traceback.format_exc()}, 409 
         
 
 
+
+class HistoricalData_v2(Resource):
+    HISTDATA_REQID_START = 4000
+    req_id = HISTDATA_REQID_START
+    def __init__(self, webconsole):
+        self.wc = webconsole    
+        self.gw_conn = self.wc.get_parent().get_tws_connection()
+        self.contract_info_mgr = self.wc.get_parent().get_contract_info_manager()
+        self.lock = RLock()
+
+    def get_req_id(self):
+        try:
+            dispatch = True
+            self.lock.acquire()
+            HistoricalData_v2.req_id += 1
+        except:
+            pass 
+        finally:            
+            self.lock.release()        
+        return HistoricalData_v2.req_id
+                    
+    
+    
+    def get(self):
+        parser = reqparse.RequestParser()
+        parser.add_argument('contract', required=True, help="contract is required.")
+        parser.add_argument('range', required=True, help="range is required.")
+        args = parser.parse_args()
+        js_contract = args.get('contract')
+        range = json.loads(args.get('range'))
+        c = v2_helper.format_v2_str_to_contract(js_contract)
+        id = self.get_req_id()
+        #
+        #def reqHistoricalData(self, tickerId, contract, endDateTime, durationStr, barSizeSetting, whatToShow, useRTH, formatDate)
+        self.gw_conn.reqHistoricalData(id, 
+                                        c, 
+                                        range['end_date'].encode('ascii'), 
+                                        range['duration'].encode('ascii'), 
+                                        range['bar_size'].encode('ascii'),
+                                        range['what_to_show'].encode('ascii'), 
+                                        range['incl_off_mkt_data'], 
+                                        1)
+            
+        done = False
+        i=0
+        try:
+            while not done:
+                sleep(0.1)
+                i += 0.5 
+                if i >= 60:
+                    return 'Not getting any contract information from the server after waited 10 seconds! Check for last errors', 404
+                hd = self.contract_info_mgr.get_historical_data(id)
+                if hd <> None:
+                    return {'historical_data': hd}, 200
+                
+        except:
+            return {'error': 'check the format of the contract message! %s' % traceback.format_exc()}, 409 
+        
         
         
         

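HistoricalData_v2 draws each request id from a class counter guarded by an RLock, fires reqHistoricalData, then polls the contract-info manager until the handler marks the batch complete; with i incremented by 0.5 per 0.1-second sleep, the i >= 60 cap works out to roughly 12 seconds of waiting rather than the 10 mentioned in the message. A compact sketch of the same counter-plus-poll pattern follows; the with-statement is my shorthand for the acquire/release pair, returning while still inside the lock avoids two concurrent requests both reading the counter after both increments (the commit's version returns after release), and the leftover dispatch flag is dropped.

from threading import RLock
from time import sleep

class HistDataRequest(object):
    # sketch of the id allocation and polling used by HistoricalData_v2
    REQID_START = 4000
    _req_id = REQID_START
    _lock = RLock()

    @classmethod
    def next_req_id(cls):
        with cls._lock:          # same effect as acquire()/release() in a finally block
            cls._req_id += 1
            return cls._req_id   # returned before the lock is released

    @staticmethod
    def poll(manager, req_id, max_wait_s=12.0, step_s=0.1):
        waited = 0.0
        while waited < max_wait_s:
            hd = manager.get_historical_data(req_id)   # None until the batch is complete
            if hd is not None:
                return hd
            sleep(step_s)
            waited += step_s
        return None              # the resource maps this to a 404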
+ 66 - 8
src/ormdapi/v2/contract_handler.py

@@ -14,10 +14,12 @@ class ContractHandler(Subscriber):
     def __init__(self, name, gw_parent):
         
         self.contract_details = {}
+        self.contract_end = {}
         self.name = name
         self.gw_parent = gw_parent
         self.tws_event_handler = gw_parent.get_tws_event_handler()
-        self.end = False
+        self.hist_data = {}
+        self.hist_data_end = {}
         
         
         Subscriber.__init__(self, self.name)
@@ -27,31 +29,87 @@ class ContractHandler(Subscriber):
              this class
              
         '''
-        for e in ['contractDetails']:
+        for e in ['contractDetails', 'historicalData', 'error']:
             self.tws_event_handler.register(e, self)           
         
 
+
     def handle_contract_details(self, req_id, contract_info, end_batch):
         logging.debug('ContractHandler:handle_contract_details')
+        try:
+            _ = self.contract_end[req_id]
+        except KeyError:
+            self.contract_end[req_id] = end_batch
+        
         if not end_batch:
-            self.end = False
+            self.contract_end[req_id] = False    
             try:
                 cd = self.contract_details[req_id]
             except KeyError:
                 cd = self.contract_details[req_id] = {'contract_info': []}
             cd['contract_info'].append(contract_info)
         else:
-            self.end = True
+            self.contract_end[req_id] = True
         
-    
+    def handle_historical_data(self, reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
+        logging.info('ContractHandler:handle_historical_data %s %0.2f' % (date, close))
+        
+        try:
+            _ = self.hist_data_end[reqId]
+        except KeyError:
+            self.hist_data_end[reqId] = False
+            
+        self.hist_data_end[reqId] = True if 'finished-' in date else False
+        
+        if not self.hist_data_end[reqId]:
+            try:
+                _ = self.hist_data[reqId]
+            except KeyError:
+                self.hist_data[reqId] = [['date', 'open', 'high', 'low', 'close', 'volume', 'count', 'WAP', 'hasGaps']]
+            
+#            self.hist_data[reqId].append({'date': date, 'open': open, 'high': high, 'low': low, 
+#                                          'close': close, 'volume': volume, 'count': count, 'WAP': WAP, 'hasGaps': hasGaps})
+            self.hist_data[reqId].append([date, open, high, low, close, volume, count, WAP, hasGaps])
+        else:
+            logging.info('ContractHandler:handle_historical_data COMPLETE DOWNLOAD request id [%d]!' % reqId)  
         
     def update(self, event, **param): 
         if event == 'contractDetails':
             self.handle_contract_details(**param)
-        
+        elif event == 'historicalData':
+            self.handle_historical_data(**param)
+        elif event == 'error':
+            try:
+                
+                if param['id'] > 3000 and param['id'] < 3999:
+                    self.contract_details[param['id']] = {'error': param['errorMsg']}
+                    self.contract_end[param['id']] = True
+                    
+                elif param['id'] > 4000 and param['id'] < 4999:
+                    self.hist_data[param['id']] = {'error': param['errorMsg']}
+                    self.hist_data_end[param['id']] = True
+                    
+            except:
+                pass
     
     def get_contract_details(self, req_id):
-        return self.contract_details.pop(req_id, None)
+        try:
+            _ = self.contract_end[req_id]
+            if self.contract_end[req_id] == False:
+                return None
+            return self.contract_details.pop(req_id, None)    
+        except:
+            return None
         
         
-
+    def get_historical_data(self, req_id):
+        try:
+            _ = self.hist_data_end[req_id]
+            if self.hist_data_end[req_id] == False:
+                return None
+            return self.hist_data.pop(req_id, None)    
+        except:
+            return None        
+        
+        
+    

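ContractHandler now buffers bars per request id, flips hist_data_end[reqId] when the 'finished-' sentinel date arrives, and routes TWS error events by id range — 3000-3999 to contract-info requests, 4000-4999 to historical-data requests (note the strict > / < comparisons skip the exact boundary ids 3000, 3999, 4000 and 4999). A caller-side sketch of consuming the result, assuming the semantics above: get_historical_data returns None until the batch completes, then either the row list or an {'error': ...} dict.

from time import sleep, time

def wait_for_bars(handler, req_id, timeout_s=12.0, poll_s=0.1):
    # poll ContractHandler.get_historical_data until the download finishes or times out
    deadline = time() + timeout_s
    while time() < deadline:
        hd = handler.get_historical_data(req_id)
        if hd is None:
            sleep(poll_s)
            continue
        if isinstance(hd, dict) and 'error' in hd:
            raise RuntimeError(hd['error'])
        return hd   # [['date', 'open', ...], [bar1, ...], ...]
    raise RuntimeError('timed out waiting on request %d' % req_id)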
+ 7 - 3
src/sh/run_mds.sh

@@ -13,7 +13,8 @@ elif [ $HOST == 'astron' ]; then
 	# virtual env
 	FINOPT_HOME=~/workspace/fpydevs/eclipse/finopt/src
 	source /home/laxaurus/workspace/fpydevs/env/bin/activate
-    SYMBOLS_PATH=/home/laxaurus/workspace/fpydevs/dat/symbols/goog.txt
+    #SYMBOLS_PATH=/home/laxaurus/workspace/fpydevs/dat/symbols/goog.txt
+        SYMBOLS_PATH=$FINOPT_HOME/../../../dat/symbols/instruments.txt
     REPLAY_PATH=/home/laxaurus/workspace/fpydevs/dat/mds_files
 
 
@@ -26,6 +27,7 @@ elif [ $HOST == 'vsu-vortify' ]; then
         source /home/vuser-vortify/workspace/fpydevs/env/bin/activate
         MDS_CFG=mds_avant.cfg
         SYMBOLS_PATH=$FINOPT_HOME/../../../dat/symbols/goog.txt
+        #SYMBOLS_PATH=$FINOPT_HOME/../../../dat/symbols/instruments.txt
 fi
 									
 						
@@ -33,9 +35,11 @@ export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
 #  
 # clear all topic offsets and erased saved subscriptions
 
+# record and publish ticks
+python $FINOPT_HOME/cep/ib_mds.py -s $SYMBOLS_PATH -f $FINOPT_HOME/config/$MDS_CFG 
 
-#python $FINOPT_HOME/cep/ib_mds.py -s $SYMBOLS_PATH -f $FINOPT_HOME/config/$MDS_CFG 
-python $FINOPT_HOME/cep/ib_mds.py -r $REPLAY_PATH -f $FINOPT_HOME/config/$MDS_CFG
+# replay ticks
+#python $FINOPT_HOME/cep/ib_mds.py -r $REPLAY_PATH -f $FINOPT_HOME/config/$MDS_CFG
 
 
 #