order_manager.py 11 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import logging
  4. import sys, traceback
  5. import json
  6. from time import sleep
  7. from misc2.helpers import ContractHelper
  8. from misc2.observer import Subscriber
  9. from ib.ext.Contract import Contract
  10. from comms.ibgw.base_messaging import BaseMessageListener
  11. from comms.ibgw.tws_event_handler import TWS_event_handler
  12. from Queue import Queue
  13. import threading
  14. import uuid
  15. import numpy as np
  16. class OrderManagerException(Exception):
  17. pass
  18. class OrderIdManager(threading.Thread):
  19. def __init__(self, tws_conn):
  20. threading.Thread.__init__(self)
  21. self.id_request_q = Queue()
  22. self.next_valid_q = Queue()
  23. self.tws_conn = tws_conn
  24. self.stop = False
  25. self.tws_pending = False
  26. self.clordid_id = {}
  27. '''
  28. expects a client order id of md5
  29. '''
  30. def request_id(self, client_id, clordid):
  31. self.id_request_q.put((client_id, clordid))
  32. self.id_request_q.task_done()
  33. ''' this function is called by tws_event_handler upon
  34. receiving nextValidId reply from TWS
  35. '''
  36. def update_next_valid_id(self, orderId):
  37. self.next_valid_q.put(orderId)
  38. self.next_valid_q.task_done()
  39. self.tws_pending = False
  40. def set_stop(self):
  41. self.stop = True
  42. logging.info('OrderIdManager: set stop flag to true')
  43. def run(self):
  44. while not self.stop:
  45. if not self.id_request_q.empty():
  46. if not self.next_valid_q.empty():
  47. next_valid_id = self.next_valid_q.get()
  48. print 'get next valid %d next_valid_q size->%d' % (next_valid_id, self.next_valid_q.qsize())
  49. client_id, clordid = self.id_request_q.get()
  50. #
  51. # report the assigned id back to the client side
  52. self.dispatch_event('OrderRequestAck', {'client_id': client_id, 'clordid': clordid, 'next_valid_id': next_valid_id})
  53. self.clordid_id[clordid] = {'client_id': client_id, 'next_valid_id': next_valid_id}
  54. else:
  55. #
  56. # request a next id from TWS
  57. if not self.tws_pending:
  58. self.tws_pending = True
  59. logging.info('OrderIdManager: before call to tws_conn.reqIds for the next avail id')
  60. self.tws_conn.reqIds(-1)
  61. # to prevent excessive CPU use
  62. sleep(0.1)
  63. def assigned_id(self, clordid):
  64. try:
  65. return self.clordid_id[clordid]
  66. except KeyError:
  67. return None
  68. def dispatch_event(self, event, dict):
  69. logging.info ('OrderIdManager: [%s] => %s' % (event, json.dumps(dict)))
  70. class OrderIdManagerFake(threading.Thread):
  71. def __init__(self, tws_conn):
  72. threading.Thread.__init__(self)
  73. self.id_request_q = Queue()
  74. self.next_valid_q = Queue()
  75. self.tws_conn = tws_conn
  76. self.stop = False
  77. self.tws_pending = False
  78. #wait_th = threading.Thread(target=get_user_input, args=(response,))
  79. self.fake_id = 1
  80. '''
  81. expects a client order id of md5
  82. '''
  83. def on_id_request(self, client_id, clordid):
  84. self.id_request_q.put((client_id, clordid))
  85. self.id_request_q.task_done()
  86. ''' TWS events
  87. '''
  88. def nextValidId(self, orderId):
  89. self.next_valid_q.put(orderId)
  90. self.next_valid_q.task_done()
  91. self.tws_pending = False
  92. def run(self):
  93. while not self.stop:
  94. if not self.id_request_q.empty():
  95. print 'id_request_q size %d' % self.id_request_q.qsize()
  96. if not self.next_valid_q.empty():
  97. next_valid_id = self.next_valid_q.get()
  98. print 'get next valid %d next_valid_q size->%d' % (next_valid_id, self.next_valid_q.qsize())
  99. client_id, clordid = self.id_request_q.get()
  100. #
  101. # dispatch ?
  102. self.dispatch_event('OrderRequestAck', {'client_id': client_id, 'clordid': clordid, 'next_valid_id': next_valid_id})
  103. else:
  104. #
  105. # request a next id from TWS
  106. if not self.tws_pending:
  107. self.tws_pending = True
  108. #self.tws_conn.reqIds(-1)
  109. self.fake_gen_id()
  110. # to prevent excessive CPU use
  111. sleep(0.1)
  112. def fake_gen_id(self):
  113. print 'fake id %d' % self.fake_id
  114. sleep(0.4)
  115. self.nextValidId(self.fake_id)
  116. self.fake_id+=1
  117. def dispatch_event(self, event, dict):
  118. #logging.info('OrderIdManager: [%s] => %s', (event, json.dumps(dict)))
  119. print 'OrderIdManager: [%s] => %s' % (event, json.dumps(dict))
  120. class OrderBook(Subscriber):
  121. OPEN_ORDER_STATUS = ['PendingSubmit', 'PendingCancel','PreSubmitted','Submitted',
  122. 'Inactive']
  123. def __init__(self, name):
  124. '''
  125. orderbook:
  126. '''
  127. Subscriber.__init__(self, name)
  128. self.name = name
  129. self.open_order = False
  130. self.orders = {}
  131. def handle_order_status(self, orderId, status, filled, remaining, avgFillPrice, permId, parentId, lastFillPrice, clientId, whyHeld):
  132. try:
  133. _ = self.orders[orderId]
  134. except KeyError:
  135. self.orders[orderId]= {'ord_status':{}, 'error':{}, 'order':{}, 'contract': {}, 'state':{}}
  136. self.orders[orderId]['ord_status']['status'] = status
  137. self.orders[orderId]['ord_status']['filled'] = filled
  138. self.orders[orderId]['ord_status']['remaining'] = remaining
  139. self.orders[orderId]['ord_status']['avgFillPrice'] = avgFillPrice
  140. self.orders[orderId]['ord_status']['permId'] = permId
  141. self.orders[orderId]['ord_status']['parentId'] = parentId
  142. self.orders[orderId]['ord_status']['lastFillPrice'] = lastFillPrice
  143. self.orders[orderId]['ord_status']['clientId'] = clientId
  144. self.orders[orderId]['ord_status']['whyHeld'] = whyHeld
  145. def handle_error(self, id, errorCode, errorMsg):
  146. try:
  147. _ = self.orders[id]
  148. except KeyError:
  149. self.orders[id]= {'ord_status':{}, 'error':{}, 'order':{}, 'contract': {}}
  150. self.orders[id]['error'] = {'errorCode': errorCode, 'errorMsg': errorMsg}
  151. def handle_open_order(self, orderId, state, order, contract):
  152. try:
  153. _ = self.orders[orderId]
  154. except KeyError:
  155. self.orders[orderId]= {'ord_status':{}, 'error':{}, 'order':{}, 'contract': {}, 'state':{}}
  156. self.orders[orderId]['order'] = order
  157. self.orders[orderId]['contract'] = contract
  158. self.orders[orderId]['state'] = state
  159. def update(self, event, **param):
  160. if event == 'orderStatus':
  161. self.handle_order_status(**param)
  162. elif event == 'openOrder':
  163. self.open_order = True
  164. self.handle_open_order(**param)
  165. elif event == 'openOrderEnd':
  166. self.open_order = False
  167. elif event == 'error':
  168. try:
  169. id = param['id']
  170. if id <> -1:
  171. self.handle_error(**param)
  172. except:
  173. logging.error('OrderBook ERROR: in processing tws error event %s' % param)
  174. return
  175. def get_order_status(self, orderId):
  176. try:
  177. orderId = int(orderId)
  178. return self.orders[orderId]
  179. except:
  180. return None
  181. def get_open_orders(self):
  182. def filter_order_by_type(id_ord):
  183. try:
  184. if id_ord[1]['ord_status']['status'] in OrderBook.OPEN_ORDER_STATUS:
  185. return id_ord[0]
  186. except:
  187. pass
  188. return None
  189. try:
  190. i = 0
  191. while self.open_order:
  192. sleep(0.5)
  193. i += 1
  194. logging.warn('OrderBook: waiting open_order status to change to finish: round # %d... ' % i)
  195. if i == 10:
  196. raise OrderManagerException
  197. res = map(filter_order_by_type, [(id, orders) for id, orders in self.orders.iteritems()])
  198. return res
  199. except:
  200. logging.error('OrderBook: get_open_orders exception %s' % traceback.format_exc())
  201. return None
  202. '''
  203. client side order manager
  204. '''
  205. class OrderManager(BaseMessageListener):
  206. def __init__(self, name, gw_parent, kwargs):
  207. self.name = name
  208. self.gw_parent = gw_parent
  209. self.tws_connect = self.gw_parent.get_tws_connection()
  210. self.tws_event_handler = gw_parent.get_tws_event_handler()
  211. self.order_book = OrderBook(self.name)
  212. self.order_id_mgr = OrderIdManager(self.tws_connect)
  213. self.tws_event_handler.set_order_id_manager(self.order_id_mgr)
  214. '''
  215. ask tws_event_handler to forward order messages to
  216. the order_book class
  217. '''
  218. for e in TWS_event_handler.PUBLISH_TWS_EVENTS:
  219. self.tws_event_handler.register(e, self.order_book)
  220. def start_order_manager(self):
  221. self.order_id_mgr.start()
  222. '''
  223. rebuild order book
  224. ****
  225. persitence function missing!!
  226. to be implemented
  227. '''
  228. self.tws_connect.reqOpenOrders()
  229. logging.info('OrderManager: start_order_manager: request open orders')
  230. def get_order_id_mgr(self):
  231. return self.order_id_mgr
  232. def get_order_book(self):
  233. return self.order_book
  234. def is_id_in_order_book(self, id):
  235. if self.get_order_book().get_order_status(id):
  236. return True
  237. return False
  238. if __name__ == '__main__':
  239. oim = OrderIdManagerFake(1)
  240. def do_stuff(client):
  241. for i in range(10):
  242. oim.on_id_request(client, str(uuid.uuid4()))
  243. sleep(np.random.uniform(0, 1) * 0.5)
  244. for j in range(2):
  245. threading.Thread(target=do_stuff, args=(['client%d'% j])).start()
  246. #sleep(2)
  247. oim.start()