Sfoglia il codice sorgente

api functions upgrade

add sink utilities to save user actions
migrate ib_mds to work on newer version of kakfa
add telegram alert
laxaurus 6 anni fa
parent
commit
701da3b51c

+ 26 - 12
src/cep/ib_mds.py

@@ -17,8 +17,9 @@ from optparse import OptionParser
 #from options_data import ContractHelper
 
 import finopt.options_data as options_data
-from kafka.client import KafkaClient
-from kafka.producer import SimpleProducer
+#from kafka.client import KafkaClient
+#from kafka.producer import SimpleProducer
+from kafka import KafkaConsumer, KafkaProducer
 from comms.alert_bot import AlertHelper
 
 ## 
@@ -53,7 +54,7 @@ class IbKafkaProducer():
 #         port = int(self.config.get("ib_mds", "ib_mds.ib_port"))
 #         appid = int(self.config.get("ib_mds", "ib_mds.appid.id"))  
         kafka_host = self.config.get("cep", "kafka.host").strip('"').strip("'")
-        
+        kafka_port =  self.config.get("cep", "kafka.port").strip('"').strip("'")
         self.persist['is_persist'] = self.config.get("ib_mds", "ib_mds.is_persist")
         self.persist['persist_dir'] =self.config.get("ib_mds", "ib_mds.persist_dir").strip('"').strip("'")
         self.persist['file_exist'] = False
@@ -73,9 +74,21 @@ class IbKafkaProducer():
         logging.info('******* Starting IbKafkaProducer')
         logging.info('IbKafkaProducer: connecting to kafka host: %s...' % kafka_host)
         logging.info('IbKafkaProducer: message mode is async')
+
+
+        kwargs = {
+          'name': 'ib_mds',
+          'bootstrap_host': kafka_host,
+          'bootstrap_port': kafka_port,
+          'group_id': 'mds',
+          'session_timeout_ms': 10000,
+          'clear_offsets':  False,
+          'order_transmit': False
+          }
+
         
-        client = KafkaClient(kafka_host)
-        self.producer = SimpleProducer(client, async=False)
+        #client = KafkaClient(**kwargs)
+        self.producer = KafkaProducer(bootstrap_servers='%s:%s' % (kwargs['bootstrap_host'], kwargs['bootstrap_port']))
         
         if not replay:
             self.start_ib_connection()
@@ -109,14 +122,15 @@ class IbKafkaProducer():
                 msg =  "%s,%d,%d,%s,%s" % (datetime.datetime.now().strftime('%Y%m%d%H%M%S'), tick_id ,\
                                                                 4, v['price'], c_name)
                 print msg
-                self.producer.send_messages('my.price', msg.encode('utf-8'))
+                self.producer.send('my.price', msg.encode('utf-8'))
                 tick_id+=1
 
             sleep(sec)
         
         
-    def add_contract(self, tuple):
-        print tuple
+    def add_contract(self, clist):
+        
+        tuple = map(lambda x: x if x not in [''] else None, clist) 
         c = options_data.ContractHelper.makeContract(tuple)
         self.con.reqMktData(self.id2contract['conId'], c, '', False)
         self.id2contract['id2contracts'][self.id2contract['conId']] = options_data.ContractHelper.makeRedisKeyEx(c)
@@ -181,7 +195,7 @@ class IbKafkaProducer():
             logging.debug(t)   
             if self.toggle:
                 print t
-            self.producer.send_messages(IbKafkaProducer.IB_TICK_PRICE if msg.typeName == 'tickPrice' else IbKafkaProducer.IB_TICK_SIZE, t) 
+            self.producer.send(IbKafkaProducer.IB_TICK_PRICE if msg.typeName == 'tickPrice' else IbKafkaProducer.IB_TICK_SIZE, t) 
             if self.persist['is_persist']:
                 self.write_message_to_file(t)
                 
@@ -240,9 +254,9 @@ class IbKafkaProducer():
         logging.info('load_tickers: attempt to open file %s' % path)
         fr = open(path)
         for l in fr.readlines():
