我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 cryptography.hazmat.primitives.serialization.load_pem_public_key()。
def _set_encryptor(self, public_key):
    """Build ``self.encryptor`` from a PEM-encoded public key, once.

    Does nothing when ``self.encryptor`` is already set. When
    ``public_key`` is empty, the PEM data is read from
    ``self.public_key_file`` instead.

    Raises:
        errors.InputError: the key file could not be opened or read.
        errors.EncryptorError: the PEM data could not be loaded, or the
            ``Encryptor`` could not be built from it.
    """
    if self.encryptor is not None:
        # Already initialised; keep the existing encryptor.
        return

    pem_data = public_key
    if not pem_data:
        try:
            with open(self.public_key_file, 'rb') as key_file:
                pem_data = key_file.read()
        except IOError:
            raise errors.InputError(
                "public key '%s' does not exist." % self.public_key_file)

    try:
        loaded_key = serialization.load_pem_public_key(
            pem_data, backend=default_backend())
        self.encryptor = Encryptor(loaded_key)
    except Exception as err:
        # Any failure while parsing or wrapping the key is reported
        # uniformly as a malformed-key error.
        reason = str(err)
        raise errors.EncryptorError(
            "public key is malformed. (Reason: %s)" % (reason))
def _set_public_key_string(self, public_key_string):
    """Normalise a base64 public key string and load it into ``self._public_key``.

    The string is prefixed with ``PKCS8_HEADER`` when it does not already
    start with it, re-wrapped into lines of at most 64 characters, given
    "PUBLIC KEY" PEM headers via ``self._add_pem_headers`` and then parsed
    with ``load_pem_public_key``.
    """
    key_text = public_key_string

    # Prepend the PKCS#8 header when the caller did not supply it.
    if not key_text.startswith(PKCS8_HEADER):
        key_text = PKCS8_HEADER + key_text

    # Re-flow the base64 payload into lines of max length 64, which is the
    # layout the cryptography PEM loader is given here.
    single_line = key_text.replace("\n", "")
    chunks = re.findall(".{1,64}", single_line)
    wrapped = "\n".join(chunks)

    # Surround the payload with the PEM header/footer.
    pem_text = self._add_pem_headers(wrapped, "PUBLIC KEY")

    self._public_key = crypto_serialization.load_pem_public_key(
        self.ensure_bytes(pem_text),
        backend=crypto_backend,
    )
def _load_cryptography_key(cls, data, password=None, backend=None):
    """Deserialize ``data`` into a key object, trying several formats in turn.

    The loaders are attempted in this order: PEM private key, DER private
    key, PEM public key, DER public key. The first loader that succeeds
    determines the return value. ``password`` is only passed to the
    private-key loaders. ``default_backend()`` is used when ``backend`` is
    None.

    Raises:
        errors.Error: when every loader failed; the message includes the
            per-loader exceptions.
    """
    if backend is None:
        backend = default_backend()

    # Maps each loader that failed to the exception it raised.
    failures = {}

    # First attempt: treat the data as a private key.
    private_loaders = (
        serialization.load_pem_private_key,
        serialization.load_der_private_key,
    )
    for load_private in private_loaders:
        try:
            return load_private(data, password, backend)
        except (ValueError, TypeError,
                cryptography.exceptions.UnsupportedAlgorithm) as exc:
            failures[load_private] = exc

    # Second attempt: treat the data as a public key.
    public_loaders = (
        serialization.load_pem_public_key,
        serialization.load_der_public_key,
    )
    for load_public in public_loaders:
        try:
            return load_public(data, backend)
        except (ValueError,
                cryptography.exceptions.UnsupportedAlgorithm) as exc:
            failures[load_public] = exc

    # Nothing could parse the data.
    raise errors.Error('Unable to deserialize key: {0}'.format(failures))
def rsa(public_key, signature, message):
    """Verifies an RSA signature (PSS padding, MGF1/SHA-256, SHA-256).

    The value handed to ``verify`` is ``util.sha256(message)``, not the raw
    message.

    Args:
        public_key (str or bytes): Public key with BEGIN and END sections.
        signature (str): Hex value of the signature with its leading 0x
            stripped.
        message (str): Message that was signed, unhashed.

    Raises:
        Exception: with the message 'Invalid signature' when verification
            fails.
    """
    # bytes(<str>) raises TypeError on Python 3 because no encoding is given,
    # so text keys are encoded explicitly. Everything else goes through
    # bytes() exactly as before.
    if isinstance(public_key, bytes):
        key_bytes = public_key
    elif isinstance(public_key, str):
        key_bytes = public_key.encode('utf-8')
    else:
        key_bytes = bytes(public_key)

    try:
        public_rsa = load_pem_public_key(key_bytes, backend=default_backend())
        hashed = util.sha256(message)
        public_rsa.verify(
            binascii.unhexlify(signature),
            hashed,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
    except InvalidSignature:
        raise Exception('Invalid signature')
def __init__(self, wrapping_algorithm, wrapping_key, wrapping_key_type, password=None):
    """Store the wrapping configuration and prepare the wrapping key.

    How ``wrapping_key`` is handled depends on ``wrapping_key_type``:

    * ``EncryptionKeyType.PRIVATE``: loaded as a PEM private key, using
      ``password``.
    * ``EncryptionKeyType.PUBLIC``: loaded as a PEM public key.
    * ``EncryptionKeyType.SYMMETRIC``: stored as given, and a derived
      wrapping key is computed with ``derive_data_encryption_key``.

    Raises:
        InvalidDataKeyError: for any other ``wrapping_key_type``.
    """
    self.wrapping_algorithm = wrapping_algorithm
    self.wrapping_key_type = wrapping_key_type

    if wrapping_key_type is EncryptionKeyType.PRIVATE:
        self._wrapping_key = serialization.load_pem_private_key(
            data=wrapping_key,
            password=password,
            backend=default_backend()
        )
        return

    if wrapping_key_type is EncryptionKeyType.PUBLIC:
        self._wrapping_key = serialization.load_pem_public_key(
            data=wrapping_key,
            backend=default_backend()
        )
        return

    if wrapping_key_type is EncryptionKeyType.SYMMETRIC:
        self._wrapping_key = wrapping_key
        self._derived_wrapping_key = derive_data_encryption_key(
            source_key=self._wrapping_key,
            algorithm=self.wrapping_algorithm.algorithm,
            message_id=None
        )
        return

    # None of the known key types matched.
    raise InvalidDataKeyError('Invalid wrapping_key_type: {}'.format(wrapping_key_type))
def _rsa_verify_sig(sig_value, formatted, public_key_jsonld):
    """Check an RSA signature (PSS padding, SHA-256) against a JSON-LD key.

    - sig_value: base64 text of the signature to be verified
    - formatted: the data the signature is verified over
    - public_key_jsonld: key description whose "publicKeyPem" value holds
      the PEM-encoded public key

    Returns True when the signature verifies, False when verification
    raises InvalidSignature.
    """
    # TODO: Support other formats than just PEM
    pem_text = _get_value(public_key_jsonld, "publicKeyPem")
    verifier_key = serialization.load_pem_public_key(
        pem_text.encode("utf-8"),
        backend=default_backend())

    raw_signature = base64.b64decode(sig_value.encode("utf-8"))
    pss_padding = padding.PSS(
        mgf=padding.MGF1(hashes.SHA256()),
        salt_length=padding.PSS.MAX_LENGTH)

    try:
        verifier_key.verify(
            raw_signature, formatted, pss_padding, hashes.SHA256())
    except InvalidSignature:
        return False
    return True


# In the future, we'll be doing a lot more work based on what suite is
# selected.
def load_key(pubkey):
    """Load a PEM public RSA key, tolerating a mislabelled header/footer.

    The key text is parsed as given first. If that raises ValueError, the
    'BEGIN RSA' / 'END RSA' markers are rewritten to 'BEGIN' / 'END' and
    the parse is attempted once more.

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    try:
        original_pem = pubkey.encode()
        return load_pem_public_key(original_pem, default_backend())
    except ValueError:
        # workaround for https://github.com/travis-ci/travis-api/issues/196
        relabelled = pubkey.replace('BEGIN RSA', 'BEGIN')
        relabelled = relabelled.replace('END RSA', 'END')
        return load_pem_public_key(relabelled.encode(), default_backend())
def decode_public_key(self, encoded):
    """
    Based on spotnab, this is the gzipped version of the key
    with base64 applied to it. We decode it and load it.

    On success the loaded key is stored in ``self.public_key`` and True is
    returned. False is returned when the base64 decoding fails, when the
    decompressed data cannot be parsed as a PEM public key (ValueError),
    or when the loaded key is falsy. Errors raised while reading the gzip
    stream are not caught here.
    """
    # Local import keeps this block self-contained; io.BytesIO exists on
    # both Python 2 and Python 3 and is the right buffer for decoded bytes.
    from io import BytesIO

    try:
        compressed = b64decode(encoded)
    except (TypeError, ValueError):
        # Python 2 reports bad base64 as TypeError; Python 3 raises
        # binascii.Error, which is a ValueError subclass.
        return False

    self.public_key = None
    with GzipFile(fileobj=BytesIO(compressed), mode="rb") as gz:
        try:
            self.public_key = serialization.load_pem_public_key(
                gz.read(),
                backend=default_backend()
            )
        except ValueError:
            # Could not decrypt content
            return False

    return bool(self.public_key)
def retrieve_public_key(user_repo):
    """Fetch a repository's public key from the Travis API.

    The API response is read as JSON and its 'key' entry is returned with
    ' RSA ' replaced by a single space, so that the header and footer no
    longer carry the RSA label before the text is handed to cryptography's
    load_pem_public_key function.

    Parameters
    ----------
    user_repo: str
        the repository in the format of 'username/repository'

    Returns
    -------
    response: str
        the public RSA key of the username's repository

    Raises
    ------
    InvalidCredentialsError
        raised when the JSON response has no 'key' entry, e.g. because an
        invalid 'username/repository' was given
    """
    endpoint = 'https://api.travis-ci.org/repos/{}/key'.format(user_repo)
    response = requests.get(endpoint)

    try:
        raw_key = response.json()['key']
        return raw_key.replace(' RSA ', ' ')
    except KeyError:
        raise InvalidCredentialsError("Please enter a valid user/repository name.")
def encrypt_key(key, password):
    """Encrypt the password with the public key and return an ASCII representation.

    The public key retrieved from the Travis API is loaded as a public key
    object using Cryptography's default backend. Then the given password is
    encrypted with the encrypt() method of that key. The encrypted password
    is then encoded to base64 and decoded into ASCII in order to convert
    the bytes object into a string object.

    Parameters
    ----------
    key: str
        Travis CI public RSA key that requires deserialization
    password: str or bytes
        the password to be encrypted; text is encoded as UTF-8 first

    Returns
    -------
    encrypted_password: str
        the base64 encoded encrypted password decoded as ASCII

    Notes
    -----
    Travis CI uses the PKCS1v15 padding scheme. While PKCS1v15 is secure, it
    is outdated and should be replaced with OAEP.

    Example:
    OAEP(mgf=MGF1(algorithm=SHA256()), algorithm=SHA256(), label=None))
    """
    # encrypt() takes bytes, so a text password (as documented) is encoded
    # first. The double isinstance check leaves Python 2 byte strings, and
    # any other bytes-like input, exactly as the caller passed them.
    if isinstance(password, str) and not isinstance(password, bytes):
        password = password.encode('utf-8')

    public_key = load_pem_public_key(key.encode(), default_backend())
    encrypted_password = public_key.encrypt(password, PKCS1v15())
    return base64.b64encode(encrypted_password).decode('ascii')
def load_key(pubkey):
    """Load public RSA key.

    Work around keys with incorrect header/footer format: when the first
    parse raises ValueError, 'BEGIN RSA' and 'END RSA' are rewritten to
    'BEGIN' and 'END' and the key is parsed again.

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    try:
        key_object = load_pem_public_key(pubkey.encode(), default_backend())
    except ValueError:
        # workaround for https://github.com/travis-ci/travis-api/issues/196
        pubkey = pubkey.replace('BEGIN RSA', 'BEGIN').replace('END RSA', 'END')
        key_object = load_pem_public_key(pubkey.encode(), default_backend())
    return key_object
