# test_kafka.py -- Kafka producer/consumer demos (requires kafka-python and redis)
  1. #!/usr/bin/env python
  2. import threading, logging, time
  3. import signal
  4. import sys
  5. import datetime
  6. from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
  7. from kafka import KafkaConsumer, KafkaProducer
  8. from kafka.structs import TopicPartition
  9. #
  10. # packages required for ConsumerNextIteratorPersist
  11. import json
  12. from redis import Redis
  13. class Producer(threading.Thread):
  14. daemon = True
  15. def run(self):
  16. producer = KafkaProducer(bootstrap_servers='localhost:9092')
  17. topics = ['my-topic', 'my-topic2']
  18. i = 0
  19. while True:
  20. #today = datetime.date.today()
  21. s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
  22. logging.info(s)
  23. producer.send(topics[i%2], s)
  24. time.sleep(.15)
  25. i=i+1
  26. class Consumer(threading.Thread):
  27. daemon = True
  28. my_topics = ['my-topic', 'my-topic2']
  29. def __init__(self, group=None, target=None, name=None,
  30. args=(), kwargs=None, verbose=None):
  31. threading.Thread.__init__(self, group=group, target=target, name=name,
  32. verbose=verbose)
  33. self.name = name
  34. self.args = args
  35. self.kwargs = kwargs
  36. return
  37. def run(self):
  38. print '%s:%s started' % (self.kwargs['group_id'], self.name)
  39. consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
  40. auto_offset_reset='latest',
  41. #
  42. # consumers having the same group id works as
  43. # a group to seize messages published by the publisher
  44. # (like a queue, each message is consumed exactly once
  45. # by a consumer)
  46. #
  47. #
  48. # in a 'pub-sub' environment, set each consumer having
  49. # a unique group_id
  50. #
  51. group_id = self.kwargs['group_id'],
  52. client_id = self.name,
  53. #
  54. # session_timeout_ms is the time it takes for another consumer
  55. # that has the same group_id to pick up the work
  56. # to consume messages when this consumer is dead
  57. #
  58. session_timeout_ms = 10000,
  59. #
  60. #
  61. #
  62. partition_assignment_strategy=[RoundRobinPartitionAssignor])
  63. consumer.subscribe(self.my_topics)
  64. # print consumer.partitions_for_topic("my-topic")
  65. # print consumer.assignment()
  66. # print consumer.subscription()
  67. #consumer.seek_to_end(TopicPartition(topic='my-topic', partition=0))
  68. for message in consumer:
  69. #time.sleep(0.25)
  70. logging.info( "%s:offset:%d part:%d %s" % (self.name, message.offset, message.partition, message.value))
  71. for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics):
  72. print "t:%s %s" % (t, ','.join('p:%d, offset:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)) # consumer.position(TopicPartition(topic=t, partition=p)))
  73. logging.info ('**********************************************done')
  74. class ConsumerNextIteratorPersist(threading.Thread):
  75. # this class demonstrates the use of next iterator to process message
  76. # and logic to save offsets to an external storage2
  77. daemon = True
  78. my_topics = {'my-topic':{}, 'my-topic2':{}}
  79. rs = None
  80. def __init__(self, group=None, target=None, name=None,
  81. args=(), kwargs=None, verbose=None):
  82. threading.Thread.__init__(self, group=group, target=target, name=name,
  83. verbose=verbose)
  84. self.name = name
  85. self.args = args
  86. self.kwargs = kwargs
  87. self.rs = Redis('localhost', 6379, 0)
  88. return
  89. # no use: doesn't work
  90. def seek_to_last_read_offset(self, consumer):
  91. for t in self.my_topics.keys():
  92. po = json.loads(self.rs.get(t))
  93. consumer.seek(TopicPartition(topic=t, partition=po['partition']), po['offset'])
  94. """
  95. each consumer has its own set of topics offsets stored in redis
  96. for consumer A and consumer B (with different group_ids) subscribing to the same my-topic, each
  97. each of them will need to keep track of its own offsets
  98. to make the redis key unique by consumer, the key is created by topic + '@' + consumer name
  99. example: my-topic@consumerA
  100. offsets = { topic-consumer_name:
  101. {
  102. partition0: offset0,
  103. partition1: offset1,...
  104. }
  105. }
  106. """
  107. def consumer_topic(self, tp):
  108. return tp + '@' + self.name
  109. def persist_offsets(self, topic, partition, offset):
  110. #self.rs.set(self.consumer_topic(topic), json.dumps({'partition': partition, 'offset':offset}))
  111. self.my_topics[topic][str(partition)] = offset
  112. self.rs.set(self.consumer_topic(topic), json.dumps(self.my_topics[topic]))
  113. def run(self):
  114. print '%s:%s started' % (self.kwargs['group_id'], self.name)
  115. consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
  116. auto_offset_reset='earliest',
  117. #
  118. # consumers having the same group id works as
  119. # a group to seize messages published by the publisher
  120. # (like a queue, each message is consumed exactly once
  121. # by a consumer)
  122. #
  123. #
  124. # in a 'pub-sub' environment, set each consumer having
  125. # a unique group_id
  126. #
  127. group_id = self.kwargs['group_id'],
  128. client_id = self.name,
  129. #
  130. # session_timeout_ms is the time it takes for another consumer
  131. # that has the same group_id to pick up the work
  132. # to consume messages when this consumer is dead
  133. #
  134. session_timeout_ms = 10000,
  135. #
  136. #
  137. #
  138. partition_assignment_strategy=[RoundRobinPartitionAssignor])
  139. for topic in self.my_topics.keys():
  140. #
  141. # if a topic offset is stored previously (the consumer was run before),
  142. # then load the offset values
  143. # and save it locally into the my_topics map
  144. # else self.my_topics[topic] would have zero elements in it
  145. if self.rs.keys(self.consumer_topic(topic)):
  146. self.my_topics[topic] = json.loads(self.rs.get(self.consumer_topic(topic)))
  147. print self.my_topics
  148. consumer.subscribe(self.my_topics.keys())
  149. #consumer.seek_to_end(TopicPartition(topic='my-topic', partition=0))
  150. done = False
  151. while not done:
  152. message = consumer.next()
  153. #time.sleep(0.25)
  154. if message.offset % 50 == 0:
  155. logging.info( "[%s]:highwater:%d offset:%d part:%d <%s>" % (self.name, consumer.highwater(TopicPartition(message.topic, message.partition)),
  156. message.offset, message.partition, message.value))
  157. # for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
  158. # print "t:%s %s" % (t, ','.join('p:%d, offset:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)) # consumer.position(TopicPartition(topic=t, partition=p)))
  159. # if this is the first time the consumer is run
  160. # it contains no offsets in the redis map, so it has
  161. # 0 elements in the map,
  162. # then insert a new offset in redis and populate
  163. # the local my_topics dict
  164. if len(self.my_topics[message.topic]) == 0:
  165. self.persist_offsets(message.topic, message.partition, message.offset)
  166. self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))
  167. continue
  168. #if self.my_topics[message.topic]['offset'] >= message.offset and self.my_topics[message.topic]['partition'] == message.partition:
  169. if self.my_topics[message.topic][str(message.partition)] >= message.offset:
  170. print '********************** processed previously...discarding %s %d' % (message.topic, message.offset)
  171. else:
  172. self.persist_offsets(message.topic, message.partition, message.offset)
  173. logging.info ('**********************************************done')
  174. def test_queue_mechanism(mode, gid):
  175. #
  176. # at the console, start a producer by typing python test_kafka.py P x 1
  177. # the 2nd param is consumer group which has no use for a producer
  178. # (so just specify some jibberish for the program to run)
  179. # the 3rd param is the position of the test function in the test cases list
  180. #
  181. # create a new console, start a consumer by typing python test_kafka.py C group1
  182. # create another instance of the consumer, again issuing the command
  183. # python test_kafka.py C group1
  184. #
  185. # Make sure the producer has the following settings when it is first created:
  186. # partition_assignment_strategy=[RoundRobinPartitionAssignor])
  187. #
  188. # watch the 2 instances consuming messages simultaneously
  189. # kill one of the consumer and view the other one picks up message topic originally
  190. # assigned to the now deceased consumer instance
  191. #
  192. # the time it takes for the 2nd consumer to assume the role of the first consumer
  193. # is determined by the variable session_timeout_ms = 10000,
  194. threads = []
  195. if mode == 'P':
  196. threads.append(Producer())
  197. else:
  198. threads = [
  199. Consumer(name='c1', kwargs={'group_id':gid}),
  200. Consumer(name='c2', kwargs={'group_id':gid})
  201. ]
  202. for t in threads:
  203. t.start()
  204. def test_recovery(mode, gid):
  205. #
  206. # this demo requires redis library to
  207. # persist the topic offsets in the redis database
  208. #
  209. # 1. start a producer by typing test_kafka.py P x 1
  210. # 2. start a consumer and let it run for awhile
  211. # 3. kill the consumer but leave the producer running
  212. # 4. write down the last saved topic offset in redis
  213. # 5. after awhile, restart the consumer
  214. # 6. notice the output of the consumer, there should
  215. # be some output that says messages are discarded because
  216. # they had been processed previously
  217. #
  218. # the consumer self.name has to be the same across
  219. # re-runs in order for the recovery to work
  220. # in this example, it has been hard coded to 'c_test_recovery'
  221. #
  222. threads = []
  223. if mode == 'P':
  224. threads.append(Producer())
  225. else:
  226. threads = [
  227. ConsumerNextIteratorPersist(name='c_test_recovery', kwargs={'group_id':gid}),
  228. #Consumer(name='c2')
  229. ]
  230. for t in threads:
  231. t.start()
  232. def test_recovery_discard_aged():
  233. # internal pub/sub callback pattern
  234. #fire_event(f_msg, kv, age_factor)
  235. pass
  236. def main():
  237. #
  238. # test cases
  239. #
  240. tp = [test_queue_mechanism, test_recovery, test_recovery_discard_aged]
  241. if len(sys.argv) != 4:
  242. print("Usage: %s <role(producer or consumer): P|C> <consumer group id> <test case #[0..1]>" % sys.argv[0])
  243. print "\n".join('case #%d: %s' % (i, tp[i].__name__) for i in range(len(tp)))
  244. print "example: python %s producer g2 1" % sys.argv[0]
  245. print "example: python %s consumer g2 1" % sys.argv[0]
  246. exit(-1)
  247. mode = sys.argv[1]
  248. gid = sys.argv[2] if sys.argv[2] <> None else "q-group"
  249. tp[int(sys.argv[3])](mode, gid)
  250. #time.sleep(30)
  251. while 1:
  252. try:
  253. time.sleep(5)
  254. pass
  255. except (KeyboardInterrupt, SystemExit):
  256. logging.error('caught user interrupt')
  257. sys.exit(-1)
  258. if __name__ == "__main__":
  259. logging.basicConfig(
  260. format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
  261. level=logging.INFO
  262. )
  263. main()