From ee44b02f937bbf503e6be38170a3d4055a473110 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Sun, 9 Nov 2025 20:16:47 +0800 Subject: [PATCH] =?UTF-8?q?feat(client):=20=E4=BC=98=E5=8C=96=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E6=B1=A0=E9=85=8D=E7=BD=AE=E4=BB=A5=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E9=AB=98=E5=B9=B6=E5=8F=91embedding=E8=AF=B7=E6=B1=82=20refact?= =?UTF-8?q?or(request):=20=E7=A7=BB=E9=99=A4=E5=85=A8=E5=B1=80=E9=94=81?= =?UTF-8?q?=EF=BC=8C=E6=94=B9=E7=94=A8=E4=BF=A1=E5=8F=B7=E9=87=8F=E6=8E=A7?= =?UTF-8?q?=E5=88=B6=E5=B9=B6=E5=8F=91=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/llm_models/model_client/openai_client.py | 11 ++++ src/llm_models/utils_model.py | 60 +++++++++++--------- 2 files changed, 44 insertions(+), 27 deletions(-) diff --git a/src/llm_models/model_client/openai_client.py b/src/llm_models/model_client/openai_client.py index efa1785d9..2bf07c734 100644 --- a/src/llm_models/model_client/openai_client.py +++ b/src/llm_models/model_client/openai_client.py @@ -433,11 +433,22 @@ class OpenaiClient(BaseClient): f"创建新的 AsyncOpenAI 客户端实例 (base_url={self.api_provider.base_url}, config_hash={self._config_hash}, loop_id={current_loop_id})" ) + # 🔧 优化:增加连接池限制,支持高并发embedding请求 + # 默认httpx限制为100,对于高频embedding场景不够用 + import httpx + + limits = httpx.Limits( + max_keepalive_connections=200, # 保持活跃连接数(原100) + max_connections=300, # 最大总连接数(原100) + keepalive_expiry=30.0, # 连接保活时间 + ) + client = AsyncOpenAI( base_url=self.api_provider.base_url, api_key=self.api_provider.get_api_key(), max_retries=0, timeout=self.api_provider.timeout, + http_client=httpx.AsyncClient(limits=limits), # 🔧 自定义连接池配置 ) # 存入全局缓存(带事件循环ID) diff --git a/src/llm_models/utils_model.py b/src/llm_models/utils_model.py index 663c08a02..4599f1d8b 100644 --- a/src/llm_models/utils_model.py +++ b/src/llm_models/utils_model.py @@ -802,7 +802,11 @@ class LLMRequest: for model in self.model_for_task.model_list } """模型使用量记录""" - self._lock = asyncio.Lock() + # 🔧 优化:移除全局锁,改用信号量控制并发度(允许多个请求并行) + # 默认允许50个并发请求,可通过配置调整 + max_concurrent = getattr(model_set, "max_concurrent_requests", 50) + self._semaphore = asyncio.Semaphore(max_concurrent) + self._stats_lock = asyncio.Lock() # 只保护统计数据的写入 # 初始化辅助类 self._model_selector = _ModelSelector(self.model_for_task.model_list, self.model_usage) @@ -931,23 +935,24 @@ class LLMRequest: tools: list[dict[str, Any]] | None = None, raise_when_empty: bool = True, ) -> tuple[str, tuple[str, str, list[ToolCall] | None]]: - async with self._lock: - """ - 执行单次文本生成请求的内部方法。 - 这是 `generate_response_async` 的核心实现,处理单个请求的完整生命周期, - 包括工具构建、故障转移执行和用量记录。 + """ + 执行单次文本生成请求的内部方法。 + 这是 `generate_response_async` 的核心实现,处理单个请求的完整生命周期, + 包括工具构建、故障转移执行和用量记录。 - Args: - prompt (str): 用户的提示。 - temperature (Optional[float]): 生成温度。 - max_tokens (Optional[int]): 最大生成令牌数。 - tools (Optional[List[Dict[str, Any]]]): 可用工具列表。 - raise_when_empty (bool): 如果响应为空是否引发异常。 + Args: + prompt (str): 用户的提示。 + temperature (Optional[float]): 生成温度。 + max_tokens (Optional[int]): 最大生成令牌数。 + tools (Optional[List[Dict[str, Any]]]): 可用工具列表。 + raise_when_empty (bool): 如果响应为空是否引发异常。 - Returns: - Tuple[str, Tuple[str, str, Optional[List[ToolCall]]]]: - (响应内容, (推理过程, 模型名称, 工具调用)) - """ + Returns: + Tuple[str, Tuple[str, str, Optional[List[ToolCall]]]]: + (响应内容, (推理过程, 模型名称, 工具调用)) + """ + # 🔧 优化:使用信号量控制并发,允许多个请求并行执行 + async with self._semaphore: start_time = time.time() tool_options = await self._build_tool_options(tools) @@ -1006,20 +1011,21 @@ class LLMRequest: endpoint (str): 请求的API端点 (e.g., "/chat/completions")。 """ if usage: - # 步骤1: 更新内存中的统计数据,用于负载均衡 - stats = self.model_usage[model_info.name] + # 步骤1: 更新内存中的统计数据,用于负载均衡(需要加锁保护) + async with self._stats_lock: + stats = self.model_usage[model_info.name] - # 计算新的平均延迟 - new_request_count = stats.request_count + 1 - new_avg_latency = (stats.avg_latency * stats.request_count + time_cost) / new_request_count + # 计算新的平均延迟 + new_request_count = stats.request_count + 1 + new_avg_latency = (stats.avg_latency * stats.request_count + time_cost) / new_request_count - self.model_usage[model_info.name] = stats._replace( - total_tokens=stats.total_tokens + usage.total_tokens, - avg_latency=new_avg_latency, - request_count=new_request_count, - ) + self.model_usage[model_info.name] = stats._replace( + total_tokens=stats.total_tokens + usage.total_tokens, + avg_latency=new_avg_latency, + request_count=new_request_count, + ) - # 步骤2: 创建一个后台任务,将用量数据异步写入数据库 + # 步骤2: 创建一个后台任务,将用量数据异步写入数据库(无需等待) asyncio.create_task( # noqa: RUF006 llm_usage_recorder.record_usage_to_database( model_info=model_info,