我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用networkx.descendants()。
def _acceptable_subgraph(self): # type: () -> nx.MultiDiGraph graph = self.as_multidigraph reachable_states = nx.descendants(graph, self.start) | {self.start} graph = graph.subgraph(reachable_states) reachable_accepting_states = reachable_states & { node for node in graph.node if graph.node[node]['accepting'] } # Add a "sink" node with an in-edge from every accepting state. This is # is solely done because the networkx API makes it easier to find the # ancestor of a node than a set of nodes. sink = object() graph.add_node(sink) for state in reachable_accepting_states: graph.add_edge(state, sink) acceptable_sates = nx.ancestors(graph, sink) return graph.subgraph(acceptable_sates)
def live_subgraph(self): # type: () -> nx.MultiDiGraph """ Returns the graph of "live" states for this graph, i.e. the start state together with states that may be involved in positively matching a string (reachable from the start node and an ancestor of an accepting node). This is intended for display purposes, only showing the paths which might lead to an accepting state, or just the start state if no such paths exist. """ graph = self.as_multidigraph accepting_states = { node for node in graph.node if graph.node[node]['accepting'] } # Add a "sink" node with an in-edge from every accepting state. This is # is solely done because the networkx API makes it easier to find the # ancestor of a node than a set of nodes. sink = object() graph.add_node(sink) for state in accepting_states: graph.add_edge(state, sink) live_states = {self.start} | (nx.ancestors(graph, sink) & nx.descendants(graph, self.start)) return graph.subgraph(live_states)
def print_test(self, e=None): """Just a method to see a bit how the different libraries work.""" # Test node # if e is None: e = test_envos[0] # Goa # print "Goa: " print self.goatools[e] # Pygraphviz # print "pygraphviz: " print self.pygraphviz[e] print self.pygraphviz.successors(e) print self.pygraphviz.predecessors(e) print self.pygraphviz.get_node(e) # Networkx # import networkx print "networkx: " print self.networkx[e] print self.networkx.successors(e) print self.networkx.predecessors(e) print networkx.ancestors(self.networkx, e) # same as predecessors print networkx.descendants(self.networkx, e) # almost as child_to_parents
def print_impacting_modules(single_node=None): """ For each module, print a list of modules that the module is depending on, i.e. modules whose change can potentially impact the module. The function shows all levels of dependency, not just the immediately imported modules. :return: """ print('\n===Impacting Modules===') for node_name in G.nodes_iter(): if single_node and (node_name!=single_node): continue descendants = nx.descendants(G, node_name) print(augment_format_string(node_name, '\n%s:') % node_name) for d in descendants: print(augment_format_string(d, ' %s') % d)
def _create_eps_closure_func(table): edges = {(c.src, c.sym.text, c.dst[0]) for c in table.cells} graph = DiGraph((src, dst) for src, sym, dst in edges if sym == '') states = set(sum(((src, dst) for src, _, dst in edges), ())) cs = {s: descendants(graph, s) if s in graph else set() for s in states} return lambda ss: frozenset(set(ss).union(*(cs[s] for s in ss)))
def remove_descendants_of(self, node): """Remove all of the descendant operation nodes of node.""" dec = nx.descendants(self.multi_graph, node) for n in dec: nd = self.multi_graph.node[n] if nd["type"] == "op": self._remove_op_node(n)
def remove_nondescendants_of(self, node): """Remove all of the non-descendants operation nodes of node.""" dec = nx.descendants(self.multi_graph, node) comp = list(set(self.multi_graph.nodes()) - set(dec)) for n in comp: nd = self.multi_graph.node[n] if nd["type"] == "op": self._remove_op_node(n)
def collective_downstream(workflow, steps): downstream = set() for step in steps: for x in nx.descendants(workflow.dag, step): downstream.add(x) return list(downstream)
def get_nodes_from_spec(graph, spec): select_parents = spec['select_parents'] select_children = spec['select_children'] qualified_node_name = spec['qualified_node_name'] selected_nodes = set(get_nodes_by_qualified_name(graph, qualified_node_name)) additional_nodes = set() test_nodes = set() if select_parents: for node in selected_nodes: parent_nodes = nx.ancestors(graph, node) additional_nodes.update(parent_nodes) if select_children: for node in selected_nodes: child_nodes = nx.descendants(graph, node) additional_nodes.update(child_nodes) model_nodes = selected_nodes | additional_nodes for node in model_nodes: # include tests that depend on this node. if we aren't running tests, # they'll be filtered out later. child_tests = [n for n in graph.successors(node) if graph.node.get(n).get('resource_type') == NodeType.Test] test_nodes.update(child_tests) return model_nodes | test_nodes
def get_dependent_nodes(self, node): return nx.descendants(self.graph, node)
def remove_node(self, node): children = nx.descendants(self.graph, node) self.graph.remove_node(node) return children
def get_subgraph(self, envos=None): """Given a list of ENVO terms, get the subgraph that contains them all and all their ancestors, up to the root. Outputs a networkx DiGraph object.""" # Testing mode # if envos is None: envos = test_envos # All nodes # nodes = set(n for e in envos for n in networkx.descendants(self.networkx, e)) nodes.update(envos) nodes = list(nodes) # Return # return self.networkx.subgraph(nodes)
def print_impacting_modules(single_node=None, json_out=None): """ For each module, print a list of modules that the module is depending on, i.e. modules whose change can potentially impact the module. The function shows all levels of dependency, not just the immediately imported modules. If the json_out argument is not None, then the output will be recorded there instead of on stdout. :return: """ if json_out is None: print('\n===Impacting Modules===') else: json_out['impacting_modules'] = {} for node_name in G.nodes_iter(): if single_node and (node_name!=single_node): continue descendants = nx.descendants(G, node_name) if json_out is None: print(augment_format_string(node_name, '\n%s:') % node_name) else: json_out['impacting_modules'][node_name] = [] for d in descendants: if json_out is None: print(augment_format_string(d, ' %s') % d) else: json_out['impacting_modules'][node_name].append(d)
def prune_standalone_nodes(): """ Remove from the module dependency graph all modules that do not have any dependencies (i.e they neither import/include any modules nor are they imported/included by any modules) :return: the connected module dependency graph """ ng = nx.DiGraph(G) for node_name in G.nodes_iter(): ancestors = nx.ancestors(G, node_name) descendants = nx.descendants(G, node_name) if len(ancestors) == 0 and len(descendants) == 0: ng.remove_node(node_name) return ng
def _find_necessary_steps(self, outputs, inputs): """ Determines what graph steps need to pe run to get to the requested outputs from the provided inputs. Eliminates steps that come before (in topological order) any inputs that have been provided. Also eliminates steps that are not on a path from the provided inputs to the requested outputs. :param list outputs: A list of desired output names. This can also be ``None``, in which case the necessary steps are all graph nodes that are reachable from one of the provided inputs. :param dict inputs: A dictionary mapping names to values for all provided inputs. :returns: Returns a list of all the steps that need to be run for the provided inputs and requested outputs. """ if not outputs: # If caller requested all outputs, the necessary nodes are all # nodes that are reachable from one of the inputs. Ignore input # names that aren't in the graph. necessary_nodes = set() for input_name in iter(inputs): if self.graph.has_node(input_name): necessary_nodes |= nx.descendants(self.graph, input_name) else: # If the caller requested a subset of outputs, find any nodes that # are made unecessary because we were provided with an input that's # deeper into the network graph. Ignore input names that aren't # in the graph. unnecessary_nodes = set() for input_name in iter(inputs): if self.graph.has_node(input_name): unnecessary_nodes |= nx.ancestors(self.graph, input_name) # Find the nodes we need to be able to compute the requested # outputs. Raise an exception if a requested output doesn't # exist in the graph. necessary_nodes = set() for output_name in outputs: if not self.graph.has_node(output_name): raise ValueError("graphkit graph does not have an output " "node named %s" % output_name) necessary_nodes |= nx.ancestors(self.graph, output_name) # Get rid of the unnecessary nodes from the set of necessary ones. necessary_nodes -= unnecessary_nodes # Return an ordered list of the needed steps. return [step for step in self.steps if step in necessary_nodes]
def longest_string(self): # type: () -> String """ Returns an example of a maximally long string recognized by this DFA. If the language is infinite, raises InfiniteLanguageError. If the language is empty, raises EmptyLanguageError. The algorithm is similar to the one described in these lecture notes for deciding whether a language is finite: http://math.uaa.alaska.edu/~afkjm/cs351/handouts/non-regular.pdf """ # Compute what we're calling the "acceptable subgraph" by restricting to # states which are (1) descendants of a start state, and (2) ancestors of # an accepting state. These two properties imply that there is at least # one walk between these two states, corresponding to a string present in # the language. acceptable_subgraph = self._acceptable_subgraph if len(acceptable_subgraph.node) == 0: # If this graph is _empty_, then the language is empty. raise EmptyLanguageError() # Otherwise, we try to find the longest path in it. Internally, networkx # does this by topologically sorting the graph (which only works if it's # a DAG), then using the sorted graph to construct the longest path in # linear time. try: longest_path = nx.algorithms.dag.dag_longest_path( acceptable_subgraph) except nx.NetworkXUnfeasible: # If a topological sort is not possible, this means there is a # cycle, and the recognized language is infinite. In this case, # nx raises ``nx.NetworkXUnfeasible``. raise InfiniteLanguageError() # To show that the longest path must originate at the start node, # consider 3 cases for the position of s in a longest path P from u to v: # # (a) At the beginning. Done; this is what we were seeking to prove. # (b) On the path, but not at the beginning. In this case, u is # reachable from s (by property (1) above), and s in reachable from # u (since s is on a path from u to v). This means the graph # contains a cycle, which contradicts that we've constructed a # topological sort on it. # (c) Disjoint from s. Let P' be a path connecting s to u (which must # exist by property (1)). If this path contains a vertex u'!=u in P, # then P ? P' contains a cycle (from u to u' on P and from u' to u # on P'), which is a contradiction. But then P ? P' is a path, which # contains at least one more vertex than P (in particular, s), and # so is a longer path, which contradicts the maximality assumption. chars = [] for state1, state2 in zip(longest_path, longest_path[1:]): edges = self.succ[state1][state2] chars.append(next(six.itervalues(edges))['transition']) return type(self.alphabet[0])().join(chars)