-            if l[0] <> '#':
+            if l[0] not in ['#', '', ' ']:
                  
-                self.add_contract(tuple([t for t in l.strip('\n').split(',')]))
+                self.add_contract([t for t in l.strip('\n').split(',')])
             
 
     def do_work(self):
@@ -286,7 +300,7 @@ class IbKafkaProducer():
                 interval = (msg_ts - (last_record_ts if last_record_ts <> None else msg_ts)).microseconds / 1000000.0
                 
                 print '%s %s %s' % (msg_ts.strftime('%Y-%m-%d %H:%M:%S.%f'), s_msg, fn)
-                self.producer.send_messages(IbKafkaProducer.IB_TICK_PRICE if msg['typeName'] == 'tickPrice' else IbKafkaProducer.IB_TICK_SIZE, s_msg)
+                self.producer.send(IbKafkaProducer.IB_TICK_PRICE if msg['typeName'] == 'tickPrice' else IbKafkaProducer.IB_TICK_SIZE, s_msg)
                  
                 last_record_ts = msg_ts
                 sleep(interval)

+ 6 - 2
src/comms/ibgw/order_manager.py

@@ -178,7 +178,7 @@ class OrderBook(Subscriber):
         try:
             _ = self.orders[orderId]
         except KeyError:
-            self.orders[orderId]= {'ord_status':{}, 'error':{}, 'order':{}, 'contract': {}}
+            self.orders[orderId]= {'ord_status':{}, 'error':{}, 'order':{}, 'contract': {}, 'state':{}}
              
         self.orders[orderId]['ord_status']['status'] = status
         self.orders[orderId]['ord_status']['filled'] = filled
@@ -199,12 +199,16 @@ class OrderBook(Subscriber):
         self.orders[id]['error'] = {'errorCode': errorCode, 'errorMsg': errorMsg}
         
     def handle_open_order(self, orderId, state, order, contract):
+        
+        
+        
         try:
             _ = self.orders[orderId]
         except KeyError:
-            self.orders[orderId]= {'ord_status':{}, 'error':{}, 'order':{}, 'contract': {}} 
+            self.orders[orderId]= {'ord_status':{}, 'error':{}, 'order':{}, 'contract': {}, 'state':{}} 
         self.orders[orderId]['order'] = order
         self.orders[orderId]['contract'] = contract
+        self.orders[orderId]['state'] = state
         
     def update(self, event, **param): 
         if event == 'orderStatus':

+ 9 - 5
src/comms/ibgw/tws_gateway.py

@@ -156,11 +156,11 @@ class TWS_gateway():
 #         self.order_id_mgr = OrderIdManager(self.tws_connection)
 #         self.tws_event_handler.set_order_id_manager(self.order_id_mgr)
 #         self.order_id_mgr.start()
-          self.order_manager = OrderManager('order_manager', self, self.kwargs)
-          self.order_manager.start_order_manager()
-          
-          
-          self.quote_manager = QuoteRESTHandler('quote_manager', self)
+        self.order_manager = OrderManager('order_manager', self, self.kwargs)
+        self.order_manager.start_order_manager()
+        
+        
+        self.quote_manager = QuoteRESTHandler('quote_manager', self)
         
     def initialize_redis(self):
 
@@ -214,6 +214,9 @@ class TWS_gateway():
     def get_redis_conn(self):
         return self.rs
 
+    def get_config(self):
+        return self.kwargs
+
     def connect_tws(self):
         if type(self.kwargs['tws_app_id']) <> int:
             logging.error('TWS_gateway:connect_tws. tws_app_id must be of int type but detected %s!' % str(type(kwargs['tws_app_id'])))
@@ -273,6 +276,7 @@ class TWS_gateway():
         self.ibh.shutdown()
         self.menu_loop_done = True
         self.get_order_id_manager().set_stop()
+        
         sys.exit(0)
         
 

+ 22 - 3
src/comms/ibgw/tws_gateway_restapi.py

