| """ |
| Telegram Bot单元测试 - 消息处理器 |
| """ |
| |
| import pytest |
| from unittest.mock import Mock, AsyncMock, patch |
| |
| import sys |
| from pathlib import Path |
| sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent / "src")) |
| |
| from claude_agent.telegram.message_handler import MessageHandler |
| |
| |
| class TestMessageHandler: |
| """消息处理器测试""" |
| |
@pytest.fixture
def mock_components(self):
    """Build the dictionary of mocked collaborators used by these tests.

    Each collaborator is a plain ``Mock``; the methods listed below are
    replaced with ``AsyncMock`` instances so that they can be awaited.

    Returns:
        dict mapping component name to its mock.
    """
    # Component name -> names of the methods that must be awaitable.
    awaitable_methods = {
        'telegram_client': ('send_message', 'edit_message_text'),
        'context_manager': (),
        'file_handler': ('save_telegram_file', 'process_image', 'process_document'),
        'claude_agent': ('process_message', 'process_with_image', 'process_with_document'),
        'stream_sender': ('send_streaming_message',),
    }

    doubles = {}
    for component_name, method_names in awaitable_methods.items():
        double = Mock()
        for method_name in method_names:
            setattr(double, method_name, AsyncMock())
        doubles[component_name] = double

    return doubles
| |
@pytest.fixture
def message_handler(self, mock_components):
    """Return a MessageHandler wired to the mocked components.

    Users 123 and 456 and groups -100 and -200 are passed as the whitelists.
    """
    collaborator_names = (
        'telegram_client',
        'context_manager',
        'file_handler',
        'claude_agent',
        'stream_sender',
    )
    collaborators = {name: mock_components[name] for name in collaborator_names}
    return MessageHandler(
        allowed_users=[123, 456],
        allowed_groups=[-100, -200],
        **collaborators,
    )
| |
def test_init(self, mock_components):
    """The whitelists given to the constructor compare equal to sets."""
    wiring = {
        name: mock_components[name]
        for name in (
            'telegram_client',
            'context_manager',
            'file_handler',
            'claude_agent',
            'stream_sender',
        )
    }
    wiring.update(allowed_users=[111, 222], allowed_groups=[-333, -444])

    handler = MessageHandler(**wiring)

    assert handler.allowed_users == {111, 222}
    assert handler.allowed_groups == {-333, -444}
| |
@pytest.mark.asyncio
async def test_handle_text_message_private_authorized(self, message_handler, mock_components):
    """A private text message from whitelisted user 123 gets a streamed reply."""
    agent_service = mock_components['claude_agent']
    streamer = mock_components['stream_sender']

    # Incoming update: plain text, not a reply, sent by authorized user 123.
    update = Mock(
        effective_message=Mock(text="Hello Bot", message_id=789, reply_to_message=None),
        effective_user=Mock(
            id=123,
            username="testuser",
            first_name="Test",
            last_name="User",
            language_code="en",
        ),
        effective_chat=Mock(id=123, type="private"),
    )

    # The context manager reports no stored context.
    mock_components['context_manager'].get_context.return_value = []

    # The per-user agent exposes an empty conversation history.
    user_agent = Mock()
    user_agent.conversation_history = []
    agent_service._get_or_create_agent.return_value = user_agent

    # create_streaming_response hands back an async generator of text chunks.
    async def fake_chunks(*args):
        for piece in ("Hello!", " How can", " I help you?"):
            yield piece

    agent_service.create_streaming_response.return_value = fake_chunks()

    # Message object the stream sender reports as having been sent.
    sent_reply = Mock(text="Hello! How can I help you?", message_id=999)
    sent_reply.from_user = Mock(id=999)  # bot id

    # Drain the generator passed in so that create_streaming_response is
    # actually invoked by the handler's generator function.
    async def fake_send(chat_id, message_generator, initial_text, reply_to_message_id=None):
        async for _ in message_generator():
            pass
        return sent_reply

    streamer.send_streaming_message.side_effect = fake_send

    await message_handler.handle_text_message(update, Mock())

    # Preconditions for replying: private chat and a whitelisted user.
    assert update.effective_user.id == 123
    assert update.effective_chat.type == "private"
    assert 123 in message_handler.allowed_users

    # The agent for user 123 was looked up, the streaming response was
    # requested once and the stream sender was used once.
    agent_service._get_or_create_agent.assert_called_with(123)
    agent_service.create_streaming_response.assert_called_once()
    streamer.send_streaming_message.assert_called_once()
| |
@pytest.mark.asyncio
async def test_handle_text_message_unauthorized(self, message_handler, mock_components):
    """A private message from non-whitelisted user 999 triggers no processing."""
    update = Mock(
        effective_message=Mock(),
        effective_user=Mock(id=999),  # not in the allowed users
        effective_chat=Mock(id=999, type="private"),
    )

    await message_handler.handle_text_message(update, Mock())

    # Neither the context store nor the agent may have been touched.
    mock_components['context_manager'].add_message.assert_not_called()
    mock_components['claude_agent'].process_message.assert_not_called()
| |
@pytest.mark.asyncio
async def test_handle_text_message_group_with_mention(self, message_handler, mock_components):
    """A group message that @-mentions the bot is answered via the stream sender."""
    streamer = mock_components['stream_sender']

    # Message from authorized user 123 in authorized group -100.
    update = Mock(
        effective_message=Mock(text="@testbot help me", message_id=789),
        effective_user=Mock(id=123),
        effective_chat=Mock(id=-100, type="group"),
    )
    # get_bot() yields a bot whose username matches the mention.
    update.get_bot.return_value = Mock(username="testbot")

    mock_components['context_manager'].get_context.return_value = []
    mock_components['claude_agent'].process_message.return_value = "Group response"

    sent_reply = Mock(text="Group response")
    sent_reply.from_user = Mock(id=999)
    streamer.send_streaming_message.return_value = sent_reply

    await message_handler.handle_text_message(update, Mock())

    # Exactly one streaming reply, addressed to the group and quoting
    # the triggering message.
    streamer.send_streaming_message.assert_called_once()
    sent_kwargs = streamer.send_streaming_message.call_args.kwargs
    assert sent_kwargs['chat_id'] == -100
    assert sent_kwargs['initial_text'] == "⌨️ User is typing..."
    assert sent_kwargs['reply_to_message_id'] == 789
| |
@pytest.mark.asyncio
async def test_handle_text_message_group_without_mention(self, message_handler, mock_components):
    """A group message without an @-mention does not reach process_message."""
    update = Mock(
        effective_message=Mock(text="Just a normal message"),
        effective_user=Mock(id=123),
        effective_chat=Mock(id=-100, type="group"),
    )
    # get_bot() yields a bot named "testbot", which the text does not mention.
    update.get_bot.return_value = Mock(username="testbot")

    await message_handler.handle_text_message(update, Mock())

    mock_components['claude_agent'].process_message.assert_not_called()
| |
@pytest.mark.asyncio
async def test_handle_photo_message(self, message_handler, mock_components):
    """A photo message is answered with the 'image processing unavailable' notice."""
    unavailable_text = "🖼️ 图片处理功能暂时不可用,请用文字描述您想了解的内容,我会尽力帮助您!"
    telegram = mock_components['telegram_client']

    # Private photo message with a caption, not a reply to anything.
    update = Mock(
        effective_message=Mock(
            photo=[Mock(file_id="photo123")],
            caption="What's in this image?",
            message_id=789,
            reply_to_message=None,
        ),
        effective_user=Mock(id=123),
        effective_chat=Mock(id=123, type="private"),
    )

    # send_message resolves to the message the bot supposedly sent.
    sent_reply = Mock(text=unavailable_text)
    sent_reply.from_user = Mock(id=999)
    telegram.send_message = AsyncMock(return_value=sent_reply)

    await message_handler.handle_photo_message(update, Mock())

    telegram.send_message.assert_called_with(
        chat_id=123,
        text=unavailable_text,
        reply_to_message_id=789,
    )
| |
@pytest.mark.asyncio
async def test_handle_document_message(self, message_handler, mock_components):
    """A document message is answered with the 'document processing unavailable' notice."""
    unavailable_text = "📄 文档处理功能暂时不可用,请用文字描述您想了解的内容,我会尽力帮助您!"
    telegram = mock_components['telegram_client']

    # 1 MB PDF attached to a private message, not a reply to anything.
    attachment = Mock(file_id="doc123", file_name="test.pdf", file_size=1024 * 1024)
    update = Mock(
        effective_message=Mock(
            document=attachment,
            caption="Please analyze this document",
            message_id=789,
            reply_to_message=None,
        ),
        effective_user=Mock(id=123),
        effective_chat=Mock(id=123, type="private"),
    )

    # send_message resolves to the message the bot supposedly sent.
    sent_reply = Mock(text=unavailable_text)
    sent_reply.from_user = Mock(id=999)
    telegram.send_message = AsyncMock(return_value=sent_reply)

    await message_handler.handle_document_message(update, Mock())

    telegram.send_message.assert_called_with(
        chat_id=123,
        text=unavailable_text,
        reply_to_message_id=789,
    )
| |
@pytest.mark.asyncio
async def test_handle_document_message_too_large(self, message_handler, mock_components):
    """A 25 MB document also yields the 'document processing unavailable' notice."""
    # 25 MB attachment (the original test treats this as over the limit).
    oversized = Mock(file_size=25 * 1024 * 1024, file_name="large.pdf")
    update = Mock(
        effective_message=Mock(document=oversized, message_id=789),
        effective_user=Mock(id=123),
        effective_chat=Mock(id=123, type="private"),
    )

    await message_handler.handle_document_message(update, Mock())

    mock_components['telegram_client'].send_message.assert_called_with(
        chat_id=123,
        text="📄 文档处理功能暂时不可用,请用文字描述您想了解的内容,我会尽力帮助您!",
        reply_to_message_id=789,
    )
| |
@pytest.mark.asyncio
async def test_error_handling(self, message_handler, mock_components):
    """handle_text_message does not propagate an exception raised by add_message."""
    update = Mock(
        effective_message=Mock(text="Error test"),
        effective_user=Mock(id=123),
        effective_chat=Mock(id=123, type="private"),
    )

    # Make the context manager blow up whenever a message is stored.
    mock_components['context_manager'].add_message.side_effect = Exception("Context error")

    # Must complete without raising.
    await message_handler.handle_text_message(update, Mock())

    # Reaching this line means no exception escaped the handler.
    assert True
| |
def test_build_user_info(self, message_handler):
    """_build_user_info copies the user and chat fields into a dict."""
    user = Mock(
        id=123,
        username="testuser",
        first_name="Test",
        last_name="User",
        language_code="en",
    )
    chat = Mock(id=456, type="private")

    info = message_handler._build_user_info(user, chat)

    expected_fields = {
        'user_id': 123,
        'username': "testuser",
        'first_name': "Test",
        'last_name': "User",
        'language_code': "en",
        'chat_id': 456,
        'chat_type': "private",
    }
    for field, expected in expected_fields.items():
        assert info[field] == expected
