我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 telegram.Update()。
def test_requires_usergroup_acc(self):
    """Verify a ``requires_usergroup`` handler runs when access is granted."""
    with patch("ownbot.auth.User") as user_cls:
        with patch("test_auth.Update"):
            # NOTE(review): the mocked attribute is spelled `has_acces`;
            # confirm it matches the method name on ownbot's User.
            user_cls.return_value.has_acces.return_value = True

            @ownbot.auth.requires_usergroup("foo")
            def my_command_handler(bot, update):
                """Dummy command handler"""
                print(bot, update)
                return True

            fake_bot = Mock(spec=Bot)
            fake_update = Update(1337)
            # The decorated handler must pass the inner return value through.
            self.assertTrue(my_command_handler(fake_bot, fake_update))
def test_requires_usergroup_self(self):
    """Verify ``requires_usergroup`` also works on handlers taking ``self``."""
    with patch("ownbot.auth.User") as user_cls:
        with patch("test_auth.Update"):
            # NOTE(review): the mocked attribute is spelled `has_acces`;
            # confirm it matches the method name on ownbot's User.
            user_cls.return_value.has_acces.return_value = True

            @ownbot.auth.requires_usergroup("foo")
            def my_command_handler(self, bot, update):
                """Dummy command handler"""
                print(self, bot, update)
                return True

            fake_bot = Mock(spec=Bot)
            fake_update = Update(1337)
            # None stands in for the instance the handler would be bound to.
            self.assertTrue(my_command_handler(None, fake_bot, fake_update))
def test_assign_first_to(self):
    """Verify ``assign_first_to`` checks the group and saves the user."""
    with patch("ownbot.auth.User") as user_cls:
        with patch("test_auth.Update"):
            with patch("ownbot.auth.UserManager") as manager_cls:
                user_instance = user_cls.return_value
                manager = manager_cls.return_value
                # Pretend the group has no members yet.
                manager.group_is_empty.return_value = True

                @ownbot.auth.assign_first_to("foo")
                def my_command_handler(bot, update):
                    """Dummy command handler"""
                    print(bot, update)

                fake_bot = Mock(spec=Bot)
                fake_update = Update(1337)
                my_command_handler(fake_bot, fake_update)

                self.assertTrue(manager.group_is_empty.called)
                self.assertTrue(user_instance.save.called)
def test_assign_first_to_with_self(self):
    """Verify ``assign_first_to`` works on handlers whose first arg is ``self``."""
    with patch("ownbot.auth.User") as user_cls:
        with patch("test_auth.Update"):
            with patch("ownbot.auth.UserManager") as manager_cls:
                user_instance = user_cls.return_value
                manager = manager_cls.return_value
                # Pretend the group has no members yet.
                manager.group_is_empty.return_value = True

                @ownbot.auth.assign_first_to("foo")
                def my_command_handler(self, bot, update):
                    """Dummy command handler"""
                    print(self, bot, update)

                fake_bot = Mock(spec=Bot)
                fake_update = Update(1337)
                # None stands in for the instance the handler would be bound to.
                my_command_handler(None, fake_bot, fake_update)

                self.assertTrue(manager.group_is_empty.called)
                self.assertTrue(user_instance.save.called)
def admin_access():
    """Return a decorator restricting a handler method to ``self.admins``.

    The decorated method is called as ``method(self, bot, update, ...)``.
    If ``self`` has no ``admins`` attribute (or it is None) a warning is
    logged and the handler runs unrestricted. Otherwise the handler only
    runs when ``update.effective_user.id`` is in ``self.admins``; a denied
    call is logged and returns None.
    """
    def access(func):
        @wraps(func)
        def wrapped(self, bot: Bot, update: Update, *args, **kwargs):
            allowed_ids = getattr(self, 'admins', None)
            sender = update.effective_user

            if allowed_ids is not None and sender.id not in allowed_ids:
                self.logger.info(f"Unauthorized access denied for {sender}.")
                return None

            if allowed_ids is None:
                # No restriction configured: warn, then let the call through.
                self.logger.warning(
                    'Specify self.admins (list of users ids) parameter in '
                    'manager or channel classes in order to restrict access to bot.'
                )
            return func(self, bot, update, *args, **kwargs)

        return wrapped

    return access
def create_telegram_update(message_text):
    """Build a ``telegram.Update`` carrying a text message.

    Test helper that simulates a message sent to the bot: the update wraps
    a ``telegram.Message`` from user ``'greenkey'`` with ``message_text``
    as its text. All ids are 0 and the date is the current time.
    """
    from datetime import datetime

    sender = telegram.User(0, 'greenkey')
    chat = telegram.Chat(0, '')
    incoming = telegram.Message(
        message_id=0,
        from_user=sender,
        date=datetime.now(),
        chat=chat,
        text=message_text,
    )
    return telegram.Update(update_id=0, message=incoming)
