tick_datastore.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. import logging
  2. import json
  3. from threading import RLock
  4. from misc2.observer import Publisher
  5. from misc2.observer import NotImplementedException
  6. from misc2.helpers import ContractHelper
  7. from comms.ibc.base_client_messaging import AbstractGatewayListener
  8. import symbol
  9. class TickDataStore(Publisher):
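  """
  Store of tick data symbols, grouped by contract key.

  Data structure:
      self.symbols = {contract_key: {'syms': [symbol, ...]}}
  """
  '''
  EVENT_TICK_UPDATED
      param = {'contract_key': key of the contract whose tick changed,
               'field': id of the tick field that was set,
               'price': the new value of that field,
               'syms': list of symbols stored under contract_key
              }
  EVENT_SYMBOL_ADDED / EVENT_SYMBOL_DELETED
      param = {'update_mode': A|D <- add/delete,
               'name': name of this tick data store,
               'instrument': the symbol associated with this event
              }
  '''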
  # Event names published by this store. register_listener looks up an
  # attribute with exactly the same name on each listener.
  EVENT_TICK_UPDATED = 'tds_event_tick_updated'
  EVENT_SYMBOL_ADDED = 'tds_event_symbol_added'
  EVENT_SYMBOL_DELETED = 'tds_event_symbol_deleted'
  # Every event this store hands to Publisher.__init__.
  TDS_EVENTS = [
      EVENT_TICK_UPDATED,
      EVENT_SYMBOL_ADDED,
      EVENT_SYMBOL_DELETED,
  ]
  def __init__(self, name):
      """Create an empty store labelled *name*.

      :param name: label of this store; it is sent as the 'name' entry of
          the symbol added/deleted event payloads.
      """
      # Re-entrant lock taken by the methods that read or change self.symbols.
      self.lock = RLock()
      self.name = name
      # contract key -> {'syms': [symbol, ...]}
      self.symbols = {}
      Publisher.__init__(self, TickDataStore.TDS_EVENTS)
      self.first_run = True
  34. def register_listener(self, listener):
  35. try:
  36. map(lambda e: self.register(e, listener, getattr(listener, e)), TickDataStore.TDS_EVENTS)
  37. except AttributeError as e:
  38. logging.error("TickDataStore:register_listener. Function not implemented in the listener. %s" % e)
  39. raise NotImplementedException
  40. def dump(self):
  41. def format_tick_val(val, fmt):
  42. if val == None:
  43. length = len(fmt % (0))
  44. return ' ' * length
  45. return fmt % (val)
  46. # last, bidq, bid, ask, askq, imvol, delta, theta
  47. fmt_spec = '%8.2f'
  48. fmt_spec2 = '%8.4f'
  49. fmt_specq = '%8d'
  50. def get_field(sym, fld_id):
  51. try:
  52. return sym[0].get_tick_value(fld_id)
  53. except:
  54. return ''
  55. fmt_sym = map(lambda x: (x[0], '%s,%s,%s,%s,%s,%s' % (
  56. format_tick_val(get_field(x[1]['syms'],4), fmt_spec),
  57. format_tick_val(get_field(x[1]['syms'],0), fmt_specq),
  58. format_tick_val(get_field(x[1]['syms'],1), fmt_spec),
  59. format_tick_val(get_field(x[1]['syms'],2), fmt_spec),
  60. format_tick_val(get_field(x[1]['syms'],3), fmt_specq),
  61. format_tick_val(get_field(x[1]['syms'],9), fmt_spec),
  62. )), [(k,v) for k, v in self.symbols.iteritems()])
  63. for e in fmt_sym:
  64. print('[%s]%s' % (e[0].ljust(40), e[1]))
  def is_symbol_in_list(self, symbol, list):
      """Return True if *symbol* itself (the same object) is in *list*.

      Membership is decided by identity (``is``), not by equality, so an
      equal but distinct object does not count as present.
      """
      return any(candidate is symbol for candidate in list)
  70. def add_symbol(self, symbol):
  71. try:
  72. dispatch = True
  73. self.lock.acquire()
  74. key = symbol.get_key()
  75. if key not in self.symbols:
  76. self.symbols[key] = {'syms': [symbol]}
  77. else:
  78. if not self.is_symbol_in_list(symbol, self.symbols[key]['syms']):
  79. self.symbols[key]['syms'].append(symbol)
  80. except KeyError:
  81. dispatch = False
  82. logging.error('TickDataStore: add_symbol. Exception when adding symbol:%s' % key)
  83. finally:
  84. self.lock.release()
  85. if dispatch:
  86. self.dispatch(TickDataStore.EVENT_SYMBOL_ADDED, {'update_mode': 'A',
  87. 'name': self.name,
  88. 'instrument' : symbol})
  89. def del_symbol(self, symbol):
  90. try:
  91. dispatch = True
  92. self.lock.acquire()
  93. key = symbol.get_key()
  94. if key not in self.symbols:
  95. return
  96. else:
  97. for s in self.symbols[key]['syms']:
  98. if s is symbol:
  99. self.symbols[key]['syms'].remove(s)
  100. except KeyError:
  101. dispatch = False
  102. logging.error('TickDataStore: del_symbol. Exception when deleting symbol:%s' % key)
  103. finally:
  104. self.lock.release()
  105. if dispatch:
  106. self.dispatch(TickDataStore.EVENT_SYMBOL_DELETED, {'update_mode': 'D',
  107. 'name': self.name,
  108. 'instrument' : symbol})
  109. def set_symbol_tick_price(self, contract_key, field, price, canAutoExecute):
  110. logging.debug('set_symbol_price: -------------------')
  111. try:
  112. self.lock.acquire()
  113. if contract_key in self.symbols:
  114. logging.info('set_symbol_tick_price: ***** sym key= : %s' % contract_key)
  115. logging.info('set_symbol_tick_price: ***** sym= : %s' % str(self.symbols[contract_key]['syms']))
  116. map(lambda e: e.set_tick_value(field, price), self.symbols[contract_key]['syms'])
  117. logging.info('set_symbol_tick_price: <<<<<<<<<')
  118. self.dispatch(TickDataStore.EVENT_TICK_UPDATED, {'contract_key': contract_key, 'field': field,
  119. 'price': price, 'syms': self.symbols[contract_key]['syms']})
  120. except:
  121. # contract not set up in the datastore, ignore message
  122. logging.error('set_symbol_tick_price: exception occured to: %s' % contract_key)
  123. #self.dump()
  124. pass
  125. finally:
  126. self.lock.release()
  127. def set_symbol_analytics(self, contract_key, field, value):
  128. logging.debug('set_symbol_analytics: -------------------')
  129. try:
  130. self.lock.acquire()
  131. if contract_key in self.symbols:
  132. map(lambda e: e.set_tick_value(field, value), self.symbols[contract_key]['syms'])
  133. except:
  134. # contract not set up in the datastore, ignore message
  135. logging.error('set_symbol_price: exception occured to: %s' % contract_key)
  136. #self.dump()
  137. pass
  138. finally:
  139. self.lock.release()
  140. def set_symbol_tick_size(self, contract_key, field, size):
  141. logging.debug('set_symbol_size: -------------------')
  142. try:
  143. self.lock.acquire()
  144. if contract_key in self.symbols:
  145. map(lambda e: e.set_tick_value(field, size), self.symbols[contract_key]['syms'])
  146. except:
  147. # contract not set up in the datastore, ignore message
  148. logging.error('set_symbol_size: exception occured to: %s' % contract_key)
  149. #self.dump()
  150. pass
  151. finally:
  152. self.lock.release()
  153. #self.dispatch(TickDataStore.EVENT_TICK_UPDATED, {'contract_key': contract_key, 'field': field,
  154. # 'size': size})