我们从 Python 开源项目中提取了以下 33 个代码示例，用于说明如何使用 aiohttp.web 模块。
def run_webhook(self, webhook_url, **options):
    """
    Register the webhook and serve it with ``web.run_app``.

    ``self.set_webhook`` is first run to completion on the current event
    loop. When ``webhook_url`` is truthy, an application is built with
    ``self.create_webhook_app`` for the path component of that URL and
    served. The bind host comes from the ``HOST`` environment variable
    (default ``0.0.0.0``); the port comes from ``PORT`` when that is set to
    a non-zero value, otherwise from the port embedded in ``webhook_url``.

    :param webhook_url: public URL of the webhook
    :param options: extra keyword arguments forwarded to ``set_webhook``

    :Example:

    >>> if __name__ == '__main__':
    >>>     bot.run_webhook(webhook_url="https://yourserver.com/webhooktoken")

    Additional documentation on https://core.telegram.org/bots/api#setwebhook
    """
    event_loop = asyncio.get_event_loop()
    registration = self.set_webhook(webhook_url, **options)
    event_loop.run_until_complete(registration)

    # Nothing to serve when no webhook URL was given.
    if not webhook_url:
        return

    parsed = urlparse(webhook_url)
    application = self.create_webhook_app(parsed.path, event_loop)
    bind_host = os.environ.get('HOST', '0.0.0.0')
    env_port = int(os.environ.get('PORT', 0))
    # A missing or zero PORT falls back to the port named in the URL.
    bind_port = env_port or parsed.port
    web.run_app(application, host=bind_host, port=bind_port)
async def handle_incoming_webhook(self, request):
    """Handle an incoming webhook and relay its message to XMPP.

    The payload is read as JSON when the request content type is
    ``application/json`` and as form data otherwise. Requests whose
    ``text`` field is empty are acknowledged without being relayed.

    Declared ``async`` because the body awaits the request payload;
    ``await`` inside a plain ``def`` is a syntax error.

    :param request: the incoming aiohttp request
    :returns: an empty ``aiohttp.web.Response``
    :raises KeyError: if the payload lacks ``text``, ``token`` or
        ``user_name``
    """
    if request.content_type == 'application/json':
        payload = await request.json()
    else:
        # TODO: Handle other content types
        payload = await request.post()

    # Discard empty messages
    if payload['text'] == "":
        return aiohttp.web.Response()

    token = payload['token']
    logging.debug("--> Handling incoming request from token "
                  "'{}'...".format(token))
    username = payload['user_name']
    msg = payload['text']

    # Every bridge is offered the message.
    for bridge in self.bridges:
        bridge.handle_incoming_webhook(token, username, msg)

    return aiohttp.web.Response()
def dps_baglan(request):
    """Open a tuntox tunnel to the node named in the POSTed form.

    Generator-based coroutine (uses ``yield from``). Reads the ``kdugum``
    form field, picks a random local port in ``[38000, 40000)``, builds a
    ``./tuntox -i <id> -L <lport>:127.0.0.1:33999`` command and hands it to
    ``komutar``.

    The node id comes straight from the request, so it is passed through
    ``shlex.quote`` before being placed in the command string. Ids made
    only of shell-safe characters are left unchanged by the quoting.

    :param request: the incoming aiohttp request
    :returns: a JSON response containing ``lokalhost:<lport>``
    :raises KeyError: if the form has no ``kdugum`` field
    """
    import shlex  # local import: only needed to quote the untrusted id

    data = yield from request.post()
    toxid = data['kdugum']
    print(toxid)
    port = 33999
    lport = random.randrange(38000, 40000)
    # NOTE(review): presumably komutar runs this string through a shell —
    # confirm against its definition.
    komut = ("./tuntox -i " + shlex.quote(str(toxid))
             + " -L " + str(lport) + ":127.0.0.1:" + str(port))
    print("dugumler aras? tunel ac?l?yor.")
    # Saving the tunnel id to the "yenidugum" file is currently disabled.
    yield from komutar(komut)
    link = lokalhost + ":" + str(lport)
    return web.json_response(data=link)
async def replace(self, registry: CollectorRegistry) -> aiohttp.ClientResponse:
    '''
    ``replace`` pushes new values for a group of metrics to the push
    gateway using an HTTP PUT.

    .. note::

        All existing metrics with the same grouping key specified in the
        URL will be replaced with the new metrics value.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are syntax errors inside a plain ``def``.

    :param registry: the registry whose metrics are marshalled by
      ``self.formatter`` and sent as the request body.

    :returns: the client response, already released.
    '''
    async with aiohttp.ClientSession(loop=self.loop) as session:
        payload = self.formatter.marshall(registry)
        resp = await session.put(
            self.path, data=payload, headers=self.headers)
        # Release the connection; the body is not needed.
        await resp.release()
    return resp
def __init__(self,
             registry: Registry = None,
             loop: BaseEventLoop = None) -> None:
    '''
    Set up the Prometheus metrics service without starting it.

    :param registry: the registry instance that holds the metrics for this
      service. When none is given a new ``Registry`` is created.

    :param loop: the event loop to use. When none is given the loop
      returned by ``asyncio.get_event_loop()`` is used.

    :raises: Exception if ``registry`` is given but is not an instance of
      ``Registry``.
    '''
    if not loop:
        loop = asyncio.get_event_loop()
    self.loop = loop

    registry_is_acceptable = registry is None or isinstance(registry, Registry)
    if not registry_is_acceptable:
        raise Exception(
            'registry must be a Registry, got: {}'.format(registry))
    if not registry:
        registry = Registry()
    self.registry = registry

    # Server-side state; all unset at construction time.
    self._svr = None  # type: Server
    self._svc = None  # type: aiohttp.web.Application
    self._handler = None  # type: aiohttp.web.RequestHandlerFactory
    self._https = None  # type: bool
def __init__(self, app_or_server, *, scheme=sentinel, host=sentinel,
             cookie_jar=None, **kwargs):
    """Create a test client around an application or a test server.

    :param app_or_server: a ``BaseTestServer`` instance, used as is, or an
        ``Application``, which gets wrapped in a new ``TestServer``.
    :param scheme: scheme for the wrapping ``TestServer`` (default
        ``"http"``); must be left unset when a server is passed.
    :param host: host for the wrapping ``TestServer`` (default
        ``'127.0.0.1'``); must be left unset when a server is passed.
    :param cookie_jar: cookie jar for the client session; when ``None`` an
        ``aiohttp.CookieJar(unsafe=True)`` is created.
    :param kwargs: forwarded to ``ClientSession``.
    :raises ValueError: if a server is passed together with ``scheme`` or
        ``host``.
    :raises TypeError: if ``app_or_server`` is neither kind of object.
    """
    if isinstance(app_or_server, BaseTestServer):
        endpoint_overridden = not (scheme is sentinel and host is sentinel)
        if endpoint_overridden:
            raise ValueError("scheme and host are mutable exclusive "
                             "with TestServer parameter")
        server = app_or_server
    elif isinstance(app_or_server, Application):
        if scheme is sentinel:
            scheme = "http"
        if host is sentinel:
            host = '127.0.0.1'
        server = TestServer(app_or_server, scheme=scheme, host=host)
    else:
        raise TypeError("app_or_server should be either web.Application "
                        "or TestServer instance")

    self._server = server
    self._loop = server._loop

    jar = cookie_jar
    if jar is None:
        jar = aiohttp.CookieJar(unsafe=True, loop=self._loop)
    self._session = ClientSession(loop=self._loop, cookie_jar=jar,
                                  **kwargs)

    self._closed = False
    self._responses = []
    self._websockets = []
def test_start_runserver_app_instance(tmpworkdir, loop):
    """``runserver`` accepts an ``app.py`` exposing an Application instance.

    The embedded ``app.py`` source is written with its line breaks
    restored, so that the generated file is valid Python.
    """
    mktree(tmpworkdir, {
        'app.py': """\
from aiohttp import web

async def hello(request):
    return web.Response(text='<h1>hello world</h1>', content_type='text/html')

app = web.Application()
app.router.add_get('/', hello)
"""
    })
    asyncio.set_event_loop(loop)
    aux_app, aux_port, _ = runserver(app_path='app.py', host='foobar.com')
    assert isinstance(aux_app, aiohttp.web.Application)
    assert aux_port == 8001
    # Exactly one startup and one shutdown hook are registered on the aux app.
    assert len(aux_app.on_startup) == 1
    assert len(aux_app.on_shutdown) == 1
def test_start_runserver_no_loop_argument(tmpworkdir, loop):
    """``runserver`` accepts an app factory that takes no ``loop`` argument.

    The embedded ``app.py`` source is written with its line breaks
    restored, so that the generated file is valid Python.
    """
    mktree(tmpworkdir, {
        'app.py': """\
from aiohttp import web

async def hello(request):
    return web.Response(text='<h1>hello world</h1>', content_type='text/html')

def app():
    a = web.Application()
    a.router.add_get('/', hello)
    return a
"""
    })
    asyncio.set_event_loop(loop)
    aux_app, aux_port, _ = runserver(app_path='app.py')
    assert isinstance(aux_app, aiohttp.web.Application)
    assert aux_port == 8001
    # Exactly one startup and one shutdown hook are registered on the aux app.
    assert len(aux_app.on_startup) == 1
    assert len(aux_app.on_shutdown) == 1
def test_serve_main_app_app_instance(tmpworkdir, loop, mocker):
    """``serve_main_app`` serves an ``app.py`` exposing an Application instance.

    The loop is stopped after half a second so that ``serve_main_app``
    returns; the test then checks the loop was closed, the server was
    created on ``0.0.0.0:8000`` and ``modify_main_app`` was called with the
    config. The embedded ``app.py`` source is written with its line breaks
    restored, so that the generated file is valid Python.
    """
    mktree(tmpworkdir, {
        'app.py': """\
from aiohttp import web

async def hello(request):
    return web.Response(text='<h1>hello world</h1>', content_type='text/html')

app = web.Application()
app.router.add_get('/', hello)
"""
    })
    asyncio.set_event_loop(loop)
    mocker.spy(loop, 'create_server')
    mock_modify_main_app = mocker.patch('aiohttp_devtools.runserver.serve.modify_main_app')
    # Schedule the stop before serving, otherwise serve_main_app never returns.
    loop.call_later(0.5, loop.stop)

    config = Config(app_path='app.py')
    serve_main_app(config, '/dev/tty')

    assert loop.is_closed()
    loop.create_server.assert_called_with(mock.ANY, '0.0.0.0', 8000, backlog=128)
    mock_modify_main_app.assert_called_with(mock.ANY, config)
def jsproxy_get(self, request):
    """Handle GET requests to jsproxy, decoding encrypted query strings.

    Generator-based coroutine (uses ``yield from``). The query string is
    decrypted and logged, the request path is fetched from
    ``self.target_url``, and the upstream status, headers and body are
    returned to the caller. The upstream ``Content-Encoding`` header is
    dropped; the name is compared case-insensitively, as
    ``forward_request`` does. The client session is closed even when the
    upstream request fails.

    :param request: the incoming aiohttp request
    :returns: an ``aiohttp.web.Response`` mirroring the upstream response
    """
    loop = asyncio.get_event_loop()
    client = aiohttp.ClientSession(loop=loop)
    try:
        p, _ = yield from self.session.tx_decrypt_uri(request.query_string)
        LOGGER.info('*** SEND_PLAINTEXT_URL: {}'.format(p))
        # NOTE(review): the decrypted URL is only logged; the upstream
        # request uses request.path — confirm this is intended.
        resp = yield from client.get(self.target_url + request.path)
        try:
            data = yield from resp.read()
        finally:
            yield from resp.release()
        headers = {
            name: value
            for name, value in resp.headers.items()
            if name.lower() != 'content-encoding'
        }
    finally:
        yield from client.close()
    return aiohttp.web.Response(status=resp.status, headers=headers,
                                body=data)
