feat:将部分处理器后置,大大减少消耗

This commit is contained in:
SengokuCola
2025-06-23 21:05:19 +08:00
parent 231813690d
commit e0c1bb71be
3 changed files with 205 additions and 27 deletions

View File

@@ -30,6 +30,9 @@ from src.chat.focus_chat.planners.action_manager import ActionManager
from src.config.config import global_config from src.config.config import global_config
from src.chat.focus_chat.hfc_performance_logger import HFCPerformanceLogger from src.chat.focus_chat.hfc_performance_logger import HFCPerformanceLogger
from src.chat.focus_chat.hfc_version_manager import get_hfc_version from src.chat.focus_chat.hfc_version_manager import get_hfc_version
from src.chat.focus_chat.info.relation_info import RelationInfo
from src.chat.focus_chat.info.expression_selection_info import ExpressionSelectionInfo
install(extra_lines=3) install(extra_lines=3)
@@ -50,6 +53,10 @@ PROCESSOR_CLASSES = {
"ChattingInfoProcessor": (ChattingInfoProcessor, None), "ChattingInfoProcessor": (ChattingInfoProcessor, None),
"ToolProcessor": (ToolProcessor, "tool_use_processor"), "ToolProcessor": (ToolProcessor, "tool_use_processor"),
"WorkingMemoryProcessor": (WorkingMemoryProcessor, "working_memory_processor"), "WorkingMemoryProcessor": (WorkingMemoryProcessor, "working_memory_processor"),
}
# 定义后期处理器映射:在规划后、动作执行前运行的处理器
POST_PLANNING_PROCESSOR_CLASSES = {
"PersonImpressionpProcessor": (PersonImpressionpProcessor, "person_impression_processor"), "PersonImpressionpProcessor": (PersonImpressionpProcessor, "person_impression_processor"),
"ExpressionSelectorProcessor": (ExpressionSelectorProcessor, "expression_selector_processor"), "ExpressionSelectorProcessor": (ExpressionSelectorProcessor, "expression_selector_processor"),
} }
@@ -113,22 +120,34 @@ class HeartFChatting:
self.enabled_processor_names = [] self.enabled_processor_names = []
for proc_name, (_proc_class, config_key) in PROCESSOR_CLASSES.items(): for proc_name, (_proc_class, config_key) in PROCESSOR_CLASSES.items():
# 检查处理器是否应该启用
if not config_key or getattr(config_processor_settings, config_key, True):
self.enabled_processor_names.append(proc_name)
# 初始化后期处理器(规划后执行的处理器)
self.enabled_post_planning_processor_names = []
for proc_name, (_proc_class, config_key) in POST_PLANNING_PROCESSOR_CLASSES.items():
# 对于关系处理器,需要同时检查两个配置项 # 对于关系处理器,需要同时检查两个配置项
if proc_name == "PersonImpressionpProcessor": if proc_name == "PersonImpressionpProcessor":
if global_config.relationship.enable_relationship and getattr( if global_config.relationship.enable_relationship and getattr(
config_processor_settings, config_key, True config_processor_settings, config_key, True
): ):
self.enabled_processor_names.append(proc_name) self.enabled_post_planning_processor_names.append(proc_name)
else: else:
# 其他处理器的原有逻辑 # 其他后期处理器的逻辑
if not config_key or getattr(config_processor_settings, config_key, True): if not config_key or getattr(config_processor_settings, config_key, True):
self.enabled_processor_names.append(proc_name) self.enabled_post_planning_processor_names.append(proc_name)
# logger.info(f"{self.log_prefix} 将启用的处理器: {self.enabled_processor_names}") # logger.info(f"{self.log_prefix} 将启用的处理器: {self.enabled_processor_names}")
# logger.info(f"{self.log_prefix} 将启用的后期处理器: {self.enabled_post_planning_processor_names}")
self.processors: List[BaseProcessor] = [] self.processors: List[BaseProcessor] = []
self._register_default_processors() self._register_default_processors()
# 初始化后期处理器
self.post_planning_processors: List[BaseProcessor] = []
self._register_post_planning_processors()
self.action_manager = ActionManager() self.action_manager = ActionManager()
self.action_planner = PlannerFactory.create_planner( self.action_planner = PlannerFactory.create_planner(
log_prefix=self.log_prefix, action_manager=self.action_manager log_prefix=self.log_prefix, action_manager=self.action_manager
@@ -195,8 +214,6 @@ class HeartFChatting:
if name in [ if name in [
"ToolProcessor", "ToolProcessor",
"WorkingMemoryProcessor", "WorkingMemoryProcessor",
"PersonImpressionpProcessor",
"ExpressionSelectorProcessor",
]: ]:
self.processors.append(processor_actual_class(subheartflow_id=self.stream_id)) self.processors.append(processor_actual_class(subheartflow_id=self.stream_id))
elif name == "ChattingInfoProcessor": elif name == "ChattingInfoProcessor":
@@ -222,6 +239,41 @@ class HeartFChatting:
else: else:
logger.warning(f"{self.log_prefix} 没有注册任何处理器。这可能是由于配置错误或所有处理器都被禁用了。") logger.warning(f"{self.log_prefix} 没有注册任何处理器。这可能是由于配置错误或所有处理器都被禁用了。")
def _register_post_planning_processors(self):
"""根据 self.enabled_post_planning_processor_names 注册后期处理器"""
self.post_planning_processors = [] # 清空已有的
for name in self.enabled_post_planning_processor_names: # 'name' is "PersonImpressionpProcessor", etc.
processor_info = POST_PLANNING_PROCESSOR_CLASSES.get(name) # processor_info is (ProcessorClass, config_key)
if processor_info:
processor_actual_class = processor_info[0] # 获取实际的类定义
# 根据处理器类名判断是否需要 subheartflow_id
if name in [
"PersonImpressionpProcessor",
"ExpressionSelectorProcessor",
]:
self.post_planning_processors.append(processor_actual_class(subheartflow_id=self.stream_id))
else:
# 对于POST_PLANNING_PROCESSOR_CLASSES中定义但此处未明确处理构造的处理器
# (例如, 新增了一个处理器到POST_PLANNING_PROCESSOR_CLASSES, 它不需要id, 也不叫PersonImpressionpProcessor)
try:
self.post_planning_processors.append(processor_actual_class()) # 尝试无参构造
logger.debug(f"{self.log_prefix} 注册后期处理器 {name} (尝试无参构造).")
except TypeError:
logger.error(
f"{self.log_prefix} 后期处理器 {name} 构造失败。它可能需要参数(如 subheartflow_id但未在注册逻辑中明确处理。"
)
else:
# 这理论上不应该发生,因为 enabled_post_planning_processor_names 是从 POST_PLANNING_PROCESSOR_CLASSES 的键生成的
logger.warning(
f"{self.log_prefix} 在 POST_PLANNING_PROCESSOR_CLASSES 中未找到名为 '{name}' 的处理器定义,将跳过注册。"
)
if self.post_planning_processors:
logger.info(f"{self.log_prefix} 已注册后期处理器: {[p.__class__.__name__ for p in self.post_planning_processors]}")
else:
logger.warning(f"{self.log_prefix} 没有注册任何后期处理器。这可能是由于配置错误或所有后期处理器都被禁用了。")
async def start(self): async def start(self):
"""检查是否需要启动主循环,如果未激活则启动。""" """检查是否需要启动主循环,如果未激活则启动。"""
logger.debug(f"{self.log_prefix} 开始启动 HeartFChatting") logger.debug(f"{self.log_prefix} 开始启动 HeartFChatting")
@@ -508,6 +560,7 @@ class HeartFChatting:
) )
task = asyncio.create_task(run_with_timeout()) task = asyncio.create_task(run_with_timeout())
processor_tasks.append(task) processor_tasks.append(task)
task_to_name_map[task] = processor_name task_to_name_map[task] = processor_name
logger.debug(f"{self.log_prefix} 启动处理器任务: {processor_name}") logger.debug(f"{self.log_prefix} 启动处理器任务: {processor_name}")
@@ -561,6 +614,139 @@ class HeartFChatting:
return all_plan_info, processor_time_costs return all_plan_info, processor_time_costs
async def _process_post_planning_processors(self, observations: List[Observation], action_data: dict) -> dict:
"""
处理后期处理器(规划后执行的处理器)
包括:关系处理器、表达选择器、记忆激活器
参数:
observations: 观察器列表
action_data: 原始动作数据
返回:
dict: 更新后的动作数据
"""
logger.info(f"{self.log_prefix} 开始执行后期处理器")
# 创建所有后期任务
task_list = []
task_to_name_map = {}
# 添加后期处理器任务
for processor in self.post_planning_processors:
processor_name = processor.__class__.__name__
async def run_processor_with_timeout(proc=processor):
return await asyncio.wait_for(
proc.process_info(observations=observations),
timeout=global_config.focus_chat.processor_max_time,
)
task = asyncio.create_task(run_processor_with_timeout())
task_list.append(task)
task_to_name_map[task] = ("processor", processor_name)
logger.info(f"{self.log_prefix} 启动后期处理器任务: {processor_name}")
# 添加记忆激活器任务
async def run_memory_with_timeout():
return await asyncio.wait_for(
self.memory_activator.activate_memory(observations),
timeout=MEMORY_ACTIVATION_TIMEOUT,
)
memory_task = asyncio.create_task(run_memory_with_timeout())
task_list.append(memory_task)
task_to_name_map[memory_task] = ("memory", "MemoryActivator")
logger.info(f"{self.log_prefix} 启动记忆激活器任务")
# 如果没有任何后期任务,直接返回
if not task_list:
logger.info(f"{self.log_prefix} 没有启用的后期处理器或记忆激活器")
return action_data
# 等待所有任务完成
pending_tasks = set(task_list)
all_post_plan_info = []
running_memorys = []
while pending_tasks:
done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
task_type, task_name = task_to_name_map[task]
try:
result = await task
if task_type == "processor":
logger.info(f"{self.log_prefix} 后期处理器 {task_name} 已完成!")
if result is not None:
all_post_plan_info.extend(result)
else:
logger.warning(f"{self.log_prefix} 后期处理器 {task_name} 返回了 None")
elif task_type == "memory":
logger.info(f"{self.log_prefix} 记忆激活器已完成!")
if result is not None:
running_memorys = result
else:
logger.warning(f"{self.log_prefix} 记忆激活器返回了 None")
running_memorys = []
except asyncio.TimeoutError:
if task_type == "processor":
logger.warning(
f"{self.log_prefix} 后期处理器 {task_name} 超时(>{global_config.focus_chat.processor_max_time}s已跳过"
)
elif task_type == "memory":
logger.warning(f"{self.log_prefix} 记忆激活器超时(>{MEMORY_ACTIVATION_TIMEOUT}s已跳过")
running_memorys = []
except Exception as e:
if task_type == "processor":
logger.error(
f"{self.log_prefix} 后期处理器 {task_name} 执行失败. 错误: {e}",
exc_info=True,
)
elif task_type == "memory":
logger.error(f"{self.log_prefix} 记忆激活器执行失败. 错误: {e}", exc_info=True)
running_memorys = []
# 将后期处理器的结果整合到 action_data 中
updated_action_data = action_data.copy()
relation_info = ""
selected_expressions = []
for info in all_post_plan_info:
if isinstance(info, RelationInfo):
relation_info = info.get_processed_info()
elif isinstance(info, ExpressionSelectionInfo):
selected_expressions = info.get_expressions_for_action_data()
if relation_info:
updated_action_data["relation_info_block"] = relation_info
# 将选中的表达方式传递给action_data
if selected_expressions:
updated_action_data["selected_expressions"] = selected_expressions
logger.info(f"{self.log_prefix} 传递{len(selected_expressions)}个选中的表达方式到action_data")
# 将记忆信息也添加到action_data中
if running_memorys:
updated_action_data["running_memories"] = running_memorys
# 生成兼容的memory_block格式
memory_str = "以下是当前在聊天中,你回忆起的记忆:\n"
for running_memory in running_memorys:
memory_str += f"{running_memory['content']}\n"
updated_action_data["memory_block"] = memory_str
logger.info(f"{self.log_prefix} 添加了 {len(running_memorys)} 个激活的记忆到action_data")
if all_post_plan_info or running_memorys:
logger.info(f"{self.log_prefix} 后期处理完成,产生了 {len(all_post_plan_info)} 个信息项和 {len(running_memorys)} 个记忆")
return updated_action_data
async def _observe_process_plan_action_loop(self, cycle_timers: dict, thinking_id: str) -> dict: async def _observe_process_plan_action_loop(self, cycle_timers: dict, thinking_id: str) -> dict:
try: try:
loop_start_time = time.time() loop_start_time = time.time()
@@ -588,26 +774,20 @@ class HeartFChatting:
self.observations.append(self.action_observation) self.observations.append(self.action_observation)
return True return True
# 创建个并行任务为LLM调用添加超时保护 # 创建个并行任务为LLM调用添加超时保护
action_modify_task = asyncio.create_task( action_modify_task = asyncio.create_task(
asyncio.wait_for(modify_actions_task(), timeout=ACTION_MODIFICATION_TIMEOUT) asyncio.wait_for(modify_actions_task(), timeout=ACTION_MODIFICATION_TIMEOUT)
) )
memory_task = asyncio.create_task(
asyncio.wait_for(
self.memory_activator.activate_memory(self.observations), timeout=MEMORY_ACTIVATION_TIMEOUT
)
)
processor_task = asyncio.create_task(self._process_processors(self.observations)) processor_task = asyncio.create_task(self._process_processors(self.observations))
# 等待个任务完成,使用超时保护和详细错误处理 # 等待个任务完成,使用超时保护和详细错误处理
action_modify_result = None action_modify_result = None
running_memorys = []
all_plan_info = [] all_plan_info = []
processor_time_costs = {} processor_time_costs = {}
try: try:
action_modify_result, running_memorys, (all_plan_info, processor_time_costs) = await asyncio.gather( action_modify_result, (all_plan_info, processor_time_costs) = await asyncio.gather(
action_modify_task, memory_task, processor_task, return_exceptions=True action_modify_task, processor_task, return_exceptions=True
) )
# 检查各个任务的结果 # 检查各个任务的结果
@@ -617,13 +797,6 @@ class HeartFChatting:
else: else:
logger.error(f"{self.log_prefix} 动作修改任务失败: {action_modify_result}") logger.error(f"{self.log_prefix} 动作修改任务失败: {action_modify_result}")
if isinstance(running_memorys, Exception):
if isinstance(running_memorys, asyncio.TimeoutError):
logger.error(f"{self.log_prefix} 记忆激活任务超时")
else:
logger.error(f"{self.log_prefix} 记忆激活任务失败: {running_memorys}")
running_memorys = []
processor_result = (all_plan_info, processor_time_costs) processor_result = (all_plan_info, processor_time_costs)
if isinstance(processor_result, Exception): if isinstance(processor_result, Exception):
if isinstance(processor_result, asyncio.TimeoutError): if isinstance(processor_result, asyncio.TimeoutError):
@@ -638,7 +811,6 @@ class HeartFChatting:
except Exception as e: except Exception as e:
logger.error(f"{self.log_prefix} 并行任务gather失败: {e}") logger.error(f"{self.log_prefix} 并行任务gather失败: {e}")
# 设置默认值以继续执行 # 设置默认值以继续执行
running_memorys = []
all_plan_info = [] all_plan_info = []
processor_time_costs = {} processor_time_costs = {}
@@ -648,11 +820,11 @@ class HeartFChatting:
} }
logger.debug( logger.debug(
f"{self.log_prefix} 并行阶段完成准备进入规划器plan_info数量: {len(all_plan_info)}, running_memorys数量: {len(running_memorys)}" f"{self.log_prefix} 并行阶段完成准备进入规划器plan_info数量: {len(all_plan_info)}"
) )
with Timer("规划器", cycle_timers): with Timer("规划器", cycle_timers):
plan_result = await self.action_planner.plan(all_plan_info, running_memorys, loop_start_time) plan_result = await self.action_planner.plan(all_plan_info, [], loop_start_time)
loop_plan_info = { loop_plan_info = {
"action_result": plan_result.get("action_result", {}), "action_result": plan_result.get("action_result", {}),
@@ -675,6 +847,12 @@ class HeartFChatting:
logger.debug(f"{self.log_prefix} 麦麦想要:'{action_str}'") logger.debug(f"{self.log_prefix} 麦麦想要:'{action_str}'")
# 如果动作不是no_reply则执行后期处理器
if action_type != "no_reply":
with Timer("后期处理器", cycle_timers):
logger.debug(f"{self.log_prefix} 执行后期处理器(动作类型: {action_type}")
action_data = await self._process_post_planning_processors(self.observations, action_data)
success, reply_text, command = await self._handle_action( success, reply_text, command = await self._handle_action(
action_type, reasoning, action_data, cycle_timers, thinking_id action_type, reasoning, action_data, cycle_timers, thinking_id
) )

View File

@@ -52,7 +52,7 @@ class ClassicalWillingManager(BaseWillingManager):
# 检查群组权限(如果是群聊) # 检查群组权限(如果是群聊)
if ( if (
willing_info.group_info willing_info.group_info
and willing_info.group_info.group_id in global_config.chat.talk_frequency_down_groups and willing_info.group_info.group_id in global_config.normal_chat.talk_frequency_down_groups
): ):
reply_probability = reply_probability / global_config.normal_chat.down_frequency_rate reply_probability = reply_probability / global_config.normal_chat.down_frequency_rate

View File

@@ -179,7 +179,7 @@ class MxpWillingManager(BaseWillingManager):
if w_info.is_picid: if w_info.is_picid:
probability = 0 # picid格式消息直接不回复 probability = 0 # picid格式消息直接不回复
if w_info.group_info and w_info.group_info.group_id in global_config.chat.talk_frequency_down_groups: if w_info.group_info and w_info.group_info.group_id in global_config.normal_chat.talk_frequency_down_groups:
probability /= global_config.normal_chat.down_frequency_rate probability /= global_config.normal_chat.down_frequency_rate
self.temporary_willing = current_willing self.temporary_willing = current_willing