Parcourir la source

mid night commits

esurfer il y a 9 ans
Parent
commit
9502f560a7

+ 2 - 2
src/comms/ibgw/base_messaging.py

@@ -288,8 +288,8 @@ class BaseConsumer(threading.Thread, Publisher):
                                   % (message.topic, message.partition, message.offset, gap, highwater))
                                                                                                 
                     for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
-                        logging.info ("*** On first iteration: Topic Partition Table: topic:[%s] %s" % (t.rjust(20),  
-                                                         ','.join('partition:%d, offset:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)
+                        logging.info ("*** On first iteration: T/P Table: topic:[%s] %s" % (t.rjust(25),  
+                                                         ','.join('part:%d, off:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)
                                                          ))
                         
                     self.persist_offsets(message.topic, message.partition, message.offset)

+ 27 - 11
src/comms/ibgw/subscription_manager.py

@@ -15,13 +15,16 @@ class SubscriptionManager(BaseMessageListener):
     
     TICKER_GAP = 1000
     
-    def __init__(self, name, tws_connection, producer, rs_conn, subscription_key):
+    def __init__(self, name, tws_connection, producer, rs_conn, kwargs):
         BaseMessageListener.__init__(self, name)
         
         self.tws_connect = tws_connection
         self.producer = producer
         self.rs = rs_conn
-        self.subscription_key = subscription_key
+        self.subscription_key = kwargs['subscription_manager.subscriptions.redis_key']
+        
+        
+        self.reset_subscriptions(kwargs['reset_db_subscriptions'])
 
         #self.handle = []
         # contract key map to contract ID (index of the handle array)
@@ -44,8 +47,10 @@ class SubscriptionManager(BaseMessageListener):
         self.load_subscriptions()
         
             
-
-        
+    def reset_subscriptions(self, reset_db):
+        if reset_db:
+            logging.warn('SubscriptionManager:reset_subscriptions. Delete subscription entry in redis')
+            self.rs.delete(self.subscription_key)
         
     def load_subscriptions(self):
         '''
@@ -103,10 +108,13 @@ class SubscriptionManager(BaseMessageListener):
             # the call to TWS will return a snapshot follow 
             # by the subscription being cancelled. Add 1000 to avoid clashing 
             # with other subscription ids.  
+            print 'request_market_data: %d' % (id + TWS_event_handler.TICKER_GAP)
             self.tws_connect.reqMktData(id + TWS_event_handler.TICKER_GAP, contract, '', True)
         else:
             self.tws_connect.reqMktData(id, contract, '', False)
-            
+#
+#         self.tws_connect.reqMktData(id + TWS_event_handler.TICKER_GAP, contract, '', True)
+#         self.tws_connect.reqMktData(id, contract, '', False)
     
     # returns -1 if not found, else the key id (which could be a zero value)
     def is_subscribed(self, contract):
@@ -159,7 +167,17 @@ class SubscriptionManager(BaseMessageListener):
         #
         # instruct gateway to broadcast new id has been assigned to a new contract
         #
-        self.producer.send_message('gw_subscription_changed', self.producer.message_dumps({id: ContractHelper.object2kvstring(contract)}))
+        '''
+        sample value for gw_ga:
+        {
+        'partition': 0, 'value': '{"target_id": "analytics_engine", "sender_id": "tws_gateway_server", 
+        "subscriptions": [[0, "{\\"m_conId\\": 0, \\"m_right\\": \\"\\", \\"m_symbol\\": \\"HSI\\", \\"m_secType\\": \\"FUT\\", 
+        \\"m_includeExpired\\": false, \\"m_expiry\\": \\"20170330\\", \\"m_currency\\": \\"HKD\\", \\"m_exchange\\": \\"HKFE\\", \\"m_strike\\": 0}"]]}', 
+        'offset': 13
+        }
+        '''      
+        subscription_array =  {'subscriptions': [[id, ContractHelper.object2kvstring(contract)]] }
+        self.producer.send_message('gw_subscription_changed', self.producer.message_dumps( subscription_array   ))
         logging.info('SubscriptionManager:reqMktData. Publish gw_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         
         
@@ -223,10 +241,8 @@ class SubscriptionManager(BaseMessageListener):
             from_id = '<empty_sender_id>'
             
         ic = self.get_id_kvs_contracts(db=False)
-        #print self.producer.message_dumps({'subscriptions': ic, 'sender_id':self.name, 'target_id':from_id})
-        if ic:
-             
-            logging.info('SubscriptionManager:gw_req_subscriptions-------\n%s' % ic)
-            self.producer.send_message('gw_subscriptions', self.producer.message_dumps({'subscriptions': ic, 'sender_id':self.name, 'target_id':from_id}))
+    
         
+        self.producer.send_message('gw_subscriptions', self.producer.message_dumps({'subscriptions': ic , 'sender_id':self.name, 'target_id':from_id}))
+        logging.info('SubscriptionManager:gw_req_subscriptions-------\n%s' % self.producer.message_dumps({'subscriptions': ic , 'sender_id':self.name, 'target_id':from_id}))
        

+ 3 - 0
src/comms/ibgw/tws_event_handler.py

@@ -40,8 +40,11 @@ class TWS_event_handler(EWrapper):
         # deduct the gap to derive the original tickerId
         # --- check logic in subscription manager
         try:
+            
             if (t['tickerId']  >= TWS_event_handler.TICKER_GAP):
+                #print 'tick_process_message************************ SNAPSHOT %d' % t['tickerId']
                 t['tickerId'] = t['tickerId']  - TWS_event_handler.TICKER_GAP
+                
         except (KeyError, ):
             pass          
             

+ 7 - 3
src/comms/ibgw/tws_gateway.py

@@ -40,7 +40,8 @@ class TWS_gateway():
       'session_timeout_ms': 10000,
       'clear_offsets':  False,
       'order_transmit': False,
-      'topics': list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods)
+      'topics': list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods),
+      'reset_db_subscriptions': False
       }
                
     
@@ -122,7 +123,7 @@ class TWS_gateway():
         
         self.contract_subscription_mgr = SubscriptionManager(self.kwargs['name'], self.tws_connection, 
                                                              self.gw_message_handler, 
-                                                             self.get_redis_conn(), self.kwargs['subscription_manager.subscriptions.redis_key'])
+                                                             self.get_redis_conn(), self.kwargs)
         
         
 
@@ -224,6 +225,9 @@ if __name__ == '__main__':
     parser = OptionParser(usage=usage)
     parser.add_option("-c", "--clear_offsets", action="store_true", dest="clear_offsets",
                       help="delete all redis offsets used by this program")
+    parser.add_option("-r", "--reset_db_subscriptions", action="store_true", dest="reset_db_subscriptions",
+                      help="delete subscriptions entries in redis used by this program")
+    
     parser.add_option("-f", "--config_file",
                       action="store", dest="config_file", 
                       help="path to the config file")
@@ -241,7 +245,7 @@ if __name__ == '__main__':
     logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
     logging.basicConfig(**logconfig)        
 
-    logging.debug('config settings: %s' % kwargs)
+    logging.info('config settings: %s' % kwargs)
     
     app = TWS_gateway(kwargs)
     

+ 2 - 1
src/config/tws_gateway.cfg

@@ -32,4 +32,5 @@ subscription_manager.subscriptions.redis_key: 'subscriptions'
 subscription_manager.topics: ['reqMktData', 'gw_req_subscriptions']
 #logconfig: {'filename': '/home/larry-13.04/workspace/finopt/log/tws_gateway.log', 'filemode': 'w','level': logging.INFO}
 logconfig: {'level': logging.INFO,  'filemode': 'w', 'filename':'/tmp/tws_gateway.log'}
-order_transmit: False
+order_transmit: False
+reset_db_subscriptions: False

+ 37 - 15
src/rethink/analytics_engine.py

@@ -35,14 +35,24 @@ class AnalyticsEngine(Subscriber, AbstractGatewayListener):
         self.option_chains = {}
         
     
-    def test_oc(self):
-        expiry = '20170330'
-        contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '', 0, expiry)
+    def test_oc(self, oc2):
+#         expiry = '20170330'
+#         contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '', 0, expiry)
+#         contract = ContractHelper.makeContract(contractTuple)  
+# 
+#         oc2.set_option_structure(contract, 200, 50, 0.0012, 0.0328, expiry)        
+#     
+#         oc2.build_chain(24119, 0.02, 0.22)
+        
+        expiry='20170324'
+        contractTuple = ('QQQ', 'STK', 'SMART', 'USD', '', 0, '')
         contract = ContractHelper.makeContract(contractTuple)  
-        oc2 = OptionsChain('qqq-%s' % expiry)
-        oc2.set_option_structure(contract, 200, 50, 0.0012, 0.0328, expiry)        
+
+        oc2.set_option_structure(contract, 0.5, 100, 0.0012, 0.0328, expiry)        
     
-        oc2.build_chain(24119, 0.03, 0.22)
+        oc2.build_chain(132.11, 0.02, 0.22)
+        
+        
         oc2.pretty_print()        
 
         for o in oc2.get_option_chain():
@@ -52,14 +62,15 @@ class AnalyticsEngine(Subscriber, AbstractGatewayListener):
     def start_engine(self):
         self.twsc.start_manager()
         self.request_subscrptions()
-        
-        self.test_oc()
+        oc2 = OptionsChain('oc2')
+        self.test_oc(oc2)
         
         try:
             logging.info('AnalyticsEngine:main_loop ***** accepting console input...')
             while True: 
             
-                sleep(.45)
+                sleep(3.0)
+                oc2.pretty_print()
             
         except (KeyboardInterrupt, SystemExit):
             logging.error('AnalyticsEngine: caught user interrupt. Shutting down...')
@@ -80,11 +91,17 @@ class AnalyticsEngine(Subscriber, AbstractGatewayListener):
     #
     def tds_event_new_symbol_added(self, event, symbol):
        
-        logging.info('tds_event_new_symbol_added. %s' % ContractHelper.object2kvstring(symbol.get_contract()))
+        #logging.info('tds_event_new_symbol_added. %s' % ContractHelper.object2kvstring(symbol.get_contract()))
         self.twsc.reqMktData(symbol.get_contract())
     
     def tds_event_tick_updated(self, event, items):
-        logging.info('tds_event_tick_updated. %s' % items)
+        #tds_event_tick_updated:
+        # dict object: {'partition': 0, 'value': '{"field": 7, "price": 35.0, "canAutoExecute": 0, "tickerId": 10}', 'offset': 527}
+        #logging.info('tds_event_tick_updated. %s' % items)
+        pass
+        # this is a callback after tick price is updated in tds
+        # do not call update again as it will go into an endless loop
+        # function probably to take out later....
     
     #
     # external ae requests
@@ -98,11 +115,12 @@ class AnalyticsEngine(Subscriber, AbstractGatewayListener):
     # gateway events
     #
     def gw_subscription_changed(self, event, message_value):
+        #pass
         logging.info('AnalyticsEngine:%s. val->[%s]' % (event, message_value))
         self.tds.update_datastore(message_value)
              
     def gw_subscriptions(self, event, message_value):
-        logging.info('AnalyticsEngine:%s. val->[%s]' % (event, message_value))
+        logging.info('AnalyticsEngine:%s. Received event.' % (event))
 
         if self.initial_run:
             self.tds.update_datastore(message_value)
@@ -112,7 +130,11 @@ class AnalyticsEngine(Subscriber, AbstractGatewayListener):
     # tws events     
     #
     def tickPrice(self, event, message_value):   
-        self.tds.set_symbol_price(event, message_value)
+        #
+        # dict: {'partition': 0, 'value': '{"field": 2, "price": 0.65, "canAutoExecute": 1, "tickerId": 1}', 'offset': 2151}
+        #
+        self.tds.set_symbol_price(json.loads(message_value['value']))
+        #pass
 
  
     def error(self, event, message_value):
@@ -135,8 +157,8 @@ if __name__ == '__main__':
       'tws_app_id': 38868,
       'group_id': 'AE',
       'session_timeout_ms': 10000,
-      'clear_offsets':  False,
-      'logconfig': {'level': logging.INFO},
+      'clear_offsets':  True,
+      'logconfig': {'level': logging.INFO, 'filemode': 'w', 'filename': '/tmp/ae.log'},
       'topics': ['tickPrice', 'gw_subscriptions', 'gw_subscription_changed'],
       'seek_to_end':['tickSize', 'tickPrice']
       }

+ 36 - 26
src/rethink/tick_datastore.py

@@ -65,9 +65,10 @@ class TickDataStore(Publisher):
             # print ', '.join('[%s:%s]' % (k, v['ticker_id'])) 
         logging.info('TickDataStore-symbols:\nkey : ticker : object cnt---->\n%s' % ('\n'.join('[%s :  %d : %d]' % 
                                                                                                 (k, v['ticker_id'], len(v['syms'])) for k, v in self.symbols.iteritems())))
-        logging.info('TickDataStore-tickers:\nticker: object%s' % ('\n'.join('%s:%s' % (str(k).ljust(4), 
-                                                                          ContractHelper.makeRedisKeyEx(v))) for k, v in self.tickers.iteritems()))
+        logging.info('TickDataStore-tickers:\nticker: object\n%s' % ('\n'.join('%s:%s' % (str(k).ljust(4), v) for k, v in self.tickers.iteritems())
+                                                                   ))
     
+        
      
     
     def add_symbol(self, symbol):
@@ -78,12 +79,16 @@ class TickDataStore(Publisher):
             if key not in self.symbols:
                 self.symbols[key] = {'ticker_id':-1, 'syms': []}
                 dispatch = True
-                
+            else:
+                ticker_id = self.symbols[key]['ticker_id']
+                self.tickers[ticker_id] = key
             self.symbols[key]['syms'].append(symbol)        
     
             # defer the dispatch at the end of this method        
             if dispatch:
                 self.dispatch(TickDataStore.EVENT_NEW_SYMBOL_ADDED, symbol)
+        except KeyError:
+            logging.error('TickDataStore: add_symbol. Exception when adding symbol:%s' % ContractHelper.makeRedisKeyEx(symbol.get_contract()))
         finally:            
             self.lock.release()
             
@@ -92,42 +97,29 @@ class TickDataStore(Publisher):
     
        
         
-    def set_symbol_price(self, event, message_value):   
+    def set_symbol_price(self, items):   
+        
+        # message_value: dict: '{"tickerId": 0, "size": 3, "field": 3}'
         
-        # 'value': '{"tickerId": 0, "size": 3, "field": 3}'
         
-        items = json.loads(message_value['value'])
         tid = items['tickerId']
-
+        logging.debug('set_symbol_price: -------------------')
         try:
             self.lock.acquire()
             contract_key = self.tickers[tid]
-            # print contract_key
+            logging.debug('set_symbol_price: -------------------tick id:%d symbol list length=%d' % (tid, len(self.symbols[contract_key]['syms'])))
             map(lambda e: e.set_tick_value(items['field'], items['price']), self.symbols[contract_key]['syms'])
             
         except KeyError:
             # contract not set up in the datastore, ignore message
+            logging.error('set_symbol_price: KeyError: %d' % tid)
+            self.dump()
             pass
         finally:
             self.lock.release()
-            self.dispatch(TickDataStore.EVENT_TICK_UPDATED, message_value)
+            self.dispatch(TickDataStore.EVENT_TICK_UPDATED, items)
             
-    def set_datastore_values(self, idc):
-        
 
-        key = ContractHelper.makeRedisKeyEx(idc[1])
-        if key in self.symbols and idc[0] <> self.symbols[key]['ticker_id']:
-            # if this condition is met, one should delete the old entry
-            # and move all object references to the new key/ticker_id
-            raise
-        
-        self.tickers[idc[0]] = key
-        try:
-            self.symbols[key]['ticker_id'] = idc[0]
-        except KeyError:
-            self.symbols[key] = {'ticker_id': idc[0],
-                                   'syms': []}
-        return key
 
     def update_datastore(self, subscription_message_value):
         '''
@@ -139,7 +131,25 @@ class TickDataStore(Publisher):
         'offset': 13
         }
         '''
-
+        def set_datastore_values(idc):
+            
+            
+            key = ContractHelper.makeRedisKeyEx(idc[1])
+            if key in self.symbols and idc[0] <> self.symbols[key]['ticker_id']:
+                # if this condition is met, one should delete the old entry
+                # and move all object references to the new key/ticker_id
+                if self.symbols[key]['ticker_id'] <> -1:
+                    raise
+            
+            self.tickers[idc[0]] = key
+            try:
+                self.symbols[key]['ticker_id'] = idc[0]
+                
+            except KeyError:
+                self.symbols[key] = {'ticker_id': idc[0], 'syms': []}
+            
+            self.dump()
+            return key
                 
 
         
@@ -152,7 +162,7 @@ class TickDataStore(Publisher):
             items = json.loads(subscription_message_value['value'])
             logging.info('TickDataStore:update_datastore. items: %s ' % items)
             id_contracts = map(lambda x: (x[0], ContractHelper.kvstring2contract(utf2asc(x[1]))), items['subscriptions'])
-               
+            map(set_datastore_values, id_contracts)
             self.dump()
         except TypeError:
             logging.error('TickDataStore:gw_subscriptions. Exception when trying to get id:contracts.')

+ 2 - 1
src/sh/start_twsgw.sh

@@ -9,4 +9,5 @@ else
 	FINOPT_HOME=~/l1304/workspace/finopt-ironfly/finopt/src
 fi
 export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
-python $FINOPT_HOME/comms/ibgw/tws_gateway.py -c -f $FINOPT_HOME/config/tws_gateway.cfg 
+#python $FINOPT_HOME/comms/ibgw/tws_gateway.py -r -c -f $FINOPT_HOME/config/tws_gateway.cfg 
+python $FINOPT_HOME/comms/ibgw/tws_gateway.py  -f $FINOPT_HOME/config/tws_gateway.cfg