Revert "feat(core): 实现死锁检测器并改进 LLM 消息拆分 本次提交引入了两个主要增强功能:在 StreamLoopManager 中增加死锁检测机制以提高系统稳定性,以及对 Kokoro Flow Chatter (KFC) 的消息拆分策略进行调整,以生成更自然、更贴近人类的对话。 **StreamLoopManager 中的死锁检测:** - 新的死锁检测器现在会定期运行,监控所有活动消息流。 - 它会跟踪每个消息流的最后活动时间,并标记任何超过两分钟未活动的流为潜在死锁。 - 这种主动监控有助于识别和诊断可能卡住的消息流,防止系统整体冻结。 - 为了避免在长时间等待(例如等待用户回复或长时间 LLM 生成)期间出现误报,消息流循环现在即使在睡眠或处理阶段也会定期更新其活动时间戳。 **KFC 中的消息拆分优化:** - 自动,响应后处理器中的基于规则的消息拆分器已被禁用。- 消息拆分的责任现在完全交由大型语言模型(LLM)处理。- 系统提示已更新,明确指示LLM使用多个 reply 操作,将长响应拆分为更短、更自然的段落,模仿真实的人类消息模式。- 此更改允许进行更加上下文感知和情感适宜的消息分段,从而提供更具吸引力的用户体验。**VectorStore 的异步安全性:**- 所有对同步 ChromaDB 库的调用现在都被封装在 asyncio.to_thread() 中。这可以防止阻塞主 asyncio 事件循环,而这正是新检测器设计用来捕获的潜在死锁来源。"

This reverts commit f489020a12.
This commit is contained in:
tt-P607
2025-12-04 19:26:31 +08:00
parent 12e66a328c
commit 7f0494cbc3
2 changed files with 44 additions and 214 deletions

View File

@@ -56,11 +56,6 @@ class StreamLoopManager:
# 流循环启动锁:防止并发启动同一个流的多个循环任务 # 流循环启动锁:防止并发启动同一个流的多个循环任务
self._stream_start_locks: dict[str, asyncio.Lock] = {} self._stream_start_locks: dict[str, asyncio.Lock] = {}
# 死锁检测:记录每个流的最后活动时间
self._stream_last_activity: dict[str, float] = {}
self._deadlock_detector_task: asyncio.Task | None = None
self._deadlock_threshold_seconds: float = 120.0 # 2分钟无活动视为可能死锁
logger.info(f"流循环管理器初始化完成 (最大并发流数: {self.max_concurrent_streams})") logger.info(f"流循环管理器初始化完成 (最大并发流数: {self.max_concurrent_streams})")
async def start(self) -> None: async def start(self) -> None:
@@ -71,60 +66,6 @@ class StreamLoopManager:
self.is_running = True self.is_running = True
# 启动死锁检测器
self._deadlock_detector_task = asyncio.create_task(
self._deadlock_detector_loop(),
name="deadlock_detector"
)
logger.info("死锁检测器已启动")
async def _deadlock_detector_loop(self) -> None:
"""死锁检测循环 - 定期检查所有流的活动状态"""
while self.is_running:
try:
await asyncio.sleep(30.0) # 每30秒检查一次
current_time = time.time()
suspected_deadlocks = []
# 检查所有活跃流的最后活动时间
for stream_id, last_activity in list(self._stream_last_activity.items()):
inactive_seconds = current_time - last_activity
if inactive_seconds > self._deadlock_threshold_seconds:
suspected_deadlocks.append((stream_id, inactive_seconds))
if suspected_deadlocks:
logger.warning(
f"🔴 [死锁检测] 发现 {len(suspected_deadlocks)} 个可能卡住的流:\n" +
"\n".join([
f" - stream={sid[:8]}, 无活动时间={inactive:.1f}s"
for sid, inactive in suspected_deadlocks
])
)
# 打印当前所有 asyncio 任务的状态
all_tasks = asyncio.all_tasks()
stream_loop_tasks = [t for t in all_tasks if t.get_name().startswith("stream_loop_")]
logger.warning(
f"🔴 [死锁检测] 当前流循环任务状态:\n" +
"\n".join([
f" - {t.get_name()}: done={t.done()}, cancelled={t.cancelled()}"
for t in stream_loop_tasks
])
)
else:
# 每5分钟报告一次正常状态
if int(current_time) % 300 < 30:
active_count = len(self._stream_last_activity)
if active_count > 0:
logger.info(f"🟢 [死锁检测] 所有 {active_count} 个流正常运行中")
except asyncio.CancelledError:
logger.info("死锁检测器被取消")
break
except Exception as e:
logger.error(f"死锁检测器出错: {e}")
async def stop(self) -> None: async def stop(self) -> None:
"""停止流循环管理器""" """停止流循环管理器"""
if not self.is_running: if not self.is_running:
@@ -132,15 +73,6 @@ class StreamLoopManager:
self.is_running = False self.is_running = False
# 停止死锁检测器
if self._deadlock_detector_task and not self._deadlock_detector_task.done():
self._deadlock_detector_task.cancel()
try:
await self._deadlock_detector_task
except asyncio.CancelledError:
pass
logger.info("死锁检测器已停止")
# 取消所有流循环 # 取消所有流循环
try: try:
# 获取所有活跃的流 # 获取所有活跃的流
@@ -286,23 +218,10 @@ class StreamLoopManager:
task_id = id(asyncio.current_task()) task_id = id(asyncio.current_task())
logger.info(f"🔄 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 启动") logger.info(f"🔄 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 启动")
# 死锁检测:记录循环次数和上次活动时间
loop_count = 0
# 注册到活动跟踪
self._stream_last_activity[stream_id] = time.time()
try: try:
while self.is_running: while self.is_running:
loop_count += 1
loop_start_time = time.time()
# 更新活动时间(死锁检测用)
self._stream_last_activity[stream_id] = loop_start_time
try: try:
# 1. 获取流上下文 # 1. 获取流上下文
logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, 获取上下文...")
context = await self._get_stream_context(stream_id) context = await self._get_stream_context(stream_id)
if not context: if not context:
logger.warning(f"⚠️ [流工作器] stream={stream_id[:8]}, 无法获取流上下文") logger.warning(f"⚠️ [流工作器] stream={stream_id[:8]}, 无法获取流上下文")
@@ -310,7 +229,6 @@ class StreamLoopManager:
continue continue
# 2. 检查是否有消息需要处理 # 2. 检查是否有消息需要处理
logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, 刷新缓存消息...")
await self._flush_cached_messages_to_unread(stream_id) await self._flush_cached_messages_to_unread(stream_id)
unread_count = self._get_unread_count(context) unread_count = self._get_unread_count(context)
force_dispatch = self._needs_force_dispatch_for_context(context, unread_count) force_dispatch = self._needs_force_dispatch_for_context(context, unread_count)
@@ -342,36 +260,11 @@ class StreamLoopManager:
logger.debug(f"更新流能量失败 {stream_id}: {e}") logger.debug(f"更新流能量失败 {stream_id}: {e}")
# 4. 激活chatter处理 # 4. 激活chatter处理
logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, 开始chatter处理...")
try: try:
# 在长时间处理期间定期更新活动时间,避免死锁检测误报 success = await asyncio.wait_for(self._process_stream_messages(stream_id, context), global_config.chat.thinking_timeout)
async def process_with_activity_update():
process_task = asyncio.create_task(
self._process_stream_messages(stream_id, context)
)
activity_update_interval = 30.0 # 每30秒更新一次
while not process_task.done():
try:
# 等待任务完成或超时
await asyncio.wait_for(
asyncio.shield(process_task),
timeout=activity_update_interval
)
except asyncio.TimeoutError:
# 任务仍在运行,更新活动时间
self._stream_last_activity[stream_id] = time.time()
logger.debug(f"🔄 [流工作器] stream={stream_id[:8]}, 处理中,更新活动时间")
return await process_task
success = await asyncio.wait_for(
process_with_activity_update(),
global_config.chat.thinking_timeout
)
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.warning(f"⏱️ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理超时") logger.warning(f"⏱️ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理超时")
success = False success = False
logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, chatter处理完成, success={success}")
# 更新统计 # 更新统计
self.stats["total_process_cycles"] += 1 self.stats["total_process_cycles"] += 1
if success: if success:
@@ -385,7 +278,6 @@ class StreamLoopManager:
logger.debug(f"❌ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理失败") logger.debug(f"❌ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理失败")
# 5. 计算下次检查间隔 # 5. 计算下次检查间隔
logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, 计算间隔...")
interval = await self._calculate_interval(stream_id, has_messages) interval = await self._calculate_interval(stream_id, has_messages)
# 6. sleep等待下次检查 # 6. sleep等待下次检查
@@ -394,22 +286,7 @@ class StreamLoopManager:
if last_interval is None or abs(interval - last_interval) > 0.01: if last_interval is None or abs(interval - last_interval) > 0.01:
logger.info(f"{stream_id} 等待周期变化: {interval:.2f}s") logger.info(f"{stream_id} 等待周期变化: {interval:.2f}s")
self._last_intervals[stream_id] = interval self._last_intervals[stream_id] = interval
await asyncio.sleep(interval)
loop_duration = time.time() - loop_start_time
logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count} 完成, 耗时={loop_duration:.2f}s, 即将sleep {interval:.2f}s")
# 使用分段sleep每隔一段时间更新活动时间避免死锁检测误报
# 当间隔较长时(如等待用户回复),分段更新活动时间
remaining_sleep = interval
activity_update_interval = 30.0 # 每30秒更新一次活动时间
while remaining_sleep > 0:
sleep_chunk = min(remaining_sleep, activity_update_interval)
await asyncio.sleep(sleep_chunk)
remaining_sleep -= sleep_chunk
# 更新活动时间,表明流仍在正常运行(只是在等待)
self._stream_last_activity[stream_id] = time.time()
logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count} sleep结束, 开始下一循环")
except asyncio.CancelledError: except asyncio.CancelledError:
logger.info(f"🛑 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 被取消") logger.info(f"🛑 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 被取消")
@@ -432,9 +309,6 @@ class StreamLoopManager:
# 清理间隔记录 # 清理间隔记录
self._last_intervals.pop(stream_id, None) self._last_intervals.pop(stream_id, None)
# 清理活动跟踪
self._stream_last_activity.pop(stream_id, None)
logger.info(f"🏁 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 循环结束") logger.info(f"🏁 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 循环结束")
async def _get_stream_context(self, stream_id: str) -> "StreamContext | None": async def _get_stream_context(self, stream_id: str) -> "StreamContext | None":

