我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 telegram.ext.Filters.text。
def start_bot():
    """Create the bot, register its handlers and run it until interrupted.

    Registers a ``/start`` command handler and a questionnaire conversation
    with the states ``name``, ``attitude``, ``understanding`` and ``comment``,
    then starts polling and blocks in ``idle()``.
    """
    updater = Updater(settings.TELEGRAM_API_KEY)
    dispatcher = updater.dispatcher
    dispatcher.add_handler(CommandHandler("start", reply_to_start_command))

    # Both rating states accept exactly one digit from 1 to 5.
    questionnaire_states = {
        "name": [MessageHandler(Filters.text, get_name, pass_user_data=True)],
        "attitude": [RegexHandler('^(1|2|3|4|5)$', attitude, pass_user_data=True)],
        "understanding": [RegexHandler('^(1|2|3|4|5)$', understanding, pass_user_data=True)],
        "comment": [
            MessageHandler(Filters.text, comment, pass_user_data=True),
            CommandHandler('skip', skip_comment, pass_user_data=True),
        ],
    }
    questionnaire = ConversationHandler(
        entry_points=[RegexHandler('^(????????? ??????)$', start_anketa, pass_user_data=True)],
        states=questionnaire_states,
        fallbacks=[MessageHandler(Filters.text, dontknow, pass_user_data=True)],
    )
    dispatcher.add_handler(questionnaire)

    updater.start_polling()
    updater.idle()
def register_command(self, commands, callback, allow_edited=False):
    """Register a single handler that serves every command in ``commands``.

    Args:
        commands(list|tuple): command names to register
        callback(function): invoked as ``callback(message, args, lang)``,
            where ``args`` are the space-separated words after the command
        allow_edited(Optional[bool]): also pass edited messages

    Raises:
        ValueError: if one of ``commands`` was already registered
            (per the original docstring; presumably raised by
            ``self._register_command``)
    """
    for name in commands:
        self._register_command(name)

    @utils.log(logger, print_ret=False)
    def process_update(bot, update):
        lang = utils.get_lang(self._storage, update.effective_user)
        message = update.effective_message
        # Drop the first word (the command itself); the rest are arguments.
        arguments = update.effective_message.text.split(' ')[1:]
        callback(message, arguments, lang)

    handler = CommandHandler(commands, process_update, allow_edited=allow_edited)
    self._dispatcher.add_handler(handler)
def _bad_keyword_detect(self, text): keywords = ( '??', '??', '??' ) bad_words = ( '??', '??', '??', # ?? '??', '??', # ?? ) for i in keywords: if i in text: for j in bad_words: if j in text: return True return False
def filter(self, message):
    """Tell whether ``message`` passes the workday, hour and keyword checks.

    The message date is treated as UTC and converted with ``TZ``. The message
    is rejected on non-workdays and outside ``[START, END)`` hours; otherwise
    it is accepted only if one of the keyword detectors matches its text.
    """
    local_date = TZ.normalize(message.date.replace(tzinfo=pytz.utc))
    if not is_workday(local_date):
        return False

    if not (START <= local_date.hour < END):
        return False

    content = message.text
    if self._bad_keyword_detect(content):
        return True
    if self._keyword_detect(content):
        return True
    return False
def workday_end_time(bot, update):
    """Reply with the time left until 18:00 (in ``TZ``) on the message's day.

    A typing indicator is sent first; the reply wording is picked at random
    from a fixed set of templates.
    """
    bot.sendChatAction(
        chat_id=update.message.chat_id,
        action=ChatAction.TYPING,
    )

    now = TZ.normalize(update.message.date.replace(tzinfo=pytz.utc))
    end_time = TZ.localize(datetime(now.year, now.month, now.day, hour=18))
    remaining = end_time - now
    hour, leftover_seconds = divmod(remaining.seconds, 3600)
    minute = leftover_seconds // 60

    # The hour part is omitted when it is zero; minutes are always shown.
    pieces = []
    if hour:
        pieces.append(' {} ??'.format(hour))
    pieces.append(' {} ??'.format(minute))
    time_remaining = ''.join(pieces)

    text_templates = (
        '{}?????????????',
        '?????{}',
        '??{}????',
        '??????{}',
        '???{}????',
        '????{}',
        '??????: {}',
    )
    reply = random.choice(text_templates).format(time_remaining)
    update.message.reply_text(reply, quote=True)
def send_callback_keyboard(self, bot, chat_id, text, buttons, action=None):
    """Show an inline keyboard with one button per row plus a cancel button.

    If a keyboard message id is already recorded for ``chat_id`` that message
    is edited; otherwise a new message is sent. The resulting message id is
    stored, and ``action`` (when truthy) is remembered for the chat.
    """
    rows = [[button] for button in buttons]
    rows.append([InlineKeyboardButton("cancel", callback_data="null")])
    markup = InlineKeyboardMarkup(rows)

    # NOTE(review): nesting reconstructed from collapsed source; the id is
    # stored after both the edit and the send path.
    try:
        known_id = self._sent_keyboard_message_ids[chat_id]
        message = bot.edit_message_text(chat_id=chat_id, message_id=known_id,
                                        text=text, reply_markup=markup)
    except KeyError:
        message = bot.send_message(chat_id=chat_id, text=text, reply_markup=markup)
    self._sent_keyboard_message_ids[chat_id] = message.message_id

    if action:
        self._sent_keyboard[chat_id] = action
def admin_command(self, bot, update):
    """Claim admin rights for the calling chat, or confirm them.

    If no admin chat id is stored yet, the calling chat becomes admin and the
    state is saved. The admin chat receives the list of hidden commands; any
    other chat is told that there can be only one admin.
    """
    chat_id = update.message.chat_id
    admin_chat_id = config.get_state(_admin_key)
    if not admin_chat_id:
        config.save_state(_admin_key, chat_id)
        config.save_secrets()
        text = "You're admin now!"
    elif chat_id == admin_chat_id:
        text = "You already are admin!"
    else:
        bot.send_message(text="There can be only one!", chat_id=chat_id)
        return

    # One "/command" per line (fixes the "comamnds" typo shown to users).
    text += "\nHidden commands:\n/" + "\n/".join(admin_commands)
    bot.send_message(text=text, chat_id=chat_id)

# Password protected commands
def set_password_command(self, bot, update):
    """Handle ``/setpassword [password]``.

    Requires password protection to be enabled. On success the module-level
    password is replaced and the sender is added to the known clients.
    """
    global _password
    chat_id = update.message.chat_id

    def reply(text):
        bot.send_message(chat_id=chat_id, text=text)

    if not config.get_state(_password_enabled_key):
        reply("Please enable password protection with /togglepassword first")
        return

    parts = update.message.text.split(" ")
    if len(parts) < 2:
        reply("Usage: /setpassword [password]")
        return

    password = parts[1].strip()
    if not password:
        reply("Password can't be empty")
        return

    _password = password
    _clients.add(User(user=update.message.from_user))
    reply("Successfully changed password")
def ban_user_command(self, bot, update):
    """Offer an inline keyboard of logged-in clients and ban the chosen one.

    Banning removes the client whose ``user_id`` matches the pressed button
    and clears the password, so a new one has to be set.
    """
    chat_id = update.message.chat_id

    if not config.get_state(_password_enabled_key):
        bot.send_message(chat_id=chat_id, text="Password is disabled")
        return
    if not _clients:
        bot.send_message(chat_id=chat_id, text="No clients logged in")
        return

    text = "Who should be banned?"
    keyboard_items = [
        InlineKeyboardButton(text=client.name, callback_data=str(client.user_id))
        for client in _clients
    ]

    @decorators.callback_keyboard_answer_handler
    def _action(chat_id, data):
        global _clients
        global _password
        # Keep every client except the one whose id was sent back.
        _clients = {client for client in _clients if str(client.user_id) != data}
        _password = None
        return "Banned user. Please set a new password."

    self.send_callback_keyboard(bot, chat_id, text, keyboard_items, _action)
def echo(bot, update):
    """Occasionally answer one specific user with a random swearword.

    A reply is sent when ``randint(1, max_)`` equals 3. ``max_`` then grows by
    one per message until it reaches 6, after which it resets to 3.
    """
    global max_
    message = update.message
    # NOTE(review): nesting reconstructed from collapsed source; the ``max_``
    # update is assumed to happen only for this user's messages.
    if message.from_user.username != "andremesquita96":
        return

    user_message = message.text
    if randint(1, max_) == 3:
        # Questions get a fixed prefix; anything else is quoted back.
        prefix = 'Responde' if '?' in user_message else user_message
        bot.sendMessage(message.chat_id,
                        text='{}, {}'.format(prefix, choice(automatic_swearwords)))

    max_ = max_ + 1 if max_ < 6 else 3
def get_short_url(long_url: str):
    """
    generate short url using Sina Weibo api

    The long url is passed through ``params`` so that ``requests``
    percent-encodes it; concatenating it raw would corrupt any url containing
    ``&``, ``#`` or spaces. Any failure falls back to the original url.

    :param long_url: the original url
    :return: short `t.cn` url, or ``long_url`` unchanged when the request or
        the response parsing fails
    """
    sina_api_url = 'http://api.t.sina.com.cn/short_url/shorten.json'
    query = {'source': '3271760578', 'url_long': long_url}
    try:
        # A timeout keeps a stalled API from blocking the caller forever.
        r = requests.get(sina_api_url, params=query, timeout=10)
        obj = json.loads(r.text)
        return obj[0]['url_short']
    except Exception:
        # Best effort: report the problem and return the original long url.
        traceback.print_exc()
        return long_url
def audio_from_telegram(bot: telegram.Bot, update: telegram.Update):
    """Forward a placeholder text for an incoming Telegram message to QQ.

    The ids returned by the forward call are recorded in the message database.
    """
    message: telegram.Message = update.message
    tg_group_id = message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)

    placeholder = [{
        'type': 'text',
        'data': {'text': '[ ?? ]'},
    }]
    qq_message_id = send_from_tg_to_qq(forward_index,
                                       message=placeholder,
                                       tg_group_id=tg_group_id,
                                       tg_user=message.from_user,
                                       tg_forward_from=message,
                                       tg_reply_to=message.reply_to_message)
    global_vars.mdb.append_message(qq_message_id, message.message_id, forward_index, 0)
def get_location_from_baidu(latitude: Union[float, str], longitude: Union[float, str]):
    """Reverse-geocode a coordinate pair through the Baidu geocoder API.

    :param latitude: latitude of the point
    :param longitude: longitude of the point
    :return: the formatted address when the response status is 0, otherwise
        a text containing the returned status code
    """
    query = (
        ('callback', 'renderReverse'),
        ('location', '{},{}'.format(latitude, longitude)),
        ('output', 'json'),
        ('pois', '1'),
        ('ak', BAIDU_API),
    )
    response = requests.get('http://api.map.baidu.com/geocoder/v2/', params=query)

    # Remove the JSONP prefix and the final character to leave plain JSON.
    payload = response.text.replace('renderReverse&&renderReverse(', '')[:-1]
    data = json.loads(payload)

    status = data['status']
    if status != 0:
        return 'Baidu API returned an error code: ' + str(status)
    return data['result']['formatted_address']
def location_from_telegram(bot: telegram.Bot, update: telegram.Update):
    """Forward a Telegram location to QQ as a text line with its address.

    The address comes from ``get_location_from_baidu``; the ids returned by
    the forward call are recorded in the message database.
    """
    message: telegram.Message = update.message
    tg_group_id = message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)

    location = message.location
    latitude = location.latitude
    longitude = location.longitude
    address_entity = [{
        'type': 'text',
        'data': {'text': '????????' + get_location_from_baidu(latitude, longitude)},
    }]

    qq_message_id = send_from_tg_to_qq(forward_index,
                                       message=address_entity,
                                       tg_group_id=tg_group_id,
                                       tg_user=message.from_user,
                                       tg_forward_from=message,
                                       tg_reply_to=message.reply_to_message)
    global_vars.mdb.append_message(qq_message_id, message.message_id, forward_index, 0)
def main():
    """Run the bot in maintenance mode behind a local webhook.

    Both commands and plain text are routed to ``maintenance``; the function
    blocks in ``idle()``.
    """
    updater = Updater(api_key, workers=64)
    dispatcher = updater.dispatcher

    # Commands and ordinary text get the same maintenance callback.
    for message_filter in (Filters.command, Filters.text):
        dispatcher.add_handler(MessageHandler(message_filter, maintenance))

    dispatcher.add_error_handler(error)

    # Webhook mode is used here; updater.start_polling() is the alternative.
    updater.start_webhook(listen='127.0.0.1', port=int(listen_port), url_path=api_key)
    webhook_url = 'https://{0}/{1}'.format(domain, api_key)
    updater.bot.setWebhook(webhook_url)

    updater.idle()
def broadcast(bot, update):
    """Send the text after ``/broadcast `` to every account with a balance.

    An account is put on the blacklist before its message is pushed and
    removed afterwards, so an account whose push fails stays blacklisted and
    is skipped by later broadcasts.
    """
    info_log(update)
    bot = Bot(api_key)
    accounts_list = mysql_select_accounts_balances()
    # Accounts that previously broke a broadcast (e.g. deleted the chat).
    BLACK_LIST = mysql_select_blacklist()

    for account in accounts_list:
        account_id = account[0]
        if account_id in BLACK_LIST or int(account[1]) <= 0:
            continue
        mysql_set_blacklist(account_id)
        print(account_id)
        push_simple(bot, account_id, update.message.text.replace('/broadcast ', ''))
        sleep(0.2)
        # Only reached when the push did not raise.
        mysql_delete_blacklist(account_id)

# bootstrap
def price_text(bot, update):
    """Send the localized price summary followed by the price options text.

    Reads three price rows from ``mysql_select_price()``; price-like columns
    are divided by 10**8 before formatting.
    """
    user_id = update.message.from_user.id
    chat_id = update.message.chat_id
    lang_id = mysql_select_language(user_id)
    price = mysql_select_price()

    def scaled(raw):
        # Divide by 10**8 and render with eight decimals.
        return '%.8f' % (float(raw) / (10 ** 8))

    merc, grail, flip = price[0], price[1], price[2]

    # Columns used: 0 = last, 3 = ask, 4 = bid (as named by the original variables).
    last_price_merc, ask_price_merc, bid_price_merc = scaled(merc[0]), scaled(merc[3]), scaled(merc[4])
    last_price_grail, ask_price_grail, bid_price_grail = scaled(grail[0]), scaled(grail[3]), scaled(grail[4])
    last_price_flip, ask_price_flip, bid_price_flip = scaled(flip[0]), scaled(flip[3]), scaled(flip[4])

    # NOTE(review): the third operand of max() is merc[0] (column 0 of the
    # first row), not a column-1 value - confirm this is intended.
    high_price = scaled(max(float(merc[1]), float(grail[1]), float(merc[0])))
    low_price = scaled(min(float(merc[2]), float(grail[2])))
    volume = int(merc[5]) + int(grail[5]) + int(flip[5])
    volume_btc = '%.2f' % ((float(merc[6]) + float(grail[6])) / (10 ** 8))

    text = lang_text('price', lang_id).format(
        last_price_merc, ask_price_merc, bid_price_merc,
        last_price_grail, ask_price_grail, bid_price_grail,
        high_price, low_price, "{:,}".format(volume), volume_btc,
        last_price_flip, ask_price_flip, bid_price_flip,
    )
    lang_keyboard(lang_id, bot, chat_id, text)
    sleep(1)
    message_markdown(bot, chat_id, lang_text('price_options', lang_id))
def main():
    """Register this app's Telegram handlers on the default dispatcher."""
    logger.info("Loading handlers for telegram bot")

    # Default dispatcher (the first bot in settings.TELEGRAM_BOT_TOKENS).
    # For a specific bot use DjangoTelegramBot.getDispatcher(<token or username>).
    dp = DjangoTelegramBot.dispatcher

    command_callbacks = (
        ("start", start),
        ("help", help),
        ("startgroup", startgroup),
        ("me", me),
        ("chat", chat),
    )
    for command, callback in command_callbacks:
        dp.add_handler(CommandHandler(command, callback))

    # Forwarded messages are registered ahead of the generic text echo.
    dp.add_handler(MessageHandler(Filters.forwarded, forwarded))
    dp.add_handler(MessageHandler(Filters.text, echo))

    dp.add_error_handler(error)
def start(bot, update, args):
    """Greet the user, activating them when a valid invitation token is given.

    Activated or already-allowed users get a "Connect TS" button; everyone
    else is asked to obtain an invitation link and gets no button.
    """
    message = update.message
    user = message.from_user
    link = "http://www.teamspeak.com/invite/%s/" % TS_HOST
    reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton(_('Connect TS'), url=link)]])

    # The first argument, when present, is treated as the invitation token.
    activated = False
    if args:
        activated = utils.validate_invitation_token(token=args[0], user_id=user.id,
                                                    name=user.first_name)

    if activated:
        text = _("Welcome %s you're now activated, using /who or /ts you can check who's in teamspeak.") % \
            user.first_name
    elif utils.is_allow(user.id):
        text = _("Hello %s, using /who or /ts you can check who's in teamspeak.") % user.first_name
    else:
        reply_markup = None
        text = _("Welcome, ask admin to generate an invitation link by /generate")

    bot.sendMessage(message.chat_id, text, reply_to_message_id=message.message_id,
                    reply_markup=reply_markup)
def button(self, bot, update):
    """Answer an inline-button press with a captured image of the chosen topic.

    The message holding the keyboard is edited twice to show progress, then
    the image returned by ``self.get_image`` is sent as a photo. The image
    file is opened in a ``with`` block so its handle is closed even when the
    upload raises.
    """
    query = update.callback_query
    chat_id = query.message.chat_id
    message_id = query.message.message_id

    bot.editMessageText(text="Capturing image of topic: %s" % query.data,
                        chat_id=chat_id,
                        message_id=message_id)
    img_file_path = self.get_image(image_topic=query.data)

    bot.editMessageText(text="Uploading captured image of topic: %s" % query.data,
                        chat_id=chat_id,
                        message_id=message_id)
    with open(img_file_path, 'rb') as photo:
        bot.send_photo(chat_id=chat_id,
                       photo=photo,
                       caption="This is what I see on topic " + query.data)
def __init__(self):
    """Read the bot token from the ROS parameter server and start polling.

    Exits the process when ``/telegram/token`` is not set.
    """
    token = rospy.get_param('/telegram/token', None)
    if token is None:
        rospy.logerr("No token found in /telegram/token param server.")
        exit(0)
    rospy.loginfo("Got telegram bot token from param server.")

    self.bridge = CvBridge()

    updater = Updater(token)
    dispatcher = updater.dispatcher
    # Plain text messages go to pub_received; errors go to self.error.
    dispatcher.add_handler(MessageHandler(Filters.text, self.pub_received))
    dispatcher.add_error_handler(self.error)

    updater.start_polling()
def pub_received(self, bot, update):
    """Reply with a camera image when the message contains a trigger phrase.

    The match is case-insensitive. When no phrase matches, the list of
    accepted phrases is sent back instead. The image file is opened in a
    ``with`` block so its handle is closed after the upload.
    """
    rospy.loginfo("Received: " + str(update))
    valid_necessary_words = ['what do you see', 'picture', 'camera']

    lowered = update.message.text.lower()
    if any(phrase in lowered for phrase in valid_necessary_words):
        img_file_path = self.get_image()
        with open(img_file_path, 'rb') as photo:
            update.message.reply_photo(photo=photo,
                                       caption="This is what I see!")
    else:
        update.message.reply_text("Try any of: " + str(valid_necessary_words))
def main(token):
    """Run an echo bot with ``/start`` and ``/help`` until interrupted."""
    updater = Updater(token)
    dispatcher = updater.dispatcher

    for command, callback in (("start", start), ("help", help)):
        dispatcher.add_handler(CommandHandler(command, callback))

    # Every plain text message is handled by echo.
    dispatcher.add_handler(MessageHandler(Filters.text, echo))
    dispatcher.add_error_handler(error)

    updater.start_polling()
    # start_polling() does not block, so wait here until the process is
    # told to stop.
    updater.idle()
def __init__(self):
    """Create the base velocity publisher and start the Telegram bot."""
    self.base_pub = rospy.Publisher("/base_controller/command", Twist, queue_size=1)

    token = rospy.get_param('/telegram/token', None)
    updater = Updater(token)
    dispatcher = updater.dispatcher

    # Command, text and error handlers.
    dispatcher.add_handler(CommandHandler('start', self.start))
    dispatcher.add_handler(CommandHandler('help', self.help))
    dispatcher.add_handler(MessageHandler(Filters.text, self.echo))
    dispatcher.add_error_handler(self.error)

    updater.start_polling()
def __init__(self):
    """Set up per-user history, the updater and its handlers."""
    self.history = {}
    self.updater = Updater(TOKEN)
    # Last space-separated piece of str(self), minus its final character.
    self.name = str(self).split(' ')[-1][:-1]

    self.dp = self.updater.dispatcher
    for command, callback in (("start", start), ("help", help)):
        self.dp.add_handler(CommandHandler(command, callback))
    self.dp.add_handler(MessageHandler([Filters.text], echo))
    self.dp.add_error_handler(error)

    self.stories = StoriesHandler()
    logger.info('I\'m alive!')
