esurfer 9 éve
szülő
commit
7238d190a4
1 módosított fájl, 87 hozzáadás és 120 törlés
  1. 87 120
      src/comms/kf_tws_gateway.py

+ 87 - 120
src/comms/kf_tws_gateway.py

@@ -2,6 +2,7 @@
 # -*- coding: utf-8 -*-
 
 import sys
+import copy
 from time import sleep, strftime
 import time, datetime
 import ConfigParser
@@ -51,36 +52,12 @@ class TWS_event_handler(EWrapper):
         self.producer = producer
  
  
+
  
-    def serialize_vars_to_dict(self, message, mapping, source='IB'):
-        def create_kmessage(items):
-            d = {}
-            for k,v in items:
-                #print k, v, type(v)
-                #if type(v) in [Contract, Execution, ExecutionFilter, OrderState, Order, CommissionReport]:
-                if 'ib.ext.' in str(type(v)):     
-                    d[k] = v.__dict__
-                else:
-                    d[k] = v
-    
-               
-            d['ts'] = time.time()
-            d['typeName'] = message
-            d['source'] = source
-            return d
-        
-        
-        try:
-            del(mapping['self'])
-        except (KeyError, ):
-            pass
-        items = list(mapping.items())
-        return create_kmessage(items)
- 
-    def broadcast_event(self, message, mapping, source='IB'):
+    def broadcast_event(self, message, mapping):
 
         try:
-            dict = self.tick_process_message(message, mapping, source)     
+            dict = self.tick_process_message(message, mapping)     
             if message == 'gw_subscriptions' or message == 'gw_subscription_changed':   
                 logging.info('TWS_event_handler: broadcast event: %s [%s]' % (dict['typeName'], dict))
             self.producer.send(message, self.producer.message_dumps(dict))    
@@ -92,7 +69,7 @@ class TWS_event_handler(EWrapper):
             
 
     
-    def tick_process_message(self, message_name, items, source):
+    def tick_process_message(self, message_name, items):
         
         t = {}
         t = items.copy()
@@ -116,10 +93,6 @@ class TWS_event_handler(EWrapper):
                 t[k] = v
         
                
-        t['ts'] = time.time()
-        t['typeName'] = message_name
-        t['source'] = source
-            
         
         return t  
             
@@ -139,12 +112,10 @@ class TWS_event_handler(EWrapper):
         pass
 
     def tickGeneric(self, tickerId, tickType, value):
-        #self.broadcast_event('tickGeneric', vars())
-        self.broadcast_event('tickGeneric', vars()) #vars())
+        self.broadcast_event('tickGeneric', vars()) 
 
     def tickString(self, tickerId, tickType, value):
-        #self.broadcast_event('tickString', vars())
-        self.broadcast_event('tickString', vars()) #vars())
+        self.broadcast_event('tickString', vars()) 
 
     def tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
         self.broadcast_event('tickEFP', vars())
@@ -274,22 +245,6 @@ class TWS_event_handler(EWrapper):
 
 class TWS_gateway(threading.Thread):
 
-    # config
-    config = None
-    # redis connection
-    rs = None
-
-    
-    # channel clients' requests to IB/TWS
-    gw_message_handler = None
-
-    # manage conID / contracts mapping
-    contract_subscription_mgr = None  
-    
-    connection = None
-    
-    # handler to process incoming IB/TWS messages and echo back to clients  
-    tws_event_handler = None
     
     # monitor IB connection / heart beat
     ibh = None
@@ -298,50 +253,59 @@ class TWS_gateway(threading.Thread):
     ib_order_transmit = False
     
     
-    def __init__(self, host, port, clientId, kafka_host, kafka_port, config):
+    def __init__(self, kwargs):
+        
+
+        '''
+         kwargs 
+        
+         'name'
+         'bootstrap_host'
+         'bootstrap_port'
+         'redis_host'
+         'redis_port'
+         'redis_db'
+         'group_id'
+         'session_timeout_ms'
+         'topics'
+         'clear_offsets'
+         'order_transmit
+         
+        '''
+        
         super(TWS_gateway, self).__init__()
-        self.config = config
-        self.host = host
-        self.port = port
-        self.clientId = clientId
-        self.ib_order_transmit = config.get("tws_gateway", "tws_gateway.order_transmit").strip('"').strip("'") if \
-                                        config.get("tws_gateway", "tws_gateway.order_transmit").strip('"').strip("'") <> None\
-                                        else False
+        self.kwargs = copy.copy(kwargs)
+        self.ib_order_transmit = self.kwargs['order_transmit']
         
         logging.info('starting up TWS_gateway...')
         logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit == True else 'False'))
 
-        logging.info('connecting to Redis server...')
-        self.initialize_redis(config)
+
+        '''
+            TWS_gateway start up sequence
+            
+            1. establish redis connection
+            2. initialize prosumer instance
+            3. establish TWS gateway connectivity
+            
+            4. initialize listeners: ClientRequestHandler and SubscriptionManager
+            5. start the prosumer 
         
+        '''
         
-        logging.info('starting up gateway message handler - kafka Prosumer...')
-        client_requests = list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods)
-        self.gw_message_handler = Prosumer(name='tws_gw_prosumer', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
-                                        'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
-                                        'group_id': 'groupA', 'session_timeout_ms':10000,
-                                                 'topics': client_requests, 'clear_offsets' : 0})
         
-        logging.info('register listeners for client requests')
+        logging.info('establishing redis connection...')
+        self.initialize_redis(config)
+        
         
         logging.info('starting up TWS_event_handler...')
         
         self.tws_event_handler = TWS_event_handler(self.gw_message_handler)
         logging.info('starting up IB EClientSocket...')
-        self.connection = EClientSocket(self.tws_event_handler)
-        
+        self.tws_connection = EClientSocket(self.tws_event_handler)
         
-
         
-
-
-        
-        
-
-
-         
-        
-
+        logging.info('establishing TWS gateway connectivity...')
         if not self.eConnect():
             logging.error('TWS_gateway: unable to establish connection to IB %s:%d' % (self.host, self.port))
             sys.exit(-1)
@@ -354,15 +318,30 @@ class TWS_gateway(threading.Thread):
             self.ibh.run()  
 
 
-        logging.info('starting up subscription manager...')
+        logging.info('starting up gateway message handler - kafka Prosumer...')
+        client_requests = list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods)
+        self.gw_message_handler = Prosumer(name='tws_gw_prosumer', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
+                                        'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
+                                        'group_id': 'groupA', 'session_timeout_ms':10000,
+                                                 'topics': client_requests, 'clear_offsets' : 0})
+
+        logging.info('instantiating listeners...cli_req_handler')        
+        self.cli_req_handler = ClientRequestHandler('client_request_handler', self.gw_message_handler)
+        logging.info('instantiating listeners subscription manager...')
         self.initialize_subscription_mgr()
+        logging.info('registering messages to listen...')
+        self.gw_message_handler.add_listeners([self.cli_req_handler])
+        self.gw_message_handler.add_listener_topics(self.contract_subscription_mgr, 'reqMktData')
 
+        logging.info('Completed initialization sequence.')
 
 
     def initialize_subscription_mgr(self):
         
-        self.contract_subscription_mgr = SubscriptionManager(self)
+        self.contract_subscription_mgr = SubscriptionManager(self, self.gw_message_handler)
         self.contract_subscription_mgr.register_persistence_callback(self.persist_subscriptions)
+        
+        
         key = self.config.get("tws_gateway",  "subscription_manager.subscriptions.redis_key").strip('"').strip("'")
         if self.rs.get(key):
             #contracts = map(lambda x: ContractHelper.kvstring2contract(x), json.loads(self.rs.get(key)))
@@ -393,22 +372,26 @@ class TWS_gateway(threading.Thread):
         self.rs.set(key, cs)
 
 
-    def initialize_redis(self, config):
-        r_host = config.get("redis", "redis.server").strip('"').strip("'")
-        r_port = config.get("redis", "redis.port")
-        r_db = config.get("redis", "redis.db")     
+    def initialize_redis(self):
 
-        self.rs = redis.Redis(r_host, r_port, r_db)
+        self.rs = redis.Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
         try:
             self.rs.client_list()
         except redis.ConnectionError:
