| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import sys
- import copy
- from time import sleep, strftime
- import logging
- import json
- from optparse import OptionParser
- from ib.ext.Contract import Contract
- from misc2.helpers import ContractHelper, ExecutionFilterHelper, OrderHelper, ConfigMap
- from comms.ibgw.base_messaging import Prosumer, BaseMessageListener
- from comms.ibc.base_client_messaging import GatewayCommandWrapper, AbstractGatewayListener
- from comms.tws_protocol_helper import TWS_Protocol
- from misc2.observer import NotImplementedException
- import redis
-
-
-
- class TWS_client_manager(GatewayCommandWrapper):
-
- TWS_CLI_DEFAULT_CONFIG = {
- 'name': 'tws_gateway_client',
- 'bootstrap_host': 'localhost',
- 'bootstrap_port': 9092,
- 'redis_host': 'localhost',
- 'redis_port': 6379,
- 'redis_db': 0,
- 'tws_host': 'localhost',
- 'tws_api_port': 8496,
- 'tws_app_id': 38868,
- 'group_id': 'TWS_CLI',
- 'session_timeout_ms': 10000,
- 'clear_offsets': False,
-
- 'topics': list(TWS_Protocol.topicEvents)
- }
-
-
-
- def __init__(self, kwargs):
-
- temp_kwargs = copy.copy(kwargs)
- self.kwargs = copy.copy(TWS_client_manager.TWS_CLI_DEFAULT_CONFIG)
- for key in self.kwargs:
- if key in temp_kwargs:
- self.kwargs[key] = temp_kwargs.pop(key)
- self.kwargs.update(temp_kwargs)
-
- '''
- TWS_client_manager start up sequence
-
- 1. establish redis connection
- 2. initialize prosumer instance - gateway message handler
-
- 4. initialize listeners:
- 5. start the prosumer
-
- '''
- logging.info('starting up TWS_client_manager...')
- logging.info('establishing redis connection...')
- self.initialize_redis()
-
- logging.info('starting up gateway message handler - kafka Prosumer...')
- self.gw_message_handler = Prosumer(name=self.kwargs['name'], kwargs=self.kwargs)
- GatewayCommandWrapper.__init__(self, self.gw_message_handler)
-
-
-
-
- logging.info('**** Completed initialization sequence. ****')
-
- def is_stopped(self):
- return self.gw_message_handler.is_stopped()
-
- def stop_manager(self):
- self.gw_message_handler.set_stop()
- def start_manager(self):
- logging.info('start gw_message_handler. Entering processing loop...')
- self.gw_message_handler.start_prosumer()
-
- def add_listener_topics(self, listener, topics):
- self.gw_message_handler.add_listener_topics(listener, topics)
- def initialize_redis(self):
- self.rs = redis.Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
- try:
- self.rs.client_list()
- except redis.ConnectionError:
- logging.error('TWS_client_manager: unable to connect to redis server using these settings: %s port:%d db:%d' %
- (self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db']))
- logging.error('aborting...')
- sys.exit(-1)
-
-
-
-
- class GatewayMessageListener(AbstractGatewayListener):
- def __init__(self, name):
- AbstractGatewayListener.__init__(self, name)
-
- #def tickPrice(self, event, message_value): # tickerId, field, price, canAutoExecute):
- def tickPrice(self, event, contract_key, field, price, canAutoExecute): # tickerId, field, price, canAutoExecute):
- logging.info('GatewayMessageListener:%s. val->[%s]' % (event, vars()))
- def tickSize(self, event, message_value): # tickerId, field, price, canAutoExecute):
- logging.info('GatewayMessageListener:%s. val->[%s]' % (event, message_value))
-
- def error(self, event, message_value):
- logging.info('GatewayMessageListener:%s. val->[%s]' % (event, message_value))
- def test_client(kwargs):
- contractTuples = [('HSI', 'FUT', 'HKFE', 'HKD', '20170330', 0, '')]#,
- #('USD', 'CASH', 'IDEALPRO', 'JPY', '', 0, ''),]
-
-
- print kwargs
- cm = TWS_client_manager(kwargs)
- cl = GatewayMessageListener('gw_client_message_listener')
-
- cm.add_listener_topics(cl, kwargs['topics'])
- cm.start_manager()
- map(lambda c: cm.reqMktData(ContractHelper.makeContract(c)), contractTuples)
- try:
- logging.info('TWS_gateway:main_loop ***** accepting console input...')
- while True:
-
- sleep(.45)
-
- except (KeyboardInterrupt, SystemExit):
- logging.error('TWS_client_manager: caught user interrupt. Shutting down...')
- cm.gw_message_handler.set_stop()
-
- logging.info('TWS_client_manager: Service shut down complete...')
-
-
- if __name__ == '__main__':
-
- usage = "usage: %prog [options]"
- parser = OptionParser(usage=usage)
- parser.add_option("-c", "--clear_offsets", action="store_true", dest="clear_offsets",
- help="delete all redis offsets used by this program")
- parser.add_option("-f", "--config_file",
- action="store", dest="config_file",
- help="path to the config file")
-
- (options, args) = parser.parse_args()
-
- kwargs = ConfigMap().kwargs_from_file(options.config_file)
- for option, value in options.__dict__.iteritems():
-
- if value <> None:
- kwargs[option] = value
- logconfig = kwargs['logconfig']
- logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
- logging.basicConfig(**logconfig)
- logging.debug('config settings: %s' % kwargs)
-
-
- test_client(kwargs)
-
-
|