def echo(bot, update):
    """Log an incoming message and answer it through the sender's state tracker.

    The sender's first and last name are used for logging; when either is
    missing from the message dict, the sender id is logged instead. Only a
    missing or malformed name is tolerated - the former bare ``except`` also
    swallowed ``KeyboardInterrupt`` and ``SystemExit``.
    """
    text = update.message.text
    md = mess2dict(update.message)
    try:
        sender_fname = md['from']['first_name']
        sender_lname = md['from']['last_name']
    except (KeyError, TypeError):
        sender_fname = str(md['from']['id'])
        sender_lname = ""
    logger.info("{} {} says: {}".format(sender_fname, sender_lname, text))

    sender_id = str(md['from']['id'])
    msg_id = str(md["message_id"])

    if text:
        history = ai.history[sender_id]
        history['context'].append(text)
        rep = history["state_tracker"].get_reply(text)
        history['replies'].append(rep)
        logger.info('bot replies: {}'.format(rep))
        bot_send_message(bot, update, rep)
def tag(bot, update, chat_id=None):
    """Handle ``/tag <tag>``, also when re-triggered from a callback query.

    The tag text is read from the message; when that fails, the last command
    stored for the callback's chat is reused. If neither yields a tag, a
    usage hint is sent and the function returns. Previously it fell through
    and failed on the unbound ``tags`` variable.

    :param bot: bot used to send messages
    :param update: incoming update (message or callback query)
    :param chat_id: target chat; defaults to the message's chat
    """
    if chat_id is None:
        chat_id = update.message.chat_id

    try:
        tags = update.message.text.split(' ', 1)[1]
    except Exception:
        # No message, or no argument after the command: reuse the last command.
        try:
            tags = lastcmd[update.callback_query.message.chat_id].split(' ', 1)[1]
        except Exception:
            bot.sendMessage(chat_id, "Please, use `/tag <tag>` instead.",
                            parse_mode=ParseMode.MARKDOWN)
            return

    pages = "30"
    try:
        lastcmd[update.message.chat_id] = update.message.text
    except Exception:
        # Best effort: there is no message to remember for callback queries.
        pass

    noparser(bot, update, tags=tags, pages=pages, chat_id=chat_id)
    # update.message is None for callback queries; it is passed through as is.
    botanio(bot, update, message=update.message, uid=chat_id, event_name="/tag")
def text(bot, update):
    """Route a reply-keyboard label to the matching category handler.

    Messages whose text is not one of the known labels are ignored.
    """
    # Keyboard handler: label -> category function.
    routes = {
        "Anime": anime,
        "Hentai (18+)": hentai,
        "Uncensored (18+)": uncensored,
        "Yuri (18+)": yuri,
        "Loli (18+)": loli,
        "Ecchi": ecchi,
        "Neko": neko,
        "Wallpaper": wallpaper,
    }
    handler = routes.get(update.message.text)
    if handler is not None:
        handler(bot, update, chat_id=update.message.chat_id)
def handle_message(self, bot, update):
    """Feed an incoming message to the chat's dialog generator and send its answer.

    ``/start`` discards any running dialog for the chat. A chat without a
    dialog gets one by indexing ``self.handlers`` (the original comment
    mentions a defaultdict - presumably it creates the generator) and is
    advanced with ``next()``; a running dialog receives the message through
    ``.send()``. When the generator is exhausted it is removed and the
    message is processed again from scratch.
    """
    message = update.message
    print("Received", message)
    chat_id = message.chat_id

    if message.text == "/start":
        # Restart: forget the current dialog, if any.
        self.handlers.pop(chat_id, None)

    if chat_id not in self.handlers:
        # New dialog: a fresh generator has to be advanced with next()
        # before .send() can be used on it.
        reply = next(self.handlers[chat_id])
    else:
        try:
            reply = self.handlers[chat_id].send(message)
        except StopIteration:
            # Dialog finished: drop it and start over with this message.
            del self.handlers[chat_id]
            return self.handle_message(bot, update)

    print("Answer: %r" % reply)
    self._send_answer(bot, chat_id, reply)
def test_help(self):
    """A ``/help`` message triggers exactly one ``sendMessage`` with "Help!"."""
    def help(bot, update):
        update.message.reply_text('Help!')

    # Register the handler under test and start the updater.
    self.updater.dispatcher.add_handler(CommandHandler("help", help))
    self.updater.start_polling()

    # The sender does not matter here, so the generator picks a random one.
    incoming = self.mg.get_message(text="/help")
    self.bot.insertUpdate(incoming)

    # sent_messages lists the bot's outbound calls; one is expected.
    outbound = self.bot.sent_messages
    self.assertEqual(len(outbound), 1)
    first = outbound[0]
    self.assertEqual(first['method'], "sendMessage")
    self.assertEqual(first['text'], "Help!")

    # Stop the updater so the test case does not hang.
    self.updater.stop()
def test_start(self):
    """A ``/start`` message from a custom user and chat is answered with "Hi!"."""
    def start(bot, update):
        update.message.reply_text('Hi!')

    self.updater.dispatcher.add_handler(CommandHandler("start", start))
    self.updater.start_polling()

    # Build our own user and chat instead of random ones.
    sender = self.ug.get_user(first_name="Test", last_name="The Bot")
    conversation = self.cg.get_chat(user=sender)
    incoming = self.mg.get_message(user=sender, chat=conversation, text="/start")
    self.bot.insertUpdate(incoming)

    outbound = self.bot.sent_messages
    self.assertEqual(len(outbound), 1)
    first = outbound[0]
    self.assertEqual(first['method'], "sendMessage")
    self.assertEqual(first['text'], "Hi!")

    self.updater.stop()
def test_echo(self):
    """Two text messages are echoed back in the order they were inserted."""
    def echo(bot, update):
        update.message.reply_text(update.message.text)

    self.updater.dispatcher.add_handler(MessageHandler(Filters.text, echo))
    self.updater.start_polling()

    first_update = self.mg.get_message(text="first message")
    second_update = self.mg.get_message(text="second message")
    self.bot.insertUpdate(first_update)
    self.bot.insertUpdate(second_update)

    outbound = self.bot.sent_messages
    self.assertEqual(len(outbound), 2)
    self.assertEqual(outbound[0]['method'], "sendMessage")
    self.assertEqual(outbound[0]['text'], "first message")
    self.assertEqual(outbound[1]['text'], "second message")

    self.updater.stop()
def filef(bot, update):
    """Paste the text of an uploaded document and reply with the paste URL.

    The document is downloaded to a temporary file, which is always removed
    afterwards. When the file cannot be read as text the user is asked for a
    text file and nothing is pasted. Previously the function carried on and
    failed on the unbound content variable, and leaked both the temp file and
    its descriptor.
    """
    import os

    message = update.message
    tg_file = bot.getFile(message.document.file_id)

    # mkstemp returns an open descriptor; only the path is needed here.
    fd, tmp_path = tempfile.mkstemp()
    os.close(fd)
    try:
        tg_file.download(tmp_path)
        try:
            with open(tmp_path, "r") as handle:
                content = handle.read()
        except (OSError, UnicodeError):
            # Not readable as text.
            bot.sendMessage(chat_id=message.chat_id,
                            reply_to_message_id=message.message_id,
                            text=" ?? Please send me a text file")
            return
    finally:
        os.remove(tmp_path)

    # Either name part may be missing; fall back to an anonymous author.
    try:
        author = message.from_user.first_name + " " + message.from_user.last_name
    except (TypeError, AttributeError):
        author = "Anonymous user"

    url = paste(content, author)
    bot.sendMessage(chat_id=message.chat_id,
                    reply_to_message_id=message.message_id,
                    text=url)
def start_consensus(bot, update):
    """Cleans/initializes data structures (runs new reasoning)

    Resets the global ``message_stack`` and stores the first run of digits in
    the message text as the global ``meeting_length``. A number at the very
    end of the text (e.g. ``/start_consensus 5``) is now stored as well;
    previously it was only stored when another character followed it.

    :param bot:
    :param update: telegram.ext.Update
    :return:
    """
    # TODO(scezar): right now requested format is /start_consensus int h
    global message_stack
    global meeting_length
    message_stack = []

    digits = ''
    for letter in update.message.text:
        if letter.isdigit():
            digits += letter
        elif digits:
            # The first digit run has ended.
            break

    if digits:
        meeting_length = int(digits)
def intent_extractor(bot, update):
    """Parse the message's intent and call the matching callback.

    Feedback for the intent is sent when requested. While the global state
    is before ``FINALIZING`` the intent's callback runs; otherwise the
    schedule is finalized.

    :param bot:
    :param update:
    :return:
    """
    global state
    intent = intent_parser.extract_intent(update.message.text)
    feedback, give_reply, state = gf.give_feedback(intent, state)

    if give_reply:
        bot.sendMessage(update.message.chat_id, text=feedback)

    # NOTE(review): nesting reconstructed from collapsed source; the dispatch
    # below is assumed to run whether or not a reply was given.
    if state.value >= States.FINALIZING.value:
        finalize_schedule(bot, update)
        return
    process_callback[intent](bot, update)
def help(self, bot, update):
    """Send the list of available commands, one per line."""
    commands = (
        ("/new", "Create new alarm"),
        ("/list", "List alarms, enable/disable and remove alarms"),
        ("/stop", "Stop all alarms"),
        ("/timezone", "Set the timezone (only works if sudo requires no password)"),
        ("/test", "Play an alarm to test"),
        ("/time", "Print time and timezone on device"),
        ("/help", "Get this message"),
    )
    icon = emojize(":information_source: ", use_aliases=True)
    header = icon + " The following commands are available:\n"
    listing = "".join(name + " " + description + "\n" for name, description in commands)
    bot.send_message(chat_id=update.message.chat_id, text=header + listing)
def start_command(self, bot, update, args):
    """Start a new publication draft for allowed users outside the group chat.

    Everyone else gets an explanatory message that depends on whether the
    command came from the group chat and on the production flag.
    """
    user_id = update.message.from_user.id
    chat = str(update.message.chat_id)
    in_group = chat == chats['group']

    if str(user_id) in allowed_users.values() and not in_group:
        self.store.new_draft(user_id)
        bot.sendMessage(update.message.chat_id, parse_mode='Markdown',
                        text="Crearem una publicació per a compartir.\n\n\u0031\u20E3 El primer que heu de fer és enviar-me el *nom de la publicació*.\n\nSi no voleu continuar amb el procés, envieu /cancel.",
                        reply_markup=ReplyKeyboardHide())
        return

    f_name = update.message.from_user.first_name
    if in_group:
        bot.sendMessage(update.message.chat_id,
                        text="No es permet crear publicacions des del grup.")
    elif function['production']:
        bot.sendMessage(update.message.chat_id,
                        text=str(f_name) + ", no teniu permisos per utilitzar aquesta ordre. Les ordres que teniu disponibles ara mateix són: /baixa /android /ios /tdesktop i /help.")
    else:
        bot.sendMessage(update.message.chat_id, parse_mode='Markdown',
                        text="Robot destinat a proves internes de Softcatalà. Si cerqueu el bot públic de Softcatalà el trobareu a @SoftcatalaBot.")
def help_command_callback(self, bot, update):
    """Track the help event and send the help text.

    In non-private chats whose ``flood`` setting is ``"no"``, the public rant
    text is sent through ``rant_and_cleanup`` instead of the help text.
    """
    message = update.message
    self.init_chat(message)

    # The event name is the first bot command in the message, without the
    # slash and the "@<bot username>" suffix; it defaults to "help".
    event_name = "help"
    entities = message.parse_entities(types=[MessageEntity.BOT_COMMAND])
    first_command = next(iter(entities.values()), None)
    if first_command is not None:
        event_name = first_command.replace("/", "").replace("@{}".format(self.bot_username), "")
    self.log_and_botan_track(event_name, message)

    chat_id = message.chat_id
    chat_type = message.chat.type
    reply_to_message_id = message.message_id
    flood = self.chat_storage[str(chat_id)]["settings"]["flood"]

    if chat_type != Chat.PRIVATE and flood == "no":
        self.rant_and_cleanup(bot, chat_id, self.RANT_TEXT_PUBLIC,
                              reply_to_message_id=reply_to_message_id)
        return
    bot.send_message(chat_id=chat_id, text=self.HELP_TEXT,
                     parse_mode='Markdown', disable_web_page_preview=True)
def get_settings_inline_keyboard(self, chat_id):
    """Build the settings keyboard that reflects the chat's current settings.

    The first row holds the three mode buttons, the second row the flood
    toggle and the close button. Each button's callback data is
    ``"settings <action>"``.
    """
    settings = self.chat_storage[str(chat_id)]["settings"]
    mode = settings["mode"]
    flood = settings["flood"]
    emoji_yes = "?"
    emoji_no = "?"

    def make_button(active, label, action):
        # Prefix the label with the marker for the setting's current state.
        marker = emoji_yes if active else emoji_no
        return InlineKeyboardButton(text=" ".join([marker, label]),
                                    callback_data=" ".join(["settings", action]))

    button_dl = make_button(mode == "dl", "Download", "dl")
    button_link = make_button(mode == "link", "Links", "link")
    button_ask = make_button(mode == "ask", "Ask", "ask")
    button_flood = make_button(flood == "yes", "Flood", "flood")
    button_close = make_button(False, "Close settings", "close")

    return InlineKeyboardMarkup([[button_dl, button_link, button_ask],
                                 [button_flood, button_close]])
def help_me(bot, update):
    """Send the usage instructions with the reply keyboard and log the call."""
    chat_id = update.message.chat_id
    help_text = (
        '?????? ????????? ??????:\n'
        '/start - ???????? ????????????????? ????\n'
        '/help - ??????? ?? ????????????? ????\n\n'
        '??? ?????????? ?????? ??????? ??????? ?? ?????? "???????? ???????", ? ????? ???????:\n'
        '{???? ???????} {????? ???????} {???????? ???????}\n'
        '????????:\n'
        '01.01.2017 00:00 ?????????? ?????? ? ????? ?????\n'
        '< ??? >\n'
        '01.01.17 00:00 ?????????? ?????? ? ????? ?????\n\n'
        '?????, ?????????????? ??????????? ?????? ?????\n'
        '????? ???????? ???? ? ??????? ????? ??????? ???? ? ????? '
        '??????????? ?? ?????? ?? ??????? ? ????????? ???? < #{?????????} >\n'
        '????????:\n'
        '08.02.2017 09:00 01.02.2017 20:00 #?????? ???? ???????? ??????????\n\n'
        '??? ???????? ??????? ??????? ?? ?????? "??????? ???????", ? ????? ??????? ??? ????????\n'
        '??? ????????? ??????? ????????? ?????????????\n'
        '??? ????? ???????? ????? ?????! ??????????? ? ???!'
    )
    bot.sendMessage(chat_id=chat_id, text=help_text, reply_markup=keyboard)
    logging.info('Command \'help\' invoked by chat id [{0}]'.format(update.message.chat_id))
def telegram_command_handle(updater):
    """
    Register the bot's command and text handlers on the updater's dispatcher.

    :param updater: updater whose dispatcher receives the handlers
    :return:
    """
    dispatcher = updater.dispatcher
    handlers = (
        CommandHandler('start', start),
        CommandHandler('help', help_me),
        MessageHandler(Filters.text, echo),
    )
    for handler in handlers:
        dispatcher.add_handler(handler)
def register_text_handler(self, callback, allow_edited=False):
    """Register a handler for text messages.

    Args:
        callback(function): invoked as ``callback(message, lang)``
        allow_edited(Optional[bool]): also pass edited messages
    """
    @utils.log(logger, print_ret=False)
    def process_update(bot, update):
        user_lang = utils.get_lang(self._storage, update.effective_user)
        callback(update.effective_message, user_lang)

    text_handler = MessageHandler(Filters.text, process_update,
                                  edited_updates=allow_edited)
    self._dispatcher.add_handler(text_handler)
def set_bot(self, bot):
    """
    Sets the main bot, which must be an instance of `eddie.bot.Bot`.

    Registers the default message handler for plain text
    (`TelegramEndpoint.default_message_handler`) and then one command
    handler per name in the bot's `command_names`
    (`TelegramEndpoint.default_command_handler`).
    """
    self._bot = bot
    dispatcher = self._telegram.dispatcher

    dispatcher.add_handler(MessageHandler(Filters.text, self.default_message_handler))
    for name in self._bot.command_names:
        dispatcher.add_handler(CommandHandler(name, self.default_command_handler))
def default_message_handler(self, bot, update):
    """
    Reply to a non-command message with the bot's default response.

    The input parameters (`bot` and `update`) are the default parameters
    passed by telegram.
    """
    answer = self._bot.default_response(update.message.text)
    update.message.reply_text(answer)
def default_command_handler(self, bot, update):
    """
    Run the bot method named after the received command and reply with its
    output.

    Only the command name is used for the lookup: the leading slash, any
    ``@botname`` suffix (added by Telegram clients in group chats) and any
    arguments after the first whitespace are removed. Previously
    ``/cmd@bot`` or ``/cmd arg`` produced a failing attribute lookup.

    The input parameters (`bot` and `update`) are the default parameters
    passed by telegram.
    """
    words = update.message.text[1:].split(None, 1)
    command = words[0].split('@', 1)[0] if words else ''
    command_handler = getattr(self._bot, command)
    update.message.reply_text(command_handler())
def _keyword_detect(self, text): keywords = ( '???', '???', '???', '????', '???', # ?? '???', '???', '???', '????', ) time_query_keywords = ( '??', '??', '????', '??', '??', '??', ) eye_catching_keywords = ( '??', '??', '????', ) for keyword in keywords: if keyword in text: return True if u'??' in text: for k in time_query_keywords: if k in text: return True if u'??' in text: for k in eye_catching_keywords: if k in text: return True return False
def search_youtube(text):
    """Return ``(title, url)`` of the first YouTube search result link.

    The search is repeated for as long as the found link contains
    ``googleads``.
    """
    base_url = 'https://www.youtube.com'
    while True:
        response = requests.get(base_url + '/results', params={'search_query': text})
        page = BeautifulSoup(response.content, 'html.parser')
        link = page.find('a', {'rel': 'spf-prefetch'})
        title = link.text
        video_url = base_url + link['href']
        if 'googleads' not in video_url:
            return title, video_url
def music(bot, update):
    """Search for the message text, reply with the audio and delete the mp3."""
    found_title, found_url = search_youtube(update.message.text)
    audio_kwargs = download(found_title, found_url)
    update.message.reply_audio(**audio_kwargs)
    # Remove "<title>.mp3" from the working directory once it has been sent.
    mp3_name = found_title + '.mp3'
    os.remove(mp3_name)
def main():
    """Run the music bot: ``/start`` plus a text handler that fetches music."""
    updater = Updater('YOUR-TOKEN')
    dispatcher = updater.dispatcher
    for handler in (CommandHandler("start", start), MessageHandler(Filters.text, music)):
        dispatcher.add_handler(handler)
    updater.start_polling()
    updater.idle()
def _register_commands(self, _dispatcher, plugins):
    """Register every handler of the bot on ``_dispatcher``.

    Handlers are added in this order: keyboard answer handlers, the built-in
    commands, one command per plugin (named by ``plugin.get_label()``), then
    the inline query and chosen-result handlers.

    Each plugin callback is created by a factory so it is bound to its own
    plugin. The former ``lambda`` inside the loop looked ``plugin`` up at
    call time, so every plugin command ran the last plugin.
    """
    # keyboard answer handlers
    _dispatcher.add_handler(MessageHandler(Filters.text, self.handle_message))
    _dispatcher.add_handler(CallbackQueryHandler(self.handle_callback_query))

    commands = (
        # public commands
        ('start', self.start_command),
        ('showqueue', self.show_queue_command),
        ('currentsong', self.current_song_command),
        ('cancel', self.cancel_command),
        ('login', self.login_command),
        ('subscribe', self.subscribe),
        ('unsubscribe', self.unsubscribe),
        # gmusic_password protected commands
        ('next', self.next_command),
        ('play', self.play_command),
        ('pause', self.pause_command),
        ('skip', self.skip_command),
        ('movesong', self.move_song_command),
        # admin commands
        ('admin', self.admin_command),
        ('clearqueue', self.clear_queue_command),
        ('reset', self.reset_command),
        ('exit', self.exit_command),
        ('ip', self.ip_command),
        ('togglepassword', self.toggle_password_command),
        ('setpassword', self.set_password_command),
        ('banuser', self.ban_user_command),
        ('setquality', self.set_quality_command),
        ('stationremove', self.station_remove_command),
        ('stationreload', self.station_reload_command),
    )
    for name, callback in commands:
        _dispatcher.add_handler(CommandHandler(name, callback))

    def make_plugin_callback(plugin):
        # Bind ``plugin`` now; a lambda in the loop would see only the last one.
        def run_plugin(bot, update):
            return plugin.run_command(self, bot, update)
        return run_plugin

    for plugin in plugins:
        _dispatcher.add_handler(
            CommandHandler(plugin.get_label(), make_plugin_callback(plugin)))

    _dispatcher.add_handler(InlineQueryHandler(self.get_inline_query_handler()))
    _dispatcher.add_handler(ChosenInlineResultHandler(self.queue_command))