我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用cryptography.hazmat.primitives.ciphers.modes.GCM。
def encrypt(message, receiver_public_key): sender_private_key = ec.generate_private_key(ec.SECP256K1(), backend) shared_key = sender_private_key.exchange(ec.ECDH(), receiver_public_key) sender_public_key = sender_private_key.public_key() point = sender_public_key.public_numbers().encode_point() iv = '000000000000' xkdf = x963kdf.X963KDF( algorithm = hashes.SHA256(), length = 32, sharedinfo = '', backend = backend ) key = xkdf.derive(shared_key) encryptor = Cipher( algorithms.AES(key), modes.GCM(iv), backend = backend ).encryptor() ciphertext = encryptor.update(message) + encryptor.finalize() return point + encryptor.tag + ciphertext
def decrypt(message, receiver_private_key): point = message[0:65] tag = message[65:81] ciphertext = message[81:] sender_public_numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256K1(), point) sender_public_key = sender_public_numbers.public_key(backend) shared_key = receiver_private_key.exchange(ec.ECDH(), sender_public_key) iv = '000000000000' xkdf = x963kdf.X963KDF( algorithm = hashes.SHA256(), length = 32, sharedinfo = '', backend = backend ) key = xkdf.derive(shared_key) decryptor = Cipher( algorithms.AES(key), modes.GCM(iv,tag), backend = backend ).decryptor() message = decryptor.update(ciphertext) + decryptor.finalize() return message
def update(self, data): # OpenSSL 0.9.8e has an assertion in its EVP code that causes it # to SIGABRT if you call update with an empty byte string. This can be # removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch # should be taken only when length is zero and mode is not GCM because # AES GCM can return improper tag values if you don't call update # with empty plaintext when authenticating AAD for ...reasons. if len(data) == 0 and not isinstance(self._mode, modes.GCM): return b"" buf = self._backend._ffi.new("unsigned char[]", len(data) + self._block_size - 1) outlen = self._backend._ffi.new("int *") res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data, len(data)) self._backend.openssl_assert(res != 0) return self._backend._ffi.buffer(buf)[:outlen[0]]
def update(self, data): # OpenSSL 0.9.8e has an assertion in its EVP code that causes it # to SIGABRT if you call update with an empty byte string. This can be # removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch # should be taken only when length is zero and mode is not GCM because # AES GCM can return improper tag values if you don't call update # with empty plaintext when authenticating AAD for ...reasons. if len(data) == 0 and not isinstance(self._mode, modes.GCM): return b"" buf = self._backend._ffi.new("unsigned char[]", len(data) + self._block_size - 1) outlen = self._backend._ffi.new("int *") res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data, len(data)) assert res != 0 return self._backend._ffi.buffer(buf)[:outlen[0]]
def decryptStorage(path, key): cipherkey = unhexlify(key) with open(path, 'rb') as f: iv = f.read(12) tag = f.read(16) cipher = Cipher(algorithms.AES(cipherkey), modes.GCM(iv, tag), backend=default_backend()) decryptor = cipher.decryptor() data = '' while True: block = f.read(16) # data are not authenticated yet if block: data = data + decryptor.update(block).decode() else: break # throws exception when the tag is wrong data = data + decryptor.finalize().decode() return json.loads(data)
def decryptEntryValue(nonce, val): cipherkey = unhexlify(nonce) iv = val[:12] tag = val[12:28] cipher = Cipher(algorithms.AES(cipherkey), modes.GCM(iv, tag), backend=default_backend()) decryptor = cipher.decryptor() data = '' inputData = val[28:] while True: block = inputData[:16] inputData = inputData[16:] if block: data = data + decryptor.update(block).decode() else: break # throws exception when the tag is wrong data = data + decryptor.finalize().decode() return json.loads(data) # Decrypt give entry nonce
def encode_block(self, block_id, block_data): assert (isinstance(block_id, bytes) and len(block_id) == self.block_id_len) enc = Encoder() enc.encode_bytes(self.magic) iv = os.urandom(self.iv_len) enc.encode_bytes(iv) c = Cipher(algorithms.AES(self.key), modes.GCM(iv), backend=self.backend) e = c.encryptor() e.authenticate_additional_data(block_id) s = e.update(block_data) + e.finalize() assert len(e.tag) == self.tag_len enc.encode_bytes(e.tag) enc.encode_bytes(s) return enc.value
def decode_block(self, block_id, block_data): assert (isinstance(block_id, bytes) and len(block_id) == self.block_id_len) assert isinstance(block_data, bytes) assert len(block_data) > (len(self.magic) + self.iv_len + self.tag_len) dec = Decoder(block_data) # check magic assert dec.decode_bytes(len(self.magic)) == self.magic # get iv iv = dec.decode_bytes(self.iv_len) # get tag tag = dec.decode_bytes(self.tag_len) c = Cipher(algorithms.AES(self.key), modes.GCM(iv, tag), backend=self.backend) d = c.decryptor() d.authenticate_additional_data(block_id) s = d.update(dec.decode_bytes_rest()) + d.finalize() return s
def encrypt(data, secret_key, initialisation_vector): """ Encrypts data using the given secret key and initialisation vector. :param bytes data: the plaintext bytes to be encrypted :param bytes secret_key: the key to be used for encryption :param bytes initialisation_vector: the initialisation vector :return: the cipher text and GCM authentication tag tuple :rtype: (bytes, bytes) """ cipher = Cipher(algorithm=algorithms.AES(secret_key), mode=modes.GCM(initialization_vector=initialisation_vector, min_tag_length=16), backend=default_backend()) encryptor = cipher.encryptor() ciphertext = encryptor.update(data) + encryptor.finalize() return ciphertext, encryptor.tag
def decrypt(ciphertext, tag, secret_key, initialisation_vector): """ Decrypts a cipher text using the given GCM authentication tag, secret key and initialisation vector. :param bytes ciphertext: the cipher text to be decrypted :param bytes tag: the GCM authentication tag :param bytes secret_key: the key to be used for encryption :param bytes initialisation_vector: the initialisation vector :return: the decrypted plaintext :rtype: bytes """ cipher = Cipher(algorithm=algorithms.AES(secret_key), mode=modes.GCM(initialization_vector=initialisation_vector, tag=tag), backend=default_backend()) decryptor = cipher.decryptor() return decryptor.update(ciphertext) + decryptor.finalize()
def _encrypt_data(self, data): assert self._encryption_key from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend iv = os.urandom(12) encryptor = Cipher( algorithms.AES(self._encryption_key), modes.GCM(iv), backend=default_backend() ).encryptor() ciphertext = encryptor.update(json.dumps(data).encode("UTF-8")) + encryptor.finalize() ciphertext = b"1" + encryptor.tag + iv + ciphertext return standard_b64encode(ciphertext).decode("ASCII")
def encrypt(self, cleartext: bytes): # No need for padding as we are using GCM # Get a new iv for GCM iv = urandom(int(AES.block_size // 8)) cipher = Cipher(self._hazmat_key, GCM(iv), backend=openssl) encryptor = cipher.encryptor() enc = encryptor.update(cleartext) + encryptor.finalize() return iv + enc + encryptor.tag
def decrypt(self, ciphertext: bytes): iv = ciphertext[:AES.block_size // 8] tag = ciphertext[-16:] cipher = Cipher(self._hazmat_key, GCM(iv, tag), backend=openssl).decryptor() return cipher.update(ciphertext[16:-16]) + cipher.finalize()
def aes_decrypt(key, iv, payload): """ Use AES128 GCM with the given key and iv to decrypt the payload. """ data = payload[:-16] tag = payload[-16:] backend = default_backend() decryptor = Cipher( algorithms.AES(key), GCM(iv, tag=tag), backend=backend).decryptor() return decryptor.update(data) + decryptor.finalize()
def aes_encrypt(key, iv, plaintext): """ Use AES128 GCM with the given key and iv to encrypt the plaintext. """ backend = default_backend() encryptor = Cipher( algorithms.AES(key), GCM(iv), backend=backend).encryptor() return encryptor.update(plaintext) + encryptor.finalize() + encryptor.tag
def encrypt_str(self, content, key, salt, salt_explicit): # return the encrypted content prepended with the # gcm tag and salt_explicit cipher = Cipher(algorithms.AES(key), modes.GCM(initialization_vector=self._bulid_iv(salt, salt_explicit)), backend=default_backend() ).encryptor() ciphertext = cipher.update(content) + cipher.finalize() return struct.pack('!q16s', salt_explicit, cipher.tag) + ciphertext
def decrypt_str(self, content, key, salt): # content contains the gcm tag and salt_explicit in plaintext if len(content) < 24: raise CryptoException("truncated content") salt_explicit, gcm_tag = struct.unpack_from('!q16s', content) cipher = Cipher(algorithms.AES(key), modes.GCM(initialization_vector=self._bulid_iv(salt, salt_explicit), tag=gcm_tag), backend=default_backend() ).decryptor() return cipher.update(content[24:]) + cipher.finalize()
def test_aes_gcm_full(s): iv_new = os.urandom(16) cipher_gcm_full = Cipher(algorithms.AES(rawkey), modes.GCM(iv_new), backend=_default_backend) encryptor = cipher_gcm_full.encryptor() r = encryptor.update(s) + encryptor.finalize()
def initialization(self, key): key = base64.urlsafe_b64decode(key) k = key[:16] iv = key[16:28] cipher = Cipher(algorithms.AES(k), modes.GCM(iv), backend=default_backend()) self._decryptor = cipher.decryptor() self._encryptor = cipher.encryptor() self._last_packet = b""
def _decrypt_data(self, data): assert self._encryption_key from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend from binascii import a2b_base64 key = self._encryption_key encoded_message = a2b_base64(data) version = encoded_message[0:1] tag = encoded_message[1:17] initialization_vector = encoded_message[17:29] encrypted_message = encoded_message[29:] if version != b"1": raise Exception("Invalid Version") cipher = Cipher(algorithms.AES(key), modes.GCM(initialization_vector, tag), backend=default_backend()) decryptor = cipher.decryptor() decrypted = decryptor.update(encrypted_message) + decryptor.finalize() decrypted = decrypted.decode() return(decrypted)
def finalize(self): # OpenSSL 1.0.1 on Ubuntu 12.04 (and possibly other distributions) # appears to have a bug where you must make at least one call to update # even if you are only using authenticate_additional_data or the # GCM tag will be wrong. An (empty) call to update resolves this # and is harmless for all other versions of OpenSSL. if isinstance(self._mode, modes.GCM): self.update(b"") buf = self._backend._ffi.new("unsigned char[]", self._block_size) outlen = self._backend._ffi.new("int *") res = self._backend._lib.EVP_CipherFinal_ex(self._ctx, buf, outlen) if res == 0: errors = self._backend._consume_errors() if not errors and isinstance(self._mode, modes.GCM): raise InvalidTag self._backend.openssl_assert( errors[0][1:] == ( self._backend._lib.ERR_LIB_EVP, self._backend._lib.EVP_F_EVP_ENCRYPTFINAL_EX, self._backend._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH ) or errors[0][1:] == ( self._backend._lib.ERR_LIB_EVP, self._backend._lib.EVP_F_EVP_DECRYPTFINAL_EX, self._backend._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH ) ) raise ValueError( "The length of the provided data is not a multiple of " "the block length." ) if (isinstance(self._mode, modes.GCM) and self._operation == self._ENCRYPT): block_byte_size = self._block_size // 8 tag_buf = self._backend._ffi.new( "unsigned char[]", block_byte_size ) res = self._backend._lib.EVP_CIPHER_CTX_ctrl( self._ctx, self._backend._lib.EVP_CTRL_GCM_GET_TAG, block_byte_size, tag_buf ) self._backend.openssl_assert(res != 0) self._tag = self._backend._ffi.buffer(tag_buf)[:] res = self._backend._lib.EVP_CIPHER_CTX_cleanup(self._ctx) self._backend.openssl_assert(res == 1) return self._backend._ffi.buffer(buf)[:outlen[0]]
def finalize(self): # OpenSSL 1.0.1 on Ubuntu 12.04 (and possibly other distributions) # appears to have a bug where you must make at least one call to update # even if you are only using authenticate_additional_data or the # GCM tag will be wrong. An (empty) call to update resolves this # and is harmless for all other versions of OpenSSL. if isinstance(self._mode, modes.GCM): self.update(b"") buf = self._backend._ffi.new("unsigned char[]", self._block_size_bytes) outlen = self._backend._ffi.new("int *") res = self._backend._lib.EVP_CipherFinal_ex(self._ctx, buf, outlen) if res == 0: errors = self._backend._consume_errors() if not errors and isinstance(self._mode, modes.GCM): raise InvalidTag self._backend.openssl_assert( errors[0][1:] == ( self._backend._lib.ERR_LIB_EVP, self._backend._lib.EVP_F_EVP_ENCRYPTFINAL_EX, self._backend._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH ) or errors[0][1:] == ( self._backend._lib.ERR_LIB_EVP, self._backend._lib.EVP_F_EVP_DECRYPTFINAL_EX, self._backend._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH ) ) raise ValueError( "The length of the provided data is not a multiple of " "the block length." ) if (isinstance(self._mode, modes.GCM) and self._operation == self._ENCRYPT): tag_buf = self._backend._ffi.new( "unsigned char[]", self._block_size_bytes ) res = self._backend._lib.EVP_CIPHER_CTX_ctrl( self._ctx, self._backend._lib.EVP_CTRL_GCM_GET_TAG, self._block_size_bytes, tag_buf ) self._backend.openssl_assert(res != 0) self._tag = self._backend._ffi.buffer(tag_buf)[:] res = self._backend._lib.EVP_CIPHER_CTX_cleanup(self._ctx) self._backend.openssl_assert(res == 1) return self._backend._ffi.buffer(buf)[:outlen[0]]
def finalize(self): # OpenSSL 1.0.1 on Ubuntu 12.04 (and possibly other distributions) # appears to have a bug where you must make at least one call to update # even if you are only using authenticate_additional_data or the # GCM tag will be wrong. An (empty) call to update resolves this # and is harmless for all other versions of OpenSSL. if isinstance(self._mode, modes.GCM): self.update(b"") if ( self._operation == self._DECRYPT and isinstance(self._mode, modes.ModeWithAuthenticationTag) and self.tag is None ): raise ValueError( "Authentication tag must be provided when decrypting." ) buf = self._backend._ffi.new("unsigned char[]", self._block_size_bytes) outlen = self._backend._ffi.new("int *") res = self._backend._lib.EVP_CipherFinal_ex(self._ctx, buf, outlen) if res == 0: errors = self._backend._consume_errors() if not errors and isinstance(self._mode, modes.GCM): raise InvalidTag self._backend.openssl_assert( errors[0]._lib_reason_match( self._backend._lib.ERR_LIB_EVP, self._backend._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH ) or errors[0]._lib_reason_match( self._backend._lib.ERR_LIB_EVP, self._backend._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH ) ) raise ValueError( "The length of the provided data is not a multiple of " "the block length." ) if (isinstance(self._mode, modes.GCM) and self._operation == self._ENCRYPT): tag_buf = self._backend._ffi.new( "unsigned char[]", self._block_size_bytes ) res = self._backend._lib.EVP_CIPHER_CTX_ctrl( self._ctx, self._backend._lib.EVP_CTRL_GCM_GET_TAG, self._block_size_bytes, tag_buf ) self._backend.openssl_assert(res != 0) self._tag = self._backend._ffi.buffer(tag_buf)[:] res = self._backend._lib.EVP_CIPHER_CTX_cleanup(self._ctx) self._backend.openssl_assert(res == 1) return self._backend._ffi.buffer(buf)[:outlen[0]]
def finalize(self): # OpenSSL 1.0.1 on Ubuntu 12.04 (and possibly other distributions) # appears to have a bug where you must make at least one call to update # even if you are only using authenticate_additional_data or the # GCM tag will be wrong. An (empty) call to update resolves this # and is harmless for all other versions of OpenSSL. if isinstance(self._mode, modes.GCM): self.update(b"") if ( self._operation == self._DECRYPT and isinstance(self._mode, modes.ModeWithAuthenticationTag) and self.tag is None ): raise ValueError( "Authentication tag must be provided when decrypting." ) buf = self._backend._ffi.new("unsigned char[]", self._block_size_bytes) outlen = self._backend._ffi.new("int *") res = self._backend._lib.EVP_CipherFinal_ex(self._ctx, buf, outlen) if res == 0: errors = self._backend._consume_errors() if not errors and isinstance(self._mode, modes.GCM): raise InvalidTag self._backend.openssl_assert( errors[0]._lib_reason_match( self._backend._lib.ERR_LIB_EVP, self._backend._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH ) or errors[0]._lib_reason_match( self._backend._lib.ERR_LIB_EVP, self._backend._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH ) ) raise ValueError( "The length of the provided data is not a multiple of " "the block length." ) if (isinstance(self._mode, modes.GCM) and self._operation == self._ENCRYPT): tag_buf = self._backend._ffi.new( "unsigned char[]", self._block_size_bytes ) res = self._backend._lib.EVP_CIPHER_CTX_ctrl( self._ctx, self._backend._lib.EVP_CTRL_AEAD_GET_TAG, self._block_size_bytes, tag_buf ) self._backend.openssl_assert(res != 0) self._tag = self._backend._ffi.buffer(tag_buf)[:] res = self._backend._lib.EVP_CIPHER_CTX_cleanup(self._ctx) self._backend.openssl_assert(res == 1) return self._backend._ffi.buffer(buf)[:outlen[0]]