import traceback import time import asyncio import random import re from typing import List, Optional, Dict, Any, Tuple from datetime import datetime from src.mais4u.mai_think import mai_thinking_manager from src.common.logger import get_logger from src.config.config import global_config, model_config from src.individuality.individuality import get_individuality from src.llm_models.utils_model import LLMRequest from src.chat.message_receive.message import UserInfo, Seg, MessageRecv, MessageSending from src.chat.message_receive.chat_stream import ChatStream from src.chat.message_receive.uni_message_sender import HeartFCSender from src.chat.utils.timer_calculator import Timer # <--- Import Timer from src.chat.utils.utils import get_chat_type_and_target_info from src.chat.utils.prompt_builder import Prompt, global_prompt_manager from src.chat.utils.chat_message_builder import ( build_readable_messages, get_raw_msg_before_timestamp_with_chat, replace_user_references_sync, ) from src.chat.express.expression_selector import expression_selector from src.chat.memory_system.memory_activator import MemoryActivator from src.chat.memory_system.instant_memory import InstantMemory from src.mood.mood_manager import mood_manager from src.person_info.person_info import Person, is_person_known from src.plugin_system.base.component_types import ActionInfo, EventType from src.plugin_system.apis import llm_api from src.common.schedule_manager import schedule_manager logger = get_logger("replyer") def init_prompt(): Prompt("你正在qq群里聊天,下面是群里在聊的内容:", "chat_target_group1") Prompt("你正在和{sender_name}聊天,这是你们之前聊的内容:", "chat_target_private1") Prompt("在群里聊天", "chat_target_group2") Prompt("和{sender_name}聊天", "chat_target_private2") Prompt( """ {expression_habits_block} {relation_info_block} {chat_target} {time_block} {chat_info} {identity} 你正在{chat_target_2},{reply_target_block} 对这句话,你想表达,原句:{raw_reply},原因是:{reason}。你现在要思考怎么组织回复 你现在的心情是:{mood_state} 你需要使用合适的语法和句法,参考聊天内容,组织一条日常且口语化的回复。请你修改你想表达的原句,符合你的表达风格和语言习惯 {reply_style},你可以完全重组回复,保留最基本的表达含义就好,但重组后保持语意通顺。 {keywords_reaction_prompt} {moderation_prompt} 不要输出多余内容(包括前后缀,冒号和引号,括号,表情包,at或 @等 ),只输出一条回复就好。 现在,你说: """, "default_expressor_prompt", ) # s4u 风格的 prompt 模板 Prompt( """ 你正在一个QQ群里聊天,你需要理解整个群的聊天动态和话题走向,并做出自然的回应。 **重要:消息针对性判断** 在回应之前,首先分析消息的针对性: 1. **直接针对你**:@你、回复你、明确询问你 → 必须回应 2. **间接相关**:涉及你感兴趣的话题但未直接问你 → 谨慎参与 3. **他人对话**:与你无关的私人交流 → 通常不参与 4. **重复内容**:他人已充分回答的问题 → 避免重复 {expression_habits_block} {tool_info_block} {knowledge_prompt} {memory_block} {relation_info_block} {extra_info_block} {identity} {action_descriptions} {time_block} 你现在的主要任务是和 {sender_name} 聊天。同时,也有其他用户会参与聊天,你可以参考他们的回复内容,但是你现在想回复{sender_name}的发言。 {background_dialogue_prompt} {core_dialogue_prompt} {reply_target_block} {schedule_block} 你现在的心情是:{mood_state} {reply_style} 注意不要复读你说过的话 {keywords_reaction_prompt} 请注意不要输出多余内容(包括前后缀,冒号和引号,at或 @等 )。只输出回复内容。 {moderation_prompt} 你的核心任务是针对 {reply_target_block} 中提到的内容,生成一段紧密相关且能推动对话的回复。你的回复应该: 1. 明确回应目标消息,而不是宽泛地评论。 2. 可以分享你的看法、提出相关问题,或者开个合适的玩笑。 3. 目的是让对话更有趣、更深入。 4. 不要浮夸,不要夸张修辞,不要输出多余内容(包括前后缀,冒号和引号,括号(),表情包,at或 @等 )。 最终请输出一条简短、完整且口语化的回复。 现在,你说: """, "replyer_self_prompt", ) Prompt( """ 你是一个专门获取知识的助手。你的名字是{bot_name}。现在是{time_now}。 群里正在进行的聊天内容: {chat_history} 现在,{sender}发送了内容:{target_message},你想要回复ta。 请仔细分析聊天内容,考虑以下几点: 1. 内容中是否包含需要查询信息的问题 2. 是否有明确的知识获取指令 If you need to use the search tool, please directly call the function "lpmm_search_knowledge". If you do not need to use any tool, simply output "No tool needed". """, name="lpmm_get_knowledge_prompt", ) # normal 版 prompt 模板(0.9之前的简化模式) logger.debug("[Prompt模式调试] 正在注册normal_style_prompt模板") Prompt( """ 你正在一个QQ群里聊天,你需要理解整个群的聊天动态和话题走向,并做出自然的回应。 **重要:消息针对性判断** 在回应之前,首先分析消息的针对性: 1. **直接针对你**:@你、回复你、明确询问你 → 必须回应 2. **间接相关**:涉及你感兴趣的话题但未直接问你 → 谨慎参与 3. **他人对话**:与你无关的私人交流 → 通常不参与 4. **重复内容**:他人已充分回答的问题 → 避免重复 {expression_habits_block} {tool_info_block} {knowledge_prompt} {memory_block} {relation_info_block} {extra_info_block} {identity} 如果有人说你是人机,你可以用一种阴阳怪气的口吻来回应 {schedule_block} {action_descriptions} 下面是群里最近的聊天内容: -------------------------------- {time_block} {chat_info} -------------------------------- {reply_target_block} 你现在的心情是:{mood_state} {config_expression_style} 注意不要复读你说过的话 {keywords_reaction_prompt} 请注意不要输出多余内容(包括前后缀,冒号和引号,at或 @等 )。只输出回复内容。 {moderation_prompt} 你的核心任务是针对 {reply_target_block} 中提到的内容,生成一段紧密相关且能推动对话的回复。你的回复应该: 1. 明确回应目标消息,而不是宽泛地评论。 2. 可以分享你的看法、提出相关问题,或者开个合适的玩笑。 3. 目的是让对话更有趣、更深入。 最终请输出一条简短、完整且口语化的回复。 现在,你说: """, "normal_style_prompt", ) logger.debug("[Prompt模式调试] normal_style_prompt模板注册完成") class DefaultReplyer: def __init__( self, chat_stream: ChatStream, request_type: str = "replyer", ): self.express_model = LLMRequest(model_set=model_config.model_task_config.replyer, request_type=request_type) self.chat_stream = chat_stream self.is_group_chat, self.chat_target_info = get_chat_type_and_target_info(self.chat_stream.stream_id) self.heart_fc_sender = HeartFCSender() self.memory_activator = MemoryActivator() self.instant_memory = InstantMemory(chat_id=self.chat_stream.stream_id) from src.plugin_system.core.tool_use import ToolExecutor # 延迟导入ToolExecutor,不然会循环依赖 self.tool_executor = ToolExecutor(chat_id=self.chat_stream.stream_id, enable_cache=True, cache_ttl=3) async def generate_reply_with_context( self, extra_info: str = "", reply_reason: str = "", available_actions: Optional[Dict[str, ActionInfo]] = None, choosen_actions: Optional[List[Dict[str, Any]]] = None, enable_tool: bool = True, from_plugin: bool = True, stream_id: Optional[str] = None, reply_message: Optional[Dict[str, Any]] = None, ) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str], List[Dict[str, Any]]]: # sourcery skip: merge-nested-ifs """ 回复器 (Replier): 负责生成回复文本的核心逻辑。 Args: reply_to: 回复对象,格式为 "发送者:消息内容" extra_info: 额外信息,用于补充上下文 reply_reason: 回复原因 available_actions: 可用的动作信息字典 choosen_actions: 已选动作 enable_tool: 是否启用工具调用 from_plugin: 是否来自插件 Returns: Tuple[bool, Optional[Dict[str, Any]], Optional[str]]: (是否成功, 生成的回复, 使用的prompt) """ prompt = None selected_expressions = None if available_actions is None: available_actions = {} try: # 3. 构建 Prompt with Timer("构建Prompt", {}): # 内部计时器,可选保留 prompt,selected_expressions = await self.build_prompt_reply_context( extra_info=extra_info, available_actions=available_actions, choosen_actions=choosen_actions, enable_tool=enable_tool, reply_message=reply_message, reply_reason=reply_reason, ) if not prompt: logger.warning("构建prompt失败,跳过回复生成") return False, None, None, [] from src.plugin_system.core.events_manager import events_manager if not from_plugin: if not await events_manager.handle_mai_events( EventType.POST_LLM, None, prompt, None, stream_id=stream_id ): raise UserWarning("插件于请求前中断了内容生成") # 4. 调用 LLM 生成回复 content = None reasoning_content = None model_name = "unknown_model" try: content, reasoning_content, model_name, tool_call = await self.llm_generate_content(prompt) logger.debug(f"replyer生成内容: {content}") llm_response = { "content": content, "reasoning": reasoning_content, "model": model_name, "tool_calls": tool_call, } if not from_plugin and not await events_manager.handle_mai_events( EventType.AFTER_LLM, None, prompt, llm_response, stream_id=stream_id ): raise UserWarning("插件于请求后取消了内容生成") except UserWarning as e: raise e except Exception as llm_e: # 精简报错信息 logger.error(f"LLM 生成失败: {llm_e}") return False, None, prompt, selected_expressions # LLM 调用失败则无法生成回复 return True, llm_response, prompt, selected_expressions except UserWarning as uw: raise uw except Exception as e: logger.error(f"回复生成意外失败: {e}") traceback.print_exc() return False, None, prompt, selected_expressions async def rewrite_reply_with_context( self, raw_reply: str = "", reason: str = "", reply_to: str = "", return_prompt: bool = False, ) -> Tuple[bool, Optional[str], Optional[str]]: """ 表达器 (Expressor): 负责重写和优化回复文本。 Args: raw_reply: 原始回复内容 reason: 回复原因 reply_to: 回复对象,格式为 "发送者:消息内容" relation_info: 关系信息 Returns: Tuple[bool, Optional[str]]: (是否成功, 重写后的回复内容) """ try: with Timer("构建Prompt", {}): # 内部计时器,可选保留 prompt = await self.build_prompt_rewrite_context( raw_reply=raw_reply, reason=reason, reply_to=reply_to, ) content = None reasoning_content = None model_name = "unknown_model" if not prompt: logger.error("Prompt 构建失败,无法生成回复。") return False, None, None try: content, reasoning_content, model_name, _ = await self.llm_generate_content(prompt) logger.info(f"想要表达:{raw_reply}||理由:{reason}||生成回复: {content}\n") except Exception as llm_e: # 精简报错信息 logger.error(f"LLM 生成失败: {llm_e}") return False, None, prompt if return_prompt else None # LLM 调用失败则无法生成回复 return True, content, prompt if return_prompt else None except Exception as e: logger.error(f"回复生成意外失败: {e}") traceback.print_exc() return False, None, prompt if return_prompt else None async def build_relation_info(self, sender: str, target: str): if not global_config.relationship.enable_relationship: return "" # 获取用户ID person = Person(person_name = sender) if not is_person_known(person_name=sender): logger.warning(f"未找到用户 {sender} 的ID,跳过信息提取") return f"你完全不认识{sender},不理解ta的相关信息。" return person.build_relationship(points_num=5) async def build_expression_habits(self, chat_history: str, target: str) -> Tuple[str, List[int]]: """构建表达习惯块 Args: chat_history: 聊天历史记录 target: 目标消息内容 Returns: str: 表达习惯信息字符串 """ # 检查是否允许在此聊天流中使用表达 use_expression, _, _ = global_config.expression.get_expression_config_for_chat(self.chat_stream.stream_id) if not use_expression: return "", [] style_habits = [] # 使用从处理器传来的选中表达方式 # LLM模式:调用LLM选择5-10个,然后随机选5个 selected_expressions, selected_ids = await expression_selector.select_suitable_expressions_llm( self.chat_stream.stream_id, chat_history, max_num=8, target_message=target ) if selected_expressions: logger.debug(f"使用处理器选中的{len(selected_expressions)}个表达方式") for expr in selected_expressions: if isinstance(expr, dict) and "situation" in expr and "style" in expr: style_habits.append(f"当{expr['situation']}时,使用 {expr['style']}") else: logger.debug("没有从处理器获得表达方式,将使用空的表达方式") # 不再在replyer中进行随机选择,全部交给处理器处理 style_habits_str = "\n".join(style_habits) # 动态构建expression habits块 expression_habits_block = "" expression_habits_title = "" if style_habits_str.strip(): expression_habits_title = ( "你可以参考以下的语言习惯,当情景合适就使用,但不要生硬使用,以合理的方式结合到你的回复中:" ) expression_habits_block += f"{style_habits_str}\n" return f"{expression_habits_title}\n{expression_habits_block}", selected_ids async def build_memory_block(self, chat_history: List[Dict[str, Any]], target: str) -> str: """构建记忆块 Args: chat_history: 聊天历史记录 target: 目标消息内容 Returns: str: 记忆信息字符串 """ chat_talking_prompt_short = build_readable_messages( chat_history, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0, show_actions=True, ) if not global_config.memory.enable_memory: return "" instant_memory = None running_memories = await self.memory_activator.activate_memory_with_chat_history( target_message=target, chat_history_prompt=chat_history ) if global_config.memory.enable_instant_memory: asyncio.create_task(self.instant_memory.create_and_store_memory(chat_history)) instant_memory = await self.instant_memory.get_memory(target) logger.info(f"即时记忆:{instant_memory}") if not running_memories: return "" memory_str = "以下是当前在聊天中,你回忆起的记忆:\n" for running_memory in running_memories: keywords,content = running_memory memory_str += f"- {keywords}:{content}\n" if instant_memory: memory_str += f"- {instant_memory}\n" return memory_str async def build_tool_info(self, chat_history: str, sender: str, target: str, enable_tool: bool = True) -> str: """构建工具信息块 Args: chat_history: 聊天历史记录 reply_to: 回复对象,格式为 "发送者:消息内容" enable_tool: 是否启用工具调用 Returns: str: 工具信息字符串 """ if not enable_tool: return "" try: # 使用工具执行器获取信息 tool_results, _, _ = await self.tool_executor.execute_from_chat_message( sender=sender, target_message=target, chat_history=chat_history, return_details=False ) if tool_results: tool_info_str = "以下是你通过工具获取到的实时信息:\n" for tool_result in tool_results: tool_name = tool_result.get("tool_name", "unknown") content = tool_result.get("content", "") result_type = tool_result.get("type", "tool_result") tool_info_str += f"- 【{tool_name}】{result_type}: {content}\n" tool_info_str += "以上是你获取到的实时信息,请在回复时参考这些信息。" logger.info(f"获取到 {len(tool_results)} 个工具结果") return tool_info_str else: logger.debug("未获取到任何工具结果") return "" except Exception as e: logger.error(f"工具信息获取失败: {e}") return "" def _parse_reply_target(self, target_message: str) -> Tuple[str, str]: """解析回复目标消息 Args: target_message: 目标消息,格式为 "发送者:消息内容" 或 "发送者:消息内容" Returns: Tuple[str, str]: (发送者名称, 消息内容) """ sender = "" target = "" # 添加None检查,防止NoneType错误 if target_message is None: return sender, target if ":" in target_message or ":" in target_message: # 使用正则表达式匹配中文或英文冒号 parts = re.split(pattern=r"[::]", string=target_message, maxsplit=1) if len(parts) == 2: sender = parts[0].strip() target = parts[1].strip() return sender, target async def build_keywords_reaction_prompt(self, target: Optional[str]) -> str: """构建关键词反应提示 Args: target: 目标消息内容 Returns: str: 关键词反应提示字符串 """ # 关键词检测与反应 keywords_reaction_prompt = "" try: # 添加None检查,防止NoneType错误 if target is None: return keywords_reaction_prompt # 处理关键词规则 for rule in global_config.keyword_reaction.keyword_rules: if any(keyword in target for keyword in rule.keywords): logger.info(f"检测到关键词规则:{rule.keywords},触发反应:{rule.reaction}") keywords_reaction_prompt += f"{rule.reaction}," # 处理正则表达式规则 for rule in global_config.keyword_reaction.regex_rules: for pattern_str in rule.regex: try: pattern = re.compile(pattern_str) if result := pattern.search(target): reaction = rule.reaction for name, content in result.groupdict().items(): reaction = reaction.replace(f"[{name}]", content) logger.info(f"匹配到正则表达式:{pattern_str},触发反应:{reaction}") keywords_reaction_prompt += f"{reaction}," break except re.error as e: logger.error(f"正则表达式编译错误: {pattern_str}, 错误信息: {str(e)}") continue except Exception as e: logger.error(f"关键词检测与反应时发生异常: {str(e)}", exc_info=True) return keywords_reaction_prompt async def _time_and_run_task(self, coroutine, name: str) -> Tuple[str, Any, float]: """计时并运行异步任务的辅助函数 Args: coroutine: 要执行的协程 name: 任务名称 Returns: Tuple[str, Any, float]: (任务名称, 任务结果, 执行耗时) """ start_time = time.time() result = await coroutine end_time = time.time() duration = end_time - start_time return name, result, duration def build_s4u_chat_history_prompts( self, message_list_before_now: List[Dict[str, Any]], target_user_id: str, sender: str ) -> Tuple[str, str]: """ 构建 s4u 风格的分离对话 prompt Args: message_list_before_now: 历史消息列表 target_user_id: 目标用户ID(当前对话对象) Returns: Tuple[str, str]: (核心对话prompt, 背景对话prompt) """ core_dialogue_list = [] bot_id = str(global_config.bot.qq_account) # 过滤消息:分离bot和目标用户的对话 vs 其他用户的对话 for msg_dict in message_list_before_now: try: msg_user_id = str(msg_dict.get("user_id")) reply_to = msg_dict.get("reply_to", "") _platform, reply_to_user_id = self._parse_reply_target(reply_to) if (msg_user_id == bot_id and reply_to_user_id == target_user_id) or msg_user_id == target_user_id: # bot 和目标用户的对话 core_dialogue_list.append(msg_dict) except Exception as e: logger.error(f"处理消息记录时出错: {msg_dict}, 错误: {e}") # 构建背景对话 prompt all_dialogue_prompt = "" if message_list_before_now: latest_25_msgs = message_list_before_now[-int(global_config.chat.max_context_size) :] all_dialogue_prompt_str = build_readable_messages( latest_25_msgs, replace_bot_name=True, timestamp_mode="normal_no_YMD", truncate=True, ) all_dialogue_prompt = f"所有用户的发言:\n{all_dialogue_prompt_str}" # 构建核心对话 prompt core_dialogue_prompt = "" if core_dialogue_list: # 检查最新五条消息中是否包含bot自己说的消息 latest_5_messages = core_dialogue_list[-5:] if len(core_dialogue_list) >= 5 else core_dialogue_list has_bot_message = any(str(msg.get("user_id")) == bot_id for msg in latest_5_messages) # logger.info(f"最新五条消息:{latest_5_messages}") # logger.info(f"最新五条消息中是否包含bot自己说的消息:{has_bot_message}") # 如果最新五条消息中不包含bot的消息,则返回空字符串 if not has_bot_message: core_dialogue_prompt = "" else: core_dialogue_list = core_dialogue_list[-int(global_config.chat.max_context_size * 0.6) :] # 限制消息数量 core_dialogue_prompt_str = build_readable_messages( core_dialogue_list, replace_bot_name=True, merge_messages=False, timestamp_mode="normal_no_YMD", read_mark=0.0, truncate=True, show_actions=True, ) core_dialogue_prompt = f"""-------------------------------- 这是你和{sender}的对话,你们正在交流中: {core_dialogue_prompt_str} -------------------------------- """ return core_dialogue_prompt, all_dialogue_prompt def build_mai_think_context( self, chat_id: str, memory_block: str, relation_info: str, time_block: str, chat_target_1: str, chat_target_2: str, mood_prompt: str, identity_block: str, sender: str, target: str, chat_info: str, ) -> Any: """构建 mai_think 上下文信息 Args: chat_id: 聊天ID memory_block: 记忆块内容 relation_info: 关系信息 time_block: 时间块内容 chat_target_1: 聊天目标1 chat_target_2: 聊天目标2 mood_prompt: 情绪提示 identity_block: 身份块内容 sender: 发送者名称 target: 目标消息内容 chat_info: 聊天信息 Returns: Any: mai_think 实例 """ mai_think = mai_thinking_manager.get_mai_think(chat_id) mai_think.memory_block = memory_block mai_think.relation_info_block = relation_info mai_think.time_block = time_block mai_think.chat_target = chat_target_1 mai_think.chat_target_2 = chat_target_2 mai_think.chat_info = chat_info mai_think.mood_state = mood_prompt mai_think.identity = identity_block mai_think.sender = sender mai_think.target = target return mai_think async def build_actions_prompt(self, available_actions, choosen_actions: Optional[List[Dict[str, Any]]] = None) -> str: """构建动作提示 """ action_descriptions = "" if available_actions: action_descriptions = "你可以做以下这些动作:\n" for action_name, action_info in available_actions.items(): action_description = action_info.description action_descriptions += f"- {action_name}: {action_description}\n" action_descriptions += "\n" choosen_action_descriptions = "" if choosen_actions: for action in choosen_actions: action_name = action.get('action_type', 'unknown_action') if action_name =="reply": continue action_description = action.get('reason', '无描述') reasoning = action.get('reasoning', '无原因') choosen_action_descriptions += f"- {action_name}: {action_description},原因:{reasoning}\n" if choosen_action_descriptions: action_descriptions += "根据聊天情况,你决定在回复的同时做以下这些动作:\n" action_descriptions += choosen_action_descriptions return action_descriptions async def build_prompt_reply_context( self, extra_info: str = "", reply_reason: str = "", available_actions: Optional[Dict[str, ActionInfo]] = None, choosen_actions: Optional[List[Dict[str, Any]]] = None, enable_tool: bool = True, reply_message: Optional[Dict[str, Any]] = None, ) -> Tuple[str, List[int]]: """ 构建回复器上下文 Args: extra_info: 额外信息,用于补充上下文 reply_reason: 回复原因 available_actions: 可用动作 choosen_actions: 已选动作 enable_timeout: 是否启用超时处理 enable_tool: 是否启用工具调用 reply_message: 回复的原始消息 Returns: str: 构建好的上下文 """ if available_actions is None: available_actions = {} chat_stream = self.chat_stream chat_id = chat_stream.stream_id is_group_chat = bool(chat_stream.group_info) platform = chat_stream.platform if reply_message: user_id = reply_message.get("user_id","") person = Person(platform=platform, user_id=user_id) person_name = person.person_name or user_id sender = person_name target = reply_message.get('processed_plain_text') else: person_name = "用户" sender = "用户" target = "消息" if global_config.mood.enable_mood: chat_mood = mood_manager.get_mood_by_chat_id(chat_id) mood_prompt = chat_mood.mood_state else: mood_prompt = "" target = replace_user_references_sync(target, chat_stream.platform, replace_bot_name=True) message_list_before_now_long = get_raw_msg_before_timestamp_with_chat( chat_id=chat_id, timestamp=time.time(), limit=global_config.chat.max_context_size * 1, ) message_list_before_short = get_raw_msg_before_timestamp_with_chat( chat_id=chat_id, timestamp=time.time(), limit=int(global_config.chat.max_context_size * 0.33), ) chat_talking_prompt_short = build_readable_messages( message_list_before_short, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0, show_actions=True, ) # 并行执行五个构建任务 task_results = await asyncio.gather( self._time_and_run_task( self.build_expression_habits(chat_talking_prompt_short, target), "expression_habits" ), self._time_and_run_task(self.build_relation_info(sender, target), "relation_info"), self._time_and_run_task(self.build_memory_block(message_list_before_short, target), "memory_block"), self._time_and_run_task( self.build_tool_info(chat_talking_prompt_short, sender, target, enable_tool=enable_tool), "tool_info" ), self._time_and_run_task(self.get_prompt_info(chat_talking_prompt_short, sender, target), "prompt_info"), self._time_and_run_task(self.build_actions_prompt(available_actions,choosen_actions), "actions_info"), ) # 任务名称中英文映射 task_name_mapping = { "expression_habits": "选取表达方式", "relation_info": "感受关系", "memory_block": "回忆", "tool_info": "使用工具", "prompt_info": "获取知识", "actions_info": "动作信息", } # 处理结果 timing_logs = [] results_dict = {} for name, result, duration in task_results: results_dict[name] = result chinese_name = task_name_mapping.get(name, name) timing_logs.append(f"{chinese_name}: {duration:.1f}s") if duration > 8: logger.warning(f"回复生成前信息获取耗时过长: {chinese_name} 耗时: {duration:.1f}s,请使用更快的模型") logger.info(f"在回复前的步骤耗时: {'; '.join(timing_logs)}") expression_habits_block, selected_expressions = results_dict["expression_habits"] relation_info = results_dict["relation_info"] memory_block = results_dict["memory_block"] tool_info = results_dict["tool_info"] prompt_info = results_dict["prompt_info"] # 直接使用格式化后的结果 actions_info = results_dict["actions_info"] keywords_reaction_prompt = await self.build_keywords_reaction_prompt(target) if extra_info: extra_info_block = f"以下是你在回复时需要参考的信息,现在请你阅读以下内容,进行决策\n{extra_info}\n以上是你在回复时需要参考的信息,现在请你阅读以下内容,进行决策" else: extra_info_block = "" time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" identity_block = await get_individuality().get_personality_block() schedule_block = "" if global_config.schedule.enable: current_activity = schedule_manager.get_current_activity() if current_activity: schedule_block = f"你当前正在:{current_activity}。" moderation_prompt_block = ( "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。" ) if sender: if is_group_chat: reply_target_block = ( f"现在{sender}说的:{target}。引起了你的注意,你想要在群里发言或者回复这条消息。原因是{reply_reason}" ) else: # private chat reply_target_block = f"现在{sender}说的:{target}。引起了你的注意,针对这条消息回复。原因是{reply_reason}" else: reply_target_block = "" # if is_group_chat: # chat_target_1 = await global_prompt_manager.get_prompt_async("chat_target_group1") # chat_target_2 = await global_prompt_manager.get_prompt_async("chat_target_group2") # else: # chat_target_name = "对方" # if self.chat_target_info: # chat_target_name = ( # self.chat_target_info.get("person_name") or self.chat_target_info.get("user_nickname") or "对方" # ) # chat_target_1 = await global_prompt_manager.format_prompt( # "chat_target_private1", sender_name=chat_target_name # ) # chat_target_2 = await global_prompt_manager.format_prompt( # "chat_target_private2", sender_name=chat_target_name # ) # 构建分离的对话 prompt core_dialogue_prompt, background_dialogue_prompt = self.build_s4u_chat_history_prompts( message_list_before_now_long, user_id, sender ) # 根据配置选择模板 current_prompt_mode = global_config.personality.prompt_mode logger.debug(f"[Prompt模式调试] 当前配置的prompt_mode: {current_prompt_mode}") if current_prompt_mode == "normal": template_name = "normal_style_prompt" logger.debug(f"[Prompt模式调试] 选择使用normal模式模板: {template_name}") # normal模式使用统一的聊天历史,不分离核心对话和背景对话 config_expression_style = global_config.personality.reply_style # 获取统一的聊天历史(不分离) unified_message_list = get_raw_msg_before_timestamp_with_chat( chat_id=self.chat_stream.stream_id, timestamp=time.time(), limit=int(global_config.chat.max_context_size * 1.5), ) unified_chat_history = build_readable_messages( unified_message_list, replace_bot_name=True, merge_messages=False, timestamp_mode="normal_no_YMD", read_mark=0.0, truncate=True, show_actions=True, ) # 为normal模式构建简化的chat_info(不包含时间,因为time_block单独传递) chat_info = f"""群里的聊天内容: {unified_chat_history}""" logger.debug("[Prompt模式调试] normal模式使用统一聊天历史,不分离对话") logger.debug("[Prompt模式调试] normal模式参数准备完成,开始调用format_prompt") logger.debug(f"[Prompt模式调试] normal模式传递的参数: template_name={template_name}") logger.debug("[Prompt模式调试] 检查global_prompt_manager是否有该模板...") # 检查模板是否存在 try: test_prompt = await global_prompt_manager.get_prompt_async(template_name) logger.debug(f"[Prompt模式调试] 找到模板 {template_name}, 内容预览: {test_prompt[:100]}...") except Exception as e: logger.error(f"[Prompt模式调试] 模板 {template_name} 不存在或获取失败: {e}") result = await global_prompt_manager.format_prompt( template_name, expression_habits_block=expression_habits_block, tool_info_block=tool_info, knowledge_prompt=prompt_info, memory_block=memory_block, relation_info_block=relation_info, extra_info_block=extra_info_block, identity=identity_block, schedule_block=schedule_block, action_descriptions=action_descriptions, time_block=time_block, # 保留time_block参数 chat_info=chat_info, reply_target_block=reply_target_block, mood_state=mood_prompt, config_expression_style=config_expression_style, keywords_reaction_prompt=keywords_reaction_prompt, moderation_prompt=moderation_prompt_block, ) return result else: # 使用 s4u 风格的模板 template_name = "s4u_style_prompt" logger.debug(f"[Prompt模式调试] 选择使用s4u模式模板: {template_name} (prompt_mode={current_prompt_mode})") logger.debug("[Prompt模式调试] s4u模式参数准备完成,开始调用format_prompt") # 检查s4u模板是否存在 try: test_prompt = await global_prompt_manager.get_prompt_async(template_name) logger.debug(f"[Prompt模式调试] 找到s4u模板 {template_name}, 内容预览: {test_prompt[:100]}...") except Exception as e: #理论上我觉得这玩意没多大可能炸就是了 logger.error(f"[Prompt模式调试] s4u模板 {template_name} 不存在或获取失败: {e}") result = await global_prompt_manager.format_prompt( template_name, expression_habits_block=expression_habits_block, tool_info_block=tool_info, knowledge_prompt=prompt_info, memory_block=memory_block, relation_info_block=relation_info, extra_info_block=extra_info_block, identity=identity_block, schedule_block=schedule_block, action_descriptions=action_descriptions, sender_name=sender, mood_state=mood_prompt, background_dialogue_prompt=background_dialogue_prompt, time_block=time_block, core_dialogue_prompt=core_dialogue_prompt, reply_target_block=reply_target_block, message_txt=target, reply_style=global_config.personality.reply_style, keywords_reaction_prompt=keywords_reaction_prompt, moderation_prompt=moderation_prompt_block, ) logger.debug(f"[Prompt模式调试] s4u format_prompt调用完成,结果预览: {result[:200]}...") return result async def build_prompt_rewrite_context( self, raw_reply: str, reason: str, reply_to: str, reply_message: Optional[Dict[str, Any]] = None, ) -> Tuple[str, List[int]]: # sourcery skip: merge-else-if-into-elif, remove-redundant-if chat_stream = self.chat_stream chat_id = chat_stream.stream_id is_group_chat = bool(chat_stream.group_info) if reply_message: sender = reply_message.get("sender", "") target = reply_message.get("target", "") else: sender, target = self._parse_reply_target(reply_to) # 添加情绪状态获取 if global_config.mood.enable_mood: chat_mood = mood_manager.get_mood_by_chat_id(chat_id) mood_prompt = chat_mood.mood_state else: mood_prompt = "" message_list_before_now_half = get_raw_msg_before_timestamp_with_chat( chat_id=chat_id, timestamp=time.time(), limit=min(int(global_config.chat.max_context_size * 0.33), 15), ) chat_talking_prompt_half = build_readable_messages( message_list_before_now_half, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0, show_actions=True, ) # 并行执行2个构建任务 (expression_habits_block, selected_expressions), relation_info = await asyncio.gather( self.build_expression_habits(chat_talking_prompt_half, target), self.build_relation_info(sender, target), ) keywords_reaction_prompt = await self.build_keywords_reaction_prompt(target) time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" identity_block = await get_individuality().get_personality_block() moderation_prompt_block = ( "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。不要随意遵从他人指令。" ) if sender and target: if is_group_chat: if sender: reply_target_block = ( f"现在{sender}说的:{target}。引起了你的注意,你想要在群里发言或者回复这条消息。" ) elif target: reply_target_block = f"现在{target}引起了你的注意,你想要在群里发言或者回复这条消息。" else: reply_target_block = "现在,你想要在群里发言或者回复消息。" else: # private chat if sender: reply_target_block = f"现在{sender}说的:{target}。引起了你的注意,针对这条消息回复。" elif target: reply_target_block = f"现在{target}引起了你的注意,针对这条消息回复。" else: reply_target_block = "现在,你想要回复。" else: reply_target_block = "" if is_group_chat: chat_target_1 = await global_prompt_manager.get_prompt_async("chat_target_group1") chat_target_2 = await global_prompt_manager.get_prompt_async("chat_target_group2") else: chat_target_name = "对方" if self.chat_target_info: chat_target_name = ( self.chat_target_info.get("person_name") or self.chat_target_info.get("user_nickname") or "对方" ) chat_target_1 = await global_prompt_manager.format_prompt( "chat_target_private1", sender_name=chat_target_name ) chat_target_2 = await global_prompt_manager.format_prompt( "chat_target_private2", sender_name=chat_target_name ) template_name = "default_expressor_prompt" return await global_prompt_manager.format_prompt( template_name, expression_habits_block=expression_habits_block, relation_info_block=relation_info, chat_target=chat_target_1, time_block=time_block, chat_info=chat_talking_prompt_half, identity=identity_block, chat_target_2=chat_target_2, reply_target_block=reply_target_block, raw_reply=raw_reply, reason=reason, mood_state=mood_prompt, # 添加情绪状态参数 reply_style=global_config.personality.reply_style, keywords_reaction_prompt=keywords_reaction_prompt, moderation_prompt=moderation_prompt_block, ) async def _build_single_sending_message( self, message_id: str, message_segment: Seg, reply_to: bool, is_emoji: bool, thinking_start_time: float, display_message: str, anchor_message: Optional[MessageRecv] = None, ) -> MessageSending: """构建单个发送消息""" bot_user_info = UserInfo( user_id=global_config.bot.qq_account, user_nickname=global_config.bot.nickname, platform=self.chat_stream.platform, ) # await anchor_message.process() sender_info = anchor_message.message_info.user_info if anchor_message else None return MessageSending( message_id=message_id, # 使用片段的唯一ID chat_stream=self.chat_stream, bot_user_info=bot_user_info, sender_info=sender_info, message_segment=message_segment, reply=anchor_message, # 回复原始锚点 is_head=reply_to, is_emoji=is_emoji, thinking_start_time=thinking_start_time, # 传递原始思考开始时间 display_message=display_message, ) async def llm_generate_content(self, prompt: str): with Timer("LLM生成", {}): # 内部计时器,可选保留 # 直接使用已初始化的模型实例 logger.info(f"使用模型集生成回复: {self.express_model.model_for_task}") if global_config.debug.show_prompt: logger.info(f"\n{prompt}\n") else: logger.debug(f"\n{prompt}\n") content, (reasoning_content, model_name, tool_calls) = await self.express_model.generate_response_async(prompt) logger.debug(f"replyer生成内容: {content}") return content, reasoning_content, model_name, tool_calls async def get_prompt_info(self, message: str, sender: str, target: str): related_info = "" start_time = time.time() from src.plugins.built_in.knowledge.lpmm_get_knowledge import SearchKnowledgeFromLPMMTool logger.debug(f"获取知识库内容,元消息:{message[:30]}...,消息长度: {len(message)}") # 从LPMM知识库获取知识 try: # 检查LPMM知识库是否启用 if not global_config.lpmm_knowledge.enable: logger.debug("LPMM知识库未启用,跳过获取知识库内容") return "" time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) bot_name = global_config.bot.nickname prompt = await global_prompt_manager.format_prompt( "lpmm_get_knowledge_prompt", bot_name=bot_name, time_now=time_now, chat_history=message, sender=sender, target_message=target, ) _, _, _, _, tool_calls = await llm_api.generate_with_model_with_tools( prompt, model_config=model_config.model_task_config.tool_use, tool_options=[SearchKnowledgeFromLPMMTool.get_tool_definition()], ) if tool_calls: result = await self.tool_executor.execute_tool_call(tool_calls[0], SearchKnowledgeFromLPMMTool()) end_time = time.time() if not result or not result.get("content"): logger.debug("从LPMM知识库获取知识失败,返回空知识...") return "" found_knowledge_from_lpmm = result.get("content", "") logger.debug( f"从LPMM知识库获取知识,相关信息:{found_knowledge_from_lpmm[:100]}...,信息长度: {len(found_knowledge_from_lpmm)}" ) related_info += found_knowledge_from_lpmm logger.debug(f"获取知识库内容耗时: {(end_time - start_time):.3f}秒") logger.debug(f"获取知识库内容,相关信息:{related_info[:100]}...,信息长度: {len(related_info)}") return f"你有以下这些**知识**:\n{related_info}\n请你**记住上面的知识**,之后可能会用到。\n" else: logger.debug("从LPMM知识库获取知识失败,可能是从未导入过知识,返回空知识...") return "" except Exception as e: logger.error(f"获取知识库内容时发生异常: {str(e)}") return "" def weighted_sample_no_replacement(items, weights, k) -> list: """ 加权且不放回地随机抽取k个元素。 参数: items: 待抽取的元素列表 weights: 每个元素对应的权重(与items等长,且为正数) k: 需要抽取的元素个数 返回: selected: 按权重加权且不重复抽取的k个元素组成的列表 如果 items 中的元素不足 k 个,就只会返回所有可用的元素 实现思路: 每次从当前池中按权重加权随机选出一个元素,选中后将其从池中移除,重复k次。 这样保证了: 1. count越大被选中概率越高 2. 不会重复选中同一个元素 """ selected = [] pool = list(zip(items, weights, strict=False)) for _ in range(min(k, len(pool))): total = sum(w for _, w in pool) r = random.uniform(0, total) upto = 0 for idx, (item, weight) in enumerate(pool): upto += weight if upto >= r: selected.append(item) pool.pop(idx) break return selected init_prompt()