Python networkx 模块,is_directed_acyclic_graph() 实例源码


项目:phasm    作者:AbeelLab    | 项目源码 | 文件源码
def test_graph_to_dag(test_graph):
    logger = logging.getLogger('graph_to_dag')
    g = test_graph

    for partition, acyclic in partition_graph(g):
        if acyclic:

        dag, dfs_tree = graph_to_dag(partition)

        logger.debug("%s", dag.nodes())
        logger.debug("%s", dag.edges())
        logger.debug("DFS nodes: %s", dfs_tree.nodes())
        logger.debug("DFS edges: %s", dfs_tree.edges())

        assert networkx.is_directed_acyclic_graph(dag)
项目:breaking_cycles_in_noisy_hierarchies    作者:zhenv5    | 项目源码 | 文件源码
def remove_cycle_edges_by_mfas(graph_file):
    g = nx.read_edgelist(graph_file,create_using = nx.DiGraph(),nodetype = int)
    from remove_self_loops import remove_self_loops_from_graph
    self_loops = remove_self_loops_from_graph(g)

    scc_nodes,_,_,_ = scc_nodes_edges(g)
    degree_dict = get_nodes_degree_dict(g,scc_nodes)
    sccs = get_big_sccs(g)
    if len(sccs) == 0:
        print("After removal of self loop edgs: %s" % nx.is_directed_acyclic_graph(g))
        return self_loops
    edges_to_be_removed = []
    import timeit
    t1 = timeit.default_timer()
    t2 = timeit.default_timer()
    print("mfas time usage: %0.4f s" % (t2 - t1))
    edges_to_be_removed = list(set(edges_to_be_removed))
    edges_to_be_removed += self_loops
    edges_to_be_removed_file = graph_file[:len(graph_file)-6] + "_removed_by_mfas.edges"
    return edges_to_be_removed
项目:seqenv    作者:xapple    | 项目源码 | 文件源码
def networkx(self):
        """The network converted to `networkx` format.
        Seems like it looses directionality.
        To install: $ pip install networkx
        g = self.orange_obo.to_networkx()
        assert networkx.is_directed_acyclic_graph(g)
        return g

    # --------------------------- In this section --------------------------- #
    # test
    # get_subgraph
    # add_weights
    # draw_to_pdf
    # write_to_dot
项目:feagen    作者:ianlini    | 项目源码 | 文件源码
def draw_dag(cls, path, data_keys):
        # pylint: disable=protected-access
        dag = cls._dag.draw(path, data_keys, root_node_key='generate',
        if not nx.is_directed_acyclic_graph(dag):
            print("Warning! The graph is not acyclic!")
项目:inferno    作者:inferno-pytorch    | 项目源码 | 文件源码
def graph_is_valid(self):
        """Checks if the graph is valid."""
        # Check if the graph is a DAG
        is_dag = is_directed_acyclic_graph(self.graph)
        # Check if output nodes are sinks
        output_nodes_are_sinks = all([self.is_sink_node(name) for name in self.output_nodes])
        # Check inf input nodes are sources
        input_nodes_are_sources = all([self.is_source_node(name) for name in self.input_nodes])
        # TODO Check whether only input nodes are sources and only output nodes are sinks
        # Conclude
        is_valid = is_dag and output_nodes_are_sinks and input_nodes_are_sources
        return is_valid
项目:inferno    作者:inferno-pytorch    | 项目源码 | 文件源码
def assert_graph_is_valid(self):
        """Asserts that the graph is valid."""
        assert is_directed_acyclic_graph(self.graph), "Graph is not a DAG."
        for name in self.output_nodes:
            assert self.is_sink_node(name), "Output node {} is not a sink.".format(name)
            assert not self.is_source_node(name), "Output node {} is a source node. " \
                                                  "Make sure it's connected.".format(name)
        for name in self.input_nodes:
            assert self.is_source_node(name), "Input node {} is not a source.".format(name)
            assert not self.is_sink_node(name), "Input node {} is a sink node. " \
                                                "Make sure it's connected.".format(name)
项目:qiskit-sdk-py    作者:QISKit    | 项目源码 | 文件源码
def depth(self):
        """Return the circuit depth."""
        assert nx.is_directed_acyclic_graph(self.multi_graph), "not a DAG"
        return nx.dag_longest_path_length(self.multi_graph) - 1
项目:NetworkCompress    作者:luzai    | 项目源码 | 文件源码
def add_skip(self, model, config):
        assert nx.is_directed_acyclic_graph(model.graph)
        topo_nodes = nx.topological_sort(model.graph)

        names = [ for node in topo_nodes
                 if node.type == 'Conv2D' or node.type == 'Group' or node.type == 'Conv2D_Pooling']

        if len(names) <= 2:
  'can\'t find a suitable layer to apply add_skip operation,return origin model')
            return model, False

        max_iter = 100
        for i in range(max_iter + 1):
            if i == max_iter:
      'can\'t find a suitable layer to apply add_skip operation,return origin model')
                return model, False
            from_idx = np.random.randint(0, len(names) - 2)
            to_idx = from_idx + 1
            next_nodes = model.graph.get_nodes(names[to_idx], next_layer=True, last_layer=False)
            if 'Add' in [node.type for node in next_nodes]:

        from_name = names[from_idx]
        to_name = names[to_idx]'choose {} and {} to add_skip'.format(from_name, to_name))
        return self.skip(model, from_name, to_name, config), True

    # add group operation
项目:cppdep    作者:rakhimov    | 项目源码 | 文件源码
def __transitive_reduction(self):
        """Transitive reduction for acyclic graphs."""
        assert nx.is_directed_acyclic_graph(self.digraph)
        for u in self.digraph:
            transitive_vertex = []
            for v in self.digraph[u]:
                    x for _, x in nx.dfs_edges(self.digraph, v))
            self.digraph.remove_edges_from((u, x) for x in transitive_vertex)
项目:revex    作者:lucaswiman    | 项目源码 | 文件源码
def has_finite_language(self):  # type: () -> bool
        Returns True iff this DFA recognizes a finite (possibly empty) language.

        Based on decision procedure described here:

        - Remove nodes which cannot reach an accepting state (see
          `_acceptable_subgraph` above).
        - Language is finite iff the remaining graph is acyclic.
        return nx.is_directed_acyclic_graph(self._acceptable_subgraph)
项目:lightflow    作者:AustralianSynchrotron    | 项目源码 | 文件源码
def validate(self, graph):
        """ Validate the graph by checking whether it is a directed acyclic graph.

            graph (DiGraph): Reference to a DiGraph object from NetworkX.

            DirectedAcyclicGraphInvalid: If the graph is not a valid dag.
        if not nx.is_directed_acyclic_graph(graph):
            raise DirectedAcyclicGraphInvalid(graph_name=self._name)
项目:breaking_cycles_in_noisy_hierarchies    作者:zhenv5    | 项目源码 | 文件源码
def scc_based_to_remove_cycle_edges_iterately(g,nodes_score):
    from remove_self_loops import remove_self_loops_from_graph
    self_loops = remove_self_loops_from_graph(g)
    big_sccs = get_big_sccs(g)
    scc_nodes_score_dict = scores_of_nodes_in_scc(big_sccs,nodes_score)
    edges_to_be_removed = []
    if len(big_sccs) == 0:
        print("After removal of self loop edgs: %s" % nx.is_directed_acyclic_graph(g))
        return self_loops

    #print(" # edges to be removed: %d" % len(edges_to_be_removed))
    return edges_to_be_removed+self_loops
项目:breaking_cycles_in_noisy_hierarchies    作者:zhenv5    | 项目源码 | 文件源码
def scc_based_to_remove_cycle_edges_iterately(g,nodes_score,is_Forward):
    big_sccs = get_big_sccs(g)
    if len(big_sccs) == 0:
        print("After removal of self loop edgs: %s" % nx.is_directed_acyclic_graph(g))
        return []
    scc_nodes_score_dict = scores_of_nodes_in_scc(big_sccs,nodes_score)
    edges_to_be_removed = []
    #print(" # edges to be removed: %d" % len(edges_to_be_removed))
    return edges_to_be_removed
项目:breaking_cycles_in_noisy_hierarchies    作者:zhenv5    | 项目源码 | 文件源码
def scc_based_to_remove_cycle_edges_iterately(g,edges_score):
    big_sccs = get_big_sccs(g)
    if len(big_sccs) == 0:
        print("After removal of self loop edgs: %s" % nx.is_directed_acyclic_graph(g))
        return []
    edges_to_be_removed = []
    #print(" # edges to be removed: %d" % len(edges_to_be_removed))
    return edges_to_be_removed
项目:ride    作者:KyleBenson    | 项目源码 | 文件源码
def __validate_list(self):
        non_source_sets = self._list[1:-1]
        # every non-source anchor has an incoming neighbor
        # both before and after it in the skeleton list.
        for anchor in (self._get_anchor(t) for t in non_source_sets):
            found_after = False
            found_before = False
            my_idx = self._get_index(anchor)
            for pred in self._get_external_predecessors(anchor):
                pred_idx = self._get_index(pred)
                if pred_idx < my_idx:
                    found_before = True
                # HACK: root is in 2 places!
                if pred == self.root:
                    pred_idx = len(self._list) - 1
                if pred_idx > my_idx:
                    found_after = True

            assert(found_after and found_before)

        # every non-source set is pair-wise disjoint with every other set
        for s1, s2 in itertools.combinations(non_source_sets, 2):
            assert(len(set(s1.nodes()).intersection(s2.nodes())) == 0)

        # directed path from anchor to every other node in its set
        assert(all(nx.is_directed_acyclic_graph(t) for t in non_source_sets))

        # first and last sets are always just the root
        assert(self.root in self._list[0] and self.root in self._list[-1] and\
               self._list[0].number_of_nodes() == 1 and self._list[-1].number_of_nodes() == 1)

        return True
