瀏覽代碼

feb 19 updates

fixes thread issue by cloning kwargs instead of an assignment in
base_messaging
add request position summary function in javascript
add support for far month options and quarterly futures
save row values to cache map in ws_server
laxaurus 7 年之前
父節點
當前提交
9837daea08

+ 13 - 11
src/comms/ibgw/base_messaging.py

@@ -131,7 +131,8 @@ class BaseConsumer(threading.Thread, Publisher):
         self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name)) 
         logging.info('BaseConsumer __init__: name=%s' % self.name)
         self.args = args
-        self.kwargs = kwargs
+        #self.kwargs = kwargs
+        self.kwargs = copy.copy(kwargs)  
         self.rs = Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
         try:
             self.kwargs['seek_to_end']
@@ -342,7 +343,8 @@ class BaseConsumer(threading.Thread, Publisher):
                       
                 """
                 if self.my_topics[message.topic][str(message.partition)] > message.offset:
-                    logging.info('BaseConsumer ********************** old message...discarding %s %d(%d)' % (message.topic, message.offset, 
+                    if (self.my_topics[message.topic][str(message.partition)] - message.offset) % 1000 == 0: 
+                        logging.info('BaseConsumer ********************** old message...discarding %s %d(%d)' % (message.topic, message.offset, 
                                                                                         self.my_topics[message.topic][str(message.partition)]))
                 else:
                     #if self.my_topics[message.topic][str(message.partition)] == message.offset:
@@ -539,9 +541,9 @@ class Prosumer2Listener(BaseMessageListener):
         
         
             
-def test_prosumer2(mode):
+def test_prosumer2(mode, boot_host):
     
-    bootstrap_host = 'vorsprung'
+    bootstrap_host = boot_host
     
     if mode == 'A':
                 
@@ -607,7 +609,7 @@ def test_prosumer2(mode):
 class TestProducer(BaseProducer):
     pass
 
-def test_base_proconsumer(mode):
+def test_base_proconsumer(mode, bootstrap_host):
     '''
         This example demonstrates
         
@@ -623,7 +625,7 @@ def test_base_proconsumer(mode):
         #Producer().start()
         
         tp = TestProducer(name = 'testproducer', kwargs={
-                                             'bootstrap_host':'vsu-bison', 'bootstrap_port':9092,
+                                             'bootstrap_host':bootstrap_host, 'bootstrap_port':9092,
                                              'topics': topics})
         tp.start()
         i = 0 
@@ -716,8 +718,8 @@ def main():
     #
     tp = [ test_base_proconsumer, test_prosumer2]
     
-    if len(sys.argv) != 3:
-        print("Usage: %s <role(producer or consumer): P|C> <test case #[0..1]>" % sys.argv[0])
+    if len(sys.argv) != 4:
+        print("Usage: %s <role(producer or consumer): P|C> <test case #[0..1]> <kafka_bootstrap_host name/ip>" % sys.argv[0])
         print "\n".join('case #%d: %s' % (i, tp[i].__name__) for i in range(len(tp)))
         print "example: python %s P 1" % sys.argv[0]
         print "example: python %s C 1" % sys.argv[0]
@@ -737,13 +739,13 @@ def main():
         
         test_prosumer2
         
-        Provide weather information: tp[1]('B')
-        Request weather information: tp[1]('A')
+        Provide weather information: tp[1]('B') ex. ./base_messaging.sh B 1 vorsprung
+        Request weather information: tp[1]('A') ex. ./base_messaging.sh A 1 vorsprung
     
         
         
     '''    
-    tp[int(sys.argv[2])](mode)
+    tp[int(sys.argv[2])](mode, sys.argv[3])
 
     #time.sleep(30)
 #     while 1:

+ 18 - 14
src/html/client_g.html

@@ -85,6 +85,7 @@ table.minimalistBlack tfoot td {
 		<li><input type="radio" name='acct' id="acct" value="U9050568" checked="checked"> U9050568</li>
 		<li><input type="radio" name='acct' id="acct" value="U8080985"> U8080985</li>
 		<li><button id="reload_port">Reload Portfolio</button></li>
+		<li><button id="port-btn">Update summary information</button></li>
 		<li class="bg-led indicator"></li>
 	</ul>  
 		
@@ -119,7 +120,7 @@ table.minimalistBlack tfoot td {
 	<img src="public/green_marker.png" height="50"><div id='spot'></div>
 	</div>   
    <button id="change-btn">change columns</button>
-   <button id="test-btn">test</button>
+   
   <form onsubmit="onSubmit(); return false;">
     <input type="text" id="input">
     <input type="submit" value="Send">
@@ -151,10 +152,9 @@ table.minimalistBlack tfoot td {
 	});
   
   
-	$('#test-btn').click(function(){
-		$('#port_sum').find("tr:eq(1)").find("td:eq(1)").html('new stuff');
-		
-		alert($('#port_sum').find("tr:eq(1)").find("td:eq(1)").html());
+	$('#port-btn').click(function(){
+		account = $('input[id=acct]:checked').val()
+		ws.send(JSON.stringify({event: 'event_request_port_summary', target_resource:{}, 'account': account}));
 	});  
 	$('#reload_port').click(function(){
 		alert('Reloading portfolio...');
@@ -187,7 +187,7 @@ table.minimalistBlack tfoot td {
 	var columnChart = {'view': null, 'data': null, 'chart': null, 'options': chartOptions};
 
     var tableOptions = {allowHtml: true, sortColumn:1, 
-    		showRowNumber: true, width: '90%', height: '80%'};
+    		showRowNumber: true, width: '95%', height: '100%'};
 	var tableChart = {'view': null, 'data': null, 'chart': null, 'options': tableOptions};
 	
 
@@ -227,6 +227,8 @@ table.minimalistBlack tfoot td {
 	    var arrowF = new google.visualization.ArrowFormat();
         var barF = new google.visualization.BarFormat({width: 80,
 			colorPositive: 'green', max:100 });       
+        var barIV = new google.visualization.BarFormat({width: 80,
+			colorPositive: 'red', min: 0, max:0.5 });       
     			    
     	var colorF = new google.visualization.ColorFormat();
     	colorF.addRange(-100, 0, 'white', 'red');
@@ -248,7 +250,7 @@ table.minimalistBlack tfoot td {
 		    numF.format(tableChart.data, 12); // position gamma
 		    colorGF.format(tableChart.data, 13); //unreal p/l
 		    barF.format(tableChart.data, 14) //% gain loss
-		    percentF.format(tableChart.data, 15); //delta
+		    barIV.format(tableChart.data, 15); //iv
 		
 	}	   
 	   
@@ -273,7 +275,7 @@ table.minimalistBlack tfoot td {
 	      
 	      ws.send(JSON.stringify({event: 'event_tm_request_table_structure', target_resource:{'class': 'Portfolio'}, 'account': account}));
 	      ws.send(JSON.stringify({event: 'event_tm_request_table_structure', target_resource:{'class': 'PortfolioColumnChartTM'}, 'account': account}));
-	      
+	      ws.send(JSON.stringify({event: 'event_request_port_summary', target_resource:{}, 'account': account}));
 	    };
 	    
 	    ws.onmessage = function(e) {
@@ -321,7 +323,7 @@ table.minimalistBlack tfoot td {
 					
 				} else if (d1.source.class == 'PortfolioColumnChartTM'){
 					columnChart.data = new google.visualization.DataTable(d1.value);
-					console.log(d1.account);
+					console.log('updating pcc for ' + d1.account);
 					columnChart.view = new google.visualization.DataView(columnChart.data);
 					columnChart.chart = new google.visualization.ColumnChart(
 					        document.getElementById('chart_div'));
@@ -331,7 +333,7 @@ table.minimalistBlack tfoot td {
 				        
 			} else if (d1.event == 'event_tm_table_row_updated'){
 				
-				console.log(d1.value.row.toString()+ ':' + d1.value.row_values[0]['v']);
+				//console.log(d1.value.row.toString()+ ':' + d1.value.row_values[0]['v']);
 				for (var c=2; c < d1.value.row_values.length; c++){
 					tableChart.data.setCell(d1.value.row, c, d1.value.row_values[c]["v"]);
 				
@@ -354,18 +356,20 @@ table.minimalistBlack tfoot td {
 			} else if (d1.event == 'event_port_values_updated' && d1.account == account){
 			 	setPortSummaryTableVal(d1.value);
 			} else if (d1.event == 'tickPrice'){
-					
 				if (d1.current_or_next == 'current'){
 					setCellValue('port_sum', 1, 6, d1.price);
-					setCellValue('port_sum', 1, 7, d1.change.toFixed(2));
+					if (d1.change)
+						setCellValue('port_sum', 1, 7, d1.change.toFixed(2));
 				}
 				else{ // next month
 					setCellValue('port_sum', 2, 6, d1.price);
-					setCellValue('port_sum', 2, 7, d1.change.toFixed(2));
+					if (d1.change)
+						setCellValue('port_sum', 2, 7, d1.change.toFixed(2));
 				}
 				if (columnChart){
 					placeIndexMarker();
-				}				
+				}		
+
 			     
 			}
 		};

+ 1 - 1
src/rethink/portfolio_column_chart.py

@@ -186,7 +186,7 @@ class PortfolioColumnChart():
             return 1
         
         map(update_ijv, ijv_dist)
-        print xy_arr
+        #print xy_arr
         logging.info('PortfolioColumnChart: JSON array->\n%s' % xy_arr)
         
         def gen_datatable():

+ 35 - 6
src/rethink/portfolio_monitor.py

@@ -6,10 +6,12 @@ import copy
 from optparse import OptionParser
 from time import sleep
 import time
+import math
 from datetime import datetime
 from dateutil.relativedelta import relativedelta
 from ib.ext.Execution import Execution
 from ib.ext.ExecutionFilter import ExecutionFilter
+from finopt import optcal
 from misc2.helpers import ContractHelper, LoggerNoBaseMessagingFilter, ExecutionFilterHelper
 from finopt.instrument import Symbol, Option, InstrumentIdMap, ExecFill
 from rethink.option_chain import OptionsChain
@@ -18,6 +20,7 @@ from rethink.portfolio_item import PortfolioItem, PortfolioRules, Portfolio, Por
 from rethink.portfolio_column_chart import PortfolioColumnChart,PortfolioColumnChartTM
 from rethink.table_model import AbstractTableModel, AbstractPortfolioTableModelListener
 from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
+from pip._internal.req.constructors import deduce_helpful_msg
 
 
 
@@ -193,12 +196,26 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
         '''
             given an Option object, return the underlying Symbol object
         '''
+        def deduce_nearest_quarter_month(m):
+            return int(math.ceil(m/3.0)*3)
+        
+        
         try:
             symbol_id = option.get_contract().m_symbol
             underlying_sectype = PortfolioRules.rule_map['symbol'][symbol_id]
             exchange = option.get_contract().m_exchange
             currency = option.get_contract().m_currency
-            expiry = option.get_contract().m_expiry if PortfolioRules.rule_map['expiry'][symbol_id] ==  'same_month' else ''
+            
+            opt_month = int(option.get_contract().m_expiry[4:6])
+            opt_year = int(option.get_contract().m_expiry[0:4])
+            opt_date = datetime.strptime(option.get_contract().m_expiry, '%Y%m%d')
+            today = datetime.now() 
+            month_delta = relativedelta(opt_date, today).months
+            if month_delta >=2:           
+                nearest_quarter_month = deduce_nearest_quarter_month(opt_month) 
+                expiry = optcal.get_HSI_last_trading_day_ex(nearest_quarter_month, opt_year)
+            else:
+                expiry = option.get_contract().m_expiry if PortfolioRules.rule_map['expiry'][symbol_id] ==  'same_month' else ''
             contractTuple = (symbol_id, underlying_sectype, exchange, currency, expiry, 0, '')
             logging.info('PortfolioMonitor:deduce_option_underlying. Deduced underlying==> %s' %
                           str(contractTuple))
@@ -207,7 +224,10 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
             logging.error('PortfolioMonitor:deduce_option_underlying. Unable to deduce the underlying for the given option %s' %
                           ContractHelper.printContract(option.get_contract))
             return None
-        
+        except:
+            logging.error('PortfolioMonitor:deduce_option_underlying. %s' % traceback.format_exc() )
+            return None
+            
         
     def get_portfolio_option_chain(self, account, underlying):
         
@@ -524,9 +544,9 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
             else:
                 pcc.update_tally_count()
                 logging.info('PortfolioMonitor:notify_table_model_changes. tally count %d' % pcc.get_last_tally())
-#            if mode == 'I':
-#                 pcc.fire_table_structure_changed(AbstractTableModel.EVENT_TM_TABLE_STRUCTURE_CHANGED, 
-#                                                  pcc.get_object_name(), None, account, pcc.get_JSON())
+#                 if mode == 'I':
+#                     pcc.fire_table_structure_changed(AbstractTableModel.EVENT_TM_TABLE_STRUCTURE_CHANGED, 
+#                                                      pcc.get_object_name(), None, account, pcc.get_JSON())
 #            else:
                 
 #                 row = pcc.ckey_to_row(contract_key)
@@ -556,6 +576,12 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
         logging.info("[PortfolioColumnChartTM:] received %s  content:[%s]" % (event, data_table_json)    )        
     
     
+    def event_request_port_summary(self, event, request_id, account):
+        try:
+            port = self.portfolios[account]
+            self.notify_port_values_updated(account, port)
+        except:
+            logging.error('PortfolioMonitor:event_request_port_summary failed to request port summary for [%s]' % account)
     
     def notify_port_values_updated(self, account, port):
         try:
@@ -563,6 +589,8 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
                                                json.dumps({'account': account,
                                                      'port_values': self.portfolios[account].get_potfolio_values()}),
                                                )
+            logging.info('**** PortfolioMonitor:notify_port_values_updated. %s' % json.dumps({'account': account,
+                                                     'port_values': self.portfolios[account].get_potfolio_values()}) )
         except:
             logging.error('**** Error PortfolioMonitor:notify_port_values_updated. %s' % traceback.format_exc() )
         
@@ -585,7 +613,8 @@ if __name__ == '__main__':
       'session_timeout_ms': 10000,
       'clear_offsets':  False,
       'logconfig': {'level': logging.INFO, 'filemode': 'w', 'filename': '/tmp/pm.log'},
-      'topics': ['position', 'positionEnd', 'tickPrice', 'execDetails', 'update_portfolio_account', 'event_tm_request_table_structure', AbstractTableModel.EVENT_TM_TABLE_STRUCTURE_CHANGED],
+      'topics': ['position', 'positionEnd', 'tickPrice', 'execDetails', 'update_portfolio_account', 'event_tm_request_table_structure', 'event_request_port_summary', 
+                 AbstractTableModel.EVENT_TM_TABLE_STRUCTURE_CHANGED],
       'seek_to_end': ['*'],
       
       

+ 1 - 1
src/sh/base_messaging.sh

@@ -16,5 +16,5 @@ FINDATA=$ROOT/../data
 SRC=$ROOT
 export PYTHONPATH=$SRC:$PYTHONPATH
 
-python $FINOPT_HOME/comms/ibgw/base_messaging.py $1 $2
+python $FINOPT_HOME/comms/ibgw/base_messaging.py $1 $2 $3
 

+ 2 - 2
src/sh/ws.sh

@@ -17,5 +17,5 @@ fi
 
 
 export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
-#python $FINOPT_HOME/ws/ws_server.py  -c -g AE1  
-python $FINOPT_HOME/ws/ws_server.py   -g AE1 -f ../config/ws.cfg 
+python $FINOPT_HOME/ws/ws_server.py  -c  -g AE1 -f ../config/ws.cfg 
+#python $FINOPT_HOME/ws/ws_server.py   -g AE1 -f ../config/ws.cfg 

+ 16 - 7
src/ws/ws_server.py

@@ -193,6 +193,7 @@ class PortfolioTableModelListener(BaseMessageListener):
                 curr_ts - self.simple_caching[row]['ts'] < PortfolioTableModelListener.TIME_MAX:
                 self.simple_caching[row]['count'] +=1
                 self.simple_caching[row]['ts'] = curr_ts
+                self.simple_caching[row]['row_values'] = row_values
             else:
                 logging.info('event_tm_table_row_updated: flush condition met, sending changes to clients. row:[%d] %d %0.2f' %
                                 (row, self.simple_caching[row]['count'], curr_ts - self.simple_caching[row]['ts']))
@@ -203,7 +204,7 @@ class PortfolioTableModelListener(BaseMessageListener):
             
             
         except KeyError:
-            self.simple_caching[row] = {'count': 1, 'ts': curr_ts}
+            self.simple_caching[row] = {'count': 1, 'ts': curr_ts, 'row_values': row_values}
             notify_client()    
     
     def event_tm_table_structure_changed(self, event, source, origin_request_id, account, data_table_json):
@@ -277,12 +278,15 @@ class MainWebSocketServer(BaseWebSocketServerWrapper, AbstractGatewayListener):
         # this problem has to be cleaned up at a later stage
         # for now just do a hack to pop the value in the topics key and
         # replace with tickPrice
-        kwargs['topics'].pop()
-        kwargs['topics'] = ['tickPrice']
-        self.twsc = TWS_client_manager(kwargs)
+        # 
+        
+        temp_kwargs = copy.copy(kwargs)  
+        temp_kwargs['topics'] = ['tickPrice']
+        temp_kwargs['parent'] = self
+        self.twsc = TWS_client_manager(temp_kwargs)
         self.twsc.add_listener_topics(self, ['tickPrice'] )
         self.subscribe_hsif_ticks()
-        self.www = HTTPServe(kwargs)
+        self.www = HTTPServe(temp_kwargs)
         th = threading.Thread(target=self.www.start_server)
         th.daemon = True 
         th.start()
@@ -403,12 +407,13 @@ class MainWebSocketServer(BaseWebSocketServerWrapper, AbstractGatewayListener):
             
     def new_client(self, client, server):
         self.clients[client['id']] = client 
-        print 'new client id:%d %s' % (client['id'], client)
+        logging.info('MainWebSocketServer:new client id:%d %s' % (client['id'], client))
     
     # Called for every client disconnecting
     def client_left(self, client, server):
         try:
             BaseWebSocketServerWrapper.client_left(self, client, server)
+            logging.info('MainWebSocketServer:client left:%d %s' % (client['id'], client))
             del self.clients[client['id']]
         except:
             pass
@@ -421,6 +426,10 @@ class MainWebSocketServer(BaseWebSocketServerWrapper, AbstractGatewayListener):
         if message['event'] == AbstractTableModel.EVENT_TM_REQUEST_TABLE_STRUCTURE:
             self.message_handler.send_message(AbstractTableModel.EVENT_TM_REQUEST_TABLE_STRUCTURE, 
                                           json.dumps({'request_id' : client['id'], 'target_resource': message['target_resource'], 'account': message['account']}))
+        elif message['event'] == 'event_request_port_summary':
+            self.message_handler.send_message('event_request_port_summary', 
+                                          json.dumps({'request_id' : client['id'], 'account': message['account']}))
+            
 
 
     def tickPrice(self, event, contract_key, field, price, canAutoExecute):
@@ -501,7 +510,7 @@ def main():
     logconfig = kwargs['logconfig']
     logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
     logging.basicConfig(**logconfig)        
-    logging.getLogger().addFilter(LoggerNoBaseMessagingFilter())  
+    #logging.getLogger().addFilter(LoggerNoBaseMessagingFilter())  
     
     esw = MainWebSocketServer('ChartTableWS', kwargs)    
     esw.start_server()

+ 6 - 5
src/ws/ws_webserver.py

@@ -8,12 +8,12 @@ import thread
 
 class PortalServer(object):
     
-    config = None
     
-    
-    def __init__(self, config):
+    def __init__(self, config, ws_parent):
         super(PortalServer, self).__init__()
         PortalServer.config = config
+        self.ws_parent = ws_parent
+        
       
 
     
@@ -35,11 +35,12 @@ class PortalServer(object):
                  
 class HTTPServe():
     
-    def __init__(self, config):
+    def __init__(self, config, parent):
         self.config = config
+        self.ws_parent = parent
         
     def start_server(self):
-        cherrypy.quickstart(PortalServer(self.config), '/', self.config['ws_webserver_cfg_path'])
+        cherrypy.quickstart(PortalServer(self.config, self.ws_parent), '/', self.config['ws_webserver_cfg_path'])
     
     def stop_server(self):
         cherrypy.engine.exit()