|
@@ -2,6 +2,7 @@
|
|
|
# -*- coding: utf-8 -*-
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
import sys
|
|
import sys
|
|
|
|
|
+import copy
|
|
|
from time import sleep, strftime
|
|
from time import sleep, strftime
|
|
|
import time, datetime
|
|
import time, datetime
|
|
|
import ConfigParser
|
|
import ConfigParser
|
|
@@ -51,36 +52,12 @@ class TWS_event_handler(EWrapper):
|
|
|
self.producer = producer
|
|
self.producer = producer
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+
|
|
|
|
|
|
|
|
- def serialize_vars_to_dict(self, message, mapping, source='IB'):
|
|
|
|
|
- def create_kmessage(items):
|
|
|
|
|
- d = {}
|
|
|
|
|
- for k,v in items:
|
|
|
|
|
- #print k, v, type(v)
|
|
|
|
|
- #if type(v) in [Contract, Execution, ExecutionFilter, OrderState, Order, CommissionReport]:
|
|
|
|
|
- if 'ib.ext.' in str(type(v)):
|
|
|
|
|
- d[k] = v.__dict__
|
|
|
|
|
- else:
|
|
|
|
|
- d[k] = v
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- d['ts'] = time.time()
|
|
|
|
|
- d['typeName'] = message
|
|
|
|
|
- d['source'] = source
|
|
|
|
|
- return d
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- try:
|
|
|
|
|
- del(mapping['self'])
|
|
|
|
|
- except (KeyError, ):
|
|
|
|
|
- pass
|
|
|
|
|
- items = list(mapping.items())
|
|
|
|
|
- return create_kmessage(items)
|
|
|
|
|
-
|
|
|
|
|
- def broadcast_event(self, message, mapping, source='IB'):
|
|
|
|
|
|
|
+ def broadcast_event(self, message, mapping):
|
|
|
|
|
|
|
|
try:
|
|
try:
|
|
|
- dict = self.tick_process_message(message, mapping, source)
|
|
|
|
|
|
|
+ dict = self.tick_process_message(message, mapping)
|
|
|
if message == 'gw_subscriptions' or message == 'gw_subscription_changed':
|
|
if message == 'gw_subscriptions' or message == 'gw_subscription_changed':
|
|
|
logging.info('TWS_event_handler: broadcast event: %s [%s]' % (dict['typeName'], dict))
|
|
logging.info('TWS_event_handler: broadcast event: %s [%s]' % (dict['typeName'], dict))
|
|
|
self.producer.send(message, self.producer.message_dumps(dict))
|
|
self.producer.send(message, self.producer.message_dumps(dict))
|
|
@@ -92,7 +69,7 @@ class TWS_event_handler(EWrapper):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- def tick_process_message(self, message_name, items, source):
|
|
|
|
|
|
|
+ def tick_process_message(self, message_name, items):
|
|
|
|
|
|
|
|
t = {}
|
|
t = {}
|
|
|
t = items.copy()
|
|
t = items.copy()
|
|
@@ -116,10 +93,6 @@ class TWS_event_handler(EWrapper):
|
|
|
t[k] = v
|
|
t[k] = v
|
|
|
|
|
|
|
|
|
|
|
|
|
- t['ts'] = time.time()
|
|
|
|
|
- t['typeName'] = message_name
|
|
|
|
|
- t['source'] = source
|
|
|
|
|
-
|
|
|
|
|
|
|
|
|
|
return t
|
|
return t
|
|
|
|
|
|
|
@@ -139,12 +112,10 @@ class TWS_event_handler(EWrapper):
|
|
|
pass
|
|
pass
|
|
|
|
|
|
|
|
def tickGeneric(self, tickerId, tickType, value):
|
|
def tickGeneric(self, tickerId, tickType, value):
|
|
|
- #self.broadcast_event('tickGeneric', vars())
|
|
|
|
|
- self.broadcast_event('tickGeneric', vars()) #vars())
|
|
|
|
|
|
|
+ self.broadcast_event('tickGeneric', vars())
|
|
|
|
|
|
|
|
def tickString(self, tickerId, tickType, value):
|
|
def tickString(self, tickerId, tickType, value):
|
|
|
- #self.broadcast_event('tickString', vars())
|
|
|
|
|
- self.broadcast_event('tickString', vars()) #vars())
|
|
|
|
|
|
|
+ self.broadcast_event('tickString', vars())
|
|
|
|
|
|
|
|
def tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
|
|
def tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
|
|
|
self.broadcast_event('tickEFP', vars())
|
|
self.broadcast_event('tickEFP', vars())
|
|
@@ -274,22 +245,6 @@ class TWS_event_handler(EWrapper):
|
|
|
|
|
|
|
|
class TWS_gateway(threading.Thread):
|
|
class TWS_gateway(threading.Thread):
|
|
|
|
|
|
|
|
- # config
|
|
|
|
|
- config = None
|
|
|
|
|
- # redis connection
|
|
|
|
|
- rs = None
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- # channel clients' requests to IB/TWS
|
|
|
|
|
- gw_message_handler = None
|
|
|
|
|
-
|
|
|
|
|
- # manage conID / contracts mapping
|
|
|
|
|
- contract_subscription_mgr = None
|
|
|
|
|
-
|
|
|
|
|
- connection = None
|
|
|
|
|
-
|
|
|
|
|
- # handler to process incoming IB/TWS messages and echo back to clients
|
|
|
|
|
- tws_event_handler = None
|
|
|
|
|
|
|
|
|
|
# monitor IB connection / heart beat
|
|
# monitor IB connection / heart beat
|
|
|
ibh = None
|
|
ibh = None
|
|
@@ -298,50 +253,59 @@ class TWS_gateway(threading.Thread):
|
|
|
ib_order_transmit = False
|
|
ib_order_transmit = False
|
|
|
|
|
|
|
|
|
|
|
|
|
- def __init__(self, host, port, clientId, kafka_host, kafka_port, config):
|
|
|
|
|
|
|
+ def __init__(self, kwargs):
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ '''
|
|
|
|
|
+ kwargs
|
|
|
|
|
+
|
|
|
|
|
+ 'name'
|
|
|
|
|
+ 'bootstrap_host'
|
|
|
|
|
+ 'bootstrap_port'
|
|
|
|
|
+ 'redis_host'
|
|
|
|
|
+ 'redis_port'
|
|
|
|
|
+ 'redis_db'
|
|
|
|
|
+ 'group_id'
|
|
|
|
|
+ 'session_timeout_ms'
|
|
|
|
|
+ 'topics'
|
|
|
|
|
+ 'clear_offsets'
|
|
|
|
|
+ 'order_transmit
|
|
|
|
|
+
|
|
|
|
|
+ '''
|
|
|
|
|
+
|
|
|
super(TWS_gateway, self).__init__()
|
|
super(TWS_gateway, self).__init__()
|
|
|
- self.config = config
|
|
|
|
|
- self.host = host
|
|
|
|
|
- self.port = port
|
|
|
|
|
- self.clientId = clientId
|
|
|
|
|
- self.ib_order_transmit = config.get("tws_gateway", "tws_gateway.order_transmit").strip('"').strip("'") if \
|
|
|
|
|
- config.get("tws_gateway", "tws_gateway.order_transmit").strip('"').strip("'") <> None\
|
|
|
|
|
- else False
|
|
|
|
|
|
|
+ self.kwargs = copy.copy(kwargs)
|
|
|
|
|
+ self.ib_order_transmit = self.kwargs['order_transmit']
|
|
|
|
|
|
|
|
logging.info('starting up TWS_gateway...')
|
|
logging.info('starting up TWS_gateway...')
|
|
|
logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit == True else 'False'))
|
|
logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit == True else 'False'))
|
|
|
|
|
|
|
|
- logging.info('connecting to Redis server...')
|
|
|
|
|
- self.initialize_redis(config)
|
|
|
|
|
|
|
+
|
|
|
|
|
+ '''
|
|
|
|
|
+ TWS_gateway start up sequence
|
|
|
|
|
+
|
|
|
|
|
+ 1. establish redis connection
|
|
|
|
|
+ 2. initialize prosumer instance
|
|
|
|
|
+ 3. establish TWS gateway connectivity
|
|
|
|
|
+
|
|
|
|
|
+ 4. initialize listeners: ClientRequestHandler and SubscriptionManager
|
|
|
|
|
+ 5. start the prosumer
|
|
|
|
|
|
|
|
|
|
+ '''
|
|
|
|
|
|
|
|
- logging.info('starting up gateway message handler - kafka Prosumer...')
|
|
|
|
|
- client_requests = list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods)
|
|
|
|
|
- self.gw_message_handler = Prosumer(name='tws_gw_prosumer', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
|
|
|
|
|
- 'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
|
|
|
|
|
- 'group_id': 'groupA', 'session_timeout_ms':10000,
|
|
|
|
|
- 'topics': client_requests, 'clear_offsets' : 0})
|
|
|
|
|
|
|
|
|
|
- logging.info('register listeners for client requests')
|
|
|
|
|
|
|
+ logging.info('establishing redis connection...')
|
|
|
|
|
+ self.initialize_redis(config)
|
|
|
|
|
+
|
|
|
|
|
|
|
|
logging.info('starting up TWS_event_handler...')
|
|
logging.info('starting up TWS_event_handler...')
|
|
|
|
|
|
|
|
self.tws_event_handler = TWS_event_handler(self.gw_message_handler)
|
|
self.tws_event_handler = TWS_event_handler(self.gw_message_handler)
|
|
|
logging.info('starting up IB EClientSocket...')
|
|
logging.info('starting up IB EClientSocket...')
|
|
|
- self.connection = EClientSocket(self.tws_event_handler)
|
|
|
|
|
-
|
|
|
|
|
|
|
+ self.tws_connection = EClientSocket(self.tws_event_handler)
|
|
|
|
|
|
|
|
-
|
|
|
|
|
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
|
|
+ logging.info('establishing TWS gateway connectivity...')
|
|
|
if not self.eConnect():
|
|
if not self.eConnect():
|
|
|
logging.error('TWS_gateway: unable to establish connection to IB %s:%d' % (self.host, self.port))
|
|
logging.error('TWS_gateway: unable to establish connection to IB %s:%d' % (self.host, self.port))
|
|
|
sys.exit(-1)
|
|
sys.exit(-1)
|
|
@@ -354,15 +318,30 @@ class TWS_gateway(threading.Thread):
|
|
|
self.ibh.run()
|
|
self.ibh.run()
|
|
|
|
|
|
|
|
|
|
|
|
|
- logging.info('starting up subscription manager...')
|
|
|
|
|
|
|
+ logging.info('starting up gateway message handler - kafka Prosumer...')
|
|
|
|
|
+ client_requests = list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods)
|
|
|
|
|
+ self.gw_message_handler = Prosumer(name='tws_gw_prosumer', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
|
|
|
|
|
+ 'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
|
|
|
|
|
+ 'group_id': 'groupA', 'session_timeout_ms':10000,
|
|
|
|
|
+ 'topics': client_requests, 'clear_offsets' : 0})
|
|
|
|
|
+
|
|
|
|
|
+ logging.info('instantiating listeners...cli_req_handler')
|
|
|
|
|
+ self.cli_req_handler = ClientRequestHandler('client_request_handler', self.gw_message_handler)
|
|
|
|
|
+ logging.info('instantiating listeners subscription manager...')
|
|
|
self.initialize_subscription_mgr()
|
|
self.initialize_subscription_mgr()
|
|
|
|
|
+ logging.info('registering messages to listen...')
|
|
|
|
|
+ self.gw_message_handler.add_listeners([self.cli_req_handler])
|
|
|
|
|
+ self.gw_message_handler.add_listener_topics(self.contract_subscription_mgr, 'reqMktData')
|
|
|
|
|
|
|
|
|
|
+ logging.info('Completed initialization sequence.')
|
|
|
|
|
|
|
|
|
|
|
|
|
def initialize_subscription_mgr(self):
|
|
def initialize_subscription_mgr(self):
|
|
|
|
|
|
|
|
- self.contract_subscription_mgr = SubscriptionManager(self)
|
|
|
|
|
|
|
+ self.contract_subscription_mgr = SubscriptionManager(self, self.gw_message_handler)
|
|
|
self.contract_subscription_mgr.register_persistence_callback(self.persist_subscriptions)
|
|
self.contract_subscription_mgr.register_persistence_callback(self.persist_subscriptions)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
key = self.config.get("tws_gateway", "subscription_manager.subscriptions.redis_key").strip('"').strip("'")
|
|
key = self.config.get("tws_gateway", "subscription_manager.subscriptions.redis_key").strip('"').strip("'")
|
|
|
if self.rs.get(key):
|
|
if self.rs.get(key):
|
|
|
#contracts = map(lambda x: ContractHelper.kvstring2contract(x), json.loads(self.rs.get(key)))
|
|
#contracts = map(lambda x: ContractHelper.kvstring2contract(x), json.loads(self.rs.get(key)))
|
|
@@ -393,22 +372,26 @@ class TWS_gateway(threading.Thread):
|
|
|
self.rs.set(key, cs)
|
|
self.rs.set(key, cs)
|
|
|
|
|
|
|
|
|
|
|
|
|
- def initialize_redis(self, config):
|
|
|
|
|
- r_host = config.get("redis", "redis.server").strip('"').strip("'")
|
|
|
|
|
- r_port = config.get("redis", "redis.port")
|
|
|
|
|
- r_db = config.get("redis", "redis.db")
|
|
|
|
|
|
|
+ def initialize_redis(self):
|
|
|
|
|
|
|
|
- self.rs = redis.Redis(r_host, r_port, r_db)
|
|
|
|
|
|
|
+ self.rs = redis.Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
|
|
|
try:
|
|
try:
|
|
|
self.rs.client_list()
|
|
self.rs.client_list()
|
|
|
except redis.ConnectionError:
|
|
except redis.ConnectionError:
|
|
|
- logging.error('TWS_gateway: unable to connect to redis server using these settings: %s port:%d db:%d' % (r_host, r_port, r_db))
|
|
|
|
|
|
|
+ logging.error('TWS_gateway: unable to connect to redis server using these settings: %s port:%d db:%d' %
|
|
|
|
|
+ (self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db']))
|
|
|
logging.error('aborting...')
|
|
logging.error('aborting...')
|
|
|
sys.exit(-1)
|
|
sys.exit(-1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+ def eConnect(self):
|
|
|
|
|
+ logging.info('ClientRequestHandler - eConnect. Connecting to %s:%s App Id: %s' % (self.host, self.port, self.clientId))
|
|
|
|
|
+ self.tws_connection.eConnect(self.host, self.port, self.clientId)
|
|
|
|
|
+ return self.tws_connection.isConnected()
|
|
|
|
|
|
|
|
-
|
|
|
|
|
|
|
+ def eDisconnect(self, value=None):
|
|
|
|
|
+ sleep(2)
|
|
|
|
|
+ self.tws_connection.eDisconnect()
|
|
|
|
|
|
|
|
def run(self):
|
|
def run(self):
|
|
|
|
|
|
|
@@ -438,7 +421,7 @@ class TWS_gateway(threading.Thread):
|
|
|
|
|
|
|
|
self.eDisconnect()
|
|
self.eDisconnect()
|
|
|
self.eConnect()
|
|
self.eConnect()
|
|
|
- while not self.connection.isConnected():
|
|
|
|
|
|
|
+ while not self.tws_connection.isConnected():
|
|
|
logging.error('TWS_gateway: attempt to reconnect...')
|
|
logging.error('TWS_gateway: attempt to reconnect...')
|
|
|
self.eConnect()
|
|
self.eConnect()
|
|
|
sleep(2)
|
|
sleep(2)
|
|
@@ -457,12 +440,9 @@ class ClientRequestHandler(BaseMessageListener):
|
|
|
|
|
|
|
|
def __init__(self, name, tws_gateway):
|
|
def __init__(self, name, tws_gateway):
|
|
|
BaseMessageListener.__init__(self, name, tws_gateway)
|
|
BaseMessageListener.__init__(self, name, tws_gateway)
|
|
|
- self.tws_connect = tws_gateway.connection
|
|
|
|
|
|
|
+ self.tws_connect = tws_gateway.tws_connection
|
|
|
|
|
|
|
|
- def eConnect(self):
|
|
|
|
|
- logging.info('ClientRequestHandler - eConnect. Connecting to %s:%s App Id: %s' % (self.host, self.port, self.clientId))
|
|
|
|
|
- self.tws_connect.eConnect(self.host, self.port, self.clientId)
|
|
|
|
|
- return self.tws_connect.isConnected()
|
|
|
|
|
|
|
+
|
|
|
|
|
|
|
|
|
|
|
|
|
def reqAccountUpdates(self, value=None):
|
|
def reqAccountUpdates(self, value=None):
|
|
@@ -567,13 +547,12 @@ class ClientRequestHandler(BaseMessageListener):
|
|
|
self.tws_connect.placeOrder(vals[0], ContractHelper.kvstring2object(vals[1], Contract), OrderHelper.kvstring2object(vals[2], Order))
|
|
self.tws_connect.placeOrder(vals[0], ContractHelper.kvstring2object(vals[1], Contract), OrderHelper.kvstring2object(vals[2], Order))
|
|
|
# self.connection.placeOrder(orderId, contract, newOptOrder)
|
|
# self.connection.placeOrder(orderId, contract, newOptOrder)
|
|
|
|
|
|
|
|
- def eDisconnect(self, value=None):
|
|
|
|
|
- sleep(2)
|
|
|
|
|
- self.tws_connect.eDisconnect()
|
|
|
|
|
|
|
+
|
|
|
|
|
|
|
|
|
|
|
|
|
- ####################################################################3
|
|
|
|
|
- # Gateway commands
|
|
|
|
|
|
|
+ """
|
|
|
|
|
+ Client requests to TWS_gateway
|
|
|
|
|
+ """
|
|
|
def gw_req_subscriptions(self, value=None):
|
|
def gw_req_subscriptions(self, value=None):
|
|
|
|
|
|
|
|
#subm = map(lambda i: ContractHelper.contract2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
|
|
#subm = map(lambda i: ContractHelper.contract2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
|
|
@@ -582,27 +561,13 @@ class ClientRequestHandler(BaseMessageListener):
|
|
|
|
|
|
|
|
print subm
|
|
print subm
|
|
|
if subm:
|
|
if subm:
|
|
|
- self.tws_event_handler.broadcast_event('gw_subscriptions', {'subscriptions': subm}, source='GW')
|
|
|
|
|
|
|
+ self.producer.send_message('gw_subscriptions', self.produce.message_dumps({'subscriptions': subm}))
|
|
|
|
|
|
|
|
|
|
|
|
|
-
|
|
|
|
|
- #####################################################################
|
|
|
|
|
- #
|
|
|
|
|
- # broadcast gateway notifications
|
|
|
|
|
- def gw_notify_subscription_changed(self, value):
|
|
|
|
|
- #
|
|
|
|
|
- # this function is triggered by SubscriptionManager
|
|
|
|
|
- # value param:
|
|
|
|
|
- #
|
|
|
|
|
- # {id: contractkv_str}
|
|
|
|
|
- #
|
|
|
|
|
- logging.info("TWS_gateway:gw_notify_subscription_changed: %s" % value)
|
|
|
|
|
- self.tws_event_handler.broadcast_event('gw_subscription_changed', value, source='GW')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SubscriptionManager(BaseMessageListener):
|
|
class SubscriptionManager(BaseMessageListener):
|
|
|
|
|
|
|
|
- parent = None
|
|
|
|
|
# array list of contracts
|
|
# array list of contracts
|
|
|
handle = []
|
|
handle = []
|
|
|
# contract key map to contract ID (index of the handle array)
|
|
# contract key map to contract ID (index of the handle array)
|
|
@@ -612,8 +577,10 @@ class SubscriptionManager(BaseMessageListener):
|
|
|
|
|
|
|
|
def __init__(self, name, tws_gateway):
|
|
def __init__(self, name, tws_gateway):
|
|
|
BaseMessageListener.__init__(self, name, tws_gateway)
|
|
BaseMessageListener.__init__(self, name, tws_gateway)
|
|
|
- self.tws_connect = tws_gateway.connection
|
|
|
|
|
|
|
+ self.tws_connect = tws_gateway.tws_connection
|
|
|
|
|
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
def load_subscription(self, contracts):
|
|
def load_subscription(self, contracts):
|
|
|
for c in contracts:
|
|
for c in contracts:
|
|
|
self.reqMktData(c)
|
|
self.reqMktData(c)
|
|
@@ -687,8 +654,8 @@ class SubscriptionManager(BaseMessageListener):
|
|
|
#
|
|
#
|
|
|
# instruct gateway to broadcast new id has been assigned to a new contract
|
|
# instruct gateway to broadcast new id has been assigned to a new contract
|
|
|
#
|
|
#
|
|
|
-
|
|
|
|
|
- >>>self.parent.gw_notify_subscription_changed({id: ContractHelper.object2kvstring(contract)})
|
|
|
|
|
|
|
+ self.producer.send_message('gw_notify_subscription_changed', self.producer.message_dumps({id: ContractHelper.object2kvstring(contract)}))
|
|
|
|
|
+ #>>>self.parent.gw_notify_subscription_changed({id: ContractHelper.object2kvstring(contract)})
|
|
|
logging.info('SubscriptionManager reqMktData: gw_notify_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
|
|
logging.info('SubscriptionManager reqMktData: gw_notify_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
|
|
|
|
|
|
|
|
|
|
|
|
@@ -824,4 +791,4 @@ if __name__ == '__main__':
|
|
|
|
|
|
|
|
|
|
|
|
|
# test_subscription()
|
|
# test_subscription()
|
|
|
-
|
|
|
|
|
|
|
+
|