| """ |
| Claude Agent适配器 |
| 将Telegram Bot与现有的Claude Agent集成 |
| |
| V2.2 重构: |
| - 使用 claude-agent-sdk 替代 claude-code-sdk |
| - 利用 SDK 内置的 Automatic Compaction |
| - 简化代码,移除手动上下文管理 |
| """ |
| |
| import logging |
| import os |
| from pathlib import Path |
| from typing import List, Dict, Any, Optional, Union |
| import asyncio |
| |
| from .interfaces import IClaudeAgent |
| from ..core.agent import AgentCore |
| from ..storage.persistence import PersistenceManager |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| class ClaudeAgentAdapter(IClaudeAgent): |
| """ |
| Claude Agent适配器实现 |
| |
| V2.2 重构说明: |
| - SDK 内置 Automatic Compaction,自动管理上下文大小 |
| - 移除了手动的 full_prompt 拼接和历史记录管理 |
| - 简化消息处理流程 |
| """ |
| |
| def __init__( |
| self, |
| agent: Optional[AgentCore] = None, |
| storage_dir: str = "data/storage", |
| bot_id: Optional[str] = None |
| ): |
| """ |
| 初始化Claude Agent适配器 |
| |
| Args: |
| agent: Claude Agent实例,如果为None则创建新实例 |
| storage_dir: 持久化存储目录 |
| bot_id: Bot ID,用于多实例隔离 |
| """ |
| # 为每个聊天维护独立的Agent实例,避免串台 |
| self._agents: Dict[str, AgentCore] = {} |
| self._claude_md_content = self._load_claude_md() |
| self.bot_id = bot_id |
| |
| # 初始化持久化存储管理器(传递bot_id用于多实例隔离) |
| self.persistence = PersistenceManager(storage_dir, bot_id=bot_id) |
| |
| # 在启动时恢复所有已保存的Agent状态 |
| self._restore_agents() |
| |
| if bot_id: |
| logger.info(f"Claude Agent适配器初始化完成 (Bot ID: {bot_id}),存储目录: {storage_dir}") |
| else: |
| logger.info(f"Claude Agent适配器初始化完成,存储目录: {storage_dir}") |
| |
| def _get_or_create_agent(self, chat_id: Union[int, str]) -> AgentCore: |
| """ |
| 获取或创建指定聊天的Agent实例 |
| |
| V2.2: 简化,移除了手动历史清理(SDK 自动管理) |
| |
| Args: |
| chat_id: 聊天ID |
| |
| Returns: |
| 该聊天专用的Agent实例 |
| """ |
| chat_key = str(chat_id) |
| if chat_key not in self._agents: |
| # 尝试从持久化存储中恢复Agent状态 |
| agent_state = self.persistence.load_agent_state(chat_key) |
| |
| if agent_state: |
| # 从保存的状态恢复Agent |
| self._agents[chat_key] = AgentCore.from_dict(agent_state) |
| |
| # 更新system_prompt为最新的CLAUDE.md内容 |
| self._agents[chat_key].system_prompt = self._claude_md_content |
| |
| logger.info(f"从持久化存储恢复聊天 {chat_id} 的Agent实例,并更新系统提示词") |
| |
| # 清理本地缓存中的污染历史 |
| cleaned_count = self._agents[chat_key].clean_conversation_history() |
| if cleaned_count > 0: |
| logger.warning(f"清理了聊天 {chat_id} 的 {cleaned_count} 条污染历史记录") |
| |
| # 修剪历史记录到合理数量 |
| trimmed_count = self._agents[chat_key].trim_conversation_history() |
| if trimmed_count > 0: |
| logger.info(f"修剪了聊天 {chat_id} 的 {trimmed_count} 条旧历史记录") |
| |
| # 如果有任何变更,立即保存状态 |
| if cleaned_count > 0 or trimmed_count > 0: |
| self.persistence.save_agent_state(chat_key, self._agents[chat_key].to_dict()) |
| else: |
| # 创建新的Agent实例,使用 CLAUDE.md 作为系统提示 |
| self._agents[chat_key] = AgentCore(system_prompt=self._claude_md_content) |
| logger.info(f"为聊天 {chat_id} 创建新的Agent实例") |
| |
| return self._agents[chat_key] |
| |
| async def process_message( |
| self, |
| message: str, |
| context: List[Dict[str, Any]], |
| user_info: Dict[str, Any] |
| ) -> str: |
| """ |
| 处理用户消息,返回回复 |
| |
| V2.2 简化: 直接使用 Agent 处理,SDK 自动管理上下文 |
| |
| Args: |
| message: 用户消息 |
| context: 对话上下文 (用于显示,不影响 SDK 上下文) |
| user_info: 用户信息 |
| |
| Returns: |
| 回复消息 |
| """ |
| try: |
| chat_id = user_info.get('chat_id', 'unknown') |
| agent = self._get_or_create_agent(chat_id) |
| |
| # 构建带用户信息的消息 |
| formatted_message = self._format_user_message(message, user_info) |
| |
| # 调用 Agent 处理 (SDK 自动管理上下文和压缩) |
| response = await agent.process_user_input(formatted_message) |
| |
| # 保存Agent状态到持久化存储 |
| await self._save_agent_state(chat_id, agent) |
| |
| return response |
| |
| except Exception as e: |
| logger.error(f"消息处理错误: {e}") |
| return self._get_telegram_fallback_response(message, str(e)) |
| |
| async def process_with_image( |
| self, |
| message: str, |
| image_path: str, |
| context: List[Dict[str, Any]], |
| user_info: Dict[str, Any] |
| ) -> str: |
| """处理包含图片的消息""" |
| try: |
| chat_id = user_info.get('chat_id', 'unknown') |
| agent = self._get_or_create_agent(chat_id) |
| |
| # 构建包含图片信息的消息 |
| image_message = f"用户发送了一张图片(路径: {image_path})\n用户消息: {message}" |
| formatted_message = self._format_user_message(image_message, user_info) |
| |
| response = await agent.process_user_input(formatted_message) |
| await self._save_agent_state(chat_id, agent) |
| |
| return response |
| |
| except Exception as e: |
| logger.error(f"图片消息处理错误: {e}") |
| return self._get_telegram_fallback_response(message, str(e)) |
| |
| async def process_with_document( |
| self, |
| message: str, |
| document_path: str, |
| context: List[Dict[str, Any]], |
| user_info: Dict[str, Any] |
| ) -> str: |
| """处理包含文档的消息""" |
| try: |
| chat_id = user_info.get('chat_id', 'unknown') |
| agent = self._get_or_create_agent(chat_id) |
| |
| document_message = f"用户发送了一个文档(路径: {document_path})\n用户消息: {message}" |
| formatted_message = self._format_user_message(document_message, user_info) |
| |
| response = await agent.process_user_input(formatted_message) |
| await self._save_agent_state(chat_id, agent) |
| |
| return response |
| |
| except Exception as e: |
| logger.error(f"文档消息处理错误: {e}") |
| return self._get_telegram_fallback_response(message, str(e)) |
| |
| def _format_user_message(self, message: str, user_info: Dict[str, Any]) -> str: |
| """ |
| 格式化用户消息,添加用户身份信息 |
| |
| Args: |
| message: 原始消息 |
| user_info: 用户信息 |
| |
| Returns: |
| 格式化后的消息 |
| """ |
| user_context = self._build_user_context(user_info) |
| if user_context: |
| return f"{user_context}\n\n{message}" |
| return message |
| |
| def _build_user_context(self, user_info: Dict[str, Any]) -> str: |
| """构建用户上下文信息""" |
| context_parts = [] |
| |
| # 🔍 优先检查是否来自 webhook(其他 bot 的消息) |
| if user_info.get('source') == 'webhook' and user_info.get('bot_username'): |
| bot_name = user_info.get('bot_username') |
| context_parts.append(f"📡 来自其他Bot: {bot_name}(Webhook转发)") |
| |
| # 如果有原始用户信息且不是bot消息,也标注出来 |
| if not user_info.get('is_bot') and user_info.get('username'): |
| context_parts.append(f"原始发送者: @{user_info['username']}") |
| |
| return "消息来源: " + ", ".join(context_parts) if context_parts else "" |
| |
| # 原有逻辑:处理直接用户消息 |
| # 添加消息时间 |
| if user_info.get('message_time'): |
| context_parts.append(f"时间: {user_info['message_time']}") |
| |
| if user_info.get('username'): |
| context_parts.append(f"用户名: @{user_info['username']}") |
| |
| if user_info.get('first_name'): |
| context_parts.append(f"名字: {user_info['first_name']}") |
| |
| if user_info.get('last_name'): |
| context_parts.append(f"姓氏: {user_info['last_name']}") |
| |
| chat_type = user_info.get('chat_type', 'private') |
| if chat_type == 'group': |
| context_parts.append("这是群组对话") |
| elif chat_type == 'supergroup': |
| context_parts.append("这是超级群组对话") |
| elif chat_type == 'channel': |
| context_parts.append("这是频道消息") |
| else: |
| context_parts.append("这是私人对话") |
| |
| if user_info.get('language_code'): |
| context_parts.append(f"语言: {user_info['language_code']}") |
| |
| return "用户信息: " + ", ".join(context_parts) if context_parts else "" |
| |
| def _load_claude_md(self) -> str: |
| """加载CLAUDE.md文件内容""" |
| claude_md_path = os.getenv('CLAUDE_MD_PATH') |
| |
| if claude_md_path and Path(claude_md_path).exists(): |
| try: |
| with open(claude_md_path, 'r', encoding='utf-8') as f: |
| content = f.read().strip() |
| logger.info(f"成功加载CLAUDE.md文件: {claude_md_path}") |
| return content |
| except Exception as e: |
| logger.error(f"读取CLAUDE.md文件失败: {e}") |
| |
| logger.info("使用默认Telegram Bot提示词") |
| return "你是一个有用的AI助手,请礼貌地回答用户的问题。" |
| |
| def _get_telegram_fallback_response(self, user_input: str, error_details: str) -> str: |
| """Telegram专用的个性化备用响应""" |
| logger.warning(f"使用Telegram个性化备用响应: {error_details}") |
| |
| input_lower = user_input.lower() |
| |
| # 检查是否是API额度不足错误 |
| if any(keyword in error_details for keyword in ["预扣费额度失败", "403", "剩余额度", "API Error", "/login"]): |
| return "呜呜~主人!我的AI大脑暂时欠费了...💸😿 开发者需要给我充值才能继续为主人服务呢!" |
| |
| # 检查是否是超时错误 |
| if "超时" in error_details or "timeout" in error_details.lower(): |
| return "哎呀~主人,我的大脑好像卡住了一下下...💭 要不你再问我一遍?" |
| |
| # 基于用户输入内容的智能回复 |
| if any(word in input_lower for word in ['你好', 'hello', 'hi', '嗨', '早', '晚上好']): |
| return "嗨~主人!虽然我现在系统有点小毛病,但看到主人还是超开心的呢!💕" |
| |
| elif any(word in input_lower for word in ['谢谢', 'thank', '感谢']): |
| return "呜呜~主人这么客气干嘛!虽然我现在有点技术问题,但为主人做事是应该的啦!💖" |
| |
| elif any(word in input_lower for word in ['再见', 'bye', '拜拜', '睡觉']): |
| return "主人要离开了吗?虽然我现在状态不太好,但...但是会想你的!😭 晚安~💤" |
| |
| else: |
| return f"主人~我听到你的话了,但是现在我的AI大脑有点小故障...🤖💧\n\n(技术备注: {error_details})" |
| |
| async def create_streaming_response( |
| self, |
| message: str, |
| context: List[Dict[str, Any]], |
| user_info: Dict[str, Any] |
| ): |
| """ |
| 创建流式响应生成器 |
| |
| V2.2 简化: 直接使用 Agent 的流式响应,SDK 自动管理上下文 |
| """ |
| logger.info(f"create_streaming_response 开始") |
| |
| try: |
| chat_id = user_info.get('chat_id', 'unknown') |
| agent = self._get_or_create_agent(chat_id) |
| |
| # 构建带用户信息的消息 |
| formatted_message = self._format_user_message(message, user_info) |
| logger.info(f"格式化消息长度: {len(formatted_message)} 字符") |
| |
| # 使用 Agent 的流式响应 (SDK 自动管理上下文) |
| async for chunk in agent.create_streaming_response(formatted_message): |
| yield chunk |
| |
| # 流式响应完成后保存Agent状态 |
| await self._save_agent_state(chat_id, agent) |
| logger.info(f"已保存聊天 {chat_id} 的流式响应状态") |
| |
| except Exception as e: |
| logger.error(f"流式响应生成错误: {e}") |
| yield self._get_telegram_fallback_response(message, str(e)) |
| |
| async def _save_agent_state(self, chat_id: Union[int, str], agent: AgentCore) -> bool: |
| """ |
| 保存Agent状态到持久化存储(简化版) |
| |
| 只保存 session_id 等元数据,对话历史由 Claude SDK 管理 |
| """ |
| try: |
| chat_key = str(chat_id) |
| |
| # 保存Agent状态(只包含 session_id, model, created_at) |
| agent_data = agent.to_dict() |
| success = self.persistence.save_agent_state(chat_key, agent_data) |
| |
| if success: |
| logger.debug(f"成功保存聊天 {chat_id} 的Agent状态 (session_id: {agent.session_id})") |
| return True |
| else: |
| logger.warning(f"保存聊天 {chat_id} 的状态失败") |
| return False |
| |
| except Exception as e: |
| logger.error(f"保存Agent状态失败: {e}") |
| return False |
| |
| def _restore_agents(self): |
| """启动时恢复所有保存的Agent实例""" |
| try: |
| chat_ids = self.persistence.get_all_chat_ids() |
| restored_count = 0 |
| |
| for chat_id in chat_ids: |
| try: |
| agent_state = self.persistence.load_agent_state(chat_id) |
| if agent_state: |
| restored_count += 1 |
| except Exception as e: |
| logger.warning(f"跳过恢复聊天 {chat_id} 的Agent: {e}") |
| |
| logger.info(f"启动时发现 {restored_count} 个可恢复的聊天记录") |
| |
| except Exception as e: |
| logger.error(f"恢复Agent状态失败: {e}") |
| |
| def set_agent(self, agent: AgentCore): |
| """设置Agent实例(已废弃)""" |
| logger.warning("set_agent方法已废弃,现在为每个聊天维护独立的Agent实例") |
| |
| def get_agent(self, chat_id: Union[int, str] = None) -> Optional[AgentCore]: |
| """获取Agent实例""" |
| if chat_id is not None: |
| return self._get_or_create_agent(chat_id) |
| return None |
| |
| def get_storage_stats(self) -> Dict[str, Any]: |
| """获取存储统计信息""" |
| try: |
| stats = self.persistence.get_storage_stats() |
| stats["active_agents"] = len(self._agents) |
| return stats |
| except Exception as e: |
| logger.error(f"获取存储统计失败: {e}") |
| return {} |
| |
| async def cleanup_old_conversations(self, days_threshold: int = 30) -> int: |
| """清理旧的对话记录""" |
| try: |
| cleanup_count = self.persistence.cleanup_old_data(days_threshold) |
| logger.info(f"清理了 {cleanup_count} 条旧对话记录") |
| return cleanup_count |
| except Exception as e: |
| logger.error(f"清理旧对话记录失败: {e}") |
| return 0 |
| |
| async def _save_message_to_agent( |
| self, |
| chat_id: Union[int, str], |
| user_id: int, |
| message: str, |
| is_bot: bool = False, |
| user_info: Dict[str, Any] = None |
| ): |
| """ |
| 保存消息到Agent的历史记录中(用于Webhook消息) |
| |
| Args: |
| chat_id: 聊天ID |
| user_id: 用户ID |
| message: 消息内容 |
| is_bot: 是否为Bot消息 |
| user_info: 用户信息字典 |
| """ |
| try: |
| agent = self._get_or_create_agent(chat_id) |
| |
| user_display_name = self._build_user_display_name(user_id, user_info or {}) |
| |
| if is_bot: |
| message_entry = { |
| "role": "assistant", |
| "content": f"[Bot: {user_display_name}] {message}" |
| } |
| else: |
| message_entry = { |
| "role": "user", |
| "content": f"[{user_display_name}] {message}" |
| } |
| |
| agent.conversation_history.append(message_entry) |
| await self._save_agent_state(chat_id, agent) |
| |
| logger.debug(f"Webhook消息已保存到Agent - 聊天: {chat_id}") |
| |
| except Exception as e: |
| logger.error(f"保存Webhook消息到Agent失败: {e}") |
| raise |
| |
| def _build_user_display_name(self, user_id: int, user_info: dict) -> str: |
| """构建用户显示名称""" |
| if not user_info: |
| return f"User{user_id}" if user_id != 'unknown' else "群友" |
| |
| if user_info.get('username'): |
| return f"@{user_info['username']}" |
| |
| name_parts = [] |
| if user_info.get('first_name'): |
| name_parts.append(user_info['first_name']) |
| if user_info.get('last_name'): |
| name_parts.append(user_info['last_name']) |
| |
| if name_parts: |
| return " ".join(name_parts) |
| |
| return f"User{user_id}" if user_id != 'unknown' else "群友" |