我们从 Python 开源项目中提取了以下 13 个代码示例,用于说明如何使用 OpenSSL.crypto 模块。
def get_pkcs7_certificates(bundle):
    """Return the certificates carried by a PKCS#7 bundle as a tuple of X509 objects.

    The certificate stack is read from the bundle's underlying ``_pkcs7``
    structure when the bundle is of the "signed" or "signed and enveloped"
    type. For any other type the stack pointer stays NULL; the function then
    relies on ``sk_X509_num`` yielding a non-positive count so that an empty
    tuple is returned (NOTE(review): confirm against the OpenSSL binding).

    :param bundle: object exposing ``type_is_signed()``,
        ``type_is_signedAndEnveloped()`` and a ``_pkcs7`` attribute.
    :return: tuple of ``OpenSSL.crypto.X509`` wrappers, possibly empty.
    """
    from OpenSSL._util import (
        ffi as _ffi,
        lib as _lib
    )
    from OpenSSL.crypto import X509

    if bundle.type_is_signed():
        cert_stack = bundle._pkcs7.d.sign.cert
    elif bundle.type_is_signedAndEnveloped():
        cert_stack = bundle._pkcs7.d.signed_and_enveloped.cert
    else:
        cert_stack = _ffi.NULL

    def _wrap(index):
        # Build the wrapper without running X509.__init__, then attach a
        # duplicated certificate that is released with X509_free on collection.
        wrapper = X509.__new__(X509)
        duplicate = _lib.X509_dup(_lib.sk_X509_value(cert_stack, index))
        wrapper._x509 = _ffi.gc(duplicate, _lib.X509_free)
        return wrapper

    return tuple(_wrap(index) for index in range(_lib.sk_X509_num(cert_stack)))
def display_item_info(item, filename, print_filename=False):
    """Dispatch *item* to the display helper that matches its type.

    :param item: a loaded key, certificate, PKCS#7 or PKCS#12 object.
    :param filename: name shown in the banner line.
    :param print_filename: the banner is printed only when this is exactly
        ``True``.

    Items of an unrecognised type are silently ignored.
    """
    if print_filename is True:
        print("#######[ {} ]#######".format(filename))

    if isinstance(item, (rsa.RSAPrivateKey,
                         dsa.DSAPrivateKey,
                         ec.EllipticCurvePrivateKey)):
        display_private_key(item)
        return

    if isinstance(item, (rsa.RSAPublicKey,
                         dsa.DSAPublicKey,
                         ec.EllipticCurvePublicKey)):
        display_public_key(item)
        return

    if isinstance(item, OpenSSL.crypto.PKCS12):
        display_pkcs12(item)
        return

    if isinstance(item, OpenSSL.crypto.PKCS7):
        display_pkcs7(item)
        return

    if isinstance(item, Certificate):
        display_x509_cert(item)
def test_is_certificate_valid_false_less_than_24h(vault_ca_obj, monkeypatch):
    """The certificate is reported invalid when it expires within the next day.

    The patched ``notAfter`` is 2000-05-10 14:16:43 and the frozen "now" is
    2000-05-09 15:16:43, i.e. roughly 23 hours before expiry.
    """
    class ExpiringX509(OpenSSL.crypto.X509):
        def get_notAfter(self):
            return b'20000510141643Z'

    class FrozenDatetime(datetime.datetime):
        @classmethod
        def now(cls):
            return datetime.datetime(2000, 5, 9, 15, 16, 43, 100000)

    monkeypatch.setattr(OpenSSL.crypto, 'X509', ExpiringX509)
    monkeypatch.setattr(vault_ca, 'datetime', FrozenDatetime)

    still_valid = vault_ca_obj._is_certificate_valid(
        'tests/fixtures/acomponent-test.test.org.pem.ok')
    assert not still_valid
def test_is_certificate_valid_true_more_than_24h(vault_ca_obj, monkeypatch):
    """The certificate is reported valid when expiry is more than a day away.

    The patched ``notAfter`` is 2000-05-10 14:16:43 and the frozen "now" is
    2000-05-09 13:16:43, i.e. roughly 25 hours before expiry.
    """
    class ExpiringX509(OpenSSL.crypto.X509):
        def get_notAfter(self):
            return b'20000510141643Z'

    class FrozenDatetime(datetime.datetime):
        @classmethod
        def now(cls):
            return datetime.datetime(2000, 5, 9, 13, 16, 43, 100000)

    monkeypatch.setattr(OpenSSL.crypto, 'X509', ExpiringX509)
    monkeypatch.setattr(vault_ca, 'datetime', FrozenDatetime)

    still_valid = vault_ca_obj._is_certificate_valid(
        'tests/fixtures/acomponent-test.test.org.pem.ok')
    assert still_valid
def sign_cert_request(ca_cert, ca_key, cert_request, serial):
    """Sign a PEM certificate request with the given CA and return the PEM certificate.

    The issued certificate copies the subject and public key from the
    request, is valid from now for ten years (10 * 365 days), and is signed
    with SHA-256.

    :param ca_cert: CA certificate; its subject becomes the issuer.
    :param ca_key: CA private key used for signing.
    :param cert_request: PEM-encoded certificate signing request.
    :param serial: serial number for the new certificate. ``None`` falls back
        to ``1``, the value that was previously hard-coded.
    :return: the signed certificate, PEM-encoded.
    """
    req = OpenSSL.crypto.load_certificate_request(
        OpenSSL.crypto.FILETYPE_PEM, cert_request)

    cert = OpenSSL.crypto.X509()
    cert.set_subject(req.get_subject())
    # The serial used to be hard-coded to 1, so every certificate issued by
    # the same CA shared a serial number; honour the caller's value instead.
    cert.set_serial_number(1 if serial is None else int(serial))
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(10*365*24*60*60)
    cert.set_issuer(ca_cert.get_subject())
    cert.set_pubkey(req.get_pubkey())
    cert.sign(ca_key, "sha256")

    return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
def openssl_to_cryptography_cert(cert):
    """Convert a pyOpenSSL certificate into a ``cryptography`` certificate.

    The certificate is serialised to DER (ASN.1) with pyOpenSSL and parsed
    again with ``load_der_x509_certificate``.

    :param cert: certificate accepted by ``OpenSSL.crypto.dump_certificate``.
    :return: the object produced by ``load_der_x509_certificate``.
    """
    default_backend = cryptography.hazmat.backends.default_backend()
    der_bytes = OpenSSL.crypto.dump_certificate(
        OpenSSL.crypto.FILETYPE_ASN1,
        cert,
    )
    return load_der_x509_certificate(der_bytes, default_backend)


# NOTE(review): the comment below trails this function here; it presumably
# describes a neighbouring definition in the original file - confirm.
# This function was modified from this PR to the pyopenssl project
# https://github.com/pyca/pyopenssl/pull/367/files
def load_binary_item(buffer):
    """Parse a binary (DER) blob by trying each supported format in turn.

    Formats are attempted in this order; the first loader that succeeds wins:

    1. PKCS#1/PKCS#8 private key (unencrypted, ``password=None``)
    2. PKCS#1/PKCS#8 public key
    3. X.509 certificate
    4. PKCS#7 certificate bundle
    5. PKCS#12 bundle

    :param buffer: raw bytes to parse.
    :return: whatever object the first successful loader returns.
    :raises Exception: if no loader accepts the data.
    """
    backend = cryptography.hazmat.backends.default_backend()

    # Each entry is evaluated lazily so a failure in one format cannot
    # prevent the next one from being tried.
    loaders = (
        # PKCS#1/PKCS#8 private in DER
        lambda: cryptography.hazmat.primitives.serialization.load_der_private_key(
            buffer, password=None, backend=backend),
        # PKCS#1/PKCS#8 public in DER
        lambda: cryptography.hazmat.primitives.serialization.load_der_public_key(
            buffer, backend),
        # X509 certificate in DER
        lambda: load_der_x509_certificate(buffer, backend),
        # PKCS7 certificate bundle in DER
        lambda: OpenSSL.crypto.load_pkcs7_data(OpenSSL.crypto.FILETYPE_ASN1, buffer),
        # PKCS12 bundle (binary)
        lambda: OpenSSL.crypto.load_pkcs12(buffer),
    )

    for load in loaders:
        try:
            return load()
        except Exception:
            # Wrong format for this loader; fall through to the next one.
            # Unlike a bare ``except:``, this lets KeyboardInterrupt and
            # SystemExit propagate.
            continue

    raise Exception("Unknown file type for: {}".format(buffer))
def load_pem_file_item(buffer):
    """Parse a PEM blob by trying each supported format in turn.

    Formats are attempted in this order; the first loader that succeeds wins:

    1. PKCS#1/PKCS#8 private key (unencrypted, ``password=None``)
    2. PKCS#1/PKCS#8 public key
    3. X.509 certificate
    4. PKCS#7 certificate bundle

    :param buffer: PEM data to parse.
    :return: whatever object the first successful loader returns.
    :raises Exception: if no loader accepts the data.
    """
    backend = cryptography.hazmat.backends.default_backend()

    # Each entry is evaluated lazily so a failure in one format cannot
    # prevent the next one from being tried.
    loaders = (
        # PKCS#1/PKCS#8 private in PEM
        lambda: cryptography.hazmat.primitives.serialization.load_pem_private_key(
            buffer, password=None, backend=backend),
        # PKCS#1/PKCS#8 public in PEM
        lambda: cryptography.hazmat.primitives.serialization.load_pem_public_key(
            buffer, backend),
        # X509 certificate in PEM
        lambda: load_pem_x509_certificate(buffer, backend),
        # PKCS7 certificate bundle in PEM
        lambda: OpenSSL.crypto.load_pkcs7_data(OpenSSL.crypto.FILETYPE_PEM, buffer),
    )

    for load in loaders:
        try:
            return load()
        except Exception:
            # Wrong format for this loader; fall through to the next one.
            # Unlike a bare ``except:``, this lets KeyboardInterrupt and
            # SystemExit propagate.
            continue

    raise Exception("Unknown file type for: {}".format(buffer))
def test_is_certificate_valid_true(vault_ca_obj, monkeypatch):
    """A certificate whose ``notAfter`` lies well in the future is reported valid.

    The expiry used to be the fixed timestamp ``20220510141643Z``. This test
    does not freeze the clock, so once that date passed the assertion could
    no longer hold. The expiry is now derived from the current time (one year
    ahead) and rendered in the same ``YYYYmmddHHMMSSZ`` byte format.
    """
    import datetime

    one_year_ahead = datetime.datetime.now() + datetime.timedelta(days=365)
    not_after = one_year_ahead.strftime('%Y%m%d%H%M%SZ').encode('ascii')

    class MockX509(OpenSSL.crypto.X509):
        def get_notAfter(self):
            return not_after

    monkeypatch.setattr(OpenSSL.crypto, 'X509', MockX509)
    assert vault_ca_obj._is_certificate_valid('tests/fixtures/acomponent-test.test.org.pem.ok')
def test_is_certificate_valid_false(vault_ca_obj, monkeypatch):
    """A certificate whose patched ``notAfter`` is in the year 2000 is reported invalid."""
    class ExpiredX509(OpenSSL.crypto.X509):
        def get_notAfter(self):
            return b'20000510141643Z'

    monkeypatch.setattr(OpenSSL.crypto, 'X509', ExpiredX509)

    still_valid = vault_ca_obj._is_certificate_valid(
        'tests/fixtures/acomponent-test.test.org.pem.ok')
    assert not still_valid
def gen_cert_request(cname):
    """Create a fresh RSA key and a certificate signing request for *cname*.

    A 2048-bit RSA key pair is generated, a request with ``CN`` set to
    *cname* is built around it and self-signed with SHA-512.

    :param cname: value for the request subject's common name.
    :return: ``(request_pem, private_key_pem)``, both PEM-encoded.
    """
    crypto = OpenSSL.crypto

    key_pair = crypto.PKey()
    key_pair.generate_key(crypto.TYPE_RSA, 2048)

    csr = crypto.X509Req()
    csr.get_subject().CN = cname
    csr.set_pubkey(key_pair)
    csr.sign(key_pair, 'sha512')

    request_pem = crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr)
    private_key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, key_pair)
    return request_pem, private_key_pem
def test_pkcs12_format(self):
    """Pack a freshly generated EC key and certificate into a PKCS#12 bundle.

    The key and certificate are serialised to DER with ``cryptography``,
    re-loaded through pyOpenSSL, combined in a ``PKCS12`` container and the
    exported bytes are logged base64-encoded. The test makes no assertions;
    it passes as long as no step raises.
    """
    logging.debug("----- PKCS12 Test Start -----")

    # Generate an EC key pair on the secp256k1 curve.
    private_key = ec.generate_private_key(ec.SECP256K1(), default_backend())
    public_key = private_key.public_key()
    logging.debug("Key_Type : %s", type(private_key))

    # Issue a certificate for the public key, signed with the same private
    # key (presumably self-signed - depends on ``_generate_cert``).
    certificate = self._generate_cert(
        pub_key=public_key, issuer_key=private_key, subject_name="test")

    cert_der = certificate.public_bytes(encoding=serialization.Encoding.DER)
    key_der = private_key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )

    # Convert the certificate and private key into pyOpenSSL objects.
    crypto = OpenSSL.crypto
    openssl_cert = crypto.load_certificate(
        type=crypto.FILETYPE_ASN1,
        buffer=cert_der
    )
    openssl_key = crypto.load_privatekey(
        type=crypto.FILETYPE_ASN1,
        buffer=key_der,
        passphrase=None
    )
    logging.debug("Key_Type : %s", type(openssl_key))

    # Bundle the private key and certificate as PKCS#12.
    bundle = OpenSSL.crypto.PKCS12()
    bundle.set_privatekey(openssl_key)
    bundle.set_certificate(openssl_cert)
    exported = bundle.export()

    encoded = base64.b64encode(exported, altchars=None)
    logging.debug("%s", encoded)
def cert_collect_pkcs12(request, token):
    """Return the PKCS#12 bundle for the certificate request identified by *token*.

    The token (taken from the browser) is used to look up the request. The
    response depends on the request's state:

    * no matching request  -> 404 "Not found"
    * already issued       -> 410 "Cert already issued"
    * not yet approved     -> 403 "Pending approval"
    * otherwise            -> 201 with the generated ``client_cert.p12``
      attachment

    :param request: the incoming HTTP request (not otherwise used here).
    :param token: lookup token of the ``Cert_request`` row.
    :return: an ``HttpResponse`` as described above.
    """
    certs = Cert_request.objects.filter(token=token)
    try:
        cert_request = certs[0]
    except IndexError:
        # Only "no matching row" maps to 404. Other failures (e.g. database
        # errors) now propagate instead of being reported as "Not found".
        return HttpResponse(status=404, content="Not found")

    logger.info("retrieved cert %s", cert_request.common_name)

    if (cert_request.issued == True):
        logger.info("Certificate already issued %s", cert_request.common_name)
        response = HttpResponse(status=410, content="Cert already issued")
    elif (cert_request.approved == False):
        logger.debug("Certificate pending approval %s", cert_request.common_name)
        response = HttpResponse(status=403, content="Pending approval")
    else:
        logger.info("Generating cert %s", cert_request.common_name)
        # The same PEM stream is parsed twice: once for the certificate and
        # once for the private key.
        datastream = generateNewX509(cert_request)
        c = OpenSSL.crypto
        cert = c.load_certificate(c.FILETYPE_PEM, datastream)
        key = c.load_privatekey(c.FILETYPE_PEM, datastream)

        p12 = OpenSSL.crypto.PKCS12()
        p12.set_privatekey(key)
        p12.set_certificate(cert)
        p12dataStream = p12.export()

        response = HttpResponse(p12dataStream,
                                content_type='application/x-pkcs12',
                                status=201)
        response['Content-Disposition'] = 'attachment; filename="client_cert.p12"'

        # Update cert_request to avoid duplicates being created.
        # TODO: the original author left the next line disabled with a note
        # to uncomment it; until then the 410 branch is never reached via
        # this view and the bundle can be collected repeatedly.
        # cert_request.issued = True
        cert_request.save()

    return response