tws_gateway.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##
  4. # This script is an exmple of using the generated code within IbPy in
  5. # the same manner as the Java code. We subclass EWrapper and give an
  6. # instance of the wrapper to an EClientSocket.
  7. ##
  8. import sys
  9. from time import sleep, strftime
  10. import time, datetime
  11. import ConfigParser
  12. from optparse import OptionParser
  13. import logging
  14. import thread
  15. import threading
  16. import traceback
  17. import json
  18. from threading import Lock
  19. from ib.ext.Contract import Contract
  20. from ib.ext.EWrapper import EWrapper
  21. from ib.ext.EClientSocket import EClientSocket
  22. from ib.ext.ExecutionFilter import ExecutionFilter
  23. from ib.ext.Execution import Execution
  24. from ib.ext.OrderState import OrderState
  25. from ib.ext.Order import Order
  26. from kafka.client import KafkaClient
  27. from kafka import KafkaConsumer
  28. from kafka.producer import SimpleProducer
  29. from kafka.common import LeaderNotAvailableError
  30. from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
  31. from comms.ib_heartbeat import IbHeartBeat
  32. from tws_protocol_helper import TWS_Protocol
  33. import redis
  34. class TWS_event_handler(EWrapper):
  35. TICKER_GAP = 1000
  36. producer = None
  37. def __init__(self, host, port):
  38. client = KafkaClient('%s:%s' % (host, port))
  39. self.producer = SimpleProducer(client, async=False)
  40. logging.info('TWS_event_handler: __init__ Creating kafka client producer at %s:%s' % (host, port))
  41. def serialize_vars_to_dict(self, message, mapping, source='IB'):
  42. def create_kmessage(items):
  43. d = {}
  44. for k,v in items:
  45. #print k, v, type(v)
  46. #if type(v) in [Contract, Execution, ExecutionFilter, OrderState, Order, CommissionReport]:
  47. if 'ib.ext.' in str(type(v)):
  48. d[k] = v.__dict__
  49. else:
  50. d[k] = v
  51. d['ts'] = time.time()
  52. d['typeName'] = message
  53. d['source'] = source
  54. return d
  55. try:
  56. del(mapping['self'])
  57. except (KeyError, ):
  58. pass
  59. items = list(mapping.items())
  60. return create_kmessage(items)
  61. def broadcast_event(self, message, mapping, source='IB'):
  62. try:
  63. dict = self.serialize_vars_to_dict(message, mapping, source)
  64. if message == 'gw_subscriptions':
  65. logging.info('TWS_event_handler: broadcast event: %s [%s]' % (dict['typeName'], dict))
  66. self.producer.send_messages(message, json.dumps(dict))
  67. except:
  68. logging.error('broadcast_event: exception while encoding IB event to client: [%s]' % message)
  69. logging.error(traceback.format_exc())
  70. #
  71. # try to broadcast the message a 2nd time
  72. # no catch if fails again
  73. if message == 'gw_subscriptions':
  74. sleep(2)
  75. logging.info('TWS_event_handler: Retry once broadcasting gw_subscription %s [%s]' % (dict['typeName'], dict))
  76. self.producer.send_messages(message, json.dumps(dict))
  77. def tick_process_message(self, items):
  78. t = {}
  79. t = items.copy()
  80. # if the tickerId is in the snapshot range
  81. # deduct the gap to derive the original tickerId
  82. # --- check logic in subscription manager
  83. if (t['tickerId'] >= TWS_event_handler.TICKER_GAP):
  84. t['tickerId'] = t['tickerId'] - TWS_event_handler.TICKER_GAP
  85. try:
  86. del(t['self'])
  87. except (KeyError, ):
  88. pass
  89. return t
  90. def tickPrice(self, tickerId, field, price, canAutoExecute):
  91. self.broadcast_event('tickPrice', self.tick_process_message(vars()))
  92. def tickSize(self, tickerId, field, size):
  93. self.broadcast_event('tickSize', self.tick_process_message(vars())) #vars())
  94. def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
  95. #self.broadcast_event('tickOptionComputation', self.tick_process_message(vars())) #vars())
  96. pass
  97. def tickGeneric(self, tickerId, tickType, value):
  98. #self.broadcast_event('tickGeneric', vars())
  99. self.broadcast_event('tickGeneric', self.tick_process_message(vars())) #vars())
  100. def tickString(self, tickerId, tickType, value):
  101. #self.broadcast_event('tickString', vars())
  102. self.broadcast_event('tickString', self.tick_process_message(vars())) #vars())
  103. def tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
  104. self.broadcast_event('tickEFP', vars())
  105. def orderStatus(self, orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeId):
  106. self.broadcast_event('orderStatus', vars())
  107. def openOrder(self, orderId, contract, order, state):
  108. self.broadcast_event('openOrder', vars())
  109. def openOrderEnd(self):
  110. self.broadcast_event('openOrderEnd', vars())
  111. def updateAccountValue(self, key, value, currency, accountName):
  112. self.broadcast_event('updateAccountValue', vars())
  113. def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
  114. self.broadcast_event('updatePortfolio', vars())
  115. def updateAccountTime(self, timeStamp):
  116. self.broadcast_event('updateAccountTime', vars())
  117. def accountDownloadEnd(self, accountName):
  118. self.broadcast_event('accountDownloadEnd', vars())
  119. def nextValidId(self, orderId):
  120. self.broadcast_event('nextValidId', vars())
  121. def contractDetails(self, reqId, contractDetails):
  122. self.broadcast_event('contractDetails', vars())
  123. def contractDetailsEnd(self, reqId):
  124. self.broadcast_event('contractDetailsEnd', vars())
  125. def bondContractDetails(self, reqId, contractDetails):
  126. self.broadcast_event('bondContractDetails', vars())
  127. def execDetails(self, reqId, contract, execution):
  128. self.broadcast_event('execDetails', vars())
  129. def execDetailsEnd(self, reqId):
  130. self.broadcast_event('execDetailsEnd', vars())
  131. def connectionClosed(self):
  132. self.broadcast_event('connectionClosed', {})
  133. def error(self, id=None, errorCode=None, errorMsg=None):
  134. logging.error(self.serialize_vars_to_dict('error', vars()))
  135. self.broadcast_event('error', vars())
  136. def error_0(self, strvalue=None):
  137. logging.error(self.serialize_vars_to_dict('error_0', vars()))
  138. self.broadcast_event('error_0', vars())
  139. def error_1(self, id=None, errorCode=None, errorMsg=None):
  140. logging.error(self.serialize_vars_to_dict('error_1', vars()))
  141. self.broadcast_event('error_1', vars())
  142. def updateMktDepth(self, tickerId, position, operation, side, price, size):
  143. self.broadcast_event('updateMktDepth', vars())
  144. def updateMktDepthL2(self, tickerId, position, marketMaker, operation, side, price, size):
  145. self.broadcast_event('updateMktDepthL2', vars())
  146. def updateNewsBulletin(self, msgId, msgType, message, origExchange):
  147. self.broadcast_event('updateNewsBulletin', vars())
  148. def managedAccounts(self, accountsList):
  149. logging.info(self.serialize_vars_to_dict('managedAccounts', vars()))
  150. self.broadcast_event('managedAccounts', vars())
  151. def receiveFA(self, faDataType, xml):
  152. self.broadcast_event('receiveFA', vars())
  153. def historicalData(self, reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
  154. self.broadcast_event('historicalData', vars())
  155. def scannerParameters(self, xml):
  156. self.broadcast_event('scannerParameters', vars())
  157. def scannerData(self, reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
  158. self.broadcast_event('scannerData', vars())
  159. def commissionReport(self, commissionReport):
  160. self.broadcast_event('commissionReport', vars())
  161. def currentTime(self, time):
  162. self.broadcast_event('currentTime', vars())
  163. def deltaNeutralValidation(self, reqId, underComp):
  164. self.broadcast_event('deltaNeutralValidation', vars())
  165. def fundamentalData(self, reqId, data):
  166. self.broadcast_event('fundamentalData', vars())
  167. def marketDataType(self, reqId, marketDataType):
  168. self.broadcast_event('marketDataType', vars())
  169. def realtimeBar(self, reqId, time, open, high, low, close, volume, wap, count):
  170. self.broadcast_event('realtimeBar', vars())
  171. def scannerDataEnd(self, reqId):
  172. self.broadcast_event('scannerDataEnd', vars())
  173. def tickSnapshotEnd(self, reqId):
  174. self.broadcast_event('tickSnapshotEnd', vars())
  175. def position(self, account, contract, pos, avgCost):
  176. self.broadcast_event('position', vars())
  177. def positionEnd(self):
  178. self.broadcast_event('positionEnd', vars())
  179. def accountSummary(self, reqId, account, tag, value, currency):
  180. self.broadcast_event('accountSummary', vars())
  181. def accountSummaryEnd(self, reqId):
  182. self.broadcast_event('accountSummaryEnd', vars())
  183. class TWS_gateway(threading.Thread):
  184. # config
  185. config = None
  186. # redis connection
  187. rs = None
  188. # channel clients' requests to IB/TWS
  189. cli_request_handler = None
  190. # manage conID / contracts mapping
  191. contract_subscription_mgr = None
  192. connection = None
  193. # handler to process incoming IB/TWS messages and echo back to clients
  194. tws_event_handler = None
  195. # monitor IB connection / heart beat
  196. ibh = None
  197. tlock = None
  198. ib_conn_status = None
  199. ib_order_transmit = False
  200. def __init__(self, host, port, clientId, kafka_host, kafka_port, config):
  201. super(TWS_gateway, self).__init__()
  202. self.config = config
  203. self.host = host
  204. self.port = port
  205. self.clientId = clientId
  206. self.ib_order_transmit = config.get("tws_gateway", "tws_gateway.order_transmit").strip('"').strip("'") if \
  207. config.get("tws_gateway", "tws_gateway.order_transmit").strip('"').strip("'") <> None\
  208. else False
  209. logging.info('starting up TWS_gateway...')
  210. logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit == True else 'False'))
  211. logging.info('connecting to Redis server...')
  212. self.initialize_redis(config)
  213. logging.info('starting up TWS_event_handler...')
  214. self.tws_event_handler = TWS_event_handler(kafka_host, kafka_port)
  215. logging.info('starting up IB EClientSocket...')
  216. self.connection = EClientSocket(self.tws_event_handler)
  217. logging.info('starting up client request handler - kafkaConsumer...')
  218. self.cli_request_handler = KafkaConsumer( *[(v,0) for v in list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods) ], \
  219. metadata_broker_list=['%s:%s' % (kafka_host, kafka_port)],\
  220. group_id = 'epc.tws_gateway',\
  221. auto_commit_enable=True,\
  222. auto_commit_interval_ms=30 * 1000,\
  223. auto_offset_reset='largest') # discard old ones
  224. self.reset_message_offset()
  225. if not self.eConnect():
  226. logging.error('TWS_gateway: unable to establish connection to IB %s:%d' % (self.host, self.port))
  227. sys.exit(-1)
  228. else:
  229. # start heart beat monitor
  230. logging.info('starting up IB heart beat monitor...')
  231. self.tlock = Lock()
  232. self.ibh = IbHeartBeat(config)
  233. self.ibh.register_listener([self.on_ib_conn_broken])
  234. self.ibh.run()
  235. logging.info('starting up subscription manager...')
  236. self.initialize_subscription_mgr()
  237. def initialize_subscription_mgr(self):
  238. self.contract_subscription_mgr = SubscriptionManager(self)
  239. self.contract_subscription_mgr.register_persistence_callback(self.persist_subscriptions)
  240. key = self.config.get("tws_gateway", "subscription_manager.subscriptions.redis_key").strip('"').strip("'")
  241. if self.rs.get(key):
  242. #contracts = map(lambda x: ContractHelper.kvstring2contract(x), json.loads(self.rs.get(key)))
  243. def is_outstanding(c):
  244. today = time.strftime('%Y%m%d')
  245. if c.m_expiry < today:
  246. logging.info('initialize_subscription_mgr: ignoring expired contract %s%s%s' % (c.m_expiry, c.m_strike, c.m_right))
  247. return False
  248. return True
  249. contracts = filter(lambda x: is_outstanding(x),
  250. map(lambda x: ContractHelper.kvstring2object(x, Contract), json.loads(self.rs.get(key))))
  251. self.contract_subscription_mgr.load_subscription(contracts)
  252. def persist_subscriptions(self, contracts):
  253. key = self.config.get("tws_gateway", "subscription_manager.subscriptions.redis_key").strip('"').strip("'")
  254. #cs = json.dumps(map(lambda x: ContractHelper.contract2kvstring(x) if x <> None else None, contracts))
  255. cs = json.dumps(map(lambda x: ContractHelper.object2kvstring(x) if x <> None else None, contracts))
  256. logging.debug('Tws_gateway: updating subscription table to redis store %s' % cs)
  257. self.rs.set(key, cs)
  258. def initialize_redis(self, config):
  259. r_host = config.get("redis", "redis.server").strip('"').strip("'")
  260. r_port = config.get("redis", "redis.port")
  261. r_db = config.get("redis", "redis.db")
  262. self.rs = redis.Redis(r_host, r_port, r_db)
  263. try:
  264. self.rs.client_list()
  265. except redis.ConnectionError:
  266. logging.error('TWS_gateway: unable to connect to redis server using these settings: %s port:%d db:%d' % (r_host, r_port, r_db))
  267. logging.error('aborting...')
  268. sys.exit(-1)
  269. def reset_message_offset(self):
  270. topic_offsets = map(lambda topic: (topic, self.cli_request_handler.get_partition_offsets(topic, 0, -1, 999)), TWS_Protocol.topicMethods + TWS_Protocol.gatewayMethods)
  271. topic_offsets = filter(lambda x: x <> None, map(lambda x: (x[0], x[1][1], x[1][0]) if len(x[1]) > 1 else None, topic_offsets))
  272. logging.info('TWS_gateway set topic offset to the latest point\n%s' % (''.join('%s,%s,%s\n' % (x[0], x[1], x[2]) for x in topic_offsets)))
  273. # the set_topic_partitions method clears out all previous settings when executed
  274. # therefore it's not possible to call the function multiple times:
  275. # self.consumer.set_topic_partitions(('gw_subscriptions', 0, 114,)
  276. # self.consumer.set_topic_partitions(('tickPrice', 0, 27270,))
  277. # as the second call will wipe out whatever was done previously
  278. self.cli_request_handler.set_topic_partitions(*topic_offsets)
  279. def run(self):
  280. for message in self.cli_request_handler:
  281. logging.info("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
  282. message.offset, message.key,
  283. message.value))
  284. # print ("TWS_gateway: received client request %s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
  285. # message.offset, message.key,
  286. # message.value))
  287. getattr(self, message.topic, None)(message.value)
  288. #self.cli_request_handler.task_done(message)
  289. def on_ib_conn_broken(self, msg):
  290. logging.error('TWS_gateway: detected broken IB connection!')
  291. self.ib_conn_status = 'ERROR'
  292. self.tlock.acquire() # this function may get called multiple times
  293. try: # block until another party finishes executing
  294. if self.ib_conn_status == 'OK': # check status
  295. return # if already fixed up while waiting, return
  296. self.eDisconnect()
  297. self.eConnect()
  298. while not self.connection.isConnected():
  299. logging.error('TWS_gateway: attempt to reconnect...')
  300. self.eConnect()
  301. sleep(2)
  302. # we arrived here because the connection has been restored
  303. # resubscribe tickers again!
  304. logging.info('TWS_gateway: IB connection restored...resubscribe contracts')
  305. self.contract_subscription_mgr.force_resubscription()
  306. finally:
  307. self.tlock.release()
  308. def eConnect(self):
  309. logging.info('TWS_gateway - eConnect. Connecting to %s:%s App Id: %s' % (self.host, self.port, self.clientId))
  310. self.connection.eConnect(self.host, self.port, self.clientId)
  311. return self.connection.isConnected()
  312. def reqAccountUpdates(self, value=None):
  313. logging.info('TWS_gateway - reqAccountUpdates value=%s' % value)
  314. self.connection.reqAccountUpdates(1, '')
  315. def reqAccountSummary(self, value):
  316. logging.info('TWS_gateway - reqAccountSummary value=%s' % value)
  317. vals = map(lambda x: x.encode('ascii') if isinstance(x, unicode) else x, json.loads(value))
  318. self.connection.reqAccountSummary(vals[0], vals[1], vals[2])
  319. def reqOpenOrders(self, value=None):
  320. self.connection.reqOpenOrders()
  321. def reqPositions(self, value=None):
  322. self.connection.reqPositions()
  323. def reqExecutions(self, value):
  324. try:
  325. filt = ExecutionFilter() if value == '' else ExecutionFilterHelper.kvstring2object(value, ExecutionFilter)
  326. self.connection.reqExecutions(0, filt)
  327. except:
  328. logging.error(traceback.format_exc())
  329. def reqIds(self, value=None):
  330. self.connection.reqIds(1)
  331. def reqNewsBulletins(self):
  332. self.connection.reqNewsBulletins(1)
  333. def cancelNewsBulletins(self):
  334. self.connection.cancelNewsBulletins()
  335. def setServerLogLevel(self):
  336. self.connection.setServerLogLevel(3)
  337. def reqAutoOpenOrders(self):
  338. self.connection.reqAutoOpenOrders(1)
  339. def reqAllOpenOrders(self):
  340. self.connection.reqAllOpenOrders()
  341. def reqManagedAccts(self):
  342. self.connection.reqManagedAccts()
  343. def requestFA(self):
  344. self.connection.requestFA(1)
  345. def reqMktData(self, sm_contract):
  346. logging.info('TWS Gateway received reqMktData request: %s' % sm_contract)
  347. try:
  348. #self.contract_subscription_mgr.reqMktData(ContractHelper.kvstring2contract(sm_contract))
  349. self.contract_subscription_mgr.reqMktData(ContractHelper.kvstring2object(sm_contract, Contract))
  350. except:
  351. pass
  352. def reqHistoricalData(self):
  353. contract = Contract()
  354. contract.m_symbol = 'QQQQ'
  355. contract.m_secType = 'STK'
  356. contract.m_exchange = 'SMART'
  357. endtime = strftime('%Y%m%d %H:%M:%S')
  358. self.connection.reqHistoricalData(
  359. tickerId=1,
  360. contract=contract,
  361. endDateTime=endtime,
  362. durationStr='1 D',
  363. barSizeSetting='1 min',
  364. whatToShow='TRADES',
  365. useRTH=0,
  366. formatDate=1)
  367. def placeOrder(self, value=None):
  368. logging.info('TWS_gateway - placeOrder value=%s' % value)
  369. try:
  370. vals = json.loads(value)
  371. except ValueError:
  372. logging.error('TWS_gateway - placeOrder Exception %s' % traceback.format_exc())
  373. return
  374. # c = ContractHelper.kvstring2contract(vals[1])
  375. o = OrderHelper.kvstring2object(vals[2], Order)
  376. o.__dict__['transmit'] = self.ib_order_transmit
  377. # print c.__dict__
  378. # print o.__dict__
  379. # print '---------------------'
  380. #self.connection.placeOrder(vals[0], ContractHelper.kvstring2contract(vals[1]), OrderHelper.kvstring2object(vals[2], Order))
  381. self.connection.placeOrder(vals[0], ContractHelper.kvstring2object(vals[1], Contract), OrderHelper.kvstring2object(vals[2], Order))
  382. # self.connection.placeOrder(orderId, contract, newOptOrder)
  383. def eDisconnect(self, value=None):
  384. sleep(2)
  385. self.connection.eDisconnect()
  386. ####################################################################3
  387. # Gateway commands
  388. def gw_req_subscriptions(self, value=None):
  389. #subm = map(lambda i: ContractHelper.contract2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
  390. #subm = map(lambda i: ContractHelper.object2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
  391. subm = map(lambda i: (i, ContractHelper.object2kvstring(self.contract_subscription_mgr.handle[i])), range(len(self.contract_subscription_mgr.handle)))
  392. print subm
  393. if subm:
  394. self.tws_event_handler.broadcast_event('gw_subscriptions', {'subscriptions': subm}, source='GW')
  395. class SubscriptionManager():
  396. parent = None
  397. # array list of contracts
  398. handle = []
  399. # contract key map to contract ID (index of the handle array)
  400. tickerId = {}
  401. persist_f = None
  402. def __init__(self, parent=None):
  403. self.parent = parent
  404. def load_subscription(self, contracts):
  405. for c in contracts:
  406. self.reqMktData(c)
  407. self.dump()
  408. # returns -1 if not found, else the key id (which could be a zero value)
  409. def is_subscribed(self, contract):
  410. #print self.conId.keys()
  411. ckey = ContractHelper.makeRedisKeyEx(contract)
  412. if ckey not in self.tickerId.keys():
  413. return -1
  414. else:
  415. # note that 0 can be a key
  416. # be careful when checking the return values
  417. # check for true false instead of using implicit comparsion
  418. return self.tickerId[ckey]
  419. # def reqMktDataxx(self, contract):
  420. # print '---------------'
  421. # contractTuple = ('USO', 'STK', 'SMART', 'USD', '', 0.0, '')
  422. # stkContract = self.makeStkContract(contractTuple)
  423. # stkContract.m_includeExpired = False
  424. # self.parent.connection.reqMktData(1, stkContract, '', False)
  425. #
  426. # contractTuple = ('IBM', 'STK', 'SMART', 'USD', '', 0.0, '')
  427. # stkContract = self.makeStkContract(contractTuple)
  428. # stkContract.m_includeExpired = False
  429. # print stkContract
  430. # print stkContract.__dict__
  431. # self.parent.connection.reqMktData(2, stkContract, '', False)
  432. #
  433. def reqMktData(self, contract):
  434. #logging.info('SubscriptionManager: reqMktData')
  435. def add_subscription(contract):
  436. self.handle.append(contract)
  437. newId = len(self.handle) - 1
  438. self.tickerId[ContractHelper.makeRedisKeyEx(contract)] = newId
  439. return newId
  440. id = self.is_subscribed(contract)
  441. if id == -1: # not found
  442. id = add_subscription(contract)
  443. #
  444. # the conId must be set to zero when calling TWS reqMktData
  445. # otherwise TWS will fail to subscribe the contract
  446. self.parent.connection.reqMktData(id, contract, '', False)
  447. if self.persist_f:
  448. logging.debug('SubscriptionManager reqMktData: trigger callback')
  449. self.persist_f(self.handle)
  450. logging.info('SubscriptionManager: reqMktData. Requesting market data, id = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
  451. else:
  452. self.parent.connection.reqMktData(1000 + id, contract, '', True)
  453. logging.info('SubscriptionManager: reqMktData: contract already subscribed. Request snapshot = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
  454. #self.dump()
  455. # def makeStkContract(self, contractTuple):
  456. # newContract = Contract()
  457. # newContract.m_symbol = contractTuple[0]
  458. # newContract.m_secType = contractTuple[1]
  459. # newContract.m_exchange = contractTuple[2]
  460. # newContract.m_currency = contractTuple[3]
  461. # newContract.m_expiry = contractTuple[4]
  462. # newContract.m_strike = contractTuple[5]
  463. # newContract.m_right = contractTuple[6]
  464. # print 'Contract Values:%s,%s,%s,%s,%s,%s,%s:' % contractTuple
  465. # return newContract
  466. # use only after a broken connection is restored
  467. # to re request market data
  468. def force_resubscription(self):
  469. # starting from index 1 of the contract list, call reqmktdata, and format the result into a list of tuples
  470. for i in range(1, len(self.handle)):
  471. self.parent.connection.reqMktData(i, self.handle[i], '', False)
  472. logging.info('force_resubscription: %s' % ContractHelper.printContract(self.handle[i]))
  473. def itemAt(self, id):
  474. if id > 0 and id < len(self.handle):
  475. return self.handle[id]
  476. return -1
  477. def dump(self):
  478. logging.info('subscription manager table:---------------------')
  479. logging.info(''.join('%d: {%s},\n' % (i, ''.join('%s:%s, ' % (k, v) for k, v in self.handle[i].__dict__.iteritems() )\
  480. if self.handle[i] <> None else '' ) for i in range(len(self.handle)))\
  481. )
  482. #logging.info( ''.join('%s[%d],\n' % (k, v) for k, v in self.conId.iteritems()))
  483. logging.info( 'Number of instruments subscribed: %d' % len(self.handle))
  484. logging.info( '------------------------------------------------')
  485. def register_persistence_callback(self, func):
  486. logging.info('subscription manager: registering callback')
  487. self.persist_f = func
  488. def test_subscription():
  489. s = SubscriptionManager()
  490. contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151029', 0, '')
  491. c = ContractHelper.makeContract(contractTuple)
  492. print s.is_subscribed(c)
  493. print s.add_subscription(c)
  494. print s.is_subscribed(c)
  495. s.dump()
  496. fr = open('/home/larry-13.04/workspace/finopt/data/subscription-hsio.txt')
  497. for l in fr.readlines():
  498. if l[0] <> '#':
  499. s.add_subscription(ContractHelper.makeContract(tuple([t for t in l.strip('\n').split(',')])))
  500. fr.close()
  501. s.dump()
  502. fr = open('/home/larry-13.04/workspace/finopt/data/subscription-hsio.txt')
  503. for l in fr.readlines():
  504. if l[0] <> '#':
  505. print s.add_subscription(ContractHelper.makeContract(tuple([t for t in l.strip('\n').split(',')])))
  506. s.dump()
  507. contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151127', 0, '')
  508. c = ContractHelper.makeContract(contractTuple)
  509. print s.is_subscribed(c)
  510. print s.add_subscription(c)
  511. print s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c)))
  512. print 'test itemAt:'
  513. contractTuple = ('HSI', 'OPT', 'HKFE', 'HKD', '20151127', 21400, 'C')
  514. c = ContractHelper.makeContract(contractTuple)
  515. print s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c)))
  516. if __name__ == '__main__':
  517. if len(sys.argv) != 2:
  518. print("Usage: %s <config file>" % sys.argv[0])
  519. exit(-1)
  520. cfg_path= sys.argv[1:]
  521. config = ConfigParser.SafeConfigParser()
  522. if len(config.read(cfg_path)) == 0:
  523. raise ValueError, "Failed to open config file"
  524. logconfig = eval(config.get("tws_gateway", "tws_gateway.logconfig").strip('"').strip("'"))
  525. logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
  526. logging.basicConfig(**logconfig)
  527. khost = config.get("epc", "kafka.host").strip('"').strip("'")
  528. kport = config.get("epc", "kafka.port")
  529. ihost = config.get("market", "ib.gateway").strip('"').strip("'")
  530. iport = int(config.get("market", "ib.port"))
  531. iappid = int(config.get("market", "ib.appid.portfolio"))
  532. #print 'give kafka server some time to register the topics...'
  533. #sleep(2)
  534. app = TWS_gateway(ihost, iport, iappid, khost, kport, config)
  535. app.start()
  536. print 'TWS_gateway started.'
  537. #
  538. # test_subscription()