ib_mds.py 13 KB


  1. # -*- coding: utf-8 -*-
  2. import sys, traceback
  3. import json
  4. import logging
  5. import ConfigParser
  6. from ib.ext.Contract import Contract
  7. from ib.opt import ibConnection, message
  8. from time import sleep
  9. import time, datetime
  10. from os import listdir
  11. from os.path import isfile, join
  12. from threading import Lock
  13. from comms.ib_heartbeat import IbHeartBeat
  14. import threading, urllib2
  15. from optparse import OptionParser
  16. #from options_data import ContractHelper
  17. import finopt.options_data as options_data
  18. from kafka.client import KafkaClient
  19. from kafka.producer import SimpleProducer
  20. from comms.alert_bot import AlertHelper
  21. ##
  22. ## to run, start kafka server on vsu-01 <administrator, password>
  23. ## start ibgw or tws
  24. ## edit subscript list
  25. ## check the settings in the console
  26. ##
  27. class IbKafkaProducer():
  28. config = None
  29. con = None
  30. quit = False
  31. producer = None
  32. IB_TICK_PRICE = None
  33. IB_TICK_SIZE = None
  34. toggle = False
  35. persist = {}
  36. ibh = None
  37. tlock = None
  38. ib_conn_status = None
  39. id2contract = {'conId': 1, 'id2contracts': {} }
  40. def __init__(self, config, replay = False):
  41. self.config = config
  42. self.tlock = Lock()
  43. # host = self.config.get("ib_mds", "ib_mds.gateway").strip('"').strip("'")
  44. # port = int(self.config.get("ib_mds", "ib_mds.ib_port"))
  45. # appid = int(self.config.get("ib_mds", "ib_mds.appid.id"))
  46. kafka_host = self.config.get("cep", "kafka.host").strip('"').strip("'")
  47. self.persist['is_persist'] = self.config.get("ib_mds", "ib_mds.is_persist")
  48. self.persist['persist_dir'] =self.config.get("ib_mds", "ib_mds.persist_dir").strip('"').strip("'")
  49. self.persist['file_exist'] = False
  50. self.persist['spill_over_limit'] = int(self.config.get("ib_mds", "ib_mds.spill_over_limit"))
  51. IbKafkaProducer.IB_TICK_PRICE = self.config.get("cep", "kafka.ib.topic.tick_price").strip('"').strip("'")
  52. IbKafkaProducer.IB_TICK_SIZE = self.config.get("cep", "kafka.ib.topic.tick_size").strip('"').strip("'")
  53. # self.con = ibConnection(host, port, appid)
  54. # self.con.registerAll(self.on_ib_message)
  55. # rc = self.con.connect()
  56. # if rc:
  57. # self.ib_conn_status = 'OK'
  58. # logging.info('IbKafkaProducer: connection status to IB is %d' % rc)
  59. logging.info('******* Starting IbKafkaProducer')
  60. logging.info('IbKafkaProducer: connecting to kafka host: %s...' % kafka_host)
  61. logging.info('IbKafkaProducer: message mode is async')
  62. client = KafkaClient(kafka_host)
  63. self.producer = SimpleProducer(client, async=False)
  64. if not replay:
  65. self.start_ib_connection()
  66. # # start heart beat monitor
  67. # self.ibh = IbHeartBeat(config)
  68. # self.ibh.register_listener([self.on_ib_conn_broken])
  69. # #self.ibh.run()
  70. def pub_cn_index(self, sec):
  71. #http://blog.csdn.net/moneyice/article/details/7877030
  72. # hkHSI, hkHSCEI, hkHSCCI
  73. qs = '0000001,1399001,1399300'
  74. url = 'http://api.money.126.net/data/feed/%s?callback=ne3587367b7387dc' % qs
  75. while 1:
  76. pg = urllib2.urlopen(url.encode('utf-8'))
  77. s = pg.read().replace('ne3587367b7387dc(', '')
  78. s = s[:len(s)-2]
  79. d = json.loads(s)
  80. tick_id = 9000
  81. for k,v in d.iteritems():
  82. c_name = '%s-%s-%s-%s' % (v['type'],v['code'],0,0)
  83. msg = "%s,%d,%d,%s,%s" % (datetime.datetime.now().strftime('%Y%m%d%H%M%S'), tick_id ,\
  84. 4, v['price'], c_name)
  85. print msg
  86. self.producer.send_messages('my.price', msg.encode('utf-8'))
  87. tick_id+=1
  88. sleep(sec)
  89. def add_contract(self, tuple):
  90. print tuple
  91. c = options_data.ContractHelper.makeContract(tuple)
  92. self.con.reqMktData(self.id2contract['conId'], c, '', False)
  93. self.id2contract['id2contracts'][self.id2contract['conId']] = options_data.ContractHelper.makeRedisKeyEx(c)
  94. self.id2contract['conId']+=1
  95. def on_ib_conn_broken(self, msg):
  96. logging.error('IbKafkaProducer: connection is broken!')
  97. self.ib_conn_status = 'ERROR'
  98. self.tlock.acquire() # this function may get called multiple times
  99. try: # block until another party finishes executing
  100. if self.ib_conn_status == 'OK': # check status
  101. return # if already fixed up while waiting, return
  102. self.con.eDisconnect()
  103. host = self.config.get("ib_mds", "ib_mds.gateway").strip('"').strip("'")
  104. port = int(self.config.get("ib_mds", "ib_mds.ib_port"))
  105. appid = int(self.config.get("ib_mds", "ib_mds.appid.id"))
  106. self.con = ibConnection(host, port, appid)
  107. self.con.registerAll(self.on_ib_message)
  108. rc = None
  109. while not rc and not self.quit:
  110. logging.error('IbKafkaProducer: attempt reconnection!')
  111. rc = self.con.connect()
  112. logging.info('IbKafkaProducer: connection status to IB is %d (0-broken 1-good)' % rc)
  113. sleep(2)
  114. # we arrived here because the connection has been restored
  115. if not self.quit:
  116. # resubscribe tickers again!
  117. self.load_tickers()
  118. a = AlertHelper(self.config)
  119. a.post_msg('ib_mds recovered from broken ib conn, re-subscribe tickers...')
  120. logging.debug('on_ib_conn_broken: completed restoration. releasing lock...')
  121. self.ib_conn_status = 'OK'
  122. finally:
  123. self.tlock.release()
  124. def on_ib_message(self, msg):
  125. def create_tick_kmessage(msg):
  126. d = {}
  127. for t in msg.items():
  128. d[t[0]] = t[1]
  129. d['ts'] = time.time()
  130. d['contract'] = self.id2contract['id2contracts'][msg.tickerId]
  131. d['typeName'] = msg.typeName
  132. d['source'] = 'IB'
  133. return json.dumps(d)
  134. if msg.typeName in ['tickPrice', 'tickSize']:
  135. t = create_tick_kmessage(msg)
  136. logging.debug(t)
  137. if self.toggle:
  138. print t
  139. self.producer.send_messages(IbKafkaProducer.IB_TICK_PRICE if msg.typeName == 'tickPrice' else IbKafkaProducer.IB_TICK_SIZE, t)
  140. if self.persist['is_persist']:
  141. self.write_message_to_file(t)
  142. # print ",%0.4f,%0.4f" % (msg.pos, msg.avgCost)
  143. # self.producer.send_messages('my.topic', "%0.4f,%0.4f" % (msg.pos, msg.avgCost))
  144. elif msg.typeName in ['error']:
  145. logging.error('on_ib_message: %s %s' % (msg.errorMsg, self.id2contract['id2contracts'][msg.id] if msg.errorCode == 200 else str(msg.errorCode)) )
  146. else:
  147. if self.toggle:
  148. print msg
  149. def write_message_to_file(self, t):
  150. if self.persist['file_exist'] == False:
  151. fn = '%s/ibkdump-%s.txt' % (self.persist['persist_dir'], datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
  152. logging.debug('write_message_to_file: path %s', fn)
  153. self.persist['fp'] = open(fn, 'w')
  154. self.persist['file_exist'] = True
  155. self.persist['spill_over_count'] = 0
  156. self.persist['fp'].write('%s|%s\n' % (datetime.datetime.now().strftime('%Y%m%d%H%M%S'), t))
  157. self.persist['spill_over_count'] +=1
  158. #print self.persist['spill_over_count']
  159. if self.persist['spill_over_count'] == self.persist['spill_over_limit']:
  160. self.persist['fp'].flush()
  161. self.persist['fp'].close()
  162. self.persist['file_exist'] = False
  163. def start_ib_connection(self):
  164. host = self.config.get("ib_mds", "ib_mds.gateway").strip('"').strip("'")
  165. port = int(self.config.get("ib_mds", "ib_mds.ib_port"))
  166. appid = int(self.config.get("ib_mds", "ib_mds.appid.id"))
  167. self.con = ibConnection(host, port, appid)
  168. self.con.registerAll(self.on_ib_message)
  169. rc = self.con.connect()
  170. if rc:
  171. self.ib_conn_status = 'OK'
  172. logging.info('start_ib_connection: connection status to IB is %d' % rc)
  173. # start heart beat monitor
  174. self.ibh = IbHeartBeat(config)
  175. self.ibh.register_listener([self.on_ib_conn_broken])
  176. self.ibh.run()
  177. def load_tickers(self, path=None):
  178. self.id2contract = {'conId': 1, 'id2contracts': {} }
  179. if path is None:
  180. path = self.config.get("ib_mds", "ib_mds.subscription.fileloc").strip('"').strip("'")
  181. logging.info('load_tickers: attempt to open file %s' % path)
  182. fr = open(path)
  183. for l in fr.readlines():
  184. if l[0] <> '#':
  185. self.add_contract(tuple([t for t in l.strip('\n').split(',')]))
  186. def do_work(self):
  187. while not self.quit:
  188. pass
  189. def run_forever(self):
  190. t = threading.Thread(target = self.do_work, args=())
  191. t.start()
  192. self.console()
  193. print 'pending ib connection to shut down...'
  194. self.disconnect()
  195. t.join()
  196. print 'shutdown complete.'
  197. def disconnect(self):
  198. self.con.disconnect()
  199. if 'fp' in self.persist and self.persist['fp']:
  200. self.persist['fp'].flush()
  201. self.persist['fp'].close()
  202. self.quit = True
  203. self.ibh.shutdown()
  204. def replay(self, dir_loc):
  205. def process_msg(fn):
  206. fp = open(fn)
  207. logging.info('replay file %s' % fn)
  208. last_record_ts = None
  209. for line in fp:
  210. s_msg = line.split('|')[1]
  211. msg = json.loads(s_msg)
  212. msg_ts = datetime.datetime.fromtimestamp(msg['ts'])
  213. interval = (msg_ts - (last_record_ts if last_record_ts <> None else msg_ts)).microseconds / 1000000.0
  214. print '%s %s %s' % (msg_ts.strftime('%Y-%m-%d %H:%M:%S.%f'), s_msg, fn)
  215. self.producer.send_messages(IbKafkaProducer.IB_TICK_PRICE if msg['typeName'] == 'tickPrice' else IbKafkaProducer.IB_TICK_SIZE, s_msg)
  216. last_record_ts = msg_ts
  217. sleep(interval)
  218. files = sorted([ join(dir_loc,f) for f in listdir(dir_loc) if isfile(join(dir_loc,f)) ])
  219. for f in files:
  220. process_msg(f)
  221. def console(self):
  222. try:
  223. while not self.quit:
  224. print "Available commands are: l - list all subscribed contracts, a <symbol> <sectype> <exch> <ccy>"
  225. print " t - turn/on off output q - terminate program"
  226. cmd = raw_input(">>")
  227. input = cmd.split(' ')
  228. if input[0] == "q":
  229. print 'quit command received...'
  230. self.quit = True
  231. elif input[0] == 'l' :
  232. print ''.join('%s: %s\n' % (k, v) for k, v in self.id2contract['id2contracts'].iteritems())
  233. elif input[0] == 'a':
  234. self.add_contract((input[1],input[2],input[3],input[4],'',0,''))
  235. elif input[0] == 't':
  236. self.toggle = False if self.toggle else True
  237. else:
  238. pass
  239. except:
  240. exc_type, exc_value, exc_traceback = sys.exc_info()
  241. traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
  242. if __name__ == '__main__':
  243. parser = OptionParser()
  244. parser.add_option("-r", "--replay",
  245. dest="replay_dir",
  246. help="replay recorded mds files stored in the specified directory")
  247. options, arguments = parser.parse_args()
  248. #print options, arguments
  249. if len(sys.argv) < 2:
  250. print("Usage: %s [options] <config file>" % sys.argv[0])
  251. exit(-1)
  252. cfg_path= arguments[0]
  253. config = ConfigParser.SafeConfigParser()
  254. if len(config.read(cfg_path)) == 0:
  255. raise ValueError, "Failed to open config file"
  256. logconfig = eval(config.get("ib_mds", "ib_mds.logconfig").strip('"').strip("'"))
  257. logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
  258. logging.basicConfig(**logconfig)
  259. replay = True if options.replay_dir <> None else False
  260. ik = IbKafkaProducer(config, replay)
  261. if not replay:
  262. ik.load_tickers()
  263. else:
  264. ik.replay(options.replay_dir)
  265. ik.run_forever()