我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用networkx.MultiDiGraph()。
def test_flatten_edges(self):
    """Flattening a graph turns nested edge attribute dicts into underscore-joined keys."""
    nested_attrs = {'A': 'a', 'B': {'C': 'c', 'D': 'd'}}
    flat_attrs = {'A': 'a', 'B_C': 'c', 'B_D': 'd'}

    source = nx.MultiDiGraph()
    source.add_edge(1, 2, key=5, attr_dict=nested_attrs)
    result = pybel.utils.flatten_graph_data(source)

    expected = nx.MultiDiGraph()
    expected.add_edge(1, 2, key=5, attr_dict=flat_attrs)

    # Same nodes and same (u, v, key) triples on both sides.
    self.assertEqual(set(result.nodes()), set(expected.nodes()))
    self.assertEqual(set(result.edges(keys=True)), set(expected.edges(keys=True)))

    # Same attribute dictionary on every expected edge.
    for source_node, target_node, edge_key in expected.edges(keys=True):
        self.assertEqual(
            expected[source_node][target_node][edge_key],
            result[source_node][target_node][edge_key],
        )
def to_graphml(graph, file):
    """Export a graph as GraphML XML through :func:`networkx.write_graphml`.

    Each node's data dictionary is serialised to a JSON string and stored
    under the ``json`` node attribute; each edge's data dictionary is passed
    through ``flatten_dict`` before being attached to the exported edge.
    The .graphml file extension is suggested so Cytoscape can recognize it.

    :param BELGraph graph: A BEL graph
    :param file file: A file or file-like object
    """
    export = nx.MultiDiGraph()

    for node, node_data in graph.nodes(data=True):
        export.add_node(node, json=json.dumps(node_data))

    for source, target, edge_key, edge_data in graph.edges(keys=True, data=True):
        export.add_edge(source, target, key=edge_key, attr_dict=flatten_dict(edge_data))

    nx.write_graphml(export, file)
def flatten_graph_data(graph):
    """Build a copy of ``graph`` whose edge data dictionaries are flattened.

    Graph-level attributes and node data are carried over unchanged; only
    edge data is passed through ``flatten_dict``.

    :param nx.MultiDiGraph graph: A graph with nested edge data dictionaries
    :return: A graph with flattened edge data dictionaries
    :rtype: nx.MultiDiGraph
    """
    flattened = nx.MultiDiGraph(**graph.graph)

    for node, node_data in graph.nodes(data=True):
        flattened.add_node(node, node_data)

    for source, target, edge_key, edge_data in graph.edges(keys=True, data=True):
        flattened.add_edge(source, target, key=edge_key, attr_dict=flatten_dict(edge_data))

    return flattened
def nx_graph(self):
    """Build a networkx labeled directed graph from the ``nodelist`` data.

    Node 0 is skipped. As a side effect, ``self.nx_labels`` is replaced by a
    mapping from node index to that node's ``'word'`` entry.
    """
    import networkx

    node_ids = list(range(1, len(self.nodes)))

    # One (node, head, relation) triple per node that has a truthy head.
    labelled_edges = []
    for node_id in node_ids:
        if self._hd(node_id):
            labelled_edges.append((node_id, self._hd(node_id), self._rel(node_id)))

    self.nx_labels = {}
    for node_id in node_ids:
        self.nx_labels[node_id] = self.nodes[node_id]['word']

    graph = networkx.MultiDiGraph()
    graph.add_nodes_from(node_ids)
    graph.add_edges_from(labelled_edges)
    return graph
def _acceptable_subgraph(self):
    # type: () -> nx.MultiDiGraph
    """
    Return the subgraph of states that are reachable from the start state
    and from which an accepting state can be reached.
    """
    full_graph = self.as_multidigraph
    reachable_states = nx.descendants(full_graph, self.start) | {self.start}
    graph = full_graph.subgraph(reachable_states)

    accepting_states = {
        state for state in graph.node if graph.node[state]['accepting']
    }
    reachable_accepting_states = reachable_states & accepting_states

    # networkx finds the ancestors of one node more easily than those of a
    # set of nodes, so funnel every accepting state into a synthetic "sink"
    # node and ask for the ancestors of that single node.
    sink = object()
    graph.add_node(sink)
    for accepting_state in reachable_accepting_states:
        graph.add_edge(accepting_state, sink)

    acceptable_states = nx.ancestors(graph, sink)
    return graph.subgraph(acceptable_states)
def live_subgraph(self):
    # type: () -> nx.MultiDiGraph
    """
    Return the graph of "live" states: the start state plus every state that
    is both reachable from the start state and an ancestor of an accepting
    state.

    Intended for display purposes: only the paths that might lead to an
    accepting state are kept, or just the start state when no such path
    exists.
    """
    graph = self.as_multidigraph
    accepting_states = {
        state for state in graph.node if graph.node[state]['accepting']
    }

    # networkx finds the ancestors of one node more easily than those of a
    # set of nodes, so funnel every accepting state into a synthetic "sink"
    # node and ask for the ancestors of that single node.
    sink = object()
    graph.add_node(sink)
    for accepting_state in accepting_states:
        graph.add_edge(accepting_state, sink)

    can_reach_accepting = nx.ancestors(graph, sink)
    reachable_from_start = nx.descendants(graph, self.start)
    live_states = {self.start} | (can_reach_accepting & reachable_from_start)
    return graph.subgraph(live_states)
def test_multidigraph_is_subgraph_without_labels(self):
    """Unlabelled single-edge graphs are subgraphs only when the edge exists in the host."""
    host = nx.MultiDiGraph()
    host.add_edges_from([(1, 2), (2, 3), (3, 4), (2, 4)])

    present_edge_a = nx.MultiDiGraph()
    present_edge_a.add_edge(1, 2)

    present_edge_b = nx.MultiDiGraph()
    present_edge_b.add_edge(3, 4)

    # (1, 4) is not an edge of the host graph.
    absent_edge = nx.MultiDiGraph()
    absent_edge.add_edge(1, 4)

    self.assertTrue(ParsemisMiner.is_subgraph(host, present_edge_a))
    self.assertTrue(ParsemisMiner.is_subgraph(host, present_edge_b))
    self.assertFalse(ParsemisMiner.is_subgraph(host, absent_edge))
def combined_stop_to_stop_transit_network(gtfs, start_time_ut=None, end_time_ut=None):
    """
    Build one multi-edge network out of the stop-to-stop networks of every
    transit mode.

    A stop-to-stop network is computed for each entry of
    route_types.TRANSIT_ROUTE_TYPES; every edge is tagged with its
    ``route_type`` and then copied into a single network. Walk mode is not
    included.

    Parameters
    ----------
    gtfs: gtfspy.GTFS
    start_time_ut, end_time_ut:
        forwarded unchanged to stop_to_stop_network_for_route_type

    Returns
    -------
    net: networkx.MultiDiGraph
        every edge carries a ``route_type`` attribute that is one of
        route_types.TRANSIT_ROUTE_TYPES (i.e. GTFS route_types)
    """
    combined = networkx.MultiDiGraph()
    for route_type in route_types.TRANSIT_ROUTE_TYPES:
        mode_graph = stop_to_stop_network_for_route_type(
            gtfs,
            route_type,
            start_time_ut=start_time_ut,
            end_time_ut=end_time_ut,
        )
        # Tag the edge data in place so the tag travels with the edges
        # when they are copied into the combined network below.
        for edge in mode_graph.edges(data=True):
            edge[2]['route_type'] = route_type
        combined.add_edges_from(mode_graph.edges(data=True))
        combined.add_nodes_from(mode_graph.nodes(data=True))
    return combined
def main():
    """Run a sample CREATE query, then a MATCH query, printing each match."""
    create_query = ('CREATE (n:SOMECLASS {foo: {goo: "bar"}})'
                    '-[e:EDGECLASS]->(m:ANOTHERCLASS {qux: "foobar", bar: 10}) '
                    'RETURN n')
    test_query = ('MATCH (n:SOMECLASS {foo: {goo: "bar"}})-[e:EDGECLASS]->'
                  '(m:ANOTHERCLASS) WHERE '
                  'm.bar = 10 '
                  'RETURN n.foo.goo, m.qux, e')

    graph_object = nx.MultiDiGraph()
    my_parser = CypherToNetworkx()

    # query() is a generator, so it has to be exhausted for the CREATE
    # statement to actually run.
    for _ in my_parser.query(graph_object, create_query):
        pass

    # print(x) with a single argument behaves the same on Python 2 and 3,
    # unlike the Python-2-only print statement.
    for match in my_parser.query(graph_object, test_query):
        print(match)
def build_layer2_graph(related_extra=None):
    """Build a graph of the layer 2 topology stored in the NAV database.

    Every Interface with a non-null ``to_netbox`` contributes one edge from
    its own netbox to the remote netbox. The remote netbox is taken from
    ``to_interface`` when that is set, otherwise from ``to_netbox``.

    :param related_extra Additional selection_related fields, concatenated
                         onto the default tuple of fields

    :returns: A MultiDiGraph of Netbox nodes, edges annotated with Interface
              model objects.

    """
    related_fields = ('netbox', 'to_netbox', 'to_interface')
    if related_extra:
        related_fields = related_fields + related_extra

    links = Interface.objects.filter(
        to_netbox__isnull=False).select_related(*related_fields)

    graph = nx.MultiDiGraph(name="Layer 2 topology")
    for link in links:
        if link.to_interface:
            dest = link.to_interface.netbox
        else:
            dest = link.to_netbox
        # The Interface object itself serves as the edge key.
        graph.add_edge(link.netbox, dest, key=link)
    return graph
def test_reduce_simple_case_cam(self):
    """Two switches that see each other via CAM reduce to a port-to-port link."""
    port_a, port_b = self.switch_port_a, self.switch_port_b

    graph = nx.MultiDiGraph(name="simple case cam")
    # Device -> port ownership edges.
    graph.add_edge(self.switch_a, port_a)
    graph.add_edge(self.switch_b, port_b)
    # Port -> remote device candidates, keyed "cam".
    graph.add_edge(port_a, self.switch_b, "cam")
    graph.add_edge(port_b, self.switch_a, "cam")

    reducer = AdjacencyReducer(graph)
    print("input:")
    print(reducer.format_connections())
    reducer.reduce()
    print("result:")
    print(reducer.format_connections())

    result = reducer.graph
    assert result.has_edge(port_a, port_b)
    assert result.has_edge(port_b, port_a)
    for port in (port_a, port_b):
        assert result.out_degree(port) == 1
def test_reduce_simple_case_lldp(self):
    """Two ports that see each other via LLDP stay linked port-to-port after reduction."""
    port_a, port_b = self.switch_port_a, self.switch_port_b

    graph = nx.MultiDiGraph(name="simple case lldp")
    # Device -> port ownership edges.
    graph.add_edge(self.switch_a, port_a)
    graph.add_edge(self.switch_b, port_b)
    # Port -> remote port candidates, keyed "lldp".
    graph.add_edge(port_a, port_b, "lldp")
    graph.add_edge(port_b, port_a, "lldp")

    reducer = AdjacencyReducer(graph)
    print("input:")
    print(reducer.format_connections())
    reducer.reduce()
    print("result:")
    print(reducer.format_connections())

    result = reducer.graph
    assert result.has_edge(port_a, port_b)
    assert result.has_edge(port_b, port_a)
    for port in (port_a, port_b):
        assert result.out_degree(port) == 1
def test_reduce_simple_tree_lldp(self):
    """A router with two LLDP-linked switches keeps exactly its four port-to-port edges."""
    router_a, router_b = self.router_port_a, self.router_port_b
    switch_a, switch_b = self.switch_port_a, self.switch_port_b

    graph = nx.MultiDiGraph(name="simple tree lldp")
    # Device -> port ownership edges.
    graph.add_edge(self.router, router_a)
    graph.add_edge(self.router, router_b)
    graph.add_edge(self.switch_a, switch_a)
    graph.add_edge(self.switch_b, switch_b)
    # Port -> remote port candidates in both directions, keyed "lldp".
    graph.add_edge(switch_a, router_a, "lldp")
    graph.add_edge(switch_b, router_b, "lldp")
    graph.add_edge(router_a, switch_a, "lldp")
    graph.add_edge(router_b, switch_b, "lldp")

    reducer = AdjacencyReducer(graph)
    print("input:")
    print(reducer.format_connections())
    reducer.reduce()
    print("result:")
    print(reducer.format_connections())

    result = reducer.graph
    assert result.has_edge(switch_a, router_a)
    assert result.has_edge(switch_b, router_b)
    assert result.has_edge(router_a, switch_a)
    assert result.has_edge(router_b, switch_b)
    for port in (switch_a, switch_b, router_a, router_b):
        assert result.out_degree(port) == 1
def test_no_return_path(self):
    """A one-directional CAM candidate is kept and the unconnected port is dropped."""
    port_a, port_b = self.switch_port_a, self.switch_port_b

    graph = nx.MultiDiGraph()
    graph.add_edge(self.switch_a, port_a)
    graph.add_edge(self.switch_b, port_b)
    # Only port A has a candidate; nothing points back from port B.
    graph.add_edge(port_a, self.switch_b, "cam")

    reducer = AdjacencyReducer(graph)
    print("input:")
    print(reducer.format_connections())
    reducer.reduce()
    print("result:")
    print(reducer.format_connections())

    result = reducer.graph
    assert result.has_edge(port_a, self.switch_b)
    assert not result.has_edge(port_b, port_a)
    assert result.out_degree(port_a) == 1
    assert port_b not in result
def filter_graph(g, cutoff=7.0, min_kihs=2):
    """ Build a new graph from the edges of g that pass the distance and multiplicity filters.

    Parameters
    ----------
    g : MultiDiGraph representing KIHs
        g is the output from graph_from_protein
    cutoff : float
        Socket cutoff in Angstroms.
        Default is 7.0. An edge is kept when the max_kh_distance of its
        'kih' attribute is less than or equal to this value.
    min_kihs : int
        When greater than 0, an edge is kept only if both of its end nodes
        belong to some ordered node pair joined by MORE than min_kihs of the
        distance-filtered edges. A value of 0 or less disables this filter.

    Returns
    -------
    networkx.MultiDigraph
        graph built from the edges that survive both filters.
    """
    close_edges = []
    for edge in g.edges(keys=True, data=True):
        if edge[3]['kih'].max_kh_distance <= cutoff:
            close_edges.append(edge)

    if min_kihs > 0:
        pair_counts = Counter((edge[0], edge[1]) for edge in close_edges)
        # Nodes that share > min_kihs edges with at least one other node.
        well_connected = set()
        for pair, count in pair_counts.items():
            if count > min_kihs:
                well_connected.update(pair)
        close_edges = [
            edge for edge in close_edges
            if edge[0] in well_connected and edge[1] in well_connected
        ]

    return networkx.MultiDiGraph(close_edges)