| |
def test_build_user_info_group_chat(self, message_handler):
    """For a group chat the info carries the chat title; a missing last name stays None."""
    member = Mock(
        id=123,
        username="testuser",
        first_name="Test",
        last_name=None,
        language_code="zh",
    )
    group = Mock(id=-100, type="group", title="Test Group")

    info = message_handler._build_user_info(member, group)

    assert info['chat_type'] == "group"
    assert info['chat_title'] == "Test Group"
    assert info['last_name'] is None
| |
@pytest.mark.asyncio
async def test_is_authorized_private_allowed(self, message_handler):
    """_is_authorized is True for whitelisted user 123 in a private chat."""
    update = Mock(
        effective_user=Mock(id=123),  # on the allowed list
        effective_chat=Mock(type="private"),
    )

    verdict = await message_handler._is_authorized(update)

    assert verdict is True
| |
@pytest.mark.asyncio
async def test_is_authorized_private_denied(self, message_handler):
    """_is_authorized is False for non-whitelisted user 999 in a private chat."""
    update = Mock(
        effective_user=Mock(id=999),  # not on the allowed list
        effective_chat=Mock(type="private"),
    )

    verdict = await message_handler._is_authorized(update)

    assert verdict is False
| |
@pytest.mark.asyncio
async def test_is_authorized_group_allowed(self, message_handler):
    """_is_authorized is True in whitelisted group -100 even for unlisted user 999."""
    update = Mock(
        effective_user=Mock(id=999),  # user id is not on the allowed list
        effective_chat=Mock(id=-100, type="group"),  # group is on the allowed list
    )

    verdict = await message_handler._is_authorized(update)

    assert verdict is True
| |
@pytest.mark.asyncio
async def test_is_authorized_group_denied(self, message_handler):
    """_is_authorized is False in unlisted group -999 even for whitelisted user 123."""
    update = Mock(
        effective_user=Mock(id=123),  # user is on the allowed list
        effective_chat=Mock(id=-999, type="group"),  # group is not
    )

    verdict = await message_handler._is_authorized(update)

    assert verdict is False
| |
@pytest.mark.asyncio
async def test_should_reply_private_chat(self, message_handler):
    """_get_reply_info says: reply, and not as random participation, in a private chat."""
    update = Mock(
        effective_chat=Mock(type="private"),
        effective_user=Mock(id=123),  # whitelisted user
    )

    reply_info = await message_handler._get_reply_info(update, "any message")
    should_reply, is_random = reply_info

    assert should_reply is True
    assert is_random is False
| |
@pytest.mark.asyncio
async def test_should_reply_group_with_mention(self, message_handler):
    """In a group, _should_reply is True only for texts that @-mention the bot."""
    update = Mock(
        effective_chat=Mock(type="group"),
        effective_message=Mock(reply_to_message=None),  # not a reply
    )
    update.get_bot.return_value = Mock(username="testbot")

    # (message text, expected decision)
    expectations = (
        ("@testbot help", True),
        ("@TestBot help", True),  # the mention is matched case-insensitively
        ("help @testbot", True),
        ("just a message", False),
    )

    # Switch off name-mention and random participation so that only the
    # @-mention can cause a reply.
    with patch.object(message_handler.participation_manager, 'is_name_mentioned', return_value=False):
        with patch.object(message_handler.participation_manager, 'should_participate_random', return_value=False):
            for text, expected in expectations:
                assert await message_handler._should_reply(update, text) is expected
| |
@pytest.mark.asyncio
async def test_should_reply_group_no_bot_username(self, message_handler):
    """_should_reply is False in a group when the bot has no username."""
    update = Mock(effective_chat=Mock(type="group"))
    # get_bot() yields a bot whose username is None.
    update.get_bot.return_value = Mock(username=None)

    decision = await message_handler._should_reply(update, "@someone help")

    assert decision is False
| |
@pytest.mark.asyncio
async def test_should_reply_no_bot(self, message_handler):
    """_should_reply is False in a group when get_bot() returns None."""
    update = Mock(effective_chat=Mock(type="group"))
    update.get_bot.return_value = None

    decision = await message_handler._should_reply(update, "@someone help")

    assert decision is False
| |
def test_extract_reply_context_no_reply(self, message_handler):
    """A message that is not a reply yields an empty reply context."""
    plain_message = Mock(reply_to_message=None)

    assert message_handler._extract_reply_context(plain_message) == ""
| |
def test_extract_reply_context_with_text_reply(self, message_handler):
    """Replying to a user's text message yields '@username HH:MM' plus the text."""
    from datetime import datetime

    # Stand-in for the message date: no tzinfo, and the
    # replace(...).astimezone(...) chain returns a fixed real datetime.
    fake_date = Mock(tzinfo=None)
    fake_date.replace.return_value.astimezone.return_value = datetime(2024, 1, 1, 12, 30, 45)

    # The message being replied to: plain text written by a human user.
    quoted = Mock(
        from_user=Mock(is_bot=False, username="testuser", first_name="Test", last_name="User"),
        text="这是被回复的消息内容",
        date=fake_date,
    )
    for absent_field in ('caption', 'photo', 'document', 'voice', 'video', 'audio', 'sticker'):
        setattr(quoted, absent_field, None)

    current = Mock(reply_to_message=quoted)

    assert message_handler._extract_reply_context(current) == "↳ 回复 [@testuser 12:30]: 这是被回复的消息内容"
| |
def test_extract_reply_context_with_bot_reply(self, message_handler):
    """Replying to a bot message labels the quoted author as '🤖 Bot'."""
    from datetime import datetime

    # Stand-in for the message date: no tzinfo, and the
    # replace(...).astimezone(...) chain returns a fixed real datetime.
    fake_date = Mock(tzinfo=None)
    fake_date.replace.return_value.astimezone.return_value = datetime(2024, 1, 1, 14, 15, 30)

    # The message being replied to was written by a bot.
    quoted = Mock(
        from_user=Mock(is_bot=True),
        text="Bot的回复消息",
        date=fake_date,
    )
    for absent_field in ('caption', 'photo', 'document', 'voice', 'video', 'audio', 'sticker'):
        setattr(quoted, absent_field, None)

    current = Mock(reply_to_message=quoted)

    assert message_handler._extract_reply_context(current) == "↳ 回复 [🤖 Bot 14:15]: Bot的回复消息"
| |
def test_extract_reply_context_with_photo_reply(self, message_handler):
    """Replying to a captioned photo quotes the caption under the author's first name."""
    from datetime import datetime

    # Stand-in for the message date: no tzinfo, and the
    # replace(...).astimezone(...) chain returns a fixed real datetime.
    fake_date = Mock(tzinfo=None)
    fake_date.replace.return_value.astimezone.return_value = datetime(2024, 1, 1, 10, 0, 0)

    # Photo message with a caption; the author has no username or last name.
    quoted = Mock(
        from_user=Mock(is_bot=False, username=None, first_name="张三", last_name=None),
        text=None,
        caption="这是图片描述",
        photo=Mock(),  # a photo is present
        date=fake_date,
    )
    for absent_field in ('document', 'voice', 'video', 'audio', 'sticker'):
        setattr(quoted, absent_field, None)

    current = Mock(reply_to_message=quoted)

    assert message_handler._extract_reply_context(current) == "↳ 回复 [张三 10:00]: 这是图片描述"
| |
def test_extract_reply_context_with_document_reply(self, message_handler):
    """Replying to a document without text or caption quotes the file name."""
    from datetime import datetime

    # Stand-in for the message date: no tzinfo, and the
    # replace(...).astimezone(...) chain returns a fixed real datetime.
    fake_date = Mock(tzinfo=None)
    fake_date.replace.return_value.astimezone.return_value = datetime(2024, 1, 1, 16, 45, 0)

    # Document message with neither text nor caption.
    quoted = Mock(
        from_user=Mock(is_bot=False, username="user123"),
        text=None,
        caption=None,
        photo=None,
        document=Mock(file_name="example.pdf"),
        date=fake_date,
    )
    for absent_field in ('voice', 'video', 'audio', 'sticker'):
        setattr(quoted, absent_field, None)

    current = Mock(reply_to_message=quoted)

    assert message_handler._extract_reply_context(current) == "↳ 回复 [@user123 16:45]: [文档: example.pdf]"
| |
def test_extract_reply_context_long_message(self, message_handler):
    """A long quoted text is shortened: the context contains '...' and stays within 200 characters."""
    from datetime import datetime

    # The message being replied to: a phrase repeated ten times, with a
    # real (naive) datetime as its date.
    quoted = Mock(
        from_user=Mock(is_bot=False, username="longuser"),
        text="这是一条非常长的消息内容," * 10,
        date=datetime(2024, 1, 1, 9, 30, 0),
    )
    for absent_field in ('caption', 'photo', 'document', 'voice', 'video', 'audio', 'sticker'):
        setattr(quoted, absent_field, None)

    current = Mock(reply_to_message=quoted)

    reply_context = message_handler._extract_reply_context(current)

    assert "..." in reply_context
    assert len(reply_context) <= 200  # overall length stays reasonable
| |
def test_build_message_with_reply_context_no_reply(self, message_handler):
    """Without a replied-to message the text is returned unchanged."""
    plain_message = Mock(reply_to_message=None)

    built = message_handler._build_message_with_reply_context("Hello", plain_message)

    assert built == "Hello"
| |
def test_build_message_with_reply_context_with_reply(self, message_handler):
    """With a replied-to message the reply context line is placed before the text."""
    from datetime import datetime

    # Stand-in for the message date: no tzinfo, and the
    # replace(...).astimezone(...) chain returns a fixed real datetime.
    fake_date = Mock(tzinfo=None)
    fake_date.replace.return_value.astimezone.return_value = datetime(2024, 1, 1, 12, 0, 0)

    # The message being replied to: plain text written by a human user.
    quoted = Mock(
        from_user=Mock(is_bot=False, username="testuser"),
        text="原始消息",
        date=fake_date,
    )
    for absent_field in ('caption', 'photo', 'document', 'voice', 'video', 'audio', 'sticker'):
        setattr(quoted, absent_field, None)

    current = Mock(reply_to_message=quoted)

    built = message_handler._build_message_with_reply_context("我的回复", current)

    assert built == "↳ 回复 [@testuser 12:00]: 原始消息\n我的回复"
| |
| # Add tests for webhook broadcasting functionality |
@pytest.mark.asyncio
async def test_handle_text_message_with_webhook_broadcast(self, mock_components):
    """After answering a group message, the bot's reply is passed to the webhook callback."""
    broadcast_hook = AsyncMock()
    collaborators = {
        name: mock_components[name]
        for name in (
            'telegram_client',
            'context_manager',
            'file_handler',
            'claude_agent',
            'stream_sender',
        )
    }
    handler = MessageHandler(
        allowed_users=[123],
        allowed_groups=[-100],
        webhook_broadcast_callback=broadcast_hook,
        **collaborators,
    )

    # Group message, not a reply to anything.
    update = Mock(
        effective_message=Mock(text="@Claude 帮助我", message_id=789, reply_to_message=None),
        effective_user=Mock(id=123, username="testuser", first_name="Test"),
        effective_chat=Mock(id=-100, type="group", title="测试群"),
    )

    mock_components['context_manager'].get_context.return_value = []

    # The per-user agent exposes an empty conversation history.
    user_agent = Mock()
    user_agent.conversation_history = []
    mock_components['claude_agent']._get_or_create_agent.return_value = user_agent

    # Message the stream sender reports as sent by the bot.
    sent_reply = Mock(text="Bot回复内容", message_id=790)
    sent_reply.from_user = Mock(id=999, username="testbot", first_name="TestBot", last_name=None)
    mock_components['stream_sender'].send_streaming_message.return_value = sent_reply

    await handler.handle_text_message(update, Mock())

    # The callback ran once, with the bot's reply as keyword arguments.
    broadcast_hook.assert_called_once()
    broadcast_kwargs = broadcast_hook.call_args.kwargs
    assert broadcast_kwargs['group_id'] == -100
    assert broadcast_kwargs['message_content'] == "Bot回复内容"
    assert broadcast_kwargs['sender_info']['is_bot'] is True
| |
@pytest.mark.asyncio
async def test_handle_text_message_webhook_broadcast_error(self, mock_components):
    """A failing webhook callback is still invoked once and does not make the handler raise."""
    # Callback that raises every time it is awaited.
    failing_hook = AsyncMock(side_effect=Exception("Webhook失败"))

    collaborators = {
        name: mock_components[name]
        for name in (
            'telegram_client',
            'context_manager',
            'file_handler',
            'claude_agent',
            'stream_sender',
        )
    }
    handler = MessageHandler(
        allowed_users=[123],
        allowed_groups=[-100],
        webhook_broadcast_callback=failing_hook,
        **collaborators,
    )

    # Group message, not a reply to anything.
    update = Mock(
        effective_message=Mock(text="@Claude test", message_id=789, reply_to_message=None),
        effective_user=Mock(id=123),
        effective_chat=Mock(id=-100, type="group"),
    )

    mock_components['context_manager'].get_context.return_value = []
    user_agent = Mock()
    user_agent.conversation_history = []
    mock_components['claude_agent']._get_or_create_agent.return_value = user_agent

    # Message the stream sender reports as sent by the bot.
    sent_reply = Mock(text="Bot回复", message_id=790)
    sent_reply.from_user = Mock(id=999)
    mock_components['stream_sender'].send_streaming_message.return_value = sent_reply

    # Must complete without raising even though the webhook fails.
    await handler.handle_text_message(update, Mock())

    failing_hook.assert_called_once()
| |
@pytest.mark.asyncio
async def test_handle_text_message_error_fallback(self, mock_components):
    """When the stream sender raises, an error notice is sent through send_message."""
    telegram = mock_components['telegram_client']
    collaborators = {
        name: mock_components[name]
        for name in (
            'telegram_client',
            'context_manager',
            'file_handler',
            'claude_agent',
            'stream_sender',
        )
    }
    handler = MessageHandler(allowed_users=[123], allowed_groups=[], **collaborators)

    # Private message from whitelisted user 123.
    update = Mock(
        effective_message=Mock(text="测试消息"),
        effective_user=Mock(id=123),
        effective_chat=Mock(id=123, type="private"),
    )

    # The streaming send fails.
    mock_components['stream_sender'].send_streaming_message.side_effect = Exception("流式响应失败")

    # The handler is expected to swallow the exception.
    await handler.handle_text_message(update, Mock())

    # The last send_message call carries the failure text.
    telegram.send_message.assert_called()
    error_kwargs = telegram.send_message.call_args.kwargs
    assert "消息处理失败" in error_kwargs['text']
| |
def test_check_mention_in_text_webhook(self, message_handler):
    """_check_mention_in_text_webhook detects an @-mention of the configured bot name only."""
    message_handler.bot_username = "Claude"

    # (webhook text, expected detection result)
    cases = (
        ("@Claude 帮助我", True),   # mentions the bot
        ("普通消息", False),         # no mention at all
        ("@其他人 你好", False),     # mentions somebody else
    )
    for text, expected in cases:
        assert message_handler._check_mention_in_text_webhook(text) is expected
| |
def test_build_user_display_name(self, message_handler):
    """Display name prefers '@username', then the name parts, then 'User<id>'."""
    # (user id, user info dict, expected display name)
    cases = (
        # Username present: it wins over the real name.
        (123, {'username': 'testuser', 'first_name': 'Test', 'last_name': 'User'}, "@testuser"),
        # First and last name only.
        (456, {'first_name': 'Test', 'last_name': 'User'}, "Test User"),
        # First name only.
        (789, {'first_name': 'Test'}, "Test"),
        # No information at all.
        (999, {}, "User999"),
    )
    for user_id, user_info, expected in cases:
        assert message_handler._build_user_display_name(user_id, user_info) == expected
| |
def test_convert_agent_history_to_context_format(self, message_handler):
    """Agent history entries become context entries with 'is_bot' and 'message' keys."""
    history = [
        {"role": "user", "content": "用户消息1"},
        {"role": "assistant", "content": "助手回复1"},
        {"role": "user", "content": "用户消息2"},
    ]

    converted = message_handler._convert_agent_history_to_context_format(history, "test_chat")

    assert len(converted) == 3
    # Only the assistant entry is flagged as coming from the bot.
    for position, expected_flag in enumerate((False, True, False)):
        assert converted[position]["is_bot"] is expected_flag
    assert converted[0]["message"] == "用户消息1"
| |
@pytest.mark.asyncio
async def test_handle_webhook_message_with_mention(self, message_handler):
    """A webhook message that @-mentions the bot in group -100 gets a streamed reply."""
    # The bot name must be set for the mention in the text to match.
    message_handler.bot_username = "testbot"

    sender_id = 123
    sender_info = {
        'user_id': sender_id,
        'username': 'testuser',
        'first_name': 'Test',
        'last_name': 'User',
    }

    # Message the stream sender reports as sent by the bot.
    sent_reply = Mock(text="Webhook回复内容", message_id=999)
    sent_reply.from_user = Mock(id=888, username="testbot", first_name="TestBot", last_name=None)

    # create_streaming_response resolves to an async generator of chunks.
    async def fake_chunks(*args, **kwargs):
        for piece in ("Webhook", "回复内容"):
            yield piece

    message_handler.claude_agent.create_streaming_response = AsyncMock(return_value=fake_chunks())

    # Drain the generator function passed in so that
    # create_streaming_response is actually invoked.
    async def fake_send(chat_id, message_generator, initial_text, **kwargs):
        async for _ in message_generator():
            pass
        return sent_reply

    message_handler.stream_sender.send_streaming_message = AsyncMock(side_effect=fake_send)

    await message_handler.handle_webhook_message(
        chat_id=-100,  # whitelisted group
        user_id=sender_id,
        message_text="@testbot 帮助我",
        user_info=sender_info,
        chat_type='supergroup',
        is_bot=False,
    )

    # One streaming send, and (through the generator) one agent call.
    message_handler.stream_sender.send_streaming_message.assert_called_once()
    message_handler.claude_agent.create_streaming_response.assert_called_once()
| |
| @pytest.mark.asyncio |
| async def test_handle_webhook_message_private_chat(self, message_handler): |
| """测试处理Webhook消息(私聊)""" |
| chat_id = 123 # 授权的私聊用户 |
| user_id = 123 |
| message_text = "私聊消息" |
| user_info = { |
| 'user_id': user_id, |
| 'username': 'testuser', |
| 'first_name': 'Test' |
| } |
| |
| # Mock流式响应 |
| mock_reply = Mock() |
| mock_reply.text = "私聊回复" |
| mock_reply.message_id = 999 |
| mock_reply.from_user = Mock() |
| mock_reply.from_user.id = 888 |
| |
| # Mock create_streaming_response |
| async def mock_streaming_response(*args, **kwargs): |
| yield "私聊回复" |
| |
| message_handler.claude_agent.create_streaming_response = AsyncMock( |
| return_value=mock_streaming_response() |
| ) |
| |
| # Mock send_streaming_message to call the generator |
| async def mock_send_streaming_message(chat_id, message_generator, initial_text, **kwargs): |
| async for chunk in message_generator(): |
| pass # 消费生成器 |
| return mock_reply |
| |
| message_handler.stream_sender.send_streaming_message = AsyncMock( |
| side_effect=mock_send_streaming_message |
| ) |
| |
| # 执行处理 |
| await message_handler.handle_webhook_message( |
| chat_id=chat_id, |
| user_id=user_id, |
| message_text=message_text, |
| user_info=user_info, |
| chat_type='private', |
| is_bot=False |
| ) |
| |
| # 验证调用了流式发送器 |
| message_handler.stream_sender.send_streaming_message.assert_called_once() |
| # 验证调用了Claude Agent |
| message_handler.claude_agent.create_streaming_response.assert_called_once() |
| |
| @pytest.mark.asyncio |
| async def test_handle_webhook_message_no_reply_needed(self, message_handler): |
| """测试处理Webhook消息(不需要回复)""" |
| chat_id = -999 # 未授权的群组 |
| user_id = 123 |
| message_text = "普通群组消息" |
| user_info = {'user_id': user_id} |
| |
| # 执行处理 |
| await message_handler.handle_webhook_message( |
| chat_id=chat_id, |
| user_id=user_id, |
| message_text=message_text, |
| user_info=user_info, |
| chat_type='supergroup', |
| is_bot=False |
| ) |
| |
| # 验证没有调用流式发送器(因为不需要回复) |
| message_handler.stream_sender.send_streaming_message.assert_not_called() |
| |
| @pytest.mark.asyncio |
| async def test_handle_webhook_message_error_handling(self, message_handler): |
| """测试Webhook消息处理错误""" |
| chat_id = 123 |
| user_id = 123 |
| message_text = "会导致错误的消息" |
| user_info = {'user_id': user_id} |
| |
| # Mock抛出异常 |
| message_handler.stream_sender.send_streaming_message = AsyncMock( |
| side_effect=Exception("流式发送失败") |
| ) |
| |
| # 应该不会抛出异常 |
| await message_handler.handle_webhook_message( |
| chat_id=chat_id, |
| user_id=user_id, |
| message_text=message_text, |
| user_info=user_info, |
| chat_type='private', |
| is_bot=False |
| ) |
| |
| @pytest.mark.asyncio |
| async def test_should_reply_detailed_direct_bot_reply(self, message_handler, mock_components): |
| """测试_should_reply_detailed - 直接回复bot消息""" |
| # 创建群组Update |
| update = Mock() |
| update.effective_chat = Mock() |
| update.effective_message = Mock() |
| |
| update.effective_chat.type = "group" |
| update.effective_chat.id = -100 # 授权群组 |
| |
| # Mock回复了bot的消息 |
| reply_message = Mock() |
| reply_message.from_user = Mock() |
| reply_message.from_user.is_bot = True |
| reply_message.from_user.id = 999 # Bot ID |
| update.effective_message.reply_to_message = reply_message |
| |
| # Mock Bot对象 |
| mock_bot = Mock() |
| mock_bot.id = 999 # 同一个bot |
| update.get_bot.return_value = mock_bot |
| |
| should_reply, is_random = await message_handler._should_reply_detailed(update, "回复消息") |
| |
| assert should_reply is True |
| assert is_random is False # 不是随机参与 |
| |
| @pytest.mark.asyncio |
| async def test_should_reply_detailed_name_mention(self, message_handler): |
| """测试_should_reply_detailed - 检测到bot名字提及""" |
| update = Mock() |
| update.effective_chat = Mock() |
| update.effective_message = Mock() |
| |
| update.effective_chat.type = "supergroup" |
| update.effective_chat.id = -100 |
| update.effective_message.reply_to_message = None |
| |
| # Mock get_bot返回None用户名 |
| mock_bot = Mock() |
| mock_bot.username = None |
| update.get_bot.return_value = mock_bot |
| |
| # Mock participation_manager检测到名字提及 |
| message_handler.participation_manager.is_name_mentioned = Mock(return_value=True) |
| message_handler.participation_manager.reset_counter = Mock() |
| |
| should_reply, is_random = await message_handler._should_reply_detailed(update, "Claude 帮忙") |
| |
| assert should_reply is True |
| assert is_random is False |
| message_handler.participation_manager.reset_counter.assert_called_once() |
| |
| @pytest.mark.asyncio |
| async def test_should_reply_detailed_random_participation(self, message_handler): |
| """测试_should_reply_detailed - 随机参与""" |
| update = Mock() |
| update.effective_chat = Mock() |
| update.effective_message = Mock() |
| |
| update.effective_chat.type = "group" |
| update.effective_chat.id = -100 |
| update.effective_message.reply_to_message = None |
| |
| mock_bot = Mock() |
| mock_bot.username = "testbot" |
| update.get_bot.return_value = mock_bot |
| |
| # Mock没有@提及和名字提及,但达到随机参与条件 |
| message_handler.participation_manager.is_name_mentioned = Mock(return_value=False) |
| message_handler.participation_manager.should_participate_random = Mock(return_value=True) |
| |
| should_reply, is_random = await message_handler._should_reply_detailed(update, "普通消息") |
| |
| assert should_reply is True |
| assert is_random is True # 这是随机参与 |
| |
| @pytest.mark.asyncio |
| async def test_should_reply_with_auth_private_unauthorized(self, message_handler): |
| """测试_should_reply_with_auth - 私聊未授权用户""" |
| update = Mock() |
| update.effective_user = Mock() |
| update.effective_chat = Mock() |
| |
| update.effective_user.id = 999 # 未授权用户 |
| update.effective_chat.type = "private" |
| |
| result = await message_handler._should_reply_with_auth(update, "测试消息") |
| |
| assert result is False |
| |
| @pytest.mark.asyncio |
| async def test_should_reply_with_auth_group_unauthorized(self, message_handler): |
| """测试_should_reply_with_auth - 群组未授权""" |
| update = Mock() |
| update.effective_user = Mock() |
| update.effective_chat = Mock() |
| |
| update.effective_user.id = 123 |
| update.effective_chat.id = -999 # 未授权群组 |
| update.effective_chat.type = "supergroup" |
| |
| result = await message_handler._should_reply_with_auth(update, "测试消息") |
| |
| assert result is False |
| |
| @pytest.mark.asyncio |
| async def test_should_reply_with_auth_missing_objects(self, message_handler): |
| """测试_should_reply_with_auth - 缺少必要对象""" |
| update = Mock() |
| update.effective_user = None |
| update.effective_chat = None |
| |
| result = await message_handler._should_reply_with_auth(update, "测试消息") |
| |
| assert result is False |
| |
| def test_format_reply_context_bot_with_username(self, message_handler): |
| """测试_format_reply_context - Bot用户带用户名""" |
| reply_info = { |
| 'user_info': { |
| 'is_bot': True, |
| 'username': 'testbot', |
| 'first_name': 'TestBot', |
| 'last_name': 'Assistant' |
| }, |
| 'timestamp': None, |
| 'content': 'Bot的回复内容' |
| } |
| |
| result = message_handler._format_reply_context(reply_info) |
| |
| expected = "↳ 回复 [🤖 @testbot]: Bot的回复内容" |
| assert result == expected |
| |
| def test_format_reply_context_bot_without_username(self, message_handler): |
| """测试_format_reply_context - Bot用户没有用户名""" |
| reply_info = { |
| 'user_info': { |
| 'is_bot': True, |
| 'username': None, |
| 'first_name': 'TestBot', |
| 'last_name': None |
| }, |
| 'timestamp': None, |
| 'content': 'Bot回复' |
| } |
| |
| result = message_handler._format_reply_context(reply_info) |
| |
| expected = "↳ 回复 [🤖 TestBot]: Bot回复" |
| assert result == expected |
| |
| def test_format_reply_context_system_user(self, message_handler): |
| """测试_format_reply_context - 系统用户""" |
| reply_info = { |
| 'user_info': { |
| 'is_bot': False, |
| 'username': None, |
| 'first_name': None, |
| 'user_id': 'system' |
| }, |
| 'timestamp': None, |
| 'content': '系统消息' |
| } |
| |
| result = message_handler._format_reply_context(reply_info) |
| |
| expected = "↳ 回复 [system]: 系统消息" |
| assert result == expected |
| |
| def test_format_reply_context_unknown_user(self, message_handler): |
| """测试_format_reply_context - 未知用户""" |
| reply_info = { |
| 'user_info': { |
| 'is_bot': False, |
| 'username': None, |
| 'first_name': None, |
| 'user_id': 0 |
| }, |
| 'timestamp': None, |
| 'content': '未知用户消息' |
| } |
| |
| result = message_handler._format_reply_context(reply_info) |
| |
| expected = "↳ 回复 [unknown user]: 未知用户消息" |
| assert result == expected |
| |
| def test_format_reply_context_with_timestamp(self, message_handler): |
| """测试_format_reply_context - 包含时间戳""" |
| from datetime import datetime |
| |
| mock_time = datetime(2024, 1, 1, 14, 30, 0) |
| reply_info = { |
| 'user_info': { |
| 'is_bot': False, |
| 'username': 'testuser', |
| 'first_name': 'Test' |
| }, |
| 'timestamp': mock_time, |
| 'content': '带时间戳的消息' |
| } |
| |
| result = message_handler._format_reply_context(reply_info) |
| |
| expected = "↳ 回复 [@testuser 14:30]: 带时间戳的消息" |
| assert result == expected |
| |
| def test_format_reply_context_empty_info(self, message_handler): |
| """测试_format_reply_context - 空回复信息""" |
| result = message_handler._format_reply_context(None) |
| assert result == "" |
| |
| result = message_handler._format_reply_context({}) |
| assert result == "" |
| |
| @pytest.mark.asyncio |
| async def test_get_agent_history_with_persistence(self, message_handler): |
| """测试_get_agent_history - 有persistence的情况""" |
| # Mock claude_agent有persistence属性 |
| mock_persistence = Mock() |
| mock_persistence.load_conversation_history.return_value = [ |
| {"role": "user", "content": "历史消息1"}, |
| {"role": "assistant", "content": "历史回复1"} |
| ] |
| message_handler.claude_agent.persistence = mock_persistence |
| |
| history = await message_handler._get_agent_history("test_chat") |
| |
| assert len(history) == 2 |
| assert history[0]["content"] == "历史消息1" |
| mock_persistence.load_conversation_history.assert_called_once_with("test_chat") |
| |
| @pytest.mark.asyncio |
| async def test_get_agent_history_no_persistence(self, message_handler): |
| """测试_get_agent_history - 没有persistence的情况""" |
| # 确保claude_agent没有persistence属性 |
| if hasattr(message_handler.claude_agent, 'persistence'): |
| del message_handler.claude_agent.persistence |
| |
| history = await message_handler._get_agent_history("test_chat") |
| |
| assert history == [] |
| |
| @pytest.mark.asyncio |
| async def test_get_agent_history_exception(self, message_handler): |
| """测试_get_agent_history - 异常处理""" |
| # Mock persistence抛出异常 |
| mock_persistence = Mock() |
| mock_persistence.load_conversation_history.side_effect = Exception("加载失败") |
| message_handler.claude_agent.persistence = mock_persistence |
| |
| history = await message_handler._get_agent_history("test_chat") |
| |
| assert history == [] |
| |
| @pytest.mark.asyncio |
| async def test_save_message_to_agent_user_message(self, message_handler): |
| """测试_save_message_to_agent - 用户消息""" |
| # Mock Agent实例 |
| mock_agent = Mock() |
| mock_agent.conversation_history = [] |
| message_handler.claude_agent._get_or_create_agent = Mock(return_value=mock_agent) |
| |
| user_info = { |
| 'username': 'testuser', |
| 'first_name': 'Test', |
| 'reply_info': { |
| 'user_info': {'username': 'otheruser'}, |
| 'content': '被回复的内容', |
| 'timestamp': None |
| } |
| } |
| |
| await message_handler._save_message_to_agent( |
| chat_id=123, |
| user_id=456, |
| message="用户的消息", |
| is_bot=False, |
| user_info=user_info |
| ) |
| |
| # 验证消息被添加到历史记录 |
| assert len(mock_agent.conversation_history) == 1 |
| message_entry = mock_agent.conversation_history[0] |
| assert message_entry['role'] == 'user' |
| assert '用户的消息' in message_entry['content'] |
| assert '@testuser' in message_entry['content'] |
| |
| @pytest.mark.asyncio |
| async def test_save_message_to_agent_bot_message(self, message_handler): |
| """测试_save_message_to_agent - Bot消息""" |
| mock_agent = Mock() |
| mock_agent.conversation_history = [] |
| message_handler.claude_agent._get_or_create_agent = Mock(return_value=mock_agent) |
| |
| await message_handler._save_message_to_agent( |
| chat_id=123, |
| user_id=999, |
| message="Bot的回复", |
| is_bot=True, |
| user_info={'first_name': 'TestBot'} |
| ) |
| |
| # 验证Bot消息被添加 |
| assert len(mock_agent.conversation_history) == 1 |
| message_entry = mock_agent.conversation_history[0] |
| assert message_entry['role'] == 'assistant' |
| assert 'Bot的回复' in message_entry['content'] |
| assert 'Bot' in message_entry['content'] |
| |
| @pytest.mark.asyncio |
| async def test_save_message_to_agent_exception(self, message_handler): |
| """测试_save_message_to_agent - 异常处理""" |
| # Mock _get_or_create_agent抛出异常 |
| message_handler.claude_agent._get_or_create_agent = Mock( |
| side_effect=Exception("Agent创建失败") |
| ) |
| |
| # 应该不会抛出异常 |
| await message_handler._save_message_to_agent( |
| chat_id=123, |
| user_id=456, |
| message="测试消息", |
| is_bot=False |
| ) |