Browse Source

redo kafka and observer pattern

esurfer 9 years ago
parent
commit
7c37ef9ace

+ 86 - 0
alerts/isp_ip.py

@@ -0,0 +1,86 @@
+from bs4 import BeautifulSoup
+from urllib2 import urlopen
+from time import strftime
+import time, sys
+
+def send_email(user, pwd, recipient, subject, body):
+    import smtplib
+
+    gmail_user = user
+    gmail_pwd = pwd
+    FROM = user
+    TO = recipient if type(recipient) is list else [recipient]
+    SUBJECT = subject
+    TEXT = body
+
+    # Prepare actual message
+    message = """\From: %s\nTo: %s\nSubject: %s\n\n%s
+    """ % (FROM, ", ".join(TO), SUBJECT, TEXT)
+    try:
+        server = smtplib.SMTP("smtp.gmail.com", 587)
+        server.ehlo()
+        server.starttls()
+        server.login(gmail_user, gmail_pwd)
+        server.sendmail(FROM, TO, message)
+        server.close()
+        print 'successfully sent the mail'
+    except:
+        print "failed to send mail"
+
+
def send_ip_alert(curr_ip, old_ip):
    """Email a notification that the ISP-assigned IP changed from old_ip to curr_ip."""
    # SECURITY NOTE(review): real gmail credentials are hard-coded here and
    # are now in version control -- rotate this password and load the
    # credentials from a config file or environment variables instead.
    user='cigarbar@gmail.com'
    pwd='taipeii0i'
    recipient='larry1chan@gmail.com'
    
    body = 'different ip detected. Update new record at no-ip to avoid hosts not reachable\nNew: [%s]  Old:[%s]\n' % (curr_ip, old_ip)
    subject='IP changed %s' % curr_ip 
    send_email(user, pwd, recipient, subject, body)
+
+
+def isp_assigned_ip(iphist):
+    curr_ip = None
+    try:
+        url = 'http://ipecho.net/plain'
+        curr_ip = urlopen(url).read()
+        
+        
+    except:
+        return 'error getting the current ip address!'
+
+    try:
+        f = open(iphist)
+        old_ip = f.readline()
+        if old_ip <> curr_ip:
+            send_ip_alert(curr_ip,  old_ip)
+            print 'different ip detected. Update new record at no-ip to avoid hosts not reachable'
+            f.close()
+            f = open(iphist, 'w')
+            f.write(curr_ip)
+            f.close()
+        else:
+            print 'same ip - nothing to do'
+    except IOError:
+        old_ip = "no previous ip record!"
+        f = open(iphist, 'w')
+        f.write(curr_ip)
+        f.close()
+        send_ip_alert(curr_ip,  old_ip)
+        
+    
+
+
if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: %s <path to iphistory.dat>" % sys.argv[0])
        # sys.exit instead of the site-module exit() helper, which is not
        # guaranteed to exist (e.g. under python -S)
        sys.exit(-1)

    isp_assigned_ip(sys.argv[1])

+ 33 - 11
comms/sample_tws_client.py

@@ -55,6 +55,7 @@ class SampleClient(SimpleTWSClient):
                                         (items.__dict__['tickerId'], ContractHelper.makeRedisKeyEx(ct),\
                                         'bid' if field == 0 else ('ask' if field == 3 else ('last' if field == 5 else field)), \
                                         items.__dict__['size'], datetime.datetime.fromtimestamp(items.__dict__['ts']).strftime('%Y-%m-%d %H:%M:%S.%f')))
+            
         except KeyError:
             print 'tickSize: keyerror: (this could happen on the 1st run as the subscription manager sub list is still empty.'
             print items
@@ -110,6 +111,10 @@ class SampleClient(SimpleTWSClient):
             self.tickerMap.update(i)   
         print 'gw_subscriptions -> dump tickerMap '
         print self.tickerMap 
+        
    def gw_subscription_changed(self, items):
        # Callback invoked when the gateway reports a change in the
        # subscription list; currently just dumps the payload behind an
        # eye-catching banner for debugging.
        print '[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[['
        print items        
     
 # override this function to perform your own processing
 #    def accountDownloadEnd(self, items):
@@ -318,10 +323,10 @@ def test1():
      
      
     contract = Contract() #
-    contract.m_symbol = 'EUR'
+    contract.m_symbol = 'WMT'
     contract.m_currency = 'USD'
-    contract.m_secType = 'CASH'
-    contract.m_exchange = 'IDEALPRO'
+    contract.m_secType = 'STK'
+    contract.m_exchange = 'SMART'
     twsc.get_command_handler().reqMktData(contract)
       
     twsc.connect()
@@ -330,6 +335,19 @@ def test1():
     print 'completed...'
 
 
def test15():
    # Subscribe to streaming market data for a single US stock (WMT, SMART
    # routed), let ticks flow for 4 seconds, then disconnect.
    contract = Contract() #
    contract.m_symbol = 'WMT'
    contract.m_currency = 'USD'
    contract.m_secType = 'STK'
    contract.m_exchange = 'SMART'
    c = SampleClient(host, port, 'SampleClient-777')
    c.connect()

    c.get_command_handler().reqMktData(contract)
    sleep(4)
    c.disconnect()
    print 'completed...'
     
 def test2():
     
@@ -477,15 +495,16 @@ def test6():
         
         
 def test7():
-    contractTuple = ('VMW', 'STK', 'SMART', 'USD', '', 0, '')
+    contractTuple = ('QQQ', 'STK', 'SMART', 'USD', '', 0, '')
     contract = ContractHelper.makeContract(contractTuple)  
     oc = OptionsChain('t7')
     
     
     oc.set_underlying(contract)
-       
-    oc.set_option_structure(contract, 0.5, 100, 0.0012, 0.0328, '20151211')
-    oc.build_chain(59.3, 0.08, 0.22)            
+    
+    # underlying, spd_size, multiplier, rate, div, expiry):   
+    oc.set_option_structure(contract, 0.5, 100, 0.0012, 0.0328, '20170217')
+    oc.build_chain(125, 0.04, 0.22)            
     
     c = SampleClient(host, port, 'SampleClient-777')
     c.connect()
@@ -497,7 +516,7 @@ def test7():
     for ct in oc.get_option_chain():    
         c.get_command_handler().reqMktData(ct.get_contract())
         print ContractHelper.object2kvstring(ct.get_contract())
-    sleep(3)
+    sleep(1)
     c.disconnect()
     
 def test8():    
@@ -562,13 +581,16 @@ if __name__ == '__main__':
 
     choice= sys.argv[1]
            
-    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)-8s %(message)s')
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)-8s %(message)s',
+                        filename= '/home/larry-13.04/workspace/finopt/log/unitest.log')
+    
     
-    host = 'vsu-01'
+    # bootstrap server settings
+    host = 'localhost'
     port = 9092
 
     print 'choice: %s' % choice
-    test9()
+    test4()
     #test8()
 #     if choice == '2': 
 #         

+ 0 - 0
comms/test/__init__.py


+ 13 - 0
comms/test/b1.py

@@ -0,0 +1,13 @@
+
+
+
+'''
+    create 2 generic communicators
+    
+    the communicator belongs to class Communicator
+    
+    Communicator inherits from BaseConsumer and
+    is composed of (owns) a BaseProducer
+    
+    
+'''

+ 343 - 0
comms/test/base_messaging.py

@@ -0,0 +1,343 @@
+#!/usr/bin/env python
+import threading, logging, time
+import sys
+import datetime
+import uuid
+from Queue import Queue
+from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+
+from kafka import KafkaConsumer, KafkaProducer
+from kafka.structs import TopicPartition
+from kafka.errors import NoBrokersAvailable
+# 
+# packages required for ConsumerNextIteratorPersist
+import json
+from redis import Redis
+
+from misc2.observer import Subscriber, Publisher
+from numpy.distutils.fcompiler import none
+
+
+
class Producer(threading.Thread):
    """Demo producer thread: alternates test messages between two topics
    forever, one message roughly every half second."""
    daemon = True

    def run(self):
        try:
            kp = KafkaProducer(bootstrap_servers='localhost:9092')
            topics = ['my-topic', 'my-topic2']
            count = 0
            while True:
                # round-robin across the two topics
                topic = topics[count % 2]
                payload = "%d %s test %s" % (count, topic, time.strftime("%b %d %Y %H:%M:%S"))
                logging.info(payload)
                kp.send(topic, payload)
                time.sleep(.45)
                count += 1
        except NoBrokersAvailable:
            logging.error("NoBrokersAvailable: Has kafka started?")
+
+
class BaseProducer(threading.Thread, Subscriber):
    """Kafka producer thread fed through an internal queue.

    Other threads call send_message(topic, message); the run loop drains the
    queue and forwards each item to the kafka broker.

    kwargs:
        bootstrap_host
        bootstrap_port
    """

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        # suffix the supplied name with a name-derived uuid so multiple
        # producers built from the same base name stay distinguishable
        self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name))
        # BUG FIX: the log line previously said 'BaseConsumer __init__'
        logging.info('BaseProducer __init__: name=%s' % self.name)
        self.args = args
        self.kwargs = kwargs
        self.event_q = Queue()

    def send_message(self, topic, message):
        """Queue (topic, message) for delivery by the producer thread."""
        # BUG FIX: the original called task_done() right after put().
        # task_done() belongs to the consumer side after a get(); calling
        # it here raises ValueError once the calls outnumber queued items.
        self.event_q.put((topic, message))

    def run(self):
        try:
            producer = KafkaProducer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']))
            while True:
                # blocking get() replaces the original empty()/get()
                # busy-wait, which spun a full core while the queue was empty
                topic, message = self.event_q.get()
                logging.debug("BaseProducer topic:[%s] msg:[%s]" % (topic, message))
                producer.send(topic, message)
                self.event_q.task_done()
        except NoBrokersAvailable:
            logging.error("NoBrokersAvailable: Has kafka started?")
+    
+
class BaseConsumer(threading.Thread, Publisher):
    """Kafka consumer thread that persists per-partition offsets in redis and
    republishes each received message to registered Subscribers (observer
    pattern) via dispatch()."""
    
    # event names dispatched to subscribers
    KB_EVENT = "on_kb_event"
    KB_REACHED_LAST_OFFSET = "on_kb_reached_last_offset"
    
    #my_topics =  {'my-topic':{}, 'my-topic2':{}}    

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        # NOTE(review): the string below is not a docstring (it does not open
        # the function body); it is a no-op literal kept as in-code
        # documentation of the expected kwargs.
        """
        kwargs:
            bootstrap_host
            bootstrap_host
            redis_host
            redis_port
            redis_db
            group_id
            consumer_id: name 
            topics: a list of topic strings
            session_timeout_ms: 
        """
        
        
        # make the consumer name unique-but-stable for a given base name
        self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name)) 
        logging.info('BaseConsumer __init__: name=%s' % self.name)
        self.args = args
        self.kwargs = kwargs
        # redis connection used to persist the last-processed offsets
        self.rs = Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
        self.my_topics = {}
        for t in self.kwargs['topics']:
            self.my_topics[t]= {} 
            
        # Publisher event registry: event name -> subscriber map
        self.events = {event: dict() for event in [BaseConsumer.KB_EVENT, BaseConsumer.KB_REACHED_LAST_OFFSET]}
        return
    
    
    # no use: doesn't work
    def seek_to_last_read_offset(self, consumer):
        for t in self.my_topics.keys():
            po = json.loads(self.rs.get(t))
            consumer.seek(TopicPartition(topic=t, partition=po['partition']), po['offset'])
    
    """
     each consumer has its own set of topics offsets stored in redis
     for consumer A and consumer B (with different group_ids) subscribing to the same my-topic, each 
     each of them will need to keep track of its own offsets
     to make the redis key unique by consumer, the key is created by topic + '@' + consumer name
     example: my-topic@consumerA
     
     offsets = { topic-consumer_name:
                     {
                         partition0: offset0,
                         partition1: offset1,... 
                     }
               }
    """
    
    def consumer_topic(self, tp):
        # redis key: topic + '@' + consumer name, unique per consumer
        return tp + '@' + self.name 
    
    def persist_offsets(self, topic, partition, offset):
        # save the offset both in the local my_topics cache and in redis
        #self.rs.set(self.consumer_topic(topic), json.dumps({'partition': partition, 'offset':offset}))
        self.my_topics[topic][str(partition)] = offset
        self.rs.set(self.consumer_topic(topic), json.dumps(self.my_topics[topic]))
        
    def run(self):
        print '%s:%s started' % (self.kwargs['group_id'], self.name)
        consumer = KafkaConsumer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']),
                                 auto_offset_reset='earliest',
                                 #
                                 # consumers having the same group id works as   
                                 # a group to seize messages published by the publisher
                                 # (like a queue, each message is consumed exactly once
                                 #  by a consumer)
                                 #
                                 #
                                 # in a 'pub-sub' environment, set each consumer having 
                                 # a unique group_id
                                 #
                                 group_id = self.kwargs['group_id'],
                                 client_id = self.name,
                                 #
                                 # session_timeout_ms is the time it takes for another consumer 
                                 # that has the same group_id to pick up the work
                                 # to consume messages when this consumer is dead 
                                 # 
                                 session_timeout_ms = self.kwargs['session_timeout_ms'],
                                 #
                                 # 
                                 #
                                 partition_assignment_strategy=[RoundRobinPartitionAssignor])



        
        for topic in self.my_topics.keys():
            # 
            # if a topic offset is stored previously (the consumer was run before), 
            # then load the offset values
            # and save it locally into the my_topics map 
            # else self.my_topics[topic] would have zero elements in it
            if self.rs.keys(self.consumer_topic(topic)):
                self.my_topics[topic] = json.loads(self.rs.get(self.consumer_topic(topic)))

        print self.my_topics
        
            
        consumer.subscribe(self.my_topics.keys())

        
        #consumer.seek_to_end(TopicPartition(topic='my-topic', partition=0))

        done = False
        while not done:
            message = consumer.next()
            #time.sleep(0.25)
            # log every 50th offset to keep the output readable
            if message.offset % 50 == 0:
                logging.info( "[%s]:highwater:%d offset:%d part:%d <%s>" % (self.name, consumer.highwater(TopicPartition(message.topic, message.partition)),
                                                                         message.offset, message.partition, message.value))
            
#             for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
#                 print "t:%s %s" % (t,  ','.join('p:%d, offset:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)) # consumer.position(TopicPartition(topic=t, partition=p)))
            
            # if this is the first time the consumer is run
            # it contains no offsets in the redis map, so it has 
            # 0 elements in the map,
            # then insert a new offset in redis and populate
            # the local my_topics dict
            
            
            if len(self.my_topics[message.topic]) == 0:
                self.persist_offsets(message.topic, message.partition, message.offset)
                self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))
                continue
                
            
            # older than the stored offset -> already processed, drop it;
            # equal -> the boundary message, notify subscribers separately;
            # newer -> normal delivery: persist then dispatch
            if self.my_topics[message.topic][str(message.partition)] > message.offset:
                print '********************** old message...discarding %s %d' % (message.topic, message.offset)
            else:
                if self.my_topics[message.topic][str(message.partition)] == message.offset:
                    self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, {'message': message})
                    logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
                else:
                    self.persist_offsets(message.topic, message.partition, message.offset)
                    self.dispatch(BaseConsumer.KB_EVENT, {'message': message})
            
            
        logging.info ('**********************************************done')
+
+
+
class BaseMessageListener(Subscriber):
    """Dispatches kafka consumer events to handler methods named after the
    event.

    update() is invoked by the Publisher with an event name such as
    "on_kb_event"; the name is looked up as a method on self and called with
    the payload.  Missing handlers are logged, not raised.
    """

    def update(self, event, param=None):
        """Route *event* to the handler method of the same name, if defined.

        event -- handler method name (e.g. BaseConsumer.KB_EVENT)
        param -- payload passed through to the handler (default None)
        """
        # BUG FIX: the default was `none` -- a stray auto-import from
        # numpy.distutils.fcompiler -- instead of the builtin None.
        try:
            event_fn = getattr(self, event)
            event_fn(param)
        except AttributeError:
            err_msg = 'BaseMessageListener:update| function %s not implemented.' % event
            logging.error('BaseMessageListener [%s]:update %s' % (self.name, err_msg))
        logging.debug("BaseMessageListener [%s]:update|Event type:[%s] content:[%s]" % (self.name, event, json.dumps(param) if param != None else "<empty param>"))
+
+
+class SimpleMessageListener(BaseMessageListener):
+    
+    def __init__(self, name):
+        BaseMessageListener.__init__(self, name)
+    
+    def on_kb_event(self, param):
+        print "on_kb_event [%s] %s" % (self.name, param)
+    
+    def on_kb_reached_last_offset(self, param):
+        print "on_kb_reached_last_offset [%s] %s" % (self.name, param)
+
+
class  TestProducer(BaseProducer):
    # placeholder subclass used by the producer-side test;
    # inherits all BaseProducer behaviour unchanged
    pass
+
def test_base_proconsumer(mode):
    """Exercise the producer/consumer pair.

    mode 'P': start a TestProducer thread and feed it alternating messages
    on two topics forever.  Any other mode: start a BaseConsumer wired to a
    SimpleMessageListener through the observer (register/dispatch) pattern.
    """
    if mode == 'P':
        #Producer().start()
        topics = ['my-topic', 'my-topic2']
        tp = TestProducer(name = 'testproducer', kwargs={
                                             'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                             'topics': topics})
        tp.start()
        i = 0 
        while True:
            #today = datetime.date.today()
            s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
            logging.info(s)
            tp.send_message(topics[i%2], s)
            
            time.sleep(.45)
            i=i+1
        
        
    else:
        
        bc = BaseConsumer(name='bc', kwargs={'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                             'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                             'group_id':'gid', 'session_timeout_ms':10000, 'topics': ['my-topic', 'my-topic2']})
        #bml = BaseMessageListener('bml')
        sml = SimpleMessageListener('simple')
        #bc.register(BaseConsumer.KB_EVENT, bml)
        #bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, bml)
        # the listener receives both the per-message and the caught-up events
        bc.register(BaseConsumer.KB_EVENT, sml)
        bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, sml)
        
        bc.start()
+
+
+
+    
def main():
    """Parse <mode> <case #> from argv and run the selected test case."""
    #
    # test cases
    #
    tp = [ test_base_proconsumer]
    
    if len(sys.argv) != 3:
        print("Usage: %s <role(producer or consumer): P|C> <test case #[0..1]>" % sys.argv[0])
        print "\n".join('case #%d: %s' % (i, tp[i].__name__) for i in range(len(tp)))
        print "example: python %s P 1" % sys.argv[0]
        print "example: python %s C 1" % sys.argv[0]
        exit(-1)    

    mode = sys.argv[1] 
    #gid = sys.argv[2] if sys.argv[2] <> None else "q-group"  

    
    tp[int(sys.argv[2])](mode)

    #time.sleep(30)
    # keep the main thread alive so the daemon worker threads keep running;
    # Ctrl-C / SystemExit breaks out
    while 1:
        try:
            time.sleep(5)
            pass
        except (KeyboardInterrupt, SystemExit):
                logging.error('caught user interrupt')
                sys.exit(-1)
+
+if __name__ == "__main__":
+    logging.basicConfig(
+        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+        level=logging.INFO
+        )
+    main()

+ 328 - 0
comms/test/test_kafka.py

@@ -0,0 +1,328 @@
+#!/usr/bin/env python
+import threading, logging, time
+import signal
+import sys
+import datetime
+from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+
+from kafka import KafkaConsumer, KafkaProducer
+from kafka.structs import TopicPartition
+
+# 
+# packages required for ConsumerNextIteratorPersist
+import json
+from redis import Redis
+
class Producer(threading.Thread):
    """Demo producer thread: round-robins test messages across two topics
    forever, one message roughly every 150 ms."""
    daemon = True

    def run(self):
        producer = KafkaProducer(bootstrap_servers='localhost:9092')
        topics = ['my-topic', 'my-topic2']
        count = 0
        while True:
            # alternate between the two topics
            topic = topics[count % 2]
            payload = "%d %s test %s" % (count, topic, time.strftime("%b %d %Y %H:%M:%S"))
            logging.info(payload)
            producer.send(topic, payload)
            time.sleep(.15)
            count += 1
+
+
+
class Consumer(threading.Thread):
    """Demo consumer thread: subscribes to both test topics and logs every
    message plus the current partition positions."""
    daemon = True
    # topics every instance subscribes to (shared class attribute)
    my_topics =  ['my-topic', 'my-topic2']

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        self.name = name
        self.args = args
        self.kwargs = kwargs
        return
        
    def run(self):
        print '%s:%s started' % (self.kwargs['group_id'], self.name)
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='latest',
                                 #
                                 # consumers having the same group id works as   
                                 # a group to seize messages published by the publisher
                                 # (like a queue, each message is consumed exactly once
                                 #  by a consumer)
                                 #
                                 #
                                 # in a 'pub-sub' environment, set each consumer having 
                                 # a unique group_id
                                 #
                                 group_id = self.kwargs['group_id'],
                                 client_id = self.name,
                                 #
                                 # session_timeout_ms is the time it takes for another consumer 
                                 # that has the same group_id to pick up the work
                                 # to consume messages when this consumer is dead 
                                 # 
                                 session_timeout_ms = 10000,
                                 #
                                 # 
                                 #
                                 partition_assignment_strategy=[RoundRobinPartitionAssignor])

        consumer.subscribe(self.my_topics)
#         print consumer.partitions_for_topic("my-topic")
#         print consumer.assignment()
#         print consumer.subscription()
        
        #consumer.seek_to_end(TopicPartition(topic='my-topic', partition=0))

        
        for message in consumer:
            #time.sleep(0.25)
            logging.info( "%s:offset:%d part:%d %s" % (self.name, message.offset, message.partition, message.value))
            # dump the current position of every partition of every topic
            for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics):
                print "t:%s %s" % (t,  ','.join('p:%d, offset:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)) # consumer.position(TopicPartition(topic=t, partition=p)))
                      
            

        logging.info ('**********************************************done')
+
+
+
class ConsumerNextIteratorPersist(threading.Thread):
    # this class demonstrates the use of next iterator to process message
    # and logic to save offsets to an external storage2
    
    daemon = True
    # NOTE(review): my_topics is a *class-level* mutable dict, so its state is
    # shared by every instance in the process -- fine for this single-instance
    # demo, but confirm before instantiating more than one.
    my_topics =  {'my-topic':{}, 'my-topic2':{}}
     
    rs = None

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        self.name = name
        self.args = args
        self.kwargs = kwargs
        # redis connection used to persist the last-processed offsets
        self.rs = Redis('localhost', 6379, 0)
        return
    
    
    # no use: doesn't work
    def seek_to_last_read_offset(self, consumer):
        for t in self.my_topics.keys():
            po = json.loads(self.rs.get(t))
            consumer.seek(TopicPartition(topic=t, partition=po['partition']), po['offset'])
    
    """
     each consumer has its own set of topics offsets stored in redis
     for consumer A and consumer B (with different group_ids) subscribing to the same my-topic, each 
     each of them will need to keep track of its own offsets
     to make the redis key unique by consumer, the key is created by topic + '@' + consumer name
     example: my-topic@consumerA
     
     offsets = { topic-consumer_name:
                     {
                         partition0: offset0,
                         partition1: offset1,... 
                     }
               }
    """
    
    def consumer_topic(self, tp):
        # redis key: topic + '@' + consumer name, unique per consumer
        return tp + '@' + self.name 
    
    def persist_offsets(self, topic, partition, offset):
        # save the offset both in the local my_topics cache and in redis
        #self.rs.set(self.consumer_topic(topic), json.dumps({'partition': partition, 'offset':offset}))
        self.my_topics[topic][str(partition)] = offset
        self.rs.set(self.consumer_topic(topic), json.dumps(self.my_topics[topic]))
        
    def run(self):
        print '%s:%s started' % (self.kwargs['group_id'], self.name)
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 #
                                 # consumers having the same group id works as   
                                 # a group to seize messages published by the publisher
                                 # (like a queue, each message is consumed exactly once
                                 #  by a consumer)
                                 #
                                 #
                                 # in a 'pub-sub' environment, set each consumer having 
                                 # a unique group_id
                                 #
                                 group_id = self.kwargs['group_id'],
                                 client_id = self.name,
                                 #
                                 # session_timeout_ms is the time it takes for another consumer 
                                 # that has the same group_id to pick up the work
                                 # to consume messages when this consumer is dead 
                                 # 
                                 session_timeout_ms = 10000,
                                 #
                                 # 
                                 #
                                 partition_assignment_strategy=[RoundRobinPartitionAssignor])



        
        for topic in self.my_topics.keys():
            # 
            # if a topic offset is stored previously (the consumer was run before), 
            # then load the offset values
            # and save it locally into the my_topics map 
            # else self.my_topics[topic] would have zero elements in it
            if self.rs.keys(self.consumer_topic(topic)):
                self.my_topics[topic] = json.loads(self.rs.get(self.consumer_topic(topic)))

        print self.my_topics
        
            
        consumer.subscribe(self.my_topics.keys())

        
        #consumer.seek_to_end(TopicPartition(topic='my-topic', partition=0))

        done = False
        while not done:
            message = consumer.next()
            #time.sleep(0.25)
            # log every 50th offset to keep the output readable
            if message.offset % 50 == 0:
                logging.info( "[%s]:highwater:%d offset:%d part:%d <%s>" % (self.name, consumer.highwater(TopicPartition(message.topic, message.partition)),
                                                                         message.offset, message.partition, message.value))
            
#             for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
#                 print "t:%s %s" % (t,  ','.join('p:%d, offset:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)) # consumer.position(TopicPartition(topic=t, partition=p)))
            
            # if this is the first time the consumer is run
            # it contains no offsets in the redis map, so it has 
            # 0 elements in the map,
            # then insert a new offset in redis and populate
            # the local my_topics dict
            
            
            if len(self.my_topics[message.topic]) == 0:
                self.persist_offsets(message.topic, message.partition, message.offset)
                self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))
                continue
                
            # at or below the stored offset -> already processed, drop it
            #if self.my_topics[message.topic]['offset'] >= message.offset and self.my_topics[message.topic]['partition'] == message.partition:
            if self.my_topics[message.topic][str(message.partition)] >= message.offset:
                print '********************** processed previously...discarding %s %d' % (message.topic, message.offset)
            else:
                self.persist_offsets(message.topic, message.partition, message.offset)
            
        logging.info ('**********************************************done')
+
+
def test_queue_mechanism(mode, gid):
    """Demo of kafka queue semantics: consumers sharing one group_id split
    the work; killing one lets the survivor take over its partitions."""
    #
    # at the console, start a producer by typing python test_kafka.py P x 1
    # the 2nd param is consumer group which has no use for a producer
    # (so just specify some jibberish for the program to run)
    # the 3rd param is the position of the test function in the test cases list
    #
    # create a new console, start a consumer by typing python test_kafka.py C group1
    # create another instance of the consumer, again issuing the command
    # python test_kafka.py C group1
    #
    # Make sure the producer has the following settings when it is first created:
    # partition_assignment_strategy=[RoundRobinPartitionAssignor])
    #
    # watch the 2 instances consuming messages simultaneously
    # kill one of the consumer and view the other one picks up message topic originally
    # assigned to the now deceased consumer instance
    # 
    # the time it takes for the 2nd consumer to assume the role of the first consumer 
    # is determined by the variable session_timeout_ms = 10000,
    threads = []
    if mode == 'P':
        threads.append(Producer())
    else:    
        threads = [
            Consumer(name='c1', kwargs={'group_id':gid}),
            Consumer(name='c2', kwargs={'group_id':gid})
            ]

    for t in threads:
        t.start()   
+
def test_recovery(mode, gid):
    """Demo of offset recovery: a restarted consumer discards messages whose
    offsets were already persisted to redis before it was killed."""
    #
    # this demo requires redis library to 
    # persist the topic offsets in the redis database
    #
    # 1. start a producer by typing test_kafka.py P x 1
    # 2. start a consumer and let it run for awhile
    # 3. kill the consumer but leave the producer running
    # 4. write down the last saved topic offset in redis
    # 5. after awhile, restart the consumer 
    # 6. notice the output of the consumer, there should 
    # be some output that says messages are discarded because
    # they had been processed previously 
    # 
    # the consumer self.name has to be the same across 
    # re-runs in order for the recovery to work
    # in this example, it has been hard coded to 'c_test_recovery'
    #
    threads = []
    if mode == 'P':
        threads.append(Producer())
    else:
        
        threads = [
            ConsumerNextIteratorPersist(name='c_test_recovery', kwargs={'group_id':gid}),
            #Consumer(name='c2')
            ]

    for t in threads:
        t.start()
+
+
def test_recovery_discard_aged():
    """Placeholder: planned demo of discarding aged messages through an
    internal pub/sub callback pattern (not implemented yet)."""
    # internal pub/sub callback pattern
    #fire_event(f_msg, kv, age_factor)
    pass
+    
+def main():
+    
+    #
+    # test cases
+    #
+    tp = [test_queue_mechanism, test_recovery, test_recovery_discard_aged]
+    
+    if len(sys.argv) != 4:
+        print("Usage: %s <role(producer or consumer): P|C> <consumer group id> <test case #[0..1]>" % sys.argv[0])
+        print "\n".join('case #%d: %s' % (i, tp[i].__name__) for i in range(len(tp)))
+        print "example: python %s producer g2 1" % sys.argv[0]
+        print "example: python %s consumer g2 1" % sys.argv[0]
+        exit(-1)    
+
+    mode = sys.argv[1] 
+    gid = sys.argv[2] if sys.argv[2] <> None else "q-group"  
+
+    
+    tp[int(sys.argv[3])](mode, gid)
+
+    #time.sleep(30)
+    while 1:
+        try:
+            time.sleep(5)
+            pass
+        except (KeyboardInterrupt, SystemExit):
+                logging.error('caught user interrupt')
+                sys.exit(-1)
+
+    
+    
+
+if __name__ == "__main__":
+    logging.basicConfig(
+        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+        level=logging.INFO
+        )
+    main()

+ 29 - 20
comms/tws_client.py

@@ -8,10 +8,13 @@ import ConfigParser
 from time import sleep
 import time, datetime
 from threading import Lock
-from kafka.client import KafkaClient
+#from kafka.client import KafkaClient
+
+
+from kafka import KafkaProducer
 from kafka import KafkaConsumer
-from kafka.producer import SimpleProducer
-from kafka.common import LeaderNotAvailableError, ConsumerTimeout
+#from kafka.producer import SimpleProducer
+#from kafka.common import LeaderNotAvailableError, ConsumerTimeout
 import threading
 
 from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
@@ -35,22 +38,23 @@ class TWS_client_base_app(threading.Thread):
     def __init__(self, host, port, id=None):
 
         super(TWS_client_base_app, self).__init__()
-        client = KafkaClient('%s:%s' % (host, port))
-        self.producer = SimpleProducer(client, async=False)
+        #client = KafkaClient('%s:%s' % (host, port))
+        #self.producer = SimpleProducer(client, async=False)
+        self.producer = KafkaProducer(bootstrap_servers='%s:%s' % (host, port))
         
 
  
         # consumer_timeout_ms must be set - this allows the consumer an interval to exit from its blocking loop
-        self.consumer = KafkaConsumer( *[(v,0) for v in list(TWS_Protocol.topicEvents) + list(TWS_Protocol.gatewayEvents)] , \
-                                       metadata_broker_list=['%s:%s' % (host, port)],\
+        self.consumer = KafkaConsumer( *[v for v in list(TWS_Protocol.topicEvents) + list(TWS_Protocol.gatewayEvents)] , \
+                                       bootstrap_servers=['%s:%s' % (host, port)],\
                                        client_id = str(uuid.uuid1()) if id == None else id,\
                                        group_id = 'epc.group',\
-                                       auto_commit_enable=True,\
+                                       enable_auto_commit=True,\
                                        consumer_timeout_ms = 2000,\
                                        auto_commit_interval_ms=30 * 1000,\
-                                       auto_offset_reset='largest') # discard old ones
+                                       auto_offset_reset='latest') # discard old ones
         
-        self.reset_message_offset()
+        #self.reset_message_offset()
         
         #self.consumer.set_topic_partitions(('gw_subscriptions', 0, 114,),('tickPrice', 0, 27270,))
         self.command_handler= TWS_server_wrapper(self.producer)
@@ -94,7 +98,7 @@ class TWS_client_base_app(threading.Thread):
 #                 # the message is turned into IB compatible type before firing the callbacks
 #                 [f(self.convertItemsToIBmessage(message.value)) for f in self.reg_all_callbacks]
 
-            logging.info ('TWS_client_base_app: consumer_timeout_ms = %d' % self.consumer._config['consumer_timeout_ms']) 
+            logging.info ('TWS_client_base_app: consumer_timeout_ms = %d' % self.consumer.config['consumer_timeout_ms']) 
  
             # keep running until someone tells us to stop
             while self.stop_consumer == False:
@@ -105,8 +109,8 @@ class TWS_client_base_app(threading.Thread):
                         # it will raise a consumertimeout if no message is received after a pre-set interval   
                         message = self.consumer.next()
                         
-                        
-                        logging.debug("TWS_client_base_app: %s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
+                        if message.topic == 'gw_subscription_changed':
+                            logging.info("TWS_client_base_app: %s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                      message.offset, message.key,
                                                      message.value))
                         
@@ -120,7 +124,7 @@ class TWS_client_base_app(threading.Thread):
                         
                         #self.consumer.task_done(message)
                         
-                    except ConsumerTimeout:
+                    except: # ConsumerTimeout:
                         logging.info('TWS_client_base_app run: ConsumerTimeout. Check new message in the next round...')
                                         
                   
@@ -145,10 +149,10 @@ class TWS_client_base_app(threading.Thread):
 
         items = list(mapping.items())
         items.sort()
-        print(('### %s' % (msg_name, )))
-        for k, v in items:
-            print(('    %s:%s' % (k, v)))
-
+#         print(('### %s' % (msg_name, )))
+#         for k, v in items:
+#             print(('    %s:%s' % (k, v)))
+        print "TWS client base app: handlemessage<%s>: %s" % (msg_name, json.dumps(items))
     
     def ascii_encode_dict(self, data):
         ascii_encode = lambda x: x.encode('ascii') if isinstance(x, unicode) else x
@@ -163,6 +167,7 @@ class TWS_client_base_app(threading.Thread):
                 items['contract'] = ContractHelper.kv2contract(items['contract'])
             del(items['self'])
         except (KeyError, ):
+            #logging.error('convertItemsToIBmessage Exception: %s' % str(items))
             pass        
         return Message(**items)
     
@@ -296,7 +301,7 @@ class TWS_client_base_app(threading.Thread):
         self.handlemessage("tickEFP", items)
 
 
-###########################################################################3333
+###########################################################################
 #   Gateway respond events
 #
 #
@@ -305,6 +310,10 @@ class TWS_client_base_app(threading.Thread):
         self.handlemessage("gw_subscriptions", items)
 
 
+    def gw_subscription_changed(self, items):
+        print '************************'
+        self.handlemessage("gw_subscription_changed", items)
+
 #     def on_tickPrice(self, tickerId, field, price, canAutoExecute):
 #         self.handlemessage('tickPrice', vars())
 # 
@@ -519,7 +528,7 @@ class TWS_server_wrapper():
         
     def post_msg(self, topic, msg):
         logging.info('post_msg sending request to gateway: %s[%s]' % (topic,msg))
-        self.producer.send_messages(topic, msg)
+        self.producer.send(topic, msg)
 
 
 #############################################################33

BIN
comms/tws_client.pyc


+ 45 - 18
comms/tws_gateway.py

@@ -1,11 +1,7 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
-##
-# This script is an exmple of using the generated code within IbPy in
-# the same manner as the Java code.  We subclass EWrapper and give an
-# instance of the wrapper to an EClientSocket.
-##
+
 
 
 import sys
@@ -27,10 +23,14 @@ from ib.ext.Execution import Execution
 from ib.ext.OrderState import OrderState
 from ib.ext.Order import Order
 
-from kafka.client import KafkaClient
+from kafka import KafkaProducer
 from kafka import KafkaConsumer
-from kafka.producer import SimpleProducer
-from kafka.common import LeaderNotAvailableError
+from kafka.errors import KafkaError
+
+
+#from kafka.client import KafkaClient
+#from kafka.producer import SimpleProducer
+#from kafka.common import LeaderNotAvailableError
 
 from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
 from comms.ib_heartbeat import IbHeartBeat
@@ -48,8 +48,9 @@ class TWS_event_handler(EWrapper):
     
     def __init__(self, host, port):
         
-        client = KafkaClient('%s:%s' % (host, port))
-        self.producer = SimpleProducer(client, async=False)    
+        #client = KafkaClient()#{'bootstrap_servers': '%s:%s' % (host, port)})
+        #self.producer = SimpleProducer(client, async=False)
+        self.producer = KafkaProducer(bootstrap_servers='%s:%s' % (host, port))    
         logging.info('TWS_event_handler: __init__ Creating kafka client producer at %s:%s' % (host, port))
  
  
@@ -82,9 +83,9 @@ class TWS_event_handler(EWrapper):
 
         try:
             dict = self.serialize_vars_to_dict(message, mapping, source)     
-            if message == 'gw_subscriptions':   
+            if message == 'gw_subscriptions' or message == 'gw_subscription_changed':   
                 logging.info('TWS_event_handler: broadcast event: %s [%s]' % (dict['typeName'], dict))
-            self.producer.send_messages(message, json.dumps(dict))    
+            self.producer.send(message, json.dumps(dict))    
         except:
             logging.error('broadcast_event: exception while encoding IB event to client:  [%s]' % message)
             logging.error(traceback.format_exc())
@@ -95,7 +96,7 @@ class TWS_event_handler(EWrapper):
             if message == 'gw_subscriptions':   
                 sleep(2)
                 logging.info('TWS_event_handler: Retry once broadcasting gw_subscription %s [%s]' % (dict['typeName'], dict))
-                self.producer.send_messages(message, json.dumps(dict))    
+                self.producer.send(message, json.dumps(dict))    
             
             
 
@@ -315,14 +316,14 @@ class TWS_gateway(threading.Thread):
         
         
         logging.info('starting up client request handler - kafkaConsumer...')
-        self.cli_request_handler = KafkaConsumer( *[(v,0) for v in list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods) ], \
-                                   metadata_broker_list=['%s:%s' % (kafka_host, kafka_port)],\
+        self.cli_request_handler = KafkaConsumer( *[v for v in list(TWS_Protocol.topicMethods) + list(TWS_Protocol.gatewayMethods) ], \
+                                   bootstrap_servers=['%s:%s' % (kafka_host, kafka_port)],\
                                    group_id = 'epc.tws_gateway',\
-                                   auto_commit_enable=True,\
+                                   enable_auto_commit=True,\
                                    auto_commit_interval_ms=30 * 1000,\
-                                   auto_offset_reset='largest') # discard old ones
+                                   auto_offset_reset='latest') # discard old ones
         
-        self.reset_message_offset()
+        #self.reset_message_offset()
         
 
 
@@ -584,6 +585,22 @@ class TWS_gateway(threading.Thread):
         print subm
         if subm:
             self.tws_event_handler.broadcast_event('gw_subscriptions',  {'subscriptions': subm}, source='GW')
+            
+            
+       
+#####################################################################
+#    
+#    broadcast gateway notifications  
+    def gw_notify_subscription_changed(self, value): 
+        #
+        # this function is triggered by SubscriptionManager
+        # value param:
+        #
+        #     {id: contractkv_str}
+        #
+        logging.info("TWS_gateway:gw_notify_subscription_changed: %s" % value)
+        self.tws_event_handler.broadcast_event('gw_subscription_changed',  value, source='GW')
+        
         
 class SubscriptionManager():
     
@@ -653,8 +670,10 @@ class SubscriptionManager():
             #
             # the conId must be set to zero when calling TWS reqMktData
             # otherwise TWS will fail to subscribe the contract
+            
             self.parent.connection.reqMktData(id, contract, '', False) 
             
+            
                    
             if self.persist_f:
                 logging.debug('SubscriptionManager reqMktData: trigger callback')
@@ -666,6 +685,14 @@ class SubscriptionManager():
             self.parent.connection.reqMktData(1000 + id, contract, '', True)
             logging.info('SubscriptionManager: reqMktData: contract already subscribed. Request snapshot = %d, contract = %s' % (id, ContractHelper.makeRedisKeyEx(contract)))
         #self.dump()
+
+        #
+        # instruct gateway to broadcast new id has been assigned to a new contract
+        #
+        self.parent.gw_notify_subscription_changed({id: ContractHelper.object2kvstring(contract)})
+        logging.info('SubscriptionManager reqMktData: gw_notify_subscription_changed: %d:%s' % (id, ContractHelper.makeRedisKeyEx(contract)))
+        
+        
         
 #     def makeStkContract(self, contractTuple):
 #         newContract = Contract()

+ 1 - 1
comms/tws_protocol_helper.py

@@ -18,7 +18,7 @@ class TWS_Protocol:
     
     
     gatewayMethods = ('gw_req_subscriptions',)
-    gatewayEvents = ('gw_subscriptions',)
+    gatewayEvents = ('gw_subscriptions', 'gw_subscription_changed')
     
     oceMethods = ()
     oceEvents = ('optionAnalytics','optionsSnapshot')

BIN
comms/tws_protocol_helper.pyc


+ 7 - 7
config/app.cfg

@@ -7,8 +7,8 @@ server.socket_port: 8091
 redis.server: "localhost"
 redis.port: 6379
 # db - 0 production larry046, 3 development, 2 production mchan927
-#redis.db: 3
-redis.db:2
+redis.db: 3
+
 
 redis.sleep: 0.5
 redis.datastore.key.option_implv: 'opt_implv'
@@ -62,7 +62,7 @@ opt_serve.logconfig: "{'filename': '/home/larry-13.04/workspace/finopt/log/serve
 
 
 [cep]
-kafka.host: 'vsu-01'
+kafka.host: 'localhost'
 kafka.port: 9092
 kafka.ib.topic.tick_price: 'ib_tick_price'
 kafka.ib.topic.tick_size: 'ib_tick_size'
@@ -74,11 +74,11 @@ hkex.openhours: '{"morning":[915,1200], "afternoon":[1300,1615]}'
 
 
 
-ib.gateway: '127.0.0.1'
-#ib.gateway: 'vsu-01'
+#ib.gateway: '127.0.0.1'
+ib.gateway: 'vsu-bison'
 
 # 7496 - production larry046, 7496 - development,  8496 production mchan927
-ib.port: 8496
+ib.port: 7496
 #gw port
 #ib.port:4001
 
@@ -114,7 +114,7 @@ msg_bot.redis_prefix: 'alert_bot'
 msg_bot.logconfig: "{'level': logging.INFO}"
 
 [epc]
-kafka.host: 'vsu-01'
+kafka.host: 'localhost'
 kafka.port: 9092
 
 [ib_mds]

+ 1 - 0
config/iphist.dat

@@ -0,0 +1 @@
+223.16.226.61

+ 102 - 0
finopt/instrument.py

@@ -0,0 +1,102 @@
+import logging
+from ib.ext.Contract import Contract
+from misc2.helpers import ContractHelper, dict2str
+
class Symbol():
    """An instrument wrapper: an IB Contract plus its live tick values and
    arbitrary extra attributes, identified by a redis-style contract key.
    """

    def __init__(self, contract):
        # contract: IB Contract object describing this instrument
        self.contract = contract
        # tick field id -> latest value received for that field
        self.tick_values = {}
        # free-form attributes attached by consumers of this object
        self.extra = {}
        # canonical string key derived from the contract,
        # e.g. 'QQQ-20170217-127.00-C-OPT-USD-SMART-102'
        self.key = ContractHelper.makeRedisKeyEx(contract)

    def set_tick_value(self, id, price):
        self.tick_values[id] = price

    def get_tick_value(self, id):
        # Return None when this tick id has not been seen yet.
        # (Was a bare `except:` around a dict lookup -- dict.get is the
        # idiomatic equivalent and no longer swallows unrelated errors.)
        return self.tick_values.get(id)

    def get_contract(self):
        return self.contract

    def get_tick_values(self):
        return self.tick_values

    def set_extra_attributes(self, id, val):
        self.extra[id] = val

    def get_extra_attributes(self):
        return self.extra

    def get_key(self):
        return self.key
+    
class Option(Symbol):
    """A Symbol specialization that additionally carries option analytics
    (greeks, implied volatility and premium) computed elsewhere.
    """

    # class-level default; each instance replaces it with its own dict
    # inside set_analytics() on first use
    analytics = None

    # keys into the analytics dict
    IMPL_VOL = 'imvol'
    DELTA    = 'delta'
    GAMMA    = 'gamma'
    THETA    = 'theta'
    VEGA     = 'vega'
    PREMIUM  = 'npv'

    # tick ids of interest:
    # [0,1,2,3,4,5,6,7,8,9,14,5001,5002,5003,5004,5005,5006]

    def __init__(self, contract):
        Symbol.__init__(self, contract)
        # -1.0 marks "not yet calculated"
        self.set_analytics(-1.0, -1.0, -1.0, -1.0, -1.0, -1.0)

    def set_analytics(self, imvol=None, delta=None, gamma=None, theta=None, vega=None, npv=None):
        """Store one complete set of analytics values on this instance."""
        if self.analytics is None:
            self.analytics = {}
        self.analytics[Option.IMPL_VOL] = imvol
        self.analytics[Option.DELTA] = delta
        self.analytics[Option.GAMMA] = gamma
        self.analytics[Option.THETA] = theta
        self.analytics[Option.VEGA] = vega
        self.analytics[Option.PREMIUM] = npv

    def get_analytics(self):
        return self.analytics

    def object2kvstring(self):
        """Serialize this option to a JSON-style string; None on failure."""
        try:
            kv = self.object2kv()
            return '{"%s":%s, "%s":%s, "%s":%s, "%s":%s}' % ('analytics', dict2str(kv['analytics']),
                                                    'contract', ContractHelper.contract2kvstring(self.get_contract()),
                                                    'tick_values', dict2str(kv['tick_values']),
                                                    'extra', dict2str(kv['extra']))
        except Exception:
            # was a bare `except:` that discarded the detail; log the
            # traceback and let KeyboardInterrupt/SystemExit propagate
            logging.exception('Exception Option.object2kvstring')

        return None

    def object2kv(self):
        """Return the option state as a plain dict, or None on failure."""
        try:
            return {'analytics': self.get_analytics(),
                    'contract': self.get_contract(),
                    'tick_values': self.get_tick_values(),
                    'extra': self.get_extra_attributes()}
        except Exception:
            logging.exception('Exception Option.object2kv')

        return None

BIN
finopt/instrument.pyc


+ 293 - 139
finopt/options_chain.py

@@ -7,6 +7,8 @@ import logging
 import threading
 from ib.ext.Contract import Contract
 from misc2.helpers import ContractHelper, dict2str
+from instrument import Symbol, Option
+from misc2.observer import Publisher, Subscriber 
 #from misc2.helpers import ContractHelper, OrderHelper, ExecutionFilterHelper
 from comms.tws_client import SimpleTWSClient
 from time import sleep
@@ -15,105 +17,12 @@ import optcal
 import traceback
 import redis
 
-class Symbol():
-    
-    def __init__(self, contract):
-        self.contract = contract
-        self.tick_values = {}
-        self.extra = {}    
-    
-    def set_tick_value(self, id, price):
-        self.tick_values[id] = price
-
-    def get_tick_value(self, id):
-        try:
-            
-            return self.tick_values[id]
-    
-        except:
-            
-            return None
-
-    def get_contract(self):
-        return self.contract
-    
-    def get_tick_values(self):
-        return self.tick_values
-    
-    def set_extra_attributes(self, id, val):
-        self.extra[id] = val
-            
-    def get_extra_attributes(self):
-        return self.extra
-    
-class Option(Symbol):
-    
-    analytics = None
-    IMPL_VOL = 'imvol'
-    DELTA    = 'delta'
-    GAMMA    = 'gamma'
-    THETA    = 'theta'
-    VEGA     = 'vega'
-    PREMIUM  = 'npv'
-    
-    
-    #[0,1,2,3,4,5,6,7,8,9,14,5001,5002,5003,5004,5005,5006]
-        
-    def __init__(self, contract):
-        Symbol.__init__(self, contract)
-        
-        self.set_analytics(-1.0, -1.0, -1.0, -1.0, -1.0, -1.0)
-
-        
-    def set_analytics(self, imvol=None, delta=None, gamma=None, theta=None, vega=None, npv=None):
-        
-        
-        if self.analytics == None:
-            self.analytics = {}           
-        self.analytics[Option.IMPL_VOL] = imvol
-        self.analytics[Option.DELTA] = delta 
-        self.analytics[Option.GAMMA] = gamma
-        self.analytics[Option.THETA] = theta
-        self.analytics[Option.VEGA] = vega
-        self.analytics[Option.PREMIUM] = npv
-        
-        
-    def get_analytics(self):
-        return self.analytics
-    
-    
-    def object2kvstring(self):
-        
-        try:           
-            kv = self.object2kv()
-            return '{"%s":%s, "%s":%s, "%s":%s, "%s":%s}' % ('analytics', dict2str(kv['analytics']), 
-                                                    'contract', ContractHelper.contract2kvstring(self.get_contract()), 
-                                                    'tick_values', dict2str(kv['tick_values']),
-                                                    'extra', dict2str(kv['extra']))
-        except:
-            logging.error( 'Exception Option.object2kvstring')
-               
-        return None
-    
-    
-    def object2kv(self):
-        try:
-            analytics = self.get_analytics()
-            contract =  self.get_contract()
-            tick_values = self.get_tick_values()
-            extra = self.get_extra_attributes()
-            return {'analytics': analytics, 'contract': contract, 'tick_values': tick_values, 'extra': extra}            
-        except:
-            logging.error( 'Exception Option.object2kv')
-               
-        return None
-        
             
     
                
         
 
-class OptionsChain():
+class OptionsChain(Publisher):
     underlying = None
     spd_size = None
     multiplier = None
@@ -129,11 +38,14 @@ class OptionsChain():
     trade_vol = None
     #iv = optcal.cal_implvol(spot, contract.m_strike, contract.m_right, today, contract.m_expiry, rate, div, vol, premium)
     
+    option_chain_events = ('on_option_added', 'on_option_deleted', 'on_option_updated')
     
     def __init__(self, id):
         self.id = id
         self.options = []
+        Publisher.__init__(self, OptionsChain.option_chain_events)
         
+
     
     def get_id(self):
         return self.id
@@ -144,6 +56,8 @@ class OptionsChain():
     def set_underlying(self, contract):
         #self.underlying = contract
         self.underlying = Symbol(contract)
+
+        
         
     def set_spread_table(self, spd_size, multiplier):
         self.spd_size = spd_size
@@ -174,8 +88,20 @@ class OptionsChain():
         upper_limit = undlypx * (1 + bound)
         lower_limit = undlypx * (1 - bound)          
         
+        
+        
         base_opt_contract = json.loads(ContractHelper.object2kvstring(self.get_underlying().get_contract()))
-
+        
+        #
+        #     notify listener(s) the option's underlying
+        #     allowing the listeners to store the reference to OptionsChain underlying 
+        #
+        self.dispatch(OptionsChain.option_chain_events[0], self.get_underlying())
+        #
+        #
+        #
+        
+        
         #for i in self.xfrange(int(undlypx), int(upper_limit ), self.spd_size):
         for i in self.xfrange(undlypx, upper_limit, self.spd_size):
 
@@ -184,30 +110,34 @@ class OptionsChain():
             base_opt_contract['m_expiry'] = self.expiry
             base_opt_contract['m_right'] = 'C'
             base_opt_contract['m_multiplier'] = self.multiplier
-            #self.options.append(ContractHelper.kv2object(base_opt_contract, Contract))
-            self.options.append(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
+            
+            #self.options.append(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
+            self.add_option(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
             
             base_opt_contract['m_right'] = 'P'
             #self.options.append(ContractHelper.kv2object(base_opt_contract, Contract))
-            self.options.append(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
+            #self.options.append(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
+            self.add_option(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
  
-        for i in self.xfrange(undlypx - self.spd_size, lower_limit, -self.spd_size):             
+        
+        for i in self.xfrange(undlypx - self.spd_size, lower_limit, -self.spd_size):      
+            #print i, lower_limit
             base_opt_contract['m_secType'] = 'OPT'
             base_opt_contract['m_strike'] = i
             base_opt_contract['m_expiry'] = self.expiry
             base_opt_contract['m_right'] = 'C'
             base_opt_contract['m_multiplier'] = self.multiplier
             #self.options.append(ContractHelper.kv2object(base_opt_contract, Contract))
-            self.options.append(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
+            #self.options.append(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
+            self.add_option(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
              
             base_opt_contract['m_right'] = 'P'
             #self.options.append(ContractHelper.kv2object(base_opt_contract, Contract))
-            self.options.append(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
+            #self.options.append(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
+            self.add_option(Option(ContractHelper.kv2object(base_opt_contract, Contract)))
         
-#         print '------------####'     
-#         for c in self.options:
-#              #print ContractHelper.contract2kvstring(c)
-#              print c.option2kv()
+        
+
         
 
     def xfrange(self, start, stop=None, step=None):
@@ -232,8 +162,12 @@ class OptionsChain():
         return self.options
 
         
-    def add_option(self, kvc):
-        pass
+    def add_option(self, option):
+        #events = ('on_option_added', 'on_option_deleted', 'on_option_updated')
+        #
+        # 
+        self.options.append(option)
+        self.dispatch(OptionsChain.option_chain_events[0], option)
     
     
     def pretty_print(self):
@@ -274,10 +208,19 @@ class OptionsChain():
                                                format_tick_val(x[1].get_analytics()[Option.DELTA], fmt_spec2),
                                                format_tick_val(x[1].get_analytics()[Option.THETA], fmt_spec2),                    
                                                )), sorted_put)
-        title = '%s%30s%s' % ('-' * 40, ContractHelper.makeRedisKeyEx(self.get_underlying().get_contract()).center(50, ' '), '-' * 40) 
+        
+        undlypx = '%s,%s,%s,%s,%s' % (format_tick_val(self.get_underlying().get_tick_value(4), fmt_spec), 
+                                  format_tick_val(self.get_underlying().get_tick_value(0), fmt_specq),
+                                           format_tick_val(self.get_underlying().get_tick_value(1), fmt_spec),
+                                           format_tick_val(self.get_underlying().get_tick_value(2), fmt_spec),
+                                           format_tick_val(self.get_underlying().get_tick_value(3), fmt_spec)
+                                )
+        
+        #title = '%s%30s%s%s' % ('-' * 40, ContractHelper.makeRedisKeyEx(self.get_underlying().get_contract()).center(50, ' '), undlypx, '-' * 40) 
+        title = '%s%30s%s%s' % ('-' * 41, ContractHelper.makeRedisKeyEx(self.get_underlying().get_contract()).center(42, ' '), undlypx, '-' * 27)
         header = '%8s|%8s|%8s|%8s|%8s|%8s|%8s|%8s |%8s| %8s|%8s|%8s|%8s|%8s|%8s|%8s|%8s' % ('last', 'bidq', 'bid', 'ask', 'askq', 'ivol', 'delta', 'theta', 'strike', 'last', 'bidq', 'bid', 'ask', 'askq', 'ivol', 'delta', 'theta')
         combined = map(lambda i: '%s |%8.2f| %s' % (fmt_call[i][1], fmt_put[i][0], fmt_put[i][1]), range(len(fmt_call)) )
-        footer = '%s' % ('-' * 130) 
+        footer = '%s' % ('-' * 154) 
         print title
         print header
         for e in combined:
@@ -338,6 +281,54 @@ class OptionsCalculationEngine(SimpleTWSClient):
     rs = None
     rs_oc_prefix = None
     
+    
+    """
+        symbols= {}
+        tickers={}
+        optionChain->id, [optsym1,optsym2]
+        
+        
+        
+        symbols[symbol_key]={'ticker_id': None, 'syms': [symbol1, symbol2...symbolN]}
+        tickers[ticker_id]=symbol_key
+        
+        
+        oce = OptionCalEngine('OCE')
+        oce.start()
+        
+        oc = OptionChain('QQQ')
+        oce.register(oc) # listen for oc events
+        
+        
+        o1 = Option(ContractHelper.makeContract(contractTuple))
+        oc.add_option(o1) --> notify listeners' update method
+        
+        #oce
+        def update(method, option):
+            key = ContractHelper.makeRedisKeyEx(option)
+            symbols[key]['ticker_id'] = -1
+        
+            symbols[key]['syms'].append(option)
+        
+        
+        def gw_subscription_changed(param):
+            ticker_id = param[ticker_id]
+            symbol_key= param[symbol_key]
+            symbols[key]['ticker_id'] = ticker_id
+            tickers[ticker_id] = key
+        
+        def tick_price(vars):
+        
+            key= tickers[vars['id']]
+            for o in symbols[key]['syms']:
+                # update message values 
+    
+    """
+    
+    
+    
+    
+    
     def __init__(self, config): #host, port, id=None):
         
         
@@ -627,7 +618,173 @@ class OptionsCalculationEngine(SimpleTWSClient):
     #def broadcast_chain_snapshots(self):
         
 
+
+class OCConsumer(Subscriber):
+    # NOTE(review): class attributes -- these dicts are shared across ALL
+    # OCConsumer instances (Python class-level state), not per-instance.
+    # Acceptable for a single consumer; move into __init__ if more than one
+    # independent consumer is ever created.
+    symbols = {}
+    tickers = {}
+    
+    """
+    
+    Data structure:
+        tickers map contains key value pairs of ticker id mapped to Symbol primary key
+        tickers => {id1: key1, id2:key2...}
+        
+        example: tickers = {9: 'QQQ-20170217-127.00-C-OPT-USD-SMART-102'
+                            43: 'QQQ-20170217-124.00-C-OPT-USD-SMART-102' ...}
+                            
+        symbols map contains key value pairs of Symbol primary key mapped to a dict object.
+        The dict object contains the ticker id and a list of Symbol objects associated with ticker_id
+        symbols => {key1: 
+                        { 'ticker_id': id1, 
+                          'syms' : [<object ref to Symbol1>,<object ref to Symbol2>...]
+                        }
+                    key2:
+                        ...
+                   }
+        
+        example: symbols = {'QQQ-20170217-127.00-C-OPT-USD-SMART-102':
+                                {'ticker_id': 9, 
+                                 'syms': [<object ref to Symbol QQQ>, ...]
+                                }
+                            }
+                            
+        Usage:
+        Given a ticker_id, the Symbol key can be looked up from tickers
+        With the Symbol key obtained, the reference to the actual object associated with the ticker_id can be retrieved
+        by looking up from symbols[key]['syms']
+        
+        speed: 2 x O(1) + n
+    
+    
+    """
+
+    def __init__(self, name):
+        # Old-style class hierarchy: call the base initializer explicitly
+        # (no super()) so Subscriber records this consumer's name.
+        Subscriber.__init__(self, name)
+    
+    def dump(self):
+        # Debug aid: log every symbols entry as [key:ticker_id:#options],
+        # then the raw ticker_id -> key map.
+            #print ', '.join('[%s:%s]' % (k, v['ticker_id'])) 
+        logging.debug('OCConsumer-symbols: [Key: Ticker ID: # options objects]: ---->\n%s' % (',\n'.join('[%s:%d:%d]' % (k, v['ticker_id'],len(v['syms'])) for k,v in self.symbols.iteritems())))
+        logging.debug('OCConsumer-tickers: %s' % self.tickers)
+        
+    def on_option_chain_changed(self, message, symbol=None):
+        # Option-chain event callback: index the Symbol under its contract
+        # key.  ticker_id stays -1 until gw_subscription_changed confirms the
+        # gateway subscription for this key.
+        # NOTE(review): symbol.get_key() is called unconditionally, so a None
+        # symbol would raise AttributeError despite the default -- confirm
+        # callers always pass a symbol.
+        key = symbol.get_key()
+        #print key
+        if key not in self.symbols:
+            self.symbols[key]= {'ticker_id': -1, 'syms': []}
+#         self.symbols[key]['ticker_id'] = -1
+        self.symbols[key]['syms'].append(symbol)        
+        logging.debug('OCConsumer: update event %s: %s %s' % (self.name,message, "none" if not symbol else symbol.get_key()))
+    
+        
+    def tickPrice(self, items):   
+        # Gateway tick handler: resolve the incoming tickerId to its contract
+        # key, then push the (field, price) pair into every Symbol registered
+        # under that key.  Ticks for unknown ticker ids are silently dropped.
+        
+        tid = items['tickerId']
+        #print tid
+        if tid in self.tickers:
+            contract_key = self.tickers[tid]
+            #print contract_key
+            for e in self.symbols[contract_key]['syms']:
+                e.set_tick_value(items['field'], items['price'])
+            
+        
+    def gw_subscription_changed(self, items):
+        # Rebuild the two-way ticker_id <-> contract-key mappings from the
+        # gateway's subscription table (ticker id -> contract key/value dict).
+        # Contract keys never seen via on_option_chain_changed are ignored.
+        # <class 'comms.tws_protocol_helper.Message'>
+        # sample
+        #{0: {'contract': <ib.ext.Contract.Contract object at 0x7ff8f8c9e210>}, 1: {'contract': <ib.ext.Contract.Contract object at 0x7ff8f8c9e250>},... }
+        #print items.__dict__['subscriptions']
+        """
+        {0: {u'm_conId': 0, u'm_right': u'', u'm_symbol': u'QQQ', 
+        u'm_secType': u'STK', u'm_includeExpired': False, 
+        u'm_expiry': u'', u'm_currency': u'USD', u'm_exchange': u'SMART', u'm_strike': 0}, 
+        1: {u'm_conId': 0, u'm_right': u'C', u'm_symbol': u'QQQ', u'm_secType': u'OPT', 
+        u'm_includeExpired': False, u'm_multiplier': 100, u'm_expiry': u'20170217', u'm_currency': u'USD', u'm_exchange': u'SMART', u'm_strike': 125.0}, 
+        2: {u'm_conId': 0, u'm_right': u'P', u'm_symbol': u'QQQ', u'm_secType': u'OPT', u'm_includeExpired': False, u'm_multiplier': 100, 
+        u'm_expiry': u'20170217', u'm_currency': u'USD', u'm_exchange': u'SMART', u'm_strike': 125.0}, 
+        ...
+         
+        78: {u'm_conId': 0, u'm_right': u'P', u'm_symbol': u'QQQ', u'm_secType': u'OPT', 
+        u'm_includeExpired': False, u'm_multiplier': 100, u'm_expiry': u'20170217', 
+        u'm_currency': u'USD', u'm_exchange': u'SMART', u'm_strike': 115.5}}
+        tickPrice>> [0:QQQ--0.00--STK-USD-SMART-102] bid_q=-1.0000 [2017-01-28 12:08:49.587014]
+
+        """
+        #l = map(lambda x: {x[0]: {'contract': x[1]}}, map(lambda x: (x[0], ContractHelper.kvstring2object(x[1], Contract)), items)) #items.__dict__['subscriptions']))
+        #l = map(lambda x: {x[0]: x[1]}, map(lambda x: (x[0], json.loads(x[1])), items.__dict__['subscriptions']))
+
+        for tid, con in items.iteritems():
+            contract = ContractHelper.kv2contract(con)
+            key = ContractHelper.makeRedisKeyEx(contract)
+            if key in self.symbols: 
+                self.symbols[key]['ticker_id'] = tid
+                self.tickers[tid] = key
+        
         
+                
+        
+         
+              
+              
+              
+def unit_test1():
+    """Smoke test: build a QQQ option chain, wire an OCConsumer to its
+    events, replay a recorded gw_subscription_changed message, then feed
+    mock ticks and dump / pretty-print the resulting state."""
+    
+    # Mock file format: '#'-prefixed lines are comments, fields are
+    # '|'-separated; pick the recorded gw_subscription_changed payload.
+    # NOTE(review): file handle is never closed.
+    fn = open('../../data/mock_msg/mock_msg.txt')
+    lines = map(lambda x: x.split('|'), filter(lambda x: x[0] <> '#', fn.readlines()))
+    mock_msg_str = filter(lambda x: x[0] == 'gw_subscription_changed', lines)[0]
+    #print mock_msg_str
+    # SECURITY: eval() of file contents -- acceptable only because this is
+    # trusted local test data; never point it at untrusted input.
+    mock_msg = eval(mock_msg_str[1])
+
+    
+    dc = OCConsumer('dummy consumer')
+
+
+
+    expiry = '20170217'
+    contractTuple = ('QQQ', 'STK', 'SMART', 'USD', '', 0, '')
+    contract = ContractHelper.makeContract(contractTuple)  
+    oc2 = OptionsChain('qqq-%s' % expiry)
+    oc2.set_option_structure(contract, 0.5, 100, 0.0012, 0.0328, expiry)
+
+
+    # subscribe dc's on_option_chain_changed callback to every chain event type
+    for i in range(len(OptionsChain.option_chain_events)):
+        oc2.register(OptionsChain.option_chain_events[i], dc, dc.on_option_chain_changed)
+        
+
+    
+    oc2.build_chain(125, 0.02, 0.22)
+#     for c in oc2.get_option_chain():
+#         print '%s' % ContractHelper.makeRedisKeyEx(c.get_contract())
+
+
+
+    # add one call and one put outside the generated chain
+    for right in ['C','P']:    
+        optionTuple = ('QQQ', 'OPT', 'SMART', 'USD', expiry, 130.5, right)
+        o = Option(ContractHelper.makeContract(optionTuple))
+        oc2.add_option(o)
+#     optionTuple = ('HSI', 'OPT', 'HKFE', 'HKD', far_expiry, 23000, 'C')
+#     o = Option(ContractHelper.makeContract(optionTuple))
+#     oc2.add_option(o)
+    
+    
+    # replay the recorded subscription snapshot to populate the ticker maps
+    dc.gw_subscription_changed(mock_msg)
+     
+    # feed mock ticks for a mix of known and unknown ticker ids (unknown
+    # ones should be silently ignored by tickPrice)
+    mock_items= {'field':4, 'typeName':'tickPrice', 'price':1.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}
+    dc.tickPrice(mock_items)
+    mock_items= {'field':4, 'typeName':'tickPrice', 'price':125.82, 'ts':1485661437.83, 'source':'IB', 'tickerId':0, 'canAutoExecute':0}
+    dc.tickPrice(mock_items)
+    mock_items= {'field':2, 'typeName':'tickPrice', 'price':125.72, 'ts':1485661437.83, 'source':'IB', 'tickerId':0, 'canAutoExecute':0}
+    dc.tickPrice(mock_items)
+    mock_items= {'field':1, 'typeName':'tickPrice', 'price':124.72, 'ts':1485661437.83, 'source':'IB', 'tickerId':0, 'canAutoExecute':0}
+    dc.tickPrice(mock_items)
+    
+    mock_items= {'field':4, 'typeName':'tickPrice', 'price':1.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':3, 'canAutoExecute':0}
+    dc.tickPrice(mock_items)
+    mock_items= {'field':2, 'typeName':'tickPrice', 'price':1.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':5, 'canAutoExecute':0}
+    dc.tickPrice(mock_items)
+    mock_items= {'field':1, 'typeName':'tickPrice', 'price':1.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':10, 'canAutoExecute':0}
+    dc.tickPrice(mock_items) 
+    dc.dump()     
+    oc2.pretty_print()      
+     
     
 if __name__ == '__main__':
     
@@ -643,42 +800,39 @@ if __name__ == '__main__':
   
     
     logconfig = eval(config.get("options_chain", "options_calculation_engine.logconfig").strip('"').strip("'"))
-    logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'    
+    logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
+    # NOTE(review): hard-wiring DEBUG overrides any level supplied by the
+    # config file's logconfig -- confirm this is intentional.
+    logconfig['level'] = logging.DEBUG
    logging.basicConfig(**logconfig)        
        
    
-    contractTuple = ('QQQ', 'STK', 'SMART', 'USD', '', 0, '')
-    contract = ContractHelper.makeContract(contractTuple)  
-    oc = OptionsChain('QQQ-MAR24')
-    oc.set_option_structure(contract, 2.5, 100, 0.005, 0.003, '20160324')
-    oc.build_chain(98.0, 0.025, 0.25)
-    for c in oc.get_option_chain():
-        print '%s' % ContractHelper.makeRedisKeyEx(c.get_contract())
+#     contractTuple = ('QQQ', 'STK', 'SMART', 'USD', '', 0, '')
+#     
+#     contract = ContractHelper.makeContract(contractTuple)  
+#     oc = OptionsChain('QQQ-MAR24')
+#     
+#     oc.set_option_structure(contract, 0.5, 100, 0.005, 0.003, '20160324')
+#     oc.build_chain(98.0, 0.025, 0.25)
+#     
+#     for c in oc.get_option_chain():
+#         print '%s' % ContractHelper.makeRedisKeyEx(c.get_contract())
+    
+    # run the standalone OCConsumer / OptionsChain smoke test
+    unit_test1()
     
 
-    near_expiry = '20160226'
-    contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', near_expiry, 0, '')
-    contract = ContractHelper.makeContract(contractTuple)  
-    oc1 = OptionsChain('HSI-%s' % near_expiry)
-    oc1.set_option_structure(contract, 200, 50, 0.0012, 0.0328, near_expiry)
-    oc1.build_chain(19200, 0.08, 0.219)
-    for c in oc1.get_option_chain():
-        print '%s' % ContractHelper.makeRedisKeyEx(c.get_contract())
+#     near_expiry = '20160226'
+#     contractTuple = ('HSI', 'FUT', 'HKFE', 'HKD', near_expiry, 0, '')
+#     contract = ContractHelper.makeContract(contractTuple)  
+#     oc1 = OptionsChain('HSI-%s' % near_expiry)
+#     oc1.set_option_structure(contract, 200, 50, 0.0012, 0.0328, near_expiry)
+#     oc1.build_chain(19200, 0.08, 0.219)
+#     for c in oc1.get_option_chain():
+#         print '%s' % ContractHelper.makeRedisKeyEx(c.get_contract())
 
 
 
    
-    oce = OptionsCalculationEngine(config)
-#    oce.add_chain(oc)
-    oce.add_chain(oc1)
-    oce.add_chain(oc2)
-    oce.run_server()
+#     oce = OptionsCalculationEngine(config)
+# #    oce.add_chain(oc)
+#     oce.add_chain(oc1)
+#     oce.add_chain(oc2)
+#     oce.run_server()

BIN
finopt/options_chain.pyc


+ 0 - 53
finopt/test1.py

@@ -1,53 +0,0 @@
-# -*- coding: utf-8 -*-
-import threading, logging, time
-
-from kafka.client import KafkaClient
-from kafka.consumer import SimpleConsumer
-from kafka.producer import SimpleProducer
-
-class Producer(threading.Thread):
-    daemon = True
-
-    def run(self):
-        client = KafkaClient("vsu-01:9092")
-        producer = SimpleProducer(client)#, async=True)
-
-        while True:
-            producer.send_messages('my.topic', "this is a test")
-            producer.send_messages('my.topic', "\xc2Hola, bravo!")
-
-            
-            time.sleep(1)
-
-
-class Consumer(threading.Thread):
-    daemon = True
-
-    def run(self):
-        client = KafkaClient("vsu-01:9092")
-        consumer = SimpleConsumer(client, "test-group", "my.price")
-
-        for message in consumer:
-            
-            print(message)
-
-def main():
-    threads = [
-        Producer(),
-        Consumer()
-    ]
-
-    for t in threads:
-        t.start()
-
-    #time.sleep(5)
-    while 1:
-        pass
-    
-
-if __name__ == "__main__":
-    logging.basicConfig(
-        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
-        level=logging.INFO
-        )
-    main()

+ 87 - 0
finopt/test_pattern.py

@@ -0,0 +1,87 @@
+from misc2.helpers import ContractHelper
+
+# Minimal observer for the pattern demo: holds a name and prints whatever
+# message it receives.
+class Subscriber:
+    def __init__(self, name):
+        self.name = name
+    def update(self, message):
+        # default callback used by Publisher.register when none is supplied
+        print('{} got message "{}"'.format(self.name, message))
+        
+# Event hub: per-event map of subscriber -> callback; dispatch fans a
+# message out to every callback registered for that event.
+class Publisher:
+    def __init__(self, events):
+        # maps event names to subscribers
+        # str -> dict
+        self.events = { event : dict()
+                          for event in events }
+    def get_subscribers(self, event):
+        # raises KeyError for an event name not declared at construction
+        return self.events[event]
+    def register(self, event, who, callback=None):
+        # default to the subscriber's update() method when no callback given
+        if callback == None:
+            callback = getattr(who, 'update')
+        self.get_subscribers(event)[who] = callback
+    def unregister(self, event, who):
+        del self.get_subscribers(event)[who]
+    def dispatch(self, event, message):
+        # callbacks here receive only the message (cf. misc2/observer.py,
+        # whose dispatch passes (event, params))
+        for subscriber, callback in self.get_subscribers(event).items():
+            callback(message)
+            
+            
+# Toy base class used to demo multiple inheritance / MRO below.
+class SomeAbstraction( object ):
+    xx = None
+    def __init__(self):
+        # shadow the class attribute with an instance value
+        self.xx = 20
+    
+
+# Mixin supplying one concrete something() implementation.
+class Mixin1( object ):
+    def something( self ):
+        print 'mixin1' # one implementation
+
+# Alternative mixin: a deliberate no-op something().
+class Mixin2( object ):
+    def something( self ):
+        pass # another
+
+# Inherits xx/__init__ from SomeAbstraction and something() from Mixin1.
+class Concrete1( SomeAbstraction, Mixin1 ):
+    pass
+
+# Demo publisher: behaves exactly like Publisher, plus echoing the
+# freshly-built event map at construction.
+class Producer(Publisher):
+    def __init__(self, events):
+        Publisher.__init__(self, events)
+        print self.events
+
+# Demo subscriber that overrides update() and also picks up something()
+# from Mixin1.
+class Consumer(Subscriber, Mixin1):
+    def __init__(self, name):
+        Subscriber.__init__(self, name)
+    def update(self, message):
+        # overrides Subscriber.update to show polymorphic dispatch
+        print('override %s' % message)
+
+
+def test_contracthelper():
+    # Exercise ContractHelper.is_equal: two identically-built contracts,
+    # then a contract against a different symbol.
+    contractTuple = ('QQQ', 'STK', 'SMART', 'USD', '', 0, '')
+     
+    c1 = ContractHelper.makeContract(contractTuple)  
+    contractTuple = ('QQQ', 'STK', 'SMART', 'USD', '', 0, '')
+    c2 = ContractHelper.makeContract(contractTuple)
+    
+    print ContractHelper.is_equal(c1, c2)
+
+    # NOTE(review): chained assignment makes c2 the raw tuple, NOT a
+    # Contract built via makeContract -- likely a typo; is_equal then
+    # compares a Contract against a plain tuple.
+    c2 = contractTuple = ('XQQQ', 'STK', 'SMART', 'USD', '', 0, '')
+    print ContractHelper.is_equal(c1, c2)
+    
+    
+if __name__ == '__main__':
+
+
+    
+    # MRO demo: Concrete1 resolves something() from Mixin1
+    sa = SomeAbstraction()
+    mx = Mixin1()
+    c = Concrete1()#sa, mx)
+    c.something()
+    print c.xx
+    
+    # observer demo: register bb for 'e1' and dispatch one message
+    p = Producer(['e1', 'e2'])
+    p.get_subscribers('e1')
+    bb = Consumer('bb')
+
+    p.register('e1', bb)
+    p.get_subscribers('e1')
+    p.dispatch('e1', "e1 meessage!")
+    bb.something()
+    
+    test_contracthelper()
+    

+ 4 - 1
misc2/helpers.py

@@ -151,7 +151,7 @@ class ContractHelper(BaseHelper):
             return ContractHelper.makeRedisKey(contract)
         
         #contract.m_strike = int(contract.m_strike)
-        contract.m_strike = contract.m_strike
+        #contract.m_strike = contract.m_strike
 # amend 10/22 
 # add exchange to the key         
 #         s = '%s-%s-%s-%s-%s-%s-%s-%d' % (contract.m_symbol,
@@ -181,6 +181,9 @@ class ContractHelper(BaseHelper):
         return s
     
     
+    @staticmethod
+    def is_equal(c1, c2):
+        # Two contracts are considered equal iff their Redis key encodings
+        # (makeRedisKeyEx) match.
+        return ContractHelper.makeRedisKeyEx(c1) == ContractHelper.makeRedisKeyEx(c2) 
 
     
 # def str2dict(s):

BIN
misc2/helpers.pyc


+ 82 - 0
misc2/observer.py

@@ -0,0 +1,82 @@
+import json
+
+# Raised by Subscriber.update when a subclass fails to override it.
+class NotImplementedException(Exception):
+    def __init__(self, value):
+        self.value = value
+    
+    def __str__(self):
+        return repr(self.value)
+
+
+# Abstract observer: subclasses must override update(event, param), or
+# supply an explicit callback to Publisher.register.
+class Subscriber:
+    def __init__(self, name):
+        self.name = name
+
+    def update(self, event, param=None):
+        #print('{} got message "{}"'.format(self.name, message))
+        raise NotImplementedException('update function is not implemented! Override the function by subclassing Subscriber!')
+        
+# Event hub: maintains per-event subscriber -> callback maps and fans
+# dispatched events out to every registered callback.
+class Publisher:
+    def __init__(self, events):
+        # maps event names to subscribers
+        # str -> dict
+        self.events = { event : dict()
+                          for event in events }
+    
+    def get_subscribers(self, event):
+        # raises KeyError for an event name not declared at construction
+        return self.events[event]
+    
+    def register(self, event, who, callback=None):
+        # default to the subscriber's update() method when no callback given
+        if callback == None:
+            callback = getattr(who, 'update')
+        self.get_subscribers(event)[who] = callback
+    
+    def unregister(self, event, who):
+        del self.get_subscribers(event)[who]
+    
+    def dispatch(self, event, params=None):
+        # callbacks receive (event, params) -- unlike the test_pattern.py
+        # variant, which passes only the message
+        for subscriber, callback in self.get_subscribers(event).items():
+            callback(event, params)
+            
+#############################################################
+# Test classes to demo usage of Publisher and Subscriber
+#
+#
+# Demo publisher: adds no behaviour beyond Publisher itself.
+class Producer(Publisher):
+    def __init__(self, events):
+        Publisher.__init__(self, events)
+    
+    
+
+# Demo subscriber with a default callback (update) and an alternate one
+# (trigger) to show explicit-callback registration.
+class Consumer(Subscriber):
+    def __init__(self, name):
+        Subscriber.__init__(self, name)
+        
+    def update(self, event, param=None):
+        print('override %s: %s %s %s' % (self.name, event, "<empy param>" if not param else param,
+                                         
+                                         '<none>' if not param else param.__class__))
+
+    def trigger(self, event, param=None):
+        print('trigger %s: %s %s %s' % (self.name, event, "<empy param>" if not param else param,
+                                         
+                                         '<none>' if not param else param.__class__))
+        
+        
+
+if __name__ == '__main__':
+
+
+    
+    # demo: bb uses the default update callback; cc registers trigger for
+    # 'e1' and the default update for 'e2'
+    p = Producer(['e1', 'e2'])
+    bb = Consumer('bb')
+    cc = Consumer('cc')
+
+    p.register('e1', bb)
+    p.register('e1', cc, cc.trigger)
+    p.register('e2', cc)
+    p.dispatch('e1', {'xx':123, 'yy':444})
+    p.dispatch('e2', str({'x':123, 'y':444}))
+    
+    
+    
+    

BIN
misc2/observer.pyc


+ 9 - 0
sh/base_messaging.sh

@@ -0,0 +1,9 @@
+#!/bin/bash
+# Launch the base_messaging test driver with the finopt source tree on
+# PYTHONPATH.
+FINOPT_HOME=~/l1304/workspace/finopt/src/
+ROOT=$FINOPT_HOME
+# NOTE(review): FINDATA is assigned (with a trailing space) but never used
+# below -- presumably reserved for later scripts; confirm.
+FINDATA=$ROOT/../data 
+SRC=$ROOT
+export PYTHONPATH=$SRC:$PYTHONPATH
+
+# pass through up to two positional args to the python test driver
+python $FINOPT_HOME/comms/test/base_messaging.py $1 $2
+