我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用OpenSSL.crypto.load_certificate()。
def __new__(typ, value): if isinstance(value, basestring): path = process.config_file(value) if path is None: log.warn("Certificate file '%s' is not readable" % value) return None try: f = open(path, 'rt') except: log.warn("Certificate file '%s' could not be open" % value) return None try: try: return crypto.load_certificate(crypto.FILETYPE_PEM, f.read()) except crypto.Error, e: log.warn("Certificate file '%s' could not be loaded: %s" % (value, str(e))) return None finally: f.close() else: raise TypeError, 'value should be a string'
def __init__(self, host, port, verify, cert_path, pkey_path, pkey_passphrase=''): self.host = host self.port = port try: self.pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, open(pkey_path, 'rb').read(), pkey_passphrase) except IOError: raise eStreamerKeyError("Unable to locate key file {}".format(pkey_path)) except crypto.Error: raise eStreamerKeyError("Invalid key file or bad passphrase {}".format(pkey_path)) try: self.cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_path, 'rb').read()) except IOError: raise eStreamerCertError("Unable to locate cert file {}".format(cert_path)) except crypto.Error: raise eStreamerCertError("Invalid certificate {}".format(cert_path)) self.verify = verify self.ctx = None self.sock = None self._bytes = None
def from_string(key_pem, is_x509_cert): """Construct a Verified instance from a string. Args: key_pem: string, public key in PEM format. is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is expected to be an RSA key in PEM format. Returns: Verifier instance. Raises: OpenSSL.crypto.Error: if the key_pem can't be parsed. """ key_pem = _to_bytes(key_pem) if is_x509_cert: pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem) else: pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem) return OpenSSLVerifier(pubkey)
def from_string(key_pem, is_x509_cert): """Construct a Verified instance from a string. Args: key_pem: string, public key in PEM format. is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is expected to be an RSA key in PEM format. Returns: Verifier instance. Raises: OpenSSL.crypto.Error if the key_pem can't be parsed. """ if is_x509_cert: pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem) else: pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem) return OpenSSLVerifier(pubkey)
def gen_pkcs12(self, cert_pem=None, key_pem=None, ca_pem=None, friendly_name=None): """ Generate a PKCS12 object with components from PEM. Verify that the set functions return None. """ p12 = PKCS12() if cert_pem: ret = p12.set_certificate(load_certificate(FILETYPE_PEM, cert_pem)) self.assertEqual(ret, None) if key_pem: ret = p12.set_privatekey(load_privatekey(FILETYPE_PEM, key_pem)) self.assertEqual(ret, None) if ca_pem: ret = p12.set_ca_certificates((load_certificate(FILETYPE_PEM, ca_pem),)) self.assertEqual(ret, None) if friendly_name: ret = p12.set_friendlyname(friendly_name) self.assertEqual(ret, None) return p12
def test_replace(self): """ L{PKCS12.set_certificate} replaces the certificate in a PKCS12 cluster. L{PKCS12.set_privatekey} replaces the private key. L{PKCS12.set_ca_certificates} replaces the CA certificates. """ p12 = self.gen_pkcs12(client_cert_pem, client_key_pem, root_cert_pem) p12.set_certificate(load_certificate(FILETYPE_PEM, server_cert_pem)) p12.set_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem)) root_cert = load_certificate(FILETYPE_PEM, root_cert_pem) client_cert = load_certificate(FILETYPE_PEM, client_cert_pem) p12.set_ca_certificates([root_cert]) # not a tuple self.assertEqual(1, len(p12.get_ca_certificates())) self.assertEqual(root_cert, p12.get_ca_certificates()[0]) p12.set_ca_certificates([client_cert, root_cert]) self.assertEqual(2, len(p12.get_ca_certificates())) self.assertEqual(client_cert, p12.get_ca_certificates()[0]) self.assertEqual(root_cert, p12.get_ca_certificates()[1])
def _server(self, sock): """ Create a new server-side SSL L{Connection} object wrapped around C{sock}. """ # Create the server side Connection. This is mostly setup boilerplate # - use TLSv1, use a particular certificate, etc. server_ctx = Context(TLSv1_METHOD) server_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE ) server_ctx.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT|VERIFY_CLIENT_ONCE, verify_cb) server_store = server_ctx.get_cert_store() server_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem)) server_ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem)) server_ctx.check_privatekey() server_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem)) # Here the Connection is actually created. If None is passed as the 2nd # parameter, it indicates a memory BIO should be created. server_conn = Connection(server_ctx, sock) server_conn.set_accept_state() return server_conn
def _client(self, sock): """ Create a new client-side SSL L{Connection} object wrapped around C{sock}. """ # Now create the client side Connection. Similar boilerplate to the # above. client_ctx = Context(TLSv1_METHOD) client_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE ) client_ctx.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT|VERIFY_CLIENT_ONCE, verify_cb) client_store = client_ctx.get_cert_store() client_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, client_key_pem)) client_ctx.use_certificate(load_certificate(FILETYPE_PEM, client_cert_pem)) client_ctx.check_privatekey() client_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem)) client_conn = Connection(client_ctx, sock) client_conn.set_connect_state() return client_conn
def test_set_multiple_ca_list(self): """ If passed a list containing multiple X509Name objects, L{Context.set_client_ca_list} configures the context to send those CA names to the client and, on both the server and client sides, L{Connection.get_client_ca_list} returns a list containing those X509Names after the connection is set up. """ secert = load_certificate(FILETYPE_PEM, server_cert_pem) clcert = load_certificate(FILETYPE_PEM, server_cert_pem) sedesc = secert.get_subject() cldesc = clcert.get_subject() def multiple_ca(ctx): L = [sedesc, cldesc] ctx.set_client_ca_list(L) return L self._check_client_ca_list(multiple_ca)
def test_reset_ca_list(self): """ If called multiple times, only the X509Names passed to the final call of L{Context.set_client_ca_list} are used to configure the CA names sent to the client. """ cacert = load_certificate(FILETYPE_PEM, root_cert_pem) secert = load_certificate(FILETYPE_PEM, server_cert_pem) clcert = load_certificate(FILETYPE_PEM, server_cert_pem) cadesc = cacert.get_subject() sedesc = secert.get_subject() cldesc = clcert.get_subject() def changed_ca(ctx): ctx.set_client_ca_list([sedesc, cldesc]) ctx.set_client_ca_list([cadesc]) return [cadesc] self._check_client_ca_list(changed_ca)
def test_mutated_ca_list(self): """ If the list passed to L{Context.set_client_ca_list} is mutated afterwards, this does not affect the list of CA names sent to the client. """ cacert = load_certificate(FILETYPE_PEM, root_cert_pem) secert = load_certificate(FILETYPE_PEM, server_cert_pem) cadesc = cacert.get_subject() sedesc = secert.get_subject() def mutated_ca(ctx): L = [cadesc] ctx.set_client_ca_list([cadesc]) L.append(sedesc) return [cadesc] self._check_client_ca_list(mutated_ca)
def test_set_and_add_client_ca(self): """ A call to L{Context.set_client_ca_list} followed by a call to L{Context.add_client_ca} results in using the CA names from the first call and the CA name from the second call. """ cacert = load_certificate(FILETYPE_PEM, root_cert_pem) secert = load_certificate(FILETYPE_PEM, server_cert_pem) clcert = load_certificate(FILETYPE_PEM, server_cert_pem) cadesc = cacert.get_subject() sedesc = secert.get_subject() cldesc = clcert.get_subject() def mixed_set_add_ca(ctx): ctx.set_client_ca_list([cadesc, sedesc]) ctx.add_client_ca(clcert) return [cadesc, sedesc, cldesc] self._check_client_ca_list(mixed_set_add_ca)
def test_set_after_add_client_ca(self): """ A call to L{Context.set_client_ca_list} after a call to L{Context.add_client_ca} replaces the CA name specified by the former call with the names specified by the latter cal. """ cacert = load_certificate(FILETYPE_PEM, root_cert_pem) secert = load_certificate(FILETYPE_PEM, server_cert_pem) clcert = load_certificate(FILETYPE_PEM, server_cert_pem) cadesc = cacert.get_subject() sedesc = secert.get_subject() cldesc = clcert.get_subject() def set_replaces_add_ca(ctx): ctx.add_client_ca(clcert) ctx.set_client_ca_list([cadesc]) ctx.add_client_ca(secert) return [cadesc, sedesc] self._check_client_ca_list(set_replaces_add_ca)
def from_string(key_pem, is_x509_cert): """Construct a Verified instance from a string. Args: key_pem: string, public key in PEM format. is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is expected to be an RSA key in PEM format. Returns: Verifier instance. Raises: OpenSSL.crypto.Error: if the key_pem can't be parsed. """ key_pem = _helpers._to_bytes(key_pem) if is_x509_cert: pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem) else: pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem) return OpenSSLVerifier(pubkey)
def read_client_identity(): '''Loads the private key and certificate objects as read from the client identity PEM file. Returns a pair of objects (key,cert) or None if something bad happened.''' common.print_info("Loading identity file...") # Check for missing client identity: if not os.path.exists(config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH): common.print_error("No client identity file found at %s." % config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH) return None # Read and load PKI material from the client identity: file_object = open(config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH,'r') file_contents = file_object.read() file_object.close() try: cert = crypto.load_certificate(crypto.FILETYPE_PEM,file_contents) except crypto.Error: common.print_error("Could not read the certificate from %s." % config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH) cert = None try: key = crypto.load_privatekey(crypto.FILETYPE_PEM,file_contents) except crypto.Error: common.print_error("Could not read the private key from %s." % config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH) key = None # Return PKI materials: return key,cert
def prepara_certificado_txt(self, cert_txt): # # Para dar certo a leitura pelo xmlsec, temos que separar o certificado # em linhas de 64 caracteres de extensão... # cert_txt = cert_txt.replace(u'\n', u'') cert_txt = cert_txt.replace(u'-----BEGIN CERTIFICATE-----', u'') cert_txt = cert_txt.replace(u'-----END CERTIFICATE-----', u'') linhas_certificado = [u'-----BEGIN CERTIFICATE-----\n'] for i in range(0, len(cert_txt), 64): linhas_certificado.append(cert_txt[i:i+64] + '\n') linhas_certificado.append(u'-----END CERTIFICATE-----\n') self.certificado = u''.join(linhas_certificado) cert_openssl = crypto.load_certificate(crypto.FILETYPE_PEM, self.certificado) self.emissor = dict(cert_openssl.get_issuer().get_components()) self.proprietario = dict(cert_openssl.get_subject().get_components()) self.data_inicio_validade = datetime.strptime(cert_openssl.get_notBefore().decode('utf-8'), '%Y%m%d%H%M%SZ') self.data_fim_validade = datetime.strptime(cert_openssl.get_notAfter().decode('utf-8'), '%Y%m%d%H%M%SZ')
def from_string(key_pem, is_x509_cert): """Construct a Verified instance from a string. Args: key_pem: string, public key in PEM format. is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is expected to be an RSA key in PEM format. Returns: Verifier instance. Raises: OpenSSL.crypto.Error if the key_pem can't be parsed. """ if is_x509_cert: pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem) else: pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem) return Verifier(pubkey)
def _get_cert_from_file(self, filename): with open(filename, 'r') as f: cert_pem = f.read() if not cert_pem: raise nsxlib_exceptions.CertificateError( msg=_("Failed to read certificate from %s") % filename) # validate correct crypto try: cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem) except crypto.Error: raise nsxlib_exceptions.CertificateError( msg=_("Failed to import client certificate")) return cert
def get_cert_and_key(self): """Load cert and key from storage""" if self._cert and self._key: return self._cert, self._key cert_pem, key_pem = self._load_from_storage() if cert_pem is None: return None, None try: cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem) key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem) except crypto.Error: raise nsxlib_exceptions.CertificateError( msg="Failed to load client certificate") return cert, key
def from_string(key_pem, is_x509_cert): """Construct a Verified instance from a string. Args: key_pem: string, public key in PEM format. is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is expected to be an RSA key in PEM format. Returns: Verifier instance. Raises: OpenSSL.crypto.Error if the key_pem can't be parsed. """ from OpenSSL import crypto if is_x509_cert: pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem) else: pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem) return OpenSSLVerifier(pubkey)
def gen_pkcs12(self, cert_pem=None, key_pem=None, ca_pem=None, friendly_name=None): """ Generate a PKCS12 object with components from PEM. Verify that the set functions return None. """ p12 = PKCS12() if cert_pem: ret = p12.set_certificate(load_certificate(FILETYPE_PEM, cert_pem)) assert ret is None if key_pem: ret = p12.set_privatekey(load_privatekey(FILETYPE_PEM, key_pem)) assert ret is None if ca_pem: ret = p12.set_ca_certificates( (load_certificate(FILETYPE_PEM, ca_pem),) ) assert ret is None if friendly_name: ret = p12.set_friendlyname(friendly_name) assert ret is None return p12
def test_replace(self): """ `PKCS12.set_certificate` replaces the certificate in a PKCS12 cluster. `PKCS12.set_privatekey` replaces the private key. `PKCS12.set_ca_certificates` replaces the CA certificates. """ p12 = self.gen_pkcs12(client_cert_pem, client_key_pem, root_cert_pem) p12.set_certificate(load_certificate(FILETYPE_PEM, server_cert_pem)) p12.set_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem)) root_cert = load_certificate(FILETYPE_PEM, root_cert_pem) client_cert = load_certificate(FILETYPE_PEM, client_cert_pem) p12.set_ca_certificates([root_cert]) # not a tuple assert 1 == len(p12.get_ca_certificates()) assert root_cert == p12.get_ca_certificates()[0] p12.set_ca_certificates([client_cert, root_cert]) assert 2 == len(p12.get_ca_certificates()) assert client_cert == p12.get_ca_certificates()[0] assert root_cert == p12.get_ca_certificates()[1]
def test_sign_verify_ecdsa(self): """ `sign` generates a cryptographic signature which `verify` can check. ECDSA Signatures in the X9.62 format may have variable length, different from the length of the private key. """ content = ( b"It was a bright cold day in April, and the clocks were striking " b"thirteen. Winston Smith, his chin nuzzled into his breast in an " b"effort to escape the vile wind, slipped quickly through the " b"glass doors of Victory Mansions, though not quickly enough to " b"prevent a swirl of gritty dust from entering along with him." ).decode("ascii") priv_key = load_privatekey(FILETYPE_PEM, ec_root_key_pem) cert = load_certificate(FILETYPE_PEM, ec_root_cert_pem) sig = sign(priv_key, content, "sha1") verify(cert, sig, content, "sha1")
def test_shutdown_truncated(self): """ If the underlying connection is truncated, `Connection.shutdown` raises an `Error`. """ server_ctx = Context(TLSv1_METHOD) client_ctx = Context(TLSv1_METHOD) server_ctx.use_privatekey( load_privatekey(FILETYPE_PEM, server_key_pem)) server_ctx.use_certificate( load_certificate(FILETYPE_PEM, server_cert_pem)) server = Connection(server_ctx, None) client = Connection(client_ctx, None) handshake_in_memory(client, server) assert not server.shutdown() with pytest.raises(WantReadError): server.shutdown() server.bio_shutdown() with pytest.raises(Error): server.shutdown()
def _server(self, sock): """ Create a new server-side SSL `Connection` object wrapped around `sock`. """ # Create the server side Connection. This is mostly setup boilerplate # - use TLSv1, use a particular certificate, etc. server_ctx = Context(TLSv1_METHOD) server_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE) server_ctx.set_verify( VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT | VERIFY_CLIENT_ONCE, verify_cb ) server_store = server_ctx.get_cert_store() server_ctx.use_privatekey( load_privatekey(FILETYPE_PEM, server_key_pem)) server_ctx.use_certificate( load_certificate(FILETYPE_PEM, server_cert_pem)) server_ctx.check_privatekey() server_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem)) # Here the Connection is actually created. If None is passed as the # 2nd parameter, it indicates a memory BIO should be created. server_conn = Connection(server_ctx, sock) server_conn.set_accept_state() return server_conn
def test_set_multiple_ca_list(self): """ If passed a list containing multiple X509Name objects, `Context.set_client_ca_list` configures the context to send those CA names to the client and, on both the server and client sides, `Connection.get_client_ca_list` returns a list containing those X509Names after the connection is set up. """ secert = load_certificate(FILETYPE_PEM, server_cert_pem) clcert = load_certificate(FILETYPE_PEM, server_cert_pem) sedesc = secert.get_subject() cldesc = clcert.get_subject() def multiple_ca(ctx): L = [sedesc, cldesc] ctx.set_client_ca_list(L) return L self._check_client_ca_list(multiple_ca)
def test_reset_ca_list(self): """ If called multiple times, only the X509Names passed to the final call of `Context.set_client_ca_list` are used to configure the CA names sent to the client. """ cacert = load_certificate(FILETYPE_PEM, root_cert_pem) secert = load_certificate(FILETYPE_PEM, server_cert_pem) clcert = load_certificate(FILETYPE_PEM, server_cert_pem) cadesc = cacert.get_subject() sedesc = secert.get_subject() cldesc = clcert.get_subject() def changed_ca(ctx): ctx.set_client_ca_list([sedesc, cldesc]) ctx.set_client_ca_list([cadesc]) return [cadesc] self._check_client_ca_list(changed_ca)
def test_mutated_ca_list(self): """ If the list passed to `Context.set_client_ca_list` is mutated afterwards, this does not affect the list of CA names sent to the client. """ cacert = load_certificate(FILETYPE_PEM, root_cert_pem) secert = load_certificate(FILETYPE_PEM, server_cert_pem) cadesc = cacert.get_subject() sedesc = secert.get_subject() def mutated_ca(ctx): L = [cadesc] ctx.set_client_ca_list([cadesc]) L.append(sedesc) return [cadesc] self._check_client_ca_list(mutated_ca)
def test_set_and_add_client_ca(self): """ A call to `Context.set_client_ca_list` followed by a call to `Context.add_client_ca` results in using the CA names from the first call and the CA name from the second call. """ cacert = load_certificate(FILETYPE_PEM, root_cert_pem) secert = load_certificate(FILETYPE_PEM, server_cert_pem) clcert = load_certificate(FILETYPE_PEM, server_cert_pem) cadesc = cacert.get_subject() sedesc = secert.get_subject() cldesc = clcert.get_subject() def mixed_set_add_ca(ctx): ctx.set_client_ca_list([cadesc, sedesc]) ctx.add_client_ca(clcert) return [cadesc, sedesc, cldesc] self._check_client_ca_list(mixed_set_add_ca)
def test_set_after_add_client_ca(self): """ A call to `Context.set_client_ca_list` after a call to `Context.add_client_ca` replaces the CA name specified by the former call with the names specified by the latter call. """ cacert = load_certificate(FILETYPE_PEM, root_cert_pem) secert = load_certificate(FILETYPE_PEM, server_cert_pem) clcert = load_certificate(FILETYPE_PEM, server_cert_pem) cadesc = cacert.get_subject() sedesc = secert.get_subject() def set_replaces_add_ca(ctx): ctx.add_client_ca(clcert) ctx.set_client_ca_list([cadesc]) ctx.add_client_ca(secert) return [cadesc, sedesc] self._check_client_ca_list(set_replaces_add_ca)
def _secure_viewer(self): cert_file='install/viewer-certs/ca-cert.pem' cert_file='' try: with open(cert_file, "r") as caFile: ca=caFile.read() from OpenSSL import crypto cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_file).read()) return {'defaultMode':'Secure', 'certificate':ca, 'domain':cert.get_issuer().organizationName} except Exception as e: print(e) return {'defaultMode':'Insecure', 'certificate':'', 'domain':''}
def read_ca_cert(): '''Loads CA certificate from the filesystem and returns it as an object. Returns None if something went wrong.''' common.logging_info("Loading CA certificate.") # Check for missing CA certificate: if not os.path.exists(config_ca.CA_CERT_FILE_PATH): common.logging_error("No CA certificate file found at %s." % config_ca.CA_CERT_FILE_PATH) return None # Read and load CA certificate: try: file_object = open(config_ca.CA_CERT_FILE_PATH,'r') file_contents = file_object.read() file_object.close() except IOError: return None # Return CA cert: try: return crypto.load_certificate(crypto.FILETYPE_PEM,file_contents) except crypto.Error: common.logging_error("Could not read CA certificate from %s." % config_ca.CA_CERT_FILE_PATH) return None
def initFromPath(self, path): 'Will initialize the directory from the path supplied' import cPickle data = None with open(path,'r') as f: data = cPickle.load(f) assert data != None, "Expected some data, did not load any." priv_key_from_pem = lambda x: crypto.load_privatekey(crypto.FILETYPE_PEM, x) for userName, keyTuple in data['users'].iteritems(): self.users[userName] = User(userName, directory=self, ecdsaSigningKey=ecdsa.SigningKey.from_pem(keyTuple[0]), rsaSigningKey=priv_key_from_pem(keyTuple[1])) self.users[userName].tags = keyTuple[2] for orgName, tuple in data['organizations'].iteritems(): org = Organization(orgName, ecdsaSigningKey=ecdsa.SigningKey.from_pem(tuple[0]), rsaSigningKey=priv_key_from_pem(tuple[0])) org.signedCert = crypto.load_certificate(crypto.FILETYPE_PEM, tuple[2]) org.networks = [Network[name] for name in tuple[3]] self.organizations[orgName] = org for nat, cert_as_pem in data['nats'].iteritems(): self.ordererAdminTuples[nat] = crypto.load_certificate(crypto.FILETYPE_PEM, cert_as_pem)