subscription_manager.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import logging
  4. from time import strftime
  5. import json
  6. from misc2.helpers import ContractHelper
  7. from ib.ext.Contract import Contract
  8. from comms.ibgw.base_messaging import BaseMessageListener
  9. from comms.ibgw.tws_event_handler import TWS_event_handler
  10. class SubscriptionManager(BaseMessageListener):
  11. TICKER_GAP = 1000
  12. def __init__(self, name, tws_connection, producer, rs_conn, subscription_key):
  13. BaseMessageListener.__init__(self, name)
  14. self.tws_connect = tws_connection
  15. self.producer = producer
  16. self.rs = rs_conn
  17. self.subscription_key = subscription_key
  18. #self.handle = []
  19. # contract key map to contract ID (index of the handle array)
  20. #self.tickerId = {}
  21. '''
  22. idContractMap has 3 keys
  23. next_id keeps track of the next_id to use when subscribing market data from TWS
  24. id_contract and contract_id are dict and reverse dict that store the index of id
  25. to contarct and vice versa
  26. id_contract: {<int>, <Contract>}
  27. contract_id: {<kvs_contract>, <int>}
  28. '''
  29. self.idContractMap ={'next_id': 0, 'id_contract':{},'contract_id':{}}
  30. # flag to indicate whether to save changes when persist_subscriptions is called
  31. self.is_dirty = False
  32. self.load_subscriptions()
  33. def load_subscriptions(self):
  34. '''
  35. the function retrieves a json string representation of a list of {id:contracts}
  36. from redis.
  37. next, get rid of the contracts that are expired and of type of either fut or opt
  38. next, rebuild the internal dict idContractMap['id_contract'] and reverse dict
  39. idContractMap['contract_id']
  40. gather all the ids in the newly populated dict (which may contain holes due to
  41. expired contracts and thus not necessarily a sequence), determine the max id
  42. add 1 to it to form the next_id
  43. request snapshot and fresh market data from the TWS gateway
  44. '''
  45. def is_outstanding(ic):
  46. c = ic[1]
  47. today = strftime('%Y%m%d')
  48. if c.m_expiry < today and (c.m_secType == 'OPT' or c.m_secType == 'FUT'):
  49. logging.info('initialize_subscription_mgr: ignoring expired contract %s%s%s' % (c.m_expiry, c.m_strike, c.m_right))
  50. return False
  51. return True
  52. # retrieve the id-contract list from db
  53. # remap the list by instantiating the string to object
  54. # get rid of the already expired contracts
  55. saved_iclist = self.get_id_contracts(db=True)
  56. if saved_iclist:
  57. ic_list= filter(lambda ic:is_outstanding, saved_iclist)
  58. # rebuild the internal data map
  59. for ic in ic_list:
  60. self.idContractMap['id_contract'][ic[0]] = ic[1]
  61. self.idContractMap['contract_id'][ContractHelper.makeRedisKeyEx(ic[1])] = ic[0]
  62. # derive the next id by finding the max id
  63. max_id = reduce(lambda x,y: max(x,y), self.idContractMap['id_contract'].keys())
  64. self.idContractMap['next_id'] = max_id + 1
  65. logging.info('SubscriptionManager:load_subscription. the next_id is set to: %d' % (self.idContractMap['next_id']))
  66. self.dump()
  67. # subscribe market data, first call is normal subscription,
  68. # first for snapshot, then subscribe for the latest
  69. logging.info('SubscriptionManager:load_subscription. request market data for: %s' % (ic_list))
  70. map(lambda ic: self.request_market_data(ic[0], ic[1], snapshot=True), ic_list)
  71. map(lambda ic: self.request_market_data(ic[0], ic[1], snapshot=False), ic_list)
  72. else:
  73. logging.warn('SubscriptionManager:load_subscription. No saved id:contracts found in redis.')
  74. logging.info('SubscriptionManager:load_subscription. Complete populating stored map into idContract dict.')
  75. def request_market_data(self, id, contract, snapshot=False):
  76. if snapshot:
  77. # the call to TWS will return a snapshot follow
  78. # by the subscription being cancelled. Add 1000 to avoid clashing
  79. # with other subscription ids.
  80. self.tws_connect.reqMktData(id + TWS_event_handler.TICKER_GAP, contract, '', True)
  81. else:
  82. self.tws_connect.reqMktData(id, contract, '', False)
  83. # returns -1 if not found, else the key id (which could be a zero value)
  84. def is_subscribed(self, contract):
  85. ckey = ContractHelper.makeRedisKeyEx(contract)
  86. logging.debug('is_subscribed %s' % ckey)
  87. try:
  88. return self.idContractMap['contract_id'][ckey]
  89. except KeyError:
  90. logging.debug('is_subscribed: key not found %s' % ckey)
  91. return -1
  92. def add_subscription(self, contract):
  93. #
  94. # structure of idContractMap ={'next_id': -1, 'id_contract':{}, 'contract_id':{}}
  95. #
  96. id = self.idContractMap['next_id']
  97. self.idContractMap['id_contract'][id] = contract
  98. logging.debug('add_subscription %s' % ContractHelper.makeRedisKeyEx(contract))
  99. self.idContractMap['contract_id'][ContractHelper.makeRedisKeyEx(contract)] = id
  100. self.idContractMap['next_id'] = id + 1
  101. return self.idContractMap['next_id']
  102. def reqMktData(self, event, message):
  103. contract = ContractHelper.kvstring2object(message['value'], Contract)
  104. #logging.info('SubscriptionManager: reqMktData')
  105. id = self.is_subscribed(contract)
  106. if id == -1: # not found
  107. id = self.add_subscription(contract)
  108. #
  109. # the conId must be set to zero when calling TWS reqMktData
  110. # otherwise TWS will fail to subscribe the contract
  111. contract.m_conId = 0
  112. self.request_market_data(id, contract, False)
  113. self.is_dirty = True
  114. logging.info('SubscriptionManager:reqMktData. Requesting market data, id = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
  115. else:
  116. self.request_market_data(id, contract, True)
  117. logging.info('SubscriptionManager:reqMktData. contract already subscribed. Request snapshot = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
  118. #self.dump()
  119. #
  120. # instruct gateway to broadcast new id has been assigned to a new contract
  121. #
  122. self.producer.send_message('gw_subscription_changed', self.producer.message_dumps({id: ContractHelper.object2kvstring(contract)}))
  123. logging.info('SubscriptionManager:reqMktData. Publish gw_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
  124. # use only after a broken connection is restored
  125. def force_resubscription(self):
  126. self.load_subscriptions()
  127. # return id:contract object
  128. def get_id_contracts(self, db=False):
  129. if db:
  130. try:
  131. id_contracts = json.loads(self.rs.get(self.subscription_key))
  132. def utf2asc(x):
  133. return x if isinstance(x, unicode) else x
  134. return map(lambda x: (x[0], ContractHelper.kvstring2contract(utf2asc(x[1]))), id_contracts)
  135. except TypeError:
  136. logging.error('SubscriptionManager:get_id_contracts. Exception when trying to get id_contracts from redis ***')
  137. return None
  138. else:
  139. return map(lambda x: (x[0], x[1]),
  140. list(self.idContractMap['id_contract'].iteritems()))
  141. # return id:contract_strings
  142. def get_id_kvs_contracts(self, db):
  143. return map(lambda x:(x[0], ContractHelper.contract2kvstring(x[1])), self.get_id_contracts(db))
  144. def persist_subscriptions(self):
  145. if self.is_dirty:
  146. # for each id:contract pair in idContractMap['id_contract'] dict, map to a list of (id, kvs_contract) values
  147. ic = json.dumps(self.get_id_kvs_contracts(db=False))
  148. self.rs.set(self.subscription_key, ic)
  149. self.is_dirty = False
  150. logging.info('Tws_gateway:persist_subscriptions. updating subscription table to redis store %s' % ic)
  151. self.dump()
  152. def dump(self):
  153. logging.info('subscription manager table:---------------------\n')
  154. logging.info(''.join ('\n[%s]:[%s]' % (str(ic[0]).rjust(4), ic[1]) for ic in self.get_id_kvs_contracts(db=False)))
  155. logging.info(''.join ('\n[%s]:[%d]' % (k.rjust(20), self.idContractMap['contract_id'][k])
  156. for k in sorted(self.idContractMap['contract_id'])))
  157. logging.info( 'Number of instruments subscribed: %d' % self.idContractMap['next_id'])
  158. logging.info( '------------------------------------------------')
  159. """
  160. Client requests to TWS_gateway
  161. """
  162. def gw_req_subscriptions(self, event, message):
  163. try:
  164. from_id = json.loads(message['value'])['sender_id']
  165. except:
  166. from_id = '<empty_sender_id>'
  167. ic = self.get_id_kvs_contracts(db=False)
  168. #print self.producer.message_dumps({'subscriptions': ic, 'sender_id':self.name, 'target_id':from_id})
  169. if ic:
  170. logging.info('SubscriptionManager:gw_req_subscriptions-------\n%s' % ic)
  171. self.producer.send_message('gw_subscriptions', self.producer.message_dumps({'subscriptions': ic, 'sender_id':self.name, 'target_id':from_id}))