我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用networkx.has_path()。
def add_cycle_edges_by_path(g,number_of_edges,path_length = 5): number = 0 num_nodes = g.number_of_nodes() nodes = g.nodes() extra_edges = [] while number < number_of_edges: u,v = np.random.randint(0,num_nodes,2) u = nodes[u] v = nodes[v] if nx.has_path(g,u,v): length = nx.shortest_path_length(g,source = u,target = v) if length <= path_length: extra_edges.append((v,u)) number += 1 if nx.has_path(g,v,u): length = nx.shortest_path_length(g,source = v,target = u) if length <= path_length: extra_edges.append((u,v)) number += 1 print("# extra edges added with path length <= %d: %d" % (path_length,len(extra_edges))) return extra_edges
def add_extra_edges(g,number_of_edges): number = 0 num_nodes = g.number_of_nodes() nodes = g.nodes() extra_edges = set() while len(extra_edges) < number_of_edges: u,v = np.random.randint(0,num_nodes,2) u = nodes[u] v = nodes[v] if nx.has_path(g,u,v): if (v,u) not in extra_edges: extra_edges.add((v,u)) if nx.has_path(g,v,u): if (u,v) not in extra_edges: extra_edges.add((u,v)) extra_edges = list(extra_edges) print("# extra edges added (path lenght unconstrainted): %d" % (len(extra_edges))) return extra_edges
def _all_simple_paths_multigraph(G, source, target, cutoff=None): if cutoff < 1: return visited = [source] stack = [(v for u,v,k in list(G.edges(source, keys=True)))] while stack: children = stack[-1] child = next(children, None) if child is None: stack.pop() visited.pop() elif nx.has_path(G, child, target) == False: # added kaklise pass elif len(visited) < cutoff: if child == target: yield visited + [target] elif child not in visited: visited.append(child) stack.append((v for u,v in list(G.edges(child)))) else: #len(visited) == cutoff: count = ([child]+list(children)).count(target) for i in range(count): yield visited + [target] stack.pop() visited.pop()
def hasLogicalError(self): for type in self.types: for charge_type in ['X','Z']: LogicalLattice = self.Primal.copy() for node in self.Primal.nodes(): if self.Primal.node[node]['charge'][charge_type] == 0: LogicalLattice.remove_node(node) for node1 in LogicalLattice.nodes(): for node2 in LogicalLattice.nodes(): if node1 in self.Boundary[type] and node2 in self.Boundary[type]: if self.Boundary[type][node1] != self.Boundary[type][node2]: start, end = node1, node2 if start in LogicalLattice.nodes() and end in LogicalLattice.nodes(): if nx.has_path(LogicalLattice, start, end): return True return False
def has_path(self, source_address, target_address): """ True if there is a connecting path regardless of the number of hops. """ try: return networkx.has_path(self.graph, source_address, target_address) except (networkx.NodeNotFound, networkx.NetworkXNoPath): return False
def has_sense_path(G, source, target): if source not in G.senses or target not in G.senses: return False for source_sense, target_sense in itertools.product(G.senses[source], G.senses[target]): if nx.has_path(G, source_sense, target_sense): return True return False
def get_shortest_path(self, nx_graph, src_dpid, dst_dpid): if nx.has_path(nx_graph, src_dpid, dst_dpid): return nx.shortest_path(nx_graph, src_dpid, dst_dpid) return None
def get_path(self,q0,qf): """ Searches for a path in the PRM between configurations q0 and qf. Returns a list of configuration tuples describing the path or [] if no path is found. """ q0 = np.reshape(np.array(q0),3) qf = np.reshape(np.array(qf),3) if all(q0 == qf): return [qf] n0 = len(self.G.node) nf = n0 + 1 # Add the start and end configs to G, so we can just search it self.G.add_node(n0,cfg=q0) self.G.add_node(nf,cfg=qf) for k in [n0,nf]: self._connect(k,DobotModel.forward_kinematics(self.G.node[k]['cfg'])) if not nx.has_path(self.G,n0,nf): path = [] # could not find a path else: nodes = nx.dijkstra_path(self.G,n0,nf,'weight') path = [self.G.node[k]['cfg'] for k in nodes] # Remove the start and end configs so G remains consistent with tree self.G.remove_node(n0) self.G.remove_node(nf) return path
def _remove_unreachable_nodes(graph): """Remove all data structures that are not reachable from modules or from stacks. """ removed = 0 for node in graph.nodes()[:]: # check if reachable from globals for root in graph.root_nodes: if nx.has_path(graph, root, node): break else: graph.remove_node(node) removed += 1 return removed
def find_affected_but_not_down(netbox_going_down, netboxes): """Mark affected but not down because of redundancy boxes""" graph = build_layer2_graph() if not graph.has_node(netbox_going_down): return [netbox_going_down] graph.remove_node(netbox_going_down) masters = find_uplink_nodes(netbox_going_down) redundant = [] for netbox in netboxes: if netbox_going_down == netbox: continue if any(nx.has_path(graph, master, netbox) for master in masters): redundant.append(netbox) return redundant
def guess(graph, question, choices): MAX = 9999 SUBMAX = 999 ques_node = find_node(graph, question) dists = [] for choice in choices: choice_node = find_node(graph, choice) if ques_node is None and choice_node is None: dist = MAX elif ques_node is None and choice_node is not None: dist = SUBMAX elif ques_node is not None and choice_node is None: dist = MAX else: if nx.has_path(graph, ques_node, choice_node): pl = len(nx.shortest_path(graph, ques_node, choice_node)) dist = pl else: dist = MAX dists.append(dist) answer, dist = min(enumerate(dists), key=lambda x: x[1]) max_dist = max(dists) if dist == MAX: return None if dist == max_dist: return None return answer
def links_in_simple_paths(self, sources, sinks): """ Count all links in a simple path between sources and sinks Parameters ----------- sources : list List of source nodes sinks : list List of sink nodes sinks : list List of sink nodes Returns ------- link_count : dict A dictonary with the number of times each link is involved in a path """ link_names = [name for (node1, node2, name) in list(self.edges(keys=True))] link_count = pd.Series(data = 0, index=link_names) for sink in sinks: for source in sources: if nx.has_path(self, source, sink): paths = _all_simple_paths(self,source,target=sink) for path in paths: for i in range(len(path)-1): links = list(self[path[i]][path[i+1]].keys()) for link in links: link_count[link] = link_count[link]+1 return link_count
def hasConnectedBoundaries(code, loops_graph, Exts): if len(loops_graph.edges()) <= 1: return False for ext1 in loops_graph.nodes(): for ext2 in loops_graph.nodes(): if ext1 in Exts and ext2 in Exts and ext1 != ext2: if nx.has_path(loops_graph,ext1,ext2): path = nx.shortest_path(loops_graph, ext1, ext2) for node in path: if node not in Exts: return True return False
def connectedBoundaries(loops_graph, Exts): for ext1 in loops_graph.nodes(): for ext2 in loops_graph.nodes(): if ext1 in Exts and ext2 in Exts and ext1 != ext2: if nx.has_path(loops_graph,ext1,ext2): path = nx.shortest_path(loops_graph, ext1, ext2) for node in path: if node not in Exts: return ext1, ext2
def extract_diff_graph(graph, diff): diff_graph = graph.copy() diff_graph.root_nodes = list() # pnodes = collections.defaultdict(set) # for m in diff[0] | diff[2]: # for n in diff_graph.nodes(): # if nx.has_path(diff_graph, n, m): # pnodes[m].add(n) # # nodes = set.union(*pnodes.values()) nodes = set() logging.debug('Searching on-paths nodes...') for m in graph.root_nodes: for i in diff[0] | diff[2]: if m == i: nodes.add(m) try: nodes.update(*list(search_paths(diff_graph, m, i))) except: pass if m in nodes: diff_graph.root_nodes.append(m) logging.debug('Removing nodes off-path...') for n in diff_graph.nodes()[:]: if n not in nodes: diff_graph.remove_node(n) logging.debug('Setting node colors...') for n in diff_graph.nodes(): diff_graph.node[n]['color'] = 'turquoise' for n in diff[0]: diff_graph.node[n]['color'] = 'red' if isinstance(n, Stack): diff_graph.node[n]['color'] = 'deeppink' for n in diff[2]: diff_graph.node[n]['color'] = 'green' logging.debug('Exporting diff_graph.dot...') nx.write_dot(diff_graph, 'diff_graph.dot') return diff_graph # internal classes # internal functions
def __validate_coloring(self): log.debug("validating red-blue coloring; this could take a while (forever) if your graph is too big") # All red paths to a vertex should be disjoint from all blue paths # to the same vertex, except for red-blue links and their incident nodes red_dag = self.get_red_graph() blue_dag = self.get_blue_graph() source = self.root for dst in self.graph.nodes(): if dst == source: continue red_paths = list(nx.all_simple_paths(red_dag, source, dst)) red_nodes = set(n for p in red_paths for n in p) red_edges = set(e for p in red_paths for e in zip(p, p[1:])) blue_paths = list(nx.all_simple_paths(blue_dag, source, dst)) blue_nodes = set(n for p in blue_paths for n in p) blue_edges = set(e for p in blue_paths for e in zip(p, p[1:])) redblue_edges = red_edges.intersection(blue_edges) redblue_nodes = red_nodes.intersection(blue_nodes) redblue_nodes.remove(source) redblue_nodes.remove(dst) assert all(self._get_edge_color(e) == 'red-blue' for e in redblue_edges),\ "invalid coloring: non cut link shared by red and blue paths!" # every shared node has at least one incident cut link # TODO: finish this? unclear it's necessary as it just validates consistency of coloring not actual correctness of properties # assert all(any(self._get_edge_color(e) == 'red-blue' for e in # list(self.graph.successors(n)) + list(self.graph.predecessors(n))) # for n in redblue_nodes), "invalid coloring of nodes: shares a non-cut node!" # verify each red-blue edge or node is a cut edge/node for cut_node in redblue_nodes: g = self.graph.subgraph(n for n in self.graph.nodes() if n != cut_node) # could induce an empty (or near-empty) graph if source not in g or dst not in g: continue assert not nx.has_path(g, source, dst), "invalid coloring: non cut node shared by red and blue paths!" for cut_link in redblue_edges: g = self.graph.edge_subgraph(e for e in self.graph.edges() if e != cut_link) # could induce an empty (or near-empty) graph if source not in g or dst not in g: continue assert not nx.has_path(g, source, dst), "invalid coloring: non cut link shared by red and blue paths!" # draw_overlaid_graphs(self.graph, [red_dag, blue_dag]) return True