diff --git a/src/plugins/models/utils_model.py b/src/plugins/models/utils_model.py
index a471bd72d..7604389bf 100644
--- a/src/plugins/models/utils_model.py
+++ b/src/plugins/models/utils_model.py
@@ -1,5 +1,6 @@
import aiohttp
import asyncio
+import json
import requests
import time
import re
@@ -138,7 +139,12 @@ class LLM_request:
}
api_url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
- logger.info(f"发送请求到URL: {api_url}")
+ #判断是否为流式
+ stream_mode = self.params.get("stream", False)
+ if self.params.get("stream", False) is True:
+ logger.info(f"进入流式输出模式,发送请求到URL: {api_url}")
+ else:
+ logger.info(f"发送请求到URL: {api_url}")
logger.info(f"使用模型: {self.model_name}")
# 构建请求体
@@ -151,6 +157,9 @@ class LLM_request:
try:
# 使用上下文管理器处理会话
headers = await self._build_headers()
+ #似乎是openai流式必须要的东西,不过阿里云的qwq-plus加了这个没有影响
+ if stream_mode:
+ headers["Accept"] = "text/event-stream"
async with aiohttp.ClientSession() as session:
async with session.post(api_url, headers=headers, json=payload) as response:
@@ -173,12 +182,41 @@ class LLM_request:
elif response.status in policy["abort_codes"]:
logger.error(f"错误码: {response.status} - {error_code_mapping.get(response.status)}")
raise RuntimeError(f"请求被拒绝: {error_code_mapping.get(response.status)}")
-
+
response.raise_for_status()
- result = await response.json()
-
- # 使用自定义处理器或默认处理
- return response_handler(result) if response_handler else self._default_response_handler(result, user_id, request_type, endpoint)
+
+ if stream_mode:
+ accumulated_content = ""
+ async for line_bytes in response.content:
+ line = line_bytes.decode("utf-8").strip()
+ if not line:
+ continue
+ if line.startswith("data:"):
+ data_str = line[5:].strip()
+ if data_str == "[DONE]":
+ break
+ try:
+ chunk = json.loads(data_str)
+ delta = chunk["choices"][0]["delta"]
+ delta_content = delta.get("content")
+ if delta_content is None:
+ delta_content = ""
+ accumulated_content += delta_content
+ except Exception as e:
+ logger.error(f"解析流式输出错误: {e}")
+ content = accumulated_content
+ reasoning_content = ""
+ think_match = re.search(r'(.*?)', content, re.DOTALL)
+ if think_match:
+ reasoning_content = think_match.group(1).strip()
+ content = re.sub(r'.*?', '', content, flags=re.DOTALL).strip()
+ # 构造一个伪result以便调用自定义响应处理器或默认处理器
+ result = {"choices": [{"message": {"content": content, "reasoning_content": reasoning_content}}]}
+ return response_handler(result) if response_handler else self._default_response_handler(result, user_id, request_type, endpoint)
+ else:
+ result = await response.json()
+ # 使用自定义处理器或默认处理
+ return response_handler(result) if response_handler else self._default_response_handler(result, user_id, request_type, endpoint)
except Exception as e:
if retry < policy["max_retries"] - 1: