我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用cryptography.hazmat.primitives.serialization.BestAvailableEncryption()。
def test_rsa(self): from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives import serialization private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) private_key_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.BestAvailableEncryption(passphrase) ) public_key_pem = private_key.public_key().public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) url = 'http://example.com/path?query#fragment' auth = HTTPSignatureAuth(algorithm="rsa-sha256", key=private_key_pem, key_id="sekret", passphrase=passphrase) self.session.get(url, auth=auth, headers=dict(pubkey=base64.b64encode(public_key_pem)))
def generate_keys(self): private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) private_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.BestAvailableEncryption(bytes(CloudLinkSettings.PRIVATE_KEY_PASSPHRASE)) ) public_pem = private_key.public_key().public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) with open(CloudLinkSettings.PRIVATE_KEY_LOCATION, 'w') as key_file: key_file.write(str(private_pem)) with open(CloudLinkSettings.PUBLIC_KEY_LOCATION, 'w') as key_file: key_file.write(str(public_pem))
def __save(self, private_key, file_name): if not isinstance(private_key, RSAPrivateKey): raise TypeError("private_key must be an instance of RSAPrivateKey, " "actual: {}".format(type(private_key).__name__)) pem = private_key.private_bytes( serialization.Encoding.PEM, serialization.PrivateFormat.PKCS8, serialization.BestAvailableEncryption(self.__key_store_passphrase)) file_path = os.path.join(self.key_store_path, file_name) if not os.path.isdir(self.key_store_path): os.makedirs(self.key_store_path) if os.path.isfile(file_path): msg = "Save failed: A key with name [{}] already exists".format( file_name) raise IOError(msg) with open(file_path, 'w') as f: f.write(pem.decode(encoding='utf8'))
def set_key(self): password = settings.ACCOUNT_KEY_PASSWORD.encode() key = rsa.generate_private_key(public_exponent=65537, key_size=settings.BITS, backend=default_backend()) self.key = key.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.BestAvailableEncryption(password))
def export(self, password: str): return self._hazmat_private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.BestAvailableEncryption(password.encode('utf-8')) )
def main(argv): if len(argv) > 0: password = argv[0].encode() else: password = b'test' private_key = ec.generate_private_key(ec.SECP256K1(), default_backend()) serialized_private = private_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.BestAvailableEncryption(password) ) serialized_public = private_key.public_key().public_bytes( encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo ) private_key = serialization.load_der_private_key(serialized_private, password, default_backend()) with open('resources/default_pki/private.der', 'wb') as private_file: private_file.write(serialized_private) with open('resources/default_pki/public.der', 'wb') as public_file: public_file.write(serialized_public) print('save pki in : resources/default_pki/')
def ask_for_password_or_no_crypto(key_path): # we can't use prompt's "default" and "value_proc" arguments because we monkeypatch prompt in test_wile.py password = click.prompt('(optional) Password for %s' % key_path, default='', hide_input=True, confirmation_prompt=True, show_default=False) if password: return serialization.BestAvailableEncryption(password.encode('utf-8')) else: return serialization.NoEncryption()
def gen(bits=1024, password=None): """ ??SSH RSA??? :?? bits: ?????????: 1024 :?? password: ?????????: None (???) :??: RSA??, RSA?? """ rsaKey = rsa.generate_private_key( backend = default_backend(), public_exponent = 65537, key_size = bits ) privateKey = rsaKey.private_bytes( encoding = serialization.Encoding.PEM, format = serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm = password and serialization.BestAvailableEncryption(str(password).encode("utf-8")) or serialization.NoEncryption() ).decode("utf-8") publicKey = rsaKey.public_key().public_bytes( encoding = serialization.Encoding.OpenSSH, format = serialization.PublicFormat.OpenSSH ).decode("utf-8") return privateKey, publicKey
def _write_private_key(self, f, key, format, password=None): if password is None: encryption = serialization.NoEncryption() else: encryption = serialization.BestAvailableEncryption(b(password)) f.write(key.private_bytes( serialization.Encoding.PEM, format, encryption ).decode())
def keygen(pub_key, priv_key, user_id, priv_key_password=None): """Generate new private key and certificate RSA_SHA512_4096""" # Generate our key key = rsa.generate_private_key(public_exponent=65537, key_size=4096, backend=default_backend()) if priv_key_password: ea = serialization.BestAvailableEncryption(priv_key_password) else: ea = serialization.NoEncryption() # Write our key to disk for safe keeping with open(priv_key, "wb") as f: f.write(key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=ea, )) # Various details about who we are. For a self-signed certificate the # subject and issuer are always the same. subject = issuer = x509.Name([ x509.NameAttribute(NameOID.COUNTRY_NAME, "XX"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "XX"), x509.NameAttribute(NameOID.LOCALITY_NAME, "XX"), x509.NameAttribute(NameOID.ORGANIZATION_NAME, "I2P Anonymous Network"), x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "I2P"), x509.NameAttribute(NameOID.COMMON_NAME, user_id), ]) cert = x509.CertificateBuilder() \ .subject_name(subject) \ .issuer_name(issuer) \ .public_key(key.public_key()) \ .not_valid_before(datetime.datetime.utcnow()) \ .not_valid_after( datetime.datetime.utcnow() + datetime.timedelta(days=365*10) ) \ .serial_number(random.randrange(1000000000, 2000000000)) \ .add_extension( x509.SubjectKeyIdentifier.from_public_key(key.public_key()), critical=False, ).sign(key, hashes.SHA512(), default_backend()) with open(pub_key, "wb") as f: f.write(cert.public_bytes(serialization.Encoding.PEM))
def _private_key_bytes(self, encoding, format, encryption_algorithm, traditional_write_func, evp_pkey, cdata): if not isinstance(encoding, serialization.Encoding): raise TypeError("encoding must be an item from the Encoding enum") if not isinstance(format, serialization.PrivateFormat): raise TypeError( "format must be an item from the PrivateFormat enum" ) # This is a temporary check until we land DER serialization. if encoding is not serialization.Encoding.PEM: raise ValueError("Only PEM encoding is supported by this backend") if format is serialization.PrivateFormat.PKCS8: write_bio = self._lib.PEM_write_bio_PKCS8PrivateKey key = evp_pkey elif format is serialization.PrivateFormat.TraditionalOpenSSL: write_bio = traditional_write_func key = cdata if not isinstance(encryption_algorithm, serialization.KeySerializationEncryption): raise TypeError( "Encryption algorithm must be a KeySerializationEncryption " "instance" ) if isinstance(encryption_algorithm, serialization.NoEncryption): password = b"" passlen = 0 evp_cipher = self._ffi.NULL elif isinstance(encryption_algorithm, serialization.BestAvailableEncryption): # This is a curated value that we will update over time. evp_cipher = self._lib.EVP_get_cipherbyname( b"aes-256-cbc" ) password = encryption_algorithm.password passlen = len(password) if passlen > 1023: raise ValueError( "Passwords longer than 1023 bytes are not supported by " "this backend" ) else: raise ValueError("Unsupported encryption type") bio = self._create_mem_bio() res = write_bio( bio, key, evp_cipher, password, passlen, self._ffi.NULL, self._ffi.NULL ) assert res == 1 return self._read_mem_bio(bio)
def test_ecc_key(self): """ ECC ??? ???? ??? ??, ECDSA ??/?? ??? """ logging.debug("----- ECDSA Test Start -----") # ?? ?? pri_key = ec.generate_private_key(ec.SECP256K1(), default_backend()) pub_key = pri_key.public_key() pri_der = pri_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, # encryption_algorithm=serialization.NoEncryption() encryption_algorithm=serialization.BestAvailableEncryption(password=b'qwer1234') ) pub_der = pub_key.public_bytes( encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo ) pri_b64 = base64.b64encode(pri_der, altchars=None) pub_b64 = base64.b64encode(pub_der, altchars=None) logging.debug("Private Key : \n%s", pri_b64) logging.debug("Public Key : \n%s", pub_b64) # ??? ?? cert = self._generate_cert(pub_key=pub_key, issuer_key=pri_key, subject_name="test") cert_key = cert.public_key() # ECDSA ?? ?? ? ?? ??? data = b"test" signature = self._generate_sign(pri_key=pri_key, data=data) sign_b64 = base64.b64encode(signature, altchars=None) logging.debug("Sign : %s", sign_b64) validation_result = self._verify_signature(pub_key=cert_key, data=data, signature=signature) logging.debug("Verify : %s", validation_result) self.assertEqual(validation_result, True) # ECDSA ??? ???? ?? ?? signature = pri_key.sign( data, ec.ECDSA(hashes.SHA256()) ) validation_result = self._verify_signature(pub_key=cert_key, data=data, signature=signature) logging.debug("----- ECDSA Test End -----\n") self.assertTrue(validation_result)
def generate_ca_cert(self, cn, ou, o, expire_period=None, password=None): """CA ??? ?? Peer ??? ?? ?? ???(ECC Key) :param cn: ?? CommonName :param ou: ?? OrganizationalUnitName :param o: ?? OrganizationName :param expire_period: ??? ????(year) :param password: ??? ??? ????(8?? ??) """ sign_pri_key = ec.generate_private_key(ec.SECP256K1(), default_backend()) sign_pub_key = sign_pri_key.public_key() subject_name = x509.Name([ x509.NameAttribute(NameOID.COMMON_NAME, cn), x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, ou), x509.NameAttribute(NameOID.ORGANIZATION_NAME, o), x509.NameAttribute(NameOID.COUNTRY_NAME, "kr") ]) serial_number = self.__LAST_CA_INDEX + 1 key_usage = x509.KeyUsage(digital_signature=True, content_commitment=False, key_encipherment=True, data_encipherment=False, key_agreement=False, key_cert_sign=True, crl_sign=False, encipher_only=False, decipher_only=False) if expire_period is None: expire_period = self.__ca_expired new_cert = self.__generate_cert(pub_key=sign_pub_key, subject_name=subject_name, issuer_name=subject_name, serial_number=serial_number, expire_period=expire_period, key_usage=key_usage, issuer_priv=sign_pri_key) cert_pem = new_cert.public_bytes(encoding=serialization.Encoding.PEM) if password is None: pri_pem = sign_pri_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) else: pri_pem = sign_pri_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.BestAvailableEncryption(password=password) ) self.__save(self.__CA_PATH, cert_pem, pri_pem) self.__LAST_CA_INDEX += 1 self.__show_certificate(new_cert)
def generate_peer_cert(self, cn, password=None): """Peer ??? ?? ???/???? ???(ECC Key), ????? 1? :param cn: ?? CommonName :param password: ??? ??? ????(8?? ??) """ pri_key = ec.generate_private_key(ec.SECP256K1(), default_backend()) pub_key = pri_key.public_key() issuer_name = self.__ca_cert.issuer ou = issuer_name.get_attributes_for_oid(NameOID.ORGANIZATIONAL_UNIT_NAME)[0].value o = issuer_name.get_attributes_for_oid(NameOID.ORGANIZATION_NAME)[0].value subject_name = x509.Name([ x509.NameAttribute(NameOID.COMMON_NAME, cn), x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, ou), x509.NameAttribute(NameOID.ORGANIZATION_NAME, o), x509.NameAttribute(NameOID.COUNTRY_NAME, "kr") ]) expire_period = self.__peer_expired serial_number = self.__LAST_PEER_INDEX + 1 key_usage = x509.KeyUsage(digital_signature=True, content_commitment=False, key_encipherment=True, data_encipherment=False, key_agreement=False, key_cert_sign=False, crl_sign=False, encipher_only=False, decipher_only=False) new_cert = self.__generate_cert(pub_key=pub_key, subject_name=subject_name, issuer_name=issuer_name, serial_number=serial_number, expire_period=expire_period, key_usage=key_usage, issuer_priv=self.__ca_pri, issuer_cert=self.__ca_cert) cert_pem = new_cert.public_bytes(encoding=serialization.Encoding.PEM) if password is None: pri_pem = pri_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) else: pri_pem = pri_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.BestAvailableEncryption(password=password) ) ca_cert_pem = self.__ca_cert.public_bytes(encoding=serialization.Encoding.PEM) peer_path = join(self.__DEFAULT_PATH, cn) self.__save(peer_path, cert_bytes=cert_pem, pri_bytes=pri_pem, ca_cert=ca_cert_pem) # ???? ?? self.__load_peer_certificate(cert_bytes=cert_pem) self.__show_certificate(new_cert)