def load_public_key_string(key_string):
    """Parse PEM public key data with the default backend.

    ``key_string`` is passed to ``load_pem_public_key`` unchanged, and the
    resulting key object is returned.
    """
    public_key = serialization.load_pem_public_key(
        key_string,
        backend=default_backend(),
    )
    return public_key
def get_public_key_and_key_id(issuer, key_id=None):
    """Load the public key for ``issuer`` and return it with its key id.

    The key file name is looked up in ``api_settings.PUBLIC_KEYS`` via
    ``get_key_file_name``. The file contents are parsed as a PEM public
    key, and the key id is derived from the same file name via
    ``get_key_id``.

    Returns:
        A ``(key, key_id)`` tuple.
    """
    key_path = get_key_file_name(
        keys=api_settings.PUBLIC_KEYS,
        issuer=issuer,
        key_id=key_id,
    )
    pem_bytes = read_key_file(file_name=key_path)
    public_key = load_pem_public_key(pem_bytes, backend=default_backend())
    resolved_key_id = get_key_id(file_name=key_path)
    return public_key, resolved_key_id
def convert_pubkey_to_pem(key):
    """Load a PEM public key from ``key``.

    ``key`` is first passed through ``ApiClient.convert_to_bytes``. Despite
    the function's name, the return value is whatever
    ``load_pem_public_key`` returns, i.e. the loaded key object.
    """
    pem_bytes = ApiClient.convert_to_bytes(key)
    loaded_key = serialization.load_pem_public_key(
        pem_bytes,
        backend=default_backend(),
    )
    return loaded_key