| """ |
| 核心Agent类 - 实现智能对话和任务执行 |
| 基于 claude-agent-sdk V2.2 重构,使用内置 Automatic Compaction |
| """ |
| |
| import asyncio |
| import json |
| import logging |
| import time |
| from typing import Dict, List, Optional, Any, AsyncGenerator |
| from enum import Enum |
| from dataclasses import dataclass, asdict |
| |
| # 使用新的 claude-agent-sdk |
| from claude_agent_sdk import ( |
| ClaudeAgentOptions, |
| ClaudeSDKClient, |
| AssistantMessage, |
| SystemMessage, |
| ResultMessage, |
| TextBlock, |
| ) |
| |
| |
| class InputCompletionSignal(Enum): |
| TIMEOUT = "timeout" |
| DELIMITER = "delimiter" |
| EXPLICIT = "explicit" |
| MAX_LENGTH = "max_length" |
| |
| |
| @dataclass |
| class StreamInputBuffer: |
| buffer: str = "" |
| last_update_time: float = 0.0 |
| is_complete: bool = False |
| completion_signal: Optional[InputCompletionSignal] = None |
| timeout_seconds: float = 3.0 |
| max_buffer_size: int = 10000 |
| delimiter: str = "\n\n" |
| |
| |
| @dataclass |
| class Task: |
| """任务数据类""" |
| id: str |
| description: str |
| status: str = "pending" # pending, in_progress, completed, failed |
| result: Optional[str] = None |
| subtasks: List['Task'] = None |
| |
| def __post_init__(self): |
| if self.subtasks is None: |
| self.subtasks = [] |
| |
| |
| class AgentCore: |
| """ |
| 核心Agent类 - 处理对话、任务规划和执行 |
| |
| V2.2 重构: |
| - 使用 claude-agent-sdk 替代 claude-code-sdk |
| - 利用 SDK 内置的 Automatic Compaction |
| - 简化代码,移除手动上下文管理 |
| """ |
| |
| def __init__( |
| self, |
| api_key: Optional[str] = None, |
| model: str = "claude-sonnet-4-5-20250929", |
| max_history: int = 100, |
| system_prompt: Optional[str] = None |
| ): |
| """ |
| 初始化 AgentCore |
| |
| Args: |
| api_key: API 密钥(已废弃,SDK 自动处理) |
| model: 使用的模型 |
| max_history: 最大历史记录数(用于本地缓存,SDK 自动压缩) |
| system_prompt: 系统提示词 |
| """ |
| self.model = model |
| self.max_history = max_history |
| self.system_prompt = system_prompt or "你是一个智能助手,可以帮助用户完成各种任务。" |
| |
| self.tasks: List[Task] = [] |
| self.conversation_history: List[Dict[str, str]] = [] # 本地缓存,用于显示 |
| self.mcp_tools: Dict[str, Any] = {} |
| self.logger = logging.getLogger(__name__) |
| self.stream_input_buffer: Optional[StreamInputBuffer] = None |
| |
| # SDK session管理 |
| self.session_id: Optional[str] = None # 保存session ID用于上下文连续性 |
| |
| # SDK 客户端实例(按需创建) |
| self._sdk_client: Optional[ClaudeSDKClient] = None |
| |
| self.logger.info(f"AgentCore 初始化完成 - 模型: {model}, SDK: claude-agent-sdk") |
| |
| def _get_options(self, resume_session: Optional[str] = None) -> ClaudeAgentOptions: |
| """ |
| 构建 SDK 配置选项 |
| |
| Args: |
| resume_session: 要恢复的 session ID,None 表示创建新会话 |
| |
| Returns: |
| ClaudeAgentOptions 配置 |
| """ |
| import sys |
| from pathlib import Path |
| |
| # 获取telegram_image_tools.py的绝对路径 |
| mcp_module_path = Path(__file__).parent.parent / "mcp" / "telegram_image_tools.py" |
| |
| options = ClaudeAgentOptions( |
| model=self.model, |
| system_prompt=self.system_prompt, |
| permission_mode="bypassPermissions", |
| # 配置MCP图片分析工具 |
| mcp_servers={ |
| "telegram-image": { |
| "command": sys.executable, # 使用当前Python解释器 |
| "args": [str(mcp_module_path)] |
| } |
| } |
| ) |
| |
| # 只有在恢复已存在的会话时才传递 resume 参数 |
| if resume_session: |
| options.resume = resume_session |
| self.logger.debug(f"使用 resume 参数恢复会话: {resume_session}") |
| |
| return options |
| |
| async def process_user_input(self, user_input: str) -> str: |
| """ |
| 处理用户输入(统一处理逻辑,使用 ClaudeSDKClient) |
| |
| Args: |
| user_input: 用户输入文本 |
| |
| Returns: |
| AI 响应 |
| """ |
| # 添加到本地历史缓存(用于显示) |
| self.conversation_history.append({"role": "user", "content": user_input}) |
| |
| # 统一使用 ClaudeSDKClient 处理,包含重试逻辑 |
| max_retries = 2 |
| retry_delay = 1.0 |
| timeout_seconds = 60 |
| |
| for attempt in range(max_retries): |
| try: |
| self.logger.info(f"处理用户输入,尝试 {attempt + 1}/{max_retries}") |
| |
| try: |
| response_stream = await asyncio.wait_for( |
| self._make_query_call(user_input), |
| timeout=timeout_seconds |
| ) |
| except asyncio.TimeoutError: |
| self.logger.warning(f"SDK调用超时 ({timeout_seconds}s)") |
| if attempt == max_retries - 1: |
| return self._get_fallback_response(user_input, f"响应超时 ({timeout_seconds}s)") |
| continue |
| |
| response_text = await self._extract_response_text(response_stream) |
| |
| if response_text.strip(): |
| self.conversation_history.append({"role": "assistant", "content": response_text}) |
| return response_text |
| else: |
| self.logger.warning(f"第 {attempt + 1} 次尝试得到空响应") |
| |
| except Exception as e: |
| error_msg = str(e) |
| self.logger.error(f"第 {attempt + 1} 次处理失败: {error_msg}") |
| |
| if attempt == max_retries - 1: |
| return self._get_fallback_response(user_input, error_msg) |
| |
| await asyncio.sleep(retry_delay) |
| retry_delay *= 1.5 |
| |
| return self._get_fallback_response(user_input, "多次重试后仍然失败") |
| |
| async def _extract_response_text(self, response_stream) -> str: |
| """ |
| 从响应流中提取文本,并捕获 SDK 的 session_id |
| |
| 重要:第一次对话时,从 init 消息中获取 SDK 生成的 session_id |
| """ |
| response_text = "" |
| async for message in response_stream: |
| # 🔑 捕获 SDK 生成的真正 session_id(从 init 消息) |
| if hasattr(message, 'subtype') and message.subtype == 'init': |
| if hasattr(message, 'data') and 'session_id' in message.data: |
| sdk_session_id = message.data['session_id'] |
| if sdk_session_id != self.session_id: |
| self.logger.info(f"✅ 捕获 SDK session_id: {sdk_session_id}") |
| self.session_id = sdk_session_id |
| |
| # 提取文本内容 |
| if isinstance(message, AssistantMessage): |
| for block in message.content: |
| if isinstance(block, TextBlock): |
| response_text += block.text |
| elif hasattr(block, 'text'): |
| response_text += block.text |
| # 跳过 SystemMessage 和 ResultMessage |
| return response_text |
| |
| def _get_fallback_response(self, user_input: str, error_details: str) -> str: |
| """当SDK失败时的备用响应""" |
| self.logger.warning(f"使用备用响应: {error_details}") |
| |
| if any(keyword in error_details for keyword in ["预扣费额度失败", "403", "剩余额度", "API Error", "/login"]): |
| return "抱歉,AI服务暂时不可用,可能是API配额不足。请联系管理员检查服务状态。" |
| |
| if "超时" in error_details or "timeout" in error_details.lower(): |
| return "请求超时,请稍后重试。如果问题持续,请尝试简化您的问题。" |
| |
| return f"抱歉,处理您的请求时遇到技术问题。错误信息:{error_details}" |
| |
| async def _make_query_call(self, user_input: str): |
| """ |
| 使用 ClaudeSDKClient 进行有状态的对话 |
| |
| ClaudeSDKClient 特点: |
| - 有状态(Stateful):维护对话上下文 |
| - 支持 resume 参数:可以恢复之前的对话 |
| - 适合交互式对话应用 |
| """ |
| self.logger.debug(f"调用 SDK query,输入长度: {len(user_input)}, session_id: {self.session_id}") |
| |
| if not user_input or not user_input.strip(): |
| raise ValueError("输入为空") |
| |
| # SDK 内置限制处理,但我们仍做基本检查 |
| if len(user_input) > 100000: |
| self.logger.warning(f"输入过大 ({len(user_input)} 字符),截断到 100KB") |
| user_input = user_input[:100000] |
| |
| # 使用 ClaudeSDKClient 而不是 query() 函数 |
| from claude_agent_sdk import ClaudeSDKClient |
| |
| # 判断是否为新会话(没有 session_id) |
| is_new_session = not self.session_id |
| if is_new_session: |
| self.logger.info("🆕 创建新会话(session_id 将从 SDK 响应中获取)") |
| else: |
| self.logger.debug(f"使用已存在的session: {self.session_id}") |
| |
| # 创建客户端 |
| # 新会话:不传递 resume(SDK 会生成新的 session_id) |
| # 已有会话:传递 resume=session_id(SDK 恢复会话) |
| resume_session = None if is_new_session else self.session_id |
| client = ClaudeSDKClient(options=self._get_options(resume_session=resume_session)) |
| |
| try: |
| # 连接并发送查询 |
| async with client: |
| await client.connect() |
| # 注意: client.query() 不接收 session_id 参数 |
| # session 管理完全通过 ClaudeAgentOptions 的 resume 参数处理 |
| await client.query(user_input) |
| |
| # 接收响应 |
| async for message in client.receive_response(): |
| yield message |
| finally: |
| # 确保断开连接 |
| try: |
| await client.disconnect() |
| except: |
| pass |
| |
| def add_mcp_tool(self, tool_name: str, tool_instance: Any): |
| """添加MCP工具""" |
| self.mcp_tools[tool_name] = tool_instance |
| self.logger.info(f"已添加MCP工具: {tool_name}") |
| |
| def get_conversation_history(self) -> List[Dict[str, str]]: |
| """获取对话历史(本地缓存)""" |
| return self.conversation_history.copy() |
| |
| def clear_history(self): |
| """清空对话历史""" |
| self.conversation_history.clear() |
| self.tasks.clear() |
| |
| def clean_conversation_history(self) -> int: |
| """ |
| 清理被污染的历史记录 |
| |
| 注意:V2.2 中,SDK 自动管理上下文,此方法主要清理本地缓存 |
| """ |
| if not self.conversation_history: |
| return 0 |
| |
| original_count = len(self.conversation_history) |
| cleaned_history = [] |
| removed_count = 0 |
| |
| for msg in self.conversation_history: |
| content = msg.get('content', '') |
| content_length = len(str(content)) |
| |
| # 过滤异常大的消息 |
| if content_length > 5000 or '系统提示:' in content or 'CLAUDE.md' in content: |
| removed_count += 1 |
| self.logger.info(f"移除污染历史条目: {content_length} 字符") |
| else: |
| cleaned_history.append(msg) |
| |
| self.conversation_history = cleaned_history |
| self.logger.info(f"历史记录清理: {original_count} → {len(cleaned_history)}") |
| |
| return removed_count |
| |
| def trim_conversation_history(self) -> int: |
| """修剪对话历史到最大限制""" |
| if len(self.conversation_history) <= self.max_history: |
| return 0 |
| |
| original_count = len(self.conversation_history) |
| self.conversation_history = self.conversation_history[-self.max_history:] |
| removed_count = original_count - len(self.conversation_history) |
| |
| self.logger.info(f"历史记录修剪: {original_count} → {len(self.conversation_history)}") |
| return removed_count |
| |
| async def create_streaming_response(self, user_input: str): |
| """创建流式响应生成器(统一使用 ClaudeSDKClient)""" |
| self.logger.info(f"AgentCore.create_streaming_response 开始") |
| |
| try: |
| # 统一使用流式处理,无需区分模式 |
| full_response = await self._interactive_process_for_streaming(user_input) |
| |
| self.logger.info(f"获得完整响应,长度: {len(full_response)} 字符") |
| |
| # 智能分块发送 |
| lines = full_response.split('\n') |
| current_chunk = "" |
| max_chunk_size = 100 |
| |
| for line in lines: |
| is_special_line = ( |
| line.strip().startswith('#') or |
| line.strip().startswith('```') or |
| line.strip().startswith('-') or |
| line.strip().startswith('*') or |
| line.strip().startswith('>') or |
| line.strip() == '---' |
| ) |
| |
| if (len(current_chunk) + len(line) + 1 > max_chunk_size and |
| current_chunk.strip() and not is_special_line): |
| yield current_chunk.strip() |
| current_chunk = line + "\n" |
| await asyncio.sleep(0.1) |
| else: |
| current_chunk += line + "\n" |
| |
| if current_chunk.strip(): |
| yield current_chunk.strip() |
| |
| except Exception as e: |
| self.logger.error(f"流式响应生成失败: {e}") |
| yield f"处理请求时出现错误: {str(e)}" |
| |
| async def _interactive_process_for_streaming(self, user_input: str) -> str: |
| """专为流式输出设计的交互模式处理""" |
| max_retries = 2 |
| retry_delay = 1.0 |
| timeout_seconds = 60 |
| |
| for attempt in range(max_retries): |
| try: |
| self.logger.info(f"流式处理开始,尝试 {attempt + 1}/{max_retries}") |
| |
| try: |
| # 异步生成器不能被await,直接调用获得生成器对象 |
| response_stream = self._make_query_call(user_input) |
| |
| # 在extract阶段应用超时控制 |
| response_text = await asyncio.wait_for( |
| self._extract_response_text(response_stream), |
| timeout=timeout_seconds |
| ) |
| except asyncio.TimeoutError: |
| self.logger.warning(f"SDK调用超时 ({timeout_seconds}s)") |
| if attempt == max_retries - 1: |
| return self._get_fallback_response(user_input, f"响应超时 ({timeout_seconds}s)") |
| continue |
| |
| if response_text.strip(): |
| self.conversation_history.append({"role": "assistant", "content": response_text}) |
| return response_text |
| else: |
| self.logger.warning(f"流式处理第 {attempt + 1} 次尝试得到空响应") |
| |
| except Exception as e: |
| error_msg = str(e) |
| self.logger.error(f"流式处理第 {attempt + 1} 次失败: {error_msg}") |
| |
| if attempt == max_retries - 1: |
| return self._get_fallback_response(user_input, error_msg) |
| |
| await asyncio.sleep(retry_delay) |
| retry_delay *= 1.5 |
| |
| return self._get_fallback_response(user_input, "多次重试后仍然失败") |
| |
| def to_dict(self) -> Dict[str, Any]: |
| """ |
| 序列化 Agent 状态(简化版) |
| |
| 只保存必要信息,对话历史由 Claude SDK session 管理 |
| """ |
| return { |
| "session_id": self.session_id, # SDK 会话 ID(核心) |
| "model": self.model, |
| "created_at": getattr(self, 'created_at', int(time.time())) |
| } |
| |
| @classmethod |
| def from_dict(cls, data: Dict[str, Any], api_key: Optional[str] = None) -> 'AgentCore': |
| """ |
| 反序列化 Agent 状态(简化版) |
| |
| 只恢复 session_id,对话历史由 Claude SDK 自动恢复 |
| """ |
| agent = cls(api_key=api_key, model=data.get("model", "claude-sonnet-4-5-20250929")) |
| |
| # 恢复 SDK session ID(对话历史由 SDK 管理) |
| agent.session_id = data.get("session_id") |
| agent.created_at = data.get("created_at", int(time.time())) |
| |
| return agent |
| |
| def get_memory_summary(self) -> Dict[str, Any]: |
| """ |
| 获取记忆摘要信息(简化版) |
| |
| 对话历史由 Claude SDK session 管理,这里只返回元数据 |
| """ |
| return { |
| "session_id": self.session_id, |
| "model": self.model, |
| "created_at": getattr(self, 'created_at', None), |
| "has_session": self.session_id is not None |
| } |