blob: 8ff073fc6ff0629a223e0993fa99b4073c74e981 [file] [log] [blame] [raw]
"""
Telegram Bot的Webhook集成扩展
专门为Telegram Bot提供的Webhook集成实现
"""
import asyncio
import logging
from typing import Optional, Dict, Any, Callable
from .integration import BotWebhookIntegration
from .models import BotMessage, WebhookConfig
from ..telegram.interfaces import ITelegramClient
logger = logging.getLogger(__name__)
class TelegramWebhookIntegration(BotWebhookIntegration):
"""
Telegram Bot专用的Webhook集成
扩展基础的BotWebhookIntegration,添加与Telegram Bot特定组件的集成
"""
def __init__(
self,
telegram_client: ITelegramClient,
webhook_config: WebhookConfig,
bot_username: str,
subscribed_groups: list[int],
message_saver: Optional[Callable] = None
):
"""
初始化Telegram Webhook集成
Args:
telegram_client: Telegram客户端
webhook_config: Webhook配置
bot_username: Bot用户名
subscribed_groups: 订阅的群组ID列表
message_saver: 消息保存回调函数
"""
super().__init__(telegram_client, webhook_config, bot_username, subscribed_groups)
# 消息保存回调
self.message_saver = message_saver
async def _save_webhook_message_to_context(
self,
chat_id: str,
user_id: int,
message: str,
user_info: Dict[str, Any]
):
"""
保存Webhook消息到上下文(重写基类方法)
通过message_saver回调函数保存消息到实际的上下文管理器
"""
try:
if self.message_saver:
if asyncio.iscoroutinefunction(self.message_saver):
await self.message_saver(chat_id, user_id, message, False, user_info)
else:
self.message_saver(chat_id, user_id, message, False, user_info)
logger.debug(f"Webhook消息已通过回调保存: {chat_id}")
else:
logger.warning("未设置消息保存回调,Webhook消息未保存")
except Exception as e:
logger.error(f"保存Webhook消息到上下文失败: {e}")
def set_message_saver(self, message_saver: Callable):
"""
设置消息保存回调函数
Args:
message_saver: 消息保存回调函数
签名: async def save_message(chat_id: str, user_id: int, message: str, is_bot: bool, user_info: dict)
"""
self.message_saver = message_saver
logger.info("消息保存回调函数已设置")
def create_webhook_integration_from_config(
telegram_client: ITelegramClient,
webhook_config_dict: Dict[str, Any],
bot_username: str,
message_saver: Optional[Callable] = None
) -> Optional[TelegramWebhookIntegration]:
"""
从配置字典创建Webhook集成实例
Args:
telegram_client: Telegram客户端
webhook_config_dict: Webhook配置字典
bot_username: Bot用户名
message_saver: 消息保存回调函数
Returns:
TelegramWebhookIntegration实例,如果未启用则返回None
"""
try:
# 检查是否启用Webhook
if not webhook_config_dict.get('enabled', False):
logger.info("Webhook功能未启用")
return None
# 验证必需的配置项
required_keys = ['server_url', 'auth_token']
for key in required_keys:
if not webhook_config_dict.get(key):
logger.warning(f"缺少必需的Webhook配置项: {key}")
return None
# 获取客户端配置
client_config = webhook_config_dict.get('client', {})
subscribed_groups = client_config.get('subscribed_groups', [])
if not subscribed_groups:
logger.info("未配置订阅群组,Webhook功能将被禁用")
return None
# 创建Webhook配置对象
webhook_config = WebhookConfig(
webhook_url=webhook_config_dict['server_url'],
auth_token=webhook_config_dict['auth_token'],
enabled_groups=subscribed_groups,
connection_timeout=client_config.get('connection_timeout', 10.0),
request_timeout=client_config.get('request_timeout', 5.0),
max_retries=client_config.get('max_retries', 3),
retry_delay=client_config.get('retry_delay', 1.0)
)
# 创建集成实例
integration = TelegramWebhookIntegration(
telegram_client=telegram_client,
webhook_config=webhook_config,
bot_username=bot_username,
subscribed_groups=subscribed_groups,
message_saver=message_saver
)
logger.info(f"Webhook集成创建成功: {bot_username}, 订阅群组: {subscribed_groups}")
return integration
except Exception as e:
logger.error(f"创建Webhook集成失败: {e}")
return None