我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 flask_login.current_user。
def inject():
    """Build the mapping of names this function makes available to its caller.

    The mapping bundles configuration values read through ``_cfg``, the
    ``len``/``any`` builtins, the request/locale/user objects, link helpers,
    the ``random`` module and one randomly chosen entry of ``moe``.
    """
    site_root = _cfg("protocol") + "://" + _cfg("domain")
    context = dict(
        root=site_root,
        domain=_cfg("domain"),
        protocol=_cfg("protocol"),
        len=len,
        any=any,
        request=request,
        locale=locale,
        url_for=url_for,
        file_link=file_link,
        disown_link=disown_link,
        user=current_user,
        moe=random.choice(moe),
        random=random,
        owner=_cfg("owner"),
        owner_email=_cfg("owner_email"),
        _cfg=_cfg,
    )
    return context
def enable():
    """Toggle ``is_enabled`` on the logged-in user's Pi record.

    On POST, the ``Pi`` and ``Flags`` rows whose ``id`` equals the current
    user's id are loaded, a settings update is requested on the flag, the
    Pi's enabled state is inverted and the session is committed before
    redirecting to ``settings``. Any other method renders ``settings.html``.
    """
    if request.method != 'POST':
        return render_template(
            'settings.html',
            user_data=flask_login.current_user,
            pi_data=Pi.query.filter_by(id=flask_login.current_user.id).first(),
        )

    pi_record = Pi.query.filter_by(id=flask_login.current_user.id).first()
    settings_flag = Flags.query.filter_by(id=flask_login.current_user.id).first()
    settings_flag.request_update_settings = True

    # Flip the enabled state: truthy becomes False, falsy becomes True.
    pi_record.is_enabled = not pi_record.is_enabled

    db.session.commit()
    return redirect('settings')
def _grantsetter(self, client_id, code, request, *args, **kwargs):
    # pylint: disable=method-hidden,unused-argument
    """Persist a grant for the current user and return it.

    A ``self._grant_class`` instance is created inside a ``db.session.begin()``
    block with an expiry 100 seconds from ``datetime.utcnow()``.

    Returns:
        The stored grant instance, or ``None`` when the insert raises
        ``sqlalchemy.exc.IntegrityError`` (the failure is logged).
    """
    # TODO: review expiration time
    # decide the expires time yourself
    expires_at = datetime.utcnow() + timedelta(seconds=100)

    try:
        with db.session.begin():
            grant = self._grant_class(
                client_id=client_id,
                code=code['code'],
                redirect_uri=request.redirect_uri,
                scopes=request.scopes,
                user=current_user,
                expires=expires_at,
            )
            db.session.add(grant)
    except sqlalchemy.exc.IntegrityError:
        log.exception("Grant-setter has failed.")
        return None

    return grant
def new_review(id):
    """Show the review form for a movie, or store a submitted review.

    When the form validates on submit, a ``Review`` attributed to the current
    user is saved and the client is redirected to the movie page. Otherwise
    ``new_review.html`` is rendered with the form and the movie.
    """
    form = ReviewForm()
    movie = get_movie(id)

    if not form.validate_on_submit():
        page_title = f'{movie.title} review'
        return render_template(
            'new_review.html', title=page_title, review_form=form, movie=movie)

    review_title = form.title.data
    review_text = form.review.data
    submitted_review = Review(
        movie_id=movie.id,
        movie_title=review_title,
        image_path=movie.poster,
        movie_review=review_text,
        user=current_user,
    )
    submitted_review.save_review()
    return redirect(url_for('.movie', id=movie.id))
def requires(*permissions) -> str:
    """Build a view decorator that enforces every listed permission.

    The decorated view runs only when ``flask_login.current_user.can(perm)``
    is truthy for each permission; otherwise the queue's ``error.html`` page
    is rendered with a "Permission Denied." message.
    """
    # NOTE(review): the ``-> str`` annotation does not match the decorator that
    # is actually returned; it is left untouched to keep the signature as-is.
    # Imported here rather than at module level (presumably to avoid a circular
    # import -- TODO confirm).
    from quupod.queue.views import render_queue

    def decorate(view):
        @wraps(view)
        def guarded_view(*args, **kwargs):
            user = flask_login.current_user
            if any(not user.can(perm) for perm in permissions):
                return render_queue(
                    'error.html',
                    message='Permission Denied.',
                    action='Home',
                    url=url_for('queue.home'))
            return view(*args, **kwargs)
        return guarded_view

    return decorate
