# base_messaging.py
  1. #!/usr/bin/env python
  2. import threading, logging, time
  3. import sys
  4. import copy
  5. import datetime
  6. import uuid
  7. from Queue import Queue
  8. from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
  9. from misc2.observer import NotImplementedException
  10. from kafka import KafkaConsumer, KafkaProducer
  11. from kafka.structs import TopicPartition
  12. from kafka.errors import NoBrokersAvailable
  13. #
  14. # packages required for ConsumerNextIteratorPersist
  15. import json
  16. from redis import Redis
  17. from misc2.observer import Subscriber, Publisher
  18. from numpy.distutils.fcompiler import none
  19. class Producer(threading.Thread):
  20. daemon = True
  21. def run(self):
  22. try:
  23. producer = KafkaProducer(bootstrap_servers='localhost:9092')
  24. topics = ['my_topic', 'my_topic2']
  25. i = 0
  26. while True:
  27. #today = datetime.date.today()
  28. s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
  29. logging.info(s)
  30. producer.send(topics[i%2], s)
  31. time.sleep(.45)
  32. i=i+1
  33. except NoBrokersAvailable:
  34. logging.error("NoBrokersAvailable: Has kafka started?")
  35. class BaseProducer(threading.Thread, Subscriber):
  36. def __init__(self, group=None, target=None, name=None,
  37. args=(), kwargs=None, verbose=None):
  38. threading.Thread.__init__(self, group=group, target=target, name=name,
  39. verbose=verbose)
  40. """
  41. kwargs:
  42. bootstrap_host
  43. bootstrap_host
  44. redis_host
  45. session_timeout_ms:
  46. """
  47. self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name))
  48. logging.info('BaseProducer __init__: name=%s' % self.name)
  49. self.args = args
  50. self.kwargs = kwargs
  51. self.event_q = Queue()
  52. return
  53. def send_message(self, topic, plain_text):
  54. self.event_q.put((topic, plain_text))
  55. self.event_q.task_done()
  56. def set_stop(self):
  57. self.done = True
  58. def run(self):
  59. try:
  60. producer = KafkaProducer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']))
  61. self.done = False
  62. while self.done <> True:
  63. #today = datetime.date.today()
  64. if not self.event_q.empty():
  65. topic, plain_text = self.event_q.get()
  66. #s = "BaseProducer topic:[%s] msg:[%s]" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
  67. logging.info("BaseProducer topic:[%s] msg:[%s]" % (topic, plain_text))
  68. producer.send(topic, plain_text)
  69. # to prevent excessive CPU use
  70. time.sleep(0.1)
  71. logging.info ('******** BaseProducer exit done.')
  72. producer.close(1)
  73. except NoBrokersAvailable:
  74. logging.error("NoBrokersAvailable: Has kafka started?")
class BaseConsumer(threading.Thread, Publisher):
    """Threaded Kafka consumer that republishes each received message to
    registered listeners (via Publisher.dispatch) and tracks per-topic,
    per-partition offsets in redis so a restart can recognise messages it
    has already processed."""
    #KB_EVENT = "on_kb_event"
    # constants driving the informational slow-consumer check in run()
    # NOTE(review): run() uses SLOW_CONSUMER_QUALIFY_NUM for the sampling
    # modulo; SLOW_CONSUMER_CHECK_EVERY (50) looks intended for that check
    # and is currently unused -- confirm before relying on either value
    SLOW_CONSUMER_CHECK_EVERY = 50
    SLOW_CONSUMER_QUALIFY_NUM = 500
    # event dispatched when the consumer reaches the offset persisted
    # before the previous shutdown
    KB_REACHED_LAST_OFFSET = "on_kb_reached_last_offset"
    #my_topics = {'my_topic':{}, 'my_topic2':{}}
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        """
        kwargs:
        bootstrap_host
        bootstrap_host
        redis_host
        redis_port
        redis_db
        group_id
        consumer_id: name
        topics- a list of topic strings
        session_timeout_ms
        consumer_timeout_ms
        seek_to_end- a list of topics that only wants the latest message
        """
        # NOTE(review): the second 'bootstrap_host' in the list above is
        # presumably 'bootstrap_port' (run() reads both keys) -- confirm
        # name is suffixed with a deterministic uuid5; it also keys the
        # per-consumer redis entries (see consumer_topic)
        self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name))
        logging.info('BaseConsumer __init__: name=%s' % self.name)
        self.args = args
        self.kwargs = kwargs
        # redis connection used to persist/restore per-partition offsets
        self.rs = Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
        # default 'seek_to_end' to an empty list when the caller omitted it
        try:
            self.kwargs['seek_to_end']
        except KeyError:
            self.kwargs['seek_to_end'] = []
        # my_topics maps topic -> {str(partition): last_seen_offset}
        self.my_topics = {}
        for t in self.kwargs['topics']:
            self.my_topics[t]= {}
        #self.events = {event: dict() for event in [BaseConsumer.KB_EVENT, BaseConsumer.KB_REACHED_LAST_OFFSET]}
        # one listener registry per dispatchable event: the reached-last-offset
        # event plus one event per subscribed topic (consumed by Publisher)
        self.events = {event: dict() for event in [BaseConsumer.KB_REACHED_LAST_OFFSET] + self.kwargs['topics']}
        return
    """
    each consumer has its own set of topics offsets stored in redis
    for consumer A and consumer B (with different group_ids) subscribing to the same my_topic, each
    each of them will need to keep track of its own offsets
    to make the redis key unique by consumer, the key is created by topic + '@' + consumer name
    example: my_topic@consumerA
    offsets = { topic-consumer_name:
                {
                    partition0: offset0,
                    partition1: offset1,...
                }
              }
    """
    def consumer_topic(self, tp):
        """Return the per-consumer redis key for topic *tp* (topic + '@' +
        this consumer's uuid-suffixed name)."""
        return tp + '@' + self.name
    def clear_offsets(self):
        """
        clear the offsets in redis by removing the value from redis
        and emptying the internal my_topics dict
        clearing offsets is necessary when the offset in redis was saved
        at a time since the kafka manager was shut down
        when kafka restarts, previously buffered
        messages are no longer available and instead it will restart its offset at 0.
        Reading an old offset by BaseConsumer will cause it to think that it
        is still receiving old buffered messages from Kafka but in fact all the messages
        since the last shut down of kafka are all gone
        """
        for t in self.kwargs['topics']:
            self.my_topics[t]= {}
            logging.info("BaseConsumer:clear_offsets Deleting %s from redis..." % self.consumer_topic(t))
            self.rs.delete(self.consumer_topic(t))
    def persist_offsets(self, topic, partition, offset):
        """Record *offset* for (topic, partition) in the local map and write
        the whole per-topic partition->offset map to redis as json."""
        #self.rs.set(self.consumer_topic(topic), json.dumps({'partition': partition, 'offset':offset}))
        self.my_topics[topic][str(partition)] = offset
        self.rs.set(self.consumer_topic(topic), json.dumps(self.my_topics[topic]))
    def enrich_message(self, message):
        """Wrap a kafka message's payload with its partition and offset
        metadata; the payload itself is passed through untouched."""
        return {'value': message.value, 'partition':message.partition, 'offset': message.offset}
    def set_stop(self):
        # ask the run() loop to exit after its current iteration
        self.done = True
    def run(self):
        """Main consume loop: restore saved offsets, subscribe, then dispatch
        each message to listeners, skipping/seeking as configured."""
        logging.info('BaseConsumer:run. %s:%s started' % (self.kwargs['group_id'], self.name))
        if self.kwargs['clear_offsets'] == True:
            self.clear_offsets()
        consumer = KafkaConsumer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']),
                                 auto_offset_reset='earliest',
                                 #
                                 # consumers having the same group id work as
                                 # a group to seize messages published by the publisher
                                 # (like a queue, each message is consumed exactly once
                                 # by a consumer)
                                 #
                                 # in a 'pub-sub' environment, set each consumer having
                                 # a unique group_id
                                 #
                                 group_id = self.kwargs['group_id'],
                                 client_id = self.name,
                                 #
                                 # session_timeout_ms is the time it takes for another consumer
                                 # that has the same group_id to pick up the work
                                 # to consume messages when this consumer is dead
                                 #
                                 session_timeout_ms = self.kwargs['session_timeout_ms'],
                                 partition_assignment_strategy=[RoundRobinPartitionAssignor],
                                 # consumer_timeout_ms makes consumer.next() raise
                                 # StopIteration when idle, so the loop can poll self.done
                                 consumer_timeout_ms=self.kwargs['consumer_timeout_ms']
                                 )
        for topic in self.my_topics.keys():
            #
            # if a topic offset is stored previously (the consumer was run before),
            # then load the offset values
            # and save it locally into the my_topics map
            # else self.my_topics[topic] would have zero elements in it
            if self.rs.keys(self.consumer_topic(topic)):
                self.my_topics[topic] = json.loads(self.rs.get(self.consumer_topic(topic)))
        logging.info('BaseConsumer:run. Topics subscribed: %s' % self.my_topics)
        consumer.subscribe(self.my_topics.keys())
        #consumer.seek_to_end(TopicPartition(topic='my_topic', partition=0))
        self.done = False
        while self.done <> True:
            try:
                # py2 iterator protocol; raises StopIteration after
                # consumer_timeout_ms of inactivity
                message = consumer.next()
                # the next if block is there to serve information purpose only
                # it may be useful to detect slow consumer situation
                if message.offset % BaseConsumer.SLOW_CONSUMER_QUALIFY_NUM == 0:
                    highwater = consumer.highwater(TopicPartition(message.topic, message.partition))
                    logging.info( "BaseConsumer [%s]:highwater:%d offset:%d part:%d <%s>" % (self.name, highwater, message.offset, message.partition, message.value))
                    if highwater - message.offset >= BaseConsumer.SLOW_CONSUMER_QUALIFY_NUM:
                        logging.warn("BaseConsumer:run Slow consumer detected! current: %d, highwater:%d, gap:%d" %
                                     (message.offset, highwater, highwater - message.offset))
                # the next block is designed to handle the first time the
                # consumer encounters a topic partition it hasn't seen yet
                try:
                    # the try catch block ensures that on the first encounter of a topic/partition
                    # the except block is executed once and only once, thereafter since the
                    # dictionary object keys are assigned the exception will never
                    # be caught again
                    #
                    # the try catch is supposed to be faster than an if-else block...
                    #
                    # the below statement has no meaning, it is there merely to ensure that
                    # the block fails on the first run
                    self.my_topics[message.topic][str(message.partition)]
                except KeyError:
                    # first message for this (topic, partition): compute the lag
                    highwater = consumer.highwater(TopicPartition(message.topic, message.partition))
                    gap = highwater - message.offset
                    logging.info( "*** On first iteration: [Topic:%s:Part:%d:Offset:%d]: Number of messages lagging behind= %d. Highwater:%d"
                                  % (message.topic, message.partition, message.offset, gap, highwater))
                    # dump the current position of every subscribed partition
                    for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
                        logging.info ("*** On first iteration: Topic Partition Table: topic:[%s] %s" % (t.rjust(20),
                                      ','.join('partition:%d, offset:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)
                                      ))
                    self.persist_offsets(message.topic, message.partition, message.offset)
                    self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))
                    if message.topic in self.kwargs['seek_to_end']:
                        # if there is no gap
                        if gap == 1:
                            # the message is valid for dispatching and not to be skipped
                            self.dispatch(message.topic, self.enrich_message(message))
                            logging.debug('*** On first iteration: Gap=%d Dispatch this valid message to the listener <%s>' % (gap, message.value))
                        else: # gap exists
                            logging.info("*** On first iteration: [Topic:%s:Part:%d:Offset:%d]: Gap:%d Attempting to seek to latest message ..."
                                         % (message.topic, message.partition, message.offset, gap))
                            consumer.seek_to_end((TopicPartition(topic=message.topic, partition= message.partition)))
                            # skip this message from dispatching to listeners
                            continue
                """
                the message.value received from kafaproducer is expected to contain
                plain text encoded as a json string
                the content of message.value is not altered. it's content is stored in a dict object
                with key = 'value' and enriched with additional kafa metadata
                it is the subscriber's job to interpret the content stored in the 'value' key. Typically
                it means decoding the content by invoking json.loads
                """
                if self.my_topics[message.topic][str(message.partition)] > message.offset:
                    # saved offset is ahead of this message: already processed
                    logging.info('BaseConsumer ********************** old message...discarding %s %d' % (message.topic, message.offset))
                else:
                    #if self.my_topics[message.topic][str(message.partition)] == message.offset:
                    # if the stored offset in redis equals to the current offset
                    # notify the observers
                    # the "and" condition ensures that on a fresh start of kafka server this event is not triggered as
                    # both saved value in redis and current offset are both 0
                    if self.my_topics[message.topic][str(message.partition)] == message.offset and message.offset <> 0:
                        self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.enrich_message(message))
                        self.dispatch(message.topic, self.enrich_message(message))
                        logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
                    else:
                        # a genuinely new message: record its offset then dispatch
                        self.persist_offsets(message.topic, message.partition, message.offset)
                        #self.dispatch(BaseConsumer.KB_EVENT, {'message': message})
                        self.dispatch(message.topic, self.enrich_message(message))
            except StopIteration:
                # consumer_timeout_ms elapsed with no message; loop to re-check self.done
                logging.debug('BaseConsumer:run StopIteration Caught. No new message arriving...')
                continue
        consumer.close()
        logging.info ('******** BaseConsumer exit done.')
  273. class BaseMessageListener(Subscriber):
  274. def update(self, event, param=none):
  275. try:
  276. event_fn = getattr(self, event)
  277. event_fn(param)
  278. except AttributeError:
  279. err_msg = 'BaseMessageListener:update| function %s not implemented.' % event
  280. logging.error('BaseMessageListener [%s]:update %s' % (self.name, err_msg))
  281. logging.debug("BaseMessageListener [%s]:update|Event type:[%s] content:[%s]" % (self.name, event, json.dumps(param) if param <> None else "<empty param>"))
  282. class SimpleMessageListener(BaseMessageListener):
  283. def __init__(self, name):
  284. BaseMessageListener.__init__(self, name)
  285. self.cnt_my_topic = 0
  286. self.cnt_my_topic2 = 0
  287. # def on_kb_event(self, param):
  288. # print "on_kb_event [%s] %s" % (self.name, param)
  289. def my_topic(self, param):
  290. if self.cnt_my_topic % 50 == 0:
  291. print "SimpleMessageListener:my_topic. %s" % param
  292. self.cnt_my_topic += 1
  293. def my_topic2(self, param):
  294. if self.cnt_my_topic2 % 50 == 0:
  295. print "SimpleMessageListener:my_topic2. %s" % param
  296. self.cnt_my_topic2 += 1
  297. def on_kb_reached_last_offset(self, param):
  298. print "on_kb_reached_last_offset [%s] %s" % (self.name, param)
  299. class Prosumer(BaseProducer):
  300. # wrapper object
  301. PROSUMER_DEFAULT_CONFIG = {
  302. 'bootstrap_servers': 'localhost',
  303. 'client_id': 'kafka-prosumer' ,
  304. 'group_id': 'kafka-prosumer-default-group',
  305. 'key_deserializer': None,
  306. 'value_deserializer': None,
  307. 'fetch_max_wait_ms': 500,
  308. 'fetch_min_bytes': 1,
  309. 'max_partition_fetch_bytes': 1 * 1024 * 1024,
  310. 'request_timeout_ms': 40 * 1000,
  311. 'retry_backoff_ms': 100,
  312. 'reconnect_backoff_ms': 50,
  313. 'max_in_flight_requests_per_connection': 5,
  314. 'auto_offset_reset': 'latest',
  315. 'enable_auto_commit': True,
  316. 'auto_commit_interval_ms': 5000,
  317. 'default_offset_commit_callback': lambda offsets, response: True,
  318. 'check_crcs': True,
  319. 'metadata_max_age_ms': 5 * 60 * 1000,
  320. 'partition_assignment_strategy': (RoundRobinPartitionAssignor),
  321. 'heartbeat_interval_ms': 3000,
  322. 'session_timeout_ms': 30000,
  323. 'max_poll_records': sys.maxsize,
  324. 'receive_buffer_bytes': None,
  325. 'send_buffer_bytes': None,
  326. 'consumer_timeout_ms': 1000,
  327. 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
  328. }
  329. def __init__(self, name, kwargs=None):
  330. self.kwargs = copy.copy(self.PROSUMER_DEFAULT_CONFIG)
  331. for key in self.kwargs:
  332. if key in kwargs:
  333. self.kwargs[key] = kwargs.pop(key)
  334. self.kwargs.update(kwargs)
  335. logging.info('\nProsumer:init: **** Configurations dump ***')
  336. logging.info('\n'.join('%s:%s' % (k.ljust(40), self.kwargs[k]) for k in sorted(self.kwargs)))
  337. BaseProducer.__init__(self, group=None, target=None, name=name,
  338. args=(), kwargs=self.kwargs, verbose=None)
  339. self.kconsumer = BaseConsumer(name=name, kwargs=self.kwargs)
  340. def add_listener_topics(self, listener, topics):
  341. try:
  342. map(lambda e: self.kconsumer.register(e, listener, getattr(listener, e)), topics)
  343. except AttributeError as e:
  344. logging.error("Prosumer:add_listener_topics. Function not implemented in the listener. %s" % e)
  345. raise NotImplementedException
  346. def add_listeners(self, listeners):
  347. for l in listeners:
  348. map(lambda e: self.kconsumer.register(e, l, getattr(l, e)), self.kwargs['topics'])
  349. def is_stopped(self):
  350. return self.stopped
  351. def set_stop(self):
  352. BaseProducer.set_stop(self)
  353. self.kconsumer.set_stop()
  354. logging.info('Prosumer:set_stop. Pending kconsumer to shutdown in 2s...')
  355. self.stopped = True
  356. def start_prosumer(self):
  357. self.stopped = False
  358. self.kconsumer.start()
  359. self.start()
  360. def message_loads(self, text_msg):
  361. return json.loads(text_msg)
  362. def message_dumps(self, obj_msg):
  363. return json.dumps(obj_msg)
  364. class SubscriptionListener(BaseMessageListener):
  365. def __init__(self, name, producer):
  366. BaseMessageListener.__init__(self, name)
  367. self.producer = producer
  368. self.i = 0
  369. def gw_subscription_changed(self, event, items):
  370. logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, items))
  371. #print 'SubscriptionListener:gw_subscription_changed %s' % items
  372. # def on_kb_event(self, param):
  373. # print "on_kb_event [%s] %s" % (self.name, param)
  374. def gw_req_subscriptions(self, event, items):
  375. logging.info("[%s] received gw_req_subscriptions content:[%s]" % (self.name, items))
  376. vars= self.producer.message_loads(items['value'])
  377. self.producer.send_message('gw_subscription_changed', self.producer.message_dumps({'id': self.i, 'reqid': vars['reqid'],
  378. 'response' : "%s" % (time.strftime("%b %d %Y %H:%M:%S"))})
  379. )
  380. self.i = self.i + 1
  381. def reqMktData(self, event, items):
  382. logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
  383. self.producer.send_message('tickPrice',
  384. self.producer.message_dumps({'field':4, 'typeName':'tickPrice', 'price':1.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}))
  385. def tickPrice(self, event, items):
  386. logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
  387. def on_kb_reached_last_offset(self, event, items):
  388. logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, items))
  389. print "on_kb_reached_last_offset [%s] %s" % (self.name, items)
def test_prosumer2(mode):
    """Demo of two Prosumers exchanging messages over kafka.

    mode 'A' runs prosumer A (listens on gw_subscription_changed/tickPrice,
    fires one reqMktData); any other mode runs prosumer B (listens on
    gw_req_subscriptions/reqMktData, publishes tickPrice in a loop).
    Run one process per mode; ctrl-c shuts the prosumer down.
    """
    if mode == 'A':
        topicsA = ['gw_subscription_changed', 'tickPrice']
        pA = Prosumer(name='A', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                        'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                        'group_id': 'groupA', 'session_timeout_ms':10000,
                                        'topics': topicsA, 'clear_offsets' : False})
        sA = SubscriptionListener('earA', pA)
        pA.add_listeners([sA])
        pA.start_prosumer()
        i = 0
        try:
            # kick off the exchange: B's listener answers with a tickPrice
            pA.send_message('reqMktData', pA.message_dumps({'contract':'dummy'}))
            while True: #i < 5:
                #pA.send_message('gw_req_subscriptions', pA.message_dumps({'desc': 'requesting subscription msg counter:%d' % i,
                #                                                          'reqid': i}))
                i = i + 1
                time.sleep(.45)
        except (KeyboardInterrupt, SystemExit):
            logging.error('caught user interrupt')
            pA.set_stop()
            pA.join()
    else:
        topicsB = ['gw_req_subscriptions', 'reqMktData']
        pB = Prosumer(name='B', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                        'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                        'group_id': 'groupB', 'session_timeout_ms':10000,
                                        'topics': topicsB, 'clear_offsets' : False})
        sB = SubscriptionListener('earB', pB)
        pB.add_listeners([sB])
        pB.start_prosumer()
        try:
            while True: #i < 5:
                # publish a canned tick roughly twice a second
                pB.send_message('tickPrice',
                                pB.message_dumps({'field':5, 'typeName':'tickPrice', 'price':2.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}))
                time.sleep(.45)
        except (KeyboardInterrupt, SystemExit):
            logging.error('caught user interrupt')
            pB.set_stop()
            pB.join()
