Browse Source

CNY updates

compute potential gain
add new message to handle broadcast of portfolio values
handle zero positions
skip processing non options/futures instruments position messages
laxaurus 7 years ago
parent
commit
aa29b191d0

BIN
src/docs/finopt msg flow analysis.ods


+ 56 - 2
src/finopt/instrument.py

@@ -2,6 +2,59 @@ import logging
 from ib.ext.Contract import Contract
 from misc2.helpers import ContractHelper, dict2str
 
+class InstrumentIdMap():
+    
+    idmap = {0: 'BSIZE',
+                    1: 'BID',
+                    2: 'ASK',
+                    3: 'ASIZE',
+                    4: 'LAST',
+                    5: 'LSIZE',
+                    6: 'HIGH',
+                    7: 'LOW',
+                    8: 'VOLUM',
+                    9: 'CLOSE',
+                    14: 'OPEN',
+                    5001: 'IVOL',
+                    5002: 'DELTA',
+                    5003: 'GAMA',
+                    5004: 'THETA',
+                    5005: 'VEGA',
+                    5006: 'PREMIUM',
+                    7001: 'POS',
+                    7002: 'AVGCST',
+                    7003: 'PDLTA',
+                    7004: 'PTHTA',
+                    7009: 'GAMA%',
+                    7005: 'P/L',
+                    7006: '%GAIN',
+                    7007: 'AVGPX',
+                    7008: 'MKTVAL',
+                    7041: 'PTGAIN',
+                    9000: 'TDLTA',
+                    9001: 'TDL_F',
+                    9002: 'TDL_C',
+                    9003: 'TDL_P',
+                    9010: 'TTHEA',
+                    9012: 'TTHEAC',
+                    9013: 'TTHEAP',
+                    9020: 'TGAM%',
+                    9031: 'NUMC',
+                    9032: 'NUMP',
+                    9040: 'T_UNRPL',
+                    9041: 'TPTGAIN'
+    
+                    
+                    }
+    
+    @staticmethod
+    def id2str(id):
+        try:
+            
+            return InstrumentIdMap.idmap[id]
+        except:
+            return str(id)
+
 class Symbol():
     key = None
     
@@ -20,8 +73,9 @@ class Symbol():
     LASTSIZE = 5
     CLOSE = 9
     VOLUME = 8
-    
-    
+    HIGH = 6
+    LOW = 7
+    OPEN_TICK =14
     
     
     def __init__(self, contract):

+ 36 - 6
src/html/client_g.html

@@ -8,6 +8,9 @@
 </head>
 
 <body>
+  <input type="radio" name='acct' id="acct" value="U9050568" checked="checked"> U9050568<br>
+  <input type="radio" name='acct' id="acct" value="U8080985"> U8080985<br>
+   <button id="reload_port">Reload Portfolio</button>
    <div id="table_div" style="height:600"></div>
    <div id="chart_div"></div> 
    <button id="change-btn">change columns</button>
@@ -22,6 +25,13 @@
 	$('#test-btn').click(function(){
 		alert('hereee');
 	});  
+	$('#reload_port').click(function(){
+		alert('Reloading portfolio...');
+		if (ws != null){
+			ws.close();
+			location.reload();
+		}
+	});  
 
     var chartOptions = {
             width: 800,
@@ -38,7 +48,7 @@
 	var columnChart = {'view': null, 'data': null, 'chart': null, 'options': chartOptions};
 
     var tableOptions = {allowHtml: true, sortColumn:1, 
-    		showRowNumber: true, width: '100%', height: '100%'};
+    		showRowNumber: true, width: '90%', height: '80%'};
 	var tableChart = {'view': null, 'data': null, 'chart': null, 'options': tableOptions};
 	
 
@@ -92,16 +102,21 @@
 	   
 
     var ws;
-    var account = 'U8379890';
+    var account = $('input[id=acct]:checked').val();
+    //var account = 'U8080985';
 	
     function init() {
 
       // Connect to Web Socket
-      ws = new WebSocket("ws://vorsprung:9001/");
+      ws = new WebSocket("ws://localhost:9001/");
    
       // Set event handlers.
       ws.onopen = function() {
-        output("onopen");
+    	var account = $('input[id=acct]:checked').val();
+    	  
+    	  
+        output("onopen account = " + account );
+        
         ws.send(JSON.stringify({event: 'event_tm_request_table_structure', target_resource:{'class': 'Portfolio'}, 'account': account}));
         ws.send(JSON.stringify({event: 'event_tm_request_table_structure', target_resource:{'class': 'PortfolioColumnChartTM'}, 'account': account}));
         
@@ -113,9 +128,19 @@
         //output("onmessage: " + e.data);
         d1=  JSON.parse(e.data);
 		console.log(d1);
-        
+
+		
+    	// 2019.1 if the messageis not intended 
+	if (d1.event == 'event_tm_table_structure_changed' && d1.account != account) 
+		return;
+	else if (d1.event == 'event_tm_table_row_updated' && d1['value']['source']['account'] != account) 
+		return;
+		
+		
         if (d1.event == "event_tm_table_structure_changed"){
 
+        	
+        	
 			if (d1.source.class == 'Portfolio'){
 				tableChart.data = new google.visualization.DataTable(d1.value);
 				tableChart.view = new google.visualization.DataView(tableChart.data);
@@ -137,6 +162,7 @@
 				
 			} else if (d1.source.class == 'PortfolioColumnChartTM'){
 				columnChart.data = new google.visualization.DataTable(d1.value);
+				console.log(d1.account);
 				columnChart.view = new google.visualization.DataView(columnChart.data);
 				columnChart.chart = new google.visualization.ColumnChart(
 				        document.getElementById('chart_div'));
@@ -164,6 +190,9 @@
         	data.addRow(newRow);
         	options.sortColumn = table.getSortInfo().column;
         	table.draw(view, options);
+        } else if (d1.event == 'event_port_values_updated'){
+        	
+        	console.log(d1.value.port_values);
         }
         
       };
@@ -191,6 +220,7 @@
     }
     
     function onCloseClick() {
+     
       ws.close();
     }
     
@@ -203,4 +233,4 @@
 	 
   </script>
 </body>
-</html>
+</html>

+ 86 - 17
src/rethink/portfolio_item.py

@@ -8,11 +8,12 @@ import copy
 from optparse import OptionParser
 from time import sleep
 from misc2.helpers import ContractHelper
-from finopt.instrument import Symbol, Option
+from finopt.instrument import Symbol, Option, InstrumentIdMap
 from rethink.option_chain import OptionsChain
 from rethink.tick_datastore import TickDataStore
 from numpy import average
 from rethink.table_model import AbstractTableModel
+import pandas as pd
 
 
 
@@ -43,6 +44,7 @@ class PortfolioItem():
     PERCENT_GAIN_LOSS = 7006
     AVERAGE_PRICE = 7007
     MARKET_VALUE = 7008
+    POTENTIAL_GAIN = 7041
     
         
     def __init__(self, account, contract_key, position, average_cost):
@@ -56,7 +58,8 @@ class PortfolioItem():
                             PortfolioItem.UNREAL_PL: float('nan'),
                             PortfolioItem.PERCENT_GAIN_LOSS: float('nan'),
                             PortfolioItem.AVERAGE_PRICE: float('nan'),
-                            PortfolioItem.MARKET_VALUE: float('nan')
+                            PortfolioItem.MARKET_VALUE: float('nan'),
+                            PortfolioItem.POTENTIAL_GAIN: float('nan')
                             
                             }
         
@@ -135,42 +138,67 @@ class PortfolioItem():
         try:
             assert contract_key == self.contract_key
             spot_px = self.instrument.get_tick_value(4)
