我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用discord.ext()。
def spank(self, ctx, user : discord.Member = None): """Spank""" author = ctx.message.author if not user: with aiohttp.ClientSession() as session: async with session.get("https://images-ext-1.discordapp.net/eyJ1cmwiOiJodHRwczovL2Nkbi5kaXNjb3JkYXBwLmNvbS9hdHRhY2htZW50cy8xMDc5NDI2NTIyNzU2MDE0MDgvMTA3OTQ1MDg3MzUwMDc5NDg4L1R1SEdKLmdpZiJ9.-XeFHSFOR0nv53M34HeUBqQc7Wc.gif") as resp: test = await resp.read() with open("data/commands/Images/imgres.gif", "wb") as f: f.write(test) await self.bot.say("{} spanked someone! :scream:".format(author.mention)) await self.bot.upload("data/commands/Images/imgres.gif") else: with aiohttp.ClientSession() as session: async with session.get("https://images-ext-1.discordapp.net/eyJ1cmwiOiJodHRwczovL2Nkbi5kaXNjb3JkYXBwLmNvbS9hdHRhY2htZW50cy8xMDc5NDI2NTIyNzU2MDE0MDgvMTA3OTQ1MDg3MzUwMDc5NDg4L1R1SEdKLmdpZiJ9.-XeFHSFOR0nv53M34HeUBqQc7Wc.gif") as resp: test = await resp.read() with open("data/commands/Images/imgres.gif", "wb") as f: f.write(test) await self.bot.say("{} spanked {}! :scream:".format(author.mention, user.mention)) await self.bot.upload("data/commands/Images/imgres.gif")
def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs, formatter=DogbotHelpFormatter()) # configuration dict self.cfg = kwargs.get('cfg', {}) # aiohttp session used for fetching data self.session = aiohttp.ClientSession(loop=self.loop) # boot time (for uptime) self.boot_time = datetime.datetime.utcnow() # aioredis connection self.redis = None # asyncpg self.database = self.cfg['db']['postgres']['database'] self.pgpool = None # load core extensions self._exts_to_load = [] self.load_extensions('dog/core/ext', 'Core recursive load')
def load_extensions(self, directory: str, prefix: str = 'Recursive load'): """Loads extensions from a directory recursively.""" IGNORE = {'__init__.py', '__pycache__', '.DS_Store'} base = directory.replace('/', '.') # build list of extension stems extension_stems = [ path.stem for path in Path(directory).resolve().iterdir() \ if path.name not in IGNORE and path.suffix != 'pyc' ] logger.debug('Extensions to load: %s', extension_stems) for ext in extension_stems: load_path = base + '.' + ext logger.info('%s: %s', prefix, load_path) self.load_extension(load_path) # keep track of a list of extensions to load self._exts_to_load = list(self.extensions.keys()).copy()
def update_statistics(pg: asyncpg.connection.Connection, ctx: commands.Context): """ Updates command statistics for a specific `discord.ext.commands.Context`. If no record was found for a command, it is created. Otherwise, the ``times_used`` and ``last_used`` fields are updated. """ row = await get_statistics(pg, str(ctx.command)) if row is None: # first time command is being used, insert it into the database insert = 'INSERT INTO command_statistics VALUES ($1, 1, $2)' await pg.execute(insert, str(ctx.command), datetime.datetime.utcnow()) logger.info('First command usage for %s', ctx.command) else: # command was used before, increment time_used and update last_used update = ('UPDATE command_statistics SET times_used = times_used + 1, last_used = $2 ' 'WHERE command_name = $1') await pg.execute(update, str(ctx.command), datetime.datetime.utcnow())
def emote(self, ctx, emote: str): """Get a Twitch, FrankerFaceZ, BetterTTV, or Discord emote. Usage: emote [name of emote]""" emote = emote.replace(':', '') with async_timeout.timeout(13): try: async with self.bot.cog_http.get('https://static-cdn.jtvnw.net/emoticons/v1/' + str(self.bot.emotes['twitch'][emote]['image_id']) + '/1.0') as resp: emote_img = await resp.read() except KeyError: # let's try frankerfacez try: async with self.bot.cog_http.get('https://cdn.frankerfacez.com/emoticon/' + str(self.bot.emotes['ffz'][emote]) + '/1') as resp: emote_img = await resp.read() except KeyError: # let's try BetterTTV try: async with self.bot.cog_http.get(self.bot.emotes['bttv'][emote]) as resp: emote_img = await resp.read() except KeyError: # let's try Discord await ctx.send('**No such emote!** I can fetch from Twitch, FrankerFaceZ, BetterTTV, or Discord (soon).') return False img_bytes = io.BytesIO(emote_img) ext = imghdr.what(img_bytes) await ctx.send(file=discord.File(img_bytes, f'emote.{ext}'))
def rtfs(self, ctx, *, thing: str): """tries to show the source file of a thing thing may be a non-builtin module, class, method, function, traceback, frame, or code object, or if surrounded by single or double quotes, a space separated discord.ext.commands call, or a period deliminated file/module path as used when importing""" msg = ctx.message variables = { 'ctx': ctx, 'bot': self.bot, 'message': msg, 'server': msg.server, 'channel': msg.channel, 'author': msg.author } def call(thing, variables): return self.repl_format_source(eval(thing, variables)) closure = _close(call, thing, variables) await self.print_results(ctx, _call_catch_fmt(closure, 0))
def add_jose_cog(self, cls: 'class'): """Add a cog but load its requirements first.""" requires = cls._cog_metadata.get('requires', []) log.debug('requirements for %s: %r', cls, requires) for _req in requires: req = f'ext.{_req}' if not self.extensions.get(req): log.debug('loading %r from requirements', req) self.load_extension(req) else: log.debug('%s already loaded', req) # We instantiate here because # instantiating on the old add_cog # is exactly the cause of the problem cog = cls(self) super().add_cog(cog)
def charge_ext(self): extdiv = "data/biomes/ext/{}.txt" chemin = extdiv.format(self.system["EXT"]) if os.path.isfile(chemin): liste = chemin with open(liste, "r", encoding="ISO-8859-1") as f: liste = f.readlines() self.charge = {} for line in liste: line = line.replace("\n", "") tag = line[:3] item = line[4:] if tag not in self.charge: self.charge[tag] = {"TAG" : tag, "ITEMS" : []} self.charge[tag]["ITEMS"].append(item) else: self.save("charge") return True else: return False
def on_command_error(ctx, error): if isinstance(error, discord.ext.commands.errors.CommandNotFound): pass # ...don't need to know if commands don't exist elif isinstance(error, discord.ext.commands.errors.CheckFailure): await ctx.send("You don't have permission to use this command.") elif isinstance(error, discord.ext.commands.errors.MissingRequiredArgument): formatter = commands.formatter.HelpFormatter() await ctx.send("You are missing required arguments.\n{}".format(formatter.format_help_for(ctx, ctx.command)[0])) elif isinstance(error, discord.ext.commands.errors.CommandOnCooldown): await ctx.message.delete() await ctx.send("This command is on cooldown, don't you dare try it again.", delete_after=10) else: if ctx.command: await ctx.send("An error occurred while processing the `{}` command.".format(ctx.command.name)) print('Ignoring exception in command {0.command} in {0.message.channel}'.format(ctx)) tb = traceback.format_exception(type(error), error, error.__traceback__) error_trace = "".join(tb) print(error_trace) embed = discord.Embed(description=error_trace.translate(bot.escape_trans)) await bot.err_logs_channel.send("An error occurred while processing the `{}` command in channel `{}`.".format(ctx.command.name, ctx.message.channel), embed=embed)
def imgur_remove(self,albumName,imageLink): """removes image from album, pass direct image""" albumName = albumName.lower() for album in imgurClient.get_account_albums('me'): if albumName == album.title.lower(): #image = imgurClient.upload_from_url(args[1], config=None, anon=True) #imgurClient.album_add_images(album.id,image['id']) images = imgurClient.get_album_images(album.id) #imageID = urlparse(imageLink).path[1::] #remove / at start imageID, ext = splitext(urlparse(imageLink).path) imageID = imageID[1::] for image in images: if imageID == image.id: imgurClient.album_remove_images(album.id,imageID) await self.bot.say("removed {} from album".format(image.id)) break else: #album not found await self.bot.say("Album: " + albumName + " not found")
def get_ext(url): """Return the filename extension from url, or ''.""" parsed = urlparse(url) root, ext = splitext(parsed.path) return ext[1:] # or ext if you want the leading '.'
def get_extensions(self, path, excl): extensions = [] for root, dir, files in os.walk(path): for items in fnmatch.filter(files, "*"): temp_extensions = items.rfind(".") ext = items[temp_extensions+1:] if ext not in extensions: if ext in excl: extensions.append(ext) pass return extensions
def on_command_error(error, ctx): if isinstance(error, discord.ext.commands.errors.CommandNotFound): pass # ...don't need to know if commands don't exist if isinstance(error, discord.ext.commands.errors.CheckFailure): await bot.send_message(ctx.message.channel, "{} You don't have permission to use this command.".format(ctx.message.author.mention)) elif isinstance(error, discord.ext.commands.errors.MissingRequiredArgument): formatter = commands.formatter.HelpFormatter() await bot.send_message(ctx.message.channel, "{} You are missing required arguments.\n{}".format(ctx.message.author.mention, formatter.format_help_for(ctx, ctx.command)[0])) else: if ctx.command: await bot.send_message(ctx.message.channel, "An error occured while processing the `{}` command.".format(ctx.command.name)) print('Ignoring exception in command {}'.format(ctx.command), file=sys.stderr) traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)
def get_server(context: Context) -> discord.Server: """ Gets the server to which a command was sent, based on the command's context. :param context: The context in which the command was sent. :type context: discord.ext.commands.Context :return: The server """ return context.message.server
def get_server_id(context: Context) -> str: """ Gets the ID of the server to which a command was sent, based on the command's context. :param context: The context in which the command was sent. :type context: discord.ext.commands.Context :return: The server's ID """ server = get_server(context) return None if server is None else server.id
def get_channel(context: Context) -> discord.Channel: """ Gets a channel to which a command was sent, based on the command's context. :param context: The context in which the command was sent. :type context: discord.ext.commands.Context :return: The channel """ return context.message.channel
def get_channel_id(context: Context) -> str: """ Gets the ID of the channel to which a command was sent, based on the command's context. :param context: The context in which the command was sent. :type context: discord.ext.commands.Context :return: The channel's ID """ return get_channel(context).id
def get_channel_name(context: Context) -> str: """ Gets the name of the channel to which a command was sent, based on the command's context. :param context: The context in which the command was sent. :type context: discord.ext.commands.Context :return: The channel's name """ return get_channel(context).name
def get_author_name(context: Context, nick=False) -> str: """ Gets the name of the author of a command, based on the command's context. :param context: The context in which the command was sent. :type context: discord.ext.commands.Context :param nick: Whether the given value should be the real user name, or the member's nick (if available). Defaults to False :type nick: bool :return: The author's name """ return get_name(context.message.author, nick)
def author_has_role(context: commands.Context, role_id: str) -> bool: """ Checks within a command's authors roles for one that has a matching ID to the one given. :param context: The context in which the command was sent. :type context: discord.ext.commands.Context :param role_id: The ID of a role to check for. :type role_id: str :return: True if the author has a role with the given ID, false otherwise. """ return has_role(context.message.author, role_id)
def to_boolean(value) -> bool: """Parses a string to boolean. Only meant to be used for command arguments. :param value: The string to evaluate. :return: The parsed value. :raise: discord.ext.commands.errors.BadArgument: If the value cannot be parsed. .. note:: The valid values are not just 'True' or 'False'. It can be either 'true', 'on', 'yes' or 'y' for True or 'false', 'off', 'no' or 'n' for False and is not case-sensitive (so something like TruE is valid). """ if isinstance(value, bool): return value value = str(value).lower() if value in ['1', 'true', 'on', 'yes', 'y']: return True elif value in ['0', 'false', 'off', 'no', 'n']: return False else: raise TypeError("Could not parse {} to boolean".format(value))
def on_command_error(self, exception, context): """ Override bot.on_command_error function. Error handling function. """ if isinstance(exception, discord.ext.commands.errors.CommandNotFound): msg = """Sorry, this command is unknown to me... :japanese_ogre:\ \nDo you need help? \ \nIf so, just type ***!help*** :sunglasses:""" elif isinstance(exception, discord.ext.commands.errors.DisabledCommand): msg = ":sleeping:" elif isinstance(exception, discord.ext.commands.errors.CheckFailure): msg = """:octagonal_sign: you are not allowed to do this.\ \nOnly an :alien: can wake me up...""" elif isinstance(exception, discord.ext.commands.errors.MissingRequiredArgument): msg = """:thought_balloon: You forgot parameters.\ \nType !help [command] to get help :interrobang:""" elif isinstance(exception, APIconnectionError): msg = """It seems we can't contact the API...:frowning2: \ \nTake a rest and retry later.:play_pause: """ elif isinstance(exception, discord.ext.commands.errors.CommandInvokeError): msg = "oups... " + str(exception) print(msg) else: msg = "oups inconnu... " + str(type(exception)) + str(exception) print(msg) await self.send_message(context.message.channel, msg)
def kys(self, ctx): """Kill Yourself""" author = ctx.message.author with aiohttp.ClientSession() as session: async with session.get("https://images-ext-2.discordapp.net/eyJ1cmwiOiJodHRwczovL2Nkbi5tZW1lLmFtL2luc3RhbmNlcy81MDB4LzY3NTY4NTg0LmpwZyJ9.oNix5m3kkZkmQK5V8uDlLD_fVe0.jpg") as resp: test = await resp.read() with open("data/commands/Images/imgres.png", "wb") as f: f.write(test) await self.bot.upload("data/commands/Images/imgres.png")
def datboi(self, ctx): """Dait Boi""" author = ctx.message.author foo = ["http://i1.kym-cdn.com/photos/images/newsfeed/001/112/704/8a8.jpg", "http://2static.fjcdn.com/pictures/Origins_3c4898_5920950.jpg", "http://i3.kym-cdn.com/photos/images/original/001/116/633/8e9.jpg", "http://www.kappit.com/img/pics/201605_1023_iagcb_sm.jpg", "https://img0.etsystatic.com/125/1/12078849/il_340x270.969775168_rzq8.jpg", "https://i.ytimg.com/vi/fs5Sudmauks/maxresdefault.jpg", "http://i1.kym-cdn.com/photos/images/original/001/118/006/5b1.jpg", "http://i3.kym-cdn.com/photos/images/newsfeed/001/118/078/22b.jpeg", "http://i3.kym-cdn.com/photos/images/newsfeed/001/118/014/e78.png", "https://images-ext-1.discordapp.net/eyJ1cmwiOiJodHRwOi8vY2RuLnNtb3NoLmNvbS9zaXRlcy9kZWZhdWx0L2ZpbGVzLzIwMTYvMDUvZGF0LWJvaS1tZW1lcy1ib3ktd2hvLWxpdmVkLmpwZyJ9.WBSEhlT69xiEWq0KRgR90j9YwDA.jpg?width=243&height=249", "http://i2.kym-cdn.com/photos/images/newsfeed/001/112/712/650.jpg"] with aiohttp.ClientSession() as session: async with session.get("{}".format(random.choice(foo))) as resp: test = await resp.read() with open("data/commands/Images/imgres.png", "wb") as f: f.write(test) await self.bot.say("Here comes dat boi! o shit waddup!:\n") await self.bot.upload("data/commands/Images/imgres.png")
def reload_modules(self): """Reloads all Dogbot related modules.""" # get applicable modules to reload modules = {k: m for k, m in sys.modules.items() if ('dog' in k and 'datadog' not in k) and 'ext' not in k and k != 'dog'} for name, module in modules.items(): logger.info('Reloading bot module: %s', name) importlib.reload(module) logger.info('Finished reloading bot modules!')
def get_statistics(pg: asyncpg.connection.Connection, command_name: str) -> \ Union[List[asyncpg.Record], None]: """ Fetches statistics for a specific ``discord.ext.commands.Context``. If no record was found, ``None`` is returned. """ return await pg.fetchrow('SELECT * FROM command_statistics WHERE command_name = $1', command_name)
def get_nitro_embed(self): emb = discord.Embed(color=0x505e80, description='Discord Nitro is **required** to view this message.') emb.set_thumbnail(url='https://images-ext-2.discordapp.net/eyJ1cmwiOiJodHRwczovL2Nkbi5kaXNjb3' 'JkYXBwLmNvbS9lbW9qaXMvMjY0Mjg3NTY5Njg3MjE2MTI5LnBuZyJ9.2' '6ZJzd3ReEjyptc_N8jX-00oFGs') emb.set_author(name='Discord Nitro Message', icon_url='https://images-ext-1.discordapp.net/eyJ1' 'cmwiOiJodHRwczovL2Nkbi5kaXNjb3JkYXBwLmNvbS9lbW9qaXMvMjYz' 'MDQzMDUxMzM5OTA3MDcyLnBuZyJ9.-pis3JTckm9LcASNN16DaKy9qlI') return emb
def _reload(self, ctx, *, ext: str = None): """Reloads a module. Can only be used by the owner.""" if ext: self.bot.unload_extension(ext) self.bot.load_extension(ext) else: for m in self.bot.initial_extensions: log.info('Reloading cog %s', m) self.bot.unload_extension(m) self.bot.load_extension(m)
def __init__(self): super().__init__(command_prefix=_callable_prefix, formatter=_chiaki_formatter, description=config.description, pm_help=None) # loop is needed to prevent outside coro errors self.session = aiohttp.ClientSession(loop=self.loop) self.table_base = None try: with open('data/command_image_urls.json') as f: self.command_image_urls = __import__('json').load(f) except FileNotFoundError: self.command_image_urls = {} self.message_counter = 0 self.command_counter = collections.Counter() self.custom_prefixes = JSONFile('customprefixes.json') self.cog_aliases = {} self.reset_requested = False psql = f'postgresql://{config.psql_user}:{config.psql_pass}@{config.psql_host}/{config.psql_db}' self.db = asyncqlio.DatabaseInterface(psql) self.loop.run_until_complete(self._connect_to_db()) self.db_scheduler = DatabaseScheduler(self.db, timefunc=datetime.utcnow) self.db_scheduler.add_callback(self._dispatch_from_scheduler) for ext in config.extensions: # Errors should never pass silently, if there's a bug in an extension, # better to know now before the bot logs in, because a restart # can become extremely expensive later on, especially with the # 1000 IDENTIFYs a day limit. self.load_extension(ext) self._game_task = self.loop.create_task(self.change_game())
def on_command_error(ctx, error): if isinstance(error, discord.ext.commands.errors.CommandNotFound): await ctx.author.send( "{}, this command does not exist!```{}```".format(ctx.message.author.mention, ctx.message.content)) elif isinstance(error, discord.ext.commands.errors.NotOwner): await ctx.author.send("{}, only my Owner can ask me to do that, nya!```{}```".format(ctx.message.author.mention, ctx.message.content)) elif isinstance(error, discord.ext.commands.errors.UserInputError): await ctx.author.send( "{}, Input error```py\n{}: {}\n```".format(ctx.message.author.mention, type(error).__name__, str(error))) elif isinstance(error, discord.ext.commands.errors.NoPrivateMessage): await ctx.author.send( "{}, this command cannot be send in a PM!```{}```".format(ctx.message.author.mention, ctx.message.content)) elif isinstance(error, discord.ext.commands.errors.CheckFailure): await ctx.author.send( "You don\'t have the permission to use this command, {}```{}```".format(ctx.message.author.mention, ctx.message.content)) else: await ctx.author.send( "{}, error```py\n{}: {}\n```".format(ctx.message.author.mention, type(error).__name__, str(error))) has_mention = False for arg in ctx.args: if '@' in str(arg): has_mention = True break if has_mention is True: return False await ctx.message.delete()
def _get_league_fixtures_timeframe(self, server_id: str, league_id: str, timeframe: str): """Retrieves specific league matchday fixtures from API Optional timeframe parameter: The value of the timeFrame argument must start with either p(ast) or n(ext), representing a timeframe either in the past or future. It is followed by a number in the range 1..99. It defaults to n7 in the fixture resource and is unset for fixture as a subresource. For instance: p6 would return all fixtures in the last 6 days, whereas n23 would result in returning all fixtures in the next 23 days.""" params = {'timeFrame': timeframe} url = self.api_url + 'competitions/{}/fixtures'.format(league_id) return await self._make_request(url, params, server_id)
def on_message(message): msgchan = message.channel if await command(message, "help", True): cmd = message.content[len(prefix + "help "):] help_cmds = "" if cmd == "": for ext in cmds.keys(): help_cmds += "\n**{}**:\n".format(ext) for cmda in cmds[ext].keys(): if len(cmda + cmds[ext][cmda]['help']) > 70: help_cmds += "\t- `{}`: {}...\n".format(cmda, cmds[ext][cmda]['help'][:70]) else: help_cmds += "\t- `{}`: {}\n".format(cmda, cmds[ext][cmda]['help']) if len(help_cmds) > 1750: await say(msgchan, help_cmds) help_cmds = "" await say(msgchan, help_cmds) help_cmds = "" await say(msgchan, "To get information of a specific command type {}help <command>".format(prefix)) else: error = 0 for ext in cmds.keys(): try: temp = cmds[ext][cmd]['help'] await say(msgchan, "`{}` ({}):\n{}\n\nUsage:\n`{}`".format(cmd, ext, cmds[ext][cmd]['help'], prefix + cmds[ext][cmd]['usage'])) temp = None except: temp = None error += 1 if error == len(cmds.keys()): await say(msgchan, "The command you entered ({}) could not be found.".format(cmd))
def get_source(self, thing): """returns a source object of a thing thing may be a non-builtin module, class, method, function, traceback, frame, or code object, or a space separated discord.ext.commands call, or a period deliminated file/module path as used when importing """ if isinstance(thing, str): if '.' in thing: # import modules = thing.split('.') def get_last_attr(prev, attrs): try: return prev.callback except AttributeError: if not attrs: return prev return get_last_attr(getattr(prev, attrs.pop(0)), attrs) thing = get_last_attr(__import__(modules.pop(0)), modules) else: # space delimited command call names = thing.split() thing = self.bot.commands[names.pop(0)] for name in names: thing = thing.commands[name] thing = thing.callback return Source(thing)
def handle_message_mention(self, origin, message): perms = origin.permissions_in(message.channel) if not perms.read_messages: # if you can't read the messages, then you shouldn't get PM'd. return messages = [] async for msg in self.bot.logs_from(message.channel, limit=3, before=message): messages.append(msg) messages.reverse() messages.append(message) if origin.status != discord.Status.online: # wait 30 seconds for context if it's not always on await asyncio.sleep(30) # get an updated reference, server references don't change member = message.server.get_member(origin.id) if member is None or member.status == discord.Status.online: # they've come online, so they might have checked the mention return # get the messages after this one ext = [] async for msg in self.bot.logs_from(message.channel, limit=3, after=message): ext.append(msg) ext.reverse() messages.extend(ext) try: fmt = 'You were mentioned in {0.channel.mention}:\n{1}' fmt = fmt.format(message, '\n'.join(map(self.format_message, messages))) await self.bot.send_message(origin, fmt) except: # silent failure best failure pass
def audioplayer(self, ctx, error_on_none=True): # TODO: ACCOUNT FOR WHEN THIS MESSAGE IS A PM if isinstance(ctx, discord.ext.commands.Context): if ctx.message.guild is None: # This is a private channel, so give it user ctx = ctx.message.author else: ctx = ctx.message.guild if isinstance(ctx, discord.User): author = ctx for audioplayer in self.audioplayers: member = audioplayer.guild.get_member(author.id) if member and member.voice and audioplayer.voice and audioplayer.voice.channel.id == member.voice.channel.id: if botdata.guildinfo(audioplayer.guild).is_banned(member): raise AudioPlayerNotFoundError("Nice try, but you're banned in the voice channel that I'm in") return audioplayer if error_on_none: raise AudioPlayerNotFoundError("You're not in any voice channels that I'm in") else: return None elif isinstance(ctx, discord.Guild): guild = ctx elif isinstance(ctx, discord.abc.GuildChannel): guild = ctx.guild else: raise ValueError(f"Incorrect type '{type(ctx)}' given to audioplayer function") for audioplayer in self.audioplayers: if audioplayer.guild == guild: return audioplayer if error_on_none: raise AudioPlayerNotFoundError(f"I'm not in a voice channel on this server/guild. Have an admin do `{self.bot.command_prefix}summon` to put me in one.") else: return None # Connects an audioplayer for the correct guild to the indicated channel
def load_all(self): """Load all extensions in the extensions list, but make sure all dependencies are matched for every cog.""" for extension in extensions: try: self.load_extension(f'ext.{extension}') except Exception as err: log.error(f'Failed to load {extension}', exc_info=True) sys.exit(1)
def midi(self, ctx: commands.Context, tempo: int=120, *, data: str=None): """ Convert text to MIDI. Multiple channels can be used by splitting text with |. Letters are converted to their pitch values using a mapping. Full documentation about it is not provided, read the code at https://github.com/lnmds/jose/blob/master/ext/midi.py To give longer input than a discord message allows you may upload a .txt file of up to 20 KiB. """ if data is None: try: data = await self.download_data(ctx.message) except Exception as err: log.exception('error downloading file at midi') raise self.SayException('We had an error while downloading ' 'the file, are you sure it is text?') before = time.monotonic() midi_file = await self.make_midi(tempo, data) duration = (time.monotonic() - before) * 1000 if midi_file is None: return await ctx.send('Failed to generate a MIDI file!') file = io.BytesIO() await self.loop.run_in_executor(None, midi_file.writeFile, file) file.seek(0) wrapped = discord.File(file, filename='boop.midi') await ctx.send(f'Took {duration:.3f}ms!', file=wrapped)
def __init__(self, bot): self.bot = bot self.player = dataIO.load_json("data/biomes/player.json") self.system = dataIO.load_json("data/biomes/system.json") self.charge = dataIO.load_json("data/biomes/charge.json") self.sponsor = dataIO.load_json("data/biomes/sponsor.json") defpar = {"SPONSOR_NB": 0, "STOP" : False, "EXT" : None,"OPEN" : False, "PLAYING" : False,"PLAYERS" : 0,"MAX_PLAYERS" : 12, "SAISON" : 1} extdiv = "data/biomes/ext/{}.txt"
def ext(self, ctx, val:str): """Change l'extension de la session.""" nom = val val += ".txt" exts = os.listdir("data/biomes/ext/") if val in exts: self.system["EXT"] = nom self.save("system") await self.bot.say("Extension {} selectionnée ".format(nom.title())) else: await self.bot.say("Cette extension n'existe pas.")
def check_folders(): if not os.path.exists("data/biomes"): print("Creation du fichier Biomes...") os.makedirs("data/biomes") if not os.path.exists("data/biomes/ext/"): print("Creation du fichier Extensions Biomes...") os.makedirs("data/biomes/ext/")
def build_rtfm_lookup_table(self): cache = {} page_types = { 'rewrite': ( 'http://discordpy.rtfd.io/en/rewrite/api.html', 'http://discordpy.rtfd.io/en/rewrite/ext/commands/api.html' ), 'latest': ( 'http://discordpy.rtfd.io/en/latest/api.html', ) } for key, pages in page_types.items(): sub = cache[key] = {} for page in pages: async with self.bot.session.get(page) as resp: if resp.status != 200: raise RuntimeError('Cannot build rtfm lookup table, try again later.') text = await resp.text(encoding='utf-8') root = etree.fromstring(text, etree.HTMLParser()) nodes = root.findall(".//dt/a[@class='headerlink']") for node in nodes: href = node.get('href', '') as_key = href.replace('#discord.', '').replace('ext.commands.', '') sub[as_key] = page + href self._rtfm_cache = cache
def steam_error(self, error, ctx): if isinstance(error, commands.errors.CommandOnCooldown): seconds = str(error)[34:] await self.bot.say(f':alarm_clock: Cooldown! Versuche es in {seconds} erneut') if isinstance(error, discord.ext.commands.errors.CommandInvokeError): await self.bot.say(':x: Konnte keine Verbindung zum Steam API aufbauen')
def init_extensions(self): for ext in os.listdir('extensions'): if ext.endswith('.py') and not ext.startswith('core'): try: self.bot.load_extension(f'extensions.{ext[:-3]}') self.settings['extensions'].append(f'extensions.{ext[:-3]}') except: pass
def download(url, ext : str = "jpg", sizeLimit : int = 8000000, ua : str = 'CorpNewt DeepThoughtBot'): """Download the passed URL and return the file path.""" # Set up a temp directory dirpath = tempfile.mkdtemp() tempFileName = url.rsplit('/', 1)[-1] # Strip question mark tempFileName = tempFileName.split('?')[0] imagePath = dirpath + "/" + tempFileName try: rImage = requests.get(url, stream = True, headers = {'User-agent': ua}) except: remove(dirpath) return None with open(imagePath, 'wb') as f: for chunk in rImage.iter_content(chunk_size=1024): if chunk: f.write(chunk) # Check if the file exists if not os.path.exists(imagePath): remove(dirpath) return None # Let's make sure it's less than the passed limit imageSize = os.stat(imagePath) while int(imageSize.st_size) > sizeLimit: try: # Image is too big - resize myimage = Image.open(imagePath) xsize, ysize = myimage.size ratio = sizeLimit/int(imageSize.st_size) xsize *= ratio ysize *= ratio myimage = myimage.resize((int(xsize), int(ysize)), Image.ANTIALIAS) myimage.save(imagePath) imageSize = os.stat(imagePath) except Exception: # Image too big and can't be opened remove(dirpath) return None try: # Try to get the extension img = Image.open(imagePath) ext = img.format img.close() except Exception: # Not something we understand - error out remove(dirpath) return None if ext: os.rename(imagePath, '{}.{}'.format(imagePath, ext)) return '{}.{}'.format(imagePath, ext) else: return imagePath
def load_cogs(bot): defaults = ("alias", "audio", "customcom", "downloader", "economy", "general", "image", "mod", "streams", "trivia") try: registry = dataIO.load_json("data/red/cogs.json") except: registry = {} bot.load_extension('cogs.owner') owner_cog = bot.get_cog('Owner') if owner_cog is None: print("The owner cog is missing. It contains core functions without " "which Red cannot function. Reinstall.") exit(1) if bot.settings._no_cogs: bot.logger.debug("Skipping initial cogs loading (--no-cogs)") if not os.path.isfile("data/red/cogs.json"): dataIO.save_json("data/red/cogs.json", {}) return failed = [] extensions = owner_cog._list_cogs() if not registry: # All default cogs enabled by default for ext in defaults: registry["cogs." + ext] = True for extension in extensions: if extension.lower() == "cogs.owner": continue to_load = registry.get(extension, False) if to_load: try: owner_cog._load_cog(extension) except Exception as e: print("{}: {}".format(e.__class__.__name__, str(e))) bot.logger.exception(e) failed.append(extension) registry[extension] = False dataIO.save_json("data/red/cogs.json", registry) if failed: print("\nFailed to load: {}\n".format(" ".join(failed)))
def __init__(self, ctx, bot: commands.Bot, idx_emoji, idx_embed): """ Create a new PagedEmbed. Arguments: ctx : discord.ext.commands.Context The context in which the command to send the PagedEmbed was invoked. bot : discord.ext.commands.Bot The bot that the application is logged in with. In Cogs, this is usually saved as an instance attribute `bot`. idx_emoji : str The Emoji which is used to navigate to the initial page, constructed through arguments and keyword arguments passed to this constructor. Example: embed = PagedEmbed(ctx, bot, "??", title="Index Page", description="This page gets shown initially." ) embed.add_page("??", discord.Embed( title="Second Page", description="This page gets shown when you click the ?? emoji." )) """ self._ctx = ctx self._bot = bot self._msg = None # Dictionary to contain embed Pages in the following format: # { # "idx_emoji": discord.Embed, # "emoji": discord.Embed, # "emoji": discord.Embed, # ... # } self._pages = { idx_emoji: idx_embed } self.current_page = self._pages[idx_emoji] # Attach our event listener to the bot self._bot.add_listener(self.on_reaction_add)
def _addimage(self, ctx: commands.Context, category: str, image_url: str=None): """Adds a new image to the specified category.""" await self.bot.type() server = ctx.message.server if server.id not in os.listdir(self.base): self.settings[server.id] = default_settings dataIO.save_json(self.settings_path, self.settings) os.makedirs(os.path.join(self.base, server.id)) if category not in self._list_image_dirs(server.id): await self.bot.reply(cf.error("Category not found.")) return attach = ctx.message.attachments if len(attach) > 1 or (attach and image_url): await self.bot.reply(cf.error("Please only provide one file.")) return url = "" filename = "" if attach: a = attach[0] url = a["url"] filename = a["filename"] elif image_url: url = image_url filename = os.path.basename( "_".join(url.split()).replace("%20", "_")) else: await self.bot.reply(cf.error( "You must provide either a Discord attachment" " or a direct link to an image.")) return new_id = str(self.settings[server.id]["next_ids"][category]) self.settings[server.id]["next_ids"][category] += 1 dataIO.save_json(self.settings_path, self.settings) ext = os.path.splitext(filename)[1] filepath = os.path.join( self.base, server.id, category, "{}_{}.{}".format( category, new_id, ext)) async with aiohttp.get(url) as new_sound: f = open(filepath, "wb") f.write(await new_sound.read()) f.close() await self.bot.reply(cf.info("Image added."))