我们从 Python 开源项目中提取了以下 8 个代码示例,用于说明如何使用 bokeh.plotting.curdoc()。
def run(self):
    """Build the streaming figure, push it to a Bokeh session and loop.

    Side effects: sets ``self.p`` (the figure) and ``self.session`` (the
    session returned by ``push_session``), registers ``self.update`` as a
    periodic callback on the current document, shows the figure wrapped in
    a ``column`` layout, and finally calls ``loop_until_closed()`` on the
    session.
    """
    print("In thread.run")

    plot = self.p = figure(
        plot_height=500,
        tools=TOOLS,
        y_axis_location='left',
        title=self.title,
    )
    plot.xaxis.axis_label = "Timestamp"

    # Keep the x range following the end of the data.
    x_range = plot.x_range
    x_range.follow = "end"
    x_range.follow_interval = 100
    x_range.range_padding = 0

    # Same columns drawn twice: a blue line with red circle markers on top.
    plot.line(x="timestamp", y="value", color="blue", source=self.source)
    plot.circle(x="timestamp", y="value", color="red", source=self.source)

    doc = curdoc()
    self.session = push_session(doc)
    doc.add_periodic_callback(self.update, 100)  #period in ms
    self.session.show(column(plot))
    doc.title = 'Sensor'
    self.session.loop_until_closed()