def delete_post(post_id):
    """Delete the post identified by ``post_id``.

    Only the post's own user may delete it; anyone else gets ``Forbidden``.

    **Example JSON response**:

    .. sourcecode:: json

        {}

    :statuscode 200: Deleted
    :statuscode 401: Not authorized
    :statuscode 403: No permission
    """
    target = m.Post.query.get_or_404(post_id)

    if target.user != current_user:
        raise Forbidden()

    session = db.session
    session.delete(target)
    session.commit()
    return render_json({})
def like_post(post_id):
    """Mark the post as liked.

    The memoized ``is_liked_by`` cache entry for this post and the current
    user is dropped before the like state is inspected.

    **Example JSON response**:

    .. sourcecode:: json

        {}

    :statuscode 201: Liked
    :statuscode 401: Not authorized
    :statuscode 409: Already liked
    """
    target = m.Post.query.get_or_404(post_id)
    cache.delete_memoized('is_liked_by', target, current_user)

    if target.is_liked:
        raise Conflict()

    target.is_liked = True
    session = db.session
    session.add(target)
    session.commit()

    body = render_json({})
    return body, 201
def unlike_post(post_id):
    """Remove the like from the post.

    The memoized ``is_liked_by`` cache entry for this post and the current
    user is dropped before the like state is inspected.

    **Example JSON response**:

    .. sourcecode:: json

        {}

    :statuscode 200: Unliked
    :statuscode 401: Not authorized
    :statuscode 409: Already unliked
    """
    target = m.Post.query.get_or_404(post_id)
    cache.delete_memoized('is_liked_by', target, current_user)

    if not target.is_liked:
        raise Conflict()

    target.is_liked = False
    session = db.session
    session.add(target)
    session.commit()
    return render_json({})
def index():
    """Render the index page with a one-line Fitbit profile summary.

    Unauthenticated visitors are redirected to ``main.login``. For logged-in
    users the stored Fitbit credentials, if any, are used to fetch the profile;
    a ``BadResponse`` from the client is reported through ``flash`` and the
    default summary text is kept.
    """
    if not flask_login.current_user.is_authenticated:
        return redirect(url_for('main.login'))

    profile_summary = "Could not access fitbit profile"
    credentials = get_user_fitbit_credentials(flask_login.current_user.id)

    if credentials:
        with fitbit_client(credentials) as client:
            try:
                response = client.user_profile_get()
                profile_summary = "{} has been on fitbit since {}".format(
                    response['user']['fullName'],
                    response['user']['memberSince']
                )
            except BadResponse:
                flash("Api Call Failed")

    return render_template(
        'index.html',
        user_profile=profile_summary,
        permission_url=get_permission_screen_url())
def setup_jinja_env(jinja_env, app_config: dict):
    """Register the project's template filters and globals on ``jinja_env``.

    Args:
        jinja_env: Environment whose ``filters`` and ``globals`` mappings are
            extended in place.
        app_config: Configuration mapping; ``app_config['SCHEME']`` is used as
            the ``_scheme`` of the ``abs_url`` global.
    """
    jinja_env.filters.update({
        'pretty_date': format_pretty_date,
        'pretty_date_time': format_pretty_date_time,
        'undertitle': format_undertitle,
        'hide_none': do_hide_none,
        'pluralize': do_pluralize,
        'gravatar': pillar.api.utils.gravatar,
        'markdown': do_markdown,
        'yesno': do_yesno,
        'repr': repr,
    })

    # ``abs_url`` is ``flask.url_for`` pre-bound to produce external URLs.
    absolute_url_for = functools.partial(
        flask.url_for, _external=True, _scheme=app_config['SCHEME'])

    jinja_env.globals.update({
        'url_for_node': do_url_for_node,
        'abs_url': absolute_url_for,
        'session': flask.session,
        'current_user': flask_login.current_user,
    })
def accepts_roles(*roles):
    """
    A decorator to check if user has any of the roles specified

    Aborts with 401 when ``is_authenticated()`` is falsy and with 403 when the
    current user has none of ``roles``.

    @accepts_roles('superadmin', 'admin')
    def fn():
        pass
    """
    def decorate(view):
        @functools.wraps(view)
        def guarded(*args, **kwargs):
            if not is_authenticated():
                return abort(401)
            if not flask_login.current_user.has_any_roles(*roles):
                return abort(403)
            return view(*args, **kwargs)
        return guarded

    return decorate
def messages():
    """Emit the current user's messages as JSON on the ``messages`` event."""
    user_messages = NodeDefender.db.message.messages(current_user)
    payload = [message.to_json() for message in user_messages]
    return emit('messages', payload)
def unassigned():
    """Emit the MAC addresses of the current user's unassigned iCPEs.

    Returns:
        Always ``True``.
    """
    pending = NodeDefender.db.icpe.unassigned(current_user)
    mac_addresses = [icpe.mac_address for icpe in pending]
    emit('unassigned', mac_addresses)
    return True
def inject_user():
    """Expose ``current_user`` to the base template context.

    Returns:
        A one-key dict holding ``current_user`` when it is truthy, otherwise
        ``None`` under the same key.
    """
    return {'current_user': current_user if current_user else None}
def revalidate_login():
    """Redirect to the login page when a protected endpoint is hit logged out.

    Resets ``flask.g.current_plugin`` and stores the current user on
    ``flask.g.user``. Requests without an endpoint, requests for endpoints
    starting with ``static``, views flagged ``is_public`` and logged-in
    requests pass through (``None`` is returned).
    """
    flask.g.current_plugin = None
    flask.g.user = flask_login.current_user

    endpoint = flask.request.endpoint
    if not endpoint or endpoint.startswith('static'):
        return None

    # Unknown endpoints have no view function and are treated as non-public.
    view = flask_app.view_functions.get(endpoint)
    if getattr(view, 'is_public', False):
        return None

    if is_logged_in():
        return None

    return flask.redirect(flask.url_for('login'))
def on_identity_loaded(sender, identity):
    """Principal helper for loading the identity of logged user

    Attaches the current user to the identity and, when the user exposes
    them, adds a ``UserNeed`` for its ``id`` and a ``RoleNeed`` per role name.

    :param sender: Sender of the signal
    :param identity: Identity container
    :type identity: ``flask_principal.Identity``
    """
    user = flask_login.current_user
    identity.user = user

    if hasattr(user, 'id'):
        identity.provides.add(flask_principal.UserNeed(user.id))

    if hasattr(user, 'roles'):
        for role in user.roles:
            identity.provides.add(flask_principal.RoleNeed(role.name))
def _change_password():
    """Change the current user's password from the submitted form fields.

    Flashes an error when the supplied current password does not match the
    stored ``pwd_hash``; otherwise, if ``valid_new_password`` accepts the new
    password and its confirmation, the password is changed and a success
    message is flashed. Always redirects back to ``request.referrer``.
    """
    form = request.form
    supplied_current = form.get('current_password', '')
    new_password = form.get('new_password', '')
    confirmation = form.get('confirm_password', '')

    if not check_password_hash(current_user['pwd_hash'], supplied_current):
        flash('Current password is invalid', 'danger')
    elif valid_new_password(new_password, confirmation):
        change_password(current_user, new_password)
        flash('Password was successfully changed.', 'success')

    return redirect(request.referrer)
def protected():
    """Return a text line with the current user's name, login count and IP."""
    account = flask_login.current_user
    template = 'Logged in as: {}| Login_count: {}|IP: {}'
    return template.format(
        account.name, account.login_count, account.last_login_ip)
