Pārlūkot izejas kodu

API new functions

return margin information in order status message
support for options greeks in quote message
fix bug to handle null exchange in contract
replace config handling method in ib_heartbeat
revamp ib_mds tick data for added support in saving 1 min us stock
options snapshots
add support for position query
add support for accountSummary in tws_event_handler
laxaurus 6 gadi atpakaļ
vecāks
revīzija
8c73e1932b

+ 161 - 94
src/cep/ib_mds.py

@@ -1,11 +1,8 @@
 # -*- coding: utf-8 -*-
-
 import sys, traceback
 import json
 import logging
-import ConfigParser
-from ib.ext.Contract import Contract
-from ib.opt import ibConnection, message
+from misc2.helpers import ContractHelper, ConfigMap
 from time import sleep
 import time, datetime
 from os import listdir
@@ -14,14 +11,11 @@ from threading import Lock
 from comms.ib_heartbeat import IbHeartBeat
 import threading, urllib2
 from optparse import OptionParser
-#from options_data import ContractHelper
-
 import finopt.options_data as options_data
-#from kafka.client import KafkaClient
-#from kafka.producer import SimpleProducer
-from kafka import KafkaConsumer, KafkaProducer
-from comms.alert_bot import AlertHelper
-
+from kafka import KafkaProducer
+from misc2.observer import Publisher
+from ib.opt import ibConnection
+import importlib
 ## 
 ## to run, start kafka server on vsu-01 <administrator, password>
 ## start ibgw or tws
@@ -29,53 +23,39 @@ from comms.alert_bot import AlertHelper
 ## check the settings in the console
 ## 
 
-class IbKafkaProducer():
+class IbKafkaProducer(Publisher):
     
-    config = None
-    con = None
-    quit = False
-    producer = None
-    IB_TICK_PRICE = None
-    IB_TICK_SIZE = None
-    toggle = False
-    persist = {}
-    ibh = None
-    tlock = None
-    ib_conn_status = None
-    
-    id2contract = {'conId': 1, 'id2contracts': {} }
+    IB_TICK_PRICE = 'tickPrice'
+    IB_TICK_SIZE = 'tickSize'
+    IB_TICK_OPTION_COMPUTATION = 'tickOptionComputation'
     
+    EVENT_READ_FILE_LINE = 'event_read_file_line'
+    PUBLISH_EVENTS = [EVENT_READ_FILE_LINE, IB_TICK_PRICE, IB_TICK_SIZE, IB_TICK_OPTION_COMPUTATION]
     
     def __init__(self, config, replay = False):
         
+        Publisher.__init__(self, IbKafkaProducer.PUBLISH_EVENTS)
+        
+        
         self.config = config
         self.tlock = Lock()
-#         host = self.config.get("ib_mds", "ib_mds.gateway").strip('"').strip("'")
-#         port = int(self.config.get("ib_mds", "ib_mds.ib_port"))
-#         appid = int(self.config.get("ib_mds", "ib_mds.appid.id"))  
-        kafka_host = self.config.get("cep", "kafka.host").strip('"').strip("'")
-        kafka_port =  self.config.get("cep", "kafka.port").strip('"').strip("'")
-        self.persist['is_persist'] = self.config.get("ib_mds", "ib_mds.is_persist")
-        self.persist['persist_dir'] =self.config.get("ib_mds", "ib_mds.persist_dir").strip('"').strip("'")
+        self.persist = {}
+        self.quit = False
+        self.toggle = False
+        self.id2contract = {'conId': 1, 'id2contracts': {} }
+        kafka_host = self.config["kafka.host"]
+        kafka_port =  self.config["kafka.port"]
+        self.persist['is_persist'] = config["ib_mds.is_persist"]
+        self.persist['persist_dir'] =config["ib_mds.persist_dir"]
         self.persist['file_exist'] = False
-        self.persist['spill_over_limit'] = int(self.config.get("ib_mds", "ib_mds.spill_over_limit"))
-
-        
-        IbKafkaProducer.IB_TICK_PRICE = self.config.get("cep", "kafka.ib.topic.tick_price").strip('"').strip("'")
-        IbKafkaProducer.IB_TICK_SIZE = self.config.get("cep", "kafka.ib.topic.tick_size").strip('"').strip("'")
-#         self.con = ibConnection(host, port, appid)
-#         self.con.registerAll(self.on_ib_message)
-#         rc = self.con.connect()
-#         if rc:
-#             self.ib_conn_status = 'OK'
-        
-#        logging.info('IbKafkaProducer: connection status to IB is %d' % rc)
-
+        self.persist['spill_over_limit'] = int(config["ib_mds.spill_over_limit"])
+        self.load_processors(config['ib_mds.processors'])       
+        IbKafkaProducer.IB_TICK_PRICE = config["kafka.ib.topic.tick_price"]
+        IbKafkaProducer.IB_TICK_SIZE = config["kafka.ib.topic.tick_size"]
         logging.info('******* Starting IbKafkaProducer')
         logging.info('IbKafkaProducer: connecting to kafka host: %s...' % kafka_host)
         logging.info('IbKafkaProducer: message mode is async')
 
-
         kwargs = {
           'name': 'ib_mds',
           'bootstrap_host': kafka_host,
@@ -86,17 +66,30 @@ class IbKafkaProducer():
           'order_transmit': False
           }
 
-        
-        #client = KafkaClient(**kwargs)
         self.producer = KafkaProducer(bootstrap_servers='%s:%s' % (kwargs['bootstrap_host'], kwargs['bootstrap_port']))
-        
         if not replay:
             self.start_ib_connection()
         
-#         # start heart beat monitor
-#         self.ibh = IbHeartBeat(config)
-#         self.ibh.register_listener([self.on_ib_conn_broken])
-#         #self.ibh.run()        
+#from cep.ib_mds.processor.us_stkopt import StockOptionsSnapshot
+        self.main_loop()
+    
+    def load_processors(self, plist):
+        
+        
+        def instantiate_processor(p):
+            p_toks = p.split('.')
+            class_name = p_toks[len(p_toks)-1]
+            module_name = p_toks[:len(p_toks)-1]
+            module = importlib.import_module(p.replace('.%s' % class_name, ''))
+            class_ = getattr(module, class_name)
+            return class_(self.config)
+                
+        processors = map(instantiate_processor, plist)
+        for p in processors:
+            map(lambda e: self.register(e, p, getattr(p, e)), IbKafkaProducer.PUBLISH_EVENTS)
+        
+        
+        
     
     def pub_cn_index(self, sec):
         #http://blog.csdn.net/moneyice/article/details/7877030
@@ -129,13 +122,15 @@ class IbKafkaProducer():
         
         
     def add_contract(self, clist):
-        
         tuple = map(lambda x: x if x not in [''] else None, clist) 
         c = options_data.ContractHelper.makeContract(tuple)
         self.con.reqMktData(self.id2contract['conId'], c, '', False)
         self.id2contract['id2contracts'][self.id2contract['conId']] = options_data.ContractHelper.makeRedisKeyEx(c)
         self.id2contract['conId']+=1
         
+    def delete_contract(self, id):
+        pass
+        
     def on_ib_conn_broken(self, msg):
         logging.error('IbKafkaProducer: connection is broken!')
         self.ib_conn_status = 'ERROR'
@@ -147,9 +142,9 @@ class IbKafkaProducer():
             self.con.eDisconnect()
     
             
-            host = self.config.get("ib_mds", "ib_mds.gateway").strip('"').strip("'")
-            port = int(self.config.get("ib_mds", "ib_mds.ib_port"))
-            appid = int(self.config.get("ib_mds", "ib_mds.appid.id"))        
+            host = self.config["ib_mds.gateway"]
+            port = int(self.config["ib_mds.ib_port"])
+            appid = int(self.config["ib_mds.appid.id"])        
             self.con = ibConnection(host, port, appid)
             self.con.registerAll(self.on_ib_message)
             rc = None
@@ -163,8 +158,8 @@ class IbKafkaProducer():
             if not self.quit:
                 # resubscribe tickers again!
                 self.load_tickers()                
-                a = AlertHelper(self.config)
-                a.post_msg('ib_mds recovered from broken ib conn, re-subscribe tickers...')            
+#                 a = AlertHelper(self.config)
+#                 a.post_msg('ib_mds recovered from broken ib conn, re-subscribe tickers...')            
                 logging.debug('on_ib_conn_broken: completed restoration. releasing lock...')
                 self.ib_conn_status = 'OK'
             
@@ -189,7 +184,7 @@ class IbKafkaProducer():
             return json.dumps(d)
 
         
-        if msg.typeName in ['tickPrice', 'tickSize']:
+        if msg.typeName in ['tickPrice', 'tickSize', 'tickOptionComputation']:
 
             t = create_tick_kmessage(msg)             
             logging.debug(t)   
@@ -199,7 +194,14 @@ class IbKafkaProducer():
             if self.persist['is_persist']:
                 self.write_message_to_file(t)
                 
+            event = msg.typeName
+            attrs = dir(msg)
+            params = dict()
+            for e in filter(lambda a: a not in ['typeName', 'keys', 'items', 'values'] and '__' not in a, attrs):
+                params[e] = getattr(msg, e)
+            params['contract_key'] = self.id2contract['id2contracts'][msg.tickerId]
             
+            self.dispatch(event, params)
             
 #         print ",%0.4f,%0.4f" % (msg.pos, msg.avgCost)
 #         self.producer.send_messages('my.topic', "%0.4f,%0.4f" % (msg.pos, msg.avgCost))
@@ -228,9 +230,9 @@ class IbKafkaProducer():
 
 
     def start_ib_connection(self):
-        host = self.config.get("ib_mds", "ib_mds.gateway").strip('"').strip("'")
-        port = int(self.config.get("ib_mds", "ib_mds.ib_port"))
-        appid = int(self.config.get("ib_mds", "ib_mds.appid.id"))          
+        host = self.config["ib_mds.gateway"]
+        port = int(self.config["ib_mds.ib_port"])
+        appid = int(self.config["ib_mds.appid.id"])          
         self.con = ibConnection(host, port, appid)
         self.con.registerAll(self.on_ib_message)
         rc = self.con.connect()
@@ -239,18 +241,38 @@ class IbKafkaProducer():
         logging.info('start_ib_connection: connection status to IB is %d' % rc)
         
         # start heart beat monitor
-        self.ibh = IbHeartBeat(config)
+        self.ibh = IbHeartBeat(self.config)
         self.ibh.register_listener([self.on_ib_conn_broken])
         self.ibh.run()        
         
-
+    '''
+        SAMPLE FILE FORMAT
+        
+        #
+        # US Stock options
+        # symbol, sectype, exch, ccy, expiry, strike, right
+        #
+        #
+        GOOG,STK,SMART,USD,,,
+        GOOG,OPT,SMART,USD,20190510,1172.5,C
+        GOOG,OPT,SMART,USD,20190510,1172.5,P
+        GOOG,OPT,SMART,USD,20190510,1175,C
+        GOOG,OPT,SMART,USD,20190510,1175,P
+        GOOG,OPT,SMART,USD,20190510,1177.5,C
+        GOOG,OPT,SMART,USD,20190510,1177.5,P
+        GOOG,OPT,SMART,USD,20190510,1180,C
+        GOOG,OPT,SMART,USD,20190510,1180,P
+        GOOG,OPT,SMART,USD,20190510,1182.5,C
+        GOOG,OPT,SMART,USD,20190510,1182.5,P
+    
+    '''
     def load_tickers(self, path=None):
         
         self.id2contract = {'conId': 1, 'id2contracts': {} }
         
         
         if path is None:
-            path = self.config.get("ib_mds", "ib_mds.subscription.fileloc").strip('"').strip("'")
+            path = self.config["ib_mds.subscription.fileloc"]
         logging.info('load_tickers: attempt to open file %s' % path)
         fr = open(path)
         for l in fr.readlines():
@@ -258,6 +280,7 @@ class IbKafkaProducer():
                  
                 self.add_contract([t for t in l.strip('\n').split(',')])
             
+            self.dispatch(IbKafkaProducer.EVENT_READ_FILE_LINE, {'record': l})
 
     def do_work(self):
         while not self.quit:
@@ -311,28 +334,60 @@ class IbKafkaProducer():
             process_msg(f)
         
 
-    def console(self):
+
+
+
+    def main_loop(self):
+        def print_menu():
+            menu = {}
+            menu['1']="list all subscribed contracts," 
+            menu['2']="turn/on off  output"
+            menu['3']="Start up configuration"
+            menu['4']=""
+            menu['9']="Exit"
+    
+            choices=menu.keys()
+            choices.sort()
+            for entry in choices: 
+                print entry, menu[entry]                             
+            
+        def get_user_input(selection):
+                
+                print_menu()
+                while 1:
+                    resp = sys.stdin.readline()
+                    response[0] = resp.strip('\n')        
         try:
-            while not self.quit:
-                print "Available commands are: l - list all subscribed contracts, a <symbol> <sectype> <exch> <ccy>"
-                print "                        t - turn/on off  output q - terminate program"
-                cmd = raw_input(">>")
-                input = cmd.split(' ')
-                if input[0]  == "q":
-                    print 'quit command received...'
-                    self.quit = True
-                elif input[0] == 'l' :
-                    print ''.join('%s: %s\n' % (k, v) for k, v in self.id2contract['id2contracts'].iteritems())
-                elif input[0] == 'a':
-                    self.add_contract((input[1],input[2],input[3],input[4],'',0,''))
-                elif input[0] == 't':
-                    self.toggle = False if self.toggle else True
-                else:
-                    pass
             
-        except:
-            exc_type, exc_value, exc_traceback = sys.exc_info()
-            traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
+            response = [None]
+            user_input_th = threading.Thread(target=get_user_input, args=(response,))
+            user_input_th.daemon = True
+            user_input_th.start()               
+            self.pcounter = 0
+            self.menu_loop_done = False
+            while not self.menu_loop_done: 
+                
+                sleep(.5)
+                
+                if response[0] is not None:
+                    selection = response[0]
+                    if selection =='1':
+                        print ''.join('%s: %s\n' % (k, v) for k, v in self.id2contract['id2contracts'].iteritems())
+                    elif selection == 't':
+                        self.toggle = False if self.toggle else True
+                    elif selection == '9': 
+                        print 'quit command received...'
+                        self.quit = True                        
+                        sys.exit(0)
+                        break
+                    else: 
+                        pass                        
+                    response[0] = None
+                    print_menu()                
+                
+        except (KeyboardInterrupt, SystemExit):
+                logging.error('ib_mds: caught user interrupt. Shutting down...')
+                sys.exit(0)   
             
 
 if __name__ == '__main__':
@@ -343,7 +398,12 @@ if __name__ == '__main__':
     parser.add_option("-r", "--replay",
                       dest="replay_dir",
                       help="replay recorded mds files stored in the specified directory")
-                      
+    parser.add_option("-s", "--symbols",
+                      action="store", dest="symbols_file", 
+                      help="the file defines stocks symbols for which to save ticks")                      
+    parser.add_option("-f", "--config_file",
+                      action="store", dest="config_file", 
+                      help="path to the config file")                
 
     
     options, arguments = parser.parse_args()