# Disabled in the original source; kept for reference.
# def register(self, d, sourceq):
#     source = ColumnDataSource(dict(d))
#     self.p.line(x=d[0], y=d[1], color="orange", source=source)
#     curdoc().add_periodic_callback(self.update, 100) #period in ms
def __init__(self, map_size, team_size):
    """Make sure a Bokeh server is reachable, draw the plots and show them.

    Probes ``http://localhost:5006``; if the probe fails, a ``bokeh serve``
    process is spawned and the constructor sleeps for five seconds before
    continuing. The initial plots are then drawn and pushed to the server
    through a new session.

    Args:
        map_size: Forwarded unchanged to ``self.draw_init_plot``.
        team_size: Forwarded unchanged to ``self.draw_init_plot``.
    """
    # Start a bokeh server if one doesn't exist.
    # NOTE: there should be only one process updating the session,
    #       otherwise chaos.
    # TODO: the spawned server is not closed when the main process quits
    #       (the original author judged this not to matter).
    try:
        requests.get("http://localhost:5006")
    except requests.exceptions.RequestException:
        # Only a failed HTTP probe means "no server running"; any other
        # exception is a real error and must not be swallowed here.
        subprocess.Popen(
            ['bokeh', 'serve'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        # The serve process must be up before anything can be drawn.
        print(">> waiting for bokeh server starting, 5 seconds...")
        time.sleep(5)
        print(">> here we go!")

    # presumably this sets ``self.plt_combo``, which is shown below —
    # TODO confirm against draw_init_plot
    self.draw_init_plot(map_size, team_size)

    # curdoc().add_root(plt_combo)
    # Open a session to keep our local document in sync with the server.
    session = push_session(curdoc())
    session.show(self.plt_combo)  # open the document in a browser
    # session.loop_until_closed()  # run forever

# Disabled in the original source; kept for reference.
# def update_plots(self, env_state_action):
#     """update bokeh plots according to new env state and action data"""
#     curdoc().add_next_tick_callback(partial(self.update_bokeh_doc, env_state_action=env_state_action))
def _draw_plots(self, scaffolder):
    """Create the data sources, draw every plot and show the grid layout.

    Four ``ColumnDataSource`` objects are stored on ``self``; the four
    sub-plots are then arranged in a 2x2 ``gridplot`` and shown through a
    session pushed to the current document. If pushing or showing raises
    ``IOError`` the process exits with an error message.

    Args:
        scaffolder: Object providing ``nrReads``, ``nrContigs`` and ``N50``;
            it is also handed to ``self._calculate_circle`` and
            ``self._draw_contigNrPlot``.
    """
    # Single-row source with the current read / contig / N50 figures.
    self.contig_read_src = ColumnDataSource({
        "reads": [scaffolder.nrReads],
        "contigs": [scaffolder.nrContigs],
        "n50": [scaffolder.N50],
    })

    # Data for the contig circle plot; the first four items of the result
    # are used as start, stop, colors and contigs, in that order.
    circle_data = self._calculate_circle(scaffolder)
    self.contig_dist_src = ColumnDataSource({
        "start": circle_data[0],
        "stop": circle_data[1],
        "colors": circle_data[2],
        "contigs": circle_data[3],
    })

    # Sources that start out empty.
    self.read_src = ColumnDataSource({
        "nrReads": [],
        "nrPassReads": [],
        "nrFailReads": [],
        "readTime": [],
    })
    self.read_hist_src = ColumnDataSource({
        "readLength": [],
        "left": [],
        "right": [],
    })

    # Draw the individual plots (same order as the original code).
    contig_nr_plot = self._draw_contigNrPlot(scaffolder)
    n50_plot = self._draw_n50Plot()
    contig_circle_plot = self._draw_contigCirclePlot()
    read_plot = self._draw_readCountPlot()
    #readHist = self._draw_readLenHistPlot()

    # Position the plots on a 2x2 grid.
    top_row = [n50_plot, contig_nr_plot]
    bottom_row = [contig_circle_plot, read_plot]
    grid = gridplot([top_row, bottom_row])

    try:
        session = push_session(curdoc())
        session.show(grid)
    except IOError:
        sys.exit("No bokeh server is running on this host")
def make_fig(self, plot_source):
    """Create one streaming figure and register its periodic update.

    Args:
        plot_source: Mapping with a ``'plot_specs'`` entry. That entry must
            expose ``name``, ``x_axis_label``, ``y_axis_label``, ``source``
            and ``update_period``.

    Returns:
        The configured figure.
    """
    specs = plot_source['plot_specs']
    # The axis labels double as the column names looked up in the source.
    x_field = specs.x_axis_label
    y_field = specs.y_axis_label
    data = specs.source

    fig = figure(
        plot_height=400,
        tools=TOOLS,
        y_axis_location='left',
        title=specs.name,
    )
    fig.xaxis.axis_label = x_field
    fig.yaxis.axis_label = y_field

    # Keep the x range following the end of the data.
    fig.x_range.follow = "end"
    fig.x_range.follow_interval = 10
    fig.x_range.range_padding = 0

    # p.xaxis.formatter=DatetimeTickFormatter(dict(seconds=["%S"],minutes=["%M"],hours=["%d %B %Y"],days=["%d %B %Y"],months=["%d %B %Y"],years=["%d %B %Y"]))
    fig.xaxis.major_label_orientation = pi/4

    # Blue line with red circle markers over the same columns.
    fig.line(x=x_field, y=y_field, color="blue", source=data)
    fig.circle(x=x_field, y=y_field, color="red", source=data)

    # self.update is called with name=<plot name>; period in ms.
    on_tick = functools.partial(self.update, name=specs.name)
    curdoc().add_periodic_callback(on_tick, specs.update_period)
    return fig
def run(self):
    """Build a figure per plotter, push them to a Bokeh session and loop.

    Side effects: sets ``self.figs`` (one figure per entry of
    ``self.plotters``, built with ``self.make_fig``) and ``self.session``,
    shows the figures in a ``column`` layout, then calls
    ``loop_until_closed()`` on the session.
    """
    print("In thread.run")

    built = []
    for name in self.plotters:
        built.append(self.make_fig(self.plotters[name]))
    self.figs = built

    doc = curdoc()
    self.session = push_session(doc)
    self.session.show(column(self.figs))
    doc.title = 'AntEvent Streams'
    self.session.loop_until_closed()