fix(llm): 增强并发执行和异常处理的健壮性
- 为 `execute_concurrently` 添加协程函数检查,防止传入非异步函数导致运行时错误。 - 统一将日志中的异常对象显式转换为字符串,以获得更清晰的错误输出。 - 移除了针对特定模型供应商的 403 错误降级处理逻辑,该逻辑已过时。
This commit is contained in:
@@ -1,4 +1,5 @@
|
|||||||
import re
|
import re
|
||||||
|
import inspect
|
||||||
import asyncio
|
import asyncio
|
||||||
import time
|
import time
|
||||||
import random
|
import random
|
||||||
@@ -95,9 +96,13 @@ async def execute_concurrently(
|
|||||||
Raises:
|
Raises:
|
||||||
RuntimeError: 如果所有并发请求都失败。
|
RuntimeError: 如果所有并发请求都失败。
|
||||||
"""
|
"""
|
||||||
|
if not inspect.iscoroutinefunction(coro_callable):
|
||||||
|
err_msg = f"并发执行的函数 '{coro_callable.__name__}' 必须是协程函数 (async def)"
|
||||||
|
logger.error(err_msg)
|
||||||
|
raise TypeError(err_msg)
|
||||||
|
|
||||||
logger.info(f"启用并发请求模式,并发数: {concurrency_count}")
|
logger.info(f"启用并发请求模式,并发数: {concurrency_count}")
|
||||||
tasks = [coro_callable(*args, **kwargs) for _ in range(concurrency_count)]
|
tasks = [coro_callable(*args, **kwargs) for _ in range(concurrency_count)]
|
||||||
|
|
||||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
successful_results = [res for res in results if not isinstance(res, Exception)]
|
successful_results = [res for res in results if not isinstance(res, Exception)]
|
||||||
|
|
||||||
@@ -663,7 +668,7 @@ class LLMRequest:
|
|||||||
cannot_retry_msg=f"任务-'{task_name}' 模型-'{model_name}': 连接异常,超过最大重试次数,请检查网络连接状态或URL是否正确",
|
cannot_retry_msg=f"任务-'{task_name}' 模型-'{model_name}': 连接异常,超过最大重试次数,请检查网络连接状态或URL是否正确",
|
||||||
)
|
)
|
||||||
elif isinstance(e, ReqAbortException):
|
elif isinstance(e, ReqAbortException):
|
||||||
logger.warning(f"任务-'{task_name}' 模型-'{model_name}': 请求被中断,详细信息-{e}")
|
logger.warning(f"任务-'{task_name}' 模型-'{model_name}': 请求被中断,详细信息-{str(e)}")
|
||||||
return -1, None # 不再重试请求该模型
|
return -1, None # 不再重试请求该模型
|
||||||
elif isinstance(e, RespNotOkException):
|
elif isinstance(e, RespNotOkException):
|
||||||
return self._handle_resp_not_ok(
|
return self._handle_resp_not_ok(
|
||||||
@@ -677,7 +682,7 @@ class LLMRequest:
|
|||||||
)
|
)
|
||||||
elif isinstance(e, RespParseException):
|
elif isinstance(e, RespParseException):
|
||||||
# 响应解析错误
|
# 响应解析错误
|
||||||
logger.error(f"任务-'{task_name}' 模型-'{model_name}': 响应解析错误,错误信息-{e}")
|
logger.error(f"任务-'{task_name}' 模型-'{model_name}': 响应解析错误,错误信息-{str(e)}")
|
||||||
logger.debug(f"附加内容: {str(e.ext_info)}")
|
logger.debug(f"附加内容: {str(e.ext_info)}")
|
||||||
return -1, None # 不再重试请求该模型
|
return -1, None # 不再重试请求该模型
|
||||||
else:
|
else:
|
||||||
@@ -744,27 +749,10 @@ class LLMRequest:
|
|||||||
# 响应错误
|
# 响应错误
|
||||||
if e.status_code in [400, 401, 402, 403, 404]:
|
if e.status_code in [400, 401, 402, 403, 404]:
|
||||||
model_name = model_info.name
|
model_name = model_info.name
|
||||||
if (
|
|
||||||
e.status_code == 403
|
|
||||||
and model_name.startswith("Pro/deepseek-ai")
|
|
||||||
and api_provider.base_url == "https://api.siliconflow.cn/v1/"
|
|
||||||
):
|
|
||||||
old_model_name = model_name
|
|
||||||
new_model_name = model_name[4:]
|
|
||||||
model_info.name = new_model_name
|
|
||||||
logger.warning(f"检测到403错误,模型从 {old_model_name} 降级为 {new_model_name}")
|
|
||||||
# 更新任务配置中的模型列表
|
|
||||||
for i, m_name in enumerate(self.model_for_task.model_list):
|
|
||||||
if m_name == old_model_name:
|
|
||||||
self.model_for_task.model_list[i] = new_model_name
|
|
||||||
logger.warning(
|
|
||||||
f"将任务 {self.task_name} 的模型列表中的 {old_model_name} 临时降级至 {new_model_name}"
|
|
||||||
)
|
|
||||||
break
|
|
||||||
return 0, None # 立即重试
|
return 0, None # 立即重试
|
||||||
# 客户端错误
|
# 客户端错误
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"任务-'{task_name}' 模型-'{model_name}': 请求失败,错误代码-{e.status_code},错误信息-{e.message}"
|
f"任务-'{task_name}' 模型-'{model_name}': 请求失败,错误代码-{e.status_code},错误信息-{str(e)}"
|
||||||
)
|
)
|
||||||
return -1, None # 不再重试请求该模型
|
return -1, None # 不再重试请求该模型
|
||||||
elif e.status_code == 413:
|
elif e.status_code == 413:
|
||||||
@@ -800,7 +788,7 @@ class LLMRequest:
|
|||||||
else:
|
else:
|
||||||
# 未知错误
|
# 未知错误
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"任务-'{task_name}' 模型-'{model_name}': 未知错误,错误代码-{e.status_code},错误信息-{e.message}"
|
f"任务-'{task_name}' 模型-'{model_name}': 未知错误,错误代码-{e.status_code},错误信息-{str(e)}"
|
||||||
)
|
)
|
||||||
return -1, None
|
return -1, None
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user