diff --git a/src/common/logger.py b/src/common/logger.py
index 6ab3505df..176d4629c 100644
--- a/src/common/logger.py
+++ b/src/common/logger.py
@@ -337,7 +337,7 @@ REMOTE_STYLE_CONFIG = {
"file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 远程 | {message}",
},
"simple": {
- "console_format": "{time:MM-DD HH:mm} | 远程 | {message}",
+ "console_format": "{time:MM-DD HH:mm} | 远程 | {message}",
"file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 远程 | {message}",
},
}
diff --git a/src/heart_flow/README.md b/src/heart_flow/README.md
index a2540ebf1..4ccb662dd 100644
--- a/src/heart_flow/README.md
+++ b/src/heart_flow/README.md
@@ -61,6 +61,7 @@ c HeartFChatting工作方式
c.5.2.2 通过 HeartFCSender 直接发送匹配查询 (emoji_query) 的表情。
c.5.3 如果决策是 no_reply:
c.5.3.1 进入等待状态,直到检测到新消息或超时。
+ c.5.3.2 同时,增加内部连续不回复计数器。如果该计数器达到预设阈值(例如 4 次),则调用初始化时由 `SubHeartflowManager` 提供的回调函数。此回调函数会通知 `SubHeartflowManager` 请求将对应的 `SubHeartflow` 状态转换为 `ABSENT`。如果执行了其他动作(如 `text_reply` 或 `emoji_reply`),则此计数器会被重置。
c.6 循环结束后,记录周期信息 (CycleInfo),并根据情况进行短暂休眠,防止CPU空转。
@@ -152,7 +153,7 @@ c HeartFChatting工作方式
- **状态转换机制** (由 `SubHeartflowManager` 驱动):
- **激活 `CHAT`**: 当 `Heartflow` 状态从 `OFFLINE` 变为允许聊天的状态时,`SubHeartflowManager` 会根据限制(通过 `self.mai_state_info` 获取),选择部分 `ABSENT` 状态的子心流,**检查当前 CHAT 状态数量是否达到上限**,如果未达上限,则调用其 `change_chat_state` 方法将其转换为 `CHAT`。此外,`evaluate_and_transition_subflows_by_llm` 方法也会根据 LLM 的判断,在未达上限时将 `ABSENT` 状态的子心流激活为 `CHAT`。
- **激活 `FOCUSED`**: `SubHeartflowManager` 会定期评估处于 `CHAT` 状态的子心流的兴趣度 (`InterestChatting.start_hfc_probability`),若满足条件且**检查当前 FOCUSED 状态数量未达上限**(通过 `self.mai_state_info` 获取限制),则调用 `change_chat_state` 将其提升为 `FOCUSED`。
- - **停用/回退**: `SubHeartflowManager` 可能因 `Heartflow` 状态变化、达到数量限制、长时间不活跃、随机概率 (`randomly_deactivate_subflows`) 或 LLM 评估 (`evaluate_and_transition_subflows_by_llm` 判断 `CHAT` 状态子心流应休眠) 等原因,调用 `change_chat_state` 将子心流状态设置为 `ABSENT` 或从 `FOCUSED` 回退到 `CHAT`。当子心流进入 `ABSENT` 状态后,如果持续一小时不活跃,才会被后台清理任务删除。
+ - **停用/回退**: `SubHeartflowManager` 可能因 `Heartflow` 状态变化、达到数量限制、长时间不活跃、随机概率 (`randomly_deactivate_subflows`)、LLM 评估 (`evaluate_and_transition_subflows_by_llm` 判断 `CHAT` 状态子心流应休眠) 或收到来自 `HeartFChatting` 的连续不回复回调信号 (`request_absent_transition`) 等原因,调用 `change_chat_state` 将子心流状态设置为 `ABSENT` 或从 `FOCUSED` 回退到 `CHAT`。当子心流进入 `ABSENT` 状态后,如果持续一小时不活跃,才会被后台清理任务删除。
- **注意**: `change_chat_state` 方法本身只负责执行状态转换和管理内部聊天实例(`NormalChatInstance`/`HeartFlowChatInstance`),不再进行限额检查。限额检查的责任完全由调用方(即 `SubHeartflowManager` 中的相关方法,这些方法会使用内部存储的 `mai_state_info` 来获取限制)承担。
## 3. 聊天实例详解 (Chat Instances Explained)
diff --git a/src/heart_flow/background_tasks.py b/src/heart_flow/background_tasks.py
index 7ae4b62f9..d2bd93213 100644
--- a/src/heart_flow/background_tasks.py
+++ b/src/heart_flow/background_tasks.py
@@ -112,15 +112,6 @@ class BackgroundTaskManager:
f"兴趣评估任务已启动 间隔:{self.interest_eval_interval}s",
"_interest_eval_task",
),
- # 新增随机停用任务配置
- (
- self._random_deactivation_task,
- self._run_random_deactivation_cycle,
- "hf_random_deactivation",
- "debug", # 设为debug,避免过多日志
- f"随机停用任务已启动 间隔:{self.random_deactivation_interval}s",
- "_random_deactivation_task",
- ),
]
# 统一启动所有任务
@@ -264,11 +255,6 @@ class BackgroundTaskManager:
# --- 结束新增 ---
- # --- 新增随机停用工作函数 ---
- async def _perform_random_deactivation_work(self):
- """执行一轮子心流随机停用检查。"""
- await self.subheartflow_manager.randomly_deactivate_subflows()
-
# --- 结束新增 ---
# --- Specific Task Runners --- #
@@ -299,16 +285,3 @@ class BackgroundTaskManager:
interval=self.interest_eval_interval,
task_func=self._perform_interest_eval_work,
)
-
- # --- 结束新增 ---
-
- # --- 新增随机停用任务运行器 ---
- async def _run_random_deactivation_cycle(self):
- """运行随机停用循环。"""
- await self._run_periodic_loop(
- task_name="Random Deactivation",
- interval=self.random_deactivation_interval,
- task_func=self._perform_random_deactivation_work,
- )
-
- # --- 结束新增 ---
diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py
index 364303ccd..ead07f53c 100644
--- a/src/heart_flow/sub_heartflow.py
+++ b/src/heart_flow/sub_heartflow.py
@@ -2,7 +2,7 @@ from .observation import Observation, ChattingObservation
import asyncio
from src.config.config import global_config
import time
-from typing import Optional, List, Dict, Tuple
+from typing import Optional, List, Dict, Tuple, Callable, Coroutine
import traceback
from src.common.logger import get_module_logger, LogConfig, SUB_HEARTFLOW_STYLE_CONFIG # noqa: E402
import random
@@ -15,6 +15,11 @@ from src.heart_flow.mai_state_manager import MaiStateInfo
from src.heart_flow.chat_state_info import ChatState, ChatStateInfo
from src.heart_flow.sub_mind import SubMind
+# # --- REMOVE: Conditional import --- #
+# if TYPE_CHECKING:
+# from src.heart_flow.subheartflow_manager import SubHeartflowManager
+# # --- END REMOVE --- #
+
# 定义常量 (从 interest.py 移动过来)
MAX_INTEREST = 15.0
@@ -234,16 +239,23 @@ class InterestChatting:
class SubHeartflow:
- def __init__(self, subheartflow_id, mai_states: MaiStateInfo):
+ def __init__(
+ self,
+ subheartflow_id,
+ mai_states: MaiStateInfo,
+ hfc_no_reply_callback: Callable[[], Coroutine[None, None, None]],
+ ):
"""子心流初始化函数
Args:
subheartflow_id: 子心流唯一标识符
- parent_heartflow: 父级心流实例
+ mai_states: 麦麦状态信息实例
+ hfc_no_reply_callback: HFChatting 连续不回复时触发的回调
"""
# 基础属性,两个值是一样的
self.subheartflow_id = subheartflow_id
self.chat_id = subheartflow_id
+ self.hfc_no_reply_callback = hfc_no_reply_callback
# 麦麦的状态
self.mai_states = mai_states
@@ -364,11 +376,17 @@ class SubHeartflow:
# 如果实例不存在,则创建并启动
logger.info(f"{log_prefix} 麦麦准备开始专注聊天 (创建新实例)...")
try:
+ # 创建 HeartFChatting 实例,并传递 从构造函数传入的 回调函数
self.heart_fc_instance = HeartFChatting(
- chat_id=self.chat_id, sub_mind=self.sub_mind, observations=self.observations
+ chat_id=self.subheartflow_id,
+ sub_mind=self.sub_mind,
+ observations=self.observations, # 传递所有观察者
+ on_consecutive_no_reply_callback=self.hfc_no_reply_callback, # <-- Use stored callback
)
+
+ # 初始化并启动 HeartFChatting
if await self.heart_fc_instance._initialize():
- await self.heart_fc_instance.start() # 初始化成功后启动循环
+ await self.heart_fc_instance.start()
logger.info(f"{log_prefix} 麦麦已成功进入专注聊天模式 (新实例已启动)。")
return True
else:
diff --git a/src/heart_flow/sub_mind.py b/src/heart_flow/sub_mind.py
index be995b843..f340b717b 100644
--- a/src/heart_flow/sub_mind.py
+++ b/src/heart_flow/sub_mind.py
@@ -103,7 +103,7 @@ class SubMind:
individuality = Individuality.get_instance()
# 构建个性部分
- prompt_personality = f"你的名字是{individuality.personality.bot_nickname},你"
+ prompt_personality = f"你正在扮演名为{individuality.personality.bot_nickname}的人类,你"
prompt_personality += individuality.personality.personality_core
# 随机添加个性侧面
diff --git a/src/heart_flow/subheartflow_manager.py b/src/heart_flow/subheartflow_manager.py
index 46e34d1d3..6d22494d4 100644
--- a/src/heart_flow/subheartflow_manager.py
+++ b/src/heart_flow/subheartflow_manager.py
@@ -3,6 +3,7 @@ import time
import random
from typing import Dict, Any, Optional, List
import json # 导入 json 模块
+import functools # <-- 新增导入
# 导入日志模块
from src.common.logger import get_module_logger, LogConfig, SUBHEARTFLOW_MANAGER_STYLE_CONFIG
@@ -18,9 +19,10 @@ from .observation import ChattingObservation
# 导入LLM请求工具
from src.plugins.models.utils_model import LLMRequest
from src.config.config import global_config
-
+from src.individuality.individuality import Individuality
import traceback
+
# 初始化日志记录器
subheartflow_manager_log_config = LogConfig(
@@ -77,8 +79,17 @@ class SubHeartflowManager:
return subflow
try:
- # 初始化子心流, 传入存储的 mai_state_info
- new_subflow = SubHeartflow(subheartflow_id, self.mai_state_info)
+ # --- 使用 functools.partial 创建 HFC 回调 --- #
+ # 将 manager 的 _handle_hfc_no_reply 方法与当前的 subheartflow_id 绑定
+ hfc_callback = functools.partial(self._handle_hfc_no_reply, subheartflow_id)
+ # --- 结束创建回调 --- #
+
+ # 初始化子心流, 传入 mai_state_info 和 partial 创建的回调
+ new_subflow = SubHeartflow(
+ subheartflow_id,
+ self.mai_state_info,
+ hfc_callback, # <-- 传递 partial 创建的回调
+ )
# 异步初始化
await new_subflow.initialize()
@@ -100,24 +111,56 @@ class SubHeartflowManager:
logger.error(f"创建子心流 {subheartflow_id} 失败: {e}", exc_info=True)
return None
+ # --- 新增:内部方法,用于尝试将单个子心流设置为 ABSENT ---
+ async def _try_set_subflow_absent_internal(self, subflow: "SubHeartflow", log_prefix: str) -> bool:
+ """
+ 尝试将给定的子心流对象状态设置为 ABSENT (内部方法,不处理锁)。
+
+ Args:
+ subflow: 子心流对象。
+ log_prefix: 用于日志记录的前缀 (例如 "[子心流管理]" 或 "[停用]")。
+
+ Returns:
+ bool: 如果状态成功变为 ABSENT 或原本就是 ABSENT,返回 True;否则返回 False。
+ """
+ flow_id = subflow.subheartflow_id
+ stream_name = chat_manager.get_stream_name(flow_id) or flow_id
+
+ if subflow.chat_state.chat_status != ChatState.ABSENT:
+ logger.debug(f"{log_prefix} 设置 {stream_name} 状态为 ABSENT")
+ try:
+ await subflow.change_chat_state(ChatState.ABSENT)
+ # 再次检查以确认状态已更改 (change_chat_state 内部应确保)
+ if subflow.chat_state.chat_status == ChatState.ABSENT:
+ return True
+ else:
+ logger.warning(
+ f"{log_prefix} 调用 change_chat_state 后,{stream_name} 状态仍为 {subflow.chat_state.chat_status.value}"
+ )
+ return False
+ except Exception as e:
+ logger.error(f"{log_prefix} 设置 {stream_name} 状态为 ABSENT 时失败: {e}", exc_info=True)
+ return False
+ else:
+ logger.debug(f"{log_prefix} {stream_name} 已是 ABSENT 状态")
+ return True # 已经是目标状态,视为成功
+
+ # --- 结束新增 ---
+
async def sleep_subheartflow(self, subheartflow_id: Any, reason: str) -> bool:
- """停止指定的子心流并清理资源"""
- subheartflow = self.subheartflows.get(subheartflow_id)
- if not subheartflow:
- return False
+ """停止指定的子心流并将其状态设置为 ABSENT"""
+ log_prefix = "[子心流管理]"
+ async with self._lock: # 加锁以安全访问字典
+ subheartflow = self.subheartflows.get(subheartflow_id)
- stream_name = chat_manager.get_stream_name(subheartflow_id) or subheartflow_id
- logger.info(f"[子心流管理] 正在停止 {stream_name}, 原因: {reason}")
+ stream_name = chat_manager.get_stream_name(subheartflow_id) or subheartflow_id
+ logger.info(f"{log_prefix} 正在停止 {stream_name}, 原因: {reason}")
- try:
- # 设置状态为ABSENT释放资源
- if subheartflow.chat_state.chat_status != ChatState.ABSENT:
- logger.debug(f"[子心流管理] 设置 {stream_name} 状态为ABSENT")
- await subheartflow.change_chat_state(ChatState.ABSENT)
- else:
- logger.debug(f"[子心流管理] {stream_name} 已是ABSENT状态")
- except Exception as e:
- logger.error(f"[子心流管理] 设置ABSENT状态失败: {e}")
+ # 调用内部方法处理状态变更
+ success = await self._try_set_subflow_absent_internal(subheartflow, log_prefix) if subheartflow else False
+
+ return success
+ # 锁在此处自动释放
def get_inactive_subheartflows(self, max_age_seconds=INACTIVE_THRESHOLD_SECONDS):
"""识别并返回需要清理的不活跃(处于ABSENT状态超过一小时)子心流(id, 原因)"""
@@ -185,174 +228,107 @@ class SubHeartflowManager:
async def deactivate_all_subflows(self):
"""将所有子心流的状态更改为 ABSENT (例如主状态变为OFFLINE时调用)"""
- # logger.info("[停用] 开始将所有子心流状态设置为 ABSENT")
- # 使用 list() 创建一个当前值的快照,防止在迭代时修改字典
- flows_to_update = list(self.subheartflows.values())
-
- if not flows_to_update:
- logger.debug("[停用] 无活跃子心流,无需操作")
- return
-
+ log_prefix = "[停用]"
changed_count = 0
- for subflow in flows_to_update:
- flow_id = subflow.subheartflow_id
- stream_name = chat_manager.get_stream_name(flow_id) or flow_id
- # 再次检查子心流是否仍然存在于管理器中,以防万一在迭代过程中被移除
+ processed_count = 0
- if subflow.chat_state.chat_status != ChatState.ABSENT:
- logger.debug(
- f"正在将子心流 {stream_name} 的状态从 {subflow.chat_state.chat_status.value} 更改为 ABSENT"
- )
- try:
- # 调用 change_chat_state 将状态设置为 ABSENT
- await subflow.change_chat_state(ChatState.ABSENT)
- # 验证状态是否真的改变了
- if (
- flow_id in self.subheartflows
- and self.subheartflows[flow_id].chat_state.chat_status == ChatState.ABSENT
- ):
+ async with self._lock: # 获取锁以安全迭代
+ # 使用 list() 创建一个当前值的快照,防止在迭代时修改字典
+ flows_to_update = list(self.subheartflows.values())
+ processed_count = len(flows_to_update)
+ if not flows_to_update:
+ logger.debug(f"{log_prefix} 无活跃子心流,无需操作")
+ return
+
+ for subflow in flows_to_update:
+ # 记录原始状态,以便统计实际改变的数量
+ original_state_was_absent = subflow.chat_state.chat_status == ChatState.ABSENT
+
+ success = await self._try_set_subflow_absent_internal(subflow, log_prefix)
+
+ # 如果成功设置为 ABSENT 且原始状态不是 ABSENT,则计数
+ if success and not original_state_was_absent:
+ if subflow.chat_state.chat_status == ChatState.ABSENT:
changed_count += 1
else:
- logger.warning(
- f"[停用] 尝试更改子心流 {stream_name} 状态后,状态仍未变为 ABSENT 或子心流已消失。"
- )
- except Exception as e:
- logger.error(f"[停用] 更改子心流 {stream_name} 状态为 ABSENT 时出错: {e}", exc_info=True)
- else:
- logger.debug(f"[停用] 子心流 {stream_name} 已处于 ABSENT 状态,无需更改。")
+ # 这种情况理论上不应发生,如果内部方法返回 True 的话
+ stream_name = chat_manager.get_stream_name(subflow.subheartflow_id) or subflow.subheartflow_id
+ logger.warning(f"{log_prefix} 内部方法声称成功但 {stream_name} 状态未变为 ABSENT。")
+ # 锁在此处自动释放
logger.info(
- f"下限完成,共处理 {len(flows_to_update)} 个子心流,成功将 {changed_count} 个子心流的状态更改为 ABSENT。"
+ f"{log_prefix} 完成,共处理 {processed_count} 个子心流,成功将 {changed_count} 个非 ABSENT 子心流的状态更改为 ABSENT。"
)
async def evaluate_interest_and_promote(self):
"""评估子心流兴趣度,满足条件且未达上限则提升到FOCUSED状态(基于start_hfc_probability)"""
- log_prefix = "[兴趣评估]"
- # 使用 self.mai_state_info 获取当前状态和限制
- current_state = self.mai_state_info.get_current_state()
- focused_limit = current_state.get_focused_chat_max_num()
-
- if int(time.time()) % 20 == 0: # 每20秒输出一次
- logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 可以在{focused_limit}个群激情聊天")
-
- if focused_limit <= 0:
- # logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 不允许 FOCUSED 子心流")
- return
-
- current_focused_count = self.count_subflows_by_state(ChatState.FOCUSED)
- if current_focused_count >= focused_limit:
- logger.debug(f"{log_prefix} 已达专注上限 ({current_focused_count}/{focused_limit})")
- return
-
- for sub_hf in list(self.subheartflows.values()):
- flow_id = sub_hf.subheartflow_id
- stream_name = chat_manager.get_stream_name(flow_id) or flow_id
-
- # 跳过非CHAT状态或已经是FOCUSED状态的子心流
- if sub_hf.chat_state.chat_status == ChatState.FOCUSED:
- continue
-
- from .mai_state_manager import enable_unlimited_hfc_chat
-
- if not enable_unlimited_hfc_chat:
- if sub_hf.chat_state.chat_status != ChatState.CHAT:
- continue
-
- # 检查是否满足提升概率
- if random.random() >= sub_hf.interest_chatting.start_hfc_probability:
- continue
-
- # 再次检查是否达到上限
- if current_focused_count >= focused_limit:
- logger.debug(f"{log_prefix} [{stream_name}] 已达专注上限")
- break
-
- # 获取最新状态并执行提升
- current_subflow = self.subheartflows.get(flow_id)
- if not current_subflow:
- continue
-
- logger.info(
- f"{log_prefix} [{stream_name}] 触发 认真水群 (概率={current_subflow.interest_chatting.start_hfc_probability:.2f})"
- )
-
- # 执行状态提升
- await current_subflow.change_chat_state(ChatState.FOCUSED)
-
- # 验证提升结果
- if (
- final_subflow := self.subheartflows.get(flow_id)
- ) and final_subflow.chat_state.chat_status == ChatState.FOCUSED:
- current_focused_count += 1
-
- async def randomly_deactivate_subflows(self, deactivation_probability: float = 0.1):
- """以一定概率将 FOCUSED 或 CHAT 状态的子心流回退到 ABSENT 状态。"""
- log_prefix_manager = "[子心流管理器-随机停用]"
- logger.debug(f"{log_prefix_manager} 开始随机停用检查... (概率: {deactivation_probability:.0%})")
-
- # 使用快照安全遍历
- subflows_snapshot = list(self.subheartflows.values())
- deactivated_count = 0
-
try:
- for sub_hf in subflows_snapshot:
+ log_prefix = "[兴趣评估]"
+ # 使用 self.mai_state_info 获取当前状态和限制
+ current_state = self.mai_state_info.get_current_state()
+ focused_limit = current_state.get_focused_chat_max_num()
+ logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 开始尝试提升到FOCUSED状态")
+
+ if int(time.time()) % 20 == 0: # 每20秒输出一次
+ logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 可以在{focused_limit}个群激情聊天")
+
+ if focused_limit <= 0:
+ # logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 不允许 FOCUSED 子心流")
+ return
+
+ current_focused_count = self.count_subflows_by_state(ChatState.FOCUSED)
+ if current_focused_count >= focused_limit:
+ logger.debug(f"{log_prefix} 已达专注上限 ({current_focused_count}/{focused_limit})")
+ return
+
+ for sub_hf in list(self.subheartflows.values()):
flow_id = sub_hf.subheartflow_id
stream_name = chat_manager.get_stream_name(flow_id) or flow_id
- log_prefix_flow = f"[{stream_name}]"
- current_state = sub_hf.chat_state.chat_status
- # 只处理 FOCUSED 或 CHAT 状态
- if current_state not in [ChatState.FOCUSED, ChatState.CHAT]:
+ # 跳过非CHAT状态或已经是FOCUSED状态的子心流
+ if sub_hf.chat_state.chat_status == ChatState.FOCUSED:
continue
- # 检查随机概率
- if random.random() < deactivation_probability:
- logger.info(
- f"{log_prefix_manager} {log_prefix_flow} 随机触发停用 (从 {current_state.value}) -> ABSENT"
- )
+ from .mai_state_manager import enable_unlimited_hfc_chat
- # 获取当前实例以检查最新状态
- current_subflow = self.subheartflows.get(flow_id)
- if not current_subflow or current_subflow.chat_state.chat_status != current_state:
- logger.warning(f"{log_prefix_manager} {log_prefix_flow} 尝试停用时状态已改变或实例消失,跳过。")
+ if not enable_unlimited_hfc_chat:
+ if sub_hf.chat_state.chat_status != ChatState.CHAT:
continue
- # --- 状态设置 --- #
- # 注意:这里传递的状态数量是 *停用前* 的状态数量
- await current_subflow.change_chat_state(ChatState.ABSENT)
+ # 检查是否满足提升概率
+ if random.random() >= sub_hf.interest_chatting.start_hfc_probability:
+ continue
- # --- 状态验证 (可选) ---
- final_subflow = self.subheartflows.get(flow_id)
- if final_subflow:
- final_state = final_subflow.chat_state.chat_status
- if final_state == ChatState.ABSENT:
- logger.debug(
- f"{log_prefix_manager} {log_prefix_flow} 成功从 {current_state.value} 停用到 ABSENT 状态"
- )
- deactivated_count += 1
- else:
- logger.warning(
- f"{log_prefix_manager} {log_prefix_flow} 尝试停用到 ABSENT 后状态仍为 {final_state.value}"
- )
- else:
- logger.warning(f"{log_prefix_manager} {log_prefix_flow} 停用后验证时子心流 {flow_id} 消失")
+ # 再次检查是否达到上限
+ if current_focused_count >= focused_limit:
+ logger.debug(f"{log_prefix} [{stream_name}] 已达专注上限")
+ break
+ # 获取最新状态并执行提升
+ current_subflow = self.subheartflows.get(flow_id)
+ if not current_subflow:
+ continue
+
+ logger.info(
+ f"{log_prefix} [{stream_name}] 触发 认真水群 (概率={current_subflow.interest_chatting.start_hfc_probability:.2f})"
+ )
+
+ # 执行状态提升
+ await current_subflow.change_chat_state(ChatState.FOCUSED)
+
+ # 验证提升结果
+ if (
+ final_subflow := self.subheartflows.get(flow_id)
+ ) and final_subflow.chat_state.chat_status == ChatState.FOCUSED:
+ current_focused_count += 1
except Exception as e:
- logger.error(f"{log_prefix_manager} 随机停用周期出错: {e}", exc_info=True)
-
- if deactivated_count > 0:
- logger.info(f"{log_prefix_manager} 随机停用周期结束, 成功停用 {deactivated_count} 个子心流。")
- else:
- logger.debug(f"{log_prefix_manager} 随机停用周期结束, 未停用任何子心流。")
+ logger.error(f"启动HFC 兴趣评估失败: {e}", exc_info=True)
async def evaluate_and_transition_subflows_by_llm(self):
"""
使用LLM评估每个子心流的状态,并根据LLM的判断执行状态转换(ABSENT <-> CHAT)。
注意:此函数包含对假设的LLM函数的调用。
"""
- log_prefix = "[LLM状态评估]"
- logger.info(f"{log_prefix} 开始基于LLM评估子心流状态...")
-
# 获取当前状态和限制,用于CHAT激活检查
current_mai_state = self.mai_state_info.get_current_state()
chat_limit = current_mai_state.get_normal_chat_max_num()
@@ -366,55 +342,67 @@ class SubHeartflowManager:
current_chat_count = self.count_subflows_by_state_nolock(ChatState.CHAT)
if not subflows_snapshot:
- logger.info(f"{log_prefix} 当前没有子心流需要评估。")
+ logger.info("当前没有子心流需要评估。")
return
for sub_hf in subflows_snapshot:
flow_id = sub_hf.subheartflow_id
stream_name = chat_manager.get_stream_name(flow_id) or flow_id
+ log_prefix = f"[{stream_name}]"
current_subflow_state = sub_hf.chat_state.chat_status
- # --- 获取观察内容 ---
- # 从 sub_hf.observations 获取 ChattingObservation 并提取信息
_observation_summary = "没有可用的观察信息。" # 默认值
- try:
- # 检查 observations 列表是否存在且不为空
- # 假设第一个观察者是 ChattingObservation
- first_observation = sub_hf.observations[0]
- if isinstance(first_observation, ChattingObservation):
- # 组合中期记忆和当前聊天内容
- current_chat = first_observation.talking_message_str or "当前无聊天内容。"
- combined_summary = f"当前聊天内容:\n{current_chat}"
- else:
- logger.warning(f"{log_prefix} [{stream_name}] 第一个观察者不是 ChattingObservation 类型。")
-
- except Exception as e:
- logger.warning(f"{log_prefix} [{stream_name}] 获取观察信息失败: {e}", exc_info=True)
- # 保留默认值或错误信息
- combined_summary = f"获取观察信息时出错: {e}"
+ first_observation = sub_hf.observations[0]
+ if isinstance(first_observation, ChattingObservation):
+ # 组合中期记忆和当前聊天内容
+ await first_observation.observe()
+ current_chat = first_observation.talking_message_str or "当前无聊天内容。"
+ combined_summary = f"当前聊天内容:\n{current_chat}"
+ else:
+ combined_summary = _observation_summary; logger.warning(f"{log_prefix} [{stream_name}] 第一个观察者不是 ChattingObservation 类型。")
# --- 获取麦麦状态 ---
- mai_state_description = f"麦麦当前状态: {current_mai_state.value}。"
+ mai_state_description = f"你当前状态: {current_mai_state.value}。"
+
+ # 获取个性化信息
+ individuality = Individuality.get_instance()
+
+ # 构建个性部分
+ prompt_personality = f"你正在扮演名为{individuality.personality.bot_nickname}的人类,你"
+ prompt_personality += individuality.personality.personality_core
+
+ # 随机添加个性侧面
+ if individuality.personality.personality_sides:
+ random_side = random.choice(individuality.personality.personality_sides)
+ prompt_personality += f",{random_side}"
+
+ # 随机添加身份细节
+ if individuality.identity.identity_detail:
+ random_detail = random.choice(individuality.identity.identity_detail)
+ prompt_personality += f",{random_detail}"
# --- 针对 ABSENT 状态 ---
if current_subflow_state == ChatState.ABSENT:
# 构建Prompt
prompt = (
- f"子心流 [{stream_name}] 当前处于非活跃(ABSENT)状态.\n"
+ f"{prompt_personality}\n"
+ f"你当前没有在: [{stream_name}] 群中聊天。\n"
f"{mai_state_description}\n"
- f"最近观察到的内容摘要:\n---\n{combined_summary}\n---\n"
- f"基于以上信息,该子心流是否表现出足够的活跃迹象或重要性,"
- f"值得将其唤醒并进入常规聊天(CHAT)状态?\n"
- f"请以 JSON 格式回答,包含一个键 'decision',其值为 true 或 false.\n"
- f'例如:{{"decision": true}}\n'
+ f"这个群里最近的聊天内容是:\n---\n{combined_summary}\n---\n"
+ f"基于以上信息,请判断你是否愿意在这个群开始闲聊,"
+ f"进入常规聊天(CHAT)状态?\n"
+ f"给出你的判断,和理由,然后以 JSON 格式回答"
+ f"包含键 'decision',如果要开始聊天,值为 true ,否则为 false.\n"
+ f"包含键 'reason',其值为你的理由。\n"
+ f'例如:{{"decision": true, "reason": "因为我想聊天"}}\n'
f"请只输出有效的 JSON 对象。"
)
# 调用LLM评估
should_activate = await self._llm_evaluate_state_transition(prompt)
if should_activate is None: # 处理解析失败或意外情况
- logger.warning(f"{log_prefix} [{stream_name}] LLM评估返回无效结果,跳过。")
+ logger.warning(f"{log_prefix}LLM评估返回无效结果,跳过。")
continue
if should_activate:
@@ -423,49 +411,50 @@ class SubHeartflowManager:
current_chat_count = self.count_subflows_by_state_nolock(ChatState.CHAT)
if current_chat_count < chat_limit:
logger.info(
- f"{log_prefix} [{stream_name}] LLM建议激活到CHAT状态,且未达上限({current_chat_count}/{chat_limit})。正在尝试转换..."
+ f"{log_prefix}LLM建议激活到CHAT状态,且未达上限({current_chat_count}/{chat_limit})。正在尝试转换..."
)
await sub_hf.change_chat_state(ChatState.CHAT)
if sub_hf.chat_state.chat_status == ChatState.CHAT:
transitioned_to_chat += 1
else:
- logger.warning(f"{log_prefix} [{stream_name}] 尝试激活到CHAT失败。")
+ logger.warning(f"{log_prefix}尝试激活到CHAT失败。")
else:
logger.info(
- f"{log_prefix} [{stream_name}] LLM建议激活到CHAT状态,但已达到上限({current_chat_count}/{chat_limit})。跳过转换。"
+ f"{log_prefix}LLM建议激活到CHAT状态,但已达到上限({current_chat_count}/{chat_limit})。跳过转换。"
)
+ else:
+ logger.info(f"{log_prefix}LLM建议不激活到CHAT状态。")
# --- 针对 CHAT 状态 ---
elif current_subflow_state == ChatState.CHAT:
# 构建Prompt
prompt = (
- f"子心流 [{stream_name}] 当前处于常规聊天(CHAT)状态.\n"
+ f"{prompt_personality}\n"
+ f"你正在在: [{stream_name}] 群中聊天。\n"
f"{mai_state_description}\n"
- f"最近观察到的内容摘要:\n---\n{combined_summary}\n---\n"
- f"基于以上信息,该子心流是否表现出不活跃、对话结束或不再需要关注的迹象,"
- f"应该让其进入休眠(ABSENT)状态?\n"
- f"请以 JSON 格式回答,包含一个键 'decision',其值为 true (表示应休眠) 或 false (表示不应休眠).\n"
- f'例如:{{"decision": true}}\n'
+ f"这个群里最近的聊天内容是:\n---\n{combined_summary}\n---\n"
+ f"基于以上信息,请判断你是否愿意在这个群继续闲聊,"
+ f"还是暂时离开聊天,进入休眠状态?\n"
+ f"给出你的判断,和理由,然后以 JSON 格式回答"
+ f"包含键 'decision',如果要离开聊天,值为 true ,否则为 false.\n"
+ f"包含键 'reason',其值为你的理由。\n"
+ f'例如:{{"decision": true, "reason": "因为我想休息"}}\n'
f"请只输出有效的 JSON 对象。"
)
# 调用LLM评估
should_deactivate = await self._llm_evaluate_state_transition(prompt)
if should_deactivate is None: # 处理解析失败或意外情况
- logger.warning(f"{log_prefix} [{stream_name}] LLM评估返回无效结果,跳过。")
+ logger.warning(f"{log_prefix}LLM评估返回无效结果,跳过。")
continue
if should_deactivate:
- logger.info(f"{log_prefix} [{stream_name}] LLM建议进入ABSENT状态。正在尝试转换...")
+ logger.info(f"{log_prefix}LLM建议进入ABSENT状态。正在尝试转换...")
await sub_hf.change_chat_state(ChatState.ABSENT)
if sub_hf.chat_state.chat_status == ChatState.ABSENT:
transitioned_to_absent += 1
-
- logger.info(
- f"{log_prefix} LLM评估周期结束。"
- f" 成功转换到CHAT: {transitioned_to_chat}."
- f" 成功转换到ABSENT: {transitioned_to_absent}."
- )
+ else:
+ logger.info(f"{log_prefix}LLM建议不进入ABSENT状态。")
async def _llm_evaluate_state_transition(self, prompt: str) -> Optional[bool]:
"""
@@ -482,9 +471,9 @@ class SubHeartflowManager:
try:
# --- 真实的 LLM 调用 ---
response_text, _ = await self.llm_state_evaluator.generate_response_async(prompt)
- logger.debug(
- f"{log_prefix} 使用模型 {self.llm_state_evaluator.model_name} 评估,原始响应: ```{response_text}```"
- )
+ # logger.debug(f"{log_prefix} 使用模型 {self.llm_state_evaluator.model_name} 评估")
+ logger.debug(f"{log_prefix} 原始输入: {prompt}")
+ logger.debug(f"{log_prefix} 原始响应: {response_text}")
# --- 解析 JSON 响应 ---
try:
@@ -577,3 +566,48 @@ class SubHeartflowManager:
logger.error(f"删除 SubHeartflow {subheartflow_id} 时出错: {e}", exc_info=True)
else:
logger.warning(f"尝试删除不存在的 SubHeartflow: {subheartflow_id}")
+
+ # --- 新增:处理 HFC 无回复回调的专用方法 --- #
+ async def _handle_hfc_no_reply(self, subheartflow_id: Any):
+ """处理来自 HeartFChatting 的连续无回复信号 (通过 partial 绑定 ID)"""
+ # 注意:这里不需要再获取锁,因为 request_absent_transition 内部会处理锁
+ logger.debug(f"[管理器 HFC 处理器] 接收到来自 {subheartflow_id} 的 HFC 无回复信号")
+ await self.request_absent_transition(subheartflow_id)
+
+ # --- 结束新增 --- #
+
+ # --- 新增:处理来自 HeartFChatting 的状态转换请求 --- #
+ async def request_absent_transition(self, subflow_id: Any):
+ """
+ 接收来自 HeartFChatting 的请求,将特定子心流的状态转换为 ABSENT。
+ 通常在连续多次 "no_reply" 后被调用。
+
+ Args:
+ subflow_id: 需要转换状态的子心流 ID。
+ """
+ async with self._lock:
+ subflow = self.subheartflows.get(subflow_id)
+ if not subflow:
+ logger.warning(f"[状态转换请求] 尝试转换不存在的子心流 {subflow_id} 到 ABSENT")
+ return
+
+ stream_name = chat_manager.get_stream_name(subflow_id) or subflow_id
+ current_state = subflow.chat_state.chat_status
+
+ # 仅当子心流处于 FOCUSED 状态时才进行转换
+ # 因为 HeartFChatting 只在 FOCUSED 状态下运行
+ if current_state == ChatState.FOCUSED:
+ logger.info(f"[状态转换请求] 接收到请求,将 {stream_name} (当前: {current_state.value}) 转换为 ABSENT")
+ try:
+ await subflow.change_chat_state(ChatState.ABSENT)
+ logger.info(f"[状态转换请求] {stream_name} 状态已成功转换为 ABSENT")
+ except Exception as e:
+ logger.error(f"[状态转换请求] 转换 {stream_name} 到 ABSENT 时出错: {e}", exc_info=True)
+ elif current_state == ChatState.ABSENT:
+ logger.debug(f"[状态转换请求] {stream_name} 已处于 ABSENT 状态,无需转换")
+ else:
+ logger.warning(
+ f"[状态转换请求] 收到对 {stream_name} 的请求,但其状态为 {current_state.value} (非 FOCUSED),不执行转换"
+ )
+
+ # --- 结束新增 --- #
diff --git a/src/plugins/heartFC_chat/heartFC_chat.py b/src/plugins/heartFC_chat/heartFC_chat.py
index 2a33d0671..8f376b37f 100644
--- a/src/plugins/heartFC_chat/heartFC_chat.py
+++ b/src/plugins/heartFC_chat/heartFC_chat.py
@@ -2,7 +2,7 @@ import asyncio
import time
import traceback
import random # <-- 添加导入
-from typing import List, Optional, Dict, Any, Deque
+from typing import List, Optional, Dict, Any, Deque, Callable, Coroutine
from collections import deque
from src.plugins.chat.message import MessageRecv, BaseMessageInfo, MessageThinking, MessageSending
from src.plugins.chat.message import Seg # Local import needed after move
@@ -25,7 +25,6 @@ import contextlib
from src.plugins.utils.chat_message_builder import num_new_messages_since
from src.plugins.heartFC_chat.heartFC_Cycleinfo import CycleInfo
from .heartFC_sender import HeartFCSender
-# --- End import ---
INITIAL_DURATION = 60.0
@@ -155,18 +154,30 @@ class HeartFChatting:
其生命周期现在由其关联的 SubHeartflow 的 FOCUSED 状态控制。
"""
- def __init__(self, chat_id: str, sub_mind: SubMind, observations: Observation):
+ CONSECUTIVE_NO_REPLY_THRESHOLD = 4 # 连续不回复的阈值
+
+ def __init__(
+ self,
+ chat_id: str,
+ sub_mind: SubMind,
+ observations: Observation,
+ on_consecutive_no_reply_callback: Callable[[], Coroutine[None, None, None]],
+ ):
"""
HeartFChatting 初始化函数
参数:
chat_id: 聊天流唯一标识符(如stream_id)
+ sub_mind: 关联的子思维
+ observations: 关联的观察列表
+ on_consecutive_no_reply_callback: 连续不回复达到阈值时调用的异步回调函数
"""
# 基础属性
self.stream_id: str = chat_id # 聊天流ID
self.chat_stream: Optional[ChatStream] = None # 关联的聊天流
self.sub_mind: SubMind = sub_mind # 关联的子思维
self.observations: List[Observation] = observations # 关联的观察列表,用于监控聊天流状态
+ self.on_consecutive_no_reply_callback = on_consecutive_no_reply_callback
# 日志前缀
self.log_prefix: str = f"[{chat_manager.get_stream_name(chat_id) or chat_id}]"
@@ -198,6 +209,8 @@ class HeartFChatting:
self._cycle_counter = 0
self._cycle_history: Deque[CycleInfo] = deque(maxlen=10) # 保留最近10个循环的信息
self._current_cycle: Optional[CycleInfo] = None
+ self._lian_xu_bu_hui_fu_ci_shu: int = 0 # <--- 新增:连续不回复计数器
+ self._shutting_down: bool = False # <--- 新增:关闭标志位
async def _initialize(self) -> bool:
"""
@@ -276,6 +289,12 @@ class HeartFChatting:
"""主循环,持续进行计划并可能回复消息,直到被外部取消。"""
try:
while True: # 主循环
+ # --- 在循环开始处检查关闭标志 ---
+ if self._shutting_down:
+ logger.info(f"{self.log_prefix} 检测到关闭标志,退出 HFC 循环。")
+ break
+ # --------------------------------
+
# 创建新的循环信息
self._cycle_counter += 1
self._current_cycle = CycleInfo(self._cycle_counter)
@@ -287,6 +306,12 @@ class HeartFChatting:
# 执行规划和处理阶段
async with self._get_cycle_context() as acquired_lock:
if not acquired_lock:
+ # 如果未能获取锁(理论上不太可能,除非 shutdown 过程中释放了但又被抢了?)
+ # 或者也可以在这里再次检查 self._shutting_down
+ if self._shutting_down:
+ break # 再次检查,确保退出
+ logger.warning(f"{self.log_prefix} 未能获取循环处理锁,跳过本次循环。")
+ await asyncio.sleep(0.1) # 短暂等待避免空转
continue
# 记录规划开始时间点
@@ -320,7 +345,11 @@ class HeartFChatting:
)
except asyncio.CancelledError:
- logger.info(f"{self.log_prefix} HeartFChatting: 麦麦的认真水群(HFC)被取消了")
+ # 设置了关闭标志位后被取消是正常流程
+ if not self._shutting_down:
+ logger.warning(f"{self.log_prefix} HeartFChatting: 麦麦的认真水群(HFC)循环意外被取消")
+ else:
+ logger.info(f"{self.log_prefix} HeartFChatting: 麦麦的认真水群(HFC)循环已取消 (正常关闭)")
except Exception as e:
logger.error(f"{self.log_prefix} HeartFChatting: 意外错误: {e}")
logger.error(traceback.format_exc())
@@ -451,6 +480,8 @@ class HeartFChatting:
return await handler(reasoning, planner_start_db_time, cycle_timers), ""
except HeartFCError as e:
logger.error(f"{self.log_prefix} 处理{action}时出错: {e}")
+ # 出错时也重置计数器
+ self._lian_xu_bu_hui_fu_ci_shu = 0
return False, ""
async def _handle_text_reply(self, reasoning: str, emoji_query: str, cycle_timers: dict) -> tuple[bool, str]:
@@ -471,6 +502,8 @@ class HeartFChatting:
返回:
tuple[bool, str]: (是否回复成功, 思考消息ID)
"""
+ # 重置连续不回复计数器
+ self._lian_xu_bu_hui_fu_ci_shu = 0
# 获取锚点消息
anchor_message = await self._get_anchor_message()
@@ -544,8 +577,9 @@ class HeartFChatting:
处理不回复的情况
工作流程:
- 1. 等待新消息
- 2. 超时或收到新消息时返回
+ 1. 等待新消息、超时或关闭信号
+ 2. 根据等待结果更新连续不回复计数
+ 3. 如果达到阈值,触发回调
参数:
reasoning: 不回复的原因
@@ -561,14 +595,39 @@ class HeartFChatting:
try:
with Timer("等待新消息", cycle_timers):
- return await self._wait_for_new_message(observation, planner_start_db_time, self.log_prefix)
+ # 等待新消息、超时或关闭信号,并获取结果
+ await self._wait_for_new_message(observation, planner_start_db_time, self.log_prefix)
+
+ if not self._shutting_down:
+ self._lian_xu_bu_hui_fu_ci_shu += 1
+ logger.debug(
+ f"{self.log_prefix} 连续不回复计数增加: {self._lian_xu_bu_hui_fu_ci_shu}/{self.CONSECUTIVE_NO_REPLY_THRESHOLD}"
+ )
+
+ # 检查是否达到阈值
+ if self._lian_xu_bu_hui_fu_ci_shu >= self.CONSECUTIVE_NO_REPLY_THRESHOLD:
+ logger.info(
+ f"{self.log_prefix} 连续不回复达到阈值 ({self._lian_xu_bu_hui_fu_ci_shu}次),调用回调请求状态转换"
+ )
+ # 调用回调。注意:这里不重置计数器,依赖回调函数成功改变状态来隐式重置上下文。
+ await self.on_consecutive_no_reply_callback()
+
+ return True
+
except asyncio.CancelledError:
- logger.info(f"{self.log_prefix} 等待被中断")
+ # 如果在等待过程中任务被取消(可能是因为 shutdown)
+ logger.info(f"{self.log_prefix} 处理 'no_reply' 时等待被中断 (CancelledError)")
+ # 让异常向上传播,由 _hfc_loop 的异常处理逻辑接管
raise
+ except Exception as e: # 捕获调用管理器或其他地方可能发生的错误
+ logger.error(f"{self.log_prefix} 处理 'no_reply' 时发生错误: {e}")
+ logger.error(traceback.format_exc())
+ # 发生意外错误时,可以选择是否重置计数器,这里选择不重置
+ return False # 表示动作未成功
async def _wait_for_new_message(self, observation, planner_start_db_time: float, log_prefix: str) -> bool:
"""
- 等待新消息
+ 等待新消息 或 检测到关闭信号
参数:
observation: 观察实例
@@ -576,19 +635,36 @@ class HeartFChatting:
log_prefix: 日志前缀
返回:
- bool: 是否检测到新消息
+ bool: 是否检测到新消息 (如果因关闭信号退出则返回 False)
"""
wait_start_time = time.monotonic()
while True:
+ # --- 在每次循环开始时检查关闭标志 ---
+ if self._shutting_down:
+ logger.info(f"{log_prefix} 等待新消息时检测到关闭信号,中断等待。")
+ return False # 表示因为关闭而退出
+ # -----------------------------------
+
+ # 检查新消息
if await observation.has_new_messages_since(planner_start_db_time):
logger.info(f"{log_prefix} 检测到新消息")
return True
+ # 检查超时 (放在检查新消息和关闭之后)
if time.monotonic() - wait_start_time > 120:
- logger.warning(f"{log_prefix} 等待超时(120秒)")
+ logger.warning(f"{log_prefix} 等待新消息超时(120秒)")
return False
- await asyncio.sleep(1.5)
+ try:
+ # 短暂休眠,让其他任务有机会运行,并能更快响应取消或关闭
+ await asyncio.sleep(0.5) # 缩短休眠时间
+ except asyncio.CancelledError:
+ # 如果在休眠时被取消,再次检查关闭标志
+ # 如果是正常关闭,则不需要警告
+ if not self._shutting_down:
+ logger.warning(f"{log_prefix} _wait_for_new_message 的休眠被意外取消")
+ # 无论如何,重新抛出异常,让上层处理
+ raise
async def _log_cycle_timers(self, cycle_timers: dict, log_prefix: str):
"""记录循环周期的计时器结果"""
@@ -599,7 +675,9 @@ class HeartFChatting:
timer_strings.append(f"{name}: {formatted_time}")
if timer_strings:
- logger.debug(f"{log_prefix} 该次决策耗时: {'; '.join(timer_strings)}")
+ # 在记录前检查关闭标志
+ if not self._shutting_down:
+ logger.debug(f"{log_prefix} 该次决策耗时: {'; '.join(timer_strings)}")
async def _handle_cycle_delay(self, action_taken_this_cycle: bool, cycle_start_time: float, log_prefix: str):
"""处理循环延迟"""
@@ -835,6 +913,7 @@ class HeartFChatting:
async def shutdown(self):
"""优雅关闭HeartFChatting实例,取消活动循环任务"""
logger.info(f"{self.log_prefix} 正在关闭HeartFChatting...")
+ self._shutting_down = True # <-- 在开始关闭时设置标志位
# 取消循环任务
if self._loop_task and not self._loop_task.done():
@@ -865,6 +944,25 @@ class HeartFChatting:
action=action,
reasoning=reasoning,
)
+
+ # 在记录循环日志前检查关闭标志
+ if not self._shutting_down:
+ self._current_cycle.complete_cycle()
+ self._cycle_history.append(self._current_cycle)
+
+ # 记录循环信息和计时器结果
+ timer_strings = []
+ for name, elapsed in self._current_cycle.timers.items():
+ formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒"
+ timer_strings.append(f"{name}: {formatted_time}")
+
+ logger.debug(
+ f"{self.log_prefix} 第 #{self._current_cycle.cycle_id}次思考完成,"
+ f"耗时: {self._current_cycle.end_time - self._current_cycle.start_time:.2f}秒, "
+ f"动作: {self._current_cycle.action_type}"
+ + (f"\n计时器详情: {'; '.join(timer_strings)}" if timer_strings else "")
+ )
+
return prompt
async def _build_planner_prompt(
diff --git a/src/plugins/models/utils_model.py b/src/plugins/models/utils_model.py
index 2cab7b629..7c87cf946 100644
--- a/src/plugins/models/utils_model.py
+++ b/src/plugins/models/utils_model.py
@@ -178,395 +178,6 @@ class LLMRequest:
output_cost = (completion_tokens / 1000000) * self.pri_out
return round(input_cost + output_cost, 6)
- '''
- async def _execute_request(
- self,
- endpoint: str,
- prompt: str = None,
- image_base64: str = None,
- image_format: str = None,
- payload: dict = None,
- retry_policy: dict = None,
- response_handler: callable = None,
- user_id: str = "system",
- request_type: str = None,
- ):
- """统一请求执行入口
- Args:
- endpoint: API端点路径 (如 "chat/completions")
- prompt: prompt文本
- image_base64: 图片的base64编码
- image_format: 图片格式
- payload: 请求体数据
- retry_policy: 自定义重试策略
- response_handler: 自定义响应处理器
- user_id: 用户ID
- request_type: 请求类型
- """
-
- if request_type is None:
- request_type = self.request_type
-
- # 合并重试策略
- default_retry = {
- "max_retries": 3,
- "base_wait": 10,
- "retry_codes": [429, 413, 500, 503],
- "abort_codes": [400, 401, 402, 403],
- }
- policy = {**default_retry, **(retry_policy or {})}
-
- # 常见Error Code Mapping
- error_code_mapping = {
- 400: "参数不正确",
- 401: "API key 错误,认证失败,请检查/config/bot_config.toml和.env中的配置是否正确哦~",
- 402: "账号余额不足",
- 403: "需要实名,或余额不足",
- 404: "Not Found",
- 429: "请求过于频繁,请稍后再试",
- 500: "服务器内部故障",
- 503: "服务器负载过高",
- }
-
- api_url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
- # 判断是否为流式
- stream_mode = self.stream
- # logger_msg = "进入流式输出模式," if stream_mode else ""
- # logger.debug(f"{logger_msg}发送请求到URL: {api_url}")
- # logger.info(f"使用模型: {self.model_name}")
-
- # 构建请求体
- if image_base64:
- payload = await self._build_payload(prompt, image_base64, image_format)
- elif payload is None:
- payload = await self._build_payload(prompt)
-
- # 流式输出标志
- # 先构建payload,再添加流式输出标志
- if stream_mode:
- payload["stream"] = stream_mode
-
- for retry in range(policy["max_retries"]):
- try:
- # 使用上下文管理器处理会话
- headers = await self._build_headers()
- # 似乎是openai流式必须要的东西,不过阿里云的qwq-plus加了这个没有影响
- if stream_mode:
- headers["Accept"] = "text/event-stream"
-
- async with aiohttp.ClientSession() as session:
- try:
- async with session.post(api_url, headers=headers, json=payload) as response:
- # 处理需要重试的状态码
- if response.status in policy["retry_codes"]:
- wait_time = policy["base_wait"] * (2**retry)
- logger.warning(
- f"模型 {self.model_name} 错误码: {response.status}, 等待 {wait_time}秒后重试"
- )
- if response.status == 413:
- logger.warning("请求体过大,尝试压缩...")
- image_base64 = compress_base64_image_by_scale(image_base64)
- payload = await self._build_payload(prompt, image_base64, image_format)
- elif response.status in [500, 503]:
- logger.error(
- f"模型 {self.model_name} 错误码: {response.status} - {error_code_mapping.get(response.status)}"
- )
- raise RuntimeError("服务器负载过高,模型恢复失败QAQ")
- else:
- logger.warning(f"模型 {self.model_name} 请求限制(429),等待{wait_time}秒后重试...")
-
- await asyncio.sleep(wait_time)
- continue
- elif response.status in policy["abort_codes"]:
- logger.error(
- f"模型 {self.model_name} 错误码: {response.status} - {error_code_mapping.get(response.status)}"
- )
- # 尝试获取并记录服务器返回的详细错误信息
- try:
- error_json = await response.json()
- if error_json and isinstance(error_json, list) and len(error_json) > 0:
- for error_item in error_json:
- if "error" in error_item and isinstance(error_item["error"], dict):
- error_obj = error_item["error"]
- error_code = error_obj.get("code")
- error_message = error_obj.get("message")
- error_status = error_obj.get("status")
- logger.error(
- f"服务器错误详情: 代码={error_code}, 状态={error_status}, "
- f"消息={error_message}"
- )
- elif isinstance(error_json, dict) and "error" in error_json:
- # 处理单个错误对象的情况
- error_obj = error_json.get("error", {})
- error_code = error_obj.get("code")
- error_message = error_obj.get("message")
- error_status = error_obj.get("status")
- logger.error(
- f"服务器错误详情: 代码={error_code}, 状态={error_status}, 消息={error_message}"
- )
- else:
- # 记录原始错误响应内容
- logger.error(f"服务器错误响应: {error_json}")
- except Exception as e:
- logger.warning(f"无法解析服务器错误响应: {str(e)}")
-
- if response.status == 403:
- # 只针对硅基流动的V3和R1进行降级处理
- if (
- self.model_name.startswith("Pro/deepseek-ai")
- and self.base_url == "https://api.siliconflow.cn/v1/"
- ):
- old_model_name = self.model_name
- self.model_name = self.model_name[4:] # 移除"Pro/"前缀
- logger.warning(
- f"检测到403错误,模型从 {old_model_name} 降级为 {self.model_name}"
- )
-
- # 对全局配置进行更新
- if global_config.llm_normal.get("name") == old_model_name:
- global_config.llm_normal["name"] = self.model_name
- logger.warning(f"将全局配置中的 llm_normal 模型临时降级至{self.model_name}")
-
- if global_config.llm_reasoning.get("name") == old_model_name:
- global_config.llm_reasoning["name"] = self.model_name
- logger.warning(
- f"将全局配置中的 llm_reasoning 模型临时降级至{self.model_name}"
- )
-
- # 更新payload中的模型名
- if payload and "model" in payload:
- payload["model"] = self.model_name
-
- # 重新尝试请求
- retry -= 1 # 不计入重试次数
- continue
-
- raise RuntimeError(f"请求被拒绝: {error_code_mapping.get(response.status)}")
-
- response.raise_for_status()
- reasoning_content = ""
-
- # 将流式输出转化为非流式输出
- if stream_mode:
- flag_delta_content_finished = False
- accumulated_content = ""
- usage = None # 初始化usage变量,避免未定义错误
-
- async for line_bytes in response.content:
- try:
- line = line_bytes.decode("utf-8").strip()
- if not line:
- continue
- if line.startswith("data:"):
- data_str = line[5:].strip()
- if data_str == "[DONE]":
- break
- try:
- chunk = json.loads(data_str)
- if flag_delta_content_finished:
- chunk_usage = chunk.get("usage", None)
- if chunk_usage:
- usage = chunk_usage # 获取token用量
- else:
- delta = chunk["choices"][0]["delta"]
- delta_content = delta.get("content")
- if delta_content is None:
- delta_content = ""
- accumulated_content += delta_content
- # 检测流式输出文本是否结束
- finish_reason = chunk["choices"][0].get("finish_reason")
- if delta.get("reasoning_content", None):
- reasoning_content += delta["reasoning_content"]
- if finish_reason == "stop":
- chunk_usage = chunk.get("usage", None)
- if chunk_usage:
- usage = chunk_usage
- break
- # 部分平台在文本输出结束前不会返回token用量,此时需要再获取一次chunk
- flag_delta_content_finished = True
-
- except Exception as e:
- logger.exception(f"模型 {self.model_name} 解析流式输出错误: {str(e)}")
- except GeneratorExit:
- logger.warning("模型 {self.model_name} 流式输出被中断,正在清理资源...")
- # 确保资源被正确清理
- await response.release()
- # 返回已经累积的内容
- result = {
- "choices": [
- {
- "message": {
- "content": accumulated_content,
- "reasoning_content": reasoning_content,
- # 流式输出可能没有工具调用,此处不需要添加tool_calls字段
- }
- }
- ],
- "usage": usage,
- }
- return (
- response_handler(result)
- if response_handler
- else self._default_response_handler(result, user_id, request_type, endpoint)
- )
- except Exception as e:
- logger.error(f"模型 {self.model_name} 处理流式输出时发生错误: {str(e)}")
- # 确保在发生错误时也能正确清理资源
- try:
- await response.release()
- except Exception as cleanup_error:
- logger.error(f"清理资源时发生错误: {cleanup_error}")
- # 返回已经累积的内容
- result = {
- "choices": [
- {
- "message": {
- "content": accumulated_content,
- "reasoning_content": reasoning_content,
- # 流式输出可能没有工具调用,此处不需要添加tool_calls字段
- }
- }
- ],
- "usage": usage,
- }
- return (
- response_handler(result)
- if response_handler
- else self._default_response_handler(result, user_id, request_type, endpoint)
- )
- content = accumulated_content
- think_match = re.search(r"(.*?)", content, re.DOTALL)
- if think_match:
- reasoning_content = think_match.group(1).strip()
- content = re.sub(r".*?", "", content, flags=re.DOTALL).strip()
- # 构造一个伪result以便调用自定义响应处理器或默认处理器
- result = {
- "choices": [
- {
- "message": {
- "content": content,
- "reasoning_content": reasoning_content,
- # 流式输出可能没有工具调用,此处不需要添加tool_calls字段
- }
- }
- ],
- "usage": usage,
- }
- return (
- response_handler(result)
- if response_handler
- else self._default_response_handler(result, user_id, request_type, endpoint)
- )
- else:
- result = await response.json()
- # 使用自定义处理器或默认处理
- return (
- response_handler(result)
- if response_handler
- else self._default_response_handler(result, user_id, request_type, endpoint)
- )
-
- except (aiohttp.ClientError, asyncio.TimeoutError) as e:
- if retry < policy["max_retries"] - 1:
- wait_time = policy["base_wait"] * (2**retry)
- logger.error(f"模型 {self.model_name} 网络错误,等待{wait_time}秒后重试... 错误: {str(e)}")
- await asyncio.sleep(wait_time)
- continue
- else:
- logger.critical(f"模型 {self.model_name} 网络错误达到最大重试次数: {str(e)}")
- raise RuntimeError(f"网络请求失败: {str(e)}") from e
- except Exception as e:
- logger.critical(f"模型 {self.model_name} 未预期的错误: {str(e)}")
- raise RuntimeError(f"请求过程中发生错误: {str(e)}") from e
-
- except aiohttp.ClientResponseError as e:
- # 处理aiohttp抛出的响应错误
- if retry < policy["max_retries"] - 1:
- wait_time = policy["base_wait"] * (2**retry)
- logger.error(
- f"模型 {self.model_name} HTTP响应错误,等待{wait_time}秒后重试... 状态码: {e.status}, 错误: {e.message}"
- )
- try:
- if hasattr(e, "response") and e.response and hasattr(e.response, "text"):
- error_text = await e.response.text()
- try:
- error_json = json.loads(error_text)
- if isinstance(error_json, list) and len(error_json) > 0:
- for error_item in error_json:
- if "error" in error_item and isinstance(error_item["error"], dict):
- error_obj = error_item["error"]
- logger.error(
- f"模型 {self.model_name} 服务器错误详情: 代码={error_obj.get('code')}, "
- f"状态={error_obj.get('status')}, "
- f"消息={error_obj.get('message')}"
- )
- elif isinstance(error_json, dict) and "error" in error_json:
- error_obj = error_json.get("error", {})
- logger.error(
- f"模型 {self.model_name} 服务器错误详情: 代码={error_obj.get('code')}, "
- f"状态={error_obj.get('status')}, "
- f"消息={error_obj.get('message')}"
- )
- else:
- logger.error(f"模型 {self.model_name} 服务器错误响应: {error_json}")
- except (json.JSONDecodeError, TypeError) as json_err:
- logger.warning(
- f"模型 {self.model_name} 响应不是有效的JSON: {str(json_err)}, 原始内容: {error_text[:200]}"
- )
- except (AttributeError, TypeError, ValueError) as parse_err:
- logger.warning(f"模型 {self.model_name} 无法解析响应错误内容: {str(parse_err)}")
-
- await asyncio.sleep(wait_time)
- else:
- logger.critical(
- f"模型 {self.model_name} HTTP响应错误达到最大重试次数: 状态码: {e.status}, 错误: {e.message}"
- )
- # 安全地检查和记录请求详情
- if (
- image_base64
- and payload
- and isinstance(payload, dict)
- and "messages" in payload
- and len(payload["messages"]) > 0
- ):
- if isinstance(payload["messages"][0], dict) and "content" in payload["messages"][0]:
- content = payload["messages"][0]["content"]
- if isinstance(content, list) and len(content) > 1 and "image_url" in content[1]:
- payload["messages"][0]["content"][1]["image_url"]["url"] = (
- f"data:image/{image_format.lower() if image_format else 'jpeg'};base64,"
- f"{image_base64[:10]}...{image_base64[-10:]}"
- )
- logger.critical(f"请求头: {await self._build_headers(no_key=True)} 请求体: {payload}")
- raise RuntimeError(f"模型 {self.model_name} API请求失败: 状态码 {e.status}, {e.message}") from e
- except Exception as e:
- if retry < policy["max_retries"] - 1:
- wait_time = policy["base_wait"] * (2**retry)
- logger.error(f"模型 {self.model_name} 请求失败,等待{wait_time}秒后重试... 错误: {str(e)}")
- await asyncio.sleep(wait_time)
- else:
- logger.critical(f"模型 {self.model_name} 请求失败: {str(e)}")
- # 安全地检查和记录请求详情
- if (
- image_base64
- and payload
- and isinstance(payload, dict)
- and "messages" in payload
- and len(payload["messages"]) > 0
- ):
- if isinstance(payload["messages"][0], dict) and "content" in payload["messages"][0]:
- content = payload["messages"][0]["content"]
- if isinstance(content, list) and len(content) > 1 and "image_url" in content[1]:
- payload["messages"][0]["content"][1]["image_url"]["url"] = (
- f"data:image/{image_format.lower() if image_format else 'jpeg'};base64,"
- f"{image_base64[:10]}...{image_base64[-10:]}"
- )
- logger.critical(f"请求头: {await self._build_headers(no_key=True)} 请求体: {payload}")
- raise RuntimeError(f"模型 {self.model_name} API请求失败: {str(e)}") from e
-
- logger.error(f"模型 {self.model_name} 达到最大重试次数,请求仍然失败")
- raise RuntimeError(f"模型 {self.model_name} 达到最大重试次数,API请求仍然失败")
- '''
-
async def _prepare_request(
self,
endpoint: str,
@@ -820,6 +431,7 @@ class LLMRequest:
policy = request_content["policy"]
payload = request_content["payload"]
wait_time = policy["base_wait"] * (2**retry_count)
+ keep_request = False
if retry_count < policy["max_retries"] - 1:
keep_request = True
if isinstance(exception, RequestAbortException):