def forward_request(self, request):
    """Handle any other requests by forwarding them to the target.

    Generator-based coroutine (uses ``yield from``). The request method and
    path are replayed against ``self.target_url``, and the upstream status,
    headers (minus ``Content-Encoding``) and body are returned. The client
    session is closed even when the upstream request fails.

    :param request: the incoming aiohttp request
    :returns: an ``aiohttp.web.Response`` mirroring the upstream response
    """
    loop = asyncio.get_event_loop()
    client = aiohttp.ClientSession(loop=loop)
    try:
        resp = yield from client.request(
            request.method, self.target_url + request.path)
        try:
            data = yield from resp.read()
        finally:
            yield from resp.release()
        headers = {
            name: value
            for name, value in resp.headers.items()
            if name.lower() != 'content-encoding'
        }
    finally:
        yield from client.close()
    return aiohttp.web.Response(status=resp.status, headers=headers,
                                body=data)
async def webhook_handle(self, request):
    """
    aiohttp.web handler for processing webhooks.

    Decodes the request body as JSON, passes it to
    ``self._process_update`` and answers with an empty response. Declared
    ``async`` because the body awaits the request payload.

    :param request: the incoming aiohttp request

    :Example:

    >>> from aiohttp import web
    >>> app = web.Application()
    >>> app.router.add_route('POST', '/webhook', bot.webhook_handle)
    """
    update = await request.json()
    self._process_update(update)
    return web.Response()
def create_webhook_app(self, path, loop=None):
    """
    Build an ``aiohttp.web.Application`` with the webhook handler registered.

    :param path: URL path on which ``self.webhook_handle`` receives POSTs
    :param loop: event loop passed through to ``web.Application``
    :returns: the new application
    """
    webhook_app = web.Application(loop=loop)
    router = webhook_app.router
    router.add_route('POST', path, self.webhook_handle)
    return webhook_app
async def upload(self, files: Sequence[str]):
    """Upload the given files to this kernel.

    Each path is opened in binary mode and attached to a
    ``POST /kernel/<kernel_id>/upload`` request as a ``src`` file field.
    The files are closed once the request has been sent, or as soon as
    opening or sending fails. Declared ``async`` because the body awaits
    ``asend()``.

    :param files: paths of the files to upload
    :returns: whatever ``Request.asend()`` returns
    """
    from contextlib import ExitStack  # local import: scoped file cleanup

    rqst = Request('POST', '/kernel/{}/upload'.format(self.kernel_id))
    with ExitStack() as opened:
        rqst.content = [
            # name filename file content_type headers
            aiohttp.web.FileField(
                'src', path, opened.enter_context(open(path, 'rb')),
                'application/octet-stream', None
            ) for path in files
        ]
        # only supported in AsyncKernel
        return (await rqst.asend())
def handle_get_root(self, request):
    """Answer a request for the root path with a redirect to the dashboard.

    :param request: the incoming request (unused)
    :returns: an ``aiohttp.web.HTTPFound`` pointing at
        ``/dashboard/index.html``
    """
    # NOTE(review): newer aiohttp releases expect HTTP exceptions to be
    # raised rather than returned — confirm against the pinned version.
    redirect = aiohttp.web.HTTPFound('/dashboard/index.html')
    return redirect
def get_special_handler(self, handler_func):
    """Build a Tornado request handler class around ``handler_func``.

    :param handler_func: zero-argument callable returning a
        ``(status_code, reason, headers, content)`` tuple
    :returns: a ``tornado.web.RequestHandler`` subclass (the class itself,
        not an instance)
    """

    class SpecialHandler(tornado.web.RequestHandler):
        """Serves whatever ``handler_func`` produces on GET."""

        def get(self):
            response_parts = handler_func()
            status_code, reason, headers, content = response_parts
            self.set_status(status_code, reason)
            for header_name, header_value in headers.items():
                self.set_header(header_name, header_value)
            # The content is text; it is written out as UTF-8 bytes.
            body = content.encode('utf-8')
            self.write(body)

    return SpecialHandler
