我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用telegram.ext.CallbackQueryHandler()。
def main(): updater = Updater(AUTHTOKEN, workers=10) dp = updater.dispatcher dp.add_handler(CommandHandler('start', start)) dp.add_handler(CommandHandler('help', help)) dp.add_handler(CommandHandler('info', info)) dp.add_handler(CommandHandler('support', support, pass_chat_data=True)) dp.add_handler(CommandHandler('getStats', getMessageStats)) dp.add_handler(CommandHandler('chooselang', chooseLanguage, pass_chat_data=True, pass_args=True)) dp.add_handler(MessageHandler(Filters.voice, receiveMessage, pass_chat_data=True)) dp.add_handler(MessageHandler(Filters.all, countme)) dp.add_handler(CallbackQueryHandler(callbackHandler, pass_chat_data=True)) dp.add_error_handler(error) updater.start_polling() logger.debug("Setiup complete, Idling.") updater.idle()
def set_up_commands(self): commands = { 'start': self.command_help, 'help': self.command_help, 'list': self.command_list, 'choose': self.command_choose, } for name, command in commands.items(): self.dispatcher.add_handler(CommandHandler(name, command)) self.dispatcher.add_handler(CallbackQueryHandler(self.command_accept_choice)) self.dispatcher.add_error_handler(self.command_error)
def main(): modloader.load_modules(updater, data) dp.add_handler(CommandHandler('start', start)) dp.add_handler(CommandHandler('help', help_command, pass_args=True)) dp.add_handler(CommandHandler('about', about)) dp.add_handler(CommandHandler('modules', module_list)) dp.add_handler(CommandHandler('settings', settings)) dp.add_handler(CallbackQueryHandler(language, pattern=r'^settings:lang:\w+$')) dp.add_error_handler(lambda bot, update, error: logger.exception('Exception was raised', exc_info=error)) updater.start_polling(clean=True) logger.info('Bot started in {0:.3} seconds'.format(time.time() - start_time)) updater.idle() data.save()
def __init__(self): token = rospy.get_param('/telegram/token', None) if token is None: rospy.logerr("No token found in /telegram/token param server.") exit(0) else: rospy.loginfo("Got telegram bot token from param server.") # Set CvBridge self.bridge = CvBridge() # Create the EventHandler and pass it your bot's token. updater = Updater(token) # Get the dispatcher to register handlers dp = updater.dispatcher # on message... dp.add_handler(MessageHandler(Filters.text, self.pub_received)) dp.add_handler(CallbackQueryHandler(self.button)) # log all errors dp.add_error_handler(self.error) # Start the Bot updater.start_polling()
def main(): updater = Updater("INSERT TOKEN HERE") dp = updater.dispatcher dp.add_handler(CommandHandler("start", cmd_start)) dp.add_handler(CommandHandler("md", cmd_markdown, allow_edited=True)) dp.add_handler(CommandHandler("markdown", cmd_markdown, allow_edited=True)) dp.add_handler(CommandHandler("pin", cmd_pin, allow_edited=True)) dp.add_handler(CommandHandler("welcome", cmd_welcome, allow_edited=True)) dp.add_handler(CommandHandler("goodbye", cmd_goodbye, allow_edited=True)) dp.add_handler(CommandHandler("del_welcome", cmd_clear_welcome)) dp.add_handler(CommandHandler("del_goodbye", cmd_clear_goodbye)) dp.add_handler(CommandHandler("set_bot_admin", cmd_set_bot_admin, allow_edited=True)) dp.add_handler(CommandHandler("remove_bot_admin", cmd_remove_bot_admin, allow_edited=True)) dp.add_handler(CommandHandler("settings", cmd_settings)) dp.add_handler(CommandHandler("shortcut", cmd_shortcut_set, allow_edited=True)) dp.add_handler(CommandHandler("del_shortcut", cmd_shortcut_del, allow_edited=True)) dp.add_handler(CommandHandler("shortcuts", cmd_shortcut_getall, allow_edited=True)) dp.add_handler(MessageHandler(Filters.audio | Filters.command | Filters.contact | Filters.document | Filters.photo | Filters.sticker | Filters.text | Filters.video | Filters.voice | Filters.status_update, msg_parse, allow_edited=True)) dp.add_handler(CallbackQueryHandler(inline_button_callback)) dp.add_error_handler(error) updater.start_polling() updater.idle()
def main(): token = "TOKEN" updater = Updater(token, workers=20) updater.dispatcher.add_handler(CommandHandler('start', start)) updater.dispatcher.add_handler(CommandHandler('help', help)) updater.dispatcher.add_handler(CommandHandler('anime', anime)) updater.dispatcher.add_handler(CommandHandler('hentai', hentai)) updater.dispatcher.add_handler(CommandHandler('ecchi', ecchi)) updater.dispatcher.add_handler(CommandHandler('uncensored', uncensored)) updater.dispatcher.add_handler(CommandHandler('yuri', ecchi)) updater.dispatcher.add_handler(CommandHandler('loli', loli)) updater.dispatcher.add_handler(CommandHandler('neko', neko)) updater.dispatcher.add_handler(CommandHandler('wallpaper', wallpaper)) updater.dispatcher.add_handler(CommandHandler('id', idd)) updater.dispatcher.add_handler(CommandHandler('tag', tag)) updater.dispatcher.add_handler(CommandHandler('ping', ping)) updater.dispatcher.add_handler(CommandHandler('info', info)) updater.dispatcher.add_handler(CommandHandler('su', su)) updater.dispatcher.add_handler(CommandHandler('feedback', feedback)) updater.dispatcher.add_handler(CommandHandler("mes", mes)) updater.dispatcher.add_handler(CallbackQueryHandler(callback)) updater.dispatcher.add_handler(MessageHandler(Filters.text, text)) updater.dispatcher.add_handler(InlineQueryHandler(inline)) updater.dispatcher.add_error_handler(error) updater.start_polling(bootstrap_retries=4, clean=True) updater.idle()
def register_handlers(dispatcher): dispatcher.add_handler(CallbackQueryHandler(participant_button, pattern='(%s|%s)(\d+)' % (PARTICIPANT_CONFIRM, PARTICIPANT_DECLINE), pass_groups=True))
def main(): global mClient, mDatabase mClient = MongoClient(mongoURI) mDatabase = mClient[mDatabase] try: serverInfo = mClient.server_info() logger.info("Mongo Connection Achieved") logger.debug("Connected to Mongo Server: %s" % serverInfo) except: logger.error("Could not connect to Mongo Server at %s" % mongoURI) raise updater = Updater(authToken) dp = updater.dispatcher dp.add_handler(CommandHandler('motd', MOTD)) dp.add_handler(CommandHandler('set_motd', setMOTD)) dp.add_handler(CommandHandler('register_me', registerMe)) dp.add_handler(CallbackQueryHandler(callbackHandler, pattern='RegisterMe')) calendar = calendarEventHandler(mDatabase.groups, updater.job_queue, dp) #polls = pollEventHandler(mDatabase.groups, mDatabase.pollData) #dp.add_handler(polls.pollCreateHandler) dp.add_handler(CallbackQueryHandler(empty_callback, pattern=' ')) updateAdmins = Job(updateChatList, 60*5) updater.job_queue.put(updateAdmins, next_t=0) updater.start_polling() updater.idle()
def __init__(self, mCollection, mPollData): self.mCollection = mCollection self.mPollData = mPollData self.logger = logging.getLogger(__name__) self.POLLQUESTION, self.POLLANSWER, self.POLLGROUP = range(100,103) self.POLLGETANSWERING, self.POLLCAST = range(200,202) self.pollCreateHandler = ConversationHandler( entry_points=[CommandHandler('createpoll', self.pollStartEditing, pass_user_data=True)], states = { self.POLLQUESTION:[MessageHandler(Filters.text, self.pollQuestionReceived, pass_user_data=True)], self.POLLANSWER: [MessageHandler(Filters.text, self.pollAnswerReceived, pass_user_data=True), CommandHandler('done', self.pollAskForGroup, pass_user_data=True)], self.POLLGROUP: [MessageHandler(Filters.text, self.pollCreatePoll, pass_user_data=True)]}, fallbacks=[MessageHandler('cancel', self.pollCancel, pass_user_data=True)]) self.pollAnswerHandler = ConversationHandler( entry_points=[CommandHandler('answerpoll', self.answerPollList, pass_user_data=True)], states = { self.POLLGETANSWERING : [CallbackQueryHandler()]} )
def main(): try: serverInfo = MCLIENT.server_info() logger.info("Connected to Mongo Server: %s." % serverInfo) except: logger.error("Could not connect to the Mongo Server.") raise updater = Updater(AUTHTOKEN) dp = updater.dispatcher dp.add_handler(CommandHandler('start',start, pass_user_data=True)) dp.add_handler(CommandHandler('cancel',start, pass_user_data=True)) dp.add_handler(CommandHandler('resolve',resolve, pass_user_data=True)) dp.add_handler(CommandHandler('help',help, pass_user_data=True, pass_chat_data=True)) dp.add_handler(CommandHandler('info',info)) dp.add_handler(CallbackQueryHandler(callbackResponseHandler, pass_user_data=True)) dp.add_handler(MessageHandler(Filters.status_update, statusReceived)) dp.add_handler(MessageHandler(Filters.all, messageReceived, pass_user_data=True)) dp.add_error_handler(error) updater.start_polling() updateAdmins = Job(updateChatList, 60*15) updater.job_queue.put(updateAdmins, next_t=0) updater.idle()
def main(): utils.create_database() updater = Updater(token=TOKEN_ID) # Get the dispatcher to register handlers dp = updater.dispatcher # on different commands - answer in Telegram dp.add_handler(CommandHandler('start', start, pass_args=True)) dp.add_handler(CommandHandler('help', start)) dp.add_handler(CommandHandler('who', ts_stats)) dp.add_handler(CommandHandler('ts', utils.ts_view)) dp.add_handler(CommandHandler('mention', mention_toggle)) dp.add_handler(CommandHandler('generate', generate_invitation, Filters.user(user_id=ADMIN_ID))) dp.add_handler(CommandHandler('notify', notify, Filters.user(user_id=ADMIN_ID), pass_args=True)) dp.add_handler(CommandHandler('id', get_id)) dp.add_handler(RegexHandler(r'(?i).*\@flandas\b', utils.mention_forwarder)) dp.add_handler(CallbackQueryHandler(utils.callback_query_handler, pass_chat_data=True)) dp.add_handler(CommandHandler('users', utils.send_users_tsdb, Filters.user(user_id=ADMIN_ID), pass_chat_data=True)) dp.add_handler(CommandHandler('groups', utils.send_ts_groups, Filters.user(user_id=ADMIN_ID), pass_chat_data=True)) dp.add_handler(MessageHandler(filter_assign_alias, utils.assign_user_alias_step2, pass_chat_data=True)) gm_handler = ConversationHandler( allow_reentry=True, entry_points=[CommandHandler('gm', utils.pre_send_gm)], states={ 0: [MessageHandler(Filters.text, utils.send_gm)], }, fallbacks=[CommandHandler('cancel', cancel), CommandHandler('headshot', cancel)] ) dp.add_handler(gm_handler) # Add response for unknown private messages dp.add_handler(MessageHandler(Filters.private, unknown)) # log all errors dp.add_error_handler(log_error) # Start the Bot updater.start_polling() # Run the bot until the you presses Ctrl-C or the process receives SIGINT, # SIGTERM or SIGABRT. This should be used most of the time, since # start_polling() is non-blocking and will stop the bot gracefully. updater.idle()
def test_callback(self): # first insert the callbackhandler, register it and start polling def button(bot, update): query = update.callback_query bot.editMessageText(text="Selected option: %s" % query.data, chat_id=query.message.chat_id, message_id=query.message.message_id) dp = self.updater.dispatcher dp.add_handler(CallbackQueryHandler(button)) self.updater.start_polling() # the start callback in this example generates a message that will be edited, so let's mimick that message # for future reference keyboard = [[InlineKeyboardButton("Option 1", callback_data='1'), InlineKeyboardButton("Option 2", callback_data='2')], [InlineKeyboardButton("Option 3", callback_data='3')]] reply_markup = InlineKeyboardMarkup(keyboard) chat = self.cg.get_chat() start_message = self.bot.sendMessage(chat_id=chat.id, text='Please choose:', reply_markup=reply_markup) # now let's create some callback query's to send u1 = self.cqg.get_callback_query(message=start_message, data="1") u2 = self.cqg.get_callback_query(message=start_message, data="2") u3 = self.cqg.get_callback_query(message=start_message, data="3") # And test them one by one self.bot.insertUpdate(u1) data = self.bot.sent_messages[-1] self.assertEqual(data['text'], "Selected option: 1") self.assertEqual(data['chat_id'], start_message.chat.id) self.assertEqual(data['message_id'], start_message.message_id) self.bot.insertUpdate(u2) data = self.bot.sent_messages[-1] self.assertEqual(data['text'], "Selected option: 2") self.assertEqual(data['chat_id'], start_message.chat.id) self.assertEqual(data['message_id'], start_message.message_id) self.bot.insertUpdate(u3) data = self.bot.sent_messages[-1] self.assertEqual(data['text'], "Selected option: 3") self.assertEqual(data['chat_id'], start_message.chat.id) self.assertEqual(data['message_id'], start_message.message_id) # stop polling self.updater.stop()
def main(): db.db.connect() try: db.db.create_tables([db.Conversation, db.User, db.IsMod, db.Vote]) except pw.OperationalError: pass updater = Updater(settings.KEY) dispatcher = updater.dispatcher dispatcher.add_handler(CommandHandler('start', start)) dispatcher.add_handler(CommandHandler('help', send_help_message)) dispatcher.add_handler(CommandHandler('auto', set_auto)) dispatcher.add_handler(CommandHandler('location', set_location, pass_args=True)) dispatcher.add_handler(CommandHandler('set_account', set_account)) dispatcher.add_handler(CommandHandler('unlink', unlink)) dispatcher.add_handler(CommandHandler('matches', send_matches)) dispatcher.add_handler(CallbackQueryHandler(inline.do_press_inline_button, pass_job_queue=True)) dispatcher.add_handler(MessageHandler(Filters.text, message_handler, pass_job_queue=True)) dispatcher.add_handler(MessageHandler(Filters.location, update_location)) dispatcher.add_handler(CommandHandler('new_vote', start_vote_session, pass_job_queue=True)) dispatcher.add_handler(CommandHandler('timeout', set_timeout, pass_args=True)) dispatcher.add_handler(CommandHandler('about', send_about)) # Chat functionality dispatcher.add_handler(CommandHandler('poll_msgs', chat.poll_messages, pass_args=True)) dispatcher.add_handler(CommandHandler('poll_unanswered', chat.poll_unanswered_messages, pass_args=True)) dispatcher.add_handler(CommandHandler('unblock', chat.unblock)) # Settings dispatcher.add_handler(CommandHandler('set_setting', admin.set_setting, pass_args=True)) dispatcher.add_handler(CommandHandler('list_settings', admin.list_settings)) dispatcher.add_handler(CommandHandler('help_settings', admin.help_settings)) # Moderators dispatcher.add_handler(CommandHandler('make_me_a_mod', admin.make_me_a_mod)) inline_caps_handler = InlineQueryHandler(inline.inline_preview) dispatcher.add_handler(inline_caps_handler) dispatcher.add_handler(MessageHandler(Filters.command, custom_command_handler)) dispatcher.add_error_handler(error_callback) updater.start_polling() updater.idle()