class TestProducer(BaseProducer):
    """Concrete BaseProducer used by the test drivers; inherits all behavior."""
    pass
def test_base_proconsumer(mode):
    '''
    This example demonstrates
    1) use of consumer_timeout_ms to break out from the consumer.next loop
    2) how to trap ctrl-c and break out of the running threads
    3) using Queue to store calls to producer.send_message
    4) using redis to store the consumer last processed offsets
    5) use of try-catch block to implement seek_to_latest offset
    6) inherit and implement MessageListener to subscribe messages dispatched by the consumer

    mode 'P' runs the producer side (publishes alternating test messages
    forever; ctrl-c stops it); any other mode runs the consumer side with a
    SimpleMessageListener registered for both topics.
    '''
    if mode == 'P':
        #Producer().start()
        topics = ['my_topic', 'my_topic2']
        tp = TestProducer(name = 'testproducer', kwargs={
            'bootstrap_host':'localhost', 'bootstrap_port':9092,
            'topics': topics})
        tp.start()
        i = 0
        while True:
            #today = datetime.date.today()
            try:
                # alternate between the two topics on each iteration
                s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
                logging.info(s)
                tp.send_message(topics[i%2], s)
                time.sleep(.25)
                i = i + 1
            except (KeyboardInterrupt, SystemExit):
                logging.error('caught user interrupt')
                tp.set_stop()
                tp.join()
                sys.exit(-1)
    else:
        bc = BaseConsumer(name='bc', kwargs={'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
                                             'bootstrap_host':'localhost', 'bootstrap_port':9092,
                                             'group_id':'gid', 'session_timeout_ms':10000, 'topics': ['my_topic', 'my_topic2'],
                                             'clear_offsets': True, 'consumer_timeout_ms':1000,
                                             # uncomment the next line to process messages from since the program was last shut down
                                             # if seek_to_end is present, for the topic specified the consumer will begin
                                             # sending the latest message to the listener
                                             # note that the list only specifies my_topic to receive the latest
                                             # but not my_topic2. Observe the different behavior by checking the message offset
                                             # in the program log
                                             'seek_to_end': ['my_topic'],
                                             })
        #bml = BaseMessageListener('bml')
        sml = SimpleMessageListener('simple')
        # listener handles both topics plus the reached-last-offset event
        bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, sml)
        bc.register('my_topic', sml)
        bc.register('my_topic2', sml)
        bc.start()
  482. def main():
  483. #
  484. # test cases
  485. #
  486. tp = [ test_base_proconsumer, test_prosumer2]
  487. if len(sys.argv) != 3:
  488. print("Usage: %s <role(producer or consumer): P|C> <test case #[0..1]>" % sys.argv[0])
  489. print "\n".join('case #%d: %s' % (i, tp[i].__name__) for i in range(len(tp)))
  490. print "example: python %s P 1" % sys.argv[0]
  491. print "example: python %s C 1" % sys.argv[0]
  492. exit(-1)
  493. mode = sys.argv[1]
  494. #gid = sys.argv[2] if sys.argv[2] <> None else "q-group"
  495. tp[int(sys.argv[2])](mode)
  496. #time.sleep(30)
  497. # while 1:
  498. # try:
  499. # time.sleep(5)
  500. # pass
  501. # except (KeyboardInterrupt, SystemExit):
  502. # logging.error('caught user interrupt')
  503. # sys.exit(-1)
  504. if __name__ == "__main__":
  505. logging.basicConfig(
  506. format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
  507. level=logging.INFO
  508. )
  509. main()