feat:添加后处理器统计时间
This commit is contained in:
@@ -454,7 +454,19 @@ class HeartFChatting:
|
||||
formatted_ptime = f"{ptime * 1000:.2f}毫秒" if ptime < 1 else f"{ptime:.2f}秒"
|
||||
processor_time_strings.append(f"{pname}: {formatted_ptime}")
|
||||
processor_time_log = (
|
||||
("\n各处理器耗时: " + "; ".join(processor_time_strings)) if processor_time_strings else ""
|
||||
("\n前处理器耗时: " + "; ".join(processor_time_strings)) if processor_time_strings else ""
|
||||
)
|
||||
|
||||
# 新增:输出每个后处理器的耗时
|
||||
post_processor_time_costs = self._current_cycle_detail.loop_post_processor_info.get(
|
||||
"post_processor_time_costs", {}
|
||||
)
|
||||
post_processor_time_strings = []
|
||||
for pname, ptime in post_processor_time_costs.items():
|
||||
formatted_ptime = f"{ptime * 1000:.2f}毫秒" if ptime < 1 else f"{ptime:.2f}秒"
|
||||
post_processor_time_strings.append(f"{pname}: {formatted_ptime}")
|
||||
post_processor_time_log = (
|
||||
("\n后处理器耗时: " + "; ".join(post_processor_time_strings)) if post_processor_time_strings else ""
|
||||
)
|
||||
|
||||
logger.info(
|
||||
@@ -463,6 +475,7 @@ class HeartFChatting:
|
||||
f"动作: {self._current_cycle_detail.loop_plan_info.get('action_result', {}).get('action_type', '未知动作')}"
|
||||
+ (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
|
||||
+ processor_time_log
|
||||
+ post_processor_time_log
|
||||
)
|
||||
|
||||
# 记录性能数据
|
||||
@@ -473,6 +486,8 @@ class HeartFChatting:
|
||||
"action_type": action_result.get("action_type", "unknown"),
|
||||
"total_time": self._current_cycle_detail.end_time - self._current_cycle_detail.start_time,
|
||||
"step_times": cycle_timers.copy(),
|
||||
"processor_time_costs": processor_time_costs, # 前处理器时间
|
||||
"post_processor_time_costs": post_processor_time_costs, # 后处理器时间
|
||||
"reasoning": action_result.get("reasoning", ""),
|
||||
"success": self._current_cycle_detail.loop_action_info.get("action_taken", False),
|
||||
}
|
||||
@@ -754,6 +769,180 @@ class HeartFChatting:
|
||||
|
||||
return updated_action_data
|
||||
|
||||
async def _process_post_planning_processors_with_timing(self, observations: List[Observation], action_data: dict) -> tuple[dict, dict]:
|
||||
"""
|
||||
处理后期处理器(规划后执行的处理器)并收集详细时间统计
|
||||
包括:关系处理器、表达选择器、记忆激活器
|
||||
|
||||
参数:
|
||||
observations: 观察器列表
|
||||
action_data: 原始动作数据
|
||||
|
||||
返回:
|
||||
tuple[dict, dict]: (更新后的动作数据, 后处理器时间统计)
|
||||
"""
|
||||
logger.info(f"{self.log_prefix} 开始执行后期处理器(带详细统计)")
|
||||
|
||||
# 创建所有后期任务
|
||||
task_list = []
|
||||
task_to_name_map = {}
|
||||
task_start_times = {}
|
||||
post_processor_time_costs = {}
|
||||
|
||||
# 添加后期处理器任务
|
||||
for processor in self.post_planning_processors:
|
||||
processor_name = processor.__class__.__name__
|
||||
|
||||
async def run_processor_with_timeout_and_timing(proc=processor, name=processor_name):
|
||||
start_time = time.time()
|
||||
try:
|
||||
result = await asyncio.wait_for(
|
||||
proc.process_info(observations=observations),
|
||||
timeout=global_config.focus_chat.processor_max_time,
|
||||
)
|
||||
end_time = time.time()
|
||||
post_processor_time_costs[name] = end_time - start_time
|
||||
logger.debug(f"{self.log_prefix} 后期处理器 {name} 耗时: {end_time - start_time:.3f}秒")
|
||||
return result
|
||||
except Exception as e:
|
||||
end_time = time.time()
|
||||
post_processor_time_costs[name] = end_time - start_time
|
||||
logger.warning(f"{self.log_prefix} 后期处理器 {name} 执行异常,耗时: {end_time - start_time:.3f}秒")
|
||||
raise e
|
||||
|
||||
task = asyncio.create_task(run_processor_with_timeout_and_timing())
|
||||
task_list.append(task)
|
||||
task_to_name_map[task] = ("processor", processor_name)
|
||||
task_start_times[task] = time.time()
|
||||
logger.info(f"{self.log_prefix} 启动后期处理器任务: {processor_name}")
|
||||
|
||||
# 添加记忆激活器任务
|
||||
async def run_memory_with_timeout_and_timing():
|
||||
start_time = time.time()
|
||||
try:
|
||||
result = await asyncio.wait_for(
|
||||
self.memory_activator.activate_memory(observations),
|
||||
timeout=MEMORY_ACTIVATION_TIMEOUT,
|
||||
)
|
||||
end_time = time.time()
|
||||
post_processor_time_costs["MemoryActivator"] = end_time - start_time
|
||||
logger.debug(f"{self.log_prefix} 记忆激活器耗时: {end_time - start_time:.3f}秒")
|
||||
return result
|
||||
except Exception as e:
|
||||
end_time = time.time()
|
||||
post_processor_time_costs["MemoryActivator"] = end_time - start_time
|
||||
logger.warning(f"{self.log_prefix} 记忆激活器执行异常,耗时: {end_time - start_time:.3f}秒")
|
||||
raise e
|
||||
|
||||
memory_task = asyncio.create_task(run_memory_with_timeout_and_timing())
|
||||
task_list.append(memory_task)
|
||||
task_to_name_map[memory_task] = ("memory", "MemoryActivator")
|
||||
task_start_times[memory_task] = time.time()
|
||||
logger.info(f"{self.log_prefix} 启动记忆激活器任务")
|
||||
|
||||
# 如果没有任何后期任务,直接返回
|
||||
if not task_list:
|
||||
logger.info(f"{self.log_prefix} 没有启用的后期处理器或记忆激活器")
|
||||
return action_data, {}
|
||||
|
||||
# 等待所有任务完成
|
||||
pending_tasks = set(task_list)
|
||||
all_post_plan_info = []
|
||||
running_memorys = []
|
||||
|
||||
while pending_tasks:
|
||||
done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
|
||||
|
||||
for task in done:
|
||||
task_type, task_name = task_to_name_map[task]
|
||||
|
||||
try:
|
||||
result = await task
|
||||
|
||||
if task_type == "processor":
|
||||
logger.info(f"{self.log_prefix} 后期处理器 {task_name} 已完成!")
|
||||
if result is not None:
|
||||
all_post_plan_info.extend(result)
|
||||
else:
|
||||
logger.warning(f"{self.log_prefix} 后期处理器 {task_name} 返回了 None")
|
||||
elif task_type == "memory":
|
||||
logger.info(f"{self.log_prefix} 记忆激活器已完成!")
|
||||
if result is not None:
|
||||
running_memorys = result
|
||||
else:
|
||||
logger.warning(f"{self.log_prefix} 记忆激活器返回了 None")
|
||||
running_memorys = []
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
# 对于超时任务,记录已用时间
|
||||
elapsed_time = time.time() - task_start_times[task]
|
||||
if task_type == "processor":
|
||||
post_processor_time_costs[task_name] = elapsed_time
|
||||
logger.warning(
|
||||
f"{self.log_prefix} 后期处理器 {task_name} 超时(>{global_config.focus_chat.processor_max_time}s),已跳过,耗时: {elapsed_time:.3f}秒"
|
||||
)
|
||||
elif task_type == "memory":
|
||||
post_processor_time_costs["MemoryActivator"] = elapsed_time
|
||||
logger.warning(f"{self.log_prefix} 记忆激活器超时(>{MEMORY_ACTIVATION_TIMEOUT}s),已跳过,耗时: {elapsed_time:.3f}秒")
|
||||
running_memorys = []
|
||||
except Exception as e:
|
||||
# 对于异常任务,记录已用时间
|
||||
elapsed_time = time.time() - task_start_times[task]
|
||||
if task_type == "processor":
|
||||
post_processor_time_costs[task_name] = elapsed_time
|
||||
logger.error(
|
||||
f"{self.log_prefix} 后期处理器 {task_name} 执行失败,耗时: {elapsed_time:.3f}秒. 错误: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
elif task_type == "memory":
|
||||
post_processor_time_costs["MemoryActivator"] = elapsed_time
|
||||
logger.error(f"{self.log_prefix} 记忆激活器执行失败,耗时: {elapsed_time:.3f}秒. 错误: {e}", exc_info=True)
|
||||
running_memorys = []
|
||||
|
||||
# 将后期处理器的结果整合到 action_data 中
|
||||
updated_action_data = action_data.copy()
|
||||
|
||||
relation_info = ""
|
||||
selected_expressions = []
|
||||
structured_info = ""
|
||||
|
||||
for info in all_post_plan_info:
|
||||
if isinstance(info, RelationInfo):
|
||||
relation_info = info.get_processed_info()
|
||||
elif isinstance(info, ExpressionSelectionInfo):
|
||||
selected_expressions = info.get_expressions_for_action_data()
|
||||
elif isinstance(info, StructuredInfo):
|
||||
structured_info = info.get_processed_info()
|
||||
|
||||
if relation_info:
|
||||
updated_action_data["relation_info_block"] = relation_info
|
||||
|
||||
if selected_expressions:
|
||||
updated_action_data["selected_expressions"] = selected_expressions
|
||||
|
||||
if structured_info:
|
||||
updated_action_data["structured_info"] = structured_info
|
||||
|
||||
# 特殊处理running_memorys
|
||||
if running_memorys:
|
||||
memory_str = "以下是当前在聊天中,你回忆起的记忆:\n"
|
||||
for running_memory in running_memorys:
|
||||
memory_str += f"{running_memory['content']}\n"
|
||||
updated_action_data["memory_block"] = memory_str
|
||||
logger.info(f"{self.log_prefix} 添加了 {len(running_memorys)} 个激活的记忆到action_data")
|
||||
|
||||
if all_post_plan_info or running_memorys:
|
||||
logger.info(
|
||||
f"{self.log_prefix} 后期处理完成,产生了 {len(all_post_plan_info)} 个信息项和 {len(running_memorys)} 个记忆"
|
||||
)
|
||||
|
||||
# 输出详细统计信息
|
||||
if post_processor_time_costs:
|
||||
stats_str = ", ".join([f"{name}: {time_cost:.3f}s" for name, time_cost in post_processor_time_costs.items()])
|
||||
logger.info(f"{self.log_prefix} 后期处理器详细耗时统计: {stats_str}")
|
||||
|
||||
return updated_action_data, post_processor_time_costs
|
||||
|
||||
async def _observe_process_plan_action_loop(self, cycle_timers: dict, thinking_id: str) -> dict:
|
||||
try:
|
||||
loop_start_time = time.time()
|
||||
@@ -836,28 +1025,37 @@ class HeartFChatting:
|
||||
"observed_messages": plan_result.get("observed_messages", ""),
|
||||
}
|
||||
|
||||
with Timer("执行动作", cycle_timers):
|
||||
action_type, action_data, reasoning = (
|
||||
plan_result.get("action_result", {}).get("action_type", "error"),
|
||||
plan_result.get("action_result", {}).get("action_data", {}),
|
||||
plan_result.get("action_result", {}).get("reasoning", "未提供理由"),
|
||||
)
|
||||
# 修正:将后期处理器从执行动作Timer中分离出来
|
||||
action_type, action_data, reasoning = (
|
||||
plan_result.get("action_result", {}).get("action_type", "error"),
|
||||
plan_result.get("action_result", {}).get("action_data", {}),
|
||||
plan_result.get("action_result", {}).get("reasoning", "未提供理由"),
|
||||
)
|
||||
|
||||
if action_type == "reply":
|
||||
action_str = "回复"
|
||||
elif action_type == "no_reply":
|
||||
action_str = "不回复"
|
||||
else:
|
||||
action_str = action_type
|
||||
if action_type == "reply":
|
||||
action_str = "回复"
|
||||
elif action_type == "no_reply":
|
||||
action_str = "不回复"
|
||||
else:
|
||||
action_str = action_type
|
||||
|
||||
logger.debug(f"{self.log_prefix} 麦麦想要:'{action_str}'")
|
||||
logger.debug(f"{self.log_prefix} 麦麦想要:'{action_str}'")
|
||||
|
||||
# 如果动作不是no_reply,则执行后期处理器
|
||||
if action_type != "no_reply":
|
||||
with Timer("后期处理器", cycle_timers):
|
||||
logger.debug(f"{self.log_prefix} 执行后期处理器(动作类型: {action_type})")
|
||||
action_data = await self._process_post_planning_processors(self.observations, action_data)
|
||||
# 添加:单独计时后期处理器,并收集详细统计
|
||||
post_processor_time_costs = {}
|
||||
if action_type != "no_reply":
|
||||
with Timer("后期处理器", cycle_timers):
|
||||
logger.debug(f"{self.log_prefix} 执行后期处理器(动作类型: {action_type})")
|
||||
# 记录详细的后处理器时间
|
||||
post_start_time = time.time()
|
||||
action_data, post_processor_time_costs = await self._process_post_planning_processors_with_timing(self.observations, action_data)
|
||||
post_end_time = time.time()
|
||||
logger.info(f"{self.log_prefix} 后期处理器总耗时: {post_end_time - post_start_time:.3f}秒")
|
||||
else:
|
||||
logger.debug(f"{self.log_prefix} 跳过后期处理器(动作类型: {action_type})")
|
||||
|
||||
# 修正:纯动作执行计时
|
||||
with Timer("动作执行", cycle_timers):
|
||||
success, reply_text, command = await self._handle_action(
|
||||
action_type, reasoning, action_data, cycle_timers, thinking_id
|
||||
)
|
||||
@@ -869,11 +1067,17 @@ class HeartFChatting:
|
||||
"taken_time": time.time(),
|
||||
}
|
||||
|
||||
# 添加后处理器统计到loop_info
|
||||
loop_post_processor_info = {
|
||||
"post_processor_time_costs": post_processor_time_costs,
|
||||
}
|
||||
|
||||
loop_info = {
|
||||
"loop_observation_info": loop_observation_info,
|
||||
"loop_processor_info": loop_processor_info,
|
||||
"loop_plan_info": loop_plan_info,
|
||||
"loop_action_info": loop_action_info,
|
||||
"loop_post_processor_info": loop_post_processor_info, # 新增
|
||||
}
|
||||
|
||||
return loop_info
|
||||
|
||||
Reference in New Issue
Block a user