我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用networkx.all_shortest_paths()。
def findPaths(Graph, source, des, pathLen): #for path like V1//V2 if pathLen == 0: #Old version find paths: #while True: #pathLen += 1 #print source, des #paths = nx.all_simple_paths(Graph, source, des, pathLen) #if (len(pathList) != 0) or (pathLen == 3) : #if (pathLen == 3) : #pathLen = 0 #break #New version find paths: paths = nx.all_shortest_paths(Graph, source, des) pathList = list(paths) return pathList #for path like V1/./V2 with specific length else: paths = nx.all_simple_paths(Graph, source, des, pathLen) pathList = list(paths) return pathList
def findshortestpath(self, obj0SRotmat4, obj1SRotmat4): self.__addEnds(obj0SRotmat4, obj1SRotmat4) # startrgt goalrgt if len(self.regghalf[0].endnodeids) > 0 and len(self.regghalf[1].endnodeids) > 0: startgrip = self.regghalf[0].endnodeids[0] goalgrip = self.regghalf[1].endnodeids[0] shortestpaths = nx.all_shortest_paths(self.regg, source = startgrip, target = goalgrip) self.directshortestpaths = [] print shortestpaths for path in shortestpaths: for i, pathnode in enumerate(path): if pathnode.startswith('endrgt') and i < len(path)-1: continue else: self.directshortestpaths.append(path[i-1:]) break for i, pathnode in enumerate(self.directshortestpaths[-1]): if i > 0 and pathnode.startswith('endlft'): self.directshortestpaths[-1]=self.directshortestpaths[-1][:i+1] break
def pre_compute_paths(self, G): """ Computes paths between all hosts in the network. It computes up to PATH_LIMIT paths per each host. All computed paths are saved to a path lookup table path_table.py Args: G: networkx graph containing the topology of the network. """ host_combinations = itertools.permutations(self.hosts, 2) for src, dst in host_combinations: paths_generator = nx.all_shortest_paths(G, src.dpid, dst.dpid) counter = 0 for path in paths_generator: if counter > PATH_LIMIT: break # counter += 1 # TODO de-comment for big topologies path = Path(src.dpid, src.port, dst.dpid, dst.port, path) self.path_table.put_path(path = path, src = src.dpid, dst = dst.dpid)
def get_nodes_in_all_shortest_paths(graph, nodes, weight=None): """Gets all shortest paths from all nodes to all other nodes in the given list and returns the set of all nodes contained in those paths using :func:`networkx.all_shortest_paths`. :param pybel.BELGraph graph: A BEL graph :param iter[tuple] nodes: The list of nodes to use to use to find all shortest paths :param str weight: Edge data key corresponding to the edge weight. If none, uses unweighted search. :return: A set of nodes appearing in the shortest paths between nodes in the BEL graph :rtype: set[tuple] .. note:: This can be trivially parallelized using :func:`networkx.single_source_shortest_path` """ shortest_paths_nodes_iterator = _get_nodes_in_all_shortest_paths_helper(graph, nodes, weight=weight) return set(itt.chain.from_iterable(shortest_paths_nodes_iterator)) # TODO consider all shortest paths?
def _check_rule_typing(self, rule_id, graph_id, lhs_mapping, rhs_mapping): all_paths = nx.all_pairs_shortest_path(self) paths_from_target = {} for s in self.nodes(): if s == graph_id: for key in all_paths[graph_id].keys(): paths_from_target[key] = all_paths[graph_id][key] for t in paths_from_target.keys(): if t != graph_id: new_lhs_h = compose( lhs_mapping, self.compose_path_typing(paths_from_target[t])) new_rhs_h = compose( rhs_mapping, self.compose_path_typing(paths_from_target[t])) try: # find homomorphisms from s to t via other paths s_t_paths = nx.all_shortest_paths(self, rule_id, t) for path in s_t_paths: lhs_h, rhs_h = self.compose_path_typing(path) if lhs_h != new_lhs_h: raise HierarchyError( "Invalid lhs typing: homomorphism does not commute with an existing " + "path from '%s' to '%s'!" % (s, t) ) if rhs_h != new_rhs_h: raise HierarchyError( "Invalid rhs typing: homomorphism does not commute with an existing " + "path from '%s' to '%s'!" % (s, t) ) except(nx.NetworkXNoPath): pass return
def get_shortest_paths(self, source, target): """Compute all shortest paths in the graph. Returns: generator of lists: A generator of all paths between source and target. """ if not isaddress(source) or not isaddress(target): raise ValueError('both source and target must be valid addresses') return networkx.all_shortest_paths(self.graph, source, target)
def test_closure_and_cclosure_against_networkx(): """ Test 'clusure' and 'cclosure' on 'metric' againt the NetworkX shortest_path (Rion's testing) """ G = nx.Graph() G.add_nodes_from([0,1,2,3,4]) G.add_edges_from([(0,1), (1,2), (2,3), (3,4)], weight=0.1) G.add_edges_from([(0,4)], weight=0.8) # Extract Adjacency Matrix from G A = nx.adjacency_matrix(G) # Transform distance into a similarity x = np.ravel(A[A > 0]) A[A > 0] = (1.0 / (x + 1.0)) for n1, n2 in combinations(G.nodes(),2): # Tests all three methods of computing all shortest paths ('closure','cclosure', and 'nx.all_shortest_paths') c_dist, c_paths = clo.closure(A, source=n1, target=n2, kind='metric') c_paths = [n for n in c_paths] # convers numbers to letters cc_dist, cc_paths = clo.cclosure(A, source=n1, target=n2, retpath=1, kind='metric') cc_paths = [n for n in cc_paths] if cc_paths is not None else '' nx_paths = list(nx.all_shortest_paths(G, source=n1, target=n2, weight='weight'))[0] assert nx_paths == c_paths, "NetworkX and Python 'closure' differ" assert nx_paths == cc_paths, "NetworkX and Cython 'cclosure' differ" assert c_paths == cc_paths, "Python 'closure' and Cython 'cclosure' differ"
def findshortestpath(self, startrotmat4, goalrotmat4, base): self.__addstartgoal(startrotmat4, goalrotmat4, base) # startgrip = random.select(self.startnodeids) # goalgrip = random.select(self.goalnodeids) startgrip = self.startnodeids[0] goalgrip = self.goalnodeids[0] self.shortestpaths = nx.all_shortest_paths(self.regg, source = startgrip, target = goalgrip) self.directshortestpaths = [] # directshortestpaths removed the repeated start and goal transit try: for path in self.shortestpaths: print path for i, pathnode in enumerate(path): if pathnode.startswith('start') and i < len(path)-1: continue else: self.directshortestpaths.append(path[i-1:]) break for i, pathnode in enumerate(self.directshortestpaths[-1]): if i > 0 and pathnode.startswith('goal'): self.directshortestpaths[-1]=self.directshortestpaths[-1][:i+1] break except: print "No path found!" pass
def _check_rule_typing(self, rule_id, graph_id, lhs_mapping, rhs_mapping): all_paths = nx.all_pairs_shortest_path(self) paths_from_target = {} for s in self.nodes(): if s == graph_id: for key in all_paths[graph_id].keys(): paths_from_target[key] = all_paths[graph_id][key] for t in paths_from_target.keys(): if t != graph_id: new_lhs_h = compose( lhs_mapping, self.compose_path_typing(paths_from_target[t])) new_rhs_h = compose( rhs_mapping, self.compose_path_typing(paths_from_target[t])) try: # find homomorphisms from s to t via other paths s_t_paths = nx.all_shortest_paths(self, rule_id, t) for path in s_t_paths: lhs_h, rhs_h = self.compose_path_typing(path) if lhs_h != new_lhs_h: raise HierarchyError( "Invalid lhs typing: homomorphism does not " "commute with an existing " "path from '%s' to '%s'!" % (s, t) ) if rhs_h != new_rhs_h: raise HierarchyError( "Invalid rhs typing: homomorphism does not " "commute with an existing " + "path from '%s' to '%s'!" % (s, t) ) except(nx.NetworkXNoPath): pass return
def all_paths(self, G, src=None, dst=None, src_dpid=None, dst_dpid=None): """ For a given source src and destination dst, compute all shortest paths. Args: G: graph. src: source obj. Defaults to None. dst: destination obj. Defaults to None. src_dpid: source dpid. Defaults to None. dst_dpid: dst dpid. Defaults to None. Returns: List of all paths between src and dst. If only src_dpid and dst_dpid were given it returns a list of paths defined as a list of nodes and not path objects. """ if src_dpid and dst_dpid and not src and not dst: path_list = [] for path in nx.all_shortest_paths(G, src_dpid, dst_dpid): path_list.append(path) return path_list src_dpid = src.dpid dst_dpid = dst.dpid # print "Fetching all paths between {}-{}".format(src, dst) if self.path_table.has_path(src_dpid, dst_dpid): "Using cached path" return [p for p in self.path_table.get_path(src_dpid, dst_dpid)] else: "Compute and cache new path" print "Inserting new path in cache!" for path in nx.all_shortest_paths(G, src_dpid, dst_dpid): pathObj = Path.of(src, dst, path) print pathObj self.path_table.put_path(pathObj, src_dpid, dst_dpid) return self.path_table.get_path(src_dpid, dst_dpid)
def findSwitchRoute(): pathKey = "" nodeList = [] src = int(switch[h2].split(":",7)[7],16) dst = int(switch[h1].split(":",7)[7],16) print src print dst for currentPath in nx.all_shortest_paths(G, source=src, target=dst, weight=None): for node in currentPath: tmp = "" if node < 17: pathKey = pathKey + "0" + str(hex(node)).split("x",1)[1] + "::" tmp = "00:00:00:00:00:00:00:0" + str(hex(node)).split("x",1)[1] else: pathKey = pathKey + str(hex(node)).split("x",1)[1] + "::" tmp = "00:00:00:00:00:00:00:" + str(hex(node)).split("x",1)[1] nodeList.append(tmp) pathKey=pathKey.strip("::") path[pathKey] = nodeList pathKey = "" nodeList = [] print path # Computes Link TX
def get_shortest_paths(self, src, dst, with_weight=True): if with_weight: return nx.all_shortest_paths(self.graph,source=src,target=dst, weight='delay') else: return nx.all_shortest_paths(self.graph,source=src,target=dst)
def _get_nodes_in_all_shortest_paths_helper(graph, nodes, weight=None): for u, v in product(nodes, repeat=2): try: paths = all_shortest_paths(graph, u, v, weight=weight) for path in paths: yield path except nx.exception.NetworkXNoPath: continue
def run_cna(graph, root, targets, relationship_dict=None): """ Returns the effect from the root to the target nodes represented as {-1,1} :param pybel.BELGraph graph: A BEL graph :param tuple root: The root node :param iter targets: The targets nodes :param dict relationship_dict: dictionary with relationship effects :return list[tuple]: """ causal_effects = [] relationship_dict = causal_effect_dict if relationship_dict is None else relationship_dict for target in targets: try: shortest_paths = nx.all_shortest_paths(graph, source=root, target=target) effects_in_path = set() for shortest_path in shortest_paths: effects_in_path.add(get_path_effect(graph, shortest_path, relationship_dict)) if len(effects_in_path) == 1: causal_effects.append((root, target, next(iter(effects_in_path)))) # Append the only predicted effect elif Effect.activation in effects_in_path and Effect.inhibition in effects_in_path: causal_effects.append((root, target, Effect.ambiguous)) elif Effect.activation in effects_in_path and Effect.inhibition not in effects_in_path: causal_effects.append((root, target, Effect.activation)) elif Effect.inhibition in effects_in_path and Effect.activation not in effects_in_path: causal_effects.append((root, target, Effect.inhibition)) else: log.warning('Exception in set: {}.'.format(effects_in_path)) except nx.NetworkXNoPath: log.warning('No shortest path between: {} and {}.'.format(root, target)) return causal_effects