esurfer 9 年之前
父节点
当前提交
bc31a53ae0
共有 1 个文件被更改,包括 62 次插入6 次删除
  1. 62 6
      src/comms/test/base_messaging.py

+ 62 - 6
src/comms/test/base_messaging.py

@@ -1,6 +1,7 @@
  #!/usr/bin/env python
 import threading, logging, time
 import sys
+import copy
 import datetime
 import uuid
 from Queue import Queue
@@ -224,7 +225,7 @@ class BaseConsumer(threading.Thread, Publisher):
                                  partition_assignment_strategy=[RoundRobinPartitionAssignor],
                                  #
                                  #
-                                 consumer_timeout_ms=1000
+                                 consumer_timeout_ms=self.kwargs['consumer_timeout_ms']
                                 )
 
 
@@ -289,7 +290,7 @@ class BaseConsumer(threading.Thread, Publisher):
                     # notify the observers
                     # the "and" condition ensures that on a fresh start of kafka server this event is not triggered as
                     # both saved value in redis and current offset are both 0
-                    if self.my_topics[message.topic][str(message.partition)] == message.offset and message.ofset <> 0:
+                    if self.my_topics[message.topic][str(message.partition)] == message.offset and message.offset <> 0:
                         self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.enrich_message(message))
                         logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
                     else:
@@ -333,15 +334,58 @@ class SimpleMessageListener(BaseMessageListener):
 
 class Prosumer(BaseProducer):
     # wrapper object
+    PROSUMER_DEFAULT_CONFIG = {
+        'bootstrap_servers': 'localhost',
+        'client_id': 'kafka-prosumer' ,
+        'group_id': 'kafka-prosumer-default-group',
+        'key_deserializer': None,
+        'value_deserializer': None,
+        'fetch_max_wait_ms': 500,
+        'fetch_min_bytes': 1,
+        'max_partition_fetch_bytes': 1 * 1024 * 1024,
+        'request_timeout_ms': 40 * 1000,
+        'retry_backoff_ms': 100,
+        'reconnect_backoff_ms': 50,
+        'max_in_flight_requests_per_connection': 5,
+        'auto_offset_reset': 'latest',
+        'enable_auto_commit': True,
+        'auto_commit_interval_ms': 5000,
+        'default_offset_commit_callback': lambda offsets, response: True,
+        'check_crcs': True,
+        'metadata_max_age_ms': 5 * 60 * 1000,
+        'partition_assignment_strategy': (RoundRobinPartitionAssignor),
+        'heartbeat_interval_ms': 3000,
+        'session_timeout_ms': 30000,
+        'max_poll_records': sys.maxsize,
+        'receive_buffer_bytes': None,
+        'send_buffer_bytes': None,
+        'consumer_timeout_ms': 1000,
+        'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
+    }    
+    
+    
     def __init__(self, name, kwargs=None):
         BaseProducer.__init__(self, group=None, target=None, name=name,
                  args=(), kwargs=kwargs, verbose=None)
-        self.kconsumer = BaseConsumer(name=name,  kwargs=kwargs)
-        self.kwargs = kwargs
         
-    
+        
+        self.kwargs = copy.copy(self.PROSUMER_DEFAULT_CONFIG)
+        for key in self.kwargs:
+            if key in kwargs:
+                self.kwargs[key] = kwargs.pop(key)        
+        self.kwargs.update(kwargs)
+        
+        logging.info('\nProsumer:init: **** Configurations dump ***')
+        logging.info('\n'.join('%s:%s' % (k.ljust(40), self.kwargs[k]) for k in sorted(self.kwargs)))
 
+        
+        self.kconsumer = BaseConsumer(name=name,  kwargs=self.kwargs)
+        
+        
     
+    def add_listener_topics(self, listener, topics):
+        map(lambda e: self.kconsumer.register(e, listener, getattr(listener, e)), topics)
+        
     def add_listeners(self, listeners):
         
         for l in listeners:
@@ -448,7 +492,19 @@ def test_prosumer2(mode):
         sB = SubscriptionListener('earB', pB)
         pB.add_listeners([sB])
         pB.start_prosumer()
-    
+        try:
+            
+            while True: #i < 5:
+                
+                pB.send_message('tickPrice', 
+                        pB.message_dumps({'field':5, 'typeName':'tickPrice', 'price':2.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}))
+                
+                time.sleep(.45)
+                
+        except (KeyboardInterrupt, SystemExit):
+                logging.error('caught user interrupt')
+                pB.set_stop()
+                pB.join()