fix(chat): 增强消息查找功能并添加未读消息自动清理机制

- 重构 `_find_message_by_id` 方法,支持多种消息ID格式和模糊匹配
- 在动作执行后自动清理未读消息,防止消息堆积
- 为 `reply` 动作添加目标消息查找失败时的降级处理
- 将消息计数和关系查询方法改为异步调用以保持一致性
This commit is contained in:
Windpicker-owo
2025-09-29 10:47:43 +08:00
parent 02d0490da7
commit fc429228b5
5 changed files with 177 additions and 51 deletions

View File

@@ -288,8 +288,8 @@ class MessageManager:
else: else:
logger.debug(f"聊天流 {stream_id} 未触发打断,打断概率: {interruption_probability:.2f}") logger.debug(f"聊天流 {stream_id} 未触发打断,打断概率: {interruption_probability:.2f}")
def _clear_all_unread_messages(self, stream_id: str): async def clear_all_unread_messages(self, stream_id: str):
"""清除指定上下文中的所有未读消息,防止意外情况导致消息一直未读""" """清除指定上下文中的所有未读消息,在消息处理完成后调用"""
try: try:
# 通过 ChatManager 获取 ChatStream # 通过 ChatManager 获取 ChatStream
chat_manager = get_chat_manager() chat_manager = get_chat_manager()

View File

@@ -195,6 +195,10 @@ class ChatterActionManager:
action_data={"reason": reason}, action_data={"reason": reason},
action_name="no_reply", action_name="no_reply",
) )
# 自动清空所有未读消息
await self._clear_all_unread_messages(chat_stream.stream_id, "no_reply")
return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""} return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""}
elif action_name != "reply" and action_name != "no_action": elif action_name != "reply" and action_name != "no_action":
@@ -212,6 +216,8 @@ class ChatterActionManager:
# 记录执行的动作到目标消息 # 记录执行的动作到目标消息
if success: if success:
await self._record_action_to_message(chat_stream, action_name, target_message, action_data) await self._record_action_to_message(chat_stream, action_name, target_message, action_data)
# 自动清空所有未读消息
await self._clear_all_unread_messages(chat_stream.stream_id, action_name)
# 重置打断计数 # 重置打断计数
await self._reset_interruption_count_after_action(chat_stream.stream_id) await self._reset_interruption_count_after_action(chat_stream.stream_id)
@@ -256,6 +262,9 @@ class ChatterActionManager:
# 记录回复动作到目标消息 # 记录回复动作到目标消息
await self._record_action_to_message(chat_stream, "reply", target_message, action_data) await self._record_action_to_message(chat_stream, "reply", target_message, action_data)
# 自动清空所有未读消息
await self._clear_all_unread_messages(chat_stream.stream_id, "reply")
# 回复成功,重置打断计数 # 回复成功,重置打断计数
await self._reset_interruption_count_after_action(chat_stream.stream_id) await self._reset_interruption_count_after_action(chat_stream.stream_id)
@@ -325,6 +334,24 @@ class ChatterActionManager:
except Exception as e: except Exception as e:
logger.warning(f"重置打断计数时出错: {e}") logger.warning(f"重置打断计数时出错: {e}")
async def _clear_all_unread_messages(self, stream_id: str, action_name: str):
"""在动作执行成功后自动清空所有未读消息
Args:
stream_id: 聊天流ID
action_name: 动作名称
"""
try:
from src.chat.message_manager.message_manager import message_manager
# 清空所有未读消息
await message_manager.clear_all_unread_messages(stream_id)
logger.debug(f"[{action_name}] 已自动清空聊天流 {stream_id} 的所有未读消息")
except Exception as e:
logger.error(f"[{action_name}] 自动清空未读消息时出错: {e}")
# 不抛出异常,避免影响主要功能
async def _handle_action( async def _handle_action(
self, chat_stream, action, reasoning, action_data, cycle_timers, thinking_id, action_message self, chat_stream, action, reasoning, action_data, cycle_timers, thinking_id, action_message
) -> tuple[bool, str, str]: ) -> tuple[bool, str, str]:
@@ -495,7 +522,7 @@ class ChatterActionManager:
""" """
current_time = time.time() current_time = time.time()
# 计算新消息数量 # 计算新消息数量
new_message_count = message_api.count_new_messages( new_message_count = await message_api.count_new_messages(
chat_id=chat_stream.stream_id, start_time=thinking_start_time, end_time=current_time chat_id=chat_stream.stream_id, start_time=thinking_start_time, end_time=current_time
) )

View File

@@ -1651,7 +1651,7 @@ class DefaultReplyer:
user_id = user_info.get("user_id", "unknown") user_id = user_info.get("user_id", "unknown")
# 从数据库获取关系数据 # 从数据库获取关系数据
relationship_data = relationship_tracker._get_user_relationship_from_db(user_id) relationship_data = await relationship_tracker._get_user_relationship_from_db(user_id)
if relationship_data: if relationship_data:
relationship_text = relationship_data.get("relationship_text", "") relationship_text = relationship_data.get("relationship_text", "")
relationship_score = relationship_data.get("relationship_score", 0.3) relationship_score = relationship_data.get("relationship_score", 0.3)

View File

@@ -146,7 +146,6 @@ async def generate_reply(
from_plugin=from_plugin, from_plugin=from_plugin,
stream_id=chat_stream.stream_id if chat_stream else chat_id, stream_id=chat_stream.stream_id if chat_stream else chat_id,
reply_message=reply_message, reply_message=reply_message,
read_mark=read_mark,
) )
if not success: if not success:
logger.warning("[GeneratorAPI] 回复生成失败") logger.warning("[GeneratorAPI] 回复生成失败")

View File

@@ -483,8 +483,40 @@ class ChatterPlanFilter:
target_message_obj = None target_message_obj = None
if action not in ["no_action", "no_reply", "do_nothing", "proactive_reply"]: if action not in ["no_action", "no_reply", "do_nothing", "proactive_reply"]:
if target_message_id := action_data.get("target_message_id"): original_target_id = action_data.get("target_message_id")
target_message_dict = self._find_message_by_id(target_message_id, message_id_list)
if original_target_id:
# 记录原始ID用于调试
logger.debug(f"[{action}] 尝试查找目标消息: {original_target_id}")
# 使用增强的查找函数
target_message_dict = self._find_message_by_id(original_target_id, message_id_list)
if not target_message_dict:
logger.warning(f"[{action}] 未找到目标消息: {original_target_id}")
# 根据动作类型采用不同的恢复策略
if action == "reply":
# reply动作必须有目标消息使用最新消息作为兜底
target_message_dict = self._get_latest_message(message_id_list)
if target_message_dict:
logger.info(f"[{action}] 使用最新消息作为目标: {target_message_dict.get('message_id')}")
else:
logger.error(f"[{action}] 无法找到任何目标消息降级为no_action")
action = "no_action"
reasoning = f"无法找到目标消息进行回复。原始理由: {reasoning}"
elif action in ["poke_user", "set_emoji_like"]:
# 这些动作可以尝试其他策略
target_message_dict = self._find_poke_notice(message_id_list) or self._get_latest_message(message_id_list)
if target_message_dict:
logger.info(f"[{action}] 使用替代消息作为目标: {target_message_dict.get('message_id')}")
else:
# 其他动作使用最新消息或跳过
target_message_dict = self._get_latest_message(message_id_list)
if target_message_dict:
logger.info(f"[{action}] 使用最新消息作为目标: {target_message_dict.get('message_id')}")
else: else:
# 如果LLM没有指定target_message_id进行特殊处理 # 如果LLM没有指定target_message_id进行特殊处理
if action == "poke_user": if action == "poke_user":
@@ -505,19 +537,27 @@ class ChatterPlanFilter:
real_message_id = target_message_dict.get("message_id") or target_message_dict.get("id") real_message_id = target_message_dict.get("message_id") or target_message_dict.get("id")
if real_message_id: if real_message_id:
action_data["target_message_id"] = real_message_id action_data["target_message_id"] = real_message_id
logger.debug(f"[{action}] 更新目标消息ID: {original_target_id} -> {real_message_id}")
# 确保 action_message 中始终有 message_id 字段
if "message_id" not in target_message_obj and "id" in target_message_obj:
target_message_obj["message_id"] = target_message_obj["id"]
else: else:
# 如果找不到目标消息对于reply动作来说这是必需的应该记录警告 logger.warning(f"[{action}] 最终未找到任何可用的目标消息")
if action == "reply": if action == "reply":
logger.warning( # reply动作如果没有目标消息降级为no_action
f"reply动作找不到目标消息target_message_id: {action_data.get('target_message_id')}"
)
# 将reply动作改为no_action避免后续执行时出错
action = "no_action" action = "no_action"
reasoning = f"到目标消息进行回复。原始理由: {reasoning}" reasoning = f"无法找到目标消息进行回复。原始理由: {reasoning}"
if target_message_obj:
# 确保 action_message 中始终有 message_id 字段
if "message_id" not in target_message_obj and "id" in target_message_obj:
target_message_obj["message_id"] = target_message_obj["id"]
else:
# 如果找不到目标消息对于reply动作来说这是必需的应该记录警告
if action == "reply":
logger.warning(
f"reply动作找不到目标消息target_message_id: {action_data.get('target_message_id')}"
)
# 将reply动作改为no_action避免后续执行时出错
action = "no_action"
reasoning = f"找不到目标消息进行回复。原始理由: {reasoning}"
if ( if (
action not in ["no_action", "no_reply", "reply", "do_nothing", "proactive_reply"] action not in ["no_action", "no_reply", "reply", "do_nothing", "proactive_reply"]
@@ -644,47 +684,107 @@ class ChatterPlanFilter:
return action_options_block return action_options_block
def _find_message_by_id(self, message_id: str, message_id_list: list) -> Optional[Dict[str, Any]]: def _find_message_by_id(self, message_id: str, message_id_list: list) -> Optional[Dict[str, Any]]:
# 兼容多种 message_id 格式数字、m123、buffered-xxxx """
# 如果是纯数字,补上 m 前缀以兼容旧格式 增强的消息查找函数,支持多种格式和模糊匹配
candidate_ids = {message_id} 兼容大模型可能返回的各种格式变体
if message_id.isdigit(): """
candidate_ids.add(f"m{message_id}") if not message_id or not message_id_list:
return None
# 如果是 m 开头且后面是数字,尝试去掉 m 前缀的数字形式 # 1. 标准化处理:去除可能的格式干扰
if message_id.startswith("m") and message_id[1:].isdigit(): original_id = str(message_id).strip()
candidate_ids.add(message_id[1:]) normalized_id = original_id.strip('<>"\'').strip()
# 逐项匹配 message_id_list每项可能为 {'id':..., 'message':...} if not normalized_id:
for item in message_id_list: return None
# 支持 message_id_list 中直接是字符串/ID 的情形
if isinstance(item, str):
if item in candidate_ids:
# 没有 message 对象返回None
return None
continue
if not isinstance(item, dict): # 2. 构建候选ID集合兼容各种可能的格式
continue candidate_ids = {normalized_id}
item_id = item.get("id") # 处理纯数字格式 (123 -> m123)
# 直接匹配分配的短 id if normalized_id.isdigit():
if item_id and item_id in candidate_ids: candidate_ids.add(f"m{normalized_id}")
return item.get("message")
# 有时 message 存储里会有原始的 message_id 字段(如 buffered-xxxx # 处理m前缀格式 (m123 -> 123)
message_obj = item.get("message") if normalized_id.startswith("m") and normalized_id[1:].isdigit():
if isinstance(message_obj, dict): candidate_ids.add(normalized_id[1:])
orig_mid = message_obj.get("message_id") or message_obj.get("id")
if orig_mid and orig_mid in candidate_ids:
return message_obj
# 作为兜底,尝试在 message_id_list 中找到 message.message_id 匹配 # 处理包含在文本中的ID格式 (如 "消息m123" -> 提取 m123)
for item in message_id_list: import re
if isinstance(item, dict) and isinstance(item.get("message"), dict): # 尝试提取各种格式的ID
mid = item["message"].get("message_id") or item["message"].get("id") id_patterns = [
if mid == message_id: r'm\d+', # m123格式
return item["message"] r'\d+', # 纯数字格式
r'buffered-[a-f0-9-]+', # buffered-xxxx格式
r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}', # UUID格式
]
for pattern in id_patterns:
matches = re.findall(pattern, normalized_id)
for match in matches:
candidate_ids.add(match)
# 3. 尝试精确匹配
for candidate in candidate_ids:
for item in message_id_list:
if isinstance(item, str):
if item == candidate:
# 字符串类型没有message对象返回None
return None
continue
if not isinstance(item, dict):
continue
# 匹配短ID
item_id = item.get("id")
if item_id and item_id == candidate:
return item.get("message")
# 匹配原始消息ID
message_obj = item.get("message")
if isinstance(message_obj, dict):
orig_mid = message_obj.get("message_id") or message_obj.get("id")
if orig_mid and orig_mid == candidate:
return message_obj
# 4. 尝试模糊匹配(数字部分匹配)
for candidate in candidate_ids:
# 提取数字部分进行模糊匹配
number_part = re.sub(r'[^0-9]', '', candidate)
if number_part:
for item in message_id_list:
if isinstance(item, dict):
item_id = item.get("id", "")
item_number = re.sub(r'[^0-9]', '', item_id)
# 数字部分匹配
if item_number == number_part:
logger.debug(f"模糊匹配成功: {candidate} -> {item_id}")
return item.get("message")
# 检查消息对象中的ID
message_obj = item.get("message")
if isinstance(message_obj, dict):
orig_mid = message_obj.get("message_id") or message_obj.get("id")
orig_number = re.sub(r'[^0-9]', '', orig_mid)
if orig_number == number_part:
logger.debug(f"模糊匹配成功(消息对象): {candidate} -> {orig_mid}")
return message_obj
# 5. 兜底策略:返回最新消息
if message_id_list:
latest_item = message_id_list[-1]
if isinstance(latest_item, dict):
latest_message = latest_item.get("message")
if isinstance(latest_message, dict):
logger.warning(f"未找到精确匹配的消息ID {original_id},使用最新消息作为兜底")
return latest_message
elif latest_message is not None:
logger.warning(f"未找到精确匹配的消息ID {original_id},使用最新消息作为兜底")
return latest_message
logger.warning(f"未找到任何匹配的消息: {original_id} (候选: {candidate_ids})")
return None return None
@staticmethod @staticmethod