greeks_changes.py

import sys
import os
import time
import datetime
import threading
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from finopt import ystockquote
##
## This example demonstrates the use of accumulators and broadcast
## variables, and how to terminate running Spark streaming jobs.
##
## It also demonstrates how to send alerts via XMPP
## (requires a running Prosody server and RedisQueue).
##
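##
## the pattern in a nutshell (hypothetical names; the real calls are in __main__):
##   conf    = sc.broadcast({'threshold': 0.25})  # read-only copy shipped to workers
##   counter = sc.accumulator(0)                  # workers add(), driver reads .value
##   rdd.foreach(lambda x: counter.add(1))
##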
##
## Insert the path so spark-submit knows where to look
## for modules located in a given directory.
##
## The other method is to export PYTHONPATH before
## calling spark-submit.
##
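##
## e.g. (path shown for illustration):
##   export PYTHONPATH=/home/larry-13.04/workspace/finopt:$PYTHONPATH
##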
# import sys
# sys.path.insert(0, '/home/larry-13.04/workspace/finopt/cep')
print sys.path

# import optcal
import json
import numpy
# from finopt.cep.redisQueue import RedisQueue
from comms.redisQueue import RedisQueue
from comms.alert_bot import AlertHelper
def f1(time, rdd):
    lt = rdd.collect()
    if not lt:
        return
    print '**** f1'
    print lt
    print '**** end f1'
    # append the windowed stats to a flat file
    f = open('/home/larry/l1304/workspace/finopt/data/mds_files/std/std-20151008.txt', 'a')  # % datetime.datetime.now().strftime('%Y%m%d%H%M'), 'a')
    msg = ''.join('%s,%s,%s,%s,%s\n' % (s[0], s[1][0][0].strftime('%Y-%m-%d %H:%M:%S.%f'),
                                        s[1][0][1], s[1][0][2], s[1][1]) for s in lt)
    f.write(msg)
    f.close()
    d = Q.value
    # each rdd tuple has the shape (name, ((time, sd, mean), vol)):
    #   s[0] = name, s[1][0][0] = time, s[1][0][1] = sd,
    #   s[1][0][2] = mean, s[1][1] = vol
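    # e.g. one element, with hypothetical values:
    #   ('HSI-20151029-0-22000-C', ((datetime.datetime(2015, 10, 8, 10, 30), 4.7, 22010), 120000))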
    for s in lt:
        # flag names containing the contract prefix that show a large
        # standard deviation or unusually heavy volume
        if s[0].find('HSI-20151029-0') != -1 and (s[1][0][1] > 4.5 or s[1][1] > 100000):
            msg = 'Unusual trading activity: %s (SD=%0.2f, mean px=%d, vol=%d) at %s\n' \
                  % (s[0],
                     s[1][0][1], s[1][0][2],
                     s[1][1],
                     s[1][0][0].strftime('%m-%d %H:%M:%S'))
            q = RedisQueue(d['alert_bot_q'][1], d['alert_bot_q'][0], d['host'], d['port'], d['db'])
            q.put(msg)
def f2(time, rdd):
    lt = rdd.collect()
    if lt:
        change = lt[0][0]
        d = Q.value
        print '********** f2'
        print lt[0][0], Threshold.value, lt[0][1]
        print '********** end f2'
        if change > Threshold.value:
            msg = 'Stock alert triggered: %0.6f, mean: %0.2f' % (change, lt[0][1])
            print msg
            # q = RedisQueue(d['alert_bot_q'][1], d['alert_bot_q'][0], d['host'], d['port'], d['db'])
            # q.put(msg)
def f3(time, rdd):
    lt = rdd.collect()
    if lt:
        # print '%s %0.2f %0.2f' % (lt[0], lt[1][0], lt[1][1])
        for s in lt:
            print '%s [%s] ' % (s[0], ','.join('(%0.4f, %0.4f)' % (e[0], e[1]) for e in s[1]))
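# e.g. a window holding two ticks for strike 22600.0 prints (hypothetical values):
#   22600.0 [(0.2108, -5.5886),(0.2110, -5.6010)]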
# to run from a command prompt:
# 0. start the kafka broker
# 1. edit subscription.txt and prepare 2 stocks
# 2. run ib_mds.py
# 3. spark-submit --jars spark-streaming-kafka-assembly_2.10-1.4.1.jar ./alerts/pairs_corr.py vsu-01:2181
# http://stackoverflow.com/questions/3425439/why-does-corrcoef-return-a-matrix
#
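# e.g. for this script (hypothetical arguments, matching the usage string below):
#   spark-submit --jars spark-streaming-kafka-assembly_2.10-1.4.1.jar \
#       ./alerts/greeks_changes.py vsu-01:2181 optionAnalytics 0 f3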
if __name__ == "__main__":
    if len(sys.argv) != 5:
        print("Usage: %s <broker_list ex: vsu-01:2181> <rdd_name> <tick id> <fn name>" % sys.argv[0])
        print("Usage: to gracefully shutdown, type echo 1 > /tmp/flag at the terminal")
        sys.exit(-1)

    app_name = "std_deviation_analysis"
    sc = SparkContext(appName=app_name)  # , pyFiles=['./cep/redisQueue.py'])
    ssc = StreamingContext(sc, 2)
    ssc.checkpoint('/home/larry-13.04/workspace/finopt/log/checkpoint')
    brokers, qname, id, fn = sys.argv[1:]
    id = int(id)
    #
    # demonstrate how to use broadcast variables
    #
    NumProcessed = sc.accumulator(0)
    cls = float(ystockquote.get_historical_prices('^HSI', '20151005', '20151005')[1][4])
    print 'closing price of HSI %f' % cls
    Q = sc.broadcast({'cls': cls,
                      'rname': 'rname', 'qname': qname, 'namespace': 'mdq',
                      'host': 'localhost', 'port': 6379, 'db': 3,
                      'alert_bot_q': ('alert_bot', 'chatq')})
    Threshold = sc.broadcast(0.25)
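    # Q and Threshold are shipped once to each executor and read there via
    # .value (see f1/f2); they are read-only on the workers. NumProcessed is
    # the opposite: workers may only add() to it, and only the driver reads it.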
    # kvs = KafkaUtils.createDirectStream(ssc, ['ib_tick_price', 'ib_tick_size'], {"metadata.broker.list": brokers})
    kvs = KafkaUtils.createStream(ssc, brokers, app_name, {'optionAnalytics': 1})
    lns = kvs.map(lambda x: x[1])

    # sample message on the optionAnalytics topic:
    # {"analytics": {"imvol": 0.210757782404, "vega": 3321.50906944, "delta": 0.402751602804,
    #                "theta": -5.58857173887, "npv": 499.993413708, "gamma": 0.00021240629952},
    #  "contract": {"m_conId": 0, "m_right": "C", "m_symbol": "HSI", "m_secType": "OPT",
    #               "m_includeExpired": false, "m_multiplier": 50, "m_expiry": "20160128",
    #               "m_currency": "HKD", "m_exchange": "HKFE", "m_strike": 22600.0},
    #  "tick_values": {"0": 20, "1": 500.0, "2": 510.0, "3": 25, "4": 500.0, "5": 1, "8": 22, "9": 628.0},
    #  "extra": {"spot": 22190.0, "rate": 0.0012, "last_updated": "20151204143050", "div": 0.0328}}
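    # the filter below keys on x['extra']['chain_id'], which is assumed to be
    # present in live messages (it does not appear in the sample above)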
    # QQQ-DEC11, HSI-DEC30
    mdp = lns.map(lambda x: json.loads(x))\
             .filter(lambda x: (x['extra']['chain_id'] == 'QQQ-DEC11'))\
             .map(lambda x: (x['contract']['m_strike'], (x['analytics']['imvol'], x['analytics']['theta'])))\
             .groupByKeyAndWindow(6, 4, 1)
    # .groupByKeyAndWindow(12, 10, 1)
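    # groupByKeyAndWindow(windowDuration, slideDuration, numPartitions): with
    # the 2-second batch interval set above, (6, 4, 1) collects the
    # (imvol, theta) pairs per strike over the last 6 seconds, re-evaluated
    # every 4 seconds, in a single partition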
    # mds = lns.map(lambda x: json.loads(x))\
    #          .filter(lambda x: (x['typeName'] == 'tickSize'))\
    #          .map(lambda x: (x['contract'], x['size']))\
    #          .reduceByKeyAndWindow(lambda x, y: (x + y), None, 12, 10, 1)
    # s1 = mdp.map(lambda x: (x[0], (datetime.datetime.fromtimestamp([a[0] for a in x[1]][0]),
    #                                numpy.std([a[1] for a in x[1]]),
    #                                numpy.mean([a[1] for a in x[1]]))))
    # mds.pprint()
    # sps = s1.join(mds)
    # sps.foreachRDD(f1)
    # mdp.pprint()
    mdp.foreachRDD(f3)
    # trades.foreachRDD(eval(fn))
    def do_work():
        # poll /tmp/flag every 2 seconds; a '1' in the file triggers a shutdown
        while 1:
            # program will stop after processing 70 rdds
            # if NumProcessed.value == 70:
            #     break
            # program will stop on detecting a 1 in the flag file
            try:
                f = open('/tmp/flag')
                l = f.readlines()
                f.close()
                print 'reading %s' % l[0]
                if '1' in l[0]:
                    os.remove('/tmp/flag')
                    print 'terminating..........'
                    # stop the streaming context and the underlying SparkContext
                    # (stopSparkContext=True, stopGraceFully=False)
                    ssc.stop(True, False)
                    sys.exit(0)
            except IOError:
                # flag file not there yet; keep polling
                pass
            time.sleep(2)
    t = threading.Thread(target=do_work, args=())
    t.start()
    ssc.start()
    ssc.awaitTermination()