Python cryptography.hazmat.primitives.ciphers.algorithms 模块,AES 实例源码


项目:github-token    作者:ethereans    | 项目源码 | 文件源码
def encrypt(message, receiver_public_key):
    """Encrypt ``message`` for the holder of ``receiver_public_key``.

    A fresh SECP256K1 key pair is generated on every call. The ECDH shared
    secret between that key and ``receiver_public_key`` is passed through
    X9.63 KDF (SHA-256) to obtain a 32-byte key, which is then used for
    AES-GCM.

    :param message: plaintext to encrypt.
    :param receiver_public_key: the recipient's elliptic-curve public key.
    :return: encoded sender public point + GCM tag + ciphertext.
    """
    sender_private_key = ec.generate_private_key(ec.SECP256K1(), backend)
    shared_key = sender_private_key.exchange(ec.ECDH(), receiver_public_key)
    sender_public_key = sender_private_key.public_key()
    point = sender_public_key.public_numbers().encode_point()
    # Constant IV: the derived key differs on every call because the sender
    # key pair above is freshly generated.
    # NOTE(review): the str literals here are byte strings only on Python 2.
    iv = '000000000000'
    xkdf = x963kdf.X963KDF(
        algorithm = hashes.SHA256(),
        length = 32,
        sharedinfo = '',
        backend = backend
    )
    key = xkdf.derive(shared_key)
    # GCM is required here: the authentication tag is read from the
    # encryptor below.
    encryptor = Cipher(
        algorithms.AES(key),
        modes.GCM(iv),
        backend = backend
    ).encryptor()
    ciphertext = encryptor.update(message) + encryptor.finalize()
    return point + encryptor.tag + ciphertext
项目:github-token    作者:ethereans    | 项目源码 | 文件源码
def decrypt(message, receiver_private_key):
    """Decrypt a message laid out as point (65) + tag (16) + ciphertext.

    The sender's public key is rebuilt from the leading encoded point, the
    ECDH shared secret with ``receiver_private_key`` is passed through X9.63
    KDF (SHA-256) to a 32-byte key, and the remainder is decrypted with
    AES-GCM using the embedded tag.

    :param message: the byte string to decrypt, in the layout above.
    :param receiver_private_key: the recipient's elliptic-curve private key.
    :return: the decrypted plaintext.
    """
    point = message[0:65]
    tag = message[65:81]
    ciphertext = message[81:]
    sender_public_numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256K1(), point)
    sender_public_key = sender_public_numbers.public_key(backend)
    shared_key = receiver_private_key.exchange(ec.ECDH(), sender_public_key)
    # NOTE(review): the str literals here are byte strings only on Python 2.
    iv = '000000000000'
    xkdf = x963kdf.X963KDF(
        algorithm = hashes.SHA256(),
        length = 32,
        sharedinfo = '',
        backend = backend
    )
    key = xkdf.derive(shared_key)
    decryptor = Cipher(
        algorithms.AES(key),
        modes.GCM(iv, tag),
        backend = backend
    ).decryptor()
    # finalize() verifies the GCM tag.
    message = decryptor.update(ciphertext) +  decryptor.finalize()
    return message
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:python-miio    作者:rytilahti    | 项目源码 | 文件源码
def encrypt(plaintext: bytes, token: bytes) -> bytes:
    """Encrypt plaintext with a given token.

    The key and IV come from ``Utils.key_iv(token)``. The plaintext is
    PKCS7-padded to 128-bit blocks and encrypted with AES-CBC.

    :param bytes plaintext: Plaintext (json) to encrypt
    :param bytes token: Token to use
    :return: Encrypted bytes
    :raises TypeError: if ``plaintext`` is not bytes"""
    from cryptography.hazmat.backends import default_backend

    if not isinstance(plaintext, bytes):
        raise TypeError("plaintext requires bytes")
    key, iv = Utils.key_iv(token)
    padder = padding.PKCS7(128).padder()

    padded_plaintext = padder.update(plaintext) + padder.finalize()
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv),
                    backend=default_backend())

    encryptor = cipher.encryptor()
    return encryptor.update(padded_plaintext) + encryptor.finalize()
