|
|
@@ -6,31 +6,19 @@ import time, datetime
|
|
|
import copy
|
|
|
from optparse import OptionParser
|
|
|
from time import sleep
|
|
|
-from misc2.observer import Subscriber
|
|
|
+from misc2.observer import Subscriber, Publisher
|
|
|
from misc2.helpers import ContractHelper
|
|
|
from finopt.instrument import Symbol, Option
|
|
|
from rethink.option_chain import OptionsChain
|
|
|
from rethink.tick_datastore import TickDataStore
|
|
|
-from rethink.portfolio_item import PortfolioItem, PortfolioRules
|
|
|
+from rethink.portfolio_item import PortfolioItem, PortfolioRules, Portfolio
|
|
|
from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
|
|
|
-from numpy import average
|
|
|
+from ws.ws_server import WebSocketServerWrapper
|
|
|
|
|
|
|
|
|
-
|
|
|
-
|
|
|
|
|
|
class PortfolioMonitor(AbstractGatewayListener):
|
|
|
|
|
|
-
|
|
|
- '''
|
|
|
- portfolios :
|
|
|
- {
|
|
|
- <account_id>: {'port_items': {<contract_key>, PortItem}, 'opt_chains': {<oc_id>: option_chain},
|
|
|
- 'g_table':{'rows':{...} , 'cols':{...}, 'ckey_to_row_index':{'row_id':<row_id>, 'dirty': <true/false>, 'count':0}
|
|
|
- }
|
|
|
-
|
|
|
- '''
|
|
|
-
|
|
|
|
|
|
def __init__(self, kwargs):
|
|
|
self.kwargs = copy.copy(kwargs)
|
|
|
@@ -41,6 +29,9 @@ class PortfolioMonitor(AbstractGatewayListener):
|
|
|
self.tds.register_listener(self)
|
|
|
self.twsc.add_listener_topics(self, kwargs['topics'])
|
|
|
|
|
|
+ '''
|
|
|
+ portfolios: {<account>: <portfolio>}
|
|
|
+ '''
|
|
|
self.portfolios = {}
|
|
|
|
|
|
|
|
|
@@ -68,63 +59,43 @@ class PortfolioMonitor(AbstractGatewayListener):
|
|
|
if selection =='1':
|
|
|
self.twsc.reqPositions()
|
|
|
elif selection == '2':
|
|
|
- for acct in self.portfolios.keys():
|
|
|
- #print self.dump_portfolio(acct)
|
|
|
- print self.portfolios[acct]['g_table']
|
|
|
+ for port in self.portfolios.values():
|
|
|
+ print port.dump_portfolio()
|
|
|
+ #print self.portfolios[acct]['g_table']
|
|
|
elif selection == '3':
|
|
|
print self.tds.dump()
|
|
|
elif selection == '4':
|
|
|
for acct in self.portfolios.keys():
|
|
|
self.twsc.reqAccountUpdates(True, acct)
|
|
|
elif selection == '5':
|
|
|
- for acct in self.portfolios.keys():
|
|
|
- print self.g_datatable_json(acct)
|
|
|
+ for port in self.portfolios.values():
|
|
|
+ print port.get_JSON()
|
|
|
elif selection == '9':
|
|
|
self.twsc.gw_message_handler.set_stop()
|
|
|
break
|
|
|
else:
|
|
|
pass
|
|
|
|
|
|
- sleep(0.15)
|
|
|
+ sleep(0.08)
|
|
|
|
|
|
except (KeyboardInterrupt, SystemExit):
|
|
|
logging.error('PortfolioMonitor: caught user interrupt. Shutting down...')
|
|
|
self.twsc.gw_message_handler.set_stop()
|
|
|
logging.info('PortfolioMonitor: Service shut down complete...')
|
|
|
|
|
|
- def is_contract_in_portfolio(self, account, contract_key):
|
|
|
- return self.get_portfolio_port_item(account, contract_key)
|
|
|
-
|
|
|
- def get_portfolio_port_item(self, account, contract_key):
|
|
|
- try:
|
|
|
- return self.portfolios[account]['port_items'][contract_key]
|
|
|
- except KeyError:
|
|
|
- return None
|
|
|
-
|
|
|
- def set_portfolio_port_item(self, account, contract_key, port_item):
|
|
|
- self.portfolios[account]['port_items'][contract_key] = port_item
|
|
|
|
|
|
-
|
|
|
- def create_empty_portfolio(self, account):
|
|
|
- port = self.portfolios[account] = {}
|
|
|
- self.portfolios[account]['port_items']= {}
|
|
|
- self.portfolios[account]['opt_chains']= {}
|
|
|
- self.portfolios[account]['g_table']= {'ckey_to_row_index': {'count':0}}
|
|
|
- return port
|
|
|
|
|
|
def get_portfolio(self, account):
|
|
|
try:
|
|
|
return self.portfolios[account]
|
|
|
except KeyError:
|
|
|
- self.portfolios[account] = self.create_empty_portfolio(account)
|
|
|
+ self.portfolios[account] = Portfolio(account)
|
|
|
return self.portfolios[account]
|
|
|
|
|
|
def deduce_option_underlying(self, option):
|
|
|
'''
|
|
|
given an Option object, return the underlying Symbol object
|
|
|
'''
|
|
|
-
|
|
|
-
|
|
|
try:
|
|
|
symbol_id = option.get_contract().m_symbol
|
|
|
underlying_sectype = PortfolioRules.rule_map['symbol'][symbol_id]
|
|
|
@@ -141,13 +112,6 @@ class PortfolioMonitor(AbstractGatewayListener):
|
|
|
return None
|
|
|
|
|
|
|
|
|
- def is_oc_in_portfolio(self, account, oc_id):
|
|
|
- try:
|
|
|
- return self.portfolios[account]['opt_chains'][oc_id]
|
|
|
- except KeyError:
|
|
|
- return None
|
|
|
-
|
|
|
-
|
|
|
def get_portfolio_option_chain(self, account, underlying):
|
|
|
|
|
|
|
|
|
@@ -157,7 +121,7 @@ class PortfolioMonitor(AbstractGatewayListener):
|
|
|
underlying_id = underlying.get_contract().m_symbol
|
|
|
month = underlying.get_contract().m_expiry
|
|
|
oc_id = create_oc_id(account, underlying_id, month)
|
|
|
- oc = self.is_oc_in_portfolio(account, oc_id)
|
|
|
+ oc = self.portfolios[account].is_oc_in_portfolio(oc_id)
|
|
|
if oc == None:
|
|
|
oc = OptionsChain(oc_id)
|
|
|
oc.register_listener(self)
|
|
|
@@ -169,35 +133,30 @@ class PortfolioMonitor(AbstractGatewayListener):
|
|
|
month,
|
|
|
PortfolioRules.rule_map['option_structure'][underlying_id]['trade_vol'])
|
|
|
|
|
|
- self.portfolios[account]['opt_chains'][oc_id] = oc
|
|
|
+ self.portfolios[account].set_option_chain(oc_id, oc)
|
|
|
|
|
|
|
|
|
return oc
|
|
|
|
|
|
- def mark_gtable_row_dirty(self, account, contract_key, dirty=True):
|
|
|
- self.portfolios[account]['g_table'][contract_key]['dirty'] = dirty
|
|
|
- self.portfolios[account]['g_table'][contract_key]['count'] += 1
|
|
|
- return self.portfolios[account]['g_table'][contract_key]['row_id']
|
|
|
+
|
|
|
|
|
|
def process_position(self, account, contract_key, position, average_cost, extra_info=None):
|
|
|
|
|
|
# obtain a reference to the portfolio, if not exist create a new one
|
|
|
port = self.get_portfolio(account)
|
|
|
- port_item = self.is_contract_in_portfolio(account, contract_key)
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
+ port_item = port.is_contract_in_portfolio(contract_key)
|
|
|
if port_item:
|
|
|
# update the values and recalculate p/l
|
|
|
port_item.update_position(position, average_cost, extra_info)
|
|
|
port_item.calculate_pl(contract_key)
|
|
|
|
|
|
- # update the affected row in gtable as changed
|
|
|
- self.mark_gtable_row_dirty(True)
|
|
|
+ port.fire_table_row_updated(port.ckey_to_row(contract_key))
|
|
|
+ logging.info('PortfolioMonitor:process_position. Position updated: %s:[%d]' % (contract_key, port.ckey_to_row(contract_key)))
|
|
|
# new position
|
|
|
else:
|
|
|
port_item = PortfolioItem(account, contract_key, position, average_cost)
|
|
|
- port['port_items'][contract_key] = port_item
|
|
|
+ #port['port_items'][contract_key] = port_item
|
|
|
+ port.set_portfolio_port_item(contract_key, port_item)
|
|
|
instrument = port_item.get_instrument()
|
|
|
self.tds.add_symbol(instrument)
|
|
|
self.twsc.reqMktData(instrument.get_contract(), True)
|
|
|
@@ -211,93 +170,20 @@ class PortfolioMonitor(AbstractGatewayListener):
|
|
|
underlying = self.deduce_option_underlying(instrument)
|
|
|
if underlying:
|
|
|
oc = self.get_portfolio_option_chain(account, underlying)
|
|
|
-
|
|
|
instrument.set_extra_attributes(OptionsChain.CHAIN_IDENTIFIER, oc.get_name())
|
|
|
oc.add_option(instrument)
|
|
|
-
|
|
|
-
|
|
|
else:
|
|
|
logging.error('PortfolioMonitor:process_position. **** Error in adding the new position %s' % contract_key)
|
|
|
- # non options. stocks, futures that is...
|
|
|
- else:
|
|
|
- logging.info('PortfolioMonitor:process_position. Adding a new non-option position into the portfolio [%s]' % port_item.dump())
|
|
|
- port['port_items'][contract_key] = port_item
|
|
|
+
|
|
|
|
|
|
- self.dump_portfolio(account)
|
|
|
+ port.fire_table_row_inserted(port.ckey_to_row(contract_key))
|
|
|
+ logging.info('PortfolioMonitor:process_position. New position: %s:[%d]' % (contract_key, port.ckey_to_row(contract_key)))
|
|
|
+ port.dump_portfolio()
|
|
|
+
|
|
|
+
|
|
|
|
|
|
|
|
|
-
|
|
|
- def dump_portfolio(self, account):
|
|
|
- #<account_id>: {'port_items': {<contract_key>, instrument}, 'opt_chains': {<oc_id>: option_chain}}
|
|
|
-
|
|
|
- def print_port_items(x):
|
|
|
- return '[%s]: %s %s' % (x[0], ', '.join('%s: %s' % (k,str(v)) for k, v in x[1].get_port_fields().iteritems()),
|
|
|
- ', '.join('%s: %s' % (k,str(v)) for k, v in x[1].get_instrument().get_tick_values().iteritems()))
|
|
|
-
|
|
|
- p_items = map(print_port_items, [x for x in self.portfolios[account]['port_items'].iteritems()])
|
|
|
- logging.info('PortfolioMonitor:dump_portfolio %s' % ('\n'.join(p_items)))
|
|
|
- return '\n'.join(p_items)
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- def g_datatable_json(self, account):
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- dtj = {'cols':[], 'rows':[], 'ckey_to_row_index':{}}
|
|
|
- header = [('symbol', 'Symbol', 'string'), ('right', 'Right', 'string'), ('avgcost', 'Avg Cost', 'number'), ('market_value', 'Market Value', 'number'),
|
|
|
- ('avgpx', 'Avg Price', 'number'), ('spotpx', 'Spot Price', 'number'), ('pos', 'Quantity', 'number'),
|
|
|
- ('delta', 'Delta', 'number'), ('theta', 'Theta', 'number'), ('gamma', 'Gamma', 'number'),
|
|
|
- ('pos_delta', 'P. Delta', 'number'), ('pos_theta', 'P. Theta', 'number'), ('pos_gamma', 'P. Gamma', 'number'),
|
|
|
- ('unreal_pl', 'Unreal P/L', 'number'), ('percent_gain_loss', '% gain/loss', 'number')
|
|
|
- ]
|
|
|
- # header fields
|
|
|
- map(lambda hf: dtj['cols'].append({'id': hf[0], 'label': hf[1], 'type': hf[2]}), header)
|
|
|
-
|
|
|
-
|
|
|
- def get_spot_px(x):
|
|
|
- px = float('nan')
|
|
|
- if x.get_quantity() > 0:
|
|
|
- px= x.get_instrument().get_tick_value(Symbol.BID)
|
|
|
- elif x.get_quantity() < 0:
|
|
|
- px= x.get_instrument().get_tick_value(Symbol.ASK)
|
|
|
- if px == -1:
|
|
|
- return x.get_instrument().get_tick_value(Symbol.LAST)
|
|
|
-
|
|
|
- # table rows
|
|
|
- def row_fields(x):
|
|
|
|
|
|
- rf = [{'v': '%s-%s-%s' % (x[1].get_symbol_id(), x[1].get_expiry(), x[1].get_strike())},
|
|
|
- {'v': x[1].get_right()},
|
|
|
- {'v': x[1].get_port_field(PortfolioItem.AVERAGE_COST)},
|
|
|
- {'v': x[1].get_port_field(PortfolioItem.MARKET_VALUE)},
|
|
|
- {'v': x[1].get_port_field(PortfolioItem.AVERAGE_PRICE)},
|
|
|
- {'v': get_spot_px(x[1])},
|
|
|
- {'v': x[1].get_quantity()},
|
|
|
- {'v': x[1].get_instrument().get_tick_value(Option.DELTA)},
|
|
|
- {'v': x[1].get_instrument().get_tick_value(Option.THETA)},
|
|
|
- {'v': x[1].get_instrument().get_tick_value(Option.GAMMA)},
|
|
|
- {'v': x[1].get_port_field(PortfolioItem.POSITION_DELTA)},
|
|
|
- {'v': x[1].get_port_field(PortfolioItem.POSITION_THETA)},
|
|
|
- {'v': x[1].get_port_field(PortfolioItem.POSITION_GAMMA)},
|
|
|
- {'v': x[1].get_port_field(PortfolioItem.UNREAL_PL)},
|
|
|
- {'v': x[1].get_port_field(PortfolioItem.PERCENT_GAIN_LOSS)}]
|
|
|
-
|
|
|
-
|
|
|
- return rf
|
|
|
-
|
|
|
- def set_contract_key_to_row_index(i):
|
|
|
- dtj['ckey_to_row_index'][p2_items[i].get_instrument().get_contract_key()]['row_id'] = i
|
|
|
- dtj['ckey_to_row_index'][p2_items[i].get_instrument().get_contract_key()]['dirty'] = False
|
|
|
-
|
|
|
- p_items = sorted([x for x in self.portfolios[account]['port_items'].iteritems()])
|
|
|
- p1_items = filter(lambda x: x[1].get_symbol_id() in self.kwargs['interested_position_types']['symbol'], p_items)
|
|
|
- p2_items = filter(lambda x: x[1].get_instrument_type() in self.kwargs['interested_position_types']['instrument_type'], p1_items)
|
|
|
- map(lambda p: dtj['rows'].append({'c': row_fields(p)}), p2_items)
|
|
|
- map(set_contract_key_to_row_index, range(len(p2_items)))
|
|
|
-
|
|
|
- return json.dumps(dtj) #, indent=4)
|
|
|
|
|
|
# EVENT_OPTION_UPDATED = 'oc_option_updated'
|
|
|
# EVENT_UNDERLYING_ADDED = 'oc_underlying_added
|
|
|
@@ -313,16 +199,9 @@ class PortfolioMonitor(AbstractGatewayListener):
|
|
|
self.tds.add_symbol(instrument)
|
|
|
self.twsc.reqMktData(instrument.get_contract(), True)
|
|
|
|
|
|
- #
|
|
|
- # tds call backs
|
|
|
- #
|
|
|
- #
|
|
|
- # EVENT_TICK_UPDATED = 'tds_event_tick_updated'
|
|
|
- # EVENT_SYMBOL_ADDED = 'tds_event_symbol_added'
|
|
|
- # EVENT_SYMBOL_DELETED = 'tds_event_symbol_deleted'
|
|
|
|
|
|
def tds_event_symbol_added(self, event, update_mode, name, instrument):
|
|
|
- pass
|
|
|
+ pass
|
|
|
#logging.info('tds_event_new_symbol_added. %s' % ContractHelper.object2kvstring(symbol.get_contract()))
|
|
|
|
|
|
|
|
|
@@ -337,13 +216,16 @@ class PortfolioMonitor(AbstractGatewayListener):
|
|
|
|
|
|
for acct in self.portfolios:
|
|
|
|
|
|
- if chain_id in self.portfolios[acct]['opt_chains'].keys():
|
|
|
+ #if chain_id in self.portfolios[acct]['opt_chains'].keys():
|
|
|
+ if chain_id in self.portfolios[acct].get_option_chains():
|
|
|
#logging.info('PortfolioMonitor:tds_event_tick_updated --> portfolio opt_chains: [ %s ] ' %
|
|
|
# str(self.portfolios[acct]['opt_chains'].keys()))
|
|
|
if 'FUT' in contract_key or 'STK' in contract_key:
|
|
|
- results = self.portfolios[acct]['opt_chains'][chain_id].cal_greeks_in_chain(self.kwargs['evaluation_date'])
|
|
|
+ #results = self.portfolios[acct]['opt_chains'][chain_id].cal_greeks_in_chain(self.kwargs['evaluation_date'])
|
|
|
+ results = self.portfolios[acct].get_option_chain(chain_id).cal_greeks_in_chain(self.kwargs['evaluation_date'])
|
|
|
else:
|
|
|
- results[ContractHelper.makeRedisKeyEx(s.get_contract())] = self.portfolios[acct]['opt_chains'][chain_id].cal_option_greeks(s, self.kwargs['evaluation_date'])
|
|
|
+ #results[ContractHelper.makeRedisKeyEx(s.get_contract())] = self.portfolios[acct]['opt_chains'][chain_id].cal_option_greeks(s, self.kwargs['evaluation_date'])
|
|
|
+ results[ContractHelper.makeRedisKeyEx(s.get_contract())] = self.portfolios[acct].get_option_chain(chain_id).cal_option_greeks(s, self.kwargs['evaluation_date'])
|
|
|
#logging.info('PortfolioMonitor:tds_event_tick_updated. compute greek results %s' % results)
|
|
|
|
|
|
#underlying_px = self.portfolios[acct]['opt_chains'][chain_id].get_underlying().get_tick_value(4)
|
|
|
@@ -356,15 +238,14 @@ class PortfolioMonitor(AbstractGatewayListener):
|
|
|
self.tds.set_symbol_analytics(key_greeks[0], Option.THETA, key_greeks[1][Option.THETA])
|
|
|
self.tds.set_symbol_analytics(key_greeks[0], Option.VEGA, key_greeks[1][Option.VEGA])
|
|
|
|
|
|
- if contract_key in self.portfolios[acct]['port_items']:
|
|
|
- self.portfolios[acct]['port_items'][contract_key].calculate_pl(key_greeks[0]) #, underlying_px)
|
|
|
+ #if contract_key in self.portfolios[acct]['port_items']:
|
|
|
+ if self.portfolios[acct].is_contract_in_portfolio(contract_key):
|
|
|
+ #self.portfolios[acct]['port_items'][contract_key].calculate_pl(key_greeks[0]) #, underlying_px)
|
|
|
+ self.portfolios[acct].calculate_item_pl(contract_key)
|
|
|
|
|
|
- # dispatch pm_event to listeners
|
|
|
- self.mark_gtable_row_dirty(acct, contract_key, True)
|
|
|
- logging.info('PortfolioMonitor:tds_event_tick_updated...marking the affected row %d:[%s] as dirty' %
|
|
|
- (self.portfolios[acct]['g_table']['dtj']['ckey_to_row_index']['row_id'], contract_key))
|
|
|
|
|
|
-
|
|
|
+ self.portfolios[acct].fire_table_row_updated(self.portfolios[acct].ckey_to_row(contract_key))
|
|
|
+ logging.info('PortfolioMonitor:tds_event_tick_updated. Position updated: %s:[%d]' % (contract_key, self.portfolios[acct].ckey_to_row(contract_key)))
|
|
|
if results:
|
|
|
#logging.info('PortfolioMonitor:tds_event_tick_updated ....before map')
|
|
|
map(update_portfolio_fields, list(results.iteritems()))
|
|
|
@@ -382,9 +263,6 @@ class PortfolioMonitor(AbstractGatewayListener):
|
|
|
def tds_event_symbol_deleted(self, event, update_mode, name, instrument):
|
|
|
pass
|
|
|
|
|
|
- #
|
|
|
- # gateway events
|
|
|
- #
|
|
|
|
|
|
def tickPrice(self, event, contract_key, field, price, canAutoExecute):
|
|
|
self.tds.set_symbol_tick_price(contract_key, field, price, canAutoExecute)
|
|
|
@@ -405,11 +283,10 @@ class PortfolioMonitor(AbstractGatewayListener):
|
|
|
# subscribe to automatic account updates
|
|
|
if self.starting_engine:
|
|
|
for acct in self.portfolios.keys():
|
|
|
- self.portfolios[acct]['g_table'] = self.g_datatable_json(acct)
|
|
|
- logging.info('PortfolioMonitor:position. generate gtable for ac: [%s]' % acct)
|
|
|
self.twsc.reqAccountUpdates(True, acct)
|
|
|
logging.info('PortfolioMonitor:position. subscribing to auto updates for ac: [%s]' % acct)
|
|
|
self.starting_engine = False
|
|
|
+ pass
|
|
|
|
|
|
'''
|
|
|
the 4 account functions below are invoked by AbstractListener.update_portfolio_account.
|
|
|
@@ -417,10 +294,7 @@ class PortfolioMonitor(AbstractGatewayListener):
|
|
|
class TWS_event_handler and then expanded by AbstractListener.update_portfolio_account
|
|
|
(check tws_event_hander)
|
|
|
'''
|
|
|
- def raw_dump(self, event, items):
|
|
|
- del(items['self'])
|
|
|
- logging.info('%s [[ %s ]]' % (event, items))
|
|
|
-
|
|
|
+
|
|
|
|
|
|
def updateAccountValue(self, event, key, value, currency, account): # key, value, currency, accountName):
|
|
|
self.raw_dump(event, vars())
|
|
|
@@ -441,6 +315,10 @@ class PortfolioMonitor(AbstractGatewayListener):
|
|
|
def error(self, event, message_value):
|
|
|
logging.info('PortfolioMonitor:%s. val->[%s]' % (event, message_value))
|
|
|
|
|
|
+ def raw_dump(self, event, items):
|
|
|
+ del(items['self'])
|
|
|
+ logging.info('%s [[ %s ]]' % (event, items))
|
|
|
+
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
|
@@ -461,9 +339,7 @@ if __name__ == '__main__':
|
|
|
'clear_offsets': False,
|
|
|
'logconfig': {'level': logging.INFO, 'filemode': 'w', 'filename': '/tmp/pm.log'},
|
|
|
'topics': ['position', 'positionEnd', 'tickPrice', 'update_portfolio_account'],
|
|
|
- 'seek_to_end': ['*'],
|
|
|
- 'interested_position_types': {'symbol': ['HSI', 'MHI'], 'instrument_type': ['OPT', 'FUT']}
|
|
|
-
|
|
|
+ 'seek_to_end': ['*']
|
|
|
|
|
|
}
|
|
|
|