def tick(self, tick_type: str, *, raw: bool = False, guild: Guild = None) -> str:
    """
    Returns a custom tick emoji, falling back to a unicode one.

    Args:
        tick_type: Which tick to return. ``"green"`` selects the check mark;
            any other value selects the cross mark.
        raw: When true, the configured emoji string is returned as-is instead
            of being wrapped in ``<:...>`` message formatting.
        guild: The guild the tick will be used in. When given and our member
            there lacks the external-emoji permission, the unicode tick is
            returned instead.

    Returns:
        The tick.
    """
    unicode_tick = '\U00002705' if tick_type == 'green' else '\U0000274c'

    # custom emoji are useless where we may not use external emoji
    cannot_use_custom = guild and not guild.me.guild_permissions.external_emojis
    if cannot_use_custom:
        return unicode_tick

    try:
        configured = self.cfg['bot']['emoji'][tick_type + '_tick']
    except KeyError:
        # nothing configured for this tick type
        return unicode_tick

    if raw:
        return configured
    return f'<:{configured}>'
async def is_bot_collection(bot: Dogbot, guild: discord.Guild) -> bool:
    """Returns a bool indicating whether a guild is a collection.

    This is a coroutine because the blacklist lookup is awaited. The checks
    run in this order: blacklist, name keywords, whitelist, then the
    user-to-bot ratio threshold.
    """
    if await is_blacklisted(bot, guild.id):
        return True

    # keywords in the guild name
    lowered_name = guild.name.lower()
    if any(keyword in lowered_name for keyword in ('bot collection', 'bot hell')):
        return True

    # special guilds that shouldn't be classified as a bot collection
    if guild.id in WHITELISTED_GUILDS:
        return False

    # ratio too big!
    return user_to_bot_ratio(guild) >= UTBR_MAXIMUM
async def get_responsible(self, guild: discord.Guild, action: str, *,
                          target: discord.Member=None) -> discord.AuditLogEntry:
    """
    Checks the audit log for recent action performed on some user.

    This is a coroutine because the audit log fetch is awaited.

    :param guild: The :class:`discord.Guild` to look at.
    :param action: The name of the :class:`discord.AuditLogAction` attribute to check for.
    :param target: The targeted user to check for. When omitted, any target matches.
    :returns: The audit log entry, or ``None`` when no entry matches or when
        reading the audit log raises :class:`discord.Forbidden`.
    """
    try:
        # get the audit logs for the action specified
        entries = await guild.audit_logs(limit=1, action=getattr(discord.AuditLogAction, action)).flatten()
    except discord.Forbidden:
        return None

    # only check for entries performed on target, and happened in the last 2 seconds
    def check(entry):
        created_ago = (datetime.datetime.utcnow() - entry.created_at).total_seconds()
        return (entry.target == target if target else True) and created_ago <= 2

    return discord.utils.find(check, entries)
async def ranks(self, user_id: int, guild: discord.Guild) -> tuple:
    """Get ranking data about a user.

    This is a coroutine because the account list is awaited.

    Returns a 4-tuple ``(guildrank, globalrank, guild_total, global_total)``.
    Ranks are 1-based positions in the order ``all_accounts()`` returns.
    ``guildrank`` is ``-20`` when the user is not among the accounts whose
    owners are members of ``guild``.

    Raises ``ValueError`` when ``user_id`` has no account at all, because the
    global lookup is not guarded.
    """
    all_accounts = await self.all_accounts()
    all_ids = [account['id'] for account in all_accounts]

    # restrict to accounts whose owner is a member of this guild
    guild_ids = [account_id for account_id in all_ids
                 if guild.get_member(account_id) is not None]

    try:
        guildrank = guild_ids.index(user_id) + 1
    except ValueError:
        guildrank = -20

    globalrank = all_ids.index(user_id) + 1
    return guildrank, globalrank, len(guild_ids), len(all_ids)
async def current(self, ctx):
    """Get your current heist join session."""
    # coroutine: the embed is sent with an awaited ctx.send
    session = self.get_sess(ctx)

    em = discord.Embed(title='Current heist status')
    em.add_field(name='Guild being attacked',
                 value=f'`{session.target!s}` [{session.target.id}]')
    em.add_field(name='Amount being heisted',
                 value=f'`{session.amount!s}`JC')

    # one mention per participant, space separated
    mentions = ' '.join(f'<@{user_id}>' for user_id in session.users)
    em.add_field(name='Current users in the heist', value=mentions)

    await ctx.send(embed=em)
async def get_prefixes(self, guild: Guild) -> List[str]:
    """Returns the supplementary prefixes for a guild.

    This is a coroutine because the Redis lookup is awaited. A falsy
    ``guild`` (e.g. ``None``) yields an empty list without touching Redis;
    otherwise the members of the ``dog:prefixes:<guild id>`` set are returned.
    """
    if not guild:
        return []
    return await self.redis.smembers(f'dog:prefixes:{guild.id}', encoding='utf-8')
async def command_is_disabled(self, guild: discord.Guild, command_name: str) -> bool:
    """Returns whether the ``disabled:<guild id>:<command name>`` key exists in Redis.

    This is a coroutine because the Redis call is awaited; its result is
    returned unchanged.
    """
    return await self.redis.exists(f'disabled:{guild.id}:{command_name}')
async def disable_command(self, guild: discord.Guild, command_name: str):
    """Marks a command as disabled in a guild.

    This is a coroutine because the Redis write is awaited. It sets the
    ``disabled:<guild id>:<command name>`` key to ``'on'``.
    """
    logger.debug('Disabling %s in %d.', command_name, guild.id)
    await self.redis.set(f'disabled:{guild.id}:{command_name}', 'on')
async def enable_command(self, guild: discord.Guild, command_name: str):
    """Re-enables a command in a guild.

    This is a coroutine because the Redis call is awaited. It deletes the
    ``disabled:<guild id>:<command name>`` key.
    """
    logger.debug('Enabling %s in %d.', command_name, guild.id)
    await self.redis.delete(f'disabled:{guild.id}:{command_name}')
async def config_get(self, guild: Guild, name: str):
    """Returns a configuration key's value for a guild.

    This is a coroutine because the Redis read is awaited. The value is read
    from the ``<guild id>:<name>`` key with ``encoding='utf-8'``.
    """
    return await self.redis.get(f'{guild.id}:{name}', encoding='utf-8')
async def config_is_set(self, guild: discord.Guild, name: str) -> bool:
    """
    Returns whether a configuration key for a guild is set or not.

    This is a coroutine because the Redis call is awaited.

    .. NOTE::

        This does not look at the value of a configuration key; it just
        checks if it exists. In Dogbot, a configuration key existing
        signifies that it is set.
    """
    return await self.redis.exists(f'{guild.id}:{name}')
def on_command(self, ctx):
    """Logs a command invocation: where it ran, who ran it, the raw message
    content, and the names derived from the command's checks."""
    invoker = ctx.message.author

    # the first dotted component of each check's qualified name
    check_names = [c.__qualname__.split('.')[0] for c in ctx.command.checks]

    if isinstance(ctx.channel, DMChannel):
        location = '[DM]'
    else:
        location = '[Guild]'

    logger.info(
        '%s Command invocation by %s (%d) "%s" checks=%s',
        location,
        invoker,
        invoker.id,
        ctx.message.content,
        ','.join(check_names) or '(none)',
    )
def user_to_bot_ratio(guild: discord.Guild):
    """Returns the number of bot members divided by the number of human members.

    A guild with bots but no humans yields ``float('inf')`` instead of raising
    ``ZeroDivisionError``; a guild with no members at all yields ``0.0``.
    """
    bots = sum(1 for member in guild.members if member.bot)
    users = len(guild.members) - bots

    if users == 0:
        # avoid dividing by zero: all-bot guilds have an unbounded ratio
        return float('inf') if bots else 0.0
    return bots / users
def __init__(self, guild: Guild):
    """Creates fresh state for ``guild``: not looping, nothing to loop, and
    empty skip-vote and queue lists."""
    self.guild: Guild = guild

    self.looping, self.to_loop = False, None

    # list of user IDs that have voted to skip
    self.skip_votes = list()
    self.queue = list()
def state_for(self, guild: discord.Guild):
    """Returns the State instance stored for a guild, creating and storing a
    new one first when the guild has none yet."""
    key = guild.id
    if key in self.states:
        return self.states[key]

    self.states[key] = State(guild)
    return self.states[key]
async def is_whitelisted(bot, guild: Guild):
    """Returns whether ``music_guilds`` has a row for this guild's ID.

    This is a coroutine because the database query is awaited.
    """
    query = """
        SELECT * FROM music_guilds
        WHERE guild_id = $1
    """
    record = await bot.pgpool.fetchrow(query, guild.id)
    return record is not None
def guild_fields(self, g: Guild):
    """Builds the ``(name, value)`` field pairs describing a guild, to be
    passed into :method:``monitor_send``."""
    ratio = botcollection.user_to_bot_ratio(g)
    humans = utils.commas(len([u for u in g.members if not u.bot]))
    bots = utils.commas(len([u for u in g.members if u.bot]))

    guild_value = f'{g.name}\n`{g.id}`'
    owner_value = f'{g.owner.mention} {g.owner}\n`{g.owner.id}`'
    info_value = f'Created {utils.ago(g.created_at)}'
    members_value = f'Members: {len(g.members)} (UTBR: {ratio})\n{humans} human(s), {bots} bot(s)'

    return [
        ('Guild', guild_value),
        ('Owner', owner_value),
        ('Info', info_value),
        ('Members', members_value),
    ]
async def on_guild_remove(self, g: Guild):
    """Announces on the monitor that we were removed from a guild and bumps
    the ``stats:guilds:removes`` counter.

    This is a coroutine because the monitor send and the Redis increment are
    awaited. Guilds listed in ``refuse_notify_left`` are skipped once and
    removed from that collection.
    """
    if g.id in self.refuse_notify_left:
        # refuse to notify that we got removed from the guild, because the
        # "left bot collection"/"left blacklisted" monitor message already
        # does that
        self.refuse_notify_left.remove(g.id)
        return

    fields = self.guild_fields(g)
    await self.monitor_send(title='\N{OUTBOX TRAY} Removed from guild', fields=fields, color=Colors.ORANGE)
    await self.bot.redis.incr('stats:guilds:removes')
async def log(self, guild: discord.Guild, text: str, *, do_not_format: bool=False) -> discord.Message:
    """
    Directly logs a message to a guild's modlog channel.

    This is a coroutine because the send is awaited.

    :param guild: The guild to log to.
    :param text: The text to log.
    :param do_not_format: Sends ``text`` as-is instead of passing it through
        :meth:`modlog_msg` first.
    :return: Whatever ``bot.send_modlog`` returns (the sent message).
    """
    content = text if do_not_format else self.modlog_msg(text)
    return await self.bot.send_modlog(guild, content)
async def on_guild_emojis_update(self, guild: discord.Guild, before: 'List[discord.Emoji]',
                                 after: 'List[discord.Emoji]'):
    """Logs emoji additions and removals to the guild's modlog.

    This is a coroutine because the log call is awaited. Nothing is logged
    when ``diff`` reports neither additions nor removals.
    """
    added, removed = diff(before, after)
    if not added and not removed:
        # TODO: Handle renames
        return

    differences = describe_differences(self.bot, added, removed)
    await self.log(guild, f'\N{FRAME WITH PICTURE} Emoji updated: {differences}')
async def on_member_ban(self, guild: discord.Guild, user: discord.User):
    """Logs a ban to the modlog, then hands the sent message to
    ``autoformat_responsible`` for the ``'ban'`` action.

    This is a coroutine because both of those calls are awaited.

    :param guild: The guild the ban happened in.
    :param user: The user that was banned.
    """
    # don't make on_member_remove process this user's departure
    self.ban_debounces.add(user_id=user.id, guild_id=guild.id)

    verb = 'was banned'
    msg = await self.log(guild, self.format_member_departure(user, verb=verb, emoji='\N{HAMMER}'))
    await self.autoformat_responsible(msg, user, 'ban', departure=True, departure_extra=verb,
                                      departure_emoji='\N{HAMMER}')
async def guildtree(self, ctx, *ids: str):
    """List the guilds I am in (tree version).
    Usage: guildtree"""
    # coroutine: pages and error messages are sent with awaited ctx.send
    echeck_perms(ctx, ('bot_owner',))
    pager = commands.Paginator(prefix='```diff')

    guilds: List[discord.Guild] = []
    if ids:
        # key the lookup by str so IDs typed on the command line can match
        # regardless of how the library stores guild IDs
        s_map = {str(g.id): g for g in self.bot.guilds}
        for sid in ids:
            with assert_msg(ctx, '**ID** `%s` **is invalid. (must be 18 numbers)**' % sid):
                check(len(sid) == 18)
            try:
                guilds.append(s_map[sid])
            except KeyError:
                await ctx.send('guild ID **%s** not found.' % sid)
                return False
    else:
        guilds = self.bot.guilds

    for guild in guilds:
        pager.add_line('+ ' + guild.name + ' [{0} members] [ID {1}]'.format(str(len(guild.members)), guild.id))
        for channel in guild.channels:
            xname = channel.name
            if str(channel.type) == 'voice':
                xname = '[voice] ' + xname
            pager.add_line(' • ' + xname)

    for page in pager.pages:
        await ctx.send(page)
async def on_guild_channel_delete(self, channel: discord.TextChannel):
    """Removes a deleted channel from the database.

    This is a coroutine because the removal is awaited.
    """
    log.info('Guild channel deleted')
    await self._remove_channels_from_database([channel])
async def on_guild_remove(self, guild: discord.Guild):
    """Removes all of a guild's channels from the database.

    This is a coroutine because the removal is awaited.
    """
    log.info('Guild deleted')
    await self._remove_channels_from_database(guild.channels)
async def _show_blacklist_embed(self, ctx, colour, action, icon, thing, reason, time):
    """Sends an embed describing a blacklist action on a server or user.

    This is a coroutine because the send is awaited. ``thing`` is labelled
    ``'Server'`` when it is a ``discord.Guild`` and ``'User'`` otherwise. A
    falsy ``reason`` is shown as ``'None'``; otherwise it is passed through
    ``truncate`` with a limit of 1024.
    """
    type_name = 'Server' if isinstance(thing, discord.Guild) else 'User'
    reason = truncate(reason, 1024, '...') if reason else 'None'

    embed = (discord.Embed(colour=colour, timestamp=time)
             .set_author(name=f'{type_name} {action}', icon_url=icon)
             .add_field(name='Name', value=thing)
             .add_field(name='ID', value=thing.id)
             .add_field(name='Reason', value=reason, inline=False)
             )

    await ctx.send(embed=embed)
def audioplayer(self, ctx, error_on_none=True):
    """Finds the audioplayer that matches ``ctx``.

    ``ctx`` may be a command context, a user, a guild, or a guild channel.
    For a user, the match is the player whose voice channel the user is in;
    for a guild or channel, it is the player for that guild. When nothing
    matches, ``AudioPlayerNotFoundError`` is raised if ``error_on_none`` is
    true, otherwise ``None`` is returned. Any other ``ctx`` type raises
    ``ValueError``.
    """
    # TODO: ACCOUNT FOR WHEN THIS MESSAGE IS A PM
    if isinstance(ctx, discord.ext.commands.Context):
        # no guild means a private channel, so resolve through the author
        ctx = ctx.message.author if ctx.message.guild is None else ctx.message.guild

    if isinstance(ctx, discord.User):
        for player in self.audioplayers:
            member = player.guild.get_member(ctx.id)
            if not (member and member.voice and player.voice):
                continue
            if player.voice.channel.id != member.voice.channel.id:
                continue
            if botdata.guildinfo(player.guild).is_banned(member):
                raise AudioPlayerNotFoundError("Nice try, but you're banned in the voice channel that I'm in")
            return player
        if error_on_none:
            raise AudioPlayerNotFoundError("You're not in any voice channels that I'm in")
        return None

    if isinstance(ctx, discord.Guild):
        guild = ctx
    elif isinstance(ctx, discord.abc.GuildChannel):
        guild = ctx.guild
    else:
        raise ValueError(f"Incorrect type '{type(ctx)}' given to audioplayer function")

    for player in self.audioplayers:
        if player.guild == guild:
            return player

    if error_on_none:
        raise AudioPlayerNotFoundError(f"I'm not in a voice channel on this server/guild. Have an admin do `{self.bot.command_prefix}summon` to put me in one.")
    return None

# Connects an audioplayer for the correct guild to the indicated channel
def guildinfo(self, guildid):
    """Builds a ``GuildInfo`` for a guild ID.

    ``guildid`` may also be a command context, a guild channel, or a guild;
    each is narrowed step by step down to the guild's ID. Returns ``None``
    when that narrowing ends in ``None``.
    """
    resolved = guildid
    if isinstance(resolved, discord.ext.commands.Context):
        resolved = resolved.message.guild
    if isinstance(resolved, discord.abc.GuildChannel):
        resolved = resolved.guild
    if isinstance(resolved, discord.Guild):
        resolved = resolved.id

    return None if resolved is None else GuildInfo(self, resolved)