def check_update(self, update):
    """Decide whether ``update`` is a command this handler should process.

    Returns False unless ``update`` is an ``Update`` with a message (or an
    edited message when ``self.allow_edited`` is set) that has text.
    Otherwise returns the filter result combined with the command check:
    the text must start with ``/``, the command name must equal
    ``self.command`` and any ``@username`` suffix must match the bot's
    username case-insensitively.
    """
    if not isinstance(update, Update):
        return False
    if not (update.message or update.edited_message and self.allow_edited):
        return False

    message = update.message or update.edited_message
    if not message.text:
        return False

    first_word = message.text[1:].split(' ')[0]
    command = first_word.split('@')
    # in case the command was send without a username
    command.append(message.bot.username)

    filters = self.filters
    if filters is None:
        passes_filters = True
    elif isinstance(filters, list):
        passes_filters = any(func(message) for func in filters)
    else:
        passes_filters = filters(message)

    return passes_filters and (
        message.text.startswith('/')
        and command[0] == self.command
        and command[1].lower() == message.bot.username.lower()
    )
def admin_access(admins_list=None):
    """Return a decorator restricting a handler method to a set of admins.

    Args:
        admins_list: optional collection of allowed user ids. When given it
            takes precedence; when None, ``self.admins`` of the decorated
            method's instance is used instead.

    The decorated method is called as ``method(self, bot, update, ...)``.
    If no admin collection is available at all, a warning is logged and the
    handler runs unrestricted. Otherwise the handler only runs when
    ``update.effective_user.id`` is in the collection; a denied call is
    logged and returns None.
    """
    def access(func):
        @wraps(func)
        def wrapped(self, bot: 'Bot', update: 'Update', *args, **kwargs):
            user = update.effective_user

            # An explicit admins_list used to be ignored, which left
            # `admins` as None and let every user through.
            if admins_list is not None:
                admins = admins_list
            else:
                admins = getattr(self, 'admins', None)

            if admins is None:
                self.logger.warning(
                    'Specify self.admins (list of users ids) parameter in '
                    'manager or channel classes in order to restrict access to bot.'
                )
                return func(self, bot, update, *args, **kwargs)

            if user.id not in admins:
                self.logger.info(f"Unauthorized access denied for {user}.")
                return None

            return func(self, bot, update, *args, **kwargs)

        return wrapped

    return access
def command_accept_choice(self, bot: Bot, update: Update):
    """Switch the chat to the channel picked via a callback query.

    Removes the command handlers of the previously chosen channel (if any),
    registers those of the newly chosen one, edits the original message to
    confirm the choice and replies with the channel's help text.
    """
    query = update.callback_query
    # because callback update doesn't have message at all
    update.message = query.message
    chat_id = update.message.chat_id

    previous = self.chosen_channels.get(chat_id, None)
    if previous:
        previous.remove_commands_handlers(chat_id)

    selected: BaseChannel = self.channels[query.data]
    self.chosen_channels[chat_id] = selected
    selected.add_commands_handlers(chat_id)

    bot.edit_message_text(
        text=f'Chose {query.data} ({selected.channel_id}).',
        chat_id=query.message.chat_id,
        message_id=query.message.message_id,
    )

    channel_help = selected.help_text()
    update.message.reply_text('```\n' + channel_help + '\n```',
                              parse_mode=ParseMode.MARKDOWN)
def distribute(self, bot: Bot, update: Update, args):
    """Dispatch parsed arguments to the matching tag sub-command.

    ``args.help`` (when truthy) is sent back as code; otherwise
    ``args.command`` selects ``add``, ``remove``/``delete`` or ``show``.
    Any other command is logged and raises ``Exception``.
    """
    if args.help:
        self.send_code(update, args.help)
        return

    command = args.command
    if command == 'add':
        usage = self.subparsers['add'].format_usage()
        self.add(bot, update, args.tags, usage)
        return
    if command in ('remove', 'delete'):
        usage = self.subparsers['remove'].format_usage()
        self.remove(bot, update, args.tags, usage)
        return
    if command == 'show':
        self.show(bot, update)
        return

    self.logger.error(f'Bad args: {args!s}')
    raise Exception  # will never get this far (hopefully)
def distribute(self, bot: Bot, update: Update, args):
    """Dispatch parsed arguments to the matching key sub-command.

    ``args.help`` (when truthy) is sent back as code; otherwise
    ``args.command`` selects ``add``, ``remove``/``delete`` or ``show``.
    Any other command is logged and raises ``Exception``.
    """
    if args.help:
        self.send_code(update, args.help)
        return

    command = args.command
    if command == 'add':
        usage = self.subparsers['add'].format_usage()
        self.add(bot, update, args.keys, usage)
        return
    if command in ('remove', 'delete'):
        usage = self.subparsers['remove'].format_usage()
        self.remove(bot, update, args.keys, usage)
        return
    if command == 'show':
        self.show(bot, update)
        return

    self.logger.error(f'Bad args: {args!s}')
    raise Exception  # will never get this far (hopefully)
def add(self, bot: Bot, update: Update, args: List[str], usage: str):
    """Store one ``<name> <score>`` entry and show the updated list.

    ``args`` must hold exactly two items and the second must be a decimal
    string; otherwise ``usage`` is sent back and nothing is stored.
    """
    if len(args) == 2 and args[1].isdecimal():
        name, raw_score = args
        self.store.add({name: int(raw_score)})
        self.show(bot, update)
    else:
        self.send_code(update, usage)
