Python flask_login 模块,current_user() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用flask_login.current_user()

项目:wank.party    作者:Streetwalrus    | 项目源码 | 文件源码
def inject():
    return {
        'root': _cfg("protocol") + "://" + _cfg("domain"),
        'domain': _cfg("domain"),
        'protocol': _cfg("protocol"),
        'len': len,
        'any': any,
        'request': request,
        'locale': locale,
        'url_for': url_for,
        'file_link': file_link,
        'disown_link': disown_link,
        'user': current_user,
        'moe': random.choice(moe),
        'random': random,
        'owner': _cfg("owner"),
        'owner_email': _cfg("owner_email"),
        '_cfg': _cfg
    }
项目:infilcheck    作者:jonnykry    | 项目源码 | 文件源码
def enable():
    if request.method == 'POST':
        current_user = Pi.query.filter_by(id=flask_login.current_user.id).first()
        flag=Flags.query.filter_by(id=flask_login.current_user.id).first()
        flag.request_update_settings = True

        if current_user.is_enabled:
            current_user.is_enabled = False
        else:
            current_user.is_enabled = True


        db.session.commit()

        return redirect('settings')


    return render_template('settings.html', user_data = flask_login.current_user, pi_data = Pi.query.filter_by(id=flask_login.current_user.id).first())
项目:ddots-api-server    作者:frol    | 项目源码 | 文件源码
def _grantsetter(self, client_id, code, request, *args, **kwargs):
        # pylint: disable=method-hidden,unused-argument
        # TODO: review expiration time
        # decide the expires time yourself
        expires = datetime.utcnow() + timedelta(seconds=100)
        try:
            with db.session.begin():
                grant_instance = self._grant_class(
                    client_id=client_id,
                    code=code['code'],
                    redirect_uri=request.redirect_uri,
                    scopes=request.scopes,
                    user=current_user,
                    expires=expires
                )
                db.session.add(grant_instance)
        except sqlalchemy.exc.IntegrityError:
            log.exception("Grant-setter has failed.")
            return None
        return grant_instance
项目:watchlist    作者:mbuthiya    | 项目源码 | 文件源码
def new_review(id):

    form = ReviewForm()

    movie = get_movie(id)

    if form.validate_on_submit():
        title = form.title.data
        review = form.review.data

        new_review = Review(movie_id=movie.id,movie_title=title,image_path=movie.poster,movie_review=review,user=current_user)

        new_review.save_review()

        return redirect(url_for('.movie',id = movie.id ))

    title = f'{movie.title} review'
    return render_template('new_review.html',title = title, review_form=form, movie=movie)
项目:quupod    作者:alvinwan    | 项目源码 | 文件源码
def requires(*permissions) -> str:
    """Decorator for views, restricting access to the roles listed."""
    from quupod.queue.views import render_queue

    def wrap(f):
        @wraps(f)
        def decorator(*args, **kwargs):
            user = flask_login.current_user
            if not all(user.can(perm) for perm in permissions):
                return render_queue(
                    'error.html',
                    message='Permission Denied.',
                    action='Home',
                    url=url_for('queue.home'))
            return f(*args, **kwargs)
        return decorator
    return wrap
项目:graygram-web    作者:devxoul    | 项目源码 | 文件源码
def delete_post(post_id):
    """ Delete the post

    **Example JSON response**:

    .. sourcecode:: json

        {}

    :statuscode 200: Deleted
    :statuscode 401: Not authorized
    :statuscode 403: No permission
    """
    post = m.Post.query.get_or_404(post_id)
    if post.user != current_user:
        raise Forbidden()
    db.session.delete(post)
    db.session.commit()
    return render_json({})
项目:graygram-web    作者:devxoul    | 项目源码 | 文件源码
def like_post(post_id):
    """Like the post

    **Example JSON response**:

    .. sourcecode:: json

        {}

    :statuscode 201: Liked
    :statuscode 401: Not authorized
    :statuscode 409: Already liked
    """
    post = m.Post.query.get_or_404(post_id)
    cache.delete_memoized('is_liked_by', post, current_user)
    if post.is_liked:
        raise Conflict()
    post.is_liked = True
    db.session.add(post)
    db.session.commit()
    return render_json({}), 201
项目:graygram-web    作者:devxoul    | 项目源码 | 文件源码
def unlike_post(post_id):
    """Unlike the post

    **Example JSON response**:

    .. sourcecode:: json

        {}

    :statuscode 200: Unliked
    :statuscode 401: Not authorized
    :statuscode 409: Already unliked
    """
    post = m.Post.query.get_or_404(post_id)
    cache.delete_memoized('is_liked_by', post, current_user)
    if not post.is_liked:
        raise Conflict()
    post.is_liked = False
    db.session.add(post)
    db.session.commit()
    return render_json({})
项目:fitbit-api-example-python    作者:Stasonis    | 项目源码 | 文件源码
def index():
    if not flask_login.current_user.is_authenticated:
        return redirect(url_for('main.login'))
    else:
        user_profile = "Could not access fitbit profile"
        fitbit_creds = get_user_fitbit_credentials(flask_login.current_user.id)
        if fitbit_creds:
            with fitbit_client(fitbit_creds) as client:
                try:
                    profile_response = client.user_profile_get()
                    user_profile = "{} has been on fitbit since {}".format(
                        profile_response['user']['fullName'],
                        profile_response['user']['memberSince']
                    )
                except BadResponse:
                    flash("Api Call Failed")
        return render_template('index.html', user_profile=user_profile, permission_url=get_permission_screen_url())
项目:pillar    作者:armadillica    | 项目源码 | 文件源码
def setup_jinja_env(jinja_env, app_config: dict):
    jinja_env.filters['pretty_date'] = format_pretty_date
    jinja_env.filters['pretty_date_time'] = format_pretty_date_time
    jinja_env.filters['undertitle'] = format_undertitle
    jinja_env.filters['hide_none'] = do_hide_none
    jinja_env.filters['pluralize'] = do_pluralize
    jinja_env.filters['gravatar'] = pillar.api.utils.gravatar
    jinja_env.filters['markdown'] = do_markdown
    jinja_env.filters['yesno'] = do_yesno
    jinja_env.filters['repr'] = repr
    jinja_env.globals['url_for_node'] = do_url_for_node
    jinja_env.globals['abs_url'] = functools.partial(flask.url_for,
                                                     _external=True,
                                                     _scheme=app_config['SCHEME'])
    jinja_env.globals['session'] = flask.session
    jinja_env.globals['current_user'] = flask_login.current_user
项目:Mocha    作者:mardix    | 项目源码 | 文件源码
def accepts_roles(*roles):
    """
    A decorator to check if user has any of the roles specified

    @roles_accepted('superadmin', 'admin')
    def fn():
        pass
    """

    def wrapper(f):
        @functools.wraps(f)
        def wrapped(*args, **kwargs):
            if is_authenticated():
                if not flask_login.current_user.has_any_roles(*roles):
                    return abort(403)
            else:
                return abort(401)
            return f(*args, **kwargs)

        return wrapped

    return wrapper
项目:NodeDefender    作者:CTSNE    | 项目源码 | 文件源码
def messages():
    messages = NodeDefender.db.message.messages(current_user)
    return emit('messages', ([message.to_json() for message in messages]))
项目:NodeDefender    作者:CTSNE    | 项目源码 | 文件源码
def unassigned():
    icpes = [icpe.mac_address for icpe in
            NodeDefender.db.icpe.unassigned(current_user)]
    emit('unassigned', icpes)
    return True
项目:NodeDefender    作者:CTSNE    | 项目源码 | 文件源码
def inject_user():      # Adds general data to base-template
    if current_user:
        # Return Message- inbox for user if authenticated
        return dict(current_user = current_user)
    else:
        # If not authenticated user get Guest- ID(That cant be used).
        return dict(current_user = None)
项目:Prism    作者:Stumblinbear    | 项目源码 | 文件源码
def revalidate_login():
    flask.g.current_plugin = None
    flask.g.user = flask_login.current_user

    if (flask.request.endpoint and
        not flask.request.endpoint.startswith('static') and
        not getattr(flask_app.view_functions[flask.request.endpoint] if flask.request.endpoint in flask_app.view_functions else None, 'is_public', False) and
        not is_logged_in()):
        return flask.redirect(flask.url_for('login'))
项目:repocribro    作者:MarekSuchanek    | 项目源码 | 文件源码
def on_identity_loaded(sender, identity):
    """Principal helper for loading the identity of logged user

    :param sender: Sender of the signal
    :param identity: Identity container
    :type identity: ``flask_principal.Identity``
    """
    identity.user = flask_login.current_user

    if hasattr(flask_login.current_user, 'id'):
        identity.provides.add(
            flask_principal.UserNeed(flask_login.current_user.id)
        )

    if hasattr(flask_login.current_user, 'roles'):
        for role in flask_login.current_user.roles:
            identity.provides.add(
                flask_principal.RoleNeed(role.name)
            )
项目:fame    作者:certsocietegenerale    | 项目源码 | 文件源码
def _change_password():
    current = request.form.get('current_password', '')
    new = request.form.get('new_password', '')
    confirm = request.form.get('confirm_password', '')

    if not check_password_hash(current_user['pwd_hash'], current):
        flash('Current password is invalid', 'danger')
    elif valid_new_password(new, confirm):
        change_password(current_user, new)
        flash('Password was successfully changed.', 'success')

    return redirect(request.referrer)
项目:web_develop    作者:dongweiming    | 项目源码 | 文件源码
def protected():
    user = flask_login.current_user
    return 'Logged in as: {}| Login_count: {}|IP: {}'.format(
        user.name, user.login_count, user.last_login_ip)
项目:mybookshelf2    作者:izderadicka    | 项目源码 | 文件源码
def my_rating(self):
        rating = EbookRating.query.filter(EbookRating.ebook_id == self.id, 
                                          EbookRating.created_by == current_user).one_or_none()
        if rating:
            return rating.rating
项目:eq-survey-runner    作者:ONSdigital    | 项目源码 | 文件源码
def log_exception(error, status_code):
    metadata = get_metadata(current_user)
    if metadata:
        logger.bind(tx_id=metadata['tx_id'])

    if error:
        logger.error('an error has occurred', exc_info=error, url=request.url, status_code=status_code)
    else:
        logger.error('an error has occurred', url=request.url, status_code=status_code)
项目:eq-survey-runner    作者:ONSdigital    | 项目源码 | 文件源码
def get_tx_id():
    tx_id = None
    metadata = get_metadata(current_user)
    if metadata:
        tx_id = convert_tx_id(metadata['tx_id'])
    return tx_id
项目:eq-survey-runner    作者:ONSdigital    | 项目源码 | 文件源码
def before_request():
    logger.info('feedback request', url_path=request.full_path)

    metadata = get_metadata(current_user)
    if metadata:
        logger.bind(tx_id=metadata['tx_id'])
        g.schema_json = load_schema_from_metadata(metadata)
项目:eq-survey-runner    作者:ONSdigital    | 项目源码 | 文件源码
def send_feedback():
    form = FeedbackForm()

    if form.validate():

        metadata = get_metadata(current_user)

        message = convert_feedback(
            escape(request.form.get('message')),
            escape(request.form.get('name')),
            escape(request.form.get('email')),
            request.referrer or '',
            metadata,
            g.schema_json['survey_id'],
        )

        encrypted_message = encrypt(message, current_app.eq['key_store'], key_purpose=KEY_PURPOSE_SUBMISSION)
        sent = current_app.eq['submitter'].send_message(encrypted_message,
                                                        current_app.config['EQ_RABBITMQ_QUEUE_NAME'],
                                                        metadata['tx_id'])

        if not sent:
            raise SubmissionFailedException()

    if request.form.get('redirect', 'true') == 'true':
        return redirect(url_for('feedback.thank_you'))

    return '', 200
项目:eq-survey-runner    作者:ONSdigital    | 项目源码 | 文件源码
def dump_answers():
    response = {'answers': get_answer_store(current_user).answers or []}
    return jsonify(response), 200
项目:eq-survey-runner    作者:ONSdigital    | 项目源码 | 文件源码
def with_metadata(func):
    def metadata_wrapper(*args, **kwargs):
        metadata = get_metadata(current_user)
        metadata_context = build_metadata_context(metadata)

        return func(*args, meta=metadata_context, **kwargs)

    return metadata_wrapper
项目:eq-survey-runner    作者:ONSdigital    | 项目源码 | 文件源码
def with_questionnaire_url_prefix(func):
    def url_prefix_wrapper(*args, **kwargs):
        metadata = get_metadata(current_user)
        metadata_context = build_metadata_context(metadata)
        survey_data = metadata_context['survey']

        url_prefix = '/questionnaire/{}/{}/{}'.format(
            survey_data['eq_id'],
            survey_data['form_type'],
            survey_data['collection_id'],
        )

        return func(*args, url_prefix=url_prefix, **kwargs)

    return url_prefix_wrapper