-            logging.error('TWS_gateway: unable to connect to redis server using these settings: %s port:%d db:%d' % (r_host, r_port, r_db))
+            logging.error('TWS_gateway: unable to connect to redis server using these settings: %s port:%d db:%d' % 
+                          (self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db']))
             logging.error('aborting...')
             sys.exit(-1)
             
 
+    def eConnect(self):
+        logging.info('ClientRequestHandler - eConnect. Connecting to %s:%s App Id: %s' % (self.host, self.port, self.clientId))
+        self.tws_connection.eConnect(self.host, self.port, self.clientId)
+        return self.tws_connection.isConnected()
 
-
+    def eDisconnect(self, value=None):
+        sleep(2)
+        self.tws_connection.eDisconnect()
 
     def run(self):
 
@@ -438,7 +421,7 @@ class TWS_gateway(threading.Thread):
             
             self.eDisconnect()
             self.eConnect()
-            while not self.connection.isConnected():
+            while not self.tws_connection.isConnected():
                 logging.error('TWS_gateway: attempt to reconnect...')
                 self.eConnect()
                 sleep(2)
@@ -457,12 +440,9 @@ class ClientRequestHandler(BaseMessageListener):
     
     def __init__(self, name, tws_gateway):
         BaseMessageListener.__init__(self, name, tws_gateway)
-        self.tws_connect = tws_gateway.connection
+        self.tws_connect = tws_gateway.tws_connection
             
-    def eConnect(self):
-        logging.info('ClientRequestHandler - eConnect. Connecting to %s:%s App Id: %s' % (self.host, self.port, self.clientId))
-        self.tws_connect.eConnect(self.host, self.port, self.clientId)
-        return self.tws_connect.isConnected()
+
     
     
     def reqAccountUpdates(self, value=None):
@@ -567,13 +547,12 @@ class ClientRequestHandler(BaseMessageListener):
         self.tws_connect.placeOrder(vals[0], ContractHelper.kvstring2object(vals[1], Contract), OrderHelper.kvstring2object(vals[2], Order))
     #        self.connection.placeOrder(orderId, contract, newOptOrder)
         
-    def eDisconnect(self, value=None):
-        sleep(2)
-        self.tws_connect.eDisconnect()
+
     
     
-    ####################################################################3
-    #   Gateway commands
+    """
+       Client requests to TWS_gateway
+    """
     def gw_req_subscriptions(self, value=None):
         
         #subm = map(lambda i: ContractHelper.contract2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
@@ -582,27 +561,13 @@ class ClientRequestHandler(BaseMessageListener):
         
         print subm
         if subm:
-            self.tws_event_handler.broadcast_event('gw_subscriptions',  {'subscriptions': subm}, source='GW')
+            self.producer.send_message('gw_subscriptions', self.produce.message_dumps({'subscriptions': subm}))
             
             
-       
-    #####################################################################
-    #    
-    #    broadcast gateway notifications  
-    def gw_notify_subscription_changed(self, value): 
-        #
-        # this function is triggered by SubscriptionManager
-        # value param:
-        #
-        #     {id: contractkv_str}
-        #
-        logging.info("TWS_gateway:gw_notify_subscription_changed: %s" % value)
-        self.tws_event_handler.broadcast_event('gw_subscription_changed',  value, source='GW')
     
         
 class SubscriptionManager(BaseMessageListener):
     
-    parent = None
     # array list of contracts
     handle = []
     # contract key map to contract ID (index of the handle array)
@@ -612,8 +577,10 @@ class SubscriptionManager(BaseMessageListener):
     
     def __init__(self, name, tws_gateway):
         BaseMessageListener.__init__(self, name, tws_gateway)
-        self.tws_connect = tws_gateway.connection    
+        self.tws_connect = tws_gateway.tws_connection    
    
+        
+        
     def load_subscription(self, contracts):
         for c in contracts:
             self.reqMktData(c)
@@ -687,8 +654,8 @@ class SubscriptionManager(BaseMessageListener):
         #
         # instruct gateway to broadcast new id has been assigned to a new contract
         #
-        
-        >>>self.parent.gw_notify_subscription_changed({id: ContractHelper.object2kvstring(contract)})
+        self.producer.send_message('gw_notify_subscription_changed', self.producer.message_dumps({id: ContractHelper.object2kvstring(contract)}))
+        #>>>self.parent.gw_notify_subscription_changed({id: ContractHelper.object2kvstring(contract)})
         logging.info('SubscriptionManager reqMktData: gw_notify_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         
         
@@ -824,4 +791,4 @@ if __name__ == '__main__':
 
 
 #    test_subscription()
-    
+