我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用cryptography.x509.load_pem_x509_certificate()。
def verify_renewable_cert_sig(renewable_cert):
    """Verify the signature of a `.storage.RenewableCert` object.

    The certificate's signature is checked against the certificate that
    ``pyopenssl_load_certificate`` returns for the chain file.

    :param `.storage.RenewableCert` renewable_cert: cert to verify

    :raises errors.Error: If signature verification fails, or if either
        file cannot be read or parsed.
    """
    try:
        # Distinct names for the file handles so they are not rebound to
        # the parsed objects while the ``with`` block is still open.
        with open(renewable_cert.chain, 'rb') as chain_file:
            chain, _ = pyopenssl_load_certificate(chain_file.read())
        with open(renewable_cert.cert, 'rb') as cert_file:
            cert = x509.load_pem_x509_certificate(cert_file.read(),
                                                  default_backend())
        hash_name = cert.signature_hash_algorithm.name
        OpenSSL.crypto.verify(chain, cert.signature,
                              cert.tbs_certificate_bytes, hash_name)
    except (IOError, ValueError, OpenSSL.crypto.Error) as e:
        # Implicit literal concatenation instead of a backslash inside the
        # string, which leaked source indentation into the message.
        error_str = ("verifying the signature of the cert located at {0} "
                     "has failed. Details: {1}".format(renewable_cert.cert, e))
        logger.exception(error_str)
        raise errors.Error(error_str)
def load_certificate(self, cert_name):
    """Load the certificate file ``cert_name`` from ``self.cert_path``.

    The file content is parsed as PEM first and as DER second.

    :param cert_name: file name relative to ``self.cert_path``
    :returns: the parsed certificate object
    :raises exception.SignatureVerificationError: if neither parse works
    """
    cert_location = os.path.join(self.cert_path, cert_name)
    with open(cert_location, 'rb') as handle:
        raw = handle.read()

    # PEM is attempted first; any failure falls through to DER.
    try:
        return x509.load_pem_x509_certificate(raw, default_backend())
    except Exception:
        try:
            return x509.load_der_x509_certificate(raw, default_backend())
        except Exception:
            raise exception.SignatureVerificationError(
                "Failed to load certificate: %s" % cert_location
            )
def validate_ca_cert(self, ignored):
    """Check the stored CA certificate against its expected fingerprint.

    The expected value has the form ``"<ALGO>:<hex digest>"``; spaces in
    the digest part are ignored. On mismatch the CA certificate file is
    deleted.

    :param ignored: unused
    :raises NetworkError: if the computed fingerprint differs
    """
    expected = self._get_expected_ca_cert_fingerprint()
    algo, expectedfp = expected.split(':')
    expectedfp = expectedfp.replace(' ', '')
    backend = default_backend()

    # Binary mode: load_pem_x509_certificate() requires bytes.
    with open(self._get_ca_cert_path(), 'rb') as f:
        cert_pem = f.read()
    cert = load_pem_x509_certificate(cert_pem, backend)

    hasher = getattr(hashes, algo)()
    fpbytes = cert.fingerprint(hasher)
    # hexlify() returns bytes; decode so the comparison with the textual
    # expected fingerprint can succeed on Python 3 as well.
    fp = binascii.hexlify(fpbytes).decode('ascii')

    if fp != expectedfp:
        os.unlink(self._get_ca_cert_path())
        self.log.error("Fingerprint of CA cert doesn't match: %s <-> %s"
                       % (fp, expectedfp))
        raise NetworkError("The provider's CA fingerprint doesn't match")
def read_file(self, subject_name, pw=None):
    """Load the unit-test certificate and private key for ``subject_name``.

    Files are read from ``../../resources/unittest/<subject_name>/``
    relative to this module (``cert.pem`` and ``key.pem``).

    :param subject_name: name of the sub-directory holding the PEM files
    :param pw: password passed to ``load_pem_private_key`` (may be None)
    :return: dict with keys ``'cert'`` and ``'private_key'``
    """
    pem_path = os.path.join(os.path.dirname(__file__),
                            "../../resources/unittest/" + subject_name + "/cert.pem")
    # Context managers close the files even if read() raises.
    with open(pem_path, "rb") as f:
        cert_pem = f.read()
    cert = x509.load_pem_x509_certificate(cert_pem, default_backend())

    key_path = os.path.join(os.path.dirname(__file__),
                            "../../resources/unittest/" + subject_name + "/key.pem")
    with open(key_path, "rb") as f:
        cert_key = f.read()
    private_key = serialization.load_pem_private_key(cert_key, pw, default_backend())

    return {'cert': cert, 'private_key': private_key}
def load_pki(self, cert_path: str, cert_pass=None):
    """Load the CA certificate and private key found under ``cert_path``.

    :param cert_path: directory containing ``self.CERT_NAME`` and
        ``self.PRI_NAME``
    :param cert_pass: password for the private key (may be None)
    """
    certificate_file = join(cert_path, self.CERT_NAME)
    private_key_file = join(cert_path, self.PRI_NAME)

    # Read the certificate and the private key (both PEM encoded).
    with open(certificate_file, "rb") as pem_in:
        certificate_pem = pem_in.read()
        self.__ca_cert = x509.load_pem_x509_certificate(certificate_pem, default_backend())

    with open(private_key_file, "rb") as pem_in:
        private_key_pem = pem_in.read()
        try:
            self.__ca_pri = serialization.load_pem_private_key(
                private_key_pem, cert_pass, default_backend())
        except ValueError:
            # NOTE(review): execution continues to the sign/verify check
            # below even when the key could not be loaded - confirm intent.
            logging.debug("Invalid Password")

    # Sign and verify a fixed payload as a sanity check of the loaded pair.
    test_signature = self.sign_data(b'TEST')
    if self.verify_data(b'TEST', test_signature) is False:
        logging.debug("Invalid Signature(Root Certificate load test)")
def validate_ca_certificate_constraints(cert_pem_data):
    """Check the BasicConstraints extension of a PEM certificate.

    Returns silently when the extension is absent.

    :param cert_pem_data: PEM-encoded certificate
    :raises InvalidCertificate: if the certificate is not a CA or its
        path length is not 0
    """
    certificate = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
    try:
        extension = certificate.extensions.get_extension_for_oid(
            ExtensionOID.BASIC_CONSTRAINTS)
        basic = extension.value
    except x509.extensions.ExtensionNotFound:
        return

    if not basic.ca:
        raise InvalidCertificate("Not a CA certificate")

    if basic.path_length != 0:
        raise InvalidCertificate("Invalid pathlen")


# based on ssl.match_hostname code
# https://github.com/python/cpython/blob/6f0eb93183519024cb360162bdd81b9faec97ba6/Lib/ssl.py#L279
def load_certificate(cert_file=None, cert_bytes=None):
    """Load a certificate from a file path or from raw bytes.

    ``cert_file`` wins when it is truthy; otherwise ``cert_bytes`` is used.
    Data containing the ``-----BEGIN`` marker is parsed as PEM, anything
    else as DER.

    :returns: the parsed certificate object
    """
    if cert_file:
        with open(cert_file, 'rb') as source:
            raw = source.read()
    else:
        raw = cert_bytes

    looks_like_pem = '-----BEGIN'.encode() in raw
    if looks_like_pem:
        return x509.load_pem_x509_certificate(data=raw, backend=default_backend())
    return x509.load_der_x509_certificate(data=raw, backend=default_backend())
def _user_cert_validation(cert_str):
    """Ask the user to confirm the cluster certificate's fingerprint.

    :param cert_str: cluster certificate bundle
    :type cert_str: str
    :returns: whether or not the user validated the cert
    :rtype: bool
    """
    pem_bytes = cert_str.encode('utf-8')
    certificate = x509.load_pem_x509_certificate(pem_bytes, default_backend())
    digest = certificate.fingerprint(hashes.SHA256())

    # Colon-separated upper-case hex, one pair per digest byte.
    hex_pairs = ["{:02x}".format(octet) for octet in digest]
    pretty = ":".join(hex_pairs).upper()

    prompt = "SHA256 fingerprint of cluster certificate bundle:\n{}".format(pretty)
    return confirm(prompt, False)
def get_certificate(self, kid):
    """Fetch the JWKS document and return the certificate for ``kid``.

    The first ``x5c`` entry of the matching key is wrapped in PEM markers
    and parsed.

    :param kid: key id to look up
    :raises DecodeError: if no key with that id is listed
    """
    # Retrieve the key set from the JWKS endpoint.
    response = self.request(self.jwks_url(), method='GET')
    response.raise_for_status()

    # Pick the first entry whose kid matches.
    missing = object()
    x5c = next(
        (entry['x5c'][0] for entry in response.json()['keys']
         if entry['kid'] == kid),
        missing,
    )
    if x5c is missing:
        raise DecodeError('Cannot find kid={}'.format(kid))

    pem_text = '-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----'.format(x5c)
    return load_pem_x509_certificate(pem_text.encode(), default_backend())
def test_crl_0(self):
    """Test that the X509 CRL revocation works correctly."""
    crl_path = os.path.join(self.crl_dir, "ch1_ta4_crl.pem")
    with open(crl_path, "rb") as crl_file:
        crl = x509.load_pem_x509_crl(crl_file.read(), default_backend())

    cert_path = os.path.join(self.cs_dir, "cs1_ch1_ta4_cert.pem")
    with open(cert_path, "rb") as cert_file:
        cert = x509.load_pem_x509_certificate(cert_file.read(), default_backend())

    self.assertTrue(crl.issuer == cert.issuer)

    # The certificate's serial number must appear among the CRL entries.
    listed = any(entry.serial_number == cert.serial_number for entry in crl)
    self.assertTrue(listed, "Can not find revoked "
                            "certificate in CRL!")
def requireRenewal(self, path, lifetime):
    """Tell whether the certificate at ``path`` is due for renewal.

    :param path: path of a PEM-encoded certificate
    :param lifetime: number of days before expiry at which renewal starts
    :returns: True once the current time is past
        ``not_valid_after - lifetime`` days, False otherwise
    """
    with open(path, 'rb') as fh:
        crt = fh.read()
    cert = x509.load_pem_x509_certificate(crt, default_backend())
    renew_after = cert.not_valid_after - datetime.timedelta(days=lifetime)
    # not_valid_after is a naive datetime expressed in UTC, so it must be
    # compared with naive UTC "now" rather than local time.
    return datetime.datetime.utcnow() > renew_after
def certificate_expiry_date(self):
    """Return ``not_valid_after`` of ``self.certificate``.

    Returns None when ``self.certificate`` is falsy.
    """
    if not self.certificate:
        return None
    pem_text = str(self.certificate)
    parsed = x509.load_pem_x509_certificate(pem_text, default_backend())
    return parsed.not_valid_after
def certificate_common_name(self):
    """Return the first subject common name of ``self.certificate``.

    Returns None when ``self.certificate`` is falsy.
    """
    if not self.certificate:
        return None
    pem_text = str(self.certificate)
    parsed = x509.load_pem_x509_certificate(pem_text, default_backend())
    common_names = parsed.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
    return common_names[0].value
