tws_event_handler.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. from time import strftime
  4. import logging
  5. import traceback
  6. from ib.ext.EWrapper import EWrapper
  7. class TWS_event_handler(EWrapper):
  8. TICKER_GAP = 1000
  9. producer = None
  10. def __init__(self, producer):
  11. self.producer = producer
  12. def broadcast_event(self, message, mapping):
  13. try:
  14. dict = self.tick_process_message(message, mapping)
  15. if message == 'gw_subscriptions' or message == 'gw_subscription_changed':
  16. logging.info('TWS_event_handler: broadcast event: %s [%s]' % (dict['typeName'], dict))
  17. self.producer.send_message(message, self.producer.message_dumps(dict))
  18. except:
  19. logging.error('broadcast_event: exception while encoding IB event to client: [%s]' % message)
  20. logging.error(traceback.format_exc())
  21. def tick_process_message(self, message_name, items):
  22. t = items.copy()
  23. # if the tickerId is in the snapshot range
  24. # deduct the gap to derive the original tickerId
  25. # --- check logic in subscription manager
  26. try:
  27. if (t['tickerId'] >= TWS_event_handler.TICKER_GAP):
  28. t['tickerId'] = t['tickerId'] - TWS_event_handler.TICKER_GAP
  29. except (KeyError, ):
  30. pass
  31. try:
  32. del(t['self'])
  33. except (KeyError, ):
  34. pass
  35. for k,v in t.iteritems():
  36. #print k, v, type(v)
  37. #if type(v) in [Contract, Execution, ExecutionFilter, OrderState, Order, CommissionReport]:
  38. if 'ib.ext.' in str(type(v)):
  39. t[k] = v.__dict__
  40. else:
  41. t[k] = v
  42. return t
  43. def tickPrice(self, tickerId, field, price, canAutoExecute):
  44. self.broadcast_event('tickPrice', vars())
  45. def tickSize(self, tickerId, field, size):
  46. self.broadcast_event('tickSize', vars()) #vars())
  47. def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
  48. #self.broadcast_event('tickOptionComputation', self.tick_process_message(vars())) #vars())
  49. pass
  50. def tickGeneric(self, tickerId, tickType, value):
  51. self.broadcast_event('tickGeneric', vars())
  52. def tickString(self, tickerId, tickType, value):
  53. self.broadcast_event('tickString', vars())
  54. def tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
  55. self.broadcast_event('tickEFP', vars())
  56. def orderStatus(self, orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeId):
  57. self.broadcast_event('orderStatus', vars())
  58. def openOrder(self, orderId, contract, order, state):
  59. self.broadcast_event('openOrder', vars())
  60. def openOrderEnd(self):
  61. self.broadcast_event('openOrderEnd', vars())
  62. def updateAccountValue(self, key, value, currency, accountName):
  63. self.broadcast_event('updateAccountValue', vars())
  64. def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
  65. self.broadcast_event('updatePortfolio', vars())
  66. def updateAccountTime(self, timeStamp):
  67. self.broadcast_event('updateAccountTime', vars())
  68. def accountDownloadEnd(self, accountName):
  69. self.broadcast_event('accountDownloadEnd', vars())
  70. def nextValidId(self, orderId):
  71. self.broadcast_event('nextValidId', vars())
  72. def contractDetails(self, reqId, contractDetails):
  73. self.broadcast_event('contractDetails', vars())
  74. def contractDetailsEnd(self, reqId):
  75. self.broadcast_event('contractDetailsEnd', vars())
  76. def bondContractDetails(self, reqId, contractDetails):
  77. self.broadcast_event('bondContractDetails', vars())
  78. def execDetails(self, reqId, contract, execution):
  79. self.broadcast_event('execDetails', vars())
  80. def execDetailsEnd(self, reqId):
  81. self.broadcast_event('execDetailsEnd', vars())
  82. def connectionClosed(self):
  83. self.broadcast_event('connectionClosed', {})
  84. def error(self, id=None, errorCode=None, errorMsg=None):
  85. try:
  86. logging.error(self.tick_process_message('error', vars()))
  87. self.broadcast_event('error', vars())
  88. except:
  89. pass
  90. def error_0(self, strvalue=None):
  91. logging.error(self.tick_process_message('error_0', vars()))
  92. self.broadcast_event('error_0', vars())
  93. def error_1(self, id=None, errorCode=None, errorMsg=None):
  94. logging.error(self.tick_process_message('error_1', vars()))
  95. self.broadcast_event('error_1', vars())
  96. def updateMktDepth(self, tickerId, position, operation, side, price, size):
  97. self.broadcast_event('updateMktDepth', vars())
  98. def updateMktDepthL2(self, tickerId, position, marketMaker, operation, side, price, size):
  99. self.broadcast_event('updateMktDepthL2', vars())
  100. def updateNewsBulletin(self, msgId, msgType, message, origExchange):
  101. self.broadcast_event('updateNewsBulletin', vars())
  102. def managedAccounts(self, accountsList):
  103. logging.info(self.tick_process_message('managedAccounts', vars()))
  104. self.broadcast_event('managedAccounts', vars())
  105. def receiveFA(self, faDataType, xml):
  106. self.broadcast_event('receiveFA', vars())
  107. def historicalData(self, reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
  108. self.broadcast_event('historicalData', vars())
  109. def scannerParameters(self, xml):
  110. self.broadcast_event('scannerParameters', vars())
  111. def scannerData(self, reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
  112. self.broadcast_event('scannerData', vars())
  113. def commissionReport(self, commissionReport):
  114. self.broadcast_event('commissionReport', vars())
  115. def currentTime(self, time):
  116. self.broadcast_event('currentTime', vars())
  117. def deltaNeutralValidation(self, reqId, underComp):
  118. self.broadcast_event('deltaNeutralValidation', vars())
  119. def fundamentalData(self, reqId, data):
  120. self.broadcast_event('fundamentalData', vars())
  121. def marketDataType(self, reqId, marketDataType):
  122. self.broadcast_event('marketDataType', vars())
  123. def realtimeBar(self, reqId, time, open, high, low, close, volume, wap, count):
  124. self.broadcast_event('realtimeBar', vars())
  125. def scannerDataEnd(self, reqId):
  126. self.broadcast_event('scannerDataEnd', vars())
  127. def tickSnapshotEnd(self, reqId):
  128. self.broadcast_event('tickSnapshotEnd', vars())
  129. def position(self, account, contract, pos, avgCost):
  130. self.broadcast_event('position', vars())
  131. def positionEnd(self):
  132. self.broadcast_event('positionEnd', vars())
  133. def accountSummary(self, reqId, account, tag, value, currency):
  134. self.broadcast_event('accountSummary', vars())
  135. def accountSummaryEnd(self, reqId):
  136. self.broadcast_event('accountSummaryEnd', vars())