def my_rating(self):
    """Return the current user's rating value for this ebook.

    Returns ``None`` when the lookup yields nothing truthy.
    """
    own_rating = EbookRating.query.filter(
        EbookRating.ebook_id == self.id,
        EbookRating.created_by == current_user,
    ).one_or_none()

    if not own_rating:
        return None
    return own_rating.rating
def log_exception(error, status_code):
    """Log an error for the current request.

    When metadata exists for the current user, its ``tx_id`` is bound to the
    logger first. ``exc_info`` is only passed when ``error`` is truthy.
    """
    metadata = get_metadata(current_user)
    if metadata:
        logger.bind(tx_id=metadata['tx_id'])

    # Keyword order (exc_info, url, status_code) mirrors the explicit calls.
    details = {'exc_info': error} if error else {}
    details['url'] = request.url
    details['status_code'] = status_code
    logger.error('an error has occurred', **details)
def get_tx_id():
    """Return the converted ``tx_id`` from the current user's metadata.

    Returns ``None`` when no metadata is available.
    """
    metadata = get_metadata(current_user)
    if not metadata:
        return None
    return convert_tx_id(metadata['tx_id'])
def before_request():
    """Log the feedback request and prepare per-request state.

    When metadata exists for the current user, its ``tx_id`` is bound to the
    logger and the matching schema is stored on ``g.schema_json``.
    """
    logger.info('feedback request', url_path=request.full_path)

    metadata = get_metadata(current_user)
    if not metadata:
        return

    logger.bind(tx_id=metadata['tx_id'])
    # NOTE(review): the schema load is assumed to belong to the
    # ``if metadata`` branch -- confirm against the original layout.
    g.schema_json = load_schema_from_metadata(metadata)
def send_feedback():
    """Encrypt and submit a feedback message, then redirect or return 200.

    When the form validates, the escaped form fields are converted into a
    message, encrypted and sent to the configured queue;
    ``SubmissionFailedException`` is raised when sending reports failure.
    The response is a redirect to the thank-you page unless the ``redirect``
    form field is something other than ``'true'``.
    """
    form = FeedbackForm()

    if form.validate():
        metadata = get_metadata(current_user)
        submitted = request.form
        message = convert_feedback(
            escape(submitted.get('message')),
            escape(submitted.get('name')),
            escape(submitted.get('email')),
            request.referrer or '',
            metadata,
            g.schema_json['survey_id'],
        )

        encrypted_message = encrypt(
            message,
            current_app.eq['key_store'],
            key_purpose=KEY_PURPOSE_SUBMISSION)
        was_sent = current_app.eq['submitter'].send_message(
            encrypted_message,
            current_app.config['EQ_RABBITMQ_QUEUE_NAME'],
            metadata['tx_id'])

        if not was_sent:
            raise SubmissionFailedException()

    # NOTE(review): the redirect check is assumed to sit outside the
    # ``form.validate()`` branch -- confirm against the original layout.
    wants_redirect = request.form.get('redirect', 'true') == 'true'
    if wants_redirect:
        return redirect(url_for('feedback.thank_you'))

    return '', 200
def dump_answers():
    """Return the current user's stored answers as JSON with status 200.

    An empty list is sent when the answer store's ``answers`` is falsy.
    """
    stored_answers = get_answer_store(current_user).answers or []
    payload = {'answers': stored_answers}
    return jsonify(payload), 200
def with_metadata(func):
    """Decorate ``func`` so it receives the metadata context as ``meta``.

    On every call the current user's metadata is fetched, turned into a
    context with ``build_metadata_context`` and passed to ``func`` as the
    ``meta`` keyword argument.

    Args:
        func: Callable accepting a ``meta`` keyword argument.

    Returns:
        The wrapping callable, carrying ``func``'s name and docstring.
    """
    # Local import keeps this block self-contained.
    from functools import wraps

    # ``wraps`` preserves ``__name__``/``__doc__``; without it every decorated
    # function would be reported under the wrapper's name.
    @wraps(func)
    def metadata_wrapper(*args, **kwargs):
        metadata = get_metadata(current_user)
        metadata_context = build_metadata_context(metadata)
        return func(*args, meta=metadata_context, **kwargs)

    return metadata_wrapper
def with_questionnaire_url_prefix(func):
    """Decorate ``func`` so it receives the questionnaire URL prefix.

    On every call the prefix ``/questionnaire/<eq_id>/<form_type>/<collection_id>``
    is built from the ``survey`` entry of the current user's metadata context
    and passed to ``func`` as the ``url_prefix`` keyword argument.

    Args:
        func: Callable accepting a ``url_prefix`` keyword argument.

    Returns:
        The wrapping callable, carrying ``func``'s name and docstring.
    """
    # Local import keeps this block self-contained.
    from functools import wraps

    # ``wraps`` preserves ``__name__``/``__doc__``; without it every decorated
    # function would be reported under the wrapper's name.
    @wraps(func)
    def url_prefix_wrapper(*args, **kwargs):
        metadata = get_metadata(current_user)
        metadata_context = build_metadata_context(metadata)
        survey_data = metadata_context['survey']
        url_prefix = '/questionnaire/{}/{}/{}'.format(
            survey_data['eq_id'],
            survey_data['form_type'],
            survey_data['collection_id'],
        )
        return func(*args, url_prefix=url_prefix, **kwargs)

    return url_prefix_wrapper
def get_path_finder():
    """Return the request's ``PathFinder``, creating it on first use.

    The instance is stored on ``g.path_finder`` and reused while that
    attribute is not ``None``.
    """
    cached = getattr(g, 'path_finder', None)
    if cached is not None:
        return cached

    metadata = get_metadata(current_user)
    answer_store = get_answer_store(current_user)
    created = PathFinder(g.schema_json, answer_store, metadata)
    g.path_finder = created
    return created
def protected():
    """Return a greeting that ends with the current user's ``id``."""
    user_id = flask_login.current_user.id
    # Plain concatenation is kept on purpose: a non-string id still raises.
    return 'Logged in as: ' + user_id
def settings():
    """Update the logged-in user's account and Pi settings, or show the form.

    On POST the submitted fields are applied to the ``User``, ``Pi`` and
    ``Flags`` rows whose ``id`` equals the current user's id, the session is
    committed and the client is redirected to ``settings``. Any other method
    renders ``settings.html``.

    The password hash is only replaced when the current password checks out
    and the new password is not empty.
    """
    if request.method != 'POST':
        return render_template(
            'settings.html',
            user_data=flask_login.current_user,
            pi_data=Pi.query.filter_by(id=flask_login.current_user.id).first(),
        )

    # Read every field up front so a missing key fails before any mutation.
    form = request.form
    email = form['email']
    curpassword = form['curpassword']
    password = form['password']
    phone = form['phone']
    pi_id = form['piid']
    room_name = form['room_name']
    capture_framerate = form['capture_framerate']
    output_framerate = form['output_framerate']
    threshold_frame_count = form['threshold_frame_count']

    account = User.query.filter_by(id=flask_login.current_user.id).first()

    # The form sends the literal string 'true' to request a fresh Pi id.
    if pi_id == 'true':
        account.pi_id = str(uuid.uuid4())

    # Compare by value: ``is not ''`` tested object identity, which is not a
    # reliable emptiness check and triggers a SyntaxWarning on Python 3.8+.
    if account.check_password(curpassword) and password != '':
        account.passhash = generate_password_hash(password)

    # NOTE(review): email/phone are assumed to be updated regardless of the
    # password check -- confirm against the original layout.
    account.email = email
    account.phone = phone

    pi_data = Pi.query.filter_by(id=flask_login.current_user.id).first()
    pi_data.room_name = room_name
    pi_data.capture_framerate = capture_framerate
    pi_data.output_framerate = output_framerate
    pi_data.threshold_frame_count = threshold_frame_count

    flag = Flags.query.filter_by(id=flask_login.current_user.id).first()
    flag.request_update_settings = True

    db.session.commit()
    return redirect('settings')
def dashboard():
    """Render the dashboard with the current user's videos, newest first.

    The displayed username is the part of the user's email before the last
    ``@``.
    """
    user = flask_login.current_user
    owner_id = user.id

    video_query = (
        db.session.query(Video)
        .order_by(desc(Video.created_at))
        .filter(Video.user_id == owner_id)
    )
    videos = list(video_query)

    local_part = user.email.rsplit('@', 1)[0]
    return render_template('dashboard.html', username=local_part, videos=videos)
def snapshot():
    """Request a picture for the current user and return to the dashboard.

    On POST the ``Flags`` row whose ``user_id`` matches the current user gets
    ``request_picture`` set and the session is committed.
    """
    owner_id = flask_login.current_user.id

    if request.method == 'POST':
        picture_flag = (
            db.session.query(Flags).filter(Flags.user_id == owner_id).first()
        )
        picture_flag.request_picture = True
        db.session.commit()

    # NOTE(review): the redirect is assumed to apply to every method, not only
    # POST -- confirm against the original layout.
    return redirect('dashboard')
def process(cls, post, render=True):
    """
    Fill in the derived fields of ``post`` in place.

    Sets ``slug``, ``editable`` (via ``cls.is_author`` for the current user)
    and ``url``, optionally renders the text, then runs ``cls.custom_process``.

    :param post: mapping describing the post; must contain ``"title"``
    :param render: when truthy, ``cls.render_text(post)`` is called
    :return: None
    """
    derived_fields = (
        ("slug", lambda: cls.create_slug(post["title"])),
        ("editable", lambda: cls.is_author(post, current_user)),
        ("url", lambda: cls.construct_url(post)),
    )
    # Each value is computed and stored before the next one is computed, so
    # later helpers see the fields set by earlier ones.
    for key, compute in derived_fields:
        post[key] = compute()

    if render:
        cls.render_text(post)
    cls.custom_process(post)
    return
def save_grant(client_id, code, request, *args, **kwargs):
    """Create and return a grant for the current user.

    The grant expires 100 seconds after ``datetime.utcnow()`` and stores the
    request scopes as one space-separated string.
    """
    expires_at = datetime.utcnow() + timedelta(seconds=100)
    client = Client.get(client_id=client_id)
    authorization_code = code["code"]
    redirect_uri = request.redirect_uri
    scope_text = ' '.join(request.scopes)

    return Grant.create(
        client=client,
        code=authorization_code,
        redirect_uri=redirect_uri,
        _scopes=scope_text,
        user=current_user,
        expires=expires_at,
    )
def _get_notes(self): return list(filter(lambda x: ( x.author == current_user and not x.is_deleted), self.notes))
def before_request():
    """Populate ``g`` with the values used while handling the request.

    Stores the current user, the app's ``TESTING`` setting and the ``texts``
    and ``urls`` objects.
    """
    request_state = (
        ('user', flask_login.current_user),
        ('testing', myapp.config['TESTING']),
        ('texts', texts),
        ('urls', urls),
    )
    for attribute, value in request_state:
        setattr(g, attribute, value)
def post(self, args):
    """
    Create a new problem.

    The problem is created by the current user from ``args`` and added to the
    session inside ``api.commit_or_abort``.
    """
    transaction = api.commit_or_abort(
        db.session,
        default_error_message="Failed to create a new problem"
    )
    with transaction:
        created_problem = Problem(creator=current_user, **args)
        db.session.add(created_problem)
    return created_problem
def post(self, args):
    """
    Create a new team.

    The team is built from ``args`` and the current user is added as its
    leader; both rows are added inside ``api.commit_or_abort``.
    """
    transaction = api.commit_or_abort(
        db.session,
        default_error_message="Failed to create a new team"
    )
    with transaction:
        created_team = Team(**args)
        db.session.add(created_team)
        leader = TeamMember(team=created_team, user=current_user, is_leader=True)
        db.session.add(leader)
    return created_team
def post(self, args):
    """
    Upload a new solution.

    The solution is authored by the current user from ``args`` and added to
    the session inside ``api.commit_or_abort``.
    """
    transaction = api.commit_or_abort(
        db.session,
        default_error_message="Failed to create a new solution"
    )
    with transaction:
        uploaded_solution = Solution(author=current_user, **args)
        db.session.add(uploaded_solution)
    return uploaded_solution
def get_current_user():
    """Return the current user serialised by ``user_schema_secure.jsonify``."""
    response = user_schema_secure.jsonify(current_user)
    return response
def profile():
    """Call ``user.update`` for the current user, then render the profile page."""
    user.update(current_user)
    page = render_template('profile.html')
    return page
def test_get_user(self):
    """Ensure get_user classmethod is returning always what we expect"""
    get_user = umodels.User.get_user

    # Invalid input is rejected.
    with self.assertRaises(ValidationError):
        get_user(None)

    # A UserMixin (from flask.login) object is returned unchanged.
    self.assertEqual(self.user, get_user(self.user))

    # The LocalProxy for the logged-in user resolves to the same user.
    login_user(self.user)
    self.assertEqual(self.user, get_user(current_user))

    # Lookup from the db by "username" and by "id".
    self.assertEqual(self.user, get_user("username"))
    self.assertEqual(self.user, get_user(self.user.id))
def current_user():
    """Return the ``flask_login.current_user`` object for this request."""
    logged_in_user = flask_login.current_user
    return logged_in_user
def anonymous_required(f):
    """Decorator for views that only anonymous users may see (e.g., sign in).

    Authenticated users get the result of ``User.get_home`` instead of the
    wrapped view.
    """
    # Imported here rather than at module level (presumably to avoid a circular
    # import -- TODO confirm).
    from .models import User

    @wraps(f)
    def anonymous_only(*args, **kwargs):
        visitor = flask_login.current_user
        if not visitor.is_authenticated:
            return f(*args, **kwargs)
        return User.get_home(visitor)

    return anonymous_only
def login_required_else(message: str):
    """Build a decorator that asks logged-out users to log in.

    When ``current_user().is_authenticated`` is falsy, ``confirm.html`` is
    rendered with the title "Login Required" and the given ``message``;
    otherwise the wrapped view runs.
    """
    def decorate(view):
        @wraps(view)
        def guarded_view(*args, **kwargs):
            if current_user().is_authenticated:
                return view(*args, **kwargs)
            return render(
                'confirm.html',
                title='Login Required',
                message=message)
        return guarded_view

    return decorate
def get_num_current_requests(name: str):
    """Count the user's unresolved inquiries in the current queue.

    Authenticated users are matched by email; otherwise the given ``name`` is
    matched against ``User.name``.
    """
    user = flask_login.current_user
    owner_filter = (
        User.email == user.email
        if user.is_authenticated
        else User.name == name
    )

    unresolved = Inquiry.query.join(User).filter(
        owner_filter,
        Inquiry.status == 'unresolved',
        Inquiry.queue_id == g.queue.id)
    return unresolved.count()
def get_current_asking() -> db.Model:
    """Return the first unresolved inquiry owned by the logged-in user.

    Only inquiries in the current queue (``g.queue``) are considered.
    """
    criteria = dict(
        owner_id=flask_login.current_user.id,
        status='unresolved',
        queue_id=g.queue.id,
    )
    return Inquiry.query.filter_by(**criteria).first()
def get_current_user_inquiries(limit: int=10) -> [db.Model]:
    """Return up to ``limit`` inquiries associated with the current user.

    Authenticated users are matched through the inquiry's ``owner_id``;
    other users are matched by ``name``.

    Args:
        limit: Maximum number of inquiries to return.

    Returns:
        A list of matching inquiries.
    """
    user = flask_login.current_user
    if user.is_authenticated:
        # Match on the owning user. Filtering on ``id`` compared the inquiry's
        # own key with the user's id, which selects an unrelated inquiry.
        owned = Inquiry.query.filter_by(owner_id=user.id)
        return owned.limit(limit).all()
    return Inquiry.query.filter_by(name=user.name).limit(limit).all()