Python aiohttp 模块,web() 实例源码

我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用aiohttp.web()

项目:uzdevsbot    作者:Uzbek-Developers    | 项目源码 | 文件源码
def run_webhook(self, webhook_url, **options):
        """
        Convenience method for running bots in webhook mode

        :Example:

        >>> if __name__ == '__main__':
        >>>     bot.run_webhook(webhook_url="https://yourserver.com/webhooktoken")

        Additional documentation on https://core.telegram.org/bots/api#setwebhook
        """
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.set_webhook(webhook_url, **options))
        if webhook_url:
            url = urlparse(webhook_url)
            app = self.create_webhook_app(url.path, loop)
            host = os.environ.get('HOST', '0.0.0.0')
            port = int(os.environ.get('PORT', 0)) or url.port
            web.run_app(app, host=host, port=port)
项目:xmppwb    作者:saqura    | 项目源码 | 文件源码
def handle_incoming_webhook(self, request):
        """This coroutine handles incoming webhooks: It receives incoming
        webhooks and relays the messages to XMPP."""
        if request.content_type == 'application/json':
            payload = await request.json()
            # print(payload)
        else:
            # TODO: Handle other content types
            payload = await request.post()

        # Disgard empty messages
        if payload['text'] == "":
            return aiohttp.web.Response()

        token = payload['token']
        logging.debug("--> Handling incoming request from token "
                      "'{}'...".format(token))
        username = payload['user_name']
        msg = payload['text']

        for bridge in self.bridges:
            bridge.handle_incoming_webhook(token, username, msg)

        return aiohttp.web.Response()
项目:milisia-tox    作者:milisarge    | 项目源码 | 文件源码
def dps_baglan(request):
    link=""
    global tox
    durum=""
    data = yield from request.post()
    toxid = data['kdugum']
    print (toxid)
    port =33999 
    lport=random.randrange(38000,40000)
    komut="./tuntox -i "+str(toxid)+" -L "+str(lport)+":127.0.0.1:"+str(port)
    print ("dugumler aras? tunel ac?l?yor.")
    #tunel id kaydetmek için-?u an iptal
    #open("yenidugum","w").write(toxid)
    durum=yield from komutar(komut)
    link=lokalhost+":"+str(lport)
    return web.json_response(data=link)
项目:aioprometheus    作者:claws    | 项目源码 | 文件源码
def replace(self,
                      registry: CollectorRegistry) -> aiohttp.web.Response:
        '''
        ``replace`` pushes new values for a group of metrics to the push
        gateway.

        .. note::

            All existing metrics with the same grouping key specified in the
            URL will be replaced with the new metrics value.

        '''
        async with aiohttp.ClientSession(loop=self.loop) as session:
            payload = self.formatter.marshall(registry)
            resp = await session.put(
                self.path, data=payload, headers=self.headers)
        await resp.release()
        return resp
项目:aioprometheus    作者:claws    | 项目源码 | 文件源码
def __init__(self,
                 registry: Registry = None,
                 loop: BaseEventLoop = None) -> None:
        '''
        Initialise the Prometheus metrics service.

        :param registry: A :class:`CollectorRegistry` instance that will
          hold all the metrics for this service. If no registry if specified
          then the default registry is used.

        :param loop: The event loop instance to use. If no loop is specified
          then the default event loop will be retrieved.

        :raises: Exception if the registry object is not an instance of the
          Registry type.
        '''
        self.loop = loop or asyncio.get_event_loop()
        if registry is not None and not isinstance(registry, Registry):
            raise Exception(
                'registry must be a Registry, got: {}'.format(registry))
        self.registry = registry or Registry()
        self._svr = None  # type: Server
        self._svc = None  # type: aiohttp.web.Application
        self._handler = None  # type: aiohttp.web.RequestHandlerFactory
        self._https = None  # type: bool
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, app_or_server, *, scheme=sentinel, host=sentinel,
                 cookie_jar=None, **kwargs):
        if isinstance(app_or_server, BaseTestServer):
            if scheme is not sentinel or host is not sentinel:
                raise ValueError("scheme and host are mutable exclusive "
                                 "with TestServer parameter")
            self._server = app_or_server
        elif isinstance(app_or_server, Application):
            scheme = "http" if scheme is sentinel else scheme
            host = '127.0.0.1' if host is sentinel else host
            self._server = TestServer(app_or_server,
                                      scheme=scheme, host=host)
        else:
            raise TypeError("app_or_server should be either web.Application "
                            "or TestServer instance")
        self._loop = self._server._loop
        if cookie_jar is None:
            cookie_jar = aiohttp.CookieJar(unsafe=True,
                                           loop=self._loop)
        self._session = ClientSession(loop=self._loop,
                                      cookie_jar=cookie_jar,
                                      **kwargs)
        self._closed = False
        self._responses = []
        self._websockets = []
项目:aiohttp-devtools    作者:aio-libs    | 项目源码 | 文件源码
def test_start_runserver_app_instance(tmpworkdir, loop):
    mktree(tmpworkdir, {
        'app.py': """\
from aiohttp import web

async def hello(request):
    return web.Response(text='<h1>hello world</h1>', content_type='text/html')

app = web.Application()
app.router.add_get('/', hello)
"""
    })
    asyncio.set_event_loop(loop)
    aux_app, aux_port, _ = runserver(app_path='app.py', host='foobar.com')
    assert isinstance(aux_app, aiohttp.web.Application)
    assert aux_port == 8001
    assert len(aux_app.on_startup) == 1
    assert len(aux_app.on_shutdown) == 1
项目:aiohttp-devtools    作者:aio-libs    | 项目源码 | 文件源码
def test_start_runserver_no_loop_argument(tmpworkdir, loop):
    mktree(tmpworkdir, {
        'app.py': """\
from aiohttp import web

async def hello(request):
    return web.Response(text='<h1>hello world</h1>', content_type='text/html')

def app():
    a = web.Application()
    a.router.add_get('/', hello)
    return a
"""
    })
    asyncio.set_event_loop(loop)
    aux_app, aux_port, _ = runserver(app_path='app.py')
    assert isinstance(aux_app, aiohttp.web.Application)
    assert aux_port == 8001
    assert len(aux_app.on_startup) == 1
    assert len(aux_app.on_shutdown) == 1
项目:aiohttp-devtools    作者:aio-libs    | 项目源码 | 文件源码
def test_serve_main_app_app_instance(tmpworkdir, loop, mocker):
    mktree(tmpworkdir, {
        'app.py': """\
from aiohttp import web

async def hello(request):
    return web.Response(text='<h1>hello world</h1>', content_type='text/html')

app = web.Application()
app.router.add_get('/', hello)
"""
    })
    asyncio.set_event_loop(loop)
    mocker.spy(loop, 'create_server')
    mock_modify_main_app = mocker.patch('aiohttp_devtools.runserver.serve.modify_main_app')
    loop.call_later(0.5, loop.stop)

    config = Config(app_path='app.py')
    serve_main_app(config, '/dev/tty')

    assert loop.is_closed()
    loop.create_server.assert_called_with(mock.ANY, '0.0.0.0', 8000, backlog=128)
    mock_modify_main_app.assert_called_with(mock.ANY, config)
项目:webfixy    作者:takeshixx    | 项目源码 | 文件源码
def jsproxy_get(self, request):
        """Handle GET requests to jsproxy, decoding encrypted query strings."""
        loop = asyncio.get_event_loop()
        client = aiohttp.ClientSession(loop=loop)
        p, k = yield from self.session.tx_decrypt_uri(request.query_string)
        LOGGER.info('*** SEND_PLAINTEXT_URL: {}'.format(p))
        resp = yield from client.get(self.target_url + request.path)
        try:
            data = yield from resp.read()
        finally:
            yield from resp.release()

        headers = dict()
        for k, v in resp.headers.items():
            if k == 'CONTENT-ENCODING':
                continue
            headers[k] = v
        yield from client.close()
        return aiohttp.web.Response(status=resp.status, headers=headers, body=data)
项目:webfixy    作者:takeshixx    | 项目源码 | 文件源码
def forward_request(self, request):
        """Handle any other requests."""
        loop = asyncio.get_event_loop()
        client = aiohttp.ClientSession(loop=loop)
        resp = yield from client.request(
                request.method,
                self.target_url + request.path)
        try:
            data = yield from resp.read()
        finally:
            yield from resp.release()

        headers = dict()
        for k, v in resp.headers.items():
            if k.lower() == 'content-encoding':
                continue
            headers[k] = v

        yield from client.close()
        return aiohttp.web.Response(status=resp.status, headers=headers, body=data)
项目:uzdevsbot    作者:Uzbek-Developers    | 项目源码 | 文件源码
def webhook_handle(self, request):
        """
        aiohttp.web handle for processing web hooks

        :Example:

        >>> from aiohttp import web
        >>> app = web.Application()
        >>> app.router.add_route('/webhook')
        """
        update = await request.json()
        self._process_update(update)
        return web.Response()
项目:uzdevsbot    作者:Uzbek-Developers    | 项目源码 | 文件源码
def create_webhook_app(self, path, loop=None):
        """
        Shorthand for creating aiohttp.web.Application with registered webhook hanlde
        """
        app = web.Application(loop=loop)
        app.router.add_route('POST', path, self.webhook_handle)
        return app
项目:backend.ai-client-py    作者:lablup    | 项目源码 | 文件源码
def upload(self, files: Sequence[str]):
        rqst = Request('POST', '/kernel/{}/upload'.format(self.kernel_id))
        rqst.content = [
            # name filename file content_type headers
            aiohttp.web.FileField(
                'src', path, open(path, 'rb'), 'application/octet-stream', None
            ) for path in files
        ]
        return (await rqst.asend())

    # only supported in AsyncKernel
项目:tasker    作者:wavenator    | 项目源码 | 文件源码
def handle_get_root(
        self,
        request,
    ):
        return aiohttp.web.HTTPFound('/dashboard/index.html')
项目:sadm    作者:prologin    | 项目源码 | 文件源码
def get_special_handler(self, handler_func):
        """Wrap a special handler into a Tornado-compatible handler class."""

        class SpecialHandler(tornado.web.RequestHandler):
            """Wrapper handler for special resources.
            """
            def get(self):
                status_code, reason, headers, content = handler_func()
                self.set_status(status_code, reason)
                for name, value in headers.items():
                    self.set_header(name, value)
                self.write(content.encode('utf-8'))

        return SpecialHandler
项目:sadm    作者:prologin    | 项目源码 | 文件源码
def __init__(self, routes, app_name, loop=None):
        self.app = aiohttp.web.Application()
        self.app_name = app_name
        # TODO(seirl): integrate with HANDLED_URLS
        for route in routes:
            self.app.router.add_route(*route)
        self.loop = loop or asyncio.get_event_loop()
项目:sadm    作者:prologin    | 项目源码 | 文件源码
def run(self, **kwargs):
        aiohttp.web.run_app(self.app, loop=self.loop,
                            print=lambda *_: None, **kwargs)
项目:milisia-tox    作者:milisarge    | 项目源码 | 文件源码
def sendMessage(request):
    global tox
    result=""
    fno = request.match_info.get('arkadasno')
    text = "Mesaj gönderildi--> {}".format(fno)
    no=int(fno)
    if tox.friend_get_connection_status(no):
        result=tox.friend_send_message(no,0,"selam ben milis p2p servisiyim.")
    text+="-"+str(result)
    return web.Response(body=text.encode('utf-8'))
项目:milisia-tox    作者:milisarge    | 项目源码 | 文件源码
def deleteFriend(request):
    global tox
    result=""
    fno = request.match_info.get('arkadasno')
    text = "Dugum silindi--> {}".format(fno)
    no=int(fno)
    result=tox.friend_delete(no)
    data = tox.get_savedata()
    ProfileHelper.save_profile(data)
    text+="-"+str(result)
    return web.Response(body=text.encode('utf-8'))
项目:milisia-tox    作者:milisarge    | 项目源码 | 文件源码
def guncelle(request):
    komut="git pull > guncelleme.log"
    durum=yield from komutar(komut)
    log=open("guncelleme.log","r").read()
    loghtml="<html>güncellendi:<p>"+log+"<p><a href='/'>ana sayfa</a> </html>"
    return web.Response(body=loghtml.encode(kodsayfasi))
项目:milisia-tox    作者:milisarge    | 项目源码 | 文件源码
def dugumler(request):
    global tox
    text=""
    for num in tox.self_get_friend_list():
        text+=str(num)+"-"+tox.friend_get_name(tox.self_get_friend_list()[num])+"\n"
    return web.Response(body=text.encode(kodsayfasi))
项目:aioprometheus    作者:claws    | 项目源码 | 文件源码
def add(self,
                  registry: CollectorRegistry) -> aiohttp.web.Response:
        '''
        Add works like replace, but only metrics with the same name as the
        newly pushed metrics are replaced.
        '''
        async with aiohttp.ClientSession(loop=self.loop) as session:
            payload = self.formatter.marshall(registry)
            resp = await session.post(
                self.path, data=payload, headers=self.headers)
        await resp.release()
        return resp
