소스 검색

# WARNING: head commit changed in the meantime

Merge branch 'ironfly' of https://github.com/laxaurus/finopt.git into
ironfly

Conflicts:
	src/comms/ibgw/subscription_manager.py
bobhk 9 년 전
부모
커밋
5abf168768
6개의 변경된 파일144개의 추가작업 그리고 22개의 파일을 삭제
  1. 107 0
      src/comms/ibc/gw_ex_request_exit.py
  2. 9 4
      src/comms/ibgw/base_messaging.py
  3. 1 0
      src/config/tws_gateway.cfg
  4. 21 15
      src/rethink/analytics_engine.py
  5. 5 2
      src/rethink/tick_datastore.py
  6. 1 1
      src/sh/start_twsgw.sh

+ 107 - 0
src/comms/ibc/gw_ex_request_exit.py

@@ -0,0 +1,107 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from time import sleep, strftime
+import logging
+import json
+
+from ib.ext.Contract import Contract
+from optparse import OptionParser
+from misc2.helpers import ContractHelper
+from comms.ibgw.base_messaging import Prosumer
+from comms.tws_protocol_helper import TWS_Protocol
+from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
+
+         
+class MessageListener(AbstractGatewayListener):   
+    def __init__(self, name, parent):
+        AbstractGatewayListener.__init__(self, name)
+        self.parent = parent
+
+    def position(self, event, message_value):  # account, contract, pos, avgCost):
+        logging.info('MessageListener:%s. val->[%s]' % (event, message_value))
+   
+    def positionEnd(self, event, message_value):
+        logging.info('MessageListener:%s. val->[%s]' % (event, message_value))
+        #self.parent.stop_manager()
+        
+    def error(self, event, message_value):
+        logging.info('MessageListener:%s. val->[%s]' % (event, message_value))  
+
+
+    def gw_subscriptions(self, event, message_value):
+        logging.info('MessageListener:%s. val->[%s]' % (event, message_value))
+        
+
+    def gw_subscription_changed(self, event, message_value):
+        logging.info('MessageListener:%s. val->[%s]' % (event, message_value))
+        
+
+    
+
+
+def test_client(kwargs):
+
+    cm = TWS_client_manager(kwargs)
+    cl = MessageListener('gw_client_message_listener', cm)
+    
+    cm.add_listener_topics(cl, kwargs['topics'])
+    cm.start_manager()
+    cm.gw_message_handler.send_message('ae_req_tds_internal', '')
+    try:
+        logging.info('TWS_gateway:main_loop ***** accepting console input...')
+        while not cm.is_stopped(): 
+        
+            sleep(.45)
+        
+    except (KeyboardInterrupt, SystemExit):
+        logging.error('TWS_client_manager: caught user interrupt. Shutting down...')
+        cm.gw_message_handler.set_stop()
+        
+        logging.info('TWS_client_manager: Service shut down complete...')
+           
+    print 'end of test_client function'
+      
+if __name__ == '__main__':
+    
+
+    
+    kwargs = {
+      'name': 'simple_request',
+      'bootstrap_host': 'localhost',
+      'bootstrap_port': 9092,
+      'redis_host': 'localhost',
+      'redis_port': 6379,
+      'redis_db': 0,
+      'tws_host': 'localhost',
+      'tws_api_port': 8496,
+      'tws_app_id': 38868,
+      'group_id': 'EX_REQUEST',
+      'session_timeout_ms': 10000,
+      'clear_offsets':  False,
+      'logconfig': {'level': logging.INFO},
+      'topics': ['positionEnd']
+      }
+
+    usage = "usage: %prog [options]"
+    parser = OptionParser(usage=usage)
+    parser.add_option("-c", "--clear_offsets", action="store_true", dest="clear_offsets",
+                      help="delete all redis offsets used by this program")
+    parser.add_option("-g", "--group_id",
+                      action="store", dest="group_id", 
+                      help="assign group_id to this running instance")
+    
+    (options, args) = parser.parse_args()
+    for option, value in options.__dict__.iteritems():
+        if value <> None:
+            kwargs[option] = value
+            
+    #print kwargs    
+      
+    logconfig = kwargs['logconfig']
+    logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
+    logging.basicConfig(**logconfig)        
+    
+    
+    test_client(kwargs)
+    
+     

+ 9 - 4
src/comms/ibgw/base_messaging.py

@@ -17,6 +17,7 @@ from redis import Redis
 
 from misc2.observer import Subscriber, Publisher
 from numpy.distutils.fcompiler import none
+from types import NoneType
 
 
 
@@ -288,15 +289,19 @@ class BaseConsumer(threading.Thread, Publisher):
                                   % (message.topic, message.partition, message.offset, gap, highwater))
                                                                                                 
                     for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
-                        logging.info ("*** On first iteration: T/P Table: topic:[%s] %s" % (t.rjust(25),  
-                                                         ','.join('part:%d, off:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)
-                                                         ))
+                        try:
+                            logging.info ("*** On first iteration: T/P Table: topic:[%s] %s" % (t.rjust(25),  
+                                                             ','.join('part:%d, off:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)
+                                                             ))
+                        except TypeError:
+                            logging.warn ('*** On first iteration: [*** %s not registered in kafka topics yet ***]. This message should go away the next time the program is run.' % t)
+                            continue
                         
                     self.persist_offsets(message.topic, message.partition, message.offset)
                     self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))
                         
                         
-                    if message.topic in self.kwargs['seek_to_end']:
+                    if '*' in self.kwargs['seek_to_end'] or message.topic in self.kwargs['seek_to_end']:
                         
                         # if there is no gap                          
                         if gap == 1:

+ 1 - 0
src/config/tws_gateway.cfg

@@ -26,6 +26,7 @@ group_id: 'TWS_GW'
 session_timeout_ms: 10000
 topics: ['reqAccountUpdates', 'reqOpenOrders', 'reqExecutions', 'reqIds', 'reqNewsBulletins', 'cancelNewsBulletins', 'setServerLogLevel', 'reqAccountSummary', 'reqPositions', 'reqAutoOpenOrders', 'reqAllOpenOrders', 'reqManagedAccts', 'requestFA', 'reqMktData', 'reqHistoricalData', 'placeOrder', 'gw_req_subscriptions']
 clear_offsets: False
+seek_to_end:['gw_req_subscriptions', 'reqPositions', 'reqMktData']
 #
 #
 subscription_manager.subscriptions.redis_key: 'subscriptions'

+ 21 - 15
src/rethink/analytics_engine.py

@@ -36,27 +36,28 @@ class AnalyticsEngine(Subscriber, AbstractGatewayListener):
         
     
     def test_oc(self, oc2):
-#         expiry = '20170330'
-#         contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '', 0, expiry)
+        expiry = '20170330'
+        contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '', 0, expiry)
+        contract = ContractHelper.makeContract(contractTuple)  
+ 
+        oc2.set_option_structure(contract, 200, 50, 0.0012, 0.0328, expiry)        
+     
+        oc2.build_chain(24119, 0.03, 0.22)
+        
+#         expiry='20170324'
+#         contractTuple = ('QQQ', 'STK', 'SMART', 'USD', '', 0, '')
 #         contract = ContractHelper.makeContract(contractTuple)  
 # 
-#         oc2.set_option_structure(contract, 200, 50, 0.0012, 0.0328, expiry)        
+#         oc2.set_option_structure(contract, 0.5, 100, 0.0012, 0.0328, expiry)        
 #     
-#         oc2.build_chain(24119, 0.02, 0.22)
-        
-        expiry='20170324'
-        contractTuple = ('QQQ', 'STK', 'SMART', 'USD', '', 0, '')
-        contract = ContractHelper.makeContract(contractTuple)  
-
-        oc2.set_option_structure(contract, 0.5, 100, 0.0012, 0.0328, expiry)        
-    
-        oc2.build_chain(132.11, 0.02, 0.22)
+#         oc2.build_chain(132.11, 0.02, 0.22)
         
         
         oc2.pretty_print()        
 
         for o in oc2.get_option_chain():
             self.tds.add_symbol(o)
+        self.tds.add_symbol(oc2.get_underlying())
     
     
     def start_engine(self):
@@ -69,8 +70,9 @@ class AnalyticsEngine(Subscriber, AbstractGatewayListener):
             logging.info('AnalyticsEngine:main_loop ***** accepting console input...')
             while True: 
             
-                sleep(3.0)
+
                 oc2.pretty_print()
+                sleep(10.0)
             
         except (KeyboardInterrupt, SystemExit):
             logging.error('AnalyticsEngine: caught user interrupt. Shutting down...')
@@ -110,6 +112,9 @@ class AnalyticsEngine(Subscriber, AbstractGatewayListener):
         #(int tickerId, int field, double impliedVol, double delta, double optPrice, double pvDividend, double gamma, double vega, double theta, double undPrice) 
         pass
     
+    def ae_req_tds_internal(self, event, message_value):
+        logging.info('received ae_req_tds_internal')
+        self.tds.dump()
     
     #
     # gateway events
@@ -159,8 +164,9 @@ if __name__ == '__main__':
       'session_timeout_ms': 10000,
       'clear_offsets':  True,
       'logconfig': {'level': logging.INFO, 'filemode': 'w', 'filename': '/tmp/ae.log'},
-      'topics': ['tickPrice', 'gw_subscriptions', 'gw_subscription_changed'],
-      'seek_to_end':['tickSize', 'tickPrice']
+      'topics': ['tickPrice', 'gw_subscriptions', 'gw_subscription_changed', 'ae_req_tds_internal'],
+      'seek_to_end': ['*'],
+      #'seek_to_end':['tickSize', 'tickPrice','gw_subscriptions', 'gw_subscription_changed']
       }
 
     usage = "usage: %prog [options]"

+ 5 - 2
src/rethink/tick_datastore.py

@@ -107,13 +107,16 @@ class TickDataStore(Publisher):
         try:
             self.lock.acquire()
             contract_key = self.tickers[tid]
-            logging.debug('set_symbol_price: -------------------tick id:%d symbol list length=%d' % (tid, len(self.symbols[contract_key]['syms'])))
+            if (tid == 10):
+                logging.info('set_symbol_price %s' % items)
+            logging.debug('set_symbol_price: -------------------tick id:%d symbol list length=%d %s' % (tid, len(self.symbols[contract_key]['syms']), 
+                                                                                            contract_key))
             map(lambda e: e.set_tick_value(items['field'], items['price']), self.symbols[contract_key]['syms'])
             
         except KeyError:
             # contract not set up in the datastore, ignore message
             logging.error('set_symbol_price: KeyError: %d' % tid)
-            self.dump()
+            #self.dump()
             pass
         finally:
             self.lock.release()

+ 1 - 1
src/sh/start_twsgw.sh

@@ -10,4 +10,4 @@ else
 fi
 export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
 #python $FINOPT_HOME/comms/ibgw/tws_gateway.py -r -c -f $FINOPT_HOME/config/tws_gateway.cfg 
-python $FINOPT_HOME/comms/ibgw/tws_gateway.py  -f $FINOPT_HOME/config/tws_gateway.cfg 
+python $FINOPT_HOME/comms/ibgw/tws_gateway.py  -r -f $FINOPT_HOME/config/tws_gateway.cfg