t1.py

from pyspark import SparkContext
import json
import numpy
import datetime
from os import listdir
from os.path import isfile, join

# Example market-data dump files (not referenced below -- the __main__ block
# scans dir_loc for files instead; the second assignment overrides the first).
#path = '/home/larry/l1304/workspace/finopt/data/mds_files/large_up_1002/ibkdump-20151002105314.txt'
path = '/home/larry/l1304/workspace/finopt/data/mds_files/large_up_1002/ibkdump-20151002110412.txt'
#
# Demo: compute the standard deviation of tick prices for a single contract
# held in an RDD. Each dump line is split on '|' and its second field is
# parsed as a JSON tick message (typeName, contract, price, ...).
def process_msg(file):
    try:
        md = sc.textFile(file)
        mdl = md.map(lambda lines: lines.split('|')) \
                .map(lambda x: json.loads(x[1])) \
                .filter(lambda x: x['typeName'] == 'tickPrice'
                                  and x['contract'] == 'HSI-20151029-0--FUT-HKD-102') \
                .map(lambda x: x['price'])
        sd = numpy.std(mdl.collect())
        #print file[len(file)-20:], sd, mdl.count()
        return (sd, mdl.count())
    except Exception:
        # skip files that cannot be read or parsed
        return None
# Per-contract version: group tickPrice and tickSize messages by contract,
# compute (first timestamp, std dev, mean) for each group, then cogroup the
# price and size statistics by contract.
def process_msg_by_key(file):
    try:
        md = sc.textFile(file)
        print(file)
        # split each line on '|' and parse the second field as JSON
        mdl = md.map(lambda lines: lines.split('|')).map(lambda x: json.loads(x[1]))

        # tickPrice messages: (contract, (ts, price)) grouped by contract
        #mdp = mdl.filter(lambda x: x['typeName'] == 'tickPrice' and x['contract'] in ["HSI-20151029-0--FUT-HKD-102"])
        mdp = mdl.filter(lambda x: x['typeName'] == 'tickPrice') \
                 .map(lambda x: (x['contract'], (x['ts'], x['price']))).groupByKey()

        # tickSize messages: (contract, (ts, size)) grouped by contract
        #mds = mdl.filter(lambda x: x['typeName'] == 'tickSize' and x['contract'] in ["HSI-20151029-0--FUT-HKD-102"])
        mds = mdl.filter(lambda x: x['typeName'] == 'tickSize') \
                 .map(lambda x: (x['contract'], (x['ts'], x['size']))).groupByKey()

        # (contract, (timestamp of first tick, std dev of prices, mean price))
        sdp = mdp.map(lambda x: (x[0],
                                 (datetime.datetime.fromtimestamp([a[0] for a in x[1]][0]),
                                  numpy.std([a[1] for a in x[1]]),
                                  numpy.mean([a[1] for a in x[1]]))))

        # (contract, (timestamp of first tick, std dev of sizes, mean size))
        sds = mds.map(lambda x: (x[0],
                                 (datetime.datetime.fromtimestamp([a[0] for a in x[1]][0]),
                                  numpy.std([a[1] for a in x[1]]),
                                  numpy.mean([a[1] for a in x[1]]))))

        #print sds.take(2)
        sdsp = sdp.cogroup(sds)
        for e in sdsp.collect():
            print('%s %s %s' % (e[0],
                                ''.join('[%s %0.2f %0.2f]' % (p[0], p[1], p[2]) for p in e[1][0]),
                                ''.join('[%s %0.2f %0.2f]' % (p[0], p[1], p[2]) for p in e[1][1])))
        return sdsp
    except Exception:
        # skip files that cannot be read or parsed
        return None
if __name__ == '__main__':
    sc = SparkContext(appName='t1')
    #dir_loc = '/home/larry/l1304/workspace/finopt/data/mds_files/large_up_1002'
    dir_loc = '/home/larry/l1304/workspace/finopt/data/mds_files'
    # process every dump file found in dir_loc, in sorted order
    files = sorted([join(dir_loc, f) for f in listdir(dir_loc) if isfile(join(dir_loc, f))])
    a = [process_msg_by_key(f) for f in files]
    #print a
    #print ''.join('%s,%s,%s\n' % (aa[0], aa[1], aa[2]) if aa is not None else '' for aa in a)
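
For reference, a minimal self-contained sketch of the per-contract standard-deviation technique used in process_msg_by_key, run against a small in-memory RDD instead of the dump files. The contract names, prices, and app name below are made-up illustrative values, not data from the original dumps.

from pyspark import SparkContext
import numpy

if __name__ == '__main__':
    sc = SparkContext(appName='stddev_sketch')

    # (contract, price) pairs -- hypothetical sample ticks
    ticks = sc.parallelize([
        ('HSI-FUT-A', 22100.0), ('HSI-FUT-A', 22110.0), ('HSI-FUT-A', 22090.0),
        ('HSI-FUT-B', 9950.0), ('HSI-FUT-B', 9960.0),
    ])

    # group prices by contract, then compute the std dev and mean of each
    # group, mirroring the groupByKey() + numpy.std/numpy.mean pattern above
    stats = ticks.groupByKey() \
                 .mapValues(lambda prices: (numpy.std(list(prices)),
                                            numpy.mean(list(prices)))) \
                 .collect()

    for contract, (sd, mean) in stats:
        print('%s std=%0.2f mean=%0.2f' % (contract, sd, mean))

    sc.stop()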