pairs_corr_redis.py

import sys
import json
import numpy

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

##
## insert the path so spark-submit knows where
## to look for a file located in a given directory
##
## the other method is to export PYTHONPATH before
## calling spark-submit
##
# import sys
# sys.path.insert(0, '/home/larry-13.04/workspace/finopt/cep')
print sys.path

# import optcal
# from finopt.cep.redisQueue import RedisQueue
from comms.redisQueue import RedisQueue
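
#
# overview of the job below: read tick-price JSON messages for two instruments from
# Kafka, window each price series with Spark Streaming, compute their rolling
# correlation with numpy.corrcoef, and push the non-NaN values onto a Redis-backed queue
#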

def process(time, rdd):
    # rdd holds the grouped (tickerId, <iterable of prices>) pairs for the current window
    # print (time, rdd)
    lt = rdd.collect()
    # print '\n'.join('%d %s' % (l[0], ''.join(('%f' % e) for e in l[1])) for l in lt)
    if len(lt) == 2:
        a = list(lt[0][1])
        b = list(lt[1][1])
        # print a, b
        corr = 0.0
        if len(a) > 1 and len(b) > 1:
            # trim the longer series so both inputs to corrcoef have equal length
            if len(a) > len(b):
                corr = numpy.corrcoef(a[:len(b)], b)
            else:
                corr = numpy.corrcoef(b[:len(a)], a)
            # corrcoef returns a 2x2 matrix; the off-diagonal [0][1] element is the pair correlation
            print "%s corr---> %f" % (time.strftime('%Y%m%d %H:%M:%S'), corr.tolist()[0][1])
            d = Q.value
            q = RedisQueue(d['qname'], d['namespace'], d['host'], d['port'], d['db'])
            corr = corr.tolist()[0][1]
            if not numpy.isnan(corr):
                print 'insert into redis'
                q.put((time, corr))
    # print numpy.corrcoef(list(lt[0][1]), list(lt[1][1]))

# to run from the command prompt:
# 0. start the kafka broker
# 1. edit subscription.txt and prepare 2 stocks
# 2. run ib_mds.py
# 3. spark-submit --jars spark-streaming-kafka-assembly_2.10-1.4.1.jar \
#        ./alerts/pairs_corr_redis.py vsu-01:2181 <queue_name>
# why corrcoef returns a matrix:
# http://stackoverflow.com/questions/3425439/why-does-corrcoef-return-a-matrix
#
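# illustrative (assumed) shape of a message on the ib_tick_price topic, inferred from
# the filters in __main__ below -- only tickerId, typeName and price are actually read:
#   {"tickerId": 1, "typeName": "tickPrice", "price": 39.75}
#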

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: pairs_corr_redis.py <broker_list ex: vsu-01:2181> <queue_name - for saving the correlation series>")
        sys.exit(-1)

    app_name = "IbMarketDataStream"
    sc = SparkContext(appName=app_name)  # , pyFiles=['./cep/redisQueue.py'])
    ssc = StreamingContext(sc, 2)
    ssc.checkpoint('./checkpoint')
    brokers, qname = sys.argv[1:]

    #
    # demonstrate how to use a broadcast variable
    #
    Q = sc.broadcast({'qname': qname, 'namespace': 'mdq', 'host': 'localhost', 'port': 6379, 'db': 3})
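    # the broadcast value is a read-only dict shipped once to each executor;
    # process() reads it on the workers via Q.value to build its RedisQueue client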

    # kvs = KafkaUtils.createDirectStream(ssc, ['ib_tick_price', 'ib_tick_size'], {"metadata.broker.list": brokers})
    kvs = KafkaUtils.createStream(ssc, brokers, app_name, {'ib_tick_price': 1, 'ib_tick_size': 1})
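    # NB: createStream's second argument is a ZooKeeper quorum (hence vsu-01:2181 above),
    # and the topic dict maps each topic name to its number of receiver threads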
    lines = kvs.map(lambda x: x[1])

    uso = lines.map(lambda line: json.loads(line)) \
               .filter(lambda x: x['tickerId'] == 1 and x['typeName'] == 'tickPrice') \
               .map(lambda x: (1, x['price'])).window(40, 30)
    dug = lines.map(lambda line: json.loads(line)) \
               .filter(lambda x: x['tickerId'] == 2 and x['typeName'] == 'tickPrice') \
               .map(lambda x: (2, x['price'])).window(40, 30)
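    # window(40, 30): keep the last 40 seconds of ticks and recompute every 30 seconds;
    # both durations must be multiples of the 2-second batch interval set above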

    pair = uso.union(dug).groupByKey()
    # each windowed batch yields zero, one or two keyed groups, e.g.
    # (1, <pyspark.resultiterable.ResultIterable object at 0x7fae53a187d0>)
    # (2, <pyspark.resultiterable.ResultIterable object at 0x7fae53a18c50>)
    # pair.pprint()
    pair.foreachRDD(process)

    ssc.start()
    ssc.awaitTermination()