Python cryptography.fernet 模块,Fernet() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用cryptography.fernet.Fernet()

项目:doctr    作者:drdoctr    | 项目源码 | 文件源码
def decrypt_file(file, key):
    """
    Decrypts the file ``file``.

    The encrypted file is assumed to end with the ``.enc`` extension. The
    decrypted file is saved to the same location without the ``.enc``
    extension.

    The decrypted file is created with permissions 0o600, and its mode is
    set to 0o600 again afterwards so that a pre-existing file with wider
    permissions is tightened as well.

    See also :func:`doctr.local.encrypt_file`.

    :param file: path of the encrypted file; must end with ``.enc``
    :param key: key handed to ``Fernet`` to decrypt the file contents
    :raises ValueError: if ``file`` does not end with ``.enc``
    """
    if not file.endswith('.enc'):
        raise ValueError("%s does not end with .enc" % file)

    fer = Fernet(key)

    with open(file, 'rb') as f:
        decrypted_file = fer.decrypt(f.read())

    target = file[:-4]
    # Create the output with restrictive permissions from the start. Writing
    # it with the default umask and tightening it afterwards would briefly
    # leave the plaintext readable by other users.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
    fd = os.open(target, flags, 0o600)
    with os.fdopen(fd, 'wb') as f:
        f.write(decrypted_file)

    # os.open() only applies the mode when it creates the file, so an already
    # existing target still needs an explicit chmod.
    os.chmod(target, 0o600)
项目:combine    作者:llllllllll    | 项目源码 | 文件源码
def __init__(self,
             bot_user,
             osu_client,
             model_cache_dir,
             model_cache_size,
             token_secret,
             upload_url):
    """Store the collaborators passed in and build the per-instance caches.

    ``model_cache_dir`` is wrapped in a ``pathlib.Path``, ``token_secret`` is
    turned into a ``Fernet`` instance, and ``self._get_model`` is wrapped in
    an ``lru_cache`` of size ``model_cache_size`` and exposed as
    ``self.get_model``.
    """
    super().__init__({bot_user})

    # Plain references first.
    self.bot_user = bot_user
    self.osu_client = osu_client
    self.upload_url = upload_url

    # Values that are converted before being stored.
    self.model_cache_dir = pathlib.Path(model_cache_dir)
    self.token_secret = Fernet(token_secret)

    # Caches and iterators owned by this instance.
    memoize = lru_cache(model_cache_size)
    self.get_model = memoize(self._get_model)
    self._user_stats = ExpiringCache()

    candidate_source = self._gen_candidates()
    self._candidates = LockedIterator(candidate_source)
项目:MusicBot    作者:BjoernPetersen    | 项目源码 | 文件源码
def save_secrets():
    """
    Read the configured secrets from the environment and store them encrypted.

    Every name in the module-level ``secrets_keys`` is looked up in
    ``os.environ``. If at least one is missing, a critical message is logged
    and nothing is written. Otherwise the collected values are serialised as
    JSON, encrypted with ``Fernet`` using the module-level ``key`` and written
    to ``config/secrets.dat``.

    :return: None
    """
    logger = logging.getLogger(__name__)
    logger.info("Reading secrets from environment variables")
    secrets = {}

    missing_key = False
    for secrets_key in secrets_keys:
        logger.info("Writing key: %s", secrets_key)
        try:
            secrets[secrets_key] = os.environ[secrets_key]
        except KeyError:
            # The key must be a %-argument of a message that has a
            # placeholder; without one the record cannot be formatted and the
            # warning is lost.
            logger.warning("Missing key: %s", secrets_key)
            missing_key = True

    if missing_key:
        logger.critical("Aborting...")
        return

    logger.info("Writing secrets to config/secrets.dat")
    f = Fernet(key)
    with open("config/secrets.dat", 'wb') as secrets_file:
        secrets_file.write(f.encrypt(json.dumps(secrets).encode()))
项目:bitmask-dev    作者:leapcode    | 项目源码 | 文件源码
def _encode_uuid_map(userid, uuid, passwd):
    """
    Build the encrypted, url-safe base64 encoded ``userid``/``uuid`` record.

    The plaintext is ``'userid:<userid>:uuid:<uuid>'``. When ``IS_WIN`` is
    set it is encrypted with Fernet, using a key derived from ``passwd`` and
    the host name; otherwise ``scrypt.encrypt`` is used with ``passwd``.
    """
    plaintext = 'userid:%s:uuid:%s' % (userid, uuid)

    if not IS_WIN:
        ciphertext = scrypt.encrypt(plaintext, passwd, maxtime=0.05)
        return base64.urlsafe_b64encode(ciphertext)

    # FIXME scrypt.encrypt is broken in windows.
    # This is a quick hack. The hostname might not be unique enough though.
    # We could use a long random hash per entry and store it in the file.
    # Other option is to use a different KDF that is supported by cryptography
    # (ie, pbkdf)
    hashed = scrypt.hash(passwd, socket.gethostname())
    fernet_key = base64.urlsafe_b64encode(hashed[:32])
    cipher = Fernet(fernet_key, backend=crypto_backend)
    ciphertext = cipher.encrypt(plaintext)
    return base64.urlsafe_b64encode(ciphertext)
项目:bitmask-dev    作者:leapcode    | 项目源码 | 文件源码
def _decode_uuid_line(line, passwd):
    """
    Decode one stored line and extract the ``(userid, uuid)`` pair from it.

    ``line`` is url-safe base64 decoded first. When ``IS_WIN`` is set the
    decoded payload is decrypted with Fernet, using a key derived from
    ``passwd`` and the host name; otherwise it is decrypted with
    ``scrypt.decrypt``.

    :return: the first ``(userid, uuid)`` match found in the plaintext, or
        None if decryption fails or the plaintext does not match.
    """
    decoded = base64.urlsafe_b64decode(line)
    if IS_WIN:
        key = scrypt.hash(passwd, socket.gethostname())
        key = base64.urlsafe_b64encode(key[:32])
        try:
            f = Fernet(key, backend=crypto_backend)
            # Decrypt the decoded payload. Passing the key itself here made
            # every line fail to decrypt on this branch.
            maybe_decrypted = f.decrypt(decoded)
        except Exception:
            return None
    else:
        try:
            maybe_decrypted = scrypt.decrypt(decoded, passwd, maxtime=0.1)
        except scrypt.error:
            return None
    # Raw string: "\:" is not a valid escape in a normal string literal.
    match = re.findall(r"userid\:(.+)\:uuid\:(.+)", maybe_decrypted)
    if match:
        return match[0]
    return None
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def get_fernet():
    """
    Deferred load of Fernet key.

    This function could fail either because Cryptography is not installed
    or because the Fernet key is invalid.

    :return: Fernet object
    :raises: AirflowException if there's a problem trying to load Fernet
    """
    try:
        from cryptography.fernet import Fernet
    except ImportError:
        # Only a failed import is translated; a bare ``except`` would also
        # swallow KeyboardInterrupt and SystemExit.
        raise AirflowException('Failed to import Fernet, it may not be installed')
    try:
        return Fernet(configuration.get('core', 'FERNET_KEY').encode('utf-8'))
    except ValueError as ve:
        raise AirflowException("Could not create Fernet object: {}".format(ve))
项目:hide.py    作者:nukeop    | 项目源码 | 文件源码
def decrypt(data, password):
    """Decrypt ``data`` with a Fernet key derived from ``password``.

    The password is converted with ``bytes()`` and is used both as the PBKDF2
    salt and as the key material. Nothing is caught here: whatever
    ``Fernet.decrypt`` raises for a bad token or key propagates to the caller.
    """
    secret = bytes(password)

    # The salt is the password itself, so that the password alone is enough
    # to reverse the encryption.
    # NOTE(review): ``hashes.AES`` does not look like a hash algorithm name;
    # confirm which algorithm the matching encrypt side uses.
    derivation = PBKDF2HMAC(algorithm=hashes.AES(),
                            length=32,
                            salt=bytes(secret),
                            iterations=100000,
                            backend=default_backend())

    fernet_key = base64.urlsafe_b64encode(derivation.derive(secret))
    return Fernet(fernet_key).decrypt(data)
项目:cmdb    作者:jonmsawyer    | 项目源码 | 文件源码
def encrypt(self, save_content=False):
    """
    Return the Fernet-encrypted form of ``self.config['content']``.

    Content that is already flagged as encrypted is returned unchanged.
    Binary content is encrypted as-is; text content is UTF-8 encoded before
    encryption and the token is decoded back to ``str``.

    :param save_content: when equal to True, store the encrypted content, its
        length and ``is_encrypted=True`` back into ``self.config``.
    :raises Exception: if the key or the content is missing, if the object is
        not encryptable, or if ``is_binary`` is neither True nor False.
    """
    if self.encryption_key is None:
        raise Exception('Cannot encrypt content, missing encryption key.')
    settings = self.config
    if settings.get('content') is None:
        raise Exception('Cannot encrypt content, content is empty.')
    if self.is_encryptable == False:  # noqa: E712 - equality kept on purpose
        raise Exception('Cannot encrypt, improper configuration.')
    if settings.get('is_encrypted') == True:  # noqa: E712
        return settings.get('content')

    cipher = Fernet(self.encryption_key)
    binary_flag = settings.get('is_binary')
    if binary_flag == True:  # noqa: E712
        token = cipher.encrypt(settings.get('content'))
    elif binary_flag == False:  # noqa: E712
        raw = settings.get('content').encode('utf-8')
        token = cipher.encrypt(raw).decode('utf-8')
    else:
        # e.g. None: the content type was never determined.
        raise Exception('Could not tell if file is binary or text. Aborting.')

    if save_content == True:  # noqa: E712
        settings['content'] = token
        settings['content_length'] = len(token)
        settings['is_encrypted'] = True
    return token
项目:cmdb    作者:jonmsawyer    | 项目源码 | 文件源码
def decrypt(self, save_content=False):
    """
    Return the decrypted form of ``self.config['content']``.

    Content that is flagged as not encrypted is returned unchanged. Binary
    content is decrypted as-is; text content is UTF-8 encoded before
    decryption and the plaintext is decoded back to ``str``.

    :param save_content: when equal to True, store the decrypted content, its
        length and ``is_encrypted=False`` back into ``self.config``.
    :raises LocalConfigFileError: if the key or the content is missing, if the
        object is not encryptable, or if ``is_binary`` is neither True nor
        False.
    """
    if self.encryption_key is None:
        raise LocalConfigFileError('Cannot decrypt content, missing encryption key.')
    settings = self.config
    if settings.get('content') is None:
        raise LocalConfigFileError('Cannot decrypt content, content is empty.')
    if self.is_encryptable == False:  # noqa: E712 - equality kept on purpose
        raise LocalConfigFileError('Cannot decrypt, improper configuration.')
    if settings.get('is_encrypted') == False:  # noqa: E712
        return settings.get('content')

    cipher = Fernet(self.encryption_key)
    binary_flag = settings.get('is_binary')
    if binary_flag == True:  # noqa: E712
        plain = cipher.decrypt(settings.get('content'))
    elif binary_flag == False:  # noqa: E712
        token = settings.get('content').encode('utf-8')
        plain = cipher.decrypt(token).decode('utf-8')
    else:
        # e.g. None: the content type was never determined.
        raise LocalConfigFileError('Could not tell if file is binary or text. Aborting.')

    if save_content == True:  # noqa: E712
        settings['content'] = plain
        settings['content_length'] = len(plain)
        settings['is_encrypted'] = False
    return plain
