From a0ddd525b315087b1c6eb798a28502d97b1952f8 Mon Sep 17 00:00:00 2001 From: minecraft1024a Date: Sat, 6 Sep 2025 17:13:58 +0800 Subject: [PATCH] =?UTF-8?q?refactor(chat):=20=E9=87=8D=E6=9E=84planner?= =?UTF-8?q?=E4=B8=BA=E5=A4=A7=E8=84=91/=E5=B0=8F=E8=84=91=E5=B9=B6?= =?UTF-8?q?=E8=A1=8C=E6=9E=B6=E6=9E=84=E4=BB=A5=E6=8F=90=E5=8D=87=E6=80=A7?= =?UTF-8?q?=E8=83=BD=E5=92=8C=E5=8F=AF=E6=89=A9=E5=B1=95=E6=80=A7(?= =?UTF-8?q?=E5=88=AB=E7=AE=A1=E8=83=BD=E4=B8=8D=E8=83=BD=E7=94=A8=E5=85=88?= =?UTF-8?q?=E6=8F=92=E8=BF=9B=E6=9D=A5=E5=86=8D=E8=AF=B4)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将原有的单体`plan`方法重构为一个多智能体架构,包含一个"大脑"和多个并行的"小脑"。 大脑 (`plan`方法) 专注于决定是否进行聊天回复 (`reply`),并负责调度和整合所有决策。 小脑 (`sub_plan`方法) 并行处理具体的、独立的action判断。每个小脑接收一部分action,使用轻量级模型进行快速评估,从而实现并行化处理,减少了单一LLM调用的延迟。 这种新架构的主要优势包括: - **性能提升**:通过并行化action判断,显著减少了规划器的总响应时间。 - **可扩展性**:添加新的action变得更加容易,因为它们可以被分配到不同的小脑中,而不会增加主规划流程的复杂性。 - **鲁棒性**:将复杂的规划任务分解为更小的、独立的单元,降低了单个点失败导致整个规划失败的风险。 - **成本效益**:允许为小脑配置更轻量、更快速的模型,优化了资源使用。 --- src/chat/planner_actions/planner.py | 486 ++++++++++++++++++++-------- 1 file changed, 358 insertions(+), 128 deletions(-) diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py index 80f974b94..deee8fdef 100644 --- a/src/chat/planner_actions/planner.py +++ b/src/chat/planner_actions/planner.py @@ -1,7 +1,11 @@ import orjson import time import traceback -from typing import Dict, Any, Optional, Tuple, List +import asyncio +import math +import random +import json +from typing import Dict, Any, Optional, Tuple, List, TYPE_CHECKING from rich.traceback import install from datetime import datetime from json_repair import repair_json @@ -19,12 +23,15 @@ from src.chat.utils.chat_message_builder import ( from src.chat.utils.utils import get_chat_type_and_target_info from src.chat.planner_actions.action_manager import ActionManager from src.chat.message_receive.chat_stream import get_chat_manager -from src.plugin_system.base.component_types import ActionInfo, ChatMode, ComponentType +from src.plugin_system.base.component_types import ActionInfo, ChatMode, ComponentType, ActionActivationType from src.plugin_system.core.component_registry import component_registry from src.schedule.schedule_manager import schedule_manager from src.mood.mood_manager import mood_manager from src.chat.memory_system.Hippocampus import hippocampus_manager +if TYPE_CHECKING: + pass + logger = get_logger("planner") install(extra_lines=3) @@ -110,6 +117,37 @@ def init_prompt(): "action_prompt", ) + Prompt( + """ +{name_block} + +{chat_context_description},{time_block},现在请你根据以下聊天内容,选择一个或多个合适的action。如果没有合适的action,请选择no_action。, +{chat_content_block} + +**要求** +1.action必须符合使用条件,如果符合条件,就选择 +2.如果聊天内容不适合使用action,即使符合条件,也不要使用 +3.{moderation_prompt} +4.请注意如果相同的内容已经被执行,请不要重复执行 +这是你最近执行过的动作: +{actions_before_now_block} + +**可用的action** + +no_action:不选择任何动作 +{{ + "action": "no_action", + "reason":"不动作的原因" +}} + +{action_options_text} + +请选择,并说明触发action的消息id和选择该action的原因。消息id格式:m+数字 +请根据动作示例,以严格的 JSON 格式输出,且仅包含 JSON 内容: +""", + "sub_planner_prompt", + ) + class ActionPlanner: def __init__(self, chat_id: str, action_manager: ActionManager): @@ -117,14 +155,17 @@ class ActionPlanner: self.log_prefix = f"[{get_chat_manager().get_stream_name(chat_id) or chat_id}]" self.action_manager = action_manager # LLM规划器配置 + # --- 大脑 --- self.planner_llm = LLMRequest( model_set=model_config.model_task_config.planner, request_type="planner" - ) # 用于动作规划 + ) + # --- 小脑 (新增) --- + # TODO: 可以在 model_config.toml 中为 planner_small 单独配置一个轻量级模型 + self.planner_small_llm = LLMRequest( + model_set=model_config.model_task_config.planner, request_type="planner_small" + ) self.last_obs_time_mark = 0.0 - # 添加重试计数器 - self.plan_retry_count = 0 - self.max_plan_retries = 3 async def _get_long_term_memory_context(self) -> str: """ @@ -237,6 +278,168 @@ class ActionPlanner: # 假设消息列表是按时间顺序排列的,最后一个是最新的 return message_id_list[-1].get("message") + def _parse_single_action( + self, + action_json: dict, + message_id_list: list, # 使用 planner.py 的 list of dict + current_available_actions: list, # 使用 planner.py 的 list of tuple + ) -> List[Dict[str, Any]]: + """ + [注释] 解析单个小脑LLM返回的action JSON,并将其转换为标准化的字典。 + """ + parsed_actions = [] + try: + action = action_json.get("action", "no_action") + reasoning = action_json.get("reason", "未提供原因") + action_data = {k: v for k, v in action_json.items() if k not in ["action", "reason"]} + + target_message = None + if action != "no_action": + if target_message_id := action_json.get("target_message_id"): + target_message = self.find_message_by_id(target_message_id, message_id_list) + if target_message is None: + logger.warning(f"{self.log_prefix}无法找到target_message_id '{target_message_id}'") + target_message = self.get_latest_message(message_id_list) + else: + logger.warning(f"{self.log_prefix}动作'{action}'缺少target_message_id") + + available_action_names = [name for name, _ in current_available_actions] + if action not in ["no_action", "reply"] and action not in available_action_names: + logger.warning( + f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{action}' (可用: {available_action_names}),将强制使用 'no_action'" + ) + reasoning = f"LLM 返回了当前不可用的动作 '{action}' (可用: {available_action_names})。原始理由: {reasoning}" + action = "no_action" + + # 将列表转换为字典格式以供将来使用 + available_actions_dict = dict(current_available_actions) + parsed_actions.append( + { + "action_type": action, + "reasoning": reasoning, + "action_data": action_data, + "action_message": target_message, + "available_actions": available_actions_dict, + } + ) + except Exception as e: + logger.error(f"{self.log_prefix}解析单个action时出错: {e}") + parsed_actions.append( + { + "action_type": "no_action", + "reasoning": f"解析action时出错: {e}", + "action_data": {}, + "action_message": None, + "available_actions": dict(current_available_actions), + } + ) + return parsed_actions + + def _filter_no_actions(self, action_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + [注释] 从一个action字典列表中过滤掉所有的 'no_action'。 + 如果过滤后列表为空, 则返回一个空的列表, 或者根据需要返回一个默认的no_action字典。 + """ + non_no_actions = [a for a in action_list if a.get("action_type") not in ["no_action", "no_reply"]] + if non_no_actions: + return non_no_actions + # 如果都是 no_action,则返回一个包含第一个 no_action 的列表,以保留 reason + return action_list[:1] if action_list else [] + + async def sub_plan( + self, + action_list: list, # 使用 planner.py 的 list of tuple + chat_content_block: str, + message_id_list: list, # 使用 planner.py 的 list of dict + is_group_chat: bool = False, + chat_target_info: Optional[dict] = None, + ) -> List[Dict[str, Any]]: + """ + [注释] "小脑"规划器。接收一小组actions,使用轻量级LLM判断其中哪些应该被触发。 + 这是一个独立的、并行的思考单元。返回一个包含action字典的列表。 + """ + try: + actions_before_now = get_actions_by_timestamp_with_chat( + chat_id=self.chat_id, + timestamp_start=time.time() - 1200, + timestamp_end=time.time(), + limit=20, + ) + action_names_in_list = [name for name, _ in action_list] + filtered_actions = [ + record for record in actions_before_now if record.get("action_name") in action_names_in_list + ] + actions_before_now_block = build_readable_actions(actions=filtered_actions) + + chat_context_description = "你现在正在一个群聊中" + if not is_group_chat and chat_target_info: + chat_target_name = chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or "对方" + chat_context_description = f"你正在和 {chat_target_name} 私聊" + + action_options_block = "" + for using_actions_name, using_actions_info in action_list: + param_text = "" + if using_actions_info.action_parameters: + param_text = "\n" + "\n".join( + f' "{p_name}":"{p_desc}"' + for p_name, p_desc in using_actions_info.action_parameters.items() + ) + require_text = "\n".join(f"- {req}" for req in using_actions_info.action_require) + using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt") + action_options_block += using_action_prompt.format( + action_name=using_actions_name, + action_description=using_actions_info.description, + action_parameters=param_text, + action_require=require_text, + ) + + moderation_prompt_block = "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。" + time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + bot_name = global_config.bot.nickname + bot_nickname = f",也有人叫你{','.join(global_config.bot.alias_names)}" if global_config.bot.alias_names else "" + name_block = f"你的名字是{bot_name}{bot_nickname},请注意哪些是你自己的发言。" + + planner_prompt_template = await global_prompt_manager.get_prompt_async("sub_planner_prompt") + prompt = planner_prompt_template.format( + time_block=time_block, + chat_context_description=chat_context_description, + chat_content_block=chat_content_block, + actions_before_now_block=actions_before_now_block, + action_options_text=action_options_block, + moderation_prompt=moderation_prompt_block, + name_block=name_block, + ) + except Exception as e: + logger.error(f"构建小脑提示词时出错: {e}\n{traceback.format_exc()}") + return [{"action_type": "no_action", "reasoning": f"构建小脑Prompt时出错: {e}"}] + + action_dicts: List[Dict[str, Any]] = [] + try: + llm_content, (reasoning_content, _, _) = await self.planner_small_llm.generate_response_async(prompt=prompt) + if global_config.debug.show_prompt: + logger.info(f"{self.log_prefix}小脑原始提示词: {prompt}") + logger.info(f"{self.log_prefix}小脑原始响应: {llm_content}") + else: + logger.debug(f"{self.log_prefix}小脑原始响应: {llm_content}") + + if llm_content: + parsed_json = orjson.loads(repair_json(llm_content)) + if isinstance(parsed_json, list): + for item in parsed_json: + if isinstance(item, dict): + action_dicts.extend(self._parse_single_action(item, message_id_list, action_list)) + elif isinstance(parsed_json, dict): + action_dicts.extend(self._parse_single_action(parsed_json, message_id_list, action_list)) + + except Exception as e: + logger.warning(f"{self.log_prefix}解析小脑响应JSON失败: {e}. LLM原始输出: '{llm_content}'") + action_dicts.append({"action_type": "no_action", "reasoning": f"解析小脑响应失败: {e}"}) + + if not action_dicts: + action_dicts.append({"action_type": "no_action", "reasoning": "小脑未返回有效action"}) + + return action_dicts + async def plan( self, mode: ChatMode = ChatMode.FOCUS, @@ -244,153 +447,180 @@ class ActionPlanner: available_actions: Optional[Dict[str, ActionInfo]] = None, ) -> Tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]: """ - 规划器 (Planner): 使用LLM根据上下文决定做出什么动作。 + [注释] "大脑"规划器。 + 1. 启动多个并行的"小脑"(sub_plan)来决定是否执行具体的actions。 + 2. 自己(大脑)则专注于决定是否进行聊天回复(reply)。 + 3. 整合大脑和小脑的决策,返回最终要执行的动作列表。 """ + # --- 1. 准备上下文信息 --- + message_list_before_now = get_raw_msg_before_timestamp_with_chat( + chat_id=self.chat_id, + timestamp=time.time(), + limit=int(global_config.chat.max_context_size * 0.6), + ) + # 大脑使用较长的上下文 + chat_content_block, message_id_list = build_readable_messages_with_id( + messages=message_list_before_now, + timestamp_mode="normal", + read_mark=self.last_obs_time_mark, + truncate=True, + show_actions=True, + ) + # 小脑使用较短、较新的上下文 + message_list_before_now_short = message_list_before_now[-int(global_config.chat.max_context_size * 0.3) :] + chat_content_block_short, message_id_list_short = build_readable_messages_with_id( + messages=message_list_before_now_short, + timestamp_mode="normal", + truncate=False, + show_actions=False, + ) + self.last_obs_time_mark = time.time() - action = "no_reply" # 默认动作 - reasoning = "规划器初始化默认" - action_data = {} - current_available_actions: Dict[str, ActionInfo] = {} - target_message: Optional[Dict[str, Any]] = None # 初始化target_message变量 - prompt: str = "" - message_id_list: list = [] + is_group_chat, chat_target_info, current_available_actions = self.get_necessary_info() + if available_actions is None: + available_actions = current_available_actions + # --- 2. 启动小脑并行思考 --- + all_sub_planner_results: List[Dict[str, Any]] = [] try: - is_group_chat, chat_target_info, current_available_actions = self.get_necessary_info() + sub_planner_actions: Dict[str, ActionInfo] = {} + for action_name, action_info in available_actions.items(): + if action_info.activation_type in [ActionActivationType.LLM_JUDGE, ActionActivationType.ALWAYS]: + sub_planner_actions[action_name] = action_info + elif action_info.activation_type == ActionActivationType.RANDOM: + if random.random() < action_info.random_activation_probability: + sub_planner_actions[action_name] = action_info + elif action_info.activation_type == ActionActivationType.KEYWORD: + if any(keyword in chat_content_block_short for keyword in action_info.activation_keywords): + sub_planner_actions[action_name] = action_info - # --- 构建提示词 (调用修改后的 PromptBuilder 方法) --- - prompt, message_id_list = await self.build_planner_prompt( - is_group_chat=is_group_chat, # <-- Pass HFC state - chat_target_info=chat_target_info, # <-- 传递获取到的聊天目标信息 - current_available_actions=current_available_actions, # <-- Pass determined actions + if sub_planner_actions: + sub_planner_actions_num = len(sub_planner_actions) + # TODO: 您可以在 config.toml 的 [chat] 部分添加 planner_size = 5.0 来自定义此值 + planner_size_config = getattr(global_config.chat, "planner_size", 5.0) + sub_planner_size = int(planner_size_config) + ( + 1 if random.random() < planner_size_config - int(planner_size_config) else 0 + ) + sub_planner_num = math.ceil(sub_planner_actions_num / sub_planner_size) + logger.info(f"{self.log_prefix}使用{sub_planner_num}个小脑进行思考 (尺寸: {sub_planner_size})") + + action_items = list(sub_planner_actions.items()) + random.shuffle(action_items) + sub_planner_lists = [action_items[i::sub_planner_num] for i in range(sub_planner_num)] + + sub_plan_tasks = [ + self.sub_plan( + action_list=action_group, + chat_content_block=chat_content_block_short, + message_id_list=message_id_list_short, + is_group_chat=is_group_chat, + chat_target_info=chat_target_info, + ) + for action_group in sub_planner_lists + ] + sub_plan_results = await asyncio.gather(*sub_plan_tasks) + for sub_result in sub_plan_results: + all_sub_planner_results.extend(sub_result) + + sub_actions_str = ", ".join( + a["action_type"] for a in all_sub_planner_results if a["action_type"] != "no_action" + ) or "no_action" + logger.info(f"{self.log_prefix}小脑决策: [{sub_actions_str}]") + + except Exception as e: + logger.error(f"{self.log_prefix}小脑调度过程中出错: {e}\n{traceback.format_exc()}") + + # --- 3. 大脑独立思考是否回复 --- + action, reasoning, action_data, target_message = "no_reply", "大脑初始化默认", {}, None + try: + prompt, _ = await self.build_planner_prompt( + is_group_chat=is_group_chat, + chat_target_info=chat_target_info, + current_available_actions={}, # 大脑不考虑具体action mode=mode, + chat_content_block_override=chat_content_block, + message_id_list_override=message_id_list, ) - - # --- 调用 LLM (普通文本生成) --- - llm_content = None - try: - llm_content, (reasoning_content, _, _) = await self.planner_llm.generate_response_async(prompt=prompt) - - if global_config.debug.show_prompt: - logger.info(f"{self.log_prefix}规划器原始提示词: {prompt}") - logger.info(f"{self.log_prefix}规划器原始响应: {llm_content}") - if reasoning_content: - logger.info(f"{self.log_prefix}规划器推理: {reasoning_content}") - else: - logger.debug(f"{self.log_prefix}规划器原始提示词: {prompt}") - logger.debug(f"{self.log_prefix}规划器原始响应: {llm_content}") - if reasoning_content: - logger.debug(f"{self.log_prefix}规划器推理: {reasoning_content}") - - except Exception as req_e: - logger.error(f"{self.log_prefix}LLM 请求执行失败: {req_e}") - reasoning = f"LLM 请求失败,模型出现问题: {req_e}" - action = "no_reply" + llm_content, _ = await self.planner_llm.generate_response_async(prompt=prompt) if llm_content: - try: - parsed_json = orjson.loads(repair_json(llm_content)) - - if isinstance(parsed_json, list): - if parsed_json: - parsed_json = parsed_json[-1] - logger.warning(f"{self.log_prefix}LLM返回了多个JSON对象,使用最后一个: {parsed_json}") - else: - parsed_json = {} - - if not isinstance(parsed_json, dict): - logger.error(f"{self.log_prefix}解析后的JSON不是字典类型: {type(parsed_json)}") - parsed_json = {} - + parsed_json = orjson.loads(repair_json(llm_content)) + parsed_json = parsed_json[-1] if isinstance(parsed_json, list) and parsed_json else parsed_json + if isinstance(parsed_json, dict): action = parsed_json.get("action", "no_reply") reasoning = parsed_json.get("reason", "未提供原因") - - # 将所有其他属性添加到action_data - for key, value in parsed_json.items(): - if key not in ["action", "reason"]: - action_data[key] = value - - # 非no_reply动作需要target_message_id + action_data = {k: v for k, v in parsed_json.items() if k not in ["action", "reason"]} if action != "no_reply": - if target_message_id := parsed_json.get("target_message_id"): - # 根据target_message_id查找原始消息 - target_message = self.find_message_by_id(target_message_id, message_id_list) - # 如果获取的target_message为None,输出warning并重新plan - if target_message is None: - self.plan_retry_count += 1 - logger.warning( - f"{self.log_prefix}无法找到target_message_id '{target_message_id}' 对应的消息,重试次数: {self.plan_retry_count}/{self.max_plan_retries}" - ) + if target_id := parsed_json.get("target_message_id"): + target_message = self.find_message_by_id(target_id, message_id_list) + if not target_message: + target_message = self.get_latest_message(message_id_list) + logger.info(f"{self.log_prefix}大脑决策: [{action}]") - # 如果连续三次plan均为None,输出error并选取最新消息 - if self.plan_retry_count >= self.max_plan_retries: - logger.error( - f"{self.log_prefix}连续{self.max_plan_retries}次plan获取target_message失败,选择最新消息作为target_message" - ) - target_message = self.get_latest_message(message_id_list) - self.plan_retry_count = 0 # 重置计数器 - else: - # 递归重新plan - return await self.plan(mode, loop_start_time, available_actions) - else: - # 成功获取到target_message,重置计数器 - self.plan_retry_count = 0 - else: - logger.warning(f"{self.log_prefix}动作'{action}'缺少target_message_id") + except Exception as e: + logger.error(f"{self.log_prefix}大脑处理过程中发生意外错误: {e}\n{traceback.format_exc()}") + action, reasoning = "no_reply", f"大脑处理错误: {e}" - if action != "no_reply" and action != "reply" and action not in current_available_actions: - logger.warning( - f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{action}' (可用: {list(current_available_actions.keys())}),将强制使用 'no_reply'" - ) - reasoning = f"LLM 返回了当前不可用的动作 '{action}' (可用: {list(current_available_actions.keys())})。原始理由: {reasoning}" - action = "no_reply" - - except Exception as json_e: - logger.warning(f"{self.log_prefix}解析LLM响应JSON失败 {json_e}. LLM原始输出: '{llm_content}'") - traceback.print_exc() - reasoning = f"解析LLM响应JSON失败: {json_e}. 将使用默认动作 'no_reply'." - action = "no_reply" - - except Exception as outer_e: - logger.error(f"{self.log_prefix}Planner 处理过程中发生意外错误,规划失败,将执行 no_reply: {outer_e}") - traceback.print_exc() - action = "no_reply" - reasoning = f"Planner 内部处理错误: {outer_e}" - - is_parallel = False - if mode == ChatMode.NORMAL and action in current_available_actions: - is_parallel = current_available_actions[action].parallel_action + # --- 4. 整合大脑和小脑的决策 --- + is_parallel = True + for info in all_sub_planner_results: + action_type = info.get("action_type") + if action_type and action_type not in ["no_action", "no_reply"]: + action_info = available_actions.get(action_type) + if action_info and not action_info.parallel_action: + is_parallel = False + break action_data["loop_start_time"] = loop_start_time + final_actions: List[Dict[str, Any]] = [] - actions = [] + if is_parallel: + logger.info(f"{self.log_prefix}决策模式: 大脑与小脑并行") + if action not in ["no_action", "no_reply"]: + final_actions.append( + { + "action_type": action, + "reasoning": reasoning, + "action_data": action_data, + "action_message": target_message, + "available_actions": available_actions, + } + ) + final_actions.extend(all_sub_planner_results) + else: + logger.info(f"{self.log_prefix}决策模式: 小脑优先 (检测到非并行action)") + final_actions.extend(all_sub_planner_results) - # 1. 添加Planner取得的动作 - actions.append( - { - "action_type": action, - "reasoning": reasoning, - "action_data": action_data, - "action_message": target_message, - "available_actions": available_actions, # 添加这个字段 - } - ) + final_actions = self._filter_no_actions(final_actions) - if action != "reply" and is_parallel: - actions.append( - {"action_type": "reply", "action_message": target_message, "available_actions": available_actions} - ) + if not final_actions: + final_actions = [ + { + "action_type": "no_action", + "reasoning": "所有规划器都选择不执行动作", + "action_data": {}, "action_message": None, "available_actions": available_actions + } + ] - return actions, target_message + final_target_message = target_message + if not final_target_message and final_actions: + final_target_message = next((act.get("action_message") for act in final_actions if act.get("action_message")), None) + + actions_str = ", ".join([a.get('action_type', 'N/A') for a in final_actions]) + logger.info(f"{self.log_prefix}最终执行动作 ({len(final_actions)}): [{actions_str}]") + + return final_actions, final_target_message async def build_planner_prompt( self, - is_group_chat: bool, # Now passed as argument - chat_target_info: Optional[dict], # Now passed as argument + is_group_chat: bool, + chat_target_info: Optional[dict], current_available_actions: Dict[str, ActionInfo], - refresh_time: bool = False, mode: ChatMode = ChatMode.FOCUS, - ) -> tuple[str, list]: # sourcery skip: use-join + chat_content_block_override: Optional[str] = None, + message_id_list_override: Optional[List] = None, + refresh_time: bool = False, # 添加缺失的参数 + ) -> tuple[str, list]: """构建 Planner LLM 的提示词 (获取模板并填充数据)""" try: # --- 通用信息获取 ---