def add_keyword(bot: telegram.Bot, update: telegram.Update, args: list):
    """Append the given keywords to the keyword filter list.

    Silently ignores chats that are not in ``global_vars.admin_list['TG']``.
    With no arguments a usage hint is sent. Keywords already present are
    reported and skipped; afterwards 'Done.' is sent and the data is saved.
    """
    message = update.message
    if message.chat_id not in global_vars.admin_list['TG']:
        return

    if len(args) == 0:
        message.reply_text('Usage: /add_keyword keyword1 keyword2 ...')
        return

    known_keywords = global_vars.filter_list['keywords']
    for keyword in args:
        logger.debug('keyword: ' + keyword)
        if keyword in known_keywords:
            message.reply_text('Keyword: "' + keyword + '" already in list')
        else:
            known_keywords.append(keyword)

    message.reply_text('Done.')
    save_data()
def add_channel(bot: telegram.Bot, update: telegram.Update):
    """Conversation step that collects channel ids from forwarded messages.

    Returns ``CHANNEL`` to keep collecting, ``ConversationHandler.END``
    when the user sends ``/cancel``, and None when the forwarded chat is
    not of type ``'channel'``.
    """
    message = update.message
    source_chat = message.forward_from_chat

    if not source_chat:
        if message.text == '/cancel':
            message.reply_text('Done.')
            return ConversationHandler.END
        message.reply_text(
            'Message type error. Please forward me a message from channel, or use /cancel to stop')
        return CHANNEL

    # it seems forward_from_chat can only be channels
    if source_chat.type != 'channel':
        message.reply_text(
            'Message type error. Please forward me a message from channel, or use /cancel to stop')
        return

    channels = global_vars.filter_list['channels']
    if source_chat.id in channels:
        message.reply_text('Already in list. Send me another or use /cancel to stop')
    else:
        channels.append(source_chat.id)
        save_data()
        message.reply_text('Okay, please send me another, or use /cancel to stop')
    return CHANNEL
def audio_from_telegram(bot: telegram.Bot, update: telegram.Update):
    """Forward a placeholder text for a Telegram audio message to QQ.

    Sends a fixed text entity through ``send_from_tg_to_qq`` and records
    the resulting QQ message id together with the Telegram message id.
    """
    message: telegram.Message = update.message
    tg_group_id = message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)

    # NOTE(review): the '??' in the text looks like lost non-ASCII
    # characters in this copy - confirm against the upstream file.
    reply_entity = [{
        'type': 'text',
        'data': {'text': '[ ?? ]'}
    }]

    qq_message_id = send_from_tg_to_qq(
        forward_index,
        message=reply_entity,
        tg_group_id=tg_group_id,
        tg_user=message.from_user,
        tg_forward_from=message,
        tg_reply_to=message.reply_to_message,
    )
    global_vars.mdb.append_message(qq_message_id, message.message_id, forward_index, 0)
def location_from_telegram(bot: telegram.Bot, update: telegram.Update):
    """Forward a Telegram location message to QQ as text.

    The coordinates are passed to ``get_location_from_baidu`` and its
    result is appended to a fixed prefix. The text is sent through
    ``send_from_tg_to_qq`` and the message id mapping is recorded.
    """
    message: telegram.Message = update.message
    tg_group_id = message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)

    location = message.location
    latitude, longitude = location.latitude, location.longitude

    # NOTE(review): the '?' run looks like lost non-ASCII characters in
    # this copy - confirm against the upstream file.
    location_text = '????????' + get_location_from_baidu(latitude, longitude)
    reply_entity = [{
        'type': 'text',
        'data': {'text': location_text}
    }]

    qq_message_id = send_from_tg_to_qq(
        forward_index,
        message=reply_entity,
        tg_group_id=tg_group_id,
        tg_user=message.from_user,
        tg_forward_from=message,
        tg_reply_to=message.reply_to_message,
    )
    global_vars.mdb.append_message(qq_message_id, message.message_id, forward_index, 0)
def authorized_only(command_handler: Callable[[Bot, Update], None]) -> Callable[..., Any]:
    """
    Decorator to check if the message comes from the correct chat_id

    The update is taken from the ``update`` keyword argument or, failing
    that, from the second positional argument. Messages whose chat id
    differs from ``_CONF['telegram']['chat_id']`` are logged and dropped
    (the wrapper returns None). Exceptions raised by the handler are
    logged and swallowed.

    :param command_handler: Telegram CommandHandler
    :return: decorated function
    """
    from functools import wraps

    @wraps(command_handler)
    def wrapper(*args, **kwargs):
        update = kwargs.get('update') or args[1]

        # Reject unauthorized messages
        chat_id = int(_CONF['telegram']['chat_id'])
        if int(update.message.chat_id) != chat_id:
            logger.info('Rejected unauthorized message from: %s', update.message.chat_id)
            # Previously returned the wrapper function object itself.
            return None

        logger.info('Executing handler: %s for chat_id: %s', command_handler.__name__, chat_id)
        try:
            return command_handler(*args, **kwargs)
        except BaseException:
            # NOTE(review): BaseException also swallows KeyboardInterrupt
            # and SystemExit - confirm this breadth is intended.
            logger.exception('Exception occurred within Telegram module')

    return wrapper
