转换原来的tools到新的(虽然没转)

This commit is contained in:
UnCLAS-Prommer
2025-07-29 00:15:29 +08:00
parent 6062b6bd3c
commit 16c644a666
10 changed files with 54 additions and 376 deletions

View File

@@ -1,10 +1,10 @@
# 🔧 工具系统详解 # 🔧 工具系统详解
## 📖 什么是工具系统 ## 📖 什么是工具
工具系统是MaiBot的信息获取能力扩展组件。如果说Action组件功能五花八门可以拓展麦麦能做的事情那么Tool就是在某个过程中拓宽了麦麦能够获得的信息量。 工具是MaiBot的信息获取能力扩展组件。如果说Action组件功能五花八门可以拓展麦麦能做的事情那么Tool就是在某个过程中拓宽了麦麦能够获得的信息量。
### 🎯 工具系统的特点 ### 🎯 工具的特点
- 🔍 **信息获取增强**:扩展麦麦获取外部信息的能力 - 🔍 **信息获取增强**:扩展麦麦获取外部信息的能力
- 📊 **数据丰富**:帮助麦麦获得更多背景信息和实时数据 - 📊 **数据丰富**:帮助麦麦获得更多背景信息和实时数据
@@ -20,14 +20,11 @@
| **目标** | 让麦麦做更多事情 | 提供具体功能 | 让麦麦知道更多信息 | | **目标** | 让麦麦做更多事情 | 提供具体功能 | 让麦麦知道更多信息 |
| **使用场景** | 增强交互体验 | 功能服务 | 信息查询和分析 | | **使用场景** | 增强交互体验 | 功能服务 | 信息查询和分析 |
## 🏗️ 工具基本结构 ## 🏗️ Tool组件的基本结构
### 必要组件
每个工具必须继承 `BaseTool` 基类并实现以下属性和方法: 每个工具必须继承 `BaseTool` 基类并实现以下属性和方法:
```python ```python
from src.tools.tool_can_use.base_tool import BaseTool, register_tool from src.plugin_system import BaseTool
class MyTool(BaseTool): class MyTool(BaseTool):
# 工具名称,必须唯一 # 工具名称,必须唯一
@@ -51,6 +48,8 @@ class MyTool(BaseTool):
}, },
"required": ["query"] "required": ["query"]
} }
available_for_llm = True # 是否对LLM可用
async def execute(self, function_args: Dict[str, Any]): async def execute(self, function_args: Dict[str, Any]):
"""执行工具逻辑""" """执行工具逻辑"""
@@ -77,15 +76,6 @@ class MyTool(BaseTool):
|-----|------|--------|------| |-----|------|--------|------|
| `execute` | `function_args` | `dict` | 执行工具核心逻辑 | | `execute` | `function_args` | `dict` | 执行工具核心逻辑 |
## 🔄 自动注册机制
工具系统采用自动发现和注册机制:
1. **文件扫描**:系统自动遍历 `tool_can_use` 目录中的所有Python文件
2. **类识别**:寻找继承自 `BaseTool` 的工具类
3. **自动注册**:只需要实现对应的类并把文件放在正确文件夹中就可自动注册
4. **即用即加载**:工具在需要时被实例化和调用
--- ---
## 🎨 完整工具示例 ## 🎨 完整工具示例
@@ -93,7 +83,7 @@ class MyTool(BaseTool):
完成一个天气查询工具 完成一个天气查询工具
```python ```python
from src.tools.tool_can_use.base_tool import BaseTool, register_tool from src.plugin_system import BaseTool
import aiohttp import aiohttp
import json import json
@@ -177,55 +167,12 @@ class WeatherTool(BaseTool):
--- ---
## 📊 工具开发步骤
### 1. 创建工具文件
`src/tools/tool_can_use/` 目录下创建新的Python文件
```bash
# 例如创建 my_new_tool.py
touch src/tools/tool_can_use/my_new_tool.py
```
### 2. 实现工具类
```python
from src.tools.tool_can_use.base_tool import BaseTool, register_tool
class MyNewTool(BaseTool):
name = "my_new_tool"
description = "新工具的功能描述"
parameters = {
"type": "object",
"properties": {
# 定义参数
},
"required": []
}
async def execute(self, function_args, message_txt=""):
# 实现工具逻辑
return {
"name": self.name,
"content": "执行结果"
}
```
### 3. 系统集成
工具创建完成后,系统会自动发现和注册,无需额外配置。
---
## 🚨 注意事项和限制 ## 🚨 注意事项和限制
### 当前限制 ### 当前限制
1. **独立开发**:需要单独编写,暂未完全融入插件系统 1. **适用范围**:主要适用于信息获取场景
2. **适用范围**:主要适用于信息获取场景 2. **配置要求**:必须开启工具处理器
3. **配置要求**:必须开启工具处理器
### 开发建议 ### 开发建议

View File

@@ -1,4 +1,4 @@
from typing import List, Tuple, Type from typing import List, Tuple, Type, Any
from src.plugin_system import ( from src.plugin_system import (
BasePlugin, BasePlugin,
register_plugin, register_plugin,
@@ -13,32 +13,45 @@ from src.plugin_system import (
MaiMessages, MaiMessages,
) )
class HelloTool(BaseTool):
"""问候工具 - 用于发送问候消息"""
name = "hello_tool" class CompareNumbersTool(BaseTool):
description = "发送问候消息" """比较两个数大小的工具"""
name = "compare_numbers"
description = "使用工具 比较两个数的大小,返回较大的数"
parameters = { parameters = {
"type": "object", "type": "object",
"properties": { "properties": {
"greeting_message": { "num1": {"type": "number", "description": "第一个数字"},
"type": "string", "num2": {"type": "number", "description": "第二个数字"},
"description": "要发送的问候消息"
},
}, },
"required": ["greeting_message"] "required": ["num1", "num2"],
} }
available_for_llm = True
async def execute(self, function_args: dict[str, Any]) -> dict[str, Any]:
"""执行比较两个数的大小
Args:
function_args: 工具参数
Returns:
dict: 工具执行结果
"""
num1: int | float = function_args.get("num1") # type: ignore
num2: int | float = function_args.get("num2") # type: ignore
try:
if num1 > num2:
result = f"{num1} 大于 {num2}"
elif num1 < num2:
result = f"{num1} 小于 {num2}"
else:
result = f"{num1} 等于 {num2}"
return {"name": self.name, "content": result}
except Exception as e:
return {"name": self.name, "content": f"比较数字失败,炸了: {str(e)}"}
async def execute(self, function_args):
"""执行问候工具"""
import random
greeting_message = random.choice(function_args.get("greeting_message", ["嗨!很高兴见到你!😊"]))
return {
"name": self.name,
"content": greeting_message
}
# ===== Action组件 ===== # ===== Action组件 =====
class HelloAction(BaseAction): class HelloAction(BaseAction):
@@ -159,7 +172,9 @@ class HelloWorldPlugin(BasePlugin):
"enabled": ConfigField(type=bool, default=False, description="是否启用插件"), "enabled": ConfigField(type=bool, default=False, description="是否启用插件"),
}, },
"greeting": { "greeting": {
"message": ConfigField(type=list, default=["嗨!很开心见到你!😊","Ciallo(∠・ω< )⌒★"], description="默认问候消息"), "message": ConfigField(
type=list, default=["嗨!很开心见到你!😊", "Ciallo(∠・ω< )⌒★"], description="默认问候消息"
),
"enable_emoji": ConfigField(type=bool, default=True, description="是否启用表情符号"), "enable_emoji": ConfigField(type=bool, default=True, description="是否启用表情符号"),
}, },
"time": {"format": ConfigField(type=str, default="%Y-%m-%d %H:%M:%S", description="时间显示格式")}, "time": {"format": ConfigField(type=str, default="%Y-%m-%d %H:%M:%S", description="时间显示格式")},
@@ -169,7 +184,7 @@ class HelloWorldPlugin(BasePlugin):
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]: def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
return [ return [
(HelloAction.get_action_info(), HelloAction), (HelloAction.get_action_info(), HelloAction),
(HelloTool.get_tool_info(), HelloTool), # 添加问候工具 (CompareNumbersTool.get_tool_info(), CompareNumbersTool), # 添加比较数字工具
(ByeAction.get_action_info(), ByeAction), # 添加告别Action (ByeAction.get_action_info(), ByeAction), # 添加告别Action
(TimeCommand.get_command_info(), TimeCommand), (TimeCommand.get_command_info(), TimeCommand),
(PrintMessage.get_handler_info(), PrintMessage), (PrintMessage.get_handler_info(), PrintMessage),

View File

@@ -10,7 +10,6 @@ install(extra_lines=3)
logger = get_logger("base_tool") logger = get_logger("base_tool")
class BaseTool(ABC): class BaseTool(ABC):
"""所有工具的基类""" """所有工具的基类"""
@@ -37,7 +36,7 @@ class BaseTool(ABC):
"type": "function", "type": "function",
"function": {"name": cls.name, "description": cls.description, "parameters": cls.parameters}, "function": {"name": cls.name, "description": cls.description, "parameters": cls.parameters},
} }
@classmethod @classmethod
def get_tool_info(cls) -> ToolInfo: def get_tool_info(cls) -> ToolInfo:
"""获取工具信息""" """获取工具信息"""
@@ -79,7 +78,7 @@ class BaseTool(ABC):
Returns: Returns:
dict: 工具执行结果 dict: 工具执行结果
""" """
if self.parameters and (missing := [p for p in self.parameters.get("required", []) if p not in function_args]): if self.parameters and (missing := [p for p in self.parameters.get("required", []) if p not in function_args]):
raise ValueError(f"工具类 {self.__class__.__name__} 缺少必要参数: {', '.join(missing)}") raise ValueError(f"工具类 {self.__class__.__name__} 缺少必要参数: {', '.join(missing)}")

View File

@@ -195,7 +195,7 @@ class ComponentRegistry:
def _register_tool_component(self, tool_info: ToolInfo, tool_class: Type[BaseTool]) -> bool: def _register_tool_component(self, tool_info: ToolInfo, tool_class: Type[BaseTool]) -> bool:
"""注册Tool组件到Tool特定注册表""" """注册Tool组件到Tool特定注册表"""
tool_name = tool_info.name tool_name = tool_info.name
self._tool_registry[tool_name] = tool_class self._tool_registry[tool_name] = tool_class
# 如果是llm可用的且启用的工具,添加到 llm可用工具列表 # 如果是llm可用的且启用的工具,添加到 llm可用工具列表

View File

@@ -1,4 +1,4 @@
from src.tools.tool_can_use.base_tool import BaseTool from src.plugin_system.base.base_tool import BaseTool
from src.chat.utils.utils import get_embedding from src.chat.utils.utils import get_embedding
from src.common.database.database_model import Knowledges # Updated import from src.common.database.database_model import Knowledges # Updated import
from src.common.logger import get_logger from src.common.logger import get_logger
@@ -77,7 +77,7 @@ class SearchKnowledgeTool(BaseTool):
Union[str, list]: 格式化的信息字符串或原始结果列表 Union[str, list]: 格式化的信息字符串或原始结果列表
""" """
if not query_embedding: if not query_embedding:
return "" if not return_raw else [] return [] if return_raw else ""
similar_items = [] similar_items = []
try: try:
@@ -115,10 +115,10 @@ class SearchKnowledgeTool(BaseTool):
except Exception as e: except Exception as e:
logger.error(f"从 Peewee 数据库获取知识信息失败: {str(e)}") logger.error(f"从 Peewee 数据库获取知识信息失败: {str(e)}")
return "" if not return_raw else [] return [] if return_raw else ""
if not results: if not results:
return "" if not return_raw else [] return [] if return_raw else ""
if return_raw: if return_raw:
# Peewee 模型实例不能直接序列化为 JSON如果需要原始模型调用者需要处理 # Peewee 模型实例不能直接序列化为 JSON如果需要原始模型调用者需要处理

View File

@@ -1,4 +1,4 @@
from src.tools.tool_can_use.base_tool import BaseTool from src.plugin_system.base.base_tool import BaseTool
# from src.common.database import db # from src.common.database import db
from src.common.logger import get_logger from src.common.logger import get_logger

View File

@@ -1,20 +0,0 @@
from src.tools.tool_can_use.base_tool import (
BaseTool,
register_tool,
discover_tools,
get_all_tool_definitions,
get_tool_instance,
TOOL_REGISTRY,
)
__all__ = [
"BaseTool",
"register_tool",
"discover_tools",
"get_all_tool_definitions",
"get_tool_instance",
"TOOL_REGISTRY",
]
# 自动发现并注册工具
discover_tools()

View File

@@ -1,115 +0,0 @@
from typing import List, Any, Optional, Type
import inspect
import importlib
import pkgutil
import os
from src.common.logger import get_logger
from rich.traceback import install
install(extra_lines=3)
logger = get_logger("base_tool")
# 工具注册表
TOOL_REGISTRY = {}
class BaseTool:
"""所有工具的基类"""
# 工具名称,子类必须重写
name = None
# 工具描述,子类必须重写
description = None
# 工具参数定义,子类必须重写
parameters = None
@classmethod
def get_tool_definition(cls) -> dict[str, Any]:
"""获取工具定义用于LLM工具调用
Returns:
dict: 工具定义字典
"""
if not cls.name or not cls.description or not cls.parameters:
raise NotImplementedError(f"工具类 {cls.__name__} 必须定义 name, description 和 parameters 属性")
return {
"type": "function",
"function": {"name": cls.name, "description": cls.description, "parameters": cls.parameters},
}
async def execute(self, function_args: dict[str, Any]) -> dict[str, Any]:
"""执行工具函数
Args:
function_args: 工具调用参数
Returns:
dict: 工具执行结果
"""
raise NotImplementedError("子类必须实现execute方法")
def register_tool(tool_class: Type[BaseTool]):
"""注册工具到全局注册表
Args:
tool_class: 工具类
"""
if not issubclass(tool_class, BaseTool):
raise TypeError(f"{tool_class.__name__} 不是 BaseTool 的子类")
tool_name = tool_class.name
if not tool_name:
raise ValueError(f"工具类 {tool_class.__name__} 没有定义 name 属性")
TOOL_REGISTRY[tool_name] = tool_class
logger.info(f"已注册: {tool_name}")
def discover_tools():
"""自动发现并注册tool_can_use目录下的所有工具"""
# 获取当前目录路径
current_dir = os.path.dirname(os.path.abspath(__file__))
package_name = os.path.basename(current_dir)
# 遍历包中的所有模块
for _, module_name, _ in pkgutil.iter_modules([current_dir]):
# 跳过当前模块和__pycache__
if module_name == "base_tool" or module_name.startswith("__"):
continue
# 导入模块
module = importlib.import_module(f"src.tools.{package_name}.{module_name}")
# 查找模块中的工具类
for _, obj in inspect.getmembers(module):
if inspect.isclass(obj) and issubclass(obj, BaseTool) and obj != BaseTool:
register_tool(obj)
logger.info(f"工具发现完成,共注册 {len(TOOL_REGISTRY)} 个工具")
def get_all_tool_definitions() -> List[dict[str, Any]]:
"""获取所有已注册工具的定义
Returns:
List[dict]: 工具定义列表
"""
return [tool_class().get_tool_definition() for tool_class in TOOL_REGISTRY.values()]
def get_tool_instance(tool_name: str) -> Optional[BaseTool]:
"""获取指定名称的工具实例
Args:
tool_name: 工具名称
Returns:
Optional[BaseTool]: 工具实例如果找不到则返回None
"""
tool_class = TOOL_REGISTRY.get(tool_name)
if not tool_class:
return None
return tool_class()

View File

@@ -1,45 +0,0 @@
from src.tools.tool_can_use.base_tool import BaseTool
from src.common.logger import get_logger
from typing import Any
logger = get_logger("compare_numbers_tool")
class CompareNumbersTool(BaseTool):
"""比较两个数大小的工具"""
name = "compare_numbers"
description = "使用工具 比较两个数的大小,返回较大的数"
parameters = {
"type": "object",
"properties": {
"num1": {"type": "number", "description": "第一个数字"},
"num2": {"type": "number", "description": "第二个数字"},
},
"required": ["num1", "num2"],
}
async def execute(self, function_args: dict[str, Any]) -> dict[str, Any]:
"""执行比较两个数的大小
Args:
function_args: 工具参数
Returns:
dict: 工具执行结果
"""
num1: int | float = function_args.get("num1") # type: ignore
num2: int | float = function_args.get("num2") # type: ignore
try:
if num1 > num2:
result = f"{num1} 大于 {num2}"
elif num1 < num2:
result = f"{num1} 小于 {num2}"
else:
result = f"{num1} 等于 {num2}"
return {"name": self.name, "content": result}
except Exception as e:
logger.error(f"比较数字失败: {str(e)}")
return {"name": self.name, "content": f"比较数字失败,炸了: {str(e)}"}

View File

@@ -1,103 +0,0 @@
from src.tools.tool_can_use.base_tool import BaseTool
from src.person_info.person_info import get_person_info_manager
from src.common.logger import get_logger
logger = get_logger("rename_person_tool")
class RenamePersonTool(BaseTool):
name = "rename_person"
description = (
"这个工具可以改变用户的昵称。你可以选择改变对他人的称呼。你想给人改名,叫别人别的称呼,需要调用这个工具。"
)
parameters = {
"type": "object",
"properties": {
"person_name": {"type": "string", "description": "需要重新取名的用户的当前昵称"},
"message_content": {
"type": "string",
"description": "当前的聊天内容或特定要求,用于提供取名建议的上下文,尽可能详细。",
},
},
"required": ["person_name"],
}
async def execute(self, function_args: dict):
"""
执行取名工具逻辑
Args:
function_args (dict): 包含 'person_name' 和可选 'message_content' 的字典
message_txt (str): 原始消息文本 (这里未使用,因为 message_content 更明确)
Returns:
dict: 包含执行结果的字典
"""
person_name_to_find = function_args.get("person_name")
request_context = function_args.get("message_content", "") # 如果没有提供,则为空字符串
if not person_name_to_find:
return {"name": self.name, "content": "错误:必须提供需要重命名的用户昵称 (person_name)。"}
person_info_manager = get_person_info_manager()
try:
# 1. 根据昵称查找用户信息
logger.debug(f"尝试根据昵称 '{person_name_to_find}' 查找用户...")
person_info = await person_info_manager.get_person_info_by_name(person_name_to_find)
if not person_info:
logger.info(f"未找到昵称为 '{person_name_to_find}' 的用户。")
return {
"name": self.name,
"content": f"找不到昵称为 '{person_name_to_find}' 的用户。请确保输入的是我之前为该用户取的昵称。",
}
person_id = person_info.get("person_id")
user_nickname = person_info.get("nickname") # 这是用户原始昵称
user_cardname = person_info.get("user_cardname")
user_avatar = person_info.get("user_avatar")
if not person_id:
logger.error(f"找到了用户 '{person_name_to_find}' 但无法获取 person_id")
return {"name": self.name, "content": f"找到了用户 '{person_name_to_find}' 但获取内部ID时出错。"}
# 2. 调用 qv_person_name 进行取名
logger.debug(
f"为用户 {person_id} (原昵称: {person_name_to_find}) 调用 qv_person_name请求上下文: '{request_context}'"
)
result = await person_info_manager.qv_person_name(
person_id=person_id,
user_nickname=user_nickname, # type: ignore
user_cardname=user_cardname, # type: ignore
user_avatar=user_avatar, # type: ignore
request=request_context,
)
# 3. 处理结果
if result and result.get("nickname"):
new_name = result["nickname"]
# reason = result.get("reason", "未提供理由")
logger.info(f"成功为用户 {person_id} 取了新昵称: {new_name}")
content = f"已成功将用户 {person_name_to_find} 的备注名更新为 {new_name}"
logger.info(content)
return {"name": self.name, "content": content}
else:
logger.warning(f"为用户 {person_id} 调用 qv_person_name 后未能成功获取新昵称。")
# 尝试从内存中获取可能已经更新的名字
current_name = await person_info_manager.get_value(person_id, "person_name")
if current_name and current_name != person_name_to_find:
return {
"name": self.name,
"content": f"尝试取新昵称时遇到一点小问题,但我已经将 '{person_name_to_find}' 的昵称更新为 '{current_name}' 了。",
}
else:
return {
"name": self.name,
"content": f"尝试为 '{person_name_to_find}' 取新昵称时遇到了问题,未能成功生成。可能需要稍后再试。",
}
except Exception as e:
error_msg = f"重命名失败: {str(e)}"
logger.error(error_msg, exc_info=True)
return {"name": self.name, "content": error_msg}