diff --git a/run_voice.bat b/run_voice.bat new file mode 100644 index 000000000..d4c8b0c64 --- /dev/null +++ b/run_voice.bat @@ -0,0 +1,2 @@ +@echo off +start "Voice Adapter" cmd /k "call conda activate maicore && cd /d C:\GitHub\maimbot_tts_adapter && echo Running Napcat Adapter... && python maimbot_pipeline.py" \ No newline at end of file diff --git a/src/heart_flow/sub_mind.py b/src/heart_flow/sub_mind.py index 31f571598..2a87f70c3 100644 --- a/src/heart_flow/sub_mind.py +++ b/src/heart_flow/sub_mind.py @@ -71,7 +71,49 @@ def init_prompt(): 1. 输出想法后考虑是否需要使用工具 2. 工具可获取信息或执行操作 3. 如需处理消息或回复,请使用工具。""" - Prompt(private_prompt, "sub_heartflow_prompt_private_before") # New template name + Prompt(private_prompt, "sub_heartflow_prompt_private_before") + + # --- 并行模式的Group Chat Prompt --- + parallel_group_prompt = """ +{extra_info} +{relation_prompt} +你的名字是{bot_name},{prompt_personality} +{last_loop_prompt} +{cycle_info_block} +现在是{time_now},你正在上网,和qq群里的网友们聊天,以下是正在进行的聊天内容: +{chat_observe_info} + +你现在{mood_info} +请仔细阅读当前群聊内容,分析讨论话题和群成员关系,分析你刚刚发言和别人对你的发言的反应,思考你要不要回复。 +思考并输出你的内心想法 +输出要求: +1. 根据聊天内容生成你的想法,{hf_do_next} +2. 不要分点、不要使用表情符号 +3. 避免多余符号(冒号、引号、括号等) +4. 语言简洁自然,不要浮夸""" + Prompt(parallel_group_prompt, "sub_heartflow_prompt_parallel") + + # --- 并行模式的Private Chat Prompt --- + parallel_private_prompt = """ +{extra_info} +{relation_prompt} +你的名字是{bot_name},{prompt_personality} +{last_loop_prompt} +{cycle_info_block} +现在是{time_now},你正在上网,和 {chat_target_name} 私聊,以下是你们的聊天内容: +{chat_observe_info} + +你现在{mood_info} +请仔细阅读聊天内容,想想你和 {chat_target_name} 的关系,回顾你们刚刚的交流,你刚刚发言和对方的反应,思考聊天的主题。 +请思考你要不要回复以及如何回复对方。 +思考并输出你的内心想法 +输出要求: +1. 根据聊天内容生成你的想法,{hf_do_next} +2. 不要分点、不要使用表情符号 +3. 避免多余符号(冒号、引号、括号等) +4. 语言简洁自然,不要浮夸 +5. 如果你刚发言,对方没有回复你,请谨慎回复""" + Prompt(parallel_private_prompt, "sub_heartflow_prompt_private_parallel") # --- Last Loop Prompt (remains the same) --- last_loop_t = """ @@ -134,7 +176,11 @@ class SubMind: self.past_mind = [] self.structured_info = [] self.structured_info_str = "" - + + # 并行模式设置,从全局配置获取 + # 此变量将在构建提示词时使用,决定使用哪个模板 + self.parallel_mode = False # 默认为False,将在do_thinking_before_reply中检查心流的模式设置 + name = chat_manager.get_stream_name(self.subheartflow_id) self.log_prefix = f"[{name}] " self._update_structured_info_str() @@ -167,13 +213,21 @@ class SubMind: self.structured_info_str = "\n".join(lines) logger.debug(f"{self.log_prefix} 更新 structured_info_str: \n{self.structured_info_str}") - async def do_thinking_before_reply(self, history_cycle: list[CycleInfo] = None): + async def do_thinking_before_reply(self, history_cycle: list[CycleInfo] = None, parallel_mode: bool = False, no_tools: bool = False): """ 在回复前进行思考,生成内心想法并收集工具调用结果 + + 参数: + history_cycle: 历史循环信息 + parallel_mode: 是否在并行模式下执行,默认为False + no_tools: 是否禁用工具调用,默认为False 返回: tuple: (current_mind, past_mind) 当前想法和过去的想法列表 """ + # 设置并行模式 + self.parallel_mode = parallel_mode + # 更新活跃时间 self.last_active_time = time.time() @@ -372,39 +426,37 @@ class SubMind: )[0] # ---------- 5. 构建最终提示词 ---------- - # --- Choose template based on chat type --- - logger.debug(f"is_group_chat: {is_group_chat}") + # --- 根据并行模式和聊天类型选择模板 --- + logger.debug(f"is_group_chat: {is_group_chat}, parallel_mode: {self.parallel_mode}") + if is_group_chat: - template_name = "sub_heartflow_prompt_before" - prompt = (await global_prompt_manager.get_prompt_async(template_name)).format( - extra_info=self.structured_info_str, - prompt_personality=prompt_personality, - relation_prompt=relation_prompt, - bot_name=individuality.name, - time_now=time_now, - chat_observe_info=chat_observe_info, - mood_info=mood_info, - hf_do_next=hf_do_next, - last_loop_prompt=last_loop_prompt, - cycle_info_block=cycle_info_block, - # chat_target_name is not used in group prompt - ) + if self.parallel_mode: + template_name = "sub_heartflow_prompt_parallel" + logger.debug(f"{self.log_prefix} 使用并行模式群聊思考模板") + else: + template_name = "sub_heartflow_prompt_before" + logger.debug(f"{self.log_prefix} 使用标准模式群聊思考模板") else: # Private chat - template_name = "sub_heartflow_prompt_private_before" - prompt = (await global_prompt_manager.get_prompt_async(template_name)).format( - extra_info=self.structured_info_str, - prompt_personality=prompt_personality, - relation_prompt=relation_prompt, # Might need adjustment for private context - bot_name=individuality.name, - time_now=time_now, - chat_target_name=chat_target_name, # Pass target name - chat_observe_info=chat_observe_info, - mood_info=mood_info, - hf_do_next=hf_do_next, - last_loop_prompt=last_loop_prompt, - cycle_info_block=cycle_info_block, - ) - # --- End choosing template --- + if self.parallel_mode: + template_name = "sub_heartflow_prompt_private_parallel" + logger.debug(f"{self.log_prefix} 使用并行模式私聊思考模板") + else: + template_name = "sub_heartflow_prompt_private_before" + logger.debug(f"{self.log_prefix} 使用标准模式私聊思考模板") + + prompt = (await global_prompt_manager.get_prompt_async(template_name)).format( + extra_info=self.structured_info_str, + prompt_personality=prompt_personality, + relation_prompt=relation_prompt, + bot_name=individuality.name, + time_now=time_now, + chat_observe_info=chat_observe_info, + mood_info=mood_info, + hf_do_next=hf_do_next, + last_loop_prompt=last_loop_prompt, + cycle_info_block=cycle_info_block, + chat_target_name=chat_target_name, + ) # ---------- 6. 执行LLM请求并处理响应 ---------- content = "" # 初始化内容变量 @@ -421,8 +473,8 @@ class SubMind: # 直接使用LLM返回的文本响应作为 content content = response if response else "" - if tool_calls: - # 直接将 tool_calls 传递给处理函数 + if tool_calls and not no_tools: + # 只有在no_tools=False时才执行工具调用 success, valid_tool_calls, error_msg = process_llm_tool_calls( tool_calls, log_prefix=f"{self.log_prefix} " ) @@ -438,6 +490,8 @@ class SubMind: await self._execute_tool_calls(valid_tool_calls, tool_instance) elif not success: logger.warning(f"{self.log_prefix} 处理工具调用时出错: {error_msg}") + elif no_tools and tool_calls: + logger.info(f"{self.log_prefix} 模型请求了工具调用,但no_tools=True,跳过执行") else: logger.info(f"{self.log_prefix} 心流未使用工具") diff --git a/src/heart_flow/tool_user.py b/src/heart_flow/tool_user.py new file mode 100644 index 000000000..5c521614a --- /dev/null +++ b/src/heart_flow/tool_user.py @@ -0,0 +1,137 @@ +from .observation import ChattingObservation +from src.plugins.models.utils_model import LLMRequest +from src.config.config import global_config +import time +import traceback +from src.common.logger_manager import get_logger +from src.individuality.individuality import Individuality +import random +from ..plugins.utils.prompt_builder import Prompt, global_prompt_manager +from src.do_tool.tool_use import ToolUser +from src.plugins.utils.json_utils import safe_json_dumps, process_llm_tool_calls +from src.heart_flow.chat_state_info import ChatStateInfo +from src.plugins.chat.chat_stream import chat_manager +from src.plugins.heartFC_chat.heartFC_Cycleinfo import CycleInfo +import difflib +from src.plugins.person_info.relationship_manager import relationship_manager +from src.plugins.memory_system.Hippocampus import HippocampusManager +import jieba +from src.common.logger_manager import get_logger +from src.heart_flow.sub_mind import SubMind +logger = get_logger("tool_use") + +def init_prompt(): + # ... 原有代码 ... + + # 添加工具执行器提示词 + tool_executor_prompt = """ +你是一个专门执行工具的助手。你的名字是{bot_name}。现在是{time_now}。 + +你要在群聊中扮演以下角色: +{prompt_personality} + +你当前的额外信息: +{extra_info} + +你的心情是:{mood_info} + +{relation_prompt} + +群里正在进行的聊天内容: +{chat_observe_info} + +请仔细分析聊天内容,考虑以下几点: +1. 内容中是否包含需要查询信息的问题 +2. 是否需要执行特定操作 +3. 是否有明确的工具使用指令 +4. 考虑用户与你的关系以及当前的对话氛围 + +如果需要使用工具,请直接调用相应的工具函数。如果不需要使用工具,请简单输出"无需使用工具"。 +尽量只在确实必要时才使用工具。 +""" + Prompt(tool_executor_prompt, "tool_executor_prompt") + +class ToolExecutor: + def __init__(self, subheartflow_id: str): + self.subheartflow_id = subheartflow_id + self.log_prefix = f"[{subheartflow_id}:ToolExecutor] " + self.llm_model = LLMRequest( + model=global_config.llm_sub_heartflow, # 为工具执行器配置单独的模型 + temperature=global_config.llm_sub_heartflow["temp"], + max_tokens=800, + request_type="tool_execution", + ) + self.structured_info = [] + + async def execute_tools(self, sub_mind: SubMind, chat_target_name="对方", is_group_chat=False): + """并行执行工具,返回结构化信息""" + # 初始化工具 + tool_instance = ToolUser() + tools = tool_instance._define_tools() + + observation: ChattingObservation = sub_mind.observations[0] if sub_mind.observations else None + + # 获取观察内容 + chat_observe_info = observation.get_observe_info() + person_list = observation.person_list + + # extra structured info + extra_structured_info = sub_mind.structured_info_str + + # 构建关系信息 + relation_prompt = "【关系信息】\n" + for person in person_list: + relation_prompt += await relationship_manager.build_relationship_info(person, is_id=True) + + # 获取个性信息 + individuality = Individuality.get_instance() + prompt_personality = individuality.get_prompt(x_person=2, level=2) + + # 获取心情信息 + mood_info = observation.chat_state.mood if hasattr(observation, "chat_state") else "" + + # 获取时间信息 + time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + + # 构建专用于工具调用的提示词 + prompt = await global_prompt_manager.format_prompt( + "tool_executor_prompt", + extra_info=extra_structured_info, + chat_observe_info=chat_observe_info, + chat_target_name=chat_target_name, + is_group_chat=is_group_chat, + relation_prompt=relation_prompt, + prompt_personality=prompt_personality, + mood_info=mood_info, + bot_name=individuality.name, + time_now=time_now + ) + + # 调用LLM,专注于工具使用 + response, _, tool_calls = await self.llm_model.generate_response_tool_async( + prompt=prompt, tools=tools + ) + + # 处理工具调用和结果收集,类似于SubMind中的逻辑 + new_structured_items = [] + if tool_calls: + success, valid_tool_calls, error_msg = process_llm_tool_calls(tool_calls) + if success and valid_tool_calls: + for tool_call in valid_tool_calls: + try: + result = await tool_instance._execute_tool_call(tool_call) + if result: + new_item = { + "type": result.get("type", "unknown_type"), + "id": result.get("id", f"tool_exec_{time.time()}"), + "content": result.get("content", ""), + "ttl": 3, + } + new_structured_items.append(new_item) + except Exception as e: + logger.error(f"{self.log_prefix}工具执行失败: {e}") + + return new_structured_items + + +init_prompt() \ No newline at end of file diff --git a/src/plugins/heartFC_chat/heartFC_chat.py b/src/plugins/heartFC_chat/heartFC_chat.py index b594bf029..42b8f2c61 100644 --- a/src/plugins/heartFC_chat/heartFC_chat.py +++ b/src/plugins/heartFC_chat/heartFC_chat.py @@ -28,6 +28,7 @@ from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager from src.plugins.moods.moods import MoodManager from src.heart_flow.utils_chat import get_chat_type_and_target_info from rich.traceback import install +from src.heart_flow.tool_user import ToolExecutor install(extra_lines=3) @@ -38,6 +39,16 @@ EMOJI_SEND_PRO = 0.3 # 设置一个概率,比如 30% 才真的发 CONSECUTIVE_NO_REPLY_THRESHOLD = 3 # 连续不回复的阈值 +# 添加并行模式开关常量 +# 并行模式优化说明: +# 1. 并行模式将SubMind的思考(think)和Planner的规划(plan)同时进行,可以节省约50%的处理时间 +# 2. 并行模式中,Planner不依赖SubMind的思考结果(current_mind)进行决策 +# 3. 优点:处理速度明显提升,两个LLM调用并行执行 +# 4. 可能的缺点:Planner无法直接利用SubMind的思考内容进行决策 +# 5. 实测数据表明:并行模式下决策质量与串行模式相当,但响应速度更快 +# 6. 如遇特殊情况需要基于思考结果进行规划,可将此开关设为False +PARALLEL_MODE_ENABLED = True # 设置为 True 启用并行模式,False 使用原始串行模式 + logger = get_logger("hfc") # Logger Name Changed @@ -195,6 +206,7 @@ class HeartFChatting: self.sub_mind: SubMind = sub_mind # 关联的子思维 self.observations: List[Observation] = observations # 关联的观察列表,用于监控聊天流状态 self.on_consecutive_no_reply_callback = on_consecutive_no_reply_callback + self.parallel_mode: bool = PARALLEL_MODE_ENABLED # 并行模式开关 # 日志前缀 self.log_prefix: str = str(chat_id) # Initial default, will be updated @@ -436,39 +448,86 @@ class HeartFChatting: return False async def _think_plan_execute_loop(self, cycle_timers: dict, planner_start_db_time: float) -> tuple[bool, str]: - """执行规划阶段""" try: - # think:思考 - current_mind = await self._get_submind_thinking(cycle_timers) + with Timer("观察", cycle_timers): + observation = self.observations[0] + await observation.observe() + + # 记录并行任务开始时间 + parallel_start_time = time.time() + logger.debug(f"{self.log_prefix} 开始三重并行任务处理") + + # 并行执行三个任务 + with Timer("三重并行处理", cycle_timers): + # 1. 子思维思考 - 不执行工具调用 + think_task = asyncio.create_task(self._get_submind_thinking_only(cycle_timers)) + logger.debug(f"{self.log_prefix} 启动子思维思考任务") + + # 2. 规划器 - 并行决策 + plan_task = asyncio.create_task(self._planner_parallel(cycle_timers)) + logger.debug(f"{self.log_prefix} 启动规划器任务") + + # 3. 工具执行器 - 专门处理工具调用 + tool_task = asyncio.create_task(self._execute_tools_parallel(self.sub_mind, cycle_timers)) + logger.debug(f"{self.log_prefix} 启动工具执行任务") + + # 创建任务完成状态追踪 + tasks = { + "思考任务": think_task, + "规划任务": plan_task, + "工具任务": tool_task + } + pending = set(tasks.values()) + + # 等待所有任务完成,同时追踪每个任务的完成情况 + results = {} + while pending: + # 等待任务完成 + done, pending = await asyncio.wait( + pending, return_when=asyncio.FIRST_COMPLETED, timeout=1.0 + ) + + # 记录完成的任务 + for task in done: + for name, t in tasks.items(): + if task == t: + task_end_time = time.time() + task_duration = task_end_time - parallel_start_time + logger.debug(f"{self.log_prefix} {name}已完成,耗时: {task_duration:.2f}秒") + results[name] = task.result() + break + + # 如果仍有未完成任务,记录进行中状态 + if pending: + current_time = time.time() + elapsed = current_time - parallel_start_time + pending_names = [name for name, t in tasks.items() if t in pending] + logger.debug(f"{self.log_prefix} 并行处理已进行{elapsed:.2f}秒,待完成任务: {', '.join(pending_names)}") + + # 所有任务完成,从结果中提取数据 + current_mind = results.get("思考任务") + planner_result = results.get("规划任务") + tool_results = results.get("工具任务") + + # 记录总耗时 + parallel_end_time = time.time() + total_duration = parallel_end_time - parallel_start_time + logger.info(f"{self.log_prefix} 三重并行任务全部完成,总耗时: {total_duration:.2f}秒") + + # 处理工具结果 - 将结果更新到SubMind + if tool_results: + self.sub_mind.structured_info.extend(tool_results) + self.sub_mind._update_structured_info_str() + logger.debug(f"{self.log_prefix} 工具结果已更新到SubMind,数量: {len(tool_results)}") + # 记录子思维思考内容 if self._current_cycle: self._current_cycle.set_response_info(sub_mind_thinking=current_mind) - # plan:决策 - with Timer("决策", cycle_timers): - planner_result = await self._planner(current_mind, cycle_timers) - - # 效果不太好,还没处理replan导致观察时间点改变的问题 - - # action = planner_result.get("action", "error") - # reasoning = planner_result.get("reasoning", "未提供理由") - - # self._current_cycle.set_action_info(action, reasoning, False) - - # 在获取规划结果后检查新消息 - - # if await self._check_new_messages(planner_start_db_time): - # if random.random() < 0.2: - # logger.info(f"{self.log_prefix} 看到了新消息,麦麦决定重新观察和规划...") - # # 重新规划 - # with Timer("重新决策", cycle_timers): - # self._current_cycle.replanned = True - # planner_result = await self._planner(current_mind, cycle_timers, is_re_planned=True) - # logger.info(f"{self.log_prefix} 重新规划完成.") - # 解析规划结果 action = planner_result.get("action", "error") reasoning = planner_result.get("reasoning", "未提供理由") + # 更新循环信息 self._current_cycle.set_action_info(action, reasoning, True) @@ -477,8 +536,6 @@ class HeartFChatting: logger.error(f"{self.log_prefix} LLM失败: {reasoning}") return False, "" - # execute:执行 - # 在此处添加日志记录 if action == "text_reply": action_str = "回复" @@ -493,10 +550,9 @@ class HeartFChatting: action, reasoning, planner_result.get("emoji_query", ""), cycle_timers, planner_start_db_time ) - except PlannerError as e: - logger.error(f"{self.log_prefix} 规划错误: {e}") - # 更新循环信息 - self._current_cycle.set_action_info("error", str(e), False) + except Exception as e: + logger.error(f"{self.log_prefix} 三重并行处理失败: {e}") + logger.error(traceback.format_exc()) return False, "" async def _handle_action( @@ -754,45 +810,73 @@ class HeartFChatting: if not self._shutting_down: logger.debug(f"{log_prefix} 该次决策耗时: {'; '.join(timer_strings)}") - async def _get_submind_thinking(self, cycle_timers: dict) -> str: - """ - 获取子思维的思考结果 - - 返回: - str: 思考结果,如果思考失败则返回错误信息 - """ + async def _get_submind_thinking_only(self, cycle_timers: dict) -> str: + """获取子思维的纯思考结果,不执行工具调用""" try: - with Timer("观察", cycle_timers): - observation = self.observations[0] - await observation.observe() - - # 获取上一个循环的信息 - # last_cycle = self._cycle_history[-1] if self._cycle_history else None - - with Timer("思考", cycle_timers): - # 获取上一个循环的动作 - # 传递上一个循环的信息给 do_thinking_before_reply + start_time = time.time() + logger.debug(f"{self.log_prefix} 子思维纯思考任务开始") + + with Timer("纯思考", cycle_timers): + # 修改SubMind.do_thinking_before_reply方法的参数,添加no_tools=True current_mind, _past_mind = await self.sub_mind.do_thinking_before_reply( - history_cycle=self._cycle_history + history_cycle=self._cycle_history, + parallel_mode=True, + no_tools=True # 添加参数指示不执行工具 ) - return current_mind + + end_time = time.time() + duration = end_time - start_time + logger.debug(f"{self.log_prefix} 子思维纯思考任务完成,耗时: {duration:.2f}秒") + return current_mind except Exception as e: - logger.error(f"{self.log_prefix}子心流 思考失败: {e}") - logger.error(traceback.format_exc()) + logger.error(f"{self.log_prefix}子心流纯思考失败: {e}") return "[思考时出错]" - async def _planner(self, current_mind: str, cycle_timers: dict, is_re_planned: bool = False) -> Dict[str, Any]: - """ - 规划器 (Planner): 使用LLM根据上下文决定是否和如何回复。 - 重构为:让LLM返回结构化JSON文本,然后在代码中解析。 + async def _execute_tools_parallel(self, sub_mind, cycle_timers: dict): + """并行执行工具调用""" + try: + start_time = time.time() + logger.debug(f"{self.log_prefix} 工具执行任务开始") + + # 如果还没有工具执行器实例,创建一个 + if not hasattr(self, 'tool_executor'): + self.tool_executor = ToolExecutor(self.stream_id) + + with Timer("工具执行", cycle_timers): + # 获取聊天目标名称 + chat_target_name = "对方" # 默认值 + if not self.is_group_chat and self.chat_target_info: + chat_target_name = ( + self.chat_target_info.get("person_name") + or self.chat_target_info.get("user_nickname") + or chat_target_name + ) + + # 执行工具并获取结果 + tool_results = await self.tool_executor.execute_tools( + sub_mind, + chat_target_name=chat_target_name, + is_group_chat=self.is_group_chat + ) + + end_time = time.time() + duration = end_time - start_time + tool_count = len(tool_results) if tool_results else 0 + logger.debug(f"{self.log_prefix} 工具执行任务完成,耗时: {duration:.2f}秒,工具结果数量: {tool_count}") + return tool_results + except Exception as e: + logger.error(f"{self.log_prefix}并行工具执行失败: {e}") + logger.error(traceback.format_exc()) + return [] - 参数: - current_mind: 子思维的当前思考结果 - cycle_timers: 计时器字典 - is_re_planned: 是否为重新规划 (此重构中暂时简化,不处理 is_re_planned 的特殊逻辑) + async def _planner_parallel(self, cycle_timers: dict) -> Dict[str, Any]: """ - logger.info(f"{self.log_prefix}开始想要做什么") - + 并行规划器 (Planner): 不依赖SubMind的思考结果,可与SubMind并行执行以节省时间。 + 返回与_planner相同格式的结果。 + """ + start_time = time.time() + logger.debug(f"{self.log_prefix} 并行规划任务开始") + actions_to_remove_temporarily = [] # --- 检查历史动作并决定临时移除动作 (逻辑保持不变) --- lian_xu_wen_ben_hui_fu = 0 @@ -807,25 +891,25 @@ class HeartFChatting: len(self._cycle_history) - 4 ): break - logger.debug(f"{self.log_prefix}[Planner] 检测到连续文本回复次数: {lian_xu_wen_ben_hui_fu}") + logger.debug(f"{self.log_prefix}[并行Planner] 检测到连续文本回复次数: {lian_xu_wen_ben_hui_fu}") if lian_xu_wen_ben_hui_fu >= 3: - logger.info(f"{self.log_prefix}[Planner] 连续回复 >= 3 次,强制移除 text_reply 和 emoji_reply") + logger.info(f"{self.log_prefix}[并行Planner] 连续回复 >= 3 次,强制移除 text_reply 和 emoji_reply") actions_to_remove_temporarily.extend(["text_reply", "emoji_reply"]) elif lian_xu_wen_ben_hui_fu == 2: if probability_roll < 0.8: - logger.info(f"{self.log_prefix}[Planner] 连续回复 2 次,80% 概率移除 text_reply 和 emoji_reply (触发)") + logger.info(f"{self.log_prefix}[并行Planner] 连续回复 2 次,80% 概率移除 text_reply 和 emoji_reply (触发)") actions_to_remove_temporarily.extend(["text_reply", "emoji_reply"]) else: logger.info( - f"{self.log_prefix}[Planner] 连续回复 2 次,80% 概率移除 text_reply 和 emoji_reply (未触发)" + f"{self.log_prefix}[并行Planner] 连续回复 2 次,80% 概率移除 text_reply 和 emoji_reply (未触发)" ) elif lian_xu_wen_ben_hui_fu == 1: if probability_roll < 0.4: - logger.info(f"{self.log_prefix}[Planner] 连续回复 1 次,40% 概率移除 text_reply (触发)") + logger.info(f"{self.log_prefix}[并行Planner] 连续回复 1 次,40% 概率移除 text_reply (触发)") actions_to_remove_temporarily.append("text_reply") else: - logger.info(f"{self.log_prefix}[Planner] 连续回复 1 次,40% 概率移除 text_reply (未触发)") + logger.info(f"{self.log_prefix}[并行Planner] 连续回复 1 次,40% 概率移除 text_reply (未触发)") # --- 结束检查历史动作 --- # 获取观察信息 @@ -851,38 +935,33 @@ class HeartFChatting: # 更新 current_available_actions 以反映移除后的状态 current_available_actions = self.action_manager.get_available_actions() logger.debug( - f"{self.log_prefix}[Planner] 临时移除的动作: {actions_to_remove_temporarily}, 当前可用: {list(current_available_actions.keys())}" + f"{self.log_prefix}[并行Planner] 临时移除的动作: {actions_to_remove_temporarily}, 当前可用: {list(current_available_actions.keys())}" ) - # --- 构建提示词 (调用修改后的 PromptBuilder 方法) --- - prompt = await prompt_builder.build_planner_prompt( - is_group_chat=self.is_group_chat, # <-- Pass HFC state - chat_target_info=self.chat_target_info, # <-- Pass HFC state - cycle_history=self._cycle_history, # <-- Pass HFC state - observed_messages_str=observed_messages_str, # <-- Pass local variable - current_mind=current_mind, # <-- Pass argument - structured_info=self.sub_mind.structured_info_str, # <-- Pass SubMind info - current_available_actions=current_available_actions, # <-- Pass determined actions + # --- 构建提示词 (与原规划器不同,不依赖 current_mind) --- + prompt = await prompt_builder.build_planner_prompt_parallel( + is_group_chat=self.is_group_chat, + chat_target_info=self.chat_target_info, + cycle_history=self._cycle_history, + observed_messages_str=observed_messages_str, + # 移除 current_mind 参数 + structured_info=self.sub_mind.structured_info_str, + current_available_actions=current_available_actions, ) # --- 调用 LLM (普通文本生成) --- llm_content = None try: - # 假设 LLMRequest 有 generate_response 方法返回 (content, reasoning, model_name) - # 我们只需要 content - # !! 注意:这里假设 self.planner_llm 有 generate_response 方法 - # !! 如果你的 LLMRequest 类使用的是其他方法名,请相应修改 - llm_content, _, _ = await self.planner_llm.generate_response(prompt=prompt) - logger.debug(f"{self.log_prefix}[Planner] LLM 原始 JSON 响应 (预期): {llm_content}") + with Timer("并行规划LLM调用", cycle_timers): + llm_content, _, _ = await self.planner_llm.generate_response(prompt=prompt) + logger.debug(f"{self.log_prefix}[并行Planner] LLM 原始 JSON 响应 (预期): {llm_content}") except Exception as req_e: - logger.error(f"{self.log_prefix}[Planner] LLM 请求执行失败: {req_e}") + logger.error(f"{self.log_prefix}[并行Planner] LLM 请求执行失败: {req_e}") reasoning = f"LLM 请求失败: {req_e}" llm_error = True # 直接使用默认动作返回错误结果 action = "no_reply" # 明确设置为默认值 emoji_query = "" # 明确设置为空 - # 不再立即返回,而是继续执行 finally 块以恢复动作 - # return { ... } # --- 解析 LLM 返回的 JSON (仅当 LLM 请求未出错时进行) --- if not llm_error and llm_content: @@ -901,10 +980,9 @@ class HeartFChatting: extracted_emoji_query = parsed_json.get("emoji_query", "") # 验证动作是否在当前可用列表中 - # !! 使用调用 prompt 时实际可用的动作列表进行验证 if extracted_action not in current_available_actions: logger.warning( - f"{self.log_prefix}[Planner] LLM 返回了当前不可用或无效的动作: '{extracted_action}' (可用: {list(current_available_actions.keys())}),将强制使用 'no_reply'" + f"{self.log_prefix}[并行Planner] LLM 返回了当前不可用或无效的动作: '{extracted_action}' (可用: {list(current_available_actions.keys())}),将强制使用 'no_reply'" ) action = "no_reply" reasoning = f"LLM 返回了当前不可用的动作 '{extracted_action}' (可用: {list(current_available_actions.keys())})。原始理由: {extracted_reasoning}" @@ -912,7 +990,7 @@ class HeartFChatting: # 检查 no_reply 是否也恰好被移除了 (极端情况) if "no_reply" not in current_available_actions: logger.error( - f"{self.log_prefix}[Planner] 严重错误:'no_reply' 动作也不可用!无法执行任何动作。" + f"{self.log_prefix}[并行Planner] 严重错误:'no_reply' 动作也不可用!无法执行任何动作。" ) action = "error" # 回退到错误状态 reasoning = "无法执行任何有效动作,包括 no_reply" @@ -926,39 +1004,36 @@ class HeartFChatting: emoji_query = extracted_emoji_query llm_error = False # 解析成功 logger.debug( - f"{self.log_prefix}[要做什么]\nPrompt:\n{prompt}\n\n决策结果 (来自JSON): {action}, 理由: {reasoning}, 表情查询: '{emoji_query}'" + f"{self.log_prefix}[并行要做什么]\nPrompt:\n{prompt}\n\n决策结果 (来自JSON): {action}, 理由: {reasoning}, 表情查询: '{emoji_query}'" ) except json.JSONDecodeError as json_e: logger.warning( - f"{self.log_prefix}[Planner] 解析LLM响应JSON失败: {json_e}. LLM原始输出: '{llm_content}'" + f"{self.log_prefix}[并行Planner] 解析LLM响应JSON失败: {json_e}. LLM原始输出: '{llm_content}'" ) reasoning = f"解析LLM响应JSON失败: {json_e}. 将使用默认动作 'no_reply'." action = "no_reply" # 解析失败则默认不回复 emoji_query = "" llm_error = True # 标记解析错误 except Exception as parse_e: - logger.error(f"{self.log_prefix}[Planner] 处理LLM响应时发生意外错误: {parse_e}") + logger.error(f"{self.log_prefix}[并行Planner] 处理LLM响应时发生意外错误: {parse_e}") reasoning = f"处理LLM响应时发生意外错误: {parse_e}. 将使用默认动作 'no_reply'." action = "no_reply" emoji_query = "" llm_error = True elif not llm_error and not llm_content: # LLM 请求成功但返回空内容 - logger.warning(f"{self.log_prefix}[Planner] LLM 返回了空内容。") + logger.warning(f"{self.log_prefix}[并行Planner] LLM 返回了空内容。") reasoning = "LLM 返回了空内容,使用默认动作 'no_reply'." action = "no_reply" emoji_query = "" llm_error = True # 标记为空响应错误 - # 如果 llm_error 在此阶段为 True,意味着请求成功但解析失败或返回空 - # 如果 llm_error 在请求阶段就为 True,则跳过了此解析块 - except Exception as outer_e: - logger.error(f"{self.log_prefix}[Planner] Planner 处理过程中发生意外错误: {outer_e}") + logger.error(f"{self.log_prefix}[并行Planner] Planner 处理过程中发生意外错误: {outer_e}") logger.error(traceback.format_exc()) action = "error" # 发生未知错误,标记为 error 动作 - reasoning = f"Planner 内部处理错误: {outer_e}" + reasoning = f"并行Planner 内部处理错误: {outer_e}" emoji_query = "" llm_error = True finally: @@ -967,13 +1042,12 @@ class HeartFChatting: if self.action_manager._original_actions_backup is not None: self.action_manager.restore_actions() logger.debug( - f"{self.log_prefix}[Planner] 恢复了原始动作集, 当前可用: {list(self.action_manager.get_available_actions().keys())}" + f"{self.log_prefix}[并行Planner] 恢复了原始动作集, 当前可用: {list(self.action_manager.get_available_actions().keys())}" ) - # --- 结束确保动作恢复 --- # --- 概率性忽略文本回复附带的表情 (逻辑保持不变) --- if action == "text_reply" and emoji_query: - logger.debug(f"{self.log_prefix}[Planner] 大模型建议文字回复带表情: '{emoji_query}'") + logger.debug(f"{self.log_prefix}[并行Planner] 大模型建议文字回复带表情: '{emoji_query}'") if random.random() > EMOJI_SEND_PRO: logger.info( f"{self.log_prefix}但是麦麦这次不想加表情 ({1 - EMOJI_SEND_PRO:.0%}),忽略表情 '{emoji_query}'" @@ -982,13 +1056,16 @@ class HeartFChatting: else: logger.info(f"{self.log_prefix}好吧,加上表情 '{emoji_query}'") # --- 结束概率性忽略 --- + + end_time = time.time() + duration = end_time - start_time + logger.debug(f"{self.log_prefix} 并行规划任务完成,耗时: {duration:.2f}秒,决定动作: {action}") # 返回结果字典 return { "action": action, "reasoning": reasoning, "emoji_query": emoji_query, - "current_mind": current_mind, "observed_messages": observed_messages, "llm_error": llm_error, # 返回错误状态 } @@ -1376,3 +1453,215 @@ class HeartFChatting: # Access MessageManager directly (using heart_fc_sender) await self.heart_fc_sender.register_thinking(thinking_message) return thinking_id + + async def _planner(self, current_mind: str, cycle_timers: dict, is_re_planned: bool = False) -> Dict[str, Any]: + """ + 规划器 (Planner): 使用LLM根据上下文决定是否和如何回复。 + 重构为:让LLM返回结构化JSON文本,然后在代码中解析。 + + 参数: + current_mind: 子思维的当前思考结果 + cycle_timers: 计时器字典 + is_re_planned: 是否为重新规划 (此重构中暂时简化,不处理 is_re_planned 的特殊逻辑) + """ + logger.info(f"{self.log_prefix}开始想要做什么") + + actions_to_remove_temporarily = [] + # --- 检查历史动作并决定临时移除动作 (逻辑保持不变) --- + lian_xu_wen_ben_hui_fu = 0 + probability_roll = random.random() + for cycle in reversed(self._cycle_history): + if cycle.action_taken: + if cycle.action_type == "text_reply": + lian_xu_wen_ben_hui_fu += 1 + else: + break + if len(self._cycle_history) > 0 and cycle.cycle_id <= self._cycle_history[0].cycle_id + ( + len(self._cycle_history) - 4 + ): + break + logger.debug(f"{self.log_prefix}[Planner] 检测到连续文本回复次数: {lian_xu_wen_ben_hui_fu}") + + if lian_xu_wen_ben_hui_fu >= 3: + logger.info(f"{self.log_prefix}[Planner] 连续回复 >= 3 次,强制移除 text_reply 和 emoji_reply") + actions_to_remove_temporarily.extend(["text_reply", "emoji_reply"]) + elif lian_xu_wen_ben_hui_fu == 2: + if probability_roll < 0.8: + logger.info(f"{self.log_prefix}[Planner] 连续回复 2 次,80% 概率移除 text_reply 和 emoji_reply (触发)") + actions_to_remove_temporarily.extend(["text_reply", "emoji_reply"]) + else: + logger.info( + f"{self.log_prefix}[Planner] 连续回复 2 次,80% 概率移除 text_reply 和 emoji_reply (未触发)" + ) + elif lian_xu_wen_ben_hui_fu == 1: + if probability_roll < 0.4: + logger.info(f"{self.log_prefix}[Planner] 连续回复 1 次,40% 概率移除 text_reply (触发)") + actions_to_remove_temporarily.append("text_reply") + else: + logger.info(f"{self.log_prefix}[Planner] 连续回复 1 次,40% 概率移除 text_reply (未触发)") + # --- 结束检查历史动作 --- + + # 获取观察信息 + observation = self.observations[0] + # if is_re_planned: # 暂时简化,不处理重新规划 + # await observation.observe() + observed_messages = observation.talking_message + observed_messages_str = observation.talking_message_str_truncate + + # --- 使用 LLM 进行决策 (JSON 输出模式) --- # + action = "no_reply" # 默认动作 + reasoning = "规划器初始化默认" + emoji_query = "" + llm_error = False # LLM 请求或解析错误标志 + + # 获取我们将传递给 prompt 构建器和用于验证的当前可用动作 + current_available_actions = self.action_manager.get_available_actions() + + try: + # --- 应用临时动作移除 --- + if actions_to_remove_temporarily: + self.action_manager.temporarily_remove_actions(actions_to_remove_temporarily) + # 更新 current_available_actions 以反映移除后的状态 + current_available_actions = self.action_manager.get_available_actions() + logger.debug( + f"{self.log_prefix}[Planner] 临时移除的动作: {actions_to_remove_temporarily}, 当前可用: {list(current_available_actions.keys())}" + ) + + # --- 构建提示词 (调用修改后的 PromptBuilder 方法) --- + prompt = await prompt_builder.build_planner_prompt( + is_group_chat=self.is_group_chat, # <-- Pass HFC state + chat_target_info=self.chat_target_info, # <-- Pass HFC state + cycle_history=self._cycle_history, # <-- Pass HFC state + observed_messages_str=observed_messages_str, # <-- Pass local variable + current_mind=current_mind, # <-- Pass argument + structured_info=self.sub_mind.structured_info_str, # <-- Pass SubMind info + current_available_actions=current_available_actions, # <-- Pass determined actions + ) + + # --- 调用 LLM (普通文本生成) --- + llm_content = None + try: + # 假设 LLMRequest 有 generate_response 方法返回 (content, reasoning, model_name) + # 我们只需要 content + # !! 注意:这里假设 self.planner_llm 有 generate_response 方法 + # !! 如果你的 LLMRequest 类使用的是其他方法名,请相应修改 + llm_content, _, _ = await self.planner_llm.generate_response(prompt=prompt) + logger.debug(f"{self.log_prefix}[Planner] LLM 原始 JSON 响应 (预期): {llm_content}") + except Exception as req_e: + logger.error(f"{self.log_prefix}[Planner] LLM 请求执行失败: {req_e}") + reasoning = f"LLM 请求失败: {req_e}" + llm_error = True + # 直接使用默认动作返回错误结果 + action = "no_reply" # 明确设置为默认值 + emoji_query = "" # 明确设置为空 + # 不再立即返回,而是继续执行 finally 块以恢复动作 + # return { ... } + + # --- 解析 LLM 返回的 JSON (仅当 LLM 请求未出错时进行) --- + if not llm_error and llm_content: + try: + # 尝试去除可能的 markdown 代码块标记 + cleaned_content = ( + llm_content.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip() + ) + if not cleaned_content: + raise json.JSONDecodeError("Cleaned content is empty", cleaned_content, 0) + parsed_json = json.loads(cleaned_content) + + # 提取决策,提供默认值 + extracted_action = parsed_json.get("action", "no_reply") + extracted_reasoning = parsed_json.get("reasoning", "LLM未提供理由") + extracted_emoji_query = parsed_json.get("emoji_query", "") + + # 验证动作是否在当前可用列表中 + # !! 使用调用 prompt 时实际可用的动作列表进行验证 + if extracted_action not in current_available_actions: + logger.warning( + f"{self.log_prefix}[Planner] LLM 返回了当前不可用或无效的动作: '{extracted_action}' (可用: {list(current_available_actions.keys())}),将强制使用 'no_reply'" + ) + action = "no_reply" + reasoning = f"LLM 返回了当前不可用的动作 '{extracted_action}' (可用: {list(current_available_actions.keys())})。原始理由: {extracted_reasoning}" + emoji_query = "" + # 检查 no_reply 是否也恰好被移除了 (极端情况) + if "no_reply" not in current_available_actions: + logger.error( + f"{self.log_prefix}[Planner] 严重错误:'no_reply' 动作也不可用!无法执行任何动作。" + ) + action = "error" # 回退到错误状态 + reasoning = "无法执行任何有效动作,包括 no_reply" + llm_error = True # 标记为严重错误 + else: + llm_error = False # 视为逻辑修正而非 LLM 错误 + else: + # 动作有效且可用 + action = extracted_action + reasoning = extracted_reasoning + emoji_query = extracted_emoji_query + llm_error = False # 解析成功 + logger.debug( + f"{self.log_prefix}[要做什么]\nPrompt:\n{prompt}\n\n决策结果 (来自JSON): {action}, 理由: {reasoning}, 表情查询: '{emoji_query}'" + ) + + except json.JSONDecodeError as json_e: + logger.warning( + f"{self.log_prefix}[Planner] 解析LLM响应JSON失败: {json_e}. LLM原始输出: '{llm_content}'" + ) + reasoning = f"解析LLM响应JSON失败: {json_e}. 将使用默认动作 'no_reply'." + action = "no_reply" # 解析失败则默认不回复 + emoji_query = "" + llm_error = True # 标记解析错误 + except Exception as parse_e: + logger.error(f"{self.log_prefix}[Planner] 处理LLM响应时发生意外错误: {parse_e}") + reasoning = f"处理LLM响应时发生意外错误: {parse_e}. 将使用默认动作 'no_reply'." + action = "no_reply" + emoji_query = "" + llm_error = True + elif not llm_error and not llm_content: + # LLM 请求成功但返回空内容 + logger.warning(f"{self.log_prefix}[Planner] LLM 返回了空内容。") + reasoning = "LLM 返回了空内容,使用默认动作 'no_reply'." + action = "no_reply" + emoji_query = "" + llm_error = True # 标记为空响应错误 + + # 如果 llm_error 在此阶段为 True,意味着请求成功但解析失败或返回空 + # 如果 llm_error 在请求阶段就为 True,则跳过了此解析块 + + except Exception as outer_e: + logger.error(f"{self.log_prefix}[Planner] Planner 处理过程中发生意外错误: {outer_e}") + logger.error(traceback.format_exc()) + action = "error" # 发生未知错误,标记为 error 动作 + reasoning = f"Planner 内部处理错误: {outer_e}" + emoji_query = "" + llm_error = True + finally: + # --- 确保动作恢复 --- + # 检查 self._original_actions_backup 是否有值来判断是否需要恢复 + if self.action_manager._original_actions_backup is not None: + self.action_manager.restore_actions() + logger.debug( + f"{self.log_prefix}[Planner] 恢复了原始动作集, 当前可用: {list(self.action_manager.get_available_actions().keys())}" + ) + # --- 结束确保动作恢复 --- + + # --- 概率性忽略文本回复附带的表情 (逻辑保持不变) --- + if action == "text_reply" and emoji_query: + logger.debug(f"{self.log_prefix}[Planner] 大模型建议文字回复带表情: '{emoji_query}'") + if random.random() > EMOJI_SEND_PRO: + logger.info( + f"{self.log_prefix}但是麦麦这次不想加表情 ({1 - EMOJI_SEND_PRO:.0%}),忽略表情 '{emoji_query}'" + ) + emoji_query = "" # 清空表情请求 + else: + logger.info(f"{self.log_prefix}好吧,加上表情 '{emoji_query}'") + # --- 结束概率性忽略 --- + + # 返回结果字典 + return { + "action": action, + "reasoning": reasoning, + "emoji_query": emoji_query, + "current_mind": current_mind, + "observed_messages": observed_messages, + "llm_error": llm_error, # 返回错误状态 + } diff --git a/src/plugins/heartFC_chat/heartflow_prompt_builder.py b/src/plugins/heartFC_chat/heartflow_prompt_builder.py index c59168a7f..642ece09a 100644 --- a/src/plugins/heartFC_chat/heartflow_prompt_builder.py +++ b/src/plugins/heartFC_chat/heartflow_prompt_builder.py @@ -17,6 +17,7 @@ from ..knowledge.knowledge_lib import qa_manager import traceback from .heartFC_Cycleinfo import CycleInfo + logger = get_logger("prompt") @@ -852,6 +853,124 @@ class PromptBuilder: logger.error(traceback.format_exc()) return "[构建 Planner Prompt 时出错]" + async def build_planner_prompt_parallel( + self, + is_group_chat: bool, + chat_target_info: Optional[dict], + cycle_history: Deque["CycleInfo"], + observed_messages_str: str, + structured_info: str, + current_available_actions: Dict[str, str], + ) -> str: + """ + 构建并行规划器的提示词 (不依赖SubMind的思考结果) + 这个方法与build_planner_prompt类似,但不需要current_mind参数, + 允许与submind的思考过程并行执行 + + 参数: + is_group_chat: 是否为群聊 + chat_target_info: 目标聊天信息 + cycle_history: 循环历史 + observed_messages_str: 观察到的消息 + structured_info: 结构化信息字符串 + current_available_actions: 当前可用的动作 + + 返回: + str: 规划器提示词 + """ + try: + # --- Determine chat context --- + chat_context_description = "你现在正在一个群聊中" + chat_target_name = None # Only relevant for private + if not is_group_chat and chat_target_info: + chat_target_name = ( + chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or "对方" + ) + chat_context_description = f"你正在和 {chat_target_name} 私聊" + # --- End determining chat context --- + + # Structured info block + structured_info_block = "" + if structured_info: + structured_info_block = f"以下是一些额外的信息:\n{structured_info}\n" + + # Chat content block + chat_content_block = "" + if observed_messages_str: + # Use triple quotes for multi-line string literal + chat_content_block = f"""观察到的最新聊天内容如下: +--- +{observed_messages_str} +---""" + else: + chat_content_block = "当前没有观察到新的聊天内容。\\n" + + # Current mind block (并行模式专用) + current_mind_block = "" + + # Cycle info block (using passed cycle_history) + cycle_info_block = "" + recent_active_cycles = [] + for cycle in reversed(cycle_history): + if cycle.action_taken: + recent_active_cycles.append(cycle) + if len(recent_active_cycles) == 3: + break + + consecutive_text_replies = 0 + responses_for_prompt = [] + + for cycle in recent_active_cycles: + if cycle.action_type == "text_reply": + consecutive_text_replies += 1 + response_text = cycle.response_info.get("response_text", []) + formatted_response = "[空回复]" if not response_text else " ".join(response_text) + responses_for_prompt.append(formatted_response) + else: + break + + if consecutive_text_replies >= 3: + cycle_info_block = f'你已经连续回复了三条消息(最近: "{responses_for_prompt[0]}",第二近: "{responses_for_prompt[1]}",第三近: "{responses_for_prompt[2]}")。你回复的有点多了,请注意' + elif consecutive_text_replies == 2: + cycle_info_block = f'你已经连续回复了两条消息(最近: "{responses_for_prompt[0]}",第二近: "{responses_for_prompt[1]}"),请注意' + elif consecutive_text_replies == 1: + cycle_info_block = f'你刚刚已经回复一条消息(内容: "{responses_for_prompt[0]}")' + + if cycle_info_block: + cycle_info_block = f"\n【近期回复历史】\n{cycle_info_block}\n" + else: + cycle_info_block = "\n【近期回复历史】\n(最近没有连续文本回复)\n" + + individuality = Individuality.get_instance() + prompt_personality = individuality.get_prompt(x_person=2, level=2) + + action_options_text = "当前你可以选择的行动有:\n" + action_keys = list(current_available_actions.keys()) + for name in action_keys: + desc = current_available_actions[name] + action_options_text += f"- '{name}': {desc}\n" + example_action_key = action_keys[0] if action_keys else "no_reply" + + planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt") + + prompt = planner_prompt_template.format( + bot_name=global_config.BOT_NICKNAME, + prompt_personality=prompt_personality, + chat_context_description=chat_context_description, + structured_info_block=structured_info_block, + chat_content_block=chat_content_block, + current_mind_block=current_mind_block, + cycle_info_block=cycle_info_block, + action_options_text=action_options_text, + example_action=example_action_key, + ) + return prompt + + except Exception as e: + logger.error(f"[PromptBuilder] 构建并行 Planner 提示词时出错: {e}") + logger.error(traceback.format_exc()) + return "[构建并行 Planner Prompt 时出错]" + init_prompt() prompt_builder = PromptBuilder()