ib_heartbeat.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. # -*- coding: utf-8 -*-
  2. import sys, traceback
  3. import json
  4. import logging
  5. import ConfigParser
  6. from ib.ext.Contract import Contract
  7. from ib.opt import ibConnection, message
  8. from time import sleep
  9. import time, datetime
  10. from os import listdir
  11. from os.path import isfile, join
  12. from comms.alert_bot import AlertHelper
  13. import threading, urllib2
  class IbHeartBeat(object):
      """Probe the IB API gateway in a background thread and alert on changes.

      The worker repeatedly opens a connection to the configured gateway.
      While attempts fail, a "broken" message is posted to the chat handle and
      to every registered listener, at most once per ``suppress_msg_interval``
      seconds.  The first successful attempt after a failure sends a single
      "restored" message.

      Settings are read from the ``ib_heartbeat`` section of the config object
      passed to the constructor.
      """

      # Class-level defaults; instances override them in __init__ / at runtime.
      config = None
      quit = False               # set to True by shutdown() to end the loop
      prev_state = ''            # 'broken' while the latest probe failed
      q = None
      chat_handle = None
      last_broken_time = None    # when the last "broken" alert went out;
                                 # None means no alert is being throttled
      alert_callbacks = []

      def __init__(self, config):
          """Store *config* and create the chat alert helper from it."""
          self.config = config
          self.chat_handle = AlertHelper(config)
          # Per-instance list so instances never share the class-level default.
          self.alert_callbacks = []
          # None guarantees the first "broken" alert is sent immediately,
          # whatever suppress_msg_interval is configured.
          self.last_broken_time = None

      def register_listener(self, fns):
          """Replace the registered callbacks with the callables in *fns*."""
          self.alert_callbacks = list(fns)

      def alert_listeners(self, msg):
          """Call every registered callback with *msg*, in registration order."""
          for fn in self.alert_callbacks:
              fn(msg)

      def run(self):
          """Start keep_trying() in a new thread and return immediately."""
          t = threading.Thread(target=self.keep_trying, args=())
          t.start()

      def shutdown(self):
          """Ask the polling loop to stop; it exits after its current sleep."""
          self.quit = True

      def _alert_due(self, now, suppress_msg_interval):
          """Return True if a "broken" alert may be sent at time *now*.

          An alert is due when none has been sent since the connection was last
          healthy, or when more than *suppress_msg_interval* seconds have
          passed since the previous one.
          """
          if self.last_broken_time is None:
              return True
          # total_seconds() covers the whole span; timedelta.seconds would
          # silently drop whole days.
          elapsed = (now - self.last_broken_time).total_seconds()
          return elapsed > suppress_msg_interval

      def _notify(self, msg):
          """Send *msg* to the chat handle and to all registered listeners."""
          self.chat_handle.post_msg(msg)
          self.alert_listeners(msg)

      def keep_trying(self):
          """Poll the gateway until shutdown() is called, alerting on changes."""
          section = "ib_heartbeat"
          host = self.config.get(section, "ib_heartbeat.gateway").strip('"').strip("'")
          port = int(self.config.get(section, "ib_heartbeat.ib_port"))
          appid = int(self.config.get(section, "ib_heartbeat.appid.id"))
          try_interval = int(self.config.get(section, "ib_heartbeat.try_interval"))
          suppress_msg_interval = int(
              self.config.get(section, "ib_heartbeat.suppress_msg_interval"))

          logging.info(
              'ib gateway->%s:%d, appid->%d, try_interval->%d, suppress msg interval->%d',
              host, port, appid, try_interval, suppress_msg_interval)

          while not self.quit:
              con = ibConnection(host, port, appid)
              if con.connect():
                  try:
                      if self.prev_state == 'broken':
                          msg = ('*** Connection restored at %s **********'
                                 % datetime.datetime.now().strftime('%H:%M:%S'))
                          self._notify(msg)
                          self.prev_state = ''
                          # Forget the throttle so the next outage alerts at once.
                          self.last_broken_time = None
                  finally:
                      # The probe connection is only a liveness check; always
                      # release it, even if a notification raised.
                      con.eDisconnect()
              else:
                  msg = '*** Connection to IB API is broken **********'
                  now = datetime.datetime.now()
                  self.prev_state = 'broken'
                  if self._alert_due(now, suppress_msg_interval):
                      self._notify(msg)
                      self.last_broken_time = now
                      # NOTE(review): logged together with the throttled alert;
                      # confirm whether every failed probe should be logged.
                      logging.error(msg)
              sleep(try_interval)
  if __name__ == '__main__':
      # Script entry point: load the config file named on the command line,
      # configure logging from it, then start the heartbeat thread with a
      # listener that prints every alert.
      if len(sys.argv) != 2:
          print("Usage: %s <config file>" % sys.argv[0])
          sys.exit(-1)

      cfg_path = sys.argv[1]
      config = ConfigParser.SafeConfigParser()
      # read() returns the list of files it parsed; empty means none was readable.
      if not config.read(cfg_path):
          raise ValueError("Failed to open config file")

      # SECURITY NOTE(review): eval() executes whatever expression the config
      # file contains; only run this with a trusted config file.
      # NOTE(review): the logging settings come from the "ib_mds" section, not
      # "ib_heartbeat" - confirm this is intended.
      logconfig = eval(config.get("ib_mds", "ib_mds.logconfig").strip('"').strip("'"))
      logconfig['format'] = '%(asctime)s %(levelname)-8s %(message)s'
      logging.basicConfig(**logconfig)

      ibh = IbHeartBeat(config)

      def warn_me(msg):
          """Example listener: echo each alert to stdout."""
          print('warn_me: received %s' % msg)

      ibh.register_listener([warn_me])
      ibh.run()