+            qty = self.get_quantity()
+            if qty == 0:
+                self.set_port_field(PortfolioItem.POSITION_DELTA, 0.0)
+                self.set_port_field(PortfolioItem.POSITION_THETA, 0.0)
+                self.set_port_field(PortfolioItem.GAMMA_PERCENT, 0.0)
+                self.set_port_field(PortfolioItem.UNREAL_PL, 0.0)
+                self.set_port_field(PortfolioItem.AVERAGE_PRICE, 0.0)
+                self.set_port_field(PortfolioItem.PERCENT_GAIN_LOSS, 0.0)
+                self.set_port_field(PortfolioItem.POTENTIAL_GAIN, 0.0)
+                return
+                
+            
+            
+            potential_gain = float('nan')
             if self.get_instrument_type() == 'OPT':
                 #spot_px = self.instrument.get_tick_value(4)
                 multiplier =  PortfolioRules.rule_map['option_structure'][self.get_symbol_id()]['multiplier']
                 
-                pos_delta = self.get_quantity() * self.instrument.get_tick_value(Option.DELTA) * multiplier                                
-                pos_theta = self.get_quantity() * self.instrument.get_tick_value(Option.THETA) * multiplier
+                pos_delta = qty * self.instrument.get_tick_value(Option.DELTA) * multiplier                                
+                pos_theta = qty * self.instrument.get_tick_value(Option.THETA) * multiplier
                 gamma_percent = pos_delta * (1 + self.instrument.get_tick_value(Option.GAMMA))                               
 
                 #(spot premium * multiplier - avgcost) * pos)
                 try:
-                    unreal_pl = (spot_px * multiplier - self.get_average_cost()) * self.get_quantity()
+                    
+                    unreal_pl = (spot_px * multiplier - self.get_average_cost()) * qty
                     #print "%f %f %d" % (spot_px, self.get_average_cost(), multiplier)
                     percent_gain_loss = (1 - spot_px / (self.get_average_cost() / multiplier)) * 100 \
-                                            if self.get_quantity() < 0 else \
+                                            if qty < 0 else \
                                             (spot_px - self.get_average_cost() / multiplier) / (self.get_average_cost() / multiplier) * 100 
                                         
-                    average_px = self.get_average_cost() / multiplier                    
+                    average_px = self.get_average_cost() / multiplier
+                    
+                    # added logic to cal potential gain
+                    if qty < 0:
+                        market_value= spot_px * qty * multiplier
+                        potential_gain = market_value - unreal_pl * (1.0 if unreal_pl < 0 else 0)
+                        
                 except ZeroDivisionError, TypeError:
                     # caught error for cases where get_average_cost and quantity may be None
                     unreal_pl = float('nan')
                     percent_gain_loss = float('nan')
                     average_px = float('nan')
                             
-            else:
+            elif self.get_instrument_type() == 'FUT':
                 multiplier =  PortfolioRules.rule_map['option_structure'][self.get_symbol_id()]['multiplier']
-                pos_delta = self.get_quantity() * 1.0 * multiplier
+                pos_delta = qty * 1.0 * multiplier
                                 
                 pos_theta = 0
                 gamma_percent = 0
 
                 # (S - X) * pos * multiplier
-                unreal_pl = (spot_px * multiplier - self.get_average_cost() ) * self.get_quantity() 
+                unreal_pl = (spot_px * multiplier - self.get_average_cost() ) * qty 
                                
                 #sign = abs(self.get_quantity()) / self.get_quantity()                                
                 percent_gain_loss = unreal_pl / self.get_average_cost() * 100
                 average_px = self.get_average_cost() / multiplier
+            
+            # not option nor futures, just skip and do nothing
+            else: 
+                return
                         
             self.set_port_field(PortfolioItem.POSITION_DELTA, pos_delta)
             self.set_port_field(PortfolioItem.POSITION_THETA, pos_theta)
@@ -178,6 +206,7 @@ class PortfolioItem():
             self.set_port_field(PortfolioItem.UNREAL_PL, unreal_pl)
             self.set_port_field(PortfolioItem.AVERAGE_PRICE, average_px)
             self.set_port_field(PortfolioItem.PERCENT_GAIN_LOSS, percent_gain_loss)
+            self.set_port_field(PortfolioItem.POTENTIAL_GAIN, potential_gain)
             
         except Exception, err:
             
@@ -226,6 +255,7 @@ class Portfolio(AbstractTableModel):
     NUM_CALLS       = 9031
     NUM_PUTS       = 9032
     TOTAL_GAIN_LOSS = 9040
+    TOTAL_POTENTIAL_GAIN = 9041
     
      
     
@@ -249,6 +279,9 @@ class Portfolio(AbstractTableModel):
     def get_portfolio_port_items(self):
             return self.port['port_items']
         
+    def get_potfolio_values(self):
+            return self.port['port_v']
+        
     def create_empty_portfolio(self):
         self.port = {}
         self.port['port_items']=  {}
@@ -305,7 +338,8 @@ class Portfolio(AbstractTableModel):
               Portfolio.TOTAL_GAMMA_PERCENT : 0.0,
               Portfolio.NUM_CALLS       : 0,
               Portfolio.NUM_PUTS       : 0,
-              Portfolio.TOTAL_GAIN_LOSS : 0.0
+              Portfolio.TOTAL_GAIN_LOSS : 0.0,
+              Portfolio.TOTAL_POTENTIAL_GAIN: 0.0,
               
             } 
         def cal_port(x_tuple):
@@ -331,6 +365,7 @@ class Portfolio(AbstractTableModel):
             port_v[Portfolio.TOTAL_DELTA] += x.get_port_field(PortfolioItem.POSITION_DELTA)
             port_v[Portfolio.TOTAL_THETA] += x.get_port_field(PortfolioItem.POSITION_THETA)
             port_v[Portfolio.TOTAL_GAIN_LOSS] += x.get_port_field(PortfolioItem.UNREAL_PL)
+            port_v[Portfolio.TOTAL_POTENTIAL_GAIN] += x.get_port_field(PortfolioItem.POTENTIAL_GAIN)
             try:
                 port_v[Portfolio.TOTAL_GAMMA_PERCENT] += x.get_port_field(PortfolioItem.GAMMA_PERCENT)
             except:
@@ -340,6 +375,8 @@ class Portfolio(AbstractTableModel):
         map(cal_port, p2_items)            
         self.port['port_v'] = port_v 
         return self.port['port_v']
+    
+    
 
     def dump_portfolio(self):
         #<account_id>: {'port_items': {<contract_key>, instrument}, 'opt_chains': {<oc_id>: option_chain}}
@@ -347,11 +384,43 @@ class Portfolio(AbstractTableModel):
         def print_port_items(x):
             return '[%s]: %s %s' % (x[0],  ', '.join('%s: %s' % (k,str(v)) for k, v in x[1].get_port_fields().iteritems()),
                                            ', '.join('%s: %s' % (k,str(v)) for k, v in x[1].get_instrument().get_tick_values().iteritems()))
+#        p_items = map(print_port_items, [x for x in self.port['port_items'].iteritems()])
+#         logging.info('PortfolioMonitor:dump_portfolio %s' % ('\n'.join(p_items)))
+#         return '\n'.join(p_items)
+
         
