Python flask.g 模块,current_user() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用 flask.g.current_user(它是挂在请求上下文对象 g 上的属性,而不是可调用的函数)

项目:IntegraTI-API    作者:discentes-imd    | 项目源码 | 文件源码
def put(self):
        """Change the current user's password.

        Looks up the enabled user whose id equals ``g.current_user``, checks
        the supplied ``old_password`` against the stored hash, stores the new
        ``password`` and adds the request's ``Authorization`` value to the
        blacklisted tokens.
        """
        enabled_users = User.query.filter(User.disabled == 0)
        account = enabled_users.filter(User.id_user == g.current_user).first()
        abort_if_none(account, 404, 'User not found')

        old_password_ok = check_password_hash(
            account.password, request.json['old_password'])
        if not old_password_ok:
            return msg('Old password incorrect'), 403

        account.password = request.json['password']
        db.session.commit()
        # The token that made this request is blacklisted from here on.
        used_token = request.headers['Authorization']
        cache.blacklisted_tokens.append(used_token)

        return msg('success!')
项目:flask-api-skeleton    作者:ianunruh    | 项目源码 | 文件源码
def check_auth():
    """Resolve the session and user for the current request.

    A non-empty ``X-Auth-Token`` header is looked up as a session token;
    otherwise HTTP authorization credentials (if any) are checked.  On
    failure an error response with status 401 is returned.  On success, or
    when no credentials were sent at all, ``g.current_session`` and
    ``g.current_user`` are set (possibly to ``None``) and nothing is returned.
    """
    active_session = None
    account = None

    token = request.headers.get('X-Auth-Token')
    if not token:
        credentials = request.authorization
        if credentials:
            account = User.find_by_email_or_username(credentials.username)
            if not account or not (account.password == credentials.password):
                return make_error_response(
                    'Invalid username/password combination', 401)
    else:
        active_session = Session.query.filter_by(token=token).first()
        if not active_session:
            return make_error_response('Invalid session token', 401)

        account = active_session.user

    g.current_session = active_session
    g.current_user = account
项目:myproject    作者:dengliangshi    | 项目源码 | 文件源码
def verify_password(email_or_token, password):
    """Authenticate by token or by email and password.

    An empty ``email_or_token`` is accepted as an anonymous user.  An empty
    ``password`` means ``email_or_token`` is treated as an auth token.
    Otherwise the user is looked up by email and the password is verified.
    The resolved user is stored on ``g.current_user``.
    """
    if email_or_token == '':
        g.current_user = AnonymousUser()
        return True

    if password == '':
        # Token authentication.
        token_owner = User.verify_auth_token(email_or_token)
        g.current_user = token_owner
        g.token_used = True
        return token_owner is not None

    account = User.query.filter_by(email=email_or_token).first()
    if account:
        g.current_user = account
        g.token_used = False
        return account.verify_password(password)
    return False
项目:Plog    作者:thundernet8    | 项目源码 | 文件源码
def verify_password(username, password):
    """Authenticate via the Authorization header token or username/password.

    If the ``Authorization`` header has a second space-separated part, it is
    verified as an access token.  Otherwise an empty ``username`` is accepted
    as an anonymous user, and any other username is loaded and its password
    verified.  The resolved user is stored on ``g.current_user``.
    """
    header_parts = request.headers.get('Authorization', '').split(' ')
    bearer = ''
    if len(header_parts) > 1:
        bearer = header_parts[1]

    if bearer:
        token_owner = User.verify_access_token(bearer)
        g.current_user = token_owner
        g.token_used = True
        return token_owner is not None

    if username == '':
        g.current_user = AnonymousUser()
        return True

    account = User(username=username)
    if account and account.user_id:
        g.current_user = account
        g.token_used = False
        return account.verify_password(password)
    return False
项目:flack    作者:miguelgrinberg    | 项目源码 | 文件源码
def verify_token(token, add_to_session=False):
    """Verify ``token`` and record the matching user on ``g.current_user``.

    When ``add_to_session`` is true, the session's ``nickname`` entry is
    cleared up front and set again only after the token has been accepted.
    Returns ``True`` if a user owns the token, ``False`` otherwise.
    """
    # Drop any stale nickname first so a failed check leaves none behind.
    if add_to_session and 'nickname' in session:
        del session['nickname']

    owner = User.query.filter_by(token=token).first()
    if owner is None:
        return False

    if owner.ping():
        from .events import push_model
        push_model(owner)

    db.session.add(owner)
    db.session.commit()
    g.current_user = owner

    if add_to_session:
        session['nickname'] = owner.nickname
    return True
项目:suite    作者:Staffjoy    | 项目源码 | 文件源码
def delete(self, org_id, location_id, role_id, user_id, timeclock_id):
        """
        Deletes a timeclock record.

        Responds with 404 if the timeclock or the user does not exist, and
        with 400 if the deletion cannot be committed.  When the deleted
        timeclock belonged to someone other than the current user, a
        timeclock-change alert is sent.  Returns an empty body with 204.
        """

        timeclock = Timeclock.query.get_or_404(timeclock_id)
        user = User.query.get_or_404(user_id)

        # Snapshot everything needed after the delete while the row still
        # exists, so nothing below reads from a deleted instance.
        original_start = timeclock.start
        original_stop = timeclock.stop
        timeclock_owner_id = timeclock.user_id

        try:
            db.session.delete(timeclock)
            db.session.commit()
        except Exception as exception:
            db.session.rollback()
            current_app.logger.error(str(exception))
            abort(400)

        if timeclock_owner_id != g.current_user.id:
            alert_timeclock_change(None, org_id, location_id, role_id,
                                   original_start, original_stop, user,
                                   g.current_user)

        g.current_user.track_event("timeclock_deleted")
        return {}, 204
项目:flask-api-boilerplate    作者:mikaelm1    | 项目源码 | 文件源码
def login():
    """Log a user in from the JSON ``username`` and ``password`` fields.

    On success the user is stored on ``g.current_user``, a fresh auth token
    (generated with argument 3600) is saved as the user's session token and
    the user's JSON is returned with status 200.  Wrong credentials yield
    404; missing credentials yield 500.
    """
    username = request.json.get('username')
    password = request.json.get('password')

    if not (username and password):
        missing = {'message': 'Password and username not provided'}
        return jsonify({'response': missing}), 500

    account = User.find_by_identity(username)
    if not (account and account.authenticated(password)):
        wrong = {'message': 'Username or password is wrong'}
        return jsonify({'response': wrong}), 404

    g.current_user = account
    account.session_token = account.generate_auth_token(3600)
    db.session.commit()
    return jsonify({'response': account.to_json()}), 200
项目:pillar    作者:armadillica    | 项目源码 | 文件源码
def get_blender_id_oauth_token() -> str:
    """Return the Blender ID auth token, or an empty string if there is none.

    Sources are tried in order: the ``blender_id_oauth_token`` session entry,
    the username of the request's HTTP authorization, and the id of the
    authenticated ``current_user``.
    """

    from flask import request

    stored = session.get('blender_id_oauth_token')
    if stored:
        # A past version of Pillar accidentally stored tuples in the session;
        # repair such sessions by keeping only the first element.
        # TODO(anyone, after 2017-12-01): simplify this to a plain string
        # conversion of the stored value.
        if isinstance(stored, (tuple, list)):
            stored = stored[0]
            session['blender_id_oauth_token'] = stored
        return stored

    basic_auth = request.authorization
    if basic_auth and basic_auth.username:
        return basic_auth.username

    if current_user.is_authenticated and current_user.id:
        return current_user.id

    return ''
项目:flask-blog    作者:zhuwei05    | 项目源码 | 文件源码
def verify_password(email_or_token, password):
    """Check credentials and store the resulting user on ``g.current_user``.

    Empty ``email_or_token``: anonymous user, always accepted.
    Empty ``password``: ``email_or_token`` is verified as an auth token.
    Anything else: email lookup followed by a password check.
    """
    if email_or_token == '':
        g.current_user = AnonymousUser()
        return True
    elif password == '':
        g.current_user = User.verify_auth_token(email_or_token)
        g.token_used = True
        token_accepted = g.current_user is not None
        return token_accepted

    by_email = User.query.filter_by(email=email_or_token)
    found = by_email.first()
    if not found:
        return False

    g.current_user = found
    g.token_used = False
    return found.verify_password(password)

# 401
项目:MyFlasky    作者:aliasxu    | 项目源码 | 文件源码
def verify_password(email_or_token, password):
    """Authenticate a request and remember the user on ``g``.

    Returns ``True`` for an anonymous request (empty ``email_or_token``),
    for a token that resolves to a user (empty ``password``), or the result
    of the password check for an existing email; ``False`` for an unknown
    email.
    """
    anonymous = email_or_token == ''
    if anonymous:
        g.current_user = AnonymousUser()
        return True

    token_only = password == ''
    if token_only:
        resolved = User.verify_auth_token(email_or_token)
        g.current_user = resolved
        g.token_used = True
        return resolved is not None

    member = User.query.filter_by(email=email_or_token).first()
    if member:
        g.current_user = member
        g.token_used = False
        return member.verify_password(password)

    return False
项目:IntegraTI-API    作者:discentes-imd    | 项目源码 | 文件源码
def verify_token():
    """
    Verify that the request's token is valid, not expired and not blacklisted.

    Does nothing when the request has no ``Authorization`` header.  Otherwise
    the header value is decoded as a JWT with ``config.SECRET_KEY`` and the
    payload's ``id_user`` is stored on ``g.current_user``.  Aborts with 403
    if the token is blacklisted, expired, fails validation, or carries no
    ``id_user`` claim.
    """
    if 'Authorization' not in request.headers:
        return

    token = request.headers['Authorization']
    if token in cache.blacklisted_tokens:
        abort(403, 'Error: invalid token')

    try:
        payload = jwt.decode(token, config.SECRET_KEY)
    except jwt.ExpiredSignatureError:
        abort(403, 'Error: token expired')
    except jwt.InvalidTokenError:
        # Base class of PyJWT's validation errors (DecodeError included), so
        # every rejected token becomes a 403 instead of an unhandled error.
        abort(403, 'Error: invalid token')

    # A correctly signed token without the expected claim is also invalid.
    if 'id_user' not in payload:
        abort(403, 'Error: invalid token')
    g.current_user = payload['id_user']
项目:flask-api-skeleton    作者:ianunruh    | 项目源码 | 文件源码
def get_user_sessions():
    """Return the ``sessions`` of the current user."""
    owner = g.current_user
    return owner.sessions
项目:flask-api-skeleton    作者:ianunruh    | 项目源码 | 文件源码
def delete_user_sessions():
    """Delete all sessions of the current user and reply with an empty 204."""
    own_sessions = g.current_user.sessions
    own_sessions.delete()
    db.session.commit()

    empty_body, no_content = '', 204
    return (empty_body, no_content)
项目:flask-api-skeleton    作者:ianunruh    | 项目源码 | 文件源码
def delete_session(id):
    """Delete the session with the given id if it belongs to the current user.

    Returns a 404 error response when the session does not exist or belongs
    to another user; otherwise deletes it and returns an empty 204 response.
    """
    target = Session.query.get(id)
    if not target or not (target.user == g.current_user):
        return make_error_response('Session not found', 404)

    db.session.delete(target)
    db.session.commit()

    return ('', 204)
项目:flask-api-skeleton    作者:ianunruh    | 项目源码 | 文件源码
def update_user(data):
    """Apply ``data`` to the current user and return that user.

    The password is changed only when ``data['password']`` is truthy; the
    database session is committed either way.
    """
    account = g.current_user

    new_password = data['password']
    if new_password:
        account.change_password(new_password)

    db.session.commit()

    return account
项目:flask-api-skeleton    作者:ianunruh    | 项目源码 | 文件源码
def inject_context(context):
    """Return a new dict with ``context``'s entries plus ``current_user``.

    The ``current_user`` key always holds ``g.current_user``, replacing any
    value of that name supplied in ``context``.
    """
    merged = dict(context) if context else {}
    merged['current_user'] = g.current_user
    return merged
项目:circleci-demo-python-flask    作者:CircleCI-Public    | 项目源码 | 文件源码
def new_post():
    """Create a post from the request JSON, authored by the current user.

    Responds with the post's JSON, status 201 and a ``Location`` header
    holding the external URL of the new post.
    """
    created = Post.from_json(request.json)
    created.author = g.current_user
    db.session.add(created)
    db.session.commit()

    body = jsonify(created.to_json())
    headers = {
        'Location': url_for('api.get_post', id=created.id, _external=True),
    }
    return body, 201, headers
项目:circleci-demo-python-flask    作者:CircleCI-Public    | 项目源码 | 文件源码
def edit_post(id):
    """Update the body of post ``id`` from the request JSON.

    Only the post's author or a user with the ``ADMINISTER`` permission may
    edit; anyone else gets a forbidden response.  Responds with 404 when the
    post does not exist.
    """
    post = Post.query.get_or_404(id)

    editor = g.current_user
    not_author = editor != post.author
    if not_author and not editor.can(Permission.ADMINISTER):
        return forbidden('Insufficient permissions')

    # Keep the current body when the request does not supply one.
    new_body = request.json.get('body', post.body)
    post.body = new_body
    db.session.add(post)
    return jsonify(post.to_json())
项目:circleci-demo-python-flask    作者:CircleCI-Public    | 项目源码 | 文件源码
def new_post_comment(id):
    """Add a comment by the current user to post ``id``.

    Responds with 404 when the post does not exist; otherwise returns the
    comment's JSON, status 201 and a ``Location`` header with the comment's
    external URL.
    """
    parent = Post.query.get_or_404(id)

    remark = Comment.from_json(request.json)
    remark.author = g.current_user
    remark.post = parent
    db.session.add(remark)
    db.session.commit()

    body = jsonify(remark.to_json())
    location = url_for('api.get_comment', id=remark.id, _external=True)
    return body, 201, {'Location': location}
项目:circleci-demo-python-flask    作者:CircleCI-Public    | 项目源码 | 文件源码
def verify_password(email_or_token, password):
    """Validate credentials, storing the user on ``g.current_user``.

    * empty ``email_or_token``: anonymous access is granted;
    * empty ``password``: ``email_or_token`` is checked as an auth token;
    * otherwise: the user with that email must exist and the password must
      verify.
    """
    if email_or_token == '':
        # No credentials at all: continue as an anonymous user.
        g.current_user = AnonymousUser()
        return True

    if password == '':
        g.current_user = User.verify_auth_token(email_or_token)
        g.token_used = True
        return g.current_user is not None

    candidate = User.query.filter_by(email=email_or_token).first()
    if candidate:
        g.current_user = candidate
        g.token_used = False
        return candidate.verify_password(password)
    return False
项目:circleci-demo-python-flask    作者:CircleCI-Public    | 项目源码 | 文件源码
def before_request():
    """Return a forbidden response for non-anonymous, unconfirmed users."""
    visitor = g.current_user
    if visitor.is_anonymous or visitor.confirmed:
        return None
    return forbidden('Unconfirmed account')
项目:myproject    作者:dengliangshi    | 项目源码 | 文件源码
def permission_required(permission):
    """Build a decorator that aborts with 403 unless the current user has
    ``permission``.
    """
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            allowed = g.current_user.can(permission)
            if not allowed:
                abort(403)
            return f(*args, **kwargs)
        return wrapper
    return decorator
项目:Simpleblog    作者:Blackyukun    | 项目源码 | 文件源码
def new_post():
    """Store a new post built from the request JSON for the current user.

    Returns the post's JSON with status 201 and a ``Location`` header that
    holds the post's external URL.
    """
    article = Post.from_json(request.json)
    article.author = g.current_user
    db.session.add(article)
    db.session.commit()

    payload = jsonify(article.to_json())
    location = url_for('api.get_post', id=article.id, _external=True)
    return payload, 201, {'Location': location}

# put??
项目:Simpleblog    作者:Blackyukun    | 项目源码 | 文件源码
def edit_post(id):
    """Update the title and body of post ``id`` from the request JSON.

    The caller must be the post's author or pass the ``ADMINISTER``
    operation check; otherwise a forbidden response is returned.  Fields
    absent from the request keep their current value.
    """
    post = Post.query.get_or_404(id)

    caller = g.current_user
    if caller != post.author:
        if not caller.operation(Permission.ADMINISTER):
            return forbidden('Insufficient permissions')

    changes = request.json
    post.title = changes.get('title', post.title)
    post.body = changes.get('body', post.body)
    db.session.add(post)
    return jsonify(post.to_json())
项目:Simpleblog    作者:Blackyukun    | 项目源码 | 文件源码
def new_post_comment(id):
    """Create a comment on post ``id`` authored by the current user.

    Returns the comment's JSON, status 201 and a ``Location`` header with
    the comment's external URL; responds with 404 if the post is missing.
    """
    target_post = Post.query.get_or_404(id)

    note = Comment.from_json(request.json)
    note.author = g.current_user
    note.post = target_post
    db.session.add(note)
    db.session.commit()

    response_body = jsonify(note.to_json())
    headers = {
        'Location': url_for('api.get_comment', id=note.id, _external=True),
    }
    return response_body, 201, headers
项目:Simpleblog    作者:Blackyukun    | 项目源码 | 文件源码
def verify_password(email_or_token, password):
    """Accept or reject the supplied credentials.

    The outcome is recorded on ``g``: ``g.current_user`` receives the
    anonymous user, the token's user, or the user found by email, and
    ``g.token_used`` tells whether a token was used.
    """
    if email_or_token == '':
        g.current_user = AnonymousUser()
        return True

    if password == '':
        via_token = User.verify_auth_token(email_or_token)
        g.current_user = via_token
        g.token_used = True
        return via_token is not None

    lookup = User.query.filter_by(email=email_or_token)
    person = lookup.first()
    if not person:
        return False

    g.current_user = person
    g.token_used = False
    password_ok = person.verify_password(password)
    return password_ok
项目:Simpleblog    作者:Blackyukun    | 项目源码 | 文件源码
def before_request():
    """Return a forbidden response unless the current user is authenticated."""
    if g.current_user.is_authenticated:
        return None
    return forbidden('Unconfirmed account')
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def new_post():
    """Persist a post parsed from the request JSON.

    The current user becomes the author.  The reply carries the post's JSON,
    status 201 and the post's external URL in the ``Location`` header.
    """
    entry = Post.from_json(request.json)
    entry.author = g.current_user

    db.session.add(entry)
    db.session.commit()

    serialized = jsonify(entry.to_json())
    entry_url = url_for('api.get_post', id=entry.id, _external=True)
    return serialized, 201, {'Location': entry_url}
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def edit_post(id):
    """Replace the body of post ``id`` with the one in the request JSON.

    Returns a forbidden response unless the current user wrote the post or
    holds ``Permission.ADMINISTER``.  Without a ``body`` in the request the
    existing body is kept.
    """
    post = Post.query.get_or_404(id)

    user = g.current_user
    is_foreign_post = user != post.author
    if is_foreign_post and not user.can(Permission.ADMINISTER):
        return forbidden('Insufficient permissions')

    post.body = request.json.get('body', post.body)
    db.session.add(post)

    updated = post.to_json()
    return jsonify(updated)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def new_post_comment(id):
    """Attach a new comment from the request JSON to post ``id``.

    The current user is recorded as the comment's author.  Responds with the
    comment's JSON, status 201 and its external URL as ``Location``.
    """
    commented_post = Post.query.get_or_404(id)

    reply = Comment.from_json(request.json)
    reply.author = g.current_user
    reply.post = commented_post

    db.session.add(reply)
    db.session.commit()

    serialized = jsonify(reply.to_json())
    reply_url = url_for('api.get_comment', id=reply.id, _external=True)
    return serialized, 201, {'Location': reply_url}
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def verify_password(email_or_token, password):
    """Authenticate with a token or with an email/password pair.

    ``g.current_user`` is set to an anonymous user for an empty
    ``email_or_token``, to the token's user for an empty ``password``, or to
    the user registered under the given email.
    """
    if email_or_token == '':
        g.current_user = AnonymousUser()
        return True
    if password == '':
        g.current_user = User.verify_auth_token(email_or_token)
        g.token_used = True
        if g.current_user is None:
            return False
        return True

    registered = User.query.filter_by(email=email_or_token).first()
    if registered:
        g.current_user = registered
        g.token_used = False
        return registered.verify_password(password)
    return False
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def before_request():
    """Reject users that are neither anonymous nor confirmed."""
    user = g.current_user
    if not user.is_anonymous:
        if not user.confirmed:
            return forbidden('Unconfirmed account')
项目:BoerOPS    作者:BoerOPS    | 项目源码 | 文件源码
def before_pre_request():
    """Authenticate the request against GitLab using the ``TOKEN`` header.

    The paths ``/auth/login`` and ``/oauth2/welcome`` are let through
    untouched.  Without a token a 403 JSON response is returned; otherwise
    the authenticated GitLab client and its user are stored on ``g``.
    """
    open_paths = ['/auth/login', '/oauth2/welcome']
    if request.path in open_paths:
        return

    token = request.headers.get('TOKEN')
    if not token:
        return jsonify('Authorization error'), 403

    client = gitlab.Gitlab(
        'http://gitlab.onenet.com', oauth_token=token, api_version='4')
    client.auth()
    g.current_user = client.user
    g.gl = client
项目:BoerOPS    作者:BoerOPS    | 项目源码 | 文件源码
def get(self):
        """Return the ``attributes`` of the current user."""
        account = g.current_user
        return account.attributes
项目:MyCoin_Backend    作者:Four-Undefined    | 项目源码 | 文件源码
def login_required(f):
    """Decorator that only runs ``f`` for requests carrying a valid token.

    The token is read from the ``token`` request header and the user it
    resolves to is stored on ``g.current_user``.  A missing token, or one
    that resolves to no user, gets a 401 JSON response instead of reaching
    the wrapped view.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('token')
        if token is not None:
            # NOTE(review): presumably verify_auth_token returns None for a
            # bad or expired token; confirm against User.verify_auth_token.
            g.current_user = User.verify_auth_token(token)
            if g.current_user is not None:
                return f(*args, **kwargs)
        return jsonify("login first!"), 401
    return decorated
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def verify_password(email_or_token, password):
    """Resolve the requesting user from the supplied credentials.

    Returns ``True`` for anonymous access (empty ``email_or_token``), whether
    the token resolved to a user (empty ``password``), or the password check
    result for a known email; ``False`` when the email is unknown.
    """
    if email_or_token == '':
        g.current_user = AnonymousUser()
        return True

    if password == '':
        # With no password, the first field is treated as an auth token.
        holder = User.verify_auth_token(email_or_token)
        g.current_user = holder
        g.token_used = True
        return holder is not None

    matched = User.query.filter_by(email=email_or_token).first()
    if not matched:
        return False
    g.current_user = matched
    g.token_used = False
    verdict = matched.verify_password(password)
    return verdict
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def forbidden_error():
    """Return the forbidden response used for unconfirmed accounts."""
    response = forbidden('unconfirmed account')
    return response

# uncomment to apply auth.login_required for each view in the blueprint #
#@webhook.before_request
#@auth.login_required
#def before_request():
#    if not g.current_user.is_anonymous and \
#            not g.current_user.confirmed:
#        return forbidden('Unconfirmed account')
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def get_token():
    """Issue an auth token for the current user.

    Anonymous users, and requests that were themselves authenticated with a
    token, receive an unauthorized response.  Otherwise the reply is a JSON
    object holding the token and its expiration of 3600.
    """
    user = g.current_user
    if user.is_anonymous or g.token_used:
        return unauthorized('Invalid credentials')

    fresh_token = user.generate_auth_token(expiration=3600)
    return jsonify({'token': fresh_token, 'expiration': 3600})
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def new_post():
    """Create and commit a post from the request JSON.

    The author is the current user.  Returns the post's JSON, status 201
    and a ``Location`` header with the post's external URL.
    """
    submission = Post.from_json(request.json)
    submission.author = g.current_user
    db.session.add(submission)
    db.session.commit()

    result = jsonify(submission.to_json())
    where = url_for('api.get_post', id=submission.id, _external=True)
    return result, 201, {'Location': where}
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def edit_post(id):
    """Change the body of post ``id`` using the request JSON.

    A forbidden response is returned when the current user is neither the
    post's author nor granted ``Permission.ADMINISTER``.
    """
    post = Post.query.get_or_404(id)

    requester = g.current_user
    if requester != post.author:
        may_administer = requester.can(Permission.ADMINISTER)
        if not may_administer:
            return forbidden('Insufficient permissions')

    # A request without 'body' leaves the stored body as it is.
    post.body = request.json.get('body', post.body)
    db.session.add(post)
    return jsonify(post.to_json())
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def new_post_comment(id):
    """Create a comment for post ``id`` from the request JSON.

    The comment is authored by the current user.  The reply consists of the
    comment's JSON, status 201 and a ``Location`` header with the comment's
    external URL.
    """
    owner_post = Post.query.get_or_404(id)

    entry = Comment.from_json(request.json)
    entry.author = g.current_user
    entry.post = owner_post
    db.session.add(entry)
    db.session.commit()

    content = jsonify(entry.to_json())
    link = url_for('api.get_comment', id=entry.id, _external=True)
    return content, 201, {'Location': link}
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def verify_password(email_or_token, password):
    """Verify the credentials and populate ``g.current_user``.

    The first argument is either empty (anonymous), an auth token (when
    ``password`` is empty) or an email address checked together with
    ``password``.
    """
    if email_or_token == '':
        g.current_user = AnonymousUser()
        return True
    elif password == '':
        authenticated = User.verify_auth_token(email_or_token)
        g.current_user = authenticated
        g.token_used = True
        return authenticated is not None

    profile = User.query.filter_by(email=email_or_token).first()
    if profile:
        g.current_user = profile
        g.token_used = False
        return profile.verify_password(password)

    return False
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def before_request():
    """Return a forbidden response for an identified but unconfirmed user."""
    current = g.current_user
    needs_confirmation = not current.is_anonymous and not current.confirmed
    if needs_confirmation:
        return forbidden('Unconfirmed account')
项目:smart-iiot    作者:quanpower    | 项目源码 | 文件源码
def new_post():
    """Create a post from the request JSON on behalf of the current user.

    Returns the post's JSON, status 201 and a ``Location`` header holding
    the URL of the new post.
    """
    fresh_post = Post.from_json(request.json)
    fresh_post.author = g.current_user
    db.session.add(fresh_post)
    db.session.commit()

    body = jsonify(fresh_post.to_json())
    headers = {'Location': url_for('api.get_post', id=fresh_post.id)}
    return body, 201, headers
项目:smart-iiot    作者:quanpower    | 项目源码 | 文件源码
def edit_post(id):
    """Update and commit the body of post ``id`` from the request JSON.

    Only the author or a user with ``Permission.ADMIN`` may edit; other
    users receive a forbidden response.
    """
    post = Post.query.get_or_404(id)

    actor = g.current_user
    foreign = actor != post.author
    if foreign and not actor.can(Permission.ADMIN):
        return forbidden('Insufficient permissions')

    replacement = request.json.get('body', post.body)
    post.body = replacement
    db.session.add(post)
    db.session.commit()
    return jsonify(post.to_json())
项目:smart-iiot    作者:quanpower    | 项目源码 | 文件源码
def new_post_comment(id):
    """Add a comment from the request JSON to post ``id``.

    The current user is set as author.  Returns the comment's JSON, status
    201 and a ``Location`` header holding the comment's URL.
    """
    subject = Post.query.get_or_404(id)

    feedback = Comment.from_json(request.json)
    feedback.author = g.current_user
    feedback.post = subject
    db.session.add(feedback)
    db.session.commit()

    body = jsonify(feedback.to_json())
    headers = {'Location': url_for('api.get_comment', id=feedback.id)}
    return body, 201, headers
项目:smart-iiot    作者:quanpower    | 项目源码 | 文件源码
def verify_password(email_or_token, password):
    """Authenticate by token or email/password; anonymous access is refused.

    An empty ``email_or_token`` is rejected.  An empty ``password`` means
    ``email_or_token`` is verified as an auth token.  Otherwise the user is
    looked up by email and the password checked.  The resolved user is stored
    on ``g.current_user``.
    """
    if email_or_token == '':
        return False

    if password == '':
        token_user = User.verify_auth_token(email_or_token)
        g.current_user = token_user
        g.token_used = True
        return token_user is not None

    account = User.query.filter_by(email=email_or_token).first()
    if account:
        g.current_user = account
        g.token_used = False
        return account.verify_password(password)
    return False
项目:smart-iiot    作者:quanpower    | 项目源码 | 文件源码
def before_request():
    """Return a forbidden response when a non-anonymous user is unconfirmed."""
    if g.current_user.is_anonymous:
        return None
    if g.current_user.confirmed:
        return None
    return forbidden('Unconfirmed account')
项目:python_demo    作者:Wasim37    | 项目源码 | 文件源码
def verify_token(token):
    """Accept ``token`` if it is in ``TOKENS``, storing it on ``g.current_user``.

    Returns ``True`` for a known token and ``False`` otherwise.
    """
    if token not in TOKENS:
        return False
    g.current_user = token
    return True


# ?????????