|
|
@@ -386,29 +386,6 @@ class BaseMessageListener(Subscriber):
|
|
|
logging.debug("BaseMessageListener [%s]:update|Event type:[%s] content:[%s]" % (self.name, event, json.dumps(param) if param <> None else "<empty param>"))
|
|
|
|
|
|
|
|
|
-class SimpleMessageListener(BaseMessageListener):
|
|
|
-
|
|
|
- def __init__(self, name):
|
|
|
- BaseMessageListener.__init__(self, name)
|
|
|
- self.cnt_my_topic = 0
|
|
|
- self.cnt_my_topic2 = 0
|
|
|
-
|
|
|
-# def on_kb_event(self, param):
|
|
|
-# print "on_kb_event [%s] %s" % (self.name, param)
|
|
|
- def my_topic(self, param):
|
|
|
- if self.cnt_my_topic % 50 == 0:
|
|
|
- print "SimpleMessageListener:my_topic. %s" % param
|
|
|
- self.cnt_my_topic += 1
|
|
|
-
|
|
|
- def my_topic2(self, param):
|
|
|
- if self.cnt_my_topic2 % 50 == 0:
|
|
|
- print "SimpleMessageListener:my_topic2. %s" % param
|
|
|
- self.cnt_my_topic2 += 1
|
|
|
-
|
|
|
-
|
|
|
- def on_kb_reached_last_offset(self, param):
|
|
|
- print "on_kb_reached_last_offset [%s] %s" % (self.name, param)
|
|
|
-
|
|
|
|
|
|
class Prosumer(BaseProducer):
|
|
|
# wrapper object
|
|
|
@@ -501,8 +478,31 @@ class Prosumer(BaseProducer):
|
|
|
|
|
|
|
|
|
|
|
|
+class SimpleMessageListener(BaseMessageListener):
|
|
|
+
|
|
|
+ def __init__(self, name):
|
|
|
+ BaseMessageListener.__init__(self, name)
|
|
|
+ self.cnt_my_topic = 0
|
|
|
+ self.cnt_my_topic2 = 0
|
|
|
|
|
|
-class SubscriptionListener(BaseMessageListener):
|
|
|
+# def on_kb_event(self, param):
|
|
|
+# print "on_kb_event [%s] %s" % (self.name, param)
|
|
|
+ def my_topic(self, e, param):
|
|
|
+ if self.cnt_my_topic % 50 == 0:
|
|
|
+ print "SimpleMessageListener:my_topic. %s" % param
|
|
|
+ self.cnt_my_topic += 1
|
|
|
+
|
|
|
+ def my_topic2(self, e, param):
|
|
|
+ if self.cnt_my_topic2 % 50 == 0:
|
|
|
+ print "SimpleMessageListener:my_topic2. %s" % param
|
|
|
+ self.cnt_my_topic2 += 1
|
|
|
+
|
|
|
+
|
|
|
+ def on_kb_reached_last_offset(self, param):
|
|
|
+ print "on_kb_reached_last_offset [%s] %s" % (self.name, param)
|
|
|
+
|
|
|
+
|
|
|
+class Prosumer2Listener(BaseMessageListener):
|
|
|
'''
|
|
|
test code used by test cases
|
|
|
'''
|
|
|
@@ -512,54 +512,56 @@ class SubscriptionListener(BaseMessageListener):
|
|
|
self.producer = producer
|
|
|
self.i = 0
|
|
|
|
|
|
- def gw_subscription_changed(self, event, items):
|
|
|
- logging.info("[%s] received gw_subscription_changed content: [%s]" % (self.name, items))
|
|
|
- #print 'SubscriptionListener:gw_subscription_changed %s' % items
|
|
|
+
|
|
|
+
|
|
|
+ def findTemperature(self, event, location):
|
|
|
|
|
|
-# def on_kb_event(self, param):
|
|
|
-# print "on_kb_event [%s] %s" % (self.name, param)
|
|
|
- def gw_req_subscriptions(self, event, items):
|
|
|
+ logging.info("[%s] received findTemperature:[%s]" % (self.name, location))
|
|
|
+ #vars= self.producer.message_loads(items['value'])
|
|
|
|
|
|
- logging.info("[%s] received gw_req_subscriptions content:[%s]" % (self.name, items))
|
|
|
- vars= self.producer.message_loads(items['value'])
|
|
|
- self.producer.send_message('gw_subscription_changed', self.producer.message_dumps({'id': self.i, 'reqid': vars['reqid'],
|
|
|
- 'response' : "%s" % (time.strftime("%b %d %Y %H:%M:%S"))})
|
|
|
- )
|
|
|
+ import urllib2
|
|
|
+ response = urllib2.urlopen('http://rss.weather.gov.hk/rss/SeveralDaysWeatherForecast.xml')
|
|
|
+ self.producer.send_message('temperature', self.producer.message_dumps({'current': "%s" % (response.read())}))
|
|
|
+ #self.producer.send_message('temperature', self.producer.message_dumps({'current': "%s" % (time.strftime("%b %d %Y %H:%M:%S"))}))
|
|
|
+ self.producer.send_message('temperatureEnd', self.producer.message_dumps({'empty': None}))
|
|
|
+ #id': self.i, 'reqid': vars['reqid'], 'response' : "%s" % (time.strftime("%b %d %Y %H:%M:%S"))})
|
|
|
+
|
|
|
self.i = self.i + 1
|
|
|
|
|
|
- def reqMktData(self, event, items):
|
|
|
- logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
|
|
|
- self.producer.send_message('tickPrice',
|
|
|
- self.producer.message_dumps({'field':4, 'typeName':'tickPrice', 'price':1.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}))
|
|
|
+ def temperature(self, event, current):
|
|
|
+ logging.info("[%s] received event [%s] content:[%s]" % (self.name, event, current))
|
|
|
|
|
|
|
|
|
- def tickPrice(self, event, items):
|
|
|
- logging.info("[%s] received %s content:[%s]" % (self.name, event, items))
|
|
|
+ def temperatureEnd(self, event, empty):
|
|
|
+ logging.info("[%s] received event [%s] content:[%s]" % (self.name, event, empty))
|
|
|
+ self.producer.set_stop()
|
|
|
+
|
|
|
|
|
|
- def on_kb_reached_last_offset(self, event, items):
|
|
|
- logging.info("[%s] received on_kb_reached_last_offset content: [%s]" % (self.name, items))
|
|
|
- print "on_kb_reached_last_offset [%s] %s" % (self.name, items)
|
|
|
|
|
|
|
|
|
def test_prosumer2(mode):
|
|
|
|
|
|
+ bootstrap_host = 'vorsprung'
|
|
|
+
|
|
|
if mode == 'A':
|
|
|
|
|
|
- topicsA = ['gw_subscription_changed', 'tickPrice']
|
|
|
+ topicsA = ['temperature', 'temperatureEnd']
|
|
|
|
|
|
- pA = Prosumer(name='A', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
|
|
|
+ pA = Prosumer(name='A', kwargs={'bootstrap_host':bootstrap_host, 'bootstrap_port':9092,
|
|
|
'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
|
|
|
'group_id': 'groupA', 'session_timeout_ms':10000,
|
|
|
- 'topics': topicsA, 'clear_offsets' : False})
|
|
|
- sA = SubscriptionListener('earA', pA)
|
|
|
+ 'topics': topicsA, 'clear_offsets' : True,
|
|
|
+ 'seek_to_end': [topicsA],
|
|
|
+ })
|
|
|
+ sA = Prosumer2Listener('earA', pA)
|
|
|
|
|
|
pA.add_listeners([sA])
|
|
|
pA.start_prosumer()
|
|
|
i = 0
|
|
|
|
|
|
try:
|
|
|
- pA.send_message('reqMktData', pA.message_dumps({'contract':'dummy'}))
|
|
|
- while True: #i < 5:
|
|
|
+ pA.send_message('findTemperature', pA.message_dumps({'location':'HKG'}))
|
|
|
+ while not pA.is_stopped(): #i < 5:
|
|
|
|
|
|
#pA.send_message('gw_req_subscriptions', pA.message_dumps({'desc': 'requesting subscription msg counter:%d' % i,
|
|
|
# 'reqid': i}))
|
|
|
@@ -571,26 +573,26 @@ def test_prosumer2(mode):
|
|
|
pA.set_stop()
|
|
|
pA.join()
|
|
|
|
|
|
-
|
|
|
+ print ('end...exiting')
|
|
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
- topicsB = ['gw_req_subscriptions', 'reqMktData']
|
|
|
+ topicsB = ['findTemperature']
|
|
|
|
|
|
- pB = Prosumer(name='B', kwargs={'bootstrap_host':'localhost', 'bootstrap_port':9092,
|
|
|
+ pB = Prosumer(name='B', kwargs={'bootstrap_host':bootstrap_host, 'bootstrap_port':9092,
|
|
|
'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
|
|
|
'group_id': 'groupB', 'session_timeout_ms':10000,
|
|
|
- 'topics': topicsB, 'clear_offsets' : False})
|
|
|
- sB = SubscriptionListener('earB', pB)
|
|
|
+ 'topics': topicsB, 'clear_offsets' : True,
|
|
|
+ 'seek_to_end': [topicsB],
|
|
|
+ })
|
|
|
+ sB = Prosumer2Listener('earB', pB)
|
|
|
pB.add_listeners([sB])
|
|
|
pB.start_prosumer()
|
|
|
try:
|
|
|
|
|
|
- while True: #i < 5:
|
|
|
+ while not pB.is_stopped(): #i < 5:
|
|
|
|
|
|
- pB.send_message('tickPrice',
|
|
|
- pB.message_dumps({'field':5, 'typeName':'tickPrice', 'price':2.0682, 'ts':1485661437.83, 'source':'IB', 'tickerId':79, 'canAutoExecute':0}))
|
|
|
|
|
|
time.sleep(.45)
|
|
|
|
|
|
@@ -616,11 +618,12 @@ def test_base_proconsumer(mode):
|
|
|
5) use of try-catch block to implement seek_to_latest offset
|
|
|
6) inherit and implement MessageListener to subscribe messages dispatched by the consumer
|
|
|
'''
|
|
|
+ topics = ['my_topic', 'my_topic2']
|
|
|
if mode == 'P':
|
|
|
#Producer().start()
|
|
|
- topics = ['my_topic', 'my_topic2']
|
|
|
+
|
|
|
tp = TestProducer(name = 'testproducer', kwargs={
|
|
|
- 'bootstrap_host':'localhost', 'bootstrap_port':9092,
|
|
|
+ 'bootstrap_host':'vsu-bison', 'bootstrap_port':9092,
|
|
|
'topics': topics})
|
|
|
tp.start()
|
|
|
i = 0
|
|
|
@@ -630,9 +633,33 @@ def test_base_proconsumer(mode):
|
|
|
try:
|
|
|
s = "%d %s test %s" % (i, topics[i%2], time.strftime("%b %d %Y %H:%M:%S"))
|
|
|
logging.info(s)
|
|
|
- tp.send_message(topics[i%2], s)
|
|
|
|
|
|
- time.sleep(.25)
|
|
|
+ '''
|
|
|
+ send_message requires 2 function arguments
|
|
|
+ argument 1 is the name of the message function expected to be fired in the consumer upon
|
|
|
+ receiving the message sent by send_message
|
|
|
+ argument 2 is a json parameter string with a list of key value pairs
|
|
|
+ the message function in the consumer is expected to have all the keys declared in the
|
|
|
+ function parameter list
|
|
|
+
|
|
|
+ for this example the consumer uses a SimpleMessageListener, thus in the SimpleMessageListener
|
|
|
+ it must declare two topic functions namely my_topic and my_topic2
|
|
|
+
|
|
|
+ to register the SimpleMessageListner to receive callback events, call the register function of
|
|
|
+ consumer which takes 3 parameters: the event name, the listener reference, and the call back function
|
|
|
+
|
|
|
+ the standard callback function takes event_name as its 2nd parameter,
|
|
|
+ follow by a list of parameters (self, event_name, param1, param2...)
|
|
|
+
|
|
|
+ def my_topic(self, event, param):
|
|
|
+ def my_topic2(self, event, param):
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ '''
|
|
|
+ tp.send_message(topics[i%2], json.dumps({'param': s}))
|
|
|
+
|
|
|
+ time.sleep(.5)
|
|
|
i=i+1
|
|
|
except (KeyboardInterrupt, SystemExit):
|
|
|
logging.error('caught user interrupt')
|
|
|
@@ -644,8 +671,8 @@ def test_base_proconsumer(mode):
|
|
|
else:
|
|
|
|
|
|
bc = BaseConsumer(name='bc', kwargs={'redis_host':'localhost', 'redis_port':6379, 'redis_db':0,
|
|
|
- 'bootstrap_host':'localhost', 'bootstrap_port':9092,
|
|
|
- 'group_id':'gid', 'session_timeout_ms':10000, 'topics': ['my_topic', 'my_topic2'],
|
|
|
+ 'bootstrap_host':'vsu-bison', 'bootstrap_port':9092,
|
|
|
+ 'group_id':'gid', 'session_timeout_ms':10000, 'topics': topics,
|
|
|
'clear_offsets': True, 'consumer_timeout_ms':1000,
|
|
|
# uncomment the next line to process messages from since the program was last shut down
|
|
|
# if seek_to_end is present, for the topic specified the consumer will begin
|
|
|
@@ -658,9 +685,26 @@ def test_base_proconsumer(mode):
|
|
|
#bml = BaseMessageListener('bml')
|
|
|
sml = SimpleMessageListener('simple')
|
|
|
bc.register(BaseConsumer.KB_REACHED_LAST_OFFSET, sml)
|
|
|
- bc.register('my_topic', sml)
|
|
|
- bc.register('my_topic2', sml)
|
|
|
+
|
|
|
+ '''
|
|
|
+ tell bc to dispatch my_topic event to the call back function
|
|
|
+ sml.my_topic of listener sml
|
|
|
+ another way to reference the function name in sml is to use the getattr
|
|
|
+ function in python: getattr(obj, 'obj_func_name')
|
|
|
+
|
|
|
+ '''
|
|
|
+ bc.register('my_topic', sml, sml.my_topic)
|
|
|
+ bc.register('my_topic2', sml, sml.my_topic2)
|
|
|
bc.start()
|
|
|
+ while True:
|
|
|
+
|
|
|
+ try:
|
|
|
+ time.sleep(.25)
|
|
|
+ except (KeyboardInterrupt, SystemExit):
|
|
|
+ logging.error('caught user interrupt')
|
|
|
+ bc.set_stop()
|
|
|
+ bc.join()
|
|
|
+ sys.exit(-1)
|
|
|
|
|
|
|
|
|
|
|
|
@@ -679,10 +723,26 @@ def main():
|
|
|
print "example: python %s C 1" % sys.argv[0]
|
|
|
exit(-1)
|
|
|
|
|
|
+
|
|
|
mode = sys.argv[1]
|
|
|
#gid = sys.argv[2] if sys.argv[2] <> None else "q-group"
|
|
|
|
|
|
+
|
|
|
+ '''
|
|
|
+ program start up parameter controls which function to test
|
|
|
|
|
|
+ tp[0] -> run test_base_proconsumer
|
|
|
+ tp[1] -> run test_prosumer2
|
|
|
+
|
|
|
+
|
|
|
+ test_prosumer2
|
|
|
+
|
|
|
+ Provide weather information: tp[1]('B')
|
|
|
+ Request weather information: tp[1]('A')
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ '''
|
|
|
tp[int(sys.argv[2])](mode)
|
|
|
|
|
|
#time.sleep(30)
|