我们从 Python 开源项目中提取了以下 4 个代码示例，用于说明如何使用 networkx.dijkstra_path()。
def _transform(self, source_type: Type[S], target_type: Type[T]) -> Tuple[Callable[[S], T], int]:
    """Resolve a conversion from ``source_type`` to ``target_type``.

    The cheapest route through ``self._type_graph`` (edge weight ``"cost"``)
    is looked up with ``dijkstra_path`` and the transformer stored on each
    edge of that route is collected, in order, into a chain.

    Returns:
        A ``(converter, cost)`` pair. ``converter`` is
        ``partial(_transform, transformer_chain=chain)`` and ``cost`` is the
        sum of the ``cost`` attributes of the chained transformers. When the
        route has no edges, ``(_identity, 0)`` is returned instead.

    Raises:
        NoConversionError: if the path lookup raises ``KeyError`` or
            ``NetworkXNoPath``.
    """
    names = {
        "source_type": source_type.__name__,
        "target_type": target_type.__name__,
    }

    try:
        LOGGER.info("Searching type graph for shortest path from \"{source_type}\" to \"{target_type}\"".format(**names))
        path = dijkstra_path(
            self._type_graph,
            source=source_type,
            target=target_type,
            weight="cost",
        )
        LOGGER.info("Found a path from \"{source_type}\" to \"{target_type}\"".format(**names))
    except (KeyError, NetworkXNoPath):
        # The error message deliberately formats the type objects themselves,
        # not their ``__name__``.
        raise NoConversionError("Pipeline can't convert \"{source_type}\" to \"{target_type}\"".format(source_type=source_type, target_type=target_type))

    LOGGER.info("Building transformer chain from \"{source_type}\" to \"{target_type}\"".format(**names))
    # One (transformer, resulting type) entry per edge along the path.
    chain = [
        (self._type_graph.adj[origin][destination][_TRANSFORMER], destination)
        for origin, destination in _pairwise(path)
    ]
    cost = sum(transformer.cost for transformer, _ in chain)
    LOGGER.info("Built transformer chain from \"{source_type}\" to \"{target_type}\"".format(**names))

    if chain:
        return partial(_transform, transformer_chain=chain), cost
    # A path without edges needs no conversion at all.
    return _identity, 0
def shortest_path(graph, source, target):
    """Return the shortest path from source to target as a tuple of nodes.

    ``graph`` is expected to be a mapping ``{node: successors}``. Every
    (node, successor) pair is loaded as an edge of an undirected
    ``nx.Graph`` before the search, so edge direction is not preserved.

    The returned tuple lists the nodes of the path in order, both endpoints
    included. When ``source == target`` an empty tuple is returned and no
    graph is built.
    """
    if source == target:
        return ()  # no move needed

    edges = (
        (node, successor)
        for node, successors in graph.items()
        for successor in successors
    )
    nxg = nx.Graph()
    nxg.add_edges_from(edges)
    route = nx.dijkstra_path(nxg, source, target)
    return tuple(route)
def get_path(self, q0, qf):
    """Search the PRM for a path between configurations q0 and qf.

    Both configurations are reshaped to flat length-3 arrays. They are
    inserted into ``self.G`` as two temporary nodes, each handed to
    ``self._connect`` together with the result of
    ``DobotModel.forward_kinematics`` on its configuration, and removed
    again before this method returns, including when an exception is raised
    in between.

    Returns:
        A list holding the ``'cfg'`` attribute of every node on the
        weighted shortest path (start first, goal last), ``[qf]`` when both
        configurations are equal, or ``[]`` when no path exists.
    """
    q0 = np.reshape(np.array(q0), 3)
    qf = np.reshape(np.array(qf), 3)
    if all(q0 == qf):
        return [qf]

    # NOTE(review): assumes existing node ids are 0..len-1 so that these two
    # ids are unused - confirm against how the roadmap is built.
    n0 = len(self.G.node)
    nf = n0 + 1

    # Add the start and end configs to G, so we can just search it
    self.G.add_node(n0, cfg=q0)
    self.G.add_node(nf, cfg=qf)
    try:
        for k in (n0, nf):
            self._connect(k, DobotModel.forward_kinematics(self.G.node[k]['cfg']))
        try:
            nodes = nx.dijkstra_path(self.G, n0, nf, 'weight')
        except nx.NetworkXNoPath:
            return []  # could not find a path
        # Read the configurations before the temporary nodes are dropped.
        return [self.G.node[k]['cfg'] for k in nodes]
    finally:
        # Remove the start and end configs so G remains consistent with
        # tree, even if connecting or searching failed.
        self.G.remove_node(n0)
        self.G.remove_node(nf)
def analysis_graph(self):
    """Accumulate per-edge load over all scheduled moves and report the peaks.

    An undirected graph is built from ``self.list_node``; each entry supplies
    ``(start, end, weight)`` at indices 0-2. Every edge also gets a
    ``capacity`` derived from its weight and from a divisor picked by the
    leading characters of the start label.

    Each move in ``self.schedule`` is routed along the weighted shortest
    path from ``move[0]`` to ``move[1]``. A clock starting at
    ``self.day_time`` (``"<hour>.<minute>"``) is advanced by each edge
    weight, and ``1 / capacity`` is added to a counter keyed by
    ``"<clock> <node>/<previous node>"``. Moves that raise
    ``NetworkXNoPath`` or ``KeyError`` are printed and skipped.

    Returns:
        A list of ``"<count> - <key>"`` strings for every counter whose
        value exceeds 2, where ``count`` is the truncated value, sorted by
        ``count`` in descending order.
    """
    graph = nx.Graph()
    for array in self.list_node:
        label = array[0]
        # NOTE(review): the meaning of these label prefixes and divisors is
        # not visible here - confirm with the data format.
        if label[0] == '3':
            d = 3
        elif label[0] == '5':
            d = 2
        elif label[0] == '?' and label[1] == '3':
            d = 2
        elif '?' in label:
            d = 3
        else:
            d = 1
        try:
            capacity = math.ceil(60 / (24 // d + array[2]))
        except ZeroDivisionError:
            # The weight exactly cancels 24 // d; fall back to a fixed value.
            capacity = 1000
        graph.add_edge(array[0], array[1], weight=array[2], capacity=capacity)

    clock = Time()
    load = {}
    for move in self.schedule:
        # NOTE(review): recent networkx releases appear to raise NodeNotFound
        # for an unknown source node, which is not caught here - confirm.
        try:
            path = nx.dijkstra_path(graph, move[0], move[1])
            clock_parts = self.day_time.split('.')
            clock.hour = int(clock_parts[0])
            clock.minute = int(clock_parts[1])
            clock.second = 0
            for previous, current in zip(path, path[1:]):
                # graph[u][v] is the edge-attribute dict in networkx 1.x and
                # 2.x alike; the older graph.edge accessor was removed in 2.0.
                attributes = graph[previous][current]
                clock.add(attributes['weight'])
                key = clock.string() + " " + current + '/' + previous
                load[key] = load.get(key, 0) + 1 / attributes['capacity']
        except nx.exception.NetworkXNoPath:
            print(move)
        except KeyError:
            print(move)

    # Sort on the truncated counts; the sort is stable, so ties keep their
    # insertion order.
    peaks = [(int(value), key) for key, value in load.items() if value > 2]
    peaks.sort(key=lambda item: item[0], reverse=True)
    return [str(count) + ' - ' + str(key) for count, key in peaks]