@@ -9,7 +9,8 @@ from flask_restful import Resource, Api, reqparse
 import traceback
 from ormdapi.v1 import apiv1
 from ormdapi.v2 import apiv2
-
+from ormdapi.v2.api_utilities import ApiMessagePersistence, ApiMessageSink, TelegramApiMessageAlert
+import logging
 
 
 
@@ -24,10 +25,25 @@ class WebConsole(Subscriber):
         Subscriber.__init__(self, 'WebConsole' )
         self.parent = parent
         self.id_message = {}
+        '''
+            message sink is a message queue that stores any event that the api classes wanted to log down
+            the sink broadcast any received message to interested subscribers: message_store and telegram bot
+            message store is a persistor that 
+        '''
+        self.message_sink = ApiMessageSink(self.parent.get_config())
+        message_store = ApiMessagePersistence(self.parent.get_redis_conn(), self.parent.get_config(), self.message_sink)
+        try:
+            tg_bot = TelegramApiMessageAlert(parent.kwargs['restapi.telegram_tok'], self.message_sink) 
+        except KeyError:
+            logging.error('Webconsole: fail to get access token for telegram bot. ') 
+        self.message_sink.start()
 
     def get_parent(self):
         return self.parent
     
+    def get_api_sink(self):
+        return self.message_sink
+    
     def add_resource(self):
         WebConsole.api.add_resource(apiv1.Commands, '/v1')
         WebConsole.api.add_resource(apiv1.ExitApp, '/v1/exit', resource_class_kwargs={'webconsole': self})
@@ -46,10 +62,13 @@ class WebConsole(Subscriber):
         WebConsole.api.add_resource(apiv2.QuoteRequest_v2, '/v2/quote', resource_class_kwargs={'webconsole': self})
 
 
-
+    def set_stop(self):
+        self.message_sink.set_stop()
+        logging.info('WebConsole: setting message sink stop flag to true')
         
     def post_shutdown(self):
-        self.parent.post_shutdown() 
+        self.parent.post_shutdown()
+         
     '''
         implement the consumer interface
         this function gets all tws events

+ 18 - 14
src/config/mds.cfg

@@ -1,12 +1,15 @@
 [redis]
 redis.server: "localhost"
-redis.port: 6379
-redis.db: 3
+#
+# vortify on fpydevs 
+#
+redis.port: 8379
+redis.db: 2
 redis.sleep: 0.5
 
 
 [cep]
-kafka.host: 'vsu-01'
+kafka.host: 'localhost'
 kafka.port: 9092
 kafka.ib.topic.tick_price: 'ib_tick_price'
 kafka.ib.topic.tick_size: 'ib_tick_size'
@@ -24,23 +27,24 @@ msg_bot.logconfig: "{'level': logging.INFO}"
 
 
 [ib_mds]
-ib_mds.logconfig: "{'filename': '/home/larry-13.04/workspace/finopt/log/ib_mds.log', 'filemode': 'w','level': logging.INFO}"
-ib_mds.ib_port: 7496
+ib_mds.logconfig: "{'filename': '/tmp/mds.log', 'filemode': 'w','level': logging.INFO}"
+#
+# 7497 vortify fpydev envt using paper account
+#
+ib_mds.ib_port: 7497
 #ib_mds.ib_port: 4001
 ib_mds.appid.id: 9800
-#ib_mds.gateway: 'localhost'
-ib_mds.gateway: '192.168.1.118'
+ib_mds.gateway: 'localhost'
 ib_mds.is_persist: 1
-ib_mds.persist_dir: '/home/larry-13.04/workspace/finopt/data/mds_files'
-ib_mds.subscription.fileloc: '/home/larry-13.04/workspace/finopt/data/subscription-hsio.txt'
+ib_mds.persist_dir: '/home/laxaurus/workspace/fpydevs/dat/mds_files'
+ib_mds.subscription.fileloc: '/home/laxaurus/workspace/fpydevs/dat/mds_files/instruments.txt'
 ib_mds.spill_over_limit: 10000
 
 [ib_heartbeat]
-ib_heartbeat.logconfig: "{'filename': '/home/larry-13.04/workspace/finopt/log/ib_mds.log', 'filemode': 'w','level': logging.INFO}"
+ib_heartbeat.logconfig: "{'filename': '/tmp/mds.log', 'filemode': 'w','level': logging.INFO}"
 #ib_heartbeat.ib_port: 4001
-ib_heartbeat.ib_port: 7496
-ib_heartbeat.appid.id: 9911
-#ib_heartbeat.gateway: 'localhost'
-ib_heartbeat.gateway: '192.168.1.118'
+ib_heartbeat.ib_port: 7497
+ib_heartbeat.appid.id: 9801
+ib_heartbeat.gateway: 'localhost'
 ib_heartbeat.try_interval: 90
 ib_heartbeat.suppress_msg_interval: 60

+ 7 - 1
src/config/tws_gateway.cfg

@@ -49,4 +49,10 @@ ib_heartbeat.suppress_msg_interval: 120
 webconsole.host: '0.0.0.0'
 webconsole.port:5001
 webconsole.debug: True
-webconsole.auto_reload: False
+webconsole.auto_reload: False
+#
+#
+restapi.list_label: 'api_log'
+restapi.telegram_tok: '870564010:AAFxQa7-WFe2JjTP3KLoyY77NW90smTrnig'
+
+

+ 2 - 1
src/misc2/helpers.py

@@ -219,7 +219,7 @@ class ContractHelper(BaseHelper):
         newContract.m_expiry = contractTuple[4]
         newContract.m_strike = contractTuple[5]
         newContract.m_right = contractTuple[6]
-        logging.debug( 'Contract Values:%s,%s,%s,%s,%s,%s,%s:' % contractTuple)
+        #logging.debug( 'Contract Values:%s,%s,%s,%s,%s,%s,%s:' % contractTuple)
         return newContract
     
     @staticmethod
@@ -357,6 +357,7 @@ class ContractHelper(BaseHelper):
         '''
         if contract.m_right == None or contract.m_right == '0' or contract.m_right == 0:
             contract.m_right = ''
+            contract.m_strike = 0
             
         if contract.m_expiry == None:
             contract.m_expiry = ''

+ 29 - 0
src/ormdapi/test/api_test.py

@@ -0,0 +1,29 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import logging
+import json
+import requests
+
+
+
+def api_test(url, params, jdata=None):
+    headers = ''
+    r = requests.post(url, params, jdata)
+    print r.content
+
+if __name__ == '__main__':
+
+    kwargs = {
+                'logconfig': {'level': logging.INFO},
+                'mode': 'sync',
+                
+                
+              }
+    
+    url = 'http://localhost:5001/v2/order'
+    
+    params = {'contract': json.dumps({"right": "", "exchange": "HKFE", "symbol": "HSI", "expiry": "20190530", "currency": "HKD", "sec_type": "FUT", "strike": 0}),
+              'order_condition': json.dumps({"account": "U5550568", "order_type": "LMT", "price": 29200, "side": "BUY", "quantity": 1})}
+    data = {}
+    api_test(url, params, data)
+    

+ 8 - 1
src/ormdapi/test/order_generator.py

@@ -132,6 +132,11 @@ def read_dat_v2(path, mode, url, version_digit ):
 
 
 
+def api_test3():
+    headers = ''
+    data = ''
+    r = requests.post(url, data=json.dumps(data), headers=headers)
+
 def api_post(url):
 
     try:
@@ -158,8 +163,10 @@ if __name__ == '__main__':
     kwargs = {
                 'logconfig': {'level': logging.INFO},
                 'mode': 'sync',
-                'url': 'http://localhost:5001/v%s/order?contract=%s&order_condition=%s'
+                'url': 'http://ormd.vortifytech.com/v%s/order?contract=%s&order_condition=%s'
+                
               }
+    #'url': 'http://localhost:5001/v%s/order?contract=%s&order_condition=%s'
     
     usage = "usage: %prog [options]"
     parser = OptionParser(usage=usage)

