文档更新,changelog更新

This commit is contained in:
UnCLAS-Prommer
2025-07-31 14:28:16 +08:00
parent 52acfe5958
commit baaf0262b3
6 changed files with 68 additions and 1256 deletions

View File

@@ -1,5 +1,17 @@
# Changelog
## [0.10.0] - 2025-7-1
### 主要功能更改
- 工具系统重构,现在合并到了插件系统中
- 彻底重构了整个LLM Request了现在支持模型轮询和更多灵活的参数
- 同时重构了整个模型配置系统升级需要重新配置llm配置文件
- 随着LLM Request的重构插件系统彻底重构完成。插件系统进入稳定状态仅增加新的API
- 具体相比于之前的更改可以查看[changes.md](./changes.md)
### 细节优化
- 修复了lint爆炸的问题代码更加规范了
- 修改了log的颜色更加护眼
## [0.9.1] - 2025-7-26
### 主要修复和优化

View File

@@ -25,6 +25,7 @@
- 这意味着你终于可以动态控制是否继续后续消息的处理了。
8. 移除了dependency_manager但是依然保留了`python_dependencies`属性,等待后续重构。
- 一并移除了文档有关manager的内容。
9. 增加了工具的有关api
# 插件系统修改
1. 现在所有的匹配模式不再是关键字了,而是枚举类。**(可能有遗漏)**
@@ -57,30 +58,12 @@
15. 实现了组件的局部禁用,也就是针对某一个聊天禁用的功能。
- 通过`disable_specific_chat_action``enable_specific_chat_action``disable_specific_chat_command``enable_specific_chat_command``disable_specific_chat_event_handler``enable_specific_chat_event_handler`来操作
- 同样不保存到配置文件~
16. 把`BaseTool`一并合并进入了插件系统
# 官方插件修改
1. `HelloWorld`插件现在有一个样例的`EventHandler`
2. 内置插件增加了一个通过`Command`来管理插件的功能。具体是使用`/pm`命令唤起。
### TODO
把这个看起来就很别扭的config获取方式改一下
# 吐槽
```python
plugin_path = Path(plugin_file)
if plugin_path.parent.name != "plugins":
# 插件包格式parent_dir.plugin
module_name = f"plugins.{plugin_path.parent.name}.plugin"
else:
# 单文件格式plugins.filename
module_name = f"plugins.{plugin_path.stem}"
```
```python
plugin_path = Path(plugin_file)
module_name = ".".join(plugin_path.parent.parts)
```
这两个区别很大的。
2. 内置插件增加了一个通过`Command`来管理插件的功能。具体是使用`/pm`命令唤起。(需要自行启用)
3. `HelloWorld`插件现在有一个样例的`CompareNumbersTool`
### 执笔BGM
塞壬唱片!

View File

@@ -33,21 +33,26 @@ class MyTool(BaseTool):
# 工具描述告诉LLM这个工具的用途
description = "这个工具用于获取特定类型的信息"
# 参数定义,遵循JSONSchema格式
parameters = {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "查询参数"
},
"limit": {
"type": "integer",
"description": "结果数量限制"
}
},
"required": ["query"]
}
# 参数定义,仅定义参数
# 比如想要定义一个类似下面的openai格式的参数表则可以这么定义:
# {
# "type": "object",
# "properties": {
# "query": {
# "type": "string",
# "description": "查询参数"
# },
# "limit": {
# "type": "integer",
# "description": "结果数量限制"
# }
# },
# "required": ["query"]
# }
parameters = [
("query", "string", "查询参数", True), # 必填参数
("limit", "integer", "结果数量限制", False) # 可选参数
]
available_for_llm = True # 是否对LLM可用
@@ -68,7 +73,7 @@ class MyTool(BaseTool):
|-----|------|------|
| `name` | str | 工具的唯一标识名称 |
| `description` | str | 工具功能描述帮助LLM理解用途 |
| `parameters` | dict | JSONSchema格式的参数定义 |
| `parameters` | list[tuple] | 参数定义 |
### 方法说明
@@ -92,23 +97,13 @@ class WeatherTool(BaseTool):
name = "weather_query"
description = "查询指定城市的实时天气信息,包括温度、湿度、天气状况等"
available_for_llm = True # 允许LLM调用此工具
parameters = [
("city", "string", "要查询天气的城市名称,如:北京、上海、纽约", True),
("country", "string", "国家代码CN、US可选参数", False)
]
parameters = {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "要查询天气的城市名称,如:北京、上海、纽约"
},
"country": {
"type": "string",
"description": "国家代码CN、US可选参数"
}
},
"required": ["city"]
}
async def execute(self, function_args, message_txt=""):
async def execute(self, function_args: dict):
"""执行天气查询"""
try:
city = function_args.get("city")
@@ -185,66 +180,49 @@ class WeatherTool(BaseTool):
## 🎯 最佳实践
### 1. 工具命名规范
#### ✅ 好的命名
```python
# ✅ 好的命名
name = "weather_query" # 清晰表达功能
name = "knowledge_search" # 描述性强
name = "stock_price_check" # 功能明确
# ❌ 避免的命名
```
#### ❌ 避免的命名
```python
name = "tool1" # 无意义
name = "wq" # 过于简短
name = "weather_and_news" # 功能过于复杂
```
### 2. 描述规范
#### ✅ 良好的描述
```python
# ✅ 好的描述
description = "查询指定城市的实时天气信息,包括温度、湿度、天气状况"
# ❌ 避免的描述
```
#### ❌ 避免的描述
```python
description = "天气" # 过于简单
description = "获取信息" # 不够具体
```
### 3. 参数设计
#### ✅ 合理的参数设计
```python
# ✅ 合理的参数设计
parameters = {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称,如:北京、上海"
},
"unit": {
"type": "string",
"description": "温度单位celsius(摄氏度) 或 fahrenheit(华氏度)",
"enum": ["celsius", "fahrenheit"]
}
},
"required": ["city"]
}
# ❌ 避免的参数设计
parameters = {
"type": "object",
"properties": {
"data": {
"type": "string",
"description": "数据" # 描述不清晰
}
}
}
parameters = [
("city", "string", "城市名称,如:北京、上海", True),
("unit", "string", "温度单位celsius 或 fahrenheit", False)
]
```
#### ❌ 避免的参数设计
```python
parameters = [
("data", "string", "数据", True) # 参数过于模糊
]
```
### 4. 结果格式化
#### ✅ 良好的结果格式
```python
# ✅ 良好的结果格式
def _format_result(self, data):
return f"""
🔍 查询结果
@@ -254,12 +232,9 @@ def _format_result(self, data):
📝 说明: {data['description']}
━━━━━━━━━━━━
""".strip()
# ❌ 避免的结果格式
```
#### ❌ 避免的结果格式
```python
def _format_result(self, data):
return str(data) # 直接返回原始数据
```
---
🎉 **工具系统为麦麦提供了强大的信息获取能力!合理使用工具可以让麦麦变得更加智能和博学。**

View File

@@ -10,7 +10,7 @@ import networkx as nx
import numpy as np
from itertools import combinations
from typing import List, Tuple, Coroutine, Any, Dict, Set
from typing import List, Tuple, Coroutine, Any, Set
from collections import Counter
from rich.traceback import install
@@ -1267,7 +1267,7 @@ class ParahippocampalGyrus:
logger.debug(f"过滤后话题: {filtered_topics}")
# 4. 创建所有话题的摘要生成任务
tasks: List[Tuple[str, Coroutine[Any, Any, Tuple[str, Tuple[str, str, List[Dict[str, Any]] | None]]]]] = []
tasks: List[Tuple[str, Coroutine[Any, Any, Tuple[str, Tuple[str, str, List | None]]]]] = []
for topic in filtered_topics:
# 调用修改后的 topic_what不再需要 time_info
topic_what_prompt = self.hippocampus.topic_what(input_text, topic)

View File

@@ -1,380 +0,0 @@
import asyncio
from typing import Callable, Any
from openai import AsyncStream
from openai.types.chat import ChatCompletionChunk, ChatCompletion
from .base_client import BaseClient, APIResponse
from src.config.api_ada_configs import (
ModelInfo,
ModelUsageArgConfigItem,
RequestConfig,
ModuleConfig,
)
from ..exceptions import (
NetworkConnectionError,
ReqAbortException,
RespNotOkException,
RespParseException,
)
from ..payload_content.message import Message
from ..payload_content.resp_format import RespFormat
from ..payload_content.tool_option import ToolOption
from ..utils import compress_messages
from src.common.logger import get_logger
logger = get_logger("模型客户端")
def _check_retry(
remain_try: int,
retry_interval: int,
can_retry_msg: str,
cannot_retry_msg: str,
can_retry_callable: Callable | None = None,
**kwargs,
) -> tuple[int, Any | None]:
"""
辅助函数:检查是否可以重试
:param remain_try: 剩余尝试次数
:param retry_interval: 重试间隔
:param can_retry_msg: 可以重试时的提示信息
:param cannot_retry_msg: 不可以重试时的提示信息
:return: (等待间隔如果为0则不等待为-1则不再请求该模型, 新的消息列表(适用于压缩消息))
"""
if remain_try > 0:
# 还有重试机会
logger.warning(f"{can_retry_msg}")
if can_retry_callable is not None:
return retry_interval, can_retry_callable(**kwargs)
else:
return retry_interval, None
else:
# 达到最大重试次数
logger.warning(f"{cannot_retry_msg}")
return -1, None # 不再重试请求该模型
def _handle_resp_not_ok(
e: RespNotOkException,
task_name: str,
model_name: str,
remain_try: int,
retry_interval: int = 10,
messages: tuple[list[Message], bool] | None = None,
):
"""
处理响应错误异常
:param e: 异常对象
:param task_name: 任务名称
:param model_name: 模型名称
:param remain_try: 剩余尝试次数
:param retry_interval: 重试间隔
:param messages: (消息列表, 是否已压缩过)
:return: (等待间隔如果为0则不等待为-1则不再请求该模型, 新的消息列表(适用于压缩消息))
"""
# 响应错误
if e.status_code in [401, 403]:
# API Key认证错误 - 让多API Key机制处理给一次重试机会
if remain_try > 0:
logger.warning(
f"任务-'{task_name}' 模型-'{model_name}'\n"
f"API Key认证失败错误代码-{e.status_code}多API Key机制会自动切换"
)
return 0, None # 立即重试让底层客户端切换API Key
else:
logger.warning(
f"任务-'{task_name}' 模型-'{model_name}'\n"
f"所有API Key都认证失败错误代码-{e.status_code},错误信息-{e.message}"
)
return -1, None # 不再重试请求该模型
elif e.status_code in [400, 402, 404]:
# 其他客户端错误(不应该重试)
logger.warning(
f"任务-'{task_name}' 模型-'{model_name}'\n"
f"请求失败,错误代码-{e.status_code},错误信息-{e.message}"
)
return -1, None # 不再重试请求该模型
elif e.status_code == 413:
if messages and not messages[1]:
# 消息列表不为空且未压缩,尝试压缩消息
return _check_retry(
remain_try,
0,
can_retry_msg=(
f"任务-'{task_name}' 模型-'{model_name}'\n"
"请求体过大,尝试压缩消息后重试"
),
cannot_retry_msg=(
f"任务-'{task_name}' 模型-'{model_name}'\n"
"请求体过大,压缩消息后仍然过大,放弃请求"
),
can_retry_callable=compress_messages,
messages=messages[0],
)
# 没有消息可压缩
logger.warning(
f"任务-'{task_name}' 模型-'{model_name}'\n"
"请求体过大,无法压缩消息,放弃请求。"
)
return -1, None
elif e.status_code == 429:
# 请求过于频繁 - 让多API Key机制处理适当延迟后重试
return _check_retry(
remain_try,
min(retry_interval, 5), # 限制最大延迟为5秒让API Key切换更快生效
can_retry_msg=(
f"任务-'{task_name}' 模型-'{model_name}'\n"
f"请求过于频繁多API Key机制会自动切换{min(retry_interval, 5)}秒后重试"
),
cannot_retry_msg=(
f"任务-'{task_name}' 模型-'{model_name}'\n"
"请求过于频繁所有API Key都被限制放弃请求"
),
)
elif e.status_code >= 500:
# 服务器错误
return _check_retry(
remain_try,
retry_interval,
can_retry_msg=(
f"任务-'{task_name}' 模型-'{model_name}'\n"
f"服务器错误,将于{retry_interval}秒后重试"
),
cannot_retry_msg=(
f"任务-'{task_name}' 模型-'{model_name}'\n"
"服务器错误,超过最大重试次数,请稍后再试"
),
)
else:
# 未知错误
logger.warning(
f"任务-'{task_name}' 模型-'{model_name}'\n"
f"未知错误,错误代码-{e.status_code},错误信息-{e.message}"
)
return -1, None
def default_exception_handler(
e: Exception,
task_name: str,
model_name: str,
remain_try: int,
retry_interval: int = 10,
messages: tuple[list[Message], bool] | None = None,
) -> tuple[int, list[Message] | None]:
"""
默认异常处理函数
:param e: 异常对象
:param task_name: 任务名称
:param model_name: 模型名称
:param remain_try: 剩余尝试次数
:param retry_interval: 重试间隔
:param messages: (消息列表, 是否已压缩过)
:return (等待间隔如果为0则不等待为-1则不再请求该模型, 新的消息列表(适用于压缩消息))
"""
if isinstance(e, NetworkConnectionError): # 网络连接错误
# 网络错误可能是某个API Key的端点问题给多API Key机制一次快速重试机会
return _check_retry(
remain_try,
min(retry_interval, 3), # 网络错误时减少等待时间让API Key切换更快
can_retry_msg=(
f"任务-'{task_name}' 模型-'{model_name}'\n"
f"连接异常多API Key机制会尝试其他Key{min(retry_interval, 3)}秒后重试"
),
cannot_retry_msg=(
f"任务-'{task_name}' 模型-'{model_name}'\n"
f"连接异常超过最大重试次数请检查网络连接状态或URL是否正确"
),
)
elif isinstance(e, ReqAbortException):
logger.warning(
f"任务-'{task_name}' 模型-'{model_name}'\n请求被中断,详细信息-{str(e.message)}"
)
return -1, None # 不再重试请求该模型
elif isinstance(e, RespNotOkException):
return _handle_resp_not_ok(
e,
task_name,
model_name,
remain_try,
retry_interval,
messages,
)
elif isinstance(e, RespParseException):
# 响应解析错误
logger.error(
f"任务-'{task_name}' 模型-'{model_name}'\n"
f"响应解析错误,错误信息-{e.message}\n"
)
logger.debug(f"附加内容:\n{str(e.ext_info)}")
return -1, None # 不再重试请求该模型
else:
logger.error(
f"任务-'{task_name}' 模型-'{model_name}'\n未知异常,错误信息-{str(e)}"
)
return -1, None # 不再重试请求该模型
class ModelRequestHandler:
"""
模型请求处理器
"""
def __init__(
self,
task_name: str,
config: ModuleConfig,
api_client_map: dict[str, BaseClient],
):
self.task_name: str = task_name
"""任务名称"""
self.client_map: dict[str, BaseClient] = {}
"""API客户端列表"""
self.configs: list[tuple[ModelInfo, ModelUsageArgConfigItem]] = []
"""模型参数配置"""
self.req_conf: RequestConfig = config.req_conf
"""请求配置"""
# 获取模型与使用配置
for model_usage in config.task_model_arg_map[task_name].usage:
if model_usage.name not in config.models:
logger.error(f"Model '{model_usage.name}' not found in ModelManager")
raise KeyError(f"Model '{model_usage.name}' not found in ModelManager")
model_info = config.models[model_usage.name]
if model_info.api_provider not in self.client_map:
# 缓存API客户端
self.client_map[model_info.api_provider] = api_client_map[
model_info.api_provider
]
self.configs.append((model_info, model_usage)) # 添加模型与使用配置
async def get_response(
self,
messages: list[Message],
tool_options: list[ToolOption] | None = None,
response_format: RespFormat | None = None, # 暂不启用
stream_response_handler: Callable[
[AsyncStream[ChatCompletionChunk], asyncio.Event | None], APIResponse
]
| None = None,
async_response_parser: Callable[[ChatCompletion], APIResponse] | None = None,
interrupt_flag: asyncio.Event | None = None,
) -> APIResponse:
"""
获取对话响应
:param messages: 消息列表
:param tool_options: 工具选项列表
:param response_format: 响应格式
:param stream_response_handler: 流式响应处理函数(可选)
:param async_response_parser: 响应解析函数(可选)
:param interrupt_flag: 中断信号量可选默认为None
:return: APIResponse
"""
# 遍历可用模型,若获取响应失败,则使用下一个模型继续请求
for config_item in self.configs:
client = self.client_map[config_item[0].api_provider]
model_info: ModelInfo = config_item[0]
model_usage_config: ModelUsageArgConfigItem = config_item[1]
remain_try = (
model_usage_config.max_retry or self.req_conf.max_retry
) + 1 # 初始化:剩余尝试次数 = 最大重试次数 + 1
compressed_messages = None
retry_interval = self.req_conf.retry_interval
while remain_try > 0:
try:
return await client.get_response(
model_info,
message_list=(compressed_messages or messages),
tool_options=tool_options,
max_tokens=model_usage_config.max_tokens
or self.req_conf.default_max_tokens,
temperature=model_usage_config.temperature
or self.req_conf.default_temperature,
response_format=response_format,
stream_response_handler=stream_response_handler,
async_response_parser=async_response_parser,
interrupt_flag=interrupt_flag,
)
except Exception as e:
logger.debug(e)
remain_try -= 1 # 剩余尝试次数减1
# 处理异常
handle_res = default_exception_handler(
e,
self.task_name,
model_info.name,
remain_try,
retry_interval=self.req_conf.retry_interval,
messages=(messages, compressed_messages is not None),
)
if handle_res[0] == -1:
# 等待间隔为-1表示不再请求该模型
remain_try = 0
elif handle_res[0] != 0:
# 等待间隔不为0表示需要等待
await asyncio.sleep(handle_res[0])
retry_interval *= 2
if handle_res[1] is not None:
# 压缩消息
compressed_messages = handle_res[1]
logger.error(f"任务-'{self.task_name}' 请求执行失败,所有模型均不可用")
raise RuntimeError("请求失败,所有模型均不可用") # 所有请求尝试均失败
async def get_embedding(
self,
embedding_input: str,
) -> APIResponse:
"""
获取嵌入向量
:param embedding_input: 嵌入输入
:return: APIResponse
"""
for config in self.configs:
client = self.client_map[config[0].api_provider]
model_info: ModelInfo = config[0]
model_usage_config: ModelUsageArgConfigItem = config[1]
remain_try = (
model_usage_config.max_retry or self.req_conf.max_retry
) + 1 # 初始化:剩余尝试次数 = 最大重试次数 + 1
while remain_try:
try:
return await client.get_embedding(
model_info=model_info,
embedding_input=embedding_input,
)
except Exception as e:
logger.debug(e)
remain_try -= 1 # 剩余尝试次数减1
# 处理异常
handle_res = default_exception_handler(
e,
self.task_name,
model_info.name,
remain_try,
retry_interval=self.req_conf.retry_interval,
)
if handle_res[0] == -1:
# 等待间隔为-1表示不再请求该模型
remain_try = 0
elif handle_res[0] != 0:
# 等待间隔不为0表示需要等待
await asyncio.sleep(handle_res[0])
logger.error(f"任务-'{self.task_name}' 请求执行失败,所有模型均不可用")
raise RuntimeError("请求失败,所有模型均不可用") # 所有请求尝试均失败

