refactor(chat): 重构planner为大脑/小脑并行架构以提升性能和可扩展性(别管能不能用先插进来再说)

将原有的单体`plan`方法重构为一个多智能体架构,包含一个"大脑"和多个并行的"小脑"。

大脑 (`plan`方法) 专注于决定是否进行聊天回复 (`reply`),并负责调度和整合所有决策。

小脑 (`sub_plan`方法) 并行处理具体的、独立的action判断。每个小脑接收一部分action,使用轻量级模型进行快速评估,从而实现并行化处理,减少了单一LLM调用的延迟。

这种新架构的主要优势包括:
- **性能提升**:通过并行化action判断,显著减少了规划器的总响应时间。
- **可扩展性**:添加新的action变得更加容易,因为它们可以被分配到不同的小脑中,而不会增加主规划流程的复杂性。
- **鲁棒性**:将复杂的规划任务分解为更小的、独立的单元,降低了单个点失败导致整个规划失败的风险。
- **成本效益**:允许为小脑配置更轻量、更快速的模型,优化了资源使用。
This commit is contained in:
minecraft1024a
2025-09-06 17:13:58 +08:00
parent 268dbcca5e
commit a0ddd525b3

View File

@@ -1,7 +1,11 @@
import orjson
import time
import traceback
from typing import Dict, Any, Optional, Tuple, List
import asyncio
import math
import random
import json
from typing import Dict, Any, Optional, Tuple, List, TYPE_CHECKING
from rich.traceback import install
from datetime import datetime
from json_repair import repair_json
@@ -19,12 +23,15 @@ from src.chat.utils.chat_message_builder import (
from src.chat.utils.utils import get_chat_type_and_target_info
from src.chat.planner_actions.action_manager import ActionManager
from src.chat.message_receive.chat_stream import get_chat_manager
from src.plugin_system.base.component_types import ActionInfo, ChatMode, ComponentType
from src.plugin_system.base.component_types import ActionInfo, ChatMode, ComponentType, ActionActivationType
from src.plugin_system.core.component_registry import component_registry
from src.schedule.schedule_manager import schedule_manager
from src.mood.mood_manager import mood_manager
from src.chat.memory_system.Hippocampus import hippocampus_manager
if TYPE_CHECKING:
pass
logger = get_logger("planner")
install(extra_lines=3)
@@ -110,6 +117,37 @@ def init_prompt():
"action_prompt",
)
Prompt(
"""
{name_block}
{chat_context_description}{time_block}现在请你根据以下聊天内容选择一个或多个合适的action。如果没有合适的action请选择no_action。,
{chat_content_block}
**要求**
1.action必须符合使用条件如果符合条件就选择
2.如果聊天内容不适合使用action即使符合条件也不要使用
3.{moderation_prompt}
4.请注意如果相同的内容已经被执行,请不要重复执行
这是你最近执行过的动作:
{actions_before_now_block}
**可用的action**
no_action不选择任何动作
{{
"action": "no_action",
"reason":"不动作的原因"
}}
{action_options_text}
请选择并说明触发action的消息id和选择该action的原因。消息id格式:m+数字
请根据动作示例,以严格的 JSON 格式输出,且仅包含 JSON 内容:
""",
"sub_planner_prompt",
)
class ActionPlanner:
def __init__(self, chat_id: str, action_manager: ActionManager):
@@ -117,14 +155,17 @@ class ActionPlanner:
self.log_prefix = f"[{get_chat_manager().get_stream_name(chat_id) or chat_id}]"
self.action_manager = action_manager
# LLM规划器配置
# --- 大脑 ---
self.planner_llm = LLMRequest(
model_set=model_config.model_task_config.planner, request_type="planner"
) # 用于动作规划
)
# --- 小脑 (新增) ---
# TODO: 可以在 model_config.toml 中为 planner_small 单独配置一个轻量级模型
self.planner_small_llm = LLMRequest(
model_set=model_config.model_task_config.planner, request_type="planner_small"
)
self.last_obs_time_mark = 0.0
# 添加重试计数器
self.plan_retry_count = 0
self.max_plan_retries = 3
async def _get_long_term_memory_context(self) -> str:
"""
@@ -237,6 +278,168 @@ class ActionPlanner:
# 假设消息列表是按时间顺序排列的,最后一个是最新的
return message_id_list[-1].get("message")
def _parse_single_action(
self,
action_json: dict,
message_id_list: list, # 使用 planner.py 的 list of dict
current_available_actions: list, # 使用 planner.py 的 list of tuple
) -> List[Dict[str, Any]]:
"""
[注释] 解析单个小脑LLM返回的action JSON并将其转换为标准化的字典。
"""
parsed_actions = []
try:
action = action_json.get("action", "no_action")
reasoning = action_json.get("reason", "未提供原因")
action_data = {k: v for k, v in action_json.items() if k not in ["action", "reason"]}
target_message = None
if action != "no_action":
if target_message_id := action_json.get("target_message_id"):
target_message = self.find_message_by_id(target_message_id, message_id_list)
if target_message is None:
logger.warning(f"{self.log_prefix}无法找到target_message_id '{target_message_id}'")
target_message = self.get_latest_message(message_id_list)
else:
logger.warning(f"{self.log_prefix}动作'{action}'缺少target_message_id")
available_action_names = [name for name, _ in current_available_actions]
if action not in ["no_action", "reply"] and action not in available_action_names:
logger.warning(
f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{action}' (可用: {available_action_names}),将强制使用 'no_action'"
)
reasoning = f"LLM 返回了当前不可用的动作 '{action}' (可用: {available_action_names})。原始理由: {reasoning}"
action = "no_action"
# 将列表转换为字典格式以供将来使用
available_actions_dict = dict(current_available_actions)
parsed_actions.append(
{
"action_type": action,
"reasoning": reasoning,
"action_data": action_data,
"action_message": target_message,
"available_actions": available_actions_dict,
}
)
except Exception as e:
logger.error(f"{self.log_prefix}解析单个action时出错: {e}")
parsed_actions.append(
{
"action_type": "no_action",
"reasoning": f"解析action时出错: {e}",
"action_data": {},
"action_message": None,
"available_actions": dict(current_available_actions),
}
)
return parsed_actions
def _filter_no_actions(self, action_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
[注释] 从一个action字典列表中过滤掉所有的 'no_action'
如果过滤后列表为空, 则返回一个空的列表, 或者根据需要返回一个默认的no_action字典。
"""
non_no_actions = [a for a in action_list if a.get("action_type") not in ["no_action", "no_reply"]]
if non_no_actions:
return non_no_actions
# 如果都是 no_action则返回一个包含第一个 no_action 的列表,以保留 reason
return action_list[:1] if action_list else []
async def sub_plan(
self,
action_list: list, # 使用 planner.py 的 list of tuple
chat_content_block: str,
message_id_list: list, # 使用 planner.py 的 list of dict
is_group_chat: bool = False,
chat_target_info: Optional[dict] = None,
) -> List[Dict[str, Any]]:
"""
[注释] "小脑"规划器。接收一小组actions使用轻量级LLM判断其中哪些应该被触发。
这是一个独立的、并行的思考单元。返回一个包含action字典的列表。
"""
try:
actions_before_now = get_actions_by_timestamp_with_chat(
chat_id=self.chat_id,
timestamp_start=time.time() - 1200,
timestamp_end=time.time(),
limit=20,
)
action_names_in_list = [name for name, _ in action_list]
filtered_actions = [
record for record in actions_before_now if record.get("action_name") in action_names_in_list
]
actions_before_now_block = build_readable_actions(actions=filtered_actions)
chat_context_description = "你现在正在一个群聊中"
if not is_group_chat and chat_target_info:
chat_target_name = chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or "对方"
chat_context_description = f"你正在和 {chat_target_name} 私聊"
action_options_block = ""
for using_actions_name, using_actions_info in action_list:
param_text = ""
if using_actions_info.action_parameters:
param_text = "\n" + "\n".join(
f' "{p_name}":"{p_desc}"'
for p_name, p_desc in using_actions_info.action_parameters.items()
)
require_text = "\n".join(f"- {req}" for req in using_actions_info.action_require)
using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt")
action_options_block += using_action_prompt.format(
action_name=using_actions_name,
action_description=using_actions_info.description,
action_parameters=param_text,
action_require=require_text,
)
moderation_prompt_block = "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。"
time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
bot_name = global_config.bot.nickname
bot_nickname = f",也有人叫你{','.join(global_config.bot.alias_names)}" if global_config.bot.alias_names else ""
name_block = f"你的名字是{bot_name}{bot_nickname},请注意哪些是你自己的发言。"
planner_prompt_template = await global_prompt_manager.get_prompt_async("sub_planner_prompt")
prompt = planner_prompt_template.format(
time_block=time_block,
chat_context_description=chat_context_description,
chat_content_block=chat_content_block,
actions_before_now_block=actions_before_now_block,
action_options_text=action_options_block,
moderation_prompt=moderation_prompt_block,
name_block=name_block,
)
except Exception as e:
logger.error(f"构建小脑提示词时出错: {e}\n{traceback.format_exc()}")
return [{"action_type": "no_action", "reasoning": f"构建小脑Prompt时出错: {e}"}]
action_dicts: List[Dict[str, Any]] = []
try:
llm_content, (reasoning_content, _, _) = await self.planner_small_llm.generate_response_async(prompt=prompt)
if global_config.debug.show_prompt:
logger.info(f"{self.log_prefix}小脑原始提示词: {prompt}")
logger.info(f"{self.log_prefix}小脑原始响应: {llm_content}")
else:
logger.debug(f"{self.log_prefix}小脑原始响应: {llm_content}")
if llm_content:
parsed_json = orjson.loads(repair_json(llm_content))
if isinstance(parsed_json, list):
for item in parsed_json:
if isinstance(item, dict):
action_dicts.extend(self._parse_single_action(item, message_id_list, action_list))
elif isinstance(parsed_json, dict):
action_dicts.extend(self._parse_single_action(parsed_json, message_id_list, action_list))
except Exception as e:
logger.warning(f"{self.log_prefix}解析小脑响应JSON失败: {e}. LLM原始输出: '{llm_content}'")
action_dicts.append({"action_type": "no_action", "reasoning": f"解析小脑响应失败: {e}"})
if not action_dicts:
action_dicts.append({"action_type": "no_action", "reasoning": "小脑未返回有效action"})
return action_dicts
async def plan(
self,
mode: ChatMode = ChatMode.FOCUS,
@@ -244,153 +447,180 @@ class ActionPlanner:
available_actions: Optional[Dict[str, ActionInfo]] = None,
) -> Tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
"""
规划器 (Planner): 使用LLM根据上下文决定做出什么动作
[注释] "大脑"规划器
1. 启动多个并行的"小脑"(sub_plan)来决定是否执行具体的actions。
2. 自己(大脑)则专注于决定是否进行聊天回复(reply)。
3. 整合大脑和小脑的决策,返回最终要执行的动作列表。
"""
# --- 1. 准备上下文信息 ---
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
chat_id=self.chat_id,
timestamp=time.time(),
limit=int(global_config.chat.max_context_size * 0.6),
)
# 大脑使用较长的上下文
chat_content_block, message_id_list = build_readable_messages_with_id(
messages=message_list_before_now,
timestamp_mode="normal",
read_mark=self.last_obs_time_mark,
truncate=True,
show_actions=True,
)
# 小脑使用较短、较新的上下文
message_list_before_now_short = message_list_before_now[-int(global_config.chat.max_context_size * 0.3) :]
chat_content_block_short, message_id_list_short = build_readable_messages_with_id(
messages=message_list_before_now_short,
timestamp_mode="normal",
truncate=False,
show_actions=False,
)
self.last_obs_time_mark = time.time()
action = "no_reply" # 默认动作
reasoning = "规划器初始化默认"
action_data = {}
current_available_actions: Dict[str, ActionInfo] = {}
target_message: Optional[Dict[str, Any]] = None # 初始化target_message变量
prompt: str = ""
message_id_list: list = []
is_group_chat, chat_target_info, current_available_actions = self.get_necessary_info()
if available_actions is None:
available_actions = current_available_actions
# --- 2. 启动小脑并行思考 ---
all_sub_planner_results: List[Dict[str, Any]] = []
try:
is_group_chat, chat_target_info, current_available_actions = self.get_necessary_info()
sub_planner_actions: Dict[str, ActionInfo] = {}
for action_name, action_info in available_actions.items():
if action_info.activation_type in [ActionActivationType.LLM_JUDGE, ActionActivationType.ALWAYS]:
sub_planner_actions[action_name] = action_info
elif action_info.activation_type == ActionActivationType.RANDOM:
if random.random() < action_info.random_activation_probability:
sub_planner_actions[action_name] = action_info
elif action_info.activation_type == ActionActivationType.KEYWORD:
if any(keyword in chat_content_block_short for keyword in action_info.activation_keywords):
sub_planner_actions[action_name] = action_info
# --- 构建提示词 (调用修改后的 PromptBuilder 方法) ---
prompt, message_id_list = await self.build_planner_prompt(
is_group_chat=is_group_chat, # <-- Pass HFC state
chat_target_info=chat_target_info, # <-- 传递获取到的聊天目标信息
current_available_actions=current_available_actions, # <-- Pass determined actions
if sub_planner_actions:
sub_planner_actions_num = len(sub_planner_actions)
# TODO: 您可以在 config.toml 的 [chat] 部分添加 planner_size = 5.0 来自定义此值
planner_size_config = getattr(global_config.chat, "planner_size", 5.0)
sub_planner_size = int(planner_size_config) + (
1 if random.random() < planner_size_config - int(planner_size_config) else 0
)
sub_planner_num = math.ceil(sub_planner_actions_num / sub_planner_size)
logger.info(f"{self.log_prefix}使用{sub_planner_num}个小脑进行思考 (尺寸: {sub_planner_size})")
action_items = list(sub_planner_actions.items())
random.shuffle(action_items)
sub_planner_lists = [action_items[i::sub_planner_num] for i in range(sub_planner_num)]
sub_plan_tasks = [
self.sub_plan(
action_list=action_group,
chat_content_block=chat_content_block_short,
message_id_list=message_id_list_short,
is_group_chat=is_group_chat,
chat_target_info=chat_target_info,
)
for action_group in sub_planner_lists
]
sub_plan_results = await asyncio.gather(*sub_plan_tasks)
for sub_result in sub_plan_results:
all_sub_planner_results.extend(sub_result)
sub_actions_str = ", ".join(
a["action_type"] for a in all_sub_planner_results if a["action_type"] != "no_action"
) or "no_action"
logger.info(f"{self.log_prefix}小脑决策: [{sub_actions_str}]")
except Exception as e:
logger.error(f"{self.log_prefix}小脑调度过程中出错: {e}\n{traceback.format_exc()}")
# --- 3. 大脑独立思考是否回复 ---
action, reasoning, action_data, target_message = "no_reply", "大脑初始化默认", {}, None
try:
prompt, _ = await self.build_planner_prompt(
is_group_chat=is_group_chat,
chat_target_info=chat_target_info,
current_available_actions={}, # 大脑不考虑具体action
mode=mode,
chat_content_block_override=chat_content_block,
message_id_list_override=message_id_list,
)
# --- 调用 LLM (普通文本生成) ---
llm_content = None
try:
llm_content, (reasoning_content, _, _) = await self.planner_llm.generate_response_async(prompt=prompt)
if global_config.debug.show_prompt:
logger.info(f"{self.log_prefix}规划器原始提示词: {prompt}")
logger.info(f"{self.log_prefix}规划器原始响应: {llm_content}")
if reasoning_content:
logger.info(f"{self.log_prefix}规划器推理: {reasoning_content}")
else:
logger.debug(f"{self.log_prefix}规划器原始提示词: {prompt}")
logger.debug(f"{self.log_prefix}规划器原始响应: {llm_content}")
if reasoning_content:
logger.debug(f"{self.log_prefix}规划器推理: {reasoning_content}")
except Exception as req_e:
logger.error(f"{self.log_prefix}LLM 请求执行失败: {req_e}")
reasoning = f"LLM 请求失败,模型出现问题: {req_e}"
action = "no_reply"
llm_content, _ = await self.planner_llm.generate_response_async(prompt=prompt)
if llm_content:
try:
parsed_json = orjson.loads(repair_json(llm_content))
if isinstance(parsed_json, list):
if parsed_json:
parsed_json = parsed_json[-1]
logger.warning(f"{self.log_prefix}LLM返回了多个JSON对象使用最后一个: {parsed_json}")
else:
parsed_json = {}
if not isinstance(parsed_json, dict):
logger.error(f"{self.log_prefix}解析后的JSON不是字典类型: {type(parsed_json)}")
parsed_json = {}
parsed_json = orjson.loads(repair_json(llm_content))
parsed_json = parsed_json[-1] if isinstance(parsed_json, list) and parsed_json else parsed_json
if isinstance(parsed_json, dict):
action = parsed_json.get("action", "no_reply")
reasoning = parsed_json.get("reason", "未提供原因")
# 将所有其他属性添加到action_data
for key, value in parsed_json.items():
if key not in ["action", "reason"]:
action_data[key] = value
# 非no_reply动作需要target_message_id
action_data = {k: v for k, v in parsed_json.items() if k not in ["action", "reason"]}
if action != "no_reply":
if target_message_id := parsed_json.get("target_message_id"):
# 根据target_message_id查找原始消息
target_message = self.find_message_by_id(target_message_id, message_id_list)
# 如果获取的target_message为None输出warning并重新plan
if target_message is None:
self.plan_retry_count += 1
logger.warning(
f"{self.log_prefix}无法找到target_message_id '{target_message_id}' 对应的消息,重试次数: {self.plan_retry_count}/{self.max_plan_retries}"
)
if target_id := parsed_json.get("target_message_id"):
target_message = self.find_message_by_id(target_id, message_id_list)
if not target_message:
target_message = self.get_latest_message(message_id_list)
logger.info(f"{self.log_prefix}大脑决策: [{action}]")
# 如果连续三次plan均为None输出error并选取最新消息
if self.plan_retry_count >= self.max_plan_retries:
logger.error(
f"{self.log_prefix}连续{self.max_plan_retries}次plan获取target_message失败选择最新消息作为target_message"
)
target_message = self.get_latest_message(message_id_list)
self.plan_retry_count = 0 # 重置计数器
else:
# 递归重新plan
return await self.plan(mode, loop_start_time, available_actions)
else:
# 成功获取到target_message重置计数器
self.plan_retry_count = 0
else:
logger.warning(f"{self.log_prefix}动作'{action}'缺少target_message_id")
except Exception as e:
logger.error(f"{self.log_prefix}大脑处理过程中发生意外错误: {e}\n{traceback.format_exc()}")
action, reasoning = "no_reply", f"大脑处理错误: {e}"
if action != "no_reply" and action != "reply" and action not in current_available_actions:
logger.warning(
f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{action}' (可用: {list(current_available_actions.keys())}),将强制使用 'no_reply'"
)
reasoning = f"LLM 返回了当前不可用的动作 '{action}' (可用: {list(current_available_actions.keys())})。原始理由: {reasoning}"
action = "no_reply"
except Exception as json_e:
logger.warning(f"{self.log_prefix}解析LLM响应JSON失败 {json_e}. LLM原始输出: '{llm_content}'")
traceback.print_exc()
reasoning = f"解析LLM响应JSON失败: {json_e}. 将使用默认动作 'no_reply'."
action = "no_reply"
except Exception as outer_e:
logger.error(f"{self.log_prefix}Planner 处理过程中发生意外错误,规划失败,将执行 no_reply: {outer_e}")
traceback.print_exc()
action = "no_reply"
reasoning = f"Planner 内部处理错误: {outer_e}"
is_parallel = False
if mode == ChatMode.NORMAL and action in current_available_actions:
is_parallel = current_available_actions[action].parallel_action
# --- 4. 整合大脑和小脑的决策 ---
is_parallel = True
for info in all_sub_planner_results:
action_type = info.get("action_type")
if action_type and action_type not in ["no_action", "no_reply"]:
action_info = available_actions.get(action_type)
if action_info and not action_info.parallel_action:
is_parallel = False
break
action_data["loop_start_time"] = loop_start_time
final_actions: List[Dict[str, Any]] = []
actions = []
if is_parallel:
logger.info(f"{self.log_prefix}决策模式: 大脑与小脑并行")
if action not in ["no_action", "no_reply"]:
final_actions.append(
{
"action_type": action,
"reasoning": reasoning,
"action_data": action_data,
"action_message": target_message,
"available_actions": available_actions,
}
)
final_actions.extend(all_sub_planner_results)
else:
logger.info(f"{self.log_prefix}决策模式: 小脑优先 (检测到非并行action)")
final_actions.extend(all_sub_planner_results)
# 1. 添加Planner取得的动作
actions.append(
{
"action_type": action,
"reasoning": reasoning,
"action_data": action_data,
"action_message": target_message,
"available_actions": available_actions, # 添加这个字段
}
)
final_actions = self._filter_no_actions(final_actions)
if action != "reply" and is_parallel:
actions.append(
{"action_type": "reply", "action_message": target_message, "available_actions": available_actions}
)
if not final_actions:
final_actions = [
{
"action_type": "no_action",
"reasoning": "所有规划器都选择不执行动作",
"action_data": {}, "action_message": None, "available_actions": available_actions
}
]
return actions, target_message
final_target_message = target_message
if not final_target_message and final_actions:
final_target_message = next((act.get("action_message") for act in final_actions if act.get("action_message")), None)
actions_str = ", ".join([a.get('action_type', 'N/A') for a in final_actions])
logger.info(f"{self.log_prefix}最终执行动作 ({len(final_actions)}): [{actions_str}]")
return final_actions, final_target_message
async def build_planner_prompt(
self,
is_group_chat: bool, # Now passed as argument
chat_target_info: Optional[dict], # Now passed as argument
is_group_chat: bool,
chat_target_info: Optional[dict],
current_available_actions: Dict[str, ActionInfo],
refresh_time: bool = False,
mode: ChatMode = ChatMode.FOCUS,
) -> tuple[str, list]: # sourcery skip: use-join
chat_content_block_override: Optional[str] = None,
message_id_list_override: Optional[List] = None,
refresh_time: bool = False, # 添加缺失的参数
) -> tuple[str, list]:
"""构建 Planner LLM 的提示词 (获取模板并填充数据)"""
try:
# --- 通用信息获取 ---