tws_gateway.py 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. from time import sleep, strftime
  5. import time, datetime
  6. import ConfigParser
  7. from optparse import OptionParser
  8. import logging
  9. import thread
  10. import threading
  11. import traceback
  12. import json
  13. from threading import Lock
  14. from ib.ext.Contract import Contract
  15. from ib.ext.EWrapper import EWrapper
  16. from ib.ext.EClientSocket import EClientSocket
  17. from ib.ext.ExecutionFilter import ExecutionFilter
  18. from ib.ext.Execution import Execution
  19. from ib.ext.OrderState import OrderState
  20. from ib.ext.Order import Order
  21. from kafka import KafkaProducer
  22. from kafka import KafkaConsumer
  23. from kafka.errors import KafkaError
  24. #from kafka.client import KafkaClient
  25. #from kafka.producer import SimpleProducer
  26. #from kafka.common import LeaderNotAvailableError
  27. from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
  28. from comms.ib_heartbeat import IbHeartBeat
  29. from tws_protocol_helper import TWS_Protocol
  30. import redis
  class TWS_event_handler(EWrapper):
      """Relay IB/TWS wrapper callbacks to Kafka.

      Every callback below follows the same pattern: it captures its own
      arguments with ``vars()`` and hands them to :meth:`broadcast_event`,
      which serializes them to JSON and publishes them on the Kafka topic
      named after the callback.
      """

      # Ticker ids at or above this value are mapped back to the original id
      # by subtracting the gap (see tick_process_message).
      TICKER_GAP = 1000
      producer = None

      def __init__(self, host, port):
          """Create the Kafka producer used to publish events.

          :param host: Kafka bootstrap host.
          :param port: Kafka bootstrap port.
          """
          self.producer = KafkaProducer(bootstrap_servers='%s:%s' % (host, port))
          logging.info('TWS_event_handler: __init__ Creating kafka client producer at %s:%s' % (host, port))

      @staticmethod
      def _json_default(obj):
          """``json.dumps`` fallback: expose an object's attribute dict.

          Lets objects nested inside an already-flattened payload be encoded
          instead of aborting the whole message.
          """
          try:
              return obj.__dict__
          except AttributeError:
              raise TypeError('%r is not JSON serializable' % (obj,))

      def serialize_vars_to_dict(self, message, mapping, source='IB'):
          """Build the message payload for one event.

          :param message: event / topic name, stored under ``typeName``.
          :param mapping: argument mapping (typically ``vars()`` of a callback).
              It is read only; a ``'self'`` entry is skipped.
          :param source: origin tag stored under ``source``.
          :returns: dict of the mapping's entries plus ``ts``, ``typeName``
              and ``source``. Values whose type lives under ``ib.ext.`` are
              replaced by their ``__dict__``.
          """
          payload = {}
          for key, value in mapping.items():
              if key == 'self':
                  continue
              if 'ib.ext.' in str(type(value)):
                  payload[key] = value.__dict__
              else:
                  payload[key] = value
          payload['ts'] = time.time()
          payload['typeName'] = message
          payload['source'] = source
          return payload

      def broadcast_event(self, message, mapping, source='IB'):
          """Serialize ``mapping`` and publish it on Kafka topic ``message``.

          Encoding failures are logged and the event is dropped. Send
          failures are logged; for ``gw_subscriptions`` only, the send is
          retried once after two seconds and a second failure propagates.
          """
          try:
              payload = self.serialize_vars_to_dict(message, mapping, source)
              encoded = json.dumps(payload, default=self._json_default)
          except Exception:
              logging.error('broadcast_event: exception while encoding IB event to client: [%s]' % message)
              logging.error(traceback.format_exc())
              return

          if message in ('gw_subscriptions', 'gw_subscription_changed'):
              logging.info('TWS_event_handler: broadcast event: %s [%s]' % (payload['typeName'], payload))

          try:
              self.producer.send(message, encoded)
          except Exception:
              logging.error('broadcast_event: exception while sending IB event to client: [%s]' % message)
              logging.error(traceback.format_exc())
              # try to broadcast the message a 2nd time
              # no catch if it fails again
              if message == 'gw_subscriptions':
                  sleep(2)
                  logging.info('TWS_event_handler: Retry once broadcasting gw_subscription %s [%s]' % (payload['typeName'], payload))
                  self.producer.send(message, encoded)

      def tick_process_message(self, items):
          """Return a copy of a tick callback's arguments ready to broadcast.

          If ``tickerId`` is in the snapshot range (>= TICKER_GAP) the gap is
          deducted to recover the original ticker id; ``'self'`` is removed.
          """
          tick = items.copy()
          if tick['tickerId'] >= TWS_event_handler.TICKER_GAP:
              tick['tickerId'] = tick['tickerId'] - TWS_event_handler.TICKER_GAP
          tick.pop('self', None)
          return tick

      # ---- market data ticks (ticker id normalised before broadcast) ----

      def tickPrice(self, tickerId, field, price, canAutoExecute):
          self.broadcast_event('tickPrice', self.tick_process_message(vars()))

      def tickSize(self, tickerId, field, size):
          self.broadcast_event('tickSize', self.tick_process_message(vars()))

      def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
          # deliberately not broadcast
          pass

      def tickGeneric(self, tickerId, tickType, value):
          self.broadcast_event('tickGeneric', self.tick_process_message(vars()))

      def tickString(self, tickerId, tickType, value):
          self.broadcast_event('tickString', self.tick_process_message(vars()))

      def tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
          self.broadcast_event('tickEFP', vars())

      # ---- orders ----

      def orderStatus(self, orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeId):
          self.broadcast_event('orderStatus', vars())

      def openOrder(self, orderId, contract, order, state):
          self.broadcast_event('openOrder', vars())

      def openOrderEnd(self):
          self.broadcast_event('openOrderEnd', vars())

      # ---- account / portfolio ----

      def updateAccountValue(self, key, value, currency, accountName):
          self.broadcast_event('updateAccountValue', vars())

      def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
          self.broadcast_event('updatePortfolio', vars())

      def updateAccountTime(self, timeStamp):
          self.broadcast_event('updateAccountTime', vars())

      def accountDownloadEnd(self, accountName):
          self.broadcast_event('accountDownloadEnd', vars())

      def nextValidId(self, orderId):
          self.broadcast_event('nextValidId', vars())

      # ---- contract details / executions ----

      def contractDetails(self, reqId, contractDetails):
          self.broadcast_event('contractDetails', vars())

      def contractDetailsEnd(self, reqId):
          self.broadcast_event('contractDetailsEnd', vars())

      def bondContractDetails(self, reqId, contractDetails):
          self.broadcast_event('bondContractDetails', vars())

      def execDetails(self, reqId, contract, execution):
          self.broadcast_event('execDetails', vars())

      def execDetailsEnd(self, reqId):
          self.broadcast_event('execDetailsEnd', vars())

      # ---- connection / errors (errors are logged locally as well) ----

      def connectionClosed(self):
          self.broadcast_event('connectionClosed', {})

      def error(self, id=None, errorCode=None, errorMsg=None):
          logging.error(self.serialize_vars_to_dict('error', vars()))
          self.broadcast_event('error', vars())

      def error_0(self, strvalue=None):
          logging.error(self.serialize_vars_to_dict('error_0', vars()))
          self.broadcast_event('error_0', vars())

      def error_1(self, id=None, errorCode=None, errorMsg=None):
          logging.error(self.serialize_vars_to_dict('error_1', vars()))
          self.broadcast_event('error_1', vars())

      # ---- market depth / news ----

      def updateMktDepth(self, tickerId, position, operation, side, price, size):
          self.broadcast_event('updateMktDepth', vars())

      def updateMktDepthL2(self, tickerId, position, marketMaker, operation, side, price, size):
          self.broadcast_event('updateMktDepthL2', vars())

      def updateNewsBulletin(self, msgId, msgType, message, origExchange):
          self.broadcast_event('updateNewsBulletin', vars())

      # ---- miscellaneous ----

      def managedAccounts(self, accountsList):
          logging.info(self.serialize_vars_to_dict('managedAccounts', vars()))
          self.broadcast_event('managedAccounts', vars())

      def receiveFA(self, faDataType, xml):
          self.broadcast_event('receiveFA', vars())

      def historicalData(self, reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
          self.broadcast_event('historicalData', vars())

      def scannerParameters(self, xml):
          self.broadcast_event('scannerParameters', vars())

      def scannerData(self, reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
          self.broadcast_event('scannerData', vars())

      def commissionReport(self, commissionReport):
          self.broadcast_event('commissionReport', vars())

      def currentTime(self, time):
          self.broadcast_event('currentTime', vars())

      def deltaNeutralValidation(self, reqId, underComp):
          self.broadcast_event('deltaNeutralValidation', vars())

      def fundamentalData(self, reqId, data):
          self.broadcast_event('fundamentalData', vars())

      def marketDataType(self, reqId, marketDataType):
          self.broadcast_event('marketDataType', vars())

      def realtimeBar(self, reqId, time, open, high, low, close, volume, wap, count):
          self.broadcast_event('realtimeBar', vars())

      def scannerDataEnd(self, reqId):
          self.broadcast_event('scannerDataEnd', vars())

      def tickSnapshotEnd(self, reqId):
          self.broadcast_event('tickSnapshotEnd', vars())

      def position(self, account, contract, pos, avgCost):
          self.broadcast_event('position', vars())

      def positionEnd(self):
          self.broadcast_event('positionEnd', vars())

      def accountSummary(self, reqId, account, tag, value, currency):
          self.broadcast_event('accountSummary', vars())

      def accountSummaryEnd(self, reqId):
          self.broadcast_event('accountSummaryEnd', vars())
  class TWS_gateway(threading.Thread):
      """Bridge between Kafka client requests and an IB/TWS connection.

      The thread body (:meth:`run`) consumes messages from Kafka and calls
      the method on this object whose name equals the message topic, passing
      the message value. IB callbacks travel the other way through
      ``tws_event_handler``.
      """

      # config
      config = None
      # redis connection
      rs = None
      # channel clients' requests to IB/TWS
      cli_request_handler = None
      # manage conID / contracts mapping
      contract_subscription_mgr = None
      connection = None
      # handler to process incoming IB/TWS messages and echo back to clients
      tws_event_handler = None
      # monitor IB connection / heart beat
      ibh = None
      tlock = None
      ib_conn_status = None
      ib_order_transmit = False

      def __init__(self, host, port, clientId, kafka_host, kafka_port, config):
          """Connect to redis, Kafka and IB, then load saved subscriptions.

          Exits the process (``sys.exit(-1)``) if redis or IB cannot be
          reached.

          :param host: IB/TWS host.
          :param port: IB/TWS port.
          :param clientId: IB client (application) id.
          :param kafka_host: Kafka bootstrap host.
          :param kafka_port: Kafka bootstrap port.
          :param config: parsed configuration object (ConfigParser API).
          """
          super(TWS_gateway, self).__init__()
          self.config = config
          self.host = host
          self.port = port
          self.clientId = clientId

          # The option is text in the config file; turn it into a real bool so
          # that truth tests (and the log line below) behave as intended.
          raw_transmit = None
          if config.has_option("tws_gateway", "tws_gateway.order_transmit"):
              raw_transmit = config.get("tws_gateway", "tws_gateway.order_transmit")
          self.ib_order_transmit = self._parse_bool(raw_transmit)

          logging.info('starting up TWS_gateway...')
          logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit else 'False'))

          logging.info('connecting to Redis server...')
          self.initialize_redis(config)

          logging.info('starting up TWS_event_handler...')
          self.tws_event_handler = TWS_event_handler(kafka_host, kafka_port)

          logging.info('starting up IB EClientSocket...')
          self.connection = EClientSocket(self.tws_event_handler)

          logging.info('starting up client request handler - kafkaConsumer...')
          topics = list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods)
          self.cli_request_handler = KafkaConsumer(
              *topics,
              bootstrap_servers=['%s:%s' % (kafka_host, kafka_port)],
              group_id='epc.tws_gateway',
              enable_auto_commit=True,
              auto_commit_interval_ms=30 * 1000,
              auto_offset_reset='latest')  # discard old ones

          if not self.eConnect():
              logging.error('TWS_gateway: unable to establish connection to IB %s:%d' % (self.host, self.port))
              sys.exit(-1)

          # start heart beat monitor
          logging.info('starting up IB heart beat monitor...')
          self.tlock = Lock()
          self.ibh = IbHeartBeat(config)
          self.ibh.register_listener([self.on_ib_conn_broken])
          self.ibh.run()

          logging.info('starting up subscription manager...')
          self.initialize_subscription_mgr()

      @staticmethod
      def _parse_bool(raw):
          """Interpret a (possibly quoted) config string as a boolean.

          ``None`` and anything other than true/1/yes/on (case-insensitive)
          yield ``False``.
          """
          if raw is None:
              return False
          text = str(raw).strip().strip('"').strip("'").strip().lower()
          return text in ('true', '1', 'yes', 'on')

      def _subscription_key(self):
          """Return the redis key under which subscriptions are persisted."""
          return self.config.get("tws_gateway", "subscription_manager.subscriptions.redis_key").strip('"').strip("'")

      def initialize_subscription_mgr(self):
          """Create the subscription manager and reload persisted contracts.

          Contracts whose expiry is set and lies before today are skipped.
          Contracts without an expiry are always reloaded.
          """
          self.contract_subscription_mgr = SubscriptionManager(self)
          self.contract_subscription_mgr.register_persistence_callback(self.persist_subscriptions)

          stored = self.rs.get(self._subscription_key())
          if not stored:
              return

          today = time.strftime('%Y%m%d')

          def is_outstanding(c):
              # An empty / missing expiry must not be treated as "expired".
              if c.m_expiry and c.m_expiry < today:
                  logging.info('initialize_subscription_mgr: ignoring expired contract %s%s%s' % (c.m_expiry, c.m_strike, c.m_right))
                  return False
              return True

          candidates = [ContractHelper.kvstring2object(kv, Contract) for kv in json.loads(stored)]
          contracts = [c for c in candidates if is_outstanding(c)]
          self.contract_subscription_mgr.load_subscription(contracts)

      def persist_subscriptions(self, contracts):
          """Store ``contracts`` in redis as a JSON list of kv-strings.

          ``None`` entries are kept as JSON ``null`` so positions are preserved.
          """
          cs = json.dumps([ContractHelper.object2kvstring(c) if c is not None else None for c in contracts])
          logging.debug('Tws_gateway: updating subscription table to redis store %s' % cs)
          self.rs.set(self._subscription_key(), cs)

      def initialize_redis(self, config):
          """Open the redis connection described in ``config``.

          Exits the process if the server cannot be reached.
          """
          r_host = config.get("redis", "redis.server").strip('"').strip("'")
          # config values are text; convert so the %d log format below works
          r_port = int(config.get("redis", "redis.port"))
          r_db = int(config.get("redis", "redis.db"))
          self.rs = redis.Redis(r_host, r_port, r_db)
          try:
              self.rs.client_list()
          except redis.ConnectionError:
              logging.error('TWS_gateway: unable to connect to redis server using these settings: %s port:%d db:%d' % (r_host, r_port, r_db))
              logging.error('aborting...')
              sys.exit(-1)

      def reset_message_offset(self):
          """Move every topic to its latest offset.

          NOTE(review): not called anywhere in this file, and it relies on
          ``get_partition_offsets`` / ``set_topic_partitions`` - presumably
          from an older kafka-python consumer API; confirm before re-enabling.
          """
          topics = list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods)
          topic_offsets = []
          for topic in topics:
              offsets = self.cli_request_handler.get_partition_offsets(topic, 0, -1, 999)
              if len(offsets) > 1:
                  topic_offsets.append((topic, offsets[1], offsets[0]))
          logging.info('TWS_gateway set topic offset to the latest point\n%s' % (''.join('%s,%s,%s\n' % (x[0], x[1], x[2]) for x in topic_offsets)))
          # set_topic_partitions clears all previous settings when executed,
          # so every topic has to be passed in one single call.
          self.cli_request_handler.set_topic_partitions(*topic_offsets)

      def run(self):
          """Consume client requests and dispatch each to the same-named method.

          Unknown topics and handler failures are logged and skipped so that
          one bad message cannot terminate the consumer thread.
          """
          for message in self.cli_request_handler:
              logging.info("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                          message.offset, message.key,
                                                          message.value))
              handler = getattr(self, message.topic, None)
              if not callable(handler):
                  logging.error('TWS_gateway: no handler for topic [%s], message ignored' % message.topic)
                  continue
              try:
                  handler(message.value)
              except Exception:
                  logging.error('TWS_gateway: exception while handling topic [%s]' % message.topic)
                  logging.error(traceback.format_exc())

      def on_ib_conn_broken(self, msg):
          """Heart-beat listener: reconnect to IB and resubscribe contracts.

          May be called several times concurrently; callers are serialized on
          ``tlock`` and a caller that finds the status already back to 'OK'
          returns without reconnecting again.
          """
          logging.error('TWS_gateway: detected broken IB connection!')
          self.ib_conn_status = 'ERROR'
          with self.tlock:  # block until another party finishes executing
              if self.ib_conn_status == 'OK':
                  # already fixed up while waiting
                  return
              self.eDisconnect()
              self.eConnect()
              while not self.connection.isConnected():
                  logging.error('TWS_gateway: attempt to reconnect...')
                  self.eConnect()
                  sleep(2)
              # we arrived here because the connection has been restored
              # resubscribe tickers again!
              logging.info('TWS_gateway: IB connection restored...resubscribe contracts')
              if self.contract_subscription_mgr is not None:
                  self.contract_subscription_mgr.force_resubscription()
              # record the recovery so that queued callers can return early
              self.ib_conn_status = 'OK'

      def eConnect(self):
          """Connect to IB and return whether the connection is up."""
          logging.info('TWS_gateway - eConnect. Connecting to %s:%s App Id: %s' % (self.host, self.port, self.clientId))
          self.connection.eConnect(self.host, self.port, self.clientId)
          return self.connection.isConnected()

      # ---- client requests; ``value`` is the Kafka message value ----

      def reqAccountUpdates(self, value=None):
          logging.info('TWS_gateway - reqAccountUpdates value=%s' % value)
          self.connection.reqAccountUpdates(1, '')

      def reqAccountSummary(self, value):
          """``value`` is a JSON list: the first three items are forwarded."""
          logging.info('TWS_gateway - reqAccountSummary value=%s' % value)
          vals = [x.encode('ascii') if isinstance(x, unicode) else x for x in json.loads(value)]
          self.connection.reqAccountSummary(vals[0], vals[1], vals[2])

      def reqOpenOrders(self, value=None):
          self.connection.reqOpenOrders()

      def reqPositions(self, value=None):
          self.connection.reqPositions()

      def reqExecutions(self, value):
          """``value`` is '' for no filter, else an ExecutionFilter kv-string."""
          try:
              filt = ExecutionFilter() if value == '' else ExecutionFilterHelper.kvstring2object(value, ExecutionFilter)
              self.connection.reqExecutions(0, filt)
          except Exception:
              logging.error(traceback.format_exc())

      def reqIds(self, value=None):
          self.connection.reqIds(1)

      # The handlers below accept (and ignore) ``value`` because run() always
      # passes the message value to the handler it dispatches to.

      def reqNewsBulletins(self, value=None):
          self.connection.reqNewsBulletins(1)

      def cancelNewsBulletins(self, value=None):
          self.connection.cancelNewsBulletins()

      def setServerLogLevel(self, value=None):
          self.connection.setServerLogLevel(3)

      def reqAutoOpenOrders(self, value=None):
          self.connection.reqAutoOpenOrders(1)

      def reqAllOpenOrders(self, value=None):
          self.connection.reqAllOpenOrders()

      def reqManagedAccts(self, value=None):
          self.connection.reqManagedAccts()

      def requestFA(self, value=None):
          self.connection.requestFA(1)

      def reqMktData(self, sm_contract):
          """Subscribe to the contract described by kv-string ``sm_contract``."""
          logging.info('TWS Gateway received reqMktData request: %s' % sm_contract)
          try:
              self.contract_subscription_mgr.reqMktData(ContractHelper.kvstring2object(sm_contract, Contract))
          except Exception:
              # keep the gateway alive, but do not hide the failure
              logging.error('TWS_gateway - reqMktData Exception %s' % traceback.format_exc())

      def reqHistoricalData(self, value=None):
          """Request one day of 1-minute TRADES bars for a hard-coded contract."""
          contract = Contract()
          contract.m_symbol = 'QQQQ'
          contract.m_secType = 'STK'
          contract.m_exchange = 'SMART'
          endtime = strftime('%Y%m%d %H:%M:%S')
          self.connection.reqHistoricalData(
              tickerId=1,
              contract=contract,
              endDateTime=endtime,
              durationStr='1 D',
              barSizeSetting='1 min',
              whatToShow='TRADES',
              useRTH=0,
              formatDate=1)

      def placeOrder(self, value=None):
          """Place an order; ``value`` is JSON ``[orderId, contract_kv, order_kv]``."""
          logging.info('TWS_gateway - placeOrder value=%s' % value)
          try:
              vals = json.loads(value)
          except ValueError:
              logging.error('TWS_gateway - placeOrder Exception %s' % traceback.format_exc())
              return
          order = OrderHelper.kvstring2object(vals[2], Order)
          # NOTE(review): the flag is stored under 'transmit' as before; IbPy's
          # Order presumably reads 'm_transmit' - confirm which one is intended.
          order.__dict__['transmit'] = self.ib_order_transmit
          # send the order object that carries the flag, not a second parse
          self.connection.placeOrder(vals[0], ContractHelper.kvstring2object(vals[1], Contract), order)

      def eDisconnect(self, value=None):
          sleep(2)
          self.connection.eDisconnect()

      ####################################################################
      # Gateway commands

      def gw_req_subscriptions(self, value=None):
          """Broadcast the current subscription table as (id, kv-string) pairs."""
          subm = [(i, ContractHelper.object2kvstring(c)) for i, c in enumerate(self.contract_subscription_mgr.handle)]
          logging.debug('TWS_gateway: gw_req_subscriptions: %s' % subm)
          if subm:
              self.tws_event_handler.broadcast_event('gw_subscriptions', {'subscriptions': subm}, source='GW')

      #####################################################################
      #
      # broadcast gateway notifications

      def gw_notify_subscription_changed(self, value):
          """Broadcast a subscription change.

          Triggered by SubscriptionManager; ``value`` is ``{id: contractkv_str}``.
          """
          logging.info("TWS_gateway:gw_notify_subscription_changed: %s" % value)
          self.tws_event_handler.broadcast_event('gw_subscription_changed', value, source='GW')
  class SubscriptionManager(object):
      """Track subscribed contracts and the ticker id assigned to each.

      ``handle`` is the list of contracts; a contract's ticker id is its
      index in that list. ``tickerId`` maps a contract key (from
      ``ContractHelper.makeRedisKeyEx``) to that index.
      """

      parent = None
      persist_f = None

      def __init__(self, parent=None):
          """:param parent: gateway object providing ``connection`` and
          ``gw_notify_subscription_changed``."""
          self.parent = parent
          # Per-instance containers: class-level mutables would be shared by
          # every SubscriptionManager instance.
          # array list of contracts
          self.handle = []
          # contract key map to contract ID (index of the handle array)
          self.tickerId = {}

      def load_subscription(self, contracts):
          """Request market data for every contract, then log the table."""
          for c in contracts:
              self.reqMktData(c)
          self.dump()

      def is_subscribed(self, contract):
          """Return the contract's ticker id, or -1 if it is not subscribed.

          Note that 0 is a valid id: compare the result with -1 explicitly
          and never rely on its truthiness.
          """
          return self.tickerId.get(ContractHelper.makeRedisKeyEx(contract), -1)

      def reqMktData(self, contract):
          """Subscribe ``contract``, or request a snapshot if already subscribed.

          A new contract is appended to ``handle``, requested from TWS under
          its new id and persisted through the registered callback. A known
          contract triggers a snapshot request under ``TICKER_GAP + id``.
          In both cases the gateway is told to broadcast the id/contract pair.
          """
          ckey = ContractHelper.makeRedisKeyEx(contract)
          ticker_id = self.is_subscribed(contract)
          if ticker_id == -1:  # not found
              self.handle.append(contract)
              ticker_id = len(self.handle) - 1
              self.tickerId[ckey] = ticker_id
              #
              # the conId must be set to zero when calling TWS reqMktData
              # otherwise TWS will fail to subscribe the contract
              self.parent.connection.reqMktData(ticker_id, contract, '', False)
              if self.persist_f:
                  logging.debug('SubscriptionManager reqMktData: trigger callback')
                  self.persist_f(self.handle)
              logging.info('SubscriptionManager: reqMktData. Requesting market data, id = %d, contract = %s' % (ticker_id, ckey))
          else:
              # snapshot ids are offset by the same gap the event handler
              # deducts when relaying ticks
              self.parent.connection.reqMktData(TWS_event_handler.TICKER_GAP + ticker_id, contract, '', True)
              logging.info('SubscriptionManager: reqMktData: contract already subscribed. Request snapshot = %d, contract = %s' % (ticker_id, ckey))
          #
          # instruct gateway to broadcast new id has been assigned to a new contract
          #
          self.parent.gw_notify_subscription_changed({ticker_id: ContractHelper.object2kvstring(contract)})
          logging.info('SubscriptionManager reqMktData: gw_notify_subscription_changed: %d:%s' % (ticker_id, ckey))

      def force_resubscription(self):
          """Re-request market data for every stored contract.

          Use only after a broken connection is restored. Index 0 is included
          because it is a valid ticker id; empty (None) slots are skipped.
          """
          for ticker_id, contract in enumerate(self.handle):
              if contract is None:
                  continue
              self.parent.connection.reqMktData(ticker_id, contract, '', False)
              logging.info('force_resubscription: %s' % ContractHelper.printContract(contract))

      def itemAt(self, id):
          """Return the contract stored at ``id``, or -1 if out of range."""
          if 0 <= id < len(self.handle):
              return self.handle[id]
          return -1

      def dump(self):
          """Log the subscription table, one contract per line."""
          logging.info('subscription manager table:---------------------')
          rows = []
          for index, contract in enumerate(self.handle):
              if contract is not None:
                  fields = ''.join('%s:%s, ' % (k, v) for k, v in contract.__dict__.items())
              else:
                  fields = ''
              rows.append('%d: {%s},\n' % (index, fields))
          logging.info(''.join(rows))
          logging.info('Number of instruments subscribed: %d' % len(self.handle))
          logging.info('------------------------------------------------')

      def register_persistence_callback(self, func):
          """Register ``func(contracts)``, called whenever a contract is added."""
          logging.info('subscription manager: registering callback')
          self.persist_f = func
  def test_subscription():
      """Manual smoke test for SubscriptionManager (prints to stdout).

      Reads contract tuples from a hard-coded local file, one comma separated
      tuple per line; lines starting with '#' are skipped.

      NOTE(review): this calls ``SubscriptionManager.add_subscription``, which
      the class in this file does not define (it only exists as a helper
      nested inside ``reqMktData``) - confirm before running.
      """
      data_path = '/home/larry-13.04/workspace/finopt/data/subscription-hsio.txt'

      def read_contracts():
          # ``with`` closes the file even if a line fails to parse
          with open(data_path) as fr:
              lines = fr.readlines()
          return [ContractHelper.makeContract(tuple(l.strip('\n').split(',')))
                  for l in lines if l[0] != '#']

      s = SubscriptionManager()
      contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151029', 0, '')
      c = ContractHelper.makeContract(contractTuple)
      print(s.is_subscribed(c))
      print(s.add_subscription(c))
      print(s.is_subscribed(c))
      s.dump()

      # first pass: add every contract from the file
      for contract in read_contracts():
          s.add_subscription(contract)
      s.dump()

      # second pass: add the same contracts again and print the returned ids
      for contract in read_contracts():
          print(s.add_subscription(contract))
      s.dump()

      contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151127', 0, '')
      c = ContractHelper.makeContract(contractTuple)
      print(s.is_subscribed(c))
      print(s.add_subscription(c))
      print('%s %s' % (s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c)))))

      print('test itemAt:')
      contractTuple = ('HSI', 'OPT', 'HKFE', 'HKD', '20151127', 21400, 'C')
      c = ContractHelper.makeContract(contractTuple)
      print('%s %s' % (s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c)))))
  if __name__ == '__main__':
      # Entry point: tws_gateway.py <config file>
      if len(sys.argv) != 2:
          print("Usage: %s <config file>" % sys.argv[0])
          sys.exit(-1)

      cfg_path = sys.argv[1:]
      config = ConfigParser.SafeConfigParser()
      # read() returns the list of files it managed to parse
      if not config.read(cfg_path):
          raise ValueError("Failed to open config file")

      # SECURITY NOTE: the logging configuration is evaluated as Python code,
      # so the config file must be trusted. It is left as eval() because the
      # value may reference names such as logging levels - TODO confirm and
      # replace with a safer parser if the value is always a plain literal.
      logconfig = eval(config.get("tws_gateway", "tws_gateway.logconfig").strip('"').strip("'"))
      logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
      logging.basicConfig(**logconfig)

      khost = config.get("epc", "kafka.host").strip('"').strip("'")
      kport = config.get("epc", "kafka.port")
      ihost = config.get("market", "ib.gateway").strip('"').strip("'")
      iport = int(config.get("market", "ib.port"))
      iappid = int(config.get("market", "ib.appid.portfolio"))

      app = TWS_gateway(ihost, iport, iappid, khost, kport, config)
      app.start()
      print('TWS_gateway started.')
      #
      # test_subscription()