From e0c1bb71bebdaa55df45903b56f78159c72f1684 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Mon, 23 Jun 2025 21:05:19 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E5=B0=86=E9=83=A8=E5=88=86?= =?UTF-8?q?=E5=A4=84=E7=90=86=E5=99=A8=E5=90=8E=E7=BD=AE=EF=BC=8C=E5=A4=A7?= =?UTF-8?q?=E5=A4=A7=E5=87=8F=E5=B0=91=E6=B6=88=E8=80=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/focus_chat/heartFC_chat.py | 228 ++++++++++++++++-- .../normal_chat/willing/mode_classical.py | 2 +- src/chat/normal_chat/willing/mode_mxp.py | 2 +- 3 files changed, 205 insertions(+), 27 deletions(-) diff --git a/src/chat/focus_chat/heartFC_chat.py b/src/chat/focus_chat/heartFC_chat.py index 7e5139c51..ea5bc8040 100644 --- a/src/chat/focus_chat/heartFC_chat.py +++ b/src/chat/focus_chat/heartFC_chat.py @@ -30,6 +30,9 @@ from src.chat.focus_chat.planners.action_manager import ActionManager from src.config.config import global_config from src.chat.focus_chat.hfc_performance_logger import HFCPerformanceLogger from src.chat.focus_chat.hfc_version_manager import get_hfc_version +from src.chat.focus_chat.info.relation_info import RelationInfo +from src.chat.focus_chat.info.expression_selection_info import ExpressionSelectionInfo + install(extra_lines=3) @@ -50,6 +53,10 @@ PROCESSOR_CLASSES = { "ChattingInfoProcessor": (ChattingInfoProcessor, None), "ToolProcessor": (ToolProcessor, "tool_use_processor"), "WorkingMemoryProcessor": (WorkingMemoryProcessor, "working_memory_processor"), +} + +# 定义后期处理器映射:在规划后、动作执行前运行的处理器 +POST_PLANNING_PROCESSOR_CLASSES = { "PersonImpressionpProcessor": (PersonImpressionpProcessor, "person_impression_processor"), "ExpressionSelectorProcessor": (ExpressionSelectorProcessor, "expression_selector_processor"), } @@ -113,22 +120,34 @@ class HeartFChatting: self.enabled_processor_names = [] for proc_name, (_proc_class, config_key) in PROCESSOR_CLASSES.items(): + # 检查处理器是否应该启用 + if not config_key or getattr(config_processor_settings, config_key, True): + self.enabled_processor_names.append(proc_name) + + # 初始化后期处理器(规划后执行的处理器) + self.enabled_post_planning_processor_names = [] + for proc_name, (_proc_class, config_key) in POST_PLANNING_PROCESSOR_CLASSES.items(): # 对于关系处理器,需要同时检查两个配置项 if proc_name == "PersonImpressionpProcessor": if global_config.relationship.enable_relationship and getattr( config_processor_settings, config_key, True ): - self.enabled_processor_names.append(proc_name) + self.enabled_post_planning_processor_names.append(proc_name) else: - # 其他处理器的原有逻辑 + # 其他后期处理器的逻辑 if not config_key or getattr(config_processor_settings, config_key, True): - self.enabled_processor_names.append(proc_name) + self.enabled_post_planning_processor_names.append(proc_name) # logger.info(f"{self.log_prefix} 将启用的处理器: {self.enabled_processor_names}") + # logger.info(f"{self.log_prefix} 将启用的后期处理器: {self.enabled_post_planning_processor_names}") self.processors: List[BaseProcessor] = [] self._register_default_processors() + # 初始化后期处理器 + self.post_planning_processors: List[BaseProcessor] = [] + self._register_post_planning_processors() + self.action_manager = ActionManager() self.action_planner = PlannerFactory.create_planner( log_prefix=self.log_prefix, action_manager=self.action_manager @@ -195,8 +214,6 @@ class HeartFChatting: if name in [ "ToolProcessor", "WorkingMemoryProcessor", - "PersonImpressionpProcessor", - "ExpressionSelectorProcessor", ]: self.processors.append(processor_actual_class(subheartflow_id=self.stream_id)) elif name == "ChattingInfoProcessor": @@ -222,6 +239,41 @@ class HeartFChatting: else: logger.warning(f"{self.log_prefix} 没有注册任何处理器。这可能是由于配置错误或所有处理器都被禁用了。") + def _register_post_planning_processors(self): + """根据 self.enabled_post_planning_processor_names 注册后期处理器""" + self.post_planning_processors = [] # 清空已有的 + + for name in self.enabled_post_planning_processor_names: # 'name' is "PersonImpressionpProcessor", etc. + processor_info = POST_PLANNING_PROCESSOR_CLASSES.get(name) # processor_info is (ProcessorClass, config_key) + if processor_info: + processor_actual_class = processor_info[0] # 获取实际的类定义 + # 根据处理器类名判断是否需要 subheartflow_id + if name in [ + "PersonImpressionpProcessor", + "ExpressionSelectorProcessor", + ]: + self.post_planning_processors.append(processor_actual_class(subheartflow_id=self.stream_id)) + else: + # 对于POST_PLANNING_PROCESSOR_CLASSES中定义但此处未明确处理构造的处理器 + # (例如, 新增了一个处理器到POST_PLANNING_PROCESSOR_CLASSES, 它不需要id, 也不叫PersonImpressionpProcessor) + try: + self.post_planning_processors.append(processor_actual_class()) # 尝试无参构造 + logger.debug(f"{self.log_prefix} 注册后期处理器 {name} (尝试无参构造).") + except TypeError: + logger.error( + f"{self.log_prefix} 后期处理器 {name} 构造失败。它可能需要参数(如 subheartflow_id)但未在注册逻辑中明确处理。" + ) + else: + # 这理论上不应该发生,因为 enabled_post_planning_processor_names 是从 POST_PLANNING_PROCESSOR_CLASSES 的键生成的 + logger.warning( + f"{self.log_prefix} 在 POST_PLANNING_PROCESSOR_CLASSES 中未找到名为 '{name}' 的处理器定义,将跳过注册。" + ) + + if self.post_planning_processors: + logger.info(f"{self.log_prefix} 已注册后期处理器: {[p.__class__.__name__ for p in self.post_planning_processors]}") + else: + logger.warning(f"{self.log_prefix} 没有注册任何后期处理器。这可能是由于配置错误或所有后期处理器都被禁用了。") + async def start(self): """检查是否需要启动主循环,如果未激活则启动。""" logger.debug(f"{self.log_prefix} 开始启动 HeartFChatting") @@ -508,6 +560,7 @@ class HeartFChatting: ) task = asyncio.create_task(run_with_timeout()) + processor_tasks.append(task) task_to_name_map[task] = processor_name logger.debug(f"{self.log_prefix} 启动处理器任务: {processor_name}") @@ -561,6 +614,139 @@ class HeartFChatting: return all_plan_info, processor_time_costs + async def _process_post_planning_processors(self, observations: List[Observation], action_data: dict) -> dict: + """ + 处理后期处理器(规划后执行的处理器) + 包括:关系处理器、表达选择器、记忆激活器 + + 参数: + observations: 观察器列表 + action_data: 原始动作数据 + + 返回: + dict: 更新后的动作数据 + """ + logger.info(f"{self.log_prefix} 开始执行后期处理器") + + # 创建所有后期任务 + task_list = [] + task_to_name_map = {} + + # 添加后期处理器任务 + for processor in self.post_planning_processors: + processor_name = processor.__class__.__name__ + + async def run_processor_with_timeout(proc=processor): + return await asyncio.wait_for( + proc.process_info(observations=observations), + timeout=global_config.focus_chat.processor_max_time, + ) + + task = asyncio.create_task(run_processor_with_timeout()) + task_list.append(task) + task_to_name_map[task] = ("processor", processor_name) + logger.info(f"{self.log_prefix} 启动后期处理器任务: {processor_name}") + + # 添加记忆激活器任务 + async def run_memory_with_timeout(): + return await asyncio.wait_for( + self.memory_activator.activate_memory(observations), + timeout=MEMORY_ACTIVATION_TIMEOUT, + ) + + memory_task = asyncio.create_task(run_memory_with_timeout()) + task_list.append(memory_task) + task_to_name_map[memory_task] = ("memory", "MemoryActivator") + logger.info(f"{self.log_prefix} 启动记忆激活器任务") + + # 如果没有任何后期任务,直接返回 + if not task_list: + logger.info(f"{self.log_prefix} 没有启用的后期处理器或记忆激活器") + return action_data + + # 等待所有任务完成 + pending_tasks = set(task_list) + all_post_plan_info = [] + running_memorys = [] + + while pending_tasks: + done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED) + + for task in done: + task_type, task_name = task_to_name_map[task] + + try: + result = await task + + if task_type == "processor": + logger.info(f"{self.log_prefix} 后期处理器 {task_name} 已完成!") + if result is not None: + all_post_plan_info.extend(result) + else: + logger.warning(f"{self.log_prefix} 后期处理器 {task_name} 返回了 None") + elif task_type == "memory": + logger.info(f"{self.log_prefix} 记忆激活器已完成!") + if result is not None: + running_memorys = result + else: + logger.warning(f"{self.log_prefix} 记忆激活器返回了 None") + running_memorys = [] + + except asyncio.TimeoutError: + if task_type == "processor": + logger.warning( + f"{self.log_prefix} 后期处理器 {task_name} 超时(>{global_config.focus_chat.processor_max_time}s),已跳过" + ) + elif task_type == "memory": + logger.warning(f"{self.log_prefix} 记忆激活器超时(>{MEMORY_ACTIVATION_TIMEOUT}s),已跳过") + running_memorys = [] + except Exception as e: + if task_type == "processor": + logger.error( + f"{self.log_prefix} 后期处理器 {task_name} 执行失败. 错误: {e}", + exc_info=True, + ) + elif task_type == "memory": + logger.error(f"{self.log_prefix} 记忆激活器执行失败. 错误: {e}", exc_info=True) + running_memorys = [] + + # 将后期处理器的结果整合到 action_data 中 + updated_action_data = action_data.copy() + + relation_info = "" + selected_expressions = [] + + for info in all_post_plan_info: + if isinstance(info, RelationInfo): + relation_info = info.get_processed_info() + elif isinstance(info, ExpressionSelectionInfo): + selected_expressions = info.get_expressions_for_action_data() + + if relation_info: + updated_action_data["relation_info_block"] = relation_info + + # 将选中的表达方式传递给action_data + if selected_expressions: + updated_action_data["selected_expressions"] = selected_expressions + logger.info(f"{self.log_prefix} 传递{len(selected_expressions)}个选中的表达方式到action_data") + + # 将记忆信息也添加到action_data中 + if running_memorys: + updated_action_data["running_memories"] = running_memorys + + # 生成兼容的memory_block格式 + memory_str = "以下是当前在聊天中,你回忆起的记忆:\n" + for running_memory in running_memorys: + memory_str += f"{running_memory['content']}\n" + updated_action_data["memory_block"] = memory_str + + logger.info(f"{self.log_prefix} 添加了 {len(running_memorys)} 个激活的记忆到action_data") + + if all_post_plan_info or running_memorys: + logger.info(f"{self.log_prefix} 后期处理完成,产生了 {len(all_post_plan_info)} 个信息项和 {len(running_memorys)} 个记忆") + + return updated_action_data + async def _observe_process_plan_action_loop(self, cycle_timers: dict, thinking_id: str) -> dict: try: loop_start_time = time.time() @@ -588,26 +774,20 @@ class HeartFChatting: self.observations.append(self.action_observation) return True - # 创建三个并行任务,为LLM调用添加超时保护 + # 创建两个并行任务,为LLM调用添加超时保护 action_modify_task = asyncio.create_task( asyncio.wait_for(modify_actions_task(), timeout=ACTION_MODIFICATION_TIMEOUT) ) - memory_task = asyncio.create_task( - asyncio.wait_for( - self.memory_activator.activate_memory(self.observations), timeout=MEMORY_ACTIVATION_TIMEOUT - ) - ) processor_task = asyncio.create_task(self._process_processors(self.observations)) - # 等待三个任务完成,使用超时保护和详细错误处理 + # 等待两个任务完成,使用超时保护和详细错误处理 action_modify_result = None - running_memorys = [] all_plan_info = [] processor_time_costs = {} try: - action_modify_result, running_memorys, (all_plan_info, processor_time_costs) = await asyncio.gather( - action_modify_task, memory_task, processor_task, return_exceptions=True + action_modify_result, (all_plan_info, processor_time_costs) = await asyncio.gather( + action_modify_task, processor_task, return_exceptions=True ) # 检查各个任务的结果 @@ -617,13 +797,6 @@ class HeartFChatting: else: logger.error(f"{self.log_prefix} 动作修改任务失败: {action_modify_result}") - if isinstance(running_memorys, Exception): - if isinstance(running_memorys, asyncio.TimeoutError): - logger.error(f"{self.log_prefix} 记忆激活任务超时") - else: - logger.error(f"{self.log_prefix} 记忆激活任务失败: {running_memorys}") - running_memorys = [] - processor_result = (all_plan_info, processor_time_costs) if isinstance(processor_result, Exception): if isinstance(processor_result, asyncio.TimeoutError): @@ -638,7 +811,6 @@ class HeartFChatting: except Exception as e: logger.error(f"{self.log_prefix} 并行任务gather失败: {e}") # 设置默认值以继续执行 - running_memorys = [] all_plan_info = [] processor_time_costs = {} @@ -648,11 +820,11 @@ class HeartFChatting: } logger.debug( - f"{self.log_prefix} 并行阶段完成,准备进入规划器,plan_info数量: {len(all_plan_info)}, running_memorys数量: {len(running_memorys)}" + f"{self.log_prefix} 并行阶段完成,准备进入规划器,plan_info数量: {len(all_plan_info)}" ) with Timer("规划器", cycle_timers): - plan_result = await self.action_planner.plan(all_plan_info, running_memorys, loop_start_time) + plan_result = await self.action_planner.plan(all_plan_info, [], loop_start_time) loop_plan_info = { "action_result": plan_result.get("action_result", {}), @@ -675,6 +847,12 @@ class HeartFChatting: logger.debug(f"{self.log_prefix} 麦麦想要:'{action_str}'") + # 如果动作不是no_reply,则执行后期处理器 + if action_type != "no_reply": + with Timer("后期处理器", cycle_timers): + logger.debug(f"{self.log_prefix} 执行后期处理器(动作类型: {action_type})") + action_data = await self._process_post_planning_processors(self.observations, action_data) + success, reply_text, command = await self._handle_action( action_type, reasoning, action_data, cycle_timers, thinking_id ) diff --git a/src/chat/normal_chat/willing/mode_classical.py b/src/chat/normal_chat/willing/mode_classical.py index 067b0a0ac..5ef1ed387 100644 --- a/src/chat/normal_chat/willing/mode_classical.py +++ b/src/chat/normal_chat/willing/mode_classical.py @@ -52,7 +52,7 @@ class ClassicalWillingManager(BaseWillingManager): # 检查群组权限(如果是群聊) if ( willing_info.group_info - and willing_info.group_info.group_id in global_config.chat.talk_frequency_down_groups + and willing_info.group_info.group_id in global_config.normal_chat.talk_frequency_down_groups ): reply_probability = reply_probability / global_config.normal_chat.down_frequency_rate diff --git a/src/chat/normal_chat/willing/mode_mxp.py b/src/chat/normal_chat/willing/mode_mxp.py index 41bb22aa7..2a294d0c5 100644 --- a/src/chat/normal_chat/willing/mode_mxp.py +++ b/src/chat/normal_chat/willing/mode_mxp.py @@ -179,7 +179,7 @@ class MxpWillingManager(BaseWillingManager): if w_info.is_picid: probability = 0 # picid格式消息直接不回复 - if w_info.group_info and w_info.group_info.group_id in global_config.chat.talk_frequency_down_groups: + if w_info.group_info and w_info.group_info.group_id in global_config.normal_chat.talk_frequency_down_groups: probability /= global_config.normal_chat.down_frequency_rate self.temporary_willing = current_willing