我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 flask.request.environ。
def request_validate(view):
    """Decorate ``view`` so the current request is validated before it runs.

    The wrapper looks up the schemas registered in ``validators`` under
    ``(endpoint, method)``, validates every listed request location and
    passes the results to ``view`` as ``request_data`` (location ``json``)
    and/or ``request_args`` (location ``args``).  The object stored under
    ``request.environ['context']`` is always passed as ``context``.
    """
    # Request attribute name -> keyword argument handed to the view.
    kwarg_names = {"json": "request_data", "args": "request_args"}

    @wraps(view)
    def wrapper(*args, **kwargs):
        # Keep only the part after the first dot of the endpoint name.
        endpoint = request.endpoint.partition('.')[-1]
        # HEAD requests reuse the schemas registered for GET.
        method = 'GET' if request.method == 'HEAD' else request.method

        for location, schema in validators.get((endpoint, method), {}).items():
            raw_value = getattr(request, location, MultiDict())
            validated = FlaskValidatorAdaptor(schema).validate(raw_value)
            LOG.info("Validated request %s: %s" % (location, validated))
            # A schema with maxProperties == 0 has nothing to forward.
            if schema.get("maxProperties") != 0:
                kwargs[kwarg_names[location]] = validated

        return view(*args, context=request.environ['context'], **kwargs)

    return wrapper
def auth():
    """Check user/password and return a token if they are valid.

    When ``settings.API['forwarded_host']`` is configured, the request must
    carry a matching ``X-Forwarded-Host`` header, otherwise ``BadRequest``
    is raised.  Raises ``Unauthorized`` when the credentials are rejected.
    """
    if settings.API.get('forwarded_host'):
        try:
            forwarded_host = request.environ['HTTP_X_FORWARDED_HOST']
        except KeyError:
            raise BadRequest('Missing HTTP_X_FORWARDED_HOST')
        if forwarded_host != settings.API['forwarded_host']:
            raise BadRequest('Invalid HTTP_X_FORWARDED_HOST')

    authenticated, ret = GeneralController.auth(request.get_json())
    if not authenticated:
        raise Unauthorized(ret)
    return ret
def _setup_requests(app):
    """Register a ``before_request`` hook on ``app``.

    Before each request the hook saves the Beaker session found in the WSGI
    environ and passes it to ``_setup_connector`` together with the current
    application and its config.
    """
    def _init_request():
        beaker_session = request.environ['beaker.session']
        beaker_session.save()
        _setup_connector(
            app=current_app,
            app_config=current_app.config,
            session=beaker_session,
        )

    @app.before_request
    def before_request():
        # _init_request() has no return statement, so this yields None.
        return _init_request()
def createTeam():
    """Create a team if needed and grant it access to a report environment.

    Reads ``team``, ``team_email``, ``report_name`` and the optional ``role``
    (default ``'user'``) from the query string.  The request is only honoured
    when the ``Origin`` header equals this server's host URL (without the
    trailing slash).

    :return: ``(json 'Success', 200)`` when the team was linked, or
        ``(json 'Forbidden', 403)`` when the origin check fails.
    """
    if request.environ.get('HTTP_ORIGIN') != request.host_url[:-1]:
        # The status code belongs in the returned tuple; the original passed
        # it to json.dumps() as a second positional argument.
        return json.dumps('Forbidden'), 403

    def sql_text(value):
        # Double embedded single quotes so a request value cannot terminate
        # the SQL string literal it is interpolated into.
        return value.replace("'", "''")

    team_name = request.args['team']
    report_name = request.args['report_name']

    newTeam = Team.query.filter_by(team_name=team_name).first()
    dbEnv = AresSql.SqliteDB(report_name)
    if not newTeam:
        team = Team(team_name, request.args['team_email'])
        db.session.add(team)
        db.session.commit()
        newTeam = Team.query.filter_by(team_name=team_name).first()
    team_id = newTeam.team_id
    role = request.args.get('role', 'user')

    # NOTE(review): the statement is still assembled as text because the
    # signature of dbEnv.modify() is not visible here; switch to bound
    # parameters if it supports them.
    dbEnv.modify("""INSERT INTO team_def (team_id, team_name, role) VALUES (%s, '%s', '%s');
                    INSERT INTO env_auth (env_id, team_id)
                    SELECT env_def.env_id, %s
                    FROM env_def
                    WHERE env_def.env_name = '%s' ;""" % (
        team_id, sql_text(team_name), sql_text(role), team_id, sql_text(report_name)))
    return json.dumps('Success'), 200
