# blob: 707c2da21b2a13fdee101b366757b5c529b86a32
"""
修复的黑盒测试 - 专注于核心功能验证
"""
import unittest
import subprocess
import os
import sys
import tempfile
class TestClaudeAgentFixed(unittest.TestCase):
    """Black-box tests for the Claude Agent CLI, focused on core functionality.

    Every check runs in a separate Python interpreter so that a failure in the
    code under test (import error, crash, ``sys.exit``) cannot take down the
    test runner itself.
    """

    # Upper bound, in seconds, for any child interpreter spawned by a test.
    SUBPROCESS_TIMEOUT = 30

    def setUp(self):
        """Resolve the project paths used by the tests."""
        # Three dirname() calls: the project root is taken to be two levels
        # above the directory that contains this file.
        here = os.path.abspath(__file__)
        self.project_root = os.path.dirname(os.path.dirname(os.path.dirname(here)))
        self.main_script = os.path.join(self.project_root, "scripts", "main_enhanced.py")
        self.src_path = os.path.join(self.project_root, "src")

    def _run_python(self, args, **kwargs):
        """Run the current interpreter with *args* and capture its text output.

        Args:
            args: Command-line arguments placed after ``sys.executable``.
            **kwargs: Extra keyword arguments forwarded to ``subprocess.run``
                (for example ``cwd``).

        Returns:
            The ``subprocess.CompletedProcess`` of the child.

        Raises:
            subprocess.TimeoutExpired: If the child runs longer than
                ``SUBPROCESS_TIMEOUT`` seconds.
        """
        # The children print CJK text and emoji. Force UTF-8 on both ends so
        # the result does not depend on the locale of the host machine.
        env = dict(os.environ, PYTHONIOENCODING="utf-8")
        return subprocess.run(
            [sys.executable, *args],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            env=env,
            timeout=self.SUBPROCESS_TIMEOUT,
            **kwargs,
        )

    def _run_snippet(self, code):
        """Write *code* to a temporary script, execute it, and clean up.

        Args:
            code: Python source text to execute in a child interpreter.

        Returns:
            The ``subprocess.CompletedProcess`` of the child.
        """
        # Python reads source files as UTF-8 by default, so the script must be
        # written as UTF-8 regardless of the locale's preferred encoding.
        script = tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", delete=False, encoding="utf-8"
        )
        try:
            # Close the file before running it so the child sees the full
            # content and the file can be removed afterwards.
            with script:
                script.write(code)
            return self._run_python([script.name])
        finally:
            # Remove the script even when the child times out or writing fails.
            os.unlink(script.name)

    def _assert_snippet_passes(self, code, success_marker, failure_label):
        """Run *code* and assert it exits with 0 and prints *success_marker*.

        Args:
            code: Python source text to execute in a child interpreter.
            success_marker: Text that must appear in the child's stdout.
            failure_label: Prefix of the assertion message on failure.
        """
        result = self._run_snippet(code)
        # The snippets report their diagnostics on stdout, so show both streams.
        details = (
            f"{failure_label}:\n"
            f"--- stdout ---\n{result.stdout}\n"
            f"--- stderr ---\n{result.stderr}"
        )
        self.assertEqual(result.returncode, 0, details)
        self.assertIn(success_marker, result.stdout)

    def test_help_command_display(self):
        """The ``--help`` output lists the main features of the CLI."""
        result = self._run_python([self.main_script, "--help"], cwd=self.project_root)
        self.assertEqual(result.returncode, 0)
        self.assertIn("Claude Agent 增强版命令行工具", result.stdout)
        self.assertIn("行编辑和历史翻查", result.stdout)
        self.assertIn("SSHOUT 聊天室集成", result.stdout)

    def test_file_structure_integrity(self):
        """All files required by the project exist under the project root."""
        required_files = [
            "scripts/main_enhanced.py",
            "src/claude_agent/__init__.py",
            "src/claude_agent/core/agent.py",
            "src/claude_agent/cli/enhanced_interface.py",
            "src/claude_agent/sshout/integration.py",
            "src/claude_agent/mcp/integration.py",
            "docs/ENHANCED_FEATURES_REPORT.md",
        ]
        missing_files = [
            file_path
            for file_path in required_files
            if not os.path.exists(os.path.join(self.project_root, file_path))
        ]
        self.assertEqual(missing_files, [], f"Missing required files: {missing_files}")

    def test_core_module_imports(self):
        """The key modules can be imported in a fresh interpreter."""
        code = f'''
import sys
sys.path.insert(0, r"{self.src_path}")
try:
    from claude_agent.cli.enhanced_interface import EnhancedCLIInterface
    print("✅ CLI模块导入成功")
    from claude_agent.sshout.integration import SSHOUTIntegration
    print("✅ SSHOUT模块导入成功")
    from claude_agent.core.agent import AgentCore, ThinkingMode
    print("✅ Agent核心模块导入成功")
    from claude_agent.mcp.integration import MCPToolIntegration
    print("✅ MCP集成模块导入成功")
    print("✅ 所有关键模块导入测试通过")
except Exception as e:
    print(f"❌ 模块导入失败: {{e}}")
    import traceback
    traceback.print_exc()
    sys.exit(1)
'''
        self._assert_snippet_passes(
            code, "✅ 所有关键模块导入测试通过", "Import test failed"
        )

    def test_sshout_message_parsing(self):
        """``SSHOUTConnection._parse_message`` handles the sample chat lines."""
        code = f'''
import sys
sys.path.insert(0, r"{self.src_path}")
from claude_agent.sshout.integration import SSHOUTConnection
# 创建测试连接
conn = SSHOUTConnection("test", 22, "test", "test")
# 测试用例
test_cases = [
    ("[14:30:25] <user1> 普通消息", True, "user1", "普通消息"),
    ("<user2> 简单格式消息", True, "user2", "简单格式消息"),
    ("user3: 冒号格式消息", True, "user3", "冒号格式消息"),
    ("无效格式消息", False, None, None),
]
passed = 0
failed = 0
for line, should_parse, expected_user, expected_content in test_cases:
    result = conn._parse_message(line)
    if should_parse:
        if result and result.username == expected_user and result.content == expected_content:
            print(f"✅ 解析成功: {{line}}")
            passed += 1
        else:
            print(f"❌ 解析失败: {{line}}")
            failed += 1
    else:
        if result is None:
            print(f"✅ 正确拒绝: {{line}}")
            passed += 1
        else:
            print(f"❌ 错误解析: {{line}}")
            failed += 1
print(f"测试结果: 通过 {{passed}}, 失败 {{failed}}")
if failed > 0:
    sys.exit(1)
else:
    print("✅ SSHOUT消息解析测试全部通过")
'''
        self._assert_snippet_passes(
            code, "✅ SSHOUT消息解析测试全部通过", "SSHOUT parsing test failed"
        )

    def test_claude_mention_detection(self):
        """``SSHOUTConnection._is_claude_mention`` classifies the sample messages."""
        code = f'''
import sys
sys.path.insert(0, r"{self.src_path}")
from claude_agent.sshout.integration import SSHOUTConnection
conn = SSHOUTConnection("test", 22, "test", "test")
# 应该检测到的@Claude提及
positive_cases = [
    "@Claude 你好",
    "@claude 帮我一下",
    "@CLAUDE 大写测试",
    "Claude: 你在吗?",
    "claude, 有问题吗",
]
# 不应该检测到的情况
negative_cases = [
    "普通消息没有提及",
    "Claudia is a name",
    "包含claude但是没有标点的句子",
]
passed = 0
failed = 0
print("测试正面案例(应该检测到):")
for case in positive_cases:
    result = conn._is_claude_mention(case)
    if result:
        print(f"✅ 正确检测: {{case}}")
        passed += 1
    else:
        print(f"❌ 检测失败: {{case}}")
        failed += 1
print("\\n测试负面案例(不应该检测到):")
for case in negative_cases:
    result = conn._is_claude_mention(case)
    if not result:
        print(f"✅ 正确拒绝: {{case}}")
        passed += 1
    else:
        print(f"❌ 错误检测: {{case}}")
        failed += 1
print(f"\\n测试结果: 通过 {{passed}}, 失败 {{failed}}")
if failed > 0:
    sys.exit(1)
else:
    print("✅ @Claude检测测试全部通过")
'''
        self._assert_snippet_passes(
            code, "✅ @Claude检测测试全部通过", "Claude mention test failed"
        )

    def test_dependency_availability(self):
        """The third-party packages the project relies on can be imported."""
        # Plain (non-f) string: braces are written to the script verbatim, so
        # the child's own f-string needs single braces. The script also imports
        # sys itself because its failure path calls sys.exit().
        code = '''
import sys
try:
    # 测试核心依赖
    import click
    print("✅ click 导入成功")
    import rich
    from rich.console import Console
    print("✅ rich 导入成功")
    import prompt_toolkit
    print("✅ prompt_toolkit 导入成功")
    import paramiko
    print("✅ paramiko 导入成功")
    import asyncio
    print("✅ asyncio 导入成功")
    print("✅ 所有依赖检查通过")
except Exception as e:
    print(f"❌ 依赖检查失败: {e}")
    sys.exit(1)
'''
        self._assert_snippet_passes(
            code, "✅ 所有依赖检查通过", "Dependency test failed"
        )
if __name__ == "__main__":
    # Run every test in this module with verbose per-test reporting when the
    # file is executed directly.
    unittest.main(verbosity=2)