项目:aioprometheus    作者:claws    | 项目源码 | 文件源码
def delete(self,
                     registry: CollectorRegistry) -> aiohttp.web.Response:
        '''
        ``delete`` deletes metrics from the push gateway. All metrics with
        the grouping key specified in the URL are deleted.
        '''
        async with aiohttp.ClientSession(loop=self.loop) as session:
            payload = self.formatter.marshall(registry)
            resp = await session.delete(
                self.path, data=payload, headers=self.headers)
        await resp.release()
        return resp
项目:aioprometheus    作者:claws    | 项目源码 | 文件源码
def accepts(self,
                request: aiohttp.web.Request) -> Set[str]:
        ''' Return a sequence of accepts items in the request headers '''
        accepts = set()  # type: Set[str]
        accept_headers = request.headers.getall(ACCEPT)
        logger.debug('accept: {}'.format(accept_headers))
        for accept_items in accept_headers:
            if ';' in accept_items:
                accept_items = [i.strip() for i in accept_items.split(';')]
            else:
                accept_items = [accept_items]
            accepts.update(accept_items)
        return accepts
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def get_app(self, loop):
        """
        This method should be overridden
        to return the aiohttp.web.Application
        object to test.

        :param loop: the event_loop to use
        :type loop: asyncio.BaseEventLoop
        """
        pass  # pragma: no cover
项目:aiohttp-devtools    作者:aio-libs    | 项目源码 | 文件源码
def test_run_app_test_client(tmpworkdir, test_client):
    mktree(tmpworkdir, SIMPLE_APP)
    config = Config(app_path='app.py')
    app_factory = config.import_app_factory()
    app = app_factory()
    modify_main_app(app, config)
    assert isinstance(app, aiohttp.web.Application)
    cli = await test_client(app)
    r = await cli.get('/')
    assert r.status == 200
    text = await r.text()
    assert text == 'hello world'
项目:web-boardimage    作者:niklasf    | 项目源码 | 文件源码
def make_svg(self, request):
        try:
            parts = request.GET["fen"].replace("_", " ").split(" ", 1)
            board = chess.BaseBoard("/".join(parts[0].split("/")[0:8]))
        except KeyError:
            raise aiohttp.web.HTTPBadRequest(reason="fen required")
        except ValueError:
            raise aiohttp.web.HTTPBadRequest(reason="invalid fen")

        try:
            size = min(max(int(request.GET.get("size", 360)), 16), 1024)
        except ValueError:
            raise aiohttp.web.HTTPBadRequest(reason="size is not a number")

        try:
            uci = request.GET.get("lastMove") or request.GET["lastmove"]
            lastmove = chess.Move.from_uci(uci)
        except KeyError:
            lastmove = None
        except ValueError:
            raise aiohttp.web.HTTPBadRequest(reason="lastMove is not a valid uci move")

        try:
            check = chess.SQUARE_NAMES.index(request.GET["check"])
        except KeyError:
            check = None
        except ValueError:
            raise aiohttp.web.HTTPBadRequest(reason="check is not a valid square name")

        try:
            arrows = [arrow(s.strip()) for s in request.GET.get("arrows", "").split(",") if s.strip()]
        except ValueError:
            raise aiohttp.web.HTTPBadRequest(reason="invalid arrow")

        flipped = request.GET.get("orientation", "white") == "black"

        return chess.svg.board(board, coordinates=False, flipped=flipped, lastmove=lastmove, check=check, arrows=arrows, size=size, style=self.css)
项目:web-boardimage    作者:niklasf    | 项目源码 | 文件源码
def render_svg(self, request):
        return aiohttp.web.Response(text=self.make_svg(request), content_type="image/svg+xml")
项目:web-boardimage    作者:niklasf    | 项目源码 | 文件源码
def render_png(self, request):
        svg_data = self.make_svg(request)
        png_data = cairosvg.svg2png(bytestring=svg_data)
        return aiohttp.web.Response(body=png_data, content_type="image/png")
