blob: abf3aec0f0d5a8a9b95a8aeb8929e4c2c929ed6f [file] [log] [blame] [raw]
"""
Bot Webhook集成模块
将Webhook功能集成到现有的Telegram Bot中
"""
import asyncio
import logging
from typing import Optional, Dict, Any
from datetime import datetime
from .client import WebhookClient
from .models import BotMessage, UserInfo, ReplyInfo, WebhookConfig
from ..telegram.interfaces import ITelegramClient
logger = logging.getLogger(__name__)
class BotWebhookIntegration:
"""Bot Webhook集成类"""
def __init__(
self,
telegram_client: ITelegramClient,
webhook_config: WebhookConfig,
bot_username: str,
subscribed_groups: list[int]
):
"""
初始化Webhook集成
Args:
telegram_client: Telegram客户端
webhook_config: Webhook配置
bot_username: Bot用户名
subscribed_groups: 订阅的群组ID列表
"""
self.telegram_client = telegram_client
self.webhook_config = webhook_config
self.bot_username = bot_username
self.subscribed_groups = subscribed_groups
# Webhook客户端
self.webhook_client: Optional[WebhookClient] = None
# 消息处理回调
self.message_callback = self._handle_webhook_message
# 启动状态
self.is_running = False
async def start(self, callback_port: Optional[int] = None):
"""启动Webhook集成"""
try:
# 创建Webhook客户端
self.webhook_client = WebhookClient(
config=self.webhook_config,
bot_username=self.bot_username,
subscribed_groups=self.subscribed_groups,
message_handler=self.message_callback
)
# 启动客户端
await self.webhook_client.start(callback_port)
self.is_running = True
logger.info(f"Bot Webhook集成启动成功: {self.bot_username}")
except Exception as e:
logger.error(f"启动Webhook集成失败: {e}")
raise
async def stop(self):
"""停止Webhook集成"""
try:
if self.webhook_client:
await self.webhook_client.stop()
self.webhook_client = None
self.is_running = False
logger.info(f"Bot Webhook集成已停止: {self.bot_username}")
except Exception as e:
logger.error(f"停止Webhook集成失败: {e}")
async def broadcast_bot_message(
self,
group_id: int,
message_content: str,
sender_info: Dict[str, Any],
reply_info: Optional[Dict[str, Any]] = None,
telegram_message_id: Optional[int] = None,
message_type: str = "text"
) -> bool:
"""
广播Bot消息到Webhook服务器
Args:
group_id: 群组ID
message_content: 消息内容
sender_info: 发送者信息
reply_info: 回复信息(可选)
telegram_message_id: Telegram消息ID(可选)
message_type: 消息类型
Returns:
是否广播成功
"""
if not self.webhook_client or not self.is_running:
logger.warning("Webhook集成未启动,无法广播消息")
return False
try:
# 构建用户信息
user_info = UserInfo(
user_id=sender_info.get('user_id', 0),
username=sender_info.get('username'),
first_name=sender_info.get('first_name'),
last_name=sender_info.get('last_name'),
is_bot=sender_info.get('is_bot', True)
)
# 构建回复信息
webhook_reply_info = None
if reply_info:
reply_user_info = reply_info.get('user_info', {})
webhook_reply_info = ReplyInfo(
user_info=UserInfo(
user_id=reply_user_info.get('user_id', 0),
username=reply_user_info.get('username'),
first_name=reply_user_info.get('first_name'),
last_name=reply_user_info.get('last_name'),
is_bot=reply_user_info.get('is_bot', False)
),
timestamp=reply_info.get('timestamp'),
content=reply_info.get('content', ''),
message_id=reply_info.get('message_id')
)
# 构建消息对象
bot_message = BotMessage(
bot_username=self.bot_username,
group_id=group_id,
message_content=message_content,
message_type=message_type,
sender_info=user_info,
reply_info=webhook_reply_info,
telegram_message_id=telegram_message_id,
timestamp=datetime.now()
)
# 广播消息
success = await self.webhook_client.broadcast_message(bot_message)
if success:
logger.debug(f"Bot消息广播成功: {bot_message.message_id}")
else:
logger.warning(f"Bot消息广播失败: {bot_message.message_id}")
return success
except Exception as e:
logger.error(f"广播Bot消息异常: {e}")
return False
async def _handle_webhook_message(self, message: BotMessage):
"""
处理来自Webhook的消息
这个方法会被调用当其他Bot发送消息时,
将消息转换为适合当前Bot处理的格式
"""
try:
logger.debug(f"收到Webhook消息: {message.bot_username} -> 群组 {message.group_id}")
# 检查是否是我们订阅的群组
if message.group_id not in self.subscribed_groups:
logger.debug(f"忽略非订阅群组的消息: {message.group_id}")
return
# 检查是否是我们自己发送的消息(双重保险)
if message.bot_username == self.bot_username:
logger.debug("忽略自己发送的消息")
return
# 转换消息格式并保存到上下文
await self._process_bot_message_to_context(message)
logger.info(f"成功处理来自 {message.bot_username} 的Webhook消息")
except Exception as e:
logger.error(f"处理Webhook消息失败: {e}")
async def _process_bot_message_to_context(self, message: BotMessage):
"""
将Webhook消息处理为上下文消息
这个方法需要与现有的上下文管理系统集成,
将其他Bot的消息作为"虚拟"消息添加到对话历史中
"""
try:
# 这里需要根据实际的上下文管理器接口来实现
# 由于我们有多种实现(ContextManager和Claude Agent),
# 这里提供一个通用的处理框架
chat_id = str(message.group_id)
# 构建用户显示名
sender_info = message.sender_info
if sender_info.is_bot:
user_display = f"🤖 {message.bot_username}"
elif sender_info.username:
user_display = f"@{sender_info.username}"
elif sender_info.first_name:
display_name = sender_info.first_name
if sender_info.last_name:
display_name += f" {sender_info.last_name}"
user_display = display_name
else:
user_display = f"用户{sender_info.user_id}"
# 构建时间戳
time_str = message.timestamp.strftime('%H:%M')
# 构建回复上下文
reply_context = ""
if message.reply_info:
reply_info = message.reply_info
reply_user_info = reply_info.user_info
if reply_user_info.is_bot:
reply_author = "🤖 Bot"
elif reply_user_info.username:
reply_author = f"@{reply_user_info.username}"
elif reply_user_info.first_name:
reply_author = reply_user_info.first_name
if reply_user_info.last_name:
reply_author += f" {reply_user_info.last_name}"
else:
reply_author = f"用户{reply_user_info.user_id}"
reply_time = ""
if reply_info.timestamp:
reply_time = f" {reply_info.timestamp.strftime('%H:%M')}"
reply_context = f"↳ 回复 [{reply_author}{reply_time}]: {reply_info.content}\n"
# 构建完整消息内容
full_message = f"{reply_context}[{user_display} {time_str}] {message.message_content}"
# 构建用户信息字典
user_info_dict = {
'user_id': sender_info.user_id,
'username': sender_info.username,
'first_name': sender_info.first_name,
'last_name': sender_info.last_name,
'is_bot': sender_info.is_bot,
'bot_username': message.bot_username, # 标识这是来自其他Bot的消息
'source': 'webhook', # 标识消息来源
'chat_id': message.group_id,
'chat_type': 'group'
}
# 将回复信息添加到user_info中
if message.reply_info:
user_info_dict['reply_info'] = {
'user_info': {
'user_id': message.reply_info.user_info.user_id,
'username': message.reply_info.user_info.username,
'first_name': message.reply_info.user_info.first_name,
'last_name': message.reply_info.user_info.last_name,
'is_bot': message.reply_info.user_info.is_bot,
},
'timestamp': message.reply_info.timestamp,
'content': message.reply_info.content,
'message_id': message.reply_info.message_id
}
# 这里需要调用实际的消息保存方法
# 这个方法应该由使用此集成的组件来提供
await self._save_webhook_message_to_context(
chat_id,
sender_info.user_id,
full_message,
user_info_dict
)
logger.debug(f"Webhook消息已保存到上下文: {chat_id}")
except Exception as e:
logger.error(f"处理Webhook消息到上下文失败: {e}")
async def _save_webhook_message_to_context(
self,
chat_id: str,
user_id: int,
message: str,
user_info: Dict[str, Any]
):
"""
保存Webhook消息到上下文
这个方法需要由具体的集成实现来覆盖,
以调用实际的上下文管理器
"""
# 这是一个占位符方法,具体实现应该在子类或通过依赖注入提供
logger.debug(f"保存Webhook消息到上下文的占位符实现: {chat_id}, {user_id}, {message[:50]}...")
def get_status(self) -> Dict[str, Any]:
"""获取Webhook集成状态"""
return {
"is_running": self.is_running,
"bot_username": self.bot_username,
"subscribed_groups": self.subscribed_groups,
"webhook_url": self.webhook_config.webhook_url,
"webhook_client_registered": self.webhook_client.is_registered if self.webhook_client else False
}
async def check_webhook_server(self) -> bool:
"""检查Webhook服务器状态"""
if not self.webhook_client:
return False
return await self.webhook_client.check_server_status()
async def get_registered_bots(self) -> Optional[list[Dict[str, Any]]]:
"""获取已注册的Bot列表"""
if not self.webhook_client:
return None
return await self.webhook_client.get_registered_bots()