我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用flask.request.view_args()。
def paginate(query, schema): page = request.args.get('page', DEFAULT_PAGE_NUMBER) per_page = request.args.get('page_size', DEFAULT_PAGE_SIZE) page_obj = query.paginate(page=page, per_page=per_page) next = url_for( request.endpoint, page=page_obj.next_num if page_obj.has_next else page_obj.page, per_page=per_page, **request.view_args ) prev = url_for( request.endpoint, page=page_obj.prev_num if page_obj.has_prev else page_obj.page, per_page=per_page, **request.view_args ) return { 'total': page_obj.total, 'pages': page_obj.pages, 'next': next, 'prev': prev, 'results': schema.dump(page_obj.items).data }
def __init__(self, options, func, request_context): self.options = options self.operation = request.endpoint self.func = func.__name__ self.method = request.method self.args = request.args self.view_args = request.view_args self.request_context = request_context self.timing = dict() self.error = None self.stack_trace = None self.request_body = None self.response_body = None self.response_headers = None self.status_code = None self.success = None
def log_exception(self, exc_info): self.logger.error(""" Path: %s HTTP Method: %s Client IP Address: %s User Agent: %s User Platform: %s User Browser: %s User Browser Version: %s GET args: %s view args: %s URL: %s """ % ( request.path, request.method, request.remote_addr, request.user_agent.string, request.user_agent.platform, request.user_agent.browser, request.user_agent.version, dict(request.args), request.view_args, request.url ), exc_info=exc_info)
def bucket_required(f): """ Decorator to ensure that a valid bucket id is sent in the url path parameters :param f: :return: """ @wraps(f) def decorated_function(*args, **kwargs): bucket_id_ = request.view_args['bucket_id'] try: int(bucket_id_) except ValueError: return response('failed', 'Provide a valid Bucket Id', 401) return f(*args, **kwargs) return decorated_function
def _auto_update(self, mapper=None): cls = self.__class__ if mapper is None: mapper = class_mapper(cls) for col in mapper.columns: # this order is the listed order except that hybrid properties go first api_info = cls._get_api_info(col.name) set_by = api_info['set_by'] if set_by == 'json': if col.name in g.fields: val = g.fields.get(col.name) self._set_field_value(col.name, val) elif set_by == 'url': if col.name in request.view_args: val = request.view_args.get(col.name) self._set_field_value(col.name, val) elif set_by == 'server': # important not to let it try to set it from json # requires a @setter decorated function self._set_field_value(name=col.name, try_auto=False)
def get_obj(self): args = request.args.to_dict() args.update({self.pk_field: request.view_args[self.pk_url_kwarg]}) self.builder = PeeweeQueryBuilder(self.model, args) try: obj = self.builder.build().get() except self.model.DoesNotExist: obj = None return obj
def to_dict(self): dct = dict( operation=self.operation, func=self.func, method=self.method, **self.timing ) if self.options.include_path and self.view_args: dct.update({ key: value for key, value in self.view_args.items() }) if self.options.include_query_string and self.args: dct.update({ key: values[0] for key, values in self.args.lists() if len(values) == 1 and is_uuid(values[0]) }) if self.request_context is not None: dct.update(self.request_context()) if self.success is True: dct.update( success=self.success, status_code=self.status_code, ) if self.success is False: dct.update( success=self.success, message=extract_error_message(self.error)[:2048], context=extract_context(self.error), stack_trace=self.stack_trace, status_code=self.status_code, ) self.post_process_request_body(dct) self.post_process_response_body(dct) self.post_process_response_headers(dct) return dct
def api_audit_log(response): """Saves information about the request in the ``audit_log`` :param response: Server :class:`~flask.Response` :return: :class:`~flask.Response` """ kwargs = { 'module': api.name, 'user': current_user.name, 'email': current_user.email, 'action': _HTTP_METHOD_TO_AUDIT_MAP[request.method.lower()], 'data': addslashes(request.data.decode()), 'url': request.url, 'endpoint': request.endpoint, 'ip': request.remote_addr, 'status': response.status, 'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') } if not request.view_args and request.method.lower() == 'put': kwargs['action'] = _HTTP_METHOD_TO_AUDIT_MAP['post'] entry = [] for k, v in kwargs.items(): entry.append('{0!s}="{1!s}"'.format(k, v)) entry = ' '.join(entry) current_app.audit_log.info('{0!s}'.format(entry)) return response
def cp_audit_log(response): """Saves information about the request in the ``audit_log`` :param response: Server :class:`~flask.Response` :return: :class:`~flask.Response` """ try: jdata = json.loads(request.data.decode()) if 'password' in jdata: jdata['password'] = '*********' jdata_str = json.dumps(jdata) except ValueError: jdata_str = '' kwargs = { 'module': cp.name, 'user': g.user.name, 'email': g.user.email, 'action': _HTTP_METHOD_TO_AUDIT_MAP[request.method.lower()], 'data': addslashes(jdata_str), 'url': request.url, 'endpoint': request.endpoint, 'ip': request.remote_addr, 'status': response.status, 'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') } if not request.view_args and request.method.lower() == 'put': kwargs['action'] = _HTTP_METHOD_TO_AUDIT_MAP['post'] entry = [] for k, v in kwargs.items(): entry.append('{0!s}="{1!s}"'.format(k, v)) entry = ' '.join(entry) current_app.audit_log.info('{0!s}'.format(entry)) return response
def __init__(self, request, has_json): self.request = request self.query = request.args self.path = request.view_args self.headers = request.headers if has_json: self._json = self.request.get_json(force=True)
def user_cache_key(): """?path paramaters???userid """ return 'cache::user::{}'.format(request.view_args['id'])
def validate_request(typ): validate_method(request.method, typ['methods'], P_REQUEST) validate_route(request.view_args, typ['route'], P_REQUEST) validate_json(request.get_json(), typ['schema'], P_REQUEST, key='')
def build_url(self, **kwargs): arg = request.args.copy() view_args = request.view_args arg.update(view_args) for attr in kwargs.keys(): if attr in arg: arg.pop(attr) arg.update(kwargs.items()) rule = request.url_rule result = rule.build(arg) return result[1]
def before_request(): metadata = get_metadata(current_user) if metadata: logger.bind(tx_id=metadata['tx_id']) values = request.view_args logger.bind(eq_id=values['eq_id'], form_type=values['form_type'], ce_id=values['collection_id']) logger.info('questionnaire request', method=request.method, url_path=request.full_path) if metadata: g.schema_json = load_schema_from_metadata(metadata) _check_same_survey(values['eq_id'], values['form_type'], values['collection_id'])
def _check_etag_and_lastmodified_for_i18n(): locale = request.view_args["locale"] domain = request.view_args["domain"] etag_ok = util.flask.check_etag(_compute_etag_for_i18n(request.view_args["locale"], request.view_args["domain"])) lastmodified = _compute_date_for_i18n(locale, domain) lastmodified_ok = lastmodified is None or util.flask.check_lastmodified(lastmodified) return etag_ok and lastmodified_ok
def _dict(self): mydict = {} # manage timing mydict['timing'] = {} mydict['timing']['delta'] = self.timing mydict['timing']['start'] = self.request._stats_start_event mydict['timing']['asctime'] = asctime(gmtime(self.request._stats_start_event)) # manage flask mydict['flask'] = {} mydict['flask']['secret_key'] = current_app.config['SECRET_KEY'] mydict['flask']['server_name'] = current_app.config['SERVER_NAME'] mydict['flask']['session_cookie_name'] = current_app.config['SESSION_COOKIE_NAME'] mydict['flask']['session_cookie_domain'] = current_app.config['SESSION_COOKIE_DOMAIN'] mydict['flask']['session_cookie_path'] = current_app.config['SESSION_COOKIE_PATH'] mydict['flask']['session_cookie_httponly'] = current_app.config['SESSION_COOKIE_HTTPONLY'] mydict['flask']['session_cookie_secure'] = current_app.config['SESSION_COOKIE_SECURE'] mydict['flask']['session_refresh_each_request'] = current_app.config['SESSION_REFRESH_EACH_REQUEST'] # manage request mydict['request'] = {} mydict['request']['url'] = request.url mydict['request']['args'] = {arg: request.args.get(arg) for arg in request.args} mydict['request']['view_args'] = request.view_args mydict['request']['path'] = request.path mydict['request']['method'] = request.method mydict['request']['remote_addr'] = request.remote_addr try: mydict['request']['rule'] = request.url_rule.rule except: mydict['request']['rule'] = '' #manage response mydict['response'] = {} mydict['response']['status_code'] = self.response.status_code mydict['response']['headers'] = { i:j for i,j in self.response.headers} return mydict
def make_proxy_method(cls, name): """ Creates a proxy function that can be used by Flasks routing. The proxy instantiates the FlaskView subclass and calls the appropriate method. :param name: the name of the method to create a proxy for """ i = cls() view = getattr(i, name) if cls.__decorators__: for decorator in cls.__decorators__: view = decorator(view) @functools.wraps(view) def proxy(**forgettable_view_args): # Always use the global request object's view_args, because they # can be modified by intervening function before an endpoint or # wrapper gets called. This matches Flask's behavior. del forgettable_view_args response = view(**request.view_args) if not isinstance(response, Response): response = make_response(response) return response return proxy
def create_issue(content, author, location='Discord', repo='PennyDreadfulMTG/Penny-Dreadful-Tools'): if content is None or content == '': return None body = '' if '\n' in content: title, body = content.split('\n', 1) body += '\n\n' else: title = content body += 'Reported on {location} by {author}'.format(location=location, author=author) if request: body += textwrap.dedent(""" -------------------------------------------------------------------------------- Request Method: {method} Path: {full_path} Cookies: {cookies} Endpoint: {endpoint} View Args: {view_args} Person: {id} User-Agent: {user_agent} Referrer: {referrer} """.format(method=request.method, full_path=request.full_path, cookies=request.cookies, endpoint=request.endpoint, view_args=request.view_args, id=session.get('id', 'logged_out'), user_agent=request.headers.get('User-Agent'), referrer=request.referrer)) print(title + '\n' + body) # Only check for github details at the last second to get log output even if github not configured. if not configuration.get('github_user') or not configuration.get('github_password'): return None g = Github(configuration.get('github_user'), configuration.get('github_password')) repo = g.get_repo(repo) issue = repo.create_issue(title=title, body=body) return issue
def get_object(self): query = self.get_query() query = self.filter_query(query) model_class = self.get_model_class() id_type = inspect(model_class).columns['id'].type obj_id = request.view_args['id'] if isinstance(id_type, Integer): try: obj_id = int(obj_id) except ValueError: raise NotFound() elif isinstance(id_type, postgresql.UUID): try: # Check the ID is a valid UUID so don't try # to query with an invalid UUID. uuid.UUID(obj_id) except ValueError: raise NotFound() query = query.filter(model_class.id == obj_id) try: obj = query.one() except NoResultFound: raise NotFound() self.check_object_permissions(obj) return obj
def get(self, *args, **kwargs): """Retrieve a collection of objects""" self.before_get(args, kwargs) qs = QSManager(request.args, self.schema) objects_count, objects = self._data_layer.get_collection(qs, kwargs) schema_kwargs = getattr(self, 'get_schema_kwargs', dict()) schema_kwargs.update({'many': True}) schema = compute_schema(self.schema, schema_kwargs, qs, qs.include) result = schema.dump(objects).data view_kwargs = request.view_args if getattr(self, 'view_kwargs', None) is True else dict() add_pagination_links(result, objects_count, qs, url_for(self.view, **view_kwargs)) result.update({'meta': {'count': objects_count}}) self.after_get(result) return result