tws_client_lib.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. import copy
  5. from time import sleep, strftime
  6. import logging
  7. import json
  8. from optparse import OptionParser
  9. from ib.ext.Contract import Contract
  10. from misc2.helpers import ContractHelper, ExecutionFilterHelper, OrderHelper, ConfigMap
  11. from comms.ibgw.base_messaging import Prosumer, BaseMessageListener
  12. from comms.ibc.base_client_messaging import GatewayCommandWrapper, AbstractGatewayListener
  13. from comms.tws_protocol_helper import TWS_Protocol
  14. from misc2.observer import NotImplementedException
  15. import redis
  16. class TWS_client_manager(GatewayCommandWrapper):
  17. TWS_CLI_DEFAULT_CONFIG = {
  18. 'name': 'tws_gateway_client',
  19. 'bootstrap_host': 'localhost',
  20. 'bootstrap_port': 9092,
  21. 'redis_host': 'localhost',
  22. 'redis_port': 6379,
  23. 'redis_db': 0,
  24. 'tws_host': 'localhost',
  25. 'tws_api_port': 8496,
  26. 'tws_app_id': 38868,
  27. 'group_id': 'TWS_CLI',
  28. 'session_timeout_ms': 10000,
  29. 'clear_offsets': False,
  30. 'topics': list(TWS_Protocol.topicEvents)
  31. }
  32. def __init__(self, kwargs):
  33. temp_kwargs = copy.copy(kwargs)
  34. self.kwargs = copy.copy(TWS_client_manager.TWS_CLI_DEFAULT_CONFIG)
  35. for key in self.kwargs:
  36. if key in temp_kwargs:
  37. self.kwargs[key] = temp_kwargs.pop(key)
  38. self.kwargs.update(temp_kwargs)
  39. '''
  40. TWS_client_manager start up sequence
  41. 1. establish redis connection
  42. 2. initialize prosumer instance - gateway message handler
  43. 4. initialize listeners:
  44. 5. start the prosumer
  45. '''
  46. logging.info('starting up TWS_client_manager...')
  47. logging.info('establishing redis connection...')
  48. self.initialize_redis()
  49. logging.info('starting up gateway message handler - kafka Prosumer...')
  50. self.gw_message_handler = Prosumer(name=self.kwargs['name'], kwargs=self.kwargs)
  51. GatewayCommandWrapper.__init__(self, self.gw_message_handler)
  52. logging.info('**** Completed initialization sequence. ****')
  53. def is_stopped(self):
  54. return self.gw_message_handler.is_stopped()
  55. def stop_manager(self):
  56. self.gw_message_handler.set_stop()
  57. def start_manager(self):
  58. logging.info('start gw_message_handler. Entering processing loop...')
  59. self.gw_message_handler.start_prosumer()
  60. def add_listener_topics(self, listener, topics):
  61. self.gw_message_handler.add_listener_topics(listener, topics)
  62. def initialize_redis(self):
  63. self.rs = redis.Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
  64. try:
  65. self.rs.client_list()
  66. except redis.ConnectionError:
  67. logging.error('TWS_client_manager: unable to connect to redis server using these settings: %s port:%d db:%d' %
  68. (self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db']))
  69. logging.error('aborting...')
  70. sys.exit(-1)
  71. class GatewayMessageListener(AbstractGatewayListener):
  72. def __init__(self, name):
  73. AbstractGatewayListener.__init__(self, name)
  74. #def tickPrice(self, event, message_value): # tickerId, field, price, canAutoExecute):
  75. def tickPrice(self, event, contract_key, field, price, canAutoExecute): # tickerId, field, price, canAutoExecute):
  76. logging.info('GatewayMessageListener:%s. val->[%s]' % (event, vars()))
  77. def tickSize(self, event, message_value): # tickerId, field, price, canAutoExecute):
  78. logging.info('GatewayMessageListener:%s. val->[%s]' % (event, message_value))
  79. def error(self, event, message_value):
  80. logging.info('GatewayMessageListener:%s. val->[%s]' % (event, message_value))
  81. def test_client(kwargs):
  82. contractTuples = [('HSI', 'FUT', 'HKFE', 'HKD', '20170330', 0, '')]#,
  83. #('USD', 'CASH', 'IDEALPRO', 'JPY', '', 0, ''),]
  84. print kwargs
  85. cm = TWS_client_manager(kwargs)
  86. cl = GatewayMessageListener('gw_client_message_listener')
  87. cm.add_listener_topics(cl, kwargs['topics'])
  88. cm.start_manager()
  89. map(lambda c: cm.reqMktData(ContractHelper.makeContract(c)), contractTuples)
  90. try:
  91. logging.info('TWS_gateway:main_loop ***** accepting console input...')
  92. while True:
  93. sleep(.45)
  94. except (KeyboardInterrupt, SystemExit):
  95. logging.error('TWS_client_manager: caught user interrupt. Shutting down...')
  96. cm.gw_message_handler.set_stop()
  97. logging.info('TWS_client_manager: Service shut down complete...')
  98. if __name__ == '__main__':
  99. usage = "usage: %prog [options]"
  100. parser = OptionParser(usage=usage)
  101. parser.add_option("-c", "--clear_offsets", action="store_true", dest="clear_offsets",
  102. help="delete all redis offsets used by this program")
  103. parser.add_option("-f", "--config_file",
  104. action="store", dest="config_file",
  105. help="path to the config file")
  106. (options, args) = parser.parse_args()
  107. kwargs = ConfigMap().kwargs_from_file(options.config_file)
  108. for option, value in options.__dict__.iteritems():
  109. if value <> None:
  110. kwargs[option] = value
  111. logconfig = kwargs['logconfig']
  112. logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
  113. logging.basicConfig(**logconfig)
  114. logging.debug('config settings: %s' % kwargs)
  115. test_client(kwargs)