我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用cryptography.hazmat.primitives.ciphers.algorithms.AES。
def encrypt(message, receiver_public_key):
    """Encrypt ``message`` for the owner of ``receiver_public_key``.

    A fresh SECP256K1 key pair is generated on every call.  The ECDH shared
    secret between that key and ``receiver_public_key`` is run through the
    X9.63 KDF (SHA-256, 32-byte output) to obtain the AES key, and the
    message is encrypted with AES-GCM.

    :param message: plaintext bytes to encrypt
    :param receiver_public_key: elliptic-curve public key of the receiver
    :return: encoded sender public point + GCM tag + ciphertext
    """
    sender_private_key = ec.generate_private_key(ec.SECP256K1(), backend)
    shared_key = sender_private_key.exchange(ec.ECDH(), receiver_public_key)
    sender_public_key = sender_private_key.public_key()
    point = sender_public_key.public_numbers().encode_point()

    # Bytes literals: the cipher mode and the KDF operate on bytes, and a
    # plain ``str`` is rejected on Python 3.  The byte values are unchanged.
    # The IV is constant; the AES key is derived from a per-call key pair.
    iv = b'000000000000'
    xkdf = x963kdf.X963KDF(
        algorithm=hashes.SHA256(),
        length=32,
        sharedinfo=b'',
        backend=backend,
    )
    key = xkdf.derive(shared_key)

    encryptor = Cipher(
        algorithms.AES(key),
        modes.GCM(iv),
        backend=backend,
    ).encryptor()
    ciphertext = encryptor.update(message) + encryptor.finalize()
    return point + encryptor.tag + ciphertext
def decrypt(message, receiver_private_key):
    """Decrypt a message laid out as point (65 bytes) + tag (16 bytes) + ciphertext.

    The sender's public point is decoded on SECP256K1, an ECDH exchange with
    ``receiver_private_key`` yields the shared secret, the X9.63 KDF
    (SHA-256, 32-byte output) derives the AES key, and the ciphertext is
    decrypted with AES-GCM using the embedded tag.

    :param message: bytes holding the encoded point, the GCM tag and the ciphertext
    :param receiver_private_key: elliptic-curve private key of the receiver
    :return: the decrypted plaintext
    """
    point = message[0:65]
    tag = message[65:81]
    ciphertext = message[81:]

    sender_public_numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(
        ec.SECP256K1(), point
    )
    sender_public_key = sender_public_numbers.public_key(backend)
    shared_key = receiver_private_key.exchange(ec.ECDH(), sender_public_key)

    # Bytes literals: the cipher mode and the KDF operate on bytes, and a
    # plain ``str`` is rejected on Python 3.  The byte values are unchanged.
    iv = b'000000000000'
    xkdf = x963kdf.X963KDF(
        algorithm=hashes.SHA256(),
        length=32,
        sharedinfo=b'',
        backend=backend,
    )
    key = xkdf.derive(shared_key)

    decryptor = Cipher(
        algorithms.AES(key),
        modes.GCM(iv, tag),
        backend=backend,
    ).decryptor()
    # finalize() is where the GCM tag is checked.
    return decryptor.update(ciphertext) + decryptor.finalize()
def _encrypt_from_parts(self, data, current_time, iv):
    """Build a URL-safe base64 token from ``data``, a timestamp and an IV.

    The data is PKCS7-padded and encrypted with AES-CBC under
    ``self._encryption_key``.  The token body is the byte 0x80, the
    timestamp packed as a big-endian unsigned 64-bit integer, the IV and the
    ciphertext; an HMAC-SHA256 over that body (keyed with
    ``self._signing_key``) is appended before encoding.

    :raises TypeError: if ``data`` is not bytes
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    pkcs7 = padding.PKCS7(algorithms.AES.block_size).padder()
    padded = pkcs7.update(data) + pkcs7.finalize()

    aes_cbc = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    )
    enc = aes_cbc.encryptor()
    body = enc.update(padded) + enc.finalize()

    header = b"\x80" + struct.pack(">Q", current_time)
    signed_portion = header + iv + body

    mac = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    mac.update(signed_portion)
    return base64.urlsafe_b64encode(signed_portion + mac.finalize())
def encrypt(plaintext: bytes, token: bytes) -> bytes:
    """Encrypt ``plaintext`` with key material derived from ``token``.

    The token is checked with ``Utils.verify_token`` and turned into a key
    and IV by ``Utils.key_iv``; the plaintext is PKCS7-padded to 128-bit
    blocks and encrypted with AES-CBC.

    :param bytes plaintext: Plaintext (json) to encrypt
    :param bytes token: Token to use
    :return: Encrypted bytes
    :raises TypeError: if ``plaintext`` is not bytes
    """
    if not isinstance(plaintext, bytes):
        raise TypeError("plaintext requires bytes")

    Utils.verify_token(token)
    key, iv = Utils.key_iv(token)

    pkcs7 = padding.PKCS7(128).padder()
    padded = pkcs7.update(plaintext) + pkcs7.finalize()

    aes_cbc = Cipher(
        algorithms.AES(key), modes.CBC(iv), backend=default_backend()
    ).encryptor()
    return aes_cbc.update(padded) + aes_cbc.finalize()
def decrypt(ciphertext: bytes, token: bytes) -> bytes:
    """Decrypt ``ciphertext`` with key material derived from ``token``.

    The token is checked with ``Utils.verify_token`` and turned into a key
    and IV by ``Utils.key_iv``; the ciphertext is decrypted with AES-CBC and
    the 128-bit PKCS7 padding is removed.

    :param bytes ciphertext: Ciphertext to decrypt
    :param bytes token: Token to use
    :return: Decrypted bytes object
    :raises TypeError: if ``ciphertext`` is not bytes
    """
    if not isinstance(ciphertext, bytes):
        raise TypeError("ciphertext requires bytes")

    Utils.verify_token(token)
    key, iv = Utils.key_iv(token)

    aes_cbc = Cipher(
        algorithms.AES(key), modes.CBC(iv), backend=default_backend()
    ).decryptor()
    padded = aes_cbc.update(ciphertext) + aes_cbc.finalize()

    pkcs7 = padding.PKCS7(128).unpadder()
    return pkcs7.update(padded) + pkcs7.finalize()
def crypt_create(key_bytes, is_encrypt_flag=None, iv_bytes=None,
                 algorithm=ciphers_algorithms.AES, mode=ciphers_modes.CTR):
    '''
    Create and return a crypto context for symmetric key encryption using the
    key key_bytes.

    Uses algorithm in mode with an IV of iv_bytes.  If iv_bytes is None, an
    all-zero IV of one cipher block is used.

    Returns an encryptor context when is_encrypt_flag is truthy, otherwise
    (including the default None) a decryptor context.

    AES CTR mode uses the same operations for encryption and decryption.
    '''
    algo = algorithm(bytes(key_bytes))
    if iv_bytes is None:
        # block_size is in bits.  Floor division keeps the byte count an
        # integer on Python 3 as well ("/" would yield a float there).
        iv_bytes = get_zero_pad(algo.block_size // BITS_IN_BYTE)
    cipher = ciphers.Cipher(algo, mode(bytes(iv_bytes)),
                            backend=backends.default_backend())
    if is_encrypt_flag:
        return cipher.encryptor()
    return cipher.decryptor()
def crypt_bytes_key(key_bytes, data_bytes, is_encrypt_flag=None, iv_bytes=None,
                    algorithm=ciphers_algorithms.AES, mode=ciphers_modes.CTR):
    '''
    One-shot symmetric encryption or decryption of data_bytes with key_bytes.

    Builds a context with crypt_create() from the given arguments, feeds
    data_bytes through crypt_bytes_context() and returns the second element
    of its result (the processed bytes).
    '''
    context = crypt_create(
        key_bytes,
        is_encrypt_flag=is_encrypt_flag,
        iv_bytes=iv_bytes,
        algorithm=algorithm,
        mode=mode,
    )
    _unused_context, processed = crypt_bytes_context(context, data_bytes)
    return processed
def decrypt(data, password):
    """Decrypt a Fernet token using a key derived from ``password``.

    The Fernet key is derived with PBKDF2-HMAC (32 bytes, 100000 iterations)
    and base64url-encoded.  The password itself is used as the salt so that
    the encryption is reversible using only the password.

    :param data: the Fernet token to decrypt
    :param password: the password the key is derived from
    :return: the decrypted payload returned by ``Fernet.decrypt``

    NOTE(review): an earlier docstring promised ``None`` for a wrong password
    or corrupted data, but nothing is caught here, so any exception raised by
    ``Fernet.decrypt`` propagates to the caller.  Confirm which contract
    callers expect.
    """
    password = bytes(password)
    # ``hashes`` has no ``AES`` attribute (AES is a cipher, not a hash), so
    # the previous ``hashes.AES()`` raised AttributeError on every call.
    # NOTE(review): SHA256 is assumed to match the encrypting side - confirm.
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=password,
        iterations=100000,
        backend=default_backend(),
    )
    key = base64.urlsafe_b64encode(kdf.derive(password))
    return Fernet(key).decrypt(data)
def __init__(self, endpoint="http://127.0.0.1:10092/api", encoding="utf-8",
             enc_key=None, enc_iv=None):
    """Set up the client state and an HTTP session.

    :param endpoint: URL of the API endpoint
    :param encoding: text encoding stored on the instance
    :param enc_key: AES key for transport encryption, or None
    :param enc_iv: CBC initialisation vector for transport encryption, or None

    Transport encryption is enabled only when both ``enc_key`` and
    ``enc_iv`` are given; an AES-CBC cipher is then prepared from them.
    """
    self._endpoint = endpoint
    # Honour the caller's choice; this used to be hard-coded to "utf-8",
    # silently ignoring the ``encoding`` argument.
    self._encoding = encoding
    if enc_key is None or enc_iv is None:
        self._transport_enc = False
        self._transport_enc_key = None
        self._transport_enc_iv = None
        self._cipher = None
    else:
        self._transport_enc = True
        self._transport_enc_key = enc_key
        self._transport_enc_iv = enc_iv
        backend = default_backend()
        self._cipher = Cipher(algorithms.AES(enc_key), modes.CBC(enc_iv),
                              backend=backend)
    self._session = requests.Session()
def decryptStorage(path, key):
    """Decrypt an AES-GCM encrypted JSON file and return the parsed content.

    File layout: 12-byte IV, 16-byte GCM tag, then the ciphertext.

    :param path: path of the encrypted file
    :param key: hex-encoded AES key
    :return: the object produced by ``json.loads`` on the decrypted text
    """
    cipherkey = unhexlify(key)
    with open(path, 'rb') as f:
        iv = f.read(12)
        tag = f.read(16)
        cipher = Cipher(algorithms.AES(cipherkey), modes.GCM(iv, tag),
                        backend=default_backend())
        decryptor = cipher.decryptor()
        # Collect raw bytes and decode once at the end: decoding each chunk
        # separately fails when a multi-byte character straddles a chunk
        # boundary, and would decode data that is not authenticated yet.
        chunks = [decryptor.update(block)
                  for block in iter(lambda: f.read(4096), b'')]
    # throws exception when the tag is wrong
    chunks.append(decryptor.finalize())
    return json.loads(b''.join(chunks).decode())
def decryptEntryValue(nonce, val):
    """Decrypt an AES-GCM encrypted JSON value and return the parsed content.

    Layout of ``val``: 12-byte IV, 16-byte GCM tag, then the ciphertext.

    :param nonce: hex-encoded AES key (named "nonce" by the caller)
    :param val: encrypted bytes
    :return: the object produced by ``json.loads`` on the decrypted text
    """
    cipherkey = unhexlify(nonce)
    iv = val[:12]
    tag = val[12:28]
    cipher = Cipher(algorithms.AES(cipherkey), modes.GCM(iv, tag),
                    backend=default_backend())
    decryptor = cipher.decryptor()
    # Decrypt as bytes and decode once after authentication: decoding each
    # 16-byte slice separately fails when a multi-byte character straddles
    # a slice boundary.
    plaintext = decryptor.update(val[28:])
    # throws exception when the tag is wrong
    plaintext += decryptor.finalize()
    return json.loads(plaintext.decode())


# Decrypt give entry nonce
def test_create_symmetric_key_with_cryptographic_failure(self):
    """
    Verify that create_symmetric_key raises CryptographicFailure when the
    underlying algorithm cannot be instantiated.
    """
    class AlwaysFailingAlgorithm(object):
        # Stand-in algorithm whose constructor always raises.
        key_sizes = [0]

        def __init__(self, key_bytes):
            raise Exception()

    engine = crypto.CryptographyEngine()
    # Swap the AES entry for the failing stand-in.
    replacement = [(enums.CryptographicAlgorithm.AES, AlwaysFailingAlgorithm)]
    engine._symmetric_key_algorithms.update(replacement)

    with self.assertRaises(exceptions.CryptographicFailure):
        engine.create_symmetric_key(enums.CryptographicAlgorithm.AES, 0)
def test_decrypt_missing_iv_nonce(self):
    """
    Verify that decrypt raises InvalidField with the expected message when
    no IV/nonce is supplied for a CBC decryption.
    """
    engine = crypto.CryptographyEngine()

    first_block = (
        b'\x00\x01\x02\x03\x04\x05\x06\x07'
        b'\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F'
    )
    second_block = (
        b'\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08'
        b'\x07\x06\x05\x04\x03\x02\x01\x00'
    )

    with self.assertRaisesRegexp(
        exceptions.InvalidField, "IV/nonce is required."
    ):
        engine.decrypt(
            enums.CryptographicAlgorithm.AES,
            first_block,
            second_block,
            cipher_mode=enums.BlockCipherMode.CBC,
            padding_method=enums.PaddingMethod.PKCS5,
        )
def encode_block(self, block_id, block_data):
    """Encrypt ``block_data`` with AES-GCM and serialise it.

    The encoded output holds, in order: the magic value, a fresh random IV
    of ``self.iv_len`` bytes, the GCM tag and the ciphertext.  ``block_id``
    is authenticated as additional data and is not part of the output.
    """
    assert isinstance(block_id, bytes) and len(block_id) == self.block_id_len

    out = Encoder()
    out.encode_bytes(self.magic)

    nonce = os.urandom(self.iv_len)
    out.encode_bytes(nonce)

    gcm = Cipher(
        algorithms.AES(self.key), modes.GCM(nonce), backend=self.backend
    ).encryptor()
    gcm.authenticate_additional_data(block_id)
    sealed = gcm.update(block_data) + gcm.finalize()

    assert len(gcm.tag) == self.tag_len
    out.encode_bytes(gcm.tag)
    out.encode_bytes(sealed)
    return out.value
def decode_block(self, block_id, block_data):
    """Parse and decrypt a block produced in the magic/IV/tag/ciphertext layout.

    ``block_id`` is supplied to AES-GCM as additional authenticated data.
    Returns the decrypted payload.
    """
    assert isinstance(block_id, bytes) and len(block_id) == self.block_id_len
    assert isinstance(block_data, bytes)
    header_len = len(self.magic) + self.iv_len + self.tag_len
    assert len(block_data) > header_len

    reader = Decoder(block_data)
    # The block must start with the expected magic value.
    assert reader.decode_bytes(len(self.magic)) == self.magic
    nonce = reader.decode_bytes(self.iv_len)
    auth_tag = reader.decode_bytes(self.tag_len)

    gcm = Cipher(
        algorithms.AES(self.key),
        modes.GCM(nonce, auth_tag),
        backend=self.backend,
    ).decryptor()
    gcm.authenticate_additional_data(block_id)
    return gcm.update(reader.decode_bytes_rest()) + gcm.finalize()
def execute(self, item):
    """Download one item into ``self.location``, decrypting it if keyed.

    ``item[1]`` is an optional URL prefix and ``item[2]`` the resource; the
    two are joined with "/" when the prefix is set.  ``item[2]`` is then
    overwritten with the base name of the URL path, which is used as the
    output file name.  When ``item[3]`` is truthy, the AES key is fetched
    from ``item[3].uri`` and the stream is AES-CBC decrypted with the IV
    taken from the hex string ``item[3].iv`` (its first two characters are
    skipped).
    """
    url = item[1] + "/" + item[2] if item[1] else item[2]
    item[2] = os.path.basename(urllib.parse.urlparse(url).path)

    key_info = item[3]
    decryptor = None
    if key_info:
        backend = default_backend()
        key = requests.get(key_info.uri).content
        iv = bytes.fromhex(key_info.iv[2:])
        decryptor = Cipher(
            algorithms.AES(key), modes.CBC(iv), backend=backend
        ).decryptor()

    response = requests.get(url, stream=True)
    target = os.path.join(self.location, item[2])
    with open(target, 'wb') as out:
        for chunk in response.iter_content(chunk_size=1024):
            if not chunk:
                continue
            if decryptor is not None:
                out.write(decryptor.update(chunk))
            else:
                out.write(chunk)
def encrypt(data, secret_key, initialisation_vector):
    """
    AES-GCM encrypt ``data`` and return the ciphertext with its tag.

    :param bytes data: the plaintext bytes to be encrypted
    :param bytes secret_key: the key to be used for encryption
    :param bytes initialisation_vector: the initialisation vector
    :return: the cipher text and GCM authentication tag tuple
    :rtype: (bytes, bytes)
    """
    gcm_mode = modes.GCM(
        initialization_vector=initialisation_vector,
        min_tag_length=16,
    )
    aes = algorithms.AES(secret_key)
    enc = Cipher(
        algorithm=aes, mode=gcm_mode, backend=default_backend()
    ).encryptor()
    sealed = enc.update(data) + enc.finalize()
    return sealed, enc.tag
def decrypt(ciphertext, tag, secret_key, initialisation_vector):
    """
    AES-GCM decrypt ``ciphertext`` using the supplied tag, key and IV.

    :param bytes ciphertext: the cipher text to be decrypted
    :param bytes tag: the GCM authentication tag
    :param bytes secret_key: the key to be used for encryption
    :param bytes initialisation_vector: the initialisation vector
    :return: the decrypted plaintext
    :rtype: bytes
    """
    gcm_mode = modes.GCM(
        initialization_vector=initialisation_vector,
        tag=tag,
    )
    aes = algorithms.AES(secret_key)
    dec = Cipher(
        algorithm=aes, mode=gcm_mode, backend=default_backend()
    ).decryptor()
    opened = dec.update(ciphertext)
    return opened + dec.finalize()
def _wrap_core(wrapping_key, a, r, backend):
    """Run the RFC 3394 key-wrap rounds (section 2.2.1, index method).

    ``a`` is the 8-byte integrity register and ``r`` the list of 8-byte
    blocks; ``r`` is updated in place.  Returns ``a`` followed by the
    joined blocks.
    """
    ecb = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
    block_count = len(r)
    # The step counter t = (n * j) + i + 1 runs from 1 to 6 * n across the
    # six passes over the blocks.
    for t in range(1, 6 * block_count + 1):
        idx = (t - 1) % block_count
        # Each update() consumes exactly one 16-byte AES block, so with ECB
        # the same encryptor can be reused throughout.
        out = ecb.update(a + r[idx])
        # Both halves are always 64 bits, so ">Q" packing is safe.
        (msb,) = struct.unpack(">Q", out[:8])
        a = struct.pack(">Q", msb ^ t)
        r[idx] = out[-8:]

    assert ecb.finalize() == b""
    return a + b"".join(r)
def _unwrap_core(wrapping_key, a, r, backend):
    """Run the RFC 3394 key-unwrap rounds (section 2.2.2, index method).

    ``a`` is the 8-byte integrity register and ``r`` the list of 8-byte
    blocks; ``r`` is updated in place.  Returns the final register and the
    block list.
    """
    ecb = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
    block_count = len(r)
    # The step counter t = (n * j) + i + 1 runs from 6 * n down to 1.
    for t in range(6 * block_count, 0, -1):
        idx = (t - 1) % block_count
        # Both halves are always 64 bits, so ">Q" packing is safe.
        (reg,) = struct.unpack(">Q", a)
        # Each update() consumes exactly one 16-byte AES block, so the same
        # decryptor can be reused throughout.
        out = ecb.update(struct.pack(">Q", reg ^ t) + r[idx])
        a = out[:8]
        r[idx] = out[-8:]

    assert ecb.finalize() == b""
    return a, r
def _encrypt_data(self, data):
    """Serialise ``data`` as JSON, encrypt it with AES-GCM and base64 it.

    The raw output before base64 is the byte ``b"1"``, the GCM tag, a
    random 12-byte IV and the ciphertext.  Returns an ASCII string.
    """
    assert self._encryption_key

    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
    from cryptography.hazmat.backends import default_backend

    nonce = os.urandom(12)
    aes_gcm = Cipher(
        algorithms.AES(self._encryption_key),
        modes.GCM(nonce),
        backend=default_backend(),
    )
    enc = aes_gcm.encryptor()

    payload = json.dumps(data).encode("UTF-8")
    sealed = enc.update(payload) + enc.finalize()

    blob = b"1" + enc.tag + nonce + sealed
    return standard_b64encode(blob).decode("ASCII")
def stretch(passw, iv1):
    """Stretch a password into a SHA-256 digest using 8192 hash rounds.

    The running value starts as ``iv1`` followed by 16 zero bytes.  Each
    round hashes the running value followed by the password encoded as
    UTF-16-LE.  Returns the digest of the last round.
    """
    encoded_password = bytes(passw, "utf_16_le")
    state = iv1 + (16 * b"\x00")
    for _ in range(8192):
        hasher = hashes.Hash(hashes.SHA256(), backend=default_backend())
        hasher.update(state)
        hasher.update(encoded_password)
        state = hasher.finalize()
    return state


# encrypting function
# arguments:
# infile: plaintext file path
# outfile: ciphertext file path
# passw: encryption password
# bufferSize: encryption buffer size, must be a multiple of
#             AES block size (16)
#             using a larger buffer speeds up things when dealing
#             with big files
def __init__(self, broker="http://127.0.0.1:19820/api", encoding="utf-8",
             enc_key=None, enc_iv=None):
    """Set up the client state, an HTTP session and the event dispatch table.

    :param broker: URL of the broker API endpoint
    :param encoding: text encoding stored on the instance
    :param enc_key: AES key for transport encryption, or None
    :param enc_iv: CBC initialisation vector for transport encryption, or None

    Transport encryption is enabled only when both ``enc_key`` and
    ``enc_iv`` are given; an AES-CBC cipher is then prepared from them.
    """
    super().__init__()
    self._endpoint = broker
    # Honour the caller's choice; this used to be hard-coded to "utf-8",
    # silently ignoring the ``encoding`` argument.
    self._encoding = encoding
    if enc_key is None or enc_iv is None:
        self._transport_enc = False
        self._transport_enc_key = None
        self._transport_enc_iv = None
        self._cipher = None
    else:
        self._transport_enc = True
        self._transport_enc_key = enc_key
        self._transport_enc_iv = enc_iv
        backend = default_backend()
        self._cipher = Cipher(algorithms.AES(enc_key), modes.CBC(enc_iv),
                              backend=backend)
    self._session = requests.Session()
    # Maps event names to the bound handler methods.
    self._event_dict = {
        'logon': self.on_login,
        'logoff': self.on_logout,
        'ping': self.on_ping,
        'query_data': self.on_query_data,
        'send_order': self.on_insert_order,
        'cancel_order': self.on_cancel_order_event,
        'get_quote': self.on_get_quote,
    }
    self.client_id = ''
    self.account_id = ''
def aes_key_wrap(wrapping_key, key_to_wrap, backend):
    """Wrap ``key_to_wrap`` under ``wrapping_key`` per RFC 3394 section 2.2.1.

    :raises ValueError: if the wrapping key is not 16, 24 or 32 bytes, or
        the key to wrap is shorter than 16 bytes or not a multiple of 8
    :return: the wrapped key (integrity register followed by the blocks)
    """
    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")
    if len(key_to_wrap) < 16:
        raise ValueError("The key to wrap must be at least 16 bytes")
    if len(key_to_wrap) % 8 != 0:
        raise ValueError("The key to wrap must be a multiple of 8 bytes")

    ecb = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
    register = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
    blocks = [
        key_to_wrap[offset:offset + 8]
        for offset in range(0, len(key_to_wrap), 8)
    ]
    block_count = len(blocks)

    # The step counter t = (n * j) + i + 1 runs from 1 to 6 * n across the
    # six passes over the blocks.
    for t in range(1, 6 * block_count + 1):
        idx = (t - 1) % block_count
        # Each update() consumes exactly one 16-byte AES block, so with ECB
        # the same encryptor can be reused throughout.
        out = ecb.update(register + blocks[idx])
        # Both halves are always 64 bits, so ">Q" packing is safe.
        (msb,) = struct.unpack(">Q", out[:8])
        register = struct.pack(">Q", msb ^ t)
        blocks[idx] = out[-8:]

    assert ecb.finalize() == b""
    return register + b"".join(blocks)
def aes_key_unwrap(wrapping_key, wrapped_key, backend):
    """Unwrap ``wrapped_key`` with ``wrapping_key`` per RFC 3394 section 2.2.2.

    :raises ValueError: if the wrapped key is shorter than 24 bytes or not
        a multiple of 8, or the wrapping key is not 16, 24 or 32 bytes
    :raises InvalidUnwrap: if the recovered integrity register does not
        equal the expected initial value
    :return: the unwrapped key
    """
    if len(wrapped_key) < 24:
        raise ValueError("Must be at least 24 bytes")
    if len(wrapped_key) % 8 != 0:
        raise ValueError("The wrapped key must be a multiple of 8 bytes")
    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    ecb = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
    expected_register = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
    chunks = [
        wrapped_key[offset:offset + 8]
        for offset in range(0, len(wrapped_key), 8)
    ]
    # The first 8-byte chunk is the integrity register, the rest are blocks.
    register, blocks = chunks[0], chunks[1:]
    block_count = len(blocks)

    # The step counter t = (n * j) + i + 1 runs from 6 * n down to 1.
    for t in range(6 * block_count, 0, -1):
        idx = (t - 1) % block_count
        # Both halves are always 64 bits, so ">Q" packing is safe.
        (reg,) = struct.unpack(">Q", register)
        # Each update() consumes exactly one 16-byte AES block, so the same
        # decryptor can be reused throughout.
        out = ecb.update(struct.pack(">Q", reg ^ t) + blocks[idx])
        register = out[:8]
        blocks[idx] = out[-8:]

    assert ecb.finalize() == b""
    if not bytes_eq(register, expected_register):
        raise InvalidUnwrap()
    return b"".join(blocks)
def decrypt_ztoken(ztoken):
    """Decrypt the given ztoken, used by apple.

    Tokens of 32 characters or fewer are returned unchanged.  Otherwise the
    first 64 hex characters are decrypted with AES-ECB under an all-zero
    16-byte key and the result is returned as a decoded string.
    """
    if len(ztoken) <= 32:
        return ztoken

    zero_key = bytes.fromhex('00000000000000000000000000000000')
    encrypted = bytes.fromhex(ztoken[:64])

    ecb = Cipher(
        algorithms.AES(zero_key), modes.ECB(), backend=default_backend()
    ).decryptor()
    plain = ecb.update(encrypted) + ecb.finalize()
    return plain.decode()
def decrypt(self, ctext):
    """Decrypt ``ctext`` (IV followed by AES-CBC ciphertext) and unpad it.

    The first ``AES_BLOCK_SIZE`` bytes are the IV; the remainder is
    decrypted with ``self.cipher_key`` and the PKCS7 padding is removed.
    """
    nonce, body = ctext[:AES_BLOCK_SIZE], ctext[AES_BLOCK_SIZE:]
    aes_cbc = Cipher(
        algorithms.AES(self.cipher_key),
        modes.CBC(nonce),
        backend=default_backend(),
    )
    pkcs7 = padding.PKCS7(AES_BLOCK_SIZE * 8).unpadder()
    dec = aes_cbc.decryptor()
    padded = dec.update(body) + dec.finalize()
    return pkcs7.update(padded) + pkcs7.finalize()
def encrypt(self, clear_text):
    """PKCS7-pad and AES-CBC encrypt ``clear_text`` under ``self.cipher_key``.

    A random IV of ``AES_BLOCK_SIZE`` bytes is generated per call and
    prepended to the returned ciphertext.
    """
    nonce = os.urandom(AES_BLOCK_SIZE)
    aes_cbc = Cipher(
        algorithms.AES(self.cipher_key),
        modes.CBC(nonce),
        backend=default_backend(),
    )
    pkcs7 = padding.PKCS7(AES_BLOCK_SIZE * 8).padder()
    enc = aes_cbc.encryptor()
    padded = pkcs7.update(clear_text) + pkcs7.finalize()
    return nonce + enc.update(padded) + enc.finalize()