| """ |
| interface.py的完整测试覆盖 |
| 针对95%覆盖率目标设计,覆盖所有missing lines |
| """ |
| |
| import pytest |
| from unittest.mock import Mock, AsyncMock, patch, MagicMock |
| import asyncio |
| import logging |
| from io import StringIO |
| |
| from src.claude_agent.cli.interface import CLIInterface |
| from src.claude_agent.core.agent import AgentCore, ThinkingMode |
| |
| |
@pytest.fixture
def cli_interface():
    """Provide a newly constructed CLIInterface for each test."""
    interface = CLIInterface()
    return interface
| |
| |
@pytest.fixture
def mock_agent():
    """Provide a Mock constrained to the AgentCore spec, preset to INTERACTIVE mode."""
    fake_agent = Mock(spec=AgentCore)

    # Plain state attributes read by the interface.
    fake_agent.conversation_history = []
    fake_agent.thinking_mode = ThinkingMode.INTERACTIVE

    # Synchronous methods.
    for method_name in ("clear_history", "set_thinking_mode"):
        setattr(fake_agent, method_name, Mock())
    fake_agent.get_conversation_history = Mock(return_value=[])

    # The coroutine method the tests await.
    fake_agent.process_user_input = AsyncMock(return_value="Mock response")
    return fake_agent
| |
| |
@pytest.fixture
def mock_mcp_integration():
    """Provide a stand-in MCP integration whose coroutine methods are AsyncMocks."""
    integration = Mock()

    # Coroutines with no configured return value.
    for coroutine_name in ("setup_default_tools", "enhance_agent_with_tools", "shutdown"):
        setattr(integration, coroutine_name, AsyncMock())

    # Tool-call post-processing yields a fixed, recognisable string.
    integration.process_tool_calls_in_response = AsyncMock(return_value="Enhanced response")
    return integration
| |
| |
class TestCLIInterfaceComprehensive:
    """Broad coverage of CLIInterface: construction, display helpers, commands and input handling."""

    def test_init(self, cli_interface):
        """A new interface has a console but no agent or MCP integration."""
        assert cli_interface.console is not None
        assert cli_interface.agent is None
        assert cli_interface.mcp_integration is None

    def test_setup_logging(self, cli_interface):
        """Smoke test: setup_logging() runs without raising."""
        cli_interface.setup_logging()
        # NOTE(review): nothing about the resulting logging configuration is
        # asserted here; add checks once the expected configuration is known.

    @pytest.mark.asyncio
    async def test_initialize_success(self, cli_interface, mock_mcp_integration):
        """initialize() stores the created agent and MCP integration and sets up tools."""
        with patch('src.claude_agent.cli.interface.AgentCore') as mock_agent_class, \
                patch('src.claude_agent.cli.interface.MCPToolIntegration',
                      return_value=mock_mcp_integration):
            mock_agent_instance = Mock()
            mock_agent_class.return_value = mock_agent_instance

            await cli_interface.initialize("test-api-key")

            assert cli_interface.agent == mock_agent_instance
            assert cli_interface.mcp_integration == mock_mcp_integration
            mock_mcp_integration.setup_default_tools.assert_called_once()
            mock_mcp_integration.enhance_agent_with_tools.assert_called_once()

    def test_show_welcome(self, cli_interface):
        """show_welcome() prints to the console."""
        with patch.object(cli_interface.console, 'print') as mock_print:
            cli_interface.show_welcome()
            mock_print.assert_called()

    def test_show_help(self, cli_interface):
        """show_help() prints to the console at least once."""
        with patch.object(cli_interface.console, 'print') as mock_print:
            cli_interface.show_help()
            assert mock_print.call_count >= 1

    def test_show_status_with_agent(self, cli_interface, mock_agent):
        """show_status() prints when an agent and a tool manager are present."""
        cli_interface.agent = mock_agent
        cli_interface.mcp_integration = Mock()
        cli_interface.mcp_integration.tool_manager = Mock()
        cli_interface.mcp_integration.tool_manager.tools = ['tool1', 'tool2']

        with patch.object(cli_interface.console, 'print') as mock_print:
            cli_interface.show_status()
            mock_print.assert_called()

    def test_show_status_without_agent(self, cli_interface):
        """show_status() still prints when neither agent nor MCP integration is set."""
        cli_interface.agent = None
        cli_interface.mcp_integration = None

        with patch.object(cli_interface.console, 'print') as mock_print:
            cli_interface.show_status()
            mock_print.assert_called()

    def test_show_tools_with_mcp(self, cli_interface, mock_mcp_integration):
        """show_tools() prints when the tool manager reports available tools."""
        cli_interface.mcp_integration = mock_mcp_integration
        mock_mcp_integration.tool_manager = Mock()
        mock_mcp_integration.tool_manager.get_available_tools.return_value = ['tool1', 'tool2']
        mock_mcp_integration.tool_manager.get_tool_info.return_value = {'description': 'test tool'}

        with patch.object(cli_interface.console, 'print') as mock_print:
            cli_interface.show_tools()
            mock_print.assert_called()

    def test_show_tools_without_mcp(self, cli_interface):
        """show_tools() prints when no MCP integration is configured."""
        cli_interface.mcp_integration = None

        with patch.object(cli_interface.console, 'print') as mock_print:
            cli_interface.show_tools()
            mock_print.assert_called()

    def test_show_tools_no_tools(self, cli_interface, mock_mcp_integration):
        """show_tools() prints when the tool manager reports an empty tool list."""
        cli_interface.mcp_integration = mock_mcp_integration
        mock_mcp_integration.tool_manager = Mock()
        mock_mcp_integration.tool_manager.get_available_tools.return_value = []

        with patch.object(cli_interface.console, 'print') as mock_print:
            cli_interface.show_tools()
            mock_print.assert_called()

    def test_show_history_with_agent(self, cli_interface, mock_agent):
        """show_history() prints at least once per history entry."""
        cli_interface.agent = mock_agent
        history = [
            {"role": "user", "content": "Hello"},
            {"role": "assistant", "content": "Hi there"},
        ]
        mock_agent.get_conversation_history.return_value = history

        with patch.object(cli_interface.console, 'print') as mock_print:
            cli_interface.show_history()
            assert mock_print.call_count >= len(history)

    def test_show_history_without_agent(self, cli_interface):
        """show_history() prints a message when no agent is set."""
        cli_interface.agent = None

        with patch.object(cli_interface.console, 'print') as mock_print:
            cli_interface.show_history()
            mock_print.assert_called()

    def test_show_history_empty(self, cli_interface, mock_agent):
        """show_history() prints when the agent reports an empty history."""
        cli_interface.agent = mock_agent
        mock_agent.get_conversation_history.return_value = []

        with patch.object(cli_interface.console, 'print') as mock_print:
            cli_interface.show_history()
            mock_print.assert_called()

    @pytest.mark.asyncio
    async def test_switch_mode_interactive_to_yolo(self, cli_interface, mock_agent):
        """A confirmed switch from INTERACTIVE requests YOLO mode."""
        cli_interface.agent = mock_agent
        mock_agent.thinking_mode = ThinkingMode.INTERACTIVE

        with patch('rich.prompt.Confirm.ask', return_value=True):
            await cli_interface.switch_mode()
            mock_agent.set_thinking_mode.assert_called_with(ThinkingMode.YOLO)

    @pytest.mark.asyncio
    async def test_switch_mode_yolo_to_interactive(self, cli_interface, mock_agent):
        """A confirmed switch from YOLO requests INTERACTIVE mode."""
        cli_interface.agent = mock_agent
        mock_agent.thinking_mode = ThinkingMode.YOLO

        with patch('rich.prompt.Confirm.ask', return_value=True):
            await cli_interface.switch_mode()
            mock_agent.set_thinking_mode.assert_called_with(ThinkingMode.INTERACTIVE)

    @pytest.mark.asyncio
    async def test_switch_mode_cancelled(self, cli_interface, mock_agent):
        """Declining the confirmation leaves the mode untouched but still prints."""
        cli_interface.agent = mock_agent
        mock_agent.thinking_mode = ThinkingMode.INTERACTIVE

        with patch('rich.prompt.Confirm.ask', return_value=False), \
                patch.object(cli_interface.console, 'print') as mock_print:
            await cli_interface.switch_mode()
            mock_agent.set_thinking_mode.assert_not_called()
            mock_print.assert_called()

    @pytest.mark.asyncio
    async def test_process_command_help(self, cli_interface):
        """The "help" command dispatches to show_help()."""
        with patch.object(cli_interface, 'show_help') as mock_show_help:
            await cli_interface.process_command("help")
            mock_show_help.assert_called_once()

    @pytest.mark.asyncio
    async def test_process_command_status(self, cli_interface):
        """The "status" command dispatches to show_status()."""
        with patch.object(cli_interface, 'show_status') as mock_show_status:
            await cli_interface.process_command("status")
            mock_show_status.assert_called_once()

    @pytest.mark.asyncio
    async def test_process_command_tools(self, cli_interface):
        """The "tools" command dispatches to show_tools()."""
        with patch.object(cli_interface, 'show_tools') as mock_show_tools:
            await cli_interface.process_command("tools")
            mock_show_tools.assert_called_once()

    @pytest.mark.asyncio
    async def test_process_command_history(self, cli_interface):
        """The "history" command dispatches to show_history()."""
        with patch.object(cli_interface, 'show_history') as mock_show_history:
            await cli_interface.process_command("history")
            mock_show_history.assert_called_once()

    @pytest.mark.asyncio
    async def test_process_command_clear(self, cli_interface, mock_agent):
        """A confirmed "clear" command clears the agent's history once."""
        cli_interface.agent = mock_agent

        with patch('rich.prompt.Confirm.ask', return_value=True):
            await cli_interface.process_command("clear")
            mock_agent.clear_history.assert_called_once()

    @pytest.mark.asyncio
    async def test_process_command_mode(self, cli_interface):
        """The "mode" command dispatches to switch_mode()."""
        with patch.object(cli_interface, 'switch_mode') as mock_switch_mode:
            await cli_interface.process_command("mode")
            mock_switch_mode.assert_called_once()

    @pytest.mark.asyncio
    async def test_process_command_quit(self, cli_interface):
        """The "quit" command returns False."""
        result = await cli_interface.process_command("quit")
        assert result is False

    @pytest.mark.asyncio
    async def test_process_command_exit(self, cli_interface):
        """The "exit" command returns False."""
        result = await cli_interface.process_command("exit")
        assert result is False

    @pytest.mark.asyncio
    async def test_process_command_unknown(self, cli_interface):
        """An unrecognised command results in console output."""
        with patch.object(cli_interface.console, 'print') as mock_print:
            await cli_interface.process_command("unknown")
            mock_print.assert_called()

    @pytest.mark.asyncio
    async def test_process_user_input_interactive_mode(self, cli_interface, mock_agent, mock_mcp_integration):
        """In INTERACTIVE mode the input reaches the agent and tool-call processing runs."""
        cli_interface.agent = mock_agent
        cli_interface.mcp_integration = mock_mcp_integration
        mock_agent.thinking_mode = ThinkingMode.INTERACTIVE

        # Console output is silenced; its calls are not inspected in this test.
        with patch.object(cli_interface.console, 'print'):
            await cli_interface.process_user_input("test input")
            mock_agent.process_user_input.assert_called_with("test input")
            mock_mcp_integration.process_tool_calls_in_response.assert_called()

    @pytest.mark.asyncio
    async def test_process_user_input_yolo_mode(self, cli_interface, mock_agent, mock_mcp_integration):
        """In YOLO mode the input still reaches the agent (logger lookup is mocked)."""
        cli_interface.agent = mock_agent
        cli_interface.mcp_integration = mock_mcp_integration
        mock_agent.thinking_mode = ThinkingMode.YOLO

        with patch('logging.getLogger') as mock_get_logger:
            mock_get_logger.return_value = Mock()

            await cli_interface.process_user_input("test input")
            mock_agent.process_user_input.assert_called_with("test input")

    @pytest.mark.asyncio
    async def test_process_user_input_exception(self, cli_interface, mock_agent, mock_mcp_integration):
        """An agent failure is reported on the console and does not propagate."""
        cli_interface.agent = mock_agent
        cli_interface.mcp_integration = mock_mcp_integration
        mock_agent.process_user_input.side_effect = Exception("Processing error")

        with patch.object(cli_interface.console, 'print') as mock_print:
            await cli_interface.process_user_input("test input")
            # Some printed call must mention "error" (case-insensitive).
            error_printed = any("error" in str(call).lower() for call in mock_print.call_args_list)
            assert error_printed

    @pytest.mark.asyncio
    async def test_process_user_input_progress_log_handler_emit(self, cli_interface, mock_agent, mock_mcp_integration):
        """The handler installed during YOLO processing accepts records via emit()."""
        cli_interface.agent = mock_agent
        cli_interface.mcp_integration = mock_mcp_integration
        mock_agent.thinking_mode = ThinkingMode.YOLO

        class MockRecord:
            """Minimal stand-in for a log record exposing only ``msg``."""

            def __init__(self, msg):
                self.msg = msg

        with patch('logging.getLogger') as mock_get_logger:
            mock_logger = Mock()
            mock_get_logger.return_value = mock_logger

            # Remember whichever handler the interface installs on the logger.
            handler_added = None

            def capture_handler(handler):
                nonlocal handler_added
                handler_added = handler

            mock_logger.addHandler.side_effect = capture_handler

            await cli_interface.process_user_input("test input")

            # The handler must be installed once and removed once.
            mock_logger.addHandler.assert_called_once()
            mock_logger.removeHandler.assert_called_once()

            # Fail loudly instead of silently skipping the emit checks when no
            # handler was captured.
            assert handler_added is not None

            # A record carrying the thinking indicator.
            record_with_indicator = MockRecord("🧠 思考中...")
            handler_added.emit(record_with_indicator)

            # A record without the thinking indicator.
            record_without_indicator = MockRecord("普通日志消息")
            handler_added.emit(record_without_indicator)
