feat:处理器处理时间上限,处理并行模式,planner和思考prompt,循环观察器大优化

feat:增加了处理器处理时间上限,记忆处理并行模式,优化了planner和思考prompt,优化了循环观察器
This commit is contained in:
SengokuCola
2025-05-30 11:04:29 +08:00
parent 8f0a4d9d2c
commit 54724ae21e
21 changed files with 233 additions and 120 deletions

View File

@@ -44,21 +44,10 @@ PROCESSOR_CLASSES = {
"ToolProcessor": (ToolProcessor, "tool_use_processor"),
"WorkingMemoryProcessor": (WorkingMemoryProcessor, "working_memory_processor"),
"SelfProcessor": (SelfProcessor, "self_identify_processor"),
# "ActionProcessor": (ActionProcessor, "action_processor"), # 这个处理器不需要配置键名,默认启用
}
WAITING_TIME_THRESHOLD = 300 # 等待新消息时间阈值,单位秒
EMOJI_SEND_PRO = 0.3 # 设置一个概率,比如 30% 才真的发
CONSECUTIVE_NO_REPLY_THRESHOLD = 3 # 连续不回复的阈值
logger = get_logger("hfc") # Logger Name Changed
# 设定处理器超时时间(秒)
PROCESSOR_TIMEOUT = 30
async def _handle_cycle_delay(action_taken_this_cycle: bool, cycle_start_time: float, log_prefix: str):
"""处理循环延迟"""
@@ -150,7 +139,7 @@ class HeartFChatting:
# 添加循环信息管理相关的属性
self._cycle_counter = 0
self._cycle_history: Deque[CycleDetail] = deque(maxlen=10) # 保留最近10个循环的信息
self._current_cycle: Optional[CycleDetail] = None
self._current_cycle_detail: Optional[CycleDetail] = None
self._shutting_down: bool = False # 关闭标志位
# 存储回调函数
@@ -262,12 +251,12 @@ class HeartFChatting:
try:
exception = task.exception()
if exception:
logger.error(f"{self.log_prefix} HeartFChatting: 麦麦脱离了聊天(异常): {exception}")
logger.error(f"{self.log_prefix} HeartFChatting: 脱离了聊天(异常): {exception}")
logger.error(traceback.format_exc()) # Log full traceback for exceptions
else:
logger.info(f"{self.log_prefix} HeartFChatting: 麦麦脱离了聊天 (外部停止)")
logger.info(f"{self.log_prefix} HeartFChatting: 脱离了聊天 (外部停止)")
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} HeartFChatting: 麦麦脱离了聊天(任务取消)")
logger.info(f"{self.log_prefix} HeartFChatting: 脱离了聊天(任务取消)")
finally:
self._loop_active = False
self._loop_task = None
@@ -286,7 +275,8 @@ class HeartFChatting:
# 创建新的循环信息
self._cycle_counter += 1
self._current_cycle = CycleDetail(self._cycle_counter)
self._current_cycle_detail = CycleDetail(self._cycle_counter)
self._current_cycle_detail.prefix = self.log_prefix
# 初始化周期状态
cycle_timers = {}
@@ -295,13 +285,12 @@ class HeartFChatting:
# 执行规划和处理阶段
async with self._get_cycle_context():
thinking_id = "tid" + str(round(time.time(), 2))
self._current_cycle.set_thinking_id(thinking_id)
self._current_cycle_detail.set_thinking_id(thinking_id)
# 主循环:思考->决策->执行
async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()):
logger.debug(f"模板 {self.chat_stream.context.get_template_name()}")
loop_info = await self._observe_process_plan_action_loop(cycle_timers, thinking_id)
print(loop_info["loop_action_info"]["command"])
if loop_info["loop_action_info"]["command"] == "stop_focus_chat":
logger.info(f"{self.log_prefix} 麦麦决定停止专注聊天")
# 如果设置了回调函数,则调用它
@@ -314,10 +303,10 @@ class HeartFChatting:
logger.error(traceback.format_exc())
break
self._current_cycle.set_loop_info(loop_info)
self._current_cycle_detail.set_loop_info(loop_info)
self.hfcloop_observation.add_loop_info(self._current_cycle)
self._current_cycle.timers = cycle_timers
self.hfcloop_observation.add_loop_info(self._current_cycle_detail)
self._current_cycle_detail.timers = cycle_timers
# 防止循环过快消耗资源
await _handle_cycle_delay(
@@ -325,8 +314,8 @@ class HeartFChatting:
)
# 完成当前循环并保存历史
self._current_cycle.complete_cycle()
self._cycle_history.append(self._current_cycle)
self._current_cycle_detail.complete_cycle()
self._cycle_history.append(self._current_cycle_detail)
# 记录循环信息和计时器结果
timer_strings = []
@@ -335,7 +324,7 @@ class HeartFChatting:
timer_strings.append(f"{name}: {formatted_time}")
# 新增:输出每个处理器的耗时
processor_time_costs = self._current_cycle.loop_processor_info.get("processor_time_costs", {})
processor_time_costs = self._current_cycle_detail.loop_processor_info.get("processor_time_costs", {})
processor_time_strings = []
for pname, ptime in processor_time_costs.items():
formatted_ptime = f"{ptime * 1000:.2f}毫秒" if ptime < 1 else f"{ptime:.2f}"
@@ -345,9 +334,9 @@ class HeartFChatting:
)
logger.info(
f"{self.log_prefix}{self._current_cycle.cycle_id}次思考,"
f"耗时: {self._current_cycle.end_time - self._current_cycle.start_time:.1f}秒, "
f"动作: {self._current_cycle.loop_plan_info['action_result']['action_type']}"
f"{self.log_prefix}{self._current_cycle_detail.cycle_id}次思考,"
f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒, "
f"动作: {self._current_cycle_detail.loop_plan_info['action_result']['action_type']}"
+ (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
+ processor_time_log
)
@@ -384,7 +373,7 @@ class HeartFChatting:
self._processing_lock.release()
async def _process_processors(
self, observations: List[Observation], running_memorys: List[Dict[str, Any]], cycle_timers: dict
self, observations: List[Observation], running_memorys: List[Dict[str, Any]]
) -> tuple[List[InfoBase], Dict[str, float]]:
# 记录并行任务开始时间
parallel_start_time = time.time()
@@ -400,7 +389,7 @@ class HeartFChatting:
async def run_with_timeout(proc=processor):
return await asyncio.wait_for(
proc.process_info(observations=observations, running_memorys=running_memorys),
timeout=PROCESSOR_TIMEOUT,
timeout=global_config.focus_chat.processor_max_time,
)
task = asyncio.create_task(run_with_timeout())
@@ -429,8 +418,8 @@ class HeartFChatting:
# 记录耗时
processor_time_costs[processor_name] = duration_since_parallel_start
except asyncio.TimeoutError:
logger.info(f"{self.log_prefix} 处理器 {processor_name} 超时(>{PROCESSOR_TIMEOUT}s已跳过")
processor_time_costs[processor_name] = PROCESSOR_TIMEOUT
logger.info(f"{self.log_prefix} 处理器 {processor_name} 超时(>{global_config.focus_chat.processor_max_time}s已跳过")
processor_time_costs[processor_name] = global_config.focus_chat.processor_max_time
except Exception as e:
logger.error(
f"{self.log_prefix} 处理器 {processor_name} 执行失败,耗时 (自并行开始): {duration_since_parallel_start:.2f}秒. 错误: {e}",
@@ -473,28 +462,42 @@ class HeartFChatting:
}
self.all_observations = observations
with Timer("回忆", cycle_timers):
running_memorys = await self.memory_activator.activate_memory(observations)
with Timer("调整动作", cycle_timers):
# 处理特殊的观察
await self.action_modifier.modify_actions(observations=observations, running_memorys=running_memorys)
await self.action_modifier.modify_actions(observations=observations)
await self.action_observation.observe()
observations.append(self.action_observation)
with Timer("执行 信息处理器", cycle_timers):
all_plan_info, processor_time_costs = await self._process_processors(
observations, running_memorys, cycle_timers
)
# 根据配置决定是否并行执行回忆和处理器阶段
# print(global_config.focus_chat.parallel_processing)
if global_config.focus_chat.parallel_processing:
# 并行执行回忆和处理器阶段
with Timer("并行回忆和处理", cycle_timers):
memory_task = asyncio.create_task(self.memory_activator.activate_memory(observations))
processor_task = asyncio.create_task(self._process_processors(observations, []))
# 等待两个任务完成
running_memorys, (all_plan_info, processor_time_costs) = await asyncio.gather(memory_task, processor_task)
else:
# 串行执行
with Timer("回忆", cycle_timers):
running_memorys = await self.memory_activator.activate_memory(observations)
with Timer("执行 信息处理器", cycle_timers):
all_plan_info, processor_time_costs = await self._process_processors(
observations, running_memorys
)
loop_processor_info = {
"all_plan_info": all_plan_info,
"processor_time_costs": processor_time_costs,
}
loop_processor_info = {
"all_plan_info": all_plan_info,
"processor_time_costs": processor_time_costs,
}
with Timer("规划器", cycle_timers):
plan_result = await self.action_planner.plan(all_plan_info, cycle_timers)
plan_result = await self.action_planner.plan(all_plan_info, running_memorys)
loop_plan_info = {
"action_result": plan_result.get("action_result", {}),
@@ -526,6 +529,7 @@ class HeartFChatting:
"action_taken": success,
"reply_text": reply_text,
"command": command,
"taken_time": time.time(),
}
loop_info = {