| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704 |
- #!/usr/bin/env python
- import threading, logging, time, traceback
- import sys
- import copy
- import datetime
- import uuid
- from Queue import Queue
- from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
- from misc2.observer import NotImplementedException
- from kafka import KafkaConsumer, KafkaProducer
- from kafka.structs import TopicPartition
- from kafka.errors import NoBrokersAvailable
- #
- # packages required for ConsumerNextIteratorPersist
- import json
- from redis import Redis
- from misc2.observer import Subscriber, Publisher
- from numpy.distutils.fcompiler import none
- from types import NoneType
class Producer(threading.Thread):
    """Demo producer: publishes a timestamped counter message, alternating
    between two hard-coded topics, roughly twice a second."""
    daemon = True

    def run(self):
        try:
            producer = KafkaProducer(bootstrap_servers='localhost:9092')
            topics = ['my_topic', 'my_topic2']
            counter = 0
            while True:
                # even counters go to my_topic, odd to my_topic2
                payload = "%d %s test %s" % (counter, topics[counter % 2], time.strftime("%b %d %Y %H:%M:%S"))
                logging.info(payload)
                producer.send(topics[counter % 2], payload)
                time.sleep(.45)
                counter += 1
        except NoBrokersAvailable:
            logging.error("NoBrokersAvailable: Has kafka started?")
class BaseProducer(threading.Thread, Subscriber):
    """Threaded Kafka producer.

    Callers enqueue (topic, plain_text) tuples via send_message(); the
    run() loop drains the queue and publishes each tuple to Kafka.

    kwargs:
        bootstrap_host: kafka broker host
        bootstrap_port: kafka broker port
    """

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        # suffix the name with a deterministic uuid so multiple producers
        # built from the same base name remain distinguishable in logs
        self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name))
        logging.info('BaseProducer __init__: name=%s' % self.name)
        self.args = args
        self.kwargs = kwargs
        # pending outbound messages, drained by run()
        self.event_q = Queue()

    def send_message(self, topic, plain_text):
        """Enqueue a message; the run() loop performs the actual send.

        Fix: the original also called task_done() here, but task_done()
        belongs to the consumer side of a Queue (after get()); it is now
        called in run() once the message has been published.
        """
        self.event_q.put((topic, plain_text))

    def set_stop(self):
        # flag polled by run(); makes the loop exit and close the producer
        self.done = True

    def run(self):
        try:
            producer = KafkaProducer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']))

            self.done = False
            while not self.done:
                # drain everything queued since the last pass
                while not self.event_q.empty():
                    topic, plain_text = self.event_q.get()
                    logging.info("BaseProducer topic:[%s] msg:[%s]" % (topic, plain_text))
                    producer.send(topic, plain_text)
                    # mark the queued item as fully processed (pairs with get())
                    self.event_q.task_done()

                # to prevent excessive CPU use
                time.sleep(0.1)

            logging.info('******** BaseProducer exit done.')
            producer.close(1)

        except NoBrokersAvailable:
            logging.error("NoBrokersAvailable: Has kafka started?")
-
class BaseConsumer(threading.Thread, Publisher):
    """Threaded Kafka consumer that dispatches messages to registered
    listeners (Publisher protocol) and persists per-partition offsets in
    redis so a restart can recognise already-processed messages.

    Offsets live in redis under a key unique per topic AND per consumer
    (topic + '@' + consumer name, e.g. my_topic@consumerA), so consumers
    with different group_ids subscribed to the same topic each track
    their own progress:

        offsets = { topic@consumer_name:
                      { partition0: offset0, partition1: offset1, ... } }

    kwargs:
        bootstrap_host
        bootstrap_port
        redis_host
        redis_port
        redis_db
        group_id
        consumer_id: name
        topics - a list of topic strings
        session_timeout_ms
        consumer_timeout_ms
        seek_to_end - list of topics that only want the latest message
        clear_offsets - when True, wipe persisted offsets on startup
    """

    # sample the highwater mark every N-th offset (informational logging)
    SLOW_CONSUMER_CHECK_EVERY = 50
    # a consumer lagging this many messages behind highwater is "slow"
    SLOW_CONSUMER_QUALIFY_NUM = 500
    KB_REACHED_LAST_OFFSET = "on_kb_reached_last_offset"

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name))
        logging.info('BaseConsumer __init__: name=%s' % self.name)
        self.args = args
        self.kwargs = kwargs
        self.rs = Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
        # seek_to_end is optional; default means no topic is latest-only
        self.kwargs.setdefault('seek_to_end', [])
        # per-topic map of {str(partition): last_offset_processed}
        self.my_topics = dict((t, {}) for t in self.kwargs['topics'])
        # one event channel per topic, plus the reached-last-offset event
        self.events = {event: dict() for event in [BaseConsumer.KB_REACHED_LAST_OFFSET] + self.kwargs['topics']}

    def consumer_topic(self, tp):
        """Redis key for topic *tp*, made unique per consumer instance."""
        return tp + '@' + self.name

    def clear_offsets(self):
        """Remove persisted offsets from redis and empty the in-memory map.

        Necessary when redis holds offsets saved before a kafka restart:
        kafka restarts its offsets at 0 and the previously buffered
        messages are gone, so a stale saved offset would make this
        consumer believe it is still replaying old buffered messages.
        """
        for t in self.kwargs['topics']:
            self.my_topics[t] = {}
            logging.info("BaseConsumer:clear_offsets Deleting %s from redis..." % self.consumer_topic(t))
            self.rs.delete(self.consumer_topic(t))

    def persist_offsets(self, topic, partition, offset):
        """Record *offset* for (topic, partition) in memory and in redis."""
        self.my_topics[topic][str(partition)] = offset
        self.rs.set(self.consumer_topic(topic), json.dumps(self.my_topics[topic]))

    def extract_message_content(self, message):
        """Decode message.value as json; return {} on malformed payloads."""
        try:
            return json.loads(message.value)
        except ValueError:
            logging.info('extract_message_content exception: %s' % message)
            return {}

    def set_stop(self):
        # flag polled by run() between consumer timeouts
        self.done = True

    def run(self):
        """Consume until set_stop() and dispatch each message to listeners
        registered for the message's topic."""
        logging.info('BaseConsumer:run. %s:%s started' % (self.kwargs['group_id'], self.name))

        # .get() guards against callers that omit the key (original raised KeyError)
        if self.kwargs.get('clear_offsets') == True:
            self.clear_offsets()

        consumer = KafkaConsumer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']),
                                 auto_offset_reset='earliest',
                                 # consumers sharing a group_id compete for messages
                                 # (queue semantics: each message consumed once per
                                 # group); give every consumer a unique group_id
                                 # for pub-sub semantics instead
                                 group_id=self.kwargs['group_id'],
                                 client_id=self.name,
                                 # how long before another same-group consumer
                                 # takes over when this one dies
                                 session_timeout_ms=self.kwargs['session_timeout_ms'],
                                 partition_assignment_strategy=[RoundRobinPartitionAssignor],
                                 # makes the iterator raise StopIteration when idle,
                                 # so the self.done flag gets polled
                                 consumer_timeout_ms=self.kwargs['consumer_timeout_ms']
                                 )

        for topic in self.my_topics.keys():
            # if this consumer ran before, reload its persisted offsets;
            # otherwise self.my_topics[topic] stays empty
            if self.rs.keys(self.consumer_topic(topic)):
                self.my_topics[topic] = json.loads(self.rs.get(self.consumer_topic(topic)))
        logging.info('BaseConsumer:run. Topics subscribed: %s' % self.my_topics)

        consumer.subscribe(self.my_topics.keys())
        self.done = False

        while not self.done:
            try:
                message = next(consumer)

                # informational only: spot slow-consumer situations.
                # Fix: sample on SLOW_CONSUMER_CHECK_EVERY (the original used
                # SLOW_CONSUMER_QUALIFY_NUM here and left CHECK_EVERY unused,
                # which looks like a typo).
                if message.offset % BaseConsumer.SLOW_CONSUMER_CHECK_EVERY == 0:
                    highwater = consumer.highwater(TopicPartition(message.topic, message.partition))
                    logging.info("BaseConsumer [%s]:highwater:%d offset:%d part:%d <%s>" % (self.name, highwater, message.offset, message.partition, message.topic))

                    if highwater - message.offset >= BaseConsumer.SLOW_CONSUMER_QUALIFY_NUM:
                        logging.warning("BaseConsumer:run Slow consumer detected! current: %d, highwater:%d, gap:%d" %
                                        (message.offset, highwater, highwater - message.offset))

                try:
                    # first-encounter probe: raises KeyError exactly once per
                    # topic/partition; after the key is written below, this
                    # lookup never raises again (EAFP, cheaper than if-else)
                    self.my_topics[message.topic][str(message.partition)]
                except KeyError:
                    highwater = consumer.highwater(TopicPartition(message.topic, message.partition))
                    gap = highwater - message.offset
                    logging.info("*** On first iteration: [Topic:%s:Part:%d:Offset:%d]: Number of messages lagging behind= %d. Highwater:%d"
                                 % (message.topic, message.partition, message.offset, gap, highwater))

                    # dump a topic/partition position table for diagnostics
                    for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
                        try:
                            logging.info("*** On first iteration: T/P Table: topic:[%s] %s" % (t.rjust(25),
                                         ','.join('part:%d, off:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)
                                         ))
                        except TypeError:
                            # partitions_for_topic returned None: topic unknown to kafka
                            logging.warning('*** On first iteration: [*** %s not registered in kafka topics yet ***]. This message should go away the next time the program is run.' % t)
                            continue

                    self.persist_offsets(message.topic, message.partition, message.offset)
                    self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))

                if '*' in self.kwargs['seek_to_end'] or message.topic in self.kwargs['seek_to_end']:
                    # seek_to_end topics only care about the latest message --
                    # use only for streams where losing messages is acceptable.
                    # Fix: recompute the lag here; the original reused `gap`
                    # from the first-encounter block above, which is stale on
                    # later messages and unbound (NameError) when offsets were
                    # restored from redis so the KeyError branch never ran.
                    gap = consumer.highwater(TopicPartition(message.topic, message.partition)) - message.offset
                    if gap <= 1:
                        # caught up: this message is valid for dispatching
                        self.dispatch(message.topic, self.extract_message_content(message))
                        logging.debug('*** On first iteration: Gap=%d Dispatch this valid message to the listener <%s>' % (gap, message.value))
                    else:
                        logging.info("*** On first iteration: [Topic:%s:Part:%d:Offset:%d]: Gap:%d Attempting to seek to latest message ..."
                                     % (message.topic, message.partition, message.offset, gap))
                        consumer.seek_to_end((TopicPartition(topic=message.topic, partition=message.partition)))
                        # skip this message from dispatching to listeners
                        continue

                # message.value is expected to hold a json-encoded string; it is
                # passed through unaltered and listeners decode the payload
                # themselves (typically via json.loads)
                if self.my_topics[message.topic][str(message.partition)] > message.offset:
                    # offset older than the last one persisted: replayed message
                    logging.info('BaseConsumer ********************** old message...discarding %s %d' % (message.topic, message.offset))
                else:
                    # equal offsets mean we just caught up with the last message
                    # processed before shutdown; the offset != 0 guard avoids
                    # firing the event on a fresh kafka start where both the
                    # saved value and the current offset are 0
                    if self.my_topics[message.topic][str(message.partition)] == message.offset and message.offset != 0:
                        self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.extract_message_content(message))
                        self.dispatch(message.topic, self.extract_message_content(message))
                        logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
                    else:
                        self.persist_offsets(message.topic, message.partition, message.offset)
                        self.dispatch(message.topic, self.extract_message_content(message))

            except StopIteration:
                # consumer_timeout_ms expired with no message: loop to poll self.done
                logging.debug('BaseConsumer:run StopIteration Caught. No new message arriving...')
                continue
            except TypeError:
                logging.error('BaseConsumer:run. Caught TypeError Exception while processing a message. Malformat json string? %s: %s' % (message.topic, message.value))
                logging.error(traceback.format_exc())

        consumer.close()
        logging.info('******** BaseConsumer exit done.')
class BaseMessageListener(Subscriber):
    """Base listener: routes a dispatched event name to the same-named
    method on the instance (e.g. event 'my_topic' calls self.my_topic)."""

    def __init__(self, name):
        self.name = name

    def update(self, event, param=None):
        """Invoke the handler method named *event*, passing *param*.

        Fix: the original default was `none` -- the class accidentally
        auto-imported from numpy.distutils.fcompiler -- not the None
        singleton, which broke the `param != None` check below.
        """
        try:
            event_fn = getattr(self, event)
            # NOTE(review): an AttributeError raised *inside* the handler is
            # also caught here and misreported as "not implemented"
            event_fn(param)
        except AttributeError:
            err_msg = 'BaseMessageListener:update| function %s not implemented.' % event
            logging.error('BaseMessageListener [%s]:update %s' % (self.name, err_msg))
        logging.debug("BaseMessageListener [%s]:update|Event type:[%s] content:[%s]" % (self.name, event, json.dumps(param) if param != None else "<empty param>"))
class SimpleMessageListener(BaseMessageListener):
    """Sampling listener: prints every 50th message per topic so the
    console stays readable under a fast producer.

    Fix: Py2-only print statements replaced with the parenthesized
    single-argument form, which behaves identically under Python 2 and
    is valid Python 3.
    """

    def __init__(self, name):
        BaseMessageListener.__init__(self, name)
        # per-topic message counters used for the 1-in-50 sampling
        self.cnt_my_topic = 0
        self.cnt_my_topic2 = 0

    def my_topic(self, param):
        if self.cnt_my_topic % 50 == 0:
            print("SimpleMessageListener:my_topic. %s" % param)
        self.cnt_my_topic += 1

    def my_topic2(self, param):
        if self.cnt_my_topic2 % 50 == 0:
            print("SimpleMessageListener:my_topic2. %s" % param)
        self.cnt_my_topic2 += 1

    def on_kb_reached_last_offset(self, param):
        # fired once when the consumer catches up with the last offset
        # processed before the previous shutdown
        print("on_kb_reached_last_offset [%s] %s" % (self.name, param))
class Prosumer(BaseProducer):
    """Wrapper combining a BaseProducer (this thread) with an internal
    BaseConsumer, so a single object can both publish and subscribe."""

    # defaults mirroring kafka-python consumer configuration
    PROSUMER_DEFAULT_CONFIG = {
        'bootstrap_servers': 'localhost',
        'client_id': 'kafka-prosumer',
        'group_id': 'kafka-prosumer-default-group',
        'key_deserializer': None,
        'value_deserializer': None,
        'fetch_max_wait_ms': 500,
        'fetch_min_bytes': 1,
        'max_partition_fetch_bytes': 1 * 1024 * 1024,
        'request_timeout_ms': 40 * 1000,
        'retry_backoff_ms': 100,
        'reconnect_backoff_ms': 50,
        'max_in_flight_requests_per_connection': 5,
        'auto_offset_reset': 'latest',
        'enable_auto_commit': True,
        'auto_commit_interval_ms': 5000,
        'default_offset_commit_callback': lambda offsets, response: True,
        'check_crcs': True,
        'metadata_max_age_ms': 5 * 60 * 1000,
        # fix: was (RoundRobinPartitionAssignor) -- parentheses do not make a
        # tuple, so the value was a bare class; a list matches kafka-python
        'partition_assignment_strategy': [RoundRobinPartitionAssignor],
        'heartbeat_interval_ms': 3000,
        'session_timeout_ms': 30000,
        'max_poll_records': sys.maxsize,
        'receive_buffer_bytes': None,
        'send_buffer_bytes': None,
        'consumer_timeout_ms': 1000,
        'connections_max_idle_ms': 9 * 60 * 1000,  # not implemented yet
    }

    def __init__(self, name, kwargs=None):
        """Build the producer/consumer pair from defaults overridden by
        *kwargs*.

        Fixes: kwargs=None no longer crashes (`key in None`), and the
        caller's dict is no longer mutated (the original popped matched
        keys out of it); the pop-then-update dance was equivalent to a
        plain update anyway.
        """
        self.kwargs = copy.copy(self.PROSUMER_DEFAULT_CONFIG)
        self.kwargs.update(kwargs or {})
        logging.info('\nProsumer:init: **** Configurations dump ***')
        logging.info('\n'.join('%s:%s' % (k.ljust(40), self.kwargs[k]) for k in sorted(self.kwargs)))
        BaseProducer.__init__(self, group=None, target=None, name=name,
                              args=(), kwargs=self.kwargs, verbose=None)

        # paired consumer thread; listeners are attached via add_listener*()
        self.kconsumer = BaseConsumer(name=name, kwargs=self.kwargs)
        # fix: initialize so is_stopped() cannot raise AttributeError when
        # called before start_prosumer()
        self.stopped = True

    def add_listener_topics(self, listener, topics):
        """Register *listener* for each topic in *topics*; the listener must
        implement a same-named method per topic."""
        try:
            # explicit loop: the original used map() for side effects, which
            # silently does nothing under Python 3 (lazy iterator)
            for topic in topics:
                self.kconsumer.register(topic, listener, getattr(listener, topic))
        except AttributeError as e:
            logging.error("Prosumer:add_listener_topics. Function not implemented in the listener. %s" % e)
            raise NotImplementedException

    def add_listeners(self, listeners):
        """Register every listener for every configured topic."""
        for l in listeners:
            for topic in self.kwargs['topics']:
                self.kconsumer.register(topic, l, getattr(l, topic))

    def is_stopped(self):
        return self.stopped

    def set_stop(self):
        """Signal both the producer loop and the paired consumer to exit."""
        BaseProducer.set_stop(self)
        self.kconsumer.set_stop()
        logging.info('Prosumer:set_stop. Pending kconsumer to shutdown in 2s...')
        self.stopped = True

    def start_prosumer(self):
        """Start the consumer thread, then this producer thread."""
        self.stopped = False
        self.kconsumer.start()
        self.start()

    def message_loads(self, text_msg):
        """Decode a wire payload produced by message_dumps."""
        return json.loads(text_msg)

    def message_dumps(self, obj_msg):
        """Encode an object into the json wire payload format."""
        return json.dumps(obj_msg)
