| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import sys
- import copy
- from time import sleep, strftime
- import logging
- import json
- from ib.ext.Contract import Contract
- from ib.ext.EClientSocket import EClientSocket
- from misc2.helpers import ContractHelper, ConfigMap, LoggerNoBaseMessagingFilter
- from optparse import OptionParser
- from comms.ibgw.base_messaging import Prosumer
- from comms.ibgw.tws_event_handler import TWS_event_handler
- from comms.ibgw.ib_heartbeat import IbHeartBeat
- from comms.ibgw.client_request_handler import ClientRequestHandler
- from comms.ibgw.subscription_manager import SubscriptionManager
- from comms.tws_protocol_helper import TWS_Protocol
- from comms.ibgw.tws_gateway_restapi import WebConsole
- from comms.ibgw.order_manager import OrderManager
- from ormdapi.v2.quote_handler import QuoteRESTHandler
- from ormdapi.v2.position_handler import AccountPositionTracker
- from ormdapi.v2.contract_handler import ContractHandler
- from ormdapi.v2.ws.ws_api_server import ApiSocketServer
- import redis
- import threading
- from threading import Lock
-
-
-
- class TWS_gateway():
-
- # monitor IB connection / heart beat
- # ibh = None
- # tlock = None
- # ib_conn_status = None
- TWS_GW_DEFAULT_CONFIG = {
- 'name': 'tws_gateway_server',
- 'bootstrap_host': 'localhost',
- 'bootstrap_port': 9092,
- 'redis_host': 'localhost',
- 'redis_port': 6379,
- 'redis_db': 0,
- 'tws_host': 'localhost',
- 'tws_api_port': 8496,
- 'tws_app_id': 38888,
- 'group_id': 'TWS_GW',
- 'session_timeout_ms': 10000,
- 'clear_offsets': False,
- 'order_transmit': False,
- 'topics': list(TWS_Protocol.topicMethods),
- 'reset_db_subscriptions': False
- }
-
-
- def __init__(self, kwargs):
-
-
-
- temp_kwargs = copy.copy(kwargs)
- self.kwargs = copy.copy(TWS_gateway.TWS_GW_DEFAULT_CONFIG)
- for key in self.kwargs:
- if key in temp_kwargs:
- self.kwargs[key] = temp_kwargs.pop(key)
- self.kwargs.update(temp_kwargs)
- '''
- TWS_gateway start up sequence
-
- 1. establish redis connection
- 2. initialize prosumer instance - gateway message handler
- 3. establish TWS gateway connectivity
-
- 4. initialize listeners: ClientRequestHandler and SubscriptionManager
- 4a. start a bunch of services for the REST API
- 5. start the prosumer
- 6. run web console
-
- '''
- logging.info('starting up TWS_gateway...')
- self.ib_order_transmit = self.kwargs['order_transmit']
- logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit == True else 'False'))
-
-
- logging.info('establishing redis connection...')
- self.initialize_redis()
-
- logging.info('starting up gateway message handler - kafka Prosumer...')
- self.gw_message_handler = Prosumer(name='tws_gw_prosumer', kwargs=self.kwargs)
-
-
- logging.info('initializing TWS_event_handler...')
- self.tws_event_handler = TWS_event_handler(self.gw_message_handler)
-
- logging.info('starting up IB EClientSocket...')
-
-
- self.tws_connection = EClientSocket(self.tws_event_handler)
-
-
-
- logging.info('establishing TWS gateway connectivity...')
- if self.connect_tws() == False:
- logging.error('TWS_gateway: unable to establish connection to IB %s:%d' %
- (self.kwargs['tws_host'], self.kwargs['tws_api_port']))
- self.disconnect_tws()
- sys.exit(-1)
- else:
- # start heart beat monitor
- logging.info('starting up IB heart beat monitor...')
- self.tlock = Lock()
- self.ibh = IbHeartBeat(self.kwargs)
- self.ibh.register_listener([self.on_ib_conn_broken])
- self.ibh.run()
- logging.info('instantiating listeners...cli_req_handler')
- self.cli_req_handler = ClientRequestHandler('client_request_handler', self)
- logging.info('instantiating listeners subscription manager...')
- self.initialize_subscription_mgr()
- logging.info('registering messages to listen...')
- self.gw_message_handler.add_listeners([self.cli_req_handler])
- self.gw_message_handler.add_listener_topics(self.contract_subscription_mgr, self.kwargs['subscription_manager.topics'])
- logging.info('initialize order_id_manager and quote_handler for REST API...')
- self.initialize_order_quote_manager()
-
- logging.info('start TWS_event_handler. Start prosumer processing loop...')
- self.gw_message_handler.start_prosumer()
- logging.info('start web console...')
- self.start_web_console()
- logging.info('**** Completed initialization sequence. ****')
-
- self.main_loop()
-
- def initialize_subscription_mgr(self):
-
-
- self.contract_subscription_mgr = SubscriptionManager(self.kwargs['name'], self.tws_connection,
- self.gw_message_handler,
- self.get_redis_conn(), self.kwargs)
-
-
- self.tws_event_handler.set_subscription_manager(self.contract_subscription_mgr)
- '''
- this group of objects are for handling rest API requests
- '''
- def initialize_order_quote_manager(self):
- # self.order_id_mgr = OrderIdManager(self.tws_connection)
- # self.tws_event_handler.set_order_id_manager(self.order_id_mgr)
- # self.order_id_mgr.start()
- self.order_manager = OrderManager('order_manager', self, self.kwargs)
- self.order_manager.start_order_manager()
- self.quote_manager = QuoteRESTHandler('quote_manager', self)
- self.pos_manager = AccountPositionTracker('acctpos_manager', self)
- self.contract_info_manager = ContractHandler('contract_info_mgr', self)
- self.ws_manager = ApiSocketServer('api_ws_manager', self, self.kwargs['restapi.web_socket_host'], self.kwargs['restapi.web_socket_port'])
- t = threading.Thread(name='websocket server', target=self.ws_manager.run_forever)
- t.setDaemon(True)
- t.start()
-
- def initialize_redis(self):
- self.rs = redis.Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
- try:
- self.rs.client_list()
- except redis.ConnectionError:
- logging.error('TWS_gateway: unable to connect to redis server using these settings: %s port:%d db:%d' %
- (self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db']))
- logging.error('aborting...')
- sys.exit(-1)
-
-
- def start_web_console(self):
-
- def start_flask():
- w = WebConsole(self)
-
- # tell tws_event_handler that WebConsole
- # is interested to receive all messages
- for e in TWS_event_handler.PUBLISH_TWS_EVENTS:
- self.tws_event_handler.register(e, w)
-
- w.add_resource()
- w.app.run(host=self.kwargs['webconsole.host'], port=self.kwargs['webconsole.port'],
- debug=self.kwargs['webconsole.debug'], use_reloader=self.kwargs['webconsole.auto_reload'])
-
- t_webApp = threading.Thread(name='Web App', target=start_flask)
- t_webApp.setDaemon(True)
- t_webApp.start()
-
-
- def get_order_id_manager(self):
- return self.order_manager.get_order_id_mgr()
- def get_order_manager(self):
- return self.order_manager
-
- def get_tws_connection(self):
- return self.tws_connection
-
- def get_tws_event_handler(self):
- return self.tws_event_handler
-
- def get_subscription_manager(self):
- return self.contract_subscription_mgr
-
- def get_quote_manager(self):
- return self.quote_manager
-
- def get_pos_manager(self):
- return self.pos_manager
-
- def get_contract_info_manager(self):
- return self.contract_info_manager
-
- def get_ws_manager(self):
- return self.ws_manager
-
- def get_redis_conn(self):
- return self.rs
- def get_config(self):
- return self.kwargs
-
- def get_ib_conn_status(self):
- return self.ib_conn_status
- def connect_tws(self):
- if type(self.kwargs['tws_app_id']) <> int:
- logging.error('TWS_gateway:connect_tws. tws_app_id must be of int type but detected %s!' % str(type(kwargs['tws_app_id'])))
- sys.exit(-1)
-
- logging.info('TWS_gateway - eConnect. Connecting to %s:%d App Id: %d...' %
- (self.kwargs['tws_host'], self.kwargs['tws_api_port'], self.kwargs['tws_app_id']))
- self.tws_connection.eConnect(self.kwargs['tws_host'], self.kwargs['tws_api_port'], self.kwargs['tws_app_id'])
-
- return self.tws_connection.isConnected()
- def disconnect_tws(self, value=None):
- sleep(2)
- self.tws_connection.eDisconnect()
- def on_ib_conn_broken(self, msg):
- logging.error('TWS_gateway: detected broken IB connection!')
- self.ib_conn_status = 'ERROR'
- self.tlock.acquire() # this function may get called multiple times
- try: # block until another party finishes executing
- if self.ib_conn_status == 'OK': # check status
- return # if already fixed up while waiting, return
-
- #self.disconnect_tws()
- self.connect_tws()
- while not self.tws_connection.isConnected():
- logging.error('TWS_gateway: attempt to reconnect...')
- self.connect_tws()
- sleep(2)
-
- # we arrived here because the connection has been restored
- # resubscribe tickers again!
- self.ib_conn_status = 'OK'
- logging.info('TWS_gateway: IB connection restored...resubscribe contracts')
- self.contract_subscription_mgr.force_resubscription()
-
-
- finally:
- self.tlock.release()
-
-
-
- def persist_subscription_table(self):
- self.pcounter = (self.pcounter + 1) % 10
- if (self.pcounter >= 8):
- self.contract_subscription_mgr.persist_subscriptions()
-
-
- def shutdown_all(self):
- sleep(1)
- logging.info('shutdown_all sequence started....')
- self.gw_message_handler.set_stop()
- self.gw_message_handler.join()
- self.ibh.shutdown()
- self.menu_loop_done = True
- self.get_order_id_manager().set_stop()
-
- sys.exit(0)
-
- def post_shutdown(self):
- th = threading.Thread(target=self.shutdown_all)
- th.daemon = True
- th.start()
-
- def main_loop(self):
- def print_menu():
- menu = {}
- menu['1']="Dump subscription manager content to log"
- menu['2']=""
- menu['3']="Start up configuration"
- menu['4']=""
- menu['9']="Exit"
-
- choices=menu.keys()
- choices.sort()
- for entry in choices:
- print entry, menu[entry]
-
- def get_user_input(selection):
- logging.info('TWS_gateway:main_loop ***** accepting console input...')
- print_menu()
- while 1:
- resp = sys.stdin.readline()
- response[0] = resp.strip('\n')
- try:
-
- response = [None]
- user_input_th = threading.Thread(target=get_user_input, args=(response,))
- user_input_th.daemon = True
- user_input_th.start()
- self.pcounter = 0
- self.menu_loop_done = False
- while not self.menu_loop_done:
-
- sleep(.5)
- self.persist_subscription_table()
- if response[0] is not None:
- selection = response[0]
- if selection =='1':
- self.contract_subscription_mgr.dump()
- elif selection == '3':
- print '\n'.join('[%s]:%s' % (k,v) for k,v in self.kwargs.iteritems())
- elif selection == '9':
- self.shutdown_all()
- sys.exit(0)
- break
- else:
- pass
- response[0] = None
- print_menu()
-
- except (KeyboardInterrupt, SystemExit):
- logging.error('TWS_gateway: caught user interrupt. Shutting down...')
- self.shutdown_all()
- logging.info('TWS_gateway: Service shut down complete...')
- sys.exit(0)
-
-
-
- if __name__ == '__main__':
- usage = "usage: %prog [options]"
- parser = OptionParser(usage=usage)
- parser.add_option("-c", "--clear_offsets", action="store_true", dest="clear_offsets",
- help="delete all redis offsets used by this program")
- parser.add_option("-r", "--reset_db_subscriptions", action="store_true", dest="reset_db_subscriptions",
- help="delete subscriptions entries in redis used by this program")
-
- parser.add_option("-f", "--config_file",
- action="store", dest="config_file",
- help="path to the config file")
-
- (options, args) = parser.parse_args()
-
-
- kwargs = ConfigMap().kwargs_from_file(options.config_file)
- for option, value in options.__dict__.iteritems():
-
- if value <> None:
- kwargs[option] = value
- logconfig = kwargs['logconfig']
- logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
- logging.basicConfig(**logconfig)
- logging.getLogger().addFilter(LoggerNoBaseMessagingFilter())
- logging.info('config settings: %s' % kwargs)
-
- app = TWS_gateway(kwargs)
-
-
|