blob: 633ad9aff19a472991579b112048a1089cf8ca3b [file] [log] [blame] [raw]
"""
Claude Agent 增强功能黑盒测试套件
端到端测试CLI工具的完整功能
"""
import unittest
import asyncio
import subprocess
import os
import sys
import time
import signal
import tempfile
from unittest.mock import patch, Mock
# 添加项目根目录到Python路径
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(0, os.path.join(project_root, 'src'))
class TestClaudeAgentBlackBox(unittest.TestCase):
"""Claude Agent 黑盒测试"""
def setUp(self):
"""设置测试环境"""
self.project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
self.main_script = os.path.join(self.project_root, "scripts", "main_enhanced.py")
self.test_api_key = "test-api-key-12345"
def test_help_command_display(self):
"""测试帮助命令显示"""
result = subprocess.run([
sys.executable, self.main_script, "--help"
], capture_output=True, text=True, cwd=self.project_root)
self.assertEqual(result.returncode, 0)
self.assertIn("Claude Agent 增强版命令行工具", result.stdout)
self.assertIn("行编辑和历史翻查", result.stdout)
self.assertIn("SSHOUT 聊天室集成", result.stdout)
self.assertIn("@Claude 自动响应", result.stdout)
self.assertIn("--sshout", result.stdout)
self.assertIn("--mode", result.stdout)
def test_version_and_basic_startup(self):
"""测试基本启动和版本信息"""
# 测试快速退出(非交互模式)
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp:
tmp.write("""
import sys
import os
sys.path.insert(0, r'{0}')
# 设置环境变量
os.environ['ANTHROPIC_API_KEY'] = '{1}'
try:
from claude_agent.cli.enhanced_interface import EnhancedCLIInterface
print("✅ CLI模块导入成功")
from claude_agent.sshout.integration import SSHOUTIntegration
print("✅ SSHOUT模块导入成功")
from claude_agent.core.agent import AgentCore, ThinkingMode
print("✅ Agent核心模块导入成功")
print("✅ 所有关键模块导入测试通过")
except Exception as e:
print(f"❌ 模块导入失败: {{e}}")
sys.exit(1)
""".format(os.path.join(self.project_root, 'src'), self.test_api_key))
tmp.flush()
result = subprocess.run([
sys.executable, tmp.name
], capture_output=True, text=True, timeout=30)
os.unlink(tmp.name)
self.assertEqual(result.returncode, 0, f"Import test failed: {result.stderr}")
self.assertIn("✅ 所有关键模块导入测试通过", result.stdout)
def test_command_line_arguments_parsing(self):
"""测试命令行参数解析"""
test_cases = [
(["--mode", "yolo", "--help"], 0),
(["--mode", "interactive", "--help"], 0),
(["--api-key", "test-key", "--help"], 0),
(["--sshout", "--help"], 0),
(["--no-interactive", "--help"], 0),
]
for args, expected_code in test_cases:
with self.subTest(args=args):
result = subprocess.run([
sys.executable, self.main_script
] + args, capture_output=True, text=True, cwd=self.project_root)
self.assertEqual(result.returncode, expected_code)
def test_invalid_arguments(self):
"""测试无效参数处理"""
invalid_cases = [
["--mode", "invalid_mode"],
["--invalid-option"],
]
for args in invalid_cases:
with self.subTest(args=args):
result = subprocess.run([
sys.executable, self.main_script
] + args, capture_output=True, text=True, cwd=self.project_root)
# Click 通常返回2作为参数错误的退出码
self.assertNotEqual(result.returncode, 0)
def test_sshout_integration_imports(self):
"""测试SSHOUT集成相关导入"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp:
tmp.write("""
import sys
sys.path.insert(0, r'{0}')
try:
from claude_agent.sshout.integration import SSHOUTConnection, SSHOUTMessage, SSHOUTIntegration
print("✅ SSHOUT核心类导入成功")
# 测试创建连接对象
conn = SSHOUTConnection("test.example.com", 22, "testuser", "/path/to/key")
print("✅ SSHOUT连接对象创建成功")
# 测试消息对象
from datetime import datetime
msg = SSHOUTMessage(datetime.now(), "user", "content")
print("✅ SSHOUT消息对象创建成功")
# 测试消息解析功能
test_lines = [
"[14:30:25] <user1> 大家好!",
"<user2> @Claude 你好吗?",
"user3: 这是一条测试消息"
]
for line in test_lines:
parsed = conn._parse_message(line)
if parsed:
print(f"✅ 消息解析成功: {{line}}")
else:
print(f"❌ 消息解析失败: {{line}}")
# 测试@Claude检测
claude_mentions = [
"@Claude 你好",
"@claude 帮我一下",
"Claude: 你在吗?"
]
for mention in claude_mentions:
if conn._is_claude_mention(mention):
print(f"✅ @Claude检测成功: {{mention}}")
else:
print(f"❌ @Claude检测失败: {{mention}}")
print("✅ SSHOUT功能测试全部通过")
except Exception as e:
print(f"❌ SSHOUT测试失败: {{e}}")
import traceback
traceback.print_exc()
sys.exit(1)
""".format(os.path.join(self.project_root, 'src')))
tmp.flush()
result = subprocess.run([
sys.executable, tmp.name
], capture_output=True, text=True, timeout=30)
os.unlink(tmp.name)
self.assertEqual(result.returncode, 0, f"SSHOUT test failed: {result.stderr}")
self.assertIn("✅ SSHOUT功能测试全部通过", result.stdout)
def test_cli_enhanced_features(self):
"""测试CLI增强功能"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp:
tmp.write("""
import sys
sys.path.insert(0, r'{0}')
try:
from claude_agent.cli.enhanced_interface import EnhancedCLIInterface
from unittest.mock import patch, Mock
# 测试CLI初始化
with patch('claude_agent.cli.enhanced_interface.PromptSession'):
with patch('claude_agent.cli.enhanced_interface.Console'):
cli = EnhancedCLIInterface()
print("✅ 增强CLI对象创建成功")
# 测试命令补全器
expected_commands = [
'help', 'mode', 'history', 'clear', 'tools', 'status',
'sshout', 'connect', 'disconnect', 'send'
]
completer_words = cli.command_completer.words
for command in expected_commands:
if command in completer_words:
print(f"✅ 命令补全包含: {{command}}")
else:
print(f"❌ 命令补全缺失: {{command}}")
print("✅ CLI增强功能测试通过")
except Exception as e:
print(f"❌ CLI测试失败: {{e}}")
import traceback
traceback.print_exc()
sys.exit(1)
""".format(os.path.join(self.project_root, 'src')))
tmp.flush()
result = subprocess.run([
sys.executable, tmp.name
], capture_output=True, text=True, timeout=30)
os.unlink(tmp.name)
self.assertEqual(result.returncode, 0, f"CLI test failed: {result.stderr}")
self.assertIn("✅ CLI增强功能测试通过", result.stdout)
def test_agent_core_functionality(self):
"""测试Agent核心功能"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp:
tmp.write("""
import sys
import asyncio
sys.path.insert(0, r'{0}')
async def test_agent():
try:
from claude_agent.core.agent import AgentCore, ThinkingMode
# 测试Agent创建
agent = AgentCore(api_key="{1}")
print("✅ Agent核心对象创建成功")
# 测试思考模式设置
agent.set_thinking_mode(ThinkingMode.YOLO)
print("✅ YOLO模式设置成功")
agent.set_thinking_mode(ThinkingMode.INTERACTIVE)
print("✅ 交互模式设置成功")
# 测试历史管理
agent.clear_history()
print("✅ 历史清空功能正常")
history = agent.get_conversation_history()
print(f"✅ 历史获取功能正常,当前历史条数: {{len(history)}}")
print("✅ Agent核心功能测试全部通过")
except Exception as e:
print(f"❌ Agent测试失败: {{e}}")
import traceback
traceback.print_exc()
sys.exit(1)
# 运行异步测试
asyncio.run(test_agent())
""".format(os.path.join(self.project_root, 'src'), self.test_api_key))
tmp.flush()
result = subprocess.run([
sys.executable, tmp.name
], capture_output=True, text=True, timeout=30)
os.unlink(tmp.name)
self.assertEqual(result.returncode, 0, f"Agent test failed: {result.stderr}")
self.assertIn("✅ Agent核心功能测试全部通过", result.stdout)
def test_mcp_integration_imports(self):
"""测试MCP集成导入"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp:
tmp.write("""
import sys
sys.path.insert(0, r'{0}')
try:
from claude_agent.mcp.integration import MCPToolIntegration, MCPToolManager
print("✅ MCP集成模块导入成功")
# 测试工具管理器创建
tool_manager = MCPToolManager()
print("✅ MCP工具管理器创建成功")
# 测试获取可用工具
tools = tool_manager.get_available_tools()
print(f"✅ 获取可用工具成功,工具数量: {{len(tools)}}")
print("✅ MCP集成测试通过")
except Exception as e:
print(f"❌ MCP测试失败: {{e}}")
import traceback
traceback.print_exc()
sys.exit(1)
""".format(os.path.join(self.project_root, 'src')))
tmp.flush()
result = subprocess.run([
sys.executable, tmp.name
], capture_output=True, text=True, timeout=30)
os.unlink(tmp.name)
self.assertEqual(result.returncode, 0, f"MCP test failed: {result.stderr}")
self.assertIn("✅ MCP集成测试通过", result.stdout)
def test_file_structure_integrity(self):
"""测试文件结构完整性"""
required_files = [
"scripts/main_enhanced.py",
"src/claude_agent/__init__.py",
"src/claude_agent/core/agent.py",
"src/claude_agent/cli/enhanced_interface.py",
"src/claude_agent/sshout/integration.py",
"src/claude_agent/mcp/integration.py",
"docs/ENHANCED_FEATURES_REPORT.md"
]
missing_files = []
for file_path in required_files:
full_path = os.path.join(self.project_root, file_path)
if not os.path.exists(full_path):
missing_files.append(file_path)
self.assertEqual(len(missing_files), 0,
f"Missing required files: {missing_files}")
def test_dependency_availability(self):
"""测试依赖可用性"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp:
tmp.write("""
try:
# 测试核心依赖
import click
print("✅ click 导入成功")
import rich
from rich.console import Console
from rich.panel import Panel
print("✅ rich 导入成功")
import prompt_toolkit
from prompt_toolkit import PromptSession
from prompt_toolkit.history import InMemoryHistory
print("✅ prompt_toolkit 导入成功")
import paramiko
print("✅ paramiko 导入成功")
import asyncio
print("✅ asyncio 导入成功")
print("✅ 所有依赖检查通过")
except Exception as e:
print(f"❌ 依赖检查失败: {{e}}")
import traceback
traceback.print_exc()
sys.exit(1)
""")
tmp.flush()
result = subprocess.run([
sys.executable, tmp.name
], capture_output=True, text=True, timeout=30)
os.unlink(tmp.name)
self.assertEqual(result.returncode, 0, f"Dependency test failed: {result.stderr}")
self.assertIn("✅ 所有依赖检查通过", result.stdout)
class TestSSHOUTBlackBox(unittest.TestCase):
"""SSHOUT集成黑盒测试"""
def setUp(self):
"""设置测试环境"""
self.project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
def test_sshout_message_parsing_comprehensive(self):
"""测试SSHOUT消息解析 - 综合测试"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp:
tmp.write("""
import sys
sys.path.insert(0, r'{0}')
from claude_agent.sshout.integration import SSHOUTConnection
# 创建测试连接
conn = SSHOUTConnection("test", 22, "test", "test")
# 综合测试用例
test_cases = [
# 标准格式
("[14:30:25] <user1> 普通消息", True, "user1", "普通消息"),
("<user2> 简单格式消息", True, "user2", "简单格式消息"),
("user3: 冒号格式消息", True, "user3", "冒号格式消息"),
# @Claude提及测试
("[15:20:30] <alice> @Claude 你好吗?", True, "alice", "@Claude 你好吗?"),
("<bob> claude, 帮我一下", True, "bob", "claude, 帮我一下"),
("charlie: Claude: 你在吗?", True, "charlie", "Claude: 你在吗?"),
# 边界情况
("[23:59:59] <user_with_underscores> 带下划线的用户名", True, "user_with_underscores", "带下划线的用户名"),
("<用户中文名> 中文用户名测试", True, "用户中文名", "中文用户名测试"),
# 无效格式
("纯文本消息", False, None, None),
("", False, None, None),
("< >", False, None, None),
]
passed = 0
failed = 0
for line, should_parse, expected_user, expected_content in test_cases:
result = conn._parse_message(line)
if should_parse:
if result and result.username == expected_user and result.content == expected_content:
print(f"✅ 解析成功: {{line}}")
passed += 1
else:
print(f"❌ 解析失败: {{line}} - 预期: {{expected_user}}/{{expected_content}}, 实际: {{result.username if result else None}}/{{result.content if result else None}}")
failed += 1
else:
if result is None:
print(f"✅ 正确拒绝: {{line}}")
passed += 1
else:
print(f"❌ 错误解析: {{line}} - 应该拒绝但解析为: {{result.username}}/{{result.content}}")
failed += 1
print(f"\\n测试结果: 通过 {{passed}}, 失败 {{failed}}")
if failed > 0:
sys.exit(1)
else:
print("✅ SSHOUT消息解析综合测试全部通过")
""".format(os.path.join(self.project_root, 'src')))
tmp.flush()
result = subprocess.run([
sys.executable, tmp.name
], capture_output=True, text=True, timeout=30)
os.unlink(tmp.name)
self.assertEqual(result.returncode, 0,
f"SSHOUT parsing test failed:\nstdout: {result.stdout}\nstderr: {result.stderr}")
self.assertIn("✅ SSHOUT消息解析综合测试全部通过", result.stdout)
def test_claude_mention_detection_comprehensive(self):
"""测试@Claude检测 - 综合测试"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp:
tmp.write("""
import sys
sys.path.insert(0, r'{0}')
from claude_agent.sshout.integration import SSHOUTConnection
conn = SSHOUTConnection("test", 22, "test", "test")
# 应该检测到的@Claude提及
positive_cases = [
"@Claude 你好",
"@claude 帮我一下",
"@CLAUDE 大写测试",
"Hello @Claude!",
"Hey @claude, how are you?",
"Claude: 你在吗?",
"claude, 有问题吗",
"Claude,中文逗号测试",
"@Claude你好吗", # 紧贴测试
"问题 @Claude 在中间",
]
# 不应该检测到的情况
negative_cases = [
"普通消息没有提及",
"Claudia is a name", # 相似名字
"claudine restaurant", # 包含claude但不是提及
"包含claude但是没有标点的句子",
"ClaudeAI 连在一起",
"",
"claude123 数字后缀",
]
passed = 0
failed = 0
print("测试正面案例(应该检测到):")
for case in positive_cases:
result = conn._is_claude_mention(case)
if result:
print(f"✅ 正确检测: {{case}}")
passed += 1
else:
print(f"❌ 检测失败: {{case}}")
failed += 1
print("\\n测试负面案例(不应该检测到):")
for case in negative_cases:
result = conn._is_claude_mention(case)
if not result:
print(f"✅ 正确拒绝: {{case}}")
passed += 1
else:
print(f"❌ 错误检测: {{case}}")
failed += 1
print(f"\\n测试结果: 通过 {{passed}}, 失败 {{failed}}")
if failed > 0:
sys.exit(1)
else:
print("✅ @Claude检测综合测试全部通过")
""".format(os.path.join(self.project_root, 'src')))
tmp.flush()
result = subprocess.run([
sys.executable, tmp.name
], capture_output=True, text=True, timeout=30)
os.unlink(tmp.name)
self.assertEqual(result.returncode, 0,
f"Claude mention test failed:\nstdout: {result.stdout}\nstderr: {result.stderr}")
self.assertIn("✅ @Claude检测综合测试全部通过", result.stdout)
if __name__ == '__main__':
# 运行黑盒测试
test_loader = unittest.TestLoader()
test_suite = test_loader.loadTestsFromModule(sys.modules[__name__])
runner = unittest.TextTestRunner(verbosity=2, buffer=True)
result = runner.run(test_suite)
# 打印测试总结
print("\n" + "="*60)
print("黑盒测试总结")
print("="*60)
print(f"总测试数: {result.testsRun}")
print(f"成功: {result.testsRun - len(result.failures) - len(result.errors)}")
print(f"失败: {len(result.failures)}")
print(f"错误: {len(result.errors)}")
if result.failures:
print("\n失败的测试:")
for test, traceback in result.failures:
print(f" - {test}: {traceback}")
if result.errors:
print("\n错误的测试:")
for test, traceback in result.errors:
print(f" - {test}: {traceback}")
sys.exit(0 if result.wasSuccessful() else 1)