| """ |
| Telegram客户端适配器 |
| 适配python-telegram-bot库到我们的接口 |
| """ |
| |
| import logging |
| from typing import Union, Optional |
| from telegram import Bot, Message |
| from telegram.constants import ChatAction |
| from telegram.ext import Application |
| |
| from .interfaces import ITelegramClient |
| |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| class TelegramClientAdapter(ITelegramClient): |
| """Telegram客户端适配器""" |
| |
| def __init__(self, bot: Bot): |
| """ |
| 初始化Telegram客户端适配器 |
| |
| Args: |
| bot: Telegram Bot实例 |
| """ |
| self.bot = bot |
| |
| async def send_message( |
| self, |
| chat_id: Union[int, str], |
| text: str, |
| parse_mode: Optional[str] = None, |
| reply_to_message_id: Optional[int] = None |
| ) -> Message: |
| """发送消息""" |
| return await self.bot.send_message( |
| chat_id=chat_id, |
| text=text, |
| parse_mode=parse_mode, |
| reply_to_message_id=reply_to_message_id |
| ) |
| |
| async def edit_message_text( |
| self, |
| text: str, |
| chat_id: Union[int, str], |
| message_id: int, |
| parse_mode: Optional[str] = None |
| ) -> Union[Message, bool]: |
| """编辑消息文本""" |
| return await self.bot.edit_message_text( |
| text=text, |
| chat_id=chat_id, |
| message_id=message_id, |
| parse_mode=parse_mode |
| ) |
| |
| async def get_file(self, file_id: str): |
| """获取文件对象""" |
| return await self.bot.get_file(file_id) |
| |
| async def send_photo( |
| self, |
| chat_id: Union[int, str], |
| photo: Union[str, bytes], |
| caption: Optional[str] = None |
| ) -> Message: |
| """发送图片""" |
| return await self.bot.send_photo( |
| chat_id=chat_id, |
| photo=photo, |
| caption=caption |
| ) |
| |
| async def send_document( |
| self, |
| chat_id: Union[int, str], |
| document: Union[str, bytes], |
| caption: Optional[str] = None |
| ) -> Message: |
| """发送文档""" |
| return await self.bot.send_document( |
| chat_id=chat_id, |
| document=document, |
| caption=caption |
| ) |