diff --git a/examples/example_adapter_plugin.py b/examples/example_adapter_plugin.py new file mode 100644 index 000000000..1173365c8 --- /dev/null +++ b/examples/example_adapter_plugin.py @@ -0,0 +1,126 @@ +""" +示例:演示如何创建一个完整的适配器插件 + +这个示例展示了: +1. 如何继承 BaseAdapter 创建自定义适配器 +2. 如何在插件中集成适配器 +3. 如何支持子进程运行 +""" + +from pathlib import Path +from typing import Any, Dict, Optional + +from mofox_bus import CoreMessageSink, InProcessCoreSink, MessageEnvelope, WebSocketAdapterOptions + +from src.plugin_system.base import BaseAdapter, BasePlugin, PluginMetadata, AdapterInfo +from src.plugin_system import register_plugin + +class ExampleAdapter(BaseAdapter): + """示例适配器""" + + adapter_name = "example_adapter" + adapter_version = "1.0.0" + adapter_author = "MoFox Team" + adapter_description = "示例适配器,演示如何创建适配器插件" + platform = "example" + + # 是否在子进程中运行(设为 False 在主进程中运行) + run_in_subprocess = False + + # 子进程入口脚本(如果 run_in_subprocess=True) + subprocess_entry = "adapter_entry.py" + + def __init__(self, core_sink: CoreMessageSink, plugin: Optional[BasePlugin] = None): + """初始化适配器""" + # 配置 WebSocket 传输(如果需要) + transport = None + if plugin and plugin.config: + ws_url = plugin.config.get("websocket_url") + if ws_url: + transport = WebSocketAdapterOptions( + url=ws_url, + headers={"platform": self.platform}, + ) + + super().__init__(core_sink, plugin=plugin, transport=transport) + + def from_platform_message(self, raw: Dict[str, Any]) -> MessageEnvelope: + """ + 将平台消息转换为 MessageEnvelope + + Args: + raw: 平台原始消息 + + Returns: + MessageEnvelope: 统一消息信封 + """ + # 示例:假设平台消息格式为 + # { + # "id": "msg_123", + # "user_id": "user_456", + # "group_id": "group_789", + # "text": "Hello", + # "timestamp": 1234567890 + # } + + envelope: MessageEnvelope = { + "id": raw.get("id", "unknown"), + "direction": "incoming", + "platform": self.platform, + "timestamp_ms": int(raw.get("timestamp", 0) * 1000), + "channel": { + "channel_id": raw.get("group_id", raw.get("user_id", "unknown")), + "channel_type": "group" if "group_id" in raw else "private", + }, + "sender": { + "user_id": raw.get("user_id", "unknown"), + "role": "user", + }, + "content": { + "type": "text", + "text": raw.get("text", ""), + }, + "conversation_id": raw.get("group_id", raw.get("user_id", "unknown")), + } + + return envelope + + async def _send_platform_message(self, envelope: MessageEnvelope) -> None: + """ + 发送消息到平台 + + 如果配置了 WebSocketAdapterOptions,会自动通过 WebSocket 发送。 + 否则需要在这里实现自定义发送逻辑。 + """ + if self._transport_config: + # 使用自动传输 + await super()._send_platform_message(envelope) + else: + # 自定义发送逻辑 + # 例如:调用平台 API + pass + + +class ExampleAdapterPlugin(BasePlugin): + """示例适配器插件""" + + plugin_name = "example_adapter_plugin" + enable_plugin = True # 设为 False 禁用插件 + plugin_version = "1.0.0" + plugin_author = "MoFox Team" + + def get_plugin_components(self) -> list: + """获取插件组件列表 + + 适配器作为组件返回,插件管理器会自动创建实例并传入 core_sink + """ + return [ + # 适配器组件 - 使用 get_adapter_info() 方法 + (ExampleAdapter.get_adapter_info(), ExampleAdapter), + ] + + +# 注册插件 +def register_plugin() -> type[BasePlugin]: + """插件注册函数""" + return ExampleAdapterPlugin diff --git a/examples/mofox_bus_demo_adapter.py b/examples/mofox_bus_demo_adapter.py index 40cff4f35..c5b75b58e 100644 --- a/examples/mofox_bus_demo_adapter.py +++ b/examples/mofox_bus_demo_adapter.py @@ -21,7 +21,7 @@ import websockets sys.path.append(str(Path(__file__).resolve().parents[1] / "src")) from mofox_bus import ( - BaseAdapter, + AdapterBase, InProcessCoreSink, MessageEnvelope, MessageRuntime, @@ -86,13 +86,16 @@ class FakePlatformServer: # --------------------------------------------------------------------------- -# 2. 适配器实现:仅关注核心转换逻辑,网络层交由 BaseAdapter 管理 +# 2. 适配器实现:仅关注核心转换逻辑,网络层交由 AdapterBase 管理 # --------------------------------------------------------------------------- -class DemoWsAdapter(BaseAdapter): - platform = "demo" +class DemoWsAdapter(AdapterBase): # 继承AdapterBase + platform = "demo" # 定义平台名称 + # 实现 from_platform_message 方法,将平台消息转换为 MessageEnvelope + # 该方法必须被实现以便 AdapterBase 正确处理消息转换 + # 该方法会在adapter接收到平台消息后被调用 def from_platform_message(self, raw: Dict[str, Any]) -> MessageEnvelope: return { "id": raw["message_id"], @@ -105,7 +108,6 @@ class DemoWsAdapter(BaseAdapter): "content": {"type": "text", "text": raw["text"]}, } - def incoming_parser(raw: str | bytes) -> Any: data = orjson.loads(raw) if data.get("type") == "message": diff --git a/src/common/message/envelope_converter.py b/src/common/message/envelope_converter.py new file mode 100644 index 000000000..2e59e031b --- /dev/null +++ b/src/common/message/envelope_converter.py @@ -0,0 +1,275 @@ +""" +MessageEnvelope 转换器 + +将 mofox_bus 的 MessageEnvelope 转换为 MoFox Bot 内部使用的消息格式。 +""" + +from __future__ import annotations + +import time +from typing import Any, Dict, List, Optional + +from mofox_bus import MessageEnvelope, BaseMessageInfo, FormatInfo, GroupInfo, MessageBase, Seg, UserInfo + +from src.common.logger import get_logger + +logger = get_logger("envelope_converter") + + +class EnvelopeConverter: + """MessageEnvelope 到内部消息格式的转换器""" + + @staticmethod + def to_message_base(envelope: MessageEnvelope) -> MessageBase: + """ + 将 MessageEnvelope 转换为 MessageBase + + Args: + envelope: 统一的消息信封 + + Returns: + MessageBase: 内部消息格式 + """ + try: + # 提取基本信息 + platform = envelope["platform"] + channel = envelope["channel"] + sender = envelope["sender"] + content = envelope["content"] + + # 创建 UserInfo + user_info = UserInfo( + user_id=sender["user_id"], + user_nickname=sender.get("display_name", sender["user_id"]), + user_avatar=sender.get("avatar_url"), + ) + + # 创建 GroupInfo (如果是群组消息) + group_info: Optional[GroupInfo] = None + if channel["channel_type"] in ("group", "supergroup", "room"): + group_info = GroupInfo( + group_id=channel["channel_id"], + group_name=channel.get("title", channel["channel_id"]), + ) + + # 创建 BaseMessageInfo + message_info = BaseMessageInfo( + platform=platform, + chat_type="group" if group_info else "private", + message_id=envelope["id"], + user_info=user_info, + group_info=group_info, + timestamp=envelope["timestamp_ms"] / 1000.0, # 转换为秒 + ) + + # 转换 Content 为 Seg 列表 + segments = EnvelopeConverter._content_to_segments(content) + + # 创建 MessageBase + message_base = MessageBase( + message_info=message_info, + message=segments, + ) + + # 保存原始 envelope 到 raw 字段 + if hasattr(message_base, "raw"): + message_base.raw = envelope + + return message_base + + except Exception as e: + logger.error(f"转换 MessageEnvelope 失败: {e}", exc_info=True) + raise + + @staticmethod + def _content_to_segments(content: Dict[str, Any]) -> List[Seg]: + """ + 将 Content 转换为 Seg 列表 + + Args: + content: 消息内容 + + Returns: + List[Seg]: 消息段列表 + """ + segments: List[Seg] = [] + content_type = content.get("type") + + if content_type == "text": + # 文本消息 + text = content.get("text", "") + segments.append(Seg.text(text)) + + elif content_type == "image": + # 图片消息 + url = content.get("url", "") + file_id = content.get("file_id") + segments.append(Seg.image(url if url else file_id)) + + elif content_type == "audio": + # 音频消息 + url = content.get("url", "") + file_id = content.get("file_id") + segments.append(Seg.record(url if url else file_id)) + + elif content_type == "video": + # 视频消息 + url = content.get("url", "") + file_id = content.get("file_id") + segments.append(Seg.video(url if url else file_id)) + + elif content_type == "file": + # 文件消息 + url = content.get("url", "") + file_name = content.get("file_name", "file") + # 使用 text 表示文件(或者可以自定义一个 file seg type) + segments.append(Seg.text(f"[文件: {file_name}]")) + + elif content_type == "command": + # 命令消息 + name = content.get("name", "") + args = content.get("args", {}) + # 重构为文本格式 + cmd_text = f"/{name}" + if args: + cmd_text += " " + " ".join(f"{k}={v}" for k, v in args.items()) + segments.append(Seg.text(cmd_text)) + + elif content_type == "event": + # 事件消息 - 转换为文本表示 + event_type = content.get("event_type", "unknown") + segments.append(Seg.text(f"[事件: {event_type}]")) + + elif content_type == "system": + # 系统消息 + text = content.get("text", "") + segments.append(Seg.text(f"[系统] {text}")) + + else: + # 未知类型 - 转换为文本 + logger.warning(f"未知的消息类型: {content_type}") + segments.append(Seg.text(f"[未知消息类型: {content_type}]")) + + return segments + + @staticmethod + def to_legacy_dict(envelope: MessageEnvelope) -> Dict[str, Any]: + """ + 将 MessageEnvelope 转换为旧版字典格式(用于向后兼容) + + Args: + envelope: 统一的消息信封 + + Returns: + Dict[str, Any]: 旧版消息字典 + """ + message_base = EnvelopeConverter.to_message_base(envelope) + return message_base.to_dict() + + @staticmethod + def from_message_base(message: MessageBase, direction: str = "outgoing") -> MessageEnvelope: + """ + 将 MessageBase 转换为 MessageEnvelope (反向转换) + + Args: + message: 内部消息格式 + direction: 消息方向 ("incoming" 或 "outgoing") + + Returns: + MessageEnvelope: 统一的消息信封 + """ + try: + message_info = message.message_info + user_info = message_info.user_info + group_info = message_info.group_info + + # 创建 SenderInfo + sender = { + "user_id": user_info.user_id, + "role": "assistant" if direction == "outgoing" else "user", + } + if user_info.user_nickname: + sender["display_name"] = user_info.user_nickname + if user_info.user_avatar: + sender["avatar_url"] = user_info.user_avatar + + # 创建 ChannelInfo + if group_info: + channel = { + "channel_id": group_info.group_id, + "channel_type": "group", + } + if group_info.group_name: + channel["title"] = group_info.group_name + else: + channel = { + "channel_id": user_info.user_id, + "channel_type": "private", + } + + # 转换 segments 为 Content + content = EnvelopeConverter._segments_to_content(message.message) + + # 创建 MessageEnvelope + envelope: MessageEnvelope = { + "id": message_info.message_id, + "direction": direction, + "platform": message_info.platform, + "timestamp_ms": int(message_info.timestamp * 1000), + "channel": channel, + "sender": sender, + "content": content, + "conversation_id": group_info.group_id if group_info else user_info.user_id, + } + + return envelope + + except Exception as e: + logger.error(f"转换 MessageBase 失败: {e}", exc_info=True) + raise + + @staticmethod + def _segments_to_content(segments: List[Seg]) -> Dict[str, Any]: + """ + 将 Seg 列表转换为 Content + + Args: + segments: 消息段列表 + + Returns: + Dict[str, Any]: 消息内容 + """ + if not segments: + return {"type": "text", "text": ""} + + # 简化处理:如果有多个段,合并为文本 + if len(segments) == 1: + seg = segments[0] + + if seg.type == "text": + return {"type": "text", "text": seg.data.get("text", "")} + elif seg.type == "image": + return {"type": "image", "url": seg.data.get("file", "")} + elif seg.type == "record": + return {"type": "audio", "url": seg.data.get("file", "")} + elif seg.type == "video": + return {"type": "video", "url": seg.data.get("file", "")} + + # 多个段或未知类型 - 合并为文本 + text_parts = [] + for seg in segments: + if seg.type == "text": + text_parts.append(seg.data.get("text", "")) + elif seg.type == "image": + text_parts.append("[图片]") + elif seg.type == "record": + text_parts.append("[语音]") + elif seg.type == "video": + text_parts.append("[视频]") + else: + text_parts.append(f"[{seg.type}]") + + return {"type": "text", "text": "".join(text_parts)} + + +__all__ = ["EnvelopeConverter"] diff --git a/src/main.py b/src/main.py index acba32aa8..69ea51630 100644 --- a/src/main.py +++ b/src/main.py @@ -10,7 +10,7 @@ from functools import partial from random import choices from typing import Any -from mofox_bus import MessageServer +from mofox_bus import InProcessCoreSink, MessageEnvelope from rich.traceback import install from src.chat.emoji_system.emoji_manager import get_emoji_manager @@ -18,7 +18,7 @@ from src.chat.message_receive.bot import chat_bot from src.chat.message_receive.chat_stream import get_chat_manager from src.chat.utils.statistic import OnlineTimeRecordTask, StatisticOutputTask from src.common.logger import get_logger -from src.common.message import get_global_api +from src.common.message.envelope_converter import EnvelopeConverter # 全局背景任务集合 _background_tasks = set() @@ -76,8 +76,10 @@ class MainSystem: def __init__(self) -> None: self.individuality: Individuality = get_individuality() - # 使用消息API替代直接的FastAPI实例 - self.app: MessageServer = get_global_api() + # 创建核心消息接收器 + self.core_sink: InProcessCoreSink = InProcessCoreSink(self._handle_message_envelope) + + # 使用服务器 self.server: Server = get_global_server() # 设置信号处理器用于优雅退出 @@ -288,14 +290,15 @@ class MainSystem: cleanup_tasks.append(("服务器", self.server.shutdown())) except Exception as e: logger.error(f"准备停止服务器时出错: {e}") - - # 停止应用 + + # 停止所有适配器 try: - if self.app: - if hasattr(self.app, "stop"): - cleanup_tasks.append(("应用", self.app.stop())) + from src.plugin_system.core.adapter_manager import get_adapter_manager + + adapter_manager = get_adapter_manager() + cleanup_tasks.append(("适配器管理器", adapter_manager.stop_all_adapters())) except Exception as e: - logger.error(f"准备停止应用时出错: {e}") + logger.error(f"准备停止适配器管理器时出错: {e}") # 并行执行所有清理任务 if cleanup_tasks: @@ -371,6 +374,23 @@ class MainSystem: logger.error("在创建消息处理任务时发生严重错误:") logger.error(traceback.format_exc()) + async def _handle_message_envelope(self, envelope: MessageEnvelope) -> None: + """ + 处理来自适配器的 MessageEnvelope + + Args: + envelope: 统一的消息信封 + """ + try: + # 转换为旧版格式 + message_data = EnvelopeConverter.to_legacy_dict(envelope) + + # 使用现有的消息处理流程 + await self._message_process_wrapper(message_data) + + except Exception as e: + logger.error(f"处理 MessageEnvelope 时出错: {e}", exc_info=True) + async def initialize(self) -> None: """初始化系统组件""" # 检查必要的配置 @@ -450,6 +470,9 @@ MoFox_Bot(第三方修改版) except Exception as e: logger.error(f"统一调度器初始化失败: {e}") + # 设置核心消息接收器到插件管理器 + plugin_manager.set_core_sink(self.core_sink) + # 加载所有插件 plugin_manager.load_all_plugins() @@ -500,8 +523,8 @@ MoFox_Bot(第三方修改版) except Exception as e: logger.error(f"LPMM知识库初始化失败: {e}") - # 将消息处理函数注册到API - self.app.register_message_handler(self._message_process_wrapper) + # 消息接收器已经在 __init__ 中创建,无需再次注册 + logger.info("核心消息接收器已就绪") # 启动消息重组器 try: @@ -548,6 +571,16 @@ MoFox_Bot(第三方修改版) logger.info(f"初始化完成,神经元放电{init_time}次") except Exception as e: logger.error(f"启动事件触发失败: {e}") + + # 启动所有适配器 + try: + from src.plugin_system.core.adapter_manager import get_adapter_manager + + adapter_manager = get_adapter_manager() + await adapter_manager.start_all_adapters() + logger.info("所有适配器已启动") + except Exception as e: + logger.error(f"启动适配器失败: {e}", exc_info=True) async def _init_planning_components(self) -> None: """初始化计划相关组件""" @@ -591,7 +624,6 @@ MoFox_Bot(第三方修改版) try: tasks = [ get_emoji_manager().start_periodic_check_register(), - self.app.run(), self.server.run(), ] diff --git a/src/mofox_bus/__init__.py b/src/mofox_bus/__init__.py index 955736e8a..85a29fa55 100644 --- a/src/mofox_bus/__init__.py +++ b/src/mofox_bus/__init__.py @@ -8,7 +8,7 @@ MoFox 内部通用消息总线实现。 from . import codec, types from .adapter_utils import ( AdapterTransportOptions, - BaseAdapter, + AdapterBase, BatchDispatcher, CoreMessageSink, HttpAdapterOptions, @@ -84,7 +84,7 @@ __all__ = [ "TargetConfig", # Adapter helpers "AdapterTransportOptions", - "BaseAdapter", + "AdapterBase", "BatchDispatcher", "CoreMessageSink", "InProcessCoreSink", diff --git a/src/mofox_bus/adapter_utils.py b/src/mofox_bus/adapter_utils.py index 5c4b01be3..11c278ca5 100644 --- a/src/mofox_bus/adapter_utils.py +++ b/src/mofox_bus/adapter_utils.py @@ -48,7 +48,7 @@ class HttpAdapterOptions: AdapterTransportOptions = WebSocketAdapterOptions | HttpAdapterOptions | None -class BaseAdapter: +class AdapterBase: """ 适配器基类:负责平台原始消息与 MessageEnvelope 之间的互转。 子类需要实现平台入站解析与出站发送逻辑。 @@ -261,7 +261,7 @@ class BatchDispatcher: __all__ = [ "AdapterTransportOptions", - "BaseAdapter", + "AdapterBase", "BatchDispatcher", "CoreMessageSink", "HttpAdapterOptions", diff --git a/src/plugin_system/base/__init__.py b/src/plugin_system/base/__init__.py index 014ea4852..56ba1f7e1 100644 --- a/src/plugin_system/base/__init__.py +++ b/src/plugin_system/base/__init__.py @@ -5,6 +5,7 @@ """ from .base_action import BaseAction +from .base_adapter import BaseAdapter from .base_command import BaseCommand from .base_events_handler import BaseEventHandler from .base_http_component import BaseRouterComponent @@ -15,6 +16,7 @@ from .command_args import CommandArgs from .component_types import ( ActionActivationType, ActionInfo, + AdapterInfo, ChatMode, ChatType, CommandInfo, @@ -36,7 +38,9 @@ from .plus_command import PlusCommand, create_plus_command_adapter __all__ = [ "ActionActivationType", "ActionInfo", + "AdapterInfo", "BaseAction", + "BaseAdapter", "BaseCommand", "BaseEventHandler", "BasePlugin", @@ -56,7 +60,7 @@ __all__ = [ "PluginMetadata", # 增强命令系统 "PlusCommand", - "BaseRouterComponent" + "BaseRouterComponent", "PlusCommandInfo", "PythonDependency", "ToolInfo", diff --git a/src/plugin_system/base/base_adapter.py b/src/plugin_system/base/base_adapter.py new file mode 100644 index 000000000..6f9364f8c --- /dev/null +++ b/src/plugin_system/base/base_adapter.py @@ -0,0 +1,252 @@ +""" +插件系统 Adapter 基类 + +提供插件化的适配器支持,包装 mofox_bus.AdapterBase, +添加插件生命周期、配置管理、自动启动等特性。 +""" + +from __future__ import annotations + +import asyncio +from abc import ABC, abstractmethod +from pathlib import Path +from typing import TYPE_CHECKING, Any, Dict, Optional + +from mofox_bus import AdapterBase as MoFoxAdapterBase, CoreMessageSink, MessageEnvelope + +if TYPE_CHECKING: + from src.plugin_system.base.base_plugin import BasePlugin + +from src.common.logger import get_logger + +logger = get_logger("plugin.adapter") + + +class BaseAdapter(MoFoxAdapterBase, ABC): + """ + 插件系统的 Adapter 基类 + + 相比 mofox_bus.AdapterBase,增加了以下特性: + 1. 插件生命周期管理 (on_adapter_loaded, on_adapter_unloaded) + 2. 配置管理集成 + 3. 自动重连与健康检查 + 4. 子进程启动支持 + """ + + # 适配器元数据 + adapter_name: str = "unknown_adapter" + adapter_version: str = "0.0.1" + adapter_author: str = "Unknown" + adapter_description: str = "No description" + + # 是否在子进程中运行 + run_in_subprocess: bool = True + + # 子进程启动脚本路径(相对于插件目录) + subprocess_entry: Optional[str] = None + + def __init__( + self, + core_sink: CoreMessageSink, + plugin: Optional[BasePlugin] = None, + **kwargs + ): + """ + Args: + core_sink: 核心消息接收器 + plugin: 所属插件实例(可选) + **kwargs: 传递给 AdapterBase 的其他参数 + """ + super().__init__(core_sink, **kwargs) + self.plugin = plugin + self._config: Dict[str, Any] = {} + self._health_check_task: Optional[asyncio.Task] = None + self._running = False + + @property + def config(self) -> Dict[str, Any]: + """获取适配器配置""" + if self.plugin and hasattr(self.plugin, "config"): + return self.plugin.config + return self._config + + @config.setter + def config(self, value: Dict[str, Any]) -> None: + """设置适配器配置""" + self._config = value + + async def start(self) -> None: + """启动适配器""" + logger.info(f"启动适配器: {self.adapter_name} v{self.adapter_version}") + + # 调用生命周期钩子 + await self.on_adapter_loaded() + + # 调用父类启动 + await super().start() + + # 启动健康检查 + if self.config.get("enable_health_check", False): + self._health_check_task = asyncio.create_task(self._health_check_loop()) + + self._running = True + logger.info(f"适配器 {self.adapter_name} 启动成功") + + async def stop(self) -> None: + """停止适配器""" + logger.info(f"停止适配器: {self.adapter_name}") + + self._running = False + + # 停止健康检查 + if self._health_check_task and not self._health_check_task.done(): + self._health_check_task.cancel() + try: + await self._health_check_task + except asyncio.CancelledError: + pass + + # 调用父类停止 + await super().stop() + + # 调用生命周期钩子 + await self.on_adapter_unloaded() + + logger.info(f"适配器 {self.adapter_name} 已停止") + + async def on_adapter_loaded(self) -> None: + """ + 适配器加载时的钩子 + 子类可重写以执行初始化逻辑 + """ + pass + + async def on_adapter_unloaded(self) -> None: + """ + 适配器卸载时的钩子 + 子类可重写以执行清理逻辑 + """ + pass + + async def _health_check_loop(self) -> None: + """健康检查循环""" + interval = self.config.get("health_check_interval", 30) + + while self._running: + try: + await asyncio.sleep(interval) + + # 执行健康检查 + is_healthy = await self.health_check() + + if not is_healthy: + logger.warning(f"适配器 {self.adapter_name} 健康检查失败,尝试重连...") + await self.reconnect() + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"适配器 {self.adapter_name} 健康检查异常: {e}", exc_info=True) + + async def health_check(self) -> bool: + """ + 健康检查 + 子类可重写以实现自定义检查逻辑 + + Returns: + bool: 是否健康 + """ + # 默认检查 WebSocket 连接状态 + if self._ws and not self._ws.closed: + return True + return False + + async def reconnect(self) -> None: + """ + 重新连接 + 子类可重写以实现自定义重连逻辑 + """ + try: + await self.stop() + await asyncio.sleep(2) # 等待一段时间再重连 + await self.start() + except Exception as e: + logger.error(f"适配器 {self.adapter_name} 重连失败: {e}", exc_info=True) + + def get_subprocess_entry_path(self) -> Optional[Path]: + """ + 获取子进程启动脚本的完整路径 + + Returns: + Path | None: 脚本路径,如果不存在则返回 None + """ + if not self.subprocess_entry: + return None + + if not self.plugin: + return None + + # 获取插件目录 + plugin_dir = Path(self.plugin.__file__).parent + entry_path = plugin_dir / self.subprocess_entry + + if entry_path.exists(): + return entry_path + + logger.warning(f"子进程入口脚本不存在: {entry_path}") + return None + + @classmethod + def get_adapter_info(cls) -> "AdapterInfo": + """获取适配器的信息 + + Returns: + AdapterInfo: 适配器组件信息 + """ + from src.plugin_system.base.component_types import AdapterInfo + + return AdapterInfo( + name=getattr(cls, "adapter_name", cls.__name__.lower().replace("adapter", "")), + version=getattr(cls, "adapter_version", "1.0.0"), + platform=getattr(cls, "platform", "unknown"), + description=getattr(cls, "adapter_description", ""), + enabled=True, + run_in_subprocess=getattr(cls, "run_in_subprocess", False), + subprocess_entry=getattr(cls, "subprocess_entry", None), + ) + + @abstractmethod + def from_platform_message(self, raw: Any) -> MessageEnvelope: + """ + 将平台原始消息转换为 MessageEnvelope + + 子类必须实现此方法 + + Args: + raw: 平台原始消息 + + Returns: + MessageEnvelope: 统一的消息信封 + """ + raise NotImplementedError + + async def _send_platform_message(self, envelope: MessageEnvelope) -> None: + """ + 发送消息到平台 + + 如果使用了 WebSocketAdapterOptions 或 HttpAdapterOptions, + 此方法会自动处理。否则子类需要重写此方法。 + + Args: + envelope: 要发送的消息信封 + """ + # 如果配置了自动传输,调用父类方法 + if self._transport_config: + await super()._send_platform_message(envelope) + else: + raise NotImplementedError( + f"适配器 {self.adapter_name} 未配置自动传输,必须重写 _send_platform_message 方法" + ) + + +__all__ = ["BaseAdapter"] diff --git a/src/plugin_system/base/component_types.py b/src/plugin_system/base/component_types.py index 10d1e2ede..47970d1cf 100644 --- a/src/plugin_system/base/component_types.py +++ b/src/plugin_system/base/component_types.py @@ -54,6 +54,7 @@ class ComponentType(Enum): INTEREST_CALCULATOR = "interest_calculator" # 兴趣度计算组件 PROMPT = "prompt" # Prompt组件 ROUTER = "router" # 路由组件 + ADAPTER = "adapter" # 适配器组件 def __str__(self) -> str: return self.value @@ -148,6 +149,20 @@ class PermissionNodeField: description: str # 权限描述 +@dataclass +class AdapterInfo: + """适配器组件信息""" + + name: str # 适配器名称 + component_type: ComponentType = field(default=ComponentType.ADAPTER, init=False) + version: str = "1.0.0" # 适配器版本 + platform: str = "unknown" # 平台名称 + description: str = "" # 适配器描述 + enabled: bool = True # 是否启用 + run_in_subprocess: bool = False # 是否在子进程中运行 + subprocess_entry: str | None = None # 子进程入口脚本 + + @dataclass class ComponentInfo: """组件信息""" diff --git a/src/plugin_system/core/adapter_manager.py b/src/plugin_system/core/adapter_manager.py new file mode 100644 index 000000000..f3ee98a23 --- /dev/null +++ b/src/plugin_system/core/adapter_manager.py @@ -0,0 +1,316 @@ +""" +Adapter 管理器 + +负责管理所有注册的适配器,支持子进程自动启动和生命周期管理。 +""" + +from __future__ import annotations + +import asyncio +import subprocess +import sys +from pathlib import Path +from typing import TYPE_CHECKING, Dict, Optional + +if TYPE_CHECKING: + from src.plugin_system.base.base_adapter import BaseAdapter + +from src.common.logger import get_logger + +logger = get_logger("adapter_manager") + + +class AdapterProcess: + """适配器子进程包装器""" + + def __init__( + self, + adapter_name: str, + entry_path: Path, + python_executable: Optional[str] = None, + ): + self.adapter_name = adapter_name + self.entry_path = entry_path + self.python_executable = python_executable or sys.executable + self.process: Optional[subprocess.Popen] = None + self._monitor_task: Optional[asyncio.Task] = None + + async def start(self) -> bool: + """启动适配器子进程""" + try: + logger.info(f"启动适配器子进程: {self.adapter_name}") + logger.debug(f"Python: {self.python_executable}") + logger.debug(f"Entry: {self.entry_path}") + + # 启动子进程 + self.process = subprocess.Popen( + [self.python_executable, str(self.entry_path)], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + ) + + # 启动监控任务 + self._monitor_task = asyncio.create_task(self._monitor_process()) + + logger.info(f"适配器 {self.adapter_name} 子进程已启动 (PID: {self.process.pid})") + return True + + except Exception as e: + logger.error(f"启动适配器 {self.adapter_name} 子进程失败: {e}", exc_info=True) + return False + + async def stop(self) -> None: + """停止适配器子进程""" + if not self.process: + return + + logger.info(f"停止适配器子进程: {self.adapter_name} (PID: {self.process.pid})") + + try: + # 取消监控任务 + if self._monitor_task and not self._monitor_task.done(): + self._monitor_task.cancel() + try: + await self._monitor_task + except asyncio.CancelledError: + pass + + # 终止进程 + self.process.terminate() + + # 等待进程退出(最多等待5秒) + try: + await asyncio.wait_for( + asyncio.to_thread(self.process.wait), + timeout=5.0 + ) + except asyncio.TimeoutError: + logger.warning(f"适配器 {self.adapter_name} 未能在5秒内退出,强制终止") + self.process.kill() + await asyncio.to_thread(self.process.wait) + + logger.info(f"适配器 {self.adapter_name} 子进程已停止") + + except Exception as e: + logger.error(f"停止适配器 {self.adapter_name} 子进程时出错: {e}", exc_info=True) + finally: + self.process = None + + async def _monitor_process(self) -> None: + """监控子进程状态""" + if not self.process: + return + + try: + # 在后台线程中等待进程退出 + return_code = await asyncio.to_thread(self.process.wait) + + if return_code != 0: + logger.error( + f"适配器 {self.adapter_name} 子进程异常退出 (返回码: {return_code})" + ) + + # 读取 stderr 输出 + if self.process.stderr: + stderr = self.process.stderr.read() + if stderr: + logger.error(f"错误输出:\n{stderr}") + else: + logger.info(f"适配器 {self.adapter_name} 子进程正常退出") + + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"监控适配器 {self.adapter_name} 子进程时出错: {e}", exc_info=True) + + def is_running(self) -> bool: + """检查进程是否正在运行""" + if not self.process: + return False + return self.process.poll() is None + + +class AdapterManager: + """适配器管理器""" + + def __init__(self): + self._adapters: Dict[str, BaseAdapter] = {} + self._adapter_processes: Dict[str, AdapterProcess] = {} + self._in_process_adapters: Dict[str, BaseAdapter] = {} + + def register_adapter(self, adapter: BaseAdapter) -> None: + """ + 注册适配器 + + Args: + adapter: 要注册的适配器实例 + """ + adapter_name = adapter.adapter_name + + if adapter_name in self._adapters: + logger.warning(f"适配器 {adapter_name} 已经注册,将被覆盖") + + self._adapters[adapter_name] = adapter + logger.info(f"已注册适配器: {adapter_name} v{adapter.adapter_version}") + + async def start_adapter(self, adapter_name: str) -> bool: + """ + 启动指定的适配器 + + Args: + adapter_name: 适配器名称 + + Returns: + bool: 是否成功启动 + """ + adapter = self._adapters.get(adapter_name) + if not adapter: + logger.error(f"适配器 {adapter_name} 未注册") + return False + + # 检查是否需要在子进程中运行 + if adapter.run_in_subprocess: + return await self._start_adapter_subprocess(adapter) + else: + return await self._start_adapter_in_process(adapter) + + async def _start_adapter_subprocess(self, adapter: BaseAdapter) -> bool: + """在子进程中启动适配器""" + adapter_name = adapter.adapter_name + + # 获取子进程入口脚本 + entry_path = adapter.get_subprocess_entry_path() + if not entry_path: + logger.error( + f"适配器 {adapter_name} 配置为子进程运行,但未提供有效的入口脚本" + ) + return False + + # 创建并启动子进程 + adapter_process = AdapterProcess(adapter_name, entry_path) + success = await adapter_process.start() + + if success: + self._adapter_processes[adapter_name] = adapter_process + + return success + + async def _start_adapter_in_process(self, adapter: BaseAdapter) -> bool: + """在主进程中启动适配器""" + adapter_name = adapter.adapter_name + + try: + await adapter.start() + self._in_process_adapters[adapter_name] = adapter + logger.info(f"适配器 {adapter_name} 已在主进程中启动") + return True + except Exception as e: + logger.error(f"在主进程中启动适配器 {adapter_name} 失败: {e}", exc_info=True) + return False + + async def stop_adapter(self, adapter_name: str) -> None: + """ + 停止指定的适配器 + + Args: + adapter_name: 适配器名称 + """ + # 检查是否在子进程中运行 + if adapter_name in self._adapter_processes: + adapter_process = self._adapter_processes.pop(adapter_name) + await adapter_process.stop() + + # 检查是否在主进程中运行 + if adapter_name in self._in_process_adapters: + adapter = self._in_process_adapters.pop(adapter_name) + try: + await adapter.stop() + logger.info(f"适配器 {adapter_name} 已从主进程中停止") + except Exception as e: + logger.error(f"停止适配器 {adapter_name} 时出错: {e}", exc_info=True) + + async def start_all_adapters(self) -> None: + """启动所有注册的适配器""" + logger.info(f"开始启动 {len(self._adapters)} 个适配器...") + + for adapter_name in list(self._adapters.keys()): + await self.start_adapter(adapter_name) + + async def stop_all_adapters(self) -> None: + """停止所有适配器""" + logger.info("停止所有适配器...") + + # 停止所有子进程适配器 + for adapter_name in list(self._adapter_processes.keys()): + await self.stop_adapter(adapter_name) + + # 停止所有主进程适配器 + for adapter_name in list(self._in_process_adapters.keys()): + await self.stop_adapter(adapter_name) + + logger.info("所有适配器已停止") + + def get_adapter(self, adapter_name: str) -> Optional[BaseAdapter]: + """ + 获取适配器实例 + + Args: + adapter_name: 适配器名称 + + Returns: + BaseAdapter | None: 适配器实例,如果不存在则返回 None + """ + # 只返回在主进程中运行的适配器 + return self._in_process_adapters.get(adapter_name) + + def list_adapters(self) -> Dict[str, Dict[str, any]]: + """ + 列出所有适配器的状态 + + Returns: + Dict: 适配器状态信息 + """ + result = {} + + for adapter_name, adapter in self._adapters.items(): + status = { + "name": adapter_name, + "version": adapter.adapter_version, + "platform": adapter.platform, + "run_in_subprocess": adapter.run_in_subprocess, + "running": False, + "location": "unknown", + } + + # 检查运行状态 + if adapter_name in self._adapter_processes: + process = self._adapter_processes[adapter_name] + status["running"] = process.is_running() + status["location"] = "subprocess" + if process.process: + status["pid"] = process.process.pid + + elif adapter_name in self._in_process_adapters: + status["running"] = True + status["location"] = "in-process" + + result[adapter_name] = status + + return result + + +# 全局单例 +_adapter_manager: Optional[AdapterManager] = None + + +def get_adapter_manager() -> AdapterManager: + """获取适配器管理器单例""" + global _adapter_manager + if _adapter_manager is None: + _adapter_manager = AdapterManager() + return _adapter_manager + + +__all__ = ["AdapterManager", "AdapterProcess", "get_adapter_manager"] diff --git a/src/plugin_system/core/plugin_manager.py b/src/plugin_system/core/plugin_manager.py index 2548326a9..62479a66e 100644 --- a/src/plugin_system/core/plugin_manager.py +++ b/src/plugin_system/core/plugin_manager.py @@ -33,6 +33,9 @@ class PluginManager: self.loaded_plugins: dict[str, PluginBase] = {} # 已加载的插件类实例注册表,插件名 -> 插件类实例 self.failed_plugins: dict[str, str] = {} # 记录加载失败的插件文件及其错误信息,插件名 -> 错误信息 + + # 核心消息接收器(由主程序设置) + self._core_sink: Optional[Any] = None # 确保插件目录存在 self._ensure_plugin_directories() @@ -40,6 +43,15 @@ class PluginManager: # === 插件目录管理 === + def set_core_sink(self, core_sink: Any) -> None: + """设置核心消息接收器 + + Args: + core_sink: 核心消息接收器实例(InProcessCoreSink) + """ + self._core_sink = core_sink + logger.info("已设置核心消息接收器") + def add_plugin_directory(self, directory: str) -> bool: """添加插件目录""" if os.path.exists(directory): @@ -151,6 +163,11 @@ class PluginManager: except Exception as e: logger.error(f"调用插件 '{plugin_name}' 的 on_plugin_loaded 钩子时出错: {e}") + # 检查并注册适配器组件 + task = asyncio.create_task(self._register_adapter_components(plugin_name, plugin_instance)) + _background_tasks.add(task) + task.add_done_callback(_background_tasks.discard) + return True, 1 else: self.failed_plugins[plugin_name] = "插件注册失败" @@ -165,6 +182,74 @@ class PluginManager: logger.debug("详细错误信息: ", exc_info=True) return False, 1 + async def _register_adapter_components(self, plugin_name: str, plugin_instance: PluginBase) -> None: + """注册适配器组件 + + Args: + plugin_name: 插件名称 + plugin_instance: 插件实例 + """ + try: + from src.plugin_system.base.component_types import AdapterInfo, ComponentType + from src.plugin_system.core.adapter_manager import get_adapter_manager + from src.plugin_system.core.component_registry import component_registry + + # 获取所有 ADAPTER 类型的组件 + plugin_info = plugin_instance.plugin_info + adapter_components = [ + comp for comp in plugin_info.components + if comp.component_type == ComponentType.ADAPTER + ] + + if not adapter_components: + return + + adapter_manager = get_adapter_manager() + + for comp_info in adapter_components: + # 类型检查:确保是 AdapterInfo + if not isinstance(comp_info, AdapterInfo): + logger.warning(f"组件 {comp_info.name} 不是 AdapterInfo 类型") + continue + + try: + # 从组件注册表获取适配器类 + adapter_class = component_registry.get_component_class( + comp_info.name, + ComponentType.ADAPTER + ) + + if not adapter_class: + logger.warning(f"无法找到适配器组件类: {comp_info.name}") + continue + + # 创建适配器实例,传入 core_sink 和 plugin + if self._core_sink is not None: + adapter_instance = adapter_class(self._core_sink, plugin=plugin_instance) # type: ignore + else: + logger.warning( + f"适配器 '{comp_info.name}' 未获得 core_sink," + "请在主程序中调用 plugin_manager.set_core_sink()" + ) + # 尝试无参数创建(某些适配器可能不需要 core_sink) + adapter_instance = adapter_class(plugin=plugin_instance) # type: ignore + + # 注册到适配器管理器 + adapter_manager.register_adapter(adapter_instance) # type: ignore + logger.info( + f"插件 '{plugin_name}' 注册了适配器组件: {comp_info.name} " + f"(平台: {comp_info.platform})" + ) + + except Exception as e: + logger.error( + f"注册插件 '{plugin_name}' 的适配器组件 '{comp_info.name}' 时出错: {e}", + exc_info=True + ) + + except Exception as e: + logger.error(f"处理插件 '{plugin_name}' 的适配器组件时出错: {e}", exc_info=True) + async def remove_registered_plugin(self, plugin_name: str) -> bool: """ 禁用插件模块 diff --git a/src/plugins/phi_plugin/README.md b/src/plugins/phi_plugin/README.md deleted file mode 100644 index 7c4ec0ab9..000000000 --- a/src/plugins/phi_plugin/README.md +++ /dev/null @@ -1,110 +0,0 @@ -# Phi Plugin for MoFox_Bot - -基于MoFox_Bot插件系统的Phigros查分插件,移植自原phi-plugin项目。 - -## 插件化进展 - -### ✅ 已完成 -1. **基础架构搭建** - - 创建了完整的插件目录结构 - - 实现了_manifest.json和config.toml配置文件 - - 建立了MoFox_Bot插件系统兼容的基础框架 - -2. **命令系统迁移** - - 实现了5个核心命令的PlusCommand适配: - - `phi help` - 帮助命令 - - `phi bind` - sessionToken绑定命令 - - `phi b30` - Best30查询命令 - - `phi info` - 个人信息查询命令 - - `phi score` - 单曲成绩查询命令 - -3. **数据管理模块** - - 创建了PhiDataManager用于数据处理 - - 创建了PhiDatabaseManager用于数据库操作 - - 设计了统一的数据访问接口 - -4. **配置与元数据** - - 符合MoFox_Bot规范的manifest文件 - - 支持功能开关的配置文件 - - 完整的插件依赖管理 - -### 🚧 待实现 -1. **核心功能逻辑** - - Phigros API调用实现 - - sessionToken验证逻辑 - - 存档数据解析处理 - - B30等数据计算算法 - -2. **数据存储** - - 用户token数据库存储 - - 曲库数据导入 - - 别名系统迁移 - -3. **图片生成** - - B30成绩图片生成 - - 个人信息卡片生成 - - 单曲成绩展示图 - -4. **高级功能** - - 更多原phi-plugin命令迁移 - - 数据缓存优化 - - 性能监控 - -## 目录结构 - -``` -src/plugins/phi_plugin/ -├── __init__.py # 插件初始化 -├── plugin.py # 主插件文件 -├── _manifest.json # 插件元数据 -├── config.toml # 插件配置 -├── README.md # 本文档 -├── commands/ # 命令实现 -│ ├── __init__.py -│ ├── phi_help.py # 帮助命令 -│ ├── phi_bind.py # 绑定命令 -│ ├── phi_b30.py # B30查询 -│ ├── phi_info.py # 信息查询 -│ └── phi_score.py # 单曲成绩 -├── utils/ # 工具模块 -│ ├── __init__.py -│ └── data_manager.py # 数据管理器 -├── data/ # 数据文件 -└── static/ # 静态资源 -``` - -## 使用方式 - -### 命令列表 -- `/phi help` - 查看帮助 -- `/phi bind ` - 绑定sessionToken -- `/phi b30` - 查询Best30成绩 -- `/phi info [1|2]` - 查询个人信息 -- `/phi score <曲名>` - 查询单曲成绩 - -### 配置说明 -编辑 `config.toml` 文件可以调整: -- 插件启用状态 -- API相关设置 -- 功能开关 - -## 技术特点 - -1. **架构兼容**:完全符合MoFox_Bot插件系统规范 -2. **命令适配**:使用PlusCommand系统,支持别名和参数解析 -3. **模块化设计**:清晰的模块分离,便于维护和扩展 -4. **异步处理**:全面使用async/await进行异步处理 -5. **错误处理**:完善的异常处理和用户提示 - -## 开发说明 - -目前插件已完成基础架构搭建,可以在MoFox_Bot中正常加载和注册命令。 - -下一步开发重点: -1. 实现Phigros API调用逻辑 -2. 完成数据库存储功能 -3. 移植原插件的核心算法 -4. 实现图片生成功能 - -## 原始项目 -基于 [phi-plugin](https://github.com/Catrong/phi-plugin) 进行插件化改造。