feat(message-manager): 用流循环管理器替换调度器/分派器
- 移除 scheduler_dispatcher 模块,并用 distribution_manager 替换 - 实现StreamLoopManager,以改进消息分发和中断处理 - 将消息缓存系统直接添加到StreamContext中,并配置缓存设置 - 使用具有缓存感知的消息处理来增强SingleStreamContextManager - 更新`message_manager`,使用`stream_loop_manager`替代`scheduler_dispatcher` - 在StreamContext数据模型中添加缓存统计和刷新方法 - 通过适当的任务取消和重新处理来改进中断处理 - 为ChatManager添加get_all_stream方法,以实现更优的流管理 - 更新亲和聊天规划器,以更可靠地处理专注/正常模式切换
This commit is contained in:
@@ -4,11 +4,12 @@
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import random
|
||||
import time
|
||||
from collections import defaultdict, deque
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from src.chat.chatter_manager import ChatterManager
|
||||
from src.chat.message_receive.chat_stream import ChatStream
|
||||
from src.chat.planner_actions.action_manager import ChatterActionManager
|
||||
from src.common.data_models.database_data_model import DatabaseMessages
|
||||
from src.common.data_models.message_manager_data_model import MessageManagerStats, StreamStats
|
||||
@@ -16,8 +17,8 @@ from src.common.logger import get_logger
|
||||
from src.config.config import global_config
|
||||
from src.plugin_system.apis.chat_api import get_chat_manager
|
||||
|
||||
from .distribution_manager import stream_loop_manager
|
||||
from .global_notice_manager import NoticeScope, global_notice_manager
|
||||
from .scheduler_dispatcher import scheduler_dispatcher
|
||||
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
@@ -40,14 +41,6 @@ class MessageManager:
|
||||
self.action_manager = ChatterActionManager()
|
||||
self.chatter_manager = ChatterManager(self.action_manager)
|
||||
|
||||
# 消息缓存系统 - 直接集成到消息管理器
|
||||
self.message_caches: dict[str, deque] = defaultdict(deque) # 每个流的消息缓存
|
||||
self.stream_processing_status: dict[str, bool] = defaultdict(bool) # 流的处理状态
|
||||
self.cache_stats = {
|
||||
"total_cached_messages": 0,
|
||||
"total_flushed_messages": 0,
|
||||
}
|
||||
|
||||
# 不再需要全局上下文管理器,直接通过 ChatManager 访问各个 ChatStream 的 context_manager
|
||||
|
||||
# 全局Notice管理器
|
||||
@@ -69,19 +62,11 @@ class MessageManager:
|
||||
except Exception as e:
|
||||
logger.error(f"启动批量数据库写入器失败: {e}")
|
||||
|
||||
# 启动消息缓存系统(内置)
|
||||
logger.debug("消息缓存系统已启动")
|
||||
# 启动流循环管理器并设置chatter_manager
|
||||
await stream_loop_manager.start()
|
||||
stream_loop_manager.set_chatter_manager(self.chatter_manager)
|
||||
|
||||
# 启动基于 scheduler 的消息分发器
|
||||
await scheduler_dispatcher.start()
|
||||
scheduler_dispatcher.set_chatter_manager(self.chatter_manager)
|
||||
|
||||
# 保留旧的流循环管理器(暂时)以便平滑过渡
|
||||
# TODO: 在确认新机制稳定后移除
|
||||
# await stream_loop_manager.start()
|
||||
# stream_loop_manager.set_chatter_manager(self.chatter_manager)
|
||||
|
||||
logger.info("消息管理器已启动(使用 Scheduler 分发器)")
|
||||
logger.info("消息管理器已启动")
|
||||
|
||||
async def stop(self):
|
||||
"""停止消息管理器"""
|
||||
@@ -99,34 +84,15 @@ class MessageManager:
|
||||
except Exception as e:
|
||||
logger.error(f"停止批量数据库写入器失败: {e}")
|
||||
|
||||
# 停止消息缓存系统(内置)
|
||||
self.message_caches.clear()
|
||||
self.stream_processing_status.clear()
|
||||
logger.debug("消息缓存系统已停止")
|
||||
|
||||
# 停止基于 scheduler 的消息分发器
|
||||
await scheduler_dispatcher.stop()
|
||||
|
||||
# 停止旧的流循环管理器(如果启用)
|
||||
# await stream_loop_manager.stop()
|
||||
# 停止流循环管理器
|
||||
await stream_loop_manager.stop()
|
||||
|
||||
logger.info("消息管理器已停止")
|
||||
|
||||
async def add_message(self, stream_id: str, message: DatabaseMessages):
|
||||
"""添加消息到指定聊天流
|
||||
|
||||
新的流程:
|
||||
1. 检查 notice 消息
|
||||
2. 将消息添加到上下文(缓存)
|
||||
3. 通知 scheduler_dispatcher 处理(检查打断、创建/更新 schedule)
|
||||
"""
|
||||
"""添加消息到指定聊天流"""
|
||||
|
||||
try:
|
||||
# 硬编码过滤表情包消息
|
||||
if message.processed_plain_text and message.processed_plain_text.startswith("[表情包"):
|
||||
logger.info(f"检测到表情包消息,已过滤: {message.processed_plain_text}")
|
||||
return
|
||||
|
||||
# 检查是否为notice消息
|
||||
if self._is_notice_message(message):
|
||||
# Notice消息处理 - 添加到全局管理器
|
||||
@@ -147,14 +113,11 @@ class MessageManager:
|
||||
if not chat_stream:
|
||||
logger.warning(f"MessageManager.add_message: 聊天流 {stream_id} 不存在")
|
||||
return
|
||||
|
||||
# 将消息添加到上下文
|
||||
# 启动steam loop任务(如果尚未启动)
|
||||
await stream_loop_manager.start_stream_loop(stream_id)
|
||||
await self._check_and_handle_interruption(chat_stream, message)
|
||||
await chat_stream.context_manager.add_message(message)
|
||||
|
||||
# 通知 scheduler_dispatcher 处理消息接收事件
|
||||
# dispatcher 会检查是否需要打断、创建或更新 schedule
|
||||
await scheduler_dispatcher.on_message_received(stream_id)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"添加消息到聊天流 {stream_id} 时发生错误: {e}")
|
||||
|
||||
@@ -321,9 +284,122 @@ class MessageManager:
|
||||
except Exception as e:
|
||||
logger.error(f"清理不活跃聊天流时发生错误: {e}")
|
||||
|
||||
# === 已废弃的方法已移除 ===
|
||||
# _check_and_handle_interruption 和 _trigger_reprocess 已由 scheduler_dispatcher 接管
|
||||
# 如需查看历史代码,请参考 git 历史记录
|
||||
async def _check_and_handle_interruption(self, chat_stream: ChatStream | None = None, message: DatabaseMessages | None = None):
|
||||
"""检查并处理消息打断 - 通过取消 stream_loop_task 实现"""
|
||||
if not global_config.chat.interruption_enabled or not chat_stream or not message:
|
||||
return
|
||||
|
||||
# 检查是否正在回复,以及是否允许在回复时打断
|
||||
if chat_stream.context_manager.context.is_replying:
|
||||
if not global_config.chat.allow_reply_interruption:
|
||||
logger.debug(f"聊天流 {chat_stream.stream_id} 正在回复中,且配置不允许回复时打断,跳过打断检查")
|
||||
return
|
||||
else:
|
||||
logger.debug(f"聊天流 {chat_stream.stream_id} 正在回复中,但配置允许回复时打断")
|
||||
|
||||
# 检查是否为表情包消息
|
||||
if message.is_picid or message.is_emoji:
|
||||
logger.info(f"消息 {message.message_id} 是表情包或Emoji,跳过打断检查")
|
||||
return
|
||||
|
||||
# 检查上下文
|
||||
context = chat_stream.context_manager.context
|
||||
|
||||
# 只有当 Chatter 真正在处理时才检查打断
|
||||
if not context.is_chatter_processing:
|
||||
logger.debug(f"聊天流 {chat_stream.stream_id} Chatter 未在处理,跳过打断检查")
|
||||
return
|
||||
|
||||
# 检查是否有 stream_loop_task 在运行
|
||||
stream_loop_task = context.stream_loop_task
|
||||
|
||||
if stream_loop_task and not stream_loop_task.done():
|
||||
# 检查触发用户ID
|
||||
triggering_user_id = context.triggering_user_id
|
||||
if triggering_user_id and message.user_info.user_id != triggering_user_id:
|
||||
logger.info(f"消息来自非触发用户 {message.user_info.user_id},实际触发用户为 {triggering_user_id},跳过打断检查")
|
||||
return
|
||||
|
||||
# 计算打断概率
|
||||
interruption_probability = context.calculate_interruption_probability(
|
||||
global_config.chat.interruption_max_limit
|
||||
)
|
||||
|
||||
# 检查是否已达到最大打断次数
|
||||
if context.interruption_count >= global_config.chat.interruption_max_limit:
|
||||
logger.debug(
|
||||
f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},跳过打断检查"
|
||||
)
|
||||
return
|
||||
|
||||
# 根据概率决定是否打断
|
||||
if random.random() < interruption_probability:
|
||||
logger.info(f"聊天流 {chat_stream.stream_id} 触发消息打断,打断概率: {interruption_probability:.2f}")
|
||||
|
||||
# 取消 stream_loop_task,子任务会通过 try-catch 自动取消
|
||||
try:
|
||||
stream_loop_task.cancel()
|
||||
|
||||
# 等待任务真正结束(设置超时避免死锁)
|
||||
try:
|
||||
await asyncio.wait_for(stream_loop_task, timeout=2.0)
|
||||
logger.info(f"流循环任务已完全结束: {chat_stream.stream_id}")
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"等待流循环任务结束超时: {chat_stream.stream_id}")
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"流循环任务已被取消: {chat_stream.stream_id}")
|
||||
except Exception as e:
|
||||
logger.warning(f"取消流循环任务失败: {chat_stream.stream_id} - {e}")
|
||||
|
||||
# 增加打断计数
|
||||
await context.increment_interruption_count()
|
||||
|
||||
# 打断后重新创建 stream_loop 任务
|
||||
await self._trigger_reprocess(chat_stream)
|
||||
|
||||
# 检查是否已达到最大次数
|
||||
if context.interruption_count >= global_config.chat.interruption_max_limit:
|
||||
logger.warning(
|
||||
f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},后续消息将不再打断"
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
f"聊天流 {chat_stream.stream_id} 已打断并重新进入处理流程,当前打断次数: {context.interruption_count}/{global_config.chat.interruption_max_limit}"
|
||||
)
|
||||
else:
|
||||
logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断,打断概率: {interruption_probability:.2f}")
|
||||
|
||||
async def _trigger_reprocess(self, chat_stream: ChatStream):
|
||||
"""重新处理聊天流的核心逻辑 - 重新创建 stream_loop 任务"""
|
||||
try:
|
||||
stream_id = chat_stream.stream_id
|
||||
|
||||
logger.info(f"🚀 打断后重新创建流循环任务: {stream_id}")
|
||||
|
||||
# 等待一小段时间确保当前消息已经添加到未读消息中
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
# 获取当前的stream context
|
||||
context = chat_stream.context_manager.context
|
||||
|
||||
# 确保有未读消息需要处理
|
||||
unread_messages = context.get_unread_messages()
|
||||
if not unread_messages:
|
||||
logger.debug(f"聊天流 {stream_id} 没有未读消息,跳过重新处理")
|
||||
return
|
||||
|
||||
logger.debug(f"准备重新处理 {len(unread_messages)} 条未读消息: {stream_id}")
|
||||
|
||||
# 重新创建 stream_loop 任务
|
||||
success = await stream_loop_manager.start_stream_loop(stream_id, force=True)
|
||||
|
||||
if success:
|
||||
logger.debug(f"成功重新创建流循环任务: {stream_id}")
|
||||
else:
|
||||
logger.warning(f"重新创建流循环任务失败: {stream_id}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"触发重新处理时出错: {e}")
|
||||
|
||||
async def clear_all_unread_messages(self, stream_id: str):
|
||||
"""清除指定上下文中的所有未读消息,在消息处理完成后调用"""
|
||||
@@ -374,71 +450,44 @@ class MessageManager:
|
||||
except Exception as e:
|
||||
logger.error(f"清除流 {stream_id} 的未读消息时发生错误: {e}")
|
||||
|
||||
# ===== 消息缓存系统方法 =====
|
||||
|
||||
def add_message_to_cache(self, stream_id: str, message: DatabaseMessages) -> bool:
|
||||
"""添加消息到缓存
|
||||
|
||||
Args:
|
||||
stream_id: 流ID
|
||||
message: 消息对象
|
||||
|
||||
Returns:
|
||||
bool: 是否成功添加到缓存
|
||||
"""
|
||||
try:
|
||||
if not self.is_running:
|
||||
return False
|
||||
|
||||
self.message_caches[stream_id].append(message)
|
||||
self.cache_stats["total_cached_messages"] += 1
|
||||
|
||||
if message.processed_plain_text:
|
||||
logger.debug(f"消息已添加到缓存: stream={stream_id}, content={message.processed_plain_text[:50]}...")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"添加消息到缓存失败: stream={stream_id}, error={e}")
|
||||
return False
|
||||
|
||||
def flush_cached_messages(self, stream_id: str) -> list[DatabaseMessages]:
|
||||
"""刷新缓存消息到未读消息列表
|
||||
|
||||
Args:
|
||||
stream_id: 流ID
|
||||
|
||||
Returns:
|
||||
List[DatabaseMessages]: 缓存的消息列表
|
||||
"""
|
||||
try:
|
||||
if stream_id not in self.message_caches:
|
||||
return []
|
||||
|
||||
cached_messages = list(self.message_caches[stream_id])
|
||||
self.message_caches[stream_id].clear()
|
||||
|
||||
self.cache_stats["total_flushed_messages"] += len(cached_messages)
|
||||
|
||||
logger.debug(f"刷新缓存消息: stream={stream_id}, 数量={len(cached_messages)}")
|
||||
return cached_messages
|
||||
except Exception as e:
|
||||
logger.error(f"刷新缓存消息失败: stream={stream_id}, error={e}")
|
||||
return []
|
||||
# ===== 流处理状态相关方法(用于向后兼容) =====
|
||||
|
||||
def set_stream_processing_status(self, stream_id: str, is_processing: bool):
|
||||
"""设置流的处理状态
|
||||
"""设置流的处理状态 - 已迁移到StreamContext,此方法仅用于向后兼容
|
||||
|
||||
Args:
|
||||
stream_id: 流ID
|
||||
is_processing: 是否正在处理
|
||||
"""
|
||||
try:
|
||||
self.stream_processing_status[stream_id] = is_processing
|
||||
logger.debug(f"设置流处理状态: stream={stream_id}, processing={is_processing}")
|
||||
# 尝试更新StreamContext的处理状态
|
||||
import asyncio
|
||||
async def _update_context():
|
||||
try:
|
||||
chat_manager = get_chat_manager()
|
||||
chat_stream = await chat_manager.get_stream(stream_id)
|
||||
if chat_stream and hasattr(chat_stream.context_manager.context, 'is_chatter_processing'):
|
||||
chat_stream.context_manager.context.is_chatter_processing = is_processing
|
||||
logger.debug(f"设置StreamContext处理状态: stream={stream_id}, processing={is_processing}")
|
||||
except Exception as e:
|
||||
logger.debug(f"更新StreamContext状态失败: stream={stream_id}, error={e}")
|
||||
|
||||
# 在当前事件循环中执行(如果可能)
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
if loop.is_running():
|
||||
asyncio.create_task(_update_context())
|
||||
else:
|
||||
# 如果事件循环未运行,则跳过
|
||||
logger.debug("事件循环未运行,跳过StreamContext状态更新")
|
||||
except RuntimeError:
|
||||
logger.debug("无法获取事件循环,跳过StreamContext状态更新")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"设置流处理状态失败: stream={stream_id}, error={e}")
|
||||
logger.debug(f"设置流处理状态失败(向后兼容模式): stream={stream_id}, error={e}")
|
||||
|
||||
def get_stream_processing_status(self, stream_id: str) -> bool:
|
||||
"""获取流的处理状态
|
||||
"""获取流的处理状态 - 已迁移到StreamContext,此方法仅用于向后兼容
|
||||
|
||||
Args:
|
||||
stream_id: 流ID
|
||||
@@ -446,43 +495,33 @@ class MessageManager:
|
||||
Returns:
|
||||
bool: 是否正在处理
|
||||
"""
|
||||
return self.stream_processing_status.get(stream_id, False)
|
||||
try:
|
||||
# 尝试从StreamContext获取处理状态
|
||||
import asyncio
|
||||
async def _get_context_status():
|
||||
try:
|
||||
chat_manager = get_chat_manager()
|
||||
chat_stream = await chat_manager.get_stream(stream_id)
|
||||
if chat_stream and hasattr(chat_stream.context_manager.context, 'is_chatter_processing'):
|
||||
return chat_stream.context_manager.context.is_chatter_processing
|
||||
except Exception:
|
||||
pass
|
||||
return False
|
||||
|
||||
def has_cached_messages(self, stream_id: str) -> bool:
|
||||
"""检查流是否有缓存消息
|
||||
# 同步获取状态(如果可能)
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
if loop.is_running():
|
||||
# 如果事件循环正在运行,我们无法在这里等待,返回默认值
|
||||
return False
|
||||
else:
|
||||
# 如果事件循环未运行,运行它来获取状态
|
||||
return loop.run_until_complete(_get_context_status())
|
||||
except RuntimeError:
|
||||
return False
|
||||
|
||||
Args:
|
||||
stream_id: 流ID
|
||||
|
||||
Returns:
|
||||
bool: 是否有缓存消息
|
||||
"""
|
||||
return stream_id in self.message_caches and len(self.message_caches[stream_id]) > 0
|
||||
|
||||
def get_cached_messages_count(self, stream_id: str) -> int:
|
||||
"""获取流的缓存消息数量
|
||||
|
||||
Args:
|
||||
stream_id: 流ID
|
||||
|
||||
Returns:
|
||||
int: 缓存消息数量
|
||||
"""
|
||||
return len(self.message_caches.get(stream_id, []))
|
||||
|
||||
def get_cache_stats(self) -> dict[str, Any]:
|
||||
"""获取缓存统计信息
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: 缓存统计信息
|
||||
"""
|
||||
return {
|
||||
"total_cached_messages": self.cache_stats["total_cached_messages"],
|
||||
"total_flushed_messages": self.cache_stats["total_flushed_messages"],
|
||||
"active_caches": len(self.message_caches),
|
||||
"cached_streams": len([s for s in self.message_caches.keys() if self.message_caches[s]]),
|
||||
"processing_streams": len([s for s in self.stream_processing_status.keys() if self.stream_processing_status[s]]),
|
||||
}
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
# ===== Notice管理相关方法 =====
|
||||
|
||||
@@ -623,4 +662,4 @@ class MessageManager:
|
||||
|
||||
|
||||
# 创建全局消息管理器实例
|
||||
message_manager = MessageManager()
|
||||
message_manager = MessageManager()
|
||||
Reference in New Issue
Block a user