我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用aiohttp.request()。
def moneyconvert(self, amount: float, base: str, to: str): """Currency converter Set the amount, currency FROM (base) and currency TO Available currencies for conversion: AUD BGN BRL CAD CHF CNY CZK DKK EUR GBP HKD HRK HUF IDR ILS INR JPY KRW MXN MYR NOK NZD PHP PLN RON RUB SEK SGD THB TRY USD ZAR ***WARNING*** Conversion may not be exact""" if base.upper() not in self.currencies or to.upper() not in self.currencies: await self.bot.say('One or both of the currencies selected are invalid') else: api = 'http://api.fixer.io/latest?base={}'.format(base.upper()) async with aiohttp.request("GET", api) as r: result = await r.json() rate = result['rates'][to.upper()] converted_amount = amount * rate pre_conv = '{0:.2f}'.format(amount) post_conv = '{0:.2f}'.format(converted_amount) await self.bot.say('`{} {} = {} {}`'.format(base.upper(), pre_conv, to.upper(), post_conv))
def get_async(ym, x): ''' ??GET????xls???? parameters ========== ym : str ?? x : dict ??????? ''' with (await sem): name, sid = x['name'], x['sid'] url = 'http://www.chinabond.com.cn/DownLoadxlsx?sId={}&sBbly={}&sMimeType=4'.format( sid, ym) outfilename = 'data/{}{}{}.xls'.format(sid, name, ym) logging.info('downloading %s', outfilename) response = await aiohttp.request('GET', url, headers=headers) with closing(response), open(outfilename, 'wb') as file: while True: # ???? chunk = await response.content.read(1024) if not chunk: break file.write(chunk) logging.info('done %s', outfilename)
def send_http_request(self, app: str, service: str, version: str, method: str, entity: str, params: dict): """ A convenience method that allows you to send a well formatted http request to another service """ host, port, node_id, service_type = self._registry_client.resolve(service, version, entity, HTTP) url = 'http://{}:{}{}'.format(host, port, params.pop('path')) http_keys = ['data', 'headers', 'cookies', 'auth', 'allow_redirects', 'compress', 'chunked'] kwargs = {k: params[k] for k in http_keys if k in params} query_params = params.pop('params', {}) if app is not None: query_params['app'] = app query_params['version'] = version query_params['service'] = service response = yield from aiohttp.request(method, url, params=query_params, **kwargs) return response
def _request_sender(self, packet: dict): """ Sends a request to a server from a ServiceClient auto dispatch method called from self.send() """ node_id = self._get_node_id_for_packet(packet) client_protocol = self._client_protocols.get(node_id) if node_id and client_protocol: if client_protocol.is_connected(): packet['to'] = node_id client_protocol.send(packet) return True else: self._logger.error('Client protocol is not connected for packet %s', packet) raise ClientDisconnected() else: # No node found to send request self._logger.error('Out of %s, Client Not found for packet %s, restarting server...', self._client_protocols.keys(), packet) raise ClientNotFoundError()
def get_flag(base_url, cc): url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower()) resp = yield from aiohttp.request('GET', url) with contextlib.closing(resp): if resp.status == 200: image = yield from resp.read() return image elif resp.status == 404: raise web.HTTPNotFound() else: raise aiohttp.HttpProcessingError( code=resp.status, message=resp.reason, headers=resp.headers) # BEGIN FLAGS2_ASYNCIO_EXECUTOR
def http_get(url): res = yield from aiohttp.request('GET', url) if res.status == 200: ctype = res.headers.get('Content-type', '').lower() if 'json' in ctype or url.endswith('json'): data = yield from res.json() # <1> else: data = yield from res.read() # <2> return data elif res.status == 404: raise web.HTTPNotFound() else: raise aiohttp.errors.HttpProcessingError( code=res.status, message=res.reason, headers=res.headers)
def do_request(): proxy_url = 'http://198.23.237.213:1080' # your proxy address response = yield from aiohttp.request( 'GET', 'http://google.com', proxy=proxy_url, ) return response
def _send_to_external_chat(self, bot, event, config): if event.from_bot: # don't send my own messages return conversation_id = event.conv_id conversation_text = event.text user_id = event.user_id url = config["HUBOT_URL"] + conversation_id payload = {"from" : str(user_id.chat_id), "message" : conversation_text} headers = {'content-type': 'application/json'} connector = aiohttp.TCPConnector(verify_ssl=False) asyncio.ensure_future( aiohttp.request('post', url, data=json.dumps(payload), headers=headers, connector=connector) )
def _cookie(self): """Retrieves a random fortune cookie fortune.""" regex = ["class=\"cookie-link\">([^`]*?)<\/a>", "<p>([^`]*?)<\/p>", "(?:\\\\['])", "<strong>([^`]*?)<\/strong>", "<\/strong><\/a>([^`]*?)<br>", "3\)<\/strong><\/a>([^`]*?)<\/div>"] url = "http://www.fortunecookiemessage.com" await self.file_check() async with aiohttp.request("GET", url, headers={"encoding": "utf-8"}) as resp: test = str(await resp.text()) fortune = re.findall(regex[0], test) fortest = re.match("<p>", fortune[0]) if fortest is not None: fortune = re.findall(regex[1], fortune[0]) title = re.findall(regex[3], test) info = re.findall(regex[4], test) info[0] = html.unescape(info[0]) dailynum = re.findall(regex[5], test) self.fortune_process(fortune[0]) await self.bot.say("Your fortune is:") await self.bot.upload("data/horoscope/cookie-edit.png") await self.bot.say("\n" + title[1] + info[1] + "\n" + title[2] + dailynum[0]) os.remove("data/horoscope/cookie-edit.png")
def _font(self, url: str=None): """Allows you to set the font that the fortune cookies are shown in. Only accepts ttf.""" option = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0)' ' Gecko/20100101 Firefox/40.1'} if url is None : url = "https://cdn.discordapp.com/attachments/218222973557932032/240223136447070208/FortuneCookieNF.ttf" if os.is_file("data/horoscope/FortuneCookieNF.ttf"): return else: async with aiohttp.request("GET", url, headers=option) as resp: test = await resp.read() with open("data/horoscope/FortuneCookieNF.ttf", "wb") as f: f.write(test) elif not url.endswith("ttf"): await self.bot.say("This is not a .ttf font, please use a .ttf font. Thanks") return await self.bot.say("Font has been saved")
def _strawpoll(self, ctx, *, question, options=None): """Makes a poll based on questions and choices or options. must be divided by "; " Examples: [p]strawpoll What is this person?; Who is this person?; Where is this person?; When is this person coming? [p]strawpoll What; Who?; Where?; When?; Why?""" options_list = question.split('; ') title = options_list[0] options_list.remove(title) if len(options_list) < 2: await self.bot.say("You need to specify 2 or more options") else: normal = {"title": title, "options": options_list} request = dict(normal, **self.settings) async with aiohttp.request('POST', 'http://strawpoll.me/api/v2/polls', headers={'content-type': 'application/json'}, data=json.dumps(request)) as resp: test = await resp.content.read() test = json.loads(test.decode()) sid = test["id"] await self.bot.say("Here's your strawpoll link: http://strawpoll.me/{}".format(sid))
def handle_request(self, message, payload): url = message.path logger.info('{0} {1}'.format(message.method, url)) if message.method in ('POST', 'PUT', 'PATCH'): data = yield from payload.read() else: data = None message, data = self.intercept_request(message, data) if not message: return response = yield from aiohttp.request(message.method, url, headers=message.headers, data=data) response_content = yield from response.content.read() response, response_content = self.intercept_response(response, response_content) yield from self.response_to_proxy_response(response, response_content)
def get_page(self, _url): ''' ???????? return str ''' header = { 'Accept-Encoding': 'gzip' } header['User-Agent'] = self.ualist[random.randint(0, len(self.ualist)-1)] if opts['user_agent']: header['User-Agent'] = opts['user_agent'] with (yield from semaphore): response = yield from aiohttp.request('GET', _url, headers = header) page = yield from response.read() try: if self.url_type == "2": return "None Content" if self.url_type == "4": return gzip.decompress(page).decode('gb2312').encode('utf-8') else: return gzip.decompress(page) except OSError: return page
def get(self, url, param): data = {} _url = self.req["protocol"] + self.req["host"] + ":" + str(self.req["port"]) + url print(_url +" get?????:"+str(param)) try: response = yield from aiohttp.request("GET", _url, headers=self.req["header"], params=param) string = (yield from response.read()).decode('utf-8') if response.status == 200: data = json.loads(string) else: print("data fetch failed for") print(response.content, response.status) data["status_code"] = response.status print(data) except asyncio.TimeoutError: print("????") except UnicodeDecodeError: print("?????") return data
def post(self,url, param): data = {} _url = self.req["protocol"] + self.req["host"] + ':' + str(self.req["port"]) + url print(_url + " post?????:" + str(param)) requests.post(_url,files=None, data=json.dumps(param), headers=self.req["header"]) response = yield from aiohttp.request('POST', _url, data=json.dumps(param), headers=self.req["header"]) string = (yield from response.read()).decode('utf-8') if response.status == 200: data = json.loads(string) else: print("data fetch failed for") print(response.content, response.status) data["status_code"] = response.status print(data) return data
def ping_coroutine(self, payload=None): try: res = yield from request('get', self._url) if res.status == 200: self.pong_received(payload=payload) res.close() except Exception: self.logger.exception('Error while ping')
def test_aiohttp(self): try: import aiohttp except ImportError: raise SkipTest("Requires aiohttp") from aiohttp import web zmq.asyncio.install() @asyncio.coroutine def echo(request): print(request.path) return web.Response(body=str(request).encode('utf8')) @asyncio.coroutine def server(loop): app = web.Application(loop=loop) app.router.add_route('GET', '/', echo) srv = yield from loop.create_server(app.make_handler(), '127.0.0.1', 8080) print("Server started at http://127.0.0.1:8080") return srv @asyncio.coroutine def client(): push, pull = self.create_bound_pair(zmq.PUSH, zmq.PULL) res = yield from aiohttp.request('GET', 'http://127.0.0.1:8080/') text = yield from res.text() yield from push.send(text.encode('utf8')) rcvd = yield from pull.recv() self.assertEqual(rcvd.decode('utf8'), text) loop = asyncio.get_event_loop() loop.run_until_complete(server(loop)) print("servered") loop.run_until_complete(client())
def callback(self, channel, data): """ Callback method """ # The data is a URL url = data # We return the response immediately print('Fetching URL',url,'...') future = aiohttp.request('GET', url) self.futures.append(future) return future
def fetch (url): r = yield from aiohttp.request ('GET', url) print ('%s %s' % (r.status, r.reason), end = " ") return (yield from r.read ())
def get_flag(base_url, cc): # <2> url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower()) resp = yield from aiohttp.request('GET', url) with contextlib.closing(resp): if resp.status == 200: image = yield from resp.read() return image elif resp.status == 404: raise web.HTTPNotFound() else: raise aiohttp.HttpProcessingError( code=resp.status, message=resp.reason, headers=resp.headers)
def get_flag(base_url, cc): # <2> url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower()) with closing(await aiohttp.request('GET', url)) as resp: if resp.status == 200: image = await resp.read() return image elif resp.status == 404: raise web.HTTPNotFound() else: raise aiohttp.HttpProcessingError( code=resp.status, message=resp.reason, headers=resp.headers)
def get_flag(cc): url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower()) res = yield from aiohttp.request('GET', url) image = yield from res.read() return image
def download(url): response = yield from aiohttp.request('GET', url) for k, v in response.items(): print('{}: {}'.format(k, v[:80])) data = yield from response.read() print('\nReceived {} bytes.\n'.format(len(data)))
def _request(self, url_, method_, params=None, data=None, **kwargs): """Perform request via `aiohttp.request`_. .. _aiohttp.request: http://aiohttp.readthedocs.org/en/v0.9.2/client.html#make-a-request """ response = yield from aiohttp.request(method_.lower(), url_, params=params, data=data, loop=self._loop, **utils.norm_aiohttp_kwargs(**kwargs)) return { 'code': response.status, 'body': (yield from response.text()), 'error': None, # TODO: how errors statues are described in aiohttp? }
def _handle_forwarding(bot, event, command): """Handle message forwarding""" # Test if message forwarding is enabled if not bot.get_config_suboption(event.conv_id, 'forwarding_enabled'): return forward_to_list = bot.get_config_suboption(event.conv_id, 'forward_to') if forward_to_list: logger.debug("{}".format(forward_to_list)) for _conv_id in forward_to_list: html_identity = "<b><a href='https://plus.google.com/u/0/{}/about'>{}</a></b><b>:</b> ".format(event.user_id.chat_id, event.user.full_name) html_message = event.text if not event.conv_event.attachments: yield from bot.coro_send_message( _conv_id, html_identity + html_message ) for link in event.conv_event.attachments: filename = os.path.basename(link) r = yield from aiohttp.request('get', link) raw = yield from r.read() image_data = io.BytesIO(raw) image_id = None try: image_id = yield from bot._client.upload_image(image_data, filename=filename) if not html_message: html_message = "(sent an image)" yield from bot.coro_send_message( _conv_id, html_identity + html_message, image_id=image_id ) except AttributeError: yield from bot.coro_send_message( _conv_id, html_identity + html_message + " " + link )
def image_upload_single(image_uri, bot): logger.info("getting {}".format(image_uri)) filename = os.path.basename(image_uri) r = yield from aiohttp.request('get', image_uri) raw = yield from r.read() image_data = io.BytesIO(raw) image_id = yield from bot._client.upload_image(image_data, filename=filename) return image_id
def handle(request): coroutines = [aiohttp.request('get', url) for url in REQEUST_URLS] results = await asyncio.gather(*coroutines, return_exceptions=True) response_data = { url: not isinstance(result, Exception) and result.status == 200 for url, result in zip(REQEUST_URLS, results) } body = json.dumps(response_data).encode('utf-8') return web.Response(body=body, content_type="application/json")
def get(url, *args, **kwargs): url = url.split('#')[0] with (yield from sem): response = yield from aiohttp.request('GET', url, *args, **kwargs) body = yield from response.read_and_close() body = body.decode('utf-8') print('GET', url) return body
def fetch(self, url, retry=3): '''?? URL?????? HTML ??''' try: start_time = self.loop.time() response = await aiohttp.request('GET', url) time_used = self.loop.time() - start_time except (TimeoutError, aiohttp.ClientResponseError) as e: # ?? if retry > 0: retry -= 1 await asyncio.sleep(1) return await self.fetch(url, retry) else: time_used = self.loop.time() - start_time logger.error('USE %6.3f s STAT: 500 URL: %s (%s)' % (time_used, url, e)) return '' except Exception as e: time_used = self.loop.time() - start_time logger.error('USE %6.3f s STAT: 500 URL: %s (%s)' % (time_used, url, e)) return '' if not (200 <= response.status < 300): logger.error('USE %6.3f s STAT: %s URL: %s' % (time_used, response.status, url)) # ?? html ?????? body = await response.read() try: return body.decode('utf-8') except UnicodeDecodeError: try: return body.decode('gbk') except UnicodeDecodeError: return body
def file_check(self): urls = ["https://images-2.discordapp.net/.eJwNwcENwyAMAMBdGABDCWCyDSKIoCYxwuZVdff27qPWvNSuTpHBO8DRudA8NAvN3Kp" "uRO2qeXTWhW7IIrmcd32EwQbjMCRMaJNxPmwILxcRg_9Da_yWYoQ3dV5z6fE09f0BC6EjAw.B0sII_QLbL9kJo6Zbb4GuO4MQNw", "https://cdn.discordapp.com/attachments/218222973557932032/240223136447070208/FortuneCookieNF.ttf"] option = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0)' ' Gecko/20100101 Firefox/40.1'} if os.path.exists("data/horoscope/cookie.png"): async with aiohttp.request("GET", urls[0], headers=option) as resp: test = await resp.read() meow = False with open("data/horoscope/cookie.png", "rb") as e: if len(test) != len(e.read()): meow = True if meow: with open("data/horoscope/cookie.png", "wb") as f: f.write(test) elif not os.path.exists("data/horoscope/cookie.png"): async with aiohttp.request("GET", urls[0], headers=option) as resp: test = await resp.read() with open("data/horoscope/cookie.png", "wb") as f: f.write(test) if not os.path.exists("data/horoscope/FortuneCookieNF.ttf"): async with aiohttp.request("GET", urls[1], headers=option) as resp: test = await resp.read() with open("data/horoscope/FortuneCookieNF.ttf", "wb") as f: f.write(test)
def add(self, ctx, name, url): """Allows you to add emotes to the emote list [p]emotes add pan http://i.imgur.com/FFRjKBW.gifv""" server = ctx.message.server name = name.lower() option = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 ' '(KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36'} if server.id not in self.servers: # default off self.servers[server.id] = dict({"status": False}) if "emotes" not in self.servers[server.id]: self.servers[server.id]["emotes"] = dict() dataIO.save_json(self.data_path, self.servers) if not url.endswith((".gif", ".gifv", ".png")): await self.bot.say("Links ending in .gif, .png, and .gifv are the only ones accepted." "Please try again with a valid emote link, thanks.") return if name in self.servers[server.id]["emotes"]: await self.bot.say("This keyword already exists, please use another keyword.") return if url.endswith(".gifv"): url = url.replace(".gifv", ".gif") try: await self.bot.say("Downloading {}.".format(name)) async with aiohttp.request("GET", url, headers=option) as r: emote = await r.read() with open(self.emote+"{}.{}".format(name, url[-3:]), 'wb') as f: f.write(emote) await self.bot.say("Adding {} to the list.".format(name)) self.servers[server.id]["emotes"][name] = "{}.{}".format(name, url[-3:]) self.servers[server.id]["emotes"] dataIO.save_json(self.data_path, self.servers) await self.bot.say("{} has been added to the list".format(name)) except Exception as e: print(e) await self.bot.say("It seems your url is not valid," " please make sure you are not typing names with spaces as they are and then the url." " If so, do [p]emotes add name_with_spaces url")
def _quotes(self, ctx, *, author: str): """Retrieves a specified number of quotes from a specified author. Max number of quotes at a time is 5. Examples: [p]quotes Morgan Freeman; 5 [p]quotes Margaret Thatcher; 2""" regex = "title=\"view quote\">([^`]*?)<\/a>" url = 'http://www.brainyquote.com/quotes/authors/' try: author = author.split('; ') title = author[0] number = int(author[1]) if number > 5: number = 5 await self.bot.reply("Heck naw brah. 5 is max. Any more and you get killed.") url = url + title.lower()[0] + "/" + title.lower().replace(" ", "_") + ".html" async with aiohttp.request("GET", url) as resp: test = str(await resp.text()) quote_find = list(set(re.findall(regex, test))) for i in range(number): random_quote = choice(quote_find) quote_find.remove(random_quote) while random_quote == title: random_quote = choice(quote_find) random_quote = random_quote.replace("'", "'") if "'" in random_quote else random_quote await self.bot.say(box(random_quote)) await asyncio.sleep(1) except IndexError: await self.bot.say("Your search is not valid, please follow the examples." "Make sure the names are correctly written\n" "[p]quotes Margaret Thatcher; 5\n[p]quotes Morgan Freeman; 5")
def dadjoke(self): """Gets a random dad joke.""" api = 'https://icanhazdadjoke.com/' async with aiohttp.request('GET', api, headers={'Accept': 'text/plain'}) as r: result = await r.text() await self.bot.say('`' + result + '`')
def typeracer(self, user: str): """Get user stats from typeracer API""" api = 'http://data.typeracer.com/users?id=tr:{}'.format(user) async with aiohttp.request("GET", api) as r: if r.status == 200: result = await r.json() random_colour = int("0x%06x" % random.randint(0, 0xFFFFFF), 16) last_scores = '\n'.join(str(int(x)) for x in result['tstats']['recentScores']) embed = discord.Embed(colour=random_colour) embed.set_author(name=result['name']) embed.add_field(name='Country', value=':flag_{}:'.format(result['country'])) embed.add_field(name='Level', value=result['tstats']['level']) embed.add_field(name='Wins', value=result['tstats']['gamesWon']) embed.add_field(name='Recent WPM', value=int(result['tstats']['recentAvgWpm'])) embed.add_field(name='Average WPM', value=int(result['tstats']['wpm'])) embed.add_field(name='Best WPM', value=int(result['tstats']['bestGameWpm'])) embed.add_field(name='Recent scores', value=last_scores) embed.set_footer(text='typeracer.com') embed.url = 'http://play.typeracer.com/' await self.bot.say(embed=embed) else: await self.bot.say('`Unable to retieve stats for user {}`'.format(user))
def _authenticate(self, data): async with aiohttp.request('GET', '{}/oauth2/profile'.format(settings.OSF['ACCOUNTS_URL']), headers={ 'Authorization': 'Bearer {}'.format(data.get('access_token')) }) as resp: if resp.status != 200: raise exceptions.Unauthorized() return 'user', 'osf', (await resp.json())['id'] # TODO Fuzz test me
def send_remote(self, url, data, headers=None, callback=None): headers = headers or {} if not self.state.should_try(): message = self._get_log_message(data) self.error_logger.error(message) return async def _future(): resp = await aiohttp.request('POST', url, data=data, headers=headers) await resp.release() asyncio.ensure_future(_future())
def http_request(self, http_method: str, url: str, send_json: dict, expected_statuses: List[str]=None) -> Tuple[int, dict, str]: """ Internal use. Method to make PATCH/POST requests to server using requests library. """ self.assert_sync() import requests logger.debug('%s request: %s', http_method.upper(), send_json) expected_statuses = expected_statuses or HttpStatus.ALL_OK response = requests.request(http_method, url, json=send_json, headers={'Content-Type': 'application/vnd.api+json'}, **self._request_kwargs) if response.status_code not in expected_statuses: raise DocumentError(f'Could not {http_method.upper()} ' f'({response.status_code}): ' f'{error_from_response(response)}', errors={'status_code': response.status_code}, response=response, json_data=send_json) return response.status_code, response.json() \ if response.content \ else {}, response.headers.get('Location')
def http_request_async( self, http_method: str, url: str, send_json: dict, expected_statuses: List[str]=None) \ -> Tuple[int, dict, str]: """ Internal use. Async version. Method to make PATCH/POST requests to server using aiohttp library. """ self.assert_async() logger.debug('%s request: %s', http_method.upper(), send_json) expected_statuses = expected_statuses or HttpStatus.ALL_OK content_type = '' if http_method == HttpMethod.DELETE else 'application/vnd.api+json' async with self._aiohttp_session.request( http_method, url, data=json.dumps(send_json), headers={'Content-Type':'application/vnd.api+json'}, **self._request_kwargs) as response: if response.status not in expected_statuses: raise DocumentError(f'Could not {http_method.upper()} ' f'({response.status}): ' f'{error_from_response(response)}', errors={'status_code': response.status}, response=response, json_data=send_json) response_json = await response.json(content_type=content_type) return response.status, response_json or {}, response.headers.get('Location')
def get_body(url): response = await aiohttp.request('GET', url) return await response.read()
def request(self, method, url, **kwargs): """Perform HTTP request.""" return _RequestContextManager(self._request(method, url, **kwargs))
def get(self, url, *, allow_redirects=True, **kwargs): """Perform HTTP GET request.""" return _RequestContextManager( self._request(hdrs.METH_GET, url, allow_redirects=allow_redirects, **kwargs))
def options(self, url, *, allow_redirects=True, **kwargs): """Perform HTTP OPTIONS request.""" return _RequestContextManager( self._request(hdrs.METH_OPTIONS, url, allow_redirects=allow_redirects, **kwargs))
def head(self, url, *, allow_redirects=False, **kwargs): """Perform HTTP HEAD request.""" return _RequestContextManager( self._request(hdrs.METH_HEAD, url, allow_redirects=allow_redirects, **kwargs))
def post(self, url, *, data=None, **kwargs): """Perform HTTP POST request.""" return _RequestContextManager( self._request(hdrs.METH_POST, url, data=data, **kwargs))
def patch(self, url, *, data=None, **kwargs): """Perform HTTP PATCH request.""" return _RequestContextManager( self._request(hdrs.METH_PATCH, url, data=data, **kwargs))
def delete(self, url, **kwargs): """Perform HTTP DELETE request.""" return _RequestContextManager( self._request(hdrs.METH_DELETE, url, **kwargs))
def get(url, **kwargs): warnings.warn("Use ClientSession().get() instead", DeprecationWarning) return request(hdrs.METH_GET, url, **kwargs)