-
class SubscriptionListener(BaseMessageListener):
    '''
    test code used by test cases

    NOTE(review): handlers here take (event, items) rather than the single
    param that BaseMessageListener.update passes -- they appear to be
    invoked directly through Publisher.register callbacks; confirm against
    misc2.observer before changing signatures.
    '''

    def __init__(self, name, producer):
        BaseMessageListener.__init__(self, name)
        self.producer = producer
        # running sequence number stamped into responses
        self.i = 0

    def gw_subscription_changed(self, event, items):
        logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, items))

    def gw_req_subscriptions(self, event, items):
        """Echo the request id back on gw_subscription_changed with a
        timestamped response."""
        logging.info("[%s] received gw_req_subscriptions content:[%s]" % (self.name, items))
        # renamed from `vars`, which shadowed the builtin
        request = self.producer.message_loads(items['value'])
        self.producer.send_message('gw_subscription_changed',
                                   self.producer.message_dumps({'id': self.i, 'reqid': request['reqid'],
                                                                'response': "%s" % (time.strftime("%b %d %Y %H:%M:%S"))})
                                   )
        self.i = self.i + 1

    def reqMktData(self, event, items):
        """Reply to a market-data request with a canned tickPrice message."""
        logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
        self.producer.send_message('tickPrice',
                                   self.producer.message_dumps({'field': 4, 'typeName': 'tickPrice', 'price': 1.0682, 'ts': 1485661437.83, 'source': 'IB', 'tickerId': 79, 'canAutoExecute': 0}))

    def tickPrice(self, event, items):
        logging.info("[%s] received %s content:[%s]" % (self.name, event, items))

    def on_kb_reached_last_offset(self, event, items):
        logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, items))
        print("on_kb_reached_last_offset [%s] %s" % (self.name, items))
-
-
def test_prosumer2(mode):
    """Two-party Prosumer demo. Role 'A' listens on subscription/tick topics
    and sends one reqMktData; any other role publishes tickPrice messages.
    Both sides run until Ctrl-C, then shut down cleanly."""
    if mode == 'A':
        side_a_topics = ['gw_subscription_changed', 'tickPrice']

        prosumer = Prosumer(name='A', kwargs={'bootstrap_host': 'localhost', 'bootstrap_port': 9092,
                                              'redis_host': 'localhost', 'redis_port': 6379, 'redis_db': 0,
                                              'group_id': 'groupA', 'session_timeout_ms': 10000,
                                              'topics': side_a_topics, 'clear_offsets': False})
        listener = SubscriptionListener('earA', prosumer)

        prosumer.add_listeners([listener])
        prosumer.start_prosumer()
        tick = 0
        try:
            # kick off the exchange; replies arrive via the listener
            prosumer.send_message('reqMktData', prosumer.message_dumps({'contract': 'dummy'}))
            while True:
                tick = tick + 1
                time.sleep(.45)
        except (KeyboardInterrupt, SystemExit):
            logging.error('caught user interrupt')
            prosumer.set_stop()
            prosumer.join()
    else:
        side_b_topics = ['gw_req_subscriptions', 'reqMktData']

        prosumer = Prosumer(name='B', kwargs={'bootstrap_host': 'localhost', 'bootstrap_port': 9092,
                                              'redis_host': 'localhost', 'redis_port': 6379, 'redis_db': 0,
                                              'group_id': 'groupB', 'session_timeout_ms': 10000,
                                              'topics': side_b_topics, 'clear_offsets': False})
        listener = SubscriptionListener('earB', prosumer)
        prosumer.add_listeners([listener])
        prosumer.start_prosumer()
        try:
            # stream ticks until interrupted
            while True:
                prosumer.send_message('tickPrice',
                                      prosumer.message_dumps({'field': 5, 'typeName': 'tickPrice', 'price': 2.0682, 'ts': 1485661437.83, 'source': 'IB', 'tickerId': 79, 'canAutoExecute': 0}))
                time.sleep(.45)
        except (KeyboardInterrupt, SystemExit):
            logging.error('caught user interrupt')
            prosumer.set_stop()
            prosumer.join()
-
class TestProducer(BaseProducer):
    # thin alias used by the test harness below; inherits all
    # BaseProducer behavior unchanged
    pass
def test_base_proconsumer(mode):
    '''
    This example demonstrates

    1) use of consumer_timeout_ms to break out from the consumer.next loop
    2) how to trap ctrl-c and break out of the running threads
    3) using Queue to store calls to producer.send_message
    4) using redis to store the consumer last processed offsets
    5) use of try-catch block to implement seek_to_latest offset
    6) inherit and implement MessageListener to subscribe messages dispatched by the consumer
    '''
    if mode == 'P':
        # producer role: stream counter messages, alternating between topics
        #Producer().start()
        topics = ['my_topic', 'my_topic2']
        tp = TestProducer(name = 'testproducer', kwargs={
                            'bootstrap_host':'localhost', 'bootstrap_port':9092,
                            'topics': topics})
        tp.start()
        i = 0
        while True:
            #today = datetime.date.today()
            try:
                # even i -> my_topic, odd i -> my_topic2
                s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
                logging.info(s)
                tp.send_message(topics[i%2], s)

                time.sleep(.25)
                i=i+1
            except (KeyboardInterrupt, SystemExit):
                # stop the producer thread cleanly before exiting
                logging.error('caught user interrupt')
                tp.set_stop()
                tp.join()
                sys.exit(-1)
    else:
        # consumer role: dispatch both topics to a sampling listener
        bc = BaseConsumer(name='bc', kwargs={'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                             'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                             'group_id':'gid', 'session_timeout_ms':10000, 'topics': ['my_topic', 'my_topic2'],
                                             'clear_offsets': True, 'consumer_timeout_ms':1000,
                                             # uncomment the next line to process messages from since the program was last shut down
                                             # if seek_to_end is present, for the topic specified the consumer will begin
                                             # sending the latest message to the listener
                                             # note that the list only specifies my_topic to receive the latest
                                             # but not my_topic2. Observe the different behavior by checking the message offset
                                             # in the program log
                                             'seek_to_end': ['my_topic'],
                                             })
        #bml = BaseMessageListener('bml')
        sml = SimpleMessageListener('simple')
        # the listener method name must match the registered event name
        bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, sml)
        bc.register('my_topic', sml)
        bc.register('my_topic2', sml)
        bc.start()
-
def main():
    """CLI entry point: dispatch to a test case chosen by index, passing the
    role mode (P/C for case 0, A/B-style for case 1) through to it.

    Fixes: print statements parenthesized (identical output under Python 2,
    valid Python 3) and bare exit() replaced with sys.exit(), which does not
    depend on the site module's interactive helpers.
    """
    # test cases, selected by the integer in argv[2]
    tp = [ test_base_proconsumer, test_prosumer2]

    if len(sys.argv) != 3:
        print("Usage: %s <role(producer or consumer): P|C> <test case #[0..1]>" % sys.argv[0])
        print("\n".join('case #%d: %s' % (i, tp[i].__name__) for i in range(len(tp))))
        print("example: python %s P 1" % sys.argv[0])
        print("example: python %s C 1" % sys.argv[0])
        sys.exit(-1)
    mode = sys.argv[1]

    tp[int(sys.argv[2])](mode)
-
-
-
if __name__ == "__main__":
    # NOTE(review): %(msecs)s renders the millisecond field (a float) via %s;
    # the conventional logging form is %(msecs)03d -- confirm the desired
    # timestamp format before changing it.
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()
|