我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用cryptography.fernet.Fernet()。
def decrypt_file(file, key):
    """
    Decrypts the file ``file``.

    The encrypted file is assumed to end with the ``.enc`` extension. The
    decrypted file is saved to the same location without the ``.enc``
    extension.

    The decrypted file is created with permissions 0o600, and the mode is
    set again to 0o600 afterwards in case the target already existed with
    looser permissions.

    See also :func:`doctr.local.encrypt_file`.

    :param file: path of the encrypted file; must end with ``.enc``
    :param key: key passed to ``Fernet``
    :raises ValueError: if ``file`` does not end with ``.enc``
    """
    if not file.endswith('.enc'):
        raise ValueError("%s does not end with .enc" % file)

    fer = Fernet(key)

    with open(file, 'rb') as f:
        decrypted_file = fer.decrypt(f.read())

    target = file[:-4]
    # Create the file with restrictive permissions up front instead of
    # writing it with the default mode and tightening it afterwards, so the
    # plaintext is never exposed with looser permissions.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
    fd = os.open(target, flags, 0o600)
    with os.fdopen(fd, 'wb') as f:
        f.write(decrypted_file)

    # The mode passed to os.open only applies to newly created files.
    os.chmod(target, 0o600)
def __init__(self, bot_user, osu_client, model_cache_dir, model_cache_size,
             token_secret, upload_url):
    """Store the bot's collaborators and build its caches.

    :param bot_user: passed to the parent initializer inside a one-element
        set and stored as ``self.bot_user``
    :param osu_client: stored as ``self.osu_client``
    :param model_cache_dir: stored as a ``pathlib.Path``
    :param model_cache_size: argument given to ``lru_cache`` when wrapping
        ``self._get_model``
    :param token_secret: key wrapped in a ``Fernet`` instance
    :param upload_url: stored as ``self.upload_url``
    """
    super().__init__({bot_user})

    self.bot_user = bot_user
    self.osu_client = osu_client

    cache_dir = pathlib.Path(model_cache_dir)
    self.model_cache_dir = cache_dir

    token_cipher = Fernet(token_secret)
    self.token_secret = token_cipher

    self.upload_url = upload_url

    # Memoized front-end for ``_get_model``.
    memoize = lru_cache(model_cache_size)
    self.get_model = memoize(self._get_model)

    self._user_stats = ExpiringCache()

    candidate_source = self._gen_candidates()
    self._candidates = LockedIterator(candidate_source)
def save_secrets():
    """Read the secrets from environment variables and store them encrypted.

    Every name in ``secrets_keys`` is looked up in ``os.environ``. If any of
    them is missing, a critical message is logged and nothing is written.
    Otherwise the collected values are dumped as JSON, encrypted with
    ``Fernet(key)`` and written to ``config/secrets.dat``.
    """
    logger = logging.getLogger(__name__)
    logger.info("Reading secrets from environment variables")

    secrets = {}
    missing_key = False
    for secrets_key in secrets_keys:
        try:
            logger.info("Writing key: " + secrets_key)
            secrets[secrets_key] = os.environ[secrets_key]
        except KeyError:
            # The key must be a %-style argument with a placeholder;
            # passing it without one makes logging fail to format the record.
            logger.warning("Missing key: %s", secrets_key)
            missing_key = True

    if missing_key:
        logger.critical("Aborting...")
        return

    # The message names the file that is actually written below.
    logger.info("Writing secrets to config/secrets.dat")
    f = Fernet(key)
    with open("config/secrets.dat", 'wb') as secrets_file:
        secrets_file.write(f.encrypt(json.dumps(secrets).encode()))
def _encode_uuid_map(userid, uuid, passwd):
    """Encrypt a ``userid``/``uuid`` pair and return it url-safe base64 encoded.

    :param userid: value placed after ``userid:`` in the payload
    :param uuid: value placed after ``uuid:`` in the payload
    :param passwd: password used for the encryption
    :return: url-safe base64 encoding of the encrypted payload
    """
    payload = 'userid:%s:uuid:%s' % (userid, uuid)

    if not IS_WIN:
        ciphertext = scrypt.encrypt(payload, passwd, maxtime=0.05)
        return base64.urlsafe_b64encode(ciphertext)

    # FIXME scrypt.encrypt is broken in windows.
    # This is a quick hack. The hostname might not be unique enough though.
    # We could use a long random hash per entry and store it in the file.
    # Other option is to use a different KDF that is supported by cryptography
    # (ie, pbkdf)
    derived = scrypt.hash(passwd, socket.gethostname())
    fernet_key = base64.urlsafe_b64encode(derived[:32])
    cipher = Fernet(fernet_key, backend=crypto_backend)
    ciphertext = cipher.encrypt(payload)
    return base64.urlsafe_b64encode(ciphertext)
def _decode_uuid_line(line, passwd):
    """Decrypt one encoded line and extract the ``(userid, uuid)`` pair.

    :param line: url-safe base64 encoded, encrypted entry
    :param passwd: password used for the decryption
    :return: the first ``(userid, uuid)`` match found in the decrypted text,
        or ``None`` if decryption fails or the text does not match
    """
    decoded = base64.urlsafe_b64decode(line)

    if IS_WIN:
        key = scrypt.hash(passwd, socket.gethostname())
        key = base64.urlsafe_b64encode(key[:32])
        try:
            f = Fernet(key, backend=crypto_backend)
            # Decrypt the decoded payload; the original passed ``key`` here,
            # which can never be a valid token.
            maybe_decrypted = f.decrypt(decoded)
        except Exception:
            return None
    else:
        try:
            maybe_decrypted = scrypt.decrypt(decoded, passwd, maxtime=0.1)
        except scrypt.error:
            return None

    # Raw string: same pattern, without relying on unknown string escapes.
    match = re.findall(r"userid\:(.+)\:uuid\:(.+)", maybe_decrypted)
    if match:
        return match[0]
    return None
def get_fernet():
    """
    Deferred load of Fernet key.

    This function could fail either because Cryptography is not installed
    or because the Fernet key is invalid.

    :return: Fernet object
    :raises: AirflowException if there's a problem trying to load Fernet
    """
    try:
        from cryptography.fernet import Fernet
    except ImportError:
        # Only a failed import is translated; a bare ``except`` would also
        # swallow KeyboardInterrupt and SystemExit.
        raise AirflowException('Failed to import Fernet, it may not be installed')
    try:
        return Fernet(configuration.get('core', 'FERNET_KEY').encode('utf-8'))
    except ValueError as ve:
        raise AirflowException("Could not create Fernet object: {}".format(ve))
def decrypt(data, password):
    """Decrypts data using the password.

    A Fernet key is derived from the password with PBKDF2-HMAC and used to
    decrypt ``data`` with the cryptography module.

    :param data: Fernet token to decrypt
    :param password: password the key is derived from (also used as salt)
    :return: the decrypted token
    :raises cryptography.fernet.InvalidToken: if the password or data is
        incorrect (nothing in this function catches it)
    """
    password = bytes(password)
    # Salt is equal to password as we want the encryption to be reversible
    # only using the password itself
    # NOTE(review): the original passed ``hashes.AES()``, which does not exist
    # in cryptography's ``hashes`` module. SHA256 is assumed here; it must be
    # the same hash the matching encrypt routine uses - confirm.
    kdf = PBKDF2HMAC(algorithm=hashes.SHA256(),
                     length=32,
                     salt=password,
                     iterations=100000,
                     backend=default_backend())
    key = base64.urlsafe_b64encode(kdf.derive(password))
    f = Fernet(key)
    token = f.decrypt(data)
    return token
def encrypt(self, save_content=False):
    """Encrypt ``self.config['content']`` with the instance's Fernet key.

    Content that is already flagged as encrypted is returned unchanged.
    Binary content is encrypted as-is; text content is UTF-8 encoded before
    encryption and the token is decoded back to text.

    :param save_content: when true, store the encrypted content, its length
        and the ``is_encrypted`` flag back into ``self.config``
    :return: the encrypted content
    :raises Exception: if the key or content is missing, the instance is not
        encryptable, or the binary/text flag is neither True nor False
    """
    if self.encryption_key is None:
        raise Exception('Cannot encrypt content, missing encryption key.')

    settings = self.config
    if settings.get('content') is None:
        raise Exception('Cannot encrypt content, content is empty.')
    if self.is_encryptable == False:
        raise Exception('Cannot encrypt, improper configuration.')
    if settings.get('is_encrypted') == True:
        return settings.get('content')

    cipher = Fernet(self.encryption_key)
    binary_flag = settings.get('is_binary')
    if binary_flag == True:
        encr_content = cipher.encrypt(settings.get('content'))
    elif binary_flag == False:
        raw = settings.get('content').encode('utf-8')
        encr_content = cipher.encrypt(raw).decode('utf-8')
    else:
        raise Exception('Could not tell if file is binary or text. Aborting.')

    if save_content == True:
        self.config['content'] = encr_content
        self.config['content_length'] = len(encr_content)
        self.config['is_encrypted'] = True
    return encr_content
def decrypt(self, save_content=False):
    """Decrypt ``self.config['content']`` with the instance's Fernet key.

    Content that is flagged as not encrypted is returned unchanged. Binary
    content is decrypted as-is; text content is UTF-8 encoded before
    decryption and the result is decoded back to text.

    :param save_content: when true, store the decrypted content, its length
        and the ``is_encrypted`` flag back into ``self.config``
    :return: the decrypted content
    :raises LocalConfigFileError: if the key or content is missing, the
        instance is not encryptable, or the binary/text flag is neither True
        nor False
    """
    if self.encryption_key is None:
        raise LocalConfigFileError('Cannot decrypt content, missing encryption key.')

    settings = self.config
    if settings.get('content') is None:
        raise LocalConfigFileError('Cannot decrypt content, content is empty.')
    if self.is_encryptable == False:
        raise LocalConfigFileError('Cannot decrypt, improper configuration.')
    if settings.get('is_encrypted') == False:
        return settings.get('content')

    cipher = Fernet(self.encryption_key)
    binary_flag = settings.get('is_binary')
    if binary_flag == True:
        decr_content = cipher.decrypt(settings.get('content'))
    elif binary_flag == False:
        token = settings.get('content').encode('utf-8')
        decr_content = cipher.decrypt(token).decode('utf-8')
    else:
        raise LocalConfigFileError('Could not tell if file is binary or text. Aborting.')

    if save_content == True:
        self.config['content'] = decr_content
        self.config['content_length'] = len(decr_content)
        self.config['is_encrypted'] = False
    return decr_content
def test_get_Fernet_returns_a_key():
    """``manage._get_fernet_key`` returns a Fernet and stores a keyring entry."""
    faker = Faker()
    appname = '_'.join(('test_a_netcrawl_', faker.word(), faker.word()))
    username = '_'.join(('test_u_netcrawl_', faker.word(), faker.word()))

    key = manage._get_fernet_key(appname, username)

    assert isinstance(key, fernet.Fernet), 'Key [{}] is wrong type'.format(
        type(key))
    assert keyring.get_password(appname, username) is not None

    # Remove the keyring entry again and check that it is gone.
    keyring.delete_password(appname, username)
    assert keyring.get_password(appname, username) is None


#===============================================================================
# def test_check_credentials_no_error():
#     config.cc.check_credentials()
#===============================================================================
def setup_up_config(func):
    """Decorator that hands ``func`` a temporary copy of the config file.

    The wrapper reads ``arguments.path`` (if it exists), decrypts the content
    with ``arguments.fernet_key`` when both content and key are present, and
    writes the result to a named temporary file with the same extension.
    While ``func`` runs, ``arguments.path`` points at the temporary file and
    ``arguments.origin_path`` holds the original path.

    :param func: callable taking the ``arguments`` object
    :return: the wrapping callable; it returns whatever ``func`` returns
    """
    import functools

    # ``wraps`` keeps the wrapped function's name and docstring visible.
    @functools.wraps(func)
    def wrapper(arguments):
        _, ext = os.path.splitext(arguments.path)
        file_data = b''
        if os.path.exists(arguments.path):
            with open(arguments.path, 'rb') as fp:
                file_data = fp.read()
        if file_data and arguments.fernet_key:
            f = Fernet(arguments.fernet_key.encode('utf-8'))
            file_data = f.decrypt(file_data)
        with tempfile.NamedTemporaryFile(suffix=ext) as fp:
            fp.write(file_data)
            # Flush so that ``func`` sees the content when it opens the path.
            fp.file.flush()
            arguments.origin_path = arguments.path
            arguments.path = fp.name
            return func(arguments)

    return wrapper
def can_save_an_encrypted_file(self):
    """Saving with a key writes a token that decrypts to the sample YAML."""
    with tempfile.NamedTemporaryFile() as tp:
        save_temporary_config(
            './spec/cli/sample.yml',
            tp.name,
            test_key
        )
        with open(tp.name, 'rb') as fp:
            ct = fp.read()
        f = Fernet(test_key.encode('utf-8'))
        # safe_load: ``yaml.load`` without an explicit Loader can construct
        # arbitrary objects and is rejected by recent PyYAML releases.
        data = yaml.safe_load(f.decrypt(ct))
        expect(data['thing']).to.equal('bam')
        expect(data['other']).to.equal('thing')
def get_credential(self, hostname):
    """
    Return the credentials stored for a particular hostname.

    With ``self.KEY`` set, the stored password (minus its first two
    characters) is decrypted with Fernet; otherwise it is returned as stored.

    :param hostname: hostname
    :type hostname: str
    :return: ``(username, password)`` tuple, or ``None`` when a lookup
        raises ``KeyError``
    :raises ContainerException: if the stored password cannot be decrypted
    """
    hostname = self.cut_hostname(hostname)
    try:
        if not self.KEY:
            # return plain information
            entry = self.CREDENTIALS[hostname]
            return (entry["username"], entry["password"])

        crypto = Fernet(self.KEY)
        username = self.CREDENTIALS[hostname]["username"]
        token = self.CREDENTIALS[hostname]["password"][2:].encode()
        return (username, crypto.decrypt(token))
    except InvalidToken:
        raise ContainerException("Invalid password specified!")
    except KeyError:
        return None
