ソースを参照

working version - web socket pushing port data

bobhk 8 年 前
コミット
ba5199aa53

+ 2 - 1
src/comms/ibgw/base_messaging.py

@@ -342,7 +342,8 @@ class BaseConsumer(threading.Thread, Publisher):
                       
                 """
                 if self.my_topics[message.topic][str(message.partition)] > message.offset:
-                    logging.info('BaseConsumer ********************** old message...discarding %s %d' % (message.topic, message.offset))
+                    logging.info('BaseConsumer ********************** old message...discarding %s %d(%d)' % (message.topic, message.offset, 
+                                                                                        self.my_topics[message.topic][str(message.partition)]))
                 else:
                     #if self.my_topics[message.topic][str(message.partition)] == message.offset:
                     # if the stored offset in redis equals to the current offset

+ 24 - 14
src/rethink/portfolio_item.py

@@ -37,7 +37,7 @@ class PortfolioItem():
     AVERAGE_COST = 7002
     POSITION_DELTA = 7003
     POSITION_THETA = 7004
-    POSITION_GAMMA = 7009
+    GAMMA_PERCENT = 7009
     UNREAL_PL = 7005
     PERCENT_GAIN_LOSS = 7006
     AVERAGE_PRICE = 7007
@@ -140,7 +140,7 @@ class PortfolioItem():
                 
                 pos_delta = self.get_quantity() * self.instrument.get_tick_value(Option.DELTA) * multiplier                                
                 pos_theta = self.get_quantity() * self.instrument.get_tick_value(Option.THETA) * multiplier
-                pos_gamma = self.get_quantity() * self.instrument.get_tick_value(Option.GAMMA) * multiplier                               
+                gamma_percent = pos_delta * (1 + self.instrument.get_tick_value(Option.GAMMA))                               
 
                 #(spot premium * multiplier - avgcost) * pos)
                 try:
@@ -158,22 +158,22 @@ class PortfolioItem():
                     average_px = float('nan')
                             
             else:
-                
-                pos_delta = self.get_quantity() * 1.0 * \
-                               PortfolioRules.rule_map['option_structure'][self.get_symbol_id()]['multiplier'] 
+                multiplier =  PortfolioRules.rule_map['option_structure'][self.get_symbol_id()]['multiplier']
+                pos_delta = self.get_quantity() * 1.0 * multiplier
+                                
                 pos_theta = 0
-                pos_gamma = 0
+                gamma_percent = 0
+                
                 # (S - X) * pos * multiplier
-                unreal_pl = (self.instrument.get_tick_value(4) - self.get_average_cost() ) * self.get_quantity() * \
-                               PortfolioRules.rule_map['option_structure'][self.get_symbol_id()]['multiplier']
+                unreal_pl = (spot_px * multiplier - self.get_average_cost() ) * self.get_quantity() 
                                
-                sign = abs(self.get_quantity()) / self.get_quantity()                                
-                percent_gain_loss = sign * (spot_px - self.get_average_cost() / multiplier) / (self.get_average_cost() / multiplier) * 100
+                #sign = abs(self.get_quantity()) / self.get_quantity()                                
+                percent_gain_loss = unreal_pl / self.get_average_cost() * 100
                 average_px = self.get_average_cost() / multiplier
                         
             self.set_port_field(PortfolioItem.POSITION_DELTA, pos_delta)
             self.set_port_field(PortfolioItem.POSITION_THETA, pos_theta)
-            self.set_port_field(PortfolioItem.POSITION_GAMMA, pos_gamma)
+            self.set_port_field(PortfolioItem.GAMMA_PERCENT, gamma_percent)
             self.set_port_field(PortfolioItem.UNREAL_PL, unreal_pl)
             self.set_port_field(PortfolioItem.AVERAGE_PRICE, average_px)
             self.set_port_field(PortfolioItem.PERCENT_GAIN_LOSS, percent_gain_loss)
@@ -218,6 +218,9 @@ class Portfolio(AbstractTableModel):
         self.create_empty_portfolio()
         AbstractTableModel.__init__(self)
         
+    def get_object_name(self):
+        return 'p-%s-%s' % (self.account, id(self))
+    
     def is_contract_in_portfolio(self, contract_key):
         return self.get_portfolio_port_item(contract_key)
             
@@ -290,7 +293,7 @@ class Portfolio(AbstractTableModel):
         self.port['g_table']['header'] = [('symbol', 'Symbol', 'string'), ('right', 'Right', 'string'), ('avgcost', 'Avg Cost', 'number'), ('market_value', 'Market Value', 'number'), 
                   ('avgpx', 'Avg Price', 'number'), ('spotpx', 'Spot Price', 'number'), ('pos', 'Quantity', 'number'), 
                   ('delta', 'Delta', 'number'), ('theta', 'Theta', 'number'), ('gamma', 'Gamma', 'number'), 
-                  ('pos_delta', 'P. Delta', 'number'), ('pos_theta', 'P. Theta', 'number'), ('pos_gamma', 'P. Gamma', 'number'), 
+                  ('pos_delta', 'P. Delta', 'number'), ('pos_theta', 'P. Theta', 'number'), ('gamma_percent', 'P. Gamma', 'number'), 
                   ('unreal_pl', 'Unreal P/L', 'number'), ('percent_gain_loss', '% gain/loss', 'number'),
                   ('symbolid', 'Sym Id', 'string')
                   ]  
@@ -356,7 +359,7 @@ class Portfolio(AbstractTableModel):
              {'v': handle_NaN(x[1].get_instrument().get_tick_value(Option.GAMMA))},
              {'v': handle_NaN(x[1].get_port_field(PortfolioItem.POSITION_DELTA))},
              {'v': handle_NaN(x[1].get_port_field(PortfolioItem.POSITION_THETA))},
-             {'v': handle_NaN(x[1].get_port_field(PortfolioItem.POSITION_GAMMA))},
+             {'v': handle_NaN(x[1].get_port_field(PortfolioItem.GAMMA_PERCENT))},
              {'v': handle_NaN(x[1].get_port_field(PortfolioItem.UNREAL_PL))},
              {'v': handle_NaN(x[1].get_port_field(PortfolioItem.PERCENT_GAIN_LOSS))},
              {'v': x[1].get_symbol_id()}
@@ -386,7 +389,14 @@ class Portfolio(AbstractTableModel):
         map(lambda hf: dtj['cols'].append({'id': hf[0], 'label': hf[1], 'type': hf[2]}), self.port['g_table']['header'])
         
         #p_items = sorted([x for x in self.port['port_items'].iteritems()])
-        p_items = [x for x in self.port['port_items'].iteritems()]
+        
+        # create a list of port items tuples (contract_key, port_item) ordered by row_id
+        # that is in the order when each items was created and inserted into the map
+        # this ensures that the same sequence is replicated to the google datatable
+        p_items = map(lambda x:(self.port['g_table']['row_to_ckey_index'][x], 
+                        self.port['port_items'][ self.port['g_table']['row_to_ckey_index'][x] ]), range(self.port['g_table']['row_index']))
+        
+
         #p1_items = filter(lambda x: x[1].get_symbol_id() in PortfolioRules.rule_map['interested_position_types']['symbol'], p_items)
         #p2_items = filter(lambda x: x[1].get_instrument_type() in  PortfolioRules.rule_map['interested_position_types']['instrument_type'], p1_items)
         #map(lambda p: dtj['rows'].append({'c': self.port_item_to_row_fields(p)}), p2_items)

+ 9 - 3
src/rethink/portfolio_monitor.py

@@ -295,7 +295,12 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
                                
                     
             else:
-                logging.info('PortfolioMonitor:tds_event_tick_updated ignoring uninterested ticks %s' % contract_key)
+                for acct in self.portfolios:
+                    if self.portfolios[acct].is_contract_in_portfolio(contract_key):
+                        self.portfolios[acct].calculate_item_pl(contract_key)
+                        self.notify_table_model_changes(acct, self.portfolios[acct], contract_key, mode='U')
+                    else:    
+                        logging.info('PortfolioMonitor:tds_event_tick_updated ignoring uninterested ticks %s' % contract_key)
                 continue
              
         
@@ -368,14 +373,15 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
         #logging.info('---- %s' % str(rvs))
         port.fire_table_row_updated(row, rvs)
         event_type = AbstractTableModel.EVENT_TM_TABLE_ROW_UPDATED if mode == 'U' else AbstractTableModel.EVENT_TM_TABLE_ROW_INSERTED
-        self.get_kproducer().send_message(event_type, json.dumps({'source': 'dt_port-%s' % account, 'row': row, 'row_values': rvs}))
+        self.get_kproducer().send_message(event_type, json.dumps({'source': '%s' % port.get_object_name(), 'row': row, 'row_values': rvs}))
     
     # implment AbstractPortfolioTableModelListener
     # handle requests to get data table json
     def event_tm_request_table_structure(self, event, request_id, account):
         try:
             self.get_kproducer().send_message(AbstractTableModel.EVENT_TM_TABLE_STRUCTURE_CHANGED,                                               
-                                          json.dumps({'origin_request_id': request_id, 'account': account, 
+                                          json.dumps({'source': self.portfolios[account].get_object_name(), 
+                                                      'origin_request_id': request_id, 'account': account, 
                                                       'data_table_json': self.portfolios[account].get_JSON()}))
         except:
             logging.error("PortfolioMonitor:event_tm_request_table_structure. Error invoking get_JSON[%s]. Client request id:%s, %s" %

+ 1 - 1
src/rethink/table_model.py

@@ -77,7 +77,7 @@ class AbstractPortfolioTableModelListener(BaseMessageListener):
     def event_tm_table_row_updated(self, event, source, row, row_values):   
         logging.info("[%s] received %s content:[%s]" % (self.name, event, vars()))
     
-    def event_tm_table_structure_changed(self, event, origin_request_id, account, data_table_json):
+    def event_tm_table_structure_changed(self, event, source, origin_request_id, account, data_table_json):
         logging.info("[%s] received %s content:[%s]" % (self.name, event, vars()))
         
     def event_tm_request_table_structure(self, event, request_id, account):

+ 2 - 2
src/sh/ae.sh

@@ -11,5 +11,5 @@ else
 	FINOPT_HOME=~/l1304/workspace/finopt-ironfly/finopt/src
 fi
 export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
-python $FINOPT_HOME/rethink/analytics_engine.py  -c -g AE1  
-#python $FINOPT_HOME/rethink/analytics_engine.py   -g AE1  
+#python $FINOPT_HOME/rethink/analytics_engine.py  -c -g AE1  
+python $FINOPT_HOME/rethink/analytics_engine.py   -g AE1  

+ 53 - 25
src/ws/client_g.html

@@ -8,7 +8,7 @@
 </head>
 
 <body>
-   <div id="table_div"></div>
+   <div id="table_div" style="height:500"></div>
    <button id="change-btn">change columns</button>
    <button id="test-btn">test</button>
   <form onsubmit="onSubmit(); return false;">
@@ -25,12 +25,10 @@
 	var view = null;
 	var data = null;
 	var table= null;   
-//        var options = {sortColumn:1, showRowNumber: true, width: '100%', height: '100%'};
-        var options = {sort: 'disable', sortColumn:-1, showRowNumber: true, width: '100%', height: '100%'};
-	   //google.charts.load('current', {'packages':['table']});
-	   google.load("visualization", "1.1", {packages:["corechart", 'table','gauge']});
+    var options = {allowHtml: true, sortColumn:1, showRowNumber: true, width: '100%', height: '100%'};
+	google.load("visualization", "1.1", {packages:["corechart", 'table','gauge']});
 	   //google.setOnLoadCallback(drawTable);
-	   google.setOnLoadCallback(init);
+	google.setOnLoadCallback(init);
 	
 
 	
@@ -46,20 +44,40 @@
 	}
 	
 	
-	   
-	function drawTable() {
-	 
-   	   data= new google.visualization.DataTable();
-	   table = new google.visualization.Table(document.getElementById('table_div'));
-	   table.draw(data, {sort: 'event', sortColumn:-1, showRowNumber: true, width: '100%', height: '100%'});
-	 }
+	function setupFormatter(tableData){
+	    var numF = new google.visualization.NumberFormat(
+    		    {prefix: '$', pattern:'0.00', negativeColor: 'red', negativeParens: true});
+	    var percentF = new google.visualization.NumberFormat(
+	    		{pattern: '##.#%'})
+	    var arrowF = new google.visualization.ArrowFormat();
+        var barF = new google.visualization.BarFormat({width: 80,
+			colorPositive: 'green', max:100 });       
+    			    
+    	var colorF = new google.visualization.ColorFormat();
+    	colorF.addRange(-100, 0, 'white', 'red');
+    	colorF.addRange(0, 100, 'white', 'blue');
+
+
+	var colorGF = new google.visualization.ColorFormat();
+	colorGF.addGradientRange(null, null, 'white', 'orange', 'blue');
+
+    	 	    
+	    numF.format(data, 2);  //avg cost
+	    numF.format(data, 3);  //market value
+	    colorF.format(data, 6); // position
+	    percentF.format(data, 7); //delta		
+	    
+	    numF.format(data, 10); // position delta
+	    numF.format(data, 11); // position theta
+	    numF.format(data, 12); // position gamma
+	    colorGF.format(data, 13); //unreal p/l
+	    barF.format(data, 14) //% gain loss
+		
+	}	   
 	   
 
     var ws;
 	
-	
-	
-    
     function init() {
 
       // Connect to Web Socket
@@ -80,26 +98,36 @@
 
 			console.log(d1);
 			data = new google.visualization.DataTable(d1.value);
-			//view = new google.visualization.DataView(data);
-			/*view.setRows(view.getFilteredRows([{column: 14, test: function(value, row, column, table) {
+			view = new google.visualization.DataView(data);
+			view.setRows(view.getFilteredRows([{column: 15, test: function(value, row, column, table) {
         return (value == 'HSI' || value == 'MHI') 
-    }}]));*/
+    }}]));
+    		setupFormatter(data);
 			table = new google.visualization.Table(document.getElementById('table_div'));
-			table.draw(data, options);
+			table.draw(view, options);
 			output('number of rows: ' + data.getNumberOfRows());
            
         } else if (d1.event == 'event_tm_table_row_updated'){
 
-			console.log(d1.value.row.toString()+ ':' + d1.value.row_values[0]['v']);
+			//console.log(d1.value.row.toString()+ ':' + d1.value.row_values[0]['v']);
 			for (var c=2; c < d1.value.row_values.length; c++){
 				data.setCell(d1.value.row, c, d1.value.row_values[c]["v"]);
 
 			}
-			data.setCell(d1.value.row, 1, d1.value.row_values[0]['v']);
+			//data.setCell(d1.value.row, 1, d1.value.row_values[0]['v']);
 			
-			table.draw(data, options);
+			options.sortColumn = table.getSortInfo().column;
+			table.draw(view, options);
 //			table.draw(data);           
-//        	setValueAtRandomCell(getRandomInt(1000, 5000));
+
+        } else if (d1.event == 'event_tm_table_row_inserted'){
+        	var newRow = d1.value.row_values.map(function(x){
+        		return x['v'];
+        	});
+        	output('adding new row: ' + newRow.toString());
+        	data.addRow(newRow);
+        	options.sortColumn = table.getSortInfo().column;
+        	table.draw(view, options);
         }
         
       };
@@ -137,4 +165,4 @@
 	 
   </script>
 </body>
-</html>
+</html>

+ 17 - 5
src/ws/ws_server.py

@@ -146,7 +146,7 @@ class PortfolioTableModelListener(BaseMessageListener):
     '''
     
     '''
-    CACHE_MAX = 25
+    CACHE_MAX = 12
     TIME_MAX = 1.0
     
     def __init__(self, name, server_wrapper):
@@ -161,10 +161,14 @@ class PortfolioTableModelListener(BaseMessageListener):
         
     def event_tm_table_row_inserted(self, event, source, row, row_values):
         logging.info("[%s] received %s content:[%s]" % (self.name, event, vars()))
+        self.handle_tm_table_row_changes(event, source, row, row_values)
+
 
     def event_tm_table_row_updated(self, event, source, row, row_values):   
-        #logging.info("[%s] received %s content:[%s]" % (self.name, event, vars()))
-        #logging.info("[%s] received %s content:[%d]" % (self.name, event, row))
+        self.handle_tm_table_row_changes(event, source, row, row_values)
+
+    def handle_tm_table_row_changes(self, event, source, row, row_values):
+
         def notify_client():
             self.mwss.get_server().send_message_to_all(json.dumps(
                         {'event':event, 'value':{'row': row, 'row_values': row_values}}));
@@ -188,9 +192,9 @@ class PortfolioTableModelListener(BaseMessageListener):
             self.simple_caching[row] = {'count': 1, 'ts': curr_ts}
             notify_client()    
     
-    def event_tm_table_structure_changed(self, event, origin_request_id, account, data_table_json):
+    def event_tm_table_structure_changed(self, event, source, origin_request_id, account, data_table_json):
         try:
-            logging.info("[%s] received %s content:[%d]" % (self.name, event, origin_request_id))
+            logging.info("[%s] received %s from %s. content:[%d]" % (self.name, event, source, origin_request_id))
             self.mwss.get_server().send_message(self.mwss.clients[origin_request_id], 
                                                 json.dumps({'event': event, 'value': data_table_json})) 
         #except IndexError, KeyError:
@@ -237,6 +241,14 @@ class MainWebSocketServer(BaseWebSocketServerWrapper):
         self.message_handler.add_listeners([tbl_listener])
         self.message_handler.start_prosumer()        
         self.clients = {}
+        
+        # stores server side object id to request ws clients
+        # this allows the server to determine
+        # which ws client is to dispatch the returned messages 
+        # from the server end
+        # 
+        # server_handler_map: {<source_id>: client}
+        self.server_handler_map ={}
 
         self.once = False