This commit is contained in:
SengokuCola
2025-06-23 21:33:23 +08:00

View File

@@ -271,9 +271,13 @@ class HeartFChatting:
) )
if self.post_planning_processors: if self.post_planning_processors:
logger.info(f"{self.log_prefix} 已注册后期处理器: {[p.__class__.__name__ for p in self.post_planning_processors]}") logger.info(
f"{self.log_prefix} 已注册后期处理器: {[p.__class__.__name__ for p in self.post_planning_processors]}"
)
else: else:
logger.warning(f"{self.log_prefix} 没有注册任何后期处理器。这可能是由于配置错误或所有后期处理器都被禁用了。") logger.warning(
f"{self.log_prefix} 没有注册任何后期处理器。这可能是由于配置错误或所有后期处理器都被禁用了。"
)
async def start(self): async def start(self):
"""检查是否需要启动主循环,如果未激活则启动。""" """检查是否需要启动主循环,如果未激活则启动。"""
@@ -561,7 +565,7 @@ class HeartFChatting:
) )
task = asyncio.create_task(run_with_timeout()) task = asyncio.create_task(run_with_timeout())
processor_tasks.append(task) processor_tasks.append(task)
task_to_name_map[task] = processor_name task_to_name_map[task] = processor_name
logger.debug(f"{self.log_prefix} 启动处理器任务: {processor_name}") logger.debug(f"{self.log_prefix} 启动处理器任务: {processor_name}")
@@ -619,66 +623,66 @@ class HeartFChatting:
""" """
处理后期处理器(规划后执行的处理器) 处理后期处理器(规划后执行的处理器)
包括:关系处理器、表达选择器、记忆激活器 包括:关系处理器、表达选择器、记忆激活器
参数: 参数:
observations: 观察器列表 observations: 观察器列表
action_data: 原始动作数据 action_data: 原始动作数据
返回: 返回:
dict: 更新后的动作数据 dict: 更新后的动作数据
""" """
logger.info(f"{self.log_prefix} 开始执行后期处理器") logger.info(f"{self.log_prefix} 开始执行后期处理器")
# 创建所有后期任务 # 创建所有后期任务
task_list = [] task_list = []
task_to_name_map = {} task_to_name_map = {}
# 添加后期处理器任务 # 添加后期处理器任务
for processor in self.post_planning_processors: for processor in self.post_planning_processors:
processor_name = processor.__class__.__name__ processor_name = processor.__class__.__name__
async def run_processor_with_timeout(proc=processor): async def run_processor_with_timeout(proc=processor):
return await asyncio.wait_for( return await asyncio.wait_for(
proc.process_info(observations=observations), proc.process_info(observations=observations),
timeout=global_config.focus_chat.processor_max_time, timeout=global_config.focus_chat.processor_max_time,
) )
task = asyncio.create_task(run_processor_with_timeout()) task = asyncio.create_task(run_processor_with_timeout())
task_list.append(task) task_list.append(task)
task_to_name_map[task] = ("processor", processor_name) task_to_name_map[task] = ("processor", processor_name)
logger.info(f"{self.log_prefix} 启动后期处理器任务: {processor_name}") logger.info(f"{self.log_prefix} 启动后期处理器任务: {processor_name}")
# 添加记忆激活器任务 # 添加记忆激活器任务
async def run_memory_with_timeout(): async def run_memory_with_timeout():
return await asyncio.wait_for( return await asyncio.wait_for(
self.memory_activator.activate_memory(observations), self.memory_activator.activate_memory(observations),
timeout=MEMORY_ACTIVATION_TIMEOUT, timeout=MEMORY_ACTIVATION_TIMEOUT,
) )
memory_task = asyncio.create_task(run_memory_with_timeout()) memory_task = asyncio.create_task(run_memory_with_timeout())
task_list.append(memory_task) task_list.append(memory_task)
task_to_name_map[memory_task] = ("memory", "MemoryActivator") task_to_name_map[memory_task] = ("memory", "MemoryActivator")
logger.info(f"{self.log_prefix} 启动记忆激活器任务") logger.info(f"{self.log_prefix} 启动记忆激活器任务")
# 如果没有任何后期任务,直接返回 # 如果没有任何后期任务,直接返回
if not task_list: if not task_list:
logger.info(f"{self.log_prefix} 没有启用的后期处理器或记忆激活器") logger.info(f"{self.log_prefix} 没有启用的后期处理器或记忆激活器")
return action_data return action_data
# 等待所有任务完成 # 等待所有任务完成
pending_tasks = set(task_list) pending_tasks = set(task_list)
all_post_plan_info = [] all_post_plan_info = []
running_memorys = [] running_memorys = []
while pending_tasks: while pending_tasks:
done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED) done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done: for task in done:
task_type, task_name = task_to_name_map[task] task_type, task_name = task_to_name_map[task]
try: try:
result = await task result = await task
if task_type == "processor": if task_type == "processor":
logger.info(f"{self.log_prefix} 后期处理器 {task_name} 已完成!") logger.info(f"{self.log_prefix} 后期处理器 {task_name} 已完成!")
if result is not None: if result is not None:
@@ -692,7 +696,7 @@ class HeartFChatting:
else: else:
logger.warning(f"{self.log_prefix} 记忆激活器返回了 None") logger.warning(f"{self.log_prefix} 记忆激活器返回了 None")
running_memorys = [] running_memorys = []
except asyncio.TimeoutError: except asyncio.TimeoutError:
if task_type == "processor": if task_type == "processor":
logger.warning( logger.warning(
@@ -710,10 +714,10 @@ class HeartFChatting:
elif task_type == "memory": elif task_type == "memory":
logger.error(f"{self.log_prefix} 记忆激活器执行失败. 错误: {e}", exc_info=True) logger.error(f"{self.log_prefix} 记忆激活器执行失败. 错误: {e}", exc_info=True)
running_memorys = [] running_memorys = []
# 将后期处理器的结果整合到 action_data 中 # 将后期处理器的结果整合到 action_data 中
updated_action_data = action_data.copy() updated_action_data = action_data.copy()
relation_info = "" relation_info = ""
selected_expressions = [] selected_expressions = []
structured_info = "" structured_info = ""
@@ -745,8 +749,10 @@ class HeartFChatting:
if all_post_plan_info or running_memorys: if all_post_plan_info or running_memorys:
logger.info(f"{self.log_prefix} 后期处理完成,产生了 {len(all_post_plan_info)} 个信息项和 {len(running_memorys)} 个记忆") logger.info(
f"{self.log_prefix} 后期处理完成,产生了 {len(all_post_plan_info)} 个信息项和 {len(running_memorys)} 个记忆"
)
return updated_action_data return updated_action_data
async def _observe_process_plan_action_loop(self, cycle_timers: dict, thinking_id: str) -> dict: async def _observe_process_plan_action_loop(self, cycle_timers: dict, thinking_id: str) -> dict:
@@ -821,9 +827,7 @@ class HeartFChatting:
"processor_time_costs": processor_time_costs, "processor_time_costs": processor_time_costs,
} }
logger.debug( logger.debug(f"{self.log_prefix} 并行阶段完成准备进入规划器plan_info数量: {len(all_plan_info)}")
f"{self.log_prefix} 并行阶段完成准备进入规划器plan_info数量: {len(all_plan_info)}"
)
with Timer("规划器", cycle_timers): with Timer("规划器", cycle_timers):
plan_result = await self.action_planner.plan(all_plan_info, [], loop_start_time) plan_result = await self.action_planner.plan(all_plan_info, [], loop_start_time)