tws_gateway.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. import copy
  5. from time import sleep, strftime
  6. import ConfigParser
  7. import logging
  8. import json
  9. from ib.ext.Contract import Contract
  10. from ib.ext.EClientSocket import EClientSocket
  11. from misc2.helpers import ContractHelper
  12. from comms.ibgw.base_messaging import Prosumer
  13. from comms.ibgw.tws_event_handler import TWS_event_handler
  14. from comms.ibgw.client_request_handler import ClientRequestHandler
  15. from comms.ibgw.subscription_manager import SubscriptionManager
  16. from comms.tws_protocol_helper import TWS_Protocol
  17. import redis
  class TWS_gateway():
      """Gateway process that connects a Prosumer message handler to an IB TWS session.

      Constructing an instance runs the whole start-up sequence (redis, Prosumer,
      TWS connection, listeners) and then blocks in main_loop() until the process
      is interrupted, at which point the process exits. The constructor therefore
      never returns to its caller under normal operation.
      """

      # monitor IB connection / heart beat
      # ibh = None
      # tlock = None
      # ib_conn_status = None

      # Default settings; any key may be overridden by the kwargs given to __init__.
      TWS_GW_DEFAULT_CONFIG = {
          'name': 'tws_gateway_server',
          'bootstrap_host': 'localhost',
          'bootstrap_port': 9092,
          'redis_host': 'localhost',
          'redis_port': 6379,
          'redis_db': 0,
          'tws_host': 'localhost',
          'tws_api_port': 8496,
          'tws_app_id': 38888,
          'group_id': 'TWS_GW',
          'session_timeout_ms': 10000,
          'clear_offsets': False,
          'order_transmit': False,
          'topics': list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods)
      }

      def __init__(self, kwargs):
          """Start the gateway and block in the main loop.

          Start-up sequence, in the order the code performs it:
            1. establish redis connection
            2. create the Prosumer instance - gateway message handler
            3. create the TWS event handler and EClientSocket, connect to TWS
            4. start the prosumer
            5. create listeners: ClientRequestHandler and SubscriptionManager
            6. register the listeners with the prosumer
            7. enter main_loop()

          NOTE(review): the prosumer is started before the listeners are
          registered; confirm that messages arriving in between may be dropped
          safely.

          Args:
              kwargs: dict of settings that override or extend
                  TWS_GW_DEFAULT_CONFIG. It must contain the key
                  'subscription_manager.subscriptions.redis_key'.
                  The dict is not modified.

          Exits the process with status -1 when redis or TWS cannot be reached.
          """
          # Local import: the lock is only needed by the reconnect handler and
          # the module's import block does not bring in threading.
          import threading

          # Deep copy so the mutable 'topics' list in the class-level defaults is
          # never shared with (or mutated through) an instance. update() gives
          # the same merged result as the former pop-then-update loop without
          # emptying the caller's dict.
          self.kwargs = copy.deepcopy(TWS_gateway.TWS_GW_DEFAULT_CONFIG)
          self.kwargs.update(kwargs)

          # State used by on_ib_conn_broken(); must exist before any heart beat
          # monitor could invoke that callback.
          self.tlock = threading.Lock()
          self.ib_conn_status = 'OK'

          logging.info('starting up TWS_gateway...')
          self.ib_order_transmit = self.kwargs['order_transmit']
          logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit == True else 'False'))

          logging.info('establishing redis connection...')
          self.initialize_redis()

          logging.info('starting up gateway message handler - kafka Prosumer...')
          self.gw_message_handler = Prosumer(name='tws_gw_prosumer', kwargs=self.kwargs)

          logging.info('initializing TWS_event_handler...')
          self.tws_event_handler = TWS_event_handler(self.gw_message_handler)

          logging.info('starting up IB EClientSocket...')
          self.tws_connection = EClientSocket(self.tws_event_handler)

          logging.info('establishing TWS gateway connectivity...')
          if not self.connect_tws():
              logging.error('TWS_gateway: unable to establish connection to IB %s:%d' %
                            (self.kwargs['tws_host'], self.kwargs['tws_api_port']))
              self.disconnect_tws()
              sys.exit(-1)

          # start heart beat monitor (currently disabled)
          # logging.info('starting up IB heart beat monitor...')
          # self.ibh = IbHeartBeat(config)
          # self.ibh.register_listener([self.on_ib_conn_broken])
          # self.ibh.run()

          logging.info('start TWS_event_handler. Entering processing loop...')
          self.gw_message_handler.start_prosumer()

          logging.info('instantiating listeners...cli_req_handler')
          self.cli_req_handler = ClientRequestHandler('client_request_handler', self)
          logging.info('instantiating listeners subscription manager...')
          self.initialize_subscription_mgr()

          logging.info('registering messages to listen...')
          self.gw_message_handler.add_listeners([self.cli_req_handler])
          self.gw_message_handler.add_listener_topics(self.contract_subscription_mgr, ['reqMktData'])

          logging.info('**** Completed initialization sequence. ****')
          self.main_loop()

      def initialize_subscription_mgr(self):
          """Create the SubscriptionManager and reload persisted subscriptions.

          Reads the JSON list stored in redis under the configured
          'subscription_manager.subscriptions.redis_key', converts each entry to
          a Contract, drops those whose expiry is before today, and hands the
          remainder to the manager.

          Raises:
              KeyError: if the redis key setting is missing from the config.
          """
          self.contract_subscription_mgr = SubscriptionManager(self, self)
          self.contract_subscription_mgr.register_persistence_callback(self.persist_subscriptions)

          key = self.kwargs["subscription_manager.subscriptions.redis_key"]
          stored = self.rs.get(key)  # fetched once; reused for the parse below
          if not stored:
              return

          today = strftime('%Y%m%d')

          def is_outstanding(c):
              # String comparison works because both sides are 'YYYYMMDD'.
              # NOTE(review): a contract with an empty m_expiry also compares
              # as "before today" and is dropped - confirm this is intended
              # for instruments that have no expiry.
              if c.m_expiry < today:
                  logging.info('initialize_subscription_mgr: ignoring expired contract %s%s%s' % (c.m_expiry, c.m_strike, c.m_right))
                  return False
              return True

          restored = (ContractHelper.kvstring2object(kv, Contract) for kv in json.loads(stored))
          contracts = [c for c in restored if is_outstanding(c)]
          self.contract_subscription_mgr.load_subscription(contracts)

      def persist_subscriptions(self, contracts):
          """Persistence callback: store the subscription table in redis as JSON.

          Args:
              contracts: iterable of contract objects; None entries are kept as
                  JSON null.
          """
          key = self.kwargs["subscription_manager.subscriptions.redis_key"]
          # A real list (not a lazy map object) so json.dumps can serialise it.
          cs = json.dumps([ContractHelper.object2kvstring(x) if x is not None else None
                           for x in contracts])
          logging.debug('Tws_gateway: updating subscription table to redis store %s' % cs)
          self.rs.set(key, cs)

      def initialize_redis(self):
          """Create the redis client and verify the server is reachable.

          Exits the process with status -1 when the connection fails.
          """
          self.rs = redis.Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
          try:
              # Issue a command so connection problems surface here.
              self.rs.client_list()
          except redis.ConnectionError:
              logging.error('TWS_gateway: unable to connect to redis server using these settings: %s port:%d db:%d' %
                            (self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db']))
              logging.error('aborting...')
              sys.exit(-1)

      def connect_tws(self):
          """Connect the EClientSocket to TWS using the configured host/port/app id.

          Returns:
              The result of tws_connection.isConnected() after the attempt.

          Exits the process with status -1 when 'tws_app_id' is not an int.
          """
          app_id = self.kwargs['tws_app_id']
          # bool is a subclass of int; reject it like the original exact-type check did.
          if isinstance(app_id, bool) or not isinstance(app_id, int):
              logging.error('TWS_gateway:connect_tws. tws_app_id must be of int type but detected %s!' % str(type(app_id)))
              sys.exit(-1)

          logging.info('TWS_gateway - eConnect. Connecting to %s:%d App Id: %d...' %
                       (self.kwargs['tws_host'], self.kwargs['tws_api_port'], app_id))
          self.tws_connection.eConnect(self.kwargs['tws_host'], self.kwargs['tws_api_port'], app_id)
          return self.tws_connection.isConnected()

      def disconnect_tws(self, value=None):
          """Wait two seconds, then disconnect from TWS. `value` is unused."""
          sleep(2)
          self.tws_connection.eDisconnect()

      def on_ib_conn_broken(self, msg):
          """Callback for a broken IB connection: reconnect and resubscribe.

          Intended for the (currently disabled) heart beat monitor. May be
          called several times for one outage; the lock serialises callers and
          the status flag lets late callers return once another caller has
          already restored the connection.

          Args:
              msg: notification payload; unused.
          """
          logging.error('TWS_gateway: detected broken IB connection!')
          self.ib_conn_status = 'ERROR'
          with self.tlock:  # block until another party finishes executing
              if self.ib_conn_status == 'OK':
                  # already fixed up while waiting
                  return
              self.disconnect_tws()
              while not self.connect_tws():
                  logging.error('TWS_gateway: attempt to reconnect...')
                  sleep(2)
              # Mark the connection healthy so queued callers skip the reconnect.
              self.ib_conn_status = 'OK'
              # we arrived here because the connection has been restored
              # resubscribe tickers again!
              logging.info('TWS_gateway: IB connection restored...resubscribe contracts')
              self.contract_subscription_mgr.force_resubscription()

      def main_loop(self):
          """Idle until interrupted, then stop the prosumer and exit the process."""
          try:
              logging.info('TWS_gateway:main_loop ***** accepting console input...')
              while True:
                  sleep(.45)
          except (KeyboardInterrupt, SystemExit):
              logging.error('TWS_gateway: caught user interrupt. Shutting down...')
              self.gw_message_handler.set_stop()
              self.gw_message_handler.join()
              logging.info('TWS_gateway: Service shut down complete...')
              sys.exit(0)
  class ConfigMap():
      """Flatten an INI-style configuration file into one keyword dictionary."""

      def kwargs_from_file(self, path):
          """Read `path` and return its options as a flat dict of Python values.

          Every option of every section is evaluated as a Python expression and
          stored under its option name; section names are discarded, so an
          option appearing in several sections keeps the value read last.
          Options whose value does not evaluate (for example an unquoted
          string) are silently left out of the result.

          Args:
              path: config file name (or list of names) accepted by
                  ConfigParser.read().

          Returns:
              dict mapping option name to the evaluated value.

          Raises:
              ValueError: if no config file could be read.
          """
          cfg = ConfigParser.ConfigParser()
          if len(cfg.read(path)) == 0:
              raise ValueError("Failed to open config file [%s]" % path)

          kwargs = {}
          for section in cfg.sections():
              for option in cfg.options(section):
                  raw = cfg.get(section, option)
                  try:
                      # SECURITY: eval() executes arbitrary expressions, so the
                      # config file must be trusted. It is kept because values
                      # may be expressions rather than plain literals.
                      kwargs[option] = eval(raw)
                  except Exception:
                      # Best effort: skip values that are not valid Python.
                      # Deliberately not logged - this runs before the caller
                      # configures logging, and logging here would install the
                      # default handler first.
                      continue
          return kwargs
  if __name__ == '__main__':
      # Script entry point: load settings from the config file named on the
      # command line, configure logging from its 'logconfig' entry, then start
      # the gateway (the constructor blocks until the process is interrupted).
      if len(sys.argv) != 2:
          print("Usage: %s <config file>" % sys.argv[0])
          sys.exit(-1)

      # A single path, not a one-element list, so error messages show it plainly.
      cfg_path = sys.argv[1]
      kwargs = ConfigMap().kwargs_from_file(cfg_path)

      # 'logconfig' must be present in the config file; it is passed to
      # logging.basicConfig as keyword arguments with the format forced below.
      logconfig = kwargs['logconfig']
      logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
      logging.basicConfig(**logconfig)

      app = TWS_gateway(kwargs)