pairs_corr.py

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
#import optcal
import json
import numpy


def process(time, rdd):
    #print(time, rdd)
    lt = rdd.collect()
    #print '\n'.join('%d %s' % (l[0], ''.join(('%f' % e) for e in l[1])) for l in lt)
    if len(lt) == 2:
        a = list(lt[0][1])
        b = list(lt[1][1])
        #print a, b
        if len(a) > 1 and len(b) > 1:
            # corrcoef needs equal-length inputs, so trim the longer series.
            if len(a) > len(b):
                corr = numpy.corrcoef(a[:len(b)], b)
            else:
                corr = numpy.corrcoef(b[:len(a)], a)
            # corrcoef returns a 2x2 matrix; [0][1] is the pair correlation.
            # Print only when a correlation was actually computed.
            print("%s corr---> %f" % (time.strftime('%Y%m%d %H:%M:%S'), corr[0][1]))
    #print numpy.corrcoef(list(lt[0][1]), list(lt[1][1]))
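# Example console output, one line per evaluated window (values hypothetical):
#   20150818 09:31:02 corr---> 0.873215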
# To run from the command prompt:
# 0. start the Kafka broker
# 1. edit subscription.txt and prepare 2 stocks
# 2. run ib_mds.py
# 3. spark-submit --jars spark-streaming-kafka-assembly_2.10-1.4.1.jar ./alerts/pairs_corr.py vsu-01:2181
#
# Why corrcoef returns a matrix:
# http://stackoverflow.com/questions/3425439/why-does-corrcoef-return-a-matrix
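# Quick shape check for corrcoef (an illustration, not part of the job):
# numpy.corrcoef(x, y) returns [[1.0, r], [r, 1.0]], so r is at [0][1]:
#   >>> import numpy
#   >>> numpy.corrcoef([1.0, 2.0, 3.0], [2.0, 4.0, 6.0])[0][1]
#   1.0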
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: pairs_corr.py <zk_quorum ex: vsu-01:2181>")
        sys.exit(-1)
    app_name = "IbMarketDataStream"
    sc = SparkContext(appName=app_name)
    ssc = StreamingContext(sc, 2)  # 2-second batch interval
    ssc.checkpoint('./checkpoint')
    zk_quorum = sys.argv[1]
    # Direct-stream variant (takes a Kafka broker list rather than the ZooKeeper quorum):
    #kvs = KafkaUtils.createDirectStream(ssc, ['ib_tick_price', 'ib_tick_size'], {"metadata.broker.list": brokers})
    kvs = KafkaUtils.createStream(ssc, zk_quorum, app_name, {'ib_tick_price': 1, 'ib_tick_size': 1})
    lines = kvs.map(lambda x: x[1])  # keep the message value, drop the Kafka key
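    # Expected payload from ib_mds.py (shape inferred from the filters below,
    # not a confirmed schema), e.g.:
    #   {"tickerId": 1, "typeName": "tickPrice", "price": 38.25}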
    # tickerId 1 and 2 are the two subscribed stocks; window(8, 6) correlates
    # the last 8 seconds of ticks, recomputed every 6 seconds.
    uso = lines.map(lambda line: json.loads(line)) \
               .filter(lambda x: x['tickerId'] == 1 and x['typeName'] == 'tickPrice') \
               .map(lambda x: (1, x['price'])).window(8, 6)
    dug = lines.map(lambda line: json.loads(line)) \
               .filter(lambda x: x['tickerId'] == 2 and x['typeName'] == 'tickPrice') \
               .map(lambda x: (2, x['price'])).window(8, 6)
    pair = uso.union(dug).groupByKey()
    # In early or quiet windows the per-key iterables may be empty or hold a
    # single price, which is why process() guards on the lengths, e.g.:
    #   (1, <pyspark.resultiterable.ResultIterable object at 0x7fae53a187d0>)
    #   (2, <pyspark.resultiterable.ResultIterable object at 0x7fae53a18c50>)
    #pair.pprint()
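    # Illustrative collect() result inside process() for a full window, after
    # materialising each ResultIterable with list() (prices hypothetical):
    #   [(1, [38.21, 38.25, 38.24]), (2, [44.10, 44.05, 44.08])]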
    pair.foreachRDD(process)
    ssc.start()
    ssc.awaitTermination()