Python sanic.response 模块,HTTPResponse() 实例源码

我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用sanic.response.HTTPResponse()

项目:sanic-transmute    作者:yunstanford    | 项目源码 | 文件源码
def add_swagger_api_route(app, target_route, swagger_json_route):
    """
    mount a swagger statics page.
    app: the sanic app object
    target_route: the path to mount the statics page.
    swagger_json_route: the path where the swagger json definitions is
                        expected to be.
    """
    static_root = get_swagger_static_root()
    swagger_body = generate_swagger_html(
        STATIC_ROOT, swagger_json_route
    ).encode("utf-8")

    async def swagger_ui(request):
        return HTTPResponse(body_bytes=swagger_body, content_type="text/html")

    bp = Blueprint('swagger')
    bp.static(STATIC_ROOT, static_root)

    app.add_route(swagger_ui, target_route, methods=["GET"])
    app.blueprint(bp)
项目:sanic-transmute    作者:yunstanford    | 项目源码 | 文件源码
def create_swagger_json_handler(app, **kwargs):
    """
    Create a handler that returns the swagger definition
    for an application.
    This method assumes the application is using the
    TransmuteUrlDispatcher as the router.
    """

    spec = get_swagger_spec(app)
    _add_blueprint_specs(app, spec)
    spec_dict = spec.swagger_definition(**kwargs)
    encoded_spec = json.dumps(spec_dict).encode("UTF-8")

    async def swagger(request):
        return HTTPResponse(
            body_bytes=encoded_spec,
            headers={
                "Access-Control-Allow-Origin": "*"
            },
            content_type="application/json",
        )

    return swagger
项目:annotated-py-sanic    作者:hhstore    | 项目源码 | 文件源码
def test_middleware_response():
    app = Sanic('test_middleware_response')

    results = []

    @app.middleware('request')
    async def process_response(request):
        results.append(request)

    @app.middleware('response')
    async def process_response(request, response):
        results.append(request)
        results.append(response)

    @app.route('/')
    async def handler(request):
        return text('OK')

    request, response = sanic_endpoint_test(app)

    assert response.text == 'OK'
    assert type(results[0]) is Request
    assert type(results[1]) is Request
    assert issubclass(type(results[2]), HTTPResponse)
项目:sanic-python-web-server    作者:kimjoseph95    | 项目源码 | 文件源码
def test_middleware_response():
    app = Sanic('test_middleware_response')

    results = []

    @app.middleware('request')
    async def process_response(request):
        results.append(request)

    @app.middleware('response')
    async def process_response(request, response):
        results.append(request)
        results.append(response)

    @app.route('/')
    async def handler(request):
        return text('OK')

    request, response = sanic_endpoint_test(app)

    assert response.text == 'OK'
    assert type(results[0]) is Request
    assert type(results[1]) is Request
    assert issubclass(type(results[2]), HTTPResponse)
项目:sanic-graphql    作者:graphql-python    | 项目源码 | 文件源码
def process_preflight(self, request):
        """ Preflight request support for apollo-client
        https://www.w3.org/TR/cors/#resource-preflight-requests """
        origin = request.headers.get('Origin', '')
        method = request.headers.get('Access-Control-Request-Method', '').upper()

        if method and method in self.methods:
            return HTTPResponse(
                status=200,
                headers={
                    'Access-Control-Allow-Origin': origin,
                    'Access-Control-Allow-Methods': ', '.join(self.methods),
                    'Access-Control-Max-Age': str(self.max_age),
                }
            )
        else:
            return HTTPResponse(
                status=400,
            )
项目:sanic    作者:channelcat    | 项目源码 | 文件源码
def test_file_head_response(file_name, static_file_directory):
    app = Sanic('test_file_helper')
    @app.route('/files/<filename>', methods=['GET', 'HEAD'])
    async def file_route(request, filename):
        file_path = os.path.join(static_file_directory, filename)
        file_path = os.path.abspath(unquote(file_path))
        stats = await async_os.stat(file_path)
        headers = dict()
        headers['Accept-Ranges'] = 'bytes'
        headers['Content-Length'] = str(stats.st_size)
        if request.method == "HEAD":
            return HTTPResponse(
                headers=headers,
                content_type=guess_type(file_path)[0] or 'text/plain')
        else:
            return file(file_path, headers=headers,
                        mime_type=guess_type(file_path)[0] or 'text/plain')

    request, response = app.test_client.head('/files/{}'.format(file_name))
    assert response.status == 200
    assert 'Accept-Ranges' in response.headers
    assert 'Content-Length' in response.headers
    assert int(response.headers[
               'Content-Length']) == len(
                   get_file_content(static_file_directory, file_name))
项目:sanic    作者:channelcat    | 项目源码 | 文件源码
def test_middleware_response():
    app = Sanic('test_middleware_response')

    results = []

    @app.middleware('request')
    async def process_response(request):
        results.append(request)

    @app.middleware('response')
    async def process_response(request, response):
        results.append(request)
        results.append(response)

    @app.route('/')
    async def handler(request):
        return text('OK')

    request, response = app.test_client.get('/')

    assert response.text == 'OK'
    assert type(results[0]) is Request
    assert type(results[1]) is Request
    assert isinstance(results[2], HTTPResponse)
项目:sanic    作者:channelcat    | 项目源码 | 文件源码
def log_response(self, response):
        if self.access_log:
            extra = {
                'status': getattr(response, 'status', 0),
            }

            if isinstance(response, HTTPResponse):
                extra['byte'] = len(response.body)
            else:
                extra['byte'] = -1

            extra['host'] = 'UNKNOWN'
            if self.request is not None:
                if self.request.ip:
                    extra['host'] = '{0[0]}:{0[1]}'.format(self.request.ip)

                extra['request'] = '{0} {1}'.format(self.request.method,
                                                    self.request.url)
            else:
                extra['request'] = 'nil'

            access_logger.info('', extra=extra)
项目:sanic-cors    作者:ashleysommer    | 项目源码 | 文件源码
def route_wrapper(self, route, req, context, request_args, request_kw,
                            *decorator_args, **decorator_kw):
        _ = decorator_kw.pop('with_context')  # ignore this.
        _options = decorator_kw
        options = get_cors_options(context.app, _options)
        if options.get('automatic_options') and req.method == 'OPTIONS':
            resp = response.HTTPResponse()
        else:
            resp = route(req, *request_args, **request_kw)
            if resp is not None:
                while isawaitable(resp):
                    resp = await resp
        if resp is not None:
            request_context = context.request[id(req)]
            set_cors_headers(req, resp, context, options)
            request_context[SANIC_CORS_EVALUATED] = "1"
        return resp
项目:sanic-cors    作者:ashleysommer    | 项目源码 | 文件源码
def _make_cors_request_middleware_function(plugin, debug):
    @plugin.middleware(relative="pre", attach_to='request', with_context=True)
    def cors_request_middleware(req, context):
        if req.method == 'OPTIONS':
            try:
                path = req.path
            except AttributeError:
                path = req.url
            resources = context.resources
            for res_regex, res_options in resources:
                if res_options.get('automatic_options') and try_match(path, res_regex):
                    debug("Request to '{:s}' matches CORS resource '{}'. "
                          "Using options: {}".format(
                           path, get_regexp_pattern(res_regex), res_options))
                    resp = response.HTTPResponse()
                    set_cors_headers(req, resp, context, res_options)
                    return resp
            else:
                debug('No CORS rule matches')
    return cors_request_middleware
项目:sanic-cors    作者:ashleysommer    | 项目源码 | 文件源码
def setUp(self):
        self.app = Sanic(__name__)

        @self.app.route('/', methods=['GET', 'HEAD', 'OPTIONS'])
        @cross_origin(self.app)
        def wildcard(request):
            return text('Welcome!')

        @self.app.route('/test_consistent_origin', methods=['GET', 'HEAD', 'OPTIONS'])
        @cross_origin(self.app, origins='http://foo.com')
        def test_consistent(request):
            return text('Welcome!')

        @self.app.route('/test_vary', methods=['GET', 'HEAD', 'OPTIONS'])
        @cross_origin(self.app, origins=["http://foo.com", "http://bar.com"])
        def test_vary(request):
            return text('Welcome!')

        @self.app.route('/test_existing_vary_headers')
        @cross_origin(self.app, origins=["http://foo.com", "http://bar.com"])
        def test_existing_vary_headers(request):
            return HTTPResponse('', status=200, headers=CIDict({'Vary': 'Accept-Encoding'}))
项目:jinja2-sanic    作者:yunstanford    | 项目源码 | 文件源码
def render_template(template_name, request, context, *,
                    app_key=APP_KEY, encoding='utf-8',
                    headers=None, status=200):
    """
    Return sanic.response.Response which contains template template_name filled with context.
    Returned response has Content-Type header set to 'text/html'.

    :param template_name: template name.
    :param request: a parameter from web-handler, sanic.request.Request instance.
    :param context: context for rendering.
    :param encoding: response encoding, 'utf-8' by default.
    :param status: HTTP status code for returned response, 200 (OK) by default.
    :param app_key: a optional key for application instance. If not provided,
                    default value will be used.
    """
    if context is None:
        context = {}

    text = render_string(template_name, request, context, app_key=app_key)
    content_type = "text/html; charset={encoding}".format(encoding=encoding)

    return HTTPResponse(
        text, status=status, headers=headers,
        content_type=content_type
    )
项目:sanic-transmute    作者:yunstanford    | 项目源码 | 文件源码
def create_handler(transmute_func, context):

    @wraps(transmute_func.raw_func)
    async def handler(request, *args, **kwargs):
        exc, result = None, None
        try:
            args, kwargs = await extract_params(request, context,
                                                transmute_func)
            result = await transmute_func.raw_func(*args, **kwargs)
        except SanicException as se:
            code = se.status_code or 400
            exc = APIException(message=str(se), code=code)
        except Exception as e:
            exc = e
        content_type = request.headers.get("Content-Type", DEFAULT_HTTP_CONTENT_TYPE)
        response = transmute_func.process_result(
            context, result, exc, content_type
        )
        return HTTPResponse(
            status=response["code"],
            content_type=response["content-type"],
            headers=response["headers"],
            body_bytes=response["body"],
        )
    handler.transmute_func = transmute_func
    return handler
项目:annotated-py-sanic    作者:hhstore    | 项目源码 | 文件源码
def test_response_body_not_a_string():
    """Test when a response body sent from the application is not a string"""
    app = Sanic('response_body_not_a_string')
    random_num = choice(range(1000))

    @app.route('/hello')
    async def hello_route(request):
        return HTTPResponse(body=random_num)

    request, response = sanic_endpoint_test(app, uri='/hello')
    assert response.text == str(random_num)
项目:annotated-py-sanic    作者:hhstore    | 项目源码 | 文件源码
def test_with_middleware_response():
    app = Sanic('test_with_middleware_response')

    results = []

    @app.middleware('request')
    async def process_response(request):
        results.append(request)

    @app.middleware('response')
    async def process_response(request, response):
        results.append(request)
        results.append(response)

    class DummyView(HTTPMethodView):

        def get(self, request):
            return text('I am get method')

    app.add_route(DummyView(), '/')

    request, response = sanic_endpoint_test(app)

    assert response.text == 'I am get method'
    assert type(results[0]) is Request
    assert type(results[1]) is Request
    assert issubclass(type(results[2]), HTTPResponse)
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def make_response(status, headers, payload):
    """This function generates an appropriate response object for this async
    mode.
    """
    headers_dict = {}
    content_type = None
    for h in headers:
        if h[0].lower() == 'content-type':
            content_type = h[1]
        else:
            headers_dict[h[0]] = h[1]
    return HTTPResponse(body_bytes=payload, content_type=content_type,
                        status=int(status.split()[0]), headers=headers_dict)
项目:sanic    作者:channelcat    | 项目源码 | 文件源码
def test_response_body_not_a_string():
    """Test when a response body sent from the application is not a string"""
    app = Sanic('response_body_not_a_string')
    random_num = choice(range(1000))

    @app.route('/hello')
    async def hello_route(request):
        return HTTPResponse(body=random_num)

    request, response = app.test_client.get('/hello')
    assert response.text == str(random_num)
项目:sanic    作者:channelcat    | 项目源码 | 文件源码
def test_file_stream_head_response(file_name, static_file_directory):
    app = Sanic('test_file_helper')
    @app.route('/files/<filename>', methods=['GET', 'HEAD'])
    async def file_route(request, filename):
        file_path = os.path.join(static_file_directory, filename)
        file_path = os.path.abspath(unquote(file_path))
        headers = dict()
        headers['Accept-Ranges'] = 'bytes'
        if request.method == "HEAD":
            # Return a normal HTTPResponse, not a
            # StreamingHTTPResponse for a HEAD request
            stats = await async_os.stat(file_path)
            headers['Content-Length'] = str(stats.st_size)
            return HTTPResponse(
                headers=headers,
                content_type=guess_type(file_path)[0] or 'text/plain')
        else:
            return file_stream(file_path, chunk_size=32, headers=headers,
                        mime_type=guess_type(file_path)[0] or 'text/plain')

    request, response = app.test_client.head('/files/{}'.format(file_name))
    assert response.status == 200
    # A HEAD request should never be streamed/chunked.
    if 'Transfer-Encoding' in response.headers:
        assert response.headers['Transfer-Encoding'] != "chunked"
    assert 'Accept-Ranges' in response.headers
    # A HEAD request should get the Content-Length too
    assert 'Content-Length' in response.headers
    assert int(response.headers[
               'Content-Length']) == len(
                   get_file_content(static_file_directory, file_name))
项目:sanic    作者:channelcat    | 项目源码 | 文件源码
def test_with_middleware_response():
    app = Sanic('test_with_middleware_response')

    results = []

    @app.middleware('request')
    async def process_response(request):
        results.append(request)

    @app.middleware('response')
    async def process_response(request, response):
        results.append(request)
        results.append(response)

    class DummyView(HTTPMethodView):

        def get(self, request):
            return text('I am get method')

    app.add_route(DummyView.as_view(), '/')

    request, response = app.test_client.get('/')

    assert response.text == 'I am get method'
    assert type(results[0]) is Request
    assert type(results[1]) is Request
    assert isinstance(results[2], HTTPResponse)
项目:sanic    作者:channelcat    | 项目源码 | 文件源码
def test_dont_register_system_signals():
    """Test if sanic don't register system signals"""
    app = Sanic('test_register_system_signals')

    @app.route('/hello')
    async def hello_route(request):
        return HTTPResponse()

    app.listener('after_server_start')(stop)
    app.listener('before_server_start')(set_loop)
    app.listener('after_server_stop')(after)

    app.run(HOST, PORT, register_sys_signals=False)
    assert calledq.get() == False
项目:sanic    作者:channelcat    | 项目源码 | 文件源码
def write_response(self, response):
        """
        Writes response content synchronously to the transport.
        """
        if self._response_timeout_handler:
            self._response_timeout_handler.cancel()
            self._response_timeout_handler = None
        try:
            keep_alive = self.keep_alive
            self.transport.write(
                response.output(
                    self.request.version, keep_alive,
                    self.keep_alive_timeout))
            self.log_response(response)
        except AttributeError:
            logger.error('Invalid response object for url %s, '
                         'Expected Type: HTTPResponse, Actual Type: %s',
                         self.url, type(response))
            self.write_error(ServerError('Invalid response type'))
        except RuntimeError:
            if self._debug:
                logger.error('Connection lost before response written @ %s',
                             self.request.ip)
            keep_alive = False
        except Exception as e:
            self.bail_out(
                "Writing response failed, connection closed {}".format(
                    repr(e)))
        finally:
            if not keep_alive:
                self.transport.close()
                self.transport = None
            else:
                self._keep_alive_timeout_handler = self.loop.call_later(
                    self.keep_alive_timeout,
                    self.keep_alive_timeout_callback)
                self._last_response_time = current_time
                self.cleanup()
项目:sanic    作者:channelcat    | 项目源码 | 文件源码
def stream_response(self, response):
        """
        Streams a response to the client asynchronously. Attaches
        the transport to the response so the response consumer can
        write to the response as needed.
        """
        if self._response_timeout_handler:
            self._response_timeout_handler.cancel()
            self._response_timeout_handler = None
        try:
            keep_alive = self.keep_alive
            response.transport = self.transport
            await response.stream(
                self.request.version, keep_alive, self.keep_alive_timeout)
            self.log_response(response)
        except AttributeError:
            logger.error('Invalid response object for url %s, '
                         'Expected Type: HTTPResponse, Actual Type: %s',
                         self.url, type(response))
            self.write_error(ServerError('Invalid response type'))
        except RuntimeError:
            if self._debug:
                logger.error('Connection lost before response written @ %s',
                             self.request.ip)
            keep_alive = False
        except Exception as e:
            self.bail_out(
                "Writing response failed, connection closed {}".format(
                    repr(e)))
        finally:
            if not keep_alive:
                self.transport.close()
                self.transport = None
            else:
                self._keep_alive_timeout_handler = self.loop.call_later(
                    self.keep_alive_timeout,
                    self.keep_alive_timeout_callback)
                self._last_response_time = current_time
                self.cleanup()
项目:sanic-cors    作者:ashleysommer    | 项目源码 | 文件源码
def setUp(self):
        self.app = Sanic(__name__)
        CORS(self.app)

        @self.app.route('/', methods=['GET', 'HEAD', 'OPTIONS'])
        def index(request):
            return HTTPResponse(body='Welcome', headers={"custom": "dictionary"})
项目:sanic-cors    作者:ashleysommer    | 项目源码 | 文件源码
def setUp(self):
        self.app = Sanic(__name__)

        @self.app.route('/test_multiple_set_cookie_headers')
        @cross_origin(self.app)
        def test_multiple_set_cookie_headers(request):
            resp = HTTPResponse(body="Foo bar baz")
            resp.headers = CIDict()
            resp.headers['set-cookie'] = 'foo'
            resp.headers['set-cookie'] = 'bar'
            return resp
项目:elizabeth-cloud    作者:wemake-services    | 项目源码 | 文件源码
def handle_resource(request: Request,
                          resource: str, sub: str) -> HTTPResponse:
    """
    This route is a wrapper of :class:`elizabeth.Generic`.
    It is used to serve different data over the REST API.

    Args:
        request: Sanic's request instance
        resource: first part of the url,
            name of elizabeth's resource
        sub: subname of the elizabeth's resource

    Returns:
        HTTPResponse: Sanic's response with json body,
            like: `{'data': 'some string'}`
    """
    language = request.args.get('lang', settings.SC_DEFAULT_LANGUAGE)

    g = Generic(language)

    try:
        obj = getattr(g, resource)
        obj = getattr(obj, sub)

    except AttributeError:
        # This means that one of the attributes
        # was not found, raise 404:
        raise NotFound('This resource does not exist')

    return response.json({'data': obj()})
项目:jinja2-sanic    作者:yunstanford    | 项目源码 | 文件源码
def template(template_name, *, app_key=APP_KEY, encoding='utf-8',
             headers=None, status=200):
    """
    Decorate web-handler to convert returned dict context into sanic.response.Response
    filled with template_name template.

    :param template_name: template name.
    :param request: a parameter from web-handler, sanic.request.Request instance.
    :param context: context for rendering.
    :param encoding: response encoding, 'utf-8' by default.
    :param status: HTTP status code for returned response, 200 (OK) by default.
    :param app_key: a optional key for application instance. If not provided,
                    default value will be used.
    """
    def wrapper(func):
        @functools.wraps(func)
        async def wrapped(*args, **kwargs):

            if asyncio.iscoroutinefunction(func):
                coro = func
            else:
                coro = asyncio.coroutine(func)

            context = await coro(*args, **kwargs)

            if isinstance(context, HTTPResponse):
                return context

            if isinstance(args[0], HTTPMethodView):
                request = args[1]
            else:
                request = args[0]

            return render_template(template_name, request, context,
                                   app_key=app_key, encoding=encoding)
        return wrapped

    return wrapper
项目:sanic-graphql    作者:graphql-python    | 项目源码 | 文件源码
def dispatch_request(self, request, *args, **kwargs):
        try:
            request_method = request.method.lower()
            data = self.parse_body(request)

            show_graphiql = request_method == 'get' and self.should_display_graphiql(request)
            catch = show_graphiql

            pretty = self.pretty or show_graphiql or request.args.get('pretty')

            if request_method != 'options':
                execution_results, all_params = run_http_query(
                    self.schema,
                    request_method,
                    data,
                    query_data=request.args,
                    batch_enabled=self.batch,
                    catch=catch,

                    # Execute options
                    return_promise=self._enable_async,
                    root_value=self.get_root_value(request),
                    context_value=self.get_context(request),
                    middleware=self.get_middleware(request),
                    executor=self.get_executor(request),
                )
                awaited_execution_results = await Promise.all(execution_results)
                result, status_code = encode_execution_results(
                    awaited_execution_results,
                    is_batch=isinstance(data, list),
                    format_error=self.format_error,
                    encode=partial(self.encode, pretty=pretty)
                )

                if show_graphiql:
                    return await self.render_graphiql(
                        params=all_params[0],
                        result=result
                    )

                return HTTPResponse(
                    result,
                    status=status_code,
                    content_type='application/json'
                )

            else:
                return self.process_preflight(request)

        except HttpQueryError as e:
            return HTTPResponse(
                self.encode({
                    'errors': [default_format_error(e)]
                }),
                status=e.status_code,
                headers=e.headers,
                content_type='application/json'
            )

    # noinspection PyBroadException
项目:Metis    作者:gusibi    | 项目源码 | 文件源码
def response_filter(view):

    @wraps(view)
    async def wrapper(*args, **kwargs):
        request = args[1]
        resp = view(*args, **kwargs)

        from inspect import isawaitable
        if isawaitable(resp):
            resp = await resp
        if isinstance(resp, HTTPResponse):
            return resp

        endpoint = _path_to_endpoint(request.uri_template)
        method = request.method
        if method == 'HEAD':
            method = 'GET'
        filter = filters.get((endpoint, method), None)
        if not filter:
            return resp

        headers = None
        status = None
        if isinstance(resp, tuple):
            resp, status, headers = unpack(resp)

        if len(filter) == 1:
            if six.PY3:
                status = list(filter.keys())[0]
            else:
                status = filter.keys()[0]

        schemas = filter.get(status)
        if not schemas:
            # return resp, status, headers
            raise ServerError('undefined', message='`%d` is not a defined status code.' % status)

        resp, errors = normalize(schemas['schema'], resp)
        if schemas['headers']:
            headers, header_errors = normalize(
                {'properties': schemas['headers']}, headers)
            errors.extend(header_errors)
        if errors:
            error_code = errors[0].get('name', 'ExpectationFailed')
            raise ServerError(error_code, errors=errors)

        return response.json(
            resp,
            status=status,
            headers=headers,
        )

    return wrapper
项目:Metis    作者:gusibi    | 项目源码 | 文件源码
def response_filter(view):

    @wraps(view)
    async def wrapper(*args, **kwargs):
        request = args[1]
        resp = view(*args, **kwargs)

        from inspect import isawaitable
        if isawaitable(resp):
            resp = await resp
        if isinstance(resp, HTTPResponse):
            return resp

        endpoint = _path_to_endpoint(request.uri_template)
        method = request.method
        if method == 'HEAD':
            method = 'GET'
        filter = filters.get((endpoint, method), None)
        if not filter:
            return resp

        headers = None
        status = None
        if isinstance(resp, tuple):
            resp, status, headers = unpack(resp)

        if len(filter) == 1:
            if six.PY3:
                status = list(filter.keys())[0]
            else:
                status = filter.keys()[0]

        schemas = filter.get(status)
        if not schemas:
            # return resp, status, headers
            raise ServerError('`%d` is not a defined status code.' % status, 500)

        resp, errors = normalize(schemas['schema'], resp)
        if schemas['headers']:
            headers, header_errors = normalize(
                {'properties': schemas['headers']}, headers)
            errors.extend(header_errors)
        if errors:
            raise ServerError('Expectation Failed', 500)

        return response.json(
            resp,
            status=status,
            headers=headers,
        )

    return wrapper