feat:提供方法让HFC结束,当等待过久no_reply,会回到ABSENT模式

This commit is contained in:
SengokuCola
2025-04-26 19:03:36 +08:00
parent 042e969292
commit 3763a0ed9e
6 changed files with 214 additions and 140 deletions

View File

@@ -2,7 +2,7 @@ import asyncio
import time
import traceback
import random # <-- 添加导入
from typing import List, Optional, Dict, Any, Deque
from typing import List, Optional, Dict, Any, Deque, Callable, Coroutine
from collections import deque
from src.plugins.chat.message import MessageRecv, BaseMessageInfo, MessageThinking, MessageSending
from src.plugins.chat.message import Seg # Local import needed after move
@@ -25,7 +25,6 @@ import contextlib
from src.plugins.utils.chat_message_builder import num_new_messages_since
from src.plugins.heartFC_chat.heartFC_Cycleinfo import CycleInfo
from .heartFC_sender import HeartFCSender
# --- End import ---
INITIAL_DURATION = 60.0
@@ -155,18 +154,30 @@ class HeartFChatting:
其生命周期现在由其关联的 SubHeartflow 的 FOCUSED 状态控制。
"""
def __init__(self, chat_id: str, sub_mind: SubMind, observations: Observation):
CONSECUTIVE_NO_REPLY_THRESHOLD = 4 # 连续不回复的阈值
def __init__(
self,
chat_id: str,
sub_mind: SubMind,
observations: Observation,
on_consecutive_no_reply_callback: Callable[[], Coroutine[None, None, None]]
):
"""
HeartFChatting 初始化函数
参数:
chat_id: 聊天流唯一标识符(如stream_id)
sub_mind: 关联的子思维
observations: 关联的观察列表
on_consecutive_no_reply_callback: 连续不回复达到阈值时调用的异步回调函数
"""
# 基础属性
self.stream_id: str = chat_id # 聊天流ID
self.chat_stream: Optional[ChatStream] = None # 关联的聊天流
self.sub_mind: SubMind = sub_mind # 关联的子思维
self.observations: List[Observation] = observations # 关联的观察列表,用于监控聊天流状态
self.on_consecutive_no_reply_callback = on_consecutive_no_reply_callback
# 日志前缀
self.log_prefix: str = f"[{chat_manager.get_stream_name(chat_id) or chat_id}]"
@@ -198,6 +209,8 @@ class HeartFChatting:
self._cycle_counter = 0
self._cycle_history: Deque[CycleInfo] = deque(maxlen=10) # 保留最近10个循环的信息
self._current_cycle: Optional[CycleInfo] = None
self._lian_xu_bu_hui_fu_ci_shu: int = 0 # <--- 新增:连续不回复计数器
self._shutting_down: bool = False # <--- 新增:关闭标志位
async def _initialize(self) -> bool:
"""
@@ -276,6 +289,12 @@ class HeartFChatting:
"""主循环,持续进行计划并可能回复消息,直到被外部取消。"""
try:
while True: # 主循环
# --- 在循环开始处检查关闭标志 ---
if self._shutting_down:
logger.info(f"{self.log_prefix} 检测到关闭标志,退出 HFC 循环。")
break
# --------------------------------
# 创建新的循环信息
self._cycle_counter += 1
self._current_cycle = CycleInfo(self._cycle_counter)
@@ -287,6 +306,12 @@ class HeartFChatting:
# 执行规划和处理阶段
async with self._get_cycle_context() as acquired_lock:
if not acquired_lock:
# 如果未能获取锁(理论上不太可能,除非 shutdown 过程中释放了但又被抢了?)
# 或者也可以在这里再次检查 self._shutting_down
if self._shutting_down:
break # 再次检查,确保退出
logger.warning(f"{self.log_prefix} 未能获取循环处理锁,跳过本次循环。")
await asyncio.sleep(0.1) # 短暂等待避免空转
continue
# 记录规划开始时间点
@@ -320,7 +345,11 @@ class HeartFChatting:
)
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} HeartFChatting: 麦麦的认真水群(HFC)被取消了")
# 设置了关闭标志位后被取消是正常流程
if not self._shutting_down:
logger.warning(f"{self.log_prefix} HeartFChatting: 麦麦的认真水群(HFC)循环意外被取消")
else:
logger.info(f"{self.log_prefix} HeartFChatting: 麦麦的认真水群(HFC)循环已取消 (正常关闭)")
except Exception as e:
logger.error(f"{self.log_prefix} HeartFChatting: 意外错误: {e}")
logger.error(traceback.format_exc())
@@ -451,6 +480,8 @@ class HeartFChatting:
return await handler(reasoning, planner_start_db_time, cycle_timers), ""
except HeartFCError as e:
logger.error(f"{self.log_prefix} 处理{action}时出错: {e}")
# 出错时也重置计数器
self._lian_xu_bu_hui_fu_ci_shu = 0
return False, ""
async def _handle_text_reply(self, reasoning: str, emoji_query: str, cycle_timers: dict) -> tuple[bool, str]:
@@ -471,6 +502,8 @@ class HeartFChatting:
返回:
tuple[bool, str]: (是否回复成功, 思考消息ID)
"""
# 重置连续不回复计数器
self._lian_xu_bu_hui_fu_ci_shu = 0
# 获取锚点消息
anchor_message = await self._get_anchor_message()
@@ -544,8 +577,9 @@ class HeartFChatting:
处理不回复的情况
工作流程:
1. 等待新消息
2. 超时或收到新消息时返回
1. 等待新消息、超时或关闭信号
2. 根据等待结果更新连续不回复计数
3. 如果达到阈值,触发回调
参数:
reasoning: 不回复的原因
@@ -561,14 +595,36 @@ class HeartFChatting:
try:
with Timer("等待新消息", cycle_timers):
return await self._wait_for_new_message(observation, planner_start_db_time, self.log_prefix)
# 等待新消息、超时或关闭信号,并获取结果
await self._wait_for_new_message(observation, planner_start_db_time, self.log_prefix)
if not self._shutting_down:
self._lian_xu_bu_hui_fu_ci_shu += 1
logger.debug(f"{self.log_prefix} 连续不回复计数增加: {self._lian_xu_bu_hui_fu_ci_shu}/{self.CONSECUTIVE_NO_REPLY_THRESHOLD}")
# 检查是否达到阈值
if self._lian_xu_bu_hui_fu_ci_shu >= self.CONSECUTIVE_NO_REPLY_THRESHOLD:
logger.info(f"{self.log_prefix} 连续不回复达到阈值 ({self._lian_xu_bu_hui_fu_ci_shu}次),调用回调请求状态转换")
# 调用回调。注意:这里不重置计数器,依赖回调函数成功改变状态来隐式重置上下文。
await self.on_consecutive_no_reply_callback()
return True
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} 等待被中断")
# 如果在等待过程中任务被取消(可能是因为 shutdown
logger.info(f"{self.log_prefix} 处理 'no_reply' 时等待被中断 (CancelledError)")
# 让异常向上传播,由 _hfc_loop 的异常处理逻辑接管
raise
except Exception as e: # 捕获调用管理器或其他地方可能发生的错误
logger.error(f"{self.log_prefix} 处理 'no_reply' 时发生错误: {e}")
logger.error(traceback.format_exc())
# 发生意外错误时,可以选择是否重置计数器,这里选择不重置
return False # 表示动作未成功
async def _wait_for_new_message(self, observation, planner_start_db_time: float, log_prefix: str) -> bool:
"""
等待新消息
等待新消息 或 检测到关闭信号
参数:
observation: 观察实例
@@ -576,19 +632,36 @@ class HeartFChatting:
log_prefix: 日志前缀
返回:
bool: 是否检测到新消息
bool: 是否检测到新消息 (如果因关闭信号退出则返回 False)
"""
wait_start_time = time.monotonic()
while True:
# --- 在每次循环开始时检查关闭标志 ---
if self._shutting_down:
logger.info(f"{log_prefix} 等待新消息时检测到关闭信号,中断等待。")
return False # 表示因为关闭而退出
# -----------------------------------
# 检查新消息
if await observation.has_new_messages_since(planner_start_db_time):
logger.info(f"{log_prefix} 检测到新消息")
return True
# 检查超时 (放在检查新消息和关闭之后)
if time.monotonic() - wait_start_time > 120:
logger.warning(f"{log_prefix} 等待超时(120秒)")
logger.warning(f"{log_prefix} 等待新消息超时(20秒)")
return False
await asyncio.sleep(1.5)
try:
# 短暂休眠,让其他任务有机会运行,并能更快响应取消或关闭
await asyncio.sleep(0.5) # 缩短休眠时间
except asyncio.CancelledError:
# 如果在休眠时被取消,再次检查关闭标志
# 如果是正常关闭,则不需要警告
if not self._shutting_down:
logger.warning(f"{log_prefix} _wait_for_new_message 的休眠被意外取消")
# 无论如何,重新抛出异常,让上层处理
raise
async def _log_cycle_timers(self, cycle_timers: dict, log_prefix: str):
"""记录循环周期的计时器结果"""
@@ -599,7 +672,9 @@ class HeartFChatting:
timer_strings.append(f"{name}: {formatted_time}")
if timer_strings:
logger.debug(f"{log_prefix} 该次决策耗时: {'; '.join(timer_strings)}")
# 在记录前检查关闭标志
if not self._shutting_down:
logger.debug(f"{log_prefix} 该次决策耗时: {'; '.join(timer_strings)}")
async def _handle_cycle_delay(self, action_taken_this_cycle: bool, cycle_start_time: float, log_prefix: str):
"""处理循环延迟"""
@@ -835,6 +910,7 @@ class HeartFChatting:
async def shutdown(self):
"""优雅关闭HeartFChatting实例取消活动循环任务"""
logger.info(f"{self.log_prefix} 正在关闭HeartFChatting...")
self._shutting_down = True # <-- 在开始关闭时设置标志位
# 取消循环任务
if self._loop_task and not self._loop_task.done():
@@ -865,6 +941,25 @@ class HeartFChatting:
action=action,
reasoning=reasoning,
)
# 在记录循环日志前检查关闭标志
if not self._shutting_down:
self._current_cycle.complete_cycle()
self._cycle_history.append(self._current_cycle)
# 记录循环信息和计时器结果
timer_strings = []
for name, elapsed in self._current_cycle.timers.items():
formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}"
timer_strings.append(f"{name}: {formatted_time}")
logger.debug(
f"{self.log_prefix} 第 #{self._current_cycle.cycle_id}次思考完成,"
f"耗时: {self._current_cycle.end_time - self._current_cycle.start_time:.2f}秒, "
f"动作: {self._current_cycle.action_type}"
+ (f"\n计时器详情: {'; '.join(timer_strings)}" if timer_strings else "")
)
return prompt
async def _build_planner_prompt(