Bläddra i källkod

daily changes - analytics and datastore

bobhk 9 år sedan
förälder
incheckning
8b3ca4709a

+ 6 - 3
src/comms/ibgw/subscription_manager.py

@@ -217,10 +217,13 @@ class SubscriptionManager(BaseMessageListener):
        Client requests to TWS_gateway
     """
     def gw_req_subscriptions(self, event, message):
-     
-        from_id = json.loads(message['value'])['sender_id']
+        try:
+            from_id = json.loads(message['value'])['sender_id']
+        except:
+            from_id = '<empty_sender_id>'
+            
         ic = self.get_id_kvs_contracts(db=False)
-        print self.producer.message_dumps({'subscriptions': ic, 'sender_id':self.name, 'target_id':from_id})
+        #print self.producer.message_dumps({'subscriptions': ic, 'sender_id':self.name, 'target_id':from_id})
         if ic:
              
             logging.info('SubscriptionManager:gw_req_subscriptions-------\n%s' % ic)

+ 28 - 8
src/rethink/analytics_engine.py

@@ -4,6 +4,8 @@ import copy
 from optparse import OptionParser
 from time import sleep
 from misc2.observer import Subscriber
+from misc2.helpers import ContractHelper
+from finopt.options_chain import OptionsChain
 from rethink.tick_datastore import TickDataStore
 from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
 
@@ -33,10 +35,26 @@ class AnalyticsEngine(Subscriber, AbstractGatewayListener):
         self.option_chains = {}
         
     
+    def test_oc(self):
+        expiry = '20170330'
+        contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '', 0, expiry)
+        contract = ContractHelper.makeContract(contractTuple)  
+        oc2 = OptionsChain('qqq-%s' % expiry)
+        oc2.set_option_structure(contract, 200, 50, 0.0012, 0.0328, expiry)        
+    
+        oc2.build_chain(24119, 0.03, 0.22)
+        oc2.pretty_print()        
+
+        for o in oc2.get_option_chain():
+            self.tds.add_symbol(o)
+    
+    
     def start_engine(self):
         self.twsc.start_manager()
         self.request_subscrptions()
-
+        
+        self.test_oc()
+        
         try:
             logging.info('AnalyticsEngine:main_loop ***** accepting console input...')
             while True: 
@@ -60,11 +78,13 @@ class AnalyticsEngine(Subscriber, AbstractGatewayListener):
     #
     # tds call backs
     #
-    def tds_event_new_symbol_added(self, event, items):
-        pass
+    def tds_event_new_symbol_added(self, event, symbol):
+       
+        logging.info('tds_event_new_symbol_added. %s' % ContractHelper.object2kvstring(symbol.get_contract()))
+        self.twsc.reqMktData(symbol.get_contract())
     
     def tds_event_tick_updated(self, event, items):
-        pass
+        logging.info('tds_event_tick_updated. %s' % items)
     
     #
     # external ae requests
@@ -79,7 +99,7 @@ class AnalyticsEngine(Subscriber, AbstractGatewayListener):
     #
     def gw_subscription_changed(self, event, message_value):
         logging.info('AnalyticsEngine:%s. val->[%s]' % (event, message_value))
- 
+        self.tds.update_datastore(message_value)
              
     def gw_subscriptions(self, event, message_value):
         logging.info('AnalyticsEngine:%s. val->[%s]' % (event, message_value))
@@ -92,7 +112,7 @@ class AnalyticsEngine(Subscriber, AbstractGatewayListener):
     # tws events     
     #
     def tickPrice(self, event, message_value):   
-        self.tds.update_symbol_price(event, message_value)
+        self.tds.set_symbol_price(event, message_value)
 
  
     def error(self, event, message_value):
@@ -117,7 +137,7 @@ if __name__ == '__main__':
       'session_timeout_ms': 10000,
       'clear_offsets':  False,
       'logconfig': {'level': logging.INFO},
-      'topics': ['gw_subscriptions', 'gw_subscription_changed'],
+      'topics': ['tickPrice', 'gw_subscriptions', 'gw_subscription_changed'],
       'seek_to_end':['tickSize', 'tickPrice']
       }
 
@@ -134,7 +154,7 @@ if __name__ == '__main__':
         if value <> None:
             kwargs[option] = value
             
-    #print kwargs    
+  
       
     logconfig = kwargs['logconfig']
     logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    

+ 27 - 22
src/rethink/tick_datastore.py

@@ -63,24 +63,28 @@ class TickDataStore(Publisher):
 
     def dump(self):
             # print ', '.join('[%s:%s]' % (k, v['ticker_id'])) 
-        logging.info('TickDataStore-symbols: [Key: Ticker ID: # options objects]: ---->\n%s' % (',\n'.join('[%s:%d:%d]' % (k, v['ticker_id'], len(v['syms'])) for k, v in self.symbols.iteritems())))
-        logging.info('TickDataStore-tickers: %s' % self.tickers)
+        logging.info('TickDataStore-symbols:\nkey : ticker : object cnt---->\n%s' % ('\n'.join('[%s :  %d : %d]' % 
+                                                                                                (k, v['ticker_id'], len(v['syms'])) for k, v in self.symbols.iteritems())))
+        logging.info('TickDataStore-tickers:\nticker: object%s' % ('\n'.join('%s:%s' % (str(k).ljust(4), 
+                                                                          ContractHelper.makeRedisKeyEx(v))) for k, v in self.tickers.iteritems()))
     
      
     
     def add_symbol(self, symbol):
         try:
+            dispatch = False
             self.lock.acquire()
             key = symbol.get_key()
             if key not in self.symbols:
                 self.symbols[key] = {'ticker_id':-1, 'syms': []}
+                dispatch = True
                 
             self.symbols[key]['syms'].append(symbol)        
     
             # defer the dispatch at the end of this method        
-            if key not in self.symbols:
+            if dispatch:
                 self.dispatch(TickDataStore.EVENT_NEW_SYMBOL_ADDED, symbol)
-        finally:
+        finally:            
             self.lock.release()
             
     def del_symbol(self, symbol):
@@ -91,7 +95,8 @@ class TickDataStore(Publisher):
     def set_symbol_price(self, event, message_value):   
         
         # 'value': '{"tickerId": 0, "size": 3, "field": 3}'
-        items = json.loads(message_value)
+        
+        items = json.loads(message_value['value'])
         tid = items['tickerId']
 
         try:
@@ -107,7 +112,22 @@ class TickDataStore(Publisher):
             self.lock.release()
             self.dispatch(TickDataStore.EVENT_TICK_UPDATED, message_value)
             
+    def set_datastore_values(self, idc):
+        
 
+        key = ContractHelper.makeRedisKeyEx(idc[1])
+        if key in self.symbols and idc[0] <> self.symbols[key]['ticker_id']:
+            # if this condition is met, one should delete the old entry
+            # and move all object references to the new key/ticker_id
+            raise
+        
+        self.tickers[idc[0]] = key
+        try:
+            self.symbols[key]['ticker_id'] = idc[0]
+        except KeyError:
+            self.symbols[key] = {'ticker_id': idc[0],
+                                   'syms': []}
+        return key
 
     def update_datastore(self, subscription_message_value):
         '''
@@ -119,22 +139,7 @@ class TickDataStore(Publisher):
         'offset': 13
         }
         '''
-        def set_values2(idc):
-            
-            
-            key = ContractHelper.makeRedisKeyEx(idc[1])
-            
-            if key in self.symbols and idc[0] <> self.symbols[key]['ticker_id']:
-                # if this condition is met, one should delete the old entry
-                # and move all object references to the new key/ticker_id
-                raise
-            
-            self.tickers[idc[0]] = key
-            try:
-                self.symbols[key]['ticker_id'] = idc[0]
-            except KeyError:
-                self.symbols[key] = {'ticker_id': idc[0],
-                                       'syms': []}
+
                 
 
         
@@ -147,7 +152,7 @@ class TickDataStore(Publisher):
             items = json.loads(subscription_message_value['value'])
             logging.info('TickDataStore:update_datastore. items: %s ' % items)
             id_contracts = map(lambda x: (x[0], ContractHelper.kvstring2contract(utf2asc(x[1]))), items['subscriptions'])
-            map(lambda idc: set_values2, id_contracts)   
+               
             self.dump()
         except TypeError:
             logging.error('TickDataStore:gw_subscriptions. Exception when trying to get id:contracts.')

+ 1 - 1
src/sh/start_twsgw.sh

@@ -9,4 +9,4 @@ else
 	FINOPT_HOME=~/l1304/workspace/finopt-ironfly/finopt/src
 fi
 export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
-python $FINOPT_HOME/comms/ibgw/tws_gateway.py $FINOPT_HOME/config/tws_gateway.cfg 
+python $FINOPT_HOME/comms/ibgw/tws_gateway.py -c -f $FINOPT_HOME/config/tws_gateway.cfg