Python cryptography.fernet 模块,InvalidToken() 实例源码

我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用cryptography.fernet.InvalidToken()

项目:edx-video-pipeline    作者:edx    | 项目源码 | 文件源码
def test_decrypt_different_key_set(self):
        """
        Reading a record fails once the key it was encrypted with is gone.

        The value was stored under the original fernet key; after the key set
        is replaced by one that lacks it, fetching the record is expected to
        raise ``InvalidToken``.
        """
        original_keys = ['test-ferent-key']
        self.assertEqual(settings.FERNET_KEYS, original_keys)
        replacement_keys = ['new-fernet-key']

        # Drop the cached fernet properties so the overridden keys are picked up.
        invalidate_fernet_cached_properties(TranscriptCredentials, ['api_key', 'api_secret'])

        with override_settings(FERNET_KEYS=replacement_keys):
            self.assertEqual(settings.FERNET_KEYS, replacement_keys)
            self.assertRaises(
                InvalidToken,
                TranscriptCredentials.objects.get,
                org=self.credentials_data['org'],
                provider=self.credentials_data['provider'],
            )
项目:edx-video-pipeline    作者:edx    | 项目源码 | 文件源码
def handle(self, *args, **options):
        """
        Re-save every transcript credentials record so it gets re-encrypted.

        All saves run inside one atomic transaction. An ``InvalidToken`` raised
        along the way is logged (with traceback) and swallowed.
        """
        LOGGER.info('[Transcript credentials re-encryption] Process started.')

        # Drop the cached fernet properties so the latest keys are used.
        invalidate_fernet_cached_properties(TranscriptCredentials, ['api_key', 'api_secret'])

        try:
            with transaction.atomic():
                # Per the original author's note, saving a record is what
                # re-encrypts its fernet fields.
                for credentials_record in TranscriptCredentials.objects.all():
                    credentials_record.save()
        except InvalidToken:
            LOGGER.exception(
                '[Transcript credentials re-encryption] No valid fernet key present to decrypt. Process halted.'
            )
        else:
            LOGGER.info('[Transcript credentials re-encryption] Process completed.')
项目:katprep    作者:stdevel    | 项目源码 | 文件源码
def get_credential(self, hostname):
        """
        Look up the stored credentials for a host.

        :param hostname: hostname (passed through ``cut_hostname`` first)
        :type hostname: str
        :returns: ``(username, password)``. When ``self.KEY`` is set, the
            stored password is decrypted after its first two characters are
            dropped. ``None`` is returned implicitly when a lookup key is
            missing.
        :raises ContainerException: if the stored password cannot be decrypted
        """
        hostname = self.cut_hostname(hostname)
        try:
            if not self.KEY:
                # no key configured: hand back the stored values untouched
                plain_entry = self.CREDENTIALS[hostname]
                return (plain_entry["username"], plain_entry["password"])

            crypto = Fernet(self.KEY)
            entry = self.CREDENTIALS[hostname]
            username = entry["username"]
            token = entry["password"][2:].encode()
            return (username, crypto.decrypt(token))
        except InvalidToken:
            raise ContainerException("Invalid password specified!")
        except KeyError:
            return None
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__assures_data_integrity(self):
        """A token with one randomly flipped bit must be rejected on decrypt."""
        self.write_secret()
        plaintext = factory.make_bytes(size=10)
        token = fernet_encrypt_psk(plaintext)
        corrupted = bytearray(token)
        # Pick a random bit to corrupt. Per the original author's note, the
        # tail of the token is avoided because it is only padding that the
        # HMAC does not cover.
        index = randint(0, len(corrupted) - 4)
        mask = 1 << randint(0, 7)
        corrupted[index] ^= mask
        description = "token=%s; token[%d] ^= 0x%02x" % (
            token.decode("utf-8"), index, mask)
        with ExpectedException(InvalidToken, msg=description):
            fernet_decrypt_psk(bytes(corrupted))