def load_pem_certificate(data):
    """Parse ``data`` as a PEM X.509 certificate using the default backend."""
    backend = default_backend()
    return x509.load_pem_x509_certificate(data, backend)
def test_convert_from_cryptography(self):
    """X509.from_cryptography wraps a cryptography cert, keeping its version."""
    source = x509.load_pem_x509_certificate(intermediate_cert_pem, backend)
    wrapped = X509.from_cryptography(source)

    assert isinstance(wrapped, X509)
    assert wrapped.get_version() == source.version.value
def decode_token(token):
    """
    Decode and validate a JSON Web Token.

    The token is verified with the public key of the certificate named by
    ``options.ssl_cert`` (falling back to ``LOCALHOST_CRT``).

    :param token: a JSON Web Token
    :returns: dict, the decoded payload with ``payload['scope']`` wrapped
        in a ``Scope`` object
    :raises:
        jwt.DecodeError: Invalid token or not signed with our key
        jwt.ExpiredSignatureError: Token has expired
        jwt.InvalidAudienceError: Invalid "aud" claim
        jwt.InvalidIssuerError: Invalid "iss" claim
        jwt.MissingRequiredClaimError: Missing a required claim
    """
    cert_file = getattr(options, 'ssl_cert', None) or LOCALHOST_CRT
    # Binary mode: load_pem_x509_certificate() requires bytes.
    with open(cert_file, 'rb') as f:
        cert = load_pem_x509_certificate(f.read(), default_backend())
    public_key = cert.public_key()

    payload = jwt.decode(token,
                         public_key,
                         audience=audience(),
                         issuer=issuer(),
                         algorithms=[ALGORITHM],
                         verify=True)

    if not payload.get('sub'):
        raise jwt.MissingRequiredClaimError('"sub" claim is required')

    payload['scope'] = Scope(payload['scope'])
    return payload
def parse_issuer_cred(issuer_cred):
    """
    Split an X509 PEM credential string into its sections and load them.

    Sections are delimited by -----BEGIN <label>----- and
    -----END <label>----- markers and must appear in proxy credential
    order: issuer cert, issuer private key, then zero or more chain certs.

    Returns a tuple of (issuer cert, issuer private key, chain), where the
    first two are loaded cryptography objects and the chain is the chain
    sections joined into one (possibly empty) string.

    Raises ValueError if fewer than two sections are found or if any
    section fails to decode.
    """
    # Collect every BEGIN...END section of the PEM text.
    pem_blocks = re.findall(
        "-----BEGIN.*?-----.*?-----END.*?-----", issuer_cred, flags=re.DOTALL)

    try:
        cert_pem = pem_blocks[0]
        key_pem = pem_blocks[1]
        chain_pems = pem_blocks[2:]
    except IndexError:
        raise ValueError("Unable to parse PEM data in credentials, "
                         "make sure the X.509 file is in PEM format and "
                         "consists of the issuer cert, issuer private key, "
                         "and proxy chain (if any) in that order.")

    # Confirm each section decodes as the type its position implies.
    try:
        backend_cert = x509.load_pem_x509_certificate(
            six.b(cert_pem), default_backend())
        backend_key = serialization.load_pem_private_key(
            six.b(key_pem), password=None, backend=default_backend())
        for chain_pem in chain_pems:
            x509.load_pem_x509_certificate(six.b(chain_pem), default_backend())
        chain_text = "".join(chain_pems)
    except ValueError:
        raise ValueError("Failed to decode PEM data in credentials. Make sure "
                         "the X.509 file consists of the issuer cert, "
                         "issuer private key, and proxy chain (if any) "
                         "in that order.")

    return backend_cert, backend_key, chain_text
def __load_cert(self, cert_dir):
    """Load a certificate/private-key pair and check that they match.

    Reads ``cert.pem`` and ``key.pem`` from ``cert_dir``, signs a fixed
    payload with the private key and verifies the signature with the
    certificate's public key.

    :param cert_dir: directory holding ``cert.pem`` and ``key.pem``
    :return: the x509 certificate object, or None when the key cannot be
        decrypted or the signature check fails
    """
    logging.debug("Cert/Key loading...")
    cert_file = join(cert_dir, "cert.pem")
    pri_file = join(cert_dir, "key.pem")

    # Context managers close the files even if read() raises.
    with open(cert_file, "rb") as f:
        cert_bytes = f.read()
    cert = x509.load_pem_x509_certificate(cert_bytes, default_backend())

    with open(pri_file, "rb") as f:
        pri_bytes = f.read()
    try:
        pri = serialization.load_pem_private_key(pri_bytes, self.__PASSWD, default_backend())
    except ValueError:
        logging.debug("Invalid Password(%s)", cert_dir)
        return None

    data = b"test"
    signature = pri.sign(data, ec.ECDSA(hashes.SHA256()))

    try:
        # verify() signals failure by raising InvalidSignature and returns
        # None on success, so its return value must not be used as a flag.
        cert.public_key().verify(signature, data, ec.ECDSA(hashes.SHA256()))
    except InvalidSignature:
        logging.debug("sign test fail")
        logging.error("result is False ")
        return None

    return cert
def convert_x509cert_from_pem(self, cert_pem):
    """Convert a PEM certificate into an x509 certificate object.

    :param cert_pem: PEM-encoded certificate
    :return: x509 certificate object
    """
    backend = default_backend()
    return x509.load_pem_x509_certificate(cert_pem, backend)
def validate_certificate_common_name(cert_pem_data, subject_name):
    """Match ``subject_name`` against a PEM certificate, alt names disabled.

    Delegates to ``_match_subject_name`` with ``alt_names=False``.
    """
    certificate = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
    _match_subject_name(certificate, subject_name, alt_names=False)
def validate_certificate_hosts(cert_pem_data, host_names):
    """Match every name in ``host_names`` against a PEM certificate.

    Each name is passed to ``_match_subject_name`` with
    ``ssl._dnsname_match`` as the comparison function.
    """
    certificate = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
    # NOTE(review): ssl._dnsname_match is a private stdlib helper - confirm
    # it exists on the targeted Python versions.
    for name in host_names:
        _match_subject_name(certificate, name, compare_func=ssl._dnsname_match)
def validate_certificate_host_ips(cert_pem_data, host_ips):
    """Match every address in ``host_ips`` against a PEM certificate.

    Each address is passed to ``_match_subject_ip``.
    """
    certificate = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
    for address in host_ips:
        _match_subject_ip(certificate, address)
def validate_certificate_key_usage(cert_pem_data, is_web_server, is_web_client):
    """Check KeyUsage and, when requested, ExtendedKeyUsage of a PEM cert.

    Digital signature and key encipherment are always required. When
    ``is_web_server`` or ``is_web_client`` is set, the matching extended
    key usage OID is required too.

    :raises InvalidCertificate: if an extension or a required usage is
        missing
    """
    certificate = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
    extensions = certificate.extensions

    try:
        usage = extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE).value
    except x509.extensions.ExtensionNotFound:
        raise InvalidCertificate("Key usage not specified")

    if not usage.digital_signature:
        raise InvalidCertificate("Not intented for Digital Signature")

    if not usage.key_encipherment:
        raise InvalidCertificate("Not intented for Key Encipherment")

    # The extended usages only matter for web server / client certificates.
    if not (is_web_server or is_web_client):
        return

    try:
        extended_usage = extensions.get_extension_for_oid(
            ExtensionOID.EXTENDED_KEY_USAGE).value
    except x509.extensions.ExtensionNotFound:
        raise InvalidCertificate("Extended key usage not specified")

    if is_web_server and ExtendedKeyUsageOID.SERVER_AUTH not in extended_usage:
        raise InvalidCertificate("Not intented for TLS Web Server Authentication")

    if is_web_client and ExtendedKeyUsageOID.CLIENT_AUTH not in extended_usage:
        raise InvalidCertificate("Not intented for TLS Web Client Authentication")
def list_hosts(self):
    """Build a mapping of host common name to key/cert status.

    CSRs under ``self.csr_path`` are recorded as ``'pending'``. Certificates
    under ``self.crt_path`` are then recorded as ``'revoked'`` or
    ``'authorized'`` and overwrite a CSR entry with the same common name.

    :returns: dict keyed by common name with ``key_fingerprint``,
        ``cert_fingerprint`` and ``status`` entries
    """
    hosts = {}

    for file_name in os.listdir(self.csr_path):
        with open(os.path.join(self.csr_path, file_name), 'rb') as handle:
            request = x509.load_pem_x509_csr(handle.read(), default_backend())
        entry = {
            'key_fingerprint': rsa_key_fingerprint(request.public_key()),
            'cert_fingerprint': None,
            'status': 'pending',
        }
        common_name = request.subject.get_attributes_for_oid(
            NameOID.COMMON_NAME)[0].value
        hosts[common_name] = entry

    for file_name in os.listdir(self.crt_path):
        with open(os.path.join(self.crt_path, file_name), 'rb') as handle:
            certificate = x509.load_pem_x509_certificate(handle.read(), default_backend())
        state = 'revoked' if revoked_cert(certificate, self.crl) else 'authorized'
        entry = {
            'key_fingerprint': rsa_key_fingerprint(certificate.public_key()),
            'cert_fingerprint': x509_cert_fingerprint(certificate),
            'status': state,
        }
        common_name = certificate.subject.get_attributes_for_oid(
            NameOID.COMMON_NAME)[0].value
        hosts[common_name] = entry

    return hosts
def loadCert(pem):
    """
    Build a cryptography certificate object from a PEM certificate string.

    :param pem: A certificate as a PEM string.
    :return: A cryptography certificate object.
    """
    pem_bytes = pem.encode("utf-8")
    return x509.load_pem_x509_certificate(pem_bytes, default_backend())
def __init__(self, local_cert, priv_key, ca_cert, controller_name_re):
    """Initialize JWTUtils

    Reads the node private key, the local node certificate and the CA
    certificate through ``self._get_crypto_material``. The private key and
    node certificate are kept for signing; the CA certificate is placed
    in a pyOpenSSL ``X509Store`` for validation.

    :param local_cert: file containing public half of the local key
    :param priv_key: file containing private half of the local key
    :param ca_cert: file containing CA root certificate
    :param controller_name_re: stored unchanged on
        ``self.controller_name_re``
    raise: IOError if the files cannot be read.
    """
    key_pem = self._get_crypto_material(priv_key)
    self.private_key = serialization.load_pem_private_key(
        key_pem, password=None, backend=default_backend())

    self.node_certificate = self._get_crypto_material(local_cert)
    self.node_cert_obj = load_pem_x509_certificate(
        self.node_certificate, default_backend())
    self.node_cert_pem = self.node_cert_obj.public_bytes(
        serialization.Encoding.PEM)

    # The CA certificate is handled with pyOpenSSL rather than cryptography.
    ca_pem = self._get_crypto_material(ca_cert)
    trusted_root = crypto.load_certificate(crypto.FILETYPE_PEM, ca_pem)
    self.store = crypto.X509Store()
    self.store.add_cert(trusted_root)

    self.controller_name_re = controller_name_re
def generate_x509_fingerprint(pem_key):
    """Return the colon-separated SHA1 fingerprint of a PEM certificate.

    :param pem_key: PEM certificate as text or bytes
    :raises exception.InvalidKeypair: if the certificate cannot be parsed
        or fingerprinted
    """
    try:
        if isinstance(pem_key, six.text_type):
            pem_key = pem_key.encode('utf-8')
        certificate = x509.load_pem_x509_certificate(
            pem_key, backends.default_backend())
        hex_digest = binascii.hexlify(certificate.fingerprint(hashes.SHA1()))
        if six.PY3:
            hex_digest = hex_digest.decode('ascii')
        # Group the hex string into two-character pairs.
        pairs = [hex_digest[pos:pos + 2] for pos in range(0, len(hex_digest), 2)]
        return ':'.join(pairs)
    except (ValueError, TypeError, binascii.Error) as ex:
        raise exception.InvalidKeypair(
            reason=_('failed to generate X509 fingerprint. '
                     'Error message: %s') % ex)
def get_google_public_cert_key():
    """Download the certificate at GOOGLE_PUBLIC_CERT_URL; return its public key.

    Raises whatever ``raise_for_status()`` raises for an HTTP error response.
    """
    response = requests.get(GOOGLE_PUBLIC_CERT_URL)
    response.raise_for_status()

    # The response text is the PEM certificate.
    pem_bytes = response.text.encode('utf-8')
    certificate = x509.load_pem_x509_certificate(pem_bytes, default_backend())
    return certificate.public_key()
def deserialize(self, data):
    """Store the public numbers of a PEM certificate's key on this object.

    Sets ``self._e`` and ``self._n`` from the ``e`` and ``n`` attributes of
    the key's public numbers.

    :param data: PEM-encoded certificate
    """
    certificate = x509.load_pem_x509_certificate(data, default_backend())
    numbers = certificate.public_key().public_numbers()
    self._e = numbers.e
    self._n = numbers.n
def _verify(self):
    '''
    Check whether the message is correctly signed.

    Returns True when the signature over the canonicalized SignedInfo
    verifies with the embedded certificate's key; any exception is logged
    and reported as False.
    '''
    try:
        # Canonicalize the SOAP body and compute its SHA-256 digest.
        # NOTE(review): `digest` is computed but never compared with
        # anything in this block - confirm whether a check is missing.
        canonical_body = etree.tostring(self.body, method='c14n',
                                        exclusive=True, with_comments=False)
        sha256 = hashlib.sha256(canonical_body)
        digest = b64encode(sha256.digest())

        # Locate the certificate and signature elements.
        cert = self.root.find('.//wsse:BinarySecurityToken', namespaces=NSMAP)
        sig_info = self.root.find('.//ds:SignedInfo', namespaces=NSMAP)
        sig_value = self.root.find('.//ds:SignatureValue', namespaces=NSMAP)

        # All three elements must be present.
        assert cert is not None
        assert sig_info is not None
        assert sig_value is not None

        # Canonicalize the signature info.
        canonical_sig_info = etree.tostring(sig_info, method='c14n',
                                            exclusive=True, with_comments=False)

        # Wrap the base64 token in PEM markers and load it.
        pem_lines = ['-----BEGIN CERTIFICATE-----']
        pem_lines = pem_lines + textwrap.wrap(cert.text, 64)
        pem_lines = pem_lines + ['-----END CERTIFICATE-----\n']
        cert = '\n'.join(pem_lines)
        cert = load_pem_x509_certificate(cert.encode('utf-8'), default_backend())
        public_key = cert.public_key()

        # verify() raises when the signature does not match.
        checker = public_key.verifier(b64decode(sig_value.text),
                                      padding.PKCS1v15(), hashes.SHA256())
        checker.update(canonical_sig_info)
        checker.verify()
        return True
    except Exception as e:
        logger.exception(e)
        # Any failure is reported as "not verified".
        return False
def test_basics():
    """Exercise CA creation and server certificate issuance end to end."""
    ca = CA()

    today = datetime.datetime.today()

    assert b"BEGIN CERTIFICATE" in ca.cert_pem.bytes()

    # The CA certificate is currently valid, self-issued and marked as CA.
    root = x509.load_pem_x509_certificate(ca.cert_pem.bytes(), default_backend())
    assert root.not_valid_before <= today <= root.not_valid_after
    assert root.issuer == root.subject
    basic = root.extensions.get_extension_for_class(x509.BasicConstraints)
    assert basic.value.ca == True
    assert basic.critical == True

    # Issuing without any hostname is rejected.
    with pytest.raises(ValueError):
        ca.issue_server_cert()

    server = ca.issue_server_cert(u"test-1.example.org", u"test-2.example.org")

    assert b"PRIVATE KEY" in server.private_key_pem.bytes()
    assert b"BEGIN CERTIFICATE" in server.cert_chain_pems[0].bytes()
    assert len(server.cert_chain_pems) == 1
    assert server.private_key_pem.bytes() in server.private_key_and_cert_chain_pem.bytes()
    for chain_pem in server.cert_chain_pems:
        assert chain_pem.bytes() in server.private_key_and_cert_chain_pem.bytes()

    # The leaf is currently valid, issued by the CA and carries both names.
    leaf = x509.load_pem_x509_certificate(
        server.cert_chain_pems[0].bytes(), default_backend())
    assert leaf.not_valid_before <= today <= leaf.not_valid_after
    assert leaf.issuer == root.subject
    alt_names = leaf.extensions.get_extension_for_class(x509.SubjectAlternativeName)
    dns_names = alt_names.value.get_values_for_type(x509.DNSName)
    assert dns_names == [u"test-1.example.org", u"test-2.example.org"]
def __load_cert_from_file(filename):
    """Return the public key of the PEM certificate stored in ``filename``."""
    with open(filename, "br") as pem_file:
        certificate = load_pem_x509_certificate(pem_file.read(), default_backend())
    return certificate.public_key()
def extract_rsa_key_from_x509_cert(pem):
    """Return the public key of a PEM-encoded certificate.

    Uses the module-level ``backend``.
    """
    certificate = load_pem_x509_certificate(pem, backend)
    public_key = certificate.public_key()
    return public_key
def from_primitive(cls, pem_string, session=None):
    """Build an instance from a PEM-encoded X509 certificate string.

    :param pem_string: PEM certificate; converted with ``utils.SmartStr``
    :param session: passed to the ``cls`` constructor
    :returns: whatever ``from_raw_key`` returns for the parsed certificate
    :raises CipherError: if the certificate cannot be parsed
    """
    result = cls(session=session)
    try:
        return result.from_raw_key(x509.load_pem_x509_certificate(
            utils.SmartStr(pem_string), backend=openssl.backend))
    except (TypeError, ValueError, exceptions.UnsupportedAlgorithm) as e:
        raise CipherError("X509 Certificate invalid: %s" % e)
    # The former trailing `return result` was unreachable: the try branch
    # returns and the except branch raises.
def _match_certificate(matcher):
    """Build a matcher that applies ``matcher`` to a parsed certificate.

    The result matches anything that is not a ``Certificate`` instance, or
    anything whose ``as_bytes()`` output parses as a PEM certificate that
    satisfies ``matcher``.
    """
    not_a_certificate = Not(IsInstance(Certificate))
    parsed_matches = AfterPreprocessing(
        lambda c: x509.load_pem_x509_certificate(
            c.as_bytes(), default_backend()),
        matcher)
    return MatchesAny(not_a_certificate, parsed_matches)
def load(cls, data=None, filename=None):
    """Create a ``Certificate`` from PEM bytes or from a PEM file.

    When ``filename`` is given its content replaces ``data``.
    """
    if filename is not None:
        with open(filename, 'rb') as pem_file:
            data = pem_file.read()
    parsed = x509.load_pem_x509_certificate(data, default_backend())
    return Certificate(parsed)
def test_self_signed_cert(self):
    """A self-signed cert has the requested common name as subject and issuer."""
    private_key = PrivateKey.create()
    cert = private_key.self_signed_cert({'common_name': u'jose'})

    assert isinstance(cert, Certificate)
    assert cert.subject.attribs['common_name'] == u'jose'
    assert cert.issuer.attribs['common_name'] == u'jose'

    # Re-parse the dumped certificate with cryptography and check again.
    parsed = x509.load_pem_x509_certificate(cert.dump(), default_backend())
    subject_names = parsed.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
    assert subject_names[0].value == u'jose'
    issuer_names = parsed.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)
    assert issuer_names[0].value == u'jose'
def loadPEMCert(pathToCert):
    """Read the file at ``pathToCert`` and parse it as a PEM certificate."""
    with open(pathToCert, 'rb') as pem_file:
        pem_bytes = pem_file.read()
    return x509.load_pem_x509_certificate(pem_bytes, default_backend())
def __string_to_cert(s, pkg_hash=None):
    """Convert a string to a X509 cert.

    's' is the PEM text of the certificate.

    'pkg_hash' is an optional file hash; when given it is reported in the
    error message instead of the string itself.

    Raises api_errors.BadFileFormat if 's' cannot be parsed as PEM."""

    try:
        return x509.load_pem_x509_certificate(
            misc.force_bytes(s), default_backend())
    except ValueError:
        if pkg_hash is not None:
            raise api_errors.BadFileFormat(_("The file "
                "with hash {0} was expected to be a PEM "
                "certificate but it could not be "
                "read.").format(pkg_hash))
        # format() is applied to the translated template, as in the branch
        # above, so the message lookup uses the constant string.
        raise api_errors.BadFileFormat(_("The following string "
            "was expected to be a PEM certificate, but it "
            "could not be parsed as such:\n{0}").format(s))
def __get_certs_by_name(self, name):
    """Given 'name', a Cryptography 'Name' object, return the certs
    with that name as a subject.

    Files named '<sha1 of name>.<n>' under the subject root are loaded
    for n = 0, 1, ... until one cannot be opened; the certificates held
    in the issuers mapping for the same hash are appended."""

    certs = []
    index = 0
    name_hsh = hashlib.sha1(misc.force_bytes(name)).hexdigest()

    def read_cert(cert_path):
        with open(cert_path, "rb") as fobj:
            return x509.load_pem_x509_certificate(
                fobj.read(), default_backend())

    try:
        while True:
            pth = os.path.join(self.__subj_root,
                "{0}.{1}".format(name_hsh, index))
            certs.append(read_cert(pth))
            index += 1
    except EnvironmentError as e:
        # A change of hash algorithm changes the file names, so the
        # very first lookup may miss.  In that case rebuild the
        # subject metadata once and retry the same path.
        if index == 0 and e.errno == errno.ENOENT:
            self.__rebuild_subj_root()
            try:
                certs.append(read_cert(pth))
            except EnvironmentError as ex:
                if ex.errno != errno.ENOENT:
                    raise

        # NOTE(review): presumably ENOENT is filtered out by the
        # second argument - confirm against _convert_error.
        t = api_errors._convert_error(e, [errno.ENOENT])
        if t:
            raise t
    certs.extend(self.__issuers.get(name_hsh, []))
    return certs
def calc_pem_hash(pth):
    """Return the SHA1 hex digest of the re-serialized PEM form of the
    certificate stored in file 'pth'."""

    with open(pth, "rb") as fobj:
        certificate = x509.load_pem_x509_certificate(
            fobj.read(), default_backend())
    pem_bytes = certificate.public_bytes(serialization.Encoding.PEM)
    return hashlib.sha1(pem_bytes).hexdigest()
def __make_tmp_cert(d, pth):
    """Write the PEM form of the certificate in file 'pth' to a new
    temporary file in directory 'd' and return that file's path.

    Raises api_errors.BadFileFormat if 'pth' cannot be read or parsed."""

    try:
        with open(pth, "rb") as src:
            certificate = x509.load_pem_x509_certificate(src.read(),
                default_backend())
    except (ValueError, IOError):
        raise api_errors.BadFileFormat(_("The file {0} was expected to "
            "be a PEM certificate but it could not be read.").format(
            pth))

    tmp_fd, tmp_path = tempfile.mkstemp(dir=d)
    pem_bytes = certificate.public_bytes(serialization.Encoding.PEM)
    with os.fdopen(tmp_fd, "wb") as dst:
        dst.write(pem_bytes)
    return tmp_path
def getCertFromBytes(pem_data):
    """Parse PEM bytes into a certificate using the module-level ``backend``."""
    certificate = x509.load_pem_x509_certificate(pem_data, backend)
    return certificate
def _load_from_string(cls, certificate):
    """
    Load a certificate from a string and register its public key.

    The key is appended to ``cls._public_keys`` and ``cls._key_age`` is
    set to the current time.

    Args:
        certificate (str): A base64 PEM encoded certificate

    Raises:
        ImproperlyConfigured: if the certificate cannot be parsed
    """
    pem_bytes = certificate.encode()
    try:
        parsed = load_pem_x509_certificate(pem_bytes, backend)
        cls._public_keys.append(parsed.public_key())
        cls._key_age = datetime.now()
    except ValueError:
        raise ImproperlyConfigured("Invalid ADFS token signing certificate")
def test_generate_wildcard_pem_bytes():
    """
    The PEM data generated for a self-signed wildcard certificate should
    deserialize, and the resulting certificate should carry the expected
    parameters.
    """
    generated = generate_wildcard_pem_bytes()

    # The concatenated bytes hold exactly two PEM objects.
    parts = pem.parse(generated)
    assert_that(parts, HasLength(2))

    # First object: the private key. Only its type is checked here (the
    # other details we trust txacme with).
    private_key = serialization.load_pem_private_key(
        parts[0].as_bytes(),
        password=None,
        backend=default_backend()
    )
    assert_that(private_key, IsInstance(rsa.RSAPrivateKey))

    # Second object: the certificate, with every option we set.
    certificate = x509.load_pem_x509_certificate(
        parts[1].as_bytes(),
        backend=default_backend()
    )
    expected_before = datetime.today() - timedelta(days=1)
    expected_after = datetime.now() + timedelta(days=3650)
    wildcard_name = MatchesListwise([MatchesStructure(value=Equals(u'*'))])
    assert_that(certificate, MatchesStructure(
        issuer=wildcard_name,
        subject=wildcard_name,
        not_valid_before=matches_time_or_just_before(expected_before),
        not_valid_after=matches_time_or_just_before(expected_after),
        signature_hash_algorithm=IsInstance(hashes.SHA256)
    ))
    assert_that(certificate.public_key().public_numbers(), Equals(
        private_key.public_key().public_numbers()))


# From txacme
def verify_and_load_message(config, file, only_accept_cn=None):
    '''Checks the message's signature to make sure it was signed by
    someone in our CA chain

    :param config: object providing ``ssl_cafile`` and ``logger``
    :param file: path of the S/MIME message to verify
    :param only_accept_cn: when not None, the signer's common name must
        equal this value
    :returns: the loaded ``IngestMessage``, or None when openssl rejects
        the message or the signer's common name is not accepted
    '''
    # We need a temporary file to get the signer PEM. It is created before
    # the try block so the cleanup below always has a path to remove.
    msg_fd, signer_pem = tempfile.mkstemp()
    try:
        os.close(msg_fd)  # Don't need to write anything to it

        # CApath is required to force out any system CAs that might be in
        # the global CNF file
        ossl_verify_cmd = ["openssl", "smime", "-verify", "-in", file,
                           "-CAfile", config.ssl_cafile,
                           "-CApath", "/dev/nonexistant-dir",
                           "-text", "-signer", signer_pem]

        ossl_verify_proc = subprocess.run(
            args=ossl_verify_cmd, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, check=False)

        if ossl_verify_proc.returncode != 0:
            config.logger.warn(
                "rejecting %s: %s", file, str(ossl_verify_proc.stderr))
            return None

        config.logger.info("%s passed openssl S/MIME verify", file)
        config.logger.debug("checking %s", signer_pem)

        with open(signer_pem, 'rb') as x509_signer:
            # NOW we can use cryptography to read the x509 certificates
            cert = x509.load_pem_x509_certificate(
                x509_signer.read(), default_backend())

        common_name = cert.subject.get_attributes_for_oid(
            x509.NameOID.COMMON_NAME)[0].value
        config.logger.debug(
            "signed message came from common name: %s", common_name)

        # Make sure the CN is something we're willing to accept
        if only_accept_cn is not None and common_name != only_accept_cn:
            config.logger.error("rejecting message due to CN %s != %s",
                                common_name, only_accept_cn)
            # Actually reject: previously the mismatch was only logged and
            # the message was loaded and returned anyway.
            return None

        # Read it in
        decoded_message = ossl_verify_proc.stdout
        message = IngestMessage(config)
        message.load_from_yaml(decoded_message)
        return message
    finally:
        # and clean up after ourselves
        os.remove(signer_pem)
