我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用OpenSSL.crypto.Error()。
def __new__(typ, value):
    """Load a PEM certificate from the file named by *value*.

    Args:
        typ: the class being instantiated. It is not used: the loaded
            certificate object (or ``None``) is returned instead of an
            instance of ``typ``.
        value: file name, passed to ``process.config_file`` to obtain the
            path that is actually opened.

    Returns:
        The result of ``crypto.load_certificate`` for the file contents, or
        ``None`` when the path cannot be resolved, the file cannot be opened
        or the contents cannot be parsed. A warning is logged in each of
        those cases.

    Raises:
        TypeError: if *value* is not a string.
    """
    if not isinstance(value, basestring):
        # Call-style raise works on both Python 2 and Python 3.
        raise TypeError('value should be a string')

    path = process.config_file(value)
    if path is None:
        log.warn("Certificate file '%s' is not readable" % value)
        return None

    try:
        f = open(path, 'rt')
    except Exception:
        # Still best-effort, but no longer swallows KeyboardInterrupt or
        # SystemExit the way a bare ``except:`` did.
        log.warn("Certificate file '%s' could not be open" % value)
        return None

    try:
        return crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
    except crypto.Error as e:
        log.warn("Certificate file '%s' could not be loaded: %s" % (value, str(e)))
        return None
    finally:
        f.close()
def sign_message(given_message,given_key):
    '''Signs the given message with the given private key using the
    'sha512' digest.

    Returns the base64 encoded signature. Returns None when the message is
    blank, and an empty string when crypto.sign() raises crypto.Error.'''
    # A blank message is rejected up front:
    if not given_message:
        common.print_error("Cannot sign blank message.")
        return None

    # Sign, then base64 encode the raw signature:
    try:
        encoded_signature = base64.b64encode(
            crypto.sign(given_key,given_message,'sha512'))
    except crypto.Error:
        common.print_error("Error signing message!")
        encoded_signature = ''

    return encoded_signature
def __init__(self, host, port, verify, cert_path, pkey_path, pkey_passphrase=''):
    """Store the connection settings and load the PEM key and certificate.

    Args:
        host: stored as ``self.host``.
        port: stored as ``self.port``.
        verify: stored as ``self.verify``.
        cert_path: path of the PEM certificate file.
        pkey_path: path of the PEM private key file.
        pkey_passphrase: passphrase handed to ``crypto.load_privatekey``.

    Raises:
        eStreamerKeyError: if the key file cannot be read (IOError) or
            cannot be loaded (crypto.Error).
        eStreamerCertError: if the certificate file cannot be read (IOError)
            or cannot be loaded (crypto.Error).
    """
    self.host = host
    self.port = port

    try:
        # ``with`` guarantees the handle is closed even if read() fails.
        with open(pkey_path, 'rb') as pkey_file:
            pkey_pem = pkey_file.read()
        self.pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, pkey_pem, pkey_passphrase)
    except IOError:
        raise eStreamerKeyError("Unable to locate key file {}".format(pkey_path))
    except crypto.Error:
        raise eStreamerKeyError("Invalid key file or bad passphrase {}".format(pkey_path))

    try:
        with open(cert_path, 'rb') as cert_file:
            cert_pem = cert_file.read()
        self.cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
    except IOError:
        raise eStreamerCertError("Unable to locate cert file {}".format(cert_path))
    except crypto.Error:
        raise eStreamerCertError("Invalid certificate {}".format(cert_path))

    self.verify = verify
    # Connection state starts out empty.
    self.ctx = None
    self.sock = None
    self._bytes = None
def verify(self, message, signature):
    """Check a signature over a message using this object's public key.

    Args:
        message: string or bytes, the message that was signed. It is passed
            through ``_to_bytes`` with utf-8 encoding first.
        signature: string or bytes, the signature to check. It is passed
            through ``_to_bytes`` with utf-8 encoding first.

    Returns:
        True if ``crypto.verify`` accepts the signature with the 'sha256'
        digest, False if it raises ``crypto.Error``.
    """
    message_bytes = _to_bytes(message, encoding='utf-8')
    signature_bytes = _to_bytes(signature, encoding='utf-8')
    try:
        crypto.verify(self._pubkey, signature_bytes, message_bytes, 'sha256')
    except crypto.Error:
        return False
    return True
def from_string(key_pem, is_x509_cert):
    """Construct a Verifier instance from a string.

    Args:
        key_pem: string, key material in PEM format.
        is_x509_cert: bool, True if key_pem is an X509 cert (loaded with
            ``crypto.load_certificate``); otherwise it is loaded with
            ``crypto.load_privatekey``.

    Returns:
        An OpenSSLVerifier wrapping the loaded object.

    Raises:
        OpenSSL.crypto.Error: if the key_pem can't be parsed.
    """
    pem_bytes = _to_bytes(key_pem)
    if is_x509_cert:
        load = crypto.load_certificate
    else:
        load = crypto.load_privatekey
    return OpenSSLVerifier(load(crypto.FILETYPE_PEM, pem_bytes))
def from_string(key, password=b'notasecret'):
    """Construct a Signer instance from a string.

    Args:
        key: string, private key in PKCS12 or PEM format.
        password: string, password for the private key file. It is only
            used on the PKCS12 path.

    Returns:
        An OpenSSLSigner wrapping the loaded private key.

    Raises:
        OpenSSL.crypto.Error if the key can't be parsed.
    """
    key_bytes = _to_bytes(key)
    pem_key = _parse_pem_key(key_bytes)
    if not pem_key:
        # _parse_pem_key returned nothing usable: load the key as PKCS12.
        password_bytes = _to_bytes(password, encoding='utf-8')
        container = crypto.load_pkcs12(key_bytes, password_bytes)
        return OpenSSLSigner(container.get_privatekey())
    return OpenSSLSigner(crypto.load_privatekey(crypto.FILETYPE_PEM, pem_key))
def test_invalid_extension(self):
    """
    L{X509Extension} raises L{Error} if it is passed a bad extension name
    or value.
    """
    rejected_arguments = [
        ('thisIsMadeUp', False, 'hi'),
        ('basicConstraints', False, 'blah blah'),
        # Exercise a weird one (an extension which uses the r2i method).
        # This exercises the codepath that requires a non-NULL ctx to be
        # passed to X509V3_EXT_nconf. It can't work now because we provide
        # no configuration database. It might be made to work in the future.
        ('proxyCertInfo', True,
         'language:id-ppl-anyLanguage,pathlen:1,policy:text:AB'),
    ]
    for type_name, critical, value in rejected_arguments:
        self.assertRaises(Error, X509Extension, type_name, critical, value)
def test_key_only(self):
    """
    A L{PKCS12} holding only a private key can be exported with
    L{PKCS12.export} and read back with L{load_pkcs12}.
    """
    passphrase = 'blah'
    key = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)

    container = PKCS12()
    container.set_privatekey(key)
    self.assertEqual(None, container.get_certificate())
    self.assertEqual(key, container.get_privatekey())

    try:
        dumped = container.export(passphrase=passphrase, iter=2, maciter=3)
    except Error:
        # Some versions of OpenSSL will throw an exception
        # for this nearly useless PKCS12 we tried to generate:
        # [('PKCS12 routines', 'PKCS12_create', 'invalid null argument')]
        return

    reloaded = load_pkcs12(dumped, passphrase)
    self.assertEqual(None, reloaded.get_ca_certificates())
    self.assertEqual(None, reloaded.get_certificate())

    # OpenSSL fails to bring the key back to us. So sad. Perhaps in the
    # future this will be improved.
    recovered_key = reloaded.get_privatekey()
    self.assertTrue(isinstance(recovered_key, (PKey, type(None))))
