| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- """Adapters for connecting time-series data to Bokeh visualizations
- To use Bokeh visualizations, do the following
- 1. Create an instance of BokehPlotManager
- 2. Register all plots with the BokehPlotManager using BokehPlot objects.
- BokehPlot objects provide the basic formatting of the plots.
- 3. Call BokehPlotManager's start() routine to start the visualization
- 4. Make BokehPlotManager subscribe to the event streams.
- TODO: Step 2 and step 4 should be combined into one
- TODO: Currently, we do not support BokehPlot's with multiple plots
- TODO: formatting, etc
- """
- """Define an event type for time-series data from sensors.
- from collections import namedtuple
- # Define a sensor event as a tuple of sensor id, timestamp, and value.
- # A 'sensor' is just a generator of sensor events.
- SensorEvent = namedtuple('SensorEvent', ['sensor_id', 'ts', 'val'])
- TODO: Write automated tests
- """
- import logging
- import functools
- from math import pi
- import threading, queue
- from bokeh.plotting import figure, curdoc
- from bokeh.layouts import column # to show two or more plots arranged in a column
- from bokeh.models import ColumnDataSource
- from bokeh.client import push_session
- from thingflow.base import Filter
- logger = logging.getLogger(__name__)
- TOOLS="pan,wheel_zoom,box_zoom,reset,save"
- tooltips=[
- ("Open", "@Open"),
- ("Close", "@Close"),
- ("High", "@High"),
- ("Low", "@Low"),
- ("Volume", "@Volume")
- ]
- def bokeh_timeseries_mapper(events):
- # a row is 'timestamp', 'datetime', 'sensor_id', 'value'
- ts = [ ]
- value = [ ]
- for r in events:
- t = float(r.ts)
- print(t)
- # dt = datetime.datetime.utcfromtimestamp(t)
- ts.append(t)
- # dttm.append(r['datetime'])
- value.append(r.val)
- return { 'timestamp' : ts, 'value' : value }
-
- def bokeh_default_mapper(csv):
- return csv
- class BokehPlotWorker(threading.Thread):
- def __init__(self, plotters):
- threading.Thread.__init__(self)
- self.plotters = plotters
- def update(self, name):
- print("In update")
- whichqueue = self.plotters[name]['queue']
- whichsource = self.plotters[name]['plot_specs'].source
- try:
- data = whichqueue.get_nowait()
- if data:
- # ts = datetime.datetime.fromtimestamp(data.ts)
- ts = (data.ts)
- val = data.val
- print('data = ', data)
- new_data = dict(timestamp=[ts], value=[val])
- print('newdata = ', new_data)
- whichsource.stream(new_data)
- except queue.Empty:
- pass
- def make_fig(self, plot_source):
- plot_specs = plot_source['plot_specs']
- p = figure(plot_height=400, tools=TOOLS, y_axis_location='left', title=plot_specs.name)
- p.xaxis.axis_label = plot_specs.x_axis_label
- p.yaxis.axis_label = plot_specs.y_axis_label
- p.x_range.follow = "end"
- p.x_range.follow_interval = 10
- p.x_range.range_padding = 0
- # p.xaxis.formatter=DatetimeTickFormatter(dict(seconds=["%S"],minutes=["%M"],hours=["%d %B %Y"],days=["%d %B %Y"],months=["%d %B %Y"],years=["%d %B %Y"]))
- p.xaxis.major_label_orientation = pi/4
- p.line(x=plot_specs.x_axis_label, y=plot_specs.y_axis_label, color="blue", source=plot_specs.source)
- p.circle(x=plot_specs.x_axis_label, y=plot_specs.y_axis_label, color="red", source=plot_specs.source)
- curdoc().add_periodic_callback(functools.partial(self.update, name=plot_specs.name), plot_specs.update_period) #period in ms
- return p
- def run(self):
- print("In thread.run")
- self.figs = [self.make_fig(self.plotters[name]) for name in self.plotters]
- self.session = push_session(curdoc())
- self.session.show(column(self.figs))
- curdoc().title = 'AntEvent Streams'
- self.session.loop_until_closed()
- class BokehPlot(object):
- def __init__(self, name, y_axis_label="", x_axis_label="timestamp", update_period_in_ms=500):
- self.name = name
- self.x_axis_label = x_axis_label
- self.y_axis_label = y_axis_label
- self.update_period = update_period_in_ms
- self.source = ColumnDataSource(dict({ self.x_axis_label: [], self.y_axis_label: []} ))
- class BokehPlotManager(Filter):
- def __init__(self):
- super().__init__()
- self.plotters = { }
- self.open_for_registration = True
- self.started = False
- def register(self, plot):
- if self.open_for_registration:
- self.plotters[plot.name] = { 'queue' : queue.Queue(), 'plot_specs' : plot }
- else:
- raise Exception("Bokeh Adapter: Plot manager does not dynamically add registrations.")
- def start(self):
- self.open_for_registration = False
- self.bokeh_plot_worker = BokehPlotWorker(self.plotters)
- self.bokeh_plot_worker.start()
- self.started = True
- def on_next(self, t):
- whichplot, data = t
- assert self.started, "BokehPlotManager: Data sent without initialization"
- if whichplot in self.plotters:
- self.plotters[whichplot]['queue'].put(data)
- else:
- raise Exception("Plot %s not found among registered plots", whichplot)
- def on_completed(self):
- exit(1)
- def on_error(self):
- pass
- # The following is deprecated. Use BokehPlotManager
- class BokehOutputWorker(threading.Thread):
- source = ColumnDataSource(dict(timestamp=[], value=[]))
- def __init__(self, sensor_id, datasource):
- threading.Thread.__init__(self)
- self.q = datasource
- self.title = sensor_id
- self.counter = 0
- def update(self):
- print("In update")
- try:
- data = self.q.get_nowait()
- if data:
- print('data = ', data)
- ts = data.ts
- val = data.val
- new_data = dict(timestamp=[ts], value=[val])
- self.source.stream(new_data, 300)
- self.counter = 0
- except queue.Empty:
- pass
- self.counter = self.counter + 1
- if self.counter == 10:
- exit(0)
- def run(self):
- print("In thread.run")
- self.p = figure(plot_height=500, tools=TOOLS, y_axis_location='left', title=self.title)
- self.p.x_range.follow = "end"
- self.p.xaxis.axis_label = "Timestamp"
- self.p.x_range.follow_interval = 100
- self.p.x_range.range_padding = 0
- self.p.line(x="timestamp", y="value", color="blue", source=self.source)
- self.p.circle(x="timestamp", y="value", color="red", source=self.source)
- self.session = push_session(curdoc())
- curdoc().add_periodic_callback(self.update, 100) #period in ms
- self.session.show(column(self.p))
- curdoc().title = 'Sensor'
- self.session.loop_until_closed()
- # def register(self, d, sourceq):
- # source = ColumnDataSource(dict(d))
- # self.p.line(x=d[0], y=d[1], color="orange", source=source)
- # curdoc().add_periodic_callback(self.update, 100) #period in ms
- class BokehStreamer(Filter):
- def __init__(self, initial_csv, io_loop=None):
- super().__init__()
- self.q = queue.Queue()
- self.bokeh_worker = BokehOutputWorker("Sensor", self.q)
- self.bokeh_worker.start()
- def on_next(self, x):
- print("next:", x)
- self.q.put(x)
- def on_completed(self):
- self.q.join()
- self.bokeh_worker.stop()
- self._dispatch_completed()
- def on_error(self, e):
- self.q.join()
- self._dispatch_error(e)
- def bokeh_output_streaming(csv):
- """Write an event stream to a Bokeh visualizer
- """
- BokehStreamer(csv)
|