View File

@@ -1,13 +1,9 @@
""" """
向量存储层:基于 ChromaDB 的语义向量存储 向量存储层:基于 ChromaDB 的语义向量存储
注意ChromaDB 是同步库,所有操作都必须使用 asyncio.to_thread() 包装
以避免阻塞 asyncio 事件循环导致死锁。
""" """
from __future__ import annotations from __future__ import annotations
import asyncio
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@@ -57,30 +53,22 @@ class VectorStore:
import chromadb import chromadb
from chromadb.config import Settings from chromadb.config import Settings
# 创建持久化客户端 - 同步操作需要在线程中执行 # 创建持久化客户端
def _create_client(): self.client = chromadb.PersistentClient(
return chromadb.PersistentClient( path=str(self.data_dir / "chroma"),
path=str(self.data_dir / "chroma"), settings=Settings(
settings=Settings( anonymized_telemetry=False,
anonymized_telemetry=False, allow_reset=True,
allow_reset=True, ),
), )
)
self.client = await asyncio.to_thread(_create_client) # 获取或创建集合
self.collection = self.client.get_or_create_collection(
name=self.collection_name,
metadata={"description": "Memory graph node embeddings"},
)
# 获取或创建集合 - 同步操作需要在线程中执行 logger.debug(f"ChromaDB 初始化完成,集合包含 {self.collection.count()} 个节点")
def _get_or_create_collection():
return self.client.get_or_create_collection(
name=self.collection_name,
metadata={"description": "Memory graph node embeddings"},
)
self.collection = await asyncio.to_thread(_get_or_create_collection)
# count() 也是同步操作
count = await asyncio.to_thread(self.collection.count)
logger.debug(f"ChromaDB 初始化完成,集合包含 {count} 个节点")
except Exception as e: except Exception as e:
logger.error(f"初始化 ChromaDB 失败: {e}") logger.error(f"初始化 ChromaDB 失败: {e}")
@@ -118,16 +106,12 @@ class VectorStore:
else: else:
metadata[key] = str(value) metadata[key] = str(value)
# ChromaDB add() 是同步阻塞操作,必须在线程中执行 self.collection.add(
def _add_node(): ids=[node.id],
self.collection.add( embeddings=[node.embedding.tolist()],
ids=[node.id], metadatas=[metadata],
embeddings=[node.embedding.tolist()], documents=[node.content], # 文本内容用于检索
metadatas=[metadata], )
documents=[node.content],
)
await asyncio.to_thread(_add_node)
logger.debug(f"添加节点到向量存储: {node}") logger.debug(f"添加节点到向量存储: {node}")
@@ -171,16 +155,12 @@ class VectorStore:
metadata[key] = str(value) metadata[key] = str(value)
metadatas.append(metadata) metadatas.append(metadata)
# ChromaDB add() 是同步阻塞操作,必须在线程中执行 self.collection.add(
def _add_batch(): ids=[n.id for n in valid_nodes],
self.collection.add( embeddings=[n.embedding.tolist() for n in valid_nodes], # type: ignore
ids=[n.id for n in valid_nodes], metadatas=metadatas,
embeddings=[n.embedding.tolist() for n in valid_nodes], # type: ignore documents=[n.content for n in valid_nodes],
metadatas=metadatas, )
documents=[n.content for n in valid_nodes],
)
await asyncio.to_thread(_add_batch)
except Exception as e: except Exception as e:
logger.error(f"批量添加节点失败: {e}") logger.error(f"批量添加节点失败: {e}")
@@ -214,15 +194,12 @@ class VectorStore:
if node_types: if node_types:
where_filter = {"node_type": {"$in": [nt.value for nt in node_types]}} where_filter = {"node_type": {"$in": [nt.value for nt in node_types]}}
# ChromaDB query() 是同步阻塞操作,必须在线程中执行 # 执行查询
def _query(): results = self.collection.query(
return self.collection.query( query_embeddings=[query_embedding.tolist()],
query_embeddings=[query_embedding.tolist()], n_results=limit,
n_results=limit, where=where_filter,
where=where_filter, )
)
results = await asyncio.to_thread(_query)
# 解析结果 # 解析结果
import orjson import orjson
@@ -383,11 +360,7 @@ class VectorStore:
raise RuntimeError("向量存储未初始化") raise RuntimeError("向量存储未初始化")
try: try:
# ChromaDB get() 是同步阻塞操作,必须在线程中执行 result = self.collection.get(ids=[node_id], include=["metadatas", "embeddings"])
def _get():
return self.collection.get(ids=[node_id], include=["metadatas", "embeddings"])
result = await asyncio.to_thread(_get)
# 修复:直接检查 ids 列表是否非空(避免 numpy 数组的布尔值歧义) # 修复:直接检查 ids 列表是否非空(避免 numpy 数组的布尔值歧义)
if result is not None: if result is not None:
@@ -420,11 +393,7 @@ class VectorStore:
raise RuntimeError("向量存储未初始化") raise RuntimeError("向量存储未初始化")
try: try:
# ChromaDB delete() 是同步阻塞操作,必须在线程中执行 self.collection.delete(ids=[node_id])
def _delete():
self.collection.delete(ids=[node_id])
await asyncio.to_thread(_delete)
logger.debug(f"删除节点: {node_id}") logger.debug(f"删除节点: {node_id}")
except Exception as e: except Exception as e:
@@ -443,11 +412,7 @@ class VectorStore:
raise RuntimeError("向量存储未初始化") raise RuntimeError("向量存储未初始化")
try: try:
# ChromaDB update() 是同步阻塞操作,必须在线程中执行 self.collection.update(ids=[node_id], embeddings=[embedding.tolist()])
def _update():
self.collection.update(ids=[node_id], embeddings=[embedding.tolist()])
await asyncio.to_thread(_update)
logger.debug(f"更新节点 embedding: {node_id}") logger.debug(f"更新节点 embedding: {node_id}")
except Exception as e: except Exception as e:
@@ -455,32 +420,23 @@ class VectorStore:
raise raise
def get_total_count(self) -> int: def get_total_count(self) -> int:
"""获取向量存储中的节点总数(同步方法,谨慎在 async 上下文中使用)""" """获取向量存储中的节点总数"""
if not self.collection: if not self.collection:
return 0 return 0
return self.collection.count() return self.collection.count()
async def get_total_count_async(self) -> int:
"""异步获取向量存储中的节点总数"""
if not self.collection:
return 0
return await asyncio.to_thread(self.collection.count)
async def clear(self) -> None: async def clear(self) -> None:
"""清空向量存储(危险操作,仅用于测试)""" """清空向量存储(危险操作,仅用于测试)"""
if not self.collection: if not self.collection:
return return
try: try:
# ChromaDB delete_collection 和 get_or_create_collection 都是同步阻塞操作 # 删除并重新创建集合
def _clear(): self.client.delete_collection(self.collection_name)
self.client.delete_collection(self.collection_name) self.collection = self.client.get_or_create_collection(
return self.client.get_or_create_collection( name=self.collection_name,
name=self.collection_name, metadata={"description": "Memory graph node embeddings"},
metadata={"description": "Memory graph node embeddings"}, )
)
self.collection = await asyncio.to_thread(_clear)
logger.warning(f"向量存储已清空: {self.collection_name}") logger.warning(f"向量存储已清空: {self.collection_name}")
except Exception as e: except Exception as e: