# tws_gateway.py (27 KB)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
from time import sleep, strftime
import time
import datetime
import ConfigParser
from optparse import OptionParser
import logging
import thread
import threading
import traceback
import json
from threading import Lock

from ib.ext.Contract import Contract
from ib.ext.EWrapper import EWrapper
from ib.ext.EClientSocket import EClientSocket
from ib.ext.ExecutionFilter import ExecutionFilter
from ib.ext.Execution import Execution
from ib.ext.OrderState import OrderState
from ib.ext.Order import Order
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import redis

from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
from comms.ib_heartbeat import IbHeartBeat
from comms.test.base_messaging import Prosumer, BaseMessageListener
from misc2.observer import Subscriber, Publisher
from tws_protocol_helper import TWS_Protocol
  30. class TWS_event_handler(EWrapper):
  31. TICKER_GAP = 1000
  32. def __init__(self, tws_gateway):
  33. # reference to the parent TWS_gateway
  34. self.producer = tws_gateway
  35. def broadcast_event(self, message, mapping): #, source='IB'):
  36. try:
  37. dict = self.tick_process_message(message, mapping) #, source)
  38. if message == 'gw_subscriptions' or message == 'gw_subscription_changed':
  39. logging.info('TWS_event_handler: broadcast event: %s [%s]' % (dict['typeName'], dict))
  40. self.producer.send(message, self.producer.message_dumps(dict))
  41. except:
  42. logging.error('broadcast_event: exception while encoding IB event to client: [%s]' % message)
  43. logging.error(traceback.format_exc())
  44. def tick_process_message(self, message_name, items): #, source):
  45. t = items.copy()
  46. # if the tickerId is in the snapshot range
  47. # deduct the gap to derive the original tickerId
  48. # --- check logic in subscription manager
  49. if (t['tickerId'] >= TWS_event_handler.TICKER_GAP):
  50. t['tickerId'] = t['tickerId'] - TWS_event_handler.TICKER_GAP
  51. try:
  52. del(t['self'])
  53. except (KeyError, ):
  54. pass
  55. for k,v in t.iteritems():
  56. #print k, v, type(v)
  57. #if type(v) in [Contract, Execution, ExecutionFilter, OrderState, Order, CommissionReport]:
  58. if 'ib.ext.' in str(type(v)):
  59. t[k] = v.__dict__
  60. else:
  61. t[k] = v
  62. # t['ts'] = time.time()
  63. # t['typeName'] = message_name
  64. # t['source'] = source
  65. return t
  66. def tickPrice(self, tickerId, field, price, canAutoExecute):
  67. self.broadcast_event('tickPrice', vars())
  68. def tickSize(self, tickerId, field, size):
  69. self.broadcast_event('tickSize', vars()) #vars())
  70. def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
  71. #self.broadcast_event('tickOptionComputation', self.tick_process_message(vars())) #vars())
  72. pass
  73. def tickGeneric(self, tickerId, tickType, value):
  74. #self.broadcast_event('tickGeneric', vars())
  75. self.broadcast_event('tickGeneric', vars()) #vars())
  76. def tickString(self, tickerId, tickType, value):
  77. #self.broadcast_event('tickString', vars())
  78. self.broadcast_event('tickString', vars()) #vars())
  79. def tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
  80. self.broadcast_event('tickEFP', vars())
  81. def orderStatus(self, orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeId):
  82. self.broadcast_event('orderStatus', vars())
  83. def openOrder(self, orderId, contract, order, state):
  84. self.broadcast_event('openOrder', vars())
  85. def openOrderEnd(self):
  86. self.broadcast_event('openOrderEnd', vars())
  87. def updateAccountValue(self, key, value, currency, accountName):
  88. self.broadcast_event('updateAccountValue', vars())
  89. def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
  90. self.broadcast_event('updatePortfolio', vars())
  91. def updateAccountTime(self, timeStamp):
  92. self.broadcast_event('updateAccountTime', vars())
  93. def accountDownloadEnd(self, accountName):
  94. self.broadcast_event('accountDownloadEnd', vars())
  95. def nextValidId(self, orderId):
  96. self.broadcast_event('nextValidId', vars())
  97. def contractDetails(self, reqId, contractDetails):
  98. self.broadcast_event('contractDetails', vars())
  99. def contractDetailsEnd(self, reqId):
  100. self.broadcast_event('contractDetailsEnd', vars())
  101. def bondContractDetails(self, reqId, contractDetails):
  102. self.broadcast_event('bondContractDetails', vars())
  103. def execDetails(self, reqId, contract, execution):
  104. self.broadcast_event('execDetails', vars())
  105. def execDetailsEnd(self, reqId):
  106. self.broadcast_event('execDetailsEnd', vars())
  107. def connectionClosed(self):
  108. self.broadcast_event('connectionClosed', {})
  109. def error(self, id=None, errorCode=None, errorMsg=None):
  110. logging.error(self.serialize_vars_to_dict('error', vars()))
  111. self.broadcast_event('error', vars())
  112. def error_0(self, strvalue=None):
  113. logging.error(self.serialize_vars_to_dict('error_0', vars()))
  114. self.broadcast_event('error_0', vars())
  115. def error_1(self, id=None, errorCode=None, errorMsg=None):
  116. logging.error(self.serialize_vars_to_dict('error_1', vars()))
  117. self.broadcast_event('error_1', vars())
  118. def updateMktDepth(self, tickerId, position, operation, side, price, size):
  119. self.broadcast_event('updateMktDepth', vars())
  120. def updateMktDepthL2(self, tickerId, position, marketMaker, operation, side, price, size):
  121. self.broadcast_event('updateMktDepthL2', vars())
  122. def updateNewsBulletin(self, msgId, msgType, message, origExchange):
  123. self.broadcast_event('updateNewsBulletin', vars())
  124. def managedAccounts(self, accountsList):
  125. logging.info(self.serialize_vars_to_dict('managedAccounts', vars()))
  126. self.broadcast_event('managedAccounts', vars())
  127. def receiveFA(self, faDataType, xml):
  128. self.broadcast_event('receiveFA', vars())
  129. def historicalData(self, reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
  130. self.broadcast_event('historicalData', vars())
  131. def scannerParameters(self, xml):
  132. self.broadcast_event('scannerParameters', vars())
  133. def scannerData(self, reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
  134. self.broadcast_event('scannerData', vars())
  135. def commissionReport(self, commissionReport):
  136. self.broadcast_event('commissionReport', vars())
  137. def currentTime(self, time):
  138. self.broadcast_event('currentTime', vars())
  139. def deltaNeutralValidation(self, reqId, underComp):
  140. self.broadcast_event('deltaNeutralValidation', vars())
  141. def fundamentalData(self, reqId, data):
  142. self.broadcast_event('fundamentalData', vars())
  143. def marketDataType(self, reqId, marketDataType):
  144. self.broadcast_event('marketDataType', vars())
  145. def realtimeBar(self, reqId, time, open, high, low, close, volume, wap, count):
  146. self.broadcast_event('realtimeBar', vars())
  147. def scannerDataEnd(self, reqId):
  148. self.broadcast_event('scannerDataEnd', vars())
  149. def tickSnapshotEnd(self, reqId):
  150. self.broadcast_event('tickSnapshotEnd', vars())
  151. def position(self, account, contract, pos, avgCost):
  152. self.broadcast_event('position', vars())
  153. def positionEnd(self):
  154. self.broadcast_event('positionEnd', vars())
  155. def accountSummary(self, reqId, account, tag, value, currency):
  156. self.broadcast_event('accountSummary', vars())
  157. def accountSummaryEnd(self, reqId):
  158. self.broadcast_event('accountSummaryEnd', vars())
  159. class TWS_gateway():
  160. # config
  161. config = None
  162. # redis connection
  163. rs = None
  164. # channel clients' requests to IB/TWS
  165. gw_message_prosumer = None
  166. # manage conID / contracts mapping
  167. contract_subscription_mgr = None
  168. connection = None
  169. # handler to process incoming IB/TWS messages and echo back to clients
  170. tws_event_handler = None
  171. # monitor IB connection / heart beat
  172. ibh = None
  173. tlock = None
  174. ib_conn_status = None
  175. ib_order_transmit = False
  176. def __init__(self, host, port, clientId, kafka_host, kafka_port, config):
  177. super(TWS_gateway, self).__init__()
  178. self.config = config
  179. self.host = host
  180. self.port = port
  181. self.clientId = clientId
  182. self.ib_order_transmit = config.get("tws_gateway", "tws_gateway.order_transmit").strip('"').strip("'") if \
  183. config.get("tws_gateway", "tws_gateway.order_transmit").strip('"').strip("'") <> None\
  184. else False
  185. logging.info('starting up TWS_gateway...')
  186. logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit == True else 'False'))
  187. logging.info('connecting to Redis server...')
  188. self.initialize_redis(config)
  189. logging.info('starting up gateway message handler - kafka Prosumer...')
  190. client_requests = list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods)
  191. self.gw_message_prosumer = Prosumer(name='tws_gw_prosumer', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
  192. 'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
  193. 'group_id': 'groupA', 'session_timeout_ms':10000,
  194. 'topics': client_requests, 'clear_offsets' : 0})
  195. logging.info('register listeners for client requests')
  196. logging.info('starting up TWS_event_handler...')
  197. self.tws_event_handler = TWS_event_handler(self.gw_message_prosumer)
  198. logging.info('starting up IB EClientSocket...')
  199. self.connection = EClientSocket(self.tws_event_handler)
  200. #
  201. # register listeners
  202. #
  203. if not self.eConnect():
  204. logging.error('TWS_gateway: unable to establish connection to IB %s:%d' % (self.host, self.port))
  205. sys.exit(-1)
  206. else:
  207. # start heart beat monitor
  208. logging.info('starting up IB heart beat monitor...')
  209. self.tlock = Lock()
  210. self.ibh = IbHeartBeat(config)
  211. self.ibh.register_listener([self.on_ib_conn_broken])
  212. self.ibh.run()
  213. logging.info('starting up subscription manager...')
  214. self.initialize_subscription_mgr()
  215. def start_gateway(self):
  216. self.get_gw_message_prosumer().start_prosumer()
  217. def initialize_subscription_mgr(self):
  218. self.contract_subscription_mgr = SubscriptionManager(self)
  219. self.contract_subscription_mgr.register_persistence_callback(self.persist_subscriptions)
  220. key = self.config.get("tws_gateway", "subscription_manager.subscriptions.redis_key").strip('"').strip("'")
  221. if self.rs.get(key):
  222. #contracts = map(lambda x: ContractHelper.kvstring2contract(x), json.loads(self.rs.get(key)))
  223. def is_outstanding(c):
  224. today = time.strftime('%Y%m%d')
  225. if c.m_expiry < today:
  226. logging.info('initialize_subscription_mgr: ignoring expired contract %s%s%s' % (c.m_expiry, c.m_strike, c.m_right))
  227. return False
  228. return True
  229. contracts = filter(lambda x: is_outstanding(x),
  230. map(lambda x: ContractHelper.kvstring2object(x, Contract), json.loads(self.rs.get(key))))
  231. self.contract_subscription_mgr.load_subscription(contracts)
  232. def persist_subscriptions(self, contracts):
  233. key = self.config.get("tws_gateway", "subscription_manager.subscriptions.redis_key").strip('"').strip("'")
  234. #cs = json.dumps(map(lambda x: ContractHelper.contract2kvstring(x) if x <> None else None, contracts))
  235. cs = json.dumps(map(lambda x: ContractHelper.object2kvstring(x) if x <> None else None, contracts))
  236. logging.debug('Tws_gateway: updating subscription table to redis store %s' % cs)
  237. self.rs.set(key, cs)
  238. def initialize_redis(self, config):
  239. r_host = config.get("redis", "redis.server").strip('"').strip("'")
  240. r_port = config.get("redis", "redis.port")
  241. r_db = config.get("redis", "redis.db")
  242. self.rs = redis.Redis(r_host, r_port, r_db)
  243. try:
  244. self.rs.client_list()
  245. except redis.ConnectionError:
  246. logging.error('TWS_gateway: unable to connect to redis server using these settings: %s port:%d db:%d' % (r_host, r_port, r_db))
  247. logging.error('aborting...')
  248. sys.exit(-1)
  249. def eConnect(self):
  250. logging.info('ClientRequestHandler - eConnect. Connecting to %s:%s App Id: %s' % (self.host, self.port, self.clientId))
  251. self.connection.eConnect(self.host, self.port, self.clientId)
  252. return self.tws_connect.isConnected()
  253. def eDisconnect(self, value=None):
  254. sleep(2)
  255. self.connection.eDisconnect()
  256. def get_gw_message_prosumer(self):
  257. return self.gw_message_prosumer
  258. def on_ib_conn_broken(self, msg):
  259. logging.error('TWS_gateway: detected broken IB connection!')
  260. self.ib_conn_status = 'ERROR'
  261. self.tlock.acquire() # this function may get called multiple times
  262. try: # block until another party finishes executing
  263. if self.ib_conn_status == 'OK': # check status
  264. return # if already fixed up while waiting, return
  265. self.eDisconnect()
  266. self.eConnect()
  267. while not self.connection.isConnected():
  268. logging.error('TWS_gateway: attempt to reconnect...')
  269. self.eConnect()
  270. sleep(2)
  271. # we arrived here because the connection has been restored
  272. # resubscribe tickers again!
  273. logging.info('TWS_gateway: IB connection restored...resubscribe contracts')
  274. self.contract_subscription_mgr.force_resubscription()
  275. finally:
  276. self.tlock.release()
  277. class ClientRequestHandler(BaseMessageListener):
  278. def __init__(self, name, tws_gateway):
  279. BaseMessageListener.__init__(self, name, tws_gateway)
  280. self.producer = tws_gateway
  281. self.tws_connect = tws_gateway.connection
  282. def reqAccountUpdates(self, value=None):
  283. logging.info('ClientRequestHandler - reqAccountUpdates value=%s' % value)
  284. self.tws_connect.reqAccountUpdates(1, '')
  285. def reqAccountSummary(self, value):
  286. logging.info('ClientRequestHandler - reqAccountSummary value=%s' % value)
  287. vals = map(lambda x: x.encode('ascii') if isinstance(x, unicode) else x, json.loads(value))
  288. self.tws_connect.reqAccountSummary(vals[0], vals[1], vals[2])
  289. def reqOpenOrders(self, value=None):
  290. self.tws_connect.reqOpenOrders()
  291. def reqPositions(self, value=None):
  292. self.tws_connect.reqPositions()
  293. def reqExecutions(self, value):
  294. try:
  295. filt = ExecutionFilter() if value == '' else ExecutionFilterHelper.kvstring2object(value, ExecutionFilter)
  296. self.tws_connect.reqExecutions(0, filt)
  297. except:
  298. logging.error(traceback.format_exc())
  299. def reqIds(self, value=None):
  300. self.tws_connect.reqIds(1)
  301. def reqNewsBulletins(self):
  302. self.tws_connect.reqNewsBulletins(1)
  303. def cancelNewsBulletins(self):
  304. self.tws_connect.cancelNewsBulletins()
  305. def setServerLogLevel(self):
  306. self.tws_connect.setServerLogLevel(3)
  307. def reqAutoOpenOrders(self):
  308. self.tws_connect.reqAutoOpenOrders(1)
  309. def reqAllOpenOrders(self):
  310. self.tws_connect.reqAllOpenOrders()
  311. def reqManagedAccts(self):
  312. self.tws_connect.reqManagedAccts()
  313. def requestFA(self):
  314. self.tws_connect.requestFA(1)
  315. def reqMktData(self, sm_contract):
  316. logging.info('ClientRequestHandler received reqMktData request <no action: pass...> : %s' % sm_contract)
  317. def reqHistoricalData(self):
  318. contract = Contract()
  319. contract.m_symbol = 'QQQQ'
  320. contract.m_secType = 'STK'
  321. contract.m_exchange = 'SMART'
  322. endtime = strftime('%Y%m%d %H:%M:%S')
  323. self.tws_connect.reqHistoricalData(
  324. tickerId=1,
  325. contract=contract,
  326. endDateTime=endtime,
  327. durationStr='1 D',
  328. barSizeSetting='1 min',
  329. whatToShow='TRADES',
  330. useRTH=0,
  331. formatDate=1)
  332. def placeOrder(self, value=None):
  333. logging.info('TWS_gateway - placeOrder value=%s' % value)
  334. try:
  335. vals = json.loads(value)
  336. except ValueError:
  337. logging.error('TWS_gateway - placeOrder Exception %s' % traceback.format_exc())
  338. return
  339. # c = ContractHelper.kvstring2contract(vals[1])
  340. o = OrderHelper.kvstring2object(vals[2], Order)
  341. o.__dict__['transmit'] = self.ib_order_transmit
  342. # print c.__dict__
  343. # print o.__dict__
  344. # print '---------------------'
  345. #self.connection.placeOrder(vals[0], ContractHelper.kvstring2contract(vals[1]), OrderHelper.kvstring2object(vals[2], Order))
  346. self.tws_connect.placeOrder(vals[0], ContractHelper.kvstring2object(vals[1], Contract), OrderHelper.kvstring2object(vals[2], Order))
  347. # self.connection.placeOrder(orderId, contract, newOptOrder)
  348. """
  349. Client requests to TWS_gateway
  350. """
  351. def gw_req_subscriptions(self, value=None):
  352. #subm = map(lambda i: ContractHelper.contract2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
  353. #subm = map(lambda i: ContractHelper.object2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
  354. subm = map(lambda i: (i, ContractHelper.object2kvstring(self.contract_subscription_mgr.handle[i])), range(len(self.contract_subscription_mgr.handle)))
  355. print subm
  356. if subm:
  357. self.producer.send_message('gw_subscriptions', self.produce.message_dumps({'subscriptions': subm}))
  358. class SubscriptionManager(BaseMessageListener):
  359. # array list of contracts
  360. handle = []
  361. # contract key map to contract ID (index of the handle array)
  362. tickerId = {}
  363. persist_f = None
  364. def __init__(self, name, tws_gateway):
  365. BaseMessageListener.__init__(self, name)
  366. self.producer = tws_gateway
  367. self.tws_connect = tws_gateway.connection
  368. def load_subscription(self, contracts):
  369. for c in contracts:
  370. self.reqMktData(c)
  371. self.dump()
  372. # returns -1 if not found, else the key id (which could be a zero value)
  373. def is_subscribed(self, contract):
  374. #print self.conId.keys()
  375. ckey = ContractHelper.makeRedisKeyEx(contract)
  376. if ckey not in self.tickerId.keys():
  377. return -1
  378. else:
  379. # note that 0 can be a key
  380. # be careful when checking the return values
  381. # check for true false instead of using implicit comparsion
  382. return self.tickerId[ckey]
  383. # def reqMktDataxx(self, contract):
  384. # print '---------------'
  385. # contractTuple = ('USO', 'STK', 'SMART', 'USD', '', 0.0, '')
  386. # stkContract = self.makeStkContract(contractTuple)
  387. # stkContract.m_includeExpired = False
  388. # self.parent.connection.reqMktData(1, stkContract, '', False)
  389. #
  390. # contractTuple = ('IBM', 'STK', 'SMART', 'USD', '', 0.0, '')
  391. # stkContract = self.makeStkContract(contractTuple)
  392. # stkContract.m_includeExpired = False
  393. # print stkContract
  394. # print stkContract.__dict__
  395. # self.parent.connection.reqMktData(2, stkContract, '', False)
  396. #
  397. def reqMktData(self, kvs_contract):
  398. #logging.info('SubscriptionManager: reqMktData')
  399. logging.info('SubscriptionManager received reqMktData request: %s' % kvs_contract)
  400. def add_subscription(contract):
  401. self.handle.append(contract)
  402. newId = len(self.handle) - 1
  403. self.tickerId[ContractHelper.makeRedisKeyEx(contract)] = newId
  404. return newId
  405. contract = ContractHelper.kvstring2object(kvs_contract, Contract)
  406. id = self.is_subscribed(contract)
  407. if id == -1: # not found
  408. id = add_subscription(contract)
  409. #
  410. # the conId must be set to zero when calling TWS reqMktData
  411. # otherwise TWS will fail to subscribe the contract
  412. self.tws_connect.reqMktData(id, contract, '', False)
  413. if self.persist_f:
  414. logging.debug('SubscriptionManager reqMktData: trigger callback')
  415. self.persist_f(self.handle)
  416. logging.info('SubscriptionManager: reqMktData. Requesting market data, id = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
  417. else:
  418. self.tws_connect.reqMktData(1000 + id, contract, '', True)
  419. logging.info('SubscriptionManager: reqMktData: contract already subscribed. Request snapshot = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
  420. #self.dump()
  421. #
  422. # instruct gateway to broadcast new id has been assigned to a new contract
  423. #
  424. self.producer.send_message('gw_notify_subscription_changed', self.producer.message_dumps({id: ContractHelper.object2kvstring(contract)}))
  425. #>>>self.parent.gw_notify_subscription_changed({id: ContractHelper.object2kvstring(contract)})
  426. logging.info('SubscriptionManager reqMktData: gw_notify_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
  427. # def makeStkContract(self, contractTuple):
  428. # newContract = Contract()
  429. # newContract.m_symbol = contractTuple[0]
  430. # newContract.m_secType = contractTuple[1]
  431. # newContract.m_exchange = contractTuple[2]
  432. # newContract.m_currency = contractTuple[3]
  433. # newContract.m_expiry = contractTuple[4]
  434. # newContract.m_strike = contractTuple[5]
  435. # newContract.m_right = contractTuple[6]
  436. # print 'Contract Values:%s,%s,%s,%s,%s,%s,%s:' % contractTuple
  437. # return newContract
  438. # use only after a broken connection is restored
  439. # to re request market data
  440. def force_resubscription(self):
  441. # starting from index 1 of the contract list, call reqmktdata, and format the result into a list of tuples
  442. for i in range(1, len(self.handle)):
  443. self.tws_connect.reqMktData(i, self.handle[i], '', False)
  444. logging.info('force_resubscription: %s' % ContractHelper.printContract(self.handle[i]))
  445. def itemAt(self, id):
  446. if id > 0 and id < len(self.handle):
  447. return self.handle[id]
  448. return -1
  449. def dump(self):
  450. logging.info('subscription manager table:---------------------')
  451. logging.info(''.join('%d: {%s},\n' % (i, ''.join('%s:%s, ' % (k, v) for k, v in self.handle[i].__dict__.iteritems() )\
  452. if self.handle[i] <> None else '' ) for i in range(len(self.handle)))\
  453. )
  454. #logging.info( ''.join('%s[%d],\n' % (k, v) for k, v in self.conId.iteritems()))
  455. logging.info( 'Number of instruments subscribed: %d' % len(self.handle))
  456. logging.info( '------------------------------------------------')
  457. def register_persistence_callback(self, func):
  458. logging.info('subscription manager: registering callback')
  459. self.persist_f = func
  460. def test_subscription():
  461. s = SubscriptionManager()
  462. contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151029', 0, '')
  463. c = ContractHelper.makeContract(contractTuple)
  464. print s.is_subscribed(c)
  465. print s.add_subscription(c)
  466. print s.is_subscribed(c)
  467. s.dump()
  468. fr = open('/home/larry-13.04/workspace/finopt/data/subscription-hsio.txt')
  469. for l in fr.readlines():
  470. if l[0] <> '#':
  471. s.add_subscription(ContractHelper.makeContract(tuple([t for t in l.strip('\n').split(',')])))
  472. fr.close()
  473. s.dump()
  474. fr = open('/home/larry-13.04/workspace/finopt/data/subscription-hsio.txt')
  475. for l in fr.readlines():
  476. if l[0] <> '#':
  477. print s.add_subscription(ContractHelper.makeContract(tuple([t for t in l.strip('\n').split(',')])))
  478. s.dump()
  479. contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151127', 0, '')
  480. c = ContractHelper.makeContract(contractTuple)
  481. print s.is_subscribed(c)
  482. print s.add_subscription(c)
  483. print s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c)))
  484. print 'test itemAt:'
  485. contractTuple = ('HSI', 'OPT', 'HKFE', 'HKD', '20151127', 21400, 'C')
  486. c = ContractHelper.makeContract(contractTuple)
  487. print s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c)))
  488. if __name__ == '__main__':
  489. if len(sys.argv) != 2:
  490. print("Usage: %s <config file>" % sys.argv[0])
  491. exit(-1)
  492. cfg_path= sys.argv[1:]
  493. config = ConfigParser.SafeConfigParser()
  494. if len(config.read(cfg_path)) == 0:
  495. raise ValueError, "Failed to open config file"
  496. logconfig = eval(config.get("tws_gateway", "tws_gateway.logconfig").strip('"').strip("'"))
  497. logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
  498. logging.basicConfig(**logconfig)
  499. khost = config.get("epc", "kafka.host").strip('"').strip("'")
  500. kport = config.get("epc", "kafka.port")
  501. ihost = config.get("market", "ib.gateway").strip('"').strip("'")
  502. iport = int(config.get("market", "ib.port"))
  503. iappid = int(config.get("market", "ib.appid.portfolio"))
  504. #print 'give kafka server some time to register the topics...'
  505. #sleep(2)
  506. app = TWS_gateway(ihost, iport, iappid, khost, kport, config)
  507. app.start_gateway()
  508. print 'TWS_gateway started.'
  509. #
  510. # test_subscription()