| #!/usr/bin/env python3 |
| """ |
| Telegram Bot启动脚本 |
| """ |
| |
| import asyncio |
| import logging |
| import sys |
| import os |
| import argparse |
| import fcntl |
| import tempfile |
| from pathlib import Path |
| |
| # 添加src目录到Python路径 |
| current_dir = Path(__file__).parent |
| project_root = current_dir.parent |
| src_dir = project_root / "src" |
| sys.path.insert(0, str(src_dir)) |
| |
| from claude_agent.telegram.bot import TelegramBot |
| |
| |
| class ProcessLock: |
| """进程锁管理器,防止多个实例同时运行""" |
| |
| def __init__(self, lock_name="telegram_bot"): |
| self.lock_name = lock_name |
| self.lock_file = None |
| self.lock_path = Path(tempfile.gettempdir()) / f"{lock_name}.lock" |
| |
| def __enter__(self): |
| try: |
| self.lock_file = open(self.lock_path, 'w') |
| fcntl.flock(self.lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) |
| self.lock_file.write(str(os.getpid())) |
| self.lock_file.flush() |
| return self |
| except (IOError, OSError): |
| if self.lock_file: |
| self.lock_file.close() |
| raise RuntimeError(f"另一个{self.lock_name}实例正在运行。请先停止其他实例。") |
| |
| def __exit__(self, exc_type, exc_val, exc_tb): |
| if self.lock_file: |
| try: |
| fcntl.flock(self.lock_file.fileno(), fcntl.LOCK_UN) |
| self.lock_file.close() |
| self.lock_path.unlink(missing_ok=True) |
| except Exception: |
| pass |
| |
| |
| def setup_logging(): |
| """设置日志配置""" |
| logging.basicConfig( |
| level=logging.INFO, # 改回INFO级别 |
| format='[%(asctime)s] %(levelname)s %(name)s: %(message)s', |
| datefmt='%Y-%m-%d %H:%M:%S' |
| ) |
| |
| # 设置第三方库的日志级别 |
| logging.getLogger('httpx').setLevel(logging.WARNING) |
| logging.getLogger('httpcore').setLevel(logging.WARNING) # 也关闭httpcore的DEBUG |
| logging.getLogger('telegram').setLevel(logging.INFO) |
| |
| # Claude Agent SDK保持INFO级别 |
| logging.getLogger('claude_agent_sdk').setLevel(logging.INFO) |
| |
| |
| def parse_arguments(): |
| """解析命令行参数""" |
| parser = argparse.ArgumentParser(description='启动Telegram Bot') |
| parser.add_argument( |
| '--working-dir', |
| type=str, |
| help='Bot工作目录,将从该目录读取CLAUDE.md文件' |
| ) |
| return parser.parse_args() |
| |
| |
| async def get_bot_id(config_name: str) -> str: |
| """获取Bot ID用于实例隔离""" |
| try: |
| from claude_agent.utils.config import get_config_manager |
| config_manager = get_config_manager(config_name) |
| telegram_config = config_manager.get_telegram_config() |
| |
| bot_token = telegram_config.get('bot_token') |
| if not bot_token or bot_token == "YOUR_BOT_TOKEN_HERE": |
| raise ValueError("无效的Bot Token") |
| |
| # 从token中提取bot_id(token格式:bot_id:bot_secret) |
| bot_id = bot_token.split(':')[0] |
| if not bot_id.isdigit(): |
| raise ValueError("无效的Bot Token格式") |
| |
| return bot_id |
| |
| except Exception as e: |
| # 如果无法获取bot_id,使用配置名作为后备方案 |
| logger = logging.getLogger(__name__) |
| logger.warning(f"无法获取Bot ID: {e},使用配置名: {config_name}") |
| return f"config_{config_name}" |
| |
| |
| async def main(): |
| """主函数""" |
| # 解析命令行参数 |
| args = parse_arguments() |
| |
| setup_logging() |
| logger = logging.getLogger(__name__) |
| |
| try: |
| # 获取配置文件名 |
| config_name = os.getenv('CLAUDE_CONFIG', 'local') |
| logger.info(f"使用配置文件: {config_name}.toml") |
| |
| # 获取Bot ID用于实例隔离 |
| bot_id = await get_bot_id(config_name) |
| lock_name = f"telegram_bot_{bot_id}" |
| logger.info(f"Bot ID: {bot_id},锁名称: {lock_name}") |
| |
| # 使用进程锁防止同一Bot的多个实例运行 |
| with ProcessLock(lock_name): |
| try: |
| # 设置工作目录 |
| if args.working_dir: |
| working_dir = Path(args.working_dir) |
| logger.info(f"Bot工作目录: {working_dir}") |
| |
| # 检查CLAUDE.md文件 |
| claude_md_path = working_dir / "CLAUDE.md" |
| if claude_md_path.exists(): |
| logger.info(f"找到CLAUDE.md文件: {claude_md_path}") |
| # 设置环境变量,供Bot读取 |
| os.environ['CLAUDE_MD_PATH'] = str(claude_md_path) |
| else: |
| logger.warning(f"未找到CLAUDE.md文件: {claude_md_path}") |
| |
| # 创建并启动Bot |
| bot = TelegramBot(config_name=config_name) |
| |
| logger.info(f"正在启动Telegram Bot (ID: {bot_id})...") |
| await bot.start() |
| |
| except KeyboardInterrupt: |
| logger.info("收到退出信号") |
| except Exception as e: |
| logger.error(f"启动失败: {e}") |
| raise |
| finally: |
| logger.info("程序退出") |
| |
| except RuntimeError as e: |
| print(f"错误: {e}") |
| sys.exit(1) |
| |
| |
| if __name__ == "__main__": |
| try: |
| asyncio.run(main()) |
| except KeyboardInterrupt: |
| print("\n程序被用户中断") |
| sys.exit(0) |
| except Exception as e: |
| print(f"程序异常退出: {e}") |
| sys.exit(1) |