Bläddra i källkod

starbucks thursday release

fix exception handling to trap all errors in apiv2
validation logic to check limit order with zero price
implement function to return system error logs and connection status
handle the case when ws server attempts to send data to a broken client
laxaurus 6 år sedan
förälder
incheckning
e84f79d78e

+ 6 - 6
src/comms/ibgw/subscription_manager.py

@@ -72,13 +72,13 @@ class SubscriptionManager(BaseMessageListener):
         '''
         '''
             the function retrieves a json string representation of a list of {id:contracts}
             the function retrieves a json string representation of a list of {id:contracts}
             from redis.
             from redis.
-            next, get rid of the contracts that are expired and of type of either fut or opt
-            next, rebuild the internal dict idContractMap['id_contract'] and reverse dict
+            next it gets rid of fut and opt contracts that have expired
+            next, it rebuilds the internal dict idContractMap['id_contract'] and reverse dict
             idContractMap['contract_id']
             idContractMap['contract_id']
-            gather all the ids in the newly populated dict (which may contain holes due to
-            expired contracts and thus not necessarily a sequence), determine the max id
-            add 1 to it to form the next_id
-            request snapshot and fresh market data from the TWS gateway
+            it then gathers all the ids in the newly populated dict (which may contain holes due to
+            expired contracts) and determine the max id
+            add 1 to it to get the next_id
+            request snapshot and new market data from the TWS gateway
             
             
         '''
         '''
         def is_outstanding(ic):
         def is_outstanding(ic):

+ 3 - 0
src/comms/ibgw/tws_gateway.py

@@ -222,6 +222,9 @@ class TWS_gateway():
 
 
     def get_config(self):
     def get_config(self):
         return self.kwargs
         return self.kwargs
+    
+    def get_ib_conn_status(self):
+        return self.ib_conn_status
 
 
     def connect_tws(self):
     def connect_tws(self):
         if type(self.kwargs['tws_app_id']) <> int:
         if type(self.kwargs['tws_app_id']) <> int:

+ 26 - 2
src/comms/ibgw/tws_gateway_restapi.py

@@ -1,11 +1,12 @@
 from flask import Flask, jsonify
 from flask import Flask, jsonify
 
 
 import json
 import json
+import redis
 import threading
 import threading
 from time import sleep
 from time import sleep
 from misc2.observer import Subscriber
 from misc2.observer import Subscriber
 from flask_restful import Resource, Api, reqparse
 from flask_restful import Resource, Api, reqparse
-
+from datetime import datetime
 import traceback
 import traceback
 from ormdapi.v1 import apiv1
 from ormdapi.v1 import apiv1
 from ormdapi.v2 import apiv2
 from ormdapi.v2 import apiv2
@@ -16,6 +17,8 @@ import logging
 
 
 class WebConsole(Subscriber):
 class WebConsole(Subscriber):
 
 
+    TWS_LOG_REDIS_HANDLE = 'api_log'
+    TWS_LOG_DEFAULT_RETRIEVE_SIZE = 100
     
     
     app = Flask(__name__)
     app = Flask(__name__)
     api = Api(app)
     api = Api(app)
@@ -37,6 +40,12 @@ class WebConsole(Subscriber):
         except KeyError:
         except KeyError:
             logging.error('Webconsole: fail to get access token for telegram bot. ') 
             logging.error('Webconsole: fail to get access token for telegram bot. ') 
         self.message_sink.start()
         self.message_sink.start()
+        
+        '''
+            create a dedicated rs for rest api use
+        '''
+        kwargs = self.parent.kwargs
+        self.rs = redis.Redis(kwargs['redis_host'], kwargs['redis_port'], kwargs['redis_db'])
 
 
     def get_parent(self):
     def get_parent(self):
         return self.parent
         return self.parent
@@ -61,7 +70,8 @@ class WebConsole(Subscriber):
         WebConsole.api.add_resource(apiv2.OpenOrdersStatus_v2, '/v2/open_orders', resource_class_kwargs={'webconsole': self})
         WebConsole.api.add_resource(apiv2.OpenOrdersStatus_v2, '/v2/open_orders', resource_class_kwargs={'webconsole': self})
         WebConsole.api.add_resource(apiv2.QuoteRequest_v2, '/v2/quote', resource_class_kwargs={'webconsole': self})
         WebConsole.api.add_resource(apiv2.QuoteRequest_v2, '/v2/quote', resource_class_kwargs={'webconsole': self})
         WebConsole.api.add_resource(apiv2.AcctPosition_v2, '/v2/position', resource_class_kwargs={'webconsole': self})
         WebConsole.api.add_resource(apiv2.AcctPosition_v2, '/v2/position', resource_class_kwargs={'webconsole': self})
-
+        WebConsole.api.add_resource(apiv2.SystemStatus, '/v2/system', resource_class_kwargs={'webconsole': self})
+        
 
 
     def set_stop(self):
     def set_stop(self):
         self.message_sink.set_stop()
         self.message_sink.set_stop()
@@ -80,3 +90,17 @@ class WebConsole(Subscriber):
             print ('webconsole override %s: %s %s %s' % (self.name, event, "<empty param>" if not param else param,
             print ('webconsole override %s: %s %s %s' % (self.name, event, "<empty param>" if not param else param,
                                           
                                           
                                          '<none>' if not param else param.__class__))
                                          '<none>' if not param else param.__class__))
+            '''
+                insert log into redis using lpush 
+                last-in-first-out 
+            '''
+            param.update({'ts':datetime.today().strftime('%Y%m%d %H:%M:%S')})
+            self.rs.lpush(WebConsole.TWS_LOG_REDIS_HANDLE, param)
+            
+    def retrieve_logs(self, num_lines=None):
+        num_lines = num_lines if num_lines <> None else WebConsole.TWS_LOG_DEFAULT_RETRIEVE_SIZE
+        len = min(self.rs.llen(WebConsole.TWS_LOG_REDIS_HANDLE), num_lines)
+        return self.rs.lrange(WebConsole.TWS_LOG_REDIS_HANDLE, 0, len - 1)
+         
+        
+            

+ 2 - 0
src/misc2/helpers.py

@@ -55,6 +55,8 @@ class OrderHelper(BaseHelper):
         except ValueError:
         except ValueError:
             raise OrderValidationException("price or quantity must be a numeric value")
             raise OrderValidationException("price or quantity must be a numeric value")
         
         
+        if price == 0:
+            raise OrderValidationException("price must not be zero")
         return True
         return True
         
         
         
         

+ 0 - 5
src/ormdapi/v2/api_utilities.py

@@ -2,12 +2,7 @@
 # -*- coding: utf-8 -*-
 # -*- coding: utf-8 -*-
 import logging
 import logging
 from misc2.observer import Publisher, Subscriber
 from misc2.observer import Publisher, Subscriber
-import sys, traceback
 import json
 import json
-from time import sleep
-from misc2.helpers import ContractHelper
-from misc2.observer import Publisher
-from ib.ext.Contract import Contract
 from Queue import Queue
 from Queue import Queue
 import threading
 import threading
 from datetime import datetime
 from datetime import datetime

+ 37 - 1
src/ormdapi/v2/apiv2.py

@@ -239,8 +239,10 @@ class SyncOrderCRUD_v2(Resource):
         
         
         except OrderValidationException as e:
         except OrderValidationException as e:
             return {'error': e.args[0]}, 409
             return {'error': e.args[0]}, 409
-        except ValueError:
+        except:
             return {'error': 'check the format of the order message! %s' % traceback.format_exc()}, 409
             return {'error': 'check the format of the order message! %s' % traceback.format_exc()}, 409
+        
+            
     
     
     def get(self):
     def get(self):
         pass
         pass
@@ -383,5 +385,39 @@ class AcctPosition_v2(Resource, Publisher):
         except:
         except:
             
             
             return {'error': 'AcctPosition_v2: %s' % traceback.format_exc()}, 409
             return {'error': 'AcctPosition_v2: %s' % traceback.format_exc()}, 409
+   
+   
+   
+class SystemStatus(Resource):        
+
+    def __init__(self, webconsole):
+        self.wc = webconsole
+        
+        
+
+    def get(self):
+        parser = reqparse.RequestParser()
+        parser.add_argument('last_error', required=False)
+        parser.add_argument('num_lines', required=False)
+        args = parser.parse_args()
+        try:        
+            if args['last_error'] != None:
+                try:
+                    nl = int(args['num_lines'])
+                except:
+                    nl = None
+                return self.wc.retrieve_logs(nl), 200
+                
+            else:
+                '''
+                    return connectivity status
+                '''
+                return {'TWS connection status: [%s]': 'Connected' if self.wc.get_parent().get_ib_conn_status() else 'Disconnected. Wait for retry...'}, 200
+
+        except:
+            
+            return {'error': 'SystemStatus: %s' % traceback.format_exc()}, 404
+        
+        return self.wc.retrieve_logs(), 200
         
         
         
         

+ 6 - 0
src/ws/ws_server.py

@@ -206,6 +206,12 @@ class PortfolioTableModelListener(BaseMessageListener):
         except KeyError:
         except KeyError:
             self.simple_caching[row] = {'count': 1, 'ts': curr_ts, 'row_values': row_values}
             self.simple_caching[row] = {'count': 1, 'ts': curr_ts, 'row_values': row_values}
             notify_client()    
             notify_client()    
+        except:
+            '''
+                trap cases when notify_client fails, probably due to the other side
+                closed the connection but the server still thinks that the connection still alives
+            '''
+            logging.error('PortfolioTableModelListener: %s' % traceback.format_exc())
     
     
     def event_tm_table_structure_changed(self, event, source, origin_request_id, account, data_table_json):
     def event_tm_table_structure_changed(self, event, source, origin_request_id, account, data_table_json):
         try:
         try: