From aab3f19f10048db39f7208ddca4bbf3198a07756 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Mon, 10 Nov 2025 17:12:20 +0800 Subject: [PATCH] =?UTF-8?q?feat(chatter=5Fmanager):=20=E6=B8=85=E7=90=86?= =?UTF-8?q?=20processing=5Fmessage=5Fid=20=E4=BB=A5=E9=98=B2=E6=AD=A2?= =?UTF-8?q?=E9=87=8D=E5=A4=8D=E5=9B=9E=E5=A4=8D=E6=A3=80=E6=B5=8B=E5=A4=B1?= =?UTF-8?q?=E6=95=88=20feat(distribution=5Fmanager):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E5=AD=90=E4=BB=BB=E5=8A=A1=E8=B7=9F=E8=B8=AA=E5=92=8C=E5=8F=96?= =?UTF-8?q?=E6=B6=88=E9=80=BB=E8=BE=91=EF=BC=8C=E4=BC=98=E5=8C=96=E6=B5=81?= =?UTF-8?q?=E5=A4=84=E7=90=86=20feat(default=5Fgenerator):=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=20respond=20=E5=92=8C=20reply=20=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=E4=B8=8B=E7=9A=84=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86=E9=80=BB?= =?UTF-8?q?=E8=BE=91=20feat(affinity=5Fchatter):=20=E5=A4=84=E7=90=86?= =?UTF-8?q?=E5=8F=96=E6=B6=88=E5=BC=82=E5=B8=B8=E6=97=B6=E6=B8=85=E7=90=86?= =?UTF-8?q?=20processing=5Fmessage=5Fid=20feat(planner):=20=E7=A1=AE?= =?UTF-8?q?=E4=BF=9D=E5=9C=A8=E8=A7=84=E5=88=92=E6=B5=81=E7=A8=8B=E5=8F=96?= =?UTF-8?q?=E6=B6=88=E6=97=B6=E6=B8=85=E7=90=86=20processing=5Fmessage=5Fi?= =?UTF-8?q?d?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/chatter_manager.py | 6 +- .../message_manager/distribution_manager.py | 41 ++++++++++-- src/chat/replyer/default_generator.py | 67 ++++++++++++++----- .../core/affinity_chatter.py | 9 +++ .../affinity_flow_chatter/planner/planner.py | 24 +++++++ 5 files changed, 124 insertions(+), 23 deletions(-) diff --git a/src/chat/chatter_manager.py b/src/chat/chatter_manager.py index 73e258158..8d64ba082 100644 --- a/src/chat/chatter_manager.py +++ b/src/chat/chatter_manager.py @@ -145,13 +145,17 @@ class ChatterManager: return result except asyncio.CancelledError: self.stats["failed_executions"] += 1 - logger.info(f"流 {stream_id} 处理被取消,不清空未读消息") + logger.info(f"流 {stream_id} 处理被取消") context.triggering_user_id = None # 清除触发用户ID + # 确保清理 processing_message_id 以防止重复回复检测失效 + context.processing_message_id = None raise except Exception as e: self.stats["failed_executions"] += 1 logger.error(f"处理流 {stream_id} 时发生错误: {e}") context.triggering_user_id = None # 清除触发用户ID + # 确保清理 processing_message_id + context.processing_message_id = None raise finally: # 清除触发用户ID(所有情况下都需要) diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index bb2d922dd..7f7b1e65f 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -332,8 +332,9 @@ class StreamLoopManager: # 设置处理状态为正在处理 self._set_stream_processing_status(stream_id, True) - # 子任务跟踪 + # 子任务跟踪 - 存储到 context 中以便打断时可以访问 child_tasks = set() + context._active_process_tasks = child_tasks # 临时存储子任务集合 try: start_time = time.time() @@ -362,8 +363,16 @@ class StreamLoopManager: context.is_chatter_processing = True logger.debug(f"设置 Chatter 处理标志: {stream_id}") - # 直接调用chatter_manager处理流上下文 - results = await self.chatter_manager.process_stream_context(stream_id, context) + # 创建 chatter 处理任务,以便可以在打断时取消 + chatter_task = asyncio.create_task( + self.chatter_manager.process_stream_context(stream_id, context), + name=f"chatter_process_{stream_id}" + ) + child_tasks.add(chatter_task) + context._chatter_task = chatter_task # 保存主 chatter 任务引用 + + # 等待 chatter 任务完成 + results = await chatter_task success = results.get("success", False) if success: @@ -380,11 +389,25 @@ class StreamLoopManager: return success except asyncio.CancelledError: - logger.debug(f"流处理被取消: {stream_id}") - # 取消所有子任务 - for child_task in child_tasks: + logger.info(f"流处理被取消,正在清理所有子任务: {stream_id}") + # 取消所有子任务(包括 chatter 任务和其后台任务) + cancel_count = 0 + for child_task in list(child_tasks): # 使用 list() 创建副本避免迭代时修改 if not child_task.done(): child_task.cancel() + cancel_count += 1 + + if cancel_count > 0: + logger.info(f"已取消 {cancel_count} 个子任务: {stream_id}") + # 等待所有子任务真正结束(设置短超时) + try: + await asyncio.wait_for( + asyncio.gather(*child_tasks, return_exceptions=True), + timeout=1.0 + ) + except asyncio.TimeoutError: + logger.warning(f"等待子任务取消超时: {stream_id}") + raise except Exception as e: logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True) @@ -398,6 +421,12 @@ class StreamLoopManager: context.is_chatter_processing = False logger.debug(f"清除 Chatter 处理标志: {stream_id}") + # 清理临时存储的任务引用 + if hasattr(context, '_active_process_tasks'): + delattr(context, '_active_process_tasks') + if hasattr(context, '_chatter_task'): + delattr(context, '_chatter_task') + # 无论成功或失败,都要设置处理状态为未处理 self._set_stream_processing_status(stream_id, False) diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index 78cf1bda4..271b87dbf 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -1162,24 +1162,59 @@ class DefaultReplyer: # 兼容旧的reply_to sender, target = self._parse_reply_target(reply_to) else: - # 获取 platform,如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值 + # 对于 respond 动作,reply_message 可能为 None(统一回应未读消息) + # 对于 reply 动作,reply_message 必须存在(针对特定消息回复) if reply_message is None: - logger.warning("reply_message 为 None,无法构建prompt") - return "" - - # 统一处理 DatabaseMessages 对象和字典 - if isinstance(reply_message, DatabaseMessages): - platform = reply_message.chat_info.platform - user_id = reply_message.user_info.user_id - user_nickname = reply_message.user_info.user_nickname - user_cardname = reply_message.user_info.user_cardname - processed_plain_text = reply_message.processed_plain_text + # respond 模式:没有特定目标消息,使用通用的 sender 和 target + if prompt_mode == "normal": + # 从未读消息中获取最新的消息作为参考 + from src.plugin_system.apis.chat_api import get_chat_manager + chat_manager = get_chat_manager() + chat_stream_obj = await chat_manager.get_stream(chat_id) + + if chat_stream_obj: + unread_messages = chat_stream_obj.context_manager.get_unread_messages() + if unread_messages: + # 使用最后一条未读消息作为参考 + last_msg = unread_messages[-1] + platform = last_msg.chat_info.platform if hasattr(last_msg, 'chat_info') else chat_stream.platform + user_id = last_msg.user_info.user_id if hasattr(last_msg, 'user_info') else "" + user_nickname = last_msg.user_info.user_nickname if hasattr(last_msg, 'user_info') else "" + user_cardname = last_msg.user_info.user_cardname if hasattr(last_msg, 'user_info') else "" + processed_plain_text = last_msg.processed_plain_text or "" + else: + # 没有未读消息,使用默认值 + platform = chat_stream.platform + user_id = "" + user_nickname = "" + user_cardname = "" + processed_plain_text = "" + else: + # 无法获取 chat_stream,使用默认值 + platform = chat_stream.platform + user_id = "" + user_nickname = "" + user_cardname = "" + processed_plain_text = "" + else: + # reply 模式下 reply_message 为 None 是错误的 + logger.warning("reply_message 为 None,但处于 reply 模式,无法构建prompt") + return "" else: - platform = reply_message.get("chat_info_platform") - user_id = reply_message.get("user_id") - user_nickname = reply_message.get("user_nickname") - user_cardname = reply_message.get("user_cardname") - processed_plain_text = reply_message.get("processed_plain_text") + # 有 reply_message,正常处理 + # 统一处理 DatabaseMessages 对象和字典 + if isinstance(reply_message, DatabaseMessages): + platform = reply_message.chat_info.platform + user_id = reply_message.user_info.user_id + user_nickname = reply_message.user_info.user_nickname + user_cardname = reply_message.user_info.user_cardname + processed_plain_text = reply_message.processed_plain_text + else: + platform = reply_message.get("chat_info_platform") + user_id = reply_message.get("user_id") + user_nickname = reply_message.get("user_nickname") + user_cardname = reply_message.get("user_cardname") + processed_plain_text = reply_message.get("processed_plain_text") person_id = person_info_manager.get_person_id( platform, # type: ignore diff --git a/src/plugins/built_in/affinity_flow_chatter/core/affinity_chatter.py b/src/plugins/built_in/affinity_flow_chatter/core/affinity_chatter.py index 2f24d458a..d26379689 100644 --- a/src/plugins/built_in/affinity_flow_chatter/core/affinity_chatter.py +++ b/src/plugins/built_in/affinity_flow_chatter/core/affinity_chatter.py @@ -103,10 +103,19 @@ class AffinityChatter(BaseChatter): return result + except asyncio.CancelledError: + logger.info(f"亲和力聊天处理器 {self.stream_id} 处理被取消") + self.stats["failed_executions"] += 1 + self.last_activity_time = time.time() + # 清理 processing_message_id + context.processing_message_id = None + raise except Exception as e: logger.error(f"亲和力聊天处理器 {self.stream_id} 处理StreamContext时出错: {e}\n{traceback.format_exc()}") self.stats["failed_executions"] += 1 self.last_activity_time = time.time() + # 清理 processing_message_id + context.processing_message_id = None return { "success": False, diff --git a/src/plugins/built_in/affinity_flow_chatter/planner/planner.py b/src/plugins/built_in/affinity_flow_chatter/planner/planner.py index 3ef07588e..6d664b3cb 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner/planner.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner/planner.py @@ -78,9 +78,19 @@ class ChatterActionPlanner: return await self._enhanced_plan_flow(context) + except asyncio.CancelledError: + logger.info(f"规划流程被取消: {self.chat_id}") + self.planner_stats["failed_plans"] += 1 + # 确保清理 processing_message_id + if context: + context.processing_message_id = None + raise except Exception as e: logger.error(f"规划流程出错: {e}") self.planner_stats["failed_plans"] += 1 + # 确保清理 processing_message_id + if context: + context.processing_message_id = None return [], None async def _enhanced_plan_flow(self, context: "StreamContext | None") -> tuple[list[dict[str, Any]], Any | None]: @@ -257,6 +267,13 @@ class ChatterActionPlanner: # 14. 返回结果 return self._build_return_result(filtered_plan) + except asyncio.CancelledError: + logger.info(f"Focus模式流程被取消: {self.chat_id}") + self.planner_stats["failed_plans"] += 1 + # 清理处理标记 + if context: + context.processing_message_id = None + raise except Exception as e: logger.error(f"Focus模式流程出错: {e}") self.planner_stats["failed_plans"] += 1 @@ -394,6 +411,13 @@ class ChatterActionPlanner: return [asdict(no_action)], None + except asyncio.CancelledError: + logger.info(f"Normal模式流程被取消: {self.chat_id}") + self.planner_stats["failed_plans"] += 1 + # 清理处理标记 + if context: + context.processing_message_id = None + raise except Exception as e: logger.error(f"Normal模式 - 流程出错: {e}") self.planner_stats["failed_plans"] += 1