feat: 添加插件配置支持,优化适配器和组件初始化

This commit is contained in:
Windpicker-owo
2025-11-22 13:24:09 +08:00
parent 7c579e6ee4
commit b08c70dfa6
10 changed files with 102 additions and 152 deletions

View File

@@ -1,126 +0,0 @@
"""
示例:演示如何创建一个完整的适配器插件
这个示例展示了:
1. 如何继承 BaseAdapter 创建自定义适配器
2. 如何在插件中集成适配器
3. 如何支持子进程运行
"""
from pathlib import Path
from typing import Any, Dict, Optional
from mofox_bus import CoreMessageSink, InProcessCoreSink, MessageEnvelope, WebSocketAdapterOptions
from src.plugin_system.base import BaseAdapter, BasePlugin, PluginMetadata, AdapterInfo
from src.plugin_system import register_plugin
class ExampleAdapter(BaseAdapter):
"""示例适配器"""
adapter_name = "example_adapter"
adapter_version = "1.0.0"
adapter_author = "MoFox Team"
adapter_description = "示例适配器,演示如何创建适配器插件"
platform = "example"
# 是否在子进程中运行(设为 False 在主进程中运行)
run_in_subprocess = False
# 子进程入口脚本(如果 run_in_subprocess=True
subprocess_entry = "adapter_entry.py"
def __init__(self, core_sink: CoreMessageSink, plugin: Optional[BasePlugin] = None):
"""初始化适配器"""
# 配置 WebSocket 传输(如果需要)
transport = None
if plugin and plugin.config:
ws_url = plugin.config.get("websocket_url")
if ws_url:
transport = WebSocketAdapterOptions(
url=ws_url,
headers={"platform": self.platform},
)
super().__init__(core_sink, plugin=plugin, transport=transport)
def from_platform_message(self, raw: Dict[str, Any]) -> MessageEnvelope:
"""
将平台消息转换为 MessageEnvelope
Args:
raw: 平台原始消息
Returns:
MessageEnvelope: 统一消息信封
"""
# 示例:假设平台消息格式为
# {
# "id": "msg_123",
# "user_id": "user_456",
# "group_id": "group_789",
# "text": "Hello",
# "timestamp": 1234567890
# }
envelope: MessageEnvelope = {
"id": raw.get("id", "unknown"),
"direction": "incoming",
"platform": self.platform,
"timestamp_ms": int(raw.get("timestamp", 0) * 1000),
"channel": {
"channel_id": raw.get("group_id", raw.get("user_id", "unknown")),
"channel_type": "group" if "group_id" in raw else "private",
},
"sender": {
"user_id": raw.get("user_id", "unknown"),
"role": "user",
},
"content": {
"type": "text",
"text": raw.get("text", ""),
},
"conversation_id": raw.get("group_id", raw.get("user_id", "unknown")),
}
return envelope
async def _send_platform_message(self, envelope: MessageEnvelope) -> None:
"""
发送消息到平台
如果配置了 WebSocketAdapterOptions会自动通过 WebSocket 发送。
否则需要在这里实现自定义发送逻辑。
"""
if self._transport_config:
# 使用自动传输
await super()._send_platform_message(envelope)
else:
# 自定义发送逻辑
# 例如:调用平台 API
pass
class ExampleAdapterPlugin(BasePlugin):
"""示例适配器插件"""
plugin_name = "example_adapter_plugin"
enable_plugin = True # 设为 False 禁用插件
plugin_version = "1.0.0"
plugin_author = "MoFox Team"
def get_plugin_components(self) -> list:
"""获取插件组件列表
适配器作为组件返回,插件管理器会自动创建实例并传入 core_sink
"""
return [
# 适配器组件 - 使用 get_adapter_info() 方法
(ExampleAdapter.get_adapter_info(), ExampleAdapter),
]
# 注册插件
def register_plugin() -> type[BasePlugin]:
"""插件注册函数"""
return ExampleAdapterPlugin

View File

