我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用flask.g.current_user()。
def put(self): """Change the password""" us = User.query \ .filter(User.disabled == 0) \ .filter(User.id_user == g.current_user) \ .first() abort_if_none(us, 404, 'User not found') if not check_password_hash(us.password, request.json['old_password']): return msg('Old password incorrect'), 403 us.password = request.json['password'] db.session.commit() cache.blacklisted_tokens.append(request.headers['Authorization']) return msg('success!')
def check_auth(): session = None user = None token = request.headers.get('X-Auth-Token') if token: session = Session.query.filter_by(token=token).first() if not session: return make_error_response('Invalid session token', 401) user = session.user else: auth = request.authorization if auth: user = User.find_by_email_or_username(auth.username) if not (user and user.password == auth.password): return make_error_response('Invalid username/password combination', 401) g.current_session = session g.current_user = user
def verify_password(email_or_token, password): """Verify user using email and address or token, otherwise, set as anonymous user. """ if email_or_token == '': g.current_user = AnonymousUser() return True if password == '': g.current_user = User.verify_auth_token(email_or_token) g.token_used = True return g.current_user is not None user = User.query.filter_by(email=email_or_token).first() if not user: return False g.current_user = user g.token_used = False return user.verify_password(password)
def verify_password(username, password): authorization = request.headers.get('Authorization', '').split(' ') bearer = authorization[1] if len(authorization) > 1 else '' if bearer: g.current_user = User.verify_access_token(bearer) g.token_used = True return g.current_user is not None if username == '': g.current_user = AnonymousUser() return True user = User(username=username) if not user or not user.user_id: return False g.current_user = user g.token_used = False return user.verify_password(password)
def verify_token(token, add_to_session=False): """Token verification callback.""" if add_to_session: # clear the session in case auth fails if 'nickname' in session: del session['nickname'] user = User.query.filter_by(token=token).first() if user is None: return False if user.ping(): from .events import push_model push_model(user) db.session.add(user) db.session.commit() g.current_user = user if add_to_session: session['nickname'] = user.nickname return True
def delete(self, org_id, location_id, role_id, user_id, timeclock_id): """ deletes a timeclock record """ timeclock = Timeclock.query.get_or_404(timeclock_id) user = User.query.get_or_404(user_id) original_start = timeclock.start original_stop = timeclock.stop try: db.session.delete(timeclock) db.session.commit() except Exception as exception: db.session.rollback() current_app.logger.error(str(exception)) abort(400) if timeclock.user_id != g.current_user.id: alert_timeclock_change(None, org_id, location_id, role_id, original_start, original_stop, user, g.current_user) g.current_user.track_event("timeclock_deleted") return {}, 204
def login(): username = request.json.get('username') password = request.json.get('password') if username and password: user = User.find_by_identity(username) if user and user.authenticated(password): g.current_user = user session_token = user.generate_auth_token(3600) user.session_token = session_token db.session.commit() response = user.to_json() return jsonify({'response': response}), 200 else: return jsonify({'response': {'message': 'Username or password is wrong'}}), 404 return jsonify({'response': {'message': 'Password and username not provided'}}), 500
def get_blender_id_oauth_token() -> str: """Returns the Blender ID auth token, or an empty string if there is none.""" from flask import request token = session.get('blender_id_oauth_token') if token: if isinstance(token, (tuple, list)): # In a past version of Pillar we accidentally stored tuples in the session. # Such sessions should be actively fixed. # TODO(anyone, after 2017-12-01): refactor this if-block so that it just converts # the token value to a string and use that instead. token = token[0] session['blender_id_oauth_token'] = token return token if request.authorization and request.authorization.username: return request.authorization.username if current_user.is_authenticated and current_user.id: return current_user.id return ''
def verify_password(email_or_token, password): if email_or_token == '': g.current_user = AnonymousUser() return True if password == '': g.current_user = User.verify_auth_token(email_or_token) g.token_used = True return g.current_user is not None user = User.query.filter_by(email=email_or_token).first() if not user: return False g.current_user = user g.token_used = False return user.verify_password(password) # 401
def verify_password(email_or_token,password): if email_or_token == '': g.current_user = AnonymousUser() return True if password == '': g.current_user = User.verify_auth_token(email_or_token) g.token_used = True return g.current_user is not None user = User.query.filter_by(email=email_or_token).first() if not user: return False g.current_user = user g.token_used = False return user.verify_password(password)
def verify_token(): """ Verify if the token is valid, not expired and not blacklisted """ if 'Authorization' in request.headers: if request.headers['Authorization'] in cache.blacklisted_tokens: abort(403, 'Error: invalid token') try: payload = jwt.decode(request.headers['Authorization'], config.SECRET_KEY) g.current_user = payload['id_user'] except jwt.ExpiredSignatureError: abort(403, 'Error: token expired') except jwt.DecodeError: abort(403, 'Error: invalid token')
def get_user_sessions(): return g.current_user.sessions
def delete_user_sessions(): g.current_user.sessions.delete() db.session.commit() return ('', 204)
def delete_session(id): session = Session.query.get(id) if not (session and session.user == g.current_user): return make_error_response('Session not found', 404) db.session.delete(session) db.session.commit() return ('', 204)
def update_user(data): user = g.current_user if data['password']: user.change_password(data['password']) db.session.commit() return user
def inject_context(context): ctx = {} if context: ctx.update(context) ctx['current_user'] = g.current_user return ctx
def new_post(): post = Post.from_json(request.json) post.author = g.current_user db.session.add(post) db.session.commit() return jsonify(post.to_json()), 201, \ {'Location': url_for('api.get_post', id=post.id, _external=True)}
def edit_post(id): post = Post.query.get_or_404(id) if g.current_user != post.author and \ not g.current_user.can(Permission.ADMINISTER): return forbidden('Insufficient permissions') post.body = request.json.get('body', post.body) db.session.add(post) return jsonify(post.to_json())
def new_post_comment(id): post = Post.query.get_or_404(id) comment = Comment.from_json(request.json) comment.author = g.current_user comment.post = post db.session.add(comment) db.session.commit() return jsonify(comment.to_json()), 201, \ {'Location': url_for('api.get_comment', id=comment.id, _external=True)}
def verify_password(email_or_token, password): if email_or_token == '': g.current_user = AnonymousUser() return True if password == '': g.current_user = User.verify_auth_token(email_or_token) g.token_used = True return g.current_user is not None user = User.query.filter_by(email=email_or_token).first() if not user: return False g.current_user = user g.token_used = False return user.verify_password(password)
def before_request(): if not g.current_user.is_anonymous and \ not g.current_user.confirmed: return forbidden('Unconfirmed account')
def permission_required(permission): """Decorator for specified permission verification. """ def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): if not g.current_user.can(permission): abort(403) return f(*args, **kwargs) return decorated_function return decorator
def new_post(): post = Post.from_json(request.json) post.author = g.current_user db.session.add(post) db.session.commit() return jsonify(post.to_json()), 201, {'Location': url_for('api.get_post',id=post.id, _external=True)} # put??
def edit_post(id): post = Post.query.get_or_404(id) if g.current_user != post.author and \ not g.current_user.operation(Permission.ADMINISTER): return forbidden('Insufficient permissions') post.title = request.json.get('title', post.title) post.body = request.json.get('body', post.body) db.session.add(post) return jsonify(post.to_json())
def new_post_comment(id): post = Post.query.get_or_404(id) comment = Comment.from_json(request.json) comment.author = g.current_user comment.post = post db.session.add(comment) db.session.commit() return jsonify(comment.to_json()), 201, \ {'Location': url_for('api.get_comment', id=comment.id,_external=True)}
def before_request(): if not g.current_user.is_authenticated: return forbidden('Unconfirmed account')
def before_pre_request(): if request.path in ['/auth/login', '/oauth2/welcome']: return token = request.headers.get('TOKEN') if not token: return jsonify('Authorization error'), 403 gl = gitlab.Gitlab( 'http://gitlab.onenet.com', oauth_token=token, api_version='4') gl.auth() g.current_user = gl.user g.gl = gl
def get(self): return g.current_user.attributes
def login_required(f) : @wraps(f) def decorated(*args,**kwargs) : token = request.headers.get('token') if token is not None : g.current_user = User.verify_auth_token(token) return f(*args,**kwargs) return jsonify("login first!") , 401 return decorated
def forbidden_error(): return forbidden('unconfirmed account') # uncomment to apply auth.login_required for each view in the blueprint # #@webhook.before_request #@auth.login_required #def before_request(): # if not g.current_user.is_anonymous and \ # not g.current_user.confirmed: # return forbidden('Unconfirmed account')
def get_token(): if g.current_user.is_anonymous or g.token_used: return unauthorized('Invalid credentials') return jsonify({'token': g.current_user.generate_auth_token( expiration=3600), 'expiration': 3600})
def new_post(): post = Post.from_json(request.json) post.author = g.current_user db.session.add(post) db.session.commit() return jsonify(post.to_json()), 201, \ {'Location': url_for('api.get_post', id=post.id)}
def edit_post(id): post = Post.query.get_or_404(id) if g.current_user != post.author and \ not g.current_user.can(Permission.ADMIN): return forbidden('Insufficient permissions') post.body = request.json.get('body', post.body) db.session.add(post) db.session.commit() return jsonify(post.to_json())
def new_post_comment(id): post = Post.query.get_or_404(id) comment = Comment.from_json(request.json) comment.author = g.current_user comment.post = post db.session.add(comment) db.session.commit() return jsonify(comment.to_json()), 201, \ {'Location': url_for('api.get_comment', id=comment.id)}
def verify_password(email_or_token, password): if email_or_token == '': return False if password == '': g.current_user = User.verify_auth_token(email_or_token) g.token_used = True return g.current_user is not None user = User.query.filter_by(email=email_or_token).first() if not user: return False g.current_user = user g.token_used = False return user.verify_password(password)
def verify_token(token): if token in TOKENS: g.current_user = token return True return False # ?????????