|
|
@@ -0,0 +1,787 @@
|
|
|
+#!/usr/bin/env python
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+
|
|
|
+##
|
|
|
+# This script is an exmple of using the generated code within IbPy in
|
|
|
+# the same manner as the Java code. We subclass EWrapper and give an
|
|
|
+# instance of the wrapper to an EClientSocket.
|
|
|
+##
|
|
|
+
|
|
|
+
|
|
|
+import sys
|
|
|
+from time import sleep, strftime
|
|
|
+import time, datetime
|
|
|
+import ConfigParser
|
|
|
+from optparse import OptionParser
|
|
|
+import logging
|
|
|
+import thread
|
|
|
+import threading
|
|
|
+import traceback
|
|
|
+import json
|
|
|
+from threading import Lock
|
|
|
+from ib.ext.Contract import Contract
|
|
|
+from ib.ext.EWrapper import EWrapper
|
|
|
+from ib.ext.EClientSocket import EClientSocket
|
|
|
+from ib.ext.ExecutionFilter import ExecutionFilter
|
|
|
+from ib.ext.Execution import Execution
|
|
|
+from ib.ext.OrderState import OrderState
|
|
|
+from ib.ext.Order import Order
|
|
|
+
|
|
|
+from kafka.client import KafkaClient
|
|
|
+from kafka import KafkaConsumer
|
|
|
+from kafka.producer import SimpleProducer
|
|
|
+from kafka.common import LeaderNotAvailableError
|
|
|
+
|
|
|
+from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
|
|
|
+from comms.ib_heartbeat import IbHeartBeat
|
|
|
+from tws_protocol_helper import TWS_Protocol
|
|
|
+
|
|
|
+import redis
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+class TWS_event_handler(EWrapper):
|
|
|
+
|
|
|
+ TICKER_GAP = 1000
|
|
|
+ producer = None
|
|
|
+
|
|
|
+ def __init__(self, host, port):
|
|
|
+
|
|
|
+ client = KafkaClient('%s:%s' % (host, port))
|
|
|
+ self.producer = SimpleProducer(client, async=False)
|
|
|
+ logging.info('TWS_event_handler: __init__ Creating kafka client producer at %s:%s' % (host, port))
|
|
|
+
|
|
|
+
|
|
|
+ def serialize_vars_to_dict(self, message, mapping, source='IB'):
|
|
|
+ def create_kmessage(items):
|
|
|
+ d = {}
|
|
|
+ for k,v in items:
|
|
|
+ #print k, v, type(v)
|
|
|
+ #if type(v) in [Contract, Execution, ExecutionFilter, OrderState, Order, CommissionReport]:
|
|
|
+ if 'ib.ext.' in str(type(v)):
|
|
|
+ d[k] = v.__dict__
|
|
|
+ else:
|
|
|
+ d[k] = v
|
|
|
+
|
|
|
+
|
|
|
+ d['ts'] = time.time()
|
|
|
+ d['typeName'] = message
|
|
|
+ d['source'] = source
|
|
|
+ return d
|
|
|
+
|
|
|
+
|
|
|
+ try:
|
|
|
+ del(mapping['self'])
|
|
|
+ except (KeyError, ):
|
|
|
+ pass
|
|
|
+ items = list(mapping.items())
|
|
|
+ return create_kmessage(items)
|
|
|
+
|
|
|
+ def broadcast_event(self, message, mapping, source='IB'):
|
|
|
+
|
|
|
+ try:
|
|
|
+ dict = self.serialize_vars_to_dict(message, mapping, source)
|
|
|
+ if message == 'gw_subscriptions':
|
|
|
+ logging.info('TWS_event_handler: broadcast event: %s [%s]' % (dict['typeName'], dict))
|
|
|
+ self.producer.send_messages(message, json.dumps(dict))
|
|
|
+ except:
|
|
|
+ logging.error('broadcast_event: exception while encoding IB event to client: [%s]' % message)
|
|
|
+ logging.error(traceback.format_exc())
|
|
|
+
|
|
|
+ #
|
|
|
+ # try to broadcast the message a 2nd time
|
|
|
+ # no catch if fails again
|
|
|
+ if message == 'gw_subscriptions':
|
|
|
+ sleep(2)
|
|
|
+ logging.info('TWS_event_handler: Retry once broadcasting gw_subscription ' % (dict['typeName'], dict))
|
|
|
+ self.producer.send_messages(message, json.dumps(dict))
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ def tick_process_message(self, items):
|
|
|
+
|
|
|
+ t = {}
|
|
|
+ t = items.copy()
|
|
|
+ # if the tickerId is in the snapshot range
|
|
|
+ # deduct the gap to derive the original tickerId
|
|
|
+ # --- check logic in subscription manager
|
|
|
+ if (t['tickerId'] >= TWS_event_handler.TICKER_GAP):
|
|
|
+ t['tickerId'] = t['tickerId'] - TWS_event_handler.TICKER_GAP
|
|
|
+ try:
|
|
|
+ del(t['self'])
|
|
|
+ except (KeyError, ):
|
|
|
+ pass
|
|
|
+
|
|
|
+ return t
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ def tickPrice(self, tickerId, field, price, canAutoExecute):
|
|
|
+
|
|
|
+ self.broadcast_event('tickPrice', self.tick_process_message(vars()))
|
|
|
+
|
|
|
+ def tickSize(self, tickerId, field, size):
|
|
|
+
|
|
|
+ self.broadcast_event('tickSize', self.tick_process_message(vars())) #vars())
|
|
|
+
|
|
|
+ def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
|
|
|
+
|
|
|
+ #self.broadcast_event('tickOptionComputation', self.tick_process_message(vars())) #vars())
|
|
|
+ pass
|
|
|
+
|
|
|
+ def tickGeneric(self, tickerId, tickType, value):
|
|
|
+ #self.broadcast_event('tickGeneric', vars())
|
|
|
+ self.broadcast_event('tickGeneric', self.tick_process_message(vars())) #vars())
|
|
|
+
|
|
|
+ def tickString(self, tickerId, tickType, value):
|
|
|
+ #self.broadcast_event('tickString', vars())
|
|
|
+ self.broadcast_event('tickString', self.tick_process_message(vars())) #vars())
|
|
|
+
|
|
|
+ def tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
|
|
|
+ self.broadcast_event('tickEFP', vars())
|
|
|
+
|
|
|
+ def orderStatus(self, orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeId):
|
|
|
+ self.broadcast_event('orderStatus', vars())
|
|
|
+
|
|
|
+ def openOrder(self, orderId, contract, order, state):
|
|
|
+ self.broadcast_event('openOrder', vars())
|
|
|
+
|
|
|
+ def openOrderEnd(self):
|
|
|
+ self.broadcast_event('openOrderEnd', vars())
|
|
|
+
|
|
|
+ def updateAccountValue(self, key, value, currency, accountName):
|
|
|
+ self.broadcast_event('updateAccountValue', vars())
|
|
|
+
|
|
|
+ def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
|
|
|
+ self.broadcast_event('updatePortfolio', vars())
|
|
|
+
|
|
|
+ def updateAccountTime(self, timeStamp):
|
|
|
+ self.broadcast_event('updateAccountTime', vars())
|
|
|
+
|
|
|
+ def accountDownloadEnd(self, accountName):
|
|
|
+ self.broadcast_event('accountDownloadEnd', vars())
|
|
|
+
|
|
|
+ def nextValidId(self, orderId):
|
|
|
+ self.broadcast_event('nextValidId', vars())
|
|
|
+
|
|
|
+ def contractDetails(self, reqId, contractDetails):
|
|
|
+ self.broadcast_event('contractDetails', vars())
|
|
|
+
|
|
|
+ def contractDetailsEnd(self, reqId):
|
|
|
+ self.broadcast_event('contractDetailsEnd', vars())
|
|
|
+
|
|
|
+ def bondContractDetails(self, reqId, contractDetails):
|
|
|
+ self.broadcast_event('bondContractDetails', vars())
|
|
|
+
|
|
|
+ def execDetails(self, reqId, contract, execution):
|
|
|
+ self.broadcast_event('execDetails', vars())
|
|
|
+
|
|
|
+ def execDetailsEnd(self, reqId):
|
|
|
+ self.broadcast_event('execDetailsEnd', vars())
|
|
|
+
|
|
|
+ def connectionClosed(self):
|
|
|
+ self.broadcast_event('connectionClosed', {})
|
|
|
+
|
|
|
+ def error(self, id=None, errorCode=None, errorMsg=None):
|
|
|
+ logging.error(self.serialize_vars_to_dict('error', vars()))
|
|
|
+ self.broadcast_event('error', vars())
|
|
|
+
|
|
|
+ def error_0(self, strvalue=None):
|
|
|
+ logging.error(self.serialize_vars_to_dict('error_0', vars()))
|
|
|
+ self.broadcast_event('error_0', vars())
|
|
|
+
|
|
|
+ def error_1(self, id=None, errorCode=None, errorMsg=None):
|
|
|
+ logging.error(self.serialize_vars_to_dict('error_1', vars()))
|
|
|
+ self.broadcast_event('error_1', vars())
|
|
|
+
|
|
|
+ def updateMktDepth(self, tickerId, position, operation, side, price, size):
|
|
|
+ self.broadcast_event('updateMktDepth', vars())
|
|
|
+
|
|
|
+ def updateMktDepthL2(self, tickerId, position, marketMaker, operation, side, price, size):
|
|
|
+ self.broadcast_event('updateMktDepthL2', vars())
|
|
|
+
|
|
|
+ def updateNewsBulletin(self, msgId, msgType, message, origExchange):
|
|
|
+ self.broadcast_event('updateNewsBulletin', vars())
|
|
|
+
|
|
|
+ def managedAccounts(self, accountsList):
|
|
|
+ logging.info(self.serialize_vars_to_dict('managedAccounts', vars()))
|
|
|
+ self.broadcast_event('managedAccounts', vars())
|
|
|
+
|
|
|
+ def receiveFA(self, faDataType, xml):
|
|
|
+ self.broadcast_event('receiveFA', vars())
|
|
|
+
|
|
|
+ def historicalData(self, reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
|
|
|
+ self.broadcast_event('historicalData', vars())
|
|
|
+
|
|
|
+ def scannerParameters(self, xml):
|
|
|
+ self.broadcast_event('scannerParameters', vars())
|
|
|
+
|
|
|
+ def scannerData(self, reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
|
|
|
+ self.broadcast_event('scannerData', vars())
|
|
|
+
|
|
|
+
|
|
|
+ def commissionReport(self, commissionReport):
|
|
|
+ self.broadcast_event('commissionReport', vars())
|
|
|
+
|
|
|
+
|
|
|
+ def currentTime(self, time):
|
|
|
+ self.broadcast_event('currentTime', vars())
|
|
|
+
|
|
|
+ def deltaNeutralValidation(self, reqId, underComp):
|
|
|
+ self.broadcast_event('deltaNeutralValidation', vars())
|
|
|
+
|
|
|
+
|
|
|
+ def fundamentalData(self, reqId, data):
|
|
|
+ self.broadcast_event('fundamentalData', vars())
|
|
|
+
|
|
|
+ def marketDataType(self, reqId, marketDataType):
|
|
|
+ self.broadcast_event('marketDataType', vars())
|
|
|
+
|
|
|
+
|
|
|
+ def realtimeBar(self, reqId, time, open, high, low, close, volume, wap, count):
|
|
|
+ self.broadcast_event('realtimeBar', vars())
|
|
|
+
|
|
|
+ def scannerDataEnd(self, reqId):
|
|
|
+ self.broadcast_event('scannerDataEnd', vars())
|
|
|
+
|
|
|
+
|
|
|
+ def tickSnapshotEnd(self, reqId):
|
|
|
+ self.broadcast_event('tickSnapshotEnd', vars())
|
|
|
+
|
|
|
+
|
|
|
+ def position(self, account, contract, pos, avgCost):
|
|
|
+ self.broadcast_event('position', vars())
|
|
|
+
|
|
|
+ def positionEnd(self):
|
|
|
+ self.broadcast_event('positionEnd', vars())
|
|
|
+
|
|
|
+ def accountSummary(self, reqId, account, tag, value, currency):
|
|
|
+ self.broadcast_event('accountSummary', vars())
|
|
|
+
|
|
|
+ def accountSummaryEnd(self, reqId):
|
|
|
+ self.broadcast_event('accountSummaryEnd', vars())
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+class TWS_gateway(threading.Thread):
|
|
|
+
|
|
|
+ # config
|
|
|
+ config = None
|
|
|
+ # redis connection
|
|
|
+ rs = None
|
|
|
+
|
|
|
+
|
|
|
+ # channel clients' requests to IB/TWS
|
|
|
+ cli_request_handler = None
|
|
|
+
|
|
|
+ # manage conID / contracts mapping
|
|
|
+ contract_subscription_mgr = None
|
|
|
+
|
|
|
+ connection = None
|
|
|
+
|
|
|
+ # handler to process incoming IB/TWS messages and echo back to clients
|
|
|
+ tws_event_handler = None
|
|
|
+
|
|
|
+ # monitor IB connection / heart beat
|
|
|
+ ibh = None
|
|
|
+ tlock = None
|
|
|
+ ib_conn_status = None
|
|
|
+ ib_order_transmit = False
|
|
|
+
|
|
|
+
|
|
|
+ def __init__(self, host, port, clientId, kafka_host, kafka_port, config):
|
|
|
+ super(TWS_gateway, self).__init__()
|
|
|
+ self.config = config
|
|
|
+ self.host = host
|
|
|
+ self.port = port
|
|
|
+ self.clientId = clientId
|
|
|
+ self.ib_order_transmit = config.get("tws_gateway", "tws_gateway.order_transmit").strip('"').strip("'") if \
|
|
|
+ config.get("tws_gateway", "tws_gateway.order_transmit").strip('"').strip("'") <> None\
|
|
|
+ else False
|
|
|
+
|
|
|
+ logging.info('starting up TWS_gateway...')
|
|
|
+ logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit == True else 'False'))
|
|
|
+
|
|
|
+ logging.info('connecting to Redis server...')
|
|
|
+ self.initialize_redis(config)
|
|
|
+
|
|
|
+ logging.info('starting up TWS_event_handler...')
|
|
|
+ self.tws_event_handler = TWS_event_handler(kafka_host, kafka_port)
|
|
|
+
|
|
|
+ logging.info('starting up IB EClientSocket...')
|
|
|
+ self.connection = EClientSocket(self.tws_event_handler)
|
|
|
+
|
|
|
+
|
|
|
+ logging.info('starting up client request handler - kafkaConsumer...')
|
|
|
+ self.cli_request_handler = KafkaConsumer( *[(v,0) for v in list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods) ], \
|
|
|
+ metadata_broker_list=['%s:%s' % (kafka_host, kafka_port)],\
|
|
|
+ group_id = 'epc.tws_gateway',\
|
|
|
+ auto_commit_enable=True,\
|
|
|
+ auto_commit_interval_ms=30 * 1000,\
|
|
|
+ auto_offset_reset='largest') # discard old ones
|
|
|
+
|
|
|
+ self.reset_message_offset()
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ if not self.eConnect():
|
|
|
+ logging.error('TWS_gateway: unable to establish connection to IB %s:%d' % (self.host, self.port))
|
|
|
+ sys.exit(-1)
|
|
|
+ else:
|
|
|
+ # start heart beat monitor
|
|
|
+ logging.info('starting up IB heart beat monitor...')
|
|
|
+ self.tlock = Lock()
|
|
|
+ self.ibh = IbHeartBeat(config)
|
|
|
+ self.ibh.register_listener([self.on_ib_conn_broken])
|
|
|
+ self.ibh.run()
|
|
|
+
|
|
|
+
|
|
|
+ logging.info('starting up subscription manager...')
|
|
|
+ self.initialize_subscription_mgr()
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ def initialize_subscription_mgr(self):
|
|
|
+
|
|
|
+ self.contract_subscription_mgr = SubscriptionManager(self)
|
|
|
+ self.contract_subscription_mgr.register_persistence_callback(self.persist_subscriptions)
|
|
|
+ key = self.config.get("tws_gateway", "subscription_manager.subscriptions.redis_key").strip('"').strip("'")
|
|
|
+ if self.rs.get(key):
|
|
|
+ #contracts = map(lambda x: ContractHelper.kvstring2contract(x), json.loads(self.rs.get(key)))
|
|
|
+ contracts = map(lambda x: ContractHelper.kvstring2object(x, Contract), json.loads(self.rs.get(key)))
|
|
|
+ self.contract_subscription_mgr.load_subscription(contracts)
|
|
|
+
|
|
|
+
|
|
|
+ def persist_subscriptions(self, contracts):
|
|
|
+
|
|
|
+ key = self.config.get("tws_gateway", "subscription_manager.subscriptions.redis_key").strip('"').strip("'")
|
|
|
+ #cs = json.dumps(map(lambda x: ContractHelper.contract2kvstring(x) if x <> None else None, contracts))
|
|
|
+ cs = json.dumps(map(lambda x: ContractHelper.object2kvstring(x) if x <> None else None, contracts))
|
|
|
+ logging.debug('Tws_gateway: updating subscription table to redis store %s' % cs)
|
|
|
+ self.rs.set(key, cs)
|
|
|
+
|
|
|
+
|
|
|
+ def initialize_redis(self, config):
|
|
|
+ r_host = config.get("redis", "redis.server").strip('"').strip("'")
|
|
|
+ r_port = config.get("redis", "redis.port")
|
|
|
+ r_db = config.get("redis", "redis.db")
|
|
|
+
|
|
|
+ self.rs = redis.Redis(r_host, r_port, r_db)
|
|
|
+ try:
|
|
|
+ self.rs.client_list()
|
|
|
+ except redis.ConnectionError:
|
|
|
+ logging.error('TWS_gateway: unable to connect to redis server using these settings: %s port:%d db:%d' % (r_host, r_port, r_db))
|
|
|
+ logging.error('aborting...')
|
|
|
+ sys.exit(-1)
|
|
|
+
|
|
|
+
|
|
|
+ def reset_message_offset(self):
|
|
|
+ topic_offsets = map(lambda topic: (topic, self.cli_request_handler.get_partition_offsets(topic, 0, -1, 999)), TWS_Protocol.topicMethods + TWS_Protocol.gatewayMethods)
|
|
|
+ topic_offsets = filter(lambda x: x <> None, map(lambda x: (x[0], x[1][1], x[1][0]) if len(x[1]) > 1 else None, topic_offsets))
|
|
|
+ logging.info('TWS_gateway set topic offset to the latest point\n%s' % (''.join('%s,%s,%s\n' % (x[0], x[1], x[2]) for x in topic_offsets)))
|
|
|
+
|
|
|
+ # the set_topic_partitions method clears out all previous settings when executed
|
|
|
+ # therefore it's not possible to call the function multiple times:
|
|
|
+ # self.consumer.set_topic_partitions(('gw_subscriptions', 0, 114,)
|
|
|
+ # self.consumer.set_topic_partitions(('tickPrice', 0, 27270,))
|
|
|
+ # as the second call will wipe out whatever was done previously
|
|
|
+
|
|
|
+ self.cli_request_handler.set_topic_partitions(*topic_offsets)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ def run(self):
|
|
|
+
|
|
|
+
|
|
|
+ for message in self.cli_request_handler:
|
|
|
+
|
|
|
+ logging.info("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
|
|
|
+ message.offset, message.key,
|
|
|
+ message.value))
|
|
|
+
|
|
|
+# print ("TWS_gateway: received client request %s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
|
|
|
+# message.offset, message.key,
|
|
|
+# message.value))
|
|
|
+
|
|
|
+
|
|
|
+ getattr(self, message.topic, None)(message.value)
|
|
|
+ #self.cli_request_handler.task_done(message)
|
|
|
+
|
|
|
+
|
|
|
+ def on_ib_conn_broken(self, msg):
|
|
|
+ logging.error('TWS_gateway: detected broken IB connection!')
|
|
|
+ self.ib_conn_status = 'ERROR'
|
|
|
+ self.tlock.acquire() # this function may get called multiple times
|
|
|
+ try: # block until another party finishes executing
|
|
|
+ if self.ib_conn_status == 'OK': # check status
|
|
|
+ return # if already fixed up while waiting, return
|
|
|
+
|
|
|
+ self.eDisconnect()
|
|
|
+ self.eConnect()
|
|
|
+ while not self.connection.isConnected():
|
|
|
+ logging.error('TWS_gateway: attempt to reconnect...')
|
|
|
+ self.eConnect()
|
|
|
+ sleep(2)
|
|
|
+
|
|
|
+ # we arrived here because the connection has been restored
|
|
|
+ # resubscribe tickers again!
|
|
|
+ logging.info('TWS_gateway: IB connection restored...resubscribe contracts')
|
|
|
+ self.contract_subscription_mgr.force_resubscription()
|
|
|
+
|
|
|
+
|
|
|
+ finally:
|
|
|
+ self.tlock.release()
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ def eConnect(self):
|
|
|
+ logging.info('TWS_gateway - eConnect. Connecting to %s:%s App Id: %s' % (self.host, self.port, self.clientId))
|
|
|
+ self.connection.eConnect(self.host, self.port, self.clientId)
|
|
|
+ return self.connection.isConnected()
|
|
|
+
|
|
|
+
|
|
|
+ def reqAccountUpdates(self, value=None):
|
|
|
+ logging.info('TWS_gateway - reqAccountUpdates value=%s' % value)
|
|
|
+ self.connection.reqAccountUpdates(1, '')
|
|
|
+
|
|
|
+ def reqAccountSummary(self, value):
|
|
|
+ logging.info('TWS_gateway - reqAccountSummary value=%s' % value)
|
|
|
+
|
|
|
+ vals = map(lambda x: x.encode('ascii') if isinstance(x, unicode) else x, json.loads(value))
|
|
|
+ self.connection.reqAccountSummary(vals[0], vals[1], vals[2])
|
|
|
+
|
|
|
+ def reqOpenOrders(self, value=None):
|
|
|
+ self.connection.reqOpenOrders()
|
|
|
+
|
|
|
+ def reqPositions(self, value=None):
|
|
|
+ self.connection.reqPositions()
|
|
|
+
|
|
|
+
|
|
|
+ def reqExecutions(self, value):
|
|
|
+ try:
|
|
|
+ filt = ExecutionFilter() if value == '' else ExecutionFilterHelper.kvstring2object(value, ExecutionFilter)
|
|
|
+ self.connection.reqExecutions(0, filt)
|
|
|
+ except:
|
|
|
+ logging.error(traceback.format_exc())
|
|
|
+
|
|
|
+
|
|
|
+ def reqIds(self, value=None):
|
|
|
+ self.connection.reqIds(1)
|
|
|
+
|
|
|
+
|
|
|
+ def reqNewsBulletins(self):
|
|
|
+ self.connection.reqNewsBulletins(1)
|
|
|
+
|
|
|
+
|
|
|
+ def cancelNewsBulletins(self):
|
|
|
+ self.connection.cancelNewsBulletins()
|
|
|
+
|
|
|
+
|
|
|
+ def setServerLogLevel(self):
|
|
|
+ self.connection.setServerLogLevel(3)
|
|
|
+
|
|
|
+
|
|
|
+ def reqAutoOpenOrders(self):
|
|
|
+ self.connection.reqAutoOpenOrders(1)
|
|
|
+
|
|
|
+
|
|
|
+ def reqAllOpenOrders(self):
|
|
|
+ self.connection.reqAllOpenOrders()
|
|
|
+
|
|
|
+
|
|
|
+ def reqManagedAccts(self):
|
|
|
+ self.connection.reqManagedAccts()
|
|
|
+
|
|
|
+
|
|
|
+ def requestFA(self):
|
|
|
+ self.connection.requestFA(1)
|
|
|
+
|
|
|
+
|
|
|
+ def reqMktData(self, sm_contract):
|
|
|
+ logging.info('TWS Gateway received reqMktData request: %s' % sm_contract)
|
|
|
+ try:
|
|
|
+ #self.contract_subscription_mgr.reqMktData(ContractHelper.kvstring2contract(sm_contract))
|
|
|
+ self.contract_subscription_mgr.reqMktData(ContractHelper.kvstring2object(sm_contract, Contract))
|
|
|
+ except:
|
|
|
+ pass
|
|
|
+
|
|
|
+ def reqHistoricalData(self):
|
|
|
+ contract = Contract()
|
|
|
+ contract.m_symbol = 'QQQQ'
|
|
|
+ contract.m_secType = 'STK'
|
|
|
+ contract.m_exchange = 'SMART'
|
|
|
+ endtime = strftime('%Y%m%d %H:%M:%S')
|
|
|
+ self.connection.reqHistoricalData(
|
|
|
+ tickerId=1,
|
|
|
+ contract=contract,
|
|
|
+ endDateTime=endtime,
|
|
|
+ durationStr='1 D',
|
|
|
+ barSizeSetting='1 min',
|
|
|
+ whatToShow='TRADES',
|
|
|
+ useRTH=0,
|
|
|
+ formatDate=1)
|
|
|
+
|
|
|
+
|
|
|
+ def placeOrder(self, value=None):
|
|
|
+ logging.info('TWS_gateway - placeOrder value=%s' % value)
|
|
|
+ try:
|
|
|
+ vals = json.loads(value)
|
|
|
+ except ValueError:
|
|
|
+ logging.error('TWS_gateway - placeOrder Exception %s' % traceback.format_exc())
|
|
|
+ return
|
|
|
+
|
|
|
+# c = ContractHelper.kvstring2contract(vals[1])
|
|
|
+ o = OrderHelper.kvstring2object(vals[2], Order)
|
|
|
+ o.__dict__['transmit'] = self.ib_order_transmit
|
|
|
+# print c.__dict__
|
|
|
+# print o.__dict__
|
|
|
+# print '---------------------'
|
|
|
+
|
|
|
+
|
|
|
+ #self.connection.placeOrder(vals[0], ContractHelper.kvstring2contract(vals[1]), OrderHelper.kvstring2object(vals[2], Order))
|
|
|
+ self.connection.placeOrder(vals[0], ContractHelper.kvstring2object(vals[1], Contract), OrderHelper.kvstring2object(vals[2], Order))
|
|
|
+# self.connection.placeOrder(orderId, contract, newOptOrder)
|
|
|
+
|
|
|
+ def eDisconnect(self, value=None):
|
|
|
+ sleep(2)
|
|
|
+ self.connection.eDisconnect()
|
|
|
+
|
|
|
+
|
|
|
+####################################################################3
|
|
|
+# Gateway commands
|
|
|
+ def gw_req_subscriptions(self, value=None):
|
|
|
+
|
|
|
+ #subm = map(lambda i: ContractHelper.contract2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
|
|
|
+ #subm = map(lambda i: ContractHelper.object2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
|
|
|
+ subm = map(lambda i: (i, ContractHelper.object2kvstring(self.contract_subscription_mgr.handle[i])), range(len(self.contract_subscription_mgr.handle)))
|
|
|
+
|
|
|
+ print subm
|
|
|
+ if subm:
|
|
|
+ self.tws_event_handler.broadcast_event('gw_subscriptions', {'subscriptions': subm}, source='GW')
|
|
|
+
|
|
|
+class SubscriptionManager():
|
|
|
+
|
|
|
+ parent = None
|
|
|
+ # array list of contracts
|
|
|
+ handle = []
|
|
|
+ # contract key map to contract ID (index of the handle array)
|
|
|
+ tickerId = {}
|
|
|
+
|
|
|
+ persist_f = None
|
|
|
+
|
|
|
+ def __init__(self, parent=None):
|
|
|
+ self.parent = parent
|
|
|
+
|
|
|
+
|
|
|
+ def load_subscription(self, contracts):
|
|
|
+ for c in contracts:
|
|
|
+ self.reqMktData(c)
|
|
|
+
|
|
|
+ self.dump()
|
|
|
+
|
|
|
+ # returns -1 if not found, else the key id (which could be a zero value)
|
|
|
+ def is_subscribed(self, contract):
|
|
|
+ #print self.conId.keys()
|
|
|
+ ckey = ContractHelper.makeRedisKeyEx(contract)
|
|
|
+ if ckey not in self.tickerId.keys():
|
|
|
+ return -1
|
|
|
+ else:
|
|
|
+ # note that 0 can be a key
|
|
|
+ # be careful when checking the return values
|
|
|
+ # check for true false instead of using implicit comparsion
|
|
|
+ return self.tickerId[ckey]
|
|
|
+
|
|
|
+
|
|
|
+# def reqMktDataxx(self, contract):
|
|
|
+# print '---------------'
|
|
|
+# contractTuple = ('USO', 'STK', 'SMART', 'USD', '', 0.0, '')
|
|
|
+# stkContract = self.makeStkContract(contractTuple)
|
|
|
+# stkContract.m_includeExpired = False
|
|
|
+# self.parent.connection.reqMktData(1, stkContract, '', False)
|
|
|
+#
|
|
|
+# contractTuple = ('IBM', 'STK', 'SMART', 'USD', '', 0.0, '')
|
|
|
+# stkContract = self.makeStkContract(contractTuple)
|
|
|
+# stkContract.m_includeExpired = False
|
|
|
+# print stkContract
|
|
|
+# print stkContract.__dict__
|
|
|
+# self.parent.connection.reqMktData(2, stkContract, '', False)
|
|
|
+#
|
|
|
+
|
|
|
+
|
|
|
+ def reqMktData(self, contract):
|
|
|
+
|
|
|
+
|
|
|
+ #logging.info('SubscriptionManager: reqMktData')
|
|
|
+
|
|
|
+ def add_subscription(contract):
|
|
|
+ self.handle.append(contract)
|
|
|
+ newId = len(self.handle) - 1
|
|
|
+ self.tickerId[ContractHelper.makeRedisKeyEx(contract)] = newId
|
|
|
+
|
|
|
+ return newId
|
|
|
+
|
|
|
+ id = self.is_subscribed(contract)
|
|
|
+ if id == -1: # not found
|
|
|
+ id = add_subscription(contract)
|
|
|
+
|
|
|
+ #
|
|
|
+ # the conId must be set to zero when calling TWS reqMktData
|
|
|
+ # otherwise TWS will fail to subscribe the contract
|
|
|
+ self.parent.connection.reqMktData(id, contract, '', False)
|
|
|
+
|
|
|
+
|
|
|
+# if self.persist_f:
|
|
|
+# logging.debug('SubscriptionManager reqMktData: trigger callback')
|
|
|
+# self.persist_f(self.handle)
|
|
|
+
|
|
|
+ logging.info('SubscriptionManager: reqMktData. Requesting market data, id = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
|
|
|
+
|
|
|
+ else:
|
|
|
+ self.parent.connection.reqMktData(1000 + id, contract, '', True)
|
|
|
+ logging.info('SubscriptionManager: reqMktData: contract already subscribed. Request snapshot = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
|
|
|
+ #self.dump()
|
|
|
+
|
|
|
+# def makeStkContract(self, contractTuple):
|
|
|
+# newContract = Contract()
|
|
|
+# newContract.m_symbol = contractTuple[0]
|
|
|
+# newContract.m_secType = contractTuple[1]
|
|
|
+# newContract.m_exchange = contractTuple[2]
|
|
|
+# newContract.m_currency = contractTuple[3]
|
|
|
+# newContract.m_expiry = contractTuple[4]
|
|
|
+# newContract.m_strike = contractTuple[5]
|
|
|
+# newContract.m_right = contractTuple[6]
|
|
|
+# print 'Contract Values:%s,%s,%s,%s,%s,%s,%s:' % contractTuple
|
|
|
+# return newContract
|
|
|
+
|
|
|
+ # use only after a broken connection is restored
|
|
|
+ # to re request market data
|
|
|
+ def force_resubscription(self):
|
|
|
+ # starting from index 1 of the contract list, call reqmktdata, and format the result into a list of tuples
|
|
|
+ for i in range(1, len(self.handle)):
|
|
|
+ self.parent.connection.reqMktData(i, self.handle[i], '', False)
|
|
|
+ logging.info('force_resubscription: %s' % ContractHelper.printContract(self.handle[i]))
|
|
|
+
|
|
|
+
|
|
|
+ def itemAt(self, id):
|
|
|
+ if id > 0 and id < len(self.handle):
|
|
|
+ return self.handle[id]
|
|
|
+ return -1
|
|
|
+
|
|
|
+ def dump(self):
|
|
|
+
|
|
|
+ logging.info('subscription manager table:---------------------')
|
|
|
+ logging.info(''.join('%d: {%s},\n' % (i, ''.join('%s:%s, ' % (k, v) for k, v in self.handle[i].__dict__.iteritems() )\
|
|
|
+ if self.handle[i] <> None else '' ) for i in range(len(self.handle)))\
|
|
|
+ )
|
|
|
+
|
|
|
+ logging.info( ''.join('%s[%d],\n' % (k, v) for k, v in self.conId.iteritems()))
|
|
|
+ logging.info( 'Number of instruments subscribed: %d' % len(self.handle))
|
|
|
+ logging.info( '------------------------------------------------')
|
|
|
+
|
|
|
+ def register_persistence_callback(self, func):
|
|
|
+ logging.info('subscription manager: registering callback')
|
|
|
+ self.persist_f = func
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+def test_subscription():
|
|
|
+ s = SubscriptionManager()
|
|
|
+ contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151029', 0, '')
|
|
|
+ c = ContractHelper.makeContract(contractTuple)
|
|
|
+ print s.is_subscribed(c)
|
|
|
+ print s.add_subscription(c)
|
|
|
+ print s.is_subscribed(c)
|
|
|
+ s.dump()
|
|
|
+
|
|
|
+ fr = open('/home/larry-13.04/workspace/finopt/data/subscription-hsio.txt')
|
|
|
+ for l in fr.readlines():
|
|
|
+ if l[0] <> '#':
|
|
|
+
|
|
|
+ s.add_subscription(ContractHelper.makeContract(tuple([t for t in l.strip('\n').split(',')])))
|
|
|
+ fr.close()
|
|
|
+ s.dump()
|
|
|
+
|
|
|
+ fr = open('/home/larry-13.04/workspace/finopt/data/subscription-hsio.txt')
|
|
|
+ for l in fr.readlines():
|
|
|
+ if l[0] <> '#':
|
|
|
+
|
|
|
+ print s.add_subscription(ContractHelper.makeContract(tuple([t for t in l.strip('\n').split(',')])))
|
|
|
+ s.dump()
|
|
|
+ contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151127', 0, '')
|
|
|
+ c = ContractHelper.makeContract(contractTuple)
|
|
|
+ print s.is_subscribed(c)
|
|
|
+ print s.add_subscription(c)
|
|
|
+ print s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c)))
|
|
|
+
|
|
|
+ print 'test itemAt:'
|
|
|
+ contractTuple = ('HSI', 'OPT', 'HKFE', 'HKD', '20151127', 21400, 'C')
|
|
|
+ c = ContractHelper.makeContract(contractTuple)
|
|
|
+ print s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c)))
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+
|
|
|
+ if len(sys.argv) != 2:
|
|
|
+ print("Usage: %s <config file>" % sys.argv[0])
|
|
|
+ exit(-1)
|
|
|
+
|
|
|
+ cfg_path= sys.argv[1:]
|
|
|
+ config = ConfigParser.SafeConfigParser()
|
|
|
+ if len(config.read(cfg_path)) == 0:
|
|
|
+ raise ValueError, "Failed to open config file"
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ logconfig = eval(config.get("tws_gateway", "tws_gateway.logconfig").strip('"').strip("'"))
|
|
|
+ logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
|
|
|
+ logging.basicConfig(**logconfig)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ khost = config.get("epc", "kafka.host").strip('"').strip("'")
|
|
|
+ kport = config.get("epc", "kafka.port")
|
|
|
+ ihost = config.get("market", "ib.gateway").strip('"').strip("'")
|
|
|
+ iport = int(config.get("market", "ib.port"))
|
|
|
+ iappid = int(config.get("market", "ib.appid.portfolio"))
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ #print 'give kafka server some time to register the topics...'
|
|
|
+ #sleep(2)
|
|
|
+
|
|
|
+ app = TWS_gateway(ihost, iport, iappid, khost, kport, config)
|
|
|
+ app.start()
|
|
|
+
|
|
|
+ print 'TWS_gateway started.'
|
|
|
+#
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+# test_subscription()
|
|
|
+
|