Переглянути джерело

add back heartbeat and reconnect mechanism in tws_gateway

laxaurus 8 роки тому
батько
коміт
d9cd9128e1
3 змінених файлів з 158 додано та 10 видалено
  1. 138 0
      src/comms/ibgw/ib_heartbeat.py
  2. 12 9
      src/comms/ibgw/tws_gateway.py
  3. 8 1
      src/config/tws_gateway.cfg

+ 138 - 0
src/comms/ibgw/ib_heartbeat.py

@@ -0,0 +1,138 @@
+# -*- coding: utf-8 -*-
+
+import sys, traceback
+import json
+import logging
+import ConfigParser
+from ib.opt import ibConnection
+from time import sleep
+import datetime
+import threading
+from optparse import OptionParser
+from misc2.helpers import ConfigMap
+
+class IbHeartBeat():
+    config = None
+    quit = False
+    prev_state = ''
+    q = None
+    #chat_handle = None
+    last_broken_time = None
+    alert_callbacks = []
+    
+    def __init__(self, kwargs):
+        self.kwargs = kwargs
+        
+        # ensure the message will get printed right away when the connection is first broken
+        self.last_broken_time =  datetime.datetime.now() - datetime.timedelta(seconds=90)  
+        
+        
+
+    def register_listener(self, fns):
+        self.alert_callbacks = [fn for fn in fns]
+        
+    def alert_listeners(self, msg):
+        [fn(msg) for fn in self.alert_callbacks]
+
+    def run(self):
+        t = threading.Thread(target = self.keep_trying, args=())
+        t.start()
+
+    def shutdown(self):
+        self.quit = True
+        
+    def keep_trying(self):
+        host = self.kwargs["ib_heartbeat.gateway"]
+        port = self.kwargs["ib_heartbeat.ib_port"]
+        appid = self.kwargs["ib_heartbeat.appid.id"]      
+        try_interval = self.kwargs["ib_heartbeat.try_interval"]
+        suppress_msg_interval = self.kwargs["ib_heartbeat.suppress_msg_interval"]
+        logging.info('ib gateway->%s:%d, appid->%d, try_interval->%d, suppress msg interval->%d' % \
+                     (host, port, appid, try_interval, suppress_msg_interval))
+        while not self.quit:
+            con = ibConnection(host, port, appid)
+            rc = con.connect()
+            if rc:
+                if self.prev_state == 'broken':
+                    msg = '*** Connection restored at %s **********' % datetime.datetime.now().strftime('%H:%M:%S')
+                    #self.chat_handle.post_msg(msg)
+                    self.alert_listeners(msg)
+                    self.prev_state = ''
+                    # reset to a much earlier time
+                    self.last_broken_time = datetime.datetime.now() - datetime.timedelta(seconds=90)
+                con.eDisconnect()
+            else:
+                msg = '*** Connection to IB API is broken **********'
+                now = datetime.datetime.now()
+                
+                #self.prev_state = 'broken' 
+                #print now, self.last_broken_time, (now - self.last_broken_time).seconds
+                if self.last_broken_time == None or (now - self.last_broken_time).seconds > suppress_msg_interval:
+                    #self.chat_handle.post_msg(msg)
+                    logging.error(msg)
+                    self.alert_listeners(msg)
+                    self.last_broken_time = now 
+                    
+                
+            sleep(try_interval)
+            
+
+
+
+        
+    
+
+if __name__ == '__main__':
+           
+  
+    
+#     if len(sys.argv) != 2:
+#         print("Usage: %s <config file>" % sys.argv[0])
+#         exit(-1)    
+# 
+#     cfg_path= sys.argv[1:]
+#     config = ConfigParser.SafeConfigParser()
+#     if len(config.read(cfg_path)) == 0:      
+#         raise ValueError, "Failed to open config file" 
+#     
+#       
+#     logconfig = eval(config.get("ib_mds", "ib_mds.logconfig").strip('"').strip("'"))
+#     logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
+#     logging.basicConfig(**logconfig)    
+#     ibh = IbHeartBeat(config)
+#     
+#     def warn_me(msg):
+#         print 'warn_me: received %s' % msg
+#     
+#     ibh.register_listener([warn_me])
+#     ibh.run()
+    
+    
+    
+    usage = "usage: %prog [options]"
+    parser = OptionParser(usage=usage)   
+    parser.add_option("-f", "--config_file",
+                      action="store", dest="config_file", 
+                      help="path to the config file")
+    
+    (options, args) = parser.parse_args()
+    
+    kwargs = ConfigMap().kwargs_from_file(options.config_file)
+    for option, value in options.__dict__.iteritems():
+        
+        if value <> None:
+            kwargs[option] = value
+
+
+    logconfig = kwargs['logconfig']
+    logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
+    logging.basicConfig(**logconfig)        
+
+    ibh = IbHeartBeat(kwargs)
+    
+    def warn_me(msg):
+        print 'warn_me: received %s' % msg
+    
+    ibh.register_listener([warn_me])
+    ibh.run()
+    