def encrypt(data, secret):
    """
    Encrypt ``data`` with a key derived from ``secret`` and a random salt.

    :param data: payload to encrypt
    :type data: bytes
    :param secret: secret the key is derived from
    :type secret: bytes
    :return: base64 encoding of the 16-byte salt followed by the Fernet token
    :rtype: bytes, str
    """
    # Generate a cryptographically secure salt
    salt = Random.new().read(16)

    # Generates a suitable key from the secret by HMACing it with the salt
    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=100000,
        backend=default_backend(),
    )
    fernet_key = urlsafe_b64encode(derivation.derive(secret))

    # Encrypts the data
    token = Fernet(fernet_key).encrypt(data)
    return b64encode(salt + token)
def decrypt(encodedciphertext, secret):
    """
    Decrypt a base64 blob made of a 16-byte salt followed by a Fernet token.

    :param encodedciphertext: base64 encoding of salt + token
    :type encodedciphertext: bytes
    :param secret: secret the key is derived from
    :type secret: bytes
    :return: the decrypted data
    """
    # Split out the salt from the ciphertext
    blob = b64decode(encodedciphertext)
    salt, token = blob[:16], blob[16:]

    # Regenerates the key by HMACing the secret with the salt
    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=100000,
        backend=default_backend(),
    )
    fernet_key = urlsafe_b64encode(derivation.derive(secret))

    # Decrypts the data
    return Fernet(fernet_key).decrypt(token)
def fernet_encrypt_psk(message, raw=False):
    """Encrypts the specified message using the Fernet format.

    Returns the encrypted token, as a byte string. Note that a Fernet token
    includes the current time. Users decrypting the token can specify a TTL
    (in seconds) indicating how long the encrypted message should be valid.
    So the system clock must be correct before calling this function.

    :param message: The message to encrypt.
    :type message: Must be of type 'bytes' or a UTF-8 'str'.
    :param raw: if True, returns the decoded base64 bytes representing the
        Fernet token. The bytes must be converted back to base64 to be
        decrypted. (Or the 'raw' argument on the corresponding
        fernet_decrypt_psk() function can be used.)
    :return: the encryption token, as a base64-encoded byte string.
    """
    context = _get_fernet_context()
    payload = message.encode("utf-8") if isinstance(message, str) else message
    token = context.encrypt(payload)
    return urlsafe_b64decode(token) if raw is True else token
def fernet_decrypt_psk(token, ttl=None, raw=False):
    """Decrypts the specified Fernet token using the MAAS secret.

    Returns the decrypted token as a byte string; the user is responsible for
    converting it to the correct format or encoding.

    :param token: The token to decrypt.
    :type token: Must be of type 'bytes', or an ASCII base64 string.
    :param ttl: Optional amount of time (in seconds) allowed to have elapsed
        before the message is rejected upon decryption. Note that the Fernet
        library considers times up to 60 seconds into the future (beyond the
        TTL) to be valid.
    :param raw: if True, treats the string as the decoded base64 bytes of a
        Fernet token, and attempts to encode them (as expected by the Fernet
        APIs) before decrypting.
    :return: bytes
    """
    if raw is True:
        token = urlsafe_b64encode(token)
    context = _get_fernet_context()
    encoded = token.encode("ascii") if isinstance(token, str) else token
    return context.decrypt(encoded, ttl=ttl)
def _get_decrypted_pairs(self, credential):
    """
    From credential, get decrypted blind credential pairs.

    The data key and encryption context for the configured region are taken
    from ``credential``; the data key is decrypted through KMS and then used
    as the Fernet key for that region's credential pairs.

    :param credential: dict with ``metadata``/``context``, ``data_key`` and
        ``credential_pairs`` entries keyed by region
    :return: the decrypted credential pairs, parsed from JSON
    """
    region = self.config['region']
    kms_context = credential['metadata']['context'][region]

    if not self.aws_creds:
        kms = self.kms_client
    else:
        # Explicit credentials were supplied: build a dedicated KMS client.
        kms = confidant_client.services.get_boto_client(
            'kms',
            region=self.config['region'],
            aws_access_key_id=self.aws_creds['AccessKeyId'],
            aws_secret_access_key=self.aws_creds['SecretAccessKey'],
            aws_session_token=self.aws_creds['SessionToken']
        )

    wrapped_key = base64.b64decode(ensure_bytes(credential['data_key'][region]))
    data_key = cryptolib.decrypt_datakey(wrapped_key, kms_context, kms)

    encrypted_pairs = credential['credential_pairs'][region]
    cipher = Fernet(data_key)
    return json.loads(cipher.decrypt(encrypted_pairs.encode('utf-8')))
def encrypt_text(password, token):
    """Encrypt ``token`` with the Fernet key that ``get_key`` returns for ``password``.

    :param password: value handed to ``get_key``
    :param token: data to encrypt; converted with ``bytes()`` first
    :return: the Fernet token
    """
    cipher = Fernet(get_key(password))
    payload = bytes(token)
    return cipher.encrypt(payload)
def decrypt_text(password, token):
    """Decrypt ``token`` with the Fernet key that ``get_key`` returns for ``password``.

    :param password: value handed to ``get_key``
    :param token: Fernet token; converted with ``bytes()`` first
    :return: the decrypted data
    """
    cipher = Fernet(get_key(password))
    encrypted = bytes(token)
    return cipher.decrypt(encrypted)
def _get_fernet(cls):
    """Build a Fernet whose key is derived from ``PASSWORD`` and ``SALT``.

    :return: Fernet instance keyed with the PBKDF2-HMAC-SHA256 derivation
    """
    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=SALT,
        iterations=100000,
        backend=default_backend()
    )
    raw_key = derivation.derive(PASSWORD)
    return Fernet(base64.urlsafe_b64encode(raw_key))
def gen_token(obj, user):
    """Generate a token for a user and print it.

    :param obj: object whose ``token_secret`` is used as the Fernet key
    :param user: user passed on to ``token.gen_token``
    """
    from cryptography.fernet import Fernet
    from .token import gen_token as _build_token

    cipher = Fernet(obj.token_secret)
    print(_build_token(cipher, user))
def _load_secrets():
    """Load and decrypt the secrets file.

    :return: the decoded JSON content; an empty dict when there is no secrets
        file; ``None`` after shutting down ``async_handler`` when the token
        cannot be decrypted
    """
    secrets_path = _get_secrets_path()
    if not path.isfile(secrets_path):
        logging.getLogger(__name__).debug("No secrets file found")
        return {}

    cipher = Fernet(key)
    with open(secrets_path, 'rb') as secrets_file:
        try:
            plaintext = cipher.decrypt(secrets_file.read())
            return json.loads(plaintext.decode())
        except InvalidToken:
            log = logging.getLogger(__name__)
            log.critical("You entered the wrong password")
            async_handler.shutdown()
            return None
def save_secrets():
    """
    Save secrets to encrypted file on disk.

    ``_secrets`` is dumped as JSON, encrypted with ``Fernet(key)`` and
    written to the path returned by ``_get_secrets_path``.
    """
    secrets_path = _get_secrets_path()
    cipher = Fernet(key)
    with open(secrets_path, 'wb') as secrets_file:
        serialized = json.dumps(_secrets).encode()
        secrets_file.write(cipher.encrypt(serialized))
def __init__(self):
    """Derive a Fernet key from ``settings.SECRET_KEY``.

    The secret key is used both as the PBKDF2 salt and as the key material.
    """
    self._salt = settings.SECRET_KEY
    self._kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=self._salt,
        iterations=100000,
        backend=default_backend(),
    )
    derived = self._kdf.derive(settings.SECRET_KEY)
    self._key = base64.urlsafe_b64encode(derived)
    self._fernet = Fernet(self._key)
def encrypt(pwd, key):
    """Encrypt ``pwd`` with a key derived from ``key`` and a random salt.

    :param pwd: text to encrypt
    :param key: text the Fernet key is derived from
    :return: ``(token, salt)``, both decoded as latin1 text
    """
    salt = os.urandom(32)
    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=100000,
        backend=default_backend(),
    )
    derived = derivation.derive(bytes(key.encode('utf-8')))
    cipher = Fernet(base64.urlsafe_b64encode(derived))
    token = cipher.encrypt(bytes(pwd.encode('utf-8')))
    return token.decode('latin1'), salt.decode('latin1')
def decrypt(cipher_txt, key, salt):
    """Decrypt ``cipher_txt`` with a key derived from ``key`` and ``salt``.

    :param cipher_txt: Fernet token as latin1 text
    :param key: text the Fernet key is derived from
    :param salt: salt as latin1 text
    :return: the decrypted value decoded as UTF-8 text
    """
    salt_bytes = bytes(salt.encode('latin1'))
    token = bytes(str(cipher_txt).encode('latin1'))
    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt_bytes,
        iterations=100000,
        backend=default_backend(),
    )
    derived = derivation.derive(bytes(key.encode('utf-8')))
    cipher = Fernet(base64.urlsafe_b64encode(derived))
    return cipher.decrypt(token).decode('utf-8')
def encrypt(text):
    """Encrypt ``text`` with the app's ``SECURE_TOKEN_KEY``.

    :param text: value to encrypt; passed through ``to_binary`` first
    :return: the Fernet token
    """
    secret = to_binary(current_app.config['SECURE_TOKEN_KEY'])
    cipher = Fernet(secret)
    return cipher.encrypt(to_binary(text))
def decrypt(encrypted):
    """Decrypt a token with the app's ``SECURE_TOKEN_KEY``.

    :param encrypted: Fernet token; passed through ``to_binary`` first
    :return: the decrypted value passed through ``to_text``
    """
    secret = to_binary(current_app.config['SECURE_TOKEN_KEY'])
    cipher = Fernet(secret)
    return to_text(cipher.decrypt(to_binary(encrypted)))