项目:netcrawl    作者:Wyko    | 项目源码 | 文件源码
def test_get_Fernet_returns_a_key():
    """``manage._get_fernet_key`` returns a Fernet and stores a keyring entry.

    The app name and user name are built from random Faker words, and the
    keyring entry is deleted again at the end.
    """
    fake = Faker()

    def _unique(prefix):
        # Prefix plus two random words, joined by underscores.
        return '_'.join([prefix, fake.word(), fake.word()])

    appname = _unique('test_a_netcrawl_')
    username = _unique('test_u_netcrawl_')

    key = manage._get_fernet_key(appname, username)
    assert isinstance(key, fernet.Fernet), 'Key [{}] is wrong type'.format(
        type(key))

    # The call must have stored a password in the keyring.
    stored = keyring.get_password(appname, username)
    assert stored is not None

    # Clean up and make sure the entry is really gone.
    keyring.delete_password(appname, username)
    remaining = keyring.get_password(appname, username)
    assert remaining is None


#===============================================================================
# def test_check_credentials_no_error():
#     config.cc.check_credentials()
#===============================================================================
项目:aumbry    作者:pyarmory    | 项目源码 | 文件源码
def setup_up_config(func):
    """
    Decorator that hands ``func`` a temporary copy of the configuration file.

    The wrapper reads ``arguments.path`` if it exists, decrypts the data with
    Fernet when both data and ``arguments.fernet_key`` are present, and
    writes the result to a named temporary file with the same extension.
    ``arguments.origin_path`` is set to the original path and
    ``arguments.path`` to the temporary file while ``func`` runs.
    """
    def wrapper(arguments):
        extension = os.path.splitext(arguments.path)[1]

        payload = b''
        if os.path.exists(arguments.path):
            with open(arguments.path, 'rb') as source:
                payload = source.read()

        if payload and arguments.fernet_key:
            cipher = Fernet(arguments.fernet_key.encode('utf-8'))
            payload = cipher.decrypt(payload)

        with tempfile.NamedTemporaryFile(suffix=extension) as scratch:
            scratch.write(payload)
            # Make the data visible to readers that open the file by name.
            scratch.file.flush()

            arguments.origin_path = arguments.path
            arguments.path = scratch.name

            return func(arguments)

    return wrapper
项目:aumbry    作者:pyarmory    | 项目源码 | 文件源码
def can_save_an_encrypted_file(self):
    """Saving with a key writes a Fernet token that decrypts to the sample.

    ``save_temporary_config`` is asked to encrypt ``./spec/cli/sample.yml``
    into a temporary file with ``test_key``; the written bytes are then
    decrypted with the same key and parsed as YAML.
    """
    with tempfile.NamedTemporaryFile() as tp:
        save_temporary_config(
            './spec/cli/sample.yml',
            tp.name,
            test_key
        )

        with open(tp.name, 'rb') as fp:
            ct = fp.read()

    f = Fernet(test_key.encode('utf-8'))
    # safe_load: ``yaml.load`` without an explicit Loader is unsafe and is
    # rejected by current PyYAML releases.
    data = yaml.safe_load(f.decrypt(ct))

    expect(data['thing']).to.equal('bam')
    expect(data['other']).to.equal('thing')
项目:katprep    作者:stdevel    | 项目源码 | 文件源码
def get_credential(self, hostname):
    """
    Return the ``(username, password)`` tuple stored for a hostname.

    The hostname is first normalised with ``self.cut_hostname``. When
    ``self.KEY`` is set, the stored password (minus its first two
    characters) is decrypted with Fernet; otherwise it is returned as stored.
    A missing entry yields None.

    :param hostname: hostname
    :type hostname: str
    :raises ContainerException: if the stored password cannot be decrypted
    """
    hostname = self.cut_hostname(hostname)
    try:
        if not self.KEY:
            # No key configured: return the plain information.
            entry = self.CREDENTIALS[hostname]
            return (entry["username"], entry["password"])

        crypto = Fernet(self.KEY)
        entry = self.CREDENTIALS[hostname]
        username = entry["username"]
        # NOTE(review): the first two characters look like a marker prefix on
        # encrypted passwords - confirm against the code that stores them.
        token = entry["password"][2:].encode()
        return (username, crypto.decrypt(token))
    except InvalidToken:
        raise ContainerException("Invalid password specified!")
    except KeyError:
        pass
项目:routewatch    作者:nerdalize    | 项目源码 | 文件源码
def encrypt(data, secret):
    """
    Encrypt ``data`` with a Fernet key derived from ``secret`` and a new salt.

    :param data: payload to encrypt
    :type data: bytes
    :param secret: secret the key is derived from
    :type secret: bytes
    :return: base64 encoding of the 16-byte salt followed by the Fernet token
    """
    # Fresh 16-byte salt for every call.
    salt = Random.new().read(16)

    # Derive the Fernet key from the secret and the salt with PBKDF2-HMAC.
    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=100000,
        backend=default_backend(),
    )
    fernet_key = urlsafe_b64encode(derivation.derive(secret))

    # Encrypt and prepend the salt so that decrypt() can rebuild the key.
    token = Fernet(fernet_key).encrypt(data)
    return b64encode(salt + token)
项目:routewatch    作者:nerdalize    | 项目源码 | 文件源码
def decrypt(encodedciphertext, secret):
    """
    Decrypt a value laid out as base64(16-byte salt + Fernet token).

    :param encodedciphertext: base64 encoded salt and token
    :type encodedciphertext: bytes
    :param secret: secret the key is derived from
    :type secret: bytes
    :return: the decrypted data
    """
    # The first 16 bytes are the salt, the rest is the Fernet token.
    blob = b64decode(encodedciphertext)
    salt, token = blob[:16], blob[16:]

    # Rebuild the key from the secret and the recovered salt.
    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=100000,
        backend=default_backend(),
    )
    fernet_key = urlsafe_b64encode(derivation.derive(secret))

    return Fernet(fernet_key).decrypt(token)
项目:maas    作者:maas    | 项目源码 | 文件源码
def fernet_encrypt_psk(message, raw=False):
    """Encrypt ``message`` with the context from ``_get_fernet_context()``.

    A Fernet token includes the current time, and users decrypting it can
    specify a TTL (in seconds), so the system clock must be correct before
    calling this function.

    :param message: The message to encrypt.
    :type message: 'bytes', or a 'str' that is encoded as UTF-8 first.
    :param raw: if True, the token is base64-decoded before it is returned.
        Such bytes must be converted back to base64 to be decrypted (or the
        'raw' argument of fernet_decrypt_psk() can be used).
    :return: the token as a byte string; base64-encoded unless ``raw`` is
        True.
    """
    context = _get_fernet_context()
    payload = message.encode("utf-8") if isinstance(message, str) else message
    encrypted = context.encrypt(payload)
    if raw is True:
        return urlsafe_b64decode(encrypted)
    return encrypted
项目:maas    作者:maas    | 项目源码 | 文件源码
def fernet_decrypt_psk(token, ttl=None, raw=False):
    """Decrypt ``token`` with the context from ``_get_fernet_context()``.

    The decrypted message is returned as a byte string; the caller is
    responsible for converting it to the correct format or encoding.

    :param token: The token to decrypt.
    :type token: 'bytes', or an ASCII base64 'str'.
    :param ttl: Optional amount of time (in seconds) allowed to have elapsed
        before the message is rejected upon decryption; passed through to
        ``decrypt()``.
    :param raw: if True, ``token`` is treated as the decoded base64 bytes of
        a Fernet token and is base64-encoded before decrypting.
    :return: bytes
    """
    # Re-encoding happens before anything else, exactly as for a raw token
    # produced by fernet_encrypt_psk(raw=True).
    encoded = urlsafe_b64encode(token) if raw is True else token
    context = _get_fernet_context()
    if isinstance(encoded, str):
        encoded = encoded.encode("ascii")
    return context.decrypt(encoded, ttl=ttl)
项目:python-confidant-client    作者:lyft    | 项目源码 | 文件源码
def _get_decrypted_pairs(self, credential):
    """
    Decrypt the credential pairs of ``credential`` for the configured region.

    The region's data key is base64-decoded and decrypted through
    ``cryptolib.decrypt_datakey`` with the region's encryption context and a
    KMS client. The resulting key is used as a Fernet key to decrypt the
    region's credential pairs, which are parsed as JSON.
    """
    region = self.config['region']
    encryption_context = credential['metadata']['context'][region]

    # Use a dedicated client when explicit AWS credentials were supplied,
    # otherwise fall back to the client held by this instance.
    if not self.aws_creds:
        kms = self.kms_client
    else:
        kms = confidant_client.services.get_boto_client(
            'kms',
            region=self.config['region'],
            aws_access_key_id=self.aws_creds['AccessKeyId'],
            aws_secret_access_key=self.aws_creds['SecretAccessKey'],
            aws_session_token=self.aws_creds['SessionToken']
        )

    wrapped_key = base64.b64decode(ensure_bytes(credential['data_key'][region]))
    data_key = cryptolib.decrypt_datakey(wrapped_key, encryption_context, kms)

    encrypted_pairs = credential['credential_pairs'][region]
    plaintext = Fernet(data_key).decrypt(encrypted_pairs.encode('utf-8'))
    return json.loads(plaintext)
项目:Steganography    作者:Ludisposed    | 项目源码 | 文件源码
def encrypt_text(password, token):
    """Encrypt ``bytes(token)`` with a Fernet built from ``get_key(password)``."""
    cipher = Fernet(get_key(password))
    payload = bytes(token)
    return cipher.encrypt(payload)
项目:Steganography    作者:Ludisposed    | 项目源码 | 文件源码
def decrypt_text(password, token):
    """Decrypt ``bytes(token)`` with a Fernet built from ``get_key(password)``."""
    cipher = Fernet(get_key(password))
    payload = bytes(token)
    return cipher.decrypt(payload)
项目:django-encrypted-filefield    作者:danielquinn    | 项目源码 | 文件源码
def _get_fernet(cls):
    """Return a Fernet whose key is derived from ``PASSWORD`` and ``SALT``.

    The key is the url-safe base64 encoding of a 32-byte PBKDF2-HMAC-SHA256
    derivation with 100000 iterations.
    """
    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=SALT,
        iterations=100000,
        backend=default_backend()
    )
    derived = derivation.derive(PASSWORD)
    fernet_key = base64.urlsafe_b64encode(derived)
    return Fernet(fernet_key)
项目:combine    作者:llllllllll    | 项目源码 | 文件源码
def gen_token(obj, user):
    """Print a token generated for ``user`` from ``obj.token_secret``.
    """
    from cryptography.fernet import Fernet

    # Aliased so the helper is not confused with this function of the same
    # name.
    from .token import gen_token as make_token

    cipher = Fernet(obj.token_secret)
    print(make_token(cipher, user))
项目:MusicBot    作者:BjoernPetersen    | 项目源码 | 文件源码
def _load_secrets():
    """
    Load and decrypt the secrets file.

    :return: ``{}`` if the secrets file does not exist, the decoded JSON
        content if it decrypts with the module-level ``key``, or None after
        requesting a shutdown when the token is invalid.
    """
    secrets_path = _get_secrets_path()
    if not path.isfile(secrets_path):
        logging.getLogger(__name__).debug("No secrets file found")
        return {}

    cipher = Fernet(key)
    with open(secrets_path, 'rb') as secrets_file:
        token = secrets_file.read()

    try:
        plaintext = cipher.decrypt(token)
    except InvalidToken:
        logging.getLogger(__name__).critical("You entered the wrong password")
        async_handler.shutdown()
        return None
    return json.loads(plaintext.decode())
项目:MusicBot    作者:BjoernPetersen    | 项目源码 | 文件源码
def save_secrets():
    """
    Write the module-level ``_secrets`` to the secrets file, encrypted.

    The secrets are serialised as JSON and encrypted with a Fernet built from
    the module-level ``key``.
    """
    secrets_path = _get_secrets_path()
    cipher = Fernet(key)
    with open(secrets_path, 'wb') as secrets_file:
        serialised = json.dumps(_secrets).encode()
        secrets_file.write(cipher.encrypt(serialised))
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def __init__(self):
    """Derive a Fernet key from ``settings.SECRET_KEY`` and build the cipher.

    NOTE(review): the secret key is used both as the PBKDF2 salt and as the
    key material - confirm this is intended.
    """
    secret = settings.SECRET_KEY

    self._salt = secret
    self._kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=self._salt,
        iterations=100000,
        backend=default_backend()
    )
    derived = self._kdf.derive(secret)
    self._key = base64.urlsafe_b64encode(derived)
    self._fernet = Fernet(self._key)
项目:python-ares    作者:pynog    | 项目源码 | 文件源码
def encrypt(pwd, key):
    """Encrypt ``pwd`` with a Fernet key derived from ``key`` and a new salt.

    Returns a ``(token, salt)`` tuple; both elements are decoded as latin1
    strings. The salt is 32 random bytes from ``os.urandom``.
    """
    salt = os.urandom(32)
    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=100000,
        backend=default_backend(),
    )
    derived = derivation.derive(key.encode('utf-8'))
    cipher = Fernet(base64.urlsafe_b64encode(derived))
    token = cipher.encrypt(pwd.encode('utf-8'))
    return token.decode('latin1'), salt.decode('latin1')
项目:python-ares    作者:pynog    | 项目源码 | 文件源码
def decrypt(cipher_txt, key, salt):
    """Decrypt ``cipher_txt`` with a Fernet key derived from ``key``/``salt``.

    ``salt`` and ``str(cipher_txt)`` are encoded as latin1 before use; the
    plaintext is returned decoded as UTF-8.
    """
    salt_bytes = salt.encode('latin1')
    token = str(cipher_txt).encode('latin1')
    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt_bytes,
        iterations=100000,
        backend=default_backend(),
    )
    derived = derivation.derive(key.encode('utf-8'))
    cipher = Fernet(base64.urlsafe_b64encode(derived))
    return cipher.decrypt(token).decode('utf-8')
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def encrypt(text):
    """Encrypt ``text`` with the app's ``SECURE_TOKEN_KEY`` Fernet key."""
    secret = to_binary(current_app.config['SECURE_TOKEN_KEY'])
    cipher = Fernet(secret)
    return cipher.encrypt(to_binary(text))
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def decrypt(encrypted):
    """Decrypt ``encrypted`` with the app's ``SECURE_TOKEN_KEY`` Fernet key.

    The result is passed through ``to_text`` before it is returned.
    """
    secret = to_binary(current_app.config['SECURE_TOKEN_KEY'])
    cipher = Fernet(secret)
    return to_text(cipher.decrypt(to_binary(encrypted)))
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
def _fernet_from_password(password, salt):
    """Return a Fernet whose key is derived from ``password`` and ``salt``.

    The key is the url-safe base64 encoding of a 32-byte PBKDF2-HMAC-SHA256
    derivation with 100000 iterations.
    """
    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=100000,
        backend=default_backend()
    )
    derived = derivation.derive(password)
    return Fernet(base64.urlsafe_b64encode(derived))
项目:bilean    作者:openstack    | 项目源码 | 文件源码
def encrypt(msg):
    '''Encrypt message with a freshly generated Fernet key.

    :param msg: message to be encrypted
    :returns: a ``(fernet_key, token)`` pair, each passed through
        ``encodeutils.safe_decode``; the generated key comes first and the
        encrypted message second.
    '''
    fernet_key = fernet.Fernet.generate_key()
    cipher = fernet.Fernet(fernet_key)
    token = cipher.encrypt(encodeutils.safe_encode(msg))
    return (
        encodeutils.safe_decode(fernet_key),
        encodeutils.safe_decode(token),
    )
项目:bilean    作者:openstack    | 项目源码 | 文件源码
def decrypt(msg, key):
    '''Decrypt a token using the provided Fernet key.

    The parameter names are misleading: ``msg`` is used as the Fernet key
    and ``key`` as the token to decrypt.

    :param msg: Fernet key
    :param key: encrypted token
    :returns: decrypted message, passed through ``encodeutils.safe_decode``
    '''
    cipher = fernet.Fernet(encodeutils.safe_encode(msg))
    plaintext = cipher.decrypt(encodeutils.safe_encode(key))
    return encodeutils.safe_decode(plaintext)
项目:apex-sigma-core    作者:lu-ci    | 项目源码 | 文件源码
async def decrypt(cmd, message, args):
    """
    Decrypt a Fernet token given as command arguments and send the result.

    The key is read from the bot preferences entry ``key_to_my_heart``. All
    arguments are concatenated without a separator to form the token; a
    trailing ``:t`` argument requests a plain-text reply instead of an embed.
    Error replies are always sent as embeds.

    :param cmd: command object; only ``cmd.bot.cfg.pref.raw`` is read
    :param message: message whose channel receives the reply
    :param args: sequence of argument strings
    """
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    if not key:
        response = discord.Embed(color=0xBE1931, title='? You don\'t posses a key.')
    elif not args:
        response = discord.Embed(color=0xBE1931, title='? Nothing to decrypt.')
    else:
        text = args[-1] == ':t'
        token_parts = args[:-1] if text else args
        crypt_text = ''.join(token_parts).encode('utf-8')
        cipher = Fernet(key.encode('utf-8'))
        try:
            ciphered = cipher.decrypt(crypt_text).decode('utf-8')
        except (InvalidToken, InvalidSignature):
            ciphered = None
        if not ciphered:
            response = discord.Embed(color=0xBE1931, title='? The token or key are incorrect.')
        elif text:
            response = ciphered
        else:
            response = discord.Embed(color=0xe75a70)
            response.add_field(name='?? Token Decrypted', value=ciphered)
    # Only a successful plain-text result is a string. Error embeds must go
    # out as embeds even when ':t' was requested.
    if isinstance(response, str):
        await message.channel.send(response)
    else:
        await message.channel.send(embed=response)
项目:apex-sigma-core    作者:lu-ci    | 项目源码 | 文件源码
async def encrypt(cmd, message, args):
    """
    Encrypt the command arguments with Fernet and send the resulting token.

    The key is read from the bot preferences entry ``key_to_my_heart``. The
    arguments are joined with single spaces to form the plaintext; a trailing
    ``:t`` argument requests a plain-text reply instead of an embed. Error
    replies are always sent as embeds.

    :param cmd: command object; only ``cmd.bot.cfg.pref.raw`` is read
    :param message: message whose channel receives the reply
    :param args: sequence of argument strings
    """
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    if not key:
        response = discord.Embed(color=0xBE1931, title='? You don\'t posses a key.')
    elif not args:
        response = discord.Embed(color=0xBE1931, title='? Nothing to encrypt.')
    else:
        text = args[-1] == ':t'
        plain_parts = args[:-1] if text else args
        crypt_text = ' '.join(plain_parts).encode('utf-8')
        cipher = Fernet(key.encode('utf-8'))
        try:
            ciphered = cipher.encrypt(crypt_text).decode('utf-8')
        except (InvalidToken, InvalidSignature):
            ciphered = None
        if not ciphered:
            response = discord.Embed(color=0xBE1931, title='? The token or key are incorrect.')
        elif text:
            response = ciphered
        else:
            response = discord.Embed(color=0xe75a70)
            response.add_field(name='?? Text Encrypted', value=ciphered)
    # Only a successful plain-text result is a string. Error embeds must go
    # out as embeds even when ':t' was requested.
    if isinstance(response, str):
        await message.channel.send(response)
    else:
        await message.channel.send(embed=response)
项目:postgraas_server    作者:blue-yonder    | 项目源码 | 文件源码
def get_config(config_filename=get_default_config_filename(), secrets_file='/secrets'):
    """
    Load the configuration, decrypting it first when secrets are available.

    If ``_load_secrets(filename=secrets_file)`` returns something truthy, the
    configuration file is read as bytes, decrypted with Fernet using
    ``secrets['encryption_key']`` and parsed from the decrypted text.
    Otherwise the file is read directly. Environment variables are expanded
    with ``expand_env_vars`` before the parser is returned.

    :param config_filename: path of the configuration file
    :param secrets_file: path handed to ``_load_secrets``
    :return: the populated ``ConfigParser``
    """
    config = ConfigParser()
    logger.debug('config filename: {}'.format(config_filename))
    secrets = _load_secrets(filename=secrets_file)
    if secrets:
        from cryptography.fernet import Fernet
        f = Fernet(secrets['encryption_key'].encode())
        with open(config_filename, 'rb') as cfg:
            cfg_content = f.decrypt(cfg.read())
        # The decrypted content is deliberately not printed or logged: it is
        # the plaintext of a file that was encrypted to keep it secret.
        config.read_string(cfg_content.decode("utf-8"))
    else:
        config.read(config_filename)
    expand_env_vars(config)
    return config
项目:kinto-portier    作者:Kinto    | 项目源码 | 文件源码
def encrypt(message, key):
    """Encrypt ``message`` with a hex-encoded key; return the token as str.

    ``key`` is hex-decoded and url-safe base64 encoded to form the Fernet
    key. ``message`` is UTF-8 encoded before encryption.
    """
    raw_key = codecs.decode(key, 'hex')
    cipher = Fernet(base64.urlsafe_b64encode(raw_key))
    token = cipher.encrypt(message.encode('utf-8'))
    return token.decode('utf-8')
项目:kinto-portier    作者:Kinto    | 项目源码 | 文件源码
def decrypt(encrypted_message, key):
    """Decrypt ``encrypted_message`` with a hex-encoded key; return str.

    ``key`` is hex-decoded and url-safe base64 encoded to form the Fernet
    key. The token is UTF-8 encoded before decryption and the plaintext is
    decoded as UTF-8.
    """
    raw_key = codecs.decode(key, 'hex')
    cipher = Fernet(base64.urlsafe_b64encode(raw_key))
    plaintext = cipher.decrypt(encrypted_message.encode('utf-8'))
    return plaintext.decode('utf-8')
项目:cmdb    作者:jonmsawyer    | 项目源码 | 文件源码
def __init__(self, file_path, config_dict=None, encryption_key=None, binary_mode=False, *args, **kwargs):
    """
    Initialise the bookkeeping for one local configuration file.

    :param file_path: path of the file; stored in ``self.config`` and wrapped
        in a ``Path``.
    :param config_dict: if it is a dict, it is passed through
        ``check_config`` and kept as ``self.config_dict``.
    :param encryption_key: optional ``str`` key. It is ASCII-encoded,
        base64-encoded and tried out with ``Fernet``; on success the object
        is marked as encryptable.
    :param binary_mode: stored as ``bool(binary_mode)``.
    :raises LocalConfigFileError: if ``encryption_key`` is not a ``str`` or
        is shorter than 32 characters.
    """
    self.config = dict(
        file_path=file_path,
        is_binary=None,
        is_encrypted=False,
        is_case_sensitive=None,
        is_disabled=None,
        mtime=-1,
        sha1_checksum=None,
        content=None,
        content_length=None,
    )

    # Defaults; some of them are overwritten further down.
    self.is_resolved = False
    self.is_file_not_found = None
    self.path = Path(file_path)
    self.stat = None
    self.encryption_key = None
    self.is_encryptable = False
    self.binary_mode = False
    self.config_file_type = None
    self.config_dict = None

    if isinstance(config_dict, dict):
        self.config_dict = check_config(config_dict)

    # NOTE(review): the bare except also swallows KeyboardInterrupt.
    try:
        self.stat = self.path.resolve().stat()
    except:
        self.stat = None
    self.is_resolved = False
    self.binary_mode = bool(binary_mode)

    if encryption_key is None:
        return

    key_len = 32
    # Only the first ``key_len`` characters are measured, so this rejects
    # keys that are too short but not keys that are too long.
    if not isinstance(encryption_key, str) or len(encryption_key[:key_len]) != key_len:
        raise LocalConfigFileError('Encryption key must be a url-safe 32 character ascii string.')
    self.encryption_key = base64.b64encode(encryption_key.encode('ascii'))
    Fernet(self.encryption_key)  # try it out, if no exception, we're good
    self.is_encryptable = True
项目:asgi_redis    作者:django    | 项目源码 | 文件源码
def make_fernet(self, key):
    """
    Given a single encryption key, returns a Fernet instance using it.

    Text keys are encoded as UTF-8 first. The Fernet key is the url-safe
    base64 encoding of the key's SHA-256 digest.
    """
    from cryptography.fernet import Fernet

    key_bytes = key.encode("utf8") if isinstance(key, six.text_type) else key
    digest = hashlib.sha256(key_bytes).digest()
    return Fernet(base64.urlsafe_b64encode(digest))
项目:aumbry    作者:pyarmory    | 项目源码 | 文件源码
def save_temporary_config(temp_file_path, output_path, fernet_key=None):
    """
    Copy ``temp_file_path`` to ``output_path``, optionally encrypting it.

    When ``fernet_key`` is truthy it is UTF-8 encoded and used as a Fernet
    key to encrypt the data before it is written.
    """
    with open(temp_file_path, 'rb') as source:
        payload = source.read()

    if fernet_key:
        cipher = Fernet(fernet_key.encode('utf-8'))
        payload = cipher.encrypt(payload)

    with open(output_path, 'wb') as destination:
        destination.write(payload)
项目:raftos    作者:zhebrak    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
    """Initialise the parent and build ``self.f``, the Fernet cipher.

    The key is derived from ``self.config.secret_key`` and
    ``self.config.salt`` with PBKDF2-HMAC-SHA256 (32 bytes, 100000
    iterations).
    """
    super().__init__(*args, **kwargs)

    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=self.config.salt,
        iterations=100000,
        backend=default_backend()
    )
    derived = derivation.derive(self.config.secret_key)
    self.f = Fernet(base64.urlsafe_b64encode(derived))
项目:ghost    作者:nir0s    | 项目源码 | 文件源码
def cipher(self):
    """Return a new ``Fernet`` instance built from ``self.key``."""
    fernet = Fernet(self.key)
    return fernet
项目:apex-sigma-plugins    作者:lu-ci    | 项目源码 | 文件源码
async def decrypt(cmd, message, args):
    """
    Decrypt a Fernet token given as command arguments and send the result.

    The key is read from the bot preferences entry ``key_to_my_heart``. All
    arguments are concatenated without a separator to form the token; a
    trailing ``:t`` argument requests a plain-text reply instead of an embed.
    Error replies are always sent as embeds.

    :param cmd: command object; only ``cmd.bot.cfg.pref.raw`` is read
    :param message: message whose channel receives the reply
    :param args: sequence of argument strings
    """
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    if not key:
        response = discord.Embed(color=0xBE1931, title='? You don\'t posses a key.')
    elif not args:
        response = discord.Embed(color=0xBE1931, title='? Nothing to decrypt.')
    else:
        text = args[-1] == ':t'
        token_parts = args[:-1] if text else args
        crypt_text = ''.join(token_parts).encode('utf-8')
        cipher = Fernet(key.encode('utf-8'))
        try:
            ciphered = cipher.decrypt(crypt_text).decode('utf-8')
        except (InvalidToken, InvalidSignature):
            ciphered = None
        if not ciphered:
            response = discord.Embed(color=0xBE1931, title='? The token or key are incorrect.')
        elif text:
            response = ciphered
        else:
            response = discord.Embed(color=0xe75a70)
            response.add_field(name='?? Token Decrypted', value=ciphered)
    # Only a successful plain-text result is a string. Error embeds must go
    # out as embeds even when ':t' was requested.
    if isinstance(response, str):
        await message.channel.send(response)
    else:
        await message.channel.send(embed=response)
