feat(memory): 增强记忆构建系统并优化检索性能

- 添加记忆提取异常处理机制,提升系统稳定性
- 实现记忆内容格式化功能,增强可读性和结构化输出
- 优化LLM响应解析逻辑,避免系统标识误写入记忆
- 改进向量存储批量嵌入生成,提升处理效率
- 为记忆系统添加机器人身份上下文注入,避免自身信息记录
- 增强记忆检索接口,支持额外上下文参数传递
- 添加控制台记忆预览功能,便于人工检查
- 优化记忆融合算法,正确处理单记忆组情况
- 改进流循环管理器,支持未读消息积压强制分发机制
This commit is contained in:
Windpicker-owo
2025-09-30 21:15:40 +08:00
parent 0a3c908654
commit 37c8253f54
11 changed files with 873 additions and 180 deletions

View File

@@ -38,6 +38,14 @@ class StreamLoopManager:
global_config.chat, "max_concurrent_distributions", 10
)
# 强制分发策略
self.force_dispatch_unread_threshold: Optional[int] = getattr(
global_config.chat, "force_dispatch_unread_threshold", 20
)
self.force_dispatch_min_interval: float = getattr(
global_config.chat, "force_dispatch_min_interval", 0.1
)
# Chatter管理器
self.chatter_manager: Optional[ChatterManager] = None
@@ -75,7 +83,7 @@ class StreamLoopManager:
logger.info("流循环管理器已停止")
async def start_stream_loop(self, stream_id: str) -> bool:
async def start_stream_loop(self, stream_id: str, force: bool = False) -> bool:
"""启动指定流的循环任务
Args:
@@ -90,11 +98,19 @@ class StreamLoopManager:
logger.debug(f"{stream_id} 循环已在运行")
return True
# 判断是否需要强制分发
force = force or self._should_force_dispatch_for_stream(stream_id)
# 检查是否超过最大并发限制
if len(self.stream_loops) >= self.max_concurrent_streams:
if len(self.stream_loops) >= self.max_concurrent_streams and not force:
logger.warning(f"超过最大并发流数限制,无法启动流 {stream_id}")
return False
if force and len(self.stream_loops) >= self.max_concurrent_streams:
logger.warning(
"%s 未读消息积压严重(>%s),突破并发限制强制启动分发", stream_id, self.force_dispatch_unread_threshold
)
# 创建流循环任务
task = asyncio.create_task(self._stream_loop(stream_id))
self.stream_loops[stream_id] = task
@@ -145,9 +161,16 @@ class StreamLoopManager:
continue
# 2. 检查是否有消息需要处理
has_messages = await self._has_messages_to_process(context)
unread_count = self._get_unread_count(context)
force_dispatch = self._needs_force_dispatch_for_context(context, unread_count)
has_messages = force_dispatch or await self._has_messages_to_process(context)
if has_messages:
if force_dispatch:
logger.info(
"%s 未读消息 %d 条,触发强制分发", stream_id, unread_count
)
# 3. 激活chatter处理
success = await self._process_stream_messages(stream_id, context)
@@ -162,6 +185,17 @@ class StreamLoopManager:
# 4. 计算下次检查间隔
interval = await self._calculate_interval(stream_id, has_messages)
if has_messages:
updated_unread_count = self._get_unread_count(context)
if self._needs_force_dispatch_for_context(context, updated_unread_count):
interval = min(interval, max(self.force_dispatch_min_interval, 0.0))
logger.debug(
"%s 未读消息仍有 %d 条,使用加速分发间隔 %.2fs",
stream_id,
updated_unread_count,
interval,
)
# 5. sleep等待下次检查
logger.info(f"{stream_id} 等待 {interval:.2f}s")
await asyncio.sleep(interval)
@@ -319,6 +353,38 @@ class StreamLoopManager:
self.chatter_manager = chatter_manager
logger.info(f"设置chatter管理器: {chatter_manager.__class__.__name__}")
def _should_force_dispatch_for_stream(self, stream_id: str) -> bool:
if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0:
return False
try:
chat_manager = get_chat_manager()
chat_stream = chat_manager.get_stream(stream_id)
if not chat_stream:
return False
unread = getattr(chat_stream.context_manager.context, "unread_messages", [])
return len(unread) > self.force_dispatch_unread_threshold
except Exception as e:
logger.debug(f"检查流 {stream_id} 是否需要强制分发失败: {e}")
return False
def _get_unread_count(self, context: Any) -> int:
try:
unread_messages = getattr(context, "unread_messages", None)
if unread_messages is None:
return 0
return len(unread_messages)
except Exception:
return 0
def _needs_force_dispatch_for_context(self, context: Any, unread_count: Optional[int] = None) -> bool:
if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0:
return False
count = unread_count if unread_count is not None else self._get_unread_count(context)
return count > self.force_dispatch_unread_threshold
def get_performance_summary(self) -> Dict[str, Any]:
"""获取性能摘要