Преглед изворни кода

daily updates - ConfigParser support

bobhk пре 9 година
родитељ
комит
56c191d499
3 измењених фајлова са 112 додато и 91 уклоњено
  1. 81 78
      src/comms/kf_tws_gateway.py
  2. 7 6
      src/comms/test/base_messaging.py
  3. 24 7
      src/config/tws_gateway.cfg

+ 81 - 78
src/comms/kf_tws_gateway.py

@@ -76,8 +76,12 @@ class TWS_event_handler(EWrapper):
         # if the tickerId is in the snapshot range
         # deduct the gap to derive the original tickerId
         # --- check logic in subscription manager
-        if (t['tickerId']  >= TWS_event_handler.TICKER_GAP):
-            t['tickerId'] = t['tickerId']  - TWS_event_handler.TICKER_GAP
+        try:
+            if (t['tickerId']  >= TWS_event_handler.TICKER_GAP):
+                t['tickerId'] = t['tickerId']  - TWS_event_handler.TICKER_GAP
+        except (KeyError, ):
+            pass          
+            
         try:
             del(t['self'])
         except (KeyError, ):
@@ -163,15 +167,15 @@ class TWS_event_handler(EWrapper):
         self.broadcast_event('connectionClosed', {})
 
     def error(self, id=None, errorCode=None, errorMsg=None):
-        logging.error(self.serialize_vars_to_dict('error', vars()))
+        logging.error(self.tick_process_message('error', vars()))
         self.broadcast_event('error', vars())
 
     def error_0(self, strvalue=None):
-        logging.error(self.serialize_vars_to_dict('error_0', vars()))
+        logging.error(self.tick_process_message('error_0', vars()))
         self.broadcast_event('error_0', vars())
 
     def error_1(self, id=None, errorCode=None, errorMsg=None):
-        logging.error(self.serialize_vars_to_dict('error_1', vars()))        
+        logging.error(self.tick_process_message('error_1', vars()))        
         self.broadcast_event('error_1', vars())
 
     def updateMktDepth(self, tickerId, position, operation, side, price, size):
@@ -184,7 +188,7 @@ class TWS_event_handler(EWrapper):
         self.broadcast_event('updateNewsBulletin', vars())
 
     def managedAccounts(self, accountsList):
-        logging.info(self.serialize_vars_to_dict('managedAccounts', vars()))
+        logging.info(self.tick_process_message('managedAccounts', vars()))
         self.broadcast_event('managedAccounts', vars())
 
     def receiveFA(self, faDataType, xml):
@@ -247,83 +251,87 @@ class TWS_gateway(threading.Thread):
 
     
     # monitor IB connection / heart beat
-    ibh = None
-    tlock = None
-    ib_conn_status = None
-    ib_order_transmit = False
-    
+#     ibh = None
+#     tlock = None
+#     ib_conn_status = None
+    TWS_GW_DEFAULT_CONFIG = {
+      'name': 'tws_gateway_server',
+      'bootstrap_host': 'localhost',
+      'bootstrap_port': 9092,
+      'redis_host': 'localhost',
+      'redis_port': 6379,
+      'redis_db': 0,
+      'group_id': 'TWS_GW',
+      'session_timeout_ms': 10000,
+      'clear_offsets':  False,
+      'order_transmit': False
+      }
+               
     
     def __init__(self, kwargs):
         
 
-        '''
-         kwargs 
-        
-         'name'
-         'bootstrap_host'
-         'bootstrap_port'
-         'redis_host'
-         'redis_port'
-         'redis_db'
-         'group_id'
-         'session_timeout_ms'
-         'topics'
-         'clear_offsets'
-         'order_transmit
-         
-        '''
-        
+             
         super(TWS_gateway, self).__init__()
-        self.kwargs = copy.copy(kwargs)
+        self.kwargs = copy.copy(TWS_gateway.TWS_GW_DEFAULT_CONFIG)
+        for key in self.kwargs:
+            if key in kwargs:
+                self.kwargs[key] = kwargs.pop(key)        
+        self.kwargs.update(kwargs)        
+        
+        # convert some config string values to object 
+        self.kwargs['topics'] = list(eval(self.kwargs['topics']))
         self.ib_order_transmit = self.kwargs['order_transmit']
         
