refactor(message_manager): rework the message dispatch mechanism into a stream-loop model

Replace the dynamic message distribution manager with a stream-loop manager: each chat stream owns its own infinite-loop task that actively polls for and processes messages.

Main changes:
- Remove DistributionManager and its related classes (DistributionPriority, DistributionTask, StreamDistributionState, DistributionExecutor)
- Add StreamLoopManager, which implements the per-stream loop processing mechanism
- Adapt context_manager and message_manager to the new stream-loop model
- Adjust the message handling logic in plan_filter.py to match the new data format

BREAKING CHANGE: the old distribution manager API has been removed; all code that depended on the distribution functionality must be updated (a migration sketch follows the first file diff below).
Author: Windpicker-owo
Date: 2025-09-29 00:42:54 +08:00
Parent: 8b034e21c6
Commit: 903ab855bf
5 changed files with 284 additions and 1278 deletions
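
The rewritten distribution_manager.py (which now houses StreamLoopManager) is the large diff suppressed further down, so the following is only a minimal sketch of the loop model the commit message describes, with hypothetical internals; the public names (start, stop, set_chatter_manager, start_stream_loop, and the stream_loop_manager singleton) are the ones visible in the diffs below.

# Minimal sketch of the stream-loop model; internals are assumptions, only the
# public names above are taken from this commit's diffs.
import asyncio
from typing import Dict


class StreamLoopManager:
    """Sketch: one asyncio task per chat stream, each polling its own unread messages."""

    def __init__(self, poll_interval: float = 5.0):
        self.poll_interval = poll_interval
        self.chatter_manager = None
        self.is_running = False
        self._loops: Dict[str, asyncio.Task] = {}

    def set_chatter_manager(self, chatter_manager) -> None:
        self.chatter_manager = chatter_manager

    async def start(self) -> None:
        self.is_running = True

    async def stop(self) -> None:
        self.is_running = False
        for task in self._loops.values():
            task.cancel()
        self._loops.clear()

    async def start_stream_loop(self, stream_id: str) -> None:
        # Idempotent: safe to call on every incoming message, as context_manager now does.
        existing = self._loops.get(stream_id)
        if existing and not existing.done():
            return
        self._loops[stream_id] = asyncio.create_task(self._stream_loop(stream_id))

    async def _stream_loop(self, stream_id: str) -> None:
        # Each stream owns an "infinite" loop that actively polls for unread messages.
        while self.is_running:
            if self.chatter_manager is not None:
                ...  # hypothetical hand-off: poll unread messages, dispatch to chatter_manager
            await asyncio.sleep(self.poll_interval)


stream_loop_manager = StreamLoopManager()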

View File

@@ -1,25 +1,19 @@
 """
 消息管理器模块
-提供统一的消息管理、上下文管理和分发调度功能
+提供统一的消息管理、上下文管理和流循环调度功能
 """

 from .message_manager import MessageManager, message_manager
 from .context_manager import SingleStreamContextManager
 from .distribution_manager import (
-    DistributionManager,
-    DistributionPriority,
-    DistributionTask,
-    StreamDistributionState,
-    distribution_manager
+    StreamLoopManager,
+    stream_loop_manager
 )

 __all__ = [
     "MessageManager",
     "message_manager",
     "SingleStreamContextManager",
-    "DistributionManager",
-    "DistributionPriority",
-    "DistributionTask",
-    "StreamDistributionState",
-    "distribution_manager"
+    "StreamLoopManager",
+    "stream_loop_manager"
 ]
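
Because the old distribution-manager exports are gone (the BREAKING CHANGE above), callers have to switch to the stream-loop singleton. A minimal migration sketch, assuming the package lives at src.chat.message_manager (the path is not shown in this view) and using only names that appear in the new __init__ and in the context_manager diff below:

# Before this commit (now removed):
#   from src.chat.message_manager import distribution_manager
#   distribution_manager.add_stream_message(stream_id, 1)

# After this commit — package path assumed; exported names are from the new __init__:
from src.chat.message_manager import stream_loop_manager


async def on_new_message(stream_id: str) -> None:
    # Idempotent: starts the per-stream loop if it is not already running;
    # the loop itself then polls and processes the unread messages.
    await stream_loop_manager.start_stream_loop(stream_id)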

View File

@@ -13,7 +13,7 @@ from src.common.logger import get_logger
 from src.config.config import global_config
 from src.common.data_models.database_data_model import DatabaseMessages
 from src.chat.energy_system import energy_manager
-from .distribution_manager import distribution_manager
+from .distribution_manager import stream_loop_manager

 logger = get_logger("context_manager")

@@ -60,8 +60,9 @@ class SingleStreamContextManager:
             self.last_access_time = time.time()
             if not skip_energy_update:
                 await self._update_stream_energy()
-            distribution_manager.add_stream_message(self.stream_id, 1)
-            logger.debug(f"添加消息到单流上下文: {self.stream_id} (兴趣度: {interest_value:.3f})")
+            # 启动流的循环任务(如果还未启动)
+            await stream_loop_manager.start_stream_loop(self.stream_id)
+            logger.info(f"添加消息到单流上下文: {self.stream_id} (兴趣度: {interest_value:.3f})")
             return True
         except Exception as e:
             logger.error(f"添加消息到单流上下文失败 {self.stream_id}: {e}", exc_info=True)

@@ -293,7 +294,8 @@ class SingleStreamContextManager:
             if not skip_energy_update:
                 await self._update_stream_energy()
-            distribution_manager.add_stream_message(self.stream_id, 1)
+            # 启动流的循环任务(如果还未启动)
+            await stream_loop_manager.start_stream_loop(self.stream_id)
             logger.debug(f"添加消息到单流上下文(异步): {self.stream_id} (兴趣度: {interest_value:.3f})")
             return True

@@ -356,8 +358,8 @@
                 stream_id=self.stream_id, messages=combined_messages, user_id=user_id
             )
-            # 更新分发管理器
-            distribution_manager.update_stream_energy(self.stream_id, energy)
+            # 更新流循环管理器
+            # 注意能量更新会通过energy_manager自动同步到流循环管理器
         except Exception as e:
             logger.error(f"更新单流能量失败 {self.stream_id}: {e}")

File diff suppressed because it is too large

View File

@@ -6,7 +6,6 @@
 import asyncio
 import random
 import time
-import traceback
 from typing import Dict, Optional, Any, TYPE_CHECKING

 from src.common.logger import get_logger

@@ -18,6 +17,7 @@ from .sleep_manager.sleep_manager import SleepManager
 from .sleep_manager.wakeup_manager import WakeUpManager
 from src.config.config import global_config
 from src.plugin_system.apis.chat_api import get_chat_manager
+from .distribution_manager import stream_loop_manager

 if TYPE_CHECKING:
     from src.common.data_models.message_manager_data_model import StreamContext

@@ -53,11 +53,16 @@ class MessageManager:
             return
         self.is_running = True

-        self.manager_task = asyncio.create_task(self._manager_loop())
-        await self.wakeup_manager.start()
-        # await self.context_manager.start() # 已删除,需要重构
-        logger.info("消息管理器已启动")
+        # 启动睡眠和唤醒管理器
+        await self.wakeup_manager.start()
+
+        # 启动流循环管理器并设置chatter_manager
+        await stream_loop_manager.start()
+        stream_loop_manager.set_chatter_manager(self.chatter_manager)
+
+        logger.info("🚀 消息管理器已启动 | 流循环管理器已启动")

     async def stop(self):
         """停止消息管理器"""
         if not self.is_running:

@@ -65,15 +70,13 @@
         self.is_running = False

-        # 停止所有流处理任务
-        # 注意context_manager 会自己清理任务
-        if self.manager_task and not self.manager_task.done():
-            self.manager_task.cancel()
+        # 停止睡眠和唤醒管理器
         await self.wakeup_manager.stop()
-        # await self.context_manager.stop() # 已删除,需要重构
-        logger.info("消息管理器已停止")
+
+        # 停止流循环管理器
+        await stream_loop_manager.stop()
+        logger.info("🛑 消息管理器已停止 | 流循环管理器已停止")

     async def add_message(self, stream_id: str, message: DatabaseMessages):
         """添加消息到指定聊天流"""
@@ -140,152 +143,6 @@
             except Exception as e:
                 logger.error(f"为消息 {message_id} 添加动作时发生错误: {e}")

-    async def _manager_loop(self):
-        """管理器主循环 - 独立聊天流分发周期版本"""
-        while self.is_running:
-            try:
-                # 更新睡眠状态
-                await self.sleep_manager.update_sleep_state(self.wakeup_manager)
-                # 执行独立分发周期的检查
-                await self._check_streams_with_individual_intervals()
-                # 计算下次检查时间(使用最小间隔或固定间隔)
-                if global_config.chat.dynamic_distribution_enabled:
-                    next_check_delay = self._calculate_next_manager_delay()
-                else:
-                    next_check_delay = self.check_interval
-                await asyncio.sleep(next_check_delay)
-            except asyncio.CancelledError:
-                break
-            except Exception as e:
-                logger.error(f"消息管理器循环出错: {e}")
-                traceback.print_exc()
-
-    async def _check_all_streams(self):
-        """检查所有聊天流"""
-        active_streams = 0
-        total_unread = 0
-        # 通过 ChatManager 获取所有活跃的流
-        try:
-            chat_manager = get_chat_manager()
-            active_stream_ids = list(chat_manager.streams.keys())
-            for stream_id in active_stream_ids:
-                chat_stream = chat_manager.get_stream(stream_id)
-                if not chat_stream:
-                    continue
-                # 检查流是否活跃
-                context = chat_stream.stream_context
-                if not context.is_active:
-                    continue
-                active_streams += 1
-                # 检查是否有未读消息
-                unread_messages = chat_stream.context_manager.get_unread_messages()
-                if unread_messages:
-                    total_unread += len(unread_messages)
-                    # 如果没有处理任务,创建一个
-                    if not hasattr(context, 'processing_task') or not context.processing_task or context.processing_task.done():
-                        context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id))
-            # 更新统计
-            self.stats.active_streams = active_streams
-            self.stats.total_unread_messages = total_unread
-        except Exception as e:
-            logger.error(f"检查所有聊天流时发生错误: {e}")
-
-    async def _process_stream_messages(self, stream_id: str):
-        """处理指定聊天流的消息"""
-        try:
-            # 通过 ChatManager 获取 ChatStream
-            chat_manager = get_chat_manager()
-            chat_stream = chat_manager.get_stream(stream_id)
-            if not chat_stream:
-                logger.warning(f"处理消息失败: 聊天流 {stream_id} 不存在")
-                return
-            context = chat_stream.stream_context
-            # 获取未读消息
-            unread_messages = chat_stream.context_manager.get_unread_messages()
-            if not unread_messages:
-                return
-            # 检查是否需要打断现有处理
-            await self._check_and_handle_interruption(context, stream_id)
-            # --- 睡眠状态检查 ---
-            if self.sleep_manager.is_sleeping():
-                logger.info(f"Bot正在睡觉检查聊天流 {stream_id} 是否有唤醒触发器。")
-                was_woken_up = False
-                is_private = context.is_private_chat()
-                for message in unread_messages:
-                    is_mentioned = message.is_mentioned or False
-                    if not is_mentioned and not is_private:
-                        bot_names = [global_config.bot.nickname] + global_config.bot.alias_names
-                        if any(name in message.processed_plain_text for name in bot_names):
-                            is_mentioned = True
-                            logger.debug(f"通过关键词 '{next((name for name in bot_names if name in message.processed_plain_text), '')}' 匹配将消息标记为 'is_mentioned'")
-                    if is_private or is_mentioned:
-                        if self.wakeup_manager.add_wakeup_value(is_private, is_mentioned, chat_id=stream_id):
-                            was_woken_up = True
-                            break # 一旦被吵醒,就跳出循环并处理消息
-                if not was_woken_up:
-                    logger.debug(f"聊天流 {stream_id} 中没有唤醒触发器,保持消息未读状态。")
-                    return # 退出,不处理消息
-                logger.info(f"Bot被聊天流 {stream_id} 中的消息吵醒,继续处理。")
-            elif self.sleep_manager.is_woken_up():
-                angry_chat_id = self.wakeup_manager.angry_chat_id
-                if stream_id != angry_chat_id:
-                    logger.debug(f"Bot处于WOKEN_UP状态但当前流 {stream_id} 不是触发唤醒的流 {angry_chat_id},跳过处理。")
-                    return # 退出,不处理此流的消息
-                logger.info(f"Bot处于WOKEN_UP状态处理触发唤醒的流 {stream_id}")
-            # --- 睡眠状态检查结束 ---
-            logger.debug(f"开始处理聊天流 {stream_id}{len(unread_messages)} 条未读消息")
-            # 直接使用StreamContext对象进行处理
-            if unread_messages:
-                try:
-                    # 记录当前chat type用于调试
-                    logger.debug(f"聊天流 {stream_id} 检测到的chat type: {context.chat_type.value}")
-                    # 发送到chatter manager传递StreamContext对象
-                    results = await self.chatter_manager.process_stream_context(stream_id, context)
-                    # 处理结果,标记消息为已读
-                    if results.get("success", False):
-                        self._clear_all_unread_messages(stream_id)
-                        logger.debug(f"聊天流 {stream_id} 处理成功,清除了 {len(unread_messages)} 条未读消息")
-                    else:
-                        logger.warning(f"聊天流 {stream_id} 处理失败: {results.get('error_message', '未知错误')}")
-                except Exception as e:
-                    logger.error(f"处理聊天流 {stream_id} 时发生异常,将清除所有未读消息: {e}")
-                    # 出现异常时也清除未读消息,避免重复处理
-                    self._clear_all_unread_messages(stream_id)
-                    raise
-            logger.debug(f"聊天流 {stream_id} 消息处理完成")
-        except asyncio.CancelledError:
-            raise
-        except Exception as e:
-            logger.error(f"处理聊天流 {stream_id} 消息时出错: {e}")
-            traceback.print_exc()

     def deactivate_stream(self, stream_id: str):
         """停用聊天流"""
         try:
@@ -431,211 +288,6 @@
         else:
             logger.debug(f"聊天流 {stream_id} 未触发打断,打断概率: {interruption_probability:.2f}")

-    def _calculate_stream_distribution_interval(self, context: StreamContext) -> float:
-        """计算单个聊天流的分发周期 - 使用重构后的能量管理器"""
-        if not global_config.chat.dynamic_distribution_enabled:
-            return self.check_interval # 使用固定间隔
-        try:
-            from src.chat.energy_system import energy_manager
-            from src.plugin_system.apis.chat_api import get_chat_manager
-            # 获取聊天流和能量
-            chat_stream = get_chat_manager().get_stream(context.stream_id)
-            if chat_stream:
-                focus_energy = chat_stream.focus_energy
-                # 使用能量管理器获取分发周期
-                interval = energy_manager.get_distribution_interval(focus_energy)
-                logger.debug(f"流 {context.stream_id} 分发周期: {interval:.2f}s (能量: {focus_energy:.3f})")
-                return interval
-            else:
-                # 默认间隔
-                return self.check_interval
-        except Exception as e:
-            logger.error(f"计算分发周期失败: {e}")
-            return self.check_interval
-
-    def _calculate_next_manager_delay(self) -> float:
-        """计算管理器下次检查的延迟时间"""
-        current_time = time.time()
-        min_delay = float("inf")
-        # 找到最近需要检查的流
-        try:
-            chat_manager = get_chat_manager()
-            for _stream_id, chat_stream in chat_manager.streams.items():
-                context = chat_stream.stream_context
-                if not context or not context.is_active:
-                    continue
-                time_until_check = context.next_check_time - current_time
-                if time_until_check > 0:
-                    min_delay = min(min_delay, time_until_check)
-                else:
-                    min_delay = 0.1 # 立即检查
-                    break
-            # 如果没有活跃流,使用默认间隔
-            if min_delay == float("inf"):
-                return self.check_interval
-            # 确保最小延迟
-            return max(0.1, min(min_delay, self.check_interval))
-        except Exception as e:
-            logger.error(f"计算下次检查延迟时发生错误: {e}")
-            return self.check_interval
-
-    async def _check_streams_with_individual_intervals(self):
-        """检查所有达到检查时间的聊天流"""
-        current_time = time.time()
-        processed_streams = 0
-        # 通过 ChatManager 获取活跃的流
-        try:
-            chat_manager = get_chat_manager()
-            for stream_id, chat_stream in chat_manager.streams.items():
-                context = chat_stream.stream_context
-                if not context or not context.is_active:
-                    continue
-                # 检查是否达到检查时间
-                if current_time >= context.next_check_time:
-                    # 更新检查时间
-                    context.last_check_time = current_time
-                    # 计算下次检查时间和分发周期
-                    if global_config.chat.dynamic_distribution_enabled:
-                        context.distribution_interval = self._calculate_stream_distribution_interval(context)
-                    else:
-                        context.distribution_interval = self.check_interval
-                    # 设置下次检查时间
-                    context.next_check_time = current_time + context.distribution_interval
-                    # 检查未读消息
-                    unread_messages = chat_stream.context_manager.get_unread_messages()
-                    if unread_messages:
-                        processed_streams += 1
-                        self.stats.total_unread_messages = len(unread_messages)
-                        # 如果没有处理任务,创建一个
-                        if not context.processing_task or context.processing_task.done():
-                            focus_energy = chat_stream.focus_energy
-                            # 根据优先级记录日志
-                            if focus_energy >= 0.7:
-                                logger.info(
-                                    f"高优先级流 {stream_id} 开始处理 | "
-                                    f"focus_energy: {focus_energy:.3f} | "
-                                    f"分发周期: {context.distribution_interval:.2f}s | "
-                                    f"未读消息: {len(unread_messages)}"
-                                )
-                            else:
-                                logger.debug(
-                                    f"流 {stream_id} 开始处理 | "
-                                    f"focus_energy: {focus_energy:.3f} | "
-                                    f"分发周期: {context.distribution_interval:.2f}s"
-                                )
-                            context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id))
-        except Exception as e:
-            logger.error(f"检查独立分发周期的聊天流时发生错误: {e}")
-        # 更新活跃流计数
-        try:
-            chat_manager = get_chat_manager()
-            active_count = len([s for s in chat_manager.streams.values() if s.stream_context.is_active])
-            self.stats.active_streams = active_count
-            if processed_streams > 0:
-                logger.debug(f"本次循环处理了 {processed_streams} 个流 | 活跃流总数: {active_count}")
-        except Exception as e:
-            logger.error(f"更新活跃流计数时发生错误: {e}")
-
-    async def _check_all_streams_with_priority(self):
-        """按优先级检查所有聊天流高focus_energy的流优先处理"""
-        try:
-            chat_manager = get_chat_manager()
-            if not chat_manager.streams:
-                return
-            # 获取活跃的聊天流并按focus_energy排序
-            active_streams = []
-            for stream_id, chat_stream in chat_manager.streams.items():
-                context = chat_stream.stream_context
-                if not context or not context.is_active:
-                    continue
-                # 获取focus_energy
-                focus_energy = chat_stream.focus_energy
-                # 计算流优先级分数
-                priority_score = self._calculate_stream_priority(context, focus_energy)
-                active_streams.append((priority_score, stream_id, context))
-        except Exception as e:
-            logger.error(f"获取活跃流列表时发生错误: {e}")
-            return
-        # 按优先级降序排序
-        active_streams.sort(reverse=True, key=lambda x: x[0])
-        # 处理排序后的流
-        active_stream_count = 0
-        total_unread = 0
-        for priority_score, stream_id, context in active_streams:
-            active_stream_count += 1
-            # 检查是否有未读消息
-            try:
-                chat_stream = chat_manager.get_stream(stream_id)
-                if not chat_stream:
-                    continue
-                unread_messages = chat_stream.context_manager.get_unread_messages()
-                if unread_messages:
-                    total_unread += len(unread_messages)
-                    # 如果没有处理任务,创建一个
-                    if not hasattr(context, 'processing_task') or not context.processing_task or context.processing_task.done():
-                        context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id))
-                    # 高优先级流的额外日志
-                    if priority_score > 0.7:
-                        logger.info(
-                            f"高优先级流 {stream_id} 开始处理 | "
-                            f"优先级: {priority_score:.3f} | "
-                            f"未读消息: {len(unread_messages)}"
-                        )
-            except Exception as e:
-                logger.error(f"处理流 {stream_id} 的未读消息时发生错误: {e}")
-                continue
-        # 更新统计
-        self.stats.active_streams = active_stream_count
-        self.stats.total_unread_messages = total_unread
-
-    def _calculate_stream_priority(self, context: StreamContext, focus_energy: float) -> float:
-        """计算聊天流的优先级分数 - 简化版本主要使用focus_energy"""
-        # 使用重构后的能量管理器主要依赖focus_energy
-        base_priority = focus_energy
-        # 简单的未读消息加权
-        unread_count = len(context.get_unread_messages())
-        message_bonus = min(unread_count * 0.05, 0.2) # 最多20%加成
-        # 简单的时间加权
-        current_time = time.time()
-        time_since_active = current_time - context.last_check_time
-        time_bonus = max(0, 1.0 - time_since_active / 7200.0) * 0.1 # 2小时内衰减
-        final_priority = base_priority + message_bonus + time_bonus
-        return max(0.0, min(1.0, final_priority))

     def _clear_all_unread_messages(self, stream_id: str):
         """清除指定上下文中的所有未读消息,防止意外情况导致消息一直未读"""
         try:
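
The scheduling and dispatch logic removed above (interval calculation, priority sorting, per-stream processing tasks) collapses into whatever each stream's loop does on one iteration. A rough sketch of such an iteration follows; the helper name and return contract are assumptions, while the calls it makes (get_chat_manager, get_unread_messages, process_stream_context) are taken from the removed code.

# One loop iteration, roughly replacing _check_streams_with_individual_intervals +
# _process_stream_messages for a single stream; handle_one_cycle is a hypothetical helper.
from src.plugin_system.apis.chat_api import get_chat_manager


async def handle_one_cycle(stream_id: str, chatter_manager) -> bool:
    """Process one batch of unread messages; return True if a batch was handled."""
    chat_stream = get_chat_manager().get_stream(stream_id)
    if not chat_stream or not chat_stream.stream_context.is_active:
        return False

    unread_messages = chat_stream.context_manager.get_unread_messages()
    if not unread_messages:
        return False

    # Hand the whole StreamContext to the chatter manager, as the removed
    # _process_stream_messages did; on success the caller is expected to mark
    # the messages read (the old code used _clear_all_unread_messages for this).
    results = await chatter_manager.process_stream_context(stream_id, chat_stream.stream_context)
    return bool(results.get("success", False))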

View File

@@ -401,13 +401,31 @@
         for msg_dict in messages:
             try:
                 # 将字典转换为DatabaseMessages对象
-                db_message = DatabaseMessages(
-                    message_id=msg_dict.get("message_id", ""),
-                    user_info=msg_dict.get("user_info", {}),
-                    processed_plain_text=msg_dict.get("processed_plain_text", ""),
-                    key_words=msg_dict.get("key_words", "[]"),
-                    is_mentioned=msg_dict.get("is_mentioned", False)
-                )
+                # 处理两种可能的数据格式flatten()返回的平铺字段 或 包含user_info字段的字典
+                user_info_dict = msg_dict.get("user_info", {})
+                if isinstance(user_info_dict, dict) and user_info_dict:
+                    # 如果有user_info字段使用它
+                    db_message = DatabaseMessages(
+                        message_id=msg_dict.get("message_id", ""),
+                        user_id=user_info_dict.get("user_id", ""),
+                        user_nickname=user_info_dict.get("user_nickname", ""),
+                        user_platform=user_info_dict.get("platform", ""),
+                        processed_plain_text=msg_dict.get("processed_plain_text", ""),
+                        key_words=msg_dict.get("key_words", "[]"),
+                        is_mentioned=msg_dict.get("is_mentioned", False),
+                        **{"user_info": user_info_dict} # 通过kwargs传入user_info
+                    )
+                else:
+                    # 如果没有user_info字段使用平铺的字段(flatten()方法返回的格式)
+                    db_message = DatabaseMessages(
+                        message_id=msg_dict.get("message_id", ""),
+                        user_id=msg_dict.get("user_id", ""),
+                        user_nickname=msg_dict.get("user_nickname", ""),
+                        user_platform=msg_dict.get("user_platform", ""),
+                        processed_plain_text=msg_dict.get("processed_plain_text", ""),
+                        key_words=msg_dict.get("key_words", "[]"),
+                        is_mentioned=msg_dict.get("is_mentioned", False)
+                    )

                 # 计算消息兴趣度
                 interest_score_obj = await chatter_interest_scoring_system._calculate_single_message_score(