Browse Source

night changes

esurfer 9 years ago
parent
commit
964051d4c9

+ 2 - 3
src/comms/ibc/base_client_messaging.py

@@ -65,9 +65,8 @@ class GatewayCommandWrapper():
     
         
 
-    def gw_req_subscriptions(self):
-        
-        self.producer.send_message('gw_req_subscriptions', self.producer.message_dumps(None))
+    def gw_req_subscriptions(self, sender_id):
+        self.producer.send_message('gw_req_subscriptions', self.producer.message_dumps({'sender_id': sender_id}))
         
         
         

+ 1 - 1
src/comms/ibc/gw_ex1.py

@@ -47,7 +47,7 @@ def test_client(kwargs):
     cm.add_listener_topics(cl, kwargs['topics'])
     cm.start_manager()
     cm.reqPositions()
-    cm.gw_req_subscriptions()
+    cm.gw_req_subscriptions(sender_id=kwargs['name'])
     try:
         logging.info('TWS_gateway:main_loop ***** accepting console input...')
         while not cm.is_stopped(): 

+ 7 - 1
src/comms/ibgw/base_messaging.py

@@ -349,6 +349,8 @@ class BaseConsumer(threading.Thread, Publisher):
 
 class BaseMessageListener(Subscriber):
     
+    def __init__(self, name):
+        self.name = name
     
     
     def update(self, event, param=none):
@@ -474,9 +476,13 @@ class Prosumer(BaseProducer):
     def message_dumps(self, obj_msg):
         return json.dumps(obj_msg)
 
+
+
     
 class SubscriptionListener(BaseMessageListener):
-    
+    '''
+    test code used by test cases
+    '''
     
     def __init__(self, name, producer):
         BaseMessageListener.__init__(self, name)

+ 4 - 19
src/comms/ibgw/subscription_manager.py