def page_not_found(error):
    """Log a 500 entry for the current user and render the 500 page.

    Appends ``"<user> <remote addr> 500"`` to today's activity log.  The user
    name is the first line of ``registered/<token>`` when the session holds a
    token, otherwise ``'Anonymous'``.
    """
    log_path = './logs/activity-' + date.today().strftime('%Y-%m-%d')

    user = 'Anonymous'  # default when no token is set in the session
    if 'token' in session:
        with open('registered/' + session['token'], 'r') as token_file:
            # First line minus its final character (the newline).
            user = token_file.readline()[:-1]

    with open(log_path, 'a') as log_file:
        log_file.write(' '.join([user, request.environ['REMOTE_ADDR'], '500']) + '\n')

    return render_template('500.html'), 500
# No cache
def post_comment():
    """Add a post (comment) to an article.

    The article id is whatever follows the last ``=`` in the Referer URL.
    Stores a ``Post`` and a matching ``COMMENT`` ``Event`` for the current
    user, then redirects back to the article page.
    """
    form = PostForm()
    article = request.environ["HTTP_REFERER"].rpartition("=")[-1]
    posted_at = time.time()
    author = current_user.name

    db.session.add(Post(author=author, article=article,
                        message=form.message.data, time=posted_at))
    db.session.add(Event(author=author, article=article,
                         event="COMMENT", time=time.time()))
    db.session.commit()
    return redirect("/biblio/article=" + article)
def send_error_mail(exception):
    """Mail the traceback and request environ to the admin, then show a 500 page.

    :param exception: the exception raised
    :type exception: ``Exception``
    :return: the HTML to send back in the response, and the HTTP code 500.
    :rtype: str, int
    """
    # Inspired from <https://github.com/jasonwyatt/Flask-ErrorMail>.
    message = Message("Join2 ORCID exception: %s" % exception,
                      sender=CFG_SITE_ADMIN_EMAIL,
                      recipients=[CFG_SITE_ADMIN_EMAIL])

    separator = "=" * 80
    lines = ["Traceback:", separator, traceback.format_exc(), "\n",
             "Request Information:", separator]
    # NOTE(review): this dumps the complete WSGI environ (every request
    # header included) into the mail body.
    wsgi_env = request.environ
    lines.extend("%s: %s" % (name, wsgi_env.get(name))
                 for name in sorted(wsgi_env.keys()))

    message.body = "\n".join(lines) + "\n"
    mailer.send(message)
    return render_template("500.html"), 500
def run_flask_request(environ):
    """Replay a Flask request from a serialised WSGI environ.

    :param environ: WSGI environ mapping; the request body, if any, is
        expected as bytes under the ``'_wsgi.input'`` key.
    :return: ``(body, status_code, headers)`` of the produced response.
    """
    from .wsgi_aux import app

    if '_wsgi.input' in environ:
        # WSGI wants the body as a file-like object.
        environ['wsgi.input'] = BytesIO(environ['_wsgi.input'])

    # Create a request context similar to that of the original request
    # so that the task can have access to flask.g, flask.request, etc.
    with app.request_context(environ):
        # Record the fact that we are running in the Celery worker now
        g.in_celery = True

        # Run the route function and record the response
        try:
            rv = app.full_dispatch_request()
        except Exception:
            # Was a bare ``except:``, which also swallowed SystemExit and
            # KeyboardInterrupt and turned them into a 500 response.
            # If we are in debug mode we want to see the exception
            # Else, return a 500 error
            if app.debug:
                raise
            rv = app.make_response(InternalServerError())
        return (rv.get_data(), rv.status_code, rv.headers)
def wrapped(*args, **kwargs):
    """Run ``f`` directly inside Celery, otherwise hand the request to Celery.

    Returns ``f``'s result when ``g.in_celery`` is set, a 202 response with a
    ``Location`` status link while the task is unfinished, or the task's
    ``info`` once it has completed.
    """
    # Already on the Celery side: just execute the wrapped view.
    if getattr(g, 'in_celery', False):
        return f(*args, **kwargs)

    # On the Flask side: ship the text-valued part of the environ to the
    # task so the request can be reconstructed there.  The body is carried
    # separately because WSGI requires it to be a file-like object.
    environ = dict(
        (key, value) for key, value in request.environ.items()
        if isinstance(value, text_types)
    )
    if 'wsgi.input' in request.environ:
        environ['_wsgi.input'] = request.get_data()
    t = run_flask_request.apply_async(args=(environ,))

    # Task still in flight: answer 202 with a link the client can poll;
    # the link is built from the Celery task id.
    if t.state in (states.PENDING, states.RECEIVED, states.STARTED):
        return '', 202, {'Location': url_for('tasks.get_status', id=t.id)}

    # Task already finished (e.g. CELERY_ALWAYS_EAGER is True): return its
    # value as the response.
    return t.info
def error():
    """This endpoint is used by httpd, which redirects its errors to it.

    Raises ``HTTPError`` carrying the status found in ``REDIRECT_STATUS``;
    answers with the API 404 handler when that value is missing or not an
    integer.
    """
    try:
        status = int(request.environ['REDIRECT_STATUS'])
    except Exception:
        # No usable REDIRECT_STATUS means a client hit this URL directly;
        # make it look like the endpoint does not exist.
        return api_404_handler()

    # Only statuses that have actually been observed get a specific message;
    # add more once they show up with reproducers.
    known_messages = {
        401: 'Authentication failed',
        405: 'Method not allowed for this endpoint',
    }
    raise HTTPError(status, known_messages.get(status, 'Unknown error'))
def flask_cache_key(*args, **kwargs):
    """Create a key for use by Flask-Caching.

    The key combines the request path, the server port and a digest of the
    query arguments.  The digest is computed with SHA-256 instead of the
    builtin ``hash()``, whose value for strings is randomised per process
    and therefore gave different keys in different workers.

    Args:
        None

    Returns:
        result: Key to be used (UTF-8 encoded bytes)

    """
    import hashlib

    # Use the request URI as part of the key
    path = request.path

    # This helps to differentiate between various instances of infoset
    # each running on different ports
    server_port = request.environ['SERVER_PORT']

    # Digest of all query arguments (repeated values included), sorted so
    # the argument order in the URL does not matter.
    query_items = sorted(request.args.items(multi=True))
    digest = hashlib.sha256(repr(query_items).encode('utf-8')).hexdigest()

    # Keep the key within 255 characters by shortening the prefix, never the
    # digest, so different queries on a long path cannot collide.
    prefix = 'infoset_flask_{}_{}_'.format(path, server_port)
    result = (prefix[:255 - len(digest)] + digest).encode('utf-8')
    return result
def run_ctx_request(environ):
    """Run a Flask request context in the Celery worker.

    :param environ: WSGI environ mapping; the request body, if any, is
        expected as bytes under the ``'_wsgi.input'`` key.
    :return: ``(body, status_code, headers)`` of the produced response.
    """
    from blueprints import app  # wsgi.app

    if '_wsgi.input' in environ:
        # an input stream (file-like object) from which the HTTP request body can be read.
        # detail: https://www.python.org/dev/peps/pep-0333/#environ-variables
        environ['wsgi.input'] = BytesIO(environ['_wsgi.input'])

    # request_context() needs the environ to build the request; the original
    # called it without arguments.
    with app.request_context(environ):
        g.in_celery = True
        try:
            rv = app.full_dispatch_request()
        except InternalServerError:
            if app.debug:
                raise
            # Fall through to the common return so the error path yields the
            # same (body, status, headers) tuple as the success path.
            rv = app.make_response(InternalServerError())
        return (rv.get_data(), rv.status_code, rv.headers)
def decorator(*args, **kwargs):
    """Execute ``f`` inside Celery, or dispatch the request to a Celery task.

    Returns ``f``'s result when ``g.in_celery`` is set, a 202 response with a
    ``Location`` header while the task is unfinished, or ``task.info`` once
    the task has completed.
    """
    if getattr(g, 'in_celery', False):
        return f(*args, **kwargs)

    # Only text-valued environ entries are forwarded to the task.
    environ = {}
    for key, value in request.environ.items():
        if isinstance(value, text_types):
            environ[key] = value
    if 'wsgi.input' in request.environ:
        environ['_wsgi.input'] = request.get_data()  # request.body

    task = run_ctx_request.apply_async(args=(environ,))
    unfinished_states = (states.PENDING, states.RECEIVED, states.STARTED)
    if task.state in unfinished_states:
        return '', 202, {'Location': url_for('api.get_status', id=task.id)}
    return task.info
def __init__(self, rate_limiter):
    """Track one ``api_call_count`` event for the current request.

    Nothing is tracked when ``self.stats`` is falsy.  The event is tagged
    with the API minor version, the id of ``rate_limiter._auth_key``, the
    HTTP method and the URL rule up to its first ``<`` placeholder, with
    everything up to and including the blueprint name removed.
    """
    super(LogApiCallCount, self).__init__()
    if not self.stats:
        return

    tags = [
        'version:' + str(Config.API_VERSION_MINOR),
        'application:rest-api',
        'api-key:' + rate_limiter._auth_key.id,
        'method:' + request.environ['REQUEST_METHOD'],
        'endpoint:' + request.url_rule.rule.split('<')[0].split(request.blueprint)[1],
    ]
    ip = get_remote_addr()
    event_properties = dict(tags=tags, ip=ip, ignore_time=True)
    self.stats.track(get_remote_addr(), 'api_call_count',
                     properties=event_properties)
    # self.stats.increment('api.call.count',
    #                      tags=tags,)
def ensure_project_exists(view):
    """Decorate ``view`` so a project lookup/creation runs before it.

    ``find_or_create_project`` is called with the request and the object
    stored under ``request.environ['context']`` whenever that context
    reports ``using_keystone``; ``view`` is then called unchanged.
    """
    @wraps(view)
    def wrapper(*args, **kwargs):
        ctx = request.environ['context']
        if ctx.using_keystone:
            find_or_create_project(request, ctx)
        return view(*args, **kwargs)

    return wrapper
def get_debug_flag(default=None):
    """Return the debug setting taken from the ``FLASK_DEBUG`` variable.

    :param default: value returned when ``FLASK_DEBUG`` is unset or empty.
    :return: ``default`` when the variable is unset or empty, ``False`` when
        it is ``0``, ``false`` or ``no`` in any letter case, ``True`` for
        any other value.
    """
    val = os.environ.get('FLASK_DEBUG')
    if not val:
        return default
    # Compare case-insensitively so "False" / "NO" also disable debug mode.
    return val.lower() not in ('0', 'false', 'no')
def twitter():
    """Start the Twitter OAuth flow, or go to the index if already authorised.

    Redirects to ``index`` when the configured access token already yields a
    named account.  Otherwise stores the request token in the session and
    redirects to Twitter's authorization URL.
    """
    # NOTE(review): consumer key and secret are hard-coded in source; they
    # should be rotated and loaded from configuration like the access token.
    # NOTE(review): the callback URL is built from the client-sent Host
    # header and always uses http.
    callback_url = "http://" + request.environ["HTTP_HOST"] + "/auth/twitter"
    auth = tweepy.OAuthHandler("T4NRPcEtUrCEU58FesRmRtkdW",
                               "zmpbytgPpSbro6RZcXsKgYQoz24zLH3vYZHOHAAs5j33P4eoRg",
                               callback_url)
    auth.set_access_token(config.TWITTER_ACCESS_TOKEN,
                          config.TWITTER_ACCESS_TOKEN_SECRET)
    api = tweepy.API(auth)

    try:
        already_authorised = bool(api.me().name)
    except tweepy.TweepError:
        already_authorised = False
    if already_authorised:
        return redirect(url_for('index'))

    redirect_url = auth.get_authorization_url()
    session["request_token"] = auth.request_token
    return redirect(redirect_url)
