我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用aiohttp.get()。
def _online_tibia(self): """Get total players playing""" url = "http://www.tibia.com/community/?subtopic=worlds" try: async with aiohttp.get(url) as response: soup = BeautifulSoup(await response.text(), "html.parser") div1 = soup.find('div', attrs={'id': 'RightArtwork'}) div2 = div1.find('div', attrs={'id': 'PlayersOnline'}) test = div2.get_text() test1 = test.replace("Players Online", "") new = "Players currently playing Tibia: " + test1 # div2 = div1.find('div', attrs={'class': 'Border_2'}) # div3 = div2.find('div', attrs={'class': 'Border_3'}) # table = div3.find_all('table', attrs={'class': 'Table1'}) # tr = table.find_all('tr') # tbody = div4.find('div', attrs={'class': 'CaptionInnerContainer'}) await self.bot.say(str(new)) except: await self.bot.say("Could not retrive data. The webserver may be offline.")
def _server_tibia(self, servername): """Get Server Info""" servername = servername.title() url = "https://secure.tibia.com/community/?subtopic=worlds&world=" + str(servername) try: async with aiohttp.get(url) as response: soup = BeautifulSoup(await response.text(), "html5lib") b = soup.find_all("table", attrs={'class': 'Table1'}) new = [] rows = b[1].tbody.div.find_all('td') for row in rows: new.append(row.get_text()) k = new[::2] l = new[1::2] zipped = list(zip(k, l)) t = tabulate(zipped, headers=["Category", "Info"]) await self.bot.say("```Python" + "\n" + str(t) + "```") except: await self.bot.say("Unable to retrive server data. The webserver may be offline.")
def add_smug(self, ctx, path): allowed_content = {'image/jpeg': 'jpg', 'image/png': 'png', 'image/gif': 'gif'} if not global_methods.is_admin(ctx.message.author): await self.bot.say("You're not a big boy") async with aiohttp.get(path) as r: if r.status == 200: file = await r.content.read() type = r.headers['Content-Type'] if type not in allowed_content: await self.bot.say("That kind of file is not allowed") return else: hash = hashlib.md5(file).hexdigest() filename = "smug-anime-faces/{}.{}".format(hash, allowed_content[type]) with open(filename, 'wb') as f: f.write(file) await self.bot.say("Smugness levels increased")
def suggest(self, ctx, *, suggestion : str): """Sends a suggestion to the owner.""" if settings.owner == "id_here": await self.bot.say("I have no owner set, cannot suggest.") return owner = discord.utils.get(self.bot.get_all_members(), id=settings.owner) author = ctx.message.author if ctx.message.channel.is_private is False: server = ctx.message.server source = "server **{}** ({})".format(server.name, server.id) else: source = "direct message" sender = "**{}** ({}) sent you a suggestion from {}:\n\n".format(author, author.id, source) message = sender + suggestion try: await self.bot.send_message(owner, message) except discord.errors.InvalidArgument: await self.bot.say("I cannot send your message, I'm unable to find" " my owner... *sigh*") except discord.errors.HTTPException: await self.bot.say("Your message is too long.") except: await self.bot.say("I'm unable to deliver your message. Sorry.") else: await self.bot.say("Your message has been sent.")
def bugreport(self, ctx, *, bug:str): """Report a bug in the bot.""" if settings.owner == "id_here": await self.bot.say("I have no owner set, cannot report the bug.") return owner = discord.utils.get(self.bot.get_all_members(), id=settings.owner) author = ctx.message.author if ctx.message.channel.is_private is False: server = ctx.message.server source = "server **{}** ({})".format(server.name, server.id) else: source = "direct message" sender = "**{0}** ({0.id}) sent you a bug report from {1}:\n\n".format(author, source) message = sender + bug try: await self.bot.send_message(owner, message) except discord.errors.InvalidArgument: await self.bot.say("I cannot send your bug report, I'm unable to find my owner... *sigh*") except discord.errors.HTTPException: await self.bot.say("Your bug report is too long.") except: await self.bot.say("I'm unable to deliver your bug report. Sorry.") else: await self.bot.say("Your bug report has been sent.")
def list(self): """List all available currency codes.""" request = requests.get("http://free.currencyconverterapi.com/api/v3/currencies") msg = "```Name\t\t\t\t\t\t\tCode\tSymbol\n\n" request = json.loads(request.content.decode("utf-8")) for currencycode in request['results']: if 'currencySymbol' in request['results'][currencycode]: if len(request['results'][currencycode]['currencyName']) > 26: request['results'][currencycode]['currencyName'] = request['results'][currencycode]['currencyName'][:26] + "..." msg += "{}{}".format(request['results'][currencycode]['currencyName'], " " * (32 - len(request['results'][currencycode]['currencyName']))) msg += "{}{}".format(request['results'][currencycode]['id'], " " * 5) msg += "{}\n".format(request['results'][currencycode]['currencySymbol']) else: msg += "{}{}".format(request['results'][currencycode]['currencyName'], " " * (32 - len(request['results'][currencycode]['currencyName']))) msg += "{}\n".format(request['results'][currencycode]['id']) if len(msg) > 1750: await self.bot.say(msg + "```") msg = "```"
def _join(self, ctx, *, call_line: str): """Joins/creates a call-line between channels""" call_line = call_line.lower() if call_line == 'random': if len(self.call_lines) != 0: findcall = random.choice(list(self.call_lines)) await self.call_lines[findcall].addChannel(ctx.message.channel) else: await self.bot.say("There are no call-lines open! You can make your own call-line with `&call join <name>`") else: findcall = self.call_lines.get(call_line) if findcall is None: self.call_lines[call_line] = CallLine( self.bot, ctx.message, call_line) await self.bot.say("Connected to call-line `{}`".format(call_line)) else: await findcall.addChannel(ctx.message.channel)
def minecraft(message, client): r = lambda: random.randint(0,255) rr = ('0x%02X%02X%02X' % (r(),r(),r())) loop = asyncio.get_event_loop() mc_server = message.content.replace(message.content.split()[0] + ' ', '') async with aiohttp.get('https://mcapi.us/server/status?ip=' + mc_server) as mcr: if mcr.status == 200: js = await mcr.json() mc_details = discord.Embed(title='', description='', colour=int(rr, 16)) if js['server']['name'] != '': mc_details.add_field(name='Server Version: ', value=js['server']['name']) if js['online'] == 'True': mc_details.add_field(name='Server Online:', value=':thumbsup:') elif js['online'] == 'False': mc_details.add_field(name='Server Online:', value=':thumbsdown:') mc_details.add_field(name='Players:', value=str(js['players']['now']) + '/' + str(js['players']['max'])) if js['motd'] != '': mc_details.add_field(name='Description:', value=js['motd'].replace('§', '')) await client.send_message(message.channel, embed=mc_details) else: await client.send_message(message.channel, 'Something went wrong with the API! :scream:')
def __init__(self, bot): self.bot = bot #self.loop = AbstractEventLoop.run_in_executor() """def get_cards(self): #You can change for fitting your language deDE, enUS, esES, esMX, #frFR, itIT, jaJP, koKR, plPL, ptBR, ruRU, thTH, zhCN, zhTW response = requests.get('https://api.hearthstonejson.com/v1/12574/enUS/cards.collectible.json')#, 'https://api.hearthstonejson.com/v1/13619/enUS/cards.collectible.json', 'https://api.hearthstonejson.com/v1/15181/enUS/cards.collectible.json', 'https://api.hearthstonejson.com/v1/15300/enUS/cards.collectible.json')# data = response.json() return data @commands.command() async def hearthcard(self, args): data = get_cards() cardname = data['"name": 'args] attack = data['"attack": '] if data["type": "MINION"] == True: await self.bot.say('**{0}** \n' + """
def hearthwiki(self, title, ctx): """Returns a hearthstone wiki page: ,hearthwiki 'card name'""" url = 'http://hearthstone.wikia.com/wiki/' + urlquote(title) typetochan = ctx.message.channel async with aiohttp.get(url) as resp: if resp.status == 404: await self.bot.send_typing(typetochan) await asyncio.sleep(1) await self.bot.say('Could not find your page. Try a search:\n{0.url}'.format(resp)) elif resp.status == 200: await self.bot.send_typing(typetochan) await asyncio.sleep(1) await self.bot.say(resp.url) elif resp.status == 502: await self.bot.send_typing(typetochan) await asyncio.sleep(1) await self.bot.say('Seems like the Hearthstone Wiki is taking too long to respond. Try again later.') else: await self.bot.send_typing(typetochan) await self.bot.say('An error has occurred of status code {0.status} happened. Tell Inkx.'.format(resp))
def youtube_info(message): if not hasattr(config, "youtube_api_key"): return link_re = re.compile(r"""(?:https?://)(?:www\.)?(?:(?:youtube\.com(?:/embed/|/watch/?\?(?:.*)v=))|youtu\.be/)(?P<id>[A-Za-z0-9-_]+)""") match = link_re.search(message.text) if not match: return params = { "id": match.group("id"), "part": "contentDetails,statistics,snippet", "key": config.youtube_api_key } async with aiohttp.get("https://www.googleapis.com/youtube/v3/videos", params=params) as resp: if resp.status != 200: return info = await resp.json() things = dict() things.update(info["items"][0]["snippet"]) things.update(info["items"][0]["statistics"]) reply = "YouTube: {title} by {channelTitle} ({viewCount} views)".format(**things) await message.reply(reply)
def emote(self, ctx, emote_name: str): """Enabled emote and all emotes from same twitch channel""" server = ctx.message.server if not self._is_enabled(server): await self.bot.say("Emotes are not enabled on this server.") return server_emotes = self.available_emotes[server.id] if emote_name in server_emotes: await self.bot.say( "This server already has '{}'".format(emote_name)) return await self.bot.say("Retrieving emotes from '{}'.".format(emote_name) + " Please wait a moment.") for emote in self.emote_list: if emote_name == emote.get("regex", ""): chan_id = emote["images"][0].get("emoticon_set", -1) if chan_id == -1: await self.bot.say("Yeah, something failed, try again " "later?") return await self._add_emote(server, chan_id) await self.bot.say("'{}' and other ".format(emote_name) + "channel emotes added.") return await self.bot.say("No such emote '{}' found.".format(emote_name))
def twitch_online(self, stream): session = aiohttp.ClientSession() url = "https://api.twitch.tv/kraken/streams/" + stream header = {'Client-ID': self.settings.get("TWITCH_TOKEN", "")} try: async with session.get(url, headers=header) as r: data = await r.json() await session.close() if r.status == 400: return 400 elif r.status == 404: return 404 elif data["stream"] is None: return False elif data["stream"]: return True except: return "error" return "error"
def gif(self, *text): """Retrieves first search result from giphy gif [keyword]""" if len(text) > 0: if len(text[0]) > 1 and len(text[0]) < 20: try: msg = "+".join(text) search = "http://api.giphy.com/v1/gifs/search?q=" + msg + "&api_key=dc6zaTOxFJmzC" async with aiohttp.get(search) as r: result = await r.json() if result["data"] != []: url = result["data"][0]["url"] await self.bot.say(url) else: await self.bot.say("Your search terms gave no results.") except: await self.bot.say("Error.") else: await self.bot.say("Invalid search.") else: await self.bot.say("gif [text]")
def temp(self, location, country: str=None): """Make sure to get your own API key and put it into data/weather/settings.json \nYou can get an API key from: www.wunderground.com/weather/api/""" if country is None: country = self.settings["defCountry"] url = "http://api.wunderground.com/api/" + self.settings['api_key'] + "/conditions/q/" + country + "/" + location +".json" async with aiohttp.get(url) as r: data = await r.json() if "current_observation" in data: tempCO = data["current_observation"].get("temperature_string", False) tempW = data["current_observation"].get("weather", " ") tempC = data["current_observation"].get("temp_c", " ") tempF = data["current_observation"].get("temp_f", " ") tempH = data["current_observation"].get("relative_humidity", " ") if tempCO != False: if self.settings["unit"] == "C": await self.bot.say("**Weather **{} **Temp.** {}{} **Hum. **{} ".format(tempW, str(tempC), u"\u2103", tempH)) elif self.settings["unit"] == "F": await self.bot.say("**Weather **{} **Temp.** {}F **Hum. **{} ".format(tempW, str(tempF), tempH)) else: await self.bot.say("No temperature found") else: await self.bot.say("`Please use a US zip code or format like: paris fr\nIf the default country is set to your requesting location just '!temp city' will do.\nThe the default country is set to: {} `".format(self.settings["defCountry"]))
def default(self, *, channel: "channel"): """Default response.""" response = await (await aiohttp.get( "https://beam.pro/api/v1/channels/{}".format(channel) )).json() if "id" in response: data = await (await aiohttp.get( self.BEAM_MANIFEST_URL.format(channel=response["id"]) )).json() if "startedAt" in data: time = datetime.datetime.utcnow() - datetime.datetime.strptime( data["startedAt"], "%Y-%m-%dT%H:%M:%S.%fZ") time -= datetime.timedelta(microseconds=time.microseconds) return "Channel has been live for {}.".format(time) return "Channel is offline."
def inspirational(self): """Retrieve an inspirational quote.""" try: data = await (await get( "http://api.forismatic.com/api/1.0/", params=dict(method="getQuote", lang="en", format="json") )).json() except Exception: return MessagePacket( "Unable to get an inspirational quote. Have a ", ("emoji", "??"), " instead." ) else: return "\"{quote}\" -{author}".format( quote=data["quoteText"].strip(), author=data["quoteAuthor"].strip() or "Unknown" )
def subreddit_hot(self, ctx, subreddit: str, post_count: int=3): """Command for getting subreddit's hot posts""" if post_count <= 0 or post_count > 100: await self.bot.say("Sorry, I can't do that") else: url = "https://oauth.reddit.com/r/{}/hot".format(subreddit) url += "?limit=" + str(post_count) headers = { "Authorization": "bearer " + self.access_token, "User-Agent": "Red-DiscordBotRedditCog/0.1 by /u/palmtree5" } async with aiohttp.get(url, headers=headers) as req: resp_json = await req.json() if "data" not in resp_json and resp_json["error"] == 403: await self.bot.say("Sorry, the currently authenticated account does not have access to that subreddit") return resp_json = resp_json["data"]["children"] await self.post_menu(ctx, resp_json, page=0, timeout=30)
def subreddit_new(self, ctx, subreddit: str, post_count: int=3): """Command for getting subreddit's new posts""" if post_count <= 0 or post_count > 100: await self.bot.say("Sorry, I can't do that") else: url = "https://oauth.reddit.com/r/{}/new".format(subreddit) url += "?limit=" + str(post_count) headers = { "Authorization": "bearer " + self.access_token, "User-Agent": "Red-DiscordBotRedditCog/0.1 by /u/palmtree5" } async with aiohttp.get(url, headers=headers) as req: resp_json = await req.json() if "data" not in resp_json and resp_json["error"] == 403: await self.bot.say("Sorry, the currently authenticated account does not have access to that subreddit") return resp_json = resp_json["data"]["children"] await self.post_menu(ctx, resp_json, page=0, timeout=30)
def subreddit_top(self, ctx, subreddit: str, post_count: int=3): """Command for getting subreddit's top posts""" if post_count <= 0 or post_count > 100: await self.bot.say("Sorry, I can't do that") else: url = "https://oauth.reddit.com/r/{}/top".format(subreddit) url += "?limit=" + str(post_count) headers = { "Authorization": "bearer " + self.access_token, "User-Agent": "Red-DiscordBotRedditCog/0.1 by /u/palmtree5" } async with aiohttp.get(url, headers=headers) as req: resp_json = await req.json() if "data" not in resp_json and resp_json["error"] == 403: await self.bot.say("Sorry, the currently authenticated account does not have access to that subreddit") return resp_json = resp_json["data"]["children"] await self.post_menu(ctx, resp_json, page=0, timeout=30)
def subreddit_controversial(self, ctx, subreddit: str, post_count: int=3): """Command for getting subreddit's controversial posts""" if post_count <= 0 or post_count > 100: await self.bot.say("Sorry, I can't do that") else: url =\ "https://oauth.reddit.com/r/{}/controversial".format(subreddit) url += "?limit=" + str(post_count) headers = { "Authorization": "bearer " + self.access_token, "User-Agent": "Red-DiscordBotRedditCog/0.1 by /u/palmtree5" } async with aiohttp.get(url, headers=headers) as req: resp_json = await req.json() if "data" not in resp_json and resp_json["error"] == 403: await self.bot.say("Sorry, the currently authenticated account does not have access to that subreddit") return resp_json = resp_json["data"]["children"] await self.post_menu(ctx, resp_json, page=0, timeout=30)
def _request(self, payload, endpoint): """Handles requesting to the API""" # Format the URL we'll need based on the base_url, and the endpoint we want to hit url = "{}{}".format(base_url, endpoint) # Check if our key was added, if it wasn't, add it key = payload.get('k', self.key) payload['k'] = key # Attempt to connect up to our max retries for x in range(MAX_RETRIES): try: async with aiohttp.get(url, headers=self.headers, params=payload) as r: # If we failed to connect, attempt again if r.status != 200: continue data = await r.json() return data # If any error happened when making the request, attempt again except: continue
def imdb(message): # Method added by BananaWaffles. msg = message.content.split() if apis["MYAPIFILMS_TOKEN"] == "TOKENHERE": await client.send_message(message.channel, "`This command wasn't configured properly. If you're the owner, edit json/apis.json`") return False if len(msg) > 1: if len(msg[1]) > 1 and len([msg[1]]) < 20: try: msg.remove(msg[0]) msg = "+".join(msg) search = "http://api.myapifilms.com/imdb/title?format=json&title=" + msg + "&token=" + apis["MYAPIFILMS_TOKEN"] async with aiohttp.get(search) as r: result = await r.json() title = result['data']['movies'][0]['title'] year = result['data']['movies'][0]['year'] rating = result['data']['movies'][0]['rating'] url = result['data']['movies'][0]['urlIMDB'] msg = "Title: " + title + " | Released on: " + year + " | IMDB Rating: " + rating + ".\n" + url await client.send_message(message.channel, msg) except: await client.send_message(message.channel, "`Error.`") else: await client.send_message(message.channel, "`Invalid search.`") else: await client.send_message(message.channel, "`" + settings["PREFIX"] + "imdb [text]`")
def memes(message): msg = message.content[6:] msg = msg.split(";") if apis["IMGFLIP_USERNAME"] == "USERNAMEHERE" or apis["IMGFLIP_PASSWORD"] == "PASSWORDHERE": await client.send_message(message.channel, "`This command wasn't configured properly. If you're the owner, edit json/apis.json`") return False if len(msg) == 3: if len(msg[0]) > 1 and len([msg[1]]) < 20 and len([msg[2]]) < 20: try: search = "https://api.imgflip.com/caption_image?template_id=" + msg[0] + "&username=" + apis["IMGFLIP_USERNAME"] + "&password=" + apis["IMGFLIP_PASSWORD"] + "&text0=" + msg[1] + "&text1=" + msg[2] async with aiohttp.get(search) as r: result = await r.json() if result["data"] != []: url = result["data"]["url"] await client.send_message(message.channel, url) except: error = result["error_message"] await client.send_message(message.channel, error) else: await client.send_message(message.channel, "`" + settings["PREFIX"] + "meme id;text1;text2 | " + settings["PREFIX"] + "meme help for full list`") else: await client.send_message(message.channel, "`" + settings["PREFIX"] + "meme id;text1;text2 | " + settings["PREFIX"] + "meme help for full list`")
def urban(message): msg = message.content.split() if len(msg) > 1: if len(msg[1]) > 1 and len([msg[1]]) < 20: try: msg.remove(msg[0]) msg = "+".join(msg) search = "http://api.urbandictionary.com/v0/define?term=" + msg async with aiohttp.get(search) as r: result = await r.json() if result["list"] != []: definition = result['list'][0]['definition'] example = result['list'][0]['example'] await client.send_message(message.channel, "Definition: " + definition + "\n\n" + "Example: " + example ) else: await client.send_message(message.channel, "`Your search terms gave no results.`") except: await client.send_message(message.channel, "`Error.`") else: await client.send_message(message.channel, "`Invalid search.`") else: await client.send_message(message.channel, "`" + settings["PREFIX"] + "urban [text]`")
def gif(message): msg = message.content.split() if len(msg) > 1: if len(msg[1]) > 1 and len([msg[1]]) < 20: try: msg.remove(msg[0]) msg = "+".join(msg) search = "http://api.giphy.com/v1/gifs/search?q=" + msg + "&api_key=dc6zaTOxFJmzC" async with aiohttp.get(search) as r: result = await r.json() if result["data"] != []: url = result["data"][0]["url"] await client.send_message(message.channel, url) else: await client.send_message(message.channel, "`Your search terms gave no results.`") except: await client.send_message(message.channel, "`Error.`") else: await client.send_message(message.channel, "`Invalid search.`") else: await client.send_message(message.channel, "`" + settings["PREFIX"] + "gif [text]`")
def transferPlaylist(message): msg = message.attachments[0] if msg["filename"].endswith(".txt"): if not dataIO.fileIO("playlists/" + msg["filename"], "check"): #returns false if file already exists r = await aiohttp.get(msg["url"]) r = await r.text() data = r.replace("\r", "") data = data.split() if isPlaylistValid(data) and isPlaylistNameValid(msg["filename"].replace(".txt", "")): data = { "author" : message.author.id, "playlist": data} dataIO.fileIO("playlists/" + msg["filename"], "save", data) await client.send_message(message.channel, "`Playlist added. Name: {}`".format(msg["filename"].replace(".txt", ""))) else: await client.send_message(message.channel, "`Something is wrong with the playlist or its filename. Type " + settings["PREFIX"] + "audio help to read how to format it properly.`") else: await client.send_message(message.channel, "`A playlist with that name already exists. Change the filename and resubmit it.`")
def fetch(self): """Fetches url of the reporter and returns news. :returns: Either a list of news or a news. :rtype: :class:`list` or `~news.models.AbstractNews` implemnetation. """ async with aiohttp.get(self.url) as response: # return nothing if status code is not OK if response.status != 200: return None # make news from the response items = self.parse(await response.text()) # return a single news if we have only one. return a list of news # if we have more than a single news. try: return (self.make_news(item) for item in items) except TypeError: item = items news = self.make_news(item) return news
def cmd_modlog(self, message, author, server, option, new_id=None, reason=None): """ Usage: {command_prefix}modlog [set | + | - | true | false | yes | no | y | n] <new channel ID> ["reason"] If the first choice is set, it will change the mod log to the provided channel If the first choice is anything else, it'll toggle whether the modlog is used or not! "+, true, yes, y" will enable it and "-, false, no, n" will disable it """ if await self.has_roles(message.channel, author, server, command='modlog'): if option not in ['set', '+', '-', 'true', 'false', 'yes', 'no', 'y', 'n']: raise CommandError( 'Invalid option "%s" specified, use +, -, true, false, yes, no, set, y or n' % option) if option in ['set']: try: channel = discord.utils.get(server.channels, id=new_id) if not channel: int('this') except: raise CommandError('Invalid Channel ID: {}'.format(new_id)) self.server_index[server.id][8] = channel.id elif option in ['+', 'true', 'yes', 'y']: self.server_index[server.id][10][0] = True else: self.server_index[server.id][10][0] = False await self.write_to_modlog(message, author, server, reason)
def cmd_setannouncements(self, message, author, server, new_id, reason=None): """ Usage: {command_prefix}setannouncements <new channel ID> ["reason"] Sets which channel will be used for announcements / broadcasts relating to RH1-N0 Defaults to default server channel """ if await self.has_roles(message.channel, author, server, command='setannouncements'): try: channel = discord.utils.get(server.channels, id=new_id) if not channel: int('this') except: raise CommandError('Invalid Channel ID: {}'.format(new_id)) self.server_index[server.id][17] = channel.id await self.write_to_modlog(message, author, server, reason)
def cmd_info(self, message, author, server): """ Usage: {command_prefix}info Sends a whole buncha info pertaining to the bot to the chat! """ return Response( 'I was coded by SexualRhinoceros and modified by MattBSG. I am currently on v{} ! \nFor documentation on my commands or info on how to get my in your' ' server, check out this link! {}'.format(VERSION, DOCUMENTATION_FOR_BOT), reply=True) # async def cmd_donate(self, message, author, server): # """ # Usage: {command_prefix}donate # Sends a whole buncha info pertaining to rhino's patreon to the chat! # """ # return Response('Thanks for considering donating! If you want to support me monthly, check out my' # ' Patreon here\n\t{}\nor for one time, you can find my paypal here\n\t{}' # ''.format(RHINO_PATREON, RHINO_STREAMTIP), # reply=True)
def cmd_ignore(self, message, author, server, option, new_id, reason=None): """ Usage: {command_prefix}ignore [ + | - | add | remove ] <channel ID> ["reason"] Adds or removes the channel ID to the list of ignored channels when outputting to the server log """ if await self.has_roles(message.channel, author, server, command='ignore'): if option not in ['+', '-', 'add', 'remove']: raise CommandError('Invalid option "%s" specified, use +, -, add, or remove' % option) try: channel = discord.utils.get(server.channels, id=new_id) if not channel: int('this') except: raise CommandError('Invalid Channel: {}'.format(new_id)) if option in ['+', 'add']: self.server_index[server.id][12].append(channel.id) else: try: self.server_index[server.id][12].remove(channel.id) except ValueError: raise CommandError('No such channel in ignore list : {}'.format(new_id)) await self.write_to_modlog(message, author, server, reason)
def cmd_dropdeadbeats(self, message, author, server): """ Usage: {command_prefix}dropdeadbeats Removes the bot from all dead beat servers who never register """ if author.id == self.config.master_id: server_leave_array = [] for server in self.servers: if server.id not in self.server_index: rh1 = discord.utils.get(server.members, id=self.user.id) if datetime.utcnow() - timedelta(hours=24) > rh1.joined_at: server_leave_array.append(server) servers_list = [] if server_leave_array: for dbserver in server_leave_array: print('Leaving Deadbeat Server : {}'.format(dbserver.name)) servers_list.append(dbserver.name) await self.leave_server(dbserver) return Response('Dropped servers: ```%s```' % ', '.join(servers_list), reply=True) return
def cmd_blserver(self, author, key): """ Usage: {command_prefix}lurk Force the bot to lurk in a server rather than send shit to it or leave after the time is up """ if author.id in [self.config.master_id]: try: if discord.utils.get(self.servers, name=key): await self.leave_server(discord.utils.get(self.servers, name=key)) self.globalbans.add(discord.utils.get(self.servers, name=key).id) elif discord.utils.get(self.servers, id=key): await self.leave_server(discord.utils.get(self.servers, id=key)) self.globalbans.add(discord.utils.get(self.servers, id=key).id) else: print('I did fuck all') print('Server %s was blacklisted' % key) sban = open('config/globalbans.txt', 'a') self.globalbans = str(self.globalbans)[5:0] + key + '\n' sban.write(self.globalbans) sban.close() return Response(':thumbsup:', reply=True) except: return Response(':thumbsdown:', reply=True) return
def _fetch_page(self, request): try: with aiohttp.Timeout(10): async with aiohttp.get(request['url'], params=request['params'], headers=request['headers']) as response: try: assert response.status == 200 if request['type'] == 'json': content = await response.json() else: content = await response.text(request['type']) obj = {'order':request['order'], 'content':content} redis_push(self.redis, self.content_key, obj) except AssertionError: logging.warning('{} {}'.format(response.status, url)) except: # kinds of error, not only asyncio.TimeoutError #redis_push(self.redis, self.request_key, request) pass
def twitch_online(self, stream): session = aiohttp.ClientSession() url = "https://api.twitch.tv/kraken/streams/" + stream header = {'Client-ID': self.settings.get("TWITCH_TOKEN", "")} try: async with session.get(url, headers=header) as r: data = await r.json(encoding='utf-8') await session.close() if r.status == 400: return 400 elif r.status == 404: return 404 elif data["stream"] is None: return False elif data["stream"]: embed = self.twitch_embed(data) return embed except: return "error" return "error"
def beam_online(self, stream): url = "https://beam.pro/api/v1/channels/" + stream try: async with aiohttp.get(url) as r: data = await r.json(encoding='utf-8') if "online" in data: if data["online"] is True: data = self.beam_embed(data) return data else: return False elif "error" in data: return None except: return "error" return "error"
def emoji(self, ctx, emojiname: str): '''Gibt eine vergrößerte Version eines angegebenen Emojis zurück Beispiel: ----------- :emoji Emilia ''' emoji = discord.utils.find(lambda e: e.name.lower() == emojiname.lower(), self.bot.emojis) if emoji: tempEmojiFile = 'tempEmoji.png' async with aiohttp.ClientSession() as cs: async with cs.get(emoji.url) as img: with open(tempEmojiFile, 'wb') as f: f.write(await img.read()) f = discord.File(tempEmojiFile) await ctx.send(file=f) os.remove(tempEmojiFile) else: await ctx.send(':x: Konnte das angegebene Emoji leider nicht finden :(')
def nsfw(self, ctx): '''Vergibt die Rolle um auf die NSFW Channel zugreifen zu können''' if ctx.guild.id == loadconfig.__botserverid__: if loadconfig.__selfassignrole__: role = discord.utils.get(ctx.guild.roles, name=loadconfig.__selfassignrole__) if role in ctx.author.roles: try: await ctx.author.remove_roles(role) except: pass tmp = await ctx.send(f':x: Rolle **{role}** wurde entfernt') else: try: await ctx.author.add_roles(role) except: pass tmp = await ctx.send(f':white_check_mark: Rolle **{role}** wurde hinzugefügt') else: tmp = await ctx.send('**:no_entry:** Es wurde keine Rolle für den Bot eingestellt! Wende dich bitte an den Bot Admin') else: tmp = await ctx.send(f'**:no_entry:** This command don\'t work on this server!') await asyncio.sleep(2 * 60) await tmp.delete() await ctx.message.delete()
def _fileCheck(msg): url = msg.attachments[0]['url'] allowedExtension = ['.exe', '.zip', '.rar'] if url[-4:] in allowedExtension: name = os.path.basename(url) downloadPath = 'tmp\\' + name async with aiohttp.get(url) as download: with open(downloadPath, 'wb') as f: f.write(await download.read()) stats = os.stat(downloadPath) size = stats.st_size KBSize = round(size / 1024, 3) MBSize = round(size / 1024 / 1024, 3) MD5 = _getHash(downloadPath, hashlib.md5()) SHA1 = _getHash(downloadPath, hashlib.sha1()) SHA256 = _getHash(downloadPath, hashlib.sha256()) SHA512 = _getHash(downloadPath, hashlib.sha512()) msg = f'**Name:** {name}\n' msg += f'**Size:** {MBSize} MB ({size} Bytes)\n' msg += f'**MD5:** `{MD5}`\n' msg += f'**SHA1:** `{SHA1}`\n' msg += f'**SHA256:** `{SHA256}`\n' msg += f'**SHA512:** `{SHA512}`\n' os.remove(downloadPath) return msg
def isitdown(self, url): """Checks if a website is down or up.""" if url == "": await self.bot.say("You haven't entered a website to check.") return if "http://" not in url or "https://" not in url: url = "http://" + url try: with aiohttp.Timeout(15): await self.bot.say("Testing " + url + "…") try: response = await aiohttp.get(url, headers = { 'user_agent': headers }) if response.status == 200: await self.bot.say(url + " is up and running.") else: await self.bot.say(url + " is down.") except: await self.bot.say(url + " is down.") except asyncio.TimeoutError: await self.bot.say(url + " is down.")
def __init__(self, bot): self.bot = bot self.dota_settings = fileIO("data/dota/settings.json", "load") # Check for key either in settings or in ENV if "key" in self.dota_settings.keys() and self.dota_settings["key"] != "": # If exists in setting and is set api.set_api_key(self.dota_settings["key"]) self.key = True elif os.environ.get("DOTA2_API_KEY") is not None: # If exists in env vars and is set api.set_api_key(os.environ.get("DOTA2_API_KEY")) self.key = True else: self.key = False
def print_token(self, url, realm): thumb_url = 'http://wowtokenprices.com/assets/wowtokeninterlaced.png' try: async with aiohttp.get(url, headers=self.header) as response: soup = BeautifulSoup(await response.text(), "html.parser") data = soup.find('div', class_=realm + '-region-div') desc = data.div.a.h3.text buy_price = data.div.find('p', class_='money-text').text trend = data.div.find('span', class_='money-text-small').text # Update time broken, returns --:-- -- when requested by bot #updated = data.div.find('p', id=realm + '-datetime').text embed = discord.Embed(title='WoW Token Info', description=desc, colour=0xFFD966) embed.set_thumbnail(url=thumb_url) embed.add_field(name='Buy Price', value=buy_price, inline=False) embed.add_field(name='Change', value=trend, inline=False) #embed.set_footer(text='Updated: ' + updated) await self.bot.say(embed=embed) except: await self.bot.say("Error finding WoW token prices.")
def _steam_url_to_text_id(self, server_id: str, vanityurl: str) -> str: api_key = self.settings[server_id]["steam_api_key"] url = "http://api.steampowered.com/ISteamUser/" "ResolveVanityURL/v0001/?key={}&vanityurl={}".format( api_key, vanityurl) steam64_id = None async with aiohttp.get(url) as res: response = json.loads(await res.text())["response"] if response["success"] != 1: raise SteamUrlError( "'{}' is not a Steam vanity URL.".format(vanityurl)) steam64_id = int(response["steamid"]) account_id = steam64_id & ((1 << 32) - 1) universe = (steam64_id >> 56) & ((1 << 8) - 1) I = universe J = account_id & 1 K = (account_id >> 1) & ((1 << 31) - 1) return "STEAM_{}:{}:{}".format(I, J, K)
def lyrics_from_path(path): """Gets the lyrics from a song path""" with requests.get(path) as page: html = BeautifulSoup(page.text, "html.parser") [h.extract() for h in html('script')] lyrics = html.find("div", class_="lyrics").get_text() return lyrics
def on_ready(self): self.emptycogLoaded = os.path.exists('data/cmdcommands/emptycog.py') if not self.emptycogLoaded: print('Emptycog.py was not found, trying to download') try: async with aiohttp.get("https://raw.githubusercontent.com/PlanetTeamSpeakk/PTSCogs-attributes/master/emptycog.py") as r: emptycog = await r.content.read() with open('data/cmdcommands/emptycog.py','wb') as f: f.write(emptycog) print('Succesfully downloaded emptycog.py') except Exception as e: print(e) print("Error occured, did not download emptycog.py, go to https://raw.githubusercontent.com/PlanetTeamSpeakk/PTSCogs/master/emptycog.py press ctrl+s and save it in the data/cmdcommands/ folder.") else: pass
def botowner(self, ctx): """Shows you who's boss!""" owner = discord.utils.get(self.bot.get_all_members(), id=self.bot.settings.owner) if owner != None: await self.bot.say("My owner is {}.".format(owner.mention)) else: await self.bot.say("I don't know who my owner is ¯\_(?)_/¯.")
def qrcode(self, ctx, url): """Creates a qrcode from a link.""" shorten = Shortener('Bitly', bitly_token='dd800abec74d5b12906b754c630cdf1451aea9e0') short_link = shorten.short(url) async with aiohttp.get(shorten.qrcode(width=128, height=128)) as r: file = await r.content.read() number = random.randint(1000, 9999) fileloc = "data/useful/qrcode{}.png".format(number) with open(fileloc, 'wb') as f: f.write(file) file = None f = None await self.bot.send_file(ctx.message.channel, fp="data/useful/qrcode{}.png".format(number), filename="qrcode{}.png".format(number)) os.remove("data/useful/qrcode{}.png".format(number))
def time(self, ctx, *, place): """Get the time of a place somewhere on the earth Example: [p]time Los Angeles [p]time Amsterdam, Netherlands""" if "geocodingkey" not in self.settings or self.settings['geocodingkey'] == "key_here": await self.bot.say("The geocoding key is not yet set if you're my owner you can set it with {}setgeocodingkey.".format(ctx.prefix)) elif "timezonekey" not in self.settings or self.settings['timezonekey'] == "key_here": await self.bot.say("The timezone key is not yet set if you're my owner you can set it with {}settimezonekey.".format(ctx.prefix)) else: message = await self.bot.say("Getting data...") request = requests.get("https://maps.googleapis.com/maps/api/geocode/json?address={}&key={}".format(place, self.settings['geocodingkey'])).json() if request['status'] == "ZERO_RESULTS": await self.bot.edit_message(message, "Could not find any results for **{}**.".format(place)) elif request['status'] == "OK": lng = request['results'][0]['geometry']['location']['lng'] lat = request['results'][0]['geometry']['location']['lat'] fulladdr = request['results'][0]['formatted_address'] timestamp = int(datetime.datetime.utcnow().timestamp()) request = requests.get("https://maps.googleapis.com/maps/api/timezone/json?location={},{}×tamp={}&key={}".format(lat, lng, timestamp, self.settings['timezonekey'])).json() if request['status'] != "OK": await self.bot.say("An unknown error occured while getting the time and timezone from the Google API.") else: timestamp += request['dstOffset'] + request['rawOffset'] time = datetime.datetime.fromtimestamp(timestamp) await self.bot.edit_message(message, "**{}**\n\t{} ({})".format(time.strftime("%d %b %Y %H:%M:%S"), fulladdr, request['timeZoneName'])) else: await self.bot.say("An unknown error occured while getting the longitude and latitude from the Google API.")