我们从Python开源项目中，提取了以下10个代码示例，用于说明如何使用OpenSSL.crypto.X509Store()。
def verify_cert_chain(chain_pem, trusted_certs):
    """Check a PEM certificate against a set of trusted PEM certificates.

    Args:
        chain_pem: UTF-8 encoded PEM bytes of the certificate to validate.
            It is loaded with a single ``crypto.load_certificate`` call.
        trusted_certs: Iterable of UTF-8 encoded PEM bytes; each entry is
            loaded and added to the verification store.

    Returns:
        True if ``verify_certificate()`` completes without raising, False if
        it raises ``crypto.X509StoreContextError`` (the error is logged).
    """
    cert = crypto.load_certificate(
        crypto.FILETYPE_PEM, chain_pem.decode('utf-8'))

    # Build store of trusted certificates
    store = crypto.X509Store()
    for _cert in trusted_certs:
        trusted = crypto.load_certificate(
            crypto.FILETYPE_PEM, _cert.decode('utf-8'))
        store.add_cert(trusted)

    # Prepare context
    ctx = crypto.X509StoreContext(store, cert)

    # Start validation; only the verification call itself is guarded.
    try:
        ctx.verify_certificate()
    except crypto.X509StoreContextError as e:
        # Hand the exception to logging as an argument so the message is
        # only formatted when the record is actually emitted.
        logging.error("Certificate validation failed: %s", e)
        return False
    return True
def _verify_ca(self):
    """
    (internal use only)
    Verify that the current x509 is signed by the associated CA.

    Builds a store containing only ``self.ca.x509`` and verifies
    ``self.x509`` against it.

    Raises ValidationError (carrying the pyOpenSSL error text) when
    verification raises ``crypto.X509StoreContextError``.
    """
    store = crypto.X509Store()
    store.add_cert(self.ca.x509)
    store_ctx = crypto.X509StoreContext(store, self.x509)
    try:
        store_ctx.verify_certificate()
    except crypto.X509StoreContextError as e:
        detail = e.args[0]
        # The original code assumed args[0] is a [code, depth, message]
        # sequence and took index 2. If args[0] is already the message
        # string, indexing it would yield a single character, so only
        # index into real sequences.
        # NOTE(review): newer pyOpenSSL releases appear to pass the message
        # string directly -- confirm against the pinned pyOpenSSL version.
        if isinstance(detail, (list, tuple)):
            detail = detail[2]
        raise ValidationError(_("CA doesn't match, got the "
                                "following error from pyOpenSSL: \"%s\"") % detail)
def registerOrdererAdminTuple(self, userName, ordererName, organizationName):
    """Assign the user as orderer admin and return the registered tuple.

    A ``NodeAdminTuple`` is built from the arguments; it must not already be
    present in ``self.ordererAdminTuples`` and ``organizationName`` must be in
    ``self.organizations`` (both enforced with ``assert``). A certificate is
    then requested by the user, issued by the organization, verified against
    the organization's ``signedCert`` and stored under the tuple.
    """
    adminTuple = NodeAdminTuple(user=userName, nodeName=ordererName, organization=organizationName)
    assert adminTuple not in self.ordererAdminTuples, \
        "Orderer admin tuple already registered {0}".format(adminTuple)
    assert organizationName in self.organizations, \
        "Orderer Organization not defined {0}".format(organizationName)

    signer = self.getUser(userName, shouldCreate=True)

    # Extensions (e.g. subjectAlternativeName) are decided by the helper
    # from the user and node names.
    sanExtensions = self._get_cert_extensions_ip_sans(userName, ordererName)
    request = signer.createCertRequest(adminTuple.nodeName, extensions=sanExtensions)
    issuedCert = self.getOrganization(organizationName).createCertificate(
        request, extensions=sanExtensions)

    # Verify the newly created certificate against the organization's own
    # certificate, the only trusted entry in the store.
    trustStore = crypto.X509Store()
    trustStore.add_cert(self.getOrganization(organizationName).signedCert)
    # The result is not inspected; any exception from verification propagates.
    crypto.X509StoreContext(trustStore, issuedCert).verify_certificate()

    self.ordererAdminTuples[adminTuple] = issuedCert
    return adminTuple
def test_get_cert_store(self):
    """
    The object handed back by `Context.get_cert_store` is an `X509Store`.
    """
    cert_store = Context(TLSv1_METHOD).get_cert_store()
    assert isinstance(cert_store, X509Store)
def test_new(self):
    """Check a freshly created certificate end to end.

    Covers: non-empty PEM fields, serial number, subject and issuer name
    fields, signature verification against the CA, X.509 version and the
    first extension (basicConstraints).
    """
    # (X509Name attribute, model attribute) pairs compared for both the
    # subject (against the cert) and the issuer (against the CA).
    name_fields = (
        ('countryName', 'country_code'),
        ('stateOrProvinceName', 'state'),
        ('localityName', 'city'),
        ('organizationName', 'organization_name'),
        ('emailAddress', 'email'),
        ('commonName', 'common_name'),
    )

    cert = self._create_cert()
    for pem_field in ('certificate', 'private_key'):
        self.assertNotEqual(getattr(cert, pem_field), '')

    x509 = cert.x509
    self.assertEqual(x509.get_serial_number(), cert.serial_number)

    # check subject
    subject = x509.get_subject()
    for name_attr, model_attr in name_fields:
        self.assertEqual(getattr(subject, name_attr), getattr(cert, model_attr))

    # check issuer
    issuer = x509.get_issuer()
    ca = cert.ca
    for name_attr, model_attr in name_fields:
        self.assertEqual(getattr(issuer, name_attr), getattr(ca, model_attr))

    # check signature: verification raises if the CA did not sign the cert
    trust_store = crypto.X509Store()
    trust_store.add_cert(ca.x509)
    crypto.X509StoreContext(trust_store, cert.x509).verify_certificate()

    # ensure version is 3 (indexed 0 based counting)
    self.assertEqual(x509.get_version(), 2)

    # basic constraints
    extension = cert.x509.get_extension(0)
    self.assertEqual(extension.get_critical(), 0)
    self.assertEqual(extension.get_short_name().decode(), 'basicConstraints')
    self.assertEqual(extension.get_data(), b'0\x00')
def verify_certificate_chain(ca_pem_data, cert_pem_data):
    """Verify a PEM certificate against a single PEM CA certificate.

    Both inputs are parsed as PEM, the CA is placed in a fresh store and the
    certificate is verified against it. Returns None on success.

    Raises:
        InvalidCertificate: 'Broken certificate' when ``crypto.Error`` is
            raised at any step, or 'Invalid certificate chain: ...' when
            verification raises ``crypto.X509StoreContextError``. The
            original exception is chained as the cause.
    """
    try:
        # CA first, then the certificate -- same parse order as the inputs.
        issuer_cert, leaf_cert = [
            crypto.load_certificate(crypto.FILETYPE_PEM, pem_data)
            for pem_data in (ca_pem_data, cert_pem_data)
        ]
        trust_store = crypto.X509Store()
        trust_store.add_cert(issuer_cert)
        crypto.X509StoreContext(trust_store, leaf_cert).verify_certificate()
    except crypto.Error as parse_error:
        raise InvalidCertificate('Broken certificate') from parse_error
    except crypto.X509StoreContextError as chain_error:
        raise InvalidCertificate(
            'Invalid certificate chain: ' + str(chain_error)) from chain_error
def test_get_cert_store(self):
    """
    Calling :py:obj:`Context.get_cert_store` yields an object that is an
    instance of :py:obj:`X509Store`.
    """
    returned_store = Context(TLSv1_METHOD).get_cert_store()
    self.assertIsInstance(returned_store, X509Store)
def __init__(self, local_cert, priv_key, ca_cert, controller_name_re):
    """Initialize JWTUtils

    Fetches the node private key, the local node certificate and the CA
    certificate through ``self._get_crypto_material`` and keeps them ready
    for signing and validation:

    - ``private_key``: the PEM private key, loaded without a password.
    - ``node_certificate`` / ``node_cert_obj`` / ``node_cert_pem``: the raw
      local certificate material, its parsed form, and its PEM
      serialization.
    - ``store``: a pyOpenSSL ``X509Store`` holding the CA certificate.

    :param local_cert: file containing public half of the local key
    :param priv_key: file containing private half of the local key
    :param ca_cert: file containing CA root certificate
    :param controller_name_re: stored unchanged on the instance; not used
        here (presumably a pattern for controller names -- verify against
        the callers)

    raise: IOError if the files cannot be read (as stated by the original
    documentation; the reading itself happens in ``_get_crypto_material``).
    """
    # Private half of the local key.
    self.private_key = serialization.load_pem_private_key(
        self._get_crypto_material(priv_key),
        password=None,
        backend=default_backend())

    # Local certificate: raw material, parsed object, PEM re-encoding.
    self.node_certificate = self._get_crypto_material(local_cert)
    self.node_cert_obj = load_pem_x509_certificate(
        self.node_certificate, default_backend())
    self.node_cert_pem = self.node_cert_obj.public_bytes(
        serialization.Encoding.PEM)

    # CA certificate goes through pyOpenSSL into the store.
    root_ca = crypto.load_certificate(
        crypto.FILETYPE_PEM, self._get_crypto_material(ca_cert))
    self.store = crypto.X509Store()
    self.store.add_cert(root_ca)

    self.controller_name_re = controller_name_re
def verify_certificate(ca, cert):
    """Return whether ``cert`` verifies against a store containing only ``ca``.

    Args:
        ca: Certificate object added to a fresh ``crypto.X509Store``.
        cert: Certificate object to verify against that store.

    Returns:
        True if building the store context and calling
        ``verify_certificate()`` completes without raising, False if either
        raises any ``Exception``. Errors from ``store.add_cert(ca)`` are not
        caught and propagate to the caller.
    """
    store = crypto.X509Store()
    store.add_cert(ca)
    try:
        crypto.X509StoreContext(store, cert).verify_certificate()
    except Exception:
        # Fail closed on any verification error, but -- unlike the previous
        # bare ``except:`` -- let KeyboardInterrupt and SystemExit propagate.
        return False
    return True