| """ |
| Claude Agent 增强功能黑盒测试套件 |
| 端到端测试CLI工具的完整功能 |
| """ |
| |
| import unittest |
| import asyncio |
| import subprocess |
| import os |
| import sys |
| import time |
| import signal |
| import tempfile |
| from unittest.mock import patch, Mock |
| |
| # 添加项目根目录到Python路径 |
| project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| sys.path.insert(0, os.path.join(project_root, 'src')) |
| |
| class TestClaudeAgentBlackBox(unittest.TestCase): |
| """Claude Agent 黑盒测试""" |
| |
| def setUp(self): |
| """设置测试环境""" |
| self.project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| self.main_script = os.path.join(self.project_root, "scripts", "main_enhanced.py") |
| self.test_api_key = "test-api-key-12345" |
| |
| def test_help_command_display(self): |
| """测试帮助命令显示""" |
| result = subprocess.run([ |
| sys.executable, self.main_script, "--help" |
| ], capture_output=True, text=True, cwd=self.project_root) |
| |
| self.assertEqual(result.returncode, 0) |
| self.assertIn("Claude Agent 增强版命令行工具", result.stdout) |
| self.assertIn("行编辑和历史翻查", result.stdout) |
| self.assertIn("SSHOUT 聊天室集成", result.stdout) |
| self.assertIn("@Claude 自动响应", result.stdout) |
| self.assertIn("--sshout", result.stdout) |
| self.assertIn("--mode", result.stdout) |
| |
| def test_version_and_basic_startup(self): |
| """测试基本启动和版本信息""" |
| # 测试快速退出(非交互模式) |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp: |
| tmp.write(""" |
| import sys |
| import os |
| sys.path.insert(0, r'{0}') |
| |
| # 设置环境变量 |
| os.environ['ANTHROPIC_API_KEY'] = '{1}' |
| |
| try: |
| from claude_agent.cli.enhanced_interface import EnhancedCLIInterface |
| print("✅ CLI模块导入成功") |
| |
| from claude_agent.sshout.integration import SSHOUTIntegration |
| print("✅ SSHOUT模块导入成功") |
| |
| from claude_agent.core.agent import AgentCore, ThinkingMode |
| print("✅ Agent核心模块导入成功") |
| |
| print("✅ 所有关键模块导入测试通过") |
| |
| except Exception as e: |
| print(f"❌ 模块导入失败: {{e}}") |
| sys.exit(1) |
| """.format(os.path.join(self.project_root, 'src'), self.test_api_key)) |
| tmp.flush() |
| |
| result = subprocess.run([ |
| sys.executable, tmp.name |
| ], capture_output=True, text=True, timeout=30) |
| |
| os.unlink(tmp.name) |
| |
| self.assertEqual(result.returncode, 0, f"Import test failed: {result.stderr}") |
| self.assertIn("✅ 所有关键模块导入测试通过", result.stdout) |
| |
| def test_command_line_arguments_parsing(self): |
| """测试命令行参数解析""" |
| test_cases = [ |
| (["--mode", "yolo", "--help"], 0), |
| (["--mode", "interactive", "--help"], 0), |
| (["--api-key", "test-key", "--help"], 0), |
| (["--sshout", "--help"], 0), |
| (["--no-interactive", "--help"], 0), |
| ] |
| |
| for args, expected_code in test_cases: |
| with self.subTest(args=args): |
| result = subprocess.run([ |
| sys.executable, self.main_script |
| ] + args, capture_output=True, text=True, cwd=self.project_root) |
| |
| self.assertEqual(result.returncode, expected_code) |
| |
| def test_invalid_arguments(self): |
| """测试无效参数处理""" |
| invalid_cases = [ |
| ["--mode", "invalid_mode"], |
| ["--invalid-option"], |
| ] |
| |
| for args in invalid_cases: |
| with self.subTest(args=args): |
| result = subprocess.run([ |
| sys.executable, self.main_script |
| ] + args, capture_output=True, text=True, cwd=self.project_root) |
| |
| # Click 通常返回2作为参数错误的退出码 |
| self.assertNotEqual(result.returncode, 0) |
| |
| def test_sshout_integration_imports(self): |
| """测试SSHOUT集成相关导入""" |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp: |
| tmp.write(""" |
| import sys |
| sys.path.insert(0, r'{0}') |
| |
| try: |
| from claude_agent.sshout.integration import SSHOUTConnection, SSHOUTMessage, SSHOUTIntegration |
| print("✅ SSHOUT核心类导入成功") |
| |
| # 测试创建连接对象 |
| conn = SSHOUTConnection("test.example.com", 22, "testuser", "/path/to/key") |
| print("✅ SSHOUT连接对象创建成功") |
| |
| # 测试消息对象 |
| from datetime import datetime |
| msg = SSHOUTMessage(datetime.now(), "user", "content") |
| print("✅ SSHOUT消息对象创建成功") |
| |
| # 测试消息解析功能 |
| test_lines = [ |
| "[14:30:25] <user1> 大家好!", |
| "<user2> @Claude 你好吗?", |
| "user3: 这是一条测试消息" |
| ] |
| |
| for line in test_lines: |
| parsed = conn._parse_message(line) |
| if parsed: |
| print(f"✅ 消息解析成功: {{line}}") |
| else: |
| print(f"❌ 消息解析失败: {{line}}") |
| |
| # 测试@Claude检测 |
| claude_mentions = [ |
| "@Claude 你好", |
| "@claude 帮我一下", |
| "Claude: 你在吗?" |
| ] |
| |
| for mention in claude_mentions: |
| if conn._is_claude_mention(mention): |
| print(f"✅ @Claude检测成功: {{mention}}") |
| else: |
| print(f"❌ @Claude检测失败: {{mention}}") |
| |
| print("✅ SSHOUT功能测试全部通过") |
| |
| except Exception as e: |
| print(f"❌ SSHOUT测试失败: {{e}}") |
| import traceback |
| traceback.print_exc() |
| sys.exit(1) |
| """.format(os.path.join(self.project_root, 'src'))) |
| tmp.flush() |
| |
| result = subprocess.run([ |
| sys.executable, tmp.name |
| ], capture_output=True, text=True, timeout=30) |
| |
| os.unlink(tmp.name) |
| |
| self.assertEqual(result.returncode, 0, f"SSHOUT test failed: {result.stderr}") |
| self.assertIn("✅ SSHOUT功能测试全部通过", result.stdout) |
| |
| def test_cli_enhanced_features(self): |
| """测试CLI增强功能""" |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp: |
| tmp.write(""" |
| import sys |
| sys.path.insert(0, r'{0}') |
| |
| try: |
| from claude_agent.cli.enhanced_interface import EnhancedCLIInterface |
| from unittest.mock import patch, Mock |
| |
| # 测试CLI初始化 |
| with patch('claude_agent.cli.enhanced_interface.PromptSession'): |
| with patch('claude_agent.cli.enhanced_interface.Console'): |
| cli = EnhancedCLIInterface() |
| print("✅ 增强CLI对象创建成功") |
| |
| # 测试命令补全器 |
| expected_commands = [ |
| 'help', 'mode', 'history', 'clear', 'tools', 'status', |
| 'sshout', 'connect', 'disconnect', 'send' |
| ] |
| |
| completer_words = cli.command_completer.words |
| for command in expected_commands: |
| if command in completer_words: |
| print(f"✅ 命令补全包含: {{command}}") |
| else: |
| print(f"❌ 命令补全缺失: {{command}}") |
| |
| print("✅ CLI增强功能测试通过") |
| |
| except Exception as e: |
| print(f"❌ CLI测试失败: {{e}}") |
| import traceback |
| traceback.print_exc() |
| sys.exit(1) |
| """.format(os.path.join(self.project_root, 'src'))) |
| tmp.flush() |
| |
| result = subprocess.run([ |
| sys.executable, tmp.name |
| ], capture_output=True, text=True, timeout=30) |
| |
| os.unlink(tmp.name) |
| |
| self.assertEqual(result.returncode, 0, f"CLI test failed: {result.stderr}") |
| self.assertIn("✅ CLI增强功能测试通过", result.stdout) |
| |
| def test_agent_core_functionality(self): |
| """测试Agent核心功能""" |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp: |
| tmp.write(""" |
| import sys |
| import asyncio |
| sys.path.insert(0, r'{0}') |
| |
| async def test_agent(): |
| try: |
| from claude_agent.core.agent import AgentCore, ThinkingMode |
| |
| # 测试Agent创建 |
| agent = AgentCore(api_key="{1}") |
| print("✅ Agent核心对象创建成功") |
| |
| # 测试思考模式设置 |
| agent.set_thinking_mode(ThinkingMode.YOLO) |
| print("✅ YOLO模式设置成功") |
| |
| agent.set_thinking_mode(ThinkingMode.INTERACTIVE) |
| print("✅ 交互模式设置成功") |
| |
| # 测试历史管理 |
| agent.clear_history() |
| print("✅ 历史清空功能正常") |
| |
| history = agent.get_conversation_history() |
| print(f"✅ 历史获取功能正常,当前历史条数: {{len(history)}}") |
| |
| print("✅ Agent核心功能测试全部通过") |
| |
| except Exception as e: |
| print(f"❌ Agent测试失败: {{e}}") |
| import traceback |
| traceback.print_exc() |
| sys.exit(1) |
| |
| # 运行异步测试 |
| asyncio.run(test_agent()) |
| """.format(os.path.join(self.project_root, 'src'), self.test_api_key)) |
| tmp.flush() |
| |
| result = subprocess.run([ |
| sys.executable, tmp.name |
| ], capture_output=True, text=True, timeout=30) |
| |
| os.unlink(tmp.name) |
| |
| self.assertEqual(result.returncode, 0, f"Agent test failed: {result.stderr}") |
| self.assertIn("✅ Agent核心功能测试全部通过", result.stdout) |
| |
| def test_mcp_integration_imports(self): |
| """测试MCP集成导入""" |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp: |
| tmp.write(""" |
| import sys |
| sys.path.insert(0, r'{0}') |
| |
| try: |
| from claude_agent.mcp.integration import MCPToolIntegration, MCPToolManager |
| print("✅ MCP集成模块导入成功") |
| |
| # 测试工具管理器创建 |
| tool_manager = MCPToolManager() |
| print("✅ MCP工具管理器创建成功") |
| |
| # 测试获取可用工具 |
| tools = tool_manager.get_available_tools() |
| print(f"✅ 获取可用工具成功,工具数量: {{len(tools)}}") |
| |
| print("✅ MCP集成测试通过") |
| |
| except Exception as e: |
| print(f"❌ MCP测试失败: {{e}}") |
| import traceback |
| traceback.print_exc() |
| sys.exit(1) |
| """.format(os.path.join(self.project_root, 'src'))) |
| tmp.flush() |
| |
| result = subprocess.run([ |
| sys.executable, tmp.name |
| ], capture_output=True, text=True, timeout=30) |
| |
| os.unlink(tmp.name) |
| |
| self.assertEqual(result.returncode, 0, f"MCP test failed: {result.stderr}") |
| self.assertIn("✅ MCP集成测试通过", result.stdout) |
| |
| def test_file_structure_integrity(self): |
| """测试文件结构完整性""" |
| required_files = [ |
| "scripts/main_enhanced.py", |
| "src/claude_agent/__init__.py", |
| "src/claude_agent/core/agent.py", |
| "src/claude_agent/cli/enhanced_interface.py", |
| "src/claude_agent/sshout/integration.py", |
| "src/claude_agent/mcp/integration.py", |
| "docs/ENHANCED_FEATURES_REPORT.md" |
| ] |
| |
| missing_files = [] |
| for file_path in required_files: |
| full_path = os.path.join(self.project_root, file_path) |
| if not os.path.exists(full_path): |
| missing_files.append(file_path) |
| |
| self.assertEqual(len(missing_files), 0, |
| f"Missing required files: {missing_files}") |
| |
| def test_dependency_availability(self): |
| """测试依赖可用性""" |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp: |
| tmp.write(""" |
| try: |
| # 测试核心依赖 |
| import click |
| print("✅ click 导入成功") |
| |
| import rich |
| from rich.console import Console |
| from rich.panel import Panel |
| print("✅ rich 导入成功") |
| |
| import prompt_toolkit |
| from prompt_toolkit import PromptSession |
| from prompt_toolkit.history import InMemoryHistory |
| print("✅ prompt_toolkit 导入成功") |
| |
| import paramiko |
| print("✅ paramiko 导入成功") |
| |
| import asyncio |
| print("✅ asyncio 导入成功") |
| |
| print("✅ 所有依赖检查通过") |
| |
| except Exception as e: |
| print(f"❌ 依赖检查失败: {{e}}") |
| import traceback |
| traceback.print_exc() |
| sys.exit(1) |
| """) |
| tmp.flush() |
| |
| result = subprocess.run([ |
| sys.executable, tmp.name |
| ], capture_output=True, text=True, timeout=30) |
| |
| os.unlink(tmp.name) |
| |
| self.assertEqual(result.returncode, 0, f"Dependency test failed: {result.stderr}") |
| self.assertIn("✅ 所有依赖检查通过", result.stdout) |
| |
| |
| class TestSSHOUTBlackBox(unittest.TestCase): |
| """SSHOUT集成黑盒测试""" |
| |
| def setUp(self): |
| """设置测试环境""" |
| self.project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| |
| def test_sshout_message_parsing_comprehensive(self): |
| """测试SSHOUT消息解析 - 综合测试""" |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp: |
| tmp.write(""" |
| import sys |
| sys.path.insert(0, r'{0}') |
| |
| from claude_agent.sshout.integration import SSHOUTConnection |
| |
| # 创建测试连接 |
| conn = SSHOUTConnection("test", 22, "test", "test") |
| |
| # 综合测试用例 |
| test_cases = [ |
| # 标准格式 |
| ("[14:30:25] <user1> 普通消息", True, "user1", "普通消息"), |
| ("<user2> 简单格式消息", True, "user2", "简单格式消息"), |
| ("user3: 冒号格式消息", True, "user3", "冒号格式消息"), |
| |
| # @Claude提及测试 |
| ("[15:20:30] <alice> @Claude 你好吗?", True, "alice", "@Claude 你好吗?"), |
| ("<bob> claude, 帮我一下", True, "bob", "claude, 帮我一下"), |
| ("charlie: Claude: 你在吗?", True, "charlie", "Claude: 你在吗?"), |
| |
| # 边界情况 |
| ("[23:59:59] <user_with_underscores> 带下划线的用户名", True, "user_with_underscores", "带下划线的用户名"), |
| ("<用户中文名> 中文用户名测试", True, "用户中文名", "中文用户名测试"), |
| |
| # 无效格式 |
| ("纯文本消息", False, None, None), |
| ("", False, None, None), |
| ("< >", False, None, None), |
| ] |
| |
| passed = 0 |
| failed = 0 |
| |
| for line, should_parse, expected_user, expected_content in test_cases: |
| result = conn._parse_message(line) |
| |
| if should_parse: |
| if result and result.username == expected_user and result.content == expected_content: |
| print(f"✅ 解析成功: {{line}}") |
| passed += 1 |
| else: |
| print(f"❌ 解析失败: {{line}} - 预期: {{expected_user}}/{{expected_content}}, 实际: {{result.username if result else None}}/{{result.content if result else None}}") |
| failed += 1 |
| else: |
| if result is None: |
| print(f"✅ 正确拒绝: {{line}}") |
| passed += 1 |
| else: |
| print(f"❌ 错误解析: {{line}} - 应该拒绝但解析为: {{result.username}}/{{result.content}}") |
| failed += 1 |
| |
| print(f"\\n测试结果: 通过 {{passed}}, 失败 {{failed}}") |
| |
| if failed > 0: |
| sys.exit(1) |
| else: |
| print("✅ SSHOUT消息解析综合测试全部通过") |
| """.format(os.path.join(self.project_root, 'src'))) |
| tmp.flush() |
| |
| result = subprocess.run([ |
| sys.executable, tmp.name |
| ], capture_output=True, text=True, timeout=30) |
| |
| os.unlink(tmp.name) |
| |
| self.assertEqual(result.returncode, 0, |
| f"SSHOUT parsing test failed:\nstdout: {result.stdout}\nstderr: {result.stderr}") |
| self.assertIn("✅ SSHOUT消息解析综合测试全部通过", result.stdout) |
| |
| def test_claude_mention_detection_comprehensive(self): |
| """测试@Claude检测 - 综合测试""" |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp: |
| tmp.write(""" |
| import sys |
| sys.path.insert(0, r'{0}') |
| |
| from claude_agent.sshout.integration import SSHOUTConnection |
| |
| conn = SSHOUTConnection("test", 22, "test", "test") |
| |
| # 应该检测到的@Claude提及 |
| positive_cases = [ |
| "@Claude 你好", |
| "@claude 帮我一下", |
| "@CLAUDE 大写测试", |
| "Hello @Claude!", |
| "Hey @claude, how are you?", |
| "Claude: 你在吗?", |
| "claude, 有问题吗", |
| "Claude,中文逗号测试", |
| "@Claude你好吗", # 紧贴测试 |
| "问题 @Claude 在中间", |
| ] |
| |
| # 不应该检测到的情况 |
| negative_cases = [ |
| "普通消息没有提及", |
| "Claudia is a name", # 相似名字 |
| "claudine restaurant", # 包含claude但不是提及 |
| "包含claude但是没有标点的句子", |
| "ClaudeAI 连在一起", |
| "", |
| "claude123 数字后缀", |
| ] |
| |
| passed = 0 |
| failed = 0 |
| |
| print("测试正面案例(应该检测到):") |
| for case in positive_cases: |
| result = conn._is_claude_mention(case) |
| if result: |
| print(f"✅ 正确检测: {{case}}") |
| passed += 1 |
| else: |
| print(f"❌ 检测失败: {{case}}") |
| failed += 1 |
| |
| print("\\n测试负面案例(不应该检测到):") |
| for case in negative_cases: |
| result = conn._is_claude_mention(case) |
| if not result: |
| print(f"✅ 正确拒绝: {{case}}") |
| passed += 1 |
| else: |
| print(f"❌ 错误检测: {{case}}") |
| failed += 1 |
| |
| print(f"\\n测试结果: 通过 {{passed}}, 失败 {{failed}}") |
| |
| if failed > 0: |
| sys.exit(1) |
| else: |
| print("✅ @Claude检测综合测试全部通过") |
| """.format(os.path.join(self.project_root, 'src'))) |
| tmp.flush() |
| |
| result = subprocess.run([ |
| sys.executable, tmp.name |
| ], capture_output=True, text=True, timeout=30) |
| |
| os.unlink(tmp.name) |
| |
| self.assertEqual(result.returncode, 0, |
| f"Claude mention test failed:\nstdout: {result.stdout}\nstderr: {result.stderr}") |
| self.assertIn("✅ @Claude检测综合测试全部通过", result.stdout) |
| |
| |
| if __name__ == '__main__': |
| # 运行黑盒测试 |
| test_loader = unittest.TestLoader() |
| test_suite = test_loader.loadTestsFromModule(sys.modules[__name__]) |
| |
| runner = unittest.TextTestRunner(verbosity=2, buffer=True) |
| result = runner.run(test_suite) |
| |
| # 打印测试总结 |
| print("\n" + "="*60) |
| print("黑盒测试总结") |
| print("="*60) |
| print(f"总测试数: {result.testsRun}") |
| print(f"成功: {result.testsRun - len(result.failures) - len(result.errors)}") |
| print(f"失败: {len(result.failures)}") |
| print(f"错误: {len(result.errors)}") |
| |
| if result.failures: |
| print("\n失败的测试:") |
| for test, traceback in result.failures: |
| print(f" - {test}: {traceback}") |
| |
| if result.errors: |
| print("\n错误的测试:") |
| for test, traceback in result.errors: |
| print(f" - {test}: {traceback}") |
| |
| sys.exit(0 if result.wasSuccessful() else 1) |