feat(chat): 优化流处理逻辑与Normal模式性能

- 在StreamLoopManager中添加流能量更新机制,在处理消息前更新能量值用于间隔计算
- 为消息打断系统添加allow_reply_interruption配置选项,控制是否允许在回复时打断
- 重构AffinityFlowChatter规划器,为Normal模式添加简化流程,显著降低延迟
- 实现Normal模式与Focus模式间的智能切换机制,基于focus_energy概率退出Normal模式
- 移除冗余的兴趣度批量更新逻辑,优化数据库写入性能
- 更新配置模板版本至7.5.0

BREAKING CHANGE: 配置文件中新增allow_reply_interruption选项,需要更新配置
This commit is contained in:
Windpicker-owo
2025-10-28 19:13:18 +08:00
parent 033e1fecb4
commit 4e024656ff
5 changed files with 230 additions and 56 deletions

View File

@@ -263,7 +263,14 @@ class StreamLoopManager:
if has_messages: if has_messages:
if force_dispatch: if force_dispatch:
logger.info("%s 未读消息 %d 条,触发强制分发", stream_id, unread_count) logger.info("%s 未读消息 %d 条,触发强制分发", stream_id, unread_count)
# 3. 激活chatter处理
# 3. 在处理前更新能量值(用于下次间隔计算)
try:
await self._update_stream_energy(stream_id, context)
except Exception as e:
logger.debug(f"更新流能量失败 {stream_id}: {e}")
# 4. 激活chatter处理
success = await self._process_stream_messages(stream_id, context) success = await self._process_stream_messages(stream_id, context)
# 更新统计 # 更新统计
@@ -274,10 +281,10 @@ class StreamLoopManager:
self.stats["total_failures"] += 1 self.stats["total_failures"] += 1
logger.warning(f"流处理失败: {stream_id}") logger.warning(f"流处理失败: {stream_id}")
# 4. 计算下次检查间隔 # 5. 计算下次检查间隔
interval = await self._calculate_interval(stream_id, has_messages) interval = await self._calculate_interval(stream_id, has_messages)
# 5. sleep等待下次检查 # 6. sleep等待下次检查
logger.info(f"{stream_id} 等待 {interval:.2f}s") logger.info(f"{stream_id} 等待 {interval:.2f}s")
await asyncio.sleep(interval) await asyncio.sleep(interval)
@@ -482,6 +489,60 @@ class StreamLoopManager:
logger.warning(f"刷新缓存消息失败: stream={stream_id}, error={e}") logger.warning(f"刷新缓存消息失败: stream={stream_id}, error={e}")
return [] return []
async def _update_stream_energy(self, stream_id: str, context: Any) -> None:
"""更新流的能量值
Args:
stream_id: 流ID
context: 流上下文 (StreamContext)
"""
try:
from src.chat.message_receive.chat_stream import get_chat_manager
# 获取聊天流
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
if not chat_stream:
logger.debug(f"无法找到聊天流 {stream_id},跳过能量更新")
return
# 从 context_manager 获取消息(包括未读和历史消息)
# 合并未读消息和历史消息
all_messages = []
# 添加历史消息
history_messages = context.get_history_messages(limit=global_config.chat.max_context_size)
all_messages.extend(history_messages)
# 添加未读消息
unread_messages = context.get_unread_messages()
all_messages.extend(unread_messages)
# 按时间排序并限制数量
all_messages.sort(key=lambda m: m.time)
messages = all_messages[-global_config.chat.max_context_size:]
# 获取用户ID
user_id = None
if context.triggering_user_id:
user_id = context.triggering_user_id
# 使用能量管理器计算并缓存能量值
energy = await energy_manager.calculate_focus_energy(
stream_id=stream_id,
messages=messages,
user_id=user_id
)
# 同步更新到 ChatStream
chat_stream._focus_energy = energy
logger.debug(f"已更新流 {stream_id} 的能量值: {energy:.3f}")
except Exception as e:
logger.warning(f"更新流能量失败 {stream_id}: {e}", exc_info=False)
async def _calculate_interval(self, stream_id: str, has_messages: bool) -> float: async def _calculate_interval(self, stream_id: str, has_messages: bool) -> float:
"""计算下次检查间隔 """计算下次检查间隔

View File

@@ -364,10 +364,13 @@ class MessageManager:
if not global_config.chat.interruption_enabled or not chat_stream or not message: if not global_config.chat.interruption_enabled or not chat_stream or not message:
return return
# 检查是否正在回复 # 检查是否正在回复,以及是否允许在回复时打断
if chat_stream.context_manager.context.is_replying: if chat_stream.context_manager.context.is_replying:
logger.info(f"聊天流 {chat_stream.stream_id} 正在回复中,跳过打断检查") if not global_config.chat.allow_reply_interruption:
return logger.debug(f"聊天流 {chat_stream.stream_id} 正在回复中,且配置不允许回复时打断,跳过打断检查")
return
else:
logger.debug(f"聊天流 {chat_stream.stream_id} 正在回复中,但配置允许回复时打断")
# 检查是否为表情包消息 # 检查是否为表情包消息
if message.is_picid or message.is_emoji: if message.is_picid or message.is_emoji:

View File

@@ -121,6 +121,9 @@ class ChatConfig(ValidatedConfigBase):
) )
# 消息打断系统配置 - 线性概率模型 # 消息打断系统配置 - 线性概率模型
interruption_enabled: bool = Field(default=True, description="是否启用消息打断系统") interruption_enabled: bool = Field(default=True, description="是否启用消息打断系统")
allow_reply_interruption: bool = Field(
default=False, description="是否允许在正在生成回复时打断True=允许打断回复False=回复期间不允许打断)"
)
interruption_max_limit: int = Field(default=10, ge=0, description="每个聊天流的最大打断次数") interruption_max_limit: int = Field(default=10, ge=0, description="每个聊天流的最大打断次数")
interruption_min_probability: float = Field( interruption_min_probability: float = Field(
default=0.1, ge=0.0, le=1.0, description="最低打断概率(即使达到较高打断次数,也保证有此概率的打断机会)" default=0.1, ge=0.0, le=1.0, description="最低打断概率(即使达到较高打断次数,也保证有此概率的打断机会)"

View File

@@ -97,14 +97,20 @@ class ChatterActionPlanner:
async def _enhanced_plan_flow(self, context: "StreamContext | None") -> tuple[list[dict[str, Any]], Any | None]: async def _enhanced_plan_flow(self, context: "StreamContext | None") -> tuple[list[dict[str, Any]], Any | None]:
"""执行增强版规划流程""" """执行增强版规划流程"""
try: try:
# 在规划前,先进行动作修改
from src.chat.planner_actions.action_modifier import ActionModifier
action_modifier = ActionModifier(self.action_manager, self.chat_id)
await action_modifier.modify_actions()
# 1. 生成初始 Plan # 1. 生成初始 Plan
chat_mode = context.chat_mode if context else ChatMode.NORMAL chat_mode = context.chat_mode if context else ChatMode.NORMAL
# Normal模式下使用简化流程
if chat_mode == ChatMode.NORMAL:
return await self._normal_mode_flow(context)
# 在规划前,先进行动作修改
from src.chat.planner_actions.action_modifier import ActionModifier
action_modifier = ActionModifier(self.action_manager, self.chat_id)
await action_modifier.modify_actions()
initial_plan = await self.generator.generate(chat_mode) initial_plan = await self.generator.generate(chat_mode)
# 确保Plan中包含所有当前可用的动作 # 确保Plan中包含所有当前可用的动作
@@ -148,19 +154,6 @@ class ChatterActionPlanner:
message.interest_value = 0.0 message.interest_value = 0.0
message.should_reply = False message.should_reply = False
message.should_act = False message.should_act = False
interest_updates.append(
{
"message_id": message.message_id,
"interest_value": 0.0,
"should_reply": False,
"should_act": False,
}
)
if interest_updates:
task = asyncio.create_task(self._commit_interest_updates(interest_updates))
self._background_tasks.add(task)
task.add_done_callback(self._handle_task_result)
# 检查兴趣度是否达到非回复动作阈值 # 检查兴趣度是否达到非回复动作阈值
non_reply_action_interest_threshold = global_config.affinity_flow.non_reply_action_interest_threshold non_reply_action_interest_threshold = global_config.affinity_flow.non_reply_action_interest_threshold
@@ -192,8 +185,19 @@ class ChatterActionPlanner:
# 6. 根据执行结果更新统计信息 # 6. 根据执行结果更新统计信息
self._update_stats_from_execution_result(execution_result) self._update_stats_from_execution_result(execution_result)
# 7. Focus模式下如果执行了reply动作切换到Normal模式
if chat_mode == ChatMode.FOCUS and context:
has_reply = any(
action.action_type in ["reply", "proactive_reply"]
for action in filtered_plan.decided_actions
)
if has_reply:
logger.info("Focus模式: 执行了reply动作切换到Normal模式")
context.chat_mode = ChatMode.NORMAL
await self._sync_chat_mode_to_stream(context)
# 7. 返回结果 # 8. 返回结果
return self._build_return_result(filtered_plan) return self._build_return_result(filtered_plan)
except Exception as e: except Exception as e:
@@ -201,32 +205,145 @@ class ChatterActionPlanner:
self.planner_stats["failed_plans"] += 1 self.planner_stats["failed_plans"] += 1
return [], None return [], None
async def _commit_interest_updates(self, updates: list[dict[str, Any]]) -> None: async def _normal_mode_flow(self, context: "StreamContext | None") -> tuple[list[dict[str, Any]], Any | None]:
"""统一更新消息兴趣度,减少数据库写入次数""" """Normal模式下的简化plan流程
if not updates:
只计算兴趣值并判断是否达到reply阈值不执行完整的plan流程。
根据focus_energy决定退出normal模式回到focus模式的概率。
"""
try:
unread_messages = context.get_unread_messages() if context else []
if not unread_messages:
logger.debug("Normal模式: 没有未读消息")
from src.common.data_models.info_data_model import ActionPlannerInfo
no_action = ActionPlannerInfo(
action_type="no_action",
reasoning="Normal模式: 没有未读消息",
action_data={},
action_message=None,
)
return [asdict(no_action)], None
# 检查是否有消息达到reply阈值
should_reply = False
target_message = None
for message in unread_messages:
message_should_reply = getattr(message, "should_reply", False)
if message_should_reply:
should_reply = True
target_message = message
logger.info(f"Normal模式: 消息 {message.message_id} 达到reply阈值")
break
if should_reply and target_message:
# 达到reply阈值直接进入回复流程
from src.common.data_models.info_data_model import ActionPlannerInfo, Plan
from src.plugin_system.base.component_types import ChatType
# 构建目标消息字典 - 使用 flatten() 方法获取扁平化的字典
target_message_dict = target_message.flatten()
reply_action = ActionPlannerInfo(
action_type="reply",
reasoning="Normal模式: 兴趣度达到阈值,直接回复",
action_data={"target_message_id": target_message.message_id},
action_message=target_message_dict,
)
# Normal模式下直接构建最小化的Plan跳过generator和action_modifier
# 这样可以显著降低延迟
minimal_plan = Plan(
chat_id=self.chat_id,
chat_type=ChatType.PRIVATE if not context else context.chat_type,
mode=ChatMode.NORMAL,
decided_actions=[reply_action],
)
# 执行reply动作
execution_result = await self.executor.execute(minimal_plan)
self._update_stats_from_execution_result(execution_result)
logger.info("Normal模式: 执行reply动作完成")
# 无论是否回复都进行退出normal模式的判定
await self._check_exit_normal_mode(context)
return [asdict(reply_action)], target_message_dict
else:
# 未达到reply阈值
logger.debug(f"Normal模式: 未达到reply阈值")
from src.common.data_models.info_data_model import ActionPlannerInfo
no_action = ActionPlannerInfo(
action_type="no_action",
reasoning="Normal模式: 兴趣度未达到阈值",
action_data={},
action_message=None,
)
# 无论是否回复都进行退出normal模式的判定
await self._check_exit_normal_mode(context)
return [asdict(no_action)], None
except Exception as e:
logger.error(f"Normal模式流程出错: {e}")
self.planner_stats["failed_plans"] += 1
return [], None
async def _check_exit_normal_mode(self, context: "StreamContext | None") -> None:
"""检查并执行退出Normal模式的判定
Args:
context: 流上下文
"""
if not context:
return return
try: try:
from src.chat.message_manager.message_manager import message_manager from src.chat.message_receive.chat_stream import get_chat_manager
await message_manager.bulk_update_messages(self.chat_id, updates) chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(self.chat_id) if chat_manager else None
if not chat_stream:
return
focus_energy = chat_stream.focus_energy
# focus_energy越低退出normal模式的概率越高
# 使用反比例函数: 退出概率 = 1 - focus_energy
# 当focus_energy = 0.1时,退出概率 = 90%
# 当focus_energy = 0.5时,退出概率 = 50%
# 当focus_energy = 0.9时,退出概率 = 10%
exit_probability = 1.0 - focus_energy
import random
if random.random() < exit_probability:
logger.info(f"Normal模式: focus_energy={focus_energy:.3f}, 退出概率={exit_probability:.3f}, 切换回focus模式")
# 切换回focus模式
context.chat_mode = ChatMode.FOCUS
await self._sync_chat_mode_to_stream(context)
else:
logger.debug(f"Normal模式: focus_energy={focus_energy:.3f}, 退出概率={exit_probability:.3f}, 保持normal模式")
except Exception as e: except Exception as e:
logger.warning(f"批量更新上下文消息兴趣度失败: {e}") logger.warning(f"检查退出Normal模式失败: {e}")
async def _sync_chat_mode_to_stream(self, context: "StreamContext") -> None:
"""同步chat_mode到ChatStream"""
try: try:
from src.chat.message_receive.storage import MessageStorage from src.chat.message_receive.chat_stream import get_chat_manager
interest_map = {item["message_id"]: item["interest_value"] for item in updates if "interest_value" in item} chat_manager = get_chat_manager()
reply_map = {item["message_id"]: item["should_reply"] for item in updates if "should_reply" in item} if chat_manager:
chat_stream = await chat_manager.get_stream(context.stream_id)
await MessageStorage.bulk_update_interest_values( if chat_stream:
interest_map=interest_map, chat_stream.context_manager.context.chat_mode = context.chat_mode
reply_map=reply_map if reply_map else None, chat_stream.saved = False # 标记需要保存
) logger.debug(f"已同步chat_mode {context.chat_mode.value} 到ChatStream {context.stream_id}")
logger.debug(f"已批量更新 {len(interest_map)} 条消息的兴趣度")
except Exception as e: except Exception as e:
logger.warning(f"批量更新数据库兴趣度失败: {e}") logger.warning(f"同步chat_mode到ChatStream失败: {e}")
def _update_stats_from_execution_result(self, execution_result: dict[str, Any]): def _update_stats_from_execution_result(self, execution_result: dict[str, Any]):
"""根据执行结果更新规划器统计""" """根据执行结果更新规划器统计"""
@@ -269,17 +386,6 @@ class ChatterActionPlanner:
return final_actions_dict, final_target_message_dict return final_actions_dict, final_target_message_dict
def _handle_task_result(self, task: asyncio.Task) -> None:
"""处理后台任务的结果,记录异常。"""
try:
task.result()
except asyncio.CancelledError:
pass # 任务被取消是正常现象
except Exception as e:
logger.error(f"后台任务执行失败: {e}", exc_info=True)
finally:
self._background_tasks.discard(task)
def get_planner_stats(self) -> dict[str, Any]: def get_planner_stats(self) -> dict[str, Any]:
"""获取规划器统计""" """获取规划器统计"""
return self.planner_stats.copy() return self.planner_stats.copy()

View File

@@ -1,5 +1,5 @@
[inner] [inner]
version = "7.4.9" version = "7.5.0"
#----以下是给开发人员阅读的如果你只是部署了MoFox-Bot不需要阅读---- #----以下是给开发人员阅读的如果你只是部署了MoFox-Bot不需要阅读----
#如果你想要修改配置文件请递增version的值 #如果你想要修改配置文件请递增version的值
@@ -126,6 +126,7 @@ thinking_timeout = 40 # MoFox-Bot一次回复最长思考规划时间超过
# 消息打断系统配置 - 反比例函数概率模型 # 消息打断系统配置 - 反比例函数概率模型
interruption_enabled = true # 是否启用消息打断系统 interruption_enabled = true # 是否启用消息打断系统
allow_reply_interruption = false # 是否允许在正在生成回复时打断true=允许打断回复false=回复期间不允许打断)
interruption_max_limit = 5 # 每个聊天流的最大打断次数 interruption_max_limit = 5 # 每个聊天流的最大打断次数
interruption_min_probability = 0.05 # 最低打断概率(反比例函数趋近的下限值) interruption_min_probability = 0.05 # 最低打断概率(反比例函数趋近的下限值)