项目:FoxHA    作者:globocom    | 项目源码 | 文件源码
def parse_config_file(
            cipher_suite, config_file="./config/foxha_config.ini"
    ):
        """
        Read the repository connection settings and decrypt the password.

        Returns ``(host, port, database, user, decrypted_password)``. Any
        failure is reported through ``print_error`` and ends the process:
        exit code 99 for a missing section/option or an invalid token,
        3 for any other error during decryption.
        """
        try:
            values = Utils.get_config_values_from_config_file(config_file)
            repo_host, repo_port, repo_database, repo_user, \
                encrypted_repo_pass = values
        except (ConfigParser.NoSectionError,
                ConfigParser.NoOptionError) as err:
            print_error("Config file error: {}".format(err))
            exit(99)

        try:
            decrypted_repo_pass = cipher_suite.decrypt(encrypted_repo_pass)
        except InvalidToken:
            print_error("ERROR: InvalidToken")
            exit(99)
        except Exception as e:
            print_error("ERROR: %s" % e)
            exit(3)
        else:
            return repo_host, repo_port, repo_database,\
                repo_user, decrypted_repo_pass
项目:MusicBot    作者:BjoernPetersen    | 项目源码 | 文件源码
def _load_secrets():
    """Load and decrypt the JSON secrets file.

    Returns an empty dict when the file does not exist. When the file cannot
    be decrypted, a critical message is logged, ``async_handler.shutdown()``
    is called and ``None`` is returned.
    """
    log = logging.getLogger(__name__)
    location = _get_secrets_path()
    if not path.isfile(location):
        log.debug("No secrets file found")
        return {}

    cipher = Fernet(key)
    with open(location, 'rb') as handle:
        encrypted = handle.read()
        try:
            plaintext = cipher.decrypt(encrypted)
        except InvalidToken:
            log.critical("You entered the wrong password")
            async_handler.shutdown()
            return None
        return json.loads(plaintext.decode())
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def encrypt(self, data):
        """
            Encrypt ``data`` with this object's Fernet instance.

            The original author describes the key as derived from Django's
            secret key. ``InvalidSignature`` / ``InvalidToken`` are translated
            into ``CryptoException``.
        """
        try:
            return self._fernet.encrypt(data)
        except (InvalidSignature, InvalidToken):
            raise CryptoException('unable to encrypt data')
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def decrypt(self, data):
        """
            Decrypt ``data`` with this object's Fernet instance.

            The original author describes the key as derived from Django's
            secret key. ``InvalidSignature`` / ``InvalidToken`` are translated
            into ``CryptoException``.
        """
        try:
            return self._fernet.decrypt(data)
        except (InvalidSignature, InvalidToken):
            raise CryptoException('unable to decrypt data')
项目:edx-video-pipeline    作者:edx    | 项目源码 | 文件源码
def verify_access_credentials(self):
        """
        Fetch the credentials record identified by ``self.credentials_data``.

        The result is discarded; the fetch only proves the record can be read.
        Per the original author's note, a record that cannot be decrypted
        makes the fetch raise ``InvalidToken``.
        """
        lookup = {
            'org': self.credentials_data['org'],
            'provider': self.credentials_data['provider'],
        }
        TranscriptCredentials.objects.get(**lookup)
项目:edx-video-pipeline    作者:edx    | 项目源码 | 文件源码
def test_reencrypt_transcript_credentials_invalid_keys(self, mock_logger):
        """
        Re-encryption halts when the key that encrypted the data is missing.

        With the original keys the record is readable. After swapping in a key
        set without the old key, the command is expected to log the halt
        message and the record to stay unreadable.
        """
        # Start from the original key list with a readable record.
        self.assertEqual(settings.FERNET_KEYS, OLD_FERNET_KEYS_LIST)
        self.verify_access_credentials()

        # Replacement key set that deliberately omits the old decryption key.
        replacement_keys = ['new-fernet-key']

        with override_settings(FERNET_KEYS=replacement_keys):
            self.assertEqual(settings.FERNET_KEYS, replacement_keys)

            call_command('re_encrypt_transcript_credentials')

            # The command should have logged its start and then the failure.
            mock_logger.info.assert_called_with('[Transcript credentials re-encryption] Process started.')
            mock_logger.exception.assert_called_with(
                '[Transcript credentials re-encryption] No valid fernet key present to decrypt. Process halted.'
            )

            # Without a usable decryption key, reading the record must fail.
            self.assertRaises(InvalidToken, self.verify_access_credentials)
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def _decrypt(self, token):
        """Decrypt ``token`` via ``SecureToken``; return ``''`` if it is invalid.

        An invalid token is logged at warning level rather than raised.
        """
        from cryptography.fernet import InvalidToken

        try:
            plaintext = SecureToken.decrypt(token)
        except InvalidToken:
            logger.warning('Invalid secure token: %s', token)
            plaintext = ''
        return plaintext
项目:apex-sigma-core    作者:lu-ci    | 项目源码 | 文件源码
async def decrypt(cmd, message, args):
    """
    Decrypt a Fernet token with the bot's configured key and send the result.

    Declared ``async`` because the body awaits ``message.channel.send``; a
    plain ``def`` containing ``await`` does not compile.

    :param cmd: command object; the key is read from
        ``cmd.bot.cfg.pref.raw['key_to_my_heart']``
    :param message: triggering message; the reply goes to ``message.channel``
    :param args: token fragments, joined without a separator; a trailing
        ``':t'`` asks for the decrypted text as a plain message instead of
        an embed
    """
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    # Set only when a successful result should be sent as raw text. Error
    # embeds are always sent via ``embed=`` -- previously a failed decrypt
    # with ':t' passed the Embed object as the message content.
    plain_reply = None
    response = None
    if not key:
        response = discord.Embed(color=0xBE1931, title='? You don\'t posses a key.')
    elif not args:
        response = discord.Embed(color=0xBE1931, title='? Nothing to decrypt.')
    else:
        text = args[-1] == ':t'
        fragments = args[:-1] if text else args
        crypt_text = ''.join(fragments).encode('utf-8')
        cipher = Fernet(key.encode('utf-8'))
        try:
            ciphered = cipher.decrypt(crypt_text).decode('utf-8')
        except (InvalidToken, InvalidSignature):
            ciphered = None
        if not ciphered:
            # Also reached when the decrypted text is empty.
            response = discord.Embed(color=0xBE1931, title='? The token or key are incorrect.')
        elif text:
            plain_reply = ciphered
        else:
            response = discord.Embed(color=0xe75a70)
            response.add_field(name='?? Token Decrypted', value=ciphered)
    if plain_reply is not None:
        await message.channel.send(plain_reply)
    else:
        await message.channel.send(embed=response)
项目:apex-sigma-core    作者:lu-ci    | 项目源码 | 文件源码
async def encrypt(cmd, message, args):
    """
    Encrypt text with the bot's configured Fernet key and send the token.

    Declared ``async`` because the body awaits ``message.channel.send``; a
    plain ``def`` containing ``await`` does not compile.

    :param cmd: command object; the key is read from
        ``cmd.bot.cfg.pref.raw['key_to_my_heart']``
    :param message: triggering message; the reply goes to ``message.channel``
    :param args: words to encrypt, joined with single spaces; a trailing
        ``':t'`` asks for the token as a plain message instead of an embed
    """
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    # Set only when a successful result should be sent as raw text. Error
    # embeds are always sent via ``embed=`` -- previously a failure with ':t'
    # passed the Embed object as the message content.
    plain_reply = None
    response = None
    if not key:
        response = discord.Embed(color=0xBE1931, title='? You don\'t posses a key.')
    elif not args:
        # The message used to read "Nothing to decrypt." (copied from the
        # decrypt command).
        response = discord.Embed(color=0xBE1931, title='? Nothing to encrypt.')
    else:
        text = args[-1] == ':t'
        words = args[:-1] if text else args
        crypt_text = ' '.join(words).encode('utf-8')
        cipher = Fernet(key.encode('utf-8'))
        try:
            ciphered = cipher.encrypt(crypt_text).decode('utf-8')
        except (InvalidToken, InvalidSignature):
            ciphered = None
        if not ciphered:
            response = discord.Embed(color=0xBE1931, title='? The token or key are incorrect.')
        elif text:
            plain_reply = ciphered
        else:
            response = discord.Embed(color=0xe75a70)
            response.add_field(name='?? Text Encrypted', value=ciphered)
    if plain_reply is not None:
        await message.channel.send(plain_reply)
    else:
        await message.channel.send(embed=response)
