我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.redirect()。
def index(): code = request.args.get("code", "") app.logger.debug("code: %s" %code) #app.logger.debug(request.args) if g.signin: return "logged_in" elif code: _data = Get_Access_Token(code) access_token = _data.get('access_token') userData = Get_User_Info(access_token) app.logger.debug(userData) #resp = render_template('info.html', userData=userData) #resp.set_cookie(key="logged_in", value='true', expires=None) resp = jsonify(userData) resp.set_cookie(key="logged_in", value='true', expires=None) return resp else: return redirect(url_for("login"))
def twittercallback(): verification = request.args["oauth_verifier"] auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET) try: auth.request_token = session["request_token"] except KeyError: flash("Please login again", "danger") return redirect(url_for("bp.home")) try: auth.get_access_token(verification) except tweepy.TweepError: flash("Failed to get access token", "danger") return redirect(url_for("bp.home")) session["access_token"] = auth.access_token session["access_token_secret"] = auth.access_token_secret return render_template("twittercallback.html", form=HashtagForm())
def webpage(): url = request.args.get('url') if not url: # redirect with url query param so that user can navigate back later next_rec = service.get_next_unlabelled() if next_rec: return redirect("/?url=%s" % (urllib.quote(next_rec['url']))) else: featured_content = "No Unlabelled Record Found." else: featured_content = get_next(url) data = { 'featured_content': featured_content, 'status': service.overall_status() } return render_template('index.html', **data)
def edit_view(self, pk): """edit view function :param pk: the primary key of the model to be edited. """ obj = self.query_object(pk) form = self.edit_form_class(obj=obj) if form.validate_on_submit(): form.populate_obj(obj) obj.save() message = self.edit_flash_message if message is None: message = self.object_name + ' updated' if message: flash(message) return redirect(self.edit_redirect_url) context = self.edit_view_context({self.edit_form_name: form}) return render_template(self.edit_template, **context)
def delete_view(self, pk): """delete view function :param pk: the primary key of the model to be deleted. """ obj = self.query_object(pk) form = self.delete_form_class(obj=obj) if form.validate_on_submit(): obj.delete() message = self.delete_flash_message if message is None: message = self.object_name + ' deleted' if message: flash(message) return redirect(self.delete_redirect_url) context = self.delete_view_context({self.delete_form_name: form}) return render_template(self.delete_template, **context)
def get(self): if request.cookies.get('save_id'): resp = make_response(redirect(url_for('.exit'))) resp.set_cookie('user_name', expires=0) resp.set_cookie('login_time', expires=0) resp.set_cookie('save_id', expires=0) return resp if session.get('name'): session.pop('name') if session.get('show_name'): session.pop('show_name') if session.get('user_id'): session.pop('user_id') return redirect(url_for('.login')) # ?config.json ???? is_register ?false??????? ??????????????
def get_zip(self, project, ty): """Get a ZIP file directly from uploaded directory or generate one on the fly and upload it if not existing.""" filename = self.download_name(project, ty) if not self.zip_existing(project, ty): print "Warning: Generating %s on the fly now!" % filename self._make_zip(project, ty) if isinstance(uploader, local.LocalUploader): filepath = self._download_path(project) res = send_file(filename_or_fp=safe_join(filepath, filename), mimetype='application/octet-stream', as_attachment=True, attachment_filename=filename) # fail safe mode for more encoded filenames. # It seems Flask and Werkzeug do not support RFC 5987 http://greenbytes.de/tech/tc2231/#encoding-2231-char # res.headers['Content-Disposition'] = 'attachment; filename*=%s' % filename return res else: return redirect(url_for('rackspace', filename=filename, container=self._container(project), _external=True))
def build_sitemap(): from redberry.models import RedPost, RedCategory from apesmit import Sitemap sm = Sitemap(changefreq='weekly') for post in RedPost.all_published(): sm.add(url_for('redberry.show_post', slug=post.slug, _external=True), lastmod=post.updated_at.date()) for category in RedCategory.query.all(): sm.add(url_for('redberry.show_category', category_slug=category.slug, _external=True), lastmod=category.updated_at.date()) with open(os.path.join(REDBERRY_ROOT, 'static', 'redberry', 'sitemap.xml'), 'w') as f: sm.write(f) flash("Sitemap created.", 'success') return redirect(url_for('redberry.home')) ############## # ADMIN ROUTES ##############
def index(): form = NameForm() if form.validate_on_submit(): user = User.query.filter_by(username=form.name.data).first() if user is None: user = User(username=form.name.data) db.session.add(user) session['known'] = False if app.config['FLASKY_ADMIN']: send_email(app.config['FLASKY_ADMIN'], 'New User', 'mail/new_user', user=user) else: session['known'] = True session['name'] = form.name.data from.name.data = '' return redirect(url_for('index')) return render_template('index.html', form=form, name=session.get('name'), known=session.get('known', False))
def post(self): if (request.form['username']): data = {"user": request.form['username'], "key": request.form['password']} result = dockletRequest.unauthorizedpost('/login/', data) ok = result and result.get('success', None) if (ok and (ok == "true")): # set cookie:docklet-jupyter-cookie for jupyter notebook resp = make_response(redirect(request.args.get('next',None) or '/dashboard/')) app_key = os.environ['APP_KEY'] resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(request.form['username'], app_key)) # set session for docklet session['username'] = request.form['username'] session['nickname'] = result['data']['nickname'] session['description'] = result['data']['description'] session['avatar'] = '/static/avatar/'+ result['data']['avatar'] session['usergroup'] = result['data']['group'] session['status'] = result['data']['status'] session['token'] = result['data']['token'] return resp else: return redirect('/login/') else: return redirect('/login/')
def get(self): form = external_generate.external_auth_generate_request() result = dockletRequest.unauthorizedpost('/external_login/', form) ok = result and result.get('success', None) if (ok and (ok == "true")): # set cookie:docklet-jupyter-cookie for jupyter notebook resp = make_response(redirect(request.args.get('next',None) or '/dashboard/')) app_key = os.environ['APP_KEY'] resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(result['data']['username'], app_key)) # set session for docklet session['username'] = result['data']['username'] session['nickname'] = result['data']['nickname'] session['description'] = result['data']['description'] session['avatar'] = '/static/avatar/'+ result['data']['avatar'] session['usergroup'] = result['data']['group'] session['status'] = result['data']['status'] session['token'] = result['data']['token'] return resp else: return redirect('/login/')
def post(self): form = external_generate.external_auth_generate_request() result = dockletRequest.unauthorizedpost('/external_login/', form) ok = result and result.get('success', None) if (ok and (ok == "true")): # set cookie:docklet-jupyter-cookie for jupyter notebook resp = make_response(redirect(request.args.get('next',None) or '/dashboard/')) app_key = os.environ['APP_KEY'] resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(result['data']['username'], app_key)) # set session for docklet session['username'] = result['data']['username'] session['nickname'] = result['data']['nickname'] session['description'] = result['data']['description'] session['avatar'] = '/static/avatar/'+ result['data']['avatar'] session['usergroup'] = result['data']['group'] session['status'] = result['data']['status'] session['token'] = result['data']['token'] return resp else: return redirect('/login/')
def post(self): masterip = self.masterip index1 = self.image.rindex("_") index2 = self.image[:index1].rindex("_") checkname(self.clustername) data = { "clustername": self.clustername, 'imagename': self.image[:index2], 'imageowner': self.image[index2+1:index1], 'imagetype': self.image[index1+1:], } result = dockletRequest.post("/cluster/create/", dict(data, **(request.form)), masterip) if(result.get('success', None) == "true"): return redirect("/dashboard/") #return self.render(self.template_path, user = session['username']) else: return self.render(self.error_path, message = result.get('message'))
def post(self): masterip = self.masterip data = { "clustername": self.clustername, "image": self.imagename, "containername": self.containername, "description": self.description, "isforce": self.isforce } result = dockletRequest.post("/cluster/save/", data, masterip) if(result): if result.get('success') == 'true': #return self.render(self.success_path, user = session['username']) return redirect("/config/") #res = detailClusterView() #res.clustername = self.clustername #return res.as_view() else: if result.get('reason') == "exists": return self.render(self.template_path, containername = self.containername, clustername = self.clustername, image = self.imagename, user = session['username'], description = self.description, masterip=masterip) else: return self.render(self.error_path, message = result.get('message')) else: self.error()
def internal_server_error(error): logger.error(error) logger.error(traceback.format_exc()) if "username" in session: if "500" in session and "500_title" in session: reason = session['500'] title = session['500_title'] session.pop('500', None) session.pop('500_title', None) else: reason = '''The server encountered something unexpected that didn't allow it to complete the request. We apologize.You can go back to <a href="/dashboard/">dashboard</a> or <a href="/logout">log out</a>''' title = 'Internal Server Error' return render_template('error/500.html', mysession = session, reason = reason, title = title) else: return redirect('/login/')
def login(): """ This login function checks if the username & password match the admin.db; if the authentication is successful, it passes the id of the user into login_user() """ if request.method == "POST" and \ "username" in request.form and \ "password" in request.form: username = request.form["username"] password = request.form["password"] user = User.get(username) # If we found a user based on username then compare that the submitted # password matches the password in the database. The password is stored # is a slated hash format, so you must hash the password before comparing it. if user and hash_pass(password) == user.password: login_user(user, remember=True) # FIXME! Get this to work properly... # return redirect(request.args.get("next") or url_for("index")) return redirect(url_for("index")) else: flash(u"Invalid username, please try again.") return render_template("login.html")
def changepass(): if request.method == 'POST': # process password change if request.form['pass1'] == request.form['pass2']: change_password(session['username'], request.form['pass1']) log_action(session['uid'], 8) session.pop('logged_in', None) session.pop('uid', None) session.pop('priv', None) session.pop('username', None) flash('Your password has been changed. Please login using your new password.') return redirect(url_for('home')) else: flash('The passwords you entered do not match. Please try again.') return render_template('changepass.html') return render_template('changepass.html') # # EDIT USER PAGE #
def save_config(self): if not self.is_authenticated(): return redirect(url_for('login')) if (config['CONFIG_PATH'] is not None and os.path.isfile(config['CONFIG_PATH'])): config_path = config['CONFIG_PATH'] else: config_path = os.path.join(config['ROOT_PATH'], 'config.json') with open(config_path, 'w') as f: data = {'GOOGLEMAPS_KEY': config['GOOGLEMAPS_KEY'], 'LOCALE': config['LOCALE'], 'CONFIG_PASSWORD': config['CONFIG_PASSWORD'], 'SCAN_LOCATIONS': self.scan_config.SCAN_LOCATIONS.values(), 'ACCOUNTS': config['ACCOUNTS']} f.write(json.dumps(data))
def drop_show(): """Show removal handler. Attempts to remove a show from the backend system. The show ID must be passed in the ``id`` query string. If the user if unauthenticated, the function is aborted with a ``404`` message to hide the page. Returns: An HTTP redirect to the home page, to refresh. """ log.debug("Entering drop_show, trying to remove show from list.") if fe.check_login_id(escape(session['logged_in'])): log.debug("Sending show ID {0} to function".format(request.args['id'])) fe.remove_show(request.args['id']) log.debug("Refreshing user's page.") return redirect('/') log.debug("User cannot be authenticated, send 404 to hide page.") abort(404)
def authorize_view(self): """Flask view that starts the authorization flow. Starts flow by redirecting the user to the OAuth2 provider. """ args = request.args.to_dict() # Scopes will be passed as mutliple args, and to_dict() will only # return one. So, we use getlist() to get all of the scopes. args['scopes'] = request.args.getlist('scopes') return_url = args.pop('return_url', None) if return_url is None: return_url = request.referrer or '/' flow = self._make_flow(return_url=return_url, **args) auth_url = flow.step1_get_authorize_url() return redirect(auth_url)
def article(): site_info = site_get() article_id = request.args.get('article_id',0) if article_id != 0: article = Article.query.filter_by(id = article_id).first() if article is not None: article = article.__dict__ article_id = article['id'] title = article['title'] packet_id = article['packet_id'] show = article['show'] timestamp = article['timestamp'] body = article['body'][:-1] else: return redirect(url_for('main.index')) return render_template('article.html', **locals())
def test_flash_signal(self): app = flask.Flask(__name__) app.config['SECRET_KEY'] = 'secret' @app.route('/') def index(): flask.flash('This is a flash message', category='notice') return flask.redirect('/other') recorded = [] def record(sender, message, category): recorded.append((message, category)) flask.message_flashed.connect(record, app) try: client = app.test_client() with client.session_transaction(): client.get('/') self.assert_equal(len(recorded), 1) message, category = recorded[0] self.assert_equal(message, 'This is a flash message') self.assert_equal(category, 'notice') finally: flask.message_flashed.disconnect(record, app)
def email(self): try: to_email = request.args.get('to_email') path, attr = configuration.get('email', 'EMAIL_BACKEND').rsplit('.', 1) logging.info("path: " + str(path)) logging.info("attr: " + str(attr)) module = importlib.import_module(path) logging.info("module: " + str(module)) backend = getattr(module, attr) backend(to_email, "Test Email", "Test Email", files=None, dryrun=False) flash('Email Sent') except Exception as e: flash('Failed to Send Email: ' + str(e), 'error') return redirect("/admin/admintools", code=302)
def signup(): from forms import SignupForm form = SignupForm() if form.validate_on_submit(): user = User.query.filter_by(email=form.email.data.lower()).first() if user is not None: form.email.errors.append("The Email address is already taken.") return render_template('signup.html', form=form) newuser = User(form.firstname.data,form.lastname.data,form.email.data,form.password.data) db.session.add(newuser) db.session.commit() session['email'] = newuser.email return redirect(url_for('login')) return render_template('signup.html', form=form)
def login(): if g.user is not None and g.user.is_authenticated: return redirect(url_for('index')) from app.forms import LoginForm form = LoginForm() if form.validate_on_submit(): session['remember_me'] = form.remember_me.data user = User.query.filter_by(email=form.email.data.lower()).first() if user and user.check_password(form.password.data): session['email'] = form.email.data login_user(user,remember=session['remember_me']) return redirect(url_for('index')) else: return render_template('login.html',form=form,failed_auth=True) return render_template('login.html',form=form)
def manage_user_login(user, user_data, next_url): """Manage user login.""" if user is None: user = user_repo.get_by_name(user_data['screen_name']) msg, method = get_user_signup_method(user) flash(msg, 'info') if method == 'local': return redirect(url_for('account.forgot_password')) else: return redirect(url_for('account.signin')) login_user(user, remember=True) flash("Welcome back %s" % user.fullname, 'success') if ((user.email_addr != user.name) and user.newsletter_prompted is False and newsletter.is_initialized()): return redirect(url_for('account.newsletter_subscribe', next=next_url)) if user.email_addr != user.name: return redirect(next_url) else: flash("Please update your e-mail address in your profile page") return redirect(url_for('account.update_profile', name=user.name))
def manage_user_login(user, user_data, next_url): """Manage user login.""" if user is None: # Give a hint for the user user = user_repo.get_by(email_addr=user_data.get('email')) if user is not None: msg, method = get_user_signup_method(user) flash(msg, 'info') if method == 'local': return redirect(url_for('account.forgot_password')) else: return redirect(url_for('account.signin')) else: return redirect(url_for('account.signin')) else: login_user(user, remember=True) flash("Welcome back %s" % user.fullname, 'success') request_email = (user.email_addr == user.name) if request_email: flash("Please update your e-mail address in your profile page") return redirect(url_for('account.update_profile', name=user.name)) if (not request_email and user.newsletter_prompted is False and newsletter.is_initialized()): return redirect(url_for('account.newsletter_subscribe', next=next_url)) return redirect(next_url)
def manage_user_login(user, user_data, next_url): """Manage user login.""" if user is None: # Give a hint for the user user = user_repo.get_by(email_addr=user_data['email']) if user is None: name = username_from_full_name(user_data['name']) user = user_repo.get_by_name(name) msg, method = get_user_signup_method(user) flash(msg, 'info') if method == 'local': return redirect(url_for('account.forgot_password')) else: return redirect(url_for('account.signin')) else: login_user(user, remember=True) flash("Welcome back %s" % user.fullname, 'success') if user.newsletter_prompted is False and newsletter.is_initialized(): return redirect(url_for('account.newsletter_subscribe', next=next_url)) return redirect(next_url)
def password_required(short_name): (project, owner, n_tasks, n_task_runs, overall_progress, last_activity, n_results) = project_by_shortname(short_name) form = PasswordForm(request.form) if request.method == 'POST' and form.validate(): password = request.form.get('password') cookie_exp = current_app.config.get('PASSWD_COOKIE_TIMEOUT') passwd_mngr = ProjectPasswdManager(CookieHandler(request, signer, cookie_exp)) if passwd_mngr.validates(password, project): response = make_response(redirect(request.args.get('next'))) return passwd_mngr.update_response(response, project, get_user_id_or_ip()) flash(gettext('Sorry, incorrect password')) return render_template('projects/password.html', project=project, form=form, short_name=short_name, next=request.args.get('next'))
def publish(short_name): (project, owner, n_tasks, n_task_runs, overall_progress, last_activity, n_results) = project_by_shortname(short_name) #### shruthi if("sched" in project.info.keys() and project.info["sched"]=="FRG"): if(project.owner_id==current_user.id and not cached_users.is_quiz_created(current_user.id, project)): flash("You did not created quiz.Please create the quiz","danger") return redirect(url_for('quiz.create_quiz', short_name=project.short_name)) #### end pro = pro_features() ensure_authorized_to('publish', project) if request.method == 'GET': return render_template('projects/publish.html', project=project, pro_features=pro) project.published = True project_repo.save(project) task_repo.delete_taskruns_from_project(project) result_repo.delete_results_from_project(project) webhook_repo.delete_entries_from_project(project) auditlogger.log_event(project, current_user, 'update', 'published', False, True) flash(gettext('Project published! Volunteers will now be able to help you!')) return redirect(url_for('.details', short_name=project.short_name))
def create_post(): post_data = { 'title': request.form.get('title'), 'content': request.form.get('content'), } post = Post() post.set(post_data) post = markdown(post) upload_image = request.files.get('featured_image') if upload_image.filename != '' and allowed_file(upload_image.filename): f = Attachment(upload_image.filename, data=upload_image.stream) post.set('featured_image', f) post.save() tag_names = request.form.get('tags').lower().strip() tags = [get_tag_by_name(x) for x in split_tag_names(tag_names)] map_tags_to_post(tags, post) return redirect(url_for('show_post', post_id=post.id))
def index(): code = request.args.get("code", "") #app.logger.debug("code:%s" %code) #app.logger.debug(request.args) if g.signin: return "logged_in" elif code: _data = Get_Access_Token(code) access_token = _data['access_token'] userData = Get_User_Info(access_token) app.logger.debug(userData) #resp = render_template('info.html', userData=userData) #resp.set_cookie(key="logged_in", value='true', expires=None) resp = jsonify(userData) resp.set_cookie(key="logged_in", value='true', expires=None) return resp else: return redirect(url_for("login"))
def index(): code = request.args.get("code", "") #app.logger.debug("code:%s" %code) #app.logger.debug(request.args) if g.signin: return "logged_in" elif code: _data = Get_Access_Token(code) app.logger.debug(_data) access_token = _data['access_token'] uid = _data['uid'] userData = Get_User_Info(access_token, uid) app.logger.debug(userData) #resp = render_template('info.html', userData=userData) #resp.set_cookie(key="logged_in", value='true', expires=None) resp = jsonify(userData) resp.set_cookie(key="logged_in", value='true', expires=None) return resp else: return redirect(url_for("login"))
def index(): code = request.args.get("code", "") #app.logger.debug("code:%s" %code) #app.logger.debug(request.args) if g.signin: return "logged_in" elif code: _data = Get_Access_Token(code) access_token = _data['access_token'] openid = Get_OpenID(access_token)['openid'] userData = Get_User_Info(access_token, openid) app.logger.debug(userData) #resp = render_template('info.html', userData=userData) #resp.set_cookie(key="logged_in", value='true', expires=None) resp = jsonify(userData) resp.set_cookie(key="logged_in", value='true', expires=None) return resp else: return redirect(url_for("login"))
def login(): if current_user.is_authenticated: return redirect(url_for('index')) form = LoginForm() if not form.validate_on_submit(): status_code = Unauthorized.code if form.is_submitted() else 200 return render_template('login.html', title='Login', form=form, User=User, password_length={'min': TRACKER_PASSWORD_LENGTH_MIN, 'max': TRACKER_PASSWORD_LENGTH_MAX}), status_code user = user_assign_new_token(form.user) user.is_authenticated = True login_user(user) return redirect(url_for('index'))
def admin_login_required(method): def is_admin(user): if isinstance(user.is_admin, bool): return user.is_admin else: return user.is_admin() @functools.wraps(method) def wrapper(*args, **kwargs): if not current_user.is_authenticated: flash("This section is for logged in users only.", 'warning') return redirect(url_for('redberry.home')) if not hasattr(current_user, 'is_admin'): flash("Redberry expects your user instance to implement an `is_admin` boolean attribute " "or an `is_admin()` method.", 'warning') return redirect(url_for('redberry.home')) if not is_admin(current_user): flash("This section is for admin users only.", 'warning') return redirect(url_for('redberry.home')) return method(*args, **kwargs) return wrapper ############ # CMS ROUTES ############
def show_post(slug): from redberry.models import RedPost post = RedPost.query.filter_by(slug=slug).first() if not post: flash("Post not found!", 'danger') return redirect(url_for('redberry.home')) return render_redberry('redberry/post.html', post=post)
def show_category(category_slug): from redberry.models import RedCategory category = RedCategory.query.filter_by(slug=category_slug).first() if not category: flash("Category not found!", 'danger') return redirect(url_for('redberry.home')) return render_redberry('redberry/category.html', category=category)
def new_record(model_name): from redberry.models import RedCategory, RedPost from redberry.forms import CategoryForm, PostForm if model_name == 'category': form = CategoryForm() new_record = RedCategory() elif model_name == 'post': form = PostForm() new_record = RedPost() # Convert category ids into objects for saving in the relationship. if form.categories.data: form.categories.data = RedCategory.query.filter(RedCategory.id.in_(form.categories.data)).all() form.categories.choices = [(c, c.title) for c in RedCategory.sorted()] else: form.categories.choices = [(c.id, c.title) for c in RedCategory.sorted()] if form.validate_on_submit(): form.populate_obj(new_record) cms.config['db'].session.add(new_record) cms.config['db'].session.flush() build_sitemap() flash("Saved %s %s" % (model_name, new_record.id), 'success') return redirect(url_for('redberry.admin', model_name=model_name)) return render_template('redberry/admin/form.html', form=form, model_name=model_name)
def dev_login(user_id): if ENVIRONMENT == 'dev': login_user(db.session.query(User).get(user_id)) return redirect(url_for('index'))
def logout(): logout_user() return flask.redirect(flask.url_for('index'))
def post(self): dockletRequest.post('/cloud/account/add/', request.form) return redirect('/cloud/')
def post(self): data = { 'cloudname' : self.cloudname, } dockletRequest.post('/cloud/account/delete/', data) return redirect('/cloud/')
def post(self): dockletRequest.post('/cloud/account/modify/', request.form) return redirect('/cloud/')
def get(self): if is_authenticated(): refreshInfo() return redirect(request.args.get('next',None) or '/dashboard/') if (env.getenv('EXTERNAL_LOGIN') == 'True'): url = external_generate.external_login_url link = external_generate.external_login_link else: link = '' url = '' return render_template(self.template_path, link = link, url = url, open_registry=self.open_registry)
def login_required(func): @wraps(func) def wrapper(*args, **kwargs): if request.method == 'POST' : if not is_authenticated(): abort(401) else: return func(*args, **kwargs) else: if not is_authenticated(): return redirect("/login/" + "?next=" + request.path) else: return func(*args, **kwargs) return wrapper
def post(self): form = dict(request.form) if (request.form.get('username') == None or request.form.get('password') == None or request.form.get('password') != request.form.get('password2') or request.form.get('email') == None or request.form.get('description') == None): abort(500) result = dockletRequest.unauthorizedpost('/register/', form) return redirect("/login/")
def post(self): data = {"number":request.form["number"],"reason":request.form["reason"]} result = dockletRequest.post('/beans/apply/',data) success = result.get("success") if success == "true": return redirect("/beans/application/") else: return self.render(self.template_path, message = result.get("message"))
def post(cls): dockletRequest.post('/notification/create/', request.form) # return redirect('/admin/') return redirect('/notification/')