Browse Source

saturday coding hero

bobhk 9 năm trước cách đây
mục cha
commit
d9db0cde19

+ 5 - 2
src/comms/ibc/base_client_messaging.py

@@ -53,8 +53,11 @@ class GatewayCommandWrapper():
     def reqExecutions(self, exec_filter=None):
         self.producer.send_message('reqExecutions', ExecutionFilterHelper.object2kvstring(exec_filter) if exec_filter <> None else '')
 
-    def reqMktData(self, contract):
-        self.producer.send_message('reqMktData', ContractHelper.object2kvstring(contract))
+#     def reqMktData(self, contract):
+#         self.producer.send_message('reqMktData', ContractHelper.object2kvstring(contract))
+    def reqMktData(self, contract, snapshot=False):
+        self.producer.send_message('reqMktData', {'contract': ContractHelper.object2kvstring(contract), 
+                                                  'snapshot': snapshot})        
         
     def reqAccountSummary(self, reqId, group, tags):
         self.producer.send_message('reqAccountSummary', self.producer.message_dumps([reqId, group, tags]))

+ 2 - 1
src/comms/ibc/gw_ex_request_exit.py

@@ -36,7 +36,8 @@ class MessageListener(AbstractGatewayListener):
         logging.info('MessageListener:%s. val->[%s]' % (event, message_value))
         
 
-    
+    def tickPrice(self, event, contract_key, field, price, canAutoExecute):
+        logging.info('MessageListener: %s' % vars())
 
 
 def test_client(kwargs):

+ 9 - 3
src/comms/ibgw/base_messaging.py

@@ -194,6 +194,9 @@ class BaseConsumer(threading.Thread, Publisher):
     
     def enrich_message(self, message):
         return {'value': message.value, 'partition':message.partition, 'offset': message.offset}
+        
+    def extract_message_content(self, message):
+        return message.value
     
     def set_stop(self):
         self.done = True
@@ -306,7 +309,8 @@ class BaseConsumer(threading.Thread, Publisher):
                         # if there is no gap                          
                         if gap == 1:
                             # the message is valid for dispatching and not to be skipped
-                            self.dispatch(message.topic, self.enrich_message(message))
+                            #self.dispatch(message.topic, self.enrich_message(message))
+                            self.dispatch(message.topic, **self.extract_message_content(message))
                             logging.debug('*** On first iteration: Gap=%d Dispatch this valid message to the listener <%s>' % (gap, message.value))
                         else: # gap exists
                             logging.info("*** On first iteration: [Topic:%s:Part:%d:Offset:%d]: Gap:%d Attempting to seek to latest message ..." 
@@ -337,12 +341,14 @@ class BaseConsumer(threading.Thread, Publisher):
                     # both saved value in redis and current offset are both 0
                     if self.my_topics[message.topic][str(message.partition)] == message.offset and message.offset <> 0:
                         self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.enrich_message(message))
-                        self.dispatch(message.topic, self.enrich_message(message))
+                        #self.dispatch(message.topic, self.enrich_message(message))
+                        self.dispatch(message.topic, **self.extract_message_content(message))
                         logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
                     else:
                         self.persist_offsets(message.topic, message.partition, message.offset)
                         #self.dispatch(BaseConsumer.KB_EVENT, {'message': message})
-                        self.dispatch(message.topic, self.enrich_message(message))
+                        #self.dispatch(message.topic, self.enrich_message(message))
+                        self.dispatch(message.topic, **self.extract_message_content(message))
             except StopIteration:
                 logging.debug('BaseConsumer:run StopIteration Caught. No new message arriving...')
                 continue

+ 13 - 7
src/comms/ibgw/subscription_manager.py

@@ -46,6 +46,9 @@ class SubscriptionManager(BaseMessageListener):
 
         self.load_subscriptions()
         
+    
+    def get_contract_by_id(self, id):
+        return self.idContractMap['id_contract']
             
     def reset_subscriptions(self, reset_db):
         if reset_db:
@@ -143,7 +146,8 @@ class SubscriptionManager(BaseMessageListener):
             
     def reqMktData(self, event, message):
                   
-        contract = ContractHelper.kvstring2object(message['value'], Contract)
+        contract = ContractHelper.kvstring2object(message['contract'], Contract)
+        snapshot = message['snapshot']
         #logging.info('SubscriptionManager: reqMktData')
   
         id = self.is_subscribed(contract)
@@ -154,14 +158,16 @@ class SubscriptionManager(BaseMessageListener):
             # the conId must be set to zero when calling TWS reqMktData
             # otherwise TWS will fail to subscribe the contract
             contract.m_conId = 0
+            self.request_market_data(id, contract, True)
             self.request_market_data(id, contract, False) 
             self.is_dirty = True
                 
             logging.info('SubscriptionManager:reqMktData. Requesting market data, id = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         
-        else:    
-            self.request_market_data(id, contract, True)
-            logging.info('SubscriptionManager:reqMktData. contract already subscribed. Request snapshot = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
+        else:  
+            self.request_market_data(id, contract, snapshot)
+            logging.info('SubscriptionManager:reqMktData. Request id: %d, contract = %s snapshot=%s' % 
+                         (id, ContractHelper.makeRedisKeyEx(contract), snapshot))
         #self.dump()
 
         #
@@ -176,9 +182,9 @@ class SubscriptionManager(BaseMessageListener):
         'offset': 13
         }
         '''      
-        subscription_array =  {'subscriptions': [[id, ContractHelper.object2kvstring(contract)]] }
-        self.producer.send_message('gw_subscription_changed', self.producer.message_dumps( subscription_array   ))
-        logging.info('SubscriptionManager:reqMktData. Publish gw_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
+#         subscription_array =  {'subscriptions': [[id, ContractHelper.object2kvstring(contract)]] }
+#         self.producer.send_message('gw_subscription_changed', self.producer.message_dumps( subscription_array   ))
+#         logging.info('SubscriptionManager:reqMktData. Publish gw_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         
         
         

+ 5 - 2
src/comms/ibgw/tws_event_handler.py

@@ -4,6 +4,7 @@ from time import strftime
 import logging
 import traceback
 from ib.ext.EWrapper import EWrapper
+from comms.ibgw import subscription_manager
 
         
 class TWS_event_handler(EWrapper):
@@ -11,8 +12,9 @@ class TWS_event_handler(EWrapper):
     TICKER_GAP = 1000
     producer = None
     
-    def __init__(self, producer):        
+    def __init__(self, producer, subscription_manager):        
         self.producer = producer
+        self.subscription_manger = subscription_manager
  
  
 
@@ -70,7 +72,8 @@ class TWS_event_handler(EWrapper):
     
     def tickPrice(self, tickerId, field, price, canAutoExecute):
         
-        self.broadcast_event('tickPrice', vars())
+        self.broadcast_event('tickPrice', {self.subscription_manger.get_contract_by_id(tickerId),
+                                           field, price, canAutoExecute})
 
     def tickSize(self, tickerId, field, size):
         

+ 9 - 7
src/docs/ticker_handling.txt

@@ -26,20 +26,22 @@ Case 1: Request market data snapshot
 4. map id:key 
 - tws_connection -> reqMktData(id + 1000|id, contract, True|False)
 
-Case 2: Receive market data
+Case 2: Receive market data from TWS
 
 5. tws <-> 6. server messaging <-> 7. client messaging <-> 8. client
 
-5. Receive tickPrice
-- TWS_event_handler(EWrapper) -> tickPrice -> remap id:contract-key
-- update vars() by replacing tickId with contract-key 
+5. Receive tickPrice 
+- comms.ibgw.tws_event_handler TWS_event_handler(EWrapper) -> tickPrice -> remap id:contract-key
+- update vars() by replacing tickId with contract-key subscrition_manager.get_contract_by_id
 6
-- self.broadcast_event('tickPrice', vars())
+- tws_event_handler->self.broadcast_event('tickPrice', {self.subscription_manger.get_contract_by_id(tickerId),
+                                           field, price, canAutoExecute})
 - self.producer.send_message(message, self.producer.message_dumps(dict))
 
 
-7 
-- baseconsumer (**json.loads(text_message))
+7
+-  
+- baseconsumer: self.dispatch(message.topic, **self.extract_message_content(message))
 - Inherit AbstractGatewayListener(BaseMessageListener) / tickPrice(tickerId, field, price, canAutoExecute):