我们从Python开源项目中,提取了以下41个代码示例,用于说明如何使用cryptography.hazmat.primitives.asymmetric.padding.PSS。
def test_verify_signature_PSS(self, mock_get_pub_key): data = b'224626ae19824466f2a7f39ab7b80f7f' mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key() for hash_name, hash_alg in signature_utils.HASH_METHODS.items(): signer = TEST_RSA_PRIVATE_KEY.signer( padding.PSS( mgf=padding.MGF1(hash_alg), salt_length=padding.PSS.MAX_LENGTH ), hash_alg ) signer.update(data) signature = base64.b64encode(signer.finalize()) img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693' verifier = signature_utils.get_verifier(None, img_sig_cert_uuid, hash_name, signature, signature_utils.RSA_PSS) verifier.update(data) verifier.verify()
def create_verifier_for_pss(signature, hash_method, public_key): """Create the verifier to use when the key type is RSA-PSS. :param signature: the decoded signature to use :param hash_method: the hash method to use, as a cryptography object :param public_key: the public key to use, as a cryptography object :raises: SignatureVerificationError if the RSA-PSS specific properties are invalid :returns: the verifier to use to verify the signature for RSA-PSS """ # default to MGF1 mgf = padding.MGF1(hash_method) # default to max salt length salt_length = padding.PSS.MAX_LENGTH # return the verifier return public_key.verifier( signature, padding.PSS(mgf=mgf, salt_length=salt_length), hash_method )
def signing(cls, message, private_key, algorithm='sha1'): """signing.""" if not isinstance(message, bytes): message = message.encode() algorithm = cls.ALGORITHM_DICT.get(algorithm) signer = private_key.signer( padding.PSS( mgf=padding.MGF1(algorithm), salt_length=padding.PSS.MAX_LENGTH ), algorithm ) signer.update(message) return signer.finalize()
def verification(cls, message, signature, public_key, algorithm='sha1'): """Verification.""" if not isinstance(message, bytes): message = message.encode() algorithm = cls.ALGORITHM_DICT.get(algorithm) verifier = public_key().verifier( signature, padding.PSS( mgf=padding.MGF1(algorithm), salt_length=padding.PSS.MAX_LENGTH ), algorithm ) verifier.update(message) return verifier.verify()
def verify(self, msg, key, sig): verifier = key.verifier( sig, padding.PSS( mgf=padding.MGF1(self.hash_alg()), salt_length=self.hash_alg.digest_size ), self.hash_alg() ) verifier.update(msg) try: verifier.verify() return True except InvalidSignature: return False
def rsa(public_key, signature, message): """Verifies an RSA signature. Args: public_key (str): Public key with BEGIN and END sections. signature (str): Hex value of the signature with its leading 0x stripped. message (str): Message that was signed, unhashed. """ try: public_rsa = load_pem_public_key(bytes(public_key), backend=default_backend()) hashed = util.sha256(message) public_rsa.verify( binascii.unhexlify(signature), hashed, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) except InvalidSignature: raise Exception('Invalid signature')
def rsa_sign(private_key, message): """Signs a message with the provided Rsa private key. Args: private_key (str): Rsa private key with BEGIN and END sections. message (str): Message to be hashed and signed. Returns: str: Hex signature of the message, with its leading 0x stripped. """ private_rsa = load_pem_private_key(bytes(private_key), password=None, backend=default_backend()) hashed = sha256(message) signature = private_rsa.sign( hashed, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) return binascii.hexlify(bytearray(signature))
def _sign_image(self, image_file): LOG.debug("Creating signature for image data") signer = self.private_key.signer( padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) chunk_bytes = 8192 with open(image_file, 'rb') as f: chunk = f.read(chunk_bytes) while len(chunk) > 0: signer.update(chunk) chunk = f.read(chunk_bytes) signature = signer.finalize() signature_b64 = base64.b64encode(signature) return signature_b64
def sign_and_upload_image(self): img_signature = self._sign_image(self.img_file) img_properties = { 'img_signature': img_signature, 'img_signature_certificate_uuid': self.signing_cert_uuid, 'img_signature_key_type': 'RSA-PSS', 'img_signature_hash_method': 'SHA-256', } LOG.debug("Uploading image with signature metadata properties") img_uuid = self._image_create( 'signed_img', CONF.scenario.img_container_format, self.img_file, disk_format=CONF.scenario.img_disk_format, properties=img_properties ) LOG.debug("Uploaded image %s", img_uuid) return img_uuid
def _rsa_verify_sig(sig_value, formatted, public_key_jsonld): """ - sig_value: data to be verified - public_key: creator of this document's public_key - tbv: to be verified """ # TODO: Support other formats than just PEM public_key = serialization.load_pem_public_key( _get_value(public_key_jsonld, "publicKeyPem").encode("utf-8"), backend=default_backend()) try: public_key.verify( base64.b64decode(sig_value.encode("utf-8")), formatted, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH), hashes.SHA256()) return True except InvalidSignature: return False # In the future, we'll be doing a lot more work based on what suite is # selected.
def test_signature_key_type_lookup_fail(self): self.assertRaisesRegex(exception.SignatureVerificationError, 'Invalid signature key type: .*', signature_utils.SignatureKeyType.lookup, 'RSB-PSS')
def verify(self, signature: bytes, message: bytes): return self._hazmat_public_key.verify( signature, message, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() )
def sign(self, message: bytes): return self._hazmat_private_key.sign( message, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() )
def _generate_sign(self, pri_key, data): """ ?? ??? ?? :param pri_key: ??? ??? :param data: ?? ?? ??? :return: ??? ?? ??? """ _signature = None # ???? Type(RSA, ECC)? ?? ?? ?? ?? if isinstance(pri_key, ec.EllipticCurvePrivateKeyWithSerialization): # ECDSA ?? logging.debug("Sign ECDSA") signer = pri_key.signer(ec.ECDSA(hashes.SHA256())) signer.update(data) _signature = signer.finalize() elif isinstance(pri_key, rsa.RSAPrivateKeyWithSerialization): # RSA ?? logging.debug("Sign RSA") _signature = pri_key.sign( data, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) else: logging.debug("Unknown PrivateKey Type : %s", type(pri_key)) return _signature
def __init__(self, name, hash_): super(_JWAPS, self).__init__(name) self.padding = padding.PSS( mgf=padding.MGF1(hash_()), salt_length=padding.PSS.MAX_LENGTH) self.hash = hash_()
def sign(self, msg, key): signer = key.signer( padding.PSS( mgf=padding.MGF1(self.hash_alg()), salt_length=self.hash_alg.digest_size ), self.hash_alg() ) signer.update(msg) return signer.finalize()
def sign_message(privkey, message): plaintext = b64decode(message.encode('utf-8')) signature = privkey.sign( plaintext, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) return b64encode(signature).decode('utf-8')
def __sign_content(self, content, user_sk): """ Produce a signature of an input content using RSASSA-PSS scheme @developer: vsmysle :param content: bytes content to sign :param user_sk: instance of cryptography.hazmat.primitives.rsa. RSAPrivateKey :return: bytes of signature of the input content """ # TODO: add exceptions self.logger.debug("generating a signature of an input content") # creating signer that will sign our content try: signer = user_sk.signer( # we use RSASSA-PSS padding for the signature scheme asym_padding.PSS( mgf=asym_padding.MGF1(SHA1()), salt_length=asym_padding.PSS.MAX_LENGTH ), SHA1() ) except InvalidKey: self.logger.warning("Invalid key!") return signer.update(content) signature = signer.finalize() self.logger.info("signature generation finished") return signature
def __verify_signature(self, signature, signer_pk, content): """ Verify RSASSA-PSS signature @developer: vsmysle :param signature: signature bytes to verify :param signer_pk: instance of cryptography.hazmat.primitives. rsa.RSAPublicKey that is a public key of a signer :param content: content to verify a signature of :return: bool verification result """ self.logger.debug("starting signature verification routine") try: signer_pk.verify( signature, content, asym_padding.PSS( mgf=asym_padding.MGF1(SHA1()), salt_length=asym_padding.PSS.MAX_LENGTH ), SHA1() ) except InvalidSignature: self.logger.warn("signature verification failed") return False self.logger.info("signature OK") return True
def sign(self, msg, key): return key.sign( msg, padding.PSS( mgf=padding.MGF1(self.hash_alg()), salt_length=self.hash_alg.digest_size ), self.hash_alg() )
def verify(self, msg, key, sig): try: key.verify( sig, msg, padding.PSS( mgf=padding.MGF1(self.hash_alg()), salt_length=self.hash_alg.digest_size ), self.hash_alg() ) return True except InvalidSignature: return False
def sign(self, private_key): return private_key.sign( self.string_to_sign.encode("utf-8"), padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=32), hashes.SHA256())
def _set_signature(self, signature): """Set the signature manually. The signature must be a valid RSA-PSS siganture. Args: signature (bytes): RSA signature. """ if not isinstance(signature, bytes): raise TypeError('Signature must be bytes, was: ' + signature) self.signature = signature
def sign(self, message, private_key): """Sign the message. This method will take the provided message and create a signature using the provided RSA private key. The resulting signature is stored in the fulfillment. The key should be provided as a PEM encoded private key string. The message is padded using RSA-PSS with SHA256. Args: message (bytes): Message to sign. private_key (bytes): RSA private key. """ private_key_obj = serialization.load_pem_private_key( private_key, password=None, backend=default_backend(), ) if self.modulus is None: m_int = private_key_obj.public_key().public_numbers().n m_bytes = m_int.to_bytes( (m_int.bit_length() + 7) // 8, 'big') self._set_public_modulus(m_bytes) signer = private_key_obj.signer( padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=SALT_LENGTH, ), hashes.SHA256(), ) signer.update(message) self.signature = signer.finalize()
def validate(self, message): """Verify the signature of self RSA fulfillment. The signature of self RSA fulfillment is verified against the provided message and the condition's public modulus. Args: message (bytes): Message to verify. Returns: bool: Whether self fulfillment is valid. """ if not isinstance(message, bytes): raise Exception( 'Message must be provided as bytes, was: ' + message) public_numbers = RSAPublicNumbers( PUBLIC_EXPONENT, int.from_bytes(self.modulus, byteorder='big'), ) public_key = public_numbers.public_key(default_backend()) verifier = public_key.verifier( self.signature, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=SALT_LENGTH, ), hashes.SHA256() ) verifier.update(message) try: verifier.verify() except InvalidSignature as exc: raise ValidationError('Invalid RSA signature') from exc return True
def get_signer(self): """Gets an incremental verifier. Must call .update() on the verifier and then .finalize() to check. """ hash_algorithm = hashes.SHA256() padding_algorithm = padding.PSS(mgf=padding.MGF1(hash_algorithm), salt_length=padding.PSS.MAX_LENGTH) padding_algorithm = padding.PKCS1v15() return self._value.signer(padding_algorithm, hash_algorithm)
def sign(self, message): signer = self._key.signer( padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) signer.update(message) return signer.finalize()
def verify(self, message, signature): verifier = self._key.verifier( signature, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) verifier.update(message) verifier.verify() return True
def sign(self, caller: NodeId, message: bytes) -> str: signature = self._private_keys[caller].sign( message, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) return base64.b64encode(signature)
def verify(self, caller: NodeId, signature: str, message: bytes, sender_id: NodeId) -> bool: public_key = self._private_keys[sender_id].public_key() try: public_key.verify( base64.b64decode(signature), message, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) return True except InvalidSignature: return False
def _basic_rsa_signature(formatted, options): private_key = serialization.load_pem_private_key( options["privateKeyPem"], password=None, backend=default_backend()) signed = private_key.sign( formatted, # I'm guessing this is the right padding function...? padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH), hashes.SHA256()) return base64.b64encode(signed).decode("utf-8")
def pkcs_emsa_pss_encode(M, emBits, h, mgf, sLen): """ Implements EMSA-PSS-ENCODE() function described in Sect. 9.1.1 of RFC 3447 Input: M : message to be encoded, an octet string emBits: maximal bit length of the integer resulting of pkcs_os2ip(EM), where EM is the encoded message, output of the function. h : hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls', 'sha256', 'sha384'). hLen denotes the length in octets of the hash function output. mgf : the mask generation function f : seed, maskLen -> mask sLen : intended length in octets of the salt Output: encoded message, an octet string of length emLen = ceil(emBits/8) On error, None is returned. """ # 1) is not done hLen = _hashFuncParams[h][0] # 2) hFunc = _hashFuncParams[h][1] mHash = hFunc(M) emLen = int(math.ceil(emBits/8.)) if emLen < hLen + sLen + 2: # 3) warning("encoding error (emLen < hLen + sLen + 2)") return None salt = randstring(sLen) # 4) MPrime = '\x00'*8 + mHash + salt # 5) H = hFunc(MPrime) # 6) PS = '\x00'*(emLen - sLen - hLen - 2) # 7) DB = PS + '\x01' + salt # 8) dbMask = mgf(H, emLen - hLen - 1) # 9) maskedDB = strxor(DB, dbMask) # 10) l = (8*emLen - emBits)/8 # 11) rem = 8*emLen - emBits - 8*l # additionnal bits andMask = l*'\x00' if rem: j = chr(reduce(lambda x,y: x+y, map(lambda x: 1<<x, range(8-rem)))) andMask += j l += 1 maskedDB = strand(maskedDB[:l], andMask) + maskedDB[l:] EM = maskedDB + H + '\xbc' # 12) return EM # 13)
def pkcs_emsa_pss_verify(M, EM, emBits, h, mgf, sLen): """ Implements EMSA-PSS-VERIFY() function described in Sect. 9.1.2 of RFC 3447 Input: M : message to be encoded, an octet string EM : encoded message, an octet string of length emLen = ceil(emBits/8) emBits: maximal bit length of the integer resulting of pkcs_os2ip(EM) h : hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls', 'sha256', 'sha384'). hLen denotes the length in octets of the hash function output. mgf : the mask generation function f : seed, maskLen -> mask sLen : intended length in octets of the salt Output: True if the verification is ok, False otherwise. """ # 1) is not done hLen = _hashFuncParams[h][0] # 2) hFunc = _hashFuncParams[h][1] mHash = hFunc(M) emLen = int(math.ceil(emBits/8.)) # 3) if emLen < hLen + sLen + 2: return False if EM[-1] != '\xbc': # 4) return False l = emLen - hLen - 1 # 5) maskedDB = EM[:l] H = EM[l:l+hLen] l = (8*emLen - emBits)/8 # 6) rem = 8*emLen - emBits - 8*l # additionnal bits andMask = l*'\xff' if rem: val = reduce(lambda x,y: x+y, map(lambda x: 1<<x, range(8-rem))) j = chr(~val & 0xff) andMask += j l += 1 if strand(maskedDB[:l], andMask) != '\x00'*l: return False dbMask = mgf(H, emLen - hLen - 1) # 7) DB = strxor(maskedDB, dbMask) # 8) l = (8*emLen - emBits)/8 # 9) rem = 8*emLen - emBits - 8*l # additionnal bits andMask = l*'\x00' if rem: j = chr(reduce(lambda x,y: x+y, map(lambda x: 1<<x, range(8-rem)))) andMask += j l += 1 DB = strand(DB[:l], andMask) + DB[l:] l = emLen - hLen - sLen - 1 # 10) if DB[:l] != '\x00'*(l-1) + '\x01': return False salt = DB[-sLen:] # 11) MPrime = '\x00'*8 + mHash + salt # 12) HPrime = hFunc(MPrime) # 13) return H == HPrime # 14)
def _rsassa_pss_sign(self, M, h=None, mgf=None, sLen=None): """ Implements RSASSA-PSS-SIGN() function described in Sect. 8.1.1 of RFC 3447. Input: M: message to be signed, an octet string Output: signature, an octet string of length k, where k is the length in octets of the RSA modulus n. On error, None is returned. """ # Set default parameters if not provided if h is None: # By default, sha1 h = "sha1" if not h in _hashFuncParams: warning("Key._rsassa_pss_sign(): unknown hash function " "provided (%s)" % h) return None if mgf is None: # use mgf1 with underlying hash function mgf = lambda x,y: pkcs_mgf1(x, y, h) if sLen is None: # use Hash output length (A.2.3 of RFC 3447) hLen = _hashFuncParams[h][0] sLen = hLen # 1) EMSA-PSS encoding modBits = self.modulusLen k = modBits / 8 EM = pkcs_emsa_pss_encode(M, modBits - 1, h, mgf, sLen) if EM is None: warning("Key._rsassa_pss_sign(): unable to encode") return None # 2) RSA signature m = pkcs_os2ip(EM) # 2.a) s = self._rsasp1(m) # 2.b) S = pkcs_i2osp(s, k) # 2.c) return S # 3)
def sign(self, M, t=None, h=None, mgf=None, sLen=None): """ Sign message 'M' using 't' signature scheme where 't' can be: - None: the message 'M' is directly applied the RSASP1 signature primitive, as described in PKCS#1 v2.1, i.e. RFC 3447 Sect 5.2.1. Simply put, the message undergo a modular exponentiation using the private key. Additionnal method parameters are just ignored. - 'pkcs': the message 'M' is applied RSASSA-PKCS1-v1_5-SIGN signature scheme as described in Sect. 8.2.1 of RFC 3447. In that context, the hash function name is passed using 'h'. Possible values are "md2", "md4", "md5", "sha1", "tls", "sha224", "sha256", "sha384" and "sha512". If none is provided, sha1 is used. Other additionnal parameters are ignored. - 'pss' : the message 'M' is applied RSASSA-PSS-SIGN signature scheme as described in Sect. 8.1.1. of RFC 3447. In that context, o 'h' parameter provides the name of the hash method to use. Possible values are "md2", "md4", "md5", "sha1", "tls", "sha224", "sha256", "sha384" and "sha512". if none is provided, sha1 is used. o 'mgf' is the mask generation function. By default, mgf is derived from the provided hash function using the generic MGF1 (see pkcs_mgf1() for details). o 'sLen' is the length in octet of the salt. You can overload the default value (the octet length of the hash value for provided algorithm) by providing another one with that parameter. """ if t is None: # RSASP1 M = pkcs_os2ip(M) n = self.modulus if M > n-1: warning("Message to be signed is too long for key modulus") return None s = self._rsasp1(M) if s is None: return None return pkcs_i2osp(s, self.modulusLen/8) elif t == "pkcs": # RSASSA-PKCS1-v1_5-SIGN if h is None: h = "sha1" return self._rsassa_pkcs1_v1_5_sign(M, h) elif t == "pss": # RSASSA-PSS-SIGN return self._rsassa_pss_sign(M, h, mgf, sLen) else: warning("Key.sign(): Unknown signature type (%s) provided" % t) return None
def _verify_signature(self, pub_key, data, signature): """ ?? ??? ?? :param pub_key: ??? ??? :param data: ?? ?? ??? :param signature: ?? ??? :return: ?? ?? ??(True/False) """ validation_result = False # ???? Type(RSA, ECC)? ?? ?? ?? ?? if isinstance(pub_key, ec.EllipticCurvePublicKeyWithSerialization): # ECDSA ?? logging.debug("Verify ECDSA") try: pub_key.verify( signature, data, ec.ECDSA(hashes.SHA256()) ) validation_result = True except InvalidSignature: logging.debug("InvalidSignature_ECDSA") elif isinstance(pub_key, rsa.RSAPublicKeyWithSerialization): # RSA ?? logging.debug("Verify RSA") try: pub_key.verify( signature, data, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) validation_result = True except InvalidSignature: logging.debug('InvalidSignature_RSA') else: logging.debug("Unknown PublicKey Type : %s", type(pub_key)) return validation_result