项目:ghost    作者:nir0s    | 项目源码 | 文件源码
def _assert_valid_stash(self):
        """Raise ``GhostError`` unless the stash is initialized and unlockable.

        When the storage holds a ``stored_passphrase`` entry, its value is
        decrypted as a check; an ``InvalidToken`` from that decryption is
        reported as an invalid passphrase.
        """
        if not self._storage.is_initialized:
            raise GhostError(
                'Stash not initialized. Please initialize it and try again')

        try:
            stored = self._storage.get('stored_passphrase')
            if stored:
                self._decrypt(stored['value'])
        except InvalidToken:
            raise GhostError(
                'The passphrase provided is invalid for this stash. '
                'Please provide the correct passphrase')
项目:apex-sigma-plugins    作者:lu-ci    | 项目源码 | 文件源码
async def decrypt(cmd, message, args):
    """
    Decrypt a Fernet token with the bot's configured key and send the result.

    Declared ``async`` because the body awaits ``message.channel.send``; a
    plain ``def`` containing ``await`` does not compile.

    :param cmd: command object; the key is read from
        ``cmd.bot.cfg.pref.raw['key_to_my_heart']``
    :param message: triggering message; the reply goes to ``message.channel``
    :param args: token fragments, joined without a separator; a trailing
        ``':t'`` asks for the decrypted text as a plain message instead of
        an embed
    """
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    # Set only when a successful result should be sent as raw text. Error
    # embeds are always sent via ``embed=`` -- previously a failed decrypt
    # with ':t' passed the Embed object as the message content.
    plain_reply = None
    response = None
    if not key:
        response = discord.Embed(color=0xBE1931, title='? You don\'t posses a key.')
    elif not args:
        response = discord.Embed(color=0xBE1931, title='? Nothing to decrypt.')
    else:
        text = args[-1] == ':t'
        fragments = args[:-1] if text else args
        crypt_text = ''.join(fragments).encode('utf-8')
        cipher = Fernet(key.encode('utf-8'))
        try:
            ciphered = cipher.decrypt(crypt_text).decode('utf-8')
        except (InvalidToken, InvalidSignature):
            ciphered = None
        if not ciphered:
            # Also reached when the decrypted text is empty.
            response = discord.Embed(color=0xBE1931, title='? The token or key are incorrect.')
        elif text:
            plain_reply = ciphered
        else:
            response = discord.Embed(color=0xe75a70)
            response.add_field(name='?? Token Decrypted', value=ciphered)
    if plain_reply is not None:
        await message.channel.send(plain_reply)
    else:
        await message.channel.send(embed=response)
项目:apex-sigma-plugins    作者:lu-ci    | 项目源码 | 文件源码
async def encrypt(cmd, message, args):
    """
    Encrypt text with the bot's configured Fernet key and send the token.

    Declared ``async`` because the body awaits ``message.channel.send``; a
    plain ``def`` containing ``await`` does not compile.

    :param cmd: command object; the key is read from
        ``cmd.bot.cfg.pref.raw['key_to_my_heart']``
    :param message: triggering message; the reply goes to ``message.channel``
    :param args: words to encrypt, joined with single spaces; a trailing
        ``':t'`` asks for the token as a plain message instead of an embed
    """
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    # Set only when a successful result should be sent as raw text. Error
    # embeds are always sent via ``embed=`` -- previously a failure with ':t'
    # passed the Embed object as the message content.
    plain_reply = None
    response = None
    if not key:
        response = discord.Embed(color=0xBE1931, title='? You don\'t posses a key.')
    elif not args:
        # The message used to read "Nothing to decrypt." (copied from the
        # decrypt command).
        response = discord.Embed(color=0xBE1931, title='? Nothing to encrypt.')
    else:
        text = args[-1] == ':t'
        words = args[:-1] if text else args
        crypt_text = ' '.join(words).encode('utf-8')
        cipher = Fernet(key.encode('utf-8'))
        try:
            ciphered = cipher.encrypt(crypt_text).decode('utf-8')
        except (InvalidToken, InvalidSignature):
            ciphered = None
        if not ciphered:
            response = discord.Embed(color=0xBE1931, title='? The token or key are incorrect.')
        elif text:
            plain_reply = ciphered
        else:
            response = discord.Embed(color=0xe75a70)
            response.add_field(name='?? Text Encrypted', value=ciphered)
    if plain_reply is not None:
        await message.channel.send(plain_reply)
    else:
        await message.channel.send(embed=response)
项目:data-hub-backend    作者:uktrade-attic    | 项目源码 | 文件源码
def read(self):
        """
        Return the unpickled cookie, or None when it is missing or unreadable.

        The cookie file is decrypted with ``self.crypto`` and then unpickled.
        If decryption raises ``InvalidToken`` (or a ``TypeError`` occurs),
        ``self.reset()`` is called and None is returned.
        """
        if not self.exists():
            return None

        with open(config.cdms_cookie_path, 'rb') as cookie_file:
            try:
                plaintext = self.crypto.decrypt(cookie_file.read())
                # NOTE(review): the pickle is only loaded after decrypt()
                # succeeded -- confirm the key is never shared with
                # untrusted writers.
                return pickle.loads(plaintext)
            except (InvalidToken, TypeError):
                self.reset()
        return None
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__messages_from_the_past_exceeding_ttl_rejected(self):
        """A token created 2s before decryption is rejected with ttl=1."""
        self.write_secret()
        payload = factory.make_bytes()
        current = time.time()
        # First patched time() call reports 2s in the past, the second "now".
        fake_clock = self.patch(time, "time")
        fake_clock.side_effect = [current - 2, current]
        token = fernet_encrypt_psk(payload)
        with ExpectedException(InvalidToken):
            fernet_decrypt_psk(token, ttl=1)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__messages_from_future_exceeding_clock_skew_limit_rejected(self):
        """A token stamped 61s ahead of decryption time is rejected."""
        self.write_secret()
        payload = factory.make_bytes()
        current = time.time()
        # First patched time() call reports 61s in the future, the second "now".
        fake_clock = self.patch(time, "time")
        fake_clock.side_effect = [current + 61, current]
        token = fernet_encrypt_psk(payload)
        with ExpectedException(InvalidToken):
            fernet_decrypt_psk(token, ttl=1)
项目:FoxHA    作者:globocom    | 项目源码 | 文件源码
def crypt_pass(cipher_suite, password):
        """
        Encrypt ``password`` with ``cipher_suite`` and print the result.

        On ``InvalidToken`` an error is reported through ``print_error`` and
        the process exits with code 99.
        """
        try:
            cipher_text = cipher_suite.encrypt(password)
            # Parenthesised form prints the same on Python 2 and is also
            # valid syntax on Python 3 (the bare print statement is not).
            print(cipher_text)
        except InvalidToken:
            print_error("ERROR: InvalidToken")
            exit(99)
项目:FoxHA    作者:globocom    | 项目源码 | 文件源码
def decrypt_pass(cipher_suite, password):
        """
        Decrypt ``password`` with ``cipher_suite`` and print the result.

        On ``InvalidToken`` an error is reported through ``print_error`` and
        the process exits with code 99.
        """
        try:
            plain_text = cipher_suite.decrypt(password)
            # Parenthesised form prints the same on Python 2 and is also
            # valid syntax on Python 3 (the bare print statement is not).
            print(plain_text)
        except InvalidToken:
            print_error("ERROR: InvalidToken")
            exit(99)
