Ver código fonte

Merge branch 'ironfly' of https://github.com/laxaurus/finopt.git into ironfly

bobhk 9 anos atrás
pai
commit
b1c4ea3810
1 arquivos alterados com 129 adições e 179 exclusões
  1. 129 179
      src/comms/tws_gateway.py

+ 129 - 179
src/comms/tws_gateway.py

@@ -1,9 +1,6 @@
 #!/usr/bin/env python
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 # -*- coding: utf-8 -*-
 
 
-
-
-
 import sys
 import sys
 from time import sleep, strftime
 from time import sleep, strftime
 import time, datetime
 import time, datetime
@@ -28,12 +25,11 @@ from kafka import KafkaConsumer
 from kafka.errors import KafkaError
 from kafka.errors import KafkaError
 
 
 
 
-#from kafka.client import KafkaClient
-#from kafka.producer import SimpleProducer
-#from kafka.common import LeaderNotAvailableError
-
 from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
 from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
 from comms.ib_heartbeat import IbHeartBeat
 from comms.ib_heartbeat import IbHeartBeat
+
+from comms.test.base_messaging import Prosumer, BaseMessageListener
+from misc2.observer import Subscriber, Publisher
 from tws_protocol_helper import TWS_Protocol 
 from tws_protocol_helper import TWS_Protocol 
 
 
 import redis
 import redis
@@ -44,66 +40,34 @@ import redis
 class TWS_event_handler(EWrapper):
 class TWS_event_handler(EWrapper):
 
 
     TICKER_GAP = 1000
     TICKER_GAP = 1000
-    producer = None
     
     
-    def __init__(self, host, port):
+    
+    def __init__(self, tws_gateway):
         
         
-        #client = KafkaClient()#{'bootstrap_servers': '%s:%s' % (host, port)})
-        #self.producer = SimpleProducer(client, async=False)
-        self.producer = KafkaProducer(bootstrap_servers='%s:%s' % (host, port))    
-        logging.info('TWS_event_handler: __init__ Creating kafka client producer at %s:%s' % (host, port))
+        # reference to the parent TWS_gateway
+        self.producer = tws_gateway
  
  
  
  
-    def serialize_vars_to_dict(self, message, mapping, source='IB'):
-        def create_kmessage(items):
-            d = {}
-            for k,v in items:
-                #print k, v, type(v)
-                #if type(v) in [Contract, Execution, ExecutionFilter, OrderState, Order, CommissionReport]:
-                if 'ib.ext.' in str(type(v)):     
-                    d[k] = v.__dict__
-                else:
-                    d[k] = v
-    
-               
-            d['ts'] = time.time()
-            d['typeName'] = message
-            d['source'] = source
-            return d
-        
-        
-        try:
-            del(mapping['self'])
-        except (KeyError, ):
-            pass
-        items = list(mapping.items())
-        return create_kmessage(items)
+
  
  
-    def broadcast_event(self, message, mapping, source='IB'):
+    def broadcast_event(self, message, mapping): #, source='IB'):
 
 
         try:
         try:
-            dict = self.serialize_vars_to_dict(message, mapping, source)     
+            dict = self.tick_process_message(message, mapping) #, source)     
             if message == 'gw_subscriptions' or message == 'gw_subscription_changed':   
             if message == 'gw_subscriptions' or message == 'gw_subscription_changed':   
                 logging.info('TWS_event_handler: broadcast event: %s [%s]' % (dict['typeName'], dict))
                 logging.info('TWS_event_handler: broadcast event: %s [%s]' % (dict['typeName'], dict))
-            self.producer.send(message, json.dumps(dict))    
+            self.producer.send(message, self.producer.message_dumps(dict))    
         except:
         except:
             logging.error('broadcast_event: exception while encoding IB event to client:  [%s]' % message)
             logging.error('broadcast_event: exception while encoding IB event to client:  [%s]' % message)
             logging.error(traceback.format_exc())
             logging.error(traceback.format_exc())
             
             
-            #
-            # try to broadcast the message a 2nd time
-            # no catch if fails again
-            if message == 'gw_subscriptions':   
-                sleep(2)
-                logging.info('TWS_event_handler: Retry once broadcasting gw_subscription %s [%s]' % (dict['typeName'], dict))
-                self.producer.send(message, json.dumps(dict))    
             
             
             
             
 
 
     
     
-    def tick_process_message(self, items):
+    def tick_process_message(self, message_name, items):  #, source):
+        
         
         
-        t = {}
         t = items.copy()
         t = items.copy()
         # if the tickerId is in the snapshot range
         # if the tickerId is in the snapshot range
         # deduct the gap to derive the original tickerId
         # deduct the gap to derive the original tickerId
@@ -115,17 +79,32 @@ class TWS_event_handler(EWrapper):
         except (KeyError, ):
         except (KeyError, ):
             pass          
             pass          
         
         
+
+        for k,v in t.iteritems():
+                #print k, v, type(v)
+                #if type(v) in [Contract, Execution, ExecutionFilter, OrderState, Order, CommissionReport]:
+            if 'ib.ext.' in str(type(v)):     
+                t[k] = v.__dict__
+            else:
+                t[k] = v
+        
+               
+#         t['ts'] = time.time()
+#         t['typeName'] = message_name
+#         t['source'] = source
+            
+        
         return t  
         return t  
             
             
                 
                 
     
     
     def tickPrice(self, tickerId, field, price, canAutoExecute):
     def tickPrice(self, tickerId, field, price, canAutoExecute):
         
         
-        self.broadcast_event('tickPrice', self.tick_process_message(vars()))
+        self.broadcast_event('tickPrice', vars())
 
 
     def tickSize(self, tickerId, field, size):
     def tickSize(self, tickerId, field, size):
         
         
-        self.broadcast_event('tickSize', self.tick_process_message(vars())) #vars())
+        self.broadcast_event('tickSize', vars()) #vars())
 
 
     def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
     def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
         
         
@@ -134,11 +113,11 @@ class TWS_event_handler(EWrapper):
 
 
     def tickGeneric(self, tickerId, tickType, value):
     def tickGeneric(self, tickerId, tickType, value):
         #self.broadcast_event('tickGeneric', vars())
         #self.broadcast_event('tickGeneric', vars())
-        self.broadcast_event('tickGeneric', self.tick_process_message(vars())) #vars())
+        self.broadcast_event('tickGeneric', vars()) #vars())
 
 
     def tickString(self, tickerId, tickType, value):
     def tickString(self, tickerId, tickType, value):
         #self.broadcast_event('tickString', vars())
         #self.broadcast_event('tickString', vars())
-        self.broadcast_event('tickString', self.tick_process_message(vars())) #vars())
+        self.broadcast_event('tickString', vars()) #vars())
 
 
     def tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
     def tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
         self.broadcast_event('tickEFP', vars())
         self.broadcast_event('tickEFP', vars())
@@ -266,7 +245,7 @@ class TWS_event_handler(EWrapper):
 
 
 
 
 
 
-class TWS_gateway(threading.Thread):
+class TWS_gateway():
 
 
     # config
     # config
     config = None
     config = None
@@ -275,7 +254,7 @@ class TWS_gateway(threading.Thread):
 
 
     
     
     # channel clients' requests to IB/TWS
     # channel clients' requests to IB/TWS
-    cli_request_handler = None
+    gw_message_prosumer = None
 
 
     # manage conID / contracts mapping
     # manage conID / contracts mapping
     contract_subscription_mgr = None  
     contract_subscription_mgr = None  
@@ -304,30 +283,34 @@ class TWS_gateway(threading.Thread):
         
         
         logging.info('starting up TWS_gateway...')
         logging.info('starting up TWS_gateway...')
         logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit == True else 'False'))
         logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit == True else 'False'))
-        
+
         logging.info('connecting to Redis server...')
         logging.info('connecting to Redis server...')
         self.initialize_redis(config)
         self.initialize_redis(config)
         
         
+        
+        logging.info('starting up gateway message handler - kafka Prosumer...')
+        client_requests = list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods)
+        self.gw_message_prosumer = Prosumer(name='tws_gw_prosumer', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
+                                        'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
+                                        'group_id': 'groupA', 'session_timeout_ms':10000,
+                                                 'topics': client_requests, 'clear_offsets' : 0})
+        
+        logging.info('register listeners for client requests')
+        
         logging.info('starting up TWS_event_handler...')
         logging.info('starting up TWS_event_handler...')
-        self.tws_event_handler = TWS_event_handler(kafka_host, kafka_port)
         
         
+        self.tws_event_handler = TWS_event_handler(self.gw_message_prosumer)
         logging.info('starting up IB EClientSocket...')
         logging.info('starting up IB EClientSocket...')
         self.connection = EClientSocket(self.tws_event_handler)
         self.connection = EClientSocket(self.tws_event_handler)
         
         
         
         
-        logging.info('starting up client request handler - kafkaConsumer...')
-        self.cli_request_handler = KafkaConsumer( *[v for v in list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods) ], \
-                                   bootstrap_servers=['%s:%s' % (kafka_host, kafka_port)],\
-                                   group_id = 'epc.tws_gateway',\
-                                   enable_auto_commit=True,\
-                                   auto_commit_interval_ms=30 * 1000,\
-                                   auto_offset_reset='latest') # discard old ones
-        
-        #self.reset_message_offset()
-        
 
 
+        
 
 
         
         
+        #
+        #    register listeners
+        #
         
         
 
 
 
 
@@ -350,6 +333,8 @@ class TWS_gateway(threading.Thread):
         self.initialize_subscription_mgr()
         self.initialize_subscription_mgr()
 
 
 
 
+    def start_gateway(self):
+        self.get_gw_message_prosumer().start_prosumer()
 
 
     def initialize_subscription_mgr(self):
     def initialize_subscription_mgr(self):
         
         
@@ -399,40 +384,20 @@ class TWS_gateway(threading.Thread):
             sys.exit(-1)
             sys.exit(-1)
             
             
 
 
-    def reset_message_offset(self):
-        topic_offsets =  map(lambda topic: (topic, self.cli_request_handler.get_partition_offsets(topic, 0, -1, 999)), TWS_Protocol.topicMethods + TWS_Protocol.gatewayMethods)
-        topic_offsets = filter(lambda x: x <> None, map(lambda x: (x[0], x[1][1], x[1][0]) if len(x[1]) > 1 else None, topic_offsets))
-        logging.info('TWS_gateway set topic offset to the latest point\n%s' % (''.join('%s,%s,%s\n' % (x[0], x[1], x[2]) for x in topic_offsets)))
-
-        # the set_topic_partitions method clears out all previous settings when executed
-        # therefore it's not possible to call the function multiple times:
-        # self.consumer.set_topic_partitions(('gw_subscriptions', 0, 114,)
-        # self.consumer.set_topic_partitions(('tickPrice', 0, 27270,))
-        # as the second call will wipe out whatever was done previously
-
-        self.cli_request_handler.set_topic_partitions(*topic_offsets)
-        
-        
-        
-
-
-
-    def run(self):
+    def eConnect(self):
+        logging.info('ClientRequestHandler - eConnect. Connecting to %s:%s App Id: %s' % (self.host, self.port, self.clientId))
+        self.connection.eConnect(self.host, self.port, self.clientId)
+        return self.tws_connect.isConnected()
 
 
+    def eDisconnect(self, value=None):
+        sleep(2)
+        self.connection.eDisconnect()
+    
 
 
-        for message in self.cli_request_handler:
-             
-            logging.info("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
-                                         message.offset, message.key,
-                                         message.value))
+    def get_gw_message_prosumer(self):
+        return self.gw_message_prosumer
+    
  
  
-#             print ("TWS_gateway: received client request %s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
-#                                          message.offset, message.key,
-#                                          message.value))
-             
-
-            getattr(self, message.topic, None)(message.value)
-            #self.cli_request_handler.task_done(message)
 
 
 
 
     def on_ib_conn_broken(self, msg):
     def on_ib_conn_broken(self, msg):
@@ -460,77 +425,74 @@ class TWS_gateway(threading.Thread):
             self.tlock.release()          
             self.tlock.release()          
         
         
 
 
+class ClientRequestHandler(BaseMessageListener):
+    
+    def __init__(self, name, tws_gateway):
+        BaseMessageListener.__init__(self, name, tws_gateway)
+        self.producer = tws_gateway
+        self.tws_connect = tws_gateway.connection
+            
     
     
-    def eConnect(self):
-        logging.info('TWS_gateway - eConnect. Connecting to %s:%s App Id: %s' % (self.host, self.port, self.clientId))
-        self.connection.eConnect(self.host, self.port, self.clientId)
-        return self.connection.isConnected()
-
     
     
     def reqAccountUpdates(self, value=None):
     def reqAccountUpdates(self, value=None):
