我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用flask_login.current_user.is_authenticated()。
def register():
    """Serve the registration page and create an account on a valid submit.

    An authenticated visitor is redirected to their own profile.  A valid
    submission whose username already exists is sent back to the register
    page with a warning; otherwise the user is created and redirected to
    the login page.  Any other request renders the registration form.
    """
    form = RegistrationForm()
    if current_user.is_authenticated:
        return redirect(url_for('profile', username=current_user.username))

    if not form.validate_on_submit():
        return render_template('register.html', form=form)

    # Refuse usernames that db_utils reports as already existing.
    requested_name = form.username.data
    if db_utils.user_exists(engine, requested_name):
        flash('Username already taken.', 'danger')
        return redirect(url_for('register'))

    db_utils.create_user(engine, requested_name, form.password.data,
                         form.email.data)
    flash('Thanks for registering', 'success')
    return redirect(url_for('login'))
def login():
    """Log a visitor in with a username or e-mail address plus password.

    Authenticated visitors go straight to the index.  On a valid POST the
    account is looked up by e-mail when the identifier contains '@' and by
    username otherwise; a successful login also records a Connection row
    with the remote address.  Otherwise the login form is rendered, with
    any form errors flashed.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form = LoginForm()
    if not (request.method == 'POST' and form.validate_on_submit()):
        if form.errors:
            flash(form.errors, 'danger')
        return render_template('login.html', form=form)

    identifier = form.username.data
    if '@' in identifier:
        lookup = User.query.filter_by(email=identifier)
    else:
        lookup = User.query.filter_by(username=identifier)
    account = lookup.first()

    if not account or not account.check_password(form.password.data):
        flash('Invalid username or password. Please try again.', 'warning')
        return render_template('login.html', form=form)

    login_user(account)
    db.session.add(Connection(user=account, address=request.remote_addr))
    db.session.commit()
    return redirect(url_for('index'))
def recover_password(token):
    """Let the owner of a reset token choose a new password.

    :param token: value matched against ``User.reset_password_token``.

    Authenticated visitors are redirected to the index.  A valid POST
    stores the new password, clears the token and redirects to the login
    page.  A GET with an unknown token redirects to the forgot-password
    view.  Everything else renders the recover-password form.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    def token_owner():
        # Same lookup is needed on both the POST and the GET path.
        return User.query.filter_by(reset_password_token=token).first()

    form = ResetPasswordForm()
    method = request.method

    if method == 'POST' and form.validate_on_submit():
        account = token_owner()
        if not account:
            flash('The mail or username is not in our DataBase.', 'warning')
            return render_template('forgot_password.html', form=form)
        account.password = form.password.data
        account.reset_password_token = None
        db.session.commit()
        # TODO: Send some message informing user about their password reset
        return redirect(url_for('login'))

    if method == 'GET' and not token_owner():
        flash('The recover password token is invalid.', 'warning')
        return redirect(url_for('forgot_password'))

    if form.errors:
        flash(form.errors, 'danger')
    return render_template('recover_password.html', form=form)
def index():
    """Render the paginated post list and accept new posts.

    A valid form submission from a user with ``WRITE_ARTICLES`` permission
    adds a Post to the session and redirects back here.  Otherwise the
    requested page of posts is rendered; authenticated users with a truthy
    ``show_followed`` cookie see only ``current_user.followed_posts``.
    """
    form = PostForm()
    may_write = current_user.can(Permission.WRITE_ARTICLES)
    if may_write and form.validate_on_submit():
        author = current_user._get_current_object()
        db.session.add(Post(body=form.body.data, author=author))
        return redirect(url_for('.index'))

    page = request.args.get('page', 1, type=int)

    if current_user.is_authenticated:
        show_followed = bool(request.cookies.get('show_followed', ''))
    else:
        show_followed = False
    query = current_user.followed_posts if show_followed else Post.query

    newest_first = query.order_by(Post.timestamp.desc())
    pagination = newest_first.paginate(
        page, per_page=current_app.config['POSTS_PER_PAGE'],
        error_out=False)
    return render_template('index.html', form=form, posts=pagination.items,
                           show_followed=show_followed,
                           pagination=pagination)
def authorized(access_token):
    """Finish the GitHub OAuth flow and redirect to the remembered URL.

    :param access_token: token handed back by GitHub, or None on failure.

    The redirect target is the ``next_url`` stored in the session (removed
    once read) or the index.  With an authenticated user the GitHub
    identity is linked to that account; otherwise an existing account with
    the same ``github_id`` is logged in.
    """
    stored_url = session.get('next_url')
    if stored_url is not None:
        session.pop('next_url')
    next_url = url_for('index') if stored_url is None else stored_url

    if access_token is None:
        return redirect(next_url)

    session['github_token'] = access_token
    github_profile = github_helper.get_user()
    if github_profile is None:
        return redirect(next_url)

    if current_user.is_authenticated:
        # Link the GitHub identity to the account that is already logged in.
        current_user.github_id = github_profile['id']
        current_user.github_token = access_token
        current_user.github_username = github_profile['login']
        if not current_user.avatar_url:
            current_user.avatar_url = github_profile['avatar_url']
        db.session.commit()
        return redirect(next_url)

    local_user = User.query.filter_by(github_id=github_profile['id']).first()
    if local_user is not None:
        login_user(local_user)
    return redirect(next_url)