@@ -354,17 +414,24 @@ if __name__ == '__main__':
         print("Usage: %s [options] <config file>" % sys.argv[0])
         exit(-1)    
 
-    cfg_path= arguments[0]
-    config = ConfigParser.SafeConfigParser()
-    if len(config.read(cfg_path)) == 0:      
-        raise ValueError, "Failed to open config file" 
     
+    kwargs = ConfigMap().kwargs_from_file(options.config_file)
+    for option, value in options.__dict__.iteritems():
+        
+        if value <> None:
+            kwargs[option] = value
+    cfg_path= options.config_file
+
+    if options.symbols_file:
+        kwargs['ib_mds.subscription.fileloc']= options.symbols_file
       
-    logconfig = eval(config.get("ib_mds", "ib_mds.logconfig").strip('"').strip("'"))
+    logconfig = kwargs['ib_mds.logconfig']
     logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
-    logging.basicConfig(**logconfig)    
+    logging.basicConfig(**logconfig)        
+
+    logging.info('config settings: %s' % kwargs)
     replay = True if options.replay_dir <> None else False 
-    ik = IbKafkaProducer(config, replay)
+    ik = IbKafkaProducer(kwargs, replay)
     
     if not replay:
         ik.load_tickers()    

+ 0 - 0
src/cep/ib_mds/__init__.py


+ 0 - 0
src/cep/ib_mds/processor/__init__.py


+ 146 - 0
src/cep/ib_mds/processor/us_stkopt.py

@@ -0,0 +1,146 @@
+# -*- coding: utf-8 -*-
+import sys, traceback
+import json
+import logging
+from time import sleep
+import threading
+import time, datetime
+from finopt.instrument import Symbol
+from misc2.helpers import ContractHelper
+from misc2.observer import Subscriber
+import os
+
+class StockOptionsSnapshot(threading.Thread, Subscriber):
+    
+    def __init__(self, config):
+        threading.Thread.__init__(self)
+        Subscriber.__init__(self, 'StockOptionsSnapshot')
+        self.symbols = {}
+        self.run_forever()
+        self.persist={}
+        self.persist['is_persist'] = config["stkopt.is_persist"]
+        self.persist['persist_dir'] =config["stkopt.persist_dir"]
+        self.persist['file_exist'] = False
+        self.persist['spill_over_limit'] = int(config["stkopt.spill_over_limit"])
+        
+        
+    def run_forever(self):
+        self.start()
+    
+    def set_stop(self):
+        self.stop = True
+        
+    
+    def event_read_file_line(self, event, record=None):
+        print record
+    
+    def tickPrice(self, event, **params):
+        logging.debug('StockOptionsSnapshot:tickPrice')
+        try:
+            contract_key = params['contract_key']
+            s = self.symbols[contract_key]
+        except KeyError:
+            s = Symbol(ContractHelper.makeContractfromRedisKeyEx(contract_key))
+            self.symbols[contract_key] = s
+        s.set_tick_value(params['field'], params['price'])        
+    
+    def tickSize(self, event, **params):
+        logging.debug('QuoteHandler:ticksize')
+        try:
+            contract_key = params['contract_key']
+            s = self.symbols[contract_key]
+        except KeyError:
+            s = Symbol(ContractHelper.makeContractfromRedisKeyEx(contract_key))
+            self.symbols[contract_key] = s
+        s.set_tick_value(params['field'], params['size'])
+
+
+    #def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
+    def tickOptionComputation(self, event, **params):
+        logging.debug('QuoteHandler:tickOptionComputation')
+        try:
+            contract_key = params['contract_key']
+            s = self.symbols[contract_key]
+        except KeyError:
+            s = Symbol(ContractHelper.makeContractfromRedisKeyEx(contract_key))
+            self.symbols[contract_key] = s
+        greeks = dict(params)
+        del greeks['field']
+        del greeks['contract_key']
+        s.set_ib_option_greeks(params['field'], greeks)       
+    
+    def save_record(self, ts, symbol_key):
+        if self.persist['file_exist'] == False:
+            if not os.path.isdir(self.persist['persist_dir']):
+                os.mkdir(self.persist['persist_dir'])
+            fn = '%s/ibkdump-%s.txt' % (self.persist['persist_dir'], datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
+            logging.debug('write_message_to_file: path %s', fn)
+            self.persist['fp'] = open(fn, 'w')
+            self.persist['file_exist'] = True
+            self.persist['spill_over_count'] = 0
+            self.persist['fp'].write('#\r\n')
+            self.persist['fp'].write('# [time][symbol]0----1----2----3-----4----5----6----7----8----9----10----11----12----13----14----15-----[bid greeks: impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice][ask greeks: impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice][last greeks: impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice]\r\n')
+            self.persist['fp'].write('#               bsiz last ask  asiz  lsiz high low  volm cls                                     opentic\r\n')
+            self.persist['fp'].write('#\r\n')
+        
+        def create_record_str(symbol_key):
+            tickvals = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
+            for item in self.symbols[symbol_key].get_tick_values().items():
+                tickvals[item[0]] = item[1]
+                
+            opt_fields = [(Symbol.BID_OPTION, [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), 
+                          (Symbol.ASK_OPTION, [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), 
+                          (Symbol.LAST_OPTION, [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])]
+            for ofld in opt_fields:
+                try:
+                    gdict = self.symbols[symbol_key].get_ib_option_greeks(ofld[0])
+                    ofld[1][0] = gdict['impliedVol']
+                    ofld[1][1] = gdict['delta']
+                    ofld[1][2] = gdict['optPrice']
+                    ofld[1][3] = gdict['pvDividend']
+                    ofld[1][4] = gdict['gamma']
+                    ofld[1][5] = gdict['vega']
+                    ofld[1][6] = gdict['theta']
+                    ofld[1][7] = gdict['undPrice'] 
+                except:
+                    continue
+            ofld_str = ''
+            for greek_blob in opt_fields:
+                ofld_str += ''.join('%0.4f,' % e for e in greek_blob[1])
+            return '%s,%s,%s,%s\r' % (ts, symbol_key,  ','.join('%0.2f' % e for e in tickvals), ofld_str)
+            
+            
+        s = create_record_str(symbol_key)
+        self.persist['fp'].write(s)
+        self.persist['fp'].flush()
+        print s
+        self.persist['spill_over_count'] +=1
+        #print self.persist['spill_over_count']
+        if self.persist['spill_over_count'] == self.persist['spill_over_limit']:
+            self.persist['fp'].flush()
+            self.persist['fp'].close()
+            self.persist['file_exist'] = False
+    
+        
+    
+    def run(self):
+        self.stop = False
+        self.record_to_file = True
+
+        def save_all_records():
+            for k in self.symbols.keys():
+                self.save_record(now.strftime("%H:%M:%S"), k)
+
+        save_all_records()
+        while not self.stop:
+            now =  datetime.datetime.now()
+            zz = now.strftime("%S")
+            if zz == '00' and self.record_to_file:
+                self.record_to_file = False
+                save_all_records()
+                logging.info( 'write stuff to file at one minute interval %s' % now.strftime("%H:%M:%S"))
+            elif zz <> '00':
+                self.record_to_file = True
+            
+    
+            sleep(0.1)

+ 8 - 8
src/comms/ib_heartbeat.py

@@ -26,7 +26,7 @@ class IbHeartBeat():
     
     def __init__(self, config):
         self.config = config
-        self.chat_handle = AlertHelper(config)              
+        #self.chat_handle = AlertHelper(config)              
         
         # ensure the message will get printed right away when the connection is first broken
         self.last_broken_time =  datetime.datetime.now() - datetime.timedelta(seconds=90)  
@@ -47,11 +47,11 @@ class IbHeartBeat():
         self.quit = True
         
     def keep_trying(self):
-        host = self.config.get("ib_heartbeat", "ib_heartbeat.gateway").strip('"').strip("'")
-        port = int(self.config.get("ib_heartbeat", "ib_heartbeat.ib_port"))
-        appid = int(self.config.get("ib_heartbeat", "ib_heartbeat.appid.id"))      
-        try_interval = int(self.config.get("ib_heartbeat", "ib_heartbeat.try_interval"))
-        suppress_msg_interval = int(self.config.get("ib_heartbeat", "ib_heartbeat.suppress_msg_interval"))
+        host = self.config["ib_heartbeat.gateway"]
+        port = int(self.config["ib_heartbeat.ib_port"])
+        appid = int(self.config["ib_heartbeat.appid.id"])      
+        try_interval = int(self.config["ib_heartbeat.try_interval"])
+        suppress_msg_interval = int(self.config["ib_heartbeat.suppress_msg_interval"])
         logging.info('ib gateway->%s:%d, appid->%d, try_interval->%d, suppress msg interval->%d' % \
                      (host, port, appid, try_interval, suppress_msg_interval))
         while not self.quit:
@@ -60,7 +60,7 @@ class IbHeartBeat():
             if rc:
                 if self.prev_state == 'broken':
                     msg = '*** Connection restored at %s **********' % datetime.datetime.now().strftime('%H:%M:%S')
-                    self.chat_handle.post_msg(msg)
+                    #self.chat_handle.post_msg(msg)
                     self.alert_listeners(msg)
                     self.prev_state = ''
                     # reset to a much earlier time
@@ -72,7 +72,7 @@ class IbHeartBeat():
                 self.prev_state = 'broken' 
                 #print now, self.last_broken_time, (now - self.last_broken_time).seconds
                 if (now - self.last_broken_time).seconds > suppress_msg_interval:
-                    self.chat_handle.post_msg(msg)
+                    #self.chat_handle.post_msg(msg)
                     self.alert_listeners(msg)
                     self.last_broken_time = now 
                     logging.error(msg)

+ 2 - 1
src/comms/ibgw/order_manager.py

@@ -5,6 +5,7 @@ import sys, traceback
 import json
 from time import sleep
 from misc2.helpers import ContractHelper
+from misc2.observer import Subscriber
 from ib.ext.Contract import Contract
 from comms.ibgw.base_messaging import BaseMessageListener
 from comms.ibgw.tws_event_handler import TWS_event_handler
@@ -12,7 +13,7 @@ from Queue import Queue
 import threading
 import uuid
 import numpy as np
-from finopt.test_pattern import Subscriber
+
 
 
 class OrderManagerException(Exception):

+ 23 - 15
src/comms/ibgw/tws_event_handler.py

@@ -6,7 +6,7 @@ import logging
 import traceback
 from ib.ext.EWrapper import EWrapper
 import json
-
+from copy import deepcopy
 
         
 class TWS_event_handler(EWrapper, Publisher):
@@ -18,7 +18,9 @@ class TWS_event_handler(EWrapper, Publisher):
     # any classes that is interested in listening
     # WebConsole is one such subscriber
     # it is interested in 
-    PUBLISH_TWS_EVENTS = ['error', 'openOrder', 'openOrderEnd', 'orderStatus', 'openBound', 'tickPrice', 'tickSize']
+    PUBLISH_TWS_EVENTS = ['error', 'openOrder', 'openOrderEnd', 'orderStatus', 'openBound', 'tickPrice', 'tickSize',
+                          'tickOptionComputation', 'position', 'accountSummary'
+                          ]
     
     def __init__(self, producer):
         self.producer = producer
@@ -43,7 +45,7 @@ class TWS_event_handler(EWrapper, Publisher):
             self.producer.send_message(message, self.producer.message_dumps(dict))   
             
             # forward message to subscribed consumers,
-            # that is webconsole
+            # that is webconsole / RESTAPI interfaces
             if message in self.PUBLISH_TWS_EVENTS:
                 self.dispatch(message, dict) 
         except:
@@ -91,16 +93,19 @@ class TWS_event_handler(EWrapper, Publisher):
         #pass
     
     def tickSize(self, tickerId, field, size):
-         logging.debug('TWS_event_handler:tickSize. %d<->%s' % (tickerId,self.subscription_manger.get_contract_by_id(tickerId) ))
-         self.broadcast_event('tickSize', {'contract_key': self.subscription_manger.get_contract_by_id(tickerId), 
+        logging.debug('TWS_event_handler:tickSize. %d<->%s' % (tickerId,self.subscription_manger.get_contract_by_id(tickerId) ))
+        self.broadcast_event('tickSize', {'contract_key': self.subscription_manger.get_contract_by_id(tickerId), 
                                             'field': field, 'size': size})
         #pass
     
     
     def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
+        greeks = vars().copy()
+        del greeks['tickerId']
+        del greeks['self']
+        self.broadcast_event('tickOptionComputation', {'contract_key': self.subscription_manger.get_contract_by_id(tickerId), 
+                                                       'greeks': greeks}) 
         
-        #self.broadcast_event('tickOptionComputation', self.pre_process_message(vars())) #vars())
-        pass
 
     def tickGeneric(self, tickerId, tickType, value):
         #self.broadcast_event('tickGeneric', vars())
@@ -304,15 +309,16 @@ class TWS_event_handler(EWrapper, Publisher):
 
 
     def position(self, account, contract, pos, avgCost):
+        
         contract_key= ContractHelper.makeRedisKeyEx(contract)
         logging.info('TWS_event_handler:position. [%s]:position= %d' % (contract_key, pos))
         self.broadcast_event('position', {
-                                'account': account,
-                                'contract_key': contract_key, 
-                                'position': pos, 'average_cost': avgCost,
-                                'end_batch': False
-                                
-                                })        
+                                 'account': account,
+                                 'contract_key': contract_key, 
+                                 'position': pos, 'average_cost': avgCost,
+                                 'end_batch': False
+                                 
+                                 })        
 
     def positionEnd(self):
         '''
@@ -337,7 +343,9 @@ class TWS_event_handler(EWrapper, Publisher):
 
 
     def accountSummary(self, reqId, account, tag, value, currency):
-        self.broadcast_event('accountSummary', vars())
+        v = {'reqId': reqId, 'account': account, 'tag': tag, 'value': value, 'currency': currency, 'end_batch': False}
+        self.broadcast_event('accountSummary', v)
 
     def accountSummaryEnd(self, reqId):
-        self.broadcast_event('accountSummaryEnd', vars())
+        v = {'reqId': reqId, 'end_batch': True}
+        self.broadcast_event('accountSummary', v)

+ 8 - 2
src/comms/ibgw/tws_gateway.py

@@ -21,6 +21,7 @@ from comms.tws_protocol_helper import TWS_Protocol
 from comms.ibgw.tws_gateway_restapi import WebConsole 
 from comms.ibgw.order_manager import OrderManager
 from ormdapi.v2.quote_handler import QuoteRESTHandler
+from ormdapi.v2.position_handler import AccountPositionTracker
 import redis
 import threading
 from threading import Lock
@@ -152,15 +153,17 @@ class TWS_gateway():
         self.tws_event_handler.set_subscription_manager(self.contract_subscription_mgr)
 
 
+    '''
+        this group of objects are for handling rest API requests
+    '''
     def initialize_order_quote_manager(self):
 #         self.order_id_mgr = OrderIdManager(self.tws_connection)
 #         self.tws_event_handler.set_order_id_manager(self.order_id_mgr)
 #         self.order_id_mgr.start()
         self.order_manager = OrderManager('order_manager', self, self.kwargs)
         self.order_manager.start_order_manager()
-        
-        
         self.quote_manager = QuoteRESTHandler('quote_manager', self)
+        self.pos_manager = AccountPositionTracker('acctpos_manager', self)
         
     def initialize_redis(self):
 
@@ -211,6 +214,9 @@ class TWS_gateway():
     def get_quote_manager(self):
         return self.quote_manager
     
+    def get_pos_manager(self):
+        return self.pos_manager
+    
     def get_redis_conn(self):
         return self.rs
 

+ 4 - 3
src/comms/ibgw/tws_gateway_restapi.py

@@ -26,9 +26,9 @@ class WebConsole(Subscriber):
         self.parent = parent
         self.id_message = {}
         '''
-            message sink is a message queue that stores any event that the api classes wanted to log down
-            the sink broadcast any received message to interested subscribers: message_store and telegram bot
-            message store is a persistor that 
+            message sink is a message queue that stores any event to be logged by the api classes
+            the sink broadcasts any received message to interested subscribers: message_store and telegram bot
+            message store persists events in redis 
         '''
         self.message_sink = ApiMessageSink(self.parent.get_config())
         message_store = ApiMessagePersistence(self.parent.get_redis_conn(), self.parent.get_config(), self.message_sink)
@@ -60,6 +60,7 @@ class WebConsole(Subscriber):
         WebConsole.api.add_resource(apiv2.OrderStatus_v2, '/v2/order_status/<id>', resource_class_kwargs={'webconsole': self})
         WebConsole.api.add_resource(apiv2.OpenOrdersStatus_v2, '/v2/open_orders', resource_class_kwargs={'webconsole': self})
         WebConsole.api.add_resource(apiv2.QuoteRequest_v2, '/v2/quote', resource_class_kwargs={'webconsole': self})
+        WebConsole.api.add_resource(apiv2.AcctPosition_v2, '/v2/position', resource_class_kwargs={'webconsole': self})
 
 
     def set_stop(self):

+ 18 - 26
src/config/mds.cfg

@@ -1,44 +1,36 @@
-[redis]
-redis.server: "localhost"
-#
-# vortify on fpydevs 
-#
-redis.port: 8379
-redis.db: 2
-redis.sleep: 0.5
-
-
 [cep]
-kafka.host: 'localhost'
+kafka.host: 'vorsprung'
 kafka.port: 9092
 kafka.ib.topic.tick_price: 'ib_tick_price'
 kafka.ib.topic.tick_size: 'ib_tick_size'
 
 
-[alert_bot]
-msg_bot.jid: "robo@route69.hopto.org"
-msg_bot.pass: 123
-msg_bot.recipients: "['blueman@route69.hopto.org']"
-msg_bot.redis_mq: 'chatq'
-msg_bot.redis_prefix: 'alert_bot'  
-#
-# 'filename': '../log/alert_bot.log', 'filemode': 'w', 
-msg_bot.logconfig: "{'level': logging.INFO}"
-
-
 [ib_mds]
-ib_mds.logconfig: "{'filename': '/tmp/mds.log', 'filemode': 'w','level': logging.INFO}"
+ib_mds.logconfig: {'filename': '/tmp/mds.log', 'filemode': 'w','level': logging.INFO}
 #
 # 7497 vortify fpydev envt using paper account
 #
-ib_mds.ib_port: 7497
-#ib_mds.ib_port: 4001
+#ib_mds.ib_port: 7497
+#ib_mds.appid.id: 9800
+#ib_mds.gateway: 'localhost'
+ib_mds.ib_port: 8496
 ib_mds.appid.id: 9800
-ib_mds.gateway: 'localhost'
+ib_mds.gateway: 'vsu-longhorn'
+
 ib_mds.is_persist: 1
 ib_mds.persist_dir: '/home/laxaurus/workspace/fpydevs/dat/mds_files'
 ib_mds.subscription.fileloc: '/home/laxaurus/workspace/fpydevs/dat/mds_files/instruments.txt'
 ib_mds.spill_over_limit: 10000
+ib_mds.processors: ['cep.ib_mds.processor.us_stkopt.StockOptionsSnapshot']
+
+[StockOptionsSnapshot]
+stkopt.interval: 60
+stkopt.is_persist: 1
+stkopt.persist_dir: '/home/laxaurus/workspace/fpydevs/dat/us_stkopt'
+stkopt.spill_over_limit: 10000
+stkopt.processors: ['cep.ib_mds.processor.us_stkopt.StockOptionsSnapshot']
+
+
 
 [ib_heartbeat]
 ib_heartbeat.logconfig: "{'filename': '/tmp/mds.log', 'filemode': 'w','level': logging.INFO}"

+ 1 - 1
src/config/tws_gateway.cfg

@@ -53,6 +53,6 @@ webconsole.auto_reload: False
 #
 #
 restapi.list_label: 'api_log'
-restapi.telegram_tok: '870564010:AAFxQa7-WFe2JjTP3KLoyY77NW90smTrnig'
+#restapi.telegram_tok: '870564010:AAFxQa7-WFe2JjTP3KLoyY77NW90smTrnig'
 
 

+ 21 - 0
src/finopt/instrument.py

@@ -79,9 +79,20 @@ class Symbol():
     OPEN_TICK =14
     
     
+    #https://interactivebrokers.github.io/tws-api/tick_types.html
+    BID_OPTION= 10
+    ASK_OPTION= 11
+    LAST_OPTION=12
+    #def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
+    
     def __init__(self, contract):
         self.contract = contract
         self.tick_values = {}
+        '''
+            ib_option_greeks:
+            key in 10,11,12 and value is a dict of impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice
+        '''
+        self.ib_option_greeks = {} 
         self.extra = {}
         self.key = ContractHelper.makeRedisKeyEx(contract)    
     
@@ -112,6 +123,16 @@ class Symbol():
     def get_key(self):
         return self.key
     
+    def set_ib_option_greeks(self, id, greeks):
+        for k, v in greeks.iteritems():
+            # overflow 9223372036854775808
+            if v > 9223372036854700000:
+                greeks[k] = float('nan')
+        self.ib_option_greeks[id] = greeks
+        
+    def get_ib_option_greeks(self, id):
+        return self.ib_option_greeks[id]
+    
 class Option(Symbol):
     """
         Tick Value      Description

+ 3 - 2
src/misc2/helpers.py

@@ -339,8 +339,9 @@ class ContractHelper(BaseHelper):
         try:
             contract.m_exchange = ContractHelper.map_rules['exchange'][contract.m_symbol]
         except:
-            
-            if 'CASH' in contract.m_secType:
+            if contract.m_exchange == None:
+                contract.m_exchange = ''  
+            elif 'CASH' in contract.m_secType:
                 pass
             elif 'USD' in contract.m_currency and 'SMART' in contract.m_exchange:
                 pass 

+ 8 - 6
src/ormdapi/test/order_generator.py

@@ -32,7 +32,7 @@ def read_dat_v1(path, mode, url ):
         contract = [None, None, None, None, None, None, None]
         read_vals = list(eval(vals[1]))
         contract = read_vals + contract[len(read_vals):]
-        print contract
+        #print contract
         c = ContractHelper.makeContract(tuple(contract))
         cs = ContractHelper.contract2kvstring(c)
         
@@ -85,7 +85,7 @@ def format_order_to_v2_str(order):
     return json.dumps(odict)
     
 
-def read_dat_v2(path, mode, url, version_digit ):
+def read_dat_v2(path, mode, url, quote_url, version_digit ):
     results = []
     f = open(path)
     lns = f.readlines()
@@ -102,7 +102,7 @@ def read_dat_v2(path, mode, url, version_digit ):
         contract = read_vals + contract[len(read_vals):]
         c = ContractHelper.makeContract(tuple(contract))
         del c.__dict__['m_includeExpired']
-        print c.__dict__
+        #print c.__dict__
         
         cs = format_contract_to_v2_str(c)
         
@@ -126,7 +126,7 @@ def read_dat_v2(path, mode, url, version_digit ):
         
         if mode == 'sync':
             results.append(url % (version_digit, cs, os))
-        
+            results.append(quote_url % (version_digit, cs))
     
     return results   
 
@@ -163,7 +163,9 @@ if __name__ == '__main__':
     kwargs = {
                 'logconfig': {'level': logging.INFO},
                 'mode': 'sync',
-                'url': 'http://ormd.vortifytech.com/v%s/order?contract=%s&order_condition=%s'
+                #'url': 'http://ormd.vortifytech.com/v%s/order?contract=%s&order_condition=%s'
+                'url': 'http://localhost:5001/v%s/order?contract=%s&order_condition=%s',
+                'quote_url': 'http://localhost:5001/v%s/quote?contract=%s'
                 
               }
     #'url': 'http://localhost:5001/v%s/order?contract=%s&order_condition=%s'
@@ -207,7 +209,7 @@ if __name__ == '__main__':
     if kwargs['version'] == '1':
         results= read_dat_v1(kwargs['dat_file'], kwargs['mode'], kwargs['url'], kwargs['version'])
     else:
-        results= read_dat_v2(kwargs['dat_file'], kwargs['mode'], kwargs['url'], kwargs['version'])
+        results= read_dat_v2(kwargs['dat_file'], kwargs['mode'], kwargs['url'], kwargs['quote_url'], kwargs['version'])
     if kwargs['case_no'] == '1':
         print '\n'.join(s for s in results)
     elif kwargs['case_no'] == '2':

+ 14 - 4
src/ormdapi/test/orders.dat

@@ -7,6 +7,9 @@
 #
 # each line:
 # C:(contract tuple)|O:order instruction
+#
+#
+#
 
 C:('HSI','FUT','HKFE','HKD','20190530',0,'')|O:'LMT','U9050568','BUY',1,29200
 C:('MHI','FUT','HKFE','HKD','20190530',0,'')|O:'LMT','U9050568','BUY',1,29000
@@ -19,11 +22,18 @@ C:('GBP','CASH','IDEALPRO','USD')|O:'LMT','DU1460682', 'BUY', 100000, 1.1986
 C:('USD','CASH','IDEALPRO','JPY')|O:'LMT','DU1460682', 'SELL', 100000, 119.55
 C:('BA','STK','SMART','USD')|O:'LMT','DU1460682', 'SELL', 100, 380.5
 C:('TSLA','STK','SMART','USD')|O:'MKT','DU1460682', 'SELL', 100, 0
-C:('TSLA', 'OPT', 'SMART', 'USD', '20190510', 270.0, 'C')|O:'MKT','DU1460682', 'SELL', 8, 0 
-C:('TSLA', 'OPT', 'SMART', 'USD', '20190510', 235.0, 'C')|O:'LMT','DU1460682', 'SELL', 8, 4.65
-C:('TSLA', 'OPT', 'SMART', 'USD', '20190510', 238.0, 'C')|O:'LMT','DU1460682', 'BUY', 8, 3.5
+C:('TSLA', 'OPT', 'SMART', 'USD', '20190524', 270.0, 'C')|O:'MKT','DU1460682', 'SELL', 8, 0 
+C:('TSLA', 'OPT', 'SMART', 'USD', '20190524', 235.0, 'C')|O:'LMT','DU1460682', 'SELL', 8, 4.65
+C:('TSLA', 'OPT', 'SMART', 'USD', '20190524', 238.0, 'C')|O:'LMT','DU1460682', 'BUY', 8, 3.5
+C:('GOOG', 'OPT', 'SMART', 'USD', '20190524', 1162.5, 'C')|O:'LMT','DU1460682', 'SELL', 10, 3.2
+C:('GOOG', 'OPT', 'SMART', 'USD', '20190524', 1167.5, 'C')|O:'LMT','DU1460682', 'BUY', 10, 1.6
+C:('GOOG', 'OPT', 'SMART', 'USD', '20190524', 1160, 'P')|O:'LMT','DU1460682', 'SELL', 10, 0.5
+C:('GOOG', 'OPT', 'SMART', 'USD', '20190524', 1157.5, 'P')|O:'LMT','DU1460682', 'BUY', 10, 0.25
+#
 #
 # invalid stock code
 # 
 #C:('BAF55','STK','SMART','USD')|O:'LMT','DU1460682', 'SELL', 100, 380.5
-#C:('USD.JJJ','CASH','IDEALPRO','JPY')|O:'LMT','DU1460682', 'SELL', 100000, 119.55
+#C:('USD.JJJ','CASH','IDEALPRO','JPY')|O:'LMT','DU1460682', 'SELL', 100000, 119.55
+#
+#

+ 89 - 10
src/ormdapi/v2/apiv2.py

@@ -3,6 +3,7 @@ from misc2.helpers import ContractHelper, OrderHelper, OrderValidationException
 from misc2.observer import Publisher
 from finopt.instrument import Symbol
 from time import sleep
+from ormdapi.v2.position_handler import AccountSummaryTags
 import uuid
 import traceback
 import json
@@ -10,6 +11,7 @@ import json
 
 
 
+
 class InterestedTags():
     
     '''
@@ -48,9 +50,17 @@ class InterestedTags():
                                      'm_secType': 'sec_type',
                                      'm_strike': 'strike',
                                      'm_expirt': 'expiry'},
-                        
+                        'state':{   'm_initMargin': "init_margin",
+                                    'm_maintMargin': "maint_margin",
+                                    'm_equityWithLoan': "equity_with_loan",
+                                    'm_commission': "commission",
+                                    'm_minCommission': "min_commission",
+                                    'm_maxCommission': "max_commission",
+                                    'm_commissionCurrency': "commission_currency",
+                                    'm_warningText': "warning_text"},
                         'error': {'errorCode': 'error_code',
                                   'errorMsg': 'error_msg'},
+                        
                         }
     
     
@@ -62,6 +72,9 @@ class InterestedTags():
             os[v] = o_status['order'][k]
         for k,v in InterestedTags.OrderStatus_tags['ord_status'].iteritems():
             os[v] = o_status['ord_status'][k]
+        for k,v in InterestedTags.OrderStatus_tags['state'].iteritems():
+            os[v] = o_status['state'][k]
+            
         try:
             os['error'] = 'error_code:%d, error_msg:%s' % (o_status['error']['errorCode'], o_status['error']['errorMsg'])
         except KeyError:
@@ -268,9 +281,9 @@ class QuoteRequest_v2(Resource, Publisher):
     def get(self):
         parser = reqparse.RequestParser()
         parser.add_argument('contract', required=True, help="contract is required.")
+        parser.add_argument('greeks', required=False, help="obtain option greeks")
         args = parser.parse_args()
-        contract = v2_helper.format_v2_str_to_contract(args['contract'])
-        
+
         '''
             if the contract is already in quote_handler
                 just read off the values from quote_handler and return
@@ -282,22 +295,52 @@ class QuoteRequest_v2(Resource, Publisher):
                     
                 
         '''
-        def output_result(sym):
-            return {'asize': sym.get_tick_value(Symbol.ASKSIZE), 'ask': sym.get_tick_value(Symbol.ASK),
+        def output_result(sym, require_greeks):
+            
+            
+            res =  {'asize': sym.get_tick_value(Symbol.ASKSIZE), 'ask': sym.get_tick_value(Symbol.ASK),
                     'bsize': sym.get_tick_value(Symbol.BIDSIZE), 'bid': sym.get_tick_value(Symbol.BID),
                     'last': sym.get_tick_value(Symbol.LAST), 'high': sym.get_tick_value(Symbol.LOW),
                     'close': sym.get_tick_value(Symbol.CLOSE)}
-                        
-        sym = self.quote_mgr.get_symbol_ticks(contract)
+            
+            if require_greeks:
+                opt_fields = [(Symbol.BID_OPTION, 'bid_option'),
+                              (Symbol.ASK_OPTION, 'ask_option'),
+                              (Symbol.LAST_OPTION, 'last_option')]
+                              
+                for ofld in opt_fields:
+                    option_greeks = {}
+                    try:
+                        option_greeks[ofld[1]] = sym.get_ib_option_greeks(ofld[0])
+                        res.update(option_greeks) 
+                    except:
+                        continue                
+                
+                
+            return res
+        
+        
+
+
+        contract = v2_helper.format_v2_str_to_contract(args['contract'])
+        require_greeks = False
+        try:
+            if contract.m_secType in ['OPT']:
+                if args['greeks'].upper() == 'TRUE':
+                    require_greeks = True
+        except:
+            pass
+                                    
+        sym = self.quote_mgr.get_symbol(contract)
         if sym:
-            return output_result(sym), 200
+            return output_result(sym, require_greeks), 200
                     
         else:
             print ContractHelper.contract2kvstring(contract)
             self.dispatch(self.event, {'contract': ContractHelper.contract2kvstring(contract), 'snapshot': False})
             i = 0
             while 1:
-                sym =  self.quote_mgr.get_symbol_ticks(contract)
+                sym =  self.quote_mgr.get_symbol(contract)
                 if sym:
                     break
                 sleep(0.1)
@@ -305,4 +348,40 @@ class QuoteRequest_v2(Resource, Publisher):
                 if i >= 15:
                     return 'Not getting any quotes from the server after waited 5 seconds! Contact administrator', 404
                 
-            return output_result(sym), 200
+            return output_result(sym, require_greeks), 200
+        
+        
+        
+
+'''
+    function to ....
+    
+'''
+class AcctPosition_v2(Resource, Publisher):
+    def __init__(self, webconsole):
+        self.wc = webconsole
+        self.gw_conn = self.wc.get_parent().get_tws_connection()
+        self.pm = self.wc.get_parent().get_pos_manager()
+        self.reqId = 4567
+    
+    def get(self):
+        parser = reqparse.RequestParser()
+        parser.add_argument('account', required=False, help="specify account name or leave blank to return all accounts")
+        args = parser.parse_args()        
+        try:
+            
+            # reqPositions must be called as the get_positions method
+            # in AccountPositionTracker relies on the positionEnd flag to be 
+            # set True 
+            self.gw_conn.reqPositions()
+            self.gw_conn.reqAccountSummary(self.reqId, "All", AccountSummaryTags.get_all_tags())
+            return self.pm.get_positions(args['account']), 201
+            
+        except KeyError:
+            return self.pm.get_positions(), 201
+
+        except:
+            
+            return {'error': 'AcctPosition_v2: %s' % traceback.format_exc()}, 409
+        
+        

+ 151 - 0
src/ormdapi/v2/position_handler.py

@@ -0,0 +1,151 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import logging
+import sys, traceback
+import json
+from time import sleep
+from misc2.helpers import ContractHelper
+from misc2.observer import Subscriber
+from copy import deepcopy
+import threading
+
+
+class AccountSummaryTags():
+    '''
+        https://interactivebrokers.github.io/tws-api/classIBApi_1_1AccountSummaryTags.html
+    '''
+    AccountType = "AccountType"
+    NetLiquidation = "NetLiquidation"
+    TotalCashValue = "TotalCashValue"
+    SettledCash = "SettledCash"
+    AccruedCash = "AccruedCash"
+    BuyingPower = "BuyingPower"
+    EquityWithLoanValue = "EquityWithLoanValue"
+    PreviousEquityWithLoanValue = "PreviousEquityWithLoanValue"
+    GrossPositionValue = "GrossPositionValue"
+    ReqTEquity = "ReqTEquity"
+    ReqTMargin = "ReqTMargin"
+    SMA = "SMA"
+    InitMarginReq = "InitMarginReq"
+    MaintMarginReq = "MaintMarginReq"
+    AvailableFunds = "AvailableFunds"
+    ExcessLiquidity = "ExcessLiquidity"
+    Cushion = "Cushion"
+    FullInitMarginReq = "FullInitMarginReq"
+    FullMaintMarginReq = "FullMaintMarginReq"
+    FullAvailableFunds = "FullAvailableFunds"
+    FullExcessLiquidity = "FullExcessLiquidity"
+    LookAheadNextChange = "LookAheadNextChange"
+    LookAheadInitMarginReq = "LookAheadInitMarginReq"
+    LookAheadMaintMarginReq = "LookAheadMaintMarginReq"
+    LookAheadAvailableFunds = "LookAheadAvailableFunds"
+    LookAheadExcessLiquidity = "LookAheadExcessLiquidity"
+    HighestSeverity = "HighestSeverity"
+    DayTradesRemaining = "DayTradesRemaining"
+    Leverage = "Leverage"
+    
+    
+    @staticmethod
+    def get_all_tags():
+        return ','.join('%s' % tag for tag in filter(lambda t: '__' not in t, dir(AccountSummaryTags)))
+
+    
+
+class AccountPositionTrackerException(Exception):
+    pass
+
+class AccountPositionTracker(Subscriber):
+    
+    POSITION_EVENTS = ['position', 'accountSummary']
+    
+    def __init__(self, name, gw_parent):
+
+        Subscriber.__init__(self, name)
+        self.name = name
+        self.pos_end = False 
+        self.acct_end = False
+        self.account_position = {}
+        self.gw_parent = gw_parent
+        self.tws_event_handler = gw_parent.get_tws_event_handler()
+        
+        '''
+             ask tws_event_handler to forward tick messages to
+             this class
+             
+        '''
+        for e in ['position', 'accountSummary']:
+            self.tws_event_handler.register(e, self)                  
+    
+        
+    def handle_position(self, account, contract_key, position, average_cost, end_batch):
+        logging.info('AccountPositionTracker:position account[%s] contract[%s]', account, contract_key)
+        if not end_batch:
+            self.pos_end = False
+            ckv = ContractHelper.contract2kv(ContractHelper.makeContractfromRedisKeyEx(contract_key)) 
+            try:
+                _ = self.account_position[account]                
+            except:
+                #logging.error(traceback.format_exc())
+                self.account_position[account] = {'acct_position': {contract_key: {'contract': ckv, 'position': position, 'average_cost': average_cost}},
+                                                  'account_summary': {'tags':{}}}
+            
+            
+
+            self.account_position[account]['acct_position'][contract_key] = {'contract': ckv,
+                                                              'position': position, 'average_cost': average_cost}
+            
+            
+                        
+        else:
+            self.pos_end = True
+                    
+    def handle_account_summary(self, **items):       
+        if not items['end_batch']:     
+            self.acct_end = False 
+            try:
+                _ = self.account_position[items['account']]
+            except KeyError:
+                self.account_position[items['account']]= {'acct_position': {}, 'account_summary': {'tags':{}}}
+                
+            for k, v in items.iteritems():
+                if k == 'tag':
+                    self.account_position[items['account']]['account_summary']['tags'][v] = items['value'] 
+                elif k == 'value':
+                    pass
+                else:
+                    self.account_position[items['account']]['account_summary'][k] = v 
+        else:
+            self.acct_end = True
+                         
+    def update(self, event, **param): 
+        if event == 'position':
+            self.handle_position(**param)
+        elif event == 'accountSummary':
+            self.handle_account_summary(**param)
+
+   
+   
+    def get_positions(self, account=None):
+        lock = threading.RLock()
+        result = None
+        try:
+            lock.acquire()
+            i = 0
+            while self.pos_end == False or self.acct_end == False:
+                sleep(0.5)
+                i += 1
+                logging.warn('AccountPositionTracker: waiting position status to change to finish: round # %d... ' % i)
+                if i == 10:
+                    raise AccountPositionTrackerException
+            try:
+                result= self.account_position[account]
+            except:
+                result =  self.account_position
+        except AccountPositionTrackerException:
+            logging.error('AccountPositionTracker: get_positions time out exception %s' % traceback.format_exc())
+        except:
+            logging.error('AccountPositionTracker: %s' % traceback.format_exc())
+        finally:
+            lock.release()
+            return result     
+           

+ 17 - 53
src/ormdapi/v2/quote_handler.py

@@ -4,7 +4,7 @@ from threading import RLock
 from misc2.observer import Subscriber
 from misc2.observer import NotImplementedException
 from misc2.helpers import ContractHelper
-
+from copy import deepcopy
 import traceback
 from finopt.instrument import Symbol
 
@@ -31,7 +31,7 @@ class QuoteRESTHandler(Subscriber):
              this class
              
         '''
-        for e in ['tickPrice', 'tickSize']:
+        for e in ['tickPrice', 'tickSize', 'tickOptionComputation']:
             self.tws_event_handler.register(e, self)           
         
 
@@ -56,68 +56,32 @@ class QuoteRESTHandler(Subscriber):
             self.symbols[contract_key] = s
         s.set_tick_value(field, size)
 
+    def handle_tickgreeks(self, **params):
+        logging.debug('QuoteHandler:tickOptionComputation')
+        try:
+            contract_key = params['contract_key']
+            s = self.symbols[contract_key]
+        except KeyError:
+            s = Symbol(ContractHelper.makeContractfromRedisKeyEx(contract_key))
+            self.symbols[contract_key] = s
+        greeks = deepcopy(params)
+        del greeks['greeks']['field']
+        del greeks['contract_key']
+        s.set_ib_option_greeks(params['greeks']['field'], greeks['greeks'])      
         
     def update(self, event, **param): 
         if event == 'tickPrice':
             self.handle_tickprice(**param)
         elif event == 'tickSize':
             self.handle_ticksize(**param)
+        elif event == 'tickOptionComputation':
+            self.handle_tickgreeks(**param)
         
     
-    def get_symbol_ticks(self, contract):
+    def get_symbol(self, contract):
         try:
             return self.symbols[ContractHelper.makeRedisKeyEx(contract)]
         except KeyError:
             return None
         
-        
-    def dump(self):
-    
-        
-        def format_tick_val(val, fmt):
-            if val == None:
-                length = len(fmt % (0))
-                return ' ' * length
-            
-            return fmt % (val) 
-        
-        
-        fmt_spec = '%8.2f'
-        fmt_spec2 = '%8.4f'
-        fmt_specq = '%8d'
-        
-        
-        def get_field(sym, fld_id):
-            try:
-                return sym.get_tick_value(fld_id)
-            except:
-                return ''
 
-        
-        
-        fmt_sym = map(lambda x: (x[0], '%s,%s,%s,%s,%s,%s,%s' % (
-                                            format_tick_val(get_field(x[1],Symbol.LAST), fmt_spec),
-                                            format_tick_val(get_field(x[1],Symbol.BIDSIZE), fmt_specq),                                                                                                                  
-                                            format_tick_val(get_field(x[1],Symbol.BID), fmt_spec),
-                                            format_tick_val(get_field(x[1],Symbol.ASK), fmt_spec), 
-                                            format_tick_val(get_field(x[1],Symbol.ASKSIZE), fmt_specq),
-                                            format_tick_val(get_field(x[1],Symbol.CLOSE), fmt_spec),
-                                            format_tick_val(get_field(x[1],Symbol.VOLUME), fmt_specq),
-                                            )), [(k,v) for k, v in self.symbols.iteritems()])        
-        
-        print('%40s,%8s,%8s,%8s,%8s,%8s,%8s,%8s\n' % ('SYM', 'LAST', 'BIDSIZE','BID','ASK','ASKSIZE','CLOSE','VOLUME'
-                                             ))
-
-        for e in fmt_sym:
-            print('[%s]%s' % (e[0].ljust(40), e[1]))
-
-    
-    
-        
-
-
-
-
-        
-        
-        

+ 45 - 5
src/sh/run_mds.sh

@@ -1,8 +1,48 @@
 #!/bin/bash
-ROOT=$FINOPT_HOME
+
+MDS_CFG=mds.cfg 
+HOST=$(hostname)
+echo $HOST
+if [ $HOST == 'hkc-larryc-vm1' ]; then
+	FINOPT_HOME=~/ironfly-workspace/finopt/src
+elif [ $HOST == 'vorsprung' ]; then
+	FINOPT_HOME=~/workspace/finopt/src
+elif [ $HOST == 'astron' ]; then
+	#	FINOPT_HOME=~/workspace/finopt/src
+
+	# virtual env
+	FINOPT_HOME=~/workspace/fpydevs/eclipse/finopt/src
+	source /home/laxaurus/workspace/fpydevs/env/bin/activate
+        SYMBOLS_PATH=/home/laxaurus/workspace/fpydevs/dat/symbols/goog.txt
+
+
+elif [ $HOST == 'vsu-longhorn' ]; then
+        FINOPT_HOME=~/pyenvs/ironfly/finopt/src
+        source /home/vuser-longhorn/pyenvs/finopt/bin/activate
+        MDS_CFG=mds_prd.cfg
+elif [ $HOST == 'vsu-vortify' ]; then
+        FINOPT_HOME=~/workspace/fpydevs/eclipse/finopt/src
+        source /home/vuser-vortify/workspace/fpydevs/env/bin/activate
+        MDS_CFG=mds_avant.cfg
+        SYMBOLS_PATH=$FINOPT_HOME/../../../dat/symbols/goog.txt
+fi
+									
+						
 export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
-# real time mode
-python $FINOPT_HOME/cep/ib_mds.py $FINOPT_HOME/config/mds.cfg
-# replay mode
-#python $FINOPT_HOME/cep/ib_mds.py -r $FINOPT_HOME/../data/mds_files/20151006 $FINOPT_HOME/config/mds.cfg
+#  
+# clear all topic offsets and erased saved subscriptions
+
+
+python $FINOPT_HOME/cep/ib_mds.py -s $SYMBOLS_PATH -f $FINOPT_HOME/config/$MDS_CFG 
+
+
+#
+# clear offsets in redis / reload saved subscription entries
+#python $FINOPT_HOME/comms/ibgw/mds.py  -c -f $FINOPT_HOME/config/mds.cfg 
+
+
+# restart gateway keep the redis offsets but erase the subscription entries
+#python $FINOPT_HOME/comms/ibgw/mds.py  -r -f $FINOPT_HOME/config/mds.cfg 
 
+# normal restart - keep the offsets and reload from saved subscription entries
+#python $FINOPT_HOME/comms/ibgw/mds.py   -f $FINOPT_HOME/config/mds.cfg