Przeglądaj źródła

no work week changes

esurfer 9 lat temu
rodzic
commit
118f5a3929

+ 36 - 17
src/comms/ibc/base_client_messaging.py

@@ -51,7 +51,9 @@ class GatewayCommandWrapper():
         self.producer.send_message('reqAccountUpdates', json.dumps({'subscribe': subscribe, 'acct_code': acct_code}))
 
     def reqExecutions(self, exec_filter=None):
-        self.producer.send_message('reqExecutions', ExecutionFilterHelper.object2kvstring(exec_filter) if exec_filter <> None else '')
+
+        #self.producer.send_message('reqExecutions', json.dumps({'exec_filter': 'null'}))
+        self.producer.send_message('reqExecutions', json.dumps({'exec_filter': ExecutionFilterHelper.object2kvstring(exec_filter) if exec_filter else 'null'}))
 
 #     def reqMktData(self, contract):
 #         self.producer.send_message('reqMktData', ContractHelper.object2kvstring(contract))
@@ -119,21 +121,38 @@ class AbstractGatewayListener(BaseMessageListener):
         """ generated source for method openOrderEnd """
         raise NotImplementedException
    
-    def updateAccountValue(self, event, key, value, currency, account):  # key, value, currency, accountName):
-        """ generated source for method updateAccountValue """
-        raise NotImplementedException
-
-    def updatePortfolio(self, event, contract_key, position, market_price, market_value, average_cost, unrealized_PNL, realized_PNL, account):
-        """ generated source for method updatePortfolio """
-        raise NotImplementedException
-   
-    def updateAccountTime(self, event, timestamp):
-        """ generated source for method updateAccountTime """
-        raise NotImplementedException
+    def update_portfolio_account(self, event, **message_value):
+        '''
+            this message wraps TWS events 'updateAccountValue', 'updatePortfolio',
+            'updateAccountTime', 'accountDownloadEnd' and their parameters into a dict in 
+            the message_value parameter
+            
+            the name of the tws event is stored in a key 'tws_event'
+            
+             val->[{u'currency': u'JPY', u'account': u'U8379890', u'value': u'0.0698037', u'tws_event': u'updateAccountValue', u'key': u'ExchangeRate'}]
+ 
+            
+        '''
+        tws_event = message_value['tws_event']
+        del(message_value['tws_event'])
+        getattr(self, tws_event)(tws_event, **message_value)
+        
    
-    def accountDownloadEnd(self, event, account):  # accountName):
-        """ generated source for method accountDownloadEnd """
-        raise NotImplementedException
+#     def updateAccountValue(self, event, key, value, currency, account):  # key, value, currency, accountName):
+#         """ generated source for method updateAccountValue """
+#         raise NotImplementedException
+# 
+#     def updatePortfolio(self, event, contract_key, position, market_price, market_value, average_cost, unrealized_PNL, realized_PNL, account):
+#         """ generated source for method updatePortfolio """
+#         raise NotImplementedException
+#    
+#     def updateAccountTime(self, event, timestamp):
+#         """ generated source for method updateAccountTime """
+#         raise NotImplementedException
+#    
+#     def accountDownloadEnd(self, event, account):  # accountName):
+#         """ generated source for method accountDownloadEnd """
+#         raise NotImplementedException
    
     def nextValidId(self, event, message_value):  # orderId):
         """ generated source for method nextValidId """
@@ -151,7 +170,7 @@ class AbstractGatewayListener(BaseMessageListener):
         """ generated source for method contractDetailsEnd """
         raise NotImplementedException
    
-    def execDetails(self, event, message_value):  # reqId, contract, execution):
+    def execDetails(self, event, req_id, contract_key, execution, end_batch): 
         """ generated source for method execDetails """
         raise NotImplementedException
    
@@ -223,7 +242,7 @@ class AbstractGatewayListener(BaseMessageListener):
         """ generated source for method commissionReport """
         raise NotImplementedException
    
-    def position(self, event, account, contract_key, pos, avg_cost):
+    def position(self, event, account, contract_key, pos, avg_cost, end_batch):
         """ generated source for method position """
         raise NotImplementedException
    

+ 75 - 31
src/comms/ibc/gw_ex_request_exit.py

@@ -7,7 +7,7 @@ import sys
 
 from ib.ext.Contract import Contract
 from optparse import OptionParser
-from misc2.helpers import ContractHelper, HelperFunctions
+from misc2.helpers import ContractHelper, HelperFunctions, ExecutionFilter
 from comms.ibgw.base_messaging import Prosumer
 from comms.tws_protocol_helper import TWS_Protocol
 from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
@@ -16,16 +16,19 @@ from rethink.tick_datastore import TickDataStore
 from finopt.instrument import Symbol
 
          
-class MessageListener(AbstractGatewayListener):   
+class ClientMessageListener(AbstractGatewayListener):   
     def __init__(self, name, tick_ds):
         AbstractGatewayListener.__init__(self, name)
         self.tick_ds = tick_ds
 
    
+    def raw_dump(self, event, items):
+        del(items['self'])
+        logging.info('%s [[ %s ]]' % (event, items))        
    
-    def position(self, event, account, contract_key, position, average_cost):
-        """ generated source for method position """
-        logging.info('%s [[ %s ]]' % (event, vars()))
+    def position(self, event, account, contract_key, position, average_cost, end_batch):
+        self.raw_dump(event, vars())
+
    
     def positionEnd(self, event): #, message_value):
         """ generated source for method positionEnd """
@@ -35,23 +38,29 @@ class MessageListener(AbstractGatewayListener):
     def error(self, event, id, errorCode, errorMsg):
         logging.info('MessageListener:%s. val->[%s]' % (event, vars()))  
 
-
+#     def update_portfolio_account(self, event, **items):
+#         logging.info('MessageListener:%s. val->[%s]' % (event, items))
+        
+        
+    '''
+        the 4 account functions below are invoked by AbstractListener.update_portfolio_account.
+        the original message from TWS is first wrapped into update_portfolio_account event in 
+        class TWS_event_handler and then expanded by AbstractListener.update_portfolio_account
+        (check tws_event_hander)
+    '''
     def updateAccountValue(self, event, key, value, currency, account):  # key, value, currency, accountName):
-        """ generated source for method updateAccountValue """
-        logging.info('%s [[ %s ]]' % (event, vars()))
-
+        self.raw_dump(event, vars())
+ 
     def updatePortfolio(self, event, contract_key, position, market_price, market_value, average_cost, unrealized_PNL, realized_PNL, account):
-        """ generated source for method updatePortfolio """
-        logging.info('%s [[ %s ]]' % (event, vars()))
-   
+        self.raw_dump(event, vars())
+        
+            
     def updateAccountTime(self, event, timestamp):
-        """ generated source for method updateAccountTime """
-        logging.info('%s [[ %s ]]' % (event, vars()))
-   
+        self.raw_dump(event, vars())
+        
     def accountDownloadEnd(self, event, account):  # accountName):
-        """ generated source for method accountDownloadEnd """
-        logging.info('%s [[ %s ]]' % (event, vars()))
-      
+        self.raw_dump(event, vars())
+        
 
     def tickPrice(self, event, contract_key, field, price, canAutoExecute):
         #logging.info('MessageListener:%s. %s %d %8.2f' % (event, contract_key, field, price))
@@ -62,13 +71,15 @@ class MessageListener(AbstractGatewayListener):
         #logging.info('MessageListener:%s. %s: %d %8.2f' % (event, contract_key, field, size))
         
 
+    def execDetails(self, event, req_id, contract, execution, end_batch):
+        self.raw_dump(event, vars())
 
 
 def test_client(kwargs):
 
     ts = TickDataStore(kwargs['name'])
     cm = TWS_client_manager(kwargs)
-    cl = MessageListener('gw_client_message_listener', ts)
+    cl = ClientMessageListener('gw_client_message_listener', ts)
     
     cm.add_listener_topics(cl, kwargs['topics'])
     cm.start_manager()
@@ -106,26 +117,59 @@ def test_client2(kwargs):
 
     ts = TickDataStore(kwargs['name'])
     cm = TWS_client_manager(kwargs)
-    cl = MessageListener('gw_client_message_listener', ts)
+    cl = ClientMessageListener('gw_client_message_listener', ts)
     
     cm.add_listener_topics(cl, kwargs['topics'])
     cm.start_manager()
                           
                               
-    cm.reqPositions()
+    #cm.reqPositions()
     #cm.reqAccountUpdates(True, 'U8379890')
     
+
+
+
     try:
-        logging.info('TWS_gateway:main_loop ***** accepting console input...')
-        while not cm.is_stopped(): 
-        
-            sleep(.45)
-            read_ch = raw_input("Enter command:")
+        logging.info('gw_ex_request:main_loop ***** accepting console input...')
+        menu = {}
+        menu['1']="Request positions" 
+        menu['2']="Request account updates "
+        menu['3']="End request account updates"
+        menu['4']="Request executions"
+        menu['9']="Exit"
+        while True: 
+            choices=menu.keys()
+            choices.sort()
+            for entry in choices: 
+                print entry, menu[entry]            
+
+            selection = raw_input("Enter command:")
+            if selection =='1':
+                cm.reqPositions()
+            elif selection == '2': 
+                cm.reqAccountUpdates(True, 'U8379890')
+            elif selection == '3':
+                cm.reqAccountUpdates(False, 'U8379890')
+            elif selection == '4':
+                
+                cm.reqExecutions()
+                
+            elif selection == '9': 
+                cm.gw_message_handler.set_stop()
+                break
+            else: 
+                print "Unknown Option Selected!"                 
             
+            sleep(0.15)
         
     except (KeyboardInterrupt, SystemExit):
-        logging.error('TWS_client_manager: caught user interrupt. Shutting down...')
-        cm.gw_message_handler.set_stop()
+        logging.error('AnalyticsEngine: caught user interrupt. Shutting down...')
+        cm.gw_message_handler.set_stop() 
+        logging.info('AnalyticsEngine: Service shut down complete...')     
+
+
+
+
         
         logging.info('TWS_client_manager: Service shut down complete...')
            
@@ -149,9 +193,9 @@ if __name__ == '__main__':
       'session_timeout_ms': 10000,
       'clear_offsets':  False,
       'logconfig': {'level': logging.INFO, 'filemode': 'w', 'filename': '/tmp/gw_ex.log'},
-      'topics': ['tickSize', 'tickPrice',  'position', 'positionEnd', 'updateAccountValue', 'updatePortfolio', 'updateAccountTime', 'accountDownloadEnd'],
-      'seek_to_end': ['tickPrice', 'tickSize','position', 'positionEnd', 'updateAccountValue', 
-                      'updatePortfolio', 'updateAccountTime', 'accountDownloadEnd']
+      #'topics': ['tickSize', 'tickPrice',  'position', 'positionEnd', 'updateAccountValue', 'updatePortfolio', 'updateAccountTime', 'accountDownloadEnd'],
+      'topics': ['tickSize', 'tickPrice',  'position', 'update_portfolio_account', 'execDetails'],
+      'seek_to_end': ['tickPrice', 'tickSize']
       }
 
     usage = "usage: %prog [options]"

+ 10 - 7
src/comms/ibgw/base_messaging.py

@@ -193,8 +193,6 @@ class BaseConsumer(threading.Thread, Publisher):
         self.my_topics[topic][str(partition)] = offset
         self.rs.set(self.consumer_topic(topic), json.dumps(self.my_topics[topic]))
     
-    def enrich_message(self, message):
-        return {'value': message.value, 'partition':message.partition, 'offset': message.offset}
         
     def extract_message_content(self, message):
         #logging.info('BaseConsumer: extract_message_content. %s %s' % (type(message), message))
@@ -272,7 +270,7 @@ class BaseConsumer(threading.Thread, Publisher):
                 # it may be useful to detect slow consumer situation
                 if message.offset % BaseConsumer.SLOW_CONSUMER_QUALIFY_NUM == 0:
                     highwater = consumer.highwater(TopicPartition(message.topic, message.partition))
-                    logging.info( "BaseConsumer [%s]:highwater:%d offset:%d part:%d <%s>" %  (self.name, highwater, message.offset, message.partition, message.value))
+                    logging.info( "BaseConsumer [%s]:highwater:%d offset:%d part:%d <%s>" %  (self.name, highwater, message.offset, message.partition, message.topic))
                     
                     if highwater - message.offset >= BaseConsumer.SLOW_CONSUMER_QUALIFY_NUM:
                         logging.warn("BaseConsumer:run Slow consumer detected! current: %d, highwater:%d, gap:%d" %
@@ -312,10 +310,16 @@ class BaseConsumer(threading.Thread, Publisher):
                         
                     if '*' in self.kwargs['seek_to_end'] or message.topic in self.kwargs['seek_to_end']:
                         #print 'baseconsumer run %s %d' % (message.topic, gap)
-                        # if there is no gap                          
-                        if gap == 1:
+                        # if there is no gap
+                        '''
+                        
+                            use seek_to_end only for messages that keep streaming and you don't
+                            care whether messages are lost or not
+                            
+                        
+                        '''
+                        if gap <=1:
                             # the message is valid for dispatching and not to be skipped
-                            #self.dispatch(message.topic, self.enrich_message(message))
                             self.dispatch(message.topic, self.extract_message_content(message))
                             logging.debug('*** On first iteration: Gap=%d Dispatch this valid message to the listener <%s>' % (gap, message.value))
                         else: # gap exists
@@ -347,7 +351,6 @@ class BaseConsumer(threading.Thread, Publisher):
                     # both saved value in redis and current offset are both 0
                     if self.my_topics[message.topic][str(message.partition)] == message.offset and message.offset <> 0:
                         self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.extract_message_content(message))
-                        #self.dispatch(message.topic, self.enrich_message(message))
                         self.dispatch(message.topic, self.extract_message_content(message))
                         logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
                     else:

+ 7 - 6
src/comms/ibgw/client_request_handler.py

@@ -40,12 +40,13 @@ class ClientRequestHandler(BaseMessageListener):
         self.tws_connect.reqPositions()
         
         
-    def reqExecutions(self, event, value):
-        try:
-            filt = ExecutionFilter() if value == '' else ExecutionFilterHelper.kvstring2object(value, ExecutionFilter)
-            self.tws_connect.reqExecutions(0, filt)
-        except:
-            logging.error(traceback.format_exc())
+    def reqExecutions(self, event, exec_filter=None):
+        
+        if exec_filter == 'null':
+            exec_filter = ExecutionFilter()
+        else:
+            ExecutionFilterHelper.kv2object(exec_filter, ExecutionFilter)
+        self.tws_connect.reqExecutions(0, exec_filter)
     
     
     def reqIds(self, event, value=None):

+ 61 - 23
src/comms/ibgw/tws_event_handler.py

@@ -36,14 +36,17 @@ class TWS_event_handler(EWrapper):
     
     def pre_process_message(self, message_name, items):
 
-        t = items.copy()
+        #t = items.copy()
+        t = items
        
             
-        try:
+#         try:
+#             del(t['self'])
+#         except (KeyError, ):
+#             pass          
+        if 'self' in t:
             del(t['self'])
-        except (KeyError, ):
-            pass          
-        
+            
 
         for k,v in t.iteritems():
                 #print k, v, type(v)
@@ -62,13 +65,13 @@ class TWS_event_handler(EWrapper):
                 
     
     def tickPrice(self, tickerId, field, price, canAutoExecute):
-        logging.info('TWS_event_handler:tickPrice. %d<->%s' % (tickerId,self.subscription_manger.get_contract_by_id(tickerId) ))
+        logging.debug('TWS_event_handler:tickPrice. %d<->%s' % (tickerId,self.subscription_manger.get_contract_by_id(tickerId) ))
         self.broadcast_event('tickPrice', {'contract_key': self.subscription_manger.get_contract_by_id(tickerId), 
                                           'field': field, 'price': price, 'canAutoExecute': canAutoExecute})
         #pass
     
     def tickSize(self, tickerId, field, size):
-         logging.info('TWS_event_handler:tickSize. %d<->%s' % (tickerId,self.subscription_manger.get_contract_by_id(tickerId) ))
+         logging.debug('TWS_event_handler:tickSize. %d<->%s' % (tickerId,self.subscription_manger.get_contract_by_id(tickerId) ))
          self.broadcast_event('tickSize', {'contract_key': self.subscription_manger.get_contract_by_id(tickerId), 
                                             'field': field, 'size': size})
         #pass
@@ -100,33 +103,48 @@ class TWS_event_handler(EWrapper):
     def openOrderEnd(self):
         pass
 
+
+    def update_portfolio_account(self, items):
+        self.broadcast_event('update_portfolio_account', items)
+        
     def updateAccountValue(self, key, value, currency, accountName):
         
         logging.info('TWS_event_handler:updateAccountValue. [%s]:%s' % (key.ljust(40), value))
-        self.broadcast_event('updateAccountValue', {'key': key, 
-                                          'value': value, 'currency': currency, 'account':accountName})
+        self.update_portfolio_account({'tws_event': 'updateAccountValue', 
+                              'key': key, 'value': value, 'currency': currency, 'account':accountName})
+#         self.broadcast_event('updateAccountValue', {'key': key, 
+#                                           'value': value, 'currency': currency, 'account':accountName, 'batch_end': False})
                 
 
     def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
         contract_key= ContractHelper.makeRedisKeyEx(contract)
-        logging.info('TWS_event_handler:updatePortfolio. [%s]:position= %d' % (contract_key, position))
-        self.broadcast_event('updatePortfolio', {
+        self.update_portfolio_account(
+                              {'tws_event': 'updatePortfolio',
                                 'contract_key': contract_key, 
                                 'position': position, 'market_price': marketPrice,
                                 'market_value': marketValue, 'average_cost': averageCost, 
                                 'unrealized_PNL': unrealizedPNL, 'realized_PNL': realizedPNL, 
-                                'account': accountName
-                                
-                                })
+                                'account': accountName}
+                              )
+        logging.info('TWS_event_handler:updatePortfolio. [%s]:position= %d' % (contract_key, position))
+#         self.broadcast_event('updatePortfolio', {
+#                                 'contract_key': contract_key, 
+#                                 'position': position, 'market_price': marketPrice,
+#                                 'market_value': marketValue, 'average_cost': averageCost, 
+#                                 'unrealized_PNL': unrealizedPNL, 'realized_PNL': realizedPNL, 
+#                                 'account': accountName,
+#                                 'batch_end': False
+#                                 })
                 
 
     def updateAccountTime(self, timeStamp):
-        
-        self.broadcast_event('updateAccountTime', {'timestamp': timeStamp})
+        self.update_portfolio_account({'tws_event':'updateAccountTime', 'timestamp': timeStamp})        
+#        self.broadcast_event('updateAccountTime', {'timestamp': timeStamp, 'batch_end': False})
                 
 
     def accountDownloadEnd(self, accountName):
-        self.broadcast_event('accountDownloadEnd', {'account':accountName})
+        self.update_portfolio_account({'tws_event':'accountDownloadEnd', 'account':accountName})
+#        self.broadcast_event('accountDownloadEnd', {'account':accountName})
         
         
     def nextValidId(self, orderId):
@@ -142,19 +160,19 @@ class TWS_event_handler(EWrapper):
         self.broadcast_event('bondContractDetails', vars())
 
     def execDetails(self, reqId, contract, execution):
-        self.broadcast_event('execDetails', vars())
+        self.broadcast_event('execDetails', {'req_id': reqId, 'contract': contract, 'execution': execution, 'end_batch': False})
 
     def execDetailsEnd(self, reqId):
-        self.broadcast_event('execDetailsEnd', vars())
+        self.broadcast_event('execDetails', {'req_id': reqId, 'contract': None, 'execution': None, 'end_batch': True})
 
     def connectionClosed(self):
         self.broadcast_event('connectionClosed', {})
 
     def error(self, id=None, errorCode=None, errorMsg=None):
         try:
-            logging.error(self.pre_process_message('error', vars()))
+            logging.error('TWS_event_handler:error. id:%s, errorCode:%s, errorMsg:%s' % (id, errorCode, errorMsg))
             self.broadcast_event('error', {'id': id, 
-                                           'errorCode': errorCode, 'errorMsg': errorMsg})
+                                           'errorCode': errorCode, 'errorMsg': '%s(%s)' % (str(type(errorMsg)), str(errorMsg)) })
 
         except:
             pass
@@ -228,12 +246,32 @@ class TWS_event_handler(EWrapper):
         self.broadcast_event('position', {
                                 'account': account,
                                 'contract_key': contract_key, 
-                                'position': pos, 'average_cost': avgCost
+                                'position': pos, 'average_cost': avgCost,
+                                'end_batch': False
                                 
                                 })        
 
     def positionEnd(self):
-        self.broadcast_event('positionEnd', {})
+        '''
+            positionEnd is sent to the client side as a 'position' event
+            this is to mimick TWS behavior such that positionEnd is always the last message to send
+            to the client as part of the reqPosition operation
+            
+            
+            kafka does not guarantee the arrival sequence amongst different topics. Therefore
+            if positionEnd is sent as a separate topic, it may arrive first before other
+            position messages have been received by the client
+            
+            the 'end_batch' keyword is set to true 
+        '''
+        self.broadcast_event('position', {
+                                'account': None,
+                                'contract_key': None, 
+                                'position': None, 'average_cost': None,
+                                'end_batch': True
+                                
+                                })        
+
 
     def accountSummary(self, reqId, account, tag, value, currency):
         self.broadcast_event('accountSummary', vars())

+ 48 - 2
src/misc2/helpers.py

@@ -47,9 +47,55 @@ class OrderHelper(BaseHelper):
 
     
 class ExecutionFilterHelper(BaseHelper):
-    pass
+    @staticmethod
+    def makeExeuction(executionTuple):
+        '''
+        String m_acctNumber    The customer account number.
+        double m_avgPrice    Average price. Used in regular trades, combo trades and legs of the combo. Does not include commissions.
+        int m_clientId    "The id of the client that placed the order.
+        Note: TWS orders have a fixed client id of ""0."""
+        int m_cumQty    Cumulative quantity. Used in regular trades, combo trades and legs of the combo.
+        String m_exchange    Exchange that executed the order.
+        String m_execId    Unique order execution id.
+        int m_liquidation    Identifies the position as one to be liquidated last should the need arise.
+        int m_orderId    "The order id.
+        Note:  TWS orders have a fixed order id of ""0."""
+        int m_permId    The TWS id used to identify orders, remains the same over TWS sessions.
+        double m_price    The order execution price, not including commissions.
+        int m_shares    The number of shares filled.
+        String m_side    "Specifies if the transaction was a sale or a purchase. Valid values are:
+        BOT
+        SLD"
+        String m_time    The order execution time.
 
-    
+        '''
+        
+        pass
+
+
+    @staticmethod
+    def makeExeuctionFilter(executionFilterTuple):
+        '''
+        String m_acctCode    Filter the results of the reqExecutions() method based on an account code. Note: this is only relevant for Financial Advisor (FA) accounts.
+        int m_clientId    Filter the results of the reqExecutions() method based on the clientId.
+        String m_exchange    Filter the results of the reqExecutions() method based on theorder exchange.
+        String m_secType    "Filter the results of the reqExecutions() method based on the order security type.
+        Note: Refer to the Contract struct for the list of valid security types."
+        String m_side    "Filter the results of the reqExecutions() method based on the order action.
+        Note: Refer to the Order class for the list of valid order actions."
+        String m_symbol    Filter the results of the reqExecutions() method based on the order symbol.
+        String m_time    "Filter the results of the reqExecutions() method based on execution reports received after the specified time.
+        The format for timeFilter is ""yyyymmdd-hh:mm:ss"""
+        '''
+        new_filter = ExecutionFilter()
+        new_filter.m_acctCode = executionFilterTuple[0]
+        new_filter.m_clientId = executionFilterTuple[1] 
+        new_filter.m_exchange = executionFilterTuple[2]
+        new_filter.m_secType = executionFilterTuple[3]
+        new_filter.m_side = executionFilterTuple[4]
+        new_filter.m_symbol = executionFilterTuple[5]
+        new_filter.m_time = executionFilterTuple[6]
+        return new_filter
 
 
 class ContractHelper(BaseHelper):

+ 14 - 10
src/rethink/analytics_engine.py

@@ -34,13 +34,13 @@ class AnalyticsEngine(AbstractGatewayListener):
         
     
     def test_oc(self, oc2):
-        expiry = '20170330'
+        expiry = '20170427'
         contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', expiry, 0, '')
         contract = ContractHelper.makeContract(contractTuple)  
         
         oc2.set_option_structure(contract, 200, 50, 0.0012, 0.0328, expiry)        
         
-        oc2.build_chain(24119, 0.03, 0.22)
+        oc2.build_chain(24172, 0.04, 0.22)
         
 #         expiry='20170324'
 #         contractTuple = ('QQQ', 'STK', 'SMART', 'USD', '', 0, '')
@@ -59,13 +59,13 @@ class AnalyticsEngine(AbstractGatewayListener):
         
     
     def test_oc3(self, oc3):
-        expiry = '20170427'
+        expiry = '20170529'
         contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', expiry, 0, '')
         contract = ContractHelper.makeContract(contractTuple)  
          
         oc3.set_option_structure(contract, 200, 50, 0.0012, 0.0328, expiry)        
          
-        oc3.build_chain(24380, 0.03, 0.22)
+        oc3.build_chain(24172, 0.04, 0.22)
 
 #         expiry = '20170331'
 #         contractTuple = ('QQQ', 'STK', 'SMART', 'USD', '', 0, '')
@@ -93,6 +93,7 @@ class AnalyticsEngine(AbstractGatewayListener):
         self.tds.add_symbol(oc3.get_underlying())
         
     
+    
     def start_engine(self):
         self.twsc.start_manager()
         oc2 = OptionsChain('oc2')
@@ -107,10 +108,11 @@ class AnalyticsEngine(AbstractGatewayListener):
         try:
             logging.info('AnalyticsEngine:main_loop ***** accepting console input...')
             menu = {}
-            menu['1']="Display option chain <id>" 
+            menu['1']="Display option chain oc2" 
             menu['2']="Display tick data store "
-            menu['3']="No opt"
-            menu['4']="Exit"
+            menu['3']="Display option chain oc3"
+            menu['4']="Generate oc3 gtable json"
+            menu['9']="Exit"
             while True: 
                 choices=menu.keys()
                 choices.sort()
@@ -124,13 +126,15 @@ class AnalyticsEngine(AbstractGatewayListener):
                     self.tds.dump()
                 elif selection == '3':
                     oc3.pretty_print()
-                elif selection == '4': 
+                elif selection == '4':
+                    print oc3.g_datatable_json()
+                elif selection == '9': 
                     self.twsc.gw_message_handler.set_stop()
                     break
                 else: 
-                    print "Unknown Option Selected!"                 
+                    oc3.pretty_print()                
                 
-                sleep(0.45)
+                sleep(0.15)
             
         except (KeyboardInterrupt, SystemExit):
             logging.error('AnalyticsEngine: caught user interrupt. Shutting down...')

