Explorar o código

integrate analytics_engine code with tick_datastore and option_chain

esurfer %!s(int64=9) %!d(string=hai) anos
pai
achega
3879c95808

+ 6 - 8
src/comms/ibc/base_client_messaging.py

@@ -56,6 +56,10 @@ class GatewayCommandWrapper():
 #     def reqMktData(self, contract):
 #     def reqMktData(self, contract):
 #         self.producer.send_message('reqMktData', ContractHelper.object2kvstring(contract))
 #         self.producer.send_message('reqMktData', ContractHelper.object2kvstring(contract))
     def reqMktData(self, contract, snapshot=False):
     def reqMktData(self, contract, snapshot=False):
+        # send request to TWS gateway for market data
+        # contract - typeof ib.ext.contract
+        # snapshot - True/False default is false to receive continuous updates
+        #
         self.producer.send_message('reqMktData', json.dumps({'contract': ContractHelper.object2kvstring(contract), 
         self.producer.send_message('reqMktData', json.dumps({'contract': ContractHelper.object2kvstring(contract), 
                                                   'snapshot': snapshot}))        
                                                   'snapshot': snapshot}))        
         
         
@@ -236,13 +240,7 @@ class AbstractGatewayListener(BaseMessageListener):
         """ generated source for method accountSummaryEnd """
         """ generated source for method accountSummaryEnd """
         raise NotImplementedException
         raise NotImplementedException
 
 
-    def gw_subscription_changed(self, event, message_value):  # event, items):
-        raise NotImplementedException        
-#         logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, message_value))
-        
-    def gw_subscriptions(self, event, message_value):
-        raise NotImplementedException        
-      
+     
     def error(self, id=None, errorCode=None, errorMsg=None):
     def error(self, id=None, errorCode=None, errorMsg=None):
         raise NotImplementedException
         raise NotImplementedException
     
     
@@ -250,4 +248,4 @@ class AbstractGatewayListener(BaseMessageListener):
         logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, message_value))
         logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, message_value))
         print "on_kb_reached_last_offset [%s] %s" % (self.name, message_value)
         print "on_kb_reached_last_offset [%s] %s" % (self.name, message_value)
     
     
-        
+

+ 2 - 7
src/comms/ibc/gw_ex_request_exit.py

@@ -33,13 +33,7 @@ class MessageListener(AbstractGatewayListener):
         logging.info('MessageListener:%s. val->[%s]' % (event, vars()))  
         logging.info('MessageListener:%s. val->[%s]' % (event, vars()))  
 
 
 
 
-    def gw_subscriptions(self, event, message_value):
-        logging.info('MessageListener:%s. val->[%s]' % (event, message_value))
-        
-
-    def gw_subscription_changed(self, event, message_value):
-        logging.info('MessageListener:%s. val->[%s]' % (event, message_value))
-        
+      
 
 
     def tickPrice(self, event, contract_key, field, price, canAutoExecute):
     def tickPrice(self, event, contract_key, field, price, canAutoExecute):
         #logging.info('MessageListener:%s. %s %d %8.2f' % (event, contract_key, field, price))
         #logging.info('MessageListener:%s. %s %d %8.2f' % (event, contract_key, field, price))
@@ -64,6 +58,7 @@ def test_client(kwargs):
                       ('USD', 'CASH', 'IDEALPRO', 'JPY', '', 0, ''),
                       ('USD', 'CASH', 'IDEALPRO', 'JPY', '', 0, ''),
                       ('AUD', 'CASH', 'IDEALPRO', 'USD', '', 0, ''),
                       ('AUD', 'CASH', 'IDEALPRO', 'USD', '', 0, ''),
                       ('QQQ', 'STK', 'SMART', 'USD', '', 0, ''),
                       ('QQQ', 'STK', 'SMART', 'USD', '', 0, ''),
+                      ('YM', 'IND', 'ECBOT', 'USD', '', 0, ''),
                       ]
                       ]
                           
                           
                               
                               

+ 4 - 6
src/misc2/observer.py

@@ -37,14 +37,12 @@ class Publisher:
         del self.get_subscribers(event)[who]
         del self.get_subscribers(event)[who]
     
     
     def dispatch(self, event, params=None):
     def dispatch(self, event, params=None):
-        
+        #print 'observer:: subscriber**** %s' % params
         for subscriber, callback in self.get_subscribers(event).items():
         for subscriber, callback in self.get_subscribers(event).items():
+            
             callback(event, **params)
             callback(event, **params)
-            #print 'observer:: subscriber**** %s' % subscriber
-#             try:
-#                 callback(event, **params)
-#             except TypeError:
-#                 logging.error (sys.exc_info()[0])
+                
+
             
             
 #############################################################
 #############################################################
 # Test classes to demo usage of Publisher and Subscriber
 # Test classes to demo usage of Publisher and Subscriber

+ 44 - 44
src/rethink/analytics_engine.py

@@ -5,15 +5,16 @@ from optparse import OptionParser
 from time import sleep
 from time import sleep
 from misc2.observer import Subscriber
 from misc2.observer import Subscriber
 from misc2.helpers import ContractHelper
 from misc2.helpers import ContractHelper
-from finopt.options_chain import OptionsChain
+from finopt.instrument import Symbol
+from rethink.option_chain import OptionsChain
 from rethink.tick_datastore import TickDataStore
 from rethink.tick_datastore import TickDataStore
 from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
 from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
+from comms.ibgw.base_messaging import BaseMessageListener
 
 
 
 
 
 
 
 
-
-class AnalyticsEngine(Subscriber, AbstractGatewayListener):
+class AnalyticsEngine(AbstractGatewayListener):
 
 
     AE_OPTIONS_CONFIG = {
     AE_OPTIONS_CONFIG = {
         'underlying_substitution': {'IND': 'FUT'},
         'underlying_substitution': {'IND': 'FUT'},
@@ -39,9 +40,9 @@ class AnalyticsEngine(Subscriber, AbstractGatewayListener):
         expiry = '20170330'
         expiry = '20170330'
         contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '', 0, expiry)
         contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '', 0, expiry)
         contract = ContractHelper.makeContract(contractTuple)  
         contract = ContractHelper.makeContract(contractTuple)  
- 
+        
         oc2.set_option_structure(contract, 200, 50, 0.0012, 0.0328, expiry)        
         oc2.set_option_structure(contract, 200, 50, 0.0012, 0.0328, expiry)        
-     
+        
         oc2.build_chain(24119, 0.03, 0.22)
         oc2.build_chain(24119, 0.03, 0.22)
         
         
 #         expiry='20170324'
 #         expiry='20170324'
@@ -58,12 +59,13 @@ class AnalyticsEngine(Subscriber, AbstractGatewayListener):
         for o in oc2.get_option_chain():
         for o in oc2.get_option_chain():
             self.tds.add_symbol(o)
             self.tds.add_symbol(o)
         self.tds.add_symbol(oc2.get_underlying())
         self.tds.add_symbol(oc2.get_underlying())
-    
+        
     
     
     def start_engine(self):
     def start_engine(self):
         self.twsc.start_manager()
         self.twsc.start_manager()
-        self.request_subscrptions()
         oc2 = OptionsChain('oc2')
         oc2 = OptionsChain('oc2')
+        oc2.register_listener(self)
+        
         self.test_oc(oc2)
         self.test_oc(oc2)
         
         
         try:
         try:
@@ -71,8 +73,9 @@ class AnalyticsEngine(Subscriber, AbstractGatewayListener):
             while True: 
             while True: 
             
             
 
 
+                read_ch = raw_input("Enter command:")
                 oc2.pretty_print()
                 oc2.pretty_print()
-                sleep(10.0)
+                sleep(0.45)
             
             
         except (KeyboardInterrupt, SystemExit):
         except (KeyboardInterrupt, SystemExit):
             logging.error('AnalyticsEngine: caught user interrupt. Shutting down...')
             logging.error('AnalyticsEngine: caught user interrupt. Shutting down...')
@@ -81,30 +84,41 @@ class AnalyticsEngine(Subscriber, AbstractGatewayListener):
             logging.info('AnalyticsEngine: Service shut down complete...')               
             logging.info('AnalyticsEngine: Service shut down complete...')               
     
     
     
     
-    def request_subscrptions(self):
-        self.initial_run = True
-        self.twsc.gw_req_subscriptions(self.kwargs['name'])
-        while self.initial_run:
-            sleep(0.5)
-
+    #         EVENT_OPTION_UPDATED = 'oc_option_updated'
+    #         EVENT_UNDERLYING_ADDED = 'oc_underlying_added
+    def oc_option_updated(self, event, update_mode, name, instrument):        
+        logging.info('oc_option_updated. %s %s' % (event, vars()))
+        self.tds.add_symbol(instrument)
+        self.twsc.reqMktData(instrument.get_contract(), True)
+        
+    
+    def oc_underlying_added(self, event, update_mode, name, instrument):
+        
+        logging.info('oc_underlying_added. %s %s' % (event, vars()))
+        self.tds.add_symbol(instrument)
+        self.twsc.reqMktData(instrument.get_contract(), True)
 
 
     #
     #
     # tds call backs
     # tds call backs
     #
     #
-    def tds_event_new_symbol_added(self, event, symbol):
-       
+    #     
+    #         EVENT_TICK_UPDATED = 'tds_event_tick_updated'
+    #         EVENT_SYMBOL_ADDED = 'tds_event_symbol_added'
+    #         EVENT_SYMBOL_DELETED = 'tds_event_symbol_deleted'    
+    
+    def tds_event_symbol_added(self, event, update_mode, name, instrument):
+       pass
         #logging.info('tds_event_new_symbol_added. %s' % ContractHelper.object2kvstring(symbol.get_contract()))
         #logging.info('tds_event_new_symbol_added. %s' % ContractHelper.object2kvstring(symbol.get_contract()))
-        self.twsc.reqMktData(symbol.get_contract())
+        
     
     
-    def tds_event_tick_updated(self, event, items):
+    def tds_event_tick_updated(self, event, contract_key, field, price, canAutoExecute):
         #tds_event_tick_updated:
         #tds_event_tick_updated:
         # dict object: {'partition': 0, 'value': '{"field": 7, "price": 35.0, "canAutoExecute": 0, "tickerId": 10}', 'offset': 527}
         # dict object: {'partition': 0, 'value': '{"field": 7, "price": 35.0, "canAutoExecute": 0, "tickerId": 10}', 'offset': 527}
         #logging.info('tds_event_tick_updated. %s' % items)
         #logging.info('tds_event_tick_updated. %s' % items)
         pass
         pass
-        # this is a callback after tick price is updated in tds
-        # do not call update again as it will go into an endless loop
-        # function probably to take out later....
-    
+
+    def tds_event_symbol_deleted(self, event, update_mode, name, instrument):
+        pass
     #
     #
     # external ae requests
     # external ae requests
     #
     #
@@ -119,28 +133,14 @@ class AnalyticsEngine(Subscriber, AbstractGatewayListener):
     #
     #
     # gateway events
     # gateway events
     #
     #
-    def gw_subscription_changed(self, event, message_value):
-        #pass
-        logging.info('AnalyticsEngine:%s. val->[%s]' % (event, message_value))
-        self.tds.update_datastore(message_value)
-             
-    def gw_subscriptions(self, event, message_value):
-        logging.info('AnalyticsEngine:%s. Received event.' % (event))
 
 
-        if self.initial_run:
-            self.tds.update_datastore(message_value)
-            self.initial_run = False
-
-    #            
-    # tws events     
-    #
-    def tickPrice(self, event, message_value):   
-        #
-        # dict: {'partition': 0, 'value': '{"field": 2, "price": 0.65, "canAutoExecute": 1, "tickerId": 1}', 'offset': 2151}
-        #
-        self.tds.set_symbol_price(json.loads(message_value['value']))
-        #pass
+    def tickPrice(self, event, contract_key, field, price, canAutoExecute):
+        logging.info('MessageListener:%s. %s %d %8.2f' % (event, contract_key, field, price))
+        self.tds.set_symbol_tick_price(contract_key, field, price, canAutoExecute)
 
 
+    def tickSize(self, event, contract_key, field, size):
+        self.tds.set_symbol_tick_price(contract_key, field, size, 0)
+        #logging.info('MessageListener:%s. %s: %d %8.2f' % (event, contract_key, field, size))
  
  
     def error(self, event, message_value):
     def error(self, event, message_value):
         logging.info('AnalyticsEngine:%s. val->[%s]' % (event, message_value))         
         logging.info('AnalyticsEngine:%s. val->[%s]' % (event, message_value))         
@@ -162,9 +162,9 @@ if __name__ == '__main__':
       'tws_app_id': 38868,
       'tws_app_id': 38868,
       'group_id': 'AE',
       'group_id': 'AE',
       'session_timeout_ms': 10000,
       'session_timeout_ms': 10000,
-      'clear_offsets':  True,
+      'clear_offsets':  False,
       'logconfig': {'level': logging.INFO, 'filemode': 'w', 'filename': '/tmp/ae.log'},
       'logconfig': {'level': logging.INFO, 'filemode': 'w', 'filename': '/tmp/ae.log'},
-      'topics': ['tickPrice', 'gw_subscriptions', 'gw_subscription_changed', 'ae_req_tds_internal'],
+      'topics': ['tickPrice'],
       'seek_to_end': ['*'],
       'seek_to_end': ['*'],
       #'seek_to_end':['tickSize', 'tickPrice','gw_subscriptions', 'gw_subscription_changed']
       #'seek_to_end':['tickSize', 'tickPrice','gw_subscriptions', 'gw_subscription_changed']
       }
       }

+ 46 - 11
src/rethink/option_chain.py

@@ -6,7 +6,8 @@ from misc2.helpers import ContractHelper, dict2str
 from finopt.instrument import Symbol, Option
 from finopt.instrument import Symbol, Option
 from comms.ibc.base_client_messaging import AbstractGatewayListener
 from comms.ibc.base_client_messaging import AbstractGatewayListener
 from misc2.observer import Publisher, Subscriber 
 from misc2.observer import Publisher, Subscriber 
-#from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
+from misc2.observer import NotImplementedException
+
 
 
 from time import sleep
 from time import sleep
 import finopt.optcal
 import finopt.optcal
@@ -21,24 +22,48 @@ class OptionsChain(Publisher):
     # options is a list containing Option object
     # options is a list containing Option object
     options = None
     options = None
     
     
-    id = None
+    name = None
     div = 0.0
     div = 0.0
     rate = 0.0
     rate = 0.0
     expiry = None
     expiry = None
     trade_vol = None
     trade_vol = None
     #iv = optcal.cal_implvol(spot, contract.m_strike, contract.m_right, today, contract.m_expiry, rate, div, vol, premium)
     #iv = optcal.cal_implvol(spot, contract.m_strike, contract.m_right, today, contract.m_expiry, rate, div, vol, premium)
     
     
-    option_chain_events = ('on_option_added', 'on_option_deleted', 'on_option_updated')
     
     
-    def __init__(self, id):
-        self.id = id
+    
+    '''
+        EVENT_OPTION_UPDATED 
+        
+        param = {'update_mode': A|D|U <- add/udpate/delete,
+                 'name': name_of_this_oc,
+                 'instrument: the option associated with this event 
+                }
+                
+        EVENT_UNDERLYING_ADDED
+        param = {'update_mode':
+                 'name':
+                 'instrument': 
+                
+    '''
+    EVENT_OPTION_UPDATED = 'oc_option_updated'
+    EVENT_UNDERLYING_ADDED = 'oc_underlying_added'
+    OC_EVENTS = [EVENT_OPTION_UPDATED, EVENT_UNDERLYING_ADDED]     
+    
+    def __init__(self, name):
+        self.name = name
         self.options = []
         self.options = []
-        Publisher.__init__(self, OptionsChain.option_chain_events)
+        Publisher.__init__(self, OptionsChain.OC_EVENTS)
         
         
 
 
+    def register_listener(self, listener):
+        try:
+            map(lambda e: self.register(e, listener, getattr(listener, e)), OptionsChain.OC_EVENTS)
+        except AttributeError as e:
+            logging.error("OptionsChain:add_listener_topics. Function not implemented in the listener. %s" % e)
+            raise NotImplementedException        
     
     
-    def get_id(self):
-        return self.id
+    def get_name(self):
+        return self.name
     
     
     def get_underlying(self):
     def get_underlying(self):
         return self.underlying
         return self.underlying
@@ -86,7 +111,10 @@ class OptionsChain(Publisher):
         #     notify listener(s) the option's underlying
         #     notify listener(s) the option's underlying
         #     allowing the listeners to store the reference to OptionsChain underlying 
         #     allowing the listeners to store the reference to OptionsChain underlying 
         #
         #
-        self.dispatch(OptionsChain.option_chain_events[0], self.get_underlying())
+        self.dispatch(OptionsChain.EVENT_UNDERLYING_ADDED, {'update_mode': 'A', 
+                                                            'name': self.name,
+                                                            'instrument' : self.get_underlying()}
+                      )
         #
         #
         #
         #
         #
         #
@@ -157,7 +185,10 @@ class OptionsChain(Publisher):
         #
         #
         # 
         # 
         self.options.append(option)
         self.options.append(option)
-        self.dispatch(OptionsChain.option_chain_events[0], option)
+        self.dispatch(OptionsChain.EVENT_OPTION_UPDATED, {'update_mode': 'A', 
+                                                            'name': self.name,
+                                                            'instrument' : option}
+                      )
     
     
     
     
     def pretty_print(self):
     def pretty_print(self):
@@ -179,6 +210,10 @@ class OptionsChain(Publisher):
         fmt_spec = '%8.2f'
         fmt_spec = '%8.2f'
         fmt_spec2 = '%8.4f'
         fmt_spec2 = '%8.4f'
         fmt_specq = '%8d'
         fmt_specq = '%8d'
+        
+        # last is 4
+        # close is 9
+        #fmt_call = map(lambda x: (x[0], '%s,%s,%s,%s,%s,%s,%s,%s' % (format_tick_val(x[1].get_tick_value(9), fmt_spec),
         fmt_call = map(lambda x: (x[0], '%s,%s,%s,%s,%s,%s,%s,%s' % (format_tick_val(x[1].get_tick_value(4), fmt_spec),
         fmt_call = map(lambda x: (x[0], '%s,%s,%s,%s,%s,%s,%s,%s' % (format_tick_val(x[1].get_tick_value(4), fmt_spec),
                                                format_tick_val(x[1].get_tick_value(0), fmt_specq),
                                                format_tick_val(x[1].get_tick_value(0), fmt_specq),
                                                format_tick_val(x[1].get_tick_value(1), fmt_spec),
                                                format_tick_val(x[1].get_tick_value(1), fmt_spec),
@@ -203,7 +238,7 @@ class OptionsChain(Publisher):
                                   format_tick_val(self.get_underlying().get_tick_value(0), fmt_specq),
                                   format_tick_val(self.get_underlying().get_tick_value(0), fmt_specq),
                                            format_tick_val(self.get_underlying().get_tick_value(1), fmt_spec),
                                            format_tick_val(self.get_underlying().get_tick_value(1), fmt_spec),
                                            format_tick_val(self.get_underlying().get_tick_value(2), fmt_spec),
                                            format_tick_val(self.get_underlying().get_tick_value(2), fmt_spec),
-                                           format_tick_val(self.get_underlying().get_tick_value(3), fmt_spec)
+                                           format_tick_val(self.get_underlying().get_tick_value(3), fmt_specq)
                                 )
                                 )
         
         
         #title = '%s%30s%s%s' % ('-' * 40, ContractHelper.makeRedisKeyEx(self.get_underlying().get_contract()).center(50, ' '), undlypx, '-' * 40) 
         #title = '%s%30s%s%s' % ('-' * 40, ContractHelper.makeRedisKeyEx(self.get_underlying().get_contract()).center(50, ' '), undlypx, '-' * 40) 

+ 35 - 86
src/rethink/tick_datastore.py

@@ -5,46 +5,31 @@ from misc2.observer import Publisher
 from misc2.observer import NotImplementedException
 from misc2.observer import NotImplementedException
 from misc2.helpers import ContractHelper
 from misc2.helpers import ContractHelper
 from comms.ibc.base_client_messaging import AbstractGatewayListener
 from comms.ibc.base_client_messaging import AbstractGatewayListener
-from numpy import disp
 import symbol
 import symbol
 
 
 class TickDataStore(Publisher):
 class TickDataStore(Publisher):
     """
     """
     
     
     Data structure:
     Data structure:
-        tickers map contains key value pairs of ticker id mapped to Symbol primary key
-        tickers => {id1: key1, id2:key2...}
-        
-        example: tickers = {9: 'QQQ-20170217-127.00-C-OPT-USD-SMART-102'
-                            43: 'QQQ-20170217-124.00-C-OPT-USD-SMART-102' ...}
-                            
-        symbols map contains key value pairs of Symbol primary key mapped to a dict object.
-        The dict object contains the ticker id and a list of Symbol objects associated with ticker_id
-        symbols => {key1: 
-                        { 'ticker_id': id1, 
-                          'syms' : [<object ref to Symbol1>,<object ref to Symbol2>...]
-                        }
-                    key2:
-                        ...
-                   }
-        
-        example: symbols = {'QQQ-20170217-127.00-C-OPT-USD-SMART-102':
-                                {'ticker_id': 9, 
-                                 'syms': [<object ref to Symbol QQQ>, ...]
-                                }
-                            }
-                            
-        Usage:
-        Given a ticker_id, the Symbol key can be looked up from tickers
-        With the Symbol key obtained, the reference to the actual object associated with the ticker_id can be retrieved
-        by looking up from symbols[key]['syms']
-        
-        speed: 2 x O(1) + n
+
     
     
     
     
     """
     """
     
     
-
+    '''
+        EVENT_TICK_UPDATED 
+        
+        param = {'update_mode': A|D|U <- add/udpate/delete,
+                 'name': name_of_this_oc,
+                 'instrument: the option associated with this event 
+                }
+                
+        EVENT_UNDERLYING_ADDED
+        param = {'update_mode':
+                 'name':
+                 'instrument': 
+                
+    '''
     EVENT_TICK_UPDATED = 'tds_event_tick_updated'
     EVENT_TICK_UPDATED = 'tds_event_tick_updated'
     EVENT_SYMBOL_ADDED = 'tds_event_symbol_added'
     EVENT_SYMBOL_ADDED = 'tds_event_symbol_added'
     EVENT_SYMBOL_DELETED = 'tds_event_symbol_deleted'
     EVENT_SYMBOL_DELETED = 'tds_event_symbol_deleted'
@@ -54,15 +39,21 @@ class TickDataStore(Publisher):
     def __init__(self, name):
     def __init__(self, name):
         
         
         self.symbols = {}
         self.symbols = {}
-        
+        self.name = name
         
         
         self.lock = RLock()
         self.lock = RLock()
         Publisher.__init__(self, TickDataStore.TDS_EVENTS)
         Publisher.__init__(self, TickDataStore.TDS_EVENTS)
         self.first_run = True
         self.first_run = True
         
         
         
         
-    def register_listener(self, l):
-        map(lambda e: self.register(e, l, getattr(l, e)), TickDataStore.TDS_EVENTS)
+    def register_listener(self, listener):
+        
+        try:
+            map(lambda e: self.register(e, listener, getattr(listener, e)), TickDataStore.TDS_EVENTS)
+        except AttributeError as e:
+            logging.error("TickDataStore:register_listener. Function not implemented in the listener. %s" % e)
+            raise NotImplementedException        
+        
 
 
     def dump(self):
     def dump(self):
     
     
@@ -87,18 +78,18 @@ class TickDataStore(Publisher):
                 return ''
                 return ''
 
 
         
         
-        fmt_sym = map(lambda x: (x[0], '%s,%s,%s,%s,%s' % (
+        fmt_sym = map(lambda x: (x[0], '%s,%s,%s,%s,%s,%s' % (
                                             format_tick_val(get_field(x[1]['syms'],4), fmt_spec),
                                             format_tick_val(get_field(x[1]['syms'],4), fmt_spec),
                                             format_tick_val(get_field(x[1]['syms'],0), fmt_specq),                                                                                                                  
                                             format_tick_val(get_field(x[1]['syms'],0), fmt_specq),                                                                                                                  
                                             format_tick_val(get_field(x[1]['syms'],1), fmt_spec),
                                             format_tick_val(get_field(x[1]['syms'],1), fmt_spec),
                                             format_tick_val(get_field(x[1]['syms'],2), fmt_spec), 
                                             format_tick_val(get_field(x[1]['syms'],2), fmt_spec), 
                                             format_tick_val(get_field(x[1]['syms'],3), fmt_specq),
                                             format_tick_val(get_field(x[1]['syms'],3), fmt_specq),
-                                            
+                                            format_tick_val(get_field(x[1]['syms'],9), fmt_spec),
                                             )), [(k,v) for k, v in self.symbols.iteritems()])        
                                             )), [(k,v) for k, v in self.symbols.iteritems()])        
         
         
 
 
         for e in fmt_sym:
         for e in fmt_sym:
-            print('[%s]%s' % (e[0].ljust(50), e[1]))
+            print('[%s]%s' % (e[0].ljust(40), e[1]))
 
 
     def is_symbol_in_list(self, symbol, list):
     def is_symbol_in_list(self, symbol, list):
     
     
@@ -128,7 +119,9 @@ class TickDataStore(Publisher):
         finally:            
         finally:            
             self.lock.release()        
             self.lock.release()        
             if dispatch:
             if dispatch:
-                self.dispatch(TickDataStore.EVENT_SYMBOL_ADDED, symbol)
+                self.dispatch(TickDataStore.EVENT_SYMBOL_ADDED, {'update_mode': 'A', 
+                                                            'name': self.name,
+                                                            'instrument' : symbol})
             
             
             
             
     def del_symbol(self, symbol):
     def del_symbol(self, symbol):
@@ -150,7 +143,9 @@ class TickDataStore(Publisher):
         finally:            
         finally:            
             self.lock.release()   
             self.lock.release()   
             if dispatch:
             if dispatch:
-               self.dispatch(TickDataStore.EVENT_SYMBOL_DELETED, symbol)                 
+               self.dispatch(TickDataStore.EVENT_SYMBOL_DELETED,  {'update_mode': 'D', 
+                                                            'name': self.name,
+                                                            'instrument' : symbol})                 
                                     
                                     
     
     
        
        
@@ -174,59 +169,13 @@ class TickDataStore(Publisher):
             pass
             pass
         finally:
         finally:
             self.lock.release()
             self.lock.release()
-            self.dispatch(TickDataStore.EVENT_TICK_UPDATED, vars())
+            self.dispatch(TickDataStore.EVENT_TICK_UPDATED, {'contract_key': contract_key, 'field': field, 
+                                                             'price': price, 'canAutoExecute': canAutoExecute})
             
             
 
 
 
 
-    def update_datastore(self, subscription_message_value):
-        '''
-        sample value:
-        {
-        'partition': 0, 'value': '{"target_id": "analytics_engine", "sender_id": "tws_gateway_server", 
-        "subscriptions": [[0, "{\\"m_conId\\": 0, \\"m_right\\": \\"\\", \\"m_symbol\\": \\"HSI\\", \\"m_secType\\": \\"FUT\\", 
-        \\"m_includeExpired\\": false, \\"m_expiry\\": \\"20170330\\", \\"m_currency\\": \\"HKD\\", \\"m_exchange\\": \\"HKFE\\", \\"m_strike\\": 0}"]]}', 
-        'offset': 13
-        }
-        '''
-        def set_datastore_values(idc):
-            
-            
-            key = ContractHelper.makeRedisKeyEx(idc[1])
-            if key in self.symbols and idc[0] <> self.symbols[key]['ticker_id']:
-                # if this condition is met, one should delete the old entry
-                # and move all object references to the new key/ticker_id
-                if self.symbols[key]['ticker_id'] <> -1:
-                    raise
-            
-            self.tickers[idc[0]] = key
-            try:
-                self.symbols[key]['ticker_id'] = idc[0]
-                
-            except KeyError:
-                self.symbols[key] = {'ticker_id': idc[0], 'syms': []}
-            
-            self.dump()
-            return key
-                
 
 
         
         
-        try:
-            def utf2asc(x):
-                return x if isinstance(x, unicode) else x
-        
-            self.lock.acquire()
-            
-            items = json.loads(subscription_message_value['value'])
-            logging.info('TickDataStore:update_datastore. items: %s ' % items)
-            id_contracts = map(lambda x: (x[0], ContractHelper.kvstring2contract(utf2asc(x[1]))), items['subscriptions'])
-            map(set_datastore_values, id_contracts)
-            self.dump()
-        except TypeError:
-            logging.error('TickDataStore:gw_subscriptions. Exception when trying to get id:contracts.')
-            return None       
-        finally:
-            self.lock.release()     
-        
             
             
 
 
 
 

+ 1 - 0
src/sh/gw_ex_request_exit.sh

@@ -10,3 +10,4 @@ else
 fi
 fi
 export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
 export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
 python $FINOPT_HOME/comms/ibc/gw_ex_request_exit.py -g AAA -n dumpty 
 python $FINOPT_HOME/comms/ibc/gw_ex_request_exit.py -g AAA -n dumpty 
+#python $FINOPT_HOME/comms/ibc/gw_ex_request_exit.py -c -g AAA -n dumpty 

+ 12 - 0
src/sh/start_twsgw.sh

@@ -9,6 +9,18 @@ else
 	FINOPT_HOME=~/l1304/workspace/finopt-ironfly/finopt/src
 	FINOPT_HOME=~/l1304/workspace/finopt-ironfly/finopt/src
 fi
 fi
 export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
 export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
+#  
+# clear all topic offsets and erased saved subscriptions
 #python $FINOPT_HOME/comms/ibgw/tws_gateway.py -r -c -f $FINOPT_HOME/config/tws_gateway.cfg 
 #python $FINOPT_HOME/comms/ibgw/tws_gateway.py -r -c -f $FINOPT_HOME/config/tws_gateway.cfg 
+
+
+#
+# clear offsets in redis / reload saved subscription entries
+#python $FINOPT_HOME/comms/ibgw/tws_gateway.py  -c -f $FINOPT_HOME/config/tws_gateway.cfg 
+
+
+# restart gateway keep the redis offsets but erase the subscription entries
 #python $FINOPT_HOME/comms/ibgw/tws_gateway.py  -r -f $FINOPT_HOME/config/tws_gateway.cfg 
 #python $FINOPT_HOME/comms/ibgw/tws_gateway.py  -r -f $FINOPT_HOME/config/tws_gateway.cfg 
+
+# normal restart - keep the offsets and reload from saved subscription entries
 python $FINOPT_HOME/comms/ibgw/tws_gateway.py   -f $FINOPT_HOME/config/tws_gateway.cfg 
 python $FINOPT_HOME/comms/ibgw/tws_gateway.py   -f $FINOPT_HOME/config/tws_gateway.cfg