"""
Persistent storage manager.

Persists conversation history and agent state to JSON files and keeps the
chat_id -> session_id mapping used for claude-agent-sdk session management.
"""

import json
import logging
import os
import time
from pathlib import Path
from typing import Dict, List, Any, Optional
from threading import Lock
from dataclasses import asdict
| |
logger = logging.getLogger(__name__)


class PersistenceManager:
    """JSON-file backed store for conversation history, agent state and session ids.

    Each kind of record lives in its own JSON file inside ``storage_dir``; every
    file maps ``chat_id`` to a record dict carrying a ``last_updated`` Unix
    timestamp. All read-modify-write cycles are serialized through a
    per-instance ``threading.Lock``.
    """

    def __init__(self, storage_dir: str = "data/storage", bot_id: Optional[str] = None):
        """
        Initialize the persistence manager and create the storage directory.

        Args:
            storage_dir: Base storage directory path.
            bot_id: Bot ID used to isolate multiple instances. When provided
                (and non-empty), data is stored in a ``bot_id`` sub-directory
                of ``storage_dir``.
        """
        base_storage_dir = Path(storage_dir)

        # A falsy bot_id (None or "") keeps the legacy layout: files live
        # directly in the base directory and bot_id is recorded as None.
        self.bot_id = bot_id or None
        if self.bot_id:
            self.storage_dir = base_storage_dir / self.bot_id
        else:
            self.storage_dir = base_storage_dir

        self.storage_dir.mkdir(parents=True, exist_ok=True)

        # Data files
        self.conversation_file = self.storage_dir / "conversations.json"
        self.agents_file = self.storage_dir / "agents.json"
        self.sessions_file = self.storage_dir / "sessions.json"  # chat_id -> session_id mapping

        # Claude CLI session directory (defaults to ~/.claude/sessions/)
        self.claude_sessions_dir = Path.home() / ".claude" / "sessions"

        # Serializes file access between threads sharing this instance
        self._lock = Lock()

        if self.bot_id:
            logger.info(f"持久化存储初始化 (Bot ID: {self.bot_id}),存储目录: {self.storage_dir}")
        else:
            logger.info(f"持久化存储初始化,存储目录: {self.storage_dir}")

    # ========== Conversation history ==========

    def save_conversation_history(self, chat_id: str, history: List[Dict[str, str]]) -> bool:
        """
        Store (replace) the conversation history of one chat.

        Args:
            chat_id: Chat ID.
            history: Conversation history list.

        Returns:
            True if the data was written, False on any failure.
        """
        try:
            with self._lock:
                conversations = self._load_json_file(self.conversation_file, {})
                conversations[chat_id] = {
                    "history": history,
                    "last_updated": int(time.time()),
                    "message_count": len(history),
                }
                success = self._save_json_file(self.conversation_file, conversations)

            if success:
                logger.debug(f"保存聊天 {chat_id} 的对话历史,条目数: {len(history)}")
            return success

        except Exception as e:
            logger.error(f"保存对话历史失败: {e}")
            return False

    def load_conversation_history(self, chat_id: str) -> List[Dict[str, str]]:
        """
        Load the conversation history of one chat.

        Args:
            chat_id: Chat ID.

        Returns:
            The stored history list, or an empty list when the chat has no
            record or loading fails.
        """
        try:
            with self._lock:
                conversations = self._load_json_file(self.conversation_file, {})

            if chat_id not in conversations:
                logger.debug(f"聊天 {chat_id} 没有历史记录")
                return []

            history = conversations[chat_id].get("history", [])
            logger.debug(f"加载聊天 {chat_id} 的对话历史,条目数: {len(history)}")
            return history

        except Exception as e:
            logger.error(f"加载对话历史失败: {e}")
            return []

    # ========== Agent state ==========

    def save_agent_state(self, chat_id: str, agent_data: Dict[str, Any]) -> bool:
        """
        Store (replace) the agent state of one chat.

        Args:
            chat_id: Chat ID.
            agent_data: Agent state data; a ``last_updated`` key is added
                (overriding any value supplied by the caller).

        Returns:
            True if the data was written, False on any failure.
        """
        try:
            with self._lock:
                agents = self._load_json_file(self.agents_file, {})
                agents[chat_id] = {
                    **agent_data,
                    "last_updated": int(time.time()),
                }
                success = self._save_json_file(self.agents_file, agents)

            if success:
                logger.debug(f"保存聊天 {chat_id} 的Agent状态")
            return success

        except Exception as e:
            logger.error(f"保存Agent状态失败: {e}")
            return False

    def load_agent_state(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """
        Load the agent state of one chat.

        Args:
            chat_id: Chat ID.

        Returns:
            The stored state dict, or None when it does not exist or loading fails.
        """
        try:
            with self._lock:
                agents = self._load_json_file(self.agents_file, {})

            if chat_id not in agents:
                logger.debug(f"聊天 {chat_id} 没有Agent状态记录")
                return None

            logger.debug(f"加载聊天 {chat_id} 的Agent状态")
            return agents[chat_id]

        except Exception as e:
            logger.error(f"加载Agent状态失败: {e}")
            return None

    def get_all_chat_ids(self) -> List[str]:
        """
        List every chat ID that has a stored conversation record.

        Returns:
            List of chat IDs (empty on failure).
        """
        try:
            with self._lock:
                conversations = self._load_json_file(self.conversation_file, {})

            chat_ids = list(conversations)
            logger.debug(f"获取所有聊天ID,数量: {len(chat_ids)}")
            return chat_ids

        except Exception as e:
            logger.error(f"获取聊天ID列表失败: {e}")
            return []

    # ========== Maintenance ==========

    def cleanup_old_data(self, days_threshold: int = 30) -> int:
        """
        Remove conversation and agent records not updated within the threshold.

        Records without a usable ``last_updated`` timestamp count as stale.
        Only records whose removal was actually persisted are counted.

        NOTE(review): ``sessions.json`` is not pruned here, matching the
        previous behavior — confirm whether stale session mappings should be.

        Args:
            days_threshold: Number of days of data to keep.

        Returns:
            Number of records removed.
        """
        try:
            threshold_time = int(time.time()) - days_threshold * 24 * 3600

            with self._lock:
                cleanup_count = sum(
                    self._prune_stale_entries(path, threshold_time)
                    for path in (self.conversation_file, self.agents_file)
                )

            logger.info(f"清理了 {cleanup_count} 条旧记录 (>{days_threshold}天)")
            return cleanup_count

        except Exception as e:
            logger.error(f"清理旧数据失败: {e}")
            return 0

    def _prune_stale_entries(self, file_path: Path, threshold_time: int) -> int:
        """Drop stale records from one file; return how many were removed and saved.

        Must be called with ``self._lock`` held. Failures are logged and
        reported as 0 so that one bad file does not hide work done on another.
        """
        try:
            records = self._load_json_file(file_path, {})
            stale_ids = [
                chat_id
                for chat_id, entry in records.items()
                if self._entry_timestamp(entry) < threshold_time
            ]
            if not stale_ids:
                return 0

            for chat_id in stale_ids:
                del records[chat_id]

            # Do not report records as cleaned if the file could not be rewritten.
            if not self._save_json_file(file_path, records):
                return 0
            return len(stale_ids)

        except Exception as e:
            logger.error(f"清理旧数据失败 {file_path}: {e}")
            return 0

    @staticmethod
    def _entry_timestamp(entry: Any) -> float:
        """Return a record's ``last_updated`` value, or 0 if missing or malformed."""
        if not isinstance(entry, dict):
            return 0
        stamp = entry.get("last_updated", 0)
        return stamp if isinstance(stamp, (int, float)) else 0

    def get_storage_stats(self) -> Dict[str, Any]:
        """
        Collect storage statistics.

        Returns:
            Dict with record counts, the bot id, the storage directory and the
            size in bytes of each data file; an empty dict on failure.
        """
        try:
            with self._lock:
                conversations = self._load_json_file(self.conversation_file, {})
                agents = self._load_json_file(self.agents_file, {})
                sessions = self._load_json_file(self.sessions_file, {})

            # Skip malformed (non-dict) records instead of failing the whole report.
            total_messages = sum(
                data.get("message_count", 0)
                for data in conversations.values()
                if isinstance(data, dict)
            )

            return {
                "total_chats": len(conversations),
                "total_messages": total_messages,
                "total_agents": len(agents),
                "total_sessions": len(sessions),
                "bot_id": self.bot_id,
                "storage_dir": str(self.storage_dir),
                "file_sizes": {
                    "conversations": self._get_file_size(self.conversation_file),
                    "agents": self._get_file_size(self.agents_file),
                    "sessions": self._get_file_size(self.sessions_file),
                },
            }

        except Exception as e:
            logger.error(f"获取存储统计失败: {e}")
            return {}

    # ========== File helpers ==========

    def _load_json_file(self, file_path: Path, default: Any = None) -> Any:
        """Load a JSON file, falling back to ``default`` (or ``{}`` when None).

        The fallback is returned when the file is missing, unreadable, not
        valid JSON, or — when a ``default`` was given — when the top-level
        JSON value is not of the same type as ``default``.
        """
        fallback = default if default is not None else {}
        try:
            if not file_path.exists():
                return fallback
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except Exception as e:
            logger.error(f"加载JSON文件失败 {file_path}: {e}")
            return fallback

        # Valid JSON of the wrong shape (e.g. a list where a mapping is
        # expected) would otherwise make every caller fail on item access.
        if default is not None and not isinstance(data, type(default)):
            logger.error(
                f"加载JSON文件失败 {file_path}: "
                f"顶层类型应为 {type(default).__name__},实际为 {type(data).__name__}"
            )
            return fallback
        return data

    def _save_json_file(self, file_path: Path, data: Any) -> bool:
        """Write ``data`` as JSON via a temp file and rename; never raises.

        Returns:
            True on success, False if serialization or any file operation failed.
        """
        temp_file: Optional[Path] = None
        try:
            # Write to a sibling temp file first, then rename over the target
            # so readers never observe a partially written file.
            temp_file = file_path.with_suffix('.tmp')

            with open(temp_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)

            temp_file.replace(file_path)
            return True

        except Exception as e:
            logger.error(f"保存JSON文件失败 {file_path}: {e}")
            # Best-effort removal of the leftover temp file; a failure here
            # must not escape and break the bool contract.
            if temp_file is not None:
                try:
                    if temp_file.exists():
                        temp_file.unlink()
                except OSError as cleanup_error:
                    logger.warning(f"清理临时文件失败 {temp_file}: {cleanup_error}")
            return False

    def _get_file_size(self, file_path: Path) -> int:
        """Return the file size in bytes, or 0 if it is missing or unreadable."""
        try:
            return file_path.stat().st_size if file_path.exists() else 0
        except Exception:
            return 0

    # ========== Session ID management (added in V2.2) ==========

    def save_session_id(self, chat_id: str, session_id: str) -> bool:
        """
        Store the chat_id -> session_id mapping for one chat.

        Args:
            chat_id: Chat ID.
            session_id: Claude SDK session ID.

        Returns:
            True if the mapping was written, False on any failure.
        """
        try:
            with self._lock:
                sessions = self._load_json_file(self.sessions_file, {})
                sessions[chat_id] = {
                    "session_id": session_id,
                    "last_updated": int(time.time()),
                }
                success = self._save_json_file(self.sessions_file, sessions)

            if success:
                logger.debug(f"保存聊天 {chat_id} 的 session_id: {session_id}")
            return success

        except Exception as e:
            logger.error(f"保存 session_id 失败: {e}")
            return False

    def load_session_id(self, chat_id: str) -> Optional[str]:
        """
        Load the session_id of one chat, validating that it still exists.

        Args:
            chat_id: Chat ID.

        Returns:
            The session_id, or None when no mapping is stored, the session
            path no longer exists under ``claude_sessions_dir``, or loading fails.
        """
        try:
            # Only the JSON read needs the lock; the existence check below
            # touches a different directory.
            with self._lock:
                sessions = self._load_json_file(self.sessions_file, {})

            data = sessions.get(chat_id)
            if not data:
                logger.debug(f"聊天 {chat_id} 没有保存的 session_id")
                return None

            session_id = data.get("session_id")

            # A stored id whose CLI session path has disappeared is unusable.
            if session_id and not self._session_file_exists(session_id):
                logger.warning(f"Session 文件不存在: {session_id},将创建新 session")
                return None

            logger.debug(f"加载聊天 {chat_id} 的 session_id: {session_id}")
            return session_id

        except Exception as e:
            logger.error(f"加载 session_id 失败: {e}")
            return None

    def _session_file_exists(self, session_id: str) -> bool:
        """
        Check whether ``claude_sessions_dir / session_id`` exists.

        Args:
            session_id: Session ID.

        Returns:
            True if the path exists, False otherwise.
        """
        # Session data is expected at ~/.claude/sessions/{session_id}/
        session_path = self.claude_sessions_dir / session_id
        if session_path.exists():
            return True
        logger.debug(f"Session 目录不存在: {session_path}")
        return False

    def clear_session_id(self, chat_id: str) -> bool:
        """
        Remove the stored session_id of one chat.

        Args:
            chat_id: Chat ID.

        Returns:
            True if the mapping was removed or did not exist, False on failure.
        """
        try:
            with self._lock:
                sessions = self._load_json_file(self.sessions_file, {})
                if chat_id not in sessions:
                    return True  # nothing to clear also counts as success

                del sessions[chat_id]
                success = self._save_json_file(self.sessions_file, sessions)

            if success:
                logger.info(f"清除聊天 {chat_id} 的 session_id")
            return success

        except Exception as e:
            logger.error(f"清除 session_id 失败: {e}")
            return False

    def get_all_session_ids(self) -> Dict[str, str]:
        """
        Return every stored chat_id -> session_id mapping.

        Entries that are malformed or have an empty session_id are skipped.

        Returns:
            Mapping dict (empty on failure).
        """
        try:
            with self._lock:
                sessions = self._load_json_file(self.sessions_file, {})

            return {
                chat_id: data["session_id"]
                for chat_id, data in sessions.items()
                if isinstance(data, dict) and data.get("session_id")
            }
        except Exception as e:
            logger.error(f"获取所有 session_id 失败: {e}")
            return {}