def _fernet_from_password(password, salt):
    """Build a Fernet keyed by PBKDF2-HMAC-SHA256 of ``password`` and ``salt``.

    :param password: key material passed to the KDF
    :param salt: salt passed to the KDF
    :return: Fernet instance using the derived key
    """
    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=100000,
        backend=default_backend(),
    )
    derived = derivation.derive(password)
    return Fernet(base64.urlsafe_b64encode(derived))
def encrypt(msg):
    '''Encrypt message with random key.

    :param msg: message to be encrypted
    :returns: a two-item tuple: the generated Fernet key first, then the
              encrypted message, both passed through
              ``encodeutils.safe_decode``
    '''
    fernet_key = fernet.Fernet.generate_key()
    cipher = fernet.Fernet(fernet_key)
    token = cipher.encrypt(encodeutils.safe_encode(msg))
    return encodeutils.safe_decode(fernet_key), encodeutils.safe_decode(token)
def decrypt(msg, key):
    '''Decrypt a token using the provided Fernet key.

    Note the argument roles: ``msg`` is used as the Fernet key and ``key``
    as the token to decrypt.

    :param msg: value used as the Fernet key
    :param key: value decrypted as the Fernet token
    :returns: decrypted message passed through ``encodeutils.safe_decode``
    '''
    cipher = fernet.Fernet(encodeutils.safe_encode(msg))
    plaintext = cipher.decrypt(encodeutils.safe_encode(key))
    return encodeutils.safe_decode(plaintext)
async def decrypt(cmd, message, args):
    """Decrypt the joined command arguments and send the result to the channel.

    The key comes from the bot preferences entry ``key_to_my_heart``. A
    trailing ``:t`` argument requests a plain-text reply instead of an embed.

    Declared ``async`` because the body awaits ``message.channel.send``.

    :param cmd: command object giving access to ``cmd.bot.cfg.pref.raw``
    :param message: message whose channel receives the response
    :param args: command arguments; joined without separator to form the token
    """
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    text = False
    if key:
        if args:
            if args[-1] == ':t':
                text = True
                crypt_text = ''.join(args[:-1]).encode('utf-8')
            else:
                crypt_text = ''.join(args).encode('utf-8')
            key = key.encode('utf-8')
            cipher = Fernet(key)
            try:
                ciphered = cipher.decrypt(crypt_text).decode('utf-8')
            except (InvalidToken, InvalidSignature):
                ciphered = None
            if ciphered:
                if text:
                    response = ciphered
                else:
                    response = discord.Embed(color=0xe75a70)
                    response.add_field(name='?? Token Decrypted', value=ciphered)
            else:
                response = discord.Embed(color=0xBE1931, title='? The token or key are incorrect.')
        else:
            response = discord.Embed(color=0xBE1931, title='? Nothing to decrypt.')
    else:
        response = discord.Embed(color=0xBE1931, title='? You don\'t posses a key.')
    # Choose by what was built: with ``:t`` and a failed decryption the
    # response is still an embed and must not be sent as message text.
    if isinstance(response, str):
        await message.channel.send(response)
    else:
        await message.channel.send(embed=response)
async def encrypt(cmd, message, args):
    """Encrypt the joined command arguments and send the result to the channel.

    The key comes from the bot preferences entry ``key_to_my_heart``. A
    trailing ``:t`` argument requests a plain-text reply instead of an embed.

    Declared ``async`` because the body awaits ``message.channel.send``.

    :param cmd: command object giving access to ``cmd.bot.cfg.pref.raw``
    :param message: message whose channel receives the response
    :param args: command arguments; joined with spaces to form the text
    """
    key = cmd.bot.cfg.pref.raw.get('key_to_my_heart')
    text = False
    if key:
        if args:
            if args[-1] == ':t':
                text = True
                crypt_text = ' '.join(args[:-1]).encode('utf-8')
            else:
                crypt_text = ' '.join(args).encode('utf-8')
            key = key.encode('utf-8')
            cipher = Fernet(key)
            try:
                ciphered = cipher.encrypt(crypt_text).decode('utf-8')
            except (InvalidToken, InvalidSignature):
                ciphered = None
            if ciphered:
                if text:
                    response = ciphered
                else:
                    response = discord.Embed(color=0xe75a70)
                    response.add_field(name='?? Text Encrypted', value=ciphered)
            else:
                response = discord.Embed(color=0xBE1931, title='? The token or key are incorrect.')
        else:
            # This is the encrypt command; the original message said "decrypt".
            response = discord.Embed(color=0xBE1931, title='? Nothing to encrypt.')
    else:
        response = discord.Embed(color=0xBE1931, title='? You don\'t posses a key.')
    # Choose by what was built: an embed must never be sent as message text.
    if isinstance(response, str):
        await message.channel.send(response)
    else:
        await message.channel.send(embed=response)