def make_response(self, rv):
    """Convert a view return value into a ``self.response_class`` instance.

    ``rv`` may be a response object, a plain body, or a tuple of up to three
    items: ``(body, status, headers)`` or ``(body, headers)``.  Raises
    ``ValueError`` when the body is ``None``.
    """
    status = extra_headers = None
    if isinstance(rv, tuple):
        # Pad short tuples with None so they unpack into three names.
        rv, status, extra_headers = rv + (None,) * (3 - len(rv))

    if rv is None:
        raise ValueError('View function did not return a response')

    # (body, headers) form: the second item is really the headers.
    if isinstance(status, (dict, list)):
        extra_headers, status = status, None

    response_cls = self.response_class
    if isinstance(rv, response_cls):
        response = rv
    elif isinstance(rv, (JSONRender, text_type, bytes, bytearray, list, dict)):
        # When we create a response object directly, we let the constructor
        # set the headers and status, because there can be extra logic
        # involved when creating these objects with specific values (like
        # default content type selection).
        response = response_cls(rv, headers=extra_headers, status=status)
        status = extra_headers = None
    else:
        response = response_cls.force_type(rv, request.environ)

    if status is not None:
        if isinstance(status, string_types):
            response.status = status
        else:
            response.status_code = status
    if extra_headers:
        response.headers.extend(extra_headers)

    return response
def force_type(cls, rv, environ=None):
    """Coerce ``rv`` into a response, serialising dicts and lists as JSON.

    ``dict`` and ``list`` values are dumped with ``TimestampJSONEncoder``
    into an ``application/json`` ``Response`` first; the result is then
    passed to the parent class's ``force_type``.
    """
    if isinstance(rv, (dict, list)):
        json_body = json.dumps(rv, cls=TimestampJSONEncoder)
        rv = Response(json_body, content_type='application/json')
    return super(CerberusResponse, cls).force_type(rv, environ)
def get_http_info_with_retriever(self, retriever=None):
    """Collect HTTP details of the current request, with a form data work around.

    :param retriever: callable returning the request data; defaults to
        ``self.get_form_data``.  A ``ClientDisconnected`` raised by it is
        treated as empty data.
    :return: dict with ``url``, ``query_string``, ``method``, ``data``,
        ``headers`` and ``env``; ``data`` and ``headers`` are filtered
        through the configured blacklists when those are set.
    """
    fetch_data = self.get_form_data if retriever is None else retriever
    parts = urlparse.urlsplit(request.url)

    try:
        data = fetch_data()
    except ClientDisconnected:
        data = {}
    headers = dict(get_headers(request.environ))

    if self.data_blacklist:
        data = apply_blacklist(data, self.data_blacklist)
    if self.headers_blacklist:
        headers = apply_blacklist(headers, self.headers_blacklist)

    return {
        'url': '%s://%s%s' % (parts.scheme, parts.netloc, parts.path),
        'query_string': parts.query,
        'method': request.method,
        'data': data,
        'headers': headers,
        'env': dict(get_environ(request.environ)),
    }
def get_script_url():
    """Return the URL of the current request with query parameters removed.

    Determining the URL dynamically is not a great idea for production use;
    set it explicitly there.  Remember that for production, webhook urls
    must start with https!
    """
    # See http://flask.pocoo.org/docs/0.10/api/#flask.request
    return rm_queryparameters(full_url(request.environ))
def url_origin(s, use_forwarded_host = False):
    """Build ``<protocol>://<host>`` for the request described by ``s``.

    :param s: WSGI-environ-like mapping; ``SERVER_PROTOCOL`` and
        ``SERVER_PORT`` are always read, ``SERVER_NAME`` only when no host
        header is present.
    :param use_forwarded_host: currently ignored, see the note below.
    :return: the origin string, e.g. ``https://example.org``.
    """
    # testing if Heroku includes forwarding host
    # NOTE(review): this override makes the parameter a no-op.
    use_forwarded_host = True

    ssl = False
    if 'HTTPS' in s:
        ssl = s['HTTPS'] == 'on'

    # Protocol name is the part of SERVER_PROTOCOL before the slash.
    server_protocol = s['SERVER_PROTOCOL'].lower()
    protocol = server_protocol[:server_protocol.find('/')]
    if ssl:
        protocol += 's'

    # Leave the port out when it is the default for the scheme.
    port = s['SERVER_PORT']
    if (not ssl and port == '80') or (ssl and port == '443'):
        port = ''
    else:
        port = ':' + port

    if use_forwarded_host and 'HTTP_X_FORWARDED_HOST' in s:
        host = s['HTTP_X_FORWARDED_HOST']
    elif 'HTTP_HOST' in s:
        host = s['HTTP_HOST']
    else:
        host = s['SERVER_NAME'] + port

    # The protocol can easily be wrong if we're frontended by a HTTPS proxy
    # (Like the standard Heroku setup!)
    on_heroku = heroku_env in os.environ
    upgrade_header = request.headers.get('Upgrade-Insecure-Requests')
    # NOTE(review): header values are strings, so the comparison with the
    # integer 1 never matches; confirm whether '1' was intended.
    wants_upgrade = upgrade_header and upgrade_header == 1
    forwarded_proto = request.headers.get('X-Forwarded-Proto')
    forwarded_https = forwarded_proto and forwarded_proto == 'https'

    if on_heroku or wants_upgrade or forwarded_https:
        # Special handling
        protocol = "https"

    return protocol + '://' + host
def editScript():
    """Overwrite a user script with the body of the current request.

    The target file is derived from the Referer URL (query string dropped):
    for ``.../run/<name>`` it is ``<name>/<name>.py``, otherwise the last two
    path components ``<folder>/<name>`` select ``<folder>/<name>.py``; both
    are resolved below ``config.ARES_USERS_LOCATION``.

    :return: ``'OK'`` once the file has been written (decode/write errors
        are logged at debug level, as before), or
        ``('Invalid script path', 400)`` when the Referer is missing or does
        not yield two safe path components.
    """
    scriptDtls = request.environ.get('HTTP_REFERER', '').split('?')[0].split('/')
    if len(scriptDtls) < 2:
        return 'Invalid script path', 400

    name = scriptDtls[-1]
    folder = name if scriptDtls[-2] == 'run' else scriptDtls[-2]

    # The Referer header is client-controlled: refuse components that are
    # empty or could step outside the users directory.
    for component in (folder, name):
        if component in ('', '.', '..') or '\\' in component or os.sep in component:
            return 'Invalid script path', 400

    script_path = os.path.join(config.ARES_USERS_LOCATION, folder, "%s.py" % name)
    with open(script_path, 'w') as script:
        try:
            for line in request.data.decode('utf-8').split('\n'):
                script.write('%s\n' % line)
        except Exception as e:
            # Best effort, as in the original: log and still answer 'OK'.
            logging.debug(e)
    return 'OK'
def on_connect():
    """Report blacklisted client addresses and pass the request to the shard."""
    addr = request.environ['REMOTE_ADDR']
    is_blacklisted = addr in blacklist
    if is_blacklisted:
        print('{} was found in the blacklist and was rejected'.format(addr))
    # NOTE(review): despite the message, nothing here refuses the connection.
    # The source this was taken from had lost its indentation, so confirm
    # whether this call (or a missing return) belonged inside the branch.
    shard.update_connection(request)
def after_request(resp):
    """
    Adds HTTP headers to the response and logs the request.

    Arguments
    ----------
    resp : flask.Response object
        the Flask response object; it is returned after being modified
    """
    resp.headers['Cache-Control'] = 'no-cache'

    if app.config['DEV']:
        # Permissive CORS headers, only set when the DEV config flag is on.
        cors_headers = (
            ('Access-Control-Allow-Origin', '*'),
            ('Access-Control-Allow-Methods', 'GET, POST'),
            ('Access-Control-Allow-Headers',
             'Origin, X-Requested-With, Content-Type, Accept'),
        )
        for header_name, header_value in cors_headers:
            resp.headers[header_name] = header_value

    structured_log(
        level='info',
        msg="HTTP Request",
        method=request.method,
        uri=request.path,
        status=resp.status_code,
        src_ip=get_real_source_ip(),
        protocol=request.environ['SERVER_PROTOCOL'],
        user_agent=request.environ.get('HTTP_USER_AGENT', '-')
    )
    return resp
def get_real_source_ip():
    """
    Returns the real source IP address of the HTTP request.

    Uses the text after the last space of the first ``X-Forwarded-For``
    header when one is present, ``REMOTE_ADDR`` otherwise.
    """
    if 'X-Forwarded-For' not in request.headers:
        return request.environ['REMOTE_ADDR']
    first_header = request.headers.getlist("X-Forwarded-For")[0]
    return first_header.rpartition(' ')[-1]
def register():
    """Show the registration form (GET) or register a new reviewer (POST).

    GET logs the visit and renders ``register.html`` with the known users.
    POST derives a token from the e-mail, stores it in the session, writes
    the reviewer's details to ``registered/<token>``, records the user in
    ``users.pkl``, sends the welcome mail, logs the request and redirects to
    the index.
    """
    # NOTE(review): the pickle file is opened in text mode; verify this
    # against the Python version the application runs on.
    with open('users.pkl', 'r') as users_file:
        userdict = load(users_file)

    if request.method == 'GET':
        logfn = './logs/activity-' + date.today().strftime('%Y-%m-%d')
        user = 'Anonymous'  # Username is Anonymous by default
        if 'token' in session:
            # The first line of the token file holds the user name.
            with open('registered/' + session['token'], 'r') as token_file:
                user = token_file.readline()[:-1]
        with open(logfn, 'a') as log_file:
            # username, IP addr, end-point, request type
            log_file.write(' '.join(
                [user, request.environ['REMOTE_ADDR'], 'register', 'GET']) + '\n')
        return render_template('register.html', userdict=userdict)

    # inputs for name, email, timezone, phone, aboutme
    elif request.method == 'POST':
        uname = str(request.form['uname'])      # mandatory
        email = str(request.form['email'])      # mandatory
        timezone = request.form['timezone']     # optional
        phone = request.form['phone']           # optional
        aboutme = request.form['aboutme']       # optional

        # The token is the salted and hashed email; '/' is replaced because
        # it cannot appear in a file name.
        token = pbkdf2_sha512.encrypt(email).replace('/', '+')
        session['token'] = token  # set on successful registration

        # Reviewer info goes into a file named after the token.
        with open('registered' + path.sep + token, 'w') as info_file:
            info_file.write('\n'.join([uname, email, timezone, phone, aboutme]) + '\n')
            info_file.write('--files--\n')

        # users.pkl maps user names to emails.
        userdict[uname] = email
        with open('users.pkl', 'w') as users_file:
            dump(userdict, users_file)

        # send welcome email with token
        send_email(email, 'Welcome to AROWF!', 'registration_mail',
                   name=uname, token=token)

        logfn = './logs/activity-' + date.today().strftime('%Y-%m-%d')
        with open(logfn, 'a') as log_file:
            # username, IP addr, end-point, request type
            log_file.write(' '.join(
                [uname, request.environ['REMOTE_ADDR'], 'register', 'POST']) + '\n')
        return redirect(url_for('index'))
def token():
    """Show the token page (GET) or set/clear the session token (POST).

    GET logs the visit and renders ``token.html`` with the current user and
    the names of all files in ``registered/``.  POST stores the submitted
    ``tokeninput`` in the session (or removes the token when the value is
    ``'null'``), logs the request and redirects to the index.
    """
    def log_activity(verb):
        # Append "<user> <ip> token <verb>" to today's log and return the
        # user name resolved from the session token.
        log_path = './logs/activity-' + date.today().strftime('%Y-%m-%d')
        name = 'Anonymous'  # Username is Anonymous by default
        if 'token' in session:
            with open('registered/' + session['token'], 'r') as token_file:
                # user name associated with the set token
                name = token_file.readline()[:-1]
        with open(log_path, 'a') as log_file:
            log_file.write(
                name + ' ' + request.environ['REMOTE_ADDR'] + ' token ' + verb + '\n')
        return name

    if request.method == 'GET':
        user = log_activity('GET')
        tokenNames = listdir('registered/')  # list of all tokens
        return render_template('token.html', user=user, tokenNames=tokenNames)

    elif request.method == 'POST':
        submitted = request.form['tokeninput']
        if submitted == 'null':
            session.pop('token', None)
        else:
            session['token'] = submitted  # obtained from the form
        log_activity('POST')
        return redirect(url_for('index'))
def page_not_found(error):
    """Log a 404 entry for the current user and render the 404 page.

    Appends ``"<user> <remote addr> 404"`` to today's activity log.  The user
    name is the first line of ``registered/<token>`` when the session holds a
    token, otherwise ``'Anonymous'``.
    """
    log_path = './logs/activity-' + date.today().strftime('%Y-%m-%d')

    user = 'Anonymous'  # default when no token is set in the session
    if 'token' in session:
        with open('registered/' + session['token'], 'r') as token_file:
            # First line minus its final character (the newline).
            user = token_file.readline()[:-1]

    with open(log_path, 'a') as log_file:
        log_file.write(' '.join([user, request.environ['REMOTE_ADDR'], '404']) + '\n')

    return render_template('404.html'), 404
def update_entry():
    """Update an existing bibliography entry from the submitted form.

    On a valid form the entry whose ``ID`` matches the form is overwritten
    with the form values, an ``UPDATE`` event is recorded, and the client is
    redirected to the article named after the last ``=`` of the Referer URL.
    An invalid form redirects to ``/biblio``.
    """
    form = BiblioForm()
    article_name = request.environ["HTTP_REFERER"].rpartition("=")[-1]

    if not form.validate_on_submit():
        return redirect("/biblio")

    # NOTE(review): .first() yields None when no entry has this ID, which
    # would make the assignments below fail.
    article = BiblioEntry.query.filter_by(ID=form.ID.data).first()

    # (model attribute, form field) pairs, applied in this order.
    field_map = (
        ("ID", form.ID),
        ("ENTRYTYPE", form.typ),
        ("authors", form.author),
        ("title", form.title),
        ("year", form.year),
        ("journal", form.journal),
        ("school", form.school),
        ("url", form.url),
        ("keywords", form.keywords),
        ("tag", form.tag),
    )
    for attribute, field in field_map:
        setattr(article, attribute, field.data)
    db.session.add(article)

    db.session.add(Event(author=current_user.name, article=form.ID.data,
                         event="UPDATE", time=time.time()))
    db.session.commit()
    return redirect("/biblio/article=" + article_name)
def ensure_secure_request():
    """Reject or redirect plain-HTTP requests unless insecure transport is allowed.

    Returns ``None`` for HTTPS requests and when
    ``config.allow_insecure_transport`` is set.  Otherwise POST/PUT/PATCH
    requests are aborted with 400 and all other methods are redirected to
    the same URL with ``http://`` replaced by ``https://``.
    """
    if request.environ['wsgi.url_scheme'] == 'https' or config.allow_insecure_transport:
        return None

    if request.method in ('POST', 'PUT', 'PATCH'):
        # request body already sent in insecure manner
        # return error in this case to notify cluster admin
        return abort(400)

    secure_url = request.url.replace('http://', 'https://', 1)
    return redirect(secure_url)
def swagger(file_name):
    """Serve an ARI api-docs file with its host rewritten for this server.

    Reads ``/usr/local/share/ari/api-docs/<file_name>`` and replaces
    ``localhost:8088`` with ``ari:<SERVER_PORT of the current request>``.
    """
    # NOTE(review): file_name is inserted into the path unchecked; confirm
    # that the route cannot deliver path separators.
    spec_path = '/usr/local/share/ari/api-docs/{file_name}'.format(file_name=file_name)
    with open(spec_path, 'r') as swagger_file:
        swagger_spec = swagger_file.read()

    public_host = 'ari:{port}'.format(port=request.environ['SERVER_PORT'])
    swagger_spec = swagger_spec.replace('localhost:8088', public_host)
    return make_response(swagger_spec, 200, {'Content-Type': 'application/json'})