我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 cryptography.hazmat.primitives.ciphers.Cipher()。
def encrypt(message, receiver_public_key):
    """Encrypt ``message`` for the holder of ``receiver_public_key``.

    A new SECP256K1 key pair is generated on every call. An ECDH exchange
    with the receiver's public key yields a shared secret, which X9.63 KDF
    (SHA-256) turns into a 32-byte AES key used in GCM mode.

    :param message: plaintext bytes to encrypt.
    :param receiver_public_key: elliptic-curve public key of the receiver.
    :return: the sender's encoded public point, the GCM tag and the
        ciphertext, concatenated in that order.
    """
    sender_private_key = ec.generate_private_key(ec.SECP256K1(), backend)
    shared_key = sender_private_key.exchange(ec.ECDH(), receiver_public_key)
    sender_public_key = sender_private_key.public_key()
    point = sender_public_key.public_numbers().encode_point()

    # Constant 12-byte IV (ASCII zeros).  It must be a bytes literal: a text
    # string is rejected by modes.GCM on Python 3.
    # NOTE(review): a fixed IV relies on the AES key being derived from a key
    # pair that is generated fresh for each call -- confirm before reuse.
    iv = b'000000000000'
    xkdf = x963kdf.X963KDF(
        algorithm=hashes.SHA256(),
        length=32,
        sharedinfo=b'',  # bytes, not str, for the same Python 3 reason
        backend=backend,
    )
    key = xkdf.derive(shared_key)

    encryptor = Cipher(
        algorithms.AES(key),
        modes.GCM(iv),
        backend=backend,
    ).encryptor()
    ciphertext = encryptor.update(message) + encryptor.finalize()
    return point + encryptor.tag + ciphertext
def decrypt(message, receiver_private_key):
    """Decrypt a message laid out as point + tag + ciphertext.

    The first 65 bytes are the sender's encoded SECP256K1 public point, the
    next 16 bytes are the GCM tag, and the remainder is the ciphertext. The
    AES key is derived from the ECDH shared secret with X9.63 KDF (SHA-256).

    :param message: the encrypted bytes in the layout described above.
    :param receiver_private_key: elliptic-curve private key of the receiver.
    :return: the decrypted plaintext bytes.
    """
    point = message[0:65]
    tag = message[65:81]
    ciphertext = message[81:]

    sender_public_numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(
        ec.SECP256K1(), point)
    sender_public_key = sender_public_numbers.public_key(backend)
    shared_key = receiver_private_key.exchange(ec.ECDH(), sender_public_key)

    # Constant 12-byte IV (ASCII zeros).  It must be a bytes literal: a text
    # string is rejected by modes.GCM on Python 3.
    iv = b'000000000000'
    xkdf = x963kdf.X963KDF(
        algorithm=hashes.SHA256(),
        length=32,
        sharedinfo=b'',  # bytes, not str, for the same Python 3 reason
        backend=backend,
    )
    key = xkdf.derive(shared_key)

    decryptor = Cipher(
        algorithms.AES(key),
        modes.GCM(iv, tag),
        backend=backend,
    ).decryptor()
    # finalize() is where the GCM tag is checked.
    return decryptor.update(ciphertext) + decryptor.finalize()
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and sign ``data``, returning a URL-safe base64 token.

    The payload is the byte 0x80, ``current_time`` packed as a big-endian
    unsigned 64-bit integer, ``iv`` and the AES-CBC ciphertext of the
    PKCS7-padded data. An HMAC-SHA256 over that payload is appended before
    encoding.

    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    pkcs7 = padding.PKCS7(algorithms.AES.block_size).padder()
    padded = pkcs7.update(data) + pkcs7.finalize()

    cbc = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    encrypted = cbc.update(padded) + cbc.finalize()

    payload = b"".join(
        [b"\x80", struct.pack(">Q", current_time), iv, encrypted]
    )

    signer = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    signer.update(payload)
    return base64.urlsafe_b64encode(payload + signer.finalize())
def encrypt(plaintext: bytes, token: bytes) -> bytes:
    """Encrypt plaintext with a key and IV derived from the token.

    The plaintext is PKCS7-padded to 128-bit blocks and encrypted with
    AES in CBC mode.

    :param bytes plaintext: Plaintext (json) to encrypt
    :param bytes token: Token to use
    :return: Encrypted bytes
    :raises TypeError: if plaintext is not bytes
    """
    if not isinstance(plaintext, bytes):
        raise TypeError("plaintext requires bytes")

    Utils.verify_token(token)
    key, iv = Utils.key_iv(token)

    pkcs7 = padding.PKCS7(128).padder()
    block_aligned = pkcs7.update(plaintext) + pkcs7.finalize()

    aes_cbc = Cipher(
        algorithms.AES(key), modes.CBC(iv), backend=default_backend()
    ).encryptor()
    return aes_cbc.update(block_aligned) + aes_cbc.finalize()
def decrypt(ciphertext: bytes, token: bytes) -> bytes:
    """Decrypt ciphertext with a key and IV derived from the token.

    The ciphertext is decrypted with AES in CBC mode and the 128-bit PKCS7
    padding is then removed.

    :param bytes ciphertext: Ciphertext to decrypt
    :param bytes token: Token to use
    :return: Decrypted bytes object
    :raises TypeError: if ciphertext is not bytes
    """
    if not isinstance(ciphertext, bytes):
        raise TypeError("ciphertext requires bytes")

    Utils.verify_token(token)
    key, iv = Utils.key_iv(token)

    aes_cbc = Cipher(
        algorithms.AES(key), modes.CBC(iv), backend=default_backend()
    ).decryptor()
    block_aligned = aes_cbc.update(ciphertext) + aes_cbc.finalize()

    pkcs7 = padding.PKCS7(128).unpadder()
    return pkcs7.update(block_aligned) + pkcs7.finalize()
def crypt_create(key_bytes, is_encrypt_flag=None, iv_bytes=None,
                 algorithm=ciphers_algorithms.AES, mode=ciphers_modes.CTR):
    '''
    Create and return a crypto context for symmetric key encryption using the
    key key_bytes.

    Uses algorithm in mode with an IV of iv_bytes.  If iv_bytes is None, the
    IV comes from get_zero_pad() sized to one cipher block (an all-zero IV).

    A truthy is_encrypt_flag returns an encryptor; anything else, including
    the default None, returns a decryptor.  AES CTR mode uses the same
    operations for encryption and decryption.
    '''
    algo = algorithm(bytes(key_bytes))
    if iv_bytes is None:
        # The block_size is in bits.  Floor division keeps the byte count an
        # int on Python 3, where "/" would produce a float.
        iv_bytes = get_zero_pad(algo.block_size // BITS_IN_BYTE)
    cipher = ciphers.Cipher(algo, mode(bytes(iv_bytes)),
                            backend=backends.default_backend())
    if is_encrypt_flag:
        return cipher.encryptor()
    return cipher.decryptor()
def __init__(self, endpoint="http://127.0.0.1:10092/api", encoding="utf-8", enc_key=None, enc_iv=None):
    """Set up the endpoint, text encoding, optional AES-CBC cipher and HTTP session.

    :param endpoint: URL stored as the API endpoint.
    :param encoding: text encoding stored on the instance.
    :param enc_key: AES key; transport encryption is enabled only when both
        ``enc_key`` and ``enc_iv`` are given.
    :param enc_iv: CBC initialisation vector paired with ``enc_key``.
    """
    self._endpoint = endpoint
    # Use the caller's value; this was previously hard-coded to "utf-8",
    # which silently ignored the argument.
    self._encoding = encoding

    if enc_key is None or enc_iv is None:
        self._transport_enc = False
        self._transport_enc_key = None
        self._transport_enc_iv = None
        self._cipher = None
    else:
        self._transport_enc = True
        self._transport_enc_key = enc_key
        self._transport_enc_iv = enc_iv
        backend = default_backend()
        self._cipher = Cipher(algorithms.AES(enc_key), modes.CBC(enc_iv), backend=backend)

    self._session = requests.Session()
def decryptStorage(path, key):
    """Decrypt an AES-GCM encrypted JSON file and return the parsed object.

    The file layout is a 12-byte IV, a 16-byte GCM tag, then the ciphertext.

    :param path: path of the encrypted file.
    :param key: hex-encoded AES key.
    :return: the object produced by ``json.loads`` on the decrypted text.
    """
    cipherkey = unhexlify(key)
    with open(path, 'rb') as f:
        iv = f.read(12)
        tag = f.read(16)
        cipher = Cipher(algorithms.AES(cipherkey), modes.GCM(iv, tag), backend=default_backend())
        decryptor = cipher.decryptor()
        chunks = []
        while True:
            block = f.read(16)
            if not block:
                break
            # data are not authenticated yet
            chunks.append(decryptor.update(block))
        # throws exception when the tag is wrong
        chunks.append(decryptor.finalize())
    # Decode once over the whole plaintext: decoding each 16-byte block on
    # its own fails when a multi-byte character straddles a block boundary.
    return json.loads(b"".join(chunks).decode())
def decryptEntryValue(nonce, val):
    """Decrypt an AES-GCM encrypted entry value and return the parsed JSON.

    ``val`` is laid out as a 12-byte IV, a 16-byte GCM tag, then the
    ciphertext.

    :param nonce: hex-encoded AES key for this entry.
    :param val: the encrypted bytes in the layout described above.
    :return: the object produced by ``json.loads`` on the decrypted text.
    """
    cipherkey = unhexlify(nonce)
    iv = val[:12]
    tag = val[12:28]
    cipher = Cipher(algorithms.AES(cipherkey), modes.GCM(iv, tag), backend=default_backend())
    decryptor = cipher.decryptor()
    # One update() call replaces the 16-byte slicing loop, which re-copied
    # the remaining input on every iteration.
    # finalize() throws an exception when the tag is wrong.
    plaintext = decryptor.update(val[28:]) + decryptor.finalize()
    # Decode once over the whole plaintext: decoding each 16-byte block on
    # its own fails when a multi-byte character straddles a block boundary.
    return json.loads(plaintext.decode())

# Decrypt given entry nonce
def encode_block(self, block_id, block_data):
    """Encrypt ``block_data`` with AES-GCM and serialise it.

    Written through the Encoder, in order: the magic bytes, a random IV of
    ``self.iv_len`` bytes, the GCM tag and the ciphertext. ``block_id`` is
    authenticated as additional data but is not part of the output.

    Returns the encoder's accumulated value.
    """
    assert (isinstance(block_id, bytes) and
            len(block_id) == self.block_id_len)

    writer = Encoder()
    writer.encode_bytes(self.magic)

    nonce = os.urandom(self.iv_len)
    writer.encode_bytes(nonce)

    gcm = Cipher(algorithms.AES(self.key), modes.GCM(nonce),
                 backend=self.backend).encryptor()
    gcm.authenticate_additional_data(block_id)
    sealed = gcm.update(block_data) + gcm.finalize()
    assert len(gcm.tag) == self.tag_len

    for field in (gcm.tag, sealed):
        writer.encode_bytes(field)
    return writer.value
def decode_block(self, block_id, block_data):
    """Parse and decrypt a serialised AES-GCM block.

    Read through the Decoder, in order: the magic bytes (checked against
    ``self.magic``), the IV, the GCM tag and the remaining ciphertext.
    ``block_id`` is supplied as additional authenticated data.

    Returns the decrypted bytes.
    """
    assert (isinstance(block_id, bytes) and
            len(block_id) == self.block_id_len)
    assert isinstance(block_data, bytes)
    header_len = len(self.magic) + self.iv_len + self.tag_len
    assert len(block_data) > header_len

    reader = Decoder(block_data)
    # check magic
    magic = reader.decode_bytes(len(self.magic))
    assert magic == self.magic
    # IV first, then the tag
    nonce = reader.decode_bytes(self.iv_len)
    tag = reader.decode_bytes(self.tag_len)

    gcm = Cipher(algorithms.AES(self.key), modes.GCM(nonce, tag),
                 backend=self.backend).decryptor()
    gcm.authenticate_additional_data(block_id)
    return gcm.update(reader.decode_bytes_rest()) + gcm.finalize()
def execute(self, item):
    """Download one item to ``self.location``, decrypting it if it has a key.

    ``item[1]`` is an optional base URL and ``item[2]`` the resource name;
    ``item[2]`` is overwritten with the basename of the URL path. When
    ``item[3]`` is truthy, the AES key is fetched from ``item[3].uri`` and
    each chunk is passed through an AES-CBC decryptor using ``item[3].iv``
    (hex, with its first two characters skipped).
    """
    base, name = item[1], item[2]
    url = base + "/" + name if base else name
    item[2] = os.path.basename(urllib.parse.urlparse(url).path)

    decryptor = None
    key_info = item[3]
    if key_info:
        backend = default_backend()
        key = requests.get(key_info.uri).content
        iv = bytes.fromhex(key_info.iv[2:])
        decryptor = Cipher(algorithms.AES(key), modes.CBC(iv),
                           backend=backend).decryptor()

    response = requests.get(url, stream=True)
    target = os.path.join(self.location, item[2])
    with open(target, 'wb') as out:
        for chunk in response.iter_content(chunk_size=1024):
            if not chunk:
                continue
            if decryptor is not None:
                out.write(decryptor.update(chunk))
            else:
                out.write(chunk)
def encrypt(data, secret_key, initialisation_vector):
    """
    Encrypts data with AES-GCM using the given secret key and
    initialisation vector.

    :param bytes data: the plaintext bytes to be encrypted
    :param bytes secret_key: the key to be used for encryption
    :param bytes initialisation_vector: the initialisation vector
    :return: the cipher text and GCM authentication tag tuple
    :rtype: (bytes, bytes)
    """
    aes = algorithms.AES(secret_key)
    gcm = modes.GCM(initialization_vector=initialisation_vector,
                    min_tag_length=16)
    encryptor = Cipher(algorithm=aes, mode=gcm,
                       backend=default_backend()).encryptor()
    sealed = encryptor.update(data) + encryptor.finalize()
    # the tag is only available after finalize()
    return sealed, encryptor.tag
def decrypt(ciphertext, tag, secret_key, initialisation_vector):
    """
    Decrypts a cipher text with AES-GCM using the given authentication tag,
    secret key and initialisation vector.

    :param bytes ciphertext: the cipher text to be decrypted
    :param bytes tag: the GCM authentication tag
    :param bytes secret_key: the key to be used for encryption
    :param bytes initialisation_vector: the initialisation vector
    :return: the decrypted plaintext
    :rtype: bytes
    """
    aes = algorithms.AES(secret_key)
    gcm = modes.GCM(initialization_vector=initialisation_vector, tag=tag)
    decryptor = Cipher(algorithm=aes, mode=gcm,
                       backend=default_backend()).decryptor()
    opened = decryptor.update(ciphertext)
    return opened + decryptor.finalize()
def _getCipher(self, cip, iv, key):
    """
    Build the cipher object registered under C{cip}.

    @param cip: the name of the cipher, used as a key into cipherMap
    @param iv: the initialization vector; truncated to one cipher block
    @param key: the encryption key; truncated to the cipher's key size

    @return: the cipher object, or a L{_DummyCipher} when the map entry has
        no algorithm class.
    """
    algorithmClass, keySize, modeClass = self.cipherMap[cip]
    if algorithmClass is not None:
        ivSize = algorithmClass.block_size // 8
        return Cipher(
            algorithmClass(key[:keySize]),
            modeClass(iv[:ivSize]),
            backend=default_backend(),
        )
    return _DummyCipher()
def _getSupportedCiphers():
    """
    Probe the backend in use and collect the ciphers it can instantiate.

    Each candidate is tried with a dummy key and IV of the right size; it is
    kept unless creating an encryptor raises L{UnsupportedAlgorithm}.

    @return: the supported cipher names, in preference order.
    @rtype: L{list} of L{bytes}
    """
    candidates = [b'aes256-ctr', b'aes256-cbc', b'aes192-ctr', b'aes192-cbc',
                  b'aes128-ctr', b'aes128-cbc', b'cast128-ctr',
                  b'cast128-cbc', b'blowfish-ctr', b'blowfish-cbc',
                  b'3des-ctr', b'3des-cbc']

    def isUsable(name):
        algorithmClass, keySize, modeClass = SSHCiphers.cipherMap[name]
        try:
            Cipher(
                algorithmClass(b' ' * keySize),
                modeClass(b' ' * (algorithmClass.block_size // 8)),
                backend=default_backend(),
            ).encryptor()
        except UnsupportedAlgorithm:
            return False
        return True

    return [name for name in candidates if isUsable(name)]
def __init__(self, filename, session=None, readers_public_key=None,
             writers_private_key=None):
    """Open the output file and write the encrypted, signed cipher header.

    New cipher keys are generated, serialised to JSON, encrypted with
    ``readers_public_key`` and signed with ``writers_private_key``. The
    result is written as the "EncryptedCipher" part, and an HMAC-SHA256
    keyed with the cipher's hmac_key is started over that serialised part.
    """
    self.fd = standard.WritableAddressSpace(
        filename=filename, session=session, mode="w+b")

    self.session = session
    self.profile = AgentProfile(session=session)
    self.cipher = CipherProperties(session=session).generate_keys()
    self.readers_public_key = readers_public_key
    self.writers_private_key = writers_private_key

    # Only the reader can decrypt the cipher (it is encrypted with the
    # reader's public key); the writer's private key signs it.
    envelope = Signature(session=session)
    cipher_json = self.cipher.to_json()
    envelope.encrypted_cipher = readers_public_key.encrypt(cipher_json)
    envelope.signature = writers_private_key.sign(cipher_json)

    header = envelope.to_json()
    self.write_part(header, "EncryptedCipher")

    self.hmac = hmac.HMAC(self.cipher.hmac_key.RawBytes(),
                          hashes.SHA256(),
                          backend=openssl.backend)
    self.hmac.update(header)
def _wrap_core(wrapping_key, a, r, backend):
    """Run the RFC 3394 key-wrap rounds (2.2.1, index method).

    ``a`` is the 8-byte integrity register and ``r`` a list of 8-byte
    blocks; ``r`` is updated in place. Returns ``a`` followed by the joined
    blocks.
    """
    ecb = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
    n = len(r)
    # t is the running step counter n*j + i + 1 of the six passes (j) over
    # the n blocks (i), flattened into one loop.
    for t in range(1, 6 * n + 1):
        i = (t - 1) % n
        # Each update() is one discrete 16-byte chunk (AES has a 128-bit
        # block size), and ECB keeps no state between blocks, so the same
        # encryptor is reused throughout.
        b = ecb.update(a + r[i])
        # pack/unpack are safe as these are always 64-bit chunks
        (msb,) = struct.unpack(">Q", b[:8])
        a = struct.pack(">Q", msb ^ t)
        r[i] = b[-8:]

    assert ecb.finalize() == b""
    return a + b"".join(r)
def _unwrap_core(wrapping_key, a, r, backend):
    """Run the RFC 3394 key-unwrap rounds (2.2.2, index method).

    ``a`` is the 8-byte integrity register and ``r`` a list of 8-byte
    blocks; ``r`` is updated in place. Returns the tuple ``(a, r)``.
    """
    ecb = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
    n = len(r)
    # t is the step counter n*j + i + 1, walked backwards from 6*n to 1
    # (j and i both descending in the nested form).
    for t in range(6 * n, 0, -1):
        i = (t - 1) % n
        # pack/unpack are safe as these are always 64-bit chunks
        (register,) = struct.unpack(">Q", a)
        atr = struct.pack(">Q", register ^ t) + r[i]
        # Each update() is one discrete 16-byte chunk, so the same
        # decryptor is reused throughout.
        b = ecb.update(atr)
        a, r[i] = b[:8], b[-8:]

    assert ecb.finalize() == b""
    return a, r
def _encrypt_data(self, data):
    """Serialise ``data`` to JSON, encrypt it with AES-GCM and base64 it.

    Before base64 encoding the blob is: the byte ``b"1"``, the GCM tag, a
    random 12-byte IV, then the ciphertext. Returns the encoded blob as an
    ASCII string.
    """
    assert self._encryption_key

    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
    from cryptography.hazmat.backends import default_backend

    nonce = os.urandom(12)
    gcm = Cipher(
        algorithms.AES(self._encryption_key),
        modes.GCM(nonce),
        backend=default_backend(),
    ).encryptor()

    plaintext = json.dumps(data).encode("UTF-8")
    body = gcm.update(plaintext) + gcm.finalize()

    blob = b"".join((b"1", gcm.tag, nonce, body))
    return standard_b64encode(blob).decode("ASCII")
def __init__(self, algorithm, key, associated_data, iv):
    """Store the key and IV and build the underlying encryptor.

    The cipher algorithm and mode are obtained from ``algorithm``.
    ``associated_data`` is fed to the encryptor as additional authenticated
    data.
    """
    self.source_key = key
    self.iv = iv

    # Built from the algorithm's own factories; kept generic to leave an
    # option for non-Cipher encryptor types in the future.
    cipher_algorithm = algorithm.encryption_algorithm(key)
    cipher_mode = algorithm.encryption_mode(self.iv)
    cipher = Cipher(cipher_algorithm, cipher_mode, backend=default_backend())
    self._encryptor = cipher.encryptor()

    # Authenticated but not encrypted; the same value must also be passed
    # in on decryption.
    self._encryptor.authenticate_additional_data(associated_data)
def __init__(self, broker="http://127.0.0.1:19820/api", encoding="utf-8", enc_key=None, enc_iv=None):
    """Set up the endpoint, encoding, optional AES-CBC cipher, HTTP session and event table.

    :param broker: URL stored as the API endpoint.
    :param encoding: text encoding stored on the instance.
    :param enc_key: AES key; transport encryption is enabled only when both
        ``enc_key`` and ``enc_iv`` are given.
    :param enc_iv: CBC initialisation vector paired with ``enc_key``.
    """
    super().__init__()

    self._endpoint = broker
    # Use the caller's value; this was previously hard-coded to "utf-8",
    # which silently ignored the argument.
    self._encoding = encoding

    if enc_key is None or enc_iv is None:
        self._transport_enc = False
        self._transport_enc_key = None
        self._transport_enc_iv = None
        self._cipher = None
    else:
        self._transport_enc = True
        self._transport_enc_key = enc_key
        self._transport_enc_iv = enc_iv
        backend = default_backend()
        self._cipher = Cipher(algorithms.AES(enc_key), modes.CBC(enc_iv), backend=backend)

    self._session = requests.Session()

    # Event name -> bound handler method.
    self._event_dict = {
        'logon': self.on_login,
        'logoff': self.on_logout,
        'ping': self.on_ping,
        'query_data': self.on_query_data,
        'send_order': self.on_insert_order,
        'cancel_order': self.on_cancel_order_event,
        'get_quote': self.on_get_quote,
    }
    self.client_id = ''
    self.account_id = ''
def aes_key_wrap(wrapping_key, key_to_wrap, backend):
    """Wrap ``key_to_wrap`` under ``wrapping_key`` (RFC 3394, 2.2.1).

    :param wrapping_key: AES key of 16, 24 or 32 bytes.
    :param key_to_wrap: key material, at least 16 bytes and a multiple of 8.
    :param backend: backend passed to ``Cipher``.
    :return: the wrapped key, 8 bytes longer than ``key_to_wrap``.
    :raises ValueError: if either length requirement is not met.
    """
    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    if len(key_to_wrap) < 16:
        raise ValueError("The key to wrap must be at least 16 bytes")

    if len(key_to_wrap) % 8 != 0:
        raise ValueError("The key to wrap must be a multiple of 8 bytes")

    ecb = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
    a = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
    r = [key_to_wrap[off:off + 8] for off in range(0, len(key_to_wrap), 8)]
    n = len(r)
    # t is the running step counter n*j + i + 1 of the six passes (j) over
    # the n blocks (i), flattened into one loop.
    for t in range(1, 6 * n + 1):
        i = (t - 1) % n
        # Each update() is one discrete 16-byte chunk (AES has a 128-bit
        # block size), and ECB keeps no state between blocks, so the same
        # encryptor is reused throughout.
        b = ecb.update(a + r[i])
        # pack/unpack are safe as these are always 64-bit chunks
        (msb,) = struct.unpack(">Q", b[:8])
        a = struct.pack(">Q", msb ^ t)
        r[i] = b[-8:]

    assert ecb.finalize() == b""
    return a + b"".join(r)
def aes_key_unwrap(wrapping_key, wrapped_key, backend):
    """Unwrap ``wrapped_key`` with ``wrapping_key`` (RFC 3394, 2.2.2).

    :param wrapping_key: AES key of 16, 24 or 32 bytes.
    :param wrapped_key: wrapped data, at least 24 bytes and a multiple of 8.
    :param backend: backend passed to ``Cipher``.
    :return: the unwrapped key bytes.
    :raises ValueError: if either length requirement is not met.
    :raises InvalidUnwrap: if the recovered integrity value does not match
        the expected constant.
    """
    if len(wrapped_key) < 24:
        raise ValueError("Must be at least 24 bytes")

    if len(wrapped_key) % 8 != 0:
        raise ValueError("The wrapped key must be a multiple of 8 bytes")

    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    ecb = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
    aiv = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"

    # The first 8 bytes are the integrity register, the rest the blocks.
    a = wrapped_key[:8]
    r = [wrapped_key[off:off + 8] for off in range(8, len(wrapped_key), 8)]
    n = len(r)
    # t is the step counter n*j + i + 1, walked backwards from 6*n to 1.
    for t in range(6 * n, 0, -1):
        i = (t - 1) % n
        # pack/unpack are safe as these are always 64-bit chunks
        (register,) = struct.unpack(">Q", a)
        atr = struct.pack(">Q", register ^ t) + r[i]
        # Each update() is one discrete 16-byte chunk, so the same
        # decryptor is reused throughout.
        b = ecb.update(atr)
        a, r[i] = b[:8], b[-8:]

    assert ecb.finalize() == b""

    if not bytes_eq(a, aiv):
        raise InvalidUnwrap()

    return b"".join(r)
def _cipher_aes_key(value, secret, salt, cost, decrypt=False):
    """
    Low-level encrypt/decrypt helper behind :meth:`encrypt_key`.

    Algorithm details:

    PBKDF2-HMAC-SHA256 with ``2 ** cost`` rounds derives 48 bytes from the
    application secret and the random salt: the first 32 are the AES key,
    the last 16 the IV. The TOTP key is then run through AES-256-CTR.

    CTR was preferred to CBC because the attack scenario is a stolen
    database, with the attacker trying to decrypt a TOTP key (the plaintext
    here). To make that hard, every password should decrypt to a potentially
    valid key, so authentication and padding-oracle signals must be avoided.
    A random padding scheme could make CBC work, but a stream cipher mode is
    simpler. OFB/CFB would also work, but they have malleability and cyclic
    issues (remote and barely relevant here), so CTR was picked as the best
    overall choice.

    :raises RuntimeError: if the 'cryptography' package is not available.
    """
    # backend AES support must be present
    if _cg_ciphers is None:
        raise RuntimeError("TOTP encryption requires 'cryptography' package "
                           "(https://cryptography.io)")

    # One PBKDF2 call yields both key (32 bytes) and iv (16 bytes).
    # NOTE: this requires 2 sha256 blocks to be calculated.
    derived = pbkdf2_hmac("sha256", secret, salt=salt, rounds=(1 << cost),
                          keylen=48)
    aes_key, ctr_iv = derived[:32], derived[32:]

    # AES-256-CTR over the input value
    cipher = _cg_ciphers.Cipher(_cg_ciphers.algorithms.AES(aes_key),
                                _cg_ciphers.modes.CTR(ctr_iv),
                                _cg_default_backend())
    if decrypt:
        ctx = cipher.decryptor()
    else:
        ctx = cipher.encryptor()
    return ctx.update(value) + ctx.finalize()
def decrypt_ztoken(ztoken):
    """Decrypt the given ztoken, used by apple.

    A token of 32 characters or fewer is returned unchanged. Otherwise its
    first 64 hex characters are decrypted with AES-ECB under an all-zero
    128-bit key, and the result is decoded to text.
    """
    if len(ztoken) <= 32:
        return ztoken

    zero_key = bytes.fromhex('00000000000000000000000000000000')
    ecb = Cipher(algorithms.AES(zero_key), modes.ECB(),
                 backend=default_backend()).decryptor()

    encrypted = bytes.fromhex(ztoken[:64])
    token = ecb.update(encrypted) + ecb.finalize()
    return token.decode()
def decrypt(self, ctext):
    """Decrypt ``ctext`` (IV followed by AES-CBC ciphertext).

    The first ``AES_BLOCK_SIZE`` bytes are the IV. The remainder is
    decrypted with ``self.cipher_key`` and its PKCS7 padding is removed.
    """
    iv, body = ctext[:AES_BLOCK_SIZE], ctext[AES_BLOCK_SIZE:]

    aes = Cipher(algorithms.AES(self.cipher_key), modes.CBC(iv),
                 backend=default_backend()).decryptor()
    padded = aes.update(body) + aes.finalize()

    pkcs7 = padding.PKCS7(AES_BLOCK_SIZE * 8).unpadder()
    return pkcs7.update(padded) + pkcs7.finalize()
def encrypt(self, clear_text):
    """Encrypt ``clear_text`` with AES-CBC under a fresh random IV.

    The text is PKCS7-padded before encryption. Returns the IV followed by
    the ciphertext.
    """
    iv = os.urandom(AES_BLOCK_SIZE)

    pkcs7 = padding.PKCS7(AES_BLOCK_SIZE * 8).padder()
    padded = pkcs7.update(clear_text) + pkcs7.finalize()

    aes = Cipher(algorithms.AES(self.cipher_key), modes.CBC(iv),
                 backend=default_backend()).encryptor()
    return iv + aes.update(padded) + aes.finalize()