# blob: 4dd175aa7797317376097f1007cb6506b3672a89 [file] [log] [blame] [raw]
"""
Telegram Bot单元测试 - 扩展功能测试
测试Bot类的启动、停止、错误处理等功能
"""
import pytest
import asyncio
from unittest.mock import Mock, AsyncMock, patch
import signal
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent / "src"))
from claude_agent.telegram.bot import TelegramBot
class TestTelegramBotExtended:
    """Extended unit tests for ``TelegramBot``.

    Exercises initialization, start/stop, notifications, error handling,
    webhook integration and dependency injection. The configuration manager
    and every Telegram-facing collaborator are replaced with mocks.
    """

    # ------------------------------------------------------------------
    # Fixtures
    # ------------------------------------------------------------------

    @pytest.fixture
    def mock_config_manager(self):
        """Return a mock config manager serving fixed Telegram/webhook configs."""
        telegram_config = {
            'bot_token': '123456789:ABCDEFGHIJKLMNOPQRSTUVWXYZ123456789',
            'allowed_users': [111, 222],
            'allowed_groups': [-333],
            'message': {
                'context_history_limit': 20,
                'stream_update_interval': 1.0
            },
            'files': {
                'temp_dir': '/tmp/test',
                'max_file_size_mb': 10
            },
            'group_participation': {
                'bot_names': ['testbot', 'TestBot'],
                'random_participation_range': [5, 10]
            }
        }
        webhook_config = {
            'enabled': True,
            'server_url': 'http://localhost:8080',
            'auth_token': 'test_token',
            'client': {
                'subscribed_groups': [-333],
                'callback_port': 8081
            }
        }

        config_manager = Mock()
        config_manager.get_telegram_config.return_value = telegram_config
        config_manager.get_webhook_config.return_value = webhook_config
        return config_manager

    @pytest.fixture
    def bot_with_mocks(self, mock_config_manager):
        """Return a ``TelegramBot("test")`` built against the mock config manager.

        ``get_config_manager`` is only patched while the bot is constructed.
        """
        with patch('claude_agent.telegram.bot.get_config_manager') as mock_get_config:
            mock_get_config.return_value = mock_config_manager
            bot = TelegramBot("test")
        return bot

    # ------------------------------------------------------------------
    # Private helpers (not collected by pytest: names do not start with test_)
    # ------------------------------------------------------------------

    @staticmethod
    def _install_mock_client(bot, side_effect=None):
        """Attach a mock Telegram client to ``bot`` and return it.

        ``send_message`` is an ``AsyncMock``; ``side_effect`` is forwarded to
        it so callers can simulate send failures.
        """
        client = Mock()
        client.send_message = AsyncMock(side_effect=side_effect)
        bot.telegram_client = client
        return client

    @staticmethod
    def _wire_mock_application(mock_app_class):
        """Configure a patched ``Application`` class and return ``(builder, app)``.

        ``Application.builder().token(...).build()`` yields ``app``.
        """
        builder = Mock()
        app = Mock()
        app.bot = Mock()
        # Key point: bot.get_me() must return an awaitable.
        app.bot.get_me = AsyncMock(
            return_value=Mock(id='123456789', username='TestBot')
        )
        builder.token.return_value = builder
        builder.build.return_value = app
        mock_app_class.builder.return_value = builder
        return builder, app

    @staticmethod
    def _make_update(chat_id):
        """Return a mock update whose ``effective_chat.id`` is ``chat_id``."""
        update = Mock()
        update.effective_chat = Mock()
        update.effective_chat.id = chat_id
        return update

    @staticmethod
    def _make_lifecycle_application():
        """Return a mock application whose lifecycle coroutines are ``AsyncMock``s."""
        app = Mock()
        app.initialize = AsyncMock()
        app.start = AsyncMock()
        app.updater = Mock()
        app.updater.start_polling = AsyncMock()
        app.updater.stop = AsyncMock()
        app.stop = AsyncMock()
        app.shutdown = AsyncMock()
        return app

    # ------------------------------------------------------------------
    # Initialization
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_initialize_components(self, bot_with_mocks):
        """``initialize()`` populates every component and uses the configured token."""
        with patch('claude_agent.telegram.bot.Application') as mock_app_class:
            builder, _app = self._wire_mock_application(mock_app_class)

            await bot_with_mocks.initialize()

            # Every component must have been created.
            assert bot_with_mocks.application is not None
            assert bot_with_mocks.telegram_client is not None
            assert bot_with_mocks.context_manager is not None
            assert bot_with_mocks.file_handler is not None
            assert bot_with_mocks.claude_agent is not None
            assert bot_with_mocks.stream_sender is not None
            assert bot_with_mocks.message_handler is not None

            # The Application builder must receive the configured bot token.
            builder.token.assert_called_with(
                '123456789:ABCDEFGHIJKLMNOPQRSTUVWXYZ123456789'
            )

    @pytest.mark.asyncio
    async def test_register_handlers(self, bot_with_mocks):
        """``initialize()`` registers three message handlers and one error handler."""
        with patch('claude_agent.telegram.bot.Application') as mock_app_class:
            _builder, app = self._wire_mock_application(mock_app_class)

            await bot_with_mocks.initialize()

            # Text, photo and document handlers.
            assert app.add_handler.call_count == 3
            app.add_error_handler.assert_called_once()

    # ------------------------------------------------------------------
    # Error handler
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_error_handler(self, bot_with_mocks):
        """The error handler reports an internal error to the originating chat."""
        client = self._install_mock_client(bot_with_mocks)
        update = self._make_update(123)
        context = Mock()
        context.error = ValueError("Test error")

        await bot_with_mocks._error_handler(update, context)

        client.send_message.assert_called_once_with(
            chat_id=123,
            text="❌ 处理消息时出现内部错误,已记录此问题。"
        )

    @pytest.mark.asyncio
    async def test_error_handler_no_chat(self, bot_with_mocks):
        """With no update (hence no chat), the error handler sends nothing."""
        client = self._install_mock_client(bot_with_mocks)
        context = Mock()
        context.error = ValueError("Test error")

        await bot_with_mocks._error_handler(None, context)

        client.send_message.assert_not_called()

    @pytest.mark.asyncio
    async def test_error_handler_send_error(self, bot_with_mocks):
        """A failure while sending the error message must not propagate."""
        client = self._install_mock_client(
            bot_with_mocks, side_effect=Exception("Send failed")
        )
        update = self._make_update(123)
        context = Mock()
        context.error = ValueError("Original error")

        # Must not raise.
        await bot_with_mocks._error_handler(update, context)

        # A send was still attempted.
        client.send_message.assert_called_once()

    # ------------------------------------------------------------------
    # Startup / shutdown / admin notifications
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_startup_notification(self, bot_with_mocks):
        """The startup notification goes to every allowed user."""
        client = self._install_mock_client(bot_with_mocks)

        await bot_with_mocks._send_startup_notification()

        assert client.send_message.call_count == 2
        calls = client.send_message.call_args_list

        # Sent to exactly the configured users.
        assert {c.kwargs['chat_id'] for c in calls} == {111, 222}

        # Message content.
        for c in calls:
            assert "🤖 Claude Telegram Bot 已启动" in c.kwargs['text']

    @pytest.mark.asyncio
    async def test_shutdown_notification(self, bot_with_mocks):
        """The shutdown notification goes to every allowed user."""
        client = self._install_mock_client(bot_with_mocks)

        await bot_with_mocks._send_shutdown_notification()

        assert client.send_message.call_count == 2
        calls = client.send_message.call_args_list

        # Sent to exactly the configured users.
        assert {c.kwargs['chat_id'] for c in calls} == {111, 222}

        # Message content.
        for c in calls:
            assert "🤖 Claude Telegram Bot 正在关闭" in c.kwargs['text']

    @pytest.mark.asyncio
    async def test_send_admin_message(self, bot_with_mocks):
        """An admin message is delivered, with its prefix, to every allowed user."""
        client = self._install_mock_client(bot_with_mocks)
        test_message = "System maintenance at 2AM"

        await bot_with_mocks.send_admin_message(test_message)

        assert client.send_message.call_count == 2
        calls = client.send_message.call_args_list

        # Sent to exactly the configured users.
        assert {c.kwargs['chat_id'] for c in calls} == {111, 222}

        # Message format: admin prefix plus the original text.
        for c in calls:
            text = c.kwargs['text']
            assert "📢 管理员消息:" in text
            assert test_message in text

    @pytest.mark.asyncio
    async def test_notification_with_send_error(self, bot_with_mocks):
        """Send failures during the startup notification are swallowed."""
        client = self._install_mock_client(
            bot_with_mocks, side_effect=Exception("Network error")
        )

        # Must not raise.
        await bot_with_mocks._send_startup_notification()

        # One attempt per allowed user.
        assert client.send_message.call_count == 2

    # ------------------------------------------------------------------
    # start() / stop()
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_start_already_running(self, bot_with_mocks):
        """``start()`` on an already-running bot must not initialize again.

        ``initialize`` is patched so the test never touches real components.
        Only the early return is asserted, not ``is_running``, because
        cleanup in ``start()`` may change that flag.
        """
        bot_with_mocks.is_running = True

        with patch.object(
            bot_with_mocks, 'initialize', new_callable=AsyncMock
        ) as mock_init:
            await bot_with_mocks.start()

        # NOTE(review): assumes the "already running" check precedes
        # initialize() inside start() - confirm against the implementation.
        mock_init.assert_not_called()

    def test_setup_signal_handlers(self, bot_with_mocks):
        """Handlers are registered for both SIGINT and SIGTERM."""
        with patch('signal.signal') as mock_signal:
            bot_with_mocks._setup_signal_handlers()

        # Map each registered signal to its handler; independent of call
        # order and fails with an assertion (not IndexError) if one is missing.
        registered = {c.args[0]: c.args[1] for c in mock_signal.call_args_list}
        assert signal.SIGINT in registered
        assert signal.SIGTERM in registered

    @pytest.mark.asyncio
    async def test_stop_not_running(self, bot_with_mocks):
        """``stop()`` on a bot that is not running returns and stays stopped."""
        bot_with_mocks.is_running = False

        await bot_with_mocks.stop()

        assert bot_with_mocks.is_running is False

    @pytest.mark.asyncio
    async def test_stop_with_cleanup(self, bot_with_mocks):
        """``stop()`` shuts the application down and cleans up its components."""
        bot_with_mocks.is_running = True

        # Mock application.
        mock_app = Mock()
        mock_app.updater = Mock()
        mock_app.updater.stop = AsyncMock()
        mock_app.stop = AsyncMock()
        mock_app.shutdown = AsyncMock()
        bot_with_mocks.application = mock_app

        # Mock file handler and context manager.
        mock_file_handler = Mock()
        mock_file_handler.cleanup_temp_files = AsyncMock()
        bot_with_mocks.file_handler = mock_file_handler

        mock_context_manager = Mock()
        mock_context_manager.cleanup_old_chats = Mock()
        bot_with_mocks.context_manager = mock_context_manager

        self._install_mock_client(bot_with_mocks)

        await bot_with_mocks.stop()

        # Cleanup steps.
        mock_app.updater.stop.assert_called_once()
        mock_app.stop.assert_called_once()
        mock_app.shutdown.assert_called_once()
        mock_file_handler.cleanup_temp_files.assert_called_once()
        mock_context_manager.cleanup_old_chats.assert_called_once()

        # State update.
        assert bot_with_mocks.is_running is False

    # ------------------------------------------------------------------
    # get_stats()
    # ------------------------------------------------------------------

    def test_get_stats_with_context_manager(self, bot_with_mocks):
        """``get_stats()`` includes the active chat count from the context manager."""
        mock_context_manager = Mock()
        mock_context_manager.get_chat_count.return_value = 5
        bot_with_mocks.context_manager = mock_context_manager

        stats = bot_with_mocks.get_stats()

        expected_stats = {
            'is_running': False,
            'config_name': 'test',
            'allowed_users_count': 2,
            'allowed_groups_count': 1,
            'active_chats': 5
        }
        for key, value in expected_stats.items():
            assert stats[key] == value

    def test_get_stats_without_context_manager(self, bot_with_mocks):
        """``get_stats()`` still reports the basic fields without a context manager."""
        bot_with_mocks.context_manager = None

        stats = bot_with_mocks.get_stats()

        assert stats['is_running'] is False
        assert stats['config_name'] == 'test'
        assert stats['allowed_users_count'] == 2
        assert stats['allowed_groups_count'] == 1

    # ------------------------------------------------------------------
    # Full lifecycle
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_bot_lifecycle_comprehensive(self, bot_with_mocks):
        """``start()`` runs the full startup sequence, then the shutdown sequence."""
        with patch.object(bot_with_mocks, 'initialize', new_callable=AsyncMock) as mock_init, \
                patch.object(bot_with_mocks, '_setup_signal_handlers') as mock_setup_signals, \
                patch.object(bot_with_mocks, '_send_startup_notification', new_callable=AsyncMock) as mock_startup, \
                patch.object(bot_with_mocks, '_send_shutdown_notification', new_callable=AsyncMock) as mock_shutdown:
            # Mock application.
            mock_app = self._make_lifecycle_application()
            bot_with_mocks.application = mock_app

            # Mock the remaining components.
            bot_with_mocks.file_handler = Mock()
            bot_with_mocks.file_handler.cleanup_temp_files = AsyncMock()
            bot_with_mocks.context_manager = Mock()
            bot_with_mocks.context_manager.cleanup_old_chats = Mock()

            # Request shutdown up front to avoid waiting forever.
            bot_with_mocks.shutdown_event.set()

            await bot_with_mocks.start()

            # Startup sequence.
            mock_init.assert_called_once()
            mock_setup_signals.assert_called_once()
            mock_app.initialize.assert_called_once()
            mock_app.start.assert_called_once()
            mock_app.updater.start_polling.assert_called_once_with(drop_pending_updates=True)
            mock_startup.assert_called_once()

            # Shutdown sequence.
            mock_shutdown.assert_called_once()
            mock_app.updater.stop.assert_called_once()
            mock_app.stop.assert_called_once()
            mock_app.shutdown.assert_called_once()

    # ------------------------------------------------------------------
    # Admin messages (ordering and partial failures)
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_send_admin_message_functionality(self, bot_with_mocks):
        """Admin messages are sent to the allowed users in configuration order."""
        client = self._install_mock_client(bot_with_mocks)
        test_message = "System maintenance scheduled"

        await bot_with_mocks.send_admin_message(test_message)

        # One send per allowed user.
        assert client.send_message.call_count == 2

        calls = client.send_message.call_args_list
        expected_users = [111, 222]
        for index, c in enumerate(calls):
            assert c.kwargs['chat_id'] == expected_users[index]
            assert "📢 管理员消息" in c.kwargs['text']
            assert test_message in c.kwargs['text']

    @pytest.mark.asyncio
    async def test_send_admin_message_with_failures(self, bot_with_mocks):
        """One failed recipient does not stop delivery or raise."""
        # First user succeeds, second fails.
        client = self._install_mock_client(
            bot_with_mocks, side_effect=[None, Exception("Send failed")]
        )

        # Must not raise.
        await bot_with_mocks.send_admin_message("Test message")

        # Every user was attempted.
        assert client.send_message.call_count == 2

    # ------------------------------------------------------------------
    # Webhook integration
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_webhook_integration_functionality(self, bot_with_mocks):
        """The webhook integration is created from config and started on the callback port."""
        with patch('claude_agent.webhook.telegram_integration.create_webhook_integration_from_config') as mock_create:
            mock_integration = Mock()
            mock_integration.start = AsyncMock()
            mock_create.return_value = mock_integration

            await bot_with_mocks._initialize_webhook_integration("TestBot")

            # Creation arguments.
            mock_create.assert_called_once()
            create_kwargs = mock_create.call_args.kwargs
            assert create_kwargs['bot_username'] == "TestBot"
            assert create_kwargs['telegram_client'] == bot_with_mocks.telegram_client

            # Started on the configured callback port.
            mock_integration.start.assert_called_once_with(8081)
            assert bot_with_mocks.webhook_integration == mock_integration

    @pytest.mark.asyncio
    async def test_webhook_integration_disabled(self, bot_with_mocks, mock_config_manager):
        """With the webhook disabled and no integration created, none is stored."""
        # Disable the webhook in the (shared, mutable) config dict.
        webhook_config = mock_config_manager.get_webhook_config.return_value
        webhook_config['enabled'] = False

        with patch('claude_agent.webhook.telegram_integration.create_webhook_integration_from_config') as mock_create:
            mock_create.return_value = None

            await bot_with_mocks._initialize_webhook_integration("TestBot")

            assert bot_with_mocks.webhook_integration is None

    @pytest.mark.asyncio
    async def test_webhook_message_processing(self, bot_with_mocks):
        """A queued webhook message is forwarded to the message handler.

        The string ``chat_id`` in the payload is expected to reach the handler
        as an int, with ``chat_type='supergroup'``.
        """
        message_data = {
            'chat_id': '-333',
            'user_id': 999,
            'message': 'Test webhook message',
            'is_bot': False,
            'user_info': {'username': 'testuser', 'first_name': 'Test'}
        }

        # Mock message handler.
        mock_handler = Mock()
        mock_handler.handle_webhook_message = AsyncMock()
        bot_with_mocks.message_handler = mock_handler

        await bot_with_mocks._process_webhook_message_from_queue(message_data)

        mock_handler.handle_webhook_message.assert_called_once_with(
            chat_id=-333,
            user_id=999,
            message_text='Test webhook message',
            user_info={'username': 'testuser', 'first_name': 'Test'},
            chat_type='supergroup',
            is_bot=False
        )

    @pytest.mark.asyncio
    async def test_webhook_message_processing_no_handler(self, bot_with_mocks):
        """Processing a webhook message without a message handler must not raise."""
        message_data = {
            'chat_id': '-333',
            'user_id': 999,
            'message': 'Test message',
            'is_bot': False,
            'user_info': {}
        }
        bot_with_mocks.message_handler = None

        # Must not raise.
        await bot_with_mocks._process_webhook_message_from_queue(message_data)

    # ------------------------------------------------------------------
    # Broadcasting
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_broadcast_bot_message_success(self, bot_with_mocks):
        """A broadcast is delegated to the webhook integration and returns its result."""
        mock_integration = Mock()
        mock_integration.broadcast_bot_message = AsyncMock(return_value=True)
        bot_with_mocks.webhook_integration = mock_integration

        result = await bot_with_mocks.broadcast_bot_message(
            group_id=-333,
            message_content="Hello from bot",
            sender_info={'user_id': 999, 'username': 'testbot'},
            reply_info={'content': 'original message'},
            telegram_message_id=123,
            message_type='text'
        )

        assert result is True
        mock_integration.broadcast_bot_message.assert_called_once()
        forwarded = mock_integration.broadcast_bot_message.call_args.kwargs
        assert forwarded['group_id'] == -333
        assert forwarded['message_content'] == "Hello from bot"
        assert forwarded['telegram_message_id'] == 123

    @pytest.mark.asyncio
    async def test_broadcast_bot_message_no_integration(self, bot_with_mocks):
        """Broadcasting without a webhook integration returns ``False``."""
        bot_with_mocks.webhook_integration = None

        result = await bot_with_mocks.broadcast_bot_message(
            group_id=-333,
            message_content="Hello",
            sender_info={'user_id': 999}
        )

        assert result is False

    @pytest.mark.asyncio
    async def test_broadcast_bot_message_failure(self, bot_with_mocks):
        """An exception from the integration is swallowed and reported as ``False``."""
        mock_integration = Mock()
        mock_integration.broadcast_bot_message = AsyncMock(
            side_effect=Exception("Broadcast failed")
        )
        bot_with_mocks.webhook_integration = mock_integration

        # Must not raise.
        result = await bot_with_mocks.broadcast_bot_message(
            group_id=-333,
            message_content="Hello",
            sender_info={'user_id': 999}
        )

        assert result is False

    # ------------------------------------------------------------------
    # Webhook queue loop
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_webhook_queue_processing_loop(self, bot_with_mocks):
        """The queue loop processes a message and stops when cancelled."""
        bot_with_mocks.webhook_message_queue = Mock()

        message_data = {'chat_id': '-333', 'user_id': 999, 'message': 'test', 'is_bot': False, 'user_info': {}}
        get_calls = 0

        async def fake_get():
            """Yield one message, then cancel every later ``get()``."""
            nonlocal get_calls
            get_calls += 1
            if get_calls == 1:
                return message_data
            # From the second call on, raise CancelledError to end the loop.
            raise asyncio.CancelledError()

        bot_with_mocks.webhook_message_queue.get = fake_get

        with patch.object(
            bot_with_mocks, '_process_webhook_message_from_queue', new_callable=AsyncMock
        ) as mock_process:
            try:
                # Runs until the second get() cancels the loop.
                await bot_with_mocks._process_webhook_message_queue()
            except asyncio.CancelledError:
                # Deliberate: the loop may either absorb or re-raise the cancel.
                pass

            # Exactly one message was processed.
            mock_process.assert_called_once_with(message_data)

    def test_signal_handler_setup(self, bot_with_mocks):
        """Exactly two signal handlers are installed: SIGINT and SIGTERM."""
        with patch('signal.signal') as mock_signal:
            bot_with_mocks._setup_signal_handlers()

            assert mock_signal.call_count == 2

            registered_signals = [c.args[0] for c in mock_signal.call_args_list]
            assert signal.SIGINT in registered_signals
            assert signal.SIGTERM in registered_signals

    # ------------------------------------------------------------------
    # Notification content and failure handling
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_startup_notification_functionality(self, bot_with_mocks):
        """The startup notification carries both the banner and the status line."""
        client = self._install_mock_client(bot_with_mocks)

        await bot_with_mocks._send_startup_notification()

        # One send per allowed user.
        assert client.send_message.call_count == 2

        for c in client.send_message.call_args_list:
            text = c.kwargs['text']
            assert "🤖 Claude Telegram Bot 已启动" in text
            assert "✅ 系统状态: 正常运行" in text

    @pytest.mark.asyncio
    async def test_shutdown_notification_functionality(self, bot_with_mocks):
        """The shutdown notification carries both the banner and the status line."""
        client = self._install_mock_client(bot_with_mocks)

        await bot_with_mocks._send_shutdown_notification()

        # One send per allowed user.
        assert client.send_message.call_count == 2

        for c in client.send_message.call_args_list:
            text = c.kwargs['text']
            assert "🤖 Claude Telegram Bot 正在关闭" in text
            assert "⏹️ 系统状态: 准备停止" in text

    @pytest.mark.asyncio
    async def test_notification_send_failures(self, bot_with_mocks):
        """Neither notification raises when every send fails."""
        client = self._install_mock_client(
            bot_with_mocks, side_effect=Exception("Send failed")
        )

        # Neither call may raise.
        await bot_with_mocks._send_startup_notification()
        await bot_with_mocks._send_shutdown_notification()

        # 2 startup attempts + 2 shutdown attempts.
        assert client.send_message.call_count == 4

    # ------------------------------------------------------------------
    # Dependency injection
    # ------------------------------------------------------------------

    def test_dependency_injection_methods(self, bot_with_mocks):
        """Each ``set_*`` method stores the injected component on the bot."""
        mock_context_manager = Mock()
        mock_file_handler = Mock()
        mock_claude_agent = Mock()
        mock_stream_sender = Mock()

        bot_with_mocks.set_context_manager(mock_context_manager)
        bot_with_mocks.set_file_handler(mock_file_handler)
        bot_with_mocks.set_claude_agent(mock_claude_agent)
        bot_with_mocks.set_stream_sender(mock_stream_sender)

        assert bot_with_mocks.context_manager == mock_context_manager
        assert bot_with_mocks.file_handler == mock_file_handler
        assert bot_with_mocks.claude_agent == mock_claude_agent
        assert bot_with_mocks.stream_sender == mock_stream_sender