ib_mds.py 11 KB


  1. # -*- coding: utf-8 -*-
  2. import sys, traceback
  3. import json
  4. import logging
  5. import ConfigParser
  6. from ib.ext.Contract import Contract
  7. from ib.opt import ibConnection, message
  8. from time import sleep
  9. import time, datetime
  10. from os import listdir
  11. from os.path import isfile, join
  12. from threading import Lock
  13. from comms.ib_heartbeat import IbHeartBeat
  14. import threading, urllib2
  15. #from options_data import ContractHelper
  16. import finopt.options_data as options_data
  17. from kafka.client import KafkaClient
  18. from kafka.producer import SimpleProducer
  19. from comms.alert_bot import AlertHelper
##
## To run: start the kafka server on vsu-01 <administrator, password>,
## start ibgw or tws,
## edit the subscription list,
## then check the settings in the console.
##
  26. class IbKafkaProducer():
  27. config = None
  28. con = None
  29. quit = False
  30. producer = None
  31. IB_TICK_PRICE = None
  32. IB_TICK_SIZE = None
  33. toggle = False
  34. persist = {}
  35. ibh = None
  36. tlock = None
  37. ib_conn_status = None
  38. id2contract = {'conId': 1, 'id2contracts': {} }
  39. def __init__(self, config):
  40. self.config = config
  41. self.tlock = Lock()
  42. host = self.config.get("ib_mds", "ib_mds.gateway").strip('"').strip("'")
  43. port = int(self.config.get("ib_mds", "ib_mds.ib_port"))
  44. appid = int(self.config.get("ib_mds", "ib_mds.appid.id"))
  45. kafka_host = self.config.get("cep", "kafka.host").strip('"').strip("'")
  46. self.persist['is_persist'] = self.config.get("ib_mds", "ib_mds.is_persist")
  47. self.persist['persist_dir'] =self.config.get("ib_mds", "ib_mds.persist_dir").strip('"').strip("'")
  48. self.persist['file_exist'] = False
  49. self.persist['spill_over_limit'] = int(self.config.get("ib_mds", "ib_mds.spill_over_limit"))
  50. IbKafkaProducer.IB_TICK_PRICE = self.config.get("cep", "kafka.ib.topic.tick_price").strip('"').strip("'")
  51. IbKafkaProducer.IB_TICK_SIZE = self.config.get("cep", "kafka.ib.topic.tick_size").strip('"').strip("'")
  52. self.con = ibConnection(host, port, appid)
  53. self.con.registerAll(self.on_ib_message)
  54. rc = self.con.connect()
  55. if rc:
  56. self.ib_conn_status = 'OK'
  57. logging.info('******* Starting IbKafkaProducer')
  58. logging.info('IbKafkaProducer: connection status to IB is %d' % rc)
  59. logging.info('IbKafkaProducer: connecting to kafka host: %s...' % kafka_host)
  60. logging.info('IbKafkaProducer: message mode is async')
  61. client = KafkaClient(kafka_host)
  62. self.producer = SimpleProducer(client, async=False)
  63. # start heart beat monitor
  64. self.ibh = IbHeartBeat(config)
  65. self.ibh.register_listener([self.on_ib_conn_broken])
  66. self.ibh.run()
  67. def pub_cn_index(self, sec):
  68. qs = '0000001,1399001,1399300'
  69. url = 'http://api.money.126.net/data/feed/%s?callback=ne3587367b7387dc' % qs
  70. while 1:
  71. pg = urllib2.urlopen(url.encode('utf-8'))
  72. s = pg.read().replace('ne3587367b7387dc(', '')
  73. s = s[:len(s)-2]
  74. d = json.loads(s)
  75. tick_id = 9000
  76. for k,v in d.iteritems():
  77. c_name = '%s-%s-%s-%s' % (v['type'],v['code'],0,0)
  78. msg = "%s,%d,%d,%s,%s" % (datetime.datetime.now().strftime('%Y%m%d%H%M%S'), tick_id ,\
  79. 4, v['price'], c_name)
  80. print msg
  81. self.producer.send_messages('my.price', msg.encode('utf-8'))
  82. tick_id+=1
  83. sleep(sec)
  84. def add_contract(self, tuple):
  85. print tuple
  86. c = options_data.ContractHelper.makeContract(tuple)
  87. self.con.reqMktData(self.id2contract['conId'], c, '', False)
  88. self.id2contract['id2contracts'][self.id2contract['conId']] = options_data.ContractHelper.makeRedisKeyEx(c)
  89. self.id2contract['conId']+=1
  90. def on_ib_conn_broken(self, msg):
  91. logging.error('IbKafkaProducer: connection is broken!')
  92. self.ib_conn_status = 'ERROR'
  93. self.tlock.acquire()
  94. try:
  95. if self.ib_conn_status == 'OK':
  96. return
  97. self.con.eDisconnect()
  98. host = self.config.get("ib_mds", "ib_mds.gateway").strip('"').strip("'")
  99. port = int(self.config.get("ib_mds", "ib_mds.ib_port"))
  100. appid = int(self.config.get("ib_mds", "ib_mds.appid.id"))
  101. self.con = ibConnection(host, port, appid)
  102. self.con.registerAll(self.on_ib_message)
  103. rc = None
  104. while not rc and not self.quit:
  105. logging.error('IbKafkaProducer: attempt reconnection!')
  106. rc = self.con.connect()
  107. logging.info('IbKafkaProducer: connection status to IB is %d (0-broken 1-good)' % rc)
  108. sleep(2)
  109. if not self.quit:
  110. # resubscribe tickers again!
  111. self.load_tickers()
  112. a = AlertHelper(self.config)
  113. a.post_msg('ib_mds recovered from broken ib conn, re-subscribe tickers...')
  114. logging.debug('on_ib_conn_broken: completed restoration. releasing lock...')
  115. self.ib_conn_status = 'OK'
  116. finally:
  117. self.tlock.release()
  118. def on_ib_message(self, msg):
  119. def create_tick_kmessage(msg):
  120. d = {}
  121. for t in msg.items():
  122. d[t[0]] = t[1]
  123. d['ts'] = time.time()
  124. d['contract'] = self.id2contract['id2contracts'][msg.tickerId]
  125. d['typeName'] = msg.typeName
  126. d['source'] = 'IB'
  127. return json.dumps(d)
  128. if msg.typeName in ['tickPrice', 'tickSize']:
  129. t = create_tick_kmessage(msg)
  130. logging.debug(t)
  131. if self.toggle:
  132. print t
  133. self.producer.send_messages(IbKafkaProducer.IB_TICK_PRICE if msg.typeName == 'tickPrice' else IbKafkaProducer.IB_TICK_SIZE, t)
  134. if self.persist['is_persist']:
  135. self.write_message_to_file(t)
  136. # print ",%0.4f,%0.4f" % (msg.pos, msg.avgCost)
  137. # self.producer.send_messages('my.topic', "%0.4f,%0.4f" % (msg.pos, msg.avgCost))
  138. elif msg.typeName in ['error']:
  139. logging.error('on_ib_message: %s %s' % (msg.errorMsg, self.id2contract['id2contracts'][msg.id] if msg.errorCode == 200 else str(msg.errorCode)) )
  140. else:
  141. if self.toggle:
  142. print msg
  143. def write_message_to_file(self, t):
  144. if self.persist['file_exist'] == False:
  145. fn = '%s/ibkdump-%s.txt' % (self.persist['persist_dir'], datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
  146. logging.debug('write_message_to_file: path %s', fn)
  147. self.persist['fp'] = open(fn, 'w')
  148. self.persist['file_exist'] = True
  149. self.persist['spill_over_count'] = 0
  150. self.persist['fp'].write('%s|%s\n' % (datetime.datetime.now().strftime('%Y%m%d%H%M%S'), t))
  151. self.persist['spill_over_count'] +=1
  152. #print self.persist['spill_over_count']
  153. if self.persist['spill_over_count'] == self.persist['spill_over_limit']:
  154. self.persist['fp'].flush()
  155. self.persist['fp'].close()
  156. self.persist['file_exist'] = False
  157. def load_tickers(self, path=None):
  158. self.id2contract = {'conId': 1, 'id2contracts': {} }
  159. if path is None:
  160. path = self.config.get("cep", "ib.subscription.fileloc").strip('"').strip("'")
  161. logging.info('load_tickers: attempt to open file %s' % path)
  162. fr = open(path)
  163. for l in fr.readlines():
  164. if l[0] <> '#':
  165. self.add_contract(tuple([t for t in l.strip('\n').split(',')]))
  166. def do_work(self):
  167. while not self.quit:
  168. pass
  169. def run_forever(self):
  170. t = threading.Thread(target = self.do_work, args=())
  171. t.start()
  172. self.console()
  173. print 'pending ib connection to shut down...'
  174. self.disconnect()
  175. t.join()
  176. print 'shutdown complete.'
  177. def disconnect(self):
  178. self.con.disconnect()
  179. if 'fp' in self.persist and self.persist['fp']:
  180. self.persist['fp'].flush()
  181. self.persist['fp'].close()
  182. self.quit = True
  183. self.ibh.shutdown()
  184. def replay(self, dir_loc):
  185. def process_msg(fn):
  186. fp = open(fn)
  187. last_ts = None
  188. for line in fp:
  189. msg = line.split('|')[1]
  190. msg_ts = json.loads(msg)['ts']
  191. #
  192. # files = [f if f.size() > 0 else None for f in dir_loc]
  193. files = sorted([ f for f in listdir(dir_loc) if isfile(join(dir_loc,f)) ])
  194. for f in files:
  195. process_msg(f)
  196. def console(self):
  197. try:
  198. while not self.quit:
  199. print "Available commands are: l - list all subscribed contracts, a <symbol> <sectype> <exch> <ccy>"
  200. print " t - turn/on off output q - terminate program"
  201. cmd = raw_input(">>")
  202. input = cmd.split(' ')
  203. if input[0] == "q":
  204. print 'quit command received...'
  205. self.quit = True
  206. elif input[0] == 'l' :
  207. print ''.join('%s: %s\n' % (k, v) for k, v in self.id2contract['id2contracts'].iteritems())
  208. elif input[0] == 'a':
  209. self.add_contract((input[1],input[2],input[3],input[4],'',0,''))
  210. elif input[0] == 't':
  211. self.toggle = False if self.toggle else True
  212. else:
  213. pass
  214. except:
  215. exc_type, exc_value, exc_traceback = sys.exc_info()
  216. traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
  217. if __name__ == '__main__':
  218. # logging.basicConfig(#filename = "log/port.log", filemode = 'w',
  219. # level=logging.INFO,
  220. # format='%(asctime)s %(levelname)-8s %(message)s')
  221. #
  222. # config = ConfigParser.ConfigParser()
  223. # config.read("../config/app.cfg")
  224. # ik = IbKafkaProducer(config)
  225. # ik.load_tickers()
  226. # ik.run_forever()
  227. if len(sys.argv) != 2:
  228. print("Usage: %s <config file>" % sys.argv[0])
  229. exit(-1)
  230. cfg_path= sys.argv[1:]
  231. config = ConfigParser.SafeConfigParser()
  232. if len(config.read(cfg_path)) == 0:
  233. raise ValueError, "Failed to open config file"
  234. logconfig = eval(config.get("ib_mds", "ib_mds.logconfig").strip('"').strip("'"))
  235. logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
  236. logging.basicConfig(**logconfig)
  237. ik = IbKafkaProducer(config)
  238. ik.load_tickers()
  239. ik.run_forever()