# blob: b2fa5e931713ddb4f7329c4dcbf2eb6091cdaea7 [file] [log] [blame] [raw]
"""
核心Agent类 - 实现智能对话和任务执行
基于 claude-agent-sdk V2.2 重构,使用内置 Automatic Compaction
"""
import asyncio
import json
import logging
import time
from typing import Dict, List, Optional, Any, AsyncGenerator
from enum import Enum
from dataclasses import dataclass, asdict
# 使用新的 claude-agent-sdk
from claude_agent_sdk import (
ClaudeAgentOptions,
ClaudeSDKClient,
AssistantMessage,
SystemMessage,
ResultMessage,
TextBlock,
)
class InputCompletionSignal(Enum):
    """Reasons for which a streamed user input can be considered complete."""

    # Each member's value is the plain string form of its name.
    TIMEOUT = "timeout"
    DELIMITER = "delimiter"
    EXPLICIT = "explicit"
    MAX_LENGTH = "max_length"
@dataclass
class StreamInputBuffer:
    """State holder for a user input that arrives in several pieces.

    This class only stores the fields below; no completion logic lives here.
    """

    # Text accumulated so far.
    buffer: str = ""
    # Timestamp of the most recent update (0.0 until first set).
    last_update_time: float = 0.0
    # Completion flag and, when set, the signal recorded alongside it.
    is_complete: bool = False
    completion_signal: Optional[InputCompletionSignal] = None
    # Tunables: timeout in seconds, size cap, and delimiter string.
    timeout_seconds: float = 3.0
    max_buffer_size: int = 10000
    delimiter: str = "\n\n"
@dataclass
class Task:
    """A unit of work tracked by the agent, optionally with nested subtasks."""

    id: str
    description: str
    # One of: pending, in_progress, completed, failed
    status: str = "pending"
    result: Optional[str] = None
    # ``None`` is only a placeholder default; it is replaced in __post_init__
    # so that every instance owns its own list.
    subtasks: List['Task'] = None

    def __post_init__(self):
        """Swap a missing ``subtasks`` value for a fresh empty list."""
        self.subtasks = [] if self.subtasks is None else self.subtasks
class AgentCore:
    """Core agent: forwards user input to Claude via ``claude-agent-sdk``.

    Conversation context is owned by the SDK session (identified by
    ``session_id`` and restored through the ``resume`` option). This class
    additionally keeps a local, display-only copy of the exchanged messages
    in ``conversation_history``.
    """

    DEFAULT_MODEL = "claude-sonnet-4-5-20250929"

    # Retry / timeout policy shared by the blocking and the streaming path.
    _MAX_RETRIES = 2
    _INITIAL_RETRY_DELAY = 1.0
    _RETRY_BACKOFF = 1.5
    _QUERY_TIMEOUT_SECONDS = 60

    # Inputs longer than this many characters are truncated before sending.
    _MAX_INPUT_CHARS = 100000

    # Pseudo-streaming: target chunk size (characters) and pause between chunks.
    _STREAM_CHUNK_SIZE = 100
    _STREAM_CHUNK_DELAY = 0.1

    def __init__(
        self,
        api_key: Optional[str] = None,
        model: str = DEFAULT_MODEL,
        max_history: int = 100,
        system_prompt: Optional[str] = None
    ):
        """Initialise the agent.

        Args:
            api_key: Deprecated and unused; kept for backward compatibility.
            model: Model identifier passed to the SDK.
            max_history: Maximum number of entries kept by
                ``trim_conversation_history`` in the local cache.
            system_prompt: System prompt; a default assistant prompt is used
                when omitted or empty.
        """
        self.model = model
        self.max_history = max_history
        self.system_prompt = system_prompt or "你是一个智能助手,可以帮助用户完成各种任务。"
        self.tasks: List[Task] = []
        # Local cache used for display only.
        self.conversation_history: List[Dict[str, str]] = []
        self.mcp_tools: Dict[str, Any] = {}
        self.logger = logging.getLogger(__name__)
        self.stream_input_buffer: Optional[StreamInputBuffer] = None
        # SDK session id; None until the first response provides one.
        self.session_id: Optional[str] = None
        # Reserved slot for a client instance (not used by the methods below).
        self._sdk_client: Optional[ClaudeSDKClient] = None
        # Fixed at construction so to_dict()/get_memory_summary() report a
        # stable value instead of "now" on every call.
        self.created_at: int = int(time.time())
        self.logger.info(f"AgentCore 初始化完成 - 模型: {model}, SDK: claude-agent-sdk")

    def _get_options(self, resume_session: Optional[str] = None) -> "ClaudeAgentOptions":
        """Build the SDK options for one query.

        Args:
            resume_session: Session id to resume; a falsy value starts a new
                session.

        Returns:
            A ``ClaudeAgentOptions`` instance configured with the model, the
            system prompt and the ``telegram-image`` MCP server.
        """
        import sys
        from pathlib import Path

        # The MCP server script is located relative to this file:
        # <parent of this file's directory>/mcp/telegram_image_tools.py
        mcp_module_path = Path(__file__).parent.parent / "mcp" / "telegram_image_tools.py"
        options = ClaudeAgentOptions(
            model=self.model,
            system_prompt=self.system_prompt,
            # NOTE(review): this mode name suggests permission prompts are
            # skipped - confirm that is intended for this deployment.
            permission_mode="bypassPermissions",
            mcp_servers={
                "telegram-image": {
                    # Launch the server with the current Python interpreter.
                    "command": sys.executable,
                    "args": [str(mcp_module_path)]
                }
            }
        )
        # Only set ``resume`` when an existing session is being continued.
        if resume_session:
            options.resume = resume_session
            self.logger.debug(f"使用 resume 参数恢复会话: {resume_session}")
        return options

    async def process_user_input(self, user_input: str) -> str:
        """Send ``user_input`` to the model and return the complete reply.

        The user message is recorded in the local history first. The query is
        then attempted up to ``_MAX_RETRIES`` times; on persistent failure a
        human-readable fallback message is returned instead of raising.

        Args:
            user_input: Text entered by the user.

        Returns:
            The assistant's reply, or a fallback message.
        """
        self.conversation_history.append({"role": "user", "content": user_input})
        return await self._query_with_retries(
            user_input,
            start_msg="处理用户输入,尝试 {attempt}/{total}",
            empty_msg="第 {attempt} 次尝试得到空响应",
            failure_msg="第 {attempt} 次处理失败: {error}",
        )

    async def _query_with_retries(
        self,
        user_input: str,
        *,
        start_msg: str,
        empty_msg: str,
        failure_msg: str
    ) -> str:
        """Run one query with timeout, retries and backoff.

        Args:
            user_input: Text to send.
            start_msg: Log template for the start of an attempt
                (``{attempt}``, ``{total}``).
            empty_msg: Log template for an empty reply (``{attempt}``).
            failure_msg: Log template for a failed attempt
                (``{attempt}``, ``{error}``).

        Returns:
            The non-empty reply text (also appended to the local history), or
            a fallback message once all attempts are exhausted.
        """
        max_retries = self._MAX_RETRIES
        timeout_seconds = self._QUERY_TIMEOUT_SECONDS
        retry_delay = self._INITIAL_RETRY_DELAY

        for attempt in range(1, max_retries + 1):
            is_last_attempt = attempt == max_retries
            try:
                self.logger.info(start_msg.format(attempt=attempt, total=max_retries))
                try:
                    # _make_query_call is an async generator and cannot be
                    # awaited itself; the timeout is applied to the coroutine
                    # that consumes it.
                    response_stream = self._make_query_call(user_input)
                    response_text = await asyncio.wait_for(
                        self._extract_response_text(response_stream),
                        timeout=timeout_seconds
                    )
                except asyncio.TimeoutError:
                    self.logger.warning(f"SDK调用超时 ({timeout_seconds}s)")
                    if is_last_attempt:
                        return self._get_fallback_response(
                            user_input, f"响应超时 ({timeout_seconds}s)"
                        )
                    # A timeout is retried immediately, without backoff.
                    continue

                if response_text.strip():
                    self.conversation_history.append(
                        {"role": "assistant", "content": response_text}
                    )
                    return response_text
                self.logger.warning(empty_msg.format(attempt=attempt))
            except Exception as e:
                error_msg = str(e)
                self.logger.error(failure_msg.format(attempt=attempt, error=error_msg))
                if is_last_attempt:
                    return self._get_fallback_response(user_input, error_msg)

            # Back off only when another attempt will follow.
            if not is_last_attempt:
                await asyncio.sleep(retry_delay)
                retry_delay *= self._RETRY_BACKOFF

        return self._get_fallback_response(user_input, "多次重试后仍然失败")

    async def _extract_response_text(self, response_stream) -> str:
        """Collect the reply text from a message stream.

        While iterating, a message whose ``subtype`` is ``'init'`` and whose
        ``data`` contains ``session_id`` updates ``self.session_id``. Text is
        taken from every content block of ``AssistantMessage`` objects that
        exposes a string ``text`` attribute; all other messages are skipped.

        Args:
            response_stream: Async iterable of SDK messages.

        Returns:
            The concatenated text of all assistant text blocks.
        """
        parts: List[str] = []
        async for message in response_stream:
            if getattr(message, 'subtype', None) == 'init':
                data = getattr(message, 'data', None) or {}
                if 'session_id' in data:
                    sdk_session_id = data['session_id']
                    if sdk_session_id != self.session_id:
                        self.logger.info(f"✅ 捕获 SDK session_id: {sdk_session_id}")
                        self.session_id = sdk_session_id

            if isinstance(message, AssistantMessage):
                for block in message.content:
                    text = getattr(block, 'text', None)
                    if isinstance(text, str):
                        parts.append(text)
        return "".join(parts)

    def _get_fallback_response(self, user_input: str, error_details: str) -> str:
        """Map an error description to a user-facing fallback message.

        Args:
            user_input: The original input (not used in the message).
            error_details: Text describing what went wrong.

        Returns:
            A quota message, a timeout message, or a generic message that
            embeds ``error_details``.
        """
        self.logger.warning(f"使用备用响应: {error_details}")
        quota_markers = ("预扣费额度失败", "403", "剩余额度", "API Error", "/login")
        if any(marker in error_details for marker in quota_markers):
            return "抱歉,AI服务暂时不可用,可能是API配额不足。请联系管理员检查服务状态。"
        if "超时" in error_details or "timeout" in error_details.lower():
            return "请求超时,请稍后重试。如果问题持续,请尝试简化您的问题。"
        return f"抱歉,处理您的请求时遇到技术问题。错误信息:{error_details}"

    async def _make_query_call(self, user_input: str):
        """Query the SDK and yield the response messages (async generator).

        A new ``ClaudeSDKClient`` is created per call. When ``session_id`` is
        set it is passed as ``resume`` so the SDK continues that session;
        otherwise a new session is started.

        Args:
            user_input: Text to send; truncated to ``_MAX_INPUT_CHARS``.

        Yields:
            Messages from ``client.receive_response()``.

        Raises:
            ValueError: If ``user_input`` is empty or whitespace only (raised
                when iteration starts, as this is a generator).
        """
        self.logger.debug(f"调用 SDK query,输入长度: {len(user_input)}, session_id: {self.session_id}")
        if not user_input or not user_input.strip():
            raise ValueError("输入为空")

        if len(user_input) > self._MAX_INPUT_CHARS:
            self.logger.warning(f"输入过大 ({len(user_input)} 字符),截断到 100KB")
            user_input = user_input[:self._MAX_INPUT_CHARS]

        is_new_session = not self.session_id
        if is_new_session:
            self.logger.info("🆕 创建新会话(session_id 将从 SDK 响应中获取)")
        else:
            self.logger.debug(f"使用已存在的session: {self.session_id}")

        resume_session = None if is_new_session else self.session_id
        client = ClaudeSDKClient(options=self._get_options(resume_session=resume_session))

        # The client's async context manager connects on entry and
        # disconnects on exit (also on error/cancellation), so no explicit
        # connect()/disconnect() calls are made here.
        async with client:
            # client.query() takes no session id; session selection happens
            # through the ``resume`` option above.
            await client.query(user_input)
            async for message in client.receive_response():
                yield message

    def add_mcp_tool(self, tool_name: str, tool_instance: Any):
        """Register ``tool_instance`` under ``tool_name`` in ``mcp_tools``."""
        self.mcp_tools[tool_name] = tool_instance
        self.logger.info(f"已添加MCP工具: {tool_name}")

    def get_conversation_history(self) -> List[Dict[str, str]]:
        """Return a shallow copy of the local conversation cache."""
        return self.conversation_history.copy()

    def clear_history(self):
        """Empty the local conversation cache and the task list.

        ``session_id`` is left untouched.
        """
        self.conversation_history.clear()
        self.tasks.clear()

    def clean_conversation_history(self) -> int:
        """Drop polluted entries from the local conversation cache.

        An entry is removed when the string form of its content is longer
        than 5000 characters or contains ``系统提示:`` or ``CLAUDE.md``.

        Returns:
            The number of removed entries.
        """
        if not self.conversation_history:
            return 0

        original_count = len(self.conversation_history)
        cleaned_history = []
        removed_count = 0
        for msg in self.conversation_history:
            # Normalise once so non-string content (e.g. None) is handled.
            content_text = str(msg.get('content', ''))
            content_length = len(content_text)
            polluted = (
                content_length > 5000
                or '系统提示:' in content_text
                or 'CLAUDE.md' in content_text
            )
            if polluted:
                removed_count += 1
                self.logger.info(f"移除污染历史条目: {content_length} 字符")
            else:
                cleaned_history.append(msg)

        self.conversation_history = cleaned_history
        self.logger.info(f"历史记录清理: {original_count} → {len(cleaned_history)}")
        return removed_count

    def trim_conversation_history(self) -> int:
        """Keep only the newest ``max_history`` entries of the local cache.

        A ``max_history`` of zero or less removes every entry.

        Returns:
            The number of removed entries.
        """
        limit = max(self.max_history, 0)
        original_count = len(self.conversation_history)
        if original_count <= limit:
            return 0

        # Slice from the front: ``[-limit:]`` would keep everything for 0.
        self.conversation_history = self.conversation_history[original_count - limit:]
        removed_count = original_count - len(self.conversation_history)
        self.logger.info(f"历史记录修剪: {original_count} → {len(self.conversation_history)}")
        return removed_count

    @staticmethod
    def _split_into_chunks(text: str, max_chunk_size: int = 100) -> List[str]:
        """Split ``text`` into line-aligned chunks of roughly ``max_chunk_size``.

        Lines are never split. A chunk is closed before a line that would
        push it over the limit, unless that line (after stripping) starts
        with ``#``, three backticks, ``-``, ``*`` or ``>``; such lines always
        stay attached to the current chunk. Chunks are stripped and
        whitespace-only chunks are dropped.
        """
        special_prefixes = ('#', '```', '-', '*', '>')
        chunks: List[str] = []
        current_chunk = ""
        for line in text.split('\n'):
            is_special_line = line.strip().startswith(special_prefixes)
            would_overflow = len(current_chunk) + len(line) + 1 > max_chunk_size
            if would_overflow and current_chunk.strip() and not is_special_line:
                chunks.append(current_chunk.strip())
                current_chunk = line + "\n"
            else:
                current_chunk += line + "\n"
        if current_chunk.strip():
            chunks.append(current_chunk.strip())
        return chunks

    async def create_streaming_response(self, user_input: str) -> AsyncGenerator[str, None]:
        """Yield the reply to ``user_input`` in chunks (async generator).

        The complete reply is obtained first and then emitted in pieces with
        a short pause between consecutive chunks. Any exception is logged and
        reported as a final error chunk instead of propagating.
        """
        self.logger.info("AgentCore.create_streaming_response 开始")
        try:
            full_response = await self._interactive_process_for_streaming(user_input)
            self.logger.info(f"获得完整响应,长度: {len(full_response)} 字符")

            chunks = self._split_into_chunks(full_response, self._STREAM_CHUNK_SIZE)
            for index, chunk in enumerate(chunks):
                if index:
                    # Pace the output between chunks.
                    await asyncio.sleep(self._STREAM_CHUNK_DELAY)
                yield chunk
        except Exception as e:
            self.logger.error(f"流式响应生成失败: {e}")
            yield f"处理请求时出现错误: {str(e)}"

    async def _interactive_process_for_streaming(self, user_input: str) -> str:
        """Fetch the complete reply for the streaming path.

        Same retry/timeout policy as ``process_user_input``.

        NOTE(review): unlike ``process_user_input`` this path does not record
        the user message in the local history - confirm whether callers add
        it themselves.
        """
        return await self._query_with_retries(
            user_input,
            start_msg="流式处理开始,尝试 {attempt}/{total}",
            empty_msg="流式处理第 {attempt} 次尝试得到空响应",
            failure_msg="流式处理第 {attempt} 次失败: {error}",
        )

    def to_dict(self) -> Dict[str, Any]:
        """Serialise the agent state.

        Only ``session_id``, ``model`` and ``created_at`` are stored; the
        local conversation cache is not included.
        """
        return {
            "session_id": self.session_id,
            "model": self.model,
            "created_at": self.created_at
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any], api_key: Optional[str] = None) -> 'AgentCore':
        """Rebuild an agent from the output of ``to_dict``.

        Args:
            data: Mapping with optional ``session_id``, ``model`` and
                ``created_at`` keys.
            api_key: Forwarded to the constructor (unused there).

        Returns:
            A new agent carrying the stored session id and creation time.
        """
        agent = cls(api_key=api_key, model=data.get("model", cls.DEFAULT_MODEL))
        agent.session_id = data.get("session_id")
        agent.created_at = data.get("created_at", int(time.time()))
        return agent

    def get_memory_summary(self) -> Dict[str, Any]:
        """Return session metadata: id, model, creation time, has-session flag."""
        return {
            "session_id": self.session_id,
            "model": self.model,
            "created_at": self.created_at,
            "has_session": self.session_id is not None
        }