def __init__(self, routes, app_name, loop=None):
    """Create the aiohttp application and register the given routes.

    :param routes: iterable of argument tuples, each unpacked into
        ``router.add_route``
    :param app_name: stored as ``self.app_name``
    :param loop: event loop to use; defaults to
        ``asyncio.get_event_loop()``
    """
    application = aiohttp.web.Application()
    self.app = application
    self.app_name = app_name

    # TODO(seirl): integrate with HANDLED_URLS
    register = application.router.add_route
    for route_args in routes:
        register(*route_args)

    if not loop:
        loop = asyncio.get_event_loop()
    self.loop = loop
def run(self, **kwargs):
    """Serve ``self.app`` on ``self.loop`` via ``aiohttp.web.run_app``.

    ``run_app`` is given a no-op ``print`` callable.

    :param kwargs: extra keyword arguments forwarded to ``run_app``
    """

    def _discard(*_):
        return None

    aiohttp.web.run_app(self.app, loop=self.loop, print=_discard, **kwargs)
def sendMessage(request):
    """Send a fixed greeting to the friend named in the URL.

    The friend number is read from the ``arkadasno`` route parameter. The
    greeting is only sent when ``tox`` reports a connection status for that
    friend; in that case the send result is appended to the reply text.

    :param request: the incoming aiohttp request
    :returns: a ``web.Response`` with the UTF-8 encoded status text
    """
    friend_field = request.match_info.get('arkadasno')
    reply = "Mesaj gönderildi--> {}".format(friend_field)
    friend_number = int(friend_field)

    if tox.friend_get_connection_status(friend_number):
        outcome = tox.friend_send_message(
            friend_number, 0, "selam ben milis p2p servisiyim.")
        reply = reply + "-" + str(outcome)

    return web.Response(body=reply.encode('utf-8'))
def deleteFriend(request):
    """Delete the friend named in the URL and save the profile.

    The friend number is read from the ``arkadasno`` route parameter. After
    the deletion the current ``tox`` save data is written with
    ``ProfileHelper.save_profile``.

    :param request: the incoming aiohttp request
    :returns: a ``web.Response`` with the UTF-8 encoded status text,
        including the result of the deletion
    """
    friend_field = request.match_info.get('arkadasno')
    reply = "Dugum silindi--> {}".format(friend_field)

    outcome = tox.friend_delete(int(friend_field))
    ProfileHelper.save_profile(tox.get_savedata())

    reply = reply + "-" + str(outcome)
    return web.Response(body=reply.encode('utf-8'))
def guncelle(request):
    """Run ``git pull`` and show its log as an HTML page.

    Generator-based coroutine (uses ``yield from``). The command redirects
    its output to ``guncelleme.log``, which is then read back and embedded
    in the page. The log file is opened in a ``with`` block so that the
    handle is closed after reading.

    :param request: the incoming aiohttp request (unused)
    :returns: a ``web.Response`` with the page encoded as ``kodsayfasi``
    """
    komut = "git pull > guncelleme.log"
    yield from komutar(komut)
    with open("guncelleme.log", "r") as log_file:
        log = log_file.read()
    loghtml = "<html>güncellendi:<p>" + log + "<p><a href='/'>ana sayfa</a> </html>"
    return web.Response(body=loghtml.encode(kodsayfasi))
def dugumler(request):
    """List every friend number with its name, one ``<number>-<name>`` per line.

    The values yielded by ``tox.self_get_friend_list()`` are used directly
    as friend numbers. Using them as indexes back into the list, as the
    code did before, only gives the same result while the numbers are
    contiguous from zero, and raises ``IndexError`` otherwise.

    :param request: the incoming aiohttp request (unused)
    :returns: a ``web.Response`` with the listing encoded as ``kodsayfasi``
    """
    lines = [
        str(num) + "-" + tox.friend_get_name(num) + "\n"
        for num in tox.self_get_friend_list()
    ]
    return web.Response(body="".join(lines).encode(kodsayfasi))
async def add(self, registry: CollectorRegistry) -> aiohttp.ClientResponse:
    '''
    ``add`` works like replace, but only metrics with the same name as the
    newly pushed metrics are replaced. The metrics are sent with an HTTP
    POST.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are syntax errors inside a plain ``def``.

    :param registry: the registry whose metrics are marshalled by
      ``self.formatter`` and sent as the request body.

    :returns: the client response, already released.
    '''
    async with aiohttp.ClientSession(loop=self.loop) as session:
        payload = self.formatter.marshall(registry)
        resp = await session.post(
            self.path, data=payload, headers=self.headers)
        # Release the connection; the body is not needed.
        await resp.release()
    return resp
async def delete(self, registry: CollectorRegistry) -> aiohttp.ClientResponse:
    '''
    ``delete`` deletes metrics from the push gateway using an HTTP DELETE.
    All metrics with the grouping key specified in the URL are deleted.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are syntax errors inside a plain ``def``.

    :param registry: the registry whose metrics are marshalled by
      ``self.formatter`` and sent as the request body.

    :returns: the client response, already released.
    '''
    async with aiohttp.ClientSession(loop=self.loop) as session:
        payload = self.formatter.marshall(registry)
        resp = await session.delete(
            self.path, data=payload, headers=self.headers)
        # Release the connection; the body is not needed.
        await resp.release()
    return resp
def accepts(self, request: aiohttp.web.Request) -> Set[str]:
    '''
    Return the set of accept items found in the request headers.

    Each Accept header value that contains ``;`` is split on it and the
    stripped parts are added individually; a value without ``;`` is added
    as is. A request without any Accept header yields an empty set instead
    of raising ``KeyError``.

    :param request: the incoming request.

    :returns: the collected accept items.
    '''
    accepts = set()  # type: Set[str]
    # The default keeps getall() from raising when the header is absent.
    accept_headers = request.headers.getall(ACCEPT, [])
    logger.debug('accept: {}'.format(accept_headers))
    for accept_items in accept_headers:
        if ';' in accept_items:
            accept_items = [i.strip() for i in accept_items.split(';')]
        else:
            accept_items = [accept_items]
        accepts.update(accept_items)
    return accepts
def get_app(self, loop):
    """
    Hook for subclasses: override to return the aiohttp.web.Application
    object under test. This default implementation returns ``None``.

    :param loop: the event_loop to use
    :type loop: asyncio.BaseEventLoop
    """
    return None  # pragma: no cover
async def test_run_app_test_client(tmpworkdir, test_client):
    """The app built from ``SIMPLE_APP`` still answers ``/`` after ``modify_main_app``.

    Declared ``async`` because the body awaits the test client.
    """
    mktree(tmpworkdir, SIMPLE_APP)
    config = Config(app_path='app.py')
    app_factory = config.import_app_factory()
    app = app_factory()
    modify_main_app(app, config)
    assert isinstance(app, aiohttp.web.Application)

    cli = await test_client(app)
    r = await cli.get('/')
    assert r.status == 200
    text = await r.text()
    assert text == 'hello world'
def make_svg(self, request):
    """Render a board image as SVG from the request's query parameters.

    Parameters read from ``request.GET``:

    * ``fen`` (required) - board position; underscores stand for spaces
      and only the first eight ``/``-separated ranks are used.
    * ``size`` - clamped to the range 16..1024, default 360.
    * ``lastMove`` / ``lastmove`` - UCI move to highlight.
    * ``check`` - name of the square to mark.
    * ``arrows`` - comma-separated arrow specs parsed with ``arrow``.
    * ``orientation`` - ``black`` flips the board.

    :returns: the value of ``chess.svg.board`` for those settings
    :raises aiohttp.web.HTTPBadRequest: on a missing or invalid parameter
    """
    query = request.GET

    try:
        fen_fields = query["fen"].replace("_", " ").split(" ", 1)
        ranks = fen_fields[0].split("/")[0:8]
        board = chess.BaseBoard("/".join(ranks))
    except KeyError:
        raise aiohttp.web.HTTPBadRequest(reason="fen required")
    except ValueError:
        raise aiohttp.web.HTTPBadRequest(reason="invalid fen")

    try:
        requested_size = int(query.get("size", 360))
    except ValueError:
        raise aiohttp.web.HTTPBadRequest(reason="size is not a number")
    size = max(16, min(1024, requested_size))

    try:
        # An empty lastMove falls through to the lower-case spelling.
        uci = query.get("lastMove") or query["lastmove"]
        lastmove = chess.Move.from_uci(uci)
    except KeyError:
        lastmove = None
    except ValueError:
        raise aiohttp.web.HTTPBadRequest(reason="lastMove is not a valid uci move")

    try:
        check = chess.SQUARE_NAMES.index(query["check"])
    except KeyError:
        check = None
    except ValueError:
        raise aiohttp.web.HTTPBadRequest(reason="check is not a valid square name")

    arrows = []
    try:
        for spec in query.get("arrows", "").split(","):
            spec = spec.strip()
            if spec:
                arrows.append(arrow(spec))
    except ValueError:
        raise aiohttp.web.HTTPBadRequest(reason="invalid arrow")

    flipped = query.get("orientation", "white") == "black"

    return chess.svg.board(board, coordinates=False, flipped=flipped,
                           lastmove=lastmove, check=check, arrows=arrows,
                           size=size, style=self.css)
def render_svg(self, request):
    """Answer with the SVG produced by ``self.make_svg`` for this request.

    :returns: an ``aiohttp.web.Response`` of type ``image/svg+xml``
    """
    svg_text = self.make_svg(request)
    return aiohttp.web.Response(text=svg_text,
                                content_type="image/svg+xml")
def render_png(self, request):
    """Answer with a PNG converted from the SVG of ``self.make_svg``.

    The conversion is done by ``cairosvg.svg2png``.

    :returns: an ``aiohttp.web.Response`` of type ``image/png``
    """
    return aiohttp.web.Response(
        body=cairosvg.svg2png(bytestring=self.make_svg(request)),
        content_type="image/png")
async def start(self,
                addr: str = '',
                port: int = 0,
                ssl: SSLContext = None,
                metrics_url: str = DEFAULT_METRICS_PATH,
                discovery_agent=None) -> None:
    '''
    Start the prometheus metrics HTTP(S) server.

    Declared ``async`` because the body awaits the server creation and the
    discovery registration; ``await`` inside a plain ``def`` is a syntax
    error. Calling ``start`` while a server is already running only logs a
    warning.

    :param addr: the address to bind the server on. By default this is
      set to an empty string so that the service becomes available on
      all interfaces.

    :param port: The port to bind the server on. The default value is 0
      which will cause the server to bind to an ephemeral port. If you
      want the server to operate on a fixed port then you need to specify
      the port.

    :param ssl: a sslContext for use with TLS.

    :param metrics_url: The name of the endpoint route to expose
      prometheus metrics on. Defaults to '/metrics'.

    :param discovery_agent: an agent that can register the metrics
      service with a service discovery mechanism.

    :raises: Exception if the server could not be started.
    '''
    logger.debug(
        'Prometheus metrics server starting on %s:%s%s',
        addr, port, metrics_url)

    if self._svr:
        logger.warning(
            'Prometheus metrics server is already running')
        return

    self._svc = aiohttp.web.Application()
    self._metrics_url = metrics_url
    self._svc.router.add_route(
        GET, metrics_url, self.handle_metrics)
    self._handler = self._svc.make_handler()
    self._https = ssl is not None
    try:
        self._svr = await self.loop.create_server(
            self._handler, addr, port, ssl=ssl)
    except Exception:
        # Log with traceback, then let the caller see the failure.
        logger.exception('error creating metrics server')
        raise

    logger.debug('Prometheus metrics server started on %s', self.url)

    # register service with service discovery
    if discovery_agent:
        await discovery_agent.register(self)