| |
| |
class TestCLIInterfaceInteractiveLoop:
    """Exercises run_interactive_loop() under scripted prompts, interrupts and failures."""

    @pytest.mark.asyncio
    async def test_run_interactive_loop_basic_flow(self, cli_interface):
        """The loop runs to completion for the scripted inputs "help", "status", "quit"."""
        input_iterator = iter(["help", "status", "quit"])

        def mock_prompt_ask(prompt, **kwargs):
            # Fall back to "quit" once the script is exhausted so the loop
            # can always terminate.
            return next(input_iterator, "quit")

        with patch('rich.prompt.Prompt.ask', side_effect=mock_prompt_ask), \
                patch.object(cli_interface, 'show_welcome'), \
                patch.object(cli_interface, 'process_command', side_effect=[True, True, False]):
            await cli_interface.run_interactive_loop()

    @pytest.mark.asyncio
    async def test_run_interactive_loop_empty_input(self, cli_interface):
        """The loop runs to completion when empty and blank inputs precede "quit"."""
        input_iterator = iter(["", " ", "quit"])

        def mock_prompt_ask(prompt, **kwargs):
            return next(input_iterator, "quit")

        # process_command is scripted for a single call returning False.
        with patch('rich.prompt.Prompt.ask', side_effect=mock_prompt_ask), \
                patch.object(cli_interface, 'show_welcome'), \
                patch.object(cli_interface, 'process_command', side_effect=[False]):
            await cli_interface.run_interactive_loop()

    @pytest.mark.asyncio
    async def test_run_interactive_loop_keyboard_interrupt_exit(self, cli_interface):
        """The loop returns when the prompt raises KeyboardInterrupt and Confirm.ask answers True."""
        def mock_prompt_ask(prompt, **kwargs):
            raise KeyboardInterrupt()

        with patch('rich.prompt.Prompt.ask', side_effect=mock_prompt_ask), \
                patch('rich.prompt.Confirm.ask', return_value=True), \
                patch.object(cli_interface, 'show_welcome'):
            await cli_interface.run_interactive_loop()

    @pytest.mark.asyncio
    async def test_run_interactive_loop_keyboard_interrupt_continue(self, cli_interface):
        """After a KeyboardInterrupt answered with False, a message containing "继续" is printed."""
        interrupt_count = 0

        def mock_prompt_ask(prompt, **kwargs):
            nonlocal interrupt_count
            interrupt_count += 1
            # Only the first prompt is interrupted; every later one quits.
            if interrupt_count == 1:
                raise KeyboardInterrupt()
            return "quit"

        with patch('rich.prompt.Prompt.ask', side_effect=mock_prompt_ask), \
                patch('rich.prompt.Confirm.ask', side_effect=[False, True]), \
                patch.object(cli_interface, 'show_welcome'), \
                patch.object(cli_interface, 'process_command', return_value=False), \
                patch.object(cli_interface.console, 'print') as mock_print:
            await cli_interface.run_interactive_loop()

            continue_printed = any("继续" in str(call) for call in mock_print.call_args_list)
            assert continue_printed

    @pytest.mark.asyncio
    async def test_run_interactive_loop_eof_error(self, cli_interface):
        """The loop returns when the prompt raises EOFError."""
        def mock_prompt_ask(prompt, **kwargs):
            raise EOFError()

        with patch('rich.prompt.Prompt.ask', side_effect=mock_prompt_ask), \
                patch.object(cli_interface, 'show_welcome'):
            await cli_interface.run_interactive_loop()

    @pytest.mark.asyncio
    async def test_run_interactive_loop_exception_handling(self, cli_interface):
        """An unexpected exception from the prompt is reported and the loop continues."""
        exception_count = 0

        def mock_prompt_ask(prompt, **kwargs):
            nonlocal exception_count
            exception_count += 1
            # Only the first prompt fails; every later one quits.
            if exception_count == 1:
                raise ValueError("Test exception")
            return "quit"

        with patch('rich.prompt.Prompt.ask', side_effect=mock_prompt_ask), \
                patch.object(cli_interface, 'show_welcome'), \
                patch.object(cli_interface, 'process_command', side_effect=[False]), \
                patch.object(cli_interface.console, 'print') as mock_print:
            await cli_interface.run_interactive_loop()

            # Some printed call must mention an error, in Chinese or English.
            error_printed = any(
                "错误" in str(call) or "error" in str(call).lower()
                for call in mock_print.call_args_list
            )
            assert error_printed

    @pytest.mark.asyncio
    async def test_process_user_input_exception_coverage(self, cli_interface, mock_agent, mock_mcp_integration):
        """When the agent raises, process_user_input() prints the failure text and returns None."""
        cli_interface.agent = mock_agent
        cli_interface.mcp_integration = mock_mcp_integration

        # Drive the exception-handling branch of process_user_input().
        mock_agent.process_user_input.side_effect = Exception("Processing error")

        with patch.object(cli_interface.console, 'print') as mock_print:
            result = await cli_interface.process_user_input("test input")

            # The failure is swallowed and reported as a None result.
            assert result is None

            # The user-facing failure message must have been printed.
            error_calls = [
                call for call in mock_print.call_args_list
                if "处理请求时出错" in str(call)
            ]
            assert len(error_calls) > 0
| |
| |
class TestCLIInterfaceShutdown:
    """Shutdown behaviour with and without an MCP integration attached."""

    @pytest.mark.asyncio
    async def test_shutdown_with_mcp(self, cli_interface, mock_mcp_integration):
        """shutdown() calls the integration's shutdown once and prints to the console."""
        cli_interface.mcp_integration = mock_mcp_integration
        console = cli_interface.console

        with patch.object(console, 'print') as console_print:
            await cli_interface.shutdown()

        mock_mcp_integration.shutdown.assert_called_once()
        console_print.assert_called()

    @pytest.mark.asyncio
    async def test_shutdown_without_mcp(self, cli_interface):
        """shutdown() still prints to the console when no integration is set."""
        cli_interface.mcp_integration = None
        console = cli_interface.console

        with patch.object(console, 'print') as console_print:
            await cli_interface.shutdown()

        console_print.assert_called()

    @pytest.mark.asyncio
    async def test_shutdown_with_mcp_exception(self, cli_interface, mock_mcp_integration):
        """An exception raised by the integration's shutdown propagates to the caller."""
        failure = Exception("Shutdown error")
        mock_mcp_integration.shutdown.side_effect = failure
        cli_interface.mcp_integration = mock_mcp_integration

        # shutdown() has no exception handling of its own, so the error escapes.
        with pytest.raises(Exception, match="Shutdown error"):
            await cli_interface.shutdown()
