sample_tws_client.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. # -*- coding: utf-8 -*-
  2. import sys, traceback
  3. import json
  4. import logging
  5. import thread, threading
  6. from threading import Lock
  7. import ConfigParser
  8. from time import sleep
  9. import time, datetime
  10. from ib.ext.Contract import Contract
  11. from ib.ext.Order import Order
  12. from ib.ext.ExecutionFilter import ExecutionFilter
  13. from random import randint
  14. from finopt.options_analytics import AnalyticsListener
  15. from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
  16. from finopt.options_chain import OptionsChain
  17. from comms.tws_client import SimpleTWSClient
  18. from kafka import KafkaConsumer
  19. from comms.tws_protocol_helper import TWS_Protocol
  20. class SampleClient(SimpleTWSClient):
  21. tickerMap = {}
  22. def dump(self, msg_name, mapping):
  23. # the mapping is a comms.tws_protocol_helper.Message object
  24. # which can be accessed directly using the __dict__.['xxx'] method
  25. items = list(mapping.items())
  26. items.sort()
  27. print ('>>> %s <<< %s' % (msg_name, ''.join('%s=%s, '% (k, v if k <> 'ts' else datetime.datetime.fromtimestamp(v).strftime('%Y-%m-%d %H:%M:%S.%f')) for k, v in items)))
  28. def accountSummaryEnd(self, items):
  29. self.dump('accountSummaryEnd', items)
  30. def accountSummary(self, items):
  31. self.dump('accountSummary', items)
  32. # override the tickSize message
  33. def tickSize(self, items):
  34. try:
  35. contract = self.tickerMap[items.__dict__['tickerId']]
  36. field = items.__dict__['field']
  37. ct = ContractHelper.kv2object(contract, Contract)
  38. print 'tickSize>> %s' % ('[%d:%s] %s=%0.4f [%s]' % \
  39. (items.__dict__['tickerId'], ContractHelper.makeRedisKeyEx(ct),\
  40. 'bid' if field == 0 else ('ask' if field == 3 else ('last' if field == 5 else field)), \
  41. items.__dict__['size'], datetime.datetime.fromtimestamp(items.__dict__['ts']).strftime('%Y-%m-%d %H:%M:%S.%f')))
  42. except KeyError:
  43. print 'tickSize: keyerror: (this could happen on the 1st run as the subscription manager sub list is still empty.'
  44. print items
  45. def tickPrice(self, items):
  46. try:
  47. contract = self.tickerMap[items.__dict__['tickerId']]
  48. field = items.__dict__['field']
  49. ct = ContractHelper.kv2object(contract, Contract)
  50. print 'tickPrice>> %s' % ('[%d:%s] %s=%0.4f [%s]' % \
  51. (items.__dict__['tickerId'], ContractHelper.makeRedisKeyEx(ct),\
  52. 'bid_q' if field == 1 else ('ask_q' if field == 2 else ('last_q' if field == 4 else field)), \
  53. items.__dict__['price'], datetime.datetime.fromtimestamp(items.__dict__['ts']).strftime('%Y-%m-%d %H:%M:%S.%f')))
  54. except KeyError:
  55. print 'tickPrice: keyerror:'
  56. print items
  57. def tickString(self, items):
  58. pass
  59. def tickGeneric(self, items):
  60. pass
  61. def positionEnd(self, items):
  62. self.dump('positionEnd', items)
  63. def position(self, items):
  64. self.dump('position', items)
  65. #pass
  66. def error(self, items):
  67. self.dump('error', items)
  68. def error_0(self, items):
  69. self.dump('error', items)
  70. def error_1(self, items):
  71. self.dump('error', items)
  72. def gw_subscriptions(self, items):
  73. # <class 'comms.tws_protocol_helper.Message'>
  74. # sample
  75. #[[0, u'{"m_conId": 0, "m_right": "", "m_symbol": "HSI", "m_secType": "FUT", "m_includeExpired": false, "m_expiry": "20151127", "m_currency": "HKD", "m_exchange": "HKFE", "m_strike": 0}'], [1, u'{"m_conId": 0, "m_right": "P", "m_symbol": "HSI", "m_secType": "OPT", "m_includeExpired": false, "m_expiry": "20151127", "m_currency": "HKD", "m_exchange": "HKFE", "m_strike": 22200}'], [2, u'{"m_conId": 0, "m_right": "P", "m_symbol": "HSI", "m_secType": "OPT", "m_includeExpired": false, "m_expiry": "20151127", "m_currency": "HKD", "m_exchange": "HKFE", "m_strike": 22400}'], [3, u'{"m_conId": 0, "m_right": "P", "m_symbol": "HSI", "m_secType": "OPT", "m_includeExpired": false, "m_expiry": "20151127", "m_currency": "HKD", "m_exchange": "HKFE", "m_strike": 22600}'], [4, u'{"m_conId": 0, "m_right": "P", "m_symbol": "HSI", "m_secType": "OPT", "m_includeExpired": false, "m_expiry": "20151127", "m_currency": "HKD", "m_exchange": "HKFE", "m_strike": 22800}'], [5, u'{"m_conId": 0, "m_right": "P", "m_symbol": "HSI", "m_secType": "OPT", "m_includeExpired": false, "m_expiry": "20151127", "m_currency": "HKD", "m_exchange": "HKFE", "m_strike": 23000}'], [6, u'{"m_conId": 0, "m_right": "P", "m_symbol": "HSI", "m_secType": "OPT", "m_includeExpired": false, "m_expiry": "20151127", "m_currency": "HKD", "m_exchange": "HKFE", "m_strike": 23200}']]
  76. #print items.__dict__['subscriptions']
  77. #l = map(lambda x: {x[0]: x[1]}, map(lambda x: (x[0], ContractHelper.kvstring2object(x[1], Contract)), items.__dict__['subscriptions']))
  78. l = map(lambda x: {x[0]: x[1]}, map(lambda x: (x[0], json.loads(x[1])), items.__dict__['subscriptions']))
  79. for i in l:
  80. self.tickerMap.update(i)
  81. print 'gw_subscriptions -> dump tickerMap '
  82. print self.tickerMap
  83. # override this function to perform your own processing
  84. # def accountDownloadEnd(self, items):
  85. # pass
  86. # override this function to perform your own processing
  87. # def execDetailsEnd(self, items):
  88. # pass
  89. # override this function to perform your own processing
  90. # def updateAccountTime(self, items):
  91. # pass
  92. # override this function to perform your own processing
  93. # def deltaNeutralValidation(self, items):
  94. # pass
  95. # override this function to perform your own processing
  96. # def orderStatus(self, items):
  97. # pass
  98. # override this function to perform your own processing
  99. # def updateAccountValue(self, items):
  100. # pass
  101. # override this function to perform your own processing
  102. # def historicalData(self, items):
  103. # pass
  104. # override this function to perform your own processing
  105. # def openOrderEnd(self, items):
  106. # pass
  107. # override this function to perform your own processing
  108. # def updatePortfolio(self, items):
  109. # pass
  110. # override this function to perform your own processing
  111. # def managedAccounts(self, items):
  112. # pass
  113. # override this function to perform your own processing
  114. # def contractDetailsEnd(self, items):
  115. # pass
  116. # override this function to perform your own processing
  117. # def positionEnd(self, items):
  118. # pass
  119. # override this function to perform your own processing
  120. # def bondContractDetails(self, items):
  121. # pass
  122. # override this function to perform your own processing
  123. # def accountSummary(self, items):
  124. # pass
  125. # override this function to perform your own processing
  126. # def updateNewsBulletin(self, items):
  127. # pass
  128. # override this function to perform your own processing
  129. # def scannerParameters(self, items):
  130. # pass
  131. # override this function to perform your own processing
  132. # def tickString(self, items):
  133. # pass
  134. # override this function to perform your own processing
  135. # def accountSummaryEnd(self, items):
  136. # pass
  137. # override this function to perform your own processing
  138. # def scannerDataEnd(self, items):
  139. # pass
  140. # override this function to perform your own processing
  141. # def commissionReport(self, items):
  142. # pass
  143. # override this function to perform your own processing
  144. # def error(self, items):
  145. # pass
  146. # override this function to perform your own processing
  147. # def tickGeneric(self, items):
  148. # pass
  149. # override this function to perform your own processing
  150. # def tickPrice(self, items):
  151. # pass
  152. # override this function to perform your own processing
  153. # def nextValidId(self, items):
  154. # pass
  155. # override this function to perform your own processing
  156. # def openOrder(self, items):
  157. # pass
  158. # override this function to perform your own processing
  159. # def realtimeBar(self, items):
  160. # pass
  161. # override this function to perform your own processing
  162. # def contractDetails(self, items):
  163. # pass
  164. # override this function to perform your own processing
  165. # def execDetails(self, items):
  166. # pass
  167. # override this function to perform your own processing
  168. # def tickOptionComputation(self, items):
  169. # pass
  170. # override this function to perform your own processing
  171. # def updateMktDepth(self, items):
  172. # pass
  173. # override this function to perform your own processing
  174. # def scannerData(self, items):
  175. # pass
  176. # override this function to perform your own processing
  177. # def currentTime(self, items):
  178. # pass
  179. # override this function to perform your own processing
  180. # def error_0(self, items):
  181. # pass
  182. # override this function to perform your own processing
  183. # def error_1(self, items):
  184. # pass
  185. # override this function to perform your own processing
  186. # def tickSnapshotEnd(self, items):
  187. # pass
  188. # override this function to perform your own processing
  189. # def tickSize(self, items):
  190. # pass
  191. # override this function to perform your own processing
  192. # def receiveFA(self, items):
  193. # pass
  194. # override this function to perform your own processing
  195. # def connectionClosed(self, items):
  196. # pass
  197. # override this function to perform your own processing
  198. # def position(self, items):
  199. # pass
  200. # override this function to perform your own processing
  201. # def updateMktDepthL2(self, items):
  202. # pass
  203. # override this function to perform your own processing
  204. # def fundamentalData(self, items):
  205. # pass
  206. # override this function to perform your own processing
  207. # def tickEFP(self, items):
  208. # pass
  209. def on_ib_message(msg):
  210. print msg
  211. def makeOrder(action, orderID, tif, orderType, price, qty):
  212. newOptOrder = Order()
  213. newOptOrder.m_orderId = orderID
  214. newOptOrder.m_clientId = 0
  215. newOptOrder.m_permid = 0
  216. newOptOrder.m_action = action
  217. newOptOrder.m_lmtPrice = price
  218. newOptOrder.m_auxPrice = 0
  219. newOptOrder.m_tif = tif
  220. newOptOrder.m_transmit = True
  221. newOptOrder.m_orderType = orderType
  222. newOptOrder.m_totalQuantity = qty
  223. return newOptOrder
  224. def test1():
  225. twsc = SimpleTWSClient(host, port, '888')
  226. twsc.registerAll([on_ib_message])
  227. #twsc.get_command_handler().reqAccountSummary(100, 'All', "AccountType,NetLiquidation,TotalCashValue,BuyingPower,EquityWithLoanValue")
  228. contract = Contract() #
  229. contract.m_symbol = 'EUR'
  230. contract.m_currency = 'USD'
  231. contract.m_secType = 'CASH'
  232. contract.m_exchange = 'IDEALPRO'
  233. twsc.get_command_handler().reqMktData(contract)
  234. twsc.connect()
  235. sleep(4)
  236. twsc.disconnect()
  237. print 'completed...'
  238. def test2():
  239. contract = Contract() #
  240. contract.m_symbol = 'EUR'
  241. contract.m_currency = 'USD'
  242. contract.m_secType = 'CASH'
  243. contract.m_exchange = 'IDEALPRO'
  244. c = SampleClient(host, port, 'SampleClient-777')
  245. c.connect()
  246. c.get_command_handler().gw_req_subscriptions()
  247. #c.get_command_handler().reqIds()
  248. c.get_command_handler().reqMktData(contract)
  249. for i in range(567,568):
  250. orderID = i
  251. order = makeOrder( 'SELL', i, 'DAY', 'LMT', 1.0, randint(10,20) * 1000)
  252. c.get_command_handler().placeOrder(orderID, contract, order)
  253. sleep(3)
  254. c.get_command_handler().reqOpenOrders()
  255. c.get_command_handler().reqExecutions()
  256. c.get_command_handler().reqPositions()
  257. sleep(8)
  258. c.disconnect()
  259. print 'completed...'
  260. def test3():
  261. c = SampleClient(host, port, 'SampleClient-777')
  262. c.connect()
  263. # m_clientId = 0 # zero means no filtering on this field
  264. # m_acctCode = ""
  265. # m_time = ""
  266. # m_symbol = ""
  267. # m_secType = ""
  268. # m_exchange = ""
  269. # m_side = ""
  270. filter = ExecutionFilterHelper.kv2object({'m_time': '20151104 09:35:00'}, ExecutionFilter)
  271. c.get_command_handler().reqExecutions(filter)
  272. sleep(7)
  273. #"yyyymmdd-hh:mm:ss"
  274. c.disconnect()
  275. def test4():
  276. #global host, port
  277. f = open('/home/larry/l1304/workspace/finopt/data/subscription-nov-HSI.txt')
  278. lns = f.readlines()
  279. cs = map (lambda l: ContractHelper.makeContract(tuple(l.strip('\n').split(','))), lns)
  280. c = SampleClient(host, port, 'SampleClient-777')
  281. c.connect()
  282. #for contract in cs:
  283. #c.get_command_handler().reqMktData(cs[0])
  284. #
  285. # contract = Contract() #
  286. # contract.m_symbol = 'EUR'
  287. # contract.m_currency = 'USD'
  288. # contract.m_secType = 'CASH'
  289. # contract.m_exchange = 'IDEALPRO'
  290. # c.get_command_handler().reqMktData(contract)
  291. #
  292. # contract.m_symbol = 'HSI'
  293. # contract.m_currency = 'HKD'
  294. # contract.m_secType = 'OPT'
  295. # contract.m_exchange = 'HKFE'
  296. # contract.m_strike = 21000
  297. # contract.m_multiplier = 50.0
  298. # contract.m_includeExpired = False
  299. #
  300. # contract.m_right = 'P'
  301. # contract.m_expiry = '20151127'
  302. contract = Contract()
  303. contract.m_symbol = 'GOOG'
  304. contract.m_currency = 'USD'
  305. contract.m_secType = 'STK'
  306. contract.m_exchange = 'SMART'
  307. #contract.m_strike = 58.5
  308. #contract.m_multiplier = 100
  309. #contract.m_includeExpired = False
  310. #contract.m_right = 'P'
  311. #contract.m_expiry = '20151120'
  312. c.get_command_handler().reqMktData(contract)
  313. contract = Contract()
  314. contract.m_symbol = 'EWT'
  315. contract.m_currency = 'USD'
  316. contract.m_secType = 'STK'
  317. contract.m_exchange = 'SMART'
  318. c.get_command_handler().reqMktData(contract)
  319. sleep(1)
  320. c.get_command_handler().gw_req_subscriptions()
  321. sleep(10)
  322. c.disconnect()
  323. def test5():
  324. print '******************************** TEST 5'
  325. c = SampleClient(host, port, 'SampleClient-777')
  326. c.connect()
  327. #c.get_command_handler().reqIds()
  328. c.get_command_handler().gw_req_subscriptions()
  329. c.get_command_handler().reqExecutions()
  330. sleep(8)
  331. c.disconnect()
  332. def test6():
  333. contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151127', 0, '')
  334. #contractTuple = ('VMW', 'STK', 'SMART', 'USD', '', 0, '')
  335. contract = ContractHelper.makeContract(contractTuple)
  336. oc = OptionsChain('test6')
  337. oc.set_underlying(contract)
  338. oc.set_option_structure(contract, 200, 50, 0.005, 0.003, '20151127')
  339. oc.build_chain(22300, 0.1)
  340. c = SampleClient(host, port, 'SampleClient-777')
  341. c.connect()
  342. #c.get_command_handler().reqIds()
  343. c.get_command_handler().gw_req_subscriptions()
  344. c.get_command_handler().reqMktData(contract)
  345. for ct in oc.get_option_chain():
  346. c.get_command_handler().reqMktData(ct.get_contract())
  347. print ContractHelper.object2kvstring(ct.get_contract())
  348. sleep(8)
  349. c.disconnect()
  350. def test7():
  351. contractTuple = ('VMW', 'STK', 'SMART', 'USD', '', 0, '')
  352. contract = ContractHelper.makeContract(contractTuple)
  353. oc = OptionsChain('t7')
  354. oc.set_underlying(contract)
  355. oc.set_option_structure(contract, 0.5, 100, 0.0012, 0.0328, '20151211')
  356. oc.build_chain(59.3, 0.08, 0.22)
  357. c = SampleClient(host, port, 'SampleClient-777')
  358. c.connect()
  359. #c.get_command_handler().reqIds()
  360. c.get_command_handler().gw_req_subscriptions()
  361. c.get_command_handler().reqMktData(contract)
  362. for ct in oc.get_option_chain():
  363. c.get_command_handler().reqMktData(ct.get_contract())
  364. print ContractHelper.object2kvstring(ct.get_contract())
  365. sleep(3)
  366. c.disconnect()
  367. def test8():
  368. # c = SampleClient(host, port, 'SampleClient-777')
  369. # c.connect()
  370. consumer = KafkaConsumer( *[(v,0) for v in list(TWS_Protocol.oceEvents)] , \
  371. metadata_broker_list=['%s:%s' % (host, port)],\
  372. client_id = 'test8',\
  373. group_id = 'epc.group',\
  374. auto_commit_enable=True,\
  375. consumer_timeout_ms = 2000,\
  376. auto_commit_interval_ms=30 * 1000,\
  377. auto_offset_reset='smallest')
  378. # c.get_command_handler().gw_req_subscriptions()
  379. for message in consumer:
  380. print ("received %s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
  381. message.offset, message.key,
  382. message.value))
  383. print 'here'
  384. # c.disconnect()
  385. def on_analytics(msg):
  386. print msg
  387. kv = json.loads(msg)
  388. print 'x=%s imvol=%0.2f theta=%0.2f' % (kv['contract']['m_strike'], kv['analytics']['imvol'],kv['analytics']['theta'])
  389. def test9():
  390. al = AnalyticsListener(host, port, 'al')
  391. al.registerAll([on_analytics])
  392. al.start()
  393. if __name__ == '__main__':
  394. """
  395. this sample demonstrates how to use SimpleTWSClient
  396. to connect to IB TWS gateway
  397. usage scenarios:
  398. case 1
  399. re-use SimpleTWSClient object
  400. register to listen for all messages
  401. perform processing within the callback function
  402. case 2
  403. inherit SimpleTWSClient class
  404. override event callback functions to use
  405. each function associates with a specific message type
  406. """
  407. if len(sys.argv) != 2:
  408. print("Usage: %s <test case #>" % sys.argv[0])
  409. exit(-1)
  410. choice= sys.argv[1]
  411. logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)-8s %(message)s')
  412. host = 'vsu-01'
  413. port = 9092
  414. print 'choice: %s' % choice
  415. test9()
  416. #test8()
  417. # if choice == '2':
  418. #
  419. # test2()
  420. # else:
  421. #
  422. # test3()