From e02af208c4f97ecfd910acc40aee5c7faa24bb87 Mon Sep 17 00:00:00 2001 From: minecraft1024a Date: Fri, 3 Oct 2025 12:55:45 +0800 Subject: [PATCH] ruff --- src/chat/chatter_manager.py | 2 +- .../enhanced_memory_adapter.py | 3 +- .../enhanced_memory_hooks.py | 1 - .../enhanced_memory_integration.py | 1 - .../deprecated_backup/integration_layer.py | 1 - .../memory_integration_hooks.py | 1 - .../multi_stage_retrieval.py | 2 +- .../message_manager/distribution_manager.py | 82 +++++++++++++------ src/chat/message_manager/message_manager.py | 2 +- 9 files changed, 61 insertions(+), 34 deletions(-) diff --git a/src/chat/chatter_manager.py b/src/chat/chatter_manager.py index cf9d3b039..8ccdfd84a 100644 --- a/src/chat/chatter_manager.py +++ b/src/chat/chatter_manager.py @@ -1,7 +1,7 @@ +import asyncio import time from typing import Any -import asyncio from src.chat.planner_actions.action_manager import ChatterActionManager from src.common.data_models.message_manager_data_model import StreamContext from src.common.logger import get_logger diff --git a/src/chat/memory_system/deprecated_backup/enhanced_memory_adapter.py b/src/chat/memory_system/deprecated_backup/enhanced_memory_adapter.py index cf93ceaf0..9f35d2d82 100644 --- a/src/chat/memory_system/deprecated_backup/enhanced_memory_adapter.py +++ b/src/chat/memory_system/deprecated_backup/enhanced_memory_adapter.py @@ -8,9 +8,8 @@ from dataclasses import dataclass from typing import Any from src.chat.memory_system.integration_layer import IntegrationConfig, IntegrationMode, MemoryIntegrationLayer -from src.chat.memory_system.memory_formatter import FormatterConfig, format_memories_for_llm - from src.chat.memory_system.memory_chunk import MemoryChunk, MemoryType +from src.chat.memory_system.memory_formatter import FormatterConfig, format_memories_for_llm from src.common.logger import get_logger from src.llm_models.utils_model import LLMRequest diff --git a/src/chat/memory_system/deprecated_backup/enhanced_memory_hooks.py b/src/chat/memory_system/deprecated_backup/enhanced_memory_hooks.py index 2794332cf..1d6e65396 100644 --- a/src/chat/memory_system/deprecated_backup/enhanced_memory_hooks.py +++ b/src/chat/memory_system/deprecated_backup/enhanced_memory_hooks.py @@ -7,7 +7,6 @@ from datetime import datetime from typing import Any from src.chat.memory_system.enhanced_memory_manager import enhanced_memory_manager - from src.common.logger import get_logger from src.config.config import global_config diff --git a/src/chat/memory_system/deprecated_backup/enhanced_memory_integration.py b/src/chat/memory_system/deprecated_backup/enhanced_memory_integration.py index 8583f7dd2..068326113 100644 --- a/src/chat/memory_system/deprecated_backup/enhanced_memory_integration.py +++ b/src/chat/memory_system/deprecated_backup/enhanced_memory_integration.py @@ -6,7 +6,6 @@ from typing import Any from src.chat.memory_system.enhanced_memory_hooks import enhanced_memory_hooks - from src.common.logger import get_logger logger = get_logger(__name__) diff --git a/src/chat/memory_system/deprecated_backup/integration_layer.py b/src/chat/memory_system/deprecated_backup/integration_layer.py index c7a27b8cb..220ec00f5 100644 --- a/src/chat/memory_system/deprecated_backup/integration_layer.py +++ b/src/chat/memory_system/deprecated_backup/integration_layer.py @@ -10,7 +10,6 @@ from enum import Enum from typing import Any from src.chat.memory_system.enhanced_memory_core import EnhancedMemorySystem - from src.chat.memory_system.memory_chunk import MemoryChunk from src.common.logger import get_logger from src.llm_models.utils_model import LLMRequest diff --git a/src/chat/memory_system/deprecated_backup/memory_integration_hooks.py b/src/chat/memory_system/deprecated_backup/memory_integration_hooks.py index a37e4c548..5dfd52c38 100644 --- a/src/chat/memory_system/deprecated_backup/memory_integration_hooks.py +++ b/src/chat/memory_system/deprecated_backup/memory_integration_hooks.py @@ -12,7 +12,6 @@ from src.chat.memory_system.enhanced_memory_adapter import ( process_conversation_with_enhanced_memory, retrieve_memories_with_enhanced_system, ) - from src.common.logger import get_logger logger = get_logger(__name__) diff --git a/src/chat/memory_system/deprecated_backup/multi_stage_retrieval.py b/src/chat/memory_system/deprecated_backup/multi_stage_retrieval.py index f13792603..529d9db99 100644 --- a/src/chat/memory_system/deprecated_backup/multi_stage_retrieval.py +++ b/src/chat/memory_system/deprecated_backup/multi_stage_retrieval.py @@ -9,8 +9,8 @@ from enum import Enum from typing import Any import orjson -from src.chat.memory_system.enhanced_reranker import EnhancedReRanker, ReRankingConfig +from src.chat.memory_system.enhanced_reranker import EnhancedReRanker, ReRankingConfig from src.chat.memory_system.memory_chunk import MemoryChunk, MemoryType from src.common.logger import get_logger diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index 6cf0131db..edffe966a 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -68,23 +68,33 @@ class StreamLoopManager: # 取消所有流循环 try: - # 创建任务列表以便并发取消 - cancel_tasks = [] - for stream_id, task in list(self.stream_loops.items()): - if not task.done(): - task.cancel() - cancel_tasks.append((stream_id, task)) + # 使用带超时的锁获取,避免无限等待 + lock_acquired = await asyncio.wait_for(self.loop_lock.acquire(), timeout=10.0) + if not lock_acquired: + logger.error("停止管理器时获取锁超时") + else: + try: + # 创建任务列表以便并发取消 + cancel_tasks = [] + for stream_id, task in list(self.stream_loops.items()): + if not task.done(): + task.cancel() + cancel_tasks.append((stream_id, task)) - # 并发等待所有任务取消 - if cancel_tasks: - logger.info(f"正在取消 {len(cancel_tasks)} 个流循环任务...") - await asyncio.gather( - *[self._wait_for_task_cancel(stream_id, task) for stream_id, task in cancel_tasks], - return_exceptions=True - ) + # 并发等待所有任务取消 + if cancel_tasks: + logger.info(f"正在取消 {len(cancel_tasks)} 个流循环任务...") + await asyncio.gather( + *[self._wait_for_task_cancel(stream_id, task) for stream_id, task in cancel_tasks], + return_exceptions=True + ) - self.stream_loops.clear() - logger.info("所有流循环已清理") + self.stream_loops.clear() + logger.info("所有流循环已清理") + finally: + self.loop_lock.release() + except asyncio.TimeoutError: + logger.error("停止管理器时获取锁超时") except Exception as e: logger.error(f"停止管理器时出错: {e}") @@ -183,9 +193,31 @@ class StreamLoopManager: except Exception as e: logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}") - del self.stream_loops[stream_id] - logger.info(f"停止流循环: {stream_id} (剩余: {len(self.stream_loops)})") - return True + try: + # 双重检查:在获取锁后再次检查流是否存在 + if stream_id not in self.stream_loops: + logger.debug(f"流 {stream_id} 循环不存在(双重检查)") + return False + + task = self.stream_loops[stream_id] + if not task.done(): + task.cancel() + try: + # 设置取消超时,避免无限等待 + await asyncio.wait_for(task, timeout=5.0) + except asyncio.CancelledError: + logger.debug(f"流循环任务已取消: {stream_id}") + except asyncio.TimeoutError: + logger.warning(f"流循环任务取消超时: {stream_id}") + except Exception as e: + logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}") + + del self.stream_loops[stream_id] + logger.info(f"停止流循环: {stream_id} (剩余: {len(self.stream_loops)})") + return True + finally: + # 确保锁被释放 + self.loop_lock.release() async def _stream_loop(self, stream_id: str) -> None: """单个流的无限循环 @@ -499,7 +531,7 @@ class StreamLoopManager: stream_id: 流ID """ logger.info(f"强制分发流处理: {stream_id}") - + try: # 检查是否有现有的分发循环 if stream_id in self.stream_loops: @@ -519,24 +551,24 @@ class StreamLoopManager: ) # 从字典中移除 del self.stream_loops[stream_id] - + # 获取聊天管理器和流 chat_manager = get_chat_manager() chat_stream = chat_manager.get_stream(stream_id) if not chat_stream: logger.warning(f"强制分发时未找到流: {stream_id}") return - + # 获取流上下文 context = chat_stream.context_manager.context if not context: logger.warning(f"强制分发时未找到流上下文: {stream_id}") return - + # 检查未读消息数量 unread_count = self._get_unread_count(context) logger.info(f"流 {stream_id} 当前未读消息数: {unread_count}") - + # 创建新的流循环任务 new_task = asyncio.create_task( self._stream_loop(stream_id), @@ -544,9 +576,9 @@ class StreamLoopManager: ) self.stream_loops[stream_id] = new_task self.stats["total_loops"] += 1 - + logger.info(f"已创建强制分发流循环: {stream_id} (当前总数: {len(self.stream_loops)})") - + except Exception as e: logger.error(f"强制分发流处理失败 {stream_id}: {e}", exc_info=True) diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index 8f43f6128..a714fd957 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -90,7 +90,7 @@ class MessageManager: return await self._check_and_handle_interruption(chat_stream) await chat_stream.context_manager.add_message(message) - + except Exception as e: logger.error(f"添加消息到聊天流 {stream_id} 时发生错误: {e}")