Python aiohttp.web 模块,Request() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用aiohttp.web.Request()

项目:picasso    作者:openstack    | 项目源码 | 文件源码
def auth_through_token(app: web.Application, handler):
    """Build a middleware that checks the ``X-Auth-Token`` request header.

    The token and the route's ``project_id`` are used to authenticate via
    ``client.Client``. If anything in that step raises, a 401 JSON error
    carrying the exception text is returned and ``handler`` is not called.
    """
    async def middleware_handler(request: web.Request):
        token = request.headers.get("X-Auth-Token")
        project = request.match_info.get('project_id')
        conf = config.Config.config_instance()
        try:
            token_auth = identity.Token(
                conf.auth_url, token=token, project_id=project)
            keystone_session = session.Session(auth=token_auth)
            keystone = client.Client(
                session=keystone_session, project_id=project)
            keystone.authenticate(token=token)
        except Exception as ex:
            reason = "Not authorized. Reason: {}".format(str(ex))
            return web.json_response(
                status=401, data={"error": {"message": reason}})
        return await handler(request)
    return middleware_handler
项目:guillotina    作者:plone    | 项目源码 | 文件源码
def get_current_request() -> IRequest:
    """
    Locate the request that is currently being handled.

    The task context is consulted first. If it yields nothing (or raises
    one of the tolerated errors), the call stack is walked outwards looking
    for a ``self.request`` attribute or a local named ``request`` that is a
    ``Request`` instance.

    Raises RequestNotFound when no frame provides one.
    """
    try:
        from_context = aiotask_context.get('request')
    except (ValueError, AttributeError, RuntimeError):
        from_context = None
    if from_context is not None:
        return from_context

    # fallback: inspect every frame, innermost first
    current = inspect.currentframe()
    while current is not None:
        scope = current.f_locals
        candidate = getattr(scope.get('self'), 'request', None)
        if candidate is not None:
            return candidate
        local_request = scope.get('request')
        if isinstance(local_request, Request):
            return local_request
        current = current.f_back
    raise RequestNotFound(RequestNotFound.__doc__)
项目:aiohttp-three-template    作者:RobertoPrevato    | 项目源码 | 文件源码
async def initialize_anonymous_session(self, request: Request):
        """
        Initializes an anonymous session for the given request.

        Declared as a coroutine because the membership provider call is awaited.
        The new session id is encrypted and queued on the request as a cookie,
        and the resulting principal and session are stored on the request.

        :param request: incoming request to a resource that is related to this logical area.
        """
        client_ip = self.get_client_ip(request)
        result = await self.membership.initialize_anonymous_session(client_ip,
                                                                    client_data=request.headers.get("User-Agent"))

        # set a flag to set a session cookie
        session = result.session
        session_cookie_name = self.config.session_cookie_name
        encryption_key = self.config.encryption_key
        session_cookie_value = AesEncryptor.encrypt(str(session.guid), encryption_key)

        request.set_session_cookie = True
        request.cookies_to_set.append(CookieToken(session_cookie_name,
                                                  session_cookie_value,
                                                  httponly=True,
                                                  secure=self.secure_cookies))
        # store user and session information in the request object
        request.user = result.principal
        request.session = session
项目:cockatiel    作者:raphaelm    | 项目源码 | 文件源码
def delete_file(request: web.Request):
    """Delete a stored file and queue the deletion for replication.

    Responds 404 when the deletion is already recorded in the deletion log
    or when the file does not exist. In the latter case the deletion is
    still propagated unless the request's User-Agent starts with
    ``cockatiel/``.

    :param request: request whose ``name`` route parameter is the file name.
    :raises web.HTTPNotFound: deletion already known, or file missing.
    """
    filename = request.match_info.get('name').strip()
    filepath = os.path.join(config.args.storage, filename)

    if filename in replication.dellog:
        # We know this already
        raise web.HTTPNotFound()

    if not os.path.exists(filepath):
        # A request without a User-Agent header is handled like any other
        # non-cockatiel client instead of failing with KeyError.
        user_agent = request.headers.get('User-Agent', '')
        if not user_agent.startswith('cockatiel/'):
            logger.debug('File {} does not exist, but we will still propagate the deletion.'.format(filename))
            replication.dellog.put(filename)
            replication.queue_operation('DELETE', filename)
        raise web.HTTPNotFound()

    os.remove(filepath)
    # TODO: Clean up now-empty directories

    logger.debug('Deleted file {}, scheduling replication.'.format(filename))
    replication.dellog.put(filename)
    replication.queue_operation('DELETE', filename)
    return web.Response()
项目:aiohttp_json_api    作者:vovanbo    | 项目源码 | 文件源码
async def get_collection(request: web.Request):
    """
    Fetch resources collection, render JSON API document and return response.

    Uses the :meth:`~aiohttp_json_api.schema.BaseSchema.query_collection`
    method of the schema to query the resources in the collection.

    Declared as a coroutine because the controller and rendering calls are
    awaited.

    :seealso: http://jsonapi.org/format/#fetching
    """
    ctx = JSONAPIContext(request)
    resources = await ctx.controller.query_collection()

    compound_documents = None
    if ctx.include and resources:
        # The second element (relationships) is not needed here.
        compound_documents, _ = \
            await get_compound_documents(resources, ctx)

    result = await render_document(resources, compound_documents, ctx)

    return jsonapi_response(result)
项目:aiohttp_json_api    作者:vovanbo    | 项目源码 | 文件源码
async def get_resource(request: web.Request):
    """
    Get single resource, render JSON API document and return response.

    Uses the :meth:`~aiohttp_json_api.schema.BaseSchema.query_resource`
    method of the schema to query the requested resource.

    Declared as a coroutine because the controller and rendering calls are
    awaited. The ``id`` route parameter is validated against the schema
    before the resource is queried.

    :seealso: http://jsonapi.org/format/#fetching-resources
    """
    ctx = JSONAPIContext(request)
    resource_id = request.match_info.get('id')
    validate_uri_resource_id(ctx.schema, resource_id)

    resource = await ctx.controller.query_resource(resource_id)

    compound_documents = None
    if ctx.include and resource:
        # The second element (relationships) is not needed here.
        compound_documents, _ = \
            await get_compound_documents(resource, ctx)

    result = await render_document(resource, compound_documents, ctx)

    return jsonapi_response(result)
项目:aiohttp_json_api    作者:vovanbo    | 项目源码 | 文件源码
def parse_request_includes(cls, request: web.Request) -> RequestIncludes:
        """
        Extract the ``include`` query parameter as relationship paths.

        The parameter is split on commas into paths and each path on dots
        into field names; every field name is passed through
        ``cls.convert_field_name``. Empty paths are skipped.

        .. code-block:: python3

            >>> from aiohttp_json_api.context import JSONAPIContext
            >>> from aiohttp.test_utils import make_mocked_request
            >>> request = make_mocked_request('GET', '/api/Post?include=author,comments.author,some-field.nested')
            >>> JSONAPIContext.parse_request_includes(request)
            (('author',), ('comments', 'author'), ('some_field', 'nested'))

        :seealso: http://jsonapi.org/format/#fetching-includes
        """
        raw_include = request.query.get('include', '')
        parsed = []
        for path in raw_include.split(','):
            if not path:
                continue
            parsed.append(tuple(map(cls.convert_field_name, path.split('.'))))
        return tuple(parsed)
项目:blog-server    作者:chehThss    | 项目源码 | 文件源码
async def file_handler(request: web.Request):
    """Dispatch a ``/file`` request to the handler registered for its HTTP method.

    Declared as a coroutine because parsing and the handler are awaited.
    The handler receives as many of ``(data, request, None)`` as it declares
    parameters. An ``InvalidRequest`` becomes a JSON body with ``status`` 1;
    a ``StreamResponse`` result is returned untouched; anything else is
    wrapped in a JSON body with ``status`` 0.
    """
    path = '/file' + request.match_info.get('path')
    data = await parse(request, global_handlers.keys())
    data['path'] = path
    handler = global_handlers[request.method]
    try:
        result = await handler(*(data, request, None)[:len(signature(handler).parameters)])
    except InvalidRequest as err:
        return web.Response(text=json.dumps({
            'status': 1,
            'data': str(err)
        }, ensure_ascii=False), status=err.status_code, content_type='application/json')
    if isinstance(result, web.StreamResponse):
        return result
    return web.Response(text=json.dumps({
        'status': 0,
        **({'data': result} if result is not None else {})
    }, ensure_ascii=False), content_type='application/json')
项目:blog-server    作者:chehThss    | 项目源码 | 文件源码
async def ajax_handler(request: web.Request):
    """Dispatch an AJAX request to the handler registered for its method and action.

    Declared as a coroutine because parsing and the handler are awaited.
    An action that is not registered for the request method yields HTTP 400.
    The handler receives as many of ``(data, request, None)`` as it declares
    parameters. An ``InvalidRequest`` becomes a JSON body with ``status`` 1;
    a ``StreamResponse`` result is returned untouched; anything else is
    wrapped in a JSON body with ``status`` 0.
    """
    action = request.match_info.get('action')
    data = await parse(request, global_handlers.keys())
    if action not in global_handlers[request.method]:
        raise web.HTTPBadRequest()
    handler = global_handlers[request.method][action]
    try:
        result = await handler(*(data, request, None)[:len(signature(handler).parameters)])
    except InvalidRequest as err:
        return web.Response(text=json.dumps({
            'status': 1,
            'data': str(err)
        }, ensure_ascii=False), status=err.status_code, content_type='application/json')
    if isinstance(result, web.StreamResponse):
        return result
    return web.Response(text=json.dumps({
        'status': 0,
        **({'data': result} if result is not None else {})
    }, ensure_ascii=False), content_type='application/json')
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
def _emit(self, record, **kwargs):
        """Emit ``record`` through the parent class.

        When the record carries a ``Request`` instance in ``record.request``,
        it is first replaced by the result of ``self._create_request_data``.
        """
        attached = getattr(record, 'request', None)
        if attached is not None and isinstance(attached, Request):
            record.request = self._create_request_data(attached)
        return super()._emit(record, **kwargs)
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
def _create_request_data(request: Request) -> Dict:
        """Summarise ``request`` as a dict of url, query string, method and headers."""
        return dict(
            url=str(request.url),
            query_string=request.query_string,
            method=request.method,
            headers=dict(request.headers),
        )
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def logging_middleware_factory(app: web.Application, handler: Any) -> Callable:
    """Create a middleware that counts and logs every request (WEBAPI)."""
    async def middleware_handler(request: web.Request) -> web.Response:
        # Account for the call first, then log it, then delegate.
        stats.inc('num_calls', 'WEBAPI')
        message = 'Received request: %s' % request
        log.msg(message, 'WEBAPI')
        response = await handler(request)
        return response

    return middleware_handler
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def basic_auth_middleware_factory(app: web.Application, handler: Any) -> Callable:
    """Authentication.

    Uses HTTP basic auth to check that requests are including the required
    username and password.

    The ``Authorization`` header must be ``Basic <base64(user:password)>``
    and match ``app['username']`` / ``app['password']``. Any other request is
    logged and rejected with ``errors.PermissionDenied``.
    """
    async def middleware_handler(request: web.Request) -> web.Response:
        ok = False
        auth_token = request.headers.get('Authorization')
        if auth_token and auth_token.startswith('Basic '):
            auth_token = auth_token[6:]
            try:
                auth_bytes = base64.b64decode(auth_token)  # type: Optional[bytes]
            except ValueError:
                # binascii.Error (malformed base64) is a ValueError subclass;
                # b64decode raises a plain ValueError for non-ASCII text.
                # Both mean "bad credentials", not a server error.
                auth_bytes = None
            if auth_bytes:
                auth_str = auth_bytes.decode('utf-8', errors='ignore')
                if ':' in auth_str:
                    # Split on the first colon only: passwords may contain ':'.
                    username, password = auth_str.split(':', 1)
                    if username == app['username'] and password == app['password']:
                        ok = True
        if not ok:
            log.msg('Unauthorized request: %s' % request, 'WEBAPI')
            raise errors.PermissionDenied('Unauthorized')
        return await handler(request)

    return middleware_handler


# noinspection PyUnusedLocal
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def error_handler_middleware_factory(app: web.Application, handler: Any) -> Callable:
    """Error handling middleware (WEBAPI).

    Runs the wrapped handler and converts the known error classes into a
    plain-text response with a matching HTTP status code. Exceptions that
    are not listed propagate unchanged.
    """
    async def middleware_handler(request: web.Request) -> web.Response:
        def failure(status: int, text: str) -> web.Response:
            # Log the failure and build the error response in one place.
            log.msg('Request returning error(%d/%s): %s' % (status, text, request), 'WEBAPI')
            return web.Response(status=status, text=text)

        # The except clauses are tried top to bottom; keep this order.
        try:
            return await handler(request)
        except errors.NotFound as e:
            return failure(404, str(e) or 'not found')
        except errors.PermissionDenied as e:
            return failure(401, str(e) or 'permission denied')
        except errors.InvalidData as e:
            return failure(400, str(e) or 'invalid data')
        except errors.WebAPIError as e:
            return failure(400, str(e) or 'api error')
        except IrisettError as e:
            return failure(400, str(e) or 'irisett error')

    return middleware_handler
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def get_request_param(request: web.Request, name: str, error_if_missing: bool = True) -> Optional[str]:
    """Look up one GET parameter of ``request`` by name.

    Returns the value when present. When absent, raises ``errors.NotFound``
    if ``error_if_missing`` is true and returns ``None`` otherwise.
    """
    query = request.rel_url.query
    if name in query:
        return query[name]
    if error_if_missing:
        raise errors.NotFound()
    return None
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def _get_request_monitor(self, request: web.Request) -> ActiveMonitor:
        """Return the active monitor named by the request's ``id`` parameter.

        Raises ``errors.NotFound`` when no such monitor is registered.
        """
        raw_id = cast(str, get_request_param(request, 'id'))
        monitor_id = require_int(raw_id)
        manager = request.app['active_monitor_manager']
        found = manager.monitors.get(monitor_id, None)
        if found:
            return found
        raise errors.NotFound()
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def _get_request_monitor_def(self, request: web.Request) -> ActiveMonitorDef:
        """Return the active monitor definition named by the request's ``id`` parameter.

        The lookup goes through the ``request`` argument's application (not
        ``self.request``), matching ``_get_request_monitor``.

        Raises ``errors.NotFound`` when no such definition is registered.
        """
        # get_request_param raises when 'id' is missing, so the value is a str here.
        monitor_def_id = require_int(cast(str, get_request_param(request, 'id')))
        monitor_def = request.app['active_monitor_manager'].monitor_defs.get(monitor_def_id, None)
        if not monitor_def:
            raise errors.NotFound()
        return monitor_def
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def logging_middleware_factory(app: web.Application, handler: Any) -> Callable:
    """Create a middleware that counts and logs every request (WEBMGMT)."""
    async def middleware_handler(request: web.Request) -> web.Response:
        section = 'WEBMGMT'
        stats.inc('num_calls', section)
        log.msg('Received request: %s' % request, section)
        result = await handler(request)
        return result

    return middleware_handler
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def basic_auth_middleware_factory(app: web.Application, handler: Any) -> Callable:
    """Authentication.

    Uses HTTP basic auth to check that requests are including the required
    username and password.

    The ``Authorization`` header must be ``Basic <base64(user:password)>``
    and match ``app['username']`` / ``app['password']``. Any other request is
    logged and rejected with ``errors.MissingLogin``.
    """
    async def middleware_handler(request: web.Request) -> web.Response:
        ok = False
        auth_token = request.headers.get('Authorization')
        if auth_token and auth_token.startswith('Basic '):
            auth_token = auth_token[6:]
            try:
                auth_bytes = base64.b64decode(auth_token)  # type: Optional[bytes]
            except ValueError:
                # binascii.Error (malformed base64) is a ValueError subclass;
                # b64decode raises a plain ValueError for non-ASCII text.
                # Both mean "bad credentials", not a server error.
                auth_bytes = None
            if auth_bytes:
                auth_str = auth_bytes.decode('utf-8', errors='ignore')
                if ':' in auth_str:
                    # Split on the first colon only: passwords may contain ':'.
                    username, password = auth_str.split(':', 1)
                    if username == app['username'] and password == app['password']:
                        ok = True
        if not ok:
            log.msg('Unauthorized request: %s' % request, 'WEBMGMT')
            raise errors.MissingLogin('Unauthorized')
        return await handler(request)

    return middleware_handler


