tws_client.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. import json
  5. import logging
  6. import ConfigParser
  7. from time import sleep
  8. import time, datetime
  9. from threading import Lock
  10. #from kafka.client import KafkaClient
  11. from kafka import KafkaProducer
  12. from kafka import KafkaConsumer
  13. #from kafka.producer import SimpleProducer
  14. #from kafka.common import LeaderNotAvailableError, ConsumerTimeout
  15. import threading
  16. from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
  17. import uuid
  18. from tws_protocol_helper import TWS_Protocol, Message
  19. class TWS_client_base_app(threading.Thread):
  20. producer = None
  21. consumer = None
  22. command_handler = None
  23. stop_consumer = False
  24. reg_all_callbacks = []
  25. # reg_event_func_map = {}
  26. def __init__(self, host, port, id=None):
  27. super(TWS_client_base_app, self).__init__()
  28. #client = KafkaClient('%s:%s' % (host, port))
  29. #self.producer = SimpleProducer(client, async=False)
  30. self.producer = KafkaProducer(bootstrap_servers='%s:%s' % (host, port))
  31. # consumer_timeout_ms must be set - this allows the consumer an interval to exit from its blocking loop
  32. self.consumer = KafkaConsumer( *[v for v in list(TWS_Protocol.topicEvents) + list(TWS_Protocol.gatewayEvents)] , \
  33. bootstrap_servers=['%s:%s' % (host, port)],\
  34. client_id = str(uuid.uuid1()) if id == None else id,\
  35. group_id = 'epc.group',\
  36. enable_auto_commit=True,\
  37. consumer_timeout_ms = 2000,\
  38. auto_commit_interval_ms=30 * 1000,\
  39. auto_offset_reset='latest') # discard old ones
  40. #self.reset_message_offset()
  41. #self.consumer.set_topic_partitions(('gw_subscriptions', 0, 114,),('tickPrice', 0, 27270,))
  42. self.command_handler= TWS_server_wrapper(self.producer)
  43. def reset_message_offset(self):
  44. # 90 is a magic number or don't care (max_num_offsets)
  45. topic_offsets = map(lambda topic: (topic, self.consumer.get_partition_offsets(topic, 0, -1, 90)), TWS_Protocol.topicEvents + TWS_Protocol.gatewayEvents)
  46. topic_offsets = filter(lambda x: x <> None, map(lambda x: (x[0], x[1][1], max(x[1][0], 0)) if len(x[1]) > 1 else None, topic_offsets))
  47. logging.info("TWS_client_base_app: topic offset dump ------:")
  48. logging.info (topic_offsets)
  49. logging.info('TWS_client_base_app set topic offset to the latest point\n%s' % (''.join('%s,%s,%s\n' % (x[0], x[1], x[2]) for x in topic_offsets)))
  50. # the set_topic_partitions call clears out all previous settings when starts
  51. # therefore it's not possible to do something like this:
  52. # self.consumer.set_topic_partitions(('gw_subscriptions', 0, 114,)
  53. # self.consumer.set_topic_partitions(('tickPrice', 0, 27270,))
  54. # as the second call will wipe out whatever was done previously
  55. self.consumer.set_topic_partitions(*topic_offsets)
  56. def get_producer(self):
  57. return self.producer
  58. def run(self):
  59. # for message in self.consumer:
  60. #
  61. # logging.info("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
  62. # message.offset, message.key,
  63. # message.value))
  64. #
  65. # print ("received %s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
  66. # message.offset, message.key,
  67. # message.value))
  68. #
  69. # #self.on_tickPrice(message.value)
  70. # getattr(self, message.topic, None)(message.value)
  71. # # send message to the reg callback func pointers
  72. # # the message is turned into IB compatible type before firing the callbacks
  73. # [f(self.convertItemsToIBmessage(message.value)) for f in self.reg_all_callbacks]
  74. logging.info ('TWS_client_base_app: consumer_timeout_ms = %d' % self.consumer.config['consumer_timeout_ms'])
  75. # keep running until someone tells us to stop
  76. while self.stop_consumer == False:
  77. #while True:
  78. try:
  79. # the next() function runs an infinite blocking loop
  80. # it will raise a consumertimeout if no message is received after a pre-set interval
  81. message = self.consumer.next()
  82. if message.topic == 'gw_subscription_changed':
  83. logging.info("TWS_client_base_app: %s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
  84. message.offset, message.key,
  85. message.value))
  86. kvmessage = self.convertItemsToIBmessage(message.value)
  87. getattr(self, message.topic, None)(kvmessage)
  88. # send message to the reg callback func pointers
  89. # the message is trasnformed into an IB compatible msg before firing the callbacks
  90. [f(kvmessage) for f in self.reg_all_callbacks]
  91. #self.consumer.task_done(message)
  92. except: # ConsumerTimeout:
  93. logging.info('TWS_client_base_app run: ConsumerTimeout. Check new message in the next round...')
  94. def stop(self):
  95. logging.info('TWS_client_base_app: --------------- stopping consumer')
  96. self.stop_consumer = True
  97. def registerAll(self, funcs):
  98. [self.reg_all_callbacks.append(f) for f in funcs]
  99. # def register(self, event, func):
  100. # if event in TWS_Protocol.topicEvents:
  101. # self.reg_event_func_map[event] = [func] if self.reg_event_func_map[event] is None else self.reg_event_func_map[event].append(func)
  102. # else:
  103. # # raise exception
  104. # pass
  105. def handlemessage(self, msg_name, mapping):
  106. items = list(mapping.items())
  107. items.sort()
  108. # print(('### %s' % (msg_name, )))
  109. # for k, v in items:
  110. # print((' %s:%s' % (k, v)))
  111. print "TWS client base app: handlemessage<%s>: %s" % (msg_name, json.dumps(items))
  112. def ascii_encode_dict(self, data):
  113. ascii_encode = lambda x: x.encode('ascii') if isinstance(x, unicode) else x
  114. return dict(map(ascii_encode, pair) for pair in data.items())
  115. def convertItemsToIBmessage(self, items):
  116. # convert into our version of Message
  117. try:
  118. items = json.loads(items, object_hook=self.ascii_encode_dict)
  119. if 'contract' in items:
  120. items['contract'] = ContractHelper.kv2contract(items['contract'])
  121. del(items['self'])
  122. except (KeyError, ):
  123. #logging.error('convertItemsToIBmessage Exception: %s' % str(items))
  124. pass
  125. return Message(**items)
  126. def get_command_handler(self):
  127. return self.command_handler
  128. def accountDownloadEnd(self, items):
  129. self.handlemessage("accountDownloadEnd", items)
  130. def execDetailsEnd(self, items):
  131. self.handlemessage("execDetailsEnd", items)
  132. def updateAccountTime(self, items):
  133. self.handlemessage("updateAccountTime", items)
  134. def deltaNeutralValidation(self, items):
  135. self.handlemessage("deltaNeutralValidation", items)
  136. def orderStatus(self, items):
  137. self.handlemessage("orderStatus", items)
  138. def updateAccountValue(self, items):
  139. self.handlemessage("updateAccountValue", items)
  140. def historicalData(self, items):
  141. self.handlemessage("historicalData", items)
  142. def openOrderEnd(self, items):
  143. self.handlemessage("openOrderEnd", items)
  144. def updatePortfolio(self, items):
  145. self.handlemessage("updatePortfolio", items)
  146. def managedAccounts(self, items):
  147. self.handlemessage("managedAccounts", items)
  148. def contractDetailsEnd(self, items):
  149. self.handlemessage("contractDetailsEnd", items)
  150. def positionEnd(self, items):
  151. self.handlemessage("positionEnd", items)
  152. def bondContractDetails(self, items):
  153. self.handlemessage("bondContractDetails", items)
  154. def accountSummary(self, items):
  155. self.handlemessage("accountSummary", items)
  156. def updateNewsBulletin(self, items):
  157. self.handlemessage("updateNewsBulletin", items)
  158. def scannerParameters(self, items):
  159. self.handlemessage("scannerParameters", items)
  160. def tickString(self, items):
  161. self.handlemessage("tickString", items)
  162. def accountSummaryEnd(self, items):
  163. self.handlemessage("accountSummaryEnd", items)
  164. def scannerDataEnd(self, items):
  165. self.handlemessage("scannerDataEnd", items)
  166. def commissionReport(self, items):
  167. self.handlemessage("commissionReport", items)
  168. def error(self, items):
  169. self.handlemessage("error", items)
  170. def tickGeneric(self, items):
  171. self.handlemessage("tickGeneric", items)
  172. def tickPrice(self, items):
  173. self.handlemessage("tickPrice", items)
  174. def nextValidId(self, items):
  175. self.handlemessage("nextValidId", items)
  176. def openOrder(self, items):
  177. self.handlemessage("openOrder", items)
  178. def realtimeBar(self, items):
  179. self.handlemessage("realtimeBar", items)
  180. def contractDetails(self, items):
  181. self.handlemessage("contractDetails", items)
  182. def execDetails(self, items):
  183. self.handlemessage("execDetails", items)
  184. def tickOptionComputation(self, items):
  185. self.handlemessage("tickOptionComputation", items)
  186. def updateMktDepth(self, items):
  187. self.handlemessage("updateMktDepth", items)
  188. def scannerData(self, items):
  189. self.handlemessage("scannerData", items)
  190. def currentTime(self, items):
  191. self.handlemessage("currentTime", items)
  192. def error_0(self, items):
  193. self.handlemessage("error_0", items)
  194. def error_1(self, items):
  195. self.handlemessage("error_1", items)
  196. def tickSnapshotEnd(self, items):
  197. self.handlemessage("tickSnapshotEnd", items)
  198. def tickSize(self, items):
  199. self.handlemessage("tickSize", items)
  200. def receiveFA(self, items):
  201. self.handlemessage("receiveFA", items)
  202. def connectionClosed(self, items):
  203. self.handlemessage("connectionClosed", items)
  204. def position(self, items):
  205. self.handlemessage("position", items)
  206. def updateMktDepthL2(self, items):
  207. self.handlemessage("updateMktDepthL2", items)
  208. def fundamentalData(self, items):
  209. self.handlemessage("fundamentalData", items)
  210. def tickEFP(self, items):
  211. self.handlemessage("tickEFP", items)
  212. ###########################################################################
  213. # Gateway respond events
  214. #
  215. #
  216. def gw_subscriptions(self, items):
  217. self.handlemessage("gw_subscriptions", items)
  218. def gw_subscription_changed(self, items):
  219. print '************************'
  220. self.handlemessage("gw_subscription_changed", items)
  221. # def on_tickPrice(self, tickerId, field, price, canAutoExecute):
  222. # self.handlemessage('tickPrice', vars())
  223. #
  224. # def on_tickSize(self, tickerId, field, size):
  225. # self.handlemessage('tickSize', vars())
  226. #
  227. # def on_tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
  228. # self.handlemessage('tickOptionComputation', vars())
  229. #
  230. # def on_tickGeneric(self, tickerId, tickType, value):
  231. # self.handlemessage('tickGeneric', vars())
  232. #
  233. # def on_tickString(self, tickerId, tickType, value):
  234. # self.handlemessage('tickString', vars())
  235. #
  236. # def on_tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
  237. # self.handlemessage('tickEFP', vars())
  238. #
  239. # def on_orderStatus(self, orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeId):
  240. # self.handlemessage('orderStatus', vars())
  241. #
  242. # def on_openOrder(self, orderId, contract, order, state):
  243. # self.handlemessage('openOrder', vars())
  244. #
  245. # def on_openOrderEnd(self):
  246. # self.handlemessage('openOrderEnd', vars())
  247. #
  248. # def on_updateAccountValue(self, key, value, currency, accountName):
  249. # self.handlemessage('updateAccountValue', vars())
  250. #
  251. # def on_updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
  252. # self.handlemessage('updatePortfolio', vars())
  253. #
  254. # def on_updateAccountTime(self, timeStamp):
  255. # self.handlemessage('updateAccountTime', vars())
  256. #
  257. # def on_accountDownloadEnd(self, accountName):
  258. # self.handlemessage('accountDownloadEnd', vars())
  259. #
  260. # def on_nextValidId(self, orderId):
  261. # self.handlemessage('nextValidId', vars())
  262. #
  263. # def on_contractDetails(self, reqId, contractDetails):
  264. # self.handlemessage('contractDetails', vars())
  265. #
  266. # def on_contractDetailsEnd(self, reqId):
  267. # self.handlemessage('contractDetailsEnd', vars())
  268. #
  269. # def on_bondContractDetails(self, reqId, contractDetails):
  270. # self.handlemessage('bondContractDetails', vars())
  271. #
  272. # def on_execDetails(self, reqId, contract, execution):
  273. # self.handlemessage('execDetails', vars())
  274. #
  275. # def on_execDetailsEnd(self, reqId):
  276. # self.handlemessage('execDetailsEnd', vars())
  277. #
  278. # def on_connectionClosed(self):
  279. # self.handlemessage('connectionClosed', {})
  280. #
  281. # def on_error(self, id=None, errorCode=None, errorMsg=None):
  282. # self.handlemessage('error', vars())
  283. #
  284. # def on_error_0(self, strvalue=None):
  285. # self.handlemessage('error_0', vars())
  286. #
  287. # def on_error_1(self, id=None, errorCode=None, errorMsg=None):
  288. # self.handlemessage('error_1', vars())
  289. #
  290. # def on_updateMktDepth(self, tickerId, position, operation, side, price, size):
  291. # self.handlemessage('updateMktDepth', vars())
  292. #
  293. # def on_updateMktDepthL2(self, tickerId, position, marketMaker, operation, side, price, size):
  294. # self.handlemessage('updateMktDepthL2', vars())
  295. #
  296. # def on_updateNewsBulletin(self, msgId, msgType, message, origExchange):
  297. # self.handlemessage('updateNewsBulletin', vars())
  298. #
  299. # def on_managedAccounts(self, accountsList):
  300. # self.handlemessage('managedAccounts', vars())
  301. #
  302. # def on_receiveFA(self, faDataType, xml):
  303. # self.handlemessage('receiveFA', vars())
  304. #
  305. # def on_historicalData(self, reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
  306. # self.handlemessage('historicalData', vars())
  307. #
  308. # def on_scannerParameters(self, xml):
  309. # self.handlemessage('scannerParameters', vars())
  310. #
  311. # def on_scannerData(self, reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
  312. # self.handlemessage('scannerData', vars())
  313. #
  314. #
  315. # def on_commissionReport(self, commissionReport):
  316. # self.handlemessage('commissionReport', vars())
  317. #
  318. #
  319. # def on_currentTime(self, time):
  320. # self.handlemessage('currentTime', vars())
  321. #
  322. # def on_deltaNeutralValidation(self, reqId, underComp):
  323. # self.handlemessage('deltaNeutralValidation', vars())
  324. #
  325. #
  326. # def on_fundamentalData(self, reqId, data):
  327. # self.handlemessage('fundamentalData', vars())
  328. #
  329. # def on_marketDataType(self, reqId, marketDataType):
  330. # self.handlemessage('marketDataType', vars())
  331. #
  332. #
  333. # def on_realtimeBar(self, reqId, time, open, high, low, close, volume, wap, count):
  334. # self.handlemessage('realtimeBar', vars())
  335. #
  336. # def on_scannerDataEnd(self, reqId):
  337. # self.handlemessage('scannerDataEnd', vars())
  338. #
  339. #
  340. #
  341. # def on_tickSnapshotEnd(self, reqId):
  342. # self.handlemessage('tickSnapshotEnd', vars())
  343. #
  344. #
  345. # def on_position(self, account, contract, pos, avgCost):
  346. # self.handlemessage('position', vars())
  347. #
  348. # def on_positionEnd(self):
  349. # self.handlemessage('positionEnd', vars())
  350. #
  351. # def on_accountSummary(self, reqId, account, tag, value, currency):
  352. # self.handlemessage('accountSummary', vars())
  353. #
  354. # def on_accountSummaryEnd(self, reqId):
  355. # self.handlemessage('accountSummaryEnd', vars())
  356. class TWS_server_wrapper():
  357. producer = None
  358. def __init__(self, producer):
  359. self.producer = producer
  360. def reqOpenOrders(self):
  361. self.post_msg('reqOpenOrders', '')
  362. def reqIds(self):
  363. self.post_msg('reqIds', '')
  364. def reqNewsBulletins(self):
  365. logging.error('reqNewsBulletins: NOT IMPLEMENTED')
  366. def cancelNewsBulletins(self):
  367. logging.error('cancelNewsBulletins: NOT IMPLEMENTED')
  368. def setServerLogLevel(self):
  369. logging.error('setServerLogLevel: NOT IMPLEMENTED')
  370. def reqAutoOpenOrders(self):
  371. logging.error('reqAutoOpenOrders: NOT IMPLEMENTED')
  372. def reqAllOpenOrders(self):
  373. logging.error('reqAllOpenOrders: NOT IMPLEMENTED')
  374. def reqManagedAccts(self):
  375. logging.error('reqManagedAccts: NOT IMPLEMENTED')
  376. def requestFA(self):
  377. logging.error('requestFA: NOT IMPLEMENTED')
  378. def reqPositions(self):
  379. self.post_msg('reqPositions', '')
  380. def reqHistoricalData(self):
  381. logging.error('reqHistoricalData: NOT IMPLEMENTED')
  382. def reqAccountUpdates(self):
  383. self.post_msg('reqAccountUpdates', '1')
  384. def reqExecutions(self, exec_filter=None):
  385. self.post_msg('reqExecutions', ExecutionFilterHelper.object2kvstring(exec_filter) if exec_filter <> None else '')
  386. def reqMktData(self, contract):
  387. #self.post_msg('reqMktData', ContractHelper.contract2kvstring(contract))
  388. self.post_msg('reqMktData', ContractHelper.object2kvstring(contract))
  389. def reqAccountSummary(self, reqId, group, tags):
  390. self.post_msg('reqAccountSummary', json.dumps([reqId, group, tags]))
  391. def placeOrder(self, id, contract, order):
  392. self.post_msg('placeOrder', json.dumps([id, ContractHelper.contract2kvstring(contract), OrderHelper.object2kvstring(order)]))
  393. def post_msg(self, topic, msg):
  394. logging.info('post_msg sending request to gateway: %s[%s]' % (topic,msg))
  395. self.producer.send(topic, msg)
  396. #############################################################33
  397. # Gateway methods
  398. def gw_req_subscriptions(self):
  399. self.post_msg('gw_req_subscriptions', '')
  400. class SimpleTWSClient(TWS_client_base_app):
  401. def __init__(self, host, port, id=None):
  402. super(SimpleTWSClient, self).__init__(host, port, id)
  403. logging.info('SimpleTWSClient client id=%s' % id)
  404. def connect(self):
  405. self.start()
  406. def disconnect(self):
  407. logging.info ('SimpleTWSClient: received disconnect. asking base class consumer to stop...')
  408. self.stop()
  409. if __name__ == '__main__':
  410. if len(sys.argv) != 2:
  411. print("Usage: %s <config file>" % sys.argv[0])
  412. exit(-1)
  413. cfg_path= sys.argv[1:]
  414. config = ConfigParser.SafeConfigParser()
  415. if len(config.read(cfg_path)) == 0:
  416. raise ValueError, "Failed to open config file"
  417. logging.basicConfig(level=logging.INFO,
  418. format='%(asctime)s %(levelname)s %(message)s')
  419. host = config.get("epc", "kafka.host").strip('"').strip("'")
  420. port = config.get("epc", "kafka.port")
  421. e = SimpleTWSClient(host, port)
  422. #e.registerAll([e.on_ib_message])
  423. e.start()
  424. #e.command_handler.reqAccountUpdates()
  425. #e.command_handler.reqExecutions()
  426. #USD,CASH,IDEALPRO,JPY,,0,
  427. contractTuple = ('USD', 'CASH', 'IDEALPRO', 'JPY', '', 0, '')
  428. c = ContractHelper.makeContract(contractTuple)
  429. #e.command_handler.reqMktData(c)
  430. contractTuple = ('USO', 'STK', 'SMART', 'USD', '', 0, '')
  431. c = ContractHelper.makeContract(contractTuple)
  432. #e.command_handler.reqMktData(c)
  433. e.get_command_handler().reqPositions()
  434. # print dummy()
  435. # kwargs = {"arg3": 3, "arg2": "two","arg1":5}
  436. # m = Message(**kwargs)
  437. # print m.items()
  438. # print m.arg3