From 94e34c9370960e1935da71fa72900a97e2c5e301 Mon Sep 17 00:00:00 2001
From: tt-P607 <68868379+tt-P607@users.noreply.github.com>
Date: Sat, 11 Oct 2025 02:11:32 +0800
Subject: [PATCH] fix(chat): stop message interruption from cancelling an
 in-progress reply task
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The previous interruption logic cancelled every task in a processing stream
indiscriminately. As a result, when a user sent new messages while the bot
was still generating a reply, the reply task could be cancelled unexpectedly
and the bot never finished its response.

This change fixes the problem by introducing an `is_replying` state:

1. Add an `is_replying` flag to `StreamContext` to track reply generation.
2. Set the flag to `True` when reply generation starts, and restore it to
   `False` in a `finally` block once the reply completes or is cancelled.
3. The interruption check in `MessageManager` now inspects this flag first
   and skips interruption while it is `True`, protecting the reply in
   progress.
4. `cancel_all_stream_tasks` gains an `exclude_reply` option so that even
   when an interruption does fire, reply tasks are not cancelled.
---
 src/chat/chatter_manager.py                      | 18 ++++++++---
 src/chat/message_manager/message_manager.py      | 32 ++++++++-----------
 src/chat/planner_actions/action_manager.py       |  3 ++
 .../data_models/message_manager_data_model.py    |  1 +
 4 files changed, 31 insertions(+), 23 deletions(-)

diff --git a/src/chat/chatter_manager.py b/src/chat/chatter_manager.py
index ac9f880d8..5f9eec406 100644
--- a/src/chat/chatter_manager.py
+++ b/src/chat/chatter_manager.py
@@ -207,11 +207,12 @@ class ChatterManager:
 
         return active_tasks
 
-    def cancel_all_stream_tasks(self, stream_id: str) -> int:
+    def cancel_all_stream_tasks(self, stream_id: str, exclude_reply: bool = False) -> int:
         """取消指定流的所有处理任务(包括多重回复)
 
         Args:
             stream_id: 流ID
+            exclude_reply: 是否排除回复任务
 
         Returns:
             int: 成功取消的任务数量
@@ -221,10 +222,15 @@
 
         tasks = self._processing_tasks[stream_id]
         cancelled_count = 0
+        remaining_tasks = []
 
-        logger.info(f"开始取消流 {stream_id} 的所有处理任务,共 {len(tasks)} 个")
+        logger.info(f"开始取消流 {stream_id} 的处理任务,共 {len(tasks)} 个")
 
         for task in tasks:
+            if exclude_reply and "reply" in task.get_name().lower():
+                remaining_tasks.append(task)
+                continue
+
             try:
                 if not task.done():
                     task.cancel()
@@ -233,8 +239,12 @@
             except Exception as e:
                 logger.warning(f"取消任务时出错: {e}")
 
-        # 清理任务记录
-        del self._processing_tasks[stream_id]
+        if remaining_tasks:
+            self._processing_tasks[stream_id] = remaining_tasks
+        else:
+            if stream_id in self._processing_tasks:
+                del self._processing_tasks[stream_id]
+
 
         logger.info(f"流 {stream_id} 的任务取消完成,成功取消 {cancelled_count} 个任务")
         return cancelled_count
diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py
index e1ad6308c..a49860ac5 100644
--- a/src/chat/message_manager/message_manager.py
+++ b/src/chat/message_manager/message_manager.py
@@ -352,11 +352,16 @@ class MessageManager:
         if not global_config.chat.interruption_enabled or not chat_stream:
             return
 
-        # 🌟 修复:获取所有处理任务(包括多重回复)
+        # 获取所有处理任务
        all_processing_tasks = self.chatter_manager.get_all_processing_tasks(chat_stream.stream_id)
 
         if all_processing_tasks:
-            # 计算打断概率 - 使用新的线性概率模型
+            # 检查是否有回复任务正在进行
+            if chat_stream.context_manager.context.is_replying:
+                logger.debug(f"聊天流 {chat_stream.stream_id} 正在回复,跳过打断检查")
+                return
+
+            # 计算打断概率
             interruption_probability = chat_stream.context_manager.context.calculate_interruption_probability(
                 global_config.chat.interruption_max_limit
             )
@@ -364,39 +369,28 @@
             # 检查是否已达到最大打断次数
             if chat_stream.context_manager.context.interruption_count >= global_config.chat.interruption_max_limit:
                 logger.debug(
-                    f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit},跳过打断检查"
+                    f"聊天流 {chat_stream.stream_id} 已达到最大打断次数,跳过打断检查"
                 )
                 return
 
             # 根据概率决定是否打断
             if random.random() < interruption_probability:
-                logger.info(f"聊天流 {chat_stream.stream_id} 触发消息打断,打断概率: {interruption_probability:.2f},检测到 {len(all_processing_tasks)} 个任务")
+                logger.info(f"聊天流 {chat_stream.stream_id} 触发消息打断 (概率: {interruption_probability:.2f})")
 
-                # 🌟 修复:取消所有任务(包括多重回复)
-                cancelled_count = self.chatter_manager.cancel_all_stream_tasks(chat_stream.stream_id)
+                # 取消所有非回复任务
+                cancelled_count = self.chatter_manager.cancel_all_stream_tasks(chat_stream.stream_id, exclude_reply=True)
 
                 if cancelled_count > 0:
                     logger.info(f"消息打断成功取消 {cancelled_count} 个任务: {chat_stream.stream_id}")
-                else:
-                    logger.warning(f"消息打断未能取消任何任务: {chat_stream.stream_id}")
 
                 # 增加打断计数
                 await chat_stream.context_manager.context.increment_interruption_count()
 
-                # 🚀 新增:打断后立即重新进入聊天流程
+                # 立即重新处理
                 await self._trigger_immediate_reprocess(chat_stream)
 
-                # 检查是否已达到最大次数
-                if chat_stream.context_manager.context.interruption_count >= global_config.chat.interruption_max_limit:
-                    logger.warning(
-                        f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit},后续消息将不再打断"
-                    )
-                else:
-                    logger.info(
-                        f"聊天流 {chat_stream.stream_id} 已打断并重新进入处理流程,当前打断次数: {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit}"
-                    )
             else:
-                logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断,打断概率: {interruption_probability:.2f},检测到 {len(all_processing_tasks)} 个任务")
+                logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断 (概率: {interruption_probability:.2f})")
 
     async def _trigger_immediate_reprocess(self, chat_stream: ChatStream):
         """打断后立即重新进入聊天流程"""
diff --git a/src/chat/planner_actions/action_manager.py b/src/chat/planner_actions/action_manager.py
index e7ff21ad4..539a5e09c 100644
--- a/src/chat/planner_actions/action_manager.py
+++ b/src/chat/planner_actions/action_manager.py
@@ -248,6 +248,7 @@ class ChatterActionManager:
         else:
             # 生成回复
             try:
+                chat_stream.context_manager.context.is_replying = True
                 success, response_set, _ = await generator_api.generate_reply(
                     chat_stream=chat_stream,
                     reply_message=target_message,
@@ -265,6 +266,8 @@
             except asyncio.CancelledError:
                 logger.debug(f"{log_prefix} 并行执行:回复生成任务已被取消")
                 return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None}
+            finally:
+                chat_stream.context_manager.context.is_replying = False
 
         # 发送并存储回复
         loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply(
diff --git a/src/common/data_models/message_manager_data_model.py b/src/common/data_models/message_manager_data_model.py
index 7a967c55c..9a462586e 100644
--- a/src/common/data_models/message_manager_data_model.py
+++ b/src/common/data_models/message_manager_data_model.py
@@ -42,6 +42,7 @@ class StreamContext(BaseDataModel):
     processing_task: asyncio.Task | None = None
     interruption_count: int = 0  # 打断计数器
     last_interruption_time: float = 0.0  # 上次打断时间
+    is_replying: bool = False  # 是否正在回复
 
     # 独立分发周期字段
     next_check_time: float = field(default_factory=time.time)  # 下次检查时间
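For illustration only (not part of the patch): a minimal, runnable sketch of the
guard pattern introduced above. Only `is_replying` and `exclude_reply` come from
the diff; `StreamContext` is reduced to the new flag, and `generate_reply`,
`cancel_stream_tasks` and the task names are hypothetical stand-ins, not the
project's real APIs.

import asyncio
from dataclasses import dataclass


@dataclass
class StreamContext:
    # Simplified stand-in: only the flag added by this patch is modelled.
    is_replying: bool = False


async def generate_reply(ctx: StreamContext) -> str:
    # The flag is held for the whole reply and restored in `finally`,
    # so it is cleared even if the task is cancelled partway through.
    ctx.is_replying = True
    try:
        await asyncio.sleep(1.0)  # stands in for the actual reply generation
        return "done"
    finally:
        ctx.is_replying = False


def cancel_stream_tasks(tasks: list[asyncio.Task], exclude_reply: bool = False) -> int:
    # Cancel pending tasks; with exclude_reply=True, tasks whose name
    # contains "reply" are left running.
    cancelled = 0
    for task in tasks:
        if exclude_reply and "reply" in task.get_name().lower():
            continue
        if not task.done():
            task.cancel()
            cancelled += 1
    return cancelled


async def main() -> None:
    ctx = StreamContext()
    reply = asyncio.create_task(generate_reply(ctx), name="reply_task")
    other = asyncio.create_task(asyncio.sleep(5), name="planner_task")
    await asyncio.sleep(0)  # let both tasks start; is_replying is now True

    if ctx.is_replying:
        # Interruption check: skip cancellation entirely while a reply runs.
        pass
    else:
        # Otherwise cancel pending tasks; exclude_reply=True is the extra safety net.
        cancel_stream_tasks([reply, other], exclude_reply=True)

    print(await reply)  # prints "done": the in-progress reply was never cancelled


asyncio.run(main())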