"""
Webhook client.

Client used by a bot to connect to the Webhook server and process messages.
"""
| |
import asyncio
import hmac
import inspect
import logging
import os
import threading
from datetime import datetime
from typing import Optional, Callable, Dict, Any

import aiohttp
from fastapi import FastAPI, Request, HTTPException

from .models import (
    BotMessage, BotRegistration, WebhookResponse,
    MessageBroadcast, WebhookConfig
)
| |
| |
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
| |
| |
class WebhookClient:
    """Client a bot uses to register with a Webhook server and exchange messages.

    Outbound calls (register, unregister, broadcast, status queries) share one
    ``aiohttp.ClientSession``. When a ``message_handler`` is supplied and
    ``start()`` receives a ``callback_port``, a small FastAPI application is
    served by uvicorn on a background thread so the Webhook server can push
    messages to this bot.
    """

    # Longest time (seconds) to wait for the callback server to report startup.
    _CALLBACK_STARTUP_TIMEOUT = 5.0
    # Longest time (seconds) to wait for the callback server thread to exit.
    _CALLBACK_SHUTDOWN_TIMEOUT = 5.0
    # Interval (seconds) between startup checks.
    _CALLBACK_POLL_INTERVAL = 0.05

    def __init__(
        self,
        config: WebhookConfig,
        bot_username: str,
        subscribed_groups: list[int],
        message_handler: Optional[Callable[[BotMessage], Any]] = None
    ):
        """Store the configuration; no network activity happens here.

        Args:
            config: Webhook configuration (URL, auth token, timeouts).
            bot_username: Username this bot registers under.
            subscribed_groups: IDs of the groups the bot subscribes to.
            message_handler: Optional callback invoked for every received
                message. It may be a plain function or return an awaitable.
        """
        self.config = config
        self.bot_username = bot_username
        self.subscribed_groups = subscribed_groups
        self.message_handler = message_handler

        # Outbound HTTP session, created in start() and closed in stop().
        self.session: Optional[aiohttp.ClientSession] = None

        # Registration state with the Webhook server.
        self.is_registered = False
        self.registration_data: Optional[BotRegistration] = None

        # Callback server state (only used when messages must be received).
        self.callback_app: Optional[FastAPI] = None
        self.callback_port: Optional[int] = None
        self.callback_server_thread: Optional[threading.Thread] = None
        # uvicorn.Server handle, kept so stop() can ask it to exit.
        self._callback_server: Optional[Any] = None

    async def start(self, callback_port: Optional[int] = None) -> None:
        """Open the HTTP session, optionally start the callback server, and register.

        Args:
            callback_port: Port for the callback server. The server is only
                started when this is truthy and a ``message_handler`` was given.

        Raises:
            RuntimeError: If the callback server fails to start or the
                registration request is rejected.
            Exception: Any error raised while talking to the Webhook server.
        """
        # NOTE(review): ``connection_timeout`` feeds ``total`` and
        # ``request_timeout`` feeds ``connect``; the names suggest the reverse.
        # Confirm against WebhookConfig before changing.
        timeout = aiohttp.ClientTimeout(
            total=self.config.connection_timeout,
            connect=self.config.request_timeout
        )
        self.session = aiohttp.ClientSession(timeout=timeout)

        try:
            if callback_port and self.message_handler:
                self.callback_port = callback_port
                await self._setup_callback_server()

            await self._register()
        except BaseException:
            # Do not leak the session or the server thread when startup fails
            # (BaseException so cancellation is covered as well).
            await self._release_resources()
            raise

        logger.info(f"Webhook客户端 {self.bot_username} 启动成功")

    async def stop(self) -> None:
        """Unregister, stop the callback server, and close the HTTP session."""
        if self.is_registered:
            await self._unregister()

        await self._release_resources()

        logger.info(f"Webhook客户端 {self.bot_username} 已停止")

    async def broadcast_message(
        self,
        message: BotMessage,
        target_groups: Optional[list[int]] = None,
        exclude_bots: Optional[list[str]] = None
    ) -> bool:
        """Send ``message`` to the Webhook server for broadcasting.

        Args:
            message: The message to broadcast.
            target_groups: Optional group IDs passed through to the server.
            exclude_bots: Optional bot usernames passed through to the server;
                ``None`` is sent as an empty list.

        Returns:
            True when the server answers HTTP 200; False when the client is
            not started/registered, the server answers anything else, or the
            request raises.
        """
        if not self.session or not self.is_registered:
            logger.error("客户端未启动或未注册")
            return False

        try:
            broadcast = MessageBroadcast(
                bot_message=message,
                target_groups=target_groups,
                exclude_bots=exclude_bots or []
            )

            async with self.session.post(
                f"{self.config.webhook_url}/broadcast",
                json=broadcast.model_dump(mode='json'),
                headers=self._auth_headers()
            ) as response:
                if response.status == 200:
                    logger.debug(f"消息广播成功: {message.message_id}")
                    return True

                logger.error(f"消息广播失败: HTTP {response.status}")
                return False

        except Exception as e:
            # Best-effort API: failures are reported through the return value.
            logger.error(f"消息广播异常: {e}")
            return False

    def _auth_headers(self) -> Dict[str, str]:
        """Return the headers sent with every authenticated request."""
        return {
            "Authorization": f"Bearer {self.config.auth_token}",
            "Content-Type": "application/json"
        }

    def _token_matches(self, candidate: str) -> bool:
        """Compare a presented bearer token with the configured one in constant time."""
        expected = self.config.auth_token
        if not isinstance(expected, str):
            # A non-string token can never equal a header value.
            return False
        # Compare bytes: compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(
            candidate.encode("utf-8"), expected.encode("utf-8")
        )

    async def _register(self) -> None:
        """Register this bot with the Webhook server.

        Raises:
            RuntimeError: If the session is missing, no webhook URL is
                configured, or the server does not answer HTTP 200.
            Exception: Any error raised by the HTTP request (logged, re-raised).
        """
        if not self.session or not self.config.webhook_url:
            raise RuntimeError("Session not initialized or webhook URL not configured")

        try:
            webhook_endpoint = None
            if self.callback_port:
                # Host name comes from the environment so that Docker setups
                # can advertise the container name instead of localhost.
                hostname = os.environ.get('WEBHOOK_CALLBACK_HOST', 'localhost')
                webhook_endpoint = f"http://{hostname}:{self.callback_port}/webhook"

            registration = BotRegistration(
                bot_username=self.bot_username,
                auth_token=self.config.auth_token,
                subscribed_groups=self.subscribed_groups,
                webhook_endpoint=webhook_endpoint
            )

            async with self.session.post(
                f"{self.config.webhook_url}/register",
                json=registration.model_dump(mode='json'),
                headers=self._auth_headers()
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise RuntimeError(f"注册失败: HTTP {response.status}, {error_text}")

                self.is_registered = True
                self.registration_data = registration
                logger.info(f"Bot {self.bot_username} 注册成功")

        except Exception as e:
            logger.error(f"注册到Webhook服务器失败: {e}")
            raise

    async def _unregister(self) -> None:
        """Unregister from the Webhook server; local state is cleared even on failure."""
        if not self.session or not self.is_registered:
            return

        try:
            async with self.session.post(
                f"{self.config.webhook_url}/unregister/{self.bot_username}",
                headers=self._auth_headers()
            ) as response:
                if response.status == 200:
                    logger.info(f"Bot {self.bot_username} 注销成功")
                else:
                    logger.warning(f"注销失败: HTTP {response.status}")

        except Exception as e:
            # Shutdown path: log and carry on so stop() can finish cleanup.
            logger.error(f"注销失败: {e}")
        finally:
            self.is_registered = False
            self.registration_data = None

    def _build_callback_app(self) -> FastAPI:
        """Create the FastAPI application that receives pushed messages."""
        app = FastAPI(
            title=f"Bot {self.bot_username} Webhook Callback",
            description="接收来自Webhook服务器的消息"
        )

        @app.post("/webhook")
        async def receive_message(request: Request):
            """Authenticate the request, parse the message, and run the handler."""
            try:
                auth_header = request.headers.get("Authorization")
                if not auth_header or not auth_header.startswith("Bearer "):
                    raise HTTPException(status_code=401, detail="Missing or invalid authorization header")

                if not self._token_matches(auth_header.split(" ", 1)[1]):
                    raise HTTPException(status_code=401, detail="Invalid token")

                # A body that is not valid JSON is a client error, not a 500.
                try:
                    data = await request.json()
                except ValueError as e:
                    raise HTTPException(status_code=400, detail="Invalid JSON body") from e

                message_data = data.get("message") if isinstance(data, dict) else None
                if not message_data:
                    raise HTTPException(status_code=400, detail="Missing message data")

                # TypeError: payload is not a mapping / wrong arguments.
                # ValueError: model validation failure.
                try:
                    message = BotMessage(**message_data)
                except (TypeError, ValueError) as e:
                    raise HTTPException(status_code=400, detail=f"Invalid message data: {e}") from e

                if self.message_handler:
                    try:
                        # Await whatever awaitable the handler returns, so
                        # partials and callable objects work like coroutine
                        # functions do.
                        outcome = self.message_handler(message)
                        if inspect.isawaitable(outcome):
                            await outcome
                    except Exception as e:
                        # Handler failures are logged, never reported to the sender.
                        logger.error(f"消息处理器执行失败: {e}")

                return {"status": "success", "timestamp": datetime.now().isoformat()}

            except HTTPException as e:
                # Deliberate 4xx responses must keep their status code instead
                # of being converted to 500 by the generic handler below.
                logger.warning(f"拒绝Webhook请求: HTTP {e.status_code} {e.detail}")
                raise
            except Exception as e:
                logger.error(f"处理Webhook消息失败: {e}")
                raise HTTPException(status_code=500, detail=str(e)) from e

        @app.get("/health")
        async def health():
            """Liveness probe."""
            return {"status": "healthy", "bot": self.bot_username}

        return app

    async def _setup_callback_server(self) -> None:
        """Build the callback app and serve it with uvicorn on a daemon thread.

        Raises:
            RuntimeError: If the server thread exits before reporting startup
                (for example because the port is already in use).
        """
        # Imported lazily so uvicorn is only required when a callback server
        # is actually used.
        import uvicorn

        self.callback_app = self._build_callback_app()

        server = uvicorn.Server(uvicorn.Config(
            self.callback_app,
            host="0.0.0.0",  # listen on all interfaces so other containers can reach us
            port=self.callback_port,
            log_level="warning",  # keep log noise down
            access_log=False
        ))
        worker = threading.Thread(target=server.run, daemon=True)
        self._callback_server = server
        self.callback_server_thread = worker
        worker.start()

        # Poll the server's own "started" flag instead of sleeping a fixed
        # second and hoping the socket is bound by then.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self._CALLBACK_STARTUP_TIMEOUT
        while not server.started and worker.is_alive() and loop.time() < deadline:
            await asyncio.sleep(self._CALLBACK_POLL_INTERVAL)

        if server.started:
            logger.info(f"回调服务器已启动在端口 {self.callback_port}")
        elif not worker.is_alive():
            # Registering an endpoint nobody listens on would lose every message.
            raise RuntimeError(f"回调服务器启动失败: 端口 {self.callback_port}")
        else:
            logger.warning(f"回调服务器尚未确认启动: 端口 {self.callback_port}")

    async def _stop_callback_server(self) -> None:
        """Ask the callback server to exit and wait briefly for its thread."""
        server = self._callback_server
        worker = self.callback_server_thread
        if server is None:
            return

        self._callback_server = None
        self.callback_server_thread = None
        # Forget the port so a later start() without a port does not advertise
        # an endpoint that is no longer served.
        self.callback_port = None

        server.should_exit = True
        if worker is not None and worker.is_alive():
            # join() blocks, so run it off the event loop.
            await asyncio.to_thread(worker.join, self._CALLBACK_SHUTDOWN_TIMEOUT)
            if worker.is_alive():
                logger.warning("回调服务器线程未能及时退出")

    async def _release_resources(self) -> None:
        """Stop the callback server and close the HTTP session."""
        try:
            await self._stop_callback_server()
        finally:
            if self.session:
                await self.session.close()
                self.session = None

    def get_callback_app(self) -> Optional[FastAPI]:
        """Return the callback FastAPI app, or None if it was never created."""
        return self.callback_app

    async def check_server_status(self) -> bool:
        """Return True when the Webhook server's ``/health`` endpoint answers 200."""
        if not self.session or not self.config.webhook_url:
            return False

        try:
            async with self.session.get(f"{self.config.webhook_url}/health") as response:
                return response.status == 200
        except Exception:
            # Any transport error simply means "not healthy".
            return False

    async def get_registered_bots(self) -> Optional[list[Dict[str, Any]]]:
        """Fetch the list of bots registered with the Webhook server.

        Returns:
            The ``data.bots`` list from the server response (empty list when
            the keys are absent), or None when the client is not started, the
            server does not answer 200, or the request fails.
        """
        if not self.session or not self.config.webhook_url:
            return None

        try:
            async with self.session.get(
                f"{self.config.webhook_url}/bots",
                headers=self._auth_headers()
            ) as response:
                if response.status != 200:
                    logger.warning(f"获取Bot列表失败: HTTP {response.status}")
                    return None

                data = await response.json()
                return data.get("data", {}).get("bots", [])

        except Exception as e:
            logger.error(f"获取Bot列表异常: {e}")
            return None