| """ |
| Telegram Bot单元测试 - 流式消息发送器 |
| """ |
| |
| import pytest |
| import asyncio |
| from unittest.mock import Mock, AsyncMock |
| |
| import sys |
| from pathlib import Path |
| sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent / "src")) |
| |
| from claude_agent.telegram.stream_sender import StreamMessageSender |
| |
| |
| class TestStreamMessageSender: |
| """流式消息发送器测试""" |
| |
| @pytest.fixture |
| def mock_telegram_client(self): |
| """创建Mock Telegram客户端""" |
| client = Mock() |
| client.send_message = AsyncMock() |
| client.edit_message_text = AsyncMock() |
| return client |
| |
| @pytest.fixture |
| def stream_sender(self, mock_telegram_client): |
| """创建流式发送器实例""" |
| return StreamMessageSender(mock_telegram_client, update_interval=0.1) |
| |
| def test_init(self, mock_telegram_client): |
| """测试初始化""" |
| sender = StreamMessageSender(mock_telegram_client, update_interval=2.0) |
| assert sender.telegram_client == mock_telegram_client |
| assert sender.update_interval == 2.0 |
| |
| @pytest.mark.asyncio |
| async def test_send_streaming_message_simple(self, stream_sender, mock_telegram_client): |
| """测试简单流式消息发送""" |
| # Mock初始消息 |
| initial_message = Mock() |
| initial_message.message_id = 123 |
| mock_telegram_client.send_message.return_value = initial_message |
| |
| # 创建简单函数返回字符串 |
| def simple_function(): |
| return "Hello World" |
| |
| # 发送流式消息 |
| result = await stream_sender.send_streaming_message( |
| chat_id=456, |
| message_generator=simple_function, |
| initial_text="Loading...", |
| reply_to_message_id=789 |
| ) |
| |
| # 验证调用 |
| mock_telegram_client.send_message.assert_called_once_with( |
| chat_id=456, |
| text="Loading...", |
| parse_mode='Markdown', |
| reply_to_message_id=789 |
| ) |
| |
| # 验证至少调用了一次编辑消息 |
| assert mock_telegram_client.edit_message_text.call_count >= 1 |
| |
| # 验证最终消息包含完整内容 |
| final_call = mock_telegram_client.edit_message_text.call_args_list[-1] |
| final_text = final_call[1]['text'] |
| assert "Hello World" in final_text |
| |
| assert result == initial_message |
| |
| @pytest.mark.asyncio |
| async def test_send_streaming_message_sync_generator(self, stream_sender, mock_telegram_client): |
| """测试同步生成器""" |
| initial_message = Mock() |
| initial_message.message_id = 123 |
| mock_telegram_client.send_message.return_value = initial_message |
| |
| # 创建同步生成器函数 |
| def sync_generator(): |
| for word in ["Test", " ", "Message"]: |
| yield word |
| |
| # 发送流式消息 |
| result = await stream_sender.send_streaming_message( |
| chat_id=456, |
| message_generator=sync_generator, |
| initial_text="Processing..." |
| ) |
| |
| # 验证调用 |
| mock_telegram_client.send_message.assert_called_once_with( |
| chat_id=456, |
| text="Processing...", |
| parse_mode='Markdown', |
| reply_to_message_id=None |
| ) |
| |
| # 验证编辑消息被调用 |
| assert mock_telegram_client.edit_message_text.call_count >= 1 |
| |
| assert result == initial_message |
| |
| @pytest.mark.asyncio |
| async def test_send_streaming_message_plain_function(self, stream_sender, mock_telegram_client): |
| """测试普通函数返回值""" |
| initial_message = Mock() |
| initial_message.message_id = 123 |
| mock_telegram_client.send_message.return_value = initial_message |
| |
| # 创建普通函数 |
| def plain_function(): |
| return "Simple response" |
| |
| # 发送流式消息 |
| result = await stream_sender.send_streaming_message( |
| chat_id=456, |
| message_generator=plain_function |
| ) |
| |
| # 验证最终消息 |
| final_call = mock_telegram_client.edit_message_text.call_args_list[-1] |
| final_text = final_call[1]['text'] |
| assert "Simple response" in final_text |
| |
| @pytest.mark.asyncio |
| async def test_send_streaming_message_error_handling(self, stream_sender, mock_telegram_client): |
| """测试错误处理""" |
| initial_message = Mock() |
| initial_message.message_id = 123 |
| mock_telegram_client.send_message.return_value = initial_message |
| |
| # 创建会抛出异常的函数 |
| def error_function(): |
| raise ValueError("Test error") |
| |
| # 发送流式消息 |
| result = await stream_sender.send_streaming_message( |
| chat_id=456, |
| message_generator=error_function |
| ) |
| |
| # 验证错误消息被发送 |
| error_call = mock_telegram_client.edit_message_text.call_args_list[-1] |
| error_text = error_call[1]['text'] |
| assert "❌ 消息处理出错" in error_text # 修正错误消息文本 |
| assert "Test error" in error_text |
| |
| @pytest.mark.asyncio |
| async def test_send_streaming_message_edit_error(self, stream_sender, mock_telegram_client): |
| """测试编辑消息失败的处理""" |
| initial_message = Mock() |
| initial_message.message_id = 123 |
| mock_telegram_client.send_message.return_value = initial_message |
| |
| # Mock编辑消息在最后一次调用时失败,但中间的调用成功 |
| call_count = 0 |
| def edit_side_effect(*args, **kwargs): |
| nonlocal call_count |
| call_count += 1 |
| if call_count >= 2: # 第二次及以后的调用失败 |
| raise Exception("Edit failed") |
| |
| mock_telegram_client.edit_message_text.side_effect = edit_side_effect |
| |
| def simple_function(): |
| return "Hello" |
| |
| # 应该不抛出异常,继续处理 |
| result = await stream_sender.send_streaming_message( |
| chat_id=456, |
| message_generator=simple_function |
| ) |
| |
| # 验证方法正常完成 |
| assert result == initial_message |
| |
| assert result == initial_message |
| |
| def test_format_message_text_simple(self, stream_sender): |
| """测试简单消息格式化""" |
| result = stream_sender._format_message_text("Hello World", final=True) |
| assert result == "Hello World" |
| |
| def test_format_message_text_with_status(self, stream_sender): |
| """测试带状态的消息格式化""" |
| result = stream_sender._format_message_text("Hello", final=False) |
| assert "Hello" in result |
| assert "⏳ 继续生成中..." in result |
| |
| def test_format_message_text_final(self, stream_sender): |
| """测试最终消息格式化""" |
| result = stream_sender._format_message_text("Final message", final=True) |
| assert result == "Final message" |
| |
| def test_format_message_text_empty(self, stream_sender): |
| """测试空消息格式化""" |
| result = stream_sender._format_message_text("") |
| assert "🤔 正在处理..." in result |
| |
| def test_format_message_text_long(self, stream_sender): |
| """测试长消息格式化""" |
| long_text = "x" * 5000 # 超过4000字符限制 |
| result = stream_sender._format_message_text(long_text) |
| assert len(result) <= 4050 # 包含状态指示器 |
| assert "..." in result |
| |
| def test_clean_markdown(self, stream_sender): |
| """测试Markdown清理功能""" |
| # 测试保守的Markdown清理策略:移除所有可能有问题的字符 |
| test_cases = [ |
| ("**bold** and *italic*", "bold and italic"), # 移除星号 |
| ("[link](url)", "linkurl"), # 移除方括号和圆括号 |
| ("`code`", "`code`"), # 保留行内代码 |
| ("# Header", "# Header"), # 保留井号 |
| ("> quote", "> quote"), # 保留引用 |
| ] |
| |
| for input_text, expected in test_cases: |
| result = stream_sender._clean_markdown(input_text) |
| assert result == expected |
| |
| # 测试代码块保持原样 |
| text1 = "```python\nprint('hello')\n```" |
| result1 = stream_sender._clean_markdown(text1) |
| assert "```python\nprint('hello')\n```" == result1 |
| |
| # 测试多余空行移除 |
| text3 = "Line 1\n\n\n\nLine 2" |
| result3 = stream_sender._clean_markdown(text3) |
| assert "Line 1\n\nLine 2" in result3 |
| |
| # 测试行内代码保持原样 |
| text4 = "`inline code`" |
| result4 = stream_sender._clean_markdown(text4) |
| assert "`inline code`" in result4 |
| |
| # 测试多余空行清理 |
| text5 = "Line1\n\n\n\nLine2" |
| result5 = stream_sender._clean_markdown(text5) |
| assert "Line1\n\nLine2" in result5 |
| |
| @pytest.mark.asyncio |
| async def test_send_chunked_message(self, stream_sender, mock_telegram_client): |
| """测试分块发送消息""" |
| # Mock消息对象 |
| mock_message1 = Mock() |
| mock_message2 = Mock() |
| mock_telegram_client.send_message.side_effect = [mock_message1, mock_message2] |
| |
| # 创建长文本(超过chunk_size) |
| long_text = "A" * 3000 + "B" * 3000 # 6000字符 |
| |
| # 发送分块消息 |
| messages = await stream_sender.send_chunked_message( |
| chat_id=456, |
| text=long_text, |
| chunk_size=4000, |
| reply_to_message_id=789 |
| ) |
| |
| # 验证发送了两次消息 |
| assert len(messages) == 2 |
| assert mock_telegram_client.send_message.call_count == 2 |
| |
| # 验证第一条消息有回复ID,第二条没有 |
| first_call = mock_telegram_client.send_message.call_args_list[0] |
| second_call = mock_telegram_client.send_message.call_args_list[1] |
| |
| assert first_call[1]['reply_to_message_id'] == 789 |
| assert second_call[1]['reply_to_message_id'] is None |
| |
| # 验证消息内容 |
| first_text = first_call[1]['text'] |
| second_text = second_call[1]['text'] |
| |
| # 验证消息内容包含预期的部分 |
| assert "A" in first_text # 第一部分包含A |
| assert "B" in second_text # 第二部分包含B |
| assert len(first_text) <= 4000 # 验证分块大小 |
| assert len(second_text) <= 4000 |
| |
| @pytest.mark.asyncio |
| async def test_send_chunked_message_short_text(self, stream_sender, mock_telegram_client): |
| """测试发送短文本(不需要分块)""" |
| mock_message = Mock() |
| mock_telegram_client.send_message.return_value = mock_message |
| |
| short_text = "Short message" |
| |
| messages = await stream_sender.send_chunked_message( |
| chat_id=456, |
| text=short_text, |
| chunk_size=4000 |
| ) |
| |
| # 验证只发送了一次消息 |
| assert len(messages) == 1 |
| assert mock_telegram_client.send_message.call_count == 1 |
| |
| @pytest.mark.asyncio |
| async def test_update_interval_functionality(self, mock_telegram_client): |
| """测试更新间隔功能""" |
| # 使用较短的更新间隔进行测试 |
| sender = StreamMessageSender(mock_telegram_client, update_interval=0.01) |
| |
| initial_message = Mock() |
| initial_message.message_id = 123 |
| mock_telegram_client.send_message.return_value = initial_message |
| |
| def simple_function(): |
| return "Test message" |
| |
| await sender.send_streaming_message( |
| chat_id=456, |
| message_generator=simple_function |
| ) |
| |
| # 验证初始消息和编辑消息都被调用 |
| mock_telegram_client.send_message.assert_called_once() |
| assert mock_telegram_client.edit_message_text.call_count >= 1 |
| |
| @pytest.mark.asyncio |
| async def test_empty_generator(self, stream_sender, mock_telegram_client): |
| """测试空生成器""" |
| initial_message = Mock() |
| initial_message.message_id = 123 |
| mock_telegram_client.send_message.return_value = initial_message |
| |
| async def empty_generator(): |
| return |
| yield # 这行永远不会执行 |
| |
| result = await stream_sender.send_streaming_message( |
| chat_id=456, |
| message_generator=empty_generator |
| ) |
| |
| # 验证初始消息被发送 |
| mock_telegram_client.send_message.assert_called_once() |
| |
| # 由于没有内容生成,最终消息可能不会被编辑或编辑为空 |
| assert result == initial_message |
| |
| @pytest.mark.asyncio |
| async def test_direct_value_handling(self, stream_sender, mock_telegram_client): |
| """测试直接传入非可调用值的处理(覆盖107-109行)""" |
| initial_message = Mock() |
| initial_message.message_id = 123 |
| mock_telegram_client.send_message.return_value = initial_message |
| |
| # 直接传入字符串值(非可调用) |
| direct_value = "Direct string value" |
| |
| result = await stream_sender.send_streaming_message( |
| chat_id=456, |
| message_generator=direct_value |
| ) |
| |
| # 验证调用了_handle_sync_result方法来处理直接值 |
| final_call = mock_telegram_client.edit_message_text.call_args_list[-1] |
| final_text = final_call[1]['text'] |
| assert "Direct string value" in final_text |
| |
| assert result == initial_message |
| |
| @pytest.mark.asyncio |
| async def test_async_generator_instance_with_update_intervals(self, mock_telegram_client): |
| """测试异步生成器实例的时间间隔更新(覆盖145-169行)""" |
| # 使用非常短的更新间隔以触发时间间隔更新逻辑 |
| sender = StreamMessageSender(mock_telegram_client, update_interval=0.01) |
| |
| initial_message = Mock() |
| initial_message.message_id = 123 |
| mock_telegram_client.send_message.return_value = initial_message |
| # Mock edit_message_text返回一个消息对象用于final_message |
| final_message = Mock() |
| final_message.message_id = 124 |
| mock_telegram_client.edit_message_text.return_value = final_message |
| |
| async def slow_async_generator(): |
| yield "Part1" |
| await asyncio.sleep(0.02) # 确保超过更新间隔 |
| yield " Part2" |
| await asyncio.sleep(0.02) # 再次超过更新间隔 |
| yield " Part3" |
| |
| # 传入函数,而不是生成器实例,这样会调用_handle_async_generator_instance |
| result = await sender.send_streaming_message( |
| chat_id=456, |
| message_generator=slow_async_generator |
| ) |
| |
| # 验证至少有编辑消息调用(可能是中间更新 + 最终更新) |
| assert mock_telegram_client.edit_message_text.call_count >= 1 |
| |
| # 验证最终消息包含所有部分 |
| final_call = mock_telegram_client.edit_message_text.call_args_list[-1] |
| final_text = final_call[1]['text'] |
| assert "Part1" in final_text |
| assert "Part2" in final_text |
| assert "Part3" in final_text |
| |
| # 结果应该是最后编辑返回的消息或初始消息 |
| assert result in [initial_message, final_message] |
| |
| @pytest.mark.asyncio |
| async def test_async_generator_instance_update_error(self, mock_telegram_client): |
| """测试异步生成器中间更新失败的处理(覆盖165-169行)""" |
| sender = StreamMessageSender(mock_telegram_client, update_interval=0.01) |
| |
| initial_message = Mock() |
| initial_message.message_id = 123 |
| mock_telegram_client.send_message.return_value = initial_message |
| |
| # Mock编辑消息在中间调用时失败,但最终调用成功 |
| call_count = 0 |
| def edit_side_effect(*args, **kwargs): |
| nonlocal call_count |
| call_count += 1 |
| if call_count == 1: # 第一次中间更新失败 |
| raise Exception("Intermediate update failed") |
| return initial_message # 最终更新成功 |
| |
| mock_telegram_client.edit_message_text.side_effect = edit_side_effect |
| |
| async def async_generator_with_delays(): |
| yield "Part1" |
| await asyncio.sleep(0.02) # 触发中间更新 |
| yield " Final" |
| |
| result = await sender.send_streaming_message( |
| chat_id=456, |
| message_generator=async_generator_with_delays() |
| ) |
| |
| # 验证至少有两次编辑尝试 |
| assert mock_telegram_client.edit_message_text.call_count >= 2 |
| assert result == initial_message |
| |
| @pytest.mark.asyncio |
| async def test_final_message_sending_error_handling(self, mock_telegram_client): |
| """测试最终消息发送失败和简化重试逻辑(覆盖175-200行)""" |
| sender = StreamMessageSender(mock_telegram_client, update_interval=0.01) |
| |
| initial_message = Mock() |
| initial_message.message_id = 123 |
| mock_telegram_client.send_message.return_value = initial_message |
| |
| # Mock编辑消息:让编辑调用失败但不影响整个流程(异常会被捕获) |
| call_count = 0 |
| def edit_side_effect(*args, **kwargs): |
| nonlocal call_count |
| call_count += 1 |
| if call_count <= 2: # 前两次调用失败 |
| raise Exception("Edit message failed") |
| # 后续调用成功,但这里我们不会到达,只是测试错误处理 |
| return initial_message |
| |
| mock_telegram_client.edit_message_text.side_effect = edit_side_effect |
| |
| async def simple_async_generator(): |
| yield "Test content" |
| |
| # 异步生成器实例需要用callable方式传入 |
| result = await sender.send_streaming_message( |
| chat_id=456, |
| message_generator=simple_async_generator |
| ) |
| |
| # 验证编辑消息被尝试调用(虽然可能失败) |
| assert mock_telegram_client.edit_message_text.call_count >= 1 |
| assert result == initial_message |
| |
| @pytest.mark.asyncio |
| async def test_final_message_empty_accumulated_text(self, mock_telegram_client): |
| """测试累积文本为空时跳过最终消息更新(覆盖202行)""" |
| sender = StreamMessageSender(mock_telegram_client, update_interval=0.01) |
| |
| initial_message = Mock() |
| initial_message.message_id = 123 |
| mock_telegram_client.send_message.return_value = initial_message |
| |
| async def empty_content_generator(): |
| # 生成器不产生任何内容 |
| return |
| yield "never reached" |
| |
| result = await sender.send_streaming_message( |
| chat_id=456, |
| message_generator=empty_content_generator |
| ) |
| |
| # 注意:即使生成器为空,仍然会触发一次初始处理,所以可能有1次编辑调用 |
| # 我们主要验证没有因空内容而崩溃,且逻辑正确处理了空的累积文本 |
| assert result == initial_message |
| |
| @pytest.mark.asyncio |
| async def test_sync_result_error_handling(self, stream_sender, mock_telegram_client): |
| """测试同步结果处理错误(覆盖220-222行)""" |
| # Mock编辑消息先成功一次,然后失败,这样可以触发错误处理中的第二次调用 |
| call_count = 0 |
| def edit_side_effect(*args, **kwargs): |
| nonlocal call_count |
| call_count += 1 |
| if call_count == 1: # 第一次正常调用失败 |
| raise Exception("Edit failed") |
| # 第二次错误消息调用成功 |
| return Mock() |
| |
| mock_telegram_client.edit_message_text.side_effect = edit_side_effect |
| |
| # 直接调用_handle_sync_result方法来触发错误处理 |
| # 这个方法内部有try-except,所以不会抛出异常 |
| await stream_sender._handle_sync_result(456, 123, "Test result") |
| |
| # 验证尝试了编辑消息(第一次正常尝试,失败后第二次是错误消息) |
| assert mock_telegram_client.edit_message_text.call_count == 2 |
| |
| @pytest.mark.asyncio |
| async def test_handle_async_generator_method(self, stream_sender, mock_telegram_client): |
| """测试_handle_async_generator方法(覆盖235-256行)""" |
| initial_message = Mock() |
| initial_message.message_id = 123 |
| |
| async def async_generator_func(): |
| yield "Async" |
| yield " Generator" |
| yield " Content" |
| |
| # 直接调用_handle_async_generator方法 |
| await stream_sender._handle_async_generator(456, 123, async_generator_func) |
| |
| # 验证编辑消息被调用 |
| assert mock_telegram_client.edit_message_text.call_count >= 1 |
| |
| @pytest.mark.asyncio |
| async def test_handle_async_generator_update_error(self, mock_telegram_client): |
| """测试_handle_async_generator中间更新错误(覆盖251-252行)""" |
| sender = StreamMessageSender(mock_telegram_client, update_interval=0.01) |
| |
| # Mock编辑消息失败(中间更新失败,但最终消息成功) |
| call_count = 0 |
| def edit_side_effect(*args, **kwargs): |
| nonlocal call_count |
| call_count += 1 |
| if call_count <= 1: # 第一次中间更新失败 |
| raise Exception("Update failed") |
| # 最终更新成功 |
| return Mock() |
| |
| mock_telegram_client.edit_message_text.side_effect = edit_side_effect |
| |
| async def async_generator_func(): |
| yield "Part1" |
| await asyncio.sleep(0.02) # 触发更新间隔 |
| yield " Part2" |
| |
| # 直接调用方法,异常被内部处理 |
| await sender._handle_async_generator(456, 123, async_generator_func) |
| |
| # 验证尝试了编辑消息(至少中间更新一次,最终更新一次) |
| assert mock_telegram_client.edit_message_text.call_count >= 2 |
| |
| @pytest.mark.asyncio |
| async def test_handle_sync_generator_method(self, stream_sender, mock_telegram_client): |
| """测试_handle_sync_generator方法(覆盖269-307行)""" |
| def sync_generator_func(): |
| yield "Sync" |
| yield " Generator" |
| return # 显式返回 |
| |
| # 直接调用_handle_sync_generator方法 |
| await stream_sender._handle_sync_generator(456, 123, sync_generator_func) |
| |
| # 验证编辑消息被调用 |
| assert mock_telegram_client.edit_message_text.call_count >= 1 |
| |
| @pytest.mark.asyncio |
| async def test_handle_sync_generator_non_callable(self, stream_sender, mock_telegram_client): |
| """测试_handle_sync_generator处理非可调用对象(覆盖298-299行)""" |
| # 传入非可调用对象 |
| non_callable_value = "Direct value" |
| |
| await stream_sender._handle_sync_generator(456, 123, non_callable_value) |
| |
| # 验证编辑消息被调用 |
| assert mock_telegram_client.edit_message_text.call_count >= 1 |
| |
| # 验证消息内容 |
| final_call = mock_telegram_client.edit_message_text.call_args_list[-1] |
| final_text = final_call[1]['text'] |
| assert "Direct value" in final_text |
| |
| @pytest.mark.asyncio |
| async def test_handle_sync_generator_non_iterable_result(self, stream_sender, mock_telegram_client): |
| """测试_handle_sync_generator处理返回非可迭代结果的函数(覆盖295-297行)""" |
| def function_returning_string(): |
| return "Simple string result" |
| |
| await stream_sender._handle_sync_generator(456, 123, function_returning_string) |
| |
| # 验证编辑消息被调用 |
| assert mock_telegram_client.edit_message_text.call_count >= 1 |
| |
| # 验证消息内容 |
| final_call = mock_telegram_client.edit_message_text.call_args_list[-1] |
| final_text = final_call[1]['text'] |
| assert "Simple string result" in final_text |
| |
| @pytest.mark.asyncio |
| async def test_handle_sync_generator_processing_error(self, stream_sender, mock_telegram_client): |
| """测试_handle_sync_generator处理异常(覆盖301-303行)""" |
| def error_function(): |
| raise ValueError("Processing error") |
| |
| await stream_sender._handle_sync_generator(456, 123, error_function) |
| |
| # 验证编辑消息被调用,包含错误信息 |
| assert mock_telegram_client.edit_message_text.call_count >= 1 |
| |
| final_call = mock_telegram_client.edit_message_text.call_args_list[-1] |
| final_text = final_call[1]['text'] |
| assert "❌ 处理错误" in final_text |
| assert "Processing error" in final_text |
| |
| @pytest.mark.asyncio |
| async def test_handle_sync_generator_with_time_intervals(self, mock_telegram_client): |
| """测试_handle_sync_generator的时间间隔更新(覆盖282-294行)""" |
| sender = StreamMessageSender(mock_telegram_client, update_interval=0.01) |
| |
| def slow_sync_generator(): |
| yield "Part1" |
| import time |
| time.sleep(0.02) # 超过更新间隔 |
| yield " Part2" |
| time.sleep(0.02) # 再次超过更新间隔 |
| yield " Part3" |
| |
| await sender._handle_sync_generator(456, 123, slow_sync_generator) |
| |
| # 验证多次编辑消息调用 |
| assert mock_telegram_client.edit_message_text.call_count >= 2 |
| |
| @pytest.mark.asyncio |
| async def test_handle_sync_generator_update_error(self, mock_telegram_client): |
| """测试_handle_sync_generator中间更新失败(覆盖290-291行)""" |
| sender = StreamMessageSender(mock_telegram_client, update_interval=0.01) |
| |
| # Mock编辑消息失败(只在中间更新时失败) |
| call_count = 0 |
| def edit_side_effect(*args, **kwargs): |
| nonlocal call_count |
| call_count += 1 |
| if call_count <= 2: # 中间更新失败 |
| raise Exception("Update failed") |
| # 最终更新成功 |
| return Mock() |
| |
| mock_telegram_client.edit_message_text.side_effect = edit_side_effect |
| |
| def sync_generator(): |
| yield "Part1" |
| import time |
| time.sleep(0.02) # 触发更新间隔 |
| yield " Part2" |
| |
| # 异常会被内部处理,不会向外抛出 |
| await sender._handle_sync_generator(456, 123, sync_generator) |
| |
| # 验证尝试了编辑消息(中间更新 + 最终更新) |
| assert mock_telegram_client.edit_message_text.call_count >= 2 |
| |
| @pytest.mark.asyncio |
| async def test_send_typing_action(self, stream_sender): |
| """测试发送typing状态功能(覆盖406-411行)""" |
| # 这个方法目前是占位符实现(pass),但我们需要覆盖异常处理部分 |
| |
| # 直接调用方法,应该不会抛出异常 |
| await stream_sender.send_typing_action(456) |
| |
| # 方法当前只包含pass和异常处理,所以只要不抛出异常就表示成功覆盖 |