-        logging.info('starting up TWS_gateway...')
-        logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit == True else 'False'))
 
 
         '''
             TWS_gateway start up sequence
             
             1. establish redis connection
-            2. initialize prosumer instance
+            2. initialize prosumer instance - gateway message handler
             3. establish TWS gateway connectivity
             
             4. initialize listeners: ClientRequestHandler and SubscriptionManager
             5. start the prosumer 
         
         '''
+
+        logging.info('starting up TWS_gateway...')
+        logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit == True else 'False'))
         
         
         logging.info('establishing redis connection...')
-        self.initialize_redis(config)
+        self.initialize_redis()
         
+        logging.info('starting up gateway message handler - kafka Prosumer...')        
+        self.gw_message_handler = Prosumer(name='tws_gw_prosumer', kwargs=self.kwargs)
+
         
-        logging.info('starting up TWS_event_handler...')
-        
+        logging.info('starting up TWS_event_handler...')        
         self.tws_event_handler = TWS_event_handler(self.gw_message_handler)
+        
         logging.info('starting up IB EClientSocket...')
         self.tws_connection = EClientSocket(self.tws_event_handler)
+
+
         
         
         logging.info('establishing TWS gateway connectivity...')
         if not self.eConnect():
-            logging.error('TWS_gateway: unable to establish connection to IB %s:%d' % (self.host, self.port))
+            logging.error('TWS_gateway: unable to establish connection to IB %s:%d' % 
+                          (self.kwargs['tws_host'], self.kwargs['tws_api_port']))
             sys.exit(-1)
         else:
             # start heart beat monitor
-            logging.info('starting up IB heart beat monitor...')
-            self.tlock = Lock()
-            self.ibh = IbHeartBeat(config)
-            self.ibh.register_listener([self.on_ib_conn_broken])
-            self.ibh.run()  
+            pass
+#             logging.info('starting up IB heart beat monitor...')
+#             self.tlock = Lock()
+#             self.ibh = IbHeartBeat(config)
+#             self.ibh.register_listener([self.on_ib_conn_broken])
+#             self.ibh.run()  
 
 
-        logging.info('starting up gateway message handler - kafka Prosumer...')
-        client_requests = list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods)
-        self.gw_message_handler = Prosumer(name='tws_gw_prosumer', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
-                                        'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
-                                        'group_id': 'groupA', 'session_timeout_ms':10000,
-                                                 'topics': client_requests, 'clear_offsets' : 0})
 
         logging.info('instantiating listeners...cli_req_handler')        
         self.cli_req_handler = ClientRequestHandler('client_request_handler', self.gw_message_handler)
@@ -342,7 +350,7 @@ class TWS_gateway(threading.Thread):
         self.contract_subscription_mgr.register_persistence_callback(self.persist_subscriptions)
         
         
-        key = self.config.get("tws_gateway",  "subscription_manager.subscriptions.redis_key").strip('"').strip("'")
+        key = self.kwargs["subscription_manager.subscriptions.redis_key"]
         if self.rs.get(key):
             #contracts = map(lambda x: ContractHelper.kvstring2contract(x), json.loads(self.rs.get(key)))
             
@@ -365,7 +373,7 @@ class TWS_gateway(threading.Thread):
 
     def persist_subscriptions(self, contracts):
          
-        key = self.config.get("tws_gateway",  "subscription_manager.subscriptions.redis_key").strip('"').strip("'")
+        key = self.kwargs["subscription_manager.subscriptions.redis_key"]
         #cs = json.dumps(map(lambda x: ContractHelper.contract2kvstring(x) if x <> None else None, contracts))
         cs = json.dumps(map(lambda x: ContractHelper.object2kvstring(x) if x <> None else None, contracts))
         logging.debug('Tws_gateway: updating subscription table to redis store %s' % cs)