+ 9 - 7
src/ormdapi/test/orders.dat

@@ -8,10 +8,10 @@
 # each line:
 # C:(contract tuple)|O:order instruction
 
-C:('HSI','FUT','HKFE','HKD','20190429',0,'')|O:'LMT','U9050568','BUY',1,29200
-C:('MHI','FUT','HKFE','HKD','20190429',0,'')|O:'LMT','U9050568','BUY',1,29000
-C:('HSI','OPT','HKFE','HKD','20190429',28200,'P')|O:'LMT','U9050568','BUY',1,2
-C:('HSI','OPT','HKFE','HKD','20190429',29200,'P')|O:'LMT','U9050568','BUY',3,2
+C:('HSI','FUT','HKFE','HKD','20190530',0,'')|O:'LMT','U9050568','BUY',1,29200
+C:('MHI','FUT','HKFE','HKD','20190530',0,'')|O:'LMT','U9050568','BUY',1,29000
+C:('HSI','OPT','HKFE','HKD','20190530',28200,'P')|O:'LMT','U9050568','BUY',1,2
+C:('HSI','OPT','HKFE','HKD','20190530',29200,'P')|O:'LMT','U9050568','BUY',3,2
 #
 # 
 C:('GBP','CASH','IDEALPRO','USD')|O:'MKT','DU1460682', 'BUY', 100000, 1.2986
@@ -19,9 +19,11 @@ C:('GBP','CASH','IDEALPRO','USD')|O:'LMT','DU1460682', 'BUY', 100000, 1.1986
 C:('USD','CASH','IDEALPRO','JPY')|O:'LMT','DU1460682', 'SELL', 100000, 119.55
 C:('BA','STK','SMART','USD')|O:'LMT','DU1460682', 'SELL', 100, 380.5
 C:('TSLA','STK','SMART','USD')|O:'MKT','DU1460682', 'SELL', 100, 0
-C:('TSLA', 'OPT', 'SMART', 'USD', '20190426', 270.0, 'C')|O:'MKT','DU1460682', 'SELL', 8, 0 
+C:('TSLA', 'OPT', 'SMART', 'USD', '20190510', 270.0, 'C')|O:'MKT','DU1460682', 'SELL', 8, 0 
+C:('TSLA', 'OPT', 'SMART', 'USD', '20190510', 235.0, 'C')|O:'LMT','DU1460682', 'SELL', 8, 4.65
+C:('TSLA', 'OPT', 'SMART', 'USD', '20190510', 238.0, 'C')|O:'LMT','DU1460682', 'BUY', 8, 3.5
 #
 # invalid stock code
 # 
-C:('BAF55','STK','SMART','USD')|O:'LMT','DU1460682', 'SELL', 100, 380.5
-C:('USD.JJJ','CASH','IDEALPRO','JPY')|O:'LMT','DU1460682', 'SELL', 100000, 119.55
+#C:('BAF55','STK','SMART','USD')|O:'LMT','DU1460682', 'SELL', 100, 380.5
+#C:('USD.JJJ','CASH','IDEALPRO','JPY')|O:'LMT','DU1460682', 'SELL', 100000, 119.55

+ 277 - 0
src/ormdapi/v2/api_utilities.py