-        p_items = map(print_port_items, [x for x in self.port['port_items'].iteritems()])
-        logging.info('PortfolioMonitor:dump_portfolio %s' % ('\n'.join(p_items)))
-        return '\n'.join(p_items)
-    
+
+        def format_port_header(x):
+                #imap = InstrumentIdMap()
+                return ['contract'] +  map(lambda d:InstrumentIdMap.id2str(d), x.get_port_fields().keys()) \
+                    + map(lambda d:InstrumentIdMap.id2str(d), x.get_instrument().get_tick_values().keys())
+            
+        def format_port_data(x):
+                return [x[0]] + map(lambda d:d, x[1].get_port_fields().values()) + map(lambda d:d, x[1].get_instrument().get_tick_values().values())
+        
+
+        #df = pd.DataFrame(data = map(format_port_data, [x for x in self.port['port_items'].iteritems()]),
+        #                  columns = format_port_header(self.port['port_items'].iteritems()[0].keys()))
+        data = map(format_port_data, [x for x in self.port['port_items'].iteritems()])
+        y = list(self.port['port_items'])[0]
+        z = self.port['port_items'][y]
+        columns = format_port_header(z)
+        pd.set_option('display.max_columns', 50)
+        df1 = pd.DataFrame(data = data, columns = columns)
+        
+        # print portfolio items
+        try:
+            print '\n\n--------- Portfolio %s --------\n' % self.get_object_name()['account']
+            print df1
+            
+            # print summary
+            df2 = pd.DataFrame(data = [self.port['port_v'].values()], 
+                               columns = map(lambda k: InstrumentIdMap.id2str(k), self.port['port_v'].keys()))
+            print '\n\n--------- Summary -------------'
+            print df2
+            print     '-------------------------------'
+        except:
+            logging.error('Portfolio. Exception while dumping portfolio contents...%s' % traceback.format_exc())
     
     
     