def _status_table(bot: Bot, update: Update) -> None:
    """
    Handler for /status table.
    Sends the open trades as a table with ID, pair, age and profit, or a
    status line when the trader is not running or no trade is open.
    :param bot: telegram bot
    :param update: message update
    :return: None
    """
    # Fetch open trade
    open_trades = Trade.query.filter(Trade.is_open.is_(True)).all()

    if get_state() != State.RUNNING:
        send_msg('*Status:* `trader is not running`', bot=bot)
        return
    if not open_trades:
        send_msg('*Status:* `no active order`', bot=bot)
        return

    def _row(trade):
        # Profit is computed against the current bid of the trade's pair.
        current_rate = exchange.get_ticker(trade.pair)['bid']
        return [
            trade.id,
            trade.pair,
            shorten_date(arrow.get(trade.open_date).humanize(only_distance=True)),
            '{:.2f}'.format(100 * trade.calc_profit(current_rate)),
        ]

    header = ['ID', 'Pair', 'Since', 'Profit']
    rows = [_row(trade) for trade in open_trades]
    frame = DataFrame.from_records(rows, columns=header).set_index(header[0])

    table = tabulate(frame, headers='keys', tablefmt='simple')
    send_msg("<pre>{}</pre>".format(table), parse_mode=ParseMode.HTML)
def _balance(bot: Bot, update: Update) -> None:
    """
    Handler for /balance
    Sends one entry per currency that has a non-zero balance, available
    or pending amount, or a fixed notice when there is none.
    """
    amount_keys = ('Balance', 'Available', 'Pending')
    non_zero = [
        entry for entry in exchange.get_balances()
        if any(entry[key] for key in amount_keys)
    ]

    # NOTE(review): the template is on one line in this copy; the original
    # line breaks inside the literal appear to have been lost.
    template = """*Currency*: {Currency} *Available*: {Available} *Balance*: {Balance} *Pending*: {Pending} """

    if non_zero:
        output = ''.join(template.format(**entry) for entry in non_zero)
    else:
        output = '`All balances are zero.`'
    send_msg(output)
def _count(bot: Bot, update: Update) -> None:
    """
    Handler for /count.
    Sends the number of open trades next to the configured
    ``max_open_trades`` as a small table.
    :param bot: telegram bot
    :param update: message update
    :return: None
    """
    if get_state() != State.RUNNING:
        send_msg('`trader is not running`', bot=bot)
        return

    open_trades = Trade.query.filter(Trade.is_open.is_(True)).all()
    counts = {
        'current': [len(open_trades)],
        'max': [_CONF['max_open_trades']],
    }
    table = tabulate(counts, headers=['current', 'max'], tablefmt='simple')

    message = "<pre>{}</pre>".format(table)
    logger.debug(message)
    send_msg(message, parse_mode=ParseMode.HTML)
def _help(bot: Bot, update: Update) -> None:
    """
    Handler for /help.
    Sends the list of bot commands.
    :param bot: telegram bot
    :param update: message update
    :return: None
    """
    # NOTE(review): the literal is on one line in this copy; the original
    # line breaks inside it appear to have been lost.
    help_text = """ */start:* `Starts the trader` */stop:* `Stops the trader` */status [table]:* `Lists all open trades` *table :* `will display trades in a table` */profit:* `Lists cumulative profit from all finished trades` */forcesell <trade_id>|all:* `Instantly sells the given trade or all trades, regardless of profit` */performance:* `Show performance of each finished trade grouped by pair` */count:* `Show number of trades running compared to allowed number of trades` */balance:* `Show account balance per currency` */help:* `This help message` */version:* `Show version` """
    send_msg(help_text, bot=bot)
async def helpme(bot, thing, arguments):
    """Visualizza il messaggio di aiuto di un comando. Sintassi: `{symbol}helpme [comando]`"""
    # NOTE(review): the docstring is kept verbatim (Italian, with a
    # {symbol} placeholder) because it looks like display_help renders it
    # to users - confirm before translating.
    #
    # Declared `async` because the body awaits; a plain `def` containing
    # `await` is a SyntaxError.

    # Set status to typing
    await status_typing(bot, thing)

    # If no command is specified, show the help message for this command.
    if len(arguments) == 0 or len(arguments) > 1:
        await display_help(bot, thing, helpme)
        return

    # Check the list of telegram commands if the message was sent from Telegram
    if isinstance(thing, telegram.Update):
        if arguments[0] in b.commands:
            await display_help(bot, thing, b.commands[arguments[0]])
        else:
            await answer(bot, thing, "? Il comando specificato non esiste.")

    # Check the list of discord commands if the message was sent from Discord
    if isinstance(thing, extradiscord.discord.Message):
        if arguments[0] in d.commands:
            await display_help(bot, thing, d.commands[arguments[0]])
        else:
            await answer(bot, thing, "? Il comando specificato non esiste.")
async def job_updatelol(singletimeout=1, alltimeout=1800):
    """Periodically refresh every stored LoL account, forever.

    Declared `async` because the body awaits; a plain `def` containing
    `await` is a SyntaxError.

    Args:
        singletimeout: seconds to sleep after processing each account.
        alltimeout: seconds to sleep after a full pass over all accounts.
    """
    await d.client.wait_until_ready()
    while True:
        # Open a new database session
        # NOTE(review): the session is committed but never closed; add a
        # close() if database.Session supports it.
        session = database.Session()
        # Query all the LoL accounts
        users = session.query(database.LoL).all()
        # Update all the users' stats
        for user in users:
            updates = await user.update_data()
            if updates:
                # Send some info to Discord
                await d.client.send_message(d.client.get_channel(247426005537390592),
                                            "Account aggiornato!",
                                            embed=user.generate_discord_embed())
            # NOTE(review): indentation was lost in this copy; the
            # per-account sleep is assumed to run once per loop iteration.
            await asyncio.sleep(singletimeout)
        session.commit()
        await asyncio.sleep(alltimeout)
def cmd_list_tokens(bot: telegram.Bot, update: telegram.Update):
    """Send the numbered list of tokens stored for a poll tag.

    Does nothing when ``generic_checks`` fails. The tag is whatever
    follows the first space of the message text.
    """
    cid = update.message.chat_id
    if not generic_checks(bot, update, "listtokens", "", 1):
        return

    # Extract value
    _, tag = update.message.text.split(" ", 1)
    tokens = tokens_db.search(Query().poll_tag == tag)

    header = "There have been %i tokens generated for your poll:\n" % len(tokens)
    listing = "\n".join(
        "%i: %s" % (position, entry['token'])
        for position, entry in enumerate(tokens, start=1)
    )
    bot.sendMessage(cid, header + listing)
def cmd_vote(bot: telegram.Bot, update: telegram.Update):
    """Start a vote by offering the user a keyboard of their polls.

    Outside private chats a notice is sent and the conversation ends.
    When ``get_polls`` returns nothing the user is told so; otherwise a
    one-time keyboard with one poll per row is sent.
    """
    cid = update.message.chat_id
    uid = update.message.from_user.id
    polls = get_polls(uid, bot)

    if update.message.chat.type != "private":
        bot.sendMessage(cid, text_private_chat_only)
        return ConversationHandler.END

    if len(polls) == 0:
        bot.sendMessage(cid, "You aren't eligible to vote in any polls.")
        return None

    # keyboard array is a list of lists
    # because each list represents a new row
    # and we want each button on a separate row
    keyboard_rows = [[poll["tag"] + ": " + poll["title"]] for poll in polls]
    keyboard = ReplyKeyboardMarkup(keyboard_rows, one_time_keyboard=True)
    bot.sendMessage(cid,
                    "Click the button for the poll you would like to vote in.",
                    reply_markup=keyboard)
    # NOTE(review): indentation was lost in this copy; the state is assumed
    # to be returned only on this branch - confirm against the original.
    return state_vote_1
def cmd_set_generic(bot: telegram.Bot, update: telegram.Update,
                    field_name: str, cmd_name: str, example: str):
    """Set ``field_name`` of a poll from a ``<cmd> <tag> <value>`` message.

    Does nothing when ``generic_checks`` fails. After the edit a success
    message is sent and, if ``check_if_complete`` is true for the tag, an
    activation suggestion follows.
    """
    if not generic_checks(bot, update, cmd_name, example, 2):
        # A generic check failed.
        return

    # Extract value
    _command, tag, value = update.message.text.split(" ", 2)

    # Apply the edit
    edit_poll(tag, field_name, value)

    # send "success" feedback message
    updated_poll = get_poll(tag)
    cid = update.message.chat_id
    bot.sendMessage(cid, text_edit_successful.format(p=updated_poll))

    # Check if complete, and suggest to activate
    if check_if_complete(tag):
        bot.sendMessage(cid, text_activate_suggestion.format(t=tag),
                        parse_mode="Markdown")
def execute(self, chat_id, bot: Bot, update: Update):
    """Reply with a keyboard of classrooms for the building named in the text.

    Returns False without sending anything when the message text is not a
    key of ``self._buildings_dict``; otherwise sends a keyboard with three
    classroom buttons per row plus a final ``/buildings`` row and returns
    True.
    """
    requested = update.message.text
    if requested not in self._buildings_dict:
        return False

    building_identifier = self._buildings_dict[requested]
    classrooms = self.get_classroom_source().get_classrooms_in_building(building_identifier)

    buttons = [
        KeyboardButton("/" + classroom.get_name().lower())
        for classroom in classrooms
    ]
    per_row = 3
    keyboard_rows = [
        buttons[start:start + per_row]
        for start in range(0, len(buttons), per_row)
    ]
    keyboard_rows.append(["/buildings"])

    bot.send_message(chat_id=chat_id,
                     text="Available classrooms",
                     reply_markup=ReplyKeyboardMarkup(keyboard_rows))
    return True
def help_about(bot: telegram.Bot, update: telegram.Update):
    """Send the help message, then track and log the command."""
    message = update.message
    chat_id = message.chat_id

    bot.send_message(
        chat_id=chat_id,
        text=strings.HELP_MESSAGE,
        parse_mode='HTML',
        disable_web_page_preview=True,
        reply_markup=strings.HELP_MESSAGE_KEYBOARD
    )

    bot_types.Botan.track(uid=chat_id, message=message.to_dict(), name='help.about')

    telegram_info = {
        'update': update.to_dict(),
        'chat_id': chat_id,
        'message_id': message.message_id,
    }
    # The log id is the MD5 of the chat id and message id joined as text.
    event_id = extentions.TextHelper.get_md5(str(chat_id) + str(message.message_id))
    logging.info('Command help: about.', extra={'telegram': telegram_info, 'id': event_id})
def settings_message(bot: telegram.Bot, update: telegram.Update):
    """Send the main settings message if the sender may see it, then log.

    The message is sent when the chat is not admin-only or the sender is
    the stored admin.
    """
    message = update.message
    sender = message.from_user
    chat_id = message.chat_id

    chat_settings = bot_types.ChatSettings.from_db(db, message.chat,
                                                   admin_id=sender.id,
                                                   admin_name=sender.first_name)

    allowed = not chat_settings.admin_only or chat_settings.admin_id == sender.id
    if allowed:
        send_settings_message(bot, chat_settings)

    # NOTE(review): indentation was lost in this copy; tracking and logging
    # are assumed to run regardless of the permission check - confirm.
    bot_types.Botan.track(uid=chat_id, message=message.to_dict(), name='settings.main')

    telegram_info = {
        'update': update.to_dict(),
        'chat_id': chat_id,
        'message_id': message.message_id,
    }
    event_id = extentions.TextHelper.get_md5(str(chat_id) + str(message.message_id))
    logging.info('Command settings: main.', extra={'telegram': telegram_info, 'id': event_id})
def settings_voice_message(bot: telegram.Bot, update: telegram.Update):
    """Send the voice settings message if the sender may see it, then log.

    The message is sent when the chat is not admin-only or the sender is
    the stored admin.
    """
    message = update.message
    sender = message.from_user
    chat_id = message.chat_id

    chat_settings = bot_types.ChatSettings.from_db(db, message.chat,
                                                   admin_id=sender.id,
                                                   admin_name=sender.first_name)

    allowed = not chat_settings.admin_only or chat_settings.admin_id == sender.id
    if allowed:
        send_settings_voice_message(bot, chat_id)

    # NOTE(review): indentation was lost in this copy; tracking and logging
    # are assumed to run regardless of the permission check - confirm.
    bot_types.Botan.track(uid=chat_id, message=message.to_dict(), name='settings.voice.get')

    telegram_info = {
        'update': update.to_dict(),
        'chat_id': chat_id,
        'message_id': message.message_id,
    }
    event_id = extentions.TextHelper.get_md5(str(chat_id) + str(message.message_id))
    logging.info('Command settings: voice.', extra={'telegram': telegram_info, 'id': event_id})
def settings_emotion_message(bot: telegram.Bot, update: telegram.Update):
    """Send the emotion settings message if the sender may see it, then log.

    The message is sent when the chat is not admin-only or the sender is
    the stored admin.
    """
    message = update.message
    sender = message.from_user
    chat_id = message.chat_id

    chat_settings = bot_types.ChatSettings.from_db(db, message.chat,
                                                   admin_id=sender.id,
                                                   admin_name=sender.first_name)

    allowed = not chat_settings.admin_only or chat_settings.admin_id == sender.id
    if allowed:
        send_settings_emotion_message(bot, chat_id)

    # NOTE(review): indentation was lost in this copy; tracking and logging
    # are assumed to run regardless of the permission check - confirm.
    bot_types.Botan.track(uid=chat_id, message=message.to_dict(), name='settings.emotions.get')

    telegram_info = {
        'update': update.to_dict(),
        'chat_id': chat_id,
        'message_id': message.message_id,
    }
    event_id = extentions.TextHelper.get_md5(str(chat_id) + str(message.message_id))
    logging.info('Command settings: emotion.', extra={'telegram': telegram_info, 'id': event_id})
def handle_update(self, update, dispatcher):
    """Invoke :attr:`callback` for ``update``.

    When ``self.pass_args`` is set, the whitespace-separated words after
    the first word of the (edited) message text are passed as ``args``.

    Args:
        update (:class:`telegram.Update`): Incoming telegram update.
        dispatcher (:class:`telegram.ext.Dispatcher`): Dispatcher that
            originated the Update.

    Returns:
        Whatever the callback returns.
    """
    extra_kwargs = self.collect_optional_args(dispatcher, update)
    source_message = update.message or update.edited_message

    if self.pass_args:
        _command, *words = source_message.text.split()
        extra_kwargs['args'] = words

    return self.callback(dispatcher.bot, update, **extra_kwargs)
def check_update(self, update):
    """
    Determines whether an update should be passed to this handlers :attr:`callback`.

    Only updates carrying an inline query are considered. Without a
    pattern every such update is accepted; with a pattern the query text
    must be non-empty and match it from the start.

    Args:
        update (:class:`telegram.Update`): Incoming telegram update.

    Returns:
        :obj:`bool`, or None when the update is not an inline query or the
        pattern is set but the query text is empty.
    """
    if not (isinstance(update, Update) and update.inline_query):
        return None
    if not self.pattern:
        return True

    query_text = update.inline_query.query
    if query_text:
        return bool(re.match(self.pattern, query_text))
    return None
def handle_update(self, update, dispatcher):
    """
    Invoke :attr:`callback` for an inline-query ``update``.

    When a pattern is set, the query text is matched against it and the
    match groups / group dict are passed along if ``pass_groups`` /
    ``pass_groupdict`` are set.

    Args:
        update (:class:`telegram.Update`): Incoming telegram update.
        dispatcher (:class:`telegram.ext.Dispatcher`): Dispatcher that
            originated the Update.

    Returns:
        Whatever the callback returns.
    """
    extra_kwargs = self.collect_optional_args(dispatcher, update)
    pattern = self.pattern

    if pattern:
        found = re.match(pattern, update.inline_query.query)
        if self.pass_groups:
            extra_kwargs['groups'] = found.groups()
        if self.pass_groupdict:
            extra_kwargs['groupdict'] = found.groupdict()

    return self.callback(dispatcher.bot, update, **extra_kwargs)

# old non-PEP8 Handler methods
def check_update(self, update):
    """Determines whether an update should be passed to this handlers :attr:`callback`.

    The update is accepted when it is of an enabled kind (message, edited
    message or channel post, per the ``*_updates`` flags), its effective
    message has text, and that text matches ``self.pattern`` from the
    start.

    Args:
        update (:class:`telegram.Update`): Incoming telegram update.

    Returns:
        :obj:`bool`
    """
    # Reject when EITHER check fails. The previous `and` let non-Update
    # objects through to the attribute access below.
    if not isinstance(update, Update) or not update.effective_message:
        return False

    enabled_kind = any([
        (self.message_updates and update.message),
        (self.edited_updates and update.edited_message),
        (self.channel_post_updates and update.channel_post),
    ])
    if enabled_kind and update.effective_message.text:
        match = re.match(self.pattern, update.effective_message.text)
        return bool(match)
    return False
def handle_update(self, update, dispatcher):
    """Invoke :attr:`callback` for a callback-query ``update``.

    When a pattern is set, the callback data is matched against it and the
    match groups / group dict are passed along if ``pass_groups`` /
    ``pass_groupdict`` are set.

    Args:
        update (:class:`telegram.Update`): Incoming telegram update.
        dispatcher (:class:`telegram.ext.Dispatcher`): Dispatcher that
            originated the Update.

    Returns:
        Whatever the callback returns.
    """
    extra_kwargs = self.collect_optional_args(dispatcher, update)
    pattern = self.pattern

    if pattern:
        found = re.match(pattern, update.callback_query.data)
        if self.pass_groups:
            extra_kwargs['groups'] = found.groups()
        if self.pass_groupdict:
            extra_kwargs['groupdict'] = found.groupdict()

    return self.callback(dispatcher.bot, update, **extra_kwargs)
def set_auto(bot: Bot, update: Update):
    """
    Handles /auto command: toggles the ``auto`` flag of the chat's
    conversation and reports the new state, or sends an error when the
    chat has no conversation.
    :param bot:
    :param update:
    :return:
    """
    global data
    chat_id = update.message.chat_id

    if chat_id not in data.conversations:
        send_error(bot=bot, chat_id=chat_id, name="account_not_setup")
        return

    conversation = data.conversations[chat_id]
    conversation.auto = not conversation.auto
    message = "Automatic mode enabled." if conversation.auto else "Automatic mode disabled."
    send_custom_message(bot=bot, chat_id=chat_id, message=message)
def unlink(bot: Bot, update: Update):
    """
    Handles /unlink command: removes the chat's conversation when the
    sender is its owner; otherwise sends the matching error.
    :param bot:
    :param update:
    :return:
    """
    global data
    sender = update.message.from_user.id
    chat_id = update.message.chat_id

    if chat_id not in data.conversations:
        send_error(bot, chat_id=chat_id, name="account_not_setup")
        return

    if sender != data.conversations[chat_id].owner:
        send_error(bot, chat_id=chat_id, name="command_not_allowed")
        return

    del data.conversations[chat_id]
    send_message(bot, chat_id, name="account_unlinked")
def _db_update_slave_chats_cache(self, chats):
    """
    Write the info of every given slave chat to the database cache.

    Per the original note, this is triggered after the entire list of
    chats has been retrieved from all slaves by `slave_chats_pagination`.

    Args:
        chats (list of dict): a list of dicts generated
            by method `_make_chat_dict`
    """
    # Keyword of db.set_slave_chat_info -> key in the chat dict.
    field_map = {
        'slave_channel_id': 'channel_id',
        'slave_channel_name': 'channel_name',
        'slave_channel_emoji': 'channel_emoji',
        'slave_chat_uid': 'chat_uid',
        'slave_chat_name': 'chat_name',
        'slave_chat_alias': 'chat_alias',
        'slave_chat_type': 'type',
    }
    for chat in chats:
        db.set_slave_chat_info(**{keyword: chat[key] for keyword, key in field_map.items()})
def suggested_recipient(self, bot, chat_id, msg_id, param):
    """Act on the user's choice of a suggested recipient.

    ``param`` of the form ``"chat <index>"`` selects a stored chat: the
    stored update is re-processed for that chat and the prompt message is
    edited to confirm delivery. Any other value edits the prompt into an
    error message.
    """
    storage_id = "%s.%s" % (chat_id, msg_id)

    if param.startswith("chat "):
        stored = self.msg_storage[storage_id]
        update = telegram.update.Update.de_json(stored["update"], bot)
        chat = stored['chats'][int(param.split(' ', 1)[1])]
        self.process_telegram_message(bot, update,
                                      channel_id=chat['channel_id'],
                                      chat_id=chat['chat_uid'])

        # Show "alias (name)" only when an alias exists and differs.
        alias, name = chat['chat_alias'], chat['chat_name']
        if alias and alias != name:
            display_name = "%s (%s)" % (alias, name)
        else:
            display_name = name

        text = "Delivering the message to %s%s %s: %s." % (
            chat['channel_emoji'],
            utils.Emojis.get_source_emoji(chat['type']),
            chat['channel_name'],
            display_name,
        )
    elif param == Flags.CANCEL_PROCESS:
        text = ("Error: No recipient specified.\n"
                "Please reply to a previous message.")
    else:
        text = ("Error: No recipient specified.\n"
                "Please reply to a previous message.\n\n"
                "Invalid parameter (%s)." % param)

    bot.edit_message_text(text, chat_id=chat_id, message_id=msg_id)
def __get_dummy_update():
    """Build an ``Update`` with id 1 wrapping a message dated now."""
    sender = User(1337, "@foouser")
    conversation = Chat(1, None)
    return Update(1, message=Message(1, sender, datetime.now(), conversation))
def command_get_tags(self, bot: Bot, update: Update):
    """Reply with the stored tags as a Markdown list."""
    del bot
    listing = '\n'.join(f'` - {tag}`' for tag in self.store.get_tags())
    update.message.reply_text('Banned tags:\n' + listing,
                              parse_mode=ParseMode.MARKDOWN)
def command_add_tags(self, bot: Bot, update: Update, args: List[str]):
    """Add the given tags to the store and show the list.

    With no arguments only the usage hint is sent.
    """
    if len(args) > 0:
        self.store.add_tags(args)
        self.command_get_tags(bot, update)
    else:
        update.message.reply_text('Usage: `/add_tags <tag> [<tag>]*`',
                                  parse_mode=ParseMode.MARKDOWN)
def command_remove_tags(self, bot: Bot, update: Update, args: List[str]):
    """Remove the given tags from the store and show the list.

    With no arguments only the usage hint is sent.
    """
    if len(args) > 0:
        self.store.remove_tags(args)
        self.command_get_tags(bot, update)
    else:
        update.message.reply_text('Usage: `/remove_tags <tag> [<tag>]*`',
                                  parse_mode=ParseMode.MARKDOWN)
def command_get_settings(self, bot: Bot, update: Update):
    """Reply with every ``key = value`` pair of ``self.settings``."""
    del bot
    listing = '\n'.join(
        f'` - {key} = {value}`' for key, value in self.settings.items()
    )
    update.message.reply_text('Settings list:\n' + listing,
                              parse_mode=ParseMode.MARKDOWN)
def command_change_setting(self, bot: Bot, update: Update, args: List[str]):
    """Set one setting from ``<key> <value>`` and show all settings.

    Anything other than exactly two arguments yields the usage hint. A
    ``SettingsError`` from the store is reported to the user; the settings
    list is shown afterwards either way.
    """
    if len(args) != 2:
        update.message.reply_text('Usage: `/setting <key> <value>`',
                                  parse_mode=ParseMode.MARKDOWN)
        return

    key, value = args
    try:
        self.settings.set(key, value)
    except SettingsError as error:
        update.message.reply_text(str(error))
    self.command_get_settings(bot, update)
def command_remove(self, bot: Bot, update: Update, args: List[str]):
    """Remove the given subreddits from the store and show the list.

    With no arguments only the usage hint is sent.
    """
    if len(args) < 1:
        update.message.reply_text('Usage: /remove <subreddit> [<subreddit> ...]')
        return

    self.store.remove(args)
    self.command_list(bot, update)
def command_list(self, bot: Bot, update: Update):
    """Reply with a Markdown table of stored subreddits and their scores."""
    del bot
    subreddits = self.store.get()
    title = 'subreddit'.ljust(13)
    header = f"`{title} limit score`\n"
    rows = ''.join(
        f'` - {name:10s} {score}`\n' for name, score in subreddits.items()
    )
    update.message.reply_text(header + rows, parse_mode=ParseMode.MARKDOWN)
def command_help(self, bot: Bot, update: Update):
    """Send the general help text plus the chosen channel's help, if any."""
    text = ''.join([
        "/help or /start - print this message.\n",
        "/choose - choose channels.\n",
        "/list - print list of available channels.\n",
    ])

    channel = self.chosen_channel
    if channel:
        # Fall back to a placeholder when the channel has no help text.
        channel_help = channel.help_text() or ' - no commands'
        text += f"\n`{channel.label} ({channel.name})` commands:\n" + channel_help

    bot.send_message(chat_id=update.message.chat_id, text=text)