项目:prov2bigchaindb    作者:DLR-SC    | 项目源码 | 文件源码
def calculate_account_data(prov_document: provmodel.ProvDocument) -> list:
        Transforms a ProvDocument into a list of tuples including: 
        ProvAgent, list of ProvRelations from agent,
        list of ProvElements associated to ProvAgent,
        list of Namespaces

        :param prov_document: Document to transform
        :type prov_document:
        :return: List of tuples(ProvAgent, list(), list(), list())
        :rtype: list

        namespaces = prov_document.get_registered_namespaces()
        g = provgraph.prov_to_graph(prov_document=prov_document)
        sorted_nodes = topological_sort(g, reverse=True)
        agents = list(filter(lambda elem: isinstance(elem, provmodel.ProvAgent), sorted_nodes))
        elements = list(filter(lambda elem: not isinstance(elem, provmodel.ProvAgent), sorted_nodes))

        # Check on compatibility
        if not is_directed_acyclic_graph(g):
            raise Exception("Provenance graph is not acyclic")
        if isolates(g):
            raise Exception("Provenance not compatible with role-based concept. Has isolated Elements")
        for element in elements:
            if provmodel.ProvAgent not in [type(n) for n in g.neighbors(element)]:
                raise Exception(
                    "Provenance not compatible with role-based concept. Element {} has not relation to any agent".format(

        accounts = []
        for agent in agents:
            # find out-going relations from agent
            agent_relations = []
            for u, v in g.out_edges(agent):
                # Todo check if filter does not left out some info
                agent_relations.append(g.get_edge_data(u, v)[0]['relation'])

            agent_elements = {}
            i = 0
            for element in elements:
                element_relations = []
                if g.has_edge(element, agent):
                    for u, v in set(g.out_edges(element)):
                        for relation in g[u][v].values():
                    agent_elements[i] = {element: element_relations}
                    i += 1

            accounts.append((agent, agent_relations, agent_elements, namespaces))
        return accounts
项目:ReGraph    作者:eugeniashurko    | 项目源码 | 文件源码
def add_typing(self, source, target, mapping,
                   total=False, attrs=None):
        """Add homomorphism to the hierarchy."""
        if source not in self.nodes():
            raise HierarchyError(
                "Node '%s' is not defined in the hierarchy!" % source)
        if target not in self.nodes():
            raise HierarchyError(
                "Node '%s' is not defined in the hierarchy!" % target)

        if (source, target) in self.edges():
            raise HierarchyError(
                "Edge '%s->%s' already exists in the hierarchy: "
                "no muliple edges allowed!" %
                (source, target)
        if not isinstance(self.node[source], GraphNode):
            if type(self.node[source]) == RuleNode:
                raise HierarchyError(
                    "Source node is a rule, use `add_rule_typing` "
                    "method instead!"
                raise HierarchyError(
                    "Source of a typing should be a graph, `%s` is provided!" %
        if not isinstance(self.node[target], GraphNode):
            raise HierarchyError(
                "Target of a typing should be a graph, `%s` is provided!" %

        # check no cycles are produced
        self.add_edge(source, target)
        if not nx.is_directed_acyclic_graph(self):
            self.remove_edge(source, target)
            raise HierarchyError(
                "Edge '%s->%s' creates a cycle in the hierarchy!" %
                (source, target)
        self.remove_edge(source, target)

        # check if the homomorphism is valid

        # check if newly created path commutes with existing shortest paths
        self._check_consistency(source, target, mapping)

        self.add_edge(source, target)
        if attrs is not None:
        self.edge[source][target] = self.graph_typing_cls(
            mapping, total, attrs)
        self.typing[source][target] = self.edge[source][target].mapping
项目:phasm    作者:AbeelLab    | 项目源码 | 文件源码
def test_partition(test_graph):
    logger = logging.getLogger('partition')
    g = test_graph

    i = 0
    for partition, acyclic in partition_graph(g):
        if i == 0:
            expected_nodes = {'5', '4', '3', '2', 'r_', 're_'}
            expected_edges = {
                ("5", "2"),
                ("5", "re_"),
                ("4", "5"),
                ("2", "3"),
                ("2", "re_"),
                ("3", "4"),
                ("3", "5"),
                ("r_", "2")

            assert set(partition.nodes()) == expected_nodes
            assert set(partition.edges()) == expected_edges
        elif i == 1:
            expected_nodes = {'8', '7', '6', '1', 'r_', 're_'}
            expected_edges = {
                ("7", "8"),
                ("6", "7"),
                ("1", "6"),
                ("1", "re_"),
                ("r_", "7"),
                ("r_", "6"),
                ("r_", "1"),
                ("8", "re_")

            assert set(partition.nodes()) == expected_nodes
            assert set(partition.edges()) == expected_edges
            raise ValueError("Unexpected partition, got more than 2")

        logger.debug("%s", partition.nodes())
        logger.debug("%s", partition.edges())

        assert networkx.is_directed_acyclic_graph(partition) == acyclic

        i += 1
项目:BAG_framework    作者:ucb-art    | 项目源码 | 文件源码
def build(self, debug=False):
        """Returns a OpenMDAO Group from the variable graph.

        debug : bool
            True to print debug messages.

        grp : omdao.Group
            the OpenMDAO group that computes all variables.
        input_bounds : dict[str, any]
            a dictionary from input variable name to (min, max, ndim) tuple.
        input_bounds = {}
        ndim_dict = {}

        if not nx.is_directed_acyclic_graph(self._g):
            raise Exception('Dependency loop detected')

        grp = omdao.Group()
        prom = ['*']
        for var in nx.topological_sort(self._g):
            nattrs = self._g.node[var]
            ndim = nattrs['ndim']
            ndim_dict[var] = ndim
            if self._g.in_degree(var) == 0:
                if debug:
                    # input variable
                    print('Input variable: %s' % var)
                # range checking
                vmin, vmax = nattrs['min'], nattrs['max']
                veq = nattrs.get('equals', None)
                if vmin > vmax:
                    raise Exception('Variable %s input range not valid.' % var)
                input_bounds[var] = veq, vmin, vmax, ndim
                init_vals = {par: np.zeros(ndim_dict[par]) for par in self._g.predecessors_iter(var)}
                comp_name = 'comp__%s' % var
                if 'expr' in nattrs:
                    eqn = '{}={}'.format(var, nattrs['expr'])
                    init_vals[var] = np.zeros(ndim)
                    # noinspection PyTypeChecker
                    grp.add(comp_name, omdao.ExecComp(eqn, **init_vals), promotes=prom)
                elif 'fun_list' in nattrs:
                    params = nattrs['params']
                    fun_list = nattrs['fun_list']
                    vec_params = nattrs['vec_params']
                    comp = VecFunComponent(var, fun_list, params, vector_params=vec_params)
                    # noinspection PyTypeChecker
                    grp.add(comp_name, comp, promotes=prom)
                    raise Exception('Unknown attributes: {}'.format(nattrs))

        return grp, input_bounds
项目:breaking_cycles_in_noisy_hierarchies    作者:zhenv5    | 项目源码 | 文件源码
def gnm_random_graph(n, m, seed=None, directed=True):
    """Return the random graph G_{n,m}.

    Produces a graph picked randomly out of the set of all graphs
    with n nodes and m edges.

    n : int
        The number of nodes.
    m : int
        The number of edges.
    seed : int, optional
        Seed for random number generator (default=None).
    directed : bool, optional (default=False)
        If True return a directed graph
    if directed:
        g = nx.DiGraph()
        g = nx.Graph()


    if seed is not None:

    if n==1:
        return G


    if not directed:
    if m>=max_edges:
        return nx.complete_graph(n,create_using=G)

    while edge_count < m:
        # generate random edge,u,v
        u = random.choice(nlist)
        v = random.choice(nlist)
        if u>=v or G.has_edge(u,v):
            edge_count = edge_count+1

    permutation = np.random.permutation(n)
    #print permutation
    new_edges = []
    for e in G.edges():
        u,v = e 
    print("is_directed_acyclic_graph: %s" % nx.is_directed_acyclic_graph(g))
    return g
项目:ReGraph    作者:Kappa-Dev    | 项目源码 | 文件源码
def add_typing(self, source, target, mapping,
                   total=True, attrs=None):
        """Add homomorphism to the hierarchy."""
        if source not in self.nodes():
            raise HierarchyError(
                "Node '%s' is not defined in the hierarchy!" % source)
        if target not in self.nodes():
            raise HierarchyError(
                "Node '%s' is not defined in the hierarchy!" % target)

        if (source, target) in self.edges():
            raise HierarchyError(
                "Edge '%s->%s' already exists in the hierarchy: "
                "no muliple edges allowed!" %
                (source, target)
        if not isinstance(self.node[source], GraphNode):
            if type(self.node[source]) == RuleNode:
                raise HierarchyError(
                    "Source node is a rule, use `add_rule_typing` "
                    "method instead!"
                raise HierarchyError(
                    "Source of a typing should be a graph, `%s` is provided!" %
        if not isinstance(self.node[target], GraphNode):
            raise HierarchyError(
                "Target of a typing should be a graph, `%s` is provided!" %

        # check no cycles are produced
        self.add_edge(source, target)
        if not nx.is_directed_acyclic_graph(self):
            self.remove_edge(source, target)
            raise HierarchyError(
                "Edge '%s->%s' creates a cycle in the hierarchy!" %
                (source, target)
        self.remove_edge(source, target)

        # check if the homomorphism is valid

        # check if newly created path commutes with existing shortest paths
        self._check_consistency(source, target, mapping)

        self.add_edge(source, target)
        if attrs is not None:
        self.edge[source][target] = self.graph_typing_cls(
            mapping, total, attrs)
        self.typing[source][target] = self.edge[source][target].mapping