def login_required(role=0, group='open'):
    """Variant of ``login_required`` that also checks a role and a group.

    :param role: minimum ``current_user.role`` allowed in (0 = everyone).
    :param group: required ``current_user.group``; ``'open'`` disables the
        group check.

    Example::

        @login_required(role=0, group='ntuwn')
    """
    def wrapper(fn):
        @wraps(fn)
        def decorated_view(*args, **kwargs):
            denied = (
                not current_user.is_authenticated
                or current_user.role < role
                or (group != 'open' and current_user.group != group)
            )
            if denied:
                return login_manager.unauthorized()
            return fn(*args, **kwargs)
        return decorated_view
    return wrapper
def event_list():
    """List events and register the current user for one on POST.

    A request that is not a valid POST renders the first ten events.  A
    valid POST requires a logged-in user and an existing event; the user is
    added to the event unless already registered.  Every POST outcome
    flashes a message and redirects back to the event list.
    """
    form = AddUserToEventForm(request.form)
    if not (request.method == 'POST' and form.validate()):
        events = Event.query.limit(10)
        return render_template('events.html', events=events, form=form)

    back_to_list = url_for('events.event_list')

    if current_user.is_anonymous or not current_user.is_authenticated:
        flash('You must log in to register for events.')
        return redirect(back_to_list)

    event = Event.query.filter(Event.id == form.event_id.data).first()
    if event is None:
        # .first() yields None for an unknown id; without this guard the
        # membership test below raised AttributeError.
        flash('That event does not exist.')
        return redirect(back_to_list)

    # Work with the real user object rather than the LocalProxy so that it
    # can be stored in the relationship collection.
    user = current_user._get_current_object()
    if user in event.users:
        flash('You are already registered for this event!')
        return redirect(back_to_list)

    event.users.append(user)
    db_session.add(event)
    db_session.commit()
    flash('Registration successful!')
    return redirect(back_to_list)
def login():
    """Show the login form and log the user in on a valid submission.

    Authenticated visitors are redirected to the index.  A valid form gets
    a fresh token assigned to ``form.user`` before ``login_user``.  An
    invalid submission re-renders the form with the Unauthorized status
    code; a plain page view renders it with 200.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form = LoginForm()
    if form.validate_on_submit():
        user = user_assign_new_token(form.user)
        user.is_authenticated = True
        login_user(user)
        return redirect(url_for('index'))

    if form.is_submitted():
        status_code = Unauthorized.code
    else:
        status_code = 200
    length_limits = {'min': TRACKER_PASSWORD_LENGTH_MIN,
                     'max': TRACKER_PASSWORD_LENGTH_MAX}
    page = render_template('login.html', title='Login', form=form, User=User,
                           password_length=length_limits)
    return page, status_code
def index():
    """Render the paginated post list and accept new posts.

    A valid form submission from a user with ``WRITE_ARTICLES`` permission
    adds a Post to the session and redirects back here.  Otherwise one page
    of posts (size ``CIRCULATE_POSTS_PER_PAGE``) is rendered, limited to
    followed posts when an authenticated user has a truthy
    ``show_followed`` cookie.
    """
    form = PostForm()
    can_write = current_user.can(Permission.WRITE_ARTICLES)
    if can_write and form.validate_on_submit():
        new_post = Post(body=form.body.data,
                        author=current_user._get_current_object())
        db.session.add(new_post)
        return redirect(url_for('.index'))

    page = request.args.get('page', 1, type=int)

    show_followed = False
    if current_user.is_authenticated:
        cookie_value = request.cookies.get('show_followed', '')
        show_followed = bool(cookie_value)

    if show_followed:
        source = current_user.followed_posts
    else:
        source = Post.query

    per_page = current_app.config['CIRCULATE_POSTS_PER_PAGE']
    pagination = source.order_by(Post.timestamp.desc()).paginate(
        page, per_page=per_page, error_out=False)
    return render_template('index.html', form=form, posts=pagination.items,
                           show_followed=show_followed,
                           pagination=pagination)
def actintro(activity):
    """Render the club activity page.

    :param activity: the activity being shown.

    For an authenticated user, ``has_access`` is true for the club leader,
    the club teacher or an admin; ``selection`` comes from the user's
    signup status, and when there is none (``NoRow``) students are offered
    the option to join.  Anonymous visitors get no access and cannot join.
    """
    has_access = False
    can_join = False
    selection = ''

    if current_user.is_authenticated:
        club = activity.club
        has_access = (current_user == club.leader or
                      current_user == club.teacher or
                      current_user.type == UserType.ADMIN)
        try:
            selection = activity.signup_user_status(current_user)['selection']
        except NoRow:
            # No signup row for this user yet.
            can_join = (current_user.type == UserType.STUDENT)

    is_other_act = activity.time in [ActivityTime.UNKNOWN,
                                     ActivityTime.OTHERS]
    return render_template('activity/actintro.jinja2',
                           is_other_act=is_other_act,
                           has_access=has_access,
                           can_join=can_join,
                           selection=selection)
def configure_before_handlers(app):
    """Register the before-request handlers on ``app``."""

    def update_lastseen():
        """Store the current UTC time as `lastseen` for authenticated users."""
        if not current_user.is_authenticated:
            return
        current_user.lastseen = datetime.datetime.utcnow()
        db.session.add(current_user)
        db.session.commit()

    def mark_current_user_online():
        """Mark the user, or the guest's remote address, as online."""
        if current_user.is_authenticated:
            mark_online(current_user.username)
        else:
            mark_online(request.remote_addr, guest=True)

    app.before_request(update_lastseen)
    # Online tracking is only registered when Redis is enabled.
    if app.config["REDIS_ENABLED"]:
        app.before_request(mark_current_user_online)
def get_forums(query_result, user):
    """Return a ``(category, forums)`` tuple built from ``query_result``.

    This is the counterpart of get_categories_and_forums and is useful when
    only the forums of one category are needed.  For example::

        (<Category 2>, [(<Forum 3>, None), (<Forum 4>, None)])

    :param query_result: Rows whose first item is the category, second the
                         forum and third the forum's read-tracking entry.
    :param user: Object exposing ``is_authenticated``.  For a signed-out
                 user the third column is ignored and ``None`` is used.
    :returns: The tuple for the last category group in ``query_result``,
              or ``None`` when ``query_result`` is empty (previously this
              raised ``UnboundLocalError``).
    """
    authenticated = user.is_authenticated
    forums = None
    grouped = itertools.groupby(query_result, operator.itemgetter(0))
    for category, rows in grouped:
        forums = category, [
            (row[1], row[2] if authenticated else None) for row in rows
        ]
    return forums
def require_login_api(func):
    """Require a logged-in session or a valid API key for an API endpoint.

    A replacement for Flask-Login's ``@login_required`` aimed at API
    endpoints.  The wrapped function runs when the user is already logged
    in, or when the request JSON carries an ``api_key`` belonging to an
    active user (that user is logged in and the key is removed from the
    JSON the endpoint sees).  In every other case the predefined
    AUTH_FAILURE JSON response is returned with AUTH_FAILURE_CODE.
    """
    @wraps(func)
    def decorator(*args, **kwargs):
        payload = request.get_json()
        if current_user.is_authenticated:
            return func(*args, **kwargs)

        if payload and payload.get('api_key'):
            try:
                user = database.user.get_user_by_api_key(payload['api_key'],
                                                         active_only=True)
                login_user(user)
                # Hide the key from the endpoint's view of the request body.
                del payload['api_key']
                request.get_json = lambda: payload
                return func(*args, **kwargs)
            except UserDoesNotExistException:
                pass

        return jsonify(AUTH_FAILURE), AUTH_FAILURE_CODE
    return decorator
def optional_login_api(func):
    """Log the caller in via API key when one is supplied, else continue.

    Like require_login_api, but for API endpoints that also work without
    authentication.  When the request JSON carries an ``api_key`` the
    matching active user is logged in and the key is removed from the JSON
    the endpoint sees; an unknown key yields the AUTH_FAILURE response.
    Requests without a key simply run the endpoint unauthenticated.
    """
    @wraps(func)
    def decorator(*args, **kwargs):
        payload = request.get_json()
        if current_user.is_authenticated:
            return func(*args, **kwargs)

        if payload and payload.get('api_key'):
            try:
                user = database.user.get_user_by_api_key(payload['api_key'],
                                                         active_only=True)
                login_user(user)
                # Hide the key from the endpoint's view of the request body.
                del payload['api_key']
                request.get_json = lambda: payload
                return func(*args, **kwargs)
            except UserDoesNotExistException:
                return jsonify(AUTH_FAILURE), AUTH_FAILURE_CODE

        return func(*args, **kwargs)
    return decorator
def test_optional_login_api_deactivated_user(self):
    """An API key of a deactivated user must produce the AUTH_FAILURE reply."""
    with app.test_request_context():
        @optional_login_api
        def login_optional():
            authed = current_user.is_authenticated
            return 'authenticated' if authed else 'not authenticated'

        # Without any credentials the endpoint runs anonymously.
        self.assertEqual('not authenticated', login_optional())

        deactivated = util.testing.UserFactory.generate()
        database.user.deactivate_user(deactivated.user_id)
        request.get_json = lambda: {'api_key': deactivated.api_key}

        resp, resp_code = login_optional()
        body = json.loads(resp.data)
        self.assertEqual(constants.api.AUTH_FAILURE, body)
        self.assertEqual(constants.api.AUTH_FAILURE_CODE, resp_code)
def index(self):
    """Render the admin index with total and today's ``search_hash`` counts.

    Unauthenticated visitors are redirected to ``admin.login_view``.  The
    cursor and connection are closed in ``finally`` blocks so they are
    released even when a query raises.
    """
    if not current_user.is_authenticated:
        return redirect(url_for('admin.login_view'))

    connzsky = pymysql.connect(host=DB_HOST, port=DB_PORT_MYSQL, user=DB_USER,
                               password=DB_PASS, db=DB_NAME_MYSQL,
                               charset=DB_CHARSET,
                               cursorclass=pymysql.cursors.DictCursor)
    try:
        currzsky = connzsky.cursor()
        try:
            totalsql = 'select count(id) from search_hash'
            currzsky.execute(totalsql)
            totalcounts = currzsky.fetchall()
            # DictCursor rows are keyed by the selected expression.
            total = int(totalcounts[0]['count(id)'])

            todaysql = 'select count(id) from search_hash where to_days(search_hash.create_time)= to_days(now())'
            currzsky.execute(todaysql)
            todaycounts = currzsky.fetchall()
            today = int(todaycounts[0]['count(id)'])
        finally:
            currzsky.close()
    finally:
        connzsky.close()

    return self.render('admin/index.html', total=total, today=today)
def test_flask_login_user_login(app):
    """Logging a user in marks the 'user:logged_in' event for that user id."""
    # LoginManager could be set up in app fixture in conftest.py instead
    LoginManager().init_app(app)

    # TODO: once event is marked, user id exists in MonthEvents and test will
    # continue to pass, regardless of continued success; set to current
    # microsecond to temporarily circumvent, but there should be a better
    # way to fix user_id assignment (or tear down redis or something)
    user_id = datetime.now().microsecond

    with app.test_request_context():
        # set up and log in user
        account = User()
        account.id = user_id
        login_user(account)

        # test that user was logged in
        assert current_user.is_active
        assert current_user.is_authenticated
        assert current_user == account

        # test that user id was marked with 'user:logged_in' event
        # NOTE(review): `now` is not defined in this function; presumably a
        # module-level name -- confirm.
        logged_in_events = MonthEvents('user:logged_in', now.year, now.month)
        assert user_id in logged_in_events
def login():
    """Render the login form and log the user in on a matching POST.

    Authenticated visitors are redirected to the index.  On POST the user
    is looked up by username and the submitted password is compared with
    the stored ``password`` attribute; a mismatch flashes an error and
    re-renders the form.
    """
    form = LoginForm()
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('login.html', form=form)

    account = User.query.filter(
        User.username == form.username.data).first()
    # NOTE(review): this is a direct equality check against the stored
    # value, which suggests passwords are kept in plain text -- confirm.
    credentials_ok = account and account.password == form.password.data
    if credentials_ok:
        login_user(account)
        return redirect(url_for('index'))

    flash('Username or password wrong')
    return render_template('login.html', form=form)
def signup():
    """Create an account from the signup form and log the new user in.

    Authenticated visitors are redirected to ``/user``.  A valid POST
    stores the new user, logs them in with a remember cookie and redirects
    to the ``next`` query argument or ``/user``.  Otherwise the signup
    form is rendered.
    """
    # Flask-Login >= 0.3 exposes ``is_authenticated`` as a property, older
    # releases as a method.  Calling the property raises TypeError, so
    # support both shapes instead of calling unconditionally.
    authenticated = current_user.is_authenticated
    if callable(authenticated):
        authenticated = authenticated()
    if authenticated:
        return redirect('/user')

    form = SignupForm()
    if request.method == 'POST' and form.validate_on_submit():
        user = User(form.username.data, form.email.data, form.password.data)
        db.session.add(user)
        db.session.commit()
        login_user(user, remember=True)
        # NOTE(review): `next` comes straight from the query string and is
        # not validated, which allows redirects to arbitrary sites.
        return redirect(request.args.get('next') or '/user')

    return render_template('signup.html', form=form)
def login():
    """Log a user in via the login form or hand off to the OAuth provider.

    Already-authenticated users are redirected to the ``next`` query
    argument (default: the manage index).  When ``DOORMAN_AUTH_METHOD`` is
    anything other than None, 'doorman' or 'ldap', the user is redirected
    to the OAuth provider's authorize URL.  Otherwise the login form is
    processed and rendered.
    """
    manage_index = url_for('manage.index')
    next_url = request.args.get('next', manage_index)

    if current_user and current_user.is_authenticated:
        return safe_redirect(next_url, url_for('manage.index'))

    local_methods = (None, 'doorman', 'ldap')
    if current_app.config['DOORMAN_AUTH_METHOD'] not in local_methods:
        authorization_url = current_app.oauth_provider.get_authorize_url()
        current_app.logger.debug("Redirecting user to %s", authorization_url)
        return redirect(authorization_url)

    form = LoginForm()
    if form.validate_on_submit():
        user = form.user
        login_user(user, remember=form.remember.data)
        flash(u'Welcome {0}.'.format(user.username), 'info')
        current_app.logger.info("%s logged in", user.username)
        return safe_redirect(next_url, url_for('manage.index'))

    if form.errors:
        flash(u'Invalid username or password.', 'danger')
    return render_template('login.html', form=form)
def get_user():
    """Collect information about the requesting user.

    .. note::

       **Privacy note**
       A user's IP address, user agent string, and user id (if logged in)
       are sent to a message queue, where they are stored for about 5
       minutes. The information is used to:

       - Detect robot visits from the user agent string.
       - Generate an anonymized visitor id (using a random salt per day).
       - Detect the user's host country based on the IP address.

       The information is then discarded.

    :returns: dict with ``ip_address``, ``user_agent`` and ``user_id``
        (``None`` for anonymous requests).
    """
    if current_user.is_authenticated:
        user_id = current_user.get_id()
    else:
        user_id = None
    return {
        'ip_address': request.remote_addr,
        'user_agent': request.user_agent.string,
        'user_id': user_id,
    }
def index():
    """Render the paginated post list and accept new posts.

    A valid form submission from a user with ``WRITE`` permission stores
    and commits a new Post, then redirects back here.  Otherwise one page
    of posts (size ``FLASKY_POSTS_PER_PAGE``) is rendered, limited to
    followed posts when an authenticated user has a truthy
    ``show_followed`` cookie.
    """
    form = PostForm()
    if current_user.can(Permission.WRITE) and form.validate_on_submit():
        author = current_user._get_current_object()
        db.session.add(Post(body=form.body.data, author=author))
        db.session.commit()
        return redirect(url_for('.index'))

    page = request.args.get('page', 1, type=int)

    if current_user.is_authenticated:
        show_followed = bool(request.cookies.get('show_followed', ''))
    else:
        show_followed = False
    base_query = current_user.followed_posts if show_followed else Post.query

    pagination = base_query.order_by(Post.timestamp.desc()).paginate(
        page,
        per_page=current_app.config['FLASKY_POSTS_PER_PAGE'],
        error_out=False)
    return render_template('index.html', form=form, posts=pagination.items,
                           show_followed=show_followed,
                           pagination=pagination)
def roles_required(*roles):  # noqa: D202
    """Decorate a handler so it requires one of the given roles.

    :param roles: role identifiers passed to ``current_user.has_role``.

    The wrapped view runs when the user is authenticated and has at least
    one of ``roles``; otherwise ``login_manager.unauthorized()`` is
    returned.  This includes the case where no roles are given.
    """

    def wrapper(fn):
        @wraps(fn)
        def decorated_view(*args, **kwargs):
            if not current_user.is_authenticated:
                return login_manager.unauthorized()
            # Check every role, not just the first, and never fall off the
            # end returning None when `roles` is empty.
            if any(current_user.has_role(r) for r in roles):
                return fn(*args, **kwargs)
            return login_manager.unauthorized()
        return decorated_view
    return wrapper
def show_book():
    """List all books; on POST, add a book (authenticated users only).

    A POST from an unauthenticated user aborts with 403.  A successful
    POST stores a Book with the submitted ``title`` and redirects back to
    this view.
    """
    books = Book.query.all()

    if request.method != 'POST':
        return render_template('book.html', books=books)

    if not current_user.is_authenticated:
        abort(403)

    new_book = Book(title=request.form["title"])
    db.session.add(new_book)
    db.session.commit()
    return redirect(url_for('.show_book'))
def signup():
    """Create an account from the sign-up form and log the new user in.

    Authenticated visitors are redirected to the index.  A valid submission
    is rejected when a user with the same e-mail already exists; otherwise
    the form is copied onto a new User, timestamps are set, and the user is
    stored and logged in.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form = SignUpForm(request.form)
    if form.validate_on_submit():
        # Handle duplicate account creation.  The previous check tested the
        # truthiness of a Query built from the field object itself, which
        # is always true, so every signup was rejected.  Compare the
        # submitted value and actually run the query instead.
        existing = User.query.filter_by(email=form.email.data).first()
        if existing is not None:
            flash('Email already exists, try again', 'warning')
            return redirect(url_for('accounts.signup'))

        user = User()
        form.populate_obj(user)
        # One timestamp so createdOn and updatedOn agree exactly.
        created_at = datetime.datetime.now()
        user.createdOn = created_at
        user.updatedOn = created_at
        db.session.add(user)
        db.session.commit()
        login_user(user)
        flash('Signed up successfully. Welcome!', 'success')
        return redirect(url_for('index'))

    return render_template('account/signup.html', title='Sign Up', form=form)
def login():
    """Log a user in by e-mail and password.

    Authenticated visitors are redirected to the index.  On a valid
    submission with correct credentials the user is logged in, the last
    login time is stored and the user is redirected to the ``next`` query
    argument or the index.  Wrong credentials, or a ``login_user`` call
    that returns a falsy value, flash an error and re-render the form.
    """
    # Already logged in; return to index
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    # Not logged in; show the login form or errors
    form = LoginForm()

    def show_form():
        return render_template('account/login.html', title='Login', form=form)

    if not form.validate_on_submit():
        return show_form()

    user = User.query.filter_by(email=form.email.data).first()
    if user is None or not user.valid_password(form.password.data):
        flash('Wrong username or password', 'danger')
        return show_form()

    if not login_user(user, remember=form.remember.data):
        flash('This username is disabled', 'danger')
        return show_form()

    session.permanent = not form.remember.data
    # Need to add proper message flashing code to base.html
    user.lastLoggedIn = datetime.datetime.now()
    db.session.commit()
    flash('Logged in successfully!', category='success')
    return redirect(request.args.get('next') or url_for('index'))
def index():
    """Render the paginated post list and accept new posts.

    A valid form submission from a user with ``WRITE_ARTICLES`` permission
    adds a Post to the session and redirects back here.  Otherwise one page
    of posts (size ``FLASKY_POSTS_PER_PAGE``) is rendered, limited to
    followed posts when an authenticated user has a truthy
    ``show_followed`` cookie.
    """
    form = PostForm()
    if current_user.can(Permission.WRITE_ARTICLES) and \
            form.validate_on_submit():
        post = Post(body=form.body.data,
                    author=current_user._get_current_object())
        db.session.add(post)
        return redirect(url_for('.index'))

    page = request.args.get('page', 1, type=int)

    show_followed = False
    if current_user.is_authenticated:
        show_followed = bool(request.cookies.get('show_followed', ''))
    query = current_user.followed_posts if show_followed else Post.query

    pagination = query.order_by(Post.timestamp.desc()).paginate(
        page, per_page=current_app.config['FLASKY_POSTS_PER_PAGE'],
        error_out=False)
    posts = pagination.items
    # The template argument previously referenced the undefined name
    # `showfollowed`, raising NameError on every page render.
    return render_template('index.html', form=form, posts=posts,
                           show_followed=show_followed,
                           pagination=pagination)
def index():
    """Render the topics overview as rows of up to three topics.

    Loads at most 36 topics ordered by issue count, then follower count,
    each labelled with ``has_following`` for the current user (user id 0
    is used for anonymous visitors).
    """
    user_id = current_user.id if current_user.is_authenticated else 0

    is_this_user = UserTopics.c.user_id == user_id
    join_terms = db.and_(UserTopics.c.user_id == user_id,
                         UserTopics.c.topic_id == Topic.id)
    following_flag = db.case([(is_this_user, True)], else_=False)

    topics = (
        db.session.query(Topic, following_flag.label('has_following'))
        .outerjoin(UserTopics, join_terms)
        .order_by(Topic.issues_count.desc(), Topic.followers_count.desc())
        .limit(36)
        .all()
    )

    # Split the result into consecutive chunks of three for the template.
    row_size = 3
    groups = [topics[start:start + row_size]
              for start in range(0, len(topics), row_size)]

    return render_template('topics/index.html', groups=groups,
                           navbar_active='topics')
def post(self, name):
    """Make the current user follow the topic called ``name``.

    :param name: name of the topic to follow.
    :returns: the topic; returned unchanged if it is already followed.

    Aborts with 401 for unauthenticated users, 400 when the topic does not
    exist and 500 (after a rollback) when the update fails.
    """
    if not current_user.is_authenticated:
        return abort(401, message='permission denied')

    # Already following: nothing to change.
    for topic in current_user.following_topics:
        if topic.name == name:
            return topic

    topic = topics_service.get(name)
    if topic is None:
        return abort(400, message='topic not found')

    try:
        current_user.following_topics.append(topic)
        topic.followers_count += 1
        db.session.commit()
    except Exception:
        # Narrowed from a bare `except:` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        db.session.rollback()
        return abort(500, message='operation failed')
    return topic
def delete(self, name):
    """Make the current user stop following the topic called ``name``.

    :param name: name of the followed topic to remove.

    Aborts with 401 for unauthenticated users, 400 when the user does not
    follow a topic with that name and 500 (after a rollback) when the
    update fails.  Returns None on success.
    """
    if not current_user.is_authenticated:
        return abort(401, message='permission denied')

    # Search all followed topics; only report "not found" once none match.
    topic = next(
        (t for t in current_user.following_topics if t.name == name), None)
    if topic is None:
        return abort(400, message='topic not found')

    try:
        current_user.following_topics.remove(topic)
        # Never let the counter go negative.
        if topic.followers_count > 0:
            topic.followers_count -= 1
        db.session.commit()
    except Exception:
        # Narrowed from a bare `except:` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        db.session.rollback()
        return abort(500, message='operation failed')
def authorize(*args, **kwargs):
    # pylint: disable=unused-argument
    """
    Ask the user whether they grant the requesting application access to
    their data.

    Unauthenticated requests are aborted with 401.  A GET renders the
    confirmation page; any other method returns True only when the
    submitted ``confirm`` field equals ``'yes'``.
    """
    # TODO: improve implementation. This implementation is broken because we
    # don't use cookies, so there is no session which client could carry on.
    # OAuth2 server should probably be deployed on a separate domain, so we
    # can implement a login page and store cookies with a session id.
    # ALTERNATIVELY, authorize page can be implemented as SPA (single page
    # application)
    if not current_user.is_authenticated:
        return api.abort(code=HTTPStatus.UNAUTHORIZED)

    if request.method != 'GET':
        answer = request.form.get('confirm', 'no')
        return answer == 'yes'

    client_id = kwargs.get('client_id')
    # NOTE(review): Flask-SQLAlchemy's stock get_or_404 takes the identity
    # positionally; presumably a custom query class accepts this keyword --
    # confirm.
    kwargs['client'] = OAuth2Client.query.get_or_404(client_id=client_id)
    kwargs['user'] = current_user
    # TODO: improve template design
    return render_template('authorize.html', **kwargs)
def profile_settings(username):
    """Show and save the profile settings of ``username``.

    :param username: name of the profile owner; 404 when unknown.

    Only the authenticated owner may view or change the settings; everyone
    else gets 403.  A valid submission stores the new bio and website and
    redirects to the user's profile posts; otherwise the form is rendered
    pre-filled with the stored values.
    """
    user = User.query.filter_by(name=username).first_or_404()

    # Authorise before touching the form.  The ownership check used to
    # guard only the GET branch, so any visitor could POST changes to
    # someone else's profile.
    if not (current_user.is_authenticated and current_user.name == user.name):
        return abort(403)

    form = ProfileSettings()
    if form.validate_on_submit():
        # POST request
        user.bio = form.bio.data
        user.website = form.website.data
        db.session.add(user)
        db.session.commit()
        flash("new settings were successfully applied", "success")
        view = "posts.user_profile_posts"
        return redirect(url_for(view, username=user.name))

    # GET request
    form.website.data = current_user.website
    form.bio.data = current_user.bio
    return render_template("profile_settings.html", form=form,
                           user_bio=user.bio,
                           title="profile settings"
                           )
def configure_before_handlers(app):
    """Register the before-request handlers on ``app``."""

    def update_lastseen():
        """Store ``time_utcnow()`` as `lastseen` for authenticated users."""
        if not current_user.is_authenticated:
            return
        current_user.lastseen = time_utcnow()
        db.session.add(current_user)
        db.session.commit()

    def mark_current_user_online():
        """Mark the user, or the guest's remote address, as online."""
        if current_user.is_authenticated:
            mark_online(current_user.username)
        else:
            mark_online(request.remote_addr, guest=True)

    app.before_request(update_lastseen)
    # Online tracking is only registered when Redis is enabled.
    if app.config["REDIS_ENABLED"]:
        app.before_request(mark_current_user_online)
def login():
    """Log a user in by username and password.

    Authenticated visitors are redirected to "/".  On a valid submission
    an unknown username or a wrong password adds an error to the matching
    form field and re-renders the form; correct credentials log the user
    in and redirect to "/".
    """
    if current_user.is_authenticated:
        return redirect("/")

    form = LoginForm()

    def show_form():
        return utils.render_with_navbar("login.html", form=form)

    if not form.validate_on_submit():
        return show_form()

    real_user = User.query.filter_by(name=form.username.data).first()
    if real_user is None:
        form.username.errors.append("Username does not exist.")
        return show_form()

    if not check_password(form.password.data, real_user.password):
        form.password.errors.append("Username and password do not match.")
        return show_form()

    login_user(User.query.get(real_user.id_))
    return redirect("/")
def jw_login_required(f):
    """Require a logged-in user whose JW session still works.

    The wrapped view runs only when the user is authenticated and a probe
    request to the student-info page of the first or, failing that, the
    second ``JW_BASE_URL`` entry answers with status 200.  Otherwise the
    user is redirected to ``helper.jw_login`` with the current path as
    ``next``.
    """
    def _session_alive(base_url):
        """Return True when the probe page under ``base_url`` answers 200."""
        test_url = base_url + 'student/studentinfo/index.do'
        try:
            response = current_user.spd.jws.get(
                test_url, allow_redirects=False, timeout=1)
            return response.status_code == 200
        except Exception:
            # Any failure of the probe counts as "not logged in".  Narrowed
            # from a bare `except:` so SystemExit and KeyboardInterrupt
            # propagate.
            return False

    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated:
            return redirect(url_for('helper.jw_login', next=request.path))

        base_urls = current_app.config['JW_BASE_URL']
        # The second base URL is only probed when the first one fails.
        if not (_session_alive(base_urls[0]) or _session_alive(base_urls[1])):
            return redirect(url_for('helper.jw_login', next=request.path))
        return f(*args, **kwargs)
    return decorated_function
def pillar_api(token=None):
    """Return a FlaskInternalApi object for ``token``.

    :param token: explicit API token; when None and a user is
        authenticated, ``current_user.id`` is used as the token.

    API objects created without an explicit token are cached on the
    request object and reused by later calls in the same request.
    """
    # Cache API objects on the request per token.
    cached = getattr(request, 'pillar_api', {}).get(token)
    if cached is not None:
        return cached

    # Check if current_user is initialized (in order to support manage.py
    # scripts and non authenticated server requests).
    no_explicit_token = token is None
    if no_explicit_token and current_user and current_user.is_authenticated:
        use_token = current_user.id
    else:
        use_token = token

    api = FlaskInternalApi(endpoint=pillar_server_endpoint(),
                           username=None,
                           password=None,
                           token=use_token)

    if no_explicit_token:
        if not hasattr(request, 'pillar_api'):
            request.pillar_api = {}
        request.pillar_api[token] = api

    return api
def logout():
    """Invalidate and log out the current user, then go to the index.

    Anonymous visitors are simply redirected to the index.
    """
    if current_user.is_authenticated:
        user_invalidate(current_user)
        logout_user()
    return redirect(url_for('index'))
def login():
    """
    Login handler route.

    Redirects to the index when the user is already logged in; otherwise
    stores the OAuth state in the session and redirects to Google's
    authorization URL.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    oauth_client = get_google_auth()
    # get google oauth url
    authorization = oauth_client.authorization_url(Auth.AUTH_URI,
                                                   access_type='offline')
    target_url, oauth_state = authorization
    # set oauth state
    session['oauth_state'] = oauth_state
    # redirect to google for auth
    return redirect(target_url)
def set_login_status():
    """
    Publish the current login status on ``g`` for use in jinja templates.

    Sets ``g.logged_in``; ``g.user_email`` is only set for authenticated
    users.
    """
    if not current_user.is_authenticated:
        g.logged_in = False
        return
    g.logged_in = True
    g.user_email = current_user.email
def index():
    """Send authenticated users to the dashboard; show the landing page otherwise."""
    if not current_user.is_authenticated:
        return render_template('index.html')
    return redirect(url_for('webui.dashboard'))
def login():
    """Log in against the single USERNAME/PASSWORD pair from the app config.

    A POST compares the submitted credentials with the configured ones and
    either logs in and redirects to the admin page, or flashes a failure
    and redirects back to this URL.  A GET by an unauthenticated visitor
    renders the login page; every other request is redirected to '/'.
    """
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        config = current_app.config
        matches = (config['USERNAME'] == username
                   and config['PASSWORD'] == password)
        if not matches:
            flash('Login failed!', 'danger')
            return redirect(request.url)
        login_user(User())
        flash('Login succeed!', 'success')
        return redirect(url_for('admin.mange'))

    if request.method == 'GET' and not current_user.is_authenticated:
        return render_template('login.html')
    return redirect('/')
def logout():
    """Log the current user out and redirect to '/'.

    Aborts with 403 when nobody is logged in.
    """
    logged_in = current_user.is_authenticated
    if not logged_in:
        abort(403)

    logout_user()
    return redirect('/')
def login():
    """Abort with HTTP 403 unless the current user is authenticated.

    NOTE(review): the block ends right after the guard, so an
    authenticated request falls through and returns None.  The rest of
    the view is presumably missing from this excerpt -- confirm against
    the full source.
    """
    if not current_user.is_authenticated:
        abort(403)
def admin_required(func):
    """Check that user is logged in and an administrator.

    Requests using an exempt HTTP method, or made while login is disabled
    on the login manager, bypass the check.
    """
    @wraps(func)
    def decorated_view(*args, **kwargs):
        # See implementation of flask_login.utils.login_required
        bypass = (request.method in EXEMPT_METHODS
                  or login_manager._login_disabled)
        if bypass or (current_user.is_authenticated
                      and current_user.is_admin):
            return func(*args, **kwargs)
        return login_manager.unauthorized()
    return decorated_view
def login():
    """Render the login form and log the user in on a valid submission.

    Authenticated visitors, and users who just logged in, are redirected
    to ``user.home``.
    """
    if current_user and current_user.is_authenticated:
        return redirect(url_for("user.home"))

    form = LoginForm()
    if not form.validate_on_submit():
        return render_template("login.html", form=form)

    account = get_user_by_username_or_email(form.username_or_email.data)
    login_user(account, form.remember.data)
    return redirect(url_for("user.home"))
def register():
    """Render the registration form and create an account on a valid submit.

    A new user gets a plan initialised from the optional coupon, is logged
    in with a remember cookie and is redirected to ``user.home``.
    Authenticated visitors are redirected there immediately.
    """
    if current_user and current_user.is_authenticated:
        return redirect(url_for("user.home"))

    form = RegisterForm()
    if not form.validate_on_submit():
        return render_template("register.html", form=form)

    new_user = create_user(form.username.data, form.email.data,
                           form.password.data)
    # An empty coupon field is passed on as None.
    init_user_plan(new_user, form.coupon.data or None)
    login_user(new_user, remember=True)
    return redirect(url_for("user.home"))
def is_accessible(self):
    """Return a truthy value only for an authenticated admin user.

    The result is that of the short-circuiting ``and`` chain, so it is the
    first falsy operand or, when all are truthy, ``current_user.is_admin``.
    """
    return (
        current_user
        and current_user.is_authenticated
        and current_user.is_admin
    )