From be7997e1b7cb6d47f7f7ce746bd7540a5c643e19 Mon Sep 17 00:00:00 2001
From: 春河晴
Date: Fri, 14 Mar 2025 14:08:09 +0900
Subject: [PATCH] refactor: improve error handling and code formatting
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Enhance API error response parsing and add detailed error logging
- Improve the HTTP client's handling of response errors
- Normalize code formatting; reflow function signatures and dict literals

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude
---
 src/plugins/models/utils_model.py | 264 ++++++++++++++++++++----------
 1 file changed, 173 insertions(+), 91 deletions(-)

diff --git a/src/plugins/models/utils_model.py b/src/plugins/models/utils_model.py
index 49c335eb2..d4d57a93d 100644
--- a/src/plugins/models/utils_model.py
+++ b/src/plugins/models/utils_model.py
@@ -47,9 +47,15 @@ class LLM_request:
         except Exception:
             logger.error("创建数据库索引失败")
 
-    def _record_usage(self, prompt_tokens: int, completion_tokens: int, total_tokens: int,
-                      user_id: str = "system", request_type: str = "chat",
-                      endpoint: str = "/chat/completions"):
+    def _record_usage(
+        self,
+        prompt_tokens: int,
+        completion_tokens: int,
+        total_tokens: int,
+        user_id: str = "system",
+        request_type: str = "chat",
+        endpoint: str = "/chat/completions",
+    ):
         """记录模型使用情况到数据库
         Args:
             prompt_tokens: 输入token数
@@ -70,7 +76,7 @@ class LLM_request:
                 "total_tokens": total_tokens,
                 "cost": self._calculate_cost(prompt_tokens, completion_tokens),
                 "status": "success",
-                "timestamp": datetime.now()
+                "timestamp": datetime.now(),
             }
             db.llm_usage.insert_one(usage_data)
             logger.info(
@@ -85,11 +91,11 @@ class LLM_request:
     def _calculate_cost(self, prompt_tokens: int, completion_tokens: int) -> float:
         """计算API调用成本
         使用模型的pri_in和pri_out价格计算输入和输出的成本
-        
+
         Args:
             prompt_tokens: 输入token数量
             completion_tokens: 输出token数量
-        
+
         Returns:
             float: 总成本(元)
         """
@@ -99,16 +105,16 @@ class LLM_request:
         return round(input_cost + output_cost, 6)
 
     async def _execute_request(
-            self,
-            endpoint: str,
-            prompt: str = None,
-            image_base64: str = None,
-            image_format: str = None,
-            payload: dict = None,
-            retry_policy: dict = None,
-            response_handler: callable = None,
-            user_id: str = "system",
-            request_type: str = "chat"
+        self,
+        endpoint: str,
+        prompt: str = None,
+        image_base64: str = None,
+        image_format: str = None,
+        payload: dict = None,
+        retry_policy: dict = None,
+        response_handler: callable = None,
+        user_id: str = "system",
+        request_type: str = "chat",
     ):
         """统一请求执行入口
         Args:
@@ -124,9 +130,11 @@ class LLM_request:
         """
         # 合并重试策略
         default_retry = {
-            "max_retries": 3, "base_wait": 15,
+            "max_retries": 3,
+            "base_wait": 15,
             "retry_codes": [429, 413, 500, 503],
-            "abort_codes": [400, 401, 402, 403]}
+            "abort_codes": [400, 401, 402, 403],
+        }
         policy = {**default_retry, **(retry_policy or {})}
 
         # 常见Error Code Mapping
@@ -138,7 +146,7 @@ class LLM_request:
             404: "Not Found",
             429: "请求过于频繁,请稍后再试",
             500: "服务器内部故障",
-            503: "服务器负载过高"
+            503: "服务器负载过高",
         }
 
         api_url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
@@ -168,7 +176,7 @@ class LLM_request:
                     async with session.post(api_url, headers=headers, json=payload) as response:
                         # 处理需要重试的状态码
                         if response.status in policy["retry_codes"]:
-                            wait_time = policy["base_wait"] * (2 ** retry)
+                            wait_time = policy["base_wait"] * (2**retry)
                             logger.warning(f"错误码: {response.status}, 等待 {wait_time}秒后重试")
                             if response.status == 413:
                                 logger.warning("请求体过大,尝试压缩...")
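# ----------------------------------------------------------------------
# Editor's sketch, not part of the patch: the retry branch above sleeps
# base_wait * 2**retry seconds, i.e. exponential backoff (15s, 30s, 60s
# with the defaults). A minimal standalone version, assuming aiohttp; the
# function name and constants here are illustrative.
import asyncio
import aiohttp

RETRY_CODES = {429, 413, 500, 503}

async def post_with_backoff(url: str, payload: dict, max_retries: int = 3, base_wait: int = 15) -> dict:
    async with aiohttp.ClientSession() as session:
        for retry in range(max_retries):
            async with session.post(url, json=payload) as response:
                if response.status in RETRY_CODES:
                    await asyncio.sleep(base_wait * (2**retry))  # back off, then try again
                    continue
                response.raise_for_status()  # non-retryable errors surface immediately
                return await response.json()
    raise RuntimeError(f"request still failing after {max_retries} attempts")
# ----------------------------------------------------------------------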
policy["abort_codes"]: logger.error(f"错误码: {response.status} - {error_code_mapping.get(response.status)}") + # 尝试获取并记录服务器返回的详细错误信息 + try: + error_json = await response.json() + if error_json and isinstance(error_json, list) and len(error_json) > 0: + for error_item in error_json: + if "error" in error_item and isinstance(error_item["error"], dict): + error_obj = error_item["error"] + error_code = error_obj.get("code") + error_message = error_obj.get("message") + error_status = error_obj.get("status") + logger.error( + f"服务器错误详情: 代码={error_code}, 状态={error_status}, 消息={error_message}" + ) + elif isinstance(error_json, dict) and "error" in error_json: + # 处理单个错误对象的情况 + error_obj = error_json.get("error", {}) + error_code = error_obj.get("code") + error_message = error_obj.get("message") + error_status = error_obj.get("status") + logger.error( + f"服务器错误详情: 代码={error_code}, 状态={error_status}, 消息={error_message}" + ) + else: + # 记录原始错误响应内容 + logger.error(f"服务器错误响应: {error_json}") + except Exception as e: + logger.warning(f"无法解析服务器错误响应: {str(e)}") + if response.status == 403: # 只针对硅基流动的V3和R1进行降级处理 - if self.model_name.startswith( - "Pro/deepseek-ai") and self.base_url == "https://api.siliconflow.cn/v1/": + if ( + self.model_name.startswith("Pro/deepseek-ai") + and self.base_url == "https://api.siliconflow.cn/v1/" + ): old_model_name = self.model_name self.model_name = self.model_name[4:] # 移除"Pro/"前缀 logger.warning(f"检测到403错误,模型从 {old_model_name} 降级为 {self.model_name}") # 对全局配置进行更新 - if global_config.llm_normal.get('name') == old_model_name: - global_config.llm_normal['name'] = self.model_name + if global_config.llm_normal.get("name") == old_model_name: + global_config.llm_normal["name"] = self.model_name logger.warning(f"将全局配置中的 llm_normal 模型临时降级至{self.model_name}") - if global_config.llm_reasoning.get('name') == old_model_name: - global_config.llm_reasoning['name'] = self.model_name + if global_config.llm_reasoning.get("name") == old_model_name: + global_config.llm_reasoning["name"] = self.model_name logger.warning(f"将全局配置中的 llm_reasoning 模型临时降级至{self.model_name}") # 更新payload中的模型名 - if payload and 'model' in payload: - payload['model'] = self.model_name + if payload and "model" in payload: + payload["model"] = self.model_name # 重新尝试请求 retry -= 1 # 不计入重试次数 @@ -248,32 +286,75 @@ class LLM_request: logger.exception("解析流式输出错误") content = accumulated_content reasoning_content = "" - think_match = re.search(r'(.*?)', content, re.DOTALL) + think_match = re.search(r"(.*?)", content, re.DOTALL) if think_match: reasoning_content = think_match.group(1).strip() - content = re.sub(r'.*?', '', content, flags=re.DOTALL).strip() + content = re.sub(r".*?", "", content, flags=re.DOTALL).strip() # 构造一个伪result以便调用自定义响应处理器或默认处理器 result = { "choices": [{"message": {"content": content, "reasoning_content": reasoning_content}}], - "usage": usage} - return response_handler(result) if response_handler else self._default_response_handler( - result, user_id, request_type, endpoint) + "usage": usage, + } + return ( + response_handler(result) + if response_handler + else self._default_response_handler(result, user_id, request_type, endpoint) + ) else: result = await response.json() # 使用自定义处理器或默认处理 - return response_handler(result) if response_handler else self._default_response_handler( - result, user_id, request_type, endpoint) + return ( + response_handler(result) + if response_handler + else self._default_response_handler(result, user_id, request_type, endpoint) + ) + except aiohttp.ClientResponseError as e: + # 
+            except aiohttp.ClientResponseError as e:
+                # 处理aiohttp抛出的响应错误
+                if retry < policy["max_retries"] - 1:
+                    wait_time = policy["base_wait"] * (2**retry)
+                    logger.error(f"HTTP响应错误,等待{wait_time}秒后重试... 状态码: {e.status}, 错误: {e.message}")
+                    try:
+                        if hasattr(e, "history") and e.history and hasattr(e.history[0], "text"):
+                            error_text = await e.history[0].text()
+                            error_json = json.loads(error_text)
+                            if isinstance(error_json, list) and len(error_json) > 0:
+                                for error_item in error_json:
+                                    if "error" in error_item and isinstance(error_item["error"], dict):
+                                        error_obj = error_item["error"]
+                                        logger.error(
+                                            f"服务器错误详情: 代码={error_obj.get('code')}, 状态={error_obj.get('status')}, 消息={error_obj.get('message')}"
+                                        )
+                            elif isinstance(error_json, dict) and "error" in error_json:
+                                error_obj = error_json.get("error", {})
+                                logger.error(
+                                    f"服务器错误详情: 代码={error_obj.get('code')}, 状态={error_obj.get('status')}, 消息={error_obj.get('message')}"
+                                )
+                            else:
+                                logger.error(f"服务器错误响应: {error_json}")
+                    except Exception as parse_err:
+                        logger.warning(f"无法解析响应错误内容: {str(parse_err)}")
+
+                    await asyncio.sleep(wait_time)
+                else:
+                    logger.critical(f"HTTP响应错误达到最大重试次数: 状态码: {e.status}, 错误: {e.message}")
+                    if image_base64:
+                        payload["messages"][0]["content"][1]["image_url"]["url"] = (
+                            f"data:image/{image_format.lower()};base64,{image_base64[:10]}...{image_base64[-10:]}"
+                        )
+                    logger.critical(f"请求头: {await self._build_headers(no_key=True)} 请求体: {payload}")
+                    raise RuntimeError(f"API请求失败: 状态码 {e.status}, {e.message}")
             except Exception as e:
                 if retry < policy["max_retries"] - 1:
-                    wait_time = policy["base_wait"] * (2 ** retry)
+                    wait_time = policy["base_wait"] * (2**retry)
                     logger.error(f"请求失败,等待{wait_time}秒后重试... 错误: {str(e)}")
                     await asyncio.sleep(wait_time)
                 else:
                     logger.critical(f"请求失败: {str(e)}")
                     if image_base64:
-                        payload["messages"][0]["content"][1]["image_url"][
-                            "url"] = f"data:image/{image_format.lower()};base64,{image_base64[:10]}...{image_base64[-10:]}"
+                        payload["messages"][0]["content"][1]["image_url"]["url"] = (
+                            f"data:image/{image_format.lower()};base64,{image_base64[:10]}...{image_base64[-10:]}"
+                        )
                     logger.critical(f"请求头: {await self._build_headers(no_key=True)} 请求体: {payload}")
                     raise RuntimeError(f"API请求失败: {str(e)}")
 
@@ -289,8 +370,15 @@ class LLM_request:
         # 复制一份参数,避免直接修改原始数据
         new_params = dict(params)
         # 定义需要转换的模型列表
-        models_needing_transformation = ["o3-mini", "o1-mini", "o1-preview", "o1-2024-12-17", "o1-preview-2024-09-12",
-                                         "o3-mini-2025-01-31", "o1-mini-2024-09-12"]
+        models_needing_transformation = [
+            "o3-mini",
+            "o1-mini",
+            "o1-preview",
+            "o1-2024-12-17",
+            "o1-preview-2024-09-12",
+            "o3-mini-2025-01-31",
+            "o1-mini-2024-09-12",
+        ]
         if self.model_name.lower() in models_needing_transformation:
             # 删除 'temperature' 参数(如果存在)
             new_params.pop("temperature", None)
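# ----------------------------------------------------------------------
# Editor's sketch, not part of the patch: what _transform_parameters does
# for the o-series models listed above, which take `max_completion_tokens`
# in place of `max_tokens` and do not accept `temperature`. The helper name
# is illustrative.
O_SERIES = {
    "o3-mini", "o1-mini", "o1-preview", "o1-2024-12-17",
    "o1-preview-2024-09-12", "o3-mini-2025-01-31", "o1-mini-2024-09-12",
}

def transform_parameters(model_name: str, params: dict) -> dict:
    new_params = dict(params)  # copy so the caller's dict is untouched
    if model_name.lower() in O_SERIES:
        new_params.pop("temperature", None)
        if "max_tokens" in new_params:
            new_params["max_completion_tokens"] = new_params.pop("max_tokens")
    return new_params

# transform_parameters("o3-mini", {"temperature": 0.7, "max_tokens": 256})
# -> {"max_completion_tokens": 256}
# ----------------------------------------------------------------------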
"o1-mini-2024-09-12"] and "max_tokens" in payload: + if ( + self.model_name.lower() + in [ + "o3-mini", + "o1-mini", + "o1-preview", + "o1-2024-12-17", + "o1-preview-2024-09-12", + "o3-mini-2025-01-31", + "o1-mini-2024-09-12", + ] + and "max_tokens" in payload + ): payload["max_completion_tokens"] = payload.pop("max_tokens") return payload - def _default_response_handler(self, result: dict, user_id: str = "system", - request_type: str = "chat", endpoint: str = "/chat/completions") -> Tuple: + def _default_response_handler( + self, result: dict, user_id: str = "system", request_type: str = "chat", endpoint: str = "/chat/completions" + ) -> Tuple: """默认响应解析""" if "choices" in result and result["choices"]: message = result["choices"][0]["message"] @@ -357,7 +459,7 @@ class LLM_request: total_tokens=total_tokens, user_id=user_id, request_type=request_type, - endpoint=endpoint + endpoint=endpoint, ) return content, reasoning_content @@ -366,8 +468,8 @@ class LLM_request: def _extract_reasoning(self, content: str) -> tuple[str, str]: """CoT思维链提取""" - match = re.search(r'(?:)?(.*?)', content, re.DOTALL) - content = re.sub(r'(?:)?.*?', '', content, flags=re.DOTALL, count=1).strip() + match = re.search(r"(?:)?(.*?)", content, re.DOTALL) + content = re.sub(r"(?:)?.*?", "", content, flags=re.DOTALL, count=1).strip() if match: reasoning = match.group(1).strip() else: @@ -377,34 +479,22 @@ class LLM_request: async def _build_headers(self, no_key: bool = False) -> dict: """构建请求头""" if no_key: - return { - "Authorization": "Bearer **********", - "Content-Type": "application/json" - } + return {"Authorization": "Bearer **********", "Content-Type": "application/json"} else: - return { - "Authorization": f"Bearer {self.api_key}", - "Content-Type": "application/json" - } + return {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"} # 防止小朋友们截图自己的key async def generate_response(self, prompt: str) -> Tuple[str, str]: """根据输入的提示生成模型的异步响应""" - content, reasoning_content = await self._execute_request( - endpoint="/chat/completions", - prompt=prompt - ) + content, reasoning_content = await self._execute_request(endpoint="/chat/completions", prompt=prompt) return content, reasoning_content async def generate_response_for_image(self, prompt: str, image_base64: str, image_format: str) -> Tuple[str, str]: """根据输入的提示和图片生成模型的异步响应""" content, reasoning_content = await self._execute_request( - endpoint="/chat/completions", - prompt=prompt, - image_base64=image_base64, - image_format=image_format + endpoint="/chat/completions", prompt=prompt, image_base64=image_base64, image_format=image_format ) return content, reasoning_content @@ -415,22 +505,20 @@ class LLM_request: "model": self.model_name, "messages": [{"role": "user", "content": prompt}], "max_tokens": global_config.max_response_length, - **self.params + **self.params, } content, reasoning_content = await self._execute_request( - endpoint="/chat/completions", - payload=data, - prompt=prompt + endpoint="/chat/completions", payload=data, prompt=prompt ) return content, reasoning_content async def get_embedding(self, text: str) -> Union[list, None]: """异步方法:获取文本的embedding向量 - + Args: text: 需要获取embedding的文本 - + Returns: list: embedding向量,如果失败则返回None """ @@ -444,16 +532,9 @@ class LLM_request: embedding = await self._execute_request( endpoint="/embeddings", prompt=text, - payload={ - "model": self.model_name, - "input": text, - "encoding_format": "float" - }, - retry_policy={ - "max_retries": 2, - "base_wait": 6 - }, - 
@@ -444,16 +532,9 @@ class LLM_request:
         embedding = await self._execute_request(
             endpoint="/embeddings",
             prompt=text,
-            payload={
-                "model": self.model_name,
-                "input": text,
-                "encoding_format": "float"
-            },
-            retry_policy={
-                "max_retries": 2,
-                "base_wait": 6
-            },
-            response_handler=embedding_handler
+            payload={"model": self.model_name, "input": text, "encoding_format": "float"},
+            retry_policy={"max_retries": 2, "base_wait": 6},
+            response_handler=embedding_handler,
         )
 
         return embedding
@@ -502,32 +583,33 @@ def compress_base64_image_by_scale(base64_data: str, target_size: int = 0.8 * 10
             # 保存到缓冲区
             frames[0].save(
                 output_buffer,
-                format='GIF',
+                format="GIF",
                 save_all=True,
                 append_images=frames[1:],
                 optimize=True,
-                duration=img.info.get('duration', 100),
-                loop=img.info.get('loop', 0)
+                duration=img.info.get("duration", 100),
+                loop=img.info.get("loop", 0),
             )
         else:
             # 处理静态图片
             resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
 
             # 保存到缓冲区,保持原始格式
-            if img.format == 'PNG' and img.mode in ('RGBA', 'LA'):
-                resized_img.save(output_buffer, format='PNG', optimize=True)
+            if img.format == "PNG" and img.mode in ("RGBA", "LA"):
+                resized_img.save(output_buffer, format="PNG", optimize=True)
             else:
-                resized_img.save(output_buffer, format='JPEG', quality=95, optimize=True)
+                resized_img.save(output_buffer, format="JPEG", quality=95, optimize=True)
 
         # 获取压缩后的数据并转换为base64
         compressed_data = output_buffer.getvalue()
 
         logger.success(f"压缩图片: {original_width}x{original_height} -> {new_width}x{new_height}")
         logger.info(f"压缩前大小: {len(image_data) / 1024:.1f}KB, 压缩后大小: {len(compressed_data) / 1024:.1f}KB")
 
-        return base64.b64encode(compressed_data).decode('utf-8')
+        return base64.b64encode(compressed_data).decode("utf-8")
 
     except Exception as e:
         logger.error(f"压缩图片失败: {str(e)}")
         import traceback
+
         logger.error(traceback.format_exc())
         return base64_data
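# ----------------------------------------------------------------------
# Editor's sketch, not part of the patch: the core of
# compress_base64_image_by_scale is decode base64, downscale uniformly, and
# re-encode, keeping alpha-capable PNGs as PNG and writing everything else
# as JPEG. Assumes Pillow; the fixed 0.5 scale is an arbitrary example, and
# the sketch converts to RGB before the JPEG save (JPEG has no alpha),
# which the patch itself does not do.
import base64
import io
from PIL import Image

def compress_base64_image(base64_data: str, scale: float = 0.5) -> str:
    img = Image.open(io.BytesIO(base64.b64decode(base64_data)))
    new_size = (max(1, int(img.width * scale)), max(1, int(img.height * scale)))
    resized = img.resize(new_size, Image.Resampling.LANCZOS)
    buf = io.BytesIO()
    if img.format == "PNG" and img.mode in ("RGBA", "LA"):
        resized.save(buf, format="PNG", optimize=True)
    else:
        resized.convert("RGB").save(buf, format="JPEG", quality=95, optimize=True)
    return base64.b64encode(buf.getvalue()).decode("utf-8")
# ----------------------------------------------------------------------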