From a2c0afa75d0bf94f554f66e7d51689075a12aec3 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Mon, 27 Oct 2025 17:16:36 +0800 Subject: [PATCH] =?UTF-8?q?refactor(chat):=20=E7=AE=80=E5=8C=96=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=AE=A1=E7=90=86=E6=9E=B6=E6=9E=84=EF=BC=8C=E7=A7=BB?= =?UTF-8?q?=E9=99=A4=E5=A4=9A=E9=87=8D=E5=9B=9E=E5=A4=8D=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 移除 ChatterManager 中的复杂任务追踪逻辑(_processing_tasks) - 将流循环任务管理从 StreamLoopManager 转移到 StreamContext - 简化消息打断机制,通过取消 stream_loop_task 实现 - 移除多重回复相关功能,统一使用单一任务管理 - 优化错误处理和资源清理逻辑 BREAKING CHANGE: 移除了多重回复功能,所有流处理现在使用单一任务架构 --- src/chat/chatter_manager.py | 146 +--------------- .../message_manager/distribution_manager.py | 156 +++++++++--------- src/chat/message_manager/message_manager.py | 104 ++++-------- .../data_models/message_manager_data_model.py | 1 + 4 files changed, 117 insertions(+), 290 deletions(-) diff --git a/src/chat/chatter_manager.py b/src/chat/chatter_manager.py index 867957f67..221c5e93f 100644 --- a/src/chat/chatter_manager.py +++ b/src/chat/chatter_manager.py @@ -16,8 +16,6 @@ class ChatterManager: self.action_manager = action_manager self.chatter_classes: dict[ChatType, list[type]] = {} self.instances: dict[str, BaseChatter] = {} - # 🌟 优化:统一任务追踪,支持多重回复 - self._processing_tasks: dict[str, list[asyncio.Task]] = {} # 管理器统计 self.stats = { @@ -153,22 +151,21 @@ class ChatterManager: except asyncio.CancelledError: self.stats["failed_executions"] += 1 logger.info(f"流 {stream_id} 处理被取消,不清空未读消息") + context.triggering_user_id = None # 清除触发用户ID raise except Exception as e: self.stats["failed_executions"] += 1 logger.error(f"处理流 {stream_id} 时发生错误: {e}") + context.triggering_user_id = None # 清除触发用户ID raise finally: - # 无论成功还是失败,都要清理处理任务记录 - self.remove_processing_task(stream_id) - context.triggering_user_id = None # 清除触发用户ID - + # 清除触发用户ID(所有情况下都需要) + context.triggering_user_id = None def get_stats(self) -> dict[str, Any]: """获取管理器统计信息""" stats = self.stats.copy() stats["active_instances"] = len(self.instances) stats["registered_chatter_types"] = len(self.chatter_classes) - stats["active_processing_tasks"] = len(self.get_active_processing_tasks()) return stats def reset_stats(self): @@ -179,138 +176,3 @@ class ChatterManager: "successful_executions": 0, "failed_executions": 0, } - - def set_processing_task(self, stream_id: str, task: asyncio.Task): - """设置流的主要处理任务""" - if stream_id not in self._processing_tasks: - self._processing_tasks[stream_id] = [] - self._processing_tasks[stream_id].insert(0, task) # 主要任务放在第一位 - logger.debug(f"设置流 {stream_id} 的主要处理任务") - - def get_processing_task(self, stream_id: str) -> asyncio.Task | None: - """获取流的主要处理任务""" - tasks = self._processing_tasks.get(stream_id, []) - return tasks[0] if tasks and not tasks[0].done() else None - - def add_processing_task(self, stream_id: str, task: asyncio.Task): - """添加处理任务到流(支持多重回复)""" - if stream_id not in self._processing_tasks: - self._processing_tasks[stream_id] = [] - self._processing_tasks[stream_id].append(task) - logger.debug(f"添加处理任务到流 {stream_id},当前任务数: {len(self._processing_tasks[stream_id])}") - - def get_all_processing_tasks(self, stream_id: str) -> list[asyncio.Task]: - """获取流的所有活跃处理任务""" - if stream_id not in self._processing_tasks: - return [] - - # 清理已完成的任务并返回活跃任务 - active_tasks = [task for task in self._processing_tasks[stream_id] if not task.done()] - self._processing_tasks[stream_id] = active_tasks - - if len(active_tasks) == 0: - del self._processing_tasks[stream_id] - - return active_tasks - - def cancel_all_stream_tasks(self, stream_id: str) -> int: - """取消指定流的所有处理任务(包括多重回复) - - Args: - stream_id: 流ID - - Returns: - int: 成功取消的任务数量 - """ - if stream_id not in self._processing_tasks: - return 0 - - tasks = self._processing_tasks[stream_id] - cancelled_count = 0 - - logger.info(f"开始取消流 {stream_id} 的所有处理任务,共 {len(tasks)} 个") - - for task in tasks: - try: - if not task.done(): - task.cancel() - cancelled_count += 1 - logger.debug(f"成功取消任务 {task.get_name() if hasattr(task, 'get_name') else 'unnamed'}") - except Exception as e: - logger.warning(f"取消任务时出错: {e}") - - # 清理任务记录 - del self._processing_tasks[stream_id] - logger.info(f"流 {stream_id} 的任务取消完成,成功取消 {cancelled_count} 个任务") - return cancelled_count - - def cancel_processing_task(self, stream_id: str) -> bool: - """取消流的主要处理任务 - - Args: - stream_id: 流ID - - Returns: - bool: 是否成功取消了任务 - """ - main_task = self.get_processing_task(stream_id) - if main_task and not main_task.done(): - try: - main_task.cancel() - logger.info(f"已取消流 {stream_id} 的主要处理任务") - return True - except Exception as e: - logger.warning(f"取消流 {stream_id} 的主要处理任务时出错: {e}") - return False - return False - - def remove_processing_task(self, stream_id: str) -> None: - """移除流的处理任务记录 - - Args: - stream_id: 流ID - """ - if stream_id in self._processing_tasks: - del self._processing_tasks[stream_id] - logger.debug(f"已移除流 {stream_id} 的所有处理任务记录") - - def get_active_processing_tasks(self) -> dict[str, asyncio.Task]: - """获取所有活跃的主要处理任务 - - Returns: - Dict[str, asyncio.Task]: 流ID到主要处理任务的映射 - """ - # 过滤掉已完成的任务,只返回主要任务 - active_tasks = {} - for stream_id, task_list in list(self._processing_tasks.items()): - if task_list: - main_task = task_list[0] # 获取主要任务 - if not main_task.done(): - active_tasks[stream_id] = main_task - else: - # 清理已完成的主要任务 - task_list = [t for t in task_list if not t.done()] - if task_list: - self._processing_tasks[stream_id] = task_list - active_tasks[stream_id] = task_list[0] # 新的主要任务 - else: - del self._processing_tasks[stream_id] - logger.debug(f"清理已完成的处理任务: {stream_id}") - - return active_tasks - - async def cancel_all_processing_tasks(self) -> int: - """取消所有活跃的处理任务 - - Returns: - int: 成功取消的任务数量 - """ - active_tasks = self.get_active_processing_tasks() - cancelled_count = 0 - - for stream_id in active_tasks.keys(): - if self.cancel_processing_task(stream_id): - cancelled_count += 1 - - logger.info(f"已取消 {cancelled_count} 个活跃处理任务") - return cancelled_count diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index 48cc17e23..8caae1bf8 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -22,9 +22,6 @@ class StreamLoopManager: """流循环管理器 - 每个流一个独立的无限循环任务""" def __init__(self, max_concurrent_streams: int | None = None): - # 流循环任务管理 - self.stream_loops: dict[str, asyncio.Task] = {} - # 统计信息 self.stats: dict[str, Any] = { "active_streams": 0, @@ -68,12 +65,19 @@ class StreamLoopManager: # 取消所有流循环 try: + # 获取所有活跃的流 + from src.plugin_system.apis.chat_api import get_chat_manager + + chat_manager = get_chat_manager() + all_streams = await chat_manager.get_all_streams() + # 创建任务列表以便并发取消 cancel_tasks = [] - for stream_id, task in list(self.stream_loops.items()): - if not task.done(): - task.cancel() - cancel_tasks.append((stream_id, task)) + for chat_stream in all_streams: + context = chat_stream.context_manager.context + if context.stream_loop_task and not context.stream_loop_task.done(): + context.stream_loop_task.cancel() + cancel_tasks.append((chat_stream.stream_id, context.stream_loop_task)) # 并发等待所有任务取消 if cancel_tasks: @@ -83,15 +87,6 @@ class StreamLoopManager: return_exceptions=True, ) - # 取消所有活跃的 chatter 处理任务 - if self.chatter_manager: - try: - cancelled_count = await self.chatter_manager.cancel_all_processing_tasks() - logger.info(f"已取消 {cancelled_count} 个活跃的 chatter 处理任务") - except Exception as e: - logger.error(f"取消 chatter 处理任务时出错: {e}") - - self.stream_loops.clear() logger.info("所有流循环已清理") except Exception as e: logger.error(f"停止管理器时出错: {e}") @@ -108,8 +103,14 @@ class StreamLoopManager: Returns: bool: 是否成功启动 """ + # 获取流上下文 + context = await self._get_stream_context(stream_id) + if not context: + logger.warning(f"无法获取流上下文: {stream_id}") + return False + # 快速路径:如果流已存在,无需处理 - if stream_id in self.stream_loops: + if context.stream_loop_task and not context.stream_loop_task.done(): logger.debug(f"流 {stream_id} 循环已在运行") return True @@ -141,7 +142,10 @@ class StreamLoopManager: # 创建流循环任务 try: loop_task = asyncio.create_task(self._stream_loop_worker(stream_id), name=f"stream_loop_{stream_id}") - self.stream_loops[stream_id] = loop_task + + # 将任务记录到 StreamContext 中 + context.stream_loop_task = loop_task + # 更新统计信息 self.stats["active_streams"] += 1 self.stats["total_loops"] += 1 @@ -189,12 +193,18 @@ class StreamLoopManager: Returns: bool: 是否成功停止 """ - # 快速路径:如果流不存在,无需处理 - if stream_id not in self.stream_loops: - logger.debug(f"流 {stream_id} 循环不存在,无需停止") + # 获取流上下文 + context = await self._get_stream_context(stream_id) + if not context: + logger.debug(f"流 {stream_id} 上下文不存在,无需停止") return False - task = self.stream_loops[stream_id] + # 检查是否有 stream_loop_task + if not context.stream_loop_task or context.stream_loop_task.done(): + logger.debug(f"流 {stream_id} 循环不存在或已结束,无需停止") + return False + + task = context.stream_loop_task if not task.done(): task.cancel() try: @@ -207,14 +217,10 @@ class StreamLoopManager: except Exception as e: logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}") - # 取消关联的 chatter 处理任务 - if self.chatter_manager: - cancelled = self.chatter_manager.cancel_processing_task(stream_id) - if cancelled: - logger.info(f"已取消关联的 chatter 处理任务: {stream_id}") - - del self.stream_loops[stream_id] - logger.info(f"停止流循环: {stream_id} (剩余: {len(self.stream_loops)})") + # 清空 StreamContext 中的任务记录 + context.stream_loop_task = None + + logger.info(f"停止流循环: {stream_id}") return True async def _stream_loop_worker(self, stream_id: str) -> None: @@ -277,13 +283,6 @@ class StreamLoopManager: except asyncio.CancelledError: logger.info(f"流循环被取消: {stream_id}") - if self.chatter_manager: - # 使用 ChatterManager 的新方法取消处理任务 - cancelled = self.chatter_manager.cancel_processing_task(stream_id) - if cancelled: - logger.info(f"成功取消 chatter 处理任务: {stream_id}") - else: - logger.debug(f"没有需要取消的 chatter 处理任务: {stream_id}") break except Exception as e: logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True) @@ -291,10 +290,14 @@ class StreamLoopManager: await asyncio.sleep(5.0) # 错误时等待5秒再重试 finally: - # 清理循环标记 - if stream_id in self.stream_loops: - del self.stream_loops[stream_id] - logger.debug(f"清理流循环标记: {stream_id}") + # 清理 StreamContext 中的任务记录 + try: + context = await self._get_stream_context(stream_id) + if context and context.stream_loop_task: + context.stream_loop_task = None + logger.debug(f"清理 StreamContext 中的流循环任务: {stream_id}") + except Exception as e: + logger.debug(f"清理 StreamContext 任务记录失败: {e}") # 释放自适应管理器的槽位 try: @@ -384,9 +387,7 @@ class StreamLoopManager: energy_task.add_done_callback(lambda t: child_tasks.discard(t)) # 直接调用chatter_manager处理流上下文 - task = asyncio.create_task(self.chatter_manager.process_stream_context(stream_id, context)) - self.chatter_manager.set_processing_task(stream_id, task) - results = await task + results = await self.chatter_manager.process_stream_context(stream_id, context) success = results.get("success", False) if success: @@ -509,8 +510,11 @@ class StreamLoopManager: current_time = time.time() uptime = current_time - self.stats["start_time"] if self.is_running else 0 + # 从统计信息中获取活跃流数量 + active_streams = self.stats.get("active_streams", 0) + return { - "active_streams": len(self.stream_loops), + "active_streams": active_streams, "total_loops": self.stats["total_loops"], "max_concurrent": self.max_concurrent_streams, "is_running": self.is_running, @@ -573,9 +577,12 @@ class StreamLoopManager: # 计算吞吐量 throughput = self.stats["total_process_cycles"] / max(1, uptime / 3600) # 每小时处理次数 + # 从统计信息中获取活跃流数量 + active_streams = self.stats.get("active_streams", 0) + return { "uptime_hours": uptime / 3600, - "active_streams": len(self.stream_loops), + "active_streams": active_streams, "total_process_cycles": self.stats["total_process_cycles"], "total_failures": self.stats["total_failures"], "throughput_per_hour": throughput, @@ -627,48 +634,39 @@ class StreamLoopManager: logger.info(f"强制分发流处理: {stream_id}") try: - # 检查是否有现有的分发循环 - if stream_id in self.stream_loops: - logger.info(f"发现现有流循环 {stream_id},将先移除再重新创建") - existing_task = self.stream_loops[stream_id] - if not existing_task.done(): - existing_task.cancel() - # 创建异步任务来等待取消完成,并添加异常处理 - cancel_task = asyncio.create_task( - self._wait_for_task_cancel(stream_id, existing_task), name=f"cancel_existing_loop_{stream_id}" - ) - # 为取消任务添加异常处理,避免孤儿任务 - cancel_task.add_done_callback( - lambda task: logger.debug(f"取消任务完成: {stream_id}") - if not task.exception() - else logger.error(f"取消任务异常: {stream_id} - {task.exception()}") - ) - # 从字典中移除 - del self.stream_loops[stream_id] - - # 获取聊天管理器和流 - chat_manager = get_chat_manager() - chat_stream = await chat_manager.get_stream(stream_id) - if not chat_stream: - logger.warning(f"强制分发时未找到流: {stream_id}") - return - # 获取流上下文 - context = chat_stream.context_manager.context + context = await self._get_stream_context(stream_id) if not context: logger.warning(f"强制分发时未找到流上下文: {stream_id}") return + # 检查是否有现有的 stream_loop_task + if context.stream_loop_task and not context.stream_loop_task.done(): + logger.info(f"发现现有流循环 {stream_id},将先取消再重新创建") + existing_task = context.stream_loop_task + existing_task.cancel() + # 创建异步任务来等待取消完成,并添加异常处理 + cancel_task = asyncio.create_task( + self._wait_for_task_cancel(stream_id, existing_task), name=f"cancel_existing_loop_{stream_id}" + ) + # 为取消任务添加异常处理,避免孤儿任务 + cancel_task.add_done_callback( + lambda task: logger.debug(f"取消任务完成: {stream_id}") + if not task.exception() + else logger.error(f"取消任务异常: {stream_id} - {task.exception()}") + ) + # 检查未读消息数量 unread_count = self._get_unread_count(context) logger.info(f"流 {stream_id} 当前未读消息数: {unread_count}") - # 创建新的流循环任务 - new_task = asyncio.create_task(self._stream_loop(stream_id), name=f"force_stream_loop_{stream_id}") - self.stream_loops[stream_id] = new_task - self.stats["total_loops"] += 1 - - logger.info(f"已创建强制分发流循环: {stream_id} (当前总数: {len(self.stream_loops)})") + # 使用 start_stream_loop 重新创建流循环任务 + success = await self.start_stream_loop(stream_id, force=True) + + if success: + logger.info(f"已创建强制分发流循环: {stream_id}") + else: + logger.warning(f"创建强制分发流循环失败: {stream_id}") except Exception as e: logger.error(f"强制分发流处理失败 {stream_id}: {e}", exc_info=True) diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index 00a4895c7..e0dde6aec 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -360,7 +360,7 @@ class MessageManager: logger.error(f"清理不活跃聊天流时发生错误: {e}") async def _check_and_handle_interruption(self, chat_stream: ChatStream | None = None, message: DatabaseMessages | None = None): - """检查并处理消息打断 - 支持多重回复任务取消""" + """检查并处理消息打断 - 通过取消 stream_loop_task 实现""" if not global_config.chat.interruption_enabled or not chat_stream or not message: return @@ -374,70 +374,64 @@ class MessageManager: logger.info(f"消息 {message.message_id} 是表情包或Emoji,跳过打断检查") return - # 修复:获取所有处理任务(包括多重回复) - all_processing_tasks = self.chatter_manager.get_all_processing_tasks(chat_stream.stream_id) - - if all_processing_tasks: + # 检查是否有 stream_loop_task 在运行 + context = chat_stream.context_manager.context + stream_loop_task = context.stream_loop_task + + if stream_loop_task and not stream_loop_task.done(): # 检查触发用户ID - triggering_user_id = chat_stream.context_manager.context.triggering_user_id + triggering_user_id = context.triggering_user_id if triggering_user_id and message.user_info.user_id != triggering_user_id: logger.info(f"消息来自非触发用户 {message.user_info.user_id},实际触发用户为 {triggering_user_id},跳过打断检查") return - # 计算打断概率 - 使用新的线性概率模型 - interruption_probability = chat_stream.context_manager.context.calculate_interruption_probability( + # 计算打断概率 + interruption_probability = context.calculate_interruption_probability( global_config.chat.interruption_max_limit ) # 检查是否已达到最大打断次数 - if chat_stream.context_manager.context.interruption_count >= global_config.chat.interruption_max_limit: + if context.interruption_count >= global_config.chat.interruption_max_limit: logger.debug( - f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit},跳过打断检查" + f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},跳过打断检查" ) return # 根据概率决定是否打断 if random.random() < interruption_probability: - logger.info(f"聊天流 {chat_stream.stream_id} 触发消息打断,打断概率: {interruption_probability:.2f},检测到 {len(all_processing_tasks)} 个任务") + logger.info(f"聊天流 {chat_stream.stream_id} 触发消息打断,打断概率: {interruption_probability:.2f}") - # 修复:取消所有任务(包括多重回复) - cancelled_count = self.chatter_manager.cancel_all_stream_tasks(chat_stream.stream_id) - - if cancelled_count > 0: - logger.info(f"消息打断成功取消 {cancelled_count} 个任务: {chat_stream.stream_id}") - else: - logger.warning(f"消息打断未能取消任何任务: {chat_stream.stream_id}") + # 取消 stream_loop_task,子任务会通过 try-catch 自动取消 + try: + stream_loop_task.cancel() + logger.info(f"已取消流循环任务: {chat_stream.stream_id}") + except Exception as e: + logger.warning(f"取消流循环任务失败: {chat_stream.stream_id} - {e}") # 增加打断计数 - await chat_stream.context_manager.context.increment_interruption_count() + await context.increment_interruption_count() - # 新增:打断后立即重新进入聊天流程 - # 新增:打断后延迟重新进入聊天流程,以合并短时间内的多条消息 - asyncio.create_task(self._trigger_delayed_reprocess(chat_stream, delay=0.5)) + # 打断后重新创建 stream_loop 任务 + await self._trigger_reprocess(chat_stream) # 检查是否已达到最大次数 - if chat_stream.context_manager.context.interruption_count >= global_config.chat.interruption_max_limit: + if context.interruption_count >= global_config.chat.interruption_max_limit: logger.warning( - f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit},后续消息将不再打断" + f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},后续消息将不再打断" ) else: logger.info( - f"聊天流 {chat_stream.stream_id} 已打断并重新进入处理流程,当前打断次数: {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit}" + f"聊天流 {chat_stream.stream_id} 已打断并重新进入处理流程,当前打断次数: {context.interruption_count}/{global_config.chat.interruption_max_limit}" ) else: - logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断,打断概率: {interruption_probability:.2f},检测到 {len(all_processing_tasks)} 个任务") - - async def _trigger_delayed_reprocess(self, chat_stream: ChatStream, delay: float): - """打断后延迟重新进入聊天流程,以合并短时间内的多条消息""" - await asyncio.sleep(delay) - await self._trigger_reprocess(chat_stream) + logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断,打断概率: {interruption_probability:.2f}") async def _trigger_reprocess(self, chat_stream: ChatStream): - """重新处理聊天流的核心逻辑 - 支持子任务管理""" + """重新处理聊天流的核心逻辑 - 重新创建 stream_loop 任务""" try: stream_id = chat_stream.stream_id - logger.info(f"🚀 打断后立即重新处理聊天流: {stream_id}") + logger.info(f"🚀 打断后重新创建流循环任务: {stream_id}") # 等待一小段时间确保当前消息已经添加到未读消息中 await asyncio.sleep(0.1) @@ -451,47 +445,19 @@ class MessageManager: logger.debug(f"💭 聊天流 {stream_id} 没有未读消息,跳过重新处理") return - logger.info(f"💬 开始重新处理 {len(unread_messages)} 条未读消息: {stream_id}") + logger.info(f"💬 准备重新处理 {len(unread_messages)} 条未读消息: {stream_id}") - # 创建处理任务并使用try-catch实现子任务管理 - task = asyncio.create_task( - self._managed_reprocess_with_cleanup(stream_id, context), - name=f"reprocess_{stream_id}_{int(time.time())}" - ) - - # 设置处理任务 - self.chatter_manager.set_processing_task(stream_id, task) - - # 不等待完成,让它异步执行 - # 如果需要等待,调用者会等待 chatter_manager.process_stream_context + # 重新创建 stream_loop 任务 + success = await stream_loop_manager.start_stream_loop(stream_id, force=True) + + if success: + logger.info(f"✅ 成功重新创建流循环任务: {stream_id}") + else: + logger.warning(f"⚠️ 重新创建流循环任务失败: {stream_id}") except Exception as e: logger.error(f"🚨 触发重新处理时出错: {e}") - async def _managed_reprocess_with_cleanup(self, stream_id: str, context): - """带清理功能的重新处理""" - child_tasks = set() # 跟踪子任务 - - try: - # 处理流上下文 - result = await self.chatter_manager.process_stream_context(stream_id, context) - return result - - except asyncio.CancelledError: - logger.info(f"重新处理任务被取消: {stream_id}") - # 取消所有子任务 - for child_task in child_tasks: - if not child_task.done(): - child_task.cancel() - raise - except Exception as e: - logger.error(f"重新处理任务执行出错: {stream_id} - {e}") - # 清理子任务 - for child_task in child_tasks: - if not child_task.done(): - child_task.cancel() - raise - async def clear_all_unread_messages(self, stream_id: str): """清除指定上下文中的所有未读消息,在消息处理完成后调用""" try: diff --git a/src/common/data_models/message_manager_data_model.py b/src/common/data_models/message_manager_data_model.py index ac0f55921..6b0bb9ab0 100644 --- a/src/common/data_models/message_manager_data_model.py +++ b/src/common/data_models/message_manager_data_model.py @@ -40,6 +40,7 @@ class StreamContext(BaseDataModel): last_check_time: float = field(default_factory=time.time) is_active: bool = True processing_task: asyncio.Task | None = None + stream_loop_task: asyncio.Task | None = None # 流循环任务 interruption_count: int = 0 # 打断计数器 last_interruption_time: float = 0.0 # 上次打断时间