blob: f49c5fb3c9ffa0f0268615859871da638bc6ec59 [file] [log] [blame] [raw]
"""
Telegram Bot主类
整合所有组件,提供完整的Telegram Bot功能
"""
import asyncio
import logging
import signal
import sys
from typing import Dict, Any, Optional
from telegram import Update
from telegram.ext import Application, MessageHandler as TGMessageHandler, filters, ContextTypes
from .interfaces import IContextManager, IFileHandler, IClaudeAgent, IStreamMessageSender
from .context_manager import ContextManager
from .file_handler import FileHandler
from .claude_adapter import ClaudeAgentAdapter
from .stream_sender import StreamMessageSender
from .message_handler import MessageHandler
from .client_adapter import TelegramClientAdapter
from ..utils.config import get_config_manager
logger = logging.getLogger(__name__)
class TelegramBot:
"""Telegram Bot主类"""
def __init__(self, config_name: str = "local"):
"""
初始化Telegram Bot
Args:
config_name: 配置文件名
"""
self.config_name = config_name
self.config_manager = get_config_manager(config_name)
self.telegram_config = self.config_manager.get_telegram_config()
# 验证配置
self._validate_config()
# 初始化组件
self.application: Optional[Application] = None
self.context_manager: Optional[IContextManager] = None
self.file_handler: Optional[IFileHandler] = None
self.claude_agent: Optional[IClaudeAgent] = None
self.stream_sender: Optional[IStreamMessageSender] = None
self.message_handler: Optional[MessageHandler] = None
self.telegram_client: Optional[TelegramClientAdapter] = None
# Webhook集成
self.webhook_integration: Optional[Any] = None # Use Any to avoid circular import
# 状态管理
self.is_running = False
self.shutdown_event = asyncio.Event()
# Webhook消息队列(用于跨线程通信)
self.webhook_message_queue = asyncio.Queue()
self.main_event_loop = None
self.webhook_queue_task = None
def _validate_config(self):
"""验证配置"""
required_keys = ['bot_token', 'allowed_users', 'allowed_groups']
for key in required_keys:
if key not in self.telegram_config:
raise ValueError(f"缺少必需的配置项: telegram.{key}")
if not self.telegram_config['bot_token'] or self.telegram_config['bot_token'] == "YOUR_BOT_TOKEN_HERE":
raise ValueError("请配置有效的Telegram Bot Token")
async def initialize(self):
"""初始化所有组件"""
try:
logger.info("初始化Telegram Bot组件...")
# 创建Application
self.application = Application.builder().token(self.telegram_config['bot_token']).build()
# 创建Telegram客户端适配器
self.telegram_client = TelegramClientAdapter(self.application.bot)
# 获取Bot信息用于实例隔离
bot_info = await self.application.bot.get_me()
bot_id = str(bot_info.id)
logger.info(f"Bot信息: {bot_info.username} (ID: {bot_id})")
# 设置主事件循环引用
self.main_event_loop = asyncio.get_running_loop()
# 创建上下文管理器(保持简单,不使用持久化)
context_limit = self.telegram_config.get('message', {}).get('context_history_limit', 50)
self.context_manager = ContextManager(
max_history_per_chat=context_limit,
storage_dir=None, # 不使用context_manager持久化
bot_id=bot_id
)
# 创建文件处理器
file_config = self.telegram_config.get('files', {})
temp_dir = file_config.get('temp_dir', 'temp/telegram')
max_file_size = file_config.get('max_file_size_mb', 20)
self.file_handler = FileHandler(temp_dir, max_file_size)
# 创建Claude Agent适配器(传递bot_id用于多实例隔离)
self.claude_agent = ClaudeAgentAdapter(bot_id=bot_id)
# 创建流式消息发送器
update_interval = self.telegram_config.get('message', {}).get('stream_update_interval', 1.0)
self.stream_sender = StreamMessageSender(self.telegram_client, update_interval)
# 创建消息处理器(传递完整的Telegram配置和Webhook回调)
self.message_handler = MessageHandler(
telegram_client=self.telegram_client,
context_manager=self.context_manager,
file_handler=self.file_handler,
claude_agent=self.claude_agent,
stream_sender=self.stream_sender,
allowed_users=self.telegram_config['allowed_users'],
allowed_groups=self.telegram_config['allowed_groups'],
telegram_config=self.telegram_config,
webhook_broadcast_callback=self.broadcast_bot_message
)
# 设置Bot用户名用于@mention检测
if hasattr(self.message_handler, 'set_bot_username'):
self.message_handler.set_bot_username(bot_info.username)
else:
self.message_handler.bot_username = bot_info.username
# 注册消息处理器
self._register_handlers()
# 初始化Webhook集成(如果启用)
await self._initialize_webhook_integration(bot_info.username)
# 启动Webhook消息队列处理器
self.webhook_queue_task = asyncio.create_task(self._process_webhook_message_queue())
logger.info("✅ Webhook消息队列处理器已创建并启动")
logger.info("Telegram Bot初始化完成")
except Exception as e:
logger.error(f"Telegram Bot初始化失败: {e}")
raise
def _register_handlers(self):
"""注册消息处理器"""
if not self.application or not self.message_handler:
raise RuntimeError("Application或MessageHandler未初始化")
# 文本消息处理器 - 支持@mentions
text_handler = TGMessageHandler(
filters.TEXT, # 允许包含@mentions的文本消息
self.message_handler.handle_text_message
)
self.application.add_handler(text_handler)
logger.debug("文本消息处理器已注册,使用filter: filters.TEXT")
# 图片消息处理器
photo_handler = TGMessageHandler(
filters.PHOTO,
self.message_handler.handle_photo_message
)
self.application.add_handler(photo_handler)
# 文档消息处理器
document_handler = TGMessageHandler(
filters.Document.ALL,
self.message_handler.handle_document_message
)
self.application.add_handler(document_handler)
# 错误处理器
self.application.add_error_handler(self._error_handler)
logger.info("消息处理器注册完成")
async def _error_handler(self, update: Optional[Update], context: ContextTypes.DEFAULT_TYPE):
"""错误处理器"""
logger.error(f"处理更新时出错: {context.error}")
if update and update.effective_chat:
try:
await self.telegram_client.send_message(
chat_id=update.effective_chat.id,
text="❌ 处理消息时出现内部错误,已记录此问题。"
)
except Exception as e:
logger.error(f"发送错误消息失败: {e}")
async def start(self):
"""启动Bot"""
try:
if self.is_running:
logger.warning("Bot已经在运行中")
return
logger.info("启动Telegram Bot...")
# 初始化组件
await self.initialize()
# 设置信号处理器
self._setup_signal_handlers()
# 启动应用
if self.application:
await self.application.initialize()
await self.application.start()
await self.application.updater.start_polling(drop_pending_updates=True)
self.is_running = True
logger.info("Telegram Bot启动成功,开始接收消息...")
# 发送启动通知
await self._send_startup_notification()
# 等待关闭信号
await self.shutdown_event.wait()
except Exception as e:
logger.error(f"启动Bot失败: {e}")
raise
finally:
await self.stop()
async def stop(self):
"""停止Bot"""
try:
if not self.is_running:
return
logger.info("正在停止Telegram Bot...")
# 发送关闭通知
await self._send_shutdown_notification()
# 停止应用
if self.application:
await self.application.updater.stop()
await self.application.stop()
await self.application.shutdown()
# 停止Webhook集成
await self._stop_webhook_integration()
# 取消Webhook队列处理器
if self.webhook_queue_task:
self.webhook_queue_task.cancel()
try:
await self.webhook_queue_task
except asyncio.CancelledError:
pass
# 清理资源
if self.file_handler:
await self.file_handler.cleanup_temp_files()
if self.context_manager:
self.context_manager.cleanup_old_chats()
self.is_running = False
logger.info("Telegram Bot已停止")
except Exception as e:
logger.error(f"停止Bot时出错: {e}")
def _setup_signal_handlers(self):
"""设置信号处理器"""
def signal_handler(signum, frame):
logger.info(f"收到信号 {signum},准备关闭...")
self.shutdown_event.set()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
async def _send_startup_notification(self):
"""发送启动通知"""
try:
message = "🤖 Claude Telegram Bot 已启动\n\n✅ 系统状态: 正常运行\n📝 配置: 已加载\n🔄 准备接收消息..."
# 发送给所有允许的用户
for user_id in self.telegram_config['allowed_users']:
try:
await self.telegram_client.send_message(chat_id=user_id, text=message)
except Exception as e:
logger.warning(f"发送启动通知给用户{user_id}失败: {e}")
except Exception as e:
logger.error(f"发送启动通知失败: {e}")
async def _send_shutdown_notification(self):
"""发送关闭通知"""
try:
message = "🤖 Claude Telegram Bot 正在关闭\n\n⏹️ 系统状态: 准备停止\n💾 数据已保存\n👋 再见!"
# 发送给所有允许的用户
for user_id in self.telegram_config['allowed_users']:
try:
await self.telegram_client.send_message(chat_id=user_id, text=message)
except Exception as e:
logger.warning(f"发送关闭通知给用户{user_id}失败: {e}")
except Exception as e:
logger.error(f"发送关闭通知失败: {e}")
def get_stats(self) -> Dict[str, Any]:
"""获取Bot统计信息"""
stats = {
'is_running': self.is_running,
'config_name': self.config_name,
'allowed_users_count': len(self.telegram_config['allowed_users']),
'allowed_groups_count': len(self.telegram_config['allowed_groups'])
}
if self.context_manager:
stats['active_chats'] = self.context_manager.get_chat_count()
return stats
async def send_admin_message(self, message: str):
"""发送管理员消息"""
try:
for user_id in self.telegram_config['allowed_users']:
try:
await self.telegram_client.send_message(chat_id=user_id, text=f"📢 管理员消息:\n{message}")
except Exception as e:
logger.warning(f"发送管理员消息给用户{user_id}失败: {e}")
except Exception as e:
logger.error(f"发送管理员消息失败: {e}")
# 依赖注入方法,便于测试
def set_context_manager(self, context_manager: IContextManager):
"""设置上下文管理器(用于测试)"""
self.context_manager = context_manager
def set_file_handler(self, file_handler: IFileHandler):
"""设置文件处理器(用于测试)"""
self.file_handler = file_handler
def set_claude_agent(self, claude_agent: IClaudeAgent):
"""设置Claude Agent(用于测试)"""
self.claude_agent = claude_agent
def set_stream_sender(self, stream_sender: IStreamMessageSender):
"""设置流式发送器(用于测试)"""
self.stream_sender = stream_sender
async def _initialize_webhook_integration(self, bot_username: str):
"""初始化Webhook集成"""
try:
# 获取Webhook配置
webhook_config = self.config_manager.get_webhook_config()
# 延迟导入避免循环依赖
from ..webhook.telegram_integration import create_webhook_integration_from_config
# 创建Webhook集成
self.webhook_integration = create_webhook_integration_from_config(
telegram_client=self.telegram_client,
webhook_config_dict=webhook_config,
bot_username=bot_username,
message_saver=self._save_webhook_message_callback
)
# 启动Webhook集成
if self.webhook_integration:
callback_port = webhook_config.get('client', {}).get('callback_port', 8081)
await self.webhook_integration.start(callback_port)
logger.info(f"Webhook集成已启动: {bot_username}")
else:
logger.info("Webhook集成未启用或配置不完整")
except Exception as e:
logger.error(f"初始化Webhook集成失败: {e}")
# Webhook失败不应影响Bot正常运行
self.webhook_integration = None
async def _process_webhook_message_queue(self):
"""处理Webhook消息队列"""
logger.error("🚨 Webhook消息队列处理器已启动 - 使用ERROR级别确保可见")
while True:
try:
# 等待队列中的消息,超时1秒
message_data = await asyncio.wait_for(
self.webhook_message_queue.get(),
timeout=1.0
)
logger.error(f"🚨 从队列中获取到Webhook消息 - 使用ERROR级别: {message_data['message'][:50]}...")
# 在主事件循环中处理消息
await self._process_webhook_message_from_queue(message_data)
except asyncio.TimeoutError:
# 超时是正常的,继续循环
continue
except asyncio.CancelledError:
# 任务被取消,退出循环
logger.info("Webhook消息队列处理器被取消")
break
except Exception as e:
logger.error(f"处理Webhook消息队列失败: {e}")
logger.info("Webhook消息队列处理器已停止")
async def _process_webhook_message_from_queue(self, message_data):
"""从队列处理Webhook消息"""
try:
chat_id = message_data['chat_id']
user_id = message_data['user_id']
message = message_data['message']
is_bot = message_data['is_bot']
user_info = message_data['user_info']
if not self.message_handler:
logger.warning("MessageHandler未初始化,无法处理Webhook消息")
return
logger.info(f"在主事件循环中处理Webhook消息: {message[:50]}...")
# 在主事件循环中调用处理器
await self.message_handler.handle_webhook_message(
chat_id=int(chat_id),
user_id=user_id,
message_text=message,
user_info=user_info,
chat_type='supergroup',
is_bot=is_bot
)
logger.debug(f"Webhook消息已在主事件循环中处理完成: {chat_id}")
except Exception as e:
logger.error(f"在主事件循环中处理Webhook消息失败: {e}")
async def _save_webhook_message_callback(
self,
chat_id: str,
user_id: int,
message: str,
is_bot: bool,
user_info: dict
):
"""
保存Webhook消息的回调函数 - 使用队列跨线程通信
将消息放入队列,由主事件循环处理
"""
try:
# 构建标准的用户信息格式
user_info_formatted = {
'user_id': user_id,
'chat_id': int(chat_id), # 添加缺少的chat_id字段
'first_name': user_info.get('first_name', f'WebhookUser_{user_id}'),
'last_name': user_info.get('last_name'),
'username': user_info.get('username'),
'display_name': user_info.get('display_name', f'WebhookUser_{user_id}'),
'chat_type': 'supergroup' # 假设是群组
}
# 准备队列消息数据
message_data = {
'chat_id': chat_id,
'user_id': user_id,
'message': message,
'is_bot': is_bot,
'user_info': user_info_formatted
}
# 将消息放入队列(使用主事件循环)
if self.main_event_loop:
future = asyncio.run_coroutine_threadsafe(
self.webhook_message_queue.put(message_data),
self.main_event_loop
)
# 等待放入队列完成
future.result(timeout=5.0)
logger.info(f"✅ Webhook消息已成功放入队列: {message[:50]}...")
else:
logger.error("主事件循环引用未设置,无法处理Webhook消息")
except Exception as e:
logger.error(f"Webhook消息队列处理失败: {e}")
# 降级到简单保存
if self.claude_agent:
try:
# 同步调用保存方法(如果支持)
pass # 这里不再调用异步方法,避免事件循环问题
except Exception as e2:
logger.error(f"降级保存也失败: {e2}")
async def broadcast_bot_message(
self,
group_id: int,
message_content: str,
sender_info: dict,
reply_info: dict = None,
telegram_message_id: int = None,
message_type: str = "text"
) -> bool:
"""
广播Bot消息到Webhook服务器
这个方法可以被消息处理器调用,用于广播Bot的回复消息
"""
if not self.webhook_integration:
return False
try:
return await self.webhook_integration.broadcast_bot_message(
group_id=group_id,
message_content=message_content,
sender_info=sender_info,
reply_info=reply_info,
telegram_message_id=telegram_message_id,
message_type=message_type
)
except Exception as e:
logger.error(f"广播Bot消息失败: {e}")
return False
async def _stop_webhook_integration(self):
"""停止Webhook集成"""
if self.webhook_integration:
try:
await self.webhook_integration.stop()
logger.info("Webhook集成已停止")
except Exception as e:
logger.error(f"停止Webhook集成失败: {e}")
finally:
self.webhook_integration = None