我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用cryptography.hazmat.primitives.asymmetric.padding.MGF1。
def decrypt_pk(priv_key, ciphertext): """ Decrypt a b64encoded ciphertext string with the RSA private key priv_key, using CryptoHash() as the OAEP/MGF1 padding hash. Returns the plaintext. Decryption failures result in an exception being raised. """ try: plaintext = priv_key.decrypt( b64decode(ciphertext), padding.OAEP( mgf=padding.MGF1(algorithm=CryptoHash()), algorithm=CryptoHash(), label=None ) ) except UnsupportedAlgorithm as e: # a failure to dencrypt someone else's data is not typically a fatal # error, but in this particular case, the most likely cause of this # error is an old cryptography library logging.error("Fatal error: encryption hash {} unsupported, try upgrading to cryptography >= 1.4. Exception: {}".format( CryptoHash, e)) # re-raise the exception for the caller to handle raise e return plaintext
def test_verify_signature_PSS(self, mock_get_pub_key): data = b'224626ae19824466f2a7f39ab7b80f7f' mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key() for hash_name, hash_alg in signature_utils.HASH_METHODS.items(): signer = TEST_RSA_PRIVATE_KEY.signer( padding.PSS( mgf=padding.MGF1(hash_alg), salt_length=padding.PSS.MAX_LENGTH ), hash_alg ) signer.update(data) signature = base64.b64encode(signer.finalize()) img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693' verifier = signature_utils.get_verifier(None, img_sig_cert_uuid, hash_name, signature, signature_utils.RSA_PSS) verifier.update(data) verifier.verify()
def create_verifier_for_pss(signature, hash_method, public_key): """Create the verifier to use when the key type is RSA-PSS. :param signature: the decoded signature to use :param hash_method: the hash method to use, as a cryptography object :param public_key: the public key to use, as a cryptography object :raises: SignatureVerificationError if the RSA-PSS specific properties are invalid :returns: the verifier to use to verify the signature for RSA-PSS """ # default to MGF1 mgf = padding.MGF1(hash_method) # default to max salt length salt_length = padding.PSS.MAX_LENGTH # return the verifier return public_key.verifier( signature, padding.PSS(mgf=mgf, salt_length=salt_length), hash_method )
def signing(cls, message, private_key, algorithm='sha1'): """signing.""" if not isinstance(message, bytes): message = message.encode() algorithm = cls.ALGORITHM_DICT.get(algorithm) signer = private_key.signer( padding.PSS( mgf=padding.MGF1(algorithm), salt_length=padding.PSS.MAX_LENGTH ), algorithm ) signer.update(message) return signer.finalize()
def verification(cls, message, signature, public_key, algorithm='sha1'): """Verification.""" if not isinstance(message, bytes): message = message.encode() algorithm = cls.ALGORITHM_DICT.get(algorithm) verifier = public_key().verifier( signature, padding.PSS( mgf=padding.MGF1(algorithm), salt_length=padding.PSS.MAX_LENGTH ), algorithm ) verifier.update(message) return verifier.verify()
def encrypt(cls, message, public_key, algorithm='sha1'): """Public key encrypt.""" if not isinstance(message, bytes): message = message.encode() algorithm = cls.ALGORITHM_DICT.get(algorithm) ciphertext = public_key.encrypt( message, padding.OAEP( mgf=padding.MGF1(algorithm=algorithm), algorithm=algorithm, label=None ) ) return ciphertext
def create_decryptor(private_location, public_location): try: with open(private_location, "rb") as key_file: private_key = serialization.load_pem_private_key( key_file.read(), password=None, backend=default_backend() ) except FileNotFoundError: with open(private_location, "wb") as key_file: private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) key_file.write(private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption() )) with open(public_location, "wb") as public_file: public_key = private_key.public_key() pem = public_key.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) public_file.write(pem) def decrypt(ciphertext): return private_key.decrypt( ciphertext, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None ) ) return decrypt
def verify(self, msg, key, sig): verifier = key.verifier( sig, padding.PSS( mgf=padding.MGF1(self.hash_alg()), salt_length=self.hash_alg.digest_size ), self.hash_alg() ) verifier.update(msg) try: verifier.verify() return True except InvalidSignature: return False
def encrypt_key_with_public_key(secret_key, public_encryption_key): """ Encrypts the given secret key with the public key. :param bytes secret_key: the key to encrypt :param public_encryption_key: the public encryption key :type public_encryption_key: :class:`~rsa.RSAPublicKey` :return: the encrypted key :rtype: bytes """ return public_encryption_key.encrypt( secret_key, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
def decrypt_with_private_key(secret_key, private_encryption_key): """ Decrypts the given secret key with the private key. :param bytes secret_key: the secret key to decrypt :param private_encryption_key: the private encryption key :type private_encryption_key: :class:`~rsa.RSAPrivateKey` :return: the decrypted key :rtype: bytes """ return private_encryption_key.decrypt( secret_key, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
def rsa(public_key, signature, message): """Verifies an RSA signature. Args: public_key (str): Public key with BEGIN and END sections. signature (str): Hex value of the signature with its leading 0x stripped. message (str): Message that was signed, unhashed. """ try: public_rsa = load_pem_public_key(bytes(public_key), backend=default_backend()) hashed = util.sha256(message) public_rsa.verify( binascii.unhexlify(signature), hashed, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) except InvalidSignature: raise Exception('Invalid signature')
def rsa_sign(private_key, message): """Signs a message with the provided Rsa private key. Args: private_key (str): Rsa private key with BEGIN and END sections. message (str): Message to be hashed and signed. Returns: str: Hex signature of the message, with its leading 0x stripped. """ private_rsa = load_pem_private_key(bytes(private_key), password=None, backend=default_backend()) hashed = sha256(message) signature = private_rsa.sign( hashed, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) return binascii.hexlify(bytearray(signature))
def _mgf1_sha256_supported(): wk = serialization.load_pem_private_key( data=VALUES['raw'][b'asym1'][EncryptionKeyType.PRIVATE], password=None, backend=default_backend() ) try: wk.public_key().encrypt( plaintext=b'aosdjfoiajfoiaj;foijae;rogijaerg', padding=padding.OAEP( mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None ) ) except cryptography.exceptions.UnsupportedAlgorithm: return False return True
def _sign_image(self, image_file): LOG.debug("Creating signature for image data") signer = self.private_key.signer( padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) chunk_bytes = 8192 with open(image_file, 'rb') as f: chunk = f.read(chunk_bytes) while len(chunk) > 0: signer.update(chunk) chunk = f.read(chunk_bytes) signature = signer.finalize() signature_b64 = base64.b64encode(signature) return signature_b64
def _rsa_verify_sig(sig_value, formatted, public_key_jsonld): """ - sig_value: data to be verified - public_key: creator of this document's public_key - tbv: to be verified """ # TODO: Support other formats than just PEM public_key = serialization.load_pem_public_key( _get_value(public_key_jsonld, "publicKeyPem").encode("utf-8"), backend=default_backend()) try: public_key.verify( base64.b64decode(sig_value.encode("utf-8")), formatted, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH), hashes.SHA256()) return True except InvalidSignature: return False # In the future, we'll be doing a lot more work based on what suite is # selected.
def encrypt_pk(pub_key, plaintext): """ Encrypt plaintext with the RSA public key pub_key, using CryptoHash() as the OAEP/MGF1 padding hash. plaintext is limited to the size of the RSA key, minus the padding, or a few hundred bytes. Returns a b64encoded ciphertext string. Encryption failures result in an exception being raised. """ try: ciphertext = pub_key.encrypt( plaintext, padding.OAEP( mgf=padding.MGF1(algorithm=CryptoHash()), algorithm=CryptoHash(), label=None ) ) except UnsupportedAlgorithm as e: # a failure to encrypt our own data is a fatal error # the most likely cause of this error is an old cryptography library # although some newer binary cryptography libraries are linked with # old OpenSSL versions, to fix, check 'openssl version' >= 1.0.2, then: # pip install -I --no-binary cryptography cryptography logging.error("Fatal error: encryption hash {} unsupported, try upgrading to cryptography >= 1.4 compiled with OpenSSL >= 1.0.2. Exception: {}".format( CryptoHash, e)) # re-raise the exception for the caller to handle raise e return b64encode(ciphertext)
def encrypt_rsa(text, key): private_key = load_key(key) public_key = private_key.public_key() return public_key.encrypt( text, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None ) )
def decrypt_rsa(text, key): private_key = load_key(key) return private_key.decrypt( text, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None ) )
def __init__(self, key_pair): self.key_pair = key_pair self._padding = asym_padding.OAEP( mgf=asym_padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None )
def decrypt(cls, ciphertext, private_key, algorithm='sha1'): """Private key descrption.""" algorithm = cls.ALGORITHM_DICT.get(algorithm) plaintext = private_key.decrypt( ciphertext, padding.OAEP( mgf=padding.MGF1(algorithm=algorithm), algorithm=algorithm, label=None ) ) return plaintext.decode()
def verify(self, signature: bytes, message: bytes): return self._hazmat_public_key.verify( signature, message, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() )
def encrypt(self, message: bytes): symkey = generate_sym_key() ciphertext = symkey.encrypt(message) ciphersymkey = self._hazmat_public_key.encrypt( symkey.key, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None ) ) return struct.pack(">I", len(ciphersymkey)) + ciphersymkey + ciphertext
def sign(self, message: bytes): return self._hazmat_private_key.sign( message, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() )
def decrypt(self, ciphertext: bytes): lenciphersymkey, = struct.unpack(">I", ciphertext[:4]) ciphersymkey = ciphertext[4:4 + lenciphersymkey] ciphertext = ciphertext[4 + lenciphersymkey:] symkey = load_sym_key(self._hazmat_private_key.decrypt( ciphersymkey, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None ) )) return symkey.decrypt(ciphertext)
def _generate_sign(self, pri_key, data): """ ?? ??? ?? :param pri_key: ??? ??? :param data: ?? ?? ??? :return: ??? ?? ??? """ _signature = None # ???? Type(RSA, ECC)? ?? ?? ?? ?? if isinstance(pri_key, ec.EllipticCurvePrivateKeyWithSerialization): # ECDSA ?? logging.debug("Sign ECDSA") signer = pri_key.signer(ec.ECDSA(hashes.SHA256())) signer.update(data) _signature = signer.finalize() elif isinstance(pri_key, rsa.RSAPrivateKeyWithSerialization): # RSA ?? logging.debug("Sign RSA") _signature = pri_key.sign( data, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) else: logging.debug("Unknown PrivateKey Type : %s", type(pri_key)) return _signature
def __init__(self, name, hash_): super(_JWAPS, self).__init__(name) self.padding = padding.PSS( mgf=padding.MGF1(hash_()), salt_length=padding.PSS.MAX_LENGTH) self.hash = hash_()
def sign(self, msg, key): signer = key.signer( padding.PSS( mgf=padding.MGF1(self.hash_alg()), salt_length=self.hash_alg.digest_size ), self.hash_alg() ) signer.update(msg) return signer.finalize()
def test_unsupported_oaep_label(self): private_key = RSA_KEY_512.private_key(backend) with pytest.raises(ValueError): private_key.decrypt( b"0" * 64, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=b"label" ) )
def test_unsupported_mgf1_hash_algorithm(self): private_key = RSA_KEY_512.private_key(backend) with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH): private_key.decrypt( b"0" * 64, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA1(), label=None ) )
def test_unsupported_oaep_hash_algorithm(self): private_key = RSA_KEY_512.private_key(backend) with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH): private_key.decrypt( b"0" * 64, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA256(), label=None ) )
def decrypt(privkey, ciphertext): plaintext = privkey.decrypt( ciphertext, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None ) ) return plaintext
def encrypt(pubkey, plaintext): ciphertext= pubkey.encrypt(plaintext=plaintext, padding=padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None ) ) return ciphertext
def sign_message(privkey, message): plaintext = b64decode(message.encode('utf-8')) signature = privkey.sign( plaintext, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) return b64encode(signature).decode('utf-8')
def wrap_rsa_key(public_key, private_key_bytes): # Use the Google public key to encrypt the customer private key. # This means that only the Google private key is capable of decrypting # the customer private key. wrapped_key = public_key.encrypt( private_key_bytes, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None)) encoded_wrapped_key = base64.b64encode(wrapped_key) return encoded_wrapped_key
def __encrypt_with_rsa(self, content, recipient_pk): """ Encrypt content with RSAES-OAEP scheme @developer: vsmysle This method handles an encryption of a *single* RSA block with a specified above scheme. It does not handle splitting of a header into several blocks. It has to be done by other method that would use this one only for single block encryption purpose. TODO: what is a maximum size of a content that can be padded and encrypted given a particular size of RSA key? :param content: bytes content to encrypt (probably a part of ASN.1 DER-encoded MPHeader block) :param recipient_pk: instance of cryptography.hazmat.primitives.rsa .RSAPublicKey to use for a content encryption :return: string encryption of an input content """ # TODO: add exceptions self.logger.debug("rsa encryption") ciphertext = recipient_pk.encrypt( content, asym_padding.OAEP( mgf=asym_padding.MGF1(algorithm=SHA1()), algorithm=SHA1(), label=None ) ) self.logger.info("encrypted") return ciphertext
def __decrypt_with_rsa(self, content, user_sk): """ Decrypt RSAES-OAEP encrypted content (single block) @developer: vsmysle This method decrypts a single RSA ciphertext block only :param content: bytes content to decrypt :param user_sk: instance of cryptography.hazmat.primitives.rsa .RSAPrivateKey to use for a decryption :return: string decryption of an input content """ # TODO: add exceptions self.logger.debug("rsa decryption") try: plaintext = user_sk.decrypt( content, asym_padding.OAEP( mgf=asym_padding.MGF1(algorithm=SHA1()), algorithm=SHA1(), label=None ) ) except InvalidKey: self.logger.warning("Invalid key!") return return plaintext
def __sign_content(self, content, user_sk): """ Produce a signature of an input content using RSASSA-PSS scheme @developer: vsmysle :param content: bytes content to sign :param user_sk: instance of cryptography.hazmat.primitives.rsa. RSAPrivateKey :return: bytes of signature of the input content """ # TODO: add exceptions self.logger.debug("generating a signature of an input content") # creating signer that will sign our content try: signer = user_sk.signer( # we use RSASSA-PSS padding for the signature scheme asym_padding.PSS( mgf=asym_padding.MGF1(SHA1()), salt_length=asym_padding.PSS.MAX_LENGTH ), SHA1() ) except InvalidKey: self.logger.warning("Invalid key!") return signer.update(content) signature = signer.finalize() self.logger.info("signature generation finished") return signature
def __verify_signature(self, signature, signer_pk, content): """ Verify RSASSA-PSS signature @developer: vsmysle :param signature: signature bytes to verify :param signer_pk: instance of cryptography.hazmat.primitives. rsa.RSAPublicKey that is a public key of a signer :param content: content to verify a signature of :return: bool verification result """ self.logger.debug("starting signature verification routine") try: signer_pk.verify( signature, content, asym_padding.PSS( mgf=asym_padding.MGF1(SHA1()), salt_length=asym_padding.PSS.MAX_LENGTH ), SHA1() ) except InvalidSignature: self.logger.warn("signature verification failed") return False self.logger.info("signature OK") return True
def sign(self, msg, key): return key.sign( msg, padding.PSS( mgf=padding.MGF1(self.hash_alg()), salt_length=self.hash_alg.digest_size ), self.hash_alg() )
def verify(self, msg, key, sig): try: key.verify( sig, msg, padding.PSS( mgf=padding.MGF1(self.hash_alg()), salt_length=self.hash_alg.digest_size ), self.hash_alg() ) return True except InvalidSignature: return False
def sign(self, private_key): return private_key.sign( self.string_to_sign.encode("utf-8"), padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=32), hashes.SHA256())
def sign(self, message, private_key): """Sign the message. This method will take the provided message and create a signature using the provided RSA private key. The resulting signature is stored in the fulfillment. The key should be provided as a PEM encoded private key string. The message is padded using RSA-PSS with SHA256. Args: message (bytes): Message to sign. private_key (bytes): RSA private key. """ private_key_obj = serialization.load_pem_private_key( private_key, password=None, backend=default_backend(), ) if self.modulus is None: m_int = private_key_obj.public_key().public_numbers().n m_bytes = m_int.to_bytes( (m_int.bit_length() + 7) // 8, 'big') self._set_public_modulus(m_bytes) signer = private_key_obj.signer( padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=SALT_LENGTH, ), hashes.SHA256(), ) signer.update(message) self.signature = signer.finalize()
def validate(self, message): """Verify the signature of self RSA fulfillment. The signature of self RSA fulfillment is verified against the provided message and the condition's public modulus. Args: message (bytes): Message to verify. Returns: bool: Whether self fulfillment is valid. """ if not isinstance(message, bytes): raise Exception( 'Message must be provided as bytes, was: ' + message) public_numbers = RSAPublicNumbers( PUBLIC_EXPONENT, int.from_bytes(self.modulus, byteorder='big'), ) public_key = public_numbers.public_key(default_backend()) verifier = public_key.verifier( self.signature, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=SALT_LENGTH, ), hashes.SHA256() ) verifier.update(message) try: verifier.verify() except InvalidSignature as exc: raise ValidationError('Invalid RSA signature') from exc return True
def encrypt(self, message): if self._value is None: raise ValueError("Can't Encrypt with empty key.") try: return self._value.encrypt( message, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None)) except ValueError as e: raise CipherError(e)
def get_signer(self): """Gets an incremental verifier. Must call .update() on the verifier and then .finalize() to check. """ hash_algorithm = hashes.SHA256() padding_algorithm = padding.PSS(mgf=padding.MGF1(hash_algorithm), salt_length=padding.PSS.MAX_LENGTH) padding_algorithm = padding.PKCS1v15() return self._value.signer(padding_algorithm, hash_algorithm)
def decrypt(self, message): if self._value is None: raise ValueError("Can't Decrypt with empty key.") try: return self._value.decrypt( message, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None)) except ValueError as e: raise CipherError(e)