项目:aioprometheus    作者:claws    | 项目源码 | 文件源码
def start(self,
                    addr: str = '',
                    port: int = 0,
                    ssl: SSLContext = None,
                    metrics_url: str = DEFAULT_METRICS_PATH,
                    discovery_agent=None) -> None:
        ''' Start the prometheus metrics HTTP(S) server.

        :param addr: the address to bind the server on. By default this is
          set to an empty string so that the service becomes available on
          all interfaces.

        :param port: The port to bind the server on. The default value is 0
          which will cause the server to bind to an ephemeral port. If you
          want the server to operate on a fixed port then you need to specifiy
          the port.

        :param ssl: a sslContext for use with TLS.

        :param metrics_url: The name of the endpoint route to expose
          prometheus metrics on. Defaults to '/metrics'.

        :param discovery_agent: an agent that can register the metrics
          service with a service discovery mechanism.

        :raises: Exception if the server could not be started.
        '''
        logger.debug(
            'Prometheus metrics server starting on %s:%s%s',
            addr, port, metrics_url)

        if self._svr:
            logger.warning(
                'Prometheus metrics server is already running')
            return

        self._svc = aiohttp.web.Application()
        self._metrics_url = metrics_url
        self._svc.router.add_route(
            GET, metrics_url, self.handle_metrics)
        self._handler = self._svc.make_handler()
        self._https = ssl is not None
        try:
            self._svr = await self.loop.create_server(
                self._handler, addr, port, ssl=ssl)
        except Exception:
            logger.exception('error creating metrics server')
            raise

        logger.debug('Prometheus metrics server started on %s', self.url)

        # register service with service discovery
        if discovery_agent:
            await discovery_agent.register(self)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def make_mocked_request(method, path, headers=None, *,
                        version=HttpVersion(1, 1), closing=False,
                        app=None,
                        reader=sentinel,
                        writer=sentinel,
                        transport=sentinel,
                        payload=sentinel,
                        sslcontext=None,
                        secure_proxy_ssl_header=None):
    """Creates mocked web.Request testing purposes.

    Useful in unit tests, when spinning full web server is overkill or
    specific conditions and errors are hard to trigger.

    """

    if version < HttpVersion(1, 1):
        closing = True

    if headers:
        hdrs = CIMultiDict(headers)
        raw_hdrs = [
            (k.encode('utf-8'), v.encode('utf-8')) for k, v in headers.items()]
    else:
        hdrs = CIMultiDict()
        raw_hdrs = []

    message = RawRequestMessage(method, path, version, hdrs,
                                raw_hdrs, closing, False)
    if app is None:
        app = _create_app_mock()

    if reader is sentinel:
        reader = mock.Mock()

    if writer is sentinel:
        writer = mock.Mock()

    if transport is sentinel:
        transport = _create_transport(sslcontext)

    if payload is sentinel:
        payload = mock.Mock()

    time_service = mock.Mock()
    time_service.time.return_value = 12345
    time_service.strtime.return_value = "Tue, 15 Nov 1994 08:12:31 GMT"

    task = mock.Mock()

    req = Request(message, payload,
                  transport, reader, writer,
                  time_service, task,
                  secure_proxy_ssl_header=secure_proxy_ssl_header)

    match_info = UrlMappingMatchInfo({}, mock.Mock())
    match_info.add_app(app)
    req._match_info = match_info

    return req
项目:aiohttp-devtools    作者:aio-libs    | 项目源码 | 文件源码
def test_start_runserver(tmpworkdir, caplog):
    mktree(tmpworkdir, {
        'app.py': """\
from aiohttp import web

async def hello(request):
    return web.Response(text='<h1>hello world</h1>', content_type='text/html')

async def has_error(request):
    raise ValueError()

def create_app(loop):
    app = web.Application()
    app.router.add_get('/', hello)
    app.router.add_get('/error', has_error)
    return app""",
        'static_dir/foo.js': 'var bar=1;',
    })
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    aux_app, aux_port, _ = runserver(app_path='app.py', static_path='static_dir')
    assert isinstance(aux_app, aiohttp.web.Application)
    assert aux_port == 8001
    start_app = aux_app.on_startup[0]
    stop_app = aux_app.on_shutdown[0]
    loop.run_until_complete(start_app(aux_app))

    async def check_callback(session):
        async with session.get('http://localhost:8000/') as r:
            assert r.status == 200
            assert r.headers['content-type'].startswith('text/html')
            text = await r.text()
            assert '<h1>hello world</h1>' in text
            assert '<script src="http://localhost:8001/livereload.js"></script>' in text

        async with session.get('http://localhost:8000/error') as r:
            assert r.status == 500
            assert 'raise ValueError()' in (await r.text())

    try:
        loop.run_until_complete(check_server_running(loop, check_callback))
    finally:
        loop.run_until_complete(stop_app(aux_app))
    assert (
        'adev.server.dft INFO: pre-check enabled, checking app factory\n'
        'adev.server.dft INFO: Starting aux server at http://localhost:8001 ?\n'
        'adev.server.dft INFO: serving static files from ./static_dir/ at http://localhost:8001/static/\n'
        'adev.server.dft INFO: Starting dev server at http://localhost:8000 ?\n'
    ) in caplog