Python OpenSSL 模块,crypto() 实例源码

我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用OpenSSL.crypto()

项目:keyview    作者:ALSchwalm    | 项目源码 | 文件源码
def get_pkcs7_certificates(bundle):
    """Return the certificates carried by a PKCS#7 bundle as a tuple.

    The certificate stack is read from the bundle's underlying ``_pkcs7``
    structure when the bundle reports itself as signed or as
    signed-and-enveloped; for any other type the stack pointer stays NULL.
    Every entry is duplicated with ``X509_dup`` so the returned wrapper owns
    its own copy, which is released through ``X509_free`` on collection.

    :param bundle: object exposing ``type_is_signed()``,
        ``type_is_signedAndEnveloped()`` and a ``_pkcs7`` cdata attribute
        (presumably an ``OpenSSL.crypto.PKCS7`` -- confirm against callers).
    :return: tuple of ``OpenSSL.crypto.X509`` objects (possibly empty).
    """
    from OpenSSL._util import ffi as _ffi, lib as _lib
    from OpenSSL.crypto import X509

    if bundle.type_is_signed():
        stack = bundle._pkcs7.d.sign.cert
    elif bundle.type_is_signedAndEnveloped():
        stack = bundle._pkcs7.d.signed_and_enveloped.cert
    else:
        stack = _ffi.NULL

    def _wrap(index):
        # Build the wrapper without running X509.__init__, then attach an
        # independently owned copy of the stack entry.
        wrapped = X509.__new__(X509)
        duplicate = _lib.X509_dup(_lib.sk_X509_value(stack, index))
        wrapped._x509 = _ffi.gc(duplicate, _lib.X509_free)
        return wrapped

    return tuple(_wrap(index) for index in range(_lib.sk_X509_num(stack)))
项目:keyview    作者:ALSchwalm    | 项目源码 | 文件源码
def display_item_info(item, filename, print_filename=False):
    """Print details of *item* using the display routine for its type.

    Handles private keys, public keys (RSA / DSA / EC), PKCS#12 bundles,
    PKCS#7 bundles and certificates, checked in that order. Items of any
    other type are silently ignored.

    :param item: the loaded object to display.
    :param filename: name shown in the banner line.
    :param print_filename: the banner is printed only when this is exactly
        ``True``.
    """
    if print_filename is True:
        print("#######[ {} ]#######".format(filename))

    if isinstance(item, (rsa.RSAPrivateKey,
                         dsa.DSAPrivateKey,
                         ec.EllipticCurvePrivateKey)):
        display_private_key(item)
        return

    if isinstance(item, (rsa.RSAPublicKey,
                         dsa.DSAPublicKey,
                         ec.EllipticCurvePublicKey)):
        display_public_key(item)
        return

    if isinstance(item, OpenSSL.crypto.PKCS12):
        display_pkcs12(item)
        return

    if isinstance(item, OpenSSL.crypto.PKCS7):
        display_pkcs7(item)
        return

    if isinstance(item, Certificate):
        display_x509_cert(item)
项目:vault-ca    作者:crisidev    | 项目源码 | 文件源码
def test_is_certificate_valid_false_less_than_24h(vault_ca_obj, monkeypatch):
    """The certificate is rejected when expiry is under 24 hours away.

    The patched expiry is 2000-05-10 14:16:43Z and the patched clock reads
    2000-05-09 15:16:43.1, i.e. just under 23 hours before expiry.
    """
    expiry = b'20000510141643Z'
    frozen_now = datetime.datetime(2000, 5, 9, 15, 16, 43, 100000)

    class ExpiringX509(OpenSSL.crypto.X509):
        """X509 stand-in that reports a fixed expiry timestamp."""

        def get_notAfter(self):
            return expiry

    class FrozenDatetime(datetime.datetime):
        """datetime stand-in whose ``now()`` returns a fixed instant."""

        @classmethod
        def now(cls):
            return frozen_now

    monkeypatch.setattr(vault_ca, 'datetime', FrozenDatetime)
    monkeypatch.setattr(OpenSSL.crypto, 'X509', ExpiringX509)

    is_valid = vault_ca_obj._is_certificate_valid('tests/fixtures/acomponent-test.test.org.pem.ok')
    assert not is_valid
项目:vault-ca    作者:crisidev    | 项目源码 | 文件源码
def test_is_certificate_valid_true_more_than_24h(vault_ca_obj, monkeypatch):
    """The certificate is accepted when expiry is more than 24 hours away.

    The patched expiry is 2000-05-10 14:16:43Z and the patched clock reads
    2000-05-09 13:16:43.1, i.e. just under 25 hours before expiry.
    """
    expiry = b'20000510141643Z'
    frozen_now = datetime.datetime(2000, 5, 9, 13, 16, 43, 100000)

    class ExpiringX509(OpenSSL.crypto.X509):
        """X509 stand-in that reports a fixed expiry timestamp."""

        def get_notAfter(self):
            return expiry

    class FrozenDatetime(datetime.datetime):
        """datetime stand-in whose ``now()`` returns a fixed instant."""

        @classmethod
        def now(cls):
            return frozen_now

    monkeypatch.setattr(vault_ca, 'datetime', FrozenDatetime)
    monkeypatch.setattr(OpenSSL.crypto, 'X509', ExpiringX509)

    is_valid = vault_ca_obj._is_certificate_valid('tests/fixtures/acomponent-test.test.org.pem.ok')
    assert is_valid
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def sign_cert_request(ca_cert, ca_key, cert_request, serial):
    """Issue a certificate for a PEM-encoded CSR, signed by the given CA.

    The new certificate copies subject and public key from the request,
    takes its issuer from the CA certificate's subject, is valid from now
    for ten 365-day years, and is signed with SHA-256.

    :param ca_cert: CA certificate object; its subject becomes the issuer.
    :param ca_key: CA private key used to sign the new certificate.
    :param cert_request: PEM-encoded certificate signing request.
    :param serial: serial number for the new certificate (an integer, or
        anything ``int()`` accepts).
    :return: the signed certificate, PEM-encoded.
    """
    crypto = OpenSSL.crypto
    validity_seconds = 10 * 365 * 24 * 60 * 60

    req = crypto.load_certificate_request(crypto.FILETYPE_PEM, cert_request)

    cert = crypto.X509()
    cert.set_subject(req.get_subject())
    # Honour the caller-supplied serial. It used to be hard-coded to 1, which
    # gave every certificate issued by the same CA an identical serial number.
    cert.set_serial_number(int(serial))
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(validity_seconds)
    cert.set_issuer(ca_cert.get_subject())
    cert.set_pubkey(req.get_pubkey())
    cert.sign(ca_key, "sha256")

    return crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
项目:keyview    作者:ALSchwalm    | 项目源码 | 文件源码
def openssl_to_cryptography_cert(cert):
    """Convert a pyOpenSSL certificate into a ``cryptography`` certificate.

    The certificate is serialised to DER (ASN.1) by pyOpenSSL and parsed
    again with ``load_der_x509_certificate`` using the default backend.

    :param cert: certificate object accepted by
        ``OpenSSL.crypto.dump_certificate``.
    :return: the result of ``load_der_x509_certificate``.
    """
    default_backend = cryptography.hazmat.backends.default_backend()
    der_bytes = OpenSSL.crypto.dump_certificate(
        OpenSSL.crypto.FILETYPE_ASN1,
        cert,
    )
    return load_der_x509_certificate(der_bytes, default_backend)


# This function was modified from this PR to the pyopenssl project
# https://github.com/pyca/pyopenssl/pull/367/files
项目:keyview    作者:ALSchwalm    | 项目源码 | 文件源码
def load_binary_item(buffer):
    """Parse a binary (DER) blob by trying each supported format in turn.

    Formats attempted, in order: private key, public key, X.509
    certificate, PKCS#7 certificate bundle (all DER), then PKCS#12 bundle.

    :param buffer: raw bytes of the item to load.
    :return: the object produced by the first loader that succeeds.
    :raises Exception: if none of the loaders accepts the buffer.
    """
    backend = cryptography.hazmat.backends.default_backend()

    # Each loader is wrapped in a lambda so that every attribute lookup and
    # call happens inside the try block below.
    loaders = (
        # PKCS#1/PKCS#8 private in DER
        lambda: cryptography.hazmat.primitives.serialization.load_der_private_key(
            buffer, password=None, backend=backend),
        # PKCS#1/PKCS#8 public in DER
        lambda: cryptography.hazmat.primitives.serialization.load_der_public_key(
            buffer, backend),
        # X509 certificate in DER
        lambda: load_der_x509_certificate(buffer, backend),
        # PKCS7 certificate bundle in DER
        lambda: OpenSSL.crypto.load_pkcs7_data(OpenSSL.crypto.FILETYPE_ASN1, buffer),
        # PKCS12 bundle (binary)
        lambda: OpenSSL.crypto.load_pkcs12(buffer),
    )

    for load in loaders:
        try:
            return load()
        except Exception:
            # Not this format: move on to the next candidate. Catching
            # Exception (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate instead of being swallowed.
            continue

    raise Exception("Unknown file type for: {}".format(buffer))
项目:keyview    作者:ALSchwalm    | 项目源码 | 文件源码
def load_pem_file_item(buffer):
    """Parse a PEM blob by trying each supported format in turn.

    Formats attempted, in order: private key, public key, X.509
    certificate, PKCS#7 certificate bundle (all PEM).

    :param buffer: PEM-encoded data of the item to load.
    :return: the object produced by the first loader that succeeds.
    :raises Exception: if none of the loaders accepts the buffer.
    """
    backend = cryptography.hazmat.backends.default_backend()

    # Each loader is wrapped in a lambda so that every attribute lookup and
    # call happens inside the try block below.
    loaders = (
        # PKCS#1/PKCS#8 private in PEM
        lambda: cryptography.hazmat.primitives.serialization.load_pem_private_key(
            buffer, password=None, backend=backend),
        # PKCS#1/PKCS#8 public in PEM
        lambda: cryptography.hazmat.primitives.serialization.load_pem_public_key(
            buffer, backend),
        # X509 certificate in PEM
        lambda: load_pem_x509_certificate(buffer, backend),
        # PKCS7 certificate bundle in PEM
        lambda: OpenSSL.crypto.load_pkcs7_data(OpenSSL.crypto.FILETYPE_PEM, buffer),
    )

    for load in loaders:
        try:
            return load()
        except Exception:
            # Not this format: move on to the next candidate. Catching
            # Exception (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate instead of being swallowed.
            continue

    raise Exception("Unknown file type for: {}".format(buffer))
项目:vault-ca    作者:crisidev    | 项目源码 | 文件源码
def test_is_certificate_valid_true(vault_ca_obj, monkeypatch):
    """A certificate whose expiry lies well in the future is accepted.

    The clock is not patched in this test, so the expiry is derived from the
    current time instead of being a fixed literal. The previous hard-coded
    value (2022-05-10) made the test fail once that date had passed.
    """
    import datetime

    # One year ahead of the real clock, rendered in the same
    # YYYYMMDDhhmmssZ bytes layout as the literal it replaces.
    one_year_ahead = datetime.datetime.now() + datetime.timedelta(days=365)
    not_after = one_year_ahead.strftime('%Y%m%d%H%M%SZ').encode('ascii')

    class MockX509(OpenSSL.crypto.X509):
        """X509 stand-in that reports an expiry one year from now."""

        def get_notAfter(self):
            return not_after

    monkeypatch.setattr(OpenSSL.crypto, 'X509', MockX509)
    assert vault_ca_obj._is_certificate_valid('tests/fixtures/acomponent-test.test.org.pem.ok')
项目:vault-ca    作者:crisidev    | 项目源码 | 文件源码
def test_is_certificate_valid_false(vault_ca_obj, monkeypatch):
    """A certificate whose patched expiry is 2000-05-10 is rejected.

    Only the expiry is patched; the clock used by the code under test is
    left untouched.
    """
    expiry = b'20000510141643Z'

    class ExpiredX509(OpenSSL.crypto.X509):
        """X509 stand-in that reports a fixed expiry timestamp."""

        def get_notAfter(self):
            return expiry

    monkeypatch.setattr(OpenSSL.crypto, 'X509', ExpiredX509)

    is_valid = vault_ca_obj._is_certificate_valid('tests/fixtures/acomponent-test.test.org.pem.ok')
    assert not is_valid
项目:reflectrpc    作者:aheck    | 项目源码 | 文件源码
def gen_cert_request(cname):
    """Create a new 2048-bit RSA key and a signing request for *cname*.

    The request carries *cname* as its subject CN, embeds the new public key
    and is signed with the new key using SHA-512.

    :param cname: value assigned to the subject's CN field.
    :return: ``(request_pem, private_key_pem)``, both PEM-encoded.
    """
    crypto = OpenSSL.crypto

    key = crypto.PKey()
    key.generate_key(crypto.TYPE_RSA, 2048)

    csr = crypto.X509Req()
    subject = csr.get_subject()
    subject.CN = cname
    csr.set_pubkey(key)
    csr.sign(key, 'sha512')

    return (
        crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr),
        crypto.dump_privatekey(crypto.FILETYPE_PEM, key),
    )
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def test_pkcs12_format(self):
        """
        Exercise packing an EC private key and its certificate into PKCS12.

        Generates a SECP256K1 key, obtains a certificate for it from
        ``self._generate_cert`` (the same key is passed as issuer key),
        converts both to pyOpenSSL objects via DER, exports them as a PKCS12
        bundle and logs the base64 form. The test makes no assertions; it
        passes as long as no step raises.
        """
        logging.debug("----- PKCS12 Test Start -----")
        # Generate the EC private key
        private_key = ec.generate_private_key(ec.SECP256K1(), default_backend())

        logging.debug("Key_Type : %s", type(private_key))

        # Generate the certificate for the key's public half
        certificate = self._generate_cert(
            pub_key=private_key.public_key(),
            issuer_key=private_key,
            subject_name="test",
        )

        # Serialise both objects to DER
        cert_der = certificate.public_bytes(encoding=serialization.Encoding.DER)
        key_der = private_key.private_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )

        # Re-load the DER data as pyOpenSSL certificate / key objects
        crypto = OpenSSL.crypto
        openssl_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, cert_der)
        openssl_key = crypto.load_privatekey(crypto.FILETYPE_ASN1, key_der)

        logging.debug("Key_Type : %s", type(openssl_key))

        # Bundle key and certificate into a PKCS12 container and export it
        bundle = crypto.PKCS12()
        bundle.set_privatekey(openssl_key)
        bundle.set_certificate(openssl_cert)
        exported = bundle.export()

        logging.debug("%s", base64.b64encode(exported))
项目:IoT_pki    作者:zibawa    | 项目源码 | 文件源码
def cert_collect_pkcs12(request, token):
    """Return the PKCS#12 bundle for the certificate request matching *token*.

    Looks up the ``Cert_request`` by token (taken from the browser) and
    answers according to its state:

    * no matching request     -> 404 "Not found"
    * already issued          -> 410 "Cert already issued"
    * not approved            -> 403 "Pending approval"
    * approved and not issued -> 201 with the PKCS#12 file as an attachment;
      the request is then marked as issued.

    :param request: incoming HTTP request (not otherwise used here).
    :param token: token identifying the certificate request.
    :return: an ``HttpResponse``.
    """
    certs = Cert_request.objects.filter(token=token)

    try:
        cert_request = certs[0]
    except IndexError:
        # Only "no such request" maps to 404; other failures (for example
        # database errors) are no longer disguised as a missing record.
        return HttpResponse(status=404, content="Not found")
    logger.info("retrieved cert %s", cert_request.common_name)

    if cert_request.issued:
        logger.info("Certificate already issued %s", cert_request.common_name)
        return HttpResponse(status=410, content="Cert already issued")

    # Fail closed: anything other than a truthy approval (including an unset
    # value) is treated as pending rather than falling through to issuance.
    if not cert_request.approved:
        logger.debug("Certificate pending approval %s", cert_request.common_name)
        return HttpResponse(status=403, content="Pending approval")

    logger.info("Generating cert %s", cert_request.common_name)
    datastream = generateNewX509(cert_request)

    # The same PEM stream is parsed twice: once for the certificate and once
    # for the private key.
    crypto = OpenSSL.crypto
    cert = crypto.load_certificate(crypto.FILETYPE_PEM, datastream)
    key = crypto.load_privatekey(crypto.FILETYPE_PEM, datastream)
    p12 = crypto.PKCS12()
    p12.set_privatekey(key)
    p12.set_certificate(cert)
    p12_data = p12.export()

    response = HttpResponse(p12_data, content_type='application/x-pkcs12', status=201)
    response['Content-Disposition'] = 'attachment; filename="client_cert.p12"'

    # Record the issuance so the same request cannot be collected twice.
    # This assignment had been left commented out, so the "already issued"
    # branch above could never trigger and save() persisted nothing new.
    cert_request.issued = True
    cert_request.save()

    return response