blob: 669ef3ed138c8f0758a05bd27b9ea5e43f8cc136 [file] [log] [blame] [raw]
"""
SSHOUT实际SSH认证黑盒测试
测试真实的SSH连接、认证和基本通信功能
"""
import unittest
import asyncio
import os
import sys
import tempfile
import subprocess
from unittest import skip
class TestSSHOUTRealAuthentication(unittest.TestCase):
"""SSHOUT实际SSH认证测试"""
def setUp(self):
"""设置测试环境"""
self.project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
def test_ssh_connection_and_authentication(self):
"""测试SSH连接和认证功能"""
# 只在特定环境变量设置时运行此测试
if not os.getenv('TEST_SSHOUT_AUTH'):
self.skipTest("需要设置 TEST_SSHOUT_AUTH=1 环境变量来运行SSH认证测试")
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp:
tmp.write(f"""
import sys
sys.path.insert(0, r'{os.path.join(self.project_root, "src")}')
import asyncio
from claude_agent.utils.config import get_config_manager
from claude_agent.sshout.integration import SSHOUTConnection
async def test_ssh_auth():
try:
# 加载配置
config_manager = get_config_manager('default')
sshout_config = config_manager.get_sshout_config()
server_config = sshout_config['server']
ssh_config = sshout_config['ssh_key']
print(f"🔌 尝试连接到 {{server_config['hostname']}}:{{server_config['port']}}")
# 创建连接
connection = SSHOUTConnection(
hostname=server_config['hostname'],
port=server_config['port'],
username=server_config['username'],
key_path=ssh_config['private_key_path'],
mention_patterns=sshout_config.get('mention_patterns', [])
)
# 尝试连接
success = await asyncio.wait_for(connection.connect(), timeout=15)
if success:
print("✅ SSH连接和认证成功")
# 测试发送消息
test_message = "[测试] Claude Agent SSH认证测试 - " + str(asyncio.get_event_loop().time())
send_success = await connection.send_message(test_message)
if send_success:
print("✅ 消息发送成功")
else:
print("❌ 消息发送失败")
sys.exit(1)
# 等待一小段时间确保消息发送完成
await asyncio.sleep(2)
# 清理连接
await connection.disconnect()
print("✅ 连接已断开")
print("✅ SSH认证测试全部通过")
else:
print("❌ SSH连接失败")
sys.exit(1)
except asyncio.TimeoutError:
print("⏰ SSH连接超时")
sys.exit(1)
except FileNotFoundError as e:
if "SSH私钥文件不存在" in str(e):
print(f"🔑 SSH私钥文件不存在: {{e}}")
print("💡 请确保SSH密钥文件存在并且路径配置正确")
else:
print(f"❌ 文件未找到: {{e}}")
sys.exit(1)
except Exception as e:
print(f"💥 SSH认证测试异常: {{e}}")
import traceback
traceback.print_exc()
sys.exit(1)
# 运行测试
asyncio.run(test_ssh_auth())
""")
tmp.flush()
result = subprocess.run([
sys.executable, tmp.name
], capture_output=True, text=True, timeout=30)
os.unlink(tmp.name)
# 输出结果供调试
if result.stdout:
print("STDOUT:", result.stdout)
if result.stderr:
print("STDERR:", result.stderr)
# 只有在真正的认证错误时才失败,其他情况(如网络不通)则跳过
if result.returncode != 0:
if "SSH私钥文件不存在" in result.stdout:
self.fail("SSH私钥文件不存在,请检查配置")
elif "认证失败" in result.stdout or "Authentication failed" in result.stderr:
self.fail("SSH认证失败,请检查密钥文件和服务器配置")
else:
# 网络问题等其他错误,跳过测试而不是失败
self.skipTest(f"SSH连接测试因外部问题跳过: {result.stdout}")
# 检查成功标志
self.assertIn("✅ SSH认证测试全部通过", result.stdout)
def test_ssh_key_file_exists_and_permissions(self):
"""测试SSH密钥文件存在性和权限"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp:
tmp.write(f"""
import sys
sys.path.insert(0, r'{os.path.join(self.project_root, "src")}')
import os
from claude_agent.utils.config import get_config_manager
try:
config_manager = get_config_manager('default')
sshout_config = config_manager.get_sshout_config()
key_path = sshout_config['ssh_key']['private_key_path']
if os.path.exists(key_path):
print(f"✅ SSH密钥文件存在: {{key_path}}")
# 检查文件权限
stat_info = os.stat(key_path)
permissions = stat_info.st_mode & 0o777
print(f"📋 文件权限: {{oct(permissions)}}")
# SSH私钥通常应该是 600 (只有所有者可读写)
if permissions <= 0o600:
print("✅ SSH密钥文件权限安全")
else:
print(f"⚠️ SSH密钥文件权限可能过于宽松: {{oct(permissions)}}")
print("💡 建议设置为 600: chmod 600 " + key_path)
else:
print(f"❌ SSH密钥文件不存在: {{key_path}}")
sys.exit(1)
print("✅ SSH密钥检查通过")
except Exception as e:
print(f"❌ SSH密钥检查失败: {{e}}")
sys.exit(1)
""")
tmp.flush()
result = subprocess.run([
sys.executable, tmp.name
], capture_output=True, text=True, timeout=10)
os.unlink(tmp.name)
self.assertEqual(result.returncode, 0,
f"SSH密钥检查失败:\nstdout: {result.stdout}\nstderr: {result.stderr}")
self.assertIn("✅ SSH密钥检查通过", result.stdout)
def test_network_connectivity_to_sshout_server(self):
"""测试到SSHOUT服务器的网络连通性"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp:
tmp.write(f"""
import sys
sys.path.insert(0, r'{os.path.join(self.project_root, "src")}')
import socket
from claude_agent.utils.config import get_config_manager
try:
config_manager = get_config_manager('default')
sshout_config = config_manager.get_sshout_config()
hostname = sshout_config['server']['hostname']
port = sshout_config['server']['port']
print(f"🌐 测试网络连通性: {{hostname}}:{{port}}")
# 创建socket连接测试
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
result = sock.connect_ex((hostname, port))
sock.close()
if result == 0:
print("✅ 网络连通性测试通过")
else:
print(f"❌ 无法连接到 {{hostname}}:{{port}} (错误代码: {{result}})")
print("💡 请检查网络连接和服务器状态")
sys.exit(1)
except Exception as e:
print(f"❌ 网络连通性测试异常: {{e}}")
sys.exit(1)
""")
tmp.flush()
result = subprocess.run([
sys.executable, tmp.name
], capture_output=True, text=True, timeout=15)
os.unlink(tmp.name)
# 网络连通性问题不应该导致测试失败,而是跳过
if result.returncode != 0:
self.skipTest(f"网络连通性问题,跳过测试: {result.stdout}")
self.assertIn("✅ 网络连通性测试通过", result.stdout)
if __name__ == '__main__':
print("🧪 开始SSHOUT SSH认证黑盒测试...")
print("💡 提示:设置环境变量 TEST_SSHOUT_AUTH=1 来运行实际SSH认证测试")
unittest.main(verbosity=2)