@@ -385,8 +393,9 @@ class TWS_gateway(threading.Thread):
             
 
     def eConnect(self):
-        logging.info('ClientRequestHandler - eConnect. Connecting to %s:%s App Id: %s' % (self.host, self.port, self.clientId))
-        self.tws_connection.eConnect(self.host, self.port, self.clientId)
+        logging.info('TWS_gateway - eConnect. Connecting to %s:%s App Id: %s' % 
+                     (self.kwargs['tws_host'], self.kwargs['tws_api_port'], self.kwargs['name']))
+        self.tws_connection.eConnect(self.kwargs['tws_host'], self.kwargs['tws_api_port'], self.kwargs['name'])
         return self.tws_connection.isConnected()
 
     def eDisconnect(self, value=None):
@@ -568,16 +577,15 @@ class ClientRequestHandler(BaseMessageListener):
         
 class SubscriptionManager(BaseMessageListener):
     
-    # array list of contracts
-    handle = []
-    # contract key map to contract ID (index of the handle array)
-    tickerId = {}
     
     persist_f = None
     
     def __init__(self, name, tws_gateway):
         BaseMessageListener.__init__(self, name, tws_gateway)
-        self.tws_connect = tws_gateway.tws_connection    
+        self.tws_connect = tws_gateway.tws_connection
+        self.handle = []    
+        # contract key map to contract ID (index of the handle array)
+        self.tickerId = {}
    
         
         
@@ -744,9 +752,21 @@ def test_subscription():
     print s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c)))
     
     
-
-
+class ConfigMap():
     
+    def kwargs_from_file(self, path):
+        cfg = ConfigParser.ConfigParser()            
+        if len(cfg.read(path)) == 0: 
+            raise ValueError, "Failed to open config file [%s]" % path 
+
+        kwargs = {}
+        for section in cfg.sections():
+            optval_list = map(lambda o: (o, cfg.get(section, o)), cfg.options(section)) 
+            for ov in optval_list:
+                kwargs[ov[0]] = ov[1]
+        
+        return kwargs
+        
     
 if __name__ == '__main__':
     
@@ -755,32 +775,15 @@ if __name__ == '__main__':
         exit(-1)    
 
     cfg_path= sys.argv[1:]
-    config = ConfigParser.SafeConfigParser()
-    if len(config.read(cfg_path)) == 0: 
-        raise ValueError, "Failed to open config file" 
-    
+    kwargs = ConfigMap().kwargs_from_file(cfg_path)
    
       
-    logconfig = eval(config.get("tws_gateway", "tws_gateway.logconfig").strip('"').strip("'"))
+    logconfig = eval(kwargs['logconfig'])
     logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
     logging.basicConfig(**logconfig)        
     
-
-    
-    
-    khost = config.get("epc", "kafka.host").strip('"').strip("'")
-    kport = config.get("epc", "kafka.port")
-    ihost = config.get("market", "ib.gateway").strip('"').strip("'")
-    iport = int(config.get("market", "ib.port"))
-    iappid = int(config.get("market", "ib.appid.portfolio"))   
-    
-    
-    
-    
-    #print 'give kafka server some time to register the topics...'
-    #sleep(2)
     
-    app = TWS_gateway(ihost, iport, iappid, khost, kport, config)
+    app = TWS_gateway(kwargs)
     app.start()
      
     print 'TWS_gateway started.'

+ 7 - 6
src/comms/test/base_messaging.py

@@ -196,7 +196,7 @@ class BaseConsumer(threading.Thread, Publisher):
     def run(self):
         print '%s:%s started' % (self.kwargs['group_id'], self.name)
         
-        if self.kwargs['clear_offsets'] == 1:
+        if self.kwargs['clear_offsets'] == True:
             self.clear_offsets()
         
         consumer = KafkaConsumer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']),
@@ -365,8 +365,6 @@ class Prosumer(BaseProducer):
     
     
     def __init__(self, name, kwargs=None):