-        logging.info('TWS_gateway - reqAccountUpdates value=%s' % value)
-        self.connection.reqAccountUpdates(1, '')
-
+        logging.info('ClientRequestHandler - reqAccountUpdates value=%s' % value)
+        self.tws_connect.reqAccountUpdates(1, '')
+    
     def reqAccountSummary(self, value):
     def reqAccountSummary(self, value):
-        logging.info('TWS_gateway - reqAccountSummary value=%s' % value)
+        logging.info('ClientRequestHandler - reqAccountSummary value=%s' % value)
         
         
         vals = map(lambda x: x.encode('ascii') if isinstance(x, unicode) else x, json.loads(value))
         vals = map(lambda x: x.encode('ascii') if isinstance(x, unicode) else x, json.loads(value))
-        self.connection.reqAccountSummary(vals[0], vals[1], vals[2])
+        self.tws_connect.reqAccountSummary(vals[0], vals[1], vals[2])
         
         
     def reqOpenOrders(self, value=None):
     def reqOpenOrders(self, value=None):
-        self.connection.reqOpenOrders()
-
+        self.tws_connect.reqOpenOrders()
+    
     def reqPositions(self, value=None):
     def reqPositions(self, value=None):
-        self.connection.reqPositions()
+        self.tws_connect.reqPositions()
         
         
         
         
     def reqExecutions(self, value):
     def reqExecutions(self, value):
         try:
         try:
             filt = ExecutionFilter() if value == '' else ExecutionFilterHelper.kvstring2object(value, ExecutionFilter)
             filt = ExecutionFilter() if value == '' else ExecutionFilterHelper.kvstring2object(value, ExecutionFilter)
-            self.connection.reqExecutions(0, filt)
+            self.tws_connect.reqExecutions(0, filt)
         except:
         except:
             logging.error(traceback.format_exc())
             logging.error(traceback.format_exc())
-
+    
     
     
     def reqIds(self, value=None):
     def reqIds(self, value=None):
-        self.connection.reqIds(1)
-
+        self.tws_connect.reqIds(1)
+    
     
     
     def reqNewsBulletins(self):
     def reqNewsBulletins(self):
-        self.connection.reqNewsBulletins(1)
-
+        self.tws_connect.reqNewsBulletins(1)
+    
     
     
     def cancelNewsBulletins(self):
     def cancelNewsBulletins(self):
-        self.connection.cancelNewsBulletins()
-
+        self.tws_connect.cancelNewsBulletins()
+    
     
     
     def setServerLogLevel(self):
     def setServerLogLevel(self):
-        self.connection.setServerLogLevel(3)
-
+        self.tws_connect.setServerLogLevel(3)
+    
     
     
     def reqAutoOpenOrders(self):
     def reqAutoOpenOrders(self):
-        self.connection.reqAutoOpenOrders(1)
-
+        self.tws_connect.reqAutoOpenOrders(1)
+    
     
     
     def reqAllOpenOrders(self):
     def reqAllOpenOrders(self):
-        self.connection.reqAllOpenOrders()
-
+        self.tws_connect.reqAllOpenOrders()
+    
     
     
     def reqManagedAccts(self):
     def reqManagedAccts(self):
-        self.connection.reqManagedAccts()
-
+        self.tws_connect.reqManagedAccts()
+    
     
     
     def requestFA(self):
     def requestFA(self):
-        self.connection.requestFA(1)
-
+        self.tws_connect.requestFA(1)
+    
     
     
     def reqMktData(self, sm_contract):
     def reqMktData(self, sm_contract):
-        logging.info('TWS Gateway received reqMktData request: %s' % sm_contract)
-        try:
-            #self.contract_subscription_mgr.reqMktData(ContractHelper.kvstring2contract(sm_contract))
-            self.contract_subscription_mgr.reqMktData(ContractHelper.kvstring2object(sm_contract, Contract))
-        except:
-            pass
+        logging.info('ClientRequestHandler received reqMktData request <no action: pass...> : %s' % sm_contract)
     
     
     def reqHistoricalData(self):
     def reqHistoricalData(self):
         contract = Contract()
         contract = Contract()
@@ -538,7 +500,7 @@ class TWS_gateway(threading.Thread):
         contract.m_secType = 'STK'
         contract.m_secType = 'STK'
         contract.m_exchange = 'SMART'
         contract.m_exchange = 'SMART'
         endtime = strftime('%Y%m%d %H:%M:%S')
         endtime = strftime('%Y%m%d %H:%M:%S')