def check_certificate(self, domain):
    """
    Download and get information from the TLS certificate

    The certificate is fetched from port 443 of ``domain``. When
    ``self.output`` is set, the PEM is also saved as ``cert.pem`` in that
    directory. Details are written to ``self.log``.
    """
    pem = ssl.get_server_certificate((domain, 443))
    # get_server_certificate() returns text on Python 3, while both the
    # binary file below and the PEM loader need bytes.
    pem_bytes = pem if isinstance(pem, bytes) else pem.encode('ascii')
    if self.output:
        with open(os.path.join(self.output, 'cert.pem'), 'wb') as f:
            f.write(pem_bytes)
    cert = x509.load_pem_x509_certificate(pem_bytes, default_backend())
    self.log.critical("\tCertificate:")
    self.log.critical("\t\tDomain: %s", ",".join(attr.value for attr in cert.subject))
    self.log.critical("\t\tNot After: %s", str(cert.not_valid_after))
    self.log.critical("\t\tNot Before: %s", str(cert.not_valid_before))
    self.log.critical("\t\tCA Issuer: %s", ", ".join(attr.value for attr in cert.issuer))
    self.log.critical("\t\tSerial: %s", cert.serial_number)
    for ext in cert.extensions:
        # Dispatch on the extension value type rather than the private
        # ``oid._name`` attribute.
        if isinstance(ext.value, x509.BasicConstraints):
            if ext.value.ca:
                self.log.critical("\t\tBasic Constraints: True")
        elif isinstance(ext.value, x509.SubjectAlternativeName):
            self.log.critical("\t\tAlternate names: %s",
                              ", ".join(ext.value.get_values_for_type(x509.DNSName)))
def issue_certificate(cn, ca_cert, ca_key, organizations=(), san_dns=(), san_ips=(), key_size=2048,
                      certify_days=365, is_web_server=False, is_web_client=False):
    """Issue a certificate signed by the given CA, with a fresh RSA key.

    :param cn: common name of the new certificate's subject
    :param ca_cert: PEM-encoded CA certificate
    :param ca_key: PEM-encoded, unencrypted CA private key
    :param organizations: organization names added to the subject
    :param san_dns: DNS names for the SubjectAlternativeName extension
    :param san_ips: IP addresses for the SubjectAlternativeName extension
    :param key_size: size in bits of the generated RSA key
    :param certify_days: validity period in days, starting now (UTC)
    :param is_web_server: add the serverAuth extended key usage
    :param is_web_client: add the clientAuth extended key usage
    :returns: tuple ``(cert_pem, key_pem)`` of PEM-encoded bytes; the key
        is in TraditionalOpenSSL format without encryption
    """
    ca_cert = x509.load_pem_x509_certificate(ca_cert, default_backend())
    ca_key = serialization.load_pem_private_key(ca_key, password=None, backend=default_backend())
    ca_key_id = x509.SubjectKeyIdentifier.from_public_key(ca_key.public_key())

    key = rsa.generate_private_key(public_exponent=65537, key_size=key_size, backend=default_backend())

    subject_name_attributes = [x509.NameAttribute(NameOID.COMMON_NAME, cn)]
    subject_name_attributes += [x509.NameAttribute(NameOID.ORGANIZATION_NAME, org)
                                for org in organizations]
    subject = x509.Name(subject_name_attributes)

    now = datetime.datetime.utcnow()
    # The issuer of the new certificate is the CA's *subject*. Using the
    # CA's issuer is only correct for a self-signed CA and breaks chain
    # building for an intermediate CA. The AuthorityKeyIdentifier, on the
    # other hand, identifies the CA certificate by its own issuer + serial.
    cert = x509.CertificateBuilder() \
        .subject_name(subject) \
        .issuer_name(ca_cert.subject) \
        .public_key(key.public_key()) \
        .serial_number(x509.random_serial_number()) \
        .not_valid_before(now) \
        .not_valid_after(now + datetime.timedelta(days=certify_days)) \
        .add_extension(x509.AuthorityKeyIdentifier(ca_key_id.digest,
                                                   [x509.DirectoryName(ca_cert.issuer)],
                                                   ca_cert.serial_number),
                       critical=False) \
        .add_extension(x509.KeyUsage(digital_signature=True,
                                     content_commitment=False,
                                     key_encipherment=True,
                                     data_encipherment=False,
                                     key_agreement=False,
                                     key_cert_sign=False,
                                     crl_sign=False,
                                     encipher_only=False,
                                     decipher_only=False),
                       critical=True)

    extended_usages = []
    if is_web_server:
        extended_usages.append(ExtendedKeyUsageOID.SERVER_AUTH)
    if is_web_client:
        extended_usages.append(ExtendedKeyUsageOID.CLIENT_AUTH)
    if extended_usages:
        cert = cert.add_extension(x509.ExtendedKeyUsage(extended_usages), critical=False)

    sans = [x509.DNSName(name) for name in san_dns]
    sans += [x509.IPAddress(ipaddress.ip_address(ip)) for ip in san_ips]
    if sans:
        cert = cert.add_extension(x509.SubjectAlternativeName(sans), critical=False)

    cert = cert.sign(ca_key, hashes.SHA256(), default_backend())

    cert = cert.public_bytes(serialization.Encoding.PEM)
    key = key.private_bytes(encoding=serialization.Encoding.PEM,
                            format=serialization.PrivateFormat.TraditionalOpenSSL,
                            encryption_algorithm=serialization.NoEncryption())
    return cert, key