|
|
@@ -36,20 +36,32 @@ class SubscriptionManager(BaseMessageListener):
|
|
|
id_contract and contract_id are dict and reverse dict that store the index of id
|
|
|
to contarct and vice versa
|
|
|
|
|
|
- id_contract: {<int>, <Contract>}
|
|
|
- contract_id: {<kvs_contract>, <int>}
|
|
|
+ id_contract: {<int>, <contract-redis-key>}
|
|
|
+ contract_id: {<contract-redis-key>, <int>}
|
|
|
|
|
|
'''
|
|
|
self.idContractMap ={'next_id': 0, 'id_contract':{},'contract_id':{}}
|
|
|
# flag to indicate whether to save changes when persist_subscriptions is called
|
|
|
self.is_dirty = False
|
|
|
|
|
|
- logging.warn('***** TEMPORARILY skip loading subscriptions from redis!!!')
|
|
|
+ #logging.warn('***** TEMPORARILY skip loading subscriptions from redis!!!')
|
|
|
self.load_subscriptions()
|
|
|
|
|
|
|
|
|
def get_contract_by_id(self, id):
|
|
|
- return self.idContractMap['id_contract'][id]
|
|
|
+
|
|
|
+ try:
|
|
|
+ logging.debug('get_contract_by_id %d' % id)
|
|
|
+ #self.dump()
|
|
|
+ return self.idContractMap['id_contract'][id]
|
|
|
+
|
|
|
+ except (KeyError, ):
|
|
|
+
|
|
|
+ if (id >= TWS_event_handler.TICKER_GAP):
|
|
|
+ return self.idContractMap['id_contract'][id - TWS_event_handler.TICKER_GAP]
|
|
|
+
|
|
|
+
|
|
|
+ raise
|
|
|
|
|
|
def reset_subscriptions(self, reset_db):
|
|
|
if reset_db:
|
|
|
@@ -89,7 +101,7 @@ class SubscriptionManager(BaseMessageListener):
|
|
|
# rebuild the internal data map
|
|
|
for ic in ic_list:
|
|
|
self.idContractMap['id_contract'][ic[0]] = ic[1]
|
|
|
- self.idContractMap['contract_id'][ContractHelper.makeRedisKeyEx(ic[1])] = ic[0]
|
|
|
+ self.idContractMap['contract_id'][ic[1]] = ic[0]
|
|
|
|
|
|
# derive the next id by finding the max id
|
|
|
max_id = reduce(lambda x,y: max(x,y), self.idContractMap['id_contract'].keys())
|
|
|
@@ -99,8 +111,9 @@ class SubscriptionManager(BaseMessageListener):
|
|
|
# subscribe market data, first call is normal subscription,
|
|
|
# first for snapshot, then subscribe for the latest
|
|
|
logging.info('SubscriptionManager:load_subscription. request market data for: %s' % (ic_list))
|
|
|
- map(lambda ic: self.request_market_data(ic[0], ic[1], snapshot=True), ic_list)
|
|
|
- map(lambda ic: self.request_market_data(ic[0], ic[1], snapshot=False), ic_list)
|
|
|
+
|
|
|
+ map(lambda ic: self.request_market_data(ic[0], ContractHelper.makeContractfromRedisKeyEx(ic[1]), snapshot=True), ic_list)
|
|
|
+ map(lambda ic: self.request_market_data(ic[0], ContractHelper.makeContractfromRedisKeyEx(ic[1]), snapshot=False), ic_list)
|
|
|
|
|
|
else:
|
|
|
logging.warn('SubscriptionManager:load_subscription. No saved id:contracts found in redis.')
|
|
|
@@ -108,12 +121,14 @@ class SubscriptionManager(BaseMessageListener):
|
|
|
logging.info('SubscriptionManager:load_subscription. Complete populating stored map into idContract dict.')
|
|
|
|
|
|
def request_market_data(self, id, contract, snapshot=False):
|
|
|
+ contract.m_conId = 0
|
|
|
if snapshot:
|
|
|
# the call to TWS will return a snapshot follow
|
|
|
# by the subscription being cancelled. Add 1000 to avoid clashing
|
|
|
# with other subscription ids.
|
|
|
- print 'request_market_data: %d' % (id + TWS_event_handler.TICKER_GAP)
|
|
|
+ logging.info( 'request_market_data: %d %s' % (id + TWS_event_handler.TICKER_GAP, ContractHelper.printContract(contract)))
|
|
|
self.tws_connect.reqMktData(id + TWS_event_handler.TICKER_GAP, contract, '', True)
|
|
|
+
|
|
|
else:
|
|
|
self.tws_connect.reqMktData(id, contract, '', False)
|
|
|
#
|
|
|
@@ -129,7 +144,7 @@ class SubscriptionManager(BaseMessageListener):
|
|
|
try:
|
|
|
return self.idContractMap['contract_id'][ckey]
|
|
|
except KeyError:
|
|
|
- logging.debug('is_subscribed: key not found %s' % ckey)
|
|
|
+ logging.warn('is_subscribed: key not found %s' % ckey)
|
|
|
return -1
|
|
|
|
|
|
def add_subscription(self, contract):
|
|
|
@@ -137,12 +152,13 @@ class SubscriptionManager(BaseMessageListener):
|
|
|
# structure of idContractMap ={'next_id': -1, 'id_contract':{}, 'contract_id':{}}
|
|
|
#
|
|
|
id = self.idContractMap['next_id']
|
|
|
- self.idContractMap['id_contract'][id] = contract
|
|
|
- logging.debug('add_subscription %s' % ContractHelper.makeRedisKeyEx(contract))
|
|
|
- self.idContractMap['contract_id'][ContractHelper.makeRedisKeyEx(contract)] = id
|
|
|
+ key = ContractHelper.makeRedisKeyEx(contract)
|
|
|
+ self.idContractMap['id_contract'][id] = key
|
|
|
+ logging.info('add_subscription %s' % key)
|
|
|
+ self.idContractMap['contract_id'][key] = id
|
|
|
self.idContractMap['next_id'] = id + 1
|
|
|
|
|
|
- return self.idContractMap['next_id']
|
|
|
+ return id
|
|
|
|
|
|
|
|
|
#def reqMktData(self, event, message):
|
|
|
@@ -153,23 +169,27 @@ class SubscriptionManager(BaseMessageListener):
|
|
|
#logging.info('SubscriptionManager: reqMktData')
|
|
|
|
|
|
contract = ContractHelper.kvstring2object(contract, Contract)
|
|
|
+
|
|
|
+
|
|
|
id = self.is_subscribed(contract)
|
|
|
+ logging.info('reqMktData subscription_manager: id %d' % id)
|
|
|
+
|
|
|
if id == -1: # not found
|
|
|
|
|
|
id = self.add_subscription(contract)
|
|
|
#
|
|
|
# the conId must be set to zero when calling TWS reqMktData
|
|
|
# otherwise TWS will fail to subscribe the contract
|
|
|
- contract.m_conId = 0
|
|
|
+
|
|
|
self.request_market_data(id, contract, True)
|
|
|
- self.request_market_data(id, contract, False)
|
|
|
+ self.request_market_data(id, contract, False)
|
|
|
self.is_dirty = True
|
|
|
|
|
|
- logging.info('SubscriptionManager:reqMktData. Requesting market data, id = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
|
|
|
+ logging.info('SubscriptionManager:reqMktData. New request: id = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
|
|
|
|
|
|
else:
|
|
|
- self.request_market_data(id, contract, snapshot)
|
|
|
- logging.info('SubscriptionManager:reqMktData. Request id: %d, contract = %s snapshot=%s' %
|
|
|
+ self.request_market_data(id, contract, True)
|
|
|
+ logging.info('SubscriptionManager:reqMktData. Existing request get snapshot. id: %d, contract = %s snapshot=%s' %
|
|
|
(id, ContractHelper.makeRedisKeyEx(contract), snapshot))
|
|
|
#self.dump()
|
|
|
|
|
|
@@ -203,10 +223,8 @@ class SubscriptionManager(BaseMessageListener):
|
|
|
try:
|
|
|
id_contracts = json.loads(self.rs.get(self.subscription_key))
|
|
|
|
|
|
- def utf2asc(x):
|
|
|
- return x if isinstance(x, unicode) else x
|
|
|
-
|
|
|
- return map(lambda x: (x[0], ContractHelper.kvstring2contract(utf2asc(x[1]))), id_contracts)
|
|
|
+
|
|
|
+ return id_contracts
|
|
|
except TypeError:
|
|
|
logging.error('SubscriptionManager:get_id_contracts. Exception when trying to get id_contracts from redis ***')
|
|
|
return None
|
|
|
@@ -223,7 +241,7 @@ class SubscriptionManager(BaseMessageListener):
|
|
|
|
|
|
if self.is_dirty:
|
|
|
# for each id:contract pair in idContractMap['id_contract'] dict, map to a list of (id, kvs_contract) values
|
|
|
- ic = json.dumps(self.get_id_kvs_contracts(db=False))
|
|
|
+ ic = json.dumps(self.get_id_contracts(db=False))
|
|
|
self.rs.set(self.subscription_key, ic)
|
|
|
self.is_dirty = False
|
|
|
|
|
|
@@ -233,7 +251,7 @@ class SubscriptionManager(BaseMessageListener):
|
|
|
def dump(self):
|
|
|
|
|
|
logging.info('subscription manager table:---------------------\n')
|
|
|
- logging.info(''.join ('\n[%s]:[%s]' % (str(ic[0]).rjust(4), ic[1]) for ic in self.get_id_kvs_contracts(db=False)))
|
|
|
+ logging.info(''.join ('\n[%s]:[%s]' % (str(ic[0]).rjust(4), ic[1]) for ic in self.get_id_contracts(db=False)))
|
|
|
logging.info(''.join ('\n[%s]:[%d]' % (k.rjust(20), self.idContractMap['contract_id'][k])
|
|
|
for k in sorted(self.idContractMap['contract_id'])))
|
|
|
logging.info( 'Number of instruments subscribed: %d' % self.idContractMap['next_id'])
|
|
|
@@ -249,7 +267,7 @@ class SubscriptionManager(BaseMessageListener):
|
|
|
except:
|
|
|
from_id = '<empty_sender_id>'
|
|
|
|
|
|
- ic = self.get_id_kvs_contracts(db=False)
|
|
|
+ ic = self.get_id_contracts(db=False)
|
|
|
|
|
|
|
|
|
self.producer.send_message('gw_subscriptions', self.producer.message_dumps({'subscriptions': ic , 'sender_id':self.name, 'target_id':from_id}))
|