From 7f0494cbc30ff489f9bc40f245a800ddca7db78d Mon Sep 17 00:00:00 2001
From: tt-P607 <68868379+tt-P607@users.noreply.github.com>
Date: Thu, 4 Dec 2025 19:26:31 +0800
Subject: [PATCH] Revert "feat(core): implement deadlock detector and improve
 LLM message splitting"
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The reverted commit introduced two main enhancements: a deadlock detection
mechanism in `StreamLoopManager` to improve system stability, and an adjusted
message-splitting strategy for Kokoro Flow Chatter (KFC) to produce more
natural, human-like conversation.

**Deadlock detection in `StreamLoopManager`:**
- A new deadlock detector ran periodically and monitored all active message
  streams.
- It tracked each stream's last activity time and flagged any stream with no
  activity for more than two minutes as a potential deadlock.
- This proactive monitoring helped identify and diagnose streams that might be
  stuck, preventing the system as a whole from freezing.
- To avoid false positives during long waits (for example, waiting for a user
  reply or a long LLM generation), the stream loop refreshed its activity
  timestamp periodically even while sleeping or processing.

**Message-splitting optimization in KFC:**
- The rule-based message splitter in the automatic response post-processor was
  disabled.
- Responsibility for splitting messages was handed entirely to the large
  language model (LLM).
- The system prompt was updated to explicitly instruct the LLM to use multiple
  `reply` actions, breaking long responses into shorter, more natural
  paragraphs that mimic real human messaging patterns.
- This allowed more context-aware and emotionally appropriate message
  segmentation, for a more engaging user experience.

**Async safety in `VectorStore`:**
- All calls into the synchronous ChromaDB library were wrapped in
  `asyncio.to_thread()`. This prevents blocking the main asyncio event loop,
  which is exactly the kind of potential deadlock source the new detector was
  designed to catch.

This reverts commit f489020a1219acc60b10247a5d0f8c0104c2fd9e.
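For context, the central pattern this revert removes from `vector_store.py` is
wrapping synchronous ChromaDB calls in `asyncio.to_thread()`. Below is a
minimal sketch of that pattern, not code from this repository: `FakeCollection`
and `get_total_count` are illustrative stand-ins, and the real (reverted)
implementation appears in the diff that follows.

```python
import asyncio


class FakeCollection:
    """Illustrative stand-in for a synchronous ChromaDB collection."""

    def count(self) -> int:
        # A real collection would do blocking disk I/O here, which would
        # stall the event loop if called directly from a coroutine.
        return 42


async def get_total_count(collection: FakeCollection) -> int:
    # Run the blocking call in a worker thread so the asyncio event loop
    # stays responsive; this is the pattern the revert removes.
    return await asyncio.to_thread(collection.count)


if __name__ == "__main__":
    print(asyncio.run(get_total_count(FakeCollection())))  # -> 42
```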
---
 .../message_manager/distribution_manager.py | 130 +-----------------
 src/memory_graph/storage/vector_store.py    | 128 ++++++-----------
 2 files changed, 44 insertions(+), 214 deletions(-)

diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py
index 6689d84ac..90c9929d1 100644
--- a/src/chat/message_manager/distribution_manager.py
+++ b/src/chat/message_manager/distribution_manager.py
@@ -55,11 +55,6 @@ class StreamLoopManager:
 
         # 流循环启动锁:防止并发启动同一个流的多个循环任务
         self._stream_start_locks: dict[str, asyncio.Lock] = {}
-
-        # 死锁检测:记录每个流的最后活动时间
-        self._stream_last_activity: dict[str, float] = {}
-        self._deadlock_detector_task: asyncio.Task | None = None
-        self._deadlock_threshold_seconds: float = 120.0  # 2分钟无活动视为可能死锁
 
         logger.info(f"流循环管理器初始化完成 (最大并发流数: {self.max_concurrent_streams})")
 
@@ -70,60 +65,6 @@ class StreamLoopManager:
             return
 
         self.is_running = True
-
-        # 启动死锁检测器
-        self._deadlock_detector_task = asyncio.create_task(
-            self._deadlock_detector_loop(),
-            name="deadlock_detector"
-        )
-        logger.info("死锁检测器已启动")
-
-    async def _deadlock_detector_loop(self) -> None:
-        """死锁检测循环 - 定期检查所有流的活动状态"""
-        while self.is_running:
-            try:
-                await asyncio.sleep(30.0)  # 每30秒检查一次
-
-                current_time = time.time()
-                suspected_deadlocks = []
-
-                # 检查所有活跃流的最后活动时间
-                for stream_id, last_activity in list(self._stream_last_activity.items()):
-                    inactive_seconds = current_time - last_activity
-                    if inactive_seconds > self._deadlock_threshold_seconds:
-                        suspected_deadlocks.append((stream_id, inactive_seconds))
-
-                if suspected_deadlocks:
-                    logger.warning(
-                        f"🔴 [死锁检测] 发现 {len(suspected_deadlocks)} 个可能卡住的流:\n" +
-                        "\n".join([
-                            f"  - stream={sid[:8]}, 无活动时间={inactive:.1f}s"
-                            for sid, inactive in suspected_deadlocks
-                        ])
-                    )
-
-                    # 打印当前所有 asyncio 任务的状态
-                    all_tasks = asyncio.all_tasks()
-                    stream_loop_tasks = [t for t in all_tasks if t.get_name().startswith("stream_loop_")]
-                    logger.warning(
-                        f"🔴 [死锁检测] 当前流循环任务状态:\n" +
-                        "\n".join([
-                            f"  - {t.get_name()}: done={t.done()}, cancelled={t.cancelled()}"
-                            for t in stream_loop_tasks
-                        ])
-                    )
-                else:
-                    # 每5分钟报告一次正常状态
-                    if int(current_time) % 300 < 30:
-                        active_count = len(self._stream_last_activity)
-                        if active_count > 0:
-                            logger.info(f"🟢 [死锁检测] 所有 {active_count} 个流正常运行中")
-
-            except asyncio.CancelledError:
-                logger.info("死锁检测器被取消")
-                break
-            except Exception as e:
-                logger.error(f"死锁检测器出错: {e}")
 
     async def stop(self) -> None:
         """停止流循环管理器"""
@@ -131,15 +72,6 @@ class StreamLoopManager:
             return
 
         self.is_running = False
-
-        # 停止死锁检测器
-        if self._deadlock_detector_task and not self._deadlock_detector_task.done():
-            self._deadlock_detector_task.cancel()
-            try:
-                await self._deadlock_detector_task
-            except asyncio.CancelledError:
-                pass
-            logger.info("死锁检测器已停止")
 
         # 取消所有流循环
         try:
@@ -285,24 +217,11 @@ class StreamLoopManager:
         """
         task_id = id(asyncio.current_task())
         logger.info(f"🔄 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 启动")
-
-        # 死锁检测:记录循环次数和上次活动时间
-        loop_count = 0
-
-        # 注册到活动跟踪
-        self._stream_last_activity[stream_id] = time.time()
 
         try:
             while self.is_running:
-                loop_count += 1
-                loop_start_time = time.time()
-
-                # 更新活动时间(死锁检测用)
-                self._stream_last_activity[stream_id] = loop_start_time
-
                 try:
                     # 1. 获取流上下文
-                    logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, 获取上下文...")
                     context = await self._get_stream_context(stream_id)
                     if not context:
                         logger.warning(f"⚠️ [流工作器] stream={stream_id[:8]}, 无法获取流上下文")
@@ -310,7 +229,6 @@
                         continue
 
                     # 2. 检查是否有消息需要处理
-                    logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, 刷新缓存消息...")
                     await self._flush_cached_messages_to_unread(stream_id)
                     unread_count = self._get_unread_count(context)
                     force_dispatch = self._needs_force_dispatch_for_context(context, unread_count)
@@ -342,36 +260,11 @@
                             logger.debug(f"更新流能量失败 {stream_id}: {e}")
 
                    # 4. 激活chatter处理
-                    logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, 开始chatter处理...")
                     try:
-                        # 在长时间处理期间定期更新活动时间,避免死锁检测误报
-                        async def process_with_activity_update():
-                            process_task = asyncio.create_task(
-                                self._process_stream_messages(stream_id, context)
-                            )
-                            activity_update_interval = 30.0  # 每30秒更新一次
-                            while not process_task.done():
-                                try:
-                                    # 等待任务完成或超时
-                                    await asyncio.wait_for(
-                                        asyncio.shield(process_task),
-                                        timeout=activity_update_interval
-                                    )
-                                except asyncio.TimeoutError:
-                                    # 任务仍在运行,更新活动时间
-                                    self._stream_last_activity[stream_id] = time.time()
-                                    logger.debug(f"🔄 [流工作器] stream={stream_id[:8]}, 处理中,更新活动时间")
-                            return await process_task
-
-                        success = await asyncio.wait_for(
-                            process_with_activity_update(),
-                            global_config.chat.thinking_timeout
-                        )
+                        success = await asyncio.wait_for(self._process_stream_messages(stream_id, context), global_config.chat.thinking_timeout)
                     except asyncio.TimeoutError:
                         logger.warning(f"⏱️ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理超时")
                         success = False
-                    logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, chatter处理完成, success={success}")
-
                     # 更新统计
                     self.stats["total_process_cycles"] += 1
                     if success:
@@ -385,7 +278,6 @@
                         logger.debug(f"❌ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理失败")
 
                     # 5. 计算下次检查间隔
-                    logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, 计算间隔...")
                     interval = await self._calculate_interval(stream_id, has_messages)
 
                     # 6. sleep等待下次检查
@@ -394,22 +286,7 @@
                     if last_interval is None or abs(interval - last_interval) > 0.01:
                         logger.info(f"流 {stream_id} 等待周期变化: {interval:.2f}s")
                         self._last_intervals[stream_id] = interval
-
-                    loop_duration = time.time() - loop_start_time
-                    logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count} 完成, 耗时={loop_duration:.2f}s, 即将sleep {interval:.2f}s")
-
-                    # 使用分段sleep,每隔一段时间更新活动时间,避免死锁检测误报
-                    # 当间隔较长时(如等待用户回复),分段更新活动时间
-                    remaining_sleep = interval
-                    activity_update_interval = 30.0  # 每30秒更新一次活动时间
-                    while remaining_sleep > 0:
-                        sleep_chunk = min(remaining_sleep, activity_update_interval)
-                        await asyncio.sleep(sleep_chunk)
-                        remaining_sleep -= sleep_chunk
-                        # 更新活动时间,表明流仍在正常运行(只是在等待)
-                        self._stream_last_activity[stream_id] = time.time()
-
-                    logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count} sleep结束, 开始下一循环")
+                    await asyncio.sleep(interval)
 
                 except asyncio.CancelledError:
                     logger.info(f"🛑 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 被取消")
@@ -431,9 +308,6 @@
 
             # 清理间隔记录
             self._last_intervals.pop(stream_id, None)
-
-            # 清理活动跟踪
-            self._stream_last_activity.pop(stream_id, None)
 
             logger.info(f"🏁 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 循环结束")
 
diff --git a/src/memory_graph/storage/vector_store.py b/src/memory_graph/storage/vector_store.py
index b59ea1b83..ae79e9380 100644
--- a/src/memory_graph/storage/vector_store.py
+++ b/src/memory_graph/storage/vector_store.py
@@ -1,13 +1,9 @@
 """
 向量存储层:基于 ChromaDB 的语义向量存储
-
-注意:ChromaDB 是同步库,所有操作都必须使用 asyncio.to_thread() 包装
-以避免阻塞 asyncio 事件循环导致死锁。
 """
 
 from __future__ import annotations
 
-import asyncio
 from pathlib import Path
 from typing import Any
 
@@ -57,30 +53,22 @@
             import chromadb
             from chromadb.config import Settings
 
-            # 创建持久化客户端 - 同步操作需要在线程中执行
-            def _create_client():
-                return chromadb.PersistentClient(
-                    path=str(self.data_dir / "chroma"),
-                    settings=Settings(
-                        anonymized_telemetry=False,
-                        allow_reset=True,
-                    ),
-                )
-
-            self.client = await asyncio.to_thread(_create_client)
+            # 创建持久化客户端
+            self.client = chromadb.PersistentClient(
+                path=str(self.data_dir / "chroma"),
+                settings=Settings(
+                    anonymized_telemetry=False,
+                    allow_reset=True,
+                ),
+            )
 
-            # 获取或创建集合 - 同步操作需要在线程中执行
-            def _get_or_create_collection():
-                return self.client.get_or_create_collection(
-                    name=self.collection_name,
-                    metadata={"description": "Memory graph node embeddings"},
-                )
-
-            self.collection = await asyncio.to_thread(_get_or_create_collection)
+            # 获取或创建集合
+            self.collection = self.client.get_or_create_collection(
+                name=self.collection_name,
+                metadata={"description": "Memory graph node embeddings"},
+            )
 
-            # count() 也是同步操作
-            count = await asyncio.to_thread(self.collection.count)
-            logger.debug(f"ChromaDB 初始化完成,集合包含 {count} 个节点")
+            logger.debug(f"ChromaDB 初始化完成,集合包含 {self.collection.count()} 个节点")
 
         except Exception as e:
             logger.error(f"初始化 ChromaDB 失败: {e}")
@@ -118,16 +106,12 @@
                 else:
                     metadata[key] = str(value)
 
-            # ChromaDB add() 是同步阻塞操作,必须在线程中执行
-            def _add_node():
-                self.collection.add(
-                    ids=[node.id],
-                    embeddings=[node.embedding.tolist()],
-                    metadatas=[metadata],
-                    documents=[node.content],
-                )
-
-            await asyncio.to_thread(_add_node)
+            self.collection.add(
+                ids=[node.id],
+                embeddings=[node.embedding.tolist()],
+                metadatas=[metadata],
+                documents=[node.content],  # 文本内容用于检索
+            )
 
             logger.debug(f"添加节点到向量存储: {node}")
 
@@ -171,16 +155,12 @@
                     metadata[key] = str(value)
                 metadatas.append(metadata)
 
-            # ChromaDB add() 是同步阻塞操作,必须在线程中执行
-            def _add_batch():
-                self.collection.add(
-                    ids=[n.id for n in valid_nodes],
-                    embeddings=[n.embedding.tolist() for n in valid_nodes],  # type: ignore
-                    metadatas=metadatas,
-                    documents=[n.content for n in valid_nodes],
-                )
-
-            await asyncio.to_thread(_add_batch)
+            self.collection.add(
+                ids=[n.id for n in valid_nodes],
+                embeddings=[n.embedding.tolist() for n in valid_nodes],  # type: ignore
+                metadatas=metadatas,
+                documents=[n.content for n in valid_nodes],
+            )
 
         except Exception as e:
             logger.error(f"批量添加节点失败: {e}")
@@ -214,15 +194,12 @@
             if node_types:
                 where_filter = {"node_type": {"$in": [nt.value for nt in node_types]}}
 
-            # ChromaDB query() 是同步阻塞操作,必须在线程中执行
-            def _query():
-                return self.collection.query(
-                    query_embeddings=[query_embedding.tolist()],
-                    n_results=limit,
-                    where=where_filter,
-                )
-
-            results = await asyncio.to_thread(_query)
+            # 执行查询
+            results = self.collection.query(
+                query_embeddings=[query_embedding.tolist()],
+                n_results=limit,
+                where=where_filter,
+            )
 
             # 解析结果
             import orjson
@@ -383,11 +360,7 @@
             raise RuntimeError("向量存储未初始化")
 
         try:
-            # ChromaDB get() 是同步阻塞操作,必须在线程中执行
-            def _get():
-                return self.collection.get(ids=[node_id], include=["metadatas", "embeddings"])
-
-            result = await asyncio.to_thread(_get)
+            result = self.collection.get(ids=[node_id], include=["metadatas", "embeddings"])
 
             # 修复:直接检查 ids 列表是否非空(避免 numpy 数组的布尔值歧义)
             if result is not None:
@@ -420,11 +393,7 @@
             raise RuntimeError("向量存储未初始化")
 
         try:
-            # ChromaDB delete() 是同步阻塞操作,必须在线程中执行
-            def _delete():
-                self.collection.delete(ids=[node_id])
-
-            await asyncio.to_thread(_delete)
+            self.collection.delete(ids=[node_id])
 
             logger.debug(f"删除节点: {node_id}")
         except Exception as e:
@@ -443,11 +412,7 @@
             raise RuntimeError("向量存储未初始化")
 
         try:
-            # ChromaDB update() 是同步阻塞操作,必须在线程中执行
-            def _update():
-                self.collection.update(ids=[node_id], embeddings=[embedding.tolist()])
-
-            await asyncio.to_thread(_update)
+            self.collection.update(ids=[node_id], embeddings=[embedding.tolist()])
 
             logger.debug(f"更新节点 embedding: {node_id}")
         except Exception as e:
@@ -455,16 +420,10 @@
             raise
 
     def get_total_count(self) -> int:
-        """获取向量存储中的节点总数(同步方法,谨慎在 async 上下文中使用)"""
+        """获取向量存储中的节点总数"""
         if not self.collection:
             return 0
         return self.collection.count()
-
-    async def get_total_count_async(self) -> int:
-        """异步获取向量存储中的节点总数"""
-        if not self.collection:
-            return 0
-        return await asyncio.to_thread(self.collection.count)
 
     async def clear(self) -> None:
         """清空向量存储(危险操作,仅用于测试)"""
@@ -472,15 +431,12 @@
             return
 
         try:
-            # ChromaDB delete_collection 和 get_or_create_collection 都是同步阻塞操作
-            def _clear():
-                self.client.delete_collection(self.collection_name)
-                return self.client.get_or_create_collection(
-                    name=self.collection_name,
-                    metadata={"description": "Memory graph node embeddings"},
-                )
-
-            self.collection = await asyncio.to_thread(_clear)
+            # 删除并重新创建集合
+            self.client.delete_collection(self.collection_name)
+            self.collection = self.client.get_or_create_collection(
+                name=self.collection_name,
+                metadata={"description": "Memory graph node embeddings"},
+            )
 
             logger.warning(f"向量存储已清空: {self.collection_name}")
         except Exception as e: