t1.py

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from numpy import *
import pylab
from scipy import stats
import json
import numpy
import time, datetime
from os import listdir
from os.path import isfile, join
from optparse import OptionParser
import sys
# sample dump files; the second assignment overrides the first and
# `path` is not referenced anywhere below
path = '/home/larry/l1304/workspace/finopt/data/mds_files/large_up_1002/ibkdump-20151002105314.txt'
path = '/home/larry/l1304/workspace/finopt/data/mds_files/large_up_1002/ibkdump-20151002110412.txt'
#
# demo how to compute standard deviation in an RDD
def process_msg(file):
    # Compute the standard deviation of tick prices for one hard-coded HSI
    # future contract in a single dump file. Each line has the layout
    # "<prefix>|<json>"; only tickPrice messages for that contract are kept.
    try:
        md = sc.textFile(file)
        mdl = md.map(lambda lines: (lines.split('|'))).map(lambda x: json.loads(x[1]))\
                .filter(lambda x: (x['typeName'] == 'tickPrice' and x['contract'] == "HSI-20151029-0--FUT-HKD-102"))\
                .map(lambda x: x['price'])
        sd = numpy.std(mdl.collect())
        #print file[len(file)-20:], sd, mdl.count()
        return (sd, mdl.count())
    except:
        return None
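# Illustration sketch (hypothetical helper, not used by the script): the same
# std/count computation as process_msg, but on a tiny in-memory RDD instead of
# a dump file. The sample prices, timestamp and the 'x|' prefix are made up;
# records are assumed to follow the "<prefix>|<json>" layout parsed above.
def _demo_std_on_rdd(spark_ctx):
    sample = ['x|' + json.dumps({'typeName': 'tickPrice',
                                 'contract': 'HSI-20151029-0--FUT-HKD-102',
                                 'ts': 1443754394.0,
                                 'price': p})
              for p in (21000.0, 21005.0, 20995.0)]
    prices = spark_ctx.parallelize(sample)\
                      .map(lambda line: json.loads(line.split('|')[1]))\
                      .filter(lambda x: x['typeName'] == 'tickPrice')\
                      .map(lambda x: x['price'])
    return (numpy.std(prices.collect()), prices.count())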
def process_msg_by_key(file):
    # Group tick messages in one dump file by contract and summarise them:
    # for each contract, take the timestamp of its first tick together with
    # the standard deviation and mean of the tick prices (and, separately,
    # of the tick sizes), then cogroup the two summaries and print them.
    try:
        md = sc.textFile(file)
        print file
        mdl = md.map(lambda lines: (lines.split('|'))).map(lambda x: json.loads(x[1]))
        #mdp = mdl.filter(lambda x: (x['typeName'] == 'tickPrice' and x['contract'] in ["HSI-20151029-0--FUT-HKD-102"]))
        mdp = mdl.filter(lambda x: (x['typeName'] == 'tickPrice'))\
                 .map(lambda x: (x['contract'], (x['ts'], x['price']))).groupByKey()
        #mds = mdl.filter(lambda x: (x['typeName'] == 'tickSize' and x['contract'] in ["HSI-20151029-0--FUT-HKD-102"]))
        mds = mdl.filter(lambda x: (x['typeName'] == 'tickSize'))\
                 .map(lambda x: (x['contract'], (x['ts'], x['size']))).groupByKey()
        # per-contract summary of prices: (first-tick time, std, mean)
        sdp = mdp.map(lambda x: (x[0],
                                 (datetime.datetime.fromtimestamp([a[0] for a in x[1]][0]),
                                  numpy.std([a[1] for a in x[1]]),
                                  numpy.mean([a[1] for a in x[1]]))))
        # per-contract summary of sizes: (first-tick time, std, mean)
        sds = mds.map(lambda x: (x[0],
                                 (datetime.datetime.fromtimestamp([a[0] for a in x[1]][0]),
                                  numpy.std([a[1] for a in x[1]]),
                                  numpy.mean([a[1] for a in x[1]]))))
        #print sds.take(2)
        sdsp = sdp.cogroup(sds)
        elems = sdsp.collect()
        for e in elems:
            print '%s %s %s' % (e[0],
                                ''.join('[%s %0.2f %0.2f]' % (p[0], p[1], p[2]) for p in e[1][0]),
                                ''.join('[%s %0.2f %0.2f]' % (p[0], p[1], p[2]) for p in e[1][1]))
        return sdsp
    except:
        return None
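# Illustration sketch (hypothetical helper, not used by the script): how the
# cogroup() above pairs the per-contract price and size summaries. Keys and
# values here are synthetic; each side of the cogroup result is an iterable of
# the matching values, which is what the print loop in process_msg_by_key
# formats as e[1][0] and e[1][1].
def _demo_cogroup(spark_ctx):
    left = spark_ctx.parallelize([('HSI', ('t0', 1.0, 2.0))])
    right = spark_ctx.parallelize([('HSI', ('t0', 10.0, 20.0))])
    for key, (lvals, rvals) in left.cogroup(right).collect():
        print key, list(lvals), list(rvals)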
if __name__ == '__main__':
    parser = OptionParser()
    # parser.add_option("-r", "--replay",
    #                   dest="replay_dir",
    #                   help="replay recorded mds files stored in the specified directory")
    options, arguments = parser.parse_args()
    #print options, arguments
    if len(sys.argv) < 2:
        print("Usage: %s [options] <dir>" % sys.argv[0])
        sys.exit(-1)
    sc = SparkContext(appName='t1')
    #dir_loc = '/home/larry/l1304/workspace/finopt/data/mds_files/large_up_1002'
    dir_loc = arguments[0]
    # summarise every dump file in the directory, in name (timestamp) order
    files = sorted([join(dir_loc, f) for f in listdir(dir_loc) if isfile(join(dir_loc, f))])
    a = [process_msg_by_key(f) for f in files]
    #print a
    #print ''.join('%s,%s,%s\n' % (aa[0], aa[1], aa[2]) if aa is not None else '' for aa in a)
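# Assumed usage (the exact invocation is not given in the file): submit the
# script to Spark with the dump directory as the positional argument, e.g.
#   spark-submit t1.py /path/to/mds_files/large_up_1002
# Output goes to stdout via the print loop in process_msg_by_key; nothing is
# written back to disk.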