我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用networkx.simple_cycles()。
def bag_of_elementary_cycles(graph): """ Bag of elementary cycles """ bag = {} for cycle in nx.simple_cycles(graph): ns = map(lambda x: node_label(graph.node[x]), cycle) # Determine smallest label and rotate cycle i = min(enumerate(ns), key=lambda x: x[1])[0] ns.extend(ns[:i]) ns[:i] = [] label = '-'.join(ns) if label not in bag: bag[label] = 0 bag[label] += 1 return bag
def cycle_detect(G): # G = nx.DiGraph object cycle_list = list(nx.simple_cycles(G)) print '\n\tCycles detected for individuals: ' for i in cycle_list: print '\t', i[0] rmCyNode = raw_input('\n\t> Remove nodes? (y/n): ') print rmCyNode if rmCyNode == 'y': for i in cycle_list: G.remove_node(i[0]) print '\n\tNodes removed with success.' print '\n\tProceeding with the algorithm.' elif rmCyNode == 'n': print '\n\tCorrect pedigree errors before proceeding..' print '\n\tClosing PedWork.py session...' print '\n\tExit > OK!' sys.exit() else: print '\n\tPlease enter y/n.' sys.exit() return G
def embed(self, G, rng): if G.order() < self.cycle_length: return np.array([0.0], dtype = "f") succ = 0 max_cycle = sp.binom(G.order(), self.cycle_length) for _ in range(self.sample_size): us = rng.choice(G.nodes(), self.cycle_length) H = G.subgraph(us.tolist()) for c in nx.simple_cycles(H): if len(c) == self.cycle_length: succ += 1 break val = [max_cycle * (succ / self.sample_size)] return np.array(val, dtype = "f")
def embed(self, G, rng): if G.order() < self.cycle_length: return np.array([0.0], dtype = "f") succ, samples = 0, 0 max_cycle = sp.binom(G.order(), self.cycle_length) for _ in range(self.sample_cap): us = rng.choice(G.nodes(), self.cycle_length) H = G.subgraph(us.tolist()) samples += 1 for c in nx.simple_cycles(H): if len(c) == self.cycle_length: succ += 1 break if succ >= self.successes: break return np.array([max_cycle * (succ / samples)], dtype = "f")
def _feature_loopcount(self, f_ea): ''' Generate a block graph, then get loopcount via DiGraph.number_of_selfloops() prior feature: Null ''' DEBUG_PRINT( 'feature_loopcount' + hex(f_ea)) loopcount = 0 mloops = set() # loops = map(lambda loop: sorted(loop),nx.simple_cycles(self.ftable["dg"])) # sorted too slow, use min instead self.loops = {} self.loops = simple_cycles(self.ftable["dg"]) # if loops is None: # return 0 # # mloops = map(lambda loop : min(loop), loops) # # loopcount = len(sets(mloops)) # for loop in loops: # mloop = min(loop) # mloops.add(mloop) # loopcount = len(mloops) loopcount = len(self.loops) return loopcount
def expand(self): assert(self.state == TransactionState.INIT) (expanded, missing) = self._expand_depedencies( self.local_repo, self.repo, self.targets, exclude=self.removes ) if len(missing) > 0: raise MissingDependencyError(missing) self.depend_G = super().packages_to_graph(expanded) try: cycle = next(nx.simple_cycles(self.depend_G)) except StopIteration: pass else: raise TransactionCycleError(cycle) self._sort_deps_to_targets() # Find all the packages that are upgrades rather than installs for package in expanded: if not package.is_local: # Search for just the name to check if we have any version # locally q = Query(package.name) local_package = self.local_repo.find_package(q) # If we have something we want to remove it first, we call that # an "upgrade" if local_package is None: continue self.removes.append(local_package) conflicts = super()._find_conflicts(self.installs, self.local_repo) if len(conflicts) > 0: # There were at least some conflicts raise ConflictError(conflicts) self.state = TransactionState.EXPANDED
def get_cycles_by_value(self, edge_type: EdgeType, quote_type: QuoteType) -> Dict[float, List[str]]: dg = self.get_network(NetworkType.price, edge_type, quote_type) weights = get_edge_attributes(dg, 'weight') cycle_vals = {} for cycle in simple_cycles(dg): # sort the currencies in cycle for long term sanity best_curr = max(cycle, key=lambda x: Currency[x].value) best_curr_ind = cycle.index(best_curr) cycle = [cycle[best_curr_ind]] + cycle[(best_curr_ind + 1):] + cycle[:best_curr_ind] cycle.append(cycle[0]) prodw = [float(weights[(cycle[i], cycle[i + 1])]) for i in range(len(cycle) - 1)] prodw = prod(prodw) cycle_vals[prodw] = cycle return cycle_vals
def loops(func): '''Return number of loops at function ``func``''' import networkx as nx fn = by(func) g = blocks.nx(fn) return len(list(nx.simple_cycles(g))) # FIXME: document this #def refs(func, member): # xl, fn = idaapi.xreflist_t(), by(func) # idaapi.build_stkvar_xrefs(xl, fn, member.ptr) # x.ea, x.opnum, x.type # ref_types = { # 0 : 'Data_Unknown', # 1 : 'Data_Offset', # 2 : 'Data_Write', # 3 : 'Data_Read', # 4 : 'Data_Text', # 5 : 'Data_Informational', # 16 : 'Code_Far_Call', # 17 : 'Code_Near_Call', # 18 : 'Code_Far_Jump', # 19 : 'Code_Near_Jump', # 20 : 'Code_User', # 21 : 'Ordinary_Flow' # } # return [(x.ea,x.opnum) for x in xl]
def _remove_cycles(self): """ Really hacky way to remove cycles in hierarchies (wtf). """ G = self.make_graph() cycles = list(networkx.simple_cycles(G)) if not cycles: return False else: cycles[0][0].looped = True cycles[0][0].sources[:] = [ ] return True
def _compile_node_creator_list(self): graph = self._graph cycles = list(nx.simple_cycles(graph)) if len(cycles): raise ValueError("Found cycles in dependencies "+str(cycles)) else: nodes = list(nx.dfs_postorder_nodes(graph)) nodes.remove(self._ROOT) # exclude root creators = [] for n in nodes: creator = graph.node[n].get('creator') creators.append((n, creator)) return creators
def valid_graph(self, g): cycles = [a for a in nx.simple_cycles(g)] if len(cycles) > 0: return False return True
def build_tree(self, tasks): """Accepts any number of tasks as returned by _get_pipeline. :param tasks: dictionary of str:info where str is the name of the task, info is from the registry :returns: Graph containing each node connected by dependencies, "after: ALL" nodes will be ignored :rtype: networkx.DiGraph :raises: :exc:`DependencyError` """ all_task_names = [task for tasks_subset in self.registry.values() for task in tasks_subset] # Find dependencies - directed graph of node names tree = nx.DiGraph() # Add nodes for name, info in tasks.items(): # TODO: doing this twice sucks if info['after'] is ALL: # ignore these continue tree.add_node(name, info=info) # Add edges for name, info in tasks.items(): if info['after'] is ALL: # ignore these continue for req in info['after']: if req not in all_task_names: msg = '"{}" pipeline element was not found, but it is declared as dependency of the pipeline "{}" with arguments "{}"' raise DependencyError(msg.format(req, name, info)) if req in tree: # don't add an edge if dependency is not part of the current set of tasks tree.add_edge(req, name) # Not as useful as it originally seemed # tree = prune_edges(tree) # Check for circular dependencies try: cycle = nx.simple_cycles(tree).next() raise DependencyError('Circular dependencies detected: {}'.format(cycle)) except StopIteration: # Good - didn't want any cycles pass # Joins (merge multiple tasks) have more than one edge in # joins = [n for n, d in tree.in_degree_iter() if d > 1] # Don't support joins right now, one reducer at the end of the chain # if joins: # raise DependencyError('Multiple after values not currently supported joins="{}"'.format(joins)) # TODO - even with joins this could be a challenge # Can't handle "N" shapes, so check for those # Descendants of forks, cannot join from outside. return tree