我们从 Python 开源项目中提取了以下 19 个代码示例,用于说明如何使用 OpenSSL.crypto.X509StoreContext()。
def verify_cert_chain(chain_pem, trusted_certs):
    """Check that a PEM certificate verifies against a set of trusted certificates.

    Args:
        chain_pem: UTF-8 encoded bytes holding the PEM certificate to verify.
        trusted_certs: Iterable of UTF-8 encoded bytes, each a PEM certificate
            that is added to the trust store.

    Returns:
        True when verification succeeds; False when pyOpenSSL raises
        ``X509StoreContextError`` (the failure is logged at error level).
    """
    # NOTE(review): only the single certificate returned by load_certificate
    # is verified; if chain_pem bundles several certificates, presumably only
    # the first one is read -- confirm against callers.
    cert = crypto.load_certificate(crypto.FILETYPE_PEM, chain_pem.decode('utf-8'))

    # Build store of trusted certificates
    store = crypto.X509Store()
    for trusted_pem in trusted_certs:
        trusted = crypto.load_certificate(crypto.FILETYPE_PEM, trusted_pem.decode('utf-8'))
        store.add_cert(trusted)

    # Prepare context
    ctx = crypto.X509StoreContext(store, cert)

    # Start validation
    try:
        ctx.verify_certificate()
    except crypto.X509StoreContextError as e:
        # Hand the exception to logging as an argument so the message is only
        # formatted when the record is actually emitted.
        logging.error("Certificate validation failed: %s", e)
        return False
    return True
def _verify_ca(self):
    """
    (internal use only)
    Verify that the current x509 is signed by the associated CA.

    Builds a store containing only ``self.ca.x509`` and verifies
    ``self.x509`` against it.

    Raises ``ValidationError`` carrying the pyOpenSSL error text when
    verification fails.
    """
    store = crypto.X509Store()
    store.add_cert(self.ca.x509)
    store_ctx = crypto.X509StoreContext(store, self.x509)
    try:
        store_ctx.verify_certificate()
    except crypto.X509StoreContextError as e:
        # Older pyOpenSSL releases put a [code, depth, message] list in
        # args[0]; newer ones put the message string there directly.
        # Indexing a string with [2] would yield a single character, so
        # only index when args[0] is not already the message.
        detail = e.args[0]
        if not isinstance(detail, str):
            detail = detail[2]
        raise ValidationError(_("CA doesn't match, got the "
                                "following error from pyOpenSSL: \"%s\"") % detail)
def registerOrdererAdminTuple(self, userName, ordererName, organizationName):
    """Assign the user as orderer admin.

    Creates a certificate for the user via the named organization, verifies
    it against that organization's signed certificate, records it in
    ``self.ordererAdminTuples`` and returns the tuple used as the key.
    """
    adminTuple = NodeAdminTuple(user=userName, nodeName=ordererName, organization=organizationName)
    assert adminTuple not in self.ordererAdminTuples, \
        "Orderer admin tuple already registered {0}".format(adminTuple)
    assert organizationName in self.organizations, \
        "Orderer Organization not defined {0}".format(organizationName)

    user = self.getUser(userName, shouldCreate=True)

    # Extensions come from _get_cert_extensions_ip_sans (presumably
    # subjectAlternativeName entries -- see that helper) and are passed to
    # both the request and the issued certificate.
    sanExtensions = self._get_cert_extensions_ip_sans(userName, ordererName)
    certRequest = user.createCertRequest(adminTuple.nodeName, extensions=sanExtensions)
    issuingOrg = self.getOrganization(organizationName)
    userCert = issuingOrg.createCertificate(certRequest, extensions=sanExtensions)

    # Verify the new certificate against the organization's signed cert;
    # verify_certificate raises if validation fails.
    trustStore = crypto.X509Store()
    trustStore.add_cert(self.getOrganization(organizationName).signedCert)
    crypto.X509StoreContext(trustStore, userCert).verify_certificate()

    self.ordererAdminTuples[adminTuple] = userCert
    return adminTuple
def test_modification_pre_verify(self):
    """
    Replacing the store of an existing context with ``set_store`` is
    honoured by a later :py:obj:`verify_certificate` call.
    """
    incomplete_store = X509Store()
    incomplete_store.add_cert(self.intermediate_cert)

    complete_store = X509Store()
    complete_store.add_cert(self.root_cert)
    complete_store.add_cert(self.intermediate_cert)

    context = X509StoreContext(incomplete_store, self.intermediate_server_cert)

    # The first store lacks the root certificate, so verification fails.
    error = self.assertRaises(X509StoreContextError, context.verify_certificate)
    self.assertEqual(error.args[0][2], 'unable to get issuer certificate')
    failing_subject = error.certificate.get_subject()
    self.assertEqual(failing_subject.CN, 'intermediate')

    # After switching to the store that includes the root, it succeeds.
    context.set_store(complete_store)
    outcome = context.verify_certificate()
    self.assertEqual(outcome, None)
def test_new(self):
    """
    A newly created cert has non-empty PEM fields, subject and issuer
    names matching the model and its CA, verifies against the CA, is
    version 3 and carries basicConstraints as its first extension.
    """
    cert = self._create_cert()
    self.assertNotEqual(cert.certificate, '')
    self.assertNotEqual(cert.private_key, '')

    x509 = cert.x509
    self.assertEqual(x509.get_serial_number(), cert.serial_number)

    # (x509 name attribute, model attribute) pairs, compared in this order
    name_fields = (
        ('countryName', 'country_code'),
        ('stateOrProvinceName', 'state'),
        ('localityName', 'city'),
        ('organizationName', 'organization_name'),
        ('emailAddress', 'email'),
        ('commonName', 'common_name'),
    )

    # check subject
    subject = x509.get_subject()
    for name_attr, model_attr in name_fields:
        self.assertEqual(getattr(subject, name_attr), getattr(cert, model_attr))

    # check issuer
    issuer = x509.get_issuer()
    ca = cert.ca
    for name_attr, model_attr in name_fields:
        self.assertEqual(getattr(issuer, name_attr), getattr(ca, model_attr))

    # check signature
    trust_store = crypto.X509Store()
    trust_store.add_cert(ca.x509)
    crypto.X509StoreContext(trust_store, cert.x509).verify_certificate()

    # ensure version is 3 (indexed 0 based counting)
    self.assertEqual(x509.get_version(), 2)

    # basic constraints
    first_extension = cert.x509.get_extension(0)
    self.assertEqual(first_extension.get_critical(), 0)
    self.assertEqual(first_extension.get_short_name().decode(), 'basicConstraints')
    self.assertEqual(first_extension.get_data(), b'0\x00')
def verify_certificate_chain(ca_pem_data, cert_pem_data):
    """Verify a PEM certificate against a single PEM CA certificate.

    Returns None on success.

    Raises:
        InvalidCertificate: 'Broken certificate' when pyOpenSSL raises
            ``crypto.Error``, or 'Invalid certificate chain: ...' when
            verification raises ``crypto.X509StoreContextError``.
    """
    try:
        issuer = crypto.load_certificate(crypto.FILETYPE_PEM, ca_pem_data)
        candidate = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem_data)

        trust_store = crypto.X509Store()
        trust_store.add_cert(issuer)

        crypto.X509StoreContext(trust_store, candidate).verify_certificate()
    except crypto.Error as e:
        raise InvalidCertificate('Broken certificate') from e
    except crypto.X509StoreContextError as e:
        reason = str(e)
        raise InvalidCertificate('Invalid certificate chain: ' + reason) from e