+ 12 - 9
src/comms/ibgw/tws_gateway.py

@@ -14,11 +14,13 @@ from misc2.helpers import ContractHelper, ConfigMap
 from optparse import OptionParser
 from comms.ibgw.base_messaging import Prosumer
 from comms.ibgw.tws_event_handler import TWS_event_handler
+from comms.ibgw.ib_heartbeat import IbHeartBeat
 from comms.ibgw.client_request_handler import ClientRequestHandler
 from comms.ibgw.subscription_manager import SubscriptionManager
 from comms.tws_protocol_helper import TWS_Protocol 
 import redis
 import threading
+from threading import Lock
          
 class TWS_gateway():
 
@@ -100,12 +102,11 @@ class TWS_gateway():
             sys.exit(-1)
         else:
             # start heart beat monitor
-            pass
-#             logging.info('starting up IB heart beat monitor...')
-#             self.tlock = Lock()
-#             self.ibh = IbHeartBeat(config)
-#             self.ibh.register_listener([self.on_ib_conn_broken])
-#             self.ibh.run()  
+            logging.info('starting up IB heart beat monitor...')
+            self.tlock = Lock()
+            self.ibh = IbHeartBeat(self.kwargs)
+            self.ibh.register_listener([self.on_ib_conn_broken])
+            self.ibh.run()  
 
 
         logging.info('instantiating listeners...cli_req_handler')        
@@ -175,15 +176,16 @@ class TWS_gateway():
             if self.ib_conn_status == 'OK': # check status
                 return                      # if already fixed up while waiting, return 
             
-            self.eDisconnect()
-            self.eConnect()
+            #self.disconnect_tws()
+            self.connect_tws()
             while not self.tws_connection.isConnected():
                 logging.error('TWS_gateway: attempt to reconnect...')
-                self.eConnect()
+                self.connect_tws()
                 sleep(2)
             
             # we arrived here because the connection has been restored
             # resubscribe tickers again!
+            self.ib_conn_status = 'OK'
             logging.info('TWS_gateway: IB connection restored...resubscribe contracts')
             self.contract_subscription_mgr.force_resubscription()             
             
@@ -216,6 +218,7 @@ class TWS_gateway():
                 logging.error('TWS_gateway: caught user interrupt. Shutting down...')
                 self.gw_message_handler.set_stop()
                 self.gw_message_handler.join()
+                self.ibh.shutdown()
                 logging.info('TWS_gateway: Service shut down complete...')
                 sys.exit(0)        
 

+ 8 - 1
src/config/tws_gateway.cfg

@@ -34,4 +34,11 @@ subscription_manager.topics: ['reqMktData']
 #logconfig: {'filename': '/home/larry-13.04/workspace/finopt/log/tws_gateway.log', 'filemode': 'w','level': logging.INFO}
 logconfig: {'level': logging.INFO,  'filemode': 'w', 'filename':'/tmp/tws_gateway.log'}
 order_transmit: False
-reset_db_subscriptions: False
+reset_db_subscriptions: False
+#
+#
+ib_heartbeat.ib_port: 7496
+ib_heartbeat.appid.id: 9911
+ib_heartbeat.gateway: 'localhost'
+ib_heartbeat.try_interval: 10
+ib_heartbeat.suppress_msg_interval: 5