feat(message-manager): 用流循环管理器替换调度器/分派器

- 移除 scheduler_dispatcher 模块,并用 distribution_manager 替换
- 实现StreamLoopManager,以改进消息分发和中断处理
- 将消息缓存系统直接添加到StreamContext中,并配置缓存设置
- 使用具有缓存感知的消息处理来增强SingleStreamContextManager
- 更新`message_manager`,使用`stream_loop_manager`替代`scheduler_dispatcher`
- 在StreamContext数据模型中添加缓存统计和刷新方法
- 通过适当的任务取消和重新处理来改进中断处理
- 为ChatManager添加get_all_stream方法,以实现更优的流管理
- 更新亲和聊天规划器,以更可靠地处理专注/正常模式切换
This commit is contained in:
Windpicker-owo
2025-11-08 10:46:44 +08:00
parent 617ee5b5cb
commit 636b1e9fea
9 changed files with 1178 additions and 908 deletions

View File

@@ -5,6 +5,7 @@
import asyncio
import time
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
from typing import TYPE_CHECKING, Optional
@@ -66,6 +67,16 @@ class StreamContext(BaseDataModel):
processing_message_id: str | None = None # 当前正在规划/处理的目标消息ID用于防止重复回复
decision_history: list["DecisionRecord"] = field(default_factory=list) # 决策历史
# 消息缓存系统相关字段
message_cache: deque["DatabaseMessages"] = field(default_factory=deque) # 消息缓存队列
is_cache_enabled: bool = False # 是否为此流启用缓存
cache_stats: dict = field(default_factory=lambda: {
"total_cached_messages": 0,
"total_flushed_messages": 0,
"cache_hits": 0,
"cache_misses": 0
}) # 缓存统计信息
def add_action_to_message(self, message_id: str, action: str):
"""
向指定消息添加执行的动作
@@ -189,13 +200,12 @@ class StreamContext(BaseDataModel):
and hasattr(self.current_message, "additional_config")
and self.current_message.additional_config
):
import orjson
try:
import json
config = json.loads(self.current_message.additional_config)
config = orjson.loads(self.current_message.additional_config)
if config.get("template_info") and not config.get("template_default", True):
return config.get("template_name")
except (json.JSONDecodeError, AttributeError):
except (orjson.JSONDecodeError, AttributeError):
pass
return None
@@ -231,9 +241,8 @@ class StreamContext(BaseDataModel):
# 优先从additional_config中获取format_info
if hasattr(self.current_message, "additional_config") and self.current_message.additional_config:
import orjson
try:
import orjson
logger.debug(f"[check_types] additional_config 类型: {type(self.current_message.additional_config)}")
config = orjson.loads(self.current_message.additional_config)
logger.debug(f"[check_types] 解析后的 config 键: {config.keys() if isinstance(config, dict) else 'N/A'}")
@@ -311,6 +320,145 @@ class StreamContext(BaseDataModel):
"""获取优先级信息"""
return self.priority_info
# ==================== 消息缓存系统方法 ====================
def enable_cache(self, enabled: bool = True):
"""
启用或禁用消息缓存系统
Args:
enabled: 是否启用缓存
"""
self.is_cache_enabled = enabled
logger.debug(f"StreamContext {self.stream_id} 缓存系统已{'启用' if enabled else '禁用'}")
def add_message_to_cache(self, message: "DatabaseMessages") -> bool:
"""
添加消息到缓存队列
Args:
message: 要缓存的消息
Returns:
bool: 是否成功添加到缓存
"""
if not self.is_cache_enabled:
self.cache_stats["cache_misses"] += 1
logger.debug(f"StreamContext {self.stream_id} 缓存未启用,消息无法缓存")
return False
try:
self.message_cache.append(message)
self.cache_stats["total_cached_messages"] += 1
self.cache_stats["cache_hits"] += 1
logger.debug(f"消息已添加到缓存: stream={self.stream_id}, message_id={message.message_id}, 缓存大小={len(self.message_cache)}")
return True
except Exception as e:
logger.error(f"添加消息到缓存失败: stream={self.stream_id}, error={e}")
return False
def flush_cached_messages(self) -> list["DatabaseMessages"]:
"""
刷新缓存消息到未读消息列表
Returns:
list[DatabaseMessages]: 刷新的消息列表
"""
if not self.message_cache:
logger.debug(f"StreamContext {self.stream_id} 缓存为空,无需刷新")
return []
try:
cached_messages = list(self.message_cache)
cache_size = len(cached_messages)
# 清空缓存队列
self.message_cache.clear()
# 将缓存消息添加到未读消息列表
self.unread_messages.extend(cached_messages)
# 更新统计信息
self.cache_stats["total_flushed_messages"] += cache_size
logger.debug(f"缓存消息已刷新到未读列表: stream={self.stream_id}, 数量={cache_size}")
return cached_messages
except Exception as e:
logger.error(f"刷新缓存消息失败: stream={self.stream_id}, error={e}")
return []
def get_cache_size(self) -> int:
"""
获取当前缓存大小
Returns:
int: 缓存中的消息数量
"""
return len(self.message_cache)
def clear_cache(self):
"""清空消息缓存"""
cache_size = len(self.message_cache)
self.message_cache.clear()
logger.debug(f"消息缓存已清空: stream={self.stream_id}, 清空数量={cache_size}")
def has_cached_messages(self) -> bool:
"""
检查是否有缓存的消息
Returns:
bool: 是否有缓存消息
"""
return len(self.message_cache) > 0
def get_cache_stats(self) -> dict:
"""
获取缓存统计信息
Returns:
dict: 缓存统计数据
"""
stats = self.cache_stats.copy()
stats.update({
"current_cache_size": len(self.message_cache),
"is_cache_enabled": self.is_cache_enabled,
"stream_id": self.stream_id
})
return stats
def add_message_with_cache_check(self, message: "DatabaseMessages", force_direct: bool = False) -> bool:
"""
智能添加消息:根据缓存状态决定是缓存还是直接添加到未读列表
Args:
message: 要添加的消息
force_direct: 是否强制直接添加到未读列表(跳过缓存)
Returns:
bool: 是否成功添加
"""
try:
# 如果强制直接添加或缓存未启用,直接添加到未读列表
if force_direct or not self.is_cache_enabled:
self.unread_messages.append(message)
logger.debug(f"消息直接添加到未读列表: stream={self.stream_id}, message_id={message.message_id}")
return True
# 如果正在处理中,添加到缓存
if self.is_chatter_processing:
return self.add_message_to_cache(message)
# 如果没有在处理,先刷新缓存再添加到未读列表
self.flush_cached_messages()
self.unread_messages.append(message)
logger.debug(f"消息添加到未读列表(已刷新缓存): stream={self.stream_id}, message_id={message.message_id}")
return True
except Exception as e:
logger.error(f"智能添加消息失败: stream={self.stream_id}, error={e}")
return False
def __deepcopy__(self, memo):
"""自定义深拷贝,跳过不可序列化的 asyncio.Task (processing_task)。
@@ -332,9 +480,16 @@ class StreamContext(BaseDataModel):
memo[obj_id] = new
for k, v in self.__dict__.items():
if k == "processing_task":
if k in ["processing_task", "stream_loop_task"]:
# 不复制 asyncio.Task避免无法 pickling
setattr(new, k, None)
elif k == "message_cache":
# 深拷贝消息缓存队列
try:
setattr(new, k, copy.deepcopy(v, memo))
except Exception:
# 如果拷贝失败,创建新的空队列
setattr(new, k, deque())
else:
try:
setattr(new, k, copy.deepcopy(v, memo))