+ 44 - 2
src/rethink/option_chain.py

@@ -301,7 +301,7 @@ class OptionsChain(Publisher):
                                 )
         
         #title = '%s%30s%s%s' % ('-' * 40, ContractHelper.makeRedisKeyEx(self.get_underlying().get_contract()).center(50, ' '), undlypx, '-' * 40) 
-        title = '%s%30s%s%s' % ('-' * 41, ContractHelper.makeRedisKeyEx(self.get_underlying().get_contract()).center(42, ' '), undlypx, '-' * 27)
+        title = '%s CALL %s%30s%s%s PUT %s' % ('-' * 17, '-' * 18,ContractHelper.makeRedisKeyEx(self.get_underlying().get_contract()).center(42, ' '), undlypx, '-' * 11, '-' * 11)
         header = '%8s|%8s|%8s|%8s|%8s|%8s|%8s|%8s |%8s| %8s|%8s|%8s|%8s|%8s|%8s|%8s|%8s' % ('last', 'bidq', 'bid', 'ask', 'askq', 'ivol', 'delta', 'theta', 'strike', 'last', 'bidq', 'bid', 'ask', 'askq', 'ivol', 'delta', 'theta')
         combined = map(lambda i: '%s |%8.2f| %s' % (fmt_call[i][1], fmt_put[i][0], fmt_put[i][1]), range(len(fmt_call)) )
         footer = '%s' % ('-' * 154) 
@@ -311,4 +311,46 @@ class OptionsChain(Publisher):
             print e
         print footer
         
-     
+    def g_datatable_json(self):
+        
+        sorted_opt = sorted(map(lambda i: (self.options[i].get_contract().m_strike, self.options[i]) , range(len(self.options))))
+        
+        sorted_call = filter(lambda x: x[1].get_contract().m_right == 'C', sorted_opt)
+        sorted_put = filter(lambda x: x[1].get_contract().m_right == 'P', sorted_opt)
+        
+
+        
+        dtj = {'cols':[], 'rows':[]}
+        header = [('last', 'number'), ('bidq', 'number'), ('bid', 'number'), 
+                  ('ask', 'number'), ('askq', 'number'), ('ivol', 'number'), 
+                  ('delta', 'number'), ('theta', 'number'), ('strike', 'number'), 
+                  ('last', 'number'), ('bidq', 'number'), ('bid', 'number'), 
+                  ('ask', 'number'), ('askq', 'number'), ('ivol', 'number'), 
+                  ('delta', 'number'), ('theta', 'number')
+                  ]  
+        # header fields      
+        map(lambda hf: dtj['cols'].append({'id': hf[0], 'label': hf[0], 'type': hf[1]}), header)
+        
+        
+        # table rows
+        # arrange each row with C on the left, strike in the middle, and P on the right
+        def row_fields(x):
+            
+            rf = [{'v': x[1].get_tick_value(4)}, 
+                 {'v': x[1].get_tick_value(0)},
+                 {'v': x[1].get_tick_value(1)},
+                 {'v': x[1].get_tick_value(2)},
+                 {'v': x[1].get_tick_value(3)},
+                 {'v': x[1].get_tick_value(Option.IMPL_VOL)},
+                 {'v': x[1].get_tick_value(Option.DELTA)},
+                 {'v': x[1].get_tick_value(Option.THETA)}]                 
+                 
+             
+            return rf 
+        
+        map(lambda i: dtj['rows'].append({'c': row_fields(sorted_call[i]) +
+                                                [{'v': sorted_call[i][0]}] + 
+                                                row_fields(sorted_put[i])}), range(len(sorted_call)))
+    
+        
+        return json.dumps(dtj) #, indent=4)        

+ 295 - 0
src/rethink/portfolio_monitor.py

@@ -0,0 +1,295 @@
+import logging
+import json
+import time, datetime
+import copy
+from optparse import OptionParser
+from time import sleep
+from misc2.observer import Subscriber
+from misc2.helpers import ContractHelper
+from finopt.instrument import Symbol, Option
+from rethink.option_chain import OptionsChain
+from rethink.tick_datastore import TickDataStore
+from comms.ibc.tws_client_lib import TWS_client_manager, AbstractGatewayListener
+
+
+
+
+
+class PortfolioMonitor(AbstractGatewayListener):
+
+  
+    
+    
+    def __init__(self, kwargs):
+        self.kwargs = copy.copy(kwargs)
+        self.twsc = TWS_client_manager(kwargs)
+        AbstractGatewayListener.__init__(self, kwargs['name'])
+    
+        self.tds = TickDataStore(kwargs['name'])
+        self.tds.register_listener(self)
+        self.twsc.add_listener_topics(self, kwargs['topics'])
+        
+        
+        self.option_chains = {}
+        
+    
+    def create_option_chain(self):
+        '''
+            'underlying': 
+        '''
+        pass
+    def update_option_chain(self, chain_id):
+        pass
+    
+    def test_oc(self, oc2):
+        expiry = '20170330'
+        contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', expiry, 0, '')
+        contract = ContractHelper.makeContract(contractTuple)  
+        
+        oc2.set_option_structure(contract, 200, 50, 0.0012, 0.0328, expiry)        
+        
+        oc2.build_chain(24119, 0.03, 0.22)
+        
+#         expiry='20170324'
+#         contractTuple = ('QQQ', 'STK', 'SMART', 'USD', '', 0, '')
+#         contract = ContractHelper.makeContract(contractTuple)  
+# 
+#         oc2.set_option_structure(contract, 0.5, 100, 0.0012, 0.0328, expiry)        
+#     
+#         oc2.build_chain(132.11, 0.02, 0.22)
+        
+        
+        oc2.pretty_print()        
+
+        for o in oc2.get_option_chain():
+            self.tds.add_symbol(o)
+        self.tds.add_symbol(oc2.get_underlying())
+        
+    
+    def test_oc3(self, oc3):
+        expiry = '20170427'
+        contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', expiry, 0, '')
+        contract = ContractHelper.makeContract(contractTuple)  
+         
+        oc3.set_option_structure(contract, 200, 50, 0.0012, 0.0328, expiry)        
+         
+        oc3.build_chain(24380, 0.03, 0.22)
+
+#         expiry = '20170331'
+#         contractTuple = ('QQQ', 'STK', 'SMART', 'USD', '', 0, '')
+# 
+# 
+#         contract = ContractHelper.makeContract(contractTuple)  
+#         
+#         oc3.set_option_structure(contract, 0.5, 100, 0.0012, 0.0328, expiry)        
+#         
+#         oc3.build_chain(130, 0.03, 0.22)
+        
+#         expiry='20170324'
+#         contractTuple = ('QQQ', 'STK', 'SMART', 'USD', '', 0, '')
+#         contract = ContractHelper.makeContract(contractTuple)  
+# 
+#         oc2.set_option_structure(contract, 0.5, 100, 0.0012, 0.0328, expiry)        
+#     
+#         oc2.build_chain(132.11, 0.02, 0.22)
+        
+        
+        oc3.pretty_print()        
+
+        for o in oc3.get_option_chain():
+            self.tds.add_symbol(o)
+        self.tds.add_symbol(oc3.get_underlying())
+        
+    
+    def start_engine(self):
+        self.twsc.start_manager()
+        oc2 = OptionsChain('oc2')
+        oc2.register_listener(self)
+        self.test_oc(oc2)
+        oc3 = OptionsChain('oc3')
+        oc3.register_listener(self)
+        self.test_oc3(oc3)
+        self.option_chains[oc2.name] = oc2
+        self.option_chains[oc3.name] = oc3
+        
+        try:
+            logging.info('PortfolioMonitor:main_loop ***** accepting console input...')
+            menu = {}
+            menu['1']="Display option chain <id>" 
+            menu['2']="Display tick data store "
+            menu['3']="No opt"
+            menu['4']="Exit"
+            while True: 
+                choices=menu.keys()
+                choices.sort()
+                for entry in choices: 
+                    print entry, menu[entry]            
+
+                selection = raw_input("Enter command:")
+                if selection =='1':
+                    oc2.pretty_print()
+                elif selection == '2': 
+                    self.tds.dump()
+                elif selection == '3':
+                    oc3.pretty_print()
+                elif selection == '4': 
+                    self.twsc.gw_message_handler.set_stop()
+                    break
+                else: 
+                    oc3.pretty_print()                
+                
+                sleep(0.15)
+            
+        except (KeyboardInterrupt, SystemExit):
+            logging.error('PortfolioMonitor: caught user interrupt. Shutting down...')
+            self.twsc.gw_message_handler.set_stop() 
+            logging.info('PortfolioMonitor: Service shut down complete...')               
+    
+    
+    #         EVENT_OPTION_UPDATED = 'oc_option_updated'
+    #         EVENT_UNDERLYING_ADDED = 'oc_underlying_added
+    def oc_option_updated(self, event, update_mode, name, instrument):        
+        logging.info('oc_option_updated. %s %s' % (event, vars()))
+        self.tds.add_symbol(instrument)
+        self.twsc.reqMktData(instrument.get_contract(), True)
+        
+    
+    def oc_underlying_added(self, event, update_mode, name, instrument):
+        
+        logging.info('oc_underlying_added. %s %s' % (event, vars()))
+        self.tds.add_symbol(instrument)
+        self.twsc.reqMktData(instrument.get_contract(), True)
+
+    #
+    # tds call backs
+    #
+    #     
+    #         EVENT_TICK_UPDATED = 'tds_event_tick_updated'
+    #         EVENT_SYMBOL_ADDED = 'tds_event_symbol_added'
+    #         EVENT_SYMBOL_DELETED = 'tds_event_symbol_deleted'    
+    
+    def tds_event_symbol_added(self, event, update_mode, name, instrument):
+       pass
+        #logging.info('tds_event_new_symbol_added. %s' % ContractHelper.object2kvstring(symbol.get_contract()))
+        
+    
+    def tds_event_tick_updated(self, event, contract_key, field, price, syms):
+        
+        for s in syms:
+            
+            if OptionsChain.CHAIN_IDENTIFIER in s.get_extra_attributes():
+                results = {}
+                chain_id = s.get_extra_attributes()[OptionsChain.CHAIN_IDENTIFIER]
+                logging.info('PortfolioMonitor:tds_event_tick_updated chain_id %s' % chain_id)
+                if chain_id  in self.option_chains.keys():
+                    if 'FUT' in contract_key or 'STK' in contract_key:
+                        results = self.option_chains[chain_id].cal_greeks_in_chain(self.kwargs['evaluation_date'])
+                    else:
+                        results[ContractHelper.makeRedisKeyEx(s.get_contract())] = self.option_chains[chain_id].cal_option_greeks(s, self.kwargs['evaluation_date'])
+                logging.info('AnalysticsEngine:tds_event_tick_updated. compute greek results %s' % results)    
+                # set_analytics(self, imvol=None, delta=None, gamma=None, theta=None, vega=None, npv=None):
+                # 
+                def update_tds_analytics(key_greeks):
+                    
+                    self.tds.set_symbol_analytics(key_greeks[0], Option.IMPL_VOL, key_greeks[1][Option.IMPL_VOL])
+                    self.tds.set_symbol_analytics(key_greeks[0], Option.DELTA, key_greeks[1][Option.DELTA])
+                    self.tds.set_symbol_analytics(key_greeks[0], Option.GAMMA, key_greeks[1][Option.GAMMA])
+                    self.tds.set_symbol_analytics(key_greeks[0], Option.THETA, key_greeks[1][Option.THETA])
+                    self.tds.set_symbol_analytics(key_greeks[0], Option.VEGA, key_greeks[1][Option.VEGA])
+                    
+                map(update_tds_analytics, list(results.iteritems()))                
+
+            else:
+                
+                continue
+             
+        
+
+
+    def tds_event_symbol_deleted(self, event, update_mode, name, instrument):
+        pass
+    #
+    # external ae requests
+    #
+    def ae_req_greeks(self, event, message_value):
+        #(int tickerId, int field, double impliedVol, double delta, double optPrice, double pvDividend, double gamma, double vega, double theta, double undPrice) 
+        pass
+    
+    def ae_req_tds_internal(self, event, message_value):
+        logging.info('received ae_req_tds_internal')
+        self.tds.dump()
+    
+    #
+    # gateway events
+    #
+
+    def tickPrice(self, event, contract_key, field, price, canAutoExecute):
+        logging.debug('MessageListener:%s. %s %d %8.2f' % (event, contract_key, field, price))
+        self.tds.set_symbol_tick_price(contract_key, field, price, canAutoExecute)
+
+
+    def tickSize(self, event, contract_key, field, size):
+        self.tds.set_symbol_tick_size(contract_key, field, size)
+        #logging.info('MessageListener:%s. %s: %d %8.2f' % (event, contract_key, field, size))
+ 
+    def error(self, event, message_value):
+        logging.info('PortfolioMonitor:%s. val->[%s]' % (event, message_value))         
+        
+        
+if __name__ == '__main__':
+    
+
+    
+    kwargs = {
+      'name': 'portfolio_monitor',
+      'bootstrap_host': 'localhost',
+      'bootstrap_port': 9092,
+      'redis_host': 'localhost',
+      'redis_port': 6379,
+      'redis_db': 0,
+      'tws_host': 'localhost',
+      'tws_api_port': 8496,
+      'tws_app_id': 38868,
+      'group_id': 'PM',
+      'session_timeout_ms': 10000,
+      'clear_offsets':  False,
+      'logconfig': {'level': logging.INFO, 'filemode': 'w', 'filename': '/tmp/pm.log'},
+      'topics': ['tickPrice', 'tickSize'],
+      'seek_to_end': ['*']
+
+      #'seek_to_end':['tickSize', 'tickPrice','gw_subscriptions', 'gw_subscription_changed']
+      }
+
+    usage = "usage: %prog [options]"
+    parser = OptionParser(usage=usage)
+    parser.add_option("-c", "--clear_offsets", action="store_true", dest="clear_offsets",
+                      help="delete all redis offsets used by this program")
+    parser.add_option("-g", "--group_id",
+                      action="store", dest="group_id", 
+                      help="assign group_id to this running instance")
+    parser.add_option("-e", "--evaluation_date",
+                     action="store", dest="evaluation_date", 
+                     help="specify evaluation date for option calculations")   
+    
+    (options, args) = parser.parse_args()
+    if options.evaluation_date == None:
+        options.evaluation_date = time.strftime('%Y%m%d') 
+    
+    for option, value in options.__dict__.iteritems():
+        if value <> None:
+            kwargs[option] = value
+    
+    
+            
+  
+      
+    logconfig = kwargs['logconfig']
+    logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
+    logging.basicConfig(**logconfig)        
+    
+    
+    server = PortfolioMonitor(kwargs)
+    server.start_engine()
+    
+          
+        

+ 2 - 2
src/sh/gw_ex_request_exit.sh

@@ -9,5 +9,5 @@ else
 	FINOPT_HOME=~/l1304/workspace/finopt-ironfly/finopt/src
 fi
 export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
-#python $FINOPT_HOME/comms/ibc/gw_ex_request_exit.py -g AAA -n dumpty 
-python $FINOPT_HOME/comms/ibc/gw_ex_request_exit.py -c -g AAA -n dumpty 
+python $FINOPT_HOME/comms/ibc/gw_ex_request_exit.py -g AAA -n dumpty 
+#python $FINOPT_HOME/comms/ibc/gw_ex_request_exit.py -c -g AAA -n dumpty 

+ 2 - 2
src/sh/start_twsgw.sh

@@ -20,7 +20,7 @@ export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
 
 
 # restart gateway keep the redis offsets but erase the subscription entries
-python $FINOPT_HOME/comms/ibgw/tws_gateway.py  -r -f $FINOPT_HOME/config/tws_gateway.cfg 
+#python $FINOPT_HOME/comms/ibgw/tws_gateway.py  -r -f $FINOPT_HOME/config/tws_gateway.cfg 
 
 # normal restart - keep the offsets and reload from saved subscription entries
-#python $FINOPT_HOME/comms/ibgw/tws_gateway.py   -f $FINOPT_HOME/config/tws_gateway.cfg 
+python $FINOPT_HOME/comms/ibgw/tws_gateway.py   -f $FINOPT_HOME/config/tws_gateway.cfg