The following 50 code examples, extracted from open-source Python projects, illustrate how to use rdflib.Literal().
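As a quick primer before the project examples, here is a minimal, self-contained sketch of the three common ways to construct a Literal: plain, language-tagged, and datatyped. It is not taken from any of the projects below; the example.org subject and predicate URIs are invented for illustration.

from rdflib import Graph, Literal, URIRef
from rdflib.namespace import XSD

g = Graph()
subject = URIRef("http://example.org/thing/1")  # hypothetical subject URI

# A plain literal, a language-tagged literal, and a datatyped literal.
g.add((subject, URIRef("http://example.org/prop/label"), Literal("example")))
g.add((subject, URIRef("http://example.org/prop/label"), Literal("Beispiel", lang="de")))
g.add((subject, URIRef("http://example.org/prop/count"), Literal(42, datatype=XSD.integer)))

# Datatyped literals round-trip to native Python values via toPython().
for _, _, obj in g:
    print(repr(obj), "->", obj.toPython())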
def get_do_metadata():
    # from the do owl file, get do labels, descriptions
    g = Graph()
    g.parse(DO_OWL_PATH)
    disease_ontology = Literal('disease_ontology', datatype=URIRef('http://www.w3.org/2001/XMLSchema#string'))
    query = """
    SELECT * WHERE {
        ?id oboInOwl:hasOBONamespace ?disease_ontology .
        ?id rdfs:label ?label .
        OPTIONAL {?id obo:IAO_0000115 ?descr}
        FILTER NOT EXISTS {?id owl:deprecated ?dep}
    }
    """
    rows = g.query(query, initBindings={'disease_ontology': disease_ontology})
    res = [{str(k): str(v) for k, v in binding.items()} for binding in rows.bindings]
    df = pd.DataFrame(res)
    df.drop_duplicates(subset=['id'], inplace=True)
    df.fillna("", inplace=True)
    do = df.to_dict("records")
    do = {purl_to_curie(x['id']): x for x in do}
    return do
def result_set_2_rdf(self, result_set, reduced_mapping, fragment):
    for tweet in result_set:
        for s, p, o in reduced_mapping.mapping:
            subject = s
            obj = o
            splited_subject = subject.split('{')
            subject_prefix = splited_subject[0]
            subject_jsonpath = parse(splited_subject[1].split('}')[0])
            subject_values = [match.value for match in subject_jsonpath.find(tweet)]
            if '$.' in obj:
                object_jsonpath = parse(obj.split('{')[0].split('}')[0])
                object_values = [match.value for match in object_jsonpath.find(tweet)]
                for object_value in object_values:
                    fragment.add_data_triple(URIRef("%s%s" % (subject_prefix, subject_values[0])),
                                             p, Literal(object_value))
            else:
                fragment.add_data_triple(URIRef("%s%s" % (subject_prefix, subject_values[0])),
                                         p, obj)
def result_set_2_rdf(self, result_set, reduced_mapping, fragment):
    for repo in result_set:
        for s, p, o in reduced_mapping.mapping:
            subject = s
            obj = o
            splited_subject = subject.split('{')
            subject_prefix = splited_subject[0]
            subject_jsonpath = parse(splited_subject[1].split('}')[0])
            subject_values = [match.value for match in subject_jsonpath.find(repo)]
            if '$.' in obj:
                object_jsonpath = parse(obj.split('{')[0].split('}')[0])
                object_values = [match.value for match in object_jsonpath.find(repo)]
                for object_value in object_values:
                    fragment.add_data_triple(URIRef("%s%s" % (subject_prefix, subject_values[0])),
                                             p, Literal(object_value))
            else:
                fragment.add_data_triple(URIRef("%s%s" % (subject_prefix, subject_values[0])),
                                         p, obj)
def test_create_graph():
    """Create new graphFactory Object"""
    from sc import graphManager
    PROV = Namespace("http://www.w3.org/ns/prov#")
    tstregistry = graphManager.VocabularyRegistry()
    vocab1 = Vocabulary1()
    tstregistry.register(vocab1)
    vocab2 = Vocabulary2()
    tstregistry.register(vocab2)
    tstregistry.build_graph()
    print tstregistry.get_turtle()

    # Check assertions in global graph store
    assert (URIRef("http://orcid.org/000-0003-4901-6059"),
            RDF.type, PROV.Person) in tstregistry.global_graph
    assert (URIRef(uuidurn),
            RDFS.label, Literal(
                "Docker: https://www.docker.com/")) in tstregistry.global_graph

    # Check Serialization
    jsongraph = json.loads(tstregistry.get_json_ld())
    assert '@context' in jsongraph
def __addAction(self, action, statement):
    element = self.trans.createElement(action)
    for item in statement:
        if isinstance(item, Literal):
            literal = self.trans.createElement("literal")
            if item.datatype is not None:
                literal.setAttribute("datatype", str(item.datatype))
            if item.language is not None:
                literal.setAttribute("xml:lang", str(item.language))
            literal.appendChild(self.trans.createTextNode(str(item)))
            element.appendChild(literal)
        elif isinstance(item, URIRef):
            uri = self.trans.createElement("uri")
            uri.appendChild(self.trans.createTextNode(str(item)))
            element.appendChild(uri)
        elif isinstance(item, BNode):
            bnode = self.trans.createElement("bnode")
            bnode.appendChild(self.trans.createTextNode(str(item)))
            element.appendChild(bnode)
        else:
            raise Exception("Unknown element: " + str(item))
    self.trans.childNodes[0].appendChild(element)
def __setattr__(self, name, values):
    self._objectGraph._load(self.uri)
    unwrappedValues = []
    for value in values:
        # unwrap rdfobjects:
        if isinstance(value, RDFObject):
            unwrappedValues.append(value.uri)
        # pass through rdflib objects:
        elif isinstance(value, URIRef) or isinstance(value, BNode) or isinstance(value, Literal):
            unwrappedValues.append(value)
        # wrap literals:
        else:
            unwrappedValues.append(Literal(value))
    # look for a property mapping for this name:
    prop = self._getProp(name)
    if name.startswith("r_"):
        self._objectGraph._setSubjects(unwrappedValues, prop, self.uri)
    else:
        self._objectGraph._setObjects(self.uri, prop, unwrappedValues)
def _add_date_triple(self, subject, predicate, value, _type=Literal):
    '''
    Adds a new triple with a date object

    Dates are parsed using dateutil, and if the date obtained is correct,
    added to the graph as an XSD.dateTime value.

    If there are parsing errors, the literal string value is added.
    '''
    if not value:
        return
    try:
        default_datetime = datetime.datetime(1, 1, 1, 0, 0, 0)
        _date = parse_date(value, default=default_datetime)
        self.g.add((subject, predicate, _type(_date.isoformat(), datatype=XSD.dateTime)))
    except ValueError:
        self.g.add((subject, predicate, _type(value)))
def test_object_list(self):
    p = RDFProfile(_default_graph())

    p.g.add((URIRef('http://example.org/datasets/1'),
             DCAT.keyword,
             Literal('space')))
    p.g.add((URIRef('http://example.org/datasets/1'),
             DCAT.keyword,
             Literal('moon')))

    value = p._object_value_list(URIRef('http://example.org/datasets/1'), DCAT.keyword)

    assert isinstance(value, list)
    assert isinstance(value[0], unicode)
    eq_(len(value), 2)
    eq_(sorted(value), ['moon', 'space'])
def test_dataset_license_from_distribution_by_title(self):
    # license_id retrieved from dct:title of dcat:license object
    g = Graph()

    dataset = URIRef("http://example.org/datasets/1")
    g.add((dataset, RDF.type, DCAT.Dataset))

    distribution = URIRef("http://example.org/datasets/1/ds/1")
    g.add((distribution, RDF.type, DCAT.Distribution))
    g.add((dataset, DCAT.distribution, distribution))
    license = BNode()
    g.add((distribution, DCT.license, license))
    g.add((license, DCT.title, Literal("Creative Commons Attribution")))

    p = RDFParser(profiles=['euro_dcat_ap'])
    p.g = g

    dataset = [d for d in p.datasets()][0]
    eq_(dataset['license_id'], 'cc-by')
def test_distribution_access_url(self):
    g = Graph()

    dataset1 = URIRef("http://example.org/datasets/1")
    g.add((dataset1, RDF.type, DCAT.Dataset))

    distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
    g.add((distribution1_1, RDF.type, DCAT.Distribution))
    g.add((distribution1_1, DCAT.accessURL, Literal('http://access.url.org')))
    g.add((dataset1, DCAT.distribution, distribution1_1))

    p = RDFParser(profiles=['euro_dcat_ap'])
    p.g = g

    datasets = [d for d in p.datasets()]

    resource = datasets[0]['resources'][0]

    eq_(resource['url'], u'http://access.url.org')
    assert 'download_url' not in resource
def test_distribution_download_url(self):
    g = Graph()

    dataset1 = URIRef("http://example.org/datasets/1")
    g.add((dataset1, RDF.type, DCAT.Dataset))

    distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
    g.add((distribution1_1, RDF.type, DCAT.Distribution))
    g.add((distribution1_1, DCAT.downloadURL, Literal('http://download.url.org')))
    g.add((dataset1, DCAT.distribution, distribution1_1))

    p = RDFParser(profiles=['euro_dcat_ap'])
    p.g = g

    datasets = [d for d in p.datasets()]

    resource = datasets[0]['resources'][0]

    eq_(resource['url'], u'http://download.url.org')
    eq_(resource['download_url'], u'http://download.url.org')
def test_distribution_both_access_and_download_url(self):
    g = Graph()

    dataset1 = URIRef("http://example.org/datasets/1")
    g.add((dataset1, RDF.type, DCAT.Dataset))

    distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
    g.add((distribution1_1, RDF.type, DCAT.Distribution))
    g.add((distribution1_1, DCAT.accessURL, Literal('http://access.url.org')))
    g.add((distribution1_1, DCAT.downloadURL, Literal('http://download.url.org')))
    g.add((dataset1, DCAT.distribution, distribution1_1))

    p = RDFParser(profiles=['euro_dcat_ap'])
    p.g = g

    datasets = [d for d in p.datasets()]

    resource = datasets[0]['resources'][0]

    eq_(resource['url'], u'http://access.url.org')
    eq_(resource['download_url'], u'http://download.url.org')
def test_distribution_format_imt_and_format(self):
    g = Graph()

    dataset1 = URIRef("http://example.org/datasets/1")
    g.add((dataset1, RDF.type, DCAT.Dataset))

    distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
    g.add((distribution1_1, RDF.type, DCAT.Distribution))
    g.add((distribution1_1, DCAT.mediaType, Literal('text/csv')))
    g.add((distribution1_1, DCT['format'], Literal('CSV')))
    g.add((dataset1, DCAT.distribution, distribution1_1))

    p = RDFParser(profiles=['euro_dcat_ap'])
    p.g = g

    datasets = [d for d in p.datasets()]

    resource = datasets[0]['resources'][0]

    eq_(resource['format'], u'CSV')
    eq_(resource['mimetype'], u'text/csv')
def test_distribution_format_imt_only(self):
    g = Graph()

    dataset1 = URIRef("http://example.org/datasets/1")
    g.add((dataset1, RDF.type, DCAT.Dataset))

    distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
    g.add((distribution1_1, RDF.type, DCAT.Distribution))
    g.add((distribution1_1, DCAT.mediaType, Literal('text/csv')))
    g.add((dataset1, DCAT.distribution, distribution1_1))

    p = RDFParser(profiles=['euro_dcat_ap'])
    p.g = g

    datasets = [d for d in p.datasets()]

    resource = datasets[0]['resources'][0]

    if toolkit.check_ckan_version(min_version='2.3'):
        eq_(resource['format'], u'CSV')
        eq_(resource['mimetype'], u'text/csv')
    else:
        eq_(resource['format'], u'text/csv')
def test_distribution_format_imt_only_normalize_false(self):
    g = Graph()

    dataset1 = URIRef("http://example.org/datasets/1")
    g.add((dataset1, RDF.type, DCAT.Dataset))

    distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
    g.add((distribution1_1, RDF.type, DCAT.Distribution))
    g.add((distribution1_1, DCAT.mediaType, Literal('text/csv')))
    g.add((dataset1, DCAT.distribution, distribution1_1))

    p = RDFParser(profiles=['euro_dcat_ap'])
    p.g = g

    datasets = [d for d in p.datasets()]

    resource = datasets[0]['resources'][0]

    eq_(resource['format'], u'text/csv')
    eq_(resource['mimetype'], u'text/csv')
def test_distribution_format_format_only_normalize_false(self):
    g = Graph()

    dataset1 = URIRef("http://example.org/datasets/1")
    g.add((dataset1, RDF.type, DCAT.Dataset))

    distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
    g.add((distribution1_1, RDF.type, DCAT.Distribution))
    g.add((distribution1_1, DCT['format'], Literal('CSV')))
    g.add((dataset1, DCAT.distribution, distribution1_1))

    p = RDFParser(profiles=['euro_dcat_ap'])
    p.g = g

    datasets = [d for d in p.datasets()]

    resource = datasets[0]['resources'][0]

    eq_(resource['format'], u'CSV')
    assert 'mimetype' not in resource
def test_distribution_format_unknown_imt(self):
    g = Graph()

    dataset1 = URIRef("http://example.org/datasets/1")
    g.add((dataset1, RDF.type, DCAT.Dataset))

    distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
    g.add((distribution1_1, RDF.type, DCAT.Distribution))
    g.add((distribution1_1, DCAT.mediaType, Literal('text/unknown-imt')))
    g.add((dataset1, DCAT.distribution, distribution1_1))

    p = RDFParser(profiles=['euro_dcat_ap'])
    p.g = g

    datasets = [d for d in p.datasets()]

    resource = datasets[0]['resources'][0]

    eq_(resource['format'], u'text/unknown-imt')
    eq_(resource['mimetype'], u'text/unknown-imt')
def test_distribution_format_imt_normalized(self):
    g = Graph()

    dataset1 = URIRef("http://example.org/datasets/1")
    g.add((dataset1, RDF.type, DCAT.Dataset))

    distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
    g.add((distribution1_1, RDF.type, DCAT.Distribution))
    g.add((distribution1_1, DCAT.mediaType, Literal('text/unknown-imt')))
    g.add((dataset1, DCAT.distribution, distribution1_1))

    p = RDFParser(profiles=['euro_dcat_ap'])
    p.g = g

    datasets = [d for d in p.datasets()]

    resource = datasets[0]['resources'][0]

    eq_(resource['format'], u'text/unknown-imt')
    eq_(resource['mimetype'], u'text/unknown-imt')
def test_spatial_rdfs_label(self):
    g = Graph()

    dataset = URIRef('http://example.org/datasets/1')
    g.add((dataset, RDF.type, DCAT.Dataset))

    spatial_uri = URIRef('http://geonames/Newark')
    g.add((dataset, DCT.spatial, spatial_uri))

    g.add((spatial_uri, RDF.type, DCT.Location))
    g.add((spatial_uri, RDFS.label, Literal('Newark')))

    p = RDFParser(profiles=['euro_dcat_ap'])
    p.g = g

    datasets = [d for d in p.datasets()]

    extras = self._extras(datasets[0])

    eq_(extras['spatial_text'], 'Newark')
def test_spatial_wkt_only(self):
    g = Graph()

    dataset = URIRef('http://example.org/datasets/1')
    g.add((dataset, RDF.type, DCAT.Dataset))

    spatial_uri = URIRef('http://geonames/Newark')
    g.add((dataset, DCT.spatial, spatial_uri))

    g.add((spatial_uri, RDF.type, DCT.Location))
    g.add((spatial_uri, LOCN.geometry, Literal('POINT (67 89)', datatype=GSP.wktLiteral)))

    p = RDFParser(profiles=['euro_dcat_ap'])
    p.g = g

    datasets = [d for d in p.datasets()]

    extras = self._extras(datasets[0])

    # NOTE: geomet returns floats for coordinates on WKT -> GeoJSON
    eq_(extras['spatial'], '{"type": "Point", "coordinates": [67.0, 89.0]}')
def test_spatial_literal_only(self):
    g = Graph()

    dataset = URIRef('http://example.org/datasets/1')
    g.add((dataset, RDF.type, DCAT.Dataset))

    g.add((dataset, DCT.spatial, Literal('Newark')))

    p = RDFParser(profiles=['euro_dcat_ap'])
    p.g = g

    datasets = [d for d in p.datasets()]

    extras = self._extras(datasets[0])

    eq_(extras['spatial_text'], 'Newark')
    assert_true('spatial_uri' not in extras)
    assert_true('spatial' not in extras)
def _add_agent(self, _dict, ref, basekey, _type):
    '''
    Stores the Agent in this format:
        <dct:publisher rdf:resource="http://dati.gov.it/resource/Amministrazione/r_liguri"/>

        <dcatapit:Agent rdf:about="http://dati.gov.it/resource/Amministrazione/r_liguri">
            <rdf:type rdf:resource="&foaf;Agent"/>
            <dct:identifier>r_liguri</dct:identifier>
            <foaf:name>Regione Liguria</foaf:name>
        </dcatapit:Agent>

    Returns the ref to the agent node
    '''
    agent_name = self._get_dict_value(_dict, basekey + '_name', 'N/A')
    agent_id = self._get_dict_value(_dict, basekey + '_identifier', 'N/A')

    agent = BNode()

    self.g.add((agent, RDF['type'], DCATAPIT.Agent))
    self.g.add((agent, RDF['type'], FOAF.Agent))
    self.g.add((ref, _type, agent))

    self.g.add((agent, FOAF.name, Literal(agent_name)))
    self.g.add((agent, DCT.identifier, Literal(agent_id)))

    return agent
def test_null_values_with_multiple_strings():
    csvw = CSVW(csv_path="tests/null1.csv",
                metadata_path="tests/null1.multiple.csv-metadata.json")
    rdf_contents = csvw.to_rdf()

    g = ConjunctiveGraph()
    g.parse(data=rdf_contents, format="turtle")

    all_objects = {x for x in g.objects()}

    assert Literal('null_key', datatype=XSD.token) not in all_objects
    assert Literal('null_sector') not in all_objects
    assert Literal('null_id', datatype=XSD.token) not in all_objects
    for id in ['10', '11', '12', '13']:
        assert Literal(id, datatype=XSD.token) not in all_objects

    all_preds = {x for x in g.predicates()}
    assert id_uri not in all_preds

    assert Literal('1', datatype=XSD.token) not in all_objects
def test_date():
    with CSVW(csv_path="tests/datatypes.date.csv",
              metadata_path="tests/datatypes.date.csv-metadata.json") as csvw:
        rdf_output = csvw.to_rdf()

    g = ConjunctiveGraph()
    g.parse(data=rdf_output, format="turtle")

    date1_lit = Literal("2017-01-09", datatype=XSD.date)
    assert len(list(g.triples((NS['event/1'], NS['date1'], date1_lit)))) == 1

    date2_lit = Literal("2017-01-10Z", datatype=XSD.date)
    assert len(list(g.triples((NS['event/1'], NS['date2'], date2_lit)))) == 1

    date3_lit = Literal("2017-01-11", datatype=XSD.date)
    assert len(list(g.triples((NS['event/1'], NS['date3'], date3_lit)))) == 1

    date4_lit = Literal("2002-09-24-06:00", datatype=XSD.date)
    assert len(list(g.triples((NS['event/1'], NS['date4'], date4_lit)))) == 1

    date5_lit = Literal("2002-09-24+04:00", datatype=XSD.date)
    assert len(list(g.triples((NS['event/1'], NS['date5'], date5_lit)))) == 1
def test_datetime():
    with CSVW(csv_path="tests/datatypes.datetime.csv",
              metadata_path="tests/datatypes.datetime.csv-metadata.json") as csvw:
        rdf_output = csvw.to_rdf()

    g = ConjunctiveGraph()
    g.parse(data=rdf_output, format="turtle")

    dt1_lit = Literal("2002-05-30T09:00:00", datatype=XSD.dateTime)
    assert len(list(g.triples((NS['event/1'], NS['datetime1'], dt1_lit)))) == 1

    dt2_lit = Literal("2002-05-30T09:30:10.5", datatype=XSD.dateTime)
    assert len(list(g.triples((NS['event/1'], NS['datetime2'], dt2_lit)))) == 1

    dt3_lit = Literal("2002-05-30T09:30:10Z", datatype=XSD.dateTime)
    assert len(list(g.triples((NS['event/1'], NS['datetime3'], dt3_lit)))) == 1

    dt4_lit = Literal("2002-05-30T09:30:10-06:00", datatype=XSD.dateTime)
    assert len(list(g.triples((NS['event/1'], NS['datetime4'], dt4_lit)))) == 1

    dt5_lit = Literal("2002-05-30T09:30:10+04:00", datatype=XSD.dateTime)
    assert len(list(g.triples((NS['event/1'], NS['datetime5'], dt5_lit)))) == 1

    datestamp = Literal("2004-04-12T13:20:00-05:00", datatype=XSD.dateTimeStamp)
    assert len(list(g.triples((NS['event/1'], NS['datetimestamp'], datestamp)))) == 1
def test_bool_with_format():
    csvw = CSVW(csv_path="tests/datatypes.bool.csv",
                metadata_path="tests/datatypes.bool.csv-metadata.json")
    rdf_output = csvw.to_rdf()

    g = ConjunctiveGraph()
    g.parse(data=rdf_output, format="turtle")

    true_lit = Literal(True, datatype=XSD.boolean)
    false_lit = Literal(False, datatype=XSD.boolean)

    assert len(list(g.triples((NS['event/1'], NS['bool1'], true_lit)))) == 1
    assert len(list(g.triples((NS['event/1'], NS['bool2'], true_lit)))) == 1
    assert len(list(g.triples((NS['event/1'], NS['bool3'], true_lit)))) == 1
    assert len(list(g.triples((NS['event/2'], NS['bool1'], false_lit)))) == 1
    assert len(list(g.triples((NS['event/2'], NS['bool2'], false_lit)))) == 1
    assert len(list(g.triples((NS['event/2'], NS['bool3'], false_lit)))) == 1
    assert len(list(g.triples((NS['event/3'], NS['bool1'], false_lit)))) == 1
    assert len(list(g.triples((NS['event/3'], NS['bool2'], false_lit)))) == 1
    assert len(list(g.triples((NS['event/3'], NS['bool3'], false_lit)))) == 1
def test_empty_boolean():
    csvw = CSVW(csv_path="tests/empty.csv",
                metadata_path="tests/empty.bool.csv-metadata.json")
    rdf_output = csvw.to_rdf()

    g = ConjunctiveGraph()
    g.parse(data=rdf_output, format="turtle")
    assert len(g) == 2
    assert len(list(g.triples((None, None, Literal(False))))) == 2

    csvw = CSVW(csv_path="tests/empty.csv",
                metadata_path="tests/empty.invalid_base.csv-metadata.json")
    rdf_output = csvw.to_rdf()

    g = ConjunctiveGraph()
    g.parse(data=rdf_output, format="turtle")
    assert len(g) == 0
def setUp(self):
    self.graph = rdflib.Graph()
    self.entity = rdflib.URIRef("https://bibcat.org/test-entity")
    self.simple_title_bnode = rdflib.BNode()
    self.graph.add((self.entity, rdflib.RDF.type, BF.Title))
    self.graph.add((self.entity, BF.title, self.simple_title_bnode))
    self.graph.add((self.simple_title_bnode,
                    BF.mainTitle,
                    rdflib.Literal("This is a test")))
    self.top_title_bnode = rdflib.BNode()
    self.graph.add((self.entity, BF.title, self.top_title_bnode))
    secondary_title_bnode = rdflib.BNode()
    self.graph.add((self.top_title_bnode, rdflib.RDF.type, BF.Topic))
    self.graph.add((self.top_title_bnode,
                    rdflib.RDFS.label,
                    rdflib.Literal("This is a title and a name")))
    self.graph.add((self.top_title_bnode, SCHEMA.name, secondary_title_bnode))
    self.graph.add((secondary_title_bnode,
                    rdflib.RDF.value,
                    rdflib.Literal("This is a name")))
def setUp(self):
    self.graph = rdflib.Graph()
    self.entity_one = rdflib.URIRef("https://bibcat.org/test-entity")
    self.graph.add((self.entity_one, rdflib.RDF.type, rdflib.RDFS.Resource))
    self.graph.add((self.entity_one,
                    rdflib.RDFS.label,
                    rdflib.Literal("Test Entity One", lang="en")))
    self.entity_two = rdflib.URIRef("https://bibcat.org/test-entity-two")
    self.graph.add((self.entity_two, rdflib.RDF.type, rdflib.RDFS.Resource))
    self.graph.add((self.entity_two,
                    rdflib.RDFS.label,
                    rdflib.Literal("Test Entity Two", lang="en")))
    title_bnode = rdflib.BNode()
    self.graph.add((self.entity_two, BF.title, title_bnode))
    self.graph.add((title_bnode, rdflib.RDF.type, BF.Title))
    self.graph.add((title_bnode, BF.subTitle, rdflib.Literal("Subtitle ")))
def __generate_object_term__(self, datatype, value):
    """Internal method takes a datatype (can be None) and returns
    the RDF Object Term

    Args:
    -----
        datatype: None, or rdflib.URIRef
        value: Varies depending on ingester
    """
    if datatype == NS_MGR.xsd.anyURI:
        term = rdflib.URIRef(value)
    elif datatype:
        term = rdflib.Literal(value, datatype=datatype)
    else:
        term = rdflib.Literal(value)
    return term
def generate_term(self, **kwargs):
    """Method generates a rdflib.Term based on kwargs"""
    term_map = kwargs.pop('term_map')
    if hasattr(term_map, "termType") and \
            term_map.termType == NS_MGR.rr.BlankNode:
        return rdflib.BNode()
    if not hasattr(term_map, 'datatype'):
        term_map.datatype = NS_MGR.xsd.anyURI
    if hasattr(term_map, "template") and term_map.template is not None:
        template_vars = kwargs
        template_vars.update(self.constants)
        # Call any functions to generate values
        for key, value in template_vars.items():
            if hasattr(value, "__call__"):
                template_vars[key] = value()
        raw_value = term_map.template.format(**template_vars)
        if term_map.datatype == NS_MGR.xsd.anyURI:
            return rdflib.URIRef(raw_value)
        return rdflib.Literal(raw_value, datatype=term_map.datatype)
    if term_map.reference is not None:
        # Each child will have different mechanisms for referencing the
        # source based
        return self.__generate_reference__(term_map, **kwargs)
def __generate_reference__(self, triple_map, **kwargs):
    """Generates a RDF entity based on triple map

    Args:
        triple_map(SimpleNamespace): Triple Map
    """
    raw_value = self.source.get(str(triple_map.reference))
    if raw_value is None or len(raw_value) < 1:
        return
    if hasattr(triple_map, "datatype"):
        if triple_map.datatype == NS_MGR.xsd.anyURI:
            output = rdflib.URIRef(raw_value)
        else:
            output = rdflib.Literal(raw_value, datatype=triple_map.datatype)
    else:
        output = rdflib.Literal(raw_value)
    return output
def __reference_handler__(self, **kwargs):
    """Internal method for handling rr:reference in triples map

    Keyword Args:
    -------------
        predicate_obj_map: SimpleNamespace
        obj: dict
        subject: rdflib.URIRef
    """
    subjects = []
    pred_obj_map = kwargs.get("predicate_obj_map")
    obj = kwargs.get("obj")
    subject = kwargs.get("subject")
    if pred_obj_map.reference is None:
        return subjects
    predicate = pred_obj_map.predicate
    ref_exp = jsonpath_ng.parse(str(pred_obj_map.reference))
    found_objects = [r.value for r in ref_exp.find(obj)]
    for row in found_objects:
        self.output.add((subject, predicate, rdflib.Literal(row)))
def add_admin_metadata(self, entity):
    """Takes a graph and adds the AdminMetadata for the entity

    Args:
        entity (rdflib.URIRef): URI of the entity
    """
    generate_msg = "Generated by BIBCAT version {} from KnowledgeLinks.io"
    generation_process = rdflib.BNode()
    self.graph.add((generation_process,
                    rdflib.RDF.type,
                    NS_MGR.bf.GenerationProcess))
    self.graph.add((generation_process,
                    NS_MGR.bf.generationDate,
                    rdflib.Literal(datetime.datetime.utcnow().isoformat())))
    self.graph.add((generation_process,
                    rdflib.RDF.value,
                    rdflib.Literal(generate_msg.format(__version__), lang="en")))
    #! Should add bibcat's current git MD5 commit
    self.graph.add(
        (entity, NS_MGR.bf.generationProcess, generation_process)
    )
def __top_result__(query_result, type_=None, class_=None):
    """Internal function takes a JSON query result and returns the top
    result as an rdflib.URIRef IRI, or as an rdflib.Graph when an rdf
    type_ is requested.

    Args:
    ----
        query_result(dict): Query result
    """
    if query_result.get("totalResultsCount", 0) > 0:
        print(query_result.get("geonames")[0])
        top_result = query_result.get("geonames")[0]
        geo_id = top_result.get("geonameId")
        place_iri = rdflib.URIRef("{}{}/".format(IRI_BASE, geo_id))
        if type_ is not None and type_.startswith("rdf"):
            output = rdflib.Graph()
            rdf_type = rdflib.RDFS.Resource
            if class_ is not None:
                rdf_type = class_
            output.add((place_iri, rdflib.RDF.type, rdf_type))
            output.add((place_iri,
                        rdflib.RDFS.label,
                        rdflib.Literal(top_result.get("name"))))
            return output
        return place_iri
def __add_work_title__(self, work_graph, work_uri, instance_uri):
    """Method takes a new work graph and instance uri, queries for
    bf:InstanceTitle of instance uri and adds values to work graph

    Args:
        work_graph(rdflib.Graph): RDF Graph of new BF Work
        instance_uri(rdflib.URIRef): URI of BF Instance
    """
    instance_key = str(instance_uri)
    if instance_key in self.processed and \
            "title" in self.processed[instance_key]:
        work_title_bnode = rdflib.BNode()
        work_graph.add((work_uri, NS_MGR.bf.title, work_title_bnode))
        work_graph.add((work_title_bnode,
                        NS_MGR.rdf.type,
                        NS_MGR.bf.WorkTitle))
        for row in self.processed[instance_key]["title"]:
            main_title, subtitle = row["mainTitle"], row["subtitle"]
            work_graph.add((work_title_bnode,
                            NS_MGR.bf.mainTitle,
                            rdflib.Literal(main_title)))
            if subtitle:
                work_graph.add((work_title_bnode,
                                NS_MGR.bf.subtitle,
                                rdflib.Literal(subtitle)))
def graph_member(self, ldp_root, c_id, obj=None):
    if not obj:
        obj = self.member()
    node = URIRef(ldp_root + encoder.encode(c_id) + "/member/" + encoder.encode(obj.id))
    mappings = URIRef(node + "#mappings")

    g = Graph(identifier=node)
    g.add((node, RDF.type, RDA.Member))
    g.add((node, DCTERMS.identifier, Literal(obj.id)))
    g.add((node, RDA.location, Literal(obj.location)))
    if hasattr(obj, 'datatype'):
        g.add((node, RDA.datatype, Literal(obj.datatype)))
    if hasattr(obj, 'ontology'):
        g.add((node, RDA.ontology, Literal(obj.ontology)))
    if hasattr(obj, 'mappings'):
        g.add((node, RDA.mappings, mappings))
        mp = obj.mappings
        if hasattr(mp, 'role'):
            g.add((mappings, RDA.role, URIRef(obj.mappings.role)))
        if hasattr(mp, 'index'):
            g.add((mappings, RDA.itemIndex, Literal(obj.mappings.index)))
        if hasattr(mp, 'dateAdded'):
            g.add((mappings, RDA.dateAdded, Literal(obj.mappings.dateAdded)))
    return g
def string_to_literal(string, to_uri=True):
    """
    Creates a literal or URI object from a given string.

    :param string: String to convert from.
    :param to_uri: Whether to try to convert to a URI when no valid literal is given.
    :return: Literal object, URI object (if literal creation failed and to_uri is set),
             or else the input string.
    """
    if "^^http://www.w3.org/2001/XMLSchema#" in string:
        try:
            value, datatype = string.split("^^")
            return Literal(value, datatype=datatype)
        except:
            loginfo("Could not deliteralize string.")
            return Literal(string)
    else:
        loginfo("not a literal")
        return URIRef(string) if to_uri else string
def check_value_type(self, value):
    """
    Check the type of the value being analyzed.

    :param value: value to check
    :return: value cast to an rdflib type (float, int, or string Literal,
             or a URIRef if the value names a resource)
    """
    # Input may arrive as a list or as a single value; work on the
    # unique element of that input.
    result = value
    if self.mapper.is_float(result):
        data_type = rdflib.namespace.XSD.float
    elif self.mapper.is_int(result):
        data_type = rdflib.namespace.XSD.int
    else:
        # If this string represents a resource
        resource = self.check_if_is_resource(result)
        # if it is a resource in dbpedia
        if resource:
            return rdflib.URIRef(resource)
        else:
            data_type = rdflib.namespace.XSD.string
    return rdflib.Literal(result, datatype=data_type)
def _resolve_range(self, range_uri):
    """
    Resolve an rdfs:Property rdfs:range value to a type.
    """
    range_name = self._extract_name(range_uri)
    if range_name in self.namespace:
        return self.namespace[range_name]

    if is_a_literal(range_uri):
        return Literal
    elif is_a_property(range_uri):
        return RDF_Property
    elif is_a_list(range_uri):
        return RDF.List
    return RDFS_Class
def rdf(request):
    uri = request.GET['uri']
    g = Graph()
    annotations = Annotation.objects.filter(uri=uri)
    for annotation in annotations:
        if annotation.title:
            g.add((URIRef(annotation.uri),
                   URIRef("http://localhost/metawiki/index.php/Special:URIResolver/Property-3ATitle"),
                   Literal(annotation.title)))
        if annotation.notes:
            g.add((URIRef(annotation.uri),
                   URIRef("http://localhost/metawiki/index.php/Special:URIResolver/Property-3ANotes"),
                   Literal(annotation.notes)))
        for tag in annotation.tags.all():
            g.add((URIRef(annotation.uri),
                   URIRef("http://localhost/metawiki/index.php/Special:URIResolver/Property-3ATag"),
                   Literal(tag.prefLabel)))
    status = HttpResponse(g.serialize(format='xml'))
    status["Content-Type"] = "application/rdf+xml"
    return status
def add_vcard(self, position, name):
    """
    :param position: number in author order
    :param name: name as string - last, first, middle
    :return: rdflib.Graph
    """
    g = Graph()

    # vcard individual
    vci_uri = D['vcard-individual-' + position + '-' + self.localid]
    g.add((vci_uri, RDF.type, VCARD.Individual))

    # vcard name
    vcn_uri = D['vcard-name-' + position + '-' + self.localid]
    g.add((vcn_uri, RDF.type, VCARD.Name))
    g.add((vcn_uri, RDFS.label, Literal(name)))
    # Parse name into first, last, middle
    name = HumanName(name)
    g.add((vcn_uri, VCARD.givenName, Literal(name.first)))
    g.add((vcn_uri, VCARD.familyName, Literal(name.last)))
    if name.middle != "":
        g.add((vcn_uri, VIVO.middleName, Literal(name.middle)))
    # Relate vcard individual to vcard name
    g.add((vci_uri, VCARD.hasName, vcn_uri))
    return vci_uri, g
def authorship(self):
    """
    Add authorship statements and vcards for authors.
    :return: rdflib.Graph
    """
    g = Graph()
    for num, au in enumerate(self.authors()):
        position = str(num + 1)

        vcard_individual_uri, vcard_stmts = self.add_vcard(position, au)
        g += vcard_stmts

        # Authorship
        aship_uri = D['authorship-' + position + '-' + self.localid]
        g.add((aship_uri, RDF.type, VIVO.Authorship))
        g.add((aship_uri, VIVO.rank, Literal(int(position))))
        # Relate pub and authorship
        g.add((aship_uri, VIVO.relates, self.pub_uri))
        # Relate vcard and authorship
        g.add((aship_uri, VIVO.relates, vcard_individual_uri))
    return g
def add_vcard_weblink(self):
    """
    Build statements for weblinks in VIVO.
    :return: rdflib.Graph
    """
    base_url = "http://ws.isiknowledge.com/cps/openurl/service?url_ver=Z39.88-2004&rft_id=info:ut/WOS:{}"
    g = Graph()

    # vcard individual for pub
    vci_uri = D['vcard-individual-pub-' + self.localid]
    g.add((vci_uri, RDF.type, VCARD.Individual))

    # vcard URL
    vcu_uri = D['vcard-url-pub-' + self.localid]
    g.add((vcu_uri, RDF.type, VCARD.URL))
    g.add((vcu_uri, RDFS.label, Literal(u"Web of Science™")))
    g.add((vcu_uri, VCARD.url, Literal(base_url.format(self.ut()))))

    # Relate vcard individual to url
    g.add((vci_uri, VCARD.hasURL, vcu_uri))
    return vci_uri, g
def _vcard_email(self):
    g = Graph()
    try:
        emails = [e for e in self.profile["emails"].split("|")]
    except KeyError:
        try:
            emails = [self.profile['email']]
        except KeyError:
            emails = []
    for email in emails:
        vt = Resource(g, self.vcard_email_uri)
        vt.set(RDF.type, VCARD.Work)
        # Label probably not necessary
        vt.set(RDFS.label, Literal(email))
        vt.set(VCARD.email, Literal(email))
    return g
def org_total_cites(orgs):
    g = Graph()
    for org_name in orgs:
        org_uri = waan_uri(org_name)
        #print>>sys.stderr, "Processing", org_name, "total cites"
        ln = local_name(org_uri)
        tc = load_incites_json_file(org_name, 'cites')
        for item in tc:
            curi = D['citecount-' + ln + '-' + str(item['year'])]
            g.add((curi, RDF.type, WOS.InCitesCitesPerYear))
            g.add((curi, RDFS.label, Literal("{} - {}".format(item['year'], item['count']))))
            g.add((curi, WOS.number, Literal(item['count'])))
            g.add((curi, WOS.year, Literal(item['year'])))
            g.add((org_uri, VIVO.relates, curi))
    #print g.serialize(format="turtle")
    ng = "http://localhost/data/incites-total-cites-year-counts"
    backend.sync_updates(ng, g)
    return True
def org_top_categories(orgs):
    g = Graph()
    for org_name in orgs:
        #print>>sys.stderr, "Processing", org_name, "top categories"
        org_uri = waan_uri(org_name)
        ln = local_name(org_uri)
        top_cat = load_incites_json_file(org_name, 'categories')
        for item in top_cat:
            cat = item['category']
            category_uri = get_category_uri(cat)
            curi = D['topcategory-'] + ln + slugify(cat)
            g.add((curi, RDF.type, WOS.InCitesTopCategory))
            g.add((curi, RDFS.label, Literal("{} - {}".format(org_name, cat))))
            g.add((curi, WOS.number, Literal(item['count'])))
            g.add((curi, VIVO.relates, category_uri))
            g.add((curi, VIVO.relates, org_uri))
    #print g.serialize(format="turtle")
    ng = "http://localhost/data/incites-top-categories"
    backend.sync_updates(ng, g)
    return True
def run(self):
    g = Graph()
    wos_top = D['wos-topics']
    g.add((wos_top, RDF.type, WOS.TopTopic))
    g.add((wos_top, RDFS.label, Literal("Web of Science Subject Schemas")))
    with open(self.input_file) as inf:
        for row in csv.DictReader(inf):
            ra = row['Research Area (eASCA)']
            category = row['WoS Category (tASCA)']
            broad, ra1, ra2 = self.chunk_ras(ra)
            broad_uri, cg = self.do_term(broad, clz=WOS.BroadDiscipline)
            g.add((broad_uri, SKOS.broader, wos_top))
            g += cg
            ra1_uri, cg = self.do_term(ra1, broader=broad_uri,
                                       clz=WOS.ResearchArea, uri_prefix="wosra")
            g += cg
            ra2_uri = None
            if ra2 is not None:
                ra2_uri, cg = self.do_term(ra2, broader=ra1_uri,
                                           clz=WOS.ResearchArea, uri_prefix="wosra")
                g += cg
            cat_uri, cg = self.do_term(category, broader=ra2_uri or ra1_uri,
                                       clz=WOS.Category)
            g += cg
    self.serialize(g)
def add_grant(grant, pub_uri):
    """
    Create a funder and grant(s).
    """
    g = Graph()
    if grant.get("agency") is None:
        logger.info("No agency found for {} with ids {}.".format(
            pub_uri, ";".join(grant.get("ids", []))))
        return g
    slug = slugify(grant["agency"])
    uri = D['funder-' + slug]
    g.add((uri, RDF.type, WOS.Funder))
    g.add((uri, RDFS.label, Literal(grant["agency"])))
    for gid in grant["ids"]:
        label = "{} - {}".format(grant["agency"], gid)
        guri = D['grant-'] + slugify(label)
        g.add((guri, RDF.type, WOS.Grant))
        g.add((guri, RDFS.label, Literal(label)))
        g.add((guri, WOS.grantId, Literal(gid)))
        g.add((guri, VIVO.relates, uri))
        g.add((guri, VIVO.relates, pub_uri))
    return g