analytics_engine.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. import logging
  2. import json
  3. import time, datetime
  4. import copy
  5. from optparse import OptionParser
  6. from time import sleep
  7. from misc2.observer import Subscriber
  8. from misc2.helpers import ContractHelper
  9. from finopt.instrument import Symbol, Option
  10. from rethink.option_chain import OptionsChain
  11. from rethink.tick_datastore import TickDataStore
  12. from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
  13. class AnalyticsEngine(AbstractGatewayListener):
  14. def __init__(self, kwargs):
  15. self.kwargs = copy.copy(kwargs)
  16. self.twsc = TWS_client_manager(kwargs)
  17. AbstractGatewayListener.__init__(self, kwargs['name'])
  18. self.tds = TickDataStore(kwargs['name'])
  19. self.tds.register_listener(self)
  20. self.twsc.add_listener_topics(self, kwargs['topics'])
  21. self.option_chains = {}
  22. def test_oc(self, oc2):
  23. expiry = '20170330'
  24. contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', expiry, 0, '')
  25. contract = ContractHelper.makeContract(contractTuple)
  26. oc2.set_option_structure(contract, 200, 50, 0.0012, 0.0328, expiry)
  27. oc2.build_chain(24119, 0.03, 0.22)
  28. # expiry='20170324'
  29. # contractTuple = ('QQQ', 'STK', 'SMART', 'USD', '', 0, '')
  30. # contract = ContractHelper.makeContract(contractTuple)
  31. #
  32. # oc2.set_option_structure(contract, 0.5, 100, 0.0012, 0.0328, expiry)
  33. #
  34. # oc2.build_chain(132.11, 0.02, 0.22)
  35. oc2.pretty_print()
  36. for o in oc2.get_option_chain():
  37. self.tds.add_symbol(o)
  38. self.tds.add_symbol(oc2.get_underlying())
  39. def test_oc3(self, oc3):
  40. # expiry = '20170330'
  41. # contractTuple = ('HHI.HK', 'FUT', 'HKFE', 'HKD', expiry, 0, '')
  42. # contract = ContractHelper.makeContract(contractTuple)
  43. #
  44. # oc3.set_option_structure(contract, 200, 50, 0.0012, 0.0328, expiry)
  45. #
  46. # oc3.build_chain(10445, 0.03, 0.22)
  47. expiry = '20170331'
  48. contractTuple = ('QQQ', 'STK', 'SMART', 'USD', '', 0, '')
  49. contract = ContractHelper.makeContract(contractTuple)
  50. oc3.set_option_structure(contract, 0.5, 100, 0.0012, 0.0328, expiry)
  51. oc3.build_chain(130, 0.03, 0.22)
  52. # expiry='20170324'
  53. # contractTuple = ('QQQ', 'STK', 'SMART', 'USD', '', 0, '')
  54. # contract = ContractHelper.makeContract(contractTuple)
  55. #
  56. # oc2.set_option_structure(contract, 0.5, 100, 0.0012, 0.0328, expiry)
  57. #
  58. # oc2.build_chain(132.11, 0.02, 0.22)
  59. oc3.pretty_print()
  60. for o in oc3.get_option_chain():
  61. self.tds.add_symbol(o)
  62. self.tds.add_symbol(oc3.get_underlying())
  63. def start_engine(self):
  64. self.twsc.start_manager()
  65. oc2 = OptionsChain('oc2')
  66. oc2.register_listener(self)
  67. self.test_oc(oc2)
  68. oc3 = OptionsChain('oc3')
  69. oc3.register_listener(self)
  70. self.test_oc3(oc3)
  71. self.option_chains[oc2.name] = oc2
  72. self.option_chains[oc3.name] = oc3
  73. try:
  74. logging.info('AnalyticsEngine:main_loop ***** accepting console input...')
  75. while True:
  76. read_ch = raw_input("Enter command:")
  77. oc2.pretty_print()
  78. oc3.pretty_print()
  79. self.tds.dump()
  80. sleep(0.45)
  81. except (KeyboardInterrupt, SystemExit):
  82. logging.error('AnalyticsEngine: caught user interrupt. Shutting down...')
  83. self.twsc.gw_message_handler.set_stop()
  84. logging.info('AnalyticsEngine: Service shut down complete...')
  85. # EVENT_OPTION_UPDATED = 'oc_option_updated'
  86. # EVENT_UNDERLYING_ADDED = 'oc_underlying_added
  87. def oc_option_updated(self, event, update_mode, name, instrument):
  88. logging.info('oc_option_updated. %s %s' % (event, vars()))
  89. self.tds.add_symbol(instrument)
  90. self.twsc.reqMktData(instrument.get_contract(), True)
  91. def oc_underlying_added(self, event, update_mode, name, instrument):
  92. logging.info('oc_underlying_added. %s %s' % (event, vars()))
  93. self.tds.add_symbol(instrument)
  94. self.twsc.reqMktData(instrument.get_contract(), True)
  95. #
  96. # tds call backs
  97. #
  98. #
  99. # EVENT_TICK_UPDATED = 'tds_event_tick_updated'
  100. # EVENT_SYMBOL_ADDED = 'tds_event_symbol_added'
  101. # EVENT_SYMBOL_DELETED = 'tds_event_symbol_deleted'
  102. def tds_event_symbol_added(self, event, update_mode, name, instrument):
  103. pass
  104. #logging.info('tds_event_new_symbol_added. %s' % ContractHelper.object2kvstring(symbol.get_contract()))
  105. def tds_event_tick_updated(self, event, contract_key, field, price, syms):
  106. results = {}
  107. for s in syms:
  108. if OptionsChain.CHAIN_IDENTIFIER in s.get_extra_attributes():
  109. chain_id = s.get_extra_attributes()[OptionsChain.CHAIN_IDENTIFIER]
  110. logging.info('AnalyticsEngine:tds_event_tick_updated chain_id %s' % chain_id)
  111. if chain_id in self.option_chains.keys():
  112. if 'FUT' in contract_key or 'STK' in contract_key:
  113. results = self.option_chains[chain_id].cal_greeks_in_chain(self.kwargs['evaluation_date'])
  114. else:
  115. results[ContractHelper.makeRedisKeyEx(s.get_contract())] = self.option_chains[chain_id].cal_option_greeks(s, self.kwargs['evaluation_date'])
  116. logging.info('AnalysticsEngine:tds_event_tick_updated. compute greek results %s' % results)
  117. # set_analytics(self, imvol=None, delta=None, gamma=None, theta=None, vega=None, npv=None):
  118. #
  119. def update_tds_analytics(key_greeks):
  120. self.tds.set_symbol_analytics(key_greeks[0], Option.IMPL_VOL, key_greeks[1][Option.IMPL_VOL])
  121. self.tds.set_symbol_analytics(key_greeks[0], Option.DELTA, key_greeks[1][Option.DELTA])
  122. self.tds.set_symbol_analytics(key_greeks[0], Option.GAMMA, key_greeks[1][Option.GAMMA])
  123. self.tds.set_symbol_analytics(key_greeks[0], Option.THETA, key_greeks[1][Option.THETA])
  124. self.tds.set_symbol_analytics(key_greeks[0], Option.VEGA, key_greeks[1][Option.VEGA])
  125. map(update_tds_analytics, list(results.iteritems()))
  126. else:
  127. continue
  128. def tds_event_symbol_deleted(self, event, update_mode, name, instrument):
  129. pass
  130. #
  131. # external ae requests
  132. #
  133. def ae_req_greeks(self, event, message_value):
  134. #(int tickerId, int field, double impliedVol, double delta, double optPrice, double pvDividend, double gamma, double vega, double theta, double undPrice)
  135. pass
  136. def ae_req_tds_internal(self, event, message_value):
  137. logging.info('received ae_req_tds_internal')
  138. self.tds.dump()
  139. #
  140. # gateway events
  141. #
  142. def tickPrice(self, event, contract_key, field, price, canAutoExecute):
  143. logging.debug('MessageListener:%s. %s %d %8.2f' % (event, contract_key, field, price))
  144. self.tds.set_symbol_tick_price(contract_key, field, price, canAutoExecute)
  145. def tickSize(self, event, contract_key, field, size):
  146. self.tds.set_symbol_tick_size(contract_key, field, size)
  147. #logging.info('MessageListener:%s. %s: %d %8.2f' % (event, contract_key, field, size))
  148. def error(self, event, message_value):
  149. logging.info('AnalyticsEngine:%s. val->[%s]' % (event, message_value))
  150. if __name__ == '__main__':
  151. kwargs = {
  152. 'name': 'analytics_engine',
  153. 'bootstrap_host': 'localhost',
  154. 'bootstrap_port': 9092,
  155. 'redis_host': 'localhost',
  156. 'redis_port': 6379,
  157. 'redis_db': 0,
  158. 'tws_host': 'localhost',
  159. 'tws_api_port': 8496,
  160. 'tws_app_id': 38868,
  161. 'group_id': 'AE',
  162. 'session_timeout_ms': 10000,
  163. 'clear_offsets': False,
  164. 'logconfig': {'level': logging.INFO, 'filemode': 'w', 'filename': '/tmp/ae.log'},
  165. 'topics': ['tickPrice', 'tickSize'],
  166. 'seek_to_end': ['*']
  167. #'seek_to_end':['tickSize', 'tickPrice','gw_subscriptions', 'gw_subscription_changed']
  168. }
  169. usage = "usage: %prog [options]"
  170. parser = OptionParser(usage=usage)
  171. parser.add_option("-c", "--clear_offsets", action="store_true", dest="clear_offsets",
  172. help="delete all redis offsets used by this program")
  173. parser.add_option("-g", "--group_id",
  174. action="store", dest="group_id",
  175. help="assign group_id to this running instance")
  176. parser.add_option("-e", "--evaluation_date",
  177. action="store", dest="evaluation_date",
  178. help="specify evaluation date for option calculations")
  179. (options, args) = parser.parse_args()
  180. if options.evaluation_date == None:
  181. options.evaluation_date = time.strftime('%Y%m%d')
  182. for option, value in options.__dict__.iteritems():
  183. if value <> None:
  184. kwargs[option] = value
  185. logconfig = kwargs['logconfig']
  186. logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
  187. logging.basicConfig(**logconfig)
  188. server = AnalyticsEngine(kwargs)
  189. server.start_engine()