blob: b6c1ac3e8eacc65c0476ca66ec931e968c4ab753 [file] [log] [blame] [raw]
"""
Telegram Bot单元测试 - 流式消息发送器
"""
import pytest
import asyncio
from unittest.mock import Mock, AsyncMock
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent / "src"))
from claude_agent.telegram.stream_sender import StreamMessageSender
class TestStreamMessageSender:
"""流式消息发送器测试"""
@pytest.fixture
def mock_telegram_client(self):
"""创建Mock Telegram客户端"""
client = Mock()
client.send_message = AsyncMock()
client.edit_message_text = AsyncMock()
return client
@pytest.fixture
def stream_sender(self, mock_telegram_client):
"""创建流式发送器实例"""
return StreamMessageSender(mock_telegram_client, update_interval=0.1)
def test_init(self, mock_telegram_client):
"""测试初始化"""
sender = StreamMessageSender(mock_telegram_client, update_interval=2.0)
assert sender.telegram_client == mock_telegram_client
assert sender.update_interval == 2.0
@pytest.mark.asyncio
async def test_send_streaming_message_simple(self, stream_sender, mock_telegram_client):
"""测试简单流式消息发送"""
# Mock初始消息
initial_message = Mock()
initial_message.message_id = 123
mock_telegram_client.send_message.return_value = initial_message
# 创建简单函数返回字符串
def simple_function():
return "Hello World"
# 发送流式消息
result = await stream_sender.send_streaming_message(
chat_id=456,
message_generator=simple_function,
initial_text="Loading...",
reply_to_message_id=789
)
# 验证调用
mock_telegram_client.send_message.assert_called_once_with(
chat_id=456,
text="Loading...",
parse_mode='Markdown',
reply_to_message_id=789
)
# 验证至少调用了一次编辑消息
assert mock_telegram_client.edit_message_text.call_count >= 1
# 验证最终消息包含完整内容
final_call = mock_telegram_client.edit_message_text.call_args_list[-1]
final_text = final_call[1]['text']
assert "Hello World" in final_text
assert result == initial_message
@pytest.mark.asyncio
async def test_send_streaming_message_sync_generator(self, stream_sender, mock_telegram_client):
"""测试同步生成器"""
initial_message = Mock()
initial_message.message_id = 123
mock_telegram_client.send_message.return_value = initial_message
# 创建同步生成器函数
def sync_generator():
for word in ["Test", " ", "Message"]:
yield word
# 发送流式消息
result = await stream_sender.send_streaming_message(
chat_id=456,
message_generator=sync_generator,
initial_text="Processing..."
)
# 验证调用
mock_telegram_client.send_message.assert_called_once_with(
chat_id=456,
text="Processing...",
parse_mode='Markdown',
reply_to_message_id=None
)
# 验证编辑消息被调用
assert mock_telegram_client.edit_message_text.call_count >= 1
assert result == initial_message
@pytest.mark.asyncio
async def test_send_streaming_message_plain_function(self, stream_sender, mock_telegram_client):
"""测试普通函数返回值"""
initial_message = Mock()
initial_message.message_id = 123
mock_telegram_client.send_message.return_value = initial_message
# 创建普通函数
def plain_function():
return "Simple response"
# 发送流式消息
result = await stream_sender.send_streaming_message(
chat_id=456,
message_generator=plain_function
)
# 验证最终消息
final_call = mock_telegram_client.edit_message_text.call_args_list[-1]
final_text = final_call[1]['text']
assert "Simple response" in final_text
@pytest.mark.asyncio
async def test_send_streaming_message_error_handling(self, stream_sender, mock_telegram_client):
"""测试错误处理"""
initial_message = Mock()
initial_message.message_id = 123
mock_telegram_client.send_message.return_value = initial_message
# 创建会抛出异常的函数
def error_function():
raise ValueError("Test error")
# 发送流式消息
result = await stream_sender.send_streaming_message(
chat_id=456,
message_generator=error_function
)
# 验证错误消息被发送
error_call = mock_telegram_client.edit_message_text.call_args_list[-1]
error_text = error_call[1]['text']
assert "❌ 消息处理出错" in error_text # 修正错误消息文本
assert "Test error" in error_text
@pytest.mark.asyncio
async def test_send_streaming_message_edit_error(self, stream_sender, mock_telegram_client):
"""测试编辑消息失败的处理"""
initial_message = Mock()
initial_message.message_id = 123
mock_telegram_client.send_message.return_value = initial_message
# Mock编辑消息在最后一次调用时失败,但中间的调用成功
call_count = 0
def edit_side_effect(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count >= 2: # 第二次及以后的调用失败
raise Exception("Edit failed")
mock_telegram_client.edit_message_text.side_effect = edit_side_effect
def simple_function():
return "Hello"
# 应该不抛出异常,继续处理
result = await stream_sender.send_streaming_message(
chat_id=456,
message_generator=simple_function
)
# 验证方法正常完成
assert result == initial_message
assert result == initial_message
def test_format_message_text_simple(self, stream_sender):
"""测试简单消息格式化"""
result = stream_sender._format_message_text("Hello World", final=True)
assert result == "Hello World"
def test_format_message_text_with_status(self, stream_sender):
"""测试带状态的消息格式化"""
result = stream_sender._format_message_text("Hello", final=False)
assert "Hello" in result
assert "⏳ 继续生成中..." in result
def test_format_message_text_final(self, stream_sender):
"""测试最终消息格式化"""
result = stream_sender._format_message_text("Final message", final=True)
assert result == "Final message"
def test_format_message_text_empty(self, stream_sender):
"""测试空消息格式化"""
result = stream_sender._format_message_text("")
assert "🤔 正在处理..." in result
def test_format_message_text_long(self, stream_sender):
"""测试长消息格式化"""
long_text = "x" * 5000 # 超过4000字符限制
result = stream_sender._format_message_text(long_text)
assert len(result) <= 4050 # 包含状态指示器
assert "..." in result
def test_clean_markdown(self, stream_sender):
"""测试Markdown清理功能"""
# 测试保守的Markdown清理策略:移除所有可能有问题的字符
test_cases = [
("**bold** and *italic*", "bold and italic"), # 移除星号
("[link](url)", "linkurl"), # 移除方括号和圆括号
("`code`", "`code`"), # 保留行内代码
("# Header", "# Header"), # 保留井号
("> quote", "> quote"), # 保留引用
]
for input_text, expected in test_cases:
result = stream_sender._clean_markdown(input_text)
assert result == expected
# 测试代码块保持原样
text1 = "```python\nprint('hello')\n```"
result1 = stream_sender._clean_markdown(text1)
assert "```python\nprint('hello')\n```" == result1
# 测试多余空行移除
text3 = "Line 1\n\n\n\nLine 2"
result3 = stream_sender._clean_markdown(text3)
assert "Line 1\n\nLine 2" in result3
# 测试行内代码保持原样
text4 = "`inline code`"
result4 = stream_sender._clean_markdown(text4)
assert "`inline code`" in result4
# 测试多余空行清理
text5 = "Line1\n\n\n\nLine2"
result5 = stream_sender._clean_markdown(text5)
assert "Line1\n\nLine2" in result5
@pytest.mark.asyncio
async def test_send_chunked_message(self, stream_sender, mock_telegram_client):
"""测试分块发送消息"""
# Mock消息对象
mock_message1 = Mock()
mock_message2 = Mock()
mock_telegram_client.send_message.side_effect = [mock_message1, mock_message2]
# 创建长文本(超过chunk_size)
long_text = "A" * 3000 + "B" * 3000 # 6000字符
# 发送分块消息
messages = await stream_sender.send_chunked_message(
chat_id=456,
text=long_text,
chunk_size=4000,
reply_to_message_id=789
)
# 验证发送了两次消息
assert len(messages) == 2
assert mock_telegram_client.send_message.call_count == 2
# 验证第一条消息有回复ID,第二条没有
first_call = mock_telegram_client.send_message.call_args_list[0]
second_call = mock_telegram_client.send_message.call_args_list[1]
assert first_call[1]['reply_to_message_id'] == 789
assert second_call[1]['reply_to_message_id'] is None
# 验证消息内容
first_text = first_call[1]['text']
second_text = second_call[1]['text']
# 验证消息内容包含预期的部分
assert "A" in first_text # 第一部分包含A
assert "B" in second_text # 第二部分包含B
assert len(first_text) <= 4000 # 验证分块大小
assert len(second_text) <= 4000
@pytest.mark.asyncio
async def test_send_chunked_message_short_text(self, stream_sender, mock_telegram_client):
"""测试发送短文本(不需要分块)"""
mock_message = Mock()
mock_telegram_client.send_message.return_value = mock_message
short_text = "Short message"
messages = await stream_sender.send_chunked_message(
chat_id=456,
text=short_text,
chunk_size=4000
)
# 验证只发送了一次消息
assert len(messages) == 1
assert mock_telegram_client.send_message.call_count == 1
@pytest.mark.asyncio
async def test_update_interval_functionality(self, mock_telegram_client):
"""测试更新间隔功能"""
# 使用较短的更新间隔进行测试
sender = StreamMessageSender(mock_telegram_client, update_interval=0.01)
initial_message = Mock()
initial_message.message_id = 123
mock_telegram_client.send_message.return_value = initial_message
def simple_function():
return "Test message"
await sender.send_streaming_message(
chat_id=456,
message_generator=simple_function
)
# 验证初始消息和编辑消息都被调用
mock_telegram_client.send_message.assert_called_once()
assert mock_telegram_client.edit_message_text.call_count >= 1
@pytest.mark.asyncio
async def test_empty_generator(self, stream_sender, mock_telegram_client):
"""测试空生成器"""
initial_message = Mock()
initial_message.message_id = 123
mock_telegram_client.send_message.return_value = initial_message
async def empty_generator():
return
yield # 这行永远不会执行
result = await stream_sender.send_streaming_message(
chat_id=456,
message_generator=empty_generator
)
# 验证初始消息被发送
mock_telegram_client.send_message.assert_called_once()
# 由于没有内容生成,最终消息可能不会被编辑或编辑为空
assert result == initial_message
@pytest.mark.asyncio
async def test_direct_value_handling(self, stream_sender, mock_telegram_client):
"""测试直接传入非可调用值的处理(覆盖107-109行)"""
initial_message = Mock()
initial_message.message_id = 123
mock_telegram_client.send_message.return_value = initial_message
# 直接传入字符串值(非可调用)
direct_value = "Direct string value"
result = await stream_sender.send_streaming_message(
chat_id=456,
message_generator=direct_value
)
# 验证调用了_handle_sync_result方法来处理直接值
final_call = mock_telegram_client.edit_message_text.call_args_list[-1]
final_text = final_call[1]['text']
assert "Direct string value" in final_text
assert result == initial_message
@pytest.mark.asyncio
async def test_async_generator_instance_with_update_intervals(self, mock_telegram_client):
"""测试异步生成器实例的时间间隔更新(覆盖145-169行)"""
# 使用非常短的更新间隔以触发时间间隔更新逻辑
sender = StreamMessageSender(mock_telegram_client, update_interval=0.01)
initial_message = Mock()
initial_message.message_id = 123
mock_telegram_client.send_message.return_value = initial_message
# Mock edit_message_text返回一个消息对象用于final_message
final_message = Mock()
final_message.message_id = 124
mock_telegram_client.edit_message_text.return_value = final_message
async def slow_async_generator():
yield "Part1"
await asyncio.sleep(0.02) # 确保超过更新间隔
yield " Part2"
await asyncio.sleep(0.02) # 再次超过更新间隔
yield " Part3"
# 传入函数,而不是生成器实例,这样会调用_handle_async_generator_instance
result = await sender.send_streaming_message(
chat_id=456,
message_generator=slow_async_generator
)
# 验证至少有编辑消息调用(可能是中间更新 + 最终更新)
assert mock_telegram_client.edit_message_text.call_count >= 1
# 验证最终消息包含所有部分
final_call = mock_telegram_client.edit_message_text.call_args_list[-1]
final_text = final_call[1]['text']
assert "Part1" in final_text
assert "Part2" in final_text
assert "Part3" in final_text
# 结果应该是最后编辑返回的消息或初始消息
assert result in [initial_message, final_message]
@pytest.mark.asyncio
async def test_async_generator_instance_update_error(self, mock_telegram_client):
"""测试异步生成器中间更新失败的处理(覆盖165-169行)"""
sender = StreamMessageSender(mock_telegram_client, update_interval=0.01)
initial_message = Mock()
initial_message.message_id = 123
mock_telegram_client.send_message.return_value = initial_message
# Mock编辑消息在中间调用时失败,但最终调用成功
call_count = 0
def edit_side_effect(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 1: # 第一次中间更新失败
raise Exception("Intermediate update failed")
return initial_message # 最终更新成功
mock_telegram_client.edit_message_text.side_effect = edit_side_effect
async def async_generator_with_delays():
yield "Part1"
await asyncio.sleep(0.02) # 触发中间更新
yield " Final"
result = await sender.send_streaming_message(
chat_id=456,
message_generator=async_generator_with_delays()
)
# 验证至少有两次编辑尝试
assert mock_telegram_client.edit_message_text.call_count >= 2
assert result == initial_message
@pytest.mark.asyncio
async def test_final_message_sending_error_handling(self, mock_telegram_client):
"""测试最终消息发送失败和简化重试逻辑(覆盖175-200行)"""
sender = StreamMessageSender(mock_telegram_client, update_interval=0.01)
initial_message = Mock()
initial_message.message_id = 123
mock_telegram_client.send_message.return_value = initial_message
# Mock编辑消息:让编辑调用失败但不影响整个流程(异常会被捕获)
call_count = 0
def edit_side_effect(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count <= 2: # 前两次调用失败
raise Exception("Edit message failed")
# 后续调用成功,但这里我们不会到达,只是测试错误处理
return initial_message
mock_telegram_client.edit_message_text.side_effect = edit_side_effect
async def simple_async_generator():
yield "Test content"
# 异步生成器实例需要用callable方式传入
result = await sender.send_streaming_message(
chat_id=456,
message_generator=simple_async_generator
)
# 验证编辑消息被尝试调用(虽然可能失败)
assert mock_telegram_client.edit_message_text.call_count >= 1
assert result == initial_message
@pytest.mark.asyncio
async def test_final_message_empty_accumulated_text(self, mock_telegram_client):
"""测试累积文本为空时跳过最终消息更新(覆盖202行)"""
sender = StreamMessageSender(mock_telegram_client, update_interval=0.01)
initial_message = Mock()
initial_message.message_id = 123
mock_telegram_client.send_message.return_value = initial_message
async def empty_content_generator():
# 生成器不产生任何内容
return
yield "never reached"
result = await sender.send_streaming_message(
chat_id=456,
message_generator=empty_content_generator
)
# 注意:即使生成器为空,仍然会触发一次初始处理,所以可能有1次编辑调用
# 我们主要验证没有因空内容而崩溃,且逻辑正确处理了空的累积文本
assert result == initial_message
@pytest.mark.asyncio
async def test_sync_result_error_handling(self, stream_sender, mock_telegram_client):
"""测试同步结果处理错误(覆盖220-222行)"""
# Mock编辑消息先成功一次,然后失败,这样可以触发错误处理中的第二次调用
call_count = 0
def edit_side_effect(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 1: # 第一次正常调用失败
raise Exception("Edit failed")
# 第二次错误消息调用成功
return Mock()
mock_telegram_client.edit_message_text.side_effect = edit_side_effect
# 直接调用_handle_sync_result方法来触发错误处理
# 这个方法内部有try-except,所以不会抛出异常
await stream_sender._handle_sync_result(456, 123, "Test result")
# 验证尝试了编辑消息(第一次正常尝试,失败后第二次是错误消息)
assert mock_telegram_client.edit_message_text.call_count == 2
@pytest.mark.asyncio
async def test_handle_async_generator_method(self, stream_sender, mock_telegram_client):
"""测试_handle_async_generator方法(覆盖235-256行)"""
initial_message = Mock()
initial_message.message_id = 123
async def async_generator_func():
yield "Async"
yield " Generator"
yield " Content"
# 直接调用_handle_async_generator方法
await stream_sender._handle_async_generator(456, 123, async_generator_func)
# 验证编辑消息被调用
assert mock_telegram_client.edit_message_text.call_count >= 1
@pytest.mark.asyncio
async def test_handle_async_generator_update_error(self, mock_telegram_client):
"""测试_handle_async_generator中间更新错误(覆盖251-252行)"""
sender = StreamMessageSender(mock_telegram_client, update_interval=0.01)
# Mock编辑消息失败(中间更新失败,但最终消息成功)
call_count = 0
def edit_side_effect(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count <= 1: # 第一次中间更新失败
raise Exception("Update failed")
# 最终更新成功
return Mock()
mock_telegram_client.edit_message_text.side_effect = edit_side_effect
async def async_generator_func():
yield "Part1"
await asyncio.sleep(0.02) # 触发更新间隔
yield " Part2"
# 直接调用方法,异常被内部处理
await sender._handle_async_generator(456, 123, async_generator_func)
# 验证尝试了编辑消息(至少中间更新一次,最终更新一次)
assert mock_telegram_client.edit_message_text.call_count >= 2
@pytest.mark.asyncio
async def test_handle_sync_generator_method(self, stream_sender, mock_telegram_client):
"""测试_handle_sync_generator方法(覆盖269-307行)"""
def sync_generator_func():
yield "Sync"
yield " Generator"
return # 显式返回
# 直接调用_handle_sync_generator方法
await stream_sender._handle_sync_generator(456, 123, sync_generator_func)
# 验证编辑消息被调用
assert mock_telegram_client.edit_message_text.call_count >= 1
@pytest.mark.asyncio
async def test_handle_sync_generator_non_callable(self, stream_sender, mock_telegram_client):
"""测试_handle_sync_generator处理非可调用对象(覆盖298-299行)"""
# 传入非可调用对象
non_callable_value = "Direct value"
await stream_sender._handle_sync_generator(456, 123, non_callable_value)
# 验证编辑消息被调用
assert mock_telegram_client.edit_message_text.call_count >= 1
# 验证消息内容
final_call = mock_telegram_client.edit_message_text.call_args_list[-1]
final_text = final_call[1]['text']
assert "Direct value" in final_text
@pytest.mark.asyncio
async def test_handle_sync_generator_non_iterable_result(self, stream_sender, mock_telegram_client):
"""测试_handle_sync_generator处理返回非可迭代结果的函数(覆盖295-297行)"""
def function_returning_string():
return "Simple string result"
await stream_sender._handle_sync_generator(456, 123, function_returning_string)
# 验证编辑消息被调用
assert mock_telegram_client.edit_message_text.call_count >= 1
# 验证消息内容
final_call = mock_telegram_client.edit_message_text.call_args_list[-1]
final_text = final_call[1]['text']
assert "Simple string result" in final_text
@pytest.mark.asyncio
async def test_handle_sync_generator_processing_error(self, stream_sender, mock_telegram_client):
"""测试_handle_sync_generator处理异常(覆盖301-303行)"""
def error_function():
raise ValueError("Processing error")
await stream_sender._handle_sync_generator(456, 123, error_function)
# 验证编辑消息被调用,包含错误信息
assert mock_telegram_client.edit_message_text.call_count >= 1
final_call = mock_telegram_client.edit_message_text.call_args_list[-1]
final_text = final_call[1]['text']
assert "❌ 处理错误" in final_text
assert "Processing error" in final_text
@pytest.mark.asyncio
async def test_handle_sync_generator_with_time_intervals(self, mock_telegram_client):
"""测试_handle_sync_generator的时间间隔更新(覆盖282-294行)"""
sender = StreamMessageSender(mock_telegram_client, update_interval=0.01)
def slow_sync_generator():
yield "Part1"
import time
time.sleep(0.02) # 超过更新间隔
yield " Part2"
time.sleep(0.02) # 再次超过更新间隔
yield " Part3"
await sender._handle_sync_generator(456, 123, slow_sync_generator)
# 验证多次编辑消息调用
assert mock_telegram_client.edit_message_text.call_count >= 2
@pytest.mark.asyncio
async def test_handle_sync_generator_update_error(self, mock_telegram_client):
"""测试_handle_sync_generator中间更新失败(覆盖290-291行)"""
sender = StreamMessageSender(mock_telegram_client, update_interval=0.01)
# Mock编辑消息失败(只在中间更新时失败)
call_count = 0
def edit_side_effect(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count <= 2: # 中间更新失败
raise Exception("Update failed")
# 最终更新成功
return Mock()
mock_telegram_client.edit_message_text.side_effect = edit_side_effect
def sync_generator():
yield "Part1"
import time
time.sleep(0.02) # 触发更新间隔
yield " Part2"
# 异常会被内部处理,不会向外抛出
await sender._handle_sync_generator(456, 123, sync_generator)
# 验证尝试了编辑消息(中间更新 + 最终更新)
assert mock_telegram_client.edit_message_text.call_count >= 2
@pytest.mark.asyncio
async def test_send_typing_action(self, stream_sender):
"""测试发送typing状态功能(覆盖406-411行)"""
# 这个方法目前是占位符实现(pass),但我们需要覆盖异常处理部分
# 直接调用方法,应该不会抛出异常
await stream_sender.send_typing_action(456)
# 方法当前只包含pass和异常处理,所以只要不抛出异常就表示成功覆盖