项目:eq-survey-runner    作者:ONSdigital    | 项目源码 | 文件源码
def get_path_finder():
    finder = getattr(g, 'path_finder', None)

    if finder is None:
        metadata = get_metadata(current_user)
        answer_store = get_answer_store(current_user)
        finder = PathFinder(g.schema_json, answer_store, metadata)
        g.path_finder = finder

    return finder
项目:infilcheck    作者:jonnykry    | 项目源码 | 文件源码
def protected():
    return 'Logged in as: ' + flask_login.current_user.id
项目:infilcheck    作者:jonnykry    | 项目源码 | 文件源码
def settings():
    if request.method == 'POST':
        email = request.form['email']
        curpassword = request.form['curpassword']
        password = request.form['password']
        phone = flask.request.form['phone']
        pi_id = flask.request.form['piid']

        room_name = flask.request.form['room_name']
        capture_framerate = flask.request.form['capture_framerate']
        output_framerate = flask.request.form['output_framerate']
        threshold_frame_count = flask.request.form['threshold_frame_count']

        current_user = User.query.filter_by(id=flask_login.current_user.id).first()

        if pi_id == 'true':
           current_user.pi_id = str(uuid.uuid4())

        if current_user.check_password(curpassword) and password is not '':
            current_user.passhash = generate_password_hash(password)

        current_user.email = email
        current_user.phone = phone

        pi_data = Pi.query.filter_by(id=flask_login.current_user.id).first()
        pi_data.room_name = room_name
        pi_data.capture_framerate = capture_framerate
        pi_data.output_framerate =output_framerate
        pi_data.threshold_frame_count = threshold_frame_count

        flag=Flags.query.filter_by(id=flask_login.current_user.id).first()
        flag.request_update_settings = True

        db.session.commit()

        return redirect('settings')

    return render_template('settings.html', user_data = flask_login.current_user, pi_data = Pi.query.filter_by(id=flask_login.current_user.id).first())
项目:infilcheck    作者:jonnykry    | 项目源码 | 文件源码
def dashboard():
    user = flask_login.current_user
    user_id = user.id

    video_list = []
    for video in db.session.query(Video).order_by(desc(Video.created_at)).filter(Video.user_id == user_id):
        video_list.append(video)

    username = user.email.rsplit('@', 1)[0]

    return render_template('dashboard.html', username=username, videos=video_list)
项目:infilcheck    作者:jonnykry    | 项目源码 | 文件源码
def snapshot():
    user = flask_login.current_user
    user_id = user.id

    if request.method == 'POST':
        flag = db.session.query(Flags).filter(Flags.user_id == user_id).first()
        flag.request_picture = True
        db.session.commit()

    return redirect('dashboard')
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def process(cls, post, render=True):
        """
        This method takes the post data and renders it
        :param post:
        :param render:
        :return:
        """
        post["slug"] = cls.create_slug(post["title"])
        post["editable"] = cls.is_author(post, current_user)
        post["url"] = cls.construct_url(post)
        if render:
            cls.render_text(post)
        cls.custom_process(post)
        return
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def process(cls, post, render=True):
        """
        This method takes the post data and renders it
        :param post:
        :param render:
        :return:
        """
        post["slug"] = cls.create_slug(post["title"])
        post["editable"] = cls.is_author(post, current_user)
        post["url"] = cls.construct_url(post)
        if render:
            cls.render_text(post)
        cls.custom_process(post)
        return
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def process(cls, post, render=True):
        """
        This method takes the post data and renders it
        :param post:
        :param render:
        :return:
        """
        post["slug"] = cls.create_slug(post["title"])
        post["editable"] = cls.is_author(post, current_user)
        post["url"] = cls.construct_url(post)
        if render:
            cls.render_text(post)
        cls.custom_process(post)
        return
项目:NZ-ORCID-Hub    作者:Royal-Society-of-New-Zealand    | 项目源码 | 文件源码
def save_grant(client_id, code, request, *args, **kwargs):  # noqa: D103
    expires = datetime.utcnow() + timedelta(seconds=100)
    return Grant.create(
        client=Client.get(client_id=client_id),
        code=code["code"],
        redirect_uri=request.redirect_uri,
        _scopes=' '.join(request.scopes),
        user=current_user,
        expires=expires)
项目:cci-demo-flask    作者:circleci    | 项目源码 | 文件源码
def _get_notes(self):
        return list(filter(lambda x: (
            x.author == current_user and not x.is_deleted), self.notes))
项目:mincloud    作者:number13dev    | 项目源码 | 文件源码
def before_request():
    g.user = flask_login.current_user
    g.testing = myapp.config['TESTING']
    g.texts = texts
    g.urls = urls
项目:ddots-api-server    作者:frol    | 项目源码 | 文件源码
def post(self, args):
        """
        Create a new problem.
        """
        with api.commit_or_abort(
                db.session,
                default_error_message="Failed to create a new problem"
            ):
            problem = Problem(creator=current_user, **args)
            db.session.add(problem)
        return problem
项目:ddots-api-server    作者:frol    | 项目源码 | 文件源码
def post(self, args):
        """
        Create a new team.
        """
        with api.commit_or_abort(
                db.session,
                default_error_message="Failed to create a new team"
            ):
            team = Team(**args)
            db.session.add(team)
            team_member = TeamMember(team=team, user=current_user, is_leader=True)
            db.session.add(team_member)
        return team
项目:ddots-api-server    作者:frol    | 项目源码 | 文件源码
def post(self, args):
        """
        Upload a new solution.
        """
        with api.commit_or_abort(
                db.session,
                default_error_message="Failed to create a new solution"
            ):
            solution = Solution(author=current_user, **args)
            db.session.add(solution)
        return solution
项目:flask-rest-crud    作者:Farael49    | 项目源码 | 文件源码
def get_current_user():
    return user_schema_secure.jsonify(current_user)
项目:extranet    作者:demaisj    | 项目源码 | 文件源码
def profile():
    user.update(current_user)
    return render_template('profile.html')
项目:flask-template-project    作者:andreffs18    | 项目源码 | 文件源码
def test_get_user(self):
        """Ensure get_user classmethod is returning always what we expect"""
        # test: invalid input
        self.assertRaises(ValidationError, umodels.User.get_user, None)
        # test: UserMixin (from flask.login) object
        self.assertEqual(self.user, umodels.User.get_user(self.user))
        # test: LocalProxy object
        login_user(self.user)
        self.assertEqual(self.user, umodels.User.get_user(current_user))
        # test: User Object from db but by "username" and by "id"
        self.assertEqual(self.user, umodels.User.get_user("username"))
        self.assertEqual(self.user, umodels.User.get_user(self.user.id))
项目:quupod    作者:alvinwan    | 项目源码 | 文件源码
def current_user():
    """Return currently-logged-in user."""
    return flask_login.current_user
项目:quupod    作者:alvinwan    | 项目源码 | 文件源码
def anonymous_required(f):
    """Decorator for views that require anonymous users (e.g., sign in)."""
    from .models import User

    @wraps(f)
    def decorator(*args, **kwargs):
        user = flask_login.current_user
        if user.is_authenticated:
            return User.get_home(user)
        return f(*args, **kwargs)
    return decorator
项目:quupod    作者:alvinwan    | 项目源码 | 文件源码
def login_required_else(message: str):
    """If user is not logged in, prompt for login with the provided message."""
    def wrap(f):
        @wraps(f)
        def decorator(*args, **kwargs):
            if not current_user().is_authenticated:
                return render(
                    'confirm.html',
                    title='Login Required',
                    message=message)
            return f(*args, **kwargs)
        return decorator
    return wrap
项目:quupod    作者:alvinwan    | 项目源码 | 文件源码
def get_num_current_requests(name: str):
        """Get user's number of queued requests for the current queue."""
        if flask_login.current_user.is_authenticated:
            filter_id = User.email == flask_login.current_user.email
        else:
            filter_id = User.name == name
        return Inquiry.query.join(User).filter(
                filter_id,
                Inquiry.status == 'unresolved',
                Inquiry.queue_id == g.queue.id).count()
项目:quupod    作者:alvinwan    | 项目源码 | 文件源码
def get_current_asking() -> db.Model:
        """Return current resolution for the logged-in, non-staff user."""
        return Inquiry.query.filter_by(
            owner_id=flask_login.current_user.id,
            status='unresolved',
            queue_id=g.queue.id).first()
项目:quupod    作者:alvinwan    | 项目源码 | 文件源码
def get_current_user_inquiries(limit: int=10) -> [db.Model]:
        """Return list of all inquiries associated with the current user."""
        user = flask_login.current_user
        if user.is_authenticated:
            return Inquiry.query.filter_by(id=user.id).limit(limit).all()
        return Inquiry.query.filter_by(name=user.name).limit(limit).all()