Explorar el Código

daily commits

bobhk hace 9 años
padre
commit
8f07008da9

BIN
src/comms/alert_bot.pyc


BIN
src/comms/epc.pyc


BIN
src/comms/ib_heartbeat.pyc


+ 95 - 0
src/comms/ibc/gw_ex1.py

@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from time import sleep, strftime
+import logging
+import json
+
+from ib.ext.Contract import Contract
+
+from misc2.helpers import ContractHelper
+from comms.ibgw.base_messaging import Prosumer
+from comms.tws_protocol_helper import TWS_Protocol
+from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
+
+         
+class MessageListener(AbstractGatewayListener):   
+    def __init__(self, name, parent):
+        AbstractGatewayListener.__init__(self, name)
+        self.parent = parent
+
+    def position(self, event, message_value):  # account, contract, pos, avgCost):
+        logging.info('MessageListener:%s. val->[%s]' % (event, message_value))
+   
+    def positionEnd(self, event, message_value):
+        logging.info('MessageListener:%s. val->[%s]' % (event, message_value))
+        #self.parent.stop_manager()
+        
+    def error(self, event, message_value):
+        logging.info('MessageListener:%s. val->[%s]' % (event, message_value))  
+
+
+    def gw_subscriptions(self, event, message_value):
+        logging.info('MessageListener:%s. val->[%s]' % (event, message_value))
+        
+
+    def gw_subscription_changed(self, event, message_value):
+        logging.info('MessageListener:%s. val->[%s]' % (event, message_value))
+        
+
+    
+
+
+def test_client(kwargs):
+
+    cm = TWS_client_manager(kwargs)
+    cl = MessageListener('gw_client_message_listener', cm)
+    
+    cm.add_listener_topics(cl, kwargs['topics'])
+    cm.start_manager()
+    cm.reqPositions()
+    cm.gw_req_subscriptions()
+    try:
+        logging.info('TWS_gateway:main_loop ***** accepting console input...')
+        while not cm.is_stopped(): 
+        
+            sleep(.45)
+        
+    except (KeyboardInterrupt, SystemExit):
+        logging.error('TWS_client_manager: caught user interrupt. Shutting down...')
+        cm.gw_message_handler.set_stop()
+        
+        logging.info('TWS_client_manager: Service shut down complete...')
+           
+    print 'end of test_client function'
+      
+if __name__ == '__main__':
+    
+
+    
+    kwargs = {
+      'name': 'tws_example1',
+      'bootstrap_host': 'localhost',
+      'bootstrap_port': 9092,
+      'redis_host': 'localhost',
+      'redis_port': 6379,
+      'redis_db': 0,
+      'tws_host': 'localhost',
+      'tws_api_port': 8496,
+      'tws_app_id': 38868,
+      'group_id': 'TWS_CLI_EX1',
+      'session_timeout_ms': 10000,
+      'clear_offsets':  False,
+      'logconfig': {'level': logging.INFO},
+      'topics': ['position', 'positionEnd', 'gw_subscriptions', 'gw_subscription_changed']
+      }
+
+   
+      
+    logconfig = kwargs['logconfig']
+    logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
+    logging.basicConfig(**logconfig)        
+    
+    
+    test_client(kwargs)
+    
+     

+ 20 - 10
src/comms/ibc/tws_client_lib.py

@@ -73,10 +73,8 @@ class GatewayCommandWrapper():
     
         
 
-    def gw_req_subscriptions(self, event, items):
+    def gw_req_subscriptions(self):
         
-        logging.info("[%s] received gw_req_subscriptions content:[%s]" % (self.name, items))
-        vars= self.producer.message_loads(items['value'])
         self.producer.send_message('gw_req_subscriptions', self.producer.message_dumps(None))
 
 
@@ -140,7 +138,11 @@ class TWS_client_manager(GatewayCommandWrapper):
         
         logging.info('**** Completed initialization sequence. ****')
         
-        
+    def is_stopped(self):
+        return self.gw_message_handler.is_stopped()
+    
+    def stop_manager(self):
+        self.gw_message_handler.set_stop()
 
     def start_manager(self):
         logging.info('start gw_message_handler. Entering processing loop...')
@@ -335,6 +337,9 @@ class AbstractGatewayListener(BaseMessageListener):
     def gw_subscriptions(self, event, message_value):
         raise NotImplementedException        
       
