generic_stream.py

import sys
import json

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
#import optcal
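
# Print each tick-price record in the current window as "contract price".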
def process(time, rdd):
    #print(time, rdd)
    records = rdd.collect()
    #print('process---> %s' % records)
    print('\n'.join('%s %s' % (l['contract'], l['price']) for l in records))
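
# Print each (tickerId, averaged size) pair from the window as "id:size".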
def psize(time, rdd):
    records = rdd.collect()
    print('\n'.join('%s:%s' % (l[0], l[1]) for l in records))

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: generic_stream.py <zk_quorum ex: vsu-01:2181>")
        sys.exit(-1)
    app_name = "IbMarketDataStream"
    sc = SparkContext(appName=app_name)
    ssc = StreamingContext(sc, 2)  # 2-second batch interval
    ssc.checkpoint('./checkpoint')  # checkpoint dir for fault tolerance
    zk_quorum = sys.argv[1]
    #kvs = KafkaUtils.createDirectStream(ssc, ['ib_tick_price', 'ib_tick_size'], {"metadata.broker.list": brokers})
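    # Receiver-based Kafka stream via ZooKeeper; the dict maps topic -> number of consumer threads.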
    kvs = KafkaUtils.createStream(ssc, zk_quorum, app_name, {'ib_tick_price': 1, 'ib_tick_size': 1})
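    # Kafka records arrive as (key, value) pairs; keep only the JSON value.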
    lines = kvs.map(lambda x: x[1])
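    # tickPrice events for tickerId 1, recomputed every 2 s over a 4 s window.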
    msg = lines.map(lambda line: json.loads(line)) \
               .filter(lambda x: x['tickerId'] == 1 and x['typeName'] == 'tickPrice') \
               .window(4, 2)
    size = lines.map(lambda line: json.loads(line)) \
                .filter(lambda x: x['tickerId'] == 1 and x['typeName'] == 'tickSize') \
                .map(lambda x: (x['tickerId'], x['size'])) \
                .window(4, 2)
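    # Note: reducing with (a+b)/2 yields a pairwise running average rather than
    # the true mean of the window, and the extra window() re-windows a stream
    # that is already windowed.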
    avg = size.reduceByKey(lambda a, b: (a + b) / 2).window(4, 2)
    # mix.pprint()
    msg.foreachRDD(process)
    avg.foreachRDD(psize)
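
    # Start the streaming computation and block until it is stopped.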
    ssc.start()
    ssc.awaitTermination()
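
# Example launch (a sketch; the spark-streaming-kafka package coordinates depend
# on your Spark and Scala versions, and the 0-8 connector shown here matches the
# legacy receiver API used above):
#   spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.8 \
#       generic_stream.py vsu-01:2181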