我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 flask.request。
def test_manual_context_binding(self):
    """Check that a view can be called directly only while a request context is pushed."""
    app = flask.Flask(__name__)

    @app.route('/')
    def index():
        return 'Hello %s!' % flask.request.args['name']

    # Push a request context by hand so the view function can be invoked
    # without going through the test client.
    request_context = app.test_request_context('/?name=World')
    request_context.push()
    self.assert_equal(index(), 'Hello World!')
    request_context.pop()

    # After the pop, the view is expected to raise RuntimeError.
    raised = False
    try:
        index()
    except RuntimeError:
        raised = True
    if not raised:
        self.assert_true(0, 'expected runtime error')
def launch_lti() -> t.Any:
    """Do a LTI launch.

    .. :quickref: LTI; Do a LTI Launch.

    The launch parameters of the current request are packed into a JWT that
    expires one minute from now, and the client is redirected to the
    ``lti_launch`` page with that token in the query string.
    """
    launch_params = CanvasLTI.create_from_request(flask.request).launch_params
    expires_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
    payload = {'params': launch_params, 'exp': expires_at}

    signed = jwt.encode(
        payload, app.config['LTI_SECRET_KEY'], algorithm='HS512'
    )
    target = '{}/lti_launch/?inLTI=true&jwt={}'.format(
        app.config['EXTERNAL_URL'],
        urllib.parse.quote(signed.decode('utf8')),
    )
    return flask.redirect(target)
def index():
    """Render the landing page, or complete a password reset when ``code`` is given.

    With a valid ``code`` query argument the matching recovery record has its
    code cleared, a fresh 10-character password is set through ``prosodyctl``
    and mailed to the stored address. An unknown code renders the error page.
    """
    unlock_code = request.args.get('code')
    if not unlock_code:
        return render_template('index.html')

    # unlock, new password
    recovery = s.query(RecoveryEmail).filter(RecoveryEmail.password_code==unlock_code).one_or_none()
    if not recovery:
        return render_template('error.html', message='link invalid')

    jid = recovery.jid
    # Clear the stored code and commit before the password is changed.
    recovery.password_code = None
    s.merge(recovery)
    s.commit()

    # set new password and send email
    email_address = recovery.email
    alphabet = string.ascii_lowercase + string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    password = ''.join(rng.choice(alphabet) for _ in range(10))

    proc = subprocess.Popen(
        ['/usr/bin/prosodyctl', 'passwd', jid],
        stdout=subprocess.PIPE,
        stdin=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    # The password is written twice on stdin (presumably prompt + confirmation).
    stdin_payload = bytes("%s\n%s\n" % (password, password), encoding='utf8')
    proc.communicate(stdin_payload)

    sendMail(email_address, 'new password', password)
    return render_template('success.html', message='password was sent')
def test_greenlet_context_copying_api(self):
    """Check that ``copy_current_request_context`` carries the request into a greenlet."""
    app = flask.Flask(__name__)
    spawned = []

    @app.route('/')
    def index():
        reqctx = flask._request_ctx_stack.top.copy()

        @flask.copy_current_request_context
        def worker():
            self.assert_true(flask.request)
            self.assert_equal(flask.current_app, app)
            self.assert_equal(flask.request.path, '/')
            self.assert_equal(flask.request.args['foo'], 'bar')
            return 42

        spawned.append(greenlet(worker))
        return 'Hello World!'

    response = app.test_client().get('/?foo=bar')
    self.assert_equal(response.data, b'Hello World!')

    # The greenlet runs after the request has finished.
    outcome = spawned[0].run()
    self.assert_equal(outcome, 42)

# Disable test if we don't have greenlets available
def api_send_loves():
    """Send love from the posted form fields and report the result as a plain response.

    Reads ``sender``, ``recipient`` and ``message`` from the form. On success
    the response names the recipients and a share link; a ``TaintedLove``
    error is reported with its user message.
    """
    form = request.form
    sender = form.get('sender')
    recipients = sanitize_recipients(form.get('recipient'))
    message = form.get('message')

    try:
        recipients = send_loves(recipients, message, sender_username=sender)
        recipients_display_str = ', '.join(recipients)
        link_url = create_love_link(recipients_display_str, message).url
        body = u'Love sent to {}! Share: {}'.format(recipients_display_str, link_url)
        return make_response(body, LOVE_CREATED_STATUS_CODE, {})
    except TaintedLove as exc:
        # Non-error TaintedLove instances still answer with the "created" status.
        if exc.is_error:
            status = LOVE_FAILED_STATUS_CODE
        else:
            status = LOVE_CREATED_STATUS_CODE
        return make_response(exc.user_message, status, {})
def sent():
    """Render the confirmation page for a sent love.

    Requires ``link_id``, ``recipients`` and ``message`` query arguments; if
    any is missing or empty the client is redirected to ``home``.
    """
    args = request.args
    link_id = args.get('link_id')
    recipients_str = args.get('recipients')
    message = args.get('message')

    if not (link_id and recipients_str and message):
        return redirect(url_for('home'))

    # Resolve every sanitized recipient name to its Employee entity.
    loved = []
    for recipient in sanitize_recipients(recipients_str):
        loved.append(Employee.get_key_for_username(recipient).get())

    return render_template(
        'sent.html',
        current_time=datetime.utcnow(),
        current_user=Employee.get_current_employee(),
        message=message,
        loved=loved,
        url='{0}l/{1}'.format(config.APP_BASE_URL, link_id),
    )
def test_greenlet_context_copying(self):
    """Check that a copied request context can be re-entered inside a greenlet."""
    app = flask.Flask(__name__)
    spawned = []

    @app.route('/')
    def index():
        copied_ctx = flask._request_ctx_stack.top.copy()

        def worker():
            # Outside the copied context nothing is bound.
            self.assert_false(flask.request)
            self.assert_false(flask.current_app)
            with copied_ctx:
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
            # Leaving the ``with`` block unbinds the request again.
            self.assert_false(flask.request)
            return 42

        spawned.append(greenlet(worker))
        return 'Hello World!'

    response = app.test_client().get('/?foo=bar')
    self.assert_equal(response.data, b'Hello World!')

    outcome = spawned[0].run()
    self.assert_equal(outcome, 42)
def login():
    """Authenticate with the ``username`` / ``password`` request headers.

    Returns:
        A JSON response ``{'token': ...}`` with a new JWT for the user.

    Raises:
        InvalidRequest: if either header is missing.
        AuthFailed: if the user is unknown or the password does not match.
    """
    headers = request.headers
    username = headers.get('username')
    password = headers.get('password')
    if None in (username, password):
        raise InvalidRequest()

    user = UsersCollection().find_one({'username': username})
    # Unknown user and wrong password raise the same error.
    if user is None or not check_password_hash(user['password_hash'], password):
        raise AuthFailed()

    token = UserJWT.new(username, user['scope'])
    return jsonify({'token': token})
def fixUrl(destinationPort, transport, url, peerType):
    """Return a display string for an attack, replacing the URL for port-based sensors.

    For peers whose type contains ``"honeytrap"`` or ``"Dionaea"`` the original
    request string is replaced by ``"Attack on port <port>[/<transport>]"``.
    For every other peer type ``url`` is returned unchanged.

    Args:
        destinationPort: port that was attacked (converted with ``str``).
        transport: transport protocol name; the ``/<transport>`` suffix is
            added only when it is exactly ``udp`` or ``tcp`` (case-insensitive).
        url: the original request string.
        peerType: sensor type string.

    Returns:
        The replacement description or the untouched ``url``.
    """
    transportProtocol = ""
    # Compare against the full protocol names. The previous substring test
    # (``x in "udp"``) also matched "", "u", "dp", ... and produced a bare "/".
    if transport.lower() in ("udp", "tcp"):
        transportProtocol = "/" + transport

    # honeytrap and (prepared) dionaea report the attacked port instead of a URL
    if "honeytrap" in peerType or "Dionaea" in peerType:
        return "Attack on port " + str(destinationPort) + transportProtocol

    return url
def password_reset_request():
    """Let an anonymous user request a password-reset e-mail.

    Logged-in users are redirected to the index. After a valid form
    submission the client is redirected to the login page with a flash
    message; a reset token is mailed only if the address belongs to a user.
    """
    if not current_user.is_anonymous:
        return redirect(url_for('.index'))

    form = PasswordResetRequestForm()
    if not form.validate_on_submit():
        return render_template('auth/reset_password.html', form=form)

    account = User.query.filter_by(email=form.email.data).first()
    if account:
        reset_token = account.generate_reset_token()
        send_email(account.email, 'Reset Your Password',
                   'auth/email/reset_password',
                   user=account, token=reset_token,
                   next=request.args.get('next'))
    # The same message is flashed whether or not the address is known.
    flash('An email with instructions to reset your password has been '
          'sent to you.')
    return redirect(url_for('auth.login'))
def inject():
    """Return a dict of config values, helpers and request state.

    NOTE(review): the name suggests this feeds the template context; the
    registration is not visible here, so confirm against the caller.
    """
    return dict(
        root=_cfg("protocol") + "://" + _cfg("domain"),
        domain=_cfg("domain"),
        protocol=_cfg("protocol"),
        len=len,
        any=any,
        request=request,
        locale=locale,
        url_for=url_for,
        file_link=file_link,
        disown_link=disown_link,
        user=current_user,
        moe=random.choice(moe),
        random=random,
        owner=_cfg("owner"),
        owner_email=_cfg("owner_email"),
        _cfg=_cfg,
    )
def make_embed(request, message="New Transaction", webhook_url=None, timeout=10):
    """Create an embed for a transaction and post it to a Discord webhook.

    Args:
        request: transaction dict with ``_id``, ``type``, ``amount``,
            ``reason``, ``user`` and ``bot`` keys; ``user`` and ``bot`` are
            dicts with ``name``, ``discrim`` and ``_id`` (``user`` also has
            ``balance``).
        message: title of the embed.
        webhook_url: webhook to post to. ``None`` keeps the historical
            hard-coded endpoint.
        timeout: seconds to wait for the webhook before ``requests`` raises,
            so that a stalled endpoint cannot block the caller forever.
    """
    if webhook_url is None:
        # NOTE(review): this URL embeds the webhook secret in source control.
        # Move it to configuration and rotate the webhook.
        webhook_url = 'https://canary.discordapp.com/api/webhooks/338371642277494786/vG8DJjpXC-NEXB4ZISo1r7QQ0Ras_RaqZbuhzjYOklKu70l73PmumdUCgBruypPv3fQp'

    def describe(entity):
        return f"{entity['name']}#{entity['discrim']} ({entity['_id']})"

    fields = [{"name": "ID", "value": request['_id']}]
    if request['type'] == "deposit":
        fields.append({"name": "Amount", "value": f"+{request['amount']}"})
    elif request['type'] == "withdrawl":
        # Spelling kept as-is: it is the value compared against incoming data.
        fields.append({"name": "Amount", "value": f"-{request['amount']}"})
    fields.append({"name": "Reason", "value": request['reason']})
    fields.append({"name": "Balance", "value": request['user']['balance']})
    fields.append({"name": "User", "value": describe(request['user'])})
    fields.append({"name": "Bot", "value": describe(request['bot'])})

    embed = {"title": message, "fields": fields, "timestamp": datetime.now().isoformat()}
    requests.post(webhook_url, json={"embeds": [embed]}, timeout=timeout)
def get_transactions(request):
    """Return transactions matching the filters present in ``request``.

    For each of the keys ``type``, ``amount``, ``bot``, ``user`` and
    ``reason`` found in ``request``, one list of matching rows is collected.
    When none of them is present, a single list with every transaction is
    returned instead. Identical result lists are de-duplicated.

    Args:
        request: dict of filters plus an optional ``limit``.

    Returns:
        A list of result lists, truncated to ``request['limit']`` entries
        (not truncated when ``limit`` is absent).
    """
    filter_keys = ("type", "amount", "bot", "user", "reason")

    # NOTE(review): each query result is appended as its own list, so the
    # return value is a list of lists and ``limit`` counts lists, not rows.
    # That shape is kept for callers; confirm whether a flat list was intended.
    transactions = [
        list(r.table('transactions').filter(r.row[key] == request[key]).run(conn))
        for key in filter_keys
        if key in request
    ]

    # Fall back to the whole table only when no filter was supplied. The old
    # condition (``not "reason" or "user" or ...``) was always true.
    if not transactions:
        transactions.append(list(r.table('transactions').run(conn)))

    unique = []
    for group in transactions:
        if group not in unique:
            unique.append(group)

    # ``[:None]`` keeps everything, so a missing limit no longer raises KeyError.
    return unique[:request.get('limit')]
def create_token():
    """Register a new bot and return it, including its freshly generated token.

    Expected JSON body::

        {
            "owner": ...,
            "bot": {
                "name": "Alpha Bot",
                "discrim": "4112",
                "_id": "331841835733614603"
            }
        }

    Returns:
        A JSON response with the stored bot record (``name``, ``discrim``,
        ``_id``, ``owner``, ``token``).
    """
    payload = flask.request.get_json()
    bot_info = payload['bot']
    token = tokengenerator.URandomTokenGenerator().generate()

    # The documented key is "_id" while the code historically read "id";
    # accept both so existing clients of either form keep working.
    bot_id = bot_info['_id'] if '_id' in bot_info else bot_info['id']

    bot = {"name": bot_info['name'],
           "discrim": bot_info['discrim'],
           "_id": bot_id,
           "owner": payload['owner'],
           "token": token}

    # ReQL queries are lazy: without .run(conn) the insert is never executed.
    # NOTE(review): assumes the module-level ``conn`` used by the other
    # queries in this file is in scope here.
    r.table('bots').insert({"bot": bot}).run(conn)
    return flask.jsonify(bot)
def show_transactions():
    """Return, as JSON, all transactions that match the posted parameters.

    Expected JSON body (keys in parentheses are optional)::

        {
            "limit": 20,
            ("type": "deposit",)
            ("amount": 200,)
            ("user": {
                "name": "Mr Boo Grande",
                "discrim": "6644",
                "_id": "209137943661641728"
            },)
            ("bot": {
                "name": "Alpha Bot",
                "discrim": "4112",
                "_id": "331841835733614603"
            },)
            ("reason": "casino")
        }
    """
    filters = flask.request.get_json()
    return flask.jsonify(get_transactions(filters))
def fake_transaction():
    """Log a fake transaction (for testing purposes) and echo it back as JSON.

    Expected JSON body::

        {
            "_id": 90832,
            "type": "deposit",
            "amount": 200,
            "user": {
                "name": "Mr Boo Grande",
                "discrim": "6644",
                "_id": "209137943661641728"
            },
            "bot": {
                "name": "Alpha Bot",
                "discrim": "4112",
                "_id": "331841835733614603"
            },
            "reason": "casino"
        }
    """
    transaction = flask.request.get_json()
    make_embed(transaction)
    return flask.jsonify(transaction)
def flask(body, headers):
    """Build a Flask app with one route that answers with ``body`` and ``headers``.

    The handler also reads the ``User-Agent`` header and the ``limit`` query
    argument without using them.
    """
    import flask

    flask_app = flask.Flask('hello')

    @flask_app.route('/hello/<account_id>/test')
    def hello(account_id):
        req = flask.request
        # Values are read but intentionally unused (hence the NOQA markers).
        user_agent = req.headers['User-Agent']  # NOQA
        limit = req.args.get('limit', '10')  # NOQA
        return flask.Response(body, headers=headers, mimetype='text/plain')

    return flask_app
def werkzeug(body, headers):
    """Build a Werkzeug WSGI application that answers with ``body`` and ``headers``.

    The handler reads the ``User-Agent`` header and ``limit`` argument and
    matches the URL against ``/hello/<account_id>/test`` before responding;
    the extracted values are not used.
    """
    import werkzeug.wrappers as werkzeug
    from werkzeug.routing import Map, Rule

    url_map = Map([Rule('/hello/<account_id>/test', endpoint='hello')])

    @werkzeug.Request.application
    def hello(request):
        # Values are read but intentionally unused (hence the NOQA markers).
        user_agent = request.headers['User-Agent']  # NOQA
        limit = request.args.get('limit', '10')  # NOQA
        adapter = url_map.bind_to_environ(request.environ)  # NOQA
        endpoint, values = adapter.match()  # NOQA
        aid = values['account_id']  # NOQA
        return werkzeug.Response(body, headers=headers, mimetype='text/plain')

    return hello
def _serve_webui(self, file_name='index.html'):  # pylint: disable=redefined-builtin
    """Serve a web UI file, or a generated JSON config for ``config.*.json`` names.

    When a ``WEB3_ENDPOINT`` is configured and ``file_name`` contains
    ``config.`` and ends in ``.json``, a JSON document with the API prefix
    and the web3 endpoint is returned instead of a file. An empty
    ``file_name`` or a missing file falls back to ``index.html``.
    """
    webui_dir_key = 'WEBUI_PATH'
    try:
        assert file_name
        web3_endpoint = self.flask_app.config.get('WEB3_ENDPOINT')
        wants_config = (
            web3_endpoint
            and 'config.' in file_name
            and file_name.endswith('.json')
        )
        if wants_config:
            request_host = request.headers.get('Host')
            points_to_loopback = any(
                name in web3_endpoint for name in ('localhost', '127.0.0.1')
            )
            if points_to_loopback and request_host:
                # Keep the configured port but use the host from the request.
                _, web3_port = split_endpoint(web3_endpoint)
                visible_host, _ = split_endpoint(request_host)
                web3_endpoint = 'http://{}:{}'.format(visible_host, web3_port)
            reply = jsonify({'raiden': self._api_prefix, 'web3': web3_endpoint})
        else:
            reply = send_from_directory(self.flask_app.config[webui_dir_key], file_name)
    except (NotFound, AssertionError):
        reply = send_from_directory(self.flask_app.config[webui_dir_key], 'index.html')
    return reply
def find_fontface_by_font_id():
    """Return, as JSON, every font face belonging to the ``font_id`` query argument.

    Returns:
        A JSON list of ``fontface_id`` / ``fontface`` / ``font_id`` /
        ``resource_path`` objects, or ``{"error": "Invalid request"}`` when
        the lookup fails.
    """
    try:
        font_id = request.args.get("font_id")
        fontfaces = FontFaceService().find_by_font_id(font_id)
        response_data = [
            {
                "fontface_id": fontface.fontface_id,
                "fontface": fontface.fontface,
                "font_id": fontface.font_id,
                "resource_path": fontface.resource_path,
            }
            for fontface in fontfaces
        ]
        return jsonify(response_data)
    except Exception:
        # Deliberately broad: any lookup failure is reported as an invalid
        # request. Unlike a bare ``except:``, SystemExit and KeyboardInterrupt
        # still propagate.
        return jsonify({"error": "Invalid request"})
def pushbullet(ALERTID=None, TOKEN=None):
    """
    Send a `link` notification to all devices on pushbullet with a link back to the alert's query.
    If `TOKEN` is not passed, requires `PUSHBULLETTOKEN` defined, see https://www.pushbullet.com/#settings/account

    Args:
        ALERTID: accepted but not used by this function.
        TOKEN: access token overriding the module-level `PUSHBULLETTOKEN`.

    Returns:
        The result of `callapi`, or a `(message, 500, None)` tuple when the
        URL or token is not configured.
    """
    if not PUSHBULLETURL:
        return ("PUSHBULLET parameter must be set, please edit the shim!", 500, None)

    # Use a separate local name: assigning to PUSHBULLETTOKEN here would make
    # it function-local and raise UnboundLocalError whenever TOKEN is None.
    token = TOKEN if TOKEN is not None else PUSHBULLETTOKEN
    if not token:
        return ("PUSHBULLETTOKEN parameter must be set, please edit the shim!", 500, None)

    a = parse(request)

    payload = {
        "body": a['info'],
        "title": a['AlertName'],
        "type": "link",
        "url": a['url'],
    }

    headers = {'Content-type': 'application/json', 'Access-Token': token}

    return callapi(PUSHBULLETURL, 'post', json.dumps(payload), headers)
def extract_token(self):
    '''
    Extract a ``Negotiate`` token from the current request's authorization
    header, if one is present, and authenticate with it.

    A token returned by the authentication step is stored on the context
    stack top. The `save_user` callback is invoked when a user is returned;
    otherwise the request is aborted with 403. Requests without a
    ``Negotiate`` header are left untouched.
    '''
    prefix = b'Negotiate '
    header = request.headers.get(b'authorization')
    if not header or not header.startswith(prefix):
        return

    # Strip the 10-character "Negotiate " prefix.
    client_token = header[10:]
    user, reply_token = _gssapi_authenticate(client_token, self._service_name)

    if reply_token is not None:
        stack.top.kerberos_token = reply_token

    if user is None:
        # Invalid Kerberos ticket, we could not complete authentication
        abort(403)
    else:
        self._save_user(user)
def recongize():
    '''
    Handle a POST carrying an uploaded image in the ``file`` field.

    The image bytes are passed to ``client.recognize_celebrities`` and the
    analysis is returned as JSON. A client error is returned as a JSON
    object describing the error instead.
    '''
    # Reads image data from incoming request
    image_bytes = request.files['file'].read()

    # Sends the image for analysis; a ClientError becomes the JSON payload.
    try:
        analysis = client.recognize_celebrities(Image={'Bytes': image_bytes})
    except botocore.exceptions.ClientError as error:
        analysis = {"Rekognition Client Error": str(error)}

    logging.info(analysis)
    return jsonify(analysis)
def chunked_reader():
    """Yield the chunks of the current request body.

    Under uWSGI the ``uwsgi.chunked_read`` API is used until it returns an
    empty chunk. Otherwise the WSGI input stream is iterated (this works in
    embedded Apache) until it is exhausted or fails.

    Yields:
        Successive chunks of the request body.
    """
    try:
        # If we're running under uWSGI, use the uwsgi.chunked_read method
        # to read chunked input.
        import uwsgi  # noqa
    except ImportError:
        uwsgi = None

    if uwsgi is not None:
        while True:
            chunk = uwsgi.chunked_read()
            if len(chunk) > 0:
                yield chunk
            else:
                return

    # Otherwise try to read the wsgi input.
    stream = flask.request.environ["wsgi.input"]
    try:
        while True:
            # ``next(stream)`` works on Python 2 and 3; ``stream.next()`` only
            # exists on Python 2 and silently produced no data on Python 3.
            yield next(stream)
    except Exception:
        # Best effort by design: exhaustion (StopIteration) and read errors
        # both simply end the generator.
        return
def get_history(coin):
    """Return the stored price history of ``coin`` as JSON, newest first.

    Requests whose ``key`` query argument is in ``API_KEYS`` are served from
    ``pro_db``; all others from ``db``.
    """
    c = Coin(coin.lower())
    q = "SELECT * from api_coin WHERE name=:coin ORDER BY date desc"

    is_pro = request.args.get('key') in API_KEYS
    if is_pro:
        print(crayons.red('Pro request!'))
    database = pro_db if is_pro else db
    rows = database.query(q, coin=c.name)

    def as_entry(row):
        # NOTE(review): the 4-hour shift looks like a timezone offset — confirm.
        moment = maya.MayaDT.from_datetime(row.date).subtract(hours=4)
        return {
            'value': row.value,
            'value.currency': 'USD',
            'timestamp': moment.iso8601(),
            'when': moment.slang_time(),
        }

    return jsonify(history=[as_entry(row) for row in rows])
def second_phase_lti_launch(
) -> helpers.JSONResponse[t.Mapping[str, t.Union[str, models.Assignment, bool]]
                          ]:
    """Finish an LTI launch from the JWT sent in the ``Jwt`` request header.

    The token is verified with ``LTI_SECRET_KEY``, the LTI user, course and
    assignment are resolved, roles are set and the session is committed.

    Returns:
        A JSON response with ``assignment`` and ``new_role_created``, plus
        ``access_token`` when a new token was issued for the user.
    """
    launch_params = jwt.decode(
        flask.request.headers.get('Jwt', None),
        app.config['LTI_SECRET_KEY'],
        # PyJWT expects the plural ``algorithms`` list. The singular keyword
        # is not a recognised parameter, so the accepted algorithm was never
        # actually restricted to HS512.
        algorithms=['HS512'],
    )['params']
    lti = CanvasLTI(launch_params)

    user, new_token = lti.ensure_lti_user()
    course = lti.get_course()
    assig = lti.get_assignment(user)
    lti.set_user_role(user)
    new_role_created = lti.set_user_course_role(user, course)
    db.session.commit()

    # Annotated as a Dict (not Mapping) because a key may be added below.
    result: t.Dict[str, t.Union[str, models.Assignment, bool]]
    result = {'assignment': assig, 'new_role_created': new_role_created}
    if new_token is not None:
        result['access_token'] = new_token

    return helpers.jsonify(result)
def validate_sig(body, sig_str, pkh):
    """Validate the signature on the body of a request.

    Args:
        body: the signed message.
        sig_str: signature taken from the ``X-Bitcoin-Sig`` header.
        pkh: the signer's public key hash (base58check encoded).

    Returns:
        ``True`` when the signature is valid.

    Raises:
        PermissionError: when ``pkh`` is missing or has an invalid length, or
            when the signature is missing, invalid or cannot be verified.
    """
    # Check permission to update
    if pkh is None:
        raise PermissionError("pkh not found.")

    # Validate the pkh format
    base58.b58decode_check(pkh)
    if not 20 <= len(pkh) <= 40:
        raise PermissionError("Invalid pkh")

    try:
        if not sig_str:
            raise PermissionError("X-Bitcoin-Sig header not found.")
        signature_ok = wallet.verify_bitcoin_message(body, sig_str, pkh)
        if not signature_ok:
            raise PermissionError("X-Bitcoin-Sig header not valid.")
    except Exception as err:
        # The specific reason is logged; callers always get the generic message.
        logger.error("Failure: {0}".format(err))
        raise PermissionError("X-Bitcoin-Sig header validation failed.")

    return True
def clear_cache():
    """Clear every cached member lookup.

    The caller is identified by the ``x-webauth-user`` header. Callers who
    are neither eval director nor RTP are redirected to ``/dashboard``.

    Returns:
        ``("cache cleared", 200)`` on success, otherwise a redirect.
    """
    user_name = request.headers.get('x-webauth-user')
    account = ldap_get_member(user_name)

    allowed = ldap_is_eval_director(account) or ldap_is_rtp(account)
    if not allowed:
        return redirect("/dashboard")

    log = logger.new(request=request)
    log.info('Purge All Caches')

    cached_functions = (
        _ldap_is_member_of_directorship,
        ldap_get_member,
        ldap_get_active_members,
        ldap_get_intro_members,
        ldap_get_onfloor_members,
        ldap_get_current_students,
        get_voting_members,
        get_members_info,
        get_onfloor_members,
    )
    for cached in cached_functions:
        cached.cache_clear()

    return "cache cleared", 200
def database_processor(logger, log_method, event_dict):  # pylint: disable=unused-argument, redefined-outer-name
    """Persist non-GET request events as ``UserLog`` rows.

    Events without a ``request`` key pass through untouched. For the others
    the ``request`` entry is removed, and when the method is not ``GET`` a
    ``UserLog`` row is written first.

    Returns:
        The (possibly modified) ``event_dict``.
    """
    if 'request' not in event_dict:
        return event_dict

    if event_dict['method'] != 'GET':
        entry = UserLog(
            ipaddr=event_dict['ip'],
            user=event_dict['user'],
            method=event_dict['method'],
            blueprint=event_dict['blueprint'],
            path=event_dict['path'],
            description=event_dict['event']
        )
        session = db.session
        session.add(entry)
        session.flush()
        session.commit()

    # The request entry is removed whether or not a row was written.
    del event_dict['request']
    return event_dict
def RunFileServer(fileServerDir, fileServerPort):
    """
    Run a Flask file server on the given port, serving files from
    fileServerDir.

    instance_path is passed explicitly because Flask's
    auto_find_instance_path can fail when run in a frozen app.
    """
    app = Flask(__name__, instance_path=fileServerDir)

    @app.route('/fileserver-is-ready', methods=['GET'])
    def FileserverIsReady():  # pylint: disable=unused-variable
        """
        Lets a client check that the file server has started.
        """
        return 'Fileserver is ready!'

    @app.route('/<path:filename>', methods=['GET'])
    def ServeFile(filename):  # pylint: disable=unused-variable
        """
        Serves up a file from fileServerDir.
        """
        relative_name = filename.strip('/')
        return send_from_directory(fileServerDir, relative_name)

    @app.route('/shutdown', methods=['POST'])
    def ShutDown():  # pylint: disable=unused-variable
        """
        Respond to a POSTed request by shutting down the file server.
        """
        # The shutdown hook is read from the WSGI environment of this request.
        stop_server = request.environ.get('werkzeug.server.shutdown')
        if stop_server is None:
            raise RuntimeError('Not running with the Werkzeug Server')
        stop_server()
        return 'Server shutting down...'

    app.run(host=LOCALHOST, port=fileServerPort)
def request_password():
    """Mail a password-reset link for the posted ``jid``.

    Only confirmed recovery addresses qualify. A new 64-character code is
    stored on the record and mailed as a link; an unknown or unconfirmed
    ``jid`` renders the error page.
    """
    jid = request.form.get('jid')
    recovery = s.query(RecoveryEmail).filter(RecoveryEmail.jid==jid, RecoveryEmail.confirmed==True).one_or_none()
    if not recovery:
        return render_template('error.html', message='user not found')

    alphabet = string.ascii_lowercase + string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    password_code = ''.join(rng.choice(alphabet) for _ in range(64))

    # Persist the code before mailing the link that contains it.
    recovery.password_code = password_code
    s.merge(recovery)
    s.commit()

    password_code_link = 'https://www.pimux.de/password/?code=%s' % password_code
    sendMail(recovery.email, 'password reset request', 'click here: %s' % password_code_link)
    return render_template('success.html', message='link was sent')
def connected():
    """Log an incoming connect request and emit a ``log`` event to confirm it."""
    logging.debug('Received connect request')
    confirmation = {'data': 'Connected'}
    emit('log', confirmation)