bokeh.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
"""Adapters for connecting time-series data to Bokeh visualizations.

To use Bokeh visualizations, do the following:

1. Create an instance of BokehPlotManager.
2. Register all plots with the BokehPlotManager using BokehPlot objects.
   BokehPlot objects provide the basic formatting of the plots.
3. Call BokehPlotManager's start() routine to start the visualization.
4. Make BokehPlotManager subscribe to the event streams.

TODO: Step 2 and step 4 should be combined into one.
TODO: Currently, we do not support BokehPlots with multiple plots.
TODO: Formatting, etc.
"""
  12. """Define an event type for time-series data from sensors.
  13. from collections import namedtuple
  14. # Define a sensor event as a tuple of sensor id, timestamp, and value.
  15. # A 'sensor' is just a generator of sensor events.
  16. SensorEvent = namedtuple('SensorEvent', ['sensor_id', 'ts', 'val'])
  17. TODO: Write automated tests
  18. """
  19. import logging
  20. import functools
  21. from math import pi
  22. import threading, queue
  23. from bokeh.plotting import figure, curdoc
  24. from bokeh.layouts import column # to show two or more plots arranged in a column
  25. from bokeh.models import ColumnDataSource
  26. from bokeh.client import push_session
  27. from thingflow.base import Filter
  28. logger = logging.getLogger(__name__)
  29. TOOLS="pan,wheel_zoom,box_zoom,reset,save"
  30. tooltips=[
  31. ("Open", "@Open"),
  32. ("Close", "@Close"),
  33. ("High", "@High"),
  34. ("Low", "@Low"),
  35. ("Volume", "@Volume")
  36. ]
  37. def bokeh_timeseries_mapper(events):
  38. # a row is 'timestamp', 'datetime', 'sensor_id', 'value'
  39. ts = [ ]
  40. value = [ ]
  41. for r in events:
  42. t = float(r.ts)
  43. print(t)
  44. # dt = datetime.datetime.utcfromtimestamp(t)
  45. ts.append(t)
  46. # dttm.append(r['datetime'])
  47. value.append(r.val)
  48. return { 'timestamp' : ts, 'value' : value }
  49. def bokeh_default_mapper(csv):
  50. return csv
  51. class BokehPlotWorker(threading.Thread):
  52. def __init__(self, plotters):
  53. threading.Thread.__init__(self)
  54. self.plotters = plotters
  55. def update(self, name):
  56. print("In update")
  57. whichqueue = self.plotters[name]['queue']
  58. whichsource = self.plotters[name]['plot_specs'].source
  59. try:
  60. data = whichqueue.get_nowait()
  61. if data:
  62. # ts = datetime.datetime.fromtimestamp(data.ts)
  63. ts = (data.ts)
  64. val = data.val
  65. print('data = ', data)
  66. new_data = dict(timestamp=[ts], value=[val])
  67. print('newdata = ', new_data)
  68. whichsource.stream(new_data)
  69. except queue.Empty:
  70. pass
  71. def make_fig(self, plot_source):
  72. plot_specs = plot_source['plot_specs']
  73. p = figure(plot_height=400, tools=TOOLS, y_axis_location='left', title=plot_specs.name)
  74. p.xaxis.axis_label = plot_specs.x_axis_label
  75. p.yaxis.axis_label = plot_specs.y_axis_label
  76. p.x_range.follow = "end"
  77. p.x_range.follow_interval = 10
  78. p.x_range.range_padding = 0
  79. # p.xaxis.formatter=DatetimeTickFormatter(dict(seconds=["%S"],minutes=["%M"],hours=["%d %B %Y"],days=["%d %B %Y"],months=["%d %B %Y"],years=["%d %B %Y"]))
  80. p.xaxis.major_label_orientation = pi/4
  81. p.line(x=plot_specs.x_axis_label, y=plot_specs.y_axis_label, color="blue", source=plot_specs.source)
  82. p.circle(x=plot_specs.x_axis_label, y=plot_specs.y_axis_label, color="red", source=plot_specs.source)
  83. curdoc().add_periodic_callback(functools.partial(self.update, name=plot_specs.name), plot_specs.update_period) #period in ms
  84. return p
  85. def run(self):
  86. print("In thread.run")
  87. self.figs = [self.make_fig(self.plotters[name]) for name in self.plotters]
  88. self.session = push_session(curdoc())
  89. self.session.show(column(self.figs))
  90. curdoc().title = 'AntEvent Streams'
  91. self.session.loop_until_closed()
  92. class BokehPlot(object):
  93. def __init__(self, name, y_axis_label="", x_axis_label="timestamp", update_period_in_ms=500):
  94. self.name = name
  95. self.x_axis_label = x_axis_label
  96. self.y_axis_label = y_axis_label
  97. self.update_period = update_period_in_ms
  98. self.source = ColumnDataSource(dict({ self.x_axis_label: [], self.y_axis_label: []} ))
  99. class BokehPlotManager(Filter):
  100. def __init__(self):
  101. super().__init__()
  102. self.plotters = { }
  103. self.open_for_registration = True
  104. self.started = False
  105. def register(self, plot):
  106. if self.open_for_registration:
  107. self.plotters[plot.name] = { 'queue' : queue.Queue(), 'plot_specs' : plot }
  108. else:
  109. raise Exception("Bokeh Adapter: Plot manager does not dynamically add registrations.")
  110. def start(self):
  111. self.open_for_registration = False
  112. self.bokeh_plot_worker = BokehPlotWorker(self.plotters)
  113. self.bokeh_plot_worker.start()
  114. self.started = True
  115. def on_next(self, t):
  116. whichplot, data = t
  117. assert self.started, "BokehPlotManager: Data sent without initialization"
  118. if whichplot in self.plotters:
  119. self.plotters[whichplot]['queue'].put(data)
  120. else:
  121. raise Exception("Plot %s not found among registered plots", whichplot)
  122. def on_completed(self):
  123. exit(1)
  124. def on_error(self):
  125. pass
  126. # The following is deprecated. Use BokehPlotManager
  127. class BokehOutputWorker(threading.Thread):
  128. source = ColumnDataSource(dict(timestamp=[], value=[]))
  129. def __init__(self, sensor_id, datasource):
  130. threading.Thread.__init__(self)
  131. self.q = datasource
  132. self.title = sensor_id
  133. self.counter = 0
  134. def update(self):
  135. print("In update")
  136. try:
  137. data = self.q.get_nowait()
  138. if data:
  139. print('data = ', data)
  140. ts = data.ts
  141. val = data.val
  142. new_data = dict(timestamp=[ts], value=[val])
  143. self.source.stream(new_data, 300)
  144. self.counter = 0
  145. except queue.Empty:
  146. pass
  147. self.counter = self.counter + 1
  148. if self.counter == 10:
  149. exit(0)
  150. def run(self):
  151. print("In thread.run")
  152. self.p = figure(plot_height=500, tools=TOOLS, y_axis_location='left', title=self.title)
  153. self.p.x_range.follow = "end"
  154. self.p.xaxis.axis_label = "Timestamp"
  155. self.p.x_range.follow_interval = 100
  156. self.p.x_range.range_padding = 0
  157. self.p.line(x="timestamp", y="value", color="blue", source=self.source)
  158. self.p.circle(x="timestamp", y="value", color="red", source=self.source)
  159. self.session = push_session(curdoc())
  160. curdoc().add_periodic_callback(self.update, 100) #period in ms
  161. self.session.show(column(self.p))
  162. curdoc().title = 'Sensor'
  163. self.session.loop_until_closed()
  164. # def register(self, d, sourceq):
  165. # source = ColumnDataSource(dict(d))
  166. # self.p.line(x=d[0], y=d[1], color="orange", source=source)
  167. # curdoc().add_periodic_callback(self.update, 100) #period in ms
  168. class BokehStreamer(Filter):
  169. def __init__(self, initial_csv, io_loop=None):
  170. super().__init__()
  171. self.q = queue.Queue()
  172. self.bokeh_worker = BokehOutputWorker("Sensor", self.q)
  173. self.bokeh_worker.start()
  174. def on_next(self, x):
  175. print("next:", x)
  176. self.q.put(x)
  177. def on_completed(self):
  178. self.q.join()
  179. self.bokeh_worker.stop()
  180. self._dispatch_completed()
  181. def on_error(self, e):
  182. self.q.join()
  183. self._dispatch_error(e)
  184. def bokeh_output_streaming(csv):
  185. """Write an event stream to a Bokeh visualizer
  186. """
  187. BokehStreamer(csv)