# blob: d714d808113fb36bb34014f67c245cea3fc69e00 [file] [log] [blame] [raw]
"""
Telegram client adapter.

Adapts the python-telegram-bot library to our interface.
"""
import logging
from typing import Union, Optional
from telegram import Bot, Message
from telegram.constants import ChatAction
from telegram.ext import Application
from .interfaces import ITelegramClient
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class TelegramClientAdapter(ITelegramClient):
    """Adapter that exposes a Telegram ``Bot`` through ``ITelegramClient``.

    Every method forwards its arguments unchanged to the method of the same
    name on the wrapped bot and returns the awaited result.
    """

    def __init__(self, bot: Bot):
        """Store the bot that all calls are delegated to.

        Args:
            bot: Telegram Bot instance to wrap.
        """
        self.bot = bot

    async def send_message(
        self,
        chat_id: Union[int, str],
        text: str,
        parse_mode: Optional[str] = None,
        reply_to_message_id: Optional[int] = None,
    ) -> Message:
        """Send a text message through the wrapped bot.

        Args:
            chat_id: Target chat identifier, passed through as given.
            text: Message text.
            parse_mode: Optional parse mode forwarded to the bot.
            reply_to_message_id: Optional id of the message being replied to.

        Returns:
            The awaited result of ``bot.send_message``.
        """
        # Collect the keyword arguments first, then hand them over in one go.
        request = {
            "chat_id": chat_id,
            "text": text,
            "parse_mode": parse_mode,
            "reply_to_message_id": reply_to_message_id,
        }
        return await self.bot.send_message(**request)

    async def edit_message_text(
        self,
        text: str,
        chat_id: Union[int, str],
        message_id: int,
        parse_mode: Optional[str] = None,
    ) -> Union[Message, bool]:
        """Edit the text of an existing message through the wrapped bot.

        Args:
            text: Replacement text.
            chat_id: Chat identifier, passed through as given.
            message_id: Id of the message to edit.
            parse_mode: Optional parse mode forwarded to the bot.

        Returns:
            The awaited result of ``bot.edit_message_text``.
        """
        request = {
            "text": text,
            "chat_id": chat_id,
            "message_id": message_id,
            "parse_mode": parse_mode,
        }
        return await self.bot.edit_message_text(**request)

    async def get_file(self, file_id: str):
        """Fetch a file object from the wrapped bot.

        Args:
            file_id: Identifier passed positionally to ``bot.get_file``.

        Returns:
            The awaited result of ``bot.get_file``.
        """
        file_obj = await self.bot.get_file(file_id)
        return file_obj

    async def send_photo(
        self,
        chat_id: Union[int, str],
        photo: Union[str, bytes],
        caption: Optional[str] = None,
    ) -> Message:
        """Send a photo through the wrapped bot.

        Args:
            chat_id: Target chat identifier, passed through as given.
            photo: Photo payload, forwarded unchanged.
            caption: Optional caption.

        Returns:
            The awaited result of ``bot.send_photo``.
        """
        sent = await self.bot.send_photo(
            chat_id=chat_id, photo=photo, caption=caption
        )
        return sent

    async def send_document(
        self,
        chat_id: Union[int, str],
        document: Union[str, bytes],
        caption: Optional[str] = None,
    ) -> Message:
        """Send a document through the wrapped bot.

        Args:
            chat_id: Target chat identifier, passed through as given.
            document: Document payload, forwarded unchanged.
            caption: Optional caption.

        Returns:
            The awaited result of ``bot.send_document``.
        """
        sent = await self.bot.send_document(
            chat_id=chat_id, document=document, caption=caption
        )
        return sent