@@ -361,8 +430,8 @@ class Portfolio(AbstractTableModel):
         implement AbstractTableModel methods and other routines
     '''
     def init_table(self):
-        self.port['g_table']['header'] = [('symbol', 'Symbol', 'string'), ('right', 'Right', 'string'), ('avgcost', 'Avg Cost', 'number'), ('market_value', 'Market Value', 'number'), 
-                  ('avgpx', 'Avg Price', 'number'), ('spotpx', 'Spot Price', 'number'), ('pos', 'Quantity', 'number'), 
+        self.port['g_table']['header'] = [('symbol', 'Symbol', 'string'), ('right', 'Right', 'string'), ('avgcost', 'Avg Cost', 'number'), ('market_value', 'Mkt Val', 'number'), 
+                  ('avgpx', 'Avg Px', 'number'), ('spotpx', 'Spot Px', 'number'), ('pos', 'Qty', 'number'), 
                   ('delta', 'Delta', 'number'), ('theta', 'Theta', 'number'), ('gamma', 'Gamma', 'number'), 
                   ('pos_delta', 'P. Delta', 'number'), ('pos_theta', 'P. Theta', 'number'), ('gamma_percent', 'P. Gamma', 'number'), 
                   ('unreal_pl', 'Unreal P/L', 'number'), ('percent_gain_loss', '% gain/loss', 'number'),

+ 63 - 11
src/rethink/portfolio_monitor.py

@@ -20,6 +20,8 @@ from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
 
 class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListener):
 
+    EVENT_PORT_VALUES_UPDATED = 'event_port_values_updated'
+    
 
     def __init__(self, kwargs):
         self.kwargs = copy.copy(kwargs)
@@ -93,8 +95,10 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
                         self.twsc.reqPositions()
                     elif selection == '2': 
                         for port in self.portfolios.values():
-                            print port.dump_portfolio()
-                            print ''.join('%d:[%6.2f]\n' % (k, v) for k, v in port.calculate_port_pl().iteritems())
+                            
+                            port.calculate_port_pl()
+                            port.dump_portfolio()
+                            #print ''.join('%d:[%6.2f]\n' % (k, v) for k, v in port.calculate_port_pl().iteritems())
                     elif selection == '3': 
                         
                         print self.tds.dump()
@@ -202,24 +206,37 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
         return oc
     
 
+    def is_interested_contract_type(self, contract_key):
+        # given an instrument key, determine whether the contract is relevant
+        # to the portfolio monitor
+        v = filter(lambda x:x in contract_key, PortfolioRules.rule_map['interested_position_types']['instrument_type'])
+        return True if len(v) > 0 else False
     
     def process_position(self, account, contract_key, position, average_cost, extra_info=None):
         
         # obtain a reference to the portfolio, if not exist create a new one 
         port = self.get_portfolio(account)
         port_item =  port.is_contract_in_portfolio(contract_key)
+        
+            
         if port_item:
+            
+            
+            
             # update the values and recalculate p/l
             port_item.update_position(position, average_cost, extra_info)
             port_item.calculate_pl(contract_key)
             
+            #print "process_position %s extra[%s]" % (account, extra_info)
+            
             # if the function call is triggered by event accountUpdates from TWS
             # compute the overall portfolio greeks and p/l
             # (that is extra_info is not null)
             if extra_info:
-                logging.info('PortfolioMonitor:process_position Recal overall port figures...')
+                logging.info('PortfolioMonitor:process_position Recal overall port figures: account[%s]...' % (account))
                 port.calculate_port_pl()
                 
+                self.notify_port_values_updated(account, port)
             
             # dispatch the update to internal listeners
             # and also send out the kafka message to external parties
@@ -253,7 +270,7 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
             
             self.notify_table_model_changes(account, port, contract_key, mode='I')
             logging.info('PortfolioMonitor:process_position. New position: %s:[%d]' % (contract_key, port.ckey_to_row(contract_key)))
-            port.dump_portfolio()
+            #port.dump_portfolio()
             
             
               
@@ -275,16 +292,26 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
     
     def tds_event_symbol_added(self, event, update_mode, name, instrument):
         pass
+    
         #logging.info('tds_event_new_symbol_added. %s' % ContractHelper.object2kvstring(symbol.get_contract()))
         
     
     def tds_event_tick_updated(self, event, contract_key, field, price, syms):
+
         
         if field not in [Symbol.ASK, Symbol.BID, Symbol.LAST]:
             return
         
+        
+        
         for s in syms:
             
+            # skip position types that are not options or futures 
+            # such as currency contracts
+            if not self.is_interested_contract_type(contract_key):
+                continue
+
+            
             if OptionsChain.CHAIN_IDENTIFIER in s.get_extra_attributes():
                 results = {}
                 chain_id = s.get_extra_attributes()[OptionsChain.CHAIN_IDENTIFIER]
@@ -343,6 +370,10 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
 
 
     def tickPrice(self, event, contract_key, field, price, canAutoExecute):
+        if not self.is_interested_contract_type(contract_key):
+            return
+
+        
         self.tds.set_symbol_tick_price(contract_key, field, price, canAutoExecute)
 
 
@@ -350,20 +381,28 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
         #self.tds.set_symbol_tick_size(contract_key, field, size)
         #logging.info('MessageListener:%s. %s: %d %8.2f' % (event, contract_key, field, size))
         pass
+        if not self.is_interested_contract_type(contract_key):
+            return
+
+    
  
     def position(self, event, account, contract_key, position, average_cost, end_batch):
-        
+
+
+        logging.info('PortfolioMonitor:position account[%s] contract[%s]', account, contract_key)
+
         if not end_batch:
             #logging.info('PortfolioMonitor:position. received position message contract=%s' % contract_key)
-            self.process_position(account, contract_key, position, average_cost)
+            if self.is_interested_contract_type(contract_key):
+                self.process_position(account, contract_key, position, average_cost)
    
         else:
             # to be run once per a/c during start up
             # subscribe to automatic account updates
             if self.starting_engine:
                 for acct in self.portfolios.keys():
-                    self.twsc.reqAccountUpdates(True, account)
-                    logging.info('PortfolioMonitor:position. subscribing to auto updates for ac: [%s]' % account)  
+                    self.twsc.reqAccountUpdates(True, acct)
+                    logging.info('PortfolioMonitor:position. subscribing to auto updates for ac: [%s]' % acct)  
                     self.starting_engine = False
                     
     '''
