Ver Fonte

add flasker support and other enhancements

change to use local minquery for faster performance
add helper methods to compute nearest min max
add new account fields available funds and total cash value in port
summary
fix ib_hearbeat not shutdown properly
add webconsole rest api feature to pm and tws_gateway
fix portfolio shutdown mechanism
laxaurus há 6 anos atrás
pai
commit
63833cb3a7

+ 2 - 2
src/comms/ibgw/base_messaging.py

@@ -173,12 +173,12 @@ class BaseConsumer(threading.Thread, Publisher):
             clear the offsets in redis by removing the value from redis
             and emptying the internal my_topics dict
             
-            clear offsets is ncessary when the offset in redis was saved 
+            clear offsets is necessary when the offset in redis was saved 
             at a time since kafka manager was shut down
             when kafka restarts, previously buffered 
             messages are no longer available and instead it will restart its offset at 0.
             Reading an old offset by BaseConsumer will cause it to think that it
-            is still receving old buffered messages from Kafa but in fact all the messages 
+            is still receiving old buffered messages from Kafa but in fact all the messages 
             since the last shut down of kafka are all gone
         """
         for t in self.kwargs['topics']:

+ 19 - 3
src/comms/ibgw/ib_heartbeat.py

@@ -40,6 +40,7 @@ class IbHeartBeat():
 
     def shutdown(self):
         self.quit = True
+        logging.info('ib_hearbeat: attempting to shutdown....')
         
     def keep_trying(self):
         host = self.kwargs["ib_heartbeat.gateway"]
@@ -49,6 +50,21 @@ class IbHeartBeat():
         suppress_msg_interval = self.kwargs["ib_heartbeat.suppress_msg_interval"]
         logging.info('ib gateway->%s:%d, appid->%d, try_interval->%d, suppress msg interval->%d' % \
                      (host, port, appid, try_interval, suppress_msg_interval))
+        
+        
+        '''''
+            this function breaks up long sleep into smaller intervals
+            to allow a chance to process termination request
+        '''''
+        def smart_sleep(sleep_duration):
+            num_steps = 20
+            short_break = sleep_duration / num_steps
+            for i in range(num_steps):
+                if self.quit:
+                    break
+                else:
+                    sleep(short_break)
+        
         while not self.quit:
             con = ibConnection(host, port, appid)
             rc = con.connect()
@@ -73,10 +89,10 @@ class IbHeartBeat():
                     self.alert_listeners(msg)
                     self.last_broken_time = now 
                     
-                
-            sleep(try_interval)
+               
+            smart_sleep(try_interval)
             
-
+        logging.info('ib_hearbeat: shut down complete...')
 
 
         

+ 16 - 6
src/comms/ibgw/subscription_manager.py

@@ -250,12 +250,22 @@ class SubscriptionManager(BaseMessageListener):
 
     def dump(self):
 
-        logging.info('subscription manager table:---------------------\n')
-        logging.info(''.join ('\n[%s]:[%s]' % (str(ic[0]).rjust(4), ic[1]) for ic in self.get_id_contracts(db=False)))
-        logging.info(''.join ('\n[%s]:[%d]' % (k.rjust(20), self.idContractMap['contract_id'][k]) 
-                               for k in sorted(self.idContractMap['contract_id'])))       
-        logging.info( 'Number of instruments subscribed: %d' % self.idContractMap['next_id'])
-        logging.info( '------------------------------------------------')
+        s = 'subscription manager table:---------------------\n'
+        s = s + ''.join ('\n[%s]:[%s]' % (str(ic[0]).rjust(4), ic[1]) for ic in self.get_id_contracts(db=False))
+        s = s + ''.join ('\n[%s]:[%d]' % (k.rjust(20), self.idContractMap['contract_id'][k]) 
+                               for k in sorted(self.idContractMap['contract_id']))       
+        s = s +  'Number of instruments subscribed: %d' % self.idContractMap['next_id']
+        s = s +  '------------------------------------------------'
+
+        logging.info(s)
+        return s
+
+#         logging.info('subscription manager table:---------------------\n')
+#         logging.info(''.join ('\n[%s]:[%s]' % (str(ic[0]).rjust(4), ic[1]) for ic in self.get_id_contracts(db=False)))
+#         logging.info(''.join ('\n[%s]:[%d]' % (k.rjust(20), self.idContractMap['contract_id'][k]) 
+#                                for k in sorted(self.idContractMap['contract_id'])))       
+#         logging.info( 'Number of instruments subscribed: %d' % self.idContractMap['next_id'])
+#         logging.info( '------------------------------------------------')
 
 
     """

+ 76 - 8
src/comms/ibgw/tws_gateway.py

@@ -17,7 +17,8 @@ from comms.ibgw.tws_event_handler import TWS_event_handler
 from comms.ibgw.ib_heartbeat import IbHeartBeat
 from comms.ibgw.client_request_handler import ClientRequestHandler
 from comms.ibgw.subscription_manager import SubscriptionManager
-from comms.tws_protocol_helper import TWS_Protocol 
+from comms.tws_protocol_helper import TWS_Protocol
+from comms.ibgw.tws_gateway_restapi import WebConsole 
 import redis
 import threading
 from threading import Lock
@@ -69,6 +70,7 @@ class TWS_gateway():
             
             4. initialize listeners: ClientRequestHandler and SubscriptionManager
             5. start the prosumer 
