我们从Python开源项目中,提取了以下3个代码示例,用于说明如何使用aiohttp.web.View()。
def test_atomic_from_view(test_client): app = web.Application() class MyView(web.View): @atomic async def get(self): return web.Response() app.router.add_route("*", "/", MyView) aiojobs_setup(app) client = await test_client(app) resp = await client.get('/') assert resp.status == 200 scheduler = get_scheduler_from_app(app) assert scheduler.active_count == 0 assert scheduler.pending_count == 0
def __call__(self, url, method: [Iterable, str] = 'GET'): def _(obj): from .view import BaseView if iscoroutinefunction(obj): assert method, "Must give at least one method to http handler `%s`" % obj.__name__ if type(method) == str: methods = (method,) else: methods = list(method) self.funcs.append((url, methods, obj)) elif isinstance(obj, type): if issubclass(obj, WSHandler): self.websockets.append((url, obj())) elif issubclass(obj, web.View): self.aiohttp_views.append((url, obj)) elif issubclass(obj, BaseView): self.views.append((url, obj)) else: raise BaseException('Invalid type for router: %r' % type(obj).__name__) else: raise BaseException('Invalid type for router: %r' % type(obj).__name__) return obj return _
def atomic(coro): @wraps(coro) async def wrapper(request): if isinstance(request, View): # Class Based View decorated. request = request.request job = await spawn(request, coro(request)) return await job.wait() return wrapper