项目:python-miio    作者:rytilahti    | 项目源码 | 文件源码
def decrypt(ciphertext: bytes, token: bytes) -> bytes:
    """Decrypt ciphertext with a given token.

    The key and IV come from ``Utils.key_iv(token)``. The ciphertext is
    decrypted with AES-CBC and the 128-bit PKCS7 padding is removed.

    :param bytes ciphertext: Ciphertext to decrypt
    :param bytes token: Token to use
    :return: Decrypted bytes object
    :raises TypeError: if ``ciphertext`` is not bytes"""
    from cryptography.hazmat.backends import default_backend

    if not isinstance(ciphertext, bytes):
        raise TypeError("ciphertext requires bytes")
    key, iv = Utils.key_iv(token)
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv),
                    backend=default_backend())

    decryptor = cipher.decryptor()
    padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize()

    unpadder = padding.PKCS7(128).unpadder()
    unpadded_plaintext = unpadder.update(padded_plaintext)
    unpadded_plaintext += unpadder.finalize()
    return unpadded_plaintext
项目:endosome    作者:teor2345    | 项目源码 | 文件源码
def crypt_create(key_bytes,
    Create and return a crypto context for symmetric key encryption using the
    key key_bytes.

    Uses algorithm in mode with an IV of iv_bytes.
    If iv_bytes is None, an all-zero IV is used.

    AES CTR mode uses the same operations for encryption and decryption.
    #print "Key: " + binascii.hexlify(bytes(key_bytes))
    algo = algorithm(bytes(key_bytes))
    # The block_size is in bits
    if iv_bytes is None:
        iv_bytes = get_zero_pad(algo.block_size/BITS_IN_BYTE)
    cipher = ciphers.Cipher(algo, mode(bytes(iv_bytes)),
    if is_encrypt_flag:
        return cipher.encryptor()
        return cipher.decryptor()
项目:endosome    作者:teor2345    | 项目源码 | 文件源码
def crypt_bytes_key(key_bytes,
    Use symmetric key encryption to encrypt or decrypt data_bytes with
    Returns the crypted data_bytes.
    See crypt_create() and crypt_bytes_context() for details.
    crypt_context = crypt_create(key_bytes,
    (_, crypt_bytes) = crypt_bytes_context(crypt_context, data_bytes)
    return crypt_bytes
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目    作者:nukeop    | 项目源码 | 文件源码
def decrypt(data, password):
    """Decrypts data using the password.

    Decrypts the data using the provided password using the cryptography module.
    If the password or data is incorrect this will return None.

    password = bytes(password)

    #Salt is equal to password as we want the encryption to be reversible only
    #using the password itself
    kdf = PBKDF2HMAC(algorithm=hashes.AES(),

    key = base64.urlsafe_b64encode(kdf.derive(password))
    f = Fernet(key)
    token = f.decrypt(data)
    return token
项目:pytdx    作者:rainx    | 项目源码 | 文件源码
def __init__(self, endpoint="", encoding="utf-8", enc_key=None, enc_iv=None):
    """Store the endpoint and set up optional AES-CBC transport encryption.

    Transport encryption is enabled only when both ``enc_key`` and
    ``enc_iv`` are supplied; otherwise the cipher attributes are set to
    ``None`` and ``_transport_enc`` is ``False``.

    :param endpoint: stored as ``self._endpoint``.
    :param encoding: accepted but not read by this method (see note below).
    :param enc_key: AES key, or ``None`` to disable transport encryption.
    :param enc_iv: CBC initialisation vector, or ``None`` to disable
        transport encryption.
    """
    self._endpoint = endpoint
    # NOTE(review): the ``encoding`` argument is ignored and "utf-8" is
    # hard-coded; confirm whether that is intentional.
    self._encoding = "utf-8"
    if enc_key is None or enc_iv is None:
        self._transport_enc = False
        self._transport_enc_key = None
        self._transport_enc_iv = None
        self._cipher = None
    else:
        # Build the cipher only when both key and IV are present;
        # constructing AES with a missing key would fail.
        self._transport_enc = True
        self._transport_enc_key = enc_key
        self._transport_enc_iv = enc_iv
        backend = default_backend()
        self._cipher = Cipher(algorithms.AES(enc_key), modes.CBC(enc_iv), backend=backend)

    self._session = requests.Session()
项目:slips    作者:satoshilabs    | 项目源码 | 文件源码
def decryptStorage(path, key):
    """Decrypt an AES-GCM encrypted JSON file and return the parsed object.

    The file layout is taken to be a 12-byte IV, a 16-byte GCM tag, then
    the ciphertext. This matches the slicing used by ``decryptEntryValue``
    on in-memory values.

    :param path: path of the encrypted file.
    :param key: hex-encoded AES key.
    :return: the object produced by ``json.loads`` on the plaintext.
    """
    cipherkey = unhexlify(key)
    with open(path, 'rb') as f:
        iv = f.read(12)
        tag = f.read(16)
        cipher = Cipher(algorithms.AES(cipherkey), modes.GCM(iv, tag), backend=default_backend())
        decryptor = cipher.decryptor()
        chunks = []
        while True:
            block = f.read(16)
            if not block:
                break
            # data are not authenticated yet
            chunks.append(decryptor.update(block))
        # throws exception when the tag is wrong
        chunks.append(decryptor.finalize())
    # Decode once over the whole plaintext so that a multi-byte character
    # straddling a 16-byte block boundary cannot break decoding.
    data = b"".join(chunks).decode()
    return json.loads(data)
项目:slips    作者:satoshilabs    | 项目源码 | 文件源码
def decryptEntryValue(nonce, val):
    """Decrypt an AES-GCM encrypted entry value and return the parsed JSON.

    ``val`` is laid out as a 12-byte IV, a 16-byte GCM tag, then the
    ciphertext.

    :param nonce: hex-encoded AES key for this entry.
    :param val: the encrypted value, in the layout above.
    :return: the object produced by ``json.loads`` on the plaintext.
    """
    cipherkey = unhexlify(nonce)
    iv = val[:12]
    tag = val[12:28]
    cipher = Cipher(algorithms.AES(cipherkey), modes.GCM(iv, tag), backend=default_backend())
    decryptor = cipher.decryptor()
    # The whole ciphertext is already in memory, so feed it in one call.
    # This terminates (the block-wise loop had no exit) and avoids
    # decoding partial multi-byte characters at block boundaries.
    plaintext = decryptor.update(val[28:])
    # throws exception when the tag is wrong
    plaintext += decryptor.finalize()
    return json.loads(plaintext.decode())

# Decrypt given entry nonce
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:deb-python-kmip    作者:openstack    | 项目源码 | 文件源码
def test_create_symmetric_key_with_cryptographic_failure(self):
        Test that a CryptographicFailure error is raised when the symmetric
        key generation process fails.
        # Create a dummy algorithm that always fails on instantiation.
        class DummyAlgorithm(object):
            key_sizes = [0]

            def __init__(self, key_bytes):
                raise Exception()

        engine = crypto.CryptographyEngine()

        args = [enums.CryptographicAlgorithm.AES, 0]
项目:deb-python-kmip    作者:openstack    | 项目源码 | 文件源码
def test_decrypt_missing_iv_nonce(self):
        Test that the right error is raised when an IV/nonce is not provided
        for the decryption algorithm.
        engine = crypto.CryptographyEngine()

        args = (
        kwargs = {
            'cipher_mode': enums.BlockCipherMode.CBC,
            'padding_method': enums.PaddingMethod.PKCS5
            "IV/nonce is required.",
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:tfhfs    作者:fingon    | 项目源码 | 文件源码
def encode_block(self, block_id, block_data):
        assert (isinstance(block_id, bytes)
                and len(block_id) == self.block_id_len)
        enc = Encoder()
        iv = os.urandom(self.iv_len)
        c = Cipher(algorithms.AES(self.key), modes.GCM(iv),
        e = c.encryptor()
        s = e.update(block_data) + e.finalize()
        assert len(e.tag) == self.tag_len
        return enc.value
项目:tfhfs    作者:fingon    | 项目源码 | 文件源码
def decode_block(self, block_id, block_data):
    """Decrypt and authenticate one encoded block with AES-GCM.

    ``block_data`` is read in this order: magic (``len(self.magic)``
    bytes), IV (``self.iv_len`` bytes), GCM tag (``self.tag_len`` bytes),
    then the ciphertext.

    :param bytes block_id: identifier of the block; only its type and
        length are checked here.
    :param bytes block_data: the encoded block.
    :return: the decrypted payload.
    """
    from cryptography.hazmat.backends import default_backend

    assert (isinstance(block_id, bytes)
            and len(block_id) == self.block_id_len)
    assert isinstance(block_data, bytes)
    assert len(block_data) > (len(self.magic) + self.iv_len + self.tag_len)

    dec = Decoder(block_data)

    # check magic
    assert dec.decode_bytes(len(self.magic)) == self.magic

    # get iv
    iv = dec.decode_bytes(self.iv_len)

    # get tag
    tag = dec.decode_bytes(self.tag_len)

    c = Cipher(algorithms.AES(self.key), modes.GCM(iv, tag),
               backend=default_backend())
    d = c.decryptor()

    # finalize() verifies the GCM tag.
    s = d.update(dec.decode_bytes_rest()) + d.finalize()
    return s
项目:HLS_m3u8_downloader    作者:djstava    | 项目源码 | 文件源码
def execute(self, item):
        if item[1]:
            url = item[1] + "/" + item[2]
            url = item[2]
            item[2] = os.path.basename(urllib.parse.urlparse(url).path)

        if item[3]:
            backend = default_backend()
            r = requests.get(item[3].uri)
            key = r.content
            cipher = Cipher(algorithms.AES(key), modes.CBC(bytes.fromhex(item[3].iv[2:])), backend=backend)
            decryptor = cipher.decryptor()

        r = requests.get(url, stream=True)
        with open(os.path.join(self.location, item[2]), 'wb') as f:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    if item[3]:
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:delta-sdk-python    作者:Covata    | 项目源码 | 文件源码
def encrypt(data, secret_key, initialisation_vector):
    """
    Encrypts data using the given secret key and initialisation vector.

    The cipher is AES in GCM mode; the authentication tag produced by the
    encryptor is returned alongside the cipher text.

    :param bytes data: the plaintext bytes to be encrypted
    :param bytes secret_key: the key to be used for encryption
    :param bytes initialisation_vector: the initialisation vector
    :return: the cipher text and GCM authentication tag tuple
    :rtype: (bytes, bytes)
    """
    from cryptography.hazmat.backends import default_backend

    cipher = Cipher(algorithm=algorithms.AES(secret_key),
                    mode=modes.GCM(initialisation_vector),
                    backend=default_backend())
    encryptor = cipher.encryptor()
    ciphertext = encryptor.update(data) + encryptor.finalize()

    return ciphertext, encryptor.tag
项目:delta-sdk-python    作者:Covata    | 项目源码 | 文件源码
def decrypt(ciphertext, tag, secret_key, initialisation_vector):
    """
    Decrypts a cipher text using the given GCM authentication tag,
    secret key and initialisation vector.

    :param bytes ciphertext: the cipher text to be decrypted
    :param bytes tag: the GCM authentication tag
    :param bytes secret_key: the key to be used for encryption
    :param bytes initialisation_vector: the initialisation vector
    :return: the decrypted plaintext
    :rtype: bytes
    """
    from cryptography.hazmat.backends import default_backend

    cipher = Cipher(algorithm=algorithms.AES(secret_key),
                    mode=modes.GCM(initialisation_vector, tag),
                    backend=default_backend())
    decryptor = cipher.decryptor()
    # finalize() verifies the GCM tag supplied to the mode above.
    return decryptor.update(ciphertext) + decryptor.finalize()
项目:RemoteTree    作者:deNULL    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:quickstart-git2s3    作者:aws-quickstart    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:quickstart-git2s3    作者:aws-quickstart    | 项目源码 | 文件源码
def _wrap_core(wrapping_key, a, r, backend):
    """Run the RFC 3394 key-wrap rounds over ``a`` and ``r``.

    :param wrapping_key: AES key used for the ECB encryptions.
    :param a: the 8-byte integrity register.
    :param r: list of 8-byte chunks to wrap; updated in place.
    :param backend: backend passed to ``Cipher``.
    :return: the final ``a`` followed by the concatenated chunks of ``r``.
    """
    # RFC 3394 Key Wrap - 2.2.1 (index method)
    encryptor = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
    n = len(r)
    for j in range(6):
        for i in range(n):
            # every encryption operation is a discrete 16 byte chunk (because
            # AES has a 128-bit block size) and since we're using ECB it is
            # safe to reuse the encryptor for the entire operation
            b = encryptor.update(a + r[i])
            # pack/unpack are safe as these are always 64-bit chunks
            a = struct.pack(
                ">Q", struct.unpack(">Q", b[:8])[0] ^ ((n * j) + i + 1)
            )
            r[i] = b[-8:]

    assert encryptor.finalize() == b""

    return a + b"".join(r)
项目:quickstart-git2s3    作者:aws-quickstart    | 项目源码 | 文件源码
def _unwrap_core(wrapping_key, a, r, backend):
    """Run the RFC 3394 key-unwrap rounds (section 2.2.2, index method).

    :param wrapping_key: AES key used for the ECB decryptions.
    :param a: the 8-byte integrity register taken from the wrapped key.
    :param r: list of 8-byte chunks to unwrap; updated in place.
    :param backend: backend passed to ``Cipher``.
    :return: tuple of the final ``a`` and the (mutated) list ``r``.
    """
    decryptor = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
    n = len(r)
    for j in range(5, -1, -1):
        for i in range(n - 1, -1, -1):
            # Both halves are always 64-bit chunks, so pack/unpack is safe.
            (register,) = struct.unpack(">Q", a)
            step = (n * j) + i + 1
            # Each decryption is one discrete 16-byte block, so the same
            # decryptor can be reused for the entire operation.
            block = decryptor.update(struct.pack(">Q", register ^ step) + r[i])
            a = block[:8]
            r[i] = block[-8:]

    assert decryptor.finalize() == b""
    return a, r
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:Intruder-detector-with-Raspberry-Pi-and-Pushbullet    作者:DeligenceTechnologies    | 项目源码 | 文件源码
def _encrypt_data(self, data):
    """Serialize ``data`` as JSON and encrypt it with AES-GCM.

    The output, before base64 encoding, is ``b"1" + tag + iv +
    ciphertext`` with a random 12-byte IV.

    :param data: JSON-serializable object to encrypt.
    :return: the base64-encoded result as an ASCII ``str``.
    """
    assert self._encryption_key

    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
    from cryptography.hazmat.backends import default_backend

    iv = os.urandom(12)
    # GCM is required here: the authentication tag is read from the
    # encryptor below.
    encryptor = Cipher(
        algorithms.AES(self._encryption_key),
        modes.GCM(iv),
        backend=default_backend()
    ).encryptor()

    ciphertext = encryptor.update(json.dumps(data).encode("UTF-8")) + encryptor.finalize()
    ciphertext = b"1" + encryptor.tag + iv + ciphertext
    return standard_b64encode(ciphertext).decode("ASCII")
项目:Intruder-detector-with-Raspberry-Pi-and-Pushbullet    作者:DeligenceTechnologies    | 项目源码 | 文件源码
def _encrypt_data(self, data):
    """Serialize ``data`` as JSON and encrypt it with AES-GCM.

    The output, before base64 encoding, is ``b"1" + tag + iv +
    ciphertext`` with a random 12-byte IV.

    :param data: JSON-serializable object to encrypt.
    :return: the base64-encoded result as an ASCII ``str``.
    """
    assert self._encryption_key

    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
    from cryptography.hazmat.backends import default_backend

    iv = os.urandom(12)
    # GCM is required here: the authentication tag is read from the
    # encryptor below.
    encryptor = Cipher(
        algorithms.AES(self._encryption_key),
        modes.GCM(iv),
        backend=default_backend()
    ).encryptor()

    ciphertext = encryptor.update(json.dumps(data).encode("UTF-8")) + encryptor.finalize()
    ciphertext = b"1" + encryptor.tag + iv + ciphertext
    return standard_b64encode(ciphertext).decode("ASCII")
项目:PyQYT    作者:collinsctk    | 项目源码 | 文件源码
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and authenticate ``data``, returning an encoded token.

    The token is ``0x80 || timestamp || iv || ciphertext || hmac``,
    urlsafe-base64 encoded. The HMAC-SHA256 covers every byte before it.

    :param bytes data: plaintext to encrypt.
    :param current_time: timestamp, packed as a big-endian unsigned
        64-bit integer.
    :param iv: initialisation vector passed to AES-CBC and embedded
        in the token.
    :return: the urlsafe-base64 encoded token.
    :raises TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    # Pad to the AES block size before CBC encryption.
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(data) + padder.finalize()
    encryptor = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    ).encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    basic_parts = (
        b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
    )

    # Feed the header and ciphertext to the HMAC before finalizing it;
    # otherwise the tag would not authenticate the token contents.
    h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    h.update(basic_parts)
    hmac = h.finalize()
    return base64.urlsafe_b64encode(basic_parts + hmac)
项目:pyAesCrypt    作者:marcobellaccini    | 项目源码 | 文件源码
def stretch(passw, iv1):
    """Derive a 32-byte digest from a password and an external IV.

    Starting from ``iv1`` followed by 16 zero bytes, the running digest is
    re-hashed together with the UTF-16-LE encoded password 8192 times using
    SHA-256.

    :param str passw: the password.
    :param bytes iv1: the external IV used to seed the digest.
    :return: the final SHA-256 digest.
    """
    # hash the external iv and the password 8192 times
    digest = iv1 + (16 * b"\x00")
    # The encoded password never changes, so encode it once.
    passw_bytes = bytes(passw, "utf_16_le")

    for _ in range(8192):
        passHash = hashes.Hash(hashes.SHA256(), backend=default_backend())
        # Chain the previous digest into each round; without this the IV is
        # ignored and every round produces the same hash.
        passHash.update(digest)
        passHash.update(passw_bytes)
        digest = passHash.finalize()

    return digest

# encrypting function
# arguments:
# infile: plaintext file path
# outfile: ciphertext file path
# passw: encryption password
# bufferSize: encryption buffer size, must be a multiple of
#             AES block size (16)
#             using a larger buffer speeds up things when dealing
#             with big files
项目:QUANTAXIS    作者:yutiansut    | 项目源码 | 文件源码
def __init__(self, broker="", encoding="utf-8", enc_key=None, enc_iv=None):
    """Store the broker endpoint and set up optional transport encryption.

    AES-CBC transport encryption is enabled only when both ``enc_key`` and
    ``enc_iv`` are supplied; otherwise the cipher attributes are set to
    ``None`` and ``_transport_enc`` is ``False``. Also creates the HTTP
    session and the event-name to handler dispatch table.

    :param broker: stored as ``self._endpoint``.
    :param encoding: accepted but not read by this method (see note below).
    :param enc_key: AES key, or ``None`` to disable transport encryption.
    :param enc_iv: CBC initialisation vector, or ``None`` to disable
        transport encryption.
    """
    self._endpoint = broker
    # NOTE(review): the ``encoding`` argument is ignored and "utf-8" is
    # hard-coded; confirm whether that is intentional.
    self._encoding = "utf-8"
    if enc_key is None or enc_iv is None:
        self._transport_enc = False
        self._transport_enc_key = None
        self._transport_enc_iv = None
        self._cipher = None
    else:
        # Build the cipher only when both key and IV are present;
        # constructing AES with a missing key would fail.
        self._transport_enc = True
        self._transport_enc_key = enc_key
        self._transport_enc_iv = enc_iv
        backend = default_backend()
        self._cipher = Cipher(algorithms.AES(
            enc_key), modes.CBC(enc_iv), backend=backend)

    self._session = requests.Session()
    self._event_dict = {'logon': self.on_login, 'logoff': self.on_logout, 'ping': self.on_ping,
                        'query_data': self.on_query_data, 'send_order': self.on_insert_order,
                        'cancel_order': self.on_cancel_order_event, 'get_quote': self.on_get_quote}
    self.client_id = ''
    self.account_id = ''
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def aes_key_wrap(wrapping_key, key_to_wrap, backend):
    """Wrap ``key_to_wrap`` under ``wrapping_key`` following RFC 3394.

    :param wrapping_key: AES key of 16, 24 or 32 bytes.
    :param key_to_wrap: key material, at least 16 bytes and a multiple of
        8 bytes long.
    :param backend: backend passed to ``Cipher``.
    :return: the wrapped key: the 8-byte integrity value followed by the
        wrapped 8-byte chunks.
    :raises ValueError: if either key has an invalid length.
    """
    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    if len(key_to_wrap) < 16:
        raise ValueError("The key to wrap must be at least 16 bytes")

    if len(key_to_wrap) % 8 != 0:
        raise ValueError("The key to wrap must be a multiple of 8 bytes")

    # RFC 3394 Key Wrap - 2.2.1 (index method)
    encryptor = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
    a = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
    r = [key_to_wrap[i:i + 8] for i in range(0, len(key_to_wrap), 8)]
    n = len(r)
    for j in range(6):
        for i in range(n):
            # every encryption operation is a discrete 16 byte chunk (because
            # AES has a 128-bit block size) and since we're using ECB it is
            # safe to reuse the encryptor for the entire operation
            b = encryptor.update(a + r[i])
            # pack/unpack are safe as these are always 64-bit chunks
            a = struct.pack(
                ">Q", struct.unpack(">Q", b[:8])[0] ^ ((n * j) + i + 1)
            )
            r[i] = b[-8:]

    assert encryptor.finalize() == b""

    return a + b"".join(r)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def aes_key_unwrap(wrapping_key, wrapped_key, backend):
    """Unwrap ``wrapped_key`` with ``wrapping_key`` following RFC 3394.

    :param wrapping_key: AES key of 16, 24 or 32 bytes.
    :param wrapped_key: wrapped key, at least 24 bytes and a multiple of
        8 bytes long.
    :param backend: backend passed to ``Cipher``.
    :return: the unwrapped key bytes.
    :raises ValueError: if either input has an invalid length.
    :raises InvalidUnwrap: if the recovered integrity value does not match
        the expected constant.
    """
    wrapped_len = len(wrapped_key)
    if wrapped_len < 24:
        raise ValueError("Must be at least 24 bytes")
    if wrapped_len % 8 != 0:
        raise ValueError("The wrapped key must be a multiple of 8 bytes")
    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    # Implement RFC 3394 Key Unwrap - 2.2.2 (index method)
    decryptor = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
    aiv = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"

    # Split into 64-bit chunks; the first is the integrity register.
    chunks = [wrapped_key[pos:pos + 8] for pos in range(0, wrapped_len, 8)]
    a, r = chunks[0], chunks[1:]
    n = len(r)
    for j in range(5, -1, -1):
        for i in range(n - 1, -1, -1):
            # Both halves are always 64-bit chunks, so pack/unpack is safe.
            (register,) = struct.unpack(">Q", a)
            step = (n * j) + i + 1
            # Each decryption is one discrete 16-byte block, so the same
            # decryptor can be reused for the entire operation.
            block = decryptor.update(struct.pack(">Q", register ^ step) + r[i])
            a = block[:8]
            r[i] = block[-8:]

    assert decryptor.finalize() == b""

    if not bytes_eq(a, aiv):
        raise InvalidUnwrap()

    return b"".join(r)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def aes_key_wrap(wrapping_key, key_to_wrap, backend):
    """Wrap ``key_to_wrap`` under ``wrapping_key`` using AES Key Wrap.

    Implements the index-based wrapping process of RFC 3394 section 2.2.1
    on top of raw AES-ECB block encryption.

    :param wrapping_key: AES key-encryption key; must be 16, 24 or 32 bytes.
    :param key_to_wrap: Key material to protect; at least 16 bytes long and
        a multiple of 8 bytes.
    :param backend: Backend object passed straight through to ``Cipher``.
    :returns: The final 8-byte integrity register followed by the wrapped
        8-byte blocks.
    :raises ValueError: If one of the length checks fails.
    """
    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    if len(key_to_wrap) < 16:
        raise ValueError("The key to wrap must be at least 16 bytes")

    if len(key_to_wrap) % 8 != 0:
        raise ValueError("The key to wrap must be a multiple of 8 bytes")

    # RFC 3394 Key Wrap - 2.2.1 (index method)
    encryptor = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
    # Initial value of the integrity register.
    a = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
    r = [key_to_wrap[i:i + 8] for i in range(0, len(key_to_wrap), 8)]
    n = len(r)
    for j in range(6):
        for i in range(n):
            # every encryption operation is a discrete 16 byte chunk (because
            # AES has a 128-bit block size) and since we're using ECB it is
            # safe to reuse the encryptor for the entire operation
            b = encryptor.update(a + r[i])
            # pack/unpack are safe as these are always 64-bit chunks
            a = struct.pack(
                ">Q", struct.unpack(">Q", b[:8])[0] ^ ((n * j) + i + 1)
            )
            r[i] = b[-8:]

    assert encryptor.finalize() == b""

    return a + b"".join(r)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def aes_key_unwrap(wrapping_key, wrapped_key, backend):
    """Undo an RFC 3394 AES key wrap and return the protected key bytes.

    :param wrapping_key: AES key-encryption key; must be 16, 24 or 32 bytes.
    :param wrapped_key: Wrapped blob; at least 24 bytes and a multiple of 8.
    :param backend: Backend object passed straight through to ``Cipher``.
    :returns: The unwrapped 8-byte blocks joined into one byte string.
    :raises ValueError: If one of the length checks fails.
    :raises InvalidUnwrap: If the recovered integrity value differs from the
        expected ``0xA6`` * 8 constant.
    """
    blob_len = len(wrapped_key)
    if blob_len < 24:
        raise ValueError("Must be at least 24 bytes")
    if blob_len % 8 != 0:
        raise ValueError("The wrapped key must be a multiple of 8 bytes")
    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    # RFC 3394 section 2.2.2 (index-based unwrap).
    block_decryptor = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
    expected_iv = b"\xa6" * 8

    pieces = [wrapped_key[start:start + 8] for start in range(0, blob_len, 8)]
    integrity, payload = pieces[0], pieces[1:]
    width = len(payload)
    for round_no in range(5, -1, -1):
        for idx in range(width - 1, -1, -1):
            counter = (width * round_no) + idx + 1
            # The register is always exactly 64 bits, so ">Q" round-trips it.
            (as_int,) = struct.unpack(">Q", integrity)
            masked = struct.pack(">Q", as_int ^ counter)
            # Each update() handles one independent 16-byte ECB block, so
            # the same decryptor can be reused for every step.
            out = block_decryptor.update(masked + payload[idx])
            integrity = out[:8]
            payload[idx] = out[-8:]

    assert block_decryptor.finalize() == b""

    if not bytes_eq(integrity, expected_iv):
        raise InvalidUnwrap()

    return b"".join(payload)
项目:python-miio    作者:rytilahti    | 项目源码 | 文件源码
def decrypt_ztoken(ztoken):
    """Decrypt the given ztoken, used by apple.

    Tokens of 32 characters or fewer are returned unchanged. Longer tokens
    are treated as hex: the first 64 hex characters are decrypted with
    AES in ECB mode under an all-zero 128-bit key and the result is decoded
    to ``str``.

    :param ztoken: Token string, plain or hex-encoded ciphertext.
    :returns: The input itself when it is short, otherwise the decrypted
        token as a string.
    """
    if len(ztoken) <= 32:
        return ztoken

    keystring = '00000000000000000000000000000000'
    key = bytes.fromhex(keystring)
    cipher = Cipher(algorithms.AES(key), modes.ECB(),
                    backend=default_backend())
    decryptor = cipher.decryptor()
    # Only the first 64 hex characters (32 bytes) carry the encrypted token.
    token = (decryptor.update(bytes.fromhex(ztoken[:64]))
             + decryptor.finalize())

    return token.decode()
项目:pyetesync    作者:etesync    | 项目源码 | 文件源码
def decrypt(self, ctext):
    """Decrypt ``ctext`` with AES-CBC and strip its PKCS7 padding.

    The first ``AES_BLOCK_SIZE`` bytes of ``ctext`` are used as the IV; the
    remainder is decrypted with ``self.cipher_key``.

    :param ctext: IV followed by the ciphertext.
    :returns: The unpadded plaintext.
    """
    iv, body = ctext[:AES_BLOCK_SIZE], ctext[AES_BLOCK_SIZE:]
    aes_cbc = Cipher(
        algorithms.AES(self.cipher_key),
        modes.CBC(iv),
        backend=default_backend()
    )
    pkcs7 = padding.PKCS7(AES_BLOCK_SIZE * 8).unpadder()
    dec = aes_cbc.decryptor()

    padded = dec.update(body)
    padded += dec.finalize()

    plain = pkcs7.update(padded)
    plain += pkcs7.finalize()
    return plain
项目:pyetesync    作者:etesync    | 项目源码 | 文件源码
def encrypt(self, clear_text):
    """Encrypt ``clear_text`` with AES-CBC under a freshly generated IV.

    The input is PKCS7-padded before encryption with ``self.cipher_key``.

    :param clear_text: Plaintext to encrypt.
    :returns: The random IV (``AES_BLOCK_SIZE`` bytes) followed by the
        ciphertext.
    """
    nonce = os.urandom(AES_BLOCK_SIZE)
    aes_cbc = Cipher(
        algorithms.AES(self.cipher_key),
        modes.CBC(nonce),
        backend=default_backend()
    )
    pkcs7 = padding.PKCS7(AES_BLOCK_SIZE * 8).padder()
    enc = aes_cbc.encryptor()

    padded = pkcs7.update(clear_text)
    padded += pkcs7.finalize()

    # The IV travels in front of the ciphertext.
    return b"".join([nonce, enc.update(padded), enc.finalize()])
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def aes_key_wrap(wrapping_key, key_to_wrap, backend):
    """Wrap ``key_to_wrap`` under ``wrapping_key`` using AES Key Wrap.

    Implements the index-based wrapping process of RFC 3394 section 2.2.1
    on top of raw AES-ECB block encryption.

    :param wrapping_key: AES key-encryption key; must be 16, 24 or 32 bytes.
    :param key_to_wrap: Key material to protect; at least 16 bytes long and
        a multiple of 8 bytes.
    :param backend: Backend object passed straight through to ``Cipher``.
    :returns: The final 8-byte integrity register followed by the wrapped
        8-byte blocks.
    :raises ValueError: If one of the length checks fails.
    """
    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    if len(key_to_wrap) < 16:
        raise ValueError("The key to wrap must be at least 16 bytes")

    if len(key_to_wrap) % 8 != 0:
        raise ValueError("The key to wrap must be a multiple of 8 bytes")

    # RFC 3394 Key Wrap - 2.2.1 (index method)
    encryptor = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
    # Initial value of the integrity register.
    a = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
    r = [key_to_wrap[i:i + 8] for i in range(0, len(key_to_wrap), 8)]
    n = len(r)
    for j in range(6):
        for i in range(n):
            # every encryption operation is a discrete 16 byte chunk (because
            # AES has a 128-bit block size) and since we're using ECB it is
            # safe to reuse the encryptor for the entire operation
            b = encryptor.update(a + r[i])
            # pack/unpack are safe as these are always 64-bit chunks
            a = struct.pack(
                ">Q", struct.unpack(">Q", b[:8])[0] ^ ((n * j) + i + 1)
            )
            r[i] = b[-8:]

    assert encryptor.finalize() == b""

    return a + b"".join(r)