Add concurrent request support: update the LLMRequest class to issue asynchronous requests according to the configured concurrency count
@@ -94,6 +94,9 @@ class TaskConfig(ConfigBase):
    temperature: float = 0.3
    """Model temperature"""

    concurrency_count: int = 1
    """Number of concurrent requests; defaults to 1 (no concurrency)"""


@dataclass
class ModelTaskConfig(ConfigBase):
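The new field defaults to 1, and the request path below reads it defensively with getattr, so config objects written before this commit keep working. A minimal sketch of that pattern, with TaskConfig simplified to a plain dataclass (the real class inherits from ConfigBase):

from dataclasses import dataclass

@dataclass
class TaskConfig:
    temperature: float = 0.3
    concurrency_count: int = 1  # 1 means no concurrency

task = TaskConfig(temperature=0.2)
# Defensive read: falls back to 1 even if the attribute were missing entirely
print(getattr(task, "concurrency_count", 1))  # -> 1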
@@ -1,6 +1,7 @@
import re
import asyncio
import time
import random

from enum import Enum
from rich.traceback import install
@@ -181,32 +182,72 @@ class LLMRequest:
        raise_when_empty: bool = True,
    ) -> Tuple[str, Tuple[str, str, Optional[List[ToolCall]]]]:
        """
        Generate a response asynchronously, with support for concurrent requests.

        Args:
            prompt (str): The prompt
            temperature (float, optional): Sampling temperature
            max_tokens (int, optional): Maximum number of output tokens
            tools: Tool configuration
            raise_when_empty: Whether to raise an exception on an empty reply

        Returns:
            (Tuple[str, Tuple[str, str, Optional[List[ToolCall]]]]): response content, plus (reasoning content, model name, tool-call list)
        """
        # Build the request
        start_time = time.time()

        # Model selection
        model_info, api_provider, client = self._select_model()

        # Check whether concurrent requests were requested
        concurrency_count = getattr(self.model_for_task, 'concurrency_count', 1)

        if concurrency_count <= 1:
            # Single request: the original code path
            return await self._execute_single_request(prompt, temperature, max_tokens, tools, raise_when_empty)

        # Concurrent requests
        logger.info(f"Concurrent request mode enabled, concurrency: {concurrency_count}")
        tasks = [
            self._execute_single_request(prompt, temperature, max_tokens, tools, False)
            for _ in range(concurrency_count)
        ]

        try:
            results = await asyncio.gather(*tasks, return_exceptions=True)
            successful_results = []
            for result in results:
                if not isinstance(result, Exception):
                    successful_results.append(result)

            if successful_results:
                # Pick one of the successful results at random
                selected = random.choice(successful_results) if len(successful_results) > 1 else successful_results[0]
                logger.info(f"Concurrent requests finished; picked one of {len(successful_results)} successful results")
                return selected
            elif raise_when_empty:
                raise RuntimeError(f"All {concurrency_count} concurrent requests failed")
            else:
                return "All concurrent requests failed", ("", "unknown", None)

        except Exception as e:
            if raise_when_empty:
                raise e
            return "Concurrent request error", ("", "unknown", None)
    async def _execute_single_request(
        self,
        prompt: str,
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
        tools: Optional[List[Dict[str, Any]]] = None,
        raise_when_empty: bool = True,
    ) -> Tuple[str, Tuple[str, str, Optional[List[ToolCall]]]]:
        """Execute a single request."""
        # Model selection and request preparation
        start_time = time.time()  # timing baseline for the selection-latency log below
        model_info, api_provider, client = self._select_model()
        processed_prompt = self._apply_content_obfuscation(prompt, api_provider)

        message_builder = MessageBuilder()
        message_builder.add_text_content(processed_prompt)
        messages = [message_builder.build()]

        tool_built = self._build_tool_options(tools)

        # Issue the request and process the return value
        logger.debug(f"Model selection took: {model_info.name} {time.time() - start_time}")

        # Empty-reply retry logic
        empty_retry_count = 0
        max_empty_retry = api_provider.max_retry
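The fan-out in generate_response is plain asyncio: launch N copies of the same coroutine, gather them with return_exceptions=True so a single failure cannot take down the batch, then pick one winner. A self-contained sketch of the pattern, with simulate_request standing in for _execute_single_request:

import asyncio
import random

async def simulate_request(i: int) -> str:
    # Stand-in for a real LLM call; fails some of the time
    await asyncio.sleep(random.uniform(0.01, 0.05))
    if random.random() < 0.3:
        raise RuntimeError(f"request {i} failed")
    return f"result {i}"

async def fan_out(concurrency_count: int) -> str:
    tasks = [simulate_request(i) for i in range(concurrency_count)]
    # return_exceptions=True turns failures into returned values
    # instead of cancelling the whole gather
    results = await asyncio.gather(*tasks, return_exceptions=True)
    successful = [r for r in results if not isinstance(r, Exception)]
    if not successful:
        raise RuntimeError(f"all {concurrency_count} concurrent requests failed")
    return random.choice(successful)

print(asyncio.run(fan_out(3)))

Picking a result at random trades extra latency and token cost for variety; if the goal were latency alone, asyncio.wait with return_when=FIRST_COMPLETED would return the fastest reply instead.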
@@ -241,11 +282,9 @@ class LLMRequest:
                    empty_retry_count += 1
                    logger.warning(f"Empty reply detected; regenerating, attempt {empty_retry_count}/{max_empty_retry}")

                    # Wait a while before retrying
                    if empty_retry_interval > 0:
                        await asyncio.sleep(empty_retry_interval)

                    # Re-select the model (a different model may be chosen)
                    model_info, api_provider, client = self._select_model()
                    continue
@@ -259,32 +298,27 @@ class LLMRequest:
                    endpoint="/chat/completions",
                )

                # Handle an empty reply
                if not content:
                    if raise_when_empty:
                        logger.warning(f"Still produced an empty reply after {empty_retry_count} retries")
                        raise RuntimeError(f"Still produced an empty reply after {empty_retry_count} retries")
                    content = "The generated response was empty; please check the model configuration and input"
                elif empty_retry_count > 0:
                    # A non-empty reply was produced after retrying
                    logger.info(f"Successfully generated a reply after {empty_retry_count} retries")

                return content, (reasoning_content, model_info.name, tool_calls)

            except Exception as e:
                # Network errors and other exceptions do not trigger the empty-reply retry
                if empty_retry_count == 0:
                    raise e
                else:
                    # An error occurred mid-retry: log it and keep going
                    logger.error(f"Error during retry: {e}")
                    empty_retry_count += 1
                    if empty_retry_count <= max_empty_retry and empty_retry_interval > 0:
                        await asyncio.sleep(empty_retry_interval)
                    continue

        # All retries failed
        if raise_when_empty:
            raise RuntimeError(f"Still unable to generate a valid reply after {max_empty_retry} retries")
        return "The generated response was empty; please check the model configuration and input", ("", model_info.name, None)
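The empty-reply handling above is a bounded retry loop: count empty replies, optionally sleep between attempts, re-select the model, and raise immediately only when an exception occurs on the very first attempt. A stripped-down sketch of that control flow, with call_model standing in for the real client call and the retry limit and interval as assumed values:

import asyncio
import random

async def call_model() -> str:
    # Stand-in for the real completion call; sometimes comes back empty
    await asyncio.sleep(0.01)
    return "" if random.random() < 0.5 else "a non-empty reply"

async def generate_with_empty_retry(max_empty_retry: int = 3,
                                    empty_retry_interval: float = 0.1) -> str:
    empty_retry_count = 0
    while empty_retry_count <= max_empty_retry:
        content = await call_model()
        if content:
            if empty_retry_count > 0:
                print(f"succeeded after {empty_retry_count} retries")
            return content
        empty_retry_count += 1
        if empty_retry_interval > 0:
            await asyncio.sleep(empty_retry_interval)
    raise RuntimeError(f"still no valid reply after {max_empty_retry} retries")

print(asyncio.run(generate_with_empty_retry()))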
@@ -118,6 +118,7 @@ price_out = 0
model_list = ["siliconflow-deepseek-v3"] # List of models to use; each entry corresponds to a model name (name) defined above
temperature = 0.2 # Model temperature; 0.1-0.3 is recommended for the new V3
max_tokens = 800 # Maximum number of output tokens
#concurrency_count = 2 # Number of concurrent requests; defaults to 1 (no concurrency); set to 2 or higher to enable concurrency

[model_task_config.utils_small] # Small model used by several of 麦麦's components; usage is heavy, so a fast small model is recommended
model_list = ["qwen3-8b"]
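To turn concurrency on, uncomment concurrency_count in the relevant task section. A quick way to check what a config file resolves to, using the standard-library tomllib (Python 3.11+; the file name here is illustrative):

import tomllib

with open("model_config.toml", "rb") as f:
    cfg = tomllib.load(f)

task = cfg["model_task_config"]["utils_small"]
# A missing key resolves to 1, mirroring the getattr fallback in LLMRequest
print(task.get("concurrency_count", 1))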