@@ -0,0 +1,277 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import logging
+from misc2.observer import Publisher, Subscriber
+import sys, traceback
+import json
+from time import sleep
+from misc2.helpers import ContractHelper
+from misc2.observer import Publisher
+from ib.ext.Contract import Contract
+from Queue import Queue
+import threading
+from datetime import datetime
+from py4j.tests.java_gateway_test import sleep
+from redis import Redis
+
+
+
+class ApiMessageSinkException(Exception):
+    pass
+
+
+class ApiMessageSink(threading.Thread, Publisher):
+
+
+    '''
+        the API message sink is designed to receive messages that are being
+        forwarded to here from rest API classes 
+        Example of messages that are sent to the sink include order requests 
+        received by the REST API classes from external clients.
+        Messages received are stored in an internal queue and then
+        get broadcasted to any other class that wants to consume these messages
+        
+        ApiMessagePersistence is a subscriber of these events with a purpose
+        to store the events permanently in the redis store
+        
+        TelegramApiMessageAlert is another subscriber that forwards messages
+        to the telegram bot so any one that has access to the bot can get
+        real time alerts of orders placed by a client
+    '''
+    ON_API_MESSAGE = 'on_api_message'
+    API_MESSAGES = [ON_API_MESSAGE] 
+
+    def __init__(self, kwargs):
+        threading.Thread.__init__(self)   
+        Publisher.__init__(self, ApiMessageSink.API_MESSAGES) 
+        self.msg_q = Queue()
+        self.stop = False
+        self.kwargs = kwargs
+        
+        
+    def add_message(self, event, source, message):
+        
+        now= datetime.today().strftime('%Y%m%d %H:%M:%S')
+        self.msg_q.put({'event': event, 'source': source, 'message': message, 'ts': now})
+        self.msg_q.task_done()
+
+    def set_stop(self):
+        self.stop= True
+        logging.info('ApiMessageSink: stopping....')
+    
+    def run(self):
+        while not self.stop:
+            if not self.msg_q.empty():
+                
+                m = self.msg_q.get()
+                del(m['event'])
+                self.dispatch('on_api_message', m)
+                
+
+                 
+            sleep(0.5)
+
+
+class BaseApiMessageSubscribe(Subscriber):
+    def __init__(self, api_sink):
+        self.api_sink = api_sink
+        for e in ApiMessageSink.API_MESSAGES:
+            self.api_sink.register(e, self, self.on_api_message)
+
+    def on_api_message(self, event, source, message, ts):
+        raise ApiMessageSinkException('Override this function in the inherited class!')
+
+
+class ApiMessagePersistence(BaseApiMessageSubscribe):
+    
+    def __init__(self, rs, kwargs, api_sink):
+        BaseApiMessageSubscribe.__init__(self, api_sink)
+        self.kwargs = kwargs
+        self.rs = rs
+        try:
+            self.list_label = '%d-%s' % (self.kwargs['tws_app_id'], self.kwargs['restapi.list_label'])
+        except KeyError:
+            self.list_label = '%d-%s' % (self.kwargs['tws_app_id'], 'api_log')
+         
+    
+    def on_api_message(self, event, source, message, ts):
+    #def on_api_message(self, **vars):
+        self.rs.rpush(self.list_label, [event, source, message, ts])
+
+from telegram.ext import Updater
+from telegram.ext import CommandHandler
+
+class TelegramApiMessageAlert(BaseApiMessageSubscribe):
+
+    def __init__(self, access_token, api_sink):
+        BaseApiMessageSubscribe.__init__(self, api_sink)
+        updater = Updater(token=access_token)
+        dispatcher = updater.dispatcher
+        start_handler = CommandHandler('start', self.start)
+        dispatcher.add_handler(start_handler)
+        updater.start_polling()
+        self.bot = None
+    
+    def start(self, bot, update):
+        bot.send_message(chat_id=update.message.chat_id, text="ordm alert bot ready.")
+        self.bot = bot
+        self.chat_id = update.message.chat_id
+        
+
+    def send_message(self, message):
+        print message
+        if self.bot:
+            self.bot.send_message(chat_id=self.chat_id, text=message)
+  
+
+
+    def on_api_message(self, event, source, message, ts):
+        self.send_message('[%s%s] %s' % (ts, source, message))
+
+
+from websocket_server import WebsocketServer
+from ws.ws_server import BaseWebSocketServerWrapper
+
+class APIWebSocketServer(BaseWebSocketServerWrapper, Subscriber):
+    
+    def __init__(self, name, kwargs):
+        
+        BaseWebSocketServerWrapper.__init__(self, name, kwargs)
+        self.clients = {}
+
+    def loop_forever(self):
+        pass
+        
+    def encode_message(self, event_type, content):        
+        return json.dumps({'event': event_type, 'value': content})
+            
+    def new_client(self, client, server):
+        BaseWebSocketServerWrapper.new_client(self, client, server)
+        self.clients[client['id']] = client
+        self.clients[client['id']]['request'] = {}
+        
+        
+        server.send_message(client, '%s' % client)
+    
+    # Called for every client disconnecting
+    def client_left(self, client, server):
+        BaseWebSocketServerWrapper.client_left(self, client, server)
+        del self.clients[client['id']]
+        
+   
+    # Called when a client sends a message1
+    def message_received(self, client, server, message):
+        BaseWebSocketServerWrapper.message_received(self, client, server, message)
+        print '%s %s %s' % (client, server, message)
+        
+
+    def handle_tws_event(self, **param):
+        print "APIWebSocketServer received tws events forwarded by gw"
+            
+    def update(self, event, **param): 
+        if event == 'orderStatus':
+            self.handle_tws_event(**param)
+        elif event == 'openOrder':
+            self.handle_tws_event(**param)
+        elif event == 'openOrderEnd':
+            self.handle_tws_event(**param)
+        elif event == 'error':
+            try:
+                id = param['id']
+                if id <> -1:
+                    self.handle_event(**param)
+            except:
+                logging.error('OrderBook ERROR: in processing tws error event %s' % param)
+                return
+            
+            
+
+import websocket
+import thread, time
+from threading import Thread
+class APIClient():
+ 
+    def __init__(self):
+         
+        self.ws = websocket.WebSocketApp("ws://localhost:9001",
+                                    on_message = self.on_message,
+                                    on_error = self.on_error,
+                                    on_close = self.on_close,
+                                    on_open = self.on_open)
+        #self.ws.on_open = self.on_open
+        self.stop = False
+        
+
+
+    def run_forever(self):
+        self.ws.run_forever()
+         
+    def on_message(self, message):
+        print 'apiclient recv %s' % message
+     
+    def on_error(self, error):
+        print error
+     
+    def on_close(self):
+        print "### closed ###"
+     
+     
+    def set_stop(self):
+        self.stop = True
+        
+    
+    def request_push_updates(self, request_type=None, fn_name=None ):
+        self.ws.send(json.dumps({'request_type': request_type, 'fn_name': fn_name}))
+
+    def on_open(self): #, ws):
+        
+        
+        def run(*args):
+            while not self.stop:
+                time.sleep(1)
+            self.ws.close()
+            print "thread terminating..."
+        thread.start_new_thread(run, ())
+
+
+
+def order_status():
+    pass
+        
+if __name__ == '__main__':
+
+    
+
+    
+    logging.basicConfig(**{'level': logging.INFO,  'filemode': 'w', 'filename':'/tmp/api.log'}) 
+    logging.info('started...')
+
+
+    aws = APIWebSocketServer('aws', {'ws_port': 9001})
+     
+    def run_background():
+        aws.server.run_forever()
+             
+    t = Thread(target=run_background)        
+    t.start()    
+     
+    ac = APIClient()
+    def run_apiclient():
+        ac.run_forever()
+        
+    s = Thread(target=run_apiclient)        
+    s.start()    
+    
+    
+    f_info = {'order_status': order_status}
+    ac.request_push_updates('order_status', 'order_status')
+#     
+#     
+#     websocket.enableTrace(True)
+#     ws = websocket.WebSocketApp("ws://localhost:9001",
+#                               on_message = on_message,
+#                               on_error = on_error,
+#                               on_close = on_close)
+#     ws.on_open = on_open
+#     ws.run_forever()           
+#     

+ 37 - 5
src/ormdapi/v2/apiv2.py

@@ -12,6 +12,22 @@ import json
 
 class InterestedTags():
     
+    '''
+    
+    order state information is not processed at this time.
+        
+        m_status = ""
+        m_initMargin = ""
+        m_maintMargin = ""
+        m_equityWithLoan = ""
+        m_commission = float()
+        m_minCommission = float()
+        m_maxCommission = float()
+        m_commissionCurrency = ""
+        m_warningText = ""
+    
+    '''
+    
     OrderStatus_tags = {'order': {'m_orderId': 'order_id',
                                   'm_clientId': 'client_id',
                                   'm_action': 'side',
@@ -25,11 +41,20 @@ class InterestedTags():
                                        'remaining': 'remaining',
                                        'avgFillPrice': 'avg_fill_price',    
                                        'permId': 'perm_id'},
+                        'contract': {'m_right': 'right', 
+                                     'm_exchange': 'exchange',
+                                     'm_symbol': 'symbol',
+                                     'm_currency': 'currency',
+                                     'm_secType': 'sec_type',
+                                     'm_strike': 'strike',
+                                     'm_expirt': 'expiry'},
+                        
                         'error': {'errorCode': 'error_code',
-                                  'errorMsg': 'error_msg'}
+                                  'errorMsg': 'error_msg'},
                         }
     
     
+    
     @staticmethod
     def filter_unwanted_tags(o_status):
         os = {}
@@ -80,7 +105,7 @@ class OpenOrdersStatus_v2(Resource):
                     pass
                 return None
 
-            open_orders = map(filter_tags, res)
+            open_orders = filter(lambda x: x <> None, map(filter_tags, res))
         
             
             return open_orders, 201
@@ -180,6 +205,9 @@ class SyncOrderCRUD_v2(Resource):
         contract = v2_helper.format_v2_str_to_contract(js_contract)
         js_order_cond = args.get('order_condition')
         clordid = str(uuid.uuid4())
+
+        self.wc.get_api_sink().add_message('/order', 'SyncOrderCRUD_v2:post', 'received new order %s condition: %s' % (js_contract, js_order_cond))
+        
         done = False
         iom = self.wc.get_parent().get_order_id_manager()
         iom.request_id('rest-api', clordid)
@@ -188,7 +216,7 @@ class SyncOrderCRUD_v2(Resource):
             id = iom.assigned_id(clordid)
             if id != None:
                 break
-            sleep(0.5)
+            sleep(0.1)
         
         try:    
             order = v2_helper.format_v2_str_to_order(js_order_cond)
@@ -215,6 +243,10 @@ class SyncOrderCRUD_v2(Resource):
             parser.add_argument('id', required=True, help="order id is required")
             args = parser.parse_args()
             id = int(args['id'])
+            
+            self.wc.get_api_sink().add_message('/order', 'SyncOrderCRUD_v2:delete', 'received delete order %d' % (id))
+            
+            
             if self.wc.get_parent().get_order_manager().is_id_in_order_book(id):
                 self.gw_conn.cancelOrder(int(id))
                 return {'info': 'cancellation request sent. Check order status'}, 200
@@ -268,9 +300,9 @@ class QuoteRequest_v2(Resource, Publisher):
                 sym =  self.quote_mgr.get_symbol_ticks(contract)
                 if sym:
                     break
-                sleep(0.5)
+                sleep(0.1)
                 i += 0.5 
-                if i >= 10:
+                if i >= 15:
                     return 'Not getting any quotes from the server after waited 5 seconds! Contact administrator', 404
                 
             return output_result(sym), 200

+ 5 - 1
src/sh/start_twsgw.sh

@@ -19,8 +19,12 @@ elif [ $HOST == 'vsu-longhorn' ]; then
         FINOPT_HOME=~/pyenvs/ironfly/finopt/src
         source /home/vuser-longhorn/pyenvs/finopt/bin/activate
         TWS_GATEWAY_CFG=tws_gateway_prd.cfg
+elif [ $HOST == 'vsu-vortify' ]; then
+        FINOPT_HOME=~/workspace/fpydevs/eclipse/finopt/src
+        source /home/vuser-vortify/workspace/fpydevs/env/bin/activate
+        TWS_GATEWAY_CFG=tws_gateway_avant.cfg
 fi
-					
+									
 						
 export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
 #