def create_telegram_update(message_text):
    """
    Helper function: build an "Update" that simulates a text message sent to
    the Telegram bot.

    All identifiers (update, message, user, chat) are 0, the sender is named
    'greenkey' and the message is dated with the current local time.
    """
    from datetime import datetime

    sender = telegram.User(0, 'greenkey')
    simulated = telegram.Message(
        message_id=0,
        from_user=sender,
        date=datetime.now(),
        chat=telegram.Chat(0, ''),
        text=message_text,
    )
    return telegram.Update(update_id=0, message=simulated)
def get_reply_to(reply_to_message: telegram.Message, forward_index: int):
    """
    Build the "(reply to xxx)" attribution for a replied-to message.

    When the replied-to message was sent by the bot itself, the name is looked
    up from the stored message record (the QQ number at index 2) instead of
    the Telegram user.

    :param reply_to_message: the replied message (may be None)
    :param forward_index: forward index
    :return: the formatted attribution, or '' when there is no replied user,
             no stored record, or the stored record has no QQ number
    """
    replied_user = reply_to_message.from_user if reply_to_message else None
    if not replied_user:
        return ''

    name = get_full_user_name(replied_user)
    if replied_user.id == global_vars.tg_bot_id:
        record = global_vars.mdb.retrieve_message(reply_to_message.message_id, forward_index)
        if not record:
            return ''
        qq_number = record[2]
        if not qq_number:
            # message is bot command (tg side)
            return ''
        name = get_qq_name(qq_number, forward_index)

    return '(?' + name + ')'
def audio_from_telegram(bot: telegram.Bot, update: telegram.Update):
    """
    Relay a Telegram audio message to QQ as a fixed text placeholder and store
    the QQ/Telegram message id pair in the message database.
    """
    tg_message: telegram.Message = update.message
    chat_id = tg_message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=chat_id)

    placeholder = [{
        'type': 'text',
        'data': {'text': '[ ?? ]'}
    }]
    qq_message_id = send_from_tg_to_qq(forward_index,
                                       message=placeholder,
                                       tg_group_id=chat_id,
                                       tg_user=tg_message.from_user,
                                       tg_forward_from=tg_message,
                                       tg_reply_to=tg_message.reply_to_message)
    global_vars.mdb.append_message(qq_message_id, tg_message.message_id, forward_index, 0)
def pic_link_on(forward_index: int, tg_group_id: int=None, tg_user: telegram.User=None,
                tg_message_id: int=None, tg_reply_to: telegram.Message=None,
                qq_group_id: int=None, qq_discuss_id: int=None, qq_user: int=None):
    """
    Set IMAGE_LINK_MODE for this forward to True and announce it on both sides.

    Does nothing (returns None) when JQ_MODE is set.
    """
    if not JQ_MODE:
        IMAGE_LINK_MODE[forward_index] = True
        notice = 'QQ ?????????'
        return send_both_side(forward_index, notice, qq_group_id, qq_discuss_id,
                              tg_group_id, tg_message_id)
    return
def pic_link_off(forward_index: int, tg_group_id: int=None, tg_user: telegram.User=None,
                 tg_message_id: int=None, tg_reply_to: telegram.Message=None,
                 qq_group_id: int=None, qq_discuss_id: int=None, qq_user: int=None):
    """
    Set IMAGE_LINK_MODE for this forward to False and announce it on both sides.

    Does nothing (returns None) when JQ_MODE is set.
    """
    if not JQ_MODE:
        IMAGE_LINK_MODE[forward_index] = False
        notice = 'QQ ?????????'
        return send_both_side(forward_index, notice, qq_group_id, qq_discuss_id,
                              tg_group_id, tg_message_id)
    return
def recall(tg_group_id: int, tg_user: telegram.User, tg_message_id: int, tg_reply_to: telegram.Message):
    """
    Try to recall the message that the command replies to, then report the
    outcome back to the Telegram group.

    :param tg_group_id: telegram group id the command was sent in
    :param tg_user: telegram user who issued the command (unused here)
    :param tg_message_id: id of the command message; the report replies to it
    :param tg_reply_to: the message the command replies to, passed to recall_message
    :return: None; nothing is sent when get_forward_index returns -1
    """
    forward_index = get_forward_index(tg_group_id=tg_group_id)
    if forward_index == -1:
        return

    result = recall_message(forward_index, tg_reply_to)

    # Every entry must be a plain string: the original -3 entry carried a
    # trailing comma, which turned the text into a one-element tuple.
    if result == -1:
        text = 'Please refer to a message.'
    elif result == -2:
        text = 'Message not recallable.'
    elif result == -3:
        text = 'Recalling messages from other QQ users is not supported.'
    elif result == -4:
        text = 'Message sent more than two minutes ago. Recalling failed.'
    else:
        text = 'Message recalled.'

    global_vars.tg_bot.sendMessage(chat_id=tg_group_id,
                                   text=text,
                                   reply_to_message_id=tg_message_id)
def drive_mode_on(forward_index: int, tg_group_id: int=None, tg_user: telegram.User=None,
                  tg_message_id: int=None, tg_reply_to: telegram.Message=None,
                  qq_group_id: int=None, qq_discuss_id: int=None, qq_user: int=None):
    """
    Set global_vars.DRIVE_MODE for this forward to True and send the status
    notice to both sides.
    """
    global_vars.DRIVE_MODE[forward_index] = True
    return send_both_side(forward_index, 'Status changed: 451',
                          qq_group_id, qq_discuss_id, tg_group_id, tg_message_id)
def drive_mode_off(forward_index: int, tg_group_id: int=None, tg_user: telegram.User=None,
                   tg_message_id: int=None, tg_reply_to: telegram.Message=None,
                   qq_group_id: int=None, qq_discuss_id: int=None, qq_user: int=None):
    """
    Set global_vars.DRIVE_MODE for this forward to False and send the status
    notice to both sides.
    """
    global_vars.DRIVE_MODE[forward_index] = False
    return send_both_side(forward_index, 'Status changed: 200',
                          qq_group_id, qq_discuss_id, tg_group_id, tg_message_id)
def dice(tg_group_id: int, tg_user: telegram.User, tg_message_id: int, tg_reply_to: telegram.Message = None):
    """
    Send a "threw a dice" text segment followed by a dice segment to the QQ
    side on behalf of the Telegram user.

    Returns without sending when get_forward_index returns -1.
    """
    forward_index = get_forward_index(tg_group_id=tg_group_id)
    if forward_index == -1:
        return

    segments = [
        {'data': {'text': 'threw a dice'}, 'type': 'text'},
        {'data': {'type': '1'}, 'type': 'dice'},
    ]
    send_from_tg_to_qq(forward_index, segments, tg_group_id=tg_group_id, tg_user=tg_user)
def rps(tg_group_id: int, tg_user: telegram.User, tg_message_id: int, tg_reply_to: telegram.Message):
    """
    Send a "played rock–paper–scissors" text segment followed by an rps
    segment to the QQ side on behalf of the Telegram user.

    Returns without sending when get_forward_index returns -1.
    """
    forward_index = get_forward_index(tg_group_id=tg_group_id)
    if forward_index == -1:
        return

    segments = [
        {'data': {'text': 'played rock–paper–scissors'}, 'type': 'text'},
        {'data': {'type': '1'}, 'type': 'rps'},
    ]
    send_from_tg_to_qq(forward_index, segments, tg_group_id=tg_group_id, tg_user=tg_user)
def update_namelist(forward_index: int, tg_group_id: int=None, tg_user: telegram.User=None,
                    tg_message_id: int=None, tg_reply_to: telegram.Message=None,
                    qq_group_id: int=None, qq_discuss_id: int=None, qq_user: int=None):
    """
    Re-fetch the QQ group member list for this forward, cache it in
    global_vars.group_members and send a notice to both sides.
    """
    qq_group = FORWARD_LIST[forward_index]['QQ']
    members = global_vars.qq_bot.get_group_member_list(group_id=qq_group)
    global_vars.group_members[forward_index] = members
    return send_both_side(forward_index, 'QQ????????',
                          qq_group_id, qq_discuss_id, tg_group_id, tg_message_id)
def show_red_pack(forward_index: int, tg_group_id: int=None, tg_user: telegram.User=None,
                  tg_message_id: int=None, tg_reply_to: telegram.Message=None,
                  qq_group_id: int=None, qq_discuss_id: int=None, qq_user: int=None):
    """
    Send a fixed message containing an Alipay QR link to both sides.
    """
    red_pack_notice = '????????????????????? https://qr.alipay.com/c1x00417d1veog0b99yea4e'
    return send_both_side(forward_index, red_pack_notice,
                          qq_group_id, qq_discuss_id, tg_group_id, tg_message_id)
def link_chat_show_list(self, bot, update, args=None):
    """
    Show the list of available chats for linking.
    Triggered by `/link`.

    In a group that already has linked chats, only those chats are listed (and
    the list call's result is returned); otherwise the list is generated for
    the sending user and nothing is returned.

    Args:
        bot: Telegram Bot instance
        update: Message update
        args: Optional words used as the list filter, joined by spaces
    """
    keyword = " ".join(args or [])
    incoming = update.message

    if incoming.from_user.id != incoming.chat_id:
        master_uid = "%s.%s" % (self.channel_id, incoming.chat_id)
        linked = db.get_chat_assoc(master_uid=master_uid)
        if linked:
            return self.link_chat_gen_list(bot, incoming.chat_id, filter=keyword, chats=linked)

    self.link_chat_gen_list(bot, incoming.from_user.id, filter=keyword)
def unlink_all(self, bot, update):
    """
    Unlink all chats linked to the telegram group.
    Triggered by `/unlink_all`.

    Args:
        bot: Telegram Bot instance
        update: Message update

    Returns:
        Whatever `bot.send_message` returns for the reply that was sent.
    """
    incoming = update.message
    chat_id = incoming.chat.id

    # Chat id equal to the sender id is treated as "not sent to a group".
    if chat_id == incoming.from_user.id:
        return bot.send_message(chat_id,
                                "Send `/unlink_all` to a group to unlink all remote chats from it.",
                                parse_mode=telegram.ParseMode.MARKDOWN,
                                reply_to_message_id=incoming.message_id)

    master_uid = "%s.%s" % (self.channel_id, chat_id)
    assocs = db.get_chat_assoc(master_uid=master_uid)
    if len(assocs) >= 1:
        db.remove_chat_assoc(master_uid=master_uid)
        reply = "All chats has been unlinked from this group. (%s)" % len(assocs)
    else:
        reply = "No chat is linked to the group."
    return bot.send_message(chat_id, reply, reply_to_message_id=incoming.message_id)
def extra_help(self, bot, update):
    """
    Show list of extra functions and their usage.
    Triggered by `/extra`.

    Slave channels are listed in sorted key order; the position in that order
    is the numeric prefix of each command (`/<index>_<function>`).

    Args:
        bot: Telegram Bot instance
        update: Message update
    """
    parts = ["List of slave channel features:"]
    for n, slave_id in enumerate(sorted(self.slaves)):
        channel = self.slaves[slave_id]
        parts.append("\n\n<b>%s %s</b>" % (channel.channel_emoji, channel.channel_name))
        xfns = channel.get_extra_functions()
        if not xfns:
            # Start on its own line; previously this text was glued directly
            # onto the channel header.
            parts.append("\nNo command found.")
            continue
        for j in xfns:
            fn_name = "/%s_%s" % (n, j)
            parts.append("\n\n%s <b>(%s)</b>\n%s" % (
                fn_name, xfns[j].name, xfns[j].desc.format(function_name=fn_name)))
    bot.send_message(update.message.chat.id, "".join(parts), parse_mode="HTML")
def extra_call(self, bot, update, groupdict=None):
    """
    Call an extra function from slave channel.

    A "Please wait..." message is sent first and then edited to carry the
    function's result. The function receives everything after the first space
    of the message text as a single string.

    Args:
        bot: Telegram Bot instance
        update: Message update
        groupdict: Parameters offered by the message; reads 'id' (index into
            the sorted slave keys) and 'command' (function name)
    """
    slave_index = int(groupdict['id'])
    if slave_index >= len(self.slaves):
        return self._reply_error(bot, update, "Invalid slave channel ID. (XC01)")

    channel = self.slaves[sorted(self.slaves)[slave_index]]
    functions = channel.get_extra_functions()
    command = groupdict['command']
    if command not in functions:
        return self._reply_error(bot, update, "Command not found in selected channel. (XC02)")

    target = functions[command]
    header = "%s %s: %s\n-------\n" % (channel.channel_emoji, channel.channel_name, target.name)
    chat_id = update.message.chat.id
    placeholder = bot.send_message(chat_id, header + "Please wait...")

    argument = " ".join(update.message.text.split(' ', 1)[1:])
    outcome = target(argument)
    bot.editMessageText(text=header + outcome, chat_id=chat_id, message_id=placeholder.message_id)
def poll(self):
    """
    Message polling process.

    Starts Telegram polling, then consumes `self.queue` until a `None`
    sentinel arrives. Each message is handed to `self.process_msg` on its own
    thread. If handling an item raises, the error is logged, the bot is
    stopped and Telegram polling is started again before the loop continues.

    Recovery is done in-loop rather than by calling `poll()` recursively: the
    recursive form grew the stack on every error and, once the nested call
    shut down on the sentinel, returned into an outer loop that kept waiting
    on the queue forever.
    """
    self.polling_from_tg()
    while True:
        try:
            m = self.queue.get()
            if m is None:
                break
            self.logger.info("Got message from queue\nType: %s\nText: %s\n----" % (m.type, m.text))
            threading.Thread(target=self.process_msg, args=(m,)).start()
            self.queue.task_done()
            self.logger.info("Msg sent to TG, task_done marked.")
        except Exception as e:
            self.logger.error("Error occurred during message polling")
            self.logger.error(repr(e))
            # Restart the Telegram side and keep consuming the same queue.
            self.bot.stop()
            self.polling_from_tg()
    self.logger.debug("Gracefully stopping %s (%s).", self.channel_name, self.channel_id)
    self.bot.stop()
    self.logger.debug("%s (%s) gracefully stopped.", self.channel_name, self.channel_id)
def __get_dummy_update():
    """Build a dummy update whose message comes from user 1337 in chat 1."""
    sender = User(1337, "@foouser")
    dummy_chat = Chat(1, None)
    return Update(1, message=Message(1, sender, datetime.now(), dummy_chat))
def test_telegram_default_response(mocker):
    """
    Check that the last MessageHandler callback registered by the endpoint
    replies with the bot's default response.
    """
    mocker.patch('eddie.endpoints.telegram.Updater')
    handler_cls = mocker.patch('eddie.endpoints.telegram.MessageHandler')
    reply_mock = mocker.patch('telegram.Message.reply_text')

    class MyBot(Bot):
        "Uppering-reversing bot"

        def default_response(self, in_message):
            return in_message[::-1].upper()

    bot = MyBot()
    bot.add_endpoint(TelegramEndpoint(token='123:ABC'))
    bot.run()

    # Positional arguments of every MessageHandler(...) construction.
    registered = [positional for positional, _ in handler_cls.call_args_list]
    assert len(registered) > 0
    generic_handler = [callback for _, callback in registered][-1]

    message = 'this is the message'
    generic_handler(bot, create_telegram_update(message))
    reply_mock.assert_called_with(bot.default_response(message))

    bot.stop()
def filter(self, message: Message):
    """Return True when the id of the message's sender is in `self.active_users`."""
    sender_id = message.from_user.id
    return sender_id in self.active_users
def post_title(self, post: Post) -> Message:
    """
    Send the formatted header of `post` to the channel, with web page preview
    disabled, and return the result of `send_message`.
    """
    options = {
        'chat_id': self.channel_id,
        'text': self.format_header(post),
        'disable_web_page_preview': True,
        'timeout': self.timeout,
    }
    return self.bot.send_message(**options)
def post_image(self, image: Image, is_album) -> Message:
    """
    Send `image` to the channel without notification.

    Animated images go out through `send_video`, all others through
    `send_photo`. The caption is only filled in when `is_album` is truthy;
    otherwise it is the empty string.
    """
    caption = self.format_caption(image) if is_album else ''
    common = dict(
        caption=caption,
        chat_id=self.channel_id,
        disable_notification=True,
        timeout=self.timeout,
    )
    if not image.animated:
        return self.bot.send_photo(photo=image.url, **common)
    return self.bot.send_video(video=image.url, **common)
def download(self):
    """
    Fetch `self.url` into `self.path` unless the file already exists.

    An existing path is reported on stdout and left untouched.
    """
    already_there = os.path.exists(self.path)
    if already_there:
        print("[FileDownloader]", "Exists", self.path)
        return

    response = requests.get(self.url, **self.requests_kwargs)
    with open(self.path, 'wb') as target:
        target.write(response.content)


# Message attributing functions
# Below this line is the user name processing function used by mybot.py
def get_forward_from(message: telegram.Message):
    """
    Build the "(forwarded from xxx)" attribution for a forwarded message.

    For messages forwarded from the bot itself, the name is the part of the
    caption/text before the first ':'; it is empty when there is no ':'.

    :param message: the forwarded message (may be None)
    :return: the formatted attribution, or '' when the message is missing or
             has no forward_from
    """
    if not message or not message.forward_from:
        return ''

    origin = message.forward_from
    if origin.id != global_vars.tg_bot_id:
        name = get_full_user_name(origin)
    else:
        body = message.caption or message.text or ''
        colon_at = body.find(':')
        # a ':' means the text came from qq; otherwise it is self generated
        # command text, etc.
        name = body[:colon_at] if colon_at != -1 else ''

    return '(?' + name + ')'
def video_from_telegram(bot: telegram.Bot, update: telegram.Update):
    """
    Relay a Telegram video message (new or edited) to QQ as a fixed text
    placeholder and store the QQ/Telegram message id pair.
    """
    edited = not update.message
    tg_message: telegram.Message = update.edited_message if edited else update.message

    chat_id = tg_message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=chat_id)

    placeholder = [{
        'type': 'text',
        'data': {'text': '[ ?? ]'}
    }]
    qq_message_id = send_from_tg_to_qq(forward_index,
                                       message=placeholder,
                                       tg_group_id=chat_id,
                                       tg_user=tg_message.from_user,
                                       tg_forward_from=tg_message,
                                       tg_reply_to=tg_message.reply_to_message,
                                       edited=edited)
    global_vars.mdb.append_message(qq_message_id, tg_message.message_id, forward_index, 0)
def document_from_telegram(bot: telegram.Bot, update: telegram.Update):
    """
    Relay a Telegram document message (new or edited) to QQ as a fixed text
    placeholder and store the QQ/Telegram message id pair.
    """
    edited = not update.message
    tg_message: telegram.Message = update.edited_message if edited else update.message

    chat_id = tg_message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=chat_id)

    placeholder = [{
        'type': 'text',
        'data': {'text': '[ ?? ]'}
    }]
    qq_message_id = send_from_tg_to_qq(forward_index,
                                       message=placeholder,
                                       tg_group_id=chat_id,
                                       tg_user=tg_message.from_user,
                                       tg_forward_from=tg_message,
                                       tg_reply_to=tg_message.reply_to_message,
                                       edited=edited)
    global_vars.mdb.append_message(qq_message_id, tg_message.message_id, forward_index, 0)
def sticker_from_telegram(bot: telegram.Bot, update: telegram.Update):
    """
    Relay a Telegram sticker to QQ and store the QQ/Telegram message id pair.

    Three renderings, chosen in this order:
      * JQ_MODE: an image segment referring to '<file_id>.png'
      * IMAGE_LINK_MODE for this forward: text with the emoji and picture url
      * otherwise: text with the emoji only

    A sticker without an associated emoji is rendered with an empty emoji
    instead of failing on the string concatenation.
    """
    message: telegram.Message = update.message
    tg_group_id = message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)

    reply_entity = list()
    file_id = message.sticker.file_id
    # `emoji` is optional on a sticker; fall back to '' so `+` keeps working.
    emoji = message.sticker.emoji or ''

    if JQ_MODE:
        # return value unused here; called before referencing '<file_id>.png'
        tg_get_pic_url(file_id, 'png')
        reply_entity.append({
            'type': 'image',
            'data': {'file': file_id + '.png'}
        })
    elif IMAGE_LINK_MODE[forward_index]:
        pic_url = tg_get_pic_url(file_id, 'png')
        reply_entity.append({
            'type': 'text',
            'data': {'text': '[ ' + emoji + ' sticker, ?????' + pic_url + ' ]'}
        })
    else:
        reply_entity.append({
            'type': 'text',
            'data': {'text': '[ ' + emoji + ' sticker ]'}
        })

    qq_message_id = send_from_tg_to_qq(forward_index,
                                       message=reply_entity,
                                       tg_group_id=tg_group_id,
                                       tg_user=message.from_user,
                                       tg_forward_from=message,
                                       tg_reply_to=message.reply_to_message)
    global_vars.mdb.append_message(qq_message_id, message.message_id, forward_index, 0)
def text_from_telegram(bot: telegram.Bot, update: telegram.Update):
    """
    Relay a Telegram text message (new or edited) to QQ and store the
    QQ/Telegram message id pair.

    For an edit, recall_message is called first. Text starting with '//' is
    not forwarded.
    """
    edited = not update.message
    tg_message: telegram.Message = update.edited_message if edited else update.message

    chat_id = tg_message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=chat_id)

    if edited:
        recall_message(forward_index, tg_message)

    if tg_message.text.startswith('//'):
        return

    text_segment = [{
        'type': 'text',
        'data': {'text': tg_message.text}
    }]
    qq_message_id = send_from_tg_to_qq(forward_index,
                                       message=text_segment,
                                       tg_group_id=chat_id,
                                       tg_user=tg_message.from_user,
                                       tg_forward_from=tg_message,
                                       tg_reply_to=tg_message.reply_to_message,
                                       edited=edited)
    global_vars.mdb.append_message(qq_message_id, tg_message.message_id, forward_index, 0)
def show_tg_group_id(tg_group_id: int, tg_user: telegram.User, tg_message_id: int, tg_reply_to: telegram.Message):
    """Send the numeric id of the Telegram group back into that group."""
    global_vars.tg_bot.sendMessage(
        chat_id=tg_group_id,
        text='Telegram group id is: ' + str(tg_group_id),
    )
def tg_command(bot: telegram.Bot, update: telegram.Update):
    """
    Dispatch a '!!'-prefixed Telegram message to the matching command handler.

    Telegram-only commands are tried first and need no forward index. The
    remaining commands (neither tg_only nor qq_only) need one; when
    get_forward_index returns -1 the dispatch is stopped. DispatcherHandlerStop
    is raised after a command is handled. Messages without the '!!' prefix
    are ignored.
    """
    # an edited message takes precedence over a new one
    message: telegram.Message = update.edited_message or update.message

    if not message.text.startswith('!!'):  # no command indicator
        return

    tg_group_id = message.chat_id  # telegram group id
    tg_reply_to = message.reply_to_message
    text = message.text[2:]

    def invoked(candidate):
        return text == candidate.command or text == candidate.short_command

    # process all non-forward commands
    for command in global_vars.command_list:
        if not command.tg_only:
            continue
        if invoked(command):
            command.handler(tg_group_id=tg_group_id,
                            tg_user=message.from_user,
                            tg_message_id=message.message_id,
                            tg_reply_to=tg_reply_to)
            raise DispatcherHandlerStop()

    forward_index = get_forward_index(tg_group_id=tg_group_id)
    if forward_index == -1:
        raise DispatcherHandlerStop()

    # process all forward commands
    for command in global_vars.command_list:
        if command.tg_only or command.qq_only:
            continue
        if invoked(command):
            command.handler(forward_index,
                            tg_user=message.from_user,
                            tg_group_id=tg_group_id,
                            tg_message_id=message.message_id,
                            tg_reply_to=tg_reply_to)
            raise DispatcherHandlerStop()
def command_tg(tg_group_id: int, tg_user: telegram.User, tg_message_id: int, tg_reply_to: telegram.Message = None):
    """Reply to the command message with a short description of the bot."""
    about = '''I'm a relay bot between qq and tg. Please use "!!show commands" or "!!cmd" to show all commands. '''
    global_vars.tg_bot.sendMessage(
        chat_id=tg_group_id,
        text=about,
        reply_to_message_id=tg_message_id,
        parse_mode='HTML',
    )
def tg_water_meter(bot: telegram.Bot, update: telegram.Update):
    """
    Block Telegram messages that match the configured filters.

    A message is blocked when it is forwarded from a channel listed in
    global_vars.filter_list['channels'], or when its caption/text contains any
    of global_vars.filter_list['keywords']. Blocking calls
    global_vars.drive_mode_on for the forward and raises DispatcherHandlerStop.

    Works for both new and edited messages: all reads go through the resolved
    `message`, never `update.message`, which is None for an edit.
    """
    if update.message:
        message: telegram.Message = update.message
    else:
        message: telegram.Message = update.edited_message

    tg_group_id = message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=int(tg_group_id))

    logger.debug("Water meter processing")
    if message.forward_from_chat and message.forward_from_chat.type == 'channel':
        logger.debug("message is forward from channel")
        if message.forward_from_chat.id in global_vars.filter_list['channels']:
            logger.debug("message is blocked")
            global_vars.drive_mode_on(forward_index,
                                      tg_user=message.from_user,
                                      tg_group_id=tg_group_id,
                                      tg_message_id=message.message_id)
            raise DispatcherHandlerStop()

    # the caption takes precedence over the text
    message_text = message.caption or message.text or ''
    if not message_text:
        return

    if any(keyword in message_text for keyword in global_vars.filter_list['keywords']):
        logger.debug("message is blocked")
        global_vars.drive_mode_on(forward_index,
                                  tg_user=message.from_user,
                                  tg_group_id=tg_group_id,
                                  tg_message_id=message.message_id)
        raise DispatcherHandlerStop()
def test_authorized_only(default_conf, mocker):
    """A handler wrapped by `authorized_only` runs for a message from chat id 0."""
    mocker.patch.dict('freqtrade.rpc.telegram._CONF', default_conf)

    update = Update(randint(1, 100))
    update.message = Message(randint(1, 100), 0, datetime.utcnow(), Chat(0, 0))
    state = {'called': False}

    @authorized_only
    def dummy_handler(*args, **kwargs) -> None:
        state['called'] = True

    dummy_handler(MagicMock(), update)
    assert state['called'] is True
def test_authorized_only_unauthorized(default_conf, mocker):
    """A handler wrapped by `authorized_only` is not run for chat id 0xdeadbeef."""
    mocker.patch.dict('freqtrade.rpc.telegram._CONF', default_conf)

    update = Update(randint(1, 100))
    update.message = Message(randint(1, 100), 0, datetime.utcnow(), Chat(0xdeadbeef, 0))
    state = {'called': False}

    @authorized_only
    def dummy_handler(*args, **kwargs) -> None:
        state['called'] = True

    dummy_handler(MagicMock(), update)
    assert state['called'] is False
def test_authorized_only_exception(default_conf, mocker):
    """An exception raised inside the wrapped handler does not escape the call."""
    mocker.patch.dict('freqtrade.rpc.telegram._CONF', default_conf)

    update = Update(randint(1, 100))
    allowed_chat = Chat(0, 0)
    update.message = Message(randint(1, 100), 0, datetime.utcnow(), allowed_chat)

    @authorized_only
    def dummy_handler(*args, **kwargs) -> None:
        raise Exception('test')

    dummy_handler(MagicMock(), update)
def update():
    """Build an Update with id 0 carrying a message with id 0 in Chat(0, 0)."""
    fresh = Update(0)
    fresh.message = Message(0, 0, datetime.utcnow(), Chat(0, 0))
    return fresh
