# base_messaging.py
  1. #!/usr/bin/env python
  2. import threading, logging, time
  3. import sys
  4. import copy
  5. import datetime
  6. import uuid
  7. from Queue import Queue
  8. from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
  9. from misc2.observer import NotImplementedException
  10. from kafka import KafkaConsumer, KafkaProducer
  11. from kafka.structs import TopicPartition
  12. from kafka.errors import NoBrokersAvailable
  13. #
  14. # packages required for ConsumerNextIteratorPersist
  15. import json
  16. from redis import Redis
  17. from misc2.observer import Subscriber, Publisher
  18. from numpy.distutils.fcompiler import none
  19. class Producer(threading.Thread):
  20. daemon = True
  21. def run(self):
  22. try:
  23. producer = KafkaProducer(bootstrap_servers='localhost:9092')
  24. topics = ['my-topic', 'my-topic2']
  25. i = 0
  26. while True:
  27. #today = datetime.date.today()
  28. s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
  29. logging.info(s)
  30. producer.send(topics[i%2], s)
  31. time.sleep(.45)
  32. i=i+1
  33. except NoBrokersAvailable:
  34. logging.error("NoBrokersAvailable: Has kafka started?")
  35. class BaseProducer(threading.Thread, Subscriber):
  36. def __init__(self, group=None, target=None, name=None,
  37. args=(), kwargs=None, verbose=None):
  38. threading.Thread.__init__(self, group=group, target=target, name=name,
  39. verbose=verbose)
  40. """
  41. kwargs:
  42. bootstrap_host
  43. bootstrap_host
  44. redis_host
  45. session_timeout_ms:
  46. """
  47. self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name))
  48. logging.info('BaseProducer __init__: name=%s' % self.name)
  49. self.args = args
  50. self.kwargs = kwargs
  51. self.event_q = Queue()
  52. return
  53. def send_message(self, topic, plain_text):
  54. self.event_q.put((topic, plain_text))
  55. self.event_q.task_done()
  56. def set_stop(self):
  57. self.done = True
  58. def run(self):
  59. try:
  60. producer = KafkaProducer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']))
  61. self.done = False
  62. while self.done <> True:
  63. #today = datetime.date.today()
  64. if not self.event_q.empty():
  65. topic, plain_text = self.event_q.get()
  66. #s = "BaseProducer topic:[%s] msg:[%s]" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
  67. logging.info("BaseProducer topic:[%s] msg:[%s]" % (topic, plain_text))
  68. producer.send(topic, plain_text)
  69. # to prevent excessive CPU use
  70. time.sleep(0.1)
  71. logging.info ('******** BaseProducer exit done.')
  72. except NoBrokersAvailable:
  73. logging.error("NoBrokersAvailable: Has kafka started?")
  74. class BaseConsumer(threading.Thread, Publisher):
  75. #KB_EVENT = "on_kb_event"
  76. KB_REACHED_LAST_OFFSET = "on_kb_reached_last_offset"
  77. #my_topics = {'my-topic':{}, 'my-topic2':{}}
  78. def __init__(self, group=None, target=None, name=None,
  79. args=(), kwargs=None, verbose=None):
  80. threading.Thread.__init__(self, group=group, target=target, name=name,
  81. verbose=verbose)
  82. """
  83. kwargs:
  84. bootstrap_host
  85. bootstrap_host
  86. redis_host
  87. redis_port
  88. redis_db
  89. group_id
  90. consumer_id: name
  91. topics: a list of topic strings
  92. session_timeout_ms:
  93. consumer_timeout_ms
  94. """
  95. self.name = '%s-%s' % (name, uuid.uuid5(uuid.NAMESPACE_OID, name))
  96. logging.info('BaseConsumer __init__: name=%s' % self.name)
  97. self.args = args
  98. self.kwargs = kwargs
  99. self.rs = Redis(self.kwargs['redis_host'], self.kwargs['redis_port'], self.kwargs['redis_db'])
  100. self.my_topics = {}
  101. for t in self.kwargs['topics']:
  102. self.my_topics[t]= {}
  103. #self.events = {event: dict() for event in [BaseConsumer.KB_EVENT, BaseConsumer.KB_REACHED_LAST_OFFSET]}
  104. self.events = {event: dict() for event in [BaseConsumer.KB_REACHED_LAST_OFFSET] + self.kwargs['topics']}
  105. return
  106. # no use: doesn't work
  107. def seek_to_last_read_offset(self, consumer):
  108. for t in self.my_topics.keys():
  109. po = json.loads(self.rs.get(t))
  110. consumer.seek(TopicPartition(topic=t, partition=po['partition']), po['offset'])
  111. """
  112. each consumer has its own set of topics offsets stored in redis
  113. for consumer A and consumer B (with different group_ids) subscribing to the same my-topic, each
  114. each of them will need to keep track of its own offsets
  115. to make the redis key unique by consumer, the key is created by topic + '@' + consumer name
  116. example: my-topic@consumerA
  117. offsets = { topic-consumer_name:
  118. {
  119. partition0: offset0,
  120. partition1: offset1,...
  121. }
  122. }
  123. """
  124. def consumer_topic(self, tp):
  125. return tp + '@' + self.name
  126. def clear_offsets(self):
  127. """
  128. clear the offsets in redis by removing the value from redis
  129. and emptying the internal my_topics dict
  130. clear offsets is ncessary when the offset in redis was saved
  131. at a time since kafka manager was shut down
  132. when kafka restarts, previously buffered
  133. messages are no longer available and instead it will restart its offset at 0.
  134. Reading an old offset by BaseConsumer will cause it to think that it
  135. is still receving old buffered messages from Kafa but in fact all the messages
  136. since the last shut down of kafka are all gone
  137. """
  138. for t in self.kwargs['topics']:
  139. self.my_topics[t]= {}
  140. logging.info("BaseConsumer:clear_offsets Deleting %s from redis..." % self.consumer_topic(t))
  141. self.rs.delete(self.consumer_topic(t))
  142. #raise NotImplementedException
  143. def persist_offsets(self, topic, partition, offset):
  144. #self.rs.set(self.consumer_topic(topic), json.dumps({'partition': partition, 'offset':offset}))
  145. self.my_topics[topic][str(partition)] = offset
  146. self.rs.set(self.consumer_topic(topic), json.dumps(self.my_topics[topic]))
  147. def enrich_message(self, message):
  148. return {'value': message.value, 'partition':message.partition, 'offset': message.offset}
  149. def set_stop(self):
  150. self.done = True
  151. def run(self):
  152. print '%s:%s started' % (self.kwargs['group_id'], self.name)
  153. if self.kwargs['clear_offsets'] == True:
  154. self.clear_offsets()
  155. consumer = KafkaConsumer(bootstrap_servers='%s:%s' % (self.kwargs['bootstrap_host'], self.kwargs['bootstrap_port']),
  156. auto_offset_reset='earliest',
  157. #
  158. # consumers having the same group id works as
  159. # a group to seize messages published by the publisher
  160. # (like a queue, each message is consumed exactly once
  161. # by a consumer)
  162. #
  163. #
  164. # in a 'pub-sub' environment, set each consumer having
  165. # a unique group_id
  166. #
  167. group_id = self.kwargs['group_id'],
  168. client_id = self.name,
  169. #
  170. # session_timeout_ms is the time it takes for another consumer
  171. # that has the same group_id to pick up the work
  172. # to consume messages when this consumer is dead
  173. #
  174. session_timeout_ms = self.kwargs['session_timeout_ms'],
  175. #
  176. #
  177. #
  178. partition_assignment_strategy=[RoundRobinPartitionAssignor],
  179. #
  180. #
  181. consumer_timeout_ms=self.kwargs['consumer_timeout_ms']
  182. )
  183. for topic in self.my_topics.keys():
  184. #
  185. # if a topic offset is stored previously (the consumer was run before),
  186. # then load the offset values
  187. # and save it locally into the my_topics map
  188. # else self.my_topics[topic] would have zero elements in it
  189. if self.rs.keys(self.consumer_topic(topic)):
  190. self.my_topics[topic] = json.loads(self.rs.get(self.consumer_topic(topic)))
  191. print self.my_topics
  192. consumer.subscribe(self.my_topics.keys())
  193. #consumer.seek_to_end(TopicPartition(topic='my-topic', partition=0))
  194. self.done = False
  195. while self.done <> True:
  196. try:
  197. message = consumer.next()
  198. #time.sleep(0.25)
  199. if message.offset % 50 == 0:
  200. logging.info( "[%s]:highwater:%d offset:%d part:%d <%s>" % (self.name, consumer.highwater(TopicPartition(message.topic, message.partition)),
  201. message.offset, message.partition, message.value))
  202. # for t, ps in map(lambda t: (t, consumer.partitions_for_topic(t)), self.my_topics.keys()):
  203. # print "t:%s %s" % (t, ','.join('p:%d, offset:%d' % (p, consumer.position(TopicPartition(topic=t, partition=p))) for p in ps)) # consumer.position(TopicPartition(topic=t, partition=p)))
  204. # if this is the first time the consumer is run
  205. # it contains no offsets in the redis map, so it has
  206. # 0 elements in the map,
  207. # then insert a new offset in redis and populate
  208. # the local my_topics dict
  209. if len(self.my_topics[message.topic]) == 0:
  210. self.persist_offsets(message.topic, message.partition, message.offset)
  211. self.my_topics[message.topic] = json.loads(self.rs.get(self.consumer_topic(message.topic)))
  212. #continue
  213. """
  214. the message.value received from kafaproducer is expected to contain
  215. plain text encoded as a json string
  216. the content of message.value is not altered. it's content is stored in a dict object
  217. with key = 'value' along with additional kafa metadata
  218. it is the subscriber's job to interpret the content stored in the 'value' key. Typically
  219. it means decoding the content by invoking json.loads
  220. """
  221. if self.my_topics[message.topic][str(message.partition)] > message.offset:
  222. print '********************** old message...discarding %s %d' % (message.topic, message.offset)
  223. else:
  224. #if self.my_topics[message.topic][str(message.partition)] == message.offset:
  225. # if the stored offset in redis equals to the current offset
  226. # notify the observers
  227. # the "and" condition ensures that on a fresh start of kafka server this event is not triggered as
  228. # both saved value in redis and current offset are both 0
  229. if self.my_topics[message.topic][str(message.partition)] == message.offset and message.offset <> 0:
  230. self.dispatch(BaseConsumer.KB_REACHED_LAST_OFFSET, self.enrich_message(message))
  231. logging.info('********************** reached the last message previously processed %s %d' % (message.topic, message.offset))
  232. else:
  233. self.persist_offsets(message.topic, message.partition, message.offset)
  234. #self.dispatch(BaseConsumer.KB_EVENT, {'message': message})
  235. self.dispatch(message.topic, self.enrich_message(message))
  236. except StopIteration:
  237. logging.debug('BaseConsumer:run StopIteration Caught. No new message arriving...')
  238. continue
  239. logging.info ('******** BaseConsumer exit done.')
  240. class BaseMessageListener(Subscriber):
  241. def update(self, event, param=none):
  242. try:
  243. event_fn = getattr(self, event)
  244. event_fn(param)
  245. except AttributeError:
  246. err_msg = 'BaseMessageListener:update| function %s not implemented.' % event
  247. logging.error('BaseMessageListener [%s]:update %s' % (self.name, err_msg))
  248. logging.debug("BaseMessageListener [%s]:update|Event type:[%s] content:[%s]" % (self.name, event, json.dumps(param) if param <> None else "<empty param>"))
  249. class SimpleMessageListener(BaseMessageListener):
  250. def __init__(self, name):
  251. BaseMessageListener.__init__(self, name)
  252. # def on_kb_event(self, param):
  253. # print "on_kb_event [%s] %s" % (self.name, param)
  254. def on_kb_reached_last_offset(self, param):
  255. print "on_kb_reached_last_offset [%s] %s" % (self.name, param)
  256. class Prosumer(BaseProducer):
  257. # wrapper object
  258. PROSUMER_DEFAULT_CONFIG = {
  259. 'bootstrap_servers': 'localhost',
  260. 'client_id': 'kafka-prosumer' ,
  261. 'group_id': 'kafka-prosumer-default-group',
  262. 'key_deserializer': None,
  263. 'value_deserializer': None,
  264. 'fetch_max_wait_ms': 500,
  265. 'fetch_min_bytes': 1,
  266. 'max_partition_fetch_bytes': 1 * 1024 * 1024,
  267. 'request_timeout_ms': 40 * 1000,
  268. 'retry_backoff_ms': 100,
  269. 'reconnect_backoff_ms': 50,
  270. 'max_in_flight_requests_per_connection': 5,
  271. 'auto_offset_reset': 'latest',
  272. 'enable_auto_commit': True,
  273. 'auto_commit_interval_ms': 5000,
  274. 'default_offset_commit_callback': lambda offsets, response: True,
  275. 'check_crcs': True,
  276. 'metadata_max_age_ms': 5 * 60 * 1000,
  277. 'partition_assignment_strategy': (RoundRobinPartitionAssignor),
  278. 'heartbeat_interval_ms': 3000,
  279. 'session_timeout_ms': 30000,
  280. 'max_poll_records': sys.maxsize,
  281. 'receive_buffer_bytes': None,
  282. 'send_buffer_bytes': None,
  283. 'consumer_timeout_ms': 1000,
  284. 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
  285. }
  286. def __init__(self, name, kwargs=None):
  287. self.kwargs = copy.copy(self.PROSUMER_DEFAULT_CONFIG)
  288. for key in self.kwargs:
  289. if key in kwargs:
  290. self.kwargs[key] = kwargs.pop(key)
  291. self.kwargs.update(kwargs)
  292. logging.info('\nProsumer:init: **** Configurations dump ***')
  293. logging.info('\n'.join('%s:%s' % (k.ljust(40), self.kwargs[k]) for k in sorted(self.kwargs)))
  294. BaseProducer.__init__(self, group=None, target=None, name=name,
  295. args=(), kwargs=self.kwargs, verbose=None)
  296. self.kconsumer = BaseConsumer(name=name, kwargs=self.kwargs)
  297. def add_listener_topics(self, listener, topics):
  298. map(lambda e: self.kconsumer.register(e, listener, getattr(listener, e)), topics)
  299. def add_listeners(self, listeners):
  300. for l in listeners:
  301. map(lambda e: self.kconsumer.register(e, l, getattr(l, e)), self.kwargs['topics'])
  302. def set_stop(self):
  303. BaseProducer.set_stop(self)
  304. self.kconsumer.set_stop()
  305. def start_prosumer(self):
  306. self.kconsumer.start()
  307. self.start()
  308. def message_loads(self, text_msg):
  309. return json.loads(text_msg)
  310. def message_dumps(self, obj_msg):
  311. return json.dumps(obj_msg)
  312. class SubscriptionListener(BaseMessageListener):
  313. def __init__(self, name, producer):
  314. BaseMessageListener.__init__(self, name)
  315. self.producer = producer
  316. self.i = 0
  317. def gw_subscription_changed(self, event, items):
  318. logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, items))
  319. #print 'SubscriptionListener:gw_subscription_changed %s' % items
  320. # def on_kb_event(self, param):
  321. # print "on_kb_event [%s] %s" % (self.name, param)
  322. def gw_req_subscriptions(self, event, items):
  323. logging.info("[%s] received gw_req_subscriptions content:[%s]" % (self.name, items))
  324. vars= self.producer.message_loads(items['value'])
  325. self.producer.send_message('gw_subscription_changed', self.producer.message_dumps({'id': self.i, 'reqid': vars['reqid'],
  326. 'response' : "%s" % (time.strftime("%b %d %Y %H:%M:%S"))})
  327. )
  328. self.i = self.i + 1
  329. def reqMktData(self, event, items):
  330. logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
  331. self.producer.send_message('tickPrice',
  332. self.producer.message_dumps({'field':4, 'typeName':'tickPrice', 'price':1.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}))
  333. def tickPrice(self, event, items):
  334. logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
  335. def on_kb_reached_last_offset(self, event, items):
  336. logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, items))
  337. print "on_kb_reached_last_offset [%s] %s" % (self.name, items)
  338. def test_prosumer2(mode):
  339. if mode == 'A':
  340. topicsA = ['gw_subscription_changed', 'tickPrice']
  341. pA = Prosumer(name='A', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
  342. 'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
  343. 'group_id': 'groupA', 'session_timeout_ms':10000,
  344. 'topics': topicsA, 'clear_offsets' : False})
  345. sA = SubscriptionListener('earA', pA)
  346. pA.add_listeners([sA])
  347. pA.start_prosumer()
  348. i = 0
  349. try:
  350. pA.send_message('reqMktData', pA.message_dumps({'contract':'dummy'}))
  351. while True: #i < 5:
  352. #pA.send_message('gw_req_subscriptions', pA.message_dumps({'desc': 'requesting subscription msg counter:%d' % i,
  353. # 'reqid': i}))
  354. i= i + 1
  355. time.sleep(.45)
  356. except (KeyboardInterrupt, SystemExit):
  357. logging.error('caught user interrupt')
  358. pA.set_stop()
  359. pA.join()
  360. else:
  361. topicsB = ['gw_req_subscriptions', 'reqMktData']
  362. pB = Prosumer(name='B', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
  363. 'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
  364. 'group_id': 'groupB', 'session_timeout_ms':10000,
  365. 'topics': topicsB, 'clear_offsets' : False})
  366. sB = SubscriptionListener('earB', pB)
  367. pB.add_listeners([sB])
  368. pB.start_prosumer()
  369. try:
  370. while True: #i < 5:
  371. pB.send_message('tickPrice',
  372. pB.message_dumps({'field':5, 'typeName':'tickPrice', 'price':2.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}))
  373. time.sleep(.45)
  374. except (KeyboardInterrupt, SystemExit):
  375. logging.error('caught user interrupt')
  376. pB.set_stop()
  377. pB.join()
class TestProducer(BaseProducer):
    # Thin alias of BaseProducer used by the test harness below; exists
    # only to give the producer-side test its own class name.
    pass
  380. def test_base_proconsumer(mode):
  381. if mode == 'P':
  382. #Producer().start()
  383. topics = ['my-topic', 'my-topic2']
  384. tp = TestProducer(name = 'testproducer', kwargs={
  385. 'bootstrap_host':'localhost', 'bootstrap_port':9092,
  386. 'topics': topics})
  387. tp.start()
  388. i = 0
  389. while True:
  390. #today = datetime.date.today()
  391. s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
  392. logging.info(s)
  393. tp.send_message(topics[i%2], s)
  394. time.sleep(.45)
  395. i=i+1
  396. else:
  397. bc = BaseConsumer(name='bc', kwargs={'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
  398. 'bootstrap_host':'localhost', 'bootstrap_port':9092,
  399. 'group_id':'gid', 'session_timeout_ms':10000, 'topics': ['my-topic', 'my-topic2']})
  400. #bml = BaseMessageListener('bml')
  401. sml = SimpleMessageListener('simple')
  402. #bc.register(BaseConsumer.KB_EVENT, bml)
  403. #bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, bml)
  404. bc.register(BaseConsumer.KB_EVENT, sml)
  405. bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, sml)
  406. bc.start()
  407. def main():
  408. #
  409. # test cases
  410. #
  411. tp = [ test_base_proconsumer, test_prosumer2]
  412. if len(sys.argv) != 3:
  413. print("Usage: %s <role(producer or consumer): P|C> <test case #[0..1]>" % sys.argv[0])
  414. print "\n".join('case #%d: %s' % (i, tp[i].__name__) for i in range(len(tp)))
  415. print "example: python %s P 1" % sys.argv[0]
  416. print "example: python %s C 1" % sys.argv[0]
  417. exit(-1)
  418. mode = sys.argv[1]
  419. #gid = sys.argv[2] if sys.argv[2] <> None else "q-group"
  420. tp[int(sys.argv[2])](mode)
  421. #time.sleep(30)
  422. # while 1:
  423. # try:
  424. # time.sleep(5)
  425. # pass
  426. # except (KeyboardInterrupt, SystemExit):
  427. # logging.error('caught user interrupt')
  428. # sys.exit(-1)
  429. if __name__ == "__main__":
  430. logging.basicConfig(
  431. format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
  432. level=logging.INFO
  433. )
  434. main()