feat(chat): 添加消息重复处理防护机制

在StreamContext中添加processing_message_id字段,用于跟踪当前正在处理的目标消息ID。在AffinityFlowChatter规划器中实现重复消息检测逻辑,防止对同一消息进行重复回复处理。

- 在增强版规划流程和Normal模式流程中添加目标消息ID检查
- 当检测到正在处理相同消息时返回no_action避免重复
- 在处理完成后清理processing_message_id标记
- 添加详细的日志记录用于调试和监控
This commit is contained in:
Windpicker-owo
2025-10-29 07:40:26 +08:00
parent 4e024656ff
commit e26940dd4e
2 changed files with 70 additions and 1 deletions

View File

@@ -55,6 +55,7 @@ class StreamContext(BaseDataModel):
priority_info: dict | None = None
triggering_user_id: str | None = None # 触发当前聊天流的用户ID
is_replying: bool = False # 是否正在生成回复
processing_message_id: str | None = None # 当前正在规划/处理的目标消息ID用于防止重复回复
def add_action_to_message(self, message_id: str, action: str):
"""

View File

@@ -180,6 +180,37 @@ class ChatterActionPlanner:
plan_filter = ChatterPlanFilter(self.chat_id, available_actions)
filtered_plan = await plan_filter.filter(reply_not_available, initial_plan)
# 4.5 检查是否正在处理相同的目标消息,防止重复回复
target_message_id = None
for action in filtered_plan.decided_actions:
if action.action_type in ["reply", "proactive_reply"] and action.action_message:
# 提取目标消息ID
if hasattr(action.action_message, 'message_id'):
target_message_id = action.action_message.message_id
elif isinstance(action.action_message, dict):
target_message_id = action.action_message.get('message_id')
break
# 如果找到目标消息ID检查是否已经在处理中
if target_message_id and context:
if context.processing_message_id == target_message_id:
logger.warning(
f"目标消息 {target_message_id} 已经在处理中,跳过本次规划以防止重复回复"
)
# 返回 no_action避免重复处理
from src.common.data_models.info_data_model import ActionPlannerInfo
no_action = ActionPlannerInfo(
action_type="no_action",
reasoning=f"目标消息 {target_message_id} 已经在处理中,跳过以防止重复回复",
action_data={},
action_message=None,
)
return [asdict(no_action)], None
else:
# 记录当前正在处理的消息ID
context.processing_message_id = target_message_id
logger.debug(f"开始处理目标消息: {target_message_id}")
# 5. 使用 PlanExecutor 执行 Plan
execution_result = await self.executor.execute(filtered_plan)
@@ -197,12 +228,20 @@ class ChatterActionPlanner:
context.chat_mode = ChatMode.NORMAL
await self._sync_chat_mode_to_stream(context)
# 8. 返回结果
# 8. 清理处理标记
if context:
context.processing_message_id = None
logger.debug(f"已清理处理标记,完成规划流程")
# 9. 返回结果
return self._build_return_result(filtered_plan)
except Exception as e:
logger.error(f"增强版规划流程出错: {e}")
self.planner_stats["failed_plans"] += 1
# 清理处理标记
if context:
context.processing_message_id = None
return [], None
async def _normal_mode_flow(self, context: "StreamContext | None") -> tuple[list[dict[str, Any]], Any | None]:
@@ -238,6 +277,27 @@ class ChatterActionPlanner:
break
if should_reply and target_message:
# 检查是否正在处理相同的目标消息,防止重复回复
target_message_id = target_message.message_id
if context and context.processing_message_id == target_message_id:
logger.warning(
f"Normal模式: 目标消息 {target_message_id} 已经在处理中,跳过本次规划以防止重复回复"
)
# 返回 no_action避免重复处理
from src.common.data_models.info_data_model import ActionPlannerInfo
no_action = ActionPlannerInfo(
action_type="no_action",
reasoning=f"目标消息 {target_message_id} 已经在处理中,跳过以防止重复回复",
action_data={},
action_message=None,
)
return [asdict(no_action)], None
# 记录当前正在处理的消息ID
if context:
context.processing_message_id = target_message_id
logger.debug(f"Normal模式: 开始处理目标消息: {target_message_id}")
# 达到reply阈值直接进入回复流程
from src.common.data_models.info_data_model import ActionPlannerInfo, Plan
from src.plugin_system.base.component_types import ChatType
@@ -267,6 +327,11 @@ class ChatterActionPlanner:
logger.info("Normal模式: 执行reply动作完成")
# 清理处理标记
if context:
context.processing_message_id = None
logger.debug(f"Normal模式: 已清理处理标记")
# 无论是否回复都进行退出normal模式的判定
await self._check_exit_normal_mode(context)
@@ -290,6 +355,9 @@ class ChatterActionPlanner:
except Exception as e:
logger.error(f"Normal模式流程出错: {e}")
self.planner_stats["failed_plans"] += 1
# 清理处理标记
if context:
context.processing_message_id = None
return [], None
async def _check_exit_normal_mode(self, context: "StreamContext | None") -> None: