Python flask.request 对象,view_args 属性实例源码

我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用flask.request.view_args(注意:view_args 是请求对象的属性,而不是可调用的方法)

项目:cookiecutter-flask-restful    作者:karec    | 项目源码 | 文件源码
def paginate(query, schema):
    """Paginate ``query`` and serialize the current page with ``schema``.

    The page number and page size are read from the ``page`` and
    ``page_size`` query-string arguments of the current request. When an
    argument is missing, or cannot be converted to ``int``, the matching
    default (``DEFAULT_PAGE_NUMBER`` / ``DEFAULT_PAGE_SIZE``) is used.

    :param query: object exposing ``paginate(page=..., per_page=...)``
    :param schema: object whose ``dump(items)`` result has a ``data`` attribute
    :return: dict with the keys ``total``, ``pages``, ``next``, ``prev``
        and ``results``
    """
    # Query-string values arrive as text; ``type=int`` converts them and
    # falls back to the default when the conversion fails.
    page = request.args.get('page', DEFAULT_PAGE_NUMBER, type=int)
    per_page = request.args.get('page_size', DEFAULT_PAGE_SIZE, type=int)
    page_obj = query.paginate(page=page, per_page=per_page)

    def page_url(number):
        # The size is emitted as ``page_size`` because that is the argument
        # this function reads back; a ``per_page`` argument would be ignored
        # when the link is followed.
        return url_for(
            request.endpoint,
            page=number,
            page_size=per_page,
            **request.view_args
        )

    # On the last / first page the links point back at the current page.
    next_url = page_url(page_obj.next_num if page_obj.has_next else page_obj.page)
    prev_url = page_url(page_obj.prev_num if page_obj.has_prev else page_obj.page)

    return {
        'total': page_obj.total,
        'pages': page_obj.pages,
        'next': next_url,
        'prev': prev_url,
        'results': schema.dump(page_obj.items).data
    }
项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def __init__(self, options, func, request_context):
    """Record details of the current request together with the wrapped function.

    :param options: stored unchanged as ``self.options``
    :param func: callable whose ``__name__`` is stored as ``self.func``
    :param request_context: stored unchanged as ``self.request_context``
    """
    # Values supplied by the caller.
    self.options = options
    self.func = func.__name__
    self.request_context = request_context

    # Values read from the active request.
    self.operation = request.endpoint
    self.method = request.method
    self.args = request.args
    self.view_args = request.view_args

    self.timing = {}

    # Outcome fields all start out unset.
    self.error = self.stack_trace = None
    self.request_body = self.response_body = self.response_headers = None
    self.status_code = self.success = None
项目:osm-wikidata    作者:EdwardBetts    | 项目源码 | 文件源码
def log_exception(self, exc_info):
    """Log an error describing the current request, attaching ``exc_info``.

    :param exc_info: exception information forwarded to ``self.logger.error``
    """
    agent = request.user_agent
    details = (
        request.path,
        request.method,
        request.remote_addr,
        agent.string,
        agent.platform,
        agent.browser,
        agent.version,
        dict(request.args),
        request.view_args,
        request.url,
    )
    template = """
Path:                 %s
HTTP Method:          %s
Client IP Address:    %s
User Agent:           %s
User Platform:        %s
User Browser:         %s
User Browser Version: %s
GET args:             %s
view args:            %s
URL:                  %s
"""
    self.logger.error(template % details, exc_info=exc_info)
项目:bucket_api    作者:jokamjohn    | 项目源码 | 文件源码
def bucket_required(f):
    """
    Decorator that only calls the wrapped view when the ``bucket_id`` URL
    path parameter can be converted to an integer; otherwise a 'failed'
    response with status 401 is returned.
    :param f: view function to wrap
    :return: the wrapping function
    """

    @wraps(f)
    def guarded_view(*args, **kwargs):
        raw_bucket_id = request.view_args['bucket_id']
        try:
            int(raw_bucket_id)
        except ValueError:
            return response('failed', 'Provide a valid Bucket Id', 401)
        else:
            return f(*args, **kwargs)

    return guarded_view
项目:flask-alcohol    作者:natfoster82    | 项目源码 | 文件源码
def _auto_update(self, mapper=None):
    """Set column values on this instance according to each column's ``set_by``.

    :param mapper: object whose ``columns`` are iterated; when ``None`` the
        result of ``class_mapper`` for this instance's class is used
    """
    model = self.__class__
    columns = (class_mapper(model) if mapper is None else mapper).columns
    for column in columns:
        # this order is the listed order except that hybrid properties go first
        field = column.name
        source = model._get_api_info(field)['set_by']
        if source == 'json':
            # Only fields present in ``g.fields`` are written.
            if field in g.fields:
                self._set_field_value(field, g.fields.get(field))
        elif source == 'url':
            # Only fields present in the URL view args are written.
            if field in request.view_args:
                self._set_field_value(field, request.view_args.get(field))
        elif source == 'server':
            # important not to let it try to set it from json
            # requires a @setter decorated function
            self._set_field_value(name=field, try_auto=False)
项目:flask_restapi    作者:dracarysX    | 项目源码 | 文件源码
def get_obj(self):
    """Fetch one object using the request's query args plus the URL primary key.

    The built query builder is kept on ``self.builder``.

    :return: the object found, or ``None`` when the model raises
        ``DoesNotExist``
    """
    filters = request.args.to_dict()
    filters[self.pk_field] = request.view_args[self.pk_url_kwarg]
    self.builder = PeeweeQueryBuilder(self.model, filters)
    try:
        return self.builder.build().get()
    except self.model.DoesNotExist:
        return None
项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def to_dict(self):
    """Build a dictionary describing the recorded request and its outcome.

    Always contains ``operation``, ``func``, ``method`` and the timing
    entries. Depending on the options and recorded state it also contains
    the view args, single-valued UUID query-string args, the request
    context, and success or failure details. The three ``post_process_*``
    hooks are called with the dictionary before it is returned.

    :return: the assembled dictionary
    """
    payload = dict(
        operation=self.operation,
        func=self.func,
        method=self.method,
        **self.timing
    )

    options = self.options
    if options.include_path and self.view_args:
        payload.update(self.view_args.items())

    if options.include_query_string and self.args:
        # Keep only arguments given exactly once whose value is a UUID.
        for key, values in self.args.lists():
            if len(values) == 1 and is_uuid(values[0]):
                payload[key] = values[0]

    if self.request_context is not None:
        payload.update(self.request_context())

    # ``success`` may still be unset, in which case neither branch applies.
    outcome = self.success
    if outcome is True:
        payload.update(
            success=outcome,
            status_code=self.status_code,
        )
    elif outcome is False:
        payload.update(
            success=outcome,
            message=extract_error_message(self.error)[:2048],
            context=extract_context(self.error),
            stack_trace=self.stack_trace,
            status_code=self.status_code,
        )

    self.post_process_request_body(payload)
    self.post_process_response_body(payload)
    self.post_process_response_headers(payload)

    return payload
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def api_audit_log(response):
    """Write one ``key="value"`` line about the request to the ``audit_log``

    :param response: Server :class:`~flask.Response`
    :return: :class:`~flask.Response`
    """
    http_method = request.method.lower()
    fields = {
        'module': api.name,
        'user': current_user.name,
        'email': current_user.email,
        'action': _HTTP_METHOD_TO_AUDIT_MAP[http_method],
        'data': addslashes(request.data.decode()),
        'url': request.url,
        'endpoint': request.endpoint,
        'ip': request.remote_addr,
        'status': response.status,
        'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
    }
    # A PUT without any view args is recorded under the 'post' action.
    if not request.view_args and http_method == 'put':
        fields['action'] = _HTTP_METHOD_TO_AUDIT_MAP['post']
    line = ' '.join(
        '{0!s}="{1!s}"'.format(key, value) for key, value in fields.items()
    )
    current_app.audit_log.info(line)
    return response
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def cp_audit_log(response):
    """Saves information about the request in the ``audit_log``

    The request body is logged as re-serialized JSON with any top-level
    ``password`` value masked. A body that is not valid JSON is logged as
    an empty string.

    :param response: Server :class:`~flask.Response`
    :return: :class:`~flask.Response`
    """
    try:
        jdata = json.loads(request.data.decode())
    except ValueError:
        # Covers both undecodable bytes and malformed JSON.
        jdata_str = ''
    else:
        # Valid JSON need not be an object: a number, string or list body
        # would make the masking below raise TypeError, which the handler
        # above does not catch. Only a dict can carry a 'password' key.
        if isinstance(jdata, dict) and 'password' in jdata:
            jdata['password'] = '*********'
        jdata_str = json.dumps(jdata)

    kwargs = {
        'module': cp.name,
        'user': g.user.name,
        'email': g.user.email,
        'action': _HTTP_METHOD_TO_AUDIT_MAP[request.method.lower()],
        'data': addslashes(jdata_str),
        'url': request.url,
        'endpoint': request.endpoint,
        'ip': request.remote_addr,
        'status': response.status,
        'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
    }
    # A PUT without any view args is recorded under the 'post' action.
    if not request.view_args and request.method.lower() == 'put':
        kwargs['action'] = _HTTP_METHOD_TO_AUDIT_MAP['post']
    entry = ' '.join(
        '{0!s}="{1!s}"'.format(k, v) for k, v in kwargs.items()
    )
    current_app.audit_log.info('{0!s}'.format(entry))
    return response
项目:klue-client-server    作者:erwan-lemonnier    | 项目源码 | 文件源码
def __init__(self, request, has_json):
    """Keep the request and shortcuts to its query args, view args and headers.

    :param request: request object to wrap
    :param has_json: when truthy, the JSON body is parsed with
        ``get_json(force=True)`` and stored as ``self._json``
    """
    self.request = request
    self.query, self.path, self.headers = (
        request.args,
        request.view_args,
        request.headers,
    )
    if has_json:
        self._json = self.request.get_json(force=True)
项目:fastapi    作者:zhangnian    | 项目源码 | 文件源码
def user_cache_key():
    """Return the user cache key built from the ``id`` URL path parameter."""
    user_id = request.view_args['id']
    return 'cache::user::{}'.format(user_id)
项目:flask-docjson    作者:elemepi    | 项目源码 | 文件源码
def validate_request(typ):
    """Validate the current request's method, view args and JSON body.

    The checks run in that order, against ``typ['methods']``,
    ``typ['route']`` and ``typ['schema']`` respectively.

    :param typ: mapping providing the ``methods``, ``route`` and ``schema``
        entries
    """
    validate_method(request.method, typ['methods'], P_REQUEST)

    route_values = request.view_args
    validate_route(route_values, typ['route'], P_REQUEST)

    # The body is only parsed once the method and route checks have run.
    payload = request.get_json()
    validate_json(payload, typ['schema'], P_REQUEST, key='')
项目:dodotable    作者:spoqa    | 项目源码 | 文件源码
def build_url(self, **kwargs):
    """Build a URL for the current URL rule with ``kwargs`` overriding values.

    Starts from a copy of the request's query args, adds the view args, then
    replaces any entry named in ``kwargs`` with the given value.

    :return: the second element of what ``request.url_rule.build`` returns
    """
    params = request.args.copy()
    params.update(request.view_args)
    # Existing entries are removed before the overrides are added --
    # presumably because ``update`` would add to, not replace, them
    # (TODO confirm against the multi-dict implementation in use).
    for name in kwargs:
        if name in params:
            params.pop(name)
    params.update(kwargs.items())
    return request.url_rule.build(params)[1]
项目:eq-survey-runner    作者:ONSdigital    | 项目源码 | 文件源码
def before_request():
    """Bind request identifiers to the logger and load the schema if possible.

    When metadata exists for the current user, its ``tx_id`` is bound to the
    logger, the schema is loaded into ``g.schema_json`` and the URL values
    are passed to ``_check_same_survey``.
    """
    metadata = get_metadata(current_user)
    if metadata:
        logger.bind(tx_id=metadata['tx_id'])

    route = request.view_args
    eq_id, form_type, collection_id = (
        route['eq_id'],
        route['form_type'],
        route['collection_id'],
    )
    logger.bind(eq_id=eq_id, form_type=form_type, ce_id=collection_id)
    logger.info('questionnaire request', method=request.method, url_path=request.full_path)

    if not metadata:
        return

    g.schema_json = load_schema_from_metadata(metadata)
    _check_same_survey(eq_id, form_type, collection_id)
项目:SuperOcto    作者:mcecchi    | 项目源码 | 文件源码
def _check_etag_and_lastmodified_for_i18n():
    """Check the ETag and last-modified date for the requested locale/domain.

    ``locale`` and ``domain`` are taken from the URL view args. A missing
    last-modified date (``None``) counts as passing that check.

    :return: ``etag_ok and lastmodified_ok``
    """
    route = request.view_args
    locale, domain = route["locale"], route["domain"]

    etag_ok = util.flask.check_etag(_compute_etag_for_i18n(locale, domain))

    lastmodified = _compute_date_for_i18n(locale, domain)
    if lastmodified is None:
        lastmodified_ok = True
    else:
        lastmodified_ok = util.flask.check_lastmodified(lastmodified)

    return etag_ok and lastmodified_ok
项目:flask-monitor    作者:fraoustin    | 项目源码 | 文件源码
def _dict(self):
    """Collect timing, Flask config, request and response details in a dict.

    :return: dict with the sections ``timing``, ``flask``, ``request`` and
        ``response``
    """
    # manage timing
    started = self.request._stats_start_event
    timing = {
        'delta': self.timing,
        'start': started,
        'asctime': asctime(gmtime(started)),
    }

    # manage flask
    # NOTE(review): SECRET_KEY is copied into the output verbatim. If this
    # dict is logged or sent to a monitoring backend the signing key leaks;
    # kept only for backward compatibility -- consider dropping or masking.
    config_keys = (
        'SECRET_KEY',
        'SERVER_NAME',
        'SESSION_COOKIE_NAME',
        'SESSION_COOKIE_DOMAIN',
        'SESSION_COOKIE_PATH',
        'SESSION_COOKIE_HTTPONLY',
        'SESSION_COOKIE_SECURE',
        'SESSION_REFRESH_EACH_REQUEST',
    )
    flask_section = {key.lower(): current_app.config[key] for key in config_keys}

    # manage request
    try:
        rule = request.url_rule.rule
    except AttributeError:
        # ``url_rule`` has no ``rule`` attribute (e.g. it is None); only that
        # case is handled instead of swallowing every exception.
        rule = ''
    request_section = {
        'url': request.url,
        'args': {arg: request.args.get(arg) for arg in request.args},
        'view_args': request.view_args,
        'path': request.path,
        'method': request.method,
        'remote_addr': request.remote_addr,
        'rule': rule,
    }

    # manage response
    response_section = {
        'status_code': self.response.status_code,
        'headers': {name: value for name, value in self.response.headers},
    }

    return {
        'timing': timing,
        'flask': flask_section,
        'request': request_section,
        'response': response_section,
    }
项目:flask-alcohol    作者:natfoster82    | 项目源码 | 文件源码
def make_proxy_method(cls, name):
    """
    Build the function handed to Flask's routing for one view method.

    An instance of ``cls`` is created, the method called ``name`` is looked
    up on it, and every entry of ``cls.__decorators__`` is applied in order.
    The returned proxy calls that (decorated) method.

    :param name: the name of the method to create a proxy for
    """

    instance = cls()
    handler = getattr(instance, name)

    for decorate in cls.__decorators__ or ():
        handler = decorate(handler)

    @functools.wraps(handler)
    def proxy(**forgettable_view_args):
        # Always use the global request object's view_args, because they
        # can be modified by intervening function before an endpoint or
        # wrapper gets called. This matches Flask's behavior.
        del forgettable_view_args

        result = handler(**request.view_args)
        if isinstance(result, Response):
            return result
        return make_response(result)

    return proxy
项目:Penny-Dreadful-Tools    作者:PennyDreadfulMTG    | 项目源码 | 文件源码
def create_issue(content, author, location='Discord', repo='PennyDreadfulMTG/Penny-Dreadful-Tools'):
    """Print an issue report and, when GitHub credentials are set, file it.

    The first line of ``content`` becomes the title and the remainder the
    body. A "Reported on ... by ..." line is appended, followed by details
    of the current request when ``request`` is truthy.

    :param content: issue text; ``None`` or ``''`` means nothing is created
    :param author: name of the reporter
    :param location: where the report came from
    :param repo: repository name passed to ``get_repo``
    :return: the created issue, or ``None`` when content is empty or the
        GitHub user/password are not configured
    """
    if content is None or content == '':
        return None

    title, newline, remainder = content.partition('\n')
    body = remainder + '\n\n' if newline else ''
    body += 'Reported on {location} by {author}'.format(location=location, author=author)

    if request:
        details = """
            --------------------------------------------------------------------------------
            Request Method: {method}
            Path: {full_path}
            Cookies: {cookies}
            Endpoint: {endpoint}
            View Args: {view_args}
            Person: {id}
            User-Agent: {user_agent}
            Referrer: {referrer}
        """.format(
            method=request.method,
            full_path=request.full_path,
            cookies=request.cookies,
            endpoint=request.endpoint,
            view_args=request.view_args,
            id=session.get('id', 'logged_out'),
            user_agent=request.headers.get('User-Agent'),
            referrer=request.referrer,
        )
        body += textwrap.dedent(details)

    print(title + '\n' + body)

    # Only check for github details at the last second to get log output even if github not configured.
    has_credentials = configuration.get('github_user') and configuration.get('github_password')
    if not has_credentials:
        return None

    client = Github(configuration.get('github_user'), configuration.get('github_password'))
    repository = client.get_repo(repo)
    return repository.create_issue(title=title, body=body)
项目:radar    作者:renalreg    | 项目源码 | 文件源码
def get_object(self):
    """Return the object whose ``id`` matches the ``id`` URL path parameter.

    The ID is validated against the type of the model's ``id`` column
    before querying.

    :return: the matching object, after ``check_object_permissions`` ran
    :raises NotFound: when the ID is not valid for the column type or no
        row matches
    """
    query = self.filter_query(self.get_query())

    model_class = self.get_model_class()
    id_type = inspect(model_class).columns['id'].type

    obj_id = request.view_args['id']

    try:
        if isinstance(id_type, Integer):
            obj_id = int(obj_id)
        elif isinstance(id_type, postgresql.UUID):
            # Check the ID is a valid UUID so don't try
            # to query with an invalid UUID.
            uuid.UUID(obj_id)
    except ValueError:
        raise NotFound()

    query = query.filter(model_class.id == obj_id)

    try:
        found = query.one()
    except NoResultFound:
        raise NotFound()

    self.check_object_permissions(found)

    return found
项目:flask-rest-jsonapi    作者:miLibris    | 项目源码 | 文件源码
def get(self, *args, **kwargs):
    """Retrieve a collection of objects

    Calls ``before_get``, fetches the collection from the data layer,
    serializes it with a schema computed for this request, adds pagination
    links and a ``meta.count`` entry, then calls ``after_get``.

    :return: the serialized result
    """
    self.before_get(args, kwargs)

    qs = QSManager(request.args, self.schema)
    objects_count, objects = self._data_layer.get_collection(qs, kwargs)

    # Work on a copy: ``get_schema_kwargs`` is an attribute shared between
    # requests, and updating it in place (here or in any callee that is
    # handed the dict) would carry one request's settings into the next.
    schema_kwargs = dict(getattr(self, 'get_schema_kwargs', None) or {})
    schema_kwargs['many'] = True

    schema = compute_schema(self.schema,
                            schema_kwargs,
                            qs,
                            qs.include)

    result = schema.dump(objects).data

    # View args are only forwarded to ``url_for`` when ``view_kwargs`` is
    # explicitly set to True on this object.
    view_kwargs = request.view_args if getattr(self, 'view_kwargs', None) is True else dict()
    add_pagination_links(result,
                         objects_count,
                         qs,
                         url_for(self.view, **view_kwargs))

    result.update({'meta': {'count': objects_count}})

    self.after_get(result)

    return result