| """ |
| Telegram Bot单元测试 - 上下文管理器 |
| """ |
| |
| import pytest |
| from unittest.mock import Mock, patch |
| from datetime import datetime |
| import json |
| |
| import sys |
| from pathlib import Path |
| sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent / "src")) |
| |
| from claude_agent.telegram.context_manager import ContextManager |
| |
| |
| class TestContextManager: |
| """上下文管理器测试""" |
| |
| def test_init(self): |
| """测试初始化""" |
| manager = ContextManager(max_history_per_chat=10) |
| assert manager.max_history_per_chat == 10 |
| assert manager.get_chat_count() == 0 |
| |
| def test_add_message(self): |
| """测试添加消息""" |
| manager = ContextManager(max_history_per_chat=5) |
| |
| # 添加消息 |
| manager.add_message(chat_id=123, user_id=456, message="Hello", is_bot=False) |
| |
| # 验证消息数量 |
| assert manager.get_message_count(123) == 1 |
| assert manager.get_chat_count() == 1 |
| |
| # 获取上下文 |
| context = manager.get_context(123) |
| assert len(context) == 1 |
| assert context[0]['message'] == "Hello" |
| assert context[0]['user_id'] == 456 |
| assert context[0]['is_bot'] == False |
| assert context[0]['chat_id'] == 123 |
| |
| def test_add_bot_message(self): |
| """测试添加机器人消息""" |
| manager = ContextManager() |
| |
| manager.add_message(chat_id=123, user_id=789, message="Bot response", is_bot=True) |
| |
| context = manager.get_context(123) |
| assert len(context) == 1 |
| assert context[0]['is_bot'] == True |
| assert context[0]['message'] == "Bot response" |
| |
| def test_context_limit(self): |
| """测试上下文数量限制""" |
| manager = ContextManager(max_history_per_chat=3) |
| |
| # 添加超过限制的消息 |
| for i in range(5): |
| manager.add_message(chat_id=123, user_id=456, message=f"Message {i}") |
| |
| # 验证只保留最新的3条消息 |
| context = manager.get_context(123) |
| assert len(context) == 3 |
| assert context[0]['message'] == "Message 2" |
| assert context[1]['message'] == "Message 3" |
| assert context[2]['message'] == "Message 4" |
| |
| def test_get_context_with_limit(self): |
| """测试获取指定数量的上下文""" |
| manager = ContextManager() |
| |
| # 添加5条消息 |
| for i in range(5): |
| manager.add_message(chat_id=123, user_id=456, message=f"Message {i}") |
| |
| # 获取最近3条 |
| context = manager.get_context(123, limit=3) |
| assert len(context) == 3 |
| assert context[0]['message'] == "Message 2" |
| assert context[1]['message'] == "Message 3" |
| assert context[2]['message'] == "Message 4" |
| |
| def test_multiple_chats(self): |
| """测试多个聊天的上下文分离""" |
| manager = ContextManager() |
| |
| # 添加不同聊天的消息 |
| manager.add_message(chat_id=123, user_id=456, message="Chat 1 Message") |
| manager.add_message(chat_id=456, user_id=789, message="Chat 2 Message") |
| |
| # 验证上下文分离 |
| context1 = manager.get_context(123) |
| context2 = manager.get_context(456) |
| |
| assert len(context1) == 1 |
| assert len(context2) == 1 |
| assert context1[0]['message'] == "Chat 1 Message" |
| assert context2[0]['message'] == "Chat 2 Message" |
| assert manager.get_chat_count() == 2 |
| |
| def test_clear_context(self): |
| """测试清空上下文""" |
| manager = ContextManager() |
| |
| # 添加消息 |
| manager.add_message(chat_id=123, user_id=456, message="Test message") |
| assert manager.get_message_count(123) == 1 |
| |
| # 清空上下文 |
| manager.clear_context(123) |
| assert manager.get_message_count(123) == 0 |
| |
| def test_export_import_context(self): |
| """测试导出和导入上下文""" |
| manager = ContextManager() |
| |
| # 添加测试数据 |
| manager.add_message(chat_id=123, user_id=456, message="Message 1") |
| manager.add_message(chat_id=123, user_id=789, message="Message 2", is_bot=True) |
| |
| # 导出上下文 |
| exported = manager.export_context(123) |
| assert isinstance(exported, str) |
| |
| # 解析JSON验证内容 |
| data = json.loads(exported) |
| assert len(data) == 2 |
| assert data[0]['message'] == "Message 1" |
| assert data[1]['message'] == "Message 2" |
| |
| # 清空并导入 |
| manager.clear_context(123) |
| manager.import_context(123, exported) |
| |
| # 验证导入成功 |
| context = manager.get_context(123) |
| assert len(context) == 2 |
| assert context[0]['message'] == "Message 1" |
| assert context[1]['message'] == "Message 2" |
| |
| def test_import_invalid_json(self): |
| """测试导入无效JSON""" |
| manager = ContextManager() |
| |
| with pytest.raises(ValueError, match="无效的上下文JSON数据"): |
| manager.import_context(123, "invalid json") |
| |
| def test_get_user_message_count(self): |
| """测试获取用户消息数量""" |
| manager = ContextManager() |
| |
| # 添加不同用户的消息 |
| manager.add_message(chat_id=123, user_id=456, message="User 1 Message 1") |
| manager.add_message(chat_id=123, user_id=456, message="User 1 Message 2") |
| manager.add_message(chat_id=123, user_id=789, message="User 2 Message") |
| manager.add_message(chat_id=123, user_id=999, message="Bot Message", is_bot=True) |
| |
| # 验证用户消息计数 |
| assert manager.get_user_message_count(123, 456) == 2 |
| assert manager.get_user_message_count(123, 789) == 1 |
| assert manager.get_user_message_count(123, 999) == 0 # Bot消息不计入 |
| |
| def test_thread_safety(self): |
| """测试线程安全""" |
| import threading |
| import time |
| |
| manager = ContextManager() |
| results = [] |
| |
| def add_messages(chat_id, user_id, count): |
| for i in range(count): |
| manager.add_message(chat_id, user_id, f"Message {i}") |
| time.sleep(0.001) # 小延迟模拟实际使用 |
| results.append(manager.get_message_count(chat_id)) |
| |
| # 创建多个线程同时添加消息 |
| threads = [] |
| for i in range(3): |
| thread = threading.Thread(target=add_messages, args=(123, 456 + i, 5)) |
| threads.append(thread) |
| thread.start() |
| |
| # 等待所有线程完成 |
| for thread in threads: |
| thread.join() |
| |
| # 验证结果 |
| total_messages = manager.get_message_count(123) |
| assert total_messages == 15 # 3个线程 * 5条消息 |
| |
| @patch('claude_agent.telegram.context_manager.datetime') |
| def test_cleanup_old_chats(self, mock_datetime): |
| """测试清理旧聊天""" |
| from datetime import datetime, timedelta |
| |
| manager = ContextManager() |
| |
| # 设置当前时间 |
| current_time = datetime(2024, 1, 15, 12, 0, 0) |
| old_time = current_time - timedelta(days=8) |
| |
| # Mock datetime.now() |
| mock_datetime.now.return_value = current_time |
| # Mock datetime.fromisoformat() |
| def mock_fromisoformat(timestamp_str): |
| if timestamp_str == old_time.isoformat(): |
| return old_time |
| else: |
| return current_time |
| mock_datetime.fromisoformat = mock_fromisoformat |
| |
| # 添加旧消息(手动设置时间戳) |
| manager.add_message(chat_id=123, user_id=456, message="Old message") |
| manager.add_message(chat_id=456, user_id=789, message="New message") |
| |
| # 手动修改时间戳 |
| context_123 = manager._contexts["123"] |
| context_123[0]['timestamp'] = old_time.isoformat() |
| |
| assert manager.get_chat_count() == 2 |
| |
| # 清理7天前的聊天 |
| manager.cleanup_old_chats(max_inactive_days=7) |
| |
| # 验证旧聊天被清理 |
| assert manager.get_chat_count() == 1 |
| assert manager.get_message_count(456) == 1 |
| assert manager.get_message_count(123) == 0 |
| |
| def test_empty_chat_context(self): |
| """测试空聊天上下文""" |
| manager = ContextManager() |
| |
| # 获取不存在的聊天上下文 |
| context = manager.get_context(999) |
| assert len(context) == 0 |
| |
| # 获取消息数量 |
| assert manager.get_message_count(999) == 0 |
| assert manager.get_user_message_count(999, 456) == 0 |
| |
| def test_add_message_with_user_info(self): |
| """测试添加带用户信息的消息""" |
| manager = ContextManager() |
| user_info = { |
| 'username': 'testuser', |
| 'first_name': 'Test', |
| 'last_name': 'User' |
| } |
| |
| # 添加带用户信息的消息 |
| manager.add_message( |
| chat_id=123, |
| user_id=456, |
| message="Test message", |
| user_info=user_info |
| ) |
| |
| # 验证消息被正确存储 |
| context = manager.get_context(123) |
| assert len(context) == 1 |
| assert 'user_info' in context[0] |
| assert context[0]['user_info'] == user_info |
| |
| # 添加不带用户信息的消息 |
| manager.add_message(chat_id=123, user_id=789, message="No user info") |
| context = manager.get_context(123) |
| assert len(context) == 2 |
| assert 'user_info' in context[0] # 第一条有user_info |
| assert 'user_info' not in context[1] # 第二条没有user_info |