| """ |
| Webhook集成测试 |
| 测试Webhook服务器和客户端的集成功能 |
| """ |
| |
| import pytest |
| import asyncio |
| import aiohttp |
| from datetime import datetime |
| import uvicorn |
| from multiprocessing import Process |
| import time |
| |
| from claude_agent.webhook.server import WebhookServer |
| from claude_agent.webhook.client import WebhookClient |
| from claude_agent.webhook.models import ( |
| BotMessage, UserInfo, WebhookConfig, BotRegistration, |
| MessageBroadcast |
| ) |
| |
| |
class TestWebhookIntegration:
    """Integration tests that exercise the webhook server over real HTTP.

    Each test gets its own uvicorn-hosted ``WebhookServer`` through the
    ``webhook_server`` fixture and talks to it with an aiohttp client session.
    """

    # Upper bound, in seconds, for waiting on uvicorn to start up or shut down.
    _SERVER_TIMEOUT = 5.0
    # Delay, in seconds, between readiness checks while the server starts.
    _POLL_INTERVAL = 0.05

    @staticmethod
    def _auth_headers(token):
        """Return the JSON request headers carrying ``token`` as a bearer token."""
        return {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }

    @staticmethod
    def _to_json(model):
        """Serialize ``model.dict()`` to a JSON string.

        ``datetime`` values are encoded as ISO-8601 strings because the
        standard ``json`` encoder (which aiohttp's ``json=`` argument uses by
        default) rejects them. Any other unsupported type still raises
        ``TypeError`` instead of being stringified silently.
        """
        import json  # local import keeps this class self-contained

        def encode(value):
            if isinstance(value, datetime):
                return value.isoformat()
            raise TypeError(
                f"Object of type {type(value).__name__} is not JSON serializable"
            )

        return json.dumps(model.dict(), default=encode)

    async def _wait_until_started(self, server_instance, server_task):
        """Block until uvicorn reports that startup finished.

        Polls ``server_instance.started`` instead of sleeping for a fixed time.

        Raises:
            RuntimeError: if the serve task ends before startup completes or
                the server is not ready within ``_SERVER_TIMEOUT`` seconds.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self._SERVER_TIMEOUT
        while not server_instance.started:
            if server_task.done():
                # Surface the real startup failure, if the task raised one.
                server_task.result()
                raise RuntimeError("uvicorn server exited before startup completed")
            if loop.time() >= deadline:
                raise RuntimeError("timed out waiting for uvicorn server to start")
            await asyncio.sleep(self._POLL_INTERVAL)

    @pytest.fixture
    def webhook_config(self):
        """Create the webhook configuration used by the tests in this class."""
        return WebhookConfig(
            server_host="127.0.0.1",
            server_port=8088,  # use a different port to avoid conflicts
            webhook_url="http://127.0.0.1:8088",
            auth_token="test-webhook-token-123"
        )

    @pytest.fixture
    async def webhook_server(self, webhook_config):
        """Start a webhook server for one test and shut it down afterwards.

        Yields:
            The ``WebhookServer`` instance whose app is being served.
        """
        server = WebhookServer(webhook_config)
        app = server.get_app()

        uvicorn_config = uvicorn.Config(
            app,
            host=webhook_config.server_host,
            port=webhook_config.server_port,
            log_level="error"  # keep log output quiet
        )
        server_instance = uvicorn.Server(uvicorn_config)

        # Run the server in the background on the test's event loop.
        server_task = asyncio.create_task(server_instance.serve())

        try:
            await self._wait_until_started(server_instance, server_task)
            yield server
        finally:
            # Ask uvicorn to exit and give it a bounded time to do so.
            server_instance.should_exit = True
            try:
                await asyncio.wait_for(server_task, timeout=self._SERVER_TIMEOUT)
            except asyncio.TimeoutError:
                # wait_for already requested cancellation on timeout; the
                # explicit cancel() is kept as a harmless safeguard.
                server_task.cancel()

    @pytest.mark.asyncio
    async def test_server_health_check(self, webhook_config, webhook_server):
        """The /health endpoint answers 200 with status "healthy"."""
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{webhook_config.webhook_url}/health") as response:
                assert response.status == 200
                data = await response.json()
                assert data["status"] == "healthy"

    @pytest.mark.asyncio
    async def test_bot_registration_flow(self, webhook_config, webhook_server):
        """A bot can be registered, listed and then unregistered."""
        async with aiohttp.ClientSession() as session:
            registration = BotRegistration(
                bot_username="testbot",
                auth_token=webhook_config.auth_token,
                subscribed_groups=[-123456, -789012]
            )
            headers = self._auth_headers(webhook_config.auth_token)

            # Register the bot.
            async with session.post(
                f"{webhook_config.webhook_url}/register",
                json=registration.dict(),
                headers=headers
            ) as response:
                assert response.status == 200
                data = await response.json()
                assert data["success"] is True

            # The bot list must now contain exactly the bot just registered.
            async with session.get(
                f"{webhook_config.webhook_url}/bots",
                headers=headers
            ) as response:
                assert response.status == 200
                data = await response.json()
                assert data["success"] is True
                bots = data["data"]["bots"]
                assert len(bots) == 1
                assert bots[0]["bot_username"] == "testbot"

            # Unregister the bot.
            async with session.post(
                f"{webhook_config.webhook_url}/unregister/testbot",
                headers=headers
            ) as response:
                assert response.status == 200

    @pytest.mark.asyncio
    async def test_message_broadcast_flow(self, webhook_config, webhook_server):
        """A registered bot can broadcast a message through the server."""
        async with aiohttp.ClientSession() as session:
            # A bot has to be registered before it can broadcast.
            registration = BotRegistration(
                bot_username="senderbot",
                auth_token=webhook_config.auth_token,
                subscribed_groups=[-123456]
            )
            headers = self._auth_headers(webhook_config.auth_token)

            # Use the response as a context manager so the connection is
            # released, and fail early if the precondition did not hold.
            async with session.post(
                f"{webhook_config.webhook_url}/register",
                json=registration.dict(),
                headers=headers
            ) as response:
                assert response.status == 200

            # Build the message to broadcast.
            user_info = UserInfo(
                user_id=123,
                username="testuser",
                first_name="Test",
                last_name="User",
                is_bot=False
            )

            bot_message = BotMessage(
                bot_username="senderbot",
                group_id=-123456,
                message_content="Hello, this is a test message!",
                sender_info=user_info,
                timestamp=datetime.now()
            )

            broadcast = MessageBroadcast(bot_message=bot_message)

            # The payload carries a datetime, which the default JSON encoder
            # behind aiohttp's ``json=`` cannot serialize, so the body is
            # encoded explicitly; ``headers`` already sets the JSON
            # Content-Type.
            # NOTE(review): assumes the server parses ISO-8601 timestamps - confirm.
            async with session.post(
                f"{webhook_config.webhook_url}/broadcast",
                data=self._to_json(broadcast),
                headers=headers
            ) as response:
                assert response.status == 200
                data = await response.json()
                assert data["success"] is True

    @pytest.mark.asyncio
    async def test_authentication_failure(self, webhook_config, webhook_server):
        """Requests with a bad or missing Authorization header are rejected."""
        async with aiohttp.ClientSession() as session:
            headers = self._auth_headers("invalid-token")

            # Invalid token.
            async with session.get(
                f"{webhook_config.webhook_url}/bots",
                headers=headers
            ) as response:
                assert response.status == 401

            # Missing Authorization header.
            async with session.get(
                f"{webhook_config.webhook_url}/bots"
            ) as response:
                assert response.status == 422  # FastAPI returns 422 for the missing dependency
| |
| |
class TestWebhookClientIntegration:
    """Integration tests for ``WebhookClient`` against a live webhook server.

    Each test starts its own uvicorn-hosted ``WebhookServer`` on the current
    event loop through the private helpers below and always shuts it down.
    """

    # Upper bound, in seconds, for waiting on uvicorn to start up or shut down.
    _SERVER_TIMEOUT = 5.0
    # Delay, in seconds, between readiness checks while the server starts.
    _POLL_INTERVAL = 0.05

    @pytest.fixture
    def webhook_config(self):
        """Create the webhook configuration used by the tests in this class."""
        return WebhookConfig(
            server_host="127.0.0.1",
            server_port=8089,  # use a different port
            webhook_url="http://127.0.0.1:8089",
            auth_token="test-client-token-456"
        )

    async def _start_server(self, app, webhook_config):
        """Serve ``app`` with uvicorn in the background and wait until ready.

        Polls ``uvicorn.Server.started`` rather than sleeping a fixed time.

        Returns:
            A ``(server_instance, server_task)`` pair to hand to
            ``_stop_server``.

        Raises:
            RuntimeError: if the serve task ends before startup completes or
                the server is not ready within ``_SERVER_TIMEOUT`` seconds.
        """
        uvicorn_config = uvicorn.Config(
            app,
            host=webhook_config.server_host,
            port=webhook_config.server_port,
            log_level="error"
        )
        server_instance = uvicorn.Server(uvicorn_config)
        server_task = asyncio.create_task(server_instance.serve())

        loop = asyncio.get_running_loop()
        deadline = loop.time() + self._SERVER_TIMEOUT
        try:
            while not server_instance.started:
                if server_task.done():
                    # Surface the real startup failure, if the task raised one.
                    server_task.result()
                    raise RuntimeError("uvicorn server exited before startup completed")
                if loop.time() >= deadline:
                    raise RuntimeError("timed out waiting for uvicorn server to start")
                await asyncio.sleep(self._POLL_INTERVAL)
        except BaseException:
            # Never leave a half-started server behind when startup fails.
            await self._stop_server(server_instance, server_task)
            raise
        return server_instance, server_task

    async def _stop_server(self, server_instance, server_task):
        """Ask uvicorn to exit and wait a bounded time for the serve task."""
        server_instance.should_exit = True
        try:
            await asyncio.wait_for(server_task, timeout=self._SERVER_TIMEOUT)
        except asyncio.TimeoutError:
            # wait_for already requested cancellation on timeout; the
            # explicit cancel() is kept as a harmless safeguard.
            server_task.cancel()

    @pytest.mark.asyncio
    async def test_client_server_connection(self, webhook_config):
        """The client can reach the server and fetch the registered-bot list."""
        server = WebhookServer(webhook_config)
        server_instance, server_task = await self._start_server(
            server.get_app(), webhook_config
        )

        try:
            received_messages = []

            async def message_handler(message: BotMessage):
                received_messages.append(message)

            client = WebhookClient(
                config=webhook_config,
                bot_username="testclient",
                subscribed_groups=[-123456],
                message_handler=message_handler
            )

            await client.start()
            try:
                # Server status check.
                status = await client.check_server_status()
                assert status is True

                # Registered-bot listing.
                bots = await client.get_registered_bots()
                assert isinstance(bots, list)
            finally:
                await client.stop()
        finally:
            await self._stop_server(server_instance, server_task)

    @pytest.mark.asyncio
    async def test_client_message_broadcast(self, webhook_config):
        """The client can broadcast a message through the server."""
        server = WebhookServer(webhook_config)
        server_instance, server_task = await self._start_server(
            server.get_app(), webhook_config
        )

        try:
            client = WebhookClient(
                config=webhook_config,
                bot_username="broadcastclient",
                subscribed_groups=[-123456]
            )

            await client.start()
            try:
                # Build the message to broadcast.
                user_info = UserInfo(
                    user_id=789,
                    username="broadcaster",
                    is_bot=True
                )

                message = BotMessage(
                    bot_username="broadcastclient",
                    group_id=-123456,
                    message_content="Broadcasting test message",
                    sender_info=user_info
                )

                # Send the broadcast.
                result = await client.broadcast_message(message)
                assert result is True
            finally:
                await client.stop()
        finally:
            await self._stop_server(server_instance, server_task)
| |
| |
if __name__ == "__main__":
    # Run this module's tests directly (verbose, output capture disabled) and
    # propagate pytest's exit code so a failing run exits non-zero.
    raise SystemExit(pytest.main([__file__, "-v", "-s"]))