瀏覽代碼

separate ws server into a standalone server. developing stage.

laxaurus 9 年之前
父節點
當前提交
14fcd90d5a
共有 4 個文件被更改,包括 55 次插入35 次删除
  1. 35 14
      src/rethink/portfolio_monitor.py
  2. 13 4
      src/rethink/table_model.py
  3. 1 0
      src/ws/client_g.html
  4. 6 17
      src/ws/ws_server.py

+ 35 - 14
src/rethink/portfolio_monitor.py

@@ -13,7 +13,7 @@ from rethink.option_chain import OptionsChain
 from rethink.tick_datastore import TickDataStore
 from rethink.portfolio_item import PortfolioItem, PortfolioRules, Portfolio
 from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
-from ws.ws_server import BaseWebSocketServerWrapper
+
 
 
 
@@ -30,7 +30,8 @@ class PortfolioMonitor(AbstractGatewayListener):
         self.twsc.add_listener_topics(self, kwargs['topics'])
         
         
-        #self.ws_server = 
+        
+        self.ws_server =  None
         '''
             portfolios: {<account>: <portfolio>}
         '''
@@ -41,7 +42,7 @@ class PortfolioMonitor(AbstractGatewayListener):
     def start_engine(self):
         self.twsc.start_manager()
         self.twsc.reqPositions()
-        self.starting_engine = True
+        self.starting_engine = {}
         try:
             logging.info('PortfolioMonitor:main_loop ***** accepting console input...')
             menu = {}
@@ -86,6 +87,11 @@ class PortfolioMonitor(AbstractGatewayListener):
             logging.info('PortfolioMonitor: Service shut down complete...')               
     
         
+    def kproducer(self):
+        # returns a reference to the kafka base producer that we can 
+        # use for sending messages
+        return self.twsc.gw_message_handler()
+    
                 
     def get_portfolio(self, account):
         try:
@@ -152,7 +158,11 @@ class PortfolioMonitor(AbstractGatewayListener):
             port_item.update_position(position, average_cost, extra_info)
             port_item.calculate_pl(contract_key)
             
-            port.fire_table_row_updated(port.ckey_to_row(contract_key))
+            
+            # dispatch the update to internal listeners
+            # and also send out the kafka message to external parties
+            self.notify_table_model_changes(account, port, contract_key, mode='U')
+            
             logging.info('PortfolioMonitor:process_position. Position updated: %s:[%d]' % (contract_key, port.ckey_to_row(contract_key)))
         # new position 
         else:
@@ -178,7 +188,8 @@ class PortfolioMonitor(AbstractGatewayListener):
                     logging.error('PortfolioMonitor:process_position. **** Error in adding the new position %s' % contract_key)
 
                 
-            port.fire_table_row_inserted(port.ckey_to_row(contract_key))
+            
+            self.notify_table_model_changes(account, port, contract_key, mode='I')
             logging.info('PortfolioMonitor:process_position. New position: %s:[%d]' % (contract_key, port.ckey_to_row(contract_key)))
             port.dump_portfolio()
             
@@ -245,7 +256,7 @@ class PortfolioMonitor(AbstractGatewayListener):
                         self.tds.set_symbol_analytics(key_greeks[0], Option.VEGA, key_greeks[1][Option.VEGA])
                         
                         self.portfolios[acct].calculate_item_pl(key_greeks[0])
-                        self.portfolios[acct].fire_table_row_updated(self.portfolios[acct].ckey_to_row(key_greeks[0]))
+                        self.notify_table_model_changes(acct, self.portfolios[acct], key_greeks[0], mode='U')
                         logging.info('PortfolioMonitor:tds_event_tick_updated. Position updated: %s:[%d]' % (key_greeks[0], self.portfolios[acct].ckey_to_row(key_greeks[0])))
                     
                     if results:
@@ -281,14 +292,14 @@ class PortfolioMonitor(AbstractGatewayListener):
             self.process_position(account, contract_key, position, average_cost)
    
         else:
-            # to be run once during start up
+            # to be run once per a/c during start up
             # subscribe to automatic account updates