+            6. run web console
         
         '''
 
@@ -120,6 +122,8 @@ class TWS_gateway():
         logging.info('start TWS_event_handler. Start prosumer processing loop...')
         self.gw_message_handler.start_prosumer()
 
+        logging.info('start web console...')
+        self.start_web_console()
 
         logging.info('**** Completed initialization sequence. ****')
         
@@ -146,6 +150,21 @@ class TWS_gateway():
                           (self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db']))
             logging.error('aborting...')
             sys.exit(-1)
+    
+    
+    def start_web_console(self):
+        
+        def start_flask():
+            w = WebConsole(self)
+            w.add_resource()
+            w.app.run(host=self.kwargs['webconsole.host'], port=self.kwargs['webconsole.port'],
+                      debug=self.kwargs['webconsole.debug'], use_reloader=self.kwargs['webconsole.auto_reload'])
+            
+        t_webApp = threading.Thread(name='Web App', target=start_flask)
+        t_webApp.setDaemon(True)
+        t_webApp.start()
+                
+
             
     def get_redis_conn(self):
         return self.rs
@@ -200,25 +219,74 @@ class TWS_gateway():
         if (self.pcounter >= 8):
             self.contract_subscription_mgr.persist_subscriptions()
            
+   
+    def shutdown_all(self):
+        sleep(1)
+        logging.info('shutdown_all sequence started....')
+        self.gw_message_handler.set_stop()
+        self.gw_message_handler.join()
+        self.ibh.shutdown()
+        self.menu_loop_done = True
+        sys.exit(0)
+        
+
+    def post_shutdown(self):
+        th = threading.Thread(target=self.shutdown_all)
+        th.daemon = True
+        th.start()               
+
         
 
     def main_loop(self):
-        try:
-            logging.info('TWS_gateway:main_loop ***** accepting console input...')
+        def print_menu():
+            menu = {}
+            menu['1']="Dump subscription manager content to log" 
+            menu['2']=""
+            menu['3']="Start up configuration"
+            menu['4']=""
+            menu['9']="Exit"
+    
+            choices=menu.keys()
+            choices.sort()
+            for entry in choices: 
+                print entry, menu[entry]                             
             
+        def get_user_input(selection):
+                logging.info('TWS_gateway:main_loop ***** accepting console input...')
+                print_menu()
+                while 1:
+                    resp = sys.stdin.readline()
+                    response[0] = resp.strip('\n')        
+        try:
             
+            response = [None]
+            user_input_th = threading.Thread(target=get_user_input, args=(response,))
+            user_input_th.daemon = True
+            user_input_th.start()               
             self.pcounter = 0
-            while True: 
+            self.menu_loop_done = False
+            while not self.menu_loop_done: 
                 
                 sleep(.5)
                 self.persist_subscription_table()
-                
+                if response[0] is not None:
+                    selection = response[0]
+                    if selection =='1':
+                        self.contract_subscription_mgr.dump()
+                    elif selection == '3':
+                        print '\n'.join('[%s]:%s' % (k,v) for k,v in self.kwargs.iteritems())
+                    elif selection == '9': 
+                        self.shutdown_all()
+                        sys.exit(0)
+                        break
+                    else: 
+                        pass                        
+                    response[0] = None
+                    print_menu()                
                 
         except (KeyboardInterrupt, SystemExit):
                 logging.error('TWS_gateway: caught user interrupt. Shutting down...')
-                self.gw_message_handler.set_stop()
-                self.gw_message_handler.join()
-                self.ibh.shutdown()
+                self.shutdown_all()
                 logging.info('TWS_gateway: Service shut down complete...')
                 sys.exit(0)        
 

+ 90 - 0
src/comms/ibgw/tws_gateway_restapi.py

@@ -0,0 +1,90 @@
+from flask import Flask
+from flask_restful import Resource, Api, reqparse
+import json
+import threading
+import time
+from cheroot.server import Gateway
+import sys
+
+
+class WebConsole():
+
+    app = Flask(__name__)
+    api = Api(app)
+    parser = reqparse.RequestParser()
+    
+    def __init__(self, parent=None):
+        self.parent = parent
+
+    def get_parent(self):
+        return self.parent
+    
+    def add_resource(self):
+        WebConsole.api.add_resource(Commands, '/')
+        WebConsole.api.add_resource(ExitApp, '/exit', resource_class_kwargs={'gateway_instance': self.parent})
+        WebConsole.api.add_resource(Subscriptions, '/subscriptions', resource_class_kwargs={'gateway_instance': self.parent})
+        WebConsole.api.add_resource(GatewaySettings, '/settings', resource_class_kwargs={'gateway_instance': self.parent})
+        
+
+class Commands(Resource):
+    def get(self):
+        return {'status': True, 'Available REST API' : {'exit': 'shutdown gateway', 
+                                              'subscriptions': 'get a list of subscribed topics',
+                                              'settings': 'get gateway startup settings',
+                                              
+                                              }
+                }
+
+class ExitApp(Resource):
+    def __init__(self, gateway_instance):
+        self.gw = gateway_instance
+        
+    def get(self):
+        self.gw.post_shutdown()
+        return {'status': 'please check the log for exit status'}
+        
+
+class GatewaySettings(Resource):
+    def __init__(self, gateway_instance):
+        self.gw = gateway_instance
+        
+    def get(self):
+        return json.loads(json.dumps(self.gw.kwargs))
+    
+    
+
+    
+
+
+class Subscriptions(Resource):
+    def __init__(self, gateway_instance):
+        self.gw = gateway_instance
+    
+    def get(self):
+        idc = self.gw.contract_subscription_mgr.get_id_contracts()
+        c2id = self.gw.contract_subscription_mgr.idContractMap['contract_id']
+        return {'status': True, 'subscriptions' : {'id2c': idc, 'c2id': c2id}}
+
+
+# def start_flask():
+#     w = WebConsole()
+#     w.add_resource()
+#     w.app.run(debug=True, use_reloader=False)
+# 
+# if __name__ == '__main__':
+#     
+#     
+#     
+#     t_webApp = threading.Thread(name='Web App', target=start_flask)
+#     t_webApp.setDaemon(True)
+#     t_webApp.start()
+#     
+#     try:
+#         while True:
+#             print 'sleeping...'
+#             time.sleep(1)
+#     
+#     except KeyboardInterrupt:
+#         print("exiting")
+#         exit(0)    
+#     

+ 6 - 0
src/config/pm.cfg

@@ -12,3 +12,9 @@ clear_offsets:  False
 logconfig: {'level': logging.INFO, 'filemode': 'w', 'filename': '/tmp/pm.log'}
 topics: ['position', 'positionEnd', 'tickPrice', 'execDetails', 'update_portfolio_account', 'event_tm_request_table_structure', 'event_request_port_summary', AbstractTableModel.EVENT_TM_TABLE_STRUCTURE_CHANGED]
 seek_to_end: ['*']
+
+
+webconsole.host: '0.0.0.0'
+webconsole.port:6001
+webconsole.debug: True
+webconsole.auto_reload: False

+ 52 - 0
src/config/tws_gateway.cfg

@@ -0,0 +1,52 @@
+[tws_gateway]
+name: 'tws_gateway_server'
+#
+# kafka settings
+#
+bootstrap_host: 'vorsprung'
+bootstrap_port: 9092
+#
+# redis persistence
+#
+redis_host: 'localhost'
+redis_port: 6379
+#
+# DIFFERENT REDIS DB for PRODUCTION == 1 ELSE 0!!!
+redis_db: 0
+#
+# TWS gateway settings
+#
+# 7496 - production larry046, 7496 - development,  8496 production mchan927
+#
+tws_host: 'vsu-longhorn'
+tws_api_port: 8496
+tws_app_id: 5567
+#
+#
+#
+group_id: 'TWS_GW'
+session_timeout_ms: 10000
+topics: ['reqAccountUpdates', 'reqOpenOrders', 'reqExecutions', 'reqIds', 'reqNewsBulletins', 'cancelNewsBulletins', 'setServerLogLevel', 'reqAccountSummary', 'reqPositions', 'reqAutoOpenOrders', 'reqAllOpenOrders', 'reqManagedAccts', 'requestFA', 'reqMktData', 'reqHistoricalData', 'placeOrder', 'gw_req_subscriptions']
+clear_offsets: False
+seek_to_end:['reqAccountUpdates', 'reqPositions', 'reqMktData']
+#
+#
+subscription_manager.subscriptions.redis_key: 'subscriptions'
+subscription_manager.topics: ['reqMktData']
+#logconfig: {'filename': '/home/larry-13.04/workspace/finopt/log/tws_gateway.log', 'filemode': 'w','level': logging.INFO}
+logconfig: {'level': logging.INFO,  'filemode': 'w', 'filename':'/tmp/tws_gateway.log'}
+order_transmit: False
+reset_db_subscriptions: False
+#
+#
+ib_heartbeat.ib_port: 8496
+ib_heartbeat.appid.id: 9911
+ib_heartbeat.gateway: 'vsu-longhorn'
+ib_heartbeat.try_interval: 60
+ib_heartbeat.suppress_msg_interval: 120
+#
+#
+webconsole.host: '0.0.0.0'
+webconsole.port:5001
+webconsole.debug: True
+webconsole.auto_reload: False

+ 1 - 1
src/config/tws_gateway_prd.cfg

@@ -3,7 +3,7 @@ name: 'tws_gateway_server'
 #
 # kafka settings
 #
-bootstrap_host: 'vorsprung'
+bootstrap_host: 'vsu-longhorn'
 bootstrap_port: 9092
 #
 # redis persistence

+ 6 - 1
src/config/ws_webserver.cfg

@@ -4,6 +4,7 @@ server.socket_port: 8091
 log.screen: False
 log.error_file': '',
 log.access_file': ''
+logconfig: {'level': logging.INFO,  'filemode': 'w', 'filename':'/tmp/ws_webserver.log'}
 [/]
 tools.sessions.on : True
 tools.staticdir.root : '/home/laxaurus/workspace/finopt/src/'
@@ -15,4 +16,8 @@ tools.staticdir.tmpl : './html'
 
 [/public]
 tools.staticdir.on: True
-tools.staticdir.dir : './html/public'
+tools.staticdir.dir : './html/public'
+
+[/js]
+tools.staticdir.on: True
+tools.staticdir.dir : './html/js'

+ 3 - 2
src/finopt/instrument.py

@@ -42,8 +42,9 @@ class InstrumentIdMap():
                     9031: 'NUMC',
                     9032: 'NUMP',
                     9040: 'T_UNRPL',
-                    9041: 'TPTGAIN'
-    
+                    9041: 'TPTGAIN',
+                    9051: 'AVILFUNDS',
+                    9052: 'T_CASH'
                     
                     }
     

+ 48 - 9
src/html/client_g.html

@@ -1,8 +1,8 @@
 <html>
 <head>
-  <title>Simple client</title>
+  <title>Portfolio Monitor</title>
   
-  <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.3/jquery.min.js"></script>
+  <script src="js/ajax-libs-jquery-2.1.3/jquery.min.js"></script>
   <script type="text/javascript" src="https://www.google.com/jsapi"></script>  
 
 </head>
@@ -101,14 +101,16 @@ table.minimalistBlack tfoot td {
 				<th>PL / Potential Gain<div class="container"></th>
 				<th>HSIF Curr / Next Mth</th>
 				<th>change %</th>
+				<th>C/P Gap</th>
+				<th>Funds/Net Cash</th>
 			</tr>
 		</thead>
 		
 		<tbody>
 			<tr>
-				<td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td></tr>
+				<td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td></tr>
 			<tr>
-				<td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td></tr>
+				<td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td><td>(nan)</td></tr>
 			<tr>
 		</tbody>
 		</tr>
@@ -257,7 +259,35 @@ table.minimalistBlack tfoot td {
 
     var ws;
     //var ws_url = "ws://localhost:9001/";
-    var ws_url = "ws://"+window.location.hostname + ":9001";
+    
+    //
+    // for external access
+    // note: proxy_wstunnel must be enabled
+    // check route69.conf on vsu-jellyfish for details
+    //
+    //
+	/* 	
+	    <VirtualHost *>
+	    ServerName p3.algometic.com
+		
+		    ProxyRequests Off
+		    ProxyVia Off
+		
+		    <Proxy *>
+		        Require all granted
+		    </Proxy>
+		    ProxyPassMatch ^/(ws(/.*)?)$ ws://vsu-longhorn:9001/$1
+		    ProxyPass / http://vsu-longhorn.lan:8091/
+		    ProxyPassReverse / http://vsu-longhorn.lan:8091/
+		</VirtualHost>
+	 */
+	var ws_url = "ws://"+window.location.hostname + ":9001"; 
+    if (window.location.hostname == 'p3.algometic.com'){
+    	ws_url = "ws://p3.algometic.com/ws";	
+    } 
+	 
+    
+
     var tableChart;
 	
 	  function init() {
@@ -321,7 +351,7 @@ table.minimalistBlack tfoot td {
 			  		tableChart.chart = new google.visualization.Table(document.getElementById('table_div'));
 			  		tableChart.chart.draw(tableChart.view, tableChart.options);
 					output('number of rows: ' + tableChart.data.getNumberOfRows());
-					console.log(d1.value);
+					//console.log(d1.value);
 					
 				} else if (d1.source.class == 'PortfolioColumnChartTM'){
 					columnChart.data = new google.visualization.DataTable(d1.value);
@@ -401,7 +431,7 @@ table.minimalistBlack tfoot td {
 		document.querySelector('.overlay-marker').style.left = Math.floor(cli.getXLocation(spot) - 25) + "px";
 	}
 	  
-    function setPortSummaryTableVal(port_values){
+    function setPortSummaryTableVal(data){
     	var val2cellDict = {
     			9000: [1,0], 
     			9001: [1,1],
@@ -413,15 +443,24 @@ table.minimalistBlack tfoot td {
     			9012: [2,2],
     			9013: [2,3],
     			9032: [2,4],
-    			9041: [2,5]
+    			9041: [2,5],
+    			9051: [1,9],
+    			9052: [2,9]
     					}
     	
     	for(var port_id in val2cellDict) {
 		 	//console.log('in setPortSummaryTableVal ' + String(port_id) + ", ");
-			var num = Number(port_values[port_id]);	
+			var num = Number(data['port_values'][port_id]);
+			if (port_id == 9051 || port_id == 9052) num = num / 1000;
 			num = (isFloat(num)) ? num.toFixed(2) : num;
     		setCellValue('port_sum', val2cellDict[port_id][0], val2cellDict[port_id][1], num);
     	}
+    	var p_gap = data['closest_psc'][1] - data['closest_psc'][0];
+    	var c_gap = data['closest_psc'][2] - data['closest_psc'][1];
+    	var p_gap_percent = p_gap / data['closest_psc'][1] * 100;
+    	var c_gap_percent = c_gap / data['closest_psc'][1] * 100;
+    	setCellValue('port_sum', 2, 8, p_gap.toFixed(0).toString() + ' ( ' + p_gap_percent.toFixed(2).toString() + '%)'); 
+    	setCellValue('port_sum', 1, 8, c_gap.toFixed(0).toString() + ' ( ' + c_gap_percent.toFixed(2).toString() + '%)');    	
     }
     
 

Diff do ficheiro suprimidas por serem muito extensas
+ 1 - 0
src/html/js/ajax-libs-jquery-2.1.3/jquery.min.js


+ 33 - 0
src/html/jtest2.html

@@ -1,7 +1,17 @@
 <!DOCTYPE html>
+<head>
+  <title>Quick js test</title>
+  
+  <script src="js/ajax-libs-jquery-2.1.3/jquery.min.js"></script>
+  <script src="js/circle-progress.js"></script>  
+    
+
+</head>
 <html>
 <body>
 
+<div id="circle"></div>
+<div id="circle2"></div>
 <p id="demo">This is a p element with id="demo".</p>
 
 <p>Click the button to change the text of the p element.</p>
@@ -25,7 +35,30 @@ function myFunction() {
     var arr = JSON.parse(s2);
     document.querySelector("#demo").innerHTML = arr[0][1];
 }
+
+$('#circle').circleProgress({
+    value: 0.75,
+    size: 80,
+    fill: {
+      gradient: ["red", "orange"]
+    }
+  });
+$('#circle2').circleProgress({
+    value: 0.97,
+    size: 40,
+    fill: {
+      gradient: ["blue", "green"]
+    }
+  });
 </script>
 
+
+
+
+
+
+
+
+
 </body>
 </html>

+ 24 - 0
src/misc2/helpers.py

@@ -316,6 +316,29 @@ class HelperFunctions():
     def utf2asc(x):
         return x.encode('ascii') if isinstance(x, unicode) else x
     
+      
+    @staticmethod     
+    def nearest_min(nl, v):
+        min_last = -1
+        min_temp = 9999999
+        for n in nl:
+            if n < v:
+                min_temp = n 
+                if min_last < min_temp:
+                    min_last = min_temp
+        return min_last if min_last != -1 else None
+    
+    @staticmethod    
+    def nearest_max(nl, v):
+        max_last = 99999999 
+        max_temp = -9999999
+        for n in nl:
+            if n > v:
+                max_temp = n 
+                if max_last > max_temp:
+                    max_last = max_temp
+        return max_last if max_last != 9999 else None                
+    
 
 class ConfigMap():
     
@@ -339,6 +362,7 @@ class ConfigMap():
     
 
 
+
 class LoggerNoBaseMessagingFilter(logging.Filter):
     def filter(self, record):
         return not (record.getMessage().startswith('BaseConsumer') or 

+ 18 - 7
src/rethink/portfolio_item.py

@@ -17,7 +17,10 @@ import pandas as pd
 
 
 
+
 class PortfolioRules():
+    FULL_AVAILABLE_FUNDS = 9051    #FullExcessLiquidity
+    TOTAL_CASH_VALUE = 9052         #TotalCashValue
     rule_map = {
                 'symbol': {'HSI' : 'FUT', 'MHI' : 'FUT', 'QQQ' : 'STK'},
                 'expiry': {'HSI' : 'same_month', 'MHI': 'same_month', 'STK': 'leave_blank'},
@@ -27,7 +30,9 @@ class PortfolioRules():
                                         
                                     },
                 'exchange': {'HSI': 'HKFE', 'MHI': 'HKFE'},              
-                'interested_position_types': {'symbol': ['HSI', 'MHI'], 'instrument_type': ['OPT', 'FUT']}
+                'interested_position_types': {'symbol': ['HSI', 'MHI'], 'instrument_type': ['OPT', 'FUT']},
+                'interested_port_acct_keys': {'FullAvailableFunds': FULL_AVAILABLE_FUNDS, 
+                                              'TotalCashValue': TOTAL_CASH_VALUE}
 
                } 
     
@@ -191,7 +196,7 @@ class PortfolioItem():
                                 
                 pos_theta = 0
                 gamma_percent = 0
-		potential_gain = 0
+                potential_gain = 0
 
                 # (S - X) * pos * multiplier
                 unreal_pl = (spot_px * multiplier - self.get_average_cost() ) * qty 
@@ -260,6 +265,7 @@ class Portfolio(AbstractTableModel):
     NUM_PUTS       = 9032
     TOTAL_GAIN_LOSS = 9040
     TOTAL_POTENTIAL_GAIN = 9041
+
     
      
     
@@ -276,8 +282,8 @@ class Portfolio(AbstractTableModel):
     
     def get_strikes(self):
         strikes = []
-        for x in self.port['port_items'].iteritems():
-            strikes.append(x)
+        for k, v in self.port['port_items'].iteritems():
+            strikes.append(v.get_strike())
         
         return strikes
     
@@ -300,6 +306,7 @@ class Portfolio(AbstractTableModel):
         self.port = {}
         self.port['port_items']=  {}
         self.port['opt_chains']=  {}
+        self.port['port_v']= {}
         
         
         self.port['g_table']=  {'row_index': 0, 'ckey_to_row_index': {}, 'row_to_ckey_index': {}}
@@ -338,7 +345,6 @@ class Portfolio(AbstractTableModel):
         except:
             logging.error('PortfolioItem:calculate_item_pl *** ERROR: port a/c [%s], contract_key [%s]' % (self.get_account(), contract_key))
 
-    
         
     def calculate_port_pl(self):
 
@@ -404,8 +410,13 @@ class Portfolio(AbstractTableModel):
                 
                 
             
-        map(cal_port, p2_items)            
-        self.port['port_v'] = port_v 
+        map(cal_port, p2_items)      
+        # 2019.3
+        # use update instead of assignment as this
+        # dict has been created and 
+        # its keys may have been updated by others      
+        #self.port['port_v'] = port_v
+        self.port['port_v'].update(port_v) 
         return self.port['port_v']
     
     

+ 99 - 15
src/rethink/portfolio_monitor.py

@@ -12,7 +12,7 @@ from dateutil.relativedelta import relativedelta
 from ib.ext.Execution import Execution
 from ib.ext.ExecutionFilter import ExecutionFilter
 from finopt import optcal
-from misc2.helpers import ContractHelper, LoggerNoBaseMessagingFilter, ExecutionFilterHelper, ConfigMap 
+from misc2.helpers import ContractHelper, LoggerNoBaseMessagingFilter, ExecutionFilterHelper, ConfigMap, HelperFunctions
 from finopt.instrument import Symbol, Option, InstrumentIdMap, ExecFill
 from rethink.option_chain import OptionsChain
 from rethink.tick_datastore import TickDataStore
@@ -20,6 +20,8 @@ from rethink.portfolio_item import PortfolioItem, PortfolioRules, Portfolio, Por
 from rethink.portfolio_column_chart import PortfolioColumnChart,PortfolioColumnChartTM
 from rethink.table_model import AbstractTableModel, AbstractPortfolioTableModelListener
 from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
+from rethink.portfolio_monitor_restapi import WebConsole
+
 #from pip._internal.req.constructors import deduce_helpful_msg
 
 
@@ -39,7 +41,7 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
         self.tds = TickDataStore(kwargs['name'])
         self.tds.register_listener(self)
         self.twsc.add_listener_topics(self, kwargs['topics'])
-        
+        self.menu_loop_done = False 
         
         
         
@@ -62,10 +64,65 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
         self.trades = {}
         
     
+    
+
+    def start_web_console(self):
+        
+        def start_flask():
+            w = WebConsole(self)
+            w.add_resource()
+            w.app.run(host=self.kwargs['webconsole.host'], port=self.kwargs['webconsole.port'],
+                      debug=self.kwargs['webconsole.debug'], use_reloader=self.kwargs['webconsole.auto_reload'])
+            
+        t_webApp = threading.Thread(name='Web App', target=start_flask)
+        t_webApp.setDaemon(True)
+        t_webApp.start()    
+
+    
+    def shutdown_all(self):
+        sleep(1)
+        logging.info('shutdown_all sequence started....')
+        self.twsc.gw_message_handler.set_stop()
+        self.menu_loop_done = True
+        sys.exit(0)
+        
+
+    def post_shutdown(self):
+        th = threading.Thread(target=self.shutdown_all)
+        th.daemon = True
+        th.start()           
+    
+    
+    def reqAllAcountUpdates(self):
+        for acct in self.portfolios.keys():
+            self.twsc.reqAccountUpdates(True, acct)
+
+    def re_request_market_data(self):
+        port_contracts = {}                
+        for acct in self.portfolios.keys():
+            def re_request_mkt(port_item):
+                key = ContractHelper.makeRedisKey(port_item.get_instrument().get_contract())
+                logging.info('PortfolioMontior re_request_mkt: [%s]' % key)
+                self.twsc.reqMktData(port_item.get_instrument().get_contract(), True)
+                return key
+            port_contracts[acct] = map(re_request_mkt, self.portfolios[acct].port['port_items'].values())
+        return port_contracts
+                
     def start_engine(self):
         self.twsc.start_manager()
-        self.twsc.reqPositions()
- 
+        
+        def delay_reqpos():
+            self.twsc.reqPositions()
+            sleep(2)
+            self.twsc.reqPositions()
+            sleep(2)
+            self.reqAllAcountUpdates() 
+            
+        th = threading.Thread(target=delay_reqpos)
+        th.daemon = True
+        th.start()             
+        
+        self.start_web_console()
      
         try:
             def print_menu():
@@ -79,6 +136,7 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
                 menu['7']="Position Distribution JSON"
                 menu['8']="Update TDS table entries by inputting '8 <key> <field> <value>'"
                 menu['a']="request exeutions"
+                menu['b']="re-request market data"
                 menu['9']="Exit"
         
                 choices=menu.keys()
@@ -99,13 +157,13 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
             user_input_th = threading.Thread(target=get_user_input, args=(response,))
             user_input_th.daemon = True
             user_input_th.start()               
-            while True:
+            while not self.menu_loop_done: 
                 sleep(0.4)
                 
                 if response[0] is not None:
                     selection = response[0]
-		    if selection =='':
-			continue
+                    if selection =='':
+                        continue
                     if selection =='1':
                         self.twsc.reqPositions()
                     elif selection == '2': 
@@ -118,9 +176,8 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
                     elif selection == '3': 
                         
                         print self.tds.dump()
-                    elif selection == '4': 
-                        for acct in self.portfolios.keys():
-                            self.twsc.reqAccountUpdates(True, acct)
+                    elif selection == '4':
+                        self.reqAllAcountUpdates() 
                     elif selection == '5':
                         for port in self.portfolios.values():
                             print port.get_JSON() 
@@ -131,7 +188,7 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
                         for acct in self.portfolios.keys():
                             pc = PortfolioColumnChart(self.portfolios[acct])
                             print pc.get_JSON()
-                            print pc.get_xy_array()
+                            #print pc.get_xy_array()
                     elif selection[0] == '8':
                         try:
                             params = selection.split(' ')
@@ -151,6 +208,8 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
                         past =  today + relativedelta(months=-1)
                         exec_filter = ExecutionFilterHelper.kv2object({'m_time': past.strftime('%Y%m%d %H:%M:%S')}, ExecutionFilter)
                         self.twsc.reqExecutions(exec_filter)
+                    elif selection == 'b':
+                        self.re_request_market_data()
                     elif selection == '9': 
                         self.twsc.gw_message_handler.set_stop()
                         break
@@ -491,9 +550,13 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
                 
     def updateAccountValue(self, event, key, value, currency, account):  # key, value, currency, accountName):
         self.raw_dump(event, vars())
+        if key in PortfolioRules.rule_map['interested_port_acct_keys']:
+            logging.info('PortfolioMonitor:updateAccountValue acct:[%s] %s=>%s' % (account, key, value))
+            self.get_portfolio(account).port['port_v'][PortfolioRules.rule_map['interested_port_acct_keys'][key]] = value
+            
  
     def updatePortfolio(self, event, contract_key, position, market_price, market_value, average_cost, unrealized_PNL, realized_PNL, account):
-        self.raw_dump(event, vars())
+        #self.raw_dump(event, vars())
         if self.is_interested_contract_type(contract_key):
             
             self.process_position(account, contract_key, position, average_cost, 
@@ -501,7 +564,7 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
         
             
     def updateAccountTime(self, event, timestamp):
-        self.raw_dump(event, vars())
+        #self.raw_dump(event, vars())
         logging.info('PortfolioMonitor:updateAccountTime %s' % timestamp)
         for port in self.portfolios.values():
             port.calculate_port_pl()
@@ -588,14 +651,35 @@ class PortfolioMonitor(AbstractGatewayListener, AbstractPortfolioTableModelListe
         except:
             logging.error('PortfolioMonitor:event_request_port_summary failed to request port summary for [%s]' % account)
     
+    
+    def find_closest_pos_near_spot(self, account):
+        unpx = []
+        for oc in self.portfolios[account].get_option_chains():
+            px = self.portfolios[account].get_option_chain(oc).get_underlying().get_tick_value(Symbol.LAST)
+            if px == None:
+                px = self.portfolios[account].get_option_chain(oc).get_underlying().get_tick_value(Symbol.CLOSE)
+            if px != None:
+                unpx.append(px)
+        avg_spot = reduce(lambda x,y:x+y, unpx) / len(unpx)
+        pi = filter(lambda x: x.get_quantity() < 0, self.portfolios[account].get_portfolio_port_items().values())
+        strikes = map(lambda x: x.get_strike(), pi)
+        
+        logging.info('find_closest_pos_near_spot: avg_spot %d strikes: %s' % (avg_spot, strikes))
+        max_x = HelperFunctions.nearest_max(strikes, avg_spot)
+        min_x = HelperFunctions.nearest_min(strikes, avg_spot)
+        return [min_x, avg_spot, max_x]
+        
     def notify_port_values_updated(self, account, port):
         try:
+                
+            put_spot_call = self.find_closest_pos_near_spot(account)
+            logging.info('notify_port_values_updated: closest put call pair relative to current spot %s' % put_spot_call)
             self.get_kproducer().send_message(PortfolioMonitor.EVENT_PORT_VALUES_UPDATED,
                                                json.dumps({'account': account,
-                                                     'port_values': self.portfolios[account].get_potfolio_values()}),
+                                                     'data': {'port_values': self.portfolios[account].get_potfolio_values(), 'closest_psc': put_spot_call}}),
                                                )
             logging.info('**** PortfolioMonitor:notify_port_values_updated. %s' % json.dumps({'account': account,
-                                                     'port_values': self.portfolios[account].get_potfolio_values()}) )
+                                                     'data': {'port_values': self.portfolios[account].get_potfolio_values(), 'closest_psc': put_spot_call}}) )
         except:
             logging.error('**** Error PortfolioMonitor:notify_port_values_updated. %s' % traceback.format_exc() )
         

+ 108 - 0
src/rethink/portfolio_monitor_restapi.py

@@ -0,0 +1,108 @@
+from flask import Flask
+from flask_restful import Resource, Api, reqparse
+import json
+import threading
+import time
+import sys
+from misc2.helpers import ContractHelper
+import simplejson
+
+class WebConsole():
+
+    app = Flask(__name__)
+    api = Api(app)
+    parser = reqparse.RequestParser()
+    
+    def __init__(self, parent=None):
+        self.parent = parent
+
+    def get_parent(self):
+        return self.parent
+    
+    def add_resource(self):
+        WebConsole.api.add_resource(Commands, '/')
+        WebConsole.api.add_resource(ExitApp, '/exit', resource_class_kwargs={'pm_instance': self.parent})
+        WebConsole.api.add_resource(RequestPosition, '/reqPosition', resource_class_kwargs={'pm_instance': self.parent})
+        WebConsole.api.add_resource(RequestAccountUpdates, '/reqAccountUpdates', resource_class_kwargs={'pm_instance': self.parent})
+        WebConsole.api.add_resource(GetTDS, '/TDS', resource_class_kwargs={'pm_instance': self.parent})        
+        WebConsole.api.add_resource(GetPortfolio, '/Portfolio', resource_class_kwargs={'pm_instance': self.parent})
+        WebConsole.api.add_resource(RerequestMarketData, '/marketdata', resource_class_kwargs={'pm_instance': self.parent})        
+        
+
+class Commands(Resource):
+    def get(self):
+        return {'status': True, 'Available REST API' : {'exit': 'shutdown PM', 
+                                              'reqPosition': 'ask gateway to retrieve position from IB',
+                                              'reqAccountUpdates': 'ask gateway to retrieve account updates from IB',
+                                              'TDS': 'get TDS',
+                                              'Portfolio' : 'display portfolio positions',
+                                              'marketdata': 're-request market data'
+                                              }
+                }
+
+class ExitApp(Resource):
+    def __init__(self, pm_instance):
+        self.pm = pm_instance
+        
+    def get(self):
+        self.pm.post_shutdown()
+        return {'status': 'please check the log for exit status'}
+        
+
+class RequestPosition(Resource):
+    def __init__(self, pm_instance):
+        self.pm = pm_instance
+        
+    def get(self):
+        self.pm.twsc.reqPositions()
+        return {'status': True}
+    
+class RequestAccountUpdates(Resource):
+    def __init__(self, pm_instance):
+        self.pm = pm_instance
+        
+    def get(self):
+        self.pm.reqAllAcountUpdates()
+        return {'status': True}
+
+class RerequestMarketData(Resource):
+    def __init__(self, pm_instance):
+        self.pm = pm_instance
+        
+    def get(self):
+        return {'status': True, 'marketdata': json.loads(json.dumps(self.pm.re_request_market_data()))}
+    
+
+class GetTDS(Resource):
+    def __init__(self, pm_instance):
+        self.pm = pm_instance
+        
+    def get(self):
+        sl = map(lambda x: [ContractHelper.contract2kv(x['syms'][0].get_contract()), 
+                            x['syms'][0].get_tick_values(), x['syms'][0].get_extra_attributes()], self.pm.tds.symbols.values())
+        return json.loads(simplejson.dumps({'status': True, 'symbols': sl}, ignore_nan=True))
+
+
+class GetPortfolio(Resource):
+    def __init__(self, pm_instance):
+        self.pm = pm_instance
+        
+    def get(self):
+        try:
+            def format_port_data(x):
+                    return {'key': x[0], 'port_fields': x[1].get_port_fields(), 
+                            'tick_fields': x[1].get_instrument().get_tick_values()}
+            
+            
+            ports = {}
+            for p in self.pm.portfolios.values():
+                p.calculate_port_pl()
+                data = map(format_port_data, [x for x in p.port['port_items'].iteritems()])
+                ports[p.get_account()] = {'port_items': data, 'port_summary': p.port['port_v']}
+        except:
+            return {'status': False}
+        return json.loads(simplejson.dumps({'status': True, 'ports': ports}, ignore_nan=True))
+
+
+
+#     

+ 3 - 3
src/ws/ws_server.py

@@ -221,14 +221,14 @@ class PortfolioTableModelListener(BaseMessageListener):
             
 
     
-    def event_port_values_updated(self, event, account, port_values):
+    def event_port_values_updated(self, event, account, data):
         
-        logging.info("[%s] received %s from %s. content:[%s]" % (self.name, event, account, json.dumps(port_values)))
+        logging.info("[%s] received %s from %s. content:[%s]" % (self.name, event, account, json.dumps(data)))
         
         # broadcast to all subscribed clients
 
         self.mwss.get_server().send_message_to_all( 
-                                        json.dumps({'event': event, 'value': port_values, 'account': account})) 
+                                        json.dumps({'event': event, 'value': data, 'account': account})) 
 
 
 class MainWebSocketServer(BaseWebSocketServerWrapper, AbstractGatewayListener):

+ 16 - 3
src/ws/ws_webserver.py

@@ -4,6 +4,7 @@ import logging
 import cherrypy
 import ConfigParser
 import thread
+import requests
 
 
 class PortalServer(object):
@@ -24,7 +25,19 @@ class PortalServer(object):
         return self.ws()
     
  
- 
+    @cherrypy.expose
+    def expr(self):
+        html = '%s%s/jtest2.html' % (cherrypy.request.app.config['/']['tools.staticdir.root'], cherrypy.request.app.config['/static']['tools.staticdir.tmpl'])
+        f = open(html)
+        return f.read()
+
+
+    @cherrypy.expose
+    def rest(self):
+        url = 'http://localhost:6001/TDS'
+        response = requests.get(url)
+        return response.text
+        
     
     @cherrypy.expose
     def ws(self):
@@ -69,11 +82,11 @@ if __name__ == '__main__':
     if len(config.read(cfg_path)) == 0:      
         raise ValueError, "Failed to open config file" 
     
-    logconfig = eval(config.get("opt_serve", "opt_serve.logconfig").strip('"').strip("'"))
+    logconfig = eval(config.get("global", "logconfig").strip('"').strip("'"))
     logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
     logging.basicConfig(**logconfig)            
 
 
-    cherrypy.quickstart(PortalServer(config), '/', cfg_path[0])
+    cherrypy.quickstart(PortalServer(config, None), '/', cfg_path[0])
     
    

Alguns ficheiros não foram mostrados porque muitos ficheiros mudaram neste diff