def test_valid(self):
    """
    With the root and intermediate certificates in the store,
    :py:obj:`verify_certificate` returns ``None`` for the server
    certificate.
    """
    trust_store = X509Store()
    trust_store.add_cert(self.root_cert)
    trust_store.add_cert(self.intermediate_cert)

    context = X509StoreContext(trust_store, self.intermediate_server_cert)
    outcome = context.verify_certificate()
    self.assertEqual(outcome, None)
def test_reuse(self):
    """
    Calling :py:obj:`verify_certificate` twice on the same
    ``X509StoreContext`` instance gives the same result both times.
    """
    trust_store = X509Store()
    trust_store.add_cert(self.root_cert)
    trust_store.add_cert(self.intermediate_cert)

    context = X509StoreContext(trust_store, self.intermediate_server_cert)

    first_outcome = context.verify_certificate()
    self.assertEqual(first_outcome, None)
    second_outcome = context.verify_certificate()
    self.assertEqual(second_outcome, None)
def test_trusted_self_signed(self):
    """
    A self-signed certificate that is itself in the store makes
    :py:obj:`verify_certificate` return ``None``.
    """
    trust_store = X509Store()
    trust_store.add_cert(self.root_cert)

    context = X509StoreContext(trust_store, self.root_cert)
    outcome = context.verify_certificate()
    self.assertEqual(outcome, None)
def test_untrusted_self_signed(self):
    """
    A self-signed certificate verified against an empty store makes
    :py:obj:`verify_certificate` raise an error.
    """
    empty_store = X509Store()
    context = X509StoreContext(empty_store, self.root_cert)

    error = self.assertRaises(X509StoreContextError, context.verify_certificate)

    self.assertEqual(error.args[0][2], 'self signed certificate')
    failing_subject = error.certificate.get_subject()
    self.assertEqual(failing_subject.CN, 'Testing Root CA')
def test_invalid_chain_no_root(self):
    """
    With only the intermediate certificate in the store (no root),
    :py:obj:`verify_certificate` raises an error.
    """
    rootless_store = X509Store()
    rootless_store.add_cert(self.intermediate_cert)
    context = X509StoreContext(rootless_store, self.intermediate_server_cert)

    error = self.assertRaises(X509StoreContextError, context.verify_certificate)

    self.assertEqual(error.args[0][2], 'unable to get issuer certificate')
    failing_subject = error.certificate.get_subject()
    self.assertEqual(failing_subject.CN, 'intermediate')
def _verify_certificate(self, vcert):
    """Confirm this certificate is in a chain of trust

    We have a CA, and we want to know we're seeing a certificate that
    this CA has signed.

    :param vcert: PEM data of the certificate to check; it is verified
                  against ``self.store``.
    :raises JWTSigningFailed: if the certificate does not verify.
    """
    certificate = crypto.load_certificate(crypto.FILETYPE_PEM, vcert)
    store_ctx = crypto.X509StoreContext(self.store, certificate)
    try:
        # pyOpenSSL signals a failed verification by raising, not through
        # the return value (which is None on success), so the failure has
        # to be translated here.
        result = store_ctx.verify_certificate()
    except crypto.X509StoreContextError:
        raise JWTSigningFailed(_("Certificate is not trusted"))
    # Kept for safety in case an implementation reports through the result.
    if result is not None:
        raise JWTSigningFailed(_("Certificate is not trusted"))
def verify_certificate(ca, cert):
    """Return whether ``cert`` verifies against the single trusted cert ``ca``.

    Args:
        ca: Certificate object added to the trust store.
        cert: Certificate object to verify.

    Returns:
        True if verification completes without error, False if building the
        context or verifying raises any ``Exception``.
    """
    store = crypto.X509Store()
    store.add_cert(ca)
    try:
        crypto.X509StoreContext(store, cert).verify_certificate()
    except Exception:
        # Deliberately best-effort: any failure counts as "not verified".
        # Unlike a bare except, this lets KeyboardInterrupt and SystemExit
        # propagate.
        return False
    return True