我们从 Python 开源项目中,提取了以下 11 个代码示例,用于说明如何使用 networkx.read_gexf()。
def __init__(self, **params):
    """
    Store the keyword parameters and make the Meng graph available.

    When a file exists at ``config.MENG_GRAPH_PATH`` the graph is read
    from it (node ids loaded with ``node_type=int``). Otherwise the graph
    is built with ``MengModelBuilder`` and written to that path as GEXF.

    Parameters
    ----------
    **params
        Arbitrary keyword arguments, kept as-is in ``self.params``.
    """
    self.params = params
    graph_path = config.MENG_GRAPH_PATH

    # Fast path: a GEXF copy is already on disk, just load it.
    if os.path.exists(graph_path):
        log.debug("Reading Meng graph file at '%s'" % graph_path)
        self.graph = nx.read_gexf(graph_path, node_type=int)
        return

    log.debug("Meng graph file not found. Building one at '%s'" % graph_path)
    model_builder = MengModelBuilder()
    self.graph = model_builder.build()
    # The builder is not needed once the graph exists; drop the reference.
    del model_builder

    node_count = self.graph.number_of_nodes()
    edge_count = self.graph.number_of_edges()
    log.debug("Meng graph built. %d nodes and %d edges." % (node_count, edge_count))

    # NOTE(review): ensure_folder presumably creates the parent directory
    # when it is missing -- confirm against utils.
    utils.ensure_folder(os.path.dirname(graph_path))
    nx.write_gexf(self.graph, graph_path)
    log.debug("Meng graph saved.")
def lpu_parser(filename):
    """
    Parse a GEXF LPU specification file.

    The file is loaded with ``networkx.read_gexf`` and the resulting graph
    is handed to ``LPU.graph_to_dicts``.

    Parameters
    ----------
    filename : str
        GEXF filename.

    Returns
    -------
    Whatever ``LPU.graph_to_dicts`` produces for the loaded graph.
    """
    return LPU.graph_to_dicts(nx.read_gexf(filename))
def test_edges_to_graph_gexf(self):
    """GEXF emitted by ``utils.edges_to_graph`` matches the graph fixture."""
    expected = nx.read_gexf(fixtures_path('graph.gexf'))

    with open(fixtures_path('graph.json'), 'r') as edges_file:
        edge_list = json.load(edges_file)

    # Serialize the edges to GEXF text, then parse it back for comparison.
    gexf_text = utils.edges_to_graph(edge_list, fmt='gexf')
    actual = nx.read_gexf(io.StringIO(gexf_text))

    assert nodeset(actual) == nodeset(expected)
    assert edgeset(actual) == edgeset(expected)
def test_command_graph_gexf(self):
    """The ``graph`` command with ``-f gexf`` prints the fixture graph."""
    region_pairs = [
        ((0, 0), (3, 3)),
        ((1, 11), (4, 14)),
        ((8, 12), (11, 15)),
    ]
    cli_args = [utils.serialize_json(region_pairs), '-f', 'gexf', self.image_grid]
    result = self.runner.invoke(cli.graph, cli_args)

    # The command output is GEXF text; parse it back into a graph.
    produced = nx.read_gexf(io.StringIO(result.output.strip()))
    reference = nx.read_gexf(fixtures_path('graph.gexf'))

    assert nodeset(produced) == nodeset(reference)
    assert edgeset(produced) == edgeset(reference)
def test_command_graph_gexf_tolerance(self):
    """The ``graph`` command with ``-f gexf -st 0`` prints the fixture graph."""
    region_pairs = [
        ((0, 0), (3, 3)),
        ((1, 11), (4, 14)),
        ((8, 12), (11, 15)),
    ]
    # NOTE(review): '-st' is presumably the tolerance option this test is
    # named after -- confirm against the CLI definition.
    cli_args = [
        utils.serialize_json(region_pairs),
        '-f', 'gexf',
        '-st', 0,
        self.image_grid,
    ]
    result = self.runner.invoke(cli.graph, cli_args)

    # The command output is GEXF text; parse it back into a graph.
    produced = nx.read_gexf(io.StringIO(result.output.strip()))
    reference = nx.read_gexf(fixtures_path('graph.gexf'))

    assert nodeset(produced) == nodeset(reference)
    assert edgeset(produced) == edgeset(reference)
def rank_nodes_baselines(graph, method="katz", limit=20):
    """
    Rank the "paper" nodes of a graph using a baseline centrality score.

    Parameters
    ----------
    graph : networkx graph or str
        The graph to rank. If a string is given it is treated as the path
        of a GEXF file and loaded with ``node_type=int``.
    method : str
        Scoring method: "katz" (``katz_centrality`` with alpha=0.01 and
        beta=1.0), "hits_hub" or "hits_auth" (hub / authority scores from
        ``nx.hits`` with max_iter=500).
    limit : int
        Maximum number of results to return. 0 yields an empty list; a
        negative value never matches the result count, so every paper
        node is returned.

    Returns
    -------
    list of tuple
        ``(node_id, paper_id, score)`` for nodes whose "type" attribute is
        "paper", ordered by descending score.

    Raises
    ------
    ValueError
        If `method` is not one of the supported names.
    """
    # 'basestring' only exists on Python 2; fall back to 'str' on Python 3.
    try:
        string_types = basestring
    except NameError:
        string_types = str

    # If 'graph' is a string then a path was provided, so load the graph.
    if isinstance(graph, string_types):
        graph = nx.read_gexf(graph, node_type=int)

    if method == "katz":
        scores = katz_centrality(graph, alpha=0.01, beta=1.0)
    elif method in ("hits_hub", "hits_auth"):
        hubs, auth = nx.hits(graph, max_iter=500)
        scores = hubs if method == "hits_hub" else auth
    else:
        raise ValueError("Invalid method parameter: '%s'" % method)

    # Tuple-unpacking lambdas were removed in Python 3, so index the pair.
    ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)

    # 'graph.node' was removed in NetworkX 2.4, while 'graph.nodes' is a
    # plain method in 1.x; nodes(data=True) yields (node, attrs) pairs on
    # every version.
    node_attrs = dict(graph.nodes(data=True))

    results = []
    for node_id, score in ranked:
        # Checked before appending so that limit=0 returns nothing.
        if len(results) == limit:
            break
        attrs = node_attrs[node_id]
        if attrs["type"] == "paper":
            results.append((node_id, attrs["paper_id"], score))

    return results
def build_graph(conf_name, year, H, min_topic_lift, min_ngram_lift,
                exclude=None, force=False, save=True, load=False):
    """
    Return the graph model for a conference, using a GEXF file as cache.

    If a graph file already exists (and `force` is false) it is loaded
    and returned. Otherwise the module-level ``builder`` is instantiated
    on first use, the graph is built, and optionally saved as GEXF.

    Parameters
    ----------
    conf_name, year, H, min_topic_lift, min_ngram_lift
        Forwarded unchanged to ``builder.build``. `conf_name` also
        determines the graph file name and `H` the model folder.
    exclude : list, optional
        Forwarded to ``builder.build``; defaults to a fresh empty list.
    force : bool
        Rebuild the graph even when a cached file exists.
    save : bool
        Write the freshly built graph to the cache file.
    load : bool
        Not used by this function; kept for interface compatibility.

    Returns
    -------
    The built or loaded graph.

    Raises
    ------
    SystemExit
        With status 1 when the cached graph file cannot be read.
    """
    global builder

    # A fresh list per call, so no default is shared between invocations.
    if exclude is None:
        exclude = []

    model_folder = config.IN_MODELS_FOLDER % (config.DATASET, H)

    # Create the model folder if missing. Attempting the creation and
    # tolerating "already there" avoids the check-then-create race.
    try:
        os.makedirs(model_folder)
    except OSError:
        if not os.path.exists(model_folder):
            raise

    graph_file = utils.get_graph_file_name(conf_name, model_folder)

    # A GEXF copy already exists on disk: just load it and return.
    if not force and os.path.exists(graph_file):
        try:
            return nx.read_gexf(graph_file, node_type=int)
        except Exception as err:
            # Catching Exception (not a bare except) lets KeyboardInterrupt
            # and SystemExit propagate untouched.
            print("Problem opening '%s'." % graph_file)
            sys.stderr.write("%s\n" % err)
            sys.exit(1)

    # Instantiate the builder lazily; it is reused across calls.
    if not builder:
        builder = kddcup_model.ModelBuilder()

    graph = builder.build(conf_name, year, H, min_topic_lift, min_ngram_lift, exclude)

    # Store a GEXF copy for caching purposes.
    if save:
        nx.write_gexf(graph, graph_file)

    return graph
def build_graph(query, K, H, min_topic_lift, min_ngram_lift,
                exclude=None, force=False, save=True, load=False):
    """
    Return the graph model for a query, using a GEXF file as cache.

    If a graph file already exists (and `force` is false) it is loaded
    and returned. Otherwise the module-level ``builder`` is instantiated
    on first use, the graph is built, and optionally saved as GEXF.

    Parameters
    ----------
    query, K, H, min_topic_lift, min_ngram_lift
        Forwarded unchanged to ``builder.build``. `query` also determines
        the graph file name, and `K` and `H` the model folder.
    exclude : list, optional
        Forwarded to ``builder.build``; defaults to a fresh empty list.
    force : bool
        Rebuild the graph even when a cached file exists.
    save : bool
        Write the freshly built graph to the cache file.
    load : bool
        Not used by this function; kept for interface compatibility.

    Returns
    -------
    The built or loaded graph.

    Raises
    ------
    SystemExit
        With status 1 when the cached graph file cannot be read.
    """
    global builder

    # A fresh list per call, so no default is shared between invocations.
    if exclude is None:
        exclude = []

    model_folder = config.IN_MODELS_FOLDER % (config.DATASET, K, H)

    # Create the model folder if missing. Attempting the creation and
    # tolerating "already there" avoids the check-then-create race.
    try:
        os.makedirs(model_folder)
    except OSError:
        if not os.path.exists(model_folder):
            raise

    graph_file = utils.get_graph_file_name(query, model_folder)

    # A GEXF copy already exists on disk: just load it and return.
    if not force and os.path.exists(graph_file):
        try:
            return nx.read_gexf(graph_file, node_type=int)
        except Exception as err:
            # Catching Exception (not a bare except) lets KeyboardInterrupt
            # and SystemExit propagate untouched.
            print("Problem opening '%s'." % graph_file)
            sys.stderr.write("%s\n" % err)
            sys.exit(1)

    # Instantiate the builder lazily; it is reused across calls.
    if not builder:
        builder = model.ModelBuilder()

    graph = builder.build(query, K, H, min_topic_lift, min_ngram_lift, exclude)

    # Store a GEXF copy for caching purposes.
    if save:
        nx.write_gexf(graph, graph_file)

    return graph
def lpu_parser_legacy(filename):
    """
    Parse a GEXF LPU specification file via the legacy conversion step.

    The graph loaded from `filename` is passed through
    ``LPU.conv_legacy_graph`` before being handed to
    ``LPU.graph_to_dicts``.

    Parameters
    ----------
    filename : str
        GEXF filename.

    Returns
    -------
    Whatever ``LPU.graph_to_dicts`` produces for the converted graph.
    """
    loaded = nx.read_gexf(filename)
    converted = LPU.conv_legacy_graph(loaded)
    return LPU.graph_to_dicts(converted)
def run(out_name):
    """
    Render the input and output of the two generic LPUs into one video.

    One waveform plot shows variable 'I' of 'sensory_0' from the input
    file. For each of LPU 0 and 1, a raster plot shows 'spike_state' of
    the nodes whose name starts with 'proj' and whose class is 'LeakyIAF'.
    The result is written to ``'<out_name>.mp4'``.

    Parameters
    ----------
    out_name : str
        Tag used in the per-LPU output file names
        (``generic_lpu_<i>_<out_name>_output.h5``) and as the base name
        of the generated video.
    """
    V = vis.visualizer()

    # Assumes that generic_lpu_0_input.h5 and generic_lpu_1_input.h5
    # contain the same data:
    V.add_LPU('./data/generic_lpu_0_input.h5', LPU='Sensory', is_input=True)
    V.add_plot({'type': 'waveform', 'uids': [['sensory_0']], 'variable': 'I'},
               'input_Sensory')

    for i in [0, 1]:
        gexf_file = './data/generic_lpu_%s.gexf.gz' % i
        lpu_label = 'Generic LPU %s' % i

        G = nx.read_gexf(gexf_file)
        # 'G.node' was removed in NetworkX 2.4, while 'G.nodes' is a plain
        # method in 1.x; nodes(data=True) yields (node, attrs) pairs on
        # every version.
        neu_proj = sorted(k for k, n in G.nodes(data=True)
                          if n['name'].startswith('proj')
                          and n['class'] == 'LeakyIAF')
        N = len(neu_proj)

        V.add_LPU('generic_lpu_%s_%s_output.h5' % (i, out_name),
                  lpu_label,
                  gexf_file=gexf_file)
        V.add_plot({'type': 'raster', 'uids': [neu_proj],
                    'variable': 'spike_state',
                    # An explicit list, so Python 3 passes what Python 2 did.
                    'yticks': list(range(1, N + 1)),
                    'yticklabels': neu_proj, 'title': 'Output'},
                   lpu_label)

    # Three stacked plots: the shared input plus one raster per LPU.
    V.rows = 3
    V.cols = 1
    V.fontsize = 8
    V.out_filename = '%s.mp4' % out_name
    V.codec = 'mpeg4'
    V.xlim = [0, 1.0]
    V.run()
    #V.run('%s.png' % out_name)

# Run the visualizations in parallel: