feat(prompt): 重构提示词管理器为动态可观测的注入中心

本次提交对 `PromptComponentManager` 进行了彻底的重构,将其从一个无状态的、按需计算的工具转变为一个有状态的、支持动态配置和实时观测的统一管理中心。

核心改进包括:
- **统一规则存储**: 引入 `_dynamic_rules` 作为唯一的真实来源。系统启动时会加载所有组件的静态 `injection_rules` 作为默认配置,后续可动态修改。
- **动态API**: 新增 `add_injection_rule` 和 `remove_injection_rule` 方法,允许在运行时动态地添加、更新和移除注入规则,极大地提升了灵活性。
- **可观测性**: 提供了一套完整的状态查询API(如 `get_full_injection_map`, `get_injections_for_prompt`),使系统当前的注入状态完全透明,便于调试和监控。
- **性能优化**: `apply_injections` 流程被极大简化,它现在直接从预加载的规则集中获取内容并应用注入,避免了每次调用都重新扫描和实例化组件的开销。
This commit is contained in:
minecraft1024a
2025-11-12 12:23:20 +08:00
parent 2edc049524
commit 626114c593

View File

@@ -1,6 +1,9 @@
import asyncio
import copy
import re
from typing import Awaitable, Callable
from src.chat.utils.prompt import global_prompt_manager
from src.chat.utils.prompt_params import PromptParameters
from src.common.logger import get_logger
from src.plugin_system.base.base_prompt import BasePrompt
@@ -10,123 +13,140 @@ from src.plugin_system.core.component_registry import component_registry
logger = get_logger("prompt_component_manager")
class PromptComponentManager:
"""
管理所有 `BasePrompt` 组件的单例类
统一的、动态的、可观测的提示词管理中心
该管理器负责:
1. 从 `component_registry` 中查询 `BasePrompt` 子类
2. 根据注入点目标Prompt名称对它们进行筛选
3. 提供一个接口以便在构建核心Prompt时能够获取并执行所有相关的组件
1. 在启动时,将所有 `BasePrompt` 组件的静态 `injection_rules` 加载为默认的动态规则
2. 提供 API 以在运行时动态地添加、更新、移除注入规则
3. 提供查询 API 以观测系统当前的完整注入状态
4. 在构建核心 Prompt 时,根据统一的规则集应用注入,修改模板。
"""
def _get_rules_for(self, target_prompt_name: str) -> list[tuple[InjectionRule, type[BasePrompt]]]:
"""
获取指定目标Prompt的所有注入规则及其关联的组件类。
def __init__(self):
"""初始化管理器。"""
# _dynamic_rules 存储统一的注入规则
# 结构: { "target_prompt_name": { "prompt_component_name": (InjectionRule, content_provider, source) } }
self._dynamic_rules: dict[str, dict[str, tuple[InjectionRule, Callable[..., Awaitable[str]], str]]] = {}
self._lock = asyncio.Lock()
self._initialized = False
Args:
target_prompt_name (str): 目标 Prompt 的名称。
Returns:
list[tuple[InjectionRule, Type[BasePrompt]]]: 一个元组列表,
每个元组包含一个注入规则和其对应的 Prompt 组件类,并已根据优先级排序。
def load_static_rules(self):
"""
# 从注册表中获取所有已启用的 PROMPT 类型的组件
在系统启动时被调用,扫描所有已注册的 Prompt 组件,
将其静态的 `injection_rules` 转换为动态规则并加载到管理器中。
"""
if self._initialized:
return
logger.info("正在加载静态 Prompt 注入规则...")
enabled_prompts = component_registry.get_enabled_components_by_type(ComponentType.PROMPT)
matching_rules = []
# 遍历所有启用的 Prompt 组件,查找与目标 Prompt 相关的注入规则
for prompt_name, prompt_info in enabled_prompts.items():
if not isinstance(prompt_info, PromptInfo):
continue
# prompt_info.injection_rules 已经经过了后向兼容处理,确保总是列表
for rule in prompt_info.injection_rules:
# 如果规则的目标是当前指定的 Prompt
if rule.target_prompt == target_prompt_name:
# 获取该规则对应的组件类
component_class = component_registry.get_component_class(prompt_name, ComponentType.PROMPT)
# 确保获取到的确实是一个 BasePrompt 的子类
if component_class and issubclass(component_class, BasePrompt):
matching_rules.append((rule, component_class))
component_class = component_registry.get_component_class(prompt_name, ComponentType.PROMPT)
if not (component_class and issubclass(component_class, BasePrompt)):
logger.warning(f"无法为 '{prompt_name}' 加载静态规则,因为它不是一个有效的 Prompt 组件。")
continue
# 根据规则的优先级进行排序,数字越小,优先级越高,越先应用
matching_rules.sort(key=lambda x: x[0].priority)
return matching_rules
def create_provider(cls: type[BasePrompt]) -> Callable[[PromptParameters], Awaitable[str]]:
"""为静态组件创建一个内容提供者闭包。"""
async def content_provider(params: PromptParameters) -> str:
try:
p_info = component_registry.get_component_info(cls.prompt_name, ComponentType.PROMPT)
plugin_config = {}
if isinstance(p_info, PromptInfo):
plugin_config = component_registry.get_plugin_config(p_info.plugin_name)
instance = cls(params=params, plugin_config=plugin_config)
result = await instance.execute()
return str(result) if result is not None else ""
except Exception as e:
logger.error(f"执行静态规则提供者 '{cls.prompt_name}' 时出错: {e}", exc_info=True)
return ""
return content_provider
for rule in prompt_info.injection_rules:
provider = create_provider(component_class)
target_rules = self._dynamic_rules.setdefault(rule.target_prompt, {})
target_rules[prompt_name] = (rule, provider, "static_default")
self._initialized = True
logger.info("静态 Prompt 注入规则加载完成。")
async def add_injection_rule(
self,
prompt_name: str,
rule: InjectionRule,
content_provider: Callable[..., Awaitable[str]] | None = None,
source: str = "runtime",
) -> bool:
"""
动态添加或更新一条注入规则。
Args:
prompt_name (str): 动态注入组件名称。
rule (InjectionRule): 注入规则。
content_provider (Callable | None, optional): 动态内容提供者。
如果提供apply_injections 时会调用此函数获取注入内容。
函数签名应为: async def provider(params: "PromptParameters") -> str
source (str, optional): 规则来源,默认为 "runtime"
"""
if not content_provider:
logger.error(f"'{prompt_name}' 添加动态注入规则失败:必须提供 content_provider。")
return False
async with self._lock:
target_rules = self._dynamic_rules.setdefault(rule.target_prompt, {})
target_rules[prompt_name] = (rule, content_provider, source)
logger.info(f"成功添加/更新注入规则: '{prompt_name}' -> '{rule.target_prompt}'")
return True
async def remove_injection_rule(self, prompt_name: str, target_prompt: str) -> bool:
"""移除一条动态注入规则。"""
async with self._lock:
if target_prompt in self._dynamic_rules and prompt_name in self._dynamic_rules[target_prompt]:
del self._dynamic_rules[target_prompt][prompt_name]
if not self._dynamic_rules[target_prompt]:
del self._dynamic_rules[target_prompt]
logger.info(f"成功移除注入规则: '{prompt_name}' from '{target_prompt}'")
return True
logger.warning(f"尝试移除注入规则失败: 未找到 '{prompt_name}' on '{target_prompt}'")
return False
async def apply_injections(
self, target_prompt_name: str, original_template: str, params: PromptParameters
) -> str:
"""
获取、实例化并执行所有相关组件,然后根据注入规则修改原始模板。
这是一个三步走的过程:
1. 实例化所有需要执行的组件。
2. 并行执行它们的 `execute` 方法以获取注入内容。
3. 按照优先级顺序,将内容注入到原始模板中。
Args:
target_prompt_name (str): 目标 Prompt 的名称。
original_template (str): 原始的、未经修改的 Prompt 模板字符串。
params (PromptParameters): 传递给 Prompt 组件实例的参数。
Returns:
str: 应用了所有注入规则后,修改过的 Prompt 模板字符串。
【核心方法】根据目标名称,应用所有匹配的注入规则,返回修改后的模板。
"""
rules_with_classes = self._get_rules_for(target_prompt_name)
# 如果没有找到任何匹配的规则,就直接返回原始模板,啥也不干
if not rules_with_classes:
if not self._initialized:
self.load_static_rules()
# 1. 从 _dynamic_rules 中获取所有相关的规则
rules_for_target = list(self._dynamic_rules.get(target_prompt_name, {}).values())
if not rules_for_target:
return original_template
# --- 第一步: 实例化所有需要执行的组件 ---
instance_map = {} # 存储组件实例,虽然目前没直接用,但留着总没错
tasks = [] # 存放所有需要并行执行的 execute 异步任务
components_to_execute = [] # 存放需要执行的组件类,用于后续结果映射
# 2. 按优先级排序
rules_for_target.sort(key=lambda x: x[0].priority)
for rule, component_class in rules_with_classes:
# 如果注入类型是 REMOVE那就不需要执行组件了因为它不产生内容
# 3. 依次执行内容提供者 (content_provider) 并拼接模板
modified_template = original_template
for rule, provider, source in rules_for_target:
content = ""
if rule.injection_type != InjectionType.REMOVE:
try:
# 获取组件的元信息,主要是为了拿到插件名称来读取插件配置
prompt_info = component_registry.get_component_info(
component_class.prompt_name, ComponentType.PROMPT
)
if not isinstance(prompt_info, PromptInfo):
plugin_config = {}
else:
# 从注册表获取该组件所属插件的配置
plugin_config = component_registry.get_plugin_config(prompt_info.plugin_name)
# 实例化组件,并传入参数和插件配置
instance = component_class(params=params, plugin_config=plugin_config)
instance_map[component_class.prompt_name] = instance
# 将组件的 execute 方法作为一个任务添加到列表中
tasks.append(instance.execute())
components_to_execute.append(component_class)
content = await provider(params)
except Exception as e:
logger.error(f"实例化 Prompt 组件 '{component_class.prompt_name}' 失败: {e}")
# 即使失败,也添加一个立即完成的空任务,以保持与其他任务的索引同步
tasks.append(asyncio.create_task(asyncio.sleep(0, result=e))) # type: ignore
# --- 第二步: 并行执行所有组件的 execute 方法 ---
# 使用 asyncio.gather 来同时运行所有任务,提高效率
results = await asyncio.gather(*tasks, return_exceptions=True)
# 创建一个从组件名到执行结果的映射,方便后续查找
result_map = {
components_to_execute[i].prompt_name: res
for i, res in enumerate(results)
if not isinstance(res, Exception) # 只包含成功的结果
}
# 单独处理并记录执行失败的组件
for i, res in enumerate(results):
if isinstance(res, Exception):
logger.error(f"执行 Prompt 组件 '{components_to_execute[i].prompt_name}' 失败: {res}")
# --- 第三步: 按优先级顺序应用注入规则 ---
modified_template = original_template
for rule, component_class in rules_with_classes:
# 从结果映射中获取该组件生成的内容
content = result_map.get(component_class.prompt_name)
logger.error(f"执行规则 '{rule}' (来源: {source}) 的内容提供者时失败: {e}")
continue # 跳过失败的 provider
try:
if rule.injection_type == InjectionType.PREPEND:
@@ -136,28 +156,72 @@ class PromptComponentManager:
if content:
modified_template = f"{modified_template}\n{content}"
elif rule.injection_type == InjectionType.REPLACE:
# 使用正则表达式替换目标内容
if content and rule.target_content:
if content is not None and rule.target_content:
modified_template = re.sub(rule.target_content, str(content), modified_template)
elif rule.injection_type == InjectionType.INSERT_AFTER:
# 在匹配到的内容后面插入
if content and rule.target_content:
# re.sub a little trick: \g<0> represents the entire matched string
replacement = f"\\g<0>\n{content}"
modified_template = re.sub(rule.target_content, replacement, modified_template)
elif rule.injection_type == InjectionType.REMOVE:
# 使用正则表达式移除目标内容
if rule.target_content:
modified_template = re.sub(rule.target_content, "", modified_template)
except re.error as e:
logger.error(
f"在为 '{component_class.prompt_name}' 应用规则时发生正则错误: {e} (pattern: '{rule.target_content}')"
)
logger.error(f"应用规则时发生正则错误: {e} (pattern: '{rule.target_content}')")
except Exception as e:
logger.error(f"应用 Prompt 注入规则 '{rule}' 失败: {e}")
logger.error(f"应用注入规则 '{rule}' 失败: {e}")
return modified_template
# --- 状态查询API ---
def get_core_prompts(self) -> list[str]:
"""获取所有已注册的核心提示词模板名称列表(即所有可注入的目标)。"""
return list(global_prompt_manager._prompts.keys())
def get_core_prompt_contents(self) -> dict[str, str]:
"""获取所有核心提示词模板的原始内容。"""
return {name: prompt.template for name, prompt in global_prompt_manager._prompts.items()}
def get_registered_prompt_components(self) -> list[PromptInfo]:
"""获取所有在 ComponentRegistry 中注册的 Prompt 组件信息。"""
components = component_registry.get_components_by_type(ComponentType.PROMPT).values()
return [info for info in components if isinstance(info, PromptInfo)]
async def get_full_injection_map(self) -> dict[str, list[dict]]:
"""获取当前完整的注入映射图。"""
injection_map = {}
async with self._lock:
all_targets = set(self._dynamic_rules.keys()) | set(self.get_core_prompts())
for target in all_targets:
rules = self._dynamic_rules.get(target, {})
if not rules:
injection_map[target] = []
continue
info_list = []
for prompt_name, (rule, _, source) in rules.items():
info_list.append(
{"name": prompt_name, "priority": rule.priority, "source": source}
)
info_list.sort(key=lambda x: x["priority"])
injection_map[target] = info_list
return injection_map
async def get_injections_for_prompt(self, target_prompt_name: str) -> list[dict]:
"""获取指定核心提示词模板的所有注入信息。"""
full_map = await self.get_full_injection_map()
return full_map.get(target_prompt_name, [])
def get_all_dynamic_rules(self) -> dict[str, dict[str, 'InjectionRule']]:
"""获取所有当前的动态注入规则,以 InjectionRule 对象形式返回。"""
rules_copy = {}
# 只返回规则对象,隐藏 provider 实现细节
for target, rules in self._dynamic_rules.items():
target_copy = {}
for name, (rule, _, _) in rules.items():
target_copy[name] = rule
rules_copy[target] = target_copy
return copy.deepcopy(rules_copy)
# 创建全局单例
prompt_component_manager = PromptComponentManager()