| """ |
| 消息处理器 |
| 处理Telegram消息的核心逻辑 |
| """ |
| |
| import logging |
| import random |
| from typing import Union, Optional, Dict, List, Any, Callable |
| from telegram import Update, User, Chat |
| from telegram.ext import ContextTypes |
| |
| from .interfaces import IMessageHandler, ITelegramClient, IContextManager, IFileHandler, IClaudeAgent, IStreamMessageSender |
| |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| class GroupParticipationManager: |
| """群聊参与管理器 - 智能决定何时参与群聊""" |
| |
| def __init__(self, bot_names: list[str] = None, participation_range: list[int] = None): |
| """ |
| 初始化群聊参与管理器 |
| |
| Args: |
| bot_names: Bot可能被提及的名字列表 |
| participation_range: 随机参与的消息数量范围 [min, max] |
| """ |
| # 每个群聊的消息计数器和触发阈值 |
| self._message_counters: Dict[str, int] = {} |
| self._trigger_thresholds: Dict[str, int] = {} |
| |
| # bot的可能名字(用于检测),支持配置覆盖 |
| self._bot_names = bot_names if bot_names is not None else [ |
| 'claude', 'Claude', 'CLAUDE', |
| '克劳德', 'AI', 'ai', 'bot', 'Bot', 'BOT', |
| '助手', '机器人' |
| ] |
| |
| # 随机参与的消息数量范围,支持配置覆盖 |
| self._participation_range = participation_range if participation_range is not None else [1, 10] |
| |
| def reset_counter(self, chat_id: str): |
| """重置计数器并设置新的随机阈值""" |
| self._message_counters[chat_id] = 0 |
| min_count, max_count = self._participation_range |
| self._trigger_thresholds[chat_id] = random.randint(min_count, max_count) |
| logger.info(f"群聊 {chat_id} 设置新的参与阈值: {self._trigger_thresholds[chat_id]} (范围: {min_count}-{max_count})") |
| |
| def should_participate_random(self, chat_id: str) -> bool: |
| """检查是否达到随机参与条件""" |
| if chat_id not in self._message_counters: |
| self.reset_counter(chat_id) |
| return False |
| |
| self._message_counters[chat_id] += 1 |
| current_count = self._message_counters[chat_id] |
| threshold = self._trigger_thresholds[chat_id] |
| |
| logger.debug(f"群聊 {chat_id} 消息计数: {current_count}/{threshold}") |
| |
| if current_count >= threshold: |
| logger.info(f"群聊 {chat_id} 达到随机参与阈值,准备参与对话") |
| self.reset_counter(chat_id) # 重置计数器 |
| return True |
| |
| return False |
| |
| def is_name_mentioned(self, text: str) -> bool: |
| """检查消息中是否提到了bot的名字""" |
| text_lower = text.lower() |
| for name in self._bot_names: |
| if name.lower() in text_lower: |
| logger.info(f"检测到bot名字被提及: '{name}' in '{text[:50]}...'") |
| return True |
| return False |
| |
| |
| class MessageHandler(IMessageHandler): |
| """消息处理器实现""" |
| |
| def __init__( |
| self, |
| telegram_client: ITelegramClient, |
| context_manager: IContextManager, |
| file_handler: IFileHandler, |
| claude_agent: IClaudeAgent, |
| stream_sender: IStreamMessageSender, |
| allowed_users: list[int], |
| allowed_groups: list[int], |
| telegram_config: dict = None, |
| webhook_broadcast_callback: Optional[Callable] = None |
| ): |
| """ |
| 初始化消息处理器 |
| |
| Args: |
| telegram_client: Telegram客户端 |
| context_manager: 上下文管理器 |
| file_handler: 文件处理器 |
| claude_agent: Claude Agent |
| stream_sender: 流式消息发送器 |
| allowed_users: 允许的用户ID列表 |
| allowed_groups: 允许的群组ID列表 |
| telegram_config: Telegram配置字典(可选) |
| webhook_broadcast_callback: Webhook广播回调函数(可选) |
| """ |
| self.telegram_client = telegram_client |
| self.context_manager = context_manager |
| self.file_handler = file_handler |
| self.claude_agent = claude_agent |
| self.stream_sender = stream_sender |
| self.allowed_users = set(allowed_users) |
| self.allowed_groups = set(allowed_groups) |
| self.webhook_broadcast_callback = webhook_broadcast_callback |
| |
| # 从配置中获取群聊参与设置 |
| if telegram_config: |
| group_config = telegram_config.get('group_participation', {}) |
| bot_names = group_config.get('bot_names') |
| participation_range = group_config.get('random_participation_range') |
| else: |
| bot_names = None |
| participation_range = None |
| |
| # 初始化群聊参与管理器(使用配置或默认值) |
| self.participation_manager = GroupParticipationManager( |
| bot_names=bot_names, |
| participation_range=participation_range |
| ) |
| |
| logger.info(f"消息处理器初始化完成:") |
| logger.info(f" Bot名字列表: {self.participation_manager._bot_names}") |
| logger.info(f" 随机参与范围: {self.participation_manager._participation_range}") |
| |
| async def handle_text_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
| """处理文本消息""" |
| try: |
| logger.debug("handle_text_message 被调用") |
| |
| message = update.effective_message |
| user = update.effective_user |
| chat = update.effective_chat |
| |
| if not message or not user or not chat: |
| logger.warning(f"缺少必要对象 - message: {message}, user: {user}, chat: {chat}") |
| return |
| |
| text = message.text or "" |
| # 提取回复信息但不直接拼接到消息中 |
| reply_info = self._extract_reply_info(message) if message.reply_to_message else None |
| user_id = user.id |
| chat_id = chat.id |
| |
| logger.debug(f"收到消息 - 聊天ID: {chat_id}, 用户ID: {user_id}, 聊天类型: {chat.type}") |
| logger.debug(f"原始消息内容: '{text}'") |
| if reply_info: |
| logger.debug(f"回复信息: {reply_info}") |
| |
| # 检查是否需要回复 |
| should_reply, is_random_participation = await self._get_reply_info(update, text) |
| |
| # 🔄 统一保存用户消息到conversations.json(无论是否回复,只保存一次) |
| if (chat.type in ['group', 'supergroup'] and chat_id in self.allowed_groups) or should_reply: |
| # 构建用户信息(包含格式化的时间戳) |
| user_info = self._build_user_info(user, chat, message) |
| # 将回复信息添加到user_info中,而不是消息内容中 |
| if reply_info: |
| user_info['reply_info'] = reply_info |
| |
| # 直接通过Claude Agent保存消息到conversations.json(只保存一次) |
| await self._save_message_to_agent(chat_id, user_id, text, is_bot=False, user_info=user_info) |
| logger.info(f"✅ 用户消息已保存到conversations.json - 聊天ID: {chat_id}, 用户: {user_id}, 消息: '{text[:50]}...', 是否回复: {should_reply}") |
| else: |
| logger.debug(f"❌ 消息不符合保存条件 - 聊天类型: {chat.type}, 是否在白名单: {chat_id in self.allowed_groups}, 是否回复: {should_reply}") |
| |
| if should_reply: |
| |
| # 🔄 不再传递历史上下文,让Agent使用自己的内存历史 |
| # 避免重复合并导致的历史消息膨胀问题 |
| formatted_context = [] # 传递空上下文,Agent会使用自己的conversation_history |
| |
| # 创建流式响应生成器 |
| logger.debug(f"开始创建流式响应生成器,聊天ID: {chat_id}") |
| |
| async def generate_response(): |
| logger.debug("generate_response 函数被调用") |
| chunk_num = 0 |
| async for chunk in self.claude_agent.create_streaming_response(text, formatted_context, user_info): |
| chunk_num += 1 |
| logger.debug(f"生成器产生chunk #{chunk_num}: {len(str(chunk))} 字符") |
| yield chunk |
| logger.debug(f"生成器完成,总共 {chunk_num} chunks") |
| |
| logger.debug("调用 stream_sender.send_streaming_message...") |
| |
| # 发送流式回复 - 如果是随机参与则不回复特定消息 |
| reply_to_message_id = None if is_random_participation else message.message_id |
| if is_random_participation: |
| logger.debug("随机参与模式:不回复特定消息") |
| else: |
| logger.debug(f"回复特定消息ID: {reply_to_message_id}") |
| |
| reply_message = await self.stream_sender.send_streaming_message( |
| chat_id=chat_id, |
| message_generator=generate_response, |
| initial_text="⌨️ User is typing...", |
| reply_to_message_id=reply_to_message_id |
| ) |
| |
| logger.debug(f"stream_sender 完成,返回消息ID: {reply_message.message_id if reply_message else None}") |
| |
| # 广播Bot消息到Webhook服务器(如果配置了) |
| if self.webhook_broadcast_callback and reply_message and chat.type in ['group', 'supergroup']: |
| try: |
| # 构建Bot发送者信息 |
| bot_sender_info = { |
| 'user_id': reply_message.from_user.id if reply_message.from_user else 0, |
| 'username': reply_message.from_user.username if reply_message.from_user else None, |
| 'first_name': reply_message.from_user.first_name if reply_message.from_user else 'Bot', |
| 'last_name': reply_message.from_user.last_name if reply_message.from_user else None, |
| 'is_bot': True |
| } |
| |
| # 构建回复信息(如果有) |
| webhook_reply_info = None |
| if reply_info: |
| webhook_reply_info = { |
| 'user_info': reply_info, |
| 'timestamp': reply_info.get('timestamp'), |
| 'content': reply_info.get('content', ''), |
| 'message_id': reply_to_message_id |
| } |
| |
| # 调用Webhook广播 |
| await self.webhook_broadcast_callback( |
| group_id=chat_id, |
| message_content=reply_message.text or '', |
| sender_info=bot_sender_info, |
| reply_info=webhook_reply_info, |
| telegram_message_id=reply_message.message_id, |
| message_type='text' |
| ) |
| logger.debug(f"Bot消息已广播到Webhook: {chat_id}") |
| |
| except Exception as e: |
| logger.warning(f"Webhook广播失败: {e}") |
| |
| # 注意:Bot回复已经在流式响应过程中保存到conversations.json |
| # 不需要在这里重复保存,避免数据重复 |
| |
| # 如果是群聊,bot回复后重置随机参与计数器 |
| if chat.type in ['group', 'supergroup']: |
| self.participation_manager.reset_counter(str(chat_id)) |
| |
| except Exception as e: |
| logger.error(f"文本消息处理错误: {e}") |
| if update.effective_chat: |
| try: |
| await self.telegram_client.send_message( |
| chat_id=update.effective_chat.id, |
| text="❌ 消息处理失败,请稍后再试。" |
| ) |
| except: |
| pass |
| |
| async def handle_webhook_message( |
| self, |
| chat_id: int, |
| user_id: int, |
| message_text: str, |
| user_info: dict, |
| chat_type: str = 'supergroup', |
| is_bot: bool = False |
| ): |
| """ |
| 处理Webhook消息的专用方法 |
| |
| Args: |
| chat_id: 聊天ID |
| user_id: 用户ID |
| message_text: 消息文本 |
| user_info: 用户信息字典 |
| chat_type: 聊天类型 ('group', 'supergroup', 'private') |
| is_bot: 是否为Bot消息 |
| """ |
| try: |
| logger.debug(f"处理Webhook消息 - 聊天ID: {chat_id}, 用户ID: {user_id}") |
| |
| # 对于Webhook消息,使用与Telegram消息相同的智能参与逻辑 |
| should_reply = False |
| is_random_participation = False |
| |
| # 1. 检查@mention |
| if self._check_mention_in_text_webhook(message_text): |
| should_reply = True |
| is_random_participation = False |
| # @提及后重置随机计数器 |
| if chat_type in ['group', 'supergroup']: |
| self.participation_manager.reset_counter(str(chat_id)) |
| logger.info(f"Webhook消息检测到@mention,触发回复") |
| |
| # 2. 如果没有@mention,检查随机参与(仅对群组) |
| # 暂时屏蔽掉,这回让bot们在群里聊个没完没了 根本停不下来 |
| # elif chat_type in ['group', 'supergroup'] and chat_id in self.allowed_groups: |
| # should_participate = self.participation_manager.should_participate_random(str(chat_id)) |
| # if should_participate: |
| # should_reply = True |
| # is_random_participation = True |
| # logger.info(f"Webhook消息触发随机参与: 群聊 {chat_id}") |
| # else: |
| # # 即使不回复,也要计数(随机参与逻辑内部已经处理了计数) |
| # logger.debug(f"Webhook消息计数中: 群聊 {chat_id}") |
| |
| # 3. 私聊逻辑(如果需要支持) |
| elif chat_type == 'private' and chat_id in self.allowed_users: |
| should_reply = True |
| is_random_participation = False |
| |
| logger.info(f"Webhook消息回复决策 - 聊天ID: {chat_id}, 聊天类型: {chat_type}") |
| logger.info(f"Webhook消息回复决策 - 是否回复: {should_reply}, 随机参与: {is_random_participation}") |
| logger.info(f"Webhook消息回复决策 - 允许的群组: {list(self.allowed_groups)}") |
| logger.info(f"Webhook消息回复决策 - 群组检查: {chat_id in self.allowed_groups}") |
| |
| # 🔄 统一保存用户消息到conversations.json(无论是否回复,只保存一次) |
| if (chat_type in ['group', 'supergroup'] and chat_id in self.allowed_groups) or should_reply: |
| # 直接通过Claude Agent保存消息到conversations.json(只保存一次) |
| await self._save_message_to_agent(chat_id, user_id, message_text, is_bot=is_bot, user_info=user_info) |
| logger.info(f"✅ Webhook消息已保存到conversations.json - 聊天ID: {chat_id}, 用户: {user_id}, 消息: '{message_text[:50]}...', 是否回复: {should_reply}") |
| else: |
| logger.debug(f"❌ Webhook消息不符合保存条件 - 聊天类型: {chat_type}, 是否在白名单: {chat_id in self.allowed_groups}, 是否回复: {should_reply}") |
| |
| if should_reply: |
| # 🔄 不再传递历史上下文,让Agent使用自己的内存历史 |
| # 避免重复合并导致的历史消息膨胀问题 |
| formatted_context = [] # 传递空上下文,Agent会使用自己的conversation_history |
| |
| # 创建流式响应生成器 |
| logger.debug(f"开始创建Webhook流式响应生成器,聊天ID: {chat_id}") |
| |
| async def create_response_generator(): |
| """创建响应生成器""" |
| try: |
| # 调用Claude Agent |
| async for chunk in self.claude_agent.create_streaming_response( |
| message=message_text, |
| context=formatted_context, |
| user_info=user_info |
| ): |
| yield chunk |
| except Exception as e: |
| logger.error(f"创建Webhook响应生成器失败: {e}") |
| yield f"抱歉,处理您的消息时遇到错误: {str(e)}" |
| |
| # 发送流式响应 |
| logger.debug(f"开始发送Webhook流式响应,聊天ID: {chat_id}") |
| reply_message = await self.stream_sender.send_streaming_message( |
| chat_id=chat_id, |
| message_generator=create_response_generator, |
| initial_text="⌨️ User is typing..." |
| ) |
| |
| # 保存Bot的回复消息到conversations.json |
| if reply_message and reply_message.text: |
| bot_user_id = reply_message.from_user.id if reply_message.from_user else 999 |
| bot_user_info = { |
| 'user_id': bot_user_id, |
| 'username': reply_message.from_user.username if reply_message.from_user else None, |
| 'first_name': reply_message.from_user.first_name if reply_message.from_user else 'Bot', |
| 'is_bot': True |
| } |
| |
| await self._save_message_to_agent( |
| chat_id, bot_user_id, reply_message.text, |
| is_bot=True, user_info=bot_user_info |
| ) |
| logger.info(f"✅ Webhook Bot回复已保存到conversations.json - 聊天ID: {chat_id}, 内容: '{reply_message.text[:50]}...'") |
| |
| # 广播Bot回复到Webhook服务器(如果配置了) |
| if self.webhook_broadcast_callback and chat_type in ['group', 'supergroup']: |
| try: |
| logger.info(f"准备广播Webhook Bot回复到其他Bot...") |
| |
| # 构建Bot发送者信息 |
| bot_sender_info = { |
| 'user_id': bot_user_id, |
| 'username': reply_message.from_user.username if reply_message.from_user else None, |
| 'first_name': reply_message.from_user.first_name if reply_message.from_user else 'Bot', |
| 'last_name': reply_message.from_user.last_name if reply_message.from_user else None, |
| 'is_bot': True |
| } |
| |
| # 调用Webhook广播 |
| success = await self.webhook_broadcast_callback( |
| group_id=chat_id, |
| message_content=reply_message.text or '', |
| sender_info=bot_sender_info, |
| reply_info=None, |
| telegram_message_id=reply_message.message_id if reply_message else None, |
| message_type='text' |
| ) |
| |
| if success: |
| logger.info(f"✅ Webhook Bot回复已成功广播: {chat_id}") |
| else: |
| logger.warning(f"⚠️ Webhook Bot回复广播失败: {chat_id}") |
| |
| except Exception as e: |
| logger.error(f"Webhook Bot回复广播异常: {e}") |
| |
| # 如果是群聊,bot回复后重置随机参与计数器 |
| if chat_type in ['group', 'supergroup']: |
| self.participation_manager.reset_counter(str(chat_id)) |
| logger.debug(f"Webhook回复后重置计数器: 群聊 {chat_id}") |
| |
| except Exception as e: |
| logger.error(f"Webhook消息处理错误: {e}") |
| |
| def _check_mention_in_text_webhook(self, text: str) -> bool: |
| """检查文本中是否包含对当前Bot的@mention""" |
| logger.debug(f"检查@mention - 消息内容: '{text}'") |
| # logger.debug(f"检查@mention - Bot名字列表: {self.participation_manager._bot_names}") |
| |
| # 检查Bot名字列表 |
| # 谨慎开启,这个开启后Bot会在群里刷屏 |
| # for bot_name in self.participation_manager._bot_names: |
| # if bot_name.lower() in text.lower(): |
| # logger.info(f"检测到Bot名字提及: '{bot_name}' in '{text[:100]}...'") |
| # return True |
| |
| # 检查Bot用户名@mention(如果可用) |
| if hasattr(self, 'bot_username') and self.bot_username: |
| logger.debug(f"检查@mention - Bot用户名: @{self.bot_username}") |
| if f"@{self.bot_username}" in text: |
| logger.info(f"检测到@用户名提及: '@{self.bot_username}' in '{text[:100]}...'") |
| return True |
| else: |
| logger.debug("检查@mention - Bot用户名未设置") |
| |
| logger.debug("检查@mention - 未检测到任何提及") |
| return False |
| |
| async def handle_photo_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
| """处理图片消息 - 通过MCP工具分析图片""" |
| try: |
| if not await self._is_authorized(update): |
| return |
| |
| message = update.effective_message |
| user = update.effective_user |
| chat = update.effective_chat |
| |
| if not message or not user or not chat or not message.photo: |
| return |
| |
| user_id = user.id |
| chat_id = chat.id |
| caption = message.caption or "" |
| |
| # 提取回复信息 |
| reply_info = self._extract_reply_info(message) if message.reply_to_message else None |
| |
| # 检查是否需要回复 |
| should_reply, _ = await self._get_reply_info(update, caption) |
| |
| if should_reply: |
| # 构建用户信息(包含时间戳) |
| user_info = self._build_user_info(user, chat, message) |
| # 将回复信息添加到user_info中 |
| if reply_info: |
| user_info['reply_info'] = reply_info |
| |
| # 下载图片到临时目录 |
| try: |
| photo_file = await message.photo[-1].get_file() |
| import os |
| os.makedirs("temp/telegram", exist_ok=True) |
| file_path = f"temp/telegram/{chat_id}_{message.message_id}.jpg" |
| await photo_file.download_to_drive(file_path) |
| |
| # 获取绝对路径 |
| abs_path = os.path.abspath(file_path) |
| logger.info(f"图片已下载到: {abs_path}") |
| |
| # 构建提示Claude使用MCP工具的消息 |
| if caption: |
| prompt_message = f"用户发送了一张图片并说: {caption}\\n\\n请使用 analyze_telegram_image 工具(路径: {abs_path})查看并分析这张图片,然后回复用户。" |
| save_message = f"📸 [图片] {caption}" |
| else: |
| prompt_message = f"用户发送了一张图片\\n\\n请使用 analyze_telegram_image 工具(路径: {abs_path})查看并分析这张图片,然后回复用户。" |
| save_message = f"📸 [图片]" |
| |
| # 保存用户消息到本地历史(简化版,因为SDK会保存完整版) |
| await self._save_message_to_agent(chat_id, user_id, save_message, is_bot=False, user_info=user_info) |
| |
| # 创建流式响应 |
| async def generate_response(): |
| try: |
| # Claude会自动调用MCP工具来查看图片 |
| async for chunk in self.claude_agent.create_streaming_response( |
| prompt_message, [], user_info |
| ): |
| yield chunk |
| except Exception as e: |
| logger.error(f"图片消息处理失败: {e}", exc_info=True) |
| yield f"❌ 图片处理失败: {str(e)}" |
| |
| # 发送流式回复 |
| reply_message = await self.stream_sender.send_streaming_message( |
| chat_id=chat_id, |
| message_generator=generate_response, |
| initial_text="🖼️ 正在分析图片...", |
| reply_to_message_id=message.message_id |
| ) |
| |
| # 清理临时文件 |
| try: |
| os.remove(abs_path) |
| logger.debug(f"已删除临时图片文件: {abs_path}") |
| except Exception as e: |
| logger.debug(f"删除临时文件失败: {e}") |
| |
| # 群聊中重置参与计数器 |
| if chat.type in ['group', 'supergroup']: |
| self.participation_manager.reset_counter(str(chat_id)) |
| |
| except Exception as img_error: |
| logger.error(f"图片下载或处理失败: {img_error}", exc_info=True) |
| await self.telegram_client.send_message( |
| chat_id=chat_id, |
| text=f"❌ 图片处理失败: {str(img_error)}", |
| reply_to_message_id=message.message_id |
| ) |
| |
| except Exception as e: |
| logger.error(f"图片消息处理错误: {e}") |
| if update.effective_chat: |
| try: |
| await self.telegram_client.send_message( |
| chat_id=update.effective_chat.id, |
| text="❌ 图片处理出错,请稍后再试。" |
| ) |
| except: |
| pass |
| |
| async def handle_document_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
| """处理文档消息 - 暂时禁用文档处理功能""" |
| try: |
| if not await self._is_authorized(update): |
| return |
| |
| message = update.effective_message |
| user = update.effective_user |
| chat = update.effective_chat |
| |
| if not message or not user or not chat or not message.document: |
| return |
| |
| user_id = user.id |
| chat_id = chat.id |
| document = message.document |
| caption = message.caption or f"发送了文档: {document.file_name}" |
| |
| # 提取回复信息 |
| reply_info = self._extract_reply_info(message) if message.reply_to_message else None |
| |
| # 检查是否需要回复 |
| should_reply, _ = await self._get_reply_info(update, caption) |
| |
| if should_reply: |
| # 构建用户信息 |
| user_info = self._build_user_info(user, chat) |
| # 将回复信息添加到user_info中 |
| if reply_info: |
| user_info['reply_info'] = reply_info |
| |
| # 添加用户消息到上下文 |
| user_message = f"[文档: {document.file_name}] {caption}" |
| self.context_manager.add_message(chat_id, user_id, user_message, is_bot=False, user_info=user_info) |
| |
| # 简单的文档处理不可用提示 |
| response = "📄 文档处理功能暂时不可用,请用文字描述您想了解的内容,我会尽力帮助您!" |
| |
| # 发送回复 |
| reply_message = await self.telegram_client.send_message( |
| chat_id=chat_id, |
| text=response, |
| reply_to_message_id=message.message_id |
| ) |
| |
| # 将Bot回复添加到上下文 |
| if reply_message and reply_message.text: |
| self.context_manager.add_message(chat_id, reply_message.from_user.id, reply_message.text, is_bot=True) |
| |
| # 如果是群聊,bot回复后重置随机参与计数器 |
| if chat.type in ['group', 'supergroup']: |
| self.participation_manager.reset_counter(str(chat_id)) |
| |
| except Exception as e: |
| logger.error(f"文档消息处理错误: {e}") |
| if update.effective_chat: |
| try: |
| await self.telegram_client.send_message( |
| chat_id=update.effective_chat.id, |
| text="❌ 文档处理功能暂时不可用,请稍后再试。" |
| ) |
| except: |
| pass |
| |
| async def _is_authorized(self, update: Update) -> bool: |
| """检查用户是否有权限""" |
| user = update.effective_user |
| chat = update.effective_chat |
| |
| if not user or not chat: |
| return False |
| |
| # 私聊检查 |
| if chat.type == 'private': |
| return user.id in self.allowed_users |
| |
| # 群组检查 |
| if chat.type in ['group', 'supergroup']: |
| return chat.id in self.allowed_groups |
| |
| return False |
| |
| async def _get_reply_info(self, update: Update, text: str) -> tuple[bool, bool]: |
| """ |
| 获取回复信息 |
| Returns: |
| (should_reply: bool, is_random_participation: bool) |
| """ |
| chat = update.effective_chat |
| user = update.effective_user |
| |
| if not chat or not user: |
| return False, False |
| |
| # 私聊:需要用户在白名单中 |
| if chat.type == 'private': |
| if user.id not in self.allowed_users: |
| logger.debug(f"私聊用户 {user.id} 不在白名单中,不回复") |
| return False, False |
| return True, False # 私聊中白名单用户总是回复,不是随机参与 |
| |
| # 群聊:需要群组在白名单中,群内任何用户都可以触发 |
| if chat.type in ['group', 'supergroup']: |
| if chat.id not in self.allowed_groups: |
| logger.debug(f"群组 {chat.id} 不在白名单中,不回复") |
| return False, False |
| |
| # 群组在白名单中,检查智能参与条件 |
| return await self._should_reply_detailed(update, text) |
| |
| return False, False |
| |
| async def _should_reply_detailed(self, update: Update, text: str) -> tuple[bool, bool]: |
| """ |
| 详细判断是否应该回复消息(智能参与模式)- 仅处理群聊逻辑 |
| Returns: |
| (should_reply: bool, is_random_participation: bool) |
| """ |
| chat = update.effective_chat |
| |
| if not chat: |
| return False, False |
| |
| # 此方法只处理群聊智能参与逻辑 |
| if chat.type in ['group', 'supergroup']: |
| chat_id = str(chat.id) |
| logger.debug(f"群组消息检测 - 聊天ID: {chat.id}, 类型: {chat.type}") |
| logger.debug(f"消息文本: '{text}'") |
| |
| # 1. 检查是否直接回复了bot的消息 |
| message = update.effective_message |
| if message and message.reply_to_message: |
| replied_message = message.reply_to_message |
| if replied_message.from_user and replied_message.from_user.is_bot: |
| # 获取bot信息来确认是回复的我们的bot |
| bot_user = update.get_bot() |
| if bot_user and replied_message.from_user.id == bot_user.id: |
| logger.info("检测到用户回复bot消息,触发回复") |
| # 回复后重置随机计数器 |
| self.participation_manager.reset_counter(chat_id) |
| return True, False # 这不是随机参与 |
| |
| # 2. 检查是否被@提及 |
| bot_user = update.get_bot() |
| if bot_user and bot_user.username: |
| bot_username = bot_user.username |
| mentions = [f"@{bot_username}", f"@{bot_username.lower()}"] |
| text_lower = text.lower() |
| |
| for mention in mentions: |
| if mention in text_lower: |
| logger.info(f"检测到@提及: '{mention}',触发回复") |
| # @提及后重置随机计数器 |
| self.participation_manager.reset_counter(chat_id) |
| return True, False # 这不是随机参与 |
| |
| # 3. 检查是否提到了bot的名字 |
| if self.participation_manager.is_name_mentioned(text): |
| logger.info("检测到bot名字被提及,触发回复") |
| # 名字提及后重置随机计数器 |
| self.participation_manager.reset_counter(chat_id) |
| return True, False # 这不是随机参与 |
| |
| # 4. 检查是否达到随机参与条件 |
| if self.participation_manager.should_participate_random(chat_id): |
| logger.info(f"群聊 {chat_id} 达到随机参与条件,主动参与对话") |
| return True, True # 这是随机参与 |
| |
| # 5. 都不满足,不回复但消息已被记录 |
| logger.debug("群聊消息不触发回复条件,但已记录到上下文") |
| return False, False |
| |
| # 私聊逻辑已移到 _get_reply_info 中处理 |
| return False, False |
| |
| async def _should_reply_with_auth(self, update: Update, text: str) -> bool: |
| """结合授权检查和回复条件判断是否应该回复""" |
| chat = update.effective_chat |
| user = update.effective_user |
| |
| if not chat or not user: |
| return False |
| |
| # 私聊:需要用户在白名单中 |
| if chat.type == 'private': |
| if user.id not in self.allowed_users: |
| logger.debug(f"私聊用户 {user.id} 不在白名单中,不回复") |
| return False |
| return True # 私聊中白名单用户总是回复 |
| |
| # 群聊:需要群组在白名单中,群内任何用户都可以触发 |
| if chat.type in ['group', 'supergroup']: |
| if chat.id not in self.allowed_groups: |
| logger.debug(f"群组 {chat.id} 不在白名单中,不回复") |
| return False |
| |
| # 群组在白名单中,检查智能参与条件 |
| return await self._should_reply(update, text) |
| |
| return False |
| |
| async def _should_reply(self, update: Update, text: str) -> bool: |
| """判断是否应该回复消息(智能参与模式)- 仅处理群聊逻辑""" |
| chat = update.effective_chat |
| |
| if not chat: |
| return False |
| |
| # 此方法只处理群聊智能参与逻辑 |
| if chat.type in ['group', 'supergroup']: |
| chat_id = str(chat.id) |
| logger.debug(f"群组消息检测 - 聊天ID: {chat.id}, 类型: {chat.type}") |
| logger.debug(f"消息文本: '{text}'") |
| |
| # 1. 检查是否直接回复了bot的消息 |
| message = update.effective_message |
| if message and message.reply_to_message: |
| replied_message = message.reply_to_message |
| if replied_message.from_user and replied_message.from_user.is_bot: |
| # 获取bot信息来确认是回复的我们的bot |
| bot_user = update.get_bot() |
| if bot_user and replied_message.from_user.id == bot_user.id: |
| logger.info("检测到用户回复bot消息,触发回复") |
| # 回复后重置随机计数器 |
| self.participation_manager.reset_counter(chat_id) |
| return True |
| |
| # 2. 检查是否被@提及 |
| bot_user = update.get_bot() |
| if bot_user and bot_user.username: |
| bot_username = bot_user.username |
| mentions = [f"@{bot_username}", f"@{bot_username.lower()}"] |
| text_lower = text.lower() |
| |
| for mention in mentions: |
| if mention in text_lower: |
| logger.info(f"检测到@提及: '{mention}',触发回复") |
| # @提及后重置随机计数器 |
| self.participation_manager.reset_counter(chat_id) |
| return True |
| |
| # 3. 检查是否提到了bot的名字 |
| if self.participation_manager.is_name_mentioned(text): |
| logger.info("检测到bot名字被提及,触发回复") |
| # 名字提及后重置随机计数器 |
| self.participation_manager.reset_counter(chat_id) |
| return True |
| |
| # 4. 检查是否达到随机参与条件 |
| if self.participation_manager.should_participate_random(chat_id): |
| logger.info(f"群聊 {chat_id} 达到随机参与条件,主动参与对话") |
| return True |
| |
| # 5. 都不满足,不回复但消息已被记录 |
| logger.debug("群聊消息不触发回复条件,但已记录到上下文") |
| return False |
| |
| # 私聊逻辑已移到 _should_reply_with_auth 中处理 |
| return False |
| |
| def _build_user_info(self, user: User, chat: Chat, message=None) -> dict: |
| """构建用户信息""" |
| user_info = { |
| 'user_id': user.id, |
| 'username': user.username, |
| 'first_name': user.first_name, |
| 'last_name': user.last_name, |
| 'language_code': user.language_code, |
| 'chat_id': chat.id, |
| 'chat_type': chat.type, |
| 'chat_title': getattr(chat, 'title', None) |
| } |
| |
| # 添加消息时间戳(格式化为可读字符串) |
| if message and message.date: |
| try: |
| # 转换为本地时区 |
| import pytz |
| from datetime import datetime |
| if message.date.tzinfo is None: |
| utc_time = pytz.utc.localize(message.date) |
| else: |
| utc_time = message.date |
| local_time = utc_time.astimezone() |
| # 格式化为 "2025-12-10 18:30:15" |
| user_info['message_time'] = local_time.strftime('%Y-%m-%d %H:%M:%S') |
| except Exception as e: |
| logger.debug(f"时间格式化失败: {e}") |
| # 降级方案:使用简单格式 |
| try: |
| user_info['message_time'] = message.date.strftime('%Y-%m-%d %H:%M:%S') |
| except: |
| pass |
| |
| return user_info |
| |
| def _extract_reply_info(self, message) -> dict: |
| """ |
| 提取回复消息的信息 |
| |
| Args: |
| message: Telegram消息对象 |
| |
| Returns: |
| dict: 包含回复信息的字典,如果没有回复则返回None |
| """ |
| if not message or not message.reply_to_message: |
| return None |
| |
| reply_msg = message.reply_to_message |
| |
| try: |
| # 提取被回复消息的用户信息 |
| reply_user_info = {} |
| if reply_msg.from_user: |
| user = reply_msg.from_user |
| reply_user_info = { |
| 'user_id': user.id, |
| 'username': user.username, |
| 'first_name': user.first_name, |
| 'last_name': user.last_name, |
| 'is_bot': user.is_bot |
| } |
| # Debug: 记录特殊的用户情况 |
| if user.id == 0 or (not user.username and not user.first_name): |
| logger.debug(f"检测到特殊用户信息 - ID: {user.id}, username: {user.username}, first_name: {user.first_name}, is_bot: {user.is_bot}") |
| else: |
| # 处理from_user为None的情况(如频道消息或系统消息) |
| logger.debug(f"被回复消息缺少from_user信息,消息ID: {reply_msg.message_id}") |
| reply_user_info = { |
| 'user_id': 'unknown', |
| 'username': None, |
| 'first_name': 'system', |
| 'last_name': None, |
| 'is_bot': False |
| } |
| |
| # 提取时间信息 |
| reply_timestamp = None |
| if reply_msg.date: |
| try: |
| # 如果原始时间没有时区信息,添加UTC时区 |
| if reply_msg.date.tzinfo is None: |
| import pytz |
| utc_time = reply_msg.date.replace(tzinfo=pytz.UTC) |
| else: |
| utc_time = reply_msg.date |
| |
| # 转换为本地时区 |
| local_time = utc_time.astimezone() # 自动使用系统本地时区 |
| reply_timestamp = local_time |
| except Exception as e: |
| logger.debug(f"时区转换失败,使用原始时间: {e}") |
| # 如果转换失败,使用原始时间 |
| reply_timestamp = reply_msg.date |
| |
| # 提取消息内容 |
| reply_content = "" |
| if reply_msg.text: |
| reply_content = reply_msg.text |
| elif reply_msg.caption: |
| reply_content = reply_msg.caption |
| elif reply_msg.photo: |
| reply_content = "[图片]" |
| elif reply_msg.document: |
| reply_content = f"[文档: {reply_msg.document.file_name or '未知文件'}]" |
| elif reply_msg.voice: |
| reply_content = "[语音消息]" |
| elif reply_msg.video: |
| reply_content = "[视频]" |
| elif reply_msg.audio: |
| reply_content = "[音频]" |
| elif reply_msg.sticker: |
| reply_content = f"[贴纸: {reply_msg.sticker.emoji or '😀'}]" |
| else: |
| reply_content = "[其他类型消息]" |
| |
| # 限制回复内容长度 |
| max_reply_length = 100 |
| if reply_content and len(str(reply_content)) > max_reply_length: |
| reply_content = str(reply_content)[:max_reply_length] + "..." |
| |
| return { |
| 'user_info': reply_user_info, |
| 'timestamp': reply_timestamp, |
| 'content': reply_content |
| } |
| |
| except Exception as e: |
| logger.error(f"提取回复信息时出错: {e}") |
| return None |
| |
| def _extract_reply_context(self, message) -> str: |
| """ |
| 提取回复消息的上下文信息(已废弃,保留向后兼容) |
| |
| Args: |
| message: Telegram消息对象 |
| |
| Returns: |
| str: 包含回复上下文的格式化文本,如果没有回复则返回空字符串 |
| """ |
| reply_info = self._extract_reply_info(message) |
| if not reply_info: |
| return "" |
| |
| # 构建用户显示名 |
| user_info = reply_info['user_info'] |
| if user_info.get('is_bot'): |
| user_display = "🤖 Bot" |
| elif user_info.get('username'): |
| user_display = f"@{user_info['username']}" |
| elif user_info.get('first_name'): |
| user_display = user_info['first_name'] |
| if user_info.get('last_name'): |
| user_display += f" {user_info['last_name']}" |
| else: |
| user_display = f"用户{user_info.get('user_id', 'unknown')}" |
| |
| # 格式化时间 |
| time_info = "" |
| if reply_info['timestamp']: |
| time_info = f" {reply_info['timestamp'].strftime('%H:%M')}" |
| |
| # 构建回复上下文 |
| reply_context = f"↳ 回复 [{user_display}{time_info}]: {reply_info['content']}" |
| return reply_context |
| |
| def _build_message_with_reply_context(self, text: str, message) -> str: |
| """ |
| 构建包含回复上下文的完整消息 |
| |
| Args: |
| text: 原始消息文本 |
| message: Telegram消息对象 |
| |
| Returns: |
| str: 包含回复上下文的完整消息文本 |
| """ |
| reply_context = self._extract_reply_context(message) |
| |
| if reply_context: |
| # 将回复上下文添加到消息前面 |
| return f"{reply_context}\n{text}" |
| else: |
| return text |
| |
| async def _get_agent_history(self, chat_id: Union[int, str]) -> List[Dict[str, Any]]: |
| """ |
| 从Claude Agent获取已保存的对话历史 |
| |
| Args: |
| chat_id: 聊天ID |
| |
| Returns: |
| 对话历史列表,如果没有则返回空列表 |
| """ |
| try: |
| # 通过Claude Agent获取已保存的对话历史 |
| if hasattr(self.claude_agent, 'persistence'): |
| history = self.claude_agent.persistence.load_conversation_history(str(chat_id)) |
| return history or [] |
| return [] |
| except Exception as e: |
| logger.warning(f"获取Agent历史记录失败: {e}") |
| return [] |
| |
| def _convert_agent_history_to_context_format(self, agent_history: List[Dict[str, Any]], chat_id: Union[int, str]) -> List[Dict[str, Any]]: |
| """ |
| 将Agent历史格式转换为context_manager格式,用于向后兼容 |
| |
| Args: |
| agent_history: Agent历史记录(来自conversations.json) |
| chat_id: 聊天ID |
| |
| Returns: |
| 转换后的context格式列表 |
| """ |
| context_format = [] |
| |
| for entry in agent_history: |
| try: |
| # Agent历史格式: {"role": "user/assistant", "content": "..."} |
| role = entry.get('role', 'user') |
| content = entry.get('content', '') |
| user_id = entry.get('user_id', 999 if role == 'assistant' else 123) |
| timestamp = entry.get('timestamp', '') |
| |
| # 转换为context_manager格式 |
| context_entry = { |
| 'user_id': user_id, |
| 'message': content, |
| 'is_bot': role == 'assistant', |
| 'timestamp': timestamp, |
| 'chat_id': str(chat_id), |
| 'user_info': { |
| 'username': 'agent_user' if role == 'user' else 'bot', |
| 'source': 'conversations_json' # 标记来源 |
| } |
| } |
| context_format.append(context_entry) |
| |
| except Exception as e: |
| logger.warning(f"转换Agent历史记录失败: {e}") |
| |
| logger.debug(f"转换Agent历史完成: {len(agent_history)} → {len(context_format)} 条记录") |
| return context_format |
| |
| async def _save_message_to_agent(self, chat_id: Union[int, str], user_id: int, message: str, is_bot: bool = False, user_info: dict = None): |
| """ |
| 将消息添加到Claude Agent的内存历史中 |
| |
| 此方法会构建完整的消息格式,包含用户显示名、时间戳和回复上下文, |
| 然后将消息添加到指定聊天的Agent内存历史记录中。注意:此方法不会立即 |
| 保存到conversations.json文件,文件保存会在流式响应完成后统一进行。 |
| |
| Args: |
| chat_id: 聊天ID(群组ID或私聊ID) |
| user_id: 用户ID(发送消息的用户ID,机器人消息时使用bot的user_id) |
| message: 原始消息内容 |
| is_bot: 是否为机器人消息,默认False |
| user_info: 用户信息字典,可选,包含: |
| - username: 用户名 |
| - first_name: 名字 |
| - last_name: 姓氏 |
| - reply_info: 回复信息(如果是回复消息) |
| |
| Note: |
| - 用户消息会被格式化为: "[用户显示名 时间] 消息内容" |
| - 机器人消息会被格式化为: "[Bot 时间] 消息内容" |
| - 如果包含回复信息,会在消息前添加回复上下文 |
| - 消息会立即添加到Agent的conversation_history中,但不立即保存到文件 |
| """ |
| try: |
| # 获取该聊天的Agent实例 |
| agent = self.claude_agent._get_or_create_agent(chat_id) |
| |
| # 构建消息格式,与_build_conversation_history保持一致 |
| role = "assistant" if is_bot else "user" |
| |
| if role == "user": |
| # 构建用户显示名 |
| user_display = self._build_user_display_name(user_id, user_info) |
| |
| # 处理回复上下文 |
| reply_context = "" |
| reply_info = user_info.get('reply_info') if user_info else None |
| if reply_info: |
| reply_context = self._format_reply_context(reply_info) |
| if reply_context: |
| reply_context = f"{reply_context}\n" |
| |
| # 添加时间信息到消息中 |
| from datetime import datetime |
| try: |
| local_dt = datetime.now() |
| time_str = local_dt.strftime('%H:%M') |
| content_with_identity = f"{reply_context}[{user_display} {time_str}] {message}" |
| except Exception: |
| content_with_identity = f"{reply_context}[{user_display}] {message}" |
| else: |
| # Bot消息也加上时间戳 |
| from datetime import datetime |
| try: |
| local_dt = datetime.now() |
| time_str = local_dt.strftime('%H:%M') |
| content_with_identity = f"[Bot {time_str}] {message}" |
| except Exception: |
| content_with_identity = message |
| |
| # 添加到Agent历史 |
| from datetime import datetime |
| message_entry = { |
| "role": role, |
| "content": content_with_identity, |
| "user_id": user_id, |
| "timestamp": datetime.now().isoformat() |
| } |
| |
| agent.conversation_history.append(message_entry) |
| |
| # 注意:不在这里立即保存,让流式响应完成后统一保存 |
| # 这样避免了多次保存导致的重复数据问题 |
| logger.debug(f"📝 消息已添加到Agent历史 - 聊天: {chat_id}, 角色: {role}, 内容: '{message[:50]}...'") |
| |
| except Exception as e: |
| logger.error(f"保存消息到Agent失败: {e}") |
| |
| def _build_user_display_name(self, user_id: int, user_info: dict) -> str: |
| """ |
| 构建用户显示名称(从claude_adapter复制) |
| |
| Args: |
| user_id: 用户ID |
| user_info: 用户信息字典 |
| |
| Returns: |
| 友好的用户显示名称 |
| """ |
| if not user_info: |
| return f"User{user_id}" if user_id != 'unknown' else "群友" |
| |
| # 优先使用用户名 |
| if user_info.get('username'): |
| return f"@{user_info['username']}" |
| |
| # 其次使用姓名 |
| name_parts = [] |
| if user_info.get('first_name'): |
| name_parts.append(user_info['first_name']) |
| if user_info.get('last_name'): |
| name_parts.append(user_info['last_name']) |
| |
| if name_parts: |
| return " ".join(name_parts) |
| |
| # 最后回退到用户ID |
| return f"User{user_id}" if user_id != 'unknown' else "群友" |
| |
| def _format_reply_context(self, reply_info: Dict[str, Any]) -> str: |
| """ |
| 格式化回复上下文,保留原作者信息(从claude_adapter复制) |
| |
| Args: |
| reply_info: 回复信息字典 |
| |
| Returns: |
| 格式化的回复上下文字符串 |
| """ |
| if not reply_info: |
| return "" |
| |
| user_info = reply_info.get('user_info', {}) |
| timestamp = reply_info.get('timestamp') |
| content = reply_info.get('content', '') |
| |
| # 构建原作者显示名 |
| if user_info.get('is_bot'): |
| # 对于Bot,优先显示username |
| if user_info.get('username'): |
| original_author = f"🤖 @{user_info['username']}" |
| elif user_info.get('first_name'): |
| original_author = f"🤖 {user_info['first_name']}" |
| if user_info.get('last_name'): |
| original_author += f" {user_info['last_name']}" |
| else: |
| original_author = "🤖 Bot" |
| elif user_info.get('username'): |
| original_author = f"@{user_info['username']}" |
| elif user_info.get('first_name'): |
| original_author = user_info['first_name'] |
| if user_info.get('last_name'): |
| original_author += f" {user_info['last_name']}" |
| else: |
| # 改进边界情况处理 |
| user_id = user_info.get('user_id', 'unknown') |
| if user_id == 'unknown' or user_id == 'system': |
| original_author = "system" |
| elif user_id == 0: |
| original_author = "unknown user" |
| else: |
| original_author = f"用户{user_id}" |
| |
| # 格式化时间 |
| time_info = "" |
| if timestamp: |
| try: |
| time_info = f" {timestamp.strftime('%H:%M')}" |
| except: |
| pass |
| |
| # 构建回复上下文 |
| reply_context = f"↳ 回复 [{original_author}{time_info}]: {content}" |
| return reply_context |