|
@@ -13,7 +13,7 @@ from os.path import isfile, join
|
|
|
from threading import Lock
|
|
from threading import Lock
|
|
|
from comms.ib_heartbeat import IbHeartBeat
|
|
from comms.ib_heartbeat import IbHeartBeat
|
|
|
import threading, urllib2
|
|
import threading, urllib2
|
|
|
-
|
|
|
|
|
|
|
+from optparse import OptionParser
|
|
|
#from options_data import ContractHelper
|
|
#from options_data import ContractHelper
|
|
|
|
|
|
|
|
import finopt.options_data as options_data
|
|
import finopt.options_data as options_data
|
|
@@ -45,13 +45,13 @@ class IbKafkaProducer():
|
|
|
id2contract = {'conId': 1, 'id2contracts': {} }
|
|
id2contract = {'conId': 1, 'id2contracts': {} }
|
|
|
|
|
|
|
|
|
|
|
|
|
- def __init__(self, config):
|
|
|
|
|
|
|
+ def __init__(self, config, replay = False):
|
|
|
|
|
|
|
|
self.config = config
|
|
self.config = config
|
|
|
self.tlock = Lock()
|
|
self.tlock = Lock()
|
|
|
- host = self.config.get("ib_mds", "ib_mds.gateway").strip('"').strip("'")
|
|
|
|
|
- port = int(self.config.get("ib_mds", "ib_mds.ib_port"))
|
|
|
|
|
- appid = int(self.config.get("ib_mds", "ib_mds.appid.id"))
|
|
|
|
|
|
|
+# host = self.config.get("ib_mds", "ib_mds.gateway").strip('"').strip("'")
|
|
|
|
|
+# port = int(self.config.get("ib_mds", "ib_mds.ib_port"))
|
|
|
|
|
+# appid = int(self.config.get("ib_mds", "ib_mds.appid.id"))
|
|
|
kafka_host = self.config.get("cep", "kafka.host").strip('"').strip("'")
|
|
kafka_host = self.config.get("cep", "kafka.host").strip('"').strip("'")
|
|
|
|
|
|
|
|
self.persist['is_persist'] = self.config.get("ib_mds", "ib_mds.is_persist")
|
|
self.persist['is_persist'] = self.config.get("ib_mds", "ib_mds.is_persist")
|
|
@@ -62,25 +62,32 @@ class IbKafkaProducer():
|
|
|
|
|
|
|
|
IbKafkaProducer.IB_TICK_PRICE = self.config.get("cep", "kafka.ib.topic.tick_price").strip('"').strip("'")
|
|
IbKafkaProducer.IB_TICK_PRICE = self.config.get("cep", "kafka.ib.topic.tick_price").strip('"').strip("'")
|
|
|
IbKafkaProducer.IB_TICK_SIZE = self.config.get("cep", "kafka.ib.topic.tick_size").strip('"').strip("'")
|
|
IbKafkaProducer.IB_TICK_SIZE = self.config.get("cep", "kafka.ib.topic.tick_size").strip('"').strip("'")
|
|
|
- self.con = ibConnection(host, port, appid)
|
|
|
|
|
- self.con.registerAll(self.on_ib_message)
|
|
|
|
|
- rc = self.con.connect()
|
|
|
|
|
- if rc:
|
|
|
|
|
- self.ib_conn_status = 'OK'
|
|
|
|
|
|
|
+# self.con = ibConnection(host, port, appid)
|
|
|
|
|
+# self.con.registerAll(self.on_ib_message)
|
|
|
|
|
+# rc = self.con.connect()
|
|
|
|
|
+# if rc:
|
|
|
|
|
+# self.ib_conn_status = 'OK'
|
|
|
|
|
+
|
|
|
|
|
+# logging.info('IbKafkaProducer: connection status to IB is %d' % rc)
|
|
|
|
|
+
|
|
|
logging.info('******* Starting IbKafkaProducer')
|
|
logging.info('******* Starting IbKafkaProducer')
|
|
|
- logging.info('IbKafkaProducer: connection status to IB is %d' % rc)
|
|
|
|
|
logging.info('IbKafkaProducer: connecting to kafka host: %s...' % kafka_host)
|
|
logging.info('IbKafkaProducer: connecting to kafka host: %s...' % kafka_host)
|
|
|
logging.info('IbKafkaProducer: message mode is async')
|
|
logging.info('IbKafkaProducer: message mode is async')
|
|
|
|
|
|
|
|
client = KafkaClient(kafka_host)
|
|
client = KafkaClient(kafka_host)
|
|
|
self.producer = SimpleProducer(client, async=False)
|
|
self.producer = SimpleProducer(client, async=False)
|
|
|
|
|
|
|
|
- # start heart beat monitor
|
|
|
|
|
- self.ibh = IbHeartBeat(config)
|
|
|
|
|
- self.ibh.register_listener([self.on_ib_conn_broken])
|
|
|
|
|
- self.ibh.run()
|
|
|
|
|
|
|
+ if not replay:
|
|
|
|
|
+ self.start_ib_connection()
|
|
|
|
|
+
|
|
|
|
|
+# # start heart beat monitor
|
|
|
|
|
+# self.ibh = IbHeartBeat(config)
|
|
|
|
|
+# self.ibh.register_listener([self.on_ib_conn_broken])
|
|
|
|
|
+# #self.ibh.run()
|
|
|
|
|
|
|
|
def pub_cn_index(self, sec):
|
|
def pub_cn_index(self, sec):
|
|
|
|
|
+ #http://blog.csdn.net/moneyice/article/details/7877030
|
|
|
|
|
+ # hkHSI, hkHSCEI, hkHSCCI
|
|
|
|
|
|
|
|
qs = '0000001,1399001,1399300'
|
|
qs = '0000001,1399001,1399300'
|
|
|
url = 'http://api.money.126.net/data/feed/%s?callback=ne3587367b7387dc' % qs
|
|
url = 'http://api.money.126.net/data/feed/%s?callback=ne3587367b7387dc' % qs
|
|
@@ -118,10 +125,10 @@ class IbKafkaProducer():
|
|
|
def on_ib_conn_broken(self, msg):
|
|
def on_ib_conn_broken(self, msg):
|
|
|
logging.error('IbKafkaProducer: connection is broken!')
|
|
logging.error('IbKafkaProducer: connection is broken!')
|
|
|
self.ib_conn_status = 'ERROR'
|
|
self.ib_conn_status = 'ERROR'
|
|
|
- self.tlock.acquire()
|
|
|
|
|
- try:
|
|
|
|
|
- if self.ib_conn_status == 'OK':
|
|
|
|
|
- return
|
|
|
|
|
|
|
+ self.tlock.acquire() # this function may get called multiple times
|
|
|
|
|
+ try: # block until another party finishes executing
|
|
|
|
|
+ if self.ib_conn_status == 'OK': # check status
|
|
|
|
|
+ return # if already fixed up while waiting, return
|
|
|
|
|
|
|
|
self.con.eDisconnect()
|
|
self.con.eDisconnect()
|
|
|
|
|
|
|
@@ -138,6 +145,7 @@ class IbKafkaProducer():
|
|
|
logging.info('IbKafkaProducer: connection status to IB is %d (0-broken 1-good)' % rc)
|
|
logging.info('IbKafkaProducer: connection status to IB is %d (0-broken 1-good)' % rc)
|
|
|
sleep(2)
|
|
sleep(2)
|
|
|
|
|
|
|
|
|
|
+ # we arrived here because the connection has been restored
|
|
|
if not self.quit:
|
|
if not self.quit:
|
|
|
# resubscribe tickers again!
|
|
# resubscribe tickers again!
|
|
|
self.load_tickers()
|
|
self.load_tickers()
|
|
@@ -204,13 +212,31 @@ class IbKafkaProducer():
|
|
|
self.persist['fp'].close()
|
|
self.persist['fp'].close()
|
|
|
self.persist['file_exist'] = False
|
|
self.persist['file_exist'] = False
|
|
|
|
|
|
|
|
|
|
+
|
|
|
|
|
+ def start_ib_connection(self):
|
|
|
|
|
+ host = self.config.get("ib_mds", "ib_mds.gateway").strip('"').strip("'")
|
|
|
|
|
+ port = int(self.config.get("ib_mds", "ib_mds.ib_port"))
|
|
|
|
|
+ appid = int(self.config.get("ib_mds", "ib_mds.appid.id"))
|
|
|
|
|
+ self.con = ibConnection(host, port, appid)
|
|
|
|
|
+ self.con.registerAll(self.on_ib_message)
|
|
|
|
|
+ rc = self.con.connect()
|
|
|
|
|
+ if rc:
|
|
|
|
|
+ self.ib_conn_status = 'OK'
|
|
|
|
|
+ logging.info('start_ib_connection: connection status to IB is %d' % rc)
|
|
|
|
|
+
|
|
|
|
|
+ # start heart beat monitor
|
|
|
|
|
+ self.ibh = IbHeartBeat(config)
|
|
|
|
|
+ self.ibh.register_listener([self.on_ib_conn_broken])
|
|
|
|
|
+ self.ibh.run()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
def load_tickers(self, path=None):
|
|
def load_tickers(self, path=None):
|
|
|
|
|
|
|
|
self.id2contract = {'conId': 1, 'id2contracts': {} }
|
|
self.id2contract = {'conId': 1, 'id2contracts': {} }
|
|
|
|
|
|
|
|
|
|
|
|
|
if path is None:
|
|
if path is None:
|
|
|
- path = self.config.get("cep", "ib.subscription.fileloc").strip('"').strip("'")
|
|
|
|
|
|
|
+ path = self.config.get("ib_mds", "ib_mds.subscription.fileloc").strip('"').strip("'")
|
|
|
logging.info('load_tickers: attempt to open file %s' % path)
|
|
logging.info('load_tickers: attempt to open file %s' % path)
|
|
|
fr = open(path)
|
|
fr = open(path)
|
|
|
for l in fr.readlines():
|
|
for l in fr.readlines():
|
|
@@ -245,15 +271,27 @@ class IbKafkaProducer():
|
|
|
|
|
|
|
|
def replay(self, dir_loc):
|
|
def replay(self, dir_loc):
|
|
|
|
|
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
def process_msg(fn):
|
|
def process_msg(fn):
|
|
|
fp = open(fn)
|
|
fp = open(fn)
|
|
|
- last_ts = None
|
|
|
|
|
|
|
+ logging.info('replay file %s' % fn)
|
|
|
|
|
+ last_record_ts = None
|
|
|
for line in fp:
|
|
for line in fp:
|
|
|
- msg = line.split('|')[1]
|
|
|
|
|
- msg_ts = json.loads(msg)['ts']
|
|
|
|
|
-#
|
|
|
|
|
-# files = [f if f.size() > 0 else None for f in dir_loc]
|
|
|
|
|
- files = sorted([ f for f in listdir(dir_loc) if isfile(join(dir_loc,f)) ])
|
|
|
|
|
|
|
+
|
|
|
|
|
+ s_msg = line.split('|')[1]
|
|
|
|
|
+ msg = json.loads(s_msg)
|
|
|
|
|
+ msg_ts = datetime.datetime.fromtimestamp(msg['ts'])
|
|
|
|
|
+ interval = (msg_ts - (last_record_ts if last_record_ts <> None else msg_ts)).microseconds / 1000000.0
|
|
|
|
|
+
|
|
|
|
|
+ print '%s %s %s' % (msg_ts.strftime('%Y-%m-%d %H:%M:%S.%f'), s_msg, fn)
|
|
|
|
|
+ self.producer.send_messages(IbKafkaProducer.IB_TICK_PRICE if msg['typeName'] == 'tickPrice' else IbKafkaProducer.IB_TICK_SIZE, s_msg)
|
|
|
|
|
+
|
|
|
|
|
+ last_record_ts = msg_ts
|
|
|
|
|
+ sleep(interval)
|
|
|
|
|
+
|
|
|
|
|
+ files = sorted([ join(dir_loc,f) for f in listdir(dir_loc) if isfile(join(dir_loc,f)) ])
|
|
|
|
|
+
|
|
|
for f in files:
|
|
for f in files:
|
|
|
process_msg(f)
|
|
process_msg(f)
|
|
|
|
|
|
|
@@ -284,23 +322,24 @@ class IbKafkaProducer():
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if __name__ == '__main__':
|
|
|
|
|
|
|
|
-# logging.basicConfig(#filename = "log/port.log", filemode = 'w',
|
|
|
|
|
-# level=logging.INFO,
|
|
|
|
|
-# format='%(asctime)s %(levelname)-8s %(message)s')
|
|
|
|
|
-#
|
|
|
|
|
-# config = ConfigParser.ConfigParser()
|
|
|
|
|
-# config.read("../config/app.cfg")
|
|
|
|
|
-# ik = IbKafkaProducer(config)
|
|
|
|
|
-# ik.load_tickers()
|
|
|
|
|
-# ik.run_forever()
|
|
|
|
|
|
|
|
|
|
|
|
+ parser = OptionParser()
|
|
|
|
|
+
|
|
|
|
|
+ parser.add_option("-r", "--replay",
|
|
|
|
|
+ dest="replay_dir",
|
|
|
|
|
+ help="replay recorded mds files stored in the specified directory")
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
|
|
|
|
|
+ options, arguments = parser.parse_args()
|
|
|
|
|
+
|
|
|
|
|
+ #print options, arguments
|
|
|
|
|
|
|
|
- if len(sys.argv) != 2:
|
|
|
|
|
- print("Usage: %s <config file>" % sys.argv[0])
|
|
|
|
|
|
|
+ if len(sys.argv) < 2:
|
|
|
|
|
+ print("Usage: %s [options] <config file>" % sys.argv[0])
|
|
|
exit(-1)
|
|
exit(-1)
|
|
|
|
|
|
|
|
- cfg_path= sys.argv[1:]
|
|
|
|
|
|
|
+ cfg_path= arguments[0]
|
|
|
config = ConfigParser.SafeConfigParser()
|
|
config = ConfigParser.SafeConfigParser()
|
|
|
if len(config.read(cfg_path)) == 0:
|
|
if len(config.read(cfg_path)) == 0:
|
|
|
raise ValueError, "Failed to open config file"
|
|
raise ValueError, "Failed to open config file"
|
|
@@ -309,6 +348,13 @@ if __name__ == '__main__':
|
|
|
logconfig = eval(config.get("ib_mds", "ib_mds.logconfig").strip('"').strip("'"))
|
|
logconfig = eval(config.get("ib_mds", "ib_mds.logconfig").strip('"').strip("'"))
|
|
|
logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
|
|
logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
|
|
|
logging.basicConfig(**logconfig)
|
|
logging.basicConfig(**logconfig)
|
|
|
- ik = IbKafkaProducer(config)
|
|
|
|
|
- ik.load_tickers()
|
|
|
|
|
- ik.run_forever()
|
|
|
|
|
|
|
+ replay = True if options.replay_dir <> None else False
|
|
|
|
|
+ ik = IbKafkaProducer(config, replay)
|
|
|
|
|
+
|
|
|
|
|
+ if not replay:
|
|
|
|
|
+ ik.load_tickers()
|
|
|
|
|
+ else:
|
|
|
|
|
+ ik.replay(options.replay_dir)
|
|
|
|
|
+
|
|
|
|
|
+ ik.run_forever()
|
|
|
|
|
+
|