-        BaseProducer.__init__(self, group=None, target=None, name=name,
-                 args=(), kwargs=kwargs, verbose=None)
         
         
         self.kwargs = copy.copy(self.PROSUMER_DEFAULT_CONFIG)
@@ -374,10 +372,13 @@ class Prosumer(BaseProducer):
             if key in kwargs:
                 self.kwargs[key] = kwargs.pop(key)        
         self.kwargs.update(kwargs)
-        
+
         logging.info('\nProsumer:init: **** Configurations dump ***')
         logging.info('\n'.join('%s:%s' % (k.ljust(40), self.kwargs[k]) for k in sorted(self.kwargs)))
 
+        BaseProducer.__init__(self, group=None, target=None, name=name,
+                 args=(), kwargs=self.kwargs, verbose=None)
+
         
         self.kconsumer = BaseConsumer(name=name,  kwargs=self.kwargs)
         
@@ -457,7 +458,7 @@ def test_prosumer2(mode):
         pA = Prosumer(name='A', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                         'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                         'group_id': 'groupA', 'session_timeout_ms':10000,
-                                                 'topics': topicsA, 'clear_offsets' : 0})
+                                                 'topics': topicsA, 'clear_offsets' : False})
         sA = SubscriptionListener('earA', pA)
         
         pA.add_listeners([sA])
@@ -488,7 +489,7 @@ def test_prosumer2(mode):
         pB = Prosumer(name='B', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                         'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                         'group_id': 'groupB', 'session_timeout_ms':10000,
-                                                 'topics': topicsB, 'clear_offsets' : 0})
+                                                 'topics': topicsB, 'clear_offsets' : False})
         sB = SubscriptionListener('earB', pB)
         pB.add_listeners([sB])
         pB.start_prosumer()

+ 24 - 7
src/config/tws_gateway.cfg

@@ -1,14 +1,31 @@
 [tws_gateway]
-name: 'tws_gateway_server'
-bootstrap_host: 'localhost'
+name: tws_gateway_server
+#
+# kafka settings
+#
+bootstrap_host: localhost
 bootstrap_port: 9092
-redis_host: 'localhost'
+#
+# redis persistence
+#
+redis_host: localhost
 redis_port: 6379
 redis_db: 0
-group_id: 'TWS_GW'
+#
+# TWS gateway settings
+#
+# 7496 - production larry046, 7496 - development,  8496 production mchan927
+#
+tws_host: localhost
+tws_api_port: 7496 
+#
+#
+#
+group_id: TWS_GW
 session_timeout_ms: 10000
-topics: "'reqAccountUpdates', 'reqOpenOrders', 'reqExecutions', 'reqIds', 'reqNewsBulletins', 'cancelNewsBulletins', 'setServerLogLevel', 'reqAccountSummary', 'reqPositions', 'reqAutoOpenOrders', 'reqAllOpenOrders', 'reqManagedAccts', 'requestFA', 'reqMktData', 'reqHistoricalData', 'placeOrder', 'gw_req_subscriptions'"
+topics: 'reqAccountUpdates', 'reqOpenOrders', 'reqExecutions', 'reqIds', 'reqNewsBulletins', 'cancelNewsBulletins', 'setServerLogLevel', 'reqAccountSummary', 'reqPositions', 'reqAutoOpenOrders', 'reqAllOpenOrders', 'reqManagedAccts', 'requestFA', 'reqMktData', 'reqHistoricalData', 'placeOrder', 'gw_req_subscriptions'
 clear_offsets: False
-subscription_manager.subscriptions.redis_key: 'subscriptions'  
-logconfig: "{'filename': '/home/larry-13.04/workspace/finopt/log/tws_gateway.log', 'filemode': 'w','level': logging.INFO}"
+subscription_manager.subscriptions.redis_key: subscriptions  
+#logconfig: {'filename': '/home/larry-13.04/workspace/finopt/log/tws_gateway.log', 'filemode': 'w','level': logging.INFO}
+logconfig: {'level': logging.INFO}
 order_transmit: False