From 4e024656ff5cabf8e410190c7c479454bb005149 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Tue, 28 Oct 2025 19:13:18 +0800 Subject: [PATCH] =?UTF-8?q?feat(chat):=20=E4=BC=98=E5=8C=96=E6=B5=81?= =?UTF-8?q?=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91=E4=B8=8ENormal=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在StreamLoopManager中添加流能量更新机制,在处理消息前更新能量值用于间隔计算 - 为消息打断系统添加allow_reply_interruption配置选项,控制是否允许在回复时打断 - 重构AffinityFlowChatter规划器,为Normal模式添加简化流程,显著降低延迟 - 实现Normal模式与Focus模式间的智能切换机制,基于focus_energy概率退出Normal模式 - 移除冗余的兴趣度批量更新逻辑,优化数据库写入性能 - 更新配置模板版本至7.5.0 BREAKING CHANGE: 配置文件中新增allow_reply_interruption选项,需要更新配置 --- .../message_manager/distribution_manager.py | 67 +++++- src/chat/message_manager/message_manager.py | 9 +- src/config/official_configs.py | 3 + .../built_in/affinity_flow_chatter/planner.py | 204 +++++++++++++----- template/bot_config_template.toml | 3 +- 5 files changed, 230 insertions(+), 56 deletions(-) diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index e238a6059..5bf13081f 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -263,7 +263,14 @@ class StreamLoopManager: if has_messages: if force_dispatch: logger.info("流 %s 未读消息 %d 条,触发强制分发", stream_id, unread_count) - # 3. 激活chatter处理 + + # 3. 在处理前更新能量值(用于下次间隔计算) + try: + await self._update_stream_energy(stream_id, context) + except Exception as e: + logger.debug(f"更新流能量失败 {stream_id}: {e}") + + # 4. 激活chatter处理 success = await self._process_stream_messages(stream_id, context) # 更新统计 @@ -274,10 +281,10 @@ class StreamLoopManager: self.stats["total_failures"] += 1 logger.warning(f"流处理失败: {stream_id}") - # 4. 计算下次检查间隔 + # 5. 计算下次检查间隔 interval = await self._calculate_interval(stream_id, has_messages) - # 5. sleep等待下次检查 + # 6. sleep等待下次检查 logger.info(f"流 {stream_id} 等待 {interval:.2f}s") await asyncio.sleep(interval) @@ -482,6 +489,60 @@ class StreamLoopManager: logger.warning(f"刷新缓存消息失败: stream={stream_id}, error={e}") return [] + async def _update_stream_energy(self, stream_id: str, context: Any) -> None: + """更新流的能量值 + + Args: + stream_id: 流ID + context: 流上下文 (StreamContext) + """ + try: + from src.chat.message_receive.chat_stream import get_chat_manager + + # 获取聊天流 + chat_manager = get_chat_manager() + chat_stream = await chat_manager.get_stream(stream_id) + + if not chat_stream: + logger.debug(f"无法找到聊天流 {stream_id},跳过能量更新") + return + + # 从 context_manager 获取消息(包括未读和历史消息) + # 合并未读消息和历史消息 + all_messages = [] + + # 添加历史消息 + history_messages = context.get_history_messages(limit=global_config.chat.max_context_size) + all_messages.extend(history_messages) + + # 添加未读消息 + unread_messages = context.get_unread_messages() + all_messages.extend(unread_messages) + + # 按时间排序并限制数量 + all_messages.sort(key=lambda m: m.time) + messages = all_messages[-global_config.chat.max_context_size:] + + # 获取用户ID + user_id = None + if context.triggering_user_id: + user_id = context.triggering_user_id + + # 使用能量管理器计算并缓存能量值 + energy = await energy_manager.calculate_focus_energy( + stream_id=stream_id, + messages=messages, + user_id=user_id + ) + + # 同步更新到 ChatStream + chat_stream._focus_energy = energy + + logger.debug(f"已更新流 {stream_id} 的能量值: {energy:.3f}") + + except Exception as e: + logger.warning(f"更新流能量失败 {stream_id}: {e}", exc_info=False) + async def _calculate_interval(self, stream_id: str, has_messages: bool) -> float: """计算下次检查间隔 diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index c38e6d12c..49c169640 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -364,10 +364,13 @@ class MessageManager: if not global_config.chat.interruption_enabled or not chat_stream or not message: return - # 检查是否正在回复 + # 检查是否正在回复,以及是否允许在回复时打断 if chat_stream.context_manager.context.is_replying: - logger.info(f"聊天流 {chat_stream.stream_id} 正在回复中,跳过打断检查") - return + if not global_config.chat.allow_reply_interruption: + logger.debug(f"聊天流 {chat_stream.stream_id} 正在回复中,且配置不允许回复时打断,跳过打断检查") + return + else: + logger.debug(f"聊天流 {chat_stream.stream_id} 正在回复中,但配置允许回复时打断") # 检查是否为表情包消息 if message.is_picid or message.is_emoji: diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 3d3522f1b..dab75b404 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -121,6 +121,9 @@ class ChatConfig(ValidatedConfigBase): ) # 消息打断系统配置 - 线性概率模型 interruption_enabled: bool = Field(default=True, description="是否启用消息打断系统") + allow_reply_interruption: bool = Field( + default=False, description="是否允许在正在生成回复时打断(True=允许打断回复,False=回复期间不允许打断)" + ) interruption_max_limit: int = Field(default=10, ge=0, description="每个聊天流的最大打断次数") interruption_min_probability: float = Field( default=0.1, ge=0.0, le=1.0, description="最低打断概率(即使达到较高打断次数,也保证有此概率的打断机会)" diff --git a/src/plugins/built_in/affinity_flow_chatter/planner.py b/src/plugins/built_in/affinity_flow_chatter/planner.py index 9be3235e9..adb2ba6a1 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner.py @@ -97,14 +97,20 @@ class ChatterActionPlanner: async def _enhanced_plan_flow(self, context: "StreamContext | None") -> tuple[list[dict[str, Any]], Any | None]: """执行增强版规划流程""" try: - # 在规划前,先进行动作修改 - from src.chat.planner_actions.action_modifier import ActionModifier - action_modifier = ActionModifier(self.action_manager, self.chat_id) - await action_modifier.modify_actions() # 1. 生成初始 Plan chat_mode = context.chat_mode if context else ChatMode.NORMAL + + # Normal模式下使用简化流程 + if chat_mode == ChatMode.NORMAL: + return await self._normal_mode_flow(context) + + # 在规划前,先进行动作修改 + from src.chat.planner_actions.action_modifier import ActionModifier + action_modifier = ActionModifier(self.action_manager, self.chat_id) + await action_modifier.modify_actions() + initial_plan = await self.generator.generate(chat_mode) # 确保Plan中包含所有当前可用的动作 @@ -148,19 +154,6 @@ class ChatterActionPlanner: message.interest_value = 0.0 message.should_reply = False message.should_act = False - interest_updates.append( - { - "message_id": message.message_id, - "interest_value": 0.0, - "should_reply": False, - "should_act": False, - } - ) - - if interest_updates: - task = asyncio.create_task(self._commit_interest_updates(interest_updates)) - self._background_tasks.add(task) - task.add_done_callback(self._handle_task_result) # 检查兴趣度是否达到非回复动作阈值 non_reply_action_interest_threshold = global_config.affinity_flow.non_reply_action_interest_threshold @@ -192,8 +185,19 @@ class ChatterActionPlanner: # 6. 根据执行结果更新统计信息 self._update_stats_from_execution_result(execution_result) + + # 7. Focus模式下如果执行了reply动作,切换到Normal模式 + if chat_mode == ChatMode.FOCUS and context: + has_reply = any( + action.action_type in ["reply", "proactive_reply"] + for action in filtered_plan.decided_actions + ) + if has_reply: + logger.info("Focus模式: 执行了reply动作,切换到Normal模式") + context.chat_mode = ChatMode.NORMAL + await self._sync_chat_mode_to_stream(context) - # 7. 返回结果 + # 8. 返回结果 return self._build_return_result(filtered_plan) except Exception as e: @@ -201,32 +205,145 @@ class ChatterActionPlanner: self.planner_stats["failed_plans"] += 1 return [], None - async def _commit_interest_updates(self, updates: list[dict[str, Any]]) -> None: - """统一更新消息兴趣度,减少数据库写入次数""" - if not updates: + async def _normal_mode_flow(self, context: "StreamContext | None") -> tuple[list[dict[str, Any]], Any | None]: + """Normal模式下的简化plan流程 + + 只计算兴趣值并判断是否达到reply阈值,不执行完整的plan流程。 + 根据focus_energy决定退出normal模式回到focus模式的概率。 + """ + try: + unread_messages = context.get_unread_messages() if context else [] + + if not unread_messages: + logger.debug("Normal模式: 没有未读消息") + from src.common.data_models.info_data_model import ActionPlannerInfo + no_action = ActionPlannerInfo( + action_type="no_action", + reasoning="Normal模式: 没有未读消息", + action_data={}, + action_message=None, + ) + return [asdict(no_action)], None + + # 检查是否有消息达到reply阈值 + should_reply = False + target_message = None + + for message in unread_messages: + message_should_reply = getattr(message, "should_reply", False) + if message_should_reply: + should_reply = True + target_message = message + logger.info(f"Normal模式: 消息 {message.message_id} 达到reply阈值") + break + + if should_reply and target_message: + # 达到reply阈值,直接进入回复流程 + from src.common.data_models.info_data_model import ActionPlannerInfo, Plan + from src.plugin_system.base.component_types import ChatType + + # 构建目标消息字典 - 使用 flatten() 方法获取扁平化的字典 + target_message_dict = target_message.flatten() + + reply_action = ActionPlannerInfo( + action_type="reply", + reasoning="Normal模式: 兴趣度达到阈值,直接回复", + action_data={"target_message_id": target_message.message_id}, + action_message=target_message_dict, + ) + + # Normal模式下直接构建最小化的Plan,跳过generator和action_modifier + # 这样可以显著降低延迟 + minimal_plan = Plan( + chat_id=self.chat_id, + chat_type=ChatType.PRIVATE if not context else context.chat_type, + mode=ChatMode.NORMAL, + decided_actions=[reply_action], + ) + + # 执行reply动作 + execution_result = await self.executor.execute(minimal_plan) + self._update_stats_from_execution_result(execution_result) + + logger.info("Normal模式: 执行reply动作完成") + + # 无论是否回复,都进行退出normal模式的判定 + await self._check_exit_normal_mode(context) + + return [asdict(reply_action)], target_message_dict + else: + # 未达到reply阈值 + logger.debug(f"Normal模式: 未达到reply阈值") + from src.common.data_models.info_data_model import ActionPlannerInfo + no_action = ActionPlannerInfo( + action_type="no_action", + reasoning="Normal模式: 兴趣度未达到阈值", + action_data={}, + action_message=None, + ) + + # 无论是否回复,都进行退出normal模式的判定 + await self._check_exit_normal_mode(context) + + return [asdict(no_action)], None + + except Exception as e: + logger.error(f"Normal模式流程出错: {e}") + self.planner_stats["failed_plans"] += 1 + return [], None + + async def _check_exit_normal_mode(self, context: "StreamContext | None") -> None: + """检查并执行退出Normal模式的判定 + + Args: + context: 流上下文 + """ + if not context: return - + try: - from src.chat.message_manager.message_manager import message_manager - - await message_manager.bulk_update_messages(self.chat_id, updates) + from src.chat.message_receive.chat_stream import get_chat_manager + + chat_manager = get_chat_manager() + chat_stream = await chat_manager.get_stream(self.chat_id) if chat_manager else None + + if not chat_stream: + return + + focus_energy = chat_stream.focus_energy + # focus_energy越低,退出normal模式的概率越高 + # 使用反比例函数: 退出概率 = 1 - focus_energy + # 当focus_energy = 0.1时,退出概率 = 90% + # 当focus_energy = 0.5时,退出概率 = 50% + # 当focus_energy = 0.9时,退出概率 = 10% + exit_probability = 1.0 - focus_energy + + import random + if random.random() < exit_probability: + logger.info(f"Normal模式: focus_energy={focus_energy:.3f}, 退出概率={exit_probability:.3f}, 切换回focus模式") + # 切换回focus模式 + context.chat_mode = ChatMode.FOCUS + await self._sync_chat_mode_to_stream(context) + else: + logger.debug(f"Normal模式: focus_energy={focus_energy:.3f}, 退出概率={exit_probability:.3f}, 保持normal模式") + except Exception as e: - logger.warning(f"批量更新上下文消息兴趣度失败: {e}") + logger.warning(f"检查退出Normal模式失败: {e}") + async def _sync_chat_mode_to_stream(self, context: "StreamContext") -> None: + """同步chat_mode到ChatStream""" try: - from src.chat.message_receive.storage import MessageStorage - - interest_map = {item["message_id"]: item["interest_value"] for item in updates if "interest_value" in item} - reply_map = {item["message_id"]: item["should_reply"] for item in updates if "should_reply" in item} - - await MessageStorage.bulk_update_interest_values( - interest_map=interest_map, - reply_map=reply_map if reply_map else None, - ) - - logger.debug(f"已批量更新 {len(interest_map)} 条消息的兴趣度") + from src.chat.message_receive.chat_stream import get_chat_manager + + chat_manager = get_chat_manager() + if chat_manager: + chat_stream = await chat_manager.get_stream(context.stream_id) + if chat_stream: + chat_stream.context_manager.context.chat_mode = context.chat_mode + chat_stream.saved = False # 标记需要保存 + logger.debug(f"已同步chat_mode {context.chat_mode.value} 到ChatStream {context.stream_id}") except Exception as e: - logger.warning(f"批量更新数据库兴趣度失败: {e}") + logger.warning(f"同步chat_mode到ChatStream失败: {e}") def _update_stats_from_execution_result(self, execution_result: dict[str, Any]): """根据执行结果更新规划器统计""" @@ -269,17 +386,6 @@ class ChatterActionPlanner: return final_actions_dict, final_target_message_dict - def _handle_task_result(self, task: asyncio.Task) -> None: - """处理后台任务的结果,记录异常。""" - try: - task.result() - except asyncio.CancelledError: - pass # 任务被取消是正常现象 - except Exception as e: - logger.error(f"后台任务执行失败: {e}", exc_info=True) - finally: - self._background_tasks.discard(task) - def get_planner_stats(self) -> dict[str, Any]: """获取规划器统计""" return self.planner_stats.copy() diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index a41c26743..05f6e9dd2 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "7.4.9" +version = "7.5.0" #----以下是给开发人员阅读的,如果你只是部署了MoFox-Bot,不需要阅读---- #如果你想要修改配置文件,请递增version的值 @@ -126,6 +126,7 @@ thinking_timeout = 40 # MoFox-Bot一次回复最长思考规划时间,超过 # 消息打断系统配置 - 反比例函数概率模型 interruption_enabled = true # 是否启用消息打断系统 +allow_reply_interruption = false # 是否允许在正在生成回复时打断(true=允许打断回复,false=回复期间不允许打断) interruption_max_limit = 5 # 每个聊天流的最大打断次数 interruption_min_probability = 0.05 # 最低打断概率(反比例函数趋近的下限值)