tws_client.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. import json
  5. import logging
  6. import ConfigParser
  7. from time import sleep
  8. import time, datetime
  9. from threading import Lock
  10. from kafka.client import KafkaClient
  11. from kafka import KafkaConsumer
  12. from kafka.producer import SimpleProducer
  13. from kafka.common import LeaderNotAvailableError, ConsumerTimeout
  14. import threading
  15. from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
  16. import uuid
  17. from tws_protocol_helper import TWS_Protocol, Message
  18. class TWS_client_base_app(threading.Thread):
  19. producer = None
  20. consumer = None
  21. command_handler = None
  22. stop_consumer = False
  23. reg_all_callbacks = []
  24. # reg_event_func_map = {}
  25. def __init__(self, host, port, id=None):
  26. super(TWS_client_base_app, self).__init__()
  27. client = KafkaClient('%s:%s' % (host, port))
  28. self.producer = SimpleProducer(client, async=False)
  29. # consumer_timeout_ms must be set - this allows the consumer an interval to exit from its blocking loop
  30. self.consumer = KafkaConsumer( *[(v,0) for v in list(TWS_Protocol.topicEvents) + list(TWS_Protocol.gatewayEvents)] , \
  31. metadata_broker_list=['%s:%s' % (host, port)],\
  32. client_id = str(uuid.uuid1()) if id == None else id,\
  33. group_id = 'epc.group',\
  34. auto_commit_enable=True,\
  35. consumer_timeout_ms = 2000,\
  36. auto_commit_interval_ms=30 * 1000,\
  37. auto_offset_reset='largest') # discard old ones
  38. self.reset_message_offset()
  39. #self.consumer.set_topic_partitions(('gw_subscriptions', 0, 114,),('tickPrice', 0, 27270,))
  40. self.command_handler= TWS_server_wrapper(self.producer)
  41. def reset_message_offset(self):
  42. # 90 is a magic number or don't care (max_num_offsets)
  43. topic_offsets = map(lambda topic: (topic, self.consumer.get_partition_offsets(topic, 0, -1, 90)), TWS_Protocol.topicEvents + TWS_Protocol.gatewayEvents)
  44. topic_offsets = filter(lambda x: x <> None, map(lambda x: (x[0], x[1][1], max(x[1][0], 0)) if len(x[1]) > 1 else None, topic_offsets))
  45. logging.info("TWS_client_base_app: topic offset dump ------:")
  46. logging.info (topic_offsets)
  47. logging.info('TWS_client_base_app set topic offset to the latest point\n%s' % (''.join('%s,%s,%s\n' % (x[0], x[1], x[2]) for x in topic_offsets)))
  48. # the set_topic_partitions call clears out all previous settings when starts
  49. # therefore it's not possible to do something like this:
  50. # self.consumer.set_topic_partitions(('gw_subscriptions', 0, 114,)
  51. # self.consumer.set_topic_partitions(('tickPrice', 0, 27270,))
  52. # as the second call will wipe out whatever was done previously
  53. self.consumer.set_topic_partitions(*topic_offsets)
  54. def get_producer(self):
  55. return self.producer
  56. def run(self):
  57. # for message in self.consumer:
  58. #
  59. # logging.info("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
  60. # message.offset, message.key,
  61. # message.value))
  62. #
  63. # print ("received %s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
  64. # message.offset, message.key,
  65. # message.value))
  66. #
  67. # #self.on_tickPrice(message.value)
  68. # getattr(self, message.topic, None)(message.value)
  69. # # send message to the reg callback func pointers
  70. # # the message is turned into IB compatible type before firing the callbacks
  71. # [f(self.convertItemsToIBmessage(message.value)) for f in self.reg_all_callbacks]
  72. logging.info ('TWS_client_base_app: consumer_timeout_ms = %d' % self.consumer._config['consumer_timeout_ms'])
  73. # keep running until someone tells us to stop
  74. while self.stop_consumer == False:
  75. #while True:
  76. try:
  77. # the next() function runs an infinite blocking loop
  78. # it will raise a consumertimeout if no message is received after a pre-set interval
  79. message = self.consumer.next()
  80. logging.debug("TWS_client_base_app: %s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
  81. message.offset, message.key,
  82. message.value))
  83. kvmessage = self.convertItemsToIBmessage(message.value)
  84. getattr(self, message.topic, None)(kvmessage)
  85. # send message to the reg callback func pointers
  86. # the message is trasnformed into an IB compatible msg before firing the callbacks
  87. [f(kvmessage) for f in self.reg_all_callbacks]
  88. #self.consumer.task_done(message)
  89. except ConsumerTimeout:
  90. logging.info('TWS_client_base_app run: ConsumerTimeout. Check new message in the next round...')
  91. def stop(self):
  92. logging.info('TWS_client_base_app: --------------- stopping consumer')
  93. self.stop_consumer = True
  94. def registerAll(self, funcs):
  95. [self.reg_all_callbacks.append(f) for f in funcs]
  96. # def register(self, event, func):
  97. # if event in TWS_Protocol.topicEvents:
  98. # self.reg_event_func_map[event] = [func] if self.reg_event_func_map[event] is None else self.reg_event_func_map[event].append(func)
  99. # else:
  100. # # raise exception
  101. # pass
  102. def handlemessage(self, msg_name, mapping):
  103. items = list(mapping.items())
  104. items.sort()
  105. print(('### %s' % (msg_name, )))
  106. for k, v in items:
  107. print((' %s:%s' % (k, v)))
  108. def ascii_encode_dict(self, data):
  109. ascii_encode = lambda x: x.encode('ascii') if isinstance(x, unicode) else x
  110. return dict(map(ascii_encode, pair) for pair in data.items())
  111. def convertItemsToIBmessage(self, items):
  112. # convert into our version of Message
  113. try:
  114. items = json.loads(items, object_hook=self.ascii_encode_dict)
  115. if 'contract' in items:
  116. items['contract'] = ContractHelper.kv2contract(items['contract'])
  117. del(items['self'])
  118. except (KeyError, ):
  119. pass
  120. return Message(**items)
  121. def get_command_handler(self):
  122. return self.command_handler
  123. def accountDownloadEnd(self, items):
  124. self.handlemessage("accountDownloadEnd", items)
  125. def execDetailsEnd(self, items):
  126. self.handlemessage("execDetailsEnd", items)
  127. def updateAccountTime(self, items):
  128. self.handlemessage("updateAccountTime", items)
  129. def deltaNeutralValidation(self, items):
  130. self.handlemessage("deltaNeutralValidation", items)
  131. def orderStatus(self, items):
  132. self.handlemessage("orderStatus", items)
  133. def updateAccountValue(self, items):
  134. self.handlemessage("updateAccountValue", items)
  135. def historicalData(self, items):
  136. self.handlemessage("historicalData", items)
  137. def openOrderEnd(self, items):
  138. self.handlemessage("openOrderEnd", items)
  139. def updatePortfolio(self, items):
  140. self.handlemessage("updatePortfolio", items)
  141. def managedAccounts(self, items):
  142. self.handlemessage("managedAccounts", items)
  143. def contractDetailsEnd(self, items):
  144. self.handlemessage("contractDetailsEnd", items)
  145. def positionEnd(self, items):
  146. self.handlemessage("positionEnd", items)
  147. def bondContractDetails(self, items):
  148. self.handlemessage("bondContractDetails", items)
  149. def accountSummary(self, items):
  150. self.handlemessage("accountSummary", items)
  151. def updateNewsBulletin(self, items):
  152. self.handlemessage("updateNewsBulletin", items)
  153. def scannerParameters(self, items):
  154. self.handlemessage("scannerParameters", items)
  155. def tickString(self, items):
  156. self.handlemessage("tickString", items)
  157. def accountSummaryEnd(self, items):
  158. self.handlemessage("accountSummaryEnd", items)
  159. def scannerDataEnd(self, items):
  160. self.handlemessage("scannerDataEnd", items)
  161. def commissionReport(self, items):
  162. self.handlemessage("commissionReport", items)
  163. def error(self, items):
  164. self.handlemessage("error", items)
  165. def tickGeneric(self, items):
  166. self.handlemessage("tickGeneric", items)
  167. def tickPrice(self, items):
  168. self.handlemessage("tickPrice", items)
  169. def nextValidId(self, items):
  170. self.handlemessage("nextValidId", items)
  171. def openOrder(self, items):
  172. self.handlemessage("openOrder", items)
  173. def realtimeBar(self, items):
  174. self.handlemessage("realtimeBar", items)
  175. def contractDetails(self, items):
  176. self.handlemessage("contractDetails", items)
  177. def execDetails(self, items):
  178. self.handlemessage("execDetails", items)
  179. def tickOptionComputation(self, items):
  180. self.handlemessage("tickOptionComputation", items)
  181. def updateMktDepth(self, items):
  182. self.handlemessage("updateMktDepth", items)
  183. def scannerData(self, items):
  184. self.handlemessage("scannerData", items)
  185. def currentTime(self, items):
  186. self.handlemessage("currentTime", items)
  187. def error_0(self, items):
  188. self.handlemessage("error_0", items)
  189. def error_1(self, items):
  190. self.handlemessage("error_1", items)
  191. def tickSnapshotEnd(self, items):
  192. self.handlemessage("tickSnapshotEnd", items)
  193. def tickSize(self, items):
  194. self.handlemessage("tickSize", items)
  195. def receiveFA(self, items):
  196. self.handlemessage("receiveFA", items)
  197. def connectionClosed(self, items):
  198. self.handlemessage("connectionClosed", items)
  199. def position(self, items):
  200. self.handlemessage("position", items)
  201. def updateMktDepthL2(self, items):
  202. self.handlemessage("updateMktDepthL2", items)
  203. def fundamentalData(self, items):
  204. self.handlemessage("fundamentalData", items)
  205. def tickEFP(self, items):
  206. self.handlemessage("tickEFP", items)
  207. ###########################################################################3333
  208. # Gateway respond events
  209. #
  210. #
  211. def gw_subscriptions(self, items):
  212. self.handlemessage("gw_subscriptions", items)
  213. # def on_tickPrice(self, tickerId, field, price, canAutoExecute):
  214. # self.handlemessage('tickPrice', vars())
  215. #
  216. # def on_tickSize(self, tickerId, field, size):
  217. # self.handlemessage('tickSize', vars())
  218. #
  219. # def on_tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
  220. # self.handlemessage('tickOptionComputation', vars())
  221. #
  222. # def on_tickGeneric(self, tickerId, tickType, value):
  223. # self.handlemessage('tickGeneric', vars())
  224. #
  225. # def on_tickString(self, tickerId, tickType, value):
  226. # self.handlemessage('tickString', vars())
  227. #
  228. # def on_tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
  229. # self.handlemessage('tickEFP', vars())
  230. #
  231. # def on_orderStatus(self, orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeId):
  232. # self.handlemessage('orderStatus', vars())
  233. #
  234. # def on_openOrder(self, orderId, contract, order, state):
  235. # self.handlemessage('openOrder', vars())
  236. #
  237. # def on_openOrderEnd(self):
  238. # self.handlemessage('openOrderEnd', vars())
  239. #
  240. # def on_updateAccountValue(self, key, value, currency, accountName):
  241. # self.handlemessage('updateAccountValue', vars())
  242. #
  243. # def on_updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
  244. # self.handlemessage('updatePortfolio', vars())
  245. #
  246. # def on_updateAccountTime(self, timeStamp):
  247. # self.handlemessage('updateAccountTime', vars())
  248. #
  249. # def on_accountDownloadEnd(self, accountName):
  250. # self.handlemessage('accountDownloadEnd', vars())
  251. #
  252. # def on_nextValidId(self, orderId):
  253. # self.handlemessage('nextValidId', vars())
  254. #
  255. # def on_contractDetails(self, reqId, contractDetails):
  256. # self.handlemessage('contractDetails', vars())
  257. #
  258. # def on_contractDetailsEnd(self, reqId):
  259. # self.handlemessage('contractDetailsEnd', vars())
  260. #
  261. # def on_bondContractDetails(self, reqId, contractDetails):
  262. # self.handlemessage('bondContractDetails', vars())
  263. #
  264. # def on_execDetails(self, reqId, contract, execution):
  265. # self.handlemessage('execDetails', vars())
  266. #
  267. # def on_execDetailsEnd(self, reqId):
  268. # self.handlemessage('execDetailsEnd', vars())
  269. #
  270. # def on_connectionClosed(self):
  271. # self.handlemessage('connectionClosed', {})
  272. #
  273. # def on_error(self, id=None, errorCode=None, errorMsg=None):
  274. # self.handlemessage('error', vars())
  275. #
  276. # def on_error_0(self, strvalue=None):
  277. # self.handlemessage('error_0', vars())
  278. #
  279. # def on_error_1(self, id=None, errorCode=None, errorMsg=None):
  280. # self.handlemessage('error_1', vars())
  281. #
  282. # def on_updateMktDepth(self, tickerId, position, operation, side, price, size):
  283. # self.handlemessage('updateMktDepth', vars())
  284. #
  285. # def on_updateMktDepthL2(self, tickerId, position, marketMaker, operation, side, price, size):
  286. # self.handlemessage('updateMktDepthL2', vars())
  287. #
  288. # def on_updateNewsBulletin(self, msgId, msgType, message, origExchange):
  289. # self.handlemessage('updateNewsBulletin', vars())
  290. #
  291. # def on_managedAccounts(self, accountsList):
  292. # self.handlemessage('managedAccounts', vars())
  293. #
  294. # def on_receiveFA(self, faDataType, xml):
  295. # self.handlemessage('receiveFA', vars())
  296. #
  297. # def on_historicalData(self, reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
  298. # self.handlemessage('historicalData', vars())
  299. #
  300. # def on_scannerParameters(self, xml):
  301. # self.handlemessage('scannerParameters', vars())
  302. #
  303. # def on_scannerData(self, reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
  304. # self.handlemessage('scannerData', vars())
  305. #
  306. #
  307. # def on_commissionReport(self, commissionReport):
  308. # self.handlemessage('commissionReport', vars())
  309. #
  310. #
  311. # def on_currentTime(self, time):
  312. # self.handlemessage('currentTime', vars())
  313. #
  314. # def on_deltaNeutralValidation(self, reqId, underComp):
  315. # self.handlemessage('deltaNeutralValidation', vars())
  316. #
  317. #
  318. # def on_fundamentalData(self, reqId, data):
  319. # self.handlemessage('fundamentalData', vars())
  320. #
  321. # def on_marketDataType(self, reqId, marketDataType):
  322. # self.handlemessage('marketDataType', vars())
  323. #
  324. #
  325. # def on_realtimeBar(self, reqId, time, open, high, low, close, volume, wap, count):
  326. # self.handlemessage('realtimeBar', vars())
  327. #
  328. # def on_scannerDataEnd(self, reqId):
  329. # self.handlemessage('scannerDataEnd', vars())
  330. #
  331. #
  332. #
  333. # def on_tickSnapshotEnd(self, reqId):
  334. # self.handlemessage('tickSnapshotEnd', vars())
  335. #
  336. #
  337. # def on_position(self, account, contract, pos, avgCost):
  338. # self.handlemessage('position', vars())
  339. #
  340. # def on_positionEnd(self):
  341. # self.handlemessage('positionEnd', vars())
  342. #
  343. # def on_accountSummary(self, reqId, account, tag, value, currency):
  344. # self.handlemessage('accountSummary', vars())
  345. #
  346. # def on_accountSummaryEnd(self, reqId):
  347. # self.handlemessage('accountSummaryEnd', vars())
  348. class TWS_server_wrapper():
  349. producer = None
  350. def __init__(self, producer):
  351. self.producer = producer
  352. def reqOpenOrders(self):
  353. self.post_msg('reqOpenOrders', '')
  354. def reqIds(self):
  355. self.post_msg('reqIds', '')
  356. def reqNewsBulletins(self):
  357. logging.error('reqNewsBulletins: NOT IMPLEMENTED')
  358. def cancelNewsBulletins(self):
  359. logging.error('cancelNewsBulletins: NOT IMPLEMENTED')
  360. def setServerLogLevel(self):
  361. logging.error('setServerLogLevel: NOT IMPLEMENTED')
  362. def reqAutoOpenOrders(self):
  363. logging.error('reqAutoOpenOrders: NOT IMPLEMENTED')
  364. def reqAllOpenOrders(self):
  365. logging.error('reqAllOpenOrders: NOT IMPLEMENTED')
  366. def reqManagedAccts(self):
  367. logging.error('reqManagedAccts: NOT IMPLEMENTED')
  368. def requestFA(self):
  369. logging.error('requestFA: NOT IMPLEMENTED')
  370. def reqPositions(self):
  371. self.post_msg('reqPositions', '')
  372. def reqHistoricalData(self):
  373. logging.error('reqHistoricalData: NOT IMPLEMENTED')
  374. def reqAccountUpdates(self):
  375. self.post_msg('reqAccountUpdates', '1')
  376. def reqExecutions(self, exec_filter=None):
  377. self.post_msg('reqExecutions', ExecutionFilterHelper.object2kvstring(exec_filter) if exec_filter <> None else '')
  378. def reqMktData(self, contract):
  379. #self.post_msg('reqMktData', ContractHelper.contract2kvstring(contract))
  380. self.post_msg('reqMktData', ContractHelper.object2kvstring(contract))
  381. def reqAccountSummary(self, reqId, group, tags):
  382. self.post_msg('reqAccountSummary', json.dumps([reqId, group, tags]))
  383. def placeOrder(self, id, contract, order):
  384. self.post_msg('placeOrder', json.dumps([id, ContractHelper.contract2kvstring(contract), OrderHelper.object2kvstring(order)]))
  385. def post_msg(self, topic, msg):
  386. logging.info('post_msg sending request to gateway: %s[%s]' % (topic,msg))
  387. self.producer.send_messages(topic, msg)
  388. #############################################################33
  389. # Gateway methods
  390. def gw_req_subscriptions(self):
  391. self.post_msg('gw_req_subscriptions', '')
  392. class SimpleTWSClient(TWS_client_base_app):
  393. def __init__(self, host, port, id=None):
  394. super(SimpleTWSClient, self).__init__(host, port, id)
  395. logging.info('SimpleTWSClient client id=%s' % id)
  396. def connect(self):
  397. self.start()
  398. def disconnect(self):
  399. logging.info ('SimpleTWSClient: received disconnect. asking base class consumer to stop...')
  400. self.stop()
  401. if __name__ == '__main__':
  402. if len(sys.argv) != 2:
  403. print("Usage: %s <config file>" % sys.argv[0])
  404. exit(-1)
  405. cfg_path= sys.argv[1:]
  406. config = ConfigParser.SafeConfigParser()
  407. if len(config.read(cfg_path)) == 0:
  408. raise ValueError, "Failed to open config file"
  409. logging.basicConfig(level=logging.INFO,
  410. format='%(asctime)s %(levelname)s %(message)s')
  411. host = config.get("epc", "kafka.host").strip('"').strip("'")
  412. port = config.get("epc", "kafka.port")
  413. e = SimpleTWSClient(host, port)
  414. #e.registerAll([e.on_ib_message])
  415. e.start()
  416. #e.command_handler.reqAccountUpdates()
  417. #e.command_handler.reqExecutions()
  418. #USD,CASH,IDEALPRO,JPY,,0,
  419. contractTuple = ('USD', 'CASH', 'IDEALPRO', 'JPY', '', 0, '')
  420. c = ContractHelper.makeContract(contractTuple)
  421. #e.command_handler.reqMktData(c)
  422. contractTuple = ('USO', 'STK', 'SMART', 'USD', '', 0, '')
  423. c = ContractHelper.makeContract(contractTuple)
  424. #e.command_handler.reqMktData(c)
  425. e.get_command_handler().reqPositions()
  426. # print dummy()
  427. # kwargs = {"arg3": 3, "arg2": "two","arg1":5}
  428. # m = Message(**kwargs)
  429. # print m.items()
  430. # print m.arg3