-        self.connection.reqHistoricalData(
+        self.tws_connect.reqHistoricalData(
             tickerId=1,
             tickerId=1,
             contract=contract,
             contract=contract,
             endDateTime=endtime,
             endDateTime=endtime,
@@ -547,7 +509,7 @@ class TWS_gateway(threading.Thread):
             whatToShow='TRADES',
             whatToShow='TRADES',
             useRTH=0,
             useRTH=0,
             formatDate=1)
             formatDate=1)
-
+    
     
     
     def placeOrder(self, value=None):
     def placeOrder(self, value=None):
         logging.info('TWS_gateway - placeOrder value=%s' % value)
         logging.info('TWS_gateway - placeOrder value=%s' % value)
@@ -557,25 +519,23 @@ class TWS_gateway(threading.Thread):
             logging.error('TWS_gateway - placeOrder Exception %s' % traceback.format_exc())
             logging.error('TWS_gateway - placeOrder Exception %s' % traceback.format_exc())
             return
             return
         
         
-#        c = ContractHelper.kvstring2contract(vals[1])
+    #        c = ContractHelper.kvstring2contract(vals[1])
         o = OrderHelper.kvstring2object(vals[2], Order)
         o = OrderHelper.kvstring2object(vals[2], Order)
         o.__dict__['transmit'] = self.ib_order_transmit
         o.__dict__['transmit'] = self.ib_order_transmit
-#         print c.__dict__
-#         print o.__dict__
-#         print '---------------------'
-
+    #         print c.__dict__
+    #         print o.__dict__
+    #         print '---------------------'
+    
            
            
         #self.connection.placeOrder(vals[0], ContractHelper.kvstring2contract(vals[1]), OrderHelper.kvstring2object(vals[2], Order))
         #self.connection.placeOrder(vals[0], ContractHelper.kvstring2contract(vals[1]), OrderHelper.kvstring2object(vals[2], Order))
-        self.connection.placeOrder(vals[0], ContractHelper.kvstring2object(vals[1], Contract), OrderHelper.kvstring2object(vals[2], Order))
-#        self.connection.placeOrder(orderId, contract, newOptOrder)
+        self.tws_connect.placeOrder(vals[0], ContractHelper.kvstring2object(vals[1], Contract), OrderHelper.kvstring2object(vals[2], Order))
+    #        self.connection.placeOrder(orderId, contract, newOptOrder)
         
         
-    def eDisconnect(self, value=None):
-        sleep(2)
-        self.connection.eDisconnect()
- 
- 
-####################################################################3
-#   Gateway commands
+
+    
+    """
+       Client requests to TWS_gateway
+    """
     def gw_req_subscriptions(self, value=None):
     def gw_req_subscriptions(self, value=None):
         
         
         #subm = map(lambda i: ContractHelper.contract2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
         #subm = map(lambda i: ContractHelper.contract2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
@@ -584,27 +544,15 @@ class TWS_gateway(threading.Thread):
         
         
         print subm
         print subm
         if subm:
         if subm:
-            self.tws_event_handler.broadcast_event('gw_subscriptions',  {'subscriptions': subm}, source='GW')
+            self.producer.send_message('gw_subscriptions', self.produce.message_dumps({'subscriptions': subm}))
             
             
             
             
        
        
-#####################################################################
-#    
-#    broadcast gateway notifications  
-    def gw_notify_subscription_changed(self, value): 
-        #
-        # this function is triggered by SubscriptionManager
-        # value param:
-        #
-        #     {id: contractkv_str}
-        #
-        logging.info("TWS_gateway:gw_notify_subscription_changed: %s" % value)
-        self.tws_event_handler.broadcast_event('gw_subscription_changed',  value, source='GW')
-        
+    
         
         
-class SubscriptionManager():
+class SubscriptionManager(BaseMessageListener):
+    
     
     
-    parent = None
     # array list of contracts
     # array list of contracts
     handle = []
     handle = []
     # contract key map to contract ID (index of the handle array)
     # contract key map to contract ID (index of the handle array)
@@ -612,9 +560,10 @@ class SubscriptionManager():
     
     
     persist_f = None
     persist_f = None
     
     
-    def __init__(self, parent=None):
-        self.parent = parent
-    
+    def __init__(self, name, tws_gateway):
+        BaseMessageListener.__init__(self, name)
+        self.producer = tws_gateway
+        self.tws_connect = tws_gateway.connection    
    
    
     def load_subscription(self, contracts):
     def load_subscription(self, contracts):
         for c in contracts:
         for c in contracts:
@@ -649,12 +598,12 @@ class SubscriptionManager():
 #         print stkContract.__dict__         
 #         print stkContract.__dict__         
 #         self.parent.connection.reqMktData(2, stkContract, '', False)     
 #         self.parent.connection.reqMktData(2, stkContract, '', False)     
 #             
 #             
-
             
             
-    def reqMktData(self, contract):
+    def reqMktData(self, kvs_contract):
                   
                   
         
         
         #logging.info('SubscriptionManager: reqMktData')
         #logging.info('SubscriptionManager: reqMktData')
+        logging.info('SubscriptionManager received reqMktData request: %s' % kvs_contract)
   
   
         def add_subscription(contract):
         def add_subscription(contract):
             self.handle.append(contract)
             self.handle.append(contract)
@@ -663,6 +612,7 @@ class SubscriptionManager():
              
              
             return newId
             return newId
   
   
+        contract = ContractHelper.kvstring2object(kvs_contract, Contract)
         id = self.is_subscribed(contract)
         id = self.is_subscribed(contract)
         if id == -1: # not found
         if id == -1: # not found
             id = add_subscription(contract)
             id = add_subscription(contract)
@@ -671,7 +621,7 @@ class SubscriptionManager():
             # the conId must be set to zero when calling TWS reqMktData
             # the conId must be set to zero when calling TWS reqMktData
             # otherwise TWS will fail to subscribe the contract
             # otherwise TWS will fail to subscribe the contract
             
             
-            self.parent.connection.reqMktData(id, contract, '', False) 
+            self.tws_connect.reqMktData(id, contract, '', False) 
             
             
             
             
                    
                    
@@ -682,14 +632,15 @@ class SubscriptionManager():
             logging.info('SubscriptionManager: reqMktData. Requesting market data, id = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
             logging.info('SubscriptionManager: reqMktData. Requesting market data, id = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         
         
         else:    
         else:    
-            self.parent.connection.reqMktData(1000 + id, contract, '', True)
+            self.tws_connect.reqMktData(1000 + id, contract, '', True)
             logging.info('SubscriptionManager: reqMktData: contract already subscribed. Request snapshot = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
             logging.info('SubscriptionManager: reqMktData: contract already subscribed. Request snapshot = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         #self.dump()
         #self.dump()
 
 
         #
         #
         # instruct gateway to broadcast new id has been assigned to a new contract
         # instruct gateway to broadcast new id has been assigned to a new contract
         #
         #
-        self.parent.gw_notify_subscription_changed({id: ContractHelper.object2kvstring(contract)})
+        self.producer.send_message('gw_notify_subscription_changed', self.producer.message_dumps({id: ContractHelper.object2kvstring(contract)}))
+        #>>>self.parent.gw_notify_subscription_changed({id: ContractHelper.object2kvstring(contract)})
         logging.info('SubscriptionManager reqMktData: gw_notify_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         logging.info('SubscriptionManager reqMktData: gw_notify_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         
         
         
         
@@ -711,7 +662,7 @@ class SubscriptionManager():
     def force_resubscription(self):
     def force_resubscription(self):
         # starting from index 1 of the contract list, call  reqmktdata, and format the result into a list of tuples
         # starting from index 1 of the contract list, call  reqmktdata, and format the result into a list of tuples
         for i in range(1, len(self.handle)):
         for i in range(1, len(self.handle)):
-            self.parent.connection.reqMktData(i, self.handle[i], '', False)
+            self.tws_connect.reqMktData(i, self.handle[i], '', False)
             logging.info('force_resubscription: %s' % ContractHelper.printContract(self.handle[i]))
             logging.info('force_resubscription: %s' % ContractHelper.printContract(self.handle[i]))
        
        
             
             
@@ -815,8 +766,7 @@ if __name__ == '__main__':
     #sleep(2)
     #sleep(2)
     
     
     app = TWS_gateway(ihost, iport, iappid, khost, kport, config)
     app = TWS_gateway(ihost, iport, iappid, khost, kport, config)
-    app.start()
-     
+    app.start_gateway()
     print 'TWS_gateway started.'
     print 'TWS_gateway started.'
 #     
 #     
 
 
@@ -825,4 +775,4 @@ if __name__ == '__main__':
 
 
 
 
 #    test_subscription()
 #    test_subscription()
-    
+