bobhk hace 9 años
padre
commit
165ed365bc
Se han modificado 5 ficheros con 48 adiciones y 32 borrados
  1. 10 3
      src/rethink/analytics_engine.py
  2. 3 1
      src/rethink/portfolio_monitor.py
  3. 2 1
      src/sh/ae.sh
  4. 2 2
      src/sh/start_twsgw.sh
  5. 31 25
      src/ws/ws_server.py

+ 10 - 3
src/rethink/analytics_engine.py

@@ -170,7 +170,9 @@ class AnalyticsEngine(AbstractGatewayListener):
         
     
     def tds_event_tick_updated(self, event, contract_key, field, price, syms):
-        
+        if field not in [Symbol.ASK, Symbol.BID, Symbol.LAST]:
+            return
+                
         for s in syms:
             
             if OptionsChain.CHAIN_IDENTIFIER in s.get_extra_attributes():
@@ -179,9 +181,12 @@ class AnalyticsEngine(AbstractGatewayListener):
                 logging.info('AnalyticsEngine:tds_event_tick_updated chain_id %s' % chain_id)
                 if chain_id  in self.option_chains.keys():
                     if 'FUT' in contract_key or 'STK' in contract_key:
-                        results = self.option_chains[chain_id].cal_greeks_in_chain(self.kwargs['evaluation_date'])
+                        results = self.option_chains[chain_id].cal_greeks_in_chain(self.kwargs['evaluation_date'], price)
+                        
                     else:
-                        results[ContractHelper.makeRedisKeyEx(s.get_contract())] = self.option_chains[chain_id].cal_option_greeks(s, self.kwargs['evaluation_date'])
+                        results[ContractHelper.makeRedisKeyEx(s.get_contract())] = self.option_chains[chain_id].cal_option_greeks\
+                                                                                        (s, self.kwargs['evaluation_date'], float('nan'), price)
+                        
                 logging.info('AnalysticsEngine:tds_event_tick_updated. compute greek results %s' % results)    
                 # set_analytics(self, imvol=None, delta=None, gamma=None, theta=None, vega=None, npv=None):
                 # 
@@ -199,6 +204,8 @@ class AnalyticsEngine(AbstractGatewayListener):
                 
                 continue
              
+
+        
         
 
 

+ 3 - 1
src/rethink/portfolio_monitor.py

@@ -13,7 +13,7 @@ from rethink.option_chain import OptionsChain
 from rethink.tick_datastore import TickDataStore
 from rethink.portfolio_item import PortfolioItem, PortfolioRules, Portfolio
 from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
-from ws.ws_server import WebSocketServerWrapper
+from ws.ws_server import BaseWebSocketServerWrapper
 
 
 
@@ -29,6 +29,8 @@ class PortfolioMonitor(AbstractGatewayListener):
         self.tds.register_listener(self)
         self.twsc.add_listener_topics(self, kwargs['topics'])
         
+        
+        #self.ws_server = 
         '''
             portfolios: {<account>: <portfolio>}
         '''

+ 2 - 1
src/sh/ae.sh

@@ -11,4 +11,5 @@ else
 	FINOPT_HOME=~/l1304/workspace/finopt-ironfly/finopt/src
 fi
 export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
-python $FINOPT_HOME/rethink/analytics_engine.py  -c -g AE1  
+#python $FINOPT_HOME/rethink/analytics_engine.py  -c -g AE1  
+python $FINOPT_HOME/rethink/analytics_engine.py   -g AE1  

+ 2 - 2
src/sh/start_twsgw.sh

@@ -13,7 +13,7 @@ fi
 export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
 #  
 # clear all topic offsets and erased saved subscriptions
-#python $FINOPT_HOME/comms/ibgw/tws_gateway.py -r -c -f $FINOPT_HOME/config/tws_gateway.cfg 
+python $FINOPT_HOME/comms/ibgw/tws_gateway.py -r -c -f $FINOPT_HOME/config/tws_gateway.cfg 
 
 
 #
@@ -22,7 +22,7 @@ export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
 
 
 # restart gateway keep the redis offsets but erase the subscription entries
-python $FINOPT_HOME/comms/ibgw/tws_gateway.py  -r -f $FINOPT_HOME/config/tws_gateway.cfg 
+#python $FINOPT_HOME/comms/ibgw/tws_gateway.py  -r -f $FINOPT_HOME/config/tws_gateway.cfg 
 
 # normal restart - keep the offsets and reload from saved subscription entries
 #python $FINOPT_HOME/comms/ibgw/tws_gateway.py   -f $FINOPT_HOME/config/tws_gateway.cfg 

+ 31 - 25
src/ws/ws_server.py

@@ -1,46 +1,56 @@
 from websocket_server import WebsocketServer
 import logging, time, traceback
 from threading import Thread
-import logging
+import logging, copy
 import json
 from comms.ibgw.base_messaging import BaseMessageListener
 # https://github.com/Pithikos/python-websocket-server
 
 
-class WebSocketServerWrapper(BaseMessageListener):
+class BaseWebSocketServerWrapper(BaseMessageListener):
     
-    def __init__(self, name):
+    DEFAULT_CONFIG = {
+      'ws_port': 9001,
+    }    
+    
+    def __init__(self, name, kwargs):
                
         #BaseMessageListener
+        self.kwargs = copy.copy(self.DEFAULT_CONFIG)
+        for key in self.kwargs:
+            if key in kwargs:
+                self.kwargs[key] = kwargs.pop(key)        
+        self.kwargs.update(kwargs)
+        
         self.clients = {}
         
-        PORT=9001
-        server = WebsocketServer(PORT)
+        
+        server = WebsocketServer(self.kwargs['ws_port'])
         self.set_server(server)
         server.set_fn_new_client(self.new_client)
         server.set_fn_client_left(self.client_left)
         server.set_fn_message_received(self.message_received)
         
-        def run_background():
-            server.run_forever()
-            
-        self.t = Thread(target=run_background)        
-        self.t.start()
-        
         
         
     def set_server(self, server):
         self.server = server
         
     def event_tm_table_row_inserted(self, row):
-        logging.info('WebSocketServerWrapper:event_tm_table_row_inserted %d', row)
+        logging.info('BaseWebSocketServerWrapper:event_tm_table_row_inserted %d', row)
     
     def event_tm_table_row_updated(self, row):
-        logging.info('WebSocketServerWrapper:event_tm_table_row_updated %d', row)
+        logging.info('BaseWebSocketServerWrapper:event_tm_table_row_updated %d', row)
         logging.info(json.dumps(self.port_table.get_values_at(row)))
             
     
     def start_server(self):
+        def run_background():
+            self.server.run_forever()
+            
+        self.t = Thread(target=run_background)        
+        self.t.start()
+        logging.info('BaseWebSocketServerWrapper:start_server. Server started. Awaiting clients on port %d...' % self.kwargs['ws_port'])
         while 1:
             time.sleep(0.5)
             #print 'sending stuff.. %s' % str(list(self.clients.iteritems()))
@@ -49,8 +59,7 @@ class WebSocketServerWrapper(BaseMessageListener):
             #msg = self.encode_message('update_chart', val)
             #map(lambda x: self.server.send_message(x[1], msg), list(self.clients.iteritems()))
             
-    def encode_message(self, event_type, content):
-        
+    def encode_message(self, event_type, content):        
         return json.dumps({'event': event_type, 'value': content})
             
     def new_client(self, client, server):
@@ -67,7 +76,7 @@ class WebSocketServerWrapper(BaseMessageListener):
         print("Client(%d) disconnected" % client['id'])
     
     
-    # Called when a client sends a message
+    # Called when a client sends a message1
     def message_received(self, client, server, message):
         if len(message) > 200:
             message = message[:200]+'..'
@@ -80,16 +89,13 @@ class WebSocketServerWrapper(BaseMessageListener):
             shut_down()
 
 def main():
-    wsw = WebSocketServerWrapper('hello')    
-    #wsw.start_server()
     
-#     PORT=9001
-#     server = WebsocketServer(PORT)
-#     wsw.set_server(server)
-#     server.set_fn_new_client(wsw.new_client)
-#     server.set_fn_client_left(wsw.client_left)
-#     server.set_fn_message_received(wsw.message_received)
-#     server.run_forever()
+    kwargs = {
+      'ws_port': 9001,
+    }       
+    wsw = BaseWebSocketServerWrapper('hello', kwargs)    
+    wsw.start_server()
+
     
     
 if __name__ == "__main__":