From 54565a405c61c1359a50db2a3eb8c1d673b4d6f3 Mon Sep 17 00:00:00 2001
From: minecraft1024a
Date: Sat, 16 Aug 2025 13:39:49 +0800
Subject: [PATCH] Add concurrent request support; update the LLMRequest class
 to allow async requests fanned out according to the configured concurrency
 count
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/config/api_ada_configs.py       |  3 ++
 src/llm_models/utils_model.py       | 76 +++++++++++++++++++++--------
 template/model_config_template.toml |  1 +
 3 files changed, 59 insertions(+), 21 deletions(-)

diff --git a/src/config/api_ada_configs.py b/src/config/api_ada_configs.py
index 45abf4c92..c38497618 100644
--- a/src/config/api_ada_configs.py
+++ b/src/config/api_ada_configs.py
@@ -94,6 +94,9 @@ class TaskConfig(ConfigBase):
     temperature: float = 0.3
     """Model temperature"""
 
+    concurrency_count: int = 1
+    """Number of concurrent requests; defaults to 1 (no concurrency)"""
+
 
 @dataclass
 class ModelTaskConfig(ConfigBase):
diff --git a/src/llm_models/utils_model.py b/src/llm_models/utils_model.py
index 3c6ca8ff3..9cffe810e 100644
--- a/src/llm_models/utils_model.py
+++ b/src/llm_models/utils_model.py
@@ -1,6 +1,7 @@
 import re
 import asyncio
 import time
+import random
 from enum import Enum
 
 from rich.traceback import install
@@ -181,32 +182,72 @@ class LLMRequest:
         raise_when_empty: bool = True,
     ) -> Tuple[str, Tuple[str, str, Optional[List[ToolCall]]]]:
         """
-        Asynchronously generate a response
+        Asynchronously generate a response, with optional concurrent requests
 
         Args:
             prompt (str): Prompt text
             temperature (float, optional): Temperature parameter
             max_tokens (int, optional): Maximum number of tokens
+            tools: Tool configuration
+            raise_when_empty: Whether to raise an exception on an empty reply
        Returns:
             (Tuple[str, str, str, Optional[List[ToolCall]]]): Response content, reasoning content, model name, tool call list
         """
-        # Build the request body
         start_time = time.time()
-        # Model selection
-        model_info, api_provider, client = self._select_model()
 
-        # 🔥 Content obfuscation
+        # Check whether concurrent requests are needed
+        concurrency_count = getattr(self.model_for_task, 'concurrency_count', 1)
+
+        if concurrency_count <= 1:
+            # Single request, original logic
+            return await self._execute_single_request(prompt, temperature, max_tokens, tools, raise_when_empty)
+
+        # Concurrent requests
+        logger.info(f"Concurrent request mode enabled, concurrency: {concurrency_count}")
+        tasks = [
+            self._execute_single_request(prompt, temperature, max_tokens, tools, False)
+            for _ in range(concurrency_count)
+        ]
+
+        try:
+            results = await asyncio.gather(*tasks, return_exceptions=True)
+            successful_results = []
+            for result in results:
+                if not isinstance(result, Exception):
+                    successful_results.append(result)
+
+            if successful_results:
+                # Randomly pick one of the successful results
+                selected = random.choice(successful_results) if len(successful_results) > 1 else successful_results[0]
+                logger.info(f"Concurrent requests finished, picked one of {len(successful_results)} successful results")
+                return selected
+            elif raise_when_empty:
+                raise RuntimeError(f"All {concurrency_count} concurrent requests failed")
+            else:
+                return "All concurrent requests failed", ("", "unknown", None)
+
+        except Exception as e:
+            if raise_when_empty:
+                raise e
+            return "Concurrent request error", ("", "unknown", None)
+
+    async def _execute_single_request(
+        self,
+        prompt: str,
+        temperature: Optional[float] = None,
+        max_tokens: Optional[int] = None,
+        tools: Optional[List[Dict[str, Any]]] = None,
+        raise_when_empty: bool = True,
+    ) -> Tuple[str, Tuple[str, str, Optional[List[ToolCall]]]]:
+        """Execute a single request"""
+        # Model selection and request preparation
+        model_info, api_provider, client = self._select_model()
         processed_prompt = self._apply_content_obfuscation(prompt, api_provider)
 
         message_builder = MessageBuilder()
         message_builder.add_text_content(processed_prompt)
         messages = [message_builder.build()]
-
         tool_built = self._build_tool_options(tools)
-        # Send the request and process the return value
-        logger.debug(f"LLM selection took: {model_info.name} {time.time() - start_time}")
 
-        # Empty-reply retry logic
         empty_retry_count = 0
         max_empty_retry = api_provider.max_retry
@@ -241,11 +282,9 @@ class LLMRequest:
 
                     empty_retry_count += 1
                     logger.warning(f"Empty reply detected, regeneration attempt {empty_retry_count}/{max_empty_retry}")
 
-                    # Wait a while before retrying
                     if empty_retry_interval > 0:
                         await asyncio.sleep(empty_retry_interval)
 
-                    # Reselect the model (it may pick a different one)
                     model_info, api_provider, client = self._select_model()
                     continue
@@ -259,32 +298,27 @@ class LLMRequest:
                     endpoint="/chat/completions",
                 )
 
-                # If the content is still empty
+                # Handle an empty reply
                 if not content:
                     if raise_when_empty:
-                        logger.warning(f"Still got an empty reply after {empty_retry_count} retries")
                         raise RuntimeError(f"Still got an empty reply after {empty_retry_count} retries")
                     content = "The generated response is empty; please check the model configuration or the input"
-                else:
-                    # Successfully generated a non-empty reply
-                    if empty_retry_count > 0:
-                        logger.info(f"Successfully generated a reply after {empty_retry_count} retries")
+                elif empty_retry_count > 0:
+                    logger.info(f"Successfully generated a reply after {empty_retry_count} retries")
 
                 return content, (reasoning_content, model_info.name, tool_calls)
 
             except Exception as e:
-                # For other exceptions such as network errors, do not run the empty-reply retry
-                if empty_retry_count == 0:  # Only raise on the first failure
+                if empty_retry_count == 0:
                     raise e
                 else:
-                    # If an error occurs while already retrying, log it and continue
                     logger.error(f"Error during retry: {e}")
                     empty_retry_count += 1
                     if empty_retry_count <= max_empty_retry and empty_retry_interval > 0:
                         await asyncio.sleep(empty_retry_interval)
                     continue
 
-        # If all retries failed
+        # Retries failed
         if raise_when_empty:
             raise RuntimeError(f"Failed to generate a valid reply after {max_empty_retry} retries")
         return "The generated response is empty; please check the model configuration or the input", ("", model_info.name, None)
diff --git a/template/model_config_template.toml b/template/model_config_template.toml
index 69b275920..1c4fd5781 100644
--- a/template/model_config_template.toml
+++ b/template/model_config_template.toml
@@ -118,6 +118,7 @@ price_out = 0
 model_list = ["siliconflow-deepseek-v3"] # List of models to use; each entry corresponds to a model name (name) defined above
 temperature = 0.2 # Model temperature; 0.1-0.3 recommended for the new V3
 max_tokens = 800 # Maximum number of output tokens
+#concurrency_count = 2 # Number of concurrent requests; defaults to 1 (no concurrency), set to 2 or higher to enable concurrency
 
 [model_task_config.utils_small] # Small model used by some of 麦麦's components; it is called heavily, so a fast small model is recommended
 model_list = ["qwen3-8b"]
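
The heart of the change is the fan-out in the new concurrency branch: the same request is issued concurrency_count times, asyncio.gather(..., return_exceptions=True) collects the results without letting one failure cancel the batch, failed tasks are filtered out, and one of the successes is returned at random. A minimal standalone sketch of that pattern follows; fetch_once and fan_out are illustrative names only, not part of the patch (the real code awaits LLMRequest._execute_single_request and returns its content/metadata tuple).

import asyncio
import random

# Hypothetical stand-in for LLMRequest._execute_single_request: it either
# returns a response string or raises, mimicking a flaky upstream API.
async def fetch_once(request_id: int) -> str:
    await asyncio.sleep(0.1)    # simulate network latency
    if random.random() < 0.3:   # ~30% of the sketch's requests fail
        raise RuntimeError(f"request {request_id} failed")
    return f"response from request {request_id}"

async def fan_out(concurrency_count: int) -> str:
    # Launch the same request concurrency_count times; return_exceptions=True
    # turns per-task failures into returned values instead of cancelling the batch.
    results = await asyncio.gather(
        *(fetch_once(i) for i in range(concurrency_count)),
        return_exceptions=True,
    )
    successful = [r for r in results if not isinstance(r, BaseException)]
    if not successful:
        raise RuntimeError(f"all {concurrency_count} concurrent requests failed")
    # Pick one of the successful results at random, as the patch does.
    return random.choice(successful)

if __name__ == "__main__":
    print(asyncio.run(fan_out(3)))

Each call then costs up to concurrency_count times the tokens of a single request, which is why the template leaves concurrency_count commented out and the TaskConfig default stays at 1 (no concurrency).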