feat(core): implement deadlock detector and improve LLM message splitting

This commit introduces two main enhancements: a deadlock detection mechanism in StreamLoopManager to improve system stability, and an adjusted message-splitting strategy for Kokoro Flow Chatter (KFC) to produce more natural, human-like conversation.

**Deadlock detection in StreamLoopManager:**

- A new deadlock detector now runs periodically and monitors all active message streams.
- It tracks each stream's last activity time and flags any stream that has been inactive for more than two minutes as a potential deadlock.
- This proactive monitoring helps identify and diagnose streams that may be stuck, preventing system-wide freezes.
- To avoid false positives during long waits (for example, waiting for a user reply or a long LLM generation), stream loops now refresh their activity timestamp periodically even while sleeping or processing.

**Message-splitting optimization in KFC:**

- The rule-based message splitter in the response post-processor has been disabled.
- Responsibility for message splitting now rests entirely with the large language model (LLM).
- The system prompt has been updated to explicitly instruct the LLM to use multiple reply actions, splitting long responses into shorter, more natural segments that mimic real human messaging patterns.
- This change allows more context-aware and emotionally appropriate message segmentation, for a more engaging user experience.

**Async safety in VectorStore:**

- All calls into the synchronous ChromaDB library are now wrapped in asyncio.to_thread(). This prevents blocking the main asyncio event loop, which is exactly the kind of potential deadlock the new detector is designed to catch.
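At its core, the detector is a simple heartbeat-plus-watchdog pattern: workers write a timestamp into a shared registry, and a background task flags entries whose heartbeat has gone stale. The following is a minimal, self-contained sketch of that pattern, with simplified, illustrative names rather than the actual StreamLoopManager API:

```python
import asyncio
import time

last_activity: dict[str, float] = {}  # stream_id -> last heartbeat timestamp
THRESHOLD = 120.0   # 2 minutes of silence counts as a suspected deadlock
CHECK_EVERY = 30.0  # detector check period and heartbeat refresh period

async def detector() -> None:
    """Periodically flag streams whose heartbeat is older than the threshold."""
    while True:
        await asyncio.sleep(CHECK_EVERY)
        now = time.time()
        for stream_id, ts in list(last_activity.items()):
            if now - ts > THRESHOLD:
                print(f"suspected deadlock: {stream_id} inactive for {now - ts:.0f}s")

async def worker(stream_id: str, interval: float) -> None:
    """One stream loop: heartbeat at each iteration, and chunk long sleeps."""
    while True:
        last_activity[stream_id] = time.time()  # heartbeat at loop start
        # ... do one unit of work ...
        # Long idle waits are sliced into chunks so the heartbeat stays fresh
        # while the stream is merely waiting, not stuck:
        remaining = interval
        while remaining > 0:
            chunk = min(remaining, CHECK_EVERY)
            await asyncio.sleep(chunk)
            remaining -= chunk
            last_activity[stream_id] = time.time()

async def main() -> None:
    # Run one worker alongside the detector.
    await asyncio.gather(worker("stream-1", interval=300.0), detector())
```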
@@ -52,6 +52,11 @@ class StreamLoopManager:
 
         # Stream-loop start locks: prevent concurrently starting multiple loop tasks for the same stream
         self._stream_start_locks: dict[str, asyncio.Lock] = {}
 
+        # Deadlock detection: record each stream's last activity time
+        self._stream_last_activity: dict[str, float] = {}
+        self._deadlock_detector_task: asyncio.Task | None = None
+        self._deadlock_threshold_seconds: float = 120.0  # 2 minutes without activity counts as a possible deadlock
+
         logger.info(f"Stream loop manager initialized (max concurrent streams: {self.max_concurrent_streams})")
 
@@ -62,6 +67,60 @@ class StreamLoopManager:
             return
 
         self.is_running = True
 
+        # Start the deadlock detector
+        self._deadlock_detector_task = asyncio.create_task(
+            self._deadlock_detector_loop(),
+            name="deadlock_detector"
+        )
+        logger.info("Deadlock detector started")
+
+    async def _deadlock_detector_loop(self) -> None:
+        """Deadlock detection loop: periodically checks the activity state of all streams"""
+        while self.is_running:
+            try:
+                await asyncio.sleep(30.0)  # Check every 30 seconds
+
+                current_time = time.time()
+                suspected_deadlocks = []
+
+                # Check the last activity time of every active stream
+                for stream_id, last_activity in list(self._stream_last_activity.items()):
+                    inactive_seconds = current_time - last_activity
+                    if inactive_seconds > self._deadlock_threshold_seconds:
+                        suspected_deadlocks.append((stream_id, inactive_seconds))
+
+                if suspected_deadlocks:
+                    logger.warning(
+                        f"🔴 [deadlock detector] found {len(suspected_deadlocks)} possibly stuck streams:\n" +
+                        "\n".join([
+                            f"  - stream={sid[:8]}, inactive for {inactive:.1f}s"
+                            for sid, inactive in suspected_deadlocks
+                        ])
+                    )
+
+                    # Dump the state of all current asyncio tasks
+                    all_tasks = asyncio.all_tasks()
+                    stream_loop_tasks = [t for t in all_tasks if t.get_name().startswith("stream_loop_")]
+                    logger.warning(
+                        f"🔴 [deadlock detector] current stream-loop task states:\n" +
+                        "\n".join([
+                            f"  - {t.get_name()}: done={t.done()}, cancelled={t.cancelled()}"
+                            for t in stream_loop_tasks
+                        ])
+                    )
+                else:
+                    # Report healthy status once every 5 minutes
+                    if int(current_time) % 300 < 30:
+                        active_count = len(self._stream_last_activity)
+                        if active_count > 0:
+                            logger.info(f"🟢 [deadlock detector] all {active_count} streams running normally")
+
+            except asyncio.CancelledError:
+                logger.info("Deadlock detector cancelled")
+                break
+            except Exception as e:
+                logger.error(f"Deadlock detector error: {e}")
+
     async def stop(self) -> None:
         """Stop the stream loop manager"""
@@ -69,6 +128,15 @@ class StreamLoopManager:
             return
 
         self.is_running = False
 
+        # Stop the deadlock detector
+        if self._deadlock_detector_task and not self._deadlock_detector_task.done():
+            self._deadlock_detector_task.cancel()
+            try:
+                await self._deadlock_detector_task
+            except asyncio.CancelledError:
+                pass
+            logger.info("Deadlock detector stopped")
+
         # Cancel all stream loops
         try:
@@ -214,11 +282,24 @@ class StreamLoopManager:
         """
         task_id = id(asyncio.current_task())
         logger.info(f"🔄 [stream worker] stream={stream_id[:8]}, task_id={task_id}, started")
 
+        # Deadlock detection: track the loop count and last activity time
+        loop_count = 0
+
+        # Register with activity tracking
+        self._stream_last_activity[stream_id] = time.time()
+
         try:
             while self.is_running:
+                loop_count += 1
+                loop_start_time = time.time()
+
+                # Update the activity time (for deadlock detection)
+                self._stream_last_activity[stream_id] = loop_start_time
+
                 try:
                     # 1. Fetch the stream context
+                    logger.debug(f"🔍 [stream worker] stream={stream_id[:8]}, loop #{loop_count}, fetching context...")
                     context = await self._get_stream_context(stream_id)
                     if not context:
                         logger.warning(f"⚠️ [stream worker] stream={stream_id[:8]}, failed to get stream context")
@@ -226,6 +307,7 @@ class StreamLoopManager:
                         continue
 
                    # 2. Check whether any messages need processing
+                    logger.debug(f"🔍 [stream worker] stream={stream_id[:8]}, loop #{loop_count}, flushing cached messages...")
                    await self._flush_cached_messages_to_unread(stream_id)
                    unread_count = self._get_unread_count(context)
                    force_dispatch = self._needs_force_dispatch_for_context(context, unread_count)
@@ -245,11 +327,36 @@ class StreamLoopManager:
                        logger.debug(f"Failed to update stream energy {stream_id}: {e}")
 
                    # 4. Activate chatter processing
+                    logger.debug(f"🔍 [stream worker] stream={stream_id[:8]}, loop #{loop_count}, starting chatter processing...")
                    try:
-                        success = await asyncio.wait_for(self._process_stream_messages(stream_id, context), global_config.chat.thinking_timeout)
+                        # Periodically refresh the activity time during long processing to avoid deadlock-detector false positives
+                        async def process_with_activity_update():
+                            process_task = asyncio.create_task(
+                                self._process_stream_messages(stream_id, context)
+                            )
+                            activity_update_interval = 30.0  # Refresh every 30 seconds
+                            while not process_task.done():
+                                try:
+                                    # Wait for the task to finish, or time out
+                                    await asyncio.wait_for(
+                                        asyncio.shield(process_task),
+                                        timeout=activity_update_interval
+                                    )
+                                except asyncio.TimeoutError:
+                                    # Task still running; refresh the activity time
+                                    self._stream_last_activity[stream_id] = time.time()
+                                    logger.debug(f"🔄 [stream worker] stream={stream_id[:8]}, still processing, refreshing activity time")
+                            return await process_task
+
+                        success = await asyncio.wait_for(
+                            process_with_activity_update(),
+                            global_config.chat.thinking_timeout
+                        )
                    except asyncio.TimeoutError:
                        logger.warning(f"⏱️ [stream worker] stream={stream_id[:8]}, task_id={task_id}, processing timed out")
                        success = False
+                    logger.debug(f"🔍 [stream worker] stream={stream_id[:8]}, loop #{loop_count}, chatter processing finished, success={success}")
 
                    # Update statistics
                    self.stats["total_process_cycles"] += 1
                    if success:
@@ -263,6 +370,7 @@ class StreamLoopManager:
                        logger.warning(f"❌ [stream worker] stream={stream_id[:8]}, task_id={task_id}, processing failed")
 
                    # 5. Compute the next check interval
+                    logger.debug(f"🔍 [stream worker] stream={stream_id[:8]}, loop #{loop_count}, computing interval...")
                    interval = await self._calculate_interval(stream_id, has_messages)
 
                    # 6. Sleep until the next check
@@ -271,7 +379,22 @@ class StreamLoopManager:
                    if last_interval is None or abs(interval - last_interval) > 0.01:
                        logger.info(f"Stream {stream_id} wait interval changed: {interval:.2f}s")
                    self._last_intervals[stream_id] = interval
-                    await asyncio.sleep(interval)
+
+                    loop_duration = time.time() - loop_start_time
+                    logger.debug(f"🔍 [stream worker] stream={stream_id[:8]}, loop #{loop_count} done, took {loop_duration:.2f}s, about to sleep {interval:.2f}s")
+
+                    # Sleep in chunks, refreshing the activity time at intervals to avoid deadlock-detector false positives.
+                    # When the interval is long (e.g. waiting for a user reply), this keeps the activity time fresh.
+                    remaining_sleep = interval
+                    activity_update_interval = 30.0  # Refresh the activity time every 30 seconds
+                    while remaining_sleep > 0:
+                        sleep_chunk = min(remaining_sleep, activity_update_interval)
+                        await asyncio.sleep(sleep_chunk)
+                        remaining_sleep -= sleep_chunk
+                        # Refresh the activity time to show the stream is still healthy (just waiting)
+                        self._stream_last_activity[stream_id] = time.time()
+
+                    logger.debug(f"🔍 [stream worker] stream={stream_id[:8]}, loop #{loop_count} sleep finished, starting next loop")
 
                except asyncio.CancelledError:
                    logger.info(f"🛑 [stream worker] stream={stream_id[:8]}, task_id={task_id}, cancelled")
@@ -293,6 +416,9 @@ class StreamLoopManager:
 
            # Clean up interval records
            self._last_intervals.pop(stream_id, None)
 
+            # Clean up activity tracking
+            self._stream_last_activity.pop(stream_id, None)
+
            logger.info(f"🏁 [stream worker] stream={stream_id[:8]}, task_id={task_id}, loop finished")
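The VectorStore hunks below apply one pattern uniformly: every synchronous ChromaDB call is moved onto a worker thread via asyncio.to_thread(), so the event loop (and with it the deadlock detector) keeps running. A minimal sketch of that pattern, with illustrative names (`blocking_query` and `query_async` are not from the diff):

```python
import asyncio

def blocking_query(collection, embedding: list[float], limit: int):
    # Synchronous ChromaDB call; run directly, it would stall the event loop.
    return collection.query(query_embeddings=[embedding], n_results=limit)

async def query_async(collection, embedding: list[float], limit: int = 10):
    # to_thread runs the sync call on a worker thread and awaits the result,
    # so other coroutines keep making progress in the meantime.
    return await asyncio.to_thread(blocking_query, collection, embedding, limit)
```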
@@ -1,9 +1,13 @@
 """
 Vector storage layer: semantic vector storage backed by ChromaDB
+
+Note: ChromaDB is a synchronous library; every operation must be wrapped in asyncio.to_thread()
+to avoid blocking the asyncio event loop and causing deadlocks.
 """
 
 from __future__ import annotations
 
+import asyncio
 from pathlib import Path
 from typing import Any
 
@@ -53,22 +57,30 @@ class VectorStore:
            import chromadb
            from chromadb.config import Settings
 
-            # Create the persistent client
-            self.client = chromadb.PersistentClient(
-                path=str(self.data_dir / "chroma"),
-                settings=Settings(
-                    anonymized_telemetry=False,
-                    allow_reset=True,
-                ),
-            )
+            # Create the persistent client: a sync operation that must run in a thread
+            def _create_client():
+                return chromadb.PersistentClient(
+                    path=str(self.data_dir / "chroma"),
+                    settings=Settings(
+                        anonymized_telemetry=False,
+                        allow_reset=True,
+                    ),
+                )
+
+            self.client = await asyncio.to_thread(_create_client)
 
-            # Get or create the collection
-            self.collection = self.client.get_or_create_collection(
-                name=self.collection_name,
-                metadata={"description": "Memory graph node embeddings"},
-            )
+            # Get or create the collection: a sync operation that must run in a thread
+            def _get_or_create_collection():
+                return self.client.get_or_create_collection(
+                    name=self.collection_name,
+                    metadata={"description": "Memory graph node embeddings"},
+                )
+
+            self.collection = await asyncio.to_thread(_get_or_create_collection)
 
-            logger.debug(f"ChromaDB initialized; collection contains {self.collection.count()} nodes")
+            # count() is also a sync operation
+            count = await asyncio.to_thread(self.collection.count)
+            logger.debug(f"ChromaDB initialized; collection contains {count} nodes")
 
        except Exception as e:
            logger.error(f"Failed to initialize ChromaDB: {e}")
@@ -106,12 +118,16 @@ class VectorStore:
                else:
                    metadata[key] = str(value)
 
-            self.collection.add(
-                ids=[node.id],
-                embeddings=[node.embedding.tolist()],
-                metadatas=[metadata],
-                documents=[node.content],  # Text content used for retrieval
-            )
+            # ChromaDB add() is a blocking sync operation and must run in a thread
+            def _add_node():
+                self.collection.add(
+                    ids=[node.id],
+                    embeddings=[node.embedding.tolist()],
+                    metadatas=[metadata],
+                    documents=[node.content],
+                )
+
+            await asyncio.to_thread(_add_node)
 
            logger.debug(f"Added node to vector store: {node}")
 
@@ -155,12 +171,16 @@ class VectorStore:
                    metadata[key] = str(value)
                metadatas.append(metadata)
 
-            self.collection.add(
-                ids=[n.id for n in valid_nodes],
-                embeddings=[n.embedding.tolist() for n in valid_nodes],  # type: ignore
-                metadatas=metadatas,
-                documents=[n.content for n in valid_nodes],
-            )
+            # ChromaDB add() is a blocking sync operation and must run in a thread
+            def _add_batch():
+                self.collection.add(
+                    ids=[n.id for n in valid_nodes],
+                    embeddings=[n.embedding.tolist() for n in valid_nodes],  # type: ignore
+                    metadatas=metadatas,
+                    documents=[n.content for n in valid_nodes],
+                )
+
+            await asyncio.to_thread(_add_batch)
 
        except Exception as e:
            logger.error(f"Batch node add failed: {e}")
@@ -194,12 +214,15 @@ class VectorStore:
            if node_types:
                where_filter = {"node_type": {"$in": [nt.value for nt in node_types]}}
 
-            # Run the query
-            results = self.collection.query(
-                query_embeddings=[query_embedding.tolist()],
-                n_results=limit,
-                where=where_filter,
-            )
+            # ChromaDB query() is a blocking sync operation and must run in a thread
+            def _query():
+                return self.collection.query(
+                    query_embeddings=[query_embedding.tolist()],
+                    n_results=limit,
+                    where=where_filter,
+                )
+
+            results = await asyncio.to_thread(_query)
 
            # Parse the results
            import orjson
@@ -360,7 +383,11 @@ class VectorStore:
            raise RuntimeError("Vector store not initialized")
 
        try:
-            result = self.collection.get(ids=[node_id], include=["metadatas", "embeddings"])
+            # ChromaDB get() is a blocking sync operation and must run in a thread
+            def _get():
+                return self.collection.get(ids=[node_id], include=["metadatas", "embeddings"])
+
+            result = await asyncio.to_thread(_get)
 
            # Fix: check directly that the ids list is non-empty (avoids numpy array boolean ambiguity)
            if result is not None:
@@ -392,7 +419,11 @@ class VectorStore:
            raise RuntimeError("Vector store not initialized")
 
        try:
-            self.collection.delete(ids=[node_id])
+            # ChromaDB delete() is a blocking sync operation and must run in a thread
+            def _delete():
+                self.collection.delete(ids=[node_id])
+
+            await asyncio.to_thread(_delete)
            logger.debug(f"Deleted node: {node_id}")
 
        except Exception as e:
@@ -411,7 +442,11 @@ class VectorStore:
            raise RuntimeError("Vector store not initialized")
 
        try:
-            self.collection.update(ids=[node_id], embeddings=[embedding.tolist()])
+            # ChromaDB update() is a blocking sync operation and must run in a thread
+            def _update():
+                self.collection.update(ids=[node_id], embeddings=[embedding.tolist()])
+
+            await asyncio.to_thread(_update)
            logger.debug(f"Updated node embedding: {node_id}")
 
        except Exception as e:
@@ -419,10 +454,16 @@ class VectorStore:
            raise
 
    def get_total_count(self) -> int:
-        """Get the total number of nodes in the vector store"""
+        """Get the total number of nodes in the vector store (sync method; use with care in async contexts)"""
        if not self.collection:
            return 0
        return self.collection.count()
 
+    async def get_total_count_async(self) -> int:
+        """Asynchronously get the total number of nodes in the vector store"""
+        if not self.collection:
+            return 0
+        return await asyncio.to_thread(self.collection.count)
+
    async def clear(self) -> None:
        """Clear the vector store (dangerous; for tests only)"""
@@ -430,12 +471,15 @@ class VectorStore:
            return
 
        try:
-            # Delete and recreate the collection
-            self.client.delete_collection(self.collection_name)
-            self.collection = self.client.get_or_create_collection(
-                name=self.collection_name,
-                metadata={"description": "Memory graph node embeddings"},
-            )
+            # ChromaDB delete_collection and get_or_create_collection are both blocking sync operations
+            def _clear():
+                self.client.delete_collection(self.collection_name)
+                return self.client.get_or_create_collection(
+                    name=self.collection_name,
+                    metadata={"description": "Memory graph node embeddings"},
+                )
+
+            self.collection = await asyncio.to_thread(_clear)
            logger.warning(f"Vector store cleared: {self.collection_name}")
 
        except Exception as e:
@@ -233,7 +233,15 @@ def _format_available_actions(available_actions: dict[str, ActionInfo]) -> str:
 def _get_default_actions_block() -> str:
    """Get the default built-in action description block"""
    return """### `reply` - send a message
-Send a text reply
+Send a text reply.
+
+**Natural segmentation tips**: like a real person texting, split a long reply into several short messages:
+- Break after interjections: "mm~", "sure", "haha", "hmm...", "uh..."
+- Break at emotional turns: topic switches, shifts in tone
+- Break at natural pauses: after a question, after an exclamation, once a complete thought is expressed
+- Keep each message short; 1-2 sentences feels most natural
+- Use multiple reply actions; each one is a separate message
+
 ```json
 {"type": "reply", "content": "what you want to say"}
 ```
@@ -291,8 +299,9 @@ def build_output_module(
    "expected_user_reaction": "how you think they will respond",
    "max_wait_seconds": seconds to wait (60-900), or 0 if you don't want to wait,
    "actions": [
-        {"type": "reply", "content": "the message you want to send"},
-        {"type": "another action", ...}
+        {"type": "reply", "content": "first message"},
+        {"type": "reply", "content": "second message"},
+        ...
    ]
 }
 ```
@@ -301,7 +310,12 @@ def build_output_module(
 - `thought`: what's going through your head, the more natural the better
 - `actions`: what you want to do; you can combine multiple actions
 - `max_wait_seconds`: set a time; if they haven't replied by then you'll reconsider whether to say something
-- Even if you don't want to do anything, include a `{"type": "do_nothing"}`"""
+- Even if you don't want to do anything, include a `{"type": "do_nothing"}`
+
+💡 **Reply tips**:
+- Like texting, split what you want to say into several short messages
+- Use multiple `reply` actions; each one is a separate, standalone message
+- It reads more naturally; real people chat in segments too"""
 
    parts = ["## 6. How you express yourself"]
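For context, the updated prompt steers the model toward output like the following: several short reply actions instead of one long message. The field names follow the JSON schema shown in the prompt above; the content values here are invented purely for illustration:

```python
# Illustrative example of the output shape the updated prompt encourages.
example_output = {
    "thought": "She sounds excited about the trip, I should match that energy",
    "expected_user_reaction": "She'll probably share more details",
    "max_wait_seconds": 300,
    "actions": [
        {"type": "reply", "content": "wait, you actually booked it?"},
        {"type": "reply", "content": "haha that's amazing"},
        {"type": "reply", "content": "okay tell me everything, when do you leave?"},
    ],
}
```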
@@ -141,29 +141,13 @@ async def process_reply_content(content: str) -> list[str]:
        # Fall back to the original content on failure
        processed_content = content
 
-    # Step 2: message splitting
-    splitter_cfg = global_config.response_splitter
-    if splitter_cfg.enable:
-        split_mode = splitter_cfg.split_mode
-        max_length = splitter_cfg.max_length
-        max_sentences = splitter_cfg.max_sentence_num
-
-        if split_mode == "punctuation":
-            # Split on punctuation
-            result = split_by_punctuation(
-                processed_content,
-                max_length=max_length,
-                max_sentences=max_sentences,
-            )
-            logger.info(f"[KFC PostProcessor] punctuation split done, {len(result)} messages")
-            return result
-        elif split_mode == "llm":
-            # LLM mode: not supported yet; fall back to no splitting
-            logger.info("[KFC PostProcessor] LLM split mode not supported yet, returning full content")
-            return [processed_content]
-        else:
-            logger.warning(f"[KFC PostProcessor] unknown split mode: {split_mode}")
-            return [processed_content]
-    else:
-        # Splitter disabled; return the full content
-        return [processed_content]
+    # Step 2: message splitting - disabled
+    # KFC's LLM splits messages itself via multiple reply actions;
+    # the post-processor no longer performs a second split, to avoid breaking
+    # the LLM's natural segmentation decisions.
+    #
+    # See the guidance in the prompt:
+    # - the LLM is steered to segment naturally at suitable interjections and punctuation
+    # - each segment is sent as a standalone reply action
+    # - this better matches how real people text
+    logger.debug("[KFC PostProcessor] message splitting disabled (LLM splits via multiple reply actions)")
+    return [processed_content]