@@ -109,14 +109,15 @@ class BaseAction(ABC):
action_message: 消息数据 action_message: 消息数据
**kwargs: 其他参数 **kwargs: 其他参数
""" """
if plugin_config is None:
plugin_config: ClassVar = {}
self.action_data = action_data self.action_data = action_data
self.reasoning = reasoning self.reasoning = reasoning
self.cycle_timers = cycle_timers self.cycle_timers = cycle_timers
self.thinking_id = thinking_id self.thinking_id = thinking_id
self.log_prefix = log_prefix self.log_prefix = log_prefix
if plugin_config is None:
plugin_config = getattr(self.__class__, "plugin_config", {})
self.plugin_config = plugin_config or {} self.plugin_config = plugin_config or {}
"""对应的插件配置""" """对应的插件配置"""

View File

@@ -75,6 +75,17 @@ class BaseAdapter(MoFoxAdapterBase, ABC):
"""设置适配器配置""" """设置适配器配置"""
self._config = value self._config = value
def get_config(self, key: str, default: Any = None) -> Any:
"""获取适配器配置,优先使用插件配置,其次使用内部配置。"""
current = self.config or {}
for part in key.split("."):
if isinstance(current, dict) and part in current:
current = current[part]
else:
return default
return current
async def start(self) -> None: async def start(self) -> None:
"""启动适配器""" """启动适配器"""
logger.info(f"启动适配器: {self.adapter_name} v{self.adapter_version}") logger.info(f"启动适配器: {self.adapter_name} v{self.adapter_version}")

View File

@@ -12,29 +12,34 @@ if TYPE_CHECKING:
class BaseChatter(ABC): class BaseChatter(ABC):
chatter_name: str = "" chatter_name: str = ""
"""Chatter组件名称""" """Chatter组件名称"""
chatter_description: str = "" chatter_description: str = ""
"""Chatter组件描述""" """Chatter组件描述"""
chat_types: ClassVar[list[ChatType]] = [ChatType.PRIVATE, ChatType.GROUP] chat_types: ClassVar[list[ChatType]] = [ChatType.PRIVATE, ChatType.GROUP]
def __init__(self, stream_id: str, action_manager: "ChatterActionManager"): def __init__(self, stream_id: str, action_manager: "ChatterActionManager", plugin_config: dict | None = None):
""" """
初始化聊天处理器 初始化聊天处理器
Args: Args:
stream_id: 聊天流ID stream_id: 聊天流ID
action_manager: 动作管理器 action_manager: 动作管理器
plugin_config: 插件配置字典
""" """
self.stream_id = stream_id self.stream_id = stream_id
self.action_manager = action_manager self.action_manager = action_manager
if plugin_config is None:
plugin_config = getattr(self.__class__, "plugin_config", {})
self.plugin_config = plugin_config or {}
@abstractmethod @abstractmethod
async def execute(self, context: StreamContext) -> dict: async def execute(self, context: StreamContext) -> dict:
""" """
执行聊天处理流程 执行聊天处理逻辑
Args: Args:
context: StreamContext对象包含聊天流的所有消息信息 context: StreamContext对象包含聊天上下文信息
Returns: Returns:
处理结果字典 处理结果字典
@@ -43,9 +48,9 @@ class BaseChatter(ABC):
@classmethod @classmethod
def get_chatter_info(cls) -> "ChatterInfo": def get_chatter_info(cls) -> "ChatterInfo":
"""从类属性生成ChatterInfo """构造并返回ChatterInfo
Returns: Returns:
ChatterInfo对象 ChatterInfo实例
""" """
return ChatterInfo( return ChatterInfo(
@@ -54,3 +59,16 @@ class BaseChatter(ABC):
chat_type_allow=cls.chat_types[0], chat_type_allow=cls.chat_types[0],
component_type=ComponentType.CHATTER, component_type=ComponentType.CHATTER,
) )
def get_config(self, key: str, default=None):
"""获取插件配置,支持嵌套键"""
if not self.plugin_config:
return default
current = self.plugin_config
for part in key.split("."):
if isinstance(current, dict) and part in current:
current = current[part]
else:
return default
return current

View File

@@ -7,34 +7,52 @@ from .component_types import ComponentType, RouterInfo
class BaseRouterComponent(ABC): class BaseRouterComponent(ABC):
""" """
用于暴露HTTP端点的组件基类。 对外暴露HTTP接口的基类。
插件开发者应继承类,并实现 register_endpoints 方法来定义API路由。 插件路由类应继承类,并实现 register_endpoints 方法注册API路由。
""" """
# 组件元数据,由插件管理器读取
# 基本元数据,可由插件类读取
component_name: str component_name: str
component_description: str component_description: str
component_version: str = "1.0.0" component_version: str = "1.0.0"
# 每个组件实例都会管理自己的APIRouter # 每个路由实例都拥有自己的 APIRouter
router: APIRouter router: APIRouter
def __init__(self): def __init__(self, plugin_config: dict | None = None):
if plugin_config is None:
plugin_config = getattr(self.__class__, "plugin_config", {})
self.plugin_config = plugin_config or {}
self.router = APIRouter() self.router = APIRouter()
self.register_endpoints() self.register_endpoints()
@abstractmethod @abstractmethod
def register_endpoints(self) -> None: def register_endpoints(self) -> None:
""" """
【开发者必须实现】 子类需要实现的方法。
在此方法中定义所有HTTP端点 在此方法中定义插件的HTTP接口
""" """
pass pass
@classmethod @classmethod
def get_router_info(cls) -> "RouterInfo": def get_router_info(cls) -> "RouterInfo":
"""从类属性生成RouterInfo""" """构造 RouterInfo"""
return RouterInfo( return RouterInfo(
name=cls.component_name, name=cls.component_name,
description=getattr(cls, "component_description", "路由组件"), description=getattr(cls, "component_description", "路由组件"),
component_type=ComponentType.ROUTER, component_type=ComponentType.ROUTER,
) )
def get_config(self, key: str, default=None):
"""获取插件配置值,支持嵌套键"""
if not self.plugin_config:
return default
current = self.plugin_config
for part in key.split("."):
if isinstance(current, dict) and part in current:
current = current[part]
else:
return default
return current

View File

@@ -5,7 +5,7 @@
import time import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import TYPE_CHECKING from typing import TYPE_CHECKING, Any
if TYPE_CHECKING: if TYPE_CHECKING:
from src.common.data_models.database_data_model import DatabaseMessages from src.common.data_models.database_data_model import DatabaseMessages
@@ -79,12 +79,16 @@ class BaseInterestCalculator(ABC):
component_description: str = "" component_description: str = ""
enabled_by_default: bool = True # 是否默认启用 enabled_by_default: bool = True # 是否默认启用
def __init__(self): def __init__(self, plugin_config: dict | None = None):
self._enabled = False self._enabled = False
self._last_calculation_time = 0.0 self._last_calculation_time = 0.0
self._total_calculations = 0 self._total_calculations = 0
self._failed_calculations = 0 self._failed_calculations = 0
self._average_calculation_time = 0.0 self._average_calculation_time = 0.0
if plugin_config is None:
plugin_config = getattr(self.__class__, "plugin_config", {})
self.plugin_config = plugin_config or {}
# 验证必须定义的属性 # 验证必须定义的属性
if not self.component_name: if not self.component_name:
@@ -193,6 +197,19 @@ class BaseInterestCalculator(ABC):
self._update_statistics(result) self._update_statistics(result)
return result return result
def get_config(self, key: str, default: Any = None) -> Any:
"""获取插件配置,支持嵌套键访问"""
if not self.plugin_config:
return default
current = self.plugin_config
for part in key.split("."):
if isinstance(current, dict) and part in current:
current = current[part]
else:
return default
return current
@classmethod @classmethod
def get_interest_calculator_info(cls) -> "InterestCalculatorInfo": def get_interest_calculator_info(cls) -> "InterestCalculatorInfo":
"""从类属性生成InterestCalculatorInfo """从类属性生成InterestCalculatorInfo

View File

@@ -34,7 +34,9 @@ class BasePrompt(ABC):
injection_point: str | list[str] | None = None injection_point: str | list[str] | None = None
"""[已废弃] 要注入的目标Prompt名称或列表请使用 injection_rules""" """[已废弃] 要注入的目标Prompt名称或列表请使用 injection_rules"""
def __init__(self, params: PromptParameters, plugin_config: dict | None = None, target_prompt_name: str | None = None): def __init__(
self, params: PromptParameters, plugin_config: dict | None = None, target_prompt_name: str | None = None
):
"""初始化Prompt组件 """初始化Prompt组件
Args: Args:
@@ -43,6 +45,9 @@ class BasePrompt(ABC):
target_prompt_name: 在应用注入时,当前注入的目标提示词名称。 target_prompt_name: 在应用注入时,当前注入的目标提示词名称。
""" """
self.params = params self.params = params
if plugin_config is None:
plugin_config = getattr(self.__class__, "plugin_config", {})
self.plugin_config = plugin_config or {} self.plugin_config = plugin_config or {}
self.target_prompt_name = target_prompt_name self.target_prompt_name = target_prompt_name
self.log_prefix = "[PromptComponent]" self.log_prefix = "[PromptComponent]"

View File

@@ -48,6 +48,9 @@ class BaseTool(ABC):
"""子工具列表,格式为[(子工具名, 子工具描述, 子工具参数)]。仅在二步工具中使用""" """子工具列表,格式为[(子工具名, 子工具描述, 子工具参数)]。仅在二步工具中使用"""
def __init__(self, plugin_config: dict | None = None, chat_stream: Any = None): def __init__(self, plugin_config: dict | None = None, chat_stream: Any = None):
if plugin_config is None:
plugin_config = getattr(self.__class__, "plugin_config", {})
self.plugin_config = plugin_config or {} # 直接存储插件配置字典 self.plugin_config = plugin_config or {} # 直接存储插件配置字典
self.chat_stream = chat_stream # 存储聊天流信息,可用于获取上下文 self.chat_stream = chat_stream # 存储聊天流信息,可用于获取上下文
@@ -205,7 +208,7 @@ class BaseTool(ABC):
"""直接执行工具函数(供插件调用) """直接执行工具函数(供插件调用)
通过该方法,插件可以直接调用工具,而不需要传入字典格式的参数 通过该方法,插件可以直接调用工具,而不需要传入字典格式的参数
插件可以直接调用此方法,用更加明了的方式传入参数 插件可以直接调用此方法,用更加明了的方式传入参数
示例: result = await tool.direct_execute(arg1="参数",arg2="参数2") 示例: result = await tool.direct_execute(arg1=\"参数\",arg2=\"参数2\")
工具开发者可以重写此方法以实现与llm调用差异化的执行逻辑 工具开发者可以重写此方法以实现与llm调用差异化的执行逻辑
@@ -226,7 +229,7 @@ class BaseTool(ABC):
"""获取插件配置值,使用嵌套键访问 """获取插件配置值,使用嵌套键访问
Args: Args:
key: 配置键名,使用嵌套访问如 "section.subsection.key" key: 配置键名,使用嵌套访问如 \"section.subsection.key\"
default: 默认值 default: 默认值
Returns: Returns:

View File

@@ -60,6 +60,9 @@ class PlusCommand(ABC):
message: 接收到的消息对象DatabaseMessages message: 接收到的消息对象DatabaseMessages
plugin_config: 插件配置字典 plugin_config: 插件配置字典
""" """
if plugin_config is None:
plugin_config = getattr(self.__class__, "plugin_config", {})
self.message = message self.message = message
self.plugin_config = plugin_config or {} self.plugin_config = plugin_config or {}
self.log_prefix = "[PlusCommand]" self.log_prefix = "[PlusCommand]"

View File

@@ -341,11 +341,9 @@ class ComponentRegistry:
if not hasattr(self, "_enabled_interest_calculator_registry"): if not hasattr(self, "_enabled_interest_calculator_registry"):
self._enabled_interest_calculator_registry: dict[str, type["BaseInterestCalculator"]] = {} self._enabled_interest_calculator_registry: dict[str, type["BaseInterestCalculator"]] = {}
setattr(interest_calculator_class, "plugin_name", interest_calculator_info.plugin_name) _assign_plugin_attrs(
# 设置插件配置
setattr(
interest_calculator_class, interest_calculator_class,
"plugin_config", interest_calculator_info.plugin_name,
self.get_plugin_config(interest_calculator_info.plugin_name) or {}, self.get_plugin_config(interest_calculator_info.plugin_name) or {},
) )
self._interest_calculator_registry[calculator_name] = interest_calculator_class self._interest_calculator_registry[calculator_name] = interest_calculator_class
@@ -394,6 +392,8 @@ class ComponentRegistry:
router_name = router_info.name router_name = router_info.name
plugin_name = router_info.plugin_name plugin_name = router_info.plugin_name
plugin_config = self.get_plugin_config(plugin_name) or {}
_assign_plugin_attrs(router_class, plugin_name, plugin_config)
# 2. 实例化组件以触发其 __init__ 和 register_endpoints # 2. 实例化组件以触发其 __init__ 和 register_endpoints
component_instance = router_class() component_instance = router_class()