diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index 966e6ea10..4863b2387 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -249,6 +249,10 @@ class StreamLoopManager: self.stats["total_process_cycles"] += 1 if success: logger.info(f"✅ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理成功") + + # 🔒 处理成功后,等待一小段时间确保清理操作完成 + # 这样可以避免在 chatter_manager 清除未读消息之前就进入下一轮循环 + await asyncio.sleep(0.1) else: self.stats["total_failures"] += 1 logger.warning(f"❌ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理失败") @@ -347,15 +351,9 @@ class StreamLoopManager: # 设置处理状态为正在处理 self._set_stream_processing_status(stream_id, True) - # 子任务跟踪 - 存储到 context 中以便打断时可以访问 - child_tasks = set() - context._active_process_tasks = child_tasks # 临时存储子任务集合 - + chatter_task = None try: start_time = time.time() - - # 注意:缓存消息刷新已移至planner开始时执行(动作修改器之后),此处不再刷新 - # 检查未读消息,如果为空则直接返回(优化:避免无效的 chatter 调用) unread_messages = context.get_unread_messages() if not unread_messages: @@ -370,9 +368,7 @@ class StreamLoopManager: context.triggering_user_id = last_message.user_info.user_id # 创建子任务用于刷新能量(不阻塞主流程) - energy_task = asyncio.create_task(self._refresh_focus_energy(stream_id)) - child_tasks.add(energy_task) - energy_task.add_done_callback(lambda t: child_tasks.discard(t)) + asyncio.create_task(self._refresh_focus_energy(stream_id)) # 设置 Chatter 正在处理的标志 context.is_chatter_processing = True @@ -383,9 +379,7 @@ class StreamLoopManager: self.chatter_manager.process_stream_context(stream_id, context), name=f"chatter_process_{stream_id}" ) - child_tasks.add(chatter_task) - context._chatter_task = chatter_task # 保存主 chatter 任务引用 - + # 等待 chatter 任务完成 results = await chatter_task success = results.get("success", False) @@ -401,47 +395,19 @@ class StreamLoopManager: else: logger.warning(f"流处理失败: {stream_id} - {results.get('error_message', '未知错误')}") - return success - - except asyncio.CancelledError: - logger.info(f"流处理被取消,正在清理所有子任务: {stream_id}") - # 取消所有子任务(包括 chatter 任务和其后台任务) - cancel_count = 0 - for child_task in list(child_tasks): # 使用 list() 创建副本避免迭代时修改 - if not child_task.done(): - child_task.cancel() - cancel_count += 1 - - if cancel_count > 0: - logger.info(f"已取消 {cancel_count} 个子任务: {stream_id}") - # 等待所有子任务真正结束(设置短超时) - try: - await asyncio.wait_for( - asyncio.gather(*child_tasks, return_exceptions=True), - timeout=1.0 - ) - except asyncio.TimeoutError: - logger.warning(f"等待子任务取消超时: {stream_id}") - + return success + except asyncio.CancelledError: + if chatter_task and not chatter_task.done(): + chatter_task.cancel() raise except Exception as e: logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True) - # 异常时也要清理子任务 - for child_task in child_tasks: - if not child_task.done(): - child_task.cancel() return False finally: # 清除 Chatter 处理标志 context.is_chatter_processing = False logger.debug(f"清除 Chatter 处理标志: {stream_id}") - # 清理临时存储的任务引用 - if hasattr(context, '_active_process_tasks'): - delattr(context, '_active_process_tasks') - if hasattr(context, '_chatter_task'): - delattr(context, '_chatter_task') - # 无论成功或失败,都要设置处理状态为未处理 self._set_stream_processing_status(stream_id, False) diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index 609d203aa..2d5d4c3bc 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -436,11 +436,7 @@ class MessageManager: logger.error(f"清除未读消息时发生错误: {e}") async def clear_stream_unread_messages(self, stream_id: str): - """清除指定聊天流的所有未读消息 - - 此方法会标记剩余的未读消息为已读(移到历史),作为兜底保护。 - 正常情况下,action_manager 应该已经标记了所有消息,这里只是确保没有遗漏。 - """ + """清除指定聊天流的所有未读消息""" try: chat_manager = get_chat_manager() chat_stream = await chat_manager.get_stream(stream_id) @@ -453,9 +449,7 @@ class MessageManager: unread_count = len(context.unread_messages) # 如果还有未读消息,说明 action_manager 可能遗漏了,标记它们 - if unread_count > 0: - logger.debug(f"🧹 [兜底清理] stream={stream_id[:8]}, 发现 {unread_count} 条剩余未读消息,标记为已读") - + if unread_count > 0: # 获取所有未读消息的 ID message_ids = [msg.message_id for msg in context.unread_messages] @@ -463,9 +457,8 @@ class MessageManager: success = chat_stream.context_manager.mark_messages_as_read(message_ids) if success: - logger.debug(f"✅ [兜底清理] stream={stream_id[:8]}, 成功标记 {unread_count} 条消息为已读") + logger.debug(f"✅ stream={stream_id[:8]}, 成功标记 {unread_count} 条消息为已读") else: - logger.warning(f"⚠️ [兜底清理] stream={stream_id[:8]}, 标记失败,直接清空") context.unread_messages.clear() else: logger.debug(f"流 {stream_id[:8]} 没有剩余未读消息,无需清理") diff --git a/src/chat/planner_actions/action_manager.py b/src/chat/planner_actions/action_manager.py index 2b2f4ed05..f979c1bea 100644 --- a/src/chat/planner_actions/action_manager.py +++ b/src/chat/planner_actions/action_manager.py @@ -215,9 +215,6 @@ class ChatterActionManager: action_name="no_reply", ) - # 自动清空所有未读消息(改为同步等待) - await self._clear_all_unread_messages(chat_stream.stream_id, "no_reply") - return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""} elif action_name != "reply" and action_name != "respond" and action_name != "no_action": @@ -235,9 +232,6 @@ class ChatterActionManager: # 记录执行的动作到目标消息(改为同步等待) if success: await self._record_action_to_message(chat_stream, action_name, target_message, action_data) - # 自动清空所有未读消息 - if clear_unread_messages: - await self._clear_all_unread_messages(chat_stream.stream_id, action_name) # 重置打断计数 await self._reset_interruption_count_after_action(chat_stream.stream_id) @@ -322,9 +316,6 @@ class ChatterActionManager: # 记录回复动作到目标消息 await self._record_action_to_message(chat_stream, action_name, target_message, action_data) - if clear_unread_messages: - await self._clear_all_unread_messages(chat_stream.stream_id, action_name) - # 回复成功,重置打断计数(改为同步等待) await self._reset_interruption_count_after_action(chat_stream.stream_id) @@ -401,24 +392,6 @@ class ChatterActionManager: except Exception as e: logger.warning(f"重置打断计数时出错: {e}") - async def _clear_all_unread_messages(self, stream_id: str, action_name: str): - """在动作执行成功后自动清空所有未读消息 - - Args: - stream_id: 聊天流ID - action_name: 动作名称 - """ - try: - from src.chat.message_manager.message_manager import message_manager - - # 清空所有未读消息 - await message_manager.clear_all_unread_messages(stream_id) - logger.debug(f"[{action_name}] 已自动清空聊天流 {stream_id} 的所有未读消息") - - except Exception as e: - logger.error(f"[{action_name}] 自动清空未读消息时出错: {e}") - # 不抛出异常,避免影响主要功能 - async def _handle_action( self, chat_stream, action, reasoning, action_data, cycle_timers, thinking_id, action_message ) -> tuple[bool, str, str]: diff --git a/src/plugins/built_in/affinity_flow_chatter/planner/plan_executor.py b/src/plugins/built_in/affinity_flow_chatter/planner/plan_executor.py index 1cde6b000..4adaf1ade 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner/plan_executor.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner/plan_executor.py @@ -78,8 +78,9 @@ class ChatterPlanExecutor: other_actions = [] # 分类动作:回复动作和其他动作 + # 回复类动作包括:reply, proactive_reply, respond for action_info in plan.decided_actions: - if action_info.action_type in ["reply", "proactive_reply"]: + if action_info.action_type in ["reply", "proactive_reply", "respond"]: reply_actions.append(action_info) else: other_actions.append(action_info) diff --git a/src/plugins/built_in/affinity_flow_chatter/planner/plan_filter.py b/src/plugins/built_in/affinity_flow_chatter/planner/plan_filter.py index 360f533b7..7bf2b4360 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner/plan_filter.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner/plan_filter.py @@ -363,9 +363,6 @@ class ChatterPlanFilter: stream_context = chat_stream.context_manager - # 🔥 确保历史消息已从数据库加载 - await stream_context.ensure_history_initialized() - # 获取真正的已读和未读消息 read_messages = stream_context.context.history_messages # 已读消息存储在history_messages中 if not read_messages: