Python rdflib 模块,Literal() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用rdflib.Literal()

项目:scheduled-bots    作者:SuLab    | 项目源码 | 文件源码
def get_do_metadata():
    """Build a lookup of Disease Ontology term metadata from the OWL file.

    The graph at ``DO_OWL_PATH`` is parsed and queried for every subject
    that has an ``oboInOwl:hasOBONamespace`` equal to the
    ``disease_ontology`` string literal, has an ``rdfs:label`` and is not
    marked ``owl:deprecated``.  The optional ``obo:IAO_0000115`` value is
    bound to ``descr`` when present.

    Returns:
        dict: ``purl_to_curie(id)`` mapped to a record holding the
        stringified SPARQL bindings of that row (missing values are
        replaced by ``""``).  Only the first row per ``id`` is kept.
    """
    ontology = Graph()
    ontology.parse(DO_OWL_PATH)

    namespace_literal = Literal(
        'disease_ontology',
        datatype=URIRef('http://www.w3.org/2001/XMLSchema#string'))
    sparql = """
        SELECT * WHERE {
            ?id oboInOwl:hasOBONamespace ?disease_ontology .
            ?id rdfs:label ?label .
            OPTIONAL {?id obo:IAO_0000115 ?descr}
            FILTER NOT EXISTS {?id owl:deprecated ?dep}
        }
        """
    result = ontology.query(
        sparql, initBindings={'disease_ontology': namespace_literal})

    # Flatten every binding row into plain-string key/value pairs.
    records = []
    for binding in result.bindings:
        records.append({str(var): str(term) for var, term in binding.items()})

    frame = pd.DataFrame(records)
    frame = frame.drop_duplicates(subset=['id'])
    frame = frame.fillna("")
    return {purl_to_curie(rec['id']): rec for rec in frame.to_dict("records")}
项目:odmtp-tpf    作者:benjimor    | 项目源码 | 文件源码
def result_set_2_rdf(self, result_set, reduced_mapping, fragment):
    """Turn each tweet of ``result_set`` into data triples on ``fragment``.

    For every ``(subject, predicate, object)`` entry of
    ``reduced_mapping.mapping`` the subject template is split into the text
    before ``{`` and the JSONPath between ``{`` and ``}``; the first value
    that path finds in the tweet is appended to the prefix to form the
    subject IRI.  If the object contains ``$.`` it is evaluated as a
    JSONPath and one literal triple is added per match, otherwise the
    object is added unchanged.
    """
    for tweet in result_set:
        for subject_template, predicate, object_template in reduced_mapping.mapping:
            template_parts = subject_template.split('{')
            subject_prefix = template_parts[0]
            subject_path = parse(template_parts[1].split('}')[0])
            subject_values = [found.value for found in subject_path.find(tweet)]

            if '$.' not in object_template:
                fragment.add_data_triple(
                    URIRef("%s%s" % (subject_prefix, subject_values[0])),
                    predicate,
                    object_template)
                continue

            object_path = parse(object_template.split('{')[0].split('}')[0])
            for found in object_path.find(tweet):
                fragment.add_data_triple(
                    URIRef("%s%s" % (subject_prefix, subject_values[0])),
                    predicate,
                    Literal(found.value))
项目:odmtp-tpf    作者:benjimor    | 项目源码 | 文件源码
def result_set_2_rdf(self, result_set, reduced_mapping, fragment):
    """Turn each repository of ``result_set`` into data triples on ``fragment``.

    For every ``(subject, predicate, object)`` entry of
    ``reduced_mapping.mapping`` the subject template is split into the text
    before ``{`` and the JSONPath between ``{`` and ``}``; the first value
    that path finds in the repository is appended to the prefix to form the
    subject IRI.  If the object contains ``$.`` it is evaluated as a
    JSONPath and one literal triple is added per match, otherwise the
    object is added unchanged.
    """
    for repo in result_set:
        for subject_template, predicate, object_template in reduced_mapping.mapping:
            template_parts = subject_template.split('{')
            subject_prefix = template_parts[0]
            subject_path = parse(template_parts[1].split('}')[0])
            subject_values = [found.value for found in subject_path.find(repo)]

            if '$.' not in object_template:
                fragment.add_data_triple(
                    URIRef("%s%s" % (subject_prefix, subject_values[0])),
                    predicate,
                    object_template)
                continue

            object_path = parse(object_template.split('{')[0].split('}')[0])
            for found in object_path.find(repo):
                fragment.add_data_triple(
                    URIRef("%s%s" % (subject_prefix, subject_values[0])),
                    predicate,
                    Literal(found.value))
项目:smartcontainers    作者:crcresearch    | 项目源码 | 文件源码
def test_create_graph():
    """Register two vocabularies, build the global graph and verify it.

    Checks that the expected person and label triples are present in
    ``tstregistry.global_graph`` and that the JSON-LD serialisation
    contains an ``@context`` entry.
    """
    from sc import graphManager

    PROV = Namespace("http://www.w3.org/ns/prov#")
    tstregistry = graphManager.VocabularyRegistry()

    vocab1 = Vocabulary1()
    tstregistry.register(vocab1)
    vocab2 = Vocabulary2()
    tstregistry.register(vocab2)

    tstregistry.build_graph()
    # Parenthesised single-argument form: prints the same text on Python 2
    # and is valid syntax on Python 3 (the bare print statement is not).
    print(tstregistry.get_turtle())
    # Check assertions in global graph store
    assert (URIRef("http://orcid.org/000-0003-4901-6059"),
            RDF.type, PROV.Person) in tstregistry.global_graph
    assert (URIRef(uuidurn),
            RDFS.label,  Literal(
                "Docker: https://www.docker.com/")) in tstregistry.global_graph
    # Check Serialization
    jsongraph = json.loads(tstregistry.get_json_ld())
    assert '@context' in jsongraph
项目:programming-the-semantic-web    作者:utecht    | 项目源码 | 文件源码
def __addAction(self, action, statement):
    """Append an ``action`` element for ``statement`` to the transaction.

    Each item of ``statement`` becomes a child element: ``literal`` (with
    optional ``datatype`` / ``xml:lang`` attributes), ``uri`` or ``bnode``,
    holding the item's string form as text.  The finished element is
    appended to the first child node of ``self.trans``.

    Raises:
        Exception: if an item is not a Literal, URIRef or BNode.
    """
    element = self.trans.createElement(action)
    for item in statement:
        if isinstance(item, Literal):
            literal = self.trans.createElement("literal")
            if item.datatype is not None:
                literal.setAttribute("datatype", str(item.datatype))
            if item.language is not None:
                literal.setAttribute("xml:lang", str(item.language))
            literal.appendChild(self.trans.createTextNode(str(item)))
            element.appendChild(literal)
        elif isinstance(item, URIRef):
            uri = self.trans.createElement("uri")
            uri.appendChild(self.trans.createTextNode(str(item)))
            element.appendChild(uri)
        elif isinstance(item, BNode):
            bnode = self.trans.createElement("bnode")
            bnode.appendChild(self.trans.createTextNode(str(item)))
            element.appendChild(bnode)
        else:
            # Format instead of concatenating: "str + item" raised a
            # TypeError for any non-string item and hid the real problem.
            raise Exception("Unknown element: %s" % (item,))
    self.trans.childNodes[0].appendChild(element)
