This commit is contained in:
tt-P607
2025-12-09 22:55:03 +08:00
174 changed files with 8740 additions and 5096 deletions

View File

@@ -25,11 +25,13 @@ from .apis import (
from .base import (
ActionActivationType,
ActionInfo,
AdapterInfo,
BaseAction,
BaseCommand,
BaseEventHandler,
BasePlugin,
BasePrompt,
BaseRouterComponent,
BaseTool,
ChatMode,
ChatType,
@@ -41,10 +43,8 @@ from .base import (
EventHandlerInfo,
EventType,
PluginInfo,
AdapterInfo,
# 新增的增强命令系统
PlusCommand,
BaseRouterComponent,
PythonDependency,
ToolInfo,
ToolParamType,

View File

@@ -13,7 +13,7 @@
"""
from enum import Enum
from typing import Any, TYPE_CHECKING
from typing import TYPE_CHECKING, Any
from src.common.logger import get_logger

View File

@@ -3,13 +3,11 @@
"""
import time
from typing import Any, TYPE_CHECKING
from src.common.message_repository import find_messages
from typing import TYPE_CHECKING, Any
from src.chat.message_receive.chat_stream import get_chat_manager
from src.chat.utils.chat_message_builder import (
build_readable_messages_with_id,
get_raw_msg_before_timestamp_with_chat,
)
from src.common.logger import get_logger
from src.common.message_repository import get_user_messages_from_streams
@@ -31,20 +29,20 @@ async def build_cross_context_s4u(
"""
# 记录S4U上下文构建开始
logger.debug("[S4U] Starting S4U context build.")
# 检查全局配置是否存在且包含必要部分
if not global_config or not global_config.cross_context or not global_config.bot:
logger.error("全局配置尚未初始化或缺少关键配置无法构建S4U上下文。")
return ""
# 获取跨上下文配置
cross_context_config = global_config.cross_context
# 检查目标用户信息和用户ID是否存在
if not target_user_info or not (user_id := target_user_info.get("user_id")):
logger.warning(f"[S4U] Failed: target_user_info ({target_user_info}) or user_id is missing.")
return ""
# 记录目标用户ID
logger.debug(f"[S4U] Target user ID: {user_id}")
@@ -56,14 +54,14 @@ async def build_cross_context_s4u(
# --- 1. 优先处理私聊上下文 ---
# 获取与目标用户的私聊流ID
private_stream_id = chat_manager.get_stream_id(chat_stream.platform, user_id, is_group=False)
# 如果存在私聊流且不是当前聊天流
if private_stream_id and private_stream_id != chat_stream.stream_id:
logger.debug(f"[S4U] Found private chat with target user: {private_stream_id}")
try:
# 定义需要获取消息的用户ID列表目标用户和机器人自己
user_ids_to_fetch = [str(user_id), str(global_config.bot.qq_account)]
# 从指定私聊流中获取双方的消息
messages_by_stream = await get_user_messages_from_streams(
user_ids=user_ids_to_fetch,
@@ -71,12 +69,12 @@ async def build_cross_context_s4u(
timestamp_after=time.time() - (3 * 24 * 60 * 60), # 最近3天的消息
limit_per_stream=cross_context_config.s4u_limit,
)
# 如果获取到了私聊消息
if private_messages := messages_by_stream.get(private_stream_id):
chat_name = await chat_manager.get_stream_name(private_stream_id) or "私聊"
title = f'[以下是您与"{chat_name}"的近期私聊记录]\n'
# 格式化消息为可读字符串
formatted, _ = await build_readable_messages_with_id(private_messages, timestamp_mode="relative")
private_context_block = f"{title}{formatted}"
@@ -86,7 +84,7 @@ async def build_cross_context_s4u(
# --- 2. 处理其他群聊上下文 ---
streams_to_scan = []
# 根据S4U配置模式白名单/黑名单)确定要扫描的聊天范围
if cross_context_config.s4u_mode == "whitelist":
# 白名单模式:只扫描在白名单中的聊天
@@ -95,7 +93,7 @@ async def build_cross_context_s4u(
platform, chat_type, chat_raw_id = chat_str.split(":")
is_group = chat_type == "group"
stream_id = chat_manager.get_stream_id(platform, chat_raw_id, is_group=is_group)
# 排除当前聊和私聊
if stream_id and stream_id != chat_stream.stream_id and stream_id != private_stream_id:
streams_to_scan.append(stream_id)
@@ -113,7 +111,7 @@ async def build_cross_context_s4u(
blacklisted_streams.add(stream_id)
except ValueError:
logger.warning(f"无效的S4U黑名单格式: {chat_str}")
# 将不在黑名单中的流添加到扫描列表
streams_to_scan.extend(
stream_id for stream_id in chat_manager.streams

View File

@@ -13,9 +13,9 @@
"""
from abc import ABC, abstractmethod # ABC: 抽象基类abstractmethod: 抽象方法装饰器
from dataclasses import dataclass # dataclass: 自动生成 __init__, __repr__ 等方法的装饰器
from enum import Enum # Enum: 枚举类型基类
from typing import Any # Any: 表示任意类型
from dataclasses import dataclass # dataclass: 自动生成 __init__, __repr__ 等方法的装饰器
from enum import Enum # Enum: 枚举类型基类
from typing import Any # Any: 表示任意类型
from src.common.logger import get_logger
@@ -25,7 +25,7 @@ logger = get_logger(__name__) # 获取当前模块的日志记录器
class PermissionLevel(Enum):
"""
权限等级枚举类。
定义了系统中的权限等级,目前只有 MASTER管理员/主人)级别。
MASTER 用户拥有最高权限,可以执行所有操作。
"""
@@ -36,9 +36,9 @@ class PermissionLevel(Enum):
class PermissionNode:
"""
权限节点数据类。
每个权限节点代表一个具体的权限项,例如"发送消息""管理用户"等。
属性:
node_name: 权限节点名称,例如 "plugin.chat.send_message"
description: 权限描述,用于向用户展示这个权限的用途
@@ -55,9 +55,9 @@ class PermissionNode:
class UserInfo:
"""
用户信息数据类。
用于唯一标识一个用户,通过平台+用户ID的组合确定用户身份。
属性:
platform: 用户所在平台,例如 "qq", "telegram", "discord"
user_id: 用户在该平台上的唯一标识ID
@@ -68,7 +68,7 @@ class UserInfo:
def __post_init__(self):
"""
dataclass 的后初始化钩子。
确保 user_id 始终是字符串类型,即使传入的是数字也会被转换。
这样可以避免类型不一致导致的比较问题。
"""
@@ -78,25 +78,25 @@ class UserInfo:
class IPermissionManager(ABC):
"""
权限管理器抽象接口Interface
这是一个抽象基类,定义了权限管理器必须实现的所有方法。
具体的权限管理实现类需要继承此接口并实现所有抽象方法。
使用抽象接口的好处:
1. 解耦PermissionAPI 不需要知道具体的实现细节
2. 可测试:可以轻松创建 Mock 实现用于测试
3. 可替换:可以随时更换不同的权限管理实现
"""
@abstractmethod
async def check_permission(self, user: UserInfo, permission_node: str) -> bool:
"""
检查用户是否拥有指定权限。
Args:
user: 要检查的用户信息
permission_node: 权限节点名称
Returns:
bool: True 表示用户拥有该权限False 表示没有
"""
@@ -106,12 +106,12 @@ class IPermissionManager(ABC):
async def is_master(self, user: UserInfo) -> bool:
"""
检查用户是否是管理员/主人。
管理员拥有最高权限,通常绕过所有权限检查。
Args:
user: 要检查的用户信息
Returns:
bool: True 表示是管理员False 表示不是
"""
@@ -121,12 +121,12 @@ class IPermissionManager(ABC):
async def register_permission_node(self, node: PermissionNode) -> bool:
"""
注册一个新的权限节点。
插件在加载时会调用此方法注册自己需要的权限。
Args:
node: 要注册的权限节点信息
Returns:
bool: True 表示注册成功False 表示失败(可能是重复注册)
"""
@@ -136,11 +136,11 @@ class IPermissionManager(ABC):
async def grant_permission(self, user: UserInfo, permission_node: str) -> bool:
"""
授予用户指定权限。
Args:
user: 目标用户信息
permission_node: 要授予的权限节点名称
Returns:
bool: True 表示授权成功False 表示失败
"""
@@ -150,11 +150,11 @@ class IPermissionManager(ABC):
async def revoke_permission(self, user: UserInfo, permission_node: str) -> bool:
"""
撤销用户的指定权限。
Args:
user: 目标用户信息
permission_node: 要撤销的权限节点名称
Returns:
bool: True 表示撤销成功False 表示失败
"""
@@ -164,10 +164,10 @@ class IPermissionManager(ABC):
async def get_user_permissions(self, user: UserInfo) -> list[str]:
"""
获取用户拥有的所有权限列表。
Args:
user: 目标用户信息
Returns:
list[str]: 用户拥有的权限节点名称列表
"""
@@ -177,7 +177,7 @@ class IPermissionManager(ABC):
async def get_all_permission_nodes(self) -> list[PermissionNode]:
"""
获取系统中所有已注册的权限节点。
Returns:
list[PermissionNode]: 所有权限节点的列表
"""
@@ -187,10 +187,10 @@ class IPermissionManager(ABC):
async def get_plugin_permission_nodes(self, plugin_name: str) -> list[PermissionNode]:
"""
获取指定插件注册的所有权限节点。
Args:
plugin_name: 插件名称
Returns:
list[PermissionNode]: 该插件注册的权限节点列表
"""
@@ -200,27 +200,27 @@ class IPermissionManager(ABC):
class PermissionAPI:
"""
权限API封装类。
这是对外暴露的权限操作接口,插件和其他模块通过这个类来进行权限相关操作。
它封装了底层的 IPermissionManager提供更简洁的调用方式。
使用方式:
from src.plugin_system.apis.permission_api import permission_api
# 检查权限
has_perm = await permission_api.check_permission("qq", "12345", "chat.send")
# 检查是否是管理员
is_admin = await permission_api.is_master("qq", "12345")
设计模式:
这是一个单例模式的变体,模块级别的 permission_api 实例供全局使用。
"""
def __init__(self):
"""
初始化 PermissionAPI。
初始时权限管理器为 None需要在系统启动时通过 set_permission_manager 设置。
"""
self._permission_manager: IPermissionManager | None = None # 底层权限管理器实例
@@ -228,9 +228,9 @@ class PermissionAPI:
def set_permission_manager(self, manager: IPermissionManager):
"""
设置权限管理器实例。
这个方法应该在系统启动时被调用,注入具体的权限管理器实现。
Args:
manager: 实现了 IPermissionManager 接口的权限管理器实例
"""
@@ -239,10 +239,10 @@ class PermissionAPI:
def _ensure_manager(self):
"""
确保权限管理器已设置(内部辅助方法)。
如果权限管理器未设置,抛出 RuntimeError 异常。
这是一个防御性编程措施,帮助开发者快速发现配置问题。
Raises:
RuntimeError: 当权限管理器未设置时
"""
@@ -252,17 +252,17 @@ class PermissionAPI:
async def check_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
"""
检查用户是否拥有指定权限。
这是最常用的权限检查方法,在执行需要权限的操作前调用。
Args:
platform: 用户所在平台(如 "qq", "telegram"
user_id: 用户ID
permission_node: 要检查的权限节点名称
Returns:
bool: True 表示用户拥有权限False 表示没有
Example:
if await permission_api.check_permission("qq", "12345", "admin.ban_user"):
# 执行封禁操作
@@ -276,13 +276,13 @@ class PermissionAPI:
async def is_master(self, platform: str, user_id: str) -> bool:
"""
检查用户是否是管理员/主人。
管理员是系统的最高权限用户,通常在配置文件中指定。
Args:
platform: 用户所在平台
user_id: 用户ID
Returns:
bool: True 表示是管理员False 表示不是
"""
@@ -302,19 +302,19 @@ class PermissionAPI:
) -> bool:
"""
注册一个新的权限节点。
插件在初始化时应调用此方法注册自己需要的权限节点。
Args:
node_name: 权限节点名称,建议使用 "插件名.功能.操作" 的格式
description: 权限描述,向用户解释这个权限的作用
plugin_name: 注册此权限的插件名称
default_granted: 是否默认授予所有用户(默认 False需要显式授权
allow_relative: 预留参数,是否允许相对权限名(目前未使用)
Returns:
bool: True 表示注册成功False 表示失败
Example:
await permission_api.register_permission_node(
node_name="my_plugin.chat.send_image",
@@ -324,7 +324,6 @@ class PermissionAPI:
)
"""
self._ensure_manager()
original_name = node_name # 保存原始名称(预留给相对路径处理)
# 创建权限节点对象
node = PermissionNode(node_name, description, plugin_name, default_granted)
@@ -337,17 +336,17 @@ class PermissionAPI:
async def grant_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
"""
授予用户指定权限。
通常由管理员调用,给某个用户赋予特定权限。
Args:
platform: 目标用户所在平台
user_id: 目标用户ID
permission_node: 要授予的权限节点名称
Returns:
bool: True 表示授权成功False 表示失败
Example:
# 授予用户管理权限
await permission_api.grant_permission("qq", "12345", "admin.manage_users")
@@ -360,14 +359,14 @@ class PermissionAPI:
async def revoke_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
"""
撤销用户的指定权限。
通常由管理员调用,移除某个用户的特定权限。
Args:
platform: 目标用户所在平台
user_id: 目标用户ID
permission_node: 要撤销的权限节点名称
Returns:
bool: True 表示撤销成功False 表示失败
"""
@@ -379,16 +378,16 @@ class PermissionAPI:
async def get_user_permissions(self, platform: str, user_id: str) -> list[str]:
"""
获取用户拥有的所有权限列表。
可用于展示用户的权限信息,或进行批量权限检查。
Args:
platform: 目标用户所在平台
user_id: 目标用户ID
Returns:
list[str]: 用户拥有的所有权限节点名称列表
Example:
perms = await permission_api.get_user_permissions("qq", "12345")
print(f"用户拥有以下权限: {perms}")
@@ -401,16 +400,16 @@ class PermissionAPI:
async def get_all_permission_nodes(self) -> list[dict[str, Any]]:
"""
获取系统中所有已注册的权限节点。
返回所有插件注册的权限节点信息,可用于权限管理界面展示。
Returns:
list[dict]: 权限节点信息列表,每个字典包含:
- node_name: 权限节点名称
- description: 权限描述
- plugin_name: 所属插件名称
- default_granted: 是否默认授予
Note:
返回字典而非 PermissionNode 对象便于序列化和API响应。
"""
@@ -432,12 +431,12 @@ class PermissionAPI:
async def get_plugin_permission_nodes(self, plugin_name: str) -> list[dict[str, Any]]:
"""
获取指定插件注册的所有权限节点。
用于查看某个特定插件定义了哪些权限。
Args:
plugin_name: 插件名称
Returns:
list[dict]: 该插件的权限节点信息列表,格式同 get_all_permission_nodes
"""

View File

@@ -217,7 +217,7 @@ async def load_plugin(plugin_name: str) -> bool:
logger.info(f"插件 '{plugin_name}' 加载成功。")
else:
logger.error(f"插件 '{plugin_name}' 加载失败。")
return success

View File

@@ -208,7 +208,7 @@ class ScheduleAPI:
if not time_range:
continue
try:
event_start_str, event_end_str = time_range.split("-")
event_start_str, _event_end_str = time_range.split("-")
event_start = datetime.strptime(event_start_str.strip(), "%H:%M").time()
if start <= event_start < end:
activities_in_range.append(event)

View File

@@ -93,7 +93,9 @@ import uuid
from typing import TYPE_CHECKING, Any
from mofox_wire import MessageEnvelope
from src.common.data_models.database_data_model import DatabaseUserInfo
if TYPE_CHECKING:
from src.common.data_models.database_data_model import DatabaseMessages

View File

@@ -420,14 +420,6 @@ class UnifiedScheduler:
# 取消所有正在执行的任务
await self._cancel_all_running_tasks()
# 显示最终统计
stats = self.get_statistics()
logger.info(
f"调度器最终统计: 总任务={stats['total_tasks']}, "
f"执行次数={stats['total_executions']}, "
f"失败={stats['total_failures']}"
)
# 清理资源
self._tasks.clear()
self._tasks_by_name.clear()
@@ -623,7 +615,7 @@ class UnifiedScheduler:
async def _execute_task(self, task: ScheduleTask) -> None:
"""执行单个任务(完全隔离)"""
execution = task.start_execution()
task.start_execution()
self._deadlock_detector.register_task(task.schedule_id, task.task_name)
try:
@@ -763,7 +755,7 @@ class UnifiedScheduler:
async def _execute_event_task(self, task: ScheduleTask, event_params: dict[str, Any]) -> None:
"""执行事件触发的任务"""
execution = task.start_execution()
task.start_execution()
self._deadlock_detector.register_task(task.schedule_id, task.task_name)
try:
@@ -867,7 +859,7 @@ class UnifiedScheduler:
for i, timeout in enumerate(timeouts):
try:
# 使用 asyncio.wait 代替 wait_for避免重新抛出异常
done, pending = await asyncio.wait({task._asyncio_task}, timeout=timeout)
done, _pending = await asyncio.wait({task._asyncio_task}, timeout=timeout)
if done:
# 任务已完成(可能是正常完成或被取消)

View File

@@ -44,6 +44,7 @@ __all__ = [
"BaseEventHandler",
"BasePlugin",
"BasePrompt",
"BaseRouterComponent",
"BaseTool",
"ChatMode",
"ChatType",
@@ -58,7 +59,6 @@ __all__ = [
"PluginMetadata",
# 增强命令系统
"PlusCommand",
"BaseRouterComponent",
"PlusCommandInfo",
"PythonDependency",
"ToolInfo",

View File

@@ -10,12 +10,13 @@ from __future__ import annotations
import asyncio
from abc import ABC, abstractmethod
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional
from typing import TYPE_CHECKING, Any
from mofox_wire import AdapterBase as MoFoxAdapterBase, CoreSink, MessageEnvelope, ProcessCoreSink
from mofox_wire import AdapterBase as MoFoxAdapterBase
from mofox_wire import CoreSink, MessageEnvelope, ProcessCoreSink
if TYPE_CHECKING:
from src.plugin_system import BasePlugin, AdapterInfo
from src.plugin_system import AdapterInfo, BasePlugin
from src.common.logger import get_logger
@@ -25,7 +26,7 @@ logger = get_logger("plugin.adapter")
class BaseAdapter(MoFoxAdapterBase, ABC):
"""
插件系统的 Adapter 基类
相比 mofox_wire.AdapterBase增加了以下特性
1. 插件生命周期管理 (on_adapter_loaded, on_adapter_unloaded)
2. 配置管理集成
@@ -38,17 +39,17 @@ class BaseAdapter(MoFoxAdapterBase, ABC):
adapter_version: str = "0.0.1"
adapter_author: str = "Unknown"
adapter_description: str = "No description"
# 是否在子进程中运行
run_in_subprocess: bool = True
# 子进程启动脚本路径(相对于插件目录)
subprocess_entry: Optional[str] = None
subprocess_entry: str | None = None
def __init__(
self,
core_sink: CoreSink,
plugin: Optional[BasePlugin] = None,
plugin: BasePlugin | None = None,
**kwargs
):
"""
@@ -59,8 +60,8 @@ class BaseAdapter(MoFoxAdapterBase, ABC):
"""
super().__init__(core_sink, **kwargs)
self.plugin = plugin
self._config: Dict[str, Any] = {}
self._health_check_task: Optional[asyncio.Task] = None
self._config: dict[str, Any] = {}
self._health_check_task: asyncio.Task | None = None
self._running = False
# 标记是否在子进程中运行
self._is_subprocess = False
@@ -70,7 +71,7 @@ class BaseAdapter(MoFoxAdapterBase, ABC):
cls,
to_core_queue,
from_core_queue,
plugin: Optional["BasePlugin"] = None,
plugin: "BasePlugin" | None = None,
**kwargs: Any,
) -> "BaseAdapter":
"""
@@ -86,14 +87,14 @@ class BaseAdapter(MoFoxAdapterBase, ABC):
return cls(core_sink=sink, plugin=plugin, **kwargs)
@property
def config(self) -> Dict[str, Any]:
def config(self) -> dict[str, Any]:
"""获取适配器配置"""
if self.plugin and hasattr(self.plugin, "config"):
return self.plugin.config
return self._config
@config.setter
def config(self, value: Dict[str, Any]) -> None:
def config(self, value: dict[str, Any]) -> None:
"""设置适配器配置"""
self._config = value
@@ -111,26 +112,26 @@ class BaseAdapter(MoFoxAdapterBase, ABC):
async def start(self) -> None:
"""启动适配器"""
logger.info(f"启动适配器: {self.adapter_name} v{self.adapter_version}")
# 调用生命周期钩子
await self.on_adapter_loaded()
# 调用父类启动
await super().start()
# 启动健康检查
if self.config.get("enable_health_check", False):
self._health_check_task = asyncio.create_task(self._health_check_loop())
self._running = True
logger.info(f"适配器 {self.adapter_name} 启动成功")
async def stop(self) -> None:
"""停止适配器"""
logger.info(f"停止适配器: {self.adapter_name}")
self._running = False
# 停止健康检查
if self._health_check_task and not self._health_check_task.done():
self._health_check_task.cancel()
@@ -138,13 +139,13 @@ class BaseAdapter(MoFoxAdapterBase, ABC):
await self._health_check_task
except asyncio.CancelledError:
pass
# 调用父类停止
await super().stop()
# 调用生命周期钩子
await self.on_adapter_unloaded()
logger.info(f"适配器 {self.adapter_name} 已停止")
async def on_adapter_loaded(self) -> None:
@@ -164,18 +165,18 @@ class BaseAdapter(MoFoxAdapterBase, ABC):
async def _health_check_loop(self) -> None:
"""健康检查循环"""
interval = self.config.get("health_check_interval", 30)
while self._running:
try:
await asyncio.sleep(interval)
# 执行健康检查
is_healthy = await self.health_check()
if not is_healthy:
logger.warning(f"适配器 {self.adapter_name} 健康检查失败,尝试重连...")
await self.reconnect()
except asyncio.CancelledError:
break
except Exception as e:
@@ -185,7 +186,7 @@ class BaseAdapter(MoFoxAdapterBase, ABC):
"""
健康检查
子类可重写以实现自定义检查逻辑
Returns:
bool: 是否健康
"""
@@ -206,38 +207,38 @@ class BaseAdapter(MoFoxAdapterBase, ABC):
except Exception as e:
logger.error(f"适配器 {self.adapter_name} 重连失败: {e}")
def get_subprocess_entry_path(self) -> Optional[Path]:
def get_subprocess_entry_path(self) -> Path | None:
"""
获取子进程启动脚本的完整路径
Returns:
Path | None: 脚本路径,如果不存在则返回 None
"""
if not self.subprocess_entry:
return None
if not self.plugin:
return None
# 获取插件目录
plugin_dir = Path(self.plugin.__file__).parent
entry_path = plugin_dir / self.subprocess_entry
if entry_path.exists():
return entry_path
logger.warning(f"子进程入口脚本不存在: {entry_path}")
return None
@classmethod
def get_adapter_info(cls) -> "AdapterInfo":
"""获取适配器的信息
Returns:
AdapterInfo: 适配器组件信息
"""
from src.plugin_system.base.component_types import AdapterInfo
return AdapterInfo(
name=getattr(cls, "adapter_name", cls.__name__.lower().replace("adapter", "")),
version=getattr(cls, "adapter_version", "1.0.0"),
@@ -252,12 +253,12 @@ class BaseAdapter(MoFoxAdapterBase, ABC):
async def from_platform_message(self, raw: Any) -> MessageEnvelope:
"""
将平台原始消息转换为 MessageEnvelope
子类必须实现此方法
Args:
raw: 平台原始消息
Returns:
MessageEnvelope: 统一的消息信封
"""
@@ -266,10 +267,10 @@ class BaseAdapter(MoFoxAdapterBase, ABC):
async def _send_platform_message(self, envelope: MessageEnvelope) -> None:
"""
发送消息到平台
如果使用了 WebSocketAdapterOptions 或 HttpAdapterOptions
此方法会自动处理。否则子类需要重写此方法。
Args:
envelope: 要发送的消息信封
"""

View File

@@ -14,12 +14,13 @@ from __future__ import annotations
import asyncio
import importlib
import multiprocessing as mp
from typing import TYPE_CHECKING, Dict, Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from src.plugin_system.base.base_adapter import BaseAdapter
from mofox_wire import ProcessCoreSinkServer
from src.common.logger import get_logger
logger = get_logger("adapter_manager")
@@ -64,11 +65,11 @@ def _adapter_process_entry(
):
"""
子进程适配器入口函数
在子进程中运行,创建 ProcessCoreSink 与主进程通信
"""
import asyncio
import contextlib
from mofox_wire import ProcessCoreSink
async def _run() -> None:
@@ -77,14 +78,14 @@ def _adapter_process_entry(
if plugin_info:
plugin_cls = _load_class(plugin_info["module"], plugin_info["class"])
plugin_instance = plugin_cls(plugin_info["plugin_dir"], plugin_info["metadata"])
# 创建 ProcessCoreSink 用于与主进程通信
core_sink = ProcessCoreSink(to_core_queue=incoming_queue, from_core_queue=outgoing_queue)
# 创建并启动适配器
adapter = adapter_cls(core_sink, plugin=plugin_instance)
await adapter.start()
try:
while not getattr(core_sink, "_closed", False):
await asyncio.sleep(0.2)
@@ -101,7 +102,7 @@ def _adapter_process_entry(
class AdapterProcess:
"""
适配器子进程封装:管理子进程的生命周期与通信桥接
使用 CoreSinkManager 创建通信队列,自动维护与子进程的消息通道
"""
@@ -132,13 +133,13 @@ class AdapterProcess:
"""启动适配器子进程"""
try:
logger.info(f"启动适配器子进程: {self.adapter_name}")
# 从 CoreSinkManager 获取通信队列
from src.common.core_sink_manager import get_core_sink_manager
manager = get_core_sink_manager()
self._incoming_queue, self._outgoing_queue = manager.create_process_sink_queues(self.adapter_name)
# 启动子进程
self.process = self._ctx.Process(
target=_adapter_process_entry,
@@ -146,10 +147,10 @@ class AdapterProcess:
name=f"{self.adapter_name}-proc",
)
self.process.start()
logger.info(f"启动适配器子进程 {self.adapter_name} (PID: {self.process.pid})")
return True
except Exception as e:
logger.error(f"启动适配器子进程 {self.adapter_name} 失败: {e}")
return False
@@ -158,25 +159,25 @@ class AdapterProcess:
"""停止适配器子进程"""
if not self.process:
return
logger.info(f"停止适配器子进程: {self.adapter_name} (PID: {self.process.pid})")
try:
# 从 CoreSinkManager 移除通信队列
from src.common.core_sink_manager import get_core_sink_manager
manager = get_core_sink_manager()
manager.remove_process_sink(self.adapter_name)
# 等待子进程结束
if self.process.is_alive():
self.process.join(timeout=5.0)
if self.process.is_alive():
logger.warning(f"适配器 {self.adapter_name} 未能及时停止,强制终止中")
self.process.terminate()
self.process.join()
except Exception as e:
logger.error(f"停止适配器子进程 {self.adapter_name} 时发生错误: {e}")
finally:
@@ -193,7 +194,7 @@ class AdapterProcess:
class AdapterManager:
"""
适配器管理器
负责管理所有注册的适配器,根据 run_in_subprocess 属性自动选择:
- run_in_subprocess=True: 在子进程中运行,使用 ProcessCoreSink
- run_in_subprocess=False: 在主进程中运行,使用 InProcessCoreSink
@@ -201,9 +202,9 @@ class AdapterManager:
def __init__(self):
# 注册信息name -> (adapter class, plugin instance | None)
self._adapter_defs: Dict[str, tuple[type[BaseAdapter], object | None]] = {}
self._adapter_processes: Dict[str, AdapterProcess] = {}
self._in_process_adapters: Dict[str, BaseAdapter] = {}
self._adapter_defs: dict[str, tuple[type[BaseAdapter], object | None]] = {}
self._adapter_processes: dict[str, AdapterProcess] = {}
self._in_process_adapters: dict[str, BaseAdapter] = {}
def register_adapter(self, adapter_cls: type[BaseAdapter], plugin=None) -> None:
"""
@@ -213,15 +214,15 @@ class AdapterManager:
adapter_cls: 适配器类
plugin: 可选 Plugin 实例
"""
adapter_name = getattr(adapter_cls, 'adapter_name', adapter_cls.__name__)
adapter_name = getattr(adapter_cls, "adapter_name", adapter_cls.__name__)
if adapter_name in self._adapter_defs:
logger.warning(f"适配器 {adapter_name} 已注册,已覆盖")
self._adapter_defs[adapter_name] = (adapter_cls, plugin)
adapter_version = getattr(adapter_cls, 'adapter_version', 'unknown')
run_in_subprocess = getattr(adapter_cls, 'run_in_subprocess', False)
adapter_version = getattr(adapter_cls, "adapter_version", "unknown")
run_in_subprocess = getattr(adapter_cls, "run_in_subprocess", False)
logger.info(
f"注册适配器: {adapter_name} v{adapter_version} "
f"(子进程: {'' if run_in_subprocess else ''})"
@@ -230,7 +231,7 @@ class AdapterManager:
async def start_adapter(self, adapter_name: str) -> bool:
"""
启动指定适配器
根据适配器的 run_in_subprocess 属性自动选择:
- True: 创建子进程,使用 ProcessCoreSink
- False: 在当前进程,使用 InProcessCoreSink
@@ -239,7 +240,7 @@ class AdapterManager:
if not definition:
logger.error(f"适配器 {adapter_name} 未注册")
return False
adapter_cls, plugin = definition
run_in_subprocess = getattr(adapter_cls, "run_in_subprocess", False)
@@ -248,9 +249,9 @@ class AdapterManager:
return await self._start_adapter_in_process(adapter_name, adapter_cls, plugin)
async def _start_adapter_subprocess(
self,
adapter_name: str,
adapter_cls: type[BaseAdapter],
self,
adapter_name: str,
adapter_cls: type[BaseAdapter],
plugin
) -> bool:
"""在子进程中启动适配器(使用 ProcessCoreSink"""
@@ -263,24 +264,24 @@ class AdapterManager:
return success
async def _start_adapter_in_process(
self,
adapter_name: str,
adapter_cls: type[BaseAdapter],
self,
adapter_name: str,
adapter_cls: type[BaseAdapter],
plugin
) -> bool:
"""在当前进程中启动适配器(使用 InProcessCoreSink"""
try:
# 从 CoreSinkManager 获取 InProcessCoreSink
from src.common.core_sink_manager import get_core_sink_manager
core_sink = get_core_sink_manager().get_in_process_sink()
adapter = adapter_cls(core_sink, plugin=plugin) # type: ignore[call-arg]
await adapter.start()
self._in_process_adapters[adapter_name] = adapter
logger.info(f"适配器 {adapter_name} 已在当前进程启动")
return True
except Exception as e:
logger.error(f"启动适配器 {adapter_name} 失败: {e}")
return False
@@ -288,7 +289,7 @@ class AdapterManager:
async def stop_adapter(self, adapter_name: str) -> None:
"""
停止指定的适配器
Args:
adapter_name: 适配器名称
"""
@@ -327,20 +328,20 @@ class AdapterManager:
logger.info("所有适配器已停止")
def get_adapter(self, adapter_name: str) -> Optional[BaseAdapter]:
def get_adapter(self, adapter_name: str) -> BaseAdapter | None:
"""
获取适配器实例
Args:
adapter_name: 适配器名称
Returns:
BaseAdapter | None: 适配器实例,如果不存在则返回 None
"""
# 只返回在主进程中运行的适配器
return self._in_process_adapters.get(adapter_name)
def list_adapters(self) -> Dict[str, Dict[str, any]]:
def list_adapters(self) -> dict[str, dict[str, any]]:
"""列出适配器状态"""
result = {}
@@ -371,7 +372,7 @@ class AdapterManager:
# 全局单例
_adapter_manager: Optional[AdapterManager] = None
_adapter_manager: AdapterManager | None = None
def get_adapter_manager() -> AdapterManager:

View File

@@ -137,7 +137,7 @@ class EventManager:
return False
event.enabled = True
logger.info(f"事件 {event_name} 已启用")
logger.debug(f"事件 {event_name} 已启用")
return True
def disable_event(self, event_name: EventType | str) -> bool:
@@ -155,7 +155,7 @@ class EventManager:
return False
event.enabled = False
logger.info(f"事件 {event_name} 已禁用")
logger.debug(f"事件 {event_name} 已禁用")
return True
def register_event_handler(self, handler_class: type[BaseEventHandler], plugin_config: dict | None = None) -> bool:
@@ -198,7 +198,7 @@ class EventManager:
self._pending_subscriptions[handler_name] = failed_subscriptions
logger.warning(f"事件处理器 {handler_name} 的部分订阅失败,已缓存: {failed_subscriptions}")
logger.info(f"事件处理器 {handler_name} 注册成功")
logger.debug(f"事件处理器 {handler_name} 注册成功")
return True
def get_event_handler(self, handler_name: str) -> BaseEventHandler | None:
@@ -242,11 +242,11 @@ class EventManager:
for event in self._events.values():
# 创建订阅者列表的副本进行迭代,以安全地修改原始列表
for subscriber in list(event.subscribers):
if getattr(subscriber, 'handler_name', None) == handler_name:
if getattr(subscriber, "handler_name", None) == handler_name:
event.subscribers.remove(subscriber)
logger.debug(f"事件处理器 {handler_name} 已从事件 {event.name} 取消订阅。")
logger.info(f"事件处理器 {handler_name} 已被完全移除。")
logger.debug(f"事件处理器 {handler_name} 已被完全移除。")
return True
@@ -284,7 +284,7 @@ class EventManager:
# 按权重从高到低排序订阅者
event.subscribers.sort(key=lambda h: getattr(h, "weight", 0), reverse=True)
logger.info(f"事件处理器 {handler_name} 成功订阅到事件 {event_name},当前权重排序完成")
logger.debug(f"事件处理器 {handler_name} 成功订阅到事件 {event_name},当前权重排序完成")
return True
def unsubscribe_handler_from_event(self, handler_name: str, event_name: EventType | str) -> bool:
@@ -311,7 +311,7 @@ class EventManager:
break
if removed:
logger.info(f"事件处理器 {handler_name} 成功从事件 {event_name} 取消订阅")
logger.debug(f"事件处理器 {handler_name} 成功从事件 {event_name} 取消订阅")
else:
logger.warning(f"事件处理器 {handler_name} 未订阅事件 {event_name}")

View File

@@ -38,14 +38,14 @@ class PermissionManager(IPermissionManager):
try:
master_users_config = global_config.permission.master_users
if not isinstance(master_users_config, list):
logger.warning(f"配置文件中的 permission.master_users 不是一个列表,已跳过加载。")
logger.warning("配置文件中的 permission.master_users 不是一个列表,已跳过加载。")
self._master_users = set()
return
self._master_users = set()
for i, user_info in enumerate(master_users_config):
if not isinstance(user_info, list) or len(user_info) != 2:
logger.warning(f"Master用户配置项格式错误 (索引: {i}): {user_info},应为 [\"platform\", \"user_id\"]")
logger.warning(f'Master用户配置项格式错误 (索引: {i}): {user_info},应为 ["platform", "user_id"]')
continue
platform, user_id = user_info

View File

@@ -33,9 +33,9 @@ class PluginManager:
self.loaded_plugins: dict[str, PluginBase] = {} # 已加载的插件类实例注册表,插件名 -> 插件类实例
self.failed_plugins: dict[str, str] = {} # 记录加载失败的插件文件及其错误信息,插件名 -> 错误信息
# 核心消息接收器(由主程序设置)
self._core_sink: Optional[Any] = None
self._core_sink: Any | None = None
# 确保插件目录存在
self._ensure_plugin_directories()
@@ -45,12 +45,11 @@ class PluginManager:
def set_core_sink(self, core_sink: Any) -> None:
"""设置核心消息接收器
Args:
core_sink: 核心消息接收器实例InProcessCoreSink
"""
self._core_sink = core_sink
logger.info("已设置核心消息接收器")
def add_plugin_directory(self, directory: str) -> bool:
"""添加插件目录"""
@@ -184,7 +183,7 @@ class PluginManager:
async def _register_adapter_components(self, plugin_name: str, plugin_instance: PluginBase) -> None:
"""注册适配器组件
Args:
plugin_name: 插件名称
plugin_instance: 插件实例
@@ -193,36 +192,36 @@ class PluginManager:
from src.plugin_system.base.component_types import AdapterInfo, ComponentType
from src.plugin_system.core.adapter_manager import get_adapter_manager
from src.plugin_system.core.component_registry import component_registry
# 获取所有 ADAPTER 类型的组件
plugin_info = plugin_instance.plugin_info
adapter_components = [
comp for comp in plugin_info.components
comp for comp in plugin_info.components
if comp.component_type == ComponentType.ADAPTER
]
if not adapter_components:
return
adapter_manager = get_adapter_manager()
for comp_info in adapter_components:
# 类型检查:确保是 AdapterInfo
if not isinstance(comp_info, AdapterInfo):
logger.warning(f"组件 {comp_info.name} 不是 AdapterInfo 类型")
continue
try:
# 从组件注册表获取适配器类
adapter_class = component_registry.get_component_class(
comp_info.name,
comp_info.name,
ComponentType.ADAPTER
)
if not adapter_class:
logger.warning(f"无法找到适配器组件类: {comp_info.name}")
continue
# 创建适配器实例,传入 core_sink 和 plugin
# 注册到适配器管理器,由管理器统一在运行时创建实例
adapter_manager.register_adapter(adapter_class, plugin_instance) # type: ignore
@@ -230,13 +229,13 @@ class PluginManager:
f"插件 '{plugin_name}' 注册了适配器组件: {comp_info.name} "
f"(平台: {comp_info.platform})"
)
except Exception as e:
logger.error(
f"注册插件 '{plugin_name}' 的适配器组件 '{comp_info.name}' 时出错: {e}",
exc_info=True
)
except Exception as e:
logger.error(f"处理插件 '{plugin_name}' 的适配器组件时出错: {e}")

View File

@@ -36,7 +36,7 @@ class ToolCallRecord:
no_truncate_tools = {"web_search", "web_surfing", "knowledge_search"}
should_truncate = self.tool_name not in no_truncate_tools
max_length = 500 if should_truncate else 10000 # 联网搜索给更大的限制
if isinstance(content, str):
if len(content) > max_length:
self.result_preview = content[:max_length] + "..."
@@ -97,7 +97,7 @@ class StreamToolHistoryManager:
"average_execution_time": 0.0,
}
logger.info(f"[{chat_id}] 工具历史记录管理器初始化完成,最大历史: {max_history}")
logger.debug(f"[{chat_id}] 工具历史记录管理器初始化完成,最大历史: {max_history}")
async def add_tool_call(self, record: ToolCallRecord) -> None:
"""添加工具调用记录
@@ -141,7 +141,7 @@ class StreamToolHistoryManager:
if self.enable_memory_cache:
memory_result = self._search_memory_cache(tool_name, args)
if memory_result:
logger.info(f"[{self.chat_id}] 内存缓存命中: {tool_name}")
logger.debug(f"[{self.chat_id}] 内存缓存命中: {tool_name}")
return memory_result
# 然后检查全局缓存系统
@@ -415,7 +415,6 @@ _STREAM_MANAGERS_MAX_SIZE = 100 # 最大保留数量
def _evict_old_stream_managers() -> None:
"""内存优化:淘汰最久未使用的 stream manager"""
import time
if len(_stream_managers) < _STREAM_MANAGERS_MAX_SIZE:
return
@@ -429,14 +428,12 @@ def _evict_old_stream_managers() -> None:
evicted = []
for chat_id, _ in sorted_by_time[:evict_count]:
if chat_id in _stream_managers:
del _stream_managers[chat_id]
if chat_id in _stream_managers_last_used:
del _stream_managers_last_used[chat_id]
_stream_managers.pop(chat_id, None)
_stream_managers_last_used.pop(chat_id, None)
evicted.append(chat_id)
if evicted:
logger.info(f"🔧 StreamToolHistoryManager LRU淘汰: 释放了 {len(evicted)} 个不活跃的管理器")
logger.debug(f"StreamToolHistoryManager LRU淘汰: 释放了 {len(evicted)} 个不活跃的管理器")
def get_stream_tool_history_manager(chat_id: str) -> StreamToolHistoryManager:
@@ -466,8 +463,6 @@ def cleanup_stream_manager(chat_id: str) -> None:
Args:
chat_id: 聊天ID
"""
if chat_id in _stream_managers:
del _stream_managers[chat_id]
if chat_id in _stream_managers_last_used:
del _stream_managers_last_used[chat_id]
_stream_managers.pop(chat_id, None)
_stream_managers_last_used.pop(chat_id, None)
logger.info(f"已清理聊天 {chat_id} 的工具历史记录管理器")

View File

@@ -11,7 +11,6 @@ from src.llm_models.payload_content import ToolCall
from src.llm_models.utils_model import LLMRequest
from src.plugin_system.apis.tool_api import get_llm_available_tool_definitions, get_tool_instance
from src.plugin_system.base.base_tool import BaseTool
from src.plugin_system.core.global_announcement_manager import global_announcement_manager
from src.plugin_system.core.stream_tool_history import ToolCallRecord, get_stream_tool_history_manager
logger = get_logger("tool_use")
@@ -203,7 +202,7 @@ class ToolExecutor:
logger.debug(f"{self.log_prefix}开始LLM工具调用分析")
# 调用LLM进行工具决策
response, llm_extra_info = await self.llm_model.generate_response_async(
_response, llm_extra_info = await self.llm_model.generate_response_async(
prompt=prompt, tools=tools, raise_when_empty=False
)
@@ -412,7 +411,7 @@ class ToolExecutor:
for i, tool_call in enumerate(tool_calls)
]
async def _execute_single_tool_with_timeout(self, tool_call: ToolCall, index: int) -> ToolExecutionResult:
"""执行单个工具调用,支持超时控制

View File

@@ -26,15 +26,13 @@ class InterestService:
"""
try:
logger.info("开始初始化智能兴趣系统...")
logger.info(f"人设ID: {personality_id}, 描述长度: {len(personality_description)}")
await bot_interest_manager.initialize(personality_description, personality_id)
self.is_initialized = True
logger.info("智能兴趣系统初始化完成。")
# 显示初始化后的统计信息
stats = bot_interest_manager.get_interest_stats()
logger.info(f"兴趣系统统计: {stats}")
logger.debug(f"兴趣系统统计: {stats}")
except Exception as e:
logger.error(f"初始化智能兴趣系统失败: {e}")

View File

@@ -15,7 +15,7 @@ logger = get_logger("relationship_service")
class RelationshipService:
"""用户关系分服务 - 独立于插件的数据库直接访问层
内存优化:添加缓存大小限制和自动过期清理
"""

View File

@@ -110,9 +110,6 @@ def require_permission(permission_node: str, deny_message: str | None = None, *,
)
if not has_permission:
# 权限不足,发送拒绝消息
message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {full_permission_node}"
await text_to_stream(message, chat_stream.stream_id)
# 对于PlusCommand的execute方法需要返回适当的元组
if func.__name__ == "execute" and hasattr(args[0], "send_text"):
return False, "权限不足", True
@@ -190,8 +187,6 @@ def require_master(deny_message: str | None = None):
is_master = await permission_api.is_master(chat_stream.platform, chat_stream.user_info.user_id)
if not is_master:
message = deny_message or "❌ 此操作仅限Master用户执行"
await text_to_stream(message, chat_stream.stream_id)
if func.__name__ == "execute" and hasattr(args[0], "send_text"):
return False, "需要Master权限", True
return None
@@ -258,9 +253,7 @@ class PermissionChecker:
has_permission = await permission_api.check_permission(
chat_stream.platform, chat_stream.user_info.user_id, permission_node
)
if not has_permission:
message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {permission_node}"
await text_to_stream(message, chat_stream.stream_id)
return has_permission
@staticmethod
@@ -277,8 +270,4 @@ class PermissionChecker:
"""
is_master = await PermissionChecker.is_master(chat_stream)
if not is_master:
message = deny_message or "❌ 此操作仅限Master用户执行"
await text_to_stream(message, chat_stream.stream_id)
return is_master