@@ -217,28 +217,13 @@ class SubscriptionManager(BaseMessageListener):
        Client requests to TWS_gateway
     """
     def gw_req_subscriptions(self, event, message):
-        
+     
+        from_id = json.loads(message['value'])['sender_id']
         ic = self.get_id_kvs_contracts(db=False)
+        print self.producer.message_dumps({'subscriptions': ic, 'sender_id':self.name, 'target_id':from_id})
         if ic:
              
             logging.info('SubscriptionManager:gw_req_subscriptions-------\n%s' % ic)
-            self.producer.send_message('gw_subscriptions', self.producer.message_dumps({'subscriptions': ic}))
+            self.producer.send_message('gw_subscriptions', self.producer.message_dumps({'subscriptions': ic, 'sender_id':self.name, 'target_id':from_id}))
         
        
-<<<<<<< Upstream, based on origin/ironfly
-=======
-       
-
-       
-
-       
->>>>>>> fec2a94 synchronized changes
-
-
-
-<<<<<<< Upstream, based on origin/ironfly
-=======
-
-    
->>>>>>> fec2a94 synchronized changes
-    

+ 91 - 18
src/rethink/analytics_engine.py

@@ -1,6 +1,9 @@
 import logging
 import json
+import copy
+from optparse import OptionParser
 from time import sleep
+from misc2.observer import Subscriber
 from rethink.tick_datastore import TickDataStore
 from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
 
@@ -8,7 +11,7 @@ from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
 
 
 
-class AnalyticsEngine(AbstractGatewayListener):
+class AnalyticsEngine(Subscriber, AbstractGatewayListener):
 
     AE_OPTIONS_CONFIG = {
         'underlying_substitution': {'IND': 'FUT'},
@@ -17,24 +20,22 @@ class AnalyticsEngine(AbstractGatewayListener):
     
     
     
-    def __init__(self, name, kwargs):
+    def __init__(self, kwargs):
+        self.kwargs = copy.copy(kwargs)
         self.twsc = TWS_client_manager(kwargs)
-        AbstractGatewayListener.__init__(self, name)
+        AbstractGatewayListener.__init__(self, kwargs['name'])
     
-        self.tds = TickDataStore(name, self.twsc)
+        self.tds = TickDataStore(kwargs['name'])
+        self.tds.register_listener(self)
         self.twsc.add_listener_topics(self, kwargs['topics'])
-        self.twsc.add_listener_topics(self.tds, kwargs['tds_topics'])
+        
  
         self.option_chains = {}
         
     
     def start_engine(self):
         self.twsc.start_manager()
-        self.twsc.gw_req_subscriptions()
-        self.initial_run = True
-        while self.initial_run:
-            sleep(0.5)
-
+        self.request_subscrptions()
 
         try:
             logging.info('AnalyticsEngine:main_loop ***** accepting console input...')
@@ -49,27 +50,99 @@ class AnalyticsEngine(AbstractGatewayListener):
             logging.info('AnalyticsEngine: Service shut down complete...')               
     
     
+    def request_subscrptions(self):
+        self.initial_run = True
+        self.twsc.gw_req_subscriptions(self.kwargs['name'])
+        while self.initial_run:
+            sleep(0.5)
+
+
+    #
+    # tds call backs
+    #
+    def tds_event_new_symbol_added(self, event, items):
+        pass
     
+    def tds_event_tick_updated(self, event, items):
+        pass
+    
+    #
+    # external ae requests
+    #
     def ae_req_greeks(self, event, message_value):
         #(int tickerId, int field, double impliedVol, double delta, double optPrice, double pvDividend, double gamma, double vega, double theta, double undPrice) 
         pass
     
+    
+    #
+    # gateway events
+    #
     def gw_subscription_changed(self, event, message_value):
-        logging.info('MessageListener:%s. val->[%s]' % (event, message_value))
- 
+        logging.info('AnalyticsEngine:%s. val->[%s]' % (event, message_value))
  
-            
+             
     def gw_subscriptions(self, event, message_value):
-        logging.info('MessageListener:%s. val->[%s]' % (event, message_value))
+        logging.info('AnalyticsEngine:%s. val->[%s]' % (event, message_value))
 
         if self.initial_run:
             self.tds.update_datastore(message_value)
-            self.pending_gw_reply = False
-            
-        
+            self.initial_run = False
+
+    #            
+    # tws events     
+    #
     def tickPrice(self, event, message_value):   
         self.tds.update_symbol_price(event, message_value)
 
  
     def error(self, event, message_value):
-        logging.info('MessageListener:%s. val->[%s]' % (event, message_value))              
+        logging.info('AnalyticsEngine:%s. val->[%s]' % (event, message_value))         
+        
+        
+if __name__ == '__main__':
+    
+
+    
+    kwargs = {
+      'name': 'analytics_engine',
+      'bootstrap_host': 'localhost',
+      'bootstrap_port': 9092,
+      'redis_host': 'localhost',
+      'redis_port': 6379,
+      'redis_db': 0,
+      'tws_host': 'localhost',
+      'tws_api_port': 8496,
+      'tws_app_id': 38868,
+      'group_id': 'AE',
+      'session_timeout_ms': 10000,
+      'clear_offsets':  False,
+      'logconfig': {'level': logging.INFO},
+      'topics': ['gw_subscriptions', 'gw_subscription_changed'],
+      'seek_to_end':['tickSize', 'tickPrice']
+      }
+
+    usage = "usage: %prog [options]"
+    parser = OptionParser(usage=usage)
+    parser.add_option("-c", "--clear_offsets", action="store_true", dest="clear_offsets",
+                      help="delete all redis offsets used by this program")
+    parser.add_option("-g", "--group_id",
+                      action="store", dest="group_id", 
+                      help="assign group_id to this running instance")
+    
+    (options, args) = parser.parse_args()
+    for option, value in options.__dict__.iteritems():
+        if value <> None:
+            kwargs[option] = value
+            
+    #print kwargs    
+      
+    logconfig = kwargs['logconfig']
+    logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
+    logging.basicConfig(**logconfig)        
+    
+    
+    server = AnalyticsEngine(kwargs)
+    server.start_engine()
+    
+          
+        

+ 29 - 49
src/rethink/tick_datastore.py

@@ -6,7 +6,7 @@ from misc2.observer import NotImplementedException
 from misc2.helpers import ContractHelper
 from comms.ibc.base_client_messaging import AbstractGatewayListener
 
-class TickDataStore(Publisher, AbstractGatewayListener):
+class TickDataStore(Publisher):
     """
     
     Data structure:
@@ -43,28 +43,28 @@ class TickDataStore(Publisher, AbstractGatewayListener):
     """
     
 
-    TICK_PRICE_UPDATED = 'tds_price_updated'
-    NEW_SYMBOL_ADDED = 'tds_new_symbol_added'
-    TDS_EVENTS = [TICK_PRICE_UPDATED, NEW_SYMBOL_ADDED] 
+    EVENT_TICK_UPDATED = 'tds_event_tick_updated'
+    EVENT_NEW_SYMBOL_ADDED = 'tds_event_new_symbol_added'
+    TDS_EVENTS = [EVENT_TICK_UPDATED, EVENT_NEW_SYMBOL_ADDED] 
 
     
     def __init__(self, name):
         
-        AbstractGatewayListener.__init__(self, name)
+
         self.tickers = {}
         self.symbols = {}
         self.lock = RLock()
-        
+        Publisher.__init__(self, TickDataStore.TDS_EVENTS)
         self.first_run = True
         
         
     def register_listener(self, l):
-        map(lambda e: self.register(e, l, l.tds_price_updated), TickDataStore.TDS_EVENTS)
+        map(lambda e: self.register(e, l, getattr(l, e)), TickDataStore.TDS_EVENTS)
 
     def dump(self):
             # print ', '.join('[%s:%s]' % (k, v['ticker_id'])) 
-        logging.debug('TickDataStore-symbols: [Key: Ticker ID: # options objects]: ---->\n%s' % (',\n'.join('[%s:%d:%d]' % (k, v['ticker_id'], len(v['syms'])) for k, v in self.symbols.iteritems())))
-        logging.debug('TickDataStore-tickers: %s' % self.tickers)
+        logging.info('TickDataStore-symbols: [Key: Ticker ID: # options objects]: ---->\n%s' % (',\n'.join('[%s:%d:%d]' % (k, v['ticker_id'], len(v['syms'])) for k, v in self.symbols.iteritems())))
+        logging.info('TickDataStore-tickers: %s' % self.tickers)
     
      
     
@@ -79,7 +79,7 @@ class TickDataStore(Publisher, AbstractGatewayListener):
     
             # defer the dispatch at the end of this method        
             if key not in self.symbols:
-                self.dispatch(TickDataStore.NEW_SYMBOL_ADDED, symbol)
+                self.dispatch(TickDataStore.EVENT_NEW_SYMBOL_ADDED, symbol)
         finally:
             self.lock.release()
             
@@ -88,7 +88,7 @@ class TickDataStore(Publisher, AbstractGatewayListener):
     
        
         
-    def update_symbol_price(self, event, message_value):   
+    def set_symbol_price(self, event, message_value):   
         
         # 'value': '{"tickerId": 0, "size": 3, "field": 3}'
         items = json.loads(message_value)
@@ -105,21 +105,21 @@ class TickDataStore(Publisher, AbstractGatewayListener):
             pass
         finally:
             self.lock.release()
-            self.dispatch(TickDataStore.TICK_PRICE_UPDATED, message_value)
+            self.dispatch(TickDataStore.EVENT_TICK_UPDATED, message_value)
             
-    def error(self, event, message_value):
-        logging.info('TickDataStore:%s. val->[%s]' % (event, message_value))  
 
 
-    
-    def gw_subscription_changed(self, event, message_value):
-        logging.info('TickDataStore:%s. val->[%s]' % (event, message_value))
-        self.update_datastore(message_value)
- 
-
     def update_datastore(self, subscription_message_value):
-        
-        def set_values(idc):
+        '''
+        sample value:
+        {
+        'partition': 0, 'value': '{"target_id": "analytics_engine", "sender_id": "tws_gateway_server", 
+        "subscriptions": [[0, "{\\"m_conId\\": 0, \\"m_right\\": \\"\\", \\"m_symbol\\": \\"HSI\\", \\"m_secType\\": \\"FUT\\", 
+        \\"m_includeExpired\\": false, \\"m_expiry\\": \\"20170330\\", \\"m_currency\\": \\"HKD\\", \\"m_exchange\\": \\"HKFE\\", \\"m_strike\\": 0}"]]}', 
+        'offset': 13
+        }
+        '''
+        def set_values2(idc):
             
             
             key = ContractHelper.makeRedisKeyEx(idc[1])
@@ -143,10 +143,12 @@ class TickDataStore(Publisher, AbstractGatewayListener):
                 return x if isinstance(x, unicode) else x
         
             self.lock.acquire()
-            items = json.loads(subscription_message_value)    
-            id_contracts = map(lambda x: (x[0], ContractHelper.kvstring2contract(utf2asc(x[1]))), items)
-        
-            map(lambda idc: set_values, id_contracts)    
+            
+            items = json.loads(subscription_message_value['value'])
+            logging.info('TickDataStore:update_datastore. items: %s ' % items)
+            id_contracts = map(lambda x: (x[0], ContractHelper.kvstring2contract(utf2asc(x[1]))), items['subscriptions'])
+            map(lambda idc: set_values2, id_contracts)   
+            self.dump()
         except TypeError:
             logging.error('TickDataStore:gw_subscriptions. Exception when trying to get id:contracts.')
             return None       
@@ -154,30 +156,8 @@ class TickDataStore(Publisher, AbstractGatewayListener):
             self.lock.release()     
         
             
-    def gw_subscriptions(self, event, message_value):
-        logging.info('TickDataStore:%s. val->[%s]' % (event, message_value))
-        self.update_datastore(message_value)
-        if self.first_run:
-            self.dispatch(TickDataStore.TDS_INIT_COMPLETE, {})
-            self.first_run = False
-            
-    
-        """
-        {0: {u'm_conId': 0, u'm_right': u'', u'm_symbol': u'QQQ', 
-        u'm_secType': u'STK', u'm_includeExpired': False, 
-        u'm_expiry': u'', u'm_currency': u'USD', u'm_exchange': u'SMART', u'm_strike': 0}, 
-        1: {u'm_conId': 0, u'm_right': u'C', u'm_symbol': u'QQQ', u'm_secType': u'OPT', 
-        u'm_includeExpired': False, u'm_multiplier': 100, u'm_expiry': u'20170217', u'm_currency': u'USD', u'm_exchange': u'SMART', u'm_strike': 125.0}, 
-        2: {u'm_conId': 0, u'm_right': u'P', u'm_symbol': u'QQQ', u'm_secType': u'OPT', u'm_includeExpired': False, u'm_multiplier': 100, 
-        u'm_expiry': u'20170217', u'm_currency': u'USD', u'm_exchange': u'SMART', u'm_strike': 125.0}, 
-        ...
-         
-        78: {u'm_conId': 0, u'm_right': u'P', u'm_symbol': u'QQQ', u'm_secType': u'OPT', 
-        u'm_includeExpired': False, u'm_multiplier': 100, u'm_expiry': u'20170217', 
-        u'm_currency': u'USD', u'm_exchange': u'SMART', u'm_strike': 115.5}}
-        tickPrice>> [0:QQQ--0.00--STK-USD-SMART-102] bid_q=-1.0000 [2017-01-28 12:08:49.587014]
 
-        """
+