Python discord 模块,TextChannel() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用discord.TextChannel()

项目:lagbot    作者:mikevb1    | 项目源码 | 文件源码
async def nostalgia(self, ctx, date: date = None, *, channel: discord.TextChannel = None):
        """Pins an old message from a specific date.

        If a date is not given, then pins first message from the channel.
        If a channel is not given, then pins from the channel the
        command was run in.

        The format of the date must be either YYYY-MM-DD or YYYY/MM/DD.
        """

        if channel is None:
            channel = ctx.channel
        if date is None:
            # With no date, start from the channel's creation time.
            date = channel.created_at

        # Read history from the resolved channel so an explicitly given
        # channel is honoured rather than always using the invoking one.
        async for m in channel.history(after=date, limit=1):
            try:
                await m.pin()
            except discord.HTTPException:
                # Only API failures are reported; unrelated errors propagate.
                await ctx.send('\N{THUMBS DOWN SIGN} Could not pin message.')
项目:Excalibot    作者:endreman0    | 项目源码 | 文件源码
async def dad_jokes(message, match):
    """Reply to a matched message with a "Hello <name>, I'm <bot>!" joke.

    The name comes from group 5 of ``match``, cut down to its first four
    whitespace-separated words. The reply is deleted again if the triggering
    message is deleted within 60 seconds.

    Args:
        message: The message that triggered the joke.
        match: A regex match object whose group 5 holds the name text.
    """
    # Defuse mass mentions with a zero-width space, then keep at most 4 words.
    raw = match.group(5)
    raw = raw.replace('@everyone', '@\N{ZERO WIDTH SPACE}everyone')
    raw = raw.replace('@here', '@\N{ZERO WIDTH SPACE}here')
    name = ' '.join(raw.split()[0:4])

    # Drop one leading punctuation character and any whitespace after it.
    if name.startswith((',', '.', '!')):
        name = name[1:].lstrip()

    # Replace user mentions with the first candidate that is not itself a
    # mention; the escaped form never contains '<@', so one always exists.
    for mem in message.mentions:
        candidates = [mem.display_name, mem.name, '<\\@%s>' % mem.id]
        safe = [sub for sub in candidates if '<@' not in sub][0]
        name = name.replace(mem.mention, safe)

    # Same idea for role mentions.
    for role in message.role_mentions:
        safe = role.name if '<@' not in role.name else '<\\@&%s>' % role.id
        name = name.replace(role.mention, safe)

    # The bot's own display name is read from the guild in text channels and
    # from the channel itself otherwise.
    if isinstance(message.channel, discord.TextChannel):
        me = message.guild.me
    else:
        me = message.channel.me
    response = await message.channel.send('Hello **%s**, I\'m %s!' % (name, me.display_name))

    try:
        await bot.wait_for('message_delete', check=lambda m: m == message, timeout=60)
    except asyncio.TimeoutError:
        # The trigger was not deleted in time; leave the reply in place.
        pass
    else:
        await response.delete()

### LONGCAT
项目:endrebot0    作者:endreman0    | 项目源码 | 文件源码
async def afk_send(ctx, message_key, *args, **kwargs):
    """Send the configured AFK message(s) for ``message_key`` and ack replies.

    Args:
        ctx: Invocation context; its bot supplies channels, members and config.
        message_key: Key looked up in each entry of
            ``ctx.bot.config['afk_messages']``.
        *args, **kwargs: Passed to ``str.format`` on the configured template.
    """
    global afk_targets
    if afk_targets is None:
        # Build the destination lookup once: text channels by ID, then bot
        # members by ID.
        afk_targets = {
            channel.id: channel
            for channel in ctx.bot.get_all_channels()
            if isinstance(channel, discord.TextChannel)
        }
        afk_targets.update({mem.id: mem for mem in ctx.bot.get_all_members() if mem.bot})

    for info in ctx.bot.config['afk_messages']:
        if message_key not in info:
            continue
        text = info[message_key].format(*args, **kwargs)
        trigger = await afk_targets[info['dest']].send(text)
        try:
            # Wait up to 10 seconds for any message in the same channel.
            response = await ctx.bot.wait_for(
                'message',
                check=lambda m: m.channel == trigger.channel,
                timeout=10,
            )
            await response.ack()
        except asyncio.TimeoutError:
            # No reply arrived in time; move on to the next entry.
            pass

    await ctx.message.delete()
项目:dogbot    作者:slice    | 项目源码 | 文件源码
async def create_reminder(self, ctx, due, note):
        """Insert a reminder row and notify the reminder queue.

        Args:
            ctx: Invocation context; supplies the author and the channel.
            due: Due time of the reminder; compared against the ``'due'``
                value of the queue's current item.
            note: Reminder text stored with the row.
        """
        # Outside a guild text channel the author's ID is stored in place of
        # a channel ID.
        if isinstance(ctx.channel, discord.TextChannel):
            cid = ctx.channel.id
        else:
            cid = ctx.author.id

        await self.bot.pgpool.execute(
            """
            INSERT INTO reminders
            (author_id, channel_id, note, due)
            VALUES ($1, $2, $3, $4)
            """,
            ctx.author.id, cid, note, due
        )

        logger.debug('Creating reminder -- due=%s note=%s cid=%d aid=%d', due, note, cid, ctx.author.id)

        # we just created a reminder, we definitely have one now!
        self.queue.has_item.set()

        # If the new reminder is due before the one currently being waited
        # on, restart the queue task.
        if self.queue.current_item and self.queue.current_item['due'] > due:
            logger.debug('Got a reminder that is due earlier than the current one, rebooting task!')
            self.queue.reboot()
项目:dogbot    作者:slice    | 项目源码 | 文件源码
async def watch(self, ctx, channel: discord.TextChannel, subreddit: str):
        """
        Sets up a channel for me to forward hot posts to, from a subreddit of your choosing.

        Only Dogbot Moderators may use this command.
        """
        async with self.bot.pgpool.acquire() as conn:
            # Refuse to add a feed once the guild already has the maximum of two.
            row = await conn.fetchrow('SELECT COUNT(*) FROM reddit_feeds WHERE guild_id = $1', ctx.guild.id)
            count = row['count']
            logger.debug('Guild %s (%d) has %d feeds', ctx.guild.name, ctx.guild.id, count)
            if count >= 2:
                return await ctx.send(
                    f'You have too many feeds! You can only have two at a time. Use `{ctx.prefix}reddit feeds` '
                    'to check the feeds in this server.'
                )
            await conn.execute('INSERT INTO reddit_feeds VALUES ($1, $2, $3)', ctx.guild.id, channel.id, subreddit)
        await ctx.ok()
项目:dogbot    作者:slice    | 项目源码 | 文件源码
async def is_publicly_visible(bot: Dogbot, channel: discord.TextChannel) -> bool:
    """
    Returns whether a text channel should be considered as "publicly visible".
    If the guild has been configured to log all message events, this will always return True.

    Args:
        bot: The bot instance.
        channel: The channel to check for.

    Returns:
        Whether the text channel is considered "publicly visible".
    """
    # guild is configured to log all message events
    if await bot.config_is_set(channel.guild, 'log_all_message_events'):
        return True

    # Look for the overwrite whose target is named '@everyone'; each entry of
    # channel.overwrites is indexed as (target, overwrite).
    everyone_overwrite = discord.utils.find(lambda t: t[0].name == '@everyone', channel.overwrites)
    if everyone_overwrite is None:
        return True

    # Only an explicit denial (False) hides the channel; an unset value does not.
    return everyone_overwrite[1].read_messages is not False
项目:Chiaki-Nanami    作者:Ikusaba-san    | 项目源码 | 文件源码
async def _display_plonked(self, ctx, entries, plonk):
        """Send an embed summarising which entries were just (un)plonked.

        Args:
            ctx: Invocation context used to send the embed.
            entries: The affected objects; they are split by whether they
                are text channels.
            plonk: Key into ``_plonk_embed_mappings`` selecting the embed
                colour and the action word.
        """
        colour, action = _plonk_embed_mappings[plonk]
        embed = (discord.Embed(colour=colour)
                 .set_author(name=f'{action.title()} successful!', icon_url=PLONK_ICON)
                 )

        # One embed field per non-empty group.
        for thing in map(list, partition(lambda e: isinstance(e, discord.TextChannel), entries)):
            if not thing:
                continue

            # Pluralise the heading unless the group holds exactly one entry.
            plural = '' if len(thing) == 1 else 's'
            name = f'{_get_class_name(thing[0])}{plural} {action}ed'
            value = truncate(', '.join(map(str, thing)), 1024, '...')
            embed.add_field(name=name, value=value, inline=False)

        await ctx.send(embed=embed)
项目:jose    作者:lnmds    | 项目源码 | 文件源码
async def modattach(self, ctx, memberlog: discord.TextChannel, modlog: discord.TextChannel):
        """Attach channels to the moderation system"""
        # Only one moderation config may exist per guild.
        if await self.modcfg_get(ctx.guild) is not None:
            raise self.SayException('Mod config already exists.')

        modconfig = {
            'guild_id': ctx.guild.id,
            'member_log_id': memberlog.id,
            'mod_log_id': modlog.id,
            'last_action_id': 0,
        }

        log.info('[modlog:attach] Creating a modcfg @ g=%s[gid=%d]',
            ctx.guild.name, ctx.guild.id)

        res = await self.modcfg_coll.insert_one(modconfig)
        # Report whether the insert was acknowledged.
        await ctx.success(res.acknowledged)
项目:jose    作者:lnmds    | 项目源码 | 文件源码
async def starattach(self, ctx, starboard_chan: discord.TextChannel):
        """Attach an existing channel as a starboard.

        With this command you can create your starboard
        without needing José to automatically create the starboard for you
        """
        config = await self.get_starconfig(ctx.guild.id)
        if config:
            return await ctx.send('You already have a starboard config setup.')

        # Start from an empty config and point it at the given channel.
        config = empty_starconfig(ctx.guild)
        config['starboard_id'] = starboard_chan.id
        res = await self.starconfig_coll.insert_one(config)

        if not res.acknowledged:
            raise self.SayException('Failed to create starboard config (no ack)')

        await ctx.send('Done!')
        await ctx.ok()
项目:PyMiki    作者:TheGrammarJew    | 项目源码 | 文件源码
async def mentionspam_ignore(self, ctx, *channels: discord.TextChannel):
        """Specifies what channels ignore mentionspam auto-bans.

        If a channel is given then that channel will no longer be protected
        by auto-banning from mention spammers.

        To use this command you must have the Ban Members permission.
        """

        if not channels:
            return await ctx.send('Missing channels to ignore.')

        # Append the new IDs to the stored array (treating NULL as empty)
        # and de-duplicate the result.
        query = """UPDATE guild_mod_config
                   SET safe_mention_channel_ids =
                       ARRAY(SELECT DISTINCT * FROM unnest(COALESCE(safe_mention_channel_ids, '{}') || $2::bigint[]))
                   WHERE id = $1;
                """

        channel_ids = [c.id for c in channels]
        await ctx.db.execute(query, ctx.guild.id, channel_ids)
        # Drop the cached settings for this guild now that the row changed.
        self.get_guild_settings.invalidate(self, ctx.guild.id)
        await ctx.send(f'Mentions are now ignored on {", ".join(c.mention for c in channels)}.')
项目:PyMiki    作者:TheGrammarJew    | 项目源码 | 文件源码
async def mentionspam_unignore(self, ctx, *channels: discord.TextChannel):
        """Specifies what channels to take off the ignore list.

        To use this command you must have the Ban Members permission.
        """

        if not channels:
            return await ctx.send('Missing channels to protect.')

        # Keep only the stored IDs that are not among the ones passed in.
        query = """UPDATE guild_mod_config
                   SET safe_mention_channel_ids =
                       ARRAY(SELECT element FROM unnest(safe_mention_channel_ids) AS element
                             WHERE NOT(element = ANY($2::bigint[])))
                   WHERE id = $1;
                """

        await ctx.db.execute(query, ctx.guild.id, [c.id for c in channels])
        # Drop the cached settings for this guild now that the row changed.
        self.get_guild_settings.invalidate(self, ctx.guild.id)
        await ctx.send('Updated mentionspam ignore list.')
项目:PyMiki    作者:TheGrammarJew    | 项目源码 | 文件源码
async def reaction_action(self, fmt, emoji, message_id, channel_id, user_id):
        """Dispatch a star-emoji reaction event to ``self.<fmt>_message``.

        Returns without acting when the emoji is not the star, the channel is
        not a guild text channel, the user is unknown or a bot, or the Config
        cog reports the user as plonked.

        Args:
            fmt: Prefix of the handler name; ``f'{fmt}_message'`` is looked
                up on ``self``.
            emoji: The reaction emoji (compared via ``str``).
            message_id: ID of the message that was reacted to.
            channel_id: ID of the channel containing the message.
            user_id: ID of the reacting user.
        """
        if str(emoji) != '\N{WHITE MEDIUM STAR}':
            return

        channel = self.bot.get_channel(channel_id)
        if not isinstance(channel, discord.TextChannel):
            return

        method = getattr(self, f'{fmt}_message')

        user = self.bot.get_user(user_id)
        if user is None or user.bot:
            return

        async with self.bot.pool.acquire() as con:
            # The plonk check is skipped when the Config cog is not loaded.
            config = self.bot.get_cog('Config')
            if config:
                plonked = await config.is_plonked(channel.guild.id, user_id, channel_id=channel_id, connection=con)
                if plonked:
                    return

            try:
                await method(channel, message_id, user_id, connection=con)
            except StarError:
                # Star errors are deliberately ignored for reaction events.
                pass
项目:PyMiki    作者:TheGrammarJew    | 项目源码 | 文件源码
async def on_raw_message_delete(self, message_id, channel_id):
        """Drop the starboard entry for a message deleted in the starboard.

        Args:
            message_id: ID of the deleted message.
            channel_id: ID of the channel it was deleted from.
        """
        if message_id in self._about_to_be_deleted:
            # we triggered this deletion ourselves and
            # we don't need to drop it from the database
            self._about_to_be_deleted.discard(message_id)
            return

        # isinstance is False for None, so this also covers unknown channels.
        channel = self.bot.get_channel(channel_id)
        if not isinstance(channel, discord.TextChannel):
            return

        starboard = await self.get_starboard(channel.guild.id)
        if starboard.channel is None or starboard.channel.id != channel_id:
            return

        # at this point a message got deleted in the starboard
        # so just delete it from the database
        async with self.bot.pool.acquire() as con:
            query = "DELETE FROM starboard_entries WHERE bot_message_id=$1;"
            await con.execute(query, message_id)
项目:PyMiki    作者:TheGrammarJew    | 项目源码 | 文件源码
async def on_raw_bulk_message_delete(self, message_ids, channel_id):
        """Drop starboard entries for messages bulk-deleted in the starboard.

        Args:
            message_ids: The deleted message IDs; compared with ``<=`` against
                ``self._about_to_be_deleted`` (presumably both are sets).
            channel_id: ID of the channel the messages were deleted from.
        """
        if message_ids <= self._about_to_be_deleted:
            # Every ID is one we are tracking as about to be deleted: stop
            # tracking them and leave the database alone.
            self._about_to_be_deleted.difference_update(message_ids)
            return

        # isinstance is False for None, so this also covers unknown channels.
        channel = self.bot.get_channel(channel_id)
        if not isinstance(channel, discord.TextChannel):
            return

        starboard = await self.get_starboard(channel.guild.id)
        if starboard.channel is None or starboard.channel.id != channel_id:
            return

        async with self.bot.pool.acquire() as con:
            query = "DELETE FROM starboard_entries WHERE bot_message_id=ANY($1::bigint[]);"
            await con.execute(query, list(message_ids))
项目:PyMiki    作者:TheGrammarJew    | 项目源码 | 文件源码
async def on_raw_reaction_clear(self, message_id, channel_id):
        """Remove a message's starboard entry when its reactions are cleared.

        Deletes the database row for ``message_id`` and, if a row existed,
        deletes the corresponding bot message from the starboard channel.

        Args:
            message_id: ID of the message whose reactions were cleared.
            channel_id: ID of the channel containing that message.
        """
        # isinstance is False for None, so this also covers unknown channels.
        channel = self.bot.get_channel(channel_id)
        if not isinstance(channel, discord.TextChannel):
            return

        async with self.bot.pool.acquire() as con:
            starboard = await self.get_starboard(channel.guild.id, connection=con)
            if starboard.channel is None:
                return

            query = "DELETE FROM starboard_entries WHERE message_id=$1 RETURNING bot_message_id;"
            row = await con.fetchrow(query, message_id)

            # No row means the message had no starboard entry.
            if row is None:
                return

            bot_message_id = row[0]
            msg = await self.get_message(starboard.channel, bot_message_id)
            if msg is not None:
                await msg.delete()
项目:PyMiki    作者:TheGrammarJew    | 项目源码 | 文件源码
async def nostalgia(self, ctx, date: date, *, channel: discord.TextChannel = None):
        """Pins an old message from a specific date.

        If a channel is not given, then pins from the channel the
        command was ran on.

        The format of the date must be either YYYY-MM-DD or YYYY/MM/DD.
        """
        channel = channel or ctx.channel

        # Fetch at most one message after the given date.
        found = await channel.history(after=date, limit=1).flatten()
        if not found:
            return await ctx.send('Could not find message.')

        message = found[0]

        try:
            await message.pin()
        except discord.HTTPException:
            await ctx.send('Could not pin message.')
        else:
            await ctx.send('Pinned message.')
项目:statbot    作者:strinking    | 项目源码 | 文件源码
async def on_guild_channel_update(self, before, after):
        """Record a guild channel update in the database.

        Text channels additionally trigger the ``on_guild_channel_update``
        hook when one is registered.

        Args:
            before: The channel state before the update.
            after: The channel state after the update.
        """
        self._log_ignored(f"Channel was updated in guild {after.guild.id}")
        if not await self._accept_channel(after):
            return

        # Mention the new name in the log line only when it changed.
        if before.name != after.name:
            changed = f' (now {after.name})'
        else:
            changed = ''

        if isinstance(after, discord.TextChannel):
            self.logger.info(f"Channel #{before.name}{changed} was changed in {after.guild.name}")

            with self.sql.transaction() as trans:
                self.sql.update_channel(trans, after)

            # pylint: disable=not-callable
            hook = self.hooks['on_guild_channel_update']
            if hook:
                self.logger.debug(f"Found hook {hook!r}, calling it")
                await hook(before, after)
        elif isinstance(after, discord.VoiceChannel):
            # The f prefix is required so the placeholders are interpolated.
            self.logger.info(f"Voice channel {before.name}{changed} was changed in {after.guild.name}")

            with self.sql.transaction() as trans:
                self.sql.update_voice_channel(trans, after)
        elif isinstance(after, discord.CategoryChannel):
            self.logger.info(f"Channel category {before.name}{changed} was changed in {after.guild.name}")

            with self.sql.transaction() as trans:
                self.sql.update_channel_category(trans, after)
项目:Godavaru    作者:Godavaru    | 项目源码 | 文件源码
async def lol(ctx):
    # Counts messages containing "lol" (case-insensitive) written by one
    # hard-coded author, across the readable text channels of one hard-coded
    # guild. Does nothing in any other guild.
    guild = ctx.message.guild
    if guild.id != 213468583252983809:
        return

    msg = await ctx.send("Searching channels... (this may take a while)")
    total = 0
    for chan in guild.channels:
        if not isinstance(chan, discord.TextChannel):
            continue
        # Skip channels the bot is not allowed to read.
        if not chan.permissions_for(guild.me).read_messages:
            continue
        async for m in chan.history():
            if m.author.id == 132584525296435200 and "lol" in m.content.lower():
                total += 1
    await msg.edit(content="Lars' total lol counter so far is: `{}`".format(total))

# force update
项目:Excalibot    作者:endreman0    | 项目源码 | 文件源码
async def link(self, ctx, text: discord.TextChannel, *, voice: discord.VoiceChannel):
        """Links an existing text channel to a voice channel."""
        with ctx.session:
            existing = ctx.session.get(TextVoiceLink, text_id=text.id, voice_id=voice.id).one_or_none()
            if existing is not None:
                return await ctx.send('BAKA! Those channels are already linked!')
            # Create the role for this pair, then record the link.
            role = await self._create_role(ctx.guild, text, voice, 'Voice link requested by {}'.format(ctx.author))
            ctx.session.add(TextVoiceLink(role_id=role.id, text_id=text.id, voice_id=voice.id))
        await ctx.send(content='Linked {} to "{}"'.format(text.mention, voice.name))
项目:Excalibot    作者:endreman0    | 项目源码 | 文件源码
async def unlink(self, ctx, text: discord.TextChannel, *, voice: discord.VoiceChannel):
        """Unlinks a voice channel and deletes the corresponding role."""
        with ctx.session:
            link = ctx.session.get(TextVoiceLink, text_id=text.id, voice_id=voice.id).one_or_none()
            if link is None:
                return await ctx.send('BAKA! Those channels are not linked!')
            # Read the role ID before the record is deleted.
            role_id = link.role_id
            ctx.session.delete(link)

        role = discord.utils.get(ctx.guild.roles, id=role_id)
        if role is None:
            # The role is not among the guild's roles: there is nothing to
            # delete and no role name to report.
            await ctx.send(content='Unlinked {} from "{}"; the linked role no longer exists.'.format(text.mention, voice.name))
        else:
            await self._delete_role(ctx.guild, role, text, voice, 'Voice unlink requested by {}'.format(ctx.author))
            await ctx.send(content='Unlinked {} from "{}" and deleted the "{}" role.'.format(text.mention, voice.name, role.name))
项目:Excalibot    作者:endreman0    | 项目源码 | 文件源码
async def on_command_error(self, ctx, err):
        """Report a command error to the user and log unexpected ones.

        Known error types get a fixed reply, unknown commands are ignored,
        and anything else is echoed to the channel and logged with its
        traceback.

        Args:
            ctx: Invocation context of the failed command.
            err: The exception that was raised.
        """
        if isinstance(err, errors.PermissionDenied):
            await ctx.send('BAKA! You do not have my permission!')
        elif isinstance(err, errors.MissingPermissions):
            await ctx.send('BAKA! I require these permissions: %s' % ', '.join(err.args))
        elif isinstance(err, commands.NoPrivateMessage):
            await ctx.send('BAKA! This command does not work in private messages!')
        elif isinstance(err, commands.BadArgument):
            # Make sure the relayed message ends with punctuation.
            str_err = str(err)
            if not str_err.endswith(('.', '!')):
                str_err += '.'
            await ctx.send('BAKA! %s' % str_err)
        elif isinstance(err, commands.MissingRequiredArgument):
            # Only the first word of the error text is shown as the argument.
            await ctx.send('BAKA! Missing required argument: `%s`' % err.args[0].partition(' ')[0])
        elif isinstance(err, commands.TooManyArguments):
            await ctx.send('BAKA! Too many arguments!')
        elif isinstance(err, commands.CommandNotFound):
            pass
        else:
            await ctx.send('```\n%s\n```' % ''.join(traceback.format_exception_only(type(err), err)).strip())
            # Guild channels log guild/channel/author; otherwise the channel
            # is treated as a DM and its recipient is logged.
            if isinstance(ctx.channel, discord.TextChannel):
                log.error('Error in command <{0}> ({1.name!r}({1.id}) {2}({2.id}) {3}({3.id}) {4!r})'.format(ctx.command, ctx.guild, ctx.channel, ctx.author, ctx.message.content))
            else:
                log.error('Error in command <{0}> (DM {1}({1.id}) {2!r})'.format(ctx.command, ctx.channel.recipient, ctx.message.content))
            log.error(''.join(traceback.format_exception(type(err), err, err.__traceback__)))
项目:kitsuchan-2    作者:n303p4    | 项目源码 | 文件源码
async def channelinfo(self, ctx, *, channel: discord.TextChannel=None):
        """Display information about a text channel.
        Defaults to the current channel.

        * channel - Optional argument. A specific channel to get information about."""

        # If channel is None, then it is set to ctx.channel.
        channel = channel or ctx.channel

        embed = discord.Embed(title=f"{channel.name}")

        # ctx.channel may be a channel type without a topic or a guild, so
        # those two attributes are optional.
        try:
            embed.description = channel.topic
        except AttributeError:
            pass

        embed.add_field(name="Channel ID", value=channel.id)

        try:
            embed.add_field(name="Guild", value=channel.guild.name)
        except AttributeError:
            pass

        embed.add_field(name="Members", value=len(channel.members))
        embed.add_field(name="Created at", value=channel.created_at.ctime())

        if channel.is_nsfw():
            embed.set_footer(text="NSFW content is allowed for this channel.")

        await ctx.send(embed=embed)
项目:SESTREN    作者:SirThane    | 项目源码 | 文件源码
async def c4_enable(self, ctx, *, chan: discord.TextChannel=None):
        """Enable Connect Four on a channel

        Run without an argument to enable on the current channel
        Pass a channel as an argument to enable on that channel"""
        chan = chan or ctx.channel
        # The result of sadd is truthy when the ID was not yet in the set.
        added = self.db.sadd(f"{self.app_name}:c4:allowed_channels", chan.id)
        if added:
            await self.message(ctx, msg="Connect Four successfully enabled on channel.")
        else:
            await self.message(ctx, msg="Connect Four already enabled on channel.", level=1)
项目:SESTREN    作者:SirThane    | 项目源码 | 文件源码
async def c4_disable(self, ctx, *, chan: discord.TextChannel=None):
        """Disable Connect Four on a channel

        Run without an argument to disable on the current channel
        Pass a channel as an argument to disable on that channel"""
        chan = chan or ctx.channel
        # The result of srem is truthy when the ID was present in the set.
        removed = self.db.srem(f"{self.app_name}:c4:allowed_channels", chan.id)
        if removed:
            await self.message(ctx, msg="Connect Four successfully disabled on channel.")
        else:
            await self.message(ctx, msg="Connect Four already disabled on channel.", level=1)
