Merge branch 'dev' into mofox-bus

This commit is contained in:
Windpicker-owo
2025-11-26 22:25:39 +08:00
34 changed files with 1943 additions and 1199 deletions

View File

@@ -29,14 +29,13 @@ class PromptComponentManager:
def __init__(self):
"""初始化管理器实例。"""
# _dynamic_rules 是管理器的核心状态,存储所有注入规则。
# _dynamic_rules 存储通过 API 在运行时动态添加/修改的规则。
# 这是实现提示词动态性的核心数据结构。
# 结构: {
# "target_prompt_name": {
# "prompt_component_name": (InjectionRule, content_provider, source)
# "target_prompt_name": { // 目标 Prompt 的名称
# "prompt_component_name": (InjectionRule, content_provider, source) // 注入组件的规则详情
# }
# }
# content_provider 是一个异步函数,用于在应用规则时动态生成注入内容。
# source 记录了规则的来源(例如 "static_default" 或 "runtime")。
self._dynamic_rules: dict[str, dict[str, tuple[InjectionRule, Callable[..., Awaitable[str]], str]]] = {}
self._lock = asyncio.Lock() # 使用异步锁确保对 _dynamic_rules 的并发访问安全。
self._initialized = False # 标记静态规则是否已加载,防止重复加载。
@@ -139,9 +138,13 @@ class PromptComponentManager:
Returns:
bool: 如果成功添加或更新,则返回 True。
"""
# 加锁以保证多协程环境下的数据一致性
async with self._lock:
# 遍历所有待添加的规则
for rule in rules:
# 使用 setdefault 确保目标 prompt 的规则字典存在
target_rules = self._dynamic_rules.setdefault(rule.target_prompt, {})
# 添加或覆盖指定组件的规则、内容提供者和来源
target_rules[prompt_name] = (rule, content_provider, source)
logger.info(f"成功添加/更新注入规则: '{prompt_name}' -> '{rule.target_prompt}' (来源: {source})")
return True
@@ -163,15 +166,16 @@ class PromptComponentManager:
如果未找到该组件的任何现有规则(无法复用),则返回 False。
"""
async with self._lock:
# 步骤 1: 查找现有的 content_provider 和 source
# 步骤 1: 遍历所有动态规则,查找指定组件已存在的 provider 和 source
found_provider: Callable[..., Awaitable[str]] | None = None
found_source: str | None = None
for target_rules in self._dynamic_rules.values():
if prompt_name in target_rules:
# 如果找到,记录其 provider 和 source 并跳出循环
_, found_provider, found_source = target_rules[prompt_name]
break
# 步骤 2: 如果找到 provider则操作失败
# 步骤 2: 如果遍历完仍未找到 provider说明该组件无任何规则,无法复用
if not found_provider:
logger.warning(
f"尝试为组件 '{prompt_name}' 添加规则失败: "
@@ -180,7 +184,7 @@ class PromptComponentManager:
return False
# 步骤 3: 使用找到的 provider 和 source 添加新规则
source_to_use = found_source or "runtime" # 提供一个默认值以防万一
source_to_use = found_source or "runtime" # 如果 source 为 None提供默认值
target_rules = self._dynamic_rules.setdefault(rule.target_prompt, {})
target_rules[prompt_name] = (rule, found_provider, source_to_use)
logger.info(
@@ -201,13 +205,16 @@ class PromptComponentManager:
bool: 如果成功移除,则返回 True如果规则不存在则返回 False。
"""
async with self._lock:
# 检查目标和组件规则是否存在
if target_prompt in self._dynamic_rules and prompt_name in self._dynamic_rules[target_prompt]:
# 存在则删除
del self._dynamic_rules[target_prompt][prompt_name]
# 如果目标下已无任何规则,则清理掉这个
# 如果删除后,该目标下已无任何规则,则清理掉这个目标键,保持数据结构整洁
if not self._dynamic_rules[target_prompt]:
del self._dynamic_rules[target_prompt]
logger.info(f"成功移除注入规则: '{prompt_name}' from '{target_prompt}'")
return True
# 如果规则不存在,记录警告并返回 False
logger.warning(f"尝试移除注入规则失败: 未找到 '{prompt_name}' on '{target_prompt}'")
return False
@@ -228,7 +235,9 @@ class PromptComponentManager:
async with self._lock:
# 创建一个目标列表的副本进行迭代,因为我们可能会在循环中修改字典
for target_prompt in list(self._dynamic_rules.keys()):
# 检查当前目标下是否存在该组件的规则
if prompt_name in self._dynamic_rules[target_prompt]:
# 存在则删除
del self._dynamic_rules[target_prompt][prompt_name]
removed = True
logger.info(f"成功移除注入规则: '{prompt_name}' from '{target_prompt}'")
@@ -243,6 +252,77 @@ class PromptComponentManager:
return removed
# --- 核心注入逻辑 ---
def _create_content_provider(
self, component_name: str, component_class: type[BasePrompt]
) -> Callable[[PromptParameters, str], Awaitable[str]]:
"""为指定的组件类创建一个标准化的内容提供者闭包。"""
async def content_provider(params: PromptParameters, target_prompt_name: str) -> str:
"""实际执行内容生成的异步函数。"""
try:
# 从注册表获取组件信息,用于后续获取插件配置
p_info = component_registry.get_component_info(component_name, ComponentType.PROMPT)
plugin_config = {}
if isinstance(p_info, PromptInfo):
# 获取该组件所属插件的配置
plugin_config = component_registry.get_plugin_config(p_info.plugin_name)
# 实例化组件,并传入所需参数
instance = component_class(
params=params, plugin_config=plugin_config, target_prompt_name=target_prompt_name
)
# 执行组件的 execute 方法以生成内容
result = await instance.execute()
# 确保返回的是字符串
return str(result) if result is not None else ""
except Exception as e:
# 捕获并记录执行过程中的任何异常,返回空字符串以避免注入失败
logger.error(f"执行规则提供者 '{component_name}' 时出错: {e}", exc_info=True)
return ""
return content_provider
async def _build_rules_for_target(self, target_prompt_name: str) -> list:
"""在注入时动态构建目标的所有有效规则列表。"""
all_rules = []
# 1. 从 component_registry 获取所有静态组件的规则
static_components = component_registry.get_components_by_type(ComponentType.PROMPT)
for name, info in static_components.items():
if not isinstance(info, PromptInfo):
continue
# 实时检查组件是否已启用,跳过禁用的组件
if not component_registry.is_component_available(name, ComponentType.PROMPT):
continue
# 获取组件的类定义
component_class = component_registry.get_component_class(name, ComponentType.PROMPT)
if not (component_class and issubclass(component_class, BasePrompt)):
continue
# 为该组件创建一个内容提供者
provider = self._create_content_provider(name, component_class)
# 遍历组件定义的所有注入规则
for rule in info.injection_rules:
# 如果规则的目标与当前目标匹配,则添加到列表中
if rule.target_prompt == target_prompt_name:
all_rules.append((rule, provider, "static"))
# 2. 从 _dynamic_rules 获取所有纯运行时规则
async with self._lock:
runtime_rules = self._dynamic_rules.get(target_prompt_name, {})
for name, (rule, provider, source) in runtime_rules.items():
# 检查该运行时规则是否关联到一个已注册的静态组件
static_info = component_registry.get_component_info(name, ComponentType.PROMPT)
# 如果关联的静态组件存在且被禁用,则跳过此运行时规则
if static_info and not component_registry.is_component_available(name, ComponentType.PROMPT):
logger.debug(f"跳过运行时规则 '{name}',因为它关联的静态组件当前已禁用。")
continue
# 将有效的运行时规则添加到列表
all_rules.append((rule, provider, source))
return all_rules
async def apply_injections(
self, target_prompt_name: str, original_template: str, params: PromptParameters
@@ -268,20 +348,19 @@ class PromptComponentManager:
Returns:
str: 应用了所有注入规则后,最终生成的提示词模板字符串。
"""
if not self._initialized:
self.load_static_rules()
rules_for_target = list(self._dynamic_rules.get(target_prompt_name, {}).values())
# 构建适用于当前目标的所有规则
rules_for_target = await self._build_rules_for_target(target_prompt_name)
if not rules_for_target:
# 如果没有规则,直接返回原始模板
return original_template
# --- 占位符保护机制 ---
# 1. 保护: 找到所有 {placeholder} 并用临时标记替换
placeholders = re.findall(r"({[^{}]+})", original_template)
placeholder_map: dict[str, str] = {
f"__PROMPT_PLACEHOLDER_{i}__": p for i, p in enumerate(placeholders)
}
}
# 1. 保护: 将占位符替换为临时标记
protected_template = original_template
for marker, placeholder in placeholder_map.items():
protected_template = protected_template.replace(placeholder, marker)
@@ -290,6 +369,7 @@ class PromptComponentManager:
for rule, _, source in rules_for_target:
if rule.injection_type in (InjectionType.REMOVE, InjectionType.REPLACE) and rule.target_content:
try:
# 检查规则的 target_content (正则) 是否可能匹配到任何一个占位符
for p in placeholders:
if re.search(rule.target_content, p):
logger.warning(
@@ -297,10 +377,10 @@ class PromptComponentManager:
f"规则 `target_content` ('{rule.target_content}') "
f"可能会影响核心占位符 '{p}'。为保证系统稳定,该占位符已被保护,不会被此规则修改。"
)
# 只对每个规则警告一次
# 每个规则警告一次
break
except re.error:
# 正则表达式本身有误,后执行时会再次捕获,这里可忽略
# 如果正则表达式本身有误,后执行时会捕获,此处可忽略
pass
# 3. 安全执行: 按优先级排序并应用规则
@@ -309,13 +389,16 @@ class PromptComponentManager:
modified_template = protected_template
for rule, provider, source in rules_for_target:
content = ""
# REMOVE 类型不需要生成内容
if rule.injection_type != InjectionType.REMOVE:
try:
# 调用内容提供者生成要注入的文本
content = await provider(params, target_prompt_name)
except Exception as e:
logger.error(f"执行规则 '{rule}' (来源: {source}) 的内容提供者时失败: {e}")
continue
# 应用注入规则
try:
if rule.injection_type == InjectionType.PREPEND:
if content:
@@ -328,6 +411,7 @@ class PromptComponentManager:
modified_template = re.sub(rule.target_content, str(content), modified_template)
elif rule.injection_type == InjectionType.INSERT_AFTER:
if content and rule.target_content:
# 使用 \\g<0> 在匹配项后插入内容
replacement = f"\\g<0>\n{content}"
modified_template = re.sub(rule.target_content, replacement, modified_template)
elif rule.injection_type == InjectionType.REMOVE:
@@ -338,7 +422,7 @@ class PromptComponentManager:
except Exception as e:
logger.error(f"应用注入规则 '{rule}' (来源: {source}) 失败: {e}")
# 4. 占位符恢复
# 4. 占位符恢复: 将临时标记替换回原始的占位符
final_template = modified_template
for marker, placeholder in placeholder_map.items():
final_template = final_template.replace(marker, placeholder)
@@ -362,8 +446,9 @@ class PromptComponentManager:
str: 模拟生成的最终提示词模板字符串。如果找不到模板,则返回错误信息。
"""
try:
# 从全局提示词管理器获取最原始的模板内容
# 动态导入以避免循环依赖
from src.chat.utils.prompt import global_prompt_manager
# 从全局管理器获取原始的、未经修改的提示词对象
original_prompt = global_prompt_manager._prompts.get(target_prompt_name)
if not original_prompt:
logger.warning(f"无法预览 '{target_prompt_name}',因为找不到这个核心 Prompt。")
@@ -373,14 +458,16 @@ class PromptComponentManager:
logger.warning(f"无法预览 '{target_prompt_name}',因为找不到这个核心 Prompt。")
return f"Error: Prompt '{target_prompt_name}' not found."
# 直接调用核心注入逻辑来模拟结果
# 直接调用核心注入逻辑来模拟并返回结果
return await self.apply_injections(target_prompt_name, original_template, params)
# --- 状态观测与查询 API ---
def get_core_prompts(self) -> list[str]:
"""获取所有已注册的核心提示词模板名称列表(即所有可注入的目标)。"""
# 动态导入以避免循环依赖
from src.chat.utils.prompt import global_prompt_manager
# 返回所有核心 prompt 的名称列表
return list(global_prompt_manager._prompts.keys())
def get_core_prompt_contents(self, prompt_name: str | None = None) -> list[list[str]]:
@@ -400,60 +487,50 @@ class PromptComponentManager:
from src.chat.utils.prompt import global_prompt_manager
if prompt_name:
# 如果指定了名称,则查找并返回单个模板
prompt = global_prompt_manager._prompts.get(prompt_name)
return [[prompt_name, prompt.template]] if prompt else []
# 如果未指定名称,则返回所有模板的列表
return [[name, prompt.template] for name, prompt in global_prompt_manager._prompts.items()]
def get_registered_prompt_component_info(self) -> list[PromptInfo]:
async def get_registered_prompt_component_info(self) -> list[PromptInfo]:
"""
获取所有已注册和动态添加的Prompt组件信息并反映当前的注入规则状态。
该方法会合并静态注册的组件信息和运行时的动态注入规则,
确保返回的 `PromptInfo` 列表能够准确地反映系统当前的完整状态。
Returns:
list[PromptInfo]: 一个包含所有静态和动态Prompt组件信息的列表。
每个组件的 `injection_rules` 都会被更新为当前实际生效的规则。
此方法现在直接从 component_registry 获取静态组件信息,并合并纯运行时的组件信息。
"""
# 步骤 1: 获取所有静态注册的组件信息,并使用深拷贝以避免修改原始数据
static_components = component_registry.get_components_by_type(ComponentType.PROMPT)
# 使用深拷贝以避免修改原始注册表数据
info_dict: dict[str, PromptInfo] = {
name: copy.deepcopy(info) for name, info in static_components.items() if isinstance(info, PromptInfo)
}
# 从注册表获取所有注册的静态 Prompt 组件信息
all_components = component_registry.get_components_by_type(ComponentType.PROMPT)
info_list = [info for info in all_components.values() if isinstance(info, PromptInfo)]
# 步骤 2: 遍历动态规则,识别并创建纯动态组件的 PromptInfo
all_dynamic_component_names = set()
for target, rules in self._dynamic_rules.items():
for prompt_name, (rule, _, source) in rules.items():
all_dynamic_component_names.add(prompt_name)
# 检查并合并仅在运行时通过 API 添加的“纯动态组件
async with self._lock:
runtime_component_names = set()
# 收集所有动态规则中涉及的组件名称
for rules in self._dynamic_rules.values():
runtime_component_names.update(rules.keys())
for name in all_dynamic_component_names:
if name not in info_dict:
# 这是一个纯动态组件,为其创建一个新的 PromptInfo
info_dict[name] = PromptInfo(
static_component_names = {info.name for info in info_list}
# 找出那些只存在于动态规则中,但未在静态组件中注册的名称
pure_dynamic_names = runtime_component_names - static_component_names
for name in pure_dynamic_names:
# 为这些“纯动态”组件创建一个临时的信息对象
dynamic_info = PromptInfo(
name=name,
component_type=ComponentType.PROMPT,
description="Dynamically added component",
plugin_name="runtime", # 动态组件通常没有插件归属
description="Dynamically added runtime component",
plugin_name="runtime",
is_built_in=False,
)
# 从动态规则中收集并关联其所有注入规则
for target, rules_in_target in self._dynamic_rules.items():
if name in rules_in_target:
rule, _, _ = rules_in_target[name]
dynamic_info.injection_rules.append(rule)
info_list.append(dynamic_info)
# 步骤 3: 清空所有组件的注入规则,准备用当前状态重新填充
for info in info_dict.values():
info.injection_rules = []
# 步骤 4: 再次遍历动态规则,为每个组件重建其 injection_rules 列表
for target, rules in self._dynamic_rules.items():
for prompt_name, (rule, _, _) in rules.items():
if prompt_name in info_dict:
# 确保规则是 InjectionRule 的实例
if isinstance(rule, InjectionRule):
info_dict[prompt_name].injection_rules.append(rule)
# 步骤 5: 返回最终的 PromptInfo 对象列表
return list(info_dict.values())
return info_list
async def get_injection_info(
self,
@@ -462,60 +539,51 @@ class PromptComponentManager:
) -> dict[str, list[dict]]:
"""
获取注入信息的映射图,可按目标筛选,并可控制信息的详细程度。
- `get_injection_info()` 返回所有目标的摘要注入信息。
- `get_injection_info(target_prompt="...")` 返回指定目标的摘要注入信息。
- `get_injection_info(detailed=True)` 返回所有目标的详细注入信息。
- `get_injection_info(target_prompt="...", detailed=True)` 返回指定目标的详细注入信息。
Args:
target_prompt (str, optional): 如果指定,仅返回该目标的注入信息。
detailed (bool, optional): 如果为 True则返回包含注入类型和内容的详细信息。
默认为 False返回摘要信息。
Returns:
dict[str, list[dict]]: 一个字典,键是目标提示词名称,
值是按优先级排序的注入信息列表。
此方法现在动态构建信息,以反映当前启用的组件和规则。
"""
info_map = {}
async with self._lock:
all_targets = set(self._dynamic_rules.keys()) | set(self.get_core_prompts())
all_core_prompts = self.get_core_prompts()
# 确定要处理的目标:如果指定了有效的目标,则只处理它;否则处理所有核心 prompt
targets_to_process = [target_prompt] if target_prompt and target_prompt in all_core_prompts else all_core_prompts
# 如果指定了目标,则只处理该目标
targets_to_process = [target_prompt] if target_prompt and target_prompt in all_targets else sorted(all_targets)
for target in targets_to_process:
# 动态构建该目标的所有有效规则
rules_for_target = await self._build_rules_for_target(target)
if not rules_for_target:
info_map[target] = []
continue
for target in targets_to_process:
rules = self._dynamic_rules.get(target, {})
if not rules:
info_map[target] = []
continue
info_list = []
for rule, _, source in rules_for_target:
# 从规则对象中获取其所属组件的名称
prompt_name = rule.owner_component
if detailed:
# 如果需要详细信息,则添加更多字段
info_list.append(
{
"name": prompt_name,
"priority": rule.priority,
"source": source,
"injection_type": rule.injection_type.value,
"target_content": rule.target_content,
}
)
else:
# 否则只添加基本信息
info_list.append({"name": prompt_name, "priority": rule.priority, "source": source})
info_list = []
for prompt_name, (rule, _, source) in rules.items():
if detailed:
info_list.append(
{
"name": prompt_name,
"priority": rule.priority,
"source": source,
"injection_type": rule.injection_type.value,
"target_content": rule.target_content,
}
)
else:
info_list.append({"name": prompt_name, "priority": rule.priority, "source": source})
info_list.sort(key=lambda x: x["priority"])
info_map[target] = info_list
# 按优先级对结果进行排序
info_list.sort(key=lambda x: x["priority"])
info_map[target] = info_list
return info_map
def get_injection_rules(
async def get_injection_rules(
self,
target_prompt: str | None = None,
component_name: str | None = None,
) -> dict[str, dict[str, "InjectionRule"]]:
"""
获取动态注入规则,可通过目标或组件名称进行筛选。
获取所有(包括静态和运行时)注入规则,可通过目标或组件名称进行筛选。
- 不提供任何参数时,返回所有规则。
- 提供 `target_prompt` 时,仅返回注入到该目标的规则。
@@ -527,44 +595,44 @@ class PromptComponentManager:
component_name (str, optional): 按注入组件名称筛选。
Returns:
dict[str, dict[str, InjectionRule]]: 一个深拷贝的规则字典。
dict[str, dict[str, InjectionRule]]: 一个包含所有匹配规则的深拷贝字典。
结构: { "target_prompt": { "component_name": InjectionRule } }
"""
rules_copy = {}
# 筛选目标
targets_to_check = [target_prompt] if target_prompt else self._dynamic_rules.keys()
all_rules: dict[str, dict[str, InjectionRule]] = {}
for target in targets_to_check:
if target not in self._dynamic_rules:
# 1. 收集所有静态组件的规则
static_components = component_registry.get_components_by_type(ComponentType.PROMPT)
for name, info in static_components.items():
if not isinstance(info, PromptInfo):
continue
# 如果指定了 component_name 且不匹配,则跳过此组件
if component_name and name != component_name:
continue
rules_for_target = self._dynamic_rules[target]
target_copy = {}
for rule in info.injection_rules:
# 如果指定了 target_prompt 且不匹配,则跳过此规则
if target_prompt and rule.target_prompt != target_prompt:
continue
target_dict = all_rules.setdefault(rule.target_prompt, {})
target_dict[name] = rule
# 筛选组件
if component_name:
if component_name in rules_for_target:
rule, _, _ = rules_for_target[component_name]
target_copy[component_name] = rule
else:
for name, (rule, _, _) in rules_for_target.items():
target_copy[name] = rule
# 2. 收集并合并所有运行时规则
async with self._lock:
for target, rules_in_target in self._dynamic_rules.items():
# 如果指定了 target_prompt 且不匹配,则跳过此目标下的所有规则
if target_prompt and target != target_prompt:
continue
if target_copy:
rules_copy[target] = target_copy
for name, (rule, _, _) in rules_in_target.items():
# 如果指定了 component_name 且不匹配,则跳过此规则
if component_name and name != component_name:
continue
target_dict = all_rules.setdefault(target, {})
# 运行时规则会覆盖同名的静态规则
target_dict[name] = rule
# 如果是按组件筛选且未指定目标,则需遍历所有目标
if component_name and not target_prompt:
found_rules = {}
for target, rules in self._dynamic_rules.items():
if component_name in rules:
rule, _, _ = rules[component_name]
if target not in found_rules:
found_rules[target] = {}
found_rules[target][component_name] = rule
return copy.deepcopy(found_rules)
return copy.deepcopy(rules_copy)
# 返回深拷贝以防止外部修改影响内部状态
return copy.deepcopy(all_rules)
# 创建全局单例 (Singleton)