diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py
index 6689d84ac..90c9929d1 100644
--- a/src/chat/message_manager/distribution_manager.py
+++ b/src/chat/message_manager/distribution_manager.py
@@ -55,11 +55,6 @@ class StreamLoopManager:
 
         # Stream start locks: prevent concurrently starting multiple loop tasks for the same stream
         self._stream_start_locks: dict[str, asyncio.Lock] = {}
-
-        # Deadlock detection: record each stream's last activity time
-        self._stream_last_activity: dict[str, float] = {}
-        self._deadlock_detector_task: asyncio.Task | None = None
-        self._deadlock_threshold_seconds: float = 120.0  # 2 minutes without activity counts as a possible deadlock
 
         logger.info(f"Stream loop manager initialized (max concurrent streams: {self.max_concurrent_streams})")
 
@@ -70,60 +65,6 @@ class StreamLoopManager:
             return
 
         self.is_running = True
-
-        # Start the deadlock detector
-        self._deadlock_detector_task = asyncio.create_task(
-            self._deadlock_detector_loop(),
-            name="deadlock_detector"
-        )
-        logger.info("Deadlock detector started")
-
-    async def _deadlock_detector_loop(self) -> None:
-        """Deadlock detection loop - periodically checks the activity status of all streams"""
-        while self.is_running:
-            try:
-                await asyncio.sleep(30.0)  # check every 30 seconds
-
-                current_time = time.time()
-                suspected_deadlocks = []
-
-                # Check the last activity time of every active stream
-                for stream_id, last_activity in list(self._stream_last_activity.items()):
-                    inactive_seconds = current_time - last_activity
-                    if inactive_seconds > self._deadlock_threshold_seconds:
-                        suspected_deadlocks.append((stream_id, inactive_seconds))
-
-                if suspected_deadlocks:
-                    logger.warning(
-                        f"🔴 [deadlock detector] Found {len(suspected_deadlocks)} possibly stuck streams:\n" +
-                        "\n".join([
-                            f"  - stream={sid[:8]}, inactive for {inactive:.1f}s"
-                            for sid, inactive in suspected_deadlocks
-                        ])
-                    )
-
-                    # Log the status of all current asyncio tasks
-                    all_tasks = asyncio.all_tasks()
-                    stream_loop_tasks = [t for t in all_tasks if t.get_name().startswith("stream_loop_")]
-                    logger.warning(
-                        f"🔴 [deadlock detector] Current stream loop task status:\n" +
-                        "\n".join([
-                            f"  - {t.get_name()}: done={t.done()}, cancelled={t.cancelled()}"
-                            for t in stream_loop_tasks
-                        ])
-                    )
-                else:
-                    # Report healthy status every 5 minutes
-                    if int(current_time) % 300 < 30:
-                        active_count = len(self._stream_last_activity)
-                        if active_count > 0:
-                            logger.info(f"🟢 [deadlock detector] All {active_count} streams running normally")
-
-            except asyncio.CancelledError:
-                logger.info("Deadlock detector cancelled")
-                break
-            except Exception as e:
-                logger.error(f"Deadlock detector error: {e}")
 
     async def stop(self) -> None:
         """Stop the stream loop manager"""
@@ -131,15 +72,6 @@ class StreamLoopManager:
             return
 
         self.is_running = False
-
-        # Stop the deadlock detector
-        if self._deadlock_detector_task and not self._deadlock_detector_task.done():
-            self._deadlock_detector_task.cancel()
-            try:
-                await self._deadlock_detector_task
-            except asyncio.CancelledError:
-                pass
-            logger.info("Deadlock detector stopped")
 
         # Cancel all stream loops
         try:
@@ -285,24 +217,11 @@
         """
        task_id = id(asyncio.current_task())
         logger.info(f"🔄 [stream worker] stream={stream_id[:8]}, task_id={task_id}, started")
-
-        # Deadlock detection: track the loop count and last activity time
-        loop_count = 0
-
-        # Register with activity tracking
-        self._stream_last_activity[stream_id] = time.time()
 
         try:
             while self.is_running:
-                loop_count += 1
-                loop_start_time = time.time()
-
-                # Update the activity time (for deadlock detection)
-                self._stream_last_activity[stream_id] = loop_start_time
-
                 try:
                     # 1. Get the stream context
-                    logger.debug(f"🔍 [stream worker] stream={stream_id[:8]}, loop #{loop_count}, fetching context...")
                     context = await self._get_stream_context(stream_id)
                     if not context:
                         logger.warning(f"⚠️ [stream worker] stream={stream_id[:8]}, could not get stream context")
@@ -310,7 +229,6 @@
                         continue
 
                     # 2. Check whether there are messages to process
-                    logger.debug(f"🔍 [stream worker] stream={stream_id[:8]}, loop #{loop_count}, flushing cached messages...")
                     await self._flush_cached_messages_to_unread(stream_id)
                     unread_count = self._get_unread_count(context)
                     force_dispatch = self._needs_force_dispatch_for_context(context, unread_count)
@@ -342,36 +260,11 @@
                            logger.debug(f"Failed to update stream energy {stream_id}: {e}")
 
                     # 4. Activate chatter processing
-                    logger.debug(f"🔍 [stream worker] stream={stream_id[:8]}, loop #{loop_count}, starting chatter processing...")
                     try:
-                        # Periodically refresh the activity time during long processing to avoid deadlock-detector false positives
-                        async def process_with_activity_update():
-                            process_task = asyncio.create_task(
-                                self._process_stream_messages(stream_id, context)
-                            )
-                            activity_update_interval = 30.0  # refresh every 30 seconds
-                            while not process_task.done():
-                                try:
-                                    # Wait for the task to finish, or time out
-                                    await asyncio.wait_for(
-                                        asyncio.shield(process_task),
-                                        timeout=activity_update_interval
-                                    )
-                                except asyncio.TimeoutError:
-                                    # Task is still running; refresh the activity time
-                                    self._stream_last_activity[stream_id] = time.time()
-                                    logger.debug(f"🔄 [stream worker] stream={stream_id[:8]}, processing, refreshing activity time")
-                            return await process_task
-
-                        success = await asyncio.wait_for(
-                            process_with_activity_update(),
-                            global_config.chat.thinking_timeout
-                        )
+                        success = await asyncio.wait_for(self._process_stream_messages(stream_id, context), global_config.chat.thinking_timeout)
                     except asyncio.TimeoutError:
                         logger.warning(f"⏱️ [stream worker] stream={stream_id[:8]}, task_id={task_id}, processing timed out")
                         success = False
-                    logger.debug(f"🔍 [stream worker] stream={stream_id[:8]}, loop #{loop_count}, chatter processing finished, success={success}")
-
                     # Update statistics
                     self.stats["total_process_cycles"] += 1
                     if success:
@@ -385,7 +278,6 @@
                        logger.debug(f"❌ [stream worker] stream={stream_id[:8]}, task_id={task_id}, processing failed")
 
                     # 5. Compute the next check interval
-                    logger.debug(f"🔍 [stream worker] stream={stream_id[:8]}, loop #{loop_count}, computing interval...")
                     interval = await self._calculate_interval(stream_id, has_messages)
 
                     # 6. Sleep until the next check
@@ -394,22 +286,7 @@
                     if last_interval is None or abs(interval - last_interval) > 0.01:
                         logger.info(f"Stream {stream_id} wait interval changed: {interval:.2f}s")
                     self._last_intervals[stream_id] = interval
-
-                    loop_duration = time.time() - loop_start_time
-                    logger.debug(f"🔍 [stream worker] stream={stream_id[:8]}, loop #{loop_count} finished, took {loop_duration:.2f}s, about to sleep {interval:.2f}s")
-
-                    # Sleep in chunks, refreshing the activity time periodically to avoid deadlock-detector false positives
-                    # When the interval is long (e.g. waiting for a user reply), refresh the activity time piecewise
-                    remaining_sleep = interval
-                    activity_update_interval = 30.0  # refresh the activity time every 30 seconds
-                    while remaining_sleep > 0:
-                        sleep_chunk = min(remaining_sleep, activity_update_interval)
-                        await asyncio.sleep(sleep_chunk)
-                        remaining_sleep -= sleep_chunk
-                        # Refresh the activity time to show the stream is still running normally (it is just waiting)
-                        self._stream_last_activity[stream_id] = time.time()
-
-                    logger.debug(f"🔍 [stream worker] stream={stream_id[:8]}, loop #{loop_count} sleep finished, starting next loop")
+                    await asyncio.sleep(interval)
 
                 except asyncio.CancelledError:
                     logger.info(f"🛑 [stream worker] stream={stream_id[:8]}, task_id={task_id}, cancelled")
@@ -431,9 +308,6 @@
 
             # Clean up interval records
             self._last_intervals.pop(stream_id, None)
-
-            # Clean up activity tracking
-            self._stream_last_activity.pop(stream_id, None)
 
             logger.info(f"🏁 [stream worker] stream={stream_id[:8]}, task_id={task_id}, loop finished")
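For reference, the watchdog this diff removes relied on a common asyncio heartbeat pattern: poll a shielded task with a short timeout so the outer loop can record liveness without cancelling the work. A minimal standalone sketch of that pattern, where `do_work` and the 2-second heartbeat interval are illustrative stand-ins rather than code from this repository:

```python
import asyncio
import time

async def do_work() -> str:
    """Hypothetical long-running job, standing in for _process_stream_messages()."""
    await asyncio.sleep(5)
    return "done"

async def run_with_heartbeat() -> str:
    # shield() protects the inner task from being cancelled when wait_for()
    # times out, so each TimeoutError just means "still running": the loop
    # records a heartbeat and keeps waiting instead of killing the work.
    task = asyncio.create_task(do_work())
    last_activity = time.time()
    while not task.done():
        try:
            await asyncio.wait_for(asyncio.shield(task), timeout=2.0)
        except asyncio.TimeoutError:
            last_activity = time.time()  # heartbeat: task alive, just slow
    print(f"last heartbeat recorded at {last_activity:.0f}")
    return await task  # propagate the result (or exception)

if __name__ == "__main__":
    print(asyncio.run(run_with_heartbeat()))
```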
diff --git a/src/memory_graph/storage/vector_store.py b/src/memory_graph/storage/vector_store.py
index b59ea1b83..ae79e9380 100644
--- a/src/memory_graph/storage/vector_store.py
+++ b/src/memory_graph/storage/vector_store.py
@@ -1,13 +1,9 @@
 """
 Vector storage layer: semantic vector storage backed by ChromaDB
-
-Note: ChromaDB is a synchronous library; every operation must be wrapped in
-asyncio.to_thread() to avoid blocking the asyncio event loop and deadlocking.
 """
 
 from __future__ import annotations
 
-import asyncio
 from pathlib import Path
 from typing import Any
 
@@ -57,30 +53,22 @@ class VectorStore:
             import chromadb
             from chromadb.config import Settings
 
-            # Create the persistent client - sync operation, must run in a thread
-            def _create_client():
-                return chromadb.PersistentClient(
-                    path=str(self.data_dir / "chroma"),
-                    settings=Settings(
-                        anonymized_telemetry=False,
-                        allow_reset=True,
-                    ),
-                )
-
-            self.client = await asyncio.to_thread(_create_client)
+            # Create the persistent client
+            self.client = chromadb.PersistentClient(
+                path=str(self.data_dir / "chroma"),
+                settings=Settings(
+                    anonymized_telemetry=False,
+                    allow_reset=True,
+                ),
+            )
 
-            # Get or create the collection - sync operation, must run in a thread
-            def _get_or_create_collection():
-                return self.client.get_or_create_collection(
-                    name=self.collection_name,
-                    metadata={"description": "Memory graph node embeddings"},
-                )
-
-            self.collection = await asyncio.to_thread(_get_or_create_collection)
+            # Get or create the collection
+            self.collection = self.client.get_or_create_collection(
+                name=self.collection_name,
+                metadata={"description": "Memory graph node embeddings"},
+            )
 
-            # count() is also a sync operation
-            count = await asyncio.to_thread(self.collection.count)
-            logger.debug(f"ChromaDB initialized; collection contains {count} nodes")
+            logger.debug(f"ChromaDB initialized; collection contains {self.collection.count()} nodes")
 
         except Exception as e:
             logger.error(f"Failed to initialize ChromaDB: {e}")
@@ -118,16 +106,12 @@ class VectorStore:
                 else:
                     metadata[key] = str(value)
 
-            # ChromaDB add() is a blocking sync operation and must run in a thread
-            def _add_node():
-                self.collection.add(
-                    ids=[node.id],
-                    embeddings=[node.embedding.tolist()],
-                    metadatas=[metadata],
-                    documents=[node.content],
-                )
-
-            await asyncio.to_thread(_add_node)
+            self.collection.add(
+                ids=[node.id],
+                embeddings=[node.embedding.tolist()],
+                metadatas=[metadata],
+                documents=[node.content],  # text content used for retrieval
+            )
 
             logger.debug(f"Added node to vector store: {node}")
 
@@ -171,16 +155,12 @@ class VectorStore:
                         metadata[key] = str(value)
                 metadatas.append(metadata)
 
-            # ChromaDB add() is a blocking sync operation and must run in a thread
-            def _add_batch():
-                self.collection.add(
-                    ids=[n.id for n in valid_nodes],
-                    embeddings=[n.embedding.tolist() for n in valid_nodes],  # type: ignore
-                    metadatas=metadatas,
-                    documents=[n.content for n in valid_nodes],
-                )
-
-            await asyncio.to_thread(_add_batch)
+            self.collection.add(
+                ids=[n.id for n in valid_nodes],
+                embeddings=[n.embedding.tolist() for n in valid_nodes],  # type: ignore
+                metadatas=metadatas,
+                documents=[n.content for n in valid_nodes],
+            )
 
         except Exception as e:
             logger.error(f"Batch add of nodes failed: {e}")
@@ -214,15 +194,12 @@ class VectorStore:
             if node_types:
                 where_filter = {"node_type": {"$in": [nt.value for nt in node_types]}}
 
-            # ChromaDB query() is a blocking sync operation and must run in a thread
-            def _query():
-                return self.collection.query(
-                    query_embeddings=[query_embedding.tolist()],
-                    n_results=limit,
-                    where=where_filter,
-                )
-
-            results = await asyncio.to_thread(_query)
+            # Run the query
+            results = self.collection.query(
+                query_embeddings=[query_embedding.tolist()],
+                n_results=limit,
+                where=where_filter,
+            )
 
             # Parse the results
             import orjson
@@ -383,11 +360,7 @@ class VectorStore:
             raise RuntimeError("Vector store not initialized")
 
         try:
-            # ChromaDB get() is a blocking sync operation and must run in a thread
-            def _get():
-                return self.collection.get(ids=[node_id], include=["metadatas", "embeddings"])
-
-            result = await asyncio.to_thread(_get)
+            result = self.collection.get(ids=[node_id], include=["metadatas", "embeddings"])
 
             # Fix: check directly that the ids list is non-empty (avoids the ambiguous truth value of numpy arrays)
             if result is not None:
@@ -420,11 +393,7 @@ class VectorStore:
             raise RuntimeError("Vector store not initialized")
 
         try:
-            # ChromaDB delete() is a blocking sync operation and must run in a thread
-            def _delete():
-                self.collection.delete(ids=[node_id])
-
-            await asyncio.to_thread(_delete)
+            self.collection.delete(ids=[node_id])
 
             logger.debug(f"Deleted node: {node_id}")
         except Exception as e:
@@ -443,11 +412,7 @@ class VectorStore:
             raise RuntimeError("Vector store not initialized")
 
         try:
-            # ChromaDB update() is a blocking sync operation and must run in a thread
-            def _update():
-                self.collection.update(ids=[node_id], embeddings=[embedding.tolist()])
-
-            await asyncio.to_thread(_update)
+            self.collection.update(ids=[node_id], embeddings=[embedding.tolist()])
 
             logger.debug(f"Updated node embedding: {node_id}")
         except Exception as e:
@@ -455,16 +420,10 @@
             raise
 
     def get_total_count(self) -> int:
-        """Get the total node count in the vector store (sync method; use with care in async contexts)"""
+        """Get the total node count in the vector store"""
         if not self.collection:
             return 0
         return self.collection.count()
-
-    async def get_total_count_async(self) -> int:
-        """Get the total node count in the vector store asynchronously"""
-        if not self.collection:
-            return 0
-        return await asyncio.to_thread(self.collection.count)
 
     async def clear(self) -> None:
         """Clear the vector store (dangerous; for tests only)"""
@@ -472,15 +431,12 @@
             return
 
         try:
-            # ChromaDB delete_collection and get_or_create_collection are both blocking sync operations
-            def _clear():
-                self.client.delete_collection(self.collection_name)
-                return self.client.get_or_create_collection(
-                    name=self.collection_name,
-                    metadata={"description": "Memory graph node embeddings"},
-                )
-
-            self.collection = await asyncio.to_thread(_clear)
+            # Delete and recreate the collection
+            self.client.delete_collection(self.collection_name)
+            self.collection = self.client.get_or_create_collection(
+                name=self.collection_name,
+                metadata={"description": "Memory graph node embeddings"},
+            )
 
             logger.warning(f"Vector store cleared: {self.collection_name}")
         except Exception as e:
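On the vector-store side, the convention the deleted module docstring described was to push each blocking ChromaDB call off the event loop with `asyncio.to_thread()`. A minimal standalone sketch of that convention, where `blocking_count` is a hypothetical stand-in for a synchronous call such as `collection.count()`:

```python
import asyncio
import time

def blocking_count() -> int:
    """Hypothetical stand-in for a synchronous ChromaDB call like collection.count()."""
    time.sleep(1.0)  # simulated blocking I/O
    return 42

async def ticker() -> None:
    # Runs concurrently with the blocking call to show the loop is not starved.
    for _ in range(4):
        print("event loop still responsive")
        await asyncio.sleep(0.25)

async def main() -> None:
    # to_thread() runs the sync function on a worker thread; awaiting it
    # yields control, so ticker() keeps running in the meantime.
    count, _ = await asyncio.gather(asyncio.to_thread(blocking_count), ticker())
    print(f"count = {count}")

asyncio.run(main())
```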