diff --git a/src/plugins/models/utils_model.py b/src/plugins/models/utils_model.py
index 5ab82b42b..7c87cf946 100644
--- a/src/plugins/models/utils_model.py
+++ b/src/plugins/models/utils_model.py
@@ -178,395 +178,6 @@ class LLMRequest:
         output_cost = (completion_tokens / 1000000) * self.pri_out
         return round(input_cost + output_cost, 6)
 
-    '''
-    async def _execute_request(
-        self,
-        endpoint: str,
-        prompt: str = None,
-        image_base64: str = None,
-        image_format: str = None,
-        payload: dict = None,
-        retry_policy: dict = None,
-        response_handler: callable = None,
-        user_id: str = "system",
-        request_type: str = None,
-    ):
-        """Unified request execution entry point
-        Args:
-            endpoint: API endpoint path (e.g. "chat/completions")
-            prompt: prompt text
-            image_base64: base64-encoded image
-            image_format: image format
-            payload: request body data
-            retry_policy: custom retry policy
-            response_handler: custom response handler
-            user_id: user ID
-            request_type: request type
-        """
-
-        if request_type is None:
-            request_type = self.request_type
-
-        # Merge the retry policy with the defaults
-        default_retry = {
-            "max_retries": 3,
-            "base_wait": 10,
-            "retry_codes": [429, 413, 500, 503],
-            "abort_codes": [400, 401, 402, 403],
-        }
-        policy = {**default_retry, **(retry_policy or {})}
-
-        # Common error code mapping
-        error_code_mapping = {
-            400: "Invalid parameters",
-            401: "API key error, authentication failed; please check the settings in /config/bot_config.toml and .env",
-            402: "Insufficient account balance",
-            403: "Real-name verification required, or insufficient balance",
-            404: "Not Found",
-            429: "Too many requests, please try again later",
-            500: "Internal server error",
-            503: "Server overloaded",
-        }
-
-        api_url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
-        # Determine whether streaming mode is enabled
-        stream_mode = self.stream
-        # logger_msg = "Entering streaming output mode, " if stream_mode else ""
-        # logger.debug(f"{logger_msg}sending request to URL: {api_url}")
-        # logger.info(f"Using model: {self.model_name}")
-
-        # Build the request body
-        if image_base64:
-            payload = await self._build_payload(prompt, image_base64, image_format)
-        elif payload is None:
-            payload = await self._build_payload(prompt)
-
-        # Streaming output flag
-        # Build the payload first, then add the streaming flag
-        if stream_mode:
-            payload["stream"] = stream_mode
-
-        for retry in range(policy["max_retries"]):
-            try:
-                # Use a context manager to handle the session
-                headers = await self._build_headers()
-                # Apparently required for OpenAI streaming; adding it had no effect on Alibaba Cloud's qwq-plus
-                if stream_mode:
-                    headers["Accept"] = "text/event-stream"
-
-                async with aiohttp.ClientSession() as session:
-                    try:
-                        async with session.post(api_url, headers=headers, json=payload) as response:
-                            # Handle status codes that require a retry
-                            if response.status in policy["retry_codes"]:
-                                wait_time = policy["base_wait"] * (2**retry)
-                                logger.warning(
-                                    f"Model {self.model_name} error code: {response.status}, retrying in {wait_time}s"
-                                )
-                                if response.status == 413:
-                                    logger.warning("Request body too large, trying to compress...")
-                                    image_base64 = compress_base64_image_by_scale(image_base64)
-                                    payload = await self._build_payload(prompt, image_base64, image_format)
-                                elif response.status in [500, 503]:
-                                    logger.error(
-                                        f"Model {self.model_name} error code: {response.status} - {error_code_mapping.get(response.status)}"
-                                    )
-                                    raise RuntimeError("Server overloaded, the model failed to recover")
-                                else:
-                                    logger.warning(f"Model {self.model_name} rate limited (429), retrying in {wait_time}s...")
-
-                                await asyncio.sleep(wait_time)
-                                continue
-                            elif response.status in policy["abort_codes"]:
-                                logger.error(
-                                    f"Model {self.model_name} error code: {response.status} - {error_code_mapping.get(response.status)}"
-                                )
-                                # Try to fetch and log the detailed error returned by the server
-                                try:
-                                    error_json = await response.json()
-                                    if error_json and isinstance(error_json, list) and len(error_json) > 0:
-                                        for error_item in error_json:
-                                            if "error" in error_item and isinstance(error_item["error"], dict):
-                                                error_obj = error_item["error"]
-                                                error_code = error_obj.get("code")
-                                                error_message = error_obj.get("message")
-                                                error_status = error_obj.get("status")
-                                                logger.error(
-                                                    f"Server error details: code={error_code}, status={error_status}, "
-                                                    f"message={error_message}"
-                                                )
-                                    elif isinstance(error_json, dict) and "error" in error_json:
-                                        # Handle the single-error-object case
-                                        error_obj = error_json.get("error", {})
-                                        error_code = error_obj.get("code")
-                                        error_message = error_obj.get("message")
-                                        error_status = error_obj.get("status")
-                                        logger.error(
-                                            f"Server error details: code={error_code}, status={error_status}, message={error_message}"
-                                        )
-                                    else:
-                                        # Log the raw error response
-                                        logger.error(f"Server error response: {error_json}")
-                                except Exception as e:
-                                    logger.warning(f"Failed to parse the server error response: {str(e)}")
-
-                                if response.status == 403:
-                                    # Downgrade handling only applies to SiliconFlow's V3 and R1
-                                    if (
-                                        self.model_name.startswith("Pro/deepseek-ai")
-                                        and self.base_url == "https://api.siliconflow.cn/v1/"
-                                    ):
-                                        old_model_name = self.model_name
-                                        self.model_name = self.model_name[4:]  # strip the "Pro/" prefix
-                                        logger.warning(
-                                            f"403 error detected, downgrading model from {old_model_name} to {self.model_name}"
-                                        )
-
-                                        # Update the global config accordingly
-                                        if global_config.llm_normal.get("name") == old_model_name:
-                                            global_config.llm_normal["name"] = self.model_name
-                                            logger.warning(f"Temporarily downgrading the llm_normal model in the global config to {self.model_name}")
-
-                                        if global_config.llm_reasoning.get("name") == old_model_name:
-                                            global_config.llm_reasoning["name"] = self.model_name
-                                            logger.warning(
-                                                f"Temporarily downgrading the llm_reasoning model in the global config to {self.model_name}"
-                                            )
-
-                                        # Update the model name in the payload
-                                        if payload and "model" in payload:
-                                            payload["model"] = self.model_name
-
-                                        # Retry the request
-                                        retry -= 1  # do not count this attempt against the retry limit
-                                        continue
-
-                                raise RuntimeError(f"Request rejected: {error_code_mapping.get(response.status)}")
-
-                            response.raise_for_status()
-                            reasoning_content = ""
-
-                            # Convert streaming output into a non-streaming result
-                            if stream_mode:
-                                flag_delta_content_finished = False
-                                accumulated_content = ""
-                                usage = None  # initialize usage to avoid an undefined-variable error
-
-                                async for line_bytes in response.content:
-                                    try:
-                                        line = line_bytes.decode("utf-8").strip()
-                                        if not line:
-                                            continue
-                                        if line.startswith("data:"):
-                                            data_str = line[5:].strip()
-                                            if data_str == "[DONE]":
-                                                break
-                                            try:
-                                                chunk = json.loads(data_str)
-                                                if flag_delta_content_finished:
-                                                    chunk_usage = chunk.get("usage", None)
-                                                    if chunk_usage:
-                                                        usage = chunk_usage  # capture the token usage
-                                                else:
-                                                    delta = chunk["choices"][0]["delta"]
-                                                    delta_content = delta.get("content")
-                                                    if delta_content is None:
-                                                        delta_content = ""
-                                                    accumulated_content += delta_content
-                                                    # Check whether the streamed text has finished
-                                                    finish_reason = chunk["choices"][0].get("finish_reason")
-                                                    if delta.get("reasoning_content", None):
-                                                        reasoning_content += delta["reasoning_content"]
-                                                    if finish_reason == "stop":
-                                                        chunk_usage = chunk.get("usage", None)
-                                                        if chunk_usage:
-                                                            usage = chunk_usage
-                                                            break
-                                                        # Some platforms do not return token usage until after the text output ends, so fetch one more chunk
-                                                        flag_delta_content_finished = True
-
-                                            except Exception as e:
-                                                logger.exception(f"Model {self.model_name} error parsing streaming output: {str(e)}")
-                                    except GeneratorExit:
-                                        logger.warning(f"Model {self.model_name} streaming output interrupted, cleaning up resources...")
-                                        # Make sure resources are cleaned up properly
-                                        await response.release()
-                                        # Return the content accumulated so far
-                                        result = {
-                                            "choices": [
-                                                {
-                                                    "message": {
-                                                        "content": accumulated_content,
-                                                        "reasoning_content": reasoning_content,
-                                                        # Streaming output may have no tool calls; no tool_calls field is needed here
-                                                    }
-                                                }
-                                            ],
-                                            "usage": usage,
-                                        }
-                                        return (
-                                            response_handler(result)
-                                            if response_handler
-                                            else self._default_response_handler(result, user_id, request_type, endpoint)
-                                        )
-                                    except Exception as e:
-                                        logger.error(f"Model {self.model_name} error while handling streaming output: {str(e)}")
-                                        # Make sure resources are cleaned up even when an error occurs
-                                        try:
-                                            await response.release()
-                                        except Exception as cleanup_error:
-                                            logger.error(f"Error while cleaning up resources: {cleanup_error}")
-                                        # Return the content accumulated so far
-                                        result = {
-                                            "choices": [
-                                                {
-                                                    "message": {
-                                                        "content": accumulated_content,
-                                                        "reasoning_content": reasoning_content,
-                                                        # Streaming output may have no tool calls; no tool_calls field is needed here
-                                                    }
-                                                }
-                                            ],
-                                            "usage": usage,
-                                        }
-                                        return (
-                                            response_handler(result)
-                                            if response_handler
-                                            else self._default_response_handler(result, user_id, request_type, endpoint)
-                                        )
-                                content = accumulated_content
-                                think_match = re.search(r"<think>(.*?)</think>", content, re.DOTALL)
-                                if think_match:
-                                    reasoning_content = think_match.group(1).strip()
-                                content = re.sub(r"<think>.*?</think>", "", content, flags=re.DOTALL).strip()
-                                # Build a pseudo-result so the custom or default response handler can be invoked
-                                result = {
-                                    "choices": [
-                                        {
-                                            "message": {
-                                                "content": content,
-                                                "reasoning_content": reasoning_content,
-                                                # Streaming output may have no tool calls; no tool_calls field is needed here
-                                            }
-                                        }
-                                    ],
-                                    "usage": usage,
-                                }
-                                return (
-                                    response_handler(result)
-                                    if response_handler
-                                    else self._default_response_handler(result, user_id, request_type, endpoint)
-                                )
-                            else:
-                                result = await response.json()
-                                # Use the custom handler, or fall back to the default one
-                                return (
-                                    response_handler(result)
-                                    if response_handler
-                                    else self._default_response_handler(result, user_id, request_type, endpoint)
-                                )
-
-                    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
-                        if retry < policy["max_retries"] - 1:
-                            wait_time = policy["base_wait"] * (2**retry)
-                            logger.error(f"Model {self.model_name} network error, retrying in {wait_time}s... error: {str(e)}")
-                            await asyncio.sleep(wait_time)
-                            continue
-                        else:
-                            logger.critical(f"Model {self.model_name} network error hit the maximum retry count: {str(e)}")
-                            raise RuntimeError(f"Network request failed: {str(e)}") from e
-                    except Exception as e:
-                        logger.critical(f"Model {self.model_name} unexpected error: {str(e)}")
-                        raise RuntimeError(f"An error occurred during the request: {str(e)}") from e
-
-            except aiohttp.ClientResponseError as e:
-                # Handle response errors raised by aiohttp
-                if retry < policy["max_retries"] - 1:
-                    wait_time = policy["base_wait"] * (2**retry)
-                    logger.error(
-                        f"Model {self.model_name} HTTP response error, retrying in {wait_time}s... status: {e.status}, error: {e.message}"
-                    )
-                    try:
-                        if hasattr(e, "response") and e.response and hasattr(e.response, "text"):
-                            error_text = await e.response.text()
-                            try:
-                                error_json = json.loads(error_text)
-                                if isinstance(error_json, list) and len(error_json) > 0:
-                                    for error_item in error_json:
-                                        if "error" in error_item and isinstance(error_item["error"], dict):
-                                            error_obj = error_item["error"]
-                                            logger.error(
-                                                f"Model {self.model_name} server error details: code={error_obj.get('code')}, "
-                                                f"status={error_obj.get('status')}, "
-                                                f"message={error_obj.get('message')}"
-                                            )
-                                elif isinstance(error_json, dict) and "error" in error_json:
-                                    error_obj = error_json.get("error", {})
-                                    logger.error(
-                                        f"Model {self.model_name} server error details: code={error_obj.get('code')}, "
-                                        f"status={error_obj.get('status')}, "
-                                        f"message={error_obj.get('message')}"
-                                    )
-                                else:
-                                    logger.error(f"Model {self.model_name} server error response: {error_json}")
-                            except (json.JSONDecodeError, TypeError) as json_err:
-                                logger.warning(
-                                    f"Model {self.model_name} response is not valid JSON: {str(json_err)}, raw content: {error_text[:200]}"
-                                )
-                    except (AttributeError, TypeError, ValueError) as parse_err:
-                        logger.warning(f"Model {self.model_name} could not parse the error response content: {str(parse_err)}")
-
-                    await asyncio.sleep(wait_time)
-                else:
-                    logger.critical(
-                        f"Model {self.model_name} HTTP response error hit the maximum retry count: status: {e.status}, error: {e.message}"
-                    )
-                    # Safely inspect and log the request details
-                    if (
-                        image_base64
-                        and payload
-                        and isinstance(payload, dict)
-                        and "messages" in payload
-                        and len(payload["messages"]) > 0
-                    ):
-                        if isinstance(payload["messages"][0], dict) and "content" in payload["messages"][0]:
-                            content = payload["messages"][0]["content"]
-                            if isinstance(content, list) and len(content) > 1 and "image_url" in content[1]:
-                                payload["messages"][0]["content"][1]["image_url"]["url"] = (
-                                    f"data:image/{image_format.lower() if image_format else 'jpeg'};base64,"
-                                    f"{image_base64[:10]}...{image_base64[-10:]}"
-                                )
-                    logger.critical(f"Request headers: {await self._build_headers(no_key=True)} request body: {payload}")
-                    raise RuntimeError(f"Model {self.model_name} API request failed: status {e.status}, {e.message}") from e
-            except Exception as e:
-                if retry < policy["max_retries"] - 1:
-                    wait_time = policy["base_wait"] * (2**retry)
-                    logger.error(f"Model {self.model_name} request failed, retrying in {wait_time}s... error: {str(e)}")
-                    await asyncio.sleep(wait_time)
-                else:
-                    logger.critical(f"Model {self.model_name} request failed: {str(e)}")
-                    # Safely inspect and log the request details
-                    if (
-                        image_base64
-                        and payload
-                        and isinstance(payload, dict)
-                        and "messages" in payload
-                        and len(payload["messages"]) > 0
-                    ):
-                        if isinstance(payload["messages"][0], dict) and "content" in payload["messages"][0]:
-                            content = payload["messages"][0]["content"]
-                            if isinstance(content, list) and len(content) > 1 and "image_url" in content[1]:
-                                payload["messages"][0]["content"][1]["image_url"]["url"] = (
-                                    f"data:image/{image_format.lower() if image_format else 'jpeg'};base64,"
-                                    f"{image_base64[:10]}...{image_base64[-10:]}"
-                                )
-                    logger.critical(f"Request headers: {await self._build_headers(no_key=True)} request body: {payload}")
-                    raise RuntimeError(f"Model {self.model_name} API request failed: {str(e)}") from e
-
-        logger.error(f"Model {self.model_name} hit the maximum retry count and the request still failed")
-        raise RuntimeError(f"Model {self.model_name} hit the maximum retry count; the API request still failed")
-    '''
-
     async def _prepare_request(
         self,
         endpoint: str,