From 626114c59372b3f608e8a279a0c98056524d65c3 Mon Sep 17 00:00:00 2001 From: minecraft1024a Date: Wed, 12 Nov 2025 12:23:20 +0800 Subject: [PATCH] =?UTF-8?q?feat(prompt):=20=E9=87=8D=E6=9E=84=E6=8F=90?= =?UTF-8?q?=E7=A4=BA=E8=AF=8D=E7=AE=A1=E7=90=86=E5=99=A8=E4=B8=BA=E5=8A=A8?= =?UTF-8?q?=E6=80=81=E5=8F=AF=E8=A7=82=E6=B5=8B=E7=9A=84=E6=B3=A8=E5=85=A5?= =?UTF-8?q?=E4=B8=AD=E5=BF=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 本次提交对 `PromptComponentManager` 进行了彻底的重构,将其从一个无状态的、按需计算的工具转变为一个有状态的、支持动态配置和实时观测的统一管理中心。 核心改进包括: - **统一规则存储**: 引入 `_dynamic_rules` 作为唯一的真实来源。系统启动时会加载所有组件的静态 `injection_rules` 作为默认配置,后续可动态修改。 - **动态API**: 新增 `add_injection_rule` 和 `remove_injection_rule` 方法,允许在运行时动态地添加、更新和移除注入规则,极大地提升了灵活性。 - **可观测性**: 提供了一套完整的状态查询API(如 `get_full_injection_map`, `get_injections_for_prompt`),使系统当前的注入状态完全透明,便于调试和监控。 - **性能优化**: `apply_injections` 流程被极大简化,它现在直接从预加载的规则集中获取内容并应用注入,避免了每次调用都重新扫描和实例化组件的开销。 --- src/chat/utils/prompt_component_manager.py | 262 +++++++++++++-------- 1 file changed, 163 insertions(+), 99 deletions(-) diff --git a/src/chat/utils/prompt_component_manager.py b/src/chat/utils/prompt_component_manager.py index 3c68630d1..1f56e9ed8 100644 --- a/src/chat/utils/prompt_component_manager.py +++ b/src/chat/utils/prompt_component_manager.py @@ -1,6 +1,9 @@ import asyncio +import copy import re +from typing import Awaitable, Callable +from src.chat.utils.prompt import global_prompt_manager from src.chat.utils.prompt_params import PromptParameters from src.common.logger import get_logger from src.plugin_system.base.base_prompt import BasePrompt @@ -10,123 +13,140 @@ from src.plugin_system.core.component_registry import component_registry logger = get_logger("prompt_component_manager") + class PromptComponentManager: """ - 管理所有 `BasePrompt` 组件的单例类。 + 统一的、动态的、可观测的提示词管理中心。 该管理器负责: - 1. 从 `component_registry` 中查询 `BasePrompt` 子类。 - 2. 根据注入点(目标Prompt名称)对它们进行筛选。 - 3. 提供一个接口,以便在构建核心Prompt时,能够获取并执行所有相关的组件。 + 1. 在启动时,将所有 `BasePrompt` 组件的静态 `injection_rules` 加载为默认的动态规则。 + 2. 提供 API 以在运行时动态地添加、更新、移除注入规则。 + 3. 提供查询 API 以观测系统当前的完整注入状态。 + 4. 在构建核心 Prompt 时,根据统一的规则集应用注入,修改模板。 """ - def _get_rules_for(self, target_prompt_name: str) -> list[tuple[InjectionRule, type[BasePrompt]]]: - """ - 获取指定目标Prompt的所有注入规则及其关联的组件类。 + def __init__(self): + """初始化管理器。""" + # _dynamic_rules 存储统一的注入规则 + # 结构: { "target_prompt_name": { "prompt_component_name": (InjectionRule, content_provider, source) } } + self._dynamic_rules: dict[str, dict[str, tuple[InjectionRule, Callable[..., Awaitable[str]], str]]] = {} + self._lock = asyncio.Lock() + self._initialized = False - Args: - target_prompt_name (str): 目标 Prompt 的名称。 - - Returns: - list[tuple[InjectionRule, Type[BasePrompt]]]: 一个元组列表, - 每个元组包含一个注入规则和其对应的 Prompt 组件类,并已根据优先级排序。 + def load_static_rules(self): """ - # 从注册表中获取所有已启用的 PROMPT 类型的组件 + 在系统启动时被调用,扫描所有已注册的 Prompt 组件, + 将其静态的 `injection_rules` 转换为动态规则并加载到管理器中。 + """ + if self._initialized: + return + logger.info("正在加载静态 Prompt 注入规则...") enabled_prompts = component_registry.get_enabled_components_by_type(ComponentType.PROMPT) - matching_rules = [] - # 遍历所有启用的 Prompt 组件,查找与目标 Prompt 相关的注入规则 for prompt_name, prompt_info in enabled_prompts.items(): if not isinstance(prompt_info, PromptInfo): continue - # prompt_info.injection_rules 已经经过了后向兼容处理,确保总是列表 - for rule in prompt_info.injection_rules: - # 如果规则的目标是当前指定的 Prompt - if rule.target_prompt == target_prompt_name: - # 获取该规则对应的组件类 - component_class = component_registry.get_component_class(prompt_name, ComponentType.PROMPT) - # 确保获取到的确实是一个 BasePrompt 的子类 - if component_class and issubclass(component_class, BasePrompt): - matching_rules.append((rule, component_class)) + component_class = component_registry.get_component_class(prompt_name, ComponentType.PROMPT) + if not (component_class and issubclass(component_class, BasePrompt)): + logger.warning(f"无法为 '{prompt_name}' 加载静态规则,因为它不是一个有效的 Prompt 组件。") + continue - # 根据规则的优先级进行排序,数字越小,优先级越高,越先应用 - matching_rules.sort(key=lambda x: x[0].priority) - return matching_rules + def create_provider(cls: type[BasePrompt]) -> Callable[[PromptParameters], Awaitable[str]]: + """为静态组件创建一个内容提供者闭包。""" + + async def content_provider(params: PromptParameters) -> str: + try: + p_info = component_registry.get_component_info(cls.prompt_name, ComponentType.PROMPT) + plugin_config = {} + if isinstance(p_info, PromptInfo): + plugin_config = component_registry.get_plugin_config(p_info.plugin_name) + + instance = cls(params=params, plugin_config=plugin_config) + result = await instance.execute() + return str(result) if result is not None else "" + except Exception as e: + logger.error(f"执行静态规则提供者 '{cls.prompt_name}' 时出错: {e}", exc_info=True) + return "" + + return content_provider + + for rule in prompt_info.injection_rules: + provider = create_provider(component_class) + target_rules = self._dynamic_rules.setdefault(rule.target_prompt, {}) + target_rules[prompt_name] = (rule, provider, "static_default") + + self._initialized = True + logger.info("静态 Prompt 注入规则加载完成。") + + + async def add_injection_rule( + self, + prompt_name: str, + rule: InjectionRule, + content_provider: Callable[..., Awaitable[str]] | None = None, + source: str = "runtime", + ) -> bool: + """ + 动态添加或更新一条注入规则。 + + Args: + prompt_name (str): 动态注入组件名称。 + rule (InjectionRule): 注入规则。 + content_provider (Callable | None, optional): 动态内容提供者。 + 如果提供,apply_injections 时会调用此函数获取注入内容。 + 函数签名应为: async def provider(params: "PromptParameters") -> str + source (str, optional): 规则来源,默认为 "runtime"。 + """ + if not content_provider: + logger.error(f"为 '{prompt_name}' 添加动态注入规则失败:必须提供 content_provider。") + return False + + async with self._lock: + target_rules = self._dynamic_rules.setdefault(rule.target_prompt, {}) + target_rules[prompt_name] = (rule, content_provider, source) + logger.info(f"成功添加/更新注入规则: '{prompt_name}' -> '{rule.target_prompt}'") + return True + + async def remove_injection_rule(self, prompt_name: str, target_prompt: str) -> bool: + """移除一条动态注入规则。""" + async with self._lock: + if target_prompt in self._dynamic_rules and prompt_name in self._dynamic_rules[target_prompt]: + del self._dynamic_rules[target_prompt][prompt_name] + if not self._dynamic_rules[target_prompt]: + del self._dynamic_rules[target_prompt] + logger.info(f"成功移除注入规则: '{prompt_name}' from '{target_prompt}'") + return True + logger.warning(f"尝试移除注入规则失败: 未找到 '{prompt_name}' on '{target_prompt}'") + return False async def apply_injections( self, target_prompt_name: str, original_template: str, params: PromptParameters ) -> str: """ - 获取、实例化并执行所有相关组件,然后根据注入规则修改原始模板。 - - 这是一个三步走的过程: - 1. 实例化所有需要执行的组件。 - 2. 并行执行它们的 `execute` 方法以获取注入内容。 - 3. 按照优先级顺序,将内容注入到原始模板中。 - - Args: - target_prompt_name (str): 目标 Prompt 的名称。 - original_template (str): 原始的、未经修改的 Prompt 模板字符串。 - params (PromptParameters): 传递给 Prompt 组件实例的参数。 - - Returns: - str: 应用了所有注入规则后,修改过的 Prompt 模板字符串。 + 【核心方法】根据目标名称,应用所有匹配的注入规则,返回修改后的模板。 """ - rules_with_classes = self._get_rules_for(target_prompt_name) - # 如果没有找到任何匹配的规则,就直接返回原始模板,啥也不干 - if not rules_with_classes: + if not self._initialized: + self.load_static_rules() + + # 1. 从 _dynamic_rules 中获取所有相关的规则 + rules_for_target = list(self._dynamic_rules.get(target_prompt_name, {}).values()) + if not rules_for_target: return original_template - # --- 第一步: 实例化所有需要执行的组件 --- - instance_map = {} # 存储组件实例,虽然目前没直接用,但留着总没错 - tasks = [] # 存放所有需要并行执行的 execute 异步任务 - components_to_execute = [] # 存放需要执行的组件类,用于后续结果映射 + # 2. 按优先级排序 + rules_for_target.sort(key=lambda x: x[0].priority) - for rule, component_class in rules_with_classes: - # 如果注入类型是 REMOVE,那就不需要执行组件了,因为它不产生内容 + # 3. 依次执行内容提供者 (content_provider) 并拼接模板 + modified_template = original_template + for rule, provider, source in rules_for_target: + content = "" if rule.injection_type != InjectionType.REMOVE: try: - # 获取组件的元信息,主要是为了拿到插件名称来读取插件配置 - prompt_info = component_registry.get_component_info( - component_class.prompt_name, ComponentType.PROMPT - ) - if not isinstance(prompt_info, PromptInfo): - plugin_config = {} - else: - # 从注册表获取该组件所属插件的配置 - plugin_config = component_registry.get_plugin_config(prompt_info.plugin_name) - - # 实例化组件,并传入参数和插件配置 - instance = component_class(params=params, plugin_config=plugin_config) - instance_map[component_class.prompt_name] = instance - # 将组件的 execute 方法作为一个任务添加到列表中 - tasks.append(instance.execute()) - components_to_execute.append(component_class) + content = await provider(params) except Exception as e: - logger.error(f"实例化 Prompt 组件 '{component_class.prompt_name}' 失败: {e}") - # 即使失败,也添加一个立即完成的空任务,以保持与其他任务的索引同步 - tasks.append(asyncio.create_task(asyncio.sleep(0, result=e))) # type: ignore - - # --- 第二步: 并行执行所有组件的 execute 方法 --- - # 使用 asyncio.gather 来同时运行所有任务,提高效率 - results = await asyncio.gather(*tasks, return_exceptions=True) - # 创建一个从组件名到执行结果的映射,方便后续查找 - result_map = { - components_to_execute[i].prompt_name: res - for i, res in enumerate(results) - if not isinstance(res, Exception) # 只包含成功的结果 - } - # 单独处理并记录执行失败的组件 - for i, res in enumerate(results): - if isinstance(res, Exception): - logger.error(f"执行 Prompt 组件 '{components_to_execute[i].prompt_name}' 失败: {res}") - - # --- 第三步: 按优先级顺序应用注入规则 --- - modified_template = original_template - for rule, component_class in rules_with_classes: - # 从结果映射中获取该组件生成的内容 - content = result_map.get(component_class.prompt_name) + logger.error(f"执行规则 '{rule}' (来源: {source}) 的内容提供者时失败: {e}") + continue # 跳过失败的 provider try: if rule.injection_type == InjectionType.PREPEND: @@ -136,28 +156,72 @@ class PromptComponentManager: if content: modified_template = f"{modified_template}\n{content}" elif rule.injection_type == InjectionType.REPLACE: - # 使用正则表达式替换目标内容 - if content and rule.target_content: + if content is not None and rule.target_content: modified_template = re.sub(rule.target_content, str(content), modified_template) elif rule.injection_type == InjectionType.INSERT_AFTER: - # 在匹配到的内容后面插入 if content and rule.target_content: - # re.sub a little trick: \g<0> represents the entire matched string replacement = f"\\g<0>\n{content}" modified_template = re.sub(rule.target_content, replacement, modified_template) elif rule.injection_type == InjectionType.REMOVE: - # 使用正则表达式移除目标内容 if rule.target_content: modified_template = re.sub(rule.target_content, "", modified_template) except re.error as e: - logger.error( - f"在为 '{component_class.prompt_name}' 应用规则时发生正则错误: {e} (pattern: '{rule.target_content}')" - ) + logger.error(f"应用规则时发生正则错误: {e} (pattern: '{rule.target_content}')") except Exception as e: - logger.error(f"应用 Prompt 注入规则 '{rule}' 失败: {e}") + logger.error(f"应用注入规则 '{rule}' 失败: {e}") return modified_template + # --- 状态查询API --- + + def get_core_prompts(self) -> list[str]: + """获取所有已注册的核心提示词模板名称列表(即所有可注入的目标)。""" + return list(global_prompt_manager._prompts.keys()) + + def get_core_prompt_contents(self) -> dict[str, str]: + """获取所有核心提示词模板的原始内容。""" + return {name: prompt.template for name, prompt in global_prompt_manager._prompts.items()} + + def get_registered_prompt_components(self) -> list[PromptInfo]: + """获取所有在 ComponentRegistry 中注册的 Prompt 组件信息。""" + components = component_registry.get_components_by_type(ComponentType.PROMPT).values() + return [info for info in components if isinstance(info, PromptInfo)] + + async def get_full_injection_map(self) -> dict[str, list[dict]]: + """获取当前完整的注入映射图。""" + injection_map = {} + async with self._lock: + all_targets = set(self._dynamic_rules.keys()) | set(self.get_core_prompts()) + for target in all_targets: + rules = self._dynamic_rules.get(target, {}) + if not rules: + injection_map[target] = [] + continue + + info_list = [] + for prompt_name, (rule, _, source) in rules.items(): + info_list.append( + {"name": prompt_name, "priority": rule.priority, "source": source} + ) + info_list.sort(key=lambda x: x["priority"]) + injection_map[target] = info_list + return injection_map + + async def get_injections_for_prompt(self, target_prompt_name: str) -> list[dict]: + """获取指定核心提示词模板的所有注入信息。""" + full_map = await self.get_full_injection_map() + return full_map.get(target_prompt_name, []) + + def get_all_dynamic_rules(self) -> dict[str, dict[str, 'InjectionRule']]: + """获取所有当前的动态注入规则,以 InjectionRule 对象形式返回。""" + rules_copy = {} + # 只返回规则对象,隐藏 provider 实现细节 + for target, rules in self._dynamic_rules.items(): + target_copy = {} + for name, (rule, _, _) in rules.items(): + target_copy[name] = rule + rules_copy[target] = target_copy + return copy.deepcopy(rules_copy) # 创建全局单例 prompt_component_manager = PromptComponentManager()