我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用cryptography.x509.KeyUsage()。
def _decode_key_usage(backend, bit_string): bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string) bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free) get_bit = backend._lib.ASN1_BIT_STRING_get_bit digital_signature = get_bit(bit_string, 0) == 1 content_commitment = get_bit(bit_string, 1) == 1 key_encipherment = get_bit(bit_string, 2) == 1 data_encipherment = get_bit(bit_string, 3) == 1 key_agreement = get_bit(bit_string, 4) == 1 key_cert_sign = get_bit(bit_string, 5) == 1 crl_sign = get_bit(bit_string, 6) == 1 encipher_only = get_bit(bit_string, 7) == 1 decipher_only = get_bit(bit_string, 8) == 1 return x509.KeyUsage( digital_signature, content_commitment, key_encipherment, data_encipherment, key_agreement, key_cert_sign, crl_sign, encipher_only, decipher_only )
def check_ca(cert): """Check if 'cert' is a proper CA. For this the BasicConstraints need to identify it as a CA cert and it needs to have the CertSign (key_cert_sign in Cryptography) KeyUsage flag. Based loosely on OpenSSL's check_ca()""" from cryptography import x509 bconst_ca = None kuse_sign = None for e in cert.extensions: if isinstance(e.value, x509.BasicConstraints): bconst_ca = e.value.ca elif isinstance(e.value, x509.KeyUsage): kuse_sign = e.value.key_cert_sign return kuse_sign is not False and bconst_ca
def create_ca_certificate(cn, key_size=4096, certify_days=365): key = rsa.generate_private_key(public_exponent=65537, key_size=key_size, backend=default_backend()) key_id = x509.SubjectKeyIdentifier.from_public_key(key.public_key()) subject = issuer = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)]) now = datetime.datetime.utcnow() serial = x509.random_serial_number() cert = x509.CertificateBuilder() \ .subject_name(subject) \ .issuer_name(issuer) \ .public_key(key.public_key()) \ .serial_number(serial) \ .not_valid_before(now) \ .not_valid_after(now + datetime.timedelta(days=certify_days)) \ .add_extension(key_id, critical=False) \ .add_extension(x509.AuthorityKeyIdentifier(key_id.digest, [x509.DirectoryName(issuer)], serial), critical=False) \ .add_extension(x509.BasicConstraints(ca=True, path_length=0), critical=True) \ .add_extension(x509.KeyUsage(digital_signature=True, content_commitment=False, key_encipherment=False, data_encipherment=False, key_agreement=False, key_cert_sign=True, crl_sign=True, encipher_only=False, decipher_only=False), critical=True) \ .sign(key, hashes.SHA256(), default_backend()) cert = cert.public_bytes(serialization.Encoding.PEM) key = key.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption()) return cert, key
def sign_certificate_request(csr_file, crt_file, ca_crt, ca_pkey): with open(csr_file, 'rb') as f: csr = x509.load_pem_x509_csr(data=f.read(), backend=default_backend()) crt = x509.CertificateBuilder().subject_name( csr.subject ).issuer_name( ca_crt.subject ).public_key( csr.public_key() ).serial_number( uuid.uuid4().int # pylint: disable=no-member ).not_valid_before( datetime.datetime.utcnow() ).not_valid_after( datetime.datetime.utcnow() + datetime.timedelta(days=365 * 10) ).add_extension( extension=x509.KeyUsage( digital_signature=True, key_encipherment=True, content_commitment=True, data_encipherment=False, key_agreement=False, encipher_only=False, decipher_only=False, key_cert_sign=False, crl_sign=False ), critical=True ).add_extension( extension=x509.BasicConstraints(ca=False, path_length=None), critical=True ).add_extension( extension=x509.AuthorityKeyIdentifier.from_issuer_public_key(ca_pkey.public_key()), critical=False ).sign( private_key=ca_pkey, algorithm=hashes.SHA256(), backend=default_backend() ) with open(crt_file, 'wb') as f: f.write(crt.public_bytes(encoding=serialization.Encoding.PEM))
def generate_ca_cert(self, cn, ou, o, expire_period=None, password=None): """CA ??? ?? Peer ??? ?? ?? ???(ECC Key) :param cn: ?? CommonName :param ou: ?? OrganizationalUnitName :param o: ?? OrganizationName :param expire_period: ??? ????(year) :param password: ??? ??? ????(8?? ??) """ sign_pri_key = ec.generate_private_key(ec.SECP256K1(), default_backend()) sign_pub_key = sign_pri_key.public_key() subject_name = x509.Name([ x509.NameAttribute(NameOID.COMMON_NAME, cn), x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, ou), x509.NameAttribute(NameOID.ORGANIZATION_NAME, o), x509.NameAttribute(NameOID.COUNTRY_NAME, "kr") ]) serial_number = self.__LAST_CA_INDEX + 1 key_usage = x509.KeyUsage(digital_signature=True, content_commitment=False, key_encipherment=True, data_encipherment=False, key_agreement=False, key_cert_sign=True, crl_sign=False, encipher_only=False, decipher_only=False) if expire_period is None: expire_period = self.__ca_expired new_cert = self.__generate_cert(pub_key=sign_pub_key, subject_name=subject_name, issuer_name=subject_name, serial_number=serial_number, expire_period=expire_period, key_usage=key_usage, issuer_priv=sign_pri_key) cert_pem = new_cert.public_bytes(encoding=serialization.Encoding.PEM) if password is None: pri_pem = sign_pri_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) else: pri_pem = sign_pri_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.BestAvailableEncryption(password=password) ) self.__save(self.__CA_PATH, cert_pem, pri_pem) self.__LAST_CA_INDEX += 1 self.__show_certificate(new_cert)
def generate_peer_cert(self, cn, password=None): """Peer ??? ?? ???/???? ???(ECC Key), ????? 1? :param cn: ?? CommonName :param password: ??? ??? ????(8?? ??) """ pri_key = ec.generate_private_key(ec.SECP256K1(), default_backend()) pub_key = pri_key.public_key() issuer_name = self.__ca_cert.issuer ou = issuer_name.get_attributes_for_oid(NameOID.ORGANIZATIONAL_UNIT_NAME)[0].value o = issuer_name.get_attributes_for_oid(NameOID.ORGANIZATION_NAME)[0].value subject_name = x509.Name([ x509.NameAttribute(NameOID.COMMON_NAME, cn), x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, ou), x509.NameAttribute(NameOID.ORGANIZATION_NAME, o), x509.NameAttribute(NameOID.COUNTRY_NAME, "kr") ]) expire_period = self.__peer_expired serial_number = self.__LAST_PEER_INDEX + 1 key_usage = x509.KeyUsage(digital_signature=True, content_commitment=False, key_encipherment=True, data_encipherment=False, key_agreement=False, key_cert_sign=False, crl_sign=False, encipher_only=False, decipher_only=False) new_cert = self.__generate_cert(pub_key=pub_key, subject_name=subject_name, issuer_name=issuer_name, serial_number=serial_number, expire_period=expire_period, key_usage=key_usage, issuer_priv=self.__ca_pri, issuer_cert=self.__ca_cert) cert_pem = new_cert.public_bytes(encoding=serialization.Encoding.PEM) if password is None: pri_pem = pri_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) else: pri_pem = pri_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.BestAvailableEncryption(password=password) ) ca_cert_pem = self.__ca_cert.public_bytes(encoding=serialization.Encoding.PEM) peer_path = join(self.__DEFAULT_PATH, cn) self.__save(peer_path, cert_bytes=cert_pem, pri_bytes=pri_pem, ca_cert=ca_cert_pem) # ???? ?? self.__load_peer_certificate(cert_bytes=cert_pem) self.__show_certificate(new_cert)
def issue_certificate(cn, ca_cert, ca_key, organizations=(), san_dns=(), san_ips=(), key_size=2048, certify_days=365, is_web_server=False, is_web_client=False): ca_cert = x509.load_pem_x509_certificate(ca_cert, default_backend()) ca_key = serialization.load_pem_private_key(ca_key, password=None, backend=default_backend()) ca_key_id = x509.SubjectKeyIdentifier.from_public_key(ca_key.public_key()) key = rsa.generate_private_key(public_exponent=65537, key_size=key_size, backend=default_backend()) subject_name_attributes = [x509.NameAttribute(NameOID.COMMON_NAME, cn)] subject_name_attributes += [x509.NameAttribute(NameOID.ORGANIZATION_NAME, org) for org in organizations] subject = x509.Name(subject_name_attributes) now = datetime.datetime.utcnow() cert = x509.CertificateBuilder() \ .subject_name(subject) \ .issuer_name(ca_cert.issuer) \ .public_key(key.public_key()) \ .serial_number(x509.random_serial_number()) \ .not_valid_before(now) \ .not_valid_after(now + datetime.timedelta(days=certify_days)) \ .add_extension(x509.AuthorityKeyIdentifier(ca_key_id.digest, [x509.DirectoryName(ca_cert.issuer)], ca_cert.serial_number), critical=False) \ .add_extension(x509.KeyUsage(digital_signature=True, content_commitment=False, key_encipherment=True, data_encipherment=False, key_agreement=False, key_cert_sign=False, crl_sign=False, encipher_only=False, decipher_only=False), critical=True) extended_usages = [] if is_web_server: extended_usages.append(ExtendedKeyUsageOID.SERVER_AUTH) if is_web_client: extended_usages.append(ExtendedKeyUsageOID.CLIENT_AUTH) if extended_usages: cert = cert.add_extension(x509.ExtendedKeyUsage(extended_usages), critical=False) sans = [x509.DNSName(name) for name in san_dns] sans += [x509.IPAddress(ipaddress.ip_address(ip)) for ip in san_ips] if sans: cert = cert.add_extension(x509.SubjectAlternativeName(sans), critical=False) cert = cert.sign(ca_key, hashes.SHA256(), default_backend()) cert = cert.public_bytes(serialization.Encoding.PEM) key = key.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption()) return cert, key
def load_or_create_ca_certificate(crt_file, subject, pkey): """ Load a CA certificate or create a self-signed one """ if os.path.isfile(crt_file): with open(crt_file, 'rb') as f: crt = x509.load_pem_x509_certificate( data=f.read(), backend=default_backend() ) else: issuer = subject crt = x509.CertificateBuilder().subject_name( subject ).issuer_name( issuer ).public_key( pkey.public_key() ).serial_number( uuid.uuid4().int # pylint: disable=no-member ).not_valid_before( datetime.datetime.utcnow() ).not_valid_after( datetime.datetime.utcnow() + datetime.timedelta(days=365 * 10) ).add_extension( extension=x509.KeyUsage( digital_signature=True, key_encipherment=True, key_cert_sign=True, crl_sign=True, content_commitment=True, data_encipherment=False, key_agreement=False, encipher_only=False, decipher_only=False ), critical=True ).add_extension( extension=x509.BasicConstraints(ca=True, path_length=0), critical=True ).add_extension( extension=x509.SubjectKeyIdentifier.from_public_key(pkey.public_key()), critical=True ).add_extension( extension=x509.AuthorityKeyIdentifier.from_issuer_public_key(pkey.public_key()), critical=True ).sign( private_key=pkey, algorithm=hashes.SHA256(), backend=default_backend() ) with open(crt_file, 'wb') as f: f.write(crt.public_bytes(encoding=serialization.Encoding.PEM)) return crt
def __check_extensions(self, cert, usages, cur_pathlen): """Check whether the critical extensions in this certificate are supported and allow the provided use(s).""" try: exts = cert.extensions except (ValueError, x509.UnsupportedExtension) as e: raise api_errors.InvalidCertificateExtensions( cert, e) for ext in exts: etype = type(ext.value) if etype in SUPPORTED_EXTENSIONS: keys = EXTENSIONS_VALUES[etype] if etype == x509.BasicConstraints: pathlen = ext.value.path_length if pathlen is not None and \ cur_pathlen > pathlen: raise api_errors.PathlenTooShort(cert, cur_pathlen, pathlen) elif etype == x509.KeyUsage: keys = list(EXTENSIONS_VALUES[etype]) if not getattr(ext.value, "key_agreement"): # Cryptography error: # encipher_only/decipher_only is # undefined unless key_agreement # is true keys.remove("encipher_only") keys.remove("decipher_only") vs = [ key for key in keys if getattr(ext.value, key) ] # For each use, check to see whether it's # permitted by the certificate's extension # values. if etype not in usages: continue for u in usages[etype]: if u not in vs: raise api_errors.InappropriateCertificateUse( cert, ext, u, ", ".join(vs)) # If the extension name is unrecognized and critical, # then the chain cannot be verified. elif ext.critical: raise api_errors.UnsupportedCriticalExtension( cert, ext)