diff --git a/src/llm_models/utils_model.py b/src/llm_models/utils_model.py
index 8a8ff1965..af8ef225f 100644
--- a/src/llm_models/utils_model.py
+++ b/src/llm_models/utils_model.py
@@ -5,7 +5,7 @@
 import random
 from enum import Enum
 from rich.traceback import install
-from typing import Tuple, List, Dict, Optional, Callable, Any, Coroutine
+from typing import Tuple, List, Dict, Optional, Callable, Any, Coroutine, Generator
 from src.common.logger import get_logger
 from src.config.config import model_config
@@ -284,34 +284,34 @@ class LLMRequest:
         tools: Optional[List[Dict[str, Any]]] = None,
         raise_when_empty: bool = True,
     ) -> Tuple[str, Tuple[str, str, Optional[List[ToolCall]]]]:
-        """Execute a single request."""
-        # Model selection and request preparation
-        start_time = time.time()
-        model_info, api_provider, client = self._select_model()
-        model_name = model_info.name
-
-        # Check whether anti-truncation is enabled
-        use_anti_truncation = getattr(api_provider, "anti_truncation", False)
-
-        processed_prompt = prompt
-        if use_anti_truncation:
-            processed_prompt += self.anti_truncation_instruction
-            logger.info(f"Anti-truncation enabled for {api_provider} '{self.task_name}'")
-
-        processed_prompt = self._apply_content_obfuscation(processed_prompt, api_provider)
-
-        message_builder = MessageBuilder()
-        message_builder.add_text_content(processed_prompt)
-        messages = [message_builder.build()]
-        tool_built = self._build_tool_options(tools)
-
-        # Empty-reply retry logic
-        empty_retry_count = 0
-        max_empty_retry = api_provider.max_retry
-        empty_retry_interval = api_provider.retry_interval
-
-        while empty_retry_count <= max_empty_retry:
-            try:
+        """
+        Execute a single request, failing over to the next available model in
+        order when the current model fails.
+        """
+        failed_models = set()
+        last_exception: Optional[Exception] = None
+
+        model_scheduler = self._model_scheduler(failed_models)
+
+        for model_info, api_provider, client in model_scheduler:
+            start_time = time.time()
+            model_name = model_info.name
+            logger.info(f"Trying model: {model_name}")
+            try:
+                # Check whether anti-truncation is enabled
+                use_anti_truncation = getattr(api_provider, "anti_truncation", False)
+                processed_prompt = prompt
+                if use_anti_truncation:
+                    processed_prompt += self.anti_truncation_instruction
+                    logger.info(f"Anti-truncation enabled for '{model_name}' on task '{self.task_name}'")
+
+                processed_prompt = self._apply_content_obfuscation(processed_prompt, api_provider)
+
+                message_builder = MessageBuilder()
+                message_builder.add_text_content(processed_prompt)
+                messages = [message_builder.build()]
+                tool_built = self._build_tool_options(tools)
+
                 response = await self._execute_request(
                     api_provider=api_provider,
                     client=client,
@@ -322,93 +322,80 @@ class LLMRequest:
                     temperature=temperature,
                     max_tokens=max_tokens,
                 )
+
                 content = response.content or ""
                 reasoning_content = response.reasoning_content or ""
                 tool_calls = response.tool_calls
-                # Extract tagged reasoning content from the body (backward compatibility)
+
                 if not reasoning_content and content:
                     content, extracted_reasoning = self._extract_reasoning(content)
                     reasoning_content = extracted_reasoning
-
-                is_empty_reply = False
-                is_truncated = False
-                # Detect an empty or truncated reply
-                if not tool_calls:
-                    is_empty_reply = not content or content.strip() == ""
-                    is_truncated = False
+                is_empty_reply = not tool_calls and (not content or content.strip() == "")
+                is_truncated = False
                 if use_anti_truncation:
                     if content.endswith("[done]"):
                         content = content[:-6].strip()
-                        logger.debug("Found and removed the [done] marker")
                     else:
                         is_truncated = True
-                        logger.warning("No [done] marker found; treating the reply as truncated")
-
+
                 if is_empty_reply or is_truncated:
-                    if empty_retry_count < max_empty_retry:
-                        empty_retry_count += 1
-                        reason = "empty" if is_empty_reply else "truncated"
-                        logger.warning(f"Detected a {reason} reply, regenerating ({empty_retry_count}/{max_empty_retry})")
+                    # An empty or truncated reply does not trigger a model switch; log the error, then raise or return
+                    reason = "an empty" if is_empty_reply else "a truncated"
+                    msg = f"Model '{model_name}' produced {reason} reply"
+                    logger.error(msg)
+                    if raise_when_empty:
+                        raise RuntimeError(msg)
+                    return msg, (reasoning_content, model_name, tool_calls)
-                        if empty_retry_interval > 0:
-                            await asyncio.sleep(empty_retry_interval)
-                        model_info, api_provider, client = self._select_model()
-                        continue
-                    else:
-                        # Maximum retries reached, but the reply is still empty or truncated
-                        reason = "empty" if is_empty_reply else "truncated"
-                        # Raise and let the outer retry logic or the final exception handler catch it
-                        raise RuntimeError(f"Reply was still {reason} after {max_empty_retry + 1} attempts")
-
-                # Record usage
+                # Response obtained successfully
                 if usage := response.usage:
                     llm_usage_recorder.record_usage_to_database(
-                        model_info=model_info,
-                        model_usage=usage,
-                        time_cost=time.time() - start_time,
-                        user_id="system",
-                        request_type=self.request_type,
-                        endpoint="/chat/completions",
+                        model_info=model_info, model_usage=usage, time_cost=time.time() - start_time,
+                        user_id="system", request_type=self.request_type, endpoint="/chat/completions",
                     )
-                # Handle an empty reply
                 if not content and not tool_calls:
                     if raise_when_empty:
-                        raise RuntimeError(f"Reply was still empty after {empty_retry_count} retries")
-                    content = "The generated response is empty; check the model configuration and input"
-                elif empty_retry_count > 0:
-                    logger.info(f"Generated a reply successfully after {empty_retry_count} retries")
+                        raise RuntimeError("Generated an empty reply")
+                    content = "The generated response is empty"
+
+                logger.info(f"Model '{model_name}' generated a reply successfully.")
+                return content, (reasoning_content, model_name, tool_calls)
-
-                return content, (reasoning_content, model_info.name, tool_calls)
-
-            except Exception as e:
-                logger.error(f"Request execution failed: {e}")
-                if raise_when_empty:
-                    # In non-concurrent mode, raise immediately if the very first attempt fails
-                    if empty_retry_count == 0:
-                        raise
-
-                    # If a failure happens during the retries, keep retrying
-                    empty_retry_count += 1
-                    if empty_retry_count <= max_empty_retry:
-                        logger.warning(f"Request failed; retrying in {empty_retry_interval}s ({empty_retry_count}/{max_empty_retry})...")
-                        if empty_retry_interval > 0:
-                            await asyncio.sleep(empty_retry_interval)
-                        continue
-                    else:
-                        logger.error(f"Still failing after {max_empty_retry} retries")
-                        raise RuntimeError(f"Failed to generate a valid reply after {max_empty_retry} retries") from e
+            except RespNotOkException as e:
+                if e.status_code in [401, 403]:
+                    logger.error(f"Model '{model_name}' hit an authentication/permission error (code {e.status_code}); trying the next model.")
+                    failed_models.add(model_name)
+                    last_exception = e
+                    continue
                 else:
-                    # In concurrent mode, a single failed request must not interrupt the whole
-                    # concurrent flow; return the exception to the caller (execute_concurrently)
-                    # for unified handling
-                    raise  # Re-raise so the gather in execute_concurrently catches it
-
-        # Retries exhausted
+                    # For other HTTP errors, do not switch models; raise directly
+                    logger.error(f"Model '{model_name}' request failed with HTTP status {e.status_code}")
+                    last_exception = e
+                    if raise_when_empty:
+                        raise
+                    break
+            except RuntimeError as e:
+                logger.error(f"Model '{model_name}' still failed after all retries: {e}; trying the next model.")
+                failed_models.add(model_name)
+                last_exception = e
+                continue
+            except Exception as e:
+                logger.error(f"Unexpected exception while using model '{model_name}': {e}")
+                failed_models.add(model_name)
+                last_exception = e
+                continue
+
+        # Every available model has been tried and failed
+        logger.error("All available models failed.")
         if raise_when_empty:
-            raise RuntimeError(f"Failed to generate a valid reply after {max_empty_retry} retries")
-        return "The generated response is empty; check the model configuration and input", ("", model_name, None)
+            if last_exception:
+                raise RuntimeError("All model requests failed") from last_exception
+            raise RuntimeError("All model requests failed, with no specific exception recorded")
+
+        return "All model requests failed", ("", "unknown", None)
 
     async def get_embedding(self, embedding_input: str) -> Tuple[List[float], str]:
         """Get an embedding vector
@@ -448,9 +435,24 @@ class LLMRequest:
         return embedding, model_info.name
 
+    def _model_scheduler(self, failed_models: set) -> Generator[Tuple[ModelInfo, APIProvider, BaseClient], None, None]:
+        """
+        A model scheduler that yields models in configured order, skipping any
+        that have already failed.
+        """
+        for model_name in self.model_for_task.model_list:
+            if model_name in failed_models:
+                continue
+
+            model_info = model_config.get_model_info(model_name)
+            api_provider = model_config.get_provider(model_info.api_provider)
+            force_new_client = (self.request_type == "embedding")
+            client = client_registry.get_client_class_instance(api_provider, force_new=force_new_client)
+
+            yield model_info, api_provider, client
+
     def _select_model(self) -> Tuple[ModelInfo, APIProvider, BaseClient]:
         """
-        Select a model based on total token count and penalty values
+        Select a model based on total token count and penalty values (load balancing)
         """
         least_used_model_name = min(
             self.model_usage,
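
# ---------------------------------------------------------------------------
# Reviewer note: a minimal, self-contained sketch of the failover pattern this
# patch introduces. A generator yields candidate models in configured order and
# skips any the caller has marked as failed; on an auth-style error the caller
# records the failure and falls through to the next model. All names below
# (call_model, RESPONSES, request_with_failover) are hypothetical stand-ins
# for illustration, not part of the patch.
# ---------------------------------------------------------------------------
from typing import Dict, Generator, List, Optional, Set


def model_scheduler(model_list: List[str], failed_models: Set[str]) -> Generator[str, None, None]:
    # Yield model names in configured order, skipping any marked as failed.
    # Membership is checked at each step, so failures recorded mid-iteration
    # take effect on later candidates.
    for name in model_list:
        if name in failed_models:
            continue
        yield name


RESPONSES: Dict[str, Optional[str]] = {"model-a": None, "model-b": "hello"}  # None simulates a 401/403


def call_model(name: str) -> str:
    # Hypothetical request function standing in for _execute_request.
    reply = RESPONSES[name]
    if reply is None:
        raise PermissionError(f"auth error from '{name}'")
    return reply


def request_with_failover(model_list: List[str]) -> str:
    failed_models: Set[str] = set()
    last_exception: Optional[Exception] = None
    for name in model_scheduler(model_list, failed_models):
        try:
            return call_model(name)
        except PermissionError as e:
            # Mirror the patch: record the failure and try the next model.
            failed_models.add(name)
            last_exception = e
    raise RuntimeError("all models failed") from last_exception


if __name__ == "__main__":
    print(request_with_failover(["model-a", "model-b"]))  # fails over to model-b -> "hello"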