From 06484245e1e0b6dacc18063bd4d205d3e79f53fa Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Fri, 3 Oct 2025 01:14:53 +0800 Subject: [PATCH] =?UTF-8?q?refactor(chat):=20=E4=BC=98=E5=8C=96=E6=B5=81?= =?UTF-8?q?=E5=BE=AA=E7=8E=AF=E7=AE=A1=E7=90=86=E5=99=A8=E7=9A=84=E5=B9=B6?= =?UTF-8?q?=E5=8F=91=E6=80=A7=E8=83=BD=E5=92=8C=E5=81=A5=E5=A3=AE=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 改进StreamLoopManager的锁机制和任务管理,添加超时控制避免死锁,使用并发操作提升性能,增强异常处理和日志记录确保系统稳定性。 --- .../message_manager/distribution_manager.py | 277 +++++++++++++++--- 1 file changed, 236 insertions(+), 41 deletions(-) diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index 152c40362..4170c11f2 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -67,15 +67,36 @@ class StreamLoopManager: self.is_running = False # 取消所有流循环 - async with self.loop_lock: - for task in list(self.stream_loops.values()): - if not task.done(): - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - self.stream_loops.clear() + try: + # 使用带超时的锁获取,避免无限等待 + lock_acquired = await asyncio.wait_for(self.loop_lock.acquire(), timeout=10.0) + if not lock_acquired: + logger.error("停止管理器时获取锁超时") + else: + try: + # 创建任务列表以便并发取消 + cancel_tasks = [] + for stream_id, task in list(self.stream_loops.items()): + if not task.done(): + task.cancel() + cancel_tasks.append((stream_id, task)) + + # 并发等待所有任务取消 + if cancel_tasks: + logger.info(f"正在取消 {len(cancel_tasks)} 个流循环任务...") + await asyncio.gather( + *[self._wait_for_task_cancel(stream_id, task) for stream_id, task in cancel_tasks], + return_exceptions=True + ) + + self.stream_loops.clear() + logger.info("所有流循环已清理") + finally: + self.loop_lock.release() + except asyncio.TimeoutError: + logger.error("停止管理器时获取锁超时") + except Exception as e: + logger.error(f"停止管理器时获取锁异常: {e}") logger.info("流循环管理器已停止") @@ -88,34 +109,84 @@ class StreamLoopManager: Returns: bool: 是否成功启动 """ - async with self.loop_lock: - # 检查是否已有循环在运行 + # 使用更细粒度的锁策略:先检查是否需要锁,再获取锁 + # 快速路径:如果流已存在,无需获取锁 + if stream_id in self.stream_loops: + logger.debug(f"流 {stream_id} 循环已在运行") + return True + + # 判断是否需要强制分发(在锁外执行,减少锁持有时间) + should_force = force or self._should_force_dispatch_for_stream(stream_id) + + # 获取锁进行流循环创建 + try: + # 使用带超时的锁获取,避免无限等待 + lock_acquired = await asyncio.wait_for(self.loop_lock.acquire(), timeout=5.0) + if not lock_acquired: + logger.error(f"获取流循环锁超时: {stream_id}") + return False + except asyncio.TimeoutError: + logger.error(f"获取流循环锁超时: {stream_id}") + return False + except Exception as e: + logger.error(f"获取流循环锁异常: {stream_id} - {e}") + return False + + try: + # 双重检查:在获取锁后再次检查流是否已存在 if stream_id in self.stream_loops: - logger.debug(f"流 {stream_id} 循环已在运行") + logger.debug(f"流 {stream_id} 循环已在运行(双重检查)") return True - # 判断是否需要强制分发 - force = force or self._should_force_dispatch_for_stream(stream_id) - # 检查是否超过最大并发限制 - if len(self.stream_loops) >= self.max_concurrent_streams and not force: - logger.warning(f"超过最大并发流数限制,无法启动流 {stream_id}") + current_streams = len(self.stream_loops) + if current_streams >= self.max_concurrent_streams and not should_force: + logger.warning( + f"超过最大并发流数限制({current_streams}/{self.max_concurrent_streams}),无法启动流 {stream_id}" + ) return False - if force and len(self.stream_loops) >= self.max_concurrent_streams: + if should_force and current_streams >= self.max_concurrent_streams: logger.warning( - "流 %s 未读消息积压严重(>%s),突破并发限制强制启动分发", - stream_id, - self.force_dispatch_unread_threshold, + f"流 {stream_id} 未读消息积压严重(>{self.force_dispatch_unread_threshold}),突破并发限制强制启动分发 (当前: {current_streams}/{self.max_concurrent_streams})" ) + # 检查是否有现有的分发循环,如果有则先移除 + if stream_id in self.stream_loops: + logger.info(f"发现现有流循环 {stream_id},将先移除再重新创建") + existing_task = self.stream_loops[stream_id] + if not existing_task.done(): + existing_task.cancel() + # 创建异步任务来等待取消完成,并添加异常处理 + cancel_task = asyncio.create_task( + self._wait_for_task_cancel(stream_id, existing_task), + name=f"cancel_existing_loop_{stream_id}" + ) + # 为取消任务添加异常处理,避免孤儿任务 + cancel_task.add_done_callback( + lambda task: logger.debug(f"取消任务完成: {stream_id}") if not task.exception() + else logger.error(f"取消任务异常: {stream_id} - {task.exception()}") + ) + # 从字典中移除 + del self.stream_loops[stream_id] + current_streams -= 1 # 更新当前流数量 # 创建流循环任务 - task = asyncio.create_task(self._stream_loop(stream_id)) - self.stream_loops[stream_id] = task - self.stats["total_loops"] += 1 + try: + task = asyncio.create_task( + self._stream_loop(stream_id), + name=f"stream_loop_{stream_id}" # 为任务添加名称,便于调试 + ) + self.stream_loops[stream_id] = task + self.stats["total_loops"] += 1 - logger.info(f"启动流循环: {stream_id}") - return True + logger.info(f"启动流循环: {stream_id} (当前总数: {len(self.stream_loops)})") + return True + except Exception as e: + logger.error(f"创建流循环任务失败: {stream_id} - {e}") + return False + finally: + # 确保锁被释放 + self.loop_lock.release() async def stop_stream_loop(self, stream_id: str) -> bool: """停止指定流的循环任务 @@ -126,20 +197,51 @@ class StreamLoopManager: Returns: bool: 是否成功停止 """ - async with self.loop_lock: - if stream_id in self.stream_loops: - task = self.stream_loops[stream_id] - if not task.done(): - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - del self.stream_loops[stream_id] - logger.info(f"停止流循环: {stream_id}") - return True + # 快速路径:如果流不存在,无需获取锁 + if stream_id not in self.stream_loops: + logger.debug(f"流 {stream_id} 循环不存在,无需停止") return False + # 获取锁进行流循环停止 + try: + # 使用带超时的锁获取,避免无限等待 + lock_acquired = await asyncio.wait_for(self.loop_lock.acquire(), timeout=5.0) + if not lock_acquired: + logger.error(f"获取流循环锁超时: {stream_id}") + return False + except asyncio.TimeoutError: + logger.error(f"获取流循环锁超时: {stream_id}") + return False + except Exception as e: + logger.error(f"获取流循环锁异常: {stream_id} - {e}") + return False + + try: + # 双重检查:在获取锁后再次检查流是否存在 + if stream_id not in self.stream_loops: + logger.debug(f"流 {stream_id} 循环不存在(双重检查)") + return False + + task = self.stream_loops[stream_id] + if not task.done(): + task.cancel() + try: + # 设置取消超时,避免无限等待 + await asyncio.wait_for(task, timeout=5.0) + except asyncio.CancelledError: + logger.debug(f"流循环任务已取消: {stream_id}") + except asyncio.TimeoutError: + logger.warning(f"流循环任务取消超时: {stream_id}") + except Exception as e: + logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}") + + del self.stream_loops[stream_id] + logger.info(f"停止流循环: {stream_id} (剩余: {len(self.stream_loops)})") + return True + finally: + # 确保锁被释放 + self.loop_lock.release() + async def _stream_loop(self, stream_id: str) -> None: """单个流的无限循环 @@ -206,9 +308,22 @@ class StreamLoopManager: finally: # 清理循环标记 - async with self.loop_lock: - if stream_id in self.stream_loops: - del self.stream_loops[stream_id] + try: + # 使用带超时的锁获取,避免无限等待 + lock_acquired = await asyncio.wait_for(self.loop_lock.acquire(), timeout=5.0) + if not lock_acquired: + logger.error(f"流结束时获取锁超时: {stream_id}") + else: + try: + if stream_id in self.stream_loops: + del self.stream_loops[stream_id] + logger.debug(f"清理流循环标记: {stream_id}") + finally: + self.loop_lock.release() + except asyncio.TimeoutError: + logger.error(f"流结束时获取锁超时: {stream_id}") + except Exception as e: + logger.error(f"流结束时获取锁异常: {stream_id} - {e}") logger.info(f"流循环结束: {stream_id}") @@ -416,6 +531,86 @@ class StreamLoopManager: except Exception as e: logger.warning(f"刷新聊天流 {stream_id} 能量失败: {e}") + async def _wait_for_task_cancel(self, stream_id: str, task: asyncio.Task) -> None: + """等待任务取消完成,带有超时控制 + + Args: + stream_id: 流ID + task: 要等待取消的任务 + """ + try: + await asyncio.wait_for(task, timeout=5.0) + logger.debug(f"流循环任务已正常结束: {stream_id}") + except asyncio.CancelledError: + logger.debug(f"流循环任务已取消: {stream_id}") + except asyncio.TimeoutError: + logger.warning(f"流循环任务取消超时: {stream_id}") + except Exception as e: + logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}") + + def _force_dispatch_stream(self, stream_id: str) -> None: + """强制分发流处理 + + 当流的未读消息超过阈值时,强制触发分发处理 + 这个方法主要用于突破并发限制时的紧急处理 + + 注意:此方法目前未被使用,相关功能已集成到 start_stream_loop 方法中 + + Args: + stream_id: 流ID + """ + logger.info(f"强制分发流处理: {stream_id}") + + try: + # 检查是否有现有的分发循环 + if stream_id in self.stream_loops: + logger.info(f"发现现有流循环 {stream_id},将先移除再重新创建") + existing_task = self.stream_loops[stream_id] + if not existing_task.done(): + existing_task.cancel() + # 创建异步任务来等待取消完成,并添加异常处理 + cancel_task = asyncio.create_task( + self._wait_for_task_cancel(stream_id, existing_task), + name=f"cancel_existing_loop_{stream_id}" + ) + # 为取消任务添加异常处理,避免孤儿任务 + cancel_task.add_done_callback( + lambda task: logger.debug(f"取消任务完成: {stream_id}") if not task.exception() + else logger.error(f"取消任务异常: {stream_id} - {task.exception()}") + ) + # 从字典中移除 + del self.stream_loops[stream_id] + + # 获取聊天管理器和流 + chat_manager = get_chat_manager() + chat_stream = chat_manager.get_stream(stream_id) + if not chat_stream: + logger.warning(f"强制分发时未找到流: {stream_id}") + return + + # 获取流上下文 + context = chat_stream.context_manager.context + if not context: + logger.warning(f"强制分发时未找到流上下文: {stream_id}") + return + + # 检查未读消息数量 + unread_count = self._get_unread_count(context) + logger.info(f"流 {stream_id} 当前未读消息数: {unread_count}") + + # 创建新的流循环任务 + new_task = asyncio.create_task( + self._stream_loop(stream_id), + name=f"force_stream_loop_{stream_id}" + ) + self.stream_loops[stream_id] = new_task + self.stats["total_loops"] += 1 + + logger.info(f"已创建强制分发流循环: {stream_id} (当前总数: {len(self.stream_loops)})") + + except Exception as e: + logger.error(f"强制分发流处理失败 {stream_id}: {e}", exc_info=True) + # 全局流循环管理器实例 stream_loop_manager = StreamLoopManager()