# blob: a247cb5e08b8ed2806484ccc38ae24a78ed1ad5e [file] [log] [blame] [raw]
"""
Webhook客户端
Bot用于连接Webhook服务器和处理消息的客户端
"""
import asyncio
import hmac
import inspect
import logging
import os
import threading
from datetime import datetime
from typing import Optional, Callable, Dict, Any

import aiohttp
from fastapi import FastAPI, Request, HTTPException

from .models import (
    BotMessage, BotRegistration, WebhookResponse,
    MessageBroadcast, WebhookConfig
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class WebhookClient:
    """Client a bot uses to talk to the webhook server.

    The client registers the bot with the server, broadcasts outgoing
    messages and, when a callback port and a message handler are supplied,
    runs a small FastAPI application in a background thread to receive
    messages pushed back by the server.
    """

    # Upper bound (seconds) on how long start-up waits for the callback
    # server to report that it is accepting connections.
    _CALLBACK_STARTUP_TIMEOUT = 5.0
    # Upper bound (seconds) on how long shutdown waits for the callback
    # server thread to finish.
    _CALLBACK_SHUTDOWN_TIMEOUT = 5.0
    # Delay (seconds) between two checks of the callback server start-up flag.
    _CALLBACK_POLL_INTERVAL = 0.05

    def __init__(
        self,
        config: WebhookConfig,
        bot_username: str,
        subscribed_groups: list[int],
        message_handler: Optional[Callable[[BotMessage], Any]] = None,
    ):
        """Initialise the client without performing any I/O.

        Args:
            config: Webhook configuration (server URL, auth token, timeouts).
            bot_username: Username this bot registers under.
            subscribed_groups: IDs of the groups the bot subscribes to.
            message_handler: Optional callback invoked with every received
                ``BotMessage``. It may be a plain function or return an
                awaitable, which is then awaited.
        """
        self.config = config
        self.bot_username = bot_username
        self.subscribed_groups = subscribed_groups
        self.message_handler = message_handler

        # Outgoing HTTP session; created in start(), closed in stop().
        self.session: Optional[aiohttp.ClientSession] = None

        # Registration state with the webhook server.
        self.is_registered = False
        self.registration_data: Optional[BotRegistration] = None

        # Callback server (only used when the bot wants to receive messages).
        self.callback_app: Optional[FastAPI] = None
        self.callback_port: Optional[int] = None
        self.callback_server_thread: Optional[Any] = None
        # uvicorn.Server instance driving the callback app; kept so that
        # stop() can ask it to exit.
        self._callback_server: Optional[Any] = None

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------
    def _auth_headers(self) -> Dict[str, str]:
        """Return the headers sent with every authenticated request."""
        return {
            "Authorization": f"Bearer {self.config.auth_token}",
            "Content-Type": "application/json",
        }

    def _url(self, path: str) -> str:
        """Join ``path`` onto the configured webhook URL.

        A trailing slash on the configured URL is dropped so the result never
        contains a double slash.
        """
        return f"{str(self.config.webhook_url).rstrip('/')}/{path}"

    async def _close_session(self) -> None:
        """Close the HTTP session, if any, and forget it."""
        session, self.session = self.session, None
        if session:
            await session.close()

    async def _stop_callback_server(self) -> None:
        """Ask the callback server to exit and wait briefly for its thread."""
        server, self._callback_server = self._callback_server, None
        thread, self.callback_server_thread = self.callback_server_thread, None
        if server is not None:
            # uvicorn's main loop polls this flag and shuts down gracefully.
            server.should_exit = True
        if thread is not None and thread.is_alive():
            # Join in an executor so the running event loop is not blocked.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None, thread.join, self._CALLBACK_SHUTDOWN_TIMEOUT
            )
            if thread.is_alive():
                logger.warning(
                    "Callback server thread did not stop within "
                    f"{self._CALLBACK_SHUTDOWN_TIMEOUT} seconds"
                )

    def _check_callback_auth(self, auth_header: Optional[str]) -> None:
        """Validate the ``Authorization`` header of a callback request.

        Raises:
            HTTPException: 401 when the header is missing, is not a bearer
                token, or does not match the configured auth token.
        """
        if not auth_header or not auth_header.startswith("Bearer "):
            raise HTTPException(
                status_code=401,
                detail="Missing or invalid authorization header",
            )
        token = auth_header.split(" ", 1)[1]
        expected = self.config.auth_token
        # Constant-time comparison so the response time does not leak how
        # much of the token matched. Bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        if not isinstance(expected, str) or not hmac.compare_digest(
            token.encode("utf-8"), expected.encode("utf-8")
        ):
            raise HTTPException(status_code=401, detail="Invalid token")

    async def _dispatch_message(self, message: BotMessage) -> None:
        """Invoke the message handler, awaiting its result when needed.

        Handler failures are logged and swallowed so that a faulty handler
        does not turn the callback response into an error.
        """
        handler = self.message_handler
        if not handler:
            return
        try:
            outcome = handler(message)
            # Covers coroutine functions as well as partials and callable
            # objects that return an awaitable.
            if inspect.isawaitable(outcome):
                await outcome
        except Exception as e:
            logger.exception(f"消息处理器执行失败: {e}")

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    async def start(self, callback_port: Optional[int] = None):
        """Start the client and register it with the webhook server.

        Args:
            callback_port: Port for the callback server. The callback server
                is only started when this is set and a message handler was
                supplied.

        Raises:
            RuntimeError: If the webhook URL is not configured, the callback
                server fails to start, or the server rejects the registration.
            Exception: Any error raised while sending the registration
                request is propagated.
        """
        # NOTE(review): going by the attribute names, ``total`` and
        # ``connect`` look swapped here (total <- connection_timeout,
        # connect <- request_timeout). Left unchanged because WebhookConfig
        # is not visible from this module - confirm and swap if unintended.
        timeout = aiohttp.ClientTimeout(
            total=self.config.connection_timeout,
            connect=self.config.request_timeout,
        )
        self.session = aiohttp.ClientSession(timeout=timeout)

        try:
            # Start the callback server first so the endpoint announced
            # during registration is already reachable.
            if callback_port and self.message_handler:
                self.callback_port = callback_port
                await self._setup_callback_server()

            await self._register()
        except BaseException:
            # Do not leak the session or a running callback server when
            # start-up fails part-way through.
            await self._stop_callback_server()
            await self._close_session()
            raise

        logger.info(f"Webhook客户端 {self.bot_username} 启动成功")

    async def stop(self):
        """Unregister the bot, stop the callback server and close the session."""
        if self.is_registered:
            await self._unregister()

        await self._stop_callback_server()
        await self._close_session()

        logger.info(f"Webhook客户端 {self.bot_username} 已停止")

    # ------------------------------------------------------------------
    # Outgoing requests
    # ------------------------------------------------------------------
    async def broadcast_message(
        self,
        message: BotMessage,
        target_groups: Optional[list[int]] = None,
        exclude_bots: Optional[list[str]] = None
    ) -> bool:
        """Send ``message`` to the webhook server for broadcasting.

        Args:
            message: The message to broadcast.
            target_groups: Optional group IDs, passed through to the server.
            exclude_bots: Optional bot usernames, passed through to the
                server; ``None`` is sent as an empty list.

        Returns:
            True when the server answered HTTP 200; False when the client is
            not started/registered, the server answered with another status,
            or the request raised.
        """
        if not self.session or not self.is_registered:
            logger.error("客户端未启动或未注册")
            return False

        try:
            broadcast = MessageBroadcast(
                bot_message=message,
                target_groups=target_groups,
                exclude_bots=exclude_bots or [],
            )
            async with self.session.post(
                self._url("broadcast"),
                json=broadcast.model_dump(mode='json'),
                headers=self._auth_headers(),
            ) as response:
                if response.status == 200:
                    logger.debug(f"消息广播成功: {message.message_id}")
                    return True
                logger.error(f"消息广播失败: HTTP {response.status}")
                return False
        except Exception as e:
            logger.error(f"消息广播异常: {e}")
            return False

    async def _register(self):
        """Register this bot with the webhook server.

        Raises:
            RuntimeError: If the session or webhook URL is missing, or the
                server answers with a status other than 200.
            Exception: Any error raised while sending the request is logged
                and re-raised.
        """
        if not self.session or not self.config.webhook_url:
            raise RuntimeError("Session not initialized or webhook URL not configured")

        try:
            webhook_endpoint = None
            if self.callback_port:
                # The host name the server should call back on comes from
                # the environment and defaults to localhost.
                hostname = os.environ.get('WEBHOOK_CALLBACK_HOST', 'localhost')
                webhook_endpoint = f"http://{hostname}:{self.callback_port}/webhook"

            registration = BotRegistration(
                bot_username=self.bot_username,
                auth_token=self.config.auth_token,
                subscribed_groups=self.subscribed_groups,
                webhook_endpoint=webhook_endpoint,
            )

            async with self.session.post(
                self._url("register"),
                json=registration.model_dump(mode='json'),
                headers=self._auth_headers(),
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise RuntimeError(
                        f"注册失败: HTTP {response.status}, {error_text}"
                    )
                self.is_registered = True
                self.registration_data = registration
                logger.info(f"Bot {self.bot_username} 注册成功")
        except Exception as e:
            logger.error(f"注册到Webhook服务器失败: {e}")
            raise

    async def _unregister(self):
        """Unregister this bot from the webhook server (best effort).

        Failures are only logged; the local registration state is cleared in
        every case.
        """
        if not self.session or not self.is_registered:
            return

        try:
            async with self.session.post(
                self._url(f"unregister/{self.bot_username}"),
                headers=self._auth_headers(),
            ) as response:
                if response.status == 200:
                    logger.info(f"Bot {self.bot_username} 注销成功")
                else:
                    logger.warning(f"注销失败: HTTP {response.status}")
        except Exception as e:
            logger.error(f"注销失败: {e}")
        finally:
            self.is_registered = False
            self.registration_data = None

    # ------------------------------------------------------------------
    # Callback server
    # ------------------------------------------------------------------
    async def _setup_callback_server(self):
        """Create the FastAPI callback app and serve it from a daemon thread.

        The app exposes ``POST /webhook`` (authenticated message delivery)
        and ``GET /health``. uvicorn runs in its own thread, so the message
        handler is invoked from that thread's event loop rather than from
        the loop that called ``start()``.

        Raises:
            RuntimeError: If the server thread exits before the server
                reports that it has started (for example when the port
                cannot be bound).
        """
        # Imported lazily so uvicorn is only needed when a callback server
        # is actually used.
        import uvicorn

        app = FastAPI(
            title=f"Bot {self.bot_username} Webhook Callback",
            description="接收来自Webhook服务器的消息"
        )
        self.callback_app = app

        # ``description`` is passed explicitly so the generated OpenAPI text
        # does not depend on the wording of the docstrings below.
        @app.post("/webhook", description="接收Webhook消息")
        async def receive_message(request: Request):
            """Authenticate, parse and dispatch one incoming webhook message."""
            try:
                self._check_callback_auth(request.headers.get("Authorization"))

                data = await request.json()
                message_data = data.get("message")
                if not message_data:
                    raise HTTPException(status_code=400, detail="Missing message data")

                message = BotMessage(**message_data)
                await self._dispatch_message(message)

                return {"status": "success", "timestamp": datetime.now().isoformat()}
            except HTTPException:
                # Deliberate 4xx responses must reach the caller unchanged
                # instead of being rewritten into a 500 below.
                raise
            except Exception as e:
                logger.error(f"处理Webhook消息失败: {e}")
                raise HTTPException(status_code=500, detail=str(e)) from e

        @app.get("/health", description="健康检查")
        async def health():
            """Report that the callback server is alive."""
            return {"status": "healthy", "bot": self.bot_username}

        server = uvicorn.Server(
            uvicorn.Config(
                app,
                host="0.0.0.0",  # listen on all interfaces so other containers can reach it
                port=self.callback_port,
                log_level="warning",  # keep log noise down
                access_log=False,
            )
        )
        thread = threading.Thread(target=server.run, daemon=True)
        self._callback_server = server
        self.callback_server_thread = thread
        thread.start()

        # Wait until uvicorn reports it is serving, instead of sleeping for
        # a fixed time and hoping the bind succeeded.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self._CALLBACK_STARTUP_TIMEOUT
        while not server.started:
            if not thread.is_alive():
                self._callback_server = None
                self.callback_server_thread = None
                raise RuntimeError(
                    f"Callback server failed to start on port {self.callback_port}"
                )
            if loop.time() >= deadline:
                logger.warning(
                    "Callback server did not report start-up within "
                    f"{self._CALLBACK_STARTUP_TIMEOUT} seconds"
                )
                return
            await asyncio.sleep(self._CALLBACK_POLL_INTERVAL)

        logger.info(f"回调服务器已启动在端口 {self.callback_port}")

    def get_callback_app(self) -> Optional[FastAPI]:
        """Return the FastAPI callback app, or None if it was never created."""
        return self.callback_app

    # ------------------------------------------------------------------
    # Server queries
    # ------------------------------------------------------------------
    async def check_server_status(self) -> bool:
        """Return True when the webhook server's ``/health`` answers HTTP 200.

        Returns False when the client has no session or URL, or when the
        request raises.
        """
        if not self.session or not self.config.webhook_url:
            return False

        try:
            async with self.session.get(self._url("health")) as response:
                return response.status == 200
        except Exception:
            return False

    async def get_registered_bots(self) -> Optional[list[Dict[str, Any]]]:
        """Fetch the list of bots registered with the webhook server.

        Returns:
            The ``data.bots`` list of the JSON response (an empty list when
            that key is absent), or None when the client has no session or
            URL, the server answers with a non-200 status, or the request
            raises.
        """
        if not self.session or not self.config.webhook_url:
            return None

        try:
            async with self.session.get(
                self._url("bots"),
                headers=self._auth_headers(),
            ) as response:
                if response.status != 200:
                    logger.warning(f"获取Bot列表失败: HTTP {response.status}")
                    return None
                data = await response.json()
                # Tolerate an explicit ``"data": null`` in the payload.
                return (data.get("data") or {}).get("bots", [])
        except Exception as e:
            logger.error(f"获取Bot列表异常: {e}")
            return None