# blob: 0292ba73cdcb191494d95e2ed9cb06302e5e368d [file] [log] [blame] [raw]
"""
Telegram Bot单元测试 - 文件处理器
"""
import pytest
import tempfile
import os
from pathlib import Path
from unittest.mock import Mock, patch, AsyncMock
import aiofiles
import sys
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent / "src"))
from claude_agent.telegram.file_handler import FileHandler
class TestFileHandler:
"""文件处理器测试"""
@pytest.fixture
def temp_dir(self):
    """Yield the path of a temporary directory for the duration of a test."""
    scratch = tempfile.TemporaryDirectory()
    with scratch as scratch_path:
        yield scratch_path
@pytest.fixture
def file_handler(self, temp_dir):
    """Build a FileHandler on the temp directory with max_file_size_mb=5."""
    handler = FileHandler(temp_dir=temp_dir, max_file_size_mb=5)
    return handler
def test_init(self, temp_dir):
    """Check the attributes FileHandler exposes right after construction."""
    size_mb = 10
    handler = FileHandler(temp_dir=temp_dir, max_file_size_mb=size_mb)

    expected_dir = Path(temp_dir)
    assert handler.temp_dir == expected_dir
    assert handler.max_file_size_mb == size_mb
    # The byte limit is expected to be the megabyte limit times 1024 * 1024.
    assert handler.max_file_size_bytes == size_mb * 1024 * 1024
    assert handler.temp_dir.exists()
@pytest.mark.asyncio
async def test_validate_file_valid(self, file_handler, temp_dir):
    """Check that an existing, non-empty small file validates as True."""
    sample_path = Path(temp_dir, "test.txt")
    sample_path.write_text("Hello World")

    is_valid = await file_handler._validate_file(str(sample_path))

    assert is_valid is True
@pytest.mark.asyncio
async def test_validate_file_not_exists(self, file_handler, temp_dir):
    """Check that a path which was never created validates as False."""
    missing_path = Path(temp_dir, "not_exists.txt")

    outcome = await file_handler._validate_file(str(missing_path))

    assert outcome is False
@pytest.mark.asyncio
async def test_validate_file_too_large(self, file_handler, temp_dir):
    """Check that a file above the configured size limit validates as False."""
    # 6 MB of filler, above the 5 MB limit set by the file_handler fixture.
    oversized_payload = "x" * (6 * 1024 * 1024)
    oversized_path = Path(temp_dir, "large.txt")
    oversized_path.write_text(oversized_payload)

    outcome = await file_handler._validate_file(str(oversized_path))

    assert outcome is False
@pytest.mark.asyncio
async def test_validate_file_empty(self, file_handler, temp_dir):
    """Check that a zero-byte file validates as False."""
    blank_path = Path(temp_dir, "empty.txt")
    blank_path.touch()  # creates the file without writing any content

    outcome = await file_handler._validate_file(str(blank_path))

    assert outcome is False
@pytest.mark.asyncio
async def test_process_text_file(self, file_handler, temp_dir):
    """Check that a UTF-8 text file produces a preview containing its text."""
    body = "这是一个测试文件\n包含多行内容\n第三行"
    source_path = Path(temp_dir, "test.txt")
    source_path.write_text(body, encoding='utf-8')

    preview = await file_handler._process_text_file(str(source_path))

    for expected_fragment in ("📝 内容预览:", "这是一个测试文件"):
        assert expected_fragment in preview
@pytest.mark.asyncio
async def test_process_text_file_long_content(self, file_handler, temp_dir):
    """Check that a long text file still yields a preview with an ellipsis."""
    # 1500 characters: more than 1000, so the preview is expected to be cut.
    filler = "x" * 1500
    source_path = Path(temp_dir, "long.txt")
    source_path.write_text(filler, encoding='utf-8')

    preview = await file_handler._process_text_file(str(source_path))

    assert "📝 内容预览:" in preview
    assert "..." in preview  # truncation marker
@pytest.mark.asyncio
async def test_process_text_file_encoding_error(self, file_handler, temp_dir):
    """Check the handler's response to a text file that is not UTF-8 encoded."""
    gbk_path = Path(temp_dir, "gbk.txt")
    with gbk_path.open('w', encoding='gbk') as stream:
        stream.write("中文测试")

    outcome = await file_handler._process_text_file(str(gbk_path))

    # Accept either outcome: a preview (e.g. via a GBK fallback) or an error.
    accepted_markers = ("📝 内容预览:", "❌")
    assert any(marker in outcome for marker in accepted_markers)
@pytest.mark.asyncio
async def test_process_image_valid(self, file_handler, temp_dir):
    """Check the summary reported for a freshly written 100x100 JPEG."""
    from PIL import Image

    picture_path = Path(temp_dir, "test.jpg")
    picture = Image.new('RGB', (100, 100), color='red')
    picture.save(picture_path)

    summary = await file_handler.process_image(str(picture_path))

    for expected_fragment in ("📸 图片信息:", "格式: JPEG", "尺寸: 100 × 100"):
        assert expected_fragment in summary
@pytest.mark.asyncio
async def test_process_image_invalid_format(self, file_handler, temp_dir):
    """Check that a file with an unknown extension is reported as unsupported."""
    # Plain text saved under an extension that is not an image format.
    bogus_path = Path(temp_dir, "fake.xyz")
    bogus_path.write_text("not an image")

    summary = await file_handler.process_image(str(bogus_path))

    assert "❌ 不支持的图片格式" in summary
@pytest.mark.asyncio
async def test_process_image_file_not_exists(self, file_handler, temp_dir):
    """Check the error message returned for an image path that does not exist."""
    missing_path = Path(temp_dir, "not_exists.jpg")

    summary = await file_handler.process_image(str(missing_path))

    assert "❌ 图片文件无效或过大" in summary
@pytest.mark.asyncio
async def test_process_document_text(self, file_handler, temp_dir):
    """Check that a .txt document yields metadata plus a content preview.

    The fixture file is written explicitly as UTF-8 so that its non-ASCII
    content does not depend on the platform's default locale encoding.
    """
    doc_path = Path(temp_dir) / "document.txt"
    doc_path.write_text("文档内容测试", encoding='utf-8')

    result = await file_handler.process_document(str(doc_path))

    assert "📄 文档信息:" in result
    assert "文件名: document.txt" in result
    assert "格式: .TXT" in result
    assert "📝 内容预览:" in result
@pytest.mark.asyncio
async def test_process_document_markdown(self, file_handler, temp_dir):
    """Check that a .md document yields metadata and its heading text.

    The fixture file is written explicitly as UTF-8 so that the non-ASCII
    heading asserted below does not depend on the platform's default
    locale encoding.
    """
    md_path = Path(temp_dir) / "readme.md"
    md_content = "# 标题\n\n这是一个Markdown文档"
    md_path.write_text(md_content, encoding='utf-8')

    result = await file_handler.process_document(str(md_path))

    assert "📄 文档信息:" in result
    assert "文件名: readme.md" in result
    assert "格式: .MD" in result
    assert "# 标题" in result
@pytest.mark.asyncio
async def test_process_document_unsupported(self, file_handler, temp_dir):
    """Check that a document with an unknown extension is reported as unsupported."""
    odd_path = Path(temp_dir, "test.xyz")
    odd_path.write_text("unsupported format")

    report = await file_handler.process_document(str(odd_path))

    assert "❌ 不支持的文档格式" in report
@pytest.mark.asyncio
async def test_generate_image(self, file_handler):
    """Check that generate_image returns the bytes of a 512x512 PNG."""
    payload = await file_handler.generate_image("生成一张蓝色背景的图片")

    assert isinstance(payload, bytes)
    assert len(payload) > 0

    # Decode the bytes to confirm they form a real PNG of the expected size.
    from PIL import Image
    import io

    decoded = Image.open(io.BytesIO(payload))
    assert decoded.format == 'PNG'
    assert decoded.size == (512, 512)
def test_make_safe_filename(self, file_handler):
    """Check filename sanitising for plain, unsafe and overlong names."""
    sanitize = file_handler._make_safe_filename

    # An already-safe name is returned unchanged.
    assert sanitize("normal_file.txt") == "normal_file.txt"

    # Characters that are illegal in filenames must not survive.
    cleaned = sanitize("file<>:\"/\\|?*.txt")
    for forbidden in ("<", ">", ":"):
        assert forbidden not in cleaned

    # An overlong name is capped at 100 characters and keeps its extension.
    shortened = sanitize("a" * 150 + ".txt")
    assert len(shortened) <= 100
    assert shortened.endswith(".txt")
@pytest.mark.asyncio
async def test_save_telegram_file(self, file_handler):
    """Check that saving downloads once and returns a path in the temp dir."""
    # Stand-in for a Telegram file object with an awaitable download method.
    telegram_stub = Mock()
    telegram_stub.download_to_drive = AsyncMock()

    saved_path = await file_handler.save_telegram_file(telegram_stub, "test_file.jpg")

    telegram_stub.download_to_drive.assert_called_once()
    assert saved_path.endswith("test_file.jpg")
    assert str(file_handler.temp_dir) in saved_path
@pytest.mark.asyncio
async def test_save_telegram_file_error(self, file_handler):
    """Check that a download failure propagates out of save_telegram_file."""
    # The stub's download coroutine raises as soon as it is awaited.
    failing_download = AsyncMock(side_effect=Exception("Download failed"))
    telegram_stub = Mock()
    telegram_stub.download_to_drive = failing_download

    with pytest.raises(Exception, match="Download failed"):
        await file_handler.save_telegram_file(telegram_stub, "test.jpg")
@pytest.mark.asyncio
async def test_cleanup_temp_files(self, file_handler, temp_dir):
    """Check that cleanup removes files older than the cutoff and keeps newer ones."""
    import time

    base = Path(temp_dir)
    stale_path = base / "old_file.txt"
    fresh_path = base / "new_file.txt"
    stale_path.write_text("old content")
    fresh_path.write_text("new content")

    # Backdate the stale file's timestamps to 25 hours ago.
    backdated = time.time() - (25 * 3600)
    os.utime(stale_path, (backdated, backdated))

    # Precondition: both files are present before cleanup runs.
    assert stale_path.exists()
    assert fresh_path.exists()

    await file_handler.cleanup_temp_files(max_age_hours=24)

    # Only the file older than 24 hours should be gone.
    assert not stale_path.exists()
    assert fresh_path.exists()
@pytest.mark.asyncio
async def test_cleanup_temp_files_error(self, file_handler, temp_dir):
    """Check that cleanup does not raise when deleting a file fails."""
    victim_path = Path(temp_dir, "test.txt")
    victim_path.write_text("content")

    # Simulate an undeletable file by making Path.unlink raise.
    unlink_failure = patch.object(
        Path, 'unlink', side_effect=PermissionError("Permission denied")
    )
    with unlink_failure:
        # The test passes as long as this call completes without raising.
        await file_handler.cleanup_temp_files(max_age_hours=0)
@pytest.mark.asyncio
async def test_multiple_file_types_in_temp_dir(self, file_handler, temp_dir):
    """Check text, image and PDF files that share one temp directory."""
    base = Path(temp_dir)
    text_path = base / "document.txt"
    image_path = base / "image.jpg"
    pdf_path = base / "document.pdf"

    text_path.write_text("Text content")

    # A small real JPEG produced with Pillow.
    from PIL import Image
    Image.new('RGB', (50, 50), color='blue').save(image_path)

    # Not a real PDF: only the leading bytes resemble one.
    pdf_path.write_bytes(b'%PDF fake pdf content')

    text_report = await file_handler.process_document(str(text_path))
    image_report = await file_handler.process_image(str(image_path))
    pdf_report = await file_handler.process_document(str(pdf_path))

    assert "📝 内容预览:" in text_report
    assert "📸 图片信息:" in image_report
    assert "📄 PDF文档已接收" in pdf_report
@pytest.mark.asyncio
async def test_process_image_corrupted_file(self, file_handler, temp_dir):
    """Check the failure message for a .jpg file whose bytes are not an image."""
    broken_path = Path(temp_dir, "corrupted.jpg")
    broken_path.write_bytes(b"not a real image file")

    summary = await file_handler.process_image(str(broken_path))

    assert "❌ 图片处理失败:" in summary
@pytest.mark.asyncio
async def test_process_document_pdf_word(self, file_handler, temp_dir):
    """Check the metadata reported for placeholder PDF and Word documents."""
    # (file name, raw bytes written, format line expected in the report)
    cases = (
        ("test.pdf", b'%PDF-1.4 fake pdf', "格式: .PDF"),
        ("test.docx", b'fake word document', "格式: .DOCX"),
    )

    for file_name, raw_bytes, format_line in cases:
        document_path = Path(temp_dir, file_name)
        document_path.write_bytes(raw_bytes)

        report = await file_handler.process_document(str(document_path))

        assert "📄 文档信息:" in report
        assert format_line in report
@pytest.mark.asyncio
async def test_generate_image_with_pil_features(self, file_handler):
    """Check that image generation draws text exactly once through PIL."""
    draw_stub = Mock()

    # Replace the PIL drawing and font entry points with mocks for the call.
    with patch('PIL.ImageDraw.Draw') as draw_factory, \
            patch('PIL.ImageFont.load_default') as font_loader:
        draw_factory.return_value = draw_stub
        font_loader.return_value = Mock()

        payload = await file_handler.generate_image("测试图片")

    assert isinstance(payload, bytes)
    draw_stub.text.assert_called_once()
@pytest.mark.asyncio
async def test_generate_image_font_loading_error(self, file_handler):
    """Check that image bytes are still returned when the default font fails to load."""
    font_failure = patch(
        'PIL.ImageFont.load_default', side_effect=Exception("Font error")
    )

    with patch('PIL.ImageDraw.Draw') as draw_factory, font_failure:
        draw_factory.return_value = Mock()

        payload = await file_handler.generate_image("测试")

    # An image should still be produced, just without any text on it.
    assert isinstance(payload, bytes)
@pytest.mark.asyncio
async def test_generate_image_pil_import_error(self, file_handler):
    """Check that bytes are returned when PIL's Draw raises ImportError."""
    draw_failure = patch(
        'PIL.ImageDraw.Draw', side_effect=ImportError("PIL unavailable")
    )

    with draw_failure:
        payload = await file_handler.generate_image("测试")

    assert isinstance(payload, bytes)
def test_supported_format_validation(self, file_handler):
    """Check that the handler lists the expected image and document extensions."""
    expected_image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp')
    for extension in expected_image_exts:
        assert extension in file_handler.supported_image_formats

    expected_document_exts = ('.pdf', '.doc', '.docx', '.txt', '.md', '.rtf')
    for extension in expected_document_exts:
        assert extension in file_handler.supported_document_formats
@pytest.mark.asyncio
async def test_save_telegram_file_with_unsafe_filename(self, file_handler):
    """Check that unsafe characters are stripped from the saved file's name.

    Only the part of the returned path below the handler's temp directory
    is inspected. The directory prefix itself may legitimately contain ':'
    (for example a Windows drive letter), which would otherwise fail the
    test regardless of how the filename was sanitised.
    """
    mock_telegram_file = Mock()
    mock_telegram_file.download_to_drive = AsyncMock()
    unsafe_filename = "test<>:\"/\\|?*file.jpg"

    result_path = await file_handler.save_telegram_file(mock_telegram_file, unsafe_filename)

    # Drop the temp-dir prefix once; if it is absent the whole path is checked.
    saved_part = result_path.replace(str(file_handler.temp_dir), "", 1)

    assert "<" not in saved_part
    assert ">" not in saved_part
    assert ":" not in saved_part
    assert "?" not in saved_part
    assert "*" not in saved_part
@pytest.mark.asyncio
async def test_validate_file_permission_error(self, file_handler, temp_dir):
    """Check that validation returns False when reading the file size is denied."""
    guarded_path = Path(temp_dir, "test.txt")
    guarded_path.write_text("content")

    # The file is reported as existing, but querying its size raises.
    size_denied = patch(
        'os.path.getsize', side_effect=PermissionError("Permission denied")
    )
    with patch('os.path.exists', return_value=True), size_denied:
        outcome = await file_handler._validate_file(str(guarded_path))

    assert outcome is False
@pytest.mark.asyncio
async def test_process_text_file_read_exception(self, file_handler, temp_dir):
    """Check the error message produced when opening the text file raises."""
    source_path = Path(temp_dir, "test.txt")
    source_path.write_text("content")

    # Make aiofiles.open raise so the read can never begin.
    open_failure = patch('aiofiles.open', side_effect=Exception("Read error"))
    with open_failure:
        outcome = await file_handler._process_text_file(str(source_path))

    assert "❌ 文本文件处理失败: Read error" in outcome
@pytest.mark.asyncio
async def test_cleanup_temp_files_no_files_to_clean(self, file_handler, temp_dir):
    """Check that cleanup completes without raising when no files were created."""
    age_limit_hours = 24
    # The test passes as long as this call returns without an exception.
    await file_handler.cleanup_temp_files(max_age_hours=age_limit_hours)
@pytest.mark.asyncio
async def test_cleanup_temp_files_iterdir_error(self, file_handler, temp_dir):
    """Check that cleanup does not raise when listing the directory fails."""
    listing_failure = patch.object(
        Path, 'iterdir', side_effect=PermissionError("Access denied")
    )
    with listing_failure:
        # The handler is expected to catch the error rather than propagate it.
        await file_handler.cleanup_temp_files(max_age_hours=24)