tws_gateway.py 14 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. import copy
  5. from time import sleep, strftime
  6. import logging
  7. import json
  8. from ib.ext.Contract import Contract
  9. from ib.ext.EClientSocket import EClientSocket
  10. from misc2.helpers import ContractHelper, ConfigMap, LoggerNoBaseMessagingFilter
  11. from optparse import OptionParser
  12. from comms.ibgw.base_messaging import Prosumer
  13. from comms.ibgw.tws_event_handler import TWS_event_handler
  14. from comms.ibgw.ib_heartbeat import IbHeartBeat
  15. from comms.ibgw.client_request_handler import ClientRequestHandler
  16. from comms.ibgw.subscription_manager import SubscriptionManager
  17. from comms.tws_protocol_helper import TWS_Protocol
  18. from comms.ibgw.tws_gateway_restapi import WebConsole
  19. from comms.ibgw.order_manager import OrderManager
  20. from ormdapi.v2.quote_handler import QuoteRESTHandler
  21. from ormdapi.v2.position_handler import AccountPositionTracker
  22. from ormdapi.v2.contract_handler import ContractHandler
  23. from ormdapi.v2.ws.ws_api_server import ApiSocketServer
  24. import redis
  25. import threading
  26. from threading import Lock
  27. class TWS_gateway():
  28. # monitor IB connection / heart beat
  29. # ibh = None
  30. # tlock = None
  31. # ib_conn_status = None
  32. TWS_GW_DEFAULT_CONFIG = {
  33. 'name': 'tws_gateway_server',
  34. 'bootstrap_host': 'localhost',
  35. 'bootstrap_port': 9092,
  36. 'redis_host': 'localhost',
  37. 'redis_port': 6379,
  38. 'redis_db': 0,
  39. 'tws_host': 'localhost',
  40. 'tws_api_port': 8496,
  41. 'tws_app_id': 38888,
  42. 'group_id': 'TWS_GW',
  43. 'session_timeout_ms': 10000,
  44. 'clear_offsets': False,
  45. 'order_transmit': False,
  46. 'topics': list(TWS_Protocol.topicMethods),
  47. 'reset_db_subscriptions': False
  48. }
  49. def __init__(self, kwargs):
  50. temp_kwargs = copy.copy(kwargs)
  51. self.kwargs = copy.copy(TWS_gateway.TWS_GW_DEFAULT_CONFIG)
  52. for key in self.kwargs:
  53. if key in temp_kwargs:
  54. self.kwargs[key] = temp_kwargs.pop(key)
  55. self.kwargs.update(temp_kwargs)
  56. '''
  57. TWS_gateway start up sequence
  58. 1. establish redis connection
  59. 2. initialize prosumer instance - gateway message handler
  60. 3. establish TWS gateway connectivity
  61. 4. initialize listeners: ClientRequestHandler and SubscriptionManager
  62. 4a. start a bunch of services for the REST API
  63. 5. start the prosumer
  64. 6. run web console
  65. '''
  66. logging.info('starting up TWS_gateway...')
  67. self.ib_order_transmit = self.kwargs['order_transmit']
  68. logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit == True else 'False'))
  69. logging.info('establishing redis connection...')
  70. self.initialize_redis()
  71. logging.info('starting up gateway message handler - kafka Prosumer...')
  72. self.gw_message_handler = Prosumer(name='tws_gw_prosumer', kwargs=self.kwargs)
  73. logging.info('initializing TWS_event_handler...')
  74. self.tws_event_handler = TWS_event_handler(self.gw_message_handler)
  75. logging.info('starting up IB EClientSocket...')
  76. self.tws_connection = EClientSocket(self.tws_event_handler)
  77. logging.info('establishing TWS gateway connectivity...')
  78. if self.connect_tws() == False:
  79. logging.error('TWS_gateway: unable to establish connection to IB %s:%d' %
  80. (self.kwargs['tws_host'], self.kwargs['tws_api_port']))
  81. self.disconnect_tws()
  82. sys.exit(-1)
  83. else:
  84. # start heart beat monitor
  85. logging.info('starting up IB heart beat monitor...')
  86. self.tlock = Lock()
  87. self.ibh = IbHeartBeat(self.kwargs)
  88. self.ibh.register_listener([self.on_ib_conn_broken])
  89. self.ibh.run()
  90. logging.info('instantiating listeners...cli_req_handler')
  91. self.cli_req_handler = ClientRequestHandler('client_request_handler', self)
  92. logging.info('instantiating listeners subscription manager...')
  93. self.initialize_subscription_mgr()
  94. logging.info('registering messages to listen...')
  95. self.gw_message_handler.add_listeners([self.cli_req_handler])
  96. self.gw_message_handler.add_listener_topics(self.contract_subscription_mgr, self.kwargs['subscription_manager.topics'])
  97. logging.info('initialize order_id_manager and quote_handler for REST API...')
  98. self.initialize_order_quote_manager()
  99. logging.info('start TWS_event_handler. Start prosumer processing loop...')
  100. self.gw_message_handler.start_prosumer()
  101. logging.info('start web console...')
  102. self.start_web_console()
  103. logging.info('**** Completed initialization sequence. ****')
  104. self.main_loop()
  105. def initialize_subscription_mgr(self):
  106. self.contract_subscription_mgr = SubscriptionManager(self.kwargs['name'], self.tws_connection,
  107. self.gw_message_handler,
  108. self.get_redis_conn(), self.kwargs)
  109. self.tws_event_handler.set_subscription_manager(self.contract_subscription_mgr)
  110. '''
  111. this group of objects are for handling rest API requests
  112. '''
  113. def initialize_order_quote_manager(self):
  114. # self.order_id_mgr = OrderIdManager(self.tws_connection)
  115. # self.tws_event_handler.set_order_id_manager(self.order_id_mgr)
  116. # self.order_id_mgr.start()
  117. self.order_manager = OrderManager('order_manager', self, self.kwargs)
  118. self.order_manager.start_order_manager()
  119. self.quote_manager = QuoteRESTHandler('quote_manager', self)
  120. self.pos_manager = AccountPositionTracker('acctpos_manager', self)
  121. self.contract_info_manager = ContractHandler('contract_info_mgr', self)
  122. self.ws_manager = ApiSocketServer('api_ws_manager', self, self.kwargs['restapi.web_socket_host'], self.kwargs['restapi.web_socket_port'])
  123. t = threading.Thread(name='websocket server', target=self.ws_manager.run_forever)
  124. t.setDaemon(True)
  125. t.start()
  126. def initialize_redis(self):
  127. self.rs = redis.Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
  128. try:
  129. self.rs.client_list()
  130. except redis.ConnectionError:
  131. logging.error('TWS_gateway: unable to connect to redis server using these settings: %s port:%d db:%d' %
  132. (self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db']))
  133. logging.error('aborting...')
  134. sys.exit(-1)
  135. def start_web_console(self):
  136. def start_flask():
  137. w = WebConsole(self)
  138. # tell tws_event_handler that WebConsole
  139. # is interested to receive all messages
  140. for e in TWS_event_handler.PUBLISH_TWS_EVENTS:
  141. self.tws_event_handler.register(e, w)
  142. w.add_resource()
  143. w.app.run(host=self.kwargs['webconsole.host'], port=self.kwargs['webconsole.port'],
  144. debug=self.kwargs['webconsole.debug'], use_reloader=self.kwargs['webconsole.auto_reload'])
  145. t_webApp = threading.Thread(name='Web App', target=start_flask)
  146. t_webApp.setDaemon(True)
  147. t_webApp.start()
  148. def get_order_id_manager(self):
  149. return self.order_manager.get_order_id_mgr()
  150. def get_order_manager(self):
  151. return self.order_manager
  152. def get_tws_connection(self):
  153. return self.tws_connection
  154. def get_tws_event_handler(self):
  155. return self.tws_event_handler
  156. def get_subscription_manager(self):
  157. return self.contract_subscription_mgr
  158. def get_quote_manager(self):
  159. return self.quote_manager
  160. def get_pos_manager(self):
  161. return self.pos_manager
  162. def get_contract_info_manager(self):
  163. return self.contract_info_manager
  164. def get_ws_manager(self):
  165. return self.ws_manager
  166. def get_redis_conn(self):
  167. return self.rs
  168. def get_config(self):
  169. return self.kwargs
  170. def get_ib_conn_status(self):
  171. return self.ib_conn_status
  172. def connect_tws(self):
  173. if type(self.kwargs['tws_app_id']) <> int:
  174. logging.error('TWS_gateway:connect_tws. tws_app_id must be of int type but detected %s!' % str(type(kwargs['tws_app_id'])))
  175. sys.exit(-1)
  176. logging.info('TWS_gateway - eConnect. Connecting to %s:%d App Id: %d...' %
  177. (self.kwargs['tws_host'], self.kwargs['tws_api_port'], self.kwargs['tws_app_id']))
  178. self.tws_connection.eConnect(self.kwargs['tws_host'], self.kwargs['tws_api_port'], self.kwargs['tws_app_id'])
  179. return self.tws_connection.isConnected()
  180. def disconnect_tws(self, value=None):
  181. sleep(2)
  182. self.tws_connection.eDisconnect()
  183. def on_ib_conn_broken(self, msg):
  184. logging.error('TWS_gateway: detected broken IB connection!')
  185. self.ib_conn_status = 'ERROR'
  186. self.tlock.acquire() # this function may get called multiple times
  187. try: # block until another party finishes executing
  188. if self.ib_conn_status == 'OK': # check status
  189. return # if already fixed up while waiting, return
  190. #self.disconnect_tws()
  191. self.connect_tws()
  192. while not self.tws_connection.isConnected():
  193. logging.error('TWS_gateway: attempt to reconnect...')
  194. self.connect_tws()
  195. sleep(2)
  196. # we arrived here because the connection has been restored
  197. # resubscribe tickers again!
  198. self.ib_conn_status = 'OK'
  199. logging.info('TWS_gateway: IB connection restored...resubscribe contracts')
  200. self.contract_subscription_mgr.force_resubscription()
  201. finally:
  202. self.tlock.release()
  203. def persist_subscription_table(self):
  204. self.pcounter = (self.pcounter + 1) % 10
  205. if (self.pcounter >= 8):
  206. self.contract_subscription_mgr.persist_subscriptions()
  207. def shutdown_all(self):
  208. sleep(1)
  209. logging.info('shutdown_all sequence started....')
  210. self.gw_message_handler.set_stop()
  211. self.gw_message_handler.join()
  212. self.ibh.shutdown()
  213. self.menu_loop_done = True
  214. self.get_order_id_manager().set_stop()
  215. sys.exit(0)
  216. def post_shutdown(self):
  217. th = threading.Thread(target=self.shutdown_all)
  218. th.daemon = True
  219. th.start()
  220. def main_loop(self):
  221. def print_menu():
  222. menu = {}
  223. menu['1']="Dump subscription manager content to log"
  224. menu['2']=""
  225. menu['3']="Start up configuration"
  226. menu['4']=""
  227. menu['9']="Exit"
  228. choices=menu.keys()
  229. choices.sort()
  230. for entry in choices:
  231. print entry, menu[entry]
  232. def get_user_input(selection):
  233. logging.info('TWS_gateway:main_loop ***** accepting console input...')
  234. print_menu()
  235. while 1:
  236. resp = sys.stdin.readline()
  237. response[0] = resp.strip('\n')
  238. try:
  239. response = [None]
  240. user_input_th = threading.Thread(target=get_user_input, args=(response,))
  241. user_input_th.daemon = True
  242. user_input_th.start()
  243. self.pcounter = 0
  244. self.menu_loop_done = False
  245. while not self.menu_loop_done:
  246. sleep(.5)
  247. self.persist_subscription_table()
  248. if response[0] is not None:
  249. selection = response[0]
  250. if selection =='1':
  251. self.contract_subscription_mgr.dump()
  252. elif selection == '3':
  253. print '\n'.join('[%s]:%s' % (k,v) for k,v in self.kwargs.iteritems())
  254. elif selection == '9':
  255. self.shutdown_all()
  256. sys.exit(0)
  257. break
  258. else:
  259. pass
  260. response[0] = None
  261. print_menu()
  262. except (KeyboardInterrupt, SystemExit):
  263. logging.error('TWS_gateway: caught user interrupt. Shutting down...')
  264. self.shutdown_all()
  265. logging.info('TWS_gateway: Service shut down complete...')
  266. sys.exit(0)
  267. if __name__ == '__main__':
  268. usage = "usage: %prog [options]"
  269. parser = OptionParser(usage=usage)
  270. parser.add_option("-c", "--clear_offsets", action="store_true", dest="clear_offsets",
  271. help="delete all redis offsets used by this program")
  272. parser.add_option("-r", "--reset_db_subscriptions", action="store_true", dest="reset_db_subscriptions",
  273. help="delete subscriptions entries in redis used by this program")
  274. parser.add_option("-f", "--config_file",
  275. action="store", dest="config_file",
  276. help="path to the config file")
  277. (options, args) = parser.parse_args()
  278. kwargs = ConfigMap().kwargs_from_file(options.config_file)
  279. for option, value in options.__dict__.iteritems():
  280. if value <> None:
  281. kwargs[option] = value
  282. logconfig = kwargs['logconfig']
  283. logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
  284. logging.basicConfig(**logconfig)
  285. logging.getLogger().addFilter(LoggerNoBaseMessagingFilter())
  286. logging.info('config settings: %s' % kwargs)
  287. app = TWS_gateway(kwargs)