blob: 12690994dfeee31e74b32ea5c57ffd0d1435814c [file] [log] [blame] [raw]
"""
Telegram Bot单元测试 - 消息处理器
"""
import pytest
from unittest.mock import Mock, AsyncMock, patch
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent / "src"))
from claude_agent.telegram.message_handler import MessageHandler
class TestMessageHandler:
"""消息处理器测试"""
@pytest.fixture
def mock_components(self):
"""创建Mock组件"""
components = {
'telegram_client': Mock(),
'context_manager': Mock(),
'file_handler': Mock(),
'claude_agent': Mock(),
'stream_sender': Mock()
}
# 设置异步方法
components['telegram_client'].send_message = AsyncMock()
components['telegram_client'].edit_message_text = AsyncMock()
components['file_handler'].save_telegram_file = AsyncMock()
components['file_handler'].process_image = AsyncMock()
components['file_handler'].process_document = AsyncMock()
components['claude_agent'].process_message = AsyncMock()
components['claude_agent'].process_with_image = AsyncMock()
components['claude_agent'].process_with_document = AsyncMock()
components['stream_sender'].send_streaming_message = AsyncMock()
return components
@pytest.fixture
def message_handler(self, mock_components):
"""创建消息处理器实例"""
return MessageHandler(
telegram_client=mock_components['telegram_client'],
context_manager=mock_components['context_manager'],
file_handler=mock_components['file_handler'],
claude_agent=mock_components['claude_agent'],
stream_sender=mock_components['stream_sender'],
allowed_users=[123, 456],
allowed_groups=[-100, -200]
)
def test_init(self, mock_components):
"""测试初始化"""
handler = MessageHandler(
telegram_client=mock_components['telegram_client'],
context_manager=mock_components['context_manager'],
file_handler=mock_components['file_handler'],
claude_agent=mock_components['claude_agent'],
stream_sender=mock_components['stream_sender'],
allowed_users=[111, 222],
allowed_groups=[-333, -444]
)
assert handler.allowed_users == {111, 222}
assert handler.allowed_groups == {-333, -444}
@pytest.mark.asyncio
async def test_handle_text_message_private_authorized(self, message_handler, mock_components):
"""测试处理私聊文本消息(已授权用户)"""
# 创建Mock Update对象
update = Mock()
update.effective_message = Mock()
update.effective_user = Mock()
update.effective_chat = Mock()
update.effective_message.text = "Hello Bot"
update.effective_message.message_id = 789
update.effective_message.reply_to_message = None # 没有回复任何消息
update.effective_user.id = 123 # 授权用户
update.effective_user.username = "testuser"
update.effective_user.first_name = "Test"
update.effective_user.last_name = "User"
update.effective_user.language_code = "en"
update.effective_chat.id = 123
update.effective_chat.type = "private"
# Mock上下文管理器返回值
mock_components['context_manager'].get_context.return_value = []
# Mock Claude Agent方法
mock_agent = Mock()
mock_agent.conversation_history = [] # 添加conversation_history属性
mock_components['claude_agent']._get_or_create_agent.return_value = mock_agent
# Mock create_streaming_response为异步生成器
async def mock_streaming_generator(*args):
yield "Hello!"
yield " How can"
yield " I help you?"
mock_components['claude_agent'].create_streaming_response.return_value = mock_streaming_generator()
# Mock流式发送器返回值
mock_reply = Mock()
mock_reply.text = "Hello! How can I help you?"
mock_reply.from_user = Mock()
mock_reply.from_user.id = 999 # Bot ID
mock_reply.message_id = 999
# Mock send_streaming_message 来实际调用传入的生成器
async def mock_send_streaming_message(chat_id, message_generator, initial_text, reply_to_message_id=None):
# 实际调用生成器来触发create_streaming_response
chunks = []
async for chunk in message_generator():
chunks.append(chunk)
return mock_reply
mock_components['stream_sender'].send_streaming_message.side_effect = mock_send_streaming_message
context = Mock()
# 调用处理函数
await message_handler.handle_text_message(update, context)
# 验证should_reply逻辑是否正确工作
# 对于私聊且用户123在白名单中,should_reply应该为True
assert update.effective_user.id == 123
assert update.effective_chat.type == "private"
assert 123 in message_handler.allowed_users
# 验证调用了Claude Agent的_get_or_create_agent方法
mock_components['claude_agent']._get_or_create_agent.assert_called_with(123)
# 验证调用了Claude Agent的create_streaming_response
mock_components['claude_agent'].create_streaming_response.assert_called_once()
# 验证调用了流式发送器
mock_components['stream_sender'].send_streaming_message.assert_called_once()
@pytest.mark.asyncio
async def test_handle_text_message_unauthorized(self, message_handler, mock_components):
"""测试处理未授权用户的消息"""
# 创建未授权用户的Update
update = Mock()
update.effective_message = Mock()
update.effective_user = Mock()
update.effective_chat = Mock()
update.effective_user.id = 999 # 未授权用户
update.effective_chat.id = 999
update.effective_chat.type = "private"
context = Mock()
# 调用处理函数
await message_handler.handle_text_message(update, context)
# 验证没有进行任何处理
mock_components['context_manager'].add_message.assert_not_called()
mock_components['claude_agent'].process_message.assert_not_called()
@pytest.mark.asyncio
async def test_handle_text_message_group_with_mention(self, message_handler, mock_components):
"""测试处理群组中@机器人的消息"""
# 创建群组消息
update = Mock()
update.effective_message = Mock()
update.effective_user = Mock()
update.effective_chat = Mock()
update.effective_message.text = "@testbot help me"
update.effective_message.message_id = 789
update.effective_user.id = 123 # 授权用户
update.effective_chat.id = -100 # 授权群组
update.effective_chat.type = "group"
# Mock get_bot方法
mock_bot = Mock()
mock_bot.username = "testbot"
update.get_bot.return_value = mock_bot
mock_components['context_manager'].get_context.return_value = []
mock_components['claude_agent'].process_message.return_value = "Group response"
mock_reply = Mock()
mock_reply.text = "Group response"
mock_reply.from_user = Mock()
mock_reply.from_user.id = 999
mock_components['stream_sender'].send_streaming_message.return_value = mock_reply
context = Mock()
# 调用处理函数
await message_handler.handle_text_message(update, context)
# 验证发送流式回复被调用
mock_components['stream_sender'].send_streaming_message.assert_called_once()
# 验证调用参数
args, kwargs = mock_components['stream_sender'].send_streaming_message.call_args
assert kwargs['chat_id'] == -100
assert kwargs['initial_text'] == "⌨️ User is typing..."
assert kwargs['reply_to_message_id'] == 789
@pytest.mark.asyncio
async def test_handle_text_message_group_without_mention(self, message_handler, mock_components):
"""测试处理群组中未@机器人的消息"""
# 创建群组消息(无@)
update = Mock()
update.effective_message = Mock()
update.effective_user = Mock()
update.effective_chat = Mock()
update.effective_message.text = "Just a normal message"
update.effective_user.id = 123
update.effective_chat.id = -100
update.effective_chat.type = "group"
# Mock get_bot方法
mock_bot = Mock()
mock_bot.username = "testbot"
update.get_bot.return_value = mock_bot
context = Mock()
# 调用处理函数
await message_handler.handle_text_message(update, context)
# 验证没有处理消息
mock_components['claude_agent'].process_message.assert_not_called()
@pytest.mark.asyncio
async def test_handle_photo_message(self, message_handler, mock_components):
"""测试处理图片消息(简化版本)"""
# 创建带图片的Update
update = Mock()
update.effective_message = Mock()
update.effective_user = Mock()
update.effective_chat = Mock()
# Mock图片数据
mock_photo = Mock()
mock_photo.file_id = "photo123"
update.effective_message.photo = [mock_photo]
update.effective_message.caption = "What's in this image?"
update.effective_message.message_id = 789
update.effective_message.reply_to_message = None # 没有回复任何消息
update.effective_user.id = 123
update.effective_chat.id = 123
update.effective_chat.type = "private"
# Mock回复消息
mock_reply = Mock()
mock_reply.text = "🖼️ 图片处理功能暂时不可用,请用文字描述您想了解的内容,我会尽力帮助您!"
mock_reply.from_user = Mock()
mock_reply.from_user.id = 999
mock_components['telegram_client'].send_message = AsyncMock(return_value=mock_reply)
context = Mock()
# 调用处理函数
await message_handler.handle_photo_message(update, context)
# 验证发送了不可用消息
mock_components['telegram_client'].send_message.assert_called_with(
chat_id=123,
text="🖼️ 图片处理功能暂时不可用,请用文字描述您想了解的内容,我会尽力帮助您!",
reply_to_message_id=789
)
@pytest.mark.asyncio
async def test_handle_document_message(self, message_handler, mock_components):
"""测试处理文档消息"""
# 创建带文档的Update
update = Mock()
update.effective_message = Mock()
update.effective_user = Mock()
update.effective_chat = Mock()
# Mock文档数据
mock_document = Mock()
mock_document.file_id = "doc123"
mock_document.file_name = "test.pdf"
mock_document.file_size = 1024 * 1024 # 1MB
update.effective_message.document = mock_document
update.effective_message.caption = "Please analyze this document"
update.effective_message.message_id = 789
update.effective_message.reply_to_message = None # 没有回复任何消息
update.effective_user.id = 123
update.effective_chat.id = 123
update.effective_chat.type = "private"
# Mock回复消息
mock_reply = Mock()
mock_reply.text = "📄 文档处理功能暂时不可用,请用文字描述您想了解的内容,我会尽力帮助您!"
mock_reply.from_user = Mock()
mock_reply.from_user.id = 999
mock_components['telegram_client'].send_message = AsyncMock(return_value=mock_reply)
context = Mock()
# 调用处理函数
await message_handler.handle_document_message(update, context)
# 验证发送了不可用消息
mock_components['telegram_client'].send_message.assert_called_with(
chat_id=123,
text="📄 文档处理功能暂时不可用,请用文字描述您想了解的内容,我会尽力帮助您!",
reply_to_message_id=789
)
@pytest.mark.asyncio
async def test_handle_document_message_too_large(self, message_handler, mock_components):
"""测试处理过大的文档"""
# 创建过大文档的Update
update = Mock()
update.effective_message = Mock()
update.effective_user = Mock()
update.effective_chat = Mock()
mock_document = Mock()
mock_document.file_size = 25 * 1024 * 1024 # 25MB,超过限制
mock_document.file_name = "large.pdf"
update.effective_message.document = mock_document
update.effective_message.message_id = 789
update.effective_user.id = 123
update.effective_chat.id = 123
update.effective_chat.type = "private"
context = Mock()
# 调用处理函数
await message_handler.handle_document_message(update, context)
# 验证发送了错误消息
mock_components['telegram_client'].send_message.assert_called_with(
chat_id=123,
text="📄 文档处理功能暂时不可用,请用文字描述您想了解的内容,我会尽力帮助您!",
reply_to_message_id=789
)
@pytest.mark.asyncio
async def test_error_handling(self, message_handler, mock_components):
"""测试错误处理"""
# 创建会导致错误的Update
update = Mock()
update.effective_message = Mock()
update.effective_user = Mock()
update.effective_chat = Mock()
update.effective_message.text = "Error test"
update.effective_user.id = 123
update.effective_chat.id = 123
update.effective_chat.type = "private"
# Mock上下文管理器抛出异常来触发错误处理
mock_components['context_manager'].add_message.side_effect = Exception("Context error")
context = Mock()
# 调用处理函数(应该不抛出异常)
await message_handler.handle_text_message(update, context)
# 验证方法正常完成且没有抛出异常
assert True # 如果运行到这里说明没有异常
def test_build_user_info(self, message_handler):
"""测试构建用户信息"""
# 创建Mock用户和聊天对象
user = Mock()
user.id = 123
user.username = "testuser"
user.first_name = "Test"
user.last_name = "User"
user.language_code = "en"
chat = Mock()
chat.id = 456
chat.type = "private"
# 构建用户信息
user_info = message_handler._build_user_info(user, chat)
# 验证信息
assert user_info['user_id'] == 123
assert user_info['username'] == "testuser"
assert user_info['first_name'] == "Test"
assert user_info['last_name'] == "User"
assert user_info['language_code'] == "en"
assert user_info['chat_id'] == 456
assert user_info['chat_type'] == "private"
def test_build_user_info_group_chat(self, message_handler):
"""测试构建群组聊天用户信息"""
user = Mock()
user.id = 123
user.username = "testuser"
user.first_name = "Test"
user.last_name = None
user.language_code = "zh"
chat = Mock()
chat.id = -100
chat.type = "group"
chat.title = "Test Group"
user_info = message_handler._build_user_info(user, chat)
assert user_info['chat_type'] == "group"
assert user_info['chat_title'] == "Test Group"
assert user_info['last_name'] is None
@pytest.mark.asyncio
async def test_is_authorized_private_allowed(self, message_handler):
"""测试私聊授权检查(允许的用户)"""
update = Mock()
update.effective_user = Mock()
update.effective_chat = Mock()
update.effective_user.id = 123 # 在允许列表中
update.effective_chat.type = "private"
result = await message_handler._is_authorized(update)
assert result is True
@pytest.mark.asyncio
async def test_is_authorized_private_denied(self, message_handler):
"""测试私聊授权检查(拒绝的用户)"""
update = Mock()
update.effective_user = Mock()
update.effective_chat = Mock()
update.effective_user.id = 999 # 不在允许列表中
update.effective_chat.type = "private"
result = await message_handler._is_authorized(update)
assert result is False
@pytest.mark.asyncio
async def test_is_authorized_group_allowed(self, message_handler):
"""测试群组授权检查(允许的群组)"""
update = Mock()
update.effective_user = Mock()
update.effective_chat = Mock()
update.effective_user.id = 999 # 用户ID不重要
update.effective_chat.id = -100 # 在允许的群组列表中
update.effective_chat.type = "group"
result = await message_handler._is_authorized(update)
assert result is True
@pytest.mark.asyncio
async def test_is_authorized_group_denied(self, message_handler):
"""测试群组授权检查(拒绝的群组)"""
update = Mock()
update.effective_user = Mock()
update.effective_chat = Mock()
update.effective_user.id = 123 # 即使是允许的用户
update.effective_chat.id = -999 # 不在允许的群组列表中
update.effective_chat.type = "group"
result = await message_handler._is_authorized(update)
assert result is False
@pytest.mark.asyncio
async def test_should_reply_private_chat(self, message_handler):
"""测试私聊回复判断"""
update = Mock()
update.effective_chat = Mock()
update.effective_user = Mock()
update.effective_chat.type = "private"
update.effective_user.id = 123 # 在白名单中的用户
should_reply, is_random = await message_handler._get_reply_info(update, "any message")
assert should_reply is True
assert is_random is False
@pytest.mark.asyncio
async def test_should_reply_group_with_mention(self, message_handler):
"""测试群组中带@的回复判断"""
update = Mock()
update.effective_chat = Mock()
update.effective_chat.type = "group"
update.effective_message = Mock()
update.effective_message.reply_to_message = None # 不是回复消息
# Mock Bot对象
mock_bot = Mock()
mock_bot.username = "testbot"
update.get_bot.return_value = mock_bot
# Mock participation manager to control random participation
with patch.object(message_handler.participation_manager, 'is_name_mentioned', return_value=False), \
patch.object(message_handler.participation_manager, 'should_participate_random', return_value=False):
# 测试各种@格式
assert await message_handler._should_reply(update, "@testbot help") is True
assert await message_handler._should_reply(update, "@TestBot help") is True # 大小写不敏感
assert await message_handler._should_reply(update, "help @testbot") is True
assert await message_handler._should_reply(update, "just a message") is False
@pytest.mark.asyncio
async def test_should_reply_group_no_bot_username(self, message_handler):
"""测试群组中无Bot用户名的情况"""
update = Mock()
update.effective_chat = Mock()
update.effective_chat.type = "group"
# Mock Bot对象无用户名
mock_bot = Mock()
mock_bot.username = None
update.get_bot.return_value = mock_bot
result = await message_handler._should_reply(update, "@someone help")
assert result is False
@pytest.mark.asyncio
async def test_should_reply_no_bot(self, message_handler):
"""测试无法获取Bot信息的情况"""
update = Mock()
update.effective_chat = Mock()
update.effective_chat.type = "group"
update.get_bot.return_value = None
result = await message_handler._should_reply(update, "@someone help")
assert result is False
def test_extract_reply_context_no_reply(self, message_handler):
"""测试没有回复消息的情况"""
message = Mock()
message.reply_to_message = None
result = message_handler._extract_reply_context(message)
assert result == ""
def test_extract_reply_context_with_text_reply(self, message_handler):
"""测试回复文本消息的情况"""
from datetime import datetime
# 创建被回复的消息
reply_msg = Mock()
reply_msg.from_user = Mock()
reply_msg.from_user.is_bot = False
reply_msg.from_user.username = "testuser"
reply_msg.from_user.first_name = "Test"
reply_msg.from_user.last_name = "User"
reply_msg.text = "这是被回复的消息内容"
# 使用Mock对象来控制时区转换,返回真实的datetime对象
mock_local_time = datetime(2024, 1, 1, 12, 30, 45)
mock_date = Mock()
mock_date.tzinfo = None # 模拟无时区信息
mock_date.replace.return_value.astimezone.return_value = mock_local_time
reply_msg.date = mock_date
reply_msg.caption = None
reply_msg.photo = None
reply_msg.document = None
reply_msg.voice = None
reply_msg.video = None
reply_msg.audio = None
reply_msg.sticker = None
# 创建当前消息
message = Mock()
message.reply_to_message = reply_msg
result = message_handler._extract_reply_context(message)
expected = "↳ 回复 [@testuser 12:30]: 这是被回复的消息内容"
assert result == expected
def test_extract_reply_context_with_bot_reply(self, message_handler):
"""测试回复Bot消息的情况"""
from datetime import datetime
# 创建被回复的Bot消息
reply_msg = Mock()
reply_msg.from_user = Mock()
reply_msg.from_user.is_bot = True
reply_msg.text = "Bot的回复消息"
# 使用Mock对象来控制时区转换
mock_local_time = datetime(2024, 1, 1, 14, 15, 30)
mock_date = Mock()
mock_date.tzinfo = None
mock_date.replace.return_value.astimezone.return_value = mock_local_time
reply_msg.date = mock_date
reply_msg.caption = None
reply_msg.photo = None
reply_msg.document = None
reply_msg.voice = None
reply_msg.video = None
reply_msg.audio = None
reply_msg.sticker = None
# 创建当前消息
message = Mock()
message.reply_to_message = reply_msg
result = message_handler._extract_reply_context(message)
expected = "↳ 回复 [🤖 Bot 14:15]: Bot的回复消息"
assert result == expected
def test_extract_reply_context_with_photo_reply(self, message_handler):
"""测试回复图片消息的情况"""
from datetime import datetime
# 创建被回复的图片消息
reply_msg = Mock()
reply_msg.from_user = Mock()
reply_msg.from_user.is_bot = False
reply_msg.from_user.username = None
reply_msg.from_user.first_name = "张三"
reply_msg.from_user.last_name = None
reply_msg.text = None
reply_msg.caption = "这是图片描述"
reply_msg.photo = Mock() # 有图片
# 使用Mock对象来控制时区转换
mock_local_time = datetime(2024, 1, 1, 10, 0, 0)
mock_date = Mock()
mock_date.tzinfo = None
mock_date.replace.return_value.astimezone.return_value = mock_local_time
reply_msg.date = mock_date
reply_msg.document = None
reply_msg.voice = None
reply_msg.video = None
reply_msg.audio = None
reply_msg.sticker = None
# 创建当前消息
message = Mock()
message.reply_to_message = reply_msg
result = message_handler._extract_reply_context(message)
expected = "↳ 回复 [张三 10:00]: 这是图片描述"
assert result == expected
def test_extract_reply_context_with_document_reply(self, message_handler):
"""测试回复文档消息的情况"""
from datetime import datetime
# 创建被回复的文档消息
reply_msg = Mock()
reply_msg.from_user = Mock()
reply_msg.from_user.is_bot = False
reply_msg.from_user.username = "user123"
reply_msg.text = None
reply_msg.caption = None
reply_msg.photo = None
reply_msg.document = Mock()
reply_msg.document.file_name = "example.pdf"
# 使用Mock对象来控制时区转换
mock_local_time = datetime(2024, 1, 1, 16, 45, 0)
mock_date = Mock()
mock_date.tzinfo = None
mock_date.replace.return_value.astimezone.return_value = mock_local_time
reply_msg.date = mock_date
reply_msg.voice = None
reply_msg.video = None
reply_msg.audio = None
reply_msg.sticker = None
# 创建当前消息
message = Mock()
message.reply_to_message = reply_msg
result = message_handler._extract_reply_context(message)
expected = "↳ 回复 [@user123 16:45]: [文档: example.pdf]"
assert result == expected
def test_extract_reply_context_long_message(self, message_handler):
"""测试回复长消息的情况(内容截断)"""
from datetime import datetime
# 创建被回复的长消息
long_text = "这是一条非常长的消息内容," * 10 # 超过100字符
reply_msg = Mock()
reply_msg.from_user = Mock()
reply_msg.from_user.is_bot = False
reply_msg.from_user.username = "longuser"
reply_msg.text = long_text
reply_msg.date = datetime(2024, 1, 1, 9, 30, 0)
reply_msg.caption = None
reply_msg.photo = None
reply_msg.document = None
reply_msg.voice = None
reply_msg.video = None
reply_msg.audio = None
reply_msg.sticker = None
# 创建当前消息
message = Mock()
message.reply_to_message = reply_msg
result = message_handler._extract_reply_context(message)
# 验证内容被截断了
assert "..." in result
assert len(result) <= 200 # 确保总长度合理
def test_build_message_with_reply_context_no_reply(self, message_handler):
"""测试构建无回复消息的情况"""
message = Mock()
message.reply_to_message = None
result = message_handler._build_message_with_reply_context("Hello", message)
assert result == "Hello"
def test_build_message_with_reply_context_with_reply(self, message_handler):
"""测试构建有回复消息的情况"""
from datetime import datetime
# 创建被回复的消息
reply_msg = Mock()
reply_msg.from_user = Mock()
reply_msg.from_user.is_bot = False
reply_msg.from_user.username = "testuser"
reply_msg.text = "原始消息"
# 使用Mock对象来控制时区转换
mock_local_time = datetime(2024, 1, 1, 12, 0, 0)
mock_date = Mock()
mock_date.tzinfo = None
mock_date.replace.return_value.astimezone.return_value = mock_local_time
reply_msg.date = mock_date
reply_msg.caption = None
reply_msg.photo = None
reply_msg.document = None
reply_msg.voice = None
reply_msg.video = None
reply_msg.audio = None
reply_msg.sticker = None
# 创建当前消息
message = Mock()
message.reply_to_message = reply_msg
result = message_handler._build_message_with_reply_context("我的回复", message)
expected = "↳ 回复 [@testuser 12:00]: 原始消息\n我的回复"
assert result == expected
# Add tests for webhook broadcasting functionality
@pytest.mark.asyncio
async def test_handle_text_message_with_webhook_broadcast(self, mock_components):
"""测试包含Webhook广播的文本消息处理"""
# 创建带有Webhook回调的消息处理器
webhook_callback = AsyncMock()
handler = MessageHandler(
telegram_client=mock_components['telegram_client'],
context_manager=mock_components['context_manager'],
file_handler=mock_components['file_handler'],
claude_agent=mock_components['claude_agent'],
stream_sender=mock_components['stream_sender'],
allowed_users=[123],
allowed_groups=[-100],
webhook_broadcast_callback=webhook_callback
)
# 创建群聊Update对象
update = Mock()
update.effective_message = Mock()
update.effective_user = Mock()
update.effective_chat = Mock()
update.effective_message.text = "@Claude 帮助我"
update.effective_message.message_id = 789
update.effective_message.reply_to_message = None
update.effective_user.id = 123
update.effective_user.username = "testuser"
update.effective_user.first_name = "Test"
update.effective_chat.id = -100
update.effective_chat.type = "group"
update.effective_chat.title = "测试群"
# Mock上下文管理器
mock_components['context_manager'].get_context.return_value = []
# Mock Claude Agent
mock_agent = Mock()
mock_agent.conversation_history = []
mock_components['claude_agent']._get_or_create_agent.return_value = mock_agent
# Mock返回消息
reply_message = Mock()
reply_message.text = "Bot回复内容"
reply_message.message_id = 790
reply_message.from_user = Mock()
reply_message.from_user.id = 999
reply_message.from_user.username = "testbot"
reply_message.from_user.first_name = "TestBot"
reply_message.from_user.last_name = None
mock_components['stream_sender'].send_streaming_message.return_value = reply_message
context = Mock()
# 执行处理
await handler.handle_text_message(update, context)
# 验证Webhook广播被调用
webhook_callback.assert_called_once()
call_args = webhook_callback.call_args[1]
assert call_args['group_id'] == -100
assert call_args['message_content'] == "Bot回复内容"
assert call_args['sender_info']['is_bot'] is True
@pytest.mark.asyncio
async def test_handle_text_message_webhook_broadcast_error(self, mock_components):
"""测试Webhook广播失败的处理"""
# 创建会抛出异常的Webhook回调
webhook_callback = AsyncMock()
webhook_callback.side_effect = Exception("Webhook失败")
handler = MessageHandler(
telegram_client=mock_components['telegram_client'],
context_manager=mock_components['context_manager'],
file_handler=mock_components['file_handler'],
claude_agent=mock_components['claude_agent'],
stream_sender=mock_components['stream_sender'],
allowed_users=[123],
allowed_groups=[-100],
webhook_broadcast_callback=webhook_callback
)
# 创建群聊Update
update = Mock()
update.effective_message = Mock()
update.effective_user = Mock()
update.effective_chat = Mock()
update.effective_message.text = "@Claude test"
update.effective_message.message_id = 789
update.effective_message.reply_to_message = None
update.effective_user.id = 123
update.effective_chat.id = -100
update.effective_chat.type = "group"
mock_components['context_manager'].get_context.return_value = []
mock_agent = Mock()
mock_agent.conversation_history = []
mock_components['claude_agent']._get_or_create_agent.return_value = mock_agent
reply_message = Mock()
reply_message.text = "Bot回复"
reply_message.message_id = 790
reply_message.from_user = Mock()
reply_message.from_user.id = 999
mock_components['stream_sender'].send_streaming_message.return_value = reply_message
context = Mock()
# 应该不会抛出异常,即使Webhook失败
await handler.handle_text_message(update, context)
# 验证Webhook仍然被调用了
webhook_callback.assert_called_once()
@pytest.mark.asyncio
async def test_handle_text_message_error_fallback(self, mock_components):
"""测试文本消息处理发生异常时的错误处理"""
handler = MessageHandler(
telegram_client=mock_components['telegram_client'],
context_manager=mock_components['context_manager'],
file_handler=mock_components['file_handler'],
claude_agent=mock_components['claude_agent'],
stream_sender=mock_components['stream_sender'],
allowed_users=[123],
allowed_groups=[]
)
# 创建私聊Update
update = Mock()
update.effective_message = Mock()
update.effective_user = Mock()
update.effective_chat = Mock()
update.effective_message.text = "测试消息"
update.effective_user.id = 123
update.effective_chat.id = 123
update.effective_chat.type = "private"
# Mock stream_sender抛出异常
mock_components['stream_sender'].send_streaming_message.side_effect = Exception("流式响应失败")
context = Mock()
# 应该捕获异常并发送错误消息
await handler.handle_text_message(update, context)
# 验证发送了错误消息
mock_components['telegram_client'].send_message.assert_called()
send_call = mock_components['telegram_client'].send_message.call_args[1]
assert "消息处理失败" in send_call['text']
def test_check_mention_in_text_webhook(self, message_handler):
"""测试Webhook消息中的@检测"""
# 设置bot_username属性
message_handler.bot_username = "Claude"
# 测试包含@Claude的消息
assert message_handler._check_mention_in_text_webhook("@Claude 帮助我") is True
# 测试不包含@Claude的消息
assert message_handler._check_mention_in_text_webhook("普通消息") is False
# 测试包含其他@的消息
assert message_handler._check_mention_in_text_webhook("@其他人 你好") is False
def test_build_user_display_name(self, message_handler):
"""测试构建用户显示名"""
# 测试有用户名的情况 - 优先使用用户名
user_info = {
'username': 'testuser',
'first_name': 'Test',
'last_name': 'User'
}
display_name = message_handler._build_user_display_name(123, user_info)
assert display_name == "@testuser"
# 测试只有姓名的情况
user_info = {
'first_name': 'Test',
'last_name': 'User'
}
display_name = message_handler._build_user_display_name(456, user_info)
assert display_name == "Test User"
# 测试只有first_name
user_info = {'first_name': 'Test'}
display_name = message_handler._build_user_display_name(789, user_info)
assert display_name == "Test"
# 测试空信息
user_info = {}
display_name = message_handler._build_user_display_name(999, user_info)
assert display_name == "User999"
def test_convert_agent_history_to_context_format(self, message_handler):
"""测试转换Agent历史为上下文格式"""
agent_history = [
{"role": "user", "content": "用户消息1"},
{"role": "assistant", "content": "助手回复1"},
{"role": "user", "content": "用户消息2"}
]
# 执行转换
context = message_handler._convert_agent_history_to_context_format(agent_history, "test_chat")
# 验证转换结果
assert len(context) == 3
assert context[0]["is_bot"] is False
assert context[1]["is_bot"] is True
assert context[2]["is_bot"] is False
assert context[0]["message"] == "用户消息1"
@pytest.mark.asyncio
async def test_handle_webhook_message_with_mention(self, message_handler):
"""测试处理Webhook消息(带@mention)"""
# 设置bot_username以支持mention检测
message_handler.bot_username = "testbot"
chat_id = -100 # 授权的群组
user_id = 123
message_text = "@testbot 帮助我"
user_info = {
'user_id': user_id,
'username': 'testuser',
'first_name': 'Test',
'last_name': 'User'
}
# Mock流式响应
mock_reply = Mock()
mock_reply.text = "Webhook回复内容"
mock_reply.message_id = 999
mock_reply.from_user = Mock()
mock_reply.from_user.id = 888
mock_reply.from_user.username = "testbot"
mock_reply.from_user.first_name = "TestBot"
mock_reply.from_user.last_name = None
# Mock create_streaming_response异步生成器
async def mock_streaming_response(*args, **kwargs):
yield "Webhook"
yield "回复内容"
message_handler.claude_agent.create_streaming_response = AsyncMock(
return_value=mock_streaming_response()
)
# Mock send_streaming_message to actually call the generator function
async def mock_send_streaming_message(chat_id, message_generator, initial_text, **kwargs):
# 调用生成器函数来触发create_streaming_response
async for chunk in message_generator():
pass # 消费生成器
return mock_reply
message_handler.stream_sender.send_streaming_message = AsyncMock(
side_effect=mock_send_streaming_message
)
# 执行处理
await message_handler.handle_webhook_message(
chat_id=chat_id,
user_id=user_id,
message_text=message_text,
user_info=user_info,
chat_type='supergroup',
is_bot=False
)
# 验证调用了流式发送器
message_handler.stream_sender.send_streaming_message.assert_called_once()
# 验证调用了Claude Agent (通过generator调用)
message_handler.claude_agent.create_streaming_response.assert_called_once()
@pytest.mark.asyncio
async def test_handle_webhook_message_private_chat(self, message_handler):
"""测试处理Webhook消息(私聊)"""
chat_id = 123 # 授权的私聊用户
user_id = 123
message_text = "私聊消息"
user_info = {
'user_id': user_id,
'username': 'testuser',
'first_name': 'Test'
}
# Mock流式响应
mock_reply = Mock()
mock_reply.text = "私聊回复"
mock_reply.message_id = 999
mock_reply.from_user = Mock()
mock_reply.from_user.id = 888
# Mock create_streaming_response
async def mock_streaming_response(*args, **kwargs):
yield "私聊回复"
message_handler.claude_agent.create_streaming_response = AsyncMock(
return_value=mock_streaming_response()
)
# Mock send_streaming_message to call the generator
async def mock_send_streaming_message(chat_id, message_generator, initial_text, **kwargs):
async for chunk in message_generator():
pass # 消费生成器
return mock_reply
message_handler.stream_sender.send_streaming_message = AsyncMock(
side_effect=mock_send_streaming_message
)
# 执行处理
await message_handler.handle_webhook_message(
chat_id=chat_id,
user_id=user_id,
message_text=message_text,
user_info=user_info,
chat_type='private',
is_bot=False
)
# 验证调用了流式发送器
message_handler.stream_sender.send_streaming_message.assert_called_once()
# 验证调用了Claude Agent
message_handler.claude_agent.create_streaming_response.assert_called_once()
@pytest.mark.asyncio
async def test_handle_webhook_message_no_reply_needed(self, message_handler):
"""测试处理Webhook消息(不需要回复)"""
chat_id = -999 # 未授权的群组
user_id = 123
message_text = "普通群组消息"
user_info = {'user_id': user_id}
# 执行处理
await message_handler.handle_webhook_message(
chat_id=chat_id,
user_id=user_id,
message_text=message_text,
user_info=user_info,
chat_type='supergroup',
is_bot=False
)
# 验证没有调用流式发送器(因为不需要回复)
message_handler.stream_sender.send_streaming_message.assert_not_called()
@pytest.mark.asyncio
async def test_handle_webhook_message_error_handling(self, message_handler):
"""测试Webhook消息处理错误"""
chat_id = 123
user_id = 123
message_text = "会导致错误的消息"
user_info = {'user_id': user_id}
# Mock抛出异常
message_handler.stream_sender.send_streaming_message = AsyncMock(
side_effect=Exception("流式发送失败")
)
# 应该不会抛出异常
await message_handler.handle_webhook_message(
chat_id=chat_id,
user_id=user_id,
message_text=message_text,
user_info=user_info,
chat_type='private',
is_bot=False
)
@pytest.mark.asyncio
async def test_should_reply_detailed_direct_bot_reply(self, message_handler, mock_components):
"""测试_should_reply_detailed - 直接回复bot消息"""
# 创建群组Update
update = Mock()
update.effective_chat = Mock()
update.effective_message = Mock()
update.effective_chat.type = "group"
update.effective_chat.id = -100 # 授权群组
# Mock回复了bot的消息
reply_message = Mock()
reply_message.from_user = Mock()
reply_message.from_user.is_bot = True
reply_message.from_user.id = 999 # Bot ID
update.effective_message.reply_to_message = reply_message
# Mock Bot对象
mock_bot = Mock()
mock_bot.id = 999 # 同一个bot
update.get_bot.return_value = mock_bot
should_reply, is_random = await message_handler._should_reply_detailed(update, "回复消息")
assert should_reply is True
assert is_random is False # 不是随机参与
@pytest.mark.asyncio
async def test_should_reply_detailed_name_mention(self, message_handler):
"""测试_should_reply_detailed - 检测到bot名字提及"""
update = Mock()
update.effective_chat = Mock()
update.effective_message = Mock()
update.effective_chat.type = "supergroup"
update.effective_chat.id = -100
update.effective_message.reply_to_message = None
# Mock get_bot返回None用户名
mock_bot = Mock()
mock_bot.username = None
update.get_bot.return_value = mock_bot
# Mock participation_manager检测到名字提及
message_handler.participation_manager.is_name_mentioned = Mock(return_value=True)
message_handler.participation_manager.reset_counter = Mock()
should_reply, is_random = await message_handler._should_reply_detailed(update, "Claude 帮忙")
assert should_reply is True
assert is_random is False
message_handler.participation_manager.reset_counter.assert_called_once()
@pytest.mark.asyncio
async def test_should_reply_detailed_random_participation(self, message_handler):
"""测试_should_reply_detailed - 随机参与"""
update = Mock()
update.effective_chat = Mock()
update.effective_message = Mock()
update.effective_chat.type = "group"
update.effective_chat.id = -100
update.effective_message.reply_to_message = None
mock_bot = Mock()
mock_bot.username = "testbot"
update.get_bot.return_value = mock_bot
# Mock没有@提及和名字提及,但达到随机参与条件
message_handler.participation_manager.is_name_mentioned = Mock(return_value=False)
message_handler.participation_manager.should_participate_random = Mock(return_value=True)
should_reply, is_random = await message_handler._should_reply_detailed(update, "普通消息")
assert should_reply is True
assert is_random is True # 这是随机参与
@pytest.mark.asyncio
async def test_should_reply_with_auth_private_unauthorized(self, message_handler):
"""测试_should_reply_with_auth - 私聊未授权用户"""
update = Mock()
update.effective_user = Mock()
update.effective_chat = Mock()
update.effective_user.id = 999 # 未授权用户
update.effective_chat.type = "private"
result = await message_handler._should_reply_with_auth(update, "测试消息")
assert result is False
@pytest.mark.asyncio
async def test_should_reply_with_auth_group_unauthorized(self, message_handler):
"""测试_should_reply_with_auth - 群组未授权"""
update = Mock()
update.effective_user = Mock()
update.effective_chat = Mock()
update.effective_user.id = 123
update.effective_chat.id = -999 # 未授权群组
update.effective_chat.type = "supergroup"
result = await message_handler._should_reply_with_auth(update, "测试消息")
assert result is False
@pytest.mark.asyncio
async def test_should_reply_with_auth_missing_objects(self, message_handler):
"""测试_should_reply_with_auth - 缺少必要对象"""
update = Mock()
update.effective_user = None
update.effective_chat = None
result = await message_handler._should_reply_with_auth(update, "测试消息")
assert result is False
def test_format_reply_context_bot_with_username(self, message_handler):
"""测试_format_reply_context - Bot用户带用户名"""
reply_info = {
'user_info': {
'is_bot': True,
'username': 'testbot',
'first_name': 'TestBot',
'last_name': 'Assistant'
},
'timestamp': None,
'content': 'Bot的回复内容'
}
result = message_handler._format_reply_context(reply_info)
expected = "↳ 回复 [🤖 @testbot]: Bot的回复内容"
assert result == expected
def test_format_reply_context_bot_without_username(self, message_handler):
"""测试_format_reply_context - Bot用户没有用户名"""
reply_info = {
'user_info': {
'is_bot': True,
'username': None,
'first_name': 'TestBot',
'last_name': None
},
'timestamp': None,
'content': 'Bot回复'
}
result = message_handler._format_reply_context(reply_info)
expected = "↳ 回复 [🤖 TestBot]: Bot回复"
assert result == expected
def test_format_reply_context_system_user(self, message_handler):
"""测试_format_reply_context - 系统用户"""
reply_info = {
'user_info': {
'is_bot': False,
'username': None,
'first_name': None,
'user_id': 'system'
},
'timestamp': None,
'content': '系统消息'
}
result = message_handler._format_reply_context(reply_info)
expected = "↳ 回复 [system]: 系统消息"
assert result == expected
def test_format_reply_context_unknown_user(self, message_handler):
"""测试_format_reply_context - 未知用户"""
reply_info = {
'user_info': {
'is_bot': False,
'username': None,
'first_name': None,
'user_id': 0
},
'timestamp': None,
'content': '未知用户消息'
}
result = message_handler._format_reply_context(reply_info)
expected = "↳ 回复 [unknown user]: 未知用户消息"
assert result == expected
def test_format_reply_context_with_timestamp(self, message_handler):
"""测试_format_reply_context - 包含时间戳"""
from datetime import datetime
mock_time = datetime(2024, 1, 1, 14, 30, 0)
reply_info = {
'user_info': {
'is_bot': False,
'username': 'testuser',
'first_name': 'Test'
},
'timestamp': mock_time,
'content': '带时间戳的消息'
}
result = message_handler._format_reply_context(reply_info)
expected = "↳ 回复 [@testuser 14:30]: 带时间戳的消息"
assert result == expected
def test_format_reply_context_empty_info(self, message_handler):
"""测试_format_reply_context - 空回复信息"""
result = message_handler._format_reply_context(None)
assert result == ""
result = message_handler._format_reply_context({})
assert result == ""
@pytest.mark.asyncio
async def test_get_agent_history_with_persistence(self, message_handler):
"""测试_get_agent_history - 有persistence的情况"""
# Mock claude_agent有persistence属性
mock_persistence = Mock()
mock_persistence.load_conversation_history.return_value = [
{"role": "user", "content": "历史消息1"},
{"role": "assistant", "content": "历史回复1"}
]
message_handler.claude_agent.persistence = mock_persistence
history = await message_handler._get_agent_history("test_chat")
assert len(history) == 2
assert history[0]["content"] == "历史消息1"
mock_persistence.load_conversation_history.assert_called_once_with("test_chat")
@pytest.mark.asyncio
async def test_get_agent_history_no_persistence(self, message_handler):
"""测试_get_agent_history - 没有persistence的情况"""
# 确保claude_agent没有persistence属性
if hasattr(message_handler.claude_agent, 'persistence'):
del message_handler.claude_agent.persistence
history = await message_handler._get_agent_history("test_chat")
assert history == []
@pytest.mark.asyncio
async def test_get_agent_history_exception(self, message_handler):
"""测试_get_agent_history - 异常处理"""
# Mock persistence抛出异常
mock_persistence = Mock()
mock_persistence.load_conversation_history.side_effect = Exception("加载失败")
message_handler.claude_agent.persistence = mock_persistence
history = await message_handler._get_agent_history("test_chat")
assert history == []
@pytest.mark.asyncio
async def test_save_message_to_agent_user_message(self, message_handler):
"""测试_save_message_to_agent - 用户消息"""
# Mock Agent实例
mock_agent = Mock()
mock_agent.conversation_history = []
message_handler.claude_agent._get_or_create_agent = Mock(return_value=mock_agent)
user_info = {
'username': 'testuser',
'first_name': 'Test',
'reply_info': {
'user_info': {'username': 'otheruser'},
'content': '被回复的内容',
'timestamp': None
}
}
await message_handler._save_message_to_agent(
chat_id=123,
user_id=456,
message="用户的消息",
is_bot=False,
user_info=user_info
)
# 验证消息被添加到历史记录
assert len(mock_agent.conversation_history) == 1
message_entry = mock_agent.conversation_history[0]
assert message_entry['role'] == 'user'
assert '用户的消息' in message_entry['content']
assert '@testuser' in message_entry['content']
@pytest.mark.asyncio
async def test_save_message_to_agent_bot_message(self, message_handler):
"""测试_save_message_to_agent - Bot消息"""
mock_agent = Mock()
mock_agent.conversation_history = []
message_handler.claude_agent._get_or_create_agent = Mock(return_value=mock_agent)
await message_handler._save_message_to_agent(
chat_id=123,
user_id=999,
message="Bot的回复",
is_bot=True,
user_info={'first_name': 'TestBot'}
)
# 验证Bot消息被添加
assert len(mock_agent.conversation_history) == 1
message_entry = mock_agent.conversation_history[0]
assert message_entry['role'] == 'assistant'
assert 'Bot的回复' in message_entry['content']
assert 'Bot' in message_entry['content']
@pytest.mark.asyncio
async def test_save_message_to_agent_exception(self, message_handler):
"""测试_save_message_to_agent - 异常处理"""
# Mock _get_or_create_agent抛出异常
message_handler.claude_agent._get_or_create_agent = Mock(
side_effect=Exception("Agent创建失败")
)
# 应该不会抛出异常
await message_handler._save_message_to_agent(
chat_id=123,
user_id=456,
message="测试消息",
is_bot=False
)