| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314 |
- # -*- coding: utf-8 -*-
- import sys, traceback
- import json
- import logging
- import ConfigParser
- from ib.ext.Contract import Contract
- from ib.opt import ibConnection, message
- from time import sleep
- import time, datetime
- from os import listdir
- from os.path import isfile, join
- from threading import Lock
- from comms.ib_heartbeat import IbHeartBeat
- import threading, urllib2
- #from options_data import ContractHelper
- import finopt.options_data as options_data
- from kafka.client import KafkaClient
- from kafka.producer import SimpleProducer
- from comms.alert_bot import AlertHelper
- ##
- ## to run, start kafka server on vsu-01 <administrator, password>
- ## start ibgw or tws
- ## edit subscript list
- ## check the settings in the console
- ##
- class IbKafkaProducer():
-
- config = None
- con = None
- quit = False
- producer = None
- IB_TICK_PRICE = None
- IB_TICK_SIZE = None
- toggle = False
- persist = {}
- ibh = None
- tlock = None
- ib_conn_status = None
-
- id2contract = {'conId': 1, 'id2contracts': {} }
-
-
- def __init__(self, config):
-
- self.config = config
- self.tlock = Lock()
- host = self.config.get("ib_mds", "ib_mds.gateway").strip('"').strip("'")
- port = int(self.config.get("ib_mds", "ib_mds.ib_port"))
- appid = int(self.config.get("ib_mds", "ib_mds.appid.id"))
- kafka_host = self.config.get("cep", "kafka.host").strip('"').strip("'")
-
- self.persist['is_persist'] = self.config.get("ib_mds", "ib_mds.is_persist")
- self.persist['persist_dir'] =self.config.get("ib_mds", "ib_mds.persist_dir").strip('"').strip("'")
- self.persist['file_exist'] = False
- self.persist['spill_over_limit'] = int(self.config.get("ib_mds", "ib_mds.spill_over_limit"))
-
- IbKafkaProducer.IB_TICK_PRICE = self.config.get("cep", "kafka.ib.topic.tick_price").strip('"').strip("'")
- IbKafkaProducer.IB_TICK_SIZE = self.config.get("cep", "kafka.ib.topic.tick_size").strip('"').strip("'")
- self.con = ibConnection(host, port, appid)
- self.con.registerAll(self.on_ib_message)
- rc = self.con.connect()
- if rc:
- self.ib_conn_status = 'OK'
- logging.info('******* Starting IbKafkaProducer')
- logging.info('IbKafkaProducer: connection status to IB is %d' % rc)
- logging.info('IbKafkaProducer: connecting to kafka host: %s...' % kafka_host)
- logging.info('IbKafkaProducer: message mode is async')
-
- client = KafkaClient(kafka_host)
- self.producer = SimpleProducer(client, async=False)
-
- # start heart beat monitor
- self.ibh = IbHeartBeat(config)
- self.ibh.register_listener([self.on_ib_conn_broken])
- self.ibh.run()
-
- def pub_cn_index(self, sec):
-
- qs = '0000001,1399001,1399300'
- url = 'http://api.money.126.net/data/feed/%s?callback=ne3587367b7387dc' % qs
-
- while 1:
-
- pg = urllib2.urlopen(url.encode('utf-8'))
- s = pg.read().replace('ne3587367b7387dc(', '')
- s = s[:len(s)-2]
-
- d = json.loads(s)
-
- tick_id = 9000
- for k,v in d.iteritems():
-
-
- c_name = '%s-%s-%s-%s' % (v['type'],v['code'],0,0)
- msg = "%s,%d,%d,%s,%s" % (datetime.datetime.now().strftime('%Y%m%d%H%M%S'), tick_id ,\
- 4, v['price'], c_name)
- print msg
- self.producer.send_messages('my.price', msg.encode('utf-8'))
- tick_id+=1
- sleep(sec)
-
-
- def add_contract(self, tuple):
- print tuple
- c = options_data.ContractHelper.makeContract(tuple)
- self.con.reqMktData(self.id2contract['conId'], c, '', False)
- self.id2contract['id2contracts'][self.id2contract['conId']] = options_data.ContractHelper.makeRedisKeyEx(c)
- self.id2contract['conId']+=1
-
- def on_ib_conn_broken(self, msg):
- logging.error('IbKafkaProducer: connection is broken!')
- self.ib_conn_status = 'ERROR'
- self.tlock.acquire()
- try:
- if self.ib_conn_status == 'OK':
- return
-
- self.con.eDisconnect()
-
-
- host = self.config.get("ib_mds", "ib_mds.gateway").strip('"').strip("'")
- port = int(self.config.get("ib_mds", "ib_mds.ib_port"))
- appid = int(self.config.get("ib_mds", "ib_mds.appid.id"))
- self.con = ibConnection(host, port, appid)
- self.con.registerAll(self.on_ib_message)
- rc = None
- while not rc and not self.quit:
- logging.error('IbKafkaProducer: attempt reconnection!')
- rc = self.con.connect()
- logging.info('IbKafkaProducer: connection status to IB is %d (0-broken 1-good)' % rc)
- sleep(2)
-
- if not self.quit:
- # resubscribe tickers again!
- self.load_tickers()
- a = AlertHelper(self.config)
- a.post_msg('ib_mds recovered from broken ib conn, re-subscribe tickers...')
- logging.debug('on_ib_conn_broken: completed restoration. releasing lock...')
- self.ib_conn_status = 'OK'
-
- finally:
- self.tlock.release()
-
-
-
-
- def on_ib_message(self, msg):
- def create_tick_kmessage(msg):
- d = {}
- for t in msg.items():
- d[t[0]] = t[1]
- d['ts'] = time.time()
- d['contract'] = self.id2contract['id2contracts'][msg.tickerId]
- d['typeName'] = msg.typeName
- d['source'] = 'IB'
- return json.dumps(d)
-
- if msg.typeName in ['tickPrice', 'tickSize']:
- t = create_tick_kmessage(msg)
- logging.debug(t)
- if self.toggle:
- print t
- self.producer.send_messages(IbKafkaProducer.IB_TICK_PRICE if msg.typeName == 'tickPrice' else IbKafkaProducer.IB_TICK_SIZE, t)
- if self.persist['is_persist']:
- self.write_message_to_file(t)
-
-
-
- # print ",%0.4f,%0.4f" % (msg.pos, msg.avgCost)
- # self.producer.send_messages('my.topic', "%0.4f,%0.4f" % (msg.pos, msg.avgCost))
- elif msg.typeName in ['error']:
- logging.error('on_ib_message: %s %s' % (msg.errorMsg, self.id2contract['id2contracts'][msg.id] if msg.errorCode == 200 else str(msg.errorCode)) )
- else:
- if self.toggle:
- print msg
-
-
- def write_message_to_file(self, t):
- if self.persist['file_exist'] == False:
- fn = '%s/ibkdump-%s.txt' % (self.persist['persist_dir'], datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
- logging.debug('write_message_to_file: path %s', fn)
- self.persist['fp'] = open(fn, 'w')
- self.persist['file_exist'] = True
- self.persist['spill_over_count'] = 0
-
- self.persist['fp'].write('%s|%s\n' % (datetime.datetime.now().strftime('%Y%m%d%H%M%S'), t))
- self.persist['spill_over_count'] +=1
- #print self.persist['spill_over_count']
- if self.persist['spill_over_count'] == self.persist['spill_over_limit']:
- self.persist['fp'].flush()
- self.persist['fp'].close()
- self.persist['file_exist'] = False
- def load_tickers(self, path=None):
-
- self.id2contract = {'conId': 1, 'id2contracts': {} }
-
-
- if path is None:
- path = self.config.get("cep", "ib.subscription.fileloc").strip('"').strip("'")
- logging.info('load_tickers: attempt to open file %s' % path)
- fr = open(path)
- for l in fr.readlines():
- if l[0] <> '#':
-
- self.add_contract(tuple([t for t in l.strip('\n').split(',')]))
-
- def do_work(self):
- while not self.quit:
- pass
-
- def run_forever(self):
- t = threading.Thread(target = self.do_work, args=())
- t.start()
- self.console()
- print 'pending ib connection to shut down...'
- self.disconnect()
- t.join()
- print 'shutdown complete.'
-
- def disconnect(self):
- self.con.disconnect()
- if 'fp' in self.persist and self.persist['fp']:
- self.persist['fp'].flush()
- self.persist['fp'].close()
- self.quit = True
- self.ibh.shutdown()
- def replay(self, dir_loc):
-
- def process_msg(fn):
- fp = open(fn)
- last_ts = None
- for line in fp:
- msg = line.split('|')[1]
- msg_ts = json.loads(msg)['ts']
- #
- # files = [f if f.size() > 0 else None for f in dir_loc]
- files = sorted([ f for f in listdir(dir_loc) if isfile(join(dir_loc,f)) ])
- for f in files:
- process_msg(f)
-
- def console(self):
- try:
- while not self.quit:
- print "Available commands are: l - list all subscribed contracts, a <symbol> <sectype> <exch> <ccy>"
- print " t - turn/on off output q - terminate program"
- cmd = raw_input(">>")
- input = cmd.split(' ')
- if input[0] == "q":
- print 'quit command received...'
- self.quit = True
- elif input[0] == 'l' :
- print ''.join('%s: %s\n' % (k, v) for k, v in self.id2contract['id2contracts'].iteritems())
- elif input[0] == 'a':
- self.add_contract((input[1],input[2],input[3],input[4],'',0,''))
- elif input[0] == 't':
- self.toggle = False if self.toggle else True
- else:
- pass
-
- except:
- exc_type, exc_value, exc_traceback = sys.exc_info()
- traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
-
- if __name__ == '__main__':
-
- # logging.basicConfig(#filename = "log/port.log", filemode = 'w',
- # level=logging.INFO,
- # format='%(asctime)s %(levelname)-8s %(message)s')
- #
- # config = ConfigParser.ConfigParser()
- # config.read("../config/app.cfg")
- # ik = IbKafkaProducer(config)
- # ik.load_tickers()
- # ik.run_forever()
-
-
-
- if len(sys.argv) != 2:
- print("Usage: %s <config file>" % sys.argv[0])
- exit(-1)
- cfg_path= sys.argv[1:]
- config = ConfigParser.SafeConfigParser()
- if len(config.read(cfg_path)) == 0:
- raise ValueError, "Failed to open config file"
-
-
- logconfig = eval(config.get("ib_mds", "ib_mds.logconfig").strip('"').strip("'"))
- logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
- logging.basicConfig(**logconfig)
- ik = IbKafkaProducer(config)
- ik.load_tickers()
- ik.run_forever()
|