项目:python-secureconfig    作者:nthmost    | 项目源码 | 文件源码
def test_read_enc_wrong_key_raises_InvalidToken(self):
        """Loading the encrypted JSON file with the wrong key raises InvalidToken."""
        self.assertRaises(
            InvalidToken,
            SecureJson.from_key,
            TEST_KEYSTRING_WRONG,
            filepath=TEST_JSON_OUTFILE,
        )
项目:python-secureconfig    作者:nthmost    | 项目源码 | 文件源码
def test_bad_key_raises_InvalidToken(self):
        """Build a CryptKeeper from a bad key, tolerating InvalidToken."""
        # NOTE(review): this also passes when no exception is raised at all,
        # so it does not actually enforce what the test name says -- confirm
        # which exception CryptKeeper raises before tightening it.
        try:
            CryptKeeper(TEST_BAD_KEY)
        except InvalidToken:
            pass
项目:python-secureconfig    作者:nthmost    | 项目源码 | 文件源码
def test_wrong_key_raises_InvalidToken(self):
        """Decrypting with a CryptKeeper built from the wrong key raises InvalidToken."""
        enctxt = encrypt_string(TEST_KEYSTRING, 'test string')
        # assertRaises replaces the deprecated failUnlessRaises alias, which
        # newer unittest versions no longer provide.
        self.assertRaises(InvalidToken, self.string_ck_wrong.decrypt, enctxt)
项目:python-secureconfig    作者:nthmost    | 项目源码 | 文件源码
def test_wrong_ck_raises_InvalidToken(self):
        """Reading an encrypted option with the wrong CryptKeeper raises InvalidToken."""
        scfg = SecureConfigParser(ck=self.ck_wrong)
        scfg.read(TEST_INI_OUTFILE)
        # Pass the callable and its arguments separately so assertRaises
        # performs the call itself; calling scfg.get(...) inline would raise
        # (or return) before assertRaises ever ran.
        self.assertRaises(
            InvalidToken, scfg.get, testd['section'], testd['enc']['key'])
项目:pgrepup    作者:rtshome    | 项目源码 | 文件源码
def decrypt(password):
    """Return ``password`` decrypted, or unchanged when encryption is off.

    Encryption counts as enabled only when the ``Security`` /
    ``encrypted_credentials`` config value equals ``'y'``. If the token
    cannot be decrypted, a message is printed and the process exits with
    status -1.
    """
    if config().get('Security', 'encrypted_credentials') != 'y':
        return password

    try:
        return Fernet(_get_key()).decrypt(password)
    except InvalidToken:
        print("Invalid master password")
        sys.exit(-1)
项目:apex-sigma-core    作者:lu-ci    | 项目源码 | 文件源码
async def dokidoki(cmd, message, args):
    """
    Send a Markov-generated quote for a character, built from an encrypted file.

    Declared ``async`` because the body awaits ``message.channel.send``; a
    plain ``def`` containing ``await`` does not compile.

    :param cmd: command object; the key is read from
        ``cmd.bot.cfg.pref.raw['key_to_my_heart']``
    :param message: triggering message; the reply goes to ``message.channel``
    :param args: optional arguments; the first letter of the first one picks
        the character when it is a key of ``files``, and a last argument
        starting with ``':g'`` forces glitch mode
    """
    char = None
    glitch = False
    if args:
        if args[0][0].lower() in files:
            char = args[0][0].lower()
        if args[-1].startswith(':g'):
            glitch = True
    if not char:
        char = secrets.choice(list(files))
    char_file = files[char]
    with open(f'doki/{char_file}.luci', 'rb') as quote_file:
        quotes = quote_file.read()
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    if key:
        key = key.encode('utf-8')
        cipher = Fernet(key)
        try:
            ciphered = cipher.decrypt(quotes).decode('utf-8')
        except InvalidToken:
            ciphered = None
        if ciphered:
            if not glitch:
                # one-in-six chance of glitching when it was not requested
                glitch = secrets.randbelow(6) == 0
            if glitch:
                line_count = 1
                thumbnail = chars_glitch[char]
            else:
                line_count = 3
                thumbnail = secrets.choice(chars[char])
            # Build the Markov model once; it does not change between lines.
            model = markovify.Text(ciphered)
            lines = []
            for _ in range(line_count):
                output = model.make_short_sentence(500, tries=100)
                output = clean(output, message.author)
                if glitch:
                    # glitched output is shown as its encrypted token
                    output = cipher.encrypt(output.encode('utf-8')).decode('utf-8')
                lines.append(output)
            output_final = ' '.join(lines)
            if glitch:
                title = titles_glitch[char]
            else:
                title = titles[char]
            response = discord.Embed(color=0xe75a70)
            response.add_field(name=f'?? {title}', value=output_final)
            response.set_thumbnail(url=thumbnail)
        else:
            response = discord.Embed(color=0xe75a70, title='?? Sorry but that key is incorrect!')
    else:
        response = discord.Embed(color=0xe75a70, title='?? You are missing the key to my heart!')
    await message.channel.send(embed=response)