项目:SESTREN    作者:SirThane    | 项目源码 | 文件源码
async def on_message(message):
    """Set the bot's command prefixes for this message, then process it.

    Messages in guild text channels accept the guild's stored prefix (when
    one is set) in addition to the default; everything else uses only the
    default prefix.
    """
    default_prefix = get_default_prefix()
    prefixes = [default_prefix]
    if isinstance(message.channel, discord.TextChannel):
        guild_prefix = db.hget(f'{config}:prefix', message.guild.id)
        if guild_prefix:
            # The guild prefix is listed ahead of the default.
            prefixes = [guild_prefix, default_prefix]
    bot.command_prefix = prefixes
    await bot.process_commands(message)
项目:apex-sigma-core    作者:lu-ci    | 项目源码 | 文件源码
def check_nsfw(self):
        """Set ``self.nsfw_denied`` for the current message.

        The flag is True only when the message is in a guild text channel,
        the command is marked NSFW, and the channel is not an NSFW channel.
        In every other case it is False.
        """
        channel = self.message.channel
        self.nsfw_denied = bool(
            isinstance(channel, discord.TextChannel)
            and self.cmd.nsfw
            and not channel.is_nsfw()
        )
项目:apex-sigma-core    作者:lu-ci    | 项目源码 | 文件源码
def count_channels(channels):
    """Count text, voice and category channels in ``channels``.

    Channels of any other type are not counted.

    Returns:
        A ``(text, voice, categories)`` tuple of counts.
    """
    # Index order matches the returned tuple: text, voice, categories.
    tally = [0, 0, 0]
    for chan in channels:
        if isinstance(chan, discord.TextChannel):
            tally[0] += 1
        elif isinstance(chan, discord.VoiceChannel):
            tally[1] += 1
        elif isinstance(chan, discord.CategoryChannel):
            tally[2] += 1
    return tally[0], tally[1], tally[2]
项目:apex-sigma-core    作者:lu-ci    | 项目源码 | 文件源码
async def hardunmute(cmd, message, args):
    """Clear a user's permission overwrites on every text and category channel.

    Requires the invoking author to have Manage Channels in the current
    channel, and both the bot and the author to pass ``hierarchy_permit``
    against the target. Exactly one response embed is sent at the end.

    Args:
        cmd: The command object; its ``db`` is passed to ``log_event``.
        message: The invoking message; the first mention is the target.
        args: Command arguments; everything after the first is the reason.
    """
    if message.author.permissions_in(message.channel).manage_channels:
        if message.mentions:
            target = message.mentions[0]
            if len(args) > 1:
                reason = ' '.join(args[1:])
            else:
                reason = 'Not stated.'
            hierarchy_me = hierarchy_permit(message.guild.me, target)
            if hierarchy_me:
                hierarchy_auth = hierarchy_permit(message.author, target)
                if hierarchy_auth:
                    # Show a progress embed while the channels are edited.
                    ongoing = discord.Embed(color=0x696969, title='? Editing permissions...')
                    ongoing_msg = await message.channel.send(embed=ongoing)
                    for channel in message.guild.channels:
                        if isinstance(channel, (discord.TextChannel, discord.CategoryChannel)):
                            try:
                                await channel.set_permissions(target, overwrite=None, reason=reason)
                            except discord.Forbidden:
                                # Best effort: skip channels the bot may not edit.
                                pass
                    log_embed = generate_log_embed(message, target, args)
                    await log_event(cmd.db, message.guild, log_embed)
                    title = f'? {target.display_name} has been hard-unmuted.'
                    response = discord.Embed(color=0x77B255, title=title)
                    await ongoing_msg.delete()
                else:
                    response = discord.Embed(color=0xBE1931, title='? That user is equal or above you.')
            else:
                response = discord.Embed(color=0xBE1931, title='? I can\'t unmute a user equal or above me.')
        else:
            response = discord.Embed(color=0xBE1931, title='? No user targeted.')
    else:
        response = discord.Embed(title='? Access Denied. Manage Channels needed.', color=0xBE1931)
    await message.channel.send(embed=response)
项目:apex-sigma-core    作者:lu-ci    | 项目源码 | 文件源码
async def quote(cmd, message, args):
    """Quote a message from the current guild by its ID.

    Each text channel of the guild is tried in turn until the message is
    found. Exactly one response embed is sent: the quote, or an error.

    Args:
        cmd: The command object (unused here).
        message: The invoking message.
        args: Command arguments; the first one is the message ID.
    """
    if args:
        lookup = args[0]
        try:
            lookup = int(lookup)
        except ValueError:
            lookup = None
        if lookup:
            msg = None
            for channel in message.guild.channels:
                if isinstance(channel, discord.TextChannel):
                    try:
                        msg = await channel.get_message(lookup)
                        break
                    except (discord.Forbidden, discord.NotFound):
                        # Not readable or not in this channel; try the next.
                        msg = None
            if msg:
                if msg.content:
                    location = f'{msg.guild.name} | #{msg.channel.name}'
                    response = discord.Embed(color=msg.author.color, timestamp=msg.created_at)
                    response.set_author(name=f'{msg.author.display_name}', icon_url=user_avatar(msg.author))
                    response.description = msg.content
                    response.set_footer(text=location)
                else:
                    response = discord.Embed(color=0xBE1931, title='? That message has no text content.')
            else:
                response = discord.Embed(color=0xBE1931, title='? Message not found.')
        else:
            response = discord.Embed(color=0xBE1931, title='? Invalid ID given.')
    else:
        response = discord.Embed(color=0xBE1931, title='? No ID given.')
    await message.channel.send(embed=response)
项目:dogbot    作者:slice    | 项目源码 | 文件源码
async def _get_recent_image(channel: discord.TextChannel) -> typing.Optional[str]:
    """Return the proxy URL of the most recent image in ``channel``.

    Scans up to 100 messages of the channel's history. For each message,
    attachments are checked before embeds.

    Returns:
        The ``proxy_url`` of the first image found, or None if none of the
        scanned messages contain one.
    """
    async for msg in channel.history(limit=100):
        # Scan any attached images; a truthy height marks an image here.
        for attachment in msg.attachments:
            if attachment.height:
                return attachment.proxy_url

        # Scan any embeds in the message.
        for embed in msg.embeds:
            if embed.image is discord.Embed.Empty:
                continue
            return embed.image.proxy_url

    return None
项目:dogbot    作者:slice    | 项目源码 | 文件源码
async def notify_error(self, channel: discord.TextChannel, text: str):
        """Send a "Feed error" warning embed to ``channel``.

        Args:
            channel: Destination channel.
            text: Error description shown in the embed.
        """
        embed = discord.Embed(title='\N{WARNING SIGN} Feed error', description=text, color=0xff4747)
        try:
            await channel.send(embed=embed)
        except discord.Forbidden:
            # Best effort: nothing more to do if sending is not permitted.
            pass
项目:dogbot    作者:slice    | 项目源码 | 文件源码
async def on_raw_bulk_message_delete(self, message_ids, channel_id: int):
        """Log a bulk message deletion to the guild's log.

        Args:
            message_ids: IDs of the deleted messages.
            channel_id: ID of the channel the messages were deleted from.
        """
        # add to list of bulk deletes so we don't process message delete events for these messages
        self.bulk_deletes.add(message_ids=message_ids)

        # resolve the channel that the message was deleted in
        channel = self.bot.get_channel(channel_id)

        # don't handle non-existent channels or dms
        if not channel or not isinstance(channel, discord.TextChannel):
            return

        await self.log(channel.guild, f'\U0001f6ae {len(message_ids)} message(s) deleted in {channel.mention}')
项目:dogbot    作者:slice    | 项目源码 | 文件源码
async def on_message_delete(self, msg: discord.Message):
        """Log a single deleted message to the guild's log.

        Nothing is logged for messages outside guild text channels, messages
        already covered by a bulk delete or a censor, channels that are not
        publicly visible, guilds with delete tracking turned off, or bot
        authors unless the guild allows them.
        """
        # don't handle message deletion elsewhere
        if not isinstance(msg.channel, discord.TextChannel):
            return

        # do not process bulk message deletes, or message censors (the censor cog does that already)
        # TODO: do this but cleanly, maybe paste website?
        if await self.bulk_deletes.check_batch(msg.id) or await self.censored_messages.check(message_id=msg.id):
            return

        # if this channel isn't publicly visible or deletes shouldn't be tracked, bail
        if (not await is_publicly_visible(self.bot, msg.channel) or
                await self.bot.config_is_set(msg.guild, 'modlog_notrack_deletes')):
            return

        # if the author was a bot and we aren't configured to allow bots, return
        if msg.author.bot and not await self.bot.config_is_set(msg.guild, 'modlog_filter_allow_bot'):
            return

        # format attachment list
        if msg.attachments:
            listing = ', '.join(f'{a.filename}, {filesize(a.size)}' for a in msg.attachments)
            attachments = f'{len(msg.attachments)} attachment(s): {listing}'
        else:
            attachments = 'no attachments'

        content = utils.prevent_codeblock_breakout(utils.truncate(msg.content, 1500))
        # The trailing summary is wrapped in one balanced pair of parentheses.
        fmt = (f'\U0001f6ae Message by {describe(msg.author)} deleted in {msg.channel.mention}: ```\n{content}\n``` '
               f'({attachments}, {len(msg.embeds)} embed(s))')
        await self.log(msg.guild, fmt)
项目:dogbot    作者:slice    | 项目源码 | 文件源码
async def is_public(self, ctx, channel: discord.TextChannel=None):
        """
        Checks if a channel is public.

        This command is in the Modlog cog because the modlog does not process message edit and
        delete events for private channels.

        If you have turned 'log_all_message_events' on, this will always say public.
        """
        channel = channel if channel else ctx.channel
        # The doubled braces leave a literal {} placeholder for the verb, and
        # the zero-width space keeps the reply from pinging everyone.
        public = f'{channel.mention} {{}} public to @\u200beveryone.'
        verb = 'is' if await is_publicly_visible(self.bot, channel) else '**is not**'
        await ctx.send(public.format(verb))
项目:goldmine    作者:Armored-Dragon    | 项目源码 | 文件源码
async def guildinfo(self, ctx):
        """Get loads of info about this guild.
        Usage: guildinfo"""
        s = ctx.guild
        ach = s.channels
        # [total, text channels, everything that is not a text channel]
        chlist = [len(ach), 0, 0]
        for i in ach:
            if isinstance(i, discord.TextChannel):
                chlist[1] += 1
            else:
                chlist[2] += 1
        iurl = s.icon_url
        # Random embed colour over the full 24-bit RGB range.
        r_embed = discord.Embed(color=random.randint(0, 0xFFFFFF))
        # The author entry links to the guild icon only when there is one.
        if iurl:
            thing = {'url': iurl}
        else:
            thing = {}
        r_embed.set_author(name=s.name, **thing, icon_url=(iurl if iurl else ctx.me.avatar_url))
        r_embed.set_footer(text=ctx.me.display_name, icon_url=ctx.me.avatar_url)
        if iurl:
            r_embed.set_image(url=iurl)
        r_embed.add_field(name='ID', value=s.id)
        r_embed.add_field(name='Members', value=len(s.members))
        r_embed.add_field(name='Channels', value=ch_fmt.format(*[str(i) for i in chlist]))
        r_embed.add_field(name='Roles', value=len(s.roles))
        r_embed.add_field(name='Custom Emojis', value=len(s.emojis))
        # Turn a region identifier into title-cased words with fixed acronyms.
        r_embed.add_field(name='Region (Location)', value=str(s.region).replace('-', ' ').title().replace('Eu ', 'EU ').replace('Us ', 'US ').replace('Vip', 'VIP '))
        r_embed.add_field(name='Owner', value=str(s.owner))
        r_embed.add_field(name='Default Channel', value=f'<#{s.default_channel.id}>\n(#{s.default_channel.name})' if s.default_channel is not None else 'None (deleted)')
        r_embed.add_field(name='Admins Need 2FA', value=('Yes' if s.mfa_level else 'No'))
        r_embed.add_field(name='Verification Level', value=v_level_map[str(s.verification_level)])
        await ctx.send(embed=r_embed)
项目:goldmine    作者:Armored-Dragon    | 项目源码 | 文件源码
async def info(self, ctx):
        """Get bot info.
        Usage: info"""
        # Must be a coroutine: the body awaits ctx.send().
        total = text_total = 0
        for channel in self.bot.get_all_channels():
            total += 1
            if isinstance(channel, discord.TextChannel):
                text_total += 1
        # Positional values handed to ch_fmt: total, text channels, non-text channels.
        counts = [total, text_total, total - text_total]

        uptime = self.bot.format_uptime()
        ram = self.bot.get_ram()
        # ram[0] flags whether the memory figures in ram[1] / ram[2] are usable.
        if ram[0]:
            memory = '%s MB (%s MiB)' % (round(ram[1], 1), round(ram[2], 1))
        else:
            memory = 'Couldn\'t fetch'

        me = ctx.me
        # 0xFFFFFF is the largest 24-bit RGB value, so every colour can be drawn.
        emb = discord.Embed(color=random.randint(0, 0xFFFFFF))
        emb.set_author(name=me.display_name, url='https://khronodragon.com/', icon_url=me.avatar_url)
        emb.set_footer(text='Made in Python 3.6+ with Discord.py %s' % self.bot.lib_version, icon_url='https://upload.wikimedia.org/wikipedia/commons/thumb/c/c3/Python-logo-notext.svg/400px-Python-logo-notext.svg.png')
        emb.add_field(name='Guilds', value=len(self.bot.guilds))
        emb.add_field(name='Author', value='Dragon5232#1841')
        emb.add_field(name='Uptime', value=uptime)
        emb.add_field(name='Local Time', value=time.strftime(absfmt, time.localtime()))
        emb.add_field(name='Cogs Loaded', value=len(self.bot.cogs))
        emb.add_field(name='Command Calls', value=sum(self.bot.command_calls.values()))
        emb.add_field(name='Memory Used', value=memory)
        emb.add_field(name='Members Seen', value=sum(g.member_count for g in self.bot.guilds))
        emb.add_field(name='Channels', value=ch_fmt.format(*[str(n) for n in counts]))
        emb.add_field(name='Custom Emojis', value=len(self.bot.emojis))
        emb.add_field(name='Commands', value=str(len(self.bot.all_commands)))
        emb.add_field(name='ID', value=me.id)
        # The invite link is only advertised by the one bot account with this ID.
        if self.bot.user.id == 239775420470394897:
            emb.add_field(name='Invite Link', value='https://tiny.cc/goldbot')
        await ctx.send(home_broadcast, embed=emb)
项目:goldmine    作者:Armored-Dragon    | 项目源码 | 文件源码
async def nitro_sendto(self, ctx, *, channel: discord.TextChannel):
        """Send a fake Nitro message embed to a channel.
        Usage: nitro_sendto [channel]"""
        embed = self.get_nitro_embed()
        # Use the async form of the typing context manager; the plain `with`
        # form was dropped in discord.py 2.0.
        async with channel.typing():
            # Short random pause so the typing indicator is visible first.
            # No `loop=` argument: asyncio.sleep() lost it in Python 3.10 and
            # always uses the running loop.
            await asyncio.sleep(random.uniform(0.25, 1.2))
            await channel.send(embed=embed)
项目:StreamNotificationBot    作者:ivandardi    | 项目源码 | 文件源码
def __init__(self, subscriber):
        """Wrap a notification target and mirror its ``id``.

        Raises ValueError when ``subscriber`` is not a discord User, Member
        or TextChannel.
        """
        accepted_types = (discord.User, discord.Member, discord.TextChannel)
        if isinstance(subscriber, accepted_types):
            self.subscriber = subscriber
            self.id = self.subscriber.id
            return
        raise ValueError("Passed subscriber isn't an User nor a TextChannel")
项目:StreamNotificationBot    作者:ivandardi    | 项目源码 | 文件源码
async def _add_command(self, ctx, username: str = None, channel: discord.TextChannel = None):
        """add command"""
        # Must be a coroutine: the body uses `await` and `async with`.
        # Both inputs are replaced by whatever their validators return.
        username = await self.validate_username(username)
        channel = await validate_notification_channel(ctx, channel)
        async with ctx.typing():
            # With no channel left after validation, the invoking user subscribes.
            subscriber = Subscriber(channel or ctx.author)
            await self._subscribe_to_streamer(subscriber, username)

        await ctx.send(f'{subscriber} subscribed to {username} successfully!')
项目:StreamNotificationBot    作者:ivandardi    | 项目源码 | 文件源码
async def _del_command(self, ctx, username: str = None, channel: discord.TextChannel = None):
        """del command"""
        # Must be a coroutine: the body uses `await` and `async with`.
        # Both inputs are replaced by whatever their validators return.
        username = await self.validate_username(username)
        channel = await validate_notification_channel(ctx, channel)
        async with ctx.typing():
            # With no channel left after validation, the invoking user is the target.
            subscriber = Subscriber(channel or ctx.author)
            await self._del_subscription(subscriber, username)

        await ctx.send(f'{subscriber} unsubscribed to {username} successfully!')
项目:StreamNotificationBot    作者:ivandardi    | 项目源码 | 文件源码
async def _list_command(self, ctx, channel: discord.TextChannel = None):
        """list command"""
        # Must be a coroutine: the body uses `await` and `async with`.
        channel = await validate_notification_channel(ctx, channel)
        # With no channel left after validation, list the invoking user's subscriptions.
        subscriber = Subscriber(channel or ctx.author)
        async with ctx.typing():
            # The lookup is scoped to this cog's service via self.service_name.
            streamers = await self.bot.database.get_subscriptions_from_subscriber(subscriber.id, self.service_name)
            embed = self._make_list_embed(streamers, subscriber)

        await ctx.send(embed=embed)
项目:StreamNotificationBot    作者:ivandardi    | 项目源码 | 文件源码
async def _enable_command(self, ctx, channel: discord.TextChannel = None):
        # Re-enables notifications for a channel, or for the invoking user when
        # no channel remains after validation. Must be a coroutine because the
        # body awaits.
        channel = await validate_notification_channel(ctx, channel)
        subscriber = Subscriber(channel or ctx.author)
        # discard() is a no-op when the id was not disabled, so this never raises.
        self.disabled_users.discard(subscriber.id)
        await ctx.send(f'{subscriber.subscriber} has notifications enabled.')
项目:StreamNotificationBot    作者:ivandardi    | 项目源码 | 文件源码
async def on_guild_channel_delete(self, channel: discord.TextChannel):
        """Log the deletion and remove the deleted channel from the database."""
        # Must be a coroutine: the body awaits the database cleanup helper.
        log.info('Guild channel deleted')
        # The helper takes a collection of channels, hence the one-item list.
        await self._remove_channels_from_database([channel])
项目:Chiaki-Nanami    作者:Ikusaba-san    | 项目源码 | 文件源码
def __init__(self, ctx):
        """Build the paginator entries from the guild's channels by category.

        Each category's channels are split with ``sliced(channels, 10)``
        (presumably groups of at most ten -- confirm against the helper), and
        every group becomes one ``(category, parsed_channels)`` entry.
        """
        author_permissions = ctx.author.permissions_in

        def can_read(channel):
            return author_permissions(channel).read_messages

        def can_connect(channel):
            return author_permissions(channel).connect

        # One parser per supported channel class; any other class raises KeyError.
        parsers = {
            discord.TextChannel: functools.partial(_parse_channel, prefix='#', predicate=can_read),
            discord.VoiceChannel: functools.partial(_parse_channel, prefix='', predicate=can_connect),
        }

        pages = []
        for category, channels in ctx.guild.by_category():
            for chunk in sliced(channels, 10):
                parsed = [parsers[channel.__class__](channel) for channel in chunk]
                pages.append((category, parsed))

        super().__init__(ctx, pages, lines_per_page=1)
项目:Chiaki-Nanami    作者:Ikusaba-san    | 项目源码 | 文件源码
async def info_channel(self, ctx, channel: union(discord.TextChannel, discord.VoiceChannel)=None):
        """Shows info about a voice or text channel."""
        # Must be a coroutine: the body awaits ctx.send().
        if channel is None:
            # Default to the channel the command was invoked in.
            channel = ctx.channel

        # Pick the embed-builder method that matches the channel type.
        if isinstance(channel, discord.TextChannel):
            build_embed = self.text_channel_embed
        else:
            build_embed = self.voice_channel_embed

        channel_embed = build_embed(channel)
        channel_embed.colour = self.bot.colour

        await ctx.send(embed=channel_embed)
项目:Chiaki-Nanami    作者:Ikusaba-san    | 项目源码 | 文件源码
async def stats(self, ctx):
        """Shows some general statistics about the bot.

        Do not confuse this with `{prefix}about` which is just the
        general info. This is just numbers.
        """
        # Must be a coroutine: the body awaits ctx.send().
        bot = self.bot

        # One "<count> <command>" line per command, most used first.
        command_lines = itertools.starmap('{1} {0}'.format, bot.command_counter.most_common())
        command_stats = '\n'.join(command_lines) or 'No stats yet.'
        # set() collapses duplicate values so each object is counted once.
        extension_stats = '\n'.join(f'{len(set(getattr(bot, attr).values()))} {attr}'
                                    for attr in ('cogs', 'extensions'))

        # Read both process metrics inside a single oneshot() context.
        with self.process.oneshot():
            memory_usage_in_mb = self.process.memory_full_info().uss / 1024**2
            cpu_usage = self.process.cpu_percent() / psutil.cpu_count()

        uptime_seconds = bot.uptime.total_seconds()
        average_messages = bot.message_counter / uptime_seconds
        message_field = f'{bot.message_counter} messages\n({average_messages:.2f} messages/sec)'

        text, voice = partition(lambda c: isinstance(c, discord.TextChannel), bot.get_all_channels())
        presence = (f"{bot.guild_count} Servers\n{ilen(text)} Text Channels\n"
                    f"{ilen(voice)} Voice Channels\n{bot.user_count} Users")

        chiaki_embed = (discord.Embed(description=bot.appinfo.description, colour=self.bot.colour)
                        .set_author(name=str(ctx.bot.user), icon_url=bot.user.avatar_url)
                        .add_field(name='Modules', value=extension_stats)
                        .add_field(name='CPU Usage', value=f'{cpu_usage}%\n{memory_usage_in_mb:.2f}MB')
                        .add_field(name='Messages', value=message_field)
                        .add_field(name='Presence', value=presence)
                        .add_field(name='Commands', value=command_stats)
                        .add_field(name='Uptime', value=self.bot.str_uptime.replace(', ', '\n'))
                        )
        await ctx.send(embed=chiaki_embed)
项目:Chiaki-Nanami    作者:Ikusaba-san    | 项目源码 | 文件源码
async def convert(self, ctx, argument):
        """Resolve ``argument`` to a text channel.

        IDs and ``<#id>`` mentions are delegated to the parent converter.
        Anything else is matched against channel names, honouring
        ``self.case_sensitive``, and the candidates are passed to
        ``ctx.disambiguate``.
        """
        # Must be a coroutine: both exit paths await.
        match = self._get_id_match(argument) or re.match(r'<#([0-9]+)>$', argument)
        if match is not None:
            return await super().convert(ctx, argument)

        # Not an ID or a mention: compare by channel name.
        if self.case_sensitive:
            def check(c):
                return isinstance(c, discord.TextChannel) and c.name == argument
        else:
            lowered = argument.lower()

            def check(c):
                return isinstance(c, discord.TextChannel) and c.name.lower() == lowered

        guild = ctx.guild
        if guild:
            candidates = guild.text_channels
            transform = str
        else:
            # No guild on the context: search every channel the bot can see.
            candidates = ctx.bot.get_all_channels()
            # NOTE(review): a format string here, but a callable (`str`) in the
            # guild branch -- confirm ctx.disambiguate accepts both.
            transform = '{0} (Server: {0.guild})'

        return await ctx.disambiguate(list(filter(check, candidates)), transform)
项目:Chiaki-Nanami    作者:Ikusaba-san    | 项目源码 | 文件源码
async def send_message(self, ctx, channel: discord.TextChannel, *, msg):
        """Sends a message to a particular channel"""
        # Must be a coroutine: every step below awaits.
        app_info = await self.bot.application_info()
        owner = app_info.owner
        # The forwarded message names the application owner as the sender.
        await channel.send(f"Message from {owner}:\n{msg}")
        await ctx.send(f"Successfully sent message in {channel}: {msg}")
项目:Discord-ASM-Bot    作者:Emzi0767    | 项目源码 | 文件源码
async def on_message(self, message):
        """Process commands only for messages sent in a text channel by a non-bot author."""
        # Must be a coroutine: the body awaits process_commands().
        # Anything that is not a TextChannel is ignored.
        if not isinstance(message.channel, discord.TextChannel):
            return

        # Ignore other bots.
        if message.author.bot:
            return

        await self.process_commands(message)
项目:GAFBot    作者:DiNitride    | 项目源码 | 文件源码
def invite(self, ctx, guild=None):
        """
        Creates an invite to a specified server
        """
        guild_names = list("{} - ID: {}".format(g.name, g.id) for g in self.bot.guilds)
        if guild is None:
            guild = await reaction_menu.start_reaction_menu(self.bot, guild_names, ctx.author, ctx.channel, count=1,
                                                            timeout=60, per_page=10, header=header,
                                                            return_from=self.bot.guilds, allow_none=True)
            guild = guild[0]
        else:
            guild = discord.utils.find(lambda s: s.name == guild or str(s.id) == guild, self.bot.guilds)
            if guild is None:
                await ctx.send("`Unable to locate guild`")
                return

        for channel in guild.channels:
            if isinstance(channel, discord.TextChannel):
                try:
                    invite = await channel.create_invite()
                    await ctx.send("`Created an invite to guild, I will DM it to you`")
                    dm_channel = ctx.author.dm_channel
                    if dm_channel is None:
                        dm_channel = await ctx.author.create_dm()
                    await dm_channel.send(invite.url)
                    break
                except discord.HTTPException:
                    await ctx.send("`Failed to create invite for guild!`")