+    def error(self, event, message_value):
+        raise NotImplementedException
+    
     def on_kb_reached_last_offset(self, event, message_value):  # event, items):
         logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, message_value))
         print "on_kb_reached_last_offset [%s] %s" % (self.name, message_value)
@@ -366,21 +371,26 @@ class GatewayMessageListener(AbstractGatewayListener):
         AbstractGatewayListener.__init__(self, name)
              
     def tickPrice(self, event, message_value):  # tickerId, field, price, canAutoExecute):
-        logging.info('GatewayMessageListener:tickPrice. val->[%s]' % message_value)
+        logging.info('GatewayMessageListener:%s. val->[%s]' % (event, message_value))
 
     def tickSize(self, event, message_value):  # tickerId, field, price, canAutoExecute):
-        logging.info('GatewayMessageListener:tickSize. val->[%s]' % message_value)
+        logging.info('GatewayMessageListener:%s. val->[%s]' % (event, message_value))
+        
+    def error(self, event, message_value):
+        logging.info('GatewayMessageListener:%s. val->[%s]' % (event, message_value))  
 
 def test_client(kwargs):
-    contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20170330', 0, '')
-    contract = ContractHelper.makeContract(contractTuple)    
+    contractTuples = [('HSI', 'FUT', 'HKFE', 'HKD', '20170330', 0, ''),
+                      ('USD', 'CASH', 'IDEALPRO', 'JPY', '', 0, ''),]
+                      
+        
     print kwargs 
     cm = TWS_client_manager(kwargs)
     cl = GatewayMessageListener('gw_client_message_listener')
     
     cm.add_listener_topics(cl, kwargs['topics'])
     cm.start_manager()
-    cm.reqMktData(contract)
+    map(lambda c: cm.reqMktData(ContractHelper.makeContract(c)), contractTuples)
     try:
         logging.info('TWS_gateway:main_loop ***** accepting console input...')
         while True: 
@@ -390,7 +400,7 @@ def test_client(kwargs):
     except (KeyboardInterrupt, SystemExit):
         logging.error('TWS_client_manager: caught user interrupt. Shutting down...')
         cm.gw_message_handler.set_stop()
-        cm.join()
+        
         logging.info('TWS_client_manager: Service shut down complete...')
            
         

+ 13 - 3
src/comms/ibgw/base_messaging.py

@@ -90,7 +90,7 @@ class BaseProducer(threading.Thread, Subscriber):
             
             
             logging.info ('******** BaseProducer exit done.')
-            
+            producer.close(1)
                 
         except NoBrokersAvailable:
             logging.error("NoBrokersAvailable: Has kafka started?")
@@ -302,7 +302,7 @@ class BaseConsumer(threading.Thread, Publisher):
                 logging.debug('BaseConsumer:run StopIteration Caught. No new message arriving...')
                 continue
             
-            
+        consumer.close()   
         logging.info ('******** BaseConsumer exit done.')
 
 
@@ -386,7 +386,12 @@ class Prosumer(BaseProducer):
         
     
     def add_listener_topics(self, listener, topics):
-        map(lambda e: self.kconsumer.register(e, listener, getattr(listener, e)), topics)
+        try:
+            map(lambda e: self.kconsumer.register(e, listener, getattr(listener, e)), topics)
+        except AttributeError as e:
+            logging.error("Prosumer:add_listener_topics. Function not implemented in the listener. %s" % e)
+            raise NotImplementedException
+            
         
     def add_listeners(self, listeners):
         
@@ -395,12 +400,17 @@ class Prosumer(BaseProducer):
         
     
 
+    def is_stopped(self):
+        return self.stopped
     
     def set_stop(self):
         BaseProducer.set_stop(self)
         self.kconsumer.set_stop()
+        logging.info('Prosumer:set_stop. Pending kconsumer to shutdown in 2s...')
+        self.stopped = True
     
     def start_prosumer(self):
+        self.stopped = False
         self.kconsumer.start()
         self.start()
         

+ 17 - 28
src/comms/ibgw/client_request_handler.py

@@ -23,24 +23,24 @@ class ClientRequestHandler(BaseMessageListener):
 
     
     
-    def reqAccountUpdates(self, value=None):
+    def reqAccountUpdates(self, event, value=None):
         logging.info('ClientRequestHandler - reqAccountUpdates value=%s' % value)
         self.tws_connect.reqAccountUpdates(1, '')
     
-    def reqAccountSummary(self, value):
+    def reqAccountSummary(self, event, value):
         logging.info('ClientRequestHandler - reqAccountSummary value=%s' % value)
         
         vals = map(lambda x: x.encode('ascii') if isinstance(x, unicode) else x, json.loads(value))
         self.tws_connect.reqAccountSummary(vals[0], vals[1], vals[2])
         
-    def reqOpenOrders(self, value=None):
+    def reqOpenOrders(self, event, value=None):
         self.tws_connect.reqOpenOrders()
     
-    def reqPositions(self, value=None):
+    def reqPositions(self, event, value=None):
         self.tws_connect.reqPositions()
         
         
-    def reqExecutions(self, value):
+    def reqExecutions(self, event, value):
         try:
             filt = ExecutionFilter() if value == '' else ExecutionFilterHelper.kvstring2object(value, ExecutionFilter)
             self.tws_connect.reqExecutions(0, filt)
@@ -48,35 +48,35 @@ class ClientRequestHandler(BaseMessageListener):
             logging.error(traceback.format_exc())
     
     
-    def reqIds(self, value=None):
+    def reqIds(self, event, value=None):
         self.tws_connect.reqIds(1)
     
     
-    def reqNewsBulletins(self):
+    def reqNewsBulletins(self, event):
         self.tws_connect.reqNewsBulletins(1)
     
     
-    def cancelNewsBulletins(self):
+    def cancelNewsBulletins(self, event ):
         self.tws_connect.cancelNewsBulletins()
     
     
-    def setServerLogLevel(self):
+    def setServerLogLevel(self, event):
         self.tws_connect.setServerLogLevel(3)
     
     
-    def reqAutoOpenOrders(self):
+    def reqAutoOpenOrders(self, event):
         self.tws_connect.reqAutoOpenOrders(1)
     
     
-    def reqAllOpenOrders(self):
+    def reqAllOpenOrders(self, event):
         self.tws_connect.reqAllOpenOrders()
     
     
-    def reqManagedAccts(self):
+    def reqManagedAccts(self, event):
         self.tws_connect.reqManagedAccts()
     
     
-    def requestFA(self):
+    def requestFA(self, event):
         self.tws_connect.requestFA(1)
     
     
@@ -88,7 +88,7 @@ class ClientRequestHandler(BaseMessageListener):
         except:
             pass
     
-    def reqHistoricalData(self):
+    def reqHistoricalData(self, event):
         contract = Contract()
         contract.m_symbol = 'QQQQ'
         contract.m_secType = 'STK'
@@ -105,7 +105,7 @@ class ClientRequestHandler(BaseMessageListener):
             formatDate=1)
     
     
-    def placeOrder(self, value=None):
+    def placeOrder(self, event, value=None):
         logging.info('TWS_gateway - placeOrder value=%s' % value)
         try:
             vals = json.loads(value)
@@ -128,17 +128,6 @@ class ClientRequestHandler(BaseMessageListener):
 
     
     
-    """
-       Client requests to TWS_gateway
-    """
-    def gw_req_subscriptions(self, value=None):
-        
-        #subm = map(lambda i: ContractHelper.contract2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
-        #subm = map(lambda i: ContractHelper.object2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
-        subm = map(lambda i: (i, ContractHelper.object2kvstring(self.contract_subscription_mgr.handle[i])), range(len(self.contract_subscription_mgr.handle)))
-        
-        print subm
-        if subm:
-            self.producer.send_message('gw_subscriptions', self.produce.message_dumps({'subscriptions': subm}))
-            
+    def gw_req_subscriptions(self, event, value=None):
+        logging.info('TWS_gateway - gw_req_subscriptions')
             

+ 22 - 30
src/comms/ibgw/subscription_manager.py

@@ -24,7 +24,10 @@ class SubscriptionManager(BaseMessageListener):
         
     def load_subscription(self, contracts):
         for c in contracts:
-            self.reqMktData(c)
+            #print self.tws_connect.isConnected() 
+            print '%s' % (ContractHelper.printContract(c))
+            self.reqMktData('internal', {'value': ContractHelper.contract2kvstring(c)}) 
+            
             
         self.dump()
     
@@ -41,21 +44,6 @@ class SubscriptionManager(BaseMessageListener):
             return self.tickerId[ckey]
     
 
-#     def reqMktDataxx(self, contract):
-#         print '---------------'
-#         contractTuple = ('USO', 'STK', 'SMART', 'USD', '', 0.0, '')
-#         stkContract = self.makeStkContract(contractTuple)     
-#         stkContract.m_includeExpired = False       
-#         self.parent.connection.reqMktData(1, stkContract, '', False)     
-# 
-#         contractTuple = ('IBM', 'STK', 'SMART', 'USD', '', 0.0, '')
-#         stkContract = self.makeStkContract(contractTuple)
-#         stkContract.m_includeExpired = False
-#         print stkContract   
-#         print stkContract.__dict__         
-#         self.parent.connection.reqMktData(2, stkContract, '', False)     
-#             
-
             
     def reqMktData(self, event, message):
                   
@@ -95,24 +83,28 @@ class SubscriptionManager(BaseMessageListener):
         #
         # instruct gateway to broadcast new id has been assigned to a new contract
         #
-        self.producer.send_message('gw_notify_subscription_changed', self.producer.message_dumps({id: ContractHelper.object2kvstring(contract)}))
+        self.producer.send_message('gw_subscription_changed', self.producer.message_dumps({id: ContractHelper.object2kvstring(contract)}))
         #>>>self.parent.gw_notify_subscription_changed({id: ContractHelper.object2kvstring(contract)})
-        logging.info('SubscriptionManager reqMktData: gw_notify_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
+        logging.info('SubscriptionManager reqMktData: gw_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         
+    """
+       Client requests to TWS_gateway
+    """
+    def gw_req_subscriptions(self, event, message):
         
+        #subm = map(lambda i: ContractHelper.contract2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
+        #subm = map(lambda i: ContractHelper.object2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
+        subm = map(lambda i: (i, ContractHelper.object2kvstring(self.handle[i])),
+                    range(len(self.handle)))
         
-#     def makeStkContract(self, contractTuple):
-#         newContract = Contract()
-#         newContract.m_symbol = contractTuple[0]
-#         newContract.m_secType = contractTuple[1]
-#         newContract.m_exchange = contractTuple[2]
-#         newContract.m_currency = contractTuple[3]
-#         newContract.m_expiry = contractTuple[4]
-#         newContract.m_strike = contractTuple[5]
-#         newContract.m_right = contractTuple[6]
-#         print 'Contract Values:%s,%s,%s,%s,%s,%s,%s:' % contractTuple
-#         return newContract        
-   
+        
+        if subm:
+            
+            logging.info('SubscriptionManager:gw_req_subscriptions-------\n%s' % ''.join('\n%s:%s' % (str(v[0]).rjust(6), v[1]) for v in subm))
+            self.producer.send_message('gw_subscriptions', self.producer.message_dumps({'subscriptions': subm}))
+        
+        
+
     # use only after a broken connection is restored
     # to re request market data 
     def force_resubscription(self):

BIN
src/comms/ibgw/tws_event_handler.pyc


+ 6 - 8
src/comms/ibgw/tws_gateway.py

@@ -48,15 +48,13 @@ class TWS_gateway():
         
 
              
-        
+       
+        temp_kwargs = copy.copy(kwargs)
         self.kwargs = copy.copy(TWS_gateway.TWS_GW_DEFAULT_CONFIG)
         for key in self.kwargs:
-            if key in kwargs:
-                self.kwargs[key] = kwargs.pop(key)        
-        self.kwargs.update(kwargs)        
-        
-        
-
+            if key in temp_kwargs:
+                self.kwargs[key] = temp_kwargs.pop(key)        
+        self.kwargs.update(temp_kwargs)    
 
         '''
             TWS_gateway start up sequence
@@ -109,7 +107,7 @@ class TWS_gateway():
         self.initialize_subscription_mgr()
         logging.info('registering messages to listen...')
         self.gw_message_handler.add_listeners([self.cli_req_handler])
-        self.gw_message_handler.add_listener_topics(self.contract_subscription_mgr, ['reqMktData'])
+        self.gw_message_handler.add_listener_topics(self.contract_subscription_mgr, self.kwargs['subscription_manager.topics'])
 
         logging.info('start TWS_event_handler. Start prosumer processing loop...')
         self.gw_message_handler.start_prosumer()

BIN
src/comms/redisQueue.pyc


BIN
src/comms/test/__init__.pyc


BIN
src/comms/tws_client.pyc


BIN
src/comms/tws_gateway.pyc


BIN
src/comms/tws_protocol_helper.pyc


+ 21 - 0
src/config/tws_client_lib.cfg

@@ -0,0 +1,21 @@
+[tws_client_lib]
+name: 'reference_client'
+#
+# kafka settings
+#
+bootstrap_host: 'localhost'
+bootstrap_port: 9092
+#
+# redis persistence
+#
+redis_host: 'localhost'
+redis_port: 6379
+redis_db: 0
+#
+#
+#
+group_id: 'TWS_CLI'
+session_timeout_ms: 10000
+topics:['tickSize', 'tickPrice', 'error']
+logconfig: { 'filemode': 'w', 'filename': '/tmp/tws_client_lib.log',  'level': logging.INFO}
+#logconfig: {'level': logging.INFO}

+ 7 - 4
src/config/tws_gateway.cfg

@@ -17,8 +17,8 @@ redis_db: 0
 # 7496 - production larry046, 7496 - development,  8496 production mchan927
 #
 tws_host: 'localhost'
-tws_api_port: 4001
-tws_app_id: 74001 
+tws_api_port: 7496
+tws_app_id: 74960
 #
 #
 #
@@ -26,7 +26,10 @@ group_id: 'TWS_GW'
 session_timeout_ms: 10000
 topics: ['reqAccountUpdates', 'reqOpenOrders', 'reqExecutions', 'reqIds', 'reqNewsBulletins', 'cancelNewsBulletins', 'setServerLogLevel', 'reqAccountSummary', 'reqPositions', 'reqAutoOpenOrders', 'reqAllOpenOrders', 'reqManagedAccts', 'requestFA', 'reqMktData', 'reqHistoricalData', 'placeOrder', 'gw_req_subscriptions']
 clear_offsets: False
-subscription_manager.subscriptions.redis_key: 'subscriptions'  
+#
+#
+subscription_manager.subscriptions.redis_key: 'subscriptions'
+subscription_manager.topics: ['reqMktData', 'gw_req_subscriptions']
 #logconfig: {'filename': '/home/larry-13.04/workspace/finopt/log/tws_gateway.log', 'filemode': 'w','level': logging.INFO}
-logconfig: {'level': logging.INFO}
+logconfig: {'level': logging.INFO,  'filemode': 'w', 'filename':'/tmp/tws_gateway.log'}
 order_transmit: False

BIN
src/finopt/__init__.pyc


BIN
src/finopt/finopt.pyc


BIN
src/finopt/instrument.pyc


BIN
src/finopt/misc/__init__.pyc


BIN
src/finopt/opt_serve.pyc


BIN
src/finopt/optcal.pyc


BIN
src/finopt/options_analytics.pyc


BIN
src/finopt/options_chain.pyc


BIN
src/finopt/options_data.pyc


BIN
src/finopt/portfolio.pyc


BIN
src/finopt/ystockquote.pyc


BIN
src/misc2/helpers.pyc


+ 1 - 1
src/misc2/observer.py

@@ -37,7 +37,7 @@ class Publisher:
     def dispatch(self, event, params=None):
         
         for subscriber, callback in self.get_subscribers(event).items():
-            print 'observer:: subscriber**** %s' % subscriber
+            #print 'observer:: subscriber**** %s' % subscriber
             callback(event, params)
             
 #############################################################

BIN
src/misc2/observer.pyc


+ 12 - 0
src/sh/start_twscli.sh

@@ -0,0 +1,12 @@
+#!/bin/bash
+
+
+HOST=$(hostname)
+echo $HOST
+if [ $HOST == 'hkc-larryc-vm1' ]; then
+	FINOPT_HOME=~/ironfly-workspace/finopt/src
+else
+	FINOPT_HOME=~/l1304/workspace/finopt-ironfly/finopt/src
+fi
+export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
+python $FINOPT_HOME/comms/ibc/tws_client_lib.py $FINOPT_HOME/config/tws_client_lib.cfg 

+ 9 - 4
src/sh/start_twsgw.sh

@@ -1,7 +1,12 @@
 #!/bin/bash
-#FINOPT_HOME=~/ironfly-workspace/finopt/src
-FINOPT_HOME=~/l1304/workspace/finopt-ironfly/finopt/src
-export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
 
-python $FINOPT_HOME/comms/ibgw/tws_gateway.py $FINOPT_HOME/config/tws_gateway.cfg 
 
+HOST=$(hostname)
+echo $HOST
+if [ $HOST == 'hkc-larryc-vm1' ]; then
+	FINOPT_HOME=~/ironfly-workspace/finopt/src
+else
+	FINOPT_HOME=~/l1304/workspace/finopt-ironfly/finopt/src
+fi
+export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
+python $FINOPT_HOME/comms/ibgw/tws_gateway.py $FINOPT_HOME/config/tws_gateway.cfg