移除了MCP

This commit is contained in:
雅诺狐
2025-08-14 23:13:11 +08:00
parent f4968c5751
commit 0400804056
12 changed files with 3 additions and 1625 deletions

View File

@@ -1,331 +0,0 @@
# MCP Server-Sent Events (SSE) 使用指南
## 📖 概述
MCP SSE 客户端是 MaiBot 的一个扩展功能,提供与 MCP (Model Context Protocol) 服务器的实时事件订阅能力。通过 Server-Sent Events (SSE) 技术MaiBot 可以接收来自 MCP 服务器的实时事件推送,实现低延迟的交互响应。
## ✨ 功能特性
- 🔗 **实时连接**: 与 MCP 服务器建立持久的 SSE 连接
- 🔄 **自动重连**: 支持指数退避策略的断线重连机制
- 🎯 **事件订阅**: 灵活的事件类型订阅和处理系统
- 🔐 **安全认证**: 支持 Bearer Token 和 SSL/TLS 认证
- 📊 **监控统计**: 提供连接状态和事件统计信息
- 🛡️ **错误处理**: 完善的异常处理和日志记录
## 🚀 快速开始
### 第一步:启用 MCP SSE 功能
1. 打开 `config/bot_config.toml` 配置文件
2. 找到 `[mcp_sse]` 配置段,如果没有请添加:
```toml
[mcp_sse]
enable = true # 启用 MCP SSE 功能
server_url = "http://your-mcp-server.com:8080/events" # MCP 服务器 SSE 端点
auth_key = "your-auth-token" # 认证密钥(可选)
```
### 第二步:配置连接参数
```toml
[mcp_sse]
# 基本配置
enable = true
server_url = "http://localhost:8080/events"
auth_key = ""
# 连接配置
connection_timeout = 30 # 连接超时时间(秒)
read_timeout = 60 # 读取超时时间(秒)
# 重连配置
enable_reconnect = true # 启用自动重连
max_reconnect_attempts = 10 # 最大重连次数(-1 表示无限重连)
initial_reconnect_delay = 1.0 # 初始重连延迟(秒)
max_reconnect_delay = 60.0 # 最大重连延迟(秒)
reconnect_backoff_factor = 2.0 # 重连延迟指数退避因子
# 事件处理配置
event_buffer_size = 1000 # 事件缓冲区大小
enable_event_logging = true # 启用事件日志记录
# 订阅配置
subscribed_events = [] # 订阅的事件类型(空列表表示订阅所有事件)
# 示例:只订阅特定事件
# subscribed_events = ["chat.message", "user.login", "system.status"]
# 高级配置
user_agent = "MaiBot-MCP-SSE-Client/1.0" # 用户代理字符串
# SSL 配置
verify_ssl = true # 是否验证 SSL 证书
ssl_cert_path = "" # SSL 客户端证书路径(可选)
ssl_key_path = "" # SSL 客户端密钥路径(可选)
```
### 第三步:启动 MaiBot
正常启动 MaiBot系统会自动初始化 MCP SSE 客户端:
```bash
python bot.py
```
启动日志中会显示 MCP SSE 相关信息:
```
[INFO] 初始化 MCP SSE 管理器
[INFO] 连接到 MCP 服务器: http://localhost:8080/events
[INFO] 成功连接到 MCP 服务器
[INFO] MCP SSE 系统初始化成功
```
## 📝 配置详解
### 基本配置
| 配置项 | 类型 | 默认值 | 说明 |
|--------|------|--------|------|
| `enable` | bool | false | 是否启用 MCP SSE 功能 |
| `server_url` | string | "" | MCP 服务器 SSE 端点 URL |
| `auth_key` | string | "" | 认证密钥,用于 Bearer Token 认证 |
### 连接配置
| 配置项 | 类型 | 默认值 | 说明 |
|--------|------|--------|------|
| `connection_timeout` | int | 30 | 连接超时时间(秒) |
| `read_timeout` | int | 60 | 读取超时时间(秒) |
### 重连配置
| 配置项 | 类型 | 默认值 | 说明 |
|--------|------|--------|------|
| `enable_reconnect` | bool | true | 是否启用自动重连 |
| `max_reconnect_attempts` | int | 10 | 最大重连次数,-1 表示无限重连 |
| `initial_reconnect_delay` | float | 1.0 | 初始重连延迟时间(秒) |
| `max_reconnect_delay` | float | 60.0 | 最大重连延迟时间(秒) |
| `reconnect_backoff_factor` | float | 2.0 | 重连延迟指数退避因子 |
### 事件处理配置
| 配置项 | 类型 | 默认值 | 说明 |
|--------|------|--------|------|
| `event_buffer_size` | int | 1000 | 事件缓冲区大小 |
| `enable_event_logging` | bool | true | 是否启用事件日志记录 |
| `subscribed_events` | list | [] | 订阅的事件类型列表 |
## 🎯 事件类型
MCP SSE 支持多种事件类型,常见的包括:
### 系统事件
- `system.status` - 系统状态变化
- `system.heartbeat` - 系统心跳
- `system.shutdown` - 系统关闭
### 用户事件
- `user.login` - 用户登录
- `user.logout` - 用户登出
- `user.action` - 用户行为
### 聊天事件
- `chat.message` - 聊天消息
- `chat.typing` - 正在输入
- `chat.reaction` - 消息反应
### 通知事件
- `notification.info` - 信息通知
- `notification.warning` - 警告通知
- `notification.error` - 错误通知
## 🔧 高级用法
### 自定义事件处理器
如果您需要对特定事件进行自定义处理,可以通过插件系统或直接修改代码来注册事件处理器:
```python
from src.mcp import get_mcp_sse_manager
# 获取 MCP SSE 管理器
manager = get_mcp_sse_manager()
if manager:
# 注册聊天消息事件处理器
def handle_chat_message(event):
print(f"收到聊天消息: {event.data}")
# 在这里添加您的自定义逻辑
manager.register_event_handler("chat.message", handle_chat_message)
# 注册全局事件处理器
def handle_all_events(event):
print(f"收到事件: {event.event_type} - {event.data}")
manager.register_global_event_handler(handle_all_events)
```
### 获取连接状态
```python
from src.mcp import get_mcp_sse_manager
manager = get_mcp_sse_manager()
if manager:
# 检查连接状态
if manager.is_connected():
print("MCP SSE 客户端已连接")
# 获取详细统计信息
stats = manager.get_stats()
print(f"连接状态: {stats['connected']}")
print(f"运行状态: {stats['running']}")
print(f"总接收事件数: {stats['total_events_received']}")
print(f"重连次数: {stats['reconnect_attempts']}")
```
### 查看最近事件
```python
from src.mcp import get_mcp_sse_manager
manager = get_mcp_sse_manager()
if manager:
# 获取最近 10 个事件
recent_events = manager.get_recent_events(10)
for event in recent_events:
print(f"{event.timestamp}: {event.event_type}")
print(f"数据: {event.data}")
print("---")
```
## 🧪 测试功能
项目包含了完整的测试工具,您可以使用它们来验证 MCP SSE 功能:
### 基本功能测试
```bash
# 进入项目目录
cd /path/to/MaiBot
# 运行基本功能测试
python -m src.mcp.test_client
```
### 重连功能测试
```bash
# 测试断线重连功能
python -m src.mcp.test_client reconnect
```
测试脚本会输出详细的测试结果,包括:
- 连接状态
- 接收到的事件数量
- 事件类型统计
- 连接时长和性能指标
## 🔍 故障排除
### 常见问题
#### 1. 连接失败
**问题**: 无法连接到 MCP 服务器
**解决方案**:
- 检查 `server_url` 配置是否正确
- 确认 MCP 服务器是否正在运行
- 检查网络连接和防火墙设置
- 验证端口是否可访问
#### 2. 认证失败
**问题**: 收到 401 认证错误
**解决方案**:
- 检查 `auth_key` 配置是否正确
- 确认认证密钥是否有效
- 联系 MCP 服务器管理员获取正确的认证信息
#### 3. SSL 证书错误
**问题**: SSL 证书验证失败
**解决方案**:
- 检查服务器 SSL 证书是否有效
- 临时设置 `verify_ssl = false` 进行测试
- 配置正确的客户端证书路径
#### 4. 频繁重连
**问题**: 客户端不断重连
**解决方案**:
- 检查网络连接稳定性
- 调整重连参数(增加延迟时间)
- 检查服务器端是否有连接限制
#### 5. 事件丢失
**问题**: 部分事件没有收到
**解决方案**:
- 增加 `event_buffer_size` 配置
- 检查事件处理器是否有异常
- 确认 `subscribed_events` 配置是否正确
### 日志调试
MCP SSE 相关日志使用以下标签:
- `mcp.sse_client` - SSE 客户端日志
- `mcp.event_handler` - 事件处理器日志
- `mcp.manager` - 管理器日志
您可以通过调整日志级别来获取更详细的调试信息:
```toml
[log]
console_log_level = "DEBUG" # 设置为 DEBUG 级别查看详细日志
```
## 📚 API 参考
### MCPSSEManager
主要的管理器类,提供以下方法:
- `is_running()` - 检查是否正在运行
- `is_connected()` - 检查是否已连接
- `get_stats()` - 获取统计信息
- `get_recent_events(count)` - 获取最近的事件
- `register_event_handler(event_type, handler)` - 注册事件处理器
- `register_global_event_handler(handler)` - 注册全局事件处理器
### MCPEvent
事件对象,包含以下属性:
- `event_type` - 事件类型
- `data` - 事件数据(字典格式)
- `timestamp` - 事件时间戳
- `event_id` - 事件 ID可选
- `retry` - 重试间隔(可选)
## 🤝 贡献
如果您在使用过程中遇到问题或有改进建议,欢迎:
1. 提交 Issue 报告问题
2. 提交 Pull Request 贡献代码
3. 完善文档和示例
## 📄 许可证
本功能遵循 MaiBot 项目的许可证协议。

View File

@@ -448,11 +448,6 @@ MODULE_COLORS = {
"web_surfing_tool": "\033[38;5;130m", # 棕色
"tts": "\033[38;5;136m", # 浅棕色
# MCP SSE
"mcp_sse_manager": "\033[38;5;202m", # 橙红色
"mcp_event_handler": "\033[38;5;208m", # 橙色
"mcp_sse_client": "\033[38;5;214m", # 橙黄色
# mais4u系统扩展
"s4u_config": "\033[38;5;18m", # 深蓝色
"action": "\033[38;5;52m", # 深红色mais4u的action
@@ -535,10 +530,7 @@ MODULE_ALIASES = {
"web_surfing_tool": "网络搜索",
"tts": "语音合成",
# MCP SSE
"mcp_sse_manager": "MCP管理器",
"mcp_event_handler": "MCP事件处",
"mcp_sse_client": "MCP客户端",
# mais4u系统扩展
"s4u_config": "直播配置",
"action": "直播动作",

View File

@@ -41,8 +41,7 @@ from src.config.official_configs import (
DependencyManagementConfig,
ExaConfig,
WebSearchConfig,
TavilyConfig,
MCPSSEConfig,
TavilyConfig
)
from .api_ada_configs import (
@@ -363,7 +362,6 @@ class Config(ConfigBase):
exa: ExaConfig = field(default_factory=lambda: ExaConfig())
web_search: WebSearchConfig = field(default_factory=lambda: WebSearchConfig())
tavily: TavilyConfig = field(default_factory=lambda: TavilyConfig())
mcp_sse: MCPSSEConfig = field(default_factory=lambda: MCPSSEConfig())
@dataclass

View File

@@ -1003,111 +1003,3 @@ class WebSearchConfig(ConfigBase):
search_strategy: str = "single"
"""搜索策略: 'single'(使用第一个可用引擎), 'parallel'(并行使用所有启用的引擎), 'fallback'(按顺序尝试,失败则尝试下一个)"""
@dataclass
class MCPSSEConfig(ConfigBase):
"""MCP Server-Sent Events 客户端配置类"""
enable: bool = False
"""是否启用 MCP SSE 客户端"""
server_url: str = ""
"""MCP 服务器 SSE 端点 URL例如: http://localhost:8080/events"""
auth_key: str = ""
"""MCP 服务器认证密钥"""
# 连接配置
connection_timeout: int = 30
"""连接超时时间(秒)"""
read_timeout: int = 60
"""读取超时时间(秒)"""
# 重连配置
enable_reconnect: bool = True
"""是否启用自动重连"""
max_reconnect_attempts: int = 10
"""最大重连尝试次数,-1 表示无限重连"""
initial_reconnect_delay: float = 1.0
"""初始重连延迟时间(秒)"""
max_reconnect_delay: float = 60.0
"""最大重连延迟时间(秒)"""
reconnect_backoff_factor: float = 2.0
"""重连延迟指数退避因子"""
# 事件处理配置
event_buffer_size: int = 1000
"""事件缓冲区大小"""
enable_event_logging: bool = True
"""是否启用事件日志记录"""
# 订阅配置
subscribed_events: list[str] = field(default_factory=lambda: [])
"""订阅的事件类型列表,空列表表示订阅所有事件"""
# 高级配置
custom_headers: Dict[str, str] = field(default_factory=dict)
"""自定义 HTTP 头部"""
user_agent: str = "MaiBot-MCP-SSE-Client/1.0"
"""用户代理字符串"""
# SSL 配置
verify_ssl: bool = True
"""是否验证 SSL 证书"""
ssl_cert_path: Optional[str] = None
"""SSL 客户端证书路径"""
ssl_key_path: Optional[str] = None
"""SSL 客户端密钥路径"""
def __post_init__(self):
"""配置验证"""
# 只有在启用时才验证必需的配置
if self.enable:
if not self.server_url:
raise ValueError("启用 MCP SSE 客户端时必须提供 server_url")
# 这些参数无论是否启用都需要验证(因为有默认值)
if self.connection_timeout <= 0:
raise ValueError("connection_timeout 必须大于 0")
if self.read_timeout <= 0:
raise ValueError("read_timeout 必须大于 0")
if self.max_reconnect_attempts < -1:
raise ValueError("max_reconnect_attempts 必须大于等于 -1")
if self.initial_reconnect_delay <= 0:
raise ValueError("initial_reconnect_delay 必须大于 0")
if self.max_reconnect_delay <= 0:
raise ValueError("max_reconnect_delay 必须大于 0")
if self.reconnect_backoff_factor <= 1.0:
raise ValueError("reconnect_backoff_factor 必须大于 1.0")
if self.event_buffer_size <= 0:
raise ValueError("event_buffer_size 必须大于 0")
def get_headers(self) -> Dict[str, str]:
"""获取完整的 HTTP 头部"""
headers = {
"Accept": "text/event-stream",
"Cache-Control": "no-cache",
"User-Agent": self.user_agent,
}
if self.auth_key:
headers["Authorization"] = f"Bearer {self.auth_key}"
headers.update(self.custom_headers)
return headers

View File

@@ -31,10 +31,6 @@ from src.common.message import get_global_api
if global_config.memory.enable_memory:
from src.chat.memory_system.Hippocampus import hippocampus_manager
# 条件导入 MCP SSE 系统
if global_config.mcp_sse.enable:
from src.mcp import initialize_mcp_sse_manager, start_mcp_sse_manager, stop_mcp_sse_manager
# 插件系统现在使用统一的插件加载器
install(extra_lines=3)
@@ -52,11 +48,6 @@ class MainSystem:
else:
self.hippocampus_manager = None
# 根据配置条件性地初始化 MCP SSE 系统
if global_config.mcp_sse.enable:
self.mcp_sse_manager = initialize_mcp_sse_manager(global_config.mcp_sse)
else:
self.mcp_sse_manager = None
self.individuality: Individuality = get_individuality()
@@ -86,14 +77,6 @@ class MainSystem:
except Exception as e:
logger.error(f"停止热重载系统时出错: {e}")
# 停止 MCP SSE 系统
if global_config.mcp_sse.enable and self.mcp_sse_manager:
try:
asyncio.create_task(stop_mcp_sse_manager())
logger.info("🛑 MCP SSE 系统已停止")
except Exception as e:
logger.error(f"停止 MCP SSE 系统时出错: {e}")
async def initialize(self):
"""初始化系统组件"""
logger.info(f"正在唤醒{global_config.bot.nickname}......")
@@ -179,18 +162,6 @@ MaiMbot-Pro-Max(第三方改版)
await schedule_manager.load_or_generate_today_schedule()
logger.info("日程表管理器初始化成功。")
# 根据配置条件性地启动 MCP SSE 系统
if global_config.mcp_sse.enable:
if self.mcp_sse_manager:
try:
await start_mcp_sse_manager()
logger.info("MCP SSE 系统初始化成功")
except Exception as e:
logger.error(f"MCP SSE 系统初始化失败: {e}")
else:
logger.warning("MCP SSE 系统已启用但管理器未初始化")
else:
logger.info("MCP SSE 系统已禁用,跳过初始化")
try:
init_time = int(1000 * (time.time() - init_start_time))

View File

@@ -1,32 +0,0 @@
"""
MCP (Model Context Protocol) 模块
提供 MCP 服务器的 Server-Sent Events (SSE) 客户端功能,
支持实时事件订阅、断线重连和事件处理。
"""
from .sse_client import MCPSSEClient
from .event_handler import MCPEventHandler, MCPEvent
from .exceptions import MCPConnectionError, MCPEventError
from .manager import (
MCPSSEManager,
get_mcp_sse_manager,
initialize_mcp_sse_manager,
start_mcp_sse_manager,
stop_mcp_sse_manager,
)
from .config import MCPSSEConfig
__all__ = [
"MCPSSEClient",
"MCPEventHandler",
"MCPEvent",
"MCPConnectionError",
"MCPEventError",
"MCPSSEManager",
"MCPSSEConfig",
"get_mcp_sse_manager",
"initialize_mcp_sse_manager",
"start_mcp_sse_manager",
"stop_mcp_sse_manager",
]

View File

@@ -1,112 +0,0 @@
"""
MCP SSE 客户端配置类
"""
from dataclasses import dataclass, field
from typing import Optional, Dict, Any
from src.config.config_base import ConfigBase
@dataclass
class MCPSSEConfig(ConfigBase):
"""MCP Server-Sent Events 客户端配置类"""
enable: bool = False
"""是否启用 MCP SSE 客户端"""
server_url: str = ""
"""MCP 服务器 SSE 端点 URL例如: http://localhost:8080/events"""
auth_key: str = ""
"""MCP 服务器认证密钥"""
# 连接配置
connection_timeout: int = 30
"""连接超时时间(秒)"""
read_timeout: int = 60
"""读取超时时间(秒)"""
# 重连配置
enable_reconnect: bool = True
"""是否启用自动重连"""
max_reconnect_attempts: int = 10
"""最大重连尝试次数,-1 表示无限重连"""
initial_reconnect_delay: float = 1.0
"""初始重连延迟时间(秒)"""
max_reconnect_delay: float = 60.0
"""最大重连延迟时间(秒)"""
reconnect_backoff_factor: float = 2.0
"""重连延迟指数退避因子"""
# 事件处理配置
event_buffer_size: int = 1000
"""事件缓冲区大小"""
enable_event_logging: bool = True
"""是否启用事件日志记录"""
# 订阅配置
subscribed_events: list[str] = field(default_factory=lambda: [])
"""订阅的事件类型列表,空列表表示订阅所有事件"""
# 高级配置
custom_headers: Dict[str, str] = field(default_factory=dict)
"""自定义 HTTP 头部"""
user_agent: str = "MaiBot-MCP-SSE-Client/1.0"
"""用户代理字符串"""
# SSL 配置
verify_ssl: bool = True
"""是否验证 SSL 证书"""
ssl_cert_path: Optional[str] = None
"""SSL 客户端证书路径"""
ssl_key_path: Optional[str] = None
"""SSL 客户端密钥路径"""
def __post_init__(self):
"""配置验证"""
if self.enable and not self.server_url:
raise ValueError("启用 MCP SSE 客户端时必须提供 server_url")
if self.connection_timeout <= 0:
raise ValueError("connection_timeout 必须大于 0")
if self.read_timeout <= 0:
raise ValueError("read_timeout 必须大于 0")
if self.max_reconnect_attempts < -1:
raise ValueError("max_reconnect_attempts 必须大于等于 -1")
if self.initial_reconnect_delay <= 0:
raise ValueError("initial_reconnect_delay 必须大于 0")
if self.max_reconnect_delay <= 0:
raise ValueError("max_reconnect_delay 必须大于 0")
if self.reconnect_backoff_factor <= 1.0:
raise ValueError("reconnect_backoff_factor 必须大于 1.0")
if self.event_buffer_size <= 0:
raise ValueError("event_buffer_size 必须大于 0")
def get_headers(self) -> Dict[str, str]:
"""获取完整的 HTTP 头部"""
headers = {
"Accept": "text/event-stream",
"Cache-Control": "no-cache",
"User-Agent": self.user_agent,
}
if self.auth_key:
headers["Authorization"] = f"Bearer {self.auth_key}"
headers.update(self.custom_headers)
return headers

View File

@@ -1,256 +0,0 @@
"""
MCP 事件处理器
"""
import json
import asyncio
from typing import Dict, Any, Callable, Optional, List
from dataclasses import dataclass
from datetime import datetime
from src.common.logger import get_logger
logger = get_logger("mcp_event_handler")
@dataclass
class MCPEvent:
"""MCP 事件数据类"""
event_type: str
"""事件类型"""
data: Dict[str, Any]
"""事件数据"""
timestamp: datetime
"""事件时间戳"""
event_id: Optional[str] = None
"""事件 ID"""
retry: Optional[int] = None
"""重试间隔(毫秒)"""
class MCPEventHandler:
"""MCP 事件处理器"""
def __init__(self):
self._event_handlers: Dict[str, List[Callable]] = {}
self._global_handlers: List[Callable] = []
self._event_buffer: List[MCPEvent] = []
self._buffer_size = 1000
self._running = False
def register_handler(self, event_type: str, handler: Callable[[MCPEvent], None]):
"""
注册事件处理器
Args:
event_type: 事件类型
handler: 事件处理函数
"""
if event_type not in self._event_handlers:
self._event_handlers[event_type] = []
self._event_handlers[event_type].append(handler)
logger.debug(f"注册事件处理器: {event_type}")
def register_global_handler(self, handler: Callable[[MCPEvent], None]):
"""
注册全局事件处理器(处理所有事件)
Args:
handler: 事件处理函数
"""
self._global_handlers.append(handler)
logger.debug("注册全局事件处理器")
def unregister_handler(self, event_type: str, handler: Callable[[MCPEvent], None]):
"""
取消注册事件处理器
Args:
event_type: 事件类型
handler: 事件处理函数
"""
if event_type in self._event_handlers:
try:
self._event_handlers[event_type].remove(handler)
logger.debug(f"取消注册事件处理器: {event_type}")
except ValueError:
logger.warning(f"尝试取消注册不存在的事件处理器: {event_type}")
def unregister_global_handler(self, handler: Callable[[MCPEvent], None]):
"""
取消注册全局事件处理器
Args:
handler: 事件处理函数
"""
try:
self._global_handlers.remove(handler)
logger.debug("取消注册全局事件处理器")
except ValueError:
logger.warning("尝试取消注册不存在的全局事件处理器")
async def handle_event(self, event: MCPEvent):
"""
处理单个事件
Args:
event: MCP 事件
"""
logger.debug(f"处理事件: {event.event_type}")
# 添加到事件缓冲区
self._add_to_buffer(event)
# 处理特定类型的事件处理器
if event.event_type in self._event_handlers:
for handler in self._event_handlers[event.event_type]:
try:
if asyncio.iscoroutinefunction(handler):
await handler(event)
else:
handler(event)
except Exception as e:
logger.error(f"事件处理器执行失败: {e}", exc_info=True)
# 处理全局事件处理器
for handler in self._global_handlers:
try:
if asyncio.iscoroutinefunction(handler):
await handler(event)
else:
handler(event)
except Exception as e:
logger.error(f"全局事件处理器执行失败: {e}", exc_info=True)
def _add_to_buffer(self, event: MCPEvent):
"""
添加事件到缓冲区
Args:
event: MCP 事件
"""
self._event_buffer.append(event)
# 如果缓冲区超过限制,移除最旧的事件
if len(self._event_buffer) > self._buffer_size:
self._event_buffer.pop(0)
def get_recent_events(self, count: int = 10) -> List[MCPEvent]:
"""
获取最近的事件
Args:
count: 获取的事件数量
Returns:
最近的事件列表
"""
return self._event_buffer[-count:]
def get_events_by_type(self, event_type: str, count: int = 10) -> List[MCPEvent]:
"""
根据类型获取事件
Args:
event_type: 事件类型
count: 获取的事件数量
Returns:
指定类型的事件列表
"""
filtered_events = [e for e in self._event_buffer if e.event_type == event_type]
return filtered_events[-count:]
def clear_buffer(self):
"""清空事件缓冲区"""
self._event_buffer.clear()
logger.debug("清空事件缓冲区")
def set_buffer_size(self, size: int):
"""
设置缓冲区大小
Args:
size: 缓冲区大小
"""
if size <= 0:
raise ValueError("缓冲区大小必须大于 0")
self._buffer_size = size
# 如果当前缓冲区超过新大小,截断
if len(self._event_buffer) > size:
self._event_buffer = self._event_buffer[-size:]
logger.debug(f"设置事件缓冲区大小: {size}")
def get_handler_count(self) -> Dict[str, int]:
"""
获取各类型事件处理器数量
Returns:
事件类型到处理器数量的映射
"""
counts = {}
for event_type, handlers in self._event_handlers.items():
counts[event_type] = len(handlers)
counts["global"] = len(self._global_handlers)
return counts
def parse_sse_event(raw_data: str) -> Optional[MCPEvent]:
"""
解析 SSE 事件数据
Args:
raw_data: 原始 SSE 数据
Returns:
解析后的 MCP 事件,如果解析失败返回 None
"""
try:
lines = raw_data.strip().split('\n')
event_type = None
event_data = None
event_id = None
retry = None
for line in lines:
line = line.strip()
if line.startswith('event:'):
event_type = line[6:].strip()
elif line.startswith('data:'):
data_str = line[5:].strip()
if data_str:
try:
event_data = json.loads(data_str)
except json.JSONDecodeError:
# 如果不是 JSON直接使用字符串
event_data = {"message": data_str}
elif line.startswith('id:'):
event_id = line[3:].strip()
elif line.startswith('retry:'):
try:
retry = int(line[6:].strip())
except ValueError:
pass
if event_type and event_data is not None:
return MCPEvent(
event_type=event_type,
data=event_data,
timestamp=datetime.now(),
event_id=event_id,
retry=retry
)
return None
except Exception as e:
logger.error(f"解析 SSE 事件失败: {e}")
return None

View File

@@ -1,67 +0,0 @@
"""
MCP SSE 客户端异常类
"""
class MCPError(Exception):
"""MCP 基础异常类"""
pass
class MCPConnectionError(MCPError):
"""MCP 连接异常"""
def __init__(self, message: str, url: str = None, status_code: int = None):
super().__init__(message)
self.url = url
self.status_code = status_code
def __str__(self):
base_msg = super().__str__()
if self.url:
base_msg += f" (URL: {self.url})"
if self.status_code:
base_msg += f" (Status: {self.status_code})"
return base_msg
class MCPEventError(MCPError):
"""MCP 事件处理异常"""
def __init__(self, message: str, event_type: str = None, event_data: str = None):
super().__init__(message)
self.event_type = event_type
self.event_data = event_data
def __str__(self):
base_msg = super().__str__()
if self.event_type:
base_msg += f" (Event Type: {self.event_type})"
return base_msg
class MCPAuthenticationError(MCPConnectionError):
"""MCP 认证异常"""
pass
class MCPTimeoutError(MCPConnectionError):
"""MCP 超时异常"""
pass
class MCPReconnectError(MCPConnectionError):
"""MCP 重连异常"""
def __init__(self, message: str, attempts: int = 0, max_attempts: int = 0):
super().__init__(message)
self.attempts = attempts
self.max_attempts = max_attempts
def __str__(self):
base_msg = super().__str__()
if self.max_attempts > 0:
base_msg += f" (Attempts: {self.attempts}/{self.max_attempts})"
else:
base_msg += f" (Attempts: {self.attempts})"
return base_msg

View File

@@ -1,260 +0,0 @@
"""
MCP SSE 管理器
负责管理 MCP SSE 客户端的生命周期,集成到 MaiBot 主系统中。
"""
import asyncio
from typing import Optional, Dict, Any, Callable
from datetime import datetime
from .sse_client import MCPSSEClient
from .config import MCPSSEConfig
from .event_handler import MCPEvent
from .exceptions import MCPConnectionError, MCPReconnectError
from src.common.logger import get_logger
logger = get_logger("mcp_sse_manager")
class MCPSSEManager:
"""MCP SSE 管理器"""
def __init__(self, config: MCPSSEConfig):
"""
初始化 MCP SSE 管理器
Args:
config: MCP SSE 配置
"""
self.config = config
self.client: Optional[MCPSSEClient] = None
self._task: Optional[asyncio.Task] = None
self._running = False
logger.info("初始化 MCP SSE 管理器")
async def start(self):
"""启动 MCP SSE 客户端"""
if not self.config.enable:
logger.info("MCP SSE 客户端未启用,跳过启动")
return
if self._running:
logger.warning("MCP SSE 客户端已在运行")
return
try:
# 创建客户端
self.client = MCPSSEClient(self.config)
# 注册默认事件处理器
self._register_default_handlers()
# 启动监听任务
self._task = asyncio.create_task(self._run_client())
self._running = True
logger.info("MCP SSE 客户端启动成功")
except Exception as e:
logger.error(f"启动 MCP SSE 客户端失败: {e}", exc_info=True)
await self.stop()
raise
async def stop(self):
"""停止 MCP SSE 客户端"""
if not self._running:
return
logger.info("停止 MCP SSE 客户端")
self._running = False
# 停止客户端
if self.client:
self.client.stop()
# 取消任务
if self._task and not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
# 断开连接
if self.client:
await self.client.disconnect()
self.client = None
self._task = None
logger.info("MCP SSE 客户端已停止")
async def _run_client(self):
"""运行客户端监听循环"""
if not self.client:
return
try:
await self.client.start_listening()
except MCPReconnectError as e:
logger.error(f"MCP SSE 客户端重连失败: {e}")
except Exception as e:
logger.error(f"MCP SSE 客户端运行异常: {e}", exc_info=True)
finally:
self._running = False
def _register_default_handlers(self):
"""注册默认事件处理器"""
if not self.client:
return
# 注册全局事件处理器用于日志记录
self.client.register_global_event_handler(self._log_event_handler)
# 注册一些常见事件的处理器
self.client.register_event_handler("system.status", self._handle_system_status)
self.client.register_event_handler("chat.message", self._handle_chat_message)
self.client.register_event_handler("user.action", self._handle_user_action)
logger.debug("注册默认 MCP 事件处理器")
def _log_event_handler(self, event: MCPEvent):
"""全局事件日志处理器"""
if self.config.enable_event_logging:
logger.debug(f"MCP 事件: {event.event_type} - {event.data}")
def _handle_system_status(self, event: MCPEvent):
"""处理系统状态事件"""
logger.info(f"收到系统状态事件: {event.data}")
# 这里可以添加具体的系统状态处理逻辑
def _handle_chat_message(self, event: MCPEvent):
"""处理聊天消息事件"""
logger.info(f"收到聊天消息事件: {event.data}")
# 这里可以添加具体的聊天消息处理逻辑
# 例如:触发 MaiBot 的回复逻辑
def _handle_user_action(self, event: MCPEvent):
"""处理用户行为事件"""
logger.info(f"收到用户行为事件: {event.data}")
# 这里可以添加具体的用户行为处理逻辑
def register_event_handler(self, event_type: str, handler: Callable[[MCPEvent], None]):
"""
注册自定义事件处理器
Args:
event_type: 事件类型
handler: 事件处理函数
"""
if self.client:
self.client.register_event_handler(event_type, handler)
logger.debug(f"注册自定义事件处理器: {event_type}")
else:
logger.warning("客户端未初始化,无法注册事件处理器")
def register_global_event_handler(self, handler: Callable[[MCPEvent], None]):
"""
注册全局事件处理器
Args:
handler: 事件处理函数
"""
if self.client:
self.client.register_global_event_handler(handler)
logger.debug("注册全局事件处理器")
else:
logger.warning("客户端未初始化,无法注册全局事件处理器")
def is_running(self) -> bool:
"""检查是否正在运行"""
return self._running
def is_connected(self) -> bool:
"""检查是否已连接"""
return self.client.is_connected() if self.client else False
def get_stats(self) -> Dict[str, Any]:
"""
获取统计信息
Returns:
统计信息字典
"""
if not self.client:
return {
"enabled": self.config.enable,
"running": False,
"connected": False,
"client_initialized": False
}
stats = self.client.get_stats()
stats.update({
"enabled": self.config.enable,
"client_initialized": True,
"server_url": self.config.server_url,
"subscribed_events": self.config.subscribed_events,
})
return stats
def get_recent_events(self, count: int = 10):
"""
获取最近的事件
Args:
count: 获取的事件数量
Returns:
最近的事件列表
"""
if self.client:
return self.client.get_recent_events(count)
return []
# 全局 MCP SSE 管理器实例
_mcp_sse_manager: Optional[MCPSSEManager] = None
def get_mcp_sse_manager() -> Optional[MCPSSEManager]:
"""获取全局 MCP SSE 管理器实例"""
return _mcp_sse_manager
def initialize_mcp_sse_manager(config: MCPSSEConfig) -> MCPSSEManager:
"""
初始化全局 MCP SSE 管理器
Args:
config: MCP SSE 配置
Returns:
MCP SSE 管理器实例
"""
global _mcp_sse_manager
if _mcp_sse_manager:
logger.warning("MCP SSE 管理器已初始化")
return _mcp_sse_manager
_mcp_sse_manager = MCPSSEManager(config)
logger.info("全局 MCP SSE 管理器初始化完成")
return _mcp_sse_manager
async def start_mcp_sse_manager():
"""启动全局 MCP SSE 管理器"""
if _mcp_sse_manager:
await _mcp_sse_manager.start()
else:
logger.warning("MCP SSE 管理器未初始化")
async def stop_mcp_sse_manager():
"""停止全局 MCP SSE 管理器"""
if _mcp_sse_manager:
await _mcp_sse_manager.stop()

View File

@@ -1,379 +0,0 @@
"""
MCP Server-Sent Events 客户端
"""
import asyncio
import aiohttp
import ssl
from typing import Optional, Dict, Any, Callable
from datetime import datetime
import time
import json
from .config import MCPSSEConfig
from .event_handler import MCPEventHandler, MCPEvent, parse_sse_event
from .exceptions import (
MCPConnectionError,
MCPAuthenticationError,
MCPTimeoutError,
MCPReconnectError,
MCPEventError
)
from src.common.logger import get_logger
logger = get_logger("mcp_sse_client")
class MCPSSEClient:
"""MCP Server-Sent Events 客户端"""
def __init__(self, config: MCPSSEConfig):
"""
初始化 MCP SSE 客户端
Args:
config: MCP SSE 配置
"""
self.config = config
self.event_handler = MCPEventHandler()
# 连接状态
self._session: Optional[aiohttp.ClientSession] = None
self._response: Optional[aiohttp.ClientResponse] = None
self._connected = False
self._running = False
# 重连状态
self._reconnect_attempts = 0
self._last_event_id: Optional[str] = None
# 统计信息
self._connection_start_time: Optional[datetime] = None
self._total_events_received = 0
self._last_event_time: Optional[datetime] = None
# 设置事件缓冲区大小
self.event_handler.set_buffer_size(config.event_buffer_size)
logger.info(f"初始化 MCP SSE 客户端: {config.server_url}")
async def connect(self) -> bool:
"""
连接到 MCP 服务器
Returns:
连接是否成功
"""
if self._connected:
logger.warning("客户端已连接")
return True
try:
# 创建 SSL 上下文
ssl_context = None
if self.config.server_url.startswith('https://'):
ssl_context = ssl.create_default_context()
if not self.config.verify_ssl:
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
if self.config.ssl_cert_path and self.config.ssl_key_path:
ssl_context.load_cert_chain(
self.config.ssl_cert_path,
self.config.ssl_key_path
)
# 创建会话
timeout = aiohttp.ClientTimeout(
connect=self.config.connection_timeout,
sock_read=self.config.read_timeout
)
self._session = aiohttp.ClientSession(
timeout=timeout,
headers=self.config.get_headers()
)
# 建立连接
headers = {}
if self._last_event_id:
headers['Last-Event-ID'] = self._last_event_id
logger.info(f"连接到 MCP 服务器: {self.config.server_url}")
self._response = await self._session.get(
self.config.server_url,
headers=headers,
ssl=ssl_context
)
# 检查响应状态
if self._response.status == 401:
raise MCPAuthenticationError(
"认证失败",
url=self.config.server_url,
status_code=self._response.status
)
elif self._response.status != 200:
raise MCPConnectionError(
f"连接失败: HTTP {self._response.status}",
url=self.config.server_url,
status_code=self._response.status
)
# 检查内容类型
content_type = self._response.headers.get('Content-Type', '')
if 'text/event-stream' not in content_type:
raise MCPConnectionError(
f"无效的内容类型: {content_type}",
url=self.config.server_url
)
self._connected = True
self._connection_start_time = datetime.now()
self._reconnect_attempts = 0
logger.info("成功连接到 MCP 服务器")
return True
except asyncio.TimeoutError:
raise MCPTimeoutError(
"连接超时",
url=self.config.server_url
)
except Exception as e:
await self._cleanup_connection()
if isinstance(e, (MCPConnectionError, MCPAuthenticationError, MCPTimeoutError)):
raise
else:
raise MCPConnectionError(f"连接失败: {str(e)}", url=self.config.server_url)
async def disconnect(self):
"""断开连接"""
logger.info("断开 MCP 服务器连接")
self._running = False
await self._cleanup_connection()
async def _cleanup_connection(self):
"""清理连接资源"""
self._connected = False
if self._response:
self._response.close()
self._response = None
if self._session:
await self._session.close()
self._session = None
async def start_listening(self):
"""开始监听事件"""
if not self.config.enable:
logger.warning("MCP SSE 客户端未启用")
return
self._running = True
while self._running:
try:
if not self._connected:
await self.connect()
await self._listen_events()
except (MCPConnectionError, MCPTimeoutError) as e:
logger.error(f"连接错误: {e}")
await self._cleanup_connection()
if self.config.enable_reconnect:
await self._handle_reconnect()
else:
break
except Exception as e:
logger.error(f"监听事件时发生未知错误: {e}", exc_info=True)
await self._cleanup_connection()
if self.config.enable_reconnect:
await self._handle_reconnect()
else:
break
await self._cleanup_connection()
logger.info("停止监听 MCP 事件")
async def _listen_events(self):
"""监听事件流"""
if not self._response:
raise MCPConnectionError("没有活动的连接")
logger.info("开始监听 MCP 事件流")
buffer = ""
async for chunk in self._response.content.iter_chunked(1024):
if not self._running:
break
try:
# 解码数据
data = chunk.decode('utf-8')
buffer += data
# 处理完整的事件
while '\n\n' in buffer:
event_data, buffer = buffer.split('\n\n', 1)
if event_data.strip():
await self._process_event_data(event_data)
except UnicodeDecodeError as e:
logger.error(f"解码事件数据失败: {e}")
continue
except Exception as e:
logger.error(f"处理事件数据失败: {e}", exc_info=True)
continue
async def _process_event_data(self, event_data: str):
"""
处理事件数据
Args:
event_data: 原始事件数据
"""
try:
# 解析 SSE 事件
event = parse_sse_event(event_data)
if not event:
return
# 更新统计信息
self._total_events_received += 1
self._last_event_time = event.timestamp
if event.event_id:
self._last_event_id = event.event_id
# 检查事件订阅
if self.config.subscribed_events:
if event.event_type not in self.config.subscribed_events:
logger.debug(f"跳过未订阅的事件类型: {event.event_type}")
return
# 记录事件日志
if self.config.enable_event_logging:
logger.info(f"收到 MCP 事件: {event.event_type}")
logger.debug(f"事件数据: {event.data}")
# 处理事件
await self.event_handler.handle_event(event)
except Exception as e:
logger.error(f"处理事件失败: {e}", exc_info=True)
raise MCPEventError(f"处理事件失败: {str(e)}")
async def _handle_reconnect(self):
"""处理重连逻辑"""
if not self.config.enable_reconnect:
return
self._reconnect_attempts += 1
# 检查最大重连次数
if (self.config.max_reconnect_attempts > 0 and
self._reconnect_attempts > self.config.max_reconnect_attempts):
raise MCPReconnectError(
"超过最大重连次数",
attempts=self._reconnect_attempts,
max_attempts=self.config.max_reconnect_attempts
)
# 计算重连延迟(指数退避)
delay = min(
self.config.initial_reconnect_delay * (
self.config.reconnect_backoff_factor ** (self._reconnect_attempts - 1)
),
self.config.max_reconnect_delay
)
logger.info(f"{self._reconnect_attempts} 次重连尝试,延迟 {delay:.2f}")
await asyncio.sleep(delay)
def stop(self):
"""停止客户端"""
logger.info("停止 MCP SSE 客户端")
self._running = False
def is_connected(self) -> bool:
"""检查是否已连接"""
return self._connected
def is_running(self) -> bool:
"""检查是否正在运行"""
return self._running
def get_stats(self) -> Dict[str, Any]:
"""
获取客户端统计信息
Returns:
统计信息字典
"""
stats = {
"connected": self._connected,
"running": self._running,
"reconnect_attempts": self._reconnect_attempts,
"total_events_received": self._total_events_received,
"connection_start_time": self._connection_start_time,
"last_event_time": self._last_event_time,
"last_event_id": self._last_event_id,
}
if self._connection_start_time:
stats["uptime_seconds"] = (datetime.now() - self._connection_start_time).total_seconds()
# 添加事件处理器统计
stats["event_handlers"] = self.event_handler.get_handler_count()
return stats
def register_event_handler(self, event_type: str, handler: Callable[[MCPEvent], None]):
"""
注册事件处理器
Args:
event_type: 事件类型
handler: 事件处理函数
"""
self.event_handler.register_handler(event_type, handler)
def register_global_event_handler(self, handler: Callable[[MCPEvent], None]):
"""
注册全局事件处理器
Args:
handler: 事件处理函数
"""
self.event_handler.register_global_handler(handler)
def unregister_event_handler(self, event_type: str, handler: Callable[[MCPEvent], None]):
"""
取消注册事件处理器
Args:
event_type: 事件类型
handler: 事件处理函数
"""
self.event_handler.unregister_handler(event_type, handler)
def get_recent_events(self, count: int = 10):
"""
获取最近的事件
Args:
count: 获取的事件数量
Returns:
最近的事件列表
"""
return self.event_handler.get_recent_events(count)

View File

@@ -350,41 +350,3 @@ enable_url_tool = true # 是否启用URL解析tool
# 搜索引擎配置
enabled_engines = ["ddg"] # 启用的搜索引擎列表,可选: "exa", "tavily", "ddg"
search_strategy = "single" # 搜索策略: "single"(使用第一个可用引擎), "parallel"(并行使用所有启用的引擎), "fallback"(按顺序尝试,失败则尝试下一个)
# MCP Server-Sent Events 客户端配置
[mcp_sse]
enable = false # 是否启用 MCP SSE 客户端
server_url = "http://localhost:8080/events" # MCP 服务器 SSE 端点 URL例如: "http://localhost:8080/events"
auth_key = "" # MCP 服务器认证密钥
# 连接配置
connection_timeout = 30 # 连接超时时间(秒)
read_timeout = 60 # 读取超时时间(秒)
# 重连配置
enable_reconnect = true # 是否启用自动重连
max_reconnect_attempts = 10 # 最大重连尝试次数,-1 表示无限重连
initial_reconnect_delay = 1.0 # 初始重连延迟时间(秒)
max_reconnect_delay = 60.0 # 最大重连延迟时间(秒)
reconnect_backoff_factor = 2.0 # 重连延迟指数退避因子
# 事件处理配置
event_buffer_size = 1000 # 事件缓冲区大小
enable_event_logging = true # 是否启用事件日志记录
# 订阅配置
subscribed_events = [] # 订阅的事件类型列表,空列表表示订阅所有事件
# 示例: subscribed_events = ["chat.message", "user.login", "system.status"]
# 高级配置
user_agent = "MaiBot-MCP-SSE-Client/1.0" # 用户代理字符串
# SSL 配置
verify_ssl = true # 是否验证 SSL 证书
ssl_cert_path = "" # SSL 客户端证书路径(可选)
ssl_key_path = "" # SSL 客户端密钥路径(可选)
# 自定义 HTTP 头部(可选)
# [mcp_sse.custom_headers]
# "X-Custom-Header" = "custom-value"
# "X-API-Version" = "v1"