def make_mocked_request(method, path, headers=None, *,
                        version=HttpVersion(1, 1), closing=False,
                        app=None,
                        reader=sentinel,
                        writer=sentinel,
                        transport=sentinel,
                        payload=sentinel,
                        sslcontext=None,
                        secure_proxy_ssl_header=None):
    """Build a ``web.Request`` backed by mocks, for testing purposes.

    Useful in unit tests where starting a full web server is overkill, or
    where specific conditions and errors are hard to trigger.

    Any of ``app``, ``reader``, ``writer``, ``transport`` and ``payload``
    that is not supplied is replaced by a mock. HTTP versions below 1.1
    force ``closing`` to ``True``.

    :returns: the request, with an empty match info bound to ``app``
    """
    if version < HttpVersion(1, 1):
        closing = True

    hdrs = CIMultiDict()
    raw_hdrs = []
    if headers:
        hdrs = CIMultiDict(headers)
        for name, value in headers.items():
            raw_hdrs.append((name.encode('utf-8'), value.encode('utf-8')))

    message = RawRequestMessage(method, path, version, hdrs,
                                raw_hdrs, closing, False)

    # Fill in a mock for every collaborator the caller left out.
    if app is None:
        app = _create_app_mock()
    if reader is sentinel:
        reader = mock.Mock()
    if writer is sentinel:
        writer = mock.Mock()
    if transport is sentinel:
        transport = _create_transport(sslcontext)
    if payload is sentinel:
        payload = mock.Mock()

    # The time service always reports the same fixed values.
    clock = mock.Mock()
    clock.time.return_value = 12345
    clock.strtime.return_value = "Tue, 15 Nov 1994 08:12:31 GMT"

    task = mock.Mock()

    req = Request(message, payload,
                  transport, reader, writer,
                  clock, task,
                  secure_proxy_ssl_header=secure_proxy_ssl_header)

    info = UrlMappingMatchInfo({}, mock.Mock())
    info.add_app(app)
    req._match_info = info

    return req
def test_start_runserver(tmpworkdir, caplog):
    """End-to-end check of ``runserver`` with an app factory and static files.

    Starts the aux app, then checks that the main page is served with the
    livereload script injected, that the error page returns a 500 showing
    the failing source line, and that the expected log lines were emitted.
    The embedded ``app.py`` source is written with its line breaks
    restored, so that the generated file is valid Python.
    """
    mktree(tmpworkdir, {
        'app.py': """\
from aiohttp import web

async def hello(request):
    return web.Response(text='<h1>hello world</h1>', content_type='text/html')

async def has_error(request):
    raise ValueError()

def create_app(loop):
    app = web.Application()
    app.router.add_get('/', hello)
    app.router.add_get('/error', has_error)
    return app""",
        'static_dir/foo.js': 'var bar=1;',
    })
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    aux_app, aux_port, _ = runserver(app_path='app.py', static_path='static_dir')
    assert isinstance(aux_app, aiohttp.web.Application)
    assert aux_port == 8001
    start_app = aux_app.on_startup[0]
    stop_app = aux_app.on_shutdown[0]
    loop.run_until_complete(start_app(aux_app))

    async def check_callback(session):
        async with session.get('http://localhost:8000/') as r:
            assert r.status == 200
            assert r.headers['content-type'].startswith('text/html')
            text = await r.text()
            assert '<h1>hello world</h1>' in text
            assert '<script src="http://localhost:8001/livereload.js"></script>' in text

        async with session.get('http://localhost:8000/error') as r:
            assert r.status == 500
            assert 'raise ValueError()' in (await r.text())

    try:
        loop.run_until_complete(check_server_running(loop, check_callback))
    finally:
        # Always run the shutdown hook, even when a check fails.
        loop.run_until_complete(stop_app(aux_app))
    # NOTE(review): the trailing "?" in two of the lines below is presumably
    # a mis-encoded symbol — confirm against the messages the server logs.
    assert (
        'adev.server.dft INFO: pre-check enabled, checking app factory\n'
        'adev.server.dft INFO: Starting aux server at http://localhost:8001 ?\n'
        'adev.server.dft INFO: serving static files from ./static_dir/ at http://localhost:8001/static/\n'
        'adev.server.dft INFO: Starting dev server at http://localhost:8000 ?\n'
    ) in caplog