| """ |
| SSHOUT实际SSH认证黑盒测试 |
| 测试真实的SSH连接、认证和基本通信功能 |
| """ |
| |
| import unittest |
| import asyncio |
| import os |
| import sys |
| import tempfile |
| import subprocess |
| from unittest import skip |
| |
| |
| class TestSSHOUTRealAuthentication(unittest.TestCase): |
| """SSHOUT实际SSH认证测试""" |
| |
| def setUp(self): |
| """设置测试环境""" |
| self.project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| |
| def test_ssh_connection_and_authentication(self): |
| """测试SSH连接和认证功能""" |
| # 只在特定环境变量设置时运行此测试 |
| if not os.getenv('TEST_SSHOUT_AUTH'): |
| self.skipTest("需要设置 TEST_SSHOUT_AUTH=1 环境变量来运行SSH认证测试") |
| |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp: |
| tmp.write(f""" |
| import sys |
| sys.path.insert(0, r'{os.path.join(self.project_root, "src")}') |
| |
| import asyncio |
| from claude_agent.utils.config import get_config_manager |
| from claude_agent.sshout.integration import SSHOUTConnection |
| |
| async def test_ssh_auth(): |
| try: |
| # 加载配置 |
| config_manager = get_config_manager('default') |
| sshout_config = config_manager.get_sshout_config() |
| |
| server_config = sshout_config['server'] |
| ssh_config = sshout_config['ssh_key'] |
| |
| print(f"🔌 尝试连接到 {{server_config['hostname']}}:{{server_config['port']}}") |
| |
| # 创建连接 |
| connection = SSHOUTConnection( |
| hostname=server_config['hostname'], |
| port=server_config['port'], |
| username=server_config['username'], |
| key_path=ssh_config['private_key_path'], |
| mention_patterns=sshout_config.get('mention_patterns', []) |
| ) |
| |
| # 尝试连接 |
| success = await asyncio.wait_for(connection.connect(), timeout=15) |
| |
| if success: |
| print("✅ SSH连接和认证成功") |
| |
| # 测试发送消息 |
| test_message = "[测试] Claude Agent SSH认证测试 - " + str(asyncio.get_event_loop().time()) |
| send_success = await connection.send_message(test_message) |
| |
| if send_success: |
| print("✅ 消息发送成功") |
| else: |
| print("❌ 消息发送失败") |
| sys.exit(1) |
| |
| # 等待一小段时间确保消息发送完成 |
| await asyncio.sleep(2) |
| |
| # 清理连接 |
| await connection.disconnect() |
| print("✅ 连接已断开") |
| |
| print("✅ SSH认证测试全部通过") |
| else: |
| print("❌ SSH连接失败") |
| sys.exit(1) |
| |
| except asyncio.TimeoutError: |
| print("⏰ SSH连接超时") |
| sys.exit(1) |
| except FileNotFoundError as e: |
| if "SSH私钥文件不存在" in str(e): |
| print(f"🔑 SSH私钥文件不存在: {{e}}") |
| print("💡 请确保SSH密钥文件存在并且路径配置正确") |
| else: |
| print(f"❌ 文件未找到: {{e}}") |
| sys.exit(1) |
| except Exception as e: |
| print(f"💥 SSH认证测试异常: {{e}}") |
| import traceback |
| traceback.print_exc() |
| sys.exit(1) |
| |
| # 运行测试 |
| asyncio.run(test_ssh_auth()) |
| """) |
| tmp.flush() |
| |
| result = subprocess.run([ |
| sys.executable, tmp.name |
| ], capture_output=True, text=True, timeout=30) |
| |
| os.unlink(tmp.name) |
| |
| # 输出结果供调试 |
| if result.stdout: |
| print("STDOUT:", result.stdout) |
| if result.stderr: |
| print("STDERR:", result.stderr) |
| |
| # 只有在真正的认证错误时才失败,其他情况(如网络不通)则跳过 |
| if result.returncode != 0: |
| if "SSH私钥文件不存在" in result.stdout: |
| self.fail("SSH私钥文件不存在,请检查配置") |
| elif "认证失败" in result.stdout or "Authentication failed" in result.stderr: |
| self.fail("SSH认证失败,请检查密钥文件和服务器配置") |
| else: |
| # 网络问题等其他错误,跳过测试而不是失败 |
| self.skipTest(f"SSH连接测试因外部问题跳过: {result.stdout}") |
| |
| # 检查成功标志 |
| self.assertIn("✅ SSH认证测试全部通过", result.stdout) |
| |
| def test_ssh_key_file_exists_and_permissions(self): |
| """测试SSH密钥文件存在性和权限""" |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp: |
| tmp.write(f""" |
| import sys |
| sys.path.insert(0, r'{os.path.join(self.project_root, "src")}') |
| import os |
| from claude_agent.utils.config import get_config_manager |
| |
| try: |
| config_manager = get_config_manager('default') |
| sshout_config = config_manager.get_sshout_config() |
| key_path = sshout_config['ssh_key']['private_key_path'] |
| |
| if os.path.exists(key_path): |
| print(f"✅ SSH密钥文件存在: {{key_path}}") |
| |
| # 检查文件权限 |
| stat_info = os.stat(key_path) |
| permissions = stat_info.st_mode & 0o777 |
| |
| print(f"📋 文件权限: {{oct(permissions)}}") |
| |
| # SSH私钥通常应该是 600 (只有所有者可读写) |
| if permissions <= 0o600: |
| print("✅ SSH密钥文件权限安全") |
| else: |
| print(f"⚠️ SSH密钥文件权限可能过于宽松: {{oct(permissions)}}") |
| print("💡 建议设置为 600: chmod 600 " + key_path) |
| else: |
| print(f"❌ SSH密钥文件不存在: {{key_path}}") |
| sys.exit(1) |
| |
| print("✅ SSH密钥检查通过") |
| |
| except Exception as e: |
| print(f"❌ SSH密钥检查失败: {{e}}") |
| sys.exit(1) |
| """) |
| tmp.flush() |
| |
| result = subprocess.run([ |
| sys.executable, tmp.name |
| ], capture_output=True, text=True, timeout=10) |
| |
| os.unlink(tmp.name) |
| |
| self.assertEqual(result.returncode, 0, |
| f"SSH密钥检查失败:\nstdout: {result.stdout}\nstderr: {result.stderr}") |
| self.assertIn("✅ SSH密钥检查通过", result.stdout) |
| |
| def test_network_connectivity_to_sshout_server(self): |
| """测试到SSHOUT服务器的网络连通性""" |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as tmp: |
| tmp.write(f""" |
| import sys |
| sys.path.insert(0, r'{os.path.join(self.project_root, "src")}') |
| import socket |
| from claude_agent.utils.config import get_config_manager |
| |
| try: |
| config_manager = get_config_manager('default') |
| sshout_config = config_manager.get_sshout_config() |
| |
| hostname = sshout_config['server']['hostname'] |
| port = sshout_config['server']['port'] |
| |
| print(f"🌐 测试网络连通性: {{hostname}}:{{port}}") |
| |
| # 创建socket连接测试 |
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| sock.settimeout(10) |
| |
| result = sock.connect_ex((hostname, port)) |
| sock.close() |
| |
| if result == 0: |
| print("✅ 网络连通性测试通过") |
| else: |
| print(f"❌ 无法连接到 {{hostname}}:{{port}} (错误代码: {{result}})") |
| print("💡 请检查网络连接和服务器状态") |
| sys.exit(1) |
| |
| except Exception as e: |
| print(f"❌ 网络连通性测试异常: {{e}}") |
| sys.exit(1) |
| """) |
| tmp.flush() |
| |
| result = subprocess.run([ |
| sys.executable, tmp.name |
| ], capture_output=True, text=True, timeout=15) |
| |
| os.unlink(tmp.name) |
| |
| # 网络连通性问题不应该导致测试失败,而是跳过 |
| if result.returncode != 0: |
| self.skipTest(f"网络连通性问题,跳过测试: {result.stdout}") |
| |
| self.assertIn("✅ 网络连通性测试通过", result.stdout) |
| |
| |
| if __name__ == '__main__': |
| print("🧪 开始SSHOUT SSH认证黑盒测试...") |
| print("💡 提示:设置环境变量 TEST_SSHOUT_AUTH=1 来运行实际SSH认证测试") |
| unittest.main(verbosity=2) |