Sfoglia il codice sorgente

streaming callbacks

add support for API order/quote websocket streaming
laxaurus 6 anni fa
parent
commit
a6f2e40599

+ 8 - 0
src/comms/ibgw/subscription_manager.py

@@ -62,6 +62,14 @@ class SubscriptionManager(BaseMessageListener):
 
         
         raise
+    
+    # this routine is used by REST_API quote_handler ONLY
+    def get_contract_key_by_id(self, id):
+        try:
+            return self.idContractMap['id_contract'][id]
+        except:
+            return None
+        
             
     def reset_subscriptions(self, reset_db):
         if reset_db:

+ 10 - 3
src/comms/ibgw/tws_gateway.py

@@ -9,8 +9,7 @@ import json
 
 from ib.ext.Contract import Contract
 from ib.ext.EClientSocket import EClientSocket
-
-from misc2.helpers import ContractHelper, ConfigMap
+from misc2.helpers import ContractHelper, ConfigMap, LoggerNoBaseMessagingFilter  
 from optparse import OptionParser
 from comms.ibgw.base_messaging import Prosumer
 from comms.ibgw.tws_event_handler import TWS_event_handler
@@ -23,6 +22,7 @@ from comms.ibgw.order_manager import OrderManager
 from ormdapi.v2.quote_handler import QuoteRESTHandler
 from ormdapi.v2.position_handler import AccountPositionTracker
 from ormdapi.v2.contract_handler import ContractHandler
+from ormdapi.v2.ws.ws_api_server import ApiSocketServer
 import redis
 import threading
 from threading import Lock
@@ -166,6 +166,10 @@ class TWS_gateway():
         self.quote_manager = QuoteRESTHandler('quote_manager', self)
         self.pos_manager = AccountPositionTracker('acctpos_manager', self)
         self.contract_info_manager = ContractHandler('contract_info_mgr', self)
+        self.ws_manager = ApiSocketServer('api_ws_manager', self, self.kwargs['restapi.web_socket_host'], self.kwargs['restapi.web_socket_port'])
+        t = threading.Thread(name='websocket server', target=self.ws_manager.run_forever)
+        t.setDaemon(True)
+        t.start()
         
     def initialize_redis(self):
 
@@ -222,6 +226,9 @@ class TWS_gateway():
     def get_contract_info_manager(self):
         return self.contract_info_manager
     
+    def get_ws_manager(self):
+        return self.ws_manager
+    
     def get_redis_conn(self):
         return self.rs
 
@@ -387,7 +394,7 @@ if __name__ == '__main__':
     logconfig = kwargs['logconfig']
     logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
     logging.basicConfig(**logconfig)        
-
+    logging.getLogger().addFilter(LoggerNoBaseMessagingFilter())
     logging.info('config settings: %s' % kwargs)
     
     app = TWS_gateway(kwargs)

+ 1 - 1
src/config/pm.cfg

@@ -4,7 +4,7 @@ bootstrap_host: 'localhost'
 bootstrap_port: 9092
 redis_host: 'localhost'
 redis_port: 8379
-redis_db: 2
+redis_db: 1
 tws_host: 'localhost'
 group_id: 'PM'
 session_timeout_ms: 10000

+ 7 - 5
src/config/tws_gateway.cfg

@@ -13,15 +13,16 @@ redis_port: 8379
 #
 # DIFFERENT REDIS DB for PRODUCTION == 1 ELSE 0!!!
 redis_db: 2
+#redis_db: 1
 #
 # TWS gateway settings
 #
 # paper trade - 7497
 #
-#tws_host: 'localhost'
-#tws_api_port: 7497
-tws_host: 'vsu-longhorn'
-tws_api_port: 8496
+tws_host: 'localhost'
+tws_api_port: 7497
+#tws_host: 'vsu-longhorn'
+#tws_api_port: 8496
 tws_app_id: 6567
 #
 #
@@ -56,5 +57,6 @@ webconsole.auto_reload: False
 #
 restapi.list_label: 'api_log'
 #restapi.telegram_tok: '870564010:AAFxQa7-WFe2JjTP3KLoyY77NW90smTrnig'
-
+restapi.web_socket_host: 'localhost'
+restapi.web_socket_port: 9005
 

+ 2 - 2
src/config/ws.cfg

@@ -1,10 +1,10 @@
 [webSocketServer]
 name: 'WebSocketServer'
-bootstrap_host: 'vorsprung'
+bootstrap_host: 'localhost'
 bootstrap_port: 9092
 redis_host: 'localhost'
 redis_port: 6379
-redis_db: 0
+redis_db: 1
 group_id: 'WS'
 session_timeout_ms: 10000
 clear_offsets:  False

+ 143 - 143
src/ormdapi/v2/api_utilities.py

@@ -124,149 +124,149 @@ class TelegramApiMessageAlert(BaseApiMessageSubscribe):
         self.send_message('[%s%s] %s' % (ts, source, message))
 
 
-from websocket_server import WebsocketServer
-from ws.ws_server import BaseWebSocketServerWrapper
-
-class APIWebSocketServer(BaseWebSocketServerWrapper, Subscriber):
-    
-    def __init__(self, name, kwargs):
-        
-        BaseWebSocketServerWrapper.__init__(self, name, kwargs)
-        self.clients = {}
-
-    def loop_forever(self):
-        pass
-        
-    def encode_message(self, event_type, content):        
-        return json.dumps({'event': event_type, 'value': content})
-            
-    def new_client(self, client, server):
-        BaseWebSocketServerWrapper.new_client(self, client, server)
-        self.clients[client['id']] = client
-        self.clients[client['id']]['request'] = {}
-        
-        
-        server.send_message(client, '%s' % client)
-    
-    # Called for every client disconnecting
-    def client_left(self, client, server):
-        BaseWebSocketServerWrapper.client_left(self, client, server)
-        del self.clients[client['id']]
-        
-   
-    # Called when a client sends a message1
-    def message_received(self, client, server, message):
-        BaseWebSocketServerWrapper.message_received(self, client, server, message)
-        print '%s %s %s' % (client, server, message)
-        
-
-    def handle_tws_event(self, **param):
-        print "APIWebSocketServer received tws events forwarded by gw"
-            
-    def update(self, event, **param): 
-        if event == 'orderStatus':
-            self.handle_tws_event(**param)
-        elif event == 'openOrder':
-            self.handle_tws_event(**param)
-        elif event == 'openOrderEnd':
-            self.handle_tws_event(**param)
-        elif event == 'error':
-            try:
-                id = param['id']
-                if id <> -1:
-                    self.handle_event(**param)
-            except:
-                logging.error('OrderBook ERROR: in processing tws error event %s' % param)
-                return
-            
-            
-
-import websocket
-import thread, time
-from threading import Thread
-class APIClient():
- 
-    def __init__(self):
-         
-        self.ws = websocket.WebSocketApp("ws://localhost:9001",
-                                    on_message = self.on_message,
-                                    on_error = self.on_error,
-                                    on_close = self.on_close,
-                                    on_open = self.on_open)
-        #self.ws.on_open = self.on_open
-        self.stop = False
-        
-
-
-    def run_forever(self):
-        self.ws.run_forever()
-         
-    def on_message(self, message):
-        print 'apiclient recv %s' % message
-     
-    def on_error(self, error):
-        print error
-     
-    def on_close(self):
-        print "### closed ###"
-     
-     
-    def set_stop(self):
-        self.stop = True
-        
-    
-    def request_push_updates(self, request_type=None, fn_name=None ):
-        self.ws.send(json.dumps({'request_type': request_type, 'fn_name': fn_name}))
-
-    def on_open(self): #, ws):
-        
-        
-        def run(*args):
-            while not self.stop:
-                time.sleep(1)
-            self.ws.close()
-            print "thread terminating..."
-        thread.start_new_thread(run, ())
-
-
-
-def order_status():
-    pass
-        
-if __name__ == '__main__':
-
-    
-
-    
-    logging.basicConfig(**{'level': logging.INFO,  'filemode': 'w', 'filename':'/tmp/api.log'}) 
-    logging.info('started...')
-
-
-    aws = APIWebSocketServer('aws', {'ws_port': 9001})
-     
-    def run_background():
-        aws.server.run_forever()
-             
-    t = Thread(target=run_background)        
-    t.start()    
-     
-    ac = APIClient()
-    def run_apiclient():
-        ac.run_forever()
-        
-    s = Thread(target=run_apiclient)        
-    s.start()    
-    
-    
-    f_info = {'order_status': order_status}
-    ac.request_push_updates('order_status', 'order_status')
+# from websocket_server import WebsocketServer
+# from ws.ws_server import BaseWebSocketServerWrapper
+# 
+# class APIWebSocketServer(BaseWebSocketServerWrapper, Subscriber):
+#     
+#     def __init__(self, name, kwargs):
+#         
+#         BaseWebSocketServerWrapper.__init__(self, name, kwargs)
+#         self.clients = {}
+# 
+#     def loop_forever(self):
+#         pass
+#         
+#     def encode_message(self, event_type, content):        
+#         return json.dumps({'event': event_type, 'value': content})
+#             
+#     def new_client(self, client, server):
+#         BaseWebSocketServerWrapper.new_client(self, client, server)
+#         self.clients[client['id']] = client
+#         self.clients[client['id']]['request'] = {}
+#         
+#         
+#         server.send_message(client, '%s' % client)
+#     
+#     # Called for every client disconnecting
+#     def client_left(self, client, server):
+#         BaseWebSocketServerWrapper.client_left(self, client, server)
+#         del self.clients[client['id']]
+#         
+#    
+#     # Called when a client sends a message1
+#     def message_received(self, client, server, message):
+#         BaseWebSocketServerWrapper.message_received(self, client, server, message)
+#         print '%s %s %s' % (client, server, message)
+#         
+# 
+#     def handle_tws_event(self, **param):
+#         print "APIWebSocketServer received tws events forwarded by gw"
+#             
+#     def update(self, event, **param): 
+#         if event == 'orderStatus':
+#             self.handle_tws_event(**param)
+#         elif event == 'openOrder':
+#             self.handle_tws_event(**param)
+#         elif event == 'openOrderEnd':
+#             self.handle_tws_event(**param)
+#         elif event == 'error':
+#             try:
+#                 id = param['id']
+#                 if id <> -1:
+#                     self.handle_event(**param)
+#             except:
+#                 logging.error('OrderBook ERROR: in processing tws error event %s' % param)
+#                 return
+#             
+#             
+# 
+# import websocket
+# import thread, time
+# from threading import Thread
+# class APIClient():
+#  
+#     def __init__(self):
+#          
+#         self.ws = websocket.WebSocketApp("ws://localhost:9001",
+#                                     on_message = self.on_message,
+#                                     on_error = self.on_error,
+#                                     on_close = self.on_close,
+#                                     on_open = self.on_open)
+#         #self.ws.on_open = self.on_open
+#         self.stop = False
+#         
+# 
+# 
+#     def run_forever(self):
+#         self.ws.run_forever()
+#          
+#     def on_message(self, message):
+#         print 'apiclient recv %s' % message
+#      
+#     def on_error(self, error):
+#         print error
+#      
+#     def on_close(self):
+#         print "### closed ###"
+#      
+#      
+#     def set_stop(self):
+#         self.stop = True
+#         
+#     
+#     def request_push_updates(self, request_type=None, fn_name=None ):
+#         self.ws.send(json.dumps({'request_type': request_type, 'fn_name': fn_name}))
+# 
+#     def on_open(self): #, ws):
+#         
+#         
+#         def run(*args):
+#             while not self.stop:
+#                 time.sleep(1)
+#             self.ws.close()
+#             print "thread terminating..."
+#         thread.start_new_thread(run, ())
+# 
+# 
+# 
+# def order_status():
+#     pass
+#         
+# if __name__ == '__main__':
+# 
+#     
+# 
+#     
+#     logging.basicConfig(**{'level': logging.INFO,  'filemode': 'w', 'filename':'/tmp/api.log'}) 
+#     logging.info('started...')
+# 
+# 
+#     aws = APIWebSocketServer('aws', {'ws_port': 9001})
+#      
+#     def run_background():
+#         aws.server.run_forever()
+#              
+#     t = Thread(target=run_background)        
+#     t.start()    
+#      
+#     ac = APIClient()
+#     def run_apiclient():
+#         ac.run_forever()
+#         
+#     s = Thread(target=run_apiclient)        
+#     s.start()    
 #     
 #     
-#     websocket.enableTrace(True)
-#     ws = websocket.WebSocketApp("ws://localhost:9001",
-#                               on_message = on_message,
-#                               on_error = on_error,
-#                               on_close = on_close)
-#     ws.on_open = on_open
-#     ws.run_forever()           
+#     f_info = {'order_status': order_status}
+#     ac.request_push_updates('order_status', 'order_status')
+# #     
+# #     
+# #     websocket.enableTrace(True)
+# #     ws = websocket.WebSocketApp("ws://localhost:9001",
+# #                               on_message = on_message,
+# #                               on_error = on_error,
+# #                               on_close = on_close)
+# #     ws.on_open = on_open
+# #     ws.run_forever()           
 #     

+ 65 - 21
src/ormdapi/v2/apiv2.py

@@ -4,7 +4,8 @@ from misc2.observer import Publisher
 from finopt.instrument import Symbol
 from time import sleep
 from ormdapi.v2.position_handler import AccountSummaryTags
-import uuid
+from ormdapi.v2.ws.ws_api_server import ApiSocketServer
+import uuid, logging
 import traceback
 import json
 from threading import RLock    
@@ -263,13 +264,26 @@ class SyncOrderCRUD_v2(Resource):
         parser = reqparse.RequestParser()
         parser.add_argument('contract', required=True, help="contract is required.")
         parser.add_argument('order_condition', required=True, help="order_condition is required.")
+        parser.add_argument('live_update', required=False, help="obtain live udpates via the websocket")
         args = parser.parse_args()
         js_contract = args.get('contract')
-        contract = v2_helper.format_v2_str_to_contract(js_contract)
+        try:
+            contract = v2_helper.format_v2_str_to_contract(js_contract)
+        except:
+            return {'error': 'check the format of the contract string! %s' % traceback.format_exc()}, 409
         js_order_cond = args.get('order_condition')
         clordid = str(uuid.uuid4())
 
         self.wc.get_api_sink().add_message('/order', 'SyncOrderCRUD_v2:post', 'received new order %s condition: %s' % (js_contract, js_order_cond))
+        logging.info('SyncOrderCRUD_v2:post received new order %s condition: %s' % (js_contract, js_order_cond))
+        '''
+           to do FIX error handling above ^^^ 
+        '''
+        if args['live_update']:
+            handle_id = args['live_update'].strip('"').strip("'")
+            result = self.wc.get_parent().get_ws_manager().register_request(handle_id, ApiSocketServer.RS_ORDER_STATUS)
+            
+
         
         done = False
         iom = self.wc.get_parent().get_order_id_manager()
@@ -285,7 +299,7 @@ class SyncOrderCRUD_v2(Resource):
             order = v2_helper.format_v2_str_to_order(js_order_cond)
             OrderHelper.order_validation_ex(order)
             self.gw_conn.placeOrder(id['next_valid_id'], contract, order)
-            return {'order id': id['next_valid_id']}, 201
+            return {'order_id': id['next_valid_id']}, 201
         
         except OrderValidationException as e:
             return {'error': e.args[0]}, 409
@@ -326,14 +340,16 @@ class QuoteRequest_v2(Resource, Publisher):
         self.wc = webconsole
         self.contract_mgr = self.wc.get_parent().get_subscription_manager()
         self.quote_mgr = self.wc.get_parent().get_quote_manager()
-        self.event = 'reqMktData'
-        Publisher.__init__(self, [self.event])
-        self.register(self.event, self.contract_mgr, callback=getattr(self.contract_mgr, self.event))
+        self.e_mkt_data = 'reqMktData'
+        Publisher.__init__(self, [self.e_mkt_data])
+        self.register(self.e_mkt_data, self.contract_mgr, callback=getattr(self.contract_mgr, self.e_mkt_data))
+    
     
     def get(self):
         parser = reqparse.RequestParser()
         parser.add_argument('contract', required=True, help="contract is required.")
         parser.add_argument('greeks', required=False, help="obtain option greeks")
+        parser.add_argument('live_update', required=False, help="obtain live udpates via the websocket")
         args = parser.parse_args()
 
         '''
@@ -382,27 +398,55 @@ class QuoteRequest_v2(Resource, Publisher):
                     require_greeks = True
         except:
             pass
-                                    
+        
+        if args['live_update']:
+            handle_id = args['live_update'].strip('"').strip("'")
+            result = self.wc.get_parent().get_ws_manager().register_request(handle_id, ApiSocketServer.RS_QUOTE)
+        '''
+           to do FIX error handling above ^^^ 
+        '''
+        
+                          
         sym = self.quote_mgr.get_symbol(contract)
         if sym:
-            return output_result(sym, require_greeks), 200
+            try:
+                return output_result(sym, require_greeks), 200
+            except:
+                return 'invalid request! The supplied contract is found in the database but its definition could be invalid and rejected by the server!', 404
                     
         else:
-            print ContractHelper.contract2kvstring(contract)
-            self.dispatch(self.event, {'contract': ContractHelper.contract2kvstring(contract), 'snapshot': False})
+            logging.info('QuoteRequest_v2: contract %s' %  ContractHelper.contract2kvstring(contract))
+            self.dispatch(self.e_mkt_data, {'contract': ContractHelper.contract2kvstring(contract), 'snapshot': False})
             i = 0
-            while 1:
-                sym =  self.quote_mgr.get_symbol(contract)
-                if sym:
-                    break
-                sleep(0.1)
-                i += 0.5 
-                if i >= 15:
-                    return 'Not getting any quotes from the server after waited 5 seconds! Contact administrator', 404
-                
-            return output_result(sym, require_greeks), 200
-        
+#             while 1:
+#                 sym =  self.quote_mgr.get_symbol(contract)
+#                 if sym:
+#                     break
+#                 sleep(0.1)
+#                 i += 0.5 
+#                 if i >= 15:
+#                     return 'Not getting any quotes from the server after waited 5 seconds! Contact administrator', 404
+#                 
+#             return output_result(sym, require_greeks), 200
         
+            
+            done = False       
+            try:
+                while not done:
+                    sleep(0.1)
+                    i += 0.5 
+                    if i >= 15:
+                        return 'Not getting any quotes  from the server after waited 10 seconds! Contact administrator', 404
+                    sym =  self.quote_mgr.get_symbol(contract)
+                    if sym <> None:
+                        try:
+                            _ = sym.get_contract()
+                            return output_result(sym, require_greeks), 200
+                        except:
+                            return sym, 409
+                    
+            except:
+                return {'error': 'check the format of the contract message! %s' % traceback.format_exc()}, 409         
         
 
 '''

+ 12 - 2
src/ormdapi/v2/quote_handler.py

@@ -20,6 +20,7 @@ class QuoteRESTHandler(Subscriber):
         self.name = name
         self.gw_parent = gw_parent
         self.tws_event_handler = gw_parent.get_tws_event_handler()
+        self.sub_mgr = gw_parent.get_subscription_manager()
         
         Subscriber.__init__(self, self.name)
         
@@ -28,7 +29,7 @@ class QuoteRESTHandler(Subscriber):
              this class
              
         '''
-        for e in ['tickPrice', 'tickSize', 'tickOptionComputation']:
+        for e in ['tickPrice', 'tickSize', 'tickOptionComputation', 'error']:
             self.tws_event_handler.register(e, self)           
         
 
@@ -73,7 +74,16 @@ class QuoteRESTHandler(Subscriber):
             self.handle_ticksize(**param)
         elif event == 'tickOptionComputation':
             self.handle_tickgreeks(**param)
-        
+        elif event == 'error':            
+            try:
+                
+                if param['id'] >= 1000 and param['id'] < 1999:
+                    ckey = self.sub_mgr.get_contract_key_by_id(param['id']-1000)
+                    self.symbols[ckey] = {'error': param['errorMsg']}
+                    
+            except:
+                pass
+            
     
     def get_symbol(self, contract):
         try:

+ 0 - 0
src/ormdapi/v2/ws/__init__.py


+ 0 - 0
src/ormdapi/v2/ws/client/__init__.py


+ 116 - 0
src/ormdapi/v2/ws/client/ws_api_client.py

@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import websocket
+import threading
+import time, json, traceback, logging, requests
+
+class WsClient(websocket.WebSocketApp, threading.Thread):
+    
+    def __init__(self, parent, url):
+        threading.Thread.__init__(self)  
+        websocket.WebSocketApp.__init__(self,    
+                              url = url,                           
+                              on_message = self.on_message,
+                              on_error = self.on_error,
+                              on_close = self.on_close,
+                              on_open = self.on_open)
+        self.quit = False
+        self.connected = False
+        self.parent = parent
+        self.assigned_handle = None
+        
+    def encode_message(self, stuff):
+        return json.dumps({'msg': stuff})
+    
+    def on_message(self, message):
+        logging.info("WsClient:on_message received a message from the server %s" % message)
+        try:
+            e = json.loads(message)
+            if e['event'] == RestStream.RS_ACCEPTED:
+                self.assigned_handle = e['msg']['handle']
+            self.parent._dispatch(e['event'], e['msg'])
+                
+        except:
+            logging.error('WsClient: on_message %s' % traceback.format_exc())
+    
+    def request_updates(self, event_name):
+        self.send(json.dumps({'event': 'request', 'param': event_name}))
+    
+    def on_error(self, error):
+        logging.info(error)
+    
+    def on_close(self):
+        logging.info("WsClient:on_close ### closed ###")
+
+    
+    def on_open(self):
+        logging.info('WsClient:on_open connection opened')
+        self.connected = True
+        
+    def is_connected(self):
+        return self.connected
+    
+
+    
+    def shutdown(self):
+        self.quit = True
+        self.close()
+        logging.info("WsClient:shutdown thread terminating...")
+                
+
+class RestStream():
+    RS_ORDER_STATUS = 'rs_order_status'
+    RS_QUOTE = 'rs_quote'
+    RS_ACCEPTED = 'rs_accepted'
+    EVENTS = [RS_ORDER_STATUS, RS_QUOTE, RS_ACCEPTED]
+    
+    def __init__(self, host, port):
+        self.url = 'ws://%s:%d' % (host, port)
+        self.events = { event : []
+                          for event in RestStream.EVENTS }
+        
+    def connect(self):
+        try:
+            self.wsc = WsClient(self, self.url)
+            t = threading.Thread(name='wsc', target=self.wsc.run_forever)
+            t.setDaemon(True)
+            t.start()         
+            i = 0
+            time.sleep(0.5)
+            while not self.wsc.is_connected():
+                i+=1
+                time.sleep(0.5)
+                if i > 5:
+                    raise Exception('RestStream: connect timeout.')
+            
+            return self.get_assigned_handle()
+        except:
+            logging.error('RestStream:connect %s' % traceback.format_exc())
+            raise Exception('RestStream: connect timeout.')
+         
+    
+    def _get_subscribers(self, event):
+        return self.events[event]
+
+    def _dispatch(self, event, params=None):
+        for callback in self._get_subscribers(event):
+            callback(event, **params)
+    
+    def register(self, event, callback):
+        self._get_subscribers(event).append(callback)
+        self.wsc.request_updates(event)
+    
+    
+    def unregister(self, event, callback):
+        if callback in self._get_subscribers(event): 
+            self._get_subscribers.remove(callback)
+        
+    def get_assigned_handle(self):
+        if self.wsc:
+            return self.wsc.assigned_handle    
+        return None
+    
+    
+    def disconnect(self):
+        self.wsc.shutdown()
+           

+ 79 - 0
src/ormdapi/v2/ws/client/ws_api_test.py

@@ -0,0 +1,79 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from ormdapi.v2.ws.client.ws_api_client import RestStream
+import logging, requests, time, json
+
+
+
+
+def order_status(event, **order_status_dict):
+    print order_status_dict
+    
+def price_quote(event, **quote_dict):
+    print quote_dict    
+
+def accepted(event, handle):
+    print 'connection established %s %s' % (event, handle)
+
+'''
+    test api
+'''
+def test_quote(rest_server, handle):
+    url = 'http://%s/v2/quote?contract={"currency": "USD", "symbol": "EUR", "sec_type": "CASH", "exchange": "IDEALPRO"}&live_update=%s' % (rest_server, handle)
+    print url
+    r = requests.get(url)
+    print r.content
+#         url = 'http://localhost:5001/v2/quote?contract={"currency": "HKD", "symbol": "HSI", "sec_type": "FUT", "exchange": "HKFE", "expiry":"20190627"}&live_update=%s' % handle
+    
+def test_new_order(rest_server, handle):
+    url = 'http://%s/v2/order?contract={"right": "C", "exchange": "HKFE", "symbol": "HSI", "expiry": "20190627", "currency": "HKD", "sec_type": "OPT", "strike": 30400}&order_condition={"account": "U9050568", "order_type": "LMT", "what_if": "False", "price":150,"side": "SELL", "quantity": "1"}&live_update=%s' % (rest_server, handle)
+    print url
+    r = requests.post(url)
+    id = None
+    if r.status_code == 201:
+        id = json.loads(r.content)['order_id']
+        print 'order created. order_id %d' % id        
+    if id:
+        print 'next we are going to cancel this order...%d' % id
+        time.sleep(2)
+        test_cancel_order(rest_server, id)
+
+def test_cancel_order(rest_server, order_id):
+    url = 'http://%s/v2/order?id=%d' % (rest_server, order_id)
+    r = requests.delete(url)
+    print r.content
+    
+    
+if __name__ == "__main__":
+    logging.basicConfig(
+    format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+    level=logging.INFO
+    )
+    
+    rest_server = 'localhost:5001' # ormd.vortifytech.com
+    ws_server = 'localhost'
+    rs = RestStream(ws_server, 9005)
+    try:
+        
+        '''
+            establish a websocket connection
+            set up the callbacks 
+        '''
+        handle = rs.connect()
+        rs.register(RestStream.RS_ORDER_STATUS, order_status)
+        rs.register(RestStream.RS_QUOTE, price_quote)
+        rs.register(RestStream.RS_ACCEPTED, accepted)    
+        
+        test_quote(rest_server, handle)
+        test_new_order(rest_server, handle)
+
+        # run a few seconds and quit 
+        time.sleep(5)
+        
+    except KeyboardInterrupt:
+        logging.warn('Received CTRL-C...terminating')
+    finally:
+        rs.disconnect()
+        
+    logging.info('exiting main()')   
+        

+ 188 - 0
src/ormdapi/v2/ws/ws_api_server.py

@@ -0,0 +1,188 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from websocket_server import WebsocketServer
+import threading, logging, time, traceback
+import json
+from misc2.observer import Subscriber
+from misc2.helpers import ContractHelper
+from uuid import uuid4
+from threading import RLock    
+
+# https://github.com/Pithikos/python-websocket-server
+class ApiSocketServer(WebsocketServer, threading.Thread, Subscriber):
+    RS_ORDER_STATUS = 'rs_order_status'
+    RS_QUOTE = 'rs_quote'
+    RS_ACCEPTED = 'rs_accepted'
+    EVENTS = [RS_ORDER_STATUS, RS_QUOTE, RS_ACCEPTED] 
+    
+    def __init__(self, name, gw_parent, host, port):
+        threading.Thread.__init__(self, name=name)
+        WebsocketServer.__init__(self, port, host)
+        self.set_fn_new_client(self.new_client)
+        self.set_fn_client_left(self.client_left)
+        self.set_fn_message_received(self.message_received)
+
+        self.gw_parent = gw_parent
+        self.tws_event_handler = gw_parent.get_tws_event_handler()     
+        
+        '''
+            stores the mapping between each uuid handle and a web socket client id
+            
+        '''   
+        self.handle_client_map= {}
+        '''
+            stores the mapping between request message type and a web socket client
+        '''
+        self.cli_requests = {}
+        
+        Subscriber.__init__(self, self.name)
+        for e in ['error', 'orderStatus', 'tickPrice', 'tickSize',
+                          'tickOptionComputation']:
+            self.tws_event_handler.register(e, self)
+        self.lock = RLock()
+            
+
+        
+        
+    def encode_message(self, event, stuff):
+        return json.dumps({'event': event, 'msg': stuff})
+
+            
+    def new_client(self, client, server):
+        logging.info("New client connected and was given id %d" % client['id'])
+        #self.send_message_to_all("Hey all, a new client has joined us")
+        handle_id = str(uuid4())
+        self.handle_client_map[handle_id] = client
+        logging.info('ApiSocketServer: new_client handle->%s client->%s' % (handle_id, client))
+        self.send_message(client, self.encode_message('rs_accepted', {'handle': handle_id}))
+        
+    
+    # Called for every client disconnecting
+    def client_left(self, client, server):
+        logging.info("Client(%d) disconnected" % client['id'])
+#         for h, c in self.handle_client_map.iteritems():
+#             if c == client:
+#                 self.handle_client_map.pop(h)
+#                 break
+        for m in self.cli_requests:
+            try:
+                self.cli_requests[m].remove(client)
+            except ValueError:
+                continue
+                
+                
+                    
+    # Called when a client sends a message
+    def message_received(self, client, server, message):
+        logging.info("Client(%d) said: %s" % (client['id'], message))
+        ed = json.loads(message)
+#         if ed['event'] == 'request':
+#             self.handle_client_request(client, ed['param'])
+            
+    '''
+        this function is called by any rest API class that wants
+        streaming data via the websocket. The parameter handle_id
+        is returned to the client the first time it connects to the server 
+    '''     
+    def register_request(self, handle_id, msg_type):
+        
+        
+        def _register_request(handle_id, msg_type):
+            try:
+                
+    
+                if msg_type not in ApiSocketServer.EVENTS:
+                    logging.error('ApiSocketServer:handle_client_request invalid client request type %s!' % msg_type)
+                    #return {'error': 'ApiSocketServer:handle_client_request invalid client request type %s!' % msg_type}
+                _ = self.cli_requests[msg_type]
+                
+            except KeyError:
+                self.cli_requests[msg_type] = []
+                
+            try:
+                c = self.handle_client_map[handle_id]
+                if c not in self.cli_requests[msg_type]:
+                    self.cli_requests[msg_type].append(c)
+            except KeyError:
+                logging.error('ApiSocketServer:invalid handle %s!' % handle_id)
+                #return {'error': 'ApiSocketServer:invalid handle %s!' % handle_id}
+                
+ 
+        try:
+            self.lock.acquire()            
+            _register_request(handle_id, msg_type)
+        except:
+            logging.error('ApiSocketServer: register_request %s' % traceback.format_exc())
+        finally:            
+            self.lock.release()        
+               
+
+        
+         
+    '''
+        API handle quote stream
+    '''   
+    def handle_tickprice(self, contract_key, field, price, canAutoExecute):
+        logging.debug('ApiSocketServer:tickPrice')
+        try:
+            for c in self.cli_requests[ApiSocketServer.RS_QUOTE]:
+                self.send_message(c, self.encode_message(ApiSocketServer.RS_QUOTE,
+                                                               {'tick_info': 'tick_price',
+                                                                'contract_key': contract_key,
+                                                                'field': field,
+                                                                'price': price}
+                                                               ))
+        except:
+            '''
+                callbacks are being fired by tws_event_handler even when
+                the clients are long gone
+                we want to skip processing these callbacks
+            '''
+            pass
+        
+    def handle_ticksize(self, contract_key, field, size):
+        logging.debug('ApiSocketServer:ticksize')
+        #c = ContractHelper.makeContractfromRedisKeyEx(contract_key)
+        try:
+            for c in self.cli_requests[ApiSocketServer.RS_QUOTE]:
+                self.send_message(c, self.encode_message(ApiSocketServer.RS_QUOTE,
+                                                               {'tick_info': 'tick_size',
+                                                                'contract_key': contract_key,
+                                                                'field': field,
+                                                                'size': size}
+                                                               ))
+        except:
+            pass
+
+    def handle_tickgreeks(self, **params):
+        logging.debug('ApiSocketServer:tickOptionComputation')
+        pass
+    
+    def handle_orderstatus(self, orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeld):
+        logging.debug('ApiSocketServer:handle_orderstatus')
+        try:
+            for c in self.cli_requests[ApiSocketServer.RS_ORDER_STATUS]:
+                self.send_message(c, self.encode_message(ApiSocketServer.RS_ORDER_STATUS,
+                                            {'order_id':orderId, 'status': status, 'filled':filled, 
+                                             'remaining':remaining, 'avg_fill_price':avgFillPrice, 
+                                             'perm_id':permId,
+                                             #'parent_id':parentId, 
+                                             'last_fill_price':lastFillPrice, 
+                                             #'client_id':clientId, 
+                                             'why_held':whyHeld}))        
+        except:
+            pass
+        
+    def update(self, event, **param): 
+        if event == 'tickPrice':
+            self.handle_tickprice(**param)
+        elif event == 'tickSize':
+            self.handle_ticksize(**param)
+        elif event == 'tickOptionComputation':
+            self.handle_tickgreeks(**param)
+        elif event == 'orderStatus':
+            self.handle_orderstatus(**param)
+'''
+a simplified version of this program for testing can be found under ws/tests/client2.py and server2.py
+'''
+    

+ 39 - 22
src/ws/server2.py

@@ -1,36 +1,52 @@
 from websocket_server import WebsocketServer
 import threading, logging, time, traceback
 import json
+
 # https://github.com/Pithikos/python-websocket-server
 
 
-class WebSocketServerWrapper(threading.Thread):
-    def __init__(self, name):
+class WebSocketServerWrapper(WebsocketServer, threading.Thread):
+ 
+    
+    
+    def __init__(self, name, host, port):
         threading.Thread.__init__(self, name=name)
-        self.clients = {}
+        WebsocketServer.__init__(self, port, host)
+        self.set_fn_new_client(self.new_client)
+        self.set_fn_client_left(self.client_left)
+        self.set_fn_message_received(self.message_received)
+        self.start()
+        self.cli_requests = {}
         
-    def set_server(self, server):
-        self.server = server
-        
-    def run(self):   
-        print 'started...'
+    def run(self):
+           
+        self.i = 0
+        def gen_msg(x):
+            self.i += 1
+            if self.i % 2:
+                return self.encode_message('rs_order_status', {'text': 'order stat to %s: %s' % (x['id'], time.ctime())})
+            else:
+                return self.encode_message('rs_quote', {'text': 'quote px to %s: %s' % (x['id'], time.ctime())})
+            
         while 1:
             time.sleep(1.5)
             #print 'sending stuff.. %s' % str(list(self.clients.iteritems()))
-            map(lambda x: self.server.send_message(x[1], 'msg to %d: %s' % (x[0], time.ctime())), list(self.clients.iteritems()))
+            map(lambda x: self.send_message(x, gen_msg(x)), self.clients)
             
             
+    def encode_message(self, event, stuff):
+        return json.dumps({'event': event, 'msg': stuff})
 
             
     def new_client(self, client, server):
         print("New client connected and was given id %d" % client['id'])
-        self.clients[client['id']] = client
-        server.send_message_to_all("Hey all, a new client has joined us")
+        #self.send_message_to_all("Hey all, a new client has joined us")
+        self.send_message(client, self.encode_message('rs_accepted', {'handle': {'id': client['id'], 'address': client['address']}}))
     
     
     # Called for every client disconnecting
     def client_left(self, client, server):
-        del self.clients[client['id']]
+        print client.keys()
         print("Client(%d) disconnected" % client['id'])
     
     
@@ -39,18 +55,19 @@ class WebSocketServerWrapper(threading.Thread):
         if len(message) > 200:
             message = message[:200]+'..'
         print("Client(%d) said: %s" % (client['id'], message))
-    
+        ed = json.loads(message)
+        if ed('event') == 'request':
+            self.handle_client_request(client, ed)
+            
+            
+    #def handle_client_request(self, client, ):
+
+
+# test this program with RestStream client found under ws/tests/client2.py
 
 def main():
-    wsw = WebSocketServerWrapper('hello')    
-    wsw.start()
-    PORT=9001
-    server = WebsocketServer(PORT)
-    wsw.set_server(server)
-    server.set_fn_new_client(wsw.new_client)
-    server.set_fn_client_left(wsw.client_left)
-    server.set_fn_message_received(wsw.message_received)
-    server.run_forever()
+    wsw = WebSocketServerWrapper('hello', 'localhost', 9001)    
+    wsw.run_forever()
     
     
 if __name__ == "__main__":

+ 162 - 0
src/ws/tests/client2.py

@@ -0,0 +1,162 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import websocket
+import threading
+import time, json, traceback, logging, requests
+
+class WsClient(websocket.WebSocketApp, threading.Thread):
+    
+    def __init__(self, parent, url):
+        threading.Thread.__init__(self)  
+        websocket.WebSocketApp.__init__(self,    
+                              url = url,                           
+                              on_message = self.on_message,
+                              on_error = self.on_error,
+                              on_close = self.on_close,
+                              on_open = self.on_open)
+        self.quit = False
+        self.connected = False
+        self.parent = parent
+        self.assigned_handle = None
+        
+    def encode_message(self, stuff):
+        return json.dumps({'msg': stuff})
+    
+    def on_message(self, message):
+        logging.info("WsClient:on_message received a message from the server %s" % message)
+        try:
+            e = json.loads(message)
+            if e['event'] == RestStream.RS_ACCEPTED:
+                self.assigned_handle = e['msg']['handle']
+            self.parent._dispatch(e['event'], e['msg'])
+                
+        except:
+            logging.error('WsClient: on_message %s' % traceback.format_exc())
+    
+    def request_updates(self, event_name):
+        self.send(json.dumps({'event': 'request', 'param': event_name}))
+    
+    def on_error(self, error):
+        logging.info(error)
+    
+    def on_close(self):
+        logging.info("WsClient:on_close ### closed ###")
+
+    
+    def on_open(self):
+        logging.info('WsClient:on_open connection opened')
+        self.connected = True
+        
+    def is_connected(self):
+        return self.connected
+    
+
+    
+    def shutdown(self):
+        self.quit = True
+        self.close()
+        logging.info("WsClient:shutdown thread terminating...")
+                
+
+class RestStream():
+    RS_ORDER_STATUS = 'rs_order_status'
+    RS_QUOTE = 'rs_quote'
+    RS_ACCEPTED = 'rs_accepted'
+    EVENTS = [RS_ORDER_STATUS, RS_QUOTE, RS_ACCEPTED]
+    
+    def __init__(self, name, host, port):
+        self.url = 'ws://%s:%d' % (host, port)
+        self.events = { event : []
+                          for event in RestStream.EVENTS }
+        
+    def connect(self):
+        try:
+            self.wsc = WsClient(self, self.url)
+            t = threading.Thread(name='wsc', target=self.wsc.run_forever)
+            t.setDaemon(True)
+            t.start()         
+            i = 0
+            time.sleep(0.5)
+            while not self.wsc.is_connected():
+                i+=1
+                time.sleep(0.5)
+                if i > 5:
+                    raise Exception('RestStream: connect timeout.')
+            
+            return self.get_assigned_handle()
+        except:
+            logging.error('RestStream:connect %s' % traceback.format_exc())
+            raise Exception('RestStream: connect timeout.')
+         
+    
+    def _get_subscribers(self, event):
+        return self.events[event]
+
+    def _dispatch(self, event, params=None):
+        for callback in self._get_subscribers(event):
+            callback(event, **params)
+    
+    def register(self, event, callback):
+        self._get_subscribers(event).append(callback)
+        self.wsc.request_updates(event)
+    
+    
+    def unregister(self, event, callback):
+        if callback in self._get_subscribers(event): 
+            self._get_subscribers.remove(callback)
+        
+    def get_assigned_handle(self):
+        if self.wsc:
+            return self.wsc.assigned_handle    
+        return None
+    
+    
+    def disconnect(self):
+        self.wsc.shutdown()
+           
+''''''''''''''''''''''''''''''''''''''''''''''''''
+''''''''''''''''''''''''''''''''''''''''''''''''''
+
+def order_status(event, **order_status_dict):
+    print order_status_dict
+    
+def price_quote(event, **quote_dict):
+    print quote_dict    
+
+def accepted(event, handle):
+    print '%s %s' % (event, handle)
+
+if __name__ == "__main__":
+    logging.basicConfig(
+    format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+    level=logging.INFO
+    )
+    
+    rs = RestStream('rs', 'localhost', 9001)
+    try:
+        
+        '''
+            establish a websocket connection
+            set up the callbacks 
+        '''
+        handle = rs.connect()
+        rs.register(RestStream.RS_ORDER_STATUS, order_status)
+        rs.register(RestStream.RS_QUOTE, price_quote)
+        rs.register(RestStream.RS_ACCEPTED, accepted)    
+        
+        '''
+            request for quote updates
+        '''
+        url = 'http://localhost:5001/v2/quote?contract={"currency": "JPY", "symbol": "USD", "sec_type": "CASH", "exchange": "IDEALPRO"}&live_update=%s' % handle
+        r = requests.get(url)
+        print r.content
+                
+        
+        time.sleep(10)
+    except KeyboardInterrupt:
+        logging.warn('Received CTRL-C...terminating')
+    finally:
+        rs.disconnect()
+        
+    logging.info('exiting main()')   
+