bobhk пре 9 година
родитељ
комит
1e78e68355
3 измењених фајлова са 451 додато и 17 уклоњено
  1. 47 17
      src/rethink/analytics_engine.py
  2. 220 0
      src/rethink/option_chain.py
  3. 184 0
      src/rethink/tick_datastore.py

+ 47 - 17
src/rethink/analytics_engine.py

@@ -1,22 +1,59 @@
 import logging
 import json
-from comms.ibc.base_client_messaging import AbstractGatewayListener
+from time import sleep
+from rethink.tick_datastore import TickDataStore
+from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
 
 
 
 
 
 class AnalyticsEngine(AbstractGatewayListener):
+
+    AE_OPTIONS_CONFIG = {
+        'underlying_substitution': {'IND': 'FUT'},
+        'underlying_sub_list': ['HSI', 'MHI']
+    }
+    
     
     
     def __init__(self, name, kwargs):
-        pass
+        self.twsc = TWS_client_manager(kwargs)
+        AbstractGatewayListener.__init__(self, name)
     
+        self.tds = TickDataStore(name, self.twsc)
+        self.twsc.add_listener_topics(self, kwargs['topics'])
+        self.twsc.add_listener_topics(self.tds, kwargs['tds_topics'])
+ 
+        self.option_chains = {}
+        
+    
+    def start_engine(self):
+        self.twsc.start_manager()
+        self.twsc.gw_req_subscriptions()
+        self.initial_run = True
+        while self.initial_run:
+            sleep(0.5)
+
+
+        try:
+            logging.info('AnalyticsEngine:main_loop ***** accepting console input...')
+            while True: 
+            
+                sleep(.45)
+            
+        except (KeyboardInterrupt, SystemExit):
+            logging.error('AnalyticsEngine: caught user interrupt. Shutting down...')
+            self.twsc.gw_message_handler.set_stop()
+            
+            logging.info('AnalyticsEngine: Service shut down complete...')               
     
-    def ae_req_greeks(self):
-        pass
     
     
+    def ae_req_greeks(self, event, message_value):
+        #(int tickerId, int field, double impliedVol, double delta, double optPrice, double pvDividend, double gamma, double vega, double theta, double undPrice) 
+        pass
+    
     def gw_subscription_changed(self, event, message_value):
         logging.info('MessageListener:%s. val->[%s]' % (event, message_value))
  
@@ -24,21 +61,14 @@ class AnalyticsEngine(AbstractGatewayListener):
             
     def gw_subscriptions(self, event, message_value):
         logging.info('MessageListener:%s. val->[%s]' % (event, message_value))
-        items = json.loads(message_value)
-        
+
+        if self.initial_run:
+            self.tds.update_datastore(message_value)
+            self.pending_gw_reply = False
+            
         
     def tickPrice(self, event, message_value):   
-        
-        #'value': '{"tickerId": 0, "size": 3, "field": 3}'
-        items = json.loads(message_value)
-        tid = items['tickerId']
-
-        try:
-            contract_key = self.tickers[tid]
-            #print contract_key
-            map(lambda e: e.set_tick_value(items['field'], items['price']), self.symbols[contract_key]['syms'])
-        except KeyError:
-            pass
+        self.tds.update_symbol_price(event, message_value)
 
  
     def error(self, event, message_value):

+ 220 - 0
src/rethink/option_chain.py

@@ -0,0 +1,220 @@
+# -*- coding: utf-8 -*-
+import json
+import logging
+from ib.ext.Contract import Contract
+from misc2.helpers import ContractHelper, dict2str
+from finopt.instrument import Symbol, Option
+from comms.ibc.base_client_messaging import AbstractGatewayListener
+from misc2.observer import Publisher, Subscriber 
+#from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
+
+from time import sleep
+import finopt.optcal
+
+
+class OptionsChain(Publisher):
+    underlying = None
+    spd_size = None
+    multiplier = None
+    
+    #
+    # options is a list containing Option object
+    options = None
+    
+    id = None
+    div = 0.0
+    rate = 0.0
+    expiry = None
+    trade_vol = None
+    #iv = optcal.cal_implvol(spot, contract.m_strike, contract.m_right, today, contract.m_expiry, rate, div, vol, premium)
+    
+    option_chain_events = ('on_option_added', 'on_option_deleted', 'on_option_updated')
+    
+    def __init__(self, id):
+        self.id = id
+        self.options = []
+        Publisher.__init__(self, OptionsChain.option_chain_events)
+        
+
+    
+    def get_id(self):
+        return self.id
+    
+    def get_underlying(self):
+        return self.underlying
+    
+    def set_underlying(self, contract):
+        #self.underlying = contract
+        self.underlying = Symbol(contract)
+
+        
+        
+    def set_spread_table(self, spd_size, multiplier):
+        self.spd_size = spd_size
+        self.multiplier = multiplier
+    
+    def set_div(self, div):
+        self.div = div
+    
+    def set_rate(self, rate):
+        self.rate = rate
+        
+    def set_expiry(self, expiry):
+        self.expiry = expiry
+    
+    def set_trade_vol(self, tvol):
+        self.trade_vol = tvol
+        
+    def set_option_structure(self, underlying, spd_size, multiplier, rate, div, expiry):
+        self.set_div(div)
+        self.set_rate(rate)
+        self.set_spread_table(spd_size, multiplier)
+        self.set_underlying(underlying)
+        self.set_expiry(expiry)
+      
+    def build_chain(self, px, bound, trade_vol):
+        self.set_trade_vol(trade_vol)
+        undlypx = round(px  / self.spd_size) * self.spd_size
+        upper_limit = undlypx * (1 + bound)
+        lower_limit = undlypx * (1 - bound)          
+        
+        
+        
+        base_opt_contract = json.loads(ContractHelper.object2kvstring(self.get_underlying().get_contract()))
+        
+        #
+        #     notify listener(s) the option's underlying
+        #     allowing the listeners to store the reference to OptionsChain underlying 
+        #
+        self.dispatch(OptionsChain.option_chain_events[0], self.get_underlying())
+        #
+        #
+        #
+        
+        
+        #for i in self.xfrange(int(undlypx), int(upper_limit ), self.spd_size):
+        for i in self.xfrange(undlypx, upper_limit, self.spd_size):
+
+            base_opt_contract['m_secType'] = 'OPT'
+            base_opt_contract['m_strike'] = i
+            base_opt_contract['m_expiry'] = self.expiry
+            base_opt_contract['m_right'] = 'C'
+            base_opt_contract['m_multiplier'] = self.multiplier
+            
+            #self.options.append(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
+            self.add_option(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
+            
+            base_opt_contract['m_right'] = 'P'
+            #self.options.append(ContractHelper.kv2object(base_opt_contract, Contract))
+            #self.options.append(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
+            self.add_option(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
+ 
+        
+        for i in self.xfrange(undlypx - self.spd_size, lower_limit, -self.spd_size):      
+            #print i, lower_limit
+            base_opt_contract['m_secType'] = 'OPT'
+            base_opt_contract['m_strike'] = i
+            base_opt_contract['m_expiry'] = self.expiry
+            base_opt_contract['m_right'] = 'C'
+            base_opt_contract['m_multiplier'] = self.multiplier
+            #self.options.append(ContractHelper.kv2object(base_opt_contract, Contract))
+            #self.options.append(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
+            self.add_option(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
+             
+            base_opt_contract['m_right'] = 'P'
+            #self.options.append(ContractHelper.kv2object(base_opt_contract, Contract))
+            #self.options.append(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
+            self.add_option(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
+        
+        
+
+        
+
+    def xfrange(self, start, stop=None, step=None):
+        if stop is None:
+            stop = float(start)
+            start = 0.0
+        
+        if step is None:
+            step = 1.0
+        
+        cur = float(start)
+        if start <= stop:
+            while cur < stop:
+                yield cur
+                cur += step
+        else:
+            while cur > stop:
+                yield cur
+                cur += step
+            
+    def get_option_chain(self):
+        return self.options
+
+        
+    def add_option(self, option):
+        #events = ('on_option_added', 'on_option_deleted', 'on_option_updated')
+        #
+        # 
+        self.options.append(option)
+        self.dispatch(OptionsChain.option_chain_events[0], option)
+    
+    
+    def pretty_print(self):
+        sorted_opt = sorted(map(lambda i: (self.options[i].get_contract().m_strike, self.options[i]) , range(len(self.options))))
+        
+        def format_tick_val(val, fmt):
+            if val == None:
+                length = len(fmt % (0))
+                return ' ' * length
+            
+            return fmt % (val) 
+        
+
+
+        
+        sorted_call = filter(lambda x: x[1].get_contract().m_right == 'C', sorted_opt)
+        sorted_put = filter(lambda x: x[1].get_contract().m_right == 'P', sorted_opt)
+        # last, bidq, bid, ask, askq, imvol, delta, theta
+        fmt_spec = '%8.2f'
+        fmt_spec2 = '%8.4f'
+        fmt_specq = '%8d'
+        fmt_call = map(lambda x: (x[0], '%s,%s,%s,%s,%s,%s,%s,%s' % (format_tick_val(x[1].get_tick_value(4), fmt_spec),
+                                               format_tick_val(x[1].get_tick_value(0), fmt_specq),
+                                               format_tick_val(x[1].get_tick_value(1), fmt_spec),
+                                               format_tick_val(x[1].get_tick_value(2), fmt_spec),
+                                               format_tick_val(x[1].get_tick_value(3), fmt_specq),
+                                               format_tick_val(x[1].get_analytics()[Option.IMPL_VOL], fmt_spec2),
+                                               format_tick_val(x[1].get_analytics()[Option.DELTA], fmt_spec2),
+                                               format_tick_val(x[1].get_analytics()[Option.THETA], fmt_spec2),
+                                               )), sorted_call)
+        
+        fmt_put = map(lambda x: (x[0], '%s,%s,%s,%s,%s,%s,%s,%s' % (format_tick_val(x[1].get_tick_value(4), fmt_spec),
+                                               format_tick_val(x[1].get_tick_value(0), fmt_specq),                                                                                                                  
+                                               format_tick_val(x[1].get_tick_value(1), fmt_spec),
+                                               format_tick_val(x[1].get_tick_value(2), fmt_spec),
+                                               format_tick_val(x[1].get_tick_value(3), fmt_specq),
+                                               format_tick_val(x[1].get_analytics()[Option.IMPL_VOL], fmt_spec2),
+                                               format_tick_val(x[1].get_analytics()[Option.DELTA], fmt_spec2),
+                                               format_tick_val(x[1].get_analytics()[Option.THETA], fmt_spec2),                    
+                                               )), sorted_put)
+        
+        undlypx = '%s,%s,%s,%s,%s' % (format_tick_val(self.get_underlying().get_tick_value(4), fmt_spec), 
+                                  format_tick_val(self.get_underlying().get_tick_value(0), fmt_specq),
+                                           format_tick_val(self.get_underlying().get_tick_value(1), fmt_spec),
+                                           format_tick_val(self.get_underlying().get_tick_value(2), fmt_spec),
+                                           format_tick_val(self.get_underlying().get_tick_value(3), fmt_spec)
+                                )
+        
+        #title = '%s%30s%s%s' % ('-' * 40, ContractHelper.makeRedisKeyEx(self.get_underlying().get_contract()).center(50, ' '), undlypx, '-' * 40) 
+        title = '%s%30s%s%s' % ('-' * 41, ContractHelper.makeRedisKeyEx(self.get_underlying().get_contract()).center(42, ' '), undlypx, '-' * 27)
+        header = '%8s|%8s|%8s|%8s|%8s|%8s|%8s|%8s |%8s| %8s|%8s|%8s|%8s|%8s|%8s|%8s|%8s' % ('last', 'bidq', 'bid', 'ask', 'askq', 'ivol', 'delta', 'theta', 'strike', 'last', 'bidq', 'bid', 'ask', 'askq', 'ivol', 'delta', 'theta')
+        combined = map(lambda i: '%s |%8.2f| %s' % (fmt_call[i][1], fmt_put[i][0], fmt_put[i][1]), range(len(fmt_call)) )
+        footer = '%s' % ('-' * 154) 
+        print title
+        print header
+        for e in combined:
+            print e
+        print footer
+        
+     

+ 184 - 0
src/rethink/tick_datastore.py

@@ -0,0 +1,184 @@
+import logging
+import json
+from threading import RLock
+from misc2.observer import Publisher
+from misc2.observer import NotImplementedException
+from misc2.helpers import ContractHelper
+from comms.ibc.base_client_messaging import AbstractGatewayListener
+
+class TickDataStore(Publisher, AbstractGatewayListener):
+    """
+    
+    Data structure:
+        tickers map contains key value pairs of ticker id mapped to Symbol primary key
+        tickers => {id1: key1, id2:key2...}
+        
+        example: tickers = {9: 'QQQ-20170217-127.00-C-OPT-USD-SMART-102'
+                            43: 'QQQ-20170217-124.00-C-OPT-USD-SMART-102' ...}
+                            
+        symbols map contains key value pairs of Symbol primary key mapped to a dict object.
+        The dict object contains the ticker id and a list of Symbol objects associated with ticker_id
+        symbols => {key1: 
+                        { 'ticker_id': id1, 
+                          'syms' : [<object ref to Symbol1>,<object ref to Symbol2>...]
+                        }
+                    key2:
+                        ...
+                   }
+        
+        example: symbols = {'QQQ-20170217-127.00-C-OPT-USD-SMART-102':
+                                {'ticker_id': 9, 
+                                 'syms': [<object ref to Symbol QQQ>, ...]
+                                }
+                            }
+                            
+        Usage:
+        Given a ticker_id, the Symbol key can be looked up from tickers
+        With the Symbol key obtained, the reference to the actual object associated with the ticker_id can be retrieved
+        by looking up from symbols[key]['syms']
+        
+        speed: 2 x O(1) + n
+    
+    
+    """
+    
+
+    TICK_PRICE_UPDATED = 'tds_price_updated'
+    NEW_SYMBOL_ADDED = 'tds_new_symbol_added'
+    TDS_EVENTS = [TICK_PRICE_UPDATED, NEW_SYMBOL_ADDED] 
+
+    
+    def __init__(self, name):
+        
+        AbstractGatewayListener.__init__(self, name)
+        self.tickers = {}
+        self.symbols = {}
+        self.lock = RLock()
+        
+        self.first_run = True
+        
+        
+    def register_listener(self, l):
+        map(lambda e: self.register(e, l, l.tds_price_updated), TickDataStore.TDS_EVENTS)
+
+    def dump(self):
+            # print ', '.join('[%s:%s]' % (k, v['ticker_id'])) 
+        logging.debug('TickDataStore-symbols: [Key: Ticker ID: # options objects]: ---->\n%s' % (',\n'.join('[%s:%d:%d]' % (k, v['ticker_id'], len(v['syms'])) for k, v in self.symbols.iteritems())))
+        logging.debug('TickDataStore-tickers: %s' % self.tickers)
+    
+     
+    
+    def add_symbol(self, symbol):
+        try:
+            self.lock.acquire()
+            key = symbol.get_key()
+            if key not in self.symbols:
+                self.symbols[key] = {'ticker_id':-1, 'syms': []}
+                
+            self.symbols[key]['syms'].append(symbol)        
+    
+            # defer the dispatch at the end of this method        
+            if key not in self.symbols:
+                self.dispatch(TickDataStore.NEW_SYMBOL_ADDED, symbol)
+        finally:
+            self.lock.release()
+            
+    def del_symbol(self, symbol):
+        raise NotImplementedException     
+    
+       
+        
+    def update_symbol_price(self, event, message_value):   
+        
+        # 'value': '{"tickerId": 0, "size": 3, "field": 3}'
+        items = json.loads(message_value)
+        tid = items['tickerId']
+
+        try:
+            self.lock.acquire()
+            contract_key = self.tickers[tid]
+            # print contract_key
+            map(lambda e: e.set_tick_value(items['field'], items['price']), self.symbols[contract_key]['syms'])
+            
+        except KeyError:
+            # contract not set up in the datastore, ignore message
+            pass
+        finally:
+            self.lock.release()
+            self.dispatch(TickDataStore.TICK_PRICE_UPDATED, message_value)
+            
+    def error(self, event, message_value):
+        logging.info('TickDataStore:%s. val->[%s]' % (event, message_value))  
+
+
+    
+    def gw_subscription_changed(self, event, message_value):
+        logging.info('TickDataStore:%s. val->[%s]' % (event, message_value))
+        self.update_datastore(message_value)
+ 
+
+    def update_datastore(self, subscription_message_value):
+        
+        def set_values(idc):
+            
+            
+            key = ContractHelper.makeRedisKeyEx(idc[1])
+            
+            if key in self.symbols and idc[0] <> self.symbols[key]['ticker_id']:
+                # if this condition is met, one should delete the old entry
+                # and move all object references to the new key/ticker_id
+                raise
+            
+            self.tickers[idc[0]] = key
+            try:
+                self.symbols[key]['ticker_id'] = idc[0]
+            except KeyError:
+                self.symbols[key] = {'ticker_id': idc[0],
+                                       'syms': []}
+                
+
+        
+        try:
+            def utf2asc(x):
+                return x if isinstance(x, unicode) else x
+        
+            self.lock.acquire()
+            items = json.loads(subscription_message_value)    
+            id_contracts = map(lambda x: (x[0], ContractHelper.kvstring2contract(utf2asc(x[1]))), items)
+        
+            map(lambda idc: set_values, id_contracts)    
+        except TypeError:
+            logging.error('TickDataStore:gw_subscriptions. Exception when trying to get id:contracts.')
+            return None       
+        finally:
+            self.lock.release()     
+        
+            
+    def gw_subscriptions(self, event, message_value):
+        logging.info('TickDataStore:%s. val->[%s]' % (event, message_value))
+        self.update_datastore(message_value)
+        if self.first_run:
+            self.dispatch(TickDataStore.TDS_INIT_COMPLETE, {})
+            self.first_run = False
+            
+    
+        """
+        {0: {u'm_conId': 0, u'm_right': u'', u'm_symbol': u'QQQ', 
+        u'm_secType': u'STK', u'm_includeExpired': False, 
+        u'm_expiry': u'', u'm_currency': u'USD', u'm_exchange': u'SMART', u'm_strike': 0}, 
+        1: {u'm_conId': 0, u'm_right': u'C', u'm_symbol': u'QQQ', u'm_secType': u'OPT', 
+        u'm_includeExpired': False, u'm_multiplier': 100, u'm_expiry': u'20170217', u'm_currency': u'USD', u'm_exchange': u'SMART', u'm_strike': 125.0}, 
+        2: {u'm_conId': 0, u'm_right': u'P', u'm_symbol': u'QQQ', u'm_secType': u'OPT', u'm_includeExpired': False, u'm_multiplier': 100, 
+        u'm_expiry': u'20170217', u'm_currency': u'USD', u'm_exchange': u'SMART', u'm_strike': 125.0}, 
+        ...
+         
+        78: {u'm_conId': 0, u'm_right': u'P', u'm_symbol': u'QQQ', u'm_secType': u'OPT', 
+        u'm_includeExpired': False, u'm_multiplier': 100, u'm_expiry': u'20170217', 
+        u'm_currency': u'USD', u'm_exchange': u'SMART', u'm_strike': 115.5}}
+        tickPrice>> [0:QQQ--0.00--STK-USD-SMART-102] bid_q=-1.0000 [2017-01-28 12:08:49.587014]
+
+        """
+
+        
+        
+