@@ -379,7 +418,9 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
  
     def updatePortfolio(self, event, contract_key, position, market_price, market_value, average_cost, unrealized_PNL, realized_PNL, account):
         self.raw_dump(event, vars())
-        self.process_position(account, contract_key, position, average_cost, 
+        if self.is_interested_contract_type(contract_key):
+            
+            self.process_position(account, contract_key, position, average_cost, 
                               {'market_price':market_price, 'market_value':market_value, 'unrealized_PNL': unrealized_PNL, 'realized_PNL': realized_PNL})
         
             
@@ -454,6 +495,17 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
             
     def event_tm_table_structure_changed(self, event, source, origin_request_id, account, data_table_json):
         logging.info("[PortfolioColumnChartTM:] received %s  content:[%s]" % (event, data_table_json)    )        
+    
+    
+    
+    def notify_port_values_updated(self, account, port):
+        try:
+            self.get_kproducer().send_message(PortfolioMonitor.EVENT_PORT_VALUES_UPDATED,
+                                               json.dumps({'account': account,
+                                                     'port_values': self.portfolios[account].get_potfolio_values()}))
+        except:
+            logging.error('**** Error PortfolioMonitor:notify_port_values_updated. %s' % traceback.format_exc() )
+        
         
 if __name__ == '__main__':
     
@@ -461,12 +513,12 @@ if __name__ == '__main__':
     
     kwargs = {
       'name': 'portfolio_monitor',
-      'bootstrap_host': 'localhost',
+      'bootstrap_host': 'vorsprung',
       'bootstrap_port': 9092,
       'redis_host': 'localhost',
       'redis_port': 6379,
       'redis_db': 0,
-      'tws_host': 'localhost',
+      'tws_host': 'vsu-bison',
       'tws_api_port': 8496,
       'tws_app_id': 38868,
       'group_id': 'PM',

+ 22 - 1
src/rethink/tick_datastore.py

@@ -12,7 +12,27 @@ class TickDataStore(Publisher):
     """
     
     Data structure:
-
+        2019 add explainatory notes
+        
+        the tds stores object references of each symbol with the same contract key 
+        in a list. 
+        When there is a new subscription, create a new key in the symbols map and a new list
+        to store the symbol object
+            self.symbols[key] ={}
+            self.symbols[key] = {'syms':[ <s1> ]}
+            
+        Later on if a new symbol object that has the same key is created it is appended to the list
+        
+            self.symbols[key] = {'syms':[ <s1>, <s2>... ]}
+        
+        This way the TDS store keeps track of all the symbol objects, and know which object and
+        its field is to be updated
+        
+        Whenever there is a tick changed for a particular key, the TDS updates the tick fields for 
+        all the symbol objects with the same key  
+        
+            def set_symbol_tick_price(self, contract_key, field, price, canAutoExecute):
+                map(lambda e: e.set_tick_value(field, price), self.symbols[contract_key]['syms'])
     
     
     """
@@ -107,6 +127,7 @@ class TickDataStore(Publisher):
     
     
     def add_symbol(self, symbol):
+        
         try:
             dispatch = True
             self.lock.acquire()

+ 6 - 0
src/sh/ws.sh

@@ -1,15 +1,21 @@
 #!/bin/bash
 
 
+
 HOST=$(hostname)
 echo $HOST
 if [ $HOST == 'hkc-larryc-vm1' ]; then
 	FINOPT_HOME=~/ironfly-workspace/finopt/src
+elif [ $HOST == 'astron' ]; then
+	FINOPT_HOME=~/workspace/finopt/src
 elif [ $HOST == 'vorsprung' ]; then
 	FINOPT_HOME=~/workspace/finopt/src	
+		
 else
 	FINOPT_HOME=~/l1304/workspace/finopt-ironfly/finopt/src
 fi
+
+
 export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
 #python $FINOPT_HOME/ws/ws_server.py  -c -g AE1  
 python $FINOPT_HOME/ws/ws_server.py   -g AE1  

+ 20 - 6
src/ws/ws_server.py

@@ -7,6 +7,7 @@ from threading import Thread
 import copy, sys
 from misc2.observer import NotImplementedException, Subscriber, Publisher
 from rethink.table_model import AbstractTableModel
+from rethink.portfolio_monitor import PortfolioMonitor
 from comms.ibgw.base_messaging import BaseMessageListener, Prosumer
 # https://github.com/Pithikos/python-websocket-server
 
@@ -156,6 +157,7 @@ class PortfolioTableModelListener(BaseMessageListener):
         BaseMessageListener.__init__(self, name)
         self.mwss = server_wrapper
         self.simple_caching = {}
+        
 
     def event_tm_table_cell_updated(self, event, source, row, row_values):
         logging.info("[%s] received %s content:[%d]" % (self.name, event, row))
@@ -198,14 +200,25 @@ class PortfolioTableModelListener(BaseMessageListener):
     def event_tm_table_structure_changed(self, event, source, origin_request_id, account, data_table_json):
         try:
             logging.info("[%s] received %s from %s. content:[%d]" % (self.name, event, source, origin_request_id))
-            self.mwss.get_server().send_message(self.mwss.clients[origin_request_id], 
-                                                json.dumps({'source': source, 'event': event, 'value': data_table_json})) 
+            
+        # 2019 include account field as a parameter in the message
+            if source['account'] == account:
+                self.mwss.get_server().send_message(self.mwss.clients[origin_request_id], 
+                                                json.dumps({'source': source, 'event': event, 'value': data_table_json, 'account': account})) 
         #except IndexError, KeyError:
         except:
             logging.error('[%s]. index error %d' % (event, origin_request_id))
             
 
+    
+    def event_port_values_updated(self, event, account, port_values):
         
+        logging.info("[%s] received %s from %s. content:[%s]" % (self.name, event, account, json.dumps(port_values)))
+        
+        # broadcast to all subscribed clients
+
+        self.mwss.get_server().send_message_to_all( 
+                                        json.dumps({'event': event, 'value': port_values, 'account': account})) 
 
 
 class MainWebSocketServer(BaseWebSocketServerWrapper):
@@ -232,7 +245,7 @@ class MainWebSocketServer(BaseWebSocketServerWrapper):
         '''
 #         topics = ['event_tm_table_cell_updated', 'event_tm_table_row_inserted', 
 #                   'event_tm_table_row_updated', 'event_tm_table_structure_changed']
-        topics = AbstractTableModel.TM_EVENTS
+
                 
         self.message_handler = Prosumer(name='tblMessageHandler', kwargs=kwargs)
         tbl_listener = PortfolioTableModelListener('portTableModelListener', self)
@@ -329,6 +342,7 @@ class MainWebSocketServer(BaseWebSocketServerWrapper):
     # Called for every client disconnecting
     def client_left(self, client, server):
         try:
+            BaseWebSocketServerWrapper.client_left(self, client, server)
             del self.clients[client['id']]
         except:
             pass
@@ -350,17 +364,17 @@ def main():
     
     kwargs = {
       'name': 'WebSocketServer',
-      'bootstrap_host': 'localhost',
+      'bootstrap_host': 'vorsprung',
       'bootstrap_port': 9092,
       'redis_host': 'localhost',
       'redis_port': 6379,
       'redis_db': 0,
-      'tws_host': 'localhost',
+      'tws_host': 'vsu-bison',
       'group_id': 'WS',
       'session_timeout_ms': 10000,
       'clear_offsets':  False,
       'logconfig': {'level': logging.INFO, 'filemode': 'w', 'filename': '/tmp/ws.log'},
-      'topics': AbstractTableModel.TM_EVENTS,
+      'topics': AbstractTableModel.TM_EVENTS + [PortfolioMonitor.EVENT_PORT_VALUES_UPDATED],
       'seek_to_end': ['*'],
       'ws_flush_timeout': 2000,
       'ws_dirty_count': 15,