diff --git a/changelogs/changelog.md b/changelogs/changelog.md
index a510b51e0..9369fbdc2 100644
--- a/changelogs/changelog.md
+++ b/changelogs/changelog.md
@@ -1,5 +1,17 @@
# Changelog
+## [0.10.0] - 2025-7-1
+### 主要功能更改
+- 工具系统重构,现在合并到了插件系统中
+- 彻底重构了整个LLM Request了,现在支持模型轮询和更多灵活的参数
+ - 同时重构了整个模型配置系统,升级需要重新配置llm配置文件
+- 随着LLM Request的重构,插件系统彻底重构完成。插件系统进入稳定状态,仅增加新的API
+ - 具体相比于之前的更改可以查看[changes.md](./changes.md)
+
+### 细节优化
+- 修复了lint爆炸的问题,代码更加规范了
+- 修改了log的颜色,更加护眼
+
## [0.9.1] - 2025-7-26
### 主要修复和优化
diff --git a/changes.md b/changelogs/changes.md
similarity index 90%
rename from changes.md
rename to changelogs/changes.md
index b776991de..db41703c4 100644
--- a/changes.md
+++ b/changelogs/changes.md
@@ -25,6 +25,7 @@
- 这意味着你终于可以动态控制是否继续后续消息的处理了。
8. 移除了dependency_manager,但是依然保留了`python_dependencies`属性,等待后续重构。
- 一并移除了文档有关manager的内容。
+9. 增加了工具的有关api
# 插件系统修改
1. 现在所有的匹配模式不再是关键字了,而是枚举类。**(可能有遗漏)**
@@ -57,30 +58,12 @@
15. 实现了组件的局部禁用,也就是针对某一个聊天禁用的功能。
- 通过`disable_specific_chat_action`,`enable_specific_chat_action`,`disable_specific_chat_command`,`enable_specific_chat_command`,`disable_specific_chat_event_handler`,`enable_specific_chat_event_handler`来操作
- 同样不保存到配置文件~
+16. 把`BaseTool`一并合并进入了插件系统
# 官方插件修改
1. `HelloWorld`插件现在有一个样例的`EventHandler`。
-2. 内置插件增加了一个通过`Command`来管理插件的功能。具体是使用`/pm`命令唤起。
-
-### TODO
-把这个看起来就很别扭的config获取方式改一下
-
-
-# 吐槽
-```python
-plugin_path = Path(plugin_file)
-if plugin_path.parent.name != "plugins":
- # 插件包格式:parent_dir.plugin
- module_name = f"plugins.{plugin_path.parent.name}.plugin"
-else:
- # 单文件格式:plugins.filename
- module_name = f"plugins.{plugin_path.stem}"
-```
-```python
-plugin_path = Path(plugin_file)
-module_name = ".".join(plugin_path.parent.parts)
-```
-这两个区别很大的。
+2. 内置插件增加了一个通过`Command`来管理插件的功能。具体是使用`/pm`命令唤起。(需要自行启用)
+3. `HelloWorld`插件现在有一个样例的`CompareNumbersTool`。
### 执笔BGM
塞壬唱片!
\ No newline at end of file
diff --git a/docs/plugins/tool-system.md b/docs/plugins/tool-system.md
index eab560734..55e2cb713 100644
--- a/docs/plugins/tool-system.md
+++ b/docs/plugins/tool-system.md
@@ -33,21 +33,26 @@ class MyTool(BaseTool):
# 工具描述,告诉LLM这个工具的用途
description = "这个工具用于获取特定类型的信息"
- # 参数定义,遵循JSONSchema格式
- parameters = {
- "type": "object",
- "properties": {
- "query": {
- "type": "string",
- "description": "查询参数"
- },
- "limit": {
- "type": "integer",
- "description": "结果数量限制"
- }
- },
- "required": ["query"]
- }
+ # 参数定义,仅定义参数
+ # 比如想要定义一个类似下面的openai格式的参数表,则可以这么定义:
+ # {
+ # "type": "object",
+ # "properties": {
+ # "query": {
+ # "type": "string",
+ # "description": "查询参数"
+ # },
+ # "limit": {
+ # "type": "integer",
+ # "description": "结果数量限制"
+ # }
+ # },
+ # "required": ["query"]
+ # }
+ parameters = [
+ ("query", "string", "查询参数", True), # 必填参数
+ ("limit", "integer", "结果数量限制", False) # 可选参数
+ ]
available_for_llm = True # 是否对LLM可用
@@ -68,7 +73,7 @@ class MyTool(BaseTool):
|-----|------|------|
| `name` | str | 工具的唯一标识名称 |
| `description` | str | 工具功能描述,帮助LLM理解用途 |
-| `parameters` | dict | JSONSchema格式的参数定义 |
+| `parameters` | list[tuple] | 参数定义 |
### 方法说明
@@ -92,23 +97,13 @@ class WeatherTool(BaseTool):
name = "weather_query"
description = "查询指定城市的实时天气信息,包括温度、湿度、天气状况等"
+ available_for_llm = True # 允许LLM调用此工具
+ parameters = [
+ ("city", "string", "要查询天气的城市名称,如:北京、上海、纽约", True),
+ ("country", "string", "国家代码,如:CN、US,可选参数", False)
+ ]
- parameters = {
- "type": "object",
- "properties": {
- "city": {
- "type": "string",
- "description": "要查询天气的城市名称,如:北京、上海、纽约"
- },
- "country": {
- "type": "string",
- "description": "国家代码,如:CN、US,可选参数"
- }
- },
- "required": ["city"]
- }
-
- async def execute(self, function_args, message_txt=""):
+ async def execute(self, function_args: dict):
"""执行天气查询"""
try:
city = function_args.get("city")
@@ -185,66 +180,49 @@ class WeatherTool(BaseTool):
## 🎯 最佳实践
### 1. 工具命名规范
-
+#### ✅ 好的命名
```python
-# ✅ 好的命名
name = "weather_query" # 清晰表达功能
name = "knowledge_search" # 描述性强
name = "stock_price_check" # 功能明确
-
-# ❌ 避免的命名
+```
+#### ❌ 避免的命名
+```python
name = "tool1" # 无意义
name = "wq" # 过于简短
name = "weather_and_news" # 功能过于复杂
```
### 2. 描述规范
-
+#### ✅ 良好的描述
```python
-# ✅ 好的描述
description = "查询指定城市的实时天气信息,包括温度、湿度、天气状况"
-
-# ❌ 避免的描述
+```
+#### ❌ 避免的描述
+```python
description = "天气" # 过于简单
description = "获取信息" # 不够具体
```
### 3. 参数设计
+#### ✅ 合理的参数设计
```python
-# ✅ 合理的参数设计
-parameters = {
- "type": "object",
- "properties": {
- "city": {
- "type": "string",
- "description": "城市名称,如:北京、上海"
- },
- "unit": {
- "type": "string",
- "description": "温度单位:celsius(摄氏度) 或 fahrenheit(华氏度)",
- "enum": ["celsius", "fahrenheit"]
- }
- },
- "required": ["city"]
-}
-
-# ❌ 避免的参数设计
-parameters = {
- "type": "object",
- "properties": {
- "data": {
- "type": "string",
- "description": "数据" # 描述不清晰
- }
- }
-}
+parameters = [
+ ("city", "string", "城市名称,如:北京、上海", True),
+ ("unit", "string", "温度单位:celsius 或 fahrenheit", False)
+]
+```
+#### ❌ 避免的参数设计
+```python
+parameters = [
+ ("data", "string", "数据", True) # 参数过于模糊
+]
```
### 4. 结果格式化
-
+#### ✅ 良好的结果格式
```python
-# ✅ 良好的结果格式
def _format_result(self, data):
return f"""
🔍 查询结果
@@ -254,12 +232,9 @@ def _format_result(self, data):
📝 说明: {data['description']}
━━━━━━━━━━━━
""".strip()
-
-# ❌ 避免的结果格式
+```
+#### ❌ 避免的结果格式
+```python
def _format_result(self, data):
return str(data) # 直接返回原始数据
```
-
----
-
-🎉 **工具系统为麦麦提供了强大的信息获取能力!合理使用工具可以让麦麦变得更加智能和博学。**
\ No newline at end of file
diff --git a/src/chat/memory_system/Hippocampus.py b/src/chat/memory_system/Hippocampus.py
index af1723047..fe3c25625 100644
--- a/src/chat/memory_system/Hippocampus.py
+++ b/src/chat/memory_system/Hippocampus.py
@@ -10,7 +10,7 @@ import networkx as nx
import numpy as np
from itertools import combinations
-from typing import List, Tuple, Coroutine, Any, Dict, Set
+from typing import List, Tuple, Coroutine, Any, Set
from collections import Counter
from rich.traceback import install
@@ -1267,7 +1267,7 @@ class ParahippocampalGyrus:
logger.debug(f"过滤后话题: {filtered_topics}")
# 4. 创建所有话题的摘要生成任务
- tasks: List[Tuple[str, Coroutine[Any, Any, Tuple[str, Tuple[str, str, List[Dict[str, Any]] | None]]]]] = []
+ tasks: List[Tuple[str, Coroutine[Any, Any, Tuple[str, Tuple[str, str, List | None]]]]] = []
for topic in filtered_topics:
# 调用修改后的 topic_what,不再需要 time_info
topic_what_prompt = self.hippocampus.topic_what(input_text, topic)
diff --git a/src/llm_models/model_client/__init__bak.py b/src/llm_models/model_client/__init__bak.py
deleted file mode 100644
index 7e57c82d6..000000000
--- a/src/llm_models/model_client/__init__bak.py
+++ /dev/null
@@ -1,380 +0,0 @@
-import asyncio
-from typing import Callable, Any
-
-from openai import AsyncStream
-from openai.types.chat import ChatCompletionChunk, ChatCompletion
-
-from .base_client import BaseClient, APIResponse
-from src.config.api_ada_configs import (
- ModelInfo,
- ModelUsageArgConfigItem,
- RequestConfig,
- ModuleConfig,
-)
-from ..exceptions import (
- NetworkConnectionError,
- ReqAbortException,
- RespNotOkException,
- RespParseException,
-)
-from ..payload_content.message import Message
-from ..payload_content.resp_format import RespFormat
-from ..payload_content.tool_option import ToolOption
-from ..utils import compress_messages
-from src.common.logger import get_logger
-
-logger = get_logger("模型客户端")
-
-
-def _check_retry(
- remain_try: int,
- retry_interval: int,
- can_retry_msg: str,
- cannot_retry_msg: str,
- can_retry_callable: Callable | None = None,
- **kwargs,
-) -> tuple[int, Any | None]:
- """
- 辅助函数:检查是否可以重试
- :param remain_try: 剩余尝试次数
- :param retry_interval: 重试间隔
- :param can_retry_msg: 可以重试时的提示信息
- :param cannot_retry_msg: 不可以重试时的提示信息
- :return: (等待间隔(如果为0则不等待,为-1则不再请求该模型), 新的消息列表(适用于压缩消息))
- """
- if remain_try > 0:
- # 还有重试机会
- logger.warning(f"{can_retry_msg}")
- if can_retry_callable is not None:
- return retry_interval, can_retry_callable(**kwargs)
- else:
- return retry_interval, None
- else:
- # 达到最大重试次数
- logger.warning(f"{cannot_retry_msg}")
- return -1, None # 不再重试请求该模型
-
-
-def _handle_resp_not_ok(
- e: RespNotOkException,
- task_name: str,
- model_name: str,
- remain_try: int,
- retry_interval: int = 10,
- messages: tuple[list[Message], bool] | None = None,
-):
- """
- 处理响应错误异常
- :param e: 异常对象
- :param task_name: 任务名称
- :param model_name: 模型名称
- :param remain_try: 剩余尝试次数
- :param retry_interval: 重试间隔
- :param messages: (消息列表, 是否已压缩过)
- :return: (等待间隔(如果为0则不等待,为-1则不再请求该模型), 新的消息列表(适用于压缩消息))
- """
- # 响应错误
- if e.status_code in [401, 403]:
- # API Key认证错误 - 让多API Key机制处理,给一次重试机会
- if remain_try > 0:
- logger.warning(
- f"任务-'{task_name}' 模型-'{model_name}'\n"
- f"API Key认证失败(错误代码-{e.status_code}),多API Key机制会自动切换"
- )
- return 0, None # 立即重试,让底层客户端切换API Key
- else:
- logger.warning(
- f"任务-'{task_name}' 模型-'{model_name}'\n"
- f"所有API Key都认证失败,错误代码-{e.status_code},错误信息-{e.message}"
- )
- return -1, None # 不再重试请求该模型
- elif e.status_code in [400, 402, 404]:
- # 其他客户端错误(不应该重试)
- logger.warning(
- f"任务-'{task_name}' 模型-'{model_name}'\n"
- f"请求失败,错误代码-{e.status_code},错误信息-{e.message}"
- )
- return -1, None # 不再重试请求该模型
- elif e.status_code == 413:
- if messages and not messages[1]:
- # 消息列表不为空且未压缩,尝试压缩消息
- return _check_retry(
- remain_try,
- 0,
- can_retry_msg=(
- f"任务-'{task_name}' 模型-'{model_name}'\n"
- "请求体过大,尝试压缩消息后重试"
- ),
- cannot_retry_msg=(
- f"任务-'{task_name}' 模型-'{model_name}'\n"
- "请求体过大,压缩消息后仍然过大,放弃请求"
- ),
- can_retry_callable=compress_messages,
- messages=messages[0],
- )
- # 没有消息可压缩
- logger.warning(
- f"任务-'{task_name}' 模型-'{model_name}'\n"
- "请求体过大,无法压缩消息,放弃请求。"
- )
- return -1, None
- elif e.status_code == 429:
- # 请求过于频繁 - 让多API Key机制处理,适当延迟后重试
- return _check_retry(
- remain_try,
- min(retry_interval, 5), # 限制最大延迟为5秒,让API Key切换更快生效
- can_retry_msg=(
- f"任务-'{task_name}' 模型-'{model_name}'\n"
- f"请求过于频繁,多API Key机制会自动切换,{min(retry_interval, 5)}秒后重试"
- ),
- cannot_retry_msg=(
- f"任务-'{task_name}' 模型-'{model_name}'\n"
- "请求过于频繁,所有API Key都被限制,放弃请求"
- ),
- )
- elif e.status_code >= 500:
- # 服务器错误
- return _check_retry(
- remain_try,
- retry_interval,
- can_retry_msg=(
- f"任务-'{task_name}' 模型-'{model_name}'\n"
- f"服务器错误,将于{retry_interval}秒后重试"
- ),
- cannot_retry_msg=(
- f"任务-'{task_name}' 模型-'{model_name}'\n"
- "服务器错误,超过最大重试次数,请稍后再试"
- ),
- )
- else:
- # 未知错误
- logger.warning(
- f"任务-'{task_name}' 模型-'{model_name}'\n"
- f"未知错误,错误代码-{e.status_code},错误信息-{e.message}"
- )
- return -1, None
-
-
-def default_exception_handler(
- e: Exception,
- task_name: str,
- model_name: str,
- remain_try: int,
- retry_interval: int = 10,
- messages: tuple[list[Message], bool] | None = None,
-) -> tuple[int, list[Message] | None]:
- """
- 默认异常处理函数
- :param e: 异常对象
- :param task_name: 任务名称
- :param model_name: 模型名称
- :param remain_try: 剩余尝试次数
- :param retry_interval: 重试间隔
- :param messages: (消息列表, 是否已压缩过)
- :return (等待间隔(如果为0则不等待,为-1则不再请求该模型), 新的消息列表(适用于压缩消息))
- """
-
- if isinstance(e, NetworkConnectionError): # 网络连接错误
- # 网络错误可能是某个API Key的端点问题,给多API Key机制一次快速重试机会
- return _check_retry(
- remain_try,
- min(retry_interval, 3), # 网络错误时减少等待时间,让API Key切换更快
- can_retry_msg=(
- f"任务-'{task_name}' 模型-'{model_name}'\n"
- f"连接异常,多API Key机制会尝试其他Key,{min(retry_interval, 3)}秒后重试"
- ),
- cannot_retry_msg=(
- f"任务-'{task_name}' 模型-'{model_name}'\n"
- f"连接异常,超过最大重试次数,请检查网络连接状态或URL是否正确"
- ),
- )
- elif isinstance(e, ReqAbortException):
- logger.warning(
- f"任务-'{task_name}' 模型-'{model_name}'\n请求被中断,详细信息-{str(e.message)}"
- )
- return -1, None # 不再重试请求该模型
- elif isinstance(e, RespNotOkException):
- return _handle_resp_not_ok(
- e,
- task_name,
- model_name,
- remain_try,
- retry_interval,
- messages,
- )
- elif isinstance(e, RespParseException):
- # 响应解析错误
- logger.error(
- f"任务-'{task_name}' 模型-'{model_name}'\n"
- f"响应解析错误,错误信息-{e.message}\n"
- )
- logger.debug(f"附加内容:\n{str(e.ext_info)}")
- return -1, None # 不再重试请求该模型
- else:
- logger.error(
- f"任务-'{task_name}' 模型-'{model_name}'\n未知异常,错误信息-{str(e)}"
- )
- return -1, None # 不再重试请求该模型
-
-
-class ModelRequestHandler:
- """
- 模型请求处理器
- """
-
- def __init__(
- self,
- task_name: str,
- config: ModuleConfig,
- api_client_map: dict[str, BaseClient],
- ):
- self.task_name: str = task_name
- """任务名称"""
-
- self.client_map: dict[str, BaseClient] = {}
- """API客户端列表"""
-
- self.configs: list[tuple[ModelInfo, ModelUsageArgConfigItem]] = []
- """模型参数配置"""
-
- self.req_conf: RequestConfig = config.req_conf
- """请求配置"""
-
- # 获取模型与使用配置
- for model_usage in config.task_model_arg_map[task_name].usage:
- if model_usage.name not in config.models:
- logger.error(f"Model '{model_usage.name}' not found in ModelManager")
- raise KeyError(f"Model '{model_usage.name}' not found in ModelManager")
- model_info = config.models[model_usage.name]
-
- if model_info.api_provider not in self.client_map:
- # 缓存API客户端
- self.client_map[model_info.api_provider] = api_client_map[
- model_info.api_provider
- ]
-
- self.configs.append((model_info, model_usage)) # 添加模型与使用配置
-
- async def get_response(
- self,
- messages: list[Message],
- tool_options: list[ToolOption] | None = None,
- response_format: RespFormat | None = None, # 暂不启用
- stream_response_handler: Callable[
- [AsyncStream[ChatCompletionChunk], asyncio.Event | None], APIResponse
- ]
- | None = None,
- async_response_parser: Callable[[ChatCompletion], APIResponse] | None = None,
- interrupt_flag: asyncio.Event | None = None,
- ) -> APIResponse:
- """
- 获取对话响应
- :param messages: 消息列表
- :param tool_options: 工具选项列表
- :param response_format: 响应格式
- :param stream_response_handler: 流式响应处理函数(可选)
- :param async_response_parser: 响应解析函数(可选)
- :param interrupt_flag: 中断信号量(可选,默认为None)
- :return: APIResponse
- """
- # 遍历可用模型,若获取响应失败,则使用下一个模型继续请求
- for config_item in self.configs:
- client = self.client_map[config_item[0].api_provider]
- model_info: ModelInfo = config_item[0]
- model_usage_config: ModelUsageArgConfigItem = config_item[1]
-
- remain_try = (
- model_usage_config.max_retry or self.req_conf.max_retry
- ) + 1 # 初始化:剩余尝试次数 = 最大重试次数 + 1
-
- compressed_messages = None
- retry_interval = self.req_conf.retry_interval
- while remain_try > 0:
- try:
- return await client.get_response(
- model_info,
- message_list=(compressed_messages or messages),
- tool_options=tool_options,
- max_tokens=model_usage_config.max_tokens
- or self.req_conf.default_max_tokens,
- temperature=model_usage_config.temperature
- or self.req_conf.default_temperature,
- response_format=response_format,
- stream_response_handler=stream_response_handler,
- async_response_parser=async_response_parser,
- interrupt_flag=interrupt_flag,
- )
- except Exception as e:
- logger.debug(e)
- remain_try -= 1 # 剩余尝试次数减1
-
- # 处理异常
- handle_res = default_exception_handler(
- e,
- self.task_name,
- model_info.name,
- remain_try,
- retry_interval=self.req_conf.retry_interval,
- messages=(messages, compressed_messages is not None),
- )
-
- if handle_res[0] == -1:
- # 等待间隔为-1,表示不再请求该模型
- remain_try = 0
- elif handle_res[0] != 0:
- # 等待间隔不为0,表示需要等待
- await asyncio.sleep(handle_res[0])
- retry_interval *= 2
-
- if handle_res[1] is not None:
- # 压缩消息
- compressed_messages = handle_res[1]
-
- logger.error(f"任务-'{self.task_name}' 请求执行失败,所有模型均不可用")
- raise RuntimeError("请求失败,所有模型均不可用") # 所有请求尝试均失败
-
- async def get_embedding(
- self,
- embedding_input: str,
- ) -> APIResponse:
- """
- 获取嵌入向量
- :param embedding_input: 嵌入输入
- :return: APIResponse
- """
- for config in self.configs:
- client = self.client_map[config[0].api_provider]
- model_info: ModelInfo = config[0]
- model_usage_config: ModelUsageArgConfigItem = config[1]
- remain_try = (
- model_usage_config.max_retry or self.req_conf.max_retry
- ) + 1 # 初始化:剩余尝试次数 = 最大重试次数 + 1
-
- while remain_try:
- try:
- return await client.get_embedding(
- model_info=model_info,
- embedding_input=embedding_input,
- )
- except Exception as e:
- logger.debug(e)
- remain_try -= 1 # 剩余尝试次数减1
-
- # 处理异常
- handle_res = default_exception_handler(
- e,
- self.task_name,
- model_info.name,
- remain_try,
- retry_interval=self.req_conf.retry_interval,
- )
-
- if handle_res[0] == -1:
- # 等待间隔为-1,表示不再请求该模型
- remain_try = 0
- elif handle_res[0] != 0:
- # 等待间隔不为0,表示需要等待
- await asyncio.sleep(handle_res[0])
-
- logger.error(f"任务-'{self.task_name}' 请求执行失败,所有模型均不可用")
- raise RuntimeError("请求失败,所有模型均不可用") # 所有请求尝试均失败
diff --git a/src/llm_models/utils_model_bak.py b/src/llm_models/utils_model_bak.py
deleted file mode 100644
index fd78d559b..000000000
--- a/src/llm_models/utils_model_bak.py
+++ /dev/null
@@ -1,778 +0,0 @@
-import re
-from datetime import datetime
-from typing import Tuple, Union
-from src.common.logger import get_logger
-import base64
-from PIL import Image
-import io
-from src.common.database.database import db # 确保 db 被导入用于 create_tables
-from src.common.database.database_model import LLMUsage # 导入 LLMUsage 模型
-from src.config.config import global_config
-from rich.traceback import install
-
-from .exceptions import NetworkConnectionError, ReqAbortException, RespNotOkException, RespParseException, PayLoadTooLargeError, RequestAbortException, PermissionDeniedException
-install(extra_lines=3)
-
-logger = get_logger("model_utils")
-
-# 导入具体的异常类型用于精确的异常处理
-from .exceptions import NetworkConnectionError, ReqAbortException, RespNotOkException, RespParseException
-SPECIFIC_EXCEPTIONS_AVAILABLE = True
-
-# 新架构导入 - 使用延迟导入以支持fallback模式
-
-from .model_manager_bak import ModelManager
-from .model_client import ModelRequestHandler
-from .payload_content.message import MessageBuilder
-
-# 不在模块级别初始化ModelManager,延迟到实际使用时
-ModelManager_class = ModelManager
-model_manager = None # 延迟初始化
-
-# 添加请求处理器缓存,避免重复创建
-_request_handler_cache = {} # 格式: {(model_name, task_name): ModelRequestHandler}
-
-NEW_ARCHITECTURE_AVAILABLE = True
-logger.info("新架构模块导入成功")
-
-
-
-
-
-# 常见Error Code Mapping
-error_code_mapping = {
- 400: "参数不正确",
- 401: "API key 错误,认证失败,请检查 config/model_config.toml 中的配置是否正确",
- 402: "账号余额不足",
- 403: "需要实名,或余额不足",
- 404: "Not Found",
- 429: "请求过于频繁,请稍后再试",
- 500: "服务器内部故障",
- 503: "服务器负载过高",
-}
-
-
-
-
-class LLMRequest:
- """
- 重构后的LLM请求类,基于新的model_manager和model_client架构
- 保持向后兼容的API接口
- """
-
- # 定义需要转换的模型列表,作为类变量避免重复
- MODELS_NEEDING_TRANSFORMATION = [
- "o1",
- "o1-2024-12-17",
- "o1-mini",
- "o1-mini-2024-09-12",
- "o1-preview",
- "o1-preview-2024-09-12",
- "o1-pro",
- "o1-pro-2025-03-19",
- "o3",
- "o3-2025-04-16",
- "o3-mini",
- "o3-mini-2025-01-31",
- "o4-mini",
- "o4-mini-2025-04-16",
- ]
-
- def __init__(self, model: dict, **kwargs):
- """
- 初始化LLM请求实例
- Args:
- model: 模型配置字典,兼容旧格式和新格式
- **kwargs: 额外参数
- """
- logger.debug(f"🔍 [模型初始化] 开始初始化模型: {model.get('model_name', model.get('name', 'Unknown'))}")
- logger.debug(f"🔍 [模型初始化] 输入的模型配置: {model}")
- logger.debug(f"🔍 [模型初始化] 额外参数: {kwargs}")
-
- # 兼容新旧模型配置格式
- # 新格式使用 model_name,旧格式使用 name
- self.model_name: str = model.get("model_name", model.get("name", ""))
-
- # 如果传入的配置不完整,自动从全局配置中获取完整配置
- if not all(key in model for key in ["task_type", "capabilities"]):
- logger.debug("🔍 [模型初始化] 检测到不完整的模型配置,尝试获取完整配置")
- if (full_model_config := self._get_full_model_config(self.model_name)):
- logger.debug("🔍 [模型初始化] 成功获取完整模型配置,合并配置信息")
- # 合并配置:运行时参数优先,但添加缺失的配置字段
- model = {**full_model_config, **model}
- logger.debug(f"🔍 [模型初始化] 合并后的模型配置: {model}")
- else:
- logger.warning(f"⚠️ [模型初始化] 无法获取模型 {self.model_name} 的完整配置,使用原始配置")
-
- # 在新架构中,provider信息从model_config.toml自动获取,不需要在这里设置
- self.provider = model.get("provider", "") # 保留兼容性,但在新架构中不使用
-
- # 从全局配置中获取任务配置
- self.request_type = kwargs.pop("request_type", "default")
-
- # 确定使用哪个任务配置
- task_name = self._determine_task_name(model)
-
- # 初始化 request_handler
- self.request_handler = None
-
- # 尝试初始化新架构
- if NEW_ARCHITECTURE_AVAILABLE and ModelManager_class is not None:
- try:
- # 延迟初始化ModelManager
- global model_manager, _request_handler_cache
- if model_manager is None:
- from src.config.config import model_config
- model_manager = ModelManager_class(model_config)
- logger.debug("🔍 [模型初始化] ModelManager延迟初始化成功")
-
- # 构建缓存键
- cache_key = (self.model_name, task_name)
-
- # 检查是否已有缓存的请求处理器
- if cache_key in _request_handler_cache:
- self.request_handler = _request_handler_cache[cache_key]
- logger.debug(f"🚀 [性能优化] 从LLMRequest缓存获取请求处理器: {cache_key}")
- else:
- # 使用新架构获取模型请求处理器
- self.request_handler = model_manager[task_name]
- _request_handler_cache[cache_key] = self.request_handler
- logger.debug(f"🔧 [性能优化] 创建并缓存LLMRequest请求处理器: {cache_key}")
-
- logger.debug(f"🔍 [模型初始化] 成功获取模型请求处理器,任务: {task_name}")
- self.use_new_architecture = True
- except Exception as e:
- logger.warning(f"无法使用新架构,任务 {task_name} 初始化失败: {e}")
- logger.warning("回退到兼容模式,某些功能可能受限")
- self.request_handler = None
- self.use_new_architecture = False
- else:
- logger.warning("新架构不可用,使用兼容模式")
- logger.warning("回退到兼容模式,某些功能可能受限")
- self.request_handler = None
- self.use_new_architecture = False
-
- # 保存原始参数用于向后兼容
- self.params = kwargs
-
- # 兼容性属性,从模型配置中提取
- # 新格式和旧格式都支持
- self.enable_thinking = model.get("enable_thinking", False)
- self.temp = model.get("temperature", model.get("temp", 0.7)) # 新格式用temperature,旧格式用temp
- self.thinking_budget = model.get("thinking_budget", 4096)
- self.stream = model.get("stream", False)
- self.pri_in = model.get("pri_in", 0)
- self.pri_out = model.get("pri_out", 0)
- self.max_tokens = model.get("max_tokens", global_config.model.model_max_output_length)
-
- # 记录配置文件中声明了哪些参数(不管值是什么)
- self.has_enable_thinking = "enable_thinking" in model
- self.has_thinking_budget = "thinking_budget" in model
- self.pri_out = model.get("pri_out", 0)
- self.max_tokens = model.get("max_tokens", global_config.model.model_max_output_length)
-
- # 记录配置文件中声明了哪些参数(不管值是什么)
- self.has_enable_thinking = "enable_thinking" in model
- self.has_thinking_budget = "thinking_budget" in model
-
- logger.debug("🔍 [模型初始化] 模型参数设置完成:")
- logger.debug(f" - model_name: {self.model_name}")
- logger.debug(f" - provider: {self.provider}")
- logger.debug(f" - has_enable_thinking: {self.has_enable_thinking}")
- logger.debug(f" - enable_thinking: {self.enable_thinking}")
- logger.debug(f" - has_thinking_budget: {self.has_thinking_budget}")
- logger.debug(f" - thinking_budget: {self.thinking_budget}")
- logger.debug(f" - temp: {self.temp}")
- logger.debug(f" - stream: {self.stream}")
- logger.debug(f" - max_tokens: {self.max_tokens}")
- logger.debug(f" - use_new_architecture: {self.use_new_architecture}")
-
- # 获取数据库实例
- self._init_database()
-
- logger.debug(f"🔍 [模型初始化] 初始化完成,request_type: {self.request_type}")
-
- def _determine_task_name(self, model: dict) -> str:
- """
- 根据模型配置确定任务名称
- 优先使用配置文件中明确定义的任务类型,避免基于模型名称的脆弱推断
-
- Args:
- model: 模型配置字典
- Returns:
- 任务名称
- """
- # 调试信息:打印模型配置字典的所有键
- logger.debug(f"🔍 [任务确定] 模型配置字典的所有键: {list(model.keys())}")
- logger.debug(f"🔍 [任务确定] 模型配置字典内容: {model}")
-
- # 获取模型名称
- model_name = model.get("model_name", model.get("name", ""))
-
- # 方法1: 优先使用配置文件中明确定义的 task_type 字段
- if "task_type" in model:
- task_type = model["task_type"]
- logger.debug(f"🎯 [任务确定] 使用配置中的 task_type: {task_type}")
- return task_type
-
- # 方法2: 使用 capabilities 字段来推断主要任务类型
- if "capabilities" in model:
- capabilities = model["capabilities"]
- if isinstance(capabilities, list):
- # 按优先级顺序检查能力
- if "vision" in capabilities:
- logger.debug(f"🎯 [任务确定] 从 capabilities {capabilities} 推断为: vision")
- return "vision"
- elif "embedding" in capabilities:
- logger.debug(f"🎯 [任务确定] 从 capabilities {capabilities} 推断为: embedding")
- return "embedding"
- elif "speech" in capabilities:
- logger.debug(f"🎯 [任务确定] 从 capabilities {capabilities} 推断为: speech")
- return "speech"
- elif "text" in capabilities:
- # 如果只有文本能力,则根据request_type细分
- task = "llm_reasoning" if self.request_type == "reasoning" else "llm_normal"
- logger.debug(f"🎯 [任务确定] 从 capabilities {capabilities} 和 request_type {self.request_type} 推断为: {task}")
- return task
-
- # 方法3: 向后兼容 - 基于模型名称的关键字推断(不推荐但保留兼容性)
- logger.warning(f"⚠️ [任务确定] 配置中未找到 task_type 或 capabilities,回退到基于模型名称的推断: {model_name}")
- logger.warning("⚠️ [建议] 请在 model_config.toml 中为模型添加明确的 task_type 或 capabilities 字段")
-
- # 保留原有的关键字匹配逻辑作为fallback
- if any(keyword in model_name.lower() for keyword in ["vlm", "vision", "gpt-4o", "claude", "vl-"]):
- logger.debug(f"🎯 [任务确定] 从模型名称 {model_name} 推断为: vision")
- return "vision"
- elif any(keyword in model_name.lower() for keyword in ["embed", "text-embedding", "bge-"]):
- logger.debug(f"🎯 [任务确定] 从模型名称 {model_name} 推断为: embedding")
- return "embedding"
- elif any(keyword in model_name.lower() for keyword in ["whisper", "speech", "voice"]):
- logger.debug(f"🎯 [任务确定] 从模型名称 {model_name} 推断为: speech")
- return "speech"
- else:
- # 根据request_type确定,映射到配置文件中定义的任务
- task = "llm_reasoning" if self.request_type == "reasoning" else "llm_normal"
- logger.debug(f"🎯 [任务确定] 从 request_type {self.request_type} 推断为: {task}")
- return task
-
- def _get_full_model_config(self, model_name: str) -> dict | None:
- """
- 根据模型名称从全局配置中获取完整的模型配置
- 现在直接使用已解析的ModelInfo对象,不再读取TOML文件
-
- Args:
- model_name: 模型名称
- Returns:
- 完整的模型配置字典,如果找不到则返回None
- """
- try:
- from src.config.config import model_config
- return self._get_model_config_from_parsed(model_name, model_config)
-
- except Exception as e:
- logger.warning(f"⚠️ [配置查找] 获取模型配置时出错: {str(e)}")
- return None
-
- def _get_model_config_from_parsed(self, model_name: str, model_config) -> dict | None:
- """
- 从已解析的配置对象中获取模型配置
- 使用扩展后的ModelInfo类,包含task_type和capabilities字段
- """
- try:
- # 直接通过模型名称查找
- if model_name in model_config.models:
- model_info = model_config.models[model_name]
- logger.debug(f"🔍 [配置查找] 找到模型 {model_name} 的配置对象: {model_info}")
-
- # 将ModelInfo对象转换为字典
- model_dict = {
- "model_identifier": model_info.model_identifier,
- "name": model_info.name,
- "api_provider": model_info.api_provider,
- "price_in": model_info.price_in,
- "price_out": model_info.price_out,
- "force_stream_mode": model_info.force_stream_mode,
- "task_type": model_info.task_type,
- "capabilities": model_info.capabilities,
- }
-
- logger.debug(f"🔍 [配置查找] 转换后的模型配置字典: {model_dict}")
- return model_dict
-
- # 如果直接查找失败,尝试通过model_identifier查找
- for name, model_info in model_config.models.items():
- if (model_info.model_identifier == model_name or
- hasattr(model_info, 'model_name') and model_info.model_name == model_name):
-
- logger.debug(f"🔍 [配置查找] 通过标识符找到模型 {model_name} (配置名称: {name})")
- # 同样转换为字典
- model_dict = {
- "model_identifier": model_info.model_identifier,
- "name": model_info.name,
- "api_provider": model_info.api_provider,
- "price_in": model_info.price_in,
- "price_out": model_info.price_out,
- "force_stream_mode": model_info.force_stream_mode,
- "task_type": model_info.task_type,
- "capabilities": model_info.capabilities,
- }
-
- return model_dict
-
- return None
-
- except Exception as e:
- logger.warning(f"⚠️ [配置查找] 从已解析配置获取模型配置时出错: {str(e)}")
- return None
-
- @staticmethod
- def _init_database():
- """初始化数据库集合"""
- try:
- # 使用 Peewee 创建表,safe=True 表示如果表已存在则不会抛出错误
- db.create_tables([LLMUsage], safe=True)
- # logger.debug("LLMUsage 表已初始化/确保存在。")
- except Exception as e:
- logger.error(f"创建 LLMUsage 表失败: {str(e)}")
-
- def _record_usage(
- self,
- prompt_tokens: int,
- completion_tokens: int,
- total_tokens: int,
- user_id: str = "system",
- request_type: str | None = None,
- endpoint: str = "/chat/completions",
- ):
- """记录模型使用情况到数据库
- Args:
- prompt_tokens: 输入token数
- completion_tokens: 输出token数
- total_tokens: 总token数
- user_id: 用户ID,默认为system
- request_type: 请求类型
- endpoint: API端点
- """
- # 如果 request_type 为 None,则使用实例变量中的值
- if request_type is None:
- request_type = self.request_type
-
- try:
- # 使用 Peewee 模型创建记录
- LLMUsage.create(
- model_name=self.model_name,
- user_id=user_id,
- request_type=request_type,
- endpoint=endpoint,
- prompt_tokens=prompt_tokens,
- completion_tokens=completion_tokens,
- total_tokens=total_tokens,
- cost=self._calculate_cost(prompt_tokens, completion_tokens),
- status="success",
- timestamp=datetime.now(), # Peewee 会处理 DateTimeField
- )
- logger.debug(
- f"Token使用情况 - 模型: {self.model_name}, "
- f"用户: {user_id}, 类型: {request_type}, "
- f"提示词: {prompt_tokens}, 完成: {completion_tokens}, "
- f"总计: {total_tokens}"
- )
- except Exception as e:
- logger.error(f"记录token使用情况失败: {str(e)}")
-
- def _calculate_cost(self, prompt_tokens: int, completion_tokens: int) -> float:
- """计算API调用成本
- 使用模型的pri_in和pri_out价格计算输入和输出的成本
-
- Args:
- prompt_tokens: 输入token数量
- completion_tokens: 输出token数量
-
- Returns:
- float: 总成本(元)
- """
- # 使用模型的pri_in和pri_out计算成本
- input_cost = (prompt_tokens / 1000000) * self.pri_in
- output_cost = (completion_tokens / 1000000) * self.pri_out
- return round(input_cost + output_cost, 6)
-
- @staticmethod
- def _extract_reasoning(content: str) -> Tuple[str, str]:
- """CoT思维链提取"""
- match = re.search(r"(?:)?(.*?)", content, re.DOTALL)
- content = re.sub(r"(?:)?.*?", "", content, flags=re.DOTALL, count=1).strip()
- reasoning = match[1].strip() if match else ""
- return content, reasoning
-
- def _handle_model_exception(self, e: Exception, operation: str) -> None:
- """
- 统一的模型异常处理方法
- 根据异常类型提供更精确的错误信息和处理策略
-
- Args:
- e: 捕获的异常
- operation: 操作类型(用于日志记录)
- """
- operation_desc = {
- "image": "图片响应生成",
- "voice": "语音识别",
- "text": "文本响应生成",
- "embedding": "向量嵌入获取"
- }
-
- op_name = operation_desc.get(operation, operation)
-
- if SPECIFIC_EXCEPTIONS_AVAILABLE:
- # 使用具体异常类型进行精确处理
- if isinstance(e, NetworkConnectionError):
- logger.error(f"模型 {self.model_name} {op_name}失败: 网络连接错误")
- raise RuntimeError("网络连接异常,请检查网络连接状态或API服务器地址是否正确") from e
-
- elif isinstance(e, ReqAbortException):
- logger.error(f"模型 {self.model_name} {op_name}失败: 请求被中断")
- raise RuntimeError("请求被中断或取消,请稍后重试") from e
-
- elif isinstance(e, RespNotOkException):
- logger.error(f"模型 {self.model_name} {op_name}失败: HTTP响应错误 {e.status_code}")
- # 重新抛出原始异常,保留详细的状态码信息
- raise e
-
- elif isinstance(e, RespParseException):
- logger.error(f"模型 {self.model_name} {op_name}失败: 响应解析错误")
- raise RuntimeError("API响应格式异常,请检查模型配置或联系管理员") from e
-
- else:
- # 未知异常,使用通用处理
- logger.error(f"模型 {self.model_name} {op_name}失败: 未知错误 {type(e).__name__}: {str(e)}")
- self._handle_generic_exception(e, op_name)
- else:
- # 如果无法导入具体异常,使用通用处理
- logger.error(f"模型 {self.model_name} {op_name}失败: {str(e)}")
- self._handle_generic_exception(e, op_name)
-
- def _handle_generic_exception(self, e: Exception, operation: str) -> None:
- """
- 通用异常处理(向后兼容的错误字符串匹配)
-
- Args:
- e: 捕获的异常
- operation: 操作描述
- """
- error_str = str(e)
-
- # 基于错误消息内容的分类处理
- if "401" in error_str or "API key" in error_str or "认证" in error_str:
- raise RuntimeError("API key 错误,认证失败,请检查 config/model_config.toml 中的 API key 配置是否正确") from e
- elif "429" in error_str or "频繁" in error_str or "rate limit" in error_str:
- raise RuntimeError("请求过于频繁,请稍后再试") from e
- elif "500" in error_str or "503" in error_str or "服务器" in error_str:
- raise RuntimeError("服务器负载过高,模型回复失败QAQ") from e
- elif "413" in error_str or "payload" in error_str.lower() or "过大" in error_str:
- raise RuntimeError("请求体过大,请尝试压缩图片或减少输入内容") from e
- elif "timeout" in error_str.lower() or "超时" in error_str:
- raise RuntimeError("请求超时,请检查网络连接或稍后重试") from e
- else:
- raise RuntimeError(f"模型 {self.model_name} {operation}失败: {str(e)}") from e
-
- # === 主要API方法 ===
- # 这些方法提供与新架构的桥接
-
- async def generate_response_for_image(self, prompt: str, image_base64: str, image_format: str) -> Tuple:
- """
- 根据输入的提示和图片生成模型的异步响应
- 使用新架构的模型请求处理器
- """
- if not self.use_new_architecture:
- raise RuntimeError(
- f"模型 {self.model_name} 无法使用新架构,请检查 config/model_config.toml 中的 API 配置。"
- )
-
- if self.request_handler is None:
- raise RuntimeError(
- f"模型 {self.model_name} 请求处理器未初始化,无法处理图片请求"
- )
-
- if MessageBuilder is None:
- raise RuntimeError("MessageBuilder不可用,请检查新架构配置")
-
- try:
- # 构建包含图片的消息
- message_builder = MessageBuilder()
- message_builder.add_text_content(prompt).add_image_content(
- image_format=image_format,
- image_base64=image_base64
- )
- messages = [message_builder.build()]
-
- # 使用新架构发送请求(只传递支持的参数)
- response = await self.request_handler.get_response( # type: ignore
- messages=messages,
- tool_options=None,
- response_format=None
- )
-
- # 新架构返回的是 APIResponse 对象,直接提取内容
- content = response.content or ""
- reasoning_content = response.reasoning_content or ""
- tool_calls = response.tool_calls
-
- # 从内容中提取标签的推理内容(向后兼容)
- if not reasoning_content and content:
- content, extracted_reasoning = self._extract_reasoning(content)
- reasoning_content = extracted_reasoning
-
- # 记录token使用情况
- if response.usage:
- self._record_usage(
- prompt_tokens=response.usage.prompt_tokens or 0,
- completion_tokens=response.usage.completion_tokens or 0,
- total_tokens=response.usage.total_tokens or 0,
- user_id="system",
- request_type=self.request_type,
- endpoint="/chat/completions"
- )
-
- # 返回格式兼容旧版本
- if tool_calls:
- return content, reasoning_content, tool_calls
- else:
- return content, reasoning_content
-
- except Exception as e:
- self._handle_model_exception(e, "image")
- # 这行代码永远不会执行,因为_handle_model_exception总是抛出异常
- # 但是为了满足类型检查的要求,我们添加一个不可达的返回语句
- return "", "" # pragma: no cover
-
- async def generate_response_for_voice(self, voice_bytes: bytes) -> Tuple:
- """
- 根据输入的语音文件生成模型的异步响应
- 使用新架构的模型请求处理器
- """
- if not self.use_new_architecture:
- raise RuntimeError(
- f"模型 {self.model_name} 无法使用新架构,请检查 config/model_config.toml 中的 API 配置。"
- )
-
- if self.request_handler is None:
- raise RuntimeError(
- f"模型 {self.model_name} 请求处理器未初始化,无法处理语音请求"
- )
-
- try:
- # 构建语音识别请求参数
- # 注意:新架构中的语音识别可能使用不同的方法
- # 这里先使用get_response方法,可能需要根据实际API调整
- response = await self.request_handler.get_response( # type: ignore
- messages=[], # 语音识别可能不需要消息
- tool_options=None
- )
-
- # 新架构返回的是 APIResponse 对象,直接提取文本内容
- return (response.content,) if response.content else ("",)
-
- except Exception as e:
- self._handle_model_exception(e, "voice")
- # 不可达的返回语句,仅用于满足类型检查
- return ("",) # pragma: no cover
-
- async def generate_response_async(self, prompt: str, **kwargs) -> Union[str, Tuple]:
- """
- 异步方式根据输入的提示生成模型的响应
- 使用新架构的模型请求处理器,如无法使用则抛出错误
- """
- if not self.use_new_architecture:
- raise RuntimeError(
- f"模型 {self.model_name} 无法使用新架构,请检查 config/model_config.toml 中的 API 配置。"
- )
-
- if self.request_handler is None:
- raise RuntimeError(
- f"模型 {self.model_name} 请求处理器未初始化,无法生成响应"
- )
-
- if MessageBuilder is None:
- raise RuntimeError("MessageBuilder不可用,请检查新架构配置")
-
- try:
- # 构建消息
- message_builder = MessageBuilder()
- message_builder.add_text_content(prompt)
- messages = [message_builder.build()]
-
- # 使用新架构发送请求(只传递支持的参数)
- response = await self.request_handler.get_response( # type: ignore
- messages=messages,
- tool_options=None,
- response_format=None
- )
-
- # 新架构返回的是 APIResponse 对象,直接提取内容
- content = response.content or ""
- reasoning_content = response.reasoning_content or ""
- tool_calls = response.tool_calls
-
- # 从内容中提取标签的推理内容(向后兼容)
- if not reasoning_content and content:
- content, extracted_reasoning = self._extract_reasoning(content)
- reasoning_content = extracted_reasoning
-
- # 记录token使用情况
- if response.usage:
- self._record_usage(
- prompt_tokens=response.usage.prompt_tokens or 0,
- completion_tokens=response.usage.completion_tokens or 0,
- total_tokens=response.usage.total_tokens or 0,
- user_id="system",
- request_type=self.request_type,
- endpoint="/chat/completions"
- )
-
- # 返回格式兼容旧版本
- if tool_calls:
- return content, (reasoning_content, self.model_name, tool_calls)
- else:
- return content, (reasoning_content, self.model_name)
-
- except Exception as e:
- self._handle_model_exception(e, "text")
- # 不可达的返回语句,仅用于满足类型检查
- return "", ("", self.model_name) # pragma: no cover
-
- async def get_embedding(self, text: str) -> Union[list, None]:
- """
- 异步方法:获取文本的embedding向量
- 使用新架构的模型请求处理器
-
- Args:
- text: 需要获取embedding的文本
-
- Returns:
- list: embedding向量,如果失败则返回None
- """
- if not text:
- logger.debug("该消息没有长度,不再发送获取embedding向量的请求")
- return None
-
- if not self.use_new_architecture:
- logger.warning(f"模型 {self.model_name} 无法使用新架构,embedding请求将被跳过")
- return None
-
- if self.request_handler is None:
- logger.warning(f"模型 {self.model_name} 请求处理器未初始化,embedding请求将被跳过")
- return None
-
- try:
- # 构建embedding请求参数
- # 使用新架构的get_embedding方法
- response = await self.request_handler.get_embedding(text) # type: ignore
-
- # 新架构返回的是 APIResponse 对象,直接提取embedding
- if response.embedding:
- embedding = response.embedding
-
- # 记录token使用情况
- if response.usage:
- self._record_usage(
- prompt_tokens=response.usage.prompt_tokens or 0,
- completion_tokens=response.usage.completion_tokens or 0,
- total_tokens=response.usage.total_tokens or 0,
- user_id="system",
- request_type=self.request_type,
- endpoint="/embeddings"
- )
-
- return embedding
- else:
- logger.warning(f"模型 {self.model_name} 返回的embedding响应为空")
- return None
-
- except Exception as e:
- # 对于embedding请求,我们记录错误但不抛出异常,而是返回None
- # 这是为了保持与原有行为的兼容性
- try:
- self._handle_model_exception(e, "embedding")
- except RuntimeError:
- # 捕获_handle_model_exception抛出的RuntimeError,转换为警告日志
- logger.warning(f"模型 {self.model_name} embedding请求失败,返回None: {str(e)}")
- return None
-
-
-def compress_base64_image_by_scale(base64_data: str, target_size: int = int(0.8 * 1024 * 1024)) -> str:
- """压缩base64格式的图片到指定大小
- Args:
- base64_data: base64编码的图片数据
- target_size: 目标文件大小(字节),默认0.8MB
- Returns:
- str: 压缩后的base64图片数据
- """
- try:
- # 将base64转换为字节数据
- # 确保base64字符串只包含ASCII字符
- if isinstance(base64_data, str):
- base64_data = base64_data.encode("ascii", errors="ignore").decode("ascii")
- image_data = base64.b64decode(base64_data)
-
- # 如果已经小于目标大小,直接返回原图
- if len(image_data) <= 2 * 1024 * 1024:
- return base64_data
-
- # 将字节数据转换为图片对象
- img = Image.open(io.BytesIO(image_data))
-
- # 获取原始尺寸
- original_width, original_height = img.size
-
- # 计算缩放比例
- scale = min(1.0, (target_size / len(image_data)) ** 0.5)
-
- # 计算新的尺寸
- new_width = int(original_width * scale)
- new_height = int(original_height * scale)
-
- # 创建内存缓冲区
- output_buffer = io.BytesIO()
-
- # 如果是GIF,处理所有帧
- if getattr(img, "is_animated", False):
- frames = []
- n_frames = getattr(img, 'n_frames', 1)
- for frame_idx in range(n_frames):
- img.seek(frame_idx)
- new_frame = img.copy()
- new_frame = new_frame.resize((new_width // 2, new_height // 2), Image.Resampling.LANCZOS) # 动图折上折
- frames.append(new_frame)
-
- # 保存到缓冲区
- frames[0].save(
- output_buffer,
- format="GIF",
- save_all=True,
- append_images=frames[1:],
- optimize=True,
- duration=img.info.get("duration", 100),
- loop=img.info.get("loop", 0),
- )
- else:
- # 处理静态图片
- resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
-
- # 保存到缓冲区,保持原始格式
- if img.format == "PNG" and img.mode in ("RGBA", "LA"):
- resized_img.save(output_buffer, format="PNG", optimize=True)
- else:
- resized_img.save(output_buffer, format="JPEG", quality=95, optimize=True)
-
- # 获取压缩后的数据并转换为base64
- compressed_data = output_buffer.getvalue()
- logger.info(f"压缩图片: {original_width}x{original_height} -> {new_width}x{new_height}")
- logger.info(f"压缩前大小: {len(image_data) / 1024:.1f}KB, 压缩后大小: {len(compressed_data) / 1024:.1f}KB")
-
- return base64.b64encode(compressed_data).decode("utf-8")
-
- except Exception as e:
- logger.error(f"压缩图片失败: {str(e)}")
- import traceback
-
- logger.error(traceback.format_exc())
- return base64_data