def get_config(config_filename=get_default_config_filename(), secrets_file='/secrets'):
    """Load the configuration, decrypting it first when secrets are available.

    If ``_load_secrets`` returns a truthy value, the config file is read as
    bytes, decrypted with the Fernet key stored under ``encryption_key`` and
    parsed from the resulting UTF-8 text. Otherwise the file is parsed
    directly. Environment variables are expanded in both cases.

    :param config_filename: path of the configuration file
    :param secrets_file: path handed to ``_load_secrets``
    :return: the populated ``ConfigParser``
    """
    config = ConfigParser()
    logger.debug('config filename: {}'.format(config_filename))
    secrets = _load_secrets(filename=secrets_file)
    if secrets:
        from cryptography.fernet import Fernet
        f = Fernet(secrets['encryption_key'].encode())
        with open(config_filename, 'rb') as cfg:
            cfg_content = f.decrypt(cfg.read())
        # The decrypted content is deliberately not printed: it would leak
        # the protected configuration to stdout.
        config.read_string(cfg_content.decode("utf-8"))
    else:
        config.read(config_filename)
    expand_env_vars(config)
    return config
def encrypt(message, key):
    """Encrypt ``message`` with a hex-encoded key.

    :param message: text to encrypt
    :param key: key as a hex string; decoded and re-encoded as url-safe base64
    :return: the Fernet token as UTF-8 text
    """
    raw_key = codecs.decode(key, 'hex')
    cipher = Fernet(base64.urlsafe_b64encode(raw_key))
    token = cipher.encrypt(message.encode('utf-8'))
    return token.decode('utf-8')
def decrypt(encrypted_message, key):
    """Decrypt a token with a hex-encoded key.

    :param encrypted_message: Fernet token as text
    :param key: key as a hex string; decoded and re-encoded as url-safe base64
    :return: the decrypted message as UTF-8 text
    """
    raw_key = codecs.decode(key, 'hex')
    cipher = Fernet(base64.urlsafe_b64encode(raw_key))
    plaintext = cipher.decrypt(encrypted_message.encode('utf-8'))
    return plaintext.decode('utf-8')
def __init__(self, file_path, config_dict=None, encryption_key=None, binary_mode=False, *args, **kwargs):
    """Initialize the record for one config file and validate the key.

    :param file_path: path of the file; stored in ``self.config`` and as a
        ``Path`` in ``self.path``
    :param config_dict: optional dict; a dict is run through ``check_config``
        and stored, anything else leaves ``self.config_dict`` as ``None``
    :param encryption_key: optional ASCII string of exactly 32 characters;
        it is base64-encoded and tried out with ``Fernet``
    :param binary_mode: coerced to ``bool`` and stored as ``self.binary_mode``
    :raises LocalConfigFileError: if ``encryption_key`` is not a ``str`` or
        is not exactly 32 characters long
    """
    self.config = {
        'file_path': file_path,
        'is_binary': None,
        'is_encrypted': False,
        'is_case_sensitive': None,
        'is_disabled': None,
        'mtime': -1,
        'sha1_checksum': None,
        'content': None,
        'content_length': None,
    }
    self.is_resolved = False
    self.is_file_not_found = None
    self.path = Path(file_path)
    self.stat = None
    self.encryption_key = None
    self.is_encryptable = False
    self.binary_mode = False
    self.config_file_type = None
    self.config_dict = None
    if isinstance(config_dict, dict):
        self.config_dict = check_config(config_dict)
    try:
        self.stat = self.path.resolve().stat()
    except Exception:
        # Best effort: a file that cannot be resolved or stat'ed is recorded
        # as unresolved. A bare ``except`` would also trap KeyboardInterrupt.
        self.stat = None
        self.is_resolved = False
    self.binary_mode = bool(binary_mode)
    if encryption_key is not None:
        key_len = 32
        if not isinstance(encryption_key, str):
            raise LocalConfigFileError('Encryption key must be a url-safe 32 character ascii string.')
        # Compare the full length: checking a 32-character slice accepted
        # longer keys, which then failed inside Fernet with a ValueError.
        if len(encryption_key) != key_len:
            raise LocalConfigFileError('Encryption key must be a url-safe 32 character ascii string.')
        self.encryption_key = base64.b64encode(encryption_key.encode('ascii'))
        Fernet(self.encryption_key)  # try it out, if no exception, we're good
        self.is_encryptable = True
def make_fernet(self, key):
    """
    Given a single encryption key, returns a Fernet instance using it.

    Text keys are UTF-8 encoded first; the Fernet key is the url-safe base64
    encoding of the key's SHA-256 digest.
    """
    from cryptography.fernet import Fernet

    raw = key.encode("utf8") if isinstance(key, six.text_type) else key
    digest = hashlib.sha256(raw).digest()
    return Fernet(base64.urlsafe_b64encode(digest))
