Przeglądaj źródła

finish new kf_tws_gateway.py, new package ibgw and ibc

bobhk 9 lat temu
rodzic
commit
9557dfdf2d

+ 0 - 0
src/comms/ibc/__init__.py


+ 424 - 0
src/comms/ibc/tws_client_lib.py

@@ -0,0 +1,424 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import sys
+import copy
+from time import sleep, strftime
+import ConfigParser
+import logging
+import json
+
+from ib.ext.Contract import Contract
+
+from misc2.helpers import ContractHelper, ExecutionFilterHelper, OrderHelper
+from comms.ibgw.base_messaging import Prosumer, BaseMessageListener
+from comms.tws_protocol_helper import TWS_Protocol
+import redis
+         
class TWS_user():
    """Console client for the TWS (Interactive Brokers) gateway.

    Start-up sequence (driven from __init__): merge caller settings over
    TWS_CLI_DEFAULT_CONFIG, verify a redis connection, start a Kafka
    Prosumer as the gateway message handler, then block in main_loop()
    until the user interrupts.
    """

    # monitor IB connection / heart beat
#     ibh = None
#     tlock = None
#     ib_conn_status = None

    # Default settings; any matching key in the kwargs passed to __init__
    # overrides the value below, and unknown keys are merged in unchanged.
    TWS_CLI_DEFAULT_CONFIG = {
      'name': 'tws_gateway_client',
      'bootstrap_host': 'localhost',
      'bootstrap_port': 9092,
      'redis_host': 'localhost',
      'redis_port': 6379,
      'redis_db': 0,
      'tws_host': 'localhost',
      'tws_api_port': 8496,
      'tws_app_id': 38868,
      'group_id': 'TWS_CLI',
      'session_timeout_ms': 10000,
      'clear_offsets':  False,

      # subscribe to every TWS topic event and gateway event by default
      'topics': list(TWS_Protocol.topicEvents) + list(TWS_Protocol.gatewayEvents)
      }

    def __init__(self, kwargs):
        # Overlay caller-supplied values on a copy of the defaults, then
        # merge any remaining extra keys (e.g. 'logconfig') untouched.
        self.kwargs = copy.copy(TWS_user.TWS_CLI_DEFAULT_CONFIG)
        for key in self.kwargs:
            if key in kwargs:
                self.kwargs[key] = kwargs.pop(key)        
        self.kwargs.update(kwargs)        

        '''
            TWS_user start up sequence
            
            1. establish redis connection
            2. initialize prosumer instance - gateway message handler
            
            4. initialize listeners: 
            5. start the prosumer 
        
        '''

        logging.info('starting up TWS_user...')

        logging.info('establishing redis connection...')
        self.initialize_redis()

        logging.info('starting up gateway message handler - kafka Prosumer...')        
        self.gw_message_handler = Prosumer(name='tws_cli_prosumer', kwargs=self.kwargs)

        logging.info('establishing TWS gateway connectivity...')
        # NOTE(review): connect_tws/disconnect_tws are not defined in this
        # class (which has no base class) -- confirm where they come from.
        if not self.connect_tws():
            logging.error('TWS_user: unable to establish connection to IB %s:%d' % 
                          (self.kwargs['tws_host'], self.kwargs['tws_api_port']))
            self.disconnect_tws()
            sys.exit(-1)
        else:
            # start heart beat monitor
            pass
#             logging.info('starting up IB heart beat monitor...')
#             self.tlock = Lock()
#             self.ibh = IbHeartBeat(config)
#             self.ibh.register_listener([self.on_ib_conn_broken])
#             self.ibh.run()  

        logging.info('start TWS_event_handler. Entering processing loop...')
        self.gw_message_handler.start_prosumer()

        logging.info('instantiating gw_command_proxy')        
        self.gw_command_proxy = GatewayCommandProxy('gw_command_proxy', self.gw_message_handler)
        logging.info('instantiating listeners subscription manager...')

        logging.info('**** Completed initialization sequence. ****')
        self.main_loop()

    def initialize_redis(self):
        """Create the redis client and verify connectivity; exit on failure."""
        self.rs = redis.Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
        try:
            # client_list() forces a round trip, proving the server is reachable
            self.rs.client_list()
        except redis.ConnectionError:
            logging.error('TWS_user: unable to connect to redis server using these settings: %s port:%d db:%d' % 
                          (self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db']))
            logging.error('aborting...')
            sys.exit(-1)

    def main_loop(self):
        """Idle until KeyboardInterrupt/SystemExit, then stop the prosumer and exit."""
        try:
            logging.info('TWS_user:main_loop ***** accepting console input...')
            while True: 
                
                sleep(.45)
                
        except (KeyboardInterrupt, SystemExit):
                logging.error('TWS_user: caught user interrupt. Shutting down...')
                self.gw_message_handler.set_stop()
                self.gw_message_handler.join()
                logging.info('TWS_user: Service shut down complete...')
                sys.exit(0)        
+
+
+
class GatewayCommandProxy():
    """Forwards TWS API commands to the gateway by publishing Kafka messages.

    Each supported call publishes on the topic named after the TWS method;
    unsupported calls only log 'NOT IMPLEMENTED'.
    """

    def __init__(self, name, producer):
        """
        name     -- identifier used in log messages (read by gw_req_subscriptions)
        producer -- message handler exposing send_message/message_dumps/message_loads
        """
        # bug fix: 'name' was previously not accepted although self.name is
        # read below and the only caller (TWS_user) already passes
        # ('gw_command_proxy', handler), which raised TypeError.
        self.name = name
        self.producer = producer

    def reqOpenOrders(self):
        self.producer.send_message('reqOpenOrders', '')

    def reqIds(self):
        self.producer.send_message('reqIds', '')

    def reqNewsBulletins(self):
        logging.error('reqNewsBulletins: NOT IMPLEMENTED')

    def cancelNewsBulletins(self):
        logging.error('cancelNewsBulletins: NOT IMPLEMENTED')

    def setServerLogLevel(self):
        logging.error('setServerLogLevel: NOT IMPLEMENTED')

    def reqAutoOpenOrders(self):
        logging.error('reqAutoOpenOrders: NOT IMPLEMENTED')

    def reqAllOpenOrders(self):
        logging.error('reqAllOpenOrders: NOT IMPLEMENTED')

    def reqManagedAccts(self):
        logging.error('reqManagedAccts: NOT IMPLEMENTED')

    def requestFA(self):
        logging.error('requestFA: NOT IMPLEMENTED')

    def reqPositions(self):
        self.producer.send_message('reqPositions', '')

    def reqHistoricalData(self):
        logging.error('reqHistoricalData: NOT IMPLEMENTED')

    def reqAccountUpdates(self):
        self.producer.send_message('reqAccountUpdates', '1')

    def reqExecutions(self, exec_filter=None):
        # Py2-only '<>' replaced with the portable 'is not None' test
        self.producer.send_message('reqExecutions',
                                   ExecutionFilterHelper.object2kvstring(exec_filter) if exec_filter is not None else '')

    def reqMktData(self, contract):
        # bug fix: this command was shadowed by a later duplicate definition
        # (a listener-style handler emitting a hard-coded tickPrice, clearly
        # test residue); the duplicate has been removed so the real command
        # is callable again.
        self.producer.send_message('reqMktData', ContractHelper.object2kvstring(contract))

    def reqAccountSummary(self, reqId, group, tags):
        self.producer.send_message('reqAccountSummary', self.producer.message_dumps([reqId, group, tags]))

    def placeOrder(self, id, contract, order):
        # NOTE(review): uses contract2kvstring here but object2kvstring in
        # reqMktData -- confirm which serializer is intended.
        self.producer.send_message('placeOrder', 
                                   self.producer.message_dumps([id, ContractHelper.contract2kvstring(contract), OrderHelper.object2kvstring(order)]))

    def gw_req_subscriptions(self, event, items):
        # NOTE(review): listener-style callback on a command proxy; looks
        # copied from SubscriptionListener -- confirm it belongs here.
        logging.info("[%s] received gw_req_subscriptions content:[%s]" % (self.name, items))
        # decode the payload (validation side effect only; result unused)
        self.producer.message_loads(items['value'])
        self.producer.send_message('gw_req_subscriptions', self.producer.message_dumps(None))
+        
+
+    
class GatewayMessageListener(BaseMessageListener):
    """Listener for gateway events dispatched by the Kafka consumer.

    Handlers taking (event, items) process dispatched messages; the remaining
    methods are generated TWS EWrapper stubs kept as placeholders.
    """

    def __init__(self, name, producer):
        """
        name     -- listener identity used in log output
        producer -- message handler used to publish responses
        """
        BaseMessageListener.__init__(self, name)
        self.producer = producer

    def tickPrice(self, event, items):
        # bug fix: a generated stub tickPrice(tickerId, field, price,
        # canAutoExecute) previously preceded this definition and was
        # silently shadowed by it; the dead stub has been removed.
        logging.info("[%s] received %s content:[%s]" % (self.name, event, items))

    def tickSize(self, tickerId, field, size):
        """ generated source for method tickSize """

    def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
        """ generated source for method tickOptionComputation """

    def tickGeneric(self, tickerId, tickType, value):
        """ generated source for method tickGeneric """

    def tickString(self, tickerId, tickType, value):
        """ generated source for method tickString """

    def tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
        """ generated source for method tickEFP """

    def orderStatus(self, orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeld):
        """ generated source for method orderStatus """

    def openOrder(self, orderId, contract, order, orderState):
        """ generated source for method openOrder """

    def openOrderEnd(self):
        """ generated source for method openOrderEnd """

    def updateAccountValue(self, key, value, currency, accountName):
        """ generated source for method updateAccountValue """

    def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
        """ generated source for method updatePortfolio """

    def updateAccountTime(self, timeStamp):
        """ generated source for method updateAccountTime """

    def accountDownloadEnd(self, accountName):
        """ generated source for method accountDownloadEnd """

    def nextValidId(self, orderId):
        """ generated source for method nextValidId """

    def contractDetails(self, reqId, contractDetails):
        """ generated source for method contractDetails """

    def bondContractDetails(self, reqId, contractDetails):
        """ generated source for method bondContractDetails """

    def contractDetailsEnd(self, reqId):
        """ generated source for method contractDetailsEnd """

    def execDetails(self, reqId, contract, execution):
        """ generated source for method execDetails """

    def execDetailsEnd(self, reqId):
        """ generated source for method execDetailsEnd """

    def updateMktDepth(self, tickerId, position, operation, side, price, size):
        """ generated source for method updateMktDepth """

    def updateMktDepthL2(self, tickerId, position, marketMaker, operation, side, price, size):
        """ generated source for method updateMktDepthL2 """

    def updateNewsBulletin(self, msgId, msgType, message, origExchange):
        """ generated source for method updateNewsBulletin """

    def managedAccounts(self, accountsList):
        """ generated source for method managedAccounts """

    def receiveFA(self, faDataType, xml):
        """ generated source for method receiveFA """

    def historicalData(self, reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
        """ generated source for method historicalData """

    def scannerParameters(self, xml):
        """ generated source for method scannerParameters """

    def scannerData(self, reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
        """ generated source for method scannerData """

    def scannerDataEnd(self, reqId):
        """ generated source for method scannerDataEnd """

    def realtimeBar(self, reqId, time, open, high, low, close, volume, wap, count):
        """ generated source for method realtimeBar """

    def currentTime(self, time):
        """ generated source for method currentTime """

    def fundamentalData(self, reqId, data):
        """ generated source for method fundamentalData """

    def deltaNeutralValidation(self, reqId, underComp):
        """ generated source for method deltaNeutralValidation """

    def tickSnapshotEnd(self, reqId):
        """ generated source for method tickSnapshotEnd """

    def marketDataType(self, reqId, marketDataType):
        """ generated source for method marketDataType """

    def commissionReport(self, commissionReport):
        """ generated source for method commissionReport """

    def position(self, account, contract, pos, avgCost):
        """ generated source for method position """

    def positionEnd(self):
        """ generated source for method positionEnd """

    def accountSummary(self, reqId, account, tag, value, currency):
        """ generated source for method accountSummary """

    def accountSummaryEnd(self, reqId):
        """ generated source for method accountSummaryEnd """

    def gw_subscription_changed(self, event, items):
        logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, items))
        #print 'SubscriptionListener:gw_subscription_changed %s' % items

    def on_kb_reached_last_offset(self, event, items):
        logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, items))
        # parenthesized form works under both Python 2 and 3
        print("on_kb_reached_last_offset [%s] %s" % (self.name, items))
+
+    
class ConfigMap():
    """Loads key/value settings from an INI-style config file."""

    def kwargs_from_file(self, path):
        """Parse *path* and return a flat dict of evaluated option values.

        Raises ValueError when the file cannot be read. Option values that
        fail to evaluate are silently skipped (deliberate best effort).
        """
        cfg = ConfigParser.ConfigParser()
        # cfg.read returns the list of files successfully parsed
        if len(cfg.read(path)) == 0:
            # bug fix: Py2-only 'raise ValueError, msg' syntax replaced by the
            # call form, valid on both Python 2 and 3
            raise ValueError("Failed to open config file [%s]" % path)

        kwargs = {}
        for section in cfg.sections():
            for option in cfg.options(section):
                raw = cfg.get(section, option)
                try:
                    # SECURITY: eval() executes arbitrary code from the config
                    # file -- only load trusted files. (ast.literal_eval would
                    # reject non-literal values such as logging constants, so
                    # the existing behaviour is kept deliberately.)
                    kwargs[option] = eval(raw)
                except Exception:
                    # narrowed from a bare 'except:' (which also swallowed
                    # KeyboardInterrupt/SystemExit); unparsable values dropped
                    continue

        #logging.debug('ConfigMap: %s' % kwargs)
        return kwargs
+        
+    
if __name__ == '__main__':

    if len(sys.argv) != 2:
        print("Usage: %s <config file>" % sys.argv[0])
        exit(-1)    

    # bug fix: sys.argv[1:] produced a *list*; pass the single config-file
    # path (cfg.read happened to accept a list, but error messages and the
    # intent call for one path string)
    cfg_path = sys.argv[1]
    kwargs = ConfigMap().kwargs_from_file(cfg_path)

    # apply a uniform log line format on top of the file-supplied log config
    logconfig = kwargs['logconfig']
    logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
    logging.basicConfig(**logconfig)        

    app = TWS_user(kwargs)

+ 0 - 0
src/comms/ibgw.py


+ 0 - 0
src/comms/ibgw/__init__.py


BIN
src/comms/ibgw/__init__.pyc


+ 593 - 0
src/comms/ibgw/base_messaging.py

@@ -0,0 +1,593 @@
+ #!/usr/bin/env python
+import threading, logging, time
+import sys
+import copy
+import datetime
+import uuid
+from Queue import Queue
+from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from misc2.observer import NotImplementedException
+from kafka import KafkaConsumer, KafkaProducer
+from kafka.structs import TopicPartition
+from kafka.errors import NoBrokersAvailable
+# 
+# packages required for ConsumerNextIteratorPersist
+import json
+from redis import Redis
+
+from misc2.observer import Subscriber, Publisher
+from numpy.distutils.fcompiler import none
+
+
+
class Producer(threading.Thread):
    """Demo producer: endlessly publishes timestamped test messages,
    alternating between two hard-coded topics at ~2 msgs/sec."""
    daemon = True

    def run(self):
        try:
            kafka_producer = KafkaProducer(bootstrap_servers='localhost:9092')
            topic_pair = ['my-topic', 'my-topic2']
            counter = 0
            while True:
                # alternate topics on every iteration
                topic = topic_pair[counter % 2]
                payload = "%d %s test %s" % (counter, topic, time.strftime("%b %d %Y %H:%M:%S"))
                logging.info(payload)
                kafka_producer.send(topic, payload)

                time.sleep(.45)
                counter += 1
        except NoBrokersAvailable:
            logging.error("NoBrokersAvailable: Has kafka started?")
+
+
class BaseProducer(threading.Thread, Subscriber):
    """Threaded Kafka producer draining an internal queue of (topic, text) events.

    kwargs:
        bootstrap_host / bootstrap_port: Kafka broker address
        session_timeout_ms: consumer-group session timeout (passed through)
    """

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        # unique-per-name client id (uuid5 is deterministic for a given name)
        self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name)) 
        logging.info('BaseProducer __init__: name=%s' % self.name)
        self.args = args
        self.kwargs = kwargs

        self.event_q = Queue()
        return

    def send_message(self, topic, plain_text):
        """Queue a message; the run() loop publishes it asynchronously."""
        # bug fix: the original also called event_q.task_done() here, which is
        # the *consumer* side of the Queue protocol; calling it immediately
        # after put() corrupts the unfinished-task count (a later
        # Queue.join() would return early, or task_done() would raise
        # ValueError once the count underflows).
        self.event_q.put((topic, plain_text))

    def set_stop(self):
        # polled by run(); makes the publish loop exit
        self.done = True

    def run(self):
        """Publish queued messages to Kafka until set_stop() is called."""
        try:
            producer = KafkaProducer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']))

            self.done = False
            while not self.done:
                if not self.event_q.empty():
                    topic, plain_text = self.event_q.get()
                    logging.info("BaseProducer topic:[%s] msg:[%s]" % (topic, plain_text))
                    producer.send(topic, plain_text)

                # to prevent excessive CPU use
                time.sleep(0.1)

            logging.info('******** BaseProducer exit done.')

        except NoBrokersAvailable:
            logging.error("NoBrokersAvailable: Has kafka started?")
+    
+
class BaseConsumer(threading.Thread, Publisher):
    """Threaded Kafka consumer that persists per-topic/partition offsets in
    redis and republishes every received message to in-process subscribers
    (via Publisher.dispatch) under the message's topic name."""

    #KB_EVENT = "on_kb_event"
    KB_REACHED_LAST_OFFSET = "on_kb_reached_last_offset"
    
    #my_topics =  {'my-topic':{}, 'my-topic2':{}}    

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        """
        kwargs:
            bootstrap_host
            bootstrap_port
            redis_host
            redis_port
            redis_db
            group_id
            consumer_id: name 
            topics: a list of topic strings
            session_timeout_ms: 
            consumer_timeout_ms
        """
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)

        # client id: deterministic uuid5 derived from the supplied name
        self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name)) 
        logging.info('BaseConsumer __init__: name=%s' % self.name)
        self.args = args
        self.kwargs = kwargs
        self.rs = Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
        # topic -> {partition(str): offset} map, hydrated from redis in run()
        self.my_topics = {}
        for t in self.kwargs['topics']:
            self.my_topics[t]= {} 
            
        #self.events = {event: dict() for event in [BaseConsumer.KB_EVENT, BaseConsumer.KB_REACHED_LAST_OFFSET]}
        # Publisher event registry: one event per subscribed topic, plus the
        # reached-last-offset notification
        self.events = {event: dict() for event in [BaseConsumer.KB_REACHED_LAST_OFFSET] + self.kwargs['topics']}
        return
    
    
    # no use: doesn't work
    def seek_to_last_read_offset(self, consumer):
        """Seek each topic to its redis-stored (partition, offset). Kept for
        reference only -- see the comment above: this approach doesn't work."""
        for t in self.my_topics.keys():
            po = json.loads(self.rs.get(t))
            consumer.seek(TopicPartition(topic=t, partition=po['partition']), po['offset'])
    
    """
     each consumer has its own set of topics offsets stored in redis
     for consumer A and consumer B (with different group_ids) subscribing to the same my-topic, each 
     each of them will need to keep track of its own offsets
     to make the redis key unique by consumer, the key is created by topic + '@' + consumer name
     example: my-topic@consumerA
     
     offsets = { topic-consumer_name:
                     {
                         partition0: offset0,
                         partition1: offset1,... 
                     }
               }
    """
    
    def consumer_topic(self, tp):
        """Return the per-consumer redis key for topic *tp* (topic@consumer-name)."""
        return tp + '@' + self.name 
    
    
    def clear_offsets(self):
        """
            clear the offsets in redis by removing the value from redis
            and emptying the internal my_topics dict
            
            clear offsets is ncessary when the offset in redis was saved 
            at a time since kafka manager was shut down
            when kafka restarts, previously buffered 
            messages are no longer available and instead it will restart its offset at 0.
            Reading an old offset by BaseConsumer will cause it to think that it
            is still receving old buffered messages from Kafa but in fact all the messages 
            since the last shut down of kafka are all gone
        """
        for t in self.kwargs['topics']:
            self.my_topics[t]= {}
            logging.info("BaseConsumer:clear_offsets Deleting %s from redis..." % self.consumer_topic(t))
            self.rs.delete(self.consumer_topic(t))
            
             
        #raise NotImplementedException
    
    
    def persist_offsets(self, topic, partition, offset):
        """Record *offset* for (topic, partition) locally and in redis."""
        #self.rs.set(self.consumer_topic(topic), json.dumps({'partition': partition, 'offset':offset}))
        self.my_topics[topic][str(partition)] = offset
        self.rs.set(self.consumer_topic(topic), json.dumps(self.my_topics[topic]))
    
    def enrich_message(self, message):
        """Wrap the raw payload with its kafka metadata for dispatching."""
        return {'value': message.value, 'partition':message.partition, 'offset': message.offset}
    
    def set_stop(self):
        # polled by run(); makes the consume loop exit
        self.done = True
    
    def run(self):
        """Consume messages forever (until set_stop), tracking offsets in redis
        and dispatching each new message to subscribers by topic name."""
        print '%s:%s started' % (self.kwargs['group_id'], self.name)
        
        if self.kwargs['clear_offsets'] == True:
            self.clear_offsets()
        
        consumer = KafkaConsumer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']),
                                 auto_offset_reset='earliest',
                                 #
                                 # consumers having the same group id works as   
                                 # a group to seize messages published by the publisher
                                 # (like a queue, each message is consumed exactly once
                                 #  by a consumer)
                                 #
                                 #
                                 # in a 'pub-sub' environment, set each consumer having 
                                 # a unique group_id
                                 #
                                 group_id = self.kwargs['group_id'],
                                 client_id = self.name,
                                 #
                                 # session_timeout_ms is the time it takes for another consumer 
                                 # that has the same group_id to pick up the work
                                 # to consume messages when this consumer is dead 
                                 # 
                                 session_timeout_ms = self.kwargs['session_timeout_ms'],
                                 #
                                 # 
                                 #
                                 partition_assignment_strategy=[RoundRobinPartitionAssignor],
                                 #
                                 # without a consumer timeout, next() would block
                                 # forever and set_stop() could never take effect
                                 consumer_timeout_ms=self.kwargs['consumer_timeout_ms']
                                )


        
        for topic in self.my_topics.keys():
            # 
            # if a topic offset is stored previously (the consumer was run before), 
            # then load the offset values
            # and save it locally into the my_topics map 
            # else self.my_topics[topic] would have zero elements in it
            if self.rs.keys(self.consumer_topic(topic)):
                self.my_topics[topic] = json.loads(self.rs.get(self.consumer_topic(topic)))

        print self.my_topics
        
            
        consumer.subscribe(self.my_topics.keys())

        
        #consumer.seek_to_end(TopicPartition(topic='my-topic', partition=0))

        self.done = False
        while self.done <> True:
            try:
                message = consumer.next()
                
                #time.sleep(0.25)
                # periodic progress log (every 50th offset)
                if message.offset % 50 == 0:
                    logging.info( "[%s]:highwater:%d offset:%d part:%d <%s>" % (self.name, consumer.highwater(TopicPartition(message.topic, message.partition)),
                                                                             message.offset, message.partition, message.value))
                
    #             for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
    #                 print "t:%s %s" % (t,  ','.join('p:%d, offset:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)) # consumer.position(TopicPartition(topic=t, partition=p)))
                
                # if this is the first time the consumer is run
                # it contains no offsets in the redis map, so it has 
                # 0 elements in the map,
                # then insert a new offset in redis and populate
                # the local my_topics dict
                
                if len(self.my_topics[message.topic]) == 0:
                    self.persist_offsets(message.topic, message.partition, message.offset)
                    self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))
                    #continue
                    
                """
                    the message.value received from kafaproducer is expected to contain 
                    plain text encoded as a json string
                    the content of message.value is not altered. it's content is stored in a dict object 
                    with key = 'value' along with additional kafa metadata
                                    
                     
                    it is the subscriber's job to interpret the content stored in the 'value' key. Typically
                    it means decoding the content by invoking json.loads 
                      
                """
                if self.my_topics[message.topic][str(message.partition)] > message.offset:
                    print '********************** old message...discarding %s %d' % (message.topic, message.offset)
                else:
                    #if self.my_topics[message.topic][str(message.partition)] == message.offset:
                    # if the stored offset in redis equals to the current offset
                    # notify the observers
                    # the "and" condition ensures that on a fresh start of kafka server this event is not triggered as
                    # both saved value in redis and current offset are both 0
                    if self.my_topics[message.topic][str(message.partition)] == message.offset and message.offset <> 0:
                        self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.enrich_message(message))
                        logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
                    else:
                        self.persist_offsets(message.topic, message.partition, message.offset)
                        #self.dispatch(BaseConsumer.KB_EVENT, {'message': message})
                        self.dispatch(message.topic, self.enrich_message(message))
            except StopIteration:
                # consumer_timeout_ms expired with no new message; loop again
                logging.debug('BaseConsumer:run StopIteration Caught. No new message arriving...')
                continue
            
            
        logging.info ('******** BaseConsumer exit done.')
+
+
+
class BaseMessageListener(Subscriber):
    """Base class for topic listeners: routes an event name to the
    same-named method on the subclass, logging (not raising) when no
    handler is implemented."""

    def update(self, event, param=None):
        """Invoke self.<event>(param) if such a handler exists.

        Bug fix: the default was previously 'none' -- an accidental
        auto-import from numpy.distutils.fcompiler -- instead of the None
        singleton.
        """
        try:
            event_fn = getattr(self, event)
        except AttributeError:
            # narrowed try: only the handler *lookup* is guarded, so an
            # AttributeError raised inside a handler is no longer silently
            # reported as 'not implemented'
            err_msg = 'BaseMessageListener:update| function %s not implemented.' % event
            logging.error('BaseMessageListener [%s]:update %s' % (self.name, err_msg))
        else:
            event_fn(param)
        logging.debug("BaseMessageListener [%s]:update|Event type:[%s] content:[%s]" % (self.name, event, json.dumps(param) if param is not None else "<empty param>"))
+
+
class SimpleMessageListener(BaseMessageListener):
    """Minimal listener that simply echoes the last-offset notification."""

    def __init__(self, name):
        BaseMessageListener.__init__(self, name)

    def on_kb_reached_last_offset(self, param):
        # parenthesized print of a single pre-formatted string behaves
        # identically on Python 2 and 3
        notice = "on_kb_reached_last_offset [%s] %s" % (self.name, param)
        print(notice)
+
+
class Prosumer(BaseProducer):
    """Combined Kafka producer + consumer wrapper.

    Merges caller kwargs over PROSUMER_DEFAULT_CONFIG, runs the producer
    side itself (BaseProducer) and owns a BaseConsumer for the inbound side.
    """
    PROSUMER_DEFAULT_CONFIG = {
        'bootstrap_servers': 'localhost',
        'client_id': 'kafka-prosumer' ,
        'group_id': 'kafka-prosumer-default-group',
        'key_deserializer': None,
        'value_deserializer': None,
        'fetch_max_wait_ms': 500,
        'fetch_min_bytes': 1,
        'max_partition_fetch_bytes': 1 * 1024 * 1024,
        'request_timeout_ms': 40 * 1000,
        'retry_backoff_ms': 100,
        'reconnect_backoff_ms': 50,
        'max_in_flight_requests_per_connection': 5,
        'auto_offset_reset': 'latest',
        'enable_auto_commit': True,
        'auto_commit_interval_ms': 5000,
        'default_offset_commit_callback': lambda offsets, response: True,
        'check_crcs': True,
        'metadata_max_age_ms': 5 * 60 * 1000,
        # bug fix: '(RoundRobinPartitionAssignor)' was not a tuple -- the
        # parentheses were redundant; the trailing comma makes it one
        'partition_assignment_strategy': (RoundRobinPartitionAssignor,),
        'heartbeat_interval_ms': 3000,
        'session_timeout_ms': 30000,
        'max_poll_records': sys.maxsize,
        'receive_buffer_bytes': None,
        'send_buffer_bytes': None,
        'consumer_timeout_ms': 1000,
        'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
    }    

    def __init__(self, name, kwargs=None):
        # bug fix: the merge loop below crashed with TypeError when kwargs
        # was omitted (the default None is not iterable)
        if kwargs is None:
            kwargs = {}
        self.kwargs = copy.copy(self.PROSUMER_DEFAULT_CONFIG)
        for key in self.kwargs:
            if key in kwargs:
                self.kwargs[key] = kwargs.pop(key)        
        self.kwargs.update(kwargs)

        logging.info('\nProsumer:init: **** Configurations dump ***')
        logging.info('\n'.join('%s:%s' % (k.ljust(40), self.kwargs[k]) for k in sorted(self.kwargs)))

        BaseProducer.__init__(self, group=None, target=None, name=name,
                 args=(), kwargs=self.kwargs, verbose=None)

        # inbound side: consumer sharing the merged configuration
        self.kconsumer = BaseConsumer(name=name,  kwargs=self.kwargs)

    def add_listener_topics(self, listener, topics):
        """Register *listener* for each topic in *topics*."""
        # explicit loop: map() was used for side effects, which is a silent
        # no-op under Python 3's lazy map
        for e in topics:
            self.kconsumer.register(e, listener, getattr(listener, e))

    def add_listeners(self, listeners):
        """Register every listener for all configured topics."""
        for l in listeners:
            for e in self.kwargs['topics']:
                self.kconsumer.register(e, l, getattr(l, e))

    def set_stop(self):
        """Stop both the producer loop and the consumer thread."""
        BaseProducer.set_stop(self)
        self.kconsumer.set_stop()

    def start_prosumer(self):
        """Start the consumer thread, then the producer thread (self)."""
        self.kconsumer.start()
        self.start()

    def message_loads(self, text_msg):
        """Decode a JSON payload received from the wire."""
        return json.loads(text_msg)

    def message_dumps(self, obj_msg):
        """Encode an object as a JSON payload for the wire."""
        return json.dumps(obj_msg)
+
+    
class SubscriptionListener(BaseMessageListener):
    """Test listener: answers subscription requests with timestamped replies
    and fakes a tickPrice message on market-data requests."""

    def __init__(self, name, producer):
        BaseMessageListener.__init__(self, name)
        self.producer = producer
        self.i = 0  # monotonically increasing reply id

    def gw_subscription_changed(self, event, items):
        logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, items))

    def gw_req_subscriptions(self, event, items):
        logging.info("[%s] received gw_req_subscriptions content:[%s]" % (self.name, items))
        request = self.producer.message_loads(items['value'])
        reply = {'id': self.i,
                 'reqid': request['reqid'],
                 'response': "%s" % (time.strftime("%b %d %Y %H:%M:%S"))}
        self.producer.send_message('gw_subscription_changed', self.producer.message_dumps(reply))
        self.i = self.i + 1

    def reqMktData(self, event, items):
        logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
        fake_tick = {'field':4, 'typeName':'tickPrice', 'price':1.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}
        self.producer.send_message('tickPrice', self.producer.message_dumps(fake_tick))

    def tickPrice(self, event, items):
        logging.info("[%s] received %s content:[%s]" % (self.name, event, items))

    def on_kb_reached_last_offset(self, event, items):
        logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, items))
        print("on_kb_reached_last_offset [%s] %s" % (self.name, items))
+        
+            
def test_prosumer2(mode):
    """Two-terminal kafka ping/pong test.

    Run once with mode 'A' and once with any other value ('B'):
    A subscribes to gw_subscription_changed / tickPrice, fires a single
    reqMktData, then idles; B subscribes to gw_req_subscriptions /
    reqMktData and publishes a canned tickPrice roughly twice a second.
    Both loop until Ctrl-C, then stop and join their prosumer.
    """
    if mode == 'A':

        topicsA = ['gw_subscription_changed', 'tickPrice']

        pA = Prosumer(name='A', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                        'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                        'group_id': 'groupA', 'session_timeout_ms':10000,
                                                 'topics': topicsA, 'clear_offsets' : False})
        sA = SubscriptionListener('earA', pA)

        pA.add_listeners([sA])
        pA.start_prosumer()
        i = 0

        try:
            # single request; the B side answers with tickPrice events
            pA.send_message('reqMktData', pA.message_dumps({'contract':'dummy'}))
            while True: #i < 5:

                #pA.send_message('gw_req_subscriptions', pA.message_dumps({'desc': 'requesting subscription msg counter:%d' % i, 
                #                                                    'reqid': i}))
                i= i + 1
                time.sleep(.45)

        except (KeyboardInterrupt, SystemExit):
                logging.error('caught user interrupt')
                pA.set_stop()
                pA.join()

    else:
        topicsB = ['gw_req_subscriptions', 'reqMktData']

        pB = Prosumer(name='B', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                        'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                        'group_id': 'groupB', 'session_timeout_ms':10000,
                                                 'topics': topicsB, 'clear_offsets' : False})
        sB = SubscriptionListener('earB', pB)
        pB.add_listeners([sB])
        pB.start_prosumer()
        try:

            while True: #i < 5:

                # pump canned market data for the A side to consume
                pB.send_message('tickPrice', 
                        pB.message_dumps({'field':5, 'typeName':'tickPrice', 'price':2.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}))

                time.sleep(.45)

        except (KeyboardInterrupt, SystemExit):
                logging.error('caught user interrupt')
                pB.set_stop()
                pB.join()    
+    
+    
+
+    
+
class TestProducer(BaseProducer):
    # Concrete alias of BaseProducer used by test_base_proconsumer();
    # inherits all behaviour unchanged.
    pass
+
def test_base_proconsumer(mode):
    """Smoke test for the raw BaseProducer / BaseConsumer pair.

    mode 'P': publish a timestamped counter message to two topics in
    round-robin forever; any other mode: consume those topics with a
    SimpleMessageListener and log every event.
    """
    if mode == 'P':
        #Producer().start()
        topics = ['my-topic', 'my-topic2']
        tp = TestProducer(name = 'testproducer', kwargs={
                                             'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                             'topics': topics})
        tp.start()
        i = 0 
        while True:

            #today = datetime.date.today()
            # alternate between the two topics via i%2
            s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
            logging.info(s)
            tp.send_message(topics[i%2], s)

            time.sleep(.45)
            i=i+1

    else:

        bc = BaseConsumer(name='bc', kwargs={'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                             'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                             'group_id':'gid', 'session_timeout_ms':10000, 'topics': ['my-topic', 'my-topic2']})
        #bml = BaseMessageListener('bml')
        sml = SimpleMessageListener('simple')
        #bc.register(BaseConsumer.KB_EVENT, bml)
        #bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, bml)
        bc.register(BaseConsumer.KB_EVENT, sml)
        bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, sml)

        bc.start()
+
+
+
+    
+def main():
+    
+    #
+    # test cases
+    #
+    tp = [ test_base_proconsumer, test_prosumer2]
+    
+    if len(sys.argv) != 3:
+        print("Usage: %s <role(producer or consumer): P|C> <test case #[0..1]>" % sys.argv[0])
+        print "\n".join('case #%d: %s' % (i, tp[i].__name__) for i in range(len(tp)))
+        print "example: python %s P 1" % sys.argv[0]
+        print "example: python %s C 1" % sys.argv[0]
+        exit(-1)    
+
+    mode = sys.argv[1] 
+    #gid = sys.argv[2] if sys.argv[2] <> None else "q-group"  
+
+    
+    tp[int(sys.argv[2])](mode)
+
+    #time.sleep(30)
+#     while 1:
+#         try:
+#             time.sleep(5)
+#             pass
+#         except (KeyboardInterrupt, SystemExit):
+#                 logging.error('caught user interrupt')
+#                 sys.exit(-1)
+ 
+    
+    
+
if __name__ == "__main__":
    # Root logger setup for standalone runs.
    # fixed: %(msecs)03d gives the documented fixed-width millisecond
    # field; the original %(msecs)s printed a raw float.
    logging.basicConfig(
        format='%(asctime)s.%(msecs)03d:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
        )
    main()

+ 144 - 0
src/comms/ibgw/client_request_handler.py

@@ -0,0 +1,144 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import logging
+import json
+import traceback
+from time import strftime 
+from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
+from comms.ibgw.base_messaging import BaseMessageListener
+
+from ib.ext.Contract import Contract
+from ib.ext.ExecutionFilter import ExecutionFilter
+from ib.ext.Execution import Execution
+from ib.ext.OrderState import OrderState
+from ib.ext.Order import Order
+
+
class ClientRequestHandler(BaseMessageListener):
    """Maps client request topics onto IB API calls.

    Handler method names match the message topics they serve; payloads
    arrive as JSON strings (or None for parameterless requests).
    """

    def __init__(self, name, tws_gateway):
        BaseMessageListener.__init__(self, name)
        # Keep the gateway itself: the subscription manager and the kafka
        # prosumer are created *after* this handler, so they must be
        # resolved lazily through the gateway.  (Fixed: the original
        # methods referenced undefined self.* attributes.)
        self.tws_gateway = tws_gateway
        self.tws_connect = tws_gateway.tws_connection

    def reqAccountUpdates(self, value=None):
        logging.info('ClientRequestHandler - reqAccountUpdates value=%s' % value)
        self.tws_connect.reqAccountUpdates(1, '')

    def reqAccountSummary(self, value):
        logging.info('ClientRequestHandler - reqAccountSummary value=%s' % value)
        # payload: JSON [reqId, group, tags]; unicode coerced to str for IbPy
        vals = map(lambda x: x.encode('ascii') if isinstance(x, unicode) else x, json.loads(value))
        self.tws_connect.reqAccountSummary(vals[0], vals[1], vals[2])

    def reqOpenOrders(self, value=None):
        self.tws_connect.reqOpenOrders()

    def reqPositions(self, value=None):
        self.tws_connect.reqPositions()

    def reqExecutions(self, value):
        # empty payload means "no filter"
        try:
            filt = ExecutionFilter() if value == '' else ExecutionFilterHelper.kvstring2object(value, ExecutionFilter)
            self.tws_connect.reqExecutions(0, filt)
        except:
            logging.error(traceback.format_exc())

    def reqIds(self, value=None):
        self.tws_connect.reqIds(1)

    def reqNewsBulletins(self):
        self.tws_connect.reqNewsBulletins(1)

    def cancelNewsBulletins(self):
        self.tws_connect.cancelNewsBulletins()

    def setServerLogLevel(self):
        # NOTE(review): log level hard-coded to 3 -- confirm intent
        self.tws_connect.setServerLogLevel(3)

    def reqAutoOpenOrders(self):
        self.tws_connect.reqAutoOpenOrders(1)

    def reqAllOpenOrders(self):
        self.tws_connect.reqAllOpenOrders()

    def reqManagedAccts(self):
        self.tws_connect.reqManagedAccts()

    def requestFA(self):
        self.tws_connect.requestFA(1)

    def reqMktData(self, sm_contract):
        """Forward a market-data subscription to the subscription manager."""
        logging.info('ClientRequestHandler received reqMktData request: %s' % sm_contract)
        try:
            # fixed: the subscription manager lives on the gateway, not on
            # self; and failures are logged instead of silently swallowed
            self.tws_gateway.contract_subscription_mgr.reqMktData(ContractHelper.kvstring2object(sm_contract, Contract))
        except:
            logging.error(traceback.format_exc())

    def reqHistoricalData(self):
        # hard-coded sample request: one day of 1-minute QQQQ bars
        contract = Contract()
        contract.m_symbol = 'QQQQ'
        contract.m_secType = 'STK'
        contract.m_exchange = 'SMART'
        endtime = strftime('%Y%m%d %H:%M:%S')
        self.tws_connect.reqHistoricalData(
            tickerId=1,
            contract=contract,
            endDateTime=endtime,
            durationStr='1 D',
            barSizeSetting='1 min',
            whatToShow='TRADES',
            useRTH=0,
            formatDate=1)

    def placeOrder(self, value=None):
        """Place an order; *value* is JSON [orderId, contract_kv, order_kv]."""
        logging.info('TWS_gateway - placeOrder value=%s' % value)
        try:
            vals = json.loads(value)
        except ValueError:
            logging.error('TWS_gateway - placeOrder Exception %s' % traceback.format_exc())
            return

        contract = ContractHelper.kvstring2object(vals[1], Contract)
        order = OrderHelper.kvstring2object(vals[2], Order)
        # Force the gateway's straight-through flag onto the order.
        # (Fixed: the original set this on a throwaway copy and then sent
        # a freshly re-parsed order; it also read the flag from self
        # where it does not exist.)
        # NOTE(review): IbPy Order fields are normally m_-prefixed --
        # confirm 'transmit' is the intended key.
        order.__dict__['transmit'] = self.tws_gateway.ib_order_transmit
        self.tws_connect.placeOrder(vals[0], contract, order)

    """
       Client requests to TWS_gateway
    """
    def gw_req_subscriptions(self, value=None):
        """Publish the current subscription table on 'gw_subscriptions'."""
        mgr = self.tws_gateway.contract_subscription_mgr
        subm = [(i, ContractHelper.object2kvstring(mgr.handle[i])) for i in range(len(mgr.handle))]
        logging.debug('gw_req_subscriptions: %s' % subm)
        if subm:
            producer = self.tws_gateway.gw_message_handler
            # fixed typo: the original referenced self.produce / self.producer
            producer.send_message('gw_subscriptions', producer.message_dumps({'subscriptions': subm}))
+            
+            

+ 179 - 0
src/comms/ibgw/subscription_manager.py

@@ -0,0 +1,179 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import logging
+from misc2.helpers import ContractHelper
+from ib.ext.Contract import Contract
+from comms.ibgw.base_messaging import BaseMessageListener
+
+
+
+class SubscriptionManager(BaseMessageListener):
+    
+    
+    persist_f = None
+    
+    def __init__(self, name, tws_gateway):
+        BaseMessageListener.__init__(self, name)
+        self.tws_connect = tws_gateway.tws_connection
+        self.handle = []    
+        # contract key map to contract ID (index of the handle array)
+        self.tickerId = {}
+   
+        
+        
+    def load_subscription(self, contracts):
+        for c in contracts:
+            self.reqMktData(c)
+            
+        self.dump()
+    
+    # returns -1 if not found, else the key id (which could be a zero value)
+    def is_subscribed(self, contract):
+        #print self.conId.keys()
+        ckey = ContractHelper.makeRedisKeyEx(contract) 
+        if ckey not in self.tickerId.keys():
+            return -1
+        else:
+            # note that 0 can be a key 
+            # be careful when checking the return values
+            # check for true false instead of using implicit comparsion
+            return self.tickerId[ckey]
+    
+
+#     def reqMktDataxx(self, contract):
+#         print '---------------'
+#         contractTuple = ('USO', 'STK', 'SMART', 'USD', '', 0.0, '')
+#         stkContract = self.makeStkContract(contractTuple)     
+#         stkContract.m_includeExpired = False       
+#         self.parent.connection.reqMktData(1, stkContract, '', False)     
+# 
+#         contractTuple = ('IBM', 'STK', 'SMART', 'USD', '', 0.0, '')
+#         stkContract = self.makeStkContract(contractTuple)
+#         stkContract.m_includeExpired = False
+#         print stkContract   
+#         print stkContract.__dict__         
+#         self.parent.connection.reqMktData(2, stkContract, '', False)     
+#             
+
+            
+    def reqMktData(self, kvs_contract):
+                  
+        contract = ContractHelper.kvstring2object(kvs_contract, Contract)
+        #logging.info('SubscriptionManager: reqMktData')
+  
+        def add_subscription(contract):
+            self.handle.append(contract)
+            newId = len(self.handle) - 1
+            self.tickerId[ContractHelper.makeRedisKeyEx(contract)] = newId 
+             
+            return newId
+  
+        id = self.is_subscribed(contract)
+        if id == -1: # not found
+            id = add_subscription(contract)
+
+            #
+            # the conId must be set to zero when calling TWS reqMktData
+            # otherwise TWS will fail to subscribe the contract
+            
+            self.tws_connect.reqMktData(id, contract, '', False) 
+            
+            
+                   
+            if self.persist_f:
+                logging.debug('SubscriptionManager reqMktData: trigger callback')
+                self.persist_f(self.handle)
+                
+            logging.info('SubscriptionManager: reqMktData. Requesting market data, id = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
+        
+        else:    
+            self.tws_connect.reqMktData(1000 + id, contract, '', True)
+            logging.info('SubscriptionManager: reqMktData: contract already subscribed. Request snapshot = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
+        #self.dump()
+
+        #
+        # instruct gateway to broadcast new id has been assigned to a new contract
+        #
+        self.producer.send_message('gw_notify_subscription_changed', self.producer.message_dumps({id: ContractHelper.object2kvstring(contract)}))
+        #>>>self.parent.gw_notify_subscription_changed({id: ContractHelper.object2kvstring(contract)})
+        logging.info('SubscriptionManager reqMktData: gw_notify_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
+        
+        
+        
+#     def makeStkContract(self, contractTuple):
+#         newContract = Contract()
+#         newContract.m_symbol = contractTuple[0]
+#         newContract.m_secType = contractTuple[1]
+#         newContract.m_exchange = contractTuple[2]
+#         newContract.m_currency = contractTuple[3]
+#         newContract.m_expiry = contractTuple[4]
+#         newContract.m_strike = contractTuple[5]
+#         newContract.m_right = contractTuple[6]
+#         print 'Contract Values:%s,%s,%s,%s,%s,%s,%s:' % contractTuple
+#         return newContract        
+   
+    # use only after a broken connection is restored
+    # to re request market data 
+    def force_resubscription(self):
+        # starting from index 1 of the contract list, call  reqmktdata, and format the result into a list of tuples
+        for i in range(1, len(self.handle)):
+            self.tws_connect.reqMktData(i, self.handle[i], '', False)
+            logging.info('force_resubscription: %s' % ContractHelper.printContract(self.handle[i]))
+       
+            
+    def itemAt(self, id):
+        if id > 0 and id < len(self.handle):
+            return self.handle[id]
+        return -1
+
+    def dump(self):
+        
+        logging.info('subscription manager table:---------------------')
+        logging.info(''.join('%d: {%s},\n' % (i,  ''.join('%s:%s, ' % (k, v) for k, v in self.handle[i].__dict__.iteritems() )\
+                                     if self.handle[i] <> None else ''          ) for i in range(len(self.handle)))\
+                     )
+        
+        #logging.info( ''.join('%s[%d],\n' % (k, v) for k, v in self.conId.iteritems()))
+        logging.info( 'Number of instruments subscribed: %d' % len(self.handle))
+        logging.info( '------------------------------------------------')
+        
+    def register_persistence_callback(self, func):
+        logging.info('subscription manager: registering callback')
+        self.persist_f = func
+        
+
+
def test_subscription():
    """Ad-hoc manual test for SubscriptionManager.

    NOTE(review): stale -- SubscriptionManager() now requires
    (name, tws_gateway) arguments, and add_subscription is a local
    function inside reqMktData, not a method; this test raises as
    written.  It also depends on hard-coded local data files.
    """
    s = SubscriptionManager()
    contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151029', 0, '')
    c = ContractHelper.makeContract(contractTuple)   
    print s.is_subscribed(c)
    print s.add_subscription(c)
    print s.is_subscribed(c)
    s.dump()
    
    fr = open('/home/larry-13.04/workspace/finopt/data/subscription-hsio.txt')
    for l in fr.readlines():
        # lines starting with '#' are comments in the data file
        if l[0] <> '#':
             
            s.add_subscription(ContractHelper.makeContract(tuple([t for t in l.strip('\n').split(',')])))    
    fr.close()
    s.dump()
    
    # loading the same file twice exercises the duplicate-subscription path
    fr = open('/home/larry-13.04/workspace/finopt/data/subscription-hsio.txt')
    for l in fr.readlines():
        if l[0] <> '#':
             
            print s.add_subscription(ContractHelper.makeContract(tuple([t for t in l.strip('\n').split(',')])))    
    s.dump()
    contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151127', 0, '')
    c = ContractHelper.makeContract(contractTuple)   
    print s.is_subscribed(c)
    print s.add_subscription(c)
    print s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c))) 
    
    print 'test itemAt:'
    contractTuple = ('HSI', 'OPT', 'HKFE', 'HKD', '20151127', 21400, 'C')
    c = ContractHelper.makeContract(contractTuple)
    print s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c)))

+ 212 - 0
src/comms/ibgw/tws_event_handler.py

@@ -0,0 +1,212 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from time import strftime
+import logging
+import traceback
+from ib.ext.EWrapper import EWrapper
+
+        
class TWS_event_handler(EWrapper):
    """EWrapper implementation that republishes every TWS callback.

    Each IB event is converted to a JSON-safe dict (tick_process_message)
    and pushed to the message bus under a topic named after the event.
    """

    # snapshot requests use tickerId + TICKER_GAP; replies are mapped back
    TICKER_GAP = 1000
    producer = None

    def __init__(self, producer):
        # NOTE(review): EWrapper.__init__ is deliberately not called here
        # (matches the original) -- confirm the base class needs no setup
        self.producer = producer

    def broadcast_event(self, message, mapping):
        """Serialize *mapping* and publish it under topic *message*."""
        try:
            dict = self.tick_process_message(message, mapping)
            if message == 'gw_subscriptions' or message == 'gw_subscription_changed':
                logging.info('TWS_event_handler: broadcast event: %s [%s]' % (dict['typeName'], dict))
            self.producer.send_message(message, self.producer.message_dumps(dict))
        except:
            logging.error('broadcast_event: exception while encoding IB event to client:  [%s]' % message)
            logging.error(traceback.format_exc())

    def tick_process_message(self, message_name, items):
        """Return a JSON-safe copy of *items* (typically vars() of a callback).

        - snapshot tickerIds (>= TICKER_GAP) are mapped back to the
          original subscription id (see SubscriptionManager)
        - the 'self' entry produced by vars() is dropped
        - ib.ext.* objects are flattened to their __dict__
        """
        t = items.copy()
        try:
            if (t['tickerId'] >= TWS_event_handler.TICKER_GAP):
                t['tickerId'] = t['tickerId'] - TWS_event_handler.TICKER_GAP
        except (KeyError, ):
            pass

        try:
            del(t['self'])
        except (KeyError, ):
            pass

        for k, v in t.iteritems():
            if 'ib.ext.' in str(type(v)):
                t[k] = v.__dict__
            else:
                t[k] = v

        return t

    def tickPrice(self, tickerId, field, price, canAutoExecute):
        self.broadcast_event('tickPrice', vars())

    def tickSize(self, tickerId, field, size):
        self.broadcast_event('tickSize', vars())

    def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
        # deliberately not broadcast (matches original)
        pass

    def tickGeneric(self, tickerId, tickType, value):
        self.broadcast_event('tickGeneric', vars())

    def tickString(self, tickerId, tickType, value):
        self.broadcast_event('tickString', vars())

    def tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
        self.broadcast_event('tickEFP', vars())

    def orderStatus(self, orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeId):
        self.broadcast_event('orderStatus', vars())

    def openOrder(self, orderId, contract, order, state):
        self.broadcast_event('openOrder', vars())

    def openOrderEnd(self):
        self.broadcast_event('openOrderEnd', vars())

    def updateAccountValue(self, key, value, currency, accountName):
        self.broadcast_event('updateAccountValue', vars())

    def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
        self.broadcast_event('updatePortfolio', vars())

    def updateAccountTime(self, timeStamp):
        self.broadcast_event('updateAccountTime', vars())

    def accountDownloadEnd(self, accountName):
        self.broadcast_event('accountDownloadEnd', vars())

    def nextValidId(self, orderId):
        self.broadcast_event('nextValidId', vars())

    def contractDetails(self, reqId, contractDetails):
        self.broadcast_event('contractDetails', vars())

    def contractDetailsEnd(self, reqId):
        self.broadcast_event('contractDetailsEnd', vars())

    def bondContractDetails(self, reqId, contractDetails):
        self.broadcast_event('bondContractDetails', vars())

    def execDetails(self, reqId, contract, execution):
        self.broadcast_event('execDetails', vars())

    def execDetailsEnd(self, reqId):
        self.broadcast_event('execDetailsEnd', vars())

    def connectionClosed(self):
        self.broadcast_event('connectionClosed', {})

    def error(self, id=None, errorCode=None, errorMsg=None):
        try:
            logging.error(self.tick_process_message('error', vars()))
            self.broadcast_event('error', vars())
        except:
            # fixed: the original swallowed failures silently
            logging.error('error: failed to relay IB error event: %s' % traceback.format_exc())

    def error_0(self, strvalue=None):
        logging.error(self.tick_process_message('error_0', vars()))
        self.broadcast_event('error_0', vars())

    def error_1(self, id=None, errorCode=None, errorMsg=None):
        logging.error(self.tick_process_message('error_1', vars()))
        self.broadcast_event('error_1', vars())

    def updateMktDepth(self, tickerId, position, operation, side, price, size):
        self.broadcast_event('updateMktDepth', vars())

    def updateMktDepthL2(self, tickerId, position, marketMaker, operation, side, price, size):
        self.broadcast_event('updateMktDepthL2', vars())

    def updateNewsBulletin(self, msgId, msgType, message, origExchange):
        self.broadcast_event('updateNewsBulletin', vars())

    def managedAccounts(self, accountsList):
        logging.info(self.tick_process_message('managedAccounts', vars()))
        self.broadcast_event('managedAccounts', vars())

    def receiveFA(self, faDataType, xml):
        self.broadcast_event('receiveFA', vars())

    def historicalData(self, reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
        self.broadcast_event('historicalData', vars())

    def scannerParameters(self, xml):
        self.broadcast_event('scannerParameters', vars())

    def scannerData(self, reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
        self.broadcast_event('scannerData', vars())

    def commissionReport(self, commissionReport):
        self.broadcast_event('commissionReport', vars())

    def currentTime(self, time):
        self.broadcast_event('currentTime', vars())

    def deltaNeutralValidation(self, reqId, underComp):
        self.broadcast_event('deltaNeutralValidation', vars())

    def fundamentalData(self, reqId, data):
        self.broadcast_event('fundamentalData', vars())

    def marketDataType(self, reqId, marketDataType):
        self.broadcast_event('marketDataType', vars())

    def realtimeBar(self, reqId, time, open, high, low, close, volume, wap, count):
        self.broadcast_event('realtimeBar', vars())

    def scannerDataEnd(self, reqId):
        self.broadcast_event('scannerDataEnd', vars())

    def tickSnapshotEnd(self, reqId):
        self.broadcast_event('tickSnapshotEnd', vars())

    def position(self, account, contract, pos, avgCost):
        self.broadcast_event('position', vars())

    def positionEnd(self):
        self.broadcast_event('positionEnd', vars())

    def accountSummary(self, reqId, account, tag, value, currency):
        self.broadcast_event('accountSummary', vars())

    def accountSummaryEnd(self, reqId):
        self.broadcast_event('accountSummaryEnd', vars())

BIN
src/comms/ibgw/tws_event_handler.pyc


+ 268 - 0
src/comms/ibgw/tws_gateway.py

@@ -0,0 +1,268 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import sys
+import copy
+from time import sleep, strftime
+import ConfigParser
+import logging
+import json
+
+from ib.ext.Contract import Contract
+from ib.ext.EClientSocket import EClientSocket
+
+from misc2.helpers import ContractHelper
+from comms.ibgw.base_messaging import Prosumer
+from comms.ibgw.tws_event_handler import TWS_event_handler
+from comms.ibgw.client_request_handler import ClientRequestHandler
+from comms.ibgw.subscription_manager import SubscriptionManager
+from comms.tws_protocol_helper import TWS_Protocol 
+import redis
+         
class TWS_gateway():
    """Bridges IB TWS and the kafka bus: relays client request topics to
    the TWS API and broadcasts TWS events back to subscribers."""

    # monitor IB connection / heart beat (not yet implemented)
#     ibh = None
#     tlock = None
#     ib_conn_status = None
    # Default settings; keys present here are overridden by the kwargs
    # dict passed to __init__, unknown keys are carried along as extras.
    TWS_GW_DEFAULT_CONFIG = {
      'name': 'tws_gateway_server',
      'bootstrap_host': 'localhost',   # kafka bootstrap server
      'bootstrap_port': 9092,
      'redis_host': 'localhost',
      'redis_port': 6379,
      'redis_db': 0,
      'tws_host': 'localhost',         # IB TWS / gateway API endpoint
      'tws_api_port': 8496,
      'tws_app_id': 38888,             # IB API client id (must be int)
      'group_id': 'TWS_GW',            # kafka consumer group
      'session_timeout_ms': 10000,
      'clear_offsets':  False,
      'order_transmit': False,         # straight-through order flag (see __init__)
      'topics': list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods)
      }
+               
+    
    def __init__(self, kwargs):
        # Merge caller-supplied settings over the defaults: keys known to
        # TWS_GW_DEFAULT_CONFIG override it, any extra keys are kept.
        self.kwargs = copy.copy(TWS_gateway.TWS_GW_DEFAULT_CONFIG)
        for key in self.kwargs:
            if key in kwargs:
                self.kwargs[key] = kwargs.pop(key)        
        self.kwargs.update(kwargs)        

        '''
            TWS_gateway start up sequence
            
            1. establish redis connection
            2. initialize prosumer instance - gateway message handler
            3. establish TWS gateway connectivity
            
            4. initialize listeners: ClientRequestHandler and SubscriptionManager
            5. start the prosumer 
        
        '''

        logging.info('starting up TWS_gateway...')
        # when True, orders are transmitted to the market untouched
        self.ib_order_transmit = self.kwargs['order_transmit']
        logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit == True else 'False'))

        logging.info('establishing redis connection...')
        self.initialize_redis()

        logging.info('starting up gateway message handler - kafka Prosumer...')        
        self.gw_message_handler = Prosumer(name='tws_gw_prosumer', kwargs=self.kwargs)

        logging.info('initializing TWS_event_handler...')        
        self.tws_event_handler = TWS_event_handler(self.gw_message_handler)

        logging.info('starting up IB EClientSocket...')
        self.tws_connection = EClientSocket(self.tws_event_handler)

        logging.info('establishing TWS gateway connectivity...')
        if not self.connect_tws():
            logging.error('TWS_gateway: unable to establish connection to IB %s:%d' % 
                          (self.kwargs['tws_host'], self.kwargs['tws_api_port']))
            self.disconnect_tws()
            sys.exit(-1)
        else:
            # start heart beat monitor (not yet implemented)
            pass
#             logging.info('starting up IB heart beat monitor...')
#             self.tlock = Lock()
#             self.ibh = IbHeartBeat(config)
#             self.ibh.register_listener([self.on_ib_conn_broken])
#             self.ibh.run()  

        logging.info('start TWS_event_handler. Entering processing loop...')
        self.gw_message_handler.start_prosumer()

        # listeners are created after the prosumer and TWS connection so
        # they can reference both through this gateway instance
        logging.info('instantiating listeners...cli_req_handler')        
        self.cli_req_handler = ClientRequestHandler('client_request_handler', self)
        logging.info('instantiating listeners subscription manager...')
        self.initialize_subscription_mgr()
        logging.info('registering messages to listen...')
        self.gw_message_handler.add_listeners([self.cli_req_handler])
        self.gw_message_handler.add_listener_topics(self.contract_subscription_mgr, ['reqMktData'])

        logging.info('**** Completed initialization sequence. ****')
        # blocks for the lifetime of the gateway
        self.main_loop()
+        
+
+    def initialize_subscription_mgr(self):
+        
+        self.contract_subscription_mgr = SubscriptionManager(self, self)
+        self.contract_subscription_mgr.register_persistence_callback(self.persist_subscriptions)
+        
+        
+        key = self.kwargs["subscription_manager.subscriptions.redis_key"]
+        if self.rs.get(key):
+            #contracts = map(lambda x: ContractHelper.kvstring2contract(x), json.loads(self.rs.get(key)))
+            
+            def is_outstanding(c):
+                
+                today = strftime('%Y%m%d') 
+                if c.m_expiry < today:
+                    logging.info('initialize_subscription_mgr: ignoring expired contract %s%s%s' % (c.m_expiry, c.m_strike, c.m_right))
+                    return False
+                return True
+            
+            contracts = filter(lambda x: is_outstanding(x), 
+                               map(lambda x: ContractHelper.kvstring2object(x, Contract), json.loads(self.rs.get(key))))
+            
+            
+            
+            
+            self.contract_subscription_mgr.load_subscription(contracts)
+        
+
+    def persist_subscriptions(self, contracts):
+         
+        key = self.kwargs["subscription_manager.subscriptions.redis_key"]
+        #cs = json.dumps(map(lambda x: ContractHelper.contract2kvstring(x) if x <> None else None, contracts))
+        cs = json.dumps(map(lambda x: ContractHelper.object2kvstring(x) if x <> None else None, contracts))
+        logging.debug('Tws_gateway: updating subscription table to redis store %s' % cs)
+        self.rs.set(key, cs)
+
+
+    def initialize_redis(self):
+
+        self.rs = redis.Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
+        try:
+            self.rs.client_list()
+        except redis.ConnectionError:
+            logging.error('TWS_gateway: unable to connect to redis server using these settings: %s port:%d db:%d' % 
+                          (self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db']))
+            logging.error('aborting...')
+            sys.exit(-1)
+            
+
+    def connect_tws(self):
+        if type(self.kwargs['tws_app_id']) <> int:
+            logging.error('TWS_gateway:connect_tws. tws_app_id must be of int type but detected %s!' % str(type(kwargs['tws_app_id'])))
+            sys.exit(-1)
+            
+        logging.info('TWS_gateway - eConnect. Connecting to %s:%d App Id: %d...' % 
+                     (self.kwargs['tws_host'], self.kwargs['tws_api_port'], self.kwargs['tws_app_id']))
+        self.tws_connection.eConnect(self.kwargs['tws_host'], self.kwargs['tws_api_port'], self.kwargs['tws_app_id'])
+        
+        return self.tws_connection.isConnected()
+
+    def disconnect_tws(self, value=None):
+        sleep(2)
+        self.tws_connection.eDisconnect()
+
+
+
+
+    def on_ib_conn_broken(self, msg):
+        logging.error('TWS_gateway: detected broken IB connection!')
+        self.ib_conn_status = 'ERROR'
+        self.tlock.acquire() # this function may get called multiple times
+        try:                 # block until another party finishes executing
+            if self.ib_conn_status == 'OK': # check status
+                return                      # if already fixed up while waiting, return 
+            
+            self.eDisconnect()
+            self.eConnect()
+            while not self.tws_connection.isConnected():
+                logging.error('TWS_gateway: attempt to reconnect...')
+                self.eConnect()
+                sleep(2)
+            
+            # we arrived here because the connection has been restored
+            # resubscribe tickers again!
+            logging.info('TWS_gateway: IB connection restored...resubscribe contracts')
+            self.contract_subscription_mgr.force_resubscription()             
+            
+            
+        finally:
+            self.tlock.release()          
+        
+
+    def main_loop(self):
+        try:
+            logging.info('TWS_gateway:main_loop ***** accepting console input...')
+            while True: 
+                
+                sleep(.45)
+                
+        except (KeyboardInterrupt, SystemExit):
+                logging.error('TWS_gateway: caught user interrupt. Shutting down...')
+                self.gw_message_handler.set_stop()
+                self.gw_message_handler.join()
+                logging.info('TWS_gateway: Service shut down complete...')
+                sys.exit(0)        
+
+
+
+    
+class ConfigMap():
+    
+    def kwargs_from_file(self, path):
+        cfg = ConfigParser.ConfigParser()            
+        if len(cfg.read(path)) == 0: 
+            raise ValueError, "Failed to open config file [%s]" % path 
+
+        kwargs = {}
+        for section in cfg.sections():
+            optval_list = map(lambda o: (o, cfg.get(section, o)), cfg.options(section)) 
+            for ov in optval_list:
+                try:
+                    
+                    kwargs[ov[0]] = eval(ov[1])
+                except:
+                    continue
+                
+        #logging.debug('ConfigMap: %s' % kwargs)
+        return kwargs
+        
+    
+if __name__ == '__main__':
+    
+    if len(sys.argv) != 2:
+        print("Usage: %s <config file>" % sys.argv[0])
+        exit(-1)    
+
+
+
+    cfg_path= sys.argv[1:]
+    kwargs = ConfigMap().kwargs_from_file(cfg_path)
+   
+      
+    logconfig = kwargs['logconfig']
+    logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
+    logging.basicConfig(**logconfig)        
+    
+    
+    app = TWS_gateway(kwargs)
+    
+     

+ 60 - 54
src/comms/kf_tws_gateway.py

@@ -43,12 +43,7 @@ class TWS_event_handler(EWrapper):
     TICKER_GAP = 1000
     producer = None
     
-    def __init__(self, producer):
-        
-        #client = KafkaClient()#{'bootstrap_servers': '%s:%s' % (host, port)})
-        #self.producer = SimpleProducer(client, async=False)
-        #self.producer = KafkaProducer(bootstrap_servers='%s:%s' % (host, port))    
-        #logging.info('TWS_event_handler: __init__ Creating kafka client producer at %s:%s' % (host, port))
+    def __init__(self, producer):        
         self.producer = producer
  
  
@@ -60,7 +55,7 @@ class TWS_event_handler(EWrapper):
             dict = self.tick_process_message(message, mapping)     
             if message == 'gw_subscriptions' or message == 'gw_subscription_changed':   
                 logging.info('TWS_event_handler: broadcast event: %s [%s]' % (dict['typeName'], dict))
-            self.producer.send(message, self.producer.message_dumps(dict))    
+            self.producer.send_message(message, self.producer.message_dumps(dict))    
         except:
             logging.error('broadcast_event: exception while encoding IB event to client:  [%s]' % message)
             logging.error(traceback.format_exc())
@@ -71,7 +66,7 @@ class TWS_event_handler(EWrapper):
     
     def tick_process_message(self, message_name, items):
         
-        t = {}
+
         t = items.copy()
         # if the tickerId is in the snapshot range
         # deduct the gap to derive the original tickerId
@@ -167,8 +162,11 @@ class TWS_event_handler(EWrapper):
         self.broadcast_event('connectionClosed', {})
 
     def error(self, id=None, errorCode=None, errorMsg=None):
-        logging.error(self.tick_process_message('error', vars()))
-        self.broadcast_event('error', vars())
+        try:
+            logging.error(self.tick_process_message('error', vars()))
+            self.broadcast_event('error', vars())
+        except:
+            pass
 
     def error_0(self, strvalue=None):
         logging.error(self.tick_process_message('error_0', vars()))
@@ -247,7 +245,7 @@ class TWS_event_handler(EWrapper):
 
 
 
-class TWS_gateway(threading.Thread):
+class TWS_gateway():
 
     
     # monitor IB connection / heart beat
@@ -261,6 +259,8 @@ class TWS_gateway(threading.Thread):
       'redis_host': 'localhost',
       'redis_port': 6379,
       'redis_db': 0,
+      'tws_host': 'localhost',
+      'tws_api_port': 8496,
       'group_id': 'TWS_GW',
       'session_timeout_ms': 10000,
       'clear_offsets':  False,
@@ -272,16 +272,13 @@ class TWS_gateway(threading.Thread):
         
 
              
-        super(TWS_gateway, self).__init__()
+        
         self.kwargs = copy.copy(TWS_gateway.TWS_GW_DEFAULT_CONFIG)
         for key in self.kwargs:
             if key in kwargs:
                 self.kwargs[key] = kwargs.pop(key)        
         self.kwargs.update(kwargs)        
         
-        # convert some config string values to object 
-        self.kwargs['topics'] = list(eval(self.kwargs['topics']))
-        self.ib_order_transmit = self.kwargs['order_transmit']
         
 
 
@@ -298,6 +295,7 @@ class TWS_gateway(threading.Thread):
         '''
 
         logging.info('starting up TWS_gateway...')
+        self.ib_order_transmit = self.kwargs['order_transmit']
         logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit == True else 'False'))
         
         
@@ -306,21 +304,18 @@ class TWS_gateway(threading.Thread):
         
         logging.info('starting up gateway message handler - kafka Prosumer...')        
         self.gw_message_handler = Prosumer(name='tws_gw_prosumer', kwargs=self.kwargs)
-
         
-        logging.info('starting up TWS_event_handler...')        
+        logging.info('initializing TWS_event_handler...')        
         self.tws_event_handler = TWS_event_handler(self.gw_message_handler)
         
         logging.info('starting up IB EClientSocket...')
         self.tws_connection = EClientSocket(self.tws_event_handler)
-
-
-        
         
         logging.info('establishing TWS gateway connectivity...')
-        if not self.eConnect():
+        if not self.connect_tws():
             logging.error('TWS_gateway: unable to establish connection to IB %s:%d' % 
                           (self.kwargs['tws_host'], self.kwargs['tws_api_port']))
+            self.disconnect_tws()
             sys.exit(-1)
         else:
             # start heart beat monitor
@@ -331,22 +326,24 @@ class TWS_gateway(threading.Thread):
 #             self.ibh.register_listener([self.on_ib_conn_broken])
 #             self.ibh.run()  
 
-
+        logging.info('start TWS_event_handler. Entering processing loop...')
+        self.gw_message_handler.start_prosumer()
 
         logging.info('instantiating listeners...cli_req_handler')        
-        self.cli_req_handler = ClientRequestHandler('client_request_handler', self.gw_message_handler)
+        self.cli_req_handler = ClientRequestHandler('client_request_handler', self)
         logging.info('instantiating listeners subscription manager...')
         self.initialize_subscription_mgr()
         logging.info('registering messages to listen...')
         self.gw_message_handler.add_listeners([self.cli_req_handler])
-        self.gw_message_handler.add_listener_topics(self.contract_subscription_mgr, 'reqMktData')
-
-        logging.info('Completed initialization sequence.')
+        self.gw_message_handler.add_listener_topics(self.contract_subscription_mgr, ['reqMktData'])
 
+        logging.info('**** Completed initialization sequence. ****')
+        self.main_loop()
+        
 
     def initialize_subscription_mgr(self):
         
-        self.contract_subscription_mgr = SubscriptionManager(self, self.gw_message_handler)
+        self.contract_subscription_mgr = SubscriptionManager(self, self)
         self.contract_subscription_mgr.register_persistence_callback(self.persist_subscriptions)
         
         
@@ -392,33 +389,19 @@ class TWS_gateway(threading.Thread):
             sys.exit(-1)
             
 
-    def eConnect(self):
-        logging.info('TWS_gateway - eConnect. Connecting to %s:%s App Id: %s' % 
-                     (self.kwargs['tws_host'], self.kwargs['tws_api_port'], self.kwargs['name']))
-        self.tws_connection.eConnect(self.kwargs['tws_host'], self.kwargs['tws_api_port'], self.kwargs['name'])
+    def connect_tws(self):
+        logging.info('TWS_gateway - eConnect. Connecting to %s:%d App Id: %d...' % 
+                     (self.kwargs['tws_host'], self.kwargs['tws_api_port'], self.kwargs['tws_app_id']))
+        self.tws_connection.eConnect(self.kwargs['tws_host'], self.kwargs['tws_api_port'], self.kwargs['tws_app_id'])
+        
         return self.tws_connection.isConnected()
 
-    def eDisconnect(self, value=None):
+    def disconnect_tws(self, value=None):
         sleep(2)
         self.tws_connection.eDisconnect()
 
-    def run(self):
 
 
-        for message in self.gw_message_handler:
-             
-            logging.info("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
-                                         message.offset, message.key,
-                                         message.value))
- 
-#             print ("TWS_gateway: received client request %s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
-#                                          message.offset, message.key,
-#                                          message.value))
-             
-
-            getattr(self, message.topic, None)(message.value)
-            #self.cli_request_handler.task_done(message)
-
 
     def on_ib_conn_broken(self, msg):
         logging.error('TWS_gateway: detected broken IB connection!')
@@ -445,10 +428,24 @@ class TWS_gateway(threading.Thread):
             self.tlock.release()          
         
 
+    def main_loop(self):
+        try:
+            logging.info('TWS_gateway:main_loop ***** accepting console input...')
+            while True: 
+                
+                time.sleep(.45)
+                
+        except (KeyboardInterrupt, SystemExit):
+                logging.error('TWS_gateway: caught user interrupt. Shutting down...')
+                self.gw_message_handler.set_stop()
+                self.gw_message_handler.join()
+                logging.info('TWS_gateway: Service shut down complete...')
+                sys.exit(0)        
+
 class ClientRequestHandler(BaseMessageListener):
     
     def __init__(self, name, tws_gateway):
-        BaseMessageListener.__init__(self, name, tws_gateway)
+        BaseMessageListener.__init__(self, name)
         self.tws_connect = tws_gateway.tws_connection
             
 
@@ -581,7 +578,7 @@ class SubscriptionManager(BaseMessageListener):
     persist_f = None
     
     def __init__(self, name, tws_gateway):
-        BaseMessageListener.__init__(self, name, tws_gateway)
+        BaseMessageListener.__init__(self, name)
         self.tws_connect = tws_gateway.tws_connection
         self.handle = []    
         # contract key map to contract ID (index of the handle array)
@@ -752,6 +749,8 @@ def test_subscription():
     print s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c)))
     
     
+
+    
 class ConfigMap():
     
     def kwargs_from_file(self, path):
@@ -763,8 +762,13 @@ class ConfigMap():
         for section in cfg.sections():
             optval_list = map(lambda o: (o, cfg.get(section, o)), cfg.options(section)) 
             for ov in optval_list:
-                kwargs[ov[0]] = ov[1]
-        
+                try:
+                    
+                    kwargs[ov[0]] = eval(ov[1])
+                except:
+                    continue
+                
+        #logging.debug('ConfigMap: %s' % kwargs)
         return kwargs
         
     
@@ -774,19 +778,21 @@ if __name__ == '__main__':
         print("Usage: %s <config file>" % sys.argv[0])
         exit(-1)    
 
+
+
     cfg_path= sys.argv[1:]
     kwargs = ConfigMap().kwargs_from_file(cfg_path)
    
       
-    logconfig = eval(kwargs['logconfig'])
+    logconfig = kwargs['logconfig']
     logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
     logging.basicConfig(**logconfig)        
     
     
     app = TWS_gateway(kwargs)
-    app.start()
+    
      
-    print 'TWS_gateway started.'
+
 #     
 
 

BIN
src/comms/test/__init__.pyc


+ 3 - 2
src/comms/test/base_messaging.py

@@ -88,7 +88,8 @@ class BaseProducer(threading.Thread, Subscriber):
                 # to prevent excessive CPU use
                 time.sleep(0.1)
             
-            logging.info('completed run')
+            
+            logging.info ('******** BaseProducer exit done.')
             
                 
         except NoBrokersAvailable:
@@ -302,7 +303,7 @@ class BaseConsumer(threading.Thread, Publisher):
                 continue
             
             
-        logging.info ('**********************************************done')
+        logging.info ('******** BaseConsumer exit done.')
 
 
 

+ 200 - 0
src/comms/test/quick_test_ib.py

@@ -0,0 +1,200 @@
+import sys
+import copy
+from time import sleep, strftime
+import time, datetime
+import ConfigParser
+from optparse import OptionParser
+import logging
+import thread
+import threading
+import traceback
+import json
+from threading import Lock
+from ib.ext.Contract import Contract
+from ib.ext.EWrapper import EWrapper
+from ib.ext.EClientSocket import EClientSocket
+
+class Wrapger(EWrapper):
+    def tickPrice(self, tickerId, field, price, canAutoExecute):
+        """ generated source for method tickPrice """
+
+   
+    def tickSize(self, tickerId, field, size):
+        """ generated source for method tickSize """
+
+   
+    def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
+        """ generated source for method tickOptionComputation """
+
+   
+    def tickGeneric(self, tickerId, tickType, value):
+        """ generated source for method tickGeneric """
+
+   
+    def tickString(self, tickerId, tickType, value):
+        """ generated source for method tickString """
+
+   
+    def tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
+        """ generated source for method tickEFP """
+
+   
+    def orderStatus(self, orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeld):
+        """ generated source for method orderStatus """
+
+   
+    def openOrder(self, orderId, contract, order, orderState):
+        """ generated source for method openOrder """
+
+   
+    def openOrderEnd(self):
+        """ generated source for method openOrderEnd """
+
+   
+    def updateAccountValue(self, key, value, currency, accountName):
+        """ generated source for method updateAccountValue """
+
+   
+    def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
+        """ generated source for method updatePortfolio """
+
+   
+    def updateAccountTime(self, timeStamp):
+        """ generated source for method updateAccountTime """
+
+   
+    def accountDownloadEnd(self, accountName):
+        """ generated source for method accountDownloadEnd """
+
+   
+    def nextValidId(self, orderId):
+        """ generated source for method nextValidId """
+
+   
+    def contractDetails(self, reqId, contractDetails):
+        """ generated source for method contractDetails """
+
+   
+    def bondContractDetails(self, reqId, contractDetails):
+        """ generated source for method bondContractDetails """
+
+   
+    def contractDetailsEnd(self, reqId):
+        """ generated source for method contractDetailsEnd """
+
+   
+    def execDetails(self, reqId, contract, execution):
+        """ generated source for method execDetails """
+
+   
+    def execDetailsEnd(self, reqId):
+        """ generated source for method execDetailsEnd """
+
+   
+    def updateMktDepth(self, tickerId, position, operation, side, price, size):
+        """ generated source for method updateMktDepth """
+
+   
+    def updateMktDepthL2(self, tickerId, position, marketMaker, operation, side, price, size):
+        """ generated source for method updateMktDepthL2 """
+
+   
+    def updateNewsBulletin(self, msgId, msgType, message, origExchange):
+        """ generated source for method updateNewsBulletin """
+
+   
+    def managedAccounts(self, accountsList):
+        """ generated source for method managedAccounts """
+
+   
+    def receiveFA(self, faDataType, xml):
+        """ generated source for method receiveFA """
+
+   
+    def historicalData(self, reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
+        """ generated source for method historicalData """
+
+   
+    def scannerParameters(self, xml):
+        """ generated source for method scannerParameters """
+
+   
+    def scannerData(self, reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
+        """ generated source for method scannerData """
+
+   
+    def scannerDataEnd(self, reqId):
+        """ generated source for method scannerDataEnd """
+
+   
+    def realtimeBar(self, reqId, time, open, high, low, close, volume, wap, count):
+        """ generated source for method realtimeBar """
+
+   
+    def currentTime(self, time):
+        """ generated source for method currentTime """
+
+   
+    def fundamentalData(self, reqId, data):
+        """ generated source for method fundamentalData """
+
+   
+    def deltaNeutralValidation(self, reqId, underComp):
+        """ generated source for method deltaNeutralValidation """
+
+   
+    def tickSnapshotEnd(self, reqId):
+        """ generated source for method tickSnapshotEnd """
+
+   
+    def marketDataType(self, reqId, marketDataType):
+        """ generated source for method marketDataType """
+
+   
+    def commissionReport(self, commissionReport):
+        """ generated source for method commissionReport """
+
+   
+    def position(self, account, contract, pos, avgCost):
+        """ generated source for method position """
+
+   
+    def positionEnd(self):
+        """ generated source for method positionEnd """
+
+   
+    def accountSummary(self, reqId, account, tag, value, currency):
+        """ generated source for method accountSummary """
+
+   
+    def accountSummaryEnd(self, reqId):
+        """ generated source for method accountSummaryEnd """
+    
+    def connectionClosed(self):
+        """ generated source for method accountSummaryEnd """
+
+    def error(self, id=None, errorCode=None, errorMsg=None):
+        """ generated source for method accountSummaryEnd """
+
+    def error_0(self, strvalue=None):
+        """ generated source for method accountSummaryEnd """
+
+    def error_1(self, id=None, errorCode=None, errorMsg=None):
+        """ generated source for method accountSummaryEnd """
+    
+
+def test_IB():
+    ew = Wrapger()
+    es = EClientSocket(ew)
+    es.eConnect('localhost', 7496, 5555)
+    print es.isConnected()
+    sleep(2)
+    print 'disconnecting...'
+    es.eDisconnect()
+
+
+if __name__ == '__main__':
+    
+
+
+    test_IB()    

+ 0 - 778
src/comms/tws_gateway.py

@@ -1,778 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import sys
-from time import sleep, strftime
-import time, datetime
-import ConfigParser
-from optparse import OptionParser
-import logging
-import thread
-import threading
-import traceback
-import json
-from threading import Lock
-from ib.ext.Contract import Contract
-from ib.ext.EWrapper import EWrapper
-from ib.ext.EClientSocket import EClientSocket
-from ib.ext.ExecutionFilter import ExecutionFilter
-from ib.ext.Execution import Execution
-from ib.ext.OrderState import OrderState
-from ib.ext.Order import Order
-
-from kafka import KafkaProducer
-from kafka import KafkaConsumer
-from kafka.errors import KafkaError
-
-
-from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
-from comms.ib_heartbeat import IbHeartBeat
-
-from comms.test.base_messaging import Prosumer, BaseMessageListener
-from misc2.observer import Subscriber, Publisher
-from tws_protocol_helper import TWS_Protocol 
-
-import redis
-         
-
-        
-        
-class TWS_event_handler(EWrapper):
-
-    TICKER_GAP = 1000
-    
-    
-    def __init__(self, tws_gateway):
-        
-        # reference to the parent TWS_gateway
-        self.producer = tws_gateway
- 
- 
-
- 
-    def broadcast_event(self, message, mapping): #, source='IB'):
-
-        try:
-            dict = self.tick_process_message(message, mapping) #, source)     
-            if message == 'gw_subscriptions' or message == 'gw_subscription_changed':   
-                logging.info('TWS_event_handler: broadcast event: %s [%s]' % (dict['typeName'], dict))
-            self.producer.send(message, self.producer.message_dumps(dict))    
-        except:
-            logging.error('broadcast_event: exception while encoding IB event to client:  [%s]' % message)
-            logging.error(traceback.format_exc())
-            
-            
-            
-
-    
-    def tick_process_message(self, message_name, items):  #, source):
-        
-        
-        t = items.copy()
-        # if the tickerId is in the snapshot range
-        # deduct the gap to derive the original tickerId
-        # --- check logic in subscription manager
-        if (t['tickerId']  >= TWS_event_handler.TICKER_GAP):
-            t['tickerId'] = t['tickerId']  - TWS_event_handler.TICKER_GAP
-        try:
-            del(t['self'])
-        except (KeyError, ):
-            pass          
-        
-
-        for k,v in t.iteritems():
-                #print k, v, type(v)
-                #if type(v) in [Contract, Execution, ExecutionFilter, OrderState, Order, CommissionReport]:
-            if 'ib.ext.' in str(type(v)):     
-                t[k] = v.__dict__
-            else:
-                t[k] = v
-        
-               
-#         t['ts'] = time.time()
-#         t['typeName'] = message_name
-#         t['source'] = source
-            
-        
-        return t  
-            
-                
-    
-    def tickPrice(self, tickerId, field, price, canAutoExecute):
-        
-        self.broadcast_event('tickPrice', vars())
-
-    def tickSize(self, tickerId, field, size):
-        
-        self.broadcast_event('tickSize', vars()) #vars())
-
-    def tickOptionComputation(self, tickerId, field, impliedVol, delta, optPrice, pvDividend, gamma, vega, theta, undPrice):
-        
-        #self.broadcast_event('tickOptionComputation', self.tick_process_message(vars())) #vars())
-        pass
-
-    def tickGeneric(self, tickerId, tickType, value):
-        #self.broadcast_event('tickGeneric', vars())
-        self.broadcast_event('tickGeneric', vars()) #vars())
-
-    def tickString(self, tickerId, tickType, value):
-        #self.broadcast_event('tickString', vars())
-        self.broadcast_event('tickString', vars()) #vars())
-
-    def tickEFP(self, tickerId, tickType, basisPoints, formattedBasisPoints, impliedFuture, holdDays, futureExpiry, dividendImpact, dividendsToExpiry):
-        self.broadcast_event('tickEFP', vars())
-
-    def orderStatus(self, orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeId):
-        self.broadcast_event('orderStatus', vars())
-
-    def openOrder(self, orderId, contract, order, state):
-        self.broadcast_event('openOrder', vars())
-
-    def openOrderEnd(self):
-        self.broadcast_event('openOrderEnd', vars())
-
-    def updateAccountValue(self, key, value, currency, accountName):
-        self.broadcast_event('updateAccountValue', vars())
-
-    def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, accountName):
-        self.broadcast_event('updatePortfolio', vars())
-
-    def updateAccountTime(self, timeStamp):
-        self.broadcast_event('updateAccountTime', vars())
-
-    def accountDownloadEnd(self, accountName):
-        self.broadcast_event('accountDownloadEnd', vars())
-
-    def nextValidId(self, orderId):
-        self.broadcast_event('nextValidId', vars())
-
-    def contractDetails(self, reqId, contractDetails):
-        self.broadcast_event('contractDetails', vars())
-
-    def contractDetailsEnd(self, reqId):
-        self.broadcast_event('contractDetailsEnd', vars())
-
-    def bondContractDetails(self, reqId, contractDetails):
-        self.broadcast_event('bondContractDetails', vars())
-
-    def execDetails(self, reqId, contract, execution):
-        self.broadcast_event('execDetails', vars())
-
-    def execDetailsEnd(self, reqId):
-        self.broadcast_event('execDetailsEnd', vars())
-
-    def connectionClosed(self):
-        self.broadcast_event('connectionClosed', {})
-
-    def error(self, id=None, errorCode=None, errorMsg=None):
-        logging.error(self.serialize_vars_to_dict('error', vars()))
-        self.broadcast_event('error', vars())
-
-    def error_0(self, strvalue=None):
-        logging.error(self.serialize_vars_to_dict('error_0', vars()))
-        self.broadcast_event('error_0', vars())
-
-    def error_1(self, id=None, errorCode=None, errorMsg=None):
-        logging.error(self.serialize_vars_to_dict('error_1', vars()))        
-        self.broadcast_event('error_1', vars())
-
-    def updateMktDepth(self, tickerId, position, operation, side, price, size):
-        self.broadcast_event('updateMktDepth', vars())
-
-    def updateMktDepthL2(self, tickerId, position, marketMaker, operation, side, price, size):
-        self.broadcast_event('updateMktDepthL2', vars())
-
-    def updateNewsBulletin(self, msgId, msgType, message, origExchange):
-        self.broadcast_event('updateNewsBulletin', vars())
-
-    def managedAccounts(self, accountsList):
-        logging.info(self.serialize_vars_to_dict('managedAccounts', vars()))
-        self.broadcast_event('managedAccounts', vars())
-
-    def receiveFA(self, faDataType, xml):
-        self.broadcast_event('receiveFA', vars())
-
-    def historicalData(self, reqId, date, open, high, low, close, volume, count, WAP, hasGaps):
-        self.broadcast_event('historicalData', vars())
-
-    def scannerParameters(self, xml):
-        self.broadcast_event('scannerParameters', vars())
-
-    def scannerData(self, reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
-        self.broadcast_event('scannerData', vars())
-
-
-    def commissionReport(self, commissionReport):
-        self.broadcast_event('commissionReport', vars())
-
-
-    def currentTime(self, time):
-        self.broadcast_event('currentTime', vars())
-
-    def deltaNeutralValidation(self, reqId, underComp):
-        self.broadcast_event('deltaNeutralValidation', vars())
-
-
-    def fundamentalData(self, reqId, data):
-        self.broadcast_event('fundamentalData', vars())
-
-    def marketDataType(self, reqId, marketDataType):
-        self.broadcast_event('marketDataType', vars())
-
-
-    def realtimeBar(self, reqId, time, open, high, low, close, volume, wap, count):
-        self.broadcast_event('realtimeBar', vars())
-
-    def scannerDataEnd(self, reqId):
-        self.broadcast_event('scannerDataEnd', vars())
-
-
-    def tickSnapshotEnd(self, reqId):
-        self.broadcast_event('tickSnapshotEnd', vars())
-
-
-    def position(self, account, contract, pos, avgCost):
-        self.broadcast_event('position', vars())
-
-    def positionEnd(self):
-        self.broadcast_event('positionEnd', vars())
-
-    def accountSummary(self, reqId, account, tag, value, currency):
-        self.broadcast_event('accountSummary', vars())
-
-    def accountSummaryEnd(self, reqId):
-        self.broadcast_event('accountSummaryEnd', vars())
-
-
-
-class TWS_gateway():
-
-    # config
-    config = None
-    # redis connection
-    rs = None
-
-    
-    # channel clients' requests to IB/TWS
-    gw_message_prosumer = None
-
-    # manage conID / contracts mapping
-    contract_subscription_mgr = None  
-    
-    connection = None
-    
-    # handler to process incoming IB/TWS messages and echo back to clients  
-    tws_event_handler = None
-    
-    # monitor IB connection / heart beat
-    ibh = None
-    tlock = None
-    ib_conn_status = None
-    ib_order_transmit = False
-    
-    
-    def __init__(self, host, port, clientId, kafka_host, kafka_port, config):
-        super(TWS_gateway, self).__init__()
-        self.config = config
-        self.host = host
-        self.port = port
-        self.clientId = clientId
-        self.ib_order_transmit = config.get("tws_gateway", "tws_gateway.order_transmit").strip('"').strip("'") if \
-                                        config.get("tws_gateway", "tws_gateway.order_transmit").strip('"').strip("'") <> None\
-                                        else False
-        
-        logging.info('starting up TWS_gateway...')
-        logging.info('Order straight through (no-touch) flag = %s' % ('True' if self.ib_order_transmit == True else 'False'))
-
-        logging.info('connecting to Redis server...')
-        self.initialize_redis(config)
-        
-        
-        logging.info('starting up gateway message handler - kafka Prosumer...')
-        client_requests = list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods)
-        self.gw_message_prosumer = Prosumer(name='tws_gw_prosumer', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
-                                        'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
-                                        'group_id': 'groupA', 'session_timeout_ms':10000,
-                                                 'topics': client_requests, 'clear_offsets' : 0})
-        
-        logging.info('register listeners for client requests')
-        
-        logging.info('starting up TWS_event_handler...')
-        
-        self.tws_event_handler = TWS_event_handler(self.gw_message_prosumer)
-        logging.info('starting up IB EClientSocket...')
-        self.connection = EClientSocket(self.tws_event_handler)
-        
-        
-
-        
-
-        
-        #
-        #    register listeners
-        #
-        
-
-
-         
-        
-
-        if not self.eConnect():
-            logging.error('TWS_gateway: unable to establish connection to IB %s:%d' % (self.host, self.port))
-            sys.exit(-1)
-        else:
-            # start heart beat monitor
-            logging.info('starting up IB heart beat monitor...')
-            self.tlock = Lock()
-            self.ibh = IbHeartBeat(config)
-            self.ibh.register_listener([self.on_ib_conn_broken])
-            self.ibh.run()  
-
-
-        logging.info('starting up subscription manager...')
-        self.initialize_subscription_mgr()
-
-
-    def start_gateway(self):
-        self.get_gw_message_prosumer().start_prosumer()
-
-    def initialize_subscription_mgr(self):
-        
-        self.contract_subscription_mgr = SubscriptionManager(self)
-        self.contract_subscription_mgr.register_persistence_callback(self.persist_subscriptions)
-        key = self.config.get("tws_gateway",  "subscription_manager.subscriptions.redis_key").strip('"').strip("'")
-        if self.rs.get(key):
-            #contracts = map(lambda x: ContractHelper.kvstring2contract(x), json.loads(self.rs.get(key)))
-            
-            def is_outstanding(c):
-                
-                today = time.strftime('%Y%m%d') 
-                if c.m_expiry < today:
-                    logging.info('initialize_subscription_mgr: ignoring expired contract %s%s%s' % (c.m_expiry, c.m_strike, c.m_right))
-                    return False
-                return True
-            
-            contracts = filter(lambda x: is_outstanding(x), 
-                               map(lambda x: ContractHelper.kvstring2object(x, Contract), json.loads(self.rs.get(key))))
-            
-            
-            
-            
-            self.contract_subscription_mgr.load_subscription(contracts)
-        
-
-    def persist_subscriptions(self, contracts):
-         
-        key = self.config.get("tws_gateway",  "subscription_manager.subscriptions.redis_key").strip('"').strip("'")
-        #cs = json.dumps(map(lambda x: ContractHelper.contract2kvstring(x) if x <> None else None, contracts))
-        cs = json.dumps(map(lambda x: ContractHelper.object2kvstring(x) if x <> None else None, contracts))
-        logging.debug('Tws_gateway: updating subscription table to redis store %s' % cs)
-        self.rs.set(key, cs)
-
-
-    def initialize_redis(self, config):
-        r_host = config.get("redis", "redis.server").strip('"').strip("'")
-        r_port = config.get("redis", "redis.port")
-        r_db = config.get("redis", "redis.db")     
-
-        self.rs = redis.Redis(r_host, r_port, r_db)
-        try:
-            self.rs.client_list()
-        except redis.ConnectionError:
-            logging.error('TWS_gateway: unable to connect to redis server using these settings: %s port:%d db:%d' % (r_host, r_port, r_db))
-            logging.error('aborting...')
-            sys.exit(-1)
-            
-
-    def eConnect(self):
-        logging.info('ClientRequestHandler - eConnect. Connecting to %s:%s App Id: %s' % (self.host, self.port, self.clientId))
-        self.connection.eConnect(self.host, self.port, self.clientId)
-        return self.tws_connect.isConnected()
-
-    def eDisconnect(self, value=None):
-        sleep(2)
-        self.connection.eDisconnect()
-    
-
-    def get_gw_message_prosumer(self):
-        return self.gw_message_prosumer
-    
- 
-
-
-    def on_ib_conn_broken(self, msg):
-        logging.error('TWS_gateway: detected broken IB connection!')
-        self.ib_conn_status = 'ERROR'
-        self.tlock.acquire() # this function may get called multiple times
-        try:                 # block until another party finishes executing
-            if self.ib_conn_status == 'OK': # check status
-                return                      # if already fixed up while waiting, return 
-            
-            self.eDisconnect()
-            self.eConnect()
-            while not self.connection.isConnected():
-                logging.error('TWS_gateway: attempt to reconnect...')
-                self.eConnect()
-                sleep(2)
-            
-            # we arrived here because the connection has been restored
-            # resubscribe tickers again!
-            logging.info('TWS_gateway: IB connection restored...resubscribe contracts')
-            self.contract_subscription_mgr.force_resubscription()             
-            
-            
-        finally:
-            self.tlock.release()          
-        
-
-class ClientRequestHandler(BaseMessageListener):
-    
-    def __init__(self, name, tws_gateway):
-        BaseMessageListener.__init__(self, name, tws_gateway)
-        self.producer = tws_gateway
-        self.tws_connect = tws_gateway.connection
-            
-    
-    
-    def reqAccountUpdates(self, value=None):
-        logging.info('ClientRequestHandler - reqAccountUpdates value=%s' % value)
-        self.tws_connect.reqAccountUpdates(1, '')
-    
-    def reqAccountSummary(self, value):
-        logging.info('ClientRequestHandler - reqAccountSummary value=%s' % value)
-        
-        vals = map(lambda x: x.encode('ascii') if isinstance(x, unicode) else x, json.loads(value))
-        self.tws_connect.reqAccountSummary(vals[0], vals[1], vals[2])
-        
-    def reqOpenOrders(self, value=None):
-        self.tws_connect.reqOpenOrders()
-    
-    def reqPositions(self, value=None):
-        self.tws_connect.reqPositions()
-        
-        
-    def reqExecutions(self, value):
-        try:
-            filt = ExecutionFilter() if value == '' else ExecutionFilterHelper.kvstring2object(value, ExecutionFilter)
-            self.tws_connect.reqExecutions(0, filt)
-        except:
-            logging.error(traceback.format_exc())
-    
-    
-    def reqIds(self, value=None):
-        self.tws_connect.reqIds(1)
-    
-    
-    def reqNewsBulletins(self):
-        self.tws_connect.reqNewsBulletins(1)
-    
-    
-    def cancelNewsBulletins(self):
-        self.tws_connect.cancelNewsBulletins()
-    
-    
-    def setServerLogLevel(self):
-        self.tws_connect.setServerLogLevel(3)
-    
-    
-    def reqAutoOpenOrders(self):
-        self.tws_connect.reqAutoOpenOrders(1)
-    
-    
-    def reqAllOpenOrders(self):
-        self.tws_connect.reqAllOpenOrders()
-    
-    
-    def reqManagedAccts(self):
-        self.tws_connect.reqManagedAccts()
-    
-    
-    def requestFA(self):
-        self.tws_connect.requestFA(1)
-    
-    
-    def reqMktData(self, sm_contract):
-        logging.info('ClientRequestHandler received reqMktData request <no action: pass...> : %s' % sm_contract)
-    
-    def reqHistoricalData(self):
-        contract = Contract()
-        contract.m_symbol = 'QQQQ'
-        contract.m_secType = 'STK'
-        contract.m_exchange = 'SMART'
-        endtime = strftime('%Y%m%d %H:%M:%S')
-        self.tws_connect.reqHistoricalData(
-            tickerId=1,
-            contract=contract,
-            endDateTime=endtime,
-            durationStr='1 D',
-            barSizeSetting='1 min',
-            whatToShow='TRADES',
-            useRTH=0,
-            formatDate=1)
-    
-    
-    def placeOrder(self, value=None):
-        logging.info('TWS_gateway - placeOrder value=%s' % value)
-        try:
-            vals = json.loads(value)
-        except ValueError:
-            logging.error('TWS_gateway - placeOrder Exception %s' % traceback.format_exc())
-            return
-        
-    #        c = ContractHelper.kvstring2contract(vals[1])
-        o = OrderHelper.kvstring2object(vals[2], Order)
-        o.__dict__['transmit'] = self.ib_order_transmit
-    #         print c.__dict__
-    #         print o.__dict__
-    #         print '---------------------'
-    
-           
-        #self.connection.placeOrder(vals[0], ContractHelper.kvstring2contract(vals[1]), OrderHelper.kvstring2object(vals[2], Order))
-        self.tws_connect.placeOrder(vals[0], ContractHelper.kvstring2object(vals[1], Contract), OrderHelper.kvstring2object(vals[2], Order))
-    #        self.connection.placeOrder(orderId, contract, newOptOrder)
-        
-
-    
-    """
-       Client requests to TWS_gateway
-    """
-    def gw_req_subscriptions(self, value=None):
-        
-        #subm = map(lambda i: ContractHelper.contract2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
-        #subm = map(lambda i: ContractHelper.object2kvstring(self.contract_subscription_mgr.handle[i]), range(len(self.contract_subscription_mgr.handle)))
-        subm = map(lambda i: (i, ContractHelper.object2kvstring(self.contract_subscription_mgr.handle[i])), range(len(self.contract_subscription_mgr.handle)))
-        
-        print subm
-        if subm:
-            self.producer.send_message('gw_subscriptions', self.produce.message_dumps({'subscriptions': subm}))
-            
-            
-       
-    
-        
-class SubscriptionManager(BaseMessageListener):
-    
-    
-    # array list of contracts
-    handle = []
-    # contract key map to contract ID (index of the handle array)
-    tickerId = {}
-    
-    persist_f = None
-    
-    def __init__(self, name, tws_gateway):
-        BaseMessageListener.__init__(self, name)
-        self.producer = tws_gateway
-        self.tws_connect = tws_gateway.connection    
-   
-    def load_subscription(self, contracts):
-        for c in contracts:
-            self.reqMktData(c)
-            
-        self.dump()
-    
-    # returns -1 if not found, else the key id (which could be a zero value)
-    def is_subscribed(self, contract):
-        #print self.conId.keys()
-        ckey = ContractHelper.makeRedisKeyEx(contract) 
-        if ckey not in self.tickerId.keys():
-            return -1
-        else:
-            # note that 0 can be a key 
-            # be careful when checking the return values
-            # check for true false instead of using implicit comparsion
-            return self.tickerId[ckey]
-    
-
-#     def reqMktDataxx(self, contract):
-#         print '---------------'
-#         contractTuple = ('USO', 'STK', 'SMART', 'USD', '', 0.0, '')
-#         stkContract = self.makeStkContract(contractTuple)     
-#         stkContract.m_includeExpired = False       
-#         self.parent.connection.reqMktData(1, stkContract, '', False)     
-# 
-#         contractTuple = ('IBM', 'STK', 'SMART', 'USD', '', 0.0, '')
-#         stkContract = self.makeStkContract(contractTuple)
-#         stkContract.m_includeExpired = False
-#         print stkContract   
-#         print stkContract.__dict__         
-#         self.parent.connection.reqMktData(2, stkContract, '', False)     
-#             
-            
-    def reqMktData(self, kvs_contract):
-                  
-        
-        #logging.info('SubscriptionManager: reqMktData')
-        logging.info('SubscriptionManager received reqMktData request: %s' % kvs_contract)
-  
-        def add_subscription(contract):
-            self.handle.append(contract)
-            newId = len(self.handle) - 1
-            self.tickerId[ContractHelper.makeRedisKeyEx(contract)] = newId 
-             
-            return newId
-  
-        contract = ContractHelper.kvstring2object(kvs_contract, Contract)
-        id = self.is_subscribed(contract)
-        if id == -1: # not found
-            id = add_subscription(contract)
-
-            #
-            # the conId must be set to zero when calling TWS reqMktData
-            # otherwise TWS will fail to subscribe the contract
-            
-            self.tws_connect.reqMktData(id, contract, '', False) 
-            
-            
-                   
-            if self.persist_f:
-                logging.debug('SubscriptionManager reqMktData: trigger callback')
-                self.persist_f(self.handle)
-                
-            logging.info('SubscriptionManager: reqMktData. Requesting market data, id = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
-        
-        else:    
-            self.tws_connect.reqMktData(1000 + id, contract, '', True)
-            logging.info('SubscriptionManager: reqMktData: contract already subscribed. Request snapshot = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
-        #self.dump()
-
-        #
-        # instruct gateway to broadcast new id has been assigned to a new contract
-        #
-        self.producer.send_message('gw_notify_subscription_changed', self.producer.message_dumps({id: ContractHelper.object2kvstring(contract)}))
-        #>>>self.parent.gw_notify_subscription_changed({id: ContractHelper.object2kvstring(contract)})
-        logging.info('SubscriptionManager reqMktData: gw_notify_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
-        
-        
-        
-#     def makeStkContract(self, contractTuple):
-#         newContract = Contract()
-#         newContract.m_symbol = contractTuple[0]
-#         newContract.m_secType = contractTuple[1]
-#         newContract.m_exchange = contractTuple[2]
-#         newContract.m_currency = contractTuple[3]
-#         newContract.m_expiry = contractTuple[4]
-#         newContract.m_strike = contractTuple[5]
-#         newContract.m_right = contractTuple[6]
-#         print 'Contract Values:%s,%s,%s,%s,%s,%s,%s:' % contractTuple
-#         return newContract        
-   
-    # use only after a broken connection is restored
-    # to re request market data 
-    def force_resubscription(self):
-        # starting from index 1 of the contract list, call  reqmktdata, and format the result into a list of tuples
-        for i in range(1, len(self.handle)):
-            self.tws_connect.reqMktData(i, self.handle[i], '', False)
-            logging.info('force_resubscription: %s' % ContractHelper.printContract(self.handle[i]))
-       
-            
-    def itemAt(self, id):
-        if id > 0 and id < len(self.handle):
-            return self.handle[id]
-        return -1
-
-    def dump(self):
-        
-        logging.info('subscription manager table:---------------------')
-        logging.info(''.join('%d: {%s},\n' % (i,  ''.join('%s:%s, ' % (k, v) for k, v in self.handle[i].__dict__.iteritems() )\
-                                     if self.handle[i] <> None else ''          ) for i in range(len(self.handle)))\
-                     )
-        
-        #logging.info( ''.join('%s[%d],\n' % (k, v) for k, v in self.conId.iteritems()))
-        logging.info( 'Number of instruments subscribed: %d' % len(self.handle))
-        logging.info( '------------------------------------------------')
-        
-    def register_persistence_callback(self, func):
-        logging.info('subscription manager: registering callback')
-        self.persist_f = func
-        
-
-    
-        
-
-
-
-                
-    
-def test_subscription():        
-    s = SubscriptionManager()
-    contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151029', 0, '')
-    c = ContractHelper.makeContract(contractTuple)   
-    print s.is_subscribed(c)
-    print s.add_subscription(c)
-    print s.is_subscribed(c)
-    s.dump()
-    
-    fr = open('/home/larry-13.04/workspace/finopt/data/subscription-hsio.txt')
-    for l in fr.readlines():
-        if l[0] <> '#':
-             
-            s.add_subscription(ContractHelper.makeContract(tuple([t for t in l.strip('\n').split(',')])))    
-    fr.close()
-    s.dump()
-    
-    fr = open('/home/larry-13.04/workspace/finopt/data/subscription-hsio.txt')
-    for l in fr.readlines():
-        if l[0] <> '#':
-             
-            print s.add_subscription(ContractHelper.makeContract(tuple([t for t in l.strip('\n').split(',')])))    
-    s.dump()
-    contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', '20151127', 0, '')
-    c = ContractHelper.makeContract(contractTuple)   
-    print s.is_subscribed(c)
-    print s.add_subscription(c)
-    print s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c))) 
-    
-    print 'test itemAt:'
-    contractTuple = ('HSI', 'OPT', 'HKFE', 'HKD', '20151127', 21400, 'C')
-    c = ContractHelper.makeContract(contractTuple)
-    print s.is_subscribed(c), ContractHelper.printContract(s.itemAt(s.is_subscribed(c)))
-    
-    
-
-
-    
-    
-if __name__ == '__main__':
-    
-    if len(sys.argv) != 2:
-        print("Usage: %s <config file>" % sys.argv[0])
-        exit(-1)    
-
-    cfg_path= sys.argv[1:]
-    config = ConfigParser.SafeConfigParser()
-    if len(config.read(cfg_path)) == 0: 
-        raise ValueError, "Failed to open config file" 
-    
-   
-      
-    logconfig = eval(config.get("tws_gateway", "tws_gateway.logconfig").strip('"').strip("'"))
-    logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
-    logging.basicConfig(**logconfig)        
-    
-
-    
-    
-    khost = config.get("epc", "kafka.host").strip('"').strip("'")
-    kport = config.get("epc", "kafka.port")
-    ihost = config.get("market", "ib.gateway").strip('"').strip("'")
-    iport = int(config.get("market", "ib.port"))
-    iappid = int(config.get("market", "ib.appid.portfolio"))   
-    
-    
-    
-    
-    #print 'give kafka server some time to register the topics...'
-    #sleep(2)
-    
-    app = TWS_gateway(ihost, iport, iappid, khost, kport, config)
-    app.start_gateway()
-    print 'TWS_gateway started.'
-#     
-
-
-    
-
-
-#    test_subscription()
-    

+ 9 - 8
src/config/tws_gateway.cfg

@@ -1,14 +1,14 @@
 [tws_gateway]
-name: tws_gateway_server
+name: 'tws_gateway_server'
 #
 # kafka settings
 #
-bootstrap_host: localhost
+bootstrap_host: 'localhost'
 bootstrap_port: 9092
 #
 # redis persistence
 #
-redis_host: localhost
+redis_host: 'localhost'
 redis_port: 6379
 redis_db: 0
 #
@@ -16,16 +16,17 @@ redis_db: 0
 #
 # 7496 - production larry046, 7496 - development,  8496 production mchan927
 #
-tws_host: localhost
-tws_api_port: 7496 
+tws_host: 'localhost'
+tws_api_port: 7496
+tws_app_id: 749601 
 #
 #
 #
-group_id: TWS_GW
+group_id: 'TWS_GW'
 session_timeout_ms: 10000
-topics: 'reqAccountUpdates', 'reqOpenOrders', 'reqExecutions', 'reqIds', 'reqNewsBulletins', 'cancelNewsBulletins', 'setServerLogLevel', 'reqAccountSummary', 'reqPositions', 'reqAutoOpenOrders', 'reqAllOpenOrders', 'reqManagedAccts', 'requestFA', 'reqMktData', 'reqHistoricalData', 'placeOrder', 'gw_req_subscriptions'
+topics: ['reqAccountUpdates', 'reqOpenOrders', 'reqExecutions', 'reqIds', 'reqNewsBulletins', 'cancelNewsBulletins', 'setServerLogLevel', 'reqAccountSummary', 'reqPositions', 'reqAutoOpenOrders', 'reqAllOpenOrders', 'reqManagedAccts', 'requestFA', 'reqMktData', 'reqHistoricalData', 'placeOrder', 'gw_req_subscriptions']
 clear_offsets: False
-subscription_manager.subscriptions.redis_key: subscriptions  
+subscription_manager.subscriptions.redis_key: 'subscriptions'  
 #logconfig: {'filename': '/home/larry-13.04/workspace/finopt/log/tws_gateway.log', 'filemode': 'w','level': logging.INFO}
 logconfig: {'level': logging.INFO}
 order_transmit: False

+ 4 - 3
src/sh/start_twsgw.sh

@@ -1,5 +1,6 @@
 #!/bin/bash
-ROOT=$FINOPT_HOME
+FINOPT_HOME=~/ironfly-workspace/finopt/src
 export PYTHONPATH=$FINOPT_HOME:$PYTHONPATH
-# real time mode
-python $FINOPT_HOME/comms/tws_gateway.py $FINOPT_HOME/config/app.cfg
+
+python $FINOPT_HOME/comms/ibgw/tws_gateway.py $FINOPT_HOME/config/tws_gateway.cfg 
+