项目:programming-the-semantic-web    作者:utecht    | 项目源码 | 文件源码
def __setattr__(self, name, values):
    """Store ``values`` for the property mapped to attribute ``name``.

    The object's URI is loaded into the object graph first.  Every value is
    reduced to an rdflib term: ``RDFObject`` instances contribute their
    ``uri``, rdflib terms pass through untouched and anything else is
    wrapped in a ``Literal``.  Names starting with ``r_`` are written with
    ``_setSubjects`` (this object's URI as the object), all other names
    with ``_setObjects`` (this object's URI as the subject).
    """
    self._objectGraph._load(self.uri)

    rdflib_terms = (URIRef, BNode, Literal)
    terms = []
    for candidate in values:
        if isinstance(candidate, RDFObject):
            term = candidate.uri
        elif isinstance(candidate, rdflib_terms):
            term = candidate
        else:
            term = Literal(candidate)
        terms.append(term)

    # Resolve the property mapped to this attribute name.
    prop = self._getProp(name)
    if not name.startswith("r_"):
        self._objectGraph._setObjects(self.uri, prop, terms)
    else:
        self._objectGraph._setSubjects(terms, prop, self.uri)
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def _add_date_triple(self, subject, predicate, value, _type=Literal):
    '''
    Adds a new triple with a date object

    Dates are parsed using dateutil, and if the date obtained is correct,
    added to the graph as an XSD.dateTime value.

    If there are parsing errors, the literal string value is added.

    Nothing is added when ``value`` is empty.  ``_type`` is the callable
    used to build the object term (``Literal`` by default).
    '''
    if not value:
        return
    try:
        # Fields missing from ``value`` are filled from this default.
        default_datetime = datetime.datetime(1, 1, 1, 0, 0, 0)
        _date = parse_date(value, default=default_datetime)

        self.g.add((subject, predicate, _type(_date.isoformat(),
                                              datatype=XSD.dateTime)))
    except (ValueError, OverflowError):
        # dateutil reports out-of-range numeric fields with OverflowError;
        # treat it like any other parsing error and keep the raw value.
        self.g.add((subject, predicate, _type(value)))
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_object_list(self):
    """``_object_value_list`` returns all keyword objects as a list of unicode strings."""
    profile = RDFProfile(_default_graph())
    dataset_ref = URIRef('http://example.org/datasets/1')

    for keyword in ('space', 'moon'):
        profile.g.add((dataset_ref, DCAT.keyword, Literal(keyword)))

    keywords = profile._object_value_list(dataset_ref, DCAT.keyword)

    assert isinstance(keywords, list)
    assert isinstance(keywords[0], unicode)
    eq_(len(keywords), 2)
    eq_(sorted(keywords), ['moon', 'space'])
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_dataset_license_from_distribution_by_title(self):
    """``license_id`` is resolved from the dct:title of the distribution's license node."""
    dataset_ref = URIRef("http://example.org/datasets/1")
    distribution_ref = URIRef("http://example.org/datasets/1/ds/1")
    license_node = BNode()

    graph = Graph()
    for triple in (
        (dataset_ref, RDF.type, DCAT.Dataset),
        (distribution_ref, RDF.type, DCAT.Distribution),
        (dataset_ref, DCAT.distribution, distribution_ref),
        (distribution_ref, DCT.license, license_node),
        (license_node, DCT.title, Literal("Creative Commons Attribution")),
    ):
        graph.add(triple)

    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph

    parsed_dataset = list(parser.datasets())[0]
    eq_(parsed_dataset['license_id'], 'cc-by')
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_distribution_access_url(self):
    """A distribution with only dcat:accessURL yields ``url`` and no ``download_url``."""
    dataset_ref = URIRef("http://example.org/datasets/1")
    distribution_ref = URIRef("http://example.org/datasets/1/ds/1")

    graph = Graph()
    for triple in (
        (dataset_ref, RDF.type, DCAT.Dataset),
        (distribution_ref, RDF.type, DCAT.Distribution),
        (distribution_ref, DCAT.accessURL, Literal('http://access.url.org')),
        (dataset_ref, DCAT.distribution, distribution_ref),
    ):
        graph.add(triple)

    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph

    first_resource = list(parser.datasets())[0]['resources'][0]

    eq_(first_resource['url'], u'http://access.url.org')
    assert 'download_url' not in first_resource
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_distribution_download_url(self):
    """A distribution with only dcat:downloadURL uses it for both ``url`` and ``download_url``."""
    dataset_ref = URIRef("http://example.org/datasets/1")
    distribution_ref = URIRef("http://example.org/datasets/1/ds/1")

    graph = Graph()
    for triple in (
        (dataset_ref, RDF.type, DCAT.Dataset),
        (distribution_ref, RDF.type, DCAT.Distribution),
        (distribution_ref, DCAT.downloadURL, Literal('http://download.url.org')),
        (dataset_ref, DCAT.distribution, distribution_ref),
    ):
        graph.add(triple)

    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph

    first_resource = list(parser.datasets())[0]['resources'][0]

    eq_(first_resource['url'], u'http://download.url.org')
    eq_(first_resource['download_url'], u'http://download.url.org')
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_distribution_both_access_and_download_url(self):
    """With both URLs present, ``url`` is the access URL and ``download_url`` the download URL."""
    dataset_ref = URIRef("http://example.org/datasets/1")
    distribution_ref = URIRef("http://example.org/datasets/1/ds/1")

    graph = Graph()
    for triple in (
        (dataset_ref, RDF.type, DCAT.Dataset),
        (distribution_ref, RDF.type, DCAT.Distribution),
        (distribution_ref, DCAT.accessURL, Literal('http://access.url.org')),
        (distribution_ref, DCAT.downloadURL, Literal('http://download.url.org')),
        (dataset_ref, DCAT.distribution, distribution_ref),
    ):
        graph.add(triple)

    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph

    first_resource = list(parser.datasets())[0]['resources'][0]

    eq_(first_resource['url'], u'http://access.url.org')
    eq_(first_resource['download_url'], u'http://download.url.org')
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_distribution_format_imt_and_format(self):
    """dct:format feeds ``format`` and dcat:mediaType feeds ``mimetype`` when both are given."""
    dataset_ref = URIRef("http://example.org/datasets/1")
    distribution_ref = URIRef("http://example.org/datasets/1/ds/1")

    graph = Graph()
    for triple in (
        (dataset_ref, RDF.type, DCAT.Dataset),
        (distribution_ref, RDF.type, DCAT.Distribution),
        (distribution_ref, DCAT.mediaType, Literal('text/csv')),
        (distribution_ref, DCT['format'], Literal('CSV')),
        (dataset_ref, DCAT.distribution, distribution_ref),
    ):
        graph.add(triple)

    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph

    first_resource = list(parser.datasets())[0]['resources'][0]

    eq_(first_resource['format'], u'CSV')
    eq_(first_resource['mimetype'], u'text/csv')
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_distribution_format_imt_only(self):
    """With only dcat:mediaType, the expected ``format`` depends on the CKAN version.

    On CKAN 2.3 or newer the format is expected to be ``CSV`` with the
    media type kept in ``mimetype``; otherwise the media type itself is
    expected as the format.
    """
    dataset_ref = URIRef("http://example.org/datasets/1")
    distribution_ref = URIRef("http://example.org/datasets/1/ds/1")

    graph = Graph()
    for triple in (
        (dataset_ref, RDF.type, DCAT.Dataset),
        (distribution_ref, RDF.type, DCAT.Distribution),
        (distribution_ref, DCAT.mediaType, Literal('text/csv')),
        (dataset_ref, DCAT.distribution, distribution_ref),
    ):
        graph.add(triple)

    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph

    first_resource = list(parser.datasets())[0]['resources'][0]

    if toolkit.check_ckan_version(min_version='2.3'):
        eq_(first_resource['format'], u'CSV')
        eq_(first_resource['mimetype'], u'text/csv')
    else:
        eq_(first_resource['format'], u'text/csv')
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_distribution_format_imt_only_normalize_false(self):
    """With only dcat:mediaType, both ``format`` and ``mimetype`` carry the media type.

    NOTE(review): the name suggests a normalisation setting is switched
    off, but nothing in this function sets it -- confirm where that is
    configured.
    """
    dataset_ref = URIRef("http://example.org/datasets/1")
    distribution_ref = URIRef("http://example.org/datasets/1/ds/1")

    graph = Graph()
    for triple in (
        (dataset_ref, RDF.type, DCAT.Dataset),
        (distribution_ref, RDF.type, DCAT.Distribution),
        (distribution_ref, DCAT.mediaType, Literal('text/csv')),
        (dataset_ref, DCAT.distribution, distribution_ref),
    ):
        graph.add(triple)

    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph

    first_resource = list(parser.datasets())[0]['resources'][0]

    eq_(first_resource['format'], u'text/csv')
    eq_(first_resource['mimetype'], u'text/csv')
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_distribution_format_format_only_normalize_false(self):
    """With only dct:format, ``format`` is set and no ``mimetype`` key is produced.

    NOTE(review): the name suggests a normalisation setting is switched
    off, but nothing in this function sets it -- confirm where that is
    configured.
    """
    dataset_ref = URIRef("http://example.org/datasets/1")
    distribution_ref = URIRef("http://example.org/datasets/1/ds/1")

    graph = Graph()
    for triple in (
        (dataset_ref, RDF.type, DCAT.Dataset),
        (distribution_ref, RDF.type, DCAT.Distribution),
        (distribution_ref, DCT['format'], Literal('CSV')),
        (dataset_ref, DCAT.distribution, distribution_ref),
    ):
        graph.add(triple)

    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph

    first_resource = list(parser.datasets())[0]['resources'][0]

    eq_(first_resource['format'], u'CSV')
    assert 'mimetype' not in first_resource
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_distribution_format_unknown_imt(self):
    """An unrecognised media type is expected verbatim in both ``format`` and ``mimetype``."""
    dataset_ref = URIRef("http://example.org/datasets/1")
    distribution_ref = URIRef("http://example.org/datasets/1/ds/1")

    graph = Graph()
    for triple in (
        (dataset_ref, RDF.type, DCAT.Dataset),
        (distribution_ref, RDF.type, DCAT.Distribution),
        (distribution_ref, DCAT.mediaType, Literal('text/unknown-imt')),
        (dataset_ref, DCAT.distribution, distribution_ref),
    ):
        graph.add(triple)

    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph

    first_resource = list(parser.datasets())[0]['resources'][0]

    eq_(first_resource['format'], u'text/unknown-imt')
    eq_(first_resource['mimetype'], u'text/unknown-imt')
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_distribution_format_imt_normalized(self):
    """The media type ``text/unknown-imt`` is expected in both ``format`` and ``mimetype``.

    NOTE(review): the statements here match ``test_distribution_format_unknown_imt``
    exactly; confirm whether a different input was intended.
    """
    dataset_ref = URIRef("http://example.org/datasets/1")
    distribution_ref = URIRef("http://example.org/datasets/1/ds/1")

    graph = Graph()
    for triple in (
        (dataset_ref, RDF.type, DCAT.Dataset),
        (distribution_ref, RDF.type, DCAT.Distribution),
        (distribution_ref, DCAT.mediaType, Literal('text/unknown-imt')),
        (dataset_ref, DCAT.distribution, distribution_ref),
    ):
        graph.add(triple)

    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph

    first_resource = list(parser.datasets())[0]['resources'][0]

    eq_(first_resource['format'], u'text/unknown-imt')
    eq_(first_resource['mimetype'], u'text/unknown-imt')
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_spatial_rdfs_label(self):
    """The rdfs:label of a dct:Location ends up in the ``spatial_text`` extra."""
    dataset_ref = URIRef('http://example.org/datasets/1')
    location_ref = URIRef('http://geonames/Newark')

    graph = Graph()
    for triple in (
        (dataset_ref, RDF.type, DCAT.Dataset),
        (dataset_ref, DCT.spatial, location_ref),
        (location_ref, RDF.type, DCT.Location),
        (location_ref, RDFS.label, Literal('Newark')),
    ):
        graph.add(triple)

    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph

    parsed = list(parser.datasets())
    extras = self._extras(parsed[0])

    eq_(extras['spatial_text'], 'Newark')
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_spatial_wkt_only(self):
    """A WKT locn:geometry literal is expected as a GeoJSON string in the ``spatial`` extra."""
    dataset_ref = URIRef('http://example.org/datasets/1')
    location_ref = URIRef('http://geonames/Newark')
    wkt_point = Literal('POINT (67 89)', datatype=GSP.wktLiteral)

    graph = Graph()
    for triple in (
        (dataset_ref, RDF.type, DCAT.Dataset),
        (dataset_ref, DCT.spatial, location_ref),
        (location_ref, RDF.type, DCT.Location),
        (location_ref, LOCN.geometry, wkt_point),
    ):
        graph.add(triple)

    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph

    parsed = list(parser.datasets())
    extras = self._extras(parsed[0])

    # NOTE: geomet returns floats for coordinates on WKT -> GeoJSON
    eq_(extras['spatial'], '{"type": "Point", "coordinates": [67.0, 89.0]}')
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_spatial_literal_only(self):
    """A plain-literal dct:spatial gives ``spatial_text`` but neither ``spatial_uri`` nor ``spatial``."""
    dataset_ref = URIRef('http://example.org/datasets/1')

    graph = Graph()
    for triple in (
        (dataset_ref, RDF.type, DCAT.Dataset),
        (dataset_ref, DCT.spatial, Literal('Newark')),
    ):
        graph.add(triple)

    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph

    parsed = list(parser.datasets())
    extras = self._extras(parsed[0])

    eq_(extras['spatial_text'], 'Newark')
    assert_true('spatial_uri' not in extras)
    assert_true('spatial' not in extras)
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def _add_agent(self, _dict, ref, basekey, _type):
    ''' Adds an Agent blank node to the graph and links it from ``ref``.

        The resulting structure follows this format:
            <dct:publisher rdf:resource="http://dati.gov.it/resource/Amministrazione/r_liguri"/>
                <dcatapit:Agent rdf:about="http://dati.gov.it/resource/Amministrazione/r_liguri">
                    <rdf:type rdf:resource="&foaf;Agent"/>
                    <dct:identifier>r_liguri</dct:identifier>
                    <foaf:name>Regione Liguria</foaf:name>
                </dcatapit:Agent>

        The name and identifier are read from ``_dict`` under
        ``basekey + '_name'`` and ``basekey + '_identifier'``, each
        falling back to 'N/A'.  ``_type`` is the predicate used to attach
        the agent to ``ref``.

        Returns the ref to the agent node
    '''
    name_value = self._get_dict_value(_dict, basekey + '_name', 'N/A')
    id_value = self._get_dict_value(_dict, basekey + '_identifier', 'N/A')

    agent_node = BNode()
    for triple in (
        (agent_node, RDF['type'], DCATAPIT.Agent),
        (agent_node, RDF['type'], FOAF.Agent),
        (ref, _type, agent_node),
        (agent_node, FOAF.name, Literal(name_value)),
        (agent_node, DCT.identifier, Literal(id_value)),
    ):
        self.g.add(triple)

    return agent_node
项目:pycsvw    作者:bloomberg    | 项目源码 | 文件源码
def test_null_values_with_multiple_strings():
    """The listed literals and the ``id_uri`` predicate must be absent from the converted RDF."""
    converter = CSVW(csv_path="tests/null1.csv",
                     metadata_path="tests/null1.multiple.csv-metadata.json")
    turtle = converter.to_rdf()
    graph = ConjunctiveGraph()
    graph.parse(data=turtle, format="turtle")

    objects_seen = set(graph.objects())

    assert Literal('null_key', datatype=XSD.token) not in objects_seen
    assert Literal('null_sector') not in objects_seen
    assert Literal('null_id', datatype=XSD.token) not in objects_seen
    for row_id in ('10', '11', '12', '13'):
        assert Literal(row_id, datatype=XSD.token) not in objects_seen

    predicates_seen = set(graph.predicates())
    assert id_uri not in predicates_seen

    assert Literal('1', datatype=XSD.token) not in objects_seen
项目:pycsvw    作者:bloomberg    | 项目源码 | 文件源码
def test_date():
    """Each date column of event/1 appears exactly once as the expected xsd:date literal."""
    with CSVW(csv_path="tests/datatypes.date.csv",
              metadata_path="tests/datatypes.date.csv-metadata.json") as csvw:
        turtle = csvw.to_rdf()

    graph = ConjunctiveGraph()
    graph.parse(data=turtle, format="turtle")

    # (column name, expected lexical form)
    expected_dates = (
        ('date1', "2017-01-09"),
        ('date2', "2017-01-10Z"),
        ('date3', "2017-01-11"),
        ('date4', "2002-09-24-06:00"),
        ('date5', "2002-09-24+04:00"),
    )
    for column, lexical in expected_dates:
        date_lit = Literal(lexical, datatype=XSD.date)
        matches = list(graph.triples((NS['event/1'], NS[column], date_lit)))
        assert len(matches) == 1
项目:pycsvw    作者:bloomberg    | 项目源码 | 文件源码
def test_datetime():
    """Each datetime column of event/1 appears exactly once as the expected typed literal."""
    with CSVW(csv_path="tests/datatypes.datetime.csv",
              metadata_path="tests/datatypes.datetime.csv-metadata.json") as csvw:
        turtle = csvw.to_rdf()

    graph = ConjunctiveGraph()
    graph.parse(data=turtle, format="turtle")

    # (column name, expected lexical form, expected datatype)
    expected_values = (
        ('datetime1', "2002-05-30T09:00:00", XSD.dateTime),
        ('datetime2', "2002-05-30T09:30:10.5", XSD.dateTime),
        ('datetime3', "2002-05-30T09:30:10Z", XSD.dateTime),
        ('datetime4', "2002-05-30T09:30:10-06:00", XSD.dateTime),
        ('datetime5', "2002-05-30T09:30:10+04:00", XSD.dateTime),
        ('datetimestamp', "2004-04-12T13:20:00-05:00", XSD.dateTimeStamp),
    )
    for column, lexical, datatype in expected_values:
        typed_lit = Literal(lexical, datatype=datatype)
        matches = list(graph.triples((NS['event/1'], NS[column], typed_lit)))
        assert len(matches) == 1
项目:pycsvw    作者:bloomberg    | 项目源码 | 文件源码
def test_bool_with_format():
    """All three bool columns are true for event/1 and false for event/2 and event/3."""
    converter = CSVW(csv_path="tests/datatypes.bool.csv",
                     metadata_path="tests/datatypes.bool.csv-metadata.json")
    turtle = converter.to_rdf()
    graph = ConjunctiveGraph()
    graph.parse(data=turtle, format="turtle")

    true_lit = Literal(True, datatype=XSD.boolean)
    false_lit = Literal(False, datatype=XSD.boolean)

    expected_by_event = (
        ('event/1', true_lit),
        ('event/2', false_lit),
        ('event/3', false_lit),
    )
    for event, expected_lit in expected_by_event:
        for column in ('bool1', 'bool2', 'bool3'):
            matches = list(graph.triples((NS[event], NS[column], expected_lit)))
            assert len(matches) == 1
项目:pycsvw    作者:bloomberg    | 项目源码 | 文件源码
def test_empty_boolean():
    """Check the triples produced for ``tests/empty.csv`` under two metadata files.

    With the bool metadata the graph holds exactly two triples, both with
    ``Literal(False)`` as object; with the invalid-base metadata the graph
    is empty.
    """
    def convert(metadata_path):
        # Convert tests/empty.csv with the given metadata and parse the output.
        converter = CSVW(csv_path="tests/empty.csv",
                         metadata_path=metadata_path)
        turtle = converter.to_rdf()
        parsed = ConjunctiveGraph()
        parsed.parse(data=turtle, format="turtle")
        return parsed

    bool_graph = convert("tests/empty.bool.csv-metadata.json")
    assert len(bool_graph) == 2
    assert len(list(bool_graph.triples((None, None, Literal(False))))) == 2

    invalid_base_graph = convert("tests/empty.invalid_base.csv-metadata.json")
    assert len(invalid_base_graph) == 0
项目:bibcat    作者:KnowledgeLinks    | 项目源码 | 文件源码
def setUp(self):
    """Create a graph with one entity that has two title blank nodes.

    The first blank node carries a ``BF.mainTitle``; the second is typed
    ``BF.Topic``, has an ``rdfs:label`` and points through ``SCHEMA.name``
    to a further blank node holding an ``rdf:value``.
    """
    self.graph = rdflib.Graph()
    self.entity = rdflib.URIRef("https://bibcat.org/test-entity")
    self.simple_title_bnode = rdflib.BNode()
    self.top_title_bnode = rdflib.BNode()
    name_bnode = rdflib.BNode()

    fixture_triples = (
        (self.entity, rdflib.RDF.type, BF.Title),
        (self.entity, BF.title, self.simple_title_bnode),
        (self.simple_title_bnode, BF.mainTitle,
         rdflib.Literal("This is a test")),
        (self.entity, BF.title, self.top_title_bnode),
        (self.top_title_bnode, rdflib.RDF.type, BF.Topic),
        (self.top_title_bnode, rdflib.RDFS.label,
         rdflib.Literal("This is a title and a name")),
        (self.top_title_bnode, SCHEMA.name, name_bnode),
        (name_bnode, rdflib.RDF.value, rdflib.Literal("This is a name")),
    )
    for triple in fixture_triples:
        self.graph.add(triple)
项目:bibcat    作者:KnowledgeLinks    | 项目源码 | 文件源码
def setUp(self):
    """Create a graph with two labelled resources; the second also has a title node.

    Both entities are typed ``rdfs:Resource`` and carry an English
    ``rdfs:label``.  The second entity additionally links through
    ``BF.title`` to a blank node typed ``BF.Title`` with a ``BF.subTitle``.
    """
    self.graph = rdflib.Graph()
    self.entity_one = rdflib.URIRef("https://bibcat.org/test-entity")
    self.entity_two = rdflib.URIRef("https://bibcat.org/test-entity-two")
    title_node = rdflib.BNode()

    fixture_triples = (
        (self.entity_one, rdflib.RDF.type, rdflib.RDFS.Resource),
        (self.entity_one, rdflib.RDFS.label,
         rdflib.Literal("Test Entity One", lang="en")),
        (self.entity_two, rdflib.RDF.type, rdflib.RDFS.Resource),
        (self.entity_two, rdflib.RDFS.label,
         rdflib.Literal("Test Entity Two", lang="en")),
        (self.entity_two, BF.title, title_node),
        (title_node, rdflib.RDF.type, BF.Title),
        (title_node, BF.subTitle, rdflib.Literal("Subtitle ")),
    )
    for triple in fixture_triples:
        self.graph.add(triple)
项目:bibcat    作者:KnowledgeLinks    | 项目源码 | 文件源码
def __generate_object_term__(self, datatype, value):
    """Build the RDF object term for ``value`` according to ``datatype``.

    Args:
        datatype: None, or rdflib.URIRef
        value: Varies depending on ingester

    Returns:
        An ``rdflib.URIRef`` when ``datatype`` equals ``xsd:anyURI``, a
        typed ``rdflib.Literal`` for any other truthy ``datatype``, and a
        plain ``rdflib.Literal`` otherwise.
    """
    if datatype == NS_MGR.xsd.anyURI:
        return rdflib.URIRef(value)
    if datatype:
        return rdflib.Literal(value, datatype=datatype)
    return rdflib.Literal(value)
项目:bibcat    作者:KnowledgeLinks    | 项目源码 | 文件源码
def generate_term(self, **kwargs):
    """Generate an rdflib term from the ``term_map`` keyword argument.

    * A term map whose ``termType`` is ``rr:BlankNode`` yields a new BNode.
    * A term map without a ``datatype`` attribute gets ``xsd:anyURI``
      assigned to it.
    * With a non-None ``template``, the template is formatted with the
      remaining keyword arguments merged with ``self.constants`` (callable
      values are called first); the result is a URIRef for ``xsd:anyURI``
      and a typed Literal otherwise.
    * Otherwise, a non-None ``reference`` is delegated to
      ``__generate_reference__``; with neither, None is returned.
    """
    term_map = kwargs.pop('term_map')

    is_blank_node = hasattr(term_map, "termType") and \
        term_map.termType == NS_MGR.rr.BlankNode
    if is_blank_node:
        return rdflib.BNode()

    if not hasattr(term_map, 'datatype'):
        term_map.datatype = NS_MGR.xsd.anyURI

    has_template = hasattr(term_map, "template") and \
        term_map.template is not None
    if not has_template:
        if term_map.reference is not None:
            # Each child will have different mechanisms for referencing
            # the source.
            return self.__generate_reference__(term_map, **kwargs)
        return None

    merged_vars = dict(kwargs)
    merged_vars.update(self.constants)
    # Call any functions to generate values
    resolved_vars = {}
    for key, candidate in merged_vars.items():
        if hasattr(candidate, "__call__"):
            resolved_vars[key] = candidate()
        else:
            resolved_vars[key] = candidate
    rendered = term_map.template.format(**resolved_vars)

    if term_map.datatype == NS_MGR.xsd.anyURI:
        return rdflib.URIRef(rendered)
    return rdflib.Literal(rendered, datatype=term_map.datatype)
项目:bibcat    作者:KnowledgeLinks    | 项目源码 | 文件源码
def __generate_reference__(self, triple_map, **kwargs):
    """Generate an RDF term from the source value a triple map references.

    Args:
        triple_map(SimpleNamespace): Triple Map

    Returns:
        None when ``self.source`` has no value (or an empty one) for the
        reference; a plain Literal when the triple map has no ``datatype``
        attribute; a URIRef for ``xsd:anyURI``; a typed Literal otherwise.
    """
    raw_value = self.source.get(str(triple_map.reference))
    if raw_value is None or len(raw_value) < 1:
        return None
    if not hasattr(triple_map, "datatype"):
        return rdflib.Literal(raw_value)
    if triple_map.datatype == NS_MGR.xsd.anyURI:
        return rdflib.URIRef(raw_value)
    return rdflib.Literal(raw_value, datatype=triple_map.datatype)
项目:bibcat    作者:KnowledgeLinks    | 项目源码 | 文件源码
def __reference_handler__(self, **kwargs):
    """Internal method for handling rr:reference in triples map

    The reference of the predicate-object map is parsed as a JSONPath
    expression and evaluated against ``obj``; one
    ``(subject, predicate, Literal(value))`` triple is added to
    ``self.output`` per match.

    Keyword Args:
        predicate_obj_map: SimpleNamespace
        obj: dict
        subject: rdflib.URIRef

    Returns:
        list: always the empty ``subjects`` list; this handler only adds
        literal triples.
    """
    subjects = []
    pred_obj_map = kwargs.get("predicate_obj_map")
    obj = kwargs.get("obj")
    subject = kwargs.get("subject")
    if pred_obj_map.reference is None:
        return subjects
    predicate = pred_obj_map.predicate
    # Was misspelled "refernce", which raised AttributeError on every call
    # that reached this line.
    ref_exp = jsonpath_ng.parse(str(pred_obj_map.reference))
    # A parsed JSONPath expression is queried with .find(); calling the
    # expression object directly is not part of its interface.
    for match in ref_exp.find(obj):
        self.output.add((subject, predicate, rdflib.Literal(match.value)))
    # Same return value as the early exit above, so the type is consistent.
    return subjects
项目:bibcat    作者:KnowledgeLinks    | 项目源码 | 文件源码
def add_admin_metadata(self, entity):
    """Attach a bf:GenerationProcess blank node to ``entity`` in ``self.graph``.

    The process node records the current UTC time (ISO format) as
    ``bf:generationDate`` and an English message naming the BIBCAT
    version as ``rdf:value``.

    Args:
        entity (rdflib.URIRef): URI of the entity
    """
    process_node = rdflib.BNode()
    timestamp = rdflib.Literal(datetime.datetime.utcnow().isoformat())
    message = rdflib.Literal(
        "Generated by BIBCAT version {} from KnowledgeLinks.io".format(
            __version__),
        lang="en")

    # TODO: also record bibcat's current git commit
    admin_triples = (
        (process_node, rdflib.RDF.type, NS_MGR.bf.GenerationProcess),
        (process_node, NS_MGR.bf.generationDate, timestamp),
        (process_node, rdflib.RDF.value, message),
        (entity, NS_MGR.bf.generationProcess, process_node),
    )
    for triple in admin_triples:
        self.graph.add(triple)
项目:bibcat    作者:KnowledgeLinks    | 项目源码 | 文件源码
def add_admin_metadata(self, entity):
    """Record how ``entity`` was generated as admin metadata in ``self.graph``.

    A new blank node typed ``bf:GenerationProcess`` is given the current
    UTC time (ISO format) as ``bf:generationDate`` and an English
    ``rdf:value`` message naming the BIBCAT version, then linked from
    ``entity`` through ``bf:generationProcess``.

    Args:
        entity (rdflib.URIRef): URI of the entity
    """
    add_triple = self.graph.add
    process_node = rdflib.BNode()

    add_triple((process_node, rdflib.RDF.type, NS_MGR.bf.GenerationProcess))

    generated_at = datetime.datetime.utcnow().isoformat()
    add_triple((process_node,
                NS_MGR.bf.generationDate,
                rdflib.Literal(generated_at)))

    template = "Generated by BIBCAT version {} from KnowledgeLinks.io"
    add_triple((process_node,
                rdflib.RDF.value,
                rdflib.Literal(template.format(__version__), lang="en")))

    # TODO: also record bibcat's current git commit
    add_triple((entity, NS_MGR.bf.generationProcess, process_node))
项目:bibcat    作者:KnowledgeLinks    | 项目源码 | 文件源码
def __top_result__(query_result, type_=None, class_=None):
    """Return the first entry of a JSON query result as an IRI or a small graph.

    Args:
        query_result(dict): Query result
        type_: when it starts with ``"rdf"`` a graph is returned instead
            of the bare IRI
        class_: rdf:type used in that graph; ``rdfs:Resource`` when None

    Returns:
        None when ``totalResultsCount`` is not greater than zero.
        Otherwise the place IRI built from ``IRI_BASE`` and the entry's
        ``geonameId``, or an ``rdflib.Graph`` holding that IRI's type and
        ``rdfs:label`` (the entry's ``name``).
    """
    if not query_result.get("totalResultsCount", 0) > 0:
        return None

    best_match = query_result.get("geonames")[0]
    print(best_match)

    place_iri = rdflib.URIRef(
        "{}{}/".format(IRI_BASE, best_match.get("geonameId")))

    wants_graph = type_ is not None and type_.startswith("rdf")
    if not wants_graph:
        return place_iri

    output = rdflib.Graph()
    rdf_type = rdflib.RDFS.Resource if class_ is None else class_
    output.add((place_iri, rdflib.RDF.type, rdf_type))
    label = rdflib.Literal(best_match.get("name"))
    output.add((place_iri, rdflib.RDFS.label, label))
    return output
项目:bibcat    作者:KnowledgeLinks    | 项目源码 | 文件源码
def __add_work_title__(self, work_graph, work_uri, instance_uri):
        """Method takes a new work graph and instance uri, queries for
        bf:InstanceTitle of instance uri and adds values to work graph

        Args:
            work_graph(rdflib.Graph): RDF Graph of new BF Work
            instance_uri(rdflib.URIRef): URI of BF Instance
        """
        instance_key = str(instance_uri)
        if instance_key in self.processed and\
        "title" in self.processed[instance_key]:
            work_title_bnode = rdflib.BNode()
            work_graph.add((work_uri,  NS_MGR.bf.title, work_title_bnode))
            work_graph.add((work_title_bnode, 
                            NS_MGR.rdf.type, 
                            NS_MGR.bf.WorkTitle))
            for row in self.processed[instance_key]["title"]:
                main_title, subtitle = row["mainTitle"], row["subtitle"]
                work_graph.add((work_title_bnode,
                                NS_MGR.bf.mainTitle,
                                rdflib.Literal(main_title)))
                if subtitle:
                    work_graph.add((work_title_bnode,
                                    NS_MGR.bf.subtitle,
                                    rdflib.Literal(subtitle)))
项目:perseids-manifold    作者:RDACollectionsWG    | 项目源码 | 文件源码
def graph_member(self, ldp_root, c_id, obj=None):
        """Build the RDF graph describing one member of a collection.

        :param ldp_root: string prefix of the member URI
        :param c_id: collection id, encoded into the member URI
        :param obj: member object; when falsy, ``self.member()`` supplies one
        :return: Graph identified by the member URI, holding its type,
                 identifier and location plus whichever of datatype,
                 ontology and mappings (role, index, dateAdded) the object
                 has as attributes
        """
        member = obj if obj else self.member()
        node = URIRef(
            ldp_root + encoder.encode(c_id) + "/member/" + encoder.encode(member.id))
        mappings = URIRef(node + "#mappings")

        g = Graph(identifier=node)
        g.add((node, RDF.type, RDA.Member))
        g.add((node, DCTERMS.identifier, Literal(member.id)))
        g.add((node, RDA.location, Literal(member.location)))

        # Optional plain attributes, emitted only when present on the object.
        for attr, predicate in (('datatype', RDA.datatype),
                                ('ontology', RDA.ontology)):
            if hasattr(member, attr):
                g.add((node, predicate, Literal(getattr(member, attr))))

        if not hasattr(member, 'mappings'):
            return g

        g.add((node, RDA.mappings, mappings))
        mp = member.mappings
        # role is stored as a URI, the other mapping fields as literals.
        for attr, predicate, term in (('role', RDA.role, URIRef),
                                      ('index', RDA.itemIndex, Literal),
                                      ('dateAdded', RDA.dateAdded, Literal)):
            if hasattr(mp, attr):
                g.add((mappings, predicate, term(getattr(mp, attr))))
        return g
项目:aide    作者:Lambda-3    | 项目源码 | 文件源码
def string_to_literal(string, to_uri=True):
    """
    Creates a literal or URI object from a given string.

    A string counts as a typed literal when it contains "^^" followed by an
    XML Schema datatype URI, e.g. ``42^^http://www.w3.org/2001/XMLSchema#int``.

    :param string: String to convert from
    :param to_uri: Whether to try convert to an URI if not a valid literal is given.
    :return: Literal object or URI object (if failed to create a literal and desired) or else input string.
    """
    if "^^http://www.w3.org/2001/XMLSchema#" in string:
        try:
            # Split on the last "^^" only, so a lexical value that itself
            # contains "^^" keeps its datatype instead of failing to unpack.
            value, datatype = string.rsplit("^^", 1)
            return Literal(value, datatype=datatype)
        except Exception:
            # Best effort: fall back to an untyped literal, but do not
            # swallow KeyboardInterrupt/SystemExit like a bare except would.
            loginfo("Could not deliteralize string.")
            return Literal(string)
    loginfo("not a literal")
    return URIRef(string) if to_uri else string
项目:table-extractor    作者:dbpedia    | 项目源码 | 文件源码
def check_value_type(self, value):
        """
        Cast a value to the matching rdflib term.

        :param value: value to check
        :return: a float- or int-typed Literal when the mapper recognises the
                 value as such, a URIRef when ``check_if_is_resource`` returns
                 something truthy, otherwise a string-typed Literal
        """
        xsd = rdflib.namespace.XSD
        # NOTE(review): the float test runs first; if mapper.is_float also
        # accepts integer-looking input the int branch is never taken -
        # confirm against the mapper implementation.
        if self.mapper.is_float(value):
            return rdflib.Literal(value, datatype=xsd.float)
        if self.mapper.is_int(value):
            return rdflib.Literal(value, datatype=xsd.int)
        # Neither number type matched: the value may name a resource.
        resource = self.check_if_is_resource(value)
        if resource:
            return rdflib.URIRef(resource)
        return rdflib.Literal(value, datatype=xsd.string)
项目:ontology-alchemy    作者:globality-corp    | 项目源码 | 文件源码
def _resolve_range(self, range_uri):
        """
        Resolve a rdfs.Property rdfs.range value to a type.

        """
        range_name = self._extract_name(range_uri)
        if range_name in self.namespace:
            return self.namespace[range_name]
        if is_a_literal(range_uri):
            return Literal
        elif is_a_property(range_uri):
            return RDF_Property
        elif is_a_list(range_uri):
            return RDF.List

        return RDFS_Class
项目:open-semantic-search-apps    作者:opensemanticsearch    | 项目源码 | 文件源码
def rdf(request):
    """Return the annotations stored for a URI as an RDF/XML response.

    The URI is read from the ``uri`` query parameter. For every matching
    Annotation its title, notes (when truthy) and the prefLabel of each tag
    are emitted as literals on the annotation URI.
    """
    title_property = URIRef("http://localhost/metawiki/index.php/Special:URIResolver/Property-3ATitle")
    notes_property = URIRef("http://localhost/metawiki/index.php/Special:URIResolver/Property-3ANotes")
    tag_property = URIRef("http://localhost/metawiki/index.php/Special:URIResolver/Property-3ATag")

    uri = request.GET['uri']
    g = Graph()

    for annotation in Annotation.objects.filter(uri=uri):
        subject = URIRef(annotation.uri)

        if annotation.title:
            g.add((subject, title_property, Literal(annotation.title)))

        if annotation.notes:
            g.add((subject, notes_property, Literal(annotation.notes)))

        for tag in annotation.tags.all():
            g.add((subject, tag_property, Literal(tag.prefLabel)))

    response = HttpResponse(g.serialize(format='xml'))
    response["Content-Type"] = "application/rdf+xml"
    return response
项目:wos2vivo    作者:Clarivate-SAR    | 项目源码 | 文件源码
def add_vcard(self, position, name):
        """
        Build the vcard individual and vcard name statements for one author.

        :param position: number in author order, as a string (it is
                         concatenated into the URIs)
        :param name: name as string - last, first, middle
        :return: tuple of (vcard individual URI, rdflib.Graph)
        """
        suffix = position + '-' + self.localid
        individual_uri = D['vcard-individual-' + suffix]
        name_uri = D['vcard-name-' + suffix]

        g = Graph()
        g.add((individual_uri, RDF.type, VCARD.Individual))

        # The vcard name node is labelled with the unparsed name string.
        g.add((name_uri, RDF.type, VCARD.Name))
        g.add((name_uri, RDFS.label, Literal(name)))

        # Split the name into its parts; the middle name is optional.
        parsed = HumanName(name)
        g.add((name_uri, VCARD.givenName, Literal(parsed.first)))
        g.add((name_uri, VCARD.familyName, Literal(parsed.last)))
        if parsed.middle != "":
            g.add((name_uri, VIVO.middleName, Literal(parsed.middle)))

        # Link the individual to its name node.
        g.add((individual_uri, VCARD.hasName, name_uri))
        return individual_uri, g
项目:wos2vivo    作者:Clarivate-SAR    | 项目源码 | 文件源码
def authorship(self):
        """
        Build authorship statements, including a vcard, for every author.

        Authors are ranked from 1 in the order ``self.authors()`` yields them.
        :return: rdflib.Graph
        """
        g = Graph()
        for rank, author in enumerate(self.authors(), start=1):
            position = str(rank)

            individual_uri, vcard_graph = self.add_vcard(position, author)
            g += vcard_graph

            # One authorship node per author, carrying the rank.
            authorship_uri = D['authorship-' + position + '-' + self.localid]
            g.add((authorship_uri, RDF.type, VIVO.Authorship))
            g.add((authorship_uri, VIVO.rank, Literal(rank)))

            # The authorship relates to both the publication and the vcard.
            g.add((authorship_uri, VIVO.relates, self.pub_uri))
            g.add((authorship_uri, VIVO.relates, individual_uri))

        return g
