portfolio_monitor.py 24 KB


  1. # -*- coding: utf-8 -*-
  2. import sys, traceback
  3. import logging
  4. import json, threading
  5. import copy
  6. from optparse import OptionParser
  7. from time import sleep
  8. import time
  9. from misc2.helpers import ContractHelper
  10. from finopt.instrument import Symbol, Option
  11. from rethink.option_chain import OptionsChain
  12. from rethink.tick_datastore import TickDataStore
  13. from rethink.portfolio_item import PortfolioItem, PortfolioRules, Portfolio
  14. from rethink.portfolio_column_chart import PortfolioColumnChart,PortfolioColumnChartTM
  15. from rethink.table_model import AbstractTableModel, AbstractPortfolioTableModelListener
  16. from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
  17. class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListener):
  18. def __init__(self, kwargs):
  19. self.kwargs = copy.copy(kwargs)
  20. self.twsc = TWS_client_manager(kwargs)
  21. #AbstractGatewayListener.__init__(self, kwargs['name'])
  22. #AbstractPortfolioTableModelListener.__init__(self, kwargs['name'])
  23. self.tds = TickDataStore(kwargs['name'])
  24. self.tds.register_listener(self)
  25. self.twsc.add_listener_topics(self, kwargs['topics'])
  26. '''
  27. portfolios: {<account>: <portfolio>}
  28. '''
  29. self.portfolios = {}
  30. self.starting_engine = True
  31. '''
  32. portfolio_charts: {<account>, {'<chart type'>, <chart object ref>...
  33. '''
  34. self.portfolio_charts = {}
  35. def start_engine(self):
  36. self.twsc.start_manager()
  37. self.twsc.reqPositions()
  38. try:
  39. def print_menu():
  40. menu = {}
  41. menu['1']="Request position"
  42. menu['2']="Portfolio dump dtj"
  43. menu['3']="TDS dump"
  44. menu['4']="Request account updates"
  45. menu['5']="Table chart JSON"
  46. menu['6']="Table index mapping"
  47. menu['7']="Position Distribution JSON"
  48. menu['9']="Exit"
  49. choices=menu.keys()
  50. choices.sort()
  51. for entry in choices:
  52. print entry, menu[entry]
  53. def get_user_input(selection):
  54. logging.info('PortfolioMonitor:main_loop ***** accepting console input...')
  55. print_menu()
  56. while 1:
  57. resp = sys.stdin.readline()
  58. response[0]= resp.strip('\n')
  59. #print response[0]
  60. response = [None]
  61. user_input_th = threading.Thread(target=get_user_input, args=(response,))
  62. user_input_th.daemon = True
  63. user_input_th.start()
  64. while True:
  65. sleep(0.4)
  66. if response[0] is not None:
  67. selection = response[0]
  68. if selection =='1':
  69. self.twsc.reqPositions()
  70. elif selection == '2':
  71. for port in self.portfolios.values():
  72. print port.dump_portfolio()
  73. print ''.join('%d:[%6.2f]\n' % (k, v) for k, v in port.calculate_port_pl().iteritems())
  74. elif selection == '3':
  75. print self.tds.dump()
  76. elif selection == '4':
  77. for acct in self.portfolios.keys():
  78. self.twsc.reqAccountUpdates(True, acct)
  79. elif selection == '5':
  80. for port in self.portfolios.values():
  81. print port.get_JSON()
  82. elif selection == '6':
  83. for acct in self.portfolios.keys():
  84. print self.portfolios[acct].dump_table_index_map()
  85. elif selection == '7':
  86. for acct in self.portfolios.keys():
  87. pc = PortfolioColumnChart(self.portfolios[acct])
  88. print pc.get_JSON()
  89. print pc.get_xy_array()
  90. elif selection == '9':
  91. self.twsc.gw_message_handler.set_stop()
  92. break
  93. else:
  94. pass
  95. response[0] = None
  96. print_menu()
  97. except (KeyboardInterrupt, SystemExit):
  98. logging.error('PortfolioMonitor: caught user interrupt. Shutting down...')
  99. self.twsc.gw_message_handler.set_stop()
  100. logging.info('PortfolioMonitor: Service shut down complete...')
  101. except:
  102. logging.error('PortfolioMonitor. caught user interrupt. Shutting down...%s' % traceback.format_exc())
  103. self.twsc.gw_message_handler.set_stop()
  104. logging.info('PortfolioMonitor: Service shut down complete...')
  105. def get_kproducer(self):
  106. # returns a reference to the kafka base producer that we can
  107. # use for sending messages
  108. return self.twsc.gw_message_handler
  109. def get_portfolio(self, account):
  110. try:
  111. return self.portfolios[account]
  112. except KeyError:
  113. # create a new portfolio
  114. self.portfolios[account] = Portfolio(account)
  115. #
  116. # set up portfolio chart objects1
  117. #
  118. pcc = PortfolioColumnChartTM('PortfolioColumnChartTM-%s' % account,
  119. self.portfolios[account],
  120. self.get_kproducer())
  121. self.portfolio_charts[account] = {'PortfolioColumnChartTM': pcc}
  122. self.twsc.add_listener_topics(pcc, [AbstractTableModel.EVENT_TM_REQUEST_TABLE_STRUCTURE, AbstractTableModel.EVENT_TM_TABLE_STRUCTURE_CHANGED] )
  123. logging.info('PortfoioMonitor:get_portfolio creating port and chart object...%s' % account)
  124. return self.portfolios[account]
  125. def deduce_option_underlying(self, option):
  126. '''
  127. given an Option object, return the underlying Symbol object
  128. '''
  129. try:
  130. symbol_id = option.get_contract().m_symbol
  131. underlying_sectype = PortfolioRules.rule_map['symbol'][symbol_id]
  132. exchange = option.get_contract().m_exchange
  133. currency = option.get_contract().m_currency
  134. expiry = option.get_contract().m_expiry if PortfolioRules.rule_map['expiry'][symbol_id] == 'same_month' else ''
  135. contractTuple = (symbol_id, underlying_sectype, exchange, currency, expiry, 0, '')
  136. logging.info('PortfolioMonitor:deduce_option_underlying. Deduced underlying==> %s' %
  137. str(contractTuple))
  138. return Symbol(ContractHelper.makeContract(contractTuple))
  139. except KeyError:
  140. logging.error('PortfolioMonitor:deduce_option_underlying. Unable to deduce the underlying for the given option %s' %
  141. ContractHelper.printContract(option.get_contract))
  142. return None
  143. def get_portfolio_option_chain(self, account, underlying):
  144. def create_oc_id(account, underlying_id, month):
  145. return '%s-%s-%s' % (account, underlying_id, month)
  146. underlying_id = underlying.get_contract().m_symbol
  147. month = underlying.get_contract().m_expiry
  148. oc_id = create_oc_id(account, underlying_id, month)
  149. oc = self.portfolios[account].is_oc_in_portfolio(oc_id)
  150. if oc == None:
  151. oc = OptionsChain(oc_id)
  152. oc.register_listener(self)
  153. oc.set_option_structure(underlying.get_contract(),
  154. PortfolioRules.rule_map['option_structure'][underlying_id]['spd_size'],
  155. PortfolioRules.rule_map['option_structure'][underlying_id]['multiplier'],
  156. PortfolioRules.rule_map['option_structure'][underlying_id]['rate'],
  157. PortfolioRules.rule_map['option_structure'][underlying_id]['div'],
  158. month,
  159. PortfolioRules.rule_map['option_structure'][underlying_id]['trade_vol'])
  160. self.portfolios[account].set_option_chain(oc_id, oc)
  161. return oc
  162. def process_position(self, account, contract_key, position, average_cost, extra_info=None):
  163. # obtain a reference to the portfolio, if not exist create a new one
  164. port = self.get_portfolio(account)
  165. port_item = port.is_contract_in_portfolio(contract_key)
  166. if port_item:
  167. # update the values and recalculate p/l
  168. port_item.update_position(position, average_cost, extra_info)
  169. port_item.calculate_pl(contract_key)
  170. # if the function call is triggered by event accountUpdates from TWS
  171. # compute the overall portfolio greeks and p/l
  172. # (that is extra_info is not null)
  173. if extra_info:
  174. logging.info('PortfolioMonitor:process_position Recal overall port figures...')
  175. port.calculate_port_pl()
  176. # dispatch the update to internal listeners
  177. # and also send out the kafka message to external parties
  178. self.notify_table_model_changes(account, port, contract_key, mode='U')
  179. logging.info('PortfolioMonitor:process_position. Position updated: %s:[%d]' % (contract_key, port.ckey_to_row(contract_key)))
  180. # new position
  181. else:
  182. port_item = PortfolioItem(account, contract_key, position, average_cost)
  183. #port['port_items'][contract_key] = port_item
  184. port.set_portfolio_port_item(contract_key, port_item)
  185. instrument = port_item.get_instrument()
  186. self.tds.add_symbol(instrument)
  187. self.twsc.reqMktData(instrument.get_contract(), True)
  188. # option position
  189. if port_item.get_instrument_type() == 'OPT':
  190. '''
  191. deduce option's underlying
  192. resolve associated option chain by month, underlying
  193. '''
  194. underlying = self.deduce_option_underlying(instrument)
  195. if underlying:
  196. oc = self.get_portfolio_option_chain(account, underlying)
  197. instrument.set_extra_attributes(OptionsChain.CHAIN_IDENTIFIER, oc.get_name())
  198. oc.add_option(instrument)
  199. else:
  200. logging.error('PortfolioMonitor:process_position. **** Error in adding the new position %s' % contract_key)
  201. self.notify_table_model_changes(account, port, contract_key, mode='I')
  202. logging.info('PortfolioMonitor:process_position. New position: %s:[%d]' % (contract_key, port.ckey_to_row(contract_key)))
  203. port.dump_portfolio()
  204. # EVENT_OPTION_UPDATED = 'oc_option_updated'
  205. # EVENT_UNDERLYING_ADDED = 'oc_underlying_added
  206. def oc_option_updated(self, event, update_mode, name, instrument):
  207. logging.info('oc_option_updated. %s %s' % (event, vars()))
  208. self.tds.add_symbol(instrument)
  209. self.twsc.reqMktData(instrument.get_contract(), True)
  210. def oc_underlying_added(self, event, update_mode, name, instrument):
  211. logging.info('oc_underlying_added. %s %s' % (event, vars()))
  212. self.tds.add_symbol(instrument)
  213. self.twsc.reqMktData(instrument.get_contract(), True)
  214. def tds_event_symbol_added(self, event, update_mode, name, instrument):
  215. pass
  216. #logging.info('tds_event_new_symbol_added. %s' % ContractHelper.object2kvstring(symbol.get_contract()))
  217. def tds_event_tick_updated(self, event, contract_key, field, price, syms):
  218. if field not in [Symbol.ASK, Symbol.BID, Symbol.LAST]:
  219. return
  220. for s in syms:
  221. if OptionsChain.CHAIN_IDENTIFIER in s.get_extra_attributes():
  222. results = {}
  223. chain_id = s.get_extra_attributes()[OptionsChain.CHAIN_IDENTIFIER]
  224. #logging.info('PortfolioMonitor:tds_event_tick_updated chain_id %s' % chain_id)
  225. for acct in self.portfolios:
  226. #if chain_id in self.portfolios[acct]['opt_chains'].keys():
  227. if chain_id in self.portfolios[acct].get_option_chains():
  228. #logging.info('PortfolioMonitor:tds_event_tick_updated --> portfolio opt_chains: [ %s ] ' %
  229. # str(self.portfolios[acct]['opt_chains'].keys()))
  230. if 'FUT' in contract_key or 'STK' in contract_key:
  231. results = self.portfolios[acct].get_option_chain(chain_id).cal_greeks_in_chain(self.kwargs['evaluation_date'], price)
  232. else:
  233. results[ContractHelper.makeRedisKeyEx(s.get_contract())] = self.portfolios[acct].get_option_chain(chain_id)\
  234. .cal_option_greeks(s, self.kwargs['evaluation_date'], float('nan'), price)
  235. #logging.info('PortfolioMonitor:tds_event_tick_updated. compute greek results %s' % results)
  236. #underlying_px = self.portfolios[acct]['opt_chains'][chain_id].get_underlying().get_tick_value(4)
  237. def update_portfolio_fields(key_greeks):
  238. self.tds.set_symbol_analytics(key_greeks[0], Option.IMPL_VOL, key_greeks[1][Option.IMPL_VOL])
  239. self.tds.set_symbol_analytics(key_greeks[0], Option.DELTA, key_greeks[1][Option.DELTA])
  240. self.tds.set_symbol_analytics(key_greeks[0], Option.GAMMA, key_greeks[1][Option.GAMMA])
  241. self.tds.set_symbol_analytics(key_greeks[0], Option.THETA, key_greeks[1][Option.THETA])
  242. self.tds.set_symbol_analytics(key_greeks[0], Option.VEGA, key_greeks[1][Option.VEGA])
  243. self.portfolios[acct].calculate_item_pl(key_greeks[0])
  244. self.notify_table_model_changes(acct, self.portfolios[acct], key_greeks[0], mode='U')
  245. logging.info('PortfolioMonitor:tds_event_tick_updated. Position updated: %s:[%d]' % (key_greeks[0], self.portfolios[acct].ckey_to_row(key_greeks[0])))
  246. if results:
  247. #logging.info('PortfolioMonitor:tds_event_tick_updated ....before map')
  248. map(update_portfolio_fields, list(results.iteritems()))
  249. #logging.info('PortfolioMonitor:tds_event_tick_updated ....after map')
  250. else:
  251. for acct in self.portfolios:
  252. if self.portfolios[acct].is_contract_in_portfolio(contract_key):
  253. self.portfolios[acct].calculate_item_pl(contract_key)
  254. self.notify_table_model_changes(acct, self.portfolios[acct], contract_key, mode='U')
  255. else:
  256. logging.info('PortfolioMonitor:tds_event_tick_updated ignoring uninterested ticks %s' % contract_key)
  257. continue
  258. def tds_event_symbol_deleted(self, event, update_mode, name, instrument):
  259. pass
  260. def tickPrice(self, event, contract_key, field, price, canAutoExecute):
  261. self.tds.set_symbol_tick_price(contract_key, field, price, canAutoExecute)
  262. def tickSize(self, event, contract_key, field, size):
  263. #self.tds.set_symbol_tick_size(contract_key, field, size)
  264. #logging.info('MessageListener:%s. %s: %d %8.2f' % (event, contract_key, field, size))
  265. pass
  266. def position(self, event, account, contract_key, position, average_cost, end_batch):
  267. if not end_batch:
  268. #logging.info('PortfolioMonitor:position. received position message contract=%s' % contract_key)
  269. self.process_position(account, contract_key, position, average_cost)
  270. else:
  271. # to be run once per a/c during start up
  272. # subscribe to automatic account updates
  273. if self.starting_engine:
  274. for acct in self.portfolios.keys():
  275. self.twsc.reqAccountUpdates(True, account)
  276. logging.info('PortfolioMonitor:position. subscribing to auto updates for ac: [%s]' % account)
  277. self.starting_engine = False
  278. '''
  279. the 4 account functions below are invoked by AbstractListener.update_portfolio_account.
  280. the original message from TWS is first wrapped into update_portfolio_account event in
  281. class TWS_event_handler and then expanded by AbstractListener.update_portfolio_account
  282. (check tws_event_hander)
  283. '''
  284. def updateAccountValue(self, event, key, value, currency, account): # key, value, currency, accountName):
  285. self.raw_dump(event, vars())
  286. def updatePortfolio(self, event, contract_key, position, market_price, market_value, average_cost, unrealized_PNL, realized_PNL, account):
  287. self.raw_dump(event, vars())
  288. self.process_position(account, contract_key, position, average_cost,
  289. {'market_price':market_price, 'market_value':market_value, 'unrealized_PNL': unrealized_PNL, 'realized_PNL': realized_PNL})
  290. def updateAccountTime(self, event, timestamp):
  291. self.raw_dump(event, vars())
  292. def accountDownloadEnd(self, event, account): # accountName):
  293. self.raw_dump(event, vars())
  294. def error(self, event, message_value):
  295. logging.info('PortfolioMonitor:%s. val->[%s]' % (event, message_value))
  296. def raw_dump(self, event, items):
  297. del(items['self'])
  298. logging.info('%s [[ %s ]]' % (event, items))
  299. def notify_table_model_changes(self, account, port, contract_key, mode):
  300. row = port.ckey_to_row(contract_key)
  301. rvs = port.get_values_at(row)
  302. #logging.info('---- %s' % str(rvs))
  303. port.fire_table_row_updated(row, rvs)
  304. event_type = AbstractTableModel.EVENT_TM_TABLE_ROW_UPDATED if mode == 'U' else AbstractTableModel.EVENT_TM_TABLE_ROW_INSERTED
  305. self.get_kproducer().send_message(event_type, json.dumps({'source': '%s' % port.get_object_name(), 'row': row, 'row_values': rvs}))
  306. # notify chart objects to do their thing...
  307. try:
  308. pcc = self.portfolio_charts[account]['PortfolioColumnChartTM']
  309. if self.starting_engine == False:
  310. if pcc.never_been_run == True:
  311. pcc.fire_table_structure_changed(AbstractTableModel.EVENT_TM_TABLE_STRUCTURE_CHANGED,
  312. pcc.get_object_name(), None, account, pcc.get_JSON())
  313. pcc.never_been_run = False
  314. logging.info('PortfolioMonitor:notify_table_model_changes. first time trigger columnchart %d' % pcc.get_last_tally())
  315. elif pcc.get_last_tally() <> pcc.count_tally():
  316. pcc.fire_table_structure_changed(AbstractTableModel.EVENT_TM_TABLE_STRUCTURE_CHANGED,
  317. pcc.get_object_name(), None, account, pcc.get_JSON())
  318. logging.info('PortfolioMonitor:notify_table_model_changes. tally count %d...dump json' % pcc.get_last_tally())
  319. pcc.update_tally_count()
  320. else:
  321. pcc.update_tally_count()
  322. logging.info('PortfolioMonitor:notify_table_model_changes. tally count %d' % pcc.get_last_tally())
  323. # if mode == 'I':
  324. # pcc.fire_table_structure_changed(AbstractTableModel.EVENT_TM_TABLE_STRUCTURE_CHANGED,
  325. # pcc.get_object_name(), None, account, pcc.get_JSON())
  326. # else:
  327. # row = pcc.ckey_to_row(contract_key)
  328. # rvs = pcc.get_values_at(row)
  329. # logging.info('PortfolioMonitor:notify_table_model_changes. PortfolioColumnChartTM %d' % row)
  330. #pcc.fire_table_row_updated(row, rvs)
  331. except: # KeyError:
  332. # object does not exist yet?
  333. # fields have no value causing computing errors? None objects?
  334. logging.error('**** Error PortfolioMonitor:notify_table_model_changes. %s' % traceback.format_exc() )
  335. # implment AbstractPortfolioTableModelListener
  336. # handle requests to get data table json
  337. def event_tm_request_table_structure(self, event, request_id, target_resource, account):
  338. try:
  339. if target_resource['class'] == 'Portfolio':
  340. self.get_kproducer().send_message(AbstractTableModel.EVENT_TM_TABLE_STRUCTURE_CHANGED,
  341. json.dumps({'source': self.portfolios[account].get_object_name(),
  342. 'origin_request_id': request_id, 'account': account,
  343. 'data_table_json': self.portfolios[account].get_JSON()}))
  344. except:
  345. logging.error("PortfolioMonitor:event_tm_request_table_structure. Error invoking get_JSON[%s]. Client request id:%s, %s" %
  346. (account, request_id, ', '.join(e for e in sys.exc_info())))
  347. def event_tm_table_structure_changed(self, event, source, origin_request_id, account, data_table_json):
  348. logging.info("[PortfolioColumnChartTM:] received %s content:[%s]" % (event, data_table_json) )
  349. if __name__ == '__main__':
  350. kwargs = {
  351. 'name': 'portfolio_monitor',
  352. 'bootstrap_host': 'localhost',
  353. 'bootstrap_port': 9092,
  354. 'redis_host': 'localhost',
  355. 'redis_port': 6379,
  356. 'redis_db': 0,
  357. 'tws_host': 'localhost',
  358. 'tws_api_port': 8496,
  359. 'tws_app_id': 38868,
  360. 'group_id': 'PM',
  361. 'session_timeout_ms': 10000,
  362. 'clear_offsets': False,
  363. 'logconfig': {'level': logging.INFO, 'filemode': 'w', 'filename': '/tmp/pm.log'},
  364. 'topics': ['position', 'positionEnd', 'tickPrice', 'update_portfolio_account', 'event_tm_request_table_structure', AbstractTableModel.EVENT_TM_TABLE_STRUCTURE_CHANGED],
  365. 'seek_to_end': ['*'],
  366. }
  367. usage = "usage: %prog [options]"
  368. parser = OptionParser(usage=usage)
  369. parser.add_option("-c", "--clear_offsets", action="store_true", dest="clear_offsets",
  370. help="delete all redis offsets used by this program")
  371. parser.add_option("-g", "--group_id",
  372. action="store", dest="group_id",
  373. help="assign group_id to this running instance")
  374. parser.add_option("-e", "--evaluation_date",
  375. action="store", dest="evaluation_date",
  376. help="specify evaluation date for option calculations")
  377. (options, args) = parser.parse_args()
  378. if options.evaluation_date == None:
  379. options.evaluation_date = time.strftime('%Y%m%d')
  380. for option, value in options.__dict__.iteritems():
  381. if value <> None:
  382. kwargs[option] = value
  383. logconfig = kwargs['logconfig']
  384. logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
  385. logging.basicConfig(**logconfig)
  386. server = PortfolioMonitor(kwargs)
  387. server.start_engine()