diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index 674f0ac00..351e61a64 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -52,6 +52,11 @@ class StreamLoopManager: # 流循环启动锁:防止并发启动同一个流的多个循环任务 self._stream_start_locks: dict[str, asyncio.Lock] = {} + + # 死锁检测:记录每个流的最后活动时间 + self._stream_last_activity: dict[str, float] = {} + self._deadlock_detector_task: asyncio.Task | None = None + self._deadlock_threshold_seconds: float = 120.0 # 2分钟无活动视为可能死锁 logger.info(f"流循环管理器初始化完成 (最大并发流数: {self.max_concurrent_streams})") @@ -62,6 +67,60 @@ class StreamLoopManager: return self.is_running = True + + # 启动死锁检测器 + self._deadlock_detector_task = asyncio.create_task( + self._deadlock_detector_loop(), + name="deadlock_detector" + ) + logger.info("死锁检测器已启动") + + async def _deadlock_detector_loop(self) -> None: + """死锁检测循环 - 定期检查所有流的活动状态""" + while self.is_running: + try: + await asyncio.sleep(30.0) # 每30秒检查一次 + + current_time = time.time() + suspected_deadlocks = [] + + # 检查所有活跃流的最后活动时间 + for stream_id, last_activity in list(self._stream_last_activity.items()): + inactive_seconds = current_time - last_activity + if inactive_seconds > self._deadlock_threshold_seconds: + suspected_deadlocks.append((stream_id, inactive_seconds)) + + if suspected_deadlocks: + logger.warning( + f"🔴 [死锁检测] 发现 {len(suspected_deadlocks)} 个可能卡住的流:\n" + + "\n".join([ + f" - stream={sid[:8]}, 无活动时间={inactive:.1f}s" + for sid, inactive in suspected_deadlocks + ]) + ) + + # 打印当前所有 asyncio 任务的状态 + all_tasks = asyncio.all_tasks() + stream_loop_tasks = [t for t in all_tasks if t.get_name().startswith("stream_loop_")] + logger.warning( + f"🔴 [死锁检测] 当前流循环任务状态:\n" + + "\n".join([ + f" - {t.get_name()}: done={t.done()}, cancelled={t.cancelled()}" + for t in stream_loop_tasks + ]) + ) + else: + # 每5分钟报告一次正常状态 + if int(current_time) % 300 < 30: + active_count = len(self._stream_last_activity) + if active_count > 0: + logger.info(f"🟢 [死锁检测] 所有 {active_count} 个流正常运行中") + + except asyncio.CancelledError: + logger.info("死锁检测器被取消") + break + except Exception as e: + logger.error(f"死锁检测器出错: {e}") async def stop(self) -> None: """停止流循环管理器""" @@ -69,6 +128,15 @@ class StreamLoopManager: return self.is_running = False + + # 停止死锁检测器 + if self._deadlock_detector_task and not self._deadlock_detector_task.done(): + self._deadlock_detector_task.cancel() + try: + await self._deadlock_detector_task + except asyncio.CancelledError: + pass + logger.info("死锁检测器已停止") # 取消所有流循环 try: @@ -214,11 +282,24 @@ class StreamLoopManager: """ task_id = id(asyncio.current_task()) logger.info(f"🔄 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 启动") + + # 死锁检测:记录循环次数和上次活动时间 + loop_count = 0 + + # 注册到活动跟踪 + self._stream_last_activity[stream_id] = time.time() try: while self.is_running: + loop_count += 1 + loop_start_time = time.time() + + # 更新活动时间(死锁检测用) + self._stream_last_activity[stream_id] = loop_start_time + try: # 1. 获取流上下文 + logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, 获取上下文...") context = await self._get_stream_context(stream_id) if not context: logger.warning(f"⚠️ [流工作器] stream={stream_id[:8]}, 无法获取流上下文") @@ -226,6 +307,7 @@ class StreamLoopManager: continue # 2. 检查是否有消息需要处理 + logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, 刷新缓存消息...") await self._flush_cached_messages_to_unread(stream_id) unread_count = self._get_unread_count(context) force_dispatch = self._needs_force_dispatch_for_context(context, unread_count) @@ -245,11 +327,36 @@ class StreamLoopManager: logger.debug(f"更新流能量失败 {stream_id}: {e}") # 4. 激活chatter处理 + logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, 开始chatter处理...") try: - success = await asyncio.wait_for(self._process_stream_messages(stream_id, context), global_config.chat.thinking_timeout) + # 在长时间处理期间定期更新活动时间,避免死锁检测误报 + async def process_with_activity_update(): + process_task = asyncio.create_task( + self._process_stream_messages(stream_id, context) + ) + activity_update_interval = 30.0 # 每30秒更新一次 + while not process_task.done(): + try: + # 等待任务完成或超时 + await asyncio.wait_for( + asyncio.shield(process_task), + timeout=activity_update_interval + ) + except asyncio.TimeoutError: + # 任务仍在运行,更新活动时间 + self._stream_last_activity[stream_id] = time.time() + logger.debug(f"🔄 [流工作器] stream={stream_id[:8]}, 处理中,更新活动时间") + return await process_task + + success = await asyncio.wait_for( + process_with_activity_update(), + global_config.chat.thinking_timeout + ) except asyncio.TimeoutError: logger.warning(f"⏱️ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理超时") success = False + logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, chatter处理完成, success={success}") + # 更新统计 self.stats["total_process_cycles"] += 1 if success: @@ -263,6 +370,7 @@ class StreamLoopManager: logger.warning(f"❌ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理失败") # 5. 计算下次检查间隔 + logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, 计算间隔...") interval = await self._calculate_interval(stream_id, has_messages) # 6. sleep等待下次检查 @@ -271,7 +379,22 @@ class StreamLoopManager: if last_interval is None or abs(interval - last_interval) > 0.01: logger.info(f"流 {stream_id} 等待周期变化: {interval:.2f}s") self._last_intervals[stream_id] = interval - await asyncio.sleep(interval) + + loop_duration = time.time() - loop_start_time + logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count} 完成, 耗时={loop_duration:.2f}s, 即将sleep {interval:.2f}s") + + # 使用分段sleep,每隔一段时间更新活动时间,避免死锁检测误报 + # 当间隔较长时(如等待用户回复),分段更新活动时间 + remaining_sleep = interval + activity_update_interval = 30.0 # 每30秒更新一次活动时间 + while remaining_sleep > 0: + sleep_chunk = min(remaining_sleep, activity_update_interval) + await asyncio.sleep(sleep_chunk) + remaining_sleep -= sleep_chunk + # 更新活动时间,表明流仍在正常运行(只是在等待) + self._stream_last_activity[stream_id] = time.time() + + logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count} sleep结束, 开始下一循环") except asyncio.CancelledError: logger.info(f"🛑 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 被取消") @@ -293,6 +416,9 @@ class StreamLoopManager: # 清理间隔记录 self._last_intervals.pop(stream_id, None) + + # 清理活动跟踪 + self._stream_last_activity.pop(stream_id, None) logger.info(f"🏁 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 循环结束") diff --git a/src/memory_graph/storage/vector_store.py b/src/memory_graph/storage/vector_store.py index e61153119..d30f96c80 100644 --- a/src/memory_graph/storage/vector_store.py +++ b/src/memory_graph/storage/vector_store.py @@ -1,9 +1,13 @@ """ 向量存储层:基于 ChromaDB 的语义向量存储 + +注意:ChromaDB 是同步库,所有操作都必须使用 asyncio.to_thread() 包装 +以避免阻塞 asyncio 事件循环导致死锁。 """ from __future__ import annotations +import asyncio from pathlib import Path from typing import Any @@ -53,22 +57,30 @@ class VectorStore: import chromadb from chromadb.config import Settings - # 创建持久化客户端 - self.client = chromadb.PersistentClient( - path=str(self.data_dir / "chroma"), - settings=Settings( - anonymized_telemetry=False, - allow_reset=True, - ), - ) + # 创建持久化客户端 - 同步操作需要在线程中执行 + def _create_client(): + return chromadb.PersistentClient( + path=str(self.data_dir / "chroma"), + settings=Settings( + anonymized_telemetry=False, + allow_reset=True, + ), + ) + + self.client = await asyncio.to_thread(_create_client) - # 获取或创建集合 - self.collection = self.client.get_or_create_collection( - name=self.collection_name, - metadata={"description": "Memory graph node embeddings"}, - ) + # 获取或创建集合 - 同步操作需要在线程中执行 + def _get_or_create_collection(): + return self.client.get_or_create_collection( + name=self.collection_name, + metadata={"description": "Memory graph node embeddings"}, + ) + + self.collection = await asyncio.to_thread(_get_or_create_collection) - logger.debug(f"ChromaDB 初始化完成,集合包含 {self.collection.count()} 个节点") + # count() 也是同步操作 + count = await asyncio.to_thread(self.collection.count) + logger.debug(f"ChromaDB 初始化完成,集合包含 {count} 个节点") except Exception as e: logger.error(f"初始化 ChromaDB 失败: {e}") @@ -106,12 +118,16 @@ class VectorStore: else: metadata[key] = str(value) - self.collection.add( - ids=[node.id], - embeddings=[node.embedding.tolist()], - metadatas=[metadata], - documents=[node.content], # 文本内容用于检索 - ) + # ChromaDB add() 是同步阻塞操作,必须在线程中执行 + def _add_node(): + self.collection.add( + ids=[node.id], + embeddings=[node.embedding.tolist()], + metadatas=[metadata], + documents=[node.content], + ) + + await asyncio.to_thread(_add_node) logger.debug(f"添加节点到向量存储: {node}") @@ -155,12 +171,16 @@ class VectorStore: metadata[key] = str(value) metadatas.append(metadata) - self.collection.add( - ids=[n.id for n in valid_nodes], - embeddings=[n.embedding.tolist() for n in valid_nodes], # type: ignore - metadatas=metadatas, - documents=[n.content for n in valid_nodes], - ) + # ChromaDB add() 是同步阻塞操作,必须在线程中执行 + def _add_batch(): + self.collection.add( + ids=[n.id for n in valid_nodes], + embeddings=[n.embedding.tolist() for n in valid_nodes], # type: ignore + metadatas=metadatas, + documents=[n.content for n in valid_nodes], + ) + + await asyncio.to_thread(_add_batch) except Exception as e: logger.error(f"批量添加节点失败: {e}") @@ -194,12 +214,15 @@ class VectorStore: if node_types: where_filter = {"node_type": {"$in": [nt.value for nt in node_types]}} - # 执行查询 - results = self.collection.query( - query_embeddings=[query_embedding.tolist()], - n_results=limit, - where=where_filter, - ) + # ChromaDB query() 是同步阻塞操作,必须在线程中执行 + def _query(): + return self.collection.query( + query_embeddings=[query_embedding.tolist()], + n_results=limit, + where=where_filter, + ) + + results = await asyncio.to_thread(_query) # 解析结果 import orjson @@ -360,7 +383,11 @@ class VectorStore: raise RuntimeError("向量存储未初始化") try: - result = self.collection.get(ids=[node_id], include=["metadatas", "embeddings"]) + # ChromaDB get() 是同步阻塞操作,必须在线程中执行 + def _get(): + return self.collection.get(ids=[node_id], include=["metadatas", "embeddings"]) + + result = await asyncio.to_thread(_get) # 修复:直接检查 ids 列表是否非空(避免 numpy 数组的布尔值歧义) if result is not None: @@ -392,7 +419,11 @@ class VectorStore: raise RuntimeError("向量存储未初始化") try: - self.collection.delete(ids=[node_id]) + # ChromaDB delete() 是同步阻塞操作,必须在线程中执行 + def _delete(): + self.collection.delete(ids=[node_id]) + + await asyncio.to_thread(_delete) logger.debug(f"删除节点: {node_id}") except Exception as e: @@ -411,7 +442,11 @@ class VectorStore: raise RuntimeError("向量存储未初始化") try: - self.collection.update(ids=[node_id], embeddings=[embedding.tolist()]) + # ChromaDB update() 是同步阻塞操作,必须在线程中执行 + def _update(): + self.collection.update(ids=[node_id], embeddings=[embedding.tolist()]) + + await asyncio.to_thread(_update) logger.debug(f"更新节点 embedding: {node_id}") except Exception as e: @@ -419,10 +454,16 @@ class VectorStore: raise def get_total_count(self) -> int: - """获取向量存储中的节点总数""" + """获取向量存储中的节点总数(同步方法,谨慎在 async 上下文中使用)""" if not self.collection: return 0 return self.collection.count() + + async def get_total_count_async(self) -> int: + """异步获取向量存储中的节点总数""" + if not self.collection: + return 0 + return await asyncio.to_thread(self.collection.count) async def clear(self) -> None: """清空向量存储(危险操作,仅用于测试)""" @@ -430,12 +471,15 @@ class VectorStore: return try: - # 删除并重新创建集合 - self.client.delete_collection(self.collection_name) - self.collection = self.client.get_or_create_collection( - name=self.collection_name, - metadata={"description": "Memory graph node embeddings"}, - ) + # ChromaDB delete_collection 和 get_or_create_collection 都是同步阻塞操作 + def _clear(): + self.client.delete_collection(self.collection_name) + return self.client.get_or_create_collection( + name=self.collection_name, + metadata={"description": "Memory graph node embeddings"}, + ) + + self.collection = await asyncio.to_thread(_clear) logger.warning(f"向量存储已清空: {self.collection_name}") except Exception as e: diff --git a/src/plugins/built_in/kokoro_flow_chatter/prompt_modules.py b/src/plugins/built_in/kokoro_flow_chatter/prompt_modules.py index 2b3e6ac84..0ccc4a661 100644 --- a/src/plugins/built_in/kokoro_flow_chatter/prompt_modules.py +++ b/src/plugins/built_in/kokoro_flow_chatter/prompt_modules.py @@ -233,7 +233,15 @@ def _format_available_actions(available_actions: dict[str, ActionInfo]) -> str: def _get_default_actions_block() -> str: """获取默认的内置动作描述块""" return """### `reply` - 发消息 -发送文字回复 +发送文字回复。 + +**自然分段技巧**:像真人发微信一样,把长回复拆成几条短消息: +- 在语气词后分段:"嗯~"、"好呀"、"哈哈"、"嗯..."、"唔..." +- 在情绪转折处分段:话题切换、语气变化的地方 +- 在自然停顿处分段:问句后、感叹后、一个完整意思表达完后 +- 每条消息保持简短,1-2句话最自然 +- 用多个 reply 动作,每条就是一条消息 + ```json {"type": "reply", "content": "你要说的话"} ``` @@ -291,8 +299,9 @@ def build_output_module( "expected_user_reaction": "你觉得对方会怎么回应", "max_wait_seconds": 等待秒数(60-900),不想等就填0, "actions": [ - {"type": "reply", "content": "你要发送的消息"}, - {"type": "其他动作", ...} + {"type": "reply", "content": "第一条消息"}, + {"type": "reply", "content": "第二条消息"}, + ... ] } ``` @@ -301,7 +310,12 @@ def build_output_module( - `thought`:你脑子里在想什么,越自然越好 - `actions`:你要做的事,可以组合多个动作 - `max_wait_seconds`:设定一个时间,对方没回的话你会再想想要不要说点什么 -- 即使什么都不想做,也放一个 `{"type": "do_nothing"}`""" +- 即使什么都不想做,也放一个 `{"type": "do_nothing"}` + +💡 **回复技巧**: +- 像发微信一样,把想说的话拆成几条短消息 +- 用多个 `reply` 动作,每个就是一条独立的消息 +- 这样更自然,真人聊天也是分段发的""" parts = ["## 6. 你的表达方式"] diff --git a/src/plugins/built_in/kokoro_flow_chatter/response_post_processor.py b/src/plugins/built_in/kokoro_flow_chatter/response_post_processor.py index e463d696b..496337d4f 100644 --- a/src/plugins/built_in/kokoro_flow_chatter/response_post_processor.py +++ b/src/plugins/built_in/kokoro_flow_chatter/response_post_processor.py @@ -141,29 +141,13 @@ async def process_reply_content(content: str) -> list[str]: # 失败时使用原内容 processed_content = content - # Step 2: 消息分割 - splitter_cfg = global_config.response_splitter - if splitter_cfg.enable: - split_mode = splitter_cfg.split_mode - max_length = splitter_cfg.max_length - max_sentences = splitter_cfg.max_sentence_num - - if split_mode == "punctuation": - # 基于标点符号分割 - result = split_by_punctuation( - processed_content, - max_length=max_length, - max_sentences=max_sentences, - ) - logger.info(f"[KFC PostProcessor] 标点分割完成,分为 {len(result)} 条消息") - return result - elif split_mode == "llm": - # LLM模式:目前暂不支持,回退到不分割 - logger.info("[KFC PostProcessor] LLM分割模式暂不支持,返回完整内容") - return [processed_content] - else: - logger.warning(f"[KFC PostProcessor] 未知分割模式: {split_mode}") - return [processed_content] - else: - # 分割器禁用,返回完整内容 - return [processed_content] + # Step 2: 消息分割 - 已禁用 + # KFC 的 LLM 会自己通过多个 reply 动作来分割消息, + # 后处理器不再进行二次分割,避免破坏 LLM 的自然分割决策。 + # + # 参考提示词中的指导: + # - LLM 被引导在合适的语气词、标点处自然分段 + # - 每个分段作为独立的 reply 动作发送 + # - 这样更符合真人发微信的习惯 + logger.debug("[KFC PostProcessor] 消息分割已禁用(由LLM自行通过多个reply分割)") + return [processed_content]