| """ |
| Bot Webhook集成模块 |
| 将Webhook功能集成到现有的Telegram Bot中 |
| """ |
| |
| import asyncio |
| import logging |
| from typing import Optional, Dict, Any |
| from datetime import datetime |
| |
| from .client import WebhookClient |
| from .models import BotMessage, UserInfo, ReplyInfo, WebhookConfig |
| from ..telegram.interfaces import ITelegramClient |
| |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| class BotWebhookIntegration: |
| """Bot Webhook集成类""" |
| |
| def __init__( |
| self, |
| telegram_client: ITelegramClient, |
| webhook_config: WebhookConfig, |
| bot_username: str, |
| subscribed_groups: list[int] |
| ): |
| """ |
| 初始化Webhook集成 |
| |
| Args: |
| telegram_client: Telegram客户端 |
| webhook_config: Webhook配置 |
| bot_username: Bot用户名 |
| subscribed_groups: 订阅的群组ID列表 |
| """ |
| self.telegram_client = telegram_client |
| self.webhook_config = webhook_config |
| self.bot_username = bot_username |
| self.subscribed_groups = subscribed_groups |
| |
| # Webhook客户端 |
| self.webhook_client: Optional[WebhookClient] = None |
| |
| # 消息处理回调 |
| self.message_callback = self._handle_webhook_message |
| |
| # 启动状态 |
| self.is_running = False |
| |
| async def start(self, callback_port: Optional[int] = None): |
| """启动Webhook集成""" |
| try: |
| # 创建Webhook客户端 |
| self.webhook_client = WebhookClient( |
| config=self.webhook_config, |
| bot_username=self.bot_username, |
| subscribed_groups=self.subscribed_groups, |
| message_handler=self.message_callback |
| ) |
| |
| # 启动客户端 |
| await self.webhook_client.start(callback_port) |
| self.is_running = True |
| |
| logger.info(f"Bot Webhook集成启动成功: {self.bot_username}") |
| |
| except Exception as e: |
| logger.error(f"启动Webhook集成失败: {e}") |
| raise |
| |
| async def stop(self): |
| """停止Webhook集成""" |
| try: |
| if self.webhook_client: |
| await self.webhook_client.stop() |
| self.webhook_client = None |
| |
| self.is_running = False |
| logger.info(f"Bot Webhook集成已停止: {self.bot_username}") |
| |
| except Exception as e: |
| logger.error(f"停止Webhook集成失败: {e}") |
| |
| async def broadcast_bot_message( |
| self, |
| group_id: int, |
| message_content: str, |
| sender_info: Dict[str, Any], |
| reply_info: Optional[Dict[str, Any]] = None, |
| telegram_message_id: Optional[int] = None, |
| message_type: str = "text" |
| ) -> bool: |
| """ |
| 广播Bot消息到Webhook服务器 |
| |
| Args: |
| group_id: 群组ID |
| message_content: 消息内容 |
| sender_info: 发送者信息 |
| reply_info: 回复信息(可选) |
| telegram_message_id: Telegram消息ID(可选) |
| message_type: 消息类型 |
| |
| Returns: |
| 是否广播成功 |
| """ |
| if not self.webhook_client or not self.is_running: |
| logger.warning("Webhook集成未启动,无法广播消息") |
| return False |
| |
| try: |
| # 构建用户信息 |
| user_info = UserInfo( |
| user_id=sender_info.get('user_id', 0), |
| username=sender_info.get('username'), |
| first_name=sender_info.get('first_name'), |
| last_name=sender_info.get('last_name'), |
| is_bot=sender_info.get('is_bot', True) |
| ) |
| |
| # 构建回复信息 |
| webhook_reply_info = None |
| if reply_info: |
| reply_user_info = reply_info.get('user_info', {}) |
| webhook_reply_info = ReplyInfo( |
| user_info=UserInfo( |
| user_id=reply_user_info.get('user_id', 0), |
| username=reply_user_info.get('username'), |
| first_name=reply_user_info.get('first_name'), |
| last_name=reply_user_info.get('last_name'), |
| is_bot=reply_user_info.get('is_bot', False) |
| ), |
| timestamp=reply_info.get('timestamp'), |
| content=reply_info.get('content', ''), |
| message_id=reply_info.get('message_id') |
| ) |
| |
| # 构建消息对象 |
| bot_message = BotMessage( |
| bot_username=self.bot_username, |
| group_id=group_id, |
| message_content=message_content, |
| message_type=message_type, |
| sender_info=user_info, |
| reply_info=webhook_reply_info, |
| telegram_message_id=telegram_message_id, |
| timestamp=datetime.now() |
| ) |
| |
| # 广播消息 |
| success = await self.webhook_client.broadcast_message(bot_message) |
| |
| if success: |
| logger.debug(f"Bot消息广播成功: {bot_message.message_id}") |
| else: |
| logger.warning(f"Bot消息广播失败: {bot_message.message_id}") |
| |
| return success |
| |
| except Exception as e: |
| logger.error(f"广播Bot消息异常: {e}") |
| return False |
| |
| async def _handle_webhook_message(self, message: BotMessage): |
| """ |
| 处理来自Webhook的消息 |
| |
| 这个方法会被调用当其他Bot发送消息时, |
| 将消息转换为适合当前Bot处理的格式 |
| """ |
| try: |
| logger.debug(f"收到Webhook消息: {message.bot_username} -> 群组 {message.group_id}") |
| |
| # 检查是否是我们订阅的群组 |
| if message.group_id not in self.subscribed_groups: |
| logger.debug(f"忽略非订阅群组的消息: {message.group_id}") |
| return |
| |
| # 检查是否是我们自己发送的消息(双重保险) |
| if message.bot_username == self.bot_username: |
| logger.debug("忽略自己发送的消息") |
| return |
| |
| # 转换消息格式并保存到上下文 |
| await self._process_bot_message_to_context(message) |
| |
| logger.info(f"成功处理来自 {message.bot_username} 的Webhook消息") |
| |
| except Exception as e: |
| logger.error(f"处理Webhook消息失败: {e}") |
| |
| async def _process_bot_message_to_context(self, message: BotMessage): |
| """ |
| 将Webhook消息处理为上下文消息 |
| |
| 这个方法需要与现有的上下文管理系统集成, |
| 将其他Bot的消息作为"虚拟"消息添加到对话历史中 |
| """ |
| try: |
| # 这里需要根据实际的上下文管理器接口来实现 |
| # 由于我们有多种实现(ContextManager和Claude Agent), |
| # 这里提供一个通用的处理框架 |
| |
| chat_id = str(message.group_id) |
| |
| # 构建用户显示名 |
| sender_info = message.sender_info |
| if sender_info.is_bot: |
| user_display = f"🤖 {message.bot_username}" |
| elif sender_info.username: |
| user_display = f"@{sender_info.username}" |
| elif sender_info.first_name: |
| display_name = sender_info.first_name |
| if sender_info.last_name: |
| display_name += f" {sender_info.last_name}" |
| user_display = display_name |
| else: |
| user_display = f"用户{sender_info.user_id}" |
| |
| # 构建时间戳 |
| time_str = message.timestamp.strftime('%H:%M') |
| |
| # 构建回复上下文 |
| reply_context = "" |
| if message.reply_info: |
| reply_info = message.reply_info |
| reply_user_info = reply_info.user_info |
| |
| if reply_user_info.is_bot: |
| reply_author = "🤖 Bot" |
| elif reply_user_info.username: |
| reply_author = f"@{reply_user_info.username}" |
| elif reply_user_info.first_name: |
| reply_author = reply_user_info.first_name |
| if reply_user_info.last_name: |
| reply_author += f" {reply_user_info.last_name}" |
| else: |
| reply_author = f"用户{reply_user_info.user_id}" |
| |
| reply_time = "" |
| if reply_info.timestamp: |
| reply_time = f" {reply_info.timestamp.strftime('%H:%M')}" |
| |
| reply_context = f"↳ 回复 [{reply_author}{reply_time}]: {reply_info.content}\n" |
| |
| # 构建完整消息内容 |
| full_message = f"{reply_context}[{user_display} {time_str}] {message.message_content}" |
| |
| # 构建用户信息字典 |
| user_info_dict = { |
| 'user_id': sender_info.user_id, |
| 'username': sender_info.username, |
| 'first_name': sender_info.first_name, |
| 'last_name': sender_info.last_name, |
| 'is_bot': sender_info.is_bot, |
| 'bot_username': message.bot_username, # 标识这是来自其他Bot的消息 |
| 'source': 'webhook', # 标识消息来源 |
| 'chat_id': message.group_id, |
| 'chat_type': 'group' |
| } |
| |
| # 将回复信息添加到user_info中 |
| if message.reply_info: |
| user_info_dict['reply_info'] = { |
| 'user_info': { |
| 'user_id': message.reply_info.user_info.user_id, |
| 'username': message.reply_info.user_info.username, |
| 'first_name': message.reply_info.user_info.first_name, |
| 'last_name': message.reply_info.user_info.last_name, |
| 'is_bot': message.reply_info.user_info.is_bot, |
| }, |
| 'timestamp': message.reply_info.timestamp, |
| 'content': message.reply_info.content, |
| 'message_id': message.reply_info.message_id |
| } |
| |
| # 这里需要调用实际的消息保存方法 |
| # 这个方法应该由使用此集成的组件来提供 |
| await self._save_webhook_message_to_context( |
| chat_id, |
| sender_info.user_id, |
| full_message, |
| user_info_dict |
| ) |
| |
| logger.debug(f"Webhook消息已保存到上下文: {chat_id}") |
| |
| except Exception as e: |
| logger.error(f"处理Webhook消息到上下文失败: {e}") |
| |
| async def _save_webhook_message_to_context( |
| self, |
| chat_id: str, |
| user_id: int, |
| message: str, |
| user_info: Dict[str, Any] |
| ): |
| """ |
| 保存Webhook消息到上下文 |
| |
| 这个方法需要由具体的集成实现来覆盖, |
| 以调用实际的上下文管理器 |
| """ |
| # 这是一个占位符方法,具体实现应该在子类或通过依赖注入提供 |
| logger.debug(f"保存Webhook消息到上下文的占位符实现: {chat_id}, {user_id}, {message[:50]}...") |
| |
| def get_status(self) -> Dict[str, Any]: |
| """获取Webhook集成状态""" |
| return { |
| "is_running": self.is_running, |
| "bot_username": self.bot_username, |
| "subscribed_groups": self.subscribed_groups, |
| "webhook_url": self.webhook_config.webhook_url, |
| "webhook_client_registered": self.webhook_client.is_registered if self.webhook_client else False |
| } |
| |
| async def check_webhook_server(self) -> bool: |
| """检查Webhook服务器状态""" |
| if not self.webhook_client: |
| return False |
| return await self.webhook_client.check_server_status() |
| |
| async def get_registered_bots(self) -> Optional[list[Dict[str, Any]]]: |
| """获取已注册的Bot列表""" |
| if not self.webhook_client: |
| return None |
| return await self.webhook_client.get_registered_bots() |