def __init__(self, root_path):
    """Set up empty containers for the code graph rooted at ``root_path``.

    :param root_path: location of the code base; stored as a ``Path``
    """
    self.root_path = Path(root_path)

    # Graph and per-graph bookkeeping, all empty until populated elsewhere.
    self.code_graph = nx.MultiDiGraph()
    self.metrics = {}
    self.root_arch_ids = []

    # Sorted collections of the entity and reference kinds seen so far.
    self.entity_kinds = SortedSet()
    self.ref_kinds = SortedSet()
def projected_graph(B, nodes, multigraph=False):
    """Return the projection of graph B onto ``nodes``, logging progress.

    Two nodes of ``nodes`` are joined in the projection when they share at
    least one neighbour in B. Graph attributes of B and the node data of the
    selected nodes are copied into the result.

    :param B: the input graph; must not be a multigraph
    :param nodes: the nodes to project onto
    :param multigraph: when true, the result is a multigraph with one edge
        per shared neighbour, keyed by that neighbour
    :return: a Graph/DiGraph, or MultiGraph/MultiDiGraph when ``multigraph``
        is true, directed exactly when B is directed
    :raises nx.NetworkXError: if B is a multigraph
    """
    if B.is_multigraph():
        raise nx.NetworkXError("not defined for multigraphs")

    directed = B.is_directed()
    if directed:
        G = nx.MultiDiGraph() if multigraph else nx.DiGraph()
    else:
        G = nx.MultiGraph() if multigraph else nx.Graph()

    G.graph.update(B.graph)
    G.add_nodes_from((n, B.node[n]) for n in nodes)

    nodes = set(nodes)
    total = len(nodes)
    # Log roughly every 10% of the nodes. The step is clamped to at least 1
    # so that fewer than ten nodes cannot produce a modulo-by-zero.
    step = max(1, total // 10)

    for i, u in enumerate(nodes):
        if i % step == 0:
            logging.info(str(100 * i // total) + "%")

        # Nodes of the projection set (other than u) two hops away from u.
        two_hop = set(v for nbr in B[u] for v in B[nbr])
        nbrs2 = two_hop & (nodes - set([u]))

        if multigraph:
            for n in nbrs2:
                if directed:
                    links = set(B[u]) & set(B.pred[n])
                else:
                    links = set(B[u]) & set(B[n])
                # One edge per shared neighbour, keyed by that neighbour.
                for link in links:
                    if not G.has_edge(u, n, link):
                        G.add_edge(u, n, key=link)
        else:
            G.add_edges_from((u, n) for n in nbrs2)
    return G
def test_dict_matches_graph(self):
    """A query dict matches when at least one parallel edge carries the same data."""
    graph = nx.MultiDiGraph()
    for node in (1, 2):
        graph.add_node(node)

    # Two parallel edges; only the first carries the queried relation.
    graph.add_edge(1, 2, relation='yup')
    graph.add_edge(1, 2, relation='nope')

    query = {'relation': 'yup'}
    self.assertTrue(any_subdict_matches(graph.edge[1][2], query))
def create_project_graph(project):
    """
    Build a graph from every asset the project iterates over.

    :param Project project:
    :rtype: nx.MultiDiGraph
    """
    # Materialise the iterator so add_assets_to_graph receives a list.
    all_assets = list(project.iterate_assets())
    project_graph = nx.MultiDiGraph()
    add_assets_to_graph(project_graph, all_assets)
    return project_graph
def new_graph():
    """Return an empty ``MultiDiGraph`` whose graph attributes are ``ATTR_GRAPH``."""
    fresh = networkx.MultiDiGraph()
    # NOTE(review): ATTR_GRAPH is assigned by reference, not copied, so all
    # graphs created here appear to share one attribute object - confirm
    # that this is intended.
    fresh.graph = ATTR_GRAPH
    return fresh
def __init__(self):
    """Create an empty circuit."""
    # wire name (reg,idx) -> True for a classical bit, False for a qubit.
    self.wire_type = {}

    # wire name (reg,idx) -> input node of the graph.
    self.input_map = {}

    # wire name (reg,idx) -> output node of the graph.
    self.output_map = {}

    # Total number of nodes created so far.
    self.node_counter = 0

    # Named operations of this circuit and their signatures. A signature
    # is an integer tuple (nq,nc,np): number of input qubits, number of
    # input bits, number of real parameters. The operation definitions
    # live outside the circuit object.
    self.basis = {}

    # Directed multigraph with three kinds of nodes: inputs, outputs and
    # operations.
    #  - Operation nodes have equal in- and out-degree and carry extra
    #    data about the operation (argument order, parameter values).
    #  - Input nodes have out-degree 1; output nodes have in-degree 1.
    #  - Edges are labelled with wire names (reg,idx); the in- and
    #    out-edges of an operation carry matching wire labels.
    self.multi_graph = nx.MultiDiGraph()

    # qreg -> size.
    self.qregs = {}

    # creg -> size.
    self.cregs = {}

    # user defined gate -> ast node defining it.
    self.gates = {}

    # Number of digits used when printing floats.
    self.prec = 10
def compute_centrality(graph: nx.MultiDiGraph, data: geojson.feature.FeatureCollection, edge_map: Dict):
    """Annotate every feature with the degree of both end nodes of its edge.

    For each feature, the edge is looked up in ``edge_map`` by the feature's
    ``id`` property, and the degrees of its first and second node are
    written to the ``from_degree`` and ``to_degree`` properties in place.

    :param graph: graph the node degrees are read from
    :param data: feature collection whose features are updated in place
    :param edge_map: mapping from feature id to an indexable edge whose
        first two items are the end nodes
    """
    for feature in data['features']:
        endpoints = edge_map[feature['properties']['id']]
        degree_at_start = graph.degree(endpoints[0])
        degree_at_end = graph.degree(endpoints[1])
        feature['properties']["from_degree"] = degree_at_start
        feature['properties']["to_degree"] = degree_at_end
def _create_edge_map(graph: nx.MultiDiGraph) -> Dict:
    """Map the ``id`` attribute of every edge to its ``(u, v)`` node pair.

    Every parallel edge is indexed through its own data dictionary. Reading
    only key 0 of each node pair would skip the other parallel edges and
    would raise ``KeyError`` for a pair whose key-0 edge no longer exists.

    :param graph: graph whose edges all carry an ``id`` attribute
    :return: dictionary from edge id to the ``(u, v)`` tuple of that edge
    :raises KeyError: if an edge has no ``id`` attribute
    """
    edge_map = {}
    for u, v, edge_data in graph.edges(data=True):
        edge_map[edge_data["id"]] = (u, v)
    return edge_map
def graph_multi_test(graph: nx.MultiDiGraph):
    """Walk every edge and detect node pairs joined by more than one edge.

    The detection has no observable effect: nothing is returned, logged or
    modified.
    """
    for source, target in graph.edges():
        parallel_edges = graph[source][target]
        if len(parallel_edges) > 1:
            # NOTE(review): placeholder assignment only - presumably a
            # debugger breakpoint anchor; confirm before relying on it.
            a = 1
def _load_graph(json_dict: dict) -> nx.MultiDiGraph:
    """Build a graph with one edge per feature of ``json_dict``.

    An edge runs from the node of the first coordinate to the node of the
    last coordinate and carries the feature's ``id`` and ``lanes``, plus an
    ``others`` attribute initialised to ``[[]]``.

    :param json_dict: dictionary with a ``features`` list
    :return: the populated graph
    """
    graph = nx.MultiDiGraph()
    for feature in json_dict['features']:
        coordinates = feature['geometry']['coordinates']
        start = get_node(coordinates[0])
        end = get_node(coordinates[-1])

        # Prune purposeless loops: a two-point feature that starts and ends
        # on the same node. Longer loops (e.g. roundabouts) are kept.
        if start == end and len(coordinates) == 2:
            continue

        lanes = feature['properties']['lanes']
        graph.add_edge(start, end, id=feature['properties']['id'], others=[[]], lanes=lanes)
    return graph
def simplify_graph(g: nx.MultiDiGraph, check_lanes):
    """Simplify pass-through nodes of ``g`` in two consecutive passes.

    The first pass hands every node with exactly one incoming and one
    outgoing edge to ``simplify_oneways``. The second pass hands every node
    with exactly two incoming and two outgoing edges to
    ``simplify_twoways``. Each pass iterates over a snapshot of the nodes,
    while the degrees are read from the current state of the graph.

    :param g: graph handed to the simplification helpers
    :param check_lanes: forwarded unchanged to both helpers
    """
    # Pass 1: one-way chains.
    oneway_candidates = [node for node, _ in g.adjacency()]
    for node in oneway_candidates:
        if g.out_degree(node) == 1 and g.in_degree(node) == 1:
            simplify_oneways(node, g, check_lanes)

    # Pass 2: both directions in highway.
    twoway_candidates = [node for node, _ in g.adjacency()]
    for node in twoway_candidates:
        if g.out_degree(node) == 2 and g.in_degree(node) == 2:
            simplify_twoways(node, g, check_lanes)
def load_graph(json_dict):
    """Build a graph with one edge per feature of ``json_dict``.

    An edge runs from the node of the first coordinate to the node of the
    last coordinate and carries the feature's ``id`` and ``lanes``; the
    coordinates between the first and the last are stored as ``others``.

    :param json_dict: dictionary with a ``features`` list
    :return: the populated nx.MultiDiGraph
    """
    graph = nx.MultiDiGraph()
    for feature in json_dict['features']:
        coordinates = feature['geometry']['coordinates']
        start = get_node(coordinates[0])
        end = get_node(coordinates[-1])

        # Prune purposeless loops: a two-point feature that starts and ends
        # on the same node. Longer loops (e.g. roundabouts) are kept.
        if start == end and len(coordinates) == 2:
            continue

        lanes = feature['properties']['lanes']
        inner_points = feature['geometry']['coordinates'][1:-1]
        if len(inner_points) == 0:
            inner_points = []
        graph.add_edge(start, end, id=feature['properties']['id'], others=inner_points, lanes=lanes)
    return graph
def load_graph(data: geojson.feature.FeatureCollection) -> nx.MultiDiGraph:
    """Create a networkx graph with one edge per geojson feature.

    Each edge runs from the node of the feature's first coordinate to the
    node of its last coordinate and carries the feature's ``id`` property.
    Progress is shown with a tqdm bar.

    :param data: feature collection to convert
    :return: the populated graph
    """
    graph = nx.MultiDiGraph()
    print_info("Creating networkx graph from geojson")
    for feature in tqdm(data['features'], desc="processing features"):
        coordinates = feature['geometry']['coordinates']
        start = _get_node(coordinates[0])
        end = _get_node(coordinates[-1])
        graph.add_edge(start, end, id=feature['properties']['id'])
    return graph
def load_graph(json_dict):
    """Build a graph with one edge per feature of ``json_dict``.

    Each edge runs from the node of the feature's first coordinate to the
    node of its last coordinate and carries the feature's ``id`` property.

    :param json_dict: dictionary with a ``features`` list
    :return: the populated nx.MultiDiGraph
    """
    graph = nx.MultiDiGraph()
    for feature in json_dict['features']:
        coordinates = feature['geometry']['coordinates']
        start = get_node(coordinates[0])
        end = get_node(coordinates[-1])
        graph.add_edge(start, end, id=feature['properties']['id'])
    return graph
def __init__(self, workflow_data, config):
    """Build the workflow graph and compute the task execution order.

    :param workflow_data: workflow dictionary; its optional
        ``platform.slug`` entry selects the platform (default ``'spark'``)
    :param config: stored as-is on the instance
    :raises AttributeError: when the null source/target id checks do not
        both succeed
    """
    self.config = config

    # The graph and the workflow dictionary have to exist before the
    # builder methods below are called.
    self.graph = nx.MultiDiGraph()
    self.workflow = workflow_data
    self._build_initial_workflow_graph()
    self._build_privacy_restrictions()

    # Tasks in topological order of their dependencies; filled in below.
    self.sorted_tasks = []

    # Spark or COMPSs
    platform_info = workflow_data.get('platform', {})
    self.platform = platform_info.get('slug', 'spark')

    # Both edge checks must pass before the tasks can be sorted.
    edges_are_valid = (self.is_there_null_target_id_tasks()
                       and self.is_there_null_source_id_tasks())
    if not edges_are_valid:
        raise AttributeError(
            _("Port '{}/{}' must be informed for operation{}").format(
                self.WORKFLOW_GRAPH_SOURCE_ID_PARAM,
                self.WORKFLOW_GRAPH_TARGET_ID_PARAM,
                self.__class__))

    self.sorted_tasks = self.get_topological_sorted_tasks()
def __init__(self, debug=False):
    """Start with an empty multi-edge directed graph.

    :param debug: stored on the instance as ``_debug``
    """
    self._debug = debug
    self.graph = nx.MultiDiGraph()
def get_mdg(self):
    """Return a MultiDiGraph copy of ``self.graph`` without its host nodes.

    A node counts as a host when ``self.get_node_type`` returns ``"host"``
    for it. ``self.graph`` itself is left untouched.
    """
    mdg = nx.MultiDiGraph(self.graph)

    # Decide on the original graph, then remove from the copy.
    host_nodes = [
        node for node in self.graph
        if self.get_node_type(node) == "host"
    ]
    for host in host_nodes:
        mdg.remove_node(host)

    return mdg
def as_multidigraph(self):
    # type: () -> nx.MultiDiGraph
    """
    Build a plain MultiDiGraph holding the nodes and edges of self, each
    with its data.

    This is a bit of a hack, but allows some useful methods like
    .subgraph() to work correctly.
    """
    nodes_with_data = self.nodes(data=True)
    edges_with_data = self.edges(data=True)

    copy = nx.MultiDiGraph()
    copy.add_nodes_from(nodes_with_data)
    copy.add_edges_from(edges_with_data)
    return copy
def parse_saaf_json(filename):
    """Load a SAAF JSON file and merge all of its backtracks into one graph.

    The top-level JSON object maps an invoke name to a list of backtracks,
    and each backtrack is a list of node dictionaries. Node keys are
    lower-cased. ``nodeid`` and ``parent`` are shifted by a running offset
    that grows by the node count of each backtrack, and the invoke name is
    stored on the node under ``invoke``.

    NOTE(review): the source this block was recovered from had lost its
    indentation. The node-insertion check and the parent/edge check are
    treated as two independent statements here - confirm against the
    original project.

    :param filename: path of the JSON file to read
    :return: nx.MultiDiGraph of all nodes and parent -> node edges
    """
    with open(filename) as json_file:
        saaf_json = json.load(json_file)

    G = nx.MultiDiGraph()
    i = 0  # id offset applied to the current backtrack
    # dict.items() works on both Python 2 and 3; iteritems() is Python 2 only.
    for invoke, invoke_info in saaf_json.items():
        for backtrack in invoke_info:
            j = 0  # number of nodes seen in this backtrack
            for node in backtrack:
                node = dict((k.lower(), v) for k, v in node.items())
                node['parent'] = node['parent'] + i
                node['nodeid'] = node['nodeid'] + i
                node['invoke'] = invoke

                new_node = node['nodeid']
                new_edge = (node['parent'], node['nodeid'])

                if not G.has_node(new_node):
                    G.add_node(new_node, attr_dict=node)

                if node['parent'] == i - 1:
                    # presumably a root node (unshifted parent of -1);
                    # it gets the offset as parent and no edge - verify.
                    node['parent'] = i
                elif not G.has_edge(*new_edge):
                    G.add_edge(*new_edge)
                j += 1
            i += j
    return G
def gdfs_to_graph(gdf_nodes, gdf_edges):
    """
    Convert node and edge GeoDataFrames into a graph

    The graph's ``crs`` is taken from ``gdf_nodes.crs`` and its ``name``
    from ``gdf_nodes.gdf_name`` with a trailing ``'_nodes'`` suffix removed.

    Parameters
    ----------
    gdf_nodes : GeoDataFrame
        indexed by node id; every column becomes a node attribute
    gdf_edges : GeoDataFrame
        needs the columns ``u``, ``v`` and ``key``; every other column
        becomes an edge attribute

    Returns
    -------
    networkx multidigraph
    """
    G = nx.MultiDiGraph()
    G.graph['crs'] = gdf_nodes.crs

    # Remove the suffix explicitly: str.rstrip('_nodes') strips a *set of
    # characters*, which would also eat trailing letters of the real name.
    name = gdf_nodes.gdf_name
    suffix = '_nodes'
    if name.endswith(suffix):
        name = name[:-len(suffix)]
    G.graph['name'] = name

    # add the nodes and their attributes to the graph
    G.add_nodes_from(gdf_nodes.index)
    attributes = gdf_nodes.to_dict()
    for attribute_name in gdf_nodes.columns:
        # only add this attribute to nodes which have a non-null value for it
        attribute_values = {k: v for k, v in attributes[attribute_name].items() if pd.notnull(v)}
        nx.set_node_attributes(G, name=attribute_name, values=attribute_values)

    # add the edges and attributes that are not u, v, key (as they're added
    # separately) or null
    for _, row in gdf_edges.iterrows():
        attrs = {}
        # Series.items() replaces iteritems(), which newer pandas removed.
        for label, value in row.items():
            # Lists are tested first because pd.notnull() on a list is
            # evaluated element-wise and has no single truth value.
            if (label not in ['u', 'v', 'key']) and (isinstance(value, list) or pd.notnull(value)):
                attrs[label] = value
        # The end nodes are passed positionally: the parameter names of
        # add_edge differ between networkx releases.
        G.add_edge(row['u'], row['v'], key=row['key'], **attrs)

    return G
def get_label_from_edge(g, edge, attribute_name='label'):
    """Collect the values of one attribute from the edge(s) joining two nodes.

    The edge is looked up as ``(edge[0], edge[1])``. When that lookup finds
    nothing and the graph is directed, the reversed pair is tried as well.

    :param g: a networkx graph of any kind
    :param edge: indexable whose first two items are the end nodes
    :param attribute_name: name of the edge attribute to collect
    :return: list of attribute values, one per parallel edge that has the
        attribute for a multigraph and at most one value otherwise; an
        empty list when no such edge exists
    """
    edge_attributes = g.get_edge_data(edge[0], edge[1])
    if edge_attributes is None and nx.is_directed(g):
        edge_attributes = g.get_edge_data(edge[1], edge[0])

    # No edge in either direction: there is nothing to collect.
    if edge_attributes is None:
        return []

    # is_multigraph() also covers subclasses of the multigraph classes,
    # which an exact type comparison would send down the wrong branch.
    if g.is_multigraph():
        # Multigraph edge data maps each edge key to its attribute dict.
        return [
            edge_attributes[index][attribute_name]
            for index in edge_attributes
            if attribute_name in edge_attributes[index]
        ]

    if attribute_name in edge_attributes:
        return [edge_attributes[attribute_name]]
    return []
def test_multidigraph_is_subgraph_with_labels(self):
    """Labelled single-edge graphs match only on end nodes, direction and label together."""
    host = nx.MultiDiGraph()
    host.add_edges_from([
        (1, 2, {'label': 'a'}),
        (1, 2, {'label': 'f'}),
        (2, 3, {'label': 'c'}),
        (3, 4, {'label': 'd'}),
        (2, 4, {'label': 'b'}),
    ])

    # Edges that exist in the host with exactly this label.
    first_parallel_edge = nx.MultiDiGraph()
    first_parallel_edge.add_edge(1, 2, label='a')

    second_parallel_edge = nx.MultiDiGraph()
    second_parallel_edge.add_edge(1, 2, label='f')

    plain_edge = nx.MultiDiGraph()
    plain_edge.add_edge(3, 4, label='d')

    # Right nodes, wrong label.
    wrong_label = nx.MultiDiGraph()
    wrong_label.add_edge(1, 2, label='b')

    # Right label, reversed direction.
    wrong_direction = nx.MultiDiGraph()
    wrong_direction.add_edge(2, 1, label='a')

    # Node pair that is not connected in the host.
    missing_edge = nx.MultiDiGraph()
    missing_edge.add_edge(1, 4, label='a')

    self.assertTrue(ParsemisMiner.is_subgraph(host, first_parallel_edge))
    self.assertTrue(ParsemisMiner.is_subgraph(host, second_parallel_edge))
    self.assertTrue(ParsemisMiner.is_subgraph(host, plain_edge))
    self.assertFalse(ParsemisMiner.is_subgraph(host, wrong_label))
    self.assertFalse(ParsemisMiner.is_subgraph(host, wrong_direction))
    self.assertFalse(ParsemisMiner.is_subgraph(host, missing_edge))
def test_frequent_graph(self):
    """Smoke test: a FrequentGraph over two parallel labelled edges can be printed."""
    graph = nx.MultiDiGraph()
    for edge_label in ('a', 'b'):
        graph.add_edge(1, 2, label=edge_label)

    frequent = FrequentGraph(graph, [])
    print("Name: %s" % frequent.to_string())
def chess_pgn_graph(pgn_file="chess_masters_WCC.pgn.bz2"):
    """Read chess games in pgn format from the bz2-compressed file pgn_file.

    The file is always opened with ``bz2.BZ2File``; other compression
    formats are not handled.

    Return the MultiDiGraph of players connected by a chess game: one edge
    from the White player to the Black player per game. Edges contain the
    remaining game tags in a dict.

    Raises KeyError if a game's tag set lacks a ``White`` or ``Black`` tag.
    """
    import bz2

    G = nx.MultiDiGraph()
    game = {}
    # The context manager closes the archive even when parsing fails.
    with bz2.BZ2File(pgn_file) as datafile:
        for raw_line in datafile:
            line = raw_line.decode().rstrip('\r\n')
            if line.startswith('['):
                # Tag line of the form: [Name "value"]
                tag, value = line[1:-1].split(' ', 1)
                game[str(tag)] = value.strip('"')
            elif game:
                # The first non-tag line after a tag set (the empty line)
                # marks the end of the game info.
                white = game.pop('White')
                black = game.pop('Black')
                G.add_edge(white, black, **game)
                game = {}
    return G
def test_combined_stop_to_stop_transit_network(self):
    """The combined network is a MultiDiGraph whose edges all carry a route_type."""
    combined = networks.combined_stop_to_stop_transit_network(self.gtfs)
    self.assertIsInstance(combined, networkx.MultiDiGraph)

    for edge in combined.edges(data=True):
        edge_data = edge[2]
        self.assertIn("route_type", edge_data)
def test_upper(self):
    """Test we can parse a CREATE... RETURN query."""
    g = nx.MultiDiGraph()
    query = 'CREATE (n:SOMECLASS) RETURN n'
    test_parser = python_cypher.CypherToNetworkx()
    # query() is consumed as a generator elsewhere in this file, so calling
    # it without iterating would execute nothing. Exhaust it so the query
    # is actually parsed and run.
    for _ in test_parser.query(g, query):
        pass
def test_create_node(self):
    """A CREATE query for one anonymous node leaves exactly one node in the graph."""
    graph = nx.MultiDiGraph()
    parser = python_cypher.CypherToNetworkx()

    # The query only runs while its results are being iterated.
    for _ in parser.query(graph, 'CREATE (n) RETURN n'):
        pass

    self.assertEqual(len(graph.node), 1)