我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用aiohttp.ClientSession()。
def __init__(self, *args, debug=False, **kwargs): self._debug = debug self.game = config.game game = discord.Game(name=self.game) status = discord.Status.dnd if self._debug else discord.Status.online super().__init__(*args, command_prefix=command_prefix, game=game, status=status, **kwargs) self._before_invoke = self._before_invoke_ self._after_invoke = self._after_invoke_ self.resumes = 0 useragent = 'Discord Bot' source = config.source if source is not None: useragent += ' ' + source self.http_ = aiohttp.ClientSession(loop=self.loop, headers={'User-Agent': useragent}) self.db_pool = self.loop.run_until_complete( asyncpg.create_pool(dsn=config.pg_dsn, command_timeout=10, loop=self.loop))
def credential_verfication(self, username, password): auth = aiohttp.BasicAuth(login=username, password=password) url = "https://myanimelist.net/api/account/verify_credentials.xml" with aiohttp.ClientSession(auth=auth) as session: async with session.get(url) as response: status = response.status if status == 200: return True if status == 401: await self.bot.say("Username and Password is incorrect.") return False if status == 403: await self.bot.say("Too many failed login attempts. Try putting in the" "correct credentials after some time has passed.") return False
def check(self, first_check=False): from pyplanet import __version__ as current_version logging.debug('Checking for new versions...') async with aiohttp.ClientSession() as session: async with session.get(self.url) as resp: for release in await resp.json(): if not release['draft'] and not release['prerelease']: self.latest = release['tag_name'] break self.current = current_version logging.debug('Version check, your version: {}, online version: {}'.format(self.current, self.latest)) if first_check and self.update_available: logging.info('New version of PyPlanet available, consider updating: {}'.format(self.latest)) await self.instance.chat( '\uf1e6 $FD4$oPy$369Planet$z$s$fff \uf0e7 new version available: v{}. Consider updating!'.format(self.latest) )
def start(self, instance): """ Initiate the analytics. :param instance: Instance of controller. :type instance: pyplanet.core.instance.Instance """ self.instance = instance self.client = aiohttp.ClientSession() try: await self.capture('start_controller', dict( app_labels=list(self.instance.apps.apps.keys()) )) except: pass # Start report loop asyncio.ensure_future(self.loop())
def __init__(self, client=None, download_strategy=None, request_strategy=None): if not client: # Get the event loop and initialize a client session if not provided self.loop = asyncio.get_event_loop() self.client = aiohttp.ClientSession(loop=self.loop) else: # Or grab the event loop from the client session self.loop = client._loop self.client = client # Configuration objects managing download and request strategies self._download_strategy = download_strategy or DownloadStrategy() # chunk_size, home, skip_cached self._request_strategy = request_strategy or Lenient() # concurrent, max_attempts, timeout # Bounded semaphore guards how many requests can run concurrently self._main_semaphore = asyncio.BoundedSemaphore(self._request_strategy.concurrent)
def oauth2(code): url = 'https://api.weibo.com/oauth2/access_token' payload = { 'client_id': '366603916', 'client_secret': 'b418efbd77094585d0a7f9ccac98a706', 'grant_type': 'authorization_code', 'code': code, 'redirect_uri': 'http://www.qiangtaoli.com' } with ClientSession() as session: async with session.post(url, data=payload) as resp: params = await resp.json() async with session.get('https://api.weibo.com/2/users/show.json', params=params) as resp: info = await resp.json() o = await Oauth.find('weibo-' + info['idstr']) if not o: return 'redirect:/bootstrap/register?oid=weibo-%s&name=%s&image=%s' % (info['idstr'], info['name'], info['avatar_large']) user = await User.find(o.user_id) if not user: return 'oauth user was deleted.' return user.signin(web.HTTPFound('/')) # ????
def connect_websocket(self, sess=None): ''' Creates a WebSocket connection. ''' assert self.method == 'GET' if sess is None: sess = aiohttp.ClientSession() else: assert isinstance(sess, aiohttp.ClientSession) try: ws = await sess.ws_connect(self.build_url(), headers=self.headers) return sess, ws except Exception as e: msg = 'Request to the API endpoint has failed.\n' \ 'Check your network connection and/or the server status.' raise BackendClientError(msg) from e
def test_asend_with_appropriate_method(mocker, req_params): req = Request(**req_params) methods = Request._allowed_methods for method in methods: req.method = method mock_reqfunc = mocker.patch.object( aiohttp.ClientSession, method.lower(), autospec=True) assert mock_reqfunc.call_count == 0 try: # Ignore exceptions in `async with` statement. We're only # interested in request call here. await req.asend() except BackendClientError: pass mock_reqfunc.assert_called_once_with( mocker.ANY, req.build_url(), data=req._content, headers=req.headers)
def test_asend_returns_appropriate_sorna_response(mocker, req_params, mock_sorna_aresp): req = Request(**req_params) methods = Request._allowed_methods for method in methods: req.method = method mock_reqfunc = mocker.patch.object( aiohttp.ClientSession, method.lower(), new_callable=asynctest.CoroutineMock ) mock_reqfunc.return_value, conf = mock_sorna_aresp resp = await req.asend() assert isinstance(resp, Response) assert resp.status == conf['status'] assert resp.reason == conf['reason'] assert resp.content_type == conf['content_type'] body = await conf['read']() assert resp.content_length == len(body) assert resp.text() == body.decode() assert resp.json() == json.loads(body.decode())
def run_scraping(url, timestamps, scrape_function, concurrency, user_agent): """ Run the scraping function asynchronously on the given archives. The concurrency parameter limits the number of concurrent connections to the web archive. """ # Use a semaphore to limit the number of concurrent connections to the internet archive sem = asyncio.Semaphore(concurrency) # Use one session to benefit from connection pooling async with aiohttp.ClientSession(headers={'User-Agent': user_agent}) as session: # Create scraping coroutines for each archive coroutines = [scrape_archive(session, url, timestamp, scrape_function, sem) for timestamp in timestamps] # Wait for coroutines to finish and gather the results results = await asyncio.gather(*coroutines) # Compile each valid scraping results in a dictionary return {timestamp: result for timestamp, result in results if result is not None}
def aget(self, **kw): session = aiohttp.ClientSession() url = URL(self.url) if kw: url = url.with_query(**kw) logger.debug("GET %s", url) try: response = yield from session.get(url, timeout=10) payload = yield from response.read() finally: yield from session.close() response.raise_for_status() payload = payload.decode('utf-8') if response.content_type == 'text/x-python': payload = ast.literal_eval(payload) return Payload.factory(response.status, response.headers, payload)
def apost(self, headers={}, data=None, **kw): session = aiohttp.ClientSession() url = URL(self.url) if kw: url = url.with_query(**kw) logger.debug("POST %s", url) try: response = yield from session.post( url, headers=headers, data=data, timeout=10, ) payload = yield from response.read() finally: yield from session.close() response.raise_for_status() payload = payload.decode('utf-8') return Payload.factory(response.status, response.headers, payload)
def process(self, event_fn): logger = self.get_logger() ts = time() state = None async with aiohttp.ClientSession() as session: async with session.get(self.url) as resp: state = self.translate_status(resp.status) te = time() span = int((te - ts) * 1000) logger.debug("Request to {url} returned status code {code}(as {state})" "in {span} milliseconds.".format(url=self.url, code=resp.status, state=state, span=span)) event_fn(service=self.prefix + "health", metric_f=span, state=str(state), description=self.url)
def __init__(self, auth_url, username, tenant, loop=None, log=None, cafile=None, token_renew_delay=3300): self.auth_url = auth_url self.username = username self.tenant = tenant self.log = log self.token_renew_delay = token_renew_delay self.loop = loop or asyncio.get_event_loop() self.headers = {"content-type": "application/json", "accept": "application/json"} if cafile: sslcontext = ssl.create_default_context(cafile=cafile) conn = aiohttp.TCPConnector(ssl_context=sslcontext) self.session = aiohttp.ClientSession(connector=conn, loop=self.loop) else: session = aiohttp.ClientSession(loop=self.loop)
def send_sms(recipients: Iterable[str], msg: str, username: str, api_key: str, sender: str): data = { 'messages': [], } # type: Dict[str, List] for recipient in recipients: data['messages'].append({ 'source': 'python', 'from': sender, 'body': msg[:140], 'to': recipient, 'schedule': '' }) try: async with aiohttp.ClientSession(headers={'Content-Type': 'application/json'}, auth=aiohttp.BasicAuth(username, api_key)) as session: async with session.post(CLICKSEND_URL, data=json.dumps(data), timeout=30) as resp: if resp.status != 200: log.msg('Error sending clicksend sms notification: http status %s' % (str(resp.status)), 'NOTIFICATION') except aiohttp.ClientError as e: log.msg('Error sending clicksend sms notification: %s' % (str(e)), 'NOTIFICATIONS')
def get_user_series(self, username: str, series_type: str): """ :param username: The name of the accounts information you're trying to get :param series_type: If you're looking for manga or anime :return type list: """ params = { "u": username, "status": 'all', "type": series_type } if series_type not in ("anime", "manga"): raise InvalidSeriesTypeException else: with aiohttp.ClientSession(auth=self._auth, headers={"User-Agent": self.user_agent}) as session: async with session.get(MAL_APP_INFO, params=params) as response: # Raise an error if we get the wrong response code if response.status != 200: raise ResponseError(response.status) # Get the response text and set parser soup = bs4.BeautifulSoup(await response.text(), "lxml") return [dict(self.process_(child) for child in anime.children) for anime in soup.find_all(series_type)] # End of bit Zeta wrote
def get_user_data(self, user: str) -> UserInfo: """ :param user: username who's information we're getting :return type list: """ # List that stores all the UserInfo Objects to return with aiohttp.ClientSession(auth=self._auth, headers={"User-Agent": self.user_agent}) as session: async with session.get(MAL_APP_INFO, params={"u": user}) as response: # Raise an error if we get the wrong response code if response.status != 200: raise ResponseError(response.status) response_data = await response.read() # We want the [0] index as myanimelist always returns the user data first user_info = etree.fromstring(response_data)[0] # Add to list containing UserInfo objects return UserInfo( id=user_info.find("user_id").text, username=user_info.find("user_name").text, watching=user_info.find("user_watching").text, completed=user_info.find("user_completed").text, on_hold=user_info.find("user_onhold").text, dropped=user_info.find("user_dropped").text, plan_to_watch=user_info.find("user_plantowatch").text, days_spent_watching=user_info.find("user_days_spent_watching").text )
def __init__(self, sparcli): self.sparcli = sparcli self.urbanSite = 'https://api.urbandictionary.com/v0/define?term={}' self.wolfClient = None self.nounlist = [] # Set up Wolfram if wolframalphaImported == True: try: tokens = getTokens() secret = tokens['WolframAlpha']['Secret'] self.wolfClient = Client(secret) except KeyError: pass # Set up noun list self.nounlist = [] self.session = ClientSession(loop=sparcli.loop)
def read_stream(app_name, auth_token): while True: stream_url = yield from get_stream_url(app_name, auth_token) print('Reading stream: %s' % stream_url) log = b'' with aiohttp.ClientSession() as session: response = yield from session.get(stream_url) while True: try: chunk = yield from response.content.read(1) except aiohttp.ServerDisconnectedError: break if not chunk: break if chunk == b'\n': try: yield from write_to_queue(log) except ValueError as e: print(str(e)) log = b'' else: log += chunk
def test_request_proxy(dummy_client): class RaiseProxy: def __init__(self, *args, proxy=None, **kwargs): raise RuntimeError(proxy) async with aiohttp.ClientSession() as session: with patch.object(session, 'request', side_effect=RaiseProxy): try: await dummy_client.request(method='get', url="http://hello.com", proxy="http://some.proxy.com", session=session, future=asyncio.Future()) except RuntimeError as e: assert str(e) == "http://some.proxy.com"
def test_stream_cancel(event_loop): async def cancel(task): await asyncio.sleep(0.001) task.cancel() async def test_stream_iterations(stream): while True: await test_stream_iteration(stream) with aiohttp.ClientSession(loop=event_loop) as session: client = peony.client.BasePeonyClient("", "", session=session) context = peony.stream.StreamResponse(method='GET', url="http://whatever.com", client=client) with context as stream: with patch.object(stream, '_connect', side_effect=stream_content): coro = test_stream_iterations(stream) task = event_loop.create_task(coro) cancel_task = event_loop.create_task(cancel(task)) with aiohttp.Timeout(1): await asyncio.wait([task, cancel_task])
def _setup(self): if self._session is None: logger.debug("Creating session") self._session = aiohttp.ClientSession() # this will allow requests to be made starting from this point self.setup.early.set_result(True) init_tasks = self.init_tasks if callable(init_tasks): init_tasks = init_tasks() if init_tasks: logger.debug("Starting init tasks") await asyncio.gather(*init_tasks) self.setup.set_result(True)
def on_guild_join(server): server_count = len(bot.guilds) member_count = 0 for server in bot.guilds: for member in server.members: member_count += 1 await bot.change_presence(game=discord.Game(name=bot.command_prefix[0]+"help | {} guilds with {} members.".format(server_count, member_count))) webhook.send(':tada: [`'+str(datetime.datetime.now().strftime("%d/%m/%y %H:%M:%S"))+'`] I joined the server `' + server.name + '` ('+ str(server.id) + '), owned by `' + server.owner.name + '#' + server.owner.discriminator + '` (' + str(server.owner.id) + ').') guild_count = len(bot.guilds) headers = {'Authorization': config['Main']['dbotstoken']} data = {'server_count': guild_count} api_url = 'https://discordbots.org/api/bots/311810096336470017/stats' async with aiohttp.ClientSession() as session: await session.post(api_url, data=data, headers=headers) # server leave
def test_auth_with_valid_data(self): s = TestAuthSession(login=USER_LOGIN, password=USER_PASSWORD, app_id=APP_ID) s.driver.session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False), response_class=CustomClientResponse) yield from s.authorize() params = {'client_id': APP_ID, 'display': 'page', 'redirect_uri': REDIRECT_URI, 'response_type': 'code'} with aiohttp.Timeout(10): response = yield from s.driver.session.get("https://oauth.vk.com/authorize", params=params, allow_redirects=True) s.close() code = response.url.query.get('code') self.assertIsNotNone(code) s = AuthorizationCodeSession(APP_ID, APP_SECRET, REDIRECT_URI, code) yield from s.authorize() s.close() self.assertIsNotNone(s.access_token)
def set(self, ctx, lamp, state): """ Set lamp state. Changes state of [lamp] to [state] and sends server response """ tmp = await self.bot.send_message(ctx.message.channel, "Requesting") payload = {'lamp': lamp, 'state': state, 'user_agent': 'AutomaBot'} r = "" try: async with aiohttp.ClientSession() as session: async with session.post(self.url_post, data=payload) as resp: r = await resp.text() except aiohttp.errors.ClientOSError: raise APIconnectionError() embed = make_embed_message("*Result!*", json.loads(r), self.bot, ctx.message) await self.bot.edit_message(tmp, new_content='Right now : ', embed=embed)
def subrequest( orig_request, path, relative_to_site=True, headers={}, body=None, params=None, method='GET'): """Subrequest, initial implementation doing a real request.""" session = aiohttp.ClientSession() method = method.lower() if method not in SUBREQUEST_METHODS: raise AttributeError('No valid method ' + method) caller = getattr(session, method) for head in orig_request.headers: if head not in headers: headers[head] = orig_request.headers[head] params = { 'headers': headers, 'params': params } if method in ['put', 'patch']: params['data'] = body return caller(path, **params)
def main(): """Scriptworker entry point: get everything set up, then enter the main loop.""" context, credentials = get_context_from_cmdln(sys.argv[1:]) log.info("Scriptworker starting up at {} UTC".format(arrow.utcnow().format())) cleanup(context) conn = aiohttp.TCPConnector(limit=context.config['aiohttp_max_connections']) loop = asyncio.get_event_loop() with aiohttp.ClientSession(connector=conn) as session: context.session = session context.credentials = credentials while True: try: loop.run_until_complete(async_main(context)) except Exception: log.critical("Fatal exception", exc_info=1) raise
def fetch(self, point, key=conf.GOOGLE_MAPS_KEY): if not key: return self.fallback() try: async with ClientSession(loop=LOOP) as session: async with session.get( 'https://maps.googleapis.com/maps/api/elevation/json', params={'locations': '{0[0]},{0[1]}'.format(point), 'key': key}, timeout=10) as resp: response = await resp.json(loads=json_loads) altitude = response['results'][0]['elevation'] self.altitudes[point] = altitude self.changed = True return altitude except CancelledError: raise except Exception: try: self.log.error(response['error_message']) except (KeyError, NameError): self.log.error('Error fetching altitude for {}.', point) return self.fallback()
def __awaitable__(self): if self._data is None: with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=self.api.verify_ssl)) as session: wait_time = self._wait_time() if wait_time is None and self.api: try: await self._make_async_request(session) except ServiceUnavailableException: await asyncio.sleep(60) self._wait_time() await self._make_async_request(session) else: await asyncio.sleep(wait_time) await self._make_async_request(session) return self
def sfp(self, ctx, user: discord.Member=None): """Super Falcon Punch""" author = ctx.message.author if not user: await self.bot.say("{} has Super Falcon Punched!".format(author.mention)) with aiohttp.ClientSession() as session: async with session.get("https://cdn.discordapp.com/attachments/172354611477348352/193299243539234817/imgres.jpg") as resp: test = await resp.read() with open("data/commands/Images/imgres.png", "wb") as f: f.write(test) await self.bot.upload("data/commands/Images/imgres.png") else: await self.bot.say("{} has Super Falcon Punched {} and blew him away!".format(author.mention, user.mention)) with aiohttp.ClientSession() as session: async with session.get("https://cdn.discordapp.com/attachments/172354611477348352/193299243539234817/imgres.jpg") as resp: test = await resp.read() with open("data/commands/Images/imgres.png", "wb") as f: f.write(test) await self.bot.upload("data/commands/Images/imgres.png")
def slash(self, ctx, user: discord.Member = None): """Slash a fool!""" author = ctx.message.author if user: with aiohttp.ClientSession() as session: async with session.get("https://cdn.discordapp.com/attachments/175246808967151616/217342324919894017/7e469f8443630de4f8cedb1c87b161d3.jpg") as resp: test = await resp.read() with open("data/commands/Images/imgres.png", "wb") as f: f.write(test) await self.bot.say("{} just slashed {}!\nFOOL!".format(author.mention, user.mention)) await self.bot.upload("data/commands/Images/imgres.png") else: with aiohttp.ClientSession() as session: async with session.get("https://cdn.discordapp.com/attachments/175246808967151616/217342324919894017/7e469f8443630de4f8cedb1c87b161d3.jpg") as resp: test = await resp.read() with open("data/commands/Images/imgres.png", "wb") as f: f.write(test) await self.bot.say("{} just slashed someone. Who knows, he forgot the darn mention.".format(author.mention)) await self.bot.upload("data/commands/Images/imgres.png")
def suckmydonut(self, ctx, user : discord.Member = None): """Suck My Donuts Beeyatch!""" author = ctx.message.author if not user: with aiohttp.ClientSession() as session: async with session.get("http://owned.com/media/_cache/adjusted/postblock/image/4/6/7/2/4672.jpg.png") as resp: test = await resp.read() with open("data/commands/Images/imgres.png", "wb") as f: f.write(test) await self.bot.say("{} says to:".format(author.mention)) await self.bot.upload("data/commands/Images/imgres.png") else: with aiohttp.ClientSession() as session: async with session.get("http://owned.com/media/_cache/adjusted/postblock/image/4/6/7/2/4672.jpg.png") as resp: test = await resp.read() with open("data/commands/Images/imgres.png", "wb") as f: f.write(test) await self.bot.say("{} tells {} to:".format(author.mention, user.mention)) await self.bot.upload("data/commands/Images/imgres.png")
def spank(self, ctx, user : discord.Member = None): """Spank""" author = ctx.message.author if not user: with aiohttp.ClientSession() as session: async with session.get("https://images-ext-1.discordapp.net/eyJ1cmwiOiJodHRwczovL2Nkbi5kaXNjb3JkYXBwLmNvbS9hdHRhY2htZW50cy8xMDc5NDI2NTIyNzU2MDE0MDgvMTA3OTQ1MDg3MzUwMDc5NDg4L1R1SEdKLmdpZiJ9.-XeFHSFOR0nv53M34HeUBqQc7Wc.gif") as resp: test = await resp.read() with open("data/commands/Images/imgres.gif", "wb") as f: f.write(test) await self.bot.say("{} spanked someone! :scream:".format(author.mention)) await self.bot.upload("data/commands/Images/imgres.gif") else: with aiohttp.ClientSession() as session: async with session.get("https://images-ext-1.discordapp.net/eyJ1cmwiOiJodHRwczovL2Nkbi5kaXNjb3JkYXBwLmNvbS9hdHRhY2htZW50cy8xMDc5NDI2NTIyNzU2MDE0MDgvMTA3OTQ1MDg3MzUwMDc5NDg4L1R1SEdKLmdpZiJ9.-XeFHSFOR0nv53M34HeUBqQc7Wc.gif") as resp: test = await resp.read() with open("data/commands/Images/imgres.gif", "wb") as f: f.write(test) await self.bot.say("{} spanked {}! :scream:".format(author.mention, user.mention)) await self.bot.upload("data/commands/Images/imgres.gif")
def _listen(self): """Listen to the WebSocket URL.""" with ClientSession() as session: ws = yield from session._ws_connect(self.rtm['url']) self.ws = ws try: while True: msg = yield from ws.receive() if msg.tp == MsgType.close: break assert msg.tp == MsgType.text message = json.loads(msg.data) yield from self.queue.put(message) finally: yield from ws.close()
def make_request(self, method, path, params=None, data=None, authenticated=True, auth_type='Basic', headers={}, token=testing.ADMIN_TOKEN, accept='application/json'): settings = {} headers = headers.copy() settings['headers'] = headers if accept is not None: settings['headers']['ACCEPT'] = accept if authenticated and token is not None: settings['headers']['AUTHORIZATION'] = '{} {}'.format( auth_type, token) settings['params'] = params settings['data'] = data async with aiohttp.ClientSession(loop=self.loop) as session: operation = getattr(session, method.lower(), None) async with operation(self.server.make_url(path), **settings) as resp: try: value = await resp.json() status = resp.status except: # noqa value = await resp.read() status = resp.status return value, status, resp.headers
def get_packages_info(max_pkgs=MAX_PKGS, start_time=None): await asyncio.sleep(1) # ensure the server is highly responsive on bootup fmt = 'Gathering Python 3 support info on the top {:,} PyPI packages...' print(fmt.format(max_pkgs)) start_time = start_time or time.time() packages = [] with aiohttp.ClientSession() as session: tasks = create_tasks(session, max_pkgs) while tasks: current_block, tasks = tasks[:200], tasks[200:] packages += await asyncio.gather(*current_block) if len(packages) == 200: html = create_index(packages).splitlines() html = '\n'.join(line.rstrip() for line in html if line.strip()) with open('index.html', 'w') as out_file: out_file.write(html) print('index.html written with {:,} packages after {:.2f} ' 'seconds.'.format(len(packages), time.time() - start_time)) return enhance_packages(packages), datetime.datetime.utcnow()
def get_remote_data(self, url): async with aiohttp.ClientSession() as session: async with session.get(url) as resp: if resp.status != 200: raise web.HTTPBadGateway session.close() try: return await resp.json() or '' except: raise web.HTTPBadGateway
def create_session(self): self.session = await aiohttp.ClientSession( cookie_jar=self.cookie_jar, headers={ 'User-Agent': 'PyPlanet/{}'.format(pyplanet_version), 'X-ManiaPlanet-ServerLogin': self.server_login } ).__aenter__()
def test_aiodownload_init_client(): download = AioDownload(client=ClientSession()) assert isinstance(download.client, ClientSession)
def __init__(self, username, secret, uri=EMARSYS_URI): super().__init__(username, secret, uri) self.session = aiohttp.ClientSession()
def get(self, resource): url = self._uri + resource logger.debug("REST.Get> {}".format(url)) with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as response: self._validate(url, response) response = await response.json() logger.debug("REST.Resp> Response: {}".format(response)) return response
def post(self, resource, params={}): url = self._uri + resource logger.debug("REST.Post> {}".format(url)) with aiohttp.ClientSession() as session: if self._need_auth: params.update(self._auth.get_params()) async with session.post(url, data=params) as response: self._validate(url, response) response = await response.json() logger.debug("REST.Resp> {}".format(response)) return response
def shorten(self, ctx, url): """Shorten url""" key = self.loadapi["ApiKey"] shorten = 'https://www.googleapis.com/urlshortener/v1/url?key=' + key payload = {"longUrl": url} headers = {'content-type': 'application/json'} async with aiohttp.ClientSession() as session: async with session.post(shorten,data=json.dumps(payload),headers=headers) as resp: print(resp.status) yes = await resp.json() await self.bot.say(yes['id'])
def expand(self, ctx, url): """Expand goo.gl url""" key = self.loadapi["ApiKey"] async with aiohttp.ClientSession() as session: async with session.get('https://www.googleapis.com/urlshortener/v1/url?key=' + key + '&shortUrl=' + url) as resp: print(resp.status) yes = await resp.json() await self.bot.say(yes['longUrl'])
def analytics(self, ctx, url): """Analytics for url""" key = self.loadapi["ApiKey"] async with aiohttp.ClientSession() as session: async with session.get('https://www.googleapis.com/urlshortener/v1/url?key=' + key + '&shortUrl=' + url + '&projection=FULL') as resp: print(resp.status) yes = await resp.json() embed = discord.Embed(colour=discord.Colour.blue()) embed.add_field(name="**Shortened Url:**",value=yes['id']) embed.add_field(name="**Long Url:**",value=yes['longUrl']) embed.add_field(name="**Date Created:**",value=yes['created']) embed.add_field(name="**Clicks:**",value=yes['analytics']['allTime']['shortUrlClicks']) embed.set_image(url="https://www.ostraining.com/cdn/images/coding/google-url-shortener-tool.jpg") await self.bot.say(embed=embed)
def __init__(self, bot): self.bot = bot self.session = aiohttp.ClientSession(loop=self.bot.loop)
def alias(self, ctx, alias): """Create D.io link""" key = self.loadapi["ApiKey"] inv = await self.bot.create_invite(ctx.message.server, unique=False) async with aiohttp.ClientSession() as session: async with session.get('https://discord.io/api?api={0}&url={1}'.format(key, inv) + "&custom=" + alias + '&format=text') as resp: print(resp.status) try: await self.bot.say(await resp.text()) except discord.errors.HTTPException: await self.bot.say("Link already exists, or the alias is reserved for Pro users.")
def fetch_user_mal(self, name, url, cmd): with aiohttp.ClientSession() as session: async with session.get(url.format(name)) as response: data = await response.text() try: root = ET.fromstring(data) except ET.ParseError: return '', '' else: if len(root) == 0: return '', '' collection = {x.find('series_title').text for x in root.findall(cmd)} entry = root.find('myinfo') if cmd == "anime": info = [entry.find(x).text for x in ['user_watching', 'user_completed', 'user_onhold', 'user_dropped', 'user_days_spent_watching']] return collection, info else: info = [entry.find(x).text for x in ['user_reading', 'user_completed', 'user_onhold', 'user_dropped', 'user_days_spent_watching']] return collection, info
def get_xml(self, nature, name): username = self.credentials["Username"] password = self.credentials["Password"] name = name.replace(" ", "_") auth = aiohttp.BasicAuth(login=username, password=password) url = 'https://myanimelist.net/api/{}/search.xml?q={}'.format(nature, name) with aiohttp.ClientSession(auth=auth) as session: async with session.get(url) as response: data = await response.text() return data
def _location_pokemon(self, *, poke: str): """Get a Pokémon's catch location. Example !pokedex location voltorb """ location_url = "http://pokemondb.net/pokedex/{}".format(poke) with aiohttp.ClientSession() as session: async with session.get(location_url) as response: soup = BeautifulSoup(await response.text(), "html.parser") loc = [] version = [] div = soup.find('div', attrs={'class': 'col desk-span-7 lap-span-12'}) table = div.find('table', attrs={'class': 'vitals-table'}) cols = [ele.text.strip() for ele in table.find_all('td') if ele] loc.append(cols) tcols = [ele.strings for ele in table.find_all('th') if ele] version.append([', '.join(x) for x in tcols]) # We have to extract out the base index, because it scrapes as # a list of a list. Then we can stack and tabulate. extract_loc = loc[0] extract_version = version[0] m = list(zip(extract_version, extract_loc)) t = tabulate(m, headers=["Game Version", "Location"]) await self.bot.say("```{}```".format(t))