Python flask.request 模块,endpoint() 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用flask.request.endpoint()

项目:cookiecutter-flask-restful    作者:karec    | 项目源码 | 文件源码
def paginate(query, schema):
    page = request.args.get('page', DEFAULT_PAGE_NUMBER)
    per_page = request.args.get('page_size', DEFAULT_PAGE_SIZE)
    page_obj = query.paginate(page=page, per_page=per_page)
    next = url_for(
        request.endpoint,
        page=page_obj.next_num if page_obj.has_next else page_obj.page,
        per_page=per_page,
        **request.view_args
    )
    prev = url_for(
        request.endpoint,
        page=page_obj.prev_num if page_obj.has_prev else page_obj.page,
        per_page=per_page,
        **request.view_args
    )

    return {
        'total': page_obj.total,
        'pages': page_obj.pages,
        'next': next,
        'prev': prev,
        'results': schema.dump(page_obj.items).data
    }
项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def __init__(self, options, func, request_context):
        self.options = options
        self.operation = request.endpoint
        self.func = func.__name__
        self.method = request.method
        self.args = request.args
        self.view_args = request.view_args
        self.request_context = request_context
        self.timing = dict()

        self.error = None
        self.stack_trace = None
        self.request_body = None
        self.response_body = None
        self.response_headers = None
        self.status_code = None
        self.success = None
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def api_deprecated(new_endpoint, message='This endpoint is deprecated.'):
    """Decorator that adds a deprecation message for and endpoint.
    Decorated function will not be executed.

    :param new_endpoint: New endpoint to use
    :param message: Warning message
    :return:
    :rtype: func
    """
    def decorator(f):
        @functools.wraps(f)
        def wrapped(*args, **kwargs):
            response = jsonify({
                'message': message,
                'endpoint': url_for(new_endpoint, _external=True)
            })
            # response = jsonify(rv)
            response.status_code = 301
            response.headers['DO-New-Endpoint'] = \
                url_for(new_endpoint, _external=True)
            return response
        return wrapped
    return decorator
项目:os-reststack-manager    作者:gemagomez    | 项目源码 | 文件源码
def authenticate():
    # logger.debug("endpoint request: %s" % request.endpoint)
    if re.search('tenant_provisioned', str(request.endpoint)):
        g.user = "phone_home"
        logger.info("Authentication bypassed: tenant_provisioned")
        return

    try:
        decoded = jwt.decode(request.headers['X-Auth-Token'], credentials['tenant_secret'], algorithms=['HS256'])
        g.user = decoded['user']
    except KeyError:
        logger.error("Error: key error.")
        abort(401)
    except jwt.DecodeError:
        logger.error("Error: decode error")
        abort(401)
项目:FastIR_Server    作者:SekoiaLab    | 项目源码 | 文件源码
def before_request():
    if request.endpoint != 'root':
        if request.method != 'POST':
            response = generate_response(
                **{
                    'return': 'KO',
                    'data': 'Unsupported HTTP method'
                }
            )
            return response, 400
        elif request.form.get('APIKey') != Master_APIKey:
            response = generate_response(
                **{
                    'return': 'KO',
                    'data': 'Bad API key'
                }
            )
            return response, 403
项目:fabric8-analytics-server    作者:fabric8-analytics    | 项目源码 | 文件源码
def error():
    """This endpoint is used by httpd, which redirects its errors to it."""
    try:
        status = int(request.environ['REDIRECT_STATUS'])
    except Exception:
        # if there's an exception, it means that a client accessed this directly;
        #  in this case, we want to make it look like the endpoint is not here
        return api_404_handler()
    msg = 'Unknown error'
    # for now, we just provide specific error for stuff that already happened;
    #  before adding more, I'd like to see them actually happening with reproducers
    if status == 401:
        msg = 'Authentication failed'
    elif status == 405:
        msg = 'Method not allowed for this endpoint'
    raise HTTPError(status, msg)
项目:fabric8-analytics-server    作者:fabric8-analytics    | 项目源码 | 文件源码
def add_resource_no_matter_slashes(resource, route, endpoint=None, defaults=None):
    """Adds a resource for both trailing slash and no trailing slash to prevent redirects.
    """
    slashless = route.rstrip('/')
    _resource_paths.append(api_v1.url_prefix + slashless)
    slashful = route + '/'
    endpoint = endpoint or resource.__name__.lower()
    defaults = defaults or {}

    rest_api_v1.add_resource(resource,
                             slashless,
                             endpoint=endpoint + '__slashless',
                             defaults=defaults)
    rest_api_v1.add_resource(resource,
                             slashful,
                             endpoint=endpoint + '__slashful',
                             defaults=defaults)
项目:fabric8-analytics-server    作者:fabric8-analytics    | 项目源码 | 文件源码
def stack_analyses_debug(external_request_id):
    """Debug endpoint exposing operational data for particular stack analysis.

    This endpoint is not part of the public API.

    Note the existence of the data is not guaranteed,
    therefore the endpoint can return 404 even for valid request IDs.
    """

    results = retrieve_worker_results(rdb, external_request_id)
    if not results:
        return jsonify(error='No operational data for the request ID'), 404

    response = {'tasks': []}
    for result in results:
        op_data = result.to_dict()
        audit = op_data.get('task_result', {}).get('_audit', {})
        task_data = {'task_name': op_data.get('worker')}
        task_data['started_at'] = audit.get('started_at')
        task_data['ended_at'] = audit.get('ended_at')
        task_data['error'] = op_data.get('error')
        response['tasks'].append(task_data)
    return jsonify(response), 200
项目:hreftoday    作者:soasme    | 项目源码 | 文件源码
def templated(template=None):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            template_name = template
            if template_name is None:
                template_name = request.endpoint \
                    .replace('.', '/') + '.html'
            ctx = f(*args, **kwargs)
            if ctx is None:
                ctx = {}
            elif not isinstance(ctx, dict):
                return ctx
            return render_template(template_name, **ctx)
        return decorated_function
    return decorator
项目:lalascan    作者:blackye    | 项目源码 | 文件源码
def init_values(self):
        current_total = self.found if self.search else self.total
        pages = divmod(current_total, self.per_page)
        self.total_pages = pages[0] + 1 if pages[1] else pages[0]
        self.has_prev = self.page > 1
        self.has_next = self.page < self.total_pages

        args = request.args.copy()
        args.update(request.view_args.copy())
        self.args = {}
        for k, v in args.lists():
            if len(v) == 1:
                self.args[k] = v[0]
            else:
                self.args[k] = v

        self.endpoint = request.endpoint
项目:railgun    作者:xin-xinhanggao    | 项目源码 | 文件源码
def should_update_email():
    """Check whether the current user has not set his or her email address.

    Some authentication providers may not give the email addresses.
    However, Railgun relies on email addresss to finish login process.
    So the users from these providers will be given `fake` email addresses.

    :class:`should_update_email` should be called before a view is executed,
    check whether the user should be redirected to
    :func:`~railgun.website.views.profile_edit` to fill in his or her
    email address.

    :return: :data:`True` if the user has a fake email address, :data:`False`
        otherwise.

    .. note::

        If the current view is :func:`~railgun.website.views.profile_edit`,
        this method will return :data:`False`.
    """
    if (current_user.email.endswith(app.config['EXAMPLE_USER_EMAIL_SUFFIX'])
            and request.endpoint != 'profile_edit'):
        return True
项目:railgun    作者:xin-xinhanggao    | 项目源码 | 文件源码
def make_view(title, endpoint, *args, **kwargs):
        """A shortcut to make a :class:`NaviItem` linking to a standard
        Flask view.  Equal to the following statement:

        .. code-block:: python

            NaviItem(
                title=title,
                url=lambda: url_for(endpoint, *args, **kwargs),
                identity=endpoint
            )

        :param title: The title of new :class:`NaviItem`
        :param endpoint: The endpoint of target view.
        :type endpoint: :class:`str`
        :param args: The unnamed arguments to :func:`flask.url_for` when
            generating the `url`.
        :param kwargs: The named arguments to :func:`flask.url_for` when
            generating the `url`.
        """
        return NaviItem(title, lambda: url_for(endpoint, *args, **kwargs),
                        endpoint)
项目:kael    作者:360skyeye    | 项目源码 | 文件源码
def patch_validate_handler(name, bl=None):
    def params_validate_handler():
        if not request.endpoint \
                or not request.endpoint.startswith('{0}.'.format(name)) \
                or request.method == 'HEAD':
            return
        _, blueprint, endpoint = request.endpoint.split('.')
        mapping = get_mapping(blueprint, endpoint)
        if mapping:
            params = request.get_json()
            if validate(params, mapping, format_checker=formatchecker):
                raise ArgumentError('Json schema validate failed')

    if bl:
        bl.before_app_first_request(params_validate_handler)
    else:
        sys.modules['flask'].app.before_app_first_request(params_validate_handler)

    return params_validate_handler
项目:ColdCore    作者:IceCTF    | 项目源码 | 文件源码
def ratelimit(limit, per=300, send_x_headers=True,
              methods=["POST"],
              over_limit=on_over_limit,
              scope_func=scope_func,
              key_func=lambda: request.endpoint):
    def decorator(f):
        def rate_limited(*args, **kwargs):
            if request.method in methods:
                key = 'rate-limit/%s/%s/' % (key_func(), scope_func())
                rlimit = RateLimit(key, limit, per, send_x_headers)
                g._view_rate_limit = rlimit
                if over_limit is not None and rlimit.over_limit:
                    return over_limit(rlimit)
            return f(*args, **kwargs)
        return update_wrapper(rate_limited, f)
    return decorator
项目:go_basketball    作者:ZhaoPengkun    | 项目源码 | 文件源码
def before_request():
    request.start_time = datetime.now()


# @bp.after_request
# def after_request(resp):
#     try:
#         if '_' in request.endpoint:
#             dbcon = influx_db.connection
#             point = [{"measurement": config.APP_NAME,
#                       "tags": {"method": request.method, "status": resp.status_code, "endpoint":
#                           request.endpoint},
#                       "fields": {"base_url": request.base_url, "remote_address": request.remote_addr,
#                                  'response_time': (datetime.now() - request.start_time).microseconds}}]
#             dbcon.write_points(point)
#     except Exception as e:
#         pass
#         logger.debug('Write api statistics data to influxdb failed, error?' + e.message)
#     return resp
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def ratelimit(limit, per, send_x_headers=True,
              scope_func=lambda: request.remote_addr,
              key_func=lambda: request.endpoint,
              path=lambda: request.path):
    """
    Decorator for limiting the access to a route.

    Returns the function if within the limit, otherwise TooManyRequests error

    """
    def decorator(f):
        @wraps(f)
        def rate_limited(*args, **kwargs):
            try:
                key = 'rate-limit/%s/%s/' % (key_func(), scope_func())
                rlimit = RateLimit(key, limit, per, send_x_headers)
                g._view_rate_limit = rlimit
                #if over_limit is not None and rlimit.over_limit:
                if rlimit.over_limit:
                    raise TooManyRequests
                return f(*args, **kwargs)
            except Exception as e:
                return error.format_exception(e, target=path(),
                                              action=f.__name__)
        return update_wrapper(rate_limited, f)
    return decorator
项目:oadoi    作者:Impactstory    | 项目源码 | 文件源码
def log_request(resp):
    if request.endpoint != "get_doi_endpoint":
        return

    logging_start_time = time()

    try:
        results = json.loads(resp.get_data())["results"][0]
    except (ValueError, RuntimeError, KeyError):
        # don't bother logging if no results
        return

    oa_color = results["oa_color"]
    if not oa_color:
        oa_color = "gray"

    body = {
        "timestamp": datetime.utcnow().isoformat(),
        "elapsed": elapsed(g.request_start_time, 2),
        "ip": get_ip(),
        "status_code": resp.status_code,
        "email": request.args.get("email", None),
        "doi": results["doi"],
        "year": results.get("year", None),
        "oa_color": oa_color
    }

    h = {
        "content-type": "text/json",
        "X-Forwarded-For": get_ip()
    }

    url = "http://logs-01.loggly.com/inputs/6470410b-1d7f-4cb2-a625-72d8fa867d61/tag/{}/".format(
        oa_color)
    requests.post(url, headers=h, data=json.dumps(body))
    # logger.info(u"log_request took {} seconds".format(elapsed(logging_start_time, 2)))
项目:oadoi    作者:Impactstory    | 项目源码 | 文件源码
def print_ip():
    user_agent = request.headers.get('User-Agent')
    logger.info(u"calling from IP {ip}. User-Agent is '{user_agent}'.".format(
        ip=get_ip(),
        user_agent=user_agent
    ))


# this is the old way of expressing this endpoint.
# the new way is POST api.oadoi.org/
# you can give it an object that lists DOIs
# you can also give it an object that lists biblios.
# this is undocumented and is just for impactstory use now.
项目:oadoi    作者:Impactstory    | 项目源码 | 文件源码
def get_doi_endpoint(doi):
    # the GET api endpoint (returns json data)
    my_pub = get_pub_from_doi(doi)
    return jsonify({"results": [my_pub.to_dict()]})
项目:oadoi    作者:Impactstory    | 项目源码 | 文件源码
def get_doi_endpoint_v2(doi):
    # the GET api endpoint (returns json data)
    my_pub = get_pub_from_doi(doi)
    return jsonify(my_pub.to_dict_v2())
项目:circleci-demo-python-flask    作者:CircleCI-Public    | 项目源码 | 文件源码
def before_request():
    if current_user.is_authenticated:
        current_user.ping()
        if not current_user.confirmed \
                and request.endpoint[:5] != 'auth.' \
                and request.endpoint != 'static':
            return redirect(url_for('auth.unconfirmed'))
项目:drift    作者:dgnorth    | 项目源码 | 文件源码
def check_jwt_authorization():
    current_identity = getattr(_request_ctx_stack.top,
                               'current_identity', None)
    if current_identity:
        return current_identity

    skip_check = False

    if current_app.config.get("disable_jwt", False):
        skip_check = True

    if request.endpoint in current_app.view_functions:
        fn = current_app.view_functions[request.endpoint]

        # Check Flask-RESTful endpoints for openness
        if hasattr(fn, "view_class"):
            exempt = getattr(fn.view_class, "no_jwt_check", [])
            if request.method in exempt:
                skip_check = True
        elif fn in _open_endpoints:
            skip_check = True

    # the static folder is open to all without authentication
    if request.endpoint == "static" or request.url.endswith("favicon.ico"):
        skip_check = True

    # In case the endpoint requires no authorization, and the request does not
    # carry any authorization info as well, we will not try to verify any JWT's
    if skip_check and 'Authorization' not in request.headers:
        return

    token, auth_type = get_auth_token_and_type()
    current_identity = verify_token(token, auth_type)
    if auth_type == "JWT":
        # Cache this token
        cache_token(current_identity)

    # Authorization token has now been converted to a verified payload
    _request_ctx_stack.top.current_identity = current_identity
    return current_identity
项目:drift    作者:dgnorth    | 项目源码 | 文件源码
def requires_roles(_roles):
    """
        endpoint decorator to lock down an endpoint
        on a set of roles (comma delimitered)
    """
    def wrapper(fn):
        @wraps(fn)
        def decorator(*args, **kwargs):
            if not _roles:
                return fn(*args, **kwargs)
            current_user = query_current_user()
            if not current_user:
                abort_unauthorized("You do not have access to this resource."
                                   " It requires role '%s'" % _roles)

            required_roles = set(_roles.split(","))
            user_roles = set(current_user.get("roles", []))
            if not required_roles.intersection(user_roles):
                log.warning("User does not have the needed roles for this "
                            "call. User roles = '%s', Required roles = "
                            "'%s'. current_user = '%s'",
                            current_user.get("roles", ""),
                            _roles, repr(current_user))
                abort_unauthorized("You do not have access to this resource. "
                                   "It requires role '%s'" % _roles)
            return fn(*args, **kwargs)
        return decorator
    return wrapper


# List of open endpoints, i.e. not requiring a valid JWT.
项目:drift    作者:dgnorth    | 项目源码 | 文件源码
def jwt_not_required(fn):
    log.debug("Registering open endpoint: %s", fn.__name__)
    _open_endpoints.add(fn)
    return fn
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def auth_audit_log(response):
    """On deployment remove the ``crossdomain`` decorator"""
    try:
        jdata = json.loads(request.data.decode())
        if 'password' in jdata:
            jdata['password'] = '*********'
        jdata_str = json.dumps(jdata)
    except ValueError:
        jdata_str = ''
    kwargs = {
        'module': auth.name,
        'user': current_user.name,
        'email': current_user.email,
        'action': _HTTP_METHOD_TO_AUDIT_MAP[request.method.lower()],
        'data': addslashes(jdata_str),
        'url': request.url,
        'endpoint': request.endpoint,
        'ip': request.remote_addr,
        'status': response.status,
        'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
    }
    entry = []
    for k, v in kwargs.items():
        entry.append('{0!s}="{1!s}"'.format(k, v))
    entry = ' '.join(entry)
    current_app.audit_log.info('{0!s}'.format(entry))
    return response
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def api_audit_log(response):
    """Saves information about the request in the ``audit_log``

    :param response: Server :class:`~flask.Response`
    :return: :class:`~flask.Response`
    """
    kwargs = {
        'module': api.name,
        'user': current_user.name,
        'email': current_user.email,
        'action': _HTTP_METHOD_TO_AUDIT_MAP[request.method.lower()],
        'data': addslashes(request.data.decode()),
        'url': request.url,
        'endpoint': request.endpoint,
        'ip': request.remote_addr,
        'status': response.status,
        'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
    }
    if not request.view_args and request.method.lower() == 'put':
        kwargs['action'] = _HTTP_METHOD_TO_AUDIT_MAP['post']
    entry = []
    for k, v in kwargs.items():
        entry.append('{0!s}="{1!s}"'.format(k, v))
    entry = ' '.join(entry)
    current_app.audit_log.info('{0!s}'.format(entry))
    return response
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def all(self):
        try:
            return getattr(get_mailman_client(), pluralize(self.endpoint))
        except AttributeError:
            raise MailmanApiError
        except MailmanConnectionError as e:
            raise MailmanApiError(e)
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def get(self, **kwargs):
        try:
            method = getattr(get_mailman_client(), 'get_' + self.endpoint)
            return method(**kwargs)
        except AttributeError as e:
            raise MailmanApiError(e)
        except HTTPError as e:
            raise
        except MailmanConnectionError as e:
            raise MailmanApiError(e)
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def __init__(self, endpoint, **kwargs):
        self.endpoint = endpoint
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def cp_audit_log(response):
    """Saves information about the request in the ``audit_log``

    :param response: Server :class:`~flask.Response`
    :return: :class:`~flask.Response`
    """
    try:
        jdata = json.loads(request.data.decode())
        if 'password' in jdata:
            jdata['password'] = '*********'
        jdata_str = json.dumps(jdata)
    except ValueError:
        jdata_str = ''

    kwargs = {
        'module': cp.name,
        'user': g.user.name,
        'email': g.user.email,
        'action': _HTTP_METHOD_TO_AUDIT_MAP[request.method.lower()],
        'data': addslashes(jdata_str),
        'url': request.url,
        'endpoint': request.endpoint,
        'ip': request.remote_addr,
        'status': response.status,
        'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
    }
    if not request.view_args and request.method.lower() == 'put':
        kwargs['action'] = _HTTP_METHOD_TO_AUDIT_MAP['post']
    entry = []
    for k, v in kwargs.items():
        entry.append('{0!s}="{1!s}"'.format(k, v))
    entry = ' '.join(entry)
    current_app.audit_log.info('{0!s}'.format(entry))
    return response
项目:flask_workshop    作者:cursodepythonoficial    | 项目源码 | 文件源码
def build_menu_item(endpoint, name):
    is_active = "active" if request.endpoint == endpoint else ""
    return Markup(
        f'<li class="nav-item {is_active}">'
        f'<a class="nav-link" href="{url_for(endpoint)}">{name}</a>'
        f'</li>'
    )
项目:flask_workshop    作者:cursodepythonoficial    | 项目源码 | 文件源码
def build_menu_item_as_filter(endpoint):
    is_active = "active" if request.endpoint == endpoint else ""
    return Markup(
        f'<li class="nav-item {is_active}">'
        f'<a class="nav-link" href="{url_for(endpoint)}">{endpoint.upper()}</a>'
        f'</li>'
    )
项目:osm-wikidata    作者:EdwardBetts    | 项目源码 | 文件源码
def url_for_other_page(page):
    args = request.view_args.copy()
    args.update(request.args)
    args['page'] = page
    return url_for(request.endpoint, **args)
项目:osm-wikidata    作者:EdwardBetts    | 项目源码 | 文件源码
def navbar():
    try:
        return dict(navbar_pages=navbar_pages,
                    active=request.endpoint)
    except RuntimeError:
        return {}  # maybe we don't care
项目:osm-wikidata    作者:EdwardBetts    | 项目源码 | 文件源码
def sort_link(order):
    args = request.view_args.copy()
    args['sort'] = order
    return url_for(request.endpoint, **args)
项目:oclubs    作者:SHSIDers    | 项目源码 | 文件源码
def url_for_other_page(page):
    args = request.view_args.copy()
    args.update(request.args)
    args['page'] = page
    return url_for(request.endpoint, **args)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def before_request():
    if current_user.is_authenticated:
        current_user.ping()
        if not current_user.confirmed \
                and request.endpoint[:5] != 'auth.' \
                and request.endpoint != 'static':
            return redirect(url_for('auth.unconfirmed'))
项目:python-ares    作者:pynog    | 项目源码 | 文件源码
def ask_login():
  if not current_user.is_anonymous:
    return

  if getattr(current_app.view_functions[request.endpoint], 'no_login', False):
    return

  return redirect(url_for('ares.aresLogin', next=request.endpoint))
项目:python-ares    作者:pynog    | 项目源码 | 文件源码
def ask_login():
  if not current_user.is_anonymous:
    return

  if getattr(current_app.view_functions[request.endpoint], 'no_login', False):
    return

  return redirect(url_for('ares.aresLogin', next=request.endpoint))
项目:Plog    作者:thundernet8    | 项目源码 | 文件源码
def app_after_request(response):
    if request.endpoint != 'static':
        return response
    response.cache_control.max_age = 15552000
    return response


# jinja_env
项目:dd-trace-py    作者:DataDog    | 项目源码 | 文件源码
def _finish_span(self, response=None, exception=None):
        """ Close and finish the active span if it exists. """
        span = getattr(g, 'flask_datadog_span', None)
        if span:
            if span.sampled:
                error = 0
                code = response.status_code if response else None
                method = request.method if request else None

                # if we didn't get a response, but we did get an exception, set
                # codes accordingly.
                if not response and exception:
                    code = 500
                    # The 3 next lines might not be strictly required, since `set_traceback`
                    # also get the exception from the sys.exc_info (and fill the error meta).
                    # Since we aren't sure it always work/for insuring no BC break, keep
                    # these lines which get overridden anyway.
                    error = 1
                    span.set_tag(errors.ERROR_TYPE, type(exception))
                    span.set_tag(errors.ERROR_MSG, exception)
                    # The provided `exception` object doesn't have a stack trace attached,
                    # so attach the stack trace with `set_traceback`.
                    span.set_traceback()

                # the endpoint that matched the request is None if an exception
                # happened so we fallback to a common resource
                resource = code if not request.endpoint else request.endpoint
                span.resource = compat.to_unicode(resource).lower()
                span.set_tag(http.URL, compat.to_unicode(request.base_url or ''))
                span.set_tag(http.STATUS_CODE, code)
                span.set_tag(http.METHOD, method)
                span.error = error
            span.finish()
            # Clear our span just in case.
            g.flask_datadog_span = None

    # Request hook methods
项目:fanclley    作者:guerbai    | 项目源码 | 文件源码
def before_request():
    if  current_user.is_authenticated\
        and not current_user.confirmed \
        and request.endpoint[:5] != 'auth.' \
        and request.endpoint != 'static':
            return redirect(url_for('auth.unconfirmed'))
项目:flask-bitmapist    作者:cuttlesoft    | 项目源码 | 文件源码
def test_index(app, bitmap, client):
    with app.test_request_context('/bitmapist/'):
        assert request.endpoint == 'bitmapist.index'
项目:flask-bitmapist    作者:cuttlesoft    | 项目源码 | 文件源码
def test_cohort(app, bitmap, client):
    with app.test_request_context('/bitmapist/cohort'):
        assert request.endpoint == 'bitmapist.cohort'
项目:fallball-connector    作者:ingrammicro    | 项目源码 | 文件源码
def allow_public_endpoints_only():
    public_endpoints = (HealthCheck.__name__.lower(),)
    if g.endpoint not in public_endpoints:
        abort(401)
项目:fallball-connector    作者:ingrammicro    | 项目源码 | 文件源码
def set_name_for_reseller(reseller_id):
    if not reseller_id:
        return None
    if g.endpoint == ApplicationList.__name__.lower():
        return generate_reseller_name()

    return get_reseller_name(reseller_id)
项目:fallball-connector    作者:ingrammicro    | 项目源码 | 文件源码
def get_reseller_info():
    reseller_id = request.headers.get('Aps-Instance-Id')
    is_new = g.endpoint == ApplicationList.__name__.lower()
    reseller_name = set_name_for_reseller(reseller_id)
    oauth = get_oauth()
    return ResellerInfo(id=reseller_id, name=reseller_name, is_new=is_new, auth=oauth)
项目:fallball-connector    作者:ingrammicro    | 项目源码 | 文件源码
def before_request():
    g.log = dict()
    g.log['out'] = list()
    g.log['request'] = log_request(request)

    g.endpoint = request.endpoint
    if request.blueprint:
        g.endpoint = g.endpoint[len(request.blueprint):].lstrip('.')

    reseller_info = get_reseller_info()
    g.reseller_name = reseller_info.name
    g.company_name = 'N/A'

    if not reseller_info.name:
        allow_public_endpoints_only()
        return

    if not check_oauth_signature(request):
        abort(401)

    g.auth = reseller_info.auth

    g.reseller = Reseller(reseller_info.name, reseller_info.id, None)
    g.reseller.refresh()

    if not g.reseller.token and not reseller_info.is_new:
        abort(403)