| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- import sys
- from pyspark import SparkContext
- from pyspark.streaming import StreamingContext
- from pyspark.streaming.kafka import KafkaUtils
- from numpy import *
- import pylab
- from scipy import stats
- import threading
- import time
- import os
- ##
- ##
- ##
- ## This example demonstrates the use of accumulators and broadcast
- ## variables, and how to terminate running Spark jobs
- ##
- ##
- ##
- ##
- ## insert the path so spark-submit knows where
- ## to look for a file located in a given directory
- ##
- ## the other method is to export PYTHONPATH before
- ## calling spark-submit
- ##
- # import sys
- # sys.path.insert(0, '/home/larry-13.04/workspace/finopt/cep')
# Debug aid: show the module search path spark-submit ended up with.
print(sys.path)
- #import optcal
- import json
- import numpy
- #from finopt.cep.redisQueue import RedisQueue
- from comms.redisQueue import RedisQueue
def persist(time, rdd):
    """Write one batch RDD to disk as text and bump the processed counter.

    Has the ``(time, rdd)`` signature expected by ``DStream.foreachRDD``.

    Args:
        time: batch time supplied by the caller (not used here).
        rdd: RDD to be written with ``saveAsTextFile``.

    Reads the module-level broadcast ``Q`` (its ``'rname'`` entry) and the
    accumulator ``NumProcessed``; both are created in the ``__main__`` section.
    The output directory is ``./rdd/rdd-<rname>-<NNN>`` where ``NNN`` is the
    current counter value, zero-padded to three digits.
    """
    rname = Q.value['rname']
    target_dir = "./rdd/rdd-%s-%03d" % (rname, NumProcessed.value)
    rdd.saveAsTextFile(target_dir)

    print('process................... %d' % NumProcessed.value)
    NumProcessed.add(1)
-
def simple(time, rdd):
    """Print the relative change between the first and last value of a batch.

    Has the ``(time, rdd)`` signature expected by ``DStream.foreachRDD``.
    Each collected record is indexed as ``record[1][0]``; with the ``trades``
    stream built in ``__main__`` that is the price of an
    ``(id, (price, size))`` tuple.

    Args:
        time: batch time supplied by the caller (not used here).
        rdd: RDD whose records are collected to the driver.

    Prints ``change first last count`` followed by the running batch counter,
    then increments the module-level accumulator ``NumProcessed``. Empty
    batches are ignored.

    NOTE(review): the original indentation was lost; the counter update is
    assumed to belong to the non-empty branch -- confirm against the
    upstream file.
    """
    records = rdd.collect()
    if not records:
        return

    first = records[0][1][0]
    last = records[-1][1][0]

    if last:
        # float() forces true division; Python 2 would otherwise truncate
        # int / int to an integer.
        change = (last - first) / float(last)
    else:
        # A zero last value would raise ZeroDivisionError and abort the
        # streaming callback; report "not a number" instead.
        change = float('nan')

    print('%s %s %s %s' % (change, first, last, len(records)))
    print('process................... %d' % NumProcessed.value)
    NumProcessed.add(1)
-
def cal_trend(time, rdd):
    """Fit trend curves to one batch, save a plot and push the data to Redis.

    Has the ``(time, rdd)`` signature expected by ``DStream.foreachRDD``.
    The batch is collected to the driver; the y-values are ``record[1][0]``
    of every record and the x-values are simply their positions
    ``0 .. n-1``. Empty batches are ignored.

    Args:
        time: batch time supplied by the caller; used in the image file name
            and as the first element of the tuple pushed to the Redis queue.
        rdd: RDD whose records are collected and analysed.

    Relies on the module-level broadcast ``Q`` for the Redis queue settings.
    """

    def detect_trend(x1, y1, num_points_ahead, ric):
        """Plot polynomial fits of ``y1`` over ``x1`` and classify the slope.

        Args:
            x1: x coordinates of the observed points.
            y1: observed values, same length as ``x1``.
            num_points_ahead: currently unused; kept because the
                extrapolated-slope variant this function once used needs it.
            ric: label used in the plot legend and the image file name.

        Returns:
            A pair ``(up4, up5)`` of floats, each ``1.0`` when the linear
            regression slope of the observed data is positive and ``0.0``
            otherwise. Both entries are currently derived from the same
            regression and are therefore always equal.
        """
        cubic = poly1d(polyfit(x1, y1, 3))
        quartic = poly1d(polyfit(x1, y1, 4))

        # Evaluate the fitted curves out to twice the observed x range.
        extrap_x_max = len(x1) * 2
        x2 = linspace(0, extrap_x_max, 50)

        # 'agg' can render to files from a non-main thread.
        pylab.switch_backend('agg')
        try:
            pylab.plot(x1, y1, 'o', x2, cubic(x2), '-g', x2, quartic(x2), '-b')
            # Labels state the real polynomial degrees (3 and 4); the
            # previous text said 4th/5th, which did not match the fits.
            pylab.legend(['%s to fit' % ric, '3rd degree poly', '4th degree poly'])
            # First pair is the x-axis range, second pair the y-axis range.
            pylab.axis([0, len(x1) * 1.1, min(y1) * 0.997, max(y1) * 1.002])

            y_arraysize = len(y1)
            # Both slopes come from the same regression on the observed
            # points, so it is computed once.
            slope = stats.linregress(x1, y1)[0]
            s4 = s5 = slope

            rc = (1.0 if s4 > 0.0 else 0.0, 1.0 if s5 > 0.0 else 0.0)
            print('%s %s %s %s' % (s4, s5, rc, y_arraysize))

            pylab.savefig('../data/extrapolation/%s-%s.png' % (ric, time))

            d = Q.value
            q = RedisQueue(d['qname'], d['namespace'], d['host'], d['port'], d['db'])
            q.put((time, y1))
        finally:
            # Always release the figure, even if saving or Redis fails,
            # so a long-running job does not accumulate open figures.
            pylab.close()
        return rc

    ls = rdd.collect()
    if ls:
        detect_trend(range(len(ls)), [y[1][0] for y in ls], 5, '_HSI')
-
- # to run from command prompt
- # 0. start kafka broker
- # 1. edit subscription.txt and prepare 2 stocks
- # 2. run ib_mds.py
- # 3. spark-submit --jars spark-streaming-kafka-assembly_2.10-1.4.1.jar ./alerts/pairs_corr.py vsu-01:2181
- # http://stackoverflow.com/questions/3425439/why-does-corrcoef-return-a-matrix
- #
if __name__ == "__main__":
    # Script entry point: wire Kafka tick messages into a windowed stream,
    # hand every batch to the callback named on the command line, and run a
    # watchdog thread that shuts the streaming context down on request.
    if len(sys.argv) != 5:
        print("Usage: %s <broker_list ex: vsu-01:2181> <rdd_name> <tick id> <fn name>" % sys.argv[0])
        print("Usage: to gracefully shutdown type echo 1 > /tmp/flag at the terminal")
        sys.exit(-1)

    app_name = "Momentum"
    sc = SparkContext(appName=app_name)
    ssc = StreamingContext(sc, 2)  # 2-second batch interval
    ssc.checkpoint('../checkpoint')

    brokers, qname, tick_id, fn = sys.argv[1:]
    tick_id = int(tick_id)

    # Accumulator counting processed batches, and a broadcast dict with the
    # Redis queue settings; both are read by the callback functions above.
    # NOTE(review): the second CLI argument is documented as <rdd_name> but
    # is only used as the Redis queue name; 'rname' is a fixed literal.
    NumProcessed = sc.accumulator(0)
    Q = sc.broadcast({'rname': 'rname', 'qname': qname, 'namespace': 'mdq',
                      'host': 'localhost', 'port': 6379, 'db': 3})

    kvs = KafkaUtils.createStream(ssc, brokers, app_name,
                                  {'ib_tick_price': 1, 'ib_tick_size': 1})
    ticks = kvs.map(lambda kv: kv[1]).map(lambda line: json.loads(line))

    # Price ticks (field 1) for the requested ticker, 30s window sliding by 20s.
    s1 = ticks.filter(lambda x: (x['tickerId'] == tick_id and x['typeName'] == 'tickPrice'))\
        .filter(lambda x: (x['field'] == 1))\
        .map(lambda x: (tick_id, x['price'])).window(30, 20)

    # Size ticks (field 5) for the same ticker, same window.
    s2 = ticks.filter(lambda x: (x['tickerId'] == tick_id and x['typeName'] == 'tickSize'))\
        .filter(lambda x: (x['field'] == 5))\
        .map(lambda x: (tick_id, x['size'])).window(30, 20)

    # Keep every price even when no size tick arrived in the window.
    trades = s1.leftOuterJoin(s2)

    trades.pprint()
    # SECURITY NOTE(review): eval() executes whatever expression is passed as
    # <fn name>. Acceptable only while the command line is trusted; consider
    # an explicit name -> function table instead.
    trades.foreachRDD(eval(fn))

    FLAG_FILE = '/tmp/flag'
    MAX_BATCHES = 999
    POLL_SECONDS = 2

    def do_work():
        """Watchdog: stop the streaming context on request.

        Polls every ``POLL_SECONDS`` seconds and stops when either
        ``NumProcessed`` reaches ``MAX_BATCHES`` or the first line of
        ``FLAG_FILE`` contains the character ``1``. Afterwards the streaming
        context (and its SparkContext) is stopped without waiting for
        in-flight data, and the flag file is removed if present.
        """
        while True:
            # '>=' rather than '==' so the limit cannot be stepped over
            # between two polls.
            if NumProcessed.value >= MAX_BATCHES:
                break

            try:
                with open(FLAG_FILE) as f:
                    first_line = f.readline()
            except IOError:
                # No flag file (yet): nothing requested.
                first_line = ''

            if first_line:
                print('reading %s' % first_line)
                if '1' in first_line:
                    break

            # Sleep on every iteration; previously a missing or empty flag
            # file skipped the sleep and the loop spun at full CPU.
            time.sleep(POLL_SECONDS)

        print('terminating...')
        ssc.stop(True, False)
        # The flag file is absent when shutdown was triggered by the batch
        # count, so only remove it when it exists.
        if os.path.exists(FLAG_FILE):
            os.remove(FLAG_FILE)

    t = threading.Thread(target=do_work, args=())
    t.start()
    ssc.start()
    ssc.awaitTermination()
|