diff --git a/src/chat/utils/prompt.py b/src/chat/utils/prompt.py index 0738d7cad..c4d23d5f4 100644 --- a/src/chat/utils/prompt.py +++ b/src/chat/utils/prompt.py @@ -26,51 +26,80 @@ logger = get_logger("unified_prompt") class PromptContext: - """提示词上下文管理器""" + """提示词上下文管理器. + + 该类用于创建临时的、隔离的提示词作用域,尤其适用于异步环境中的并发消息处理。 + 它使用`contextvars`来确保每个协程(例如,处理单个消息的协程)都拥有自己独立的 + 提示词注册表,从而避免上下文混淆。 + """ def __init__(self): + """初始化上下文管理器.""" + # _context_prompts: 存储按上下文ID组织的提示词字典。 + # 格式: {"context_id_1": {"prompt_name_1": Prompt_obj_1}} self._context_prompts: dict[str, dict[str, "Prompt"]] = {} + + # _current_context_var: 使用contextvars来存储当前协程的上下文ID。 + # 这确保了在并发执行的异步任务中,每个任务都能访问到正确的上下文ID。 self._current_context_var = contextvars.ContextVar("current_context", default=None) + + # _context_lock: 一个异步锁,用于保护对共享资源_context_prompts的并发访问。 self._context_lock = asyncio.Lock() @property def _current_context(self) -> str | None: - """获取当前协程的上下文ID""" + """获取当前协程的上下文ID.""" return self._current_context_var.get() @_current_context.setter def _current_context(self, value: str | None): - """设置当前协程的上下文ID""" + """设置当前协程的上下文ID.""" self._current_context_var.set(value) # type: ignore @asynccontextmanager async def async_scope(self, context_id: str | None = None): - """创建一个异步的临时提示模板作用域""" + """创建一个异步的临时提示模板作用域. + + 在此作用域内注册或获取的提示词将是临时的,并且只对当前协程可见。 + 这对于处理单个消息时需要临时修改或添加提示词的场景非常有用。 + + Args: + context_id (str | None): 上下文ID,通常是消息ID。如果为None,则不创建新的作用域。 + """ if context_id is not None: + # 尝试获取锁以安全地初始化上下文 try: await asyncio.wait_for(self._context_lock.acquire(), timeout=5.0) try: + # 如果是新的上下文ID,为其创建一个空的提示词字典 if context_id not in self._context_prompts: self._context_prompts[context_id] = {} finally: + # 确保锁总是被释放 self._context_lock.release() except asyncio.TimeoutError: + # 如果获取锁超时,记录警告并放弃创建此作用域 logger.warning(f"获取上下文锁超时,context_id: {context_id}") context_id = None + # 设置当前协程的上下文ID previous_context = self._current_context - token = self._current_context_var.set(context_id) if context_id else None # type: ignore + token = self._current_context_var.set(context_id) if context_id else None # type: ignore else: + # 如果没有提供context_id,则不改变当前上下文 previous_context = self._current_context token = None try: + # 进入作用域 yield self finally: + # 退出作用域时,恢复之前的上下文 if context_id is not None and token is not None: try: self._current_context_var.reset(token) except Exception as e: + # 如果重置失败,尝试手动恢复,作为最后的保障 logger.warning(f"恢复上下文时出错: {e}") try: self._current_context = previous_context @@ -78,10 +107,11 @@ class PromptContext: ... async def get_prompt_async(self, name: str) -> Optional["Prompt"]: - """异步获取当前作用域中的提示模板""" + """异步、安全地获取当前作用域中的提示模板.""" async with self._context_lock: current_context = self._current_context logger.debug(f"获取提示词: {name} 当前上下文: {current_context}") + # 检查当前上下文是否存在,并且提示词是否已在该上下文中注册 if ( current_context and current_context in self._context_prompts @@ -91,53 +121,92 @@ class PromptContext: return None async def register_async(self, prompt: "Prompt", context_id: str | None = None) -> None: - """异步注册提示模板到指定作用域""" + """异步、安全地将提示模板注册到指定的作用域. + + 如果未指定context_id,则注册到当前协程的上下文中。 + """ async with self._context_lock: + # 确定目标上下文ID if target_context := context_id or self._current_context: if prompt.name: + # 使用setdefault确保目标上下文的字典存在,然后注册prompt self._context_prompts.setdefault(target_context, {})[prompt.name] = prompt class PromptManager: - """统一提示词管理器""" + """统一提示词管理器. + + 作为全局单例(global_prompt_manager)存在,负责管理所有全局注册的提示词模板。 + 它与PromptContext协作,实现了上下文优先的提示词检索策略,并支持动态的插件内容注入。 + """ def __init__(self): - self._prompts = {} - self._counter = 0 - self._context = PromptContext() + """初始化管理器.""" + self._prompts = {} # 全局提示词注册表 + self._counter = 0 # 用于为未命名提示词生成唯一名称 + self._context = PromptContext() # 上下文管理器实例 @asynccontextmanager async def async_message_scope(self, message_id: str | None = None): - """为消息处理创建异步临时作用域""" + """为单个消息处理流程创建一个异步的临时提示词作用域. + + 这是一个便捷的封装,直接使用了PromptContext的async_scope。 + + Args: + message_id (str | None): 消息ID,用作上下文ID。 + """ async with self._context.async_scope(message_id): yield self async def get_prompt_async(self, name: str, parameters: PromptParameters | None = None) -> "Prompt": - """ - 异步获取提示模板,并动态注入插件内容 + """异步获取提示模板,并动态地将插件内容注入其中. + + 获取提示词的优先级顺序为: + 1. 当前协程的上下文作用域 (通过 `_context.get_prompt_async`) + 2. 全局注册表 + + 核心功能是动态注入:在获取到原始模板后,它会检查是否有插件注册了 + 针对此提示词(`injection_point`)的内容。如果有,它会创建一个新的、 + 临时的、包含了注入内容的Prompt实例返回,而不会污染全局注册表。 + + Args: + name (str): 提示词的名称。 + parameters (PromptParameters | None): 用于插件内容注入的参数。 + + Returns: + Prompt: 最终的(可能已被注入内容的)Prompt实例。 + + Raises: + KeyError: 如果找不到指定名称的提示词。 """ original_prompt = None + # 1. 优先从当前上下文获取 context_prompt = await self._context.get_prompt_async(name) if context_prompt is not None: logger.debug(f"从上下文中获取提示词: {name} {context_prompt}") original_prompt = context_prompt + # 2. 否则,从全局注册表获取 elif name in self._prompts: original_prompt = self._prompts[name] else: raise KeyError(f"Prompt '{name}' not found") - # 动态注入插件内容 + # --- 动态注入插件内容 --- if original_prompt.name: - # 确保我们有有效的parameters实例 + # 确保我们有有效的parameters实例用于注入逻辑 params_for_injection = parameters or original_prompt.parameters + # 从组件管理器获取需要注入的内容 components_prefix = await prompt_component_manager.execute_components_for( injection_point=original_prompt.name, params=params_for_injection ) + # 如果有内容需要注入 if components_prefix: logger.info(f"为'{name}'注入插件内容: \n{components_prefix}") - # 创建一个新的临时Prompt实例,不进行注册 + # 将注入内容与原始模板拼接 new_template = f"{components_prefix}\n\n{original_prompt.template}" + # 创建一个新的、临时的Prompt实例。`should_register=False`是关键, + # 它防止了这个临时版本污染全局或上下文注册表。 temp_prompt = Prompt( template=new_template, name=original_prompt.name, @@ -146,31 +215,40 @@ class PromptManager: ) return temp_prompt + # 如果没有注入内容,返回原始的提示词实例 return original_prompt def generate_name(self, template: str) -> str: - """为未命名的prompt生成名称""" + """为未命名的prompt生成一个唯一的名称.""" self._counter += 1 return f"prompt_{self._counter}" def register(self, prompt: "Prompt") -> None: - """注册一个prompt""" + """在全局注册表中注册一个prompt. + + 如果prompt没有名称,会自动为其生成一个。 + """ if not prompt.name: prompt.name = self.generate_name(prompt.template) self._prompts[prompt.name] = prompt def add_prompt(self, name: str, fstr: str) -> "Prompt": - """添加新提示模板""" + """通过名称和模板字符串快速添加一个新的全局提示模板.""" prompt = Prompt(fstr, name=name) if prompt.name: self._prompts[prompt.name] = prompt return prompt async def format_prompt(self, name: str, **kwargs) -> str: - """格式化提示模板""" - # 提取parameters用于注入 + """格式化一个提示模板. + + 这是格式化操作的主要入口。它会先通过`get_prompt_async`获取 + 最新的、可能已被注入内容的模板,然后再执行格式化。 + """ + # 提取parameters参数,因为它需要被传递给get_prompt_async以进行正确的注入 parameters = kwargs.get("parameters") prompt = await self.get_prompt_async(name, parameters=parameters) + # 使用所有提供的关键字参数格式化最终的模板 result = prompt.format(**kwargs) return result @@ -180,12 +258,16 @@ global_prompt_manager = PromptManager() class Prompt: - """ - 统一提示词类 - 合并模板管理和智能构建功能 - 真正的Prompt类,支持模板管理和智能上下文构建 + """统一提示词类 - 融合了模板管理和智能构建功能. + + 这是系统的核心类。一个`Prompt`实例不仅是一个简单的字符串模板,更是一个 + 能够根据复杂的`PromptParameters`动态、异步地构建自身完整内容的“构建器”。 + 它负责调用各种子系统(如记忆、工具、知识库等)来收集上下文信息, + 并将这些信息整合到最终的提示词中。 """ - # 临时标记,作为类常量 + # 使用临时标记来处理模板中的转义花括号 `\{` 和 `\}` + # 这是为了防止它们在 `format` 方法中被错误地解释为占位符 _TEMP_LEFT_BRACE = "__ESCAPED_LEFT_BRACE__" _TEMP_RIGHT_BRACE = "__ESCAPED_RIGHT_BRACE__" @@ -196,31 +278,32 @@ class Prompt: parameters: PromptParameters | None = None, should_register: bool = True, ): - """ - 初始化统一提示词 + """初始化一个统一提示词实例. Args: - template: 提示词模板字符串 - name: 提示词名称 - parameters: 构建参数 - should_register: 是否自动注册到全局管理器 + template (str): 提示词模板字符串,例如 "你好, {user_name}!"。 + name (str | None): 提示词的唯一名称,用于注册和检索。 + parameters (PromptParameters | None): 控制智能构建过程的参数对象。 + should_register (bool): 是否应将此实例自动注册到全局管理器。 + 在创建临时或动态修改的Prompt时应设为False。 """ self.template = template self.name = name self.parameters = parameters or PromptParameters() - self.args = self._parse_template_args(template) - self._formatted_result = "" + self.args = self._parse_template_args(template) # 解析模板中的占位符 + self._formatted_result = "" # 存储最后一次格式化或构建的结果 - # 预处理模板中的转义花括号 + # 预处理模板,将转义的花括号替换为临时标记 self._processed_template = self._process_escaped_braces(template) - # 自动注册 + # 根据`should_register`标志和当前是否处于一个临时上下文中来决定是否进行全局注册 + # 如果在`async_scope`内,则不进行全局注册,由调用者决定是否进行上下文注册 if should_register and not global_prompt_manager._context._current_context: global_prompt_manager.register(self) @staticmethod def _process_escaped_braces(template) -> str: - """处理模板中的转义花括号""" + """预处理模板,将 `\{` 和 `\}` 替换为临时标记.""" if isinstance(template, list): template = "\n".join(str(item) for item in template) elif not isinstance(template, str): @@ -230,27 +313,40 @@ class Prompt: @staticmethod def _restore_escaped_braces(template: str) -> str: - """将临时标记还原为实际的花括号字符""" + """在格式化完成后,将临时标记还原为实际的花括号字符 `{` 和 `}`.""" return template.replace(Prompt._TEMP_LEFT_BRACE, "{").replace(Prompt._TEMP_RIGHT_BRACE, "}") def _parse_template_args(self, template: str) -> list[str]: - """解析模板参数""" + """从模板字符串中解析出所有占位符(例如 "{user_name}" -> "user_name").""" template_args = [] + # 在解析前先处理转义花括号,避免将它们误认为占位符 processed_template = self._process_escaped_braces(template) + # 使用正则表达式查找所有花括号内的内容 result = re.findall(r"\{(.*?)}", processed_template) for expr in result: + # 添加到列表中,并确保唯一性 if expr and expr not in template_args: template_args.append(expr) return template_args async def build(self) -> str: - """ - 构建完整的提示词,包含智能上下文 + """构建完整的、包含所有智能上下文的提示词. + + 这是`Prompt`类最核心的方法。它 orchestrates 了整个构建流程: + 1. 验证传入的`PromptParameters`是否有效。 + 2. 调用 `_build_context_data` 异步地、并行地收集所有需要的上下文信息。 + 3. 使用收集到的上下文数据格式化主模板。 + 4. 返回最终构建完成的提示词文本。 Returns: - str: 构建完成的提示词文本 + str: 构建完成的、可以直接发送给LLM的提示词文本。 + + Raises: + ValueError: 如果参数验证失败。 + TimeoutError: 如果构建过程中的任何一步超时。 + RuntimeError: 如果发生其他构建错误。 """ - # 参数验证 + # 步骤 0: 参数验证 errors = self.parameters.validate() if errors: logger.error(f"参数验证失败: {', '.join(errors)}") @@ -258,18 +354,20 @@ class Prompt: start_time = time.time() try: - # 1. 构建核心上下文数据 + # 步骤 1: 构建核心的上下文数据字典 context_data = await self._build_context_data() - # 2. 格式化主模板 + # 步骤 2: 使用构建好的上下文数据来格式化主模板 main_formatted_prompt = await self._format_with_context(context_data) - # 3. 拼接组件内容和主模板内容 (逻辑已前置到 get_prompt_async) + # 步骤 3: (已废弃) 注入插件内容的逻辑已前置到`PromptManager.get_prompt_async`中 + # 这样做可以更早地组合模板,也使得`Prompt`类的职责更单一。 result = main_formatted_prompt total_time = time.time() - start_time logger.debug(f"Prompt构建完成,模式: {self.parameters.prompt_mode}, 耗时: {total_time:.2f}s") + # 缓存结果 self._formatted_result = result return result @@ -281,16 +379,33 @@ class Prompt: raise RuntimeError(f"构建Prompt失败: {e}") from e async def _build_context_data(self) -> dict[str, Any]: - """构建智能上下文数据""" - # 并行执行所有构建任务 + """构建所有智能上下文数据. + + 这是性能和复杂性的核心。它根据`PromptParameters`中的开关, + 动态地创建一系列异步构建任务,然后并行执行它们以最大限度地 + 减少I/O等待时间。 + + 关键优化: + - **并行执行**: 使用`asyncio.gather`(隐式地通过循环`await`)来同时运行多个数据获取任务。 + - **独立超时**: 为每个任务设置独立的、合理的超时时间,防止单个慢任务阻塞整个构建过程。 + - **预构建参数**: 允许外部系统(如消息处理器)预先构建某些耗时的数据(如记忆), + 并将其传入`PromptParameters`,从而完全跳过此处的实时构建。 + - **错误隔离**: 单个任务的失败或超时不会导致整个构建过程失败,而是会使用默认的空值替代, + 保证了系统的健壮性。 + + Returns: + dict[str, Any]: 一个包含所有构建好的上下文数据的字典。 + """ start_time = time.time() try: - # 准备构建任务 + # --- 步骤 1: 准备构建任务 --- tasks = [] task_names = [] - # 初始化预构建参数 + # --- 步骤 1.1: 优先使用预构建的参数 --- + # 如果参数对象中已经包含了某些block,说明它们是外部预构建的, + # 我们将它们存起来,并跳过对应的实时构建任务。 pre_built_params = {} if self.parameters.expression_habits_block: pre_built_params["expression_habits_block"] = self.parameters.expression_habits_block @@ -308,14 +423,13 @@ class Prompt: if self.parameters.notice_block: pre_built_params["notice_block"] = self.parameters.notice_block - # 根据参数确定要构建的项 + # --- 步骤 1.2: 根据参数和预构建情况,决定需要实时运行的任务 --- if self.parameters.enable_expression and not pre_built_params.get("expression_habits_block"): tasks.append(self._build_expression_habits()) task_names.append("expression_habits") - # 记忆块应该在回复前预构建,这里优先使用预构建的结果 + # 记忆块构建非常耗时,强烈建议预构建。如果没有预构建,这里会运行一个快速的后备版本。 if self.parameters.enable_memory and not pre_built_params.get("memory_block"): - # 如果没有预构建的记忆块,则快速构建一个简化版本 logger.debug("memory_block未预构建,执行快速构建作为后备方案") tasks.append(self._build_memory_block_fast()) task_names.append("memory_block") @@ -336,69 +450,76 @@ class Prompt: tasks.append(self._build_cross_context()) task_names.append("cross_context") - # 性能优化 - 为不同任务设置不同的超时时间 + # --- 步骤 2: 并行执行任务,并进行精细化的超时和错误处理 --- + + # 为不同类型的任务设置不同的超时时间,这是一个重要的性能优化。 + # I/O密集型或计算密集型任务(如记忆、工具)可以有更长的超时。 task_timeouts = { - "memory_block": 15.0, # 记忆系统 - 降低超时时间,鼓励预构建 - "tool_info": 15.0, # 工具信息 - "relation_info": 10.0, # 关系信息 - "knowledge_info": 10.0, # 知识库查询 - "cross_context": 10.0, # 上下文处理 - "expression_habits": 10.0, # 表达习惯 + "memory_block": 15.0, + "tool_info": 15.0, + "relation_info": 10.0, + "knowledge_info": 10.0, + "cross_context": 10.0, + "expression_habits": 10.0, } - # 分别处理每个任务,避免慢任务影响快任务 results = [] + # 循环等待每个任务,而不是使用`asyncio.gather`, + # 这样可以为每个任务应用独立的超时控制。 for i, task in enumerate(tasks): task_name = task_names[i] if i < len(task_names) else f"task_{i}" - task_timeout = task_timeouts.get(task_name, 2.0) # 默认2秒 + task_timeout = task_timeouts.get(task_name, 2.0) # 未指定超时的任务默认为2秒 try: - # 确保任务是一个协程对象 + # 确保任务是可等待的协程 if asyncio.iscoroutine(task): + # 使用 `asyncio.wait_for` 来执行带超时的任务 result = await asyncio.wait_for(task, timeout=task_timeout) results.append(result) logger.debug(f"构建任务{task_name}完成 ({task_timeout}s)") else: + # 如果任务不是协程,记录警告并使用默认值 logger.warning(f"任务{task_name}不是协程对象,类型: {type(task)},跳过处理") - default_result = self._get_default_result_for_task(task_name) - results.append(default_result) + results.append(self._get_default_result_for_task(task_name)) except asyncio.TimeoutError: + # 如果任务超时,记录警告并使用默认值 logger.warning(f"构建任务{task_name}超时 ({task_timeout}s),使用默认值") - # 为超时任务提供默认值 - default_result = self._get_default_result_for_task(task_name) - results.append(default_result) + results.append(self._get_default_result_for_task(task_name)) except Exception as e: + # 如果任务执行出错,记录错误并使用默认值 logger.error(f"构建任务{task_name}失败: {e!s}") - default_result = self._get_default_result_for_task(task_name) - results.append(default_result) + results.append(self._get_default_result_for_task(task_name)) - # 处理结果 + # --- 步骤 3: 合并所有结果 --- context_data = {} + # 合并实时构建的结果 for i, result in enumerate(results): task_name = task_names[i] if i < len(task_names) else f"task_{i}" - if isinstance(result, Exception): logger.error(f"构建任务{task_name}失败: {result!s}") elif isinstance(result, dict): context_data.update(result) - # 添加预构建的参数 + # 合并预构建的参数,这会覆盖任何同名的实时构建结果 for key, value in pre_built_params.items(): if value: context_data[key] = value except asyncio.TimeoutError: + # 这是一个不太可能发生的、总体的构建超时,作为最后的保障 logger.error("构建超时") context_data = {} + # 即使总体超时,也要确保预构建的参数被包含在内 for key, value in pre_built_params.items(): if value: context_data[key] = value - # 构建聊天历史 + # --- 步骤 4: 构建特定模式的上下文和补充基础信息 --- + # 为 s4u 模式构建特殊的聊天历史上下文 if self.parameters.prompt_mode == "s4u": await self._build_s4u_chat_context(context_data) - # 补充基础信息 + # 补充所有模式都需要的基础信息 context_data.update( { "keywords_reaction_prompt": self.parameters.keywords_reaction_prompt, @@ -421,7 +542,7 @@ class Prompt: return context_data async def _build_s4u_chat_context(self, context_data: dict[str, Any]) -> None: - """构建S4U模式的聊天上下文""" + """为S4U(Scene for You)模式构建特殊的、包含已读和未读消息的聊天上下文.""" if not self.parameters.message_list_before_now_long: return @@ -429,6 +550,7 @@ class Prompt: if self.parameters.target_user_info: target_user_id = self.parameters.target_user_info.get("user_id") or "" + # 调用核心构建逻辑 read_history_prompt, unread_history_prompt = await self._build_s4u_chat_history_prompts( self.parameters.message_list_before_now_long, target_user_id, @@ -436,6 +558,7 @@ class Prompt: self.parameters.chat_id, ) + # 将构建好的prompt添加到上下文数据中 context_data["read_history_prompt"] = read_history_prompt context_data["unread_history_prompt"] = unread_history_prompt @@ -443,14 +566,19 @@ class Prompt: async def _build_s4u_chat_history_prompts( self, message_list_before_now: list[dict[str, Any]], target_user_id: str, sender: str, chat_id: str ) -> tuple[str, str]: - """构建S4U风格的已读/未读历史消息prompt""" + """构建S4U风格的已读/未读历史消息prompt. + + 这是一个代理方法,它动态导入并调用`default_generator`中的实际实现, + 以避免循环依赖问题。 + """ try: - # 动态导入default_generator以避免循环导入 + # 动态导入以避免循环依赖: prompt -> replyer -> prompt from src.plugin_system.apis.generator_api import get_replyer - # 创建临时生成器实例来使用其方法 + # 获取一个临时的生成器实例来访问其方法 temp_generator = await get_replyer(None, chat_id, request_type="prompt_building") if temp_generator: + # 调用实际的构建方法 return await temp_generator.build_s4u_chat_history_prompts( message_list_before_now, target_user_id, sender, chat_id ) @@ -460,15 +588,17 @@ class Prompt: return "", "" async def _build_expression_habits(self) -> dict[str, Any]: - """构建表达习惯""" + """构建表达习惯(如表情、口癖)的上下文块.""" + # 检查当前聊天是否启用了表达习惯功能 use_expression, _, _ = global_config.expression.get_expression_config_for_chat(self.parameters.chat_id) if not use_expression: return {"expression_habits_block": ""} try: + # 动态导入以减少启动时的加载负担 from src.chat.express.expression_selector import ExpressionSelector - # 获取聊天历史用于表情选择 + # 准备用于分析的近期聊天历史 chat_history = "" if self.parameters.message_list_before_now_long: recent_messages = self.parameters.message_list_before_now_long[-10:] @@ -476,19 +606,16 @@ class Prompt: recent_messages, replace_bot_name=True, timestamp_mode="normal", truncate=True ) - # 创建表情选择器 + # 使用LLM选择与当前情景匹配的表达习惯 expression_selector = ExpressionSelector(self.parameters.chat_id) - - # 选择合适的表情 selected_expressions = await expression_selector.select_suitable_expressions_llm( chat_id=self.parameters.chat_id, chat_info=chat_history, target_message=self.parameters.target, ) - # 构建表达习惯块 + # 将选择的表达习惯格式化为提示词的一部分 if selected_expressions: - # 格式化表达方式,提取关键信息 formatted_expressions = [] for expr in selected_expressions: if isinstance(expr, dict): @@ -508,18 +635,19 @@ class Prompt: return {"expression_habits_block": expression_habits_block} except Exception as e: + # 保证即使构建失败,也不会中断整个流程 logger.error(f"构建表达习惯失败: {e}") return {"expression_habits_block": ""} async def _build_memory_block(self) -> dict[str, Any]: - """构建记忆块""" + """构建与当前对话相关的记忆上下文块(完整版).""" if not global_config.memory.enable_memory: return {"memory_block": ""} try: from src.chat.memory_system.enhanced_memory_activator import enhanced_memory_activator - # 获取聊天历史 + # 准备用于记忆激活的聊天历史 chat_history = "" if self.parameters.message_list_before_now_long: recent_messages = self.parameters.message_list_before_now_long[-20:] @@ -527,10 +655,9 @@ class Prompt: recent_messages, replace_bot_name=True, timestamp_mode="normal", truncate=True ) - # 并行执行记忆查询以提高性能 + # 并行查询长期记忆和即时记忆以提高性能 import asyncio - # 创建记忆查询任务 memory_tasks = [ enhanced_memory_activator.activate_memory_with_chat_history( target_message=self.parameters.target, chat_history_prompt=chat_history @@ -541,9 +668,10 @@ class Prompt: ] try: + # 使用 `return_exceptions=True` 来防止一个任务的失败导致所有任务失败 running_memories, instant_memory = await asyncio.gather(*memory_tasks, return_exceptions=True) - # 处理可能的异常结果 + # 单独处理每个任务的结果,如果是异常则记录并使用默认值 if isinstance(running_memories, BaseException): logger.warning(f"长期记忆查询失败: {running_memories}") running_memories = [] @@ -556,24 +684,21 @@ class Prompt: running_memories = [] instant_memory = None - # 构建记忆块 + # 将检索到的记忆格式化为提示词 if running_memories: try: - # 使用记忆格式化器进行格式化 from src.chat.memory_system.memory_formatter import format_memories_bracket_style - # 转换记忆数据格式 - 修复字段映射 + # 将原始记忆数据转换为格式化器所需的标准格式 formatted_memories = [] for memory in running_memories: - # 从 content 字段提取 display 内容,移除括号中的元数据 content = memory.get("content", "") display_text = content - - # 如果包含元数据括号,提取纯文本部分 + # 清理内容,移除元数据括号 if "(类型:" in content and ")" in content: display_text = content.split("(类型:")[0].strip() - # 映射 topic 到 memory_type + # 映射记忆主题到标准类型 topic = memory.get("topic", "personal_fact") memory_type_mapping = { "relationship": "personal_fact", @@ -598,17 +723,16 @@ class Prompt: } ) - # 使用方括号格式格式化记忆 + # 使用指定的风格进行格式化 memory_block = format_memories_bracket_style( formatted_memories, query_context=self.parameters.target ) except Exception as e: + # 如果格式化失败,提供一个简化的、健壮的备用格式 logger.warning(f"记忆格式化失败,使用简化格式: {e}") - # 备用简化格式 - 提取纯净内容 memory_parts = ["## 相关记忆回顾", ""] for memory in running_memories: content = memory.get("content", "") - # 提取纯文本内容 if "(类型:" in content and ")" in content: clean_content = content.split("(类型:")[0].strip() memory_parts.append(f"- {clean_content}") @@ -618,7 +742,7 @@ class Prompt: else: memory_block = "" - # 添加即时记忆 + # 将即时记忆附加到记忆块的末尾 if instant_memory: if memory_block: memory_block += f"\n- 最相关记忆:{instant_memory}" @@ -632,14 +756,14 @@ class Prompt: return {"memory_block": ""} async def _build_memory_block_fast(self) -> dict[str, Any]: - """快速构建记忆块(简化版本,用于未预构建时的后备方案)""" + """快速构建记忆块(简化版),作为未预构建时的后备方案.""" if not global_config.memory.enable_memory: return {"memory_block": ""} try: from src.chat.memory_system.enhanced_memory_activator import enhanced_memory_activator - # 简化的快速查询,只获取即时记忆 + # 这个快速版本只查询最高优先级的“即时记忆”,速度更快 instant_memory = await enhanced_memory_activator.get_instant_memory( target_message=self.parameters.target, chat_id=self.parameters.chat_id ) @@ -656,8 +780,9 @@ class Prompt: return {"memory_block": ""} async def _build_relation_info(self) -> dict[str, Any]: - """构建关系信息""" + """构建与对话目标相关的关系信息.""" try: + # 调用静态方法来执行实际的构建逻辑 relation_info = await Prompt.build_relation_info(self.parameters.chat_id, self.parameters.reply_to) return {"relation_info_block": relation_info} except Exception as e: @@ -665,14 +790,14 @@ class Prompt: return {"relation_info_block": ""} async def _build_tool_info(self) -> dict[str, Any]: - """构建工具信息""" + """构建工具调用结果的上下文块.""" if not global_config.tool.enable_tool: return {"tool_info_block": ""} try: from src.plugin_system.core.tool_use import ToolExecutor - # 获取聊天历史 + # 准备用于工具选择的聊天历史 chat_history = "" if self.parameters.message_list_before_now_long: recent_messages = self.parameters.message_list_before_now_long[-15:] @@ -680,10 +805,8 @@ class Prompt: recent_messages, replace_bot_name=True, timestamp_mode="normal", truncate=True ) - # 创建工具执行器 + # 决定是否调用工具并执行 tool_executor = ToolExecutor(chat_id=self.parameters.chat_id) - - # 执行工具获取信息 tool_results, _, _ = await tool_executor.execute_from_chat_message( sender=self.parameters.sender, target_message=self.parameters.target, @@ -691,7 +814,7 @@ class Prompt: return_details=False, ) - # 构建工具信息块 + # 将工具结果格式化为提示词的一部分 if tool_results: tool_info_parts = ["## 工具信息", "以下是你通过工具获取到的实时信息:"] for tool_result in tool_results: @@ -713,22 +836,21 @@ class Prompt: return {"tool_info_block": ""} async def _build_knowledge_info(self) -> dict[str, Any]: - """构建知识信息""" + """构建从知识库检索到的相关信息的上下文块.""" if not global_config.lpmm_knowledge.enable: return {"knowledge_prompt": ""} try: from src.chat.knowledge.knowledge_lib import qa_manager - # 获取问题文本(当前消息) question = self.parameters.target or "" if not question or not qa_manager: return {"knowledge_prompt": ""} - # 搜索相关知识 + # 从知识库检索与当前消息相关的信息 knowledge_results = await qa_manager.get_knowledge(question=question) - # 构建知识块 + # 将检索结果格式化为提示词 if knowledge_results and knowledge_results.get("knowledge_items"): knowledge_parts = ["## 知识库信息", "以下是与你当前对话相关的知识信息:"] @@ -737,10 +859,11 @@ class Prompt: source = item.get("source", "") relevance = item.get("relevance", 0.0) if content: + # 过滤掉相关性低于阈值的知识 try: relevance_float = float(relevance) if relevance_float < global_config.lpmm_knowledge.qa_paragraph_threshold: - continue # 跳过不符合阈值的知识 + continue relevance_str = f"{relevance_float:.2f}" except (ValueError, TypeError): relevance_str = str(relevance) @@ -750,6 +873,7 @@ class Prompt: else: knowledge_parts.append(f"- [{relevance_str}] {content}") + # 如果有总结,也一并加入 if knowledge_results.get("summary"): knowledge_parts.append(f"\n知识总结: {knowledge_results['summary']}") @@ -764,8 +888,9 @@ class Prompt: return {"knowledge_prompt": ""} async def _build_cross_context(self) -> dict[str, Any]: - """构建跨群上下文""" + """构建跨群聊上下文信息.""" try: + # 调用静态方法来执行实际的构建逻辑 cross_context = await Prompt.build_cross_context( self.parameters.chat_id, self.parameters.prompt_mode, self.parameters.target_user_info ) @@ -775,16 +900,19 @@ class Prompt: return {"cross_context_block": ""} async def _format_with_context(self, context_data: dict[str, Any]) -> str: - """使用上下文数据格式化模板""" + """根据不同的提示词模式,准备最终的参数并格式化模板.""" + # 根据prompt_mode选择不同的参数准备策略 if self.parameters.prompt_mode == "s4u": params = self._prepare_s4u_params(context_data) else: + # 当前normal模式和default模式使用相同的参数准备逻辑 params = self._prepare_default_params(context_data) + # 如果prompt有名称,则通过全局管理器格式化(这样可以应用注入逻辑),否则直接格式化 return await global_prompt_manager.format_prompt(self.name, **params) if self.name else self.format(**params) def _prepare_s4u_params(self, context_data: dict[str, Any]) -> dict[str, Any]: - """准备S4U模式的参数""" + """为S4U(Scene for You)模式准备最终用于格式化的参数字典.""" return { **context_data, "expression_habits_block": context_data.get("expression_habits_block", ""), @@ -816,7 +944,7 @@ class Prompt: } def _prepare_normal_params(self, context_data: dict[str, Any]) -> dict[str, Any]: - """准备Normal模式的参数""" + """为Normal模式准备最终用于格式化的参数字典.""" return { **context_data, "expression_habits_block": context_data.get("expression_habits_block", ""), @@ -844,7 +972,7 @@ class Prompt: } def _prepare_default_params(self, context_data: dict[str, Any]) -> dict[str, Any]: - """准备默认模式的参数""" + """为默认模式(或其他未指定模式)准备最终用于格式化的参数字典.""" return { "expression_habits_block": context_data.get("expression_habits_block", ""), "relation_info_block": context_data.get("relation_info_block", ""), @@ -870,62 +998,79 @@ class Prompt: } def format(self, *args, **kwargs) -> str: - """格式化模板,支持位置参数和关键字参数""" + """使用给定的参数格式化模板. + + 支持标准的`str.format()`语法,包括位置参数和关键字参数。 + 同时处理了之前用临时标记替换的转义花括号。 + + Args: + *args: 用于格式化的位置参数。 + **kwargs: 用于格式化的关键字参数。 + + Returns: + str: 格式化后的字符串。 + + Raises: + ValueError: 如果提供的参数与模板中的占位符不匹配。 + """ try: - # 先用位置参数格式化 + # 优先使用位置参数进行格式化 if args: formatted_args = {} - for i in range(len(args)): + for i, arg in enumerate(args): if i < len(self.args): - formatted_args[self.args[i]] = args[i] + formatted_args[self.args[i]] = arg processed_template = self._processed_template.format(**formatted_args) else: processed_template = self._processed_template - # 再用关键字参数格式化 + # 然后使用关键字参数对结果进行再次格式化 if kwargs: processed_template = processed_template.format(**kwargs) - # 将临时标记还原为实际的花括号 + # 最后,将转义花括号的临时标记还原 result = self._restore_escaped_braces(processed_template) return result except (IndexError, KeyError) as e: + # 捕获格式化错误并抛出更具信息量的异常 raise ValueError(f"格式化模板失败: {self.template}, args={args}, kwargs={kwargs} {e!s}") from e def __str__(self) -> str: - """返回格式化后的结果或原始模板""" + """返回格式化后的结果,如果还未格式化,则返回原始模板.""" return self._formatted_result if self._formatted_result else self.template def __repr__(self) -> str: - """返回提示词的表示形式""" + """返回一个清晰的、可用于调试的Prompt对象表示形式.""" return f"Prompt(template='{self.template}', name='{self.name}')" # ============================================================================= # PromptUtils功能迁移 - 静态工具方法 - # 这些方法原来在PromptUtils类中,现在作为Prompt类的静态方法 - # 解决循环导入问题 + # + # 这些方法原本位于一个单独的`PromptUtils`类中,为了解决循环导入问题, + # 它们被迁移到`Prompt`类下作为静态方法。 + # 这样,任何需要这些工具函数的地方都可以直接通过`Prompt.method_name`调用, + # 而无需导入另一个可能导致循环依赖的模块。 # ============================================================================= @staticmethod def parse_reply_target(target_message: str) -> tuple[str, str]: - """ - 解析回复目标消息 - 统一实现 + """解析“回复”类型的消息,分离出发送者和消息内容. Args: - target_message: 目标消息,格式为 "发送者:消息内容" 或 "发送者:消息内容" + target_message: 目标消息字符串,通常格式为 "发送者:消息内容" 或 "发送者:消息内容"。 Returns: - Tuple[str, str]: (发送者名称, 消息内容) + tuple[str, str]: 一个包含(发送者名称, 消息内容)的元组。 """ sender = "" target = "" - # 添加None检查,防止NoneType错误 + # 添加None检查,增强健壮性 if target_message is None: return sender, target + # 兼容中文和英文冒号作为分隔符 if ":" in target_message or ":" in target_message: - # 使用正则表达式匹配中文或英文冒号 parts = re.split(pattern=r"[::]", string=target_message, maxsplit=1) if len(parts) == 2: sender = parts[0].strip() @@ -934,15 +1079,14 @@ class Prompt: @staticmethod async def build_relation_info(chat_id: str, reply_to: str) -> str: - """ - 构建关系信息 - 统一实现 + """构建关于回复目标用户的关系信息字符串. Args: - chat_id: 聊天ID - reply_to: 回复目标字符串 + chat_id: 当前聊天的ID。 + reply_to: 被回复的原始消息字符串。 Returns: - str: 关系信息字符串 + str: 格式化后的关系信息字符串,或在失败时返回空字符串。 """ if not global_config.affinity_flow.enable_relationship_tracking: return "" @@ -953,28 +1097,31 @@ class Prompt: if not reply_to: return "" + # 解析出回复目标的发送者 sender, text = Prompt.parse_reply_target(reply_to) if not sender or not text: return "" - # 获取用户ID + # 根据发送者名称查找其用户ID person_info_manager = get_person_info_manager() person_id = await person_info_manager.get_person_id_by_person_name(sender) if not person_id: logger.warning(f"未找到用户 {sender} 的ID,跳过信息提取") return f"你完全不认识{sender},不理解ta的相关信息。" + # 使用关系提取器构建关系信息 return await relationship_fetcher.build_relation_info(person_id, points_num=5) def _get_default_result_for_task(self, task_name: str) -> dict[str, Any]: - """ - 为超时的任务提供默认结果 + """为超时或失败的异步构建任务提供一个安全的默认返回值. + + 这确保了单个子任务的失败不会导致整个提示词构建过程的崩溃。 Args: - task_name: 任务名称 + task_name: 失败的任务的名称。 Returns: - Dict: 默认结果 + dict: 一个包含空字符串值的字典,其键与任务的预期输出相匹配。 """ defaults = { "memory_block": {"memory_block": ""}, @@ -986,7 +1133,7 @@ class Prompt: } if task_name in defaults: - logger.info(f"为超时任务 {task_name} 提供默认值") + logger.info(f"为超时/失败的任务 {task_name} 提供默认值") return defaults[task_name] else: logger.warning(f"未知任务类型 {task_name},返回空结果") @@ -994,26 +1141,27 @@ class Prompt: @staticmethod async def build_cross_context(chat_id: str, prompt_mode: str, target_user_info: dict[str, Any] | None) -> str: - """ - 构建跨群聊上下文 - 统一实现 + """构建跨群聊的上下文信息. Args: - chat_id: 聊天ID - prompt_mode: 当前提示词模式 - target_user_info: 目标用户信息 + chat_id: 当前聊天的ID。 + prompt_mode: 当前的提示词模式。 + target_user_info: 目标用户的信息字典。 Returns: - str: 跨群聊上下文字符串 + str: 构建好的跨群聊上下文字符串。 """ if not global_config.cross_context.enable: return "" + # 动态导入以避免循环依赖 from src.plugin_system.apis import cross_context_api chat_stream = await get_chat_manager().get_stream(chat_id) if not chat_stream: return "" + # 目前只为s4u模式构建跨群上下文 if prompt_mode == "s4u": return await cross_context_api.build_cross_context_s4u(chat_stream, target_user_info) @@ -1021,24 +1169,23 @@ class Prompt: @staticmethod async def parse_reply_target_id(reply_to: str) -> str: - """ - 解析回复目标中的用户ID + """从回复目标字符串中解析出原始发送者的用户ID. Args: - reply_to: 回复目标字符串 + reply_to: 回复目标字符串。 Returns: - str: 用户ID + str: 找到的用户ID,如果找不到则返回空字符串。 """ if not reply_to: return "" - # 复用parse_reply_target方法的逻辑 + # 首先,解析出发送者的名称 sender, _ = Prompt.parse_reply_target(reply_to) if not sender: return "" - # 获取用户ID + # 然后,通过名称查询用户ID person_info_manager = get_person_info_manager() person_id = await person_info_manager.get_person_id_by_person_name(sender) if person_id: @@ -1052,7 +1199,19 @@ class Prompt: def create_prompt( template: str, name: str | None = None, parameters: PromptParameters | None = None, **kwargs ) -> Prompt: - """快速创建Prompt实例的工厂函数""" + """一个用于快速创建`Prompt`实例的工厂函数. + + 它会自动处理`PromptParameters`的创建。 + + Args: + template (str): 提示词模板。 + name (str | None): 提示词名称。 + parameters (PromptParameters | None): 预先创建的参数对象。 + **kwargs: 如果未提供`parameters`,这些关键字参数将被用于创建一个新的`PromptParameters`实例。 + + Returns: + Prompt: 新创建的Prompt实例。 + """ if parameters is None: parameters = PromptParameters(**kwargs) return Prompt(template, name, parameters) @@ -1061,23 +1220,36 @@ def create_prompt( async def create_prompt_async( template: str, name: str | None = None, parameters: PromptParameters | None = None, **kwargs ) -> Prompt: - """异步创建Prompt实例,并动态注入插件内容""" - # 确保有可用的parameters实例 + """异步创建`Prompt`实例,并自动处理插件内容的动态注入. + + 这是推荐的创建prompt的方式,因为它整合了注入逻辑。 + + Args: + template (str): 基础提示词模板。 + name (str | None): 提示词名称,用于查找要注入的组件。 + parameters (PromptParameters | None): 预先创建的参数对象。 + **kwargs: 如果未提供`parameters`,这些关键字参数将被用于创建一个新的`PromptParameters`实例。 + + Returns: + Prompt: 一个可能包含了注入内容的、新创建的Prompt实例。 + """ + # 确保我们有一个有效的参数实例 final_params = parameters or PromptParameters(**kwargs) - # 动态注入插件内容 + # 如果提供了名称,就尝试为它注入插件内容 if name: components_prefix = await prompt_component_manager.execute_components_for( injection_point=name, params=final_params ) if components_prefix: logger.debug(f"为'{name}'注入插件内容: \n{components_prefix}") + # 将注入内容拼接到原始模板的前面 template = f"{components_prefix}\n\n{template}" - # 使用可能已修改的模板创建实例 + # 使用可能已被修改的模板来创建最终的Prompt实例 prompt = create_prompt(template, name, final_params) - # 如果在特定上下文中,则异步注册 + # 如果当前处于一个临时上下文中,则将这个新创建的prompt异步注册到该上下文中 if global_prompt_manager._context._current_context: await global_prompt_manager._context.register_async(prompt)