| """ |
| 增强的CLI界面 - 支持行编辑和历史翻查 |
| """ |
| |
| import asyncio |
| import click |
| import logging |
| import os |
| from typing import Optional |
| from rich.console import Console |
| from rich.panel import Panel |
| from rich.text import Text |
| from rich.logging import RichHandler |
| from rich.table import Table |
| from rich.progress import Progress, SpinnerColumn, TextColumn |
| |
| # 导入prompt_toolkit进行高级输入处理 |
| from prompt_toolkit import PromptSession |
| from prompt_toolkit.history import InMemoryHistory |
| from prompt_toolkit.key_binding import KeyBindings |
| from prompt_toolkit.shortcuts import CompleteStyle |
| from prompt_toolkit.completion import WordCompleter |
| from prompt_toolkit.formatted_text import HTML |
| |
| from ..core.agent import AgentCore, ThinkingMode |
| from ..mcp.integration import MCPToolIntegration |
| from ..sshout import create_sshout_integration |
| |
| |
| class EnhancedCLIInterface: |
| """增强的命令行界面类 - 支持行编辑和历史记录""" |
| |
def __init__(self):
    """Create the console, logging, prompt session and key bindings.

    The agent and the MCP/SSHOUT integrations start out as ``None`` and
    are assigned later by ``initialize``.
    """
    # Placeholders assigned by initialize().
    self.agent: Optional[AgentCore] = None
    self.mcp_integration: Optional[MCPToolIntegration] = None
    self.sshout_integration = None

    # setup_logging() attaches its handler to this console.
    self.console = Console()
    self.setup_logging()

    # Slash commands offered by Tab completion.
    slash_commands = [
        '/help', '/mode', '/history', '/clear', '/tools', '/status', '/quit', '/exit',
        '/sshout', '/sshout connect', '/sshout disconnect', '/sshout status', '/sshout send',
    ]
    # sentence=True: presumably chosen because several entries contain spaces.
    self.command_completer = WordCompleter(slash_commands, sentence=True)

    # Prompt session backed by an in-memory history.
    self.history = InMemoryHistory()
    self.session = PromptSession(
        complete_style=CompleteStyle.MULTI_COLUMN,
        completer=self.command_completer,
        history=self.history,
    )

    # Custom shortcuts; the handlers are registered by setup_key_bindings().
    self.key_bindings = KeyBindings()
    self.setup_key_bindings()
| |
def setup_logging(self):
    """Configure root logging at INFO level through a RichHandler on this console."""
    rich_handler = RichHandler(console=self.console, rich_tracebacks=True)
    logging.basicConfig(
        handlers=[rich_handler],
        format="%(message)s",
        datefmt="[%X]",
        level=logging.INFO,
    )
| |
def setup_key_bindings(self):
    """Register the Ctrl+C and Ctrl+D handlers on ``self.key_bindings``.

    Both handlers end the running prompt by making it raise: Ctrl+C raises
    ``KeyboardInterrupt`` and Ctrl+D raises ``EOFError``.
    """
    bindings = self.key_bindings

    @bindings.add('c-c')
    def _interrupt(event):
        """Ctrl+C: interrupt the current operation."""
        event.app.exit(exception=KeyboardInterrupt)

    @bindings.add('c-d')
    def _end_of_input(event):
        """Ctrl+D: quit the program."""
        # A bare exit() ends the prompt with a None result, which code that
        # expects a string cannot handle; signal end-of-input explicitly.
        event.app.exit(exception=EOFError)
| |
async def initialize(self, api_key: Optional[str] = None):
    """Build the agent core and its MCP and SSHOUT integrations.

    Args:
        api_key: Passed through to ``AgentCore``.

    Raises:
        Exception: Any failure is reported on the console and re-raised.
    """
    try:
        spinner_columns = (
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
        )
        with Progress(*spinner_columns, console=self.console) as spinner:
            step = spinner.add_task("初始化增强CLI...", total=None)

            # Agent core
            agent = AgentCore(api_key=api_key)
            self.agent = agent
            spinner.update(step, description="初始化MCP工具集成...")

            # MCP tool integration
            tools = MCPToolIntegration(agent)
            self.mcp_integration = tools
            await tools.setup_default_tools()
            await tools.enhance_agent_with_tools()

            spinner.update(step, description="初始化SSHOUT集成...")

            # SSHOUT integration
            self.sshout_integration = create_sshout_integration(agent)

            spinner.update(step, description="初始化完成!", completed=True)

        self.console.print("[green]✓[/green] Claude Agent 增强CLI 初始化完成!")

    except Exception as error:
        self.console.print(f"[red]✗[/red] 初始化失败: {error!s}")
        raise
| |
def show_welcome(self):
    """Print the welcome panel with the feature and shortcut summary."""
    # (text, style) pairs appended to the banner in order.
    segments = (
        ("Claude Agent", "bold cyan"),
        (" - 增强版智能命令行助手\n", "cyan"),
        ("✨ 新功能:行编辑、历史翻查、SSH连接支持\n", "green"),
        ("\n快捷键:\n", "dim"),
        (" ↑/↓ - 历史记录翻查\n", "dim"),
        (" Tab - 命令自动补全\n", "dim"),
        (" Ctrl+C - 中断操作\n", "dim"),
        (" Ctrl+D - 退出程序\n", "dim"),
        ("\n输入 '/help' 查看帮助,'/quit' 退出", "dim"),
    )

    banner = Text()
    for fragment, fragment_style in segments:
        banner.append(fragment, style=fragment_style)

    self.console.print(
        Panel(
            banner,
            title="欢迎使用增强版Claude Agent",
            border_style="cyan",
            padding=(1, 2),
        )
    )
| |
def show_help(self):
    """Print the command table, the shortcut list and the usage notes."""
    command_rows = (
        ("/help", "显示此帮助信息"),
        ("/mode", "切换思考模式 (interactive/yolo)"),
        ("/history", "显示对话历史"),
        ("/clear", "清空对话历史"),
        ("/tools", "显示可用的MCP工具"),
        ("/status", "显示系统状态"),
        ("/sshout connect", "连接到SSHOUT聊天室"),
        ("/sshout disconnect", "断开SSHOUT连接"),
        ("/sshout status", "显示SSHOUT连接状态"),
        ("/sshout send <message>", "发送消息到SSHOUT"),
        ("/quit, /exit", "退出程序"),
    )

    table = Table(title="命令帮助")
    table.add_column("命令", style="cyan")
    table.add_column("描述", style="white")
    for command_name, summary in command_rows:
        table.add_row(command_name, summary)
    self.console.print(table)

    # Shortcut help followed by usage notes, printed line by line.
    trailing_lines = (
        "\n[bold]快捷键:[/bold]",
        " [cyan]↑/↓[/cyan] 历史记录导航",
        " [cyan]Tab[/cyan] 命令补全",
        " [cyan]Ctrl+C[/cyan] 中断当前操作",
        " [cyan]Ctrl+D[/cyan] 退出程序",
        "\n[bold]使用说明:[/bold]",
        " [green]命令[/green] 以 '/' 开头(如 /help, /status)",
        " [green]聊天[/green] 直接输入消息与Claude对话",
    )
    for line in trailing_lines:
        self.console.print(line)
| |
def show_status(self):
    """Print a table summarising agent, MCP tool, history and SSHOUT state."""
    missing = object()  # sentinel: distinguishes "attribute absent" from None

    status_table = Table(title="系统状态")
    status_table.add_column("项目", style="cyan")
    status_table.add_column("状态", style="white")

    # Agent state
    if self.agent:
        mode_text = "YOLO模式" if self.agent.thinking_mode == ThinkingMode.YOLO else "交互模式"
        status_table.add_row("思考模式", mode_text)
        status_table.add_row("对话历史", f"{len(self.agent.conversation_history)} 条消息")
    else:
        status_table.add_row("Agent", "[red]未初始化[/red]")

    # MCP tool state: needs an integration that has tool_manager.tools.
    tool_manager = getattr(self.mcp_integration, 'tool_manager', missing) if self.mcp_integration else missing
    tools = getattr(tool_manager, 'tools', missing)
    if tools is missing:
        status_table.add_row("MCP工具", "[yellow]未初始化[/yellow]")
    elif hasattr(tools, '__len__'):
        status_table.add_row("MCP工具", f"{len(tools)} 个可用")
    else:
        status_table.add_row("MCP工具", "已连接")

    # Use the public history accessor rather than the private _storage list.
    status_table.add_row("命令历史", f"{len(self.history.get_strings())} 条")

    # SSHOUT state (best effort: any failure is shown as a failed check).
    if self.sshout_integration:
        try:
            sshout_status = self.sshout_integration.get_connection_status()
            if isinstance(sshout_status, dict) and 'connected' in sshout_status:
                status_text = "已连接" if sshout_status['connected'] else "未连接"
                if sshout_status['connected'] and 'server' in sshout_status:
                    status_text += f" ({sshout_status['server']})"
            else:
                status_text = "状态未知"
            status_table.add_row("SSHOUT连接", status_text)
        except Exception:
            status_table.add_row("SSHOUT连接", "[red]状态检查失败[/red]")

    self.console.print(status_table)
| |
def show_command_history(self):
    """Print up to the ten most recent prompt entries.

    Entries longer than 50 characters are cut and suffixed with ``...``.
    """
    # Public accessor instead of the private _storage list of one
    # particular History implementation.
    entries = self.history.get_strings()
    if not entries:
        self.console.print("[yellow]暂无命令历史[/yellow]")
        return

    # Only needed once there is something to print.
    from rich.markup import escape

    self.console.print("[bold]最近的命令历史:[/bold]")
    for index, entry in enumerate(entries[-10:], 1):
        shown = entry[:50] + "..." if len(entry) > 50 else entry
        # Entries are user-typed text: escape them so square brackets are
        # printed literally instead of being parsed as console markup.
        self.console.print(f" [dim]{index:2d}.[/dim] {escape(shown)}")
| |
async def get_user_input(self, prompt_text: str = "你") -> str:
    """Prompt for one line of input with editing, history and completion.

    Args:
        prompt_text: Label shown in front of the colon.

    Returns:
        The entered text with surrounding whitespace removed, or ``""``
        when the prompt was interrupted with Ctrl+C.

    Raises:
        EOFError: End of input was signalled (Ctrl+D), so the caller can
            leave its input loop.
    """
    prompt_html = HTML(f'<cyan>{prompt_text}</cyan><white>: </white>')

    try:
        # The completer was configured when the session was created.
        answer = await self.session.prompt_async(
            prompt_html,
            key_bindings=self.key_bindings,
        )
    except KeyboardInterrupt:
        return ""

    # A key binding can end the prompt without a result, in which case the
    # session hands back None; treat that like a raised EOFError.
    if answer is None:
        raise EOFError

    return answer.strip()
| |
async def process_command(self, command: str) -> bool:
    """Dispatch one line of input.

    Lines starting with ``/`` are commands; anything else is forwarded to
    ``process_user_input`` as a chat message.

    Args:
        command: The raw line typed by the user.

    Returns:
        ``False`` when the user asked to quit, ``True`` otherwise.
    """
    line = command.strip()

    if not line.startswith('/'):
        # Not a command: hand it to the agent.
        await self.process_user_input(line)
        return True

    # Keep the original casing for arguments (e.g. the text of
    # "/sshout send ..."); the lowered copy is used only for matching.
    body = line[1:].strip()
    lowered = body.lower()
    name = lowered.split(None, 1)[0] if lowered else ""

    if lowered in ("quit", "exit"):
        return False

    if lowered == "help":
        self.show_help()

    elif lowered == "status":
        self.show_status()

    elif lowered == "tools":
        self.show_tools()

    elif lowered == "history":
        self.show_command_history()
        self.show_conversation_history()

    elif lowered == "clear":
        if not self.agent:
            self.console.print("[red]错误: Agent未初始化[/red]")
        elif await self.confirm_action("确定清空对话历史吗?"):
            self.agent.clear_history()
            self.console.print("[green]对话历史已清空[/green]")

    elif lowered == "mode":
        await self.switch_mode()

    elif name == "sshout":
        await self.handle_sshout_command(body)

    else:
        # The echoed text is user input: print it without markup parsing so
        # brackets in it cannot raise or restyle the output.
        self.console.print(f"未知命令: /{lowered}", style="yellow", markup=False)
        self.console.print("[dim]输入 '/help' 查看可用命令[/dim]")

    return True
| |
async def confirm_action(self, message: str) -> bool:
    """Ask a yes/no question on the prompt session.

    Args:
        message: The question shown before ``[y/N]``.

    Returns:
        ``True`` only for ``y``, ``yes`` or ``是`` (case-insensitive);
        ``False`` for any other answer, an interrupt or end of input.
    """
    question = HTML(f'<yellow>{message}</yellow> <white>[y/N]: </white>')
    try:
        answer = await self.session.prompt_async(question)
    except (KeyboardInterrupt, EOFError):
        return False

    # The prompt yields None when a key binding ends it without a result;
    # treat that as "no" instead of failing on None.lower().
    if answer is None:
        return False

    return answer.lower().strip() in ('y', 'yes', '是')
| |
def show_tools(self):
    """Print a table of the MCP tools reported by the tool manager."""
    integration = self.mcp_integration
    if not integration:
        self.console.print("[red]MCP服务未连接[/red]")
        return

    tool_names = integration.tool_manager.get_available_tools()
    if not tool_names:
        self.console.print("[yellow]暂无可用的MCP工具[/yellow]")
        return

    table = Table(title="可用的MCP工具")
    table.add_column("工具名称", style="cyan")
    table.add_column("描述", style="white")

    for name in tool_names:
        info = self.mcp_integration.tool_manager.get_tool_info(name)
        # Fall back to a placeholder when no info is available for the tool.
        if info:
            summary = info['description']
        else:
            summary = "无描述"
        table.add_row(name, summary)

    self.console.print(table)
| |
def show_conversation_history(self):
    """Print the last five conversation messages.

    Each message is labelled by role and cut to 100 characters.
    """
    if not self.agent:
        self.console.print("[red]错误: Agent未初始化[/red]")
        return

    history = self.agent.get_conversation_history()
    if not history:
        self.console.print("[yellow]暂无对话历史[/yellow]")
        return

    # Only needed once there is something to print.
    from rich.markup import escape

    self.console.print("[bold]最近的对话历史:[/bold]")
    for i, message in enumerate(history[-5:], 1):  # last five only
        is_user = message["role"] == "user"
        role_style = "blue" if is_user else "green"
        role_text = "用户" if is_user else "助手"

        content = message["content"]
        if len(content) > 100:
            content = content[:100] + "..."

        # Message text is free-form: escape it so square brackets are shown
        # literally instead of being parsed as console markup.
        self.console.print(
            f"[{role_style}]{i}. {role_text}:[/{role_style}] {escape(str(content))}"
        )
| |
async def switch_mode(self):
    """Toggle between interactive and YOLO thinking mode after confirmation."""
    # Same guard and message as the other agent-dependent commands; without
    # it a missing agent surfaces as an AttributeError.
    if not self.agent:
        self.console.print("[red]错误: Agent未初始化[/red]")
        return

    if self.agent.thinking_mode == ThinkingMode.INTERACTIVE:
        new_mode = ThinkingMode.YOLO
        mode_text = "YOLO自主模式"
    else:
        new_mode = ThinkingMode.INTERACTIVE
        mode_text = "交互模式"

    if await self.confirm_action(f"确定切换到{mode_text}吗?"):
        self.agent.set_thinking_mode(new_mode)
        self.console.print(f"[green]已切换到{mode_text}[/green]")
    else:
        self.console.print("[yellow]已取消切换[/yellow]")
| |
async def handle_sshout_command(self, command: str):
    """Dispatch a ``sshout <action> [args]`` command line.

    Args:
        command: The command text without the leading ``/``; the first
            word is the command name and the second the action.
    """
    # Split off only the command word and the action, so the remainder
    # (the message for "send") keeps its internal spacing.
    parts = command.split(None, 2)
    if len(parts) < 2:
        self.console.print("[yellow]使用方法: /sshout <connect/disconnect/status/send> [参数][/yellow]")
        return

    action = parts[1].lower()

    if action == "connect":
        await self._sshout_connect()
    elif action == "disconnect":
        await self._sshout_disconnect()
    elif action == "status":
        await self._sshout_status()
    elif action == "send":
        if len(parts) < 3:
            self.console.print("[yellow]使用方法: /sshout send <消息内容>[/yellow]")
            return
        await self._sshout_send_message(parts[2])
    else:
        self.console.print("[yellow]未知的SSHOUT命令。使用 '/help' 查看可用命令。[/yellow]")
| |
async def _sshout_connect(self):
    """Connect to SSHOUT behind a spinner and report the outcome."""
    if not self.sshout_integration:
        self.console.print("[red]SSHOUT集成未初始化[/red]")
        return

    self.console.print("[cyan]正在连接SSHOUT...[/cyan]")

    spinner_columns = (
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
    )
    with Progress(*spinner_columns, console=self.console) as spinner:
        step = spinner.add_task("连接SSHOUT服务器...", total=None)

        connected = await self._connect_sshout()

        # Choose the spinner caption and the console notice together.
        if connected:
            outcome = "SSHOUT连接成功!"
            notice = "[green]✅ SSHOUT连接成功!现在会自动响应@Claude提及[/green]"
        else:
            outcome = "SSHOUT连接失败"
            notice = "[red]❌ SSHOUT连接失败[/red]"

        spinner.update(step, description=outcome, completed=True)
        self.console.print(notice)
| |
async def _sshout_disconnect(self):
    """Disconnect from SSHOUT, or report that the integration is missing."""
    if self.sshout_integration:
        await self._disconnect_sshout()
        return

    self.console.print("[red]SSHOUT集成未初始化[/red]")
| |
async def _connect_sshout(self) -> bool:
    """Connect through the API entry point when the integration has one.

    Falls back to the SSH entry point otherwise and returns whatever the
    chosen coroutine returns.
    """
    integration = self.sshout_integration
    # API mode is detected by the presence of the API connect method.
    uses_api = hasattr(integration, 'connect_to_sshout_api')
    entry_point = 'connect_to_sshout_api' if uses_api else 'connect_to_sshout'
    return await getattr(integration, entry_point)()
| |
async def _disconnect_sshout(self, show_message: bool = True):
    """Disconnect through the API entry point when present, else through SSH.

    Args:
        show_message: When true, print a notice after disconnecting.
    """
    integration = self.sshout_integration
    # API mode is detected by the presence of the API disconnect method.
    uses_api = hasattr(integration, 'disconnect_from_sshout_api')
    entry_point = 'disconnect_from_sshout_api' if uses_api else 'disconnect_from_sshout'
    await getattr(integration, entry_point)()

    if not show_message:
        return
    self.console.print("[yellow]🔌 SSHOUT连接已断开[/yellow]")
| |
async def _sshout_status(self):
    """Print a table describing the SSHOUT connection.

    Optional fields of the status mapping are read with ``.get`` so a
    missing key leaves a row empty or out instead of raising ``KeyError``.
    """
    if not self.sshout_integration:
        self.console.print("[red]SSHOUT集成未初始化[/red]")
        return

    # Table cells given as strings are parsed as console markup; values that
    # come from the server or other users are escaped to be shown literally.
    from rich.markup import escape

    status = self.sshout_integration.get_connection_status()
    connected = bool(status.get('connected'))

    status_table = Table(title="SSHOUT连接状态")
    status_table.add_column("项目", style="cyan")
    status_table.add_column("值", style="white")

    status_table.add_row("连接状态", "已连接" if connected else "未连接")

    # API version, when the integration reports one.
    if 'api_version' in status:
        status_table.add_row("API版本", escape(str(status['api_version'])))

    if connected:
        status_table.add_row("服务器", escape(str(status.get('server', ''))))
        status_table.add_row("消息计数", str(status.get('message_count', 0)))

        # User details that only the API mode provides.
        if status.get('my_user_id'):
            status_table.add_row("用户ID", str(status['my_user_id']))
        if status.get('my_username'):
            status_table.add_row("用户名", escape(str(status['my_username'])))

        recent_messages = status.get('recent_messages') or []
        if recent_messages:
            status_table.add_row("最近消息", "")
            for msg in recent_messages:
                # Messages may name the sender as 'username' or 'from_user'.
                username = msg.get('username', msg.get('from_user', 'unknown'))
                timestamp = msg.get('timestamp', '')
                status_table.add_row(
                    escape(f" [{timestamp}] {username}"),
                    escape(str(msg.get('content', ''))),
                )

    self.console.print(status_table)
| |
async def _sshout_send_message(self, message: str):
    """Send ``message`` to SSHOUT and report whether it was delivered.

    Args:
        message: The text to send.
    """
    if not self.sshout_integration:
        self.console.print("[red]SSHOUT集成未初始化[/red]")
        return

    success = await self.sshout_integration.send_message(message)

    if success:
        # The echoed message is user-typed text: print it with markup
        # parsing off so brackets in it cannot raise or restyle the line;
        # the colour is applied through the style argument instead.
        self.console.print(f"✅ 消息已发送: {message}", style="green", markup=False)
    else:
        self.console.print("[red]❌ 消息发送失败。请检查SSHOUT连接状态。[/red]")
| |
async def process_user_input(self, user_input: str):
    """Send a chat message to the agent and print the reply in a panel.

    In YOLO mode the spinner caption follows the agent's log output; in the
    other mode a fixed caption is shown. The agent's response is passed
    through the MCP integration before printing. Any exception is reported
    on the console instead of being raised.

    Args:
        user_input: The chat message to process.
    """

    class ProgressLogHandler(logging.Handler):
        """Copy agent log lines that carry a marker emoji onto the spinner."""

        # Emoji that mark a log line as part of the thinking process.
        thinking_indicators = (
            "🧠", "📋", "🔍", "✅", "🎯", "🔧", "📊",
            "💡", "⚡", "🎉", "❌", "🔄", "⚠️",
        )

        def __init__(self, progress_task, progress_obj):
            super().__init__()
            self.progress_task = progress_task
            self.progress_obj = progress_obj

        def emit(self, record):
            # getMessage() applies %-style arguments and stringifies the
            # message; record.msg alone may be unformatted or not a str.
            text = record.getMessage()
            if any(indicator in text for indicator in self.thinking_indicators):
                self.progress_obj.update(self.progress_task, description=text.strip())

    try:
        if self.agent.thinking_mode == ThinkingMode.YOLO:
            # YOLO mode: detailed progress driven by the agent's log lines.
            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                console=self.console,
            ) as progress:
                main_task = progress.add_task("启动YOLO自主思考模式...", total=None)

                agent_logger = logging.getLogger('claude_agent.core.agent')
                progress_handler = ProgressLogHandler(main_task, progress)
                agent_logger.addHandler(progress_handler)

                try:
                    response = await self.agent.process_user_input(user_input)

                    # Let the MCP integration handle any tool calls.
                    enhanced_response = await self.mcp_integration.process_tool_calls_in_response(response)

                    progress.update(main_task, description="思考完成!", completed=True)
                finally:
                    # Always detach the temporary handler again.
                    agent_logger.removeHandler(progress_handler)

        else:
            # Interactive mode: a plain spinner.
            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                console=self.console,
            ) as progress:
                progress.add_task("处理中...", total=None)

                response = await self.agent.process_user_input(user_input)

                # Let the MCP integration handle any tool calls.
                enhanced_response = await self.mcp_integration.process_tool_calls_in_response(response)

        # NOTE(review): a str passed to Panel is still parsed as console
        # markup, so bracketed text in a response may be restyled or rejected.
        self.console.print("\n[green]助手:[/green]")
        self.console.print(Panel(enhanced_response, border_style="green"))

    except Exception as e:
        # Exception text is arbitrary: print it with markup parsing off so
        # brackets in it cannot raise again inside this handler.
        self.console.print(f"处理请求时出错: {e}", style="red", markup=False)
| |
async def run_interactive_loop(self):
    """Show the welcome banner, then read and dispatch input until quit.

    The loop ends when ``process_command`` returns ``False``, on
    ``EOFError``, or when the user confirms quitting after a
    ``KeyboardInterrupt``. Other exceptions are reported and the loop
    carries on.
    """
    self.show_welcome()

    while True:
        try:
            # Line editing and history come from get_user_input().
            user_input = await self.get_user_input()
            if not user_input:
                continue

            if not await self.process_command(user_input):
                break

        except KeyboardInterrupt:
            if await self.confirm_action("确定要退出吗?"):
                break
            self.console.print("继续...")
        except EOFError:
            break
        except Exception as e:
            # Exception text is arbitrary: print it with markup parsing off
            # so brackets in it cannot raise a second error in this handler
            # and end the loop.
            self.console.print(f"发生未知错误: {e}", style="red", markup=False)
| |
async def shutdown(self):
    """Shut down the MCP integration and disconnect SSHOUT.

    The SSHOUT disconnect runs even when the MCP shutdown raises, so the
    connection is not left open; the exception still propagates. The
    farewell line is printed only after a clean shutdown.
    """
    try:
        if self.mcp_integration:
            await self.mcp_integration.shutdown()
    finally:
        if self.sshout_integration:
            await self._disconnect_sshout(show_message=False)

    self.console.print("[yellow]再见![/yellow]")