我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用bottle.request.forms()。
def update_password(): username = request.forms.get('username', None) password = request.forms.get('password', None) new_pass = request.forms.get('new_pass', None) dict_for_return = dict(universal_POST_dict) if not verify_login(username, password): dict_for_return['verified'] = False dict_for_return['message_str'] = "login failed" json_for_return = json.dumps(dict_for_return) return json_for_return else: dbMgr = MmrzSyncDBManager("USERS") dbMgr.update_USERS_DB([username, new_pass]) dbMgr.closeDB() dict_for_return['verified'] = True dict_for_return['message_str'] = "password updated" json_for_return = json.dumps(dict_for_return) return json_for_return
def upload_wordbook(): username = request.forms.get('username', None) password = request.forms.get('password', None) dict_for_return = dict(universal_POST_dict) if not verify_login(username, password): dict_for_return['verified'] = False dict_for_return['message_str'] = "login failed" json_for_return = json.dumps(dict_for_return) return json_for_return else: rows = request.forms['wordbook'] rows = json.loads(rows) dbMgr = MmrzSyncDBManager(username) dbMgr.createDB() dbMgr.pruneDB() for row in rows: dbMgr.insertDB(row) dbMgr.closeDB() dict_for_return['verified'] = True dict_for_return['message_str'] = "upload done" json_for_return = json.dumps(dict_for_return) return json_for_return
def is_state_cache_available(): username = request.forms.get('username', None) password = request.forms.get('password', None) dict_for_return = dict(universal_POST_dict) if not verify_login(username, password): dict_for_return['mmrz_code'] = MMRZ_CODE_Universal_Verification_Fail json_for_return = json.dumps(dict_for_return) return json_for_return else: dbMgr = MongoDBManager() userData = dbMgr.query_memorize_state(username) dbMgr.closeDB() state_cached = userData.get('state_cached', False) dict_for_return['state_cached'] = state_cached json_for_return = json.dumps(dict_for_return) return json_for_return
def restore_remote_saved_state(): username = request.forms.get('username', None) password = request.forms.get('password', None) dict_for_return = dict(universal_POST_dict) if not verify_login(username, password): dict_for_return['mmrz_code'] = MMRZ_CODE_Universal_Verification_Fail json_for_return = json.dumps(dict_for_return) return json_for_return else: dbMgr = MongoDBManager() userData = dbMgr.query_memorize_state(username) dbMgr.closeDB() dict_for_return['mmrz_code'] = MMRZ_CODE_Restore_State_OK dict_for_return['current_cursor'] = int(userData['current_cursor']) dict_for_return['max_size_this_turn'] = int(userData['max_size_this_turn']) dict_for_return['data'] = userData['data'] json_for_return = json.dumps(dict_for_return) return json_for_return
def clear_state_cached_flag_and_eiginvalue(): username = request.forms.get('username', None) password = request.forms.get('password', None) dict_for_return = dict(universal_POST_dict) if not verify_login(username, password): dict_for_return['mmrz_code'] = MMRZ_CODE_Universal_Verification_Fail json_for_return = json.dumps(dict_for_return) return json_for_return else: dbMgr = MongoDBManager() dbMgr.clear_state_cached_flag_and_eiginvalue(username) dbMgr.closeDB() dict_for_return['mmrz_code'] = MMRZ_CODE_Universal_OK json_for_return = json.dumps(dict_for_return) return json_for_return
def update_row(): username = request.forms.get('username', None) password = request.forms.get('password', None) dict_for_return = dict(universal_POST_dict) if not verify_login(username, password): dict_for_return['verified'] = False dict_for_return['message_str'] = "login failed" json_for_return = json.dumps(dict_for_return) return json_for_return else: row = request.forms['row'] update_whole_row = request.forms.get('update_whole_row', False) row = json.loads(row) dbMgr = MmrzSyncDBManager(username) dbMgr.createDB() dbMgr.updateDB(row, update_whole_row) dbMgr.closeDB() dict_for_return['verified'] = True dict_for_return['message_str'] = "Update row success" json_for_return = json.dumps(dict_for_return) return json_for_return
def _request(request, request_fallback=None): ''' Extract request fields wherever they may come from: GET, POST, forms, fallback ''' # Use lambdas to avoid evaluating bottle.request.* which may throw an Error all_dicts = [ lambda: request.json, lambda: request.forms, lambda: request.query, lambda: request.files, #lambda: request.POST, lambda: request_fallback ] request_dict = dict() for req_dict_ in all_dicts: try: req_dict = req_dict_() except KeyError: continue if req_dict is not None and hasattr(req_dict, 'items'): for req_key, req_val in req_dict.items(): request_dict[req_key] = req_val return request_dict
def addcrypted(): package = request.forms.get('referer', 'ClickAndLoad Package') dlc = request.forms['crypted'].replace(" ", "+") dlc_path = join(DL_ROOT, package.replace("/", "").replace("\\", "").replace(":", "") + ".dlc") dlc_file = open(dlc_path, "wb") dlc_file.write(dlc) dlc_file.close() try: PYLOAD.addPackage(package, [dlc_path], 0) except: return HTTPError() else: return "success\r\n"
def set_sskey(): u = current_user u.sskey = request.forms.get('sskey') u.write() import cron; cd = cron.start() if cd <= 0.5: msg = "Your Shadowsocks key is changed!" else: msg = "The Shadowsocks key will be changed in %.2f sec" % cd return get_home_content(msg) # Website for Administrator
def log_in(): username = request.forms.get('username', None) password = request.forms.get('password', None) dict_for_return = dict(universal_POST_dict) if verify_login(username, password): dict_for_return['verified'] = True dict_for_return['message_str'] = "logged in" else: dict_for_return['verified'] = False dict_for_return['message_str'] = "username or password not correct" json_for_return = json.dumps(dict_for_return) return json_for_return
def sign_up(): username = request.forms.get('username', None) password = request.forms.get('password', None) dict_for_return = dict(universal_POST_dict) if not is_username_available(username): dict_for_return['verified'] = False dict_for_return['mmrz_code'] = MMRZ_CODE_Username_Not_Available_Error dict_for_return['message_str'] = "Username not available" elif not validate_username(username): dict_for_return['verified'] = False dict_for_return['mmrz_code'] = MMRZ_CODE_Username_Not_Valid dict_for_return['message_str'] = "Username not valid" elif not validate_password(password): dict_for_return['verified'] = False dict_for_return['mmrz_code'] = MMRZ_CODE_Password_Not_Valid dict_for_return['message_str'] = "Password not valid" else: dbMgr = MmrzSyncDBManager("USERS") dbMgr.insert_USERS_DB([username, password, "", "", "000000", 0, 0, 0]) dbMgr.closeDB() dict_for_return['verified'] = True dict_for_return['mmrz_code'] = MMRZ_CODE_Signup_OK dict_for_return['message_str'] = "Signed up" json_for_return = json.dumps(dict_for_return) return json_for_return
def verify_eiginvalue(): username = request.forms.get('username', None) password = request.forms.get('password', None) dict_for_return = dict(universal_POST_dict) if not verify_login(username, password): dict_for_return['mmrz_code'] = MMRZ_CODE_Universal_Verification_Fail json_for_return = json.dumps(dict_for_return) return json_for_return else: dbMgr = MongoDBManager() userData = dbMgr.query_memorize_state(username) dbMgr.closeDB() rows_length_fromDB = userData.get('rows_length', 0) current_cursor_fromDB = userData.get('current_cursor', 0) rows_length_from_client = request.forms.get('rows_length', 0) current_cursor_from_client = request.forms.get('current_cursor', 0) if rows_length_from_client == rows_length_fromDB and current_cursor_from_client == current_cursor_fromDB: dict_for_return["mmrz_code"] = MMRZ_CODE_SaveState_Same_Eigenvalue else: dict_for_return["mmrz_code"] = MMRZ_CODE_SaveState_Diff_Eigenvalue return json.dumps(dict_for_return)
def save_current_state(): username = request.forms.get('username', None) password = request.forms.get('password', None) dict_for_return = dict(universal_POST_dict) if not verify_login(username, password): dict_for_return['mmrz_code'] = MMRZ_CODE_Universal_Verification_Fail json_for_return = json.dumps(dict_for_return) return json_for_return else: current_state = request.forms.get('current_state', "[]") current_state = json.loads(current_state) rows_length_from_client = request.forms.get('rows_length', 0) current_cursor_from_client = request.forms.get('current_cursor', 0) max_size_this_turn_from_client = request.forms.get('max_size_this_turn', 0) # not save state if state length is 0 if len(current_state) == 0: dict_for_return['mmrz_code'] = MMRZ_CODE_Universal_Error # this error code better changed to something like: MMRZ_CODE_State_Length_Zero json_for_return = json.dumps(dict_for_return) return json_for_return document = { "username": username, "state_cached": True, "rows_length": rows_length_from_client, "current_cursor": current_cursor_from_client, "max_size_this_turn": max_size_this_turn_from_client, "data": current_state, } dbMgr = MongoDBManager() dbMgr.update_memorize_state(document) dbMgr.closeDB() dict_for_return['mmrz_code'] = MMRZ_CODE_SaveState_Save_OK json_for_return = json.dumps(dict_for_return) return json_for_return
def online_import(): username = request.forms.get('username', None) password = request.forms.get('password', None) is_smart = request.forms.get('is_smart', None) is_smart = json.loads(is_smart) quantity = request.forms.get('quantity', None) quantity = int(quantity) dict_for_return = dict(universal_POST_dict) if not verify_login(username, password): dict_for_return['verified'] = False dict_for_return['message_str'] = "login failed" json_for_return = json.dumps(dict_for_return) return json_for_return else: dict_for_return['verified'] = True dict_for_return['message_str'] = "Online import success" fr = open("./WORDBOOK/{0}/data.pkl".format(username), "rb") pkl = pickle.load(fr) fr.close() added = smart_import("./WORDBOOK/{0}/{1}".format(username, pkl["book_name"]), username, quantity, is_smart) dict_for_return['added'] = added json_for_return = json.dumps(dict_for_return) return json_for_return
def tik_tik(): username = request.forms.get('username', None) if not username: return "username is None" localtime = time.localtime() timeStamp = int(time.time()) # ???? Year, Week, Day = datetime.date.fromtimestamp(timeStamp).isocalendar() uniqMinute = timeStamp / 60 # ????? uniqHour = uniqMinute / 60 # ????? uniqDate = uniqHour / 24 # ???? theYear = localtime[0] # ?: 2017? theMonth = localtime[1] # ??: 2 (2017?2?) theDate = localtime[2] # ??: 22 (2?22) theHour = localtime[3] # ???: 17 (17?) theWeek = Week # ??: 8 (???) theDay = Day # ??: 3 (?3) tikMgr = TikTimeDBManager() tikInfo = [username, timeStamp, uniqMinute, uniqHour, uniqDate, theYear, theMonth, theDate, theHour, theWeek, theDay] if uniqMinute != tikMgr.getMaxUniqMinute(username): tikMgr.insertDB(tikInfo) else: pass tikMgr.closeDB() return "tik_tik: OK"
def del_job(): # jid = request.forms['jid'] jid = request.query.jid sched.stop(jid) sched.qdel(jid) return "OK"
def stop(): # jid = request.forms['jid'] jid = request.query.jid sched.stop(jid) return "OK"
def listfiles(): app = request.forms['app'] user = request.forms['user'] cid = request.forms['cid'] mypath = os.path.join(user_dir, user, app, cid) return listdir(mypath)
def execute(): app = request.forms['app'] user = request.forms['user'] cid = request.forms['cid'] desc = request.forms['desc'] np = request.forms['np'] appmod = pickle.loads(request.forms['appmod']) # remove the appmod key del request.forms['appmod'] appmod.write_params(request.forms, user) # if preprocess is set run the preprocessor try: if appmod.preprocess: run_params, _, _ = appmod.read_params(user, cid) base_dir = os.path.join(user_dir, user, app) process.preprocess(run_params, appmod.preprocess, base_dir) if appmod.preprocess == "terra.in": appmod.outfn = "out"+run_params['casenum']+".00" except: return template('error', err="There was an error with the preprocessor") # submit job to queue try: priority = db(users.user==user).select(users.priority).first().priority uid = users(user=user).id jid = sched.qsub(app, cid, uid, np, priority, desc) return str(jid) #redirect("http://localhost:"+str(config.port)+"/case?app="+str(app)+"&cid="+str(cid)+"&jid="+str(jid)) except OSError: return "ERROR: a problem occurred"
def editplotdef(pltid): user = root.authorized() if user != 'admin': return template('error', err="must be admin to edit plots") app = request.forms.app result = db(plots.id==pltid).select().first() params = { 'app': app, 'user': user } return template('plots/edit_plot', params, row=result)
def editplot(pltid): user = root.authorized() if user != 'admin': return template('error', err="must be admin to edit plots") app = request.forms.app title = request.forms.title ptype = request.forms.ptype options = request.forms.options print "updating plot ", pltid, "for app", app plots(pltid).update_record(title=title, ptype=ptype, options=options) db.commit() redirect('/plots/edit?app='+app)
def edit_datasource_post(pltid, dsid): """update datasource for given plot""" user = root.authorized() if user != 'admin': return template('error', err="must be admin to edit plots") app = request.forms.get('app') r = request.forms datasource(id=dsid).update_record(label=r['label'], pltid=pltid, filename=r['fn'], cols=r['cols'], line_range=r['line_range'], data_def=r['data_def']) db.commit() redirect ('/plots/' + str(pltid) + '/datasources?app='+app) params = {'app': app, 'pltid': pltid, 'dsid': dsid} return template('plots/edit_datasource', params)
def delete_datasource(): user = root.authorized() if user != 'admin': return template('error', err="must be admin to edit plots") app = request.forms.get('app') pltid = request.forms.get('pltid') dsid = request.forms.get('dsid') del db.datasource[dsid] db.commit() redirect ('/plots/' + str(pltid) + '/datasources?app='+app)
def create_plot(): user = root.authorized() if user != 'admin': return template('error', err="must be admin to edit plots") app = request.forms.get('app') r = request plots.insert(appid=root.myapps[app].appid, ptype=r.forms['ptype'], title=r.forms['title'], options=r.forms['options']) db.commit() redirect ('/plots/edit?app='+app)
def flashgot(): if request.environ['HTTP_REFERER'] != "http://localhost:9666/flashgot" and request.environ['HTTP_REFERER'] != "http://127.0.0.1:9666/flashgot": return HTTPError() autostart = int(request.forms.get('autostart', 0)) package = request.forms.get('package', None) urls = filter(lambda x: x != "", request.forms['urls'].split("\n")) folder = request.forms.get('dir', None) if package: PYLOAD.addPackage(package, urls, autostart) else: PYLOAD.generateAndAddPackages(urls, autostart) return ""
def add_package(): name = request.forms.get("add_name", "New Package").strip() queue = int(request.forms['add_dest']) links = decode(request.forms['add_links']) links = links.split("\n") pw = request.forms.get("add_password", "").strip("\n\r") try: f = request.files['add_file'] if not name or name == "New Package": name = f.name fpath = join(PYLOAD.getConfigValue("general", "download_folder"), "tmp_" + f.filename) destination = open(fpath, 'wb') copyfileobj(f.file, destination) destination.close() links.insert(0, fpath) except: pass name = name.decode("utf8", "ignore") links = map(lambda x: x.strip(), links) links = filter(lambda x: x != "", links) pack = PYLOAD.addPackage(name, links, queue) if pw: pw = pw.decode("utf8", "ignore") data = {"password": pw} PYLOAD.setPackageData(pack, data)
def edit_package(): try: id = int(request.forms.get("pack_id")) data = {"name": request.forms.get("pack_name").decode("utf8", "ignore"), "folder": request.forms.get("pack_folder").decode("utf8", "ignore"), "password": request.forms.get("pack_pws").decode("utf8", "ignore")} PYLOAD.setPackageData(id, data) return {"response": "success"} except: return HTTPError()
def login_post(): user = request.forms.get("username") password = request.forms.get("password") info = PYLOAD.checkAuth(user, password) if not info: return render_to_response("login.html", {"errors": True}, [pre_processor]) set_session(request, info) return redirect("/")
def test_multipart(self): """ Environ: POST (multipart files and multible values per key) """ fields = [('field1','value1'), ('field2','value2'), ('field2','value3')] files = [('file1','filename1.txt','content1'), ('??','??foo.py', 'ä\nö\rü')] e = tools.multipart_environ(fields=fields, files=files) request = BaseRequest(e) # File content self.assertTrue('file1' in request.POST) self.assertTrue('file1' in request.files) self.assertTrue('file1' not in request.forms) cmp = tob('content1') if sys.version_info >= (3,2,0) else 'content1' self.assertEqual(cmp, request.POST['file1'].file.read()) # File name and meta data self.assertTrue('??' in request.POST) self.assertTrue('??' in request.files) self.assertTrue('??' not in request.forms) self.assertEqual('foo.py', request.POST['??'].filename) self.assertTrue(request.files['??']) self.assertFalse(request.files.file77) # UTF-8 files x = request.POST['??'].file.read() if (3,2,0) > sys.version_info >= (3,0,0): x = x.encode('utf8') self.assertEqual(tob('ä\nö\rü'), x) # No file self.assertTrue('file3' not in request.POST) self.assertTrue('file3' not in request.files) self.assertTrue('file3' not in request.forms) # Field (single) self.assertEqual('value1', request.POST['field1']) self.assertTrue('field1' not in request.files) self.assertEqual('value1', request.forms['field1']) # Field (multi) self.assertEqual(2, len(request.POST.getall('field2'))) self.assertEqual(['value2', 'value3'], request.POST.getall('field2')) self.assertEqual(['value2', 'value3'], request.forms.getall('field2')) self.assertTrue('field2' not in request.files)
def parse(self, body): return request.forms ############################################################ # Base objects ############################################################
def post_new(tgtbox, boxes_rpc): box = _get_box_rpc(tgtbox, boxes_rpc) if box.lock_exists(): return template('error', errormsg="It looks like an experiment is already running on this box. Please wait for it to finish before starting another.") # validate form data form = NewExperimentForm(request.forms) if not form.validate(): return template('new', dict(form=form, box=box)) expname = form.expname.data notes = form.notes.data inifile = form.inifile.data def get_phase(phase): length = phase.length.data stimulus = phase.stimulus.data background = phase.background.data return (length, stimulus, background) phases = [get_phase(p) for p in form.phases if p.enabled.data == 'True'] box.start_experiment(expname, notes, inifile, phases) redirect("/")
def jsonp(view): def wrapped(*posargs, **kwargs): args = {} # if we access the args via get(), # we can get encoding errors... for k in request.forms: args[k] = getattr(request.forms, k) for k in request.query: args[k] = getattr(request.query, k) callback = args.get('callback') status_code = 200 try: result = view(args, *posargs, **kwargs) except (KeyError) as e:#ValueError, AttributeError, KeyError) as e: import traceback, sys traceback.print_exc(file=sys.stdout) result = {'status':'error', 'message':'invalid query', 'details': str(e)} status_code = 403 if callback: result = '%s(%s);' % (callback, json.dumps(result)) if status_code == 200: return result else: abort(status_code, result) return wrapped
def verify_recaptcha_form(): if config['DEBUG']: return True captcha_rs = request.forms.get('g-recaptcha-response') req = requests.get('https://www.google.com/recaptcha/api/siteverify', params = { 'secret': config['RECAPTCHA_SECRET_KEY'], 'response': request.forms.get('g-recaptcha-response'), 'remoteip': request.remote_addr }, verify=True).json() return req.get("success", False)
def account(user=None): if not user: return users.login_required() subscribe = request.forms.get('subscribe') if subscribe: user['subscribed'] = True else: user['subscribed'] = False r.table('users').get(user['identity']).replace(user).run(conn()) users.flash('info', 'Settings updated') return redirect('/account')
def account_delete(user=None): if not user: return users.login_required() if request.forms.get('delete'): # TODO Delete all answers r.table('users').get(user['identity']).delete().run(conn()) for page in config['PAGES']: for question in page.questions: r.table('%s_answers' % question.name).get(user['identity']).delete().run(conn()) users.delete_session() users.flash('info', 'Account deleted') return redirect('/') else: users.flash('info', 'You must check the box') return redirect('/account')
def register_email(): verify = verify_recaptcha_form() if not verify: return template("login.htm", email_login_error = "ReCAPTCHA verification failed") email = request.forms.get('email') msg = 'An email has been sent to the address provided with login information' # validate email # check that user does not already exist user = None for user2 in r.table('users').get_all(["email", email], index = 'identity_check').run(conn()): user = user2 if user: user['authlink'] = str(uuid.uuid4()) r.table('users').replace(user).run(conn()) link = "%s/login/email/%s/%s" % (config['SITE_URL'], email, user['authlink']) users.mail(user, 'login information', REGISTER_EMAIL_HTML.substitute(link=link), REGISTER_EMAIL_TEXT.substitute(link=link)) users.flash('info', msg) else: authlink = uuid.uuid4() user = { "identity_type": "email", "identity": email, "authlink": str(uuid.uuid4()), "subscribed": True, "last_update": r.now(), } r.table('users').insert(user).run(conn()) link = "%s/login/email/%s/%s" % (config['SITE_URL'], email, user['authlink']) users.mail(user, 'confirmation', REGISTER_EMAIL_HTML.substitute(link=link), REGISTER_EMAIL_TEXT.substitute(link=link)) users.flash('info', msg) return redirect('/login')
def update(name, user=None): if not user: return users.login_required() question = census.questions[name] form = question.form(user, request.forms) # TODO: WTForm should automatically figure out request.forms, but no combination of arguments has actually worked if len(request.forms.getall("answer")) > 1: # TODO: malformed input possible here because this is unaware of the question's actual type form.answer.data = request.forms.getall("answer") elif len(request.forms.getall("answer")) == 0: # Check to see if question was answered; if yes, this is an attempt to delete the answer, if not, leave it be if question.answered(user): form.answer.data = [] else: # If it is not just don't do anything return {} else: form.answer.data = request.forms.get("answer") # If only one answer is submitted for checkboxed, this will be returned as a string rather than a list with a # single element if question.CHART_TYPE == 'check' and type(form.answer.data) == str: form.answer.data = [form.answer.data] if form.validate(): res = question.save_answer(user, form.answer.data) return {} return form.errors
def get_salted_password(): password = request.forms.get('password') if not (request.forms.get('md5ed') == '1' and re.match(r'^[a-f\d]{32}$', password)): password = user.salt_password(password) return password # home panel Generator
def admin_cli(): argv = request.forms.get('cmd').split(' ') out, rtn = utils.get_stdout([sys.executable, './cli.py'] + argv) return { "retval": rtn, "output": out }
def admin_tx_query(): tfrom, tto, tsum = [request.forms.get(n) or None for n in ('from', 'to', 'group')] # Make a UID->Username table un = { u.id: u.username for u in user.get_all() } # Make result tresult = {} for uid, pkg, tx, time in traffic.query(min_time=tfrom, max_time=tto, sum=int(tsum)): if not time in tresult: tresult[time] = [] tresult[time].append({ "title": un[uid], "amount": tx }) tdays = tresult.keys() tdays.sort() return [ {"title": tday, "data": tresult[tday]} for tday in tdays ]
def admin_user_add(): u = user.User() u.username, u.salted_password, u.sskey = [request.forms.get(n) for n in ('username', 'password', 'sskey')] u.create() u.write() return { "id": u.id }
def admin_user_passwd(): username, password = [request.forms.get(n) for n in ('username', 'password')] u = user.get_by_username(username) u.salted_password = password u.write() return { "status": "ok" }
def admin_user_sskey(): username, sskey = [request.forms.get(n) for n in ('username', 'value')] u = user.get_by_username(username) u.sskey = sskey u.write() return { "status": "ok" }
def admin_user_port(): username, port = [request.forms.get(n) for n in ('username', 'value')] u = user.get_by_username(username) u.set_port(port) u.write() return { "status": "ok" }
def admin_user_limit(): username, limit = [request.forms.get(n) for n in ('username', 'limit')] limit = json.loads(limit) u = user.get_by_username(username) u.set_meta("limit", limit) u.write() return { "status": "ok" }
def admin_user_suspend(): username, suspend, reason = [request.forms.get(n) for n in ('username', 'suspend', 'reason')] u = user.get_by_username(username) u.suspended = suspend == "1" if reason: datestr = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') u.set_meta("limiter_log", "%s: %s"%(datestr, reason)) u.write() return { "username": username, "suspended": u.suspended } # Login and Logout
def slack_verification_preprocessor(view): """ Run some preliminary authentication - by Slack policy (and, given team privacy considerations), we need to ensure that requests legitimately come from slack. There is an assigned (secret) shared token that we need to compare from our side to the incoming request. we need to: 1) ensure it is present (first assertion) 2) ensure it matches (second assertion) if these two succeed, then we can allow the view to be called. """ def wrapper(db, *args, **kwargs): try: assert('token' in request.forms) except AssertionError: abort(400, "No token provided to authenticate") try: assert(request.forms['token'] == settings.SLACK_VERIFICATION_TOKEN) except AssertionError: abort(401, "Do you even authenticate, bro?") body = view(db, *args, **kwargs) return body return wrapper