| |
| |
class TestCLIInterfaceMainFunction:
    """Checks on the ``main`` entry point exported by the interface module."""

    def test_main_function_imports(self):
        """``main`` can be imported from the interface module and is not None."""
        from src.claude_agent.cli.interface import main as entry_point

        assert entry_point is not None

    def test_main_function_is_click_command(self):
        """``main`` is an instance of click.Command."""
        from src.claude_agent.cli.interface import main as entry_point
        import click

        assert isinstance(entry_point, click.Command)
| |
| |
class TestCLIProgressLogHandler:
    """Lifecycle of the progress log handler used while processing user input."""

    @pytest.mark.asyncio
    async def test_progress_log_handler_creation_and_cleanup(self, cli_interface, mock_agent, mock_mcp_integration):
        """One handler is added to the logger and then the same handler is removed."""
        mock_agent.thinking_mode = ThinkingMode.YOLO
        cli_interface.agent = mock_agent
        cli_interface.mcp_integration = mock_mcp_integration

        # Chronological record of (operation, handler) pairs seen by the logger.
        events = []

        with patch('logging.getLogger') as logger_factory:
            fake_logger = Mock()
            fake_logger.addHandler.side_effect = lambda handler: events.append(('add', handler))
            fake_logger.removeHandler.side_effect = lambda handler: events.append(('remove', handler))
            logger_factory.return_value = fake_logger

            await cli_interface.process_user_input("test input")

        # Exactly one add followed by one remove, both for the same handler.
        assert len(events) == 2
        (first_op, first_handler), (second_op, second_handler) = events
        assert first_op == 'add'
        assert second_op == 'remove'
        assert first_handler == second_handler