blob: ad2d2156a40b5b671d3089782b794ec4478b12b2 [file] [log] [blame] [raw]
"""
文件处理器
处理图片和文档文件
"""
import os
import aiofiles
import logging
from pathlib import Path
from typing import Optional
from PIL import Image
import base64
import io
from .interfaces import IFileHandler
logger = logging.getLogger(__name__)
class FileHandler(IFileHandler):
"""文件处理器实现"""
def __init__(self, temp_dir: str, max_file_size_mb: int = 20):
"""
初始化文件处理器
Args:
temp_dir: 临时文件目录
max_file_size_mb: 最大文件大小(MB)
"""
self.temp_dir = Path(temp_dir)
self.max_file_size_mb = max_file_size_mb
self.max_file_size_bytes = max_file_size_mb * 1024 * 1024
# 确保临时目录存在
self.temp_dir.mkdir(parents=True, exist_ok=True)
# 支持的文件格式
self.supported_image_formats = {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp'}
self.supported_document_formats = {'.pdf', '.doc', '.docx', '.txt', '.md', '.rtf'}
async def process_image(self, file_path: str) -> str:
"""
处理图片文件,返回描述文本
Args:
file_path: 图片文件路径
Returns:
图片描述文本
"""
try:
# 验证文件存在和大小
if not await self._validate_file(file_path):
return "❌ 图片文件无效或过大"
# 验证是否为支持的图片格式
file_ext = Path(file_path).suffix.lower()
if file_ext not in self.supported_image_formats:
return f"❌ 不支持的图片格式: {file_ext}"
# 获取图片基本信息
with Image.open(file_path) as img:
width, height = img.size
format_name = img.format
mode = img.mode
# 计算文件大小
file_size = os.path.getsize(file_path)
file_size_mb = file_size / (1024 * 1024)
# 生成图片描述
description = f"📸 图片信息:\n"
description += f"• 格式: {format_name}\n"
description += f"• 尺寸: {width} × {height}\n"
description += f"• 颜色模式: {mode}\n"
description += f"• 文件大小: {file_size_mb:.2f} MB\n"
# 这里可以集成图像识别API,如Claude Vision等
# 暂时返回基本信息
description += f"\n💡 已接收到您的图片,如需详细分析请告诉我您想了解什么。"
return description
except Exception as e:
logger.error(f"图片处理错误: {e}")
return f"❌ 图片处理失败: {str(e)}"
async def process_document(self, file_path: str) -> str:
"""
处理文档文件,返回内容摘要
Args:
file_path: 文档文件路径
Returns:
文档内容摘要
"""
try:
# 验证文件存在和大小
if not await self._validate_file(file_path):
return "❌ 文档文件无效或过大"
file_path_obj = Path(file_path)
file_ext = file_path_obj.suffix.lower()
# 验证是否为支持的文档格式
if file_ext not in self.supported_document_formats:
return f"❌ 不支持的文档格式: {file_ext}"
# 根据文件类型处理
if file_ext in {'.txt', '.md'}:
content = await self._process_text_file(file_path)
elif file_ext == '.pdf':
content = await self._process_pdf_file(file_path)
elif file_ext in {'.doc', '.docx'}:
content = await self._process_word_file(file_path)
else:
content = f"📄 文档已接收({file_ext}格式)"
# 计算文件大小
file_size = os.path.getsize(file_path)
file_size_mb = file_size / (1024 * 1024)
result = f"📄 文档信息:\n"
result += f"• 文件名: {file_path_obj.name}\n"
result += f"• 格式: {file_ext.upper()}\n"
result += f"• 大小: {file_size_mb:.2f} MB\n\n"
result += content
return result
except Exception as e:
logger.error(f"文档处理错误: {e}")
return f"❌ 文档处理失败: {str(e)}"
async def generate_image(self, prompt: str) -> bytes:
"""
生成图片,返回图片数据
Args:
prompt: 生成提示
Returns:
图片字节数据
"""
try:
# 这里可以集成图像生成API,如DALL-E、Midjourney等
# 暂时生成一个简单的占位图片
img = Image.new('RGB', (512, 512), color='lightblue')
# 添加文本
try:
from PIL import ImageDraw, ImageFont
draw = ImageDraw.Draw(img)
# 尝试使用默认字体
try:
font = ImageFont.load_default()
except:
font = None
text = f"生成图片\n{prompt[:50]}..."
draw.text((50, 250), text, fill='black', font=font)
except ImportError:
pass # 如果PIL不完整,跳过文本添加
# 转换为字节
img_bytes = io.BytesIO()
img.save(img_bytes, format='PNG')
return img_bytes.getvalue()
except Exception as e:
logger.error(f"图片生成错误: {e}")
raise
async def _validate_file(self, file_path: str) -> bool:
"""
验证文件是否有效
Args:
file_path: 文件路径
Returns:
是否有效
"""
try:
if not os.path.exists(file_path):
return False
file_size = os.path.getsize(file_path)
return file_size <= self.max_file_size_bytes and file_size > 0
except Exception:
return False
async def _process_text_file(self, file_path: str) -> str:
"""处理文本文件"""
try:
async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
content = await f.read()
# 截取前1000个字符作为预览
if len(content) > 1000:
preview = content[:1000] + "..."
else:
preview = content
return f"📝 内容预览:\n{preview}"
except UnicodeDecodeError:
# 尝试其他编码
try:
async with aiofiles.open(file_path, 'r', encoding='gbk') as f:
content = await f.read()
preview = content[:1000] + "..." if len(content) > 1000 else content
return f"📝 内容预览:\n{preview}"
except:
return "❌ 无法读取文本文件(编码问题)"
except Exception as e:
return f"❌ 文本文件处理失败: {str(e)}"
async def _process_pdf_file(self, file_path: str) -> str:
"""处理PDF文件"""
try:
# 这里可以集成PDF处理库,如PyPDF2、pdfplumber等
return "📄 PDF文档已接收,如需详细内容分析请告诉我您的具体需求。"
except Exception as e:
return f"❌ PDF处理失败: {str(e)}"
async def _process_word_file(self, file_path: str) -> str:
"""处理Word文档"""
try:
# 这里可以集成Word处理库,如python-docx等
return "📄 Word文档已接收,如需详细内容分析请告诉我您的具体需求。"
except Exception as e:
return f"❌ Word文档处理失败: {str(e)}"
async def save_telegram_file(self, telegram_file, filename: str) -> str:
"""
保存Telegram文件到本地
Args:
telegram_file: Telegram文件对象
filename: 文件名
Returns:
本地文件路径
"""
try:
# 生成安全的文件名
safe_filename = self._make_safe_filename(filename)
local_path = self.temp_dir / safe_filename
# 下载文件
await telegram_file.download_to_drive(str(local_path))
return str(local_path)
except Exception as e:
logger.error(f"文件保存失败: {e}")
raise
def _make_safe_filename(self, filename: str) -> str:
"""
生成安全的文件名
Args:
filename: 原始文件名
Returns:
安全的文件名
"""
import re
import uuid
# 移除不安全的字符
safe_name = re.sub(r'[<>:"/\\|?*]', '_', filename)
# 如果文件名过长,截断并添加UUID
if len(safe_name) > 100:
name_part = safe_name[:50]
ext_part = Path(safe_name).suffix
unique_id = str(uuid.uuid4())[:8]
safe_name = f"{name_part}_{unique_id}{ext_part}"
return safe_name
async def cleanup_temp_files(self, max_age_hours: int = 24):
"""
清理临时文件
Args:
max_age_hours: 文件最大保留时间(小时)
"""
try:
import time
current_time = time.time()
cutoff_time = current_time - (max_age_hours * 3600)
for file_path in self.temp_dir.iterdir():
if file_path.is_file():
file_mtime = file_path.stat().st_mtime
if file_mtime < cutoff_time:
file_path.unlink()
logger.info(f"已清理临时文件: {file_path}")
except Exception as e:
logger.error(f"清理临时文件失败: {e}")