t1.py

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from numpy import *
import pylab
from scipy import stats
import json
import numpy
import time, datetime
from os import listdir
from os.path import isfile, join
from optparse import OptionParser
import sys

# sample input files (unused below; the input directory is taken from the command line)
path = '/home/larry/l1304/workspace/finopt/data/mds_files/large_up_1002/ibkdump-20151002105314.txt'
path = '/home/larry/l1304/workspace/finopt/data/mds_files/large_up_1002/ibkdump-20151002110412.txt'

#
# demo: how to compute a standard deviation in an RDD
def process_msg(file):
    # std dev of tick prices for a single contract in one dump file
    try:
        md = sc.textFile(file)
        mdl = md.map(lambda lines: lines.split('|')).map(lambda x: json.loads(x[1]))\
                .filter(lambda x: x['typeName'] == 'tickPrice' and x['contract'] == "HSI-20151029-0--FUT-HKD-102")\
                .map(lambda x: x['price'])
        sd = numpy.std(mdl.collect())
        #print file[len(file)-20:], sd, mdl.count()
        return (sd, mdl.count())
    except Exception:
        return None
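
# Sketch (not part of the original): the same figures can be computed without
# collecting every price to the driver, since a pyspark RDD of numbers exposes
# stats()/stdev()/mean()/count().  process_msg_distributed is a hypothetical
# variant; it assumes the same '|'-delimited JSON record layout used above.
def process_msg_distributed(file, contract="HSI-20151029-0--FUT-HKD-102"):
    try:
        prices = sc.textFile(file)\
                   .map(lambda line: json.loads(line.split('|')[1]))\
                   .filter(lambda x: x['typeName'] == 'tickPrice' and x['contract'] == contract)\
                   .map(lambda x: x['price'])
        st = prices.stats()    # StatCounter: count, mean and stdev in one pass
        return (st.stdev(), st.count())
    except Exception:
        return None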

def process_msg_by_key(file):
    # per-contract total traded size joined with (first timestamp, std dev, mean) of prices
    try:
        md = sc.textFile(file)
        print file
        mdl = md.map(lambda lines: lines.split('|')).map(lambda x: json.loads(x[1]))
        #mdp = mdl.filter(lambda x: (x['typeName'] == 'tickPrice' and x['contract'] in ["HSI-20151029-0--FUT-HKD-102"]))\
        mdp = mdl.filter(lambda x: x['typeName'] == 'tickPrice')\
                 .map(lambda x: (x['contract'], (x['ts'], x['price']))).groupByKey()
        #mds = mdl.filter(lambda x: (x['typeName'] == 'tickSize' and x['contract'] in ["HSI-20151029-0--FUT-HKD-102"]))\
        mds = mdl.filter(lambda x: x['typeName'] == 'tickSize')\
                 .map(lambda x: (x['contract'], x['size']))\
                 .reduceByKey(lambda x, y: x + y)
        sdp = mdp.map(lambda x: (x[0],
                                 (datetime.datetime.fromtimestamp([a[0] for a in x[1]][0]),
                                  numpy.std([a[1] for a in x[1]]),
                                  numpy.mean([a[1] for a in x[1]]))))
        # sds = mds.map(lambda x: (x[0],
        #                          (datetime.datetime.fromtimestamp([a[0] for a in x[1]][0]),
        #                           numpy.std([a[1] for a in x[1]]),
        #                           numpy.mean([a[1] for a in x[1]]))))
        #sdsp = sdp.cogroup(sds)
        sdsp = mds.join(sdp)    # (contract, (total_size, (first_ts, std, mean)))
        print sdsp.take(2)
        elems = sdsp.collect()
        with open('/home/larry/l1304/workspace/finopt/data/mds_files/std/sd-rdd.txt', 'a') as f:
            for e in elems:
                s = '%s,%s,[%s %0.2f %0.2f]' % (e[0], e[1][0], e[1][1][0], e[1][1][1], e[1][1][2])
                print s
                f.write(s + '\n')
        return sdsp
    except Exception:
        return None
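
# Sketch (not part of the original): groupByKey() ships every (ts, price) pair
# for a contract to a single executor.  The hypothetical per_contract_stats
# helper below computes the same per-contract mean and population std dev from
# running (count, sum, sum of squares) totals via aggregateByKey, avoiding the
# full groups.  It assumes the same parsed-record RDD ('mdl') as above.
def per_contract_stats(mdl):
    def seq_op(acc, price):
        n, s, sq = acc
        return (n + 1, s + price, sq + price * price)
    def comb_op(a, b):
        return (a[0] + b[0], a[1] + b[1], a[2] + b[2])
    sums = mdl.filter(lambda x: x['typeName'] == 'tickPrice')\
              .map(lambda x: (x['contract'], x['price']))\
              .aggregateByKey((0, 0.0, 0.0), seq_op, comb_op)
    # population variance = E[x^2] - (E[x])^2; clamp at 0 to absorb rounding error
    return sums.mapValues(lambda acc: (acc[1] / acc[0],
                                       numpy.sqrt(max(acc[2] / acc[0] - (acc[1] / acc[0]) ** 2, 0.0))))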

def process_port(file):
    # incomplete in the original: intended to process portfolio records,
    # but the filter predicate was never filled in
    try:
        md = sc.textFile(file)
        print file
        mdl = md.map(lambda x: json.loads(x))  # TODO: add the intended filter predicate
        return mdl
    except Exception:
        return None

if __name__ == '__main__':
    parser = OptionParser()
    # parser.add_option("-r", "--replay",
    #                   dest="replay_dir",
    #                   help="replay recorded mds files stored in the specified directory")
    options, arguments = parser.parse_args()
    #print options, arguments
    if len(sys.argv) < 2:
        print("Usage: %s [options] <dir>" % sys.argv[0])
        exit(-1)
    sc = SparkContext(appName='t1')
    #dir_loc = '/home/larry/l1304/workspace/finopt/data/mds_files/large_up_1002'
    dir_loc = arguments[0]
    files = sorted([join(dir_loc, f) for f in listdir(dir_loc) if isfile(join(dir_loc, f))])
    a = [process_msg_by_key(f) for f in files]
    #print a
    #print ''.join('%s,%s,%s\n' % (aa[0], aa[1], aa[2]) if aa != None else '' for aa in a)
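
# Example invocation (the directory matches the commented-out dir_loc above):
#   spark-submit t1.py /home/larry/l1304/workspace/finopt/data/mds_files/large_up_1002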