From 8497441507ed8b1a9bac9c580b058086c698e5fd Mon Sep 17 00:00:00 2001 From: sky2002 Date: Fri, 7 Mar 2025 14:17:03 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E6=94=AF=E6=8C=81=E6=B5=81=E5=BC=8F?= =?UTF-8?q?=E8=BE=93=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/models/utils_model.py | 50 +++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/src/plugins/models/utils_model.py b/src/plugins/models/utils_model.py index a471bd72d..7604389bf 100644 --- a/src/plugins/models/utils_model.py +++ b/src/plugins/models/utils_model.py @@ -1,5 +1,6 @@ import aiohttp import asyncio +import json import requests import time import re @@ -138,7 +139,12 @@ class LLM_request: } api_url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}" - logger.info(f"发送请求到URL: {api_url}") + #判断是否为流式 + stream_mode = self.params.get("stream", False) + if self.params.get("stream", False) is True: + logger.info(f"进入流式输出模式,发送请求到URL: {api_url}") + else: + logger.info(f"发送请求到URL: {api_url}") logger.info(f"使用模型: {self.model_name}") # 构建请求体 @@ -151,6 +157,9 @@ class LLM_request: try: # 使用上下文管理器处理会话 headers = await self._build_headers() + #似乎是openai流式必须要的东西,不过阿里云的qwq-plus加了这个没有影响 + if stream_mode: + headers["Accept"] = "text/event-stream" async with aiohttp.ClientSession() as session: async with session.post(api_url, headers=headers, json=payload) as response: @@ -173,12 +182,41 @@ class LLM_request: elif response.status in policy["abort_codes"]: logger.error(f"错误码: {response.status} - {error_code_mapping.get(response.status)}") raise RuntimeError(f"请求被拒绝: {error_code_mapping.get(response.status)}") - + response.raise_for_status() - result = await response.json() - - # 使用自定义处理器或默认处理 - return response_handler(result) if response_handler else self._default_response_handler(result, user_id, request_type, endpoint) + + if stream_mode: + accumulated_content = "" + async for line_bytes in 
response.content: + line = line_bytes.decode("utf-8").strip() + if not line: + continue + if line.startswith("data:"): + data_str = line[5:].strip() + if data_str == "[DONE]": + break + try: + chunk = json.loads(data_str) + delta = chunk["choices"][0]["delta"] + delta_content = delta.get("content") + if delta_content is None: + delta_content = "" + accumulated_content += delta_content + except Exception as e: + logger.error(f"解析流式输出错误: {e}") + content = accumulated_content + reasoning_content = "" + think_match = re.search(r'<think>(.*?)</think>', content, re.DOTALL) + if think_match: + reasoning_content = think_match.group(1).strip() + content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL).strip() + # 构造一个伪result以便调用自定义响应处理器或默认处理器 + result = {"choices": [{"message": {"content": content, "reasoning_content": reasoning_content}}]} + return response_handler(result) if response_handler else self._default_response_handler(result, user_id, request_type, endpoint) + else: + result = await response.json() + # 使用自定义处理器或默认处理 + return response_handler(result) if response_handler else self._default_response_handler(result, user_id, request_type, endpoint) except Exception as e: if retry < policy["max_retries"] - 1: