我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用cryptography.hazmat.primitives.asymmetric.padding.PKCS1v15()。
def sign(self, msg): """Create signature for message Taken from https://github.com/madeddie/python-bunq - Thanks! :param msg: data to be signed, usually action, headers and body :type msg: str """ return base64.b64encode( self.privkey_pem.sign( msg.encode(), padding.PKCS1v15(), hashes.SHA256() ) ).decode()
def verify(self, buffer, alg, signature): sig_alg = hash_.rsa_signature_algorithm_by_name(alg) verifier = self.key.verifier( signature, padding.PKCS1v15(), sig_alg.hasher() ) while True: d = buffer.read(1024).encode() if not d: break verifier.update(d) try: verifier.verify() except Exception as e: raise e return True
def sign_request(key, header, protected_header, payload): """ Creates a JSON Web Signature for the request header and payload using the specified account key. """ protected = jose_b64(json.dumps(protected_header).encode('utf8')) payload = jose_b64(json.dumps(payload).encode('utf8')) signer = key.signer(padding.PKCS1v15(), hashes.SHA256()) signer.update(protected.encode('ascii')) signer.update(b'.') signer.update(payload.encode('ascii')) return json.dumps({ 'header': header, 'protected': protected, 'payload': payload, 'signature': jose_b64(signer.finalize()), })
def sign_data(self, data: bytes) -> bytes: """ CA ???? DATA ?? :param data: ?? ?? ?? :return: ?? """ if isinstance(self.__ca_pri, ec.EllipticCurvePrivateKeyWithSerialization): signer = self.__ca_pri.signer(ec.ECDSA(hashes.SHA256())) signer.update(data) return signer.finalize() elif isinstance(self.__ca_pri, rsa.RSAPrivateKeyWithSerialization): return self.__ca_pri.sign( data, padding.PKCS1v15(), hashes.SHA256() ) else: logging.debug("Unknown PrivateKey Type : %s", type(self.__ca_pri)) return None
def get_channel(self): #first get a token we need to sign in order to prove we are who we say we are r = requests.get(str(self.base_link_uri) + "/get_device_token", params={"UUID": self.uuid, }) token = r.json()["token"] # get the private Key with open(CloudLinkSettings.PRIVATE_KEY_LOCATION,'r') as key_file: private_key = serialization.load_pem_private_key(key_file.read(), password=CloudLinkSettings.PRIVATE_KEY_PASSPHRASE, backend=default_backend()) # sign the token with our private key signer = private_key.signer(padding.PKCS1v15(), hashes.SHA256()) signer.update(bytes(token)) signature = signer.finalize() # get the randomly assigned channel for my UUID r = requests.get(str(self.base_link_uri) + "/get_device_group", params={"UUID": self.uuid, "signature": b64encode(signature), "format": "PKCS1_v1_5"}) if r.ok: self.channel_uri = r.json()["channel"] elif r.status_code == 400: raise Exception("UUID or Token not registered with Cloud.") elif r.status_code == 403: raise Exception("Signature didn't verify correctly. Bad private key or signature.")
def verify_ssh_sig(self, data, msg): if msg.get_text() != 'ssh-rsa': return False key = self.key if isinstance(key, rsa.RSAPrivateKey): key = key.public_key() verifier = key.verifier( signature=msg.get_binary(), padding=padding.PKCS1v15(), algorithm=hashes.SHA1(), ) verifier.update(data) try: verifier.verify() except InvalidSignature: return False else: return True
def encrypt(pubkey, password): """Encrypt password using given RSA public key and encode it with base64. The encrypted password can only be decrypted by someone with the private key (in this case, only Travis). """ key = load_key(pubkey) encrypted_password = key.encrypt(password, PKCS1v15()) return base64.b64encode(encrypted_password)
def encrypt_key(key, password): """Encrypt the password with the public key and return an ASCII representation. The public key retrieved from the Travis API is loaded as an RSAPublicKey object using Cryptography's default backend. Then the given password is encrypted with the encrypt() method of RSAPublicKey. The encrypted password is then encoded to base64 and decoded into ASCII in order to convert the bytes object into a string object. Parameters ---------- key: str Travis CI public RSA key that requires deserialization password: str the password to be encrypted Returns ------- encrypted_password: str the base64 encoded encrypted password decoded as ASCII Notes ----- Travis CI uses the PKCS1v15 padding scheme. While PKCS1v15 is secure, it is outdated and should be replaced with OAEP. Example: OAEP(mgf=MGF1(algorithm=SHA256()), algorithm=SHA256(), label=None)) """ public_key = load_pem_public_key(key.encode(), default_backend()) encrypted_password = public_key.encrypt(password, PKCS1v15()) return base64.b64encode(encrypted_password).decode('ascii')
def append_signature(target_file, priv_key, priv_key_password=None): """Append signature to the end of file""" with open(target_file, "rb") as f: contents = f.read() with open(priv_key, "rb") as kf: private_key = serialization.load_pem_private_key( kf.read(), password=priv_key_password, backend=default_backend()) signature = private_key.sign(contents, padding.PKCS1v15(), hashes.SHA512()) with open(target_file, "ab") as f: f.write(signature)
def verify(self, res): """Verify response from server Taken from https://github.com/madeddie/python-bunq - Thanks! :param res: request to be verified :type res: requests.models.Response """ if not self.server_pubkey: print('No server public key defined, skipping verification') return serv_headers = [ 'X-Bunq-Client-Request-Id', 'X-Bunq-Client-Response-Id' ] msg = '%s\n%s\n\n%s' % ( res.status_code, '\n'.join( ['%s: %s' % (k, v) for k, v in sorted( res.headers.items() ) if k in serv_headers] ), res.text ) signature = base64.b64decode(res.headers['X-Bunq-Server-Signature']) try: self.server_pubkey_pem.verify( signature, msg.encode(), padding.PKCS1v15(), hashes.SHA256() ) except InvalidSignature: print('Message failed verification, data might be tampered with') return False else: return True
def sign(self, message): canonicalization = NoFWSCanonicalization() signer = self._key.signer(padding.PKCS1v15(), hashes.SHA1()) headers, body = _rfc822_parse(message) h_field = [] for header, value in headers: if self._signed_headers is None or header in self._signed_headers: h_field.append(header) header, value = canonicalization.canonicalize_header( header, value) signer.update(header) signer.update(b":") signer.update(value) body = canonicalization.canonicalize_body(body) if body: signer.update(b"\r\n") signer.update(body) return _fold( b"DomainKey-Signature: a=rsa-sha1; c=nofws; d={domain}; " b"s={selector}; q=dns; h={headers}; b={signature}".format( domain=self._domain, selector=self._selector, headers=b": ".join(h_field), signature=base64.b64encode(signer.finalize()) )) + b"\r\n"