# base_messaging.py (13 KB)
  1. #!/usr/bin/env python
  2. import threading, logging, time
  3. import sys
  4. import datetime
  5. import uuid
  6. from Queue import Queue
  7. from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
  8. from kafka import KafkaConsumer, KafkaProducer
  9. from kafka.structs import TopicPartition
  10. from kafka.errors import NoBrokersAvailable
  11. #
  12. # packages required for ConsumerNextIteratorPersist
  13. import json
  14. from redis import Redis
  15. from misc2.observer import Subscriber, Publisher
  16. from numpy.distutils.fcompiler import none
  17. class Producer(threading.Thread):
  18. daemon = True
  19. def run(self):
  20. try:
  21. producer = KafkaProducer(bootstrap_servers='localhost:9092')
  22. topics = ['my-topic', 'my-topic2']
  23. i = 0
  24. while True:
  25. #today = datetime.date.today()
  26. s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
  27. logging.info(s)
  28. producer.send(topics[i%2], s)
  29. time.sleep(.45)
  30. i=i+1
  31. except NoBrokersAvailable:
  32. logging.error("NoBrokersAvailable: Has kafka started?")
  33. class BaseProducer(threading.Thread, Subscriber):
  34. def __init__(self, group=None, target=None, name=None,
  35. args=(), kwargs=None, verbose=None):
  36. threading.Thread.__init__(self, group=group, target=target, name=name,
  37. verbose=verbose)
  38. """
  39. kwargs:
  40. bootstrap_host
  41. bootstrap_host
  42. redis_host
  43. session_timeout_ms:
  44. """
  45. self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name))
  46. logging.info('BaseConsumer __init__: name=%s' % self.name)
  47. self.args = args
  48. self.kwargs = kwargs
  49. self.event_q = Queue()
  50. return
  51. def send_message(self, topic, message):
  52. self.event_q.put((topic, message))
  53. self.event_q.task_done()
  54. def run(self):
  55. try:
  56. producer = KafkaProducer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']))
  57. while True:
  58. #today = datetime.date.today()
  59. if not self.event_q.empty():
  60. topic, message = self.event_q.get()
  61. #s = "BaseProducer topic:[%s] msg:[%s]" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
  62. logging.debug("BaseProducer topic:[%s] msg:[%s]" % (topic, message))
  63. producer.send(topic, message)
  64. except NoBrokersAvailable:
  65. logging.error("NoBrokersAvailable: Has kafka started?")
  66. class BaseConsumer(threading.Thread, Publisher):
  67. KB_EVENT = "on_kb_event"
  68. KB_REACHED_LAST_OFFSET = "on_kb_reached_last_offset"
  69. #my_topics = {'my-topic':{}, 'my-topic2':{}}
  70. def __init__(self, group=None, target=None, name=None,
  71. args=(), kwargs=None, verbose=None):
  72. threading.Thread.__init__(self, group=group, target=target, name=name,
  73. verbose=verbose)
  74. """
  75. kwargs:
  76. bootstrap_host
  77. bootstrap_host
  78. redis_host
  79. redis_port
  80. redis_db
  81. group_id
  82. consumer_id: name
  83. topics: a list of topic strings
  84. session_timeout_ms:
  85. """
  86. self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name))
  87. logging.info('BaseConsumer __init__: name=%s' % self.name)
  88. self.args = args
  89. self.kwargs = kwargs
  90. self.rs = Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
  91. self.my_topics = {}
  92. for t in self.kwargs['topics']:
  93. self.my_topics[t]= {}
  94. self.events = {event: dict() for event in [BaseConsumer.KB_EVENT, BaseConsumer.KB_REACHED_LAST_OFFSET]}
  95. return
  96. # no use: doesn't work
  97. def seek_to_last_read_offset(self, consumer):
  98. for t in self.my_topics.keys():
  99. po = json.loads(self.rs.get(t))
  100. consumer.seek(TopicPartition(topic=t, partition=po['partition']), po['offset'])
  101. """
  102. each consumer has its own set of topics offsets stored in redis
  103. for consumer A and consumer B (with different group_ids) subscribing to the same my-topic, each
  104. each of them will need to keep track of its own offsets
  105. to make the redis key unique by consumer, the key is created by topic + '@' + consumer name
  106. example: my-topic@consumerA
  107. offsets = { topic-consumer_name:
  108. {
  109. partition0: offset0,
  110. partition1: offset1,...
  111. }
  112. }
  113. """
  114. def consumer_topic(self, tp):
  115. return tp + '@' + self.name
  116. def persist_offsets(self, topic, partition, offset):
  117. #self.rs.set(self.consumer_topic(topic), json.dumps({'partition': partition, 'offset':offset}))
  118. self.my_topics[topic][str(partition)] = offset
  119. self.rs.set(self.consumer_topic(topic), json.dumps(self.my_topics[topic]))
  120. def run(self):
  121. print '%s:%s started' % (self.kwargs['group_id'], self.name)
  122. consumer = KafkaConsumer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']),
  123. auto_offset_reset='earliest',
  124. #
  125. # consumers having the same group id works as
  126. # a group to seize messages published by the publisher
  127. # (like a queue, each message is consumed exactly once
  128. # by a consumer)
  129. #
  130. #
  131. # in a 'pub-sub' environment, set each consumer having
  132. # a unique group_id
  133. #
  134. group_id = self.kwargs['group_id'],
  135. client_id = self.name,
  136. #
  137. # session_timeout_ms is the time it takes for another consumer
  138. # that has the same group_id to pick up the work
  139. # to consume messages when this consumer is dead
  140. #
  141. session_timeout_ms = self.kwargs['session_timeout_ms'],
  142. #
  143. #
  144. #
  145. partition_assignment_strategy=[RoundRobinPartitionAssignor])
  146. for topic in self.my_topics.keys():
  147. #
  148. # if a topic offset is stored previously (the consumer was run before),
  149. # then load the offset values
  150. # and save it locally into the my_topics map
  151. # else self.my_topics[topic] would have zero elements in it
  152. if self.rs.keys(self.consumer_topic(topic)):
  153. self.my_topics[topic] = json.loads(self.rs.get(self.consumer_topic(topic)))
  154. print self.my_topics
  155. consumer.subscribe(self.my_topics.keys())
  156. #consumer.seek_to_end(TopicPartition(topic='my-topic', partition=0))
  157. done = False
  158. while not done:
  159. message = consumer.next()
  160. #time.sleep(0.25)
  161. if message.offset % 50 == 0:
  162. logging.info( "[%s]:highwater:%d offset:%d part:%d <%s>" % (self.name, consumer.highwater(TopicPartition(message.topic, message.partition)),
  163. message.offset, message.partition, message.value))
  164. # for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
  165. # print "t:%s %s" % (t, ','.join('p:%d, offset:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)) # consumer.position(TopicPartition(topic=t, partition=p)))
  166. # if this is the first time the consumer is run
  167. # it contains no offsets in the redis map, so it has
  168. # 0 elements in the map,
  169. # then insert a new offset in redis and populate
  170. # the local my_topics dict
  171. if len(self.my_topics[message.topic]) == 0:
  172. self.persist_offsets(message.topic, message.partition, message.offset)
  173. self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))
  174. continue
  175. if self.my_topics[message.topic][str(message.partition)] > message.offset:
  176. print '********************** old message...discarding %s %d' % (message.topic, message.offset)
  177. else:
  178. if self.my_topics[message.topic][str(message.partition)] == message.offset:
  179. self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, {'message': message})
  180. logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
  181. else:
  182. self.persist_offsets(message.topic, message.partition, message.offset)
  183. self.dispatch(BaseConsumer.KB_EVENT, {'message': message})
  184. logging.info ('**********************************************done')
  185. class BaseMessageListener(Subscriber):
  186. def update(self, event, param=none):
  187. try:
  188. event_fn = getattr(self, event)
  189. event_fn(param)
  190. except AttributeError:
  191. err_msg = 'BaseMessageListener:update| function %s not implemented.' % event
  192. logging.error('BaseMessageListener [%s]:update %s' % (self.name, err_msg))
  193. logging.debug("BaseMessageListener [%s]:update|Event type:[%s] content:[%s]" % (self.name, event, json.dumps(param) if param <> None else "<empty param>"))
  194. class SimpleMessageListener(BaseMessageListener):
  195. def __init__(self, name):
  196. BaseMessageListener.__init__(self, name)
  197. def on_kb_event(self, param):
  198. print "on_kb_event [%s] %s" % (self.name, param)
  199. def on_kb_reached_last_offset(self, param):
  200. print "on_kb_reached_last_offset [%s] %s" % (self.name, param)
  201. class Prosumer(BaseProducer):
  202. # wrapper object
  203. pass
  204. class TestProducer(BaseProducer):
  205. pass
  206. def test_base_proconsumer(mode):
  207. if mode == 'P':
  208. #Producer().start()
  209. topics = ['my-topic', 'my-topic2']
  210. tp = TestProducer(name = 'testproducer', kwargs={
  211. 'bootstrap_host':'localhost', 'bootstrap_port':9092,
  212. 'topics': topics})
  213. tp.start()
  214. i = 0
  215. while True:
  216. #today = datetime.date.today()
  217. s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
  218. logging.info(s)
  219. tp.send_message(topics[i%2], s)
  220. time.sleep(.45)
  221. i=i+1
  222. else:
  223. bc = BaseConsumer(name='bc', kwargs={'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
  224. 'bootstrap_host':'localhost', 'bootstrap_port':9092,
  225. 'group_id':'gid', 'session_timeout_ms':10000, 'topics': ['my-topic', 'my-topic2']})
  226. #bml = BaseMessageListener('bml')
  227. sml = SimpleMessageListener('simple')
  228. #bc.register(BaseConsumer.KB_EVENT, bml)
  229. #bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, bml)
  230. bc.register(BaseConsumer.KB_EVENT, sml)
  231. bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, sml)
  232. bc.start()
  233. def main():
  234. #
  235. # test cases
  236. #
  237. tp = [ test_base_proconsumer]
  238. if len(sys.argv) != 3:
  239. print("Usage: %s <role(producer or consumer): P|C> <test case #[0..1]>" % sys.argv[0])
  240. print "\n".join('case #%d: %s' % (i, tp[i].__name__) for i in range(len(tp)))
  241. print "example: python %s P 1" % sys.argv[0]
  242. print "example: python %s C 1" % sys.argv[0]
  243. exit(-1)
  244. mode = sys.argv[1]
  245. #gid = sys.argv[2] if sys.argv[2] <> None else "q-group"
  246. tp[int(sys.argv[2])](mode)
  247. #time.sleep(30)
  248. while 1:
  249. try:
  250. time.sleep(5)
  251. pass
  252. except (KeyboardInterrupt, SystemExit):
  253. logging.error('caught user interrupt')
  254. sys.exit(-1)
  255. if __name__ == "__main__":
  256. logging.basicConfig(
  257. format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
  258. level=logging.INFO
  259. )
  260. main()