def handler_wrapper(func):
    """
    Decorate a handler method so it receives the database `User` of the
    message sender instead of the raw (bot, update) pair.

    The wrapper creates the user row on first contact or refreshes the
    name/username/activity fields of an existing one, attaches the update and
    message to the user object, calls `func(self, user, ...)` and commits the
    session afterwards. A `Flow` exception raised by `func` is swallowed.
    """
    def wrap(self, _, update, *args, **kwargs):
        assert isinstance(User.query, Query)
        assert isinstance(update.message, Message)
        sender = update.message.from_user
        assert isinstance(sender, TgUser)

        user = User.query.filter(User.tg_id == sender.id).one_or_none()
        timestamp = datetime.now()

        if user:
            # known sender: refresh the profile fields and mark as active
            user.first_name = sender.first_name
            user.last_name = sender.last_name or ''
            user.username = sender.username
            user.last_active_at = timestamp
            user.is_active = True
        else:
            # first contact: create and persist the row right away
            user = User(
                tg_id=sender.id,
                first_name=sender.first_name,
                last_name=sender.last_name or '',
                username=sender.username,
                created_at=timestamp,
                last_active_at=timestamp,
            )
            db.session.add(user)
            db.session.commit()

        user.update = update
        user.message = update.message

        try:
            func(self, user, *args, **kwargs)
        except Flow:
            pass

        db.session.commit()

    return wrap
def __init__(self, id, text, chat_id, reply_id=None):
    """
    Build a group-chat message from the fixed user "He Man" (id 1).

    When `reply_id` is truthy the message replies to a message with that id
    from the fixed user "She Ra" (id 2); otherwise `reply_id` itself is passed
    through as `reply_to_message`.
    """
    sender = telegram.User(
        id=1, first_name="He", last_name="Man", username="heman")
    replied_user = telegram.User(
        id=2, first_name="She", last_name="Ra", username="shera")
    group = telegram.Chat(id=chat_id, type="group")

    if reply_id:
        replied = telegram.Message(reply_id, replied_user, None, group)
    else:
        replied = reply_id

    super().__init__(
        message_id=id,
        date=datetime.now(),
        text=text,
        chat=group,
        from_user=sender,
        reply_to_message=replied,
    )
def _reply_error(bot, update, errmsg):
    """
    Send `errmsg` as a reply to the message that triggered the update.

    Returns:
        telegram.Message: Message sent
    """
    source = update.message
    return bot.send_message(source.chat.id, errmsg, reply_to_message_id=source.message_id)
def link_chat_exec(self, bot, tg_chat_id, tg_msg_id, callback_uid):
    """
    Carry out the action chosen for a linked chat.
    Triggered by callback message with status `Flags.EXEC_LINK`.

    `callback_uid` is either `Flags.CANCEL_PROCESS` or "<command> <index>",
    where the index selects a chat from the stored list and the command is
    "unlink" or "mute". Stored state for the message is dropped in every case
    and the callback message is edited to show the outcome.

    Args:
        bot: Telegram Bot instance
        tg_chat_id: Chat ID
        tg_msg_id: Message ID triggered the callback
        callback_uid: Callback message
    """
    storage_key = "%s.%s" % (tg_chat_id, tg_msg_id)

    if callback_uid == Flags.CANCEL_PROCESS:
        self.msg_status.pop(storage_key, None)
        self.msg_storage.pop(storage_key, None)
        return bot.editMessageText(text="Cancelled.", chat_id=tg_chat_id, message_id=tg_msg_id)

    cmd, chat_lid = callback_uid.split()
    chat = self.msg_storage[storage_key]['chats'][int(chat_lid)]
    chat_uid = "%s.%s" % (chat['channel_id'], chat['chat_uid'])

    # "alias (name)" unless the alias adds nothing to the name
    if chat['chat_name'] == chat['chat_alias']:
        label = chat['chat_name']
    else:
        label = "%s (%s)" % (chat['chat_alias'], chat['chat_name'])

    if chat['channel_name']:
        label = "'%s' from '%s %s'" % (label, chat['channel_emoji'], chat['channel_name'])
    else:
        label = "'%s'" % label

    self.msg_status.pop(storage_key, None)
    self.msg_storage.pop(storage_key, None)

    if cmd == "unlink":
        db.remove_chat_assoc(slave_uid=chat_uid)
        return bot.editMessageText(text="Chat %s is restored." % label,
                                   chat_id=tg_chat_id, message_id=tg_msg_id)
    elif cmd == "mute":
        db.remove_chat_assoc(slave_uid=chat_uid)
        db.add_chat_assoc(slave_uid=chat_uid, master_uid=self.MUTE_CHAT_ID, multiple_slave=True)
        return bot.editMessageText(text="Chat %s is now muted." % label,
                                   chat_id=tg_chat_id, message_id=tg_msg_id)

    unknown = "Command '%s' (%s) is not recognised, please try again" % (cmd, callback_uid)
    bot.editMessageText(text=unknown, chat_id=tg_chat_id, message_id=tg_msg_id)
def command_exec(self, bot, chat_id, message_id, callback):
    """
    Run a command from a command message.
    Triggered by callback message with status `Flags.COMMAND_PENDING`.

    `callback` must be a decimal index into the stored command list. The
    selected command is invoked on the slave channel recorded with the
    message, and the message is edited to show the stored text followed by the
    command's output. Stored state for the message is dropped in every case.

    Args:
        bot: Telegram Bot instance
        chat_id: Chat ID
        message_id: Message ID triggered the callback
        callback: Callback message

    Returns:
        Whatever `bot.editMessageText` returns.
    """
    storage_key = "%s.%s" % (chat_id, message_id)

    def finish(text):
        # Drop the pending state, then show the final text.
        self.msg_status.pop(storage_key, None)
        self.msg_storage.pop(storage_key, None)
        return bot.editMessageText(text=text, chat_id=chat_id, message_id=message_id)

    if not callback.isdecimal():
        return finish("Invalid parameter: %s. (CE01)" % callback)

    stored = self.msg_storage[storage_key]
    commands = stored['commands']
    # Bound the index by the command list itself; the length of the storage
    # dict (its number of keys) says nothing about how many commands exist.
    if not (0 <= int(callback) < len(commands)):
        return finish("Index out of bound: %s. (CE02)" % callback)

    command = commands[int(callback)]
    handler = getattr(self.slaves[stored['channel']], command['callable'])
    output = handler(*command['args'], **command['kwargs'])
    return finish(stored['text'] + "\n------\n" + output)
def msg(self, bot, update):
    """
    Process, wrap and deliver messages from user.

    A message is passed on to `process_telegram_message` when it is a reply,
    or when it comes from a group linked to at most one chat. Otherwise (a
    private chat, or a group with several linked chats, without a reply) the
    user is told that no recipient was specified and, if candidates exist, is
    offered a list of chats to choose from.

    Args:
        bot: Telegram Bot instance
        update: Message update
    """
    incoming = update.message
    self.logger.debug("----\nMsg from tg user:\n%s", incoming.to_dict())

    chat_id = incoming.chat.id

    def linked_chats():
        return db.get_chat_assoc(master_uid="%s.%s" % (self.channel_id, chat_id))

    private_chat = chat_id == incoming.from_user.id
    multi_slaves = False
    if not private_chat:  # from group
        multi_slaves = len(linked_chats()) > 1

    has_reply = bool(getattr(incoming, "reply_to_message", None))
    if has_reply or not (private_chat or multi_slaves):
        return self.process_telegram_message(bot, update)

    candidates = db.get_recent_slave_chats(chat_id) or linked_chats()[:5]
    if not candidates:
        incoming.reply_text("Error: No recipient specified.\n"
                            "Please reply to a previous message.", quote=True)
        return

    prompt = incoming.reply_text("Error: No recipient specified.\n"
                                 "Please reply to a previous message.", quote=True)
    storage_id = "%s.%s" % (chat_id, prompt.message_id)
    legends, buttons = self.slave_chats_pagination(storage_id, 0, fchats=candidates)
    prompt.edit_text("Error: No recipient specified.\n"
                     "Please reply to a previous message, "
                     "or choose a recipient:\n\nLegend:\n" + "\n".join(legends),
                     reply_markup=telegram.InlineKeyboardMarkup(buttons))
    self.msg_status[storage_id] = Flags.SUGGEST_RECIPIENT
    self.msg_storage[storage_id]["update"] = update.to_dict()
def auto_spin(bot: Bot, job: Job):
    """
    Job callback: run `do_the_spin` for the chat in `job.context` unless
    `core.results_today` already has an entry for it.
    """
    from telegram import Message, Chat

    chat_id = job.context
    # minimal stand-in update that only carries the target chat
    synthetic = Update(0, message=Message(0, None, 0, Chat(chat_id, '')))
    if core.results_today.get(chat_id) is not None:
        return
    do_the_spin(bot, synthetic)
def test_telegram_command(mocker):
    """
    Check that the endpoint registers one CommandHandler per bot command and
    that each registered callback replies with the command's return value.
    """
    mocker.patch('eddie.endpoints.telegram.Updater')
    handler_cls = mocker.patch('eddie.endpoints.telegram.CommandHandler')
    reply_mock = mocker.patch('telegram.Message.reply_text')

    class MyBot(Bot):
        "Reversing bot"

        def default_response(self, in_message):
            return in_message[::-1]

        @command
        def start(self):
            "start command"
            return 'start command has been called'

        @command
        def other(self):
            "other command"
            return 'other command has been called'

    bot = MyBot()
    bot.add_endpoint(TelegramEndpoint(token='123:ABC'))
    bot.run()

    # Map each registered command name to its callback.
    registered = [positional for positional, _ in handler_cls.call_args_list]
    callbacks = {name: callback for name, callback in registered}

    assert 'start' in callbacks
    assert 'other' in callbacks

    callbacks['start'](bot, create_telegram_update('/start'))
    reply_mock.assert_called_with(bot.start())

    callbacks['other'](bot, create_telegram_update('/other'))
    reply_mock.assert_called_with(bot.other())

    bot.stop()
def send_from_tg_to_qq(forward_index: int,
                       message: list,
                       tg_group_id: int,
                       tg_user: telegram.User=None,
                       tg_forward_from: telegram.Message=None,
                       tg_reply_to: telegram.Message=None,
                       edited: bool=False,
                       auto_escape: bool=True) -> int:
    """
    send message from telegram to qq

    When a sender name is available, a text segment with the attribution
    (sender, reply-to, forwarded-from, edit mark) is inserted at the front of
    `message` in place.

    :param forward_index: forward group index
    :param message: message in cq-http-api like format
    :param tg_group_id: telegram group id
    :param tg_user: telegram user who send this message
    :param tg_forward_from: who the message is forwarded from
    :param tg_reply_to: who the message is replied to
    :param edited: the status of edition
    :param auto_escape: if contain coolq code, pass False
    :return: qq message id; None when the forward has neither a 'QQ' nor a
             'DISCUSS' target
    """
    logger.debug("tg -> qq: " + str(message))

    sender_name = get_full_user_name(tg_user)
    forward_from = get_forward_from(tg_forward_from)
    reply_to = get_reply_to(tg_reply_to, forward_index)
    edit_mark = ' E ' if edited else ''  # ' ? '

    message_attribute = ''.join((sender_name, reply_to, forward_from, edit_mark, ': '))
    if sender_name:
        # insert extra info at beginning
        message.insert(0, {
            'type': 'text',
            'data': {'text': message_attribute}
        })

    target = FORWARD_LIST[forward_index]
    if target.get('QQ'):
        sent = global_vars.qq_bot.send_group_msg(group_id=target['QQ'],
                                                 message=message,
                                                 auto_escape=auto_escape)
        return sent['message_id']

    if target.get('DISCUSS'):
        sent = global_vars.qq_bot.send_discuss_msg(discuss_id=target['DISCUSS'],
                                                   message=message,
                                                   auto_escape=auto_escape)
        return sent['message_id']
