我们从Python开源项目中,提取了以下45个代码示例,用于说明如何使用flask.ext.login.current_user.is_anonymous()。
def oauth_callback(provider): if not current_user.is_anonymous: return redirect(url_for('index')) oauth = OAuthSignIn.get_provider(provider) xd_id, name, email = oauth.callback() if xd_id is None: flash('Authentication failed.') return redirect(url_for('index')) user = User.query.filter_by(xd_id=xd_id).first() if not user: # Create, add and login new user. Redirect to /register user = User(xd_id=xd_id, name=name, email=email) db.session.add(user) db.session.commit() login_user(user, True) return redirect(url_for('dash')) # previously register else: # Login new user. Redirect to / login_user(user, True) return redirect(url_for('index'))
def delete_email_address(): if current_user.is_anonymous: flash('You need to be logged in to do that') return redirect(url_for('index')) try: current_user.email = None db.session.commit() except exc.SQLAlchemyError: # TODO log this flash('Something went wrong while deleting your email from our database.') db.session.rollback() oauth = OAuthSignIn.get_provider('facebook') # Strip out the 'facebook$' at the start of the id user_id = re.findall('\d+', current_user.social_id)[0] permission_revoked = oauth.revoke_email_permission(user_id) if not permission_revoked: flash('There was a problem giving up the permission to access your email address. ' 'It may be re-added to your account here the next time you sign in. ' 'To permanently remove it, please use your privacy settings in Facebook.') return redirect(url_for('index'))
def password_reset_request(): if not current_user.is_anonymous: return redirect(url_for('main.index')) form = PasswordResetRequestForm() if form.validate_on_submit(): user = User.query.filter_by(email=form.email.data).first() if user: token = user.generate_reset_token() send_email(user.email, 'Reset Your Password', 'auth/email/reset_password', user=user, token=token, next=request.args.get('next')) flash('An email with instructions to reset your password has been ' 'sent to you.') return redirect(url_for('auth.login')) return render_template('auth/reset_password.html', form=form)
def changePasswordForUser(username): if not userManager.enabled: return jsonify(SUCCESS) if current_user is not None and not current_user.is_anonymous() and (current_user.get_name() == username or current_user.is_admin()): if not "application/json" in request.headers["Content-Type"]: return make_response("Expected content-type JSON", 400) try: data = request.json except BadRequest: return make_response("Malformed JSON body in request", 400) if not "password" in data or not data["password"]: return make_response("password is missing from request", 400) try: userManager.changeUserPassword(username, data["password"]) except users.UnknownUser: return make_response(("Unknown user: %s" % username, 404, [])) return jsonify(SUCCESS) else: return make_response(("Forbidden", 403, []))
def changeSettingsForUser(username): if not userManager.enabled: return jsonify(SUCCESS) if current_user is None or current_user.is_anonymous() or (current_user.get_name() != username and not current_user.is_admin()): return make_response("Forbidden", 403) try: data = request.json except BadRequest: return make_response("Malformed JSON body in request", 400) try: userManager.changeUserSettings(username, data) return jsonify(SUCCESS) except users.UnknownUser: return make_response("Unknown user: %s" % username, 404)
def password_reset_request(): if not current_user.is_anonymous: return redirect(url_for('main.index')) form = PasswordResetRequestForm() if form.validate_on_submit(): user = User.query.filter_by(email = form.email.data).first() if user: token = user.generate_reset_token() send_email(user.email, 'Reset Your Password', 'auth/email/reset_password', user = user, token = token, next = request.args.get('next')) flash('An email with instructions to reset your password has been ' 'send to you.') return redirect(url_for('auth.login')) return render_template('auth/reset_password.html', form = form)
def test_correct_login(self): with self.client: response = self.login("admin", "adminpassword") # 200 (OK) HTTP status code self.assert200(response) # check if user is authenticated self.assertTrue(current_user.is_authenticated()) # check if user is not anonymous self.assertFalse(current_user.is_anonymous()) # get user id self.assertEqual(current_user.get_id(), "1") # test user redirects to the main page self.assertIn('/posts', request.url) # Ensure alert is shown after logging in # Binary format because str() object doesn't support Buffer api self.assertIn(b'you were just logged in', response.data)
def pwd_reset_request(): if not current_user.is_anonymous: #??????????????? return redirect('main.index') form = ResetpwdReqForm() if form.validate_on_submit(): user = User.query.filter_by(email=form.email.data).first() if user: token = user.generate_reset_token() send_email(user.email, 'Reset Your Password', 'auth/email/reset_password', user=user, token=token, next=request.args.get('next')) flash('???????????????') return redirect(url_for('auth.login')) return render_template('auth/reset_password.html', form=form) #??????????
def password_reset(token): if not current_user.is_anonymous: return redirect(url_for('main.index')) form = ResetpwdForm() if form.validate_on_submit(): user = User.query.filter_by(email=form.email.data).first() if user is None: return redirect('main.index') if user.reset_password(token, form.password.data): flash('??????.') return redirect(url_for('auth.login')) else: return redirect(url_for('main.index')) return render_template('auth/reset_password.html', form=form) #??????
def ensure_authorized_to(action, resource, **kwargs): authorized = is_authorized(current_user, action, resource, **kwargs) if authorized is False: if current_user.is_anonymous(): raise abort(401) else: raise abort(403) return authorized
def password_needed(self, project, user_id_or_ip): """Check if password is required.""" if project.needs_password() and (current_user.is_anonymous() or not (current_user.admin or current_user.id == project.owner_id)): cookie = self.cookie_handler.get_cookie_from(project) request_passwd = user_id_or_ip not in cookie return request_passwd return False
def _update_object(self, obj): if not current_user.is_anonymous(): obj.user_id = current_user.id
def _add_user_info(self, taskrun): if current_user.is_anonymous(): taskrun.user_ip = request.remote_addr if taskrun.user_ip is None: taskrun.user_ip = '127.0.0.1' else: taskrun.user_id = current_user.id
def _update_object(self, obj): if not current_user.is_anonymous(): obj.owner_id = current_user.id
def _select_attributes(self, data): if current_user.is_anonymous(): data = self._filter_private_data(data) return data if (current_user.is_authenticated and (current_user.id == data['owner_id'] or current_user.admin)): return data else: data = self._filter_private_data(data) return data
def user_progress(project_id=None, short_name=None): """API endpoint for user progress. Return a JSON object with two fields regarding the tasks for the user: { 'done': 10, 'total: 100 } This will mean that the user has done a 10% of the available tasks for him """ if project_id or short_name: if short_name: project = project_repo.get_by_shortname(short_name) elif project_id: project = project_repo.get(project_id) if project: # For now, keep this version, but wait until redis cache is used here for task_runs too query_attrs = dict(project_id=project.id) if current_user.is_anonymous(): query_attrs['user_ip'] = request.remote_addr or '127.0.0.1' else: query_attrs['user_id'] = current_user.id taskrun_count = task_repo.count_task_runs_with(**query_attrs) tmp = dict(done=taskrun_count, total=n_tasks(project.id)) return Response(json.dumps(tmp), mimetype="application/json") else: return abort(404) else: # pragma: no cover return abort(404)
def get_twitter_token(): # pragma: no cover """Get Twitter token from session.""" if current_user.is_anonymous(): return None return((current_user.info['twitter_token']['oauth_token'], current_user.info['twitter_token']['oauth_token_secret']))
def get_facebook_token(): # pragma: no cover """Get Facebook token from session.""" if current_user.is_anonymous(): return session.get('oauth_token') else: return (current_user.info['facebook_token']['oauth_token'], '')
def get_google_token(): # pragma: no cover """Get Google Token from session.""" if current_user.is_anonymous(): return session.get('oauth_token') else: return (current_user.info['google_token']['oauth_token'], '')
def oauth_authorize(provider): if not current_user.is_anonymous: return redirect(url_for('index')) oauth = OAuthSignIn.get_provider(provider) return oauth.authorize()
def oauth_authorize(): """Invoked when the user clicks the "login with..." button.""" if not current_user.is_anonymous: return redirect(url_for('index')) # TODO allow logging in through Twitter, or another service other than FB oauth = OAuthSignIn.get_provider('facebook') assert(isinstance(oauth, FacebookSignIn)) return oauth.authorize()
def oauth_rerequest_permissions(): """ Lets the user grant us permission to access their email address, should they have denied us permission the first time around. Sends them to Facebook's OAuth permissions dialog. """ if current_user.is_anonymous: flash('You need to be logged in to do that.') return redirect(url_for('index')) oauth = OAuthSignIn.get_provider('facebook') return oauth.rerequest_permissions()
def oauth_callback_authorize(): if not current_user.is_anonymous: return redirect(url_for('index')) oauth = OAuthSignIn.get_provider('facebook') search_terms = request.args.get('search_terms_to_save', '') social_id, username, email, is_email_granted = oauth.callback_authorize(search_terms) if social_id is None: flash('Authentication failed.') return redirect(url_for('index')) user = User.query.filter_by(social_id=social_id).first() if not user: user = User(social_id=social_id, nickname=username, email=email) try: db.session.add(user) db.session.commit() except exc.SQLAlchemyError: flash('Something went wrong while adding you as a user. Sorry!') # TODO log this return redirect(url_for('index')) else: # Update email and nickname upon login to those provided by Facebook. if email and email != user.email: update_email_address(email) if username and username != user.nickname: update_nickname(username) login_user(user, True) return redirect(url_for('index', search_terms_to_save=search_terms))
def add_search(): if current_user.is_anonymous: return json_failed("You need to be logged in to save a search.") already_saved_searches = SavedSearch.query.filter_by(owner=current_user).all() if len(already_saved_searches) >= 25: return json_failed('You can only have up to 25 saved searches. ' 'Please delete some before you make more.') search_terms = request.form['search_terms'] if not search_terms: return json_failed("You can't save a search with blank search terms.") if len(search_terms) > MAX_SEARCH_LENGTH: return json_failed("The entered search criteria are too long " "({0} characters.) Please limit your search's " "length to {1} characters.".format(len(search_terms), MAX_SEARCH_LENGTH)) search = SavedSearch(owner=current_user, search_terms=search_terms, timestamp=datetime.datetime.utcnow()) try: db.session.add(search) db.session.commit() return jsonify({ "status": "success", "search_id": search.id }) except exc.SQLAlchemyError: # TODO log this db.session.rollback() return json_failed('Something went wrong while saving your search in' ' our database.')
def unconfirmed(): if current_user.is_anonymous or current_user.confirmed: return redirect(url_for('main.index')) return render_template('auth/unconfirmed.html') # ????????
def oauth_authorize(provider): if not current_user.is_anonymous: return redirect(url_for('main.index')) oauth = OAuthSignIn.get_provider(provider) return oauth.authorize()
def oauth_callback(provider): if not current_user.is_anonymous: return redirect(url_for('main.index')) oauth = OAuthSignIn.get_provider(provider) social_id, username, email = oauth.callback() if social_id is None: flash('Authentication failed.') return redirect(url_for('main.index')) user = User.query.filter_by(social_id=social_id).first() if not user: user = User(social_id=social_id, username=username, email=email,password=generate_pass(),confirmed=True) db.session.add(user) db.session.commit() login_user(user, True) return redirect(url_for('main.index'))
def unconfirmed(): if current_user.is_anonymous or current_user.confirmed: return redirect('main.index') return render_template('auth/unconfirmed.html')
def password_reset(token): if not current_user.is_anonymous: return redirect(url_for('main.index')) form = PasswordResetForm() if form.validate_on_submit(): user = User.query.filter_by(email=form.email.data).first() if user is None: return redirect(url_for('main.index')) if user.reset_password(token, form.password.data): flash('Your password has been updated.') return redirect(url_for('auth.login')) else: return redirect(url_for('main.index')) return render_template('auth/reset_password.html', form=form)
def index(): if current_user.is_anonymous: return render_template("index.html") else: return render_template("index_user.html")
def getUser(username): if not userManager.enabled: return jsonify(SUCCESS) if current_user is not None and not current_user.is_anonymous() and (current_user.get_name() == username or current_user.is_admin()): user = userManager.findUser(username) if user is not None: return jsonify(user.asDict()) else: abort(404) else: abort(403)
def getSettingsForUser(username): if not userManager.enabled: return jsonify(SUCCESS) if current_user is None or current_user.is_anonymous() or (current_user.get_name() != username and not current_user.is_admin()): return make_response("Forbidden", 403) try: return jsonify(userManager.getAllUserSettings(username)) except users.UnknownUser: return make_response("Unknown user: %s" % username, 404)
def deleteApikeyForUser(username): if not userManager.enabled: return jsonify(SUCCESS) if current_user is not None and not current_user.is_anonymous() and (current_user.get_name() == username or current_user.is_admin()): try: userManager.deleteApikey(username) except users.UnknownUser: return make_response(("Unknown user: %s" % username, 404, [])) return jsonify(SUCCESS) else: return make_response(("Forbidden", 403, []))
def _etag(lm=None): if lm is None: lm = _lastmodified() connection_options = printer.__class__.get_connection_options() plugins = sorted(octoprint.plugin.plugin_manager().enabled_plugins) plugin_settings = _get_plugin_settings() from collections import OrderedDict sorted_plugin_settings = OrderedDict() for key in sorted(plugin_settings.keys()): sorted_plugin_settings[key] = plugin_settings.get(key, dict()) if current_user is not None and not current_user.is_anonymous(): roles = sorted(current_user.roles) else: roles = [] import hashlib hash = hashlib.sha1() # last modified timestamp hash.update(str(lm)) # effective config from config.yaml + overlays hash.update(repr(settings().effective)) # might duplicate settings().effective, but plugins might also inject additional keys into the settings # output that are not stored in config.yaml hash.update(repr(sorted_plugin_settings)) # connection options are also part of the settings hash.update(repr(connection_options)) # if the list of plugins changes, the settings structure changes too hash.update(repr(plugins)) # and likewise if the role of the user changes hash.update(repr(roles)) return hash.hexdigest()
def unconfirmed(): if current_user.is_anonymous or current_user.confirmed: return redirect(url_for('main.index')) return render_template('auth/unconfirmed.html')
def password_reset(token): if not current_user.is_anonymous: return redirect(url_for('main.index')) form = PasswordResetForm() if form.validate_on_submit(): user = User.query.filter_by(email = form.email.data).first() if user is None: return redirect(url_for('main.index')) if user.reset_password(token, form.password.data): flash('Your password has been updated.') return redirect(url_for('auth.login')) else: return redirect(url_for('main.index')) return render_template('auth/reset_password.html', form = form)
def on_settings_load(self): data = octoprint.plugin.SettingsPlugin.on_settings_load(self) # only return our restricted settings to admin users - this is only needed for OctoPrint <= 1.2.16 restricted = (("token", None), ("tracking_token", None), ("chats", dict())) for r, v in restricted: if r in data and (current_user is None or current_user.is_anonymous() or not current_user.is_admin()): data[r] = v return data
def logout(): if not current_user.is_anonymous(): logout_user() return redirect(url_for('main.index'))
def __iter__(self): """Iterate through all :class:`HwProxy` instances. :return: Iterable object of :class:`HwProxy` instances. """ if not current_user.is_anonymous() and current_user.is_admin: return iter(self.items) return (i for i in self.items if not i.is_hidden())
def unconfirmed(): if current_user.is_anonymous or current_user.confirmed: #????????????????????? return redirect(url_for('main.index')) return render_template('auth/unconfirmed.html') #????
def get(self, oid): """Return all the tasks favorited by current user.""" try: if current_user.is_anonymous(): raise abort(401) uid = current_user.id tasks = task_repo.filter_tasks_by_user_favorites(uid) data = self._create_json_response(tasks, oid) return Response(data, 200, mimetype='application/json') except Exception as e: return error.format_exception( e, target=self.__class__.__name__.lower(), action='GET')
def post(self): """Add User ID to task as a favorite.""" try: self.valid_args() data = json.loads(request.data) if (len(data.keys()) != 1) or ('task_id' not in data.keys()): raise AttributeError if current_user.is_anonymous(): raise Unauthorized uid = current_user.id tasks = task_repo.get_task_favorited(uid, data['task_id']) if len(tasks) == 1: task = tasks[0] if len(tasks) == 0: task = task_repo.get_task(data['task_id']) if task is None: raise NotFound if task.fav_user_ids is None: task.fav_user_ids = [uid] else: task.fav_user_ids.append(uid) task_repo.update(task) self._log_changes(None, task) return Response(json.dumps(task.dictize()), 200, mimetype='application/json') except Exception as e: return error.format_exception( e, target=self.__class__.__name__.lower(), action='POST')
def delete(self, oid): """Delete User ID from task as a favorite.""" try: if current_user.is_anonymous(): raise abort(401) uid = current_user.id tasks = task_repo.get_task_favorited(uid, oid) if tasks == []: raise NotFound if len(tasks) == 1: task = tasks[0] idx = task.fav_user_ids.index(uid) task.fav_user_ids.pop(idx) task_repo.update(task) return Response(json.dumps(task.dictize()), 200, mimetype='application/json') except Exception as e: return error.format_exception( e, target=self.__class__.__name__.lower(), action='DEL')
def task_presenter(short_name, task_id): (project, owner, n_tasks, n_task_runs, overall_progress, last_activity, n_results) = project_by_shortname(short_name) task = task_repo.get_task(id=task_id) if task is None: raise abort(404) if project.needs_password(): redirect_to_password = _check_if_redirect_to_password(project) if redirect_to_password: return redirect_to_password else: ensure_authorized_to('read', project) if current_user.is_anonymous(): if not project.allow_anonymous_contributors: msg = ("Oops! You have to sign in to participate in " "<strong>%s</strong>" "project" % project.name) flash(gettext(msg), 'warning') return redirect(url_for('account.signin', next=url_for('.presenter', short_name=project.short_name))) else: msg_1 = gettext( "Ooops! You are an anonymous user and will not " "get any credit" " for your contributions.") next_url = url_for('project.task_presenter', short_name=short_name, task_id=task_id) url = url_for('account.signin', next=next_url) flash(msg_1 + "<a href=\"" + url + "\">Sign in now!</a>", "warning") title = project_title(project, "Contribute") project_sanitized, owner_sanitized = sanitize_project_owner(project, owner, current_user) template_args = {"project": project_sanitized, "title": title, "owner": owner_sanitized} def respond(tmpl): response = dict(template = tmpl, **template_args) return handle_content_type(response) if not (task.project_id == project.id): return respond('/projects/task/wrong.html') guard = ContributionsGuard(sentinel.master) guard.stamp(task, get_user_id_or_ip()) if has_no_presenter(project): flash(gettext("Sorry, but this project is still a draft and does " "not have a task presenter."), "error") return respond('/projects/presenter.html')