Преглед на файлове

fixed the console freeze bug

bobhk преди 8 години
родител
ревизия
31468c7c9b
променени са 4 файла, в които са добавени 96 реда и са изтрити 42 реда
  1. 5 1
      src/rethink/portfolio_item.py
  2. 60 34
      src/rethink/portfolio_monitor.py
  3. 5 4
      src/ws/client_g.html
  4. 26 3
      src/ws/ws_server.py

+ 5 - 1
src/rethink/portfolio_item.py

@@ -282,6 +282,7 @@ class Portfolio(AbstractTableModel):
     
     
     
+    
     '''
         implement AbstractTableModel methods and other routines
     '''
@@ -391,5 +392,8 @@ class Portfolio(AbstractTableModel):
         map(lambda p: dtj['rows'].append({'c': self.port_item_to_row_fields(p)}), p_items)
         
         
-        return json.dumps(dtj) #, indent=4)            
+        return json.dumps(dtj) #, indent=4)     
+    
+    def dump_table_index_map(self):
+        return '\n'.join('[%d]:%s' % (x[0], x[1]) for x in  self.port['g_table']['row_to_ckey_index'].items())       
         

+ 60 - 34
src/rethink/portfolio_monitor.py

@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 import sys, traceback
 import logging
-import json
+import json, threading
 import time, datetime
 import copy
 from optparse import OptionParser
@@ -41,48 +41,74 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
         self.starting_engine = {}
         
     
+
+            
+        
+    
     def start_engine(self):
         self.twsc.start_manager()
         self.twsc.reqPositions()
-        
+     
         try:
-            logging.info('PortfolioMonitor:main_loop ***** accepting console input...')
-            menu = {}
-            menu['1']="Request position" 
-            menu['2']="Portfolio dump dtj"
-            menu['3']="TDS dump"
-            menu['4']="Request account updates"
-            menu['5']="Table chart JSON"
-            menu['9']="Exit"
-            while True: 
+            def print_menu():
+                menu = {}
+                menu['1']="Request position" 
+                menu['2']="Portfolio dump dtj"
+                menu['3']="TDS dump"
+                menu['4']="Request account updates"
+                menu['5']="Table chart JSON"
+                menu['6']="Table index mapping"
+                menu['9']="Exit"
+        
                 choices=menu.keys()
                 choices.sort()
                 for entry in choices: 
-                    print entry, menu[entry]            
-                
-                sleep(0.15)
-                selection = raw_input("Enter command:")
-                if selection =='1':
-                    self.twsc.reqPositions()
-                elif selection == '2': 
-                    for port in self.portfolios.values():
-                        print port.dump_portfolio()
-                elif selection == '3': 
-                    print self.tds.dump()
-                elif selection == '4': 
-                    for acct in self.portfolios.keys():
-                        self.twsc.reqAccountUpdates(True, acct)
-                elif selection == '5':
-                    for port in self.portfolios.values():
-                        print port.get_JSON() 
-                elif selection == '9': 
-                    self.twsc.gw_message_handler.set_stop()
-                    break
-                else: 
-                    pass                
-                
+                    print entry, menu[entry]                             
                 
+            def get_user_input(selection):
+                    logging.info('PortfolioMonitor:main_loop ***** accepting console input...')
+                    print_menu()
+                    while 1:
+                        resp = sys.stdin.readline()
+                        response[0]= resp.strip('\n')
+                        #print response[0]
+                                
             
+            response = [None]
+            user_input_th = threading.Thread(target=get_user_input, args=(response,))
+            user_input_th.daemon = True
+            user_input_th.start()               
+            while True:
+                sleep(0.4)
+                
+                if response[0] is not None:
+                    selection = response[0]
+                    if selection =='1':
+                        self.twsc.reqPositions()
+                    elif selection == '2': 
+                        for port in self.portfolios.values():
+                            print port.dump_portfolio()
+                    elif selection == '3': 
+                        
+                        print self.tds.dump()
+                    elif selection == '4': 
+                        for acct in self.portfolios.keys():
+                            self.twsc.reqAccountUpdates(True, acct)
+                    elif selection == '5':
+                        for port in self.portfolios.values():
+                            print port.get_JSON() 
+                    elif selection == '6':
+                        for acct in self.portfolios.keys():
+                            print self.portfolios[acct].dump_table_index_map() 
+                            
+                    elif selection == '9': 
+                        self.twsc.gw_message_handler.set_stop()
+                        break
+                    else: 
+                        pass                        
+                    response[0] = None
+                    print_menu()
+                    
         except (KeyboardInterrupt, SystemExit):
             logging.error('PortfolioMonitor: caught user interrupt. Shutting down...')
             self.twsc.gw_message_handler.set_stop() 

+ 5 - 4
src/ws/client_g.html

@@ -23,7 +23,8 @@
 	
 	var view = null;
 	var data = null;
-	var table= null;    	
+	var table= null;   
+        var options = {sortColumn:1, showRowNumber: true, width: '100%', height: '100%'}; 	
 	   //google.charts.load('current', {'packages':['table']});
 	   google.load("visualization", "1.1", {packages:["corechart", 'table','gauge']});
 	   google.setOnLoadCallback(drawTable);
@@ -88,7 +89,7 @@
 
       // Connect to Web Socket
       ws = new WebSocket("ws://localhost:9001/");
-
+   
       // Set event handlers.
       ws.onopen = function() {
         output("onopen");
@@ -108,7 +109,7 @@
 			/*view.setRows(view.getFilteredRows([{column: 14, test: function(value, row, column, table) {
         return (value == 'HSI' || value == 'MHI') 
     }}]));*/
-			table.draw(view, {sortColumn:1});
+			table.draw(view, options);
 			output('number of rows: ' + data.getNumberOfRows());
            
         } else if (d1.event == 'event_tm_table_row_updated'){
@@ -118,7 +119,7 @@
 				data.setCell(d1.value.row, c, d1.value.row_values[c]["v"]);
 
 			}
-			table.draw(view);
+			table.draw(view, options);
 //			table.draw(data);           
 //        	setValueAtRandomCell(getRandomInt(1000, 5000));
         }

+ 26 - 3
src/ws/ws_server.py

@@ -146,11 +146,13 @@ class PortfolioTableModelListener(BaseMessageListener):
     '''
     
     '''
+    CACHE_MAX = 50
+    TIME_MAX = 3.0
     
     def __init__(self, name, server_wrapper):
         BaseMessageListener.__init__(self, name)
         self.mwss = server_wrapper
-        
+        self.simple_caching = {}
 
     def event_tm_table_cell_updated(self, event, source, row, row_values):
         logging.info("[%s] received %s content:[%d]" % (self.name, event, row))
@@ -163,15 +165,36 @@ class PortfolioTableModelListener(BaseMessageListener):
     def event_tm_table_row_updated(self, event, source, row, row_values):   
         #logging.info("[%s] received %s content:[%s]" % (self.name, event, vars()))
         #logging.info("[%s] received %s content:[%d]" % (self.name, event, row))
-        self.mwss.get_server().send_message_to_all(json.dumps(
+        def notify_client():
+            self.mwss.get_server().send_message_to_all(json.dumps(
                         {'event':event, 'value':{'row': row, 'row_values': row_values}}));
+            
+        try:
+            curr_ts = time.time()
+            if self.simple_caching[row]['count'] < PortfolioTableModelListener.CACHE_MAX or\
+                curr_ts - self.simple_caching[row]['ts'] < PortfolioTableModelListener.TIME_MAX:
+                self.simple_caching[row]['count'] +=1
+                self.simple_caching[row]['ts'] = curr_ts
+            else:
+                logging.info('event_tm_table_row_updated: flush condition met, sending changes to clients. row:[%d] %d %0.2f' %
+                                (row, self.simple_caching[row]['count'], curr_ts - self.simple_caching[row]['ts']))
+                self.simple_caching[row]['count'] = 0
+                self.simple_caching[row]['ts'] = curr_ts
+                notify_client()
+                
+            
+            
+        except KeyError:
+            self.simple_caching[row] = {'count': 1, 'ts': curr_ts}
+            notify_client()    
     
     def event_tm_table_structure_changed(self, event, origin_request_id, account, data_table_json):
         try:
             logging.info("[%s] received %s content:[%d]" % (self.name, event, origin_request_id))
             self.mwss.get_server().send_message(self.mwss.clients[origin_request_id], 
                                                 json.dumps({'event': event, 'value': data_table_json})) 
-        except IndexError:
+        #except IndexError, KeyError:
+        except:
             logging.error('[%s]. index error %d' % (event, origin_request_id))