momentum2.py

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from numpy import *
import pylab
from scipy import stats
import threading
import time
import os
##
## This example demonstrates the use of accumulators and broadcast
## variables, and how to terminate a running Spark streaming job.
##
## It also demonstrates how to send alerts via XMPP
## (requires a running prosody server and redisQueue).
##
## Insert the path below so spark-submit knows where to look for
## modules located in a given directory.
##
## The other method is to export PYTHONPATH before calling
## spark-submit (see the sketch after the commented example below).
##
# import sys
# sys.path.insert(0, '/home/larry-13.04/workspace/finopt/cep')
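#
# Alternatively (a sketch, not taken from this repo -- the directory below is
# illustrative), export PYTHONPATH before calling spark-submit:
#
#   export PYTHONPATH=$PYTHONPATH:/home/larry-13.04/workspace/finopt
#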
print sys.path

#import optcal
import json
import numpy
#from finopt.cep.redisQueue import RedisQueue
from comms.redisQueue import RedisQueue

def f1(time, rdd):
    # print each key's windowed prices and the relative change between the
    # first and last price in the window
    lt = rdd.collect()
    for l in lt:
        print '%s {%s}' % (l[0], ','.join('%f' % e for e in l[1]))
        xx = [e for e in l[1]]
        print xx
        if len(xx) > 1:
            first = xx[0]
            last_pos = len(xx) - 1
            last = xx[last_pos]
            change = (last - first) / last
            print '%f' % change
    #print '\n'.join('%s %s' % (l[0], ''.join(('%f' % e) for e in l[1])) for l in lt)

def persist(time, rdd):
    # save each windowed RDD to disk, numbering the output with the accumulator
    #print (time, rdd)
    #lt = rdd.collect()
    rdd.saveAsTextFile("./rdd/rdd-%s-%03d" % (Q.value['rname'], NumProcessed.value))
    print 'process................... %d' % NumProcessed.value
    NumProcessed.add(1)
    #print '\n'.join('%d %s' % (l[0], ''.join(('%f' % e) for e in l[1])) for l in lt)

def simple(time, rdd):
    # compute the relative change between the first and last windowed prices
    # and push an alert onto the Redis queue when it exceeds the threshold
    lt = rdd.collect()
    if lt:
        first = lt[0][1][0]
        last_pos = len(lt) - 1
        last = lt[last_pos][1][0]
        change = (last - first) / last
        msg = '%0.6f, %0.2f, %0.2f, %0.2f' % (change, first, last, last_pos)
        print 'process............. %d {%s}' % (NumProcessed.value, msg)
        if abs(change) > Threshold.value:
            d = Q.value
            q = RedisQueue(d['alert_bot_q'][1], d['alert_bot_q'][0], d['host'], d['port'], d['db'])
            q.put(msg)
        NumProcessed.add(1)
    #print (time, rdd)

def cal_trend(time, rdd):
    def detect_trend(x1, y1, num_points_ahead, ric):
        # fit 3rd and 4th degree polynomials to the windowed prices
        z4 = polyfit(x1, y1, 3)
        p4 = poly1d(z4)  # construct the degree-3 polynomial
        #print y1
        z5 = polyfit(x1, y1, 4)
        p5 = poly1d(z5)  # construct the degree-4 polynomial
        extrap_y_max_limit = len(x1) * 2  # extrapolate to twice the observed range
        x2 = linspace(0, extrap_y_max_limit, 50)  # 50 evenly spaced points between 0 and the limit
        pylab.switch_backend('agg')  # the agg backend supports writing from non-main threads
        pylab.plot(x1, y1, 'o', x2, p4(x2), '-g', x2, p5(x2), '-b')
        pylab.legend(['%s to fit' % ric, '3rd degree poly', '4th degree poly'])
        #pylab.axis([0,160,0,10])
        # first pair sets the x axis boundary, second pair the y axis boundary
        pylab.axis([0, len(x1) * 1.1, min(y1) * 0.997, max(y1) * 1.002])
        # compute the slopes of the data points
        # s4/s5 - slopes extrapolated by applying the fitted polynomials are
        # kept below as comments; the active code regresses on the raw data,
        # so s4 and s5 are currently identical
        y_arraysize = len(y1)
        # s4, intercept, r_value, p_value, std_err = stats.linregress(range(0, num_points_ahead), [p4(i) for i in range(y_arraysize, y_arraysize + num_points_ahead)])
        # s5, intercept, r_value, p_value, std_err = stats.linregress(range(0, num_points_ahead), [p5(i) for i in range(y_arraysize, y_arraysize + num_points_ahead)])
        s4, intercept, r_value, p_value, std_err = stats.linregress(x1, y1)
        s5, intercept, r_value, p_value, std_err = stats.linregress(x1, y1)
        rc = (1.0 if s4 > 0.0 else 0.0, 1.0 if s5 > 0.0 else 0.0)
        print s4, s5, rc, y_arraysize
        #pylab.show()
        pylab.savefig('../data/extrapolation/%s-%s.png' % (ric, time))
        pylab.close()
        d = Q.value
        q = RedisQueue(d['qname'], d['namespace'], d['host'], d['port'], d['db'])
        q.put((time, y1, s4, s5))
        if abs(s4) > Threshold.value:
            d = Q.value
            q = RedisQueue(d['alert_bot_q'][1], d['alert_bot_q'][0], d['host'], d['port'], d['db'])
            q.put('%s %0.2f %0.2f' % (ric, s4, s5))
        return rc

    ls = rdd.collect()
    # print [y[1][0] for y in ls]
    # print len(ls), [range(len(ls))]
    # print len([y[1][0] for y in ls])
    if ls:
        rc = detect_trend(range(len(ls)), [y[1][0] for y in ls], 5, '_HSI')

# To run from the command prompt:
#   0. start the Kafka broker
#   1. edit subscription.txt and prepare 2 stocks
#   2. run ib_mds.py
#   3. spark-submit --jars spark-streaming-kafka-assembly_2.10-1.4.1.jar ./alerts/pairs_corr.py vsu-01:2181
# http://stackoverflow.com/questions/3425439/why-does-corrcoef-return-a-matrix
#
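# For this script, the equivalent command would look like the following
# (a sketch -- the script path, rdd name, and tick id are illustrative;
# <fn name> is parsed below but only used by the commented-out
# trades.foreachRDD(eval(fn)) line):
#
#   spark-submit --jars spark-streaming-kafka-assembly_2.10-1.4.1.jar \
#       ./alerts/momentum2.py vsu-01:2181 rdd1 1 simple
#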
if __name__ == "__main__":
    if len(sys.argv) != 5:
        print("Usage: %s <broker_list ex: vsu-01:2181> <rdd_name> <tick id> <fn name>" % sys.argv[0])
        print("Usage: to gracefully shut down, type echo 1 > /tmp/flag at the terminal")
        exit(-1)

    app_name = "Momentum"
    sc = SparkContext(appName=app_name)  #, pyFiles = ['./cep/redisQueue.py'])
    ssc = StreamingContext(sc, 2)  # 2 second batch interval
    ssc.checkpoint('../checkpoint')
    brokers, qname, id, fn = sys.argv[1:]
    id = int(id)

    #
    # demonstrate how to use an accumulator and broadcast variables
    #
    NumProcessed = sc.accumulator(0)
    Q = sc.broadcast({'rname': 'rname', 'qname': qname, 'namespace': 'mdq', 'host': 'localhost', 'port': 6379, 'db': 3, 'alert_bot_q': ('msg_bot', 'chatq')})
    Threshold = sc.broadcast(0.00015)
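    # Note (not part of the original source): the functions passed to
    # foreachRDD below run on the driver, so they can read the broadcast
    # dictionary with Q.value, read the threshold with Threshold.value, and
    # bump the accumulator with NumProcessed.add(1); the do_work() thread
    # further down can likewise inspect NumProcessed.value if it needs to
    # decide when to stop.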
    #kvs = KafkaUtils.createDirectStream(ssc, ['ib_tick_price', 'ib_tick_size'], {"metadata.broker.list": brokers})
    # createStream connects through ZooKeeper (the first command-line argument)
    kvs = KafkaUtils.createStream(ssc, brokers, app_name, {'ib_tick_price': 1, 'ib_tick_size': 1})
    lines = kvs.map(lambda x: x[1])

    # s1 = lines.map(lambda line: json.loads(line)).filter(lambda x: (x['tickerId'] == id and x['typeName'] == 'tickPrice'))\
    #           .filter(lambda x: (x['field'] == 4))\
    #           .map(lambda x: (id, x['price'])).window(60, 30)
    #
    # s2 = lines.map(lambda line: json.loads(line)).filter(lambda x: (x['tickerId'] == id and x['typeName'] == 'tickSize'))\
    #           .filter(lambda x: (x['field'] == 5))\
    #           .map(lambda x: (id, x['size'])).window(60, 30)
    #
    # trades = s1.join(s2)

    # keep last-trade prices (field 4) for ticker ids 1 and 2, pairwise-average
    # them per ticker, then group the averages over a sliding window
    s1 = lines.map(lambda line: json.loads(line)).filter(lambda x: (x['tickerId'] in [1, 2] and x['typeName'] == 'tickPrice'))\
              .filter(lambda x: (x['field'] == 4))\
              .map(lambda x: (x['tickerId'], x['price'])).reduceByKey(lambda x, y: (x + y) / 2).groupByKeyAndWindow(30, 20, 1)
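    # Window semantics, as an aside: with the 2 second batch interval set on
    # the StreamingContext above, groupByKeyAndWindow(30, 20, 1) groups each
    # ticker's reduced prices over the last 30 seconds, re-evaluating the
    # window every 20 seconds on a single partition.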
    s1.foreachRDD(f1)
    #trades.pprint()
    #trades.foreachRDD(eval(fn))

    def do_work():
        # poll /tmp/flag every 2 seconds and shut the streaming context down
        # gracefully when the file contains a 1
        while 1:
            # program will stop after processing a fixed number of rdds
            # if NumProcessed.value == 70:
            #     break
            # program will stop on detecting a 1 in the flag file
            try:
                f = open('/tmp/flag')
                l = f.readlines()
                f.close()
                if l:
                    print 'reading %s' % l[0]
                    if '1' in l[0]:
                        break
                time.sleep(2)
            except IOError:
                continue
        print 'terminating..........'
        ssc.stop(True, False)
        os.remove('/tmp/flag')
        sys.exit()
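    # To trigger the graceful shutdown from another terminal (as the usage
    # message above notes):
    #
    #   echo 1 > /tmp/flag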
    t = threading.Thread(target=do_work, args=())
    t.start()

    ssc.start()
    ssc.awaitTermination()