# noinspection PyUnusedLocal
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def error_handler_middleware_factory(app: web.Application, handler: Any) -> Callable:
    """Error handling middleware (WEBMGMT).

    Runs the wrapped handler and converts the known error classes into a
    plain-text response with a matching HTTP status code. A
    ``MissingLogin`` additionally sets a ``WWW-Authenticate`` header.
    Exceptions that are not listed propagate unchanged.
    """
    async def middleware_handler(request: web.Request) -> web.Response:
        def failure(status: int, text: str, headers: dict) -> web.Response:
            # Log the failure and build the error response in one place.
            log.msg('Request returning error(%d/%s): %s' % (status, text, request), 'WEBMGMT')
            return web.Response(status=status, text=text, headers=headers)

        # The except clauses are tried top to bottom; keep this order.
        try:
            return await handler(request)
        except errors.NotFound as e:
            return failure(404, str(e) or 'not found', {})
        except errors.PermissionDenied as e:
            return failure(401, str(e) or 'permission denied', {})
        except errors.MissingLogin as e:
            challenge = {'WWW-Authenticate': 'Basic realm="Restricted"'}
            return failure(401, str(e) or 'permission denied', challenge)
        except errors.InvalidData as e:
            return failure(400, str(e) or 'invalid data', {})
        except errors.WebMgmtError as e:
            return failure(400, str(e) or 'web error', {})
        except IrisettError as e:
            return failure(400, str(e) or 'irisett error', {})

    return middleware_handler
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def __init__(self, request: web.Request) -> None:
        """Store ``request``, create the websocket response and clear all state flags."""
        # State flags and listener start out unset.
        self.running = False
        self.client_started = False
        self.listener = None  # type: Optional[event.EventListener]
        # Connection objects.
        self.request = request
        self.ws = web.WebSocketResponse()
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def run_active_monitor_view(request: web.Request) -> web.Response:
    """GET view that schedules an active monitor to run right away.

    The monitor is selected by the ``id`` route parameter; the result is a
    redirect back to that monitor's page with a notification message.
    """
    monitor_id = int(request.match_info['id'])
    monitors = request.app['active_monitor_manager'].monitors
    monitors[monitor_id].schedule_immediately()
    redirect_url = '/active_monitor/%s/?notification_msg=Monitor job scheduled' % monitor_id
    return web.HTTPFound(redirect_url)
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
async def send_active_monitor_test_notification(request: web.Request) -> web.Response:
    """GET view to send a test notification for an active monitor.

    Declared as a coroutine because the notification call is awaited. The
    monitor is selected by the ``id`` route parameter, scheduled
    immediately, notified with state ``'UNKNOWN'``, and the client is
    redirected back to the monitor's page.
    """
    monitor_id = int(request.match_info['id'])
    am_manager = request.app['active_monitor_manager']
    monitor = am_manager.monitors[monitor_id]
    monitor.schedule_immediately()
    # NOTE(review): the second argument is computed exactly as before;
    # its intended meaning is not evident from this block — confirm.
    await monitor.notify_state_change('UNKNOWN', abs(monitor.state_ts - (time.time() - monitor.state_ts)))
    return web.HTTPFound('/active_monitor/%s/?notification_msg=Notification sent' % monitor_id)
项目:python-awesome-web    作者:tianzhenyun    | 项目源码 | 文件源码
def logger_factory(app, handler):
    """Create a middleware that logs the method and path of every request.

    The inner handler is a native coroutine: ``asyncio.coroutine`` is
    deprecated and no longer exists in Python 3.11+.
    """
    async def logger(request):
        logging.info('Request: {} {}'.format(request.method, request.path))
        return await handler(request)
    return logger
项目:python-awesome-web    作者:tianzhenyun    | 项目源码 | 文件源码
def response_factory(app, handler):
    """Create a middleware that converts a handler's return value into a response.

    Conversions, tried in order:

    * ``StreamResponse`` - returned unchanged
    * ``bytes`` - octet-stream body
    * ``str`` - redirect when prefixed with ``redirect:``, otherwise HTML body
    * ``dict`` - JSON body, or a rendered template when ``__template__`` is set
    * ``int`` in [100, 600) - empty response with that status
    * ``(int, message)`` tuple - response with that status and message text
    * anything else - plain-text body of ``str(value)``

    The inner handler is a native coroutine: ``asyncio.coroutine`` is
    deprecated and no longer exists in Python 3.11+.
    """
    async def response(request):
        logging.info('Response handler...')
        r = await handler(request)
        if isinstance(r, web.StreamResponse):
            return r
        if isinstance(r, bytes):
            res = web.Response(body=r)
            res.content_type = 'application/octet-stream'
            return res
        if isinstance(r, str):
            if r.startswith('redirect:'):
                return web.HTTPFound(r[9:])
            res = web.Response(body=r.encode('utf-8'))
            res.content_type = 'text/html; charset=utf-8'
            return res
        if isinstance(r, dict):
            template = r.get('__template__')
            if template is None:
                res = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8'))
                res.content_type = 'application/json;charset=utf-8'
                return res
            else:
                r['__user__'] = request.__user__
                res = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))
                res.content_type = 'text/html;charset=utf-8'
                return res
        if isinstance(r, int) and 100 <= r < 600:
            # web.Response takes keyword arguments only.
            return web.Response(status=r)
        if isinstance(r, tuple) and len(r) == 2:
            t, m = r
            if isinstance(t, int) and 100 <= t < 600:
                return web.Response(status=t, text=str(m))
        # default: a response (not a request) carrying the stringified value
        res = web.Response(body=str(r).encode('utf-8'))
        res.content_type = 'text/plain;charset=utf-8'
        return res
    return response
项目:picasso    作者:openstack    | 项目源码 | 文件源码
def content_type_validator(app: web.Application, handler):
    """Build a middleware that only lets JSON request bodies through.

    A request with a body must declare the ``application/json`` media type.
    Parameters after ``;`` (e.g. ``charset=utf-8``), surrounding whitespace
    and letter case are ignored when comparing. Other requests with a body
    get a 400 JSON error; requests without a body are passed on unchecked.
    """
    async def middleware_handler(request: web.Request):
        if request.has_body:
            raw_content_type = request.headers.get("Content-Type") or ""
            # Compare the bare media type, not the full header value.
            media_type = raw_content_type.split(";", 1)[0].strip().lower()
            if media_type != "application/json":
                return web.json_response(
                    data={
                        "error": {
                            "message": "Invalid content type"
                        }
                    }, status=400)
        return await handler(request)
    return middleware_handler
项目:aiohttp_auth    作者:gnarlychicken    | 项目源码 | 文件源码
async def make_request(method, path, middlewares, cookies=None):
    """Build a ``web.Request`` and optionally run it through ``prepare_request``.

    Declared as a coroutine because ``prepare_request`` is awaited.

    :param method: HTTP method of the request message.
    :param path: request path of the request message.
    :param middlewares: when truthy, the request is passed with them to
        ``prepare_request`` and that result is returned.
    :param cookies: optional iterable of ``(key, value)`` pairs, each added
        as a ``Cookie`` header.
    """
    headers = CIMultiDict()
    if cookies:
        for key, value in cookies:
            headers.add('Cookie', _cookie_value(key, value))

    message = protocol.RawRequestMessage(method, path, protocol.HttpVersion11,
                                         headers, True, False)
    request = web.Request({}, message, EmptyStreamReader(), None, None, None)

    if middlewares:
        return await prepare_request(request, middlewares)

    return request
项目:covador    作者:baverman    | 项目源码 | 文件源码
def make_request(method, path, content=None, headers=None):
    """Construct a ``web.Request`` whose body stream optionally holds ``content``.

    When ``content`` is given it is fed into the stream and the stream is
    closed with EOF; ``headers`` defaults to an empty dict.
    """
    reader = streams.StreamReader()
    if content is not None:
        reader.feed_data(content)
        reader.feed_eof()

    message = Message(method, path, headers or {})
    return web.Request(message, reader, Protocol, None, None, None)
项目:covador    作者:baverman    | 项目源码 | 文件源码
def make_request(method, path, content=None, headers=None):
    """Construct a ``web.Request`` whose body stream optionally holds ``content``.

    When ``content`` is given it is fed into the stream and the stream is
    closed with EOF; ``headers`` defaults to an empty dict.
    """
    body_stream = streams.StreamReader()
    has_body = content is not None
    if has_body:
        body_stream.feed_data(content)
        body_stream.feed_eof()

    raw_message = Message(method, path, headers or {})
    return web.Request(raw_message, body_stream, Protocol, None, None, None)
项目:covador    作者:baverman    | 项目源码 | 文件源码
def make_request(method, path, content=None, headers=None):
    """Construct a ``web.Request`` whose body stream optionally holds ``content``.

    When ``content`` is given it is fed into the stream and the stream is
    closed with EOF; ``headers`` defaults to an empty dict.
    """
    payload = streams.StreamReader()
    if content is not None:
        payload.feed_data(content)
        payload.feed_eof()

    msg = Message(method, path, headers or {})
    request = web.Request(msg, payload, Protocol, None, None, None)
    return request
项目:covador    作者:baverman    | 项目源码 | 文件源码
def make_request(method, path, content=None, headers=None):
    """Construct a ``web.Request`` whose body stream optionally holds ``content``.

    When ``content`` is given it is fed into the stream and the stream is
    closed with EOF; ``headers`` defaults to an empty dict.
    """
    stream = streams.StreamReader()
    if content is not None:
        stream.feed_data(content)
        stream.feed_eof()

    request_message = Message(method, path, headers or {})
    return web.Request(request_message, stream, Protocol, None, None, None)
项目:aiohttp-three-template    作者:RobertoPrevato    | 项目源码 | 文件源码
async def _authenticate_user(self, request : Request):
        """
        If the area features membership, it invokes the methods of the underlying membership provider to authenticate
        the user, supporting anonymous authentication.

        Declared as a coroutine because the membership provider calls are awaited.
        ``request.user`` is reset to None first. An anonymous session is
        initialized when the session cookie is missing, cannot be decrypted,
        or does not lead to a successful login.

        :param request: request to authenticate.
        :return: this instance.
        """
        request.user = None
        encryption_key = self.config.encryption_key
        membership = self.membership
        set_anonymous_session = False

        if self.membership:
            # does the request contains the session cookie for this area?
            session_cookie_name = self.config.session_cookie_name
            session_key = request.cookies.get(session_cookie_name)
            if session_key:
                # try to load the session
                # decrypt the session key
                success, session_guid = AesEncryptor.try_decrypt(session_key, encryption_key)
                if success:
                    # try to perform login by session key
                    success, result = await membership.try_login_by_session_key(session_guid)
                    if success:
                        # result is a principal object
                        request.user = result.principal
                        request.session = result.session
                    else:
                        # the login by session cookie failed: the session could be expired
                        set_anonymous_session = True
                else:
                    # session key decryption failed
                    set_anonymous_session = True
            else:
                # the request does not contain a session cookie for this area
                set_anonymous_session = True

        if set_anonymous_session:
            # initialize an anonymous session
            await self.initialize_anonymous_session(request)
        return self
项目:aiohttp-three-template    作者:RobertoPrevato    | 项目源码 | 文件源码
def _get_culture_for_request(self, request):
        """
        Pick the culture for a request.

        Sources are tried in this order: the ``culture`` route parameter
        (GET only), the culture of a non-anonymous user, the
        ``X-Request-Culture`` header (POST only), and the ``culture`` cookie.
        Returns None when none of them yields a supported culture.

        Raises InvalidCultureException when a GET request names an
        unsupported culture in its route.
        """
        method = request.method
        supported = self._is_supported_culture

        if method == "GET":
            url_culture = request.match_info.get("culture")
            if url_culture:
                if supported(url_culture):
                    return url_culture
                # the given culture is not supported; the user could have changed a url by hand
                # raise an exception to redirect to a proper url
                raise InvalidCultureException()

        principal = request.user
        if principal and not principal.anonymous and supported(principal.culture):
            return principal.culture

        if method == "POST":
            # check custom headers
            header_culture = request.headers.get("X-Request-Culture")
            if supported(header_culture):
                return header_culture

        cookie_culture = request.cookies.get("culture")
        return cookie_culture if supported(cookie_culture) else None
项目:the-knights-who-say-ni    作者:python    | 项目源码 | 文件源码
def handler(create_client: Callable[[], aiohttp.ClientSession], server: ni_abc.ServerHost,
            cla_records: ni_abc.CLAHost) -> Callable[[web.Request], Awaitable[web.Response]]:
    """Return a request-handling closure bound to the given client factory and hosts."""
    async def respond(request: web.Request) -> web.Response:
        """Process one webhook request and answer with an HTTP status.

        A ``ResponseExit`` supplies the response directly; any other
        exception is logged through the server and becomes a 500.
        """
        async with create_client() as client:
            try:
                contribution = await ContribHost.process(server, request, client)
                usernames = await contribution.usernames()
                server.log(f"Usernames: {usernames!s}")
                # Trusted users are excluded from the CLA check.
                usernames_to_check = usernames - server.trusted_users()
                cla_status = await cla_records.check(client, usernames_to_check)
                server.log(f"CLA status: {cla_status!s}")
                # With a work queue, one could make the updating of the
                # contribution a work item and return an HTTP 202 response.
                await contribution.update(cla_status)
                ok_status = http.HTTPStatus.OK
                return web.Response(status=ok_status)
            except ni_abc.ResponseExit as exc:
                return exc.response
            except Exception as exc:
                server.log_exception(exc)
                failure_status = http.HTTPStatus.INTERNAL_SERVER_ERROR
                return web.Response(status=failure_status)

    return respond
项目:the-knights-who-say-ni    作者:python    | 项目源码 | 文件源码
async def process(cls, server: ni_abc.ServerHost,
                        request: web.Request, client: aiohttp.ClientSession) -> "Host":
        """Process the pull request.

        Declared as a coroutine because the request body read and the delay
        are awaited. Builds an instance for ``opened``, ``synchronize`` and
        (for labels starting with ``LABEL_PREFIX``) ``unlabeled`` actions.

        Raises ni_abc.ResponseExit for pings and for actions or labels that
        need no processing, and TypeError for unexpected events or actions.
        """
        event = sansio.Event.from_http(request.headers,
                                       await request.read(),
                                       secret=server.contrib_secret())
        if event.event == "ping":
            # A ping event; nothing to do.
            # https://developer.github.com/webhooks/#ping-event
            raise ni_abc.ResponseExit(status=http.HTTPStatus.OK)
        elif event.event != "pull_request":
            # Only happens if GitHub is misconfigured to send the wrong events.
            raise TypeError(f"don't know how to handle a {event.event!r} event")
        elif event.data['action'] not in cls._useful_actions:
            raise ni_abc.ResponseExit(status=http.HTTPStatus.NO_CONTENT)
        elif event.data['action'] in {PullRequestEvent.opened.value, PullRequestEvent.synchronize.value}:
            if event.data['action'] == PullRequestEvent.opened.value:
                # GitHub is eventually consistent, so add a delay to wait for
                # the API to digest the new pull request.
                await asyncio.sleep(1)
            return cls(server, client, PullRequestEvent(event.data['action']),
                       event.data)
        elif event.data['action'] == PullRequestEvent.unlabeled.value:
            label = event.data['label']['name']
            if not label.startswith(LABEL_PREFIX):
                raise ni_abc.ResponseExit(status=http.HTTPStatus.NO_CONTENT)
            return cls(server, client, PullRequestEvent.unlabeled, event.data)
        else:  # pragma: no cover
            # Should never happen.
            raise TypeError(f"don't know how to handle a {event.data['action']!r} action")
项目:the-knights-who-say-ni    作者:python    | 项目源码 | 文件源码
async def process(cls, server: ServerHost,
                        request: web.Request,
                        client: aiohttp.ClientSession) -> "ContribHost":
        """Process a request into a contribution.

        This base implementation always raises ResponseExit with status
        NOT_IMPLEMENTED. It is a coroutine so that it can be awaited.
        """
        # This method exists because __init__() cannot be a coroutine.
        raise ResponseExit(status=http.HTTPStatus.NOT_IMPLEMENTED)  # pragma: no cover
项目:cockatiel    作者:raphaelm    | 项目源码 | 文件源码
def get_file(request: web.Request):
    """Serve a stored file named by the ``name`` route parameter.

    HEAD requests get a header-only response; other requests stream the
    file content chunk by chunk. This is a generator-based coroutine
    (it uses ``yield from``).

    Raises web.HTTPNotFound when the file does not exist and
    web.HTTPNotModified when the request carries an ``If-None-Match`` header.
    """
    filename = request.match_info.get('name').strip()
    filepath = os.path.join(config.args.storage, filename)
    _, ext = os.path.splitext(filepath)
    # The ETag is derived from the file name, not from the file content.
    etag = hashlib.sha1(filename.encode('utf-8')).hexdigest()

    if not os.path.exists(filepath):
        raise web.HTTPNotFound()

    # Any If-None-Match header yields 304; its value is not compared.
    if 'If-None-Match' in request.headers:
        raise web.HTTPNotModified(headers={
            'ETag': etag
        })

    stat = os.stat(filepath)

    if request.method == 'HEAD':
        resp = web.Response()
    else:
        resp = web.StreamResponse()

    # Unknown extensions fall back to a generic binary content type.
    resp.headers['Content-Type'] = mimetypes.types_map.get(ext, 'application/octet-stream')
    resp.headers['ETag'] = etag
    resp.headers['Cache-Control'] = 'max-age=31536000'
    resp.headers['X-Content-SHA1'] = get_hash_from_name(filename)
    resp.content_length = stat.st_size
    resp.last_modified = stat.st_mtime

    if request.method == 'HEAD':
        return resp

    # Send headers, then write the file chunk by chunk, draining after each write.
    yield from resp.prepare(request)
    with open(filepath, 'rb') as f:
        for chunk in chunks(f):
            resp.write(chunk)
            yield from resp.drain()

    yield from resp.write_eof()
    resp.force_close()
    return resp
项目:cockatiel    作者:raphaelm    | 项目源码 | 文件源码
def status(request: web.Request):
    """Return a JSON response listing the replication queue length of every node."""
    queues = {}
    for node in replication.get_nodes():
        queue = replication.get_queue_for_node(node)
        queues[node] = {'length': len(queue)}

    payload = json.dumps({'queues': queues})
    response_headers = {'Content-Type': 'application/json'}
    return web.Response(text=payload, headers=response_headers)
项目:chromewhip    作者:chuckus    | 项目源码 | 文件源码
async def render_html(request: web.Request):
    """Respond with the prettified HTML of the tab obtained from ``_go(request)``.

    Declared as a coroutine because the navigation and the HTML fetch are
    awaited.
    """
    # https://splash.readthedocs.io/en/stable/api.html#render-html
    tab = await _go(request)
    return web.Response(text=BS((await tab.html()).decode()).prettify())
项目:tomodachi    作者:kalaspuff    | 项目源码 | 文件源码
async def example(self, request: web.Request) -> str:
        """Wait one second, then return a fixed string.

        Declared as a coroutine because the sleep is awaited.
        """
        await asyncio.sleep(1)
        return '??'  # tomodachi
项目:tomodachi    作者:kalaspuff    | 项目源码 | 文件源码
def example_with_id(self, request: web.Request, id: str) -> str:
        """Return a fixed string that embeds the given ``id``."""
        template = '?? (id: {})'
        return template.format(id)
项目:tomodachi    作者:kalaspuff    | 项目源码 | 文件源码
def response_object(self, request: web.Request) -> Response:
        """Return a 200 ``Response`` with a small JSON body and JSON content type."""
        return Response(
            status=200,
            content_type='application/json',
            body='{"data": true}',
        )
项目:tomodachi    作者:kalaspuff    | 项目源码 | 文件源码
def error_404(self, request: web.Request) -> str:
        """Return the fixed text used for the 404 case."""
        message = 'error 404'
        return message
项目:tomodachi    作者:kalaspuff    | 项目源码 | 文件源码
async def static_request_handler(cls: Any, obj: Any, context: Dict, func: Any, path: str, base_url: str) -> Any:
        """Register a GET route that serves files from ``path`` under ``base_url``.

        Declared as a coroutine because the server start function is awaited.
        ``base_url`` is turned into a regex with a ``filename`` group (one is
        appended when ``base_url`` does not declare it). A relative ``path``
        is resolved against the directory of the service file found in
        ``context``. The route is appended to ``context['_http_routes']``.

        :return: the awaited result of ``cls.start_server(obj, context)``, or
            None when that call returns something falsy.
        """
        if '?P<filename>' not in base_url:
            pattern = r'^{}(?P<filename>.+?)$'.format(re.sub(r'\$$', '', re.sub(r'^\^?(.*)$', r'\1', base_url)))
        else:
            pattern = r'^{}$'.format(re.sub(r'\$$', '', re.sub(r'^\^?(.*)$', r'\1', base_url)))
        compiled_pattern = re.compile(pattern)

        if path.startswith('/'):
            path = os.path.dirname(path)
        else:
            path = '{}/{}'.format(os.path.dirname(context.get('context', {}).get('_service_file_path')), path)

        if not path.endswith('/'):
            path = '{}/'.format(path)

        async def handler(request: web.Request) -> web.Response:
            result = compiled_pattern.match(request.path)
            filename = result.groupdict()['filename']
            # NOTE(review): filename comes straight from the URL and '..'
            # segments are not rejected here — confirm they are filtered upstream.
            filepath = '{}{}'.format(path, filename)

            try:
                if os.path.isdir(filepath) or not os.path.exists(filepath):
                    raise web.HTTPNotFound()

                # Probe readability so an unreadable file surfaces as
                # PermissionError here; close the handle right away.
                with pathlib.Path(filepath).open('r'):
                    pass
                return FileResponse(filepath)
            except PermissionError:
                raise web.HTTPForbidden()

        context['_http_routes'] = context.get('_http_routes', [])
        context['_http_routes'].append(('GET', pattern, handler))

        start_func = cls.start_server(obj, context)
        return (await start_func) if start_func else None
项目:tomodachi    作者:kalaspuff    | 项目源码 | 文件源码
def test(self, request: web.Request) -> str:
        """Return the fixed string ``'test'``."""
        return 'test'
项目:tomodachi    作者:kalaspuff    | 项目源码 | 文件源码
def test_with_id(self, request: web.Request, id: str) -> str:
        """Return ``'test '`` followed by the given ``id``."""
        template = 'test {}'
        return template.format(id)
项目:tomodachi    作者:kalaspuff    | 项目源码 | 文件源码
async def test_slow(self, request: web.Request) -> str:
        """Wait two seconds, flag the instance as having served a slow request, return ``'test'``.

        Declared as a coroutine because the sleep is awaited.
        """
        await asyncio.sleep(2.0)
        self.slow_request = True
        return 'test'
项目:tomodachi    作者:kalaspuff    | 项目源码 | 文件源码
def test_dict(self, request: web.Request) -> Dict:
        """Return a response described as a dict with status, body and headers."""
        response_headers = {'X-Dict': 'test'}
        return dict(status=200, body='test dict', headers=response_headers)
项目:tomodachi    作者:kalaspuff    | 项目源码 | 文件源码
def test_tuple(self, request: web.Request) -> Tuple:
        """Return a response described as a ``(status, body, headers)`` tuple."""
        response_headers = {'X-Tuple': 'test'}
        return 200, 'test tuple', response_headers
项目:tomodachi    作者:kalaspuff    | 项目源码 | 文件源码
def test_response_object(self, request: web.Request) -> Response:
        """Return a 200 ``Response`` with a fixed body and one custom header."""
        response_headers = {'X-Tomodachi-Response': 'test'}
        return Response(status=200, headers=response_headers, body='test tomodachi response')