-            if self.starting_engine:
-                for acct in self.portfolios.keys():
-                    self.twsc.reqAccountUpdates(True, acct)
-                    logging.info('PortfolioMonitor:position. subscribing to auto updates for ac: [%s]' % acct)
-            self.starting_engine = False
-            pass
+            try:
+                self.starting_engine[account]
+            except KeyError:
+                self.twsc.reqAccountUpdates(True, account)
+                logging.info('PortfolioMonitor:position. subscribing to auto updates for ac: [%s]' % account)  
+                self.starting_engine[account] = False
                     
     '''
         the 4 account functions below are invoked by AbstractListener.update_portfolio_account.
@@ -321,6 +332,15 @@ class PortfolioMonitor(AbstractGatewayListener):
         del(items['self'])
         logging.info('%s [[ %s ]]' % (event, items))      
         
+    
+    def notify_table_model_changes(self, account, port, contract_key, mode):
+            row = port.ckey_to_row(contract_key)
+            rvs = port.get_values_at(row)
+            port.fire_table_row_updated(row, rvs)
+            event_type = 'table_row_updated' if mode == 'U' else 'table_row_inserted'
+            self.kproducer().send_message(event_type, json.dumps({'source': 'dt_port-%s' % account, 'row': row, 'row_values': rvs}))
+    
+    
         
 if __name__ == '__main__':
     
@@ -341,7 +361,8 @@ if __name__ == '__main__':
       'clear_offsets':  False,
       'logconfig': {'level': logging.INFO, 'filemode': 'w', 'filename': '/tmp/pm.log'},
       'topics': ['position', 'positionEnd', 'tickPrice', 'update_portfolio_account'],
-      'seek_to_end': ['*']
+      'seek_to_end': ['*'],
+      'ws_port': 9001,
       
       }
 

+ 13 - 4
src/rethink/table_model.py

@@ -2,6 +2,7 @@ from misc2.observer import Subscriber, Publisher
 from misc2.observer import NotImplementedException
 import logging
 
+
 class AbstractTableModel(Publisher):
     
     EVENT_TM_TABLE_CELL_UPDATED = 'event_tm_table_cell_updated'
@@ -19,11 +20,11 @@ class AbstractTableModel(Publisher):
             logging.error("AbstractTableModel:register_listener. Function not implemented in the listener. %s" % e)
             raise NotImplementedException        
     
-    def fire_table_row_updated(self, row):
-        self.dispatch(AbstractTableModel.EVENT_TM_TABLE_ROW_UPDATED, {'row': row})
+    def fire_table_row_updated(self, row, row_values):
+        self.dispatch(AbstractTableModel.EVENT_TM_TABLE_ROW_UPDATED, {'row': row, 'row_values': row_values})
     
-    def fire_table_row_inserted(self, row):
-        self.dispatch(AbstractTableModel.EVENT_TM_TABLE_ROW_INSERTED, {'row': row})
+    def fire_table_row_inserted(self, row, row_values):
+        self.dispatch(AbstractTableModel.EVENT_TM_TABLE_ROW_INSERTED, {'row': row, 'row_values': row_values})
         
         
     def get_column_count(self):
@@ -41,10 +42,18 @@ class AbstractTableModel(Publisher):
     def get_value_at(self, row, col):
         raise NotImplementedException
     
+    def get_values_at(self, row):
+        raise NotImplementedException
+    
     def set_value_at(self, row, col, value):
         raise NotImplementedException
     
     def insert_row(self, values):
         raise NotImplementedException
 
+
+        
     
+
+
+

+ 1 - 0
src/ws/client_g.html

@@ -87,6 +87,7 @@
       // Set event handlers.
       ws.onopen = function() {
         output("onopen");
+        ws.send(JSON.stringify({'request_id':33, ‘request_for': 'init_data_table', 'request_handler': ''}));
       };
       
       ws.onmessage = function(e) {

文件差異過大導致無法顯示
+ 6 - 17
src/ws/ws_server.py


部分文件因文件數量過多而無法顯示