def test_load_without_mac(self):
    """
    Loading a PKCS12 that was exported without a MAC either succeeds or
    raises L{Error}; it does not crash.
    """
    passphrase = 'Lake Michigan'
    container = self.gen_pkcs12(server_cert_pem, server_key_pem, root_cert_pem)
    serialized = container.export(maciter=-1, passphrase=passphrase, iter=2)
    try:
        recovered = load_pkcs12(serialized, passphrase)
    except Error:
        # Failing here with an exception is preferred as some openssl
        # versions do.
        return
    # The person who generated this PKCS12 should be flogged,
    # or better yet we should have a means to determine
    # whether a PKCS12 had a MAC that was verified.
    # Anyway, libopenssl chooses to allow it, so the
    # pyopenssl binding does as well.
    self.assertTrue(isinstance(recovered, PKCS12))
def verify(self, message, signature):
    """Verify a message against a signature.

    Args:
        message: string or bytes, the message to verify. It is converted
            with ``_helpers._to_bytes`` using utf-8 encoding.
        signature: string or bytes, the signature on the message. It is
            converted with ``_helpers._to_bytes`` using utf-8 encoding.

    Returns:
        True when ``crypto.verify`` succeeds against this object's public
        key with the 'sha256' digest, False when it raises ``crypto.Error``.
    """
    encoded_message = _helpers._to_bytes(message, encoding='utf-8')
    encoded_signature = _helpers._to_bytes(signature, encoding='utf-8')
    try:
        crypto.verify(self._pubkey, encoded_signature, encoded_message, 'sha256')
    except crypto.Error:
        is_valid = False
    else:
        is_valid = True
    return is_valid
def from_string(key_pem, is_x509_cert):
    """Construct a Verifier instance from a string.

    Args:
        key_pem: string, key material in PEM format.
        is_x509_cert: bool, selects ``crypto.load_certificate`` when True
            and ``crypto.load_privatekey`` otherwise.

    Returns:
        An OpenSSLVerifier wrapping the loaded object.

    Raises:
        OpenSSL.crypto.Error: if the key_pem can't be parsed.
    """
    pem_bytes = _helpers._to_bytes(key_pem)
    if not is_x509_cert:
        return OpenSSLVerifier(
            crypto.load_privatekey(crypto.FILETYPE_PEM, pem_bytes))
    return OpenSSLVerifier(
        crypto.load_certificate(crypto.FILETYPE_PEM, pem_bytes))
def from_string(key, password=b'notasecret'):
    """Construct a Signer instance from a string.

    Args:
        key: string, private key in PKCS12 or PEM format.
        password: string, password for the private key file. It is only
            consulted when the key is loaded as PKCS12.

    Returns:
        An OpenSSLSigner wrapping the loaded private key.

    Raises:
        OpenSSL.crypto.Error if the key can't be parsed.
    """
    raw_key = _helpers._to_bytes(key)
    pem_section = _helpers._parse_pem_key(raw_key)
    if pem_section:
        private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, pem_section)
        return OpenSSLSigner(private_key)

    # Nothing usable came back from _parse_pem_key: load as PKCS12.
    raw_password = _helpers._to_bytes(password, encoding='utf-8')
    private_key = crypto.load_pkcs12(raw_key, raw_password).get_privatekey()
    return OpenSSLSigner(private_key)
def read_client_identity():
    '''Loads the private key and certificate objects as read from the
    client identity PEM file.

    Returns a pair of objects (key,cert); either element is None when that
    item could not be parsed from the file. Returns None (not a pair) when
    the identity file is missing or cannot be read.'''
    identity_path = config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH
    common.print_info("Loading identity file...")

    # Check for missing client identity:
    if not os.path.exists(identity_path):
        common.print_error("No client identity file found at %s." % identity_path)
        return None

    # Read the PEM text. "with" closes the handle even if read() fails, and
    # an unreadable file now returns None instead of raising:
    try:
        with open(identity_path,'r') as file_object:
            file_contents = file_object.read()
    except (IOError, OSError):
        common.print_error("Could not read the client identity file at %s." % identity_path)
        return None

    # The same PEM text is parsed once for the certificate and once for the key:
    try:
        cert = crypto.load_certificate(crypto.FILETYPE_PEM,file_contents)
    except crypto.Error:
        common.print_error("Could not read the certificate from %s." % identity_path)
        cert = None
    try:
        key = crypto.load_privatekey(crypto.FILETYPE_PEM,file_contents)
    except crypto.Error:
        common.print_error("Could not read the private key from %s." % identity_path)
        key = None

    # Return PKI materials:
    return key,cert
def _get_cert_from_file(self, filename):
    """Read *filename* and load its contents as a PEM certificate.

    Returns:
        The object produced by ``crypto.load_certificate``.

    Raises:
        nsxlib_exceptions.CertificateError: if the file is empty or its
            contents cannot be loaded as a PEM certificate.
    """
    with open(filename, 'r') as pem_file:
        pem_text = pem_file.read()

    if pem_text:
        # validate correct crypto
        try:
            return crypto.load_certificate(crypto.FILETYPE_PEM, pem_text)
        except crypto.Error:
            raise nsxlib_exceptions.CertificateError(
                msg=_("Failed to import client certificate"))

    raise nsxlib_exceptions.CertificateError(
        msg=_("Failed to read certificate from %s") % filename)
def get_cert_and_key(self):
    """Return the (certificate, private key) pair for this object.

    If both ``self._cert`` and ``self._key`` are already set they are
    returned as-is. Otherwise the PEM data is fetched with
    ``self._load_from_storage()`` and parsed.

    Returns:
        ``(cert, key)``, or ``(None, None)`` when storage has no
        certificate PEM.

    Raises:
        nsxlib_exceptions.CertificateError: if the stored PEM data cannot
            be loaded.
    """
    if not (self._cert and self._key):
        cert_pem, key_pem = self._load_from_storage()
        if cert_pem is None:
            return None, None
        try:
            loaded_pair = (
                crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem),
                crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem),
            )
        except crypto.Error:
            raise nsxlib_exceptions.CertificateError(
                msg="Failed to load client certificate")
        return loaded_pair

    return self._cert, self._key
def logs(self, log_lines=str(settings.LOG_LINES)):
    """Return aggregated log data for this application.

    Args:
        log_lines: value placed in the ``log_lines`` query parameter of the
            logger request. The default is computed from
            ``settings.LOG_LINES`` once, when the function is defined.

    Returns:
        The body (``r.content``) of the logger's HTTP 200 response.

    Raises:
        requests.exceptions.RequestException: if the HTTP request fails.
        EnvironmentError: if the logger answers 204 or 404 ('Could not
            locate logs') or with any other non-200 status.
    """
    # Build the URL before the try block so it is always bound when the
    # error handler below formats it.
    url = "http://{}:{}/{}?log_lines={}".format(settings.LOGGER_HOST,
                                                settings.LOGGER_PORT,
                                                self.id,
                                                log_lines)
    try:
        r = requests.get(url)
    # Handle HTTP request errors
    except requests.exceptions.RequestException as e:
        logger.error("Error accessing deis-logger using url '{}': {}".format(url, e))
        # A bare raise re-raises the same exception with its original traceback.
        raise

    # Handle logs empty or not found
    if r.status_code in (204, 404):
        logger.info("GET {} returned a {} status code".format(url, r.status_code))
        raise EnvironmentError('Could not locate logs')

    # Handle unanticipated status codes
    if r.status_code != 200:
        logger.error("Error accessing deis-logger: GET {} returned a {} status code"
                     .format(url, r.status_code))
        raise EnvironmentError('Error accessing deis-logger')

    return r.content
def test_sign(self):
    """
    `X509Req.sign` succeeds when given a private key object and a valid
    digest function. When the object has a `verify` method, it accepts the
    signing key's public half and raises `Error` for an unrelated key.
    """
    request = self.signable()
    signing_key = PKey()
    signing_key.generate_key(TYPE_RSA, 512)
    request.set_pubkey(signing_key)
    request.sign(signing_key, GOOD_DIGEST)

    # Only types that have a verify method are checked further.
    if getattr(request, 'verify', None) is None:
        return

    pub = request.get_pubkey()
    assert request.verify(pub)

    # Make another key that won't verify.
    unrelated_key = PKey()
    unrelated_key.generate_key(TYPE_RSA, 512)
    with pytest.raises(Error):
        request.verify(unrelated_key)
def test_load_without_mac(self):
    """
    Loading a PKCS12 exported without a MAC either yields a PKCS12 object
    or raises `Error`; it does not crash.
    """
    passphrase = b"Lake Michigan"
    source_p12 = self.gen_pkcs12(server_cert_pem, server_key_pem, root_cert_pem)
    exported = source_p12.export(maciter=-1, passphrase=passphrase, iter=2)
    try:
        loaded = load_pkcs12(exported, passphrase)
    except Error:
        # Failing here with an exception is preferred as some openssl
        # versions do.
        pass
    else:
        # The person who generated this PKCS12 should be flogged,
        # or better yet we should have a means to determine
        # whether a PKCS12 had a MAC that was verified.
        # Anyway, libopenssl chooses to allow it, so the
        # pyopenssl binding does as well.
        assert isinstance(loaded, PKCS12)
def read_ca_cert():
    '''Loads CA certificate from the filesystem and returns it as an object.

    Returns None if the file is missing, cannot be read, or does not contain
    a loadable PEM certificate; an error is logged in each of those cases.'''
    ca_cert_path = config_ca.CA_CERT_FILE_PATH
    common.logging_info("Loading CA certificate.")

    # Check for missing CA certificate:
    if not os.path.exists(ca_cert_path):
        common.logging_error("No CA certificate file found at %s." % ca_cert_path)
        return None

    # Read the CA certificate text. "with" closes the handle even when
    # read() fails, and the failure is logged like every other error path:
    try:
        with open(ca_cert_path,'r') as file_object:
            file_contents = file_object.read()
    except IOError:
        common.logging_error("Could not open CA certificate file at %s." % ca_cert_path)
        return None

    # Return CA cert:
    try:
        return crypto.load_certificate(crypto.FILETYPE_PEM,file_contents)
    except crypto.Error:
        common.logging_error("Could not read CA certificate from %s." % ca_cert_path)
        return None
def make_p12_with_ca_cert(given_ca_cert,given_passphrase):
    '''Given CA cert object and passphrase, create a PKCS12 container holding
    only that CA certificate and return it as data.

    Returns None (after logging an error) if the certificate cannot be added
    to the container or the container cannot be exported.'''
    common.logging_info("Generating PKCS12 store for given CA certificate.")
    p = crypto.PKCS12()

    try:
        p.set_ca_certificates([given_ca_cert])
    except crypto.Error:
        # Message fixed: it previously read "Could create ...".
        common.logging_error("Could not create a p12 object with given CA certificate.")
        return None

    try:
        p12_ca_data = p.export(passphrase=given_passphrase)
    except crypto.Error:
        common.logging_error("Could not export data from p12 object.")
        return None

    return p12_ca_data


#PRAGMA MARK: SIGNING & VERIFICATION METHODS
def verify_signed_message(given_message,given_encoded_sig,given_cert):
    '''Given strings for the message, its base64 encoded signature, and a
    certificate object, verify the message.

    Returns True if crypto.verify() accepts the signature. Returns False if
    the signature cannot be base64 decoded or verification raises
    crypto.Error.'''
    # Decode given signature (expected base64 encoding). b64decode reports
    # bad input as TypeError on Python 2 and as binascii.Error (a ValueError
    # subclass) on Python 3; anything else is allowed to propagate:
    try:
        decoded_sig = base64.b64decode(given_encoded_sig)
    except (TypeError, ValueError):
        common.logging_error("Could not interpret encoded signature.")
        return False

    # Verify the signature:
    try:
        # OpenSSL is funny. A successful crypto.verify() returns a None object!
        crypto.verify(given_cert,decoded_sig,given_message,config_client_pki.CSR_SIGNING_HASH_ALGORITHM)
    except crypto.Error:
        common.logging_error("Message fails verification.")
        return False
    return True
def __new__(typ, value):
    """Load a PEM private key from the file named by *value*.

    Args:
        typ: the class being instantiated. It is not used: the loaded key
            object (or ``None``) is returned instead of an instance of
            ``typ``.
        value: file name, passed to ``process.config_file`` to obtain the
            path that is actually opened.

    Returns:
        The result of ``crypto.load_privatekey`` for the file contents, or
        ``None`` when the path cannot be resolved, the file cannot be opened
        or the contents cannot be parsed. A warning is logged in each of
        those cases.

    Raises:
        TypeError: if *value* is not a string.
    """
    if not isinstance(value, basestring):
        # Call-style raise works on both Python 2 and Python 3.
        raise TypeError('value should be a string')

    path = process.config_file(value)
    if path is None:
        log.warn("Private key file '%s' is not readable" % value)
        return None

    try:
        f = open(path, 'rt')
    except Exception:
        # Still best-effort, but no longer swallows KeyboardInterrupt or
        # SystemExit the way a bare ``except:`` did.
        log.warn("Private key file '%s' could not be open" % value)
        return None

    try:
        return crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
    except crypto.Error as e:
        log.warn("Private key file '%s' could not be loaded: %s" % (value, str(e)))
        return None
    finally:
        f.close()
def test_invalid_extension(self):
    """
    :py:class:`X509Extension` raises :py:class:`Error` if it is passed a bad
    extension name or value.
    """
    def assert_rejected(type_name, critical, value):
        # Name and value go through b() exactly as in a direct call.
        self.assertRaises(
            Error, X509Extension, b(type_name), critical, b(value))

    assert_rejected('thisIsMadeUp', False, 'hi')
    assert_rejected('basicConstraints', False, 'blah blah')

    # Exercise a weird one (an extension which uses the r2i method).  This
    # exercises the codepath that requires a non-NULL ctx to be passed to
    # X509V3_EXT_nconf.  It can't work now because we provide no
    # configuration database.  It might be made to work in the future.
    assert_rejected(
        'proxyCertInfo', True,
        'language:id-ppl-anyLanguage,pathlen:1,policy:text:AB')
def test_sign(self):
    """
    :py:meth:`X509Req.sign` succeeds when given a private key object and a
    valid digest function. When the object has a ``verify`` method, it
    accepts the signing key's public half and raises :py:class:`Error` for
    an unrelated key.
    """
    request = self.signable()
    signing_key = PKey()
    signing_key.generate_key(TYPE_RSA, 512)
    request.set_pubkey(signing_key)
    request.sign(signing_key, GOOD_DIGEST)

    # Only types that have a verify method are checked further.
    if getattr(request, 'verify', None) is None:
        return

    pub = request.get_pubkey()
    self.assertTrue(request.verify(pub))

    # Make another key that won't verify.
    unrelated_key = PKey()
    unrelated_key.generate_key(TYPE_RSA, 512)
    self.assertRaises(Error, request.verify, unrelated_key)
def test_load_without_mac(self):
    """
    Loading a PKCS12 exported without a MAC either yields a
    :py:class:`PKCS12` or raises :py:class:`Error`; it does not crash.
    """
    secret = b"Lake Michigan"
    generated = self.gen_pkcs12(server_cert_pem, server_key_pem, root_cert_pem)
    blob = generated.export(maciter=-1, passphrase=secret, iter=2)
    try:
        reloaded = load_pkcs12(blob, secret)
    except Error:
        # Failing here with an exception is preferred as some openssl
        # versions do.
        return
    # The person who generated this PKCS12 should be flogged,
    # or better yet we should have a means to determine
    # whether a PKCS12 had a MAC that was verified.
    # Anyway, libopenssl chooses to allow it, so the
    # pyopenssl binding does as well.
    self.assertTrue(isinstance(reloaded, PKCS12))
def tearDown(self):
    """
    Fail the test if errors were left in the OpenSSL error queue.

    The block that would remove files and directories created using
    :py:meth:`TestCase.mktemp` is present but currently disabled (see the
    note below).  Subclasses must invoke this method if they override it or
    the check will not occur.
    """
    # NOTE(review): the leading ``False and`` short-circuits this condition,
    # so the cleanup loop never runs and ``self._temporaryFiles`` is never
    # read here.  Confirm whether that is intentional before re-enabling.
    if False and self._temporaryFiles is not None:
        for temp_path in self._temporaryFiles:
            if os.path.isdir(temp_path):
                shutil.rmtree(temp_path)
                continue
            if os.path.exists(temp_path):
                os.unlink(temp_path)

    try:
        exception_from_error_queue(Error)
    except Error as leftover:
        # An Error whose args are ``([],)`` means the queue was empty.
        if leftover.args != ([],):
            self.fail("Left over errors in OpenSSL error queue: " + repr(leftover))
def from_string(key_pem, is_x509_cert):
    """Construct a Verifier instance from a string.

    Args:
        key_pem: string, key material in PEM format. It is handed to the
            loader unchanged.
        is_x509_cert: bool, selects ``crypto.load_certificate`` when True
            and ``crypto.load_privatekey`` otherwise.

    Returns:
        An OpenSSLVerifier wrapping the loaded object.

    Raises:
        OpenSSL.crypto.Error: if the key_pem can't be parsed.
    """
    loader = crypto.load_certificate if is_x509_cert else crypto.load_privatekey
    return OpenSSLVerifier(loader(crypto.FILETYPE_PEM, key_pem))
def from_string(key, password=b'notasecret'):
    """Construct a Signer instance from a string.

    Args:
        key: string, private key in PKCS12 or PEM format.
        password: string, password for the private key file. It is only
            consulted when the key is loaded as PKCS12.

    Returns:
        An OpenSSLSigner wrapping the loaded private key.

    Raises:
        OpenSSL.crypto.Error if the key can't be parsed.
    """
    pem_section = _parse_pem_key(key)
    if not pem_section:
        # Nothing usable came back from _parse_pem_key: load as PKCS12.
        password_bytes = _to_bytes(password, encoding='utf-8')
        return OpenSSLSigner(
            crypto.load_pkcs12(key, password_bytes).get_privatekey())
    return OpenSSLSigner(
        crypto.load_privatekey(crypto.FILETYPE_PEM, pem_section))
def _compute_expiry_date(self):
    """Fill in the certificate status fields from the stored PKCS12 file.

    Reads the base64 encoded ``nfe_a1_file``, opens it with
    ``nfe_a1_password`` and sets:

    * ``cert_expire_date``: the certificate's notAfter timestamp.
    * ``cert_state``: 'valid' or 'expired' by comparison with the current
      UTC time, 'invalid_password' when loading raises ``crypto.Error``, or
      'unknown' on any other error (which is also logged).
    * ``cert_information``: the subject's CN, L, O and OU, one per line.
    """
    try:
        # b64decode replaces base64.decodestring, which was removed in
        # Python 3.9.
        pfx_data = base64.b64decode(
            self.with_context(bin_size=False).nfe_a1_file)
        pfx = crypto.load_pkcs12(pfx_data, self.nfe_a1_password)

        cert = pfx.get_certificate()
        end = datetime.strptime(
            cert.get_notAfter().decode(), '%Y%m%d%H%M%SZ')
        subj = cert.get_subject()

        self.cert_expire_date = end
        # notAfter is expressed in UTC (trailing "Z"), so compare against
        # the current UTC time and not the server's local time.
        if datetime.utcnow() < end:
            self.cert_state = 'valid'
        else:
            self.cert_state = 'expired'
        self.cert_information = "%s\n%s\n%s\n%s" % (
            subj.CN, subj.L, subj.O, subj.OU)
    except crypto.Error:
        self.cert_state = 'invalid_password'
    except Exception:
        # Still a catch-all, but KeyboardInterrupt/SystemExit now propagate.
        self.cert_state = 'unknown'
        _logger.error(
            u'Erro desconhecido ao consultar certificado', exc_info=True)