blob: d16527613f408373cdd437f242189b5d03a3c042 [file] [log] [blame] [raw]
"""
Claude Agent适配器
将Telegram Bot与现有的Claude Agent集成
V2.2 重构:
- 使用 claude-agent-sdk 替代 claude-code-sdk
- 利用 SDK 内置的 Automatic Compaction
- 简化代码,移除手动上下文管理
"""
import logging
import os
from pathlib import Path
from typing import List, Dict, Any, Optional, Union
import asyncio
from .interfaces import IClaudeAgent
from ..core.agent import AgentCore
from ..storage.persistence import PersistenceManager
logger = logging.getLogger(__name__)
class ClaudeAgentAdapter(IClaudeAgent):
"""
Claude Agent适配器实现
V2.2 重构说明:
- SDK 内置 Automatic Compaction,自动管理上下文大小
- 移除了手动的 full_prompt 拼接和历史记录管理
- 简化消息处理流程
"""
def __init__(
self,
agent: Optional[AgentCore] = None,
storage_dir: str = "data/storage",
bot_id: Optional[str] = None
):
"""
初始化Claude Agent适配器
Args:
agent: Claude Agent实例,如果为None则创建新实例
storage_dir: 持久化存储目录
bot_id: Bot ID,用于多实例隔离
"""
# 为每个聊天维护独立的Agent实例,避免串台
self._agents: Dict[str, AgentCore] = {}
self._claude_md_content = self._load_claude_md()
self.bot_id = bot_id
# 初始化持久化存储管理器(传递bot_id用于多实例隔离)
self.persistence = PersistenceManager(storage_dir, bot_id=bot_id)
# 在启动时恢复所有已保存的Agent状态
self._restore_agents()
if bot_id:
logger.info(f"Claude Agent适配器初始化完成 (Bot ID: {bot_id}),存储目录: {storage_dir}")
else:
logger.info(f"Claude Agent适配器初始化完成,存储目录: {storage_dir}")
def _get_or_create_agent(self, chat_id: Union[int, str]) -> AgentCore:
"""
获取或创建指定聊天的Agent实例
V2.2: 简化,移除了手动历史清理(SDK 自动管理)
Args:
chat_id: 聊天ID
Returns:
该聊天专用的Agent实例
"""
chat_key = str(chat_id)
if chat_key not in self._agents:
# 尝试从持久化存储中恢复Agent状态
agent_state = self.persistence.load_agent_state(chat_key)
if agent_state:
# 从保存的状态恢复Agent
self._agents[chat_key] = AgentCore.from_dict(agent_state)
# 更新system_prompt为最新的CLAUDE.md内容
self._agents[chat_key].system_prompt = self._claude_md_content
logger.info(f"从持久化存储恢复聊天 {chat_id} 的Agent实例,并更新系统提示词")
# 清理本地缓存中的污染历史
cleaned_count = self._agents[chat_key].clean_conversation_history()
if cleaned_count > 0:
logger.warning(f"清理了聊天 {chat_id} 的 {cleaned_count} 条污染历史记录")
# 修剪历史记录到合理数量
trimmed_count = self._agents[chat_key].trim_conversation_history()
if trimmed_count > 0:
logger.info(f"修剪了聊天 {chat_id} 的 {trimmed_count} 条旧历史记录")
# 如果有任何变更,立即保存状态
if cleaned_count > 0 or trimmed_count > 0:
self.persistence.save_agent_state(chat_key, self._agents[chat_key].to_dict())
else:
# 创建新的Agent实例,使用 CLAUDE.md 作为系统提示
self._agents[chat_key] = AgentCore(system_prompt=self._claude_md_content)
logger.info(f"为聊天 {chat_id} 创建新的Agent实例")
return self._agents[chat_key]
async def process_message(
self,
message: str,
context: List[Dict[str, Any]],
user_info: Dict[str, Any]
) -> str:
"""
处理用户消息,返回回复
V2.2 简化: 直接使用 Agent 处理,SDK 自动管理上下文
Args:
message: 用户消息
context: 对话上下文 (用于显示,不影响 SDK 上下文)
user_info: 用户信息
Returns:
回复消息
"""
try:
chat_id = user_info.get('chat_id', 'unknown')
agent = self._get_or_create_agent(chat_id)
# 构建带用户信息的消息
formatted_message = self._format_user_message(message, user_info)
# 调用 Agent 处理 (SDK 自动管理上下文和压缩)
response = await agent.process_user_input(formatted_message)
# 保存Agent状态到持久化存储
await self._save_agent_state(chat_id, agent)
return response
except Exception as e:
logger.error(f"消息处理错误: {e}")
return self._get_telegram_fallback_response(message, str(e))
async def process_with_image(
self,
message: str,
image_path: str,
context: List[Dict[str, Any]],
user_info: Dict[str, Any]
) -> str:
"""处理包含图片的消息"""
try:
chat_id = user_info.get('chat_id', 'unknown')
agent = self._get_or_create_agent(chat_id)
# 构建包含图片信息的消息
image_message = f"用户发送了一张图片(路径: {image_path})\n用户消息: {message}"
formatted_message = self._format_user_message(image_message, user_info)
response = await agent.process_user_input(formatted_message)
await self._save_agent_state(chat_id, agent)
return response
except Exception as e:
logger.error(f"图片消息处理错误: {e}")
return self._get_telegram_fallback_response(message, str(e))
async def process_with_document(
self,
message: str,
document_path: str,
context: List[Dict[str, Any]],
user_info: Dict[str, Any]
) -> str:
"""处理包含文档的消息"""
try:
chat_id = user_info.get('chat_id', 'unknown')
agent = self._get_or_create_agent(chat_id)
document_message = f"用户发送了一个文档(路径: {document_path})\n用户消息: {message}"
formatted_message = self._format_user_message(document_message, user_info)
response = await agent.process_user_input(formatted_message)
await self._save_agent_state(chat_id, agent)
return response
except Exception as e:
logger.error(f"文档消息处理错误: {e}")
return self._get_telegram_fallback_response(message, str(e))
def _format_user_message(self, message: str, user_info: Dict[str, Any]) -> str:
"""
格式化用户消息,添加用户身份信息
Args:
message: 原始消息
user_info: 用户信息
Returns:
格式化后的消息
"""
user_context = self._build_user_context(user_info)
if user_context:
return f"{user_context}\n\n{message}"
return message
def _build_user_context(self, user_info: Dict[str, Any]) -> str:
"""构建用户上下文信息"""
context_parts = []
# 🔍 优先检查是否来自 webhook(其他 bot 的消息)
if user_info.get('source') == 'webhook' and user_info.get('bot_username'):
bot_name = user_info.get('bot_username')
context_parts.append(f"📡 来自其他Bot: {bot_name}(Webhook转发)")
# 如果有原始用户信息且不是bot消息,也标注出来
if not user_info.get('is_bot') and user_info.get('username'):
context_parts.append(f"原始发送者: @{user_info['username']}")
return "消息来源: " + ", ".join(context_parts) if context_parts else ""
# 原有逻辑:处理直接用户消息
# 添加消息时间
if user_info.get('message_time'):
context_parts.append(f"时间: {user_info['message_time']}")
if user_info.get('username'):
context_parts.append(f"用户名: @{user_info['username']}")
if user_info.get('first_name'):
context_parts.append(f"名字: {user_info['first_name']}")
if user_info.get('last_name'):
context_parts.append(f"姓氏: {user_info['last_name']}")
chat_type = user_info.get('chat_type', 'private')
if chat_type == 'group':
context_parts.append("这是群组对话")
elif chat_type == 'supergroup':
context_parts.append("这是超级群组对话")
elif chat_type == 'channel':
context_parts.append("这是频道消息")
else:
context_parts.append("这是私人对话")
if user_info.get('language_code'):
context_parts.append(f"语言: {user_info['language_code']}")
return "用户信息: " + ", ".join(context_parts) if context_parts else ""
def _load_claude_md(self) -> str:
"""加载CLAUDE.md文件内容"""
claude_md_path = os.getenv('CLAUDE_MD_PATH')
if claude_md_path and Path(claude_md_path).exists():
try:
with open(claude_md_path, 'r', encoding='utf-8') as f:
content = f.read().strip()
logger.info(f"成功加载CLAUDE.md文件: {claude_md_path}")
return content
except Exception as e:
logger.error(f"读取CLAUDE.md文件失败: {e}")
logger.info("使用默认Telegram Bot提示词")
return "你是一个有用的AI助手,请礼貌地回答用户的问题。"
def _get_telegram_fallback_response(self, user_input: str, error_details: str) -> str:
"""Telegram专用的个性化备用响应"""
logger.warning(f"使用Telegram个性化备用响应: {error_details}")
input_lower = user_input.lower()
# 检查是否是API额度不足错误
if any(keyword in error_details for keyword in ["预扣费额度失败", "403", "剩余额度", "API Error", "/login"]):
return "呜呜~主人!我的AI大脑暂时欠费了...💸😿 开发者需要给我充值才能继续为主人服务呢!"
# 检查是否是超时错误
if "超时" in error_details or "timeout" in error_details.lower():
return "哎呀~主人,我的大脑好像卡住了一下下...💭 要不你再问我一遍?"
# 基于用户输入内容的智能回复
if any(word in input_lower for word in ['你好', 'hello', 'hi', '嗨', '早', '晚上好']):
return "嗨~主人!虽然我现在系统有点小毛病,但看到主人还是超开心的呢!💕"
elif any(word in input_lower for word in ['谢谢', 'thank', '感谢']):
return "呜呜~主人这么客气干嘛!虽然我现在有点技术问题,但为主人做事是应该的啦!💖"
elif any(word in input_lower for word in ['再见', 'bye', '拜拜', '睡觉']):
return "主人要离开了吗?虽然我现在状态不太好,但...但是会想你的!😭 晚安~💤"
else:
return f"主人~我听到你的话了,但是现在我的AI大脑有点小故障...🤖💧\n\n(技术备注: {error_details})"
async def create_streaming_response(
self,
message: str,
context: List[Dict[str, Any]],
user_info: Dict[str, Any]
):
"""
创建流式响应生成器
V2.2 简化: 直接使用 Agent 的流式响应,SDK 自动管理上下文
"""
logger.info(f"create_streaming_response 开始")
try:
chat_id = user_info.get('chat_id', 'unknown')
agent = self._get_or_create_agent(chat_id)
# 构建带用户信息的消息
formatted_message = self._format_user_message(message, user_info)
logger.info(f"格式化消息长度: {len(formatted_message)} 字符")
# 使用 Agent 的流式响应 (SDK 自动管理上下文)
async for chunk in agent.create_streaming_response(formatted_message):
yield chunk
# 流式响应完成后保存Agent状态
await self._save_agent_state(chat_id, agent)
logger.info(f"已保存聊天 {chat_id} 的流式响应状态")
except Exception as e:
logger.error(f"流式响应生成错误: {e}")
yield self._get_telegram_fallback_response(message, str(e))
async def _save_agent_state(self, chat_id: Union[int, str], agent: AgentCore) -> bool:
"""
保存Agent状态到持久化存储(简化版)
只保存 session_id 等元数据,对话历史由 Claude SDK 管理
"""
try:
chat_key = str(chat_id)
# 保存Agent状态(只包含 session_id, model, created_at)
agent_data = agent.to_dict()
success = self.persistence.save_agent_state(chat_key, agent_data)
if success:
logger.debug(f"成功保存聊天 {chat_id} 的Agent状态 (session_id: {agent.session_id})")
return True
else:
logger.warning(f"保存聊天 {chat_id} 的状态失败")
return False
except Exception as e:
logger.error(f"保存Agent状态失败: {e}")
return False
def _restore_agents(self):
"""启动时恢复所有保存的Agent实例"""
try:
chat_ids = self.persistence.get_all_chat_ids()
restored_count = 0
for chat_id in chat_ids:
try:
agent_state = self.persistence.load_agent_state(chat_id)
if agent_state:
restored_count += 1
except Exception as e:
logger.warning(f"跳过恢复聊天 {chat_id} 的Agent: {e}")
logger.info(f"启动时发现 {restored_count} 个可恢复的聊天记录")
except Exception as e:
logger.error(f"恢复Agent状态失败: {e}")
def set_agent(self, agent: AgentCore):
"""设置Agent实例(已废弃)"""
logger.warning("set_agent方法已废弃,现在为每个聊天维护独立的Agent实例")
def get_agent(self, chat_id: Union[int, str] = None) -> Optional[AgentCore]:
"""获取Agent实例"""
if chat_id is not None:
return self._get_or_create_agent(chat_id)
return None
def get_storage_stats(self) -> Dict[str, Any]:
"""获取存储统计信息"""
try:
stats = self.persistence.get_storage_stats()
stats["active_agents"] = len(self._agents)
return stats
except Exception as e:
logger.error(f"获取存储统计失败: {e}")
return {}
async def cleanup_old_conversations(self, days_threshold: int = 30) -> int:
"""清理旧的对话记录"""
try:
cleanup_count = self.persistence.cleanup_old_data(days_threshold)
logger.info(f"清理了 {cleanup_count} 条旧对话记录")
return cleanup_count
except Exception as e:
logger.error(f"清理旧对话记录失败: {e}")
return 0
async def _save_message_to_agent(
self,
chat_id: Union[int, str],
user_id: int,
message: str,
is_bot: bool = False,
user_info: Dict[str, Any] = None
):
"""
保存消息到Agent的历史记录中(用于Webhook消息)
Args:
chat_id: 聊天ID
user_id: 用户ID
message: 消息内容
is_bot: 是否为Bot消息
user_info: 用户信息字典
"""
try:
agent = self._get_or_create_agent(chat_id)
user_display_name = self._build_user_display_name(user_id, user_info or {})
if is_bot:
message_entry = {
"role": "assistant",
"content": f"[Bot: {user_display_name}] {message}"
}
else:
message_entry = {
"role": "user",
"content": f"[{user_display_name}] {message}"
}
agent.conversation_history.append(message_entry)
await self._save_agent_state(chat_id, agent)
logger.debug(f"Webhook消息已保存到Agent - 聊天: {chat_id}")
except Exception as e:
logger.error(f"保存Webhook消息到Agent失败: {e}")
raise
def _build_user_display_name(self, user_id: int, user_info: dict) -> str:
"""构建用户显示名称"""
if not user_info:
return f"User{user_id}" if user_id != 'unknown' else "群友"
if user_info.get('username'):
return f"@{user_info['username']}"
name_parts = []
if user_info.get('first_name'):
name_parts.append(user_info['first_name'])
if user_info.get('last_name'):
name_parts.append(user_info['last_name'])
if name_parts:
return " ".join(name_parts)
return f"User{user_id}" if user_id != 'unknown' else "群友"