| """ |
| Telegram Bot主类 |
| 整合所有组件,提供完整的Telegram Bot功能 |
| """ |
| |
| import asyncio |
| import logging |
| import signal |
| import sys |
| from typing import Dict, Any, Optional |
| from telegram import Update |
| from telegram.ext import Application, MessageHandler as TGMessageHandler, filters, ContextTypes |
| |
| from .interfaces import IContextManager, IFileHandler, IClaudeAgent, IStreamMessageSender |
| from .context_manager import ContextManager |
| from .file_handler import FileHandler |
| from .claude_adapter import ClaudeAgentAdapter |
| from .stream_sender import StreamMessageSender |
| from .message_handler import MessageHandler |
| from .client_adapter import TelegramClientAdapter |
| from ..utils.config import get_config_manager |
| |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| class TelegramBot: |
| """Telegram Bot主类""" |
| |
| def __init__(self, config_name: str = "local"): |
| """ |
| 初始化Telegram Bot |
| |
| Args: |
| config_name: 配置文件名 |
| """ |
| self.config_name = config_name |
| self.config_manager = get_config_manager(config_name) |
| self.telegram_config = self.config_manager.get_telegram_config() |
| |
| # 验证配置 |
| self._validate_config() |
| |
| # 初始化组件 |
| self.application: Optional[Application] = None |
| self.context_manager: Optional[IContextManager] = None |
| self.file_handler: Optional[IFileHandler] = None |
| self.claude_agent: Optional[IClaudeAgent] = None |
| self.stream_sender: Optional[IStreamMessageSender] = None |
| self.message_handler: Optional[MessageHandler] = None |
| self.telegram_client: Optional[TelegramClientAdapter] = None |
| |
| # Webhook集成 |
| self.webhook_integration: Optional[Any] = None # Use Any to avoid circular import |
| |
| # 状态管理 |
| self.is_running = False |
| self.shutdown_event = asyncio.Event() |
| |
| # Webhook消息队列(用于跨线程通信) |
| self.webhook_message_queue = asyncio.Queue() |
| self.main_event_loop = None |
| self.webhook_queue_task = None |
| |
| def _validate_config(self): |
| """验证配置""" |
| required_keys = ['bot_token', 'allowed_users', 'allowed_groups'] |
| for key in required_keys: |
| if key not in self.telegram_config: |
| raise ValueError(f"缺少必需的配置项: telegram.{key}") |
| |
| if not self.telegram_config['bot_token'] or self.telegram_config['bot_token'] == "YOUR_BOT_TOKEN_HERE": |
| raise ValueError("请配置有效的Telegram Bot Token") |
| |
| async def initialize(self): |
| """初始化所有组件""" |
| try: |
| logger.info("初始化Telegram Bot组件...") |
| |
| # 创建Application |
| self.application = Application.builder().token(self.telegram_config['bot_token']).build() |
| |
| # 创建Telegram客户端适配器 |
| self.telegram_client = TelegramClientAdapter(self.application.bot) |
| |
| # 获取Bot信息用于实例隔离 |
| bot_info = await self.application.bot.get_me() |
| bot_id = str(bot_info.id) |
| logger.info(f"Bot信息: {bot_info.username} (ID: {bot_id})") |
| |
| # 设置主事件循环引用 |
| self.main_event_loop = asyncio.get_running_loop() |
| |
| # 创建上下文管理器(保持简单,不使用持久化) |
| context_limit = self.telegram_config.get('message', {}).get('context_history_limit', 50) |
| self.context_manager = ContextManager( |
| max_history_per_chat=context_limit, |
| storage_dir=None, # 不使用context_manager持久化 |
| bot_id=bot_id |
| ) |
| |
| # 创建文件处理器 |
| file_config = self.telegram_config.get('files', {}) |
| temp_dir = file_config.get('temp_dir', 'temp/telegram') |
| max_file_size = file_config.get('max_file_size_mb', 20) |
| self.file_handler = FileHandler(temp_dir, max_file_size) |
| |
| # 创建Claude Agent适配器(传递bot_id用于多实例隔离) |
| self.claude_agent = ClaudeAgentAdapter(bot_id=bot_id) |
| |
| # 创建流式消息发送器 |
| update_interval = self.telegram_config.get('message', {}).get('stream_update_interval', 1.0) |
| self.stream_sender = StreamMessageSender(self.telegram_client, update_interval) |
| |
| # 创建消息处理器(传递完整的Telegram配置和Webhook回调) |
| self.message_handler = MessageHandler( |
| telegram_client=self.telegram_client, |
| context_manager=self.context_manager, |
| file_handler=self.file_handler, |
| claude_agent=self.claude_agent, |
| stream_sender=self.stream_sender, |
| allowed_users=self.telegram_config['allowed_users'], |
| allowed_groups=self.telegram_config['allowed_groups'], |
| telegram_config=self.telegram_config, |
| webhook_broadcast_callback=self.broadcast_bot_message |
| ) |
| |
| # 设置Bot用户名用于@mention检测 |
| if hasattr(self.message_handler, 'set_bot_username'): |
| self.message_handler.set_bot_username(bot_info.username) |
| else: |
| self.message_handler.bot_username = bot_info.username |
| |
| # 注册消息处理器 |
| self._register_handlers() |
| |
| # 初始化Webhook集成(如果启用) |
| await self._initialize_webhook_integration(bot_info.username) |
| |
| # 启动Webhook消息队列处理器 |
| self.webhook_queue_task = asyncio.create_task(self._process_webhook_message_queue()) |
| logger.info("✅ Webhook消息队列处理器已创建并启动") |
| |
| logger.info("Telegram Bot初始化完成") |
| |
| except Exception as e: |
| logger.error(f"Telegram Bot初始化失败: {e}") |
| raise |
| |
| def _register_handlers(self): |
| """注册消息处理器""" |
| if not self.application or not self.message_handler: |
| raise RuntimeError("Application或MessageHandler未初始化") |
| |
| # 文本消息处理器 - 支持@mentions |
| text_handler = TGMessageHandler( |
| filters.TEXT, # 允许包含@mentions的文本消息 |
| self.message_handler.handle_text_message |
| ) |
| self.application.add_handler(text_handler) |
| |
| logger.debug("文本消息处理器已注册,使用filter: filters.TEXT") |
| |
| # 图片消息处理器 |
| photo_handler = TGMessageHandler( |
| filters.PHOTO, |
| self.message_handler.handle_photo_message |
| ) |
| self.application.add_handler(photo_handler) |
| |
| # 文档消息处理器 |
| document_handler = TGMessageHandler( |
| filters.Document.ALL, |
| self.message_handler.handle_document_message |
| ) |
| self.application.add_handler(document_handler) |
| |
| # 错误处理器 |
| self.application.add_error_handler(self._error_handler) |
| |
| logger.info("消息处理器注册完成") |
| |
| async def _error_handler(self, update: Optional[Update], context: ContextTypes.DEFAULT_TYPE): |
| """错误处理器""" |
| logger.error(f"处理更新时出错: {context.error}") |
| |
| if update and update.effective_chat: |
| try: |
| await self.telegram_client.send_message( |
| chat_id=update.effective_chat.id, |
| text="❌ 处理消息时出现内部错误,已记录此问题。" |
| ) |
| except Exception as e: |
| logger.error(f"发送错误消息失败: {e}") |
| |
| async def start(self): |
| """启动Bot""" |
| try: |
| if self.is_running: |
| logger.warning("Bot已经在运行中") |
| return |
| |
| logger.info("启动Telegram Bot...") |
| |
| # 初始化组件 |
| await self.initialize() |
| |
| # 设置信号处理器 |
| self._setup_signal_handlers() |
| |
| # 启动应用 |
| if self.application: |
| await self.application.initialize() |
| await self.application.start() |
| await self.application.updater.start_polling(drop_pending_updates=True) |
| |
| self.is_running = True |
| logger.info("Telegram Bot启动成功,开始接收消息...") |
| |
| # 发送启动通知 |
| await self._send_startup_notification() |
| |
| # 等待关闭信号 |
| await self.shutdown_event.wait() |
| |
| except Exception as e: |
| logger.error(f"启动Bot失败: {e}") |
| raise |
| finally: |
| await self.stop() |
| |
| async def stop(self): |
| """停止Bot""" |
| try: |
| if not self.is_running: |
| return |
| |
| logger.info("正在停止Telegram Bot...") |
| |
| # 发送关闭通知 |
| await self._send_shutdown_notification() |
| |
| # 停止应用 |
| if self.application: |
| await self.application.updater.stop() |
| await self.application.stop() |
| await self.application.shutdown() |
| |
| # 停止Webhook集成 |
| await self._stop_webhook_integration() |
| |
| # 取消Webhook队列处理器 |
| if self.webhook_queue_task: |
| self.webhook_queue_task.cancel() |
| try: |
| await self.webhook_queue_task |
| except asyncio.CancelledError: |
| pass |
| |
| # 清理资源 |
| if self.file_handler: |
| await self.file_handler.cleanup_temp_files() |
| |
| if self.context_manager: |
| self.context_manager.cleanup_old_chats() |
| |
| self.is_running = False |
| logger.info("Telegram Bot已停止") |
| |
| except Exception as e: |
| logger.error(f"停止Bot时出错: {e}") |
| |
| def _setup_signal_handlers(self): |
| """设置信号处理器""" |
| def signal_handler(signum, frame): |
| logger.info(f"收到信号 {signum},准备关闭...") |
| self.shutdown_event.set() |
| |
| signal.signal(signal.SIGINT, signal_handler) |
| signal.signal(signal.SIGTERM, signal_handler) |
| |
| async def _send_startup_notification(self): |
| """发送启动通知""" |
| try: |
| message = "🤖 Claude Telegram Bot 已启动\n\n✅ 系统状态: 正常运行\n📝 配置: 已加载\n🔄 准备接收消息..." |
| |
| # 发送给所有允许的用户 |
| for user_id in self.telegram_config['allowed_users']: |
| try: |
| await self.telegram_client.send_message(chat_id=user_id, text=message) |
| except Exception as e: |
| logger.warning(f"发送启动通知给用户{user_id}失败: {e}") |
| |
| except Exception as e: |
| logger.error(f"发送启动通知失败: {e}") |
| |
| async def _send_shutdown_notification(self): |
| """发送关闭通知""" |
| try: |
| message = "🤖 Claude Telegram Bot 正在关闭\n\n⏹️ 系统状态: 准备停止\n💾 数据已保存\n👋 再见!" |
| |
| # 发送给所有允许的用户 |
| for user_id in self.telegram_config['allowed_users']: |
| try: |
| await self.telegram_client.send_message(chat_id=user_id, text=message) |
| except Exception as e: |
| logger.warning(f"发送关闭通知给用户{user_id}失败: {e}") |
| |
| except Exception as e: |
| logger.error(f"发送关闭通知失败: {e}") |
| |
| def get_stats(self) -> Dict[str, Any]: |
| """获取Bot统计信息""" |
| stats = { |
| 'is_running': self.is_running, |
| 'config_name': self.config_name, |
| 'allowed_users_count': len(self.telegram_config['allowed_users']), |
| 'allowed_groups_count': len(self.telegram_config['allowed_groups']) |
| } |
| |
| if self.context_manager: |
| stats['active_chats'] = self.context_manager.get_chat_count() |
| |
| return stats |
| |
| async def send_admin_message(self, message: str): |
| """发送管理员消息""" |
| try: |
| for user_id in self.telegram_config['allowed_users']: |
| try: |
| await self.telegram_client.send_message(chat_id=user_id, text=f"📢 管理员消息:\n{message}") |
| except Exception as e: |
| logger.warning(f"发送管理员消息给用户{user_id}失败: {e}") |
| except Exception as e: |
| logger.error(f"发送管理员消息失败: {e}") |
| |
| # 依赖注入方法,便于测试 |
| def set_context_manager(self, context_manager: IContextManager): |
| """设置上下文管理器(用于测试)""" |
| self.context_manager = context_manager |
| |
| def set_file_handler(self, file_handler: IFileHandler): |
| """设置文件处理器(用于测试)""" |
| self.file_handler = file_handler |
| |
| def set_claude_agent(self, claude_agent: IClaudeAgent): |
| """设置Claude Agent(用于测试)""" |
| self.claude_agent = claude_agent |
| |
| def set_stream_sender(self, stream_sender: IStreamMessageSender): |
| """设置流式发送器(用于测试)""" |
| self.stream_sender = stream_sender |
| |
| async def _initialize_webhook_integration(self, bot_username: str): |
| """初始化Webhook集成""" |
| try: |
| # 获取Webhook配置 |
| webhook_config = self.config_manager.get_webhook_config() |
| |
| # 延迟导入避免循环依赖 |
| from ..webhook.telegram_integration import create_webhook_integration_from_config |
| |
| # 创建Webhook集成 |
| self.webhook_integration = create_webhook_integration_from_config( |
| telegram_client=self.telegram_client, |
| webhook_config_dict=webhook_config, |
| bot_username=bot_username, |
| message_saver=self._save_webhook_message_callback |
| ) |
| |
| # 启动Webhook集成 |
| if self.webhook_integration: |
| callback_port = webhook_config.get('client', {}).get('callback_port', 8081) |
| await self.webhook_integration.start(callback_port) |
| logger.info(f"Webhook集成已启动: {bot_username}") |
| else: |
| logger.info("Webhook集成未启用或配置不完整") |
| |
| except Exception as e: |
| logger.error(f"初始化Webhook集成失败: {e}") |
| # Webhook失败不应影响Bot正常运行 |
| self.webhook_integration = None |
| |
| async def _process_webhook_message_queue(self): |
| """处理Webhook消息队列""" |
| logger.error("🚨 Webhook消息队列处理器已启动 - 使用ERROR级别确保可见") |
| |
| while True: |
| try: |
| # 等待队列中的消息,超时1秒 |
| message_data = await asyncio.wait_for( |
| self.webhook_message_queue.get(), |
| timeout=1.0 |
| ) |
| |
| logger.error(f"🚨 从队列中获取到Webhook消息 - 使用ERROR级别: {message_data['message'][:50]}...") |
| |
| # 在主事件循环中处理消息 |
| await self._process_webhook_message_from_queue(message_data) |
| |
| except asyncio.TimeoutError: |
| # 超时是正常的,继续循环 |
| continue |
| except asyncio.CancelledError: |
| # 任务被取消,退出循环 |
| logger.info("Webhook消息队列处理器被取消") |
| break |
| except Exception as e: |
| logger.error(f"处理Webhook消息队列失败: {e}") |
| |
| logger.info("Webhook消息队列处理器已停止") |
| |
| async def _process_webhook_message_from_queue(self, message_data): |
| """从队列处理Webhook消息""" |
| try: |
| chat_id = message_data['chat_id'] |
| user_id = message_data['user_id'] |
| message = message_data['message'] |
| is_bot = message_data['is_bot'] |
| user_info = message_data['user_info'] |
| |
| if not self.message_handler: |
| logger.warning("MessageHandler未初始化,无法处理Webhook消息") |
| return |
| |
| logger.info(f"在主事件循环中处理Webhook消息: {message[:50]}...") |
| |
| # 在主事件循环中调用处理器 |
| await self.message_handler.handle_webhook_message( |
| chat_id=int(chat_id), |
| user_id=user_id, |
| message_text=message, |
| user_info=user_info, |
| chat_type='supergroup', |
| is_bot=is_bot |
| ) |
| |
| logger.debug(f"Webhook消息已在主事件循环中处理完成: {chat_id}") |
| |
| except Exception as e: |
| logger.error(f"在主事件循环中处理Webhook消息失败: {e}") |
| |
| async def _save_webhook_message_callback( |
| self, |
| chat_id: str, |
| user_id: int, |
| message: str, |
| is_bot: bool, |
| user_info: dict |
| ): |
| """ |
| 保存Webhook消息的回调函数 - 使用队列跨线程通信 |
| |
| 将消息放入队列,由主事件循环处理 |
| """ |
| try: |
| # 构建标准的用户信息格式 |
| user_info_formatted = { |
| 'user_id': user_id, |
| 'chat_id': int(chat_id), # 添加缺少的chat_id字段 |
| 'first_name': user_info.get('first_name', f'WebhookUser_{user_id}'), |
| 'last_name': user_info.get('last_name'), |
| 'username': user_info.get('username'), |
| 'display_name': user_info.get('display_name', f'WebhookUser_{user_id}'), |
| 'chat_type': 'supergroup' # 假设是群组 |
| } |
| |
| # 准备队列消息数据 |
| message_data = { |
| 'chat_id': chat_id, |
| 'user_id': user_id, |
| 'message': message, |
| 'is_bot': is_bot, |
| 'user_info': user_info_formatted |
| } |
| |
| # 将消息放入队列(使用主事件循环) |
| if self.main_event_loop: |
| future = asyncio.run_coroutine_threadsafe( |
| self.webhook_message_queue.put(message_data), |
| self.main_event_loop |
| ) |
| # 等待放入队列完成 |
| future.result(timeout=5.0) |
| logger.info(f"✅ Webhook消息已成功放入队列: {message[:50]}...") |
| else: |
| logger.error("主事件循环引用未设置,无法处理Webhook消息") |
| |
| except Exception as e: |
| logger.error(f"Webhook消息队列处理失败: {e}") |
| # 降级到简单保存 |
| if self.claude_agent: |
| try: |
| # 同步调用保存方法(如果支持) |
| pass # 这里不再调用异步方法,避免事件循环问题 |
| except Exception as e2: |
| logger.error(f"降级保存也失败: {e2}") |
| |
| async def broadcast_bot_message( |
| self, |
| group_id: int, |
| message_content: str, |
| sender_info: dict, |
| reply_info: dict = None, |
| telegram_message_id: int = None, |
| message_type: str = "text" |
| ) -> bool: |
| """ |
| 广播Bot消息到Webhook服务器 |
| |
| 这个方法可以被消息处理器调用,用于广播Bot的回复消息 |
| """ |
| if not self.webhook_integration: |
| return False |
| |
| try: |
| return await self.webhook_integration.broadcast_bot_message( |
| group_id=group_id, |
| message_content=message_content, |
| sender_info=sender_info, |
| reply_info=reply_info, |
| telegram_message_id=telegram_message_id, |
| message_type=message_type |
| ) |
| except Exception as e: |
| logger.error(f"广播Bot消息失败: {e}") |
| return False |
| |
| async def _stop_webhook_integration(self): |
| """停止Webhook集成""" |
| if self.webhook_integration: |
| try: |
| await self.webhook_integration.stop() |
| logger.info("Webhook集成已停止") |
| except Exception as e: |
| logger.error(f"停止Webhook集成失败: {e}") |
| finally: |
| self.webhook_integration = None |