我们从 Python 开源项目中提取了以下 4 个代码示例,用于说明如何使用 networkx.find_cliques()。
def find_biggest_clique(self):
    """Return the heaviest maximal clique of validators and its weight.

    The method gives up immediately with ``(set(), 0)`` when the weight of
    ``self.with_candidate`` is below half of the validator set's total
    weight.  Otherwise it builds an undirected graph from the edges returned
    by ``self._collect_edges()``, enumerates its maximal cliques with
    ``nx.find_cliques`` and scores each one with ``utils.get_weight``.

    Returns:
        A ``(members, weight)`` tuple: the members of the best-scoring
        clique as a set, and that clique's weight.  If no clique scores
        above zero the result is ``(set(), 0)``.
    """
    validators = self.validator_set
    # Do not have safety if less than half have candidate_estimate.
    if validators.weight(self.with_candidate) < validators.weight() / 2:
        return set(), 0

    graph = nx.Graph()
    graph.add_edges_from(self._collect_edges())

    best_members, best_weight = [], 0
    for members in nx.find_cliques(graph):
        weight = utils.get_weight(members)
        # Strict comparison: on ties the first clique seen is kept.
        if weight > best_weight:
            best_members, best_weight = members, weight

    return set(best_members), best_weight
def neighbors2_community(G, remove_duplicates=True, use_kcore=False):
    """Build one candidate community per seed node from its 2-hop neighbourhood.

    For every seed node ``v`` the community consists of each neighbour ``n``
    of ``v`` followed by all neighbours of ``n``; both lookups are made in
    ``G``.  A seed without neighbours yields an empty community.

    Args:
        G: Graph object exposing ``nodes()`` and ``neighbors(node)``.  With
            ``use_kcore=True`` it must also support ``copy()``, ``edges()``
            and ``remove_edges_from()`` and be accepted by ``nx.k_core``.
        remove_duplicates: When true, repeated nodes inside a community are
            collapsed (node order inside the community is then arbitrary).
        use_kcore: When true, only nodes of the 3-core of ``G`` (computed on
            a copy with self-loops removed) are used as seeds.  The
            neighbourhoods themselves are still taken from the full ``G``.

    Returns:
        A list of communities, each a list of nodes.  Communities that are
        identical as sequences are reported once; the order of the outer
        list is unspecified.
    """
    if use_kcore:
        seeds = G.copy()
        # ``Graph.selfloop_edges()`` is gone from recent networkx releases,
        # so self-loops are picked out of the plain edge list instead; this
        # works on every networkx version.  The list is materialised before
        # removal so the graph is not mutated while being iterated.
        self_loops = [(u, v) for u, v in seeds.edges() if u == v]
        seeds.remove_edges_from(self_loops)
        seeds = nx.k_core(seeds, 3)
    else:
        seeds = G

    # Tuples are hashable, which lets the set drop repeated communities.
    communities = set()
    for v in seeds.nodes():
        community = []
        for n in G.neighbors(v):
            community.append(n)
            community.extend(G.neighbors(n))
        if remove_duplicates:
            community = list(set(community))
        communities.add(tuple(community))

    # Convert tuples back into lists.
    return [list(community) for community in communities]
def all_clusters(cls):
    """Return one ``cls`` instance per maximal clique of HA-peered hosts.

    An edge ``(host, peer)`` is added to a graph for every peer returned by
    ``host.ha_cluster_peers.all()``, for every host returned by
    ``ManagedHost.objects.all()``.  Each clique yielded by ``find_cliques``
    on that graph is then passed to ``cls``.  Hosts without any peer
    contribute no edge and therefore never enter the graph.

    Returns:
        A list of ``cls`` objects, in the order the cliques are produced.
    """
    graph = Graph()
    for host in ManagedHost.objects.all():
        peer_edges = [(host, peer) for peer in host.ha_cluster_peers.all()]
        graph.add_edges_from(peer_edges)

    return [cls(cluster_peers) for cluster_peers in find_cliques(graph)]
async def handler(websocket, path):
    """Serve one websocket client: crawl Wikipedia and stream graph edges.

    The first message received must be a JSON object with the keys
    ``title``, ``count`` and ``date`` (itself an object with ``year``,
    ``month`` and ``day``).  A ``WikipediaCrawler`` is created from those
    values and every edge it yields is sent to the client as JSON.  After
    each edge the handler waits for a message from the client and answers
    it with the JSON-encoded list of maximal cliques of the crawler's graph.

    Args:
        websocket: Connection object providing awaitable ``recv()`` and
            ``send()``.
        path: Request path supplied by the websocket server; unused here.
    """
    payload = await websocket.recv()
    print("payload: {0}".format(payload))
    payload = json.loads(payload)
    print("begin crawl")

    count = payload['count']
    time_stamp = payload['date']
    time_stamp = datetime(time_stamp['year'], time_stamp['month'], time_stamp['day'])
    wikicrawler = WikipediaCrawler(payload['title'], time_stamp, count)

    # The crawl is restarted indefinitely; the loop only ends when an
    # exception (e.g. from the websocket) propagates out of the handler.
    while True:
        for edge in wikicrawler.crawl_iter(seed_count=count):
            print("edge: {0}".format(edge))
            await websocket.send(json.dumps(edge))

            listener_task = asyncio.ensure_future(websocket.recv())
            # NOTE(review): no timeout is given and only one task is waited
            # on, so this returns only once the client has sent a message;
            # the ``else`` branch below is not reached as written.  Confirm
            # whether a non-blocking poll (timeout) was intended.
            done, _pending = await asyncio.wait(
                [listener_task], return_when=asyncio.FIRST_COMPLETED)

            if listener_task in done:
                payload = listener_task.result()
                payload = json.loads(payload)
                print("listener_task payload:", payload)
                # find_cliques() returns a lazy iterator; materialise and
                # serialise it so the client receives the cliques rather
                # than the repr of a generator object.
                cliques = list(nx.find_cliques(wikicrawler.graph))
                await websocket.send(json.dumps(cliques))
            else:
                print("cancelling listener_task")
                listener_task.cancel()