| """ |
| Telegram Bot单元测试 - 文件处理器 |
| """ |
| |
| import pytest |
| import tempfile |
| import os |
| from pathlib import Path |
| from unittest.mock import Mock, patch, AsyncMock |
| import aiofiles |
| |
| import sys |
| sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent / "src")) |
| |
| from claude_agent.telegram.file_handler import FileHandler |
| |
| |
class TestFileHandler:
    """Unit tests for ``FileHandler`` (file processing for the Telegram bot)."""

    @pytest.fixture
    def temp_dir(self):
        """Yield the path of a temporary directory that is removed on teardown."""
        with tempfile.TemporaryDirectory() as tmpdir:
            yield tmpdir

    @pytest.fixture
    def file_handler(self, temp_dir):
        """Build a ``FileHandler`` over ``temp_dir`` with a 5 MB size limit."""
        return FileHandler(temp_dir=temp_dir, max_file_size_mb=5)

    def test_init(self, temp_dir):
        """The constructor exposes the directory and the derived size limits."""
        handler = FileHandler(temp_dir=temp_dir, max_file_size_mb=10)
        assert handler.temp_dir == Path(temp_dir)
        assert handler.max_file_size_mb == 10
        assert handler.max_file_size_bytes == 10 * 1024 * 1024
        assert handler.temp_dir.exists()

    @pytest.mark.asyncio
    async def test_validate_file_valid(self, file_handler, temp_dir):
        """A small, non-empty, existing file validates as ``True``."""
        test_file = Path(temp_dir) / "test.txt"
        test_file.write_text("Hello World")

        result = await file_handler._validate_file(str(test_file))
        assert result is True

    @pytest.mark.asyncio
    async def test_validate_file_not_exists(self, file_handler, temp_dir):
        """A path that does not exist validates as ``False``."""
        non_existent = Path(temp_dir) / "not_exists.txt"

        result = await file_handler._validate_file(str(non_existent))
        assert result is False

    @pytest.mark.asyncio
    async def test_validate_file_too_large(self, file_handler, temp_dir):
        """A file above the fixture's 5 MB limit validates as ``False``."""
        large_file = Path(temp_dir) / "large.txt"
        # 6 MB of ASCII, i.e. one megabyte over the 5 MB limit of the fixture.
        large_file.write_text("x" * (6 * 1024 * 1024))

        result = await file_handler._validate_file(str(large_file))
        assert result is False

    @pytest.mark.asyncio
    async def test_validate_file_empty(self, file_handler, temp_dir):
        """A zero-byte file validates as ``False``."""
        empty_file = Path(temp_dir) / "empty.txt"
        empty_file.touch()  # creates the file without writing any content

        result = await file_handler._validate_file(str(empty_file))
        assert result is False

    @pytest.mark.asyncio
    async def test_process_text_file(self, file_handler, temp_dir):
        """A UTF-8 text file yields a preview containing its first line."""
        text_file = Path(temp_dir) / "test.txt"
        content = "这是一个测试文件\n包含多行内容\n第三行"
        text_file.write_text(content, encoding='utf-8')

        result = await file_handler._process_text_file(str(text_file))
        assert "📝 内容预览:" in result
        assert "这是一个测试文件" in result

    @pytest.mark.asyncio
    async def test_process_text_file_long_content(self, file_handler, temp_dir):
        """A 1500-character file yields a preview that contains an ellipsis."""
        text_file = Path(temp_dir) / "long.txt"
        long_content = "x" * 1500  # longer than 1000 characters
        text_file.write_text(long_content, encoding='utf-8')

        result = await file_handler._process_text_file(str(text_file))
        assert "📝 内容预览:" in result
        assert "..." in result  # the preview is expected to be truncated

    @pytest.mark.asyncio
    async def test_process_text_file_encoding_error(self, file_handler, temp_dir):
        """A GBK-encoded file yields either a preview or an error marker."""
        text_file = Path(temp_dir) / "gbk.txt"
        content = "中文测试"
        # Deliberately not UTF-8 so the handler's decoding path is exercised.
        text_file.write_text(content, encoding='gbk')

        result = await file_handler._process_text_file(str(text_file))
        # Either outcome is accepted: a successful (fallback) decode or an
        # error message.
        assert ("📝 内容预览:" in result) or ("❌" in result)

    @pytest.mark.asyncio
    async def test_process_image_valid(self, file_handler, temp_dir):
        """A real 100x100 JPEG is reported with its format and dimensions."""
        from PIL import Image

        img = Image.new('RGB', (100, 100), color='red')
        img_path = Path(temp_dir) / "test.jpg"
        img.save(img_path)

        result = await file_handler.process_image(str(img_path))
        assert "📸 图片信息:" in result
        assert "格式: JPEG" in result
        assert "尺寸: 100 × 100" in result

    @pytest.mark.asyncio
    async def test_process_image_invalid_format(self, file_handler, temp_dir):
        """A file with an unknown extension is rejected as an unsupported image."""
        fake_img = Path(temp_dir) / "fake.xyz"
        fake_img.write_text("not an image")

        result = await file_handler.process_image(str(fake_img))
        assert "❌ 不支持的图片格式" in result

    @pytest.mark.asyncio
    async def test_process_image_file_not_exists(self, file_handler, temp_dir):
        """A missing ``.jpg`` path is reported as invalid or too large."""
        non_existent = Path(temp_dir) / "not_exists.jpg"

        result = await file_handler.process_image(str(non_existent))
        assert "❌ 图片文件无效或过大" in result

    @pytest.mark.asyncio
    async def test_process_document_text(self, file_handler, temp_dir):
        """A ``.txt`` document is reported with name, format and a preview."""
        doc_path = Path(temp_dir) / "document.txt"
        # Explicit UTF-8: without it the bytes on disk depend on the locale
        # and the write can even fail for non-ASCII text on some platforms.
        doc_path.write_text("文档内容测试", encoding='utf-8')

        result = await file_handler.process_document(str(doc_path))
        assert "📄 文档信息:" in result
        assert "文件名: document.txt" in result
        assert "格式: .TXT" in result
        assert "📝 内容预览:" in result

    @pytest.mark.asyncio
    async def test_process_document_markdown(self, file_handler, temp_dir):
        """A ``.md`` document is reported and its heading appears in the output."""
        md_path = Path(temp_dir) / "readme.md"
        md_content = "# 标题\n\n这是一个Markdown文档"
        # Explicit UTF-8 so the heading assertion below is locale-independent.
        md_path.write_text(md_content, encoding='utf-8')

        result = await file_handler.process_document(str(md_path))
        assert "📄 文档信息:" in result
        assert "文件名: readme.md" in result
        assert "格式: .MD" in result
        assert "# 标题" in result

    @pytest.mark.asyncio
    async def test_process_document_unsupported(self, file_handler, temp_dir):
        """A document with an unknown extension is rejected as unsupported."""
        unsupported = Path(temp_dir) / "test.xyz"
        unsupported.write_text("unsupported format")

        result = await file_handler.process_document(str(unsupported))
        assert "❌ 不支持的文档格式" in result

    @pytest.mark.asyncio
    async def test_generate_image(self, file_handler):
        """``generate_image`` returns non-empty bytes decoding to a 512x512 PNG."""
        prompt = "生成一张蓝色背景的图片"

        result = await file_handler.generate_image(prompt)
        assert isinstance(result, bytes)
        assert len(result) > 0

        # Decode the bytes again to confirm they form a valid PNG image.
        from PIL import Image
        import io

        img = Image.open(io.BytesIO(result))
        assert img.format == 'PNG'
        assert img.size == (512, 512)

    def test_make_safe_filename(self, file_handler):
        """Safe names pass through, bad characters are removed, long names are cut."""
        # An already-safe name is returned unchanged.
        safe1 = file_handler._make_safe_filename("normal_file.txt")
        assert safe1 == "normal_file.txt"

        # Characters that are illegal in file names must not survive.
        unsafe = "file<>:\"/\\|?*.txt"
        safe2 = file_handler._make_safe_filename(unsafe)
        assert "<" not in safe2
        assert ">" not in safe2
        assert ":" not in safe2

        # Over-long names are limited to 100 characters, keeping the extension.
        long_name = "a" * 150 + ".txt"
        safe3 = file_handler._make_safe_filename(long_name)
        assert len(safe3) <= 100
        assert safe3.endswith(".txt")

    @pytest.mark.asyncio
    async def test_save_telegram_file(self, file_handler):
        """Saving downloads exactly once and returns a path inside the temp dir."""
        # Stand-in for the Telegram file object; only the download call is used.
        mock_telegram_file = Mock()
        mock_telegram_file.download_to_drive = AsyncMock()

        filename = "test_file.jpg"
        result_path = await file_handler.save_telegram_file(mock_telegram_file, filename)

        mock_telegram_file.download_to_drive.assert_called_once()
        assert result_path.endswith("test_file.jpg")
        assert str(file_handler.temp_dir) in result_path

    @pytest.mark.asyncio
    async def test_save_telegram_file_error(self, file_handler):
        """A failing download propagates its exception to the caller."""
        mock_telegram_file = Mock()
        mock_telegram_file.download_to_drive = AsyncMock(
            side_effect=Exception("Download failed")
        )

        with pytest.raises(Exception, match="Download failed"):
            await file_handler.save_telegram_file(mock_telegram_file, "test.jpg")

    @pytest.mark.asyncio
    async def test_cleanup_temp_files(self, file_handler, temp_dir):
        """Cleanup removes files older than the age limit and keeps newer ones."""
        import time

        old_file = Path(temp_dir) / "old_file.txt"
        new_file = Path(temp_dir) / "new_file.txt"

        old_file.write_text("old content")
        new_file.write_text("new content")

        # Back-date the old file's access/modification times by 25 hours.
        old_time = time.time() - (25 * 3600)
        os.utime(old_file, (old_time, old_time))

        # Sanity check: both files exist before the cleanup runs.
        assert old_file.exists()
        assert new_file.exists()

        await file_handler.cleanup_temp_files(max_age_hours=24)

        assert not old_file.exists()
        assert new_file.exists()

    @pytest.mark.asyncio
    async def test_cleanup_temp_files_error(self, file_handler, temp_dir):
        """Cleanup does not raise when deleting a file fails."""
        test_file = Path(temp_dir) / "test.txt"
        test_file.write_text("content")

        # Make every Path.unlink call fail to simulate an undeletable file.
        with patch.object(Path, 'unlink', side_effect=PermissionError("Permission denied")):
            # Expected to swallow the error rather than propagate it.
            await file_handler.cleanup_temp_files(max_age_hours=0)

    @pytest.mark.asyncio
    async def test_multiple_file_types_in_temp_dir(self, file_handler, temp_dir):
        """Text, image and PDF files in one directory are each handled correctly."""
        txt_file = Path(temp_dir) / "document.txt"
        jpg_file = Path(temp_dir) / "image.jpg"
        pdf_file = Path(temp_dir) / "document.pdf"

        txt_file.write_text("Text content")

        # A real (tiny) JPEG so the image path is genuinely decodable.
        from PIL import Image

        img = Image.new('RGB', (50, 50), color='blue')
        img.save(jpg_file)

        # Not a real PDF: only the header bytes and the extension look like one.
        pdf_file.write_bytes(b'%PDF fake pdf content')

        txt_result = await file_handler.process_document(str(txt_file))
        img_result = await file_handler.process_image(str(jpg_file))
        pdf_result = await file_handler.process_document(str(pdf_file))

        assert "📝 内容预览:" in txt_result
        assert "📸 图片信息:" in img_result
        assert "📄 PDF文档已接收" in pdf_result

    @pytest.mark.asyncio
    async def test_process_image_corrupted_file(self, file_handler, temp_dir):
        """A ``.jpg`` whose bytes are not an image yields a processing error."""
        corrupted_img = Path(temp_dir) / "corrupted.jpg"
        corrupted_img.write_bytes(b"not a real image file")

        result = await file_handler.process_image(str(corrupted_img))
        assert "❌ 图片处理失败:" in result

    @pytest.mark.asyncio
    async def test_process_document_pdf_word(self, file_handler, temp_dir):
        """PDF and DOCX documents are reported with their upper-cased format."""
        # PDF
        pdf_file = Path(temp_dir) / "test.pdf"
        pdf_file.write_bytes(b'%PDF-1.4 fake pdf')

        pdf_result = await file_handler.process_document(str(pdf_file))
        assert "📄 文档信息:" in pdf_result
        assert "格式: .PDF" in pdf_result

        # Word document
        docx_file = Path(temp_dir) / "test.docx"
        docx_file.write_bytes(b'fake word document')

        word_result = await file_handler.process_document(str(docx_file))
        assert "📄 文档信息:" in word_result
        assert "格式: .DOCX" in word_result

    @pytest.mark.asyncio
    async def test_generate_image_with_pil_features(self, file_handler):
        """With PIL drawing mocked, the text is drawn exactly once."""
        with patch('PIL.ImageDraw.Draw') as mock_draw_class, \
                patch('PIL.ImageFont.load_default') as mock_font:
            mock_draw = Mock()
            mock_draw_class.return_value = mock_draw
            mock_font.return_value = Mock()

            result = await file_handler.generate_image("测试图片")
            assert isinstance(result, bytes)
            mock_draw.text.assert_called_once()

    @pytest.mark.asyncio
    async def test_generate_image_font_loading_error(self, file_handler):
        """``generate_image`` still returns bytes when the default font fails."""
        with patch('PIL.ImageDraw.Draw') as mock_draw_class, \
                patch('PIL.ImageFont.load_default', side_effect=Exception("Font error")):
            mock_draw = Mock()
            mock_draw_class.return_value = mock_draw

            result = await file_handler.generate_image("测试")
            # Expected to still produce an image, just without the text.
            assert isinstance(result, bytes)

    @pytest.mark.asyncio
    async def test_generate_image_pil_import_error(self, file_handler):
        """``generate_image`` returns bytes when ``ImageDraw.Draw`` raises ImportError."""
        with patch('PIL.ImageDraw.Draw', side_effect=ImportError("PIL unavailable")):
            result = await file_handler.generate_image("测试")
            assert isinstance(result, bytes)

    def test_supported_format_validation(self, file_handler):
        """The handler advertises the expected image and document extensions."""
        # Image formats
        assert '.jpg' in file_handler.supported_image_formats
        assert '.jpeg' in file_handler.supported_image_formats
        assert '.png' in file_handler.supported_image_formats
        assert '.gif' in file_handler.supported_image_formats
        assert '.webp' in file_handler.supported_image_formats
        assert '.bmp' in file_handler.supported_image_formats

        # Document formats
        assert '.pdf' in file_handler.supported_document_formats
        assert '.doc' in file_handler.supported_document_formats
        assert '.docx' in file_handler.supported_document_formats
        assert '.txt' in file_handler.supported_document_formats
        assert '.md' in file_handler.supported_document_formats
        assert '.rtf' in file_handler.supported_document_formats

    @pytest.mark.asyncio
    async def test_save_telegram_file_with_unsafe_filename(self, file_handler):
        """Unsafe characters in the requested name never reach the saved path."""
        mock_telegram_file = Mock()
        mock_telegram_file.download_to_drive = AsyncMock()

        unsafe_filename = "test<>:\"/\\|?*file.jpg"
        result_path = await file_handler.save_telegram_file(mock_telegram_file, unsafe_filename)

        # Check only the part below the temp directory: the directory prefix
        # itself may legitimately contain ':' (a Windows drive letter), which
        # would otherwise fail the test regardless of the sanitisation.
        saved_part = result_path.replace(str(file_handler.temp_dir), "", 1)
        assert "<" not in saved_part
        assert ">" not in saved_part
        assert ":" not in saved_part
        assert "?" not in saved_part
        assert "*" not in saved_part

    @pytest.mark.asyncio
    async def test_validate_file_permission_error(self, file_handler, temp_dir):
        """Validation returns ``False`` when reading the file size is denied."""
        test_file = Path(temp_dir) / "test.txt"
        test_file.write_text("content")

        # Simulate a permission error through os.path.exists / os.path.getsize.
        # NOTE(review): this only exercises the error path if the handler goes
        # through os.path rather than pathlib -- confirm against FileHandler.
        with patch('os.path.exists', return_value=True), \
                patch('os.path.getsize', side_effect=PermissionError("Permission denied")):
            result = await file_handler._validate_file(str(test_file))
            assert result is False

    @pytest.mark.asyncio
    async def test_process_text_file_read_exception(self, file_handler, temp_dir):
        """A failure while opening the file is reported in the returned message."""
        text_file = Path(temp_dir) / "test.txt"
        text_file.write_text("content")

        # Make aiofiles.open raise so the handler's error branch is taken.
        with patch('aiofiles.open', side_effect=Exception("Read error")):
            result = await file_handler._process_text_file(str(text_file))
            assert "❌ 文本文件处理失败: Read error" in result

    @pytest.mark.asyncio
    async def test_cleanup_temp_files_no_files_to_clean(self, file_handler, temp_dir):
        """Cleanup on an empty directory completes without raising."""
        await file_handler.cleanup_temp_files(max_age_hours=24)

    @pytest.mark.asyncio
    async def test_cleanup_temp_files_iterdir_error(self, file_handler, temp_dir):
        """Cleanup does not raise when listing the directory fails."""
        with patch.object(Path, 'iterdir', side_effect=PermissionError("Access denied")):
            # Expected to catch the error instead of propagating it.
            await file_handler.cleanup_temp_files(max_age_hours=24)