tick_datastore.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. import logging
  2. import json
  3. from threading import RLock
  4. from misc2.observer import Publisher
  5. from misc2.observer import NotImplementedException
  6. from misc2.helpers import ContractHelper
  7. from comms.ibc.base_client_messaging import AbstractGatewayListener
  8. import traceback
  9. from finopt.instrument import Symbol
  10. class TickDataStore(Publisher):
  11. """
  12. Data structure:
  13. """
  14. '''
  15. EVENT_TICK_UPDATED
  16. param = {'update_mode': A|D|U <- add/delete/update,
  17. 'name': name_of_this_oc,
  18. 'instrument': the option associated with this event
  19. }
  20. EVENT_UNDERLYING_ADDED
  21. param = {'update_mode':
  22. 'name':
  23. 'instrument':
  24. '''
  25. EVENT_TICK_UPDATED = 'tds_event_tick_updated'
  26. EVENT_SYMBOL_ADDED = 'tds_event_symbol_added'
  27. EVENT_SYMBOL_DELETED = 'tds_event_symbol_deleted'
  28. TDS_EVENTS = [EVENT_TICK_UPDATED, EVENT_SYMBOL_ADDED, EVENT_SYMBOL_DELETED]
  def __init__(self, name):
    """Create an empty tick data store.

    name -- label of this store; it is sent as the 'name' entry of the
            symbol added / symbol deleted event payloads.
    """
    # contract key -> {'syms': [symbol, ...]}; populated by add_symbol().
    self.symbols = dict()
    self.name = name
    # Re-entrant lock taken by every method that touches self.symbols.
    self.lock = RLock()
    # Hand the names of the events this store can emit to the Publisher base.
    Publisher.__init__(self, TickDataStore.TDS_EVENTS)
    # Set once here; nothing in this part of the class reads it back.
    self.first_run = True
  35. def register_listener(self, listener):
  36. try:
  37. map(lambda e: self.register(e, listener, getattr(listener, e)), TickDataStore.TDS_EVENTS)
  38. except AttributeError as e:
  39. logging.error("TickDataStore:register_listener. Function not implemented in the listener. %s" % e)
  40. raise NotImplementedException
  def dump(self):
    """Print a comma-separated table of the latest tick values to stdout.

    One header line is printed, then one row per key in self.symbols. Only
    the first symbol stored under a key is reported. A value that is missing
    (None), cannot be read, or cannot be rendered with its numeric format is
    shown as a blank column, so a half-populated store never aborts the dump.
    """
    price_fmt = '%8.2f'
    qty_fmt = '%8d'

    def format_tick_val(val, fmt):
      # Blank filler with the same width a formatted number would have.
      blank = ' ' * len(fmt % (0))
      if val is None:
        return blank
      try:
        return fmt % (val)
      except (TypeError, ValueError):
        # Non-numeric tick value: keep the table aligned instead of raising.
        return blank

    def get_field(syms, fld_id):
      # syms may be empty once every symbol of a key has been deleted, and
      # the symbol may not hold the field yet; both cases mean "no value".
      try:
        return syms[0].get_tick_value(fld_id)
      except Exception:
        return None

    # Column order of each row: (tick field id, format used to render it).
    columns = [
      (Symbol.LAST, price_fmt),
      (Symbol.BIDSIZE, qty_fmt),
      (Symbol.BID, price_fmt),
      (Symbol.ASK, price_fmt),
      (Symbol.ASKSIZE, qty_fmt),
      (Symbol.CLOSE, price_fmt),
      (Symbol.VOLUME, qty_fmt),
    ]

    # items() exists on both Python 2 and 3 (iteritems() is Python 2 only);
    # list() snapshots the entries and all rows are built before printing.
    rows = []
    for key, entry in list(self.symbols.items()):
      fields = ','.join([format_tick_val(get_field(entry['syms'], fld_id), fmt)
                         for fld_id, fmt in columns])
      rows.append((key, fields))

    print('%40s,%8s,%8s,%8s,%8s,%8s,%8s,%8s\n' % ('SYM', 'LAST', 'BIDSIZE','BID','ASK','ASKSIZE','CLOSE','VOLUME'
                                  ))
    for key, fields in rows:
      print('[%s]%s' % (key.ljust(40), fields))
  def is_symbol_in_list(self, symbol, list):
    """Tell whether *symbol* itself is one of the elements of *list*.

    The comparison is by object identity (``is``), not equality: an element
    that merely compares equal to *symbol* does not count. Returns False for
    an empty *list*.
    """
    return any(candidate is symbol for candidate in list)
  73. def add_symbol(self, symbol):
  74. try:
  75. dispatch = True
  76. self.lock.acquire()
  77. key = symbol.get_key()
  78. if key not in self.symbols:
  79. self.symbols[key] = {'syms': [symbol]}
  80. else:
  81. if not self.is_symbol_in_list(symbol, self.symbols[key]['syms']):
  82. self.symbols[key]['syms'].append(symbol)
  83. except KeyError:
  84. dispatch = False
  85. logging.error('TickDataStore: add_symbol. Exception when adding symbol:%s' % key)
  86. finally:
  87. self.lock.release()
  88. if dispatch:
  89. self.dispatch(TickDataStore.EVENT_SYMBOL_ADDED, {'update_mode': 'A',
  90. 'name': self.name,
  91. 'instrument' : symbol})
  92. def del_symbol(self, symbol):
  93. try:
  94. dispatch = True
  95. self.lock.acquire()
  96. key = symbol.get_key()
  97. if key not in self.symbols:
  98. return
  99. else:
  100. for s in self.symbols[key]['syms']:
  101. if s is symbol:
  102. self.symbols[key]['syms'].remove(s)
  103. except KeyError:
  104. dispatch = False
  105. logging.error('TickDataStore: del_symbol. Exception when deleting symbol:%s' % key)
  106. finally:
  107. self.lock.release()
  108. if dispatch:
  109. self.dispatch(TickDataStore.EVENT_SYMBOL_DELETED, {'update_mode': 'D',
  110. 'name': self.name,
  111. 'instrument' : symbol})
  112. def set_symbol_tick_price(self, contract_key, field, price, canAutoExecute):
  113. logging.debug('set_symbol_price: -------------------')
  114. try:
  115. self.lock.acquire()
  116. if contract_key in self.symbols:
  117. logging.debug('set_symbol_tick_price: ***** sym key= : %s' % contract_key)
  118. logging.debug('set_symbol_tick_price: ***** sym= : %s' % str(self.symbols[contract_key]['syms']))
  119. map(lambda e: e.set_tick_value(field, price), self.symbols[contract_key]['syms'])
  120. self.dispatch(TickDataStore.EVENT_TICK_UPDATED, {'contract_key': contract_key, 'field': field,
  121. 'price': price, 'syms': self.symbols[contract_key]['syms']})
  122. except:
  123. # contract not set up in the datastore, ignore message
  124. logging.error('tick_datastore:set_symbol_tick_price: exception occured to: %s. Exception could have been triggered due to the dispatched client processing logic' % contract_key)
  125. logging.error(traceback.format_exc())
  126. #self.dump()
  127. pass
  128. finally:
  129. self.lock.release()
  130. def set_symbol_analytics(self, contract_key, field, value):
  131. logging.debug('set_symbol_analytics: -------------------')
  132. try:
  133. self.lock.acquire()
  134. if contract_key in self.symbols:
  135. map(lambda e: e.set_tick_value(field, value), self.symbols[contract_key]['syms'])
  136. except:
  137. # contract not set up in the datastore, ignore message
  138. logging.error('set_symbol_price: exception occured to: %s' % contract_key)
  139. #self.dump()
  140. pass
  141. finally:
  142. self.lock.release()
  143. def set_symbol_tick_size(self, contract_key, field, size):
  144. logging.debug('set_symbol_size: -------------------')
  145. try:
  146. self.lock.acquire()
  147. if contract_key in self.symbols:
  148. map(lambda e: e.set_tick_value(field, size), self.symbols[contract_key]['syms'])
  149. except:
  150. # contract not set up in the datastore, ignore message
  151. logging.error('set_symbol_size: exception occured to: %s' % contract_key)
  152. #self.dump()
  153. pass
  154. finally:
  155. self.lock.release()
  156. #self.dispatch(TickDataStore.EVENT_TICK_UPDATED, {'contract_key': contract_key, 'field': field,
  157. # 'size': size})