| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271 |
- import logging
- import json
- import time, datetime
- import copy
- from optparse import OptionParser
- from time import sleep
- from misc2.observer import Subscriber
- from misc2.helpers import ContractHelper
- from finopt.instrument import Symbol, Option
- from rethink.option_chain import OptionsChain
- from rethink.tick_datastore import TickDataStore
- from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
class AnalyticsEngine(AbstractGatewayListener):
    """Gateway listener that keeps option-chain analytics in a tick data store.

    Tick messages received from the gateway are written into a
    ``TickDataStore``.  When the store reports a tick update for a symbol
    tagged with ``OptionsChain.CHAIN_IDENTIFIER``, the matching chain in
    ``self.option_chains`` recomputes greeks and the results are written back
    into the store.
    """

    def __init__(self, kwargs):
        """Create the gateway client and the tick store and register listeners.

        Args:
            kwargs: configuration dict.  This class reads ``'name'`` and
                ``'topics'`` here and ``'evaluation_date'`` when ticks
                arrive; the whole dict is also handed to
                ``TWS_client_manager``.
        """
        # Shallow copy, so later top-level changes by the caller are not seen.
        self.kwargs = copy.copy(kwargs)
        self.twsc = TWS_client_manager(kwargs)
        AbstractGatewayListener.__init__(self, kwargs['name'])

        self.tds = TickDataStore(kwargs['name'])
        self.tds.register_listener(self)
        self.twsc.add_listener_topics(self, kwargs['topics'])

        # Maps chain name -> OptionsChain; filled in by start_engine().
        self.option_chains = {}

    def _load_chain(self, chain, contract_tuple, structure_args, build_args):
        """Configure and build ``chain``, then add all its symbols to the store.

        Args:
            chain: the ``OptionsChain`` to populate.
            contract_tuple: tuple handed to ``ContractHelper.makeContract``
                to create the underlying contract.
            structure_args: positional arguments passed to
                ``chain.set_option_structure`` after the contract.
            build_args: positional arguments passed to ``chain.build_chain``.
        """
        contract = ContractHelper.makeContract(contract_tuple)
        chain.set_option_structure(contract, *structure_args)
        chain.build_chain(*build_args)

        chain.pretty_print()
        for option in chain.get_option_chain():
            self.tds.add_symbol(option)
        self.tds.add_symbol(chain.get_underlying())

    def test_oc(self, oc2):
        """Build the hard-coded HSI futures test chain on ``oc2``.

        Args:
            oc2: the ``OptionsChain`` instance to configure and build.
        """
        # NOTE(review): a QQQ/STK variant (expiry '20170324', structure
        # (0.5, 100, 0.0012, 0.0328), build_chain(132.11, 0.02, 0.22)) used to
        # be kept here as commented-out code.
        expiry = '20170330'
        self._load_chain(
            oc2,
            ('HSI', 'FUT', 'HKFE', 'HKD', expiry, 0, ''),
            (200, 50, 0.0012, 0.0328, expiry),
            (24119, 0.03, 0.22),
        )

    def test_oc3(self, oc3):
        """Build the hard-coded QQQ stock test chain on ``oc3``.

        Args:
            oc3: the ``OptionsChain`` instance to configure and build.
        """
        # NOTE(review): an HHI.HK/FUT variant (expiry '20170330', structure
        # (200, 50, 0.0012, 0.0328), build_chain(10445, 0.03, 0.22)) used to
        # be kept here as commented-out code.
        expiry = '20170331'
        self._load_chain(
            oc3,
            ('QQQ', 'STK', 'SMART', 'USD', '', 0, ''),
            (0.5, 100, 0.0012, 0.0328, expiry),
            (130, 0.03, 0.22),
        )

    def start_engine(self):
        """Start the gateway client, build the test chains and run the console.

        Blocks in a prompt loop: every time a line is entered, both chains
        and the tick store are dumped.  ``KeyboardInterrupt`` / ``SystemExit``
        stop the gateway message handler and end the loop.
        """
        self.twsc.start_manager()
        oc2 = OptionsChain('oc2')
        oc2.register_listener(self)
        self.test_oc(oc2)
        oc3 = OptionsChain('oc3')
        oc3.register_listener(self)
        self.test_oc3(oc3)
        self.option_chains[oc2.name] = oc2
        self.option_chains[oc3.name] = oc3

        # The file targets Python 2 (raw_input); fall back to input() only
        # where raw_input does not exist, so the prompt never eval()s text.
        try:
            read_command = raw_input
        except NameError:
            read_command = input

        try:
            logging.info('AnalyticsEngine:main_loop ***** accepting console input...')
            while True:
                # The entered text is ignored; any line triggers a dump.
                read_command("Enter command:")
                oc2.pretty_print()
                oc3.pretty_print()
                self.tds.dump()
                sleep(0.45)

        except (KeyboardInterrupt, SystemExit):
            logging.error('AnalyticsEngine: caught user interrupt. Shutting down...')
            self.twsc.gw_message_handler.set_stop()

        logging.info('AnalyticsEngine: Service shut down complete...')

    #
    # option chain call backs
    #
    # EVENT_OPTION_UPDATED = 'oc_option_updated'
    # EVENT_UNDERLYING_ADDED = 'oc_underlying_added'

    def _track_instrument(self, instrument):
        """Add ``instrument`` to the tick store and request its market data."""
        self.tds.add_symbol(instrument)
        self.twsc.reqMktData(instrument.get_contract(), True)

    def oc_option_updated(self, event, update_mode, name, instrument):
        """Handle an option-updated event: log it and track the instrument."""
        logging.info('oc_option_updated. %s %s', event, vars())
        self._track_instrument(instrument)

    def oc_underlying_added(self, event, update_mode, name, instrument):
        """Handle an underlying-added event: log it and track the instrument."""
        logging.info('oc_underlying_added. %s %s', event, vars())
        self._track_instrument(instrument)

    #
    # tds call backs
    #
    # EVENT_TICK_UPDATED = 'tds_event_tick_updated'
    # EVENT_SYMBOL_ADDED = 'tds_event_symbol_added'
    # EVENT_SYMBOL_DELETED = 'tds_event_symbol_deleted'

    def tds_event_symbol_added(self, event, update_mode, name, instrument):
        """Tick-store callback for an added symbol; intentionally a no-op."""
        pass

    def _store_greeks(self, key, greeks):
        """Write the five analytics values in ``greeks`` to the store for ``key``.

        Args:
            key: store key of the symbol the analytics belong to.
            greeks: mapping indexed by ``Option.IMPL_VOL``, ``Option.DELTA``,
                ``Option.GAMMA``, ``Option.THETA`` and ``Option.VEGA``.
        """
        for analytic in (Option.IMPL_VOL, Option.DELTA, Option.GAMMA,
                         Option.THETA, Option.VEGA):
            self.tds.set_symbol_analytics(key, analytic, greeks[analytic])

    def tds_event_tick_updated(self, event, contract_key, field, price, syms):
        """Recompute greeks for the chains affected by a tick and store them.

        For every symbol in ``syms`` that carries a chain identifier of a
        chain registered in ``self.option_chains``: if ``contract_key``
        contains ``'FUT'`` or ``'STK'`` the whole chain is recalculated,
        otherwise only that symbol's greeks are.  The results are then
        written to the tick store.

        Args:
            event: event name (unused here).
            contract_key: key of the contract whose tick changed.
            field: tick field id (unused here).
            price: tick price (unused here).
            syms: symbols in the tick store associated with ``contract_key``.
        """
        evaluation_date = self.kwargs['evaluation_date']
        # Shared across iterations, exactly as before: single-option results
        # accumulate in this dict, a whole-chain recalculation replaces it.
        results = {}
        for sym in syms:
            attributes = sym.get_extra_attributes()
            if OptionsChain.CHAIN_IDENTIFIER not in attributes:
                continue

            chain_id = attributes[OptionsChain.CHAIN_IDENTIFIER]
            logging.info('AnalyticsEngine:tds_event_tick_updated chain_id %s', chain_id)
            if chain_id not in self.option_chains:
                continue

            chain = self.option_chains[chain_id]
            if 'FUT' in contract_key or 'STK' in contract_key:
                results = chain.cal_greeks_in_chain(evaluation_date)
            else:
                key = ContractHelper.makeRedisKeyEx(sym.get_contract())
                results[key] = chain.cal_option_greeks(sym, evaluation_date)
            logging.info('AnalyticsEngine:tds_event_tick_updated. compute greek results %s', results)

            # An explicit loop instead of map(): map() is lazy on Python 3, so
            # using it for side effects would silently store nothing there.
            # list() snapshots the items before the store is updated.
            for key, greeks in list(results.items()):
                self._store_greeks(key, greeks)

    def tds_event_symbol_deleted(self, event, update_mode, name, instrument):
        """Tick-store callback for a deleted symbol; intentionally a no-op."""
        pass

    #
    # external ae requests
    #
    def ae_req_greeks(self, event, message_value):
        """Placeholder for external greeks requests; not implemented."""
        # Signature noted by the original author:
        # (int tickerId, int field, double impliedVol, double delta,
        #  double optPrice, double pvDividend, double gamma, double vega,
        #  double theta, double undPrice)
        pass

    def ae_req_tds_internal(self, event, message_value):
        """Dump the tick store in response to an external request."""
        logging.info('received ae_req_tds_internal')
        self.tds.dump()

    #
    # gateway events
    #
    def tickPrice(self, event, contract_key, field, price, canAutoExecute):
        """Gateway callback: record a tick price in the tick store."""
        # Lazy logging arguments: the message is only formatted when DEBUG is
        # enabled, which matters because this runs for every price tick.
        logging.debug('MessageListener:%s. %s %d %8.2f', event, contract_key, field, price)
        self.tds.set_symbol_tick_price(contract_key, field, price, canAutoExecute)

    def tickSize(self, event, contract_key, field, size):
        """Gateway callback: record a tick size in the tick store."""
        self.tds.set_symbol_tick_size(contract_key, field, size)

    def error(self, event, message_value):
        """Gateway callback: log an error message from the gateway."""
        logging.info('AnalyticsEngine:%s. val->[%s]', event, message_value)
-
def build_config(argv=None):
    """Return the engine configuration: defaults overlaid with CLI options.

    Args:
        argv: list of command-line arguments to parse; ``None`` makes
            ``OptionParser`` read ``sys.argv[1:]``.

    Returns:
        A new dict of settings.  Every command-line option that was given
        overrides (or adds) the entry of the same name; ``evaluation_date``
        defaults to today's local date formatted as ``YYYYMMDD``.
    """
    config = {
        'name': 'analytics_engine',
        'bootstrap_host': 'localhost',
        'bootstrap_port': 9092,
        'redis_host': 'localhost',
        'redis_port': 6379,
        'redis_db': 0,
        'tws_host': 'localhost',
        'tws_api_port': 8496,
        'tws_app_id': 38868,
        'group_id': 'AE',
        'session_timeout_ms': 10000,
        'clear_offsets': False,
        'logconfig': {'level': logging.INFO, 'filemode': 'w', 'filename': '/tmp/ae.log'},
        'topics': ['tickPrice', 'tickSize'],
        'seek_to_end': ['*'],
        # alternative:
        # 'seek_to_end': ['tickSize', 'tickPrice', 'gw_subscriptions', 'gw_subscription_changed']
    }

    parser = OptionParser(usage="usage: %prog [options]")
    parser.add_option("-c", "--clear_offsets", action="store_true", dest="clear_offsets",
                      help="delete all redis offsets used by this program")
    parser.add_option("-g", "--group_id",
                      action="store", dest="group_id",
                      help="assign group_id to this running instance")
    parser.add_option("-e", "--evaluation_date",
                      action="store", dest="evaluation_date",
                      help="specify evaluation date for option calculations")

    options, _ = parser.parse_args(argv)
    if options.evaluation_date is None:
        options.evaluation_date = time.strftime('%Y%m%d')

    # Options that were not supplied are left as None by optparse and must
    # not clobber the defaults above.
    for option, value in vars(options).items():
        if value is not None:
            config[option] = value
    return config


def main(argv=None):
    """Configure logging from the parsed settings and run the engine."""
    kwargs = build_config(argv)

    logconfig = kwargs['logconfig']
    logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
    logging.basicConfig(**logconfig)

    server = AnalyticsEngine(kwargs)
    server.start_engine()


if __name__ == '__main__':
    main()
-
-
-
|