feat:将normal抽象为循环

This commit is contained in:
SengokuCola
2025-07-11 21:51:47 +08:00
parent 0cdf53fb85
commit a0efb89d98
6 changed files with 292 additions and 366 deletions

View File

@@ -14,11 +14,26 @@ from src.chat.planner_actions.planner import ActionPlanner
from src.chat.planner_actions.action_modifier import ActionModifier
from src.chat.planner_actions.action_manager import ActionManager
from src.config.config import global_config
from src.chat.focus_chat.hfc_performance_logger import HFCPerformanceLogger
from src.person_info.relationship_builder_manager import relationship_builder_manager
from src.chat.focus_chat.hfc_utils import CycleDetail
ERROR_LOOP_INFO = {
"loop_plan_info": {
"action_result": {
"action_type": "error",
"action_data": {},
"reasoning": "循环处理失败",
},
},
"loop_action_info": {
"action_taken": False,
"reply_text": "",
"command": "",
"taken_time": time.time(),
},
}
install(extra_lines=3)
# 注释:原来的动作修改超时常量已移除,因为改为顺序执行
@@ -66,17 +81,14 @@ class HeartFChatting:
self.action_planner = ActionPlanner(chat_id=self.stream_id, action_manager=self.action_manager)
self.action_modifier = ActionModifier(action_manager=self.action_manager, chat_id=self.stream_id)
self._processing_lock = asyncio.Lock()
# 循环控制内部状态
self._loop_active: bool = False # 循环是否正在运行
self.running: bool = False
self._loop_task: Optional[asyncio.Task] = None # 主循环任务
# 添加循环信息管理相关的属性
self._cycle_counter = 0
self._cycle_history: Deque[CycleDetail] = deque(maxlen=10) # 保留最近10个循环的信息
self._current_cycle_detail: Optional[CycleDetail] = None
self._shutting_down: bool = False # 关闭标志位
# 存储回调函数
self.on_stop_focus_chat = on_stop_focus_chat
@@ -84,11 +96,6 @@ class HeartFChatting:
self.reply_timeout_count = 0
self.plan_timeout_count = 0
# 初始化性能记录器
# 如果没有指定版本号,则使用全局版本管理器的版本号
self.performance_logger = HFCPerformanceLogger(chat_id)
logger.info(
f"{self.log_prefix} HeartFChatting 初始化完成,消息疲惫阈值: {self._message_threshold}基于exit_focus_threshold={global_config.chat.exit_focus_threshold}计算仅在auto模式下生效"
)
@@ -97,36 +104,23 @@ class HeartFChatting:
"""检查是否需要启动主循环,如果未激活则启动。"""
# 如果循环已经激活,直接返回
if self._loop_active:
if self.running:
logger.debug(f"{self.log_prefix} HeartFChatting 已激活,无需重复启动")
return
try:
# 重置消息计数器开始新的focus会话
self.reset_message_count()
# 标记为活动状态,防止重复启动
self._loop_active = True
self.running = True
# 检查是否已有任务在运行(理论上不应该,因为 _loop_active=False
if self._loop_task and not self._loop_task.done():
logger.warning(f"{self.log_prefix} 发现之前的循环任务仍在运行(不符合预期)。取消旧任务。")
self._loop_task.cancel()
try:
# 等待旧任务确实被取消
await asyncio.wait_for(self._loop_task, timeout=5.0)
except Exception as e:
logger.warning(f"{self.log_prefix} 等待旧任务取消时出错: {e}")
self._loop_task = None # 清理旧任务引用
logger.debug(f"{self.log_prefix} 创建新的 HeartFChatting 主循环任务")
self._loop_task = asyncio.create_task(self._run_focus_chat())
self._loop_task = asyncio.create_task(self._main_chat_loop())
self._loop_task.add_done_callback(self._handle_loop_completion)
logger.debug(f"{self.log_prefix} HeartFChatting 启动完成")
logger.info(f"{self.log_prefix} HeartFChatting 启动完成")
except Exception as e:
# 启动失败时重置状态
self._loop_active = False
self.running = False
self._loop_task = None
logger.error(f"{self.log_prefix} HeartFChatting 启动失败: {e}")
raise
@@ -143,264 +137,151 @@ class HeartFChatting:
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} HeartFChatting: 脱离了聊天(任务取消)")
finally:
self._loop_active = False
self.running = False
self._loop_task = None
if self._processing_lock.locked():
logger.warning(f"{self.log_prefix} HeartFChatting: 处理锁在循环结束时仍被锁定,强制释放。")
self._processing_lock.release()
def start_cycle(self):
self._cycle_counter += 1
self._current_cycle_detail = CycleDetail(self._cycle_counter)
self._current_cycle_detail.prefix = self.log_prefix
thinking_id = "tid" + str(round(time.time(), 2))
self._current_cycle_detail.set_thinking_id(thinking_id)
cycle_timers = {}
return cycle_timers, thinking_id
def end_cycle(self,loop_info,cycle_timers):
self._current_cycle_detail.set_loop_info(loop_info)
self.loop_info.add_loop_info(self._current_cycle_detail)
self._current_cycle_detail.timers = cycle_timers
self._current_cycle_detail.complete_cycle()
self._cycle_history.append(self._current_cycle_detail)
def print_cycle_info(self,cycle_timers):
# 记录循环信息和计时器结果
timer_strings = []
for name, elapsed in cycle_timers.items():
formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}"
timer_strings.append(f"{name}: {formatted_time}")
async def _run_focus_chat(self):
"""主循环,持续进行计划并可能回复消息,直到被外部取消。"""
try:
while True: # 主循环
logger.debug(f"{self.log_prefix} 开始第{self._cycle_counter}次循环")
# 检查关闭标志
if self._shutting_down:
logger.info(f"{self.log_prefix} 检测到关闭标志,退出 Focus Chat 循环。")
break
# 创建新的循环信息
self._cycle_counter += 1
self._current_cycle_detail = CycleDetail(self._cycle_counter)
self._current_cycle_detail.prefix = self.log_prefix
# 初始化周期状态
cycle_timers = {}
# 执行规划和处理阶段
try:
async with self._get_cycle_context():
thinking_id = "tid" + str(round(time.time(), 2))
self._current_cycle_detail.set_thinking_id(thinking_id)
# 使用异步上下文管理器处理消息
try:
async with global_prompt_manager.async_message_scope(
self.chat_stream.context.get_template_name()
):
# 在上下文内部检查关闭状态
if self._shutting_down:
logger.info(f"{self.log_prefix} 在处理上下文中检测到关闭信号,退出")
break
logger.debug(f"模板 {self.chat_stream.context.get_template_name()}")
loop_info = await self._observe_process_plan_action_loop(cycle_timers, thinking_id)
if loop_info["loop_action_info"]["command"] == "stop_focus_chat":
logger.info(f"{self.log_prefix} 麦麦决定停止专注聊天")
# 如果设置了回调函数,则调用它
if self.on_stop_focus_chat:
try:
await self.on_stop_focus_chat()
logger.info(f"{self.log_prefix} 成功调用回调函数处理停止专注聊天")
except Exception as e:
logger.error(f"{self.log_prefix} 调用停止专注聊天回调函数时出错: {e}")
logger.error(traceback.format_exc())
break
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} 处理上下文时任务被取消")
break
except Exception as e:
logger.error(f"{self.log_prefix} 处理上下文时出错: {e}")
# 为当前循环设置错误状态,防止后续重复报错
error_loop_info = {
"loop_plan_info": {
"action_result": {
"action_type": "error",
"action_data": {},
},
},
"loop_action_info": {
"action_taken": False,
"reply_text": "",
"command": "",
"taken_time": time.time(),
},
}
self._current_cycle_detail.set_loop_info(error_loop_info)
self._current_cycle_detail.complete_cycle()
# 上下文处理失败,跳过当前循环
await asyncio.sleep(1)
continue
self._current_cycle_detail.set_loop_info(loop_info)
self.loop_info.add_loop_info(self._current_cycle_detail)
self._current_cycle_detail.timers = cycle_timers
# 完成当前循环并保存历史
self._current_cycle_detail.complete_cycle()
self._cycle_history.append(self._current_cycle_detail)
# 记录循环信息和计时器结果
timer_strings = []
for name, elapsed in cycle_timers.items():
formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}"
timer_strings.append(f"{name}: {formatted_time}")
logger.info(
f"{self.log_prefix}{self._current_cycle_detail.cycle_id}次思考,"
f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒, "
f"选择动作: {self._current_cycle_detail.loop_plan_info.get('action_result', {}).get('action_type', '未知动作')}"
+ (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
)
# 记录性能数据
try:
action_result = self._current_cycle_detail.loop_plan_info.get("action_result", {})
cycle_performance_data = {
"cycle_id": self._current_cycle_detail.cycle_id,
"action_type": action_result.get("action_type", "unknown"),
"total_time": self._current_cycle_detail.end_time - self._current_cycle_detail.start_time,
"step_times": cycle_timers.copy(),
"reasoning": action_result.get("reasoning", ""),
"success": self._current_cycle_detail.loop_action_info.get("action_taken", False),
}
self.performance_logger.record_cycle(cycle_performance_data)
except Exception as perf_e:
logger.warning(f"{self.log_prefix} 记录性能数据失败: {perf_e}")
await asyncio.sleep(global_config.focus_chat.think_interval)
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} 循环处理时任务被取消")
break
except Exception as e:
logger.error(f"{self.log_prefix} 循环处理时出错: {e}")
logger.error(traceback.format_exc())
# 如果_current_cycle_detail存在但未完成为其设置错误状态
if self._current_cycle_detail and not hasattr(self._current_cycle_detail, "end_time"):
error_loop_info = {
"loop_plan_info": {
"action_result": {
"action_type": "error",
"action_data": {},
"reasoning": f"循环处理失败: {e}",
},
},
"loop_action_info": {
"action_taken": False,
"reply_text": "",
"command": "",
"taken_time": time.time(),
},
}
try:
self._current_cycle_detail.set_loop_info(error_loop_info)
self._current_cycle_detail.complete_cycle()
except Exception as inner_e:
logger.error(f"{self.log_prefix} 设置错误状态时出错: {inner_e}")
await asyncio.sleep(1) # 出错后等待一秒再继续
except asyncio.CancelledError:
# 设置了关闭标志位后被取消是正常流程
if not self._shutting_down:
logger.warning(f"{self.log_prefix} 麦麦Focus聊天模式意外被取消")
else:
logger.info(f"{self.log_prefix} 麦麦已离开Focus聊天模式")
except Exception as e:
logger.error(f"{self.log_prefix} 麦麦Focus聊天模式意外错误: {e}")
print(traceback.format_exc())
@contextlib.asynccontextmanager
async def _get_cycle_context(self):
"""
循环周期的上下文管理器
用于确保资源的正确获取和释放:
1. 获取处理锁
2. 执行操作
3. 释放锁
"""
acquired = False
try:
await self._processing_lock.acquire()
acquired = True
yield acquired
finally:
if acquired and self._processing_lock.locked():
self._processing_lock.release()
async def _observe_process_plan_action_loop(self, cycle_timers: dict, thinking_id: str) -> dict:
try:
loop_start_time = time.time()
await self.loop_info.observe()
await self.relationship_builder.build_relation()
# 顺序执行调整动作和处理器阶段
# 第一步:动作修改
with Timer("动作修改", cycle_timers):
try:
# 调用完整的动作修改流程
await self.action_modifier.modify_actions(
loop_info=self.loop_info,
mode="focus",
)
except Exception as e:
logger.error(f"{self.log_prefix} 动作修改失败: {e}")
# 继续执行,不中断流程
with Timer("规划器", cycle_timers):
plan_result = await self.action_planner.plan()
loop_plan_info = {
"action_result": plan_result.get("action_result", {}),
}
action_type, action_data, reasoning = (
plan_result.get("action_result", {}).get("action_type", "error"),
plan_result.get("action_result", {}).get("action_data", {}),
plan_result.get("action_result", {}).get("reasoning", "未提供理由"),
logger.info(
f"{self.log_prefix}{self._current_cycle_detail.cycle_id}次思考,"
f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒, "
f"选择动作: {self._current_cycle_detail.loop_plan_info.get('action_result', {}).get('action_type', '未知动作')}"
+ (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
)
action_data["loop_start_time"] = loop_start_time
async def _focus_mode_loopbody(self):
logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次循环")
if action_type == "reply":
action_str = "回复"
elif action_type == "no_reply":
action_str = "不回复"
else:
action_str = action_type
# 创建新的循环信息
cycle_timers, thinking_id = self.start_cycle()
logger.debug(f"{self.log_prefix} 麦麦想要:'{action_str}',理由是:{reasoning}")
# 执行规划和处理阶段
try:
async with global_prompt_manager.async_message_scope(
self.chat_stream.context.get_template_name()
):
# 动作执行计时
with Timer("动作执行", cycle_timers):
success, reply_text, command = await self._handle_action(
action_type, reasoning, action_data, cycle_timers, thinking_id
loop_start_time = time.time()
await self.loop_info.observe()
await self.relationship_builder.build_relation()
# 第一步:动作修改
with Timer("动作修改", cycle_timers):
try:
await self.action_modifier.modify_actions(
loop_info=self.loop_info,
mode="focus",
)
except Exception as e:
logger.error(f"{self.log_prefix} 动作修改失败: {e}")
with Timer("规划器", cycle_timers):
plan_result = await self.action_planner.plan()
action_result = plan_result.get("action_result", {})
action_type, action_data, reasoning = (
action_result.get("action_type", "error"),
action_result.get("action_data", {}),
action_result.get("reasoning", "未提供理由"),
)
loop_action_info = {
"action_taken": success,
"reply_text": reply_text,
"command": command,
"taken_time": time.time(),
action_data["loop_start_time"] = loop_start_time
# 动作执行计时
with Timer("动作执行", cycle_timers):
success, reply_text, command = await self._handle_action(
action_type, reasoning, action_data, cycle_timers, thinking_id
)
loop_info = {
"loop_plan_info": {
"action_result": plan_result.get("action_result", {}),
},
"loop_action_info": {
"action_taken": success,
"reply_text": reply_text,
"command": command,
"taken_time": time.time(),
},
}
loop_info = {
"loop_plan_info": loop_plan_info,
"loop_action_info": loop_action_info,
}
if loop_info["loop_action_info"]["command"] == "stop_focus_chat":
logger.info(f"{self.log_prefix} 麦麦决定停止专注聊天")
return False
#停止该聊天模式的循环
return loop_info
self.end_cycle(loop_info,cycle_timers)
self.print_cycle_info(cycle_timers)
await asyncio.sleep(global_config.focus_chat.think_interval)
return True
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} focus循环任务被取消")
return False
except Exception as e:
logger.error(f"{self.log_prefix} FOCUS聊天处理失败: {e}")
logger.error(f"{self.log_prefix} 循环处理时出错: {e}")
logger.error(traceback.format_exc())
return {
"loop_plan_info": {
"action_result": {"action_type": "error", "action_data": {}, "reasoning": f"处理失败: {e}"},
},
"loop_action_info": {"action_taken": False, "reply_text": "", "command": "", "taken_time": time.time()},
}
# 如果_current_cycle_detail存在但未完成为其设置错误状态
if self._current_cycle_detail and not hasattr(self._current_cycle_detail, "end_time"):
error_loop_info = ERROR_LOOP_INFO
try:
self._current_cycle_detail.set_loop_info(error_loop_info)
self._current_cycle_detail.complete_cycle()
except Exception as inner_e:
logger.error(f"{self.log_prefix} 设置错误状态时出错: {inner_e}")
await asyncio.sleep(1) # 出错后等待一秒再继续\
return False
async def _main_chat_loop(self):
"""主循环,持续进行计划并可能回复消息,直到被外部取消。"""
try:
loop_mode = "focus"
loop_mode_loopbody = self._focus_mode_loopbody
while self.running: # 主循环
success = await loop_mode_loopbody()
if not success:
break
logger.info(f"{self.log_prefix} 麦麦已强制离开 {loop_mode} 聊天模式")
except asyncio.CancelledError:
# 设置了关闭标志位后被取消是正常流程
logger.info(f"{self.log_prefix} 麦麦已强制离开 {loop_mode} 聊天模式")
except Exception as e:
logger.error(f"{self.log_prefix} 麦麦 {loop_mode} 聊天模式意外错误: {e}")
print(traceback.format_exc())
async def _handle_action(
self,
@@ -434,7 +315,6 @@ class HeartFChatting:
thinking_id=thinking_id,
chat_stream=self.chat_stream,
log_prefix=self.log_prefix,
shutting_down=self._shutting_down,
)
except Exception as e:
logger.error(f"{self.log_prefix} 创建动作处理器时出错: {e}")
@@ -453,40 +333,16 @@ class HeartFChatting:
success, reply_text = result
command = ""
# 检查action_data中是否有系统命令优先使用系统命令
if "_system_command" in action_data:
command = action_data["_system_command"]
logger.debug(f"{self.log_prefix} 从action_data中获取系统命令: {command}")
# 新增:消息计数和疲惫检查
if action == "reply" and success:
self._message_count += 1
current_threshold = self._get_current_fatigue_threshold()
logger.info(
f"{self.log_prefix} 已发送第 {self._message_count} 条消息(动态阈值: {current_threshold}, exit_focus_threshold: {global_config.chat.exit_focus_threshold}"
)
# 检查是否达到疲惫阈值只有在auto模式下才会自动退出
if (
global_config.chat.chat_mode == "auto"
and self._message_count >= current_threshold
and not self._fatigue_triggered
):
self._fatigue_triggered = True
logger.info(
f"{self.log_prefix} [auto模式] 已发送 {self._message_count} 条消息,达到疲惫阈值 {current_threshold},麦麦感到疲惫了,准备退出专注聊天模式"
command = self._count_reply_and_exit_focus_chat(action,success)
if reply_text == "timeout":
self.reply_timeout_count += 1
if self.reply_timeout_count > 5:
logger.warning(
f"[{self.log_prefix} ] 连续回复超时次数过多,{global_config.chat.thinking_timeout}秒 内大模型没有返回有效内容请检查你的api是否速度过慢或配置错误。建议不要使用推理模型推理模型生成速度过慢。或者尝试拉高thinking_timeout参数这可能导致回复时间过长。"
)
# 设置系统命令,在下次循环检查时触发退出
command = "stop_focus_chat"
else:
if reply_text == "timeout":
self.reply_timeout_count += 1
if self.reply_timeout_count > 5:
logger.warning(
f"[{self.log_prefix} ] 连续回复超时次数过多,{global_config.chat.thinking_timeout}秒 内大模型没有返回有效内容请检查你的api是否速度过慢或配置错误。建议不要使用推理模型推理模型生成速度过慢。或者尝试拉高thinking_timeout参数这可能导致回复时间过长。"
)
logger.warning(f"{self.log_prefix} 回复生成超时{global_config.chat.thinking_timeout}s已跳过")
return False, "", ""
logger.warning(f"{self.log_prefix} 回复生成超时{global_config.chat.thinking_timeout}s已跳过")
return False, "", ""
return success, reply_text, command
@@ -494,6 +350,33 @@ class HeartFChatting:
logger.error(f"{self.log_prefix} 处理{action}时出错: {e}")
traceback.print_exc()
return False, "", ""
def _count_reply_and_exit_focus_chat(self,action,success):
# 新增:消息计数和疲惫检查
if action == "reply" and success:
self._message_count += 1
current_threshold = self._get_current_fatigue_threshold()
logger.info(
f"{self.log_prefix} 已发送第 {self._message_count} 条消息(动态阈值: {current_threshold}, exit_focus_threshold: {global_config.chat.exit_focus_threshold}"
)
# 检查是否达到疲惫阈值只有在auto模式下才会自动退出
if (
global_config.chat.chat_mode == "auto"
and self._message_count >= current_threshold
and not self._fatigue_triggered
):
self._fatigue_triggered = True
logger.info(
f"{self.log_prefix} [auto模式] 已发送 {self._message_count} 条消息,达到疲惫阈值 {current_threshold},麦麦感到疲惫了,准备退出专注聊天模式"
)
# 设置系统命令,在下次循环检查时触发退出
command = "stop_focus_chat"
return command
return ""
def _get_current_fatigue_threshold(self) -> int:
"""动态获取当前的疲惫阈值基于exit_focus_threshold配置
@@ -503,19 +386,6 @@ class HeartFChatting:
"""
return max(10, int(30 / global_config.chat.exit_focus_threshold))
def get_message_count_info(self) -> dict:
"""获取消息计数信息
Returns:
dict: 包含消息计数信息的字典
"""
current_threshold = self._get_current_fatigue_threshold()
return {
"current_count": self._message_count,
"threshold": current_threshold,
"fatigue_triggered": self._fatigue_triggered,
"remaining": max(0, current_threshold - self._message_count),
}
def reset_message_count(self):
"""重置消息计数器用于重新启动focus模式时"""
@@ -526,7 +396,7 @@ class HeartFChatting:
async def shutdown(self):
"""优雅关闭HeartFChatting实例取消活动循环任务"""
logger.info(f"{self.log_prefix} 正在关闭HeartFChatting...")
self._shutting_down = True # <-- 在开始关闭时设置标志位
self.running = False # <-- 在开始关闭时设置标志位
# 记录最终的消息统计
if self._message_count > 0:
@@ -549,34 +419,11 @@ class HeartFChatting:
logger.info(f"{self.log_prefix} 没有活动的HeartFChatting循环任务")
# 清理状态
self._loop_active = False
self.running = False
self._loop_task = None
if self._processing_lock.locked():
self._processing_lock.release()
logger.warning(f"{self.log_prefix} 已释放处理锁")
# 完成性能统计
try:
self.performance_logger.finalize_session()
logger.info(f"{self.log_prefix} 性能统计已完成")
except Exception as e:
logger.warning(f"{self.log_prefix} 完成性能统计时出错: {e}")
# 重置消息计数器,为下次启动做准备
self.reset_message_count()
logger.info(f"{self.log_prefix} HeartFChatting关闭完成")
def get_cycle_history(self, last_n: Optional[int] = None) -> List[Dict[str, Any]]:
"""获取循环历史记录
参数:
last_n: 获取最近n个循环的信息如果为None则获取所有历史记录
返回:
List[Dict[str, Any]]: 循环历史记录列表
"""
history = list(self._cycle_history)
if last_n is not None:
history = history[-last_n:]
return [cycle.to_dict() for cycle in history]