|
|
@@ -12,12 +12,13 @@ from finopt.instrument import Symbol, Option
|
|
|
from rethink.option_chain import OptionsChain
|
|
|
from rethink.tick_datastore import TickDataStore
|
|
|
from rethink.portfolio_item import PortfolioItem, PortfolioRules, Portfolio
|
|
|
+from rethink.table_model import AbstractTableModel, AbstractPortfolioTableModelListener
|
|
|
from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-class PortfolioMonitor(AbstractGatewayListener):
|
|
|
+class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListener):
|
|
|
|
|
|
|
|
|
def __init__(self, kwargs):
|
|
|
@@ -31,7 +32,7 @@ class PortfolioMonitor(AbstractGatewayListener):
|
|
|
|
|
|
|
|
|
|
|
|
- self.ws_server = None
|
|
|
+
|
|
|
'''
|
|
|
portfolios: {<account>: <portfolio>}
|
|
|
'''
|
|
|
@@ -87,10 +88,10 @@ class PortfolioMonitor(AbstractGatewayListener):
|
|
|
logging.info('PortfolioMonitor: Service shut down complete...')
|
|
|
|
|
|
|
|
|
- def kproducer(self):
|
|
|
+ def get_kproducer(self):
|
|
|
# returns a reference to the kafka base producer that we can
|
|
|
# use for sending messages
|
|
|
- return self.twsc.gw_message_handler()
|
|
|
+ return self.twsc.gw_message_handler
|
|
|
|
|
|
|
|
|
def get_portfolio(self, account):
|
|
|
@@ -334,13 +335,24 @@ class PortfolioMonitor(AbstractGatewayListener):
|
|
|
|
|
|
|
|
|
def notify_table_model_changes(self, account, port, contract_key, mode):
|
|
|
- row = port.ckey_to_row(contract_key)
|
|
|
- rvs = port.get_values_at(row)
|
|
|
- port.fire_table_row_updated(row, rvs)
|
|
|
- event_type = 'table_row_updated' if mode == 'U' else 'table_row_inserted'
|
|
|
- self.kproducer().send_message(event_type, json.dumps({'source': 'dt_port-%s' % account, 'row': row, 'row_values': rvs}))
|
|
|
-
|
|
|
+
|
|
|
+ row = port.ckey_to_row(contract_key)
|
|
|
+ rvs = port.get_values_at(row)
|
|
|
+ #logging.info('---- %s' % str(rvs))
|
|
|
+ port.fire_table_row_updated(row, rvs)
|
|
|
+ event_type = AbstractTableModel.EVENT_TM_TABLE_ROW_UPDATED if mode == 'U' else AbstractTableModel.EVENT_TM_TABLE_ROW_INSERTED
|
|
|
+ self.get_kproducer().send_message(event_type, json.dumps({'source': 'dt_port-%s' % account, 'row': row, 'row_values': rvs}))
|
|
|
|
|
|
+ # implment AbstractPortfolioTableModelListener
|
|
|
+ # handle requests to get data table json
|
|
|
+ def event_tm_request_table_structure(self, event, request_id, account):
|
|
|
+ try:
|
|
|
+ self.get_kproducer().send_message(AbstractTableModel.EVENT_TM_TABLE_STRUCTURE_CHANGED,
|
|
|
+ request_id, self.portfolios[account].get_JSON())
|
|
|
+ except:
|
|
|
+ logging.error("PortfolioMonitor:event_tm_request_table_structure. Error invoking get_JSON[%s]. Client request id:%s, %s" %
|
|
|
+ account, request_id, ', '.join(e for e in sys.exc_info()))
|
|
|
+
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
|
@@ -360,7 +372,7 @@ if __name__ == '__main__':
|
|
|
'session_timeout_ms': 10000,
|
|
|
'clear_offsets': False,
|
|
|
'logconfig': {'level': logging.INFO, 'filemode': 'w', 'filename': '/tmp/pm.log'},
|
|
|
- 'topics': ['position', 'positionEnd', 'tickPrice', 'update_portfolio_account'],
|
|
|
+ 'topics': ['position', 'positionEnd', 'tickPrice', 'update_portfolio_account', 'event_tm_request_table_structure'],
|
|
|
'seek_to_end': ['*'],
|
|
|
'ws_port': 9001,
|
|
|
|