Merge branch 'MaiM-with-u:dev' into dev

This commit is contained in:
未來星織
2025-04-26 23:00:45 +09:00
committed by GitHub
8 changed files with 377 additions and 641 deletions

View File

@@ -337,7 +337,7 @@ REMOTE_STYLE_CONFIG = {
"file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 远程 | {message}",
},
"simple": {
"console_format": "<level>{time:MM-DD HH:mm}</level> | <light-yellow>远程</light-yellow> | {message}",
"console_format": "<level>{time:MM-DD HH:mm}</level> | <fg #00788A>远程| {message}</fg #00788A>",
"file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 远程 | {message}",
},
}

View File

@@ -61,6 +61,7 @@ c HeartFChatting工作方式
c.5.2.2 通过 HeartFCSender 直接发送匹配查询 (emoji_query) 的表情。
c.5.3 如果决策是 no_reply:
c.5.3.1 进入等待状态,直到检测到新消息或超时。
c.5.3.2 同时,增加内部连续不回复计数器。如果该计数器达到预设阈值(例如 5 次),则调用初始化时由 `SubHeartflowManager` 提供的回调函数。此回调函数会通知 `SubHeartflowManager` 请求将对应的 `SubHeartflow` 状态转换为 `ABSENT`。如果执行了其他动作(如 `text_reply``emoji_reply`),则此计数器会被重置。
c.6 循环结束后,记录周期信息 (CycleInfo)并根据情况进行短暂休眠防止CPU空转。
@@ -152,7 +153,7 @@ c HeartFChatting工作方式
- **状态转换机制** (由 `SubHeartflowManager` 驱动):
- **激活 `CHAT`**: 当 `Heartflow` 状态从 `OFFLINE` 变为允许聊天的状态时,`SubHeartflowManager` 会根据限制(通过 `self.mai_state_info` 获取),选择部分 `ABSENT` 状态的子心流,**检查当前 CHAT 状态数量是否达到上限**,如果未达上限,则调用其 `change_chat_state` 方法将其转换为 `CHAT`。此外,`evaluate_and_transition_subflows_by_llm` 方法也会根据 LLM 的判断,在未达上限时将 `ABSENT` 状态的子心流激活为 `CHAT`
- **激活 `FOCUSED`**: `SubHeartflowManager` 会定期评估处于 `CHAT` 状态的子心流的兴趣度 (`InterestChatting.start_hfc_probability`),若满足条件且**检查当前 FOCUSED 状态数量未达上限**(通过 `self.mai_state_info` 获取限制),则调用 `change_chat_state` 将其提升为 `FOCUSED`
- **停用/回退**: `SubHeartflowManager` 可能因 `Heartflow` 状态变化、达到数量限制、长时间不活跃、随机概率 (`randomly_deactivate_subflows`)LLM 评估 (`evaluate_and_transition_subflows_by_llm` 判断 `CHAT` 状态子心流应休眠) 等原因,调用 `change_chat_state` 将子心流状态设置为 `ABSENT` 或从 `FOCUSED` 回退到 `CHAT`。当子心流进入 `ABSENT` 状态后,如果持续一小时不活跃,才会被后台清理任务删除。
- **停用/回退**: `SubHeartflowManager` 可能因 `Heartflow` 状态变化、达到数量限制、长时间不活跃、随机概率 (`randomly_deactivate_subflows`)LLM 评估 (`evaluate_and_transition_subflows_by_llm` 判断 `CHAT` 状态子心流应休眠) 或收到来自 `HeartFChatting` 的连续不回复回调信号 (`request_absent_transition`) 等原因,调用 `change_chat_state` 将子心流状态设置为 `ABSENT` 或从 `FOCUSED` 回退到 `CHAT`。当子心流进入 `ABSENT` 状态后,如果持续一小时不活跃,才会被后台清理任务删除。
- **注意**: `change_chat_state` 方法本身只负责执行状态转换和管理内部聊天实例(`NormalChatInstance`/`HeartFlowChatInstance`),不再进行限额检查。限额检查的责任完全由调用方(即 `SubHeartflowManager` 中的相关方法,这些方法会使用内部存储的 `mai_state_info` 来获取限制)承担。
## 3. 聊天实例详解 (Chat Instances Explained)

View File

@@ -112,15 +112,6 @@ class BackgroundTaskManager:
f"兴趣评估任务已启动 间隔:{self.interest_eval_interval}s",
"_interest_eval_task",
),
# 新增随机停用任务配置
(
self._random_deactivation_task,
self._run_random_deactivation_cycle,
"hf_random_deactivation",
"debug", # 设为debug避免过多日志
f"随机停用任务已启动 间隔:{self.random_deactivation_interval}s",
"_random_deactivation_task",
),
]
# 统一启动所有任务
@@ -264,11 +255,6 @@ class BackgroundTaskManager:
# --- 结束新增 ---
# --- 新增随机停用工作函数 ---
async def _perform_random_deactivation_work(self):
"""执行一轮子心流随机停用检查。"""
await self.subheartflow_manager.randomly_deactivate_subflows()
# --- 结束新增 ---
# --- Specific Task Runners --- #
@@ -299,16 +285,3 @@ class BackgroundTaskManager:
interval=self.interest_eval_interval,
task_func=self._perform_interest_eval_work,
)
# --- 结束新增 ---
# --- 新增随机停用任务运行器 ---
async def _run_random_deactivation_cycle(self):
"""运行随机停用循环。"""
await self._run_periodic_loop(
task_name="Random Deactivation",
interval=self.random_deactivation_interval,
task_func=self._perform_random_deactivation_work,
)
# --- 结束新增 ---

View File

@@ -2,7 +2,7 @@ from .observation import Observation, ChattingObservation
import asyncio
from src.config.config import global_config
import time
from typing import Optional, List, Dict, Tuple
from typing import Optional, List, Dict, Tuple, Callable, Coroutine
import traceback
from src.common.logger import get_module_logger, LogConfig, SUB_HEARTFLOW_STYLE_CONFIG # noqa: E402
import random
@@ -15,6 +15,11 @@ from src.heart_flow.mai_state_manager import MaiStateInfo
from src.heart_flow.chat_state_info import ChatState, ChatStateInfo
from src.heart_flow.sub_mind import SubMind
# # --- REMOVE: Conditional import --- #
# if TYPE_CHECKING:
# from src.heart_flow.subheartflow_manager import SubHeartflowManager
# # --- END REMOVE --- #
# 定义常量 (从 interest.py 移动过来)
MAX_INTEREST = 15.0
@@ -234,16 +239,23 @@ class InterestChatting:
class SubHeartflow:
def __init__(self, subheartflow_id, mai_states: MaiStateInfo):
def __init__(
self,
subheartflow_id,
mai_states: MaiStateInfo,
hfc_no_reply_callback: Callable[[], Coroutine[None, None, None]],
):
"""子心流初始化函数
Args:
subheartflow_id: 子心流唯一标识符
parent_heartflow: 父级心流实例
mai_states: 麦麦状态信息实例
hfc_no_reply_callback: HFChatting 连续不回复时触发的回调
"""
# 基础属性,两个值是一样的
self.subheartflow_id = subheartflow_id
self.chat_id = subheartflow_id
self.hfc_no_reply_callback = hfc_no_reply_callback
# 麦麦的状态
self.mai_states = mai_states
@@ -364,11 +376,17 @@ class SubHeartflow:
# 如果实例不存在,则创建并启动
logger.info(f"{log_prefix} 麦麦准备开始专注聊天 (创建新实例)...")
try:
# 创建 HeartFChatting 实例,并传递 从构造函数传入的 回调函数
self.heart_fc_instance = HeartFChatting(
chat_id=self.chat_id, sub_mind=self.sub_mind, observations=self.observations
chat_id=self.subheartflow_id,
sub_mind=self.sub_mind,
observations=self.observations, # 传递所有观察者
on_consecutive_no_reply_callback=self.hfc_no_reply_callback, # <-- Use stored callback
)
# 初始化并启动 HeartFChatting
if await self.heart_fc_instance._initialize():
await self.heart_fc_instance.start() # 初始化成功后启动循环
await self.heart_fc_instance.start()
logger.info(f"{log_prefix} 麦麦已成功进入专注聊天模式 (新实例已启动)。")
return True
else:

View File

@@ -103,7 +103,7 @@ class SubMind:
individuality = Individuality.get_instance()
# 构建个性部分
prompt_personality = f"的名字是{individuality.personality.bot_nickname},你"
prompt_personality = f"正在扮演名为{individuality.personality.bot_nickname}的人类,你"
prompt_personality += individuality.personality.personality_core
# 随机添加个性侧面

View File

@@ -3,6 +3,7 @@ import time
import random
from typing import Dict, Any, Optional, List
import json # 导入 json 模块
import functools # <-- 新增导入
# 导入日志模块
from src.common.logger import get_module_logger, LogConfig, SUBHEARTFLOW_MANAGER_STYLE_CONFIG
@@ -18,9 +19,10 @@ from .observation import ChattingObservation
# 导入LLM请求工具
from src.plugins.models.utils_model import LLMRequest
from src.config.config import global_config
from src.individuality.individuality import Individuality
import traceback
# 初始化日志记录器
subheartflow_manager_log_config = LogConfig(
@@ -77,8 +79,17 @@ class SubHeartflowManager:
return subflow
try:
# 初始化子心流, 传入存储的 mai_state_info
new_subflow = SubHeartflow(subheartflow_id, self.mai_state_info)
# --- 使用 functools.partial 创建 HFC 回调 --- #
# 将 manager 的 _handle_hfc_no_reply 方法与当前的 subheartflow_id 绑定
hfc_callback = functools.partial(self._handle_hfc_no_reply, subheartflow_id)
# --- 结束创建回调 --- #
# 初始化子心流, 传入 mai_state_info 和 partial 创建的回调
new_subflow = SubHeartflow(
subheartflow_id,
self.mai_state_info,
hfc_callback, # <-- 传递 partial 创建的回调
)
# 异步初始化
await new_subflow.initialize()
@@ -100,24 +111,56 @@ class SubHeartflowManager:
logger.error(f"创建子心流 {subheartflow_id} 失败: {e}", exc_info=True)
return None
# --- 新增:内部方法,用于尝试将单个子心流设置为 ABSENT ---
async def _try_set_subflow_absent_internal(self, subflow: "SubHeartflow", log_prefix: str) -> bool:
"""
尝试将给定的子心流对象状态设置为 ABSENT (内部方法,不处理锁)。
Args:
subflow: 子心流对象。
log_prefix: 用于日志记录的前缀 (例如 "[子心流管理]""[停用]")。
Returns:
bool: 如果状态成功变为 ABSENT 或原本就是 ABSENT返回 True否则返回 False。
"""
flow_id = subflow.subheartflow_id
stream_name = chat_manager.get_stream_name(flow_id) or flow_id
if subflow.chat_state.chat_status != ChatState.ABSENT:
logger.debug(f"{log_prefix} 设置 {stream_name} 状态为 ABSENT")
try:
await subflow.change_chat_state(ChatState.ABSENT)
# 再次检查以确认状态已更改 (change_chat_state 内部应确保)
if subflow.chat_state.chat_status == ChatState.ABSENT:
return True
else:
logger.warning(
f"{log_prefix} 调用 change_chat_state 后,{stream_name} 状态仍为 {subflow.chat_state.chat_status.value}"
)
return False
except Exception as e:
logger.error(f"{log_prefix} 设置 {stream_name} 状态为 ABSENT 时失败: {e}", exc_info=True)
return False
else:
logger.debug(f"{log_prefix} {stream_name} 已是 ABSENT 状态")
return True # 已经是目标状态,视为成功
# --- 结束新增 ---
async def sleep_subheartflow(self, subheartflow_id: Any, reason: str) -> bool:
"""停止指定的子心流并清理资源"""
subheartflow = self.subheartflows.get(subheartflow_id)
if not subheartflow:
return False
"""停止指定的子心流并将其状态设置为 ABSENT"""
log_prefix = "[子心流管理]"
async with self._lock: # 加锁以安全访问字典
subheartflow = self.subheartflows.get(subheartflow_id)
stream_name = chat_manager.get_stream_name(subheartflow_id) or subheartflow_id
logger.info(f"[子心流管理] 正在停止 {stream_name}, 原因: {reason}")
stream_name = chat_manager.get_stream_name(subheartflow_id) or subheartflow_id
logger.info(f"{log_prefix} 正在停止 {stream_name}, 原因: {reason}")
try:
# 设置状态为ABSENT释放资源
if subheartflow.chat_state.chat_status != ChatState.ABSENT:
logger.debug(f"[子心流管理] 设置 {stream_name} 状态为ABSENT")
await subheartflow.change_chat_state(ChatState.ABSENT)
else:
logger.debug(f"[子心流管理] {stream_name} 已是ABSENT状态")
except Exception as e:
logger.error(f"[子心流管理] 设置ABSENT状态失败: {e}")
# 调用内部方法处理状态变更
success = await self._try_set_subflow_absent_internal(subheartflow, log_prefix)
return success
# 锁在此处自动释放
def get_inactive_subheartflows(self, max_age_seconds=INACTIVE_THRESHOLD_SECONDS):
"""识别并返回需要清理的不活跃(处于ABSENT状态超过一小时)子心流(id, 原因)"""
@@ -185,174 +228,107 @@ class SubHeartflowManager:
async def deactivate_all_subflows(self):
"""将所有子心流的状态更改为 ABSENT (例如主状态变为OFFLINE时调用)"""
# logger.info("[停用] 开始将所有子心流状态设置为 ABSENT")
# 使用 list() 创建一个当前值的快照,防止在迭代时修改字典
flows_to_update = list(self.subheartflows.values())
if not flows_to_update:
logger.debug("[停用] 无活跃子心流,无需操作")
return
log_prefix = "[停用]"
changed_count = 0
for subflow in flows_to_update:
flow_id = subflow.subheartflow_id
stream_name = chat_manager.get_stream_name(flow_id) or flow_id
# 再次检查子心流是否仍然存在于管理器中,以防万一在迭代过程中被移除
processed_count = 0
if subflow.chat_state.chat_status != ChatState.ABSENT:
logger.debug(
f"正在将子心流 {stream_name} 的状态从 {subflow.chat_state.chat_status.value} 更改为 ABSENT"
)
try:
# 调用 change_chat_state 将状态设置为 ABSENT
await subflow.change_chat_state(ChatState.ABSENT)
# 验证状态是否真的改变了
if (
flow_id in self.subheartflows
and self.subheartflows[flow_id].chat_state.chat_status == ChatState.ABSENT
):
async with self._lock: # 获取锁以安全迭代
# 使用 list() 创建一个当前值的快照,防止在迭代时修改字典
flows_to_update = list(self.subheartflows.values())
processed_count = len(flows_to_update)
if not flows_to_update:
logger.debug(f"{log_prefix} 无活跃子心流,无需操作")
return
for subflow in flows_to_update:
# 记录原始状态,以便统计实际改变的数量
original_state_was_absent = subflow.chat_state.chat_status == ChatState.ABSENT
success = await self._try_set_subflow_absent_internal(subflow, log_prefix)
# 如果成功设置为 ABSENT 且原始状态不是 ABSENT则计数
if success and not original_state_was_absent:
if subflow.chat_state.chat_status == ChatState.ABSENT:
changed_count += 1
else:
logger.warning(
f"[停用] 尝试更改子心流 {stream_name} 状态后,状态仍未变为 ABSENT 或子心流已消失。"
)
except Exception as e:
logger.error(f"[停用] 更改子心流 {stream_name} 状态为 ABSENT 时出错: {e}", exc_info=True)
else:
logger.debug(f"[停用] 子心流 {stream_name} 已处于 ABSENT 状态,无需更改。")
# 这种情况理论上不应发生,如果内部方法返回 True 的话
stream_name = chat_manager.get_stream_name(subflow.subheartflow_id) or subflow.subheartflow_id
logger.warning(f"{log_prefix} 内部方法声称成功但 {stream_name} 状态未变为 ABSENT。")
# 锁在此处自动释放
logger.info(
f"下限完成,共处理 {len(flows_to_update)} 个子心流,成功将 {changed_count} 个子心流的状态更改为 ABSENT。"
f"{log_prefix} 完成,共处理 {processed_count} 个子心流,成功将 {changed_count}非 ABSENT 子心流的状态更改为 ABSENT。"
)
async def evaluate_interest_and_promote(self):
"""评估子心流兴趣度满足条件且未达上限则提升到FOCUSED状态基于start_hfc_probability"""
log_prefix = "[兴趣评估]"
# 使用 self.mai_state_info 获取当前状态和限制
current_state = self.mai_state_info.get_current_state()
focused_limit = current_state.get_focused_chat_max_num()
if int(time.time()) % 20 == 0: # 每20秒输出一次
logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 可以在{focused_limit}个群激情聊天")
if focused_limit <= 0:
# logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 不允许 FOCUSED 子心流")
return
current_focused_count = self.count_subflows_by_state(ChatState.FOCUSED)
if current_focused_count >= focused_limit:
logger.debug(f"{log_prefix} 已达专注上限 ({current_focused_count}/{focused_limit})")
return
for sub_hf in list(self.subheartflows.values()):
flow_id = sub_hf.subheartflow_id
stream_name = chat_manager.get_stream_name(flow_id) or flow_id
# 跳过非CHAT状态或已经是FOCUSED状态的子心流
if sub_hf.chat_state.chat_status == ChatState.FOCUSED:
continue
from .mai_state_manager import enable_unlimited_hfc_chat
if not enable_unlimited_hfc_chat:
if sub_hf.chat_state.chat_status != ChatState.CHAT:
continue
# 检查是否满足提升概率
if random.random() >= sub_hf.interest_chatting.start_hfc_probability:
continue
# 再次检查是否达到上限
if current_focused_count >= focused_limit:
logger.debug(f"{log_prefix} [{stream_name}] 已达专注上限")
break
# 获取最新状态并执行提升
current_subflow = self.subheartflows.get(flow_id)
if not current_subflow:
continue
logger.info(
f"{log_prefix} [{stream_name}] 触发 认真水群 (概率={current_subflow.interest_chatting.start_hfc_probability:.2f})"
)
# 执行状态提升
await current_subflow.change_chat_state(ChatState.FOCUSED)
# 验证提升结果
if (
final_subflow := self.subheartflows.get(flow_id)
) and final_subflow.chat_state.chat_status == ChatState.FOCUSED:
current_focused_count += 1
async def randomly_deactivate_subflows(self, deactivation_probability: float = 0.1):
"""以一定概率将 FOCUSED 或 CHAT 状态的子心流回退到 ABSENT 状态。"""
log_prefix_manager = "[子心流管理器-随机停用]"
logger.debug(f"{log_prefix_manager} 开始随机停用检查... (概率: {deactivation_probability:.0%})")
# 使用快照安全遍历
subflows_snapshot = list(self.subheartflows.values())
deactivated_count = 0
try:
for sub_hf in subflows_snapshot:
log_prefix = "[兴趣评估]"
# 使用 self.mai_state_info 获取当前状态和限制
current_state = self.mai_state_info.get_current_state()
focused_limit = current_state.get_focused_chat_max_num()
logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 开始尝试提升到FOCUSED状态")
if int(time.time()) % 20 == 0: # 每20秒输出一次
logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 可以在{focused_limit}个群激情聊天")
if focused_limit <= 0:
# logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 不允许 FOCUSED 子心流")
return
current_focused_count = self.count_subflows_by_state(ChatState.FOCUSED)
if current_focused_count >= focused_limit:
logger.debug(f"{log_prefix} 已达专注上限 ({current_focused_count}/{focused_limit})")
return
for sub_hf in list(self.subheartflows.values()):
flow_id = sub_hf.subheartflow_id
stream_name = chat_manager.get_stream_name(flow_id) or flow_id
log_prefix_flow = f"[{stream_name}]"
current_state = sub_hf.chat_state.chat_status
# 只处理 FOCUSED 或 CHAT 状态
if current_state not in [ChatState.FOCUSED, ChatState.CHAT]:
# 跳过非CHAT状态或已经是FOCUSED状态的子心流
if sub_hf.chat_state.chat_status == ChatState.FOCUSED:
continue
# 检查随机概率
if random.random() < deactivation_probability:
logger.info(
f"{log_prefix_manager} {log_prefix_flow} 随机触发停用 (从 {current_state.value}) -> ABSENT"
)
from .mai_state_manager import enable_unlimited_hfc_chat
# 获取当前实例以检查最新状态
current_subflow = self.subheartflows.get(flow_id)
if not current_subflow or current_subflow.chat_state.chat_status != current_state:
logger.warning(f"{log_prefix_manager} {log_prefix_flow} 尝试停用时状态已改变或实例消失,跳过。")
if not enable_unlimited_hfc_chat:
if sub_hf.chat_state.chat_status != ChatState.CHAT:
continue
# --- 状态设置 --- #
# 注意:这里传递的状态数量是 *停用前* 的状态数量
await current_subflow.change_chat_state(ChatState.ABSENT)
# 检查是否满足提升概率
if random.random() >= sub_hf.interest_chatting.start_hfc_probability:
continue
# --- 状态验证 (可选) ---
final_subflow = self.subheartflows.get(flow_id)
if final_subflow:
final_state = final_subflow.chat_state.chat_status
if final_state == ChatState.ABSENT:
logger.debug(
f"{log_prefix_manager} {log_prefix_flow} 成功从 {current_state.value} 停用到 ABSENT 状态"
)
deactivated_count += 1
else:
logger.warning(
f"{log_prefix_manager} {log_prefix_flow} 尝试停用到 ABSENT 后状态仍为 {final_state.value}"
)
else:
logger.warning(f"{log_prefix_manager} {log_prefix_flow} 停用后验证时子心流 {flow_id} 消失")
# 再次检查是否达到上限
if current_focused_count >= focused_limit:
logger.debug(f"{log_prefix} [{stream_name}] 已达专注上限")
break
# 获取最新状态并执行提升
current_subflow = self.subheartflows.get(flow_id)
if not current_subflow:
continue
logger.info(
f"{log_prefix} [{stream_name}] 触发 认真水群 (概率={current_subflow.interest_chatting.start_hfc_probability:.2f})"
)
# 执行状态提升
await current_subflow.change_chat_state(ChatState.FOCUSED)
# 验证提升结果
if (
final_subflow := self.subheartflows.get(flow_id)
) and final_subflow.chat_state.chat_status == ChatState.FOCUSED:
current_focused_count += 1
except Exception as e:
logger.error(f"{log_prefix_manager} 随机停用周期出错: {e}", exc_info=True)
if deactivated_count > 0:
logger.info(f"{log_prefix_manager} 随机停用周期结束, 成功停用 {deactivated_count} 个子心流。")
else:
logger.debug(f"{log_prefix_manager} 随机停用周期结束, 未停用任何子心流。")
logger.error(f"启动HFC 兴趣评估失败: {e}", exc_info=True)
async def evaluate_and_transition_subflows_by_llm(self):
"""
使用LLM评估每个子心流的状态并根据LLM的判断执行状态转换ABSENT <-> CHAT
注意此函数包含对假设的LLM函数的调用。
"""
log_prefix = "[LLM状态评估]"
logger.info(f"{log_prefix} 开始基于LLM评估子心流状态...")
# 获取当前状态和限制用于CHAT激活检查
current_mai_state = self.mai_state_info.get_current_state()
chat_limit = current_mai_state.get_normal_chat_max_num()
@@ -366,55 +342,67 @@ class SubHeartflowManager:
current_chat_count = self.count_subflows_by_state_nolock(ChatState.CHAT)
if not subflows_snapshot:
logger.info(f"{log_prefix} 当前没有子心流需要评估。")
logger.info("当前没有子心流需要评估。")
return
for sub_hf in subflows_snapshot:
flow_id = sub_hf.subheartflow_id
stream_name = chat_manager.get_stream_name(flow_id) or flow_id
log_prefix = f"[{stream_name}]"
current_subflow_state = sub_hf.chat_state.chat_status
# --- 获取观察内容 ---
# 从 sub_hf.observations 获取 ChattingObservation 并提取信息
_observation_summary = "没有可用的观察信息。" # 默认值
try:
# 检查 observations 列表是否存在且不为空
# 假设第一个观察者是 ChattingObservation
first_observation = sub_hf.observations[0]
if isinstance(first_observation, ChattingObservation):
# 组合中期记忆和当前聊天内容
current_chat = first_observation.talking_message_str or "当前无聊天内容。"
combined_summary = f"当前聊天内容:\n{current_chat}"
else:
logger.warning(f"{log_prefix} [{stream_name}] 第一个观察者不是 ChattingObservation 类型。")
except Exception as e:
logger.warning(f"{log_prefix} [{stream_name}] 获取观察信息失败: {e}", exc_info=True)
# 保留默认值或错误信息
combined_summary = f"获取观察信息时出错: {e}"
first_observation = sub_hf.observations[0]
if isinstance(first_observation, ChattingObservation):
# 组合中期记忆和当前聊天内容
await first_observation.observe()
current_chat = first_observation.talking_message_str or "当前无聊天内容。"
combined_summary = f"当前聊天内容:\n{current_chat}"
else:
logger.warning(f"{log_prefix} [{stream_name}] 第一个观察者不是 ChattingObservation 类型。")
# --- 获取麦麦状态 ---
mai_state_description = f"麦麦当前状态: {current_mai_state.value}"
mai_state_description = f"当前状态: {current_mai_state.value}"
# 获取个性化信息
individuality = Individuality.get_instance()
# 构建个性部分
prompt_personality = f"你正在扮演名为{individuality.personality.bot_nickname}的人类,你"
prompt_personality += individuality.personality.personality_core
# 随机添加个性侧面
if individuality.personality.personality_sides:
random_side = random.choice(individuality.personality.personality_sides)
prompt_personality += f"{random_side}"
# 随机添加身份细节
if individuality.identity.identity_detail:
random_detail = random.choice(individuality.identity.identity_detail)
prompt_personality += f"{random_detail}"
# --- 针对 ABSENT 状态 ---
if current_subflow_state == ChatState.ABSENT:
# 构建Prompt
prompt = (
f"子心流 [{stream_name}] 当前处于非活跃(ABSENT)状态.\n"
f"{prompt_personality}\n"
f"你当前没有在: [{stream_name}] 群中聊天。\n"
f"{mai_state_description}\n"
f"最近观察到的内容摘要:\n---\n{combined_summary}\n---\n"
f"基于以上信息,该子心流是否表现出足够的活跃迹象或重要性"
f"值得将其唤醒并进入常规聊天(CHAT)状态?\n"
f"请以 JSON 格式回答,包含一个键 'decision',其值为 true 或 false.\n"
f'例如:{{"decision": true}}\n'
f"这个群里最近的聊天内容是:\n---\n{combined_summary}\n---\n"
f"基于以上信息,请判断你是否愿意在这个群开始闲聊"
f"进入常规聊天(CHAT)状态?\n"
f"给出你的判断,和理由,然后以 JSON 格式回答"
f"包含键 'decision',如果要开始聊天,值为 true ,否则为 false.\n"
f"包含键 'reason',其值为你的理由。\n"
f'例如:{{"decision": true, "reason": "因为我想聊天"}}\n'
f"请只输出有效的 JSON 对象。"
)
# 调用LLM评估
should_activate = await self._llm_evaluate_state_transition(prompt)
if should_activate is None: # 处理解析失败或意外情况
logger.warning(f"{log_prefix} [{stream_name}] LLM评估返回无效结果跳过。")
logger.warning(f"{log_prefix}LLM评估返回无效结果跳过。")
continue
if should_activate:
@@ -423,49 +411,50 @@ class SubHeartflowManager:
current_chat_count = self.count_subflows_by_state_nolock(ChatState.CHAT)
if current_chat_count < chat_limit:
logger.info(
f"{log_prefix} [{stream_name}] LLM建议激活到CHAT状态且未达上限({current_chat_count}/{chat_limit})。正在尝试转换..."
f"{log_prefix}LLM建议激活到CHAT状态且未达上限({current_chat_count}/{chat_limit})。正在尝试转换..."
)
await sub_hf.change_chat_state(ChatState.CHAT)
if sub_hf.chat_state.chat_status == ChatState.CHAT:
transitioned_to_chat += 1
else:
logger.warning(f"{log_prefix} [{stream_name}] 尝试激活到CHAT失败。")
logger.warning(f"{log_prefix}尝试激活到CHAT失败。")
else:
logger.info(
f"{log_prefix} [{stream_name}] LLM建议激活到CHAT状态但已达到上限({current_chat_count}/{chat_limit})。跳过转换。"
f"{log_prefix}LLM建议激活到CHAT状态但已达到上限({current_chat_count}/{chat_limit})。跳过转换。"
)
else:
logger.info(f"{log_prefix}LLM建议不激活到CHAT状态。")
# --- 针对 CHAT 状态 ---
elif current_subflow_state == ChatState.CHAT:
# 构建Prompt
prompt = (
f"子心流 [{stream_name}] 当前处于常规聊天(CHAT)状态.\n"
f"{prompt_personality}\n"
f"你正在在: [{stream_name}] 群中聊天。\n"
f"{mai_state_description}\n"
f"最近观察到的内容摘要:\n---\n{combined_summary}\n---\n"
f"基于以上信息,该子心流是否表现出不活跃、对话结束或不再需要关注的迹象"
f"应该让其进入休眠(ABSENT)状态?\n"
f"请以 JSON 格式回答,包含一个键 'decision',其值为 true (表示应休眠) 或 false (表示不应休眠).\n"
f'例如:{{"decision": true}}\n'
f"这个群里最近的聊天内容是:\n---\n{combined_summary}\n---\n"
f"基于以上信息,请判断你是否愿意在这个群继续闲聊"
f"还是暂时离开聊天,进入休眠状态?\n"
f"给出你的判断,和理由,然后以 JSON 格式回答"
f"包含键 'decision',如果要离开聊天,值为 true ,否则为 false.\n"
f"包含键 'reason',其值为你的理由。\n"
f'例如:{{"decision": true, "reason": "因为我想休息"}}\n'
f"请只输出有效的 JSON 对象。"
)
# 调用LLM评估
should_deactivate = await self._llm_evaluate_state_transition(prompt)
if should_deactivate is None: # 处理解析失败或意外情况
logger.warning(f"{log_prefix} [{stream_name}] LLM评估返回无效结果跳过。")
logger.warning(f"{log_prefix}LLM评估返回无效结果跳过。")
continue
if should_deactivate:
logger.info(f"{log_prefix} [{stream_name}] LLM建议进入ABSENT状态。正在尝试转换...")
logger.info(f"{log_prefix}LLM建议进入ABSENT状态。正在尝试转换...")
await sub_hf.change_chat_state(ChatState.ABSENT)
if sub_hf.chat_state.chat_status == ChatState.ABSENT:
transitioned_to_absent += 1
logger.info(
f"{log_prefix} LLM评估周期结束。"
f" 成功转换到CHAT: {transitioned_to_chat}."
f" 成功转换到ABSENT: {transitioned_to_absent}."
)
else:
logger.info(f"{log_prefix}LLM建议不进入ABSENT状态。")
async def _llm_evaluate_state_transition(self, prompt: str) -> Optional[bool]:
"""
@@ -482,9 +471,9 @@ class SubHeartflowManager:
try:
# --- 真实的 LLM 调用 ---
response_text, _ = await self.llm_state_evaluator.generate_response_async(prompt)
logger.debug(
f"{log_prefix} 使用模型 {self.llm_state_evaluator.model_name} 评估,原始响应: ```{response_text}```"
)
# logger.debug(f"{log_prefix} 使用模型 {self.llm_state_evaluator.model_name} 评估")
logger.debug(f"{log_prefix} 原始输入: {prompt}")
logger.debug(f"{log_prefix} 原始响应: {response_text}")
# --- 解析 JSON 响应 ---
try:
@@ -577,3 +566,48 @@ class SubHeartflowManager:
logger.error(f"删除 SubHeartflow {subheartflow_id} 时出错: {e}", exc_info=True)
else:
logger.warning(f"尝试删除不存在的 SubHeartflow: {subheartflow_id}")
# --- 新增:处理 HFC 无回复回调的专用方法 --- #
async def _handle_hfc_no_reply(self, subheartflow_id: Any):
"""处理来自 HeartFChatting 的连续无回复信号 (通过 partial 绑定 ID)"""
# 注意:这里不需要再获取锁,因为 request_absent_transition 内部会处理锁
logger.debug(f"[管理器 HFC 处理器] 接收到来自 {subheartflow_id} 的 HFC 无回复信号")
await self.request_absent_transition(subheartflow_id)
# --- 结束新增 --- #
# --- 新增:处理来自 HeartFChatting 的状态转换请求 --- #
async def request_absent_transition(self, subflow_id: Any):
"""
接收来自 HeartFChatting 的请求,将特定子心流的状态转换为 ABSENT。
通常在连续多次 "no_reply" 后被调用。
Args:
subflow_id: 需要转换状态的子心流 ID。
"""
async with self._lock:
subflow = self.subheartflows.get(subflow_id)
if not subflow:
logger.warning(f"[状态转换请求] 尝试转换不存在的子心流 {subflow_id} 到 ABSENT")
return
stream_name = chat_manager.get_stream_name(subflow_id) or subflow_id
current_state = subflow.chat_state.chat_status
# 仅当子心流处于 FOCUSED 状态时才进行转换
# 因为 HeartFChatting 只在 FOCUSED 状态下运行
if current_state == ChatState.FOCUSED:
logger.info(f"[状态转换请求] 接收到请求,将 {stream_name} (当前: {current_state.value}) 转换为 ABSENT")
try:
await subflow.change_chat_state(ChatState.ABSENT)
logger.info(f"[状态转换请求] {stream_name} 状态已成功转换为 ABSENT")
except Exception as e:
logger.error(f"[状态转换请求] 转换 {stream_name} 到 ABSENT 时出错: {e}", exc_info=True)
elif current_state == ChatState.ABSENT:
logger.debug(f"[状态转换请求] {stream_name} 已处于 ABSENT 状态,无需转换")
else:
logger.warning(
f"[状态转换请求] 收到对 {stream_name} 的请求,但其状态为 {current_state.value} (非 FOCUSED),不执行转换"
)
# --- 结束新增 --- #

View File

@@ -2,7 +2,7 @@ import asyncio
import time
import traceback
import random # <-- 添加导入
from typing import List, Optional, Dict, Any, Deque
from typing import List, Optional, Dict, Any, Deque, Callable, Coroutine
from collections import deque
from src.plugins.chat.message import MessageRecv, BaseMessageInfo, MessageThinking, MessageSending
from src.plugins.chat.message import Seg # Local import needed after move
@@ -25,7 +25,6 @@ import contextlib
from src.plugins.utils.chat_message_builder import num_new_messages_since
from src.plugins.heartFC_chat.heartFC_Cycleinfo import CycleInfo
from .heartFC_sender import HeartFCSender
# --- End import ---
INITIAL_DURATION = 60.0
@@ -155,18 +154,30 @@ class HeartFChatting:
其生命周期现在由其关联的 SubHeartflow 的 FOCUSED 状态控制。
"""
def __init__(self, chat_id: str, sub_mind: SubMind, observations: Observation):
CONSECUTIVE_NO_REPLY_THRESHOLD = 4 # 连续不回复的阈值
def __init__(
self,
chat_id: str,
sub_mind: SubMind,
observations: Observation,
on_consecutive_no_reply_callback: Callable[[], Coroutine[None, None, None]],
):
"""
HeartFChatting 初始化函数
参数:
chat_id: 聊天流唯一标识符(如stream_id)
sub_mind: 关联的子思维
observations: 关联的观察列表
on_consecutive_no_reply_callback: 连续不回复达到阈值时调用的异步回调函数
"""
# 基础属性
self.stream_id: str = chat_id # 聊天流ID
self.chat_stream: Optional[ChatStream] = None # 关联的聊天流
self.sub_mind: SubMind = sub_mind # 关联的子思维
self.observations: List[Observation] = observations # 关联的观察列表,用于监控聊天流状态
self.on_consecutive_no_reply_callback = on_consecutive_no_reply_callback
# 日志前缀
self.log_prefix: str = f"[{chat_manager.get_stream_name(chat_id) or chat_id}]"
@@ -198,6 +209,8 @@ class HeartFChatting:
self._cycle_counter = 0
self._cycle_history: Deque[CycleInfo] = deque(maxlen=10) # 保留最近10个循环的信息
self._current_cycle: Optional[CycleInfo] = None
self._lian_xu_bu_hui_fu_ci_shu: int = 0 # <--- 新增:连续不回复计数器
self._shutting_down: bool = False # <--- 新增:关闭标志位
async def _initialize(self) -> bool:
"""
@@ -276,6 +289,12 @@ class HeartFChatting:
"""主循环,持续进行计划并可能回复消息,直到被外部取消。"""
try:
while True: # 主循环
# --- 在循环开始处检查关闭标志 ---
if self._shutting_down:
logger.info(f"{self.log_prefix} 检测到关闭标志,退出 HFC 循环。")
break
# --------------------------------
# 创建新的循环信息
self._cycle_counter += 1
self._current_cycle = CycleInfo(self._cycle_counter)
@@ -287,6 +306,12 @@ class HeartFChatting:
# 执行规划和处理阶段
async with self._get_cycle_context() as acquired_lock:
if not acquired_lock:
# 如果未能获取锁(理论上不太可能,除非 shutdown 过程中释放了但又被抢了?)
# 或者也可以在这里再次检查 self._shutting_down
if self._shutting_down:
break # 再次检查,确保退出
logger.warning(f"{self.log_prefix} 未能获取循环处理锁,跳过本次循环。")
await asyncio.sleep(0.1) # 短暂等待避免空转
continue
# 记录规划开始时间点
@@ -320,7 +345,11 @@ class HeartFChatting:
)
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} HeartFChatting: 麦麦的认真水群(HFC)被取消了")
# 设置了关闭标志位后被取消是正常流程
if not self._shutting_down:
logger.warning(f"{self.log_prefix} HeartFChatting: 麦麦的认真水群(HFC)循环意外被取消")
else:
logger.info(f"{self.log_prefix} HeartFChatting: 麦麦的认真水群(HFC)循环已取消 (正常关闭)")
except Exception as e:
logger.error(f"{self.log_prefix} HeartFChatting: 意外错误: {e}")
logger.error(traceback.format_exc())
@@ -451,6 +480,8 @@ class HeartFChatting:
return await handler(reasoning, planner_start_db_time, cycle_timers), ""
except HeartFCError as e:
logger.error(f"{self.log_prefix} 处理{action}时出错: {e}")
# 出错时也重置计数器
self._lian_xu_bu_hui_fu_ci_shu = 0
return False, ""
async def _handle_text_reply(self, reasoning: str, emoji_query: str, cycle_timers: dict) -> tuple[bool, str]:
@@ -471,6 +502,8 @@ class HeartFChatting:
返回:
tuple[bool, str]: (是否回复成功, 思考消息ID)
"""
# 重置连续不回复计数器
self._lian_xu_bu_hui_fu_ci_shu = 0
# 获取锚点消息
anchor_message = await self._get_anchor_message()
@@ -544,8 +577,9 @@ class HeartFChatting:
处理不回复的情况
工作流程:
1. 等待新消息
2. 超时或收到新消息时返回
1. 等待新消息、超时或关闭信号
2. 根据等待结果更新连续不回复计数
3. 如果达到阈值,触发回调
参数:
reasoning: 不回复的原因
@@ -561,14 +595,39 @@ class HeartFChatting:
try:
with Timer("等待新消息", cycle_timers):
return await self._wait_for_new_message(observation, planner_start_db_time, self.log_prefix)
# 等待新消息、超时或关闭信号,并获取结果
await self._wait_for_new_message(observation, planner_start_db_time, self.log_prefix)
if not self._shutting_down:
self._lian_xu_bu_hui_fu_ci_shu += 1
logger.debug(
f"{self.log_prefix} 连续不回复计数增加: {self._lian_xu_bu_hui_fu_ci_shu}/{self.CONSECUTIVE_NO_REPLY_THRESHOLD}"
)
# 检查是否达到阈值
if self._lian_xu_bu_hui_fu_ci_shu >= self.CONSECUTIVE_NO_REPLY_THRESHOLD:
logger.info(
f"{self.log_prefix} 连续不回复达到阈值 ({self._lian_xu_bu_hui_fu_ci_shu}次),调用回调请求状态转换"
)
# 调用回调。注意:这里不重置计数器,依赖回调函数成功改变状态来隐式重置上下文。
await self.on_consecutive_no_reply_callback()
return True
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} 等待被中断")
# 如果在等待过程中任务被取消(可能是因为 shutdown
logger.info(f"{self.log_prefix} 处理 'no_reply' 时等待被中断 (CancelledError)")
# 让异常向上传播,由 _hfc_loop 的异常处理逻辑接管
raise
except Exception as e: # 捕获调用管理器或其他地方可能发生的错误
logger.error(f"{self.log_prefix} 处理 'no_reply' 时发生错误: {e}")
logger.error(traceback.format_exc())
# 发生意外错误时,可以选择是否重置计数器,这里选择不重置
return False # 表示动作未成功
async def _wait_for_new_message(self, observation, planner_start_db_time: float, log_prefix: str) -> bool:
"""
等待新消息
等待新消息 或 检测到关闭信号
参数:
observation: 观察实例
@@ -576,19 +635,36 @@ class HeartFChatting:
log_prefix: 日志前缀
返回:
bool: 是否检测到新消息
bool: 是否检测到新消息 (如果因关闭信号退出则返回 False)
"""
wait_start_time = time.monotonic()
while True:
# --- 在每次循环开始时检查关闭标志 ---
if self._shutting_down:
logger.info(f"{log_prefix} 等待新消息时检测到关闭信号,中断等待。")
return False # 表示因为关闭而退出
# -----------------------------------
# 检查新消息
if await observation.has_new_messages_since(planner_start_db_time):
logger.info(f"{log_prefix} 检测到新消息")
return True
# 检查超时 (放在检查新消息和关闭之后)
if time.monotonic() - wait_start_time > 120:
logger.warning(f"{log_prefix} 等待超时(120秒)")
logger.warning(f"{log_prefix} 等待新消息超时(20秒)")
return False
await asyncio.sleep(1.5)
try:
# 短暂休眠,让其他任务有机会运行,并能更快响应取消或关闭
await asyncio.sleep(0.5) # 缩短休眠时间
except asyncio.CancelledError:
# 如果在休眠时被取消,再次检查关闭标志
# 如果是正常关闭,则不需要警告
if not self._shutting_down:
logger.warning(f"{log_prefix} _wait_for_new_message 的休眠被意外取消")
# 无论如何,重新抛出异常,让上层处理
raise
async def _log_cycle_timers(self, cycle_timers: dict, log_prefix: str):
"""记录循环周期的计时器结果"""
@@ -599,7 +675,9 @@ class HeartFChatting:
timer_strings.append(f"{name}: {formatted_time}")
if timer_strings:
logger.debug(f"{log_prefix} 该次决策耗时: {'; '.join(timer_strings)}")
# 在记录前检查关闭标志
if not self._shutting_down:
logger.debug(f"{log_prefix} 该次决策耗时: {'; '.join(timer_strings)}")
async def _handle_cycle_delay(self, action_taken_this_cycle: bool, cycle_start_time: float, log_prefix: str):
"""处理循环延迟"""
@@ -835,6 +913,7 @@ class HeartFChatting:
async def shutdown(self):
"""优雅关闭HeartFChatting实例取消活动循环任务"""
logger.info(f"{self.log_prefix} 正在关闭HeartFChatting...")
self._shutting_down = True # <-- 在开始关闭时设置标志位
# 取消循环任务
if self._loop_task and not self._loop_task.done():
@@ -865,6 +944,25 @@ class HeartFChatting:
action=action,
reasoning=reasoning,
)
# 在记录循环日志前检查关闭标志
if not self._shutting_down:
self._current_cycle.complete_cycle()
self._cycle_history.append(self._current_cycle)
# 记录循环信息和计时器结果
timer_strings = []
for name, elapsed in self._current_cycle.timers.items():
formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}"
timer_strings.append(f"{name}: {formatted_time}")
logger.debug(
f"{self.log_prefix} 第 #{self._current_cycle.cycle_id}次思考完成,"
f"耗时: {self._current_cycle.end_time - self._current_cycle.start_time:.2f}秒, "
f"动作: {self._current_cycle.action_type}"
+ (f"\n计时器详情: {'; '.join(timer_strings)}" if timer_strings else "")
)
return prompt
async def _build_planner_prompt(

View File

@@ -178,395 +178,6 @@ class LLMRequest:
output_cost = (completion_tokens / 1000000) * self.pri_out
return round(input_cost + output_cost, 6)
'''
async def _execute_request(
    self,
    endpoint: str,
    prompt: str = None,
    image_base64: str = None,
    image_format: str = None,
    payload: dict = None,
    retry_policy: dict = None,
    response_handler: callable = None,
    user_id: str = "system",
    request_type: str = None,
):
    """Unified request execution entry point.

    Sends a (possibly streaming) request to the configured OpenAI-compatible
    endpoint, with exponential-backoff retries and a one-time model downgrade
    on 403 for SiliconFlow "Pro/deepseek-ai" models.

    Args:
        endpoint: API endpoint path (e.g. "chat/completions").
        prompt: prompt text used to build the payload when none is supplied.
        image_base64: base64-encoded image for vision requests.
        image_format: image format (e.g. "jpeg").
        payload: pre-built request body; built from `prompt` when None.
        retry_policy: overrides for the default policy keys
            (max_retries / base_wait / retry_codes / abort_codes).
        response_handler: optional callable applied to the raw result dict;
            defaults to self._default_response_handler.
        user_id: user id forwarded to the response handler.
        request_type: request-type label; defaults to self.request_type.

    Returns:
        Whatever the response handler returns for the final result dict.

    Raises:
        RuntimeError: on abort status codes, unrecoverable errors, or when
            the maximum number of retries is exhausted.
    """
    if request_type is None:
        request_type = self.request_type

    # Merge the default retry policy with any caller-supplied overrides.
    default_retry = {
        "max_retries": 3,
        "base_wait": 10,
        "retry_codes": [429, 413, 500, 503],
        "abort_codes": [400, 401, 402, 403],
    }
    policy = {**default_retry, **(retry_policy or {})}

    # User-facing descriptions for common HTTP error codes.
    error_code_mapping = {
        400: "参数不正确",
        401: "API key 错误,认证失败,请检查/config/bot_config.toml和.env中的配置是否正确哦~",
        402: "账号余额不足",
        403: "需要实名,或余额不足",
        404: "Not Found",
        429: "请求过于频繁,请稍后再试",
        500: "服务器内部故障",
        503: "服务器负载过高",
    }

    api_url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
    stream_mode = self.stream

    # Build the request body (image payloads take precedence over text).
    if image_base64:
        payload = await self._build_payload(prompt, image_base64, image_format)
    elif payload is None:
        payload = await self._build_payload(prompt)
    # The stream flag must be added after the payload is built.
    if stream_mode:
        payload["stream"] = stream_mode

    # BUG FIX: the original used `for retry in range(...)` and tried to grant
    # a "free" retry after a 403 model downgrade via `retry -= 1`, which has
    # no effect on a range() loop variable. A while loop makes the counter
    # real; the downgrade path below simply skips the increment.
    retry = 0
    while retry < policy["max_retries"]:
        try:
            headers = await self._build_headers()
            # Some OpenAI-compatible servers require this header for SSE
            # streaming (harmless on e.g. Aliyun qwq-plus).
            if stream_mode:
                headers["Accept"] = "text/event-stream"
            async with aiohttp.ClientSession() as session:
                try:
                    async with session.post(api_url, headers=headers, json=payload) as response:
                        # --- Retryable status codes ---------------------------
                        if response.status in policy["retry_codes"]:
                            wait_time = policy["base_wait"] * (2**retry)
                            logger.warning(
                                f"模型 {self.model_name} 错误码: {response.status}, 等待 {wait_time}秒后重试"
                            )
                            if response.status == 413:
                                # Payload too large: shrink the image and retry.
                                logger.warning("请求体过大,尝试压缩...")
                                image_base64 = compress_base64_image_by_scale(image_base64)
                                payload = await self._build_payload(prompt, image_base64, image_format)
                            elif response.status in [500, 503]:
                                # NOTE(review): raising here still ends up retried —
                                # the outer `except Exception` handler backs off and
                                # loops again. Confirm this indirection is intended.
                                logger.error(
                                    f"模型 {self.model_name} 错误码: {response.status} - {error_code_mapping.get(response.status)}"
                                )
                                raise RuntimeError("服务器负载过高模型恢复失败QAQ")
                            else:
                                logger.warning(f"模型 {self.model_name} 请求限制(429),等待{wait_time}秒后重试...")
                            await asyncio.sleep(wait_time)
                            retry += 1
                            continue
                        # --- Abort status codes (no direct retry) -------------
                        elif response.status in policy["abort_codes"]:
                            logger.error(
                                f"模型 {self.model_name} 错误码: {response.status} - {error_code_mapping.get(response.status)}"
                            )
                            # Best-effort: surface the server's detailed error body.
                            try:
                                error_json = await response.json()
                                if error_json and isinstance(error_json, list) and len(error_json) > 0:
                                    for error_item in error_json:
                                        if "error" in error_item and isinstance(error_item["error"], dict):
                                            error_obj = error_item["error"]
                                            error_code = error_obj.get("code")
                                            error_message = error_obj.get("message")
                                            error_status = error_obj.get("status")
                                            logger.error(
                                                f"服务器错误详情: 代码={error_code}, 状态={error_status}, "
                                                f"消息={error_message}"
                                            )
                                elif isinstance(error_json, dict) and "error" in error_json:
                                    # Single error-object form.
                                    error_obj = error_json.get("error", {})
                                    error_code = error_obj.get("code")
                                    error_message = error_obj.get("message")
                                    error_status = error_obj.get("status")
                                    logger.error(
                                        f"服务器错误详情: 代码={error_code}, 状态={error_status}, 消息={error_message}"
                                    )
                                else:
                                    # Unknown shape: log the raw body.
                                    logger.error(f"服务器错误响应: {error_json}")
                            except Exception as e:
                                logger.warning(f"无法解析服务器错误响应: {str(e)}")
                            if response.status == 403:
                                # Downgrade only SiliconFlow V3/R1 "Pro" models.
                                if (
                                    self.model_name.startswith("Pro/deepseek-ai")
                                    and self.base_url == "https://api.siliconflow.cn/v1/"
                                ):
                                    old_model_name = self.model_name
                                    self.model_name = self.model_name[4:]  # strip the "Pro/" prefix
                                    logger.warning(
                                        f"检测到403错误模型从 {old_model_name} 降级为 {self.model_name}"
                                    )
                                    # Propagate the downgrade to the global config.
                                    if global_config.llm_normal.get("name") == old_model_name:
                                        global_config.llm_normal["name"] = self.model_name
                                        logger.warning(f"将全局配置中的 llm_normal 模型临时降级至{self.model_name}")
                                    if global_config.llm_reasoning.get("name") == old_model_name:
                                        global_config.llm_reasoning["name"] = self.model_name
                                        logger.warning(
                                            f"将全局配置中的 llm_reasoning 模型临时降级至{self.model_name}"
                                        )
                                    # Update the model name in the payload too.
                                    if payload and "model" in payload:
                                        payload["model"] = self.model_name
                                    # Retry immediately WITHOUT consuming a retry slot
                                    # (replaces the ineffective `retry -= 1`).
                                    continue
                            raise RuntimeError(f"请求被拒绝: {error_code_mapping.get(response.status)}")

                        response.raise_for_status()
                        reasoning_content = ""
                        # Convert streaming output into a non-streaming result.
                        if stream_mode:
                            flag_delta_content_finished = False
                            accumulated_content = ""
                            usage = None  # initialize so it is defined on every exit path
                            async for line_bytes in response.content:
                                try:
                                    line = line_bytes.decode("utf-8").strip()
                                    if not line:
                                        continue
                                    if line.startswith("data:"):
                                        data_str = line[5:].strip()
                                        if data_str == "[DONE]":
                                            break
                                        try:
                                            chunk = json.loads(data_str)
                                            if flag_delta_content_finished:
                                                # Text is done; this extra chunk only carries usage.
                                                chunk_usage = chunk.get("usage", None)
                                                if chunk_usage:
                                                    usage = chunk_usage
                                            else:
                                                delta = chunk["choices"][0]["delta"]
                                                delta_content = delta.get("content")
                                                if delta_content is None:
                                                    delta_content = ""
                                                accumulated_content += delta_content
                                                # Detect the end of the streamed text.
                                                finish_reason = chunk["choices"][0].get("finish_reason")
                                                if delta.get("reasoning_content", None):
                                                    reasoning_content += delta["reasoning_content"]
                                                if finish_reason == "stop":
                                                    chunk_usage = chunk.get("usage", None)
                                                    if chunk_usage:
                                                        usage = chunk_usage
                                                        break
                                                    # Some platforms send token usage only in a
                                                    # chunk AFTER the text finishes; read one more.
                                                    flag_delta_content_finished = True
                                        except Exception as e:
                                            logger.exception(f"模型 {self.model_name} 解析流式输出错误: {str(e)}")
                                except GeneratorExit:
                                    # BUG FIX: original string lacked the f-prefix, so the
                                    # model name was never interpolated into this warning.
                                    logger.warning(f"模型 {self.model_name} 流式输出被中断,正在清理资源...")
                                    # Make sure the connection is released.
                                    await response.release()
                                    # Return whatever content accumulated so far.
                                    result = {
                                        "choices": [
                                            {
                                                "message": {
                                                    "content": accumulated_content,
                                                    "reasoning_content": reasoning_content,
                                                    # Streaming output may have no tool calls;
                                                    # no tool_calls field is added here.
                                                }
                                            }
                                        ],
                                        "usage": usage,
                                    }
                                    return (
                                        response_handler(result)
                                        if response_handler
                                        else self._default_response_handler(result, user_id, request_type, endpoint)
                                    )
                                except Exception as e:
                                    logger.error(f"模型 {self.model_name} 处理流式输出时发生错误: {str(e)}")
                                    # Release the connection even on failure.
                                    try:
                                        await response.release()
                                    except Exception as cleanup_error:
                                        logger.error(f"清理资源时发生错误: {cleanup_error}")
                                    # Return whatever content accumulated so far.
                                    result = {
                                        "choices": [
                                            {
                                                "message": {
                                                    "content": accumulated_content,
                                                    "reasoning_content": reasoning_content,
                                                    # Streaming output may have no tool calls;
                                                    # no tool_calls field is added here.
                                                }
                                            }
                                        ],
                                        "usage": usage,
                                    }
                                    return (
                                        response_handler(result)
                                        if response_handler
                                        else self._default_response_handler(result, user_id, request_type, endpoint)
                                    )
                            content = accumulated_content
                            # Extract <think>...</think> reasoning embedded in the text.
                            think_match = re.search(r"<think>(.*?)</think>", content, re.DOTALL)
                            if think_match:
                                reasoning_content = think_match.group(1).strip()
                            content = re.sub(r"<think>.*?</think>", "", content, flags=re.DOTALL).strip()
                            # Build a pseudo-result so the same handlers apply.
                            result = {
                                "choices": [
                                    {
                                        "message": {
                                            "content": content,
                                            "reasoning_content": reasoning_content,
                                            # Streaming output may have no tool calls;
                                            # no tool_calls field is added here.
                                        }
                                    }
                                ],
                                "usage": usage,
                            }
                            return (
                                response_handler(result)
                                if response_handler
                                else self._default_response_handler(result, user_id, request_type, endpoint)
                            )
                        else:
                            result = await response.json()
                            # Apply the custom handler or the default one.
                            return (
                                response_handler(result)
                                if response_handler
                                else self._default_response_handler(result, user_id, request_type, endpoint)
                            )
                except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                    if retry < policy["max_retries"] - 1:
                        wait_time = policy["base_wait"] * (2**retry)
                        logger.error(f"模型 {self.model_name} 网络错误,等待{wait_time}秒后重试... 错误: {str(e)}")
                        await asyncio.sleep(wait_time)
                        retry += 1
                        continue
                    else:
                        logger.critical(f"模型 {self.model_name} 网络错误达到最大重试次数: {str(e)}")
                        raise RuntimeError(f"网络请求失败: {str(e)}") from e
                except Exception as e:
                    logger.critical(f"模型 {self.model_name} 未预期的错误: {str(e)}")
                    raise RuntimeError(f"请求过程中发生错误: {str(e)}") from e
        except aiohttp.ClientResponseError as e:
            # NOTE(review): ClientResponseError subclasses ClientError, so the inner
            # handler above usually intercepts it first — confirm this is reachable.
            if retry < policy["max_retries"] - 1:
                wait_time = policy["base_wait"] * (2**retry)
                logger.error(
                    f"模型 {self.model_name} HTTP响应错误等待{wait_time}秒后重试... 状态码: {e.status}, 错误: {e.message}"
                )
                try:
                    if hasattr(e, "response") and e.response and hasattr(e.response, "text"):
                        error_text = await e.response.text()
                        try:
                            error_json = json.loads(error_text)
                            if isinstance(error_json, list) and len(error_json) > 0:
                                for error_item in error_json:
                                    if "error" in error_item and isinstance(error_item["error"], dict):
                                        error_obj = error_item["error"]
                                        logger.error(
                                            f"模型 {self.model_name} 服务器错误详情: 代码={error_obj.get('code')}, "
                                            f"状态={error_obj.get('status')}, "
                                            f"消息={error_obj.get('message')}"
                                        )
                            elif isinstance(error_json, dict) and "error" in error_json:
                                error_obj = error_json.get("error", {})
                                logger.error(
                                    f"模型 {self.model_name} 服务器错误详情: 代码={error_obj.get('code')}, "
                                    f"状态={error_obj.get('status')}, "
                                    f"消息={error_obj.get('message')}"
                                )
                            else:
                                logger.error(f"模型 {self.model_name} 服务器错误响应: {error_json}")
                        except (json.JSONDecodeError, TypeError) as json_err:
                            logger.warning(
                                f"模型 {self.model_name} 响应不是有效的JSON: {str(json_err)}, 原始内容: {error_text[:200]}"
                            )
                except (AttributeError, TypeError, ValueError) as parse_err:
                    logger.warning(f"模型 {self.model_name} 无法解析响应错误内容: {str(parse_err)}")
                await asyncio.sleep(wait_time)
            else:
                logger.critical(
                    f"模型 {self.model_name} HTTP响应错误达到最大重试次数: 状态码: {e.status}, 错误: {e.message}"
                )
                # Safely redact the image data before logging the request body.
                if (
                    image_base64
                    and payload
                    and isinstance(payload, dict)
                    and "messages" in payload
                    and len(payload["messages"]) > 0
                ):
                    if isinstance(payload["messages"][0], dict) and "content" in payload["messages"][0]:
                        content = payload["messages"][0]["content"]
                        if isinstance(content, list) and len(content) > 1 and "image_url" in content[1]:
                            payload["messages"][0]["content"][1]["image_url"]["url"] = (
                                f"data:image/{image_format.lower() if image_format else 'jpeg'};base64,"
                                f"{image_base64[:10]}...{image_base64[-10:]}"
                            )
                logger.critical(f"请求头: {await self._build_headers(no_key=True)} 请求体: {payload}")
                raise RuntimeError(f"模型 {self.model_name} API请求失败: 状态码 {e.status}, {e.message}") from e
        except Exception as e:
            if retry < policy["max_retries"] - 1:
                wait_time = policy["base_wait"] * (2**retry)
                logger.error(f"模型 {self.model_name} 请求失败,等待{wait_time}秒后重试... 错误: {str(e)}")
                await asyncio.sleep(wait_time)
            else:
                logger.critical(f"模型 {self.model_name} 请求失败: {str(e)}")
                # Safely redact the image data before logging the request body.
                if (
                    image_base64
                    and payload
                    and isinstance(payload, dict)
                    and "messages" in payload
                    and len(payload["messages"]) > 0
                ):
                    if isinstance(payload["messages"][0], dict) and "content" in payload["messages"][0]:
                        content = payload["messages"][0]["content"]
                        if isinstance(content, list) and len(content) > 1 and "image_url" in content[1]:
                            payload["messages"][0]["content"][1]["image_url"]["url"] = (
                                f"data:image/{image_format.lower() if image_format else 'jpeg'};base64,"
                                f"{image_base64[:10]}...{image_base64[-10:]}"
                            )
                logger.critical(f"请求头: {await self._build_headers(no_key=True)} 请求体: {payload}")
                raise RuntimeError(f"模型 {self.model_name} API请求失败: {str(e)}") from e
        # One full attempt (including backoff-on-exception paths) consumed.
        retry += 1

    logger.error(f"模型 {self.model_name} 达到最大重试次数,请求仍然失败")
    raise RuntimeError(f"模型 {self.model_name} 达到最大重试次数API请求仍然失败")
'''
async def _prepare_request(
self,
endpoint: str,
@@ -820,6 +431,7 @@ class LLMRequest:
policy = request_content["policy"]
payload = request_content["payload"]
wait_time = policy["base_wait"] * (2**retry_count)
keep_request = False
if retry_count < policy["max_retries"] - 1:
keep_request = True
if isinstance(exception, RequestAbortException):