我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用bottle.redirect()。
def login_required(perm=None):
    """Build a decorator that restricts a view to authenticated sessions.

    The wrapped view only runs when the Beaker session found in the WSGI
    environ has a truthy ``name`` and a truthy ``authenticated`` entry.
    When *perm* is given, ``parse_permissions(session)`` must additionally
    map *perm* to a truthy value.

    Rejected requests that carry ``X-Requested-With: XMLHttpRequest``
    receive ``HTTPError(403, "Forbidden")``; all other rejected requests
    are redirected to ``/login`` (not authenticated) or ``/nopermission``
    (permission missing).

    :param perm: optional permission key the session has to grant.
    :return: a decorator for view callables.
    """
    # Local import: the file-level import block is not visible from here.
    import functools

    def _deny(location):
        # AJAX callers cannot usefully follow a redirect, so they get a 403.
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return HTTPError(403, "Forbidden")
        return redirect(location)

    def _dec(func):
        @functools.wraps(func)  # keep the view's name/docstring intact
        def _view(*args, **kwargs):
            s = request.environ.get('beaker.session')
            # A request without a session object is treated as anonymous
            # instead of failing with AttributeError on ``None.get``.
            if s is None or not (s.get("name", None)
                                 and s.get("authenticated", False)):
                return _deny("/login")
            if perm:
                perms = parse_permissions(s)
                if perm not in perms or not perms[perm]:
                    return _deny("/nopermission")
            return func(*args, **kwargs)
        return _view
    return _dec
def chat_room():
    """Render the chat room for the visitor identified by the cookie.

    :return: the rendered ``chat_room`` template, given the user name and
        the result of ``get_talk()``. Visitors without a ``username``
        cookie are redirected to ``/`` instead.
    """
    # The user name travels with the request as a cookie.
    username = request.get_cookie("username")
    if username:
        # Load the conversation and pass it on to the template.
        return template("chat_room", username=username, talk_list=get_talk())
    # Missing or empty cookie: send the visitor back to the entry page.
    return redirect("/")
def talk():
    """Store the posted chat message and return to the chat room.

    Reads the ``chat`` POST field (decoded via ``getunicode``) and the
    ``username`` cookie, stamps the message with the current time and hands
    all three to ``save_talk``.

    :return: never returns normally; redirects to ``/chat_room``.
    """
    author = request.get_cookie("username")
    message = request.POST.getunicode("chat")
    posted_at = datetime.now()
    save_talk(posted_at, author, message)
    return redirect("/chat_room")
def show_app(app):
    """Render the page of an installed app.

    :param app: name of the app; must be a key of ``root.myapps``.
    :return: the rendered ``apps/<app>`` template, or the ``error``
        template when the app is not installed. If building or rendering
        the page fails, the traceback is printed and the request is
        redirected to ``/app/<app>``.
    """
    # very similar to start_new_job() consider consolidating
    user = root.authorized()
    root.set_active(app)
    if app not in root.myapps:
        return template('error', err="app %s is not installed" % (app))
    try:
        # parameters for return template
        params = {}
        params.update(root.myapps[app].params)
        params['cid'] = ''
        params['app'] = app
        params['user'] = user
        params['apps'] = root.myapps
        return template(os.path.join('apps', app), params)
    except Exception:
        # print_exc() writes the traceback itself; wrapping the call in a
        # print statement only added a stray "None" line.
        traceback.print_exc()
        redirect('/app/'+app)
def app_save(appid):
    """Write the submitted app settings to the ``apps`` row *appid*.

    :param appid: id of the ``db.apps`` record to update.
    :return: never returns normally; redirects to ``/app/<app>`` where
        ``<app>`` is the ``app`` form field.
    """
    root.authorized()
    form = request.forms
    # The form transmits "no assets" as the literal string "None".
    assets = None if form.assets == "None" else form.assets
    record = db(db.apps.id == appid).select().first()
    record.update_record(
        language=form.language,
        category=form.category,
        description=form.description,
        input_format=form.input_format,
        preprocess=form.preprocess,
        postprocess=form.postprocess,
        assets=assets,
    )
    db.commit()
    redirect("/app/" + form.app)
# allow only admin or user to delete apps
def delete_app(appid):
    """Delete an app, optionally together with its files (admin only).

    :param appid: id passed on to the app object's ``delete`` method.
    :return: the ``error`` template when the caller is not ``admin`` or the
        deletion fails; otherwise redirects to ``/apps``.
    """
    user = root.authorized()
    if user != 'admin':
        return template('error', err="must be admin to edit app")
    appname = request.forms.app
    # The checkbox value is "on" when the app directory should go as well.
    del_files = request.forms.del_app_dir == "on"
    try:
        # delete entry in DB
        root.myapps[appname].delete(appid, del_files)
    except Exception:
        # Also reached when appname is not a key of root.myapps.
        traceback.print_exc()
        return template("error", err="failed to delete app... did the app load properly?")
    redirect("/apps")
def view_app(app):
    """Show the ``app`` template for one app (admin only).

    :param app: app name; an empty value redirects to ``/myapps``.
    :return: the rendered ``app`` template, or the ``error`` template when
        the caller is not ``admin`` or rendering fails.
    """
    user = root.authorized()
    if app:
        root.set_active(app)
    else:
        redirect('/myapps')
    if user != 'admin':
        return template('error', err="must be admin to edit app")
    cid = request.query.cid
    result = db(apps.name==app).select().first()
    params = {'app': app, 'user': user, 'cid': cid}
    #if request.query.edit:
    #    return template('appedit', params, rows=result)
    #else:
    try:
        return template('app', params, rows=result)
    except Exception:
        # print_exc() writes the traceback itself; wrapping the call in a
        # print statement only added a stray "None" line.
        traceback.print_exc()
        return template('error', err="there was a problem showing the app template. Check traceback.")
def addapp():
    """Create a new app from the submitted form (admin only).

    :return: the ``error`` template for non-admin callers; otherwise
        redirects to ``/app/<appname>`` after creating the app.
    """
    user = root.authorized()
    if user != 'admin':
        return template('error', err="must be admin to add app")
    form = request.forms
    appname = form.appname
    # put in db
    new_app = apprw.App()
    new_app.create(
        appname,
        form.description,
        form.category,
        form.language,
        form.input_format,
        form.command,
        form.preprocess,
        form.postprocess,
    )
    # load_apps() needs to be called here in case a user wants to delete
    # this app just after it has been created... it is called again after
    # the user uploads a sample input file
    root.load_apps()
    redirect('/app/' + appname)
def delete_job(jid):
    """Delete a job's case directory and remove the job from the scheduler.

    :param jid: job id, used for the ``jobs`` lookup and scheduler calls.
    :return: the ``error`` template when ``cid`` contains a slash or the
        job state is ``"R"``; otherwise redirects to ``/jobs``.
    """
    user = root.authorized()
    form = request.forms
    app, cid = form.app, form.cid
    state = jobs(jid).state
    # A slash in the case id is rejected; per the error message such ids
    # denote cases the caller does not own.
    if re.search("/", cid):
        return template("error", err="only possible to delete cases that you own")
    if state == "R":
        return template("error", err="cannot delete while job is still running")
    case_dir = os.path.join(user_dir, user, app, cid)
    if os.path.isdir(case_dir):
        shutil.rmtree(case_dir)
    root.sched.stop(jid)
    root.sched.qdel(jid)
    redirect("/jobs")
def delete_f():
    """Delete the selected files or directories of one case.

    Reads ``app``, ``cid`` and the colon-separated ``selected_files`` list
    from the submitted form and removes every named entry below
    ``user_dir/<user>/<app>/<cid>``.

    :return: never returns normally; redirects to the ``/files`` listing of
        the same case.
    """
    user = root.authorized()
    app = request.forms.app
    cid = request.forms.cid
    selected_files = request.forms.selected_files
    # Drop empty names: "".split(":") yields [""], and joining "" onto the
    # case path resolves to the case directory itself, which rmtree would
    # then wipe completely.
    names = [name for name in selected_files.rstrip(':').split(':') if name]
    for name in names:
        # An empty cid would make the path point into the app directory.
        if not cid:
            print("ERROR: not removing path: %s because cid missing" % name)
            continue
        path = os.path.join(user_dir, user, app, cid, name)
        if os.path.isfile(path):
            print("removing file: %s" % path)
            os.remove(path)
        elif os.path.isdir(path):
            print("removing path: %s" % path)
            shutil.rmtree(path)
    redirect("/files?cid="+cid+"&app="+app)
def zip_case():
    """zip case on machine to prepare for download

    Packs every entry listed in ``user_dir/<user>/<app>/<cid>`` into
    ``<cid>.zip`` next to that directory.

    :return: the archive, served through ``static_file``.
    """
    user = root.authorized()
    import zipfile
    app = request.query.app
    cid = request.query.cid
    base_dir = os.path.join(user_dir, user, app)
    path = os.path.join(base_dir, cid+".zip")
    sim_dir = os.path.join(base_dir, cid)
    # List first so a missing case directory does not leave an empty archive.
    entries = os.listdir(sim_dir)
    # The context manager closes the archive even when a write fails.
    with zipfile.ZipFile(path, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
        for fn in entries:
            zf.write(os.path.join(sim_dir, fn))
    return static_file(path, root="./")
    # status = "case compressed"
    # redirect(request.headers.get('Referer')+"&status="+status)
def login_post_handler():
    """Check the submitted credentials and start a session.

    Aborts with 400 when ``username`` or ``password`` is missing from the
    form. Unknown users and failed checks redirect to ``/login?msg=fail``.
    Accounts whose hash equals ``model.PASSWORDLESS_HASH`` are accepted
    only when ``handler_util.is_admin()`` is true; all other accounts are
    verified with ``sha256_crypt``. On success the current user name is
    stored and the request is redirected to ``/``.
    """
    forms = bottle.request.forms
    try:
        username = forms['username']
        password = forms['password']
    except KeyError:
        bottle.abort(400, 'Invalid form.')

    try:
        user = model.get_user(username)
    except KeyError:
        bottle.redirect('/login?msg=fail')

    stored_hash = user['password_hash']
    if stored_hash == model.PASSWORDLESS_HASH:
        accepted = handler_util.is_admin()
    else:
        accepted = sha256_crypt.verify(password, stored_hash)
    if not accepted:
        bottle.redirect('/login?msg=fail')

    handler_util.set_current_username(username)
    bottle.redirect('/')
def login():
    """Authenticate users

    :return: ``{ok: False, msg: ...}`` when ``aaa.login`` rejects the
        credentials; otherwise ``{ok: True, redirect: ...}`` with
        ``/admin`` for the ``admin`` role and ``/experimenter`` for any
        other role.
    """
    username = post_get('username')
    password = post_get('password')
    if aaa.login(username, password):
        if aaa.current_user.role == 'admin':
            landing_page = "/admin"
        else:
            landing_page = "/experimenter"
        return {'ok': True, 'redirect': landing_page}
    return {'ok': False, 'msg': "Username or password invalid"}
def assertRedirect(self, target, result, query=None, status=303, **args):
    """Assert that ``bottle.redirect(target)`` produces the expected response.

    :param target: URL passed to ``bottle.redirect``.
    :param result: expected value of the ``Location`` header.
    :param query: optional dict of extra keyword arguments for
        ``bottle.redirect``.
    :param status: expected status code of the raised response.
    :param args: extra WSGI environ entries; for keys starting with
        ``wsgi`` the first underscore is replaced by a dot.
    """
    env = {'SERVER_PROTOCOL':'HTTP/1.1'}
    for key in list(args):
        if key.startswith('wsgi'):
            # pop-then-assign keeps the value even when the key has no
            # underscore and therefore maps onto itself.
            args[key.replace('_', '.', 1)] = args.pop(key)
    env.update(args)
    request.bind(env)
    bottle.response.bind()
    try:
        bottle.redirect(target, **(query or {}))
    except bottle.HTTPResponse:
        r = _e()
        self.assertEqual(status, r.status_code)
        self.assertTrue(r.headers)
        self.assertEqual(result, r.headers['Location'])
    else:
        # Without this branch the check passed silently when nothing was raised.
        self.fail("bottle.redirect() did not raise HTTPResponse")
def require_login(func):
    """Decorator that lets only logged-in users reach *func*.

    The login is taken from the ``ssl_uid`` and ``ssl_pw`` cookies: the
    user is looked up by id and the cookie password must equal the user's
    ``salted_password``. The module globals ``current_user`` and
    ``is_admin`` are updated as a side effect. Any failure clears both
    cookies and redirects to ``/login``.

    :param func: the view callable to protect.
    :return: the wrapping callable.
    """
    # Local import: the file-level import block is not visible from here.
    import functools

    @functools.wraps(func)  # keep the view's name/docstring intact
    def func_wrapper(*args, **kwargs):
        global current_user, is_admin
        uid_str = request.get_cookie('ssl_uid')
        password = request.get_cookie('ssl_pw')
        logined = False
        # Reset first so a failed login never leaves a stale admin flag.
        is_admin = False
        if uid_str and password:
            try:
                uid = int(uid_str)
            except ValueError:
                # A tampered, non-numeric cookie counts as "not logged in".
                uid = None
            if uid is not None:
                current_user = user.get_by_id(uid)
                logined = bool(current_user
                               and current_user.salted_password == password)
                is_admin = logined and current_user.id == config.USER_ADMIN
        if not logined:
            response.set_cookie('ssl_uid', '', expires=0)
            response.set_cookie('ssl_pw', '', expires=0)
            return redirect('/login')
        return func(*args, **kwargs)
    return func_wrapper
# decorator that used for user json APIs. Decorated function shall return a dict.
def do_login():
    """Handle a submitted login form.

    On a match between ``get_salted_password()`` and the stored
    ``salted_password`` the ``ssl_uid``/``ssl_pw`` cookies are set and the
    request is redirected to ``/``.

    :return: otherwise the ``login`` template with a message saying whether
        the user or the password was wrong.
    """
    username = request.forms.get('username')
    password = get_salted_password()
    account = user.get_by_username(username)
    if account and account.salted_password == password:
        response.set_cookie('ssl_uid', str(account.id))
        response.set_cookie('ssl_pw', password)
        return redirect('/')
    if account:
        message = 'Password is incorrect.'
    else:
        message = 'User not found.'
    return template('login', username=username, message=message,
                    salt=config.USER_SALT)
def show_post(permalink="notfound"):
    """Render a single blog post.

    :param permalink: permalink of the post; HTML-escaped before the lookup.
    :return: the rendered ``entry_template``; redirects to
        ``/post_not_found`` when no post matches.
    """
    cookie = bottle.request.get_cookie("session")
    username = sessions.get_username(cookie)
    permalink = cgi.escape(permalink)
    # Single-argument print call: same output as the former print statement
    # on Python 2 and valid syntax on Python 3.
    print("about to query on permalink =  %s" % permalink)
    post = posts.get_post_by_permalink(permalink)
    if post is None:
        bottle.redirect("/post_not_found")
    # init comment form fields for additional comment
    comment = {'name': "", 'body': "", 'email': ""}
    return bottle.template("entry_template", dict(post=post, username=username, errors="", comment=comment))
# used to process a comment on a blog post
def process_signup():
    """Handle a submitted signup form.

    :return: the ``signup`` template with error details when validation
        fails or the user name is taken; otherwise a session is started,
        the ``session`` cookie is set and the request is redirected to
        ``/welcome``.
    """
    forms = bottle.request.forms
    # A field missing from the form comes back as None, which cgi.escape
    # cannot handle; treat it as empty so validation can reject it.
    email = forms.get("email") or ""
    username = forms.get("username") or ""
    password = forms.get("password")
    verify = forms.get("verify")
    # set these up in case we have an error case
    errors = {'username': cgi.escape(username), 'email': cgi.escape(email)}
    if not validate_signup(username, password, verify, email, errors):
        print("user did not validate")
        return bottle.template("signup", errors)
    if not users.add_user(username, password, email):
        # this was a duplicate
        errors['username_error'] = "Username already in use. Please choose another"
        return bottle.template("signup", errors)
    session_id = sessions.start_session(username)
    print(session_id)
    bottle.response.set_cookie("session", session_id)
    bottle.redirect("/welcome")
def present_welcome():
    """Show the welcome page to the user behind the session cookie.

    :return: the rendered ``welcome`` template; visitors whose session
        cookie maps to no user name are redirected to ``/signup``.
    """
    # check for a cookie, if present, then extract value
    session_cookie = bottle.request.get_cookie("session")
    username = sessions.get_username(session_cookie)
    # see if user is logged in
    if username is not None:
        return bottle.template("welcome", {'username': username})
    print("welcome: can't identify user...redirecting to signup")
    bottle.redirect("/signup")
# Helper Functions
# extracts the tag from the tags form element. an experienced python
# programmer could do this in fewer lines, no doubt
def show_post(permalink="notfound"):
    """Render a single blog post.

    :param permalink: permalink of the post; HTML-escaped before the lookup.
    :return: the rendered ``entry_template``; redirects to
        ``/post_not_found`` when no post matches.
    """
    cookie = bottle.request.get_cookie("session")
    username = sessions.get_username(cookie)
    permalink = cgi.escape(permalink)
    # Single-argument print call: same output as the former print statement
    # on Python 2 and valid syntax on Python 3.
    print("about to query on permalink =  %s" % permalink)
    post = posts.get_post_by_permalink(permalink)
    if post is None:
        bottle.redirect("/post_not_found")
    # init comment form fields for additional comment
    comment = {'name': "", 'body': "", 'email': ""}
    return bottle.template("entry_template",
                           {"post": post, "username": username,
                            "errors": "", "comment": comment})
# used to process a comment on a blog post
def post_comment_like():
    """Increment the like counter of one comment on a post.

    Reads ``permalink`` and ``comment_ordinal`` from the submitted form.
    A missing permalink or unknown post redirects to ``/post_not_found``;
    a missing or non-integer ordinal aborts with status 400. On success the
    request is redirected back to ``/post/<permalink>``.
    """
    forms = bottle.request.forms
    permalink = forms.get("permalink")
    if permalink is None:
        # cgi.escape(None) would otherwise fail with an AttributeError.
        bottle.redirect("/post_not_found")
    permalink = cgi.escape(permalink)
    try:
        comment_ordinal = int(forms.get("comment_ordinal"))
    except (TypeError, ValueError):
        # TypeError: field missing (None); ValueError: not a number.
        bottle.abort(400, "Invalid comment ordinal")
    post = posts.get_post_by_permalink(permalink)
    if post is None:
        # redirect() raises, so nothing after it runs.
        bottle.redirect("/post_not_found")
    # it all looks good. increment the ordinal
    posts.increment_likes(permalink, comment_ordinal)
    bottle.redirect("/post/" + permalink)
def process_signup():
    """Handle a submitted signup form.

    :return: the ``signup`` template with error details when validation
        fails or the user name is taken; otherwise a session is started,
        the ``session`` cookie is set and the request is redirected to
        ``/welcome``.
    """
    forms = bottle.request.forms
    # A field missing from the form comes back as None, which cgi.escape
    # cannot handle; treat it as empty so validation can reject it.
    email = forms.get("email") or ""
    username = forms.get("username") or ""
    password = forms.get("password")
    verify = forms.get("verify")
    # set these up in case we have an error case
    errors = {'username': cgi.escape(username), 'email': cgi.escape(email)}
    if not validate_signup(username, password, verify, email, errors):
        print("user did not validate")
        return bottle.template("signup", errors)
    if not users.add_user(username, password, email):
        # this was a duplicate
        errors['username_error'] = ("Username already in use. " +
                                    "Please choose another")
        return bottle.template("signup", errors)
    session_id = sessions.start_session(username)
    print(session_id)
    bottle.response.set_cookie("session", session_id)
    bottle.redirect("/welcome")
def present_welcome():
    """Show the welcome page to the user behind the session cookie.

    :return: the rendered ``welcome`` template; visitors whose session
        cookie maps to no user name are redirected to ``/signup``.
    """
    # check for a cookie, if present, then extract value
    session_cookie = bottle.request.get_cookie("session")
    username = sessions.get_username(session_cookie)
    # see if user is logged in
    if username is not None:
        return bottle.template("welcome", {'username': username})
    print("welcome: can't identify user...redirecting to signup")
    bottle.redirect("/signup")
# Helper Functions
# extracts the tag from the tags form element. An experienced python
# programmer could do this in fewer lines, no doubt
def callback():
    """Step 3: Retrieving an access token.

    The provider has sent the user back to the registered callback URL, and
    the redirect URL carries an authorization code. That full URL is handed
    to ``fetch_token`` to obtain an access token.

    NOTE: the server name must be configured correctly for this to work.
    Add the X_FORWARDED_HOST and X_FORWARDED_PROTO headers at the HTTP
    layer so that bottle can render the correct URL and links.
    """
    oauth_cfg = settings.SLACK_OAUTH
    oauth2session = OAuth2Session(oauth_cfg['client_id'],
                                  state=request.GET['state'])
    # PII - update privacy policy if oauth2 token is stored.
    # The returned token is not kept: only the fact that the user installed
    # the app matters for now. If tokens are needed later, store them here.
    oauth2session.fetch_token(
        oauth_cfg['token_url'],
        client_secret=oauth_cfg['client_secret'],
        authorization_response=request.url,
    )
    redirect('/?added_to_slack=true')
def welcome():
    """Landing page handler.

    :return: ``version_info()`` when the ``req_thing`` parameter equals
        ``version_info``; a copy of ``universal_ROUTE_dict`` when the
        cookie credentials do not verify. Verified users are redirected to
        ``/memorize``.
    """
    if request.params.get('req_thing') == "version_info":
        return version_info()

    username = request.get_cookie('username')
    raw_password = request.get_cookie('password')
    # The password cookie is URL-quoted; leave a missing one as None.
    if raw_password:
        password = urllib.unquote(raw_password)
    else:
        password = None

    if not verify_login(username, password):
        return dict(universal_ROUTE_dict)
    redirect('/memorize')
def mmrz():
    """Build the template data for the memorize page.

    :return: a copy of ``universal_ROUTE_dict`` extended with
        ``need_https``; users whose cookie credentials do not verify are
        redirected to ``/``.
    """
    username = request.get_cookie('username')
    raw_password = request.get_cookie('password')
    # The password cookie is URL-quoted; leave a missing one as None.
    if raw_password:
        password = urllib.unquote(raw_password)
    else:
        password = None

    if not verify_login(username, password):
        redirect('/')

    # need_https = "localhost" not in request.url
    return dict(universal_ROUTE_dict, need_https=False)
def setting():
    """Build the template data for the settings page.

    :return: a copy of ``universal_ROUTE_dict`` extended with ``username``
        and ``mailAddr`` (empty string when the stored address is None);
        users whose cookie credentials do not verify are redirected to
        ``/``.
    """
    username = request.get_cookie('username')
    password = request.get_cookie('password')
    password = urllib.unquote(password) if password else None
    if not verify_login(username, password):
        redirect('/')
    dbMgr = MmrzSyncDBManager("USERS")
    try:
        users = dbMgr.read_USERS_DB_DICT()
    finally:
        # Close the database even when reading fails.
        dbMgr.closeDB()
    mail_addr = users[username]["mailAddr"]
    return_dict = dict(universal_ROUTE_dict)
    return_dict.update({"username": username,
                        "mailAddr": "" if mail_addr is None else mail_addr})
    return return_dict
def dict_short():
    """Redirect to ``/dictionary``, carrying over the query string if any."""
    query = request.urlparts.query
    if query:
        redirect('/dictionary?{0}'.format(query))
    redirect('/dictionary')
def show_favoritebook():
    """Build the template data for a user's favourite book.

    :return: a copy of ``universal_ROUTE_dict`` extended with ``rows``, the
        favourite entries of the ``username`` request parameter, each
        converted to a list. Redirects to ``/`` when the user name does not
        exist.
    """
    username = request.params.get('username', None)
    # An "available" user name is one nobody has taken, i.e. the user does
    # not exist.
    if is_username_available(username):
        redirect('/')
    dbMgr = MmrzSyncDBManager(username)
    try:
        rows = dbMgr.read_FAVOURITE_DB()
    finally:
        # Close the database even when reading fails.
        dbMgr.closeDB()
    return_dict = dict(universal_ROUTE_dict)
    return_dict.update(dict(rows=[list(row) for row in rows]))
    return return_dict
def static(file):
    """Redirect to the same file name below ``/static/``.

    :param file: file name appended to ``/static/``.
    """
    target = "/static/" + file
    return redirect(target)
def add_user(user):
    """Redirect to ``/organization/<user>``.

    NOTE(review): the earlier docstring described tracing the user's
    repositories back to their origins and submitting all organizations
    found. Presumably the redirect target does that work; confirm.

    :param user: name appended to the organization URL.
    """
    location = "/organization/" + user
    redirect(location)
def get_user(user, ending):
    """Redirect to ``/organization/<user>.<ending>``.

    NOTE(review): the earlier docstring described returning the status of
    this github user (its first-timer repositories). Presumably the
    redirect target provides that; confirm.

    :param user: name placed in the organization URL.
    :param ending: suffix appended after a dot.
    """
    location = "/organization/" + user
    location = location + "." + ending
    redirect(location)
def get_all_users(ending):
    """Redirect the listing of all users to ``/organizations.<ending>``.

    :param ending: suffix appended after the dot.
    """
    location = "/organizations." + ending
    redirect(location)
def get_source_redirect():
    """Redirect to ``ZIP_PATH``, where the source code zip file is served."""
    source_location = ZIP_PATH
    redirect(source_location)
def slashify():
    """Redirect (301) to the current request path with a trailing slash."""
    with_slash = bottle.request.path + "/"
    bottle.redirect(with_slash, 301)
def enter():
    """Remember the submitted user name in a cookie and open the chat room.

    :return: never returns normally; redirects to ``/chat_room``, or back
        to ``/`` when no user name was submitted.
    """
    # The name arrives as a POST field.
    username = request.POST.get("username")
    print("POST username is ", username)
    if not username:
        # set_cookie cannot store None, and the chat room turns away
        # visitors with an empty name anyway, so go back to the start page.
        return redirect("/")
    # Store the name in a cookie for the following requests.
    response.set_cookie("username", username)
    return redirect("/chat_room")
def get_load_apps():
    """Call ``root.load_apps()`` and redirect to ``/myapps``."""
    root.load_apps()
    target = '/myapps'
    redirect(target)
def removeapp():
    """Remove the current user's access to the app named in the form.

    Looks up the user id, the app id and the linking ``app_user`` record,
    deletes that record and commits.

    :return: never returns normally; redirects to ``/myapps``.
    """
    user = root.authorized()
    uid = users(user=user).id
    app = request.forms.app
    appid = apps(name=app).id
    auid = app_user(uid=uid, appid=appid).id
    del app_user[auid]
    # Single-argument print call: same output as the former print statement
    # on Python 2 and valid syntax on Python 3.
    print("removing user %s %s access to app %s %s" % (user, uid, app, appid))
    db.commit()
    redirect('/myapps')
def post_login():
    """Check the submitted credentials and store the user in the session.

    :return: an error string when authorization is disabled or the login
        fails. On success redirects to ``/<referrer>`` when the form names
        a referrer, else to ``/myapps``.
    """
    if not config.auth:
        return "ERROR: authorization disabled. Change auth setting in config.py to enable"
    s = request.environ.get('beaker.session')
    err = "<p>Login failed: wrong username or password</p>"
    # A missing user field counts as a failed login instead of crashing on
    # None.lower().
    username = (request.forms.get('user') or '').lower()
    row = users(user=username)
    pw = request.forms.passwd
    if row is None:
        return err
    try:
        # NOTE(review): passwords are compared as unsalted SHA-256 digests;
        # consider a salted password hash.
        hashpw = hashlib.sha256(pw).hexdigest()
        if hashpw != row.passwd:
            return err
        # if password matches, set the USER_ID_SESSION_KEY
        s[USER_ID_SESSION_KEY] = row.user.lower()
        s.save()
    except Exception:
        traceback.print_exc()
        return err
    # if referred to login from another page redirect to referring page
    referrer = request.forms.referrer
    if referrer:
        # Strip leading slashes/backslashes: "/" + "/evil.example" would be
        # a protocol-relative URL pointing at a foreign host.
        redirect('/' + referrer.lstrip('/\\'))
    else:
        redirect('/myapps')
def logout():
    """Delete the session and show the logout page.

    :return: the rendered ``logout`` template. If reading
        ``config.oauth_client_id`` or rendering fails, the request is
        redirected to ``/login`` instead.
    """
    s = request.environ.get('beaker.session')
    s.delete()
    try:
        return template('logout', {'oauth_client_id': config.oauth_client_id})
    except Exception:
        # Best-effort fallback; narrowed so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        redirect('/login')
def create_container(id):
    """Create a container from image *id* with one port binding.

    The port numbers come from the ``host_port_number`` and
    ``container_port_number`` form fields.

    :param id: image identifier passed to ``create_container``.
    :return: never returns normally; redirects to ``/docker`` with the
        outcome in the ``alert`` query parameter.
    """
    try:
        from urllib import quote  # Python 2
    except ImportError:
        from urllib.parse import quote  # Python 3
    print("creating container: %s" % id)
    cli = docker.Client(base_url=base_url)
    host_port_number = int(request.forms.host_port_number)
    container_port_number = int(request.forms.container_port_number)
    try:
        # NOTE(review): docker-py documents `ports` as container ports and
        # `port_bindings` as {container_port: host_port}; host and
        # container look swapped here. Confirm before changing.
        cli.create_container(image=id, ports=[host_port_number],
            host_config=cli.create_host_config(port_bindings={host_port_number:container_port_number}))
        alert = "SUCCESS: container created " + id
    except Exception as e:
        # The message used to say "start" although this step creates.
        alert = "ERROR: failed to create container " + str(e)
    # Percent-encode the message so spaces, "&" or "#" in it cannot
    # truncate or corrupt the query string.
    redirect("/docker?alert=" + quote(alert))
# don't think we want to have this option
# @dockerMod.route('/docker/remove_image/<id:path>', method='GET')
# def remove_image(id):
#     print "removing image:", id
#     cli = docker.Client(base_url=base_url)
#     try:
#         msg = cli.remove_image(image=id)
#         alert = "SUCCESS: image removed " + id
#     except:
#         alert = "ERROR: unable to remove image " + id + \
#                 " Either has dependent child images, or a container is running." + \
#                 " Remove the container and retry."
#     redirect("/docker?alert="+alert)
def start_container(id):
    """Start container *id*.

    :param id: container identifier passed to ``cli.start``.
    :return: never returns normally; redirects to ``/docker`` with the
        outcome in the ``alert`` query parameter.
    """
    try:
        from urllib import quote  # Python 2
    except ImportError:
        from urllib.parse import quote  # Python 3
    print("starting container: %s" % id)
    cli = docker.Client(base_url=base_url)
    try:
        cli.start(container=id)
        alert = "SUCCESS: started container " + id
    except Exception:
        alert = "ERROR: failed to start container " + id
    # Percent-encode the message so special characters in it cannot
    # truncate or corrupt the query string.
    redirect("/docker?alert=" + quote(alert))
def stop_container(id):
    """Stop container *id*.

    :param id: container identifier passed to ``cli.stop``.
    :return: never returns normally; redirects to ``/docker`` with the
        outcome in the ``alert`` query parameter.
    """
    try:
        from urllib import quote  # Python 2
    except ImportError:
        from urllib.parse import quote  # Python 3
    cli = docker.Client(base_url=base_url)
    try:
        cli.stop(container=id)
        alert = "SUCCESS: stopped container " + id
    except Exception:
        alert = "ERROR stopping container " + id
    # Percent-encode the message so special characters in it cannot
    # truncate or corrupt the query string.
    redirect("/docker?alert=" + quote(alert))
def remove_container(id):
    """Remove container *id*.

    :param id: container identifier passed to ``cli.remove_container``.
    :return: never returns normally; redirects to ``/docker`` with the
        outcome in the ``alert`` query parameter.
    """
    try:
        from urllib import quote  # Python 2
    except ImportError:
        from urllib.parse import quote  # Python 3
    print("removing container: %s" % id)
    cli = docker.Client(base_url=base_url)
    try:
        cli.remove_container(id)
        alert = "SUCCESS: removed container " + id
    except Exception:
        alert = "ERROR: problem removing container " + id
    # Percent-encode the message so special characters in it cannot
    # truncate or corrupt the query string.
    redirect("/docker?alert=" + quote(alert))
def star_case():
    """Set ``starred`` to ``"True"`` on the job named in the form.

    :return: never returns normally; redirects to ``/jobs``.
    """
    root.authorized()
    job = jobs(id=request.forms.jid)
    job.update_record(starred="True")
    db.commit()
    redirect('/jobs')
def unstar_case():
    """Set ``starred`` to ``"False"`` on the job named in the form.

    :return: never returns normally; redirects to ``/jobs``.
    """
    root.authorized()
    job = jobs(id=request.forms.jid)
    job.update_record(starred="False")
    db.commit()
    redirect('/jobs')
def share_case():
    """Mark the job named in the form as shared and notify every user.

    Sets ``shared`` to ``"True"`` on the job, then raises the
    ``new_shared_jobs`` counter of every user record by one.

    :return: never returns normally; redirects to ``/jobs``.
    """
    root.authorized()
    job = jobs(id=request.forms.jid)
    job.update_record(shared="True")
    db.commit()
    # increase count in database for every user
    for entry in db().select(users.ALL):
        # An unset counter (None) is treated as zero.
        pending = users(user=entry.user).new_shared_jobs or 0
        users(user=entry.user).update_record(new_shared_jobs=pending + 1)
    db.commit()
    redirect('/jobs')
def unshare_case():
    """Set ``shared`` to ``"False"`` on the job named in the form.

    :return: never returns normally; redirects to ``/jobs``.
    """
    root.authorized()
    job = jobs(id=request.forms.jid)
    job.update_record(shared="False")
    db.commit()
    redirect('/jobs')
def stop_job():
    """Stop a job through the scheduler and record state ``"X"`` for it.

    :return: the ``error`` template when ``cid`` contains a slash;
        otherwise redirects to the ``/case`` page of the same job.
    """
    root.authorized()
    form = request.forms
    app, cid, jid = form.app, form.cid, form.jid
    # A slash in the case id is rejected; per the error message such ids
    # denote cases the caller does not own.
    if re.search("/", cid):
        return template("error", err="only possible to stop cases that you own")
    root.sched.stop(jid)
    time.sleep(0.1)
    job = jobs(jid)
    job.update_record(state="X")
    db.commit()
    case_url = "/case?app=" + app + "&cid=" + cid + "&jid=" + jid
    redirect(case_url)
def editplotdefs():
    """Show the plot definitions of an app (admin only).

    :return: the ``error`` template for non-admin callers; otherwise the
        ``plots/plotdefs`` template with the matching rows. Redirects to
        ``/login`` when authorization is enabled but the caller is not
        authorized, and to ``/apps`` when the app is not in
        ``root.myapps``.
    """
    user = root.authorized()
    if user != 'admin':
        return template('error', err="must be admin to edit plots")
    app = request.query.app
    if config.auth and not root.authorized():
        redirect('/login')
    if app not in root.myapps:
        redirect('/apps')
    # Join the apps table with the plots table on the app id, restricted
    # to the requested app name.
    belongs_to_app = root.apps.id == plots.appid
    named_app = root.apps.name == app
    rows = db(belongs_to_app & named_app).select()
    return template('plots/plotdefs', {'app': app, 'user': user}, rows=rows)