feat(chat): 优化流处理逻辑与Normal模式性能

- 在StreamLoopManager中添加流能量更新机制,在处理消息前更新能量值用于间隔计算
- 为消息打断系统添加allow_reply_interruption配置选项,控制是否允许在回复时打断
- 重构AffinityFlowChatter规划器,为Normal模式添加简化流程,显著降低延迟
- 实现Normal模式与Focus模式间的智能切换机制,基于focus_energy概率退出Normal模式
- 移除冗余的兴趣度批量更新逻辑,优化数据库写入性能
- 更新配置模板版本至7.5.0

BREAKING CHANGE: 配置文件中新增allow_reply_interruption选项,需要更新配置
This commit is contained in:
Windpicker-owo
2025-10-28 19:13:18 +08:00
parent 82f2b68293
commit f6aa923f06
5 changed files with 230 additions and 56 deletions

View File

@@ -97,14 +97,20 @@ class ChatterActionPlanner:
async def _enhanced_plan_flow(self, context: "StreamContext | None") -> tuple[list[dict[str, Any]], Any | None]:
"""执行增强版规划流程"""
try:
# 在规划前,先进行动作修改
from src.chat.planner_actions.action_modifier import ActionModifier
action_modifier = ActionModifier(self.action_manager, self.chat_id)
await action_modifier.modify_actions()
# 1. 生成初始 Plan
chat_mode = context.chat_mode if context else ChatMode.NORMAL
# Normal模式下使用简化流程
if chat_mode == ChatMode.NORMAL:
return await self._normal_mode_flow(context)
# 在规划前,先进行动作修改
from src.chat.planner_actions.action_modifier import ActionModifier
action_modifier = ActionModifier(self.action_manager, self.chat_id)
await action_modifier.modify_actions()
initial_plan = await self.generator.generate(chat_mode)
# 确保Plan中包含所有当前可用的动作
@@ -148,19 +154,6 @@ class ChatterActionPlanner:
message.interest_value = 0.0
message.should_reply = False
message.should_act = False
interest_updates.append(
{
"message_id": message.message_id,
"interest_value": 0.0,
"should_reply": False,
"should_act": False,
}
)
if interest_updates:
task = asyncio.create_task(self._commit_interest_updates(interest_updates))
self._background_tasks.add(task)
task.add_done_callback(self._handle_task_result)
# 检查兴趣度是否达到非回复动作阈值
non_reply_action_interest_threshold = global_config.affinity_flow.non_reply_action_interest_threshold
@@ -192,8 +185,19 @@ class ChatterActionPlanner:
# 6. 根据执行结果更新统计信息
self._update_stats_from_execution_result(execution_result)
# 7. Focus模式下如果执行了reply动作切换到Normal模式
if chat_mode == ChatMode.FOCUS and context:
has_reply = any(
action.action_type in ["reply", "proactive_reply"]
for action in filtered_plan.decided_actions
)
if has_reply:
logger.info("Focus模式: 执行了reply动作切换到Normal模式")
context.chat_mode = ChatMode.NORMAL
await self._sync_chat_mode_to_stream(context)
# 7. 返回结果
# 8. 返回结果
return self._build_return_result(filtered_plan)
except Exception as e:
@@ -201,32 +205,145 @@ class ChatterActionPlanner:
self.planner_stats["failed_plans"] += 1
return [], None
async def _commit_interest_updates(self, updates: list[dict[str, Any]]) -> None:
"""统一更新消息兴趣度,减少数据库写入次数"""
if not updates:
async def _normal_mode_flow(self, context: "StreamContext | None") -> tuple[list[dict[str, Any]], Any | None]:
"""Normal模式下的简化plan流程
只计算兴趣值并判断是否达到reply阈值不执行完整的plan流程。
根据focus_energy决定退出normal模式回到focus模式的概率。
"""
try:
unread_messages = context.get_unread_messages() if context else []
if not unread_messages:
logger.debug("Normal模式: 没有未读消息")
from src.common.data_models.info_data_model import ActionPlannerInfo
no_action = ActionPlannerInfo(
action_type="no_action",
reasoning="Normal模式: 没有未读消息",
action_data={},
action_message=None,
)
return [asdict(no_action)], None
# 检查是否有消息达到reply阈值
should_reply = False
target_message = None
for message in unread_messages:
message_should_reply = getattr(message, "should_reply", False)
if message_should_reply:
should_reply = True
target_message = message
logger.info(f"Normal模式: 消息 {message.message_id} 达到reply阈值")
break
if should_reply and target_message:
# 达到reply阈值直接进入回复流程
from src.common.data_models.info_data_model import ActionPlannerInfo, Plan
from src.plugin_system.base.component_types import ChatType
# 构建目标消息字典 - 使用 flatten() 方法获取扁平化的字典
target_message_dict = target_message.flatten()
reply_action = ActionPlannerInfo(
action_type="reply",
reasoning="Normal模式: 兴趣度达到阈值,直接回复",
action_data={"target_message_id": target_message.message_id},
action_message=target_message_dict,
)
# Normal模式下直接构建最小化的Plan跳过generator和action_modifier
# 这样可以显著降低延迟
minimal_plan = Plan(
chat_id=self.chat_id,
chat_type=ChatType.PRIVATE if not context else context.chat_type,
mode=ChatMode.NORMAL,
decided_actions=[reply_action],
)
# 执行reply动作
execution_result = await self.executor.execute(minimal_plan)
self._update_stats_from_execution_result(execution_result)
logger.info("Normal模式: 执行reply动作完成")
# 无论是否回复都进行退出normal模式的判定
await self._check_exit_normal_mode(context)
return [asdict(reply_action)], target_message_dict
else:
# 未达到reply阈值
logger.debug(f"Normal模式: 未达到reply阈值")
from src.common.data_models.info_data_model import ActionPlannerInfo
no_action = ActionPlannerInfo(
action_type="no_action",
reasoning="Normal模式: 兴趣度未达到阈值",
action_data={},
action_message=None,
)
# 无论是否回复都进行退出normal模式的判定
await self._check_exit_normal_mode(context)
return [asdict(no_action)], None
except Exception as e:
logger.error(f"Normal模式流程出错: {e}")
self.planner_stats["failed_plans"] += 1
return [], None
async def _check_exit_normal_mode(self, context: "StreamContext | None") -> None:
"""检查并执行退出Normal模式的判定
Args:
context: 流上下文
"""
if not context:
return
try:
from src.chat.message_manager.message_manager import message_manager
await message_manager.bulk_update_messages(self.chat_id, updates)
from src.chat.message_receive.chat_stream import get_chat_manager
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(self.chat_id) if chat_manager else None
if not chat_stream:
return
focus_energy = chat_stream.focus_energy
# focus_energy越低退出normal模式的概率越高
# 使用反比例函数: 退出概率 = 1 - focus_energy
# 当focus_energy = 0.1时,退出概率 = 90%
# 当focus_energy = 0.5时,退出概率 = 50%
# 当focus_energy = 0.9时,退出概率 = 10%
exit_probability = 1.0 - focus_energy
import random
if random.random() < exit_probability:
logger.info(f"Normal模式: focus_energy={focus_energy:.3f}, 退出概率={exit_probability:.3f}, 切换回focus模式")
# 切换回focus模式
context.chat_mode = ChatMode.FOCUS
await self._sync_chat_mode_to_stream(context)
else:
logger.debug(f"Normal模式: focus_energy={focus_energy:.3f}, 退出概率={exit_probability:.3f}, 保持normal模式")
except Exception as e:
logger.warning(f"批量更新上下文消息兴趣度失败: {e}")
logger.warning(f"检查退出Normal模式失败: {e}")
async def _sync_chat_mode_to_stream(self, context: "StreamContext") -> None:
"""同步chat_mode到ChatStream"""
try:
from src.chat.message_receive.storage import MessageStorage
interest_map = {item["message_id"]: item["interest_value"] for item in updates if "interest_value" in item}
reply_map = {item["message_id"]: item["should_reply"] for item in updates if "should_reply" in item}
await MessageStorage.bulk_update_interest_values(
interest_map=interest_map,
reply_map=reply_map if reply_map else None,
)
logger.debug(f"已批量更新 {len(interest_map)} 条消息的兴趣度")
from src.chat.message_receive.chat_stream import get_chat_manager
chat_manager = get_chat_manager()
if chat_manager:
chat_stream = await chat_manager.get_stream(context.stream_id)
if chat_stream:
chat_stream.context_manager.context.chat_mode = context.chat_mode
chat_stream.saved = False # 标记需要保存
logger.debug(f"已同步chat_mode {context.chat_mode.value} 到ChatStream {context.stream_id}")
except Exception as e:
logger.warning(f"批量更新数据库兴趣度失败: {e}")
logger.warning(f"同步chat_mode到ChatStream失败: {e}")
def _update_stats_from_execution_result(self, execution_result: dict[str, Any]):
"""根据执行结果更新规划器统计"""
@@ -269,17 +386,6 @@ class ChatterActionPlanner:
return final_actions_dict, final_target_message_dict
def _handle_task_result(self, task: asyncio.Task) -> None:
"""处理后台任务的结果,记录异常。"""
try:
task.result()
except asyncio.CancelledError:
pass # 任务被取消是正常现象
except Exception as e:
logger.error(f"后台任务执行失败: {e}", exc_info=True)
finally:
self._background_tasks.discard(task)
def get_planner_stats(self) -> dict[str, Any]:
"""获取规划器统计"""
return self.planner_stats.copy()