Python OpenSSL.crypto 模块,Error() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用OpenSSL.crypto.Error()

项目:python-gnutls    作者:AGProjects    | 项目源码 | 文件源码
def __new__(typ, value):
        """Load a PEM certificate from the file named by *value*.

        *value* is resolved to a path with ``process.config_file``; the file
        is then read and parsed with ``crypto.load_certificate``.

        Returns the loaded certificate object, or None (after logging a
        warning) when the path cannot be resolved, the file cannot be opened,
        or its contents fail to parse.

        Raises TypeError if *value* is not a string.
        """
        if not isinstance(value, basestring):
            raise TypeError('value should be a string')
        path = process.config_file(value)
        if path is None:
            log.warn("Certificate file '%s' is not readable" % value)
            return None
        try:
            f = open(path, 'rt')
        except (IOError, OSError):
            # Only I/O failures are expected from open(); anything else is a
            # programming error and should surface.
            log.warn("Certificate file '%s' could not be open" % value)
            return None
        # The context manager closes the file on every exit path.
        with f:
            try:
                return crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
            except crypto.Error as e:
                log.warn("Certificate file '%s' could not be loaded: %s" % (value, str(e)))
                return None
项目:munki-enrollment-client    作者:gerritdewitt    | 项目源码 | 文件源码
def sign_message(given_message,given_key):
    '''Signs the given message with the given private key using a sha512 digest.
        Returns the base64 encoded signature, None if the message is blank,
        or a blank string if signing raised crypto.Error.'''
    # Refuse to sign an empty message:
    if not given_message:
        common.print_error("Cannot sign blank message.")
        return None
    # Sign, then base64 encode the raw signature:
    try:
        raw_signature = crypto.sign(given_key, given_message, 'sha512')
        return base64.b64encode(raw_signature)
    except crypto.Error:
        common.print_error("Error signing message!")
        return ''
项目:estreamer    作者:spohara79    | 项目源码 | 文件源码
def __init__(self, host, port, verify, cert_path, pkey_path, pkey_passphrase=''):
        """Store connection settings and load the PEM private key and certificate.

        Args:
            host: stored as ``self.host``.
            port: stored as ``self.port``.
            verify: stored as ``self.verify``.
            cert_path: path of the PEM certificate file.
            pkey_path: path of the PEM private key file.
            pkey_passphrase: passphrase handed to ``crypto.load_privatekey``.

        Raises:
            eStreamerKeyError: the key file cannot be read (IOError) or
                cannot be parsed / decrypted (crypto.Error).
            eStreamerCertError: the certificate file cannot be read (IOError)
                or cannot be parsed (crypto.Error).
        """
        self.host = host
        self.port = port
        try:
            # Read through a context manager so the handle is always closed.
            with open(pkey_path, 'rb') as pkey_file:
                pkey_pem = pkey_file.read()
            self.pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, pkey_pem, pkey_passphrase)
        except IOError:
            raise eStreamerKeyError("Unable to locate key file {}".format(pkey_path))
        except crypto.Error:
            raise eStreamerKeyError("Invalid key file or bad passphrase {}".format(pkey_path))
        try:
            with open(cert_path, 'rb') as cert_file:
                cert_pem = cert_file.read()
            self.cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
        except IOError:
            raise eStreamerCertError("Unable to locate cert file {}".format(cert_path))
        except crypto.Error:
            raise eStreamerCertError("Invalid certificate {}".format(cert_path))
        self.verify = verify
        # Populated elsewhere; start out empty.
        self.ctx = None
        self.sock = None
        self._bytes = None
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def verify(self, message, signature):
        """Check whether a signature matches a message.

        Args:
            message: string or bytes, the message that was signed. A string
                     is encoded to bytes as utf-8.
            signature: string or bytes, the signature to check. A string is
                       encoded to bytes as utf-8.

        Returns:
            True if crypto.verify accepts the signature for the message under
            this object's public key (sha256 digest); False if it raises
            crypto.Error.
        """
        message_bytes = _to_bytes(message, encoding='utf-8')
        signature_bytes = _to_bytes(signature, encoding='utf-8')
        try:
            crypto.verify(self._pubkey, signature_bytes, message_bytes, 'sha256')
        except crypto.Error:
            return False
        else:
            return True
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
        """Build a Verifier instance from PEM text.

        Args:
            key_pem: string, public key in PEM format.
            is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
                          is expected to be an RSA key in PEM format.

        Returns:
            Verifier instance.

        Raises:
            OpenSSL.crypto.Error: if the key_pem can't be parsed.
        """
        pem_bytes = _to_bytes(key_pem)
        # NOTE(review): the non-certificate case goes through
        # load_privatekey, not a public-key loader -- confirm this is intended.
        if is_x509_cert:
            loader = crypto.load_certificate
        else:
            loader = crypto.load_privatekey
        return OpenSSLVerifier(loader(crypto.FILETYPE_PEM, pem_bytes))
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def from_string(key, password=b'notasecret'):
        """Build a Signer instance from key material.

        Args:
            key: string, private key in PKCS12 or PEM format.
            password: string, password for the private key file. Only used
                      when the key is loaded as PKCS12.

        Returns:
            Signer instance.

        Raises:
            OpenSSL.crypto.Error if the key can't be parsed.
        """
        key_bytes = _to_bytes(key)
        pem_body = _parse_pem_key(key_bytes)
        if not pem_body:
            # No PEM key found in the input: treat it as a PKCS12 container.
            pkcs12_password = _to_bytes(password, encoding='utf-8')
            container = crypto.load_pkcs12(key_bytes, pkcs12_password)
            return OpenSSLSigner(container.get_privatekey())
        return OpenSSLSigner(
            crypto.load_privatekey(crypto.FILETYPE_PEM, pem_body))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_invalid_extension(self):
        """
        Constructing an L{X509Extension} with a bad extension name or value
        raises L{Error}.
        """
        rejected_arguments = [
            # Unknown extension name.
            ('thisIsMadeUp', False, 'hi'),
            # Known extension name, unparseable value.
            ('basicConstraints', False, 'blah blah'),
            # A weird one (an extension which uses the r2i method).  This
            # exercises the codepath that requires a non-NULL ctx to be
            # passed to X509V3_EXT_nconf.  It can't work now because we
            # provide no configuration database.  It might be made to work
            # in the future.
            ('proxyCertInfo', True,
             'language:id-ppl-anyLanguage,pathlen:1,policy:text:AB'),
        ]
        for arguments in rejected_arguments:
            self.assertRaises(Error, X509Extension, *arguments)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_key_only(self):
        """
        Export a L{PKCS12} that holds only a private key with
        L{PKCS12.export} and read it back with L{load_pkcs12}.
        """
        passphrase = 'blah'
        container = PKCS12()
        private_key = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
        container.set_privatekey(private_key)
        self.assertEqual(None, container.get_certificate())
        self.assertEqual(private_key, container.get_privatekey())
        try:
            exported = container.export(
                passphrase=passphrase, iter=2, maciter=3)
        except Error:
            # Some versions of OpenSSL refuse to build this nearly useless
            # PKCS12 and fail with:
            # [('PKCS12 routines', 'PKCS12_create', 'invalid null argument')]
            # In that case there is nothing further to check.
            return
        reloaded = load_pkcs12(exported, passphrase)
        self.assertEqual(None, reloaded.get_ca_certificates())
        self.assertEqual(None, reloaded.get_certificate())

        # OpenSSL fails to bring the key back to us, so either a PKey or
        # None is accepted.  Perhaps in the future this will be improved.
        recovered_key = reloaded.get_privatekey()
        self.assertTrue(isinstance(recovered_key, (PKey, type(None))))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_load_without_mac(self):
        """
        Loading a PKCS12 exported without a MAC either raises L{Error} or
        yields a L{PKCS12}; it does not crash.
        """
        passphrase = 'Lake Michigan'
        original = self.gen_pkcs12(
            server_cert_pem, server_key_pem, root_cert_pem)
        exported = original.export(maciter=-1, passphrase=passphrase, iter=2)
        try:
            reloaded = load_pkcs12(exported, passphrase)
        except Error:
            # Failing here with an exception is preferred, and is what some
            # openssl versions do.
            return
        # The person who generated this PKCS12 should be flogged, or better
        # yet we should have a means to determine whether a PKCS12 had a MAC
        # that was verified.  Anyway, libopenssl chooses to allow it, so the
        # pyopenssl binding does as well.
        self.assertTrue(isinstance(reloaded, PKCS12))
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def verify(self, message, signature):
        """Check whether a signature matches a message.

        Args:
            message: string or bytes, the message that was signed. A string
                     is encoded to bytes as utf-8.
            signature: string or bytes, the signature to check. A string is
                       encoded to bytes as utf-8.

        Returns:
            True if crypto.verify accepts the signature for the message under
            this object's public key (sha256 digest); False if it raises
            crypto.Error.
        """
        message_bytes = _helpers._to_bytes(message, encoding='utf-8')
        signature_bytes = _helpers._to_bytes(signature, encoding='utf-8')
        try:
            crypto.verify(self._pubkey, signature_bytes, message_bytes, 'sha256')
        except crypto.Error:
            return False
        else:
            return True
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
        """Build a Verifier instance from PEM text.

        Args:
            key_pem: string, public key in PEM format.
            is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
                          is expected to be an RSA key in PEM format.

        Returns:
            Verifier instance.

        Raises:
            OpenSSL.crypto.Error: if the key_pem can't be parsed.
        """
        pem_bytes = _helpers._to_bytes(key_pem)
        # NOTE(review): the non-certificate case goes through
        # load_privatekey, not a public-key loader -- confirm this is intended.
        if is_x509_cert:
            loader = crypto.load_certificate
        else:
            loader = crypto.load_privatekey
        return OpenSSLVerifier(loader(crypto.FILETYPE_PEM, pem_bytes))
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def from_string(key, password=b'notasecret'):
        """Build a Signer instance from key material.

        Args:
            key: string, private key in PKCS12 or PEM format.
            password: string, password for the private key file. Only used
                      when the key is loaded as PKCS12.

        Returns:
            Signer instance.

        Raises:
            OpenSSL.crypto.Error if the key can't be parsed.
        """
        key_bytes = _helpers._to_bytes(key)
        pem_body = _helpers._parse_pem_key(key_bytes)
        if not pem_body:
            # No PEM key found in the input: treat it as a PKCS12 container.
            pkcs12_password = _helpers._to_bytes(password, encoding='utf-8')
            container = crypto.load_pkcs12(key_bytes, pkcs12_password)
            return OpenSSLSigner(container.get_privatekey())
        return OpenSSLSigner(
            crypto.load_privatekey(crypto.FILETYPE_PEM, pem_body))
项目:munki-enrollment-client    作者:gerritdewitt    | 项目源码 | 文件源码
def read_client_identity():
    '''Loads the private key and certificate objects as read
        from the client identity PEM file.  Returns None if the identity
        file does not exist; otherwise returns a pair (key,cert) in which
        either element is None when it could not be parsed from the file.'''
    common.print_info("Loading identity file...")
    # Check for missing client identity:
    if not os.path.exists(config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH):
        common.print_error("No client identity file found at %s." % config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH)
        return None
    # Read PKI material from the client identity; the context manager
    # closes the file even if the read fails:
    with open(config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH,'r') as file_object:
        file_contents = file_object.read()
    # The same PEM text is parsed twice: once for the certificate...
    try:
        cert = crypto.load_certificate(crypto.FILETYPE_PEM,file_contents)
    except crypto.Error:
        common.print_error("Could not read the certificate from %s." % config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH)
        cert = None
    # ...and once for the private key:
    try:
        key = crypto.load_privatekey(crypto.FILETYPE_PEM,file_contents)
    except crypto.Error:
        common.print_error("Could not read the private key from %s." % config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH)
        key = None
    # Return PKI materials:
    return key,cert
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def _get_cert_from_file(self, filename):
        """Read *filename* and return its contents parsed as a PEM certificate.

        Raises nsxlib_exceptions.CertificateError if the file is empty or
        its contents cannot be loaded as a certificate.
        """
        with open(filename, 'r') as cert_file:
            pem_text = cert_file.read()

        if not pem_text:
            raise nsxlib_exceptions.CertificateError(
                msg=_("Failed to read certificate from %s") % filename)

        # Parsing doubles as validation of the file contents.
        try:
            return crypto.load_certificate(crypto.FILETYPE_PEM, pem_text)
        except crypto.Error:
            raise nsxlib_exceptions.CertificateError(
                msg=_("Failed to import client certificate"))
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def get_cert_and_key(self):
        """Return the (cert, key) pair, loading it from storage if needed.

        If both ``self._cert`` and ``self._key`` are already set they are
        returned as is.  Otherwise the PEM pair is fetched with
        ``self._load_from_storage()``; (None, None) is returned when storage
        holds no certificate.

        Raises nsxlib_exceptions.CertificateError if either PEM blob cannot
        be parsed.
        """
        already_loaded = self._cert and self._key
        if already_loaded:
            return self._cert, self._key

        stored_cert_pem, stored_key_pem = self._load_from_storage()
        if stored_cert_pem is None:
            return None, None

        try:
            loaded_cert = crypto.load_certificate(
                crypto.FILETYPE_PEM, stored_cert_pem)
            loaded_key = crypto.load_privatekey(
                crypto.FILETYPE_PEM, stored_key_pem)
        except crypto.Error:
            raise nsxlib_exceptions.CertificateError(
                msg="Failed to load client certificate")
        return loaded_cert, loaded_key
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def verify(self, message, signature):
        """Check whether a signature matches a message.

        Args:
            message: string or bytes, the message that was signed. A string
                     is encoded to bytes as utf-8.
            signature: string or bytes, the signature to check. A string is
                       encoded to bytes as utf-8.

        Returns:
            True if crypto.verify accepts the signature for the message under
            this object's public key (sha256 digest); False if it raises
            crypto.Error.
        """
        message_bytes = _helpers._to_bytes(message, encoding='utf-8')
        signature_bytes = _helpers._to_bytes(signature, encoding='utf-8')
        try:
            crypto.verify(self._pubkey, signature_bytes, message_bytes, 'sha256')
        except crypto.Error:
            return False
        else:
            return True
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
        """Build a Verifier instance from PEM text.

        Args:
            key_pem: string, public key in PEM format.
            is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
                          is expected to be an RSA key in PEM format.

        Returns:
            Verifier instance.

        Raises:
            OpenSSL.crypto.Error: if the key_pem can't be parsed.
        """
        pem_bytes = _helpers._to_bytes(key_pem)
        # NOTE(review): the non-certificate case goes through
        # load_privatekey, not a public-key loader -- confirm this is intended.
        if is_x509_cert:
            loader = crypto.load_certificate
        else:
            loader = crypto.load_privatekey
        return OpenSSLVerifier(loader(crypto.FILETYPE_PEM, pem_bytes))
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def from_string(key, password=b'notasecret'):
        """Build a Signer instance from key material.

        Args:
            key: string, private key in PKCS12 or PEM format.
            password: string, password for the private key file. Only used
                      when the key is loaded as PKCS12.

        Returns:
            Signer instance.

        Raises:
            OpenSSL.crypto.Error if the key can't be parsed.
        """
        key_bytes = _helpers._to_bytes(key)
        pem_body = _helpers._parse_pem_key(key_bytes)
        if not pem_body:
            # No PEM key found in the input: treat it as a PKCS12 container.
            pkcs12_password = _helpers._to_bytes(password, encoding='utf-8')
            container = crypto.load_pkcs12(key_bytes, pkcs12_password)
            return OpenSSLSigner(container.get_privatekey())
        return OpenSSLSigner(
            crypto.load_privatekey(crypto.FILETYPE_PEM, pem_body))
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def logs(self, log_lines=str(settings.LOG_LINES)):
        """Return aggregated log data for this application.

        Issues a GET to the deis-logger service (host and port taken from
        settings) for this application's id.

        Args:
            log_lines: value of the ``log_lines`` query parameter; defaults
                to ``str(settings.LOG_LINES)``.

        Returns:
            The response body (``r.content``) when the status code is 200.

        Raises:
            requests.exceptions.RequestException: the HTTP request failed;
                logged and re-raised unchanged.
            EnvironmentError: the logger answered 204 or 404 (no logs), or
                any other non-200 status.
        """
        # Build the URL before the try block so the handler can always
        # reference it and the try covers only the network call.
        url = "http://{}:{}/{}?log_lines={}".format(settings.LOGGER_HOST, settings.LOGGER_PORT,
                                                    self.id, log_lines)
        try:
            r = requests.get(url)
        # Handle HTTP request errors
        except requests.exceptions.RequestException as e:
            logger.error("Error accessing deis-logger using url '{}': {}".format(url, e))
            # Bare raise keeps the original traceback intact.
            raise
        # Handle logs empty or not found
        if r.status_code in (204, 404):
            logger.info("GET {} returned a {} status code".format(url, r.status_code))
            raise EnvironmentError('Could not locate logs')
        # Handle unanticipated status codes
        if r.status_code != 200:
            logger.error("Error accessing deis-logger: GET {} returned a {} status code"
                         .format(url, r.status_code))
            raise EnvironmentError('Error accessing deis-logger')
        return r.content
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def test_sign(self):
        """
        `X509Req.sign` succeeds when given a private key object and a valid
        digest function. Where the signable object has a `verify` method,
        it accepts the signing key's public half and raises `Error` for an
        unrelated key.
        """
        request = self.signable()
        signing_key = PKey()
        signing_key.generate_key(TYPE_RSA, 512)
        request.set_pubkey(signing_key)
        request.sign(signing_key, GOOD_DIGEST)
        # Only some signable types have a verify method; stop here otherwise.
        if getattr(request, 'verify', None) is None:
            return
        public_key = request.get_pubkey()
        assert request.verify(public_key)
        # A freshly generated, unrelated key must not verify.
        unrelated_key = PKey()
        unrelated_key.generate_key(TYPE_RSA, 512)
        with pytest.raises(Error):
            request.verify(unrelated_key)
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def test_load_without_mac(self):
        """
        Loading a PKCS12 exported without a MAC either raises `Error` or
        yields a `PKCS12`; it does not crash.
        """
        passphrase = b"Lake Michigan"
        original = self.gen_pkcs12(
            server_cert_pem, server_key_pem, root_cert_pem)
        exported = original.export(maciter=-1, passphrase=passphrase, iter=2)
        try:
            reloaded = load_pkcs12(exported, passphrase)
        except Error:
            # Failing here with an exception is preferred, and is what some
            # openssl versions do.
            return
        # The person who generated this PKCS12 should be flogged, or better
        # yet we should have a means to determine whether a PKCS12 had a MAC
        # that was verified.  Anyway, libopenssl chooses to allow it, so the
        # pyopenssl binding does as well.
        assert isinstance(reloaded, PKCS12)
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def verify(self, message, signature):
        """Check whether a signature matches a message.

        Args:
            message: string or bytes, the message that was signed. A string
                     is encoded to bytes as utf-8.
            signature: string or bytes, the signature to check. A string is
                       encoded to bytes as utf-8.

        Returns:
            True if crypto.verify accepts the signature for the message under
            this object's public key (sha256 digest); False if it raises
            crypto.Error.
        """
        message_bytes = _helpers._to_bytes(message, encoding='utf-8')
        signature_bytes = _helpers._to_bytes(signature, encoding='utf-8')
        try:
            crypto.verify(self._pubkey, signature_bytes, message_bytes, 'sha256')
        except crypto.Error:
            return False
        else:
            return True
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
        """Build a Verifier instance from PEM text.

        Args:
            key_pem: string, public key in PEM format.
            is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
                          is expected to be an RSA key in PEM format.

        Returns:
            Verifier instance.

        Raises:
            OpenSSL.crypto.Error: if the key_pem can't be parsed.
        """
        pem_bytes = _helpers._to_bytes(key_pem)
        # NOTE(review): the non-certificate case goes through
        # load_privatekey, not a public-key loader -- confirm this is intended.
        if is_x509_cert:
            loader = crypto.load_certificate
        else:
            loader = crypto.load_privatekey
        return OpenSSLVerifier(loader(crypto.FILETYPE_PEM, pem_bytes))
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def from_string(key, password=b'notasecret'):
        """Build a Signer instance from key material.

        Args:
            key: string, private key in PKCS12 or PEM format.
            password: string, password for the private key file. Only used
                      when the key is loaded as PKCS12.

        Returns:
            Signer instance.

        Raises:
            OpenSSL.crypto.Error if the key can't be parsed.
        """
        key_bytes = _helpers._to_bytes(key)
        pem_body = _helpers._parse_pem_key(key_bytes)
        if not pem_body:
            # No PEM key found in the input: treat it as a PKCS12 container.
            pkcs12_password = _helpers._to_bytes(password, encoding='utf-8')
            container = crypto.load_pkcs12(key_bytes, pkcs12_password)
            return OpenSSLSigner(container.get_privatekey())
        return OpenSSLSigner(
            crypto.load_privatekey(crypto.FILETYPE_PEM, pem_body))
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_invalid_extension(self):
        """
        Constructing an L{X509Extension} with a bad extension name or value
        raises L{Error}.
        """
        rejected_arguments = [
            # Unknown extension name.
            ('thisIsMadeUp', False, 'hi'),
            # Known extension name, unparseable value.
            ('basicConstraints', False, 'blah blah'),
            # A weird one (an extension which uses the r2i method).  This
            # exercises the codepath that requires a non-NULL ctx to be
            # passed to X509V3_EXT_nconf.  It can't work now because we
            # provide no configuration database.  It might be made to work
            # in the future.
            ('proxyCertInfo', True,
             'language:id-ppl-anyLanguage,pathlen:1,policy:text:AB'),
        ]
        for arguments in rejected_arguments:
            self.assertRaises(Error, X509Extension, *arguments)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_key_only(self):
        """
        Export a L{PKCS12} that holds only a private key with
        L{PKCS12.export} and read it back with L{load_pkcs12}.
        """
        passphrase = 'blah'
        container = PKCS12()
        private_key = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
        container.set_privatekey(private_key)
        self.assertEqual(None, container.get_certificate())
        self.assertEqual(private_key, container.get_privatekey())
        try:
            exported = container.export(
                passphrase=passphrase, iter=2, maciter=3)
        except Error:
            # Some versions of OpenSSL refuse to build this nearly useless
            # PKCS12 and fail with:
            # [('PKCS12 routines', 'PKCS12_create', 'invalid null argument')]
            # In that case there is nothing further to check.
            return
        reloaded = load_pkcs12(exported, passphrase)
        self.assertEqual(None, reloaded.get_ca_certificates())
        self.assertEqual(None, reloaded.get_certificate())

        # OpenSSL fails to bring the key back to us, so either a PKey or
        # None is accepted.  Perhaps in the future this will be improved.
        recovered_key = reloaded.get_privatekey()
        self.assertTrue(isinstance(recovered_key, (PKey, type(None))))
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_load_without_mac(self):
        """
        Loading a PKCS12 exported without a MAC either raises L{Error} or
        yields a L{PKCS12}; it does not crash.
        """
        passphrase = 'Lake Michigan'
        original = self.gen_pkcs12(
            server_cert_pem, server_key_pem, root_cert_pem)
        exported = original.export(maciter=-1, passphrase=passphrase, iter=2)
        try:
            reloaded = load_pkcs12(exported, passphrase)
        except Error:
            # Failing here with an exception is preferred, and is what some
            # openssl versions do.
            return
        # The person who generated this PKCS12 should be flogged, or better
        # yet we should have a means to determine whether a PKCS12 had a MAC
        # that was verified.  Anyway, libopenssl chooses to allow it, so the
        # pyopenssl binding does as well.
        self.assertTrue(isinstance(reloaded, PKCS12))
项目:ecodash    作者:Servir-Mekong    | 项目源码 | 文件源码
def verify(self, message, signature):
        """Check whether a signature matches a message.

        Args:
            message: string or bytes, the message that was signed. A string
                     is encoded to bytes as utf-8.
            signature: string or bytes, the signature to check. A string is
                       encoded to bytes as utf-8.

        Returns:
            True if crypto.verify accepts the signature for the message under
            this object's public key (sha256 digest); False if it raises
            crypto.Error.
        """
        message_bytes = _to_bytes(message, encoding='utf-8')
        signature_bytes = _to_bytes(signature, encoding='utf-8')
        try:
            crypto.verify(self._pubkey, signature_bytes, message_bytes, 'sha256')
        except crypto.Error:
            return False
        else:
            return True
项目:ecodash    作者:Servir-Mekong    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
        """Build a Verifier instance from PEM text.

        Args:
            key_pem: string, public key in PEM format.
            is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
                          is expected to be an RSA key in PEM format.

        Returns:
            Verifier instance.

        Raises:
            OpenSSL.crypto.Error: if the key_pem can't be parsed.
        """
        pem_bytes = _to_bytes(key_pem)
        # NOTE(review): the non-certificate case goes through
        # load_privatekey, not a public-key loader -- confirm this is intended.
        if is_x509_cert:
            loader = crypto.load_certificate
        else:
            loader = crypto.load_privatekey
        return OpenSSLVerifier(loader(crypto.FILETYPE_PEM, pem_bytes))
项目:ecodash    作者:Servir-Mekong    | 项目源码 | 文件源码
def from_string(key, password=b'notasecret'):
        """Build a Signer instance from key material.

        Args:
            key: string, private key in PKCS12 or PEM format.
            password: string, password for the private key file. Only used
                      when the key is loaded as PKCS12.

        Returns:
            Signer instance.

        Raises:
            OpenSSL.crypto.Error if the key can't be parsed.
        """
        key_bytes = _to_bytes(key)
        pem_body = _parse_pem_key(key_bytes)
        if not pem_body:
            # No PEM key found in the input: treat it as a PKCS12 container.
            pkcs12_password = _to_bytes(password, encoding='utf-8')
            container = crypto.load_pkcs12(key_bytes, pkcs12_password)
            return OpenSSLSigner(container.get_privatekey())
        return OpenSSLSigner(
            crypto.load_privatekey(crypto.FILETYPE_PEM, pem_body))
项目:munki-enrollment-server    作者:gerritdewitt    | 项目源码 | 文件源码
def read_ca_cert():
    '''Loads CA certificate from the filesystem and returns it as an object.
        Returns None (after logging an error) if the file is missing, cannot
        be read, or does not parse as a PEM certificate.'''
    common.logging_info("Loading CA certificate.")
    # Check for missing CA certificate:
    if not os.path.exists(config_ca.CA_CERT_FILE_PATH):
        common.logging_error("No CA certificate file found at %s." % config_ca.CA_CERT_FILE_PATH)
        return None
    # Read CA certificate; the context manager closes the file even if
    # the read fails:
    try:
        with open(config_ca.CA_CERT_FILE_PATH,'r') as file_object:
            file_contents = file_object.read()
    except IOError:
        # Log this failure like the other failure paths instead of
        # returning None silently.
        common.logging_error("Could not open CA certificate file at %s." % config_ca.CA_CERT_FILE_PATH)
        return None
    # Parse and return CA cert:
    try:
        return crypto.load_certificate(crypto.FILETYPE_PEM,file_contents)
    except crypto.Error:
        common.logging_error("Could not read CA certificate from %s." % config_ca.CA_CERT_FILE_PATH)
        return None
项目:munki-enrollment-server    作者:gerritdewitt    | 项目源码 | 文件源码
def make_p12_with_ca_cert(given_ca_cert,given_passphrase):
    '''Given CA cert object and passphrase, create a PKCS12 container
        holding that certificate as its only CA certificate and return the
        exported data.  Returns None (after logging an error) if
        crypto.Error is raised while adding the certificate or exporting.'''
    common.logging_info("Generating PKCS12 store for given CA certificate.")
    p = crypto.PKCS12()
    # Add the CA certificate to the container:
    try:
        p.set_ca_certificates([given_ca_cert])
    except crypto.Error:
        common.logging_error("Could not create a p12 object with given CA certificate.")
        return None
    # Serialize the container, protected by the passphrase:
    try:
        p12_ca_data = p.export(passphrase=given_passphrase)
    except crypto.Error:
        common.logging_error("Could not export data from p12 object.")
        return None
    return p12_ca_data

#PRAGMA MARK: SIGNING & VERIFICATION METHODS
项目:munki-enrollment-server    作者:gerritdewitt    | 项目源码 | 文件源码
def verify_signed_message(given_message,given_encoded_sig,given_cert):
    '''Given strings for the message, its base64 encoded signature, and a
        certificate object, verify the message.  Returns True if the
        signature verifies; returns False (after logging an error) if the
        signature cannot be decoded or verification raises crypto.Error.'''
    # Decode given signature (expected base64 encoding):
    try:
        decoded_sig = base64.b64decode(given_encoded_sig)
    except (TypeError, ValueError):
        # Malformed base64 surfaces as TypeError on Python 2 and as
        # binascii.Error (a ValueError) on Python 3; a non-string input is a
        # TypeError.  Anything else is unexpected and should propagate.
        common.logging_error("Could not interpret encoded signature.")
        return False
    # Verify the signature:
    try:
        # crypto.verify() signals success by returning None and failure by
        # raising, so reaching the next line means the signature is good.
        crypto.verify(given_cert,decoded_sig,given_message,config_client_pki.CSR_SIGNING_HASH_ALGORITHM)
        return True
    except crypto.Error:
        common.logging_error("Message fails verification.")
        return False
项目:python-gnutls    作者:AGProjects    | 项目源码 | 文件源码
def __new__(typ, value):
        """Load a PEM private key from the file named by *value*.

        *value* is resolved to a path with ``process.config_file``; the file
        is then read and parsed with ``crypto.load_privatekey``.

        Returns the loaded private key object, or None (after logging a
        warning) when the path cannot be resolved, the file cannot be opened,
        or its contents fail to parse.

        Raises TypeError if *value* is not a string.
        """
        if not isinstance(value, basestring):
            raise TypeError('value should be a string')
        path = process.config_file(value)
        if path is None:
            log.warn("Private key file '%s' is not readable" % value)
            return None
        try:
            f = open(path, 'rt')
        except (IOError, OSError):
            # Only I/O failures are expected from open(); anything else is a
            # programming error and should surface.
            log.warn("Private key file '%s' could not be open" % value)
            return None
        # The context manager closes the file on every exit path.
        with f:
            try:
                return crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
            except crypto.Error as e:
                log.warn("Private key file '%s' could not be loaded: %s" % (value, str(e)))
                return None
项目:python-gnutls    作者:AGProjects    | 项目源码 | 文件源码
def __new__(typ, value):
        """Load a PEM certificate from the file named by *value*.

        *value* is resolved to a path with ``process.config_file``; the file
        is then read and parsed with ``crypto.load_certificate``.

        Returns the loaded certificate object, or None (after logging a
        warning) when the path cannot be resolved, the file cannot be opened,
        or its contents fail to parse.

        Raises TypeError if *value* is not a string.
        """
        if not isinstance(value, basestring):
            raise TypeError('value should be a string')
        path = process.config_file(value)
        if path is None:
            log.warn("Certificate file '%s' is not readable" % value)
            return None
        try:
            f = open(path, 'rt')
        except (IOError, OSError):
            # Only I/O failures are expected from open(); anything else is a
            # programming error and should surface.
            log.warn("Certificate file '%s' could not be open" % value)
            return None
        # The context manager closes the file on every exit path.
        with f:
            try:
                return crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
            except crypto.Error as e:
                log.warn("Certificate file '%s' could not be loaded: %s" % (value, str(e)))
                return None
项目:python-gnutls    作者:AGProjects    | 项目源码 | 文件源码
def __new__(typ, value):
        """Load a PEM private key from the file named by *value*.

        *value* is resolved to a path with ``process.config_file``; the file
        is then read and parsed with ``crypto.load_privatekey``.

        Returns the loaded private key object, or None (after logging a
        warning) when the path cannot be resolved, the file cannot be opened,
        or its contents fail to parse.

        Raises TypeError if *value* is not a string.
        """
        if not isinstance(value, basestring):
            raise TypeError('value should be a string')
        path = process.config_file(value)
        if path is None:
            log.warn("Private key file '%s' is not readable" % value)
            return None
        try:
            f = open(path, 'rt')
        except (IOError, OSError):
            # Only I/O failures are expected from open(); anything else is a
            # programming error and should surface.
            log.warn("Private key file '%s' could not be open" % value)
            return None
        # The context manager closes the file on every exit path.
        with f:
            try:
                return crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
            except crypto.Error as e:
                log.warn("Private key file '%s' could not be loaded: %s" % (value, str(e)))
                return None
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_invalid_extension(self):
        """
        Constructing an :py:class:`X509Extension` with a bad extension name
        or value raises :py:class:`Error`.
        """
        rejected_arguments = [
            # Unknown extension name.
            (b('thisIsMadeUp'), False, b('hi')),
            # Known extension name, unparseable value.
            (b('basicConstraints'), False, b('blah blah')),
            # A weird one (an extension which uses the r2i method).  This
            # exercises the codepath that requires a non-NULL ctx to be
            # passed to X509V3_EXT_nconf.  It can't work now because we
            # provide no configuration database.  It might be made to work
            # in the future.
            (b('proxyCertInfo'), True,
             b('language:id-ppl-anyLanguage,pathlen:1,policy:text:AB')),
        ]
        for arguments in rejected_arguments:
            self.assertRaises(Error, X509Extension, *arguments)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_sign(self):
        """
        Signing the object returned by ``self.signable()`` with a private key
        and ``GOOD_DIGEST`` succeeds.  When that object also offers a
        ``verify`` method, verification passes with the object's own public
        key and raises :py:class:`Error` with an unrelated key.
        """
        subject = self.signable()
        signing_key = PKey()
        signing_key.generate_key(TYPE_RSA, 512)
        subject.set_pubkey(signing_key)
        subject.sign(signing_key, GOOD_DIGEST)

        # Not every signable type has a verify method; nothing more to
        # check when it is missing.
        if getattr(subject, 'verify', None) is None:
            return

        embedded_key = subject.get_pubkey()
        self.assertTrue(subject.verify(embedded_key))

        # A freshly generated key did not produce the signature, so
        # verification against it must fail.
        unrelated_key = PKey()
        unrelated_key.generate_key(TYPE_RSA, 512)
        self.assertRaises(Error, subject.verify, unrelated_key)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def test_load_without_mac(self):
        """
        Loading a PKCS12 exported with ``maciter=-1`` either yields a
        :py:class:`PKCS12` object or raises :py:class:`Error`; it must not
        crash.
        """
        passphrase = b"Lake Michigan"
        original = self.gen_pkcs12(
            server_cert_pem, server_key_pem, root_cert_pem)
        serialized = original.export(
            maciter=-1, passphrase=passphrase, iter=2)
        try:
            reloaded = load_pkcs12(serialized, passphrase)
        except Error:
            # Failing here with an exception is preferred as some openssl
            # versions do.
            return
        # The person who generated this PKCS12 should be flogged, or better
        # yet we should have a means to determine whether a PKCS12 had a MAC
        # that was verified.  Anyway, libopenssl chooses to allow it, so the
        # pyopenssl binding does as well.
        self.assertTrue(isinstance(reloaded, PKCS12))
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def tearDown(self):
        """
        Check that the test left nothing behind in the OpenSSL error queue.

        The method also contains code to remove the files and directories
        listed in ``self._temporaryFiles``, but that code is currently
        switched off (see the flag below).  Subclasses that override this
        method must still call it or these checks will not run.
        """
        # NOTE(review): the removal loop is disabled by a constant flag, so
        # temporary files are left on disk -- confirm whether that is
        # intentional before re-enabling it.
        cleanup_enabled = False
        if cleanup_enabled and self._temporaryFiles is not None:
            for path in self._temporaryFiles:
                if os.path.isdir(path):
                    shutil.rmtree(path)
                elif os.path.exists(path):
                    os.unlink(path)
        try:
            exception_from_error_queue(Error)
        except Error as leftover:
            # An Error whose args are exactly ([],) is tolerated; any other
            # payload fails the test.
            if leftover.args != ([],):
                self.fail(
                    "Left over errors in OpenSSL error queue: "
                    + repr(leftover))
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
        """Build an OpenSSLVerifier from PEM text.

        Args:
            key_pem: string, key material in PEM format.
            is_x509_cert: bool, True to load key_pem with
                          crypto.load_certificate, otherwise it is loaded
                          with crypto.load_privatekey.

        Returns:
            OpenSSLVerifier wrapping the loaded object.

        Raises:
            OpenSSL.crypto.Error: if the key_pem can't be parsed.
        """
        # Both loaders take (filetype, buffer), so only the callable varies.
        loader = (crypto.load_certificate if is_x509_cert
                  else crypto.load_privatekey)
        return OpenSSLVerifier(loader(crypto.FILETYPE_PEM, key_pem))
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def from_string(key, password=b'notasecret'):
        """Build an OpenSSLSigner from a serialized private key.

        The key is first offered to _parse_pem_key(); a truthy result is
        loaded as PEM.  Otherwise the key is treated as PKCS12 and opened
        with the password.

        Args:
            key: string, private key in PKCS12 or PEM format.
            password: string, password for the private key file; only used
                      on the PKCS12 path.

        Returns:
            OpenSSLSigner wrapping the loaded private key.

        Raises:
            OpenSSL.crypto.Error if the key can't be parsed.
        """
        pem_body = _parse_pem_key(key)
        if not pem_body:
            # No PEM payload found: fall back to PKCS12.
            pkcs12_password = _to_bytes(password, encoding='utf-8')
            container = crypto.load_pkcs12(key, pkcs12_password)
            return OpenSSLSigner(container.get_privatekey())
        return OpenSSLSigner(
            crypto.load_privatekey(crypto.FILETYPE_PEM, pem_body))
项目:odoo-brasil    作者:Trust-Code    | 项目源码 | 文件源码
def _compute_expiry_date(self):
        """Fill in the certificate state, expiry date and subject summary.

        Decodes the base64 content of ``nfe_a1_file``, opens it as PKCS12
        with ``nfe_a1_password`` and writes ``cert_expire_date``,
        ``cert_state`` and ``cert_information`` from the contained
        certificate.

        ``cert_state`` is set to one of:
            'valid'            -- the notAfter timestamp is in the future
            'expired'          -- the notAfter timestamp has passed
            'invalid_password' -- crypto.Error was raised while processing
            'unknown'          -- any other exception (logged with traceback)
        """
        try:
            # base64.decodestring was removed in Python 3.9; b64decode is
            # available on both Python 2 and 3.
            # NOTE(review): bin_size=False presumably makes the field return
            # its content rather than its size -- confirm.
            pfx = base64.b64decode(
                self.with_context(bin_size=False).nfe_a1_file)
            pfx = crypto.load_pkcs12(pfx, self.nfe_a1_password)
            cert = pfx.get_certificate()
            end = datetime.strptime(
                cert.get_notAfter().decode(), '%Y%m%d%H%M%SZ')
            subj = cert.get_subject()
            self.cert_expire_date = end
            # The parsed timestamp carries a 'Z' (UTC) suffix, so compare it
            # with the current UTC time, not the server's local time.
            if datetime.utcnow() < end:
                self.cert_state = 'valid'
            else:
                self.cert_state = 'expired'
            self.cert_information = "%s\n%s\n%s\n%s" % (
                subj.CN, subj.L, subj.O, subj.OU)
        except crypto.Error:
            self.cert_state = 'invalid_password'
        except Exception:
            # Still best-effort, but no longer swallows KeyboardInterrupt or
            # SystemExit the way a bare except did.
            self.cert_state = 'unknown'
            _logger.error(
                u'Erro desconhecido ao consultar certificado', exc_info=True)
项目:aqua-monitor    作者:Deltares    | 项目源码 | 文件源码
def verify(self, message, signature):
        """Check a signature over a message with this object's public key.

        Args:
            message: string or bytes, the message to verify. Passed through
                     _to_bytes with utf-8 encoding before use.
            signature: string or bytes, the signature on the message. Passed
                       through _to_bytes with utf-8 encoding before use.

        Returns:
            True if crypto.verify accepts the signature for self._pubkey
            using 'sha256'; False if it raises crypto.Error.
        """
        message_bytes = _to_bytes(message, encoding='utf-8')
        signature_bytes = _to_bytes(signature, encoding='utf-8')
        try:
            crypto.verify(
                self._pubkey, signature_bytes, message_bytes, 'sha256')
        except crypto.Error:
            return False
        return True
项目:aqua-monitor    作者:Deltares    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
        """Create an OpenSSLVerifier from a PEM string.

        Args:
            key_pem: string, key material in PEM format.
            is_x509_cert: bool, selects the loader: crypto.load_certificate
                          when True, crypto.load_privatekey otherwise.

        Returns:
            OpenSSLVerifier built around the loaded object.

        Raises:
            OpenSSL.crypto.Error: if the key_pem can't be parsed.
        """
        if not is_x509_cert:
            return OpenSSLVerifier(
                crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem))
        return OpenSSLVerifier(
            crypto.load_certificate(crypto.FILETYPE_PEM, key_pem))
项目:aqua-monitor    作者:Deltares    | 项目源码 | 文件源码
def from_string(key, password=b'notasecret'):
        """Create an OpenSSLSigner from a serialized private key.

        A truthy result from _parse_pem_key(key) is loaded as a PEM private
        key; otherwise key is opened as a PKCS12 container using password.

        Args:
            key: string, private key in PKCS12 or PEM format.
            password: string, password for the private key file; consulted
                      only when the PKCS12 path is taken.

        Returns:
            OpenSSLSigner built around the loaded private key.

        Raises:
            OpenSSL.crypto.Error if the key can't be parsed.
        """
        pem_payload = _parse_pem_key(key)
        if pem_payload:
            private_key = crypto.load_privatekey(
                crypto.FILETYPE_PEM, pem_payload)
            return OpenSSLSigner(private_key)

        # Not PEM: treat the input as PKCS12 protected by the password.
        password_bytes = _to_bytes(password, encoding='utf-8')
        private_key = crypto.load_pkcs12(key, password_bytes).get_privatekey()
        return OpenSSLSigner(private_key)
项目:SurfaceWaterTool    作者:Servir-Mekong    | 项目源码 | 文件源码
def verify(self, message, signature):
        """Report whether a signature matches a message.

        Args:
            message: string or bytes, the message to verify. Converted with
                     _to_bytes using utf-8.
            signature: string or bytes, the signature on the message.
                       Converted with _to_bytes using utf-8.

        Returns:
            True when crypto.verify completes without error for
            self._pubkey and the 'sha256' digest, False when it raises
            crypto.Error.
        """
        data = _to_bytes(message, encoding='utf-8')
        sig = _to_bytes(signature, encoding='utf-8')
        verified = True
        try:
            crypto.verify(self._pubkey, sig, data, 'sha256')
        except crypto.Error:
            # crypto.verify signals a mismatch by raising.
            verified = False
        return verified
项目:SurfaceWaterTool    作者:Servir-Mekong    | 项目源码 | 文件源码
def from_string(key_pem, is_x509_cert):
        """Build an OpenSSLVerifier from PEM key material.

        Args:
            key_pem: string, key material in PEM format. It is passed
                     through _to_bytes before loading.
            is_x509_cert: bool, True to load the material with
                          crypto.load_certificate, False to load it with
                          crypto.load_privatekey.

        Returns:
            OpenSSLVerifier wrapping the loaded object.

        Raises:
            OpenSSL.crypto.Error: if the key_pem can't be parsed.
        """
        pem_bytes = _to_bytes(key_pem)
        load = (crypto.load_certificate if is_x509_cert
                else crypto.load_privatekey)
        loaded_key = load(crypto.FILETYPE_PEM, pem_bytes)
        return OpenSSLVerifier(loaded_key)
项目:SurfaceWaterTool    作者:Servir-Mekong    | 项目源码 | 文件源码
def from_string(key, password=b'notasecret'):
        """Build an OpenSSLSigner from a serialized private key.

        The key is passed through _to_bytes and then offered to
        _parse_pem_key(); a truthy result is loaded as PEM, anything else
        sends the converted key down the PKCS12 path with the password.

        Args:
            key: string, private key in PKCS12 or PEM format.
            password: string, password for the private key file; used only
                      for PKCS12 input.

        Returns:
            OpenSSLSigner wrapping the loaded private key.

        Raises:
            OpenSSL.crypto.Error if the key can't be parsed.
        """
        key_bytes = _to_bytes(key)
        pem_section = _parse_pem_key(key_bytes)
        if not pem_section:
            # No PEM section found: open the bytes as a PKCS12 container.
            secret = _to_bytes(password, encoding='utf-8')
            bundle = crypto.load_pkcs12(key_bytes, secret)
            return OpenSSLSigner(bundle.get_privatekey())
        return OpenSSLSigner(
            crypto.load_privatekey(crypto.FILETYPE_PEM, pem_section))
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def verify(self, message, signature):
        """Verify that a signature belongs to a message.

        Args:
            message: string or bytes, the message to verify. Run through
                     _to_bytes with utf-8 encoding first.
            signature: string or bytes, the signature on the message. Run
                       through _to_bytes with utf-8 encoding first.

        Returns:
            True if crypto.verify raises nothing when checking the
            signature against self._pubkey with 'sha256'; False if it
            raises crypto.Error.
        """
        raw_message = _to_bytes(message, encoding='utf-8')
        raw_signature = _to_bytes(signature, encoding='utf-8')
        try:
            crypto.verify(self._pubkey, raw_signature, raw_message, 'sha256')
        except crypto.Error:
            return False
        else:
            return True