blob: 03366a381fde1670560fbdc5230f15d070b2ebf1 [file] [log] [blame] [raw]
"""
Webhook集成测试
测试Webhook服务器和客户端的集成功能
"""
import pytest
import asyncio
import aiohttp
from datetime import datetime
import uvicorn
from multiprocessing import Process
import time
from claude_agent.webhook.server import WebhookServer
from claude_agent.webhook.client import WebhookClient
from claude_agent.webhook.models import (
BotMessage, UserInfo, WebhookConfig, BotRegistration,
MessageBroadcast
)
class TestWebhookIntegration:
    """Integration tests for the webhook server's HTTP endpoints.

    Every test runs against a real uvicorn server started in the current
    event loop (see ``webhook_server``) and talks to it through an
    ``aiohttp`` client session.
    """

    # Upper bound, in seconds, for the server to start up or to shut down.
    _SERVER_TIMEOUT = 5.0
    # Delay, in seconds, between two checks of the server's ``started`` flag.
    _POLL_INTERVAL = 0.05

    @pytest.fixture
    def webhook_config(self):
        """Return the webhook configuration used by the tests in this class."""
        return WebhookConfig(
            server_host="127.0.0.1",
            server_port=8088,  # use a dedicated port to avoid conflicts
            webhook_url="http://127.0.0.1:8088",
            auth_token="test-webhook-token-123",
        )

    async def _start_server(self, webhook_config):
        """Start a webhook server as a background task and wait until it is up.

        Returns:
            A ``(server, server_instance, server_task)`` tuple: the
            ``WebhookServer``, the ``uvicorn.Server`` serving its app and the
            asyncio task running ``server_instance.serve()``.

        Raises:
            RuntimeError: if the server task ends before startup completes or
                startup takes longer than ``_SERVER_TIMEOUT`` seconds.
        """
        server = WebhookServer(webhook_config)
        config = uvicorn.Config(
            server.get_app(),
            host=webhook_config.server_host,
            port=webhook_config.server_port,
            log_level="error",  # keep the test output quiet
        )
        server_instance = uvicorn.Server(config)
        server_task = asyncio.create_task(server_instance.serve())

        # Poll uvicorn's ``started`` flag instead of sleeping a fixed second:
        # the tests proceed as soon as the server is up and fail with a clear
        # error when it never comes up.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self._SERVER_TIMEOUT
        try:
            while not server_instance.started:
                if server_task.done():
                    raise RuntimeError(
                        "webhook test server exited before it finished starting"
                    )
                if loop.time() >= deadline:
                    raise RuntimeError(
                        "webhook test server did not start within "
                        f"{self._SERVER_TIMEOUT} seconds"
                    )
                await asyncio.sleep(self._POLL_INTERVAL)
        except BaseException:
            # Do not leave a half-started server task behind.
            await self._stop_server(server_instance, server_task)
            raise
        return server, server_instance, server_task

    async def _stop_server(self, server_instance, server_task):
        """Ask the uvicorn server to exit and wait for its task to finish."""
        server_instance.should_exit = True
        try:
            await asyncio.wait_for(server_task, timeout=self._SERVER_TIMEOUT)
        except asyncio.TimeoutError:
            # Defensive: make sure the task is cancelled if shutdown hangs.
            server_task.cancel()

    # NOTE(review): a plain ``@pytest.fixture`` on an async generator is
    # presumably relying on pytest-asyncio's auto mode -- confirm the project
    # configuration.
    @pytest.fixture
    async def webhook_server(self, webhook_config):
        """Run a webhook server in the background for the duration of a test.

        Yields:
            The ``WebhookServer`` whose app is being served.
        """
        server, server_instance, server_task = await self._start_server(
            webhook_config
        )
        try:
            yield server
        finally:
            await self._stop_server(server_instance, server_task)

    @pytest.mark.asyncio
    async def test_server_health_check(self, webhook_config, webhook_server):
        """The health endpoint answers 200 with a "healthy" status."""
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{webhook_config.webhook_url}/health") as response:
                assert response.status == 200
                data = await response.json()
                assert data["status"] == "healthy"

    @pytest.mark.asyncio
    async def test_bot_registration_flow(self, webhook_config, webhook_server):
        """A bot can be registered, listed and unregistered."""
        async with aiohttp.ClientSession() as session:
            # Register the bot.
            registration = BotRegistration(
                bot_username="testbot",
                auth_token=webhook_config.auth_token,
                subscribed_groups=[-123456, -789012],
            )
            headers = {
                "Authorization": f"Bearer {webhook_config.auth_token}",
                "Content-Type": "application/json",
            }
            # Send the model's own JSON encoding; the Content-Type header
            # above marks the body as JSON.
            async with session.post(
                f"{webhook_config.webhook_url}/register",
                data=registration.json(),
                headers=headers,
            ) as response:
                assert response.status == 200
                data = await response.json()
                assert data["success"] is True

            # Fetch the bot list.
            async with session.get(
                f"{webhook_config.webhook_url}/bots",
                headers=headers,
            ) as response:
                assert response.status == 200
                data = await response.json()
                assert data["success"] is True
                bots = data["data"]["bots"]
                assert len(bots) == 1
                assert bots[0]["bot_username"] == "testbot"

            # Unregister the bot.
            async with session.post(
                f"{webhook_config.webhook_url}/unregister/testbot",
                headers=headers,
            ) as response:
                assert response.status == 200

    @pytest.mark.asyncio
    async def test_message_broadcast_flow(self, webhook_config, webhook_server):
        """A registered bot can broadcast a message."""
        async with aiohttp.ClientSession() as session:
            # Register a bot first.
            registration = BotRegistration(
                bot_username="senderbot",
                auth_token=webhook_config.auth_token,
                subscribed_groups=[-123456],
            )
            headers = {
                "Authorization": f"Bearer {webhook_config.auth_token}",
                "Content-Type": "application/json",
            }
            # Release the response and fail early if the setup step itself
            # did not succeed.
            async with session.post(
                f"{webhook_config.webhook_url}/register",
                data=registration.json(),
                headers=headers,
            ) as response:
                assert response.status == 200

            # Build the test message.
            user_info = UserInfo(
                user_id=123,
                username="testuser",
                first_name="Test",
                last_name="User",
                is_bot=False,
            )
            bot_message = BotMessage(
                bot_username="senderbot",
                group_id=-123456,
                message_content="Hello, this is a test message!",
                sender_info=user_info,
                timestamp=datetime.now(),
            )
            broadcast = MessageBroadcast(bot_message=bot_message)

            # Send the broadcast. ``broadcast.dict()`` would still contain the
            # ``datetime`` timestamp, which the standard ``json`` encoder used
            # by ``json=`` cannot serialize, so the model's JSON encoding is
            # sent as the raw body instead.
            async with session.post(
                f"{webhook_config.webhook_url}/broadcast",
                data=broadcast.json(),
                headers=headers,
            ) as response:
                assert response.status == 200
                data = await response.json()
                assert data["success"] is True

    @pytest.mark.asyncio
    async def test_authentication_failure(self, webhook_config, webhook_server):
        """Requests with a bad or missing token are rejected."""
        async with aiohttp.ClientSession() as session:
            headers = {
                "Authorization": "Bearer invalid-token",
                "Content-Type": "application/json",
            }
            # Invalid token.
            async with session.get(
                f"{webhook_config.webhook_url}/bots",
                headers=headers,
            ) as response:
                assert response.status == 401

            # Missing authentication header.
            async with session.get(
                f"{webhook_config.webhook_url}/bots"
            ) as response:
                # FastAPI returns 422 for a missing required dependency.
                assert response.status == 422
class TestWebhookClientIntegration:
    """Integration tests driving a real webhook server through ``WebhookClient``."""

    # Upper bound, in seconds, for the server to start up or to shut down.
    _SERVER_TIMEOUT = 5.0
    # Delay, in seconds, between two checks of the server's ``started`` flag.
    _POLL_INTERVAL = 0.05

    @pytest.fixture
    def webhook_config(self):
        """Return the webhook configuration used by the tests in this class."""
        return WebhookConfig(
            server_host="127.0.0.1",
            server_port=8089,  # use a different port
            webhook_url="http://127.0.0.1:8089",
            auth_token="test-client-token-456",
        )

    async def _start_server(self, webhook_config):
        """Start a webhook server as a background task and wait until it is up.

        Returns:
            A ``(server_instance, server_task)`` tuple: the ``uvicorn.Server``
            serving the webhook app and the asyncio task running
            ``server_instance.serve()``.

        Raises:
            RuntimeError: if the server task ends before startup completes or
                startup takes longer than ``_SERVER_TIMEOUT`` seconds.
        """
        server = WebhookServer(webhook_config)
        config = uvicorn.Config(
            server.get_app(),
            host=webhook_config.server_host,
            port=webhook_config.server_port,
            log_level="error",
        )
        server_instance = uvicorn.Server(config)
        server_task = asyncio.create_task(server_instance.serve())

        # Poll uvicorn's ``started`` flag instead of sleeping a fixed second:
        # the tests proceed as soon as the server is up and fail with a clear
        # error when it never comes up.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self._SERVER_TIMEOUT
        try:
            while not server_instance.started:
                if server_task.done():
                    raise RuntimeError(
                        "webhook test server exited before it finished starting"
                    )
                if loop.time() >= deadline:
                    raise RuntimeError(
                        "webhook test server did not start within "
                        f"{self._SERVER_TIMEOUT} seconds"
                    )
                await asyncio.sleep(self._POLL_INTERVAL)
        except BaseException:
            # Do not leave a half-started server task behind.
            await self._stop_server(server_instance, server_task)
            raise
        return server_instance, server_task

    async def _stop_server(self, server_instance, server_task):
        """Ask the uvicorn server to exit and wait for its task to finish."""
        server_instance.should_exit = True
        try:
            await asyncio.wait_for(server_task, timeout=self._SERVER_TIMEOUT)
        except asyncio.TimeoutError:
            # Defensive: make sure the task is cancelled if shutdown hangs.
            server_task.cancel()

    @pytest.mark.asyncio
    async def test_client_server_connection(self, webhook_config):
        """The client can reach the server and list the registered bots."""
        server_instance, server_task = await self._start_server(webhook_config)
        try:
            # Create the client with a handler that records incoming messages.
            received_messages = []

            async def message_handler(message: BotMessage):
                received_messages.append(message)

            client = WebhookClient(
                config=webhook_config,
                bot_username="testclient",
                subscribed_groups=[-123456],
                message_handler=message_handler,
            )
            await client.start()
            try:
                # Server status check.
                status = await client.check_server_status()
                assert status is True
                # Bot list retrieval.
                bots = await client.get_registered_bots()
                assert isinstance(bots, list)
            finally:
                await client.stop()
        finally:
            await self._stop_server(server_instance, server_task)

    @pytest.mark.asyncio
    async def test_client_message_broadcast(self, webhook_config):
        """The client can broadcast a message through the server."""
        server_instance, server_task = await self._start_server(webhook_config)
        try:
            client = WebhookClient(
                config=webhook_config,
                bot_username="broadcastclient",
                subscribed_groups=[-123456],
            )
            await client.start()
            try:
                # Build the test message.
                user_info = UserInfo(
                    user_id=789,
                    username="broadcaster",
                    is_bot=True,
                )
                message = BotMessage(
                    bot_username="broadcastclient",
                    group_id=-123456,
                    message_content="Broadcasting test message",
                    sender_info=user_info,
                )
                # Send the broadcast.
                result = await client.broadcast_message(message)
                assert result is True
            finally:
                await client.stop()
        finally:
            await self._stop_server(server_instance, server_task)
if __name__ == "__main__":
    # Run this module's tests verbosely with output capturing disabled, and
    # propagate pytest's exit code so that failures are visible to the caller
    # instead of the script always exiting with status 0.
    raise SystemExit(pytest.main([__file__, "-v", "-s"]))