我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用networkx.is_directed_acyclic_graph()。
def test_graph_to_dag(test_graph): logger = logging.getLogger('graph_to_dag') g = test_graph for partition, acyclic in partition_graph(g): if acyclic: continue dag, dfs_tree = graph_to_dag(partition) logger.debug("%s", dag.nodes()) logger.debug("%s", dag.edges()) logger.debug("DFS nodes: %s", dfs_tree.nodes()) logger.debug("DFS edges: %s", dfs_tree.edges()) assert networkx.is_directed_acyclic_graph(dag)
def remove_cycle_edges_by_mfas(graph_file): g = nx.read_edgelist(graph_file,create_using = nx.DiGraph(),nodetype = int) from remove_self_loops import remove_self_loops_from_graph self_loops = remove_self_loops_from_graph(g) scc_nodes,_,_,_ = scc_nodes_edges(g) degree_dict = get_nodes_degree_dict(g,scc_nodes) sccs = get_big_sccs(g) if len(sccs) == 0: print("After removal of self loop edgs: %s" % nx.is_directed_acyclic_graph(g)) return self_loops edges_to_be_removed = [] import timeit t1 = timeit.default_timer() greedy_local_heuristic(sccs,degree_dict,edges_to_be_removed) t2 = timeit.default_timer() print("mfas time usage: %0.4f s" % (t2 - t1)) edges_to_be_removed = list(set(edges_to_be_removed)) g.remove_edges_from(edges_to_be_removed) edges_to_be_removed += self_loops edges_to_be_removed_file = graph_file[:len(graph_file)-6] + "_removed_by_mfas.edges" write_pairs_to_file(edges_to_be_removed,edges_to_be_removed_file) return edges_to_be_removed
def networkx(self): """The network converted to `networkx` format. Seems like it looses directionality. * https://networkx.readthedocs.org/en/stable/ To install: $ pip install networkx """ g = self.orange_obo.to_networkx() assert networkx.is_directed_acyclic_graph(g) return g # --------------------------- In this section --------------------------- # # test # get_subgraph # add_weights # draw_to_pdf # write_to_dot
def draw_dag(cls, path, data_keys): # pylint: disable=protected-access dag = cls._dag.draw(path, data_keys, root_node_key='generate', reverse=True) if not nx.is_directed_acyclic_graph(dag): print("Warning! The graph is not acyclic!")
def graph_is_valid(self): """Checks if the graph is valid.""" # Check if the graph is a DAG is_dag = is_directed_acyclic_graph(self.graph) # Check if output nodes are sinks output_nodes_are_sinks = all([self.is_sink_node(name) for name in self.output_nodes]) # Check inf input nodes are sources input_nodes_are_sources = all([self.is_source_node(name) for name in self.input_nodes]) # TODO Check whether only input nodes are sources and only output nodes are sinks # Conclude is_valid = is_dag and output_nodes_are_sinks and input_nodes_are_sources return is_valid
def assert_graph_is_valid(self): """Asserts that the graph is valid.""" assert is_directed_acyclic_graph(self.graph), "Graph is not a DAG." for name in self.output_nodes: assert self.is_sink_node(name), "Output node {} is not a sink.".format(name) assert not self.is_source_node(name), "Output node {} is a source node. " \ "Make sure it's connected.".format(name) for name in self.input_nodes: assert self.is_source_node(name), "Input node {} is not a source.".format(name) assert not self.is_sink_node(name), "Input node {} is a sink node. " \ "Make sure it's connected.".format(name)
def depth(self): """Return the circuit depth.""" assert nx.is_directed_acyclic_graph(self.multi_graph), "not a DAG" return nx.dag_longest_path_length(self.multi_graph) - 1
def add_skip(self, model, config): assert nx.is_directed_acyclic_graph(model.graph) topo_nodes = nx.topological_sort(model.graph) names = [node.name for node in topo_nodes if node.type == 'Conv2D' or node.type == 'Group' or node.type == 'Conv2D_Pooling'] if len(names) <= 2: logger.info('can\'t find a suitable layer to apply add_skip operation,return origin model') return model, False max_iter = 100 for i in range(max_iter + 1): if i == max_iter: logger.info('can\'t find a suitable layer to apply add_skip operation,return origin model') return model, False from_idx = np.random.randint(0, len(names) - 2) to_idx = from_idx + 1 next_nodes = model.graph.get_nodes(names[to_idx], next_layer=True, last_layer=False) if 'Add' in [node.type for node in next_nodes]: continue else: break from_name = names[from_idx] to_name = names[to_idx] logger.info('choose {} and {} to add_skip'.format(from_name, to_name)) return self.skip(model, from_name, to_name, config), True # add group operation
def __transitive_reduction(self): """Transitive reduction for acyclic graphs.""" assert nx.is_directed_acyclic_graph(self.digraph) for u in self.digraph: transitive_vertex = [] for v in self.digraph[u]: transitive_vertex.extend( x for _, x in nx.dfs_edges(self.digraph, v)) self.digraph.remove_edges_from((u, x) for x in transitive_vertex)
def has_finite_language(self): # type: () -> bool """ Returns True iff this DFA recognizes a finite (possibly empty) language. Based on decision procedure described here: http://math.uaa.alaska.edu/~afkjm/cs351/handouts/non-regular.pdf - Remove nodes which cannot reach an accepting state (see `_acceptable_subgraph` above). - Language is finite iff the remaining graph is acyclic. """ return nx.is_directed_acyclic_graph(self._acceptable_subgraph)
def validate(self, graph): """ Validate the graph by checking whether it is a directed acyclic graph. Args: graph (DiGraph): Reference to a DiGraph object from NetworkX. Raises: DirectedAcyclicGraphInvalid: If the graph is not a valid dag. """ if not nx.is_directed_acyclic_graph(graph): raise DirectedAcyclicGraphInvalid(graph_name=self._name)
def scc_based_to_remove_cycle_edges_iterately(g,nodes_score): from remove_self_loops import remove_self_loops_from_graph self_loops = remove_self_loops_from_graph(g) big_sccs = get_big_sccs(g) scc_nodes_score_dict = scores_of_nodes_in_scc(big_sccs,nodes_score) edges_to_be_removed = [] if len(big_sccs) == 0: print("After removal of self loop edgs: %s" % nx.is_directed_acyclic_graph(g)) return self_loops remove_cycle_edges_by_agony_iterately(big_sccs,scc_nodes_score_dict,edges_to_be_removed) #print(" # edges to be removed: %d" % len(edges_to_be_removed)) return edges_to_be_removed+self_loops
def scc_based_to_remove_cycle_edges_iterately(g,nodes_score,is_Forward): big_sccs = get_big_sccs(g) if len(big_sccs) == 0: print("After removal of self loop edgs: %s" % nx.is_directed_acyclic_graph(g)) return [] scc_nodes_score_dict = scores_of_nodes_in_scc(big_sccs,nodes_score) edges_to_be_removed = [] remove_cycle_edges_by_ranking_score_iterately(big_sccs,scc_nodes_score_dict,edges_to_be_removed,is_Forward) #print(" # edges to be removed: %d" % len(edges_to_be_removed)) return edges_to_be_removed
def scc_based_to_remove_cycle_edges_iterately(g,edges_score): big_sccs = get_big_sccs(g) if len(big_sccs) == 0: print("After removal of self loop edgs: %s" % nx.is_directed_acyclic_graph(g)) return [] edges_to_be_removed = [] remove_cycle_edges_by_agony_iterately(big_sccs,edges_score,edges_to_be_removed) #print(" # edges to be removed: %d" % len(edges_to_be_removed)) return edges_to_be_removed
def __validate_list(self): non_source_sets = self._list[1:-1] # every non-source anchor has an incoming neighbor # both before and after it in the skeleton list. for anchor in (self._get_anchor(t) for t in non_source_sets): found_after = False found_before = False my_idx = self._get_index(anchor) for pred in self._get_external_predecessors(anchor): pred_idx = self._get_index(pred) if pred_idx < my_idx: found_before = True # HACK: root is in 2 places! if pred == self.root: pred_idx = len(self._list) - 1 if pred_idx > my_idx: found_after = True assert(found_after and found_before) # every non-source set is pair-wise disjoint with every other set for s1, s2 in itertools.combinations(non_source_sets, 2): assert(len(set(s1.nodes()).intersection(s2.nodes())) == 0) # directed path from anchor to every other node in its set assert(all(nx.is_directed_acyclic_graph(t) for t in non_source_sets)) # first and last sets are always just the root assert(self.root in self._list[0] and self.root in self._list[-1] and\ self._list[0].number_of_nodes() == 1 and self._list[-1].number_of_nodes() == 1) return True
def calculate_account_data(prov_document: provmodel.ProvDocument) -> list: """ Transforms a ProvDocument into a list of tuples including: ProvAgent, list of ProvRelations from agent, list of ProvElements associated to ProvAgent, list of Namespaces :param prov_document: Document to transform :type prov_document: :return: List of tuples(ProvAgent, list(), list(), list()) :rtype: list """ namespaces = prov_document.get_registered_namespaces() g = provgraph.prov_to_graph(prov_document=prov_document) sorted_nodes = topological_sort(g, reverse=True) agents = list(filter(lambda elem: isinstance(elem, provmodel.ProvAgent), sorted_nodes)) elements = list(filter(lambda elem: not isinstance(elem, provmodel.ProvAgent), sorted_nodes)) # Check on compatibility if not is_directed_acyclic_graph(g): raise Exception("Provenance graph is not acyclic") if isolates(g): raise Exception("Provenance not compatible with role-based concept. Has isolated Elements") for element in elements: if provmodel.ProvAgent not in [type(n) for n in g.neighbors(element)]: raise Exception( "Provenance not compatible with role-based concept. Element {} has not relation to any agent".format( element)) accounts = [] for agent in agents: # find out-going relations from agent agent_relations = [] for u, v in g.out_edges(agent): # Todo check if filter does not left out some info agent_relations.append(g.get_edge_data(u, v)[0]['relation']) agent_elements = {} i = 0 for element in elements: element_relations = [] if g.has_edge(element, agent): for u, v in set(g.out_edges(element)): for relation in g[u][v].values(): element_relations.append(relation['relation']) agent_elements[i] = {element: element_relations} i += 1 accounts.append((agent, agent_relations, agent_elements, namespaces)) return accounts
def add_typing(self, source, target, mapping, total=False, attrs=None): """Add homomorphism to the hierarchy.""" if source not in self.nodes(): raise HierarchyError( "Node '%s' is not defined in the hierarchy!" % source) if target not in self.nodes(): raise HierarchyError( "Node '%s' is not defined in the hierarchy!" % target) if (source, target) in self.edges(): raise HierarchyError( "Edge '%s->%s' already exists in the hierarchy: " "no muliple edges allowed!" % (source, target) ) if not isinstance(self.node[source], GraphNode): if type(self.node[source]) == RuleNode: raise HierarchyError( "Source node is a rule, use `add_rule_typing` " "method instead!" ) else: raise HierarchyError( "Source of a typing should be a graph, `%s` is provided!" % type(self.node[source]) ) if not isinstance(self.node[target], GraphNode): raise HierarchyError( "Target of a typing should be a graph, `%s` is provided!" % type(self.node[target]) ) # check no cycles are produced self.add_edge(source, target) if not nx.is_directed_acyclic_graph(self): self.remove_edge(source, target) raise HierarchyError( "Edge '%s->%s' creates a cycle in the hierarchy!" % (source, target) ) self.remove_edge(source, target) # check if the homomorphism is valid check_homomorphism( self.node[source].graph, self.node[target].graph, mapping, total=total ) # check if newly created path commutes with existing shortest paths self._check_consistency(source, target, mapping) self.add_edge(source, target) if attrs is not None: normalize_attrs(attrs) self.edge[source][target] = self.graph_typing_cls( mapping, total, attrs) self.typing[source][target] = self.edge[source][target].mapping return
def test_partition(test_graph): logger = logging.getLogger('partition') g = test_graph i = 0 for partition, acyclic in partition_graph(g): if i == 0: expected_nodes = {'5', '4', '3', '2', 'r_', 're_'} expected_edges = { ("5", "2"), ("5", "re_"), ("4", "5"), ("2", "3"), ("2", "re_"), ("3", "4"), ("3", "5"), ("r_", "2") } assert set(partition.nodes()) == expected_nodes assert set(partition.edges()) == expected_edges elif i == 1: expected_nodes = {'8', '7', '6', '1', 'r_', 're_'} expected_edges = { ("7", "8"), ("6", "7"), ("1", "6"), ("1", "re_"), ("r_", "7"), ("r_", "6"), ("r_", "1"), ("8", "re_") } assert set(partition.nodes()) == expected_nodes assert set(partition.edges()) == expected_edges else: raise ValueError("Unexpected partition, got more than 2") logger.debug("%s", partition.nodes()) logger.debug("%s", partition.edges()) assert networkx.is_directed_acyclic_graph(partition) == acyclic i += 1
def build(self, debug=False): """Returns a OpenMDAO Group from the variable graph. Parameters ---------- debug : bool True to print debug messages. Returns ------- grp : omdao.Group the OpenMDAO group that computes all variables. input_bounds : dict[str, any] a dictionary from input variable name to (min, max, ndim) tuple. """ input_bounds = {} ndim_dict = {} if not nx.is_directed_acyclic_graph(self._g): raise Exception('Dependency loop detected') grp = omdao.Group() prom = ['*'] for var in nx.topological_sort(self._g): nattrs = self._g.node[var] ndim = nattrs['ndim'] ndim_dict[var] = ndim if self._g.in_degree(var) == 0: if debug: # input variable print('Input variable: %s' % var) # range checking vmin, vmax = nattrs['min'], nattrs['max'] veq = nattrs.get('equals', None) if vmin > vmax: raise Exception('Variable %s input range not valid.' % var) input_bounds[var] = veq, vmin, vmax, ndim else: init_vals = {par: np.zeros(ndim_dict[par]) for par in self._g.predecessors_iter(var)} comp_name = 'comp__%s' % var if 'expr' in nattrs: eqn = '{}={}'.format(var, nattrs['expr']) init_vals[var] = np.zeros(ndim) # noinspection PyTypeChecker grp.add(comp_name, omdao.ExecComp(eqn, **init_vals), promotes=prom) elif 'fun_list' in nattrs: params = nattrs['params'] fun_list = nattrs['fun_list'] vec_params = nattrs['vec_params'] comp = VecFunComponent(var, fun_list, params, vector_params=vec_params) # noinspection PyTypeChecker grp.add(comp_name, comp, promotes=prom) else: raise Exception('Unknown attributes: {}'.format(nattrs)) return grp, input_bounds
def gnm_random_graph(n, m, seed=None, directed=True): """Return the random graph G_{n,m}. Produces a graph picked randomly out of the set of all graphs with n nodes and m edges. Parameters ---------- n : int The number of nodes. m : int The number of edges. seed : int, optional Seed for random number generator (default=None). directed : bool, optional (default=False) If True return a directed graph """ if directed: G=nx.DiGraph() g = nx.DiGraph() else: G=nx.Graph() g = nx.Graph() G.add_nodes_from(range(n)) G.name="gnm_random_graph(%s,%s)"%(n,m) if seed is not None: random.seed(seed) if n==1: return G max_edges=n*(n-1) if not directed: max_edges/=2.0 if m>=max_edges: return nx.complete_graph(n,create_using=G) nlist=G.nodes() edge_count=0 while edge_count < m: # generate random edge,u,v u = random.choice(nlist) v = random.choice(nlist) if u>=v or G.has_edge(u,v): continue else: G.add_edge(u,v) edge_count = edge_count+1 permutation = np.random.permutation(n) #print permutation new_edges = [] for e in G.edges(): u,v = e new_edges.append((permutation[u],permutation[v])) g.add_edges_from(new_edges) print("is_directed_acyclic_graph: %s" % nx.is_directed_acyclic_graph(g)) return g
def add_typing(self, source, target, mapping, total=True, attrs=None): """Add homomorphism to the hierarchy.""" if source not in self.nodes(): raise HierarchyError( "Node '%s' is not defined in the hierarchy!" % source) if target not in self.nodes(): raise HierarchyError( "Node '%s' is not defined in the hierarchy!" % target) if (source, target) in self.edges(): raise HierarchyError( "Edge '%s->%s' already exists in the hierarchy: " "no muliple edges allowed!" % (source, target) ) if not isinstance(self.node[source], GraphNode): if type(self.node[source]) == RuleNode: raise HierarchyError( "Source node is a rule, use `add_rule_typing` " "method instead!" ) else: raise HierarchyError( "Source of a typing should be a graph, `%s` is provided!" % type(self.node[source]) ) if not isinstance(self.node[target], GraphNode): raise HierarchyError( "Target of a typing should be a graph, `%s` is provided!" % type(self.node[target]) ) # check no cycles are produced self.add_edge(source, target) if not nx.is_directed_acyclic_graph(self): self.remove_edge(source, target) raise HierarchyError( "Edge '%s->%s' creates a cycle in the hierarchy!" % (source, target) ) self.remove_edge(source, target) # check if the homomorphism is valid check_homomorphism( self.node[source].graph, self.node[target].graph, mapping, total=total ) # check if newly created path commutes with existing shortest paths self._check_consistency(source, target, mapping) self.add_edge(source, target) if attrs is not None: normalize_attrs(attrs) self.edge[source][target] = self.graph_typing_cls( mapping, total, attrs) self.typing[source][target] = self.edge[source][target].mapping return