我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用flask.request.endpoint()。
def paginate(query, schema): page = request.args.get('page', DEFAULT_PAGE_NUMBER) per_page = request.args.get('page_size', DEFAULT_PAGE_SIZE) page_obj = query.paginate(page=page, per_page=per_page) next = url_for( request.endpoint, page=page_obj.next_num if page_obj.has_next else page_obj.page, per_page=per_page, **request.view_args ) prev = url_for( request.endpoint, page=page_obj.prev_num if page_obj.has_prev else page_obj.page, per_page=per_page, **request.view_args ) return { 'total': page_obj.total, 'pages': page_obj.pages, 'next': next, 'prev': prev, 'results': schema.dump(page_obj.items).data }
def __init__(self, options, func, request_context): self.options = options self.operation = request.endpoint self.func = func.__name__ self.method = request.method self.args = request.args self.view_args = request.view_args self.request_context = request_context self.timing = dict() self.error = None self.stack_trace = None self.request_body = None self.response_body = None self.response_headers = None self.status_code = None self.success = None
def api_deprecated(new_endpoint, message='This endpoint is deprecated.'): """Decorator that adds a deprecation message for and endpoint. Decorated function will not be executed. :param new_endpoint: New endpoint to use :param message: Warning message :return: :rtype: func """ def decorator(f): @functools.wraps(f) def wrapped(*args, **kwargs): response = jsonify({ 'message': message, 'endpoint': url_for(new_endpoint, _external=True) }) # response = jsonify(rv) response.status_code = 301 response.headers['DO-New-Endpoint'] = \ url_for(new_endpoint, _external=True) return response return wrapped return decorator
def authenticate(): # logger.debug("endpoint request: %s" % request.endpoint) if re.search('tenant_provisioned', str(request.endpoint)): g.user = "phone_home" logger.info("Authentication bypassed: tenant_provisioned") return try: decoded = jwt.decode(request.headers['X-Auth-Token'], credentials['tenant_secret'], algorithms=['HS256']) g.user = decoded['user'] except KeyError: logger.error("Error: key error.") abort(401) except jwt.DecodeError: logger.error("Error: decode error") abort(401)
def before_request(): if request.endpoint != 'root': if request.method != 'POST': response = generate_response( **{ 'return': 'KO', 'data': 'Unsupported HTTP method' } ) return response, 400 elif request.form.get('APIKey') != Master_APIKey: response = generate_response( **{ 'return': 'KO', 'data': 'Bad API key' } ) return response, 403
def error(): """This endpoint is used by httpd, which redirects its errors to it.""" try: status = int(request.environ['REDIRECT_STATUS']) except Exception: # if there's an exception, it means that a client accessed this directly; # in this case, we want to make it look like the endpoint is not here return api_404_handler() msg = 'Unknown error' # for now, we just provide specific error for stuff that already happened; # before adding more, I'd like to see them actually happening with reproducers if status == 401: msg = 'Authentication failed' elif status == 405: msg = 'Method not allowed for this endpoint' raise HTTPError(status, msg)
def add_resource_no_matter_slashes(resource, route, endpoint=None, defaults=None): """Adds a resource for both trailing slash and no trailing slash to prevent redirects. """ slashless = route.rstrip('/') _resource_paths.append(api_v1.url_prefix + slashless) slashful = route + '/' endpoint = endpoint or resource.__name__.lower() defaults = defaults or {} rest_api_v1.add_resource(resource, slashless, endpoint=endpoint + '__slashless', defaults=defaults) rest_api_v1.add_resource(resource, slashful, endpoint=endpoint + '__slashful', defaults=defaults)
def stack_analyses_debug(external_request_id): """Debug endpoint exposing operational data for particular stack analysis. This endpoint is not part of the public API. Note the existence of the data is not guaranteed, therefore the endpoint can return 404 even for valid request IDs. """ results = retrieve_worker_results(rdb, external_request_id) if not results: return jsonify(error='No operational data for the request ID'), 404 response = {'tasks': []} for result in results: op_data = result.to_dict() audit = op_data.get('task_result', {}).get('_audit', {}) task_data = {'task_name': op_data.get('worker')} task_data['started_at'] = audit.get('started_at') task_data['ended_at'] = audit.get('ended_at') task_data['error'] = op_data.get('error') response['tasks'].append(task_data) return jsonify(response), 200
def templated(template=None): def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): template_name = template if template_name is None: template_name = request.endpoint \ .replace('.', '/') + '.html' ctx = f(*args, **kwargs) if ctx is None: ctx = {} elif not isinstance(ctx, dict): return ctx return render_template(template_name, **ctx) return decorated_function return decorator
def init_values(self): current_total = self.found if self.search else self.total pages = divmod(current_total, self.per_page) self.total_pages = pages[0] + 1 if pages[1] else pages[0] self.has_prev = self.page > 1 self.has_next = self.page < self.total_pages args = request.args.copy() args.update(request.view_args.copy()) self.args = {} for k, v in args.lists(): if len(v) == 1: self.args[k] = v[0] else: self.args[k] = v self.endpoint = request.endpoint
def should_update_email(): """Check whether the current user has not set his or her email address. Some authentication providers may not give the email addresses. However, Railgun relies on email addresss to finish login process. So the users from these providers will be given `fake` email addresses. :class:`should_update_email` should be called before a view is executed, check whether the user should be redirected to :func:`~railgun.website.views.profile_edit` to fill in his or her email address. :return: :data:`True` if the user has a fake email address, :data:`False` otherwise. .. note:: If the current view is :func:`~railgun.website.views.profile_edit`, this method will return :data:`False`. """ if (current_user.email.endswith(app.config['EXAMPLE_USER_EMAIL_SUFFIX']) and request.endpoint != 'profile_edit'): return True
def make_view(title, endpoint, *args, **kwargs): """A shortcut to make a :class:`NaviItem` linking to a standard Flask view. Equal to the following statement: .. code-block:: python NaviItem( title=title, url=lambda: url_for(endpoint, *args, **kwargs), identity=endpoint ) :param title: The title of new :class:`NaviItem` :param endpoint: The endpoint of target view. :type endpoint: :class:`str` :param args: The unnamed arguments to :func:`flask.url_for` when generating the `url`. :param kwargs: The named arguments to :func:`flask.url_for` when generating the `url`. """ return NaviItem(title, lambda: url_for(endpoint, *args, **kwargs), endpoint)
def patch_validate_handler(name, bl=None): def params_validate_handler(): if not request.endpoint \ or not request.endpoint.startswith('{0}.'.format(name)) \ or request.method == 'HEAD': return _, blueprint, endpoint = request.endpoint.split('.') mapping = get_mapping(blueprint, endpoint) if mapping: params = request.get_json() if validate(params, mapping, format_checker=formatchecker): raise ArgumentError('Json schema validate failed') if bl: bl.before_app_first_request(params_validate_handler) else: sys.modules['flask'].app.before_app_first_request(params_validate_handler) return params_validate_handler
def ratelimit(limit, per=300, send_x_headers=True, methods=["POST"], over_limit=on_over_limit, scope_func=scope_func, key_func=lambda: request.endpoint): def decorator(f): def rate_limited(*args, **kwargs): if request.method in methods: key = 'rate-limit/%s/%s/' % (key_func(), scope_func()) rlimit = RateLimit(key, limit, per, send_x_headers) g._view_rate_limit = rlimit if over_limit is not None and rlimit.over_limit: return over_limit(rlimit) return f(*args, **kwargs) return update_wrapper(rate_limited, f) return decorator
def before_request(): request.start_time = datetime.now() # @bp.after_request # def after_request(resp): # try: # if '_' in request.endpoint: # dbcon = influx_db.connection # point = [{"measurement": config.APP_NAME, # "tags": {"method": request.method, "status": resp.status_code, "endpoint": # request.endpoint}, # "fields": {"base_url": request.base_url, "remote_address": request.remote_addr, # 'response_time': (datetime.now() - request.start_time).microseconds}}] # dbcon.write_points(point) # except Exception as e: # pass # logger.debug('Write api statistics data to influxdb failed, error?' + e.message) # return resp
def ratelimit(limit, per, send_x_headers=True, scope_func=lambda: request.remote_addr, key_func=lambda: request.endpoint, path=lambda: request.path): """ Decorator for limiting the access to a route. Returns the function if within the limit, otherwise TooManyRequests error """ def decorator(f): @wraps(f) def rate_limited(*args, **kwargs): try: key = 'rate-limit/%s/%s/' % (key_func(), scope_func()) rlimit = RateLimit(key, limit, per, send_x_headers) g._view_rate_limit = rlimit #if over_limit is not None and rlimit.over_limit: if rlimit.over_limit: raise TooManyRequests return f(*args, **kwargs) except Exception as e: return error.format_exception(e, target=path(), action=f.__name__) return update_wrapper(rate_limited, f) return decorator
def log_request(resp): if request.endpoint != "get_doi_endpoint": return logging_start_time = time() try: results = json.loads(resp.get_data())["results"][0] except (ValueError, RuntimeError, KeyError): # don't bother logging if no results return oa_color = results["oa_color"] if not oa_color: oa_color = "gray" body = { "timestamp": datetime.utcnow().isoformat(), "elapsed": elapsed(g.request_start_time, 2), "ip": get_ip(), "status_code": resp.status_code, "email": request.args.get("email", None), "doi": results["doi"], "year": results.get("year", None), "oa_color": oa_color } h = { "content-type": "text/json", "X-Forwarded-For": get_ip() } url = "http://logs-01.loggly.com/inputs/6470410b-1d7f-4cb2-a625-72d8fa867d61/tag/{}/".format( oa_color) requests.post(url, headers=h, data=json.dumps(body)) # logger.info(u"log_request took {} seconds".format(elapsed(logging_start_time, 2)))
def print_ip(): user_agent = request.headers.get('User-Agent') logger.info(u"calling from IP {ip}. User-Agent is '{user_agent}'.".format( ip=get_ip(), user_agent=user_agent )) # this is the old way of expressing this endpoint. # the new way is POST api.oadoi.org/ # you can give it an object that lists DOIs # you can also give it an object that lists biblios. # this is undocumented and is just for impactstory use now.
def get_doi_endpoint(doi): # the GET api endpoint (returns json data) my_pub = get_pub_from_doi(doi) return jsonify({"results": [my_pub.to_dict()]})
def get_doi_endpoint_v2(doi): # the GET api endpoint (returns json data) my_pub = get_pub_from_doi(doi) return jsonify(my_pub.to_dict_v2())
def before_request(): if current_user.is_authenticated: current_user.ping() if not current_user.confirmed \ and request.endpoint[:5] != 'auth.' \ and request.endpoint != 'static': return redirect(url_for('auth.unconfirmed'))
def check_jwt_authorization(): current_identity = getattr(_request_ctx_stack.top, 'current_identity', None) if current_identity: return current_identity skip_check = False if current_app.config.get("disable_jwt", False): skip_check = True if request.endpoint in current_app.view_functions: fn = current_app.view_functions[request.endpoint] # Check Flask-RESTful endpoints for openness if hasattr(fn, "view_class"): exempt = getattr(fn.view_class, "no_jwt_check", []) if request.method in exempt: skip_check = True elif fn in _open_endpoints: skip_check = True # the static folder is open to all without authentication if request.endpoint == "static" or request.url.endswith("favicon.ico"): skip_check = True # In case the endpoint requires no authorization, and the request does not # carry any authorization info as well, we will not try to verify any JWT's if skip_check and 'Authorization' not in request.headers: return token, auth_type = get_auth_token_and_type() current_identity = verify_token(token, auth_type) if auth_type == "JWT": # Cache this token cache_token(current_identity) # Authorization token has now been converted to a verified payload _request_ctx_stack.top.current_identity = current_identity return current_identity
def requires_roles(_roles): """ endpoint decorator to lock down an endpoint on a set of roles (comma delimitered) """ def wrapper(fn): @wraps(fn) def decorator(*args, **kwargs): if not _roles: return fn(*args, **kwargs) current_user = query_current_user() if not current_user: abort_unauthorized("You do not have access to this resource." " It requires role '%s'" % _roles) required_roles = set(_roles.split(",")) user_roles = set(current_user.get("roles", [])) if not required_roles.intersection(user_roles): log.warning("User does not have the needed roles for this " "call. User roles = '%s', Required roles = " "'%s'. current_user = '%s'", current_user.get("roles", ""), _roles, repr(current_user)) abort_unauthorized("You do not have access to this resource. " "It requires role '%s'" % _roles) return fn(*args, **kwargs) return decorator return wrapper # List of open endpoints, i.e. not requiring a valid JWT.
def jwt_not_required(fn): log.debug("Registering open endpoint: %s", fn.__name__) _open_endpoints.add(fn) return fn
def auth_audit_log(response): """On deployment remove the ``crossdomain`` decorator""" try: jdata = json.loads(request.data.decode()) if 'password' in jdata: jdata['password'] = '*********' jdata_str = json.dumps(jdata) except ValueError: jdata_str = '' kwargs = { 'module': auth.name, 'user': current_user.name, 'email': current_user.email, 'action': _HTTP_METHOD_TO_AUDIT_MAP[request.method.lower()], 'data': addslashes(jdata_str), 'url': request.url, 'endpoint': request.endpoint, 'ip': request.remote_addr, 'status': response.status, 'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') } entry = [] for k, v in kwargs.items(): entry.append('{0!s}="{1!s}"'.format(k, v)) entry = ' '.join(entry) current_app.audit_log.info('{0!s}'.format(entry)) return response
def api_audit_log(response): """Saves information about the request in the ``audit_log`` :param response: Server :class:`~flask.Response` :return: :class:`~flask.Response` """ kwargs = { 'module': api.name, 'user': current_user.name, 'email': current_user.email, 'action': _HTTP_METHOD_TO_AUDIT_MAP[request.method.lower()], 'data': addslashes(request.data.decode()), 'url': request.url, 'endpoint': request.endpoint, 'ip': request.remote_addr, 'status': response.status, 'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') } if not request.view_args and request.method.lower() == 'put': kwargs['action'] = _HTTP_METHOD_TO_AUDIT_MAP['post'] entry = [] for k, v in kwargs.items(): entry.append('{0!s}="{1!s}"'.format(k, v)) entry = ' '.join(entry) current_app.audit_log.info('{0!s}'.format(entry)) return response
def all(self): try: return getattr(get_mailman_client(), pluralize(self.endpoint)) except AttributeError: raise MailmanApiError except MailmanConnectionError as e: raise MailmanApiError(e)
def get(self, **kwargs): try: method = getattr(get_mailman_client(), 'get_' + self.endpoint) return method(**kwargs) except AttributeError as e: raise MailmanApiError(e) except HTTPError as e: raise except MailmanConnectionError as e: raise MailmanApiError(e)
def __init__(self, endpoint, **kwargs): self.endpoint = endpoint
def cp_audit_log(response): """Saves information about the request in the ``audit_log`` :param response: Server :class:`~flask.Response` :return: :class:`~flask.Response` """ try: jdata = json.loads(request.data.decode()) if 'password' in jdata: jdata['password'] = '*********' jdata_str = json.dumps(jdata) except ValueError: jdata_str = '' kwargs = { 'module': cp.name, 'user': g.user.name, 'email': g.user.email, 'action': _HTTP_METHOD_TO_AUDIT_MAP[request.method.lower()], 'data': addslashes(jdata_str), 'url': request.url, 'endpoint': request.endpoint, 'ip': request.remote_addr, 'status': response.status, 'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') } if not request.view_args and request.method.lower() == 'put': kwargs['action'] = _HTTP_METHOD_TO_AUDIT_MAP['post'] entry = [] for k, v in kwargs.items(): entry.append('{0!s}="{1!s}"'.format(k, v)) entry = ' '.join(entry) current_app.audit_log.info('{0!s}'.format(entry)) return response
def build_menu_item(endpoint, name): is_active = "active" if request.endpoint == endpoint else "" return Markup( f'<li class="nav-item {is_active}">' f'<a class="nav-link" href="{url_for(endpoint)}">{name}</a>' f'</li>' )
def build_menu_item_as_filter(endpoint): is_active = "active" if request.endpoint == endpoint else "" return Markup( f'<li class="nav-item {is_active}">' f'<a class="nav-link" href="{url_for(endpoint)}">{endpoint.upper()}</a>' f'</li>' )
def url_for_other_page(page): args = request.view_args.copy() args.update(request.args) args['page'] = page return url_for(request.endpoint, **args)
def navbar(): try: return dict(navbar_pages=navbar_pages, active=request.endpoint) except RuntimeError: return {} # maybe we don't care
def sort_link(order): args = request.view_args.copy() args['sort'] = order return url_for(request.endpoint, **args)
def ask_login(): if not current_user.is_anonymous: return if getattr(current_app.view_functions[request.endpoint], 'no_login', False): return return redirect(url_for('ares.aresLogin', next=request.endpoint))
def app_after_request(response): if request.endpoint != 'static': return response response.cache_control.max_age = 15552000 return response # jinja_env
def _finish_span(self, response=None, exception=None): """ Close and finish the active span if it exists. """ span = getattr(g, 'flask_datadog_span', None) if span: if span.sampled: error = 0 code = response.status_code if response else None method = request.method if request else None # if we didn't get a response, but we did get an exception, set # codes accordingly. if not response and exception: code = 500 # The 3 next lines might not be strictly required, since `set_traceback` # also get the exception from the sys.exc_info (and fill the error meta). # Since we aren't sure it always work/for insuring no BC break, keep # these lines which get overridden anyway. error = 1 span.set_tag(errors.ERROR_TYPE, type(exception)) span.set_tag(errors.ERROR_MSG, exception) # The provided `exception` object doesn't have a stack trace attached, # so attach the stack trace with `set_traceback`. span.set_traceback() # the endpoint that matched the request is None if an exception # happened so we fallback to a common resource resource = code if not request.endpoint else request.endpoint span.resource = compat.to_unicode(resource).lower() span.set_tag(http.URL, compat.to_unicode(request.base_url or '')) span.set_tag(http.STATUS_CODE, code) span.set_tag(http.METHOD, method) span.error = error span.finish() # Clear our span just in case. g.flask_datadog_span = None # Request hook methods
def before_request(): if current_user.is_authenticated\ and not current_user.confirmed \ and request.endpoint[:5] != 'auth.' \ and request.endpoint != 'static': return redirect(url_for('auth.unconfirmed'))
def test_index(app, bitmap, client): with app.test_request_context('/bitmapist/'): assert request.endpoint == 'bitmapist.index'
def test_cohort(app, bitmap, client): with app.test_request_context('/bitmapist/cohort'): assert request.endpoint == 'bitmapist.cohort'
def allow_public_endpoints_only(): public_endpoints = (HealthCheck.__name__.lower(),) if g.endpoint not in public_endpoints: abort(401)
def set_name_for_reseller(reseller_id): if not reseller_id: return None if g.endpoint == ApplicationList.__name__.lower(): return generate_reseller_name() return get_reseller_name(reseller_id)
def get_reseller_info(): reseller_id = request.headers.get('Aps-Instance-Id') is_new = g.endpoint == ApplicationList.__name__.lower() reseller_name = set_name_for_reseller(reseller_id) oauth = get_oauth() return ResellerInfo(id=reseller_id, name=reseller_name, is_new=is_new, auth=oauth)
def before_request(): g.log = dict() g.log['out'] = list() g.log['request'] = log_request(request) g.endpoint = request.endpoint if request.blueprint: g.endpoint = g.endpoint[len(request.blueprint):].lstrip('.') reseller_info = get_reseller_info() g.reseller_name = reseller_info.name g.company_name = 'N/A' if not reseller_info.name: allow_public_endpoints_only() return if not check_oauth_signature(request): abort(401) g.auth = reseller_info.auth g.reseller = Reseller(reseller_info.name, reseller_info.id, None) g.reseller.refresh() if not g.reseller.token and not reseller_info.is_new: abort(403)