Support streaming output
@@ -1,5 +1,6 @@
 import aiohttp
 import asyncio
+import json
 import requests
 import time
 import re
@@ -138,7 +139,12 @@ class LLM_request:
         }
 
         api_url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
-        logger.info(f"发送请求到URL: {api_url}")
+        # Check whether this is a streaming request
+        stream_mode = self.params.get("stream", False)
+        if self.params.get("stream", False) is True:
+            logger.info(f"进入流式输出模式,发送请求到URL: {api_url}")
+        else:
+            logger.info(f"发送请求到URL: {api_url}")
         logger.info(f"使用模型: {self.model_name}")
 
         # Build the request body
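The hunk above keys all streaming behavior off a `stream` flag read from `self.params`. For context, a minimal sketch of what an OpenAI-compatible `/chat/completions` request body looks like with streaming enabled; the exact payload this class builds is outside this diff, so the field values below are illustrative only:

payload = {
    "model": "qwq-plus",  # assumed model name, for illustration
    "messages": [{"role": "user", "content": "hello"}],
    "stream": True,       # drives the stream_mode branch added above
}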
@@ -151,6 +157,9 @@ class LLM_request:
         try:
             # Handle the session with a context manager
             headers = await self._build_headers()
+            # Seems to be required for OpenAI streaming; adding it has no effect on Aliyun's qwq-plus
+            if stream_mode:
+                headers["Accept"] = "text/event-stream"
 
             async with aiohttp.ClientSession() as session:
                 async with session.post(api_url, headers=headers, json=payload) as response:
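With `Accept: text/event-stream` set and `stream` enabled in the payload, an OpenAI-compatible endpoint replies with Server-Sent Events instead of a single JSON body. A sketch of the wire format (field values illustrative) that the loop added in the next hunk consumes line by line:

data: {"choices":[{"delta":{"content":"Hel"},"index":0}]}

data: {"choices":[{"delta":{"content":"lo"},"index":0}]}

data: [DONE]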
@@ -175,10 +184,39 @@ class LLM_request:
                         raise RuntimeError(f"请求被拒绝: {error_code_mapping.get(response.status)}")
 
                     response.raise_for_status()
-                    result = await response.json()
 
-                    # Use the custom handler or the default handling
-                    return response_handler(result) if response_handler else self._default_response_handler(result, user_id, request_type, endpoint)
+                    if stream_mode:
+                        accumulated_content = ""
+                        async for line_bytes in response.content:
+                            line = line_bytes.decode("utf-8").strip()
+                            if not line:
+                                continue
+                            if line.startswith("data:"):
+                                data_str = line[5:].strip()
+                                if data_str == "[DONE]":
+                                    break
+                                try:
+                                    chunk = json.loads(data_str)
+                                    delta = chunk["choices"][0]["delta"]
+                                    delta_content = delta.get("content")
+                                    if delta_content is None:
+                                        delta_content = ""
+                                    accumulated_content += delta_content
+                                except Exception as e:
+                                    logger.error(f"解析流式输出错误: {e}")
+                        content = accumulated_content
+                        reasoning_content = ""
+                        think_match = re.search(r'<think>(.*?)</think>', content, re.DOTALL)
+                        if think_match:
+                            reasoning_content = think_match.group(1).strip()
+                            content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL).strip()
+                        # Build a pseudo result so the custom or default response handler can be reused
+                        result = {"choices": [{"message": {"content": content, "reasoning_content": reasoning_content}}]}
+                        return response_handler(result) if response_handler else self._default_response_handler(result, user_id, request_type, endpoint)
+                    else:
+                        result = await response.json()
+                        # Use the custom handler or the default handling
+                        return response_handler(result) if response_handler else self._default_response_handler(result, user_id, request_type, endpoint)
 
         except Exception as e:
             if retry < policy["max_retries"] - 1:
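To make the new control flow easier to follow outside the aiohttp loop, the same accumulation and `<think>` extraction logic is shown below as a standalone, synchronous helper. This is a sketch only; the function name and signature are not part of the commit, which does this work inline on `response.content`:

import json
import re

def collect_stream(lines):
    # Join streamed delta content, then split out <think>...</think> reasoning.
    accumulated = ""
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue
        data_str = line[5:].strip()
        if data_str == "[DONE]":
            break
        try:
            delta = json.loads(data_str)["choices"][0]["delta"]
            accumulated += delta.get("content") or ""
        except (json.JSONDecodeError, KeyError, IndexError):
            continue
    think_match = re.search(r"<think>(.*?)</think>", accumulated, re.DOTALL)
    reasoning = think_match.group(1).strip() if think_match else ""
    content = re.sub(r"<think>.*?</think>", "", accumulated, flags=re.DOTALL).strip()
    return content, reasoning

content, reasoning = collect_stream([
    'data: {"choices":[{"delta":{"content":"<think>plan</think>Hi"}}]}',
    "data: [DONE]",
])
# content == "Hi", reasoning == "plan"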