项目:wos2vivo    作者:Clarivate-SAR    | 项目源码 | 文件源码
def add_vcard_weblink(self):
        """
        Build statements for the publication's weblink in VIVO.

        The link is the OpenURL template below filled in with ``self.ut()``.
        :return: tuple of (vcard individual URI, rdflib.Graph)
        """
        base_url = "http://ws.isiknowledge.com/cps/openurl/service?url_ver=Z39.88-2004&rft_id=info:ut/WOS:{}"
        individual_uri = D['vcard-individual-pub-' + self.localid]
        url_uri = D['vcard-url-pub-' + self.localid]
        link = base_url.format(self.ut())

        g = Graph()
        # vcard individual standing for the publication
        g.add((individual_uri, RDF.type, VCARD.Individual))

        # vcard URL node with its label and target
        g.add((url_uri, RDF.type, VCARD.URL))
        g.add((url_uri, RDFS.label, Literal(u"Web of Science™")))
        g.add((url_uri, VCARD.url, Literal(link)))

        # Link the individual to the URL node.
        g.add((individual_uri, VCARD.hasURL, url_uri))
        return individual_uri, g
项目:rap-etl    作者:RAP-research-output-impact    | 项目源码 | 文件源码
def _vcard_email(self):
        """Build a graph with the vcard email statements of this profile.

        Addresses come from the "|"-separated ``emails`` profile entry,
        falling back to the single ``email`` entry, and to no address at
        all when neither key exists.
        """
        g = Graph()
        try:
            addresses = self.profile["emails"].split("|")
        except KeyError:
            try:
                addresses = [self.profile['email']]
            except KeyError:
                addresses = []
        # NOTE(review): every address is written to the same
        # self.vcard_email_uri with Resource.set, so with several addresses
        # only the last one presumably survives - confirm this is intended.
        for address in addresses:
            node = Resource(g, self.vcard_email_uri)
            node.set(RDF.type, VCARD.Work)
            # Label probably not necessary
            node.set(RDFS.label, Literal(address))
            node.set(VCARD.email, Literal(address))
        return g
项目:rap-etl    作者:RAP-research-output-impact    | 项目源码 | 文件源码
def org_total_cites(orgs):
    """Publish the per-year citation counts of each organization.

    For every organization name the 'cites' InCites JSON file is loaded and
    one InCitesCitesPerYear node per item is related to the organization.
    The resulting graph is synced to the named graph below.

    :param orgs: iterable of organization names
    :return: True
    """
    g = Graph()
    for org_name in orgs:
        org_uri = waan_uri(org_name)
        ln = local_name(org_uri)
        for item in load_incites_json_file(org_name, 'cites'):
            year = item['year']
            count = item['count']
            curi = D['citecount-' + ln + '-' + str(year)]
            label = "{} - {}".format(year, count)
            g.add((curi, RDF.type, WOS.InCitesCitesPerYear))
            g.add((curi, RDFS.label, Literal(label)))
            g.add((curi, WOS.number, Literal(count)))
            g.add((curi, WOS.year, Literal(year)))
            g.add((org_uri, VIVO.relates, curi))

    backend.sync_updates(
        "http://localhost/data/incites-total-cites-year-counts", g)
    return True
项目:rap-etl    作者:RAP-research-output-impact    | 项目源码 | 文件源码
def org_top_categories(orgs):
    """Publish the top categories of each organization.

    For every organization name the 'categories' InCites JSON file is loaded
    and one InCitesTopCategory node per item is related to both the category
    and the organization. The resulting graph is synced to the named graph
    below.

    :param orgs: iterable of organization names
    :return: True
    """
    g = Graph()
    for org_name in orgs:
        org_uri = waan_uri(org_name)
        ln = local_name(org_uri)
        for entry in load_incites_json_file(org_name, 'categories'):
            cat = entry['category']
            category_uri = get_category_uri(cat)
            # The local name and the slug are joined without a separator.
            curi = D['topcategory-'] + ln + slugify(cat)
            label = "{} - {}".format(org_name, cat)
            g.add((curi, RDF.type, WOS.InCitesTopCategory))
            g.add((curi, RDFS.label, Literal(label)))
            g.add((curi, WOS.number, Literal(entry['count'])))
            g.add((curi, VIVO.relates, category_uri))
            g.add((curi, VIVO.relates, org_uri))

    backend.sync_updates("http://localhost/data/incites-top-categories", g)
    return True
项目:rap-etl    作者:RAP-research-output-impact    | 项目源码 | 文件源码
def run(self):
        """Build the Web of Science subject hierarchy from the input CSV.

        Each row contributes a broad discipline (placed under the top
        topic), one or two nested research areas and a category under the
        deepest research area available. The graph is passed to
        ``self.serialize`` at the end.
        """
        g = Graph()
        top_topic = D['wos-topics']
        g.add((top_topic, RDF.type, WOS.TopTopic))
        g.add((top_topic, RDFS.label, Literal("Web of Science Subject Schemas")))

        with open(self.input_file) as inf:
            for row in csv.DictReader(inf):
                research_area = row['Research Area (eASCA)']
                category = row['WoS Category (tASCA)']
                broad, ra1, ra2 = self.chunk_ras(research_area)

                broad_uri, term_graph = self.do_term(broad, clz=WOS.BroadDiscipline)
                g.add((broad_uri, SKOS.broader, top_topic))
                g += term_graph

                ra1_uri, term_graph = self.do_term(
                    ra1, broader=broad_uri, clz=WOS.ResearchArea, uri_prefix="wosra")
                g += term_graph

                # The category hangs under the second-level research area
                # when there is one, otherwise under the first-level one.
                parent_uri = ra1_uri
                if ra2 is not None:
                    ra2_uri, term_graph = self.do_term(
                        ra2, broader=ra1_uri, clz=WOS.ResearchArea, uri_prefix="wosra")
                    g += term_graph
                    parent_uri = ra2_uri or ra1_uri

                _, term_graph = self.do_term(category, broader=parent_uri, clz=WOS.Category)
                g += term_graph

        self.serialize(g)
项目:rap-etl    作者:RAP-research-output-impact    | 项目源码 | 文件源码
def add_grant(grant, pub_uri):
    """
    Create a funder and grant(s).

    :param grant: dict with an "agency" name and an optional "ids" list of
                  grant identifiers
    :param pub_uri: URI of the publication the grants relate to
    :return: Graph with the funder and one grant node per id, each grant
             related to the funder and the publication; an empty Graph when
             no agency is given
    """
    g = Graph()
    agency = grant.get("agency")
    # "ids" is optional: the no-agency log line already treated it that way.
    grant_ids = grant.get("ids") or []
    if agency is None:
        # Both values are logged; the previous format string had a single
        # placeholder and silently dropped the ids.
        logger.info("No agency found for %s with ids %s.",
                    pub_uri, ";".join(map(str, grant_ids)))
        return g
    uri = D['funder-' + slugify(agency)]
    g.add((uri, RDF.type, WOS.Funder))
    g.add((uri, RDFS.label, Literal(agency)))
    for gid in grant_ids:
        label = "{} - {}".format(agency, gid)
        guri = D['grant-'] + slugify(label)
        g.add((guri, RDF.type, WOS.Grant))
        g.add((guri, RDFS.label, Literal(label)))
        g.add((guri, WOS.grantId, Literal(gid)))
        g.add((guri, VIVO.relates, uri))
        g.add((guri, VIVO.relates, pub_uri))
    return g