View File

@@ -1,778 +0,0 @@
import re
from datetime import datetime
from typing import Tuple, Union
from src.common.logger import get_logger
import base64
from PIL import Image
import io
from src.common.database.database import db # 确保 db 被导入用于 create_tables
from src.common.database.database_model import LLMUsage # 导入 LLMUsage 模型
from src.config.config import global_config
from rich.traceback import install
from .exceptions import NetworkConnectionError, ReqAbortException, RespNotOkException, RespParseException, PayLoadTooLargeError, RequestAbortException, PermissionDeniedException
install(extra_lines=3)
logger = get_logger("model_utils")
# 导入具体的异常类型用于精确的异常处理
from .exceptions import NetworkConnectionError, ReqAbortException, RespNotOkException, RespParseException
SPECIFIC_EXCEPTIONS_AVAILABLE = True
# 新架构导入 - 使用延迟导入以支持fallback模式
from .model_manager_bak import ModelManager
from .model_client import ModelRequestHandler
from .payload_content.message import MessageBuilder
# 不在模块级别初始化ModelManager延迟到实际使用时
ModelManager_class = ModelManager
model_manager = None # 延迟初始化
# 添加请求处理器缓存,避免重复创建
_request_handler_cache = {} # 格式: {(model_name, task_name): ModelRequestHandler}
NEW_ARCHITECTURE_AVAILABLE = True
logger.info("新架构模块导入成功")
# 常见Error Code Mapping
error_code_mapping = {
400: "参数不正确",
401: "API key 错误,认证失败,请检查 config/model_config.toml 中的配置是否正确",
402: "账号余额不足",
403: "需要实名,或余额不足",
404: "Not Found",
429: "请求过于频繁,请稍后再试",
500: "服务器内部故障",
503: "服务器负载过高",
}
class LLMRequest:
"""
重构后的LLM请求类基于新的model_manager和model_client架构
保持向后兼容的API接口
"""
# 定义需要转换的模型列表,作为类变量避免重复
MODELS_NEEDING_TRANSFORMATION = [
"o1",
"o1-2024-12-17",
"o1-mini",
"o1-mini-2024-09-12",
"o1-preview",
"o1-preview-2024-09-12",
"o1-pro",
"o1-pro-2025-03-19",
"o3",
"o3-2025-04-16",
"o3-mini",
"o3-mini-2025-01-31",
"o4-mini",
"o4-mini-2025-04-16",
]
def __init__(self, model: dict, **kwargs):
"""
初始化LLM请求实例
Args:
model: 模型配置字典,兼容旧格式和新格式
**kwargs: 额外参数
"""
logger.debug(f"🔍 [模型初始化] 开始初始化模型: {model.get('model_name', model.get('name', 'Unknown'))}")
logger.debug(f"🔍 [模型初始化] 输入的模型配置: {model}")
logger.debug(f"🔍 [模型初始化] 额外参数: {kwargs}")
# 兼容新旧模型配置格式
# 新格式使用 model_name旧格式使用 name
self.model_name: str = model.get("model_name", model.get("name", ""))
# 如果传入的配置不完整,自动从全局配置中获取完整配置
if not all(key in model for key in ["task_type", "capabilities"]):
logger.debug("🔍 [模型初始化] 检测到不完整的模型配置,尝试获取完整配置")
if (full_model_config := self._get_full_model_config(self.model_name)):
logger.debug("🔍 [模型初始化] 成功获取完整模型配置,合并配置信息")
# 合并配置:运行时参数优先,但添加缺失的配置字段
model = {**full_model_config, **model}
logger.debug(f"🔍 [模型初始化] 合并后的模型配置: {model}")
else:
logger.warning(f"⚠️ [模型初始化] 无法获取模型 {self.model_name} 的完整配置,使用原始配置")
# 在新架构中provider信息从model_config.toml自动获取不需要在这里设置
self.provider = model.get("provider", "") # 保留兼容性,但在新架构中不使用
# 从全局配置中获取任务配置
self.request_type = kwargs.pop("request_type", "default")
# 确定使用哪个任务配置
task_name = self._determine_task_name(model)
# 初始化 request_handler
self.request_handler = None
# 尝试初始化新架构
if NEW_ARCHITECTURE_AVAILABLE and ModelManager_class is not None:
try:
# 延迟初始化ModelManager
global model_manager, _request_handler_cache
if model_manager is None:
from src.config.config import model_config
model_manager = ModelManager_class(model_config)
logger.debug("🔍 [模型初始化] ModelManager延迟初始化成功")
# 构建缓存键
cache_key = (self.model_name, task_name)
# 检查是否已有缓存的请求处理器
if cache_key in _request_handler_cache:
self.request_handler = _request_handler_cache[cache_key]
logger.debug(f"🚀 [性能优化] 从LLMRequest缓存获取请求处理器: {cache_key}")
else:
# 使用新架构获取模型请求处理器
self.request_handler = model_manager[task_name]
_request_handler_cache[cache_key] = self.request_handler
logger.debug(f"🔧 [性能优化] 创建并缓存LLMRequest请求处理器: {cache_key}")
logger.debug(f"🔍 [模型初始化] 成功获取模型请求处理器,任务: {task_name}")
self.use_new_architecture = True
except Exception as e:
logger.warning(f"无法使用新架构,任务 {task_name} 初始化失败: {e}")
logger.warning("回退到兼容模式,某些功能可能受限")
self.request_handler = None
self.use_new_architecture = False
else:
logger.warning("新架构不可用,使用兼容模式")
logger.warning("回退到兼容模式,某些功能可能受限")
self.request_handler = None
self.use_new_architecture = False
# 保存原始参数用于向后兼容
self.params = kwargs
# 兼容性属性,从模型配置中提取
# 新格式和旧格式都支持
self.enable_thinking = model.get("enable_thinking", False)
self.temp = model.get("temperature", model.get("temp", 0.7)) # 新格式用temperature旧格式用temp
self.thinking_budget = model.get("thinking_budget", 4096)
self.stream = model.get("stream", False)
self.pri_in = model.get("pri_in", 0)
self.pri_out = model.get("pri_out", 0)
self.max_tokens = model.get("max_tokens", global_config.model.model_max_output_length)
# 记录配置文件中声明了哪些参数(不管值是什么)
self.has_enable_thinking = "enable_thinking" in model
self.has_thinking_budget = "thinking_budget" in model
self.pri_out = model.get("pri_out", 0)
self.max_tokens = model.get("max_tokens", global_config.model.model_max_output_length)
# 记录配置文件中声明了哪些参数(不管值是什么)
self.has_enable_thinking = "enable_thinking" in model
self.has_thinking_budget = "thinking_budget" in model
logger.debug("🔍 [模型初始化] 模型参数设置完成:")
logger.debug(f" - model_name: {self.model_name}")
logger.debug(f" - provider: {self.provider}")
logger.debug(f" - has_enable_thinking: {self.has_enable_thinking}")
logger.debug(f" - enable_thinking: {self.enable_thinking}")
logger.debug(f" - has_thinking_budget: {self.has_thinking_budget}")
logger.debug(f" - thinking_budget: {self.thinking_budget}")
logger.debug(f" - temp: {self.temp}")
logger.debug(f" - stream: {self.stream}")
logger.debug(f" - max_tokens: {self.max_tokens}")
logger.debug(f" - use_new_architecture: {self.use_new_architecture}")
# 获取数据库实例
self._init_database()
logger.debug(f"🔍 [模型初始化] 初始化完成request_type: {self.request_type}")
def _determine_task_name(self, model: dict) -> str:
"""
根据模型配置确定任务名称
优先使用配置文件中明确定义的任务类型,避免基于模型名称的脆弱推断
Args:
model: 模型配置字典
Returns:
任务名称
"""
# 调试信息:打印模型配置字典的所有键
logger.debug(f"🔍 [任务确定] 模型配置字典的所有键: {list(model.keys())}")
logger.debug(f"🔍 [任务确定] 模型配置字典内容: {model}")
# 获取模型名称
model_name = model.get("model_name", model.get("name", ""))
# 方法1: 优先使用配置文件中明确定义的 task_type 字段
if "task_type" in model:
task_type = model["task_type"]
logger.debug(f"🎯 [任务确定] 使用配置中的 task_type: {task_type}")
return task_type
# 方法2: 使用 capabilities 字段来推断主要任务类型
if "capabilities" in model:
capabilities = model["capabilities"]
if isinstance(capabilities, list):
# 按优先级顺序检查能力
if "vision" in capabilities:
logger.debug(f"🎯 [任务确定] 从 capabilities {capabilities} 推断为: vision")
return "vision"
elif "embedding" in capabilities:
logger.debug(f"🎯 [任务确定] 从 capabilities {capabilities} 推断为: embedding")
return "embedding"
elif "speech" in capabilities:
logger.debug(f"🎯 [任务确定] 从 capabilities {capabilities} 推断为: speech")
return "speech"
elif "text" in capabilities:
# 如果只有文本能力则根据request_type细分
task = "llm_reasoning" if self.request_type == "reasoning" else "llm_normal"
logger.debug(f"🎯 [任务确定] 从 capabilities {capabilities} 和 request_type {self.request_type} 推断为: {task}")
return task
# 方法3: 向后兼容 - 基于模型名称的关键字推断(不推荐但保留兼容性)
logger.warning(f"⚠️ [任务确定] 配置中未找到 task_type 或 capabilities回退到基于模型名称的推断: {model_name}")
logger.warning("⚠️ [建议] 请在 model_config.toml 中为模型添加明确的 task_type 或 capabilities 字段")
# 保留原有的关键字匹配逻辑作为fallback
if any(keyword in model_name.lower() for keyword in ["vlm", "vision", "gpt-4o", "claude", "vl-"]):
logger.debug(f"🎯 [任务确定] 从模型名称 {model_name} 推断为: vision")
return "vision"
elif any(keyword in model_name.lower() for keyword in ["embed", "text-embedding", "bge-"]):
logger.debug(f"🎯 [任务确定] 从模型名称 {model_name} 推断为: embedding")
return "embedding"
elif any(keyword in model_name.lower() for keyword in ["whisper", "speech", "voice"]):
logger.debug(f"🎯 [任务确定] 从模型名称 {model_name} 推断为: speech")
return "speech"
else:
# 根据request_type确定映射到配置文件中定义的任务
task = "llm_reasoning" if self.request_type == "reasoning" else "llm_normal"
logger.debug(f"🎯 [任务确定] 从 request_type {self.request_type} 推断为: {task}")
return task
def _get_full_model_config(self, model_name: str) -> dict | None:
"""
根据模型名称从全局配置中获取完整的模型配置
现在直接使用已解析的ModelInfo对象不再读取TOML文件
Args:
model_name: 模型名称
Returns:
完整的模型配置字典如果找不到则返回None
"""
try:
from src.config.config import model_config
return self._get_model_config_from_parsed(model_name, model_config)
except Exception as e:
logger.warning(f"⚠️ [配置查找] 获取模型配置时出错: {str(e)}")
return None
def _get_model_config_from_parsed(self, model_name: str, model_config) -> dict | None:
"""
从已解析的配置对象中获取模型配置
使用扩展后的ModelInfo类包含task_type和capabilities字段
"""
try:
# 直接通过模型名称查找
if model_name in model_config.models:
model_info = model_config.models[model_name]
logger.debug(f"🔍 [配置查找] 找到模型 {model_name} 的配置对象: {model_info}")
# 将ModelInfo对象转换为字典
model_dict = {
"model_identifier": model_info.model_identifier,
"name": model_info.name,
"api_provider": model_info.api_provider,
"price_in": model_info.price_in,
"price_out": model_info.price_out,
"force_stream_mode": model_info.force_stream_mode,
"task_type": model_info.task_type,
"capabilities": model_info.capabilities,
}
logger.debug(f"🔍 [配置查找] 转换后的模型配置字典: {model_dict}")
return model_dict
# 如果直接查找失败尝试通过model_identifier查找
for name, model_info in model_config.models.items():
if (model_info.model_identifier == model_name or
hasattr(model_info, 'model_name') and model_info.model_name == model_name):
logger.debug(f"🔍 [配置查找] 通过标识符找到模型 {model_name} (配置名称: {name})")
# 同样转换为字典
model_dict = {
"model_identifier": model_info.model_identifier,
"name": model_info.name,
"api_provider": model_info.api_provider,
"price_in": model_info.price_in,
"price_out": model_info.price_out,
"force_stream_mode": model_info.force_stream_mode,
"task_type": model_info.task_type,
"capabilities": model_info.capabilities,
}
return model_dict
return None
except Exception as e:
logger.warning(f"⚠️ [配置查找] 从已解析配置获取模型配置时出错: {str(e)}")
return None
@staticmethod
def _init_database():
"""初始化数据库集合"""
try:
# 使用 Peewee 创建表safe=True 表示如果表已存在则不会抛出错误
db.create_tables([LLMUsage], safe=True)
# logger.debug("LLMUsage 表已初始化/确保存在。")
except Exception as e:
logger.error(f"创建 LLMUsage 表失败: {str(e)}")
def _record_usage(
self,
prompt_tokens: int,
completion_tokens: int,
total_tokens: int,
user_id: str = "system",
request_type: str | None = None,
endpoint: str = "/chat/completions",
):
"""记录模型使用情况到数据库
Args:
prompt_tokens: 输入token数
completion_tokens: 输出token数
total_tokens: 总token数
user_id: 用户ID默认为system
request_type: 请求类型
endpoint: API端点
"""
# 如果 request_type 为 None则使用实例变量中的值
if request_type is None:
request_type = self.request_type
try:
# 使用 Peewee 模型创建记录
LLMUsage.create(
model_name=self.model_name,
user_id=user_id,
request_type=request_type,
endpoint=endpoint,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
cost=self._calculate_cost(prompt_tokens, completion_tokens),
status="success",
timestamp=datetime.now(), # Peewee 会处理 DateTimeField
)
logger.debug(
f"Token使用情况 - 模型: {self.model_name}, "
f"用户: {user_id}, 类型: {request_type}, "
f"提示词: {prompt_tokens}, 完成: {completion_tokens}, "
f"总计: {total_tokens}"
)
except Exception as e:
logger.error(f"记录token使用情况失败: {str(e)}")
def _calculate_cost(self, prompt_tokens: int, completion_tokens: int) -> float:
"""计算API调用成本
使用模型的pri_in和pri_out价格计算输入和输出的成本
Args:
prompt_tokens: 输入token数量
completion_tokens: 输出token数量
Returns:
float: 总成本(元)
"""
# 使用模型的pri_in和pri_out计算成本
input_cost = (prompt_tokens / 1000000) * self.pri_in
output_cost = (completion_tokens / 1000000) * self.pri_out
return round(input_cost + output_cost, 6)
@staticmethod
def _extract_reasoning(content: str) -> Tuple[str, str]:
"""CoT思维链提取"""
match = re.search(r"(?:<think>)?(.*?)</think>", content, re.DOTALL)
content = re.sub(r"(?:<think>)?.*?</think>", "", content, flags=re.DOTALL, count=1).strip()
reasoning = match[1].strip() if match else ""
return content, reasoning
def _handle_model_exception(self, e: Exception, operation: str) -> None:
"""
统一的模型异常处理方法
根据异常类型提供更精确的错误信息和处理策略
Args:
e: 捕获的异常
operation: 操作类型(用于日志记录)
"""
operation_desc = {
"image": "图片响应生成",
"voice": "语音识别",
"text": "文本响应生成",
"embedding": "向量嵌入获取"
}
op_name = operation_desc.get(operation, operation)
if SPECIFIC_EXCEPTIONS_AVAILABLE:
# 使用具体异常类型进行精确处理
if isinstance(e, NetworkConnectionError):
logger.error(f"模型 {self.model_name} {op_name}失败: 网络连接错误")
raise RuntimeError("网络连接异常请检查网络连接状态或API服务器地址是否正确") from e
elif isinstance(e, ReqAbortException):
logger.error(f"模型 {self.model_name} {op_name}失败: 请求被中断")
raise RuntimeError("请求被中断或取消,请稍后重试") from e
elif isinstance(e, RespNotOkException):
logger.error(f"模型 {self.model_name} {op_name}失败: HTTP响应错误 {e.status_code}")
# 重新抛出原始异常,保留详细的状态码信息
raise e
elif isinstance(e, RespParseException):
logger.error(f"模型 {self.model_name} {op_name}失败: 响应解析错误")
raise RuntimeError("API响应格式异常请检查模型配置或联系管理员") from e
else:
# 未知异常,使用通用处理
logger.error(f"模型 {self.model_name} {op_name}失败: 未知错误 {type(e).__name__}: {str(e)}")
self._handle_generic_exception(e, op_name)
else:
# 如果无法导入具体异常,使用通用处理
logger.error(f"模型 {self.model_name} {op_name}失败: {str(e)}")
self._handle_generic_exception(e, op_name)
def _handle_generic_exception(self, e: Exception, operation: str) -> None:
"""
通用异常处理(向后兼容的错误字符串匹配)
Args:
e: 捕获的异常
operation: 操作描述
"""
error_str = str(e)
# 基于错误消息内容的分类处理
if "401" in error_str or "API key" in error_str or "认证" in error_str:
raise RuntimeError("API key 错误,认证失败,请检查 config/model_config.toml 中的 API key 配置是否正确") from e
elif "429" in error_str or "频繁" in error_str or "rate limit" in error_str:
raise RuntimeError("请求过于频繁,请稍后再试") from e
elif "500" in error_str or "503" in error_str or "服务器" in error_str:
raise RuntimeError("服务器负载过高模型回复失败QAQ") from e
elif "413" in error_str or "payload" in error_str.lower() or "过大" in error_str:
raise RuntimeError("请求体过大,请尝试压缩图片或减少输入内容") from e
elif "timeout" in error_str.lower() or "超时" in error_str:
raise RuntimeError("请求超时,请检查网络连接或稍后重试") from e
else:
raise RuntimeError(f"模型 {self.model_name} {operation}失败: {str(e)}") from e
# === 主要API方法 ===
# 这些方法提供与新架构的桥接
async def generate_response_for_image(self, prompt: str, image_base64: str, image_format: str) -> Tuple:
"""
根据输入的提示和图片生成模型的异步响应
使用新架构的模型请求处理器
"""
if not self.use_new_architecture:
raise RuntimeError(
f"模型 {self.model_name} 无法使用新架构,请检查 config/model_config.toml 中的 API 配置。"
)
if self.request_handler is None:
raise RuntimeError(
f"模型 {self.model_name} 请求处理器未初始化,无法处理图片请求"
)
if MessageBuilder is None:
raise RuntimeError("MessageBuilder不可用请检查新架构配置")
try:
# 构建包含图片的消息
message_builder = MessageBuilder()
message_builder.add_text_content(prompt).add_image_content(
image_format=image_format,
image_base64=image_base64
)
messages = [message_builder.build()]
# 使用新架构发送请求(只传递支持的参数)
response = await self.request_handler.get_response( # type: ignore
messages=messages,
tool_options=None,
response_format=None
)
# 新架构返回的是 APIResponse 对象,直接提取内容
content = response.content or ""
reasoning_content = response.reasoning_content or ""
tool_calls = response.tool_calls
# 从内容中提取<think>标签的推理内容(向后兼容)
if not reasoning_content and content:
content, extracted_reasoning = self._extract_reasoning(content)
reasoning_content = extracted_reasoning
# 记录token使用情况
if response.usage:
self._record_usage(
prompt_tokens=response.usage.prompt_tokens or 0,
completion_tokens=response.usage.completion_tokens or 0,
total_tokens=response.usage.total_tokens or 0,
user_id="system",
request_type=self.request_type,
endpoint="/chat/completions"
)
# 返回格式兼容旧版本
if tool_calls:
return content, reasoning_content, tool_calls
else:
return content, reasoning_content
except Exception as e:
self._handle_model_exception(e, "image")
# 这行代码永远不会执行因为_handle_model_exception总是抛出异常
# 但是为了满足类型检查的要求,我们添加一个不可达的返回语句
return "", "" # pragma: no cover
async def generate_response_for_voice(self, voice_bytes: bytes) -> Tuple:
"""
根据输入的语音文件生成模型的异步响应
使用新架构的模型请求处理器
"""
if not self.use_new_architecture:
raise RuntimeError(
f"模型 {self.model_name} 无法使用新架构,请检查 config/model_config.toml 中的 API 配置。"
)
if self.request_handler is None:
raise RuntimeError(
f"模型 {self.model_name} 请求处理器未初始化,无法处理语音请求"
)
try:
# 构建语音识别请求参数
# 注意:新架构中的语音识别可能使用不同的方法
# 这里先使用get_response方法可能需要根据实际API调整
response = await self.request_handler.get_response( # type: ignore
messages=[], # 语音识别可能不需要消息
tool_options=None
)
# 新架构返回的是 APIResponse 对象,直接提取文本内容
return (response.content,) if response.content else ("",)
except Exception as e:
self._handle_model_exception(e, "voice")
# 不可达的返回语句,仅用于满足类型检查
return ("",) # pragma: no cover
async def generate_response_async(self, prompt: str, **kwargs) -> Union[str, Tuple]:
"""
异步方式根据输入的提示生成模型的响应
使用新架构的模型请求处理器,如无法使用则抛出错误
"""
if not self.use_new_architecture:
raise RuntimeError(
f"模型 {self.model_name} 无法使用新架构,请检查 config/model_config.toml 中的 API 配置。"
)
if self.request_handler is None:
raise RuntimeError(
f"模型 {self.model_name} 请求处理器未初始化,无法生成响应"
)
if MessageBuilder is None:
raise RuntimeError("MessageBuilder不可用请检查新架构配置")
try:
# 构建消息
message_builder = MessageBuilder()
message_builder.add_text_content(prompt)
messages = [message_builder.build()]
# 使用新架构发送请求(只传递支持的参数)
response = await self.request_handler.get_response( # type: ignore
messages=messages,
tool_options=None,
response_format=None
)
# 新架构返回的是 APIResponse 对象,直接提取内容
content = response.content or ""
reasoning_content = response.reasoning_content or ""
tool_calls = response.tool_calls
# 从内容中提取<think>标签的推理内容(向后兼容)
if not reasoning_content and content:
content, extracted_reasoning = self._extract_reasoning(content)
reasoning_content = extracted_reasoning
# 记录token使用情况
if response.usage:
self._record_usage(
prompt_tokens=response.usage.prompt_tokens or 0,
completion_tokens=response.usage.completion_tokens or 0,
total_tokens=response.usage.total_tokens or 0,
user_id="system",
request_type=self.request_type,
endpoint="/chat/completions"
)
# 返回格式兼容旧版本
if tool_calls:
return content, (reasoning_content, self.model_name, tool_calls)
else:
return content, (reasoning_content, self.model_name)
except Exception as e:
self._handle_model_exception(e, "text")
# 不可达的返回语句,仅用于满足类型检查
return "", ("", self.model_name) # pragma: no cover
async def get_embedding(self, text: str) -> Union[list, None]:
"""
异步方法获取文本的embedding向量
使用新架构的模型请求处理器
Args:
text: 需要获取embedding的文本
Returns:
list: embedding向量如果失败则返回None
"""
if not text:
logger.debug("该消息没有长度不再发送获取embedding向量的请求")
return None
if not self.use_new_architecture:
logger.warning(f"模型 {self.model_name} 无法使用新架构embedding请求将被跳过")
return None
if self.request_handler is None:
logger.warning(f"模型 {self.model_name} 请求处理器未初始化embedding请求将被跳过")
return None
try:
# 构建embedding请求参数
# 使用新架构的get_embedding方法
response = await self.request_handler.get_embedding(text) # type: ignore
# 新架构返回的是 APIResponse 对象直接提取embedding
if response.embedding:
embedding = response.embedding
# 记录token使用情况
if response.usage:
self._record_usage(
prompt_tokens=response.usage.prompt_tokens or 0,
completion_tokens=response.usage.completion_tokens or 0,
total_tokens=response.usage.total_tokens or 0,
user_id="system",
request_type=self.request_type,
endpoint="/embeddings"
)
return embedding
else:
logger.warning(f"模型 {self.model_name} 返回的embedding响应为空")
return None
except Exception as e:
# 对于embedding请求我们记录错误但不抛出异常而是返回None
# 这是为了保持与原有行为的兼容性
try:
self._handle_model_exception(e, "embedding")
except RuntimeError:
# 捕获_handle_model_exception抛出的RuntimeError转换为警告日志
logger.warning(f"模型 {self.model_name} embedding请求失败返回None: {str(e)}")
return None
def compress_base64_image_by_scale(base64_data: str, target_size: int = int(0.8 * 1024 * 1024)) -> str:
"""压缩base64格式的图片到指定大小
Args:
base64_data: base64编码的图片数据
target_size: 目标文件大小字节默认0.8MB
Returns:
str: 压缩后的base64图片数据
"""
try:
# 将base64转换为字节数据
# 确保base64字符串只包含ASCII字符
if isinstance(base64_data, str):
base64_data = base64_data.encode("ascii", errors="ignore").decode("ascii")
image_data = base64.b64decode(base64_data)
# 如果已经小于目标大小,直接返回原图
if len(image_data) <= 2 * 1024 * 1024:
return base64_data
# 将字节数据转换为图片对象
img = Image.open(io.BytesIO(image_data))
# 获取原始尺寸
original_width, original_height = img.size
# 计算缩放比例
scale = min(1.0, (target_size / len(image_data)) ** 0.5)
# 计算新的尺寸
new_width = int(original_width * scale)
new_height = int(original_height * scale)
# 创建内存缓冲区
output_buffer = io.BytesIO()
# 如果是GIF处理所有帧
if getattr(img, "is_animated", False):
frames = []
n_frames = getattr(img, 'n_frames', 1)
for frame_idx in range(n_frames):
img.seek(frame_idx)
new_frame = img.copy()
new_frame = new_frame.resize((new_width // 2, new_height // 2), Image.Resampling.LANCZOS) # 动图折上折
frames.append(new_frame)
# 保存到缓冲区
frames[0].save(
output_buffer,
format="GIF",
save_all=True,
append_images=frames[1:],
optimize=True,
duration=img.info.get("duration", 100),
loop=img.info.get("loop", 0),
)
else:
# 处理静态图片
resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
# 保存到缓冲区,保持原始格式
if img.format == "PNG" and img.mode in ("RGBA", "LA"):
resized_img.save(output_buffer, format="PNG", optimize=True)
else:
resized_img.save(output_buffer, format="JPEG", quality=95, optimize=True)
# 获取压缩后的数据并转换为base64
compressed_data = output_buffer.getvalue()
logger.info(f"压缩图片: {original_width}x{original_height} -> {new_width}x{new_height}")
logger.info(f"压缩前大小: {len(image_data) / 1024:.1f}KB, 压缩后大小: {len(compressed_data) / 1024:.1f}KB")
return base64.b64encode(compressed_data).decode("utf-8")
except Exception as e:
logger.error(f"压缩图片失败: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return base64_data