# base_messaging.py
  1. #!/usr/bin/env python
  2. import threading, logging, time, traceback
  3. import sys
  4. import copy
  5. import datetime
  6. import uuid
  7. from Queue import Queue
  8. from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
  9. from misc2.observer import NotImplementedException
  10. from kafka import KafkaConsumer, KafkaProducer
  11. from kafka.structs import TopicPartition
  12. from kafka.errors import NoBrokersAvailable
  13. #
  14. # packages required for ConsumerNextIteratorPersist
  15. import json
  16. from redis import Redis
  17. from misc2.observer import Subscriber, Publisher
  18. from numpy.distutils.fcompiler import none
  19. from types import NoneType
  20. class Producer(threading.Thread):
  21. daemon = True
  22. def run(self):
  23. try:
  24. producer = KafkaProducer(bootstrap_servers='localhost:9092')
  25. topics = ['my_topic', 'my_topic2']
  26. i = 0
  27. while True:
  28. #today = datetime.date.today()
  29. s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
  30. logging.info(s)
  31. producer.send(topics[i%2], s)
  32. time.sleep(.45)
  33. i=i+1
  34. except NoBrokersAvailable:
  35. logging.error("NoBrokersAvailable: Has kafka started?")
  36. class BaseProducer(threading.Thread, Subscriber):
  37. def __init__(self, group=None, target=None, name=None,
  38. args=(), kwargs=None, verbose=None):
  39. threading.Thread.__init__(self, group=group, target=target, name=name,
  40. verbose=verbose)
  41. """
  42. kwargs:
  43. bootstrap_host
  44. bootstrap_host
  45. redis_host
  46. session_timeout_ms:
  47. """
  48. self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name))
  49. logging.info('BaseProducer __init__: name=%s' % self.name)
  50. self.args = args
  51. self.kwargs = kwargs
  52. self.event_q = Queue()
  53. return
  54. def send_message(self, topic, plain_text):
  55. self.event_q.put((topic, plain_text))
  56. self.event_q.task_done()
  57. def set_stop(self):
  58. self.done = True
  59. def run(self):
  60. try:
  61. producer = KafkaProducer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']))
  62. self.done = False
  63. while self.done <> True:
  64. #today = datetime.date.today()
  65. #if not self.event_q.empty():
  66. while not self.event_q.empty():
  67. topic, plain_text = self.event_q.get()
  68. #s = "BaseProducer topic:[%s] msg:[%s]" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
  69. logging.info("BaseProducer topic:[%s] msg:[%s]" % (topic, plain_text))
  70. producer.send(topic, plain_text)
  71. # to prevent excessive CPU use
  72. time.sleep(0.1)
  73. logging.info ('******** BaseProducer exit done.')
  74. producer.close(1)
  75. except NoBrokersAvailable:
  76. logging.error("NoBrokersAvailable: Has kafka started?")
class BaseConsumer(threading.Thread, Publisher):
    """Kafka consumer thread that persists per-partition offsets in redis
    and dispatches each received message to registered listeners, keyed by
    topic name. Also publishes KB_REACHED_LAST_OFFSET when the consumer
    catches up with the last offset it had processed before a restart.
    """
    #KB_EVENT = "on_kb_event"
    SLOW_CONSUMER_CHECK_EVERY = 50
    # Lag (highwater - offset) beyond which a slow-consumer warning is logged.
    SLOW_CONSUMER_QUALIFY_NUM = 500
    KB_REACHED_LAST_OFFSET = "on_kb_reached_last_offset"
    #my_topics = {'my_topic':{}, 'my_topic2':{}}
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        """
        kwargs:
            bootstrap_host
            bootstrap_port
            redis_host
            redis_port
            redis_db
            group_id
            consumer_id: name
            topics- a list of topic strings
            session_timeout_ms
            consumer_timeout_ms
            seek_to_end- a list of topics that only wants the latest message
        """
        # Deterministic uuid suffix keeps same-named consumers distinguishable.
        self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name))
        logging.info('BaseConsumer __init__: name=%s' % self.name)
        self.args = args
        self.kwargs = kwargs
        # Redis connection used to persist per-topic/partition offsets.
        self.rs = Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
        # seek_to_end is optional; default to "no topics want latest-only".
        try:
            self.kwargs['seek_to_end']
        except KeyError:
            self.kwargs['seek_to_end'] = []
        # my_topics maps topic -> {str(partition): last_offset}.
        self.my_topics = {}
        for t in self.kwargs['topics']:
            self.my_topics[t] = {}
        #self.events = {event: dict() for event in [BaseConsumer.KB_EVENT, BaseConsumer.KB_REACHED_LAST_OFFSET]}
        # Publisher event table: one event per subscribed topic plus the
        # reached-last-offset notification.
        self.events = {event: dict() for event in [BaseConsumer.KB_REACHED_LAST_OFFSET] + self.kwargs['topics']}
        return
    """
    each consumer has its own set of topics offsets stored in redis
    for consumer A and consumer B (with different group_ids) subscribing to the same my_topic,
    each of them will need to keep track of its own offsets
    to make the redis key unique by consumer, the key is created by topic + '@' + consumer name
    example: my_topic@consumerA
    offsets = { topic-consumer_name:
                    {
                        partition0: offset0,
                        partition1: offset1,...
                    }
              }
    """
    def consumer_topic(self, tp):
        # Redis key: topic name qualified by this consumer's unique name.
        return tp + '@' + self.name
    def clear_offsets(self):
        """
        Clear the offsets by removing the value from redis
        and emptying the internal my_topics dict.

        Clearing offsets is necessary when the offset in redis was saved
        before the kafka manager was shut down: when kafka restarts,
        previously buffered messages are no longer available and kafka
        restarts its offset at 0. Reading an old offset would make
        BaseConsumer think it is still receiving old buffered messages,
        while in fact all messages since the last shutdown are gone.
        """
        for t in self.kwargs['topics']:
            self.my_topics[t] = {}
            logging.info("BaseConsumer:clear_offsets Deleting %s from redis..." % self.consumer_topic(t))
            self.rs.delete(self.consumer_topic(t))
    def persist_offsets(self, topic, partition, offset):
        """Record the latest processed offset locally and in redis."""
        #self.rs.set(self.consumer_topic(topic), json.dumps({'partition': partition, 'offset':offset}))
        self.my_topics[topic][str(partition)] = offset
        self.rs.set(self.consumer_topic(topic), json.dumps(self.my_topics[topic]))
    def extract_message_content(self, message):
        """Decode message.value as json; return {} on malformed content."""
        #logging.info('BaseConsumer: extract_message_content. %s %s' % (type(message), message))
        try:
            return json.loads(message.value)
        except ValueError:
            logging.info('extract_message_content exception: %s' % message)
            return {}
    def set_stop(self):
        """Request the run loop to exit after its current iteration."""
        self.done = True
    def run(self):
        logging.info('BaseConsumer:run. %s:%s started' % (self.kwargs['group_id'], self.name))
        if self.kwargs['clear_offsets'] == True:
            self.clear_offsets()
        consumer = KafkaConsumer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']),
                                 auto_offset_reset='earliest',
                                 #
                                 # consumers having the same group id work as
                                 # a group to seize messages published by the publisher
                                 # (like a queue, each message is consumed exactly once
                                 # by a consumer)
                                 #
                                 # in a 'pub-sub' environment, set each consumer having
                                 # a unique group_id
                                 #
                                 group_id = self.kwargs['group_id'],
                                 client_id = self.name,
                                 #
                                 # session_timeout_ms is the time it takes for another
                                 # consumer that has the same group_id to pick up the
                                 # work to consume messages when this consumer is dead
                                 #
                                 session_timeout_ms = self.kwargs['session_timeout_ms'],
                                 partition_assignment_strategy=[RoundRobinPartitionAssignor],
                                 # consumer.next() raises StopIteration after this
                                 # timeout, which is what lets the while loop below
                                 # periodically re-check self.done
                                 consumer_timeout_ms=self.kwargs['consumer_timeout_ms']
                                 )
        for topic in self.my_topics.keys():
            #
            # if a topic offset is stored previously (the consumer was run before),
            # then load the offset values and save them locally into the my_topics map
            # else self.my_topics[topic] keeps zero elements in it
            if self.rs.keys(self.consumer_topic(topic)):
                self.my_topics[topic] = json.loads(self.rs.get(self.consumer_topic(topic)))
        logging.info('BaseConsumer:run. Topics subscribed: %s' % self.my_topics)
        consumer.subscribe(self.my_topics.keys())
        #consumer.seek_to_end(TopicPartition(topic='my_topic', partition=0))
        self.done = False
        while self.done <> True:
            try:
                message = consumer.next()
                # the next if block is there to serve information purpose only
                # it may be useful to detect slow consumer situation
                if message.offset % BaseConsumer.SLOW_CONSUMER_QUALIFY_NUM == 0:
                    highwater = consumer.highwater(TopicPartition(message.topic, message.partition))
                    logging.info("BaseConsumer [%s]:highwater:%d offset:%d part:%d <%s>" % (self.name, highwater, message.offset, message.partition, message.topic))
                    if highwater - message.offset >= BaseConsumer.SLOW_CONSUMER_QUALIFY_NUM:
                        logging.warn("BaseConsumer:run Slow consumer detected! current: %d, highwater:%d, gap:%d" %
                                     (message.offset, highwater, highwater - message.offset))
                # the next block is designed to handle the first time the
                # consumer encounters a topic partition it hasn't seen yet
                try:
                    # the try catch block ensures that on the first encounter of a
                    # topic/partition the except block is executed once and only once,
                    # thereafter since the dictionary keys are assigned the exception
                    # will never be caught again
                    #
                    # the try catch is supposed to be faster than an if-else block...
                    #
                    # the below statement has no meaning, it is there merely to ensure
                    # that the block fails on the first run
                    self.my_topics[message.topic][str(message.partition)]
                except KeyError:
                    highwater = consumer.highwater(TopicPartition(message.topic, message.partition))
                    # gap = number of messages between this one and the newest one
                    gap = highwater - message.offset
                    logging.info("*** On first iteration: [Topic:%s:Part:%d:Offset:%d]: Number of messages lagging behind= %d. Highwater:%d"
                                 % (message.topic, message.partition, message.offset, gap, highwater))
                    # dump a position table for every subscribed topic (info only)
                    for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
                        try:
                            logging.info("*** On first iteration: T/P Table: topic:[%s] %s" % (t.rjust(25),
                                         ','.join('part:%d, off:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)
                                         ))
                        except TypeError:
                            # partitions_for_topic returned None: topic not in kafka yet
                            logging.warn('*** On first iteration: [*** %s not registered in kafka topics yet ***]. This message should go away the next time the program is run.' % t)
                            continue
                    self.persist_offsets(message.topic, message.partition, message.offset)
                    self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))
                    if '*' in self.kwargs['seek_to_end'] or message.topic in self.kwargs['seek_to_end']:
                        #print 'baseconsumer run %s %d' % (message.topic, gap)
                        '''
                        use seek_to_end only for messages that keep streaming and you don't
                        care whether messages are lost or not
                        '''
                        # if there is no gap
                        if gap <= 1:
                            # the message is valid for dispatching and not to be skipped
                            # NOTE(review): this message falls through to the normal
                            # dispatch path below as well, so it may be dispatched
                            # twice on the first iteration -- confirm intent.
                            self.dispatch(message.topic, self.extract_message_content(message))
                            logging.debug('*** On first iteration: Gap=%d Dispatch this valid message to the listener <%s>' % (gap, message.value))
                        else: # gap exists
                            logging.info("*** On first iteration: [Topic:%s:Part:%d:Offset:%d]: Gap:%d Attempting to seek to latest message ..."
                                         % (message.topic, message.partition, message.offset, gap))
                            consumer.seek_to_end((TopicPartition(topic=message.topic, partition=message.partition)))
                            # skip this message from dispatching to listeners
                            continue
                """
                the message.value received from kafkaproducer is expected to contain
                plain text encoded as a json string
                the content of message.value is not altered. its content is stored in a dict
                object with key = 'value' and enriched with additional kafka metadata
                it is the subscriber's job to interpret the content stored in the 'value'
                key. Typically it means decoding the content by invoking json.loads
                """
                if self.my_topics[message.topic][str(message.partition)] > message.offset:
                    # stored offset is ahead of this message: stale replay, skip it
                    logging.info('BaseConsumer ********************** old message...discarding %s %d' % (message.topic, message.offset))
                else:
                    #if self.my_topics[message.topic][str(message.partition)] == message.offset:
                    # if the stored offset in redis equals the current offset,
                    # notify the observers
                    # the "and" condition ensures that on a fresh start of the kafka
                    # server this event is not triggered, as both the saved value in
                    # redis and the current offset are 0
                    if self.my_topics[message.topic][str(message.partition)] == message.offset and message.offset <> 0:
                        self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.extract_message_content(message))
                        self.dispatch(message.topic, self.extract_message_content(message))
                        logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
                    else:
                        # normal path: record progress, then notify listeners
                        self.persist_offsets(message.topic, message.partition, message.offset)
                        self.dispatch(message.topic, self.extract_message_content(message))
            except StopIteration:
                # consumer_timeout_ms expired with no new message; loop to
                # re-check self.done
                logging.debug('BaseConsumer:run StopIteration Caught. No new message arriving...')
                continue
            except TypeError:
                logging.error('BaseConsumer:run. Caught TypeError Exception while processing a message. Malformat json string? %s: %s' % (message.topic, message.value))
                logging.error(traceback.format_exc())
        consumer.close()
        logging.info('******** BaseConsumer exit done.')
  291. class BaseMessageListener(Subscriber):
  292. def __init__(self, name):
  293. self.name = name
  294. def update(self, event, param=none):
  295. try:
  296. event_fn = getattr(self, event)
  297. event_fn(param)
  298. except AttributeError:
  299. err_msg = 'BaseMessageListener:update| function %s not implemented.' % event
  300. logging.error('BaseMessageListener [%s]:update %s' % (self.name, err_msg))
  301. logging.debug("BaseMessageListener [%s]:update|Event type:[%s] content:[%s]" % (self.name, event, json.dumps(param) if param <> None else "<empty param>"))
  302. class SimpleMessageListener(BaseMessageListener):
  303. def __init__(self, name):
  304. BaseMessageListener.__init__(self, name)
  305. self.cnt_my_topic = 0
  306. self.cnt_my_topic2 = 0
  307. # def on_kb_event(self, param):
  308. # print "on_kb_event [%s] %s" % (self.name, param)
  309. def my_topic(self, param):
  310. if self.cnt_my_topic % 50 == 0:
  311. print "SimpleMessageListener:my_topic. %s" % param
  312. self.cnt_my_topic += 1
  313. def my_topic2(self, param):
  314. if self.cnt_my_topic2 % 50 == 0:
  315. print "SimpleMessageListener:my_topic2. %s" % param
  316. self.cnt_my_topic2 += 1
  317. def on_kb_reached_last_offset(self, param):
  318. print "on_kb_reached_last_offset [%s] %s" % (self.name, param)
  319. class Prosumer(BaseProducer):
  320. # wrapper object
  321. PROSUMER_DEFAULT_CONFIG = {
  322. 'bootstrap_servers': 'localhost',
  323. 'client_id': 'kafka-prosumer' ,
  324. 'group_id': 'kafka-prosumer-default-group',
  325. 'key_deserializer': None,
  326. 'value_deserializer': None,
  327. 'fetch_max_wait_ms': 500,
  328. 'fetch_min_bytes': 1,
  329. 'max_partition_fetch_bytes': 1 * 1024 * 1024,
  330. 'request_timeout_ms': 40 * 1000,
  331. 'retry_backoff_ms': 100,
  332. 'reconnect_backoff_ms': 50,
  333. 'max_in_flight_requests_per_connection': 5,
  334. 'auto_offset_reset': 'latest',
  335. 'enable_auto_commit': True,
  336. 'auto_commit_interval_ms': 5000,
  337. 'default_offset_commit_callback': lambda offsets, response: True,
  338. 'check_crcs': True,
  339. 'metadata_max_age_ms': 5 * 60 * 1000,
  340. 'partition_assignment_strategy': (RoundRobinPartitionAssignor),
  341. 'heartbeat_interval_ms': 3000,
  342. 'session_timeout_ms': 30000,
  343. 'max_poll_records': sys.maxsize,
  344. 'receive_buffer_bytes': None,
  345. 'send_buffer_bytes': None,
  346. 'consumer_timeout_ms': 1000,
  347. 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
  348. }
  349. def __init__(self, name, kwargs=None):
  350. self.kwargs = copy.copy(self.PROSUMER_DEFAULT_CONFIG)
  351. for key in self.kwargs:
  352. if key in kwargs:
  353. self.kwargs[key] = kwargs.pop(key)
  354. self.kwargs.update(kwargs)
  355. logging.info('\nProsumer:init: **** Configurations dump ***')
  356. logging.info('\n'.join('%s:%s' % (k.ljust(40), self.kwargs[k]) for k in sorted(self.kwargs)))
  357. BaseProducer.__init__(self, group=None, target=None, name=name,
  358. args=(), kwargs=self.kwargs, verbose=None)
  359. self.kconsumer = BaseConsumer(name=name, kwargs=self.kwargs)
  360. def add_listener_topics(self, listener, topics):
  361. try:
  362. map(lambda e: self.kconsumer.register(e, listener, getattr(listener, e)), topics)
  363. except AttributeError as e:
  364. logging.error("Prosumer:add_listener_topics. Function not implemented in the listener. %s" % e)
  365. raise NotImplementedException
  366. def add_listeners(self, listeners):
  367. for l in listeners:
  368. map(lambda e: self.kconsumer.register(e, l, getattr(l, e)), self.kwargs['topics'])
  369. def is_stopped(self):
  370. return self.stopped
  371. def set_stop(self):
  372. BaseProducer.set_stop(self)
  373. self.kconsumer.set_stop()
  374. logging.info('Prosumer:set_stop. Pending kconsumer to shutdown in 2s...')
  375. self.stopped = True
  376. def start_prosumer(self):
  377. self.stopped = False
  378. self.kconsumer.start()
  379. self.start()
  380. def message_loads(self, text_msg):
  381. return json.loads(text_msg)
  382. def message_dumps(self, obj_msg):
  383. return json.dumps(obj_msg)
  384. class SubscriptionListener(BaseMessageListener):
  385. '''
  386. test code used by test cases
  387. '''
  388. def __init__(self, name, producer):
  389. BaseMessageListener.__init__(self, name)
  390. self.producer = producer
  391. self.i = 0
  392. def gw_subscription_changed(self, event, items):
  393. logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, items))
  394. #print 'SubscriptionListener:gw_subscription_changed %s' % items
  395. # def on_kb_event(self, param):
  396. # print "on_kb_event [%s] %s" % (self.name, param)
  397. def gw_req_subscriptions(self, event, items):
  398. logging.info("[%s] received gw_req_subscriptions content:[%s]" % (self.name, items))
  399. vars= self.producer.message_loads(items['value'])
  400. self.producer.send_message('gw_subscription_changed', self.producer.message_dumps({'id': self.i, 'reqid': vars['reqid'],
  401. 'response' : "%s" % (time.strftime("%b %d %Y %H:%M:%S"))})
  402. )
  403. self.i = self.i + 1
  404. def reqMktData(self, event, items):
  405. logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
  406. self.producer.send_message('tickPrice',
  407. self.producer.message_dumps({'field':4, 'typeName':'tickPrice', 'price':1.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}))
  408. def tickPrice(self, event, items):
  409. logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
  410. def on_kb_reached_last_offset(self, event, items):
  411. logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, items))
  412. print "on_kb_reached_last_offset [%s] %s" % (self.name, items)
  413. def test_prosumer2(mode):
  414. if mode == 'A':
  415. topicsA = ['gw_subscription_changed', 'tickPrice']
  416. pA = Prosumer(name='A', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
  417. 'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
  418. 'group_id': 'groupA', 'session_timeout_ms':10000,
  419. 'topics': topicsA, 'clear_offsets' : False})
  420. sA = SubscriptionListener('earA', pA)
  421. pA.add_listeners([sA])
  422. pA.start_prosumer()
  423. i = 0
  424. try:
  425. pA.send_message('reqMktData', pA.message_dumps({'contract':'dummy'}))
  426. while True: #i < 5:
  427. #pA.send_message('gw_req_subscriptions', pA.message_dumps({'desc': 'requesting subscription msg counter:%d' % i,
  428. # 'reqid': i}))
  429. i= i + 1
  430. time.sleep(.45)
  431. except (KeyboardInterrupt, SystemExit):
  432. logging.error('caught user interrupt')
  433. pA.set_stop()
  434. pA.join()
  435. else:
  436. topicsB = ['gw_req_subscriptions', 'reqMktData']
  437. pB = Prosumer(name='B', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
  438. 'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
  439. 'group_id': 'groupB', 'session_timeout_ms':10000,
  440. 'topics': topicsB, 'clear_offsets' : False})
  441. sB = SubscriptionListener('earB', pB)
  442. pB.add_listeners([sB])
  443. pB.start_prosumer()
  444. try:
  445. while True: #i < 5:
  446. pB.send_message('tickPrice',
  447. pB.message_dumps({'field':5, 'typeName':'tickPrice', 'price':2.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}))
  448. time.sleep(.45)
  449. except (KeyboardInterrupt, SystemExit):
  450. logging.error('caught user interrupt')
  451. pB.set_stop()
  452. pB.join()
class TestProducer(BaseProducer):
    """Trivial subclass used by test_base_proconsumer; inherits all
    BaseProducer behavior unchanged."""
    pass
  455. def test_base_proconsumer(mode):
  456. '''
  457. This example demonstrates
  458. 1) use of consumer_timeout_ms to break out from the consumer.next loop
  459. 2) how to trap ctrl-c and break out of the running threads
  460. 3) using Queue to store calls to producer.send_message
  461. 4) using redis to store the consumer last processed offsets
  462. 5) use of try-catch block to implement seek_to_latest offset
  463. 6) inherit and implement MessageListener to subscribe messages dispatched by the consumer
  464. '''
  465. if mode == 'P':
  466. #Producer().start()
  467. topics = ['my_topic', 'my_topic2']
  468. tp = TestProducer(name = 'testproducer', kwargs={
  469. 'bootstrap_host':'localhost', 'bootstrap_port':9092,
  470. 'topics': topics})
  471. tp.start()
  472. i = 0
  473. while True:
  474. #today = datetime.date.today()
  475. try:
  476. s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
  477. logging.info(s)
  478. tp.send_message(topics[i%2], s)
  479. time.sleep(.25)
  480. i=i+1
  481. except (KeyboardInterrupt, SystemExit):
  482. logging.error('caught user interrupt')
  483. tp.set_stop()
  484. tp.join()
  485. sys.exit(-1)
  486. else:
  487. bc = BaseConsumer(name='bc', kwargs={'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
  488. 'bootstrap_host':'localhost', 'bootstrap_port':9092,
  489. 'group_id':'gid', 'session_timeout_ms':10000, 'topics': ['my_topic', 'my_topic2'],
  490. 'clear_offsets': True, 'consumer_timeout_ms':1000,
  491. # uncomment the next line to process messages from since the program was last shut down
  492. # if seek_to_end is present, for the topic specified the consumer will begin
  493. # sending the latest message to the listener
  494. # note that the list only specifies my_topic to receive the latest
  495. # but not my_topic2. Observe the different behavior by checking the message offset
  496. # in the program log
  497. 'seek_to_end': ['my_topic'],
  498. })
  499. #bml = BaseMessageListener('bml')
  500. sml = SimpleMessageListener('simple')
  501. bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, sml)
  502. bc.register('my_topic', sml)
  503. bc.register('my_topic2', sml)
  504. bc.start()
  505. def main():
  506. #
  507. # test cases
  508. #
  509. tp = [ test_base_proconsumer, test_prosumer2]
  510. if len(sys.argv) != 3:
  511. print("Usage: %s <role(producer or consumer): P|C> <test case #[0..1]>" % sys.argv[0])
  512. print "\n".join('case #%d: %s' % (i, tp[i].__name__) for i in range(len(tp)))
  513. print "example: python %s P 1" % sys.argv[0]
  514. print "example: python %s C 1" % sys.argv[0]
  515. exit(-1)
  516. mode = sys.argv[1]
  517. #gid = sys.argv[2] if sys.argv[2] <> None else "q-group"
  518. tp[int(sys.argv[2])](mode)
  519. #time.sleep(30)
  520. # while 1:
  521. # try:
  522. # time.sleep(5)
  523. # pass
  524. # except (KeyboardInterrupt, SystemExit):
  525. # logging.error('caught user interrupt')
  526. # sys.exit(-1)
if __name__ == "__main__":
    # Configure root logging before dispatching to a test case.
    # NOTE(review): %(msecs)s renders milliseconds as a float; the
    # conventional form is %(msecs)03d -- confirm intent before changing.
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()