def guild_ratio(self, guild: discord.Guild) -> float:
    """Return ``BOT_RATIO_MIN`` for guilds with fewer than 50 members and
    ``BOT_RATIO_MAX`` for all others."""
    is_small = len(guild.members) < 50
    return BOT_RATIO_MIN if is_small else BOT_RATIO_MAX
def get_name(self, user_id, account=None):
    """Get a string representation of a user or guild.

    ``user_id`` may be a ``discord.Guild``, a ``discord.User``, or an ID.
    An ID is looked up as a user first and as a guild second; guilds are
    rendered with a ``taxbank:`` prefix. When neither lookup succeeds, an
    ``Unfindable ...`` placeholder is returned, worded after
    ``account['type']`` when ``account`` is given.
    """
    if isinstance(user_id, discord.Guild):
        return f'taxbank:{user_id.name}'
    elif isinstance(user_id, discord.User):
        return str(user_id)

    # convert once so both lookups receive the same integer ID
    numeric_id = int(user_id)

    obj = self.bot.get_user(numeric_id)
    if obj is None:
        # try to find guild
        obj = self.bot.get_guild(numeric_id)
        if obj is not None:
            obj = f'taxbank:{obj}'

    if obj is None:
        # we tried stuff, show a special text
        if not account:
            return f'Unfindable ID {user_id}'
        if account['type'] == 'user':
            return f'Unfindable User {user_id}'
        elif account['type'] == 'guild':
            return f'Unfindable Guild {user_id}'
        else:
            return f'Unfindable Unknown {user_id}'

    return str(obj)
async def guild_accounts(self, guild: discord.Guild, field='amount') -> list:
    """Fetch all accounts that reference users that are in the guild.

    Uses caching: the IDs of members that turned out to have an account are
    remembered in ``self.acct_cache[guild.id]`` and reused on later calls.

    This is a coroutine; the per-guild lock from ``self.gacct_locks`` is held
    while accounts are fetched.

    Returns the accounts sorted by ``float(account[field])``, largest first.
    """
    accounts = []

    # async with releases the lock even when a fetch raises
    async with self.gacct_locks[guild.id]:
        using_cache = guild.id in self.acct_cache
        if using_cache:
            userids = self.acct_cache[guild.id]
        else:
            userids = [m.id for m in guild.members]

        for uid in userids:
            account = await self.get_account(uid)
            if not account:
                continue

            accounts.append(account)
            if not using_cache:
                # setdefault works whether or not acct_cache supplies defaults
                self.acct_cache.setdefault(guild.id, []).append(uid)

    return sorted(accounts, key=lambda account: float(account[field]), reverse=True)
async def blockreason(self, ctx, anything_id: int):
    """Get a reason for a block if it exists"""
    # coroutine: the database lookups and the replies are awaited

    # user blocks take precedence over guild blocks
    userblock = await self.block_coll.find_one({'user_id': anything_id})
    if userblock is not None:
        e = discord.Embed(title='User blocked')
        e.description = f'<@{anything_id}> - {userblock.get("reason")}'
        return await ctx.send(embed=e)

    guildblock = await self.block_coll.find_one({'guild_id': anything_id})
    if guildblock is not None:
        e = discord.Embed(title='Guild blocked')
        # read the reason from the guild block; userblock is None here
        e.description = f'why? {guildblock.get("reason")}'
        return await ctx.send(embed=e)

    await ctx.send('Block not found')
def test_lookup(self):
    """Entries added with ``put`` are found for the registered type and for
    ``discord.User``/``discord.Member`` under ``discord.abc.User``;
    an unregistered type looks up as ``None``."""
    m = utils.TypeMap()
    m.put(discord.abc.User, "something")
    m.put(discord.TextChannel, "something else")
    m.put(discord.Guild, "another thing")

    self.assertEqual("something", m.lookup(discord.abc.User))
    self.assertEqual("something", m.lookup(discord.User))
    self.assertEqual("something", m.lookup(discord.Member))
    self.assertEqual("something else", m.lookup(discord.TextChannel))
    self.assertIsNone(m.lookup(discord.DMChannel))
def test_constructed(self):
    """A ``TypeMap`` built from a dict gives the same lookups as one filled
    entry by entry with ``put``."""
    m = utils.TypeMap({
        discord.abc.User: "something",
        discord.TextChannel: "something else",
        discord.Guild: "another thing"
    })

    self.assertEqual("something", m.lookup(discord.abc.User))
    self.assertEqual("something", m.lookup(discord.User))
    self.assertEqual("something", m.lookup(discord.Member))
    self.assertEqual("something else", m.lookup(discord.TextChannel))
    self.assertIsNone(m.lookup(discord.DMChannel))
def __init__(self, config, database, redis):
    """Stores the database and redis handles, creates an empty type map and
    a 2048-entry LRU, and registers the three built-in type mappings.

    ``config`` is accepted but not used here.
    """
    self.database = database
    self.redis = redis

    self._mapping = utils.TypeMap()
    self._lru = LRU(2048)
    self._lru_types = set()

    builtin_mappings = (
        (discord.abc.User, 'member'),
        (discord.Guild, 'server'),
        (discord.TextChannel, 'channel'),
    )
    for discord_type, label in builtin_mappings:
        self.register_mapping(discord_type, label)
def get_joinleave_channel(member):
    """
    Looks up the channel named ``joinleave`` among the channels of the
    guild that ``member`` belongs to, via ``discord.utils.get``.
    """
    guild_channels = member.guild.channels
    return discord.utils.get(guild_channels, name='joinleave')
def is_mod(guild: discord.Guild, member: discord.Member):
    """Returns whether the guild's role named ``Mods`` is among the member's roles."""
    mods_role = discord.utils.get(guild.roles, name="Mods")
    return mods_role in member.roles
async def on_member_ban(self, guild: Guild, user):
    """Announces a ban in the guild's ``bans`` text channel.

    This is a coroutine because the config check, debounce check, audit log
    fetch and send are all awaited. Nothing is announced unless the
    ``pollr_mod_log`` key is set for the guild and a text channel named
    ``bans`` exists. The responsible moderator and reason come from the ban
    debounce information when available, otherwise from the audit log when
    we may view it.
    """
    if not await self.bot.config_is_set(guild, 'pollr_mod_log'):
        # not configured to announce bans here
        return

    logger.debug('pollr ban process: %d', user.id)

    # grab bans channel
    bans = discord.utils.get(guild.text_channels, name='bans')
    if not bans:
        logger.debug('not announcing ban, couldn\'t find channel. gid=%d', guild.id)
        return

    ban = f'**Ban:** {describe(user)}'

    # get my permissions
    perms = guild.me.guild_permissions

    # check to see if they were banned via dogbot's ban command, and provide cleaner insight
    information = await self.ban_debounce.check(
        member_id=user.id,
        return_information=True,
        partial=True,
        wait_period=2.0
    )

    if information:
        reason = information['reason']
        responsible = information['moderator']
        ban += f'\n**Responsible:** {describe(responsible)}\n**Reason:** {reason}'
    elif perms.view_audit_log:
        # fall back to audit log
        await asyncio.sleep(1)  # wait a bit, because audit logs can be slow
        logs = await guild.audit_logs(action=AuditLogAction.ban, limit=5).flatten()

        # grab the entry that targeted our banned user
        entry = get(logs, target=user)
        if entry:
            ban += f'\n**Responsible:** {describe(entry.user)}'
            if entry.reason:
                ban += f'\n**Reason:** {entry.reason}'

    try:
        # send the ban message
        await bans.send(ban)
    except discord.HTTPException as exc:
        logger.warning('Cannot announce ban, error! gid=%d %s', guild.id, exc)
def get_name(self, user_id: int, account=None):
    """Get a string representation of a user or guild.

    Parameters
    ----------
    user_id: int
        User ID to get a string representation of. A ``discord.Guild`` or
        ``discord.User`` instance is also accepted and rendered directly.
    account: dict, optional
        Account object to determine if this is
        A user or a guild to search.

    Returns
    -------
    str
    """
    if isinstance(user_id, discord.Guild):
        return f'taxbank:{user_id.name}'
    elif isinstance(user_id, discord.User):
        return str(user_id)

    # convert once so both lookups receive the same integer ID
    numeric_id = int(user_id)

    obj = self.bot.get_user(numeric_id)
    if not obj:
        # try to find guild
        obj = self.bot.get_guild(numeric_id)
        if obj:
            obj = f'taxbank:{obj}'

    if not obj:
        # we tried stuff, show a special text
        if not account:
            return f'Unfindable ID {user_id}'
        if account['type'] == AccountType.USER:
            return f'Unfindable User {user_id}'
        elif account['type'] == AccountType.TAXBANK:
            return f'Unfindable Guild {user_id}'
        else:
            return f'Unfindable Unknown {user_id}'

    return str(obj)
async def do_heist(self, ctx) -> dict:
    """Actually does the heist.

    This is a coroutine: it first waits for ``self.finish`` to be set, then
    fetches the target's account and rolls for success.

    Returns
    -------
    dict
        With data about if the heist was successful
        and the amount stolen, or which members went to jail if it
        was unsuccessful. Keys: ``success``, ``jailed``, ``saved``,
        ``chance`` and ``result``.

    Raises
    ------
    SayException
        When the Coins cog is not loaded, the target has no account, or the
        account is not a taxbank.
    """
    await self.finish.wait()
    log.info('Doing the heist')

    res = {
        'success': False,
        'jailed': [],
        'saved': [],
    }

    bot = ctx.bot
    jcoin = bot.get_cog('Coins')
    if not jcoin:
        raise SayException('coins cog not loaded')

    target_account = await jcoin.get_account(self.target.id)
    if target_account is None:
        raise SayException('Guild not found')

    if target_account['type'] != 'taxbank':
        raise SayException('Account is not a taxbank')

    amnt = target_account['amount']
    increase_people = len(self.users) * INCREASE_PER_PERSON
    chance = BASE_HEIST_CHANCE + ((amnt / self.amount) + increase_people) * HEIST_CONSTANT

    # trim it to 60% success (the roll below is on a 0..10 scale)
    chance = min(chance, 6)

    result = random.uniform(0, 10)
    res['chance'] = chance
    res['result'] = result

    if result < chance:
        res['success'] = True
        return res

    # 50% chance of every person
    # in the heist to go to jail
    for user_id in self.users:
        outcome = 'jailed' if random.random() < 0.5 else 'saved'
        res[outcome].append(user_id)

    return res
async def heist(self, ctx, target: GuildConverter, amount: CoinConverter):
    """Heist a server.

    This works better if you have more people joining in your heist.

     - As soon as you use this command, a heist join session will start.
     - This session requires that all other people that want to join the
        heist to use the "j!heist join" command
     - There is a timeout of 5 minutes on the heist join session.
     - If your heist fails, all participants of the heist will be sentenced
        to jail or not, its random.
     - If your heist succeeds, you get a type 1 cooldown of 7 hours.
       it will show you are "regenerating your steal points".
    """
    # coroutine: account lookups, replies and the join timeout are awaited
    if amount > 200:
        return await ctx.send('Cannot heist more than 200JC.')

    account = await self.coins.get_account(ctx.author.id)
    if not account:
        # same guard the taxbank lookup below gets
        raise self.SayException('You do not have an account.')

    if amount > account['amount']:
        raise self.SayException('You cant heist more than'
                                ' what you currently hold.')

    for session in self.sessions.values():
        if session.target.id == target.id:
            raise self.SayException('An already existing session '
                                    'exists with the same target')

    if target == ctx.guild:
        raise self.SayException('stealing from the same guild? :thinking:')

    taxbank = await self.coins.get_account(target.id)
    if not taxbank:
        raise self.SayException('Guild taxbank account not found')

    if amount > taxbank['amount']:
        raise self.SayException('You cannot steal more than the '
                                'guild taxbank currently holds.')

    session = self.get_sess(ctx, target, True)
    try:
        await self.check_user(ctx, session)
    except self.SayException:
        # drop the session we just made, then re-raise unchanged
        self.sessions.pop(ctx.guild.id, None)
        raise

    session.amount = amount
    session.add_member(ctx.author.id)
    session.task = self.loop.create_task(session.do_heist(ctx))

    await ctx.send('Join session started!')

    # timeout of 5 minutes accepting members
    # OR "j!heist raid"
    await asyncio.sleep(300)
    if not session.finish.is_set():
        await session.process_heist(await session.force_finish())