md_std.py

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import time, datetime
import threading
import os
from finopt import ystockquote
##
##
## This example demonstrates the use of accumulators and broadcast
## variables, and how to terminate running spark jobs.
##
## It also demonstrates how to send alerts via xmpp
## (requires a running prosody server and redisQueue).
##
##
## Insert the path so spark-submit knows where
## to look for a file located in a given directory.
##
## The other method is to export PYTHONPATH before
## calling spark-submit, as sketched below.
##
# import sys
# sys.path.insert(0, '/home/larry-13.04/workspace/finopt/cep')
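# A sketch of the PYTHONPATH alternative mentioned above (paths and jar name
# taken from elsewhere in this file; adjust to your checkout):
#
#   export PYTHONPATH=/home/larry-13.04/workspace/finopt:$PYTHONPATH
#   spark-submit --jars spark-streaming-kafka-assembly_2.10-1.4.1.jar \
#       ./alerts/md_std.py vsu-01:2181 <rdd_name> <tick id> <fn name>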
print sys.path

# import optcal
import json
import numpy
# from finopt.cep.redisQueue import RedisQueue
from comms.redisQueue import RedisQueue
from comms.alert_bot import AlertHelper
def f1(time, rdd):
    # collect the windowed (timestamp, std, mean) tuples and append them to a log file
    lt = rdd.collect()
    print '**** f1'
    print lt
    print '**** end f1'
    f = open('/home/larry/l1304/workspace/finopt/data/mds_files/std/std.txt', 'a')
    f.write(''.join('%s,%s,%s\n' % (s[0].strftime('%Y-%m-%d %H:%M:%S.%f'), s[1], s[2]) for s in lt))
    f.close()
    d = Q.value
    # push an alert onto the redis queue when the standard deviation exceeds 8.0
    if lt and float(lt[0][1]) > 8.0:
        msg = 'Stock SD alert triggered: ' + ''.join('%s,%s,%s\n' % (s[0].strftime('%Y-%m-%d %H:%M:%S.%f'), s[1], s[2]) for s in lt)
        print msg
        q = RedisQueue(d['alert_bot_q'][1], d['alert_bot_q'][0], d['host'], d['port'], d['db'])
        q.put(msg)
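# A minimal consumer sketch for the alert queue written to above, assuming
# RedisQueue exposes a blocking get() counterpart to put() (not shown in this
# file; the AlertHelper call is likewise hypothetical):
#
#   q = RedisQueue('chatq', 'alert_bot', 'localhost', 6379, 3)
#   while True:
#       msg = q.get()                # block until an alert arrives
#       AlertHelper(...).send(msg)   # relay the alert to the xmpp chat bot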
def f2(time, rdd):
    # compare the percentage change of the windowed mean against the broadcast threshold
    lt = rdd.collect()
    if lt:
        change = lt[0][0]
        d = Q.value
        print '********** f2'
        print lt[0][0], Threshold.value, lt[0][1]
        print '********** end f2'
        if change > Threshold.value:
            msg = 'Stock alert triggered: %0.6f, mean: %0.2f' % (change, lt[0][1])
            print msg
            # q = RedisQueue(d['alert_bot_q'][1], d['alert_bot_q'][0], d['host'], d['port'], d['db'])
            # q.put(msg)
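# Note: foreachRDD callbacks like f1/f2 run on the driver, and rdd.collect()
# pulls each window's results back to the driver, which is why RedisQueue can
# be constructed locally here instead of being shipped to the executors.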
# To run from the command prompt:
#   0. start the kafka broker
#   1. edit subscription.txt and prepare 2 stocks
#   2. run ib_mds.py
#   3. spark-submit --jars spark-streaming-kafka-assembly_2.10-1.4.1.jar ./alerts/pairs_corr.py vsu-01:2181
#
# http://stackoverflow.com/questions/3425439/why-does-corrcoef-return-a-matrix
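# (background for the link above: numpy.corrcoef(x, y) returns the full 2x2
#  correlation matrix, so the pairwise coefficient is the off-diagonal
#  element [0][1])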
if __name__ == "__main__":
    if len(sys.argv) != 5:
        print("Usage: %s <broker_list ex: vsu-01:2181> <rdd_name> <tick id> <fn name>" % sys.argv[0])
        print("Usage: to gracefully shut down, type echo 1 > /tmp/flag at the terminal")
        sys.exit(-1)

    app_name = "std_deviation_analysis"
    sc = SparkContext(appName=app_name)  # , pyFiles=['./cep/redisQueue.py'])
    ssc = StreamingContext(sc, 2)
    ssc.checkpoint('../checkpoint')
    brokers, qname, id, fn = sys.argv[1:]
    id = int(id)
    #
    # demonstrate how to use accumulator and broadcast variables
    #
    NumProcessed = sc.accumulator(0)
    # previous-day closing price of the Hang Seng index, used as the alert baseline
    cls = float(ystockquote.get_historical_prices('^HSI', '20150930', '20150930')[1][4])
    print 'closing price of HSI %f' % cls
    Q = sc.broadcast({'cls': cls,
                      'rname': 'rname', 'qname': qname, 'namespace': 'mdq',
                      'host': 'localhost', 'port': 6379, 'db': 3,
                      'alert_bot_q': ('alert_bot', 'chatq')})
    Threshold = sc.broadcast(0.020)
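    # Note: NumProcessed is declared but never incremented in this file; for
    # the rdd-count shutdown commented out in do_work() below to fire, each
    # callback would need to bump it on the driver, e.g. (sketch):
    #
    #   def f1(time, rdd):
    #       global NumProcessed
    #       NumProcessed += 1
    #       ...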
    # kvs = KafkaUtils.createDirectStream(ssc, ['ib_tick_price', 'ib_tick_size'], {"metadata.broker.list": brokers})
    # createStream takes the zookeeper quorum (e.g. vsu-01:2181), a consumer
    # group id, and a dict of topic -> number of partitions to consume
    kvs = KafkaUtils.createStream(ssc, brokers, app_name, {'ib_tick_price': 1, 'ib_tick_size': 1})
    lns = kvs.map(lambda x: x[1])
    mdl = lns.map(lambda x: json.loads(x))\
             .filter(lambda x: (x['typeName'] == 'tickPrice' and x['contract'] == "HSI-20151029-0--FUT-HKD-102"))\
             .map(lambda x: (x['contract'], (x['ts'], x['price'])))\
             .groupByKeyAndWindow(12, 10, 1)
    # for each window: (timestamp of the first tick, std of prices, mean of prices)
    s1 = mdl.map(lambda x: (datetime.datetime.fromtimestamp([a[0] for a in x[1]][0]),
                            numpy.std([a[1] for a in x[1]]),
                            numpy.mean([a[1] for a in x[1]])))
    # percentage change of the window mean relative to the broadcast closing price
    s2 = s1.map(lambda x: (abs(x[2] - Q.value['cls']) / Q.value['cls'], x[2]))
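    # For reference, what s1 computes for one window of ticks, in plain numpy
    # (illustrative values, not from the live feed):
    #
    #   >>> ticks = [(1443600000.0, 21000.0), (1443600002.0, 21060.0)]
    #   >>> numpy.std([p for _, p in ticks]), numpy.mean([p for _, p in ticks])
    #   (30.0, 21030.0)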
    # s1 = lines.map(lambda line: json.loads(line)).filter(lambda x: (x['tickerId'] in [1,2] and x['typeName'] == 'tickPrice'))\
    #           .filter(lambda x: (x['field'] == 4))\
    #           .map(lambda x: (x['tickerId'], x['price'])).reduceByKey(lambda x,y: (x+y)/2).groupByKeyAndWindow(30, 20, 1)
    s1.foreachRDD(f1)
    s2.foreachRDD(f2)
    # trades.pprint()
    # trades.foreachRDD(eval(fn))
    def do_work():
        while 1:
            # the program could also stop after processing a fixed number of rdds:
            # if NumProcessed.value == 70:
            #     break
            # here it stops on detecting a 1 in the flag file
            try:
                f = open('/tmp/flag')
                l = f.readlines()
                f.close()
                if l:
                    print 'reading %s' % l[0]
                    if '1' in l[0]:
                        os.remove('/tmp/flag')
                        print 'terminating..........'
                        ssc.stop(True, False)
                        sys.exit()
                time.sleep(2)
            except IOError:
                # flag file not present yet; keep polling
                time.sleep(2)
                continue
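    # ssc.stop(True, False) also stops the underlying SparkContext (first
    # argument) and returns immediately rather than waiting for queued batches
    # to drain (second argument, stopGraceFully=False).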
    t = threading.Thread(target=do_work, args=())
    t.start()
    ssc.start()
    ssc.awaitTermination()