def save_temporary_config(temp_file_path, output_path, fernet_key=None):
    """Copy a file to ``output_path``, encrypting it when a key is given.

    :param temp_file_path: path of the file to read
    :param output_path: path of the file to write
    :param fernet_key: optional Fernet key as text; when falsy the content is
        written unchanged
    """
    with open(temp_file_path, 'rb') as source:
        payload = source.read()

    if fernet_key:
        cipher = Fernet(fernet_key.encode('utf-8'))
        payload = cipher.encrypt(payload)

    with open(output_path, 'wb') as target:
        target.write(payload)
def __init__(self, *args, **kwargs):
    """Initialize the parent, then build ``self.f`` from the configured secret.

    The Fernet key is the PBKDF2-HMAC-SHA256 derivation of
    ``self.config.secret_key`` salted with ``self.config.salt``.
    """
    super().__init__(*args, **kwargs)
    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=self.config.salt,
        iterations=100000,
        backend=default_backend(),
    )
    derived = derivation.derive(self.config.secret_key)
    self.f = Fernet(base64.urlsafe_b64encode(derived))
def cipher(self):
    """Return a new Fernet instance built from ``self.key``."""
    engine = Fernet(self.key)
    return engine
def getEncryptor(password):
    """Build a Fernet keyed by PBKDF2-HMAC-SHA256 of ``password`` and ``SALT``.

    :param password: bytes, or an object whose ``encode()`` yields bytes
    :return: Fernet instance using the derived key
    """
    secret = password if isinstance(password, bytes) else bytes(password.encode())
    derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=SALT,
        iterations=100000,
        backend=default_backend(),
    )
    derived = derivation.derive(secret)
    return Fernet(base64.urlsafe_b64encode(derived))
def _initialize_engine(self, parent_class_key):
    """Store the url-safe base64 form of the key and build the Fernet engine.

    :param parent_class_key: raw key bytes to encode
    """
    encoded_key = base64.urlsafe_b64encode(parent_class_key)
    self.secret_key = encoded_key
    self.fernet = Fernet(self.secret_key)
def initialization(self, key):
    """Build ``self._cipher`` from the first 32 decoded bytes of ``key``.

    :param key: url-safe base64 encoded key material
    """
    decoded = base64.urlsafe_b64decode(key)
    truncated = base64.urlsafe_b64encode(decoded[:32])
    self._cipher = Fernet(truncated)
def make_fernet(self, key):
    """
    Given a single encryption key, returns a Fernet instance using it.

    The Fernet key is the url-safe base64 encoding of the SHA-256 digest of
    ``key``. Keys that are not already bytes are UTF-8 encoded first, so both
    text and bytes keys are accepted.
    """
    from cryptography.fernet import Fernet

    # Encode only non-bytes keys: calling ``.encode`` unconditionally fails
    # for keys that are already bytes.
    if not isinstance(key, bytes):
        key = key.encode('utf8')
    formatted_key = base64.urlsafe_b64encode(hashlib.sha256(key).digest())
    return Fernet(formatted_key)
def calculate_digest(secret, message, salt):
    """Calculate a SHA-256 HMAC digest for the given data.

    The HMAC is keyed with ``secret`` and fed ``message`` followed by
    ``salt``. All three arguments are asserted to be byte strings.
    """
    assert isinstance(secret, bytes), "%r is not a byte string." % (secret,)
    assert isinstance(message, bytes), "%r is not byte string." % (message,)
    assert isinstance(salt, bytes), "%r is not a byte string." % (salt,)
    mac = HMAC(secret, digestmod=sha256)
    for chunk in (message, salt):
        mac.update(chunk)
    return mac.digest()


# Cache the Fernet pre-shared key, since it's expensive to derive the key.
# Note: this will need to change to become a dictionary if salts are supported.
def _get_or_create_fernet_psk():
    """Gets or creates a pre-shared key to be used with the Fernet algorithm.

    The pre-shared key is cached in a global to prevent the expense of
    recalculating it.

    Uses the MAAS secret (typically /var/lib/maas/secret) to derive the key.

    :return: A pre-shared key suitable for use with the Fernet class.
    :raises MissingSharedSecret: if no shared secret is found.
    """
    global _fernet_psk
    with _fernet_lock:
        if _fernet_psk is not None:
            return _fernet_psk

        secret = get_shared_secret_from_filesystem()
        if secret is None:
            raise MissingSharedSecret("MAAS shared secret not found.")
        # Keying material is required by PBKDF2 to be a byte string.
        derivation = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            # XXX: It might be better to use the maas_id for the salt.
            # But that requires the maas_id to be known in advance by all
            # parties to the encrypted communication. The format of the
            # cached pre-shared key would also need to change.
            salt=b"",
            # XXX: an infrequently-changing variable iteration count might
            # be nice, but that would require protocol support, and
            # changing the way the PSK is cached.
            iterations=DEFAULT_ITERATION_COUNT,
            backend=default_backend()
        )
        _fernet_psk = urlsafe_b64encode(derivation.derive(secret))
        return _fernet_psk