blob: 0a2882b6ea37fb338ba454818483109865181d31 [file] [log] [blame] [raw]
"""
消息处理器
处理Telegram消息的核心逻辑
"""
import logging
import random
from typing import Union, Optional, Dict, List, Any, Callable
from telegram import Update, User, Chat
from telegram.ext import ContextTypes
from .interfaces import IMessageHandler, ITelegramClient, IContextManager, IFileHandler, IClaudeAgent, IStreamMessageSender
logger = logging.getLogger(__name__)
class GroupParticipationManager:
"""群聊参与管理器 - 智能决定何时参与群聊"""
def __init__(self, bot_names: list[str] = None, participation_range: list[int] = None):
"""
初始化群聊参与管理器
Args:
bot_names: Bot可能被提及的名字列表
participation_range: 随机参与的消息数量范围 [min, max]
"""
# 每个群聊的消息计数器和触发阈值
self._message_counters: Dict[str, int] = {}
self._trigger_thresholds: Dict[str, int] = {}
# bot的可能名字(用于检测),支持配置覆盖
self._bot_names = bot_names if bot_names is not None else [
'claude', 'Claude', 'CLAUDE',
'克劳德', 'AI', 'ai', 'bot', 'Bot', 'BOT',
'助手', '机器人'
]
# 随机参与的消息数量范围,支持配置覆盖
self._participation_range = participation_range if participation_range is not None else [1, 10]
def reset_counter(self, chat_id: str):
"""重置计数器并设置新的随机阈值"""
self._message_counters[chat_id] = 0
min_count, max_count = self._participation_range
self._trigger_thresholds[chat_id] = random.randint(min_count, max_count)
logger.info(f"群聊 {chat_id} 设置新的参与阈值: {self._trigger_thresholds[chat_id]} (范围: {min_count}-{max_count})")
def should_participate_random(self, chat_id: str) -> bool:
"""检查是否达到随机参与条件"""
if chat_id not in self._message_counters:
self.reset_counter(chat_id)
return False
self._message_counters[chat_id] += 1
current_count = self._message_counters[chat_id]
threshold = self._trigger_thresholds[chat_id]
logger.debug(f"群聊 {chat_id} 消息计数: {current_count}/{threshold}")
if current_count >= threshold:
logger.info(f"群聊 {chat_id} 达到随机参与阈值,准备参与对话")
self.reset_counter(chat_id) # 重置计数器
return True
return False
def is_name_mentioned(self, text: str) -> bool:
"""检查消息中是否提到了bot的名字"""
text_lower = text.lower()
for name in self._bot_names:
if name.lower() in text_lower:
logger.info(f"检测到bot名字被提及: '{name}' in '{text[:50]}...'")
return True
return False
class MessageHandler(IMessageHandler):
"""消息处理器实现"""
def __init__(
self,
telegram_client: ITelegramClient,
context_manager: IContextManager,
file_handler: IFileHandler,
claude_agent: IClaudeAgent,
stream_sender: IStreamMessageSender,
allowed_users: list[int],
allowed_groups: list[int],
telegram_config: dict = None,
webhook_broadcast_callback: Optional[Callable] = None
):
"""
初始化消息处理器
Args:
telegram_client: Telegram客户端
context_manager: 上下文管理器
file_handler: 文件处理器
claude_agent: Claude Agent
stream_sender: 流式消息发送器
allowed_users: 允许的用户ID列表
allowed_groups: 允许的群组ID列表
telegram_config: Telegram配置字典(可选)
webhook_broadcast_callback: Webhook广播回调函数(可选)
"""
self.telegram_client = telegram_client
self.context_manager = context_manager
self.file_handler = file_handler
self.claude_agent = claude_agent
self.stream_sender = stream_sender
self.allowed_users = set(allowed_users)
self.allowed_groups = set(allowed_groups)
self.webhook_broadcast_callback = webhook_broadcast_callback
# 从配置中获取群聊参与设置
if telegram_config:
group_config = telegram_config.get('group_participation', {})
bot_names = group_config.get('bot_names')
participation_range = group_config.get('random_participation_range')
else:
bot_names = None
participation_range = None
# 初始化群聊参与管理器(使用配置或默认值)
self.participation_manager = GroupParticipationManager(
bot_names=bot_names,
participation_range=participation_range
)
logger.info(f"消息处理器初始化完成:")
logger.info(f" Bot名字列表: {self.participation_manager._bot_names}")
logger.info(f" 随机参与范围: {self.participation_manager._participation_range}")
async def handle_text_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理文本消息"""
try:
logger.debug("handle_text_message 被调用")
message = update.effective_message
user = update.effective_user
chat = update.effective_chat
if not message or not user or not chat:
logger.warning(f"缺少必要对象 - message: {message}, user: {user}, chat: {chat}")
return
text = message.text or ""
# 提取回复信息但不直接拼接到消息中
reply_info = self._extract_reply_info(message) if message.reply_to_message else None
user_id = user.id
chat_id = chat.id
logger.debug(f"收到消息 - 聊天ID: {chat_id}, 用户ID: {user_id}, 聊天类型: {chat.type}")
logger.debug(f"原始消息内容: '{text}'")
if reply_info:
logger.debug(f"回复信息: {reply_info}")
# 检查是否需要回复
should_reply, is_random_participation = await self._get_reply_info(update, text)
# 🔄 统一保存用户消息到conversations.json(无论是否回复,只保存一次)
if (chat.type in ['group', 'supergroup'] and chat_id in self.allowed_groups) or should_reply:
# 构建用户信息(包含格式化的时间戳)
user_info = self._build_user_info(user, chat, message)
# 将回复信息添加到user_info中,而不是消息内容中
if reply_info:
user_info['reply_info'] = reply_info
# 直接通过Claude Agent保存消息到conversations.json(只保存一次)
await self._save_message_to_agent(chat_id, user_id, text, is_bot=False, user_info=user_info)
logger.info(f"✅ 用户消息已保存到conversations.json - 聊天ID: {chat_id}, 用户: {user_id}, 消息: '{text[:50]}...', 是否回复: {should_reply}")
else:
logger.debug(f"❌ 消息不符合保存条件 - 聊天类型: {chat.type}, 是否在白名单: {chat_id in self.allowed_groups}, 是否回复: {should_reply}")
if should_reply:
# 🔄 不再传递历史上下文,让Agent使用自己的内存历史
# 避免重复合并导致的历史消息膨胀问题
formatted_context = [] # 传递空上下文,Agent会使用自己的conversation_history
# 创建流式响应生成器
logger.debug(f"开始创建流式响应生成器,聊天ID: {chat_id}")
async def generate_response():
logger.debug("generate_response 函数被调用")
chunk_num = 0
async for chunk in self.claude_agent.create_streaming_response(text, formatted_context, user_info):
chunk_num += 1
logger.debug(f"生成器产生chunk #{chunk_num}: {len(str(chunk))} 字符")
yield chunk
logger.debug(f"生成器完成,总共 {chunk_num} chunks")
logger.debug("调用 stream_sender.send_streaming_message...")
# 发送流式回复 - 如果是随机参与则不回复特定消息
reply_to_message_id = None if is_random_participation else message.message_id
if is_random_participation:
logger.debug("随机参与模式:不回复特定消息")
else:
logger.debug(f"回复特定消息ID: {reply_to_message_id}")
reply_message = await self.stream_sender.send_streaming_message(
chat_id=chat_id,
message_generator=generate_response,
initial_text="⌨️ User is typing...",
reply_to_message_id=reply_to_message_id
)
logger.debug(f"stream_sender 完成,返回消息ID: {reply_message.message_id if reply_message else None}")
# 广播Bot消息到Webhook服务器(如果配置了)
if self.webhook_broadcast_callback and reply_message and chat.type in ['group', 'supergroup']:
try:
# 构建Bot发送者信息
bot_sender_info = {
'user_id': reply_message.from_user.id if reply_message.from_user else 0,
'username': reply_message.from_user.username if reply_message.from_user else None,
'first_name': reply_message.from_user.first_name if reply_message.from_user else 'Bot',
'last_name': reply_message.from_user.last_name if reply_message.from_user else None,
'is_bot': True
}
# 构建回复信息(如果有)
webhook_reply_info = None
if reply_info:
webhook_reply_info = {
'user_info': reply_info,
'timestamp': reply_info.get('timestamp'),
'content': reply_info.get('content', ''),
'message_id': reply_to_message_id
}
# 调用Webhook广播
await self.webhook_broadcast_callback(
group_id=chat_id,
message_content=reply_message.text or '',
sender_info=bot_sender_info,
reply_info=webhook_reply_info,
telegram_message_id=reply_message.message_id,
message_type='text'
)
logger.debug(f"Bot消息已广播到Webhook: {chat_id}")
except Exception as e:
logger.warning(f"Webhook广播失败: {e}")
# 注意:Bot回复已经在流式响应过程中保存到conversations.json
# 不需要在这里重复保存,避免数据重复
# 如果是群聊,bot回复后重置随机参与计数器
if chat.type in ['group', 'supergroup']:
self.participation_manager.reset_counter(str(chat_id))
except Exception as e:
logger.error(f"文本消息处理错误: {e}")
if update.effective_chat:
try:
await self.telegram_client.send_message(
chat_id=update.effective_chat.id,
text="❌ 消息处理失败,请稍后再试。"
)
except:
pass
async def handle_webhook_message(
self,
chat_id: int,
user_id: int,
message_text: str,
user_info: dict,
chat_type: str = 'supergroup',
is_bot: bool = False
):
"""
处理Webhook消息的专用方法
Args:
chat_id: 聊天ID
user_id: 用户ID
message_text: 消息文本
user_info: 用户信息字典
chat_type: 聊天类型 ('group', 'supergroup', 'private')
is_bot: 是否为Bot消息
"""
try:
logger.debug(f"处理Webhook消息 - 聊天ID: {chat_id}, 用户ID: {user_id}")
# 对于Webhook消息,使用与Telegram消息相同的智能参与逻辑
should_reply = False
is_random_participation = False
# 1. 检查@mention
if self._check_mention_in_text_webhook(message_text):
should_reply = True
is_random_participation = False
# @提及后重置随机计数器
if chat_type in ['group', 'supergroup']:
self.participation_manager.reset_counter(str(chat_id))
logger.info(f"Webhook消息检测到@mention,触发回复")
# 2. 如果没有@mention,检查随机参与(仅对群组)
# 暂时屏蔽掉,这回让bot们在群里聊个没完没了 根本停不下来
# elif chat_type in ['group', 'supergroup'] and chat_id in self.allowed_groups:
# should_participate = self.participation_manager.should_participate_random(str(chat_id))
# if should_participate:
# should_reply = True
# is_random_participation = True
# logger.info(f"Webhook消息触发随机参与: 群聊 {chat_id}")
# else:
# # 即使不回复,也要计数(随机参与逻辑内部已经处理了计数)
# logger.debug(f"Webhook消息计数中: 群聊 {chat_id}")
# 3. 私聊逻辑(如果需要支持)
elif chat_type == 'private' and chat_id in self.allowed_users:
should_reply = True
is_random_participation = False
logger.info(f"Webhook消息回复决策 - 聊天ID: {chat_id}, 聊天类型: {chat_type}")
logger.info(f"Webhook消息回复决策 - 是否回复: {should_reply}, 随机参与: {is_random_participation}")
logger.info(f"Webhook消息回复决策 - 允许的群组: {list(self.allowed_groups)}")
logger.info(f"Webhook消息回复决策 - 群组检查: {chat_id in self.allowed_groups}")
# 🔄 统一保存用户消息到conversations.json(无论是否回复,只保存一次)
if (chat_type in ['group', 'supergroup'] and chat_id in self.allowed_groups) or should_reply:
# 直接通过Claude Agent保存消息到conversations.json(只保存一次)
await self._save_message_to_agent(chat_id, user_id, message_text, is_bot=is_bot, user_info=user_info)
logger.info(f"✅ Webhook消息已保存到conversations.json - 聊天ID: {chat_id}, 用户: {user_id}, 消息: '{message_text[:50]}...', 是否回复: {should_reply}")
else:
logger.debug(f"❌ Webhook消息不符合保存条件 - 聊天类型: {chat_type}, 是否在白名单: {chat_id in self.allowed_groups}, 是否回复: {should_reply}")
if should_reply:
# 🔄 不再传递历史上下文,让Agent使用自己的内存历史
# 避免重复合并导致的历史消息膨胀问题
formatted_context = [] # 传递空上下文,Agent会使用自己的conversation_history
# 创建流式响应生成器
logger.debug(f"开始创建Webhook流式响应生成器,聊天ID: {chat_id}")
async def create_response_generator():
"""创建响应生成器"""
try:
# 调用Claude Agent
async for chunk in self.claude_agent.create_streaming_response(
message=message_text,
context=formatted_context,
user_info=user_info
):
yield chunk
except Exception as e:
logger.error(f"创建Webhook响应生成器失败: {e}")
yield f"抱歉,处理您的消息时遇到错误: {str(e)}"
# 发送流式响应
logger.debug(f"开始发送Webhook流式响应,聊天ID: {chat_id}")
reply_message = await self.stream_sender.send_streaming_message(
chat_id=chat_id,
message_generator=create_response_generator,
initial_text="⌨️ User is typing..."
)
# 保存Bot的回复消息到conversations.json
if reply_message and reply_message.text:
bot_user_id = reply_message.from_user.id if reply_message.from_user else 999
bot_user_info = {
'user_id': bot_user_id,
'username': reply_message.from_user.username if reply_message.from_user else None,
'first_name': reply_message.from_user.first_name if reply_message.from_user else 'Bot',
'is_bot': True
}
await self._save_message_to_agent(
chat_id, bot_user_id, reply_message.text,
is_bot=True, user_info=bot_user_info
)
logger.info(f"✅ Webhook Bot回复已保存到conversations.json - 聊天ID: {chat_id}, 内容: '{reply_message.text[:50]}...'")
# 广播Bot回复到Webhook服务器(如果配置了)
if self.webhook_broadcast_callback and chat_type in ['group', 'supergroup']:
try:
logger.info(f"准备广播Webhook Bot回复到其他Bot...")
# 构建Bot发送者信息
bot_sender_info = {
'user_id': bot_user_id,
'username': reply_message.from_user.username if reply_message.from_user else None,
'first_name': reply_message.from_user.first_name if reply_message.from_user else 'Bot',
'last_name': reply_message.from_user.last_name if reply_message.from_user else None,
'is_bot': True
}
# 调用Webhook广播
success = await self.webhook_broadcast_callback(
group_id=chat_id,
message_content=reply_message.text or '',
sender_info=bot_sender_info,
reply_info=None,
telegram_message_id=reply_message.message_id if reply_message else None,
message_type='text'
)
if success:
logger.info(f"✅ Webhook Bot回复已成功广播: {chat_id}")
else:
logger.warning(f"⚠️ Webhook Bot回复广播失败: {chat_id}")
except Exception as e:
logger.error(f"Webhook Bot回复广播异常: {e}")
# 如果是群聊,bot回复后重置随机参与计数器
if chat_type in ['group', 'supergroup']:
self.participation_manager.reset_counter(str(chat_id))
logger.debug(f"Webhook回复后重置计数器: 群聊 {chat_id}")
except Exception as e:
logger.error(f"Webhook消息处理错误: {e}")
def _check_mention_in_text_webhook(self, text: str) -> bool:
"""检查文本中是否包含对当前Bot的@mention"""
logger.debug(f"检查@mention - 消息内容: '{text}'")
# logger.debug(f"检查@mention - Bot名字列表: {self.participation_manager._bot_names}")
# 检查Bot名字列表
# 谨慎开启,这个开启后Bot会在群里刷屏
# for bot_name in self.participation_manager._bot_names:
# if bot_name.lower() in text.lower():
# logger.info(f"检测到Bot名字提及: '{bot_name}' in '{text[:100]}...'")
# return True
# 检查Bot用户名@mention(如果可用)
if hasattr(self, 'bot_username') and self.bot_username:
logger.debug(f"检查@mention - Bot用户名: @{self.bot_username}")
if f"@{self.bot_username}" in text:
logger.info(f"检测到@用户名提及: '@{self.bot_username}' in '{text[:100]}...'")
return True
else:
logger.debug("检查@mention - Bot用户名未设置")
logger.debug("检查@mention - 未检测到任何提及")
return False
async def handle_photo_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理图片消息 - 通过MCP工具分析图片"""
try:
if not await self._is_authorized(update):
return
message = update.effective_message
user = update.effective_user
chat = update.effective_chat
if not message or not user or not chat or not message.photo:
return
user_id = user.id
chat_id = chat.id
caption = message.caption or ""
# 提取回复信息
reply_info = self._extract_reply_info(message) if message.reply_to_message else None
# 检查是否需要回复
should_reply, _ = await self._get_reply_info(update, caption)
if should_reply:
# 构建用户信息(包含时间戳)
user_info = self._build_user_info(user, chat, message)
# 将回复信息添加到user_info中
if reply_info:
user_info['reply_info'] = reply_info
# 下载图片到临时目录
try:
photo_file = await message.photo[-1].get_file()
import os
os.makedirs("temp/telegram", exist_ok=True)
file_path = f"temp/telegram/{chat_id}_{message.message_id}.jpg"
await photo_file.download_to_drive(file_path)
# 获取绝对路径
abs_path = os.path.abspath(file_path)
logger.info(f"图片已下载到: {abs_path}")
# 构建提示Claude使用MCP工具的消息
if caption:
prompt_message = f"用户发送了一张图片并说: {caption}\\n\\n请使用 analyze_telegram_image 工具(路径: {abs_path})查看并分析这张图片,然后回复用户。"
save_message = f"📸 [图片] {caption}"
else:
prompt_message = f"用户发送了一张图片\\n\\n请使用 analyze_telegram_image 工具(路径: {abs_path})查看并分析这张图片,然后回复用户。"
save_message = f"📸 [图片]"
# 保存用户消息到本地历史(简化版,因为SDK会保存完整版)
await self._save_message_to_agent(chat_id, user_id, save_message, is_bot=False, user_info=user_info)
# 创建流式响应
async def generate_response():
try:
# Claude会自动调用MCP工具来查看图片
async for chunk in self.claude_agent.create_streaming_response(
prompt_message, [], user_info
):
yield chunk
except Exception as e:
logger.error(f"图片消息处理失败: {e}", exc_info=True)
yield f"❌ 图片处理失败: {str(e)}"
# 发送流式回复
reply_message = await self.stream_sender.send_streaming_message(
chat_id=chat_id,
message_generator=generate_response,
initial_text="🖼️ 正在分析图片...",
reply_to_message_id=message.message_id
)
# 清理临时文件
try:
os.remove(abs_path)
logger.debug(f"已删除临时图片文件: {abs_path}")
except Exception as e:
logger.debug(f"删除临时文件失败: {e}")
# 群聊中重置参与计数器
if chat.type in ['group', 'supergroup']:
self.participation_manager.reset_counter(str(chat_id))
except Exception as img_error:
logger.error(f"图片下载或处理失败: {img_error}", exc_info=True)
await self.telegram_client.send_message(
chat_id=chat_id,
text=f"❌ 图片处理失败: {str(img_error)}",
reply_to_message_id=message.message_id
)
except Exception as e:
logger.error(f"图片消息处理错误: {e}")
if update.effective_chat:
try:
await self.telegram_client.send_message(
chat_id=update.effective_chat.id,
text="❌ 图片处理出错,请稍后再试。"
)
except:
pass
async def handle_document_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理文档消息 - 暂时禁用文档处理功能"""
try:
if not await self._is_authorized(update):
return
message = update.effective_message
user = update.effective_user
chat = update.effective_chat
if not message or not user or not chat or not message.document:
return
user_id = user.id
chat_id = chat.id
document = message.document
caption = message.caption or f"发送了文档: {document.file_name}"
# 提取回复信息
reply_info = self._extract_reply_info(message) if message.reply_to_message else None
# 检查是否需要回复
should_reply, _ = await self._get_reply_info(update, caption)
if should_reply:
# 构建用户信息
user_info = self._build_user_info(user, chat)
# 将回复信息添加到user_info中
if reply_info:
user_info['reply_info'] = reply_info
# 添加用户消息到上下文
user_message = f"[文档: {document.file_name}] {caption}"
self.context_manager.add_message(chat_id, user_id, user_message, is_bot=False, user_info=user_info)
# 简单的文档处理不可用提示
response = "📄 文档处理功能暂时不可用,请用文字描述您想了解的内容,我会尽力帮助您!"
# 发送回复
reply_message = await self.telegram_client.send_message(
chat_id=chat_id,
text=response,
reply_to_message_id=message.message_id
)
# 将Bot回复添加到上下文
if reply_message and reply_message.text:
self.context_manager.add_message(chat_id, reply_message.from_user.id, reply_message.text, is_bot=True)
# 如果是群聊,bot回复后重置随机参与计数器
if chat.type in ['group', 'supergroup']:
self.participation_manager.reset_counter(str(chat_id))
except Exception as e:
logger.error(f"文档消息处理错误: {e}")
if update.effective_chat:
try:
await self.telegram_client.send_message(
chat_id=update.effective_chat.id,
text="❌ 文档处理功能暂时不可用,请稍后再试。"
)
except:
pass
async def _is_authorized(self, update: Update) -> bool:
"""检查用户是否有权限"""
user = update.effective_user
chat = update.effective_chat
if not user or not chat:
return False
# 私聊检查
if chat.type == 'private':
return user.id in self.allowed_users
# 群组检查
if chat.type in ['group', 'supergroup']:
return chat.id in self.allowed_groups
return False
async def _get_reply_info(self, update: Update, text: str) -> tuple[bool, bool]:
"""
获取回复信息
Returns:
(should_reply: bool, is_random_participation: bool)
"""
chat = update.effective_chat
user = update.effective_user
if not chat or not user:
return False, False
# 私聊:需要用户在白名单中
if chat.type == 'private':
if user.id not in self.allowed_users:
logger.debug(f"私聊用户 {user.id} 不在白名单中,不回复")
return False, False
return True, False # 私聊中白名单用户总是回复,不是随机参与
# 群聊:需要群组在白名单中,群内任何用户都可以触发
if chat.type in ['group', 'supergroup']:
if chat.id not in self.allowed_groups:
logger.debug(f"群组 {chat.id} 不在白名单中,不回复")
return False, False
# 群组在白名单中,检查智能参与条件
return await self._should_reply_detailed(update, text)
return False, False
async def _should_reply_detailed(self, update: Update, text: str) -> tuple[bool, bool]:
"""
详细判断是否应该回复消息(智能参与模式)- 仅处理群聊逻辑
Returns:
(should_reply: bool, is_random_participation: bool)
"""
chat = update.effective_chat
if not chat:
return False, False
# 此方法只处理群聊智能参与逻辑
if chat.type in ['group', 'supergroup']:
chat_id = str(chat.id)
logger.debug(f"群组消息检测 - 聊天ID: {chat.id}, 类型: {chat.type}")
logger.debug(f"消息文本: '{text}'")
# 1. 检查是否直接回复了bot的消息
message = update.effective_message
if message and message.reply_to_message:
replied_message = message.reply_to_message
if replied_message.from_user and replied_message.from_user.is_bot:
# 获取bot信息来确认是回复的我们的bot
bot_user = update.get_bot()
if bot_user and replied_message.from_user.id == bot_user.id:
logger.info("检测到用户回复bot消息,触发回复")
# 回复后重置随机计数器
self.participation_manager.reset_counter(chat_id)
return True, False # 这不是随机参与
# 2. 检查是否被@提及
bot_user = update.get_bot()
if bot_user and bot_user.username:
bot_username = bot_user.username
mentions = [f"@{bot_username}", f"@{bot_username.lower()}"]
text_lower = text.lower()
for mention in mentions:
if mention in text_lower:
logger.info(f"检测到@提及: '{mention}',触发回复")
# @提及后重置随机计数器
self.participation_manager.reset_counter(chat_id)
return True, False # 这不是随机参与
# 3. 检查是否提到了bot的名字
if self.participation_manager.is_name_mentioned(text):
logger.info("检测到bot名字被提及,触发回复")
# 名字提及后重置随机计数器
self.participation_manager.reset_counter(chat_id)
return True, False # 这不是随机参与
# 4. 检查是否达到随机参与条件
if self.participation_manager.should_participate_random(chat_id):
logger.info(f"群聊 {chat_id} 达到随机参与条件,主动参与对话")
return True, True # 这是随机参与
# 5. 都不满足,不回复但消息已被记录
logger.debug("群聊消息不触发回复条件,但已记录到上下文")
return False, False
# 私聊逻辑已移到 _get_reply_info 中处理
return False, False
async def _should_reply_with_auth(self, update: Update, text: str) -> bool:
"""结合授权检查和回复条件判断是否应该回复"""
chat = update.effective_chat
user = update.effective_user
if not chat or not user:
return False
# 私聊:需要用户在白名单中
if chat.type == 'private':
if user.id not in self.allowed_users:
logger.debug(f"私聊用户 {user.id} 不在白名单中,不回复")
return False
return True # 私聊中白名单用户总是回复
# 群聊:需要群组在白名单中,群内任何用户都可以触发
if chat.type in ['group', 'supergroup']:
if chat.id not in self.allowed_groups:
logger.debug(f"群组 {chat.id} 不在白名单中,不回复")
return False
# 群组在白名单中,检查智能参与条件
return await self._should_reply(update, text)
return False
async def _should_reply(self, update: Update, text: str) -> bool:
"""判断是否应该回复消息(智能参与模式)- 仅处理群聊逻辑"""
chat = update.effective_chat
if not chat:
return False
# 此方法只处理群聊智能参与逻辑
if chat.type in ['group', 'supergroup']:
chat_id = str(chat.id)
logger.debug(f"群组消息检测 - 聊天ID: {chat.id}, 类型: {chat.type}")
logger.debug(f"消息文本: '{text}'")
# 1. 检查是否直接回复了bot的消息
message = update.effective_message
if message and message.reply_to_message:
replied_message = message.reply_to_message
if replied_message.from_user and replied_message.from_user.is_bot:
# 获取bot信息来确认是回复的我们的bot
bot_user = update.get_bot()
if bot_user and replied_message.from_user.id == bot_user.id:
logger.info("检测到用户回复bot消息,触发回复")
# 回复后重置随机计数器
self.participation_manager.reset_counter(chat_id)
return True
# 2. 检查是否被@提及
bot_user = update.get_bot()
if bot_user and bot_user.username:
bot_username = bot_user.username
mentions = [f"@{bot_username}", f"@{bot_username.lower()}"]
text_lower = text.lower()
for mention in mentions:
if mention in text_lower:
logger.info(f"检测到@提及: '{mention}',触发回复")
# @提及后重置随机计数器
self.participation_manager.reset_counter(chat_id)
return True
# 3. 检查是否提到了bot的名字
if self.participation_manager.is_name_mentioned(text):
logger.info("检测到bot名字被提及,触发回复")
# 名字提及后重置随机计数器
self.participation_manager.reset_counter(chat_id)
return True
# 4. 检查是否达到随机参与条件
if self.participation_manager.should_participate_random(chat_id):
logger.info(f"群聊 {chat_id} 达到随机参与条件,主动参与对话")
return True
# 5. 都不满足,不回复但消息已被记录
logger.debug("群聊消息不触发回复条件,但已记录到上下文")
return False
# 私聊逻辑已移到 _should_reply_with_auth 中处理
return False
def _build_user_info(self, user: User, chat: Chat, message=None) -> dict:
"""构建用户信息"""
user_info = {
'user_id': user.id,
'username': user.username,
'first_name': user.first_name,
'last_name': user.last_name,
'language_code': user.language_code,
'chat_id': chat.id,
'chat_type': chat.type,
'chat_title': getattr(chat, 'title', None)
}
# 添加消息时间戳(格式化为可读字符串)
if message and message.date:
try:
# 转换为本地时区
import pytz
from datetime import datetime
if message.date.tzinfo is None:
utc_time = pytz.utc.localize(message.date)
else:
utc_time = message.date
local_time = utc_time.astimezone()
# 格式化为 "2025-12-10 18:30:15"
user_info['message_time'] = local_time.strftime('%Y-%m-%d %H:%M:%S')
except Exception as e:
logger.debug(f"时间格式化失败: {e}")
# 降级方案:使用简单格式
try:
user_info['message_time'] = message.date.strftime('%Y-%m-%d %H:%M:%S')
except:
pass
return user_info
def _extract_reply_info(self, message) -> dict:
"""
提取回复消息的信息
Args:
message: Telegram消息对象
Returns:
dict: 包含回复信息的字典,如果没有回复则返回None
"""
if not message or not message.reply_to_message:
return None
reply_msg = message.reply_to_message
try:
# 提取被回复消息的用户信息
reply_user_info = {}
if reply_msg.from_user:
user = reply_msg.from_user
reply_user_info = {
'user_id': user.id,
'username': user.username,
'first_name': user.first_name,
'last_name': user.last_name,
'is_bot': user.is_bot
}
# Debug: 记录特殊的用户情况
if user.id == 0 or (not user.username and not user.first_name):
logger.debug(f"检测到特殊用户信息 - ID: {user.id}, username: {user.username}, first_name: {user.first_name}, is_bot: {user.is_bot}")
else:
# 处理from_user为None的情况(如频道消息或系统消息)
logger.debug(f"被回复消息缺少from_user信息,消息ID: {reply_msg.message_id}")
reply_user_info = {
'user_id': 'unknown',
'username': None,
'first_name': 'system',
'last_name': None,
'is_bot': False
}
# 提取时间信息
reply_timestamp = None
if reply_msg.date:
try:
# 如果原始时间没有时区信息,添加UTC时区
if reply_msg.date.tzinfo is None:
import pytz
utc_time = reply_msg.date.replace(tzinfo=pytz.UTC)
else:
utc_time = reply_msg.date
# 转换为本地时区
local_time = utc_time.astimezone() # 自动使用系统本地时区
reply_timestamp = local_time
except Exception as e:
logger.debug(f"时区转换失败,使用原始时间: {e}")
# 如果转换失败,使用原始时间
reply_timestamp = reply_msg.date
# 提取消息内容
reply_content = ""
if reply_msg.text:
reply_content = reply_msg.text
elif reply_msg.caption:
reply_content = reply_msg.caption
elif reply_msg.photo:
reply_content = "[图片]"
elif reply_msg.document:
reply_content = f"[文档: {reply_msg.document.file_name or '未知文件'}]"
elif reply_msg.voice:
reply_content = "[语音消息]"
elif reply_msg.video:
reply_content = "[视频]"
elif reply_msg.audio:
reply_content = "[音频]"
elif reply_msg.sticker:
reply_content = f"[贴纸: {reply_msg.sticker.emoji or '😀'}]"
else:
reply_content = "[其他类型消息]"
# 限制回复内容长度
max_reply_length = 100
if reply_content and len(str(reply_content)) > max_reply_length:
reply_content = str(reply_content)[:max_reply_length] + "..."
return {
'user_info': reply_user_info,
'timestamp': reply_timestamp,
'content': reply_content
}
except Exception as e:
logger.error(f"提取回复信息时出错: {e}")
return None
def _extract_reply_context(self, message) -> str:
"""
提取回复消息的上下文信息(已废弃,保留向后兼容)
Args:
message: Telegram消息对象
Returns:
str: 包含回复上下文的格式化文本,如果没有回复则返回空字符串
"""
reply_info = self._extract_reply_info(message)
if not reply_info:
return ""
# 构建用户显示名
user_info = reply_info['user_info']
if user_info.get('is_bot'):
user_display = "🤖 Bot"
elif user_info.get('username'):
user_display = f"@{user_info['username']}"
elif user_info.get('first_name'):
user_display = user_info['first_name']
if user_info.get('last_name'):
user_display += f" {user_info['last_name']}"
else:
user_display = f"用户{user_info.get('user_id', 'unknown')}"
# 格式化时间
time_info = ""
if reply_info['timestamp']:
time_info = f" {reply_info['timestamp'].strftime('%H:%M')}"
# 构建回复上下文
reply_context = f"↳ 回复 [{user_display}{time_info}]: {reply_info['content']}"
return reply_context
def _build_message_with_reply_context(self, text: str, message) -> str:
"""
构建包含回复上下文的完整消息
Args:
text: 原始消息文本
message: Telegram消息对象
Returns:
str: 包含回复上下文的完整消息文本
"""
reply_context = self._extract_reply_context(message)
if reply_context:
# 将回复上下文添加到消息前面
return f"{reply_context}\n{text}"
else:
return text
async def _get_agent_history(self, chat_id: Union[int, str]) -> List[Dict[str, Any]]:
"""
从Claude Agent获取已保存的对话历史
Args:
chat_id: 聊天ID
Returns:
对话历史列表,如果没有则返回空列表
"""
try:
# 通过Claude Agent获取已保存的对话历史
if hasattr(self.claude_agent, 'persistence'):
history = self.claude_agent.persistence.load_conversation_history(str(chat_id))
return history or []
return []
except Exception as e:
logger.warning(f"获取Agent历史记录失败: {e}")
return []
def _convert_agent_history_to_context_format(self, agent_history: List[Dict[str, Any]], chat_id: Union[int, str]) -> List[Dict[str, Any]]:
"""
将Agent历史格式转换为context_manager格式,用于向后兼容
Args:
agent_history: Agent历史记录(来自conversations.json)
chat_id: 聊天ID
Returns:
转换后的context格式列表
"""
context_format = []
for entry in agent_history:
try:
# Agent历史格式: {"role": "user/assistant", "content": "..."}
role = entry.get('role', 'user')
content = entry.get('content', '')
user_id = entry.get('user_id', 999 if role == 'assistant' else 123)
timestamp = entry.get('timestamp', '')
# 转换为context_manager格式
context_entry = {
'user_id': user_id,
'message': content,
'is_bot': role == 'assistant',
'timestamp': timestamp,
'chat_id': str(chat_id),
'user_info': {
'username': 'agent_user' if role == 'user' else 'bot',
'source': 'conversations_json' # 标记来源
}
}
context_format.append(context_entry)
except Exception as e:
logger.warning(f"转换Agent历史记录失败: {e}")
logger.debug(f"转换Agent历史完成: {len(agent_history)} → {len(context_format)} 条记录")
return context_format
async def _save_message_to_agent(self, chat_id: Union[int, str], user_id: int, message: str, is_bot: bool = False, user_info: dict = None):
"""
将消息添加到Claude Agent的内存历史中
此方法会构建完整的消息格式,包含用户显示名、时间戳和回复上下文,
然后将消息添加到指定聊天的Agent内存历史记录中。注意:此方法不会立即
保存到conversations.json文件,文件保存会在流式响应完成后统一进行。
Args:
chat_id: 聊天ID(群组ID或私聊ID)
user_id: 用户ID(发送消息的用户ID,机器人消息时使用bot的user_id)
message: 原始消息内容
is_bot: 是否为机器人消息,默认False
user_info: 用户信息字典,可选,包含:
- username: 用户名
- first_name: 名字
- last_name: 姓氏
- reply_info: 回复信息(如果是回复消息)
Note:
- 用户消息会被格式化为: "[用户显示名 时间] 消息内容"
- 机器人消息会被格式化为: "[Bot 时间] 消息内容"
- 如果包含回复信息,会在消息前添加回复上下文
- 消息会立即添加到Agent的conversation_history中,但不立即保存到文件
"""
try:
# 获取该聊天的Agent实例
agent = self.claude_agent._get_or_create_agent(chat_id)
# 构建消息格式,与_build_conversation_history保持一致
role = "assistant" if is_bot else "user"
if role == "user":
# 构建用户显示名
user_display = self._build_user_display_name(user_id, user_info)
# 处理回复上下文
reply_context = ""
reply_info = user_info.get('reply_info') if user_info else None
if reply_info:
reply_context = self._format_reply_context(reply_info)
if reply_context:
reply_context = f"{reply_context}\n"
# 添加时间信息到消息中
from datetime import datetime
try:
local_dt = datetime.now()
time_str = local_dt.strftime('%H:%M')
content_with_identity = f"{reply_context}[{user_display} {time_str}] {message}"
except Exception:
content_with_identity = f"{reply_context}[{user_display}] {message}"
else:
# Bot消息也加上时间戳
from datetime import datetime
try:
local_dt = datetime.now()
time_str = local_dt.strftime('%H:%M')
content_with_identity = f"[Bot {time_str}] {message}"
except Exception:
content_with_identity = message
# 添加到Agent历史
from datetime import datetime
message_entry = {
"role": role,
"content": content_with_identity,
"user_id": user_id,
"timestamp": datetime.now().isoformat()
}
agent.conversation_history.append(message_entry)
# 注意:不在这里立即保存,让流式响应完成后统一保存
# 这样避免了多次保存导致的重复数据问题
logger.debug(f"📝 消息已添加到Agent历史 - 聊天: {chat_id}, 角色: {role}, 内容: '{message[:50]}...'")
except Exception as e:
logger.error(f"保存消息到Agent失败: {e}")
def _build_user_display_name(self, user_id: int, user_info: dict) -> str:
"""
构建用户显示名称(从claude_adapter复制)
Args:
user_id: 用户ID
user_info: 用户信息字典
Returns:
友好的用户显示名称
"""
if not user_info:
return f"User{user_id}" if user_id != 'unknown' else "群友"
# 优先使用用户名
if user_info.get('username'):
return f"@{user_info['username']}"
# 其次使用姓名
name_parts = []
if user_info.get('first_name'):
name_parts.append(user_info['first_name'])
if user_info.get('last_name'):
name_parts.append(user_info['last_name'])
if name_parts:
return " ".join(name_parts)
# 最后回退到用户ID
return f"User{user_id}" if user_id != 'unknown' else "群友"
def _format_reply_context(self, reply_info: Dict[str, Any]) -> str:
"""
格式化回复上下文,保留原作者信息(从claude_adapter复制)
Args:
reply_info: 回复信息字典
Returns:
格式化的回复上下文字符串
"""
if not reply_info:
return ""
user_info = reply_info.get('user_info', {})
timestamp = reply_info.get('timestamp')
content = reply_info.get('content', '')
# 构建原作者显示名
if user_info.get('is_bot'):
# 对于Bot,优先显示username
if user_info.get('username'):
original_author = f"🤖 @{user_info['username']}"
elif user_info.get('first_name'):
original_author = f"🤖 {user_info['first_name']}"
if user_info.get('last_name'):
original_author += f" {user_info['last_name']}"
else:
original_author = "🤖 Bot"
elif user_info.get('username'):
original_author = f"@{user_info['username']}"
elif user_info.get('first_name'):
original_author = user_info['first_name']
if user_info.get('last_name'):
original_author += f" {user_info['last_name']}"
else:
# 改进边界情况处理
user_id = user_info.get('user_id', 'unknown')
if user_id == 'unknown' or user_id == 'system':
original_author = "system"
elif user_id == 0:
original_author = "unknown user"
else:
original_author = f"用户{user_id}"
# 格式化时间
time_info = ""
if timestamp:
try:
time_info = f" {timestamp.strftime('%H:%M')}"
except:
pass
# 构建回复上下文
reply_context = f"↳ 回复 [{original_author}{time_info}]: {content}"
return reply_context