tws_gateway.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. import copy
  5. from time import sleep, strftime
  6. import logging
  7. import json
  8. from ib.ext.Contract import Contract
  9. from ib.ext.EClientSocket import EClientSocket
  10. from misc2.helpers import ContractHelper, ConfigMap
  11. from optparse import OptionParser
  12. from comms.ibgw.base_messaging import Prosumer
  13. from comms.ibgw.tws_event_handler import TWS_event_handler
  14. from comms.ibgw.ib_heartbeat import IbHeartBeat
  15. from comms.ibgw.client_request_handler import ClientRequestHandler
  16. from comms.ibgw.subscription_manager import SubscriptionManager
  17. from comms.tws_protocol_helper import TWS_Protocol
  18. import redis
  19. import threading
  20. from threading import Lock
  21. class TWS_gateway():
  22. # monitor IB connection / heart beat
  23. # ibh = None
  24. # tlock = None
  25. # ib_conn_status = None
  26. TWS_GW_DEFAULT_CONFIG = {
  27. 'name': 'tws_gateway_server',
  28. 'bootstrap_host': 'localhost',
  29. 'bootstrap_port': 9092,
  30. 'redis_host': 'localhost',
  31. 'redis_port': 6379,
  32. 'redis_db': 0,
  33. 'tws_host': 'localhost',
  34. 'tws_api_port': 8496,
  35. 'tws_app_id': 38888,
  36. 'group_id': 'TWS_GW',
  37. 'session_timeout_ms': 10000,
  38. 'clear_offsets': False,
  39. 'order_transmit': False,
  40. 'topics': list(TWS_Protocol.topicMethods),
  41. 'reset_db_subscriptions': False
  42. }
  43. def __init__(self, kwargs):
  44. temp_kwargs = copy.copy(kwargs)
  45. self.kwargs = copy.copy(TWS_gateway.TWS_GW_DEFAULT_CONFIG)
  46. for key in self.kwargs:
  47. if key in temp_kwargs:
  48. self.kwargs[key] = temp_kwargs.pop(key)
  49. self.kwargs.update(temp_kwargs)
  50. '''
  51. TWS_gateway start up sequence
  52. 1. establish redis connection
  53. 2. initialize prosumer instance - gateway message handler
  54. 3. establish TWS gateway connectivity
  55. 4. initialize listeners: ClientRequestHandler and SubscriptionManager
  56. 5. start the prosumer
  57. '''
  58. logging.info('starting up TWS_gateway...')
  59. self.ib_order_transmit = self.kwargs['order_transmit']
  60. logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit == True else 'False'))
  61. logging.info('establishing redis connection...')
  62. self.initialize_redis()
  63. logging.info('starting up gateway message handler - kafka Prosumer...')
  64. self.gw_message_handler = Prosumer(name='tws_gw_prosumer', kwargs=self.kwargs)
  65. logging.info('initializing TWS_event_handler...')
  66. self.tws_event_handler = TWS_event_handler(self.gw_message_handler)
  67. logging.info('starting up IB EClientSocket...')
  68. self.tws_connection = EClientSocket(self.tws_event_handler)
  69. logging.info('establishing TWS gateway connectivity...')
  70. if self.connect_tws() == False:
  71. logging.error('TWS_gateway: unable to establish connection to IB %s:%d' %
  72. (self.kwargs['tws_host'], self.kwargs['tws_api_port']))
  73. self.disconnect_tws()
  74. sys.exit(-1)
  75. else:
  76. # start heart beat monitor
  77. logging.info('starting up IB heart beat monitor...')
  78. self.tlock = Lock()
  79. self.ibh = IbHeartBeat(self.kwargs)
  80. self.ibh.register_listener([self.on_ib_conn_broken])
  81. self.ibh.run()
  82. logging.info('instantiating listeners...cli_req_handler')
  83. self.cli_req_handler = ClientRequestHandler('client_request_handler', self)
  84. logging.info('instantiating listeners subscription manager...')
  85. self.initialize_subscription_mgr()
  86. logging.info('registering messages to listen...')
  87. self.gw_message_handler.add_listeners([self.cli_req_handler])
  88. self.gw_message_handler.add_listener_topics(self.contract_subscription_mgr, self.kwargs['subscription_manager.topics'])
  89. logging.info('start TWS_event_handler. Start prosumer processing loop...')
  90. self.gw_message_handler.start_prosumer()
  91. logging.info('**** Completed initialization sequence. ****')
  92. self.main_loop()
  93. def initialize_subscription_mgr(self):
  94. self.contract_subscription_mgr = SubscriptionManager(self.kwargs['name'], self.tws_connection,
  95. self.gw_message_handler,
  96. self.get_redis_conn(), self.kwargs)
  97. self.tws_event_handler.set_subscription_manager(self.contract_subscription_mgr)
  98. def initialize_redis(self):
  99. self.rs = redis.Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
  100. try:
  101. self.rs.client_list()
  102. except redis.ConnectionError:
  103. logging.error('TWS_gateway: unable to connect to redis server using these settings: %s port:%d db:%d' %
  104. (self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db']))
  105. logging.error('aborting...')
  106. sys.exit(-1)
  107. def get_redis_conn(self):
  108. return self.rs
  109. def connect_tws(self):
  110. if type(self.kwargs['tws_app_id']) <> int:
  111. logging.error('TWS_gateway:connect_tws. tws_app_id must be of int type but detected %s!' % str(type(kwargs['tws_app_id'])))
  112. sys.exit(-1)
  113. logging.info('TWS_gateway - eConnect. Connecting to %s:%d App Id: %d...' %
  114. (self.kwargs['tws_host'], self.kwargs['tws_api_port'], self.kwargs['tws_app_id']))
  115. self.tws_connection.eConnect(self.kwargs['tws_host'], self.kwargs['tws_api_port'], self.kwargs['tws_app_id'])
  116. return self.tws_connection.isConnected()
  117. def disconnect_tws(self, value=None):
  118. sleep(2)
  119. self.tws_connection.eDisconnect()
  120. def on_ib_conn_broken(self, msg):
  121. logging.error('TWS_gateway: detected broken IB connection!')
  122. self.ib_conn_status = 'ERROR'
  123. self.tlock.acquire() # this function may get called multiple times
  124. try: # block until another party finishes executing
  125. if self.ib_conn_status == 'OK': # check status
  126. return # if already fixed up while waiting, return
  127. #self.disconnect_tws()
  128. self.connect_tws()
  129. while not self.tws_connection.isConnected():
  130. logging.error('TWS_gateway: attempt to reconnect...')
  131. self.connect_tws()
  132. sleep(2)
  133. # we arrived here because the connection has been restored
  134. # resubscribe tickers again!
  135. self.ib_conn_status = 'OK'
  136. logging.info('TWS_gateway: IB connection restored...resubscribe contracts')
  137. self.contract_subscription_mgr.force_resubscription()
  138. finally:
  139. self.tlock.release()
  140. def persist_subscription_table(self):
  141. self.pcounter = (self.pcounter + 1) % 10
  142. if (self.pcounter >= 8):
  143. self.contract_subscription_mgr.persist_subscriptions()
  144. def main_loop(self):
  145. try:
  146. logging.info('TWS_gateway:main_loop ***** accepting console input...')
  147. self.pcounter = 0
  148. while True:
  149. sleep(.5)
  150. self.persist_subscription_table()
  151. except (KeyboardInterrupt, SystemExit):
  152. logging.error('TWS_gateway: caught user interrupt. Shutting down...')
  153. self.gw_message_handler.set_stop()
  154. self.gw_message_handler.join()
  155. self.ibh.shutdown()
  156. logging.info('TWS_gateway: Service shut down complete...')
  157. sys.exit(0)
  158. if __name__ == '__main__':
  159. usage = "usage: %prog [options]"
  160. parser = OptionParser(usage=usage)
  161. parser.add_option("-c", "--clear_offsets", action="store_true", dest="clear_offsets",
  162. help="delete all redis offsets used by this program")
  163. parser.add_option("-r", "--reset_db_subscriptions", action="store_true", dest="reset_db_subscriptions",
  164. help="delete subscriptions entries in redis used by this program")
  165. parser.add_option("-f", "--config_file",
  166. action="store", dest="config_file",
  167. help="path to the config file")
  168. (options, args) = parser.parse_args()
  169. kwargs = ConfigMap().kwargs_from_file(options.config_file)
  170. for option, value in options.__dict__.iteritems():
  171. if value <> None:
  172. kwargs[option] = value
  173. logconfig = kwargs['logconfig']
  174. logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
  175. logging.basicConfig(**logconfig)
  176. logging.info('config settings: %s' % kwargs)
  177. app = TWS_gateway(kwargs)