| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- from misc2.helpers import ContractHelper
- import logging
- import traceback
- from ib.ext.EWrapper import EWrapper
-
- class TWS_event_handler(EWrapper):
- TICKER_GAP = 1000
- producer = None
-
- def __init__(self, producer):
- self.producer = producer
-
-
-
- def set_subscription_manager(self, subscription_manager):
- self.subscription_manger = subscription_manager
-
- def broadcast_event(self, message, mapping):
- try:
- dict = self.pre_process_message(message, mapping)
- logging.info('broadcast_event %s:%s' % (message, dict))
- self.producer.send_message(message, self.producer.message_dumps(dict))
- except:
- logging.error('broadcast_event: exception while encoding IB event to client: [%s]' % message)
- logging.error(traceback.format_exc())
-
-
-
-
- def pre_process_message(self, message_name, items):
- t = items.copy()
-
-
- try:
- del(t['self'])
- except (KeyError, ):
- pass
-
- for k,v in t.iteritems():
- #print k, v, type(v)
- #if type(v) in [Contract, Execution, ExecutionFilter, OrderState, Order, CommissionReport]:
- if 'ib.ext.' in str(type(v)):
- t[k] = v.__dict__
- elif 'exceptions.' in str(type(v)):
- t[k] = '%s:%s' % (str(type(v)), str(v))
- else:
- t[k] = v
-
-
-
- return t
-
-
-
- def tickPrice(self, tickerId, field, price, canAutoExecute):
- logging.info('TWS_event_handler:tickPrice. %d<->%s' % (tickerId,self.subscription_manger.get_contract_by_id(tickerId) ))
- self.broadcast_event('tickPrice', {'contract_key': self.subscription_manger.get_contract_by_id(tickerId),
- 'field': field, 'price': price, 'canAutoExecute': canAutoExecute})
- #pass
-
- def tickSize(self, tickerId, field, size):
- logging.info('TWS_event_handler:tickSize. %d<->%s' % (tickerId,self.subscription_manger.get_contract_by_id(tickerId) ))
- self.broadcast_event('tickSize', {'contract_key': self.subscription_manger.get_contract_by_id(tickerId),
- 'field': field, 'size': size})
- #pass
-
-
- def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
-
- #self.broadcast_event('tickOptionComputation', self.pre_process_message(vars())) #vars())
- pass
- def tickGeneric(self, tickerId, tickType, value):
- #self.broadcast_event('tickGeneric', vars())
- pass
- def tickString(self, tickerId, tickType, value):
- #self.broadcast_event('tickString', vars())
- pass
- def tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
- #self.broadcast_event('tickEFP', vars())
- pass
- def orderStatus(self, orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeId):
- pass
- def openOrder(self, orderId, contract, order, state):
- pass
- def openOrderEnd(self):
- pass
- def updateAccountValue(self, key, value, currency, accountName):
-
- logging.info('TWS_event_handler:updateAccountValue. [%s]:%s' % (key.ljust(40), value))
- self.broadcast_event('updateAccountValue', {'key': key,
- 'value': value, 'currency': currency, 'account':accountName})
-
- def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
- contract_key= ContractHelper.makeRedisKeyEx(contract)
- logging.info('TWS_event_handler:updatePortfolio. [%s]:position= %d' % (contract_key, position))
- self.broadcast_event('updatePortfolio', {
- 'contract_key': contract_key,
- 'position': position, 'market_price': marketPrice,
- 'market_value': marketValue, 'average_cost': averageCost,
- 'unrealized_PNL': unrealizedPNL, 'realized_PNL': realizedPNL,
- 'account': accountName
-
- })
-
- def updateAccountTime(self, timeStamp):
-
- self.broadcast_event('updateAccountTime', {'timestamp': timeStamp})
-
- def accountDownloadEnd(self, accountName):
- self.broadcast_event('accountDownloadEnd', {'account':accountName})
-
-
- def nextValidId(self, orderId):
- self.broadcast_event('nextValidId', vars())
- def contractDetails(self, reqId, contractDetails):
- self.broadcast_event('contractDetails', vars())
- def contractDetailsEnd(self, reqId):
- self.broadcast_event('contractDetailsEnd', vars())
- def bondContractDetails(self, reqId, contractDetails):
- self.broadcast_event('bondContractDetails', vars())
- def execDetails(self, reqId, contract, execution):
- self.broadcast_event('execDetails', vars())
- def execDetailsEnd(self, reqId):
- self.broadcast_event('execDetailsEnd', vars())
- def connectionClosed(self):
- self.broadcast_event('connectionClosed', {})
- def error(self, id=None, errorCode=None, errorMsg=None):
- try:
- logging.error(self.pre_process_message('error', vars()))
- self.broadcast_event('error', {'id': id,
- 'errorCode': errorCode, 'errorMsg': errorMsg})
- except:
- pass
- def error_0(self, strvalue=None):
- logging.error(self.pre_process_message('error_0', vars()))
- self.broadcast_event('error_0', vars())
- def error_1(self, id=None, errorCode=None, errorMsg=None):
- logging.error(self.pre_process_message('error_1', vars()))
- self.broadcast_event('error_1', vars())
- def updateMktDepth(self, tickerId, position, operation, side, price, size):
- self.broadcast_event('updateMktDepth', vars())
- def updateMktDepthL2(self, tickerId, position, marketMaker, operation, side, price, size):
- self.broadcast_event('updateMktDepthL2', vars())
- def updateNewsBulletin(self, msgId, msgType, message, origExchange):
- self.broadcast_event('updateNewsBulletin', vars())
- def managedAccounts(self, accountsList):
- logging.info(self.pre_process_message('managedAccounts', vars()))
- self.broadcast_event('managedAccounts', vars())
- def receiveFA(self, faDataType, xml):
- self.broadcast_event('receiveFA', vars())
- def historicalData(self, reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
- self.broadcast_event('historicalData', vars())
- def scannerParameters(self, xml):
- self.broadcast_event('scannerParameters', vars())
- def scannerData(self, reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
- self.broadcast_event('scannerData', vars())
- def commissionReport(self, commissionReport):
- self.broadcast_event('commissionReport', vars())
- def currentTime(self, time):
- self.broadcast_event('currentTime', vars())
- def deltaNeutralValidation(self, reqId, underComp):
- self.broadcast_event('deltaNeutralValidation', vars())
- def fundamentalData(self, reqId, data):
- self.broadcast_event('fundamentalData', vars())
- def marketDataType(self, reqId, marketDataType):
- self.broadcast_event('marketDataType', vars())
- def realtimeBar(self, reqId, time, open, high, low, close, volume, wap, count):
- self.broadcast_event('realtimeBar', vars())
- def scannerDataEnd(self, reqId):
- self.broadcast_event('scannerDataEnd', vars())
- def tickSnapshotEnd(self, reqId):
- self.broadcast_event('tickSnapshotEnd', vars())
- def position(self, account, contract, pos, avgCost):
- contract_key= ContractHelper.makeRedisKeyEx(contract)
- logging.info('TWS_event_handler:position. [%s]:position= %d' % (contract_key, pos))
- self.broadcast_event('position', {
- 'account': account,
- 'contract_key': contract_key,
- 'position': pos, 'average_cost': avgCost
-
- })
- def positionEnd(self):
- self.broadcast_event('positionEnd', {})
- def accountSummary(self, reqId, account, tag, value, currency):
- self.broadcast_event('accountSummary', vars())
- def accountSummaryEnd(self, reqId):
- self.broadcast_event('accountSummaryEnd', vars())
|