From b24eaba7ff1544fbb6bc2a20d995e7148708ebaf Mon Sep 17 00:00:00 2001 From: tt-P607 <68868379+tt-P607@users.noreply.github.com> Date: Fri, 3 Oct 2025 10:15:24 +0800 Subject: [PATCH 1/3] =?UTF-8?q?fix(chat):=20=E7=A1=AE=E4=BF=9D=E5=9C=A8?= =?UTF-8?q?=E6=B5=81=E5=8F=96=E6=B6=88=E6=97=B6=E6=AD=A3=E7=A1=AE=E5=8F=96?= =?UTF-8?q?=E6=B6=88chatter=E5=A4=84=E7=90=86=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 先前,当一个流循环(stream loop)被取消时,为其创建的 chatter 处理任务(`process_stream_context`)不会被一并取消。 这可能导致任务泄露,即“孤儿”任务在后台继续运行,消耗资源并可能引发意外行为。 本次修改引入了一个任务跟踪机制: - `ChatterManager`现在会记录每个流正在运行的处理任务。 - 当流循环捕获到 `CancelledError` 时,它会主动取消关联的 chatter 任务。 这确保了在流停止时,相关的计算资源能够被正确、及时地释放,提高了系统的健壮性。 --- src/chat/chatter_manager.py | 10 ++++++++++ src/chat/message_manager/distribution_manager.py | 10 ++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/chat/chatter_manager.py b/src/chat/chatter_manager.py index d8eda9baa..cf9d3b039 100644 --- a/src/chat/chatter_manager.py +++ b/src/chat/chatter_manager.py @@ -1,6 +1,7 @@ import time from typing import Any +import asyncio from src.chat.planner_actions.action_manager import ChatterActionManager from src.common.data_models.message_manager_data_model import StreamContext from src.common.logger import get_logger @@ -15,6 +16,7 @@ class ChatterManager: self.action_manager = action_manager self.chatter_classes: dict[ChatType, list[type]] = {} self.instances: dict[str, BaseChatter] = {} + self._processing_tasks: dict[str, asyncio.Task] = {} # 管理器统计 self.stats = { @@ -155,3 +157,11 @@ class ChatterManager: "successful_executions": 0, "failed_executions": 0, } + + def set_processing_task(self, stream_id: str, task: asyncio.Task): + """设置流的处理任务""" + self._processing_tasks[stream_id] = task + + def get_processing_task(self, stream_id: str) -> asyncio.Task | None: + """获取流的处理任务""" + return self._processing_tasks.get(stream_id) diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index fa6bcea0d..516691bae 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -301,6 +301,11 @@ class StreamLoopManager: except asyncio.CancelledError: logger.info(f"流循环被取消: {stream_id}") + if self.chatter_manager: + task = self.chatter_manager.get_processing_task(stream_id) + if task and not task.done(): + task.cancel() + logger.debug(f"已取消 chatter 处理任务: {stream_id}") break except Exception as e: logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True) @@ -388,8 +393,9 @@ class StreamLoopManager: start_time = time.time() # 直接调用chatter_manager处理流上下文 - context.processing_task = asyncio.create_task(self.chatter_manager.process_stream_context(stream_id, context)) - results = await context.processing_task + task = asyncio.create_task(self.chatter_manager.process_stream_context(stream_id, context)) + self.chatter_manager.set_processing_task(stream_id, task) + results = await task success = results.get("success", False) if success: From d5c8bde3a2298da67a7e6e698f0a574a1b002e97 Mon Sep 17 00:00:00 2001 From: tt-P607 <68868379+tt-P607@users.noreply.github.com> Date: Fri, 3 Oct 2025 10:39:21 +0800 Subject: [PATCH 2/3] =?UTF-8?q?fix(chat):=20=E4=BF=AE=E5=A4=8D=E8=83=BD?= =?UTF-8?q?=E9=87=8F=E8=B0=83=E6=95=B4=E8=AE=A1=E7=AE=97=E4=B8=AD=E7=9A=84?= =?UTF-8?q?=E8=B4=9F=E6=95=B0=E5=B9=82=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 当能量值非常接近高能量阈值(0.7)时,由于浮点数精度问题,`energy - 0.7` 的结果可能为一个极小的负数,这会导致 `ValueError` (negative number cannot be raised to a fractional power)。 通过使用 `max(0, ...)` 来确保幂运算的基数始终为非负数,从而解决了这个潜在的运行时错误,增强了系统的健壮性。 --- src/chat/energy_system/energy_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/chat/energy_system/energy_manager.py b/src/chat/energy_system/energy_manager.py index 9983174d6..4fbd05c48 100644 --- a/src/chat/energy_system/energy_manager.py +++ b/src/chat/energy_system/energy_manager.py @@ -365,7 +365,7 @@ class EnergyManager: # 计算与阈值的相对位置 if energy >= high_threshold: # 高能量区域:指数增强 - adjusted = 0.7 + (energy - 0.7) ** 0.8 + adjusted = 0.7 + max(0, energy - 0.7) ** 0.8 elif energy >= reply_threshold: # 中等能量区域:线性保持 adjusted = energy From e9fbd749d83e0bb6bdbc7f91e8ee7dbc2a68a584 Mon Sep 17 00:00:00 2001 From: minecraft1024a Date: Fri, 3 Oct 2025 12:55:45 +0800 Subject: [PATCH 3/3] ruff --- src/chat/chatter_manager.py | 2 +- .../enhanced_memory_adapter.py | 3 +-- .../enhanced_memory_hooks.py | 1 - .../enhanced_memory_integration.py | 1 - .../deprecated_backup/integration_layer.py | 1 - .../memory_integration_hooks.py | 1 - .../multi_stage_retrieval.py | 2 +- .../message_manager/distribution_manager.py | 20 +++++++++---------- src/chat/message_manager/message_manager.py | 2 +- 9 files changed, 14 insertions(+), 19 deletions(-) diff --git a/src/chat/chatter_manager.py b/src/chat/chatter_manager.py index cf9d3b039..8ccdfd84a 100644 --- a/src/chat/chatter_manager.py +++ b/src/chat/chatter_manager.py @@ -1,7 +1,7 @@ +import asyncio import time from typing import Any -import asyncio from src.chat.planner_actions.action_manager import ChatterActionManager from src.common.data_models.message_manager_data_model import StreamContext from src.common.logger import get_logger diff --git a/src/chat/memory_system/deprecated_backup/enhanced_memory_adapter.py b/src/chat/memory_system/deprecated_backup/enhanced_memory_adapter.py index cf93ceaf0..9f35d2d82 100644 --- a/src/chat/memory_system/deprecated_backup/enhanced_memory_adapter.py +++ b/src/chat/memory_system/deprecated_backup/enhanced_memory_adapter.py @@ -8,9 +8,8 @@ from dataclasses import dataclass from typing import Any from src.chat.memory_system.integration_layer import IntegrationConfig, IntegrationMode, MemoryIntegrationLayer -from src.chat.memory_system.memory_formatter import FormatterConfig, format_memories_for_llm - from src.chat.memory_system.memory_chunk import MemoryChunk, MemoryType +from src.chat.memory_system.memory_formatter import FormatterConfig, format_memories_for_llm from src.common.logger import get_logger from src.llm_models.utils_model import LLMRequest diff --git a/src/chat/memory_system/deprecated_backup/enhanced_memory_hooks.py b/src/chat/memory_system/deprecated_backup/enhanced_memory_hooks.py index 2794332cf..1d6e65396 100644 --- a/src/chat/memory_system/deprecated_backup/enhanced_memory_hooks.py +++ b/src/chat/memory_system/deprecated_backup/enhanced_memory_hooks.py @@ -7,7 +7,6 @@ from datetime import datetime from typing import Any from src.chat.memory_system.enhanced_memory_manager import enhanced_memory_manager - from src.common.logger import get_logger from src.config.config import global_config diff --git a/src/chat/memory_system/deprecated_backup/enhanced_memory_integration.py b/src/chat/memory_system/deprecated_backup/enhanced_memory_integration.py index 8583f7dd2..068326113 100644 --- a/src/chat/memory_system/deprecated_backup/enhanced_memory_integration.py +++ b/src/chat/memory_system/deprecated_backup/enhanced_memory_integration.py @@ -6,7 +6,6 @@ from typing import Any from src.chat.memory_system.enhanced_memory_hooks import enhanced_memory_hooks - from src.common.logger import get_logger logger = get_logger(__name__) diff --git a/src/chat/memory_system/deprecated_backup/integration_layer.py b/src/chat/memory_system/deprecated_backup/integration_layer.py index c7a27b8cb..220ec00f5 100644 --- a/src/chat/memory_system/deprecated_backup/integration_layer.py +++ b/src/chat/memory_system/deprecated_backup/integration_layer.py @@ -10,7 +10,6 @@ from enum import Enum from typing import Any from src.chat.memory_system.enhanced_memory_core import EnhancedMemorySystem - from src.chat.memory_system.memory_chunk import MemoryChunk from src.common.logger import get_logger from src.llm_models.utils_model import LLMRequest diff --git a/src/chat/memory_system/deprecated_backup/memory_integration_hooks.py b/src/chat/memory_system/deprecated_backup/memory_integration_hooks.py index a37e4c548..5dfd52c38 100644 --- a/src/chat/memory_system/deprecated_backup/memory_integration_hooks.py +++ b/src/chat/memory_system/deprecated_backup/memory_integration_hooks.py @@ -12,7 +12,6 @@ from src.chat.memory_system.enhanced_memory_adapter import ( process_conversation_with_enhanced_memory, retrieve_memories_with_enhanced_system, ) - from src.common.logger import get_logger logger = get_logger(__name__) diff --git a/src/chat/memory_system/deprecated_backup/multi_stage_retrieval.py b/src/chat/memory_system/deprecated_backup/multi_stage_retrieval.py index f13792603..529d9db99 100644 --- a/src/chat/memory_system/deprecated_backup/multi_stage_retrieval.py +++ b/src/chat/memory_system/deprecated_backup/multi_stage_retrieval.py @@ -9,8 +9,8 @@ from enum import Enum from typing import Any import orjson -from src.chat.memory_system.enhanced_reranker import EnhancedReRanker, ReRankingConfig +from src.chat.memory_system.enhanced_reranker import EnhancedReRanker, ReRankingConfig from src.chat.memory_system.memory_chunk import MemoryChunk, MemoryType from src.common.logger import get_logger diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index 516691bae..8de7d3355 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -81,7 +81,7 @@ class StreamLoopManager: if not task.done(): task.cancel() cancel_tasks.append((stream_id, task)) - + # 并发等待所有任务取消 if cancel_tasks: logger.info(f"正在取消 {len(cancel_tasks)} 个流循环任务...") @@ -89,7 +89,7 @@ class StreamLoopManager: *[self._wait_for_task_cancel(stream_id, task) for stream_id, task in cancel_tasks], return_exceptions=True ) - + self.stream_loops.clear() logger.info("所有流循环已清理") finally: @@ -235,7 +235,7 @@ class StreamLoopManager: logger.warning(f"流循环任务取消超时: {stream_id}") except Exception as e: logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}") - + del self.stream_loops[stream_id] logger.info(f"停止流循环: {stream_id} (剩余: {len(self.stream_loops)})") return True @@ -568,7 +568,7 @@ class StreamLoopManager: stream_id: 流ID """ logger.info(f"强制分发流处理: {stream_id}") - + try: # 检查是否有现有的分发循环 if stream_id in self.stream_loops: @@ -588,24 +588,24 @@ class StreamLoopManager: ) # 从字典中移除 del self.stream_loops[stream_id] - + # 获取聊天管理器和流 chat_manager = get_chat_manager() chat_stream = chat_manager.get_stream(stream_id) if not chat_stream: logger.warning(f"强制分发时未找到流: {stream_id}") return - + # 获取流上下文 context = chat_stream.context_manager.context if not context: logger.warning(f"强制分发时未找到流上下文: {stream_id}") return - + # 检查未读消息数量 unread_count = self._get_unread_count(context) logger.info(f"流 {stream_id} 当前未读消息数: {unread_count}") - + # 创建新的流循环任务 new_task = asyncio.create_task( self._stream_loop(stream_id), @@ -613,9 +613,9 @@ class StreamLoopManager: ) self.stream_loops[stream_id] = new_task self.stats["total_loops"] += 1 - + logger.info(f"已创建强制分发流循环: {stream_id} (当前总数: {len(self.stream_loops)})") - + except Exception as e: logger.error(f"强制分发流处理失败 {stream_id}: {e}", exc_info=True) diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index 8f43f6128..a714fd957 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -90,7 +90,7 @@ class MessageManager: return await self._check_and_handle_interruption(chat_stream) await chat_stream.context_manager.add_message(message) - + except Exception as e: logger.error(f"添加消息到聊天流 {stream_id} 时发生错误: {e}")