| """ |
| Agent核心模块的单元测试 |
| """ |
| |
| import pytest |
| import json |
| from unittest.mock import AsyncMock, Mock, patch |
| from claude_agent.core.agent import AgentCore, ThinkingMode, Task |
| |
| |
class TestAgentCore:
    """Unit tests for the AgentCore class.

    NOTE(review): several tests request a ``mock_client`` fixture that is not
    defined in this file -- presumably it is provided by a ``conftest.py``;
    verify that it exists and is wired to the agent under test.
    """

    @pytest.fixture
    def mock_query(self):
        """Return an async-generator stand-in for the patched ``query`` function.

        The stand-in accepts and ignores whatever arguments the caller passes,
        so invoking it with a prompt or options does not raise ``TypeError``.
        It yields a single mock message whose first content item carries the
        text "测试响应".
        """
        async def mock_async_generator(*args, **kwargs):
            yield Mock(content=[Mock(text="测试响应")])

        return mock_async_generator

    @pytest.fixture
    def agent(self, mock_query):
        """Yield an AgentCore built with ``ClaudeSDKClient`` and ``query`` patched.

        The fixture yields from inside the ``with`` block so that both patches
        stay active for the whole test; returning instead would undo them as
        soon as the fixture function exits, before the test body runs.
        """
        with patch('claude_agent.core.agent.ClaudeSDKClient'), \
             patch('claude_agent.core.agent.query', mock_query):
            yield AgentCore(api_key="test_key")

    def test_agent_initialization(self):
        """A new agent starts in interactive mode with empty state."""
        with patch('claude_agent.core.agent.ClaudeSDKClient') as mock_claude:
            agent = AgentCore(api_key="test_key")

            assert agent.thinking_mode == ThinkingMode.INTERACTIVE
            assert agent.tasks == []
            assert agent.conversation_history == []
            assert agent.mcp_tools == {}
            # The SDK client must be constructed exactly once with the API key.
            mock_claude.assert_called_once_with(api_key="test_key")

    def test_set_thinking_mode(self, agent):
        """The thinking mode can be switched to YOLO and back to interactive."""
        agent.set_thinking_mode(ThinkingMode.YOLO)
        assert agent.thinking_mode == ThinkingMode.YOLO

        agent.set_thinking_mode(ThinkingMode.INTERACTIVE)
        assert agent.thinking_mode == ThinkingMode.INTERACTIVE

    @pytest.mark.asyncio
    async def test_interactive_process(self, agent, mock_client):
        """Interactive mode returns the reply and records both turns in history."""
        user_input = "你好"

        response = await agent.process_user_input(user_input)

        assert response == "测试响应"
        # One user entry followed by one assistant entry.
        assert len(agent.conversation_history) == 2
        assert agent.conversation_history[0]["role"] == "user"
        assert agent.conversation_history[0]["content"] == user_input
        assert agent.conversation_history[1]["role"] == "assistant"
        assert agent.conversation_history[1]["content"] == "测试响应"

    @pytest.mark.asyncio
    async def test_yolo_process_with_valid_plan(self, agent, mock_client):
        """YOLO mode with a valid plan: plan, execute, then summarize."""
        agent.set_thinking_mode(ThinkingMode.YOLO)

        # Simulated planning response with a single dependency-free subtask.
        plan_response = {
            "main_task": "测试任务",
            "subtasks": [
                {
                    "id": "task_1",
                    "description": "子任务1",
                    "dependencies": [],
                    "tools_needed": []
                }
            ]
        }

        # Queue the three replies in the order the test expects them to be
        # consumed.
        mock_client.chat.side_effect = [
            Mock(content=[Mock(text=f"```json\n{json.dumps(plan_response)}\n```")]),  # planning reply
            Mock(content=[Mock(text="任务执行完成")]),  # task execution reply
            Mock(content=[Mock(text="执行总结")])  # summary reply
        ]

        response = await agent.process_user_input("请帮我完成一个任务")

        assert response == "执行总结"
        assert mock_client.chat.call_count == 3

    @pytest.mark.asyncio
    async def test_yolo_process_with_invalid_plan(self, agent, mock_client):
        """YOLO mode with an unparseable plan reports that no plan could be made."""
        agent.set_thinking_mode(ThinkingMode.YOLO)

        # Simulated planning response that is not valid JSON.
        mock_client.chat.return_value = Mock(content=[Mock(text="无效的JSON")])

        response = await agent.process_user_input("请帮我完成一个任务")

        assert "无法制定有效的执行计划" in response

    def test_extract_json_from_response(self, agent):
        """JSON is extracted from fenced and plain text; invalid text gives None."""
        # JSON wrapped in a markdown code fence.
        response_with_markdown = "```json\n{\"test\": \"value\"}\n```"
        result = agent._extract_json_from_response(response_with_markdown)
        assert result == {"test": "value"}

        # Plain JSON.
        response_plain = '{"test": "value"}'
        result = agent._extract_json_from_response(response_plain)
        assert result == {"test": "value"}

        # Text that is not JSON at all.
        response_invalid = "这不是JSON"
        result = agent._extract_json_from_response(response_invalid)
        assert result is None

    @pytest.mark.asyncio
    async def test_execute_task_plan(self, agent, mock_client):
        """A two-step plan with a dependency completes both tasks successfully."""
        plan = {
            "main_task": "测试任务",
            "subtasks": [
                {
                    "id": "task_1",
                    "description": "第一个任务",
                    "dependencies": [],
                    "tools_needed": []
                },
                {
                    "id": "task_2",
                    "description": "第二个任务",
                    "dependencies": ["task_1"],
                    "tools_needed": []
                }
            ]
        }

        mock_client.chat.return_value = Mock(content=[Mock(text="任务完成")])

        result = await agent._execute_task_plan(plan)

        # Identity check: a merely truthy value (e.g. 1) must not pass.
        assert result["overall_success"] is True
        assert len(result["completed_tasks"]) == 2
        assert len(result["failed_tasks"]) == 0

    def test_add_mcp_tool(self, agent):
        """A registered MCP tool is stored under the given name."""
        tool_instance = Mock()
        agent.add_mcp_tool("test_tool", tool_instance)

        assert "test_tool" in agent.mcp_tools
        assert agent.mcp_tools["test_tool"] == tool_instance

    def test_conversation_history_management(self, agent):
        """History is handed out as a separate object and can be cleared."""
        # Seed some history entries.
        agent.conversation_history = [
            {"role": "user", "content": "你好"},
            {"role": "assistant", "content": "你好!"}
        ]

        # Fetch the history.
        history = agent.get_conversation_history()
        assert len(history) == 2
        # Must be a copy: compare identity, because an equal-valued copy
        # would make a ``!=`` comparison fail.
        assert history is not agent.conversation_history

        # Clearing resets both the history and the task list.
        agent.clear_history()
        assert len(agent.conversation_history) == 0
        assert len(agent.tasks) == 0
| |
| |
class TestTask:
    """Unit tests for the Task data class."""

    def test_task_creation(self):
        """A Task built from only an id and description gets default fields."""
        created = Task(id="test_1", description="测试任务")

        # Explicitly supplied fields.
        assert created.id == "test_1"
        assert created.description == "测试任务"
        # Fields left at their defaults.
        assert created.status == "pending"
        assert created.result is None
        assert created.subtasks == []

    def test_task_with_subtasks(self):
        """A Task keeps the subtasks passed to its constructor."""
        child = Task(id="sub_1", description="子任务")
        parent = Task(id="main_1", description="主任务", subtasks=[child])

        assert len(parent.subtasks) == 1
        assert parent.subtasks[0] == child