diff --git a/src/chat/focus_chat/heartFC_Cycleinfo.py b/src/chat/focus_chat/heartFC_Cycleinfo.py index 37eea6e9a..120381df3 100644 --- a/src/chat/focus_chat/heartFC_Cycleinfo.py +++ b/src/chat/focus_chat/heartFC_Cycleinfo.py @@ -22,9 +22,10 @@ class CycleDetail: # 新字段 self.loop_observation_info: Dict[str, Any] = {} - self.loop_process_info: Dict[str, Any] = {} + self.loop_processor_info: Dict[str, Any] = {} # 前处理器信息 self.loop_plan_info: Dict[str, Any] = {} self.loop_action_info: Dict[str, Any] = {} + self.loop_post_processor_info: Dict[str, Any] = {} # 后处理器信息 def to_dict(self) -> Dict[str, Any]: """将循环信息转换为字典格式""" @@ -76,9 +77,10 @@ class CycleDetail: "timers": self.timers, "thinking_id": self.thinking_id, "loop_observation_info": convert_to_serializable(self.loop_observation_info), - "loop_process_info": convert_to_serializable(self.loop_process_info), + "loop_processor_info": convert_to_serializable(self.loop_processor_info), "loop_plan_info": convert_to_serializable(self.loop_plan_info), "loop_action_info": convert_to_serializable(self.loop_action_info), + "loop_post_processor_info": convert_to_serializable(self.loop_post_processor_info), } def complete_cycle(self): @@ -133,3 +135,4 @@ class CycleDetail: self.loop_processor_info = loop_info["loop_processor_info"] self.loop_plan_info = loop_info["loop_plan_info"] self.loop_action_info = loop_info["loop_action_info"] + self.loop_post_processor_info = loop_info["loop_post_processor_info"] diff --git a/src/chat/focus_chat/heartFC_chat.py b/src/chat/focus_chat/heartFC_chat.py index e9bfa4b80..584aa4087 100644 --- a/src/chat/focus_chat/heartFC_chat.py +++ b/src/chat/focus_chat/heartFC_chat.py @@ -454,7 +454,19 @@ class HeartFChatting: formatted_ptime = f"{ptime * 1000:.2f}毫秒" if ptime < 1 else f"{ptime:.2f}秒" processor_time_strings.append(f"{pname}: {formatted_ptime}") processor_time_log = ( - ("\n各处理器耗时: " + "; ".join(processor_time_strings)) if processor_time_strings else "" + ("\n前处理器耗时: " + "; ".join(processor_time_strings)) if processor_time_strings else "" + ) + + # 新增:输出每个后处理器的耗时 + post_processor_time_costs = self._current_cycle_detail.loop_post_processor_info.get( + "post_processor_time_costs", {} + ) + post_processor_time_strings = [] + for pname, ptime in post_processor_time_costs.items(): + formatted_ptime = f"{ptime * 1000:.2f}毫秒" if ptime < 1 else f"{ptime:.2f}秒" + post_processor_time_strings.append(f"{pname}: {formatted_ptime}") + post_processor_time_log = ( + ("\n后处理器耗时: " + "; ".join(post_processor_time_strings)) if post_processor_time_strings else "" ) logger.info( @@ -463,6 +475,7 @@ class HeartFChatting: f"动作: {self._current_cycle_detail.loop_plan_info.get('action_result', {}).get('action_type', '未知动作')}" + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") + processor_time_log + + post_processor_time_log ) # 记录性能数据 @@ -473,6 +486,8 @@ class HeartFChatting: "action_type": action_result.get("action_type", "unknown"), "total_time": self._current_cycle_detail.end_time - self._current_cycle_detail.start_time, "step_times": cycle_timers.copy(), + "processor_time_costs": processor_time_costs, # 前处理器时间 + "post_processor_time_costs": post_processor_time_costs, # 后处理器时间 "reasoning": action_result.get("reasoning", ""), "success": self._current_cycle_detail.loop_action_info.get("action_taken", False), } @@ -754,6 +769,180 @@ class HeartFChatting: return updated_action_data + async def _process_post_planning_processors_with_timing(self, observations: List[Observation], action_data: dict) -> tuple[dict, dict]: + """ + 处理后期处理器(规划后执行的处理器)并收集详细时间统计 + 包括:关系处理器、表达选择器、记忆激活器 + + 参数: + observations: 观察器列表 + action_data: 原始动作数据 + + 返回: + tuple[dict, dict]: (更新后的动作数据, 后处理器时间统计) + """ + logger.info(f"{self.log_prefix} 开始执行后期处理器(带详细统计)") + + # 创建所有后期任务 + task_list = [] + task_to_name_map = {} + task_start_times = {} + post_processor_time_costs = {} + + # 添加后期处理器任务 + for processor in self.post_planning_processors: + processor_name = processor.__class__.__name__ + + async def run_processor_with_timeout_and_timing(proc=processor, name=processor_name): + start_time = time.time() + try: + result = await asyncio.wait_for( + proc.process_info(observations=observations), + timeout=global_config.focus_chat.processor_max_time, + ) + end_time = time.time() + post_processor_time_costs[name] = end_time - start_time + logger.debug(f"{self.log_prefix} 后期处理器 {name} 耗时: {end_time - start_time:.3f}秒") + return result + except Exception as e: + end_time = time.time() + post_processor_time_costs[name] = end_time - start_time + logger.warning(f"{self.log_prefix} 后期处理器 {name} 执行异常,耗时: {end_time - start_time:.3f}秒") + raise e + + task = asyncio.create_task(run_processor_with_timeout_and_timing()) + task_list.append(task) + task_to_name_map[task] = ("processor", processor_name) + task_start_times[task] = time.time() + logger.info(f"{self.log_prefix} 启动后期处理器任务: {processor_name}") + + # 添加记忆激活器任务 + async def run_memory_with_timeout_and_timing(): + start_time = time.time() + try: + result = await asyncio.wait_for( + self.memory_activator.activate_memory(observations), + timeout=MEMORY_ACTIVATION_TIMEOUT, + ) + end_time = time.time() + post_processor_time_costs["MemoryActivator"] = end_time - start_time + logger.debug(f"{self.log_prefix} 记忆激活器耗时: {end_time - start_time:.3f}秒") + return result + except Exception as e: + end_time = time.time() + post_processor_time_costs["MemoryActivator"] = end_time - start_time + logger.warning(f"{self.log_prefix} 记忆激活器执行异常,耗时: {end_time - start_time:.3f}秒") + raise e + + memory_task = asyncio.create_task(run_memory_with_timeout_and_timing()) + task_list.append(memory_task) + task_to_name_map[memory_task] = ("memory", "MemoryActivator") + task_start_times[memory_task] = time.time() + logger.info(f"{self.log_prefix} 启动记忆激活器任务") + + # 如果没有任何后期任务,直接返回 + if not task_list: + logger.info(f"{self.log_prefix} 没有启用的后期处理器或记忆激活器") + return action_data, {} + + # 等待所有任务完成 + pending_tasks = set(task_list) + all_post_plan_info = [] + running_memorys = [] + + while pending_tasks: + done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED) + + for task in done: + task_type, task_name = task_to_name_map[task] + + try: + result = await task + + if task_type == "processor": + logger.info(f"{self.log_prefix} 后期处理器 {task_name} 已完成!") + if result is not None: + all_post_plan_info.extend(result) + else: + logger.warning(f"{self.log_prefix} 后期处理器 {task_name} 返回了 None") + elif task_type == "memory": + logger.info(f"{self.log_prefix} 记忆激活器已完成!") + if result is not None: + running_memorys = result + else: + logger.warning(f"{self.log_prefix} 记忆激活器返回了 None") + running_memorys = [] + + except asyncio.TimeoutError: + # 对于超时任务,记录已用时间 + elapsed_time = time.time() - task_start_times[task] + if task_type == "processor": + post_processor_time_costs[task_name] = elapsed_time + logger.warning( + f"{self.log_prefix} 后期处理器 {task_name} 超时(>{global_config.focus_chat.processor_max_time}s),已跳过,耗时: {elapsed_time:.3f}秒" + ) + elif task_type == "memory": + post_processor_time_costs["MemoryActivator"] = elapsed_time + logger.warning(f"{self.log_prefix} 记忆激活器超时(>{MEMORY_ACTIVATION_TIMEOUT}s),已跳过,耗时: {elapsed_time:.3f}秒") + running_memorys = [] + except Exception as e: + # 对于异常任务,记录已用时间 + elapsed_time = time.time() - task_start_times[task] + if task_type == "processor": + post_processor_time_costs[task_name] = elapsed_time + logger.error( + f"{self.log_prefix} 后期处理器 {task_name} 执行失败,耗时: {elapsed_time:.3f}秒. 错误: {e}", + exc_info=True, + ) + elif task_type == "memory": + post_processor_time_costs["MemoryActivator"] = elapsed_time + logger.error(f"{self.log_prefix} 记忆激活器执行失败,耗时: {elapsed_time:.3f}秒. 错误: {e}", exc_info=True) + running_memorys = [] + + # 将后期处理器的结果整合到 action_data 中 + updated_action_data = action_data.copy() + + relation_info = "" + selected_expressions = [] + structured_info = "" + + for info in all_post_plan_info: + if isinstance(info, RelationInfo): + relation_info = info.get_processed_info() + elif isinstance(info, ExpressionSelectionInfo): + selected_expressions = info.get_expressions_for_action_data() + elif isinstance(info, StructuredInfo): + structured_info = info.get_processed_info() + + if relation_info: + updated_action_data["relation_info_block"] = relation_info + + if selected_expressions: + updated_action_data["selected_expressions"] = selected_expressions + + if structured_info: + updated_action_data["structured_info"] = structured_info + + # 特殊处理running_memorys + if running_memorys: + memory_str = "以下是当前在聊天中,你回忆起的记忆:\n" + for running_memory in running_memorys: + memory_str += f"{running_memory['content']}\n" + updated_action_data["memory_block"] = memory_str + logger.info(f"{self.log_prefix} 添加了 {len(running_memorys)} 个激活的记忆到action_data") + + if all_post_plan_info or running_memorys: + logger.info( + f"{self.log_prefix} 后期处理完成,产生了 {len(all_post_plan_info)} 个信息项和 {len(running_memorys)} 个记忆" + ) + + # 输出详细统计信息 + if post_processor_time_costs: + stats_str = ", ".join([f"{name}: {time_cost:.3f}s" for name, time_cost in post_processor_time_costs.items()]) + logger.info(f"{self.log_prefix} 后期处理器详细耗时统计: {stats_str}") + + return updated_action_data, post_processor_time_costs + async def _observe_process_plan_action_loop(self, cycle_timers: dict, thinking_id: str) -> dict: try: loop_start_time = time.time() @@ -836,28 +1025,37 @@ class HeartFChatting: "observed_messages": plan_result.get("observed_messages", ""), } - with Timer("执行动作", cycle_timers): - action_type, action_data, reasoning = ( - plan_result.get("action_result", {}).get("action_type", "error"), - plan_result.get("action_result", {}).get("action_data", {}), - plan_result.get("action_result", {}).get("reasoning", "未提供理由"), - ) + # 修正:将后期处理器从执行动作Timer中分离出来 + action_type, action_data, reasoning = ( + plan_result.get("action_result", {}).get("action_type", "error"), + plan_result.get("action_result", {}).get("action_data", {}), + plan_result.get("action_result", {}).get("reasoning", "未提供理由"), + ) - if action_type == "reply": - action_str = "回复" - elif action_type == "no_reply": - action_str = "不回复" - else: - action_str = action_type + if action_type == "reply": + action_str = "回复" + elif action_type == "no_reply": + action_str = "不回复" + else: + action_str = action_type - logger.debug(f"{self.log_prefix} 麦麦想要:'{action_str}'") + logger.debug(f"{self.log_prefix} 麦麦想要:'{action_str}'") - # 如果动作不是no_reply,则执行后期处理器 - if action_type != "no_reply": - with Timer("后期处理器", cycle_timers): - logger.debug(f"{self.log_prefix} 执行后期处理器(动作类型: {action_type})") - action_data = await self._process_post_planning_processors(self.observations, action_data) + # 添加:单独计时后期处理器,并收集详细统计 + post_processor_time_costs = {} + if action_type != "no_reply": + with Timer("后期处理器", cycle_timers): + logger.debug(f"{self.log_prefix} 执行后期处理器(动作类型: {action_type})") + # 记录详细的后处理器时间 + post_start_time = time.time() + action_data, post_processor_time_costs = await self._process_post_planning_processors_with_timing(self.observations, action_data) + post_end_time = time.time() + logger.info(f"{self.log_prefix} 后期处理器总耗时: {post_end_time - post_start_time:.3f}秒") + else: + logger.debug(f"{self.log_prefix} 跳过后期处理器(动作类型: {action_type})") + # 修正:纯动作执行计时 + with Timer("动作执行", cycle_timers): success, reply_text, command = await self._handle_action( action_type, reasoning, action_data, cycle_timers, thinking_id ) @@ -869,11 +1067,17 @@ class HeartFChatting: "taken_time": time.time(), } + # 添加后处理器统计到loop_info + loop_post_processor_info = { + "post_processor_time_costs": post_processor_time_costs, + } + loop_info = { "loop_observation_info": loop_observation_info, "loop_processor_info": loop_processor_info, "loop_plan_info": loop_plan_info, "loop_action_info": loop_action_info, + "loop_post_processor_info": loop_post_processor_info, # 新增 } return loop_info diff --git a/src/chat/focus_chat/hfc_performance_logger.py b/src/chat/focus_chat/hfc_performance_logger.py index 16f222e80..4d7e0561b 100644 --- a/src/chat/focus_chat/hfc_performance_logger.py +++ b/src/chat/focus_chat/hfc_performance_logger.py @@ -41,6 +41,8 @@ class HFCPerformanceLogger: "action_type": cycle_data.get("action_type", "unknown"), "total_time": cycle_data.get("total_time", 0), "step_times": cycle_data.get("step_times", {}), + "processor_time_costs": cycle_data.get("processor_time_costs", {}), # 前处理器时间 + "post_processor_time_costs": cycle_data.get("post_processor_time_costs", {}), # 后处理器时间 "reasoning": cycle_data.get("reasoning", ""), "success": cycle_data.get("success", False), } @@ -51,9 +53,22 @@ class HFCPerformanceLogger: # 立即写入文件(防止数据丢失) self._write_session_data() - logger.debug( - f"记录HFC循环数据: cycle_id={record['cycle_id']}, action={record['action_type']}, time={record['total_time']:.2f}s" - ) + # 构建详细的日志信息 + log_parts = [ + f"cycle_id={record['cycle_id']}", + f"action={record['action_type']}", + f"time={record['total_time']:.2f}s" + ] + + # 添加后处理器时间信息到日志 + if record['post_processor_time_costs']: + post_processor_stats = ", ".join([ + f"{name}: {time_cost:.3f}s" + for name, time_cost in record['post_processor_time_costs'].items() + ]) + log_parts.append(f"post_processors=({post_processor_stats})") + + logger.debug(f"记录HFC循环数据: {', '.join(log_parts)}") except Exception as e: logger.error(f"记录HFC循环数据失败: {e}") diff --git a/src/chat/focus_chat/replyer/default_generator.py b/src/chat/focus_chat/replyer/default_generator.py index 7a0142dd8..e93945c00 100644 --- a/src/chat/focus_chat/replyer/default_generator.py +++ b/src/chat/focus_chat/replyer/default_generator.py @@ -440,6 +440,7 @@ class DefaultReplyer: chat_info=chat_talking_prompt, memory_block=memory_block, structured_info_block=structured_info_block, + relation_info_block=relation_info_block, extra_info_block=extra_info_block, time_block=time_block, keywords_reaction_prompt=keywords_reaction_prompt, diff --git a/src/chat/utils/statistic.py b/src/chat/utils/statistic.py index bb3f53a1a..59c7dc164 100644 --- a/src/chat/utils/statistic.py +++ b/src/chat/utils/statistic.py @@ -58,6 +58,12 @@ FOCUS_CYCLE_CNT_BY_VERSION = "focus_cycle_count_by_version" FOCUS_ACTION_RATIOS_BY_VERSION = "focus_action_ratios_by_version" FOCUS_AVG_TIMES_BY_VERSION = "focus_avg_times_by_version" +# 新增: 后处理器统计数据的键 +FOCUS_POST_PROCESSOR_TIMES = "focus_post_processor_times" +FOCUS_POST_PROCESSOR_COUNT = "focus_post_processor_count" +FOCUS_POST_PROCESSOR_SUCCESS_RATE = "focus_post_processor_success_rate" +FOCUS_PROCESSOR_TIMES = "focus_processor_times" # 前处理器统计 + class OnlineTimeRecordTask(AsyncTask): """在线时间记录任务""" @@ -495,6 +501,10 @@ class StatisticOutputTask(AsyncTask): FOCUS_AVG_TIMES_BY_VERSION: defaultdict(lambda: defaultdict(list)), "focus_exec_times_by_version_action": defaultdict(lambda: defaultdict(list)), "focus_action_ratios_by_chat": defaultdict(lambda: defaultdict(int)), + # 新增:前处理器和后处理器统计字段 + FOCUS_PROCESSOR_TIMES: defaultdict(list), # 前处理器时间 + FOCUS_POST_PROCESSOR_TIMES: defaultdict(list), # 后处理器时间 + FOCUS_POST_PROCESSOR_COUNT: defaultdict(int), # 后处理器执行次数 } for period_key, _ in collect_period } @@ -556,6 +566,10 @@ class StatisticOutputTask(AsyncTask): total_time = cycle_data.get("total_time", 0.0) step_times = cycle_data.get("step_times", {}) version = cycle_data.get("version", "unknown") + + # 新增:获取前处理器和后处理器时间 + processor_time_costs = cycle_data.get("processor_time_costs", {}) + post_processor_time_costs = cycle_data.get("post_processor_time_costs", {}) # 更新聊天ID名称映射 if chat_id not in self.name_mapping: @@ -594,6 +608,15 @@ class StatisticOutputTask(AsyncTask): stat["focus_exec_times_by_chat_action"][chat_id][action_type].append(time_val) # 按版本和action类型收集执行时间 stat["focus_exec_times_by_version_action"][version][action_type].append(time_val) + + # 新增:前处理器时间统计 + for processor_name, time_val in processor_time_costs.items(): + stat[FOCUS_PROCESSOR_TIMES][processor_name].append(time_val) + + # 新增:后处理器时间统计 + for processor_name, time_val in post_processor_time_costs.items(): + stat[FOCUS_POST_PROCESSOR_TIMES][processor_name].append(time_val) + stat[FOCUS_POST_PROCESSOR_COUNT][processor_name] += 1 break except Exception as e: logger.warning(f"Failed to process cycle data: {e}") @@ -651,6 +674,20 @@ class StatisticOutputTask(AsyncTask): else: stat["focus_exec_times_by_version_action"][version][action_type] = 0.0 + # 新增:计算前处理器平均时间 + for processor_name, times in stat[FOCUS_PROCESSOR_TIMES].items(): + if times: + stat[FOCUS_PROCESSOR_TIMES][processor_name] = sum(times) / len(times) + else: + stat[FOCUS_PROCESSOR_TIMES][processor_name] = 0.0 + + # 新增:计算后处理器平均时间 + for processor_name, times in stat[FOCUS_POST_PROCESSOR_TIMES].items(): + if times: + stat[FOCUS_POST_PROCESSOR_TIMES][processor_name] = sum(times) / len(times) + else: + stat[FOCUS_POST_PROCESSOR_TIMES][processor_name] = 0.0 + def _collect_all_statistics(self, now: datetime) -> Dict[str, Dict[str, Any]]: """ 收集各时间段的统计数据 @@ -959,261 +996,6 @@ class StatisticOutputTask(AsyncTask): ] ) - # 按聊天流统计 - _focus_chat_rows = "\n".join( - [ - f"
- 统计时段: - {start_time.strftime("%Y-%m-%d %H:%M:%S")} ~ {now.strftime("%Y-%m-%d %H:%M:%S")} -
-总在线时间: {_format_online_time(stat_data[ONLINE_TIME])}
-总消息数: {stat_data[TOTAL_MSG_CNT]}
-总请求数: {stat_data[TOTAL_REQ_CNT]}
-总花费: {stat_data[TOTAL_COST]:.4f} ¥
- -| 模型名称 | 调用次数 | 输入Token | 输出Token | Token总量 | 累计花费 |
|---|
| 模块名称 | 调用次数 | 输入Token | 输出Token | Token总量 | 累计花费 |
|---|
| 请求类型 | 调用次数 | 输入Token | 输出Token | Token总量 | 累计花费 |
|---|
| 联系人/群组名称 | 消息数量 |
|---|
统计截止时间: {now.strftime("%Y-%m-%d %H:%M:%S")}
- -| 处理器名称 | 平均耗时 |
|---|
| 处理器名称 | 平均耗时 | 执行次数 |
|---|