We extracted the following 29 code examples from open-source Python projects to illustrate how to use networkx.ancestors().
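Before the project examples, a minimal standalone sketch of what the function computes (a toy graph of our own, not taken from any of the projects below): in a DiGraph, nx.ancestors(G, n) returns every node that can reach n along directed edges, i.e. the transitive closure of n's predecessors.

import networkx as nx

# Toy graph: a -> b -> c and x -> c.
G = nx.DiGraph([("a", "b"), ("b", "c"), ("x", "c")])

print(nx.ancestors(G, "c"))       # {'a', 'b', 'x'} -- everything that can reach c
print(list(G.predecessors("c")))  # ['b', 'x']      -- direct predecessors only
print(nx.descendants(G, "a"))     # {'b', 'c'}      -- the mirror operation
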
def _get_calc_nodes(self, name):
    g = nx.DiGraph()
    g.add_nodes_from(self.dag.nodes())
    g.add_edges_from(self.dag.edges())
    for n in nx.ancestors(g, name):
        node = self.dag.node[n]
        state = node[_AN_STATE]
        if state == States.UPTODATE or state == States.PINNED:
            g.remove_node(n)

    ancestors = nx.ancestors(g, name)
    for n in ancestors:
        # Check each remaining ancestor's own state individually.
        state = self.dag.node[n][_AN_STATE]
        if state == States.UNINITIALIZED and len(self.dag.pred[n]) == 0:
            raise Exception("Cannot compute {} because {} is uninitialized".format(name, n))
        if state == States.PLACEHOLDER:
            raise Exception("Cannot compute {} because {} is a placeholder".format(name, n))

    ancestors.add(name)
    nodes_sorted = nx.topological_sort(g)
    return [n for n in nodes_sorted if n in ancestors]

def compute(self, name, raise_exceptions=False):
    """
    Compute a node and all necessary predecessors.

    Following the computation, if successful, the target node and all
    necessary ancestors that were not already UPTODATE will have been
    calculated and set to UPTODATE. Any node that did not need to be
    calculated will not have been recalculated.

    If any node raises an exception, the state of that node will be set
    to ERROR, and its value set to an object containing the exception
    object as well as a traceback. This will not halt the computation,
    which will proceed as far as it can, until no more nodes that would
    be required to calculate the target are COMPUTABLE.

    :param name: Name of the node to compute
    :param raise_exceptions: Whether to pass exceptions raised by node
        computations back to the caller
    :type raise_exceptions: Boolean, default False
    """
    if isinstance(name, (types.GeneratorType, list)):
        calc_nodes = set()
        for name0 in name:
            for n in self._get_calc_nodes(name0):
                calc_nodes.add(n)
    else:
        calc_nodes = self._get_calc_nodes(name)
    self._compute_nodes(calc_nodes, raise_exceptions=raise_exceptions)

def ready_to_schedule_operation(op, has_executed, graph):
    """
    Determines if an Operation is ready to be scheduled for execution,
    based on what has already been executed.

    Args:
        op: The Operation object to check
        has_executed: set
            A set containing all operations that have been executed so far
        graph: The networkx graph containing the operations and data nodes

    Returns:
        A boolean indicating whether the operation may be scheduled for
        execution based on what has already been executed.
    """
    dependencies = set(filter(lambda v: isinstance(v, Operation),
                              nx.ancestors(graph, op)))
    return dependencies.issubset(has_executed)

def workflow_execution_parcial(self):
    topological_sort = self.get_topological_sorted_tasks()
    for node_obj in topological_sort:
        # print self.workflow_graph.node[node]
        print(nx.ancestors(self.graph, node_obj),
              self.graph.predecessors(node_obj),
              node_obj,
              self.graph.node[node_obj]['in_degree_required'],
              self.graph.node[node_obj]['in_degree'],
              self.graph.node[node_obj]['out_degree_required'],
              self.graph.node[node_obj]['out_degree']
              )
    return True  # only to debug

def _acceptable_subgraph(self):  # type: () -> nx.MultiDiGraph
    graph = self.as_multidigraph
    reachable_states = nx.descendants(graph, self.start) | {self.start}
    graph = graph.subgraph(reachable_states)
    reachable_accepting_states = reachable_states & {
        node for node in graph.node if graph.node[node]['accepting']
    }

    # Add a "sink" node with an in-edge from every accepting state. This
    # is solely done because the networkx API makes it easier to find the
    # ancestors of a single node than of a set of nodes.
    sink = object()
    graph.add_node(sink)
    for state in reachable_accepting_states:
        graph.add_edge(state, sink)

    acceptable_states = nx.ancestors(graph, sink)
    return graph.subgraph(acceptable_states)

def live_subgraph(self):  # type: () -> nx.MultiDiGraph
    """
    Returns the graph of "live" states for this graph, i.e. the start
    state together with any state that may be involved in positively
    matching a string (reachable from the start node and an ancestor of
    an accepting node).

    This is intended for display purposes, only showing the paths which
    might lead to an accepting state, or just the start state if no such
    paths exist.
    """
    graph = self.as_multidigraph
    accepting_states = {
        node for node in graph.node if graph.node[node]['accepting']
    }

    # Add a "sink" node with an in-edge from every accepting state. This
    # is solely done because the networkx API makes it easier to find the
    # ancestors of a single node than of a set of nodes.
    sink = object()
    graph.add_node(sink)
    for state in accepting_states:
        graph.add_edge(state, sink)

    live_states = {self.start} | (nx.ancestors(graph, sink) &
                                  nx.descendants(graph, self.start))
    return graph.subgraph(live_states)

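The "sink" trick used in the two examples above is worth isolating: nx.ancestors() takes a single node, so to get the ancestors of a *set* of nodes, attach a fresh sink with an in-edge from each member and query the sink. A minimal sketch on a toy graph (the helper name is ours, not from the project):

import networkx as nx

def ancestors_of_set(graph, nodes):
    """All nodes with a path to any member of `nodes` (members included)."""
    g = graph.copy()   # work on a copy so the caller's graph is untouched
    sink = object()    # a guaranteed-fresh node, as in the examples above
    g.add_node(sink)
    for n in nodes:
        g.add_edge(n, sink)
    return nx.ancestors(g, sink)

G = nx.DiGraph([(1, 2), (2, 3), (4, 5)])
print(ancestors_of_set(G, {3, 5}))  # {1, 2, 3, 4, 5}
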
def print_test(self, e=None):
    """Just a method to see a bit how the different libraries work."""
    # Test node #
    if e is None: e = test_envos[0]
    # Goa #
    print "Goa: "
    print self.goatools[e]
    # Pygraphviz #
    print "pygraphviz: "
    print self.pygraphviz[e]
    print self.pygraphviz.successors(e)
    print self.pygraphviz.predecessors(e)
    print self.pygraphviz.get_node(e)
    # Networkx #
    import networkx
    print "networkx: "
    print self.networkx[e]
    print self.networkx.successors(e)
    print self.networkx.predecessors(e)
    print networkx.ancestors(self.networkx, e)    # like predecessors, but transitive
    print networkx.descendants(self.networkx, e)  # transitive successors; close to child_to_parents

def print_impacted_modules(single_node=None):
    """
    For each module, print a list of modules that depend on the module,
    i.e. modules that would be impacted by a change in this module. The
    function shows all levels of dependency, not just the immediately
    impacted modules.
    :return:
    """
    print('\n===Impacted Modules===')
    for node_name in G.nodes_iter():
        if single_node and (node_name != single_node):
            continue
        ancestors = nx.ancestors(G, node_name)
        if len(ancestors) > 0:
            print(augment_format_string(node_name, '\n%s:') % node_name)
            for a in ancestors:
                print(augment_format_string(a, ' %s') % a)

def get_subgraph_with_ancestors(self, nodes):
    subgraph_nodes = set(nodes)
    for node in nodes:
        subgraph_nodes |= nx.ancestors(self._nx_dag, node)
    return self._nx_dag.subgraph(subgraph_nodes)

def is_back_edge(t: networkx.DiGraph, e: Tuple[Node, Node]):
    """Return True if e = (u, v) is a back edge with respect to tree t,
    i.e. it is not a tree edge and v is an ancestor of u in t."""
    u, v = e[:2]
    if not t.has_edge(u, v):
        if u in t and v in networkx.ancestors(t, u):
            return True
    return False

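A quick check of this helper on a hand-built tree (toy data; assumes is_back_edge and its Node type are available as defined above):

import networkx

t = networkx.DiGraph([(1, 2), (2, 3)])  # a small DFS tree: 1 -> 2 -> 3

print(is_back_edge(t, (3, 1)))  # True: 1 is an ancestor of 3 in t
print(is_back_edge(t, (1, 3)))  # False: a forward edge, not a back edge
print(is_back_edge(t, (1, 2)))  # False: tree edges are never back edges
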
def insert_many(self, name_value_pairs):
    """
    Insert values into many nodes of a computation simultaneously.

    Following insertion, the nodes will have state UPTODATE, and all
    their descendants will be COMPUTABLE or STALE.

    In the case of inserting many nodes, some of which are descendants
    of others, this ensures that the inserted nodes have correct status,
    rather than being set as STALE when their ancestors are inserted.

    If an attempt is made to insert a value into a node that does not
    exist, a ``NonExistentNodeException`` will be raised, and none of
    the nodes will be inserted.

    :param name_value_pairs: Each tuple should be a pair (name, value),
        where name is the name of the node to insert the value into.
    :type name_value_pairs: List of tuples
    """
    LOG.debug('Inserting value into nodes {}'.format(
        ", ".join(str(name) for name, value in name_value_pairs)))
    for name, value in name_value_pairs:
        if name not in self.dag:
            raise NonExistentNodeException('Node {} does not exist'.format(str(name)))
    stale = set()
    computable = set()
    for name, value in name_value_pairs:
        self._set_state_and_value(name, States.UPTODATE, value)
        stale.update(nx.dag.descendants(self.dag, name))
        computable.update(self.dag.successors(name))
    names = set([name for name, value in name_value_pairs])
    stale.difference_update(names)
    computable.difference_update(names)
    for name in stale:
        self._set_state(name, States.STALE)
    for name in computable:
        self._try_set_computable(name)

def remove_ancestors_of(self, node):
    """Remove all of the ancestor operation nodes of node."""
    anc = nx.ancestors(self.multi_graph, node)
    # TODO: probably better to do all at once using
    # multi_graph.remove_nodes_from; same for related functions ...
    for n in anc:
        nd = self.multi_graph.node[n]
        if nd["type"] == "op":
            self._remove_op_node(n)

def remove_nonancestors_of(self, node):
    """Remove all of the non-ancestor operation nodes of node."""
    anc = nx.ancestors(self.multi_graph, node)
    comp = list(set(self.multi_graph.nodes()) - set(anc))
    for n in comp:
        nd = self.multi_graph.node[n]
        if nd["type"] == "op":
            self._remove_op_node(n)

def nbunch_ancestors(G, nbunch):
    """Resolve output ancestors."""
    ancestors = set(nbunch)
    for node in nbunch:
        ancestors = ancestors.union(nx.ancestors(G, node))
    return ancestors

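Since this helper depends only on networkx, a hedged usage example (toy pipeline graph of our own):

import networkx as nx

G = nx.DiGraph([("raw", "clean"), ("clean", "report"), ("config", "report")])
print(nbunch_ancestors(G, ["report"]))
# {'raw', 'clean', 'config', 'report'} -- the output plus everything it needs
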
def get_nodes_from_spec(graph, spec):
    select_parents = spec['select_parents']
    select_children = spec['select_children']
    qualified_node_name = spec['qualified_node_name']

    selected_nodes = set(get_nodes_by_qualified_name(graph, qualified_node_name))

    additional_nodes = set()
    test_nodes = set()

    if select_parents:
        for node in selected_nodes:
            parent_nodes = nx.ancestors(graph, node)
            additional_nodes.update(parent_nodes)

    if select_children:
        for node in selected_nodes:
            child_nodes = nx.descendants(graph, node)
            additional_nodes.update(child_nodes)

    model_nodes = selected_nodes | additional_nodes

    for node in model_nodes:
        # include tests that depend on this node. if we aren't running tests,
        # they'll be filtered out later.
        child_tests = [n for n in graph.successors(node)
                       if graph.node.get(n).get('resource_type') == NodeType.Test]
        test_nodes.update(child_tests)

    return model_nodes | test_nodes

def as_dependency_list(self, limit_to=None, ephemeral_only=False):
    """Returns a list of lists of nodes, e.g. [[0, 1], [2], [4, 5, 6]].
    Each element contains nodes whose dependencies are subsumed by the
    union of all lists before it. In this way, all nodes in list `i` can
    be run simultaneously, assuming that all lists before list `i` have
    been completed."""

    depth_nodes = defaultdict(list)

    if limit_to is None:
        graph_nodes = self.graph.nodes()
    else:
        graph_nodes = limit_to

    for node in graph_nodes:
        if node not in self.graph:
            raise RuntimeError(
                "Couldn't find model '{}' -- does it exist or is "
                "it disabled?".format(node))

        num_ancestors = len([
            ancestor for ancestor in nx.ancestors(self.graph, node)
            if (dbt.utils.is_blocking_dependency(self.get_node(ancestor)) and
                (ephemeral_only is False or
                 dbt.utils.get_materialization(self.get_node(ancestor)) == 'ephemeral'))
        ])

        depth_nodes[num_ancestors].append(node)

    dependency_list = []
    for depth in sorted(depth_nodes.keys()):
        dependency_list.append(depth_nodes[depth])

    return dependency_list

def get_subgraph(self, envos=None):
    """Given a list of ENVO terms, get the subgraph that contains them all
    and all their ancestors, up to the root.
    Outputs a networkx DiGraph object."""
    # Testing mode #
    if envos is None: envos = test_envos
    # All nodes (edges point child -> parent here, so graph descendants
    # are the terms' ontology ancestors) #
    nodes = set(n for e in envos for n in networkx.descendants(self.networkx, e))
    nodes.update(envos)
    nodes = list(nodes)
    # Return #
    return self.networkx.subgraph(nodes)

def descends(self, e, root):
    """Does the envo term `e` descend from the node `root`?
    Returns True or False."""
    # Auto conversion #
    if isinstance(e, int):    e    = "ENVO:%08d" % e
    if isinstance(root, int): root = "ENVO:%08d" % root
    # Return #
    return e in networkx.ancestors(self.networkx, root)

# --------------------------- In this section --------------------------- #
# print_test
# draw_with_networkx
# draw_with_pygraphviz

def print_impacted_modules(single_node=None, json_out=None):
    """
    For each module, print a list of modules that depend on the module,
    i.e. modules that would be impacted by a change in this module. The
    function shows all levels of dependency, not just the immediately
    impacted modules. If the json_out argument is not None, then the
    output will be recorded there rather than printed on stdout.
    :return:
    """
    if json_out is None:
        print('\n===Impacted Modules===')
    else:
        json_out['impacted_modules'] = {}
    for node_name in G.nodes_iter():
        if single_node and (node_name != single_node):
            continue
        ancestors = nx.ancestors(G, node_name)
        if len(ancestors) > 0:
            if json_out is None:
                print(augment_format_string(node_name, '\n%s:') % node_name)
            else:
                json_out['impacted_modules'][node_name] = []
            for a in ancestors:
                if json_out is None:
                    print(augment_format_string(a, ' %s') % a)
                else:
                    json_out['impacted_modules'][node_name].append(a)

def get_subgraph_for_node(node_name):
    """
    Returns the dependency sub-graph for only the specified node_name (a
    full dependency graph can be difficult to read).
    :param node_name: Node for which to return the sub-graph
    :return: the sub-graph containing node_name and all of its ancestors
    """
    ancestors = nx.ancestors(G, node_name)
    ancestors.add(node_name)
    return nx.subgraph(G, ancestors)

def prune_standalone_nodes():
    """
    Remove from the module dependency graph all modules that do not have
    any dependencies (i.e. they neither import/include any modules nor
    are they imported/included by any modules).
    :return: the connected module dependency graph
    """
    ng = nx.DiGraph(G)
    for node_name in G.nodes_iter():
        ancestors = nx.ancestors(G, node_name)
        descendants = nx.descendants(G, node_name)
        if len(ancestors) == 0 and len(descendants) == 0:
            ng.remove_node(node_name)
    return ng

def transform(self, X, y=None):
    '''From each value in the feature matrix, traverse upwards in the
    hierarchy (including multiple parents in DAGs), and set all nodes
    to one.'''
    hierarchy = self.hierarchy
    X_out = np.zeros(X.shape, dtype=np.bool_)
    samples, relevant_topics, _ = sp.find(X)
    for sample, topic in zip(samples, relevant_topics):
        X_out[sample, topic] = 1
        ancestors = nx.ancestors(hierarchy, topic)
        for ancestor in ancestors:
            X_out[sample, ancestor] = 1
    return X_out

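The core idea can be exercised without the transformer class. A sketch under the assumption that hierarchy edges point parent -> child, so nx.ancestors() walks upward (toy data of our own):

import networkx as nx
import numpy as np
import scipy.sparse as sp

hierarchy = nx.DiGraph([(0, 1), (1, 2)])   # parent -> child: 0 -> 1 -> 2
X = sp.csr_matrix(np.array([[0, 0, 1]]))   # one sample labelled with leaf 2

X_out = np.zeros(X.shape, dtype=np.bool_)
samples, topics, _ = sp.find(X)
for sample, topic in zip(samples, topics):
    X_out[sample, topic] = 1
    for ancestor in nx.ancestors(hierarchy, topic):
        X_out[sample, ancestor] = 1

print(X_out)  # [[ True  True  True]] -- the leaf plus both of its ancestors
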
def subgraph_from(self, targets):
    '''Trim the DAG to keep only nodes that produce targets'''
    # first, find all nodes with targets
    subnodes = []
    for node in self.nodes():
        if not isinstance(node._output_targets, Undetermined) and \
                any(x in node._output_targets for x in targets):
            subnodes.append(node)
    #
    ancestors = set()
    for node in subnodes:
        ancestors |= nx.ancestors(self, node)
    return SoS_DAG(nx.subgraph(self, subnodes + list(ancestors)))

def _find_necessary_steps(self, outputs, inputs):
    """
    Determines what graph steps need to be run to get to the requested
    outputs from the provided inputs. Eliminates steps that come
    before (in topological order) any inputs that have been provided.
    Also eliminates steps that are not on a path from the provided
    inputs to the requested outputs.

    :param list outputs:
        A list of desired output names. This can also be ``None``, in
        which case the necessary steps are all graph nodes that are
        reachable from one of the provided inputs.

    :param dict inputs:
        A dictionary mapping names to values for all provided inputs.

    :returns:
        Returns a list of all the steps that need to be run for the
        provided inputs and requested outputs.
    """
    if not outputs:
        # If caller requested all outputs, the necessary nodes are all
        # nodes that are reachable from one of the inputs. Ignore input
        # names that aren't in the graph.
        necessary_nodes = set()
        for input_name in iter(inputs):
            if self.graph.has_node(input_name):
                necessary_nodes |= nx.descendants(self.graph, input_name)
    else:
        # If the caller requested a subset of outputs, find any nodes that
        # are made unnecessary because we were provided with an input that's
        # deeper into the network graph. Ignore input names that aren't
        # in the graph.
        unnecessary_nodes = set()
        for input_name in iter(inputs):
            if self.graph.has_node(input_name):
                unnecessary_nodes |= nx.ancestors(self.graph, input_name)

        # Find the nodes we need to be able to compute the requested
        # outputs. Raise an exception if a requested output doesn't
        # exist in the graph.
        necessary_nodes = set()
        for output_name in outputs:
            if not self.graph.has_node(output_name):
                raise ValueError("graphkit graph does not have an output "
                                 "node named %s" % output_name)
            necessary_nodes |= nx.ancestors(self.graph, output_name)

        # Get rid of the unnecessary nodes from the set of necessary ones.
        necessary_nodes -= unnecessary_nodes

    # Return an ordered list of the needed steps.
    return [step for step in self.steps if step in necessary_nodes]

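The pruning logic reduces to set algebra on nx.ancestors(), which can be shown standalone (toy graph and names of our own, not graphkit's API):

import networkx as nx

# a -> b -> c -> d, with a precomputed value supplied for c.
graph = nx.DiGraph([("a", "b"), ("b", "c"), ("c", "d")])
outputs, inputs = ["d"], {"c": 42}

necessary = set()
for output_name in outputs:
    necessary |= nx.ancestors(graph, output_name) | {output_name}
for input_name in inputs:
    necessary -= nx.ancestors(graph, input_name)  # already satisfied upstream

print(necessary)  # {'c', 'd'} -- a and b need not run
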
def get_execution_order(cls, G):
    """Return a list of nodes to execute.

    This method returns the minimal list of nodes that need to be
    executed in graph G in order to return the requested outputs.

    The ordering of the nodes is fixed.

    Parameters
    ----------
    G : nx.DiGraph

    Returns
    -------
    nodes : list
        nodes that require execution
    """
    # Get the cache dict if it exists
    cache = G.graph.get('_executor_cache', {})

    output_nodes = G.graph['outputs']
    # Filter those output nodes who have an operation to run
    needed = tuple(sorted(node for node in output_nodes
                          if 'operation' in G.node[node]))

    if needed not in cache:
        # Resolve the nodes that need to be executed in the graph
        nodes_to_execute = set(needed)

        if 'sort_order' not in cache:
            cache['sort_order'] = nx_constant_topological_sort(G)
        sort_order = cache['sort_order']

        # Resolve the dependencies of needed
        dep_graph = nx.DiGraph(G.edges())
        for node in sort_order:
            attr = G.node[node]
            if attr.keys() >= {'operation', 'output'}:
                raise ValueError('Generative graph has both op and output present')

            # Remove those nodes from the dependency graph whose outputs are present
            if 'output' in attr:
                dep_graph.remove_node(node)
            elif 'operation' not in attr:
                raise ValueError('Generative graph has no op or output present')

        # Add the dependencies of the needed nodes
        for needed_node in needed:
            nodes_to_execute.update(nx.ancestors(dep_graph, needed_node))

        # Turn into a sorted list and cache
        cache[needed] = [n for n in sort_order if n in nodes_to_execute]

    return cache[needed]

def compile(cls, source_net, compiled_net):
    """Add observed nodes to the computation graph.

    Parameters
    ----------
    source_net : nx.DiGraph
    compiled_net : nx.DiGraph

    Returns
    -------
    compiled_net : nx.DiGraph
    """
    logger.debug("{} compiling...".format(cls.__name__))

    observable = []
    uses_observed = []

    for node in nx.topological_sort(source_net):
        state = source_net.node[node]
        if state.get('_observable'):
            observable.append(node)
            cls.make_observed_copy(node, compiled_net)
        elif state.get('_uses_observed'):
            uses_observed.append(node)
            obs_node = cls.make_observed_copy(node, compiled_net, args_to_tuple)
            # Make edge to the using node
            compiled_net.add_edge(obs_node, node, param='observed')
        else:
            continue

        # Copy the edges
        if not state.get('_stochastic'):
            obs_node = observed_name(node)
            for parent in source_net.predecessors(node):
                if parent in observable:
                    link_parent = observed_name(parent)
                else:
                    link_parent = parent
                compiled_net.add_edge(link_parent, obs_node,
                                      source_net[parent][node].copy())

    # Check that there are no stochastic nodes in the ancestors
    for node in uses_observed:
        # Use the observed version to query observed ancestors in the compiled_net
        obs_node = observed_name(node)
        for ancestor_node in nx.ancestors(compiled_net, obs_node):
            if '_stochastic' in source_net.node.get(ancestor_node, {}):
                raise ValueError("Observed nodes must be deterministic. Observed "
                                 "data depends on a non-deterministic node {}."
                                 .format(ancestor_node))

    return compiled_net

def longest_string(self):  # type: () -> String
    """
    Returns an example of a maximally long string recognized by this DFA.

    If the language is infinite, raises InfiniteLanguageError.
    If the language is empty, raises EmptyLanguageError.

    The algorithm is similar to the one described in these lecture notes
    for deciding whether a language is finite:
    http://math.uaa.alaska.edu/~afkjm/cs351/handouts/non-regular.pdf
    """
    # Compute what we're calling the "acceptable subgraph" by restricting to
    # states which are (1) descendants of the start state, and (2) ancestors of
    # an accepting state. These two properties imply that there is at least
    # one walk between these two states, corresponding to a string present in
    # the language.
    acceptable_subgraph = self._acceptable_subgraph
    if len(acceptable_subgraph.node) == 0:
        # If this graph is _empty_, then the language is empty.
        raise EmptyLanguageError()

    # Otherwise, we try to find the longest path in it. Internally, networkx
    # does this by topologically sorting the graph (which only works if it's
    # a DAG), then using the sorted graph to construct the longest path in
    # linear time.
    try:
        longest_path = nx.algorithms.dag.dag_longest_path(acceptable_subgraph)
    except nx.NetworkXUnfeasible:
        # If a topological sort is not possible, this means there is a
        # cycle, and the recognized language is infinite. In this case,
        # nx raises ``nx.NetworkXUnfeasible``.
        raise InfiniteLanguageError()

    # To show that the longest path must originate at the start node,
    # consider 3 cases for the position of s in a longest path P from u to v:
    #
    # (a) At the beginning. Done; this is what we were seeking to prove.
    # (b) On the path, but not at the beginning. In this case, u is
    #     reachable from s (by property (1) above), and s is reachable from
    #     u (since s is on a path from u to v). This means the graph
    #     contains a cycle, which contradicts that we've constructed a
    #     topological sort on it.
    # (c) Disjoint from s. Let P' be a path connecting s to u (which must
    #     exist by property (1)). If this path contains a vertex u' != u in
    #     P, then P ∪ P' contains a cycle (from u to u' on P and from u' to
    #     u on P'), which is a contradiction. But then P ∪ P' is a path,
    #     which contains at least one more vertex than P (in particular, s),
    #     and so is a longer path, which contradicts the maximality
    #     assumption.
    chars = []
    for state1, state2 in zip(longest_path, longest_path[1:]):
        edges = self.succ[state1][state2]
        chars.append(next(six.itervalues(edges))['transition'])
    return type(self.alphabet[0])().join(chars)

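For reference, the dag_longest_path behaviour this method relies on, shown on toy graphs of our own: on a DAG it returns the path, and on a cyclic graph the internal topological sort raises nx.NetworkXUnfeasible, which the except clause above converts into InfiniteLanguageError.

import networkx as nx

dag = nx.DiGraph([(0, 1), (1, 2), (0, 2)])
print(nx.algorithms.dag.dag_longest_path(dag))  # [0, 1, 2]

cyclic = nx.DiGraph([(0, 1), (1, 0)])
try:
    nx.algorithms.dag.dag_longest_path(cyclic)
except nx.NetworkXUnfeasible:
    print("cycle detected -> infinite language")
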
def hierarchical_f_measure(tr, y_true, y_pred):
    """Calculate the hierarchical f-measure.

    This is defined as the f-measure with precision and recall calculated
    over the union of the ancestors of the given labels (including the
    labels themselves and excluding the root).

    Parameters
    ----------
    tr: ThesaurusReader
        The thesaurus.
    y_true: {sparse matrix, array-like}
        The true labels
    y_pred: {sparse matrix, array-like}
        The predicted labels

    Returns
    -------
    float
        The hierarchical f_measure
    """
    graph = tr.nx_graph
    root = tr.nx_root
    if not sp.issparse(y_true):
        y_true = sp.coo_matrix(y_true)
        y_pred = sp.coo_matrix(y_pred)
    label_scores = []
    for i in range(0, y_true.shape[0]):
        row_true = y_true.getrow(i)
        row_pred = y_pred.getrow(i)
        true_ancestors = set.union(set(row_true.indices),
                                   *[nx.ancestors(graph, index) for index in row_true.indices])
        true_ancestors.discard(root)
        pred_ancestors = set.union(set(row_pred.indices),
                                   *[nx.ancestors(graph, index) for index in row_pred.indices])
        pred_ancestors.discard(root)
        intersection = len(pred_ancestors & true_ancestors)
        try:
            p = intersection / len(pred_ancestors)
            r = intersection / len(true_ancestors)
            label_scores.append(2 * p * r / (p + r))
        except ZeroDivisionError:
            warn('F_score is ill-defined and being set to 0.0 on samples '
                 'with no predicted labels', UndefinedMetricWarning, stacklevel=2)
            label_scores.append(0)
    return np.mean(label_scores)

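A worked instance of the metric on a two-level toy thesaurus (hand computation with our own labels, not using ThesaurusReader): with true label {c} and prediction {a}, the ancestor sets are {a, c} and {a}, giving precision 1, recall 1/2, and an F-score of 2/3.

import networkx as nx

g = nx.DiGraph([("root", "a"), ("a", "c"), ("root", "b")])

true_anc = ({"c"} | nx.ancestors(g, "c")) - {"root"}  # {'a', 'c'}
pred_anc = ({"a"} | nx.ancestors(g, "a")) - {"root"}  # {'a'}

inter = len(true_anc & pred_anc)                      # 1
p, r = inter / len(pred_anc), inter / len(true_anc)   # 1.0, 0.5
print(2 * p * r / (p + r))                            # 0.666... (hierarchical F1)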