我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用werkzeug.security.safe_str_cmp()。
def decode_cookie(cookie): ''' This decodes a cookie given by `encode_cookie`. If verification of the cookie fails, ``None`` will be implicitly returned. :param cookie: An encoded cookie. :type cookie: str ''' try: payload, digest = cookie.rsplit(u'|', 1) if hasattr(digest, 'decode'): digest = digest.decode('ascii') # pragma: no cover except ValueError: return if safe_str_cmp(_cookie_digest(payload), digest): return payload
def process_zip_for_index(): if not safe_str_cmp(request.form.get('secret', ''), current_app.config['SEARCH_INDEX_SECRET']): abort(403) index_path = get_index_path() config = json.load(request.files['config']) archive = release_file(request, 'archive') try: shutil.rmtree(index_path) except (OSError, IOError): pass def generate(): for event in index_tree(config, from_zip=archive, index_path=index_path): yield '%s\n' % event.encode('utf-8') return Response(generate(), direct_passthrough=True, headers={'X-Accel-Buffering': 'no'}, mimetype='text/plain')
def delete_index(): if not safe_str_cmp(request.form.get('secret', ''), current_app.config['SEARCH_INDEX_SECRET']): abort(403) index_path = get_index_path() if index_path is not None: try: shutil.rmtree(index_path) except (OSError, IOError): pass deleted = not os.path.isdir(index_path) if deleted: return Response(status=204) else: return Response(status=500)
def reset_password_token_status(token): """Returns the expired status, invalid status, and user of a password reset token. For example:: expired, invalid, user, data = reset_password_token_status('...') :param token: The password reset token """ expired, invalid, user, data = get_token_status(token, 'reset', 'RESET_PASSWORD', return_data=True) if not invalid: if user.password: password_hash = md5(user.password) if not safe_str_cmp(password_hash, data[1]): invalid = True return expired, invalid, user
def decode_jwt(encoded_token, secret, algorithm, identity_claim_key, user_claims_key, csrf_value=None): """ Decodes an encoded JWT :param encoded_token: The encoded JWT string to decode :param secret: Secret key used to encode the JWT :param algorithm: Algorithm used to encode the JWT :param identity_claim_key: expected key that contains the identity :param user_claims_key: expected key that contains the user claims :param csrf_value: Expected double submit csrf value :return: Dictionary containing contents of the JWT """ # This call verifies the ext, iat, and nbf claims data = jwt.decode(encoded_token, secret, algorithms=[algorithm]) # Make sure that any custom claims we expect in the token are present if 'jti' not in data: raise JWTDecodeError("Missing claim: jti") if identity_claim_key not in data: raise JWTDecodeError("Missing claim: {}".format(identity_claim_key)) if 'type' not in data or data['type'] not in ('refresh', 'access'): raise JWTDecodeError("Missing or invalid claim: type") if data['type'] == 'access': if 'fresh' not in data: raise JWTDecodeError("Missing claim: fresh") if user_claims_key not in data: data[user_claims_key] = {} if csrf_value: if 'csrf' not in data: raise JWTDecodeError("Missing claim: csrf") if not safe_str_cmp(data['csrf'], csrf_value): raise CSRFError("CSRF double submit tokens do not match") return data
def authenticate(username, password): user = username_table.get(username, None) if user and safe_str_cmp(user.password.encode('utf-8'), password.encode('utf-8')): return user
def authenticate(username, password): user = username_table.get(username, None) if user and safe_str_cmp(user.password, password): return user
def authenticate(username, password): user = User.find_by_username(username) if user and safe_str_cmp(user.password, password): return user
def authenticate(username, password): user = UserModel.find_by_username(username) if user and safe_str_cmp(user.password, password): return user
def check_password_hash(self, pw_hash, password): '''Tests a password hash against a candidate password. The candidate password is first hashed and then subsequently compared in constant time to the existing hash. This will either return `True` or `False`. Example usage of :class:`check_password_hash` would look something like this:: pw_hash = bcrypt.generate_password_hash('secret', 10) bcrypt.check_password_hash(pw_hash, 'secret') # returns True :param pw_hash: The hash to be compared against. :param password: The password to compare. ''' # Python 3 unicode strings must be encoded as bytes before hashing. if PY3 and isinstance(pw_hash, str): pw_hash = bytes(pw_hash, 'utf-8') if PY3 and isinstance(password, str): password = bytes(password, 'utf-8') if not PY3 and isinstance(pw_hash, unicode): pw_hash = pw_hash.encode('utf-8') if not PY3 and isinstance(password, unicode): password = password.encode('utf-8') return safe_str_cmp(bcrypt.hashpw(password, pw_hash), pw_hash)
def authenticate(username, password): """ Authenticate a username/password - this sample routine simply checks the username/password against a hard-coded table, a real-world implementation would authenticate users against a database or external service. """ user = username_table.get(username, None) if user and safe_str_cmp(user.password.encode('utf-8'), password.encode('utf-8')): return user
def update_index(): if not safe_str_cmp(request.form.get('secret', ''), current_app.config['SEARCH_INDEX_SECRET']): abort(403) index_path = get_index_path() put_index(index_path, request.files['archive']) return jsonify(okay=True)
def _token_loader(token): try: data = _security.remember_token_serializer.loads(token, max_age=_security.token_max_age) user = _security.datastore.find_user(id=data[0]) if user and safe_str_cmp(md5(user.password), data[1]): return user except: pass return _security.login_manager.anonymous_user()
def is_valid_password(self, password): """ Check if given password is valid. """ return safe_str_cmp( bcrypt.hashpw(password.encode('utf-8'), self.password_hash.encode('utf-8')), self.password_hash )
def unserialize(cls, string, secret_key): """Load the secure cookie from a serialized string. :param string: the cookie value to unserialize. :param secret_key: the secret key used to serialize the cookie. :return: a new :class:`SecureCookie`. """ if isinstance(string, text_type): string = string.encode('utf-8', 'replace') if isinstance(secret_key, text_type): secret_key = secret_key.encode('utf-8', 'replace') try: base64_hash, data = string.split(b'?', 1) except (ValueError, IndexError): items = () else: items = {} mac = hmac(secret_key, None, cls.hash_method) for item in data.split(b'&'): mac.update(b'|' + item) if b'=' not in item: items = None break key, value = item.split(b'=', 1) # try to make the key a string key = url_unquote_plus(key.decode('ascii')) try: key = to_native(key) except UnicodeError: pass items[key] = value # no parsing error and the mac looks okay, we can now # sercurely unpickle our cookie. try: client_hash = base64.b64decode(base64_hash) except TypeError: items = client_hash = None if items is not None and safe_str_cmp(client_hash, mac.digest()): try: for key, value in iteritems(items): items[key] = cls.unquote(value) except UnquoteError: items = () else: if '_expires' in items: if time() > items['_expires']: items = () else: del items['_expires'] else: items = () return cls(items, secret_key, False)
def validate_csrf(data, secret_key=None, time_limit=None, token_key=None): """Check if the given data is a valid CSRF token. This compares the given signed token to the one stored in the session. :param data: The signed CSRF token to be checked. :param secret_key: Used to securely sign the token. Default is ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``. :param time_limit: Number of seconds that the token is valid. Default is ``WTF_CSRF_TIME_LIMIT`` or 3600 seconds (60 minutes). :param token_key: Key where token is stored in session for comparision. Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``. :raises ValidationError: Contains the reason that validation failed. .. versionchanged:: 0.14 Raises ``ValidationError`` with a specific error message rather than returning ``True`` or ``False``. """ secret_key = _get_config( secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key, message='A secret key is required to use CSRF.' ) field_name = _get_config( token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token', message='A field name is required to use CSRF.' ) time_limit = _get_config( time_limit, 'WTF_CSRF_TIME_LIMIT', 3600, required=False ) if not data: raise ValidationError('The CSRF token is missing.') if field_name not in session: raise ValidationError('The CSRF session token is missing.') s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token') try: token = s.loads(data, max_age=time_limit) except SignatureExpired: raise ValidationError('The CSRF token has expired.') except BadData: raise ValidationError('The CSRF token is invalid.') if not safe_str_cmp(session[field_name], token): raise ValidationError('The CSRF tokens do not match.')
def unserialize(cls, string, secret_key): """Load the secure cookie from a serialized string. :param string: the cookie value to unserialize. :param secret_key: the secret key used to serialize the cookie. :return: a new :class:`SecureCookie`. """ if isinstance(string, text_type): string = string.encode('utf-8', 'replace') if isinstance(secret_key, text_type): secret_key = secret_key.encode('utf-8', 'replace') try: base64_hash, data = string.split(b'?', 1) except (ValueError, IndexError): items = () else: items = {} mac = hmac(secret_key, None, cls.hash_method) for item in data.split(b'&'): mac.update(b'|' + item) if not b'=' in item: items = None break key, value = item.split(b'=', 1) # try to make the key a string key = url_unquote_plus(key.decode('ascii')) try: key = to_native(key) except UnicodeError: pass items[key] = value # no parsing error and the mac looks okay, we can now # sercurely unpickle our cookie. try: client_hash = base64.b64decode(base64_hash) except TypeError: items = client_hash = None if items is not None and safe_str_cmp(client_hash, mac.digest()): try: for key, value in iteritems(items): items[key] = cls.unquote(value) except UnquoteError: items = () else: if '_expires' in items: if time() > items['_expires']: items = () else: del items['_expires'] else: items = () return cls(items, secret_key, False)
def validate_csrf(data, secret_key=None, time_limit=None): """Check if the given data is a valid csrf token. :param data: The csrf token value to be checked. :param secret_key: A secret key for mixing in the token, default is Flask.secret_key. :param time_limit: Check if the csrf token is expired. default is True. """ if not data or '##' not in data: return False expires, hmac_csrf = data.split('##', 1) if time_limit is None: time_limit = current_app.config.get('WTF_CSRF_TIME_LIMIT', 3600) if time_limit: try: expires = float(expires) except: return False now = time.time() if now > expires: return False if not secret_key: secret_key = current_app.config.get( 'WTF_CSRF_SECRET_KEY', current_app.secret_key ) if 'csrf_token' not in session: return False csrf_build = '%s%s' % (session['csrf_token'], expires) hmac_compare = hmac.new( to_bytes(secret_key), to_bytes(csrf_build), digestmod=hashlib.sha1 ).hexdigest() return safe_str_cmp(hmac_compare, hmac_csrf)
def validate_csrf(data, secret_key=None, time_limit=None): """Check if the given data is a valid csrf token. :param data: The csrf token value to be checked. :param secret_key: A secret key for mixing in the token, default is Flask.secret_key. :param time_limit: Check if the csrf token is expired. default is True. """ if not data or '##' not in data: return False try: expires, hmac_csrf = data.split('##', 1) except ValueError: return False # unpack error if time_limit is None: time_limit = current_app.config.get('WTF_CSRF_TIME_LIMIT', 3600) if time_limit: try: expires = int(expires) except ValueError: return False now = int(time.time()) if now > expires: return False if not secret_key: secret_key = current_app.config.get( 'WTF_CSRF_SECRET_KEY', current_app.secret_key ) if 'csrf_token' not in session: return False csrf_build = '%s%s' % (session['csrf_token'], expires) hmac_compare = hmac.new( to_bytes(secret_key), to_bytes(csrf_build), digestmod=hashlib.sha1 ).hexdigest() return safe_str_cmp(hmac_compare, hmac_csrf)