项目:apex-sigma-plugins    作者:lu-ci    | 项目源码 | 文件源码
async def dokidoki(cmd, message, args):
    """
    Send a Markov-generated quote for a character, built from an encrypted file.

    Declared ``async`` because the body awaits ``message.channel.send``; a
    plain ``def`` containing ``await`` does not compile.

    :param cmd: command object; the key is read from
        ``cmd.bot.cfg.pref.raw['key_to_my_heart']``
    :param message: triggering message; the reply goes to ``message.channel``
    :param args: optional arguments; the first letter of the first one picks
        the character when it is one of ``m``/``n``/``y``/``s``, and a last
        argument starting with ``':g'`` forces glitch mode
    """
    char_letters = ['m', 'n', 'y', 's']
    char = None
    glitch = False
    if args:
        if args[0][0].lower() in char_letters:
            char = args[0][0].lower()
        if args[-1].startswith(':g'):
            glitch = True
    if not char:
        char = secrets.choice(char_letters)
    char_file = files[char]
    with open(f'doki/{char_file}.luci', 'rb') as quote_file:
        quotes = quote_file.read()
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    if key:
        key = key.encode('utf-8')
        cipher = Fernet(key)
        try:
            ciphered = cipher.decrypt(quotes).decode('utf-8')
        except InvalidToken:
            ciphered = None
        if ciphered:
            if not glitch:
                # one-in-six chance of glitching when it was not requested
                glitch = secrets.randbelow(6) == 0
            if glitch:
                line_count = 1
                thumbnail = chars_glitch[char]
            else:
                line_count = 3
                thumbnail = secrets.choice(chars[char])
            # Build the Markov model once; it does not change between lines.
            model = markovify.Text(ciphered)
            lines = []
            for _ in range(line_count):
                output = model.make_short_sentence(500, tries=100)
                output = clean(output, message.author)
                if glitch:
                    # glitched output is shown as its encrypted token
                    output = cipher.encrypt(output.encode('utf-8')).decode('utf-8')
                lines.append(output)
            output_final = ' '.join(lines)
            if glitch:
                title = titles_glitch[char]
            else:
                title = titles[char]
            response = discord.Embed(color=0xe75a70)
            response.add_field(name=f'?? {title}', value=output_final)
            response.set_thumbnail(url=thumbnail)
        else:
            response = discord.Embed(color=0xe75a70, title='?? Sorry but that key is incorrect!')
    else:
        response = discord.Embed(color=0xe75a70, title='?? You are missing the key to my heart!')
    await message.channel.send(embed=response)
项目:katprep    作者:stdevel    | 项目源码 | 文件源码
def __manage_credentials(self, hostname, username, password,
        remove_entry=False):
        """
        This functions adds or removes credentials to/from the authentication
        container.
        Adding credentials requires a hostname, username and corresponding
        password. Removing credentials only requires a hostname.

        There are two alias functions for credentials management:
        add_credentials() and remove_credentials()

        When ``self.KEY`` is set, the password is stored as ``"s/"`` followed
        by the Fernet token text; otherwise it is stored as given. Removing a
        hostname that has no entry is silently ignored.

        :param hostname: hostname
        :type hostname: str
        :param username: username
        :type username: str
        :param password: corresponding password
        :type password: str
        :param remove_entry: setting True will remove an entry
        :type remove_entry: bool
        :raises ContainerException: if the Fernet layer reports an invalid token
        """
        hostname = self.cut_hostname(hostname)

        try:
            if remove_entry:
                #remove entry
                del self.CREDENTIALS[hostname]
            else:
                #encrypted or plain password
                if self.KEY:
                    crypto = Fernet(self.KEY)
                    token = crypto.encrypt(password.encode())
                    # Decode before formatting: on Python 3 formatting the
                    # bytes token directly would store "s/b'...'", which
                    # cannot be decrypted again after the "s/" prefix is cut.
                    stored_password = "s/{0}".format(token.decode())
                else:
                    stored_password = password
                #add entry only once both values are ready, so a failing
                #encryption does not leave a half-built entry behind
                self.CREDENTIALS[hostname] = {
                    "username": username,
                    "password": stored_password,
                }
        except InvalidToken:
            raise ContainerException("Invalid password specified!")
        except KeyError:
            pass

    #aliases
项目:maas    作者:maas    | 项目源码 | 文件源码
def read_beacon_payload(beacon_bytes):
    """Parse raw beacon bytes into a BeaconPayload namedtuple.

    The fixed-size header is unpacked first. A non-empty inner payload is
    then decrypted, decompressed and BSON-decoded, in that order.

    :param beacon_bytes: beacon payload (bytes).
    :return: BeaconPayload namedtuple
    :raises InvalidBeaconingPacket: if the packet is shorter than the header,
        shorter than its declared payload, has an unknown version, or its
        inner payload cannot be decrypted, decompressed or decoded.
    """
    header_length = BEACON_HEADER_LENGTH_V1
    if len(beacon_bytes) < header_length:
        raise InvalidBeaconingPacket(
            "Beaconing packet must be at least %d bytes." % (
                BEACON_HEADER_LENGTH_V1))
    version, beacon_type_code, expected_payload_length = struct.unpack(
        BEACON_HEADER_FORMAT_V1, beacon_bytes[:header_length])
    actual_payload_length = len(beacon_bytes) - header_length
    if actual_payload_length < expected_payload_length:
        raise InvalidBeaconingPacket(
            "Invalid payload length: expected %d bytes, got %d bytes." % (
                expected_payload_length, actual_payload_length))
    # Only the declared payload length is consumed; trailing bytes are ignored.
    payload_bytes = beacon_bytes[
        header_length:header_length + expected_payload_length]
    if version != 1:
        raise InvalidBeaconingPacket(
            "Unknown beacon version: %d" % version)
    payload = None
    if payload_bytes:
        # An empty payload means there is no encrypted inner data to process.
        try:
            decrypted_data = fernet_decrypt_psk(
                payload_bytes, ttl=60, raw=True)
        except InvalidToken:
            raise InvalidBeaconingPacket(
                "Failed to decrypt inner payload: check MAAS secret key.")
        try:
            decompressed_data = decompress(decrypted_data)
        except OSError:
            raise InvalidBeaconingPacket(
                "Failed to decompress inner payload: %r" % decrypted_data)
        try:
            payload = BSON.decode(decompressed_data)
        except BSONError:
            raise InvalidBeaconingPacket(
                "Inner beacon payload is not BSON: %r" % decompressed_data)
    if payload:
        # The type recorded inside the payload overrides the header's code.
        beacon_type_code = payload["type"]
    return BeaconPayload(
        beacon_bytes, version, BEACON_TYPE_VALUES[beacon_type_code], payload)