| """ |
| enhanced_interface.py的全面测试覆盖 |
| 针对提升覆盖率至85%+目标设计 |
| """ |
| |
| import pytest |
| from unittest.mock import Mock, AsyncMock, patch, MagicMock |
| import asyncio |
| import logging |
| from io import StringIO |
| |
| from src.claude_agent.cli.enhanced_interface import EnhancedCLIInterface |
| from src.claude_agent.core.agent import AgentCore, ThinkingMode |
| |
| |
| @pytest.fixture |
| def enhanced_cli(): |
| """创建EnhancedCLIInterface实例""" |
| with patch('src.claude_agent.cli.enhanced_interface.create_sshout_integration'): |
| return EnhancedCLIInterface() |
| |
| |
| @pytest.fixture |
| def mock_agent(): |
| """创建模拟的AgentCore""" |
| agent = Mock(spec=AgentCore) |
| agent.thinking_mode = ThinkingMode.INTERACTIVE |
| agent.conversation_history = [] |
| agent.process_user_input = AsyncMock(return_value="Mock response") |
| agent.get_conversation_history = Mock(return_value=[]) |
| agent.clear_history = Mock() |
| agent.set_thinking_mode = Mock() |
| return agent |
| |
| |
| @pytest.fixture |
| def mock_mcp_integration(): |
| """创建模拟的MCP集成""" |
| mcp = Mock() |
| mcp.setup_default_tools = AsyncMock() |
| mcp.enhance_agent_with_tools = AsyncMock() |
| mcp.process_tool_calls_in_response = AsyncMock(return_value="Enhanced response") |
| mcp.tool_manager = Mock() |
| mcp.tool_manager.get_available_tools = Mock(return_value=['tool1', 'tool2']) |
| mcp.tool_manager.get_tool_info = Mock(return_value={'description': 'Test tool'}) |
| mcp.shutdown = AsyncMock() |
| return mcp |
| |
| |
| @pytest.fixture |
| def mock_sshout_integration(): |
| """创建模拟的SSHOUT集成""" |
| sshout = Mock() |
| sshout.connect_to_sshout = AsyncMock(return_value=True) |
| sshout.connect_to_sshout_api = AsyncMock(return_value=True) |
| sshout.disconnect_from_sshout = AsyncMock() |
| sshout.disconnect_from_sshout_api = AsyncMock() |
| sshout.send_message = AsyncMock(return_value=True) |
| sshout.get_connection_status = Mock(return_value={ |
| 'connected': True, |
| 'server': 'test.server.com', |
| 'message_count': 10, |
| 'recent_messages': [] |
| }) |
| return sshout |
| |
| |
| class TestEnhancedCLIInitialization: |
| """测试增强CLI的初始化""" |
| |
| def test_init_basic_setup(self, enhanced_cli): |
| """测试基本初始化""" |
| assert enhanced_cli.console is not None |
| assert enhanced_cli.agent is None |
| assert enhanced_cli.mcp_integration is None |
| assert enhanced_cli.sshout_integration is None |
| assert enhanced_cli.history is not None |
| assert enhanced_cli.session is not None |
| assert enhanced_cli.command_completer is not None |
| assert enhanced_cli.key_bindings is not None |
| |
| def test_command_completer_setup(self, enhanced_cli): |
| """测试命令补全器设置""" |
| completer = enhanced_cli.command_completer |
| assert completer is not None |
| # 检查是否包含预期的命令 |
| expected_commands = ['/help', '/mode', '/history', '/clear', '/tools', '/status', '/quit', '/exit'] |
| for cmd in expected_commands: |
| assert cmd in str(completer.words) |
| |
| @pytest.mark.asyncio |
| async def test_initialize_success(self, enhanced_cli, mock_mcp_integration): |
| """测试成功的初始化过程""" |
| # 直接验证初始化过程中的关键调用 |
| original_console_print = enhanced_cli.console.print |
| |
| with patch('src.claude_agent.cli.enhanced_interface.AgentCore') as mock_agent_class, \ |
| patch('src.claude_agent.cli.enhanced_interface.MCPToolIntegration', return_value=mock_mcp_integration): |
| |
| mock_agent_instance = Mock() |
| mock_agent_class.return_value = mock_agent_instance |
| |
| await enhanced_cli.initialize("test-api-key") |
| |
| # 验证关键方法被调用 |
| mock_agent_class.assert_called_once_with(api_key="test-api-key") |
| mock_mcp_integration.setup_default_tools.assert_called_once() |
| mock_mcp_integration.enhance_agent_with_tools.assert_called_once() |
| |
| @pytest.mark.asyncio |
| async def test_initialize_failure(self, enhanced_cli): |
| """测试初始化失败""" |
| with patch('src.claude_agent.cli.enhanced_interface.AgentCore', side_effect=Exception("Init failed")): |
| with pytest.raises(Exception, match="Init failed"): |
| await enhanced_cli.initialize("test-api-key") |
| |
| |
| class TestEnhancedCLIDisplayMethods: |
| """测试增强CLI的显示方法""" |
| |
| def test_show_welcome(self, enhanced_cli): |
| """测试显示欢迎信息""" |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| enhanced_cli.show_welcome() |
| mock_print.assert_called_once() |
| |
| def test_show_help(self, enhanced_cli): |
| """测试显示帮助信息""" |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| enhanced_cli.show_help() |
| # 应该有多次print调用(表格、快捷键、使用说明) |
| assert mock_print.call_count >= 3 |
| |
| def test_show_status_with_agent(self, enhanced_cli, mock_agent): |
| """测试显示状态(有Agent)""" |
| enhanced_cli.agent = mock_agent |
| mock_agent.thinking_mode = ThinkingMode.YOLO |
| mock_agent.conversation_history = ['msg1', 'msg2'] |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| enhanced_cli.show_status() |
| mock_print.assert_called_once() |
| |
| def test_show_status_without_agent(self, enhanced_cli): |
| """测试显示状态(无Agent)""" |
| enhanced_cli.agent = None |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| enhanced_cli.show_status() |
| mock_print.assert_called_once() |
| |
| def test_show_status_with_mcp_tools(self, enhanced_cli, mock_mcp_integration): |
| """测试显示状态(有MCP工具)""" |
| enhanced_cli.mcp_integration = mock_mcp_integration |
| mock_mcp_integration.tool_manager.tools = ['tool1', 'tool2'] |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| enhanced_cli.show_status() |
| mock_print.assert_called_once() |
| |
| def test_show_status_with_sshout_connected(self, enhanced_cli, mock_sshout_integration): |
| """测试显示状态(SSHOUT已连接)""" |
| enhanced_cli.sshout_integration = mock_sshout_integration |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| enhanced_cli.show_status() |
| mock_print.assert_called_once() |
| |
| def test_show_status_with_sshout_error(self, enhanced_cli): |
| """测试显示状态(SSHOUT状态检查失败)""" |
| mock_sshout = Mock() |
| mock_sshout.get_connection_status.side_effect = Exception("Connection error") |
| enhanced_cli.sshout_integration = mock_sshout |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| enhanced_cli.show_status() |
| mock_print.assert_called_once() |
| |
| def test_show_command_history_empty(self, enhanced_cli): |
| """测试显示空的命令历史""" |
| enhanced_cli.history._storage = [] |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| enhanced_cli.show_command_history() |
| mock_print.assert_called_once_with("[yellow]暂无命令历史[/yellow]") |
| |
| def test_show_command_history_with_data(self, enhanced_cli): |
| """测试显示有数据的命令历史""" |
| enhanced_cli.history._storage = ['command1', 'command2', 'very_long_command_that_should_be_truncated_because_it_is_too_long'] |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| enhanced_cli.show_command_history() |
| # 应该有标题 + 每个命令一行 |
| assert mock_print.call_count == 4 # 1个标题 + 3个命令 |
| |
| def test_show_conversation_history_no_agent(self, enhanced_cli): |
| """测试显示对话历史(无Agent)""" |
| enhanced_cli.agent = None |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| enhanced_cli.show_conversation_history() |
| mock_print.assert_called_once_with("[red]错误: Agent未初始化[/red]") |
| |
| def test_show_conversation_history_empty(self, enhanced_cli, mock_agent): |
| """测试显示空的对话历史""" |
| enhanced_cli.agent = mock_agent |
| mock_agent.get_conversation_history.return_value = [] |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| enhanced_cli.show_conversation_history() |
| mock_print.assert_called_once_with("[yellow]暂无对话历史[/yellow]") |
| |
| def test_show_conversation_history_with_data(self, enhanced_cli, mock_agent): |
| """测试显示有数据的对话历史""" |
| history = [ |
| {"role": "user", "content": "Hello"}, |
| {"role": "assistant", "content": "Hi there"}, |
| {"role": "user", "content": "Very long message that should be truncated because it exceeds the maximum length limit set for display purposes in the conversation history"} |
| ] |
| enhanced_cli.agent = mock_agent |
| mock_agent.get_conversation_history.return_value = history |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| enhanced_cli.show_conversation_history() |
| # 应该有标题 + 每条消息一行 |
| assert mock_print.call_count == 4 # 1个标题 + 3条消息 |
| |
| def test_show_tools_no_mcp(self, enhanced_cli): |
| """测试显示工具(无MCP)""" |
| enhanced_cli.mcp_integration = None |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| enhanced_cli.show_tools() |
| mock_print.assert_called_once_with("[red]MCP服务未连接[/red]") |
| |
| def test_show_tools_no_tools(self, enhanced_cli, mock_mcp_integration): |
| """测试显示工具(无可用工具)""" |
| enhanced_cli.mcp_integration = mock_mcp_integration |
| mock_mcp_integration.tool_manager.get_available_tools.return_value = [] |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| enhanced_cli.show_tools() |
| mock_print.assert_called_once_with("[yellow]暂无可用的MCP工具[/yellow]") |
| |
| def test_show_tools_with_tools(self, enhanced_cli, mock_mcp_integration): |
| """测试显示工具(有可用工具)""" |
| enhanced_cli.mcp_integration = mock_mcp_integration |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| enhanced_cli.show_tools() |
| mock_print.assert_called_once() |
| |
| |
| class TestEnhancedCLIInputHandling: |
| """测试增强CLI的输入处理""" |
| |
| @pytest.mark.asyncio |
| async def test_get_user_input_normal(self, enhanced_cli): |
| """测试正常的用户输入""" |
| with patch.object(enhanced_cli.session, 'prompt_async', return_value="test input"): |
| result = await enhanced_cli.get_user_input("prompt") |
| assert result == "test input" |
| |
| @pytest.mark.asyncio |
| async def test_get_user_input_keyboard_interrupt(self, enhanced_cli): |
| """测试键盘中断""" |
| with patch.object(enhanced_cli.session, 'prompt_async', side_effect=KeyboardInterrupt): |
| result = await enhanced_cli.get_user_input("prompt") |
| assert result == "" |
| |
| @pytest.mark.asyncio |
| async def test_get_user_input_eof_error(self, enhanced_cli): |
| """测试EOF错误""" |
| with patch.object(enhanced_cli.session, 'prompt_async', side_effect=EOFError): |
| result = await enhanced_cli.get_user_input("prompt") |
| assert result == "" |
| |
| @pytest.mark.asyncio |
| async def test_confirm_action_yes(self, enhanced_cli): |
| """测试确认操作(是)""" |
| with patch.object(enhanced_cli.session, 'prompt_async', return_value="y"): |
| result = await enhanced_cli.confirm_action("Are you sure?") |
| assert result is True |
| |
| @pytest.mark.asyncio |
| async def test_confirm_action_no(self, enhanced_cli): |
| """测试确认操作(否)""" |
| with patch.object(enhanced_cli.session, 'prompt_async', return_value="n"): |
| result = await enhanced_cli.confirm_action("Are you sure?") |
| assert result is False |
| |
| @pytest.mark.asyncio |
| async def test_confirm_action_keyboard_interrupt(self, enhanced_cli): |
| """测试确认操作(键盘中断)""" |
| with patch.object(enhanced_cli.session, 'prompt_async', side_effect=KeyboardInterrupt): |
| result = await enhanced_cli.confirm_action("Are you sure?") |
| assert result is False |
| |
| |
| class TestEnhancedCLICommandProcessing: |
| """测试增强CLI的命令处理""" |
| |
| @pytest.mark.asyncio |
| async def test_process_command_quit(self, enhanced_cli): |
| """测试退出命令""" |
| result = await enhanced_cli.process_command("/quit") |
| assert result is False |
| |
| @pytest.mark.asyncio |
| async def test_process_command_exit(self, enhanced_cli): |
| """测试退出命令""" |
| result = await enhanced_cli.process_command("/exit") |
| assert result is False |
| |
| @pytest.mark.asyncio |
| async def test_process_command_help(self, enhanced_cli): |
| """测试帮助命令""" |
| with patch.object(enhanced_cli, 'show_help') as mock_show_help: |
| result = await enhanced_cli.process_command("/help") |
| assert result is True |
| mock_show_help.assert_called_once() |
| |
| @pytest.mark.asyncio |
| async def test_process_command_status(self, enhanced_cli): |
| """测试状态命令""" |
| with patch.object(enhanced_cli, 'show_status') as mock_show_status: |
| result = await enhanced_cli.process_command("/status") |
| assert result is True |
| mock_show_status.assert_called_once() |
| |
| @pytest.mark.asyncio |
| async def test_process_command_tools(self, enhanced_cli): |
| """测试工具命令""" |
| with patch.object(enhanced_cli, 'show_tools') as mock_show_tools: |
| result = await enhanced_cli.process_command("/tools") |
| assert result is True |
| mock_show_tools.assert_called_once() |
| |
| @pytest.mark.asyncio |
| async def test_process_command_history(self, enhanced_cli): |
| """测试历史命令""" |
| with patch.object(enhanced_cli, 'show_command_history') as mock_show_cmd_hist, \ |
| patch.object(enhanced_cli, 'show_conversation_history') as mock_show_conv_hist: |
| result = await enhanced_cli.process_command("/history") |
| assert result is True |
| mock_show_cmd_hist.assert_called_once() |
| mock_show_conv_hist.assert_called_once() |
| |
| @pytest.mark.asyncio |
| async def test_process_command_clear_confirmed(self, enhanced_cli, mock_agent): |
| """测试清空命令(确认)""" |
| enhanced_cli.agent = mock_agent |
| with patch.object(enhanced_cli, 'confirm_action', return_value=True): |
| result = await enhanced_cli.process_command("/clear") |
| assert result is True |
| mock_agent.clear_history.assert_called_once() |
| |
| @pytest.mark.asyncio |
| async def test_process_command_clear_cancelled(self, enhanced_cli, mock_agent): |
| """测试清空命令(取消)""" |
| enhanced_cli.agent = mock_agent |
| with patch.object(enhanced_cli, 'confirm_action', return_value=False): |
| result = await enhanced_cli.process_command("/clear") |
| assert result is True |
| mock_agent.clear_history.assert_not_called() |
| |
| @pytest.mark.asyncio |
| async def test_process_command_mode(self, enhanced_cli): |
| """测试模式切换命令""" |
| with patch.object(enhanced_cli, 'switch_mode') as mock_switch_mode: |
| result = await enhanced_cli.process_command("/mode") |
| assert result is True |
| mock_switch_mode.assert_called_once() |
| |
| @pytest.mark.asyncio |
| async def test_process_command_sshout(self, enhanced_cli): |
| """测试SSHOUT命令""" |
| with patch.object(enhanced_cli, 'handle_sshout_command') as mock_handle_sshout: |
| result = await enhanced_cli.process_command("/sshout connect") |
| assert result is True |
| mock_handle_sshout.assert_called_once_with("sshout connect") |
| |
| @pytest.mark.asyncio |
| async def test_process_command_unknown(self, enhanced_cli): |
| """测试未知命令""" |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| result = await enhanced_cli.process_command("/unknown") |
| assert result is True |
| mock_print.assert_called() |
| |
| @pytest.mark.asyncio |
| async def test_process_command_non_command(self, enhanced_cli): |
| """测试非命令输入""" |
| with patch.object(enhanced_cli, 'process_user_input') as mock_process_input: |
| result = await enhanced_cli.process_command("hello world") |
| assert result is True |
| mock_process_input.assert_called_once_with("hello world") |
| |
| |
| class TestEnhancedCLIModeSwitch: |
| """测试增强CLI的模式切换""" |
| |
| @pytest.mark.asyncio |
| async def test_switch_mode_to_yolo_confirmed(self, enhanced_cli, mock_agent): |
| """测试切换到YOLO模式(确认)""" |
| enhanced_cli.agent = mock_agent |
| mock_agent.thinking_mode = ThinkingMode.INTERACTIVE |
| |
| with patch.object(enhanced_cli, 'confirm_action', return_value=True): |
| await enhanced_cli.switch_mode() |
| mock_agent.set_thinking_mode.assert_called_once_with(ThinkingMode.YOLO) |
| |
| @pytest.mark.asyncio |
| async def test_switch_mode_to_interactive_confirmed(self, enhanced_cli, mock_agent): |
| """测试切换到交互模式(确认)""" |
| enhanced_cli.agent = mock_agent |
| mock_agent.thinking_mode = ThinkingMode.YOLO |
| |
| with patch.object(enhanced_cli, 'confirm_action', return_value=True): |
| await enhanced_cli.switch_mode() |
| mock_agent.set_thinking_mode.assert_called_once_with(ThinkingMode.INTERACTIVE) |
| |
| @pytest.mark.asyncio |
| async def test_switch_mode_cancelled(self, enhanced_cli, mock_agent): |
| """测试切换模式(取消)""" |
| enhanced_cli.agent = mock_agent |
| mock_agent.thinking_mode = ThinkingMode.INTERACTIVE |
| |
| with patch.object(enhanced_cli, 'confirm_action', return_value=False), \ |
| patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli.switch_mode() |
| mock_agent.set_thinking_mode.assert_not_called() |
| mock_print.assert_called_once_with("[yellow]已取消切换[/yellow]") |
| |
| |
| class TestEnhancedCLISSHOUTHandling: |
| """测试增强CLI的SSHOUT处理""" |
| |
| @pytest.mark.asyncio |
| async def test_handle_sshout_command_no_action(self, enhanced_cli): |
| """测试SSHOUT命令(无动作)""" |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli.handle_sshout_command("sshout") |
| mock_print.assert_called_once_with("[yellow]使用方法: /sshout <connect/disconnect/status/send> [参数][/yellow]") |
| |
| @pytest.mark.asyncio |
| async def test_handle_sshout_command_connect(self, enhanced_cli): |
| """测试SSHOUT连接命令""" |
| with patch.object(enhanced_cli, '_sshout_connect') as mock_connect: |
| await enhanced_cli.handle_sshout_command("sshout connect") |
| mock_connect.assert_called_once() |
| |
| @pytest.mark.asyncio |
| async def test_handle_sshout_command_disconnect(self, enhanced_cli): |
| """测试SSHOUT断连命令""" |
| with patch.object(enhanced_cli, '_sshout_disconnect') as mock_disconnect: |
| await enhanced_cli.handle_sshout_command("sshout disconnect") |
| mock_disconnect.assert_called_once() |
| |
| @pytest.mark.asyncio |
| async def test_handle_sshout_command_status(self, enhanced_cli): |
| """测试SSHOUT状态命令""" |
| with patch.object(enhanced_cli, '_sshout_status') as mock_status: |
| await enhanced_cli.handle_sshout_command("sshout status") |
| mock_status.assert_called_once() |
| |
| @pytest.mark.asyncio |
| async def test_handle_sshout_command_send_no_message(self, enhanced_cli): |
| """测试SSHOUT发送命令(无消息)""" |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli.handle_sshout_command("sshout send") |
| mock_print.assert_called_once_with("[yellow]使用方法: /sshout send <消息内容>[/yellow]") |
| |
| @pytest.mark.asyncio |
| async def test_handle_sshout_command_send_with_message(self, enhanced_cli): |
| """测试SSHOUT发送命令(有消息)""" |
| with patch.object(enhanced_cli, '_sshout_send_message') as mock_send: |
| await enhanced_cli.handle_sshout_command("sshout send hello world") |
| mock_send.assert_called_once_with("hello world") |
| |
| @pytest.mark.asyncio |
| async def test_handle_sshout_command_unknown(self, enhanced_cli): |
| """测试SSHOUT未知命令""" |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli.handle_sshout_command("sshout unknown") |
| mock_print.assert_called_once_with("[yellow]未知的SSHOUT命令。使用 '/help' 查看可用命令。[/yellow]") |
| |
| |
| class TestEnhancedCLISSHOUTOperations: |
| """测试增强CLI的SSHOUT操作""" |
| |
| @pytest.mark.asyncio |
| async def test_sshout_connect_no_integration(self, enhanced_cli): |
| """测试SSHOUT连接(无集成)""" |
| enhanced_cli.sshout_integration = None |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli._sshout_connect() |
| mock_print.assert_called_once_with("[red]SSHOUT集成未初始化[/red]") |
| |
| @pytest.mark.asyncio |
| async def test_sshout_connect_success(self, enhanced_cli, mock_sshout_integration): |
| """测试SSHOUT连接成功""" |
| enhanced_cli.sshout_integration = mock_sshout_integration |
| with patch.object(enhanced_cli, '_connect_sshout', return_value=True): |
| await enhanced_cli._sshout_connect() |
| # 验证成功消息被打印(通过Progress显示) |
| |
| @pytest.mark.asyncio |
| async def test_sshout_connect_failure(self, enhanced_cli, mock_sshout_integration): |
| """测试SSHOUT连接失败""" |
| enhanced_cli.sshout_integration = mock_sshout_integration |
| with patch.object(enhanced_cli, '_connect_sshout', return_value=False): |
| await enhanced_cli._sshout_connect() |
| # 验证失败消息被打印(通过Progress显示) |
| |
| @pytest.mark.asyncio |
| async def test_sshout_disconnect_no_integration(self, enhanced_cli): |
| """测试SSHOUT断连(无集成)""" |
| enhanced_cli.sshout_integration = None |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli._sshout_disconnect() |
| mock_print.assert_called_once_with("[red]SSHOUT集成未初始化[/red]") |
| |
| @pytest.mark.asyncio |
| async def test_sshout_disconnect_success(self, enhanced_cli, mock_sshout_integration): |
| """测试SSHOUT断连成功""" |
| enhanced_cli.sshout_integration = mock_sshout_integration |
| with patch.object(enhanced_cli, '_disconnect_sshout') as mock_disconnect: |
| await enhanced_cli._sshout_disconnect() |
| mock_disconnect.assert_called_once() |
| |
| @pytest.mark.asyncio |
| async def test_connect_sshout_api_mode(self, enhanced_cli, mock_sshout_integration): |
| """测试连接SSHOUT(API模式)""" |
| enhanced_cli.sshout_integration = mock_sshout_integration |
| result = await enhanced_cli._connect_sshout() |
| assert result is True |
| mock_sshout_integration.connect_to_sshout_api.assert_called_once() |
| |
| @pytest.mark.asyncio |
| async def test_connect_sshout_ssh_mode(self, enhanced_cli): |
| """测试连接SSHOUT(SSH模式)""" |
| mock_sshout = Mock() |
| mock_sshout.connect_to_sshout = AsyncMock(return_value=True) |
| # 确保没有connect_to_sshout_api属性,这样就会使用SSH模式 |
| if hasattr(mock_sshout, 'connect_to_sshout_api'): |
| delattr(mock_sshout, 'connect_to_sshout_api') |
| enhanced_cli.sshout_integration = mock_sshout |
| |
| result = await enhanced_cli._connect_sshout() |
| assert result is True |
| mock_sshout.connect_to_sshout.assert_called_once() |
| |
| @pytest.mark.asyncio |
| async def test_disconnect_sshout_api_mode(self, enhanced_cli, mock_sshout_integration): |
| """测试断连SSHOUT(API模式)""" |
| enhanced_cli.sshout_integration = mock_sshout_integration |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli._disconnect_sshout() |
| mock_sshout_integration.disconnect_from_sshout_api.assert_called_once() |
| mock_print.assert_called_once_with("[yellow]🔌 SSHOUT连接已断开[/yellow]") |
| |
| @pytest.mark.asyncio |
| async def test_disconnect_sshout_ssh_mode(self, enhanced_cli): |
| """测试断连SSHOUT(SSH模式)""" |
| mock_sshout = Mock() |
| mock_sshout.disconnect_from_sshout = AsyncMock() |
| # 确保没有disconnect_from_sshout_api属性,这样就会使用SSH模式 |
| if hasattr(mock_sshout, 'disconnect_from_sshout_api'): |
| delattr(mock_sshout, 'disconnect_from_sshout_api') |
| enhanced_cli.sshout_integration = mock_sshout |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli._disconnect_sshout() |
| mock_sshout.disconnect_from_sshout.assert_called_once() |
| mock_print.assert_called_once_with("[yellow]🔌 SSHOUT连接已断开[/yellow]") |
| |
| @pytest.mark.asyncio |
| async def test_disconnect_sshout_no_message(self, enhanced_cli, mock_sshout_integration): |
| """测试断连SSHOUT(不显示消息)""" |
| enhanced_cli.sshout_integration = mock_sshout_integration |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli._disconnect_sshout(show_message=False) |
| mock_print.assert_not_called() |
| |
| @pytest.mark.asyncio |
| async def test_sshout_status_no_integration(self, enhanced_cli): |
| """测试SSHOUT状态(无集成)""" |
| enhanced_cli.sshout_integration = None |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli._sshout_status() |
| mock_print.assert_called_once_with("[red]SSHOUT集成未初始化[/red]") |
| |
| @pytest.mark.asyncio |
| async def test_sshout_status_detailed(self, enhanced_cli): |
| """测试SSHOUT详细状态""" |
| mock_sshout = Mock() |
| mock_sshout.get_connection_status.return_value = { |
| 'connected': True, |
| 'server': 'test.server.com', |
| 'message_count': 15, |
| 'api_version': 'v2.0', |
| 'my_user_id': 123, |
| 'my_username': 'testuser', |
| 'recent_messages': [ |
| { |
| 'timestamp': '2023-01-01 12:00:00', |
| 'username': 'user1', |
| 'content': 'Hello', |
| 'from_user': 'user1' # 测试兼容格式 |
| } |
| ] |
| } |
| enhanced_cli.sshout_integration = mock_sshout |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli._sshout_status() |
| mock_print.assert_called_once() |
| |
| @pytest.mark.asyncio |
| async def test_sshout_send_message_no_integration(self, enhanced_cli): |
| """测试SSHOUT发送消息(无集成)""" |
| enhanced_cli.sshout_integration = None |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli._sshout_send_message("test message") |
| mock_print.assert_called_once_with("[red]SSHOUT集成未初始化[/red]") |
| |
| @pytest.mark.asyncio |
| async def test_sshout_send_message_success(self, enhanced_cli, mock_sshout_integration): |
| """测试SSHOUT发送消息成功""" |
| enhanced_cli.sshout_integration = mock_sshout_integration |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli._sshout_send_message("test message") |
| mock_print.assert_called_once_with("[green]✅ 消息已发送: test message[/green]") |
| |
| @pytest.mark.asyncio |
| async def test_sshout_send_message_failure(self, enhanced_cli, mock_sshout_integration): |
| """测试SSHOUT发送消息失败""" |
| mock_sshout_integration.send_message.return_value = False |
| enhanced_cli.sshout_integration = mock_sshout_integration |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli._sshout_send_message("test message") |
| mock_print.assert_called_once_with("[red]❌ 消息发送失败。请检查SSHOUT连接状态。[/red]") |
| |
| |
| class TestEnhancedCLIUserInputProcessing: |
| """测试增强CLI的用户输入处理""" |
| |
| @pytest.mark.asyncio |
| async def test_process_user_input_interactive_mode(self, enhanced_cli, mock_agent, mock_mcp_integration): |
| """测试处理用户输入(交互模式)""" |
| enhanced_cli.agent = mock_agent |
| enhanced_cli.mcp_integration = mock_mcp_integration |
| mock_agent.thinking_mode = ThinkingMode.INTERACTIVE |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli.process_user_input("test input") |
| mock_agent.process_user_input.assert_called_once_with("test input") |
| mock_mcp_integration.process_tool_calls_in_response.assert_called_once() |
| assert mock_print.call_count >= 2 # 至少包含角色提示和响应面板 |
| |
| @pytest.mark.asyncio |
| async def test_process_user_input_yolo_mode(self, enhanced_cli, mock_agent, mock_mcp_integration): |
| """测试处理用户输入(YOLO模式)""" |
| enhanced_cli.agent = mock_agent |
| enhanced_cli.mcp_integration = mock_mcp_integration |
| mock_agent.thinking_mode = ThinkingMode.YOLO |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli.process_user_input("test input") |
| mock_agent.process_user_input.assert_called_once_with("test input") |
| mock_mcp_integration.process_tool_calls_in_response.assert_called_once() |
| assert mock_print.call_count >= 2 |
| |
| @pytest.mark.asyncio |
| async def test_process_user_input_exception(self, enhanced_cli, mock_agent, mock_mcp_integration): |
| """测试处理用户输入异常""" |
| enhanced_cli.agent = mock_agent |
| enhanced_cli.mcp_integration = mock_mcp_integration |
| mock_agent.process_user_input.side_effect = Exception("Processing error") |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli.process_user_input("test input") |
| mock_print.assert_called_with("[red]处理请求时出错: Processing error[/red]") |
| |
| |
| class TestEnhancedCLIInteractiveLoop: |
| """测试增强CLI的交互循环""" |
| |
| @pytest.mark.asyncio |
| async def test_run_interactive_loop_normal_flow(self, enhanced_cli): |
| """测试正常的交互循环流程""" |
| # 模拟用户输入序列:一些命令然后退出 |
| user_inputs = ["help", "/status", "/quit"] |
| input_iterator = iter(user_inputs) |
| |
| def mock_get_input(): |
| try: |
| return next(input_iterator) |
| except StopIteration: |
| return "/quit" |
| |
| with patch.object(enhanced_cli, 'show_welcome'), \ |
| patch.object(enhanced_cli, 'get_user_input', side_effect=lambda: mock_get_input()), \ |
| patch.object(enhanced_cli, 'process_command', side_effect=[True, True, False]): |
| |
| await enhanced_cli.run_interactive_loop() |
| |
| @pytest.mark.asyncio |
| async def test_run_interactive_loop_empty_input(self, enhanced_cli): |
| """测试交互循环空输入""" |
| user_inputs = ["", "/quit"] |
| input_iterator = iter(user_inputs) |
| |
| def mock_get_input(): |
| try: |
| return next(input_iterator) |
| except StopIteration: |
| return "/quit" |
| |
| with patch.object(enhanced_cli, 'show_welcome'), \ |
| patch.object(enhanced_cli, 'get_user_input', side_effect=lambda: mock_get_input()), \ |
| patch.object(enhanced_cli, 'process_command', side_effect=[True, False]): |
| |
| await enhanced_cli.run_interactive_loop() |
| |
| @pytest.mark.asyncio |
| async def test_run_interactive_loop_keyboard_interrupt_continue(self, enhanced_cli): |
| """测试交互循环键盘中断(继续)""" |
| interrupt_count = 0 |
| |
| def mock_get_input(): |
| nonlocal interrupt_count |
| interrupt_count += 1 |
| if interrupt_count == 1: |
| raise KeyboardInterrupt() |
| return "/quit" |
| |
| with patch.object(enhanced_cli, 'show_welcome'), \ |
| patch.object(enhanced_cli, 'get_user_input', side_effect=mock_get_input), \ |
| patch.object(enhanced_cli, 'confirm_action', side_effect=[False, True]), \ |
| patch.object(enhanced_cli, 'process_command', side_effect=[False]), \ |
| patch.object(enhanced_cli.console, 'print') as mock_print: |
| |
| await enhanced_cli.run_interactive_loop() |
| # 验证显示了"继续..."消息 |
| assert any("继续..." in str(call) for call in mock_print.call_args_list) |
| |
| @pytest.mark.asyncio |
| async def test_run_interactive_loop_keyboard_interrupt_exit(self, enhanced_cli): |
| """测试交互循环键盘中断(退出)""" |
| def mock_get_input(): |
| raise KeyboardInterrupt() |
| |
| with patch.object(enhanced_cli, 'show_welcome'), \ |
| patch.object(enhanced_cli, 'get_user_input', side_effect=mock_get_input), \ |
| patch.object(enhanced_cli, 'confirm_action', return_value=True): |
| |
| await enhanced_cli.run_interactive_loop() |
| |
| @pytest.mark.asyncio |
| async def test_run_interactive_loop_eof_error(self, enhanced_cli): |
| """测试交互循环EOF错误""" |
| def mock_get_input(): |
| raise EOFError() |
| |
| with patch.object(enhanced_cli, 'show_welcome'), \ |
| patch.object(enhanced_cli, 'get_user_input', side_effect=mock_get_input): |
| |
| await enhanced_cli.run_interactive_loop() |
| |
| @pytest.mark.asyncio |
| async def test_run_interactive_loop_unexpected_exception(self, enhanced_cli): |
| """测试交互循环意外异常""" |
| exception_count = 0 |
| |
| def mock_get_input(): |
| nonlocal exception_count |
| exception_count += 1 |
| if exception_count == 1: |
| raise ValueError("Unexpected error") |
| return "/quit" |
| |
| with patch.object(enhanced_cli, 'show_welcome'), \ |
| patch.object(enhanced_cli, 'get_user_input', side_effect=mock_get_input), \ |
| patch.object(enhanced_cli, 'process_command', side_effect=[False]), \ |
| patch.object(enhanced_cli.console, 'print') as mock_print: |
| |
| await enhanced_cli.run_interactive_loop() |
| # 验证显示了错误消息 |
| assert any("发生未知错误" in str(call) for call in mock_print.call_args_list) |
| |
| |
| class TestEnhancedCLIShutdown: |
| """测试增强CLI的关闭""" |
| |
| @pytest.mark.asyncio |
| async def test_shutdown_complete(self, enhanced_cli, mock_mcp_integration, mock_sshout_integration): |
| """测试完整的关闭过程""" |
| enhanced_cli.mcp_integration = mock_mcp_integration |
| enhanced_cli.sshout_integration = mock_sshout_integration |
| |
| with patch.object(enhanced_cli, '_disconnect_sshout') as mock_disconnect, \ |
| patch.object(enhanced_cli.console, 'print') as mock_print: |
| |
| await enhanced_cli.shutdown() |
| |
| mock_mcp_integration.shutdown.assert_called_once() |
| mock_disconnect.assert_called_once_with(show_message=False) |
| mock_print.assert_called_once_with("[yellow]再见![/yellow]") |
| |
| @pytest.mark.asyncio |
| async def test_shutdown_no_integrations(self, enhanced_cli): |
| """测试关闭(无集成)""" |
| enhanced_cli.mcp_integration = None |
| enhanced_cli.sshout_integration = None |
| |
| with patch.object(enhanced_cli.console, 'print') as mock_print: |
| await enhanced_cli.shutdown() |
| mock_print.assert_called_once_with("[yellow]再见![/yellow]") |
| |
| |
| class TestEnhancedCLIProgressLogHandler: |
| """测试增强CLI的进度日志处理器""" |
| |
| @pytest.mark.asyncio |
| async def test_progress_log_handler_thinking_indicators(self, enhanced_cli, mock_agent, mock_mcp_integration): |
| """测试进度日志处理器识别思考指示器""" |
| enhanced_cli.agent = mock_agent |
| enhanced_cli.mcp_integration = mock_mcp_integration |
| mock_agent.thinking_mode = ThinkingMode.YOLO |
| |
| # 模拟包含思考指示器的日志记录 |
| with patch('logging.getLogger') as mock_get_logger: |
| mock_logger = Mock() |
| mock_get_logger.return_value = mock_logger |
| |
| await enhanced_cli.process_user_input("test input") |
| |
| # 验证日志处理器被添加和移除 |
| mock_logger.addHandler.assert_called_once() |
| mock_logger.removeHandler.assert_called_once() |