我们从 Python 开源项目中,提取了以下 50 个代码示例,用于说明如何使用 rdflib.URIRef()。
def get_do_metadata():
    """Read the Disease Ontology OWL file and return term metadata.

    Queries the ontology for non-deprecated terms in the
    'disease_ontology' namespace, de-duplicates on the term id, and
    returns a dict keyed by the CURIE form of each term's PURL id.
    """
    g = Graph()
    g.parse(DO_OWL_PATH)
    disease_ontology = Literal(
        'disease_ontology',
        datatype=URIRef('http://www.w3.org/2001/XMLSchema#string'))
    query = """
    SELECT * WHERE {
        ?id oboInOwl:hasOBONamespace ?disease_ontology .
        ?id rdfs:label ?label .
        OPTIONAL {?id obo:IAO_0000115 ?descr}
        FILTER NOT EXISTS {?id owl:deprecated ?dep}
    }
    """
    rows = g.query(query, initBindings={'disease_ontology': disease_ontology})
    res = [{str(k): str(v) for k, v in binding.items()}
           for binding in rows.bindings]
    df = pd.DataFrame(res)
    df.drop_duplicates(subset=['id'], inplace=True)
    df.fillna("", inplace=True)
    records = df.to_dict("records")
    # Key each record by the CURIE form of its PURL id.
    return {purl_to_curie(x['id']): x for x in records}
def result_set_2_rdf(self, result_set, reduced_mapping, fragment):
    """Emit RDF triples for every tweet in *result_set*.

    Each (s, p, o) entry of the reduced mapping has a subject template
    of the form 'prefix{<jsonpath>}'.  The object is either a JSONPath
    template (marked by '$.') producing literals, or a constant term.
    """
    for tweet in result_set:
        for s, p, o in reduced_mapping.mapping:
            template_parts = s.split('{')
            subject_prefix = template_parts[0]
            subject_path = parse(template_parts[1].split('}')[0])
            subject_values = [m.value for m in subject_path.find(tweet)]
            if '$.' in o:
                object_path = parse(o.split('{')[0].split('}')[0])
                for m in object_path.find(tweet):
                    fragment.add_data_triple(
                        URIRef("%s%s" % (subject_prefix, subject_values[0])),
                        p, Literal(m.value))
            else:
                fragment.add_data_triple(
                    URIRef("%s%s" % (subject_prefix, subject_values[0])),
                    p, o)
def result_set_2_rdf(self, result_set, reduced_mapping, fragment):
    """Emit RDF triples for every repo in *result_set*.

    Each (s, p, o) entry of the reduced mapping has a subject template
    of the form 'prefix{<jsonpath>}'.  The object is either a JSONPath
    template (marked by '$.') producing literals, or a constant term.
    """
    for repo in result_set:
        for s, p, o in reduced_mapping.mapping:
            template_parts = s.split('{')
            subject_prefix = template_parts[0]
            subject_path = parse(template_parts[1].split('}')[0])
            subject_values = [m.value for m in subject_path.find(repo)]
            if '$.' in o:
                object_path = parse(o.split('{')[0].split('}')[0])
                for m in object_path.find(repo):
                    fragment.add_data_triple(
                        URIRef("%s%s" % (subject_prefix, subject_values[0])),
                        p, Literal(m.value))
            else:
                fragment.add_data_triple(
                    URIRef("%s%s" % (subject_prefix, subject_values[0])),
                    p, o)
def _preprocess_mapping(self):
    """Flatten the xR2RML mapping graph.

    Produces (subject-template, predicate, reference) triples in
    ``self.preprocessed_mapping`` and records the logical source
    (query / iterator) for each subject template prefix in
    ``self.logical_sources``.
    """
    resources = []
    for s in self.mapping.subjects():
        subject = None
        if isinstance(s, URIRef) and s not in resources:
            resources.append(s)
            # rr:subjectMap: rr:template becomes the flattened subject;
            # each rr:class adds an rdf:type triple for it.
            for node in self.mapping.objects(subject=s, predicate=rr.subjectMap):
                for template in self.mapping.objects(subject=node,
                                                     predicate=rr.template):
                    subject = template
                for type_class in self.mapping.objects(subject=node,
                                                       predicate=rr['class']):
                    self.preprocessed_mapping.add((subject, RDF.type, type_class))
            # rr:predicateObjectMap: pair each rr:predicate with the
            # xrr:reference of its rr:objectMap.
            for node in self.mapping.objects(subject=s,
                                             predicate=rr.predicateObjectMap):
                predicate = None
                for predicate_object in self.mapping.objects(subject=node,
                                                             predicate=rr.predicate):
                    predicate = predicate_object
                for object_map in self.mapping.objects(subject=node,
                                                       predicate=rr.objectMap):
                    for reference in self.mapping.objects(subject=object_map,
                                                          predicate=xrr.reference):
                        self.preprocessed_mapping.add((subject, predicate, reference))
            # xrr:logicalSource: remember query and iterator keyed by
            # the subject template prefix (text before '{').
            for node in self.mapping.objects(subject=s, predicate=xrr.logicalSource):
                subject_prefix = subject.split('{')[0]
                self.logical_sources[subject_prefix] = {}
                for query in self.mapping.objects(subject=node, predicate=xrr.query):
                    self.logical_sources[subject_prefix]['query'] = query
                for iterator in self.mapping.objects(subject=node,
                                                     predicate=rml.iterator):
                    self.logical_sources[subject_prefix]['iterator'] = iterator
def getgraphcontent(self, graphuri):
    """Get the serialized content of a named graph.

    Args:
        graphuri: The URI of a named graph.
    Returns:
        content: A list of strings where each string is a quad.
    """
    context = self.store.get_context(URIRef(graphuri))
    triplestring = context.serialize(format='nt').decode('UTF-8')

    # Since we have triples here, we transform them to quads by adding
    # the graphuri.
    # TODO This might cause problems if ' .\n' will be part of a literal.
    # Maybe a regex would be a better solution
    triplestring = triplestring.replace(' .\n', ' <' + graphuri + '> .\n')
    # Filter out empty lines instead of list.remove(''), which raised
    # ValueError when no blank line was present and only removed the
    # first occurrence.
    return [line for line in triplestring.splitlines() if line]
def test_create_graph(): """Create new graphFactory Object""" from sc import graphManager PROV = Namespace("http://www.w3.org/ns/prov#") tstregistry = graphManager.VocabularyRegistry() vocab1 = Vocabulary1() tstregistry.register(vocab1) vocab2 = Vocabulary2() tstregistry.register(vocab2) tstregistry.build_graph() print tstregistry.get_turtle() # Check assertions in global graph store assert (URIRef("http://orcid.org/000-0003-4901-6059"), RDF.type, PROV.Person) in tstregistry.global_graph assert (URIRef(uuidurn), RDFS.label, Literal( "Docker: https://www.docker.com/")) in tstregistry.global_graph # Check Serialization jsongraph = json.loads(tstregistry.get_json_ld()) assert '@context' in jsongraph
def __addAction(self, action, statement):
    """Append an <action> element for *statement* to the transaction DOM.

    Each term of the statement is serialized as a <literal>, <uri> or
    <bnode> child carrying the term's string value; literals also get
    datatype / xml:lang attributes when present.
    """
    element = self.trans.createElement(action)
    for term in statement:
        if isinstance(term, Literal):
            node = self.trans.createElement("literal")
            if term.datatype is not None:
                node.setAttribute("datatype", str(term.datatype))
            if term.language is not None:
                node.setAttribute("xml:lang", str(term.language))
        elif isinstance(term, URIRef):
            node = self.trans.createElement("uri")
        elif isinstance(term, BNode):
            node = self.trans.createElement("bnode")
        else:
            raise Exception("Unknown element: " + term)
        node.appendChild(self.trans.createTextNode(str(term)))
        element.appendChild(node)
    self.trans.childNodes[0].appendChild(element)
def __setattr__(self, name, values):
    """Set predicate *name* to *values* for this resource.

    RDFObjects are unwrapped to their URIs, rdflib terms pass through
    untouched, and plain Python values are wrapped as Literals.  Names
    starting with 'r_' are treated as reverse properties.
    """
    self._objectGraph._load(self.uri)
    unwrapped = []
    for value in values:
        if isinstance(value, RDFObject):
            # unwrap rdfobjects
            unwrapped.append(value.uri)
        elif isinstance(value, (URIRef, BNode, Literal)):
            # pass rdflib terms through untouched
            unwrapped.append(value)
        else:
            # wrap plain values as literals
            unwrapped.append(Literal(value))
    # look for a property mapping for this name
    prop = self._getProp(name)
    if name.startswith("r_"):
        self._objectGraph._setSubjects(unwrapped, prop, self.uri)
    else:
        self._objectGraph._setObjects(self.uri, prop, unwrapped)
def _object_value_int(self, subject, predicate):
    '''
    Given a subject and a predicate, returns the value of the object as an
    integer.

    Both subject and predicate must be rdflib URIRef or BNode objects.

    If the value can not be parsed as an integer, returns None.
    '''
    raw = self._object_value(subject, predicate)
    if not raw:
        return None
    try:
        return int(raw)
    except ValueError:
        return None
def _contact_details(self, subject, predicate):
    '''
    Returns a dict with details about a vcard expression.

    Both subject and predicate must be rdflib URIRef or BNode objects.

    Returns keys for uri, name and email with the values set to None if
    they could not be found.
    '''
    contact = {}
    for agent in self.g.objects(subject, predicate):
        is_ref = isinstance(agent, rdflib.term.URIRef)
        contact['uri'] = unicode(agent) if is_ref else None
        contact['name'] = self._object_value(agent, VCARD.fn)
        contact['email'] = self._object_value(agent, VCARD.hasEmail)
    return contact
def graph_from_catalog(self, catalog_dict=None):
    '''
    Creates a graph for the catalog (CKAN site) using the loaded profiles.

    The class RDFLib graph (accessible via `serializer.g`) will be
    updated by the loaded profiles.

    Returns the reference to the catalog, which will be an rdflib URIRef.
    '''
    catalog_ref = URIRef(catalog_uri())
    for profile_class in self._profiles:
        profile_class(self.g, self.compatibility_mode).graph_from_catalog(
            catalog_dict, catalog_ref)
    return catalog_ref
def test_object_list(self):
    """_object_value_list returns every keyword literal as unicode."""
    profile = RDFProfile(_default_graph())
    dataset_ref = URIRef('http://example.org/datasets/1')
    profile.g.add((dataset_ref, DCAT.keyword, Literal('space')))
    profile.g.add((dataset_ref, DCAT.keyword, Literal('moon')))
    value = profile._object_value_list(dataset_ref, DCAT.keyword)
    assert isinstance(value, list)
    assert isinstance(value[0], unicode)
    eq_(len(value), 2)
    eq_(sorted(value), ['moon', 'space'])
def test_publisher_ref(self):
    """_publisher picks up a dct:publisher rdf:resource reference."""
    data = '''<?xml version="1.0" encoding="utf-8" ?>
    <rdf:RDF
     xmlns:dct="http://purl.org/dc/terms/"
     xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
     xmlns:rdfs="http://www.w3.org/2000/01/rdf-schema#">
     <rdfs:SomeClass rdf:about="http://example.org">
       <dct:publisher rdf:resource="http://orgs.vocab.org/some-org" />
     </rdfs:SomeClass>
    </rdf:RDF>
    '''
    g = Graph()
    g.parse(data=data)
    profile = RDFProfile(g)
    publisher = profile._publisher(URIRef('http://example.org'), DCT.publisher)
    eq_(publisher['uri'], 'http://orgs.vocab.org/some-org')
def test_graph_from_catalog_dict(self):
    """Catalog metadata fields end up as catalog-level triples."""
    catalog_dict = {
        'title': 'My Catalog',
        'description': 'An Open Data Catalog',
        'homepage': 'http://example.com',
        'language': 'de',
    }
    serializer = RDFSerializer()
    g = serializer.g
    catalog = serializer.graph_from_catalog(catalog_dict)
    eq_(unicode(catalog), utils.catalog_uri())
    # Basic fields
    assert self._triple(g, catalog, RDF.type, DCAT.Catalog)
    assert self._triple(g, catalog, DCT.title, catalog_dict['title'])
    assert self._triple(g, catalog, DCT.description,
                        catalog_dict['description'])
    assert self._triple(g, catalog, FOAF.homepage,
                        URIRef(catalog_dict['homepage']))
    assert self._triple(g, catalog, DCT.language, catalog_dict['language'])
def test_dataset_license_from_distribution_by_uri(self):
    # license_id retrieved from the URI of dcat:license object
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    g.add((dataset_ref, RDF.type, DCAT.Dataset))
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    g.add((dataset_ref, DCAT.distribution, dist_ref))
    g.add((dist_ref, RDF.type, DCAT.Distribution))
    g.add((dist_ref, DCT.license,
           URIRef("http://www.opendefinition.org/licenses/cc-by")))
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    dataset = list(parser.datasets())[0]
    eq_(dataset['license_id'], 'cc-by')
def test_dataset_license_from_distribution_by_title(self):
    # license_id retrieved from dct:title of dcat:license object
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    g.add((dataset_ref, RDF.type, DCAT.Dataset))
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    g.add((dist_ref, RDF.type, DCAT.Distribution))
    g.add((dataset_ref, DCAT.distribution, dist_ref))
    license_node = BNode()
    g.add((dist_ref, DCT.license, license_node))
    g.add((license_node, DCT.title, Literal("Creative Commons Attribution")))
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    dataset = list(parser.datasets())[0]
    eq_(dataset['license_id'], 'cc-by')
def test_distribution_access_url(self):
    """dcat:accessURL alone populates resource['url'] only."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    g.add((dataset_ref, RDF.type, DCAT.Dataset))
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    g.add((dist_ref, RDF.type, DCAT.Distribution))
    g.add((dist_ref, DCAT.accessURL, Literal('http://access.url.org')))
    g.add((dataset_ref, DCAT.distribution, dist_ref))
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['url'], u'http://access.url.org')
    assert 'download_url' not in resource
def test_distribution_both_access_and_download_url(self):
    """accessURL maps to url, downloadURL to download_url."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    g.add((dataset_ref, RDF.type, DCAT.Dataset))
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    g.add((dist_ref, RDF.type, DCAT.Distribution))
    g.add((dist_ref, DCAT.accessURL, Literal('http://access.url.org')))
    g.add((dist_ref, DCAT.downloadURL, Literal('http://download.url.org')))
    g.add((dataset_ref, DCAT.distribution, dist_ref))
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['url'], u'http://access.url.org')
    eq_(resource['download_url'], u'http://download.url.org')
def test_distribution_format_imt_and_format(self):
    """mediaType and dct:format map to mimetype and format."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    g.add((dataset_ref, RDF.type, DCAT.Dataset))
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    g.add((dist_ref, RDF.type, DCAT.Distribution))
    g.add((dist_ref, DCAT.mediaType, Literal('text/csv')))
    g.add((dist_ref, DCT['format'], Literal('CSV')))
    g.add((dataset_ref, DCAT.distribution, dist_ref))
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['format'], u'CSV')
    eq_(resource['mimetype'], u'text/csv')
def test_distribution_format_format_only(self):
    """dct:format alone populates resource['format']."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    g.add((dataset_ref, RDF.type, DCAT.Dataset))
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    g.add((dist_ref, RDF.type, DCAT.Distribution))
    g.add((dist_ref, DCT['format'], Literal('CSV')))
    g.add((dataset_ref, DCAT.distribution, dist_ref))
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['format'], u'CSV')
def test_distribution_format_imt_only(self):
    """mediaType alone: behaviour depends on the CKAN version."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    g.add((dataset_ref, RDF.type, DCAT.Dataset))
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    g.add((dist_ref, RDF.type, DCAT.Distribution))
    g.add((dist_ref, DCAT.mediaType, Literal('text/csv')))
    g.add((dataset_ref, DCAT.distribution, dist_ref))
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    if toolkit.check_ckan_version(min_version='2.3'):
        # 2.3+ normalizes the IMT to a format name
        eq_(resource['format'], u'CSV')
        eq_(resource['mimetype'], u'text/csv')
    else:
        eq_(resource['format'], u'text/csv')
def test_distribution_format_imt_only_normalize_false(self):
    """With normalization off, the raw IMT is kept as the format."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    g.add((dataset_ref, RDF.type, DCAT.Dataset))
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    g.add((dist_ref, RDF.type, DCAT.Distribution))
    g.add((dist_ref, DCAT.mediaType, Literal('text/csv')))
    g.add((dataset_ref, DCAT.distribution, dist_ref))
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['format'], u'text/csv')
    eq_(resource['mimetype'], u'text/csv')
def test_distribution_format_unknown_imt(self):
    """An unrecognized IMT is used verbatim for format and mimetype."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    g.add((dataset_ref, RDF.type, DCAT.Dataset))
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    g.add((dist_ref, RDF.type, DCAT.Distribution))
    g.add((dist_ref, DCAT.mediaType, Literal('text/unknown-imt')))
    g.add((dataset_ref, DCAT.distribution, dist_ref))
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['format'], u'text/unknown-imt')
    eq_(resource['mimetype'], u'text/unknown-imt')
def test_distribution_format_imt_normalized(self):
    """An unknown IMT cannot be normalized and is kept verbatim."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    g.add((dataset_ref, RDF.type, DCAT.Dataset))
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    g.add((dist_ref, RDF.type, DCAT.Distribution))
    g.add((dist_ref, DCAT.mediaType, Literal('text/unknown-imt')))
    g.add((dataset_ref, DCAT.distribution, dist_ref))
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['format'], u'text/unknown-imt')
    eq_(resource['mimetype'], u'text/unknown-imt')
def test_distribution_format_format_normalized(self):
    """dct:format text is normalized on CKAN >= 2.3, kept otherwise."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    g.add((dataset_ref, RDF.type, DCAT.Dataset))
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    g.add((dist_ref, RDF.type, DCAT.Distribution))
    g.add((dist_ref, DCAT.mediaType, Literal('text/csv')))
    g.add((dist_ref, DCT['format'], Literal('Comma Separated Values')))
    g.add((dataset_ref, DCAT.distribution, dist_ref))
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    if toolkit.check_ckan_version(min_version='2.3'):
        eq_(resource['format'], u'CSV')
        eq_(resource['mimetype'], u'text/csv')
    else:
        eq_(resource['format'], u'Comma Separated Values')
def test_spatial_rdfs_label(self):
    """An rdfs:label on the spatial node becomes extras['spatial_text']."""
    g = Graph()
    dataset_ref = URIRef('http://example.org/datasets/1')
    g.add((dataset_ref, RDF.type, DCAT.Dataset))
    spatial_uri = URIRef('http://geonames/Newark')
    g.add((dataset_ref, DCT.spatial, spatial_uri))
    g.add((spatial_uri, RDF.type, DCT.Location))
    g.add((spatial_uri, RDFS.label, Literal('Newark')))
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    extras = self._extras(list(parser.datasets())[0])
    eq_(extras['spatial_text'], 'Newark')
def test_spatial_wkt_only(self):
    """A locn:geometry WKT literal is converted to GeoJSON."""
    g = Graph()
    dataset_ref = URIRef('http://example.org/datasets/1')
    g.add((dataset_ref, RDF.type, DCAT.Dataset))
    spatial_uri = URIRef('http://geonames/Newark')
    g.add((dataset_ref, DCT.spatial, spatial_uri))
    g.add((spatial_uri, RDF.type, DCT.Location))
    g.add((spatial_uri, LOCN.geometry,
           Literal('POINT (67 89)', datatype=GSP.wktLiteral)))
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    extras = self._extras(list(parser.datasets())[0])
    # NOTE: geomet returns floats for coordinates on WKT -> GeoJSON
    eq_(extras['spatial'], '{"type": "Point", "coordinates": [67.0, 89.0]}')
def test_spatial_uri_only(self):
    """A bare spatial URI yields spatial_uri but no text/geometry."""
    g = Graph()
    dataset_ref = URIRef('http://example.org/datasets/1')
    g.add((dataset_ref, RDF.type, DCAT.Dataset))
    spatial_uri = URIRef('http://geonames/Newark')
    g.add((dataset_ref, DCT.spatial, spatial_uri))
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    extras = self._extras(list(parser.datasets())[0])
    eq_(extras['spatial_uri'], 'http://geonames/Newark')
    assert_true('spatial_text' not in extras)
    assert_true('spatial' not in extras)
def add_property_definition(self, concept: RF2Files.Concept, concept_uri: URIRef) -> None:
    """ Add a property definition
    :param concept: Concept entry for the given property
    :param concept_uri: Concept URI
    :return:
    """
    parents = [parent for parent in self._relationships.parents(concept.id)
               if concept.id != Concept_model_attribute_sctid]
    if len(parents) > 1 and concept.definitionStatusId == Defined_sctid:
        # Fully defined with several parents: assert equivalence to the
        # intersection of the parent properties.
        target, collection = intersection(self)
        # Plain loops instead of side-effect list comprehensions.
        for parent in parents:
            collection.append(as_uri(parent))
        self.add_t((concept_uri, OWL.equivalentProperty, target),
                   self._stats.num_properties)
    else:
        for parent in parents:
            self.add_t((concept_uri, RDFS.subPropertyOf, as_uri(parent)),
                       self._stats.num_properties)
    # add an owl:propertyChain assertion for $subject if is in the RIGHT_ID
    if concept.id in self._context.RIGHT_ID:
        node = BNode()
        self.add_t((node, RDFS.subPropertyOf, concept_uri), None)
        coll = BNode()
        Collection(self, coll,
                   [concept_uri, as_uri(self._context.RIGHT_ID[concept.id])])
        self.add_t((node, OWL.propertyChain, coll), self._stats.num_propchains)
def verify_virtual_columns(sub, g, orig_value_str, encoded_value_str):
    """Assert that virtual columns v1-v3 were emitted for the row."""
    # v1: predicate carries the encoded value, object is the original value
    v1 = list(g.triples((sub, PRE_NS['v1p{}'.format(encoded_value_str)], None)))
    assert len(v1) == 1
    assert "v1p{}".format(encoded_value_str) in str(v1[0][1])
    assert orig_value_str == str(v1[0][2])
    # v2: both predicate and object derive from the encoded value
    v2 = list(g.triples((sub, PRE_NS['v2p{}'.format(encoded_value_str)], None)))
    assert len(v2) == 1
    assert "v2p{}".format(encoded_value_str) in str(v2[0][1])
    assert 'v2v{}'.format(encoded_value_str) in str(v2[0][2])
    # Standalone virtual column
    standalone_sub = URIRef('http://www.example.org/v3s{}'.format(encoded_value_str))
    v3 = list(g.triples((standalone_sub, None, None)))
    assert len(v3) == 1
    assert "v3p{}".format(encoded_value_str) in str(v3[0][1])
    assert 'v3v{}'.format(encoded_value_str) in str(v3[0][2])
def setUp(self):
    """Build a MODS ingester and a bf-Organization PropertyLinker rule
    that links held items to Colorado College."""
    self.ingester = mods.MODSIngester(source=SAMPLE_MODS)
    self.entity = self.ingester.__generate_uri__()
    self.cc = rdflib.URIRef("http://coloradocollege.edu/")
    bc_org = getattr(NS_MGR.kds, "bf-Organization")
    rules = self.ingester.rules_graph
    rules.add((bc_org, NS_MGR.rdf.type, NS_MGR.kds.PropertyLinker))
    self.held_by = rdflib.BNode()
    rules.add((bc_org, NS_MGR.kds.destPropUri, self.held_by))
    rules.add((self.held_by, NS_MGR.bf.heldBy, self.cc))
    rules.add((bc_org, NS_MGR.kds.destClassUri, NS_MGR.bf.Item))
def setUp(self):
    """Create a graph with one simple title bnode and one nested
    Topic/name title bnode hanging off the same entity."""
    self.graph = rdflib.Graph()
    self.entity = rdflib.URIRef("https://bibcat.org/test-entity")
    # Simple title: entity -> bf:title -> [bf:mainTitle "This is a test"]
    self.simple_title_bnode = rdflib.BNode()
    self.graph.add((self.entity, rdflib.RDF.type, BF.Title))
    self.graph.add((self.entity, BF.title, self.simple_title_bnode))
    self.graph.add((self.simple_title_bnode, BF.mainTitle,
                    rdflib.Literal("This is a test")))
    # Nested title: a Topic with an rdfs:label and a schema:name bnode
    self.top_title_bnode = rdflib.BNode()
    self.graph.add((self.entity, BF.title, self.top_title_bnode))
    secondary_title_bnode = rdflib.BNode()
    self.graph.add((self.top_title_bnode, rdflib.RDF.type, BF.Topic))
    self.graph.add((self.top_title_bnode, rdflib.RDFS.label,
                    rdflib.Literal("This is a title and a name")))
    self.graph.add((self.top_title_bnode, SCHEMA.name, secondary_title_bnode))
    self.graph.add((secondary_title_bnode, rdflib.RDF.value,
                    rdflib.Literal("This is a name")))
def setUp(self):
    """Create two labelled Resource entities; the second also carries a
    bf:Title blank node with a subtitle."""
    self.graph = rdflib.Graph()
    self.entity_one = rdflib.URIRef("https://bibcat.org/test-entity")
    self.graph.add((self.entity_one, rdflib.RDF.type, rdflib.RDFS.Resource))
    self.graph.add((self.entity_one, rdflib.RDFS.label,
                    rdflib.Literal("Test Entity One", lang="en")))
    self.entity_two = rdflib.URIRef("https://bibcat.org/test-entity-two")
    self.graph.add((self.entity_two, rdflib.RDF.type, rdflib.RDFS.Resource))
    self.graph.add((self.entity_two, rdflib.RDFS.label,
                    rdflib.Literal("Test Entity Two", lang="en")))
    title_bnode = rdflib.BNode()
    self.graph.add((self.entity_two, BF.title, title_bnode))
    self.graph.add((title_bnode, rdflib.RDF.type, BF.Title))
    self.graph.add((title_bnode, BF.subTitle, rdflib.Literal("Subtitle ")))
def __generate_object_term__(self, datatype, value):
    """Internal method takes a datatype (can be None) and returns
    the RDF Object Term

    Args:
    -----
        datatype: None, or rdflib.URIRef
        value: Varies depending on ingester
    """
    if datatype == NS_MGR.xsd.anyURI:
        return rdflib.URIRef(value)
    if datatype:
        return rdflib.Literal(value, datatype=datatype)
    return rdflib.Literal(value)
def __handle_parents__(self, **kwargs):
    """Internal method handles parentTriplesMaps

    Keyword args:
    -------------
        parent_map: SimpleNamespace of ParentTriplesMap
        subject: rdflib.URIRef or rdflib.BNode
        predicate: rdflib.URIRef
    """
    parent_map = kwargs.pop("parent_map")
    subject = kwargs.pop('subject')
    predicate = kwargs.pop('predicate')
    for parent_obj in self.execute(self.triple_maps[str(parent_map)], **kwargs):
        # Never link an entity to itself.
        if parent_obj != subject:
            self.output.add((subject, predicate, parent_obj))
def __generate_reference__(self, triple_map, **kwargs):
    """Generates a RDF entity based on triple map

    Args:
        triple_map(SimpleNamespace): Triple Map
    """
    raw_value = self.source.get(str(triple_map.reference))
    if raw_value is None or len(raw_value) < 1:
        return None
    # getattr collapses the original hasattr/attribute chain: a missing
    # or falsy datatype falls through to a plain Literal either way.
    datatype = getattr(triple_map, "datatype", None)
    if datatype == NS_MGR.xsd.anyURI:
        return rdflib.URIRef(raw_value)
    if datatype:
        return rdflib.Literal(raw_value, datatype=datatype)
    return rdflib.Literal(raw_value)
def __reference_handler__(self, **kwargs):
    """Internal method for handling rr:reference in triples map

    Keyword Args:
    -------------
        predicate_obj_map: SimpleNamespace
        obj: dict
        subject: rdflib.URIRef
    """
    subjects = []
    pred_obj_map = kwargs.get("predicate_obj_map")
    obj = kwargs.get("obj")
    subject = kwargs.get("subject")
    if pred_obj_map.reference is None:
        return subjects
    predicate = pred_obj_map.predicate
    # Fixed typo: was `pred_obj_map.refernce`, which raised
    # AttributeError whenever a reference was actually present.
    ref_exp = jsonpath_ng.parse(str(pred_obj_map.reference))
    # jsonpath_ng expressions are applied with .find(), not by calling
    # the parsed expression itself.
    found_objects = [r.value for r in ref_exp.find(obj)]
    for row in found_objects:
        self.output.add((subject, predicate, rdflib.Literal(row)))
def __generate_reference__(self, triple_map, **kwargs):
    """Internal method takes a triple_map and returns the result of
    applying the XPath to the current DOM context

    Args:
    -----
        triple_map: SimpleNamespace
        element: etree.Element
    """
    element = kwargs.get("element")
    found_elements = element.xpath(
        triple_map.reference,
        namespaces=self.xml_ns)
    for elem in found_elements:
        # Guard against empty elements: elem.text is None for <tag/>,
        # which previously raised AttributeError on .strip().
        if elem.text is None:
            continue
        raw_text = elem.text.strip()
        #! Quick and dirty test for valid URI
        if not raw_text.startswith("http"):
            continue
        return rdflib.URIRef(raw_text)
def __get_object__(binding):
    """Method takes a binding extracts value and returns rdflib
    entity

    Args:
        binding: binding row
    """
    if isinstance(binding, rdflib.term.Node):
        return binding
    # NOTE(review): collections.Iterable moved to collections.abc in
    # Python 3.3 and was removed from collections in 3.10 — confirm the
    # target runtime before porting.
    elif isinstance(binding, collections.Iterable):
        for key, row in binding.items():
            if isinstance(row, (rdflib.URIRef, rdflib.Literal)):
                return row
            elif isinstance(row, dict):
                # Looks like SPARQL JSON results: {'type': ..., 'value': ...}
                if row.get('type').startswith('uri'):
                    return rdflib.URIRef(row.get('value'))
                return rdflib.Literal(row.get('value'))
            elif isinstance(row, tuple):
                print(row)
            elif isinstance(row, str):
                # Skip raw literal serializations and language tags
                if row.startswith("literal") or "xml:lang" in key:
                    continue
                return rdflib.Literal(row)
def new_existing_bnode(self, bf_property, rule):
    """Returns existing blank node or a new if it doesn't exist

    Args:
        bf_property (str): RDF property URI
        rule (rdflib.URIRef): RDF subject of the map rule

    Returns:
        rdflib.BNode: Existing or New blank node
    """
    for row in self.rules_graph.query(HAS_MULTI_NODES.format(rule)):
        # Rules flagged as multi-node always get a fresh blank node.
        if str(row[0]).lower().startswith("true"):
            return rdflib.BNode()
    # Reuse the first existing blank node for this property, if any.
    blank_node = next(
        (subject[0]
         for subject in self.graph.query(GET_BLANK_NODE.format(bf_property))),
        None)
    if not blank_node:
        blank_node = rdflib.BNode()
    return blank_node
def populate_entity(self, bf_class, existing_uri=None):
    """Takes a BIBFRAME graph and MODS XML, extracts info for each
    entity's property and adds to graph.

    Args:
        bf_class(rdflib.URIRef): Namespace URI
        existing_uri(rdflib.URIRef): Optional existing entity URI

    Returns:
        rdflib.URIRef: URI of new entity
    """
    entity_uri = existing_uri
    if not entity_uri:
        # Check for custom IRIPattern
        entity_uri = self.__pattern_uri__(bf_class)
    if not entity_uri:
        # Finally generate an IRI from the default patterns
        entity_uri = self.__generate_uri__()
    self.graph.add((entity_uri, rdflib.RDF.type, bf_class))
    self.update_linked_classes(bf_class, entity_uri)
    self.update_direct_properties(bf_class, entity_uri)
    self.update_ordered_linked_classes(bf_class, entity_uri)
    self.add_admin_metadata(entity_uri)
    self.clean_rdf_types()
    return entity_uri
def update_direct_properties(self, entity_class, entity):
    """Update the graph by adding all direct literal
    properties of the entity in the graph.

    Args:
        entity_class (url): URL of the entity's class
        entity (rdflib.URIRef): RDFlib Entity
    """
    sparql = GET_DIRECT_PROPS.format(entity_class)
    for dest_prop, rule in self.rules_graph.query(sparql):
        self.__handle_pattern__(entity=entity,
                                rule=rule,
                                destination_property=dest_prop)
def add_admin_metadata(self, entity):
    """Takes a graph and adds the AdminMetadata for the entity

    Args:
        entity (rdflib.URIRef): URI of the entity
    """
    generate_msg = "Generated by BIBCAT version {} from KnowledgeLinks.io"
    generation_process = rdflib.BNode()
    graph = self.graph
    graph.add((generation_process, rdflib.RDF.type,
               NS_MGR.bf.GenerationProcess))
    graph.add((generation_process, NS_MGR.bf.generationDate,
               rdflib.Literal(datetime.datetime.utcnow().isoformat())))
    graph.add((generation_process, rdflib.RDF.value,
               rdflib.Literal(generate_msg.format(__version__), lang="en")))
    #! Should add bibcat's current git MD5 commit
    graph.add((entity, NS_MGR.bf.generationProcess, generation_process))
def transform(self, source=None, instance_uri=None, item_uri=None):
    """Takes new source, sets new graph, and creates a BF.Instance and
    BF.Item entities

    Args:
        source: New source, could be URL, XML, or CSV row
        instance_uri(rdflib.URIRef): Existing Instance URI, defaults to None
        item_uri(rdflib.URIRef): Existing Item URI, defaults to None

    Returns:
        tuple: BIBFRAME Instance and Item
    """
    if source is not None:
        # A new source gets a fresh working graph.
        self.source = source
        self.graph = new_graph()
    bf_instance = self.populate_entity(NS_MGR.bf.Instance, instance_uri)
    bf_item = self.populate_entity(NS_MGR.bf.Item, item_uri)
    self.graph.add((bf_item, NS_MGR.bf.itemOf, bf_instance))
    return bf_instance, bf_item
def __link_subject__(self, term, subject_iri):
    """Function takes a term and queries LOC service

    Args:
        term(str): Term
        subject_iri(rdflib.URIRef): Subject IRI

    Returns:
        tuple: (LCSH IRI, title) or (None, None) when no match is found
    """
    subject_result = self.__build_lc_url__(
        term, "http://id.loc.gov/authorities/subjects")
    lsch_iri, title = self.__process_loc_results__(
        subject_result.json(), term)
    if lsch_iri is None:
        return None, None
    # Re-point every entity that referenced the provisional subject IRI
    # at the authoritative LCSH IRI, then delete the provisional IRI.
    entities = []
    for row in self.graph.subjects(predicate=BF.subject, object=subject_iri):
        entities.append(row)
    for entity in entities:
        self.graph.add((entity, BF.subject, lsch_iri))
    bibcat.delete_iri(self.graph, subject_iri)
    # Removed an unreachable trailing `return None, None` that followed
    # this unconditional return.
    return lsch_iri, title
def __top_result__(query_result, type_=None, class_=None):
    """Internal function takes a JSON query results and returns
    the top result as a rdflib.URIRef IRI if more than one

    Args:
    ----
        query_result(dict): Query result
    """
    if query_result.get("totalResultsCount", 0) <= 0:
        return None
    top_result = query_result.get("geonames")[0]
    print(top_result)
    geo_id = top_result.get("geonameId")
    place_iri = rdflib.URIRef("{}{}/".format(IRI_BASE, geo_id))
    if type_ is not None and type_.startswith("rdf"):
        # Caller asked for a small RDF graph instead of a bare IRI.
        output = rdflib.Graph()
        rdf_type = class_ if class_ is not None else rdflib.RDFS.Resource
        output.add((place_iri, rdflib.RDF.type, rdf_type))
        output.add((place_iri, rdflib.RDFS.label,
                    rdflib.Literal(top_result.get("name"))))
        return output
    return place_iri
def replace_iri(graph, old_iri, new_iri):
    """Replaces old IRI with a new IRI in the graph

    Args:
    ----
        graph: rdflib.Graph
        old_iri: rdflib.URIRef, Old IRI
        new_iri: rdflib.URIRef, New IRI
    """
    if old_iri == new_iri:
        # Otherwise deletes all occurrences of the iri in the
        # graph
        return
    # Materialize matches before mutating: adding/removing triples while
    # iterating an rdflib result generator can skip or duplicate rows.
    for pred, obj in list(graph.predicate_objects(subject=old_iri)):
        graph.add((new_iri, pred, obj))
        graph.remove((old_iri, pred, obj))
    for subj, pred in list(graph.subject_predicates(object=old_iri)):
        graph.add((subj, pred, new_iri))
        graph.remove((subj, pred, old_iri))
def __add_creators__(self, work_graph, work_uri, instance_uri):
    """Method takes a new work graph and instance uri, queries for
    relators:creators of instance uri and adds values to work graph

    Args:
        work_graph(rdflib.Graph): RDF Graph of new BF Work
        instance_uri(rdflib.URIRef): URI of BF Instance
    """
    instance_key = str(instance_uri)
    if instance_key not in self.processed:
        return
    processed = self.processed[instance_key]
    for code in self.creator_codes:
        if code not in processed:
            continue
        relator = getattr(NS_MGR.relators, code)
        for agent_uri in processed[code]:
            work_graph.add((work_uri, relator, agent_uri))