| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##
- # This script is an example of using the generated code within IbPy in
- # the same manner as the Java code. We subclass EWrapper and give an
- # instance of the wrapper to an EClientSocket.
- ##
- import sys
- from time import sleep, strftime
- import time, datetime
- import ConfigParser
- from optparse import OptionParser
- import logging
- import thread
- import threading
- import traceback
- import json
- from threading import Lock
- from ib.ext.Contract import Contract
- from ib.ext.EWrapper import EWrapper
- from ib.ext.EClientSocket import EClientSocket
- from ib.ext.ExecutionFilter import ExecutionFilter
- from ib.ext.Execution import Execution
- from ib.ext.OrderState import OrderState
- from ib.ext.Order import Order
- from kafka.client import KafkaClient
- from kafka import KafkaConsumer
- from kafka.producer import SimpleProducer
- from kafka.common import LeaderNotAvailableError
- from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
- from comms.ib_heartbeat import IbHeartBeat
- from tws_protocol_helper import TWS_Protocol
- import redis
-
-
-
class TWS_event_handler(EWrapper):
    """EWrapper implementation that republishes IB/TWS callbacks to Kafka.

    Each callback packs its arguments into a dict, adds the bookkeeping keys
    ``ts``, ``typeName`` and ``source``, and sends the JSON encoding to the
    Kafka topic named after the callback (e.g. ``tickPrice``).
    """

    # Offset removed from tickerIds >= TICKER_GAP before tick events are
    # published (see tick_process_message).
    TICKER_GAP = 1000
    producer = None

    def __init__(self, host, port):
        """Create a synchronous Kafka producer connected to host:port."""
        client = KafkaClient('%s:%s' % (host, port))
        # Passed through a dict because ``async`` is a reserved word in newer
        # Python versions; the call is otherwise ``async=False``.
        self.producer = SimpleProducer(client, **{'async': False})
        logging.info('TWS_event_handler: __init__ Creating kafka client producer at %s:%s' % (host, port))

    def serialize_vars_to_dict(self, message, mapping, source='IB'):
        """Build the JSON-ready payload for one event.

        ``mapping`` is typically the ``vars()`` of the calling callback. The
        ``self`` entry is skipped and any value whose type lives under
        ``ib.ext.`` is replaced by its ``__dict__``. The caller's mapping is
        left untouched.

        Returns a new dict with the extra keys ``ts`` (current epoch time),
        ``typeName`` (``message``) and ``source``.
        """
        payload = {}
        for key, value in mapping.items():
            if key == 'self':
                continue
            # NOTE(review): only the top level is flattened; an ib.ext object
            # nested inside another one would still reach json.dumps as-is.
            if 'ib.ext.' in str(type(value)):
                payload[key] = value.__dict__
            else:
                payload[key] = value

        payload['ts'] = time.time()
        payload['typeName'] = message
        payload['source'] = source
        return payload

    def broadcast_event(self, message, mapping, source='IB'):
        """Serialize ``mapping`` and publish it to the Kafka topic ``message``.

        Failures are logged rather than raised, except for the single retry
        made for ``gw_subscriptions``: if that retry fails, its exception
        propagates to the caller.
        """
        payload = None
        try:
            payload = self.serialize_vars_to_dict(message, mapping, source)
            if message == 'gw_subscriptions':
                logging.info('TWS_event_handler: broadcast event: %s [%s]' % (payload['typeName'], payload))
            self.producer.send_messages(message, json.dumps(payload))
        except Exception:
            logging.error('broadcast_event: exception while encoding IB event to client: [%s]' % message)
            logging.error(traceback.format_exc())

            # Try to broadcast the message a 2nd time; not caught if it fails
            # again. Skipped when serialization itself failed, because there
            # is nothing to resend.
            if message == 'gw_subscriptions' and payload is not None:
                sleep(2)
                logging.info('TWS_event_handler: Retry once broadcasting gw_subscription %s [%s]' % (payload['typeName'], payload))
                self.producer.send_messages(message, json.dumps(payload))

    def tick_process_message(self, items):
        """Return a copy of ``items`` with the snapshot gap removed.

        If ``tickerId`` is in the snapshot range (>= TICKER_GAP) the gap is
        deducted to recover the original tickerId. The ``self`` entry, if
        present, is dropped from the copy.
        """
        tick = items.copy()
        if tick['tickerId'] >= TWS_event_handler.TICKER_GAP:
            tick['tickerId'] = tick['tickerId'] - TWS_event_handler.TICKER_GAP
        tick.pop('self', None)
        return tick

    # ------------------------------------------------------------------
    # Tick callbacks: tickerId is normalised before publishing.
    # NOTE: vars() must be evaluated directly in each callback so that it
    # captures exactly that callback's arguments.
    # ------------------------------------------------------------------
    def tickPrice(self, tickerId, field, price, canAutoExecute):
        self.broadcast_event('tickPrice', self.tick_process_message(vars()))

    def tickSize(self, tickerId, field, size):
        self.broadcast_event('tickSize', self.tick_process_message(vars()))

    def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
        # Deliberately not published.
        pass

    def tickGeneric(self, tickerId, tickType, value):
        self.broadcast_event('tickGeneric', self.tick_process_message(vars()))

    def tickString(self, tickerId, tickType, value):
        self.broadcast_event('tickString', self.tick_process_message(vars()))

    def tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
        self.broadcast_event('tickEFP', vars())

    # ------------------------------------------------------------------
    # Orders, account and portfolio callbacks.
    # ------------------------------------------------------------------
    def orderStatus(self, orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeId):
        self.broadcast_event('orderStatus', vars())

    def openOrder(self, orderId, contract, order, state):
        self.broadcast_event('openOrder', vars())

    def openOrderEnd(self):
        self.broadcast_event('openOrderEnd', vars())

    def updateAccountValue(self, key, value, currency, accountName):
        self.broadcast_event('updateAccountValue', vars())

    def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
        self.broadcast_event('updatePortfolio', vars())

    def updateAccountTime(self, timeStamp):
        self.broadcast_event('updateAccountTime', vars())

    def accountDownloadEnd(self, accountName):
        self.broadcast_event('accountDownloadEnd', vars())

    def nextValidId(self, orderId):
        self.broadcast_event('nextValidId', vars())

    # ------------------------------------------------------------------
    # Contract and execution callbacks.
    # ------------------------------------------------------------------
    def contractDetails(self, reqId, contractDetails):
        self.broadcast_event('contractDetails', vars())

    def contractDetailsEnd(self, reqId):
        self.broadcast_event('contractDetailsEnd', vars())

    def bondContractDetails(self, reqId, contractDetails):
        self.broadcast_event('bondContractDetails', vars())

    def execDetails(self, reqId, contract, execution):
        self.broadcast_event('execDetails', vars())

    def execDetailsEnd(self, reqId):
        self.broadcast_event('execDetailsEnd', vars())

    # ------------------------------------------------------------------
    # Connection and error callbacks (errors are also written to the log).
    # ------------------------------------------------------------------
    def connectionClosed(self):
        self.broadcast_event('connectionClosed', {})

    def error(self, id=None, errorCode=None, errorMsg=None):
        logging.error(self.serialize_vars_to_dict('error', vars()))
        self.broadcast_event('error', vars())

    def error_0(self, strvalue=None):
        logging.error(self.serialize_vars_to_dict('error_0', vars()))
        self.broadcast_event('error_0', vars())

    def error_1(self, id=None, errorCode=None, errorMsg=None):
        logging.error(self.serialize_vars_to_dict('error_1', vars()))
        self.broadcast_event('error_1', vars())

    # ------------------------------------------------------------------
    # Market depth, news, FA, historical data and scanner callbacks.
    # ------------------------------------------------------------------
    def updateMktDepth(self, tickerId, position, operation, side, price, size):
        self.broadcast_event('updateMktDepth', vars())

    def updateMktDepthL2(self, tickerId, position, marketMaker, operation, side, price, size):
        self.broadcast_event('updateMktDepthL2', vars())

    def updateNewsBulletin(self, msgId, msgType, message, origExchange):
        self.broadcast_event('updateNewsBulletin', vars())

    def managedAccounts(self, accountsList):
        logging.info(self.serialize_vars_to_dict('managedAccounts', vars()))
        self.broadcast_event('managedAccounts', vars())

    def receiveFA(self, faDataType, xml):
        self.broadcast_event('receiveFA', vars())

    def historicalData(self, reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
        self.broadcast_event('historicalData', vars())

    def scannerParameters(self, xml):
        self.broadcast_event('scannerParameters', vars())

    def scannerData(self, reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
        self.broadcast_event('scannerData', vars())

    def commissionReport(self, commissionReport):
        self.broadcast_event('commissionReport', vars())

    def currentTime(self, time):
        self.broadcast_event('currentTime', vars())

    def deltaNeutralValidation(self, reqId, underComp):
        self.broadcast_event('deltaNeutralValidation', vars())

    def fundamentalData(self, reqId, data):
        self.broadcast_event('fundamentalData', vars())

    def marketDataType(self, reqId, marketDataType):
        self.broadcast_event('marketDataType', vars())

    def realtimeBar(self, reqId, time, open, high, low, close, volume, wap, count):
        self.broadcast_event('realtimeBar', vars())

    def scannerDataEnd(self, reqId):
        self.broadcast_event('scannerDataEnd', vars())

    def tickSnapshotEnd(self, reqId):
        self.broadcast_event('tickSnapshotEnd', vars())

    # ------------------------------------------------------------------
    # Position and account summary callbacks.
    # ------------------------------------------------------------------
    def position(self, account, contract, pos, avgCost):
        self.broadcast_event('position', vars())

    def positionEnd(self):
        self.broadcast_event('positionEnd', vars())

    def accountSummary(self, reqId, account, tag, value, currency):
        self.broadcast_event('accountSummary', vars())

    def accountSummaryEnd(self, reqId):
        self.broadcast_event('accountSummaryEnd', vars())
class TWS_gateway(threading.Thread):
    """Bridge between Kafka clients and an IB/TWS connection.

    Client requests arrive as Kafka messages; the topic name selects the
    method of this class to invoke and the message value is passed to it as
    the single argument. IB/TWS callbacks are published back to Kafka by the
    attached TWS_event_handler.
    """

    # config
    config = None
    # redis connection
    rs = None

    # channel clients' requests to IB/TWS
    cli_request_handler = None
    # manage conID / contracts mapping
    contract_subscription_mgr = None

    connection = None

    # handler to process incoming IB/TWS messages and echo back to clients
    tws_event_handler = None

    # monitor IB connection / heart beat
    ibh = None
    tlock = None
    ib_conn_status = None
    ib_order_transmit = False

    def __init__(self, host, port, clientId, kafka_host, kafka_port, config):
        """Wire up redis, Kafka and the IB socket, then connect to IB.

        Exits the process with status -1 if redis or IB cannot be reached.
        """
        super(TWS_gateway, self).__init__()
        self.config = config
        self.host = host
        self.port = port
        self.clientId = clientId
        self.ib_order_transmit = self._read_bool_option(
            config, "tws_gateway", "tws_gateway.order_transmit")

        logging.info('starting up TWS_gateway...')
        logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit else 'False'))

        logging.info('connecting to Redis server...')
        self.initialize_redis(config)

        logging.info('starting up TWS_event_handler...')
        self.tws_event_handler = TWS_event_handler(kafka_host, kafka_port)

        logging.info('starting up IB EClientSocket...')
        self.connection = EClientSocket(self.tws_event_handler)

        logging.info('starting up client request handler - kafkaConsumer...')
        topics = list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods)
        self.cli_request_handler = KafkaConsumer(
            *[(topic, 0) for topic in topics],
            metadata_broker_list=['%s:%s' % (kafka_host, kafka_port)],
            group_id='epc.tws_gateway',
            auto_commit_enable=True,
            auto_commit_interval_ms=30 * 1000,
            auto_offset_reset='largest')  # discard old ones

        self.reset_message_offset()

        if not self.eConnect():
            logging.error('TWS_gateway: unable to establish connection to IB %s:%d' % (self.host, self.port))
            sys.exit(-1)

        # start heart beat monitor
        logging.info('starting up IB heart beat monitor...')
        self.tlock = Lock()
        self.ibh = IbHeartBeat(config)
        self.ibh.register_listener([self.on_ib_conn_broken])
        # NOTE(review): run() is called directly, as in the original code. If
        # IbHeartBeat.run() blocks, nothing below executes - confirm.
        self.ibh.run()

        logging.info('starting up subscription manager...')
        self.initialize_subscription_mgr()

    @staticmethod
    def _parse_bool(raw):
        """Interpret a config string such as "True", 'yes' or 1 as a bool."""
        if raw is None:
            return False
        text = str(raw).strip().strip('"').strip("'").strip().lower()
        return text in ('1', 'true', 'yes', 'on')

    @classmethod
    def _read_bool_option(cls, config, section, option):
        """Read a boolean option from ``config``; a missing option is False."""
        try:
            raw = config.get(section, option)
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            return False
        return cls._parse_bool(raw)

    def initialize_subscription_mgr(self):
        """Create the subscription manager and reload contracts from redis."""
        self.contract_subscription_mgr = SubscriptionManager(self)
        self.contract_subscription_mgr.register_persistence_callback(self.persist_subscriptions)
        key = self.config.get("tws_gateway", "subscription_manager.subscriptions.redis_key").strip('"').strip("'")
        stored = self.rs.get(key)
        if stored:
            contracts = [ContractHelper.kvstring2object(x, Contract) for x in json.loads(stored)]
            self.contract_subscription_mgr.load_subscription(contracts)

    def persist_subscriptions(self, contracts):
        """Store the subscription table in redis as a JSON list of kv-strings."""
        key = self.config.get("tws_gateway", "subscription_manager.subscriptions.redis_key").strip('"').strip("'")
        cs = json.dumps([ContractHelper.object2kvstring(x) if x is not None else None for x in contracts])
        logging.debug('Tws_gateway: updating subscription table to redis store %s' % cs)
        self.rs.set(key, cs)

    def initialize_redis(self, config):
        """Connect to redis using the [redis] section; exit(-1) on failure."""
        r_host = config.get("redis", "redis.server").strip('"').strip("'")
        # ConfigParser returns strings; convert once so the client and the
        # %d placeholders in the error message both receive integers.
        r_port = int(config.get("redis", "redis.port"))
        r_db = int(config.get("redis", "redis.db"))
        self.rs = redis.Redis(r_host, r_port, r_db)
        try:
            self.rs.client_list()
        except redis.ConnectionError:
            logging.error('TWS_gateway: unable to connect to redis server using these settings: %s port:%d db:%d' % (r_host, r_port, r_db))
            logging.error('aborting...')
            sys.exit(-1)

    def reset_message_offset(self):
        """Move every consumed topic to its latest offset on partition 0."""
        partition = 0
        topics = list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods)
        topic_offsets = []
        for topic in topics:
            offsets = self.cli_request_handler.get_partition_offsets(topic, partition, -1, 999)
            # NOTE(review): topics reporting fewer than two offsets are
            # skipped, as in the original code - confirm this is intended,
            # since set_topic_partitions() drops anything not listed.
            if len(offsets) > 1:
                # Tuple layout is (topic, partition, offset). The partition is
                # given explicitly; the original reused the second returned
                # offset as the partition number.
                topic_offsets.append((topic, partition, offsets[0]))

        logging.info('TWS_gateway set topic offset to the latest point\n%s' % (''.join('%s,%s,%s\n' % (x[0], x[1], x[2]) for x in topic_offsets)))
        # set_topic_partitions clears all previous settings when executed, so
        # every topic has to be supplied in one single call.
        self.cli_request_handler.set_topic_partitions(*topic_offsets)

    def run(self):
        """Consume client requests and dispatch each one by topic name.

        Unknown topics and handler failures are logged so that one bad message
        cannot terminate the consumer thread.
        """
        for message in self.cli_request_handler:
            logging.info("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                         message.offset, message.key,
                                                         message.value))

            handler = getattr(self, message.topic, None)
            if not callable(handler):
                logging.error('TWS_gateway: no handler for topic [%s], message ignored' % message.topic)
                continue
            try:
                handler(message.value)
            except Exception:
                logging.error('TWS_gateway: handler for topic [%s] failed' % message.topic)
                logging.error(traceback.format_exc())

    def on_ib_conn_broken(self, msg):
        """Heart-beat listener: reconnect to IB and resubscribe all contracts.

        May be invoked several times for the same outage. Callers are
        serialized by ``tlock``; one that was waiting while another caller
        repaired the connection returns without reconnecting again.
        """
        logging.error('TWS_gateway: detected broken IB connection!')
        self.ib_conn_status = 'ERROR'
        with self.tlock:  # block until another party finishes executing
            if self.ib_conn_status == 'OK':
                # already fixed up while waiting
                return

            self.eDisconnect()
            self.eConnect()
            while not self.connection.isConnected():
                logging.error('TWS_gateway: attempt to reconnect...')
                self.eConnect()
                sleep(2)

            # we arrived here because the connection has been restored
            # resubscribe tickers again!
            logging.info('TWS_gateway: IB connection restored...resubscribe contracts')
            self.contract_subscription_mgr.force_resubscription()
            # Mark the repair as done so queued callers can bail out above.
            self.ib_conn_status = 'OK'

    def eConnect(self):
        """Connect to IB/TWS; return True when the socket reports connected."""
        logging.info('TWS_gateway - eConnect. Connecting to %s:%s App Id: %s' % (self.host, self.port, self.clientId))
        self.connection.eConnect(self.host, self.port, self.clientId)
        return self.connection.isConnected()

    # ------------------------------------------------------------------
    # Client request handlers. run() always passes the message value, so
    # every handler accepts one optional argument even when it ignores it.
    # ------------------------------------------------------------------
    def reqAccountUpdates(self, value=None):
        logging.info('TWS_gateway - reqAccountUpdates value=%s' % value)
        self.connection.reqAccountUpdates(1, '')

    def reqAccountSummary(self, value):
        """``value`` is a JSON list: [reqId, group, tags]."""
        logging.info('TWS_gateway - reqAccountSummary value=%s' % value)
        vals = [x.encode('ascii') if isinstance(x, unicode) else x for x in json.loads(value)]
        self.connection.reqAccountSummary(vals[0], vals[1], vals[2])

    def reqOpenOrders(self, value=None):
        self.connection.reqOpenOrders()

    def reqPositions(self, value=None):
        self.connection.reqPositions()

    def reqExecutions(self, value):
        """``value`` is an ExecutionFilter kv-string, or '' for no filter."""
        try:
            filt = ExecutionFilter() if value == '' else ExecutionFilterHelper.kvstring2object(value, ExecutionFilter)
            self.connection.reqExecutions(0, filt)
        except Exception:
            logging.error(traceback.format_exc())

    def reqIds(self, value=None):
        self.connection.reqIds(1)

    def reqNewsBulletins(self, value=None):
        self.connection.reqNewsBulletins(1)

    def cancelNewsBulletins(self, value=None):
        self.connection.cancelNewsBulletins()

    def setServerLogLevel(self, value=None):
        self.connection.setServerLogLevel(3)

    def reqAutoOpenOrders(self, value=None):
        self.connection.reqAutoOpenOrders(1)

    def reqAllOpenOrders(self, value=None):
        self.connection.reqAllOpenOrders()

    def reqManagedAccts(self, value=None):
        self.connection.reqManagedAccts()

    def requestFA(self, value=None):
        self.connection.requestFA(1)

    def reqMktData(self, sm_contract):
        """Subscribe to the contract described by the kv-string ``sm_contract``."""
        logging.info('TWS Gateway received reqMktData request: %s' % sm_contract)
        try:
            self.contract_subscription_mgr.reqMktData(ContractHelper.kvstring2object(sm_contract, Contract))
        except Exception:
            # Still best-effort, but no longer silent.
            logging.error('TWS_gateway - reqMktData failed for request: %s' % sm_contract)
            logging.error(traceback.format_exc())

    def reqHistoricalData(self, value=None):
        """Request one day of 1-minute TRADES bars for a hard-coded sample contract."""
        contract = Contract()
        contract.m_symbol = 'QQQQ'
        contract.m_secType = 'STK'
        contract.m_exchange = 'SMART'
        endtime = strftime('%Y%m%d %H:%M:%S')
        self.connection.reqHistoricalData(
            tickerId=1,
            contract=contract,
            endDateTime=endtime,
            durationStr='1 D',
            barSizeSetting='1 min',
            whatToShow='TRADES',
            useRTH=0,
            formatDate=1)

    def placeOrder(self, value=None):
        """Place an order; ``value`` is a JSON list [orderId, contract, order].

        The contract and order entries are kv-strings. Malformed JSON is
        logged and the request dropped.
        """
        logging.info('TWS_gateway - placeOrder value=%s' % value)
        try:
            vals = json.loads(value)
        except (TypeError, ValueError):
            logging.error('TWS_gateway - placeOrder Exception %s' % traceback.format_exc())
            return

        contract = ContractHelper.kvstring2object(vals[1], Contract)
        order = OrderHelper.kvstring2object(vals[2], Order)
        # NOTE(review): the original code sets the key 'transmit'; verify
        # against ib.ext.Order whether the field read by the socket layer is
        # named 'm_transmit' instead.
        order.__dict__['transmit'] = self.ib_order_transmit
        # Send the order object that carries the flag. The original built it
        # and then sent a second, freshly parsed Order instead.
        self.connection.placeOrder(vals[0], contract, order)

    def eDisconnect(self, value=None):
        sleep(2)
        self.connection.eDisconnect()

    ####################################################################
    # Gateway commands
    def gw_req_subscriptions(self, value=None):
        """Publish the subscription table as (tickerId, kv-string) pairs."""
        subm = [(i, ContractHelper.object2kvstring(contract))
                for i, contract in enumerate(self.contract_subscription_mgr.handle)]

        logging.debug('TWS_gateway - gw_req_subscriptions: %s' % subm)
        if subm:
            self.tws_event_handler.broadcast_event('gw_subscriptions', {'subscriptions': subm}, source='GW')
-
class SubscriptionManager(object):
    """Maps contracts to ticker ids and issues market data requests.

    A contract's ticker id is its index in ``handle``; ``tickerId`` maps the
    contract's redis key (ContractHelper.makeRedisKeyEx) to that index.
    """

    # Added to the ticker id when a snapshot is requested for a contract that
    # is already subscribed. Must stay equal to TWS_event_handler.TICKER_GAP,
    # which removes the offset again before tick events are published.
    SNAPSHOT_ID_OFFSET = 1000

    parent = None
    persist_f = None

    def __init__(self, parent=None):
        """``parent`` must expose ``connection`` (the IB client socket)."""
        self.parent = parent
        # Per-instance state: list of contracts, and contract key -> index.
        # These were class-level mutables shared by every instance.
        self.handle = []
        self.tickerId = {}

    def load_subscription(self, contracts):
        """Request market data for each contract, then log the table."""
        for c in contracts:
            self.reqMktData(c)

        self.dump()

    def is_subscribed(self, contract):
        """Return the ticker id of ``contract``, or -1 if not subscribed.

        0 is a valid ticker id, so compare the result against -1 explicitly
        rather than relying on truthiness.
        """
        ckey = ContractHelper.makeRedisKeyEx(contract)
        return self.tickerId.get(ckey, -1)

    def add_subscription(self, contract):
        """Register ``contract`` and return its ticker id.

        Idempotent: a contract that is already registered keeps its id.
        """
        existing = self.is_subscribed(contract)
        if existing != -1:
            return existing

        self.handle.append(contract)
        new_id = len(self.handle) - 1
        self.tickerId[ContractHelper.makeRedisKeyEx(contract)] = new_id
        return new_id

    def reqMktData(self, contract):
        """Subscribe to ``contract``, or request a snapshot if already subscribed."""
        ticker_id = self.is_subscribed(contract)
        if ticker_id == -1:  # not found
            ticker_id = self.add_subscription(contract)
            # the conId must be set to zero when calling TWS reqMktData
            # otherwise TWS will fail to subscribe the contract
            self.parent.connection.reqMktData(ticker_id, contract, '', False)

            # NOTE(review): the persistence callback (persist_f) was already
            # disabled here in the original code, so the table is never
            # written back - confirm whether that is still intended.

            logging.info('SubscriptionManager: reqMktData. Requesting market data, id = %d, contract = %s' % (ticker_id, ContractHelper.makeRedisKeyEx(contract)))
        else:
            self.parent.connection.reqMktData(self.SNAPSHOT_ID_OFFSET + ticker_id, contract, '', True)
            logging.info('SubscriptionManager: reqMktData: contract already subscribed. Request snapshot = %d, contract = %s' % (ticker_id, ContractHelper.makeRedisKeyEx(contract)))

    def force_resubscription(self):
        """Re-request market data for every contract.

        Use only after a broken connection is restored. Starts at index 0,
        because the first contract added receives ticker id 0.
        """
        for i, contract in enumerate(self.handle):
            self.parent.connection.reqMktData(i, contract, '', False)
            logging.info('force_resubscription: %s' % ContractHelper.printContract(contract))

    def itemAt(self, id):
        """Return the contract with ticker id ``id``, or -1 if out of range."""
        if 0 <= id < len(self.handle):
            return self.handle[id]
        return -1

    def dump(self):
        """Log the subscription table and the key-to-id map."""
        logging.info('subscription manager table:---------------------')

        rows = []
        for i, contract in enumerate(self.handle):
            if contract is not None:
                fields = ''.join('%s:%s, ' % (k, v) for k, v in contract.__dict__.items())
            else:
                fields = ''
            rows.append('%d: {%s},\n' % (i, fields))
        logging.info(''.join(rows))

        # The map is self.tickerId; the original read a nonexistent self.conId.
        logging.info(''.join('%s[%d],\n' % (k, v) for k, v in self.tickerId.items()))
        logging.info('Number of instruments subscribed: %d' % len(self.handle))
        logging.info('------------------------------------------------')

    def register_persistence_callback(self, func):
        """Store ``func`` as the persistence callback ``persist_f``."""
        logging.info('subscription manager: registering callback')
        self.persist_f = func
-
-
-
-
-
def test_subscription():
    """Manual smoke test for SubscriptionManager (prints to stdout).

    Reads contract tuples from a developer-local subscription file; lines
    starting with '#' are skipped. Requires SubscriptionManager to provide an
    ``add_subscription`` method.
    """
    subscription_file = '/home/larry-13.04/workspace/finopt/data/subscription-hsio.txt'

    def read_contracts():
        # Parse one contract per non-comment line; the file is always closed.
        with open(subscription_file) as fr:
            return [ContractHelper.makeContract(tuple(l.strip('\n').split(',')))
                    for l in fr.readlines() if l[0] != '#']

    s = SubscriptionManager()
    contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151029', 0, '')
    c = ContractHelper.makeContract(contractTuple)
    print(s.is_subscribed(c))
    print(s.add_subscription(c))
    print(s.is_subscribed(c))
    s.dump()

    for contract in read_contracts():
        s.add_subscription(contract)
    s.dump()

    # Second pass over the same file: print the id returned for each line.
    for contract in read_contracts():
        print(s.add_subscription(contract))
    s.dump()

    contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151127', 0, '')
    c = ContractHelper.makeContract(contractTuple)
    print(s.is_subscribed(c))
    print(s.add_subscription(c))
    print('%s %s' % (s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c)))))

    print('test itemAt:')
    contractTuple = ('HSI', 'OPT', 'HKFE', 'HKD', '20151127', 21400, 'C')
    c = ContractHelper.makeContract(contractTuple)
    print('%s %s' % (s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c)))))
-
-
-
-
if __name__ == '__main__':
    # Entry point: tws_gateway <config file>
    # Reads the config, sets up logging and starts the gateway thread.

    if len(sys.argv) != 2:
        print("Usage: %s <config file>" % sys.argv[0])
        sys.exit(-1)

    cfg_path = sys.argv[1:]
    config = ConfigParser.SafeConfigParser()
    # read() returns the list of files it managed to parse
    if len(config.read(cfg_path)) == 0:
        raise ValueError("Failed to open config file")

    # SECURITY NOTE(review): the logging settings are eval()'d from the config
    # file, so anyone who can edit that file can run arbitrary code here.
    # Kept as-is because the value may be an expression (for example one that
    # references logging level constants); consider a safer format.
    logconfig = eval(config.get("tws_gateway", "tws_gateway.logconfig").strip('"').strip("'"))
    logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
    logging.basicConfig(**logconfig)

    khost = config.get("epc", "kafka.host").strip('"').strip("'")
    kport = config.get("epc", "kafka.port")
    ihost = config.get("market", "ib.gateway").strip('"').strip("'")
    iport = int(config.get("market", "ib.port"))
    iappid = int(config.get("market", "ib.appid.portfolio"))

    app = TWS_gateway(ihost, iport, iappid, khost, kport, config)
    app.start()

    print('TWS_gateway started.')

    # test_subscription()
-
|