test_bokeh.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. # Make sure bokeh server is running
  2. # bokeh serve
  3. import asyncio
  4. from utils import ValueListSensor, ValidationSubscriber
  5. from antevents.base import Scheduler, SensorPub, SensorEvent
  6. from antevents.linq.map import map
  7. from antevents.adapters.bokeh import bokeh_output_streaming, BokehStreamer, BokehPlot, BokehPlotManager
  8. import datetime, time
  9. def mk_csv():
  10. sid = 'temp'
  11. val = 0
  12. csv = [ ]
  13. for i in range(3):
  14. ts = time.mktime(datetime.datetime.now().timetuple())
  15. dt = ts
  16. val = val + 1
  17. csv.append([SensorEvent(ts=ts, sensor_id=sid, val=val)])
  18. time.sleep(1)
  19. return csv
  20. def debug():
  21. csv = mk_csv()
  22. bokeh_output(csv)
  23. bokeh_output_streaming(csv)
  24. value_stream = [10, 13, 20, 20, 19, 19, 20, 21, 28, 28, 23, 21, 21, 18, 19, 16, 21,
  25. 10, 13, 20, 20, 19, 19, 20, 21, 28, 28, 23, 21, 21, 18, 19, 16, 21]
  26. value_stream2 = [2, 3, 2, 2, 9, 9, 2, 1, 8, 8, 3, 2, 1, 8, 9, 6, 2, 2, 3, 4, 5, 6, 7, 8, 2, 3, 4,
  27. 6, 2, 3, 2, 2, 9, 9, 2, 1, 8, 8, 3, 2, 1, 8, 9, 6, 2, 2, 3, 4, 5, 6, 7, 8, 2, 3, 4, 6,
  28. 6, 2, 3, 2, 2, 9, 9, 2, 1, 8, 8, 3, 2, 1, 8, 9, 6, 2, 2, 3, 4, 5, 6, 7, 8, 2, 3, 4, 6,
  29. 6, 2, 3, 2, 2, 9, 9, 2, 1, 8, 8, 3, 2, 1, 8, 9, 6, 2, 2, 3, 4, 5, 6, 7, 8, 2, 3, 4, 6,
  30. 6, 2, 3, 2, 2, 9, 9, 2, 1, 8, 8, 3, 2, 1, 8, 9, 6, 2, 2, 3, 4, 5, 6, 7, 8, 2, 3, 4, 6]
  31. def test_bokeh_output():
  32. loop = asyncio.get_event_loop()
  33. s = ValueListSensor(1, value_stream)
  34. p = SensorPub(s)
  35. b = BokehStreamer([ SensorEvent(ts=0,val=10,sensor_id="temp" ) ], io_loop=loop)
  36. p.subscribe(b)
  37. scheduler = Scheduler(loop)
  38. scheduler.schedule_periodic(p, 0.5) # sample twice every second
  39. scheduler.run_forever()
  40. self.assertTrue(vo.completed,
  41. "Schedule exited before validation observer completed")
  42. print("That's all folks")
  43. def test_bokeh_manager():
  44. loop = asyncio.get_event_loop()
  45. s1 = ValueListSensor(1, value_stream)
  46. p1 = SensorPub(s1)
  47. s2 = ValueListSensor(1, value_stream2)
  48. p2 = SensorPub(s2)
  49. bm = BokehPlotManager()
  50. bplot1 = BokehPlot('Sensor1', y_axis_label='value')
  51. bplot2 = BokehPlot('Sensor2', y_axis_label='value')
  52. bm.register(bplot1)
  53. bm.register(bplot2)
  54. p1.map(lambda v: ('Sensor1', v) ).subscribe(bm)
  55. p2.map(lambda v: ('Sensor2', v) ).subscribe(bm)
  56. bm.start()
  57. scheduler = Scheduler(loop)
  58. scheduler.schedule_periodic(p1, 1.0) # sample every second
  59. scheduler.schedule_periodic(p2, 0.5) # sample twice every second
  60. scheduler.run_forever()
  61. # self.assertTrue(vo.completed,
  62. # "Schedule exited before validation observer completed")
  63. print("That's all folks")
  64. if __name__ == "__main__":
  65. # test_bokeh_output()
  66. test_bokeh_manager()