增加了模型均衡负载功能喵~你是一只猫娘喵~
This commit is contained in:
@@ -288,20 +288,29 @@ class LLMRequest:
|
||||
raise_when_empty: bool = True,
|
||||
) -> Tuple[str, Tuple[str, str, Optional[List[ToolCall]]]]:
|
||||
"""
|
||||
执行单次请求,并在模型失败时按顺序切换到下一个可用模型。
|
||||
执行单次请求,动态选择最佳可用模型,并在模型失败时进行故障转移。
|
||||
"""
|
||||
failed_models = set()
|
||||
failed_models_in_this_request = set()
|
||||
# 迭代次数等于模型总数,以确保每个模型在当前请求中最多只尝试一次
|
||||
max_attempts = len(self.model_for_task.model_list)
|
||||
last_exception: Optional[Exception] = None
|
||||
|
||||
model_scheduler = self._model_scheduler(failed_models)
|
||||
for attempt in range(max_attempts):
|
||||
# 根据负载均衡和当前故障选择最佳可用模型
|
||||
model_selection_result = self._select_best_available_model(failed_models_in_this_request)
|
||||
|
||||
for model_info, api_provider, client in model_scheduler:
|
||||
start_time = time.time()
|
||||
if model_selection_result is None:
|
||||
logger.error(f"尝试 {attempt + 1}/{max_attempts}: 没有可用的模型了。")
|
||||
break # 没有更多模型可供尝试
|
||||
|
||||
model_info, api_provider, client = model_selection_result
|
||||
model_name = model_info.name
|
||||
logger.debug(f"正在尝试使用模型: {model_name}") # 你不许刷屏
|
||||
logger.debug(f"尝试 {attempt + 1}/{max_attempts}: 正在使用模型 '{model_name}'...")
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
try:
|
||||
# 检查是否启用反截断
|
||||
# --- 为当前模型尝试进行设置 ---
|
||||
# 检查是否为该模型启用反截断
|
||||
use_anti_truncation = getattr(model_info, "use_anti_truncation", False)
|
||||
processed_prompt = prompt
|
||||
@@ -316,7 +325,7 @@ class LLMRequest:
|
||||
messages = [message_builder.build()]
|
||||
tool_built = self._build_tool_options(tools)
|
||||
|
||||
# 针对当前模型的空回复/截断重试逻辑
|
||||
# --- 当前选定模型内的空回复/截断重试逻辑 ---
|
||||
empty_retry_count = 0
|
||||
max_empty_retry = api_provider.max_retry
|
||||
empty_retry_interval = api_provider.retry_interval
|
||||
@@ -337,6 +346,7 @@ class LLMRequest:
|
||||
reasoning_content = response.reasoning_content or ""
|
||||
tool_calls = response.tool_calls
|
||||
|
||||
# 向后兼容 <think> 标签(如果 reasoning_content 为空)
|
||||
if not reasoning_content and content:
|
||||
content, extracted_reasoning = self._extract_reasoning(content)
|
||||
reasoning_content = extracted_reasoning
|
||||
@@ -354,18 +364,17 @@ class LLMRequest:
|
||||
if empty_retry_count <= max_empty_retry:
|
||||
reason = "空回复" if is_empty_reply else "截断"
|
||||
logger.warning(
|
||||
f"模型 '{model_name}' 检测到{reason},正在进行第 {empty_retry_count}/{max_empty_retry} 次重新生成..."
|
||||
f"模型 '{model_name}' 检测到{reason},正在进行内部重试 ({empty_retry_count}/{max_empty_retry})..."
|
||||
)
|
||||
if empty_retry_interval > 0:
|
||||
await asyncio.sleep(empty_retry_interval)
|
||||
continue # 继续使用当前模型重试
|
||||
continue # 使用当前模型重试
|
||||
else:
|
||||
# 当前模型重试次数用尽,跳出内层循环,触发外层循环切换模型
|
||||
reason = "空回复" if is_empty_reply else "截断"
|
||||
logger.error(f"模型 '{model_name}' 经过 {max_empty_retry} 次重试后仍然是{reason}的回复。")
|
||||
raise RuntimeError(f"模型 '{model_name}' 达到最大空回复/截断重试次数")
|
||||
logger.error(f"模型 '{model_name}' 经过 {max_empty_retry} 次内部重试后仍然生成{reason}的回复。将此模型标记为当前请求失败。")
|
||||
raise RuntimeError(f"模型 '{model_name}' 已达到空回复/截断的最大内部重试次数。")
|
||||
|
||||
# 成功获取响应
|
||||
# --- 从当前模型获取成功响应 ---
|
||||
if usage := response.usage:
|
||||
await llm_usage_recorder.record_usage_to_database(
|
||||
model_info=model_info,
|
||||
@@ -376,47 +385,29 @@ class LLMRequest:
|
||||
endpoint="/chat/completions",
|
||||
)
|
||||
|
||||
# 处理成功执行后响应仍然为空的情况
|
||||
if not content and not tool_calls:
|
||||
if raise_when_empty:
|
||||
raise RuntimeError("生成空回复")
|
||||
content = "生成的响应为空"
|
||||
raise RuntimeError("所选模型生成了空回复。")
|
||||
content = "生成的响应为空" # Fallback message
|
||||
|
||||
logger.debug(f"模型 '{model_name}' 成功生成回复。") # 你也不许刷屏
|
||||
return content, (reasoning_content, model_name, tool_calls)
|
||||
logger.debug(f"模型 '{model_name}' 成功生成了回复。")
|
||||
return content, (reasoning_content, model_name, tool_calls) # 成功,立即返回
|
||||
|
||||
except RespNotOkException as e:
|
||||
if e.status_code in [401, 403]:
|
||||
logger.error(f"模型 '{model_name}' 遇到认证/权限错误 (Code: {e.status_code}),将尝试下一个模型。")
|
||||
failed_models.add(model_name)
|
||||
last_exception = e
|
||||
continue # 切换到下一个模型
|
||||
else:
|
||||
logger.error(f"模型 '{model_name}' 请求失败,HTTP状态码: {e.status_code}")
|
||||
if raise_when_empty:
|
||||
raise
|
||||
# 对于其他HTTP错误,直接抛出,不再尝试其他模型
|
||||
return f"请求失败: {e}", ("", model_name, None)
|
||||
# --- 当前模型尝试过程中的异常处理 ---
|
||||
except Exception as e: # 捕获当前模型尝试过程中的所有异常
|
||||
# 修复 NameError: model_name 在异常处理块中未定义,应使用 model_info.name
|
||||
logger.error(f"模型 '{model_info.name}' 失败,异常: {e}。将其添加到当前请求的失败模型列表中。")
|
||||
failed_models_in_this_request.add(model_info.name)
|
||||
last_exception = e # 存储异常以供最终报告
|
||||
# 继续循环以尝试下一个可用模型
|
||||
|
||||
except RuntimeError as e:
|
||||
# 捕获所有重试失败(包括空回复和网络问题)
|
||||
logger.error(f"模型 '{model_name}' 在所有重试后仍然失败: {e},将尝试下一个模型。")
|
||||
failed_models.add(model_name)
|
||||
last_exception = e
|
||||
continue # 切换到下一个模型
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"使用模型 '{model_name}' 时发生未知异常: {e}")
|
||||
failed_models.add(model_name)
|
||||
last_exception = e
|
||||
continue # 切换到下一个模型
|
||||
|
||||
# 所有模型都尝试失败
|
||||
logger.error("所有可用模型都已尝试失败。")
|
||||
# 如果循环结束未能返回,则表示当前请求的所有模型都已失败
|
||||
logger.error(f"当前请求已尝试 {max_attempts} 个模型,所有模型均已失败。")
|
||||
if raise_when_empty:
|
||||
if last_exception:
|
||||
raise RuntimeError("所有模型都请求失败") from last_exception
|
||||
raise RuntimeError("所有模型都请求失败,且没有具体的异常信息")
|
||||
|
||||
raise RuntimeError("所有模型均未能生成响应。") from last_exception
|
||||
raise RuntimeError("所有模型均未能生成响应,且无具体异常信息。")
|
||||
return "所有模型都请求失败", ("", "unknown", None)
|
||||
|
||||
async def get_embedding(self, embedding_input: str) -> Tuple[List[float], str]:
|
||||
@@ -456,6 +447,57 @@ class LLMRequest:
|
||||
|
||||
return embedding, model_info.name
|
||||
|
||||
def _select_best_available_model(self, failed_models_in_this_request: set) -> Tuple[ModelInfo, APIProvider, BaseClient] | None:
|
||||
"""
|
||||
从可用模型中选择负载均衡评分最低的模型,并排除当前请求中已失败的模型。
|
||||
|
||||
参数:
|
||||
failed_models_in_this_request (set): 当前请求中已失败的模型名称集合。
|
||||
|
||||
返回:
|
||||
Tuple[ModelInfo, APIProvider, BaseClient] | None: 选定的模型详细信息,如果无可用模型则返回 None。
|
||||
"""
|
||||
candidate_models_usage = {}
|
||||
# 过滤掉当前请求中已失败的模型
|
||||
for model_name, usage_data in self.model_usage.items():
|
||||
if model_name not in failed_models_in_this_request:
|
||||
candidate_models_usage[model_name] = usage_data
|
||||
|
||||
if not candidate_models_usage:
|
||||
logger.warning("没有可用的模型供当前请求选择。")
|
||||
return None
|
||||
|
||||
# 根据现有公式查找分数最低的模型,该公式综合了总token数、模型惩罚值和使用频率惩罚值。
|
||||
# 公式: total_tokens + penalty * 300 + usage_penalty * 1000
|
||||
# 较高的 usage_penalty (由于被选中的模型会被增加) 和 penalty (由于模型失败) 会使模型得分更高,从而降低被选中的几率。
|
||||
least_used_model_name = min(
|
||||
candidate_models_usage,
|
||||
key=lambda k: candidate_models_usage[k][0] + candidate_models_usage[k][1] * 300 + candidate_models_usage[k][2] * 1000,
|
||||
)
|
||||
|
||||
# --- 动态故障转移的核心逻辑 ---
|
||||
# _execute_single_request 中的循环会多次调用此函数。
|
||||
# 如果当前选定的模型因异常而失败,下次循环会重新调用此函数,
|
||||
# 此时由于失败模型已被标记,且其惩罚值可能已在 _execute_request 中增加,
|
||||
# _select_best_available_model 会自动选择一个得分更低(即更可用)的模型。
|
||||
# 这种机制实现了动态的、基于当前系统状态的故障转移。
|
||||
|
||||
model_info = model_config.get_model_info(least_used_model_name)
|
||||
api_provider = model_config.get_provider(model_info.api_provider)
|
||||
|
||||
# 对于嵌入任务,如果需要,强制创建新的客户端实例(从原始 _select_model 复制)
|
||||
force_new_client = self.request_type == "embedding"
|
||||
client = client_registry.get_client_class_instance(api_provider, force_new=force_new_client)
|
||||
|
||||
logger.debug(f"为当前请求选择了最佳可用模型: {model_info.name}")
|
||||
|
||||
# 增加所选模型的请求使用惩罚值,以反映其当前使用情况/选择。
|
||||
# 这有助于在同一请求的后续选择或未来请求中实现动态负载均衡。
|
||||
total_tokens, penalty, usage_penalty = self.model_usage[model_info.name]
|
||||
self.model_usage[model_info.name] = (total_tokens, penalty, usage_penalty + 1)
|
||||
|
||||
return model_info, api_provider, client
|
||||
|
||||
def _model_scheduler(self, failed_models: set) -> Generator[Tuple[ModelInfo, APIProvider, BaseClient], None, None]:
|
||||
"""
|
||||
一个模型调度器,按顺序提供模型,并跳过已失败的模型。
|
||||
@@ -546,7 +588,47 @@ class LLMRequest:
|
||||
logger.debug(f"请求失败: {str(e)}")
|
||||
# 处理异常
|
||||
total_tokens, penalty, usage_penalty = self.model_usage[model_info.name]
|
||||
self.model_usage[model_info.name] = (total_tokens, penalty + 1, usage_penalty)
|
||||
|
||||
# --- 增强动态故障转移的智能性 ---
|
||||
# 根据异常类型和严重程度,动态调整模型的惩罚值。
|
||||
# 关键错误(如网络连接、服务器错误)会获得更高的惩罚,
|
||||
# 促使负载均衡算法在下次选择时优先规避这些不可靠的模型。
|
||||
CRITICAL_PENALTY_MULTIPLIER = 5 # 关键错误时的惩罚系数
|
||||
default_penalty_increment = 1 # 普通错误时的基础惩罚
|
||||
|
||||
penalty_increment = default_penalty_increment
|
||||
|
||||
if isinstance(e, NetworkConnectionError):
|
||||
# 网络连接问题表明模型服务器不稳定,增加较高惩罚
|
||||
penalty_increment = CRITICAL_PENALTY_MULTIPLIER
|
||||
# 修复 NameError: model_name 在此处未定义,应使用 model_info.name
|
||||
logger.warning(f"模型 '{model_info.name}' 发生网络连接错误,增加惩罚值: {penalty_increment}")
|
||||
elif isinstance(e, ReqAbortException):
|
||||
# 请求被中止,可能是服务器端原因或服务不稳定,增加较高惩罚
|
||||
penalty_increment = CRITICAL_PENALTY_MULTIPLIER
|
||||
# 修复 NameError: model_name 在此处未定义,应使用 model_info.name
|
||||
logger.warning(f"模型 '{model_info.name}' 请求被中止,增加惩罚值: {penalty_increment}")
|
||||
elif isinstance(e, RespNotOkException):
|
||||
if e.status_code >= 500:
|
||||
# 服务器错误 (5xx) 表明服务器端问题,应显著增加惩罚
|
||||
penalty_increment = CRITICAL_PENALTY_MULTIPLIER
|
||||
logger.warning(f"模型 '{model_name}' 发生服务器错误 (状态码: {e.status_code}),增加惩罚值: {penalty_increment}")
|
||||
elif e.status_code == 429:
|
||||
# 请求过于频繁,是暂时性问题,但仍需惩罚,此处使用默认基础值
|
||||
# penalty_increment = 2 # 可以选择一个中间值,例如2,表示比普通错误重,但比关键错误轻
|
||||
logger.warning(f"模型 '{model_name}' 请求过于频繁 (状态码: {e.status_code}),增加基础惩罚值: {penalty_increment}")
|
||||
else:
|
||||
# 其他客户端错误 (4xx)。通常不重试,_handle_resp_not_ok 会处理。
|
||||
# 如果 _handle_resp_not_ok 返回 retry_interval, 则进入这里的 exception 块。
|
||||
logger.warning(f"模型 '{model_name}' 发生非致命的响应错误 (状态码: {e.status_code}),增加基础惩罚值: {penalty_increment}")
|
||||
else:
|
||||
# 其他未捕获的异常,增加基础惩罚
|
||||
logger.warning(f"模型 '{model_name}' 发生未知异常: {type(e).__name__},增加基础惩罚值: {penalty_increment}")
|
||||
|
||||
self.model_usage[model_info.name] = (total_tokens, penalty + penalty_increment, usage_penalty)
|
||||
# --- 结束增强 ---
|
||||
# 移除冗余的、错误的惩罚值更新行,保留上面正确的动态惩罚更新
|
||||
# self.model_usage[model_info.name] = (total_tokens, penalty + 1, usage_penalty)
|
||||
|
||||
wait_interval, compressed_messages = self._default_exception_handler(
|
||||
e,
|
||||
|
||||
Reference in New Issue
Block a user