# base_messaging.py
  1. #!/usr/bin/env python
  2. import threading, logging, time
  3. import sys
  4. import copy
  5. import datetime
  6. import uuid
  7. from Queue import Queue
  8. from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
  9. from misc2.observer import NotImplementedException
  10. from kafka import KafkaConsumer, KafkaProducer
  11. from kafka.structs import TopicPartition
  12. from kafka.errors import NoBrokersAvailable
  13. #
  14. # packages required for ConsumerNextIteratorPersist
  15. import json
  16. from redis import Redis
  17. from misc2.observer import Subscriber, Publisher
  18. from numpy.distutils.fcompiler import none
  19. class Producer(threading.Thread):
  20. daemon = True
  21. def run(self):
  22. try:
  23. producer = KafkaProducer(bootstrap_servers='localhost:9092')
  24. topics = ['my_topic', 'my_topic2']
  25. i = 0
  26. while True:
  27. #today = datetime.date.today()
  28. s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
  29. logging.info(s)
  30. producer.send(topics[i%2], s)
  31. time.sleep(.45)
  32. i=i+1
  33. except NoBrokersAvailable:
  34. logging.error("NoBrokersAvailable: Has kafka started?")
  35. class BaseProducer(threading.Thread, Subscriber):
  36. def __init__(self, group=None, target=None, name=None,
  37. args=(), kwargs=None, verbose=None):
  38. threading.Thread.__init__(self, group=group, target=target, name=name,
  39. verbose=verbose)
  40. """
  41. kwargs:
  42. bootstrap_host
  43. bootstrap_host
  44. redis_host
  45. session_timeout_ms:
  46. """
  47. self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name))
  48. logging.info('BaseProducer __init__: name=%s' % self.name)
  49. self.args = args
  50. self.kwargs = kwargs
  51. self.event_q = Queue()
  52. return
  53. def send_message(self, topic, plain_text):
  54. self.event_q.put((topic, plain_text))
  55. self.event_q.task_done()
  56. def set_stop(self):
  57. self.done = True
  58. def run(self):
  59. try:
  60. producer = KafkaProducer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']))
  61. self.done = False
  62. while self.done <> True:
  63. #today = datetime.date.today()
  64. #if not self.event_q.empty():
  65. while not self.event_q.empty():
  66. topic, plain_text = self.event_q.get()
  67. #s = "BaseProducer topic:[%s] msg:[%s]" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
  68. logging.info("BaseProducer topic:[%s] msg:[%s]" % (topic, plain_text))
  69. producer.send(topic, plain_text)
  70. # to prevent excessive CPU use
  71. time.sleep(0.1)
  72. logging.info ('******** BaseProducer exit done.')
  73. producer.close(1)
  74. except NoBrokersAvailable:
  75. logging.error("NoBrokersAvailable: Has kafka started?")
class BaseConsumer(threading.Thread, Publisher):
    """
    Threaded Kafka consumer that republishes each received message to the
    listeners registered with this Publisher, using the topic name as the
    event channel.

    The last processed offset of each topic/partition is persisted to redis
    (see persist_offsets) so that after a restart the consumer can discard
    messages it already handled and announce (KB_REACHED_LAST_OFFSET) the
    moment it catches up with where it left off.
    """
    #KB_EVENT = "on_kb_event"
    # NOTE(review): SLOW_CONSUMER_CHECK_EVERY is defined but run() performs the
    # highwater check against SLOW_CONSUMER_QUALIFY_NUM -- confirm which was intended
    SLOW_CONSUMER_CHECK_EVERY = 50
    # lag (highwater - current offset) at which the consumer is reported slow
    SLOW_CONSUMER_QUALIFY_NUM = 500
    # event dispatched when the consumer reaches the last offset processed
    # before the previous shutdown
    KB_REACHED_LAST_OFFSET = "on_kb_reached_last_offset"
    #my_topics = {'my_topic':{}, 'my_topic2':{}}
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        """
        kwargs:
            bootstrap_host
            bootstrap_port
            redis_host
            redis_port
            redis_db
            group_id
            consumer_id: name
            topics- a list of topic strings
            session_timeout_ms
            consumer_timeout_ms
            seek_to_end- a list of topics that only wants the latest message
        """
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        # unique-but-reproducible thread name (uuid5 is a hash of `name`)
        self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name))
        logging.info('BaseConsumer __init__: name=%s' % self.name)
        self.args = args
        self.kwargs = kwargs
        self.rs = Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
        # default seek_to_end to "no topics" when the caller omits it
        try:
            self.kwargs['seek_to_end']
        except KeyError:
            self.kwargs['seek_to_end'] = []
        # my_topics: topic -> {str(partition): last processed offset}
        self.my_topics = {}
        for t in self.kwargs['topics']:
            self.my_topics[t]= {}
        #self.events = {event: dict() for event in [BaseConsumer.KB_EVENT, BaseConsumer.KB_REACHED_LAST_OFFSET]}
        # Publisher channels: one per subscribed topic plus the
        # reached-last-offset notification
        self.events = {event: dict() for event in [BaseConsumer.KB_REACHED_LAST_OFFSET] + self.kwargs['topics']}
        return
    """
    each consumer has its own set of topics offsets stored in redis
    for consumer A and consumer B (with different group_ids) subscribing to the same my_topic, each
    each of them will need to keep track of its own offsets
    to make the redis key unique by consumer, the key is created by topic + '@' + consumer name
    example: my_topic@consumerA
    offsets = { topic-consumer_name:
                  {
                    partition0: offset0,
                    partition1: offset1,...
                  }
              }
    """
    def consumer_topic(self, tp):
        # redis key holding this consumer's offsets for topic `tp`
        return tp + '@' + self.name
    def clear_offsets(self):
        """
        clear the offsets in redis by removing the value from redis
        and emptying the internal my_topics dict
        clear offsets is necessary when the offset in redis was saved
        at a time since kafka manager was shut down
        when kafka restarts, previously buffered
        messages are no longer available and instead it will restart its offset at 0.
        Reading an old offset by BaseConsumer will cause it to think that it
        is still receiving old buffered messages from Kafka but in fact all the messages
        since the last shut down of kafka are all gone
        """
        for t in self.kwargs['topics']:
            self.my_topics[t]= {}
            logging.info("BaseConsumer:clear_offsets Deleting %s from redis..." % self.consumer_topic(t))
            self.rs.delete(self.consumer_topic(t))
    def persist_offsets(self, topic, partition, offset):
        """Save `offset` as the last processed offset for topic/partition, in memory and in redis."""
        #self.rs.set(self.consumer_topic(topic), json.dumps({'partition': partition, 'offset':offset}))
        self.my_topics[topic][str(partition)] = offset
        self.rs.set(self.consumer_topic(topic), json.dumps(self.my_topics[topic]))
    def enrich_message(self, message):
        """Wrap the raw kafka payload with partition/offset metadata before dispatching."""
        return {'value': message.value, 'partition':message.partition, 'offset': message.offset}
    def set_stop(self):
        # polled by run(); the loop notices it after the next consumer timeout
        self.done = True
    def run(self):
        """Main consume loop: poll kafka, track offsets in redis, dispatch to listeners."""
        logging.info('BaseConsumer:run. %s:%s started' % (self.kwargs['group_id'], self.name))
        if self.kwargs['clear_offsets'] == True:
            self.clear_offsets()
        consumer = KafkaConsumer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']),
                                 auto_offset_reset='earliest',
                                 #
                                 # consumers having the same group id works as
                                 # a group to seize messages published by the publisher
                                 # (like a queue, each message is consumed exactly once
                                 # by a consumer)
                                 #
                                 #
                                 # in a 'pub-sub' environment, set each consumer having
                                 # a unique group_id
                                 #
                                 group_id = self.kwargs['group_id'],
                                 client_id = self.name,
                                 #
                                 # session_timeout_ms is the time it takes for another consumer
                                 # that has the same group_id to pick up the work
                                 # to consume messages when this consumer is dead
                                 #
                                 session_timeout_ms = self.kwargs['session_timeout_ms'],
                                 partition_assignment_strategy=[RoundRobinPartitionAssignor],
                                 # consumer_timeout_ms makes consumer.next() raise
                                 # StopIteration when no message arrives in time,
                                 # giving the loop a chance to re-test self.done
                                 consumer_timeout_ms=self.kwargs['consumer_timeout_ms']
                                 )
        for topic in self.my_topics.keys():
            #
            # if a topic offset is stored previously (the consumer was run before),
            # then load the offset values
            # and save it locally into the my_topics map
            # else self.my_topics[topic] would have zero elements in it
            if self.rs.keys(self.consumer_topic(topic)):
                self.my_topics[topic] = json.loads(self.rs.get(self.consumer_topic(topic)))
        logging.info('BaseConsumer:run. Topics subscribed: %s' % self.my_topics)
        consumer.subscribe(self.my_topics.keys())
        #consumer.seek_to_end(TopicPartition(topic='my_topic', partition=0))
        self.done = False
        while self.done <> True:
            try:
                message = consumer.next()
                # the next if block is there to serve information purpose only
                # it may be useful to detect slow consumer situation
                if message.offset % BaseConsumer.SLOW_CONSUMER_QUALIFY_NUM == 0:
                    highwater = consumer.highwater(TopicPartition(message.topic, message.partition))
                    logging.info( "BaseConsumer [%s]:highwater:%d offset:%d part:%d <%s>" % (self.name, highwater, message.offset, message.partition, message.value))
                    if highwater - message.offset >= BaseConsumer.SLOW_CONSUMER_QUALIFY_NUM:
                        logging.warn("BaseConsumer:run Slow consumer detected! current: %d, highwater:%d, gap:%d" %
                                     (message.offset, highwater, highwater - message.offset))
                # the next block is designed to handle the first time the
                # consumer encounters a topic partition it hasnt' seen yet
                try:
                    # the try catch block ensures that on the first encounter of a topic/partition
                    # the except block is executed once and only once, thereafter since the
                    # dictionary object keys are assigned the exception will never
                    # be caught again
                    #
                    # the try catch is supposed to be faster than a if-else block...
                    #
                    # the below statement has no meaning, it is there merely to ensure that
                    # the block fails on the first run
                    self.my_topics[message.topic][str(message.partition)]
                except KeyError:
                    highwater = consumer.highwater(TopicPartition(message.topic, message.partition))
                    gap = highwater - message.offset
                    logging.info( "*** On first iteration: [Topic:%s:Part:%d:Offset:%d]: Number of messages lagging behind= %d. Highwater:%d"
                                  % (message.topic, message.partition, message.offset, gap, highwater))
                    for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
                        logging.info ("*** On first iteration: Topic Partition Table: topic:[%s] %s" % (t.rjust(20),
                                      ','.join('partition:%d, offset:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)
                                      ))
                    self.persist_offsets(message.topic, message.partition, message.offset)
                    # re-read what was just persisted so in-memory state mirrors redis
                    self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))
                    if message.topic in self.kwargs['seek_to_end']:
                        # if there is no gap
                        if gap == 1:
                            # the message is valid for dispatching and not to be skipped
                            self.dispatch(message.topic, self.enrich_message(message))
                            logging.debug('*** On first iteration: Gap=%d Dispatch this valid message to the listener <%s>' % (gap, message.value))
                        else: # gap exists
                            logging.info("*** On first iteration: [Topic:%s:Part:%d:Offset:%d]: Gap:%d Attempting to seek to latest message ..."
                                         % (message.topic, message.partition, message.offset, gap))
                            consumer.seek_to_end((TopicPartition(topic=message.topic, partition= message.partition)))
                            # skip this message from dispatching to listeners
                            continue
                """
                the message.value received from kafaproducer is expected to contain
                plain text encoded as a json string
                the content of message.value is not altered. it's content is stored in a dict object
                with key = 'value' and enriched with additional kafa metadata
                it is the subscriber's job to interpret the content stored in the 'value' key. Typically
                it means decoding the content by invoking json.loads
                """
                if self.my_topics[message.topic][str(message.partition)] > message.offset:
                    # stored offset is ahead of this message: it was processed before
                    logging.info('BaseConsumer ********************** old message...discarding %s %d' % (message.topic, message.offset))
                else:
                    #if self.my_topics[message.topic][str(message.partition)] == message.offset:
                    # if the stored offset in redis equals to the current offset
                    # notify the observers
                    # the "and" condition ensures that on a fresh start of kafka server this event is not triggered as
                    # both saved value in redis and current offset are both 0
                    if self.my_topics[message.topic][str(message.partition)] == message.offset and message.offset <> 0:
                        self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.enrich_message(message))
                        self.dispatch(message.topic, self.enrich_message(message))
                        logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
                    else:
                        self.persist_offsets(message.topic, message.partition, message.offset)
                        #self.dispatch(BaseConsumer.KB_EVENT, {'message': message})
                        self.dispatch(message.topic, self.enrich_message(message))
            except StopIteration:
                # consumer_timeout_ms expired with no message; loop to re-test self.done
                logging.debug('BaseConsumer:run StopIteration Caught. No new message arriving...')
                continue
        consumer.close()
        logging.info ('******** BaseConsumer exit done.')
  274. class BaseMessageListener(Subscriber):
  275. def __init__(self, name):
  276. self.name = name
  277. def update(self, event, param=none):
  278. try:
  279. event_fn = getattr(self, event)
  280. event_fn(param)
  281. except AttributeError:
  282. err_msg = 'BaseMessageListener:update| function %s not implemented.' % event
  283. logging.error('BaseMessageListener [%s]:update %s' % (self.name, err_msg))
  284. logging.debug("BaseMessageListener [%s]:update|Event type:[%s] content:[%s]" % (self.name, event, json.dumps(param) if param <> None else "<empty param>"))
  285. class SimpleMessageListener(BaseMessageListener):
  286. def __init__(self, name):
  287. BaseMessageListener.__init__(self, name)
  288. self.cnt_my_topic = 0
  289. self.cnt_my_topic2 = 0
  290. # def on_kb_event(self, param):
  291. # print "on_kb_event [%s] %s" % (self.name, param)
  292. def my_topic(self, param):
  293. if self.cnt_my_topic % 50 == 0:
  294. print "SimpleMessageListener:my_topic. %s" % param
  295. self.cnt_my_topic += 1
  296. def my_topic2(self, param):
  297. if self.cnt_my_topic2 % 50 == 0:
  298. print "SimpleMessageListener:my_topic2. %s" % param
  299. self.cnt_my_topic2 += 1
  300. def on_kb_reached_last_offset(self, param):
  301. print "on_kb_reached_last_offset [%s] %s" % (self.name, param)
  302. class Prosumer(BaseProducer):
  303. # wrapper object
  304. PROSUMER_DEFAULT_CONFIG = {
  305. 'bootstrap_servers': 'localhost',
  306. 'client_id': 'kafka-prosumer' ,
  307. 'group_id': 'kafka-prosumer-default-group',
  308. 'key_deserializer': None,
  309. 'value_deserializer': None,
  310. 'fetch_max_wait_ms': 500,
  311. 'fetch_min_bytes': 1,
  312. 'max_partition_fetch_bytes': 1 * 1024 * 1024,
  313. 'request_timeout_ms': 40 * 1000,
  314. 'retry_backoff_ms': 100,
  315. 'reconnect_backoff_ms': 50,
  316. 'max_in_flight_requests_per_connection': 5,
  317. 'auto_offset_reset': 'latest',
  318. 'enable_auto_commit': True,
  319. 'auto_commit_interval_ms': 5000,
  320. 'default_offset_commit_callback': lambda offsets, response: True,
  321. 'check_crcs': True,
  322. 'metadata_max_age_ms': 5 * 60 * 1000,
  323. 'partition_assignment_strategy': (RoundRobinPartitionAssignor),
  324. 'heartbeat_interval_ms': 3000,
  325. 'session_timeout_ms': 30000,
  326. 'max_poll_records': sys.maxsize,
  327. 'receive_buffer_bytes': None,
  328. 'send_buffer_bytes': None,
  329. 'consumer_timeout_ms': 1000,
  330. 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
  331. }
  332. def __init__(self, name, kwargs=None):
  333. self.kwargs = copy.copy(self.PROSUMER_DEFAULT_CONFIG)
  334. for key in self.kwargs:
  335. if key in kwargs:
  336. self.kwargs[key] = kwargs.pop(key)
  337. self.kwargs.update(kwargs)
  338. logging.info('\nProsumer:init: **** Configurations dump ***')
  339. logging.info('\n'.join('%s:%s' % (k.ljust(40), self.kwargs[k]) for k in sorted(self.kwargs)))
  340. BaseProducer.__init__(self, group=None, target=None, name=name,
  341. args=(), kwargs=self.kwargs, verbose=None)
  342. self.kconsumer = BaseConsumer(name=name, kwargs=self.kwargs)
  343. def add_listener_topics(self, listener, topics):
  344. try:
  345. map(lambda e: self.kconsumer.register(e, listener, getattr(listener, e)), topics)
  346. except AttributeError as e:
  347. logging.error("Prosumer:add_listener_topics. Function not implemented in the listener. %s" % e)
  348. raise NotImplementedException
  349. def add_listeners(self, listeners):
  350. for l in listeners:
  351. map(lambda e: self.kconsumer.register(e, l, getattr(l, e)), self.kwargs['topics'])
  352. def is_stopped(self):
  353. return self.stopped
  354. def set_stop(self):
  355. BaseProducer.set_stop(self)
  356. self.kconsumer.set_stop()
  357. logging.info('Prosumer:set_stop. Pending kconsumer to shutdown in 2s...')
  358. self.stopped = True
  359. def start_prosumer(self):
  360. self.stopped = False
  361. self.kconsumer.start()
  362. self.start()
  363. def message_loads(self, text_msg):
  364. return json.loads(text_msg)
  365. def message_dumps(self, obj_msg):
  366. return json.dumps(obj_msg)
  367. class SubscriptionListener(BaseMessageListener):
  368. '''
  369. test code used by test cases
  370. '''
  371. def __init__(self, name, producer):
  372. BaseMessageListener.__init__(self, name)
  373. self.producer = producer
  374. self.i = 0
  375. def gw_subscription_changed(self, event, items):
  376. logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, items))
  377. #print 'SubscriptionListener:gw_subscription_changed %s' % items
  378. # def on_kb_event(self, param):
  379. # print "on_kb_event [%s] %s" % (self.name, param)
  380. def gw_req_subscriptions(self, event, items):
  381. logging.info("[%s] received gw_req_subscriptions content:[%s]" % (self.name, items))
  382. vars= self.producer.message_loads(items['value'])
  383. self.producer.send_message('gw_subscription_changed', self.producer.message_dumps({'id': self.i, 'reqid': vars['reqid'],
  384. 'response' : "%s" % (time.strftime("%b %d %Y %H:%M:%S"))})
  385. )
  386. self.i = self.i + 1
  387. def reqMktData(self, event, items):
  388. logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
  389. self.producer.send_message('tickPrice',
  390. self.producer.message_dumps({'field':4, 'typeName':'tickPrice', 'price':1.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}))
  391. def tickPrice(self, event, items):
  392. logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
  393. def on_kb_reached_last_offset(self, event, items):
  394. logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, items))
  395. print "on_kb_reached_last_offset [%s] %s" % (self.name, items)
  396. def test_prosumer2(mode):
  397. if mode == 'A':
  398. topicsA = ['gw_subscription_changed', 'tickPrice']
  399. pA = Prosumer(name='A', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
  400. 'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
  401. 'group_id': 'groupA', 'session_timeout_ms':10000,
  402. 'topics': topicsA, 'clear_offsets' : False})
  403. sA = SubscriptionListener('earA', pA)
  404. pA.add_listeners([sA])
  405. pA.start_prosumer()
  406. i = 0
  407. try:
  408. pA.send_message('reqMktData', pA.message_dumps({'contract':'dummy'}))
  409. while True: #i < 5:
  410. #pA.send_message('gw_req_subscriptions', pA.message_dumps({'desc': 'requesting subscription msg counter:%d' % i,
  411. # 'reqid': i}))
  412. i= i + 1
  413. time.sleep(.45)
  414. except (KeyboardInterrupt, SystemExit):
  415. logging.error('caught user interrupt')
  416. pA.set_stop()
  417. pA.join()
  418. else:
  419. topicsB = ['gw_req_subscriptions', 'reqMktData']
  420. pB = Prosumer(name='B', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
  421. 'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
  422. 'group_id': 'groupB', 'session_timeout_ms':10000,
  423. 'topics': topicsB, 'clear_offsets' : False})
  424. sB = SubscriptionListener('earB', pB)
  425. pB.add_listeners([sB])
  426. pB.start_prosumer()
  427. try:
  428. while True: #i < 5:
  429. pB.send_message('tickPrice',
  430. pB.message_dumps({'field':5, 'typeName':'tickPrice', 'price':2.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}))
  431. time.sleep(.45)
  432. except (KeyboardInterrupt, SystemExit):
  433. logging.error('caught user interrupt')
  434. pB.set_stop()
  435. pB.join()
class TestProducer(BaseProducer):
    """BaseProducer used unchanged by the test cases; exists only to give the test its own type."""
    pass
def test_base_proconsumer(mode):
    '''
    This example demonstrates
    1) use of consumer_timeout_ms to break out from the consumer.next loop
    2) how to trap ctrl-c and break out of the running threads
    3) using Queue to store calls to producer.send_message
    4) using redis to store the consumer last processed offsets
    5) use of try-catch block to implement seek_to_latest offset
    6) inherit and implement MessageListener to subscribe messages dispatched by the consumer

    mode 'P' runs the producer side; any other value runs the consumer side.
    '''
    if mode == 'P':
        #Producer().start()
        topics = ['my_topic', 'my_topic2']
        tp = TestProducer(name = 'testproducer', kwargs={
                          'bootstrap_host':'localhost', 'bootstrap_port':9092,
                          'topics': topics})
        tp.start()
        i = 0
        # enqueue a numbered test message every quarter second until ctrl-c
        while True:
            #today = datetime.date.today()
            try:
                s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
                logging.info(s)
                tp.send_message(topics[i%2], s)
                time.sleep(.25)
                i=i+1
            except (KeyboardInterrupt, SystemExit):
                logging.error('caught user interrupt')
                tp.set_stop()
                tp.join()
                sys.exit(-1)
    else:
        bc = BaseConsumer(name='bc', kwargs={'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                          'bootstrap_host':'localhost', 'bootstrap_port':9092,
                          'group_id':'gid', 'session_timeout_ms':10000, 'topics': ['my_topic', 'my_topic2'],
                          'clear_offsets': True, 'consumer_timeout_ms':1000,
                          # if seek_to_end is present, for the topic specified the consumer will begin
                          # sending the latest message to the listener
                          # note that the list only specifies my_topic to receive the latest
                          # but not my_topic2. Observe the different behavior by checking the message offset
                          # in the program log
                          'seek_to_end': ['my_topic'],
                          })
        #bml = BaseMessageListener('bml')
        sml = SimpleMessageListener('simple')
        # one listener serves the last-offset notification and both topics
        bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, sml)
        bc.register('my_topic', sml)
        bc.register('my_topic2', sml)
        bc.start()
  488. def main():
  489. #
  490. # test cases
  491. #
  492. tp = [ test_base_proconsumer, test_prosumer2]
  493. if len(sys.argv) != 3:
  494. print("Usage: %s <role(producer or consumer): P|C> <test case #[0..1]>" % sys.argv[0])
  495. print "\n".join('case #%d: %s' % (i, tp[i].__name__) for i in range(len(tp)))
  496. print "example: python %s P 1" % sys.argv[0]
  497. print "example: python %s C 1" % sys.argv[0]
  498. exit(-1)
  499. mode = sys.argv[1]
  500. #gid = sys.argv[2] if sys.argv[2] <> None else "q-group"
  501. tp[int(sys.argv[2])](mode)
  502. #time.sleep(30)
  503. # while 1:
  504. # try:
  505. # time.sleep(5)
  506. # pass
  507. # except (KeyboardInterrupt, SystemExit):
  508. # logging.error('caught user interrupt')
  509. # sys.exit(-1)
if __name__ == "__main__":
    # NOTE(review): %(msecs)s renders the float msecs verbatim (e.g. 228.23109);
    # %(msecs)03d is the conventional form -- kept as-is to preserve log output
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()