port_stream.py

import sys
import os
import time
import datetime
import threading
import logging
import ConfigParser
import numpy
import pylab
from scipy import stats
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
#from finopt import ystockquote
from comms.epc import ExternalProcessComm
##
##
## This example demonstrates the use of accumulators and broadcast
## variables, and how to terminate a running Spark streaming job.
##
## It also demonstrates how to send alerts via xmpp
## (requires a prosody server running and redisQueue).
##
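## A minimal sketch of those two constructs (the names below are
## illustrative, not part of this script):
##
##   counter = sc.accumulator(0)               # driver-owned counter
##   rdd.foreach(lambda x: counter.add(1))     # workers accumulate into it
##   lookup = sc.broadcast({'HSI': 'Hang Seng Index'})
##   rdd.map(lambda x: lookup.value.get(x))    # workers read via .value, as f1 does
##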
## Insert the path below so that spark-submit knows where
## to look for a file located in a given directory.
##
## The other method is to export PYTHONPATH before
## calling spark-submit:
##
# import sys
# sys.path.insert(0, '/home/larry-13.04/workspace/finopt/cep')
print sys.path
#import optcal
import json
from comms.redisQueue import RedisQueue
from comms.alert_bot import AlertHelper

def f1(time, rdd):
    lt = rdd.collect()
    if not lt:
        return
    print '**** f1'
    print lt
    print '**** end f1'
    msg = ''.join('%s,%s,%s,%s,%s\n' % (s[0], s[1][0][0].strftime('%Y-%m-%d %H:%M:%S.%f'),
                                        s[1][0][1], s[1][0][2], s[1][1]) for s in lt)
    # a timestamped filename was used at one point:
    # 'std-%s.txt' % datetime.datetime.now().strftime('%Y%m%d%H%M')
    with open('/home/larry/l1304/workspace/finopt/data/mds_files/std/std-20151007.txt', 'a') as f:
        f.write(msg)
    d = Param.value
    # each rdd element s is a tuple (name, ((time, sd, mean), vol)):
    # s[0] name, s[1][0][0] time, s[1][0][1] sd, s[1][0][2] mean, s[1][1] vol
    for s in lt:
        # find() returns 0 for a match at the start of the string,
        # so compare against -1 rather than using > 0
        if s[0].find('HSI-20151029-0') != -1 and (s[1][0][1] > 4.5 or s[1][1] > 100000):
            msg = 'Unusual trading activity: %s (SD=%0.2f, mean px=%d, vol=%d) at %s\n' \
                  % (s[0],
                     s[1][0][1], s[1][0][2],
                     s[1][1],
                     s[1][0][0].strftime('%m-%d %H:%M:%S'))
            # the broadcast dict built in __main__ carries the redis settings
            # under 'chatq'/'prefix'/'rhost'/'rport'/'rdb'; the argument order
            # (qname, namespace, host, port, db) is assumed from the old call
            # RedisQueue(d['alert_bot_q'][1], d['alert_bot_q'][0], d['host'], d['port'], d['db'])
            q = RedisQueue(d['chatq'], d['prefix'], d['rhost'], d['rport'], d['rdb'])
            q.put(msg)
def f2(time, rdd):
    lt = rdd.collect()
    if lt:
        param = Param.value
        print '********** f2'
        print '********** end f2'
        # if change > Threshold.value:
        #     msg = 'Stock alert triggered: %0.6f, mean: %0.2f' % (change, lt[0][1])
        #     print msg
        #     q = RedisQueue(d['alert_bot_q'][1], d['alert_bot_q'][0], d['host'], d['port'], d['db'])
        #     q.put(msg)
# to run from the command prompt:
# 0. start the kafka broker
# 1. edit subscription.txt and prepare 2 stocks
# 2. run ib_mds.py
# 3. spark-submit --jars spark-streaming-kafka-assembly_2.10-1.4.1.jar ./alerts/pairs_corr.py vsu-01:2181
#
# why corrcoef returns a matrix:
# http://stackoverflow.com/questions/3425439/why-does-corrcoef-return-a-matrix
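#
# a minimal illustration (assumed data, not used in this script):
#   >>> m = numpy.corrcoef([1, 2, 3], [2, 4, 6])
#   >>> m.shape      # (2, 2): the full correlation matrix
#   >>> m[0][1]      # 1.0 here: the single pairwise coefficient
#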
if __name__ == "__main__":

    # argument names past the config file follow the unpack further
    # down (brokers, qname, id, fn = sys.argv[1:])
    if len(sys.argv) != 5:
        print("Usage: %s <config file> <qname> <id> <fn>" % sys.argv[0])
        print("Usage: to gracefully shutdown type echo 1 > /tmp/flag at the terminal")
        sys.exit(-1)

    cfg_path = sys.argv[1]
    config = ConfigParser.SafeConfigParser()
    if len(config.read(cfg_path)) == 0:
        raise ValueError("Failed to open config file")
    logconfig = eval(config.get("cep", "cep.logconfig").strip('"').strip("'"))
    logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
    logging.basicConfig(**logconfig)

    param = {}
    param['broker'] = 'vsu-01:2181'
    param['app_name'] = "portfolio.stream"
    param['stream_interval'] = 5
    param['win_interval'] = 20
    param['slide_interval'] = 10
    param['checkpoint_path'] = '/home/larry-13.04/workspace/finopt/log/checkpoint'
    param['ib_contracts'] = ['HSI-20151029-0--FUT-HKD-102', 'HSI-20151127-0--FUT-HKD-102']
    param['port_thresholds'] = {'delta_all': [-20.0, 20.0, -0.5, 0.75],
                                'delta_c': [-20.0, 20.0],
                                'delta_p': [-20.0, 20.0],
                                'theta_all': [2000.0, 7000.0],
                                'theta_c': [-20.0, 20.0],
                                'theta_p': [-20.0, 20.0],
                                'unreal_pl': [-3]}
    param['rhost'] = config.get("redis", "redis.server").strip('"').strip("'")
    param['rport'] = config.get("redis", "redis.port")
    param['rdb'] = config.get("redis", "redis.db")
    param['chatq'] = config.get("alert_bot", "msg_bot.redis_mq").strip('"').strip("'")
    param['prefix'] = config.get("alert_bot", "msg_bot.redis_prefix").strip('"').strip("'")

    sc = SparkContext(appName=param['app_name'])  # , pyFiles=['./cep/redisQueue.py'])
    ssc = StreamingContext(sc, param['stream_interval'])
    ssc.checkpoint(param['checkpoint_path'])

    # brokers is unused below: the zookeeper quorum comes from the
    # hardcoded param['broker'] above instead
    brokers, qname, id, fn = sys.argv[1:]
    id = int(id)

    # ship the parameter dict to the workers; f1/f2 read it back as Param.value
    Param = sc.broadcast(param)
    # the broadcast previously carried:
    # { 'rname': 'rname', 'qname': qname, 'namespace': 'mdq', 'host': 'localhost',
    #   'port': 6379, 'db': 3, 'alert_bot_q': ('alert_bot', 'chatq') }
    # listen to portfolio updates sent by options_data.py
    # via the EPC wrapper
    port_st = KafkaUtils.createStream(ssc, param['broker'], param['app_name'],
                                      {v: 1 for k, v in ExternalProcessComm.EPC_TOPICS.iteritems()})
    # load each message payload into a dict
    jport_st = port_st.map(lambda x: x[1]).map(lambda x: json.loads(x))
    # split into new streams according to message type
    pt_st = jport_st.filter(lambda x: x[1] == 'port_summary')\
                    .window(param['win_interval'], param['slide_interval'])
    pl_st = jport_st.filter(lambda x: x[1] == 'port_item')\
                    .window(param['win_interval'], param['slide_interval'])
    acct_st = jport_st.filter(lambda x: x[1] == 'acct_summary')\
                      .window(param['win_interval'], param['slide_interval'])
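    # with the settings above, each windowed stream is re-evaluated every
    # slide_interval (10s) over the last win_interval (20s) of data, i.e.
    # each trigger covers roughly the last four 5-second batches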
    # listen to ib_mds price updates
    # ib_st = KafkaUtils.createStream(ssc, brokers, param['app_name'], {'ib_tick_price': 1, 'ib_tick_size': 1})\
    #         .filter(lambda x: (x['typeName'] == 'tickPrice'))\
    #         .map(lambda x: (x['contract'], (x['ts'], x['price'])))\
    #         .groupByKeyAndWindow(param['win_interval'], param['slide_interval'], 1)
    # lns.pprint()
    acct_st.foreachRDD(f2)
    # sps.pprint()
    # trades.foreachRDD(eval(fn))
    def do_work():
        # poll the flag file; the program stops on detecting a 1 in it
        while True:
            try:
                with open('/tmp/flag') as f:
                    l = f.readlines()
                # guard against an empty file before touching l[0]
                if l and '1' in l[0]:
                    print 'reading %s' % l[0]
                    os.remove('/tmp/flag')
                    print 'terminating..........'
                    # stop both the streaming context and the SparkContext
                    # (first flag) without a graceful drain (second flag)
                    ssc.stop(True, False)
                    sys.exit(0)
            except IOError:
                # flag file absent: fall through to the sleep below
                # instead of spinning in a tight loop
                pass
            time.sleep(2)

    t = threading.Thread(target=do_work, args=())
    t.start()
    ssc.start()
    ssc.awaitTermination()
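
    # to trigger the shutdown path in do_work from another terminal
    # (per the usage notes above):
    #   echo 1 > /tmp/flag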