| """ |
| 文件处理器 |
| 处理图片和文档文件 |
| """ |
| |
| import os |
| import aiofiles |
| import logging |
| from pathlib import Path |
| from typing import Optional |
| from PIL import Image |
| import base64 |
| import io |
| |
| from .interfaces import IFileHandler |
| |
| |
| logger = logging.getLogger(__name__) |
| |
| |
class FileHandler(IFileHandler):
    """Process uploaded image and document files and manage the temp directory.

    The handler validates files (existence, size, extension), produces short
    human-readable summaries for images and documents, can generate a
    placeholder image, stores files downloaded from Telegram under
    ``temp_dir`` and removes stale temp files.
    """

    # Number of characters of a text file shown in the preview.
    _PREVIEW_CHARS = 1000
    # File names longer than this are shortened by ``_make_safe_filename``.
    _MAX_FILENAME_LENGTH = 100
    # Upper bound for the extension kept when a long file name is shortened.
    _MAX_SUFFIX_LENGTH = 20

    def __init__(self, temp_dir: str, max_file_size_mb: int = 20):
        """Create the handler and make sure the temp directory exists.

        Args:
            temp_dir: Directory used for temporary files; created if missing.
            max_file_size_mb: Largest accepted file size, in megabytes.
        """
        self.temp_dir = Path(temp_dir)
        self.max_file_size_mb = max_file_size_mb
        self.max_file_size_bytes = max_file_size_mb * 1024 * 1024

        # Make sure the temp directory exists before anything is saved to it.
        self.temp_dir.mkdir(parents=True, exist_ok=True)

        # Accepted extensions (lower-case, including the leading dot).
        self.supported_image_formats = {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp'}
        self.supported_document_formats = {'.pdf', '.doc', '.docx', '.txt', '.md', '.rtf'}

    async def process_image(self, file_path: str) -> str:
        """Describe an image file (format, dimensions, colour mode, size).

        Args:
            file_path: Path of the image on disk.

        Returns:
            A user-facing description, or a user-facing error message when the
            file is invalid, unsupported or cannot be opened. Never raises.
        """
        try:
            if not await self._validate_file(file_path):
                return "❌ 图片文件无效或过大"

            file_ext = Path(file_path).suffix.lower()
            if file_ext not in self.supported_image_formats:
                return f"❌ 不支持的图片格式: {file_ext}"

            # Only the header metadata is needed; the context manager closes
            # the underlying file handle again.
            with Image.open(file_path) as img:
                width, height = img.size
                format_name = img.format
                mode = img.mode

            file_size_mb = os.path.getsize(file_path) / (1024 * 1024)

            # An image-recognition API could be integrated here; for now only
            # the basic metadata is reported.
            return "\n".join([
                "📸 图片信息:",
                f"• 格式: {format_name}",
                f"• 尺寸: {width} × {height}",
                f"• 颜色模式: {mode}",
                f"• 文件大小: {file_size_mb:.2f} MB",
                "",
                "💡 已接收到您的图片,如需详细分析请告诉我您想了解什么。",
            ])

        except Exception as e:
            # logger.exception keeps the traceback, unlike logger.error.
            logger.exception("图片处理错误: %s", e)
            return f"❌ 图片处理失败: {str(e)}"

    async def process_document(self, file_path: str) -> str:
        """Summarise a document file (name, format, size, content preview).

        Args:
            file_path: Path of the document on disk.

        Returns:
            A user-facing summary, or a user-facing error message when the
            file is invalid or unsupported. Never raises.
        """
        try:
            if not await self._validate_file(file_path):
                return "❌ 文档文件无效或过大"

            file_path_obj = Path(file_path)
            file_ext = file_path_obj.suffix.lower()

            if file_ext not in self.supported_document_formats:
                return f"❌ 不支持的文档格式: {file_ext}"

            # Dispatch on the extension; formats without a dedicated handler
            # (currently .rtf) only get an acknowledgement line.
            if file_ext in {'.txt', '.md'}:
                content = await self._process_text_file(file_path)
            elif file_ext == '.pdf':
                content = await self._process_pdf_file(file_path)
            elif file_ext in {'.doc', '.docx'}:
                content = await self._process_word_file(file_path)
            else:
                content = f"📄 文档已接收({file_ext}格式)"

            file_size_mb = os.path.getsize(file_path) / (1024 * 1024)

            return "\n".join([
                "📄 文档信息:",
                f"• 文件名: {file_path_obj.name}",
                f"• 格式: {file_ext.upper()}",
                f"• 大小: {file_size_mb:.2f} MB",
                "",
                content,
            ])

        except Exception as e:
            logger.exception("文档处理错误: %s", e)
            return f"❌ 文档处理失败: {str(e)}"

    async def generate_image(self, prompt: str) -> bytes:
        """Generate a placeholder PNG for *prompt*.

        An image-generation API could be integrated here; for now a
        512x512 light-blue image is produced and, when possible, captioned
        with the beginning of the prompt.

        Args:
            prompt: The generation prompt.

        Returns:
            The PNG-encoded image bytes.

        Raises:
            Exception: Re-raised after logging if the image cannot be created
                or encoded. A failure to draw the caption does NOT raise.
        """
        try:
            img = Image.new('RGB', (512, 512), color='lightblue')
            self._draw_placeholder_text(img, prompt)

            img_bytes = io.BytesIO()
            img.save(img_bytes, format='PNG')
            return img_bytes.getvalue()

        except Exception as e:
            logger.exception("图片生成错误: %s", e)
            raise

    def _draw_placeholder_text(self, img, prompt: str) -> None:
        """Best-effort: draw the caption onto *img*; never raises.

        Drawing can fail for reasons unrelated to the image itself (an
        incomplete PIL install, or a default font that cannot render the
        caption's characters, e.g. a bitmap font limited to Latin-1 on some
        Pillow versions). The caption is cosmetic, so such failures are
        logged and the un-captioned image is still returned by the caller.
        """
        try:
            from PIL import ImageDraw, ImageFont

            prompt = prompt or ""
            snippet = prompt[:50]
            # Only mark the caption as truncated when it actually was.
            if len(prompt) > 50:
                snippet += "..."

            draw = ImageDraw.Draw(img)
            draw.text(
                (50, 250),
                f"生成图片\n{snippet}",
                fill='black',
                font=ImageFont.load_default(),
            )
        except Exception as e:
            logger.warning("占位图片文本绘制失败,已跳过: %s", e)

    async def _validate_file(self, file_path: str) -> bool:
        """Check that *file_path* is a regular, non-empty file within the size limit.

        Args:
            file_path: Path to check.

        Returns:
            True when the path is an existing regular file whose size is in
            ``(0, max_file_size_bytes]``; False otherwise, including on any
            OS error or malformed path.
        """
        try:
            # isfile (not exists): a directory must not pass validation.
            if not os.path.isfile(file_path):
                return False

            file_size = os.path.getsize(file_path)
            return 0 < file_size <= self.max_file_size_bytes

        except (OSError, TypeError, ValueError):
            return False

    async def _process_text_file(self, file_path: str) -> str:
        """Return a preview of a text file, trying UTF-8 first and then GBK.

        Only the first ``_PREVIEW_CHARS`` characters are shown, so at most one
        character more than that is read instead of loading the whole file.

        Returns:
            The preview text, or a user-facing error message. Never raises.
        """
        limit = self._PREVIEW_CHARS
        try:
            for encoding in ('utf-8', 'gbk'):
                try:
                    async with aiofiles.open(file_path, 'r', encoding=encoding) as f:
                        # One extra character tells us whether to add "...".
                        content = await f.read(limit + 1)
                except UnicodeDecodeError:
                    # Wrong guess; fall through to the next encoding.
                    continue

                if len(content) > limit:
                    preview = content[:limit] + "..."
                else:
                    preview = content
                return f"📝 内容预览:\n{preview}"

            return "❌ 无法读取文本文件(编码问题)"

        except Exception as e:
            return f"❌ 文本文件处理失败: {str(e)}"

    async def _process_pdf_file(self, file_path: str) -> str:
        """Acknowledge a PDF file.

        Placeholder: a PDF library (e.g. PyPDF2, pdfplumber) could be
        integrated here. *file_path* is currently unused.
        """
        return "📄 PDF文档已接收,如需详细内容分析请告诉我您的具体需求。"

    async def _process_word_file(self, file_path: str) -> str:
        """Acknowledge a Word document.

        Placeholder: a Word library (e.g. python-docx) could be integrated
        here. *file_path* is currently unused.
        """
        return "📄 Word文档已接收,如需详细内容分析请告诉我您的具体需求。"

    async def save_telegram_file(self, telegram_file, filename: str) -> str:
        """Download a Telegram file into the temp directory.

        Args:
            telegram_file: Object exposing an awaitable
                ``download_to_drive(path)`` method.
            filename: File name suggested by the sender; it is sanitised
                before being used as the local name.

        Returns:
            The local path of the saved file.

        Raises:
            Exception: Re-raised after logging if the download fails.
        """
        try:
            safe_filename = self._make_safe_filename(filename)
            local_path = self.temp_dir / safe_filename

            await telegram_file.download_to_drive(str(local_path))

            return str(local_path)

        except Exception as e:
            logger.exception("文件保存失败: %s", e)
            raise

    def _make_safe_filename(self, filename: str) -> str:
        """Turn a sender-supplied file name into one safe to use on disk.

        Path separators, characters reserved on common file systems and
        control characters are replaced with ``_``. Names that are missing,
        empty or consist only of dots/spaces (``""``, ``"."``, ``".."``) are
        replaced with a generated name, so the result can never resolve to
        the temp directory itself or its parent. Names longer than
        ``_MAX_FILENAME_LENGTH`` are shortened to a 50-character prefix, a
        short unique id and the (length-capped) extension.

        Args:
            filename: Original file name; may be empty or None.

        Returns:
            A non-empty file name without path separators.
        """
        import re
        import uuid

        safe_name = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', filename or '')

        # Nothing usable left: generate a name rather than returning "",
        # "." or "..".
        if not safe_name.strip(' .'):
            return f"file_{str(uuid.uuid4())[:8]}"

        if len(safe_name) > self._MAX_FILENAME_LENGTH:
            name_part = safe_name[:50]
            # Cap the suffix too; otherwise a very long "extension" would
            # defeat the length limit.
            ext_part = Path(safe_name).suffix[:self._MAX_SUFFIX_LENGTH]
            unique_id = str(uuid.uuid4())[:8]
            safe_name = f"{name_part}_{unique_id}{ext_part}"

        return safe_name

    async def cleanup_temp_files(self, max_age_hours: int = 24):
        """Delete regular files in the temp directory older than *max_age_hours*.

        Best-effort: failures are logged, never raised. A failure on one file
        (for example, it disappeared or is locked) does not stop the cleanup
        of the remaining files.

        Args:
            max_age_hours: Maximum age, in hours, based on modification time.
        """
        import time

        try:
            cutoff_time = time.time() - (max_age_hours * 3600)

            for file_path in self.temp_dir.iterdir():
                try:
                    if file_path.is_file() and file_path.stat().st_mtime < cutoff_time:
                        file_path.unlink()
                        logger.info(f"已清理临时文件: {file_path}")
                except OSError as e:
                    logger.warning("清理临时文件失败: %s (%s)", file_path, e)

        except Exception as e:
            logger.error(f"清理临时文件失败: {e}")