@@ -13,7 +13,7 @@ from src.chat.heart_flow.observation.observation import Observation
from src . chat . focus_chat . heartFC_Cycleinfo import CycleDetail
from src . chat . focus_chat . info . info_base import InfoBase
from src . chat . focus_chat . info_processors . chattinginfo_processor import ChattingInfoProcessor
from src . chat . focus_chat . info_processors . relationship_processor import Relationshi pProcessor
from src . chat . focus_chat . info_processors . relationship_processor import PersonImpression pProcessor
from src . chat . focus_chat . info_processors . working_memory_processor import WorkingMemoryProcessor
from src . chat . heart_flow . observation . hfcloop_observation import HFCloopObservation
from src . chat . heart_flow . observation . working_observation import WorkingMemoryObservation
@@ -31,6 +31,10 @@ from src.config.config import global_config
install ( extra_lines = 3 )
# 超时常量配置
MEMORY_ACTIVATION_TIMEOUT = 5.0 # 记忆激活任务超时时限(秒)
ACTION_MODIFICATION_TIMEOUT = 15.0 # 动作修改任务超时时限(秒)
# 定义观察器映射:键是观察器名称,值是 (观察器类, 初始化参数)
OBSERVATION_CLASSES = {
" ChattingObservation " : ( ChattingObservation , " chat_id " ) ,
@@ -44,7 +48,7 @@ PROCESSOR_CLASSES = {
" ChattingInfoProcessor " : ( ChattingInfoProcessor , None ) ,
" ToolProcessor " : ( ToolProcessor , " tool_use_processor " ) ,
" WorkingMemoryProcessor " : ( WorkingMemoryProcessor , " working_memory_processor " ) ,
" Relationshi pProcessor" : ( Relationshi pProcessor, " relat ion_processor" ) ,
" PersonImpression pProcessor" : ( PersonImpression pProcessor, " person_impress ion_processor" ) ,
" ExpressionSelectorProcessor " : ( ExpressionSelectorProcessor , " expression_selector_processor " ) ,
}
@@ -106,7 +110,7 @@ class HeartFChatting:
for proc_name , ( _proc_class , config_key ) in PROCESSOR_CLASSES . items ( ) :
# 对于关系处理器,需要同时检查两个配置项
if proc_name == " Relationshi pProcessor" :
if proc_name == " PersonImpression pProcessor" :
if global_config . relationship . enable_relationship and getattr (
config_processor_settings , config_key , True
) :
@@ -182,7 +186,7 @@ class HeartFChatting:
if name in [
" ToolProcessor " ,
" WorkingMemoryProcessor " ,
" Relationshi pProcessor" ,
" PersonImpression pProcessor" ,
" ExpressionSelectorProcessor " ,
] :
self . processors . append ( processor_actual_class ( subheartflow_id = self . stream_id ) )
@@ -560,21 +564,72 @@ class HeartFChatting:
self . observations . append ( self . action_observation )
return True
# 创建三个并行任务
action_modify_task = asyncio . create_task ( modify_actions_task ( ) )
memory_task = asyncio . create_task ( self . memory_activator . activate_memory ( self . observations ) )
# 创建三个并行任务, 为LLM调用添加超时保护
action_modify_task = asyncio . create_task (
asyncio . wait_for (
modify_actions_task ( ) ,
timeout = ACTION_MODIFICATION_TIMEOUT
)
)
memory_task = asyncio . create_task (
asyncio . wait_for (
self . memory_activator . activate_memory ( self . observations ) ,
timeout = MEMORY_ACTIVATION_TIMEOUT
)
)
processor_task = asyncio . create_task ( self . _process_processors ( self . observations ) )
# 等待三个任务完成
_ , running_memorys , ( all_plan_info , processor_time_costs ) = await asyncio . gather (
action_modify_task , memory_task , processor_task
)
# 等待三个任务完成,使用超时保护和详细错误处理
action_modify_result = None
running_memorys = [ ]
all_plan_info = [ ]
processor_time_costs = { }
try :
action_modify_result , running_memorys , ( all_plan_info , processor_time_costs ) = await asyncio . gather (
action_modify_task , memory_task , processor_task ,
return_exceptions = True
)
# 检查各个任务的结果
if isinstance ( action_modify_result , Exception ) :
if isinstance ( action_modify_result , asyncio . TimeoutError ) :
logger . error ( f " { self . log_prefix } 动作修改任务超时 " )
else :
logger . error ( f " { self . log_prefix } 动作修改任务失败: { action_modify_result } " )
if isinstance ( running_memorys , Exception ) :
if isinstance ( running_memorys , asyncio . TimeoutError ) :
logger . error ( f " { self . log_prefix } 记忆激活任务超时 " )
else :
logger . error ( f " { self . log_prefix } 记忆激活任务失败: { running_memorys } " )
running_memorys = [ ]
processor_result = ( all_plan_info , processor_time_costs )
if isinstance ( processor_result , Exception ) :
if isinstance ( processor_result , asyncio . TimeoutError ) :
logger . error ( f " { self . log_prefix } 处理器任务超时 " )
else :
logger . error ( f " { self . log_prefix } 处理器任务失败: { processor_result } " )
all_plan_info = [ ]
processor_time_costs = { }
else :
all_plan_info , processor_time_costs = processor_result
except Exception as e :
logger . error ( f " { self . log_prefix } 并行任务gather失败: { e } " )
# 设置默认值以继续执行
running_memorys = [ ]
all_plan_info = [ ]
processor_time_costs = { }
loop_processor_info = {
" all_plan_info " : all_plan_info ,
" processor_time_costs " : processor_time_costs ,
}
logger . debug ( f " { self . log_prefix } 并行阶段完成, 准备进入规划器, plan_info数量: { len ( all_plan_info ) } , running_memorys数量: { len ( running_memorys ) } " )
with Timer ( " 规划器 " , cycle_timers ) :
plan_result = await self . action_planner . plan ( all_plan_info , running_memorys , loop_start_time )