# blob: f168cde8aa8a7d2a939adb1e43a1b7128681480b [file] [log] [blame] [raw]
"""
持久化存储管理器
实现对话历史和Agent状态的持久化存储
支持 claude-agent-sdk 的 Session 管理
"""
import json
import logging
import os
import time
from pathlib import Path
from typing import Dict, List, Any, Optional
from threading import Lock
from dataclasses import asdict
logger = logging.getLogger(__name__)


class PersistenceManager:
    """Persist conversation history, agent state and session ids as JSON files.

    Three files live under the storage directory (``conversations.json``,
    ``agents.json`` and ``sessions.json``). Each one maps a chat id to a
    record that carries a ``last_updated`` Unix timestamp. Every
    read-modify-write cycle on those files runs while holding this
    instance's ``threading.Lock``.

    Public methods never raise: failures are logged and reported through the
    return value (``False``, ``None``, ``[]``, ``{}`` or ``0``).
    """

    def __init__(self, storage_dir: str = "data/storage", bot_id: Optional[str] = None):
        """Create the storage directory and compute the file paths.

        Args:
            storage_dir: Base directory for the storage files.
            bot_id: Optional bot identifier used to isolate several
                instances. When truthy, the files are kept in a ``bot_id``
                sub-directory of ``storage_dir``; otherwise ``storage_dir``
                itself is used (the backward-compatible layout).
        """
        base_storage_dir = Path(storage_dir)

        if bot_id:
            # Dedicated sub-directory per bot.
            self.storage_dir = base_storage_dir / bot_id
            self.bot_id = bot_id
        else:
            # Backward compatible: keep using the base directory directly.
            self.storage_dir = base_storage_dir
            self.bot_id = None

        self.storage_dir.mkdir(parents=True, exist_ok=True)

        # Storage files.
        self.conversation_file = self.storage_dir / "conversations.json"
        self.agents_file = self.storage_dir / "agents.json"
        self.sessions_file = self.storage_dir / "sessions.json"  # chat_id -> session_id

        # Directory probed by _session_file_exists().
        # NOTE(review): assumes the Claude CLI keeps one entry per session
        # directly under ~/.claude/sessions/ - confirm against the CLI in use.
        self.claude_sessions_dir = Path.home() / ".claude" / "sessions"

        # Serializes every read-modify-write cycle on the storage files.
        self._lock = Lock()

        if self.bot_id:
            logger.info(f"持久化存储初始化 (Bot ID: {self.bot_id}),存储目录: {self.storage_dir}")
        else:
            logger.info(f"持久化存储初始化,存储目录: {self.storage_dir}")

    # ========== Internal helpers ==========

    @staticmethod
    def _chat_key(chat_id: Any) -> str:
        """Return the JSON object key used for ``chat_id``.

        JSON object keys are always strings, so ``json.dump`` would write an
        int chat id as ``"123"`` and a later lookup with the int would miss
        it. Normalizing on the way in keeps saves and loads symmetric.
        """
        return str(chat_id)

    def _upsert_record(self, file_path: Path, chat_id: Any, record: Dict[str, Any]) -> bool:
        """Insert or replace the record for ``chat_id`` in ``file_path``.

        Returns:
            True when the updated mapping was written to disk.
        """
        with self._lock:
            records = self._load_json_file(file_path, {})
            records[self._chat_key(chat_id)] = record
            return self._save_json_file(file_path, records)

    @staticmethod
    def _is_stale(record: Any, threshold_time: int) -> bool:
        """Tell whether ``record`` was last updated before ``threshold_time``.

        A record without a usable numeric ``last_updated`` (missing field,
        wrong type, or a record that is not a dict at all) counts as stale,
        matching the treatment of a missing timestamp as 0.
        """
        if not isinstance(record, dict):
            return True
        stamp = record.get("last_updated", 0)
        if not isinstance(stamp, (int, float)):
            return True
        return stamp < threshold_time

    def _prune_stale(self, file_path: Path, threshold_time: int) -> int:
        """Drop stale records from ``file_path``; the caller must hold the lock.

        Returns:
            Number of records removed *and persisted*; 0 when nothing was
            stale or when writing the pruned mapping failed.
        """
        records = self._load_json_file(file_path, {})
        kept = {
            key: record
            for key, record in records.items()
            if not self._is_stale(record, threshold_time)
        }
        removed = len(records) - len(kept)
        if removed and not self._save_json_file(file_path, kept):
            # Nothing changed on disk, so nothing was actually cleaned up.
            return 0
        return removed

    # ========== Conversation history ==========

    def save_conversation_history(self, chat_id: str, history: List[Dict[str, str]]) -> bool:
        """Store the full conversation history of one chat.

        Args:
            chat_id: Chat identifier.
            history: List of history entries; replaces any stored history.

        Returns:
            True if the history was written, False otherwise.
        """
        try:
            record = {
                "history": history,
                "last_updated": int(time.time()),
                "message_count": len(history),
            }
            if not self._upsert_record(self.conversation_file, chat_id, record):
                return False
            logger.debug(f"保存聊天 {chat_id} 的对话历史,条目数: {len(history)}")
            return True
        except Exception as e:
            logger.error(f"保存对话历史失败: {e}")
            return False

    def load_conversation_history(self, chat_id: str) -> List[Dict[str, str]]:
        """Load the stored conversation history of one chat.

        Args:
            chat_id: Chat identifier.

        Returns:
            The stored history list, or ``[]`` when the chat has no usable
            record or loading failed.
        """
        try:
            with self._lock:
                conversations = self._load_json_file(self.conversation_file, {})

            entry = conversations.get(self._chat_key(chat_id))
            if not isinstance(entry, dict):
                logger.debug(f"聊天 {chat_id} 没有历史记录")
                return []

            # A stored null history is reported as an empty one.
            history = entry.get("history") or []
            logger.debug(f"加载聊天 {chat_id} 的对话历史,条目数: {len(history)}")
            return history
        except Exception as e:
            logger.error(f"加载对话历史失败: {e}")
            return []

    # ========== Agent state ==========

    def save_agent_state(self, chat_id: str, agent_data: Dict[str, Any]) -> bool:
        """Store the agent state of one chat.

        Args:
            chat_id: Chat identifier.
            agent_data: State to store. A ``last_updated`` timestamp is
                added, overriding any key of that name in ``agent_data``.

        Returns:
            True if the state was written, False otherwise.
        """
        try:
            record = {**agent_data, "last_updated": int(time.time())}
            if not self._upsert_record(self.agents_file, chat_id, record):
                return False
            logger.debug(f"保存聊天 {chat_id} 的Agent状态")
            return True
        except Exception as e:
            logger.error(f"保存Agent状态失败: {e}")
            return False

    def load_agent_state(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Load the stored agent state of one chat.

        Args:
            chat_id: Chat identifier.

        Returns:
            The stored state, or None when there is none or loading failed.
        """
        try:
            with self._lock:
                agents = self._load_json_file(self.agents_file, {})

            state = agents.get(self._chat_key(chat_id))
            if state is None:
                logger.debug(f"聊天 {chat_id} 没有Agent状态记录")
                return None

            logger.debug(f"加载聊天 {chat_id} 的Agent状态")
            return state
        except Exception as e:
            logger.error(f"加载Agent状态失败: {e}")
            return None

    # ========== Maintenance and statistics ==========

    def get_all_chat_ids(self) -> List[str]:
        """Return the ids of all chats that have a stored conversation.

        Returns:
            List of chat ids (``[]`` on failure).
        """
        try:
            with self._lock:
                conversations = self._load_json_file(self.conversation_file, {})
            chat_ids = list(conversations)
            logger.debug(f"获取所有聊天ID,数量: {len(chat_ids)}")
            return chat_ids
        except Exception as e:
            logger.error(f"获取聊天ID列表失败: {e}")
            return []

    def cleanup_old_data(self, days_threshold: int = 30) -> int:
        """Remove records that have not been updated for ``days_threshold`` days.

        Conversations, agent states and session-id mappings are all pruned,
        since each of them carries a ``last_updated`` timestamp.

        Args:
            days_threshold: Number of days a record is kept after its last
                update.

        Returns:
            Number of records removed across the three files. Removals that
            could not be written back to disk are not counted. 0 on failure.
        """
        try:
            threshold_time = int(time.time()) - days_threshold * 24 * 3600
            targets = (self.conversation_file, self.agents_file, self.sessions_file)

            with self._lock:
                cleanup_count = sum(
                    self._prune_stale(path, threshold_time) for path in targets
                )

            logger.info(f"清理了 {cleanup_count} 条旧记录 (>{days_threshold}天)")
            return cleanup_count
        except Exception as e:
            logger.error(f"清理旧数据失败: {e}")
            return 0

    def get_storage_stats(self) -> Dict[str, Any]:
        """Summarize what is currently stored.

        Returns:
            Dict with ``total_chats``, ``total_messages``, ``total_agents``,
            ``total_sessions``, ``bot_id``, ``storage_dir`` and
            ``file_sizes`` (bytes per storage file); ``{}`` on failure.
        """
        try:
            with self._lock:
                conversations = self._load_json_file(self.conversation_file, {})
                agents = self._load_json_file(self.agents_file, {})
                sessions = self._load_json_file(self.sessions_file, {})

            total_messages = sum(
                data.get("message_count", 0)
                for data in conversations.values()
                if isinstance(data, dict)
            )

            return {
                "total_chats": len(conversations),
                "total_messages": total_messages,
                "total_agents": len(agents),
                "total_sessions": len(sessions),
                "bot_id": self.bot_id,
                "storage_dir": str(self.storage_dir),
                "file_sizes": {
                    "conversations": self._get_file_size(self.conversation_file),
                    "agents": self._get_file_size(self.agents_file),
                    "sessions": self._get_file_size(self.sessions_file),
                },
            }
        except Exception as e:
            logger.error(f"获取存储统计失败: {e}")
            return {}

    # ========== Low-level file access ==========

    def _load_json_file(self, file_path: Path, default: Any = None) -> Any:
        """Read and parse ``file_path``.

        Args:
            file_path: JSON file to read.
            default: Value returned when the file is missing or unusable;
                ``None`` means ``{}``.

        Returns:
            The parsed content, or the default when the file does not exist,
            cannot be read/parsed, or (for a dict or list default) holds a
            different top-level JSON type than the default.
        """
        fallback = {} if default is None else default
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except FileNotFoundError:
            # A missing file simply means nothing was stored yet.
            return fallback
        except Exception as e:
            logger.error(f"加载JSON文件失败 {file_path}: {e}")
            return fallback

        # Callers index the result like their default; a file holding e.g. a
        # list where a dict is expected would make every later update fail.
        if isinstance(fallback, (dict, list)) and not isinstance(loaded, type(fallback)):
            logger.warning(
                f"JSON文件顶层类型不符 {file_path}: 期望 {type(fallback).__name__},"
                f"实际 {type(loaded).__name__},已使用默认值"
            )
            return fallback
        return loaded

    def _save_json_file(self, file_path: Path, data: Any) -> bool:
        """Write ``data`` to ``file_path`` as JSON.

        The content is written to a sibling ``.tmp`` file first and then
        moved over the target with ``Path.replace``, so readers never see a
        partially written file.

        Returns:
            True on success, False (after logging) on any failure.
        """
        temp_file = file_path.with_suffix('.tmp')
        try:
            with open(temp_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            temp_file.replace(file_path)
            return True
        except Exception as e:
            logger.error(f"保存JSON文件失败 {file_path}: {e}")
            # Best-effort removal of the leftover temp file; a failure here
            # must not mask the error that was just reported.
            try:
                temp_file.unlink()
            except OSError:
                pass
            return False

    def _get_file_size(self, file_path: Path) -> int:
        """Return the size of ``file_path`` in bytes, or 0 if unavailable."""
        try:
            return file_path.stat().st_size
        except (OSError, ValueError):
            return 0

    # ========== Session ID management (added in V2.2) ==========

    def save_session_id(self, chat_id: str, session_id: str) -> bool:
        """Store the chat_id -> session_id mapping of one chat.

        Args:
            chat_id: Chat identifier.
            session_id: Claude SDK session id.

        Returns:
            True if the mapping was written, False otherwise.
        """
        try:
            record = {
                "session_id": session_id,
                "last_updated": int(time.time()),
            }
            if not self._upsert_record(self.sessions_file, chat_id, record):
                return False
            logger.debug(f"保存聊天 {chat_id} 的 session_id: {session_id}")
            return True
        except Exception as e:
            logger.error(f"保存 session_id 失败: {e}")
            return False

    def load_session_id(self, chat_id: str) -> Optional[str]:
        """Load the session id of one chat, checking that it is still usable.

        Args:
            chat_id: Chat identifier.

        Returns:
            The stored session id, or None when nothing is stored, when no
            entry for it exists under ``claude_sessions_dir``, or on failure.
        """
        try:
            with self._lock:
                sessions = self._load_json_file(self.sessions_file, {})

            data = sessions.get(self._chat_key(chat_id))
            if not isinstance(data, dict) or not data:
                logger.debug(f"聊天 {chat_id} 没有保存的 session_id")
                return None

            session_id = data.get("session_id")

            # A mapping whose session entry is gone cannot be resumed.
            if session_id and not self._session_file_exists(session_id):
                logger.warning(f"Session 文件不存在: {session_id},将创建新 session")
                return None

            logger.debug(f"加载聊天 {chat_id} 的 session_id: {session_id}")
            return session_id
        except Exception as e:
            logger.error(f"加载 session_id 失败: {e}")
            return None

    def _session_file_exists(self, session_id: str) -> bool:
        """Check whether an entry named ``session_id`` exists in ``claude_sessions_dir``.

        Args:
            session_id: Session id to look up.

        Returns:
            True if ``claude_sessions_dir / session_id`` exists; False for an
            empty or non-string id.
        """
        if not isinstance(session_id, str) or not session_id:
            return False

        session_path = self.claude_sessions_dir / session_id
        exists = session_path.exists()
        if not exists:
            logger.debug(f"Session 目录不存在: {session_path}")
        return exists

    def clear_session_id(self, chat_id: str) -> bool:
        """Remove the stored session id of one chat.

        Args:
            chat_id: Chat identifier.

        Returns:
            True if the mapping is gone afterwards (including the case where
            it never existed), False if removing it failed.
        """
        try:
            key = self._chat_key(chat_id)
            with self._lock:
                sessions = self._load_json_file(self.sessions_file, {})
                if key not in sessions:
                    return True  # nothing stored counts as success
                del sessions[key]
                if not self._save_json_file(self.sessions_file, sessions):
                    return False
            logger.info(f"清除聊天 {chat_id} 的 session_id")
            return True
        except Exception as e:
            logger.error(f"清除 session_id 失败: {e}")
            return False

    def get_all_session_ids(self) -> Dict[str, str]:
        """Return every stored chat_id -> session_id mapping.

        Records without a session id are skipped.

        Returns:
            Mapping of chat id to session id (``{}`` on failure).
        """
        try:
            with self._lock:
                sessions = self._load_json_file(self.sessions_file, {})
            return {
                chat_id: data["session_id"]
                for chat_id, data in sessions.items()
                if isinstance(data, dict) and data.get("session_id")
            }
        except Exception as e:
            logger.error(f"获取所有 session_id 失败: {e}")
            return {}