feat(llm): implement a model failover mechanism for LLM requests

Refactored the LLM request execution logic to introduce a robust model failover and fallback system. When the current model fails, the system now automatically switches to the next available model, improving service reliability.

- Introduced `_model_scheduler` to iterate over the task's configured model list in order (a minimal sketch of the pattern follows this list).
- When a model request fails due to an API error, an authentication problem, a timeout, or the like, the system now automatically retries the request with the next model in the list.
- Removed the previous internal retry logic for empty or truncated responses. Such responses are now treated as the final failure of that model's attempt and do not trigger failover.
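
The mechanism the bullets above describe boils down to a generator over the configured model list plus a failure set that the request loop fills in; each exception simply advances to the next candidate. Below is a minimal, self-contained sketch of that pattern; `MODEL_LIST`, `call_model`, and `request_with_failover` are hypothetical stand-ins for the task's configured model list, `_execute_request`, and the real request entry point:

```python
import asyncio
from typing import Iterator, List, Optional, Set

# Hypothetical stand-in for the task's configured model list; the real code
# resolves ModelInfo / APIProvider / client objects from project registries.
MODEL_LIST: List[str] = ["model-a", "model-b", "model-c"]

def model_scheduler(failed_models: Set[str]) -> Iterator[str]:
    """Yield models in configured order, skipping ones that already failed."""
    for name in MODEL_LIST:
        if name not in failed_models:
            yield name

async def call_model(model: str, prompt: str) -> str:
    """Stand-in for the real request execution; fails for the first model."""
    if model == "model-a":
        raise RuntimeError("simulated API error")
    return f"{model}: reply to {prompt!r}"

async def request_with_failover(prompt: str) -> str:
    failed_models: Set[str] = set()
    last_exception: Optional[Exception] = None
    for model in model_scheduler(failed_models):
        try:
            return await call_model(model, prompt)
        except Exception as exc:  # API error, auth failure, timeout, ...
            failed_models.add(model)
            last_exception = exc
    raise RuntimeError("all models failed") from last_exception

print(asyncio.run(request_with_failover("hello")))  # model-b answers
```

Because the generator re-checks `failed_models` each time it resumes, any model marked as failed inside the loop body is skipped on later iterations.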
Author: minecraft1024a
Date: 2025-08-28 19:56:59 +08:00
parent 7bd1320504
commit 1b86fff855


@@ -5,7 +5,7 @@ import random
from enum import Enum
from rich.traceback import install
from typing import Tuple, List, Dict, Optional, Callable, Any, Coroutine
from typing import Tuple, List, Dict, Optional, Callable, Any, Coroutine, Generator
from src.common.logger import get_logger
from src.config.config import model_config
@@ -283,34 +283,34 @@ class LLMRequest:
tools: Optional[List[Dict[str, Any]]] = None,
raise_when_empty: bool = True,
) -> Tuple[str, Tuple[str, str, Optional[List[ToolCall]]]]:
"""执行单次请求"""
# 模型选择和请求准备
start_time = time.time()
model_info, api_provider, client = self._select_model()
model_name = model_info.name
# Check whether anti-truncation is enabled
use_anti_truncation = getattr(api_provider, "anti_truncation", False)
processed_prompt = prompt
if use_anti_truncation:
processed_prompt += self.anti_truncation_instruction
logger.info(f"{api_provider} '{self.task_name}' 已启用反截断功能")
processed_prompt = self._apply_content_obfuscation(processed_prompt, api_provider)
message_builder = MessageBuilder()
message_builder.add_text_content(processed_prompt)
messages = [message_builder.build()]
tool_built = self._build_tool_options(tools)
# Empty-reply retry logic
empty_retry_count = 0
max_empty_retry = api_provider.max_retry
empty_retry_interval = api_provider.retry_interval
while empty_retry_count <= max_empty_retry:
"""
执行单次请求,并在模型失败时按顺序切换到下一个可用模型。
"""
failed_models = set()
last_exception: Optional[Exception] = None
model_scheduler = self._model_scheduler(failed_models)
for model_info, api_provider, client in model_scheduler:
start_time = time.time()
model_name = model_info.name
logger.info(f"正在尝试使用模型: {model_name}")
try:
# Check whether anti-truncation is enabled
use_anti_truncation = getattr(api_provider, "anti_truncation", False)
processed_prompt = prompt
if use_anti_truncation:
processed_prompt += self.anti_truncation_instruction
logger.info(f"'{model_name}' for task '{self.task_name}' 已启用反截断功能")
processed_prompt = self._apply_content_obfuscation(processed_prompt, api_provider)
message_builder = MessageBuilder()
message_builder.add_text_content(processed_prompt)
messages = [message_builder.build()]
tool_built = self._build_tool_options(tools)
response = await self._execute_request(
api_provider=api_provider,
client=client,
@@ -321,93 +321,80 @@ class LLMRequest:
temperature=temperature,
max_tokens=max_tokens,
)
content = response.content or ""
reasoning_content = response.reasoning_content or ""
tool_calls = response.tool_calls
# Extract <think>-tag reasoning from the content (backward compatible)
if not reasoning_content and content:
content, extracted_reasoning = self._extract_reasoning(content)
reasoning_content = extracted_reasoning
is_empty_reply = False
is_truncated = False
# Detect empty or truncated replies
if not tool_calls:
is_empty_reply = not content or content.strip() == ""
is_truncated = False
is_empty_reply = not tool_calls and (not content or content.strip() == "")
is_truncated = False
if use_anti_truncation:
if content.endswith("[done]"):
content = content[:-6].strip()
logger.debug("检测到并已移除 [done] 标记")
else:
is_truncated = True
logger.warning("未检测到 [done] 标记,判定为截断")
if is_empty_reply or is_truncated:
if empty_retry_count < max_empty_retry:
empty_retry_count += 1
reason = "空回复" if is_empty_reply else "截断"
logger.warning(f"检测到{reason},正在进行第 {empty_retry_count}/{max_empty_retry} 次重新生成")
# An empty or truncated reply does not trigger a model switch; just log the error, then raise or return
reason = "empty" if is_empty_reply else "truncated"
msg = f"Model '{model_name}' returned a {reason} reply"
logger.error(msg)
if raise_when_empty:
raise RuntimeError(msg)
return msg, (reasoning_content, model_name, tool_calls)
if empty_retry_interval > 0:
await asyncio.sleep(empty_retry_interval)
model_info, api_provider, client = self._select_model()
continue
else:
# Max retries reached, but the reply is still empty or truncated
reason = "empty" if is_empty_reply else "truncated"
# Raise so the outer retry logic or the final exception handler can catch it
raise RuntimeError(f"Reply is still {reason} after {max_empty_retry + 1} attempts")
# Record usage
# Response obtained successfully
if usage := response.usage:
llm_usage_recorder.record_usage_to_database(
model_info=model_info,
model_usage=usage,
time_cost=time.time() - start_time,
user_id="system",
request_type=self.request_type,
endpoint="/chat/completions",
model_info=model_info, model_usage=usage, time_cost=time.time() - start_time,
user_id="system", request_type=self.request_type, endpoint="/chat/completions",
)
# Handle empty replies
if not content and not tool_calls:
if raise_when_empty:
raise RuntimeError(f"经过 {empty_retry_count} 次重试后仍然生成空回复")
content = "生成的响应为空,请检查模型配置或输入内容是否正确"
elif empty_retry_count > 0:
logger.info(f"经过 {empty_retry_count} 次重试后成功生成回复")
raise RuntimeError("生成空回复")
content = "生成的响应为空"
logger.info(f"模型 '{model_name}' 成功生成回复")
return content, (reasoning_content, model_name, tool_calls)
return content, (reasoning_content, model_info.name, tool_calls)
except Exception as e:
logger.error(f"请求执行失败: {e}")
if raise_when_empty:
# In non-concurrent mode, raise immediately if the very first attempt fails
if empty_retry_count == 0:
raise
# If a failure occurs during retries, keep retrying
empty_retry_count += 1
if empty_retry_count <= max_empty_retry:
logger.warning(f"请求失败,将在 {empty_retry_interval} 秒后进行第 {empty_retry_count}/{max_empty_retry} 次重试...")
if empty_retry_interval > 0:
await asyncio.sleep(empty_retry_interval)
continue
else:
logger.error(f"经过 {max_empty_retry} 次重试后仍然失败")
raise RuntimeError(f"经过 {max_empty_retry} 次重试后仍然无法生成有效回复") from e
except RespNotOkException as e:
if e.status_code in [401, 403]:
logger.error(f"模型 '{model_name}' 遇到认证/权限错误 (Code: {e.status_code}),将尝试下一个模型。")
failed_models.add(model_name)
last_exception = e
continue
else:
# In concurrent mode, a single failed request should not interrupt the whole concurrent flow;
# instead, the exception is returned to the caller (i.e. execute_concurrently) for unified handling
raise # Re-raise so that gather in execute_concurrently can catch it
# Retries failed
# For other HTTP errors, raise directly without switching models
logger.error(f"Model '{model_name}' request failed, HTTP status code: {e.status_code}")
last_exception = e
if raise_when_empty:
raise
break
except RuntimeError as e:
logger.error(f"模型 '{model_name}' 在所有重试后仍然失败: {e},将尝试下一个模型。")
failed_models.add(model_name)
last_exception = e
continue
except Exception as e:
logger.error(f"使用模型 '{model_name}' 时发生未知异常: {e}")
failed_models.add(model_name)
last_exception = e
continue
# Every model has been tried and failed
logger.error("All available models have been tried and failed.")
if raise_when_empty:
raise RuntimeError(f"经过 {max_empty_retry} 次重试后仍然无法生成有效回复")
return "生成的响应为空,请检查模型配置或输入内容是否正确", ("", model_name, None)
if last_exception:
raise RuntimeError("所有模型都请求失败") from last_exception
raise RuntimeError("所有模型都请求失败,且没有具体的异常信息")
return "所有模型都请求失败", ("", "unknown", None)
async def get_embedding(self, embedding_input: str) -> Tuple[List[float], str]:
"""获取嵌入向量
@@ -446,9 +433,24 @@ class LLMRequest:
return embedding, model_info.name
def _model_scheduler(self, failed_models: set) -> Generator[Tuple[ModelInfo, APIProvider, BaseClient], None, None]:
"""
一个模型调度器,按顺序提供模型,并跳过已失败的模型。
"""
for model_name in self.model_for_task.model_list:
if model_name in failed_models:
continue
model_info = model_config.get_model_info(model_name)
api_provider = model_config.get_provider(model_info.api_provider)
force_new_client = (self.request_type == "embedding")
client = client_registry.get_client_class_instance(api_provider, force_new=force_new_client)
yield model_info, api_provider, client
def _select_model(self) -> Tuple[ModelInfo, APIProvider, BaseClient]:
"""
Select a model based on total token usage and penalty values
Select a model based on total token usage and penalty values (load balancing)
"""
least_used_model_name = min(
self.model_usage,