我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用cryptography.exceptions.InvalidSignature()。
def verify(self, public_key, message, signature): """ECDSA verify signature. :param public_key: Signing public key :param message: Origin message :param signature: Signature of message :Returns: verify result boolean, True means valid """ if not (self._check_malleability(signature)): return False verifier = public_key.verifier(signature, ec.ECDSA(self.sign_hash_algorithm)) verifier.update(message) try: verifier.verify() except InvalidSignature: return False except Exception as e: raise e return True
def verify_hmac(expected_result, secret_key, unique_prefix, data): ''' Perform a HMAC using the secret key, unique hash prefix, and data, and verify that the result of: HMAC-SHA256(secret_key, unique_prefix | data) matches the bytes in expected_result. The key must be kept secret. The prefix ensures hash uniqueness. Returns True if the signature matches, and False if it does not. ''' # If the secret key is shorter than the digest size, security is reduced assert secret_key assert len(secret_key) >= CryptoHash.digest_size h = hmac.HMAC(bytes(secret_key), CryptoHash(), backend=default_backend()) h.update(bytes(unique_prefix)) h.update(bytes(data)) try: h.verify(bytes(expected_result)) return True except InvalidSignature: return False
def handshake(self): if self.id: raise HandshakeError('Handshake already done.') challenge = _generate_challenge() query = {'handshake': 'challenge', 'challenge': challenge} yield Effect(EHandshakeSend(ejson_dumps(query))) raw_resp = yield Effect(EHandshakeRecv()) try: resp = ejson_loads(raw_resp) except (TypeError, json.JSONDecodeError): error = HandshakeError('Invalid challenge response format') yield Effect(EHandshakeSend(error.to_raw())) raise error resp = HandshakeAnswerSchema().load(resp) claimed_identity = resp['identity'] try: pubkey = yield Effect(EPubKeyGet(claimed_identity)) pubkey.verify(resp['answer'], challenge.encode()) yield Effect(EHandshakeSend('{"status": "ok", "handshake": "done"}')) self.id = claimed_identity except (TypeError, PubKeyNotFound, InvalidSignature): error = HandshakeError('Invalid signature, challenge or identity') yield Effect(EHandshakeSend(error.to_raw())) raise error
def sign_test(self): """???/??? ?? ? ?? ??? :return: ??? ??(True/False) """ if self.is_secure is False: logging.debug("CA is not secure_mode") return False data = b"test" signature = self.__ca_pri.sign( data, ec.ECDSA(hashes.SHA256()) ) try: pub_key = self.__ca_cert.public_key() return pub_key.verify( signature, data, ec.ECDSA(hashes.SHA256()) ) except InvalidSignature: logging.debug("cert test fail!!!") return False
def verify(self, msg, key, sig): verifier = key.verifier( sig, padding.PSS( mgf=padding.MGF1(self.hash_alg()), salt_length=self.hash_alg.digest_size ), self.hash_alg() ) verifier.update(msg) try: verifier.verify() return True except InvalidSignature: return False
def verify_ssh_sig(self, data, msg): if msg.get_text() != self.ecdsa_curve.key_format_identifier: return False sig = msg.get_binary() sigR, sigS = self._sigdecode(sig) signature = encode_dss_signature(sigR, sigS) verifier = self.verifying_key.verifier( signature, ec.ECDSA(self.ecdsa_curve.hash_object()) ) verifier.update(data) try: verifier.verify() except InvalidSignature: return False else: return True
def verify_ssh_sig(self, data, msg): if msg.get_text() != 'ssh-rsa': return False key = self.key if isinstance(key, rsa.RSAPrivateKey): key = key.public_key() verifier = key.verifier( signature=msg.get_binary(), padding=padding.PKCS1v15(), algorithm=hashes.SHA1(), ) verifier.update(data) try: verifier.verify() except InvalidSignature: return False else: return True
def rsa(public_key, signature, message): """Verifies an RSA signature. Args: public_key (str): Public key with BEGIN and END sections. signature (str): Hex value of the signature with its leading 0x stripped. message (str): Message that was signed, unhashed. """ try: public_rsa = load_pem_public_key(bytes(public_key), backend=default_backend()) hashed = util.sha256(message) public_rsa.verify( binascii.unhexlify(signature), hashed, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) except InvalidSignature: raise Exception('Invalid signature')
def _decrypt_cryptography(cls, b_ciphertext, b_crypted_hmac, b_key1, b_key2, b_iv): # b_key1, b_key2, b_iv = self._gen_key_initctr(b_password, b_salt) # EXIT EARLY IF DIGEST DOESN'T MATCH hmac = HMAC(b_key2, hashes.SHA256(), CRYPTOGRAPHY_BACKEND) hmac.update(b_ciphertext) try: hmac.verify(unhexlify(b_crypted_hmac)) except InvalidSignature as e: raise AnsibleVaultError('HMAC verification failed: %s' % e) cipher = C_Cipher(algorithms.AES(b_key1), modes.CTR(b_iv), CRYPTOGRAPHY_BACKEND) decryptor = cipher.decryptor() unpadder = padding.PKCS7(128).unpadder() b_plaintext = unpadder.update( decryptor.update(b_ciphertext) + decryptor.finalize() ) + unpadder.finalize() return b_plaintext
def _rsa_verify_sig(sig_value, formatted, public_key_jsonld): """ - sig_value: data to be verified - public_key: creator of this document's public_key - tbv: to be verified """ # TODO: Support other formats than just PEM public_key = serialization.load_pem_public_key( _get_value(public_key_jsonld, "publicKeyPem").encode("utf-8"), backend=default_backend()) try: public_key.verify( base64.b64decode(sig_value.encode("utf-8")), formatted, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH), hashes.SHA256()) return True except InvalidSignature: return False # In the future, we'll be doing a lot more work based on what suite is # selected.
def verify(self, res): """Verify response from server Taken from https://github.com/madeddie/python-bunq - Thanks! :param res: request to be verified :type res: requests.models.Response """ if not self.server_pubkey: print('No server public key defined, skipping verification') return serv_headers = [ 'X-Bunq-Client-Request-Id', 'X-Bunq-Client-Response-Id' ] msg = '%s\n%s\n\n%s' % ( res.status_code, '\n'.join( ['%s: %s' % (k, v) for k, v in sorted( res.headers.items() ) if k in serv_headers] ), res.text ) signature = base64.b64decode(res.headers['X-Bunq-Server-Signature']) try: self.server_pubkey_pem.verify( signature, msg.encode(), padding.PKCS1v15(), hashes.SHA256() ) except InvalidSignature: print('Message failed verification, data might be tampered with') return False else: return True
def is_issuer(issuing_certificate, issued_certificate): """Determine if the issuing cert is the parent of the issued cert. Determine if the issuing certificate is the parent of the issued certificate by: * conducting subject and issuer name matching, and * verifying the signature of the issued certificate with the issuing certificate's public key :param issuing_certificate: the cryptography certificate object that is the potential parent of the issued certificate :param issued_certificate: the cryptography certificate object that is the potential child of the issuing certificate :return: True if the issuing certificate is the parent of the issued certificate, False otherwise. """ if (issuing_certificate is None) or (issued_certificate is None): return False elif issuing_certificate.subject != issued_certificate.issuer: return False else: try: verify_certificate_signature( issuing_certificate, issued_certificate ) except cryptography_exceptions.InvalidSignature: # If verification fails, an exception is expected. return False return True
def verify_certificate_signature(signing_certificate, certificate): """Verify that the certificate was signed correctly. :param signing_certificate: the cryptography certificate object used to sign the certificate :param certificate: the cryptography certificate object that was signed by the signing certificate :raises: cryptography.exceptions.InvalidSignature if certificate signature verification fails. """ signature_hash_algorithm = certificate.signature_hash_algorithm signature_bytes = certificate.signature signer_public_key = signing_certificate.public_key() if isinstance(signer_public_key, rsa.RSAPublicKey): verifier = signer_public_key.verifier( signature_bytes, padding.PKCS1v15(), signature_hash_algorithm ) elif isinstance(signer_public_key, ec.EllipticCurvePublicKey): verifier = signer_public_key.verifier( signature_bytes, ec.ECDSA(signature_hash_algorithm) ) else: verifier = signer_public_key.verifier( signature_bytes, signature_hash_algorithm ) verifier.update(certificate.tbs_certificate_bytes) verifier.verify()
def test_verify_signature_bad_signature(self, mock_get_pub_key): data = b'224626ae19824466f2a7f39ab7b80f7f' mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key() img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693' verifier = signature_utils.get_verifier(None, img_sig_cert_uuid, 'SHA-256', 'BLAH', signature_utils.RSA_PSS) verifier.update(data) self.assertRaises(crypto_exceptions.InvalidSignature, verifier.verify)
def unsign(self, signed_value, ttl=None): """ Retrieve original value and check it wasn't signed more than max_age seconds ago. :type signed_value: bytes :type ttl: int | datetime.timedelta """ h_size, d_size = struct.calcsize('>cQ'), self.digest.digest_size fmt = '>cQ%ds%ds' % (len(signed_value) - h_size - d_size, d_size) try: version, timestamp, value, sig = struct.unpack(fmt, signed_value) except struct.error: raise BadSignature('Signature is not valid') if version != self.version: raise BadSignature('Signature version not supported') if ttl is not None: if isinstance(ttl, datetime.timedelta): ttl = ttl.total_seconds() # Check timestamp is not older than ttl age = abs(time.time() - timestamp) if age > ttl + _MAX_CLOCK_SKEW: raise SignatureExpired('Signature age %s > %s seconds' % (age, ttl)) try: self.signature(signed_value[:-d_size]).verify(sig) except InvalidSignature: raise BadSignature( 'Signature "%s" does not match' % binascii.b2a_base64(sig)) return value
def _verify(self, message, signature): try: self._public_key.verify(signature, message, crypto_padding.PKCS1v15(), crypto_hashes.SHA256()) return True except crypto_exceptions.InvalidSignature: return False
def __load_cert(self, cert_dir): """???/??? ?? ? ?? ??? :param cert_dir: ???/??? ?? ?? :return: X509 ??? """ logging.debug("Cert/Key loading...") cert_file = join(cert_dir, "cert.pem") pri_file = join(cert_dir, "key.pem") f = open(cert_file, "rb") cert_bytes = f.read() f.close() cert = x509.load_pem_x509_certificate(cert_bytes, default_backend()) f = open(pri_file, "rb") pri_bytes = f.read() f.close() try: pri = serialization.load_pem_private_key(pri_bytes, self.__PASSWD, default_backend()) except ValueError: logging.debug("Invalid Password(%s)", cert_dir) return None data = b"test" signature = pri.sign(data, ec.ECDSA(hashes.SHA256())) try: pub_key = cert.public_key() result = pub_key.verify(signature, data, ec.ECDSA(hashes.SHA256())) except InvalidSignature: logging.debug("sign test fail") result = False if result: return cert else: logging.error("result is False ") return None
def verify_certificate(self, peer_cert): """ ??? ?? :param peer_cert: ????? :return: ???? """ # ??? ???? ?? not_after = peer_cert.not_valid_after now = datetime.datetime.now() if not_after < now: logging.error("Certificate is Expired") return False # ??? ?? ?? ca_pub = self.__ca_cert.public_key() signature = peer_cert.signature data = peer_cert.tbs_certificate_bytes validation_result = False try: ca_pub.verify( signature=signature, data=data, signature_algorithm=ec.ECDSA(hashes.SHA256()) ) validation_result = True except InvalidSignature: logging.debug("InvalidSignatureException") return validation_result
def verify_data_with_publickey(public_key, data: bytes, signature: bytes, is_hash: bool=False) -> bool: """??? DATA ?? :param public_key: ??? ??? :param data: ?? ?? ?? :param signature: ?? ??? :param is_hash: ?? hashed ??(True/False :return: ?? ?? ??(True/False) """ hash_algorithm = hashes.SHA256() if is_hash: hash_algorithm = utils.Prehashed(hash_algorithm) if isinstance(public_key, ec.EllipticCurvePublicKeyWithSerialization): try: public_key.verify( signature=signature, data=data, signature_algorithm=ec.ECDSA(hash_algorithm) ) return True except InvalidSignature: logging.debug("InvalidSignatureException_ECDSA") elif isinstance(public_key, rsa.RSAPublicKeyWithSerialization): try: public_key.verify( signature, data, padding.PKCS1v15(), hash_algorithm ) return True except InvalidSignature: logging.debug("InvalidSignatureException_RSA") else: logging.debug("Unknown PublicKey Type : %s", type(public_key)) return False
def verify(self, res): """Verify response from server Taken from https://github.com/madeddie/python-bunq - Thanks! :param res: request to be verified :type res: requests.models.Response """ if not self.server_pubkey: print('No server public key defined, skipping verification') return True serv_headers = [ 'X-Bunq-Client-Request-Id', 'X-Bunq-Client-Response-Id' ] msg = '%s\n%s\n\n%s' % ( res.status_code, '\n'.join( ['%s: %s' % (k, v) for k, v in sorted( res.headers.items() ) if k in serv_headers] ), res.text ) signature = base64.b64decode(res.headers['X-Bunq-Server-Signature']) try: self.server_pubkey_pem.verify( signature, msg.encode(), padding.PKCS1v15(), hashes.SHA256() ) except InvalidSignature: print('Message failed verification, data might be tampered with') return False else: return True
def verify(self, msg, key, sig): verifier = key.verifier( sig, padding.PKCS1v15(), self.hash_alg() ) verifier.update(msg) try: verifier.verify() return True except InvalidSignature: return False
def verify(self, msg, key, sig): try: der_sig = raw_to_der_signature(sig, key.curve) except ValueError: return False verifier = key.verifier(der_sig, ec.ECDSA(self.hash_alg())) verifier.update(msg) try: verifier.verify() return True except InvalidSignature: return False
def verify(self, signature, data, hash_context): if not isinstance(hash_context, hashes.HashContext): raise TypeError("hash_context must be an instance of hashes.HashContext.") size = self.public_numbers.parameter_numbers.q.bit_length() // 8 r, s = (bytes_to_long(value) for value in read_content(signature, '{0}s{0}s'.format(size))) # r, s = (bytes_to_long(value) for value in read_content(signature, '20s20s')) verifier = self._key.verifier(encode_dss_signature(r, s), hashes.SHA256()) verifier._hash_ctx = hash_context verifier.update(data) try: verifier.verify() except InvalidSignature: raise ValueError("invalid signature")
def verifyCert(cert, signCert): """ Verifies that a certificate has been signed with another. Note that this function only verifies the cryptographic signature and is probably wrong and dangerous. Do not use it to verify certificates. This function only supports ECDSA and RSA+PKCS1 signatures, all other signature types will fail. :param cert: The certificate whose signature we want to verify as a cryptography certificate object. :param signCert: The certificate that was used to sign the first certificate as a cryptography certificate object. :return: True if the signature is a valid ECDSA signature, False otherwise. """ # FIXME: This is very likely wrong and we should find a better way to verify certs. halg = cert.signature_hash_algorithm sig = cert.signature data = cert.tbs_certificate_bytes pubKey = signCert.public_key() alg = None # We only support ECDSA and RSA+PKCS1 if isinstance(pubKey, ec.EllipticCurvePublicKey): alg = ec.ECDSA(halg) ver = pubKey.verifier(sig, alg) elif isinstance(pubKey, rsa.RSAPublicKey): pad = padding.PKCS1v15() ver = pubKey.verifier(sig, pad, halg) else: return False ver.update(data) try: ver.verify() return True except InvalidSignature as e: return False
def _verify_single(self, token, verifying_key): """ Verify a compact-formatted JWT signed by a single key Return True if authentic Return False if not """ # grab the token parts header, payload, raw_signature, signing_input = _unpack_token_compact(token) # load the verifying key verifying_key = load_verifying_key(verifying_key, self.crypto_backend) # convert the raw_signature to DER format der_signature = raw_to_der_signature( raw_signature, verifying_key.curve) # initialize the verifier verifier = self._get_verifier(verifying_key, der_signature) verifier.update(signing_input) # check to see whether the signature is valid try: verifier.verify() except InvalidSignature: # raise DecodeError('Signature verification failed') return False return True
def verify(self, signature, msg): """ Verify whether a given signature is correct for a message. :param signature: the given signature :param msg: the given message """ length = len(signature) / 2 r = signature[:length] # remove all "\x00" prefixes while r and r[0] == "\x00": r = r[1:] # prepend "\x00" when the most significant bit is set if ord(r[0]) & 128: r = "\x00" + r s = signature[length:] # remove all "\x00" prefixes while s and s[0] == "\x00": s = s[1:] # prepend "\x00" when the most significant bit is set if ord(s[0]) & 128: s = "\x00" + s # turn back into int r = int(hexlify(r), 16) s = int(hexlify(s), 16) # verify try: self.ec.verifier(encode_dss_signature(r, s), ec.ECDSA(hashes.SHA1())) return True except InvalidSignature: return False
def __verify_signature(self, signature, signer_pk, content): """ Verify RSASSA-PSS signature @developer: vsmysle :param signature: signature bytes to verify :param signer_pk: instance of cryptography.hazmat.primitives. rsa.RSAPublicKey that is a public key of a signer :param content: content to verify a signature of :return: bool verification result """ self.logger.debug("starting signature verification routine") try: signer_pk.verify( signature, content, asym_padding.PSS( mgf=asym_padding.MGF1(SHA1()), salt_length=asym_padding.PSS.MAX_LENGTH ), SHA1() ) except InvalidSignature: self.logger.warn("signature verification failed") return False self.logger.info("signature OK") return True
def verify(self, msg, key, sig): try: key.verify(sig, msg, padding.PKCS1v15(), self.hash_alg()) return True except InvalidSignature: return False
def verify(self, msg, key, sig): try: der_sig = raw_to_der_signature(sig, key.curve) except ValueError: return False try: key.verify(der_sig, msg, ec.ECDSA(self.hash_alg())) return True except InvalidSignature: return False
def verify(self, msg, key, sig): try: key.verify( sig, msg, padding.PSS( mgf=padding.MGF1(self.hash_alg()), salt_length=self.hash_alg.digest_size ), self.hash_alg() ) return True except InvalidSignature: return False
def verify_ssh_sig(self, data, msg): if len(msg.asbytes()) == 40: # spies.com bug: signature has no header sig = msg.asbytes() else: kind = msg.get_text() if kind != 'ssh-dss': return 0 sig = msg.get_binary() # pull out (r, s) which are NOT encoded as mpints sigR = util.inflate_long(sig[:20], 1) sigS = util.inflate_long(sig[20:], 1) signature = encode_dss_signature(sigR, sigS) key = dsa.DSAPublicNumbers( y=self.y, parameter_numbers=dsa.DSAParameterNumbers( p=self.p, q=self.q, g=self.g ) ).public_key(backend=default_backend()) verifier = key.verifier(signature, hashes.SHA1()) verifier.update(data) try: verifier.verify() except InvalidSignature: return False else: return True
def event_rsa(event_json): """Verifies the RSA signature of the provided event json. Args: event_json (dict): Event to validate. """ try: rsa(util.expand_rsa_public_key(event_json['fleet_key']), event_json['signature'], util.concat_event(event_json)) except InvalidSignature: raise Exception('Invalid RSA signature')
def verify(self, msg, sig, key=None): if key is None: key = self.key try: if isinstance(key, rsa.RSAPrivateKey): key = key.public_key() key.verify(sig, msg, PKCS1v15(), self.digest) return True except InvalidSignature: return False
def validate(self, message): """Verify the signature of self RSA fulfillment. The signature of self RSA fulfillment is verified against the provided message and the condition's public modulus. Args: message (bytes): Message to verify. Returns: bool: Whether self fulfillment is valid. """ if not isinstance(message, bytes): raise Exception( 'Message must be provided as bytes, was: ' + message) public_numbers = RSAPublicNumbers( PUBLIC_EXPONENT, int.from_bytes(self.modulus, byteorder='big'), ) public_key = public_numbers.public_key(default_backend()) verifier = public_key.verifier( self.signature, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=SALT_LENGTH, ), hashes.SHA256() ) verifier.update(message) try: verifier.verify() except InvalidSignature as exc: raise ValidationError('Invalid RSA signature') from exc return True
def verify_ssh_sig(self, data, msg): if len(msg.asbytes()) == 40: # spies.com bug: signature has no header sig = msg.asbytes() else: kind = msg.get_text() if kind != 'ssh-dss': return 0 sig = msg.get_binary() # pull out (r, s) which are NOT encoded as mpints sigR = paramiko.util.inflate_long(sig[:20], 1) sigS = paramiko.util.inflate_long(sig[20:], 1) signature = encode_dss_signature(sigR, sigS) key = dsa.DSAPublicNumbers( y=self.y, parameter_numbers=dsa.DSAParameterNumbers( p=self.p, q=self.q, g=self.g ) ).public_key(backend=default_backend()) verifier = key.verifier(signature, hashes.SHA1()) verifier.update(data) try: verifier.verify() except InvalidSignature: return False else: return True
def verify(self, message, signature, hash_algorithm=None): if hash_algorithm is None: hash_algorithm = hashes.SHA256() padding_algorithm = padding.PKCS1v15() try: verifyer = self._value.verifier(signature, padding_algorithm, hash_algorithm) verifyer.update(message) verifyer.verify() return True except exceptions.InvalidSignature as e: pass raise VerificationError(e)
def verify(self, caller: NodeId, signature: str, message: bytes, sender_id: NodeId) -> bool: public_key = self._private_keys[sender_id].public_key() try: public_key.verify( base64.b64decode(signature), message, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) return True except InvalidSignature: return False
def verifyMac(self, data, checkMac): try: self.verifier( checkMac, data, # PSS( # mgf=MGF1(hashes.SHA256()), # salt_length=PSS.MAX_LENGTH # ), PKCS1v15(), hashes.SHA256() ) return True except InvalidSignature: return False