feat(memory): 增强记忆构建系统并优化检索性能
- 添加记忆提取异常处理机制,提升系统稳定性 - 实现记忆内容格式化功能,增强可读性和结构化输出 - 优化LLM响应解析逻辑,避免系统标识误写入记忆 - 改进向量存储批量嵌入生成,提升处理效率 - 为记忆系统添加机器人身份上下文注入,避免自身信息记录 - 增强记忆检索接口,支持额外上下文参数传递 - 添加控制台记忆预览功能,便于人工检查 - 优化记忆融合算法,正确处理单记忆组情况 - 改进流循环管理器,支持未读消息积压强制分发机制
This commit is contained in:
@@ -38,6 +38,14 @@ class StreamLoopManager:
|
||||
global_config.chat, "max_concurrent_distributions", 10
|
||||
)
|
||||
|
||||
# 强制分发策略
|
||||
self.force_dispatch_unread_threshold: Optional[int] = getattr(
|
||||
global_config.chat, "force_dispatch_unread_threshold", 20
|
||||
)
|
||||
self.force_dispatch_min_interval: float = getattr(
|
||||
global_config.chat, "force_dispatch_min_interval", 0.1
|
||||
)
|
||||
|
||||
# Chatter管理器
|
||||
self.chatter_manager: Optional[ChatterManager] = None
|
||||
|
||||
@@ -75,7 +83,7 @@ class StreamLoopManager:
|
||||
|
||||
logger.info("流循环管理器已停止")
|
||||
|
||||
async def start_stream_loop(self, stream_id: str) -> bool:
|
||||
async def start_stream_loop(self, stream_id: str, force: bool = False) -> bool:
|
||||
"""启动指定流的循环任务
|
||||
|
||||
Args:
|
||||
@@ -90,11 +98,19 @@ class StreamLoopManager:
|
||||
logger.debug(f"流 {stream_id} 循环已在运行")
|
||||
return True
|
||||
|
||||
# 判断是否需要强制分发
|
||||
force = force or self._should_force_dispatch_for_stream(stream_id)
|
||||
|
||||
# 检查是否超过最大并发限制
|
||||
if len(self.stream_loops) >= self.max_concurrent_streams:
|
||||
if len(self.stream_loops) >= self.max_concurrent_streams and not force:
|
||||
logger.warning(f"超过最大并发流数限制,无法启动流 {stream_id}")
|
||||
return False
|
||||
|
||||
if force and len(self.stream_loops) >= self.max_concurrent_streams:
|
||||
logger.warning(
|
||||
"流 %s 未读消息积压严重(>%s),突破并发限制强制启动分发", stream_id, self.force_dispatch_unread_threshold
|
||||
)
|
||||
|
||||
# 创建流循环任务
|
||||
task = asyncio.create_task(self._stream_loop(stream_id))
|
||||
self.stream_loops[stream_id] = task
|
||||
@@ -145,9 +161,16 @@ class StreamLoopManager:
|
||||
continue
|
||||
|
||||
# 2. 检查是否有消息需要处理
|
||||
has_messages = await self._has_messages_to_process(context)
|
||||
unread_count = self._get_unread_count(context)
|
||||
force_dispatch = self._needs_force_dispatch_for_context(context, unread_count)
|
||||
|
||||
has_messages = force_dispatch or await self._has_messages_to_process(context)
|
||||
|
||||
if has_messages:
|
||||
if force_dispatch:
|
||||
logger.info(
|
||||
"流 %s 未读消息 %d 条,触发强制分发", stream_id, unread_count
|
||||
)
|
||||
# 3. 激活chatter处理
|
||||
success = await self._process_stream_messages(stream_id, context)
|
||||
|
||||
@@ -162,6 +185,17 @@ class StreamLoopManager:
|
||||
# 4. 计算下次检查间隔
|
||||
interval = await self._calculate_interval(stream_id, has_messages)
|
||||
|
||||
if has_messages:
|
||||
updated_unread_count = self._get_unread_count(context)
|
||||
if self._needs_force_dispatch_for_context(context, updated_unread_count):
|
||||
interval = min(interval, max(self.force_dispatch_min_interval, 0.0))
|
||||
logger.debug(
|
||||
"流 %s 未读消息仍有 %d 条,使用加速分发间隔 %.2fs",
|
||||
stream_id,
|
||||
updated_unread_count,
|
||||
interval,
|
||||
)
|
||||
|
||||
# 5. sleep等待下次检查
|
||||
logger.info(f"流 {stream_id} 等待 {interval:.2f}s")
|
||||
await asyncio.sleep(interval)
|
||||
@@ -319,6 +353,38 @@ class StreamLoopManager:
|
||||
self.chatter_manager = chatter_manager
|
||||
logger.info(f"设置chatter管理器: {chatter_manager.__class__.__name__}")
|
||||
|
||||
def _should_force_dispatch_for_stream(self, stream_id: str) -> bool:
|
||||
if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0:
|
||||
return False
|
||||
|
||||
try:
|
||||
chat_manager = get_chat_manager()
|
||||
chat_stream = chat_manager.get_stream(stream_id)
|
||||
if not chat_stream:
|
||||
return False
|
||||
|
||||
unread = getattr(chat_stream.context_manager.context, "unread_messages", [])
|
||||
return len(unread) > self.force_dispatch_unread_threshold
|
||||
except Exception as e:
|
||||
logger.debug(f"检查流 {stream_id} 是否需要强制分发失败: {e}")
|
||||
return False
|
||||
|
||||
def _get_unread_count(self, context: Any) -> int:
|
||||
try:
|
||||
unread_messages = getattr(context, "unread_messages", None)
|
||||
if unread_messages is None:
|
||||
return 0
|
||||
return len(unread_messages)
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
def _needs_force_dispatch_for_context(self, context: Any, unread_count: Optional[int] = None) -> bool:
|
||||
if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0:
|
||||
return False
|
||||
|
||||
count = unread_count if unread_count is not None else self._get_unread_count(context)
|
||||
return count > self.force_dispatch_unread_threshold
|
||||
|
||||
def get_performance_summary(self) -> Dict[str, Any]:
|
||||
"""获取性能摘要
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import random
|
||||
import time
|
||||
from typing import Dict, Optional, Any, TYPE_CHECKING, List
|
||||
|
||||
from src.chat.message_receive.chat_stream import ChatStream
|
||||
from src.common.logger import get_logger
|
||||
from src.common.data_models.database_data_model import DatabaseMessages
|
||||
from src.common.data_models.message_manager_data_model import StreamContext, MessageManagerStats, StreamStats
|
||||
@@ -86,11 +87,8 @@ class MessageManager:
|
||||
if not chat_stream:
|
||||
logger.warning(f"MessageManager.add_message: 聊天流 {stream_id} 不存在")
|
||||
return
|
||||
success = await chat_stream.context_manager.add_message(message)
|
||||
if success:
|
||||
logger.debug(f"添加消息到聊天流 {stream_id}: {message.message_id}")
|
||||
else:
|
||||
logger.warning(f"添加消息到聊天流 {stream_id} 失败")
|
||||
await self._check_and_handle_interruption(chat_stream)
|
||||
chat_stream.context_manager.context.processing_task = asyncio.create_task(chat_stream.context_manager.add_message(message))
|
||||
except Exception as e:
|
||||
logger.error(f"添加消息到聊天流 {stream_id} 时发生错误: {e}")
|
||||
|
||||
@@ -280,51 +278,51 @@ class MessageManager:
|
||||
except Exception as e:
|
||||
logger.error(f"清理不活跃聊天流时发生错误: {e}")
|
||||
|
||||
async def _check_and_handle_interruption(self, context: StreamContext, stream_id: str):
|
||||
async def _check_and_handle_interruption(self, chat_stream: Optional[ChatStream] = None):
|
||||
"""检查并处理消息打断"""
|
||||
if not global_config.chat.interruption_enabled:
|
||||
return
|
||||
|
||||
# 检查是否有正在进行的处理任务
|
||||
if context.processing_task and not context.processing_task.done():
|
||||
if chat_stream.context_manager.context.processing_task and not chat_stream.context_manager.context.processing_task.done():
|
||||
# 计算打断概率
|
||||
interruption_probability = context.calculate_interruption_probability(
|
||||
interruption_probability = chat_stream.context_manager.context.calculate_interruption_probability(
|
||||
global_config.chat.interruption_max_limit, global_config.chat.interruption_probability_factor
|
||||
)
|
||||
|
||||
# 检查是否已达到最大打断次数
|
||||
if context.interruption_count >= global_config.chat.interruption_max_limit:
|
||||
if chat_stream.context_manager.context.interruption_count >= global_config.chat.interruption_max_limit:
|
||||
logger.debug(
|
||||
f"聊天流 {stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},跳过打断检查"
|
||||
f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit},跳过打断检查"
|
||||
)
|
||||
return
|
||||
|
||||
# 根据概率决定是否打断
|
||||
if random.random() < interruption_probability:
|
||||
logger.info(f"聊天流 {stream_id} 触发消息打断,打断概率: {interruption_probability:.2f}")
|
||||
logger.info(f"聊天流 {chat_stream.stream_id} 触发消息打断,打断概率: {interruption_probability:.2f}")
|
||||
|
||||
# 取消现有任务
|
||||
context.processing_task.cancel()
|
||||
chat_stream.context_manager.context.processing_task.cancel()
|
||||
try:
|
||||
await context.processing_task
|
||||
await chat_stream.context_manager.context.processing_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
# 增加打断计数并应用afc阈值降低
|
||||
context.increment_interruption_count()
|
||||
context.apply_interruption_afc_reduction(global_config.chat.interruption_afc_reduction)
|
||||
chat_stream.context_manager.context.increment_interruption_count()
|
||||
chat_stream.context_manager.context.apply_interruption_afc_reduction(global_config.chat.interruption_afc_reduction)
|
||||
|
||||
# 检查是否已达到最大次数
|
||||
if context.interruption_count >= global_config.chat.interruption_max_limit:
|
||||
if chat_stream.context_manager.context.interruption_count >= global_config.chat.interruption_max_limit:
|
||||
logger.warning(
|
||||
f"聊天流 {stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},后续消息将不再打断"
|
||||
f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit},后续消息将不再打断"
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
f"聊天流 {stream_id} 已打断,当前打断次数: {context.interruption_count}/{global_config.chat.interruption_max_limit}, afc阈值调整: {context.get_afc_threshold_adjustment()}"
|
||||
f"聊天流 {chat_stream.stream_id} 已打断,当前打断次数: {chat_stream.context_manager.context.interruption_count}/{global_config.chat.interruption_max_limit}, afc阈值调整: {chat_stream.context_manager.context.get_afc_threshold_adjustment()}"
|
||||
)
|
||||
else:
|
||||
logger.debug(f"聊天流 {stream_id} 未触发打断,打断概率: {interruption_probability:.2f}")
|
||||
logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断,打断概率: {interruption_probability:.2f}")
|
||||
|
||||
async def clear_all_unread_messages(self, stream_id: str):
|
||||
"""清除指定上下文中的所有未读消息,在消息处理完成后调用"""
|
||||
@@ -358,4 +356,4 @@ class MessageManager:
|
||||
|
||||
|
||||
# 创建全局消息管理器实例
|
||||
message_manager = MessageManager()
|
||||
message_manager = MessageManag
|
||||
Reference in New Issue
Block a user