没想到吧,我还是没测试()

feat(mcp): 集成MCP SSE协议支持并扩展工具调用能力

新增MCP客户端类型(mcp_ssd),支持通过Model Context Protocol连接外部工具服务器。
更新文档和配置模板,提供完整的MCP接入指南;主程序启动时自动初始化MCP工具提供器,
tool_api 与 tool_use 核心链路新增对MCP工具的检测与调用,实现与既有插件工具的无缝兼容。
同步更新配置模型、模板与帮助文档。
This commit is contained in:
subiz
2025-10-05 19:24:57 +08:00
parent 33f7128a5a
commit 43fe6046b4
14 changed files with 1418 additions and 27 deletions

View File

@@ -12,8 +12,8 @@ class APIProvider(ValidatedConfigBase):
name: str = Field(..., min_length=1, description="API提供商名称")
base_url: str = Field(..., description="API基础URL")
api_key: str | list[str] = Field(..., min_length=1, description="API密钥支持单个密钥或密钥列表轮询")
client_type: Literal["openai", "gemini", "aiohttp_gemini"] = Field(
default="openai", description="客户端类型如openai/google等默认为openai"
client_type: Literal["openai", "gemini", "aiohttp_gemini", "mcp_sse"] = Field(
default="openai", description="客户端类型如openai/google/mcp_sse默认为openai"
)
max_retry: int = Field(default=2, ge=0, description="最大重试次数单个模型API调用失败最多重试的次数")
timeout: int = Field(

View File

@@ -339,6 +339,16 @@ MoFox_Bot(第三方修改版)
# 处理所有缓存的事件订阅(插件加载完成后)
event_manager.process_all_pending_subscriptions()
# 初始化MCP工具提供器
try:
mcp_config = global_config.get("mcp_servers", [])
if mcp_config:
from src.plugin_system.utils.mcp_tool_provider import mcp_tool_provider
await mcp_tool_provider.initialize(mcp_config)
logger.info("MCP工具提供器初始化成功")
except Exception as e:
logger.info(f"MCP工具提供器未配置或初始化失败: {e}")
# 初始化表情管理器
get_emoji_manager().initialize()

View File

@@ -17,7 +17,12 @@ def get_tool_instance(tool_name: str) -> BaseTool | None:
plugin_config = None
tool_class: type[BaseTool] = component_registry.get_component_class(tool_name, ComponentType.TOOL) # type: ignore
return tool_class(plugin_config) if tool_class else None
if tool_class:
return tool_class(plugin_config)
# 如果不是常规工具检查是否是MCP工具
# MCP工具不需要返回实例会在execute_tool_call中特殊处理
return None
def get_llm_available_tool_definitions():
@@ -29,4 +34,16 @@ def get_llm_available_tool_definitions():
from src.plugin_system.core import component_registry
llm_available_tools = component_registry.get_llm_available_tools()
return [(name, tool_class.get_tool_definition()) for name, tool_class in llm_available_tools.items()]
tool_definitions = [(name, tool_class.get_tool_definition()) for name, tool_class in llm_available_tools.items()]
# 添加MCP工具
try:
from src.plugin_system.utils.mcp_tool_provider import mcp_tool_provider
mcp_tools = mcp_tool_provider.get_mcp_tool_definitions()
tool_definitions.extend(mcp_tools)
if mcp_tools:
logger.debug(f"已添加 {len(mcp_tools)} 个MCP工具到可用工具列表")
except Exception as e:
logger.debug(f"获取MCP工具失败可能未配置: {e}")
return tool_definitions

View File

@@ -279,6 +279,23 @@ class ToolExecutor:
logger.info(
f"{self.log_prefix} 正在执行工具: [bold green]{function_name}[/bold green] | 参数: {function_args}"
)
# 检查是否是MCP工具
try:
from src.plugin_system.utils.mcp_tool_provider import mcp_tool_provider
if function_name in mcp_tool_provider.mcp_tools:
logger.info(f"{self.log_prefix}执行MCP工具: {function_name}")
result = await mcp_tool_provider.call_mcp_tool(function_name, function_args)
return {
"tool_call_id": tool_call.call_id,
"role": "tool",
"name": function_name,
"type": "function",
"content": result.get("content", ""),
}
except Exception as e:
logger.debug(f"检查MCP工具时出错: {e}")
function_args["llm_called"] = True # 标记为LLM调用
# 检查是否是二步工具的第二步调用

View File

@@ -0,0 +1,235 @@
"""
MCP (Model Context Protocol) 连接器
负责连接MCP服务器获取和执行工具
"""
import asyncio
from typing import Any
import aiohttp
import orjson
from src.common.logger import get_logger
logger = get_logger("MCP连接器")
class MCPConnector:
"""MCP服务器连接器"""
def __init__(self, server_url: str, api_key: str | None = None, timeout: int = 30):
"""
初始化MCP连接器
Args:
server_url: MCP服务器URL
api_key: API密钥可选
timeout: 超时时间(秒)
"""
self.server_url = server_url.rstrip("/")
self.api_key = api_key
self.timeout = timeout
self._session: aiohttp.ClientSession | None = None
self._tools_cache: dict[str, dict[str, Any]] = {}
self._cache_timestamp: float = 0
self._cache_ttl: int = 300 # 工具列表缓存5分钟
async def _get_session(self) -> aiohttp.ClientSession:
"""获取或创建aiohttp会话"""
if self._session is None or self._session.closed:
timeout = aiohttp.ClientTimeout(total=self.timeout)
self._session = aiohttp.ClientSession(timeout=timeout)
return self._session
async def close(self):
"""关闭连接"""
if self._session and not self._session.closed:
await self._session.close()
def _build_headers(self) -> dict[str, str]:
"""构建请求头"""
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
return headers
async def list_tools(self, force_refresh: bool = False) -> dict[str, dict[str, Any]]:
"""
获取MCP服务器提供的工具列表
Args:
force_refresh: 是否强制刷新缓存
Returns:
Dict[str, Dict]: 工具字典key为工具名value为工具定义
"""
import time
# 检查缓存
if not force_refresh and self._tools_cache and (time.time() - self._cache_timestamp) < self._cache_ttl:
logger.debug("使用缓存的MCP工具列表")
return self._tools_cache
logger.info(f"正在从MCP服务器获取工具列表: {self.server_url}")
try:
session = await self._get_session()
url = f"{self.server_url}/tools/list"
async with session.post(url, headers=self._build_headers(), json={}) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"获取MCP工具列表失败: HTTP {response.status} - {error_text}")
return {}
data = await response.json()
# 解析工具列表
tools = {}
tool_list = data.get("tools", [])
for tool_def in tool_list:
tool_name = tool_def.get("name")
if not tool_name:
continue
tools[tool_name] = {
"name": tool_name,
"description": tool_def.get("description", ""),
"input_schema": tool_def.get("inputSchema", {}),
}
logger.info(f"成功获取 {len(tools)} 个MCP工具")
self._tools_cache = tools
self._cache_timestamp = time.time()
return tools
except aiohttp.ClientError as e:
logger.error(f"连接MCP服务器失败: {e}")
return {}
except Exception as e:
logger.error(f"获取MCP工具列表时发生错误: {e}")
return {}
async def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> dict[str, Any]:
"""
调用MCP服务器上的工具
Args:
tool_name: 工具名称
arguments: 工具参数
Returns:
Dict: 工具执行结果
"""
logger.info(f"调用MCP工具: {tool_name}")
logger.debug(f"工具参数: {arguments}")
try:
session = await self._get_session()
url = f"{self.server_url}/tools/call"
payload = {"name": tool_name, "arguments": arguments}
async with session.post(url, headers=self._build_headers(), json=payload) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"MCP工具调用失败: HTTP {response.status} - {error_text}")
return {
"success": False,
"error": f"HTTP {response.status}: {error_text}",
"content": f"调用MCP工具 {tool_name} 失败",
}
result = await response.json()
# 提取内容
content = result.get("content", [])
if isinstance(content, list) and len(content) > 0:
# MCP返回的是content数组
text_content = []
for item in content:
if isinstance(item, dict):
if item.get("type") == "text":
text_content.append(item.get("text", ""))
else:
text_content.append(str(item))
result_text = "\n".join(text_content) if text_content else str(content)
else:
result_text = str(content)
logger.info(f"MCP工具 {tool_name} 执行成功")
return {"success": True, "content": result_text, "raw_result": result}
except aiohttp.ClientError as e:
logger.error(f"调用MCP工具失败网络错误: {e}")
return {"success": False, "error": str(e), "content": f"网络错误:无法调用工具 {tool_name}"}
except Exception as e:
logger.error(f"调用MCP工具时发生错误: {e}")
return {"success": False, "error": str(e), "content": f"调用工具 {tool_name} 时发生错误"}
async def list_resources(self) -> list[dict[str, Any]]:
"""
获取MCP服务器提供的资源列表
Returns:
List[Dict]: 资源列表
"""
logger.info(f"正在从MCP服务器获取资源列表: {self.server_url}")
try:
session = await self._get_session()
url = f"{self.server_url}/resources/list"
async with session.post(url, headers=self._build_headers(), json={}) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"获取MCP资源列表失败: HTTP {response.status} - {error_text}")
return []
data = await response.json()
resources = data.get("resources", [])
logger.info(f"成功获取 {len(resources)} 个MCP资源")
return resources
except Exception as e:
logger.error(f"获取MCP资源列表时发生错误: {e}")
return []
async def read_resource(self, resource_uri: str) -> dict[str, Any]:
"""
读取MCP资源
Args:
resource_uri: 资源URI
Returns:
Dict: 资源内容
"""
logger.info(f"读取MCP资源: {resource_uri}")
try:
session = await self._get_session()
url = f"{self.server_url}/resources/read"
payload = {"uri": resource_uri}
async with session.post(url, headers=self._build_headers(), json=payload) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"读取MCP资源失败: HTTP {response.status} - {error_text}")
return {"success": False, "error": error_text}
result = await response.json()
logger.info(f"成功读取MCP资源: {resource_uri}")
return {"success": True, "content": result.get("contents", [])}
except Exception as e:
logger.error(f"读取MCP资源时发生错误: {e}")
return {"success": False, "error": str(e)}

View File

@@ -0,0 +1,174 @@
"""
MCP工具提供器 - 简化版
直接集成到工具系统,无需复杂的插件架构
"""
import asyncio
from typing import Any
from src.common.logger import get_logger
from src.plugin_system.utils.mcp_connector import MCPConnector
logger = get_logger("MCP工具提供器")
class MCPToolProvider:
"""MCP工具提供器单例"""
_instance = None
_initialized = False
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if not MCPToolProvider._initialized:
self.connectors: dict[str, MCPConnector] = {}
self.mcp_tools: dict[str, dict[str, Any]] = {}
"""格式: {tool_full_name: {"connector": connector, "original_name": name, "definition": def}}"""
MCPToolProvider._initialized = True
async def initialize(self, mcp_servers: list[dict]):
"""
初始化MCP服务器连接
Args:
mcp_servers: MCP服务器配置列表
"""
logger.info(f"初始化MCP工具提供器{len(mcp_servers)}个服务器")
for server_config in mcp_servers:
await self._connect_server(server_config)
logger.info(f"MCP工具提供器初始化完成共注册{len(self.mcp_tools)}个工具")
async def _connect_server(self, config: dict):
"""连接单个MCP服务器"""
name = config.get("name", "unnamed")
url = config.get("url")
api_key = config.get("api_key")
enabled = config.get("enabled", True)
if not enabled or not url:
return
logger.info(f"连接MCP服务器: {name} ({url})")
connector = MCPConnector(url, api_key, config.get("timeout", 30))
self.connectors[name] = connector
try:
tools = await connector.list_tools()
for tool_name, tool_def in tools.items():
# 使用服务器名作前缀
full_name = f"{name}_{tool_name}"
self.mcp_tools[full_name] = {
"connector": connector,
"original_name": tool_name,
"definition": tool_def,
"server_name": name,
}
logger.info(f"{name}获取{len(tools)}个工具")
except Exception as e:
logger.error(f"连接MCP服务器{name}失败: {e}")
def get_mcp_tool_definitions(self) -> list[tuple[str, dict[str, Any]]]:
"""
获取所有MCP工具定义适配Bot的工具格式
Returns:
List[Tuple[str, dict]]: [(tool_name, tool_definition), ...]
"""
definitions = []
for full_name, tool_info in self.mcp_tools.items():
mcp_def = tool_info["definition"]
input_schema = mcp_def.get("input_schema", {})
# 转换为Bot的工具格式
bot_tool_def = {
"name": full_name,
"description": mcp_def.get("description", f"MCP工具: {full_name}"),
"parameters": self._convert_schema_to_parameters(input_schema),
}
definitions.append((full_name, bot_tool_def))
return definitions
def _convert_schema_to_parameters(self, schema: dict) -> list[tuple]:
"""
将MCP的JSON Schema转换为Bot的参数格式
Args:
schema: MCP的inputSchema
Returns:
Bot的parameters格式
"""
from src.plugin_system.base.component_types import ToolParamType
parameters = []
properties = schema.get("properties", {})
required = schema.get("required", [])
type_mapping = {
"string": ToolParamType.STRING,
"integer": ToolParamType.INTEGER,
"number": ToolParamType.FLOAT,
"boolean": ToolParamType.BOOLEAN,
}
for param_name, param_def in properties.items():
param_type = type_mapping.get(param_def.get("type", "string"), ToolParamType.STRING)
description = param_def.get("description", "")
is_required = param_name in required
enum_values = param_def.get("enum", None)
parameters.append((param_name, param_type, description, is_required, enum_values))
return parameters
async def call_mcp_tool(self, tool_name: str, arguments: dict[str, Any]) -> dict[str, Any]:
"""
调用MCP工具
Args:
tool_name: 工具全名(包含前缀)
arguments: 参数
Returns:
工具执行结果
"""
if tool_name not in self.mcp_tools:
return {"content": f"MCP工具{tool_name}不存在"}
tool_info = self.mcp_tools[tool_name]
connector = tool_info["connector"]
original_name = tool_info["original_name"]
logger.info(f"调用MCP工具: {tool_name}")
result = await connector.call_tool(original_name, arguments)
if result.get("success"):
return {"content": result.get("content", "")}
else:
return {"content": f"工具执行失败: {result.get('error', '未知错误')}"}
async def close(self):
"""关闭所有连接"""
for name, connector in self.connectors.items():
try:
await connector.close()
except Exception as e:
logger.error(f"关闭MCP连接{name}失败: {e}")
# 全局单例
mcp_tool_provider = MCPToolProvider()