def send_from_qq_to_tg(forward_index: int,
                       message: list,
                       qq_group_id: int = 0,
                       qq_discuss_id: int = 0,
                       qq_user: int=None) -> list:
    """
    send message from qq to telegram

    The message is split by divide_qq_message; every part is sent as its own
    Telegram message. Parts with an image are sent as a document (gif) or a
    photo (anything else) with the sender name as caption; a part without an
    image is sent as HTML text with the sender name in bold.

    :param forward_index: forward group index
    :param message: message in cq-http-api like format
    :param qq_group_id: which group this message came from, can be None if qq_discuss_id is not None
    :param qq_discuss_id: which discuss this message came from, can be None if qq_group_id is not None
    :param qq_user: which user sent this message
    :return: list of the message ids of the sent telegram messages
    """
    logger.debug('qq -> tg: ' + str(message))

    message_list = divide_qq_message(forward_index, message)
    message_count = len(message_list)
    tg_chat = FORWARD_LIST[forward_index]['TG']

    telegram_message_id_list = list()
    for idx, message_part in enumerate(message_list):
        # "(i/n)" marker, only when the message was split into several parts
        if message_count == 1:
            message_index_attribute = ''
        else:
            message_index_attribute = '(' + str(idx + 1) + '/' + str(message_count) + ')'

        if message_part.get('image'):
            filename = message_part['image']

            cq_get_pic_url(filename)
            cq_download_pic(filename)

            full_msg = get_qq_name(qq_user, forward_index) + ': ' + message_index_attribute
            if message_part.get('text'):
                full_msg += message_part['text']

            # Close the picture once it is sent; it used to stay open, leaking
            # one file handle per forwarded image.
            with open(os.path.join(CQ_IMAGE_ROOT, filename), 'rb') as pic:
                if filename.lower().endswith('gif'):
                    # gif pictures send as document
                    _msg: telegram.Message = global_vars.tg_bot.sendDocument(tg_chat, pic, caption=full_msg)
                else:
                    # jpg/png pictures send as photo
                    _msg: telegram.Message = global_vars.tg_bot.sendPhoto(tg_chat, pic, caption=full_msg)
        else:
            # only first message could be pure text
            if qq_user:
                full_msg_bold = '<b>' + get_qq_name(qq_user, forward_index) + '</b>: ' + \
                                message_index_attribute + \
                                message_list[0]['text']
            else:
                full_msg_bold = message_index_attribute + message_list[0]['text']
            _msg: telegram.Message = global_vars.tg_bot.sendMessage(tg_chat,
                                                                    full_msg_bold,
                                                                    parse_mode='HTML')
        telegram_message_id_list.append(_msg.message_id)

    return telegram_message_id_list
def photo_from_telegram(bot: telegram.Bot, update: telegram.Update):
    """
    Relay a Telegram photo (new or edited) to QQ and store the QQ/Telegram
    message id pair.

    The last entry of `message.photo` is used. In JQ_MODE it is sent as an
    image segment (plus a text segment for the caption, if any); otherwise as
    a single text segment containing the picture url followed by the caption.
    For an edit, recall_message is called first.
    """
    edited = not update.message
    tg_message: telegram.Message = update.edited_message if edited else update.message

    chat_id = tg_message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=chat_id)

    if edited:
        recall_message(forward_index, tg_message)

    file_id = tg_message.photo[-1].file_id
    pic_url = tg_get_pic_url(file_id, 'jpg')

    if JQ_MODE:
        segments = [{
            'type': 'image',
            'data': {'file': file_id + '.jpg'}
        }]
        if tg_message.caption:
            segments.append({
                'type': 'text',
                'data': {'text': tg_message.caption}
            })
    else:
        link_text = '[ ??, ?????' + pic_url + ' ]'
        if tg_message.caption:
            link_text = link_text + tg_message.caption
        segments = [{
            'type': 'text',
            'data': {'text': link_text}
        }]

    qq_message_id = send_from_tg_to_qq(forward_index,
                                       message=segments,
                                       tg_group_id=chat_id,
                                       tg_user=tg_message.from_user,
                                       tg_forward_from=tg_message,
                                       tg_reply_to=tg_message.reply_to_message,
                                       edited=edited)
    global_vars.mdb.append_message(qq_message_id, tg_message.message_id, forward_index, 0)
def feed(self, message: telegram.Message) -> bool:
    """Offer a telegram message to the digest.

    The message is stored when its chat is configured for digesting and it
    either carries a hashtag itself or replies to a message that already has
    a tag.

    Returns:
        bool: Indicate if the message was added to the digest
    """
    # Only chats with a ConfigChat entry are digested
    if not self.db.exists(ConfigChat, chat_id=message.chat_id):
        return False

    text_tag = extract_hashtag(message.text)

    # Neither tagged nor a reply: nothing to digest
    if not text_tag and not message.reply_to_message:
        return False

    # Look up the author, building a new record for unknown users
    author = self.db.get(HashUser, id=message.from_user.id)
    if not author:
        author = HashUser(
            id=message.from_user.id,
            friendly_name=self.make_friendly_name(message.from_user),
            username=message.from_user.username,
        )

    reply_id = None
    if not text_tag:
        # No tag of its own: inherit the tag of the replied-to message
        reply_id = message.reply_to_message.message_id
        tag = self.db.get_message_tag(reply_id)
        if not tag:
            return False
    else:
        tag_id = self.db.generate_tag_id(text_tag)
        if self.db.exists(HashTag, id=tag_id):
            # Known tag: remember this spelling as a possible new shape
            tag = self.db.get(HashTag, id=tag_id)
            tag.shapes.add(text_tag)
        else:
            tag = HashTag(id=tag_id, shapes={text_tag})

    # Wrap the telegram message and attach its tag and author
    entry = HashMessage(
        id=message.message_id,
        date=message.date,
        text=message.text,
        chat_id=message.chat_id,
        reply_to=reply_id,
    )
    entry.tag = tag
    entry.user = author

    self.db.insert(entry)
    return True