项目:apex-sigma-plugins    作者:lu-ci    | 项目源码 | 文件源码
async def encrypt(cmd, message, args):
    """
    Encrypt the command arguments with Fernet and send the resulting token.

    The key is read from the bot preferences entry ``key_to_my_heart``. The
    arguments are joined with single spaces to form the plaintext; a trailing
    ``:t`` argument requests a plain-text reply instead of an embed. Error
    replies are always sent as embeds.

    :param cmd: command object; only ``cmd.bot.cfg.pref.raw`` is read
    :param message: message whose channel receives the reply
    :param args: sequence of argument strings
    """
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    if not key:
        response = discord.Embed(color=0xBE1931, title='? You don\'t posses a key.')
    elif not args:
        response = discord.Embed(color=0xBE1931, title='? Nothing to encrypt.')
    else:
        text = args[-1] == ':t'
        plain_parts = args[:-1] if text else args
        crypt_text = ' '.join(plain_parts).encode('utf-8')
        cipher = Fernet(key.encode('utf-8'))
        try:
            ciphered = cipher.encrypt(crypt_text).decode('utf-8')
        except (InvalidToken, InvalidSignature):
            ciphered = None
        if not ciphered:
            response = discord.Embed(color=0xBE1931, title='? The token or key are incorrect.')
        elif text:
            response = ciphered
        else:
            response = discord.Embed(color=0xe75a70)
            response.add_field(name='?? Text Encrypted', value=ciphered)
    # Only a successful plain-text result is a string. Error embeds must go
    # out as embeds even when ':t' was requested.
    if isinstance(response, str):
        await message.channel.send(response)
    else:
        await message.channel.send(embed=response)
项目:PySyncObj    作者:bakwc    | 项目源码 | 文件源码
def getEncryptor(password):
    """Return a Fernet whose key is derived from ``password`` and ``SALT``.

    A non-bytes password is encoded with ``.encode()`` first. The key is the
    url-safe base64 encoding of a 32-byte PBKDF2-HMAC-SHA256 derivation with
    100000 iterations.
    """
    secret = password if isinstance(password, bytes) else bytes(password.encode())
    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=SALT,
        iterations=100000,
        backend=default_backend()
    )
    derived = derivation.derive(secret)
    return Fernet(base64.urlsafe_b64encode(derived))
项目:deb-python-sqlalchemy-utils    作者:openstack    | 项目源码 | 文件源码
def _initialize_engine(self, parent_class_key):
    """Store the url-safe base64 form of the key and build the Fernet."""
    encoded_key = base64.urlsafe_b64encode(parent_class_key)
    self.secret_key = encoded_key
    self.fernet = Fernet(self.secret_key)
项目:sftransfer    作者:Ododo    | 项目源码 | 文件源码
def initialization(self, key):
    """Build ``self._cipher`` from the first 32 bytes of the decoded key.

    ``key`` is url-safe base64 decoded, truncated to 32 bytes and re-encoded
    to form the Fernet key.
    """
    raw_key = base64.urlsafe_b64decode(key)
    fernet_key = base64.urlsafe_b64encode(raw_key[:32])
    self._cipher = Fernet(fernet_key)
项目:asgi_rabbitmq    作者:proofit404    | 项目源码 | 文件源码
def make_fernet(self, key):
    """
    Given a single encryption key, returns a Fernet instance using it.

    The key is encoded as UTF-8 and hashed with SHA-256; the url-safe base64
    encoding of the digest is the Fernet key.
    """
    from cryptography.fernet import Fernet

    digest = hashlib.sha256(key.encode('utf8')).digest()
    return Fernet(base64.urlsafe_b64encode(digest))
项目:maas    作者:maas    | 项目源码 | 文件源码
def calculate_digest(secret, message, salt):
    """Return the SHA-256 HMAC digest of ``message`` followed by ``salt``.

    ``secret`` is the HMAC key. All three arguments must be byte strings;
    this is checked with ``assert``.
    """
    checks = (
        (secret, "%r is not a byte string."),
        (message, "%r is not byte string."),
        (salt, "%r is not a byte string."),
    )
    for value, template in checks:
        assert isinstance(value, bytes), template % (value,)
    # Feeding the concatenation is the same as updating with each part.
    return HMAC(secret, msg=message + salt, digestmod=sha256).digest()


# Cache the Fernet pre-shared key, since it's expensive to derive the key.
# Note: this will need to change to become a dictionary if salts are supported.
项目:maas    作者:maas    | 项目源码 | 文件源码
def _get_or_create_fernet_psk():
    """Return the pre-shared key for Fernet, deriving it on first use.

    The key is cached in the module-level ``_fernet_psk`` so that it is only
    derived once; access to the cache happens while ``_fernet_lock`` is held.
    The key is derived with PBKDF2-HMAC-SHA256 from the secret returned by
    ``get_shared_secret_from_filesystem()``.

    :return: A pre-shared key suitable for use with the Fernet class.
    :raises MissingSharedSecret: if no shared secret is found.
    """
    global _fernet_psk
    with _fernet_lock:
        if _fernet_psk is not None:
            return _fernet_psk

        secret = get_shared_secret_from_filesystem()
        if secret is None:
            raise MissingSharedSecret("MAAS shared secret not found.")

        # Keying material is required by PBKDF2 to be a byte string.
        derivation = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            # XXX: It might be better to use the maas_id for the salt.
            # But that requires the maas_id to be known in advance by all
            # parties to the encrypted communication. The format of the
            # cached pre-shared key would also need to change.
            salt=b"",
            # XXX: an infrequently-changing variable iteration count might
            # be nice, but that would require protocol support, and
            # changing the way the PSK is cached.
            iterations=DEFAULT_ITERATION_COUNT,
            backend=default_backend()
        )
        _fernet_psk = urlsafe_b64encode(derivation.derive(secret))
        return _fernet_psk