diff --git a/src/common/data_models/message_manager_data_model.py b/src/common/data_models/message_manager_data_model.py index 0a49b23a6..81e8a7764 100644 --- a/src/common/data_models/message_manager_data_model.py +++ b/src/common/data_models/message_manager_data_model.py @@ -55,6 +55,7 @@ class StreamContext(BaseDataModel): priority_info: dict | None = None triggering_user_id: str | None = None # 触发当前聊天流的用户ID is_replying: bool = False # 是否正在生成回复 + processing_message_id: str | None = None # 当前正在规划/处理的目标消息ID,用于防止重复回复 def add_action_to_message(self, message_id: str, action: str): """ diff --git a/src/plugins/built_in/affinity_flow_chatter/planner.py b/src/plugins/built_in/affinity_flow_chatter/planner.py index b9644fcf7..a24059c05 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner.py @@ -189,6 +189,37 @@ class ChatterActionPlanner: plan_filter = ChatterPlanFilter(self.chat_id, available_actions) filtered_plan = await plan_filter.filter(reply_not_available, initial_plan) + # 4.5 检查是否正在处理相同的目标消息,防止重复回复 + target_message_id = None + for action in filtered_plan.decided_actions: + if action.action_type in ["reply", "proactive_reply"] and action.action_message: + # 提取目标消息ID + if hasattr(action.action_message, 'message_id'): + target_message_id = action.action_message.message_id + elif isinstance(action.action_message, dict): + target_message_id = action.action_message.get('message_id') + break + + # 如果找到目标消息ID,检查是否已经在处理中 + if target_message_id and context: + if context.processing_message_id == target_message_id: + logger.warning( + f"目标消息 {target_message_id} 已经在处理中,跳过本次规划以防止重复回复" + ) + # 返回 no_action,避免重复处理 + from src.common.data_models.info_data_model import ActionPlannerInfo + no_action = ActionPlannerInfo( + action_type="no_action", + reasoning=f"目标消息 {target_message_id} 已经在处理中,跳过以防止重复回复", + action_data={}, + action_message=None, + ) + return [asdict(no_action)], None + else: + # 记录当前正在处理的消息ID + context.processing_message_id = target_message_id + logger.debug(f"开始处理目标消息: {target_message_id}") + # 5. 使用 PlanExecutor 执行 Plan execution_result = await self.executor.execute(filtered_plan) @@ -209,12 +240,20 @@ class ChatterActionPlanner: context.chat_mode = ChatMode.NORMAL await self._sync_chat_mode_to_stream(context) - # 8. 返回结果 + # 8. 清理处理标记 + if context: + context.processing_message_id = None + logger.debug(f"已清理处理标记,完成规划流程") + + # 9. 返回结果 return self._build_return_result(filtered_plan) except Exception as e: logger.error(f"增强版规划流程出错: {e}") self.planner_stats["failed_plans"] += 1 + # 清理处理标记 + if context: + context.processing_message_id = None return [], None async def _normal_mode_flow(self, context: "StreamContext | None") -> tuple[list[dict[str, Any]], Any | None]: @@ -258,6 +297,27 @@ class ChatterActionPlanner: break if should_reply and target_message: + # 检查是否正在处理相同的目标消息,防止重复回复 + target_message_id = target_message.message_id + if context and context.processing_message_id == target_message_id: + logger.warning( + f"Normal模式: 目标消息 {target_message_id} 已经在处理中,跳过本次规划以防止重复回复" + ) + # 返回 no_action,避免重复处理 + from src.common.data_models.info_data_model import ActionPlannerInfo + no_action = ActionPlannerInfo( + action_type="no_action", + reasoning=f"目标消息 {target_message_id} 已经在处理中,跳过以防止重复回复", + action_data={}, + action_message=None, + ) + return [asdict(no_action)], None + + # 记录当前正在处理的消息ID + if context: + context.processing_message_id = target_message_id + logger.debug(f"Normal模式: 开始处理目标消息: {target_message_id}") + # 达到reply阈值,直接进入回复流程 from src.common.data_models.info_data_model import ActionPlannerInfo, Plan from src.plugin_system.base.component_types import ChatType @@ -287,6 +347,11 @@ class ChatterActionPlanner: logger.info("Normal模式: 执行reply动作完成") + # 清理处理标记 + if context: + context.processing_message_id = None + logger.debug(f"Normal模式: 已清理处理标记") + # 无论是否回复,都进行退出normal模式的判定 await self._check_exit_normal_mode(context) @@ -310,6 +375,9 @@ class ChatterActionPlanner: except Exception as e: logger.error(f"Normal模式流程出错: {e}") self.planner_stats["failed_plans"] += 1 + # 清理处理标记 + if context: + context.processing_message_id = None return [], None async def _check_exit_normal_mode(self, context: "StreamContext | None") -> None: