我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用sanic.response.HTTPResponse()。
def add_swagger_api_route(app, target_route, swagger_json_route):
    """
    Mount the swagger UI page and its static assets on a sanic app.

    :param app: the sanic app object.
    :param target_route: the path at which the UI page is served.
    :param swagger_json_route: the path where the swagger json
        definition is expected to be.
    """
    assets_directory = get_swagger_static_root()
    # The page is rendered and encoded once here; the handler below only
    # hands out the already-built bytes.
    rendered_page = generate_swagger_html(STATIC_ROOT, swagger_json_route)
    page_bytes = rendered_page.encode("utf-8")

    async def swagger_ui(request):
        return HTTPResponse(body_bytes=page_bytes, content_type="text/html")

    static_blueprint = Blueprint('swagger')
    static_blueprint.static(STATIC_ROOT, assets_directory)

    app.add_route(swagger_ui, target_route, methods=["GET"])
    app.blueprint(static_blueprint)
def create_swagger_json_handler(app, **kwargs):
    """
    Build a request handler that serves the swagger definition of ``app``.

    This method assumes the application is using the
    TransmuteUrlDispatcher as the router.

    :param app: the sanic app whose swagger spec is published.
    :param kwargs: forwarded unchanged to ``spec.swagger_definition``.
    :return: an async handler returning the JSON-encoded definition.
    """
    swagger_spec = get_swagger_spec(app)
    _add_blueprint_specs(app, swagger_spec)
    definition = swagger_spec.swagger_definition(**kwargs)
    # Serialised once up front; each request reuses the same bytes.
    payload = json.dumps(definition).encode("UTF-8")

    async def swagger(request):
        cors_headers = {"Access-Control-Allow-Origin": "*"}
        return HTTPResponse(
            body_bytes=payload,
            headers=cors_headers,
            content_type="application/json",
        )

    return swagger
def test_middleware_response():
    """Check that request and response middleware both run for a route.

    The request middleware records the request; the response middleware
    records the request and then the response, so ``results`` ends up as
    ``[request, request, response]``.
    """
    app = Sanic('test_middleware_response')

    results = []

    # Named distinctly from the response middleware below so that the two
    # definitions no longer shadow each other.
    @app.middleware('request')
    async def process_request(request):
        results.append(request)

    @app.middleware('response')
    async def process_response(request, response):
        results.append(request)
        results.append(response)

    @app.route('/')
    async def handler(request):
        return text('OK')

    request, response = sanic_endpoint_test(app)

    assert response.text == 'OK'
    assert type(results[0]) is Request
    assert type(results[1]) is Request
    assert isinstance(results[2], HTTPResponse)
def process_preflight(self, request):
    """
    Answer a CORS preflight request (support for apollo-client).

    https://www.w3.org/TR/cors/#resource-preflight-requests

    Returns a 200 response carrying the CORS headers when the
    ``Access-Control-Request-Method`` header names a method listed in
    ``self.methods``; otherwise returns a bare 400 response.
    """
    origin = request.headers.get('Origin', '')
    requested_method = request.headers.get(
        'Access-Control-Request-Method', ''
    ).upper()

    # Missing header or a method this view does not allow: reject.
    if not requested_method or requested_method not in self.methods:
        return HTTPResponse(
            status=400,
        )

    cors_headers = {
        'Access-Control-Allow-Origin': origin,
        'Access-Control-Allow-Methods': ', '.join(self.methods),
        'Access-Control-Max-Age': str(self.max_age),
    }
    return HTTPResponse(status=200, headers=cors_headers)
def test_file_head_response(file_name, static_file_directory):
    """HEAD on a file route returns 200 with size and range headers."""
    app = Sanic('test_file_helper')

    @app.route('/files/<filename>', methods=['GET', 'HEAD'])
    async def file_route(request, filename):
        joined_path = os.path.join(static_file_directory, filename)
        file_path = os.path.abspath(unquote(joined_path))
        stats = await async_os.stat(file_path)
        headers = {
            'Accept-Ranges': 'bytes',
            'Content-Length': str(stats.st_size),
        }
        mime_type = guess_type(file_path)[0] or 'text/plain'
        if request.method != "HEAD":
            return file(file_path, headers=headers, mime_type=mime_type)
        # HEAD: headers only, no body.
        return HTTPResponse(headers=headers, content_type=mime_type)

    request, response = app.test_client.head('/files/{}'.format(file_name))
    assert response.status == 200
    assert 'Accept-Ranges' in response.headers
    assert 'Content-Length' in response.headers
    reported_length = int(response.headers['Content-Length'])
    assert reported_length == len(
        get_file_content(static_file_directory, file_name))
def test_middleware_response():
    """Request and response middleware both fire for a simple GET.

    ``seen`` is expected to hold ``[request, request, response]`` after
    the call.
    """
    app = Sanic('test_middleware_response')
    seen = []

    @app.middleware('request')
    async def record_request(request):
        seen.append(request)

    @app.middleware('response')
    async def record_request_and_response(request, response):
        seen.extend([request, response])

    @app.route('/')
    async def handler(request):
        return text('OK')

    request, response = app.test_client.get('/')

    assert response.text == 'OK'
    first, second, third = seen[0], seen[1], seen[2]
    assert type(first) is Request
    assert type(second) is Request
    assert isinstance(third, HTTPResponse)
def log_response(self, response):
    """Emit one access-log record for ``response`` if access logging is on.

    The record's ``extra`` carries ``status``, ``byte`` (body length, or
    -1 when ``response`` is not an ``HTTPResponse``), ``host`` and
    ``request``.
    """
    if not self.access_log:
        return

    if isinstance(response, HTTPResponse):
        body_size = len(response.body)
    else:
        body_size = -1

    extra = {
        'status': getattr(response, 'status', 0),
        'byte': body_size,
        'host': 'UNKNOWN',
    }

    if self.request is None:
        extra['request'] = 'nil'
    else:
        if self.request.ip:
            # request.ip is indexed as (host, port)
            extra['host'] = '{0[0]}:{0[1]}'.format(self.request.ip)
        extra['request'] = '{0} {1}'.format(self.request.method,
                                            self.request.url)

    access_logger.info('', extra=extra)
async def route_wrapper(self, route, req, context, request_args, request_kw,
                        *decorator_args, **decorator_kw):
    """Run ``route`` and apply CORS headers to whatever it returns.

    Declared ``async`` because the body awaits the handler's result; a
    plain ``def`` containing ``await`` does not compile.

    :param route: the wrapped route handler.
    :param req: the incoming request.
    :param context: plugin context; ``context.app`` and
        ``context.request`` are read here.
    :param request_args: positional arguments forwarded to ``route``.
    :param request_kw: keyword arguments forwarded to ``route``.
    :param decorator_kw: decorator keyword options; must contain
        ``with_context`` (raises ``KeyError`` otherwise), the remainder is
        used as the CORS options.
    :return: the response, or ``None`` if the handler produced none.
    """
    # 'with_context' is removed so it is not passed on as a CORS option.
    decorator_kw.pop('with_context')
    options = get_cors_options(context.app, decorator_kw)

    if options.get('automatic_options') and req.method == 'OPTIONS':
        # Answer OPTIONS automatically without calling the handler.
        resp = response.HTTPResponse()
    else:
        resp = route(req, *request_args, **request_kw)
        if resp is not None:
            # The handler may be sync or async; unwrap until a concrete value.
            while isawaitable(resp):
                resp = await resp

    if resp is not None:
        request_context = context.request[id(req)]
        set_cors_headers(req, resp, context, options)
        # Mark this request as already CORS-evaluated.
        request_context[SANIC_CORS_EVALUATED] = "1"
    return resp
def _make_cors_request_middleware_function(plugin, debug):
    """Create and register the request middleware answering CORS OPTIONS.

    :param plugin: object whose ``middleware`` decorator registers the
        function.
    :param debug: callable used for debug messages.
    :return: the registered middleware function.
    """
    @plugin.middleware(relative="pre", attach_to='request', with_context=True)
    def cors_request_middleware(req, context):
        if req.method != 'OPTIONS':
            return None

        try:
            path = req.path
        except AttributeError:
            path = req.url

        for res_regex, res_options in context.resources:
            if not res_options.get('automatic_options'):
                continue
            if not try_match(path, res_regex):
                continue
            # First matching resource wins: answer the OPTIONS request here.
            debug("Request to '{:s}' matches CORS resource '{}'. "
                  "Using options: {}".format(
                      path, get_regexp_pattern(res_regex), res_options))
            resp = response.HTTPResponse()
            set_cors_headers(req, resp, context, res_options)
            return resp

        # Reached only when no resource matched.
        debug('No CORS rule matches')

    return cors_request_middleware
def setUp(self):
    """Create the app with four routes decorated by ``cross_origin``."""
    app = Sanic(__name__)
    self.app = app

    @app.route('/', methods=['GET', 'HEAD', 'OPTIONS'])
    @cross_origin(app)
    def wildcard(request):
        return text('Welcome!')

    @app.route('/test_consistent_origin',
               methods=['GET', 'HEAD', 'OPTIONS'])
    @cross_origin(app, origins='http://foo.com')
    def test_consistent(request):
        return text('Welcome!')

    @app.route('/test_vary', methods=['GET', 'HEAD', 'OPTIONS'])
    @cross_origin(app, origins=["http://foo.com", "http://bar.com"])
    def test_vary(request):
        return text('Welcome!')

    @app.route('/test_existing_vary_headers')
    @cross_origin(app, origins=["http://foo.com", "http://bar.com"])
    def test_existing_vary_headers(request):
        # The response already carries a Vary header of its own.
        preset_headers = CIDict({'Vary': 'Accept-Encoding'})
        return HTTPResponse('', status=200, headers=preset_headers)
def render_template(template_name, request, context, *,
                    app_key=APP_KEY, encoding='utf-8',
                    headers=None, status=200):
    """
    Render ``template_name`` with ``context`` and wrap it in an HTTPResponse.

    The response's content type is ``text/html`` with the given charset.

    :param template_name: template name.
    :param request: a parameter from web-handler,
                    sanic.request.Request instance.
    :param context: context for rendering; ``None`` is treated as ``{}``.
    :param app_key: key passed through to ``render_string``.
    :param encoding: charset named in the content type,
                     'utf-8' by default.
    :param headers: headers passed through to the response.
    :param status: HTTP status code for returned response,
                   200 (OK) by default.
    """
    effective_context = {} if context is None else context
    rendered = render_string(template_name, request, effective_context,
                             app_key=app_key)
    return HTTPResponse(
        rendered,
        status=status,
        headers=headers,
        content_type="text/html; charset={encoding}".format(encoding=encoding),
    )
def create_handler(transmute_func, context):
    """Wrap ``transmute_func`` in an async sanic request handler.

    The handler extracts parameters, awaits the raw function, and turns
    either the result or the captured exception into an ``HTTPResponse``
    via ``transmute_func.process_result``.
    """
    @wraps(transmute_func.raw_func)
    async def handler(request, *args, **kwargs):
        result = None
        exc = None
        try:
            args, kwargs = await extract_params(request, context,
                                                transmute_func)
            result = await transmute_func.raw_func(*args, **kwargs)
        except SanicException as sanic_error:
            # Falls back to 400 when the exception has no status code.
            code = sanic_error.status_code or 400
            exc = APIException(message=str(sanic_error), code=code)
        except Exception as error:
            # Passed to process_result below instead of propagating.
            exc = error

        content_type = request.headers.get("Content-Type",
                                           DEFAULT_HTTP_CONTENT_TYPE)
        outcome = transmute_func.process_result(
            context, result, exc, content_type
        )
        return HTTPResponse(
            status=outcome["code"],
            content_type=outcome["content-type"],
            headers=outcome["headers"],
            body_bytes=outcome["body"],
        )

    handler.transmute_func = transmute_func
    return handler
def test_response_body_not_a_string():
    """Test when a response body sent from the application is not a string"""
    app = Sanic('response_body_not_a_string')
    body_value = choice(range(1000))

    @app.route('/hello')
    async def hello_route(request):
        # Deliberately an int, not str/bytes.
        return HTTPResponse(body=body_value)

    request, response = sanic_endpoint_test(app, uri='/hello')
    expected_text = str(body_value)
    assert response.text == expected_text
def test_with_middleware_response():
    """Check that middleware runs around a class-based view.

    The request middleware records the request; the response middleware
    records the request and then the response, so ``results`` ends up as
    ``[request, request, response]``.
    """
    app = Sanic('test_with_middleware_response')

    results = []

    # Named distinctly from the response middleware below so that the two
    # definitions no longer shadow each other.
    @app.middleware('request')
    async def process_request(request):
        results.append(request)

    @app.middleware('response')
    async def process_response(request, response):
        results.append(request)
        results.append(response)

    class DummyView(HTTPMethodView):

        def get(self, request):
            return text('I am get method')

    app.add_route(DummyView(), '/')

    request, response = sanic_endpoint_test(app)

    assert response.text == 'I am get method'
    assert type(results[0]) is Request
    assert type(results[1]) is Request
    assert isinstance(results[2], HTTPResponse)
def make_response(status, headers, payload):
    """Build the response object used by this async mode.

    :param status: status line; its first whitespace-separated token is
        converted to the integer status code.
    :param headers: iterable of indexable ``(name, value)`` items.
    :param payload: passed to the response as ``body_bytes``.
    """
    content_type = None
    other_headers = {}
    for header in headers:
        name = header[0]
        # Content-Type is passed separately instead of via the headers dict.
        if name.lower() == 'content-type':
            content_type = header[1]
        else:
            other_headers[name] = header[1]

    status_code = int(status.split()[0])
    return HTTPResponse(body_bytes=payload,
                        content_type=content_type,
                        status=status_code,
                        headers=other_headers)
def test_response_body_not_a_string():
    """Test when a response body sent from the application is not a string"""
    app = Sanic('response_body_not_a_string')
    body_value = choice(range(1000))

    @app.route('/hello')
    async def hello_route(request):
        # Deliberately an int, not str/bytes.
        return HTTPResponse(body=body_value)

    request, response = app.test_client.get('/hello')
    expected_text = str(body_value)
    assert response.text == expected_text
def test_file_stream_head_response(file_name, static_file_directory):
    """HEAD on a streaming file route is not chunked and reports a size."""
    app = Sanic('test_file_helper')

    @app.route('/files/<filename>', methods=['GET', 'HEAD'])
    async def file_route(request, filename):
        joined_path = os.path.join(static_file_directory, filename)
        file_path = os.path.abspath(unquote(joined_path))
        headers = {'Accept-Ranges': 'bytes'}
        mime_type = guess_type(file_path)[0] or 'text/plain'

        if request.method != "HEAD":
            return file_stream(file_path, chunk_size=32, headers=headers,
                               mime_type=mime_type)

        # Return a normal HTTPResponse, not a
        # StreamingHTTPResponse for a HEAD request
        stats = await async_os.stat(file_path)
        headers['Content-Length'] = str(stats.st_size)
        return HTTPResponse(headers=headers, content_type=mime_type)

    request, response = app.test_client.head('/files/{}'.format(file_name))
    assert response.status == 200
    # A HEAD request should never be streamed/chunked.
    if 'Transfer-Encoding' in response.headers:
        assert response.headers['Transfer-Encoding'] != "chunked"
    assert 'Accept-Ranges' in response.headers
    # A HEAD request should get the Content-Length too
    assert 'Content-Length' in response.headers
    reported_length = int(response.headers['Content-Length'])
    assert reported_length == len(
        get_file_content(static_file_directory, file_name))
def test_with_middleware_response():
    """Request and response middleware both fire around a class-based view.

    ``seen`` is expected to hold ``[request, request, response]`` after
    the call.
    """
    app = Sanic('test_with_middleware_response')
    seen = []

    @app.middleware('request')
    async def record_request(request):
        seen.append(request)

    @app.middleware('response')
    async def record_request_and_response(request, response):
        seen.extend([request, response])

    class DummyView(HTTPMethodView):

        def get(self, request):
            return text('I am get method')

    app.add_route(DummyView.as_view(), '/')

    request, response = app.test_client.get('/')

    assert response.text == 'I am get method'
    first, second, third = seen[0], seen[1], seen[2]
    assert type(first) is Request
    assert type(second) is Request
    assert isinstance(third, HTTPResponse)
def test_dont_register_system_signals():
    """Test if sanic don't register system signals"""
    app = Sanic('test_register_system_signals')

    @app.route('/hello')
    async def hello_route(request):
        return HTTPResponse()

    # Registered in the same order as before: (event name, callback).
    listeners = (
        ('after_server_start', stop),
        ('before_server_start', set_loop),
        ('after_server_stop', after),
    )
    for event_name, callback in listeners:
        app.listener(event_name)(callback)

    app.run(HOST, PORT, register_sys_signals=False)
    assert calledq.get() == False
def write_response(self, response):
    """
    Writes response content synchronously to the transport.

    Afterwards the transport is either closed or, when keep-alive is in
    effect, the keep-alive timeout is scheduled and ``cleanup`` runs.
    """
    timeout_handler = self._response_timeout_handler
    if timeout_handler:
        timeout_handler.cancel()
        self._response_timeout_handler = None

    try:
        keep_alive = self.keep_alive
        self.transport.write(
            response.output(
                self.request.version, keep_alive,
                self.keep_alive_timeout))
        self.log_response(response)
    except AttributeError:
        # Raised when `response` lacks the attributes used above.
        logger.error('Invalid response object for url %s, '
                     'Expected Type: HTTPResponse, Actual Type: %s',
                     self.url, type(response))
        self.write_error(ServerError('Invalid response type'))
    except RuntimeError:
        if self._debug:
            logger.error('Connection lost before response written @ %s',
                         self.request.ip)
        # Forces the close branch in `finally`.
        keep_alive = False
    except Exception as e:
        self.bail_out(
            "Writing response failed, connection closed {}".format(
                repr(e)))
    finally:
        if keep_alive:
            self._keep_alive_timeout_handler = self.loop.call_later(
                self.keep_alive_timeout,
                self.keep_alive_timeout_callback)
            self._last_response_time = current_time
            self.cleanup()
        else:
            self.transport.close()
            self.transport = None
async def stream_response(self, response):
    """
    Streams a response to the client asynchronously. Attaches
    the transport to the response so the response consumer can
    write to the response as needed.

    Declared ``async`` because the body awaits ``response.stream``; a
    plain ``def`` containing ``await`` does not compile.

    Afterwards the transport is either closed or, when keep-alive is in
    effect, the keep-alive timeout is scheduled and ``cleanup`` runs.
    """
    if self._response_timeout_handler:
        self._response_timeout_handler.cancel()
        self._response_timeout_handler = None

    try:
        keep_alive = self.keep_alive
        response.transport = self.transport
        await response.stream(
            self.request.version, keep_alive, self.keep_alive_timeout)
        self.log_response(response)
    except AttributeError:
        # Raised when `response` lacks the attributes used above.
        logger.error('Invalid response object for url %s, '
                     'Expected Type: HTTPResponse, Actual Type: %s',
                     self.url, type(response))
        self.write_error(ServerError('Invalid response type'))
    except RuntimeError:
        if self._debug:
            logger.error('Connection lost before response written @ %s',
                         self.request.ip)
        # Forces the close branch in `finally`.
        keep_alive = False
    except Exception as e:
        self.bail_out(
            "Writing response failed, connection closed {}".format(
                repr(e)))
    finally:
        if not keep_alive:
            self.transport.close()
            self.transport = None
        else:
            self._keep_alive_timeout_handler = self.loop.call_later(
                self.keep_alive_timeout,
                self.keep_alive_timeout_callback)
            self._last_response_time = current_time
            self.cleanup()
def setUp(self):
    """Create a CORS-enabled app whose index returns a plain-dict header."""
    app = Sanic(__name__)
    self.app = app
    CORS(app)

    @app.route('/', methods=['GET', 'HEAD', 'OPTIONS'])
    def index(request):
        custom_headers = {"custom": "dictionary"}
        return HTTPResponse(body='Welcome', headers=custom_headers)
def setUp(self):
    """Create an app whose route assigns 'set-cookie' twice on a CIDict."""
    app = Sanic(__name__)
    self.app = app

    @app.route('/test_multiple_set_cookie_headers')
    @cross_origin(app)
    def test_multiple_set_cookie_headers(request):
        resp = HTTPResponse(body="Foo bar baz")
        resp.headers = CIDict()
        # Same key assigned twice, in this order.
        for cookie_value in ('foo', 'bar'):
            resp.headers['set-cookie'] = cookie_value
        return resp
def handle_resource(request: Request, resource: str, sub: str) -> HTTPResponse:
    """
    Serve data from :class:`elizabeth.Generic` over the REST API.

    Args:
        request: Sanic's request instance; its ``lang`` query argument
            selects the language (default ``settings.SC_DEFAULT_LANGUAGE``)
        resource: first part of the url, name of elizabeth's resource
        sub: subname of the elizabeth's resource

    Returns:
        HTTPResponse: Sanic's response with json body,
        like: `{'data': 'some string'}`

    Raises:
        NotFound: when ``resource`` or ``sub`` is not an existing attribute.
    """
    language = request.args.get('lang', settings.SC_DEFAULT_LANGUAGE)
    generic = Generic(language)

    try:
        provider = getattr(getattr(generic, resource), sub)
    except AttributeError:
        # One of the two attribute lookups failed: answer with a 404.
        raise NotFound('This resource does not exist')

    return response.json({'data': provider()})
def template(template_name, *, app_key=APP_KEY, encoding='utf-8',
             headers=None, status=200):
    """
    Decorate web-handler to convert returned dict context into
    an HTTPResponse filled with the template_name template.

    A handler that already returns an ``HTTPResponse`` is passed through
    untouched. Both sync and async handlers are supported.

    :param template_name: template name.
    :param app_key: a optional key for application instance. If not provided,
                    default value will be used.
    :param encoding: response encoding, 'utf-8' by default.
    :param headers: optional headers for the rendered response; a fresh copy
                    is handed to every response.
    :param status: HTTP status code for returned response, 200 (OK) by default.
    """
    from inspect import isawaitable

    def wrapper(func):
        @functools.wraps(func)
        async def wrapped(*args, **kwargs):
            # Call the handler directly and await only if needed; this
            # replaces asyncio.coroutine(), which was removed in Python 3.11.
            context = func(*args, **kwargs)
            if isawaitable(context):
                context = await context

            if isinstance(context, HTTPResponse):
                return context

            # For class-based views args[0] is the view instance and the
            # request comes second.
            if isinstance(args[0], HTTPMethodView):
                request = args[1]
            else:
                request = args[0]

            # Copy so that one response cannot mutate the headers that the
            # decorator shares across all requests.
            response_headers = None if headers is None else dict(headers)

            # headers and status were previously accepted but never forwarded.
            return render_template(template_name, request, context,
                                   app_key=app_key, encoding=encoding,
                                   headers=response_headers, status=status)
        return wrapped
    return wrapper
async def dispatch_request(self, request, *args, **kwargs):
    """Handle one GraphQL HTTP request and return the response.

    Declared ``async`` because the body awaits the execution results and
    the GraphiQL renderer; a plain ``def`` containing ``await`` does not
    compile.

    OPTIONS requests are answered by ``process_preflight``. Any
    ``HttpQueryError`` raised along the way is returned as a JSON error
    response with the error's status code and headers.
    """
    try:
        request_method = request.method.lower()
        data = self.parse_body(request)

        show_graphiql = request_method == 'get' and self.should_display_graphiql(request)
        catch = show_graphiql

        pretty = self.pretty or show_graphiql or request.args.get('pretty')

        if request_method == 'options':
            return self.process_preflight(request)

        execution_results, all_params = run_http_query(
            self.schema,
            request_method,
            data,
            query_data=request.args,
            batch_enabled=self.batch,
            catch=catch,

            # Execute options
            return_promise=self._enable_async,
            root_value=self.get_root_value(request),
            context_value=self.get_context(request),
            middleware=self.get_middleware(request),
            executor=self.get_executor(request),
        )
        awaited_execution_results = await Promise.all(execution_results)
        result, status_code = encode_execution_results(
            awaited_execution_results,
            is_batch=isinstance(data, list),
            format_error=self.format_error,
            encode=partial(self.encode, pretty=pretty)
        )

        if show_graphiql:
            return await self.render_graphiql(
                params=all_params[0],
                result=result
            )

        return HTTPResponse(
            result,
            status=status_code,
            content_type='application/json'
        )

    except HttpQueryError as e:
        return HTTPResponse(
            self.encode({
                'errors': [default_format_error(e)]
            }),
            status=e.status_code,
            headers=e.headers,
            content_type='application/json'
        )

# noinspection PyBroadException
def response_filter(view):
    """Decorate ``view`` so its return value is normalized into JSON.

    The wrapper looks up the response schemas registered in ``filters``
    for the request's ``(endpoint, method)`` pair (HEAD is treated as
    GET). ``HTTPResponse`` results and endpoints without a registered
    filter are returned unchanged.

    Raises ``ServerError`` when the resolved status has no schema or when
    normalization reports errors.
    """
    @wraps(view)
    async def wrapper(*args, **kwargs):
        from inspect import isawaitable

        request = args[1]
        resp = view(*args, **kwargs)
        if isawaitable(resp):
            resp = await resp
        if isinstance(resp, HTTPResponse):
            return resp

        endpoint = _path_to_endpoint(request.uri_template)
        method = request.method
        if method == 'HEAD':
            method = 'GET'
        # Named so as not to shadow the builtin `filter`.
        status_schemas = filters.get((endpoint, method), None)
        if not status_schemas:
            return resp

        headers = None
        status = None
        if isinstance(resp, tuple):
            resp, status, headers = unpack(resp)

        if len(status_schemas) == 1:
            # Only one status is defined: use it regardless of the view's.
            status = next(iter(status_schemas))

        schemas = status_schemas.get(status)
        if not schemas:
            # %s rather than %d: status may still be None here, and %d
            # would raise TypeError instead of the intended ServerError.
            raise ServerError('undefined',
                              message='`%s` is not a defined status code.'
                              % status)

        resp, errors = normalize(schemas['schema'], resp)
        if schemas['headers']:
            headers, header_errors = normalize(
                {'properties': schemas['headers']}, headers)
            errors.extend(header_errors)
        if errors:
            error_code = errors[0].get('name', 'ExpectationFailed')
            raise ServerError(error_code, errors=errors)

        return response.json(
            resp,
            status=status,
            headers=headers,
        )
    return wrapper
def response_filter(view):
    """Decorate ``view`` so its return value is normalized into JSON.

    The wrapper looks up the response schemas registered in ``filters``
    for the request's ``(endpoint, method)`` pair (HEAD is treated as
    GET). ``HTTPResponse`` results and endpoints without a registered
    filter are returned unchanged.

    Raises ``ServerError`` (status 500) when the resolved status has no
    schema or when normalization reports errors.
    """
    @wraps(view)
    async def wrapper(*args, **kwargs):
        from inspect import isawaitable

        request = args[1]
        resp = view(*args, **kwargs)
        if isawaitable(resp):
            resp = await resp
        if isinstance(resp, HTTPResponse):
            return resp

        endpoint = _path_to_endpoint(request.uri_template)
        method = request.method
        if method == 'HEAD':
            method = 'GET'
        # Named so as not to shadow the builtin `filter`.
        status_schemas = filters.get((endpoint, method), None)
        if not status_schemas:
            return resp

        headers = None
        status = None
        if isinstance(resp, tuple):
            resp, status, headers = unpack(resp)

        if len(status_schemas) == 1:
            # Only one status is defined: use it regardless of the view's.
            status = next(iter(status_schemas))

        schemas = status_schemas.get(status)
        if not schemas:
            # %s rather than %d: status may still be None here, and %d
            # would raise TypeError instead of the intended ServerError.
            raise ServerError(
                '`%s` is not a defined status code.' % status, 500)

        resp, errors = normalize(schemas['schema'], resp)
        if schemas['headers']:
            headers, header_errors = normalize(
                {'properties': schemas['headers']}, headers)
            errors.extend(header_errors)
        if errors:
            raise ServerError('Expectation Failed', 500)

        return response.json(
            resp,
            status=status,
            headers=headers,
        )
    return wrapper