# port_stream.py
  1. import sys
  2. from pyspark import SparkContext
  3. from pyspark.streaming import StreamingContext
  4. from pyspark.streaming.kafka import KafkaUtils
  5. from numpy import *
  6. import pylab
  7. from scipy import stats
  8. import time, datetime
  9. import threading
  10. import time
  11. import os
  12. #from finopt import ystockquote
  13. from comms.epc import ExternalProcessComm
  14. import ConfigParser
  15. ##
  16. ##
  17. ##
  18. ## This example demonstrates the use of accumulators and broadcast
  19. ## and how to terminate spark running jobs
  20. ##
  21. ## it also demonstrates how to send alerts via xmpp
  22. ## (requires prosody server running and redisQueue)
  23. ##
  24. ##
  25. ##
  26. ##
  27. ## insert the path so spark-submit knows where
  28. ## to look for a file located in a given directory
  29. ##
  30. ## the other method is to export PYTHONPATH before
  31. ## calling spark-submit
  32. ##
  33. # import sys
  34. # sys.path.insert(0, '/home/larry-13.04/workspace/finopt/cep')
  35. print sys.path
  36. #import optcal
  37. import json
  38. import numpy
  39. from comms.redisQueue import RedisQueue
  40. from comms.alert_bot import AlertHelper
  41. def f1(time, rdd):
  42. lt = rdd.collect()
  43. if not lt:
  44. return
  45. print '**** f1'
  46. print lt
  47. print '**** end f1'
  48. f = open('/home/larry/l1304/workspace/finopt/data/mds_files/std/std-20151007.txt', 'a') # % datetime.datetime.now().strftime('%Y%m%d%H%M'), 'a')
  49. msg = ''.join('%s,%s,%s,%s,%s\n'%(s[0], s[1][0][0].strftime('%Y-%m-%d %H:%M:%S.%f'),s[1][0][1],s[1][0][2], s[1][1]) for s in lt)
  50. f.write(msg)
  51. d = Param.value
  52. # return rdd tuple (-,((-,-),-)): name = 0--, time 100, sd 101, mean 102, vol 11-
  53. for s in lt:
  54. if s[0].find('HSI-20151029-0') > 0 and (s[1][0][1] > 4.5 or s[1][1] > 100000):
  55. msg = 'Unusal trading activity: %s (SD=%0.2f, mean px=%d, vol=%d) at %s\n'\
  56. % (s[0], \
  57. s[1][0][1], s[1][0][2],\
  58. s[1][1],\
  59. s[1][0][0].strftime('%m-%d %H:%M:%S'))
  60. q = RedisQueue(d['alert_bot_q'][1], d['alert_bot_q'][0], d['host'], d['port'], d['db'])
  61. q.put(msg)
  62. def f2(time, rdd):
  63. lt = rdd.collect()
  64. if lt:
  65. param = Param.value
  66. print '********** f2'
  67. print '********** end f2'
  68. # if change > Threshold.value:
  69. # msg = 'Stock alert triggered: %0.6f, mean: %0.2f' % (change, lt[0][1])
  70. # print msg
  71. # q = RedisQueue(d['alert_bot_q'][1], d['alert_bot_q'][0], d['host'], d['port'], d['db'])
  72. # q.put(msg)
# To run from the command prompt:
#   0. start the Kafka broker
#   1. edit subscription.txt and prepare 2 stocks
#   2. run ib_mds.py
#   3. spark-submit --jars spark-streaming-kafka-assembly_2.10-1.4.1.jar ./alerts/pairs_corr.py vsu-01:2181
# http://stackoverflow.com/questions/3425439/why-does-corrcoef-return-a-matrix
#
  80. if __name__ == "__main__":
  81. # if len(sys.argv) != 5:
  82. # print("Usage: %s <broker_list ex: vsu-01:2181> " % sys.argv[0])
  83. # print("Usage: to gracefully shutdown type echo 1 > /tmp/flag at the terminal")
  84. # exit(-1)
  85. if len(sys.argv) != 5:
  86. print("Usage: %s <config file>" % sys.argv[0])
  87. exit(-1)
  88. cfg_path= sys.argv[1:]
  89. config = ConfigParser.ConfigParser()
  90. param = {}
  91. param['broker'] = 'vsu-1:2181'
  92. param['app_name'] = "portfolio.stream"
  93. param['stream_interval'] = 5
  94. param['win_interval'] = 20
  95. param['slide_interval'] = 10
  96. param['checkpoint_path'] = '/home/larry-13.04/workspace/finopt/log/checkpoint'
  97. param['ib_contracts'] = ['HSI-20151029-0--FUT-HKD-102', 'HSI-20151127-0--FUT-HKD-102']
  98. param['port_thresholds'] = {'delta_all': [-20.0, 20.0, -0.5, 0.75], 'delta_c': [-20.0, 20.0],
  99. 'delta_p': [-20.0, 20.0],
  100. 'theta_all': [2000.0, 7000.0], 'theta_c': [-20.0, 20.0],
  101. 'theta_p': [-20.0, 20.0], 'unreal_pl': [-3]
  102. }
  103. param['rhost'] = config.get("redis", "redis.server").strip('"').strip("'")
  104. param['rport'] = config.get("redis", "redis.port")
  105. param['rdb'] = config.get("redis", "redis.db")
  106. param['chatq'] = config.get("alert_bot", "msg_bot.redis_mq").strip('"').strip("'")
  107. param['prefix'] = config.get("alert_bot", "msg_bot.redis_prefix").strip('"').strip("'")
  108. sc = SparkContext(appName= param['app_name']) #, pyFiles = ['./cep/redisQueue.py'])
  109. ssc = StreamingContext(sc, param['stream_interval'])
  110. ssc.checkpoint(param['checkpoint_path'])
  111. brokers, qname, id, fn = sys.argv[1:]
  112. id = int(id)
  113. Param = sc.broadcast(param)
  114. # { \
  115. # 'rname': 'rname', 'qname': qname, 'namespace': 'mdq', 'host': 'localhost', 'port':6379, 'db': 3, 'alert_bot_q': ('alert_bot', 'chatq')})
  116. # listen to portfolio updates sent by options_data.py
  117. # via the EPC wrapper
  118. port_st = KafkaUtils.createStream(ssc, param['broker'], param['app_name'], \
  119. {v:1 for k,v in ExternalProcessComm.EPC_TOPICS.iteritems()})
  120. # load the message into a dict
  121. jport_st = port_st.map(lambda x: x[1]).map(lambda x: json.loads(x))
  122. # split to new streams according to message types
  123. pt_st = jport_st.filter(lambda x: x[1] == 'port_summary')\
  124. .window(param['win_interval'],param['slide_interval'])
  125. pl_st = jport_st.filter(lambda x: x[1] == 'port_item')\
  126. .window(param['win_interval'],param['slide_interval'])
  127. # listen to ib_mds price updates
  128. ib_st = KafkaUtils.createStream(ssc, brokers, param['app_name'], {'ib_tick_price':1, 'ib_tick_size':1})\
  129. .filter(lambda x: (x['typeName'] == 'tickPrice'))\
  130. .map(lambda x: (x['contract'], (x['ts'], x['price']) ))\
  131. .groupByKeyAndWindow(param['win_interval'],param['slide_interval'], 1)
  132. #lns.pprint()
  133. pt_st.foreachRDD(f2)
  134. #
  135. # sps.pprint()
  136. #trades.foreachRDD(eval(fn))
  137. def do_work():
  138. while 1:
  139. # program will stop on detecting a 1 in the flag file
  140. try:
  141. f = open('/tmp/flag')
  142. l = f.readlines()
  143. print 'reading %s' % l[0]
  144. if '1' in l[0]:
  145. os.remove('/tmp/flag')
  146. print 'terminating..........'
  147. ssc.stop(True, False)
  148. sys.exit(0)
  149. f.close()
  150. time.sleep(2)
  151. except IOError:
  152. continue
  153. t = threading.Thread(target = do_work, args=())
  154. t.start()
  155. ssc.start()
  156. ssc.awaitTermination()