feat:将旧版示例插件更新,更新mute插件(tts,vtb,doubaopic持续炸裂中)

This commit is contained in:
SengokuCola
2025-06-10 23:36:45 +08:00
parent 9f90b2568c
commit 6455dab5b8
45 changed files with 1657 additions and 2001 deletions

View File

@@ -366,6 +366,12 @@ class ActionManager:
logger.error(f"未找到插件Action组件类: {action_name}")
return None
# 获取插件配置
component_info = component_registry.get_component_info(action_name)
plugin_config = None
if component_info and component_info.plugin_name:
plugin_config = component_registry.get_plugin_config(component_info.plugin_name)
# 创建插件Action实例
plugin_action_instance = component_class(
action_data=action_data,
@@ -377,6 +383,7 @@ class ActionManager:
replyer=replyer,
observations=observations,
log_prefix=log_prefix,
plugin_config=plugin_config,
)
# 创建兼容性包装器

View File

@@ -335,7 +335,7 @@ class DefaultReplyer:
chat_talking_prompt = build_readable_messages(
message_list_before_now,
replace_bot_name=True,
merge_messages=True,
merge_messages=False,
timestamp_mode="normal_no_YMD",
read_mark=0.0,
truncate=True,

View File

@@ -59,10 +59,13 @@ class ChatBot:
# 使用新的组件注册中心查找命令
command_result = component_registry.find_command_by_text(text)
if command_result:
command_class, matched_groups = command_result
command_class, matched_groups, intercept_message, plugin_name = command_result
# 获取插件配置
plugin_config = component_registry.get_plugin_config(plugin_name)
# 创建命令实例
command_instance = command_class(message)
command_instance = command_class(message, plugin_config)
command_instance.set_matched_groups(matched_groups)
try:
@@ -71,11 +74,12 @@ class ChatBot:
# 记录命令执行结果
if success:
logger.info(f"命令执行成功: {command_class.__name__}")
logger.info(f"命令执行成功: {command_class.__name__} (拦截: {intercept_message})")
else:
logger.warning(f"命令执行失败: {command_class.__name__} - {response}")
return True, response, False # 找到命令,不继续处理
# 根据命令的拦截设置决定是否继续处理消息
return True, response, not intercept_message # 找到命令根据intercept_message决定是否继续
except Exception as e:
logger.error(f"执行命令时出错: {command_class.__name__} - {e}")
@@ -88,7 +92,8 @@ class ChatBot:
except Exception as send_error:
logger.error(f"发送错误消息失败: {send_error}")
return True, str(e), False # 命令出错,不继续处理
# 命令出错时,根据命令的拦截设置决定是否继续处理消息
return True, str(e), not intercept_message
# 没有找到命令,继续处理消息
return False, None, True

View File

@@ -20,9 +20,6 @@ from rich.traceback import install
from .chat.focus_chat.expressors.exprssion_learner import expression_learner
from .api.main import start_api_server
# 导入actions模块确保装饰器被执行
import src.chat.actions.default_actions # noqa
# 导入新的插件管理器
from src.plugin_system.core.plugin_manager import plugin_manager
@@ -82,8 +79,8 @@ class MainSystem:
logger.success("API服务器启动成功")
# 加载所有actions包括默认的和插件的
self._load_all_actions()
logger.success("动作系统加载成功")
plugin_count, component_count = plugin_manager.load_all_plugins()
logger.success(f"插件系统加载成功: {plugin_count} 个插件,{component_count} 个组件")
# 初始化表情管理器
emoji_manager.initialize()
@@ -138,18 +135,6 @@ class MainSystem:
logger.error(f"启动大脑和外部世界失败: {e}")
raise
def _load_all_actions(self):
"""加载所有actions和commands使用新的插件系统"""
try:
# 使用新的插件管理器加载所有插件
plugin_count, component_count = plugin_manager.load_all_plugins()
logger.success(f"插件系统加载成功: {plugin_count} 个插件,{component_count} 个组件")
except Exception as e:
logger.error(f"加载插件失败: {e}")
logger.error(traceback.format_exc())
async def schedule_tasks(self):
"""调度定时任务"""
while True:

View File

@@ -26,6 +26,34 @@ class ConfigAPI:
"""
return global_config.get(key, default)
def get_config(self, key: str, default: Any = None) -> Any:
"""
从插件配置中获取值,支持嵌套键访问
Args:
key: 配置键名,支持嵌套访问如 "section.subsection.key"
default: 如果配置不存在时返回的默认值
Returns:
Any: 配置值或默认值
"""
# 获取插件配置
plugin_config = getattr(self, '_plugin_config', {})
if not plugin_config:
return default
# 支持嵌套键访问
keys = key.split('.')
current = plugin_config
for k in keys:
if isinstance(current, dict) and k in current:
current = current[k]
else:
return default
return current
async def get_user_id_by_person_name(self, person_name: str) -> tuple[str, str]:
"""根据用户名获取用户ID

View File

@@ -160,201 +160,7 @@ class MessageAPI:
message_type="text", content=text, platform=platform, target_id=user_id, is_group=False
)
async def send_message(self, type: str, data: str, target: Optional[str] = "", display_message: str = "") -> bool:
"""发送消息的简化方法
Args:
type: 消息类型,如"text""image"
data: 消息内容
target: 目标消息(可选)
display_message: 显示的消息内容(可选)
Returns:
bool: 是否发送成功
"""
try:
# 安全获取服务和日志前缀
services = getattr(self, "_services", {})
log_prefix = getattr(self, "log_prefix", "[MessageAPI]")
expressor: DefaultExpressor = services.get("expressor")
chat_stream: ChatStream = services.get("chat_stream")
if not expressor or not chat_stream:
logger.error(f"{log_prefix} 无法发送消息:缺少必要的内部服务")
return False
# 获取锚定消息(如果有)
observations = services.get("observations", [])
if len(observations) > 0:
chatting_observation: ChattingObservation = next(
(obs for obs in observations if isinstance(obs, ChattingObservation)), None
)
if chatting_observation:
anchor_message = chatting_observation.search_message_by_text(target)
else:
anchor_message = None
else:
anchor_message = None
# 如果没有找到锚点消息,创建一个占位符
if not anchor_message:
logger.info(f"{log_prefix} 未找到锚点消息,创建占位符")
anchor_message = await create_empty_anchor_message(
chat_stream.platform, chat_stream.group_info, chat_stream
)
else:
anchor_message.update_chat_stream(chat_stream)
response_set = [
(type, data),
]
# 调用内部方法发送消息
success = await expressor.send_response_messages(
anchor_message=anchor_message,
response_set=response_set,
display_message=display_message,
)
return success
except Exception as e:
log_prefix = getattr(self, "log_prefix", "[MessageAPI]")
logger.error(f"{log_prefix} 发送消息时出错: {e}")
traceback.print_exc()
return False
async def send_message_by_expressor(self, text: str, target: Optional[str] = None) -> bool:
"""通过expressor发送文本消息的简化方法
Args:
text: 要发送的消息文本
target: 目标消息(可选)
Returns:
bool: 是否发送成功
"""
# 安全获取服务和日志前缀
services = getattr(self, "_services", {})
log_prefix = getattr(self, "log_prefix", "[MessageAPI]")
expressor: DefaultExpressor = services.get("expressor")
chat_stream: ChatStream = services.get("chat_stream")
if not expressor or not chat_stream:
logger.error(f"{log_prefix} 无法发送消息:缺少必要的内部服务")
return False
# 构造简化的动作数据
reply_data = {"text": text, "target": target or "", "emojis": []}
# 获取锚定消息(如果有)
observations = services.get("observations", [])
# 查找 ChattingObservation 实例
chatting_observation = None
for obs in observations:
if isinstance(obs, ChattingObservation):
chatting_observation = obs
break
if not chatting_observation:
logger.warning(f"{log_prefix} 未找到 ChattingObservation 实例,创建占位符")
anchor_message = await create_empty_anchor_message(
chat_stream.platform, chat_stream.group_info, chat_stream
)
else:
anchor_message = chatting_observation.search_message_by_text(reply_data["target"])
if not anchor_message:
logger.info(f"{log_prefix} 未找到锚点消息,创建占位符")
anchor_message = await create_empty_anchor_message(
chat_stream.platform, chat_stream.group_info, chat_stream
)
else:
anchor_message.update_chat_stream(chat_stream)
# 调用内部方法发送消息
cycle_timers = getattr(self, "cycle_timers", {})
reasoning = getattr(self, "reasoning", "插件生成")
thinking_id = getattr(self, "thinking_id", "plugin_thinking")
success, _ = await expressor.deal_reply(
cycle_timers=cycle_timers,
action_data=reply_data,
anchor_message=anchor_message,
reasoning=reasoning,
thinking_id=thinking_id,
)
return success
async def send_message_by_replyer(
self, target: Optional[str] = None, extra_info_block: Optional[str] = None
) -> bool:
"""通过replyer发送消息的简化方法
Args:
target: 目标消息(可选)
extra_info_block: 额外信息块(可选)
Returns:
bool: 是否发送成功
"""
# 安全获取服务和日志前缀
services = getattr(self, "_services", {})
log_prefix = getattr(self, "log_prefix", "[MessageAPI]")
replyer: DefaultReplyer = services.get("replyer")
chat_stream: ChatStream = services.get("chat_stream")
if not replyer or not chat_stream:
logger.error(f"{log_prefix} 无法发送消息:缺少必要的内部服务")
return False
# 构造简化的动作数据
reply_data = {"target": target or "", "extra_info_block": extra_info_block}
# 获取锚定消息(如果有)
observations = services.get("observations", [])
# 查找 ChattingObservation 实例
chatting_observation = None
for obs in observations:
if isinstance(obs, ChattingObservation):
chatting_observation = obs
break
if not chatting_observation:
logger.warning(f"{log_prefix} 未找到 ChattingObservation 实例,创建占位符")
anchor_message = await create_empty_anchor_message(
chat_stream.platform, chat_stream.group_info, chat_stream
)
else:
anchor_message = chatting_observation.search_message_by_text(reply_data["target"])
if not anchor_message:
logger.info(f"{log_prefix} 未找到锚点消息,创建占位符")
anchor_message = await create_empty_anchor_message(
chat_stream.platform, chat_stream.group_info, chat_stream
)
else:
anchor_message.update_chat_stream(chat_stream)
# 调用内部方法发送消息
cycle_timers = getattr(self, "cycle_timers", {})
reasoning = getattr(self, "reasoning", "插件生成")
thinking_id = getattr(self, "thinking_id", "plugin_thinking")
success, _ = await replyer.deal_reply(
cycle_timers=cycle_timers,
action_data=reply_data,
anchor_message=anchor_message,
reasoning=reasoning,
thinking_id=thinking_id,
)
return success
def get_chat_type(self) -> str:
"""获取当前聊天类型

View File

@@ -33,7 +33,7 @@ class PluginAPI(MessageAPI, LLMAPI, DatabaseAPI, ConfigAPI, UtilsAPI, StreamAPI,
"""
def __init__(
self, chat_stream=None, expressor=None, replyer=None, observations=None, log_prefix: str = "[PluginAPI]"
self, chat_stream=None, expressor=None, replyer=None, observations=None, log_prefix: str = "[PluginAPI]", plugin_config: dict = None
):
"""
初始化插件API
@@ -44,6 +44,7 @@ class PluginAPI(MessageAPI, LLMAPI, DatabaseAPI, ConfigAPI, UtilsAPI, StreamAPI,
replyer: 回复器对象
observations: 观察列表
log_prefix: 日志前缀
plugin_config: 插件配置字典
"""
# 存储依赖对象
self._services = {
@@ -61,6 +62,9 @@ class PluginAPI(MessageAPI, LLMAPI, DatabaseAPI, ConfigAPI, UtilsAPI, StreamAPI,
# 调用所有父类的初始化
super().__init__()
# 存储插件配置
self._plugin_config = plugin_config or {}
logger.debug(f"{self.log_prefix} PluginAPI 初始化完成")
def set_chat_stream(self, chat_stream):
@@ -105,7 +109,7 @@ class PluginAPI(MessageAPI, LLMAPI, DatabaseAPI, ConfigAPI, UtilsAPI, StreamAPI,
# 便捷的工厂函数
def create_plugin_api(
chat_stream=None, expressor=None, replyer=None, observations=None, log_prefix: str = "[Plugin]"
chat_stream=None, expressor=None, replyer=None, observations=None, log_prefix: str = "[Plugin]", plugin_config: dict = None
) -> PluginAPI:
"""
创建插件API实例的便捷函数
@@ -116,12 +120,13 @@ def create_plugin_api(
replyer: 回复器对象
observations: 观察列表
log_prefix: 日志前缀
plugin_config: 插件配置字典
Returns:
PluginAPI: 配置好的插件API实例
"""
return PluginAPI(
chat_stream=chat_stream, expressor=expressor, replyer=replyer, observations=observations, log_prefix=log_prefix
chat_stream=chat_stream, expressor=expressor, replyer=replyer, observations=observations, log_prefix=log_prefix, plugin_config=plugin_config
)

View File

@@ -35,6 +35,7 @@ class BaseAction(ABC):
chat_stream=None,
log_prefix: str = "",
shutting_down: bool = False,
plugin_config: dict = None,
**kwargs,
):
"""初始化Action组件
@@ -50,6 +51,7 @@ class BaseAction(ABC):
chat_stream: 聊天流对象
log_prefix: 日志前缀
shutting_down: 是否正在关闭
plugin_config: 插件配置字典
**kwargs: 其他参数
"""
self.action_data = action_data
@@ -84,6 +86,7 @@ class BaseAction(ABC):
replyer=replyer or kwargs.get("replyer"),
observations=observations or kwargs.get("observations", []),
log_prefix=log_prefix,
plugin_config=plugin_config or kwargs.get("plugin_config"),
)
# 设置API的action上下文
@@ -118,7 +121,221 @@ class BaseAction(ABC):
Returns:
bool: 是否发送成功
"""
return await self.api.send_message("text", content)
chat_stream = self.api.get_service('chat_stream')
if not chat_stream:
logger.error(f"{self.log_prefix} 没有可用的聊天流发送回复")
return False
if chat_stream.group_info:
# 群聊
return await self.api.send_text_to_group(
text=content,
group_id=str(chat_stream.group_info.group_id),
platform=chat_stream.platform
)
else:
# 私聊
return await self.api.send_text_to_user(
text=content,
user_id=str(chat_stream.user_info.user_id),
platform=chat_stream.platform
)
async def send_command(self, command_name: str, args: dict = None, display_message: str = None) -> bool:
"""发送命令消息
使用和send_reply相同的方式通过MessageAPI发送命令
Args:
command_name: 命令名称
args: 命令参数
display_message: 显示消息
Returns:
bool: 是否发送成功
"""
try:
# 构造命令数据
command_data = {
"name": command_name,
"args": args or {}
}
# 使用send_message_to_target方法发送命令
chat_stream = self.api.get_service('chat_stream')
if not chat_stream:
logger.error(f"{self.log_prefix} 没有可用的聊天流发送命令")
return False
command_content = str(command_data)
if chat_stream.group_info:
# 群聊
success = await self.api.send_message_to_target(
message_type="command",
content=command_content,
platform=chat_stream.platform,
target_id=str(chat_stream.group_info.group_id),
is_group=True,
display_message=display_message or f"执行命令: {command_name}"
)
else:
# 私聊
success = await self.api.send_message_to_target(
message_type="command",
content=command_content,
platform=chat_stream.platform,
target_id=str(chat_stream.user_info.user_id),
is_group=False,
display_message=display_message or f"执行命令: {command_name}"
)
if success:
logger.info(f"{self.log_prefix} 成功发送命令: {command_name}")
else:
logger.error(f"{self.log_prefix} 发送命令失败: {command_name}")
return success
except Exception as e:
logger.error(f"{self.log_prefix} 发送命令时出错: {e}")
return False
async def send_message_by_expressor(self, text: str, target: str = "") -> bool:
"""通过expressor发送文本消息的Action专用方法
Args:
text: 要发送的消息文本
target: 目标消息(可选)
Returns:
bool: 是否发送成功
"""
try:
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.chat.message_receive.message import create_empty_anchor_message
# 获取服务
expressor = self.api.get_service("expressor")
chat_stream = self.api.get_service("chat_stream")
observations = self.api.get_service("observations") or []
if not expressor or not chat_stream:
logger.error(f"{self.log_prefix} 无法通过expressor发送消息缺少必要的服务")
return False
# 构造动作数据
reply_data = {"text": text, "target": target, "emojis": []}
# 查找 ChattingObservation 实例
chatting_observation = None
for obs in observations:
if isinstance(obs, ChattingObservation):
chatting_observation = obs
break
if not chatting_observation:
logger.warning(f"{self.log_prefix} 未找到 ChattingObservation 实例,创建占位符")
anchor_message = await create_empty_anchor_message(
chat_stream.platform, chat_stream.group_info, chat_stream
)
else:
anchor_message = chatting_observation.search_message_by_text(target)
if not anchor_message:
logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符")
anchor_message = await create_empty_anchor_message(
chat_stream.platform, chat_stream.group_info, chat_stream
)
else:
anchor_message.update_chat_stream(chat_stream)
# 使用Action上下文信息发送消息
success, _ = await expressor.deal_reply(
cycle_timers=self.cycle_timers,
action_data=reply_data,
anchor_message=anchor_message,
reasoning=self.reasoning,
thinking_id=self.thinking_id,
)
if success:
logger.info(f"{self.log_prefix} 成功通过expressor发送消息")
else:
logger.error(f"{self.log_prefix} 通过expressor发送消息失败")
return success
except Exception as e:
logger.error(f"{self.log_prefix} 通过expressor发送消息时出错: {e}")
return False
async def send_message_by_replyer(self, target: str = "", extra_info_block: str = None) -> bool:
"""通过replyer发送消息的Action专用方法
Args:
target: 目标消息(可选)
extra_info_block: 额外信息块(可选)
Returns:
bool: 是否发送成功
"""
try:
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.chat.message_receive.message import create_empty_anchor_message
# 获取服务
replyer = self.api.get_service("replyer")
chat_stream = self.api.get_service("chat_stream")
observations = self.api.get_service("observations") or []
if not replyer or not chat_stream:
logger.error(f"{self.log_prefix} 无法通过replyer发送消息缺少必要的服务")
return False
# 构造动作数据
reply_data = {"target": target, "extra_info_block": extra_info_block}
# 查找 ChattingObservation 实例
chatting_observation = None
for obs in observations:
if isinstance(obs, ChattingObservation):
chatting_observation = obs
break
if not chatting_observation:
logger.warning(f"{self.log_prefix} 未找到 ChattingObservation 实例,创建占位符")
anchor_message = await create_empty_anchor_message(
chat_stream.platform, chat_stream.group_info, chat_stream
)
else:
anchor_message = chatting_observation.search_message_by_text(target)
if not anchor_message:
logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符")
anchor_message = await create_empty_anchor_message(
chat_stream.platform, chat_stream.group_info, chat_stream
)
else:
anchor_message.update_chat_stream(chat_stream)
# 使用Action上下文信息发送消息
success, _ = await replyer.deal_reply(
cycle_timers=self.cycle_timers,
action_data=reply_data,
anchor_message=anchor_message,
reasoning=self.reasoning,
thinking_id=self.thinking_id,
)
if success:
logger.info(f"{self.log_prefix} 成功通过replyer发送消息")
else:
logger.error(f"{self.log_prefix} 通过replyer发送消息失败")
return success
except Exception as e:
logger.error(f"{self.log_prefix} 通过replyer发送消息时出错: {e}")
return False
@classmethod
def get_action_info(cls, name: str = None, description: str = None) -> "ActionInfo":
@@ -132,9 +349,11 @@ class BaseAction(ABC):
ActionInfo: 生成的Action信息对象
"""
# 自动生成名称和描述
# 优先使用类属性,然后自动生成
if name is None:
name = cls.__name__.lower().replace("action", "")
name = getattr(cls, "action_name", cls.__name__.lower().replace("action", ""))
if description is None:
description = getattr(cls, "action_description", None)
if description is None:
description = cls.__doc__ or f"{cls.__name__} Action组件"
description = description.strip().split("\n")[0] # 取第一行作为描述

View File

@@ -17,24 +17,27 @@ class BaseCommand(ABC):
- command_pattern: 命令匹配的正则表达式
- command_help: 命令帮助信息
- command_examples: 命令使用示例列表
- intercept_message: 是否拦截消息处理默认True拦截False继续传递
"""
# 默认命令设置(子类可以覆盖)
command_pattern: str = ""
command_help: str = ""
command_examples: List[str] = []
intercept_message: bool = True # 默认拦截消息,不继续处理
def __init__(self, message: MessageRecv):
def __init__(self, message: MessageRecv, plugin_config: dict = None):
"""初始化Command组件
Args:
message: 接收到的消息对象
plugin_config: 插件配置字典
"""
self.message = message
self.matched_groups: Dict[str, str] = {} # 存储正则表达式匹配的命名组
# 创建API实例
self.api = PluginAPI(chat_stream=message.chat_stream, log_prefix="[Command]")
self.api = PluginAPI(chat_stream=message.chat_stream, log_prefix="[Command]", plugin_config=plugin_config)
self.log_prefix = "[Command]"
@@ -77,6 +80,62 @@ class BaseCommand(ABC):
text=content, user_id=str(chat_stream.user_info.user_id), platform=chat_stream.platform
)
async def send_command(self, command_name: str, args: dict = None, display_message: str = None) -> bool:
"""发送命令消息
使用和send_reply相同的方式通过MessageAPI发送命令
Args:
command_name: 命令名称
args: 命令参数
display_message: 显示消息
Returns:
bool: 是否发送成功
"""
try:
# 构造命令数据
command_data = {
"name": command_name,
"args": args or {}
}
# 使用send_message_to_target方法发送命令
chat_stream = self.message.chat_stream
command_content = str(command_data)
if chat_stream.group_info:
# 群聊
success = await self.api.send_message_to_target(
message_type="command",
content=command_content,
platform=chat_stream.platform,
target_id=str(chat_stream.group_info.group_id),
is_group=True,
display_message=display_message or f"执行命令: {command_name}"
)
else:
# 私聊
success = await self.api.send_message_to_target(
message_type="command",
content=command_content,
platform=chat_stream.platform,
target_id=str(chat_stream.user_info.user_id),
is_group=False,
display_message=display_message or f"执行命令: {command_name}"
)
if success:
logger.info(f"{self.log_prefix} 成功发送命令: {command_name}")
else:
logger.error(f"{self.log_prefix} 发送命令失败: {command_name}")
return success
except Exception as e:
logger.error(f"{self.log_prefix} 发送命令时出错: {e}")
return False
@classmethod
def get_command_info(cls, name: str = None, description: str = None) -> "CommandInfo":
"""从类属性生成CommandInfo
@@ -89,9 +148,11 @@ class BaseCommand(ABC):
CommandInfo: 生成的Command信息对象
"""
# 自动生成名称和描述
# 优先使用类属性,然后自动生成
if name is None:
name = cls.__name__.lower().replace("command", "")
name = getattr(cls, "command_name", cls.__name__.lower().replace("command", ""))
if description is None:
description = getattr(cls, "command_description", None)
if description is None:
description = cls.__doc__ or f"{cls.__name__} Command组件"
description = description.strip().split("\n")[0] # 取第一行作为描述
@@ -103,4 +164,5 @@ class BaseCommand(ABC):
command_pattern=cls.command_pattern,
command_help=cls.command_help,
command_examples=cls.command_examples.copy() if cls.command_examples else [],
intercept_message=cls.intercept_message,
)

View File

@@ -86,6 +86,7 @@ class CommandInfo(ComponentInfo):
command_pattern: str = "" # 命令匹配模式(正则表达式)
command_help: str = "" # 命令帮助信息
command_examples: List[str] = None # 命令使用示例
intercept_message: bool = True # 是否拦截消息处理(默认拦截)
def __post_init__(self):
super().__post_init__()

View File

@@ -141,14 +141,14 @@ class ComponentRegistry:
info = self.get_component_info(command_name)
return info if isinstance(info, CommandInfo) else None
def find_command_by_text(self, text: str) -> Optional[tuple[Type, dict]]:
def find_command_by_text(self, text: str) -> Optional[tuple[Type, dict, bool, str]]:
"""根据文本查找匹配的命令
Args:
text: 输入文本
Returns:
Optional[tuple[Type, dict]]: (命令类, 匹配的命名组) 或 None
Optional[tuple[Type, dict, bool, str]]: (命令类, 匹配的命名组, 是否拦截消息, 插件名) 或 None
"""
for pattern, command_class in self._command_patterns.items():
match = pattern.match(text)
@@ -164,7 +164,7 @@ class ComponentRegistry:
if command_name:
command_info = self.get_command_info(command_name)
if command_info and command_info.enabled:
return command_class, match.groupdict()
return command_class, match.groupdict(), command_info.intercept_message, command_info.plugin_name
return None
# === 插件管理方法 ===
@@ -205,6 +205,20 @@ class ComponentRegistry:
plugin_info = self.get_plugin_info(plugin_name)
return plugin_info.components if plugin_info else []
def get_plugin_config(self, plugin_name: str) -> Optional[dict]:
"""获取插件配置
Args:
plugin_name: 插件名称
Returns:
Optional[dict]: 插件配置字典或None
"""
# 从插件管理器获取插件实例的配置
from src.plugin_system.core.plugin_manager import plugin_manager
plugin_instance = plugin_manager.get_plugin_instance(plugin_name)
return plugin_instance.config if plugin_instance else None
# === 状态管理方法 ===
def enable_component(self, component_name: str) -> bool:

View File

@@ -2,6 +2,7 @@ from typing import Dict, List, Optional, Any, TYPE_CHECKING
import os
import importlib
import importlib.util
import inspect
from pathlib import Path
if TYPE_CHECKING:
@@ -72,9 +73,10 @@ class PluginManager:
if plugin_dir:
self.plugin_paths[plugin_name] = plugin_dir
if instantiate_and_register_plugin(plugin_class, plugin_dir):
plugin_instance = plugin_class(plugin_dir=plugin_dir)
if plugin_instance.register_plugin():
total_registered += 1
self.loaded_plugins[plugin_name] = plugin_class
self.loaded_plugins[plugin_name] = plugin_instance
# 📊 显示插件详细信息
plugin_info = component_registry.get_plugin_info(plugin_name)
@@ -288,6 +290,17 @@ class PluginManager:
return True
return False
def get_plugin_instance(self, plugin_name: str) -> Optional["BasePlugin"]:
"""获取插件实例
Args:
plugin_name: 插件名称
Returns:
Optional[BasePlugin]: 插件实例或None
"""
return self.loaded_plugins.get(plugin_name)
def get_plugin_stats(self) -> Dict[str, Any]:
"""获取插件统计信息"""
all_plugins = component_registry.get_all_plugins()

View File

@@ -1,206 +0,0 @@
# MaiBot 插件系统架构
## 概述
MaiBot 插件系统采用组件化设计,支持插件包含多种组件类型:
- **Action组件**:处理聊天中的动作逻辑
- **Command组件**:处理命令请求
- **未来扩展**Scheduler定时任务、Listener事件监听
## 目录结构
```
src/plugins/
├── core/ # 插件核心管理
│ ├── plugin_manager.py # 插件管理器
│ ├── plugin_loader.py # 插件加载器(预留)
│ └── component_registry.py # 组件注册中心
├── apis/ # API模块
│ ├── plugin_api.py # 统一API聚合
│ ├── message_api.py # 消息API
│ ├── llm_api.py # LLM API
│ ├── database_api.py # 数据库API
│ ├── config_api.py # 配置API
│ ├── utils_api.py # 工具API
│ ├── stream_api.py # 流API
│ └── hearflow_api.py # 心流API
├── base/ # 基础类
│ ├── base_plugin.py # 插件基类
│ ├── base_action.py # Action组件基类
│ ├── base_command.py # Command组件基类
│ └── component_types.py # 组件类型定义
├── built_in/ # 内置组件
│ ├── actions/ # 内置Action
│ └── commands/ # 内置Command
└── examples/ # 示例插件
└── simple_plugin/ # 简单插件示例
├── plugin.py
└── config.toml
```
## 核心特性
### 1. 组件化设计
- 插件可以包含多种组件类型
- 每种组件有明确的职责和接口
- 支持组件的独立启用/禁用
### 2. 统一的API访问
- 所有插件组件通过 `PluginAPI` 访问系统功能
- 包含消息发送、数据库操作、LLM调用等
- 提供统一的错误处理和日志记录
### 3. 灵活的配置系统
- 支持 TOML 格式的配置文件
- 插件可以读取自定义配置
- 支持全局配置和插件特定配置
### 4. 统一的注册管理
- 组件注册中心管理所有组件
- 支持组件的动态启用/禁用
- 提供丰富的查询和统计接口
## 插件开发指南
### 创建基本插件
```python
from src.plugins.base.base_plugin import BasePlugin, register_plugin
from src.plugins.base.base_action import BaseAction
from src.plugins.base.component_types import ActionInfo, ActionActivationType
class MyAction(BaseAction):
async def execute(self) -> tuple[bool, str]:
# 使用API发送消息
response = "Hello from my plugin!"
return True, response
@register_plugin
class MyPlugin(BasePlugin):
plugin_name = "my_plugin"
plugin_description = "我的第一个插件"
def get_plugin_components(self):
action_info = ActionInfo(
name="my_action",
description="我的动作",
activation_keywords=["hello"]
)
return [(action_info, MyAction)]
```
### 创建命令组件
```python
from src.plugins.base.base_command import BaseCommand
from src.plugins.base.component_types import CommandInfo
class MyCommand(BaseCommand):
async def execute(self) -> tuple[bool, str]:
# 获取命令参数
param = self.matched_groups.get("param", "")
# 发送回复
await self.send_reply(f"收到参数: {param}")
return True, f"处理完成: {param}"
# 在插件中注册
def get_plugin_components(self):
command_info = CommandInfo(
name="my_command",
description="我的命令",
command_pattern=r"^/mycmd\s+(?P<param>\w+)$",
command_help="用法:/mycmd <参数>"
)
return [(command_info, MyCommand)]
```
### 使用配置文件
```toml
# config.toml
[plugin]
name = "my_plugin"
enabled = true
[my_settings]
max_items = 10
default_message = "Hello World"
```
```python
class MyPlugin(BasePlugin):
config_file_name = "config.toml"
def get_plugin_components(self):
# 读取配置
max_items = self.get_config("my_settings.max_items", 5)
message = self.get_config("my_settings.default_message", "Hi")
# 使用配置创建组件...
```
## API使用示例
### 消息操作
```python
# 发送文本消息
await self.api.send_text_to_group(chat_stream, "Hello!")
# 发送图片
await self.api.send_image_to_group(chat_stream, image_path)
```
### 数据库操作
```python
# 查询数据
data = await self.api.db_get("table_name", "key")
# 保存数据
await self.api.db_set("table_name", "key", "value")
```
### LLM调用
```python
# 生成文本
response = await self.api.llm_text_request("你好,请介绍一下自己")
# 生成图片
image_url = await self.api.llm_image_request("一只可爱的猫咪")
```
## 内置组件迁移
现有的内置Action和Command将迁移到新架构
### Action迁移
- `reply_action.py``src/plugins/built_in/actions/reply_action.py`
- `emoji_action.py``src/plugins/built_in/actions/emoji_action.py`
- `no_reply_action.py``src/plugins/built_in/actions/no_reply_action.py`
### Command迁移
- 现有命令系统将封装为内置Command组件
- 保持现有的命令模式和功能
## 兼容性
新插件系统保持与现有系统的兼容性:
- 现有的Action和Command继续工作
- 提供兼容层和适配器
- 逐步迁移到新架构
## 扩展性
系统设计支持未来扩展:
- 新的组件类型Scheduler、Listener等
- 插件间依赖和通信
- 插件热重载
- 插件市场和分发
## 最佳实践
1. **单一职责**:每个组件专注于特定功能
2. **配置驱动**:通过配置文件控制行为
3. **错误处理**:妥善处理异常情况
4. **日志记录**:记录关键操作和错误
5. **测试覆盖**:为插件编写单元测试

View File

@@ -10,6 +10,7 @@ from typing import List, Tuple, Type, Optional
# 导入新插件系统
from src.plugin_system import BasePlugin, register_plugin, BaseAction, ComponentInfo, ActionActivationType, ChatMode
from src.plugin_system.base.base_command import BaseCommand
# 导入依赖的系统组件
from src.common.logger_manager import get_logger
@@ -341,19 +342,97 @@ class CoreActionsPlugin(BasePlugin):
return [
# 回复动作
(ReplyAction.get_action_info(name="reply", description="参与聊天回复,处理文本和表情的发送"), ReplyAction),
(ReplyAction.get_action_info(
name="reply",
description="参与聊天回复,处理文本和表情的发送"
), ReplyAction),
# 不回复动作
(
NoReplyAction.get_action_info(name="no_reply", description="暂时不回复消息,等待新消息或超时"),
NoReplyAction,
),
(NoReplyAction.get_action_info(
name="no_reply",
description="暂时不回复消息,等待新消息或超时"
), NoReplyAction),
# 表情动作
(EmojiAction.get_action_info(name="emoji", description="发送表情包辅助表达情绪"), EmojiAction),
(EmojiAction.get_action_info(
name="emoji",
description="发送表情包辅助表达情绪"
), EmojiAction),
# 退出专注聊天动作
(
ExitFocusChatAction.get_action_info(
name="exit_focus_chat", description="退出专注聊天,从专注模式切换到普通模式"
),
ExitFocusChatAction,
),
(ExitFocusChatAction.get_action_info(
name="exit_focus_chat",
description="退出专注聊天,从专注模式切换到普通模式"
), ExitFocusChatAction),
# 示例Command - Ping命令
(PingCommand.get_command_info(
name="ping",
description="测试机器人响应,拦截后续处理"
), PingCommand),
# 示例Command - Log命令
(LogCommand.get_command_info(
name="log",
description="记录消息到日志,不拦截后续处理"
), LogCommand)
]
# ===== 示例Command组件 =====
class PingCommand(BaseCommand):
"""Ping命令 - 测试响应,拦截消息处理"""
command_pattern = r"^/ping(\s+(?P<message>.+))?$"
command_help = "测试机器人响应 - 拦截后续处理"
command_examples = ["/ping", "/ping 测试消息"]
intercept_message = True # 拦截消息,不继续处理
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行ping命令"""
try:
message = self.matched_groups.get("message", "")
reply_text = f"🏓 Pong! {message}" if message else "🏓 Pong!"
await self.send_reply(reply_text)
return True, f"发送ping响应: {reply_text}"
except Exception as e:
logger.error(f"Ping命令执行失败: {e}")
return False, f"执行失败: {str(e)}"
class LogCommand(BaseCommand):
"""日志命令 - 记录消息但不拦截后续处理"""
command_pattern = r"^/log(\s+(?P<level>debug|info|warn|error))?$"
command_help = "记录当前消息到日志 - 不拦截后续处理"
command_examples = ["/log", "/log info", "/log debug"]
intercept_message = False # 不拦截消息,继续后续处理
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行日志命令"""
try:
level = self.matched_groups.get("level", "info")
user_nickname = self.message.message_info.user_info.user_nickname
content = self.message.processed_plain_text
log_message = f"[{level.upper()}] 用户 {user_nickname}: {content}"
# 根据级别记录日志
if level == "debug":
logger.debug(log_message)
elif level == "warn":
logger.warning(log_message)
elif level == "error":
logger.error(log_message)
else:
logger.info(log_message)
# 不发送回复,让消息继续处理
return True, f"已记录到{level}级别日志"
except Exception as e:
logger.error(f"Log命令执行失败: {e}")
return False, f"执行失败: {str(e)}"

View File

@@ -0,0 +1,74 @@
# 禁言插件配置文件
[plugin]
name = "mute_plugin"
version = "2.0.0"
enabled = true
description = "群聊禁言管理插件,提供智能禁言功能"
# 组件启用控制
[components]
enable_smart_mute = true # 启用智能禁言Action
enable_mute_command = true # 启用禁言命令Command
# 禁言配置
[mute]
# 时长限制(秒)
min_duration = 60 # 最短禁言时长
max_duration = 2592000 # 最长禁言时长30天
default_duration = 300 # 默认禁言时长5分钟
# 是否启用时长美化显示
enable_duration_formatting = true
# 是否记录禁言历史
log_mute_history = true
# 禁言消息模板
templates = [
"好的,禁言 {target} {duration},理由:{reason}",
"收到,对 {target} 执行禁言 {duration},因为{reason}",
"明白了,禁言 {target} {duration},原因是{reason}",
"✅ 已禁言 {target} {duration},理由:{reason}",
"🔇 对 {target} 执行禁言 {duration},因为{reason}",
"⛔ 禁言 {target} {duration},原因:{reason}"
]
# 错误消息模板
error_messages = [
"没有指定禁言对象呢~",
"没有指定禁言时长呢~",
"禁言时长必须是正数哦~",
"禁言时长必须是数字哦~",
"找不到 {target} 这个人呢~",
"查找用户信息时出现问题~"
]
# 智能禁言Action配置
[smart_mute]
# LLM判定严格模式
strict_mode = true
# 关键词激活设置
keyword_sensitivity = "normal" # low, normal, high
# 并行执行设置
allow_parallel = false
# 禁言命令配置
[mute_command]
# 是否需要管理员权限
require_admin = true
# 最大批量禁言数量
max_batch_size = 5
# 命令冷却时间(秒)
cooldown_seconds = 3
# 日志配置
[logging]
level = "INFO"
prefix = "[MutePlugin]"
include_user_info = true
include_duration_info = true

View File

@@ -0,0 +1,385 @@
"""
禁言插件
提供智能禁言功能的群聊管理插件。
功能特性:
- 智能LLM判定根据聊天内容智能判断是否需要禁言
- 灵活的时长管理:支持自定义禁言时长限制
- 模板化消息:支持自定义禁言提示消息
- 参数验证:完整的输入参数验证和错误处理
- 配置文件支持:所有设置可通过配置文件调整
包含组件:
- 智能禁言Action - 基于LLM判断是否需要禁言
- 禁言命令Command - 手动执行禁言操作
"""
from typing import List, Tuple, Type, Optional, Dict, Any
import random
# 导入新插件系统
from src.plugin_system.base.base_plugin import BasePlugin
from src.plugin_system.base.base_plugin import register_plugin
from src.plugin_system.base.base_action import BaseAction
from src.plugin_system.base.base_command import BaseCommand
from src.plugin_system.base.component_types import ComponentInfo, ActionActivationType, ChatMode
from src.common.logger_manager import get_logger
logger = get_logger("mute_plugin")
# ===== Action组件 =====
class MuteAction(BaseAction):
"""智能禁言Action - 基于LLM智能判断是否需要禁言"""
# Action基本信息
action_name = "mute"
action_description = "智能禁言系统基于LLM判断是否需要禁言"
# 激活设置
focus_activation_type = ActionActivationType.LLM_JUDGE # Focus模式使用LLM判定确保谨慎
normal_activation_type = ActionActivationType.KEYWORD # Normal模式使用关键词激活快速响应
# 关键词设置用于Normal模式
activation_keywords = ["禁言", "mute", "ban", "silence"]
keyword_case_sensitive = False
# LLM判定提示词用于Focus模式
llm_judge_prompt = """
判定是否需要使用禁言动作的严格条件:
使用禁言的情况:
1. 用户发送明显违规内容(色情、暴力、政治敏感等)
2. 恶意刷屏或垃圾信息轰炸
3. 用户主动明确要求被禁言("禁言我"等)
4. 严重违反群规的行为
5. 恶意攻击他人或群组管理
绝对不要使用的情况:
2. 情绪化表达但无恶意
3. 开玩笑或调侃,除非过分
4. 单纯的意见分歧或争论
"""
mode_enable = ChatMode.ALL
parallel_action = False
# Action参数定义
action_parameters = {
"target": "禁言对象,必填,输入你要禁言的对象的名字",
"duration": "禁言时长,必填,输入你要禁言的时长(秒),单位为秒,必须为数字",
"reason": "禁言理由,可选"
}
# Action使用场景
action_require = [
"当有人违反了公序良俗的内容",
"当有人刷屏时使用",
"当有人发了擦边,或者色情内容时使用",
"当有人要求禁言自己时使用",
"如果某人已经被禁言了,就不要再次禁言了,除非你想追加时间!!"
]
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行智能禁言判定"""
logger.info(f"{self.log_prefix} 执行智能禁言动作")
# 获取参数
target = self.action_data.get("target")
duration = self.action_data.get("duration")
reason = self.action_data.get("reason", "违反群规")
# 参数验证
if not target:
error_msg = "禁言目标不能为空"
logger.error(f"{self.log_prefix} {error_msg}")
await self.send_reply("没有指定禁言对象呢~")
return False, error_msg
if not duration:
error_msg = "禁言时长不能为空"
logger.error(f"{self.log_prefix} {error_msg}")
await self.send_reply("没有指定禁言时长呢~")
return False, error_msg
# 获取时长限制配置
min_duration = self.api.get_config("mute.min_duration", 60)
max_duration = self.api.get_config("mute.max_duration", 2592000)
# 验证时长格式并转换
try:
duration_int = int(duration)
if duration_int <= 0:
error_msg = "禁言时长必须大于0"
logger.error(f"{self.log_prefix} {error_msg}")
await self.send_reply("禁言时长必须是正数哦~")
return False, error_msg
# 限制禁言时长范围
if duration_int < min_duration:
duration_int = min_duration
logger.info(f"{self.log_prefix} 禁言时长过短,调整为{min_duration}")
elif duration_int > max_duration:
duration_int = max_duration
logger.info(f"{self.log_prefix} 禁言时长过长,调整为{max_duration}")
except (ValueError, TypeError):
error_msg = f"禁言时长格式无效: {duration}"
logger.error(f"{self.log_prefix} {error_msg}")
await self.send_reply("禁言时长必须是数字哦~")
return False, error_msg
# 获取用户ID
try:
platform, user_id = await self.api.get_user_id_by_person_name(target)
except Exception as e:
error_msg = f"查找用户ID时出错: {e}"
logger.error(f"{self.log_prefix} {error_msg}")
await self.send_reply("查找用户信息时出现问题~")
return False, error_msg
if not user_id:
error_msg = f"未找到用户 {target} 的ID"
await self.send_reply(f"找不到 {target} 这个人呢~")
logger.error(f"{self.log_prefix} {error_msg}")
return False, error_msg
# 格式化时长显示
enable_formatting = self.api.get_config("mute.enable_duration_formatting", True)
time_str = self._format_duration(duration_int) if enable_formatting else f"{duration_int}"
# 获取模板化消息
message = self._get_template_message(target, time_str, reason)
await self.send_reply(message)
# 发送群聊禁言命令
success = await self.send_command(
command_name="GROUP_BAN",
args={"qq_id": str(user_id), "duration": str(duration_int)},
display_message=f"禁言了 {target} {time_str}"
)
if success:
logger.info(f"{self.log_prefix} 成功发送禁言命令,用户 {target}({user_id}),时长 {duration_int}")
return True, f"成功禁言 {target},时长 {time_str}"
else:
error_msg = "发送禁言命令失败"
logger.error(f"{self.log_prefix} {error_msg}")
await self.send_reply(f"执行禁言动作失败")
return False, error_msg
def _get_template_message(self, target: str, duration_str: str, reason: str) -> str:
"""获取模板化的禁言消息"""
templates = self.api.get_config("mute.templates", [
"好的,禁言 {target} {duration},理由:{reason}",
"收到,对 {target} 执行禁言 {duration},因为{reason}",
"明白了,禁言 {target} {duration},原因是{reason}"
])
template = random.choice(templates)
return template.format(target=target, duration=duration_str, reason=reason)
def _format_duration(self, seconds: int) -> str:
"""将秒数格式化为可读的时间字符串"""
if seconds < 60:
return f"{seconds}"
elif seconds < 3600:
minutes = seconds // 60
remaining_seconds = seconds % 60
if remaining_seconds > 0:
return f"{minutes}{remaining_seconds}"
else:
return f"{minutes}分钟"
elif seconds < 86400:
hours = seconds // 3600
remaining_minutes = (seconds % 3600) // 60
if remaining_minutes > 0:
return f"{hours}小时{remaining_minutes}分钟"
else:
return f"{hours}小时"
else:
days = seconds // 86400
remaining_hours = (seconds % 86400) // 3600
if remaining_hours > 0:
return f"{days}{remaining_hours}小时"
else:
return f"{days}"
# ===== Command组件 =====
class MuteCommand(BaseCommand):
"""禁言命令 - 手动执行禁言操作"""
# Command基本信息
command_name = "mute_command"
command_description = "禁言命令,手动执行禁言操作"
command_pattern = r"^/mute\s+(?P<target>\S+)\s+(?P<duration>\d+)(?:\s+(?P<reason>.+))?$"
command_help = "禁言指定用户,用法:/mute <用户名> <时长(秒)> [理由]"
command_examples = [
"/mute 用户名 300",
"/mute 张三 600 刷屏",
"/mute @某人 1800 违规内容"
]
intercept_message = True # 拦截消息处理
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行禁言命令"""
try:
target = self.matched_groups.get("target")
duration = self.matched_groups.get("duration")
reason = self.matched_groups.get("reason", "管理员操作")
if not all([target, duration]):
await self.send_reply("❌ 命令参数不完整,请检查格式")
return False, "参数不完整"
# 获取时长限制配置
min_duration = self.api.get_config("mute.min_duration", 60)
max_duration = self.api.get_config("mute.max_duration", 2592000)
# 验证时长
try:
duration_int = int(duration)
if duration_int <= 0:
await self.send_reply("❌ 禁言时长必须大于0")
return False, "时长无效"
# 限制禁言时长范围
if duration_int < min_duration:
duration_int = min_duration
await self.send_reply(f"⚠️ 禁言时长过短,调整为{min_duration}")
elif duration_int > max_duration:
duration_int = max_duration
await self.send_reply(f"⚠️ 禁言时长过长,调整为{max_duration}")
except ValueError:
await self.send_reply("❌ 禁言时长必须是数字")
return False, "时长格式错误"
# 获取用户ID
try:
platform, user_id = await self.api.get_user_id_by_person_name(target)
except Exception as e:
logger.error(f"{self.log_prefix} 查找用户ID时出错: {e}")
await self.send_reply("❌ 查找用户信息时出现问题")
return False, str(e)
if not user_id:
await self.send_reply(f"❌ 找不到用户: {target}")
return False, "用户不存在"
# 格式化时长显示
enable_formatting = self.api.get_config("mute.enable_duration_formatting", True)
time_str = self._format_duration(duration_int) if enable_formatting else f"{duration_int}"
logger.info(f"{self.log_prefix} 执行禁言命令: {target}({user_id}) -> {time_str}")
# 发送群聊禁言命令
success = await self.send_command(
command_name="GROUP_BAN",
args={"qq_id": str(user_id), "duration": str(duration_int)},
display_message=f"禁言了 {target} {time_str}"
)
if success:
# 获取并发送模板化消息
message = self._get_template_message(target, time_str, reason)
await self.send_reply(message)
logger.info(f"{self.log_prefix} 成功禁言 {target}({user_id}),时长 {duration_int}")
return True, f"成功禁言 {target},时长 {time_str}"
else:
await self.send_reply("❌ 发送禁言命令失败")
return False, "发送禁言命令失败"
except Exception as e:
logger.error(f"{self.log_prefix} 禁言命令执行失败: {e}")
await self.send_reply(f"❌ 禁言命令错误: {str(e)}")
return False, str(e)
def _get_template_message(self, target: str, duration_str: str, reason: str) -> str:
"""获取模板化的禁言消息"""
templates = self.api.get_config("mute.templates", [
"✅ 已禁言 {target} {duration},理由:{reason}",
"🔇 对 {target} 执行禁言 {duration},因为{reason}",
"⛔ 禁言 {target} {duration},原因:{reason}"
])
template = random.choice(templates)
return template.format(target=target, duration=duration_str, reason=reason)
def _format_duration(self, seconds: int) -> str:
"""将秒数格式化为可读的时间字符串"""
if seconds < 60:
return f"{seconds}"
elif seconds < 3600:
minutes = seconds // 60
remaining_seconds = seconds % 60
if remaining_seconds > 0:
return f"{minutes}{remaining_seconds}"
else:
return f"{minutes}分钟"
elif seconds < 86400:
hours = seconds // 3600
remaining_minutes = (seconds % 3600) // 60
if remaining_minutes > 0:
return f"{hours}小时{remaining_minutes}分钟"
else:
return f"{hours}小时"
else:
days = seconds // 86400
remaining_hours = (seconds % 86400) // 3600
if remaining_hours > 0:
return f"{days}{remaining_hours}小时"
else:
return f"{days}"
# ===== 插件主类 =====
@register_plugin
class MutePlugin(BasePlugin):
"""禁言插件
提供智能禁言功能:
- 智能禁言Action基于LLM判断是否需要禁言
- 禁言命令Command手动执行禁言操作
"""
# 插件基本信息
plugin_name = "mute_plugin"
plugin_description = "群聊禁言管理插件,提供智能禁言功能"
plugin_version = "2.0.0"
plugin_author = "MaiBot开发团队"
enable_plugin = True
config_file_name = "config.toml"
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
"""返回插件包含的组件列表"""
# 从配置获取组件启用状态
enable_smart_mute = self.get_config("components.enable_smart_mute", True)
enable_mute_command = self.get_config("components.enable_mute_command", True)
components = []
# 添加智能禁言Action
if enable_smart_mute:
components.append((
MuteAction.get_action_info(),
MuteAction
))
# 添加禁言命令Command
if enable_mute_command:
components.append((
MuteCommand.get_command_info(),
MuteCommand
))
return components

View File

@@ -1,14 +0,0 @@
"""示例命令插件包
这是一个演示如何使用命令系统的示例插件。
功能特性:
- 提供简单的命令示例
- 演示命令参数提取
- 展示命令帮助信息
使用场景:
- 用户输入特定格式的命令时触发
- 通过命令前缀(如/)快速执行特定功能
- 提供快速响应的交互方式
"""

View File

@@ -1,4 +0,0 @@
"""示例命令包
包含示例命令的实现
"""

View File

@@ -1,59 +0,0 @@
from src.common.logger_manager import get_logger
from src.chat.command.command_handler import BaseCommand, register_command
from typing import Tuple, Optional
import random
logger = get_logger("custom_prefix_command")
@register_command
class DiceCommand(BaseCommand):
"""骰子命令,使用!前缀而不是/前缀"""
command_name = "dice"
command_description = "骰子命令随机生成1-6的数字"
command_pattern = r"^[!](?:dice|骰子)(?:\s+(?P<count>\d+))?$" # 匹配 !dice 或 !骰子,可选参数为骰子数量
command_help = "使用方法: !dice [数量] 或 !骰子 [数量] - 掷骰子默认掷1个"
command_examples = ["!dice", "!骰子", "!dice 3", "!骰子 5"]
enable_command = True
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行骰子命令
Returns:
Tuple[bool, Optional[str]]: (是否执行成功, 回复消息)
"""
try:
# 获取骰子数量默认为1
count_str = self.matched_groups.get("count")
# 确保count_str不为None
if count_str is None:
count = 1 # 默认值
else:
try:
count = int(count_str)
if count <= 0:
return False, "骰子数量必须大于0"
if count > 10: # 限制最大数量
return False, "一次最多只能掷10个骰子"
except ValueError:
return False, "骰子数量必须是整数"
# 生成随机数
results = [random.randint(1, 6) for _ in range(count)]
# 构建回复消息
if count == 1:
message = f"🎲 掷出了 {results[0]}"
else:
dice_results = ", ".join(map(str, results))
total = sum(results)
message = f"🎲 掷出了 {count} 个骰子: [{dice_results}],总点数: {total}"
logger.info(f"{self.log_prefix} 执行骰子命令: {message}")
return True, message
except Exception as e:
logger.error(f"{self.log_prefix} 执行骰子命令时出错: {e}")
return False, f"执行命令时出错: {str(e)}"

View File

@@ -1,111 +0,0 @@
from src.common.logger_manager import get_logger
from src.chat.command.command_handler import BaseCommand, register_command, _COMMAND_REGISTRY
from typing import Tuple, Optional
logger = get_logger("help_command")
@register_command
class HelpCommand(BaseCommand):
"""帮助命令,显示所有可用命令的帮助信息"""
command_name = "help"
command_description = "显示所有可用命令的帮助信息"
command_pattern = r"^/help(?:\s+(?P<command>\w+))?$" # 匹配 /help 或 /help 命令名
command_help = "使用方法: /help [命令名] - 显示所有命令或特定命令的帮助信息"
command_examples = ["/help", "/help echo"]
enable_command = True
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行帮助命令
Returns:
Tuple[bool, Optional[str]]: (是否执行成功, 回复消息)
"""
try:
# 获取匹配到的命令名(如果有)
command_name = self.matched_groups.get("command")
# 如果指定了命令名,显示该命令的详细帮助
if command_name:
logger.info(f"{self.log_prefix} 查询命令帮助: {command_name}")
return self._show_command_help(command_name)
# 否则,显示所有命令的简要帮助
logger.info(f"{self.log_prefix} 查询所有命令帮助")
return self._show_all_commands()
except Exception as e:
logger.error(f"{self.log_prefix} 执行帮助命令时出错: {e}")
return False, f"执行命令时出错: {str(e)}"
def _show_command_help(self, command_name: str) -> Tuple[bool, str]:
"""显示特定命令的详细帮助信息
Args:
command_name: 命令名称
Returns:
Tuple[bool, str]: (是否执行成功, 回复消息)
"""
# 查找命令
command_cls = _COMMAND_REGISTRY.get(command_name)
if not command_cls:
return False, f"未找到命令: {command_name}"
# 获取命令信息
description = getattr(command_cls, "command_description", "无描述")
help_text = getattr(command_cls, "command_help", "无帮助信息")
examples = getattr(command_cls, "command_examples", [])
# 构建帮助信息
help_info = [f"【命令】: {command_name}", f"【描述】: {description}", f"【用法】: {help_text}"]
# 添加示例
if examples:
help_info.append("【示例】:")
for example in examples:
help_info.append(f" {example}")
return True, "\n".join(help_info)
def _show_all_commands(self) -> Tuple[bool, str]:
"""显示所有可用命令的简要帮助信息
Returns:
Tuple[bool, str]: (是否执行成功, 回复消息)
"""
# 获取所有已启用的命令
enabled_commands = {
name: cls for name, cls in _COMMAND_REGISTRY.items() if getattr(cls, "enable_command", True)
}
if not enabled_commands:
return True, "当前没有可用的命令"
# 构建命令列表
command_list = ["可用命令列表:"]
for name, cls in sorted(enabled_commands.items()):
description = getattr(cls, "command_description", "无描述")
# 获取命令前缀示例
examples = getattr(cls, "command_examples", [])
prefix = ""
if examples and len(examples) > 0:
# 从第一个示例中提取前缀
example = examples[0]
# 找到第一个空格前的内容作为前缀
space_pos = example.find(" ")
if space_pos > 0:
prefix = example[:space_pos]
else:
prefix = example
else:
# 默认使用/name作为前缀
prefix = f"/{name}"
command_list.append(f"{prefix} - {description}")
command_list.append("\n使用 /help <命令名> 获取特定命令的详细帮助")
return True, "\n".join(command_list)

View File

@@ -1,311 +0,0 @@
from src.common.logger_manager import get_logger
from src.chat.command.command_handler import BaseCommand, register_command
from typing import Tuple, Optional
logger = get_logger("message_info_command")
@register_command
class MessageInfoCommand(BaseCommand):
"""消息信息查看命令,展示发送命令的原始消息和相关信息"""
command_name = "msginfo"
command_description = "查看发送命令的原始消息信息"
command_pattern = r"^/msginfo(?:\s+(?P<detail>full|simple))?$"
command_help = "使用方法: /msginfo [full|simple] - 查看当前消息的详细信息"
command_examples = ["/msginfo", "/msginfo full", "/msginfo simple"]
enable_command = True
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行消息信息查看命令"""
try:
detail_level = self.matched_groups.get("detail", "simple")
logger.info(f"{self.log_prefix} 查看消息信息,详细级别: {detail_level}")
if detail_level == "full":
info_text = self._get_full_message_info()
else:
info_text = self._get_simple_message_info()
return True, info_text
except Exception as e:
logger.error(f"{self.log_prefix} 获取消息信息时出错: {e}")
return False, f"获取消息信息失败: {str(e)}"
def _get_simple_message_info(self) -> str:
"""获取简化的消息信息"""
message = self.message
# 基础信息
info_lines = [
"📨 消息信息概览",
f"🆔 消息ID: {message.message_info.message_id}",
f"⏰ 时间: {message.message_info.time}",
f"🌐 平台: {message.message_info.platform}",
]
# 发送者信息
user = message.message_info.user_info
info_lines.extend(
[
"",
"👤 发送者信息:",
f" 用户ID: {user.user_id}",
f" 昵称: {user.user_nickname}",
f" 群名片: {user.user_cardname or ''}",
]
)
# 群聊信息(如果是群聊)
if message.message_info.group_info:
group = message.message_info.group_info
info_lines.extend(
[
"",
"👥 群聊信息:",
f" 群ID: {group.group_id}",
f" 群名: {group.group_name or '未知'}",
]
)
else:
info_lines.extend(
[
"",
"💬 消息类型: 私聊消息",
]
)
# 消息内容
info_lines.extend(
[
"",
"📝 消息内容:",
f" 原始文本: {message.processed_plain_text}",
f" 是否表情: {'' if getattr(message, 'is_emoji', False) else ''}",
]
)
# 聊天流信息
if hasattr(message, "chat_stream") and message.chat_stream:
chat_stream = message.chat_stream
info_lines.extend(
[
"",
"🔄 聊天流信息:",
f" 流ID: {chat_stream.stream_id}",
]
)
return "\n".join(info_lines)
def _get_full_message_info(self) -> str:
"""获取完整的消息信息(包含技术细节)"""
message = self.message
info_lines = [
"📨 完整消息信息",
"=" * 40,
]
# 消息基础信息
info_lines.extend(
[
"",
"🔍 基础消息信息:",
f" 消息ID: {message.message_info.message_id}",
f" 时间戳: {message.message_info.time}",
f" 平台: {message.message_info.platform}",
f" 处理后文本: {message.processed_plain_text}",
f" 详细文本: {message.detailed_plain_text[:100]}{'...' if len(message.detailed_plain_text) > 100 else ''}",
]
)
# 用户详细信息
user = message.message_info.user_info
info_lines.extend(
[
"",
"👤 发送者详细信息:",
f" 用户ID: {user.user_id}",
f" 昵称: {user.user_nickname}",
f" 群名片: {user.user_cardname or ''}",
f" 平台: {user.platform}",
]
)
# 群聊详细信息
if message.message_info.group_info:
group = message.message_info.group_info
info_lines.extend(
[
"",
"👥 群聊详细信息:",
f" 群ID: {group.group_id}",
f" 群名: {group.group_name or '未知'}",
f" 平台: {group.platform}",
]
)
else:
info_lines.append("\n💬 消息类型: 私聊消息")
# 消息段信息
if message.message_segment:
info_lines.extend(
[
"",
"📦 消息段信息:",
f" 类型: {message.message_segment.type}",
f" 数据类型: {type(message.message_segment.data).__name__}",
f" 数据预览: {str(message.message_segment.data)[:200]}{'...' if len(str(message.message_segment.data)) > 200 else ''}",
]
)
# 聊天流详细信息
if hasattr(message, "chat_stream") and message.chat_stream:
chat_stream = message.chat_stream
info_lines.extend(
[
"",
"🔄 聊天流详细信息:",
f" 流ID: {chat_stream.stream_id}",
f" 平台: {chat_stream.platform}",
f" 用户信息: {chat_stream.user_info.user_nickname} ({chat_stream.user_info.user_id})",
f" 群信息: {getattr(chat_stream.group_info, 'group_name', '私聊') if chat_stream.group_info else '私聊'}",
]
)
# 回复信息
if hasattr(message, "reply") and message.reply:
info_lines.extend(
[
"",
"↩️ 回复信息:",
f" 回复消息ID: {message.reply.message_info.message_id}",
f" 回复内容: {message.reply.processed_plain_text[:100]}{'...' if len(message.reply.processed_plain_text) > 100 else ''}",
]
)
# 原始消息数据(如果存在)
if hasattr(message, "raw_message") and message.raw_message:
info_lines.extend(
[
"",
"🗂️ 原始消息数据:",
f" 数据类型: {type(message.raw_message).__name__}",
f" 数据大小: {len(str(message.raw_message))} 字符",
]
)
return "\n".join(info_lines)
@register_command
class SenderInfoCommand(BaseCommand):
"""发送者信息命令,快速查看发送者信息"""
command_name = "whoami"
command_description = "查看发送命令的用户信息"
command_pattern = r"^/whoami$"
command_help = "使用方法: /whoami - 查看你的用户信息"
command_examples = ["/whoami"]
enable_command = True
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行发送者信息查看命令"""
try:
user = self.message.message_info.user_info
group = self.message.message_info.group_info
info_lines = [
"👤 你的身份信息",
f"🆔 用户ID: {user.user_id}",
f"📝 昵称: {user.user_nickname}",
f"🏷️ 群名片: {user.user_cardname or ''}",
f"🌐 平台: {user.platform}",
]
if group:
info_lines.extend(
[
"",
"👥 当前群聊:",
f"🆔 群ID: {group.group_id}",
f"📝 群名: {group.group_name or '未知'}",
]
)
else:
info_lines.append("\n💬 当前在私聊中")
return True, "\n".join(info_lines)
except Exception as e:
logger.error(f"{self.log_prefix} 获取发送者信息时出错: {e}")
return False, f"获取发送者信息失败: {str(e)}"
@register_command
class ChatStreamInfoCommand(BaseCommand):
"""聊天流信息命令"""
command_name = "streaminfo"
command_description = "查看当前聊天流的详细信息"
command_pattern = r"^/streaminfo$"
command_help = "使用方法: /streaminfo - 查看当前聊天流信息"
command_examples = ["/streaminfo"]
enable_command = True
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行聊天流信息查看命令"""
try:
if not hasattr(self.message, "chat_stream") or not self.message.chat_stream:
return False, "无法获取聊天流信息"
chat_stream = self.message.chat_stream
info_lines = [
"🔄 聊天流信息",
f"🆔 流ID: {chat_stream.stream_id}",
f"🌐 平台: {chat_stream.platform}",
]
# 用户信息
if chat_stream.user_info:
info_lines.extend(
[
"",
"👤 关联用户:",
f" ID: {chat_stream.user_info.user_id}",
f" 昵称: {chat_stream.user_info.user_nickname}",
]
)
# 群信息
if chat_stream.group_info:
info_lines.extend(
[
"",
"👥 关联群聊:",
f" 群ID: {chat_stream.group_info.group_id}",
f" 群名: {chat_stream.group_info.group_name or '未知'}",
]
)
else:
info_lines.append("\n💬 类型: 私聊流")
# 最近消息统计
if hasattr(chat_stream, "last_messages"):
msg_count = len(chat_stream.last_messages)
info_lines.extend(
[
"",
f"📈 消息统计: 记录了 {msg_count} 条最近消息",
]
)
return True, "\n".join(info_lines)
except Exception as e:
logger.error(f"{self.log_prefix} 获取聊天流信息时出错: {e}")
return False, f"获取聊天流信息失败: {str(e)}"

View File

@@ -1,117 +0,0 @@
from src.common.logger_manager import get_logger
from src.chat.command.command_handler import BaseCommand, register_command
from src.chat.actions.plugin_api.message_api import MessageAPI
from typing import Tuple, Optional
logger = get_logger("send_msg_command")
@register_command
class SendMessageCommand(BaseCommand, MessageAPI):
"""发送消息命令,可以向指定群聊或私聊发送消息"""
command_name = "send"
command_description = "向指定群聊或私聊发送消息"
command_pattern = r"^/send\s+(?P<target_type>group|user)\s+(?P<target_id>\d+)\s+(?P<content>.+)$"
command_help = "使用方法: /send <group|user> <ID> <消息内容> - 发送消息到指定群聊或用户"
command_examples = ["/send group 123456789 大家好!", "/send user 987654321 私聊消息"]
enable_command = True
def __init__(self, message):
super().__init__(message)
# 初始化MessageAPI需要的服务虽然这里不会用到但保持一致性
self._services = {}
self.log_prefix = f"[Command:{self.command_name}]"
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行发送消息命令
Returns:
Tuple[bool, Optional[str]]: (是否执行成功, 回复消息)
"""
try:
# 获取匹配到的参数
target_type = self.matched_groups.get("target_type") # group 或 user
target_id = self.matched_groups.get("target_id") # 群ID或用户ID
content = self.matched_groups.get("content") # 消息内容
if not all([target_type, target_id, content]):
return False, "命令参数不完整,请检查格式"
logger.info(f"{self.log_prefix} 执行发送消息命令: {target_type}:{target_id} -> {content[:50]}...")
# 根据目标类型调用不同的发送方法
if target_type == "group":
success = await self._send_to_group(target_id, content)
target_desc = f"群聊 {target_id}"
elif target_type == "user":
success = await self._send_to_user(target_id, content)
target_desc = f"用户 {target_id}"
else:
return False, f"不支持的目标类型: {target_type},只支持 group 或 user"
# 返回执行结果
if success:
return True, f"✅ 消息已成功发送到 {target_desc}"
else:
return False, f"❌ 消息发送失败,可能是目标 {target_desc} 不存在或没有权限"
except Exception as e:
logger.error(f"{self.log_prefix} 执行发送消息命令时出错: {e}")
return False, f"命令执行出错: {str(e)}"
async def _send_to_group(self, group_id: str, content: str) -> bool:
"""发送消息到群聊
Args:
group_id: 群聊ID
content: 消息内容
Returns:
bool: 是否发送成功
"""
try:
success = await self.send_text_to_group(
text=content,
group_id=group_id,
platform="qq", # 默认使用QQ平台
)
if success:
logger.info(f"{self.log_prefix} 成功发送消息到群聊 {group_id}")
else:
logger.warning(f"{self.log_prefix} 发送消息到群聊 {group_id} 失败")
return success
except Exception as e:
logger.error(f"{self.log_prefix} 发送群聊消息时出错: {e}")
return False
async def _send_to_user(self, user_id: str, content: str) -> bool:
"""发送消息到私聊
Args:
user_id: 用户ID
content: 消息内容
Returns:
bool: 是否发送成功
"""
try:
success = await self.send_text_to_user(
text=content,
user_id=user_id,
platform="qq", # 默认使用QQ平台
)
if success:
logger.info(f"{self.log_prefix} 成功发送消息到用户 {user_id}")
else:
logger.warning(f"{self.log_prefix} 发送消息到用户 {user_id} 失败")
return success
except Exception as e:
logger.error(f"{self.log_prefix} 发送私聊消息时出错: {e}")
return False

View File

@@ -1,149 +0,0 @@
from src.common.logger_manager import get_logger
from src.chat.command.command_handler import BaseCommand, register_command
from src.chat.actions.plugin_api.message_api import MessageAPI
from typing import Tuple, Optional
logger = get_logger("send_msg_enhanced")
@register_command
class SendMessageEnhancedCommand(BaseCommand, MessageAPI):
"""增强版发送消息命令,支持多种消息类型和平台"""
command_name = "sendfull"
command_description = "增强版消息发送命令,支持多种类型和平台"
command_pattern = r"^/sendfull\s+(?P<msg_type>text|image|emoji)\s+(?P<target_type>group|user)\s+(?P<target_id>\d+)(?:\s+(?P<platform>\w+))?\s+(?P<content>.+)$"
command_help = "使用方法: /sendfull <消息类型> <目标类型> <ID> [平台] <内容>"
command_examples = [
"/sendfull text group 123456789 qq 大家好!这是文本消息",
"/sendfull image user 987654321 https://example.com/image.jpg",
"/sendfull emoji group 123456789 😄",
"/sendfull text user 987654321 qq 私聊消息",
]
enable_command = True
def __init__(self, message):
super().__init__(message)
self._services = {}
self.log_prefix = f"[Command:{self.command_name}]"
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行增强版发送消息命令"""
try:
# 获取匹配参数
msg_type = self.matched_groups.get("msg_type") # 消息类型: text/image/emoji
target_type = self.matched_groups.get("target_type") # 目标类型: group/user
target_id = self.matched_groups.get("target_id") # 目标ID
platform = self.matched_groups.get("platform") or "qq" # 平台默认qq
content = self.matched_groups.get("content") # 内容
if not all([msg_type, target_type, target_id, content]):
return False, "命令参数不完整,请检查格式"
# 验证消息类型
valid_types = ["text", "image", "emoji"]
if msg_type not in valid_types:
return False, f"不支持的消息类型: {msg_type},支持的类型: {', '.join(valid_types)}"
# 验证目标类型
if target_type not in ["group", "user"]:
return False, "目标类型只能是 group 或 user"
logger.info(f"{self.log_prefix} 执行发送命令: {msg_type} -> {target_type}:{target_id} (平台:{platform})")
# 根据消息类型和目标类型发送消息
is_group = target_type == "group"
success = await self.send_message_to_target(
message_type=msg_type, content=content, platform=platform, target_id=target_id, is_group=is_group
)
# 构建结果消息
target_desc = f"{'群聊' if is_group else '用户'} {target_id} (平台: {platform})"
msg_type_desc = {"text": "文本", "image": "图片", "emoji": "表情"}.get(msg_type, msg_type)
if success:
return True, f"{msg_type_desc}消息已成功发送到 {target_desc}"
else:
return False, f"{msg_type_desc}消息发送失败,可能是目标 {target_desc} 不存在或没有权限"
except Exception as e:
logger.error(f"{self.log_prefix} 执行增强发送命令时出错: {e}")
return False, f"命令执行出错: {str(e)}"
@register_command
class SendQuickCommand(BaseCommand, MessageAPI):
"""快速发送文本消息命令"""
command_name = "msg"
command_description = "快速发送文本消息到群聊"
command_pattern = r"^/msg\s+(?P<group_id>\d+)\s+(?P<content>.+)$"
command_help = "使用方法: /msg <群ID> <消息内容> - 快速发送文本到指定群聊"
command_examples = ["/msg 123456789 大家好!", "/msg 987654321 这是一条快速消息"]
enable_command = True
def __init__(self, message):
super().__init__(message)
self._services = {}
self.log_prefix = f"[Command:{self.command_name}]"
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行快速发送消息命令"""
try:
group_id = self.matched_groups.get("group_id")
content = self.matched_groups.get("content")
if not all([group_id, content]):
return False, "命令参数不完整"
logger.info(f"{self.log_prefix} 快速发送到群 {group_id}: {content[:50]}...")
success = await self.send_text_to_group(text=content, group_id=group_id, platform="qq")
if success:
return True, f"✅ 消息已发送到群 {group_id}"
else:
return False, f"❌ 发送到群 {group_id} 失败"
except Exception as e:
logger.error(f"{self.log_prefix} 快速发送命令出错: {e}")
return False, f"发送失败: {str(e)}"
@register_command
class SendPrivateCommand(BaseCommand, MessageAPI):
"""发送私聊消息命令"""
command_name = "pm"
command_description = "发送私聊消息到指定用户"
command_pattern = r"^/pm\s+(?P<user_id>\d+)\s+(?P<content>.+)$"
command_help = "使用方法: /pm <用户ID> <消息内容> - 发送私聊消息"
command_examples = ["/pm 123456789 你好!", "/pm 987654321 这是私聊消息"]
enable_command = True
def __init__(self, message):
super().__init__(message)
self._services = {}
self.log_prefix = f"[Command:{self.command_name}]"
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行私聊发送命令"""
try:
user_id = self.matched_groups.get("user_id")
content = self.matched_groups.get("content")
if not all([user_id, content]):
return False, "命令参数不完整"
logger.info(f"{self.log_prefix} 发送私聊到用户 {user_id}: {content[:50]}...")
success = await self.send_text_to_user(text=content, user_id=user_id, platform="qq")
if success:
return True, f"✅ 私聊消息已发送到用户 {user_id}"
else:
return False, f"❌ 发送私聊到用户 {user_id} 失败"
except Exception as e:
logger.error(f"{self.log_prefix} 私聊发送命令出错: {e}")
return False, f"私聊发送失败: {str(e)}"

View File

@@ -1,240 +0,0 @@
from src.common.logger_manager import get_logger
from src.chat.command.command_handler import BaseCommand, register_command
from src.chat.actions.plugin_api.message_api import MessageAPI
from typing import Tuple, Optional
import time
logger = get_logger("send_msg_with_context")
@register_command
class ContextAwareSendCommand(BaseCommand, MessageAPI):
"""上下文感知的发送消息命令,展示如何利用原始消息信息"""
command_name = "csend"
command_description = "带上下文感知的发送消息命令"
command_pattern = (
r"^/csend\s+(?P<target_type>group|user|here|reply)\s+(?P<target_id_or_content>.*?)(?:\s+(?P<content>.*))?$"
)
command_help = "使用方法: /csend <target_type> <参数> [内容]"
command_examples = [
"/csend group 123456789 大家好!",
"/csend user 987654321 私聊消息",
"/csend here 在当前聊天发送",
"/csend reply 回复当前群/私聊",
]
enable_command = True
# 管理员用户ID列表示例
ADMIN_USERS = ["123456789", "987654321"] # 可以从配置文件读取
def __init__(self, message):
super().__init__(message)
self._services = {}
self.log_prefix = f"[Command:{self.command_name}]"
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行上下文感知的发送命令"""
try:
# 获取命令发送者信息
sender = self.message.message_info.user_info
current_group = self.message.message_info.group_info
# 权限检查
if not self._check_permission(sender.user_id):
return False, f"❌ 权限不足,只有管理员可以使用此命令\n你的ID: {sender.user_id}"
# 解析命令参数
target_type = self.matched_groups.get("target_type")
target_id_or_content = self.matched_groups.get("target_id_or_content", "")
content = self.matched_groups.get("content", "")
# 根据目标类型处理不同情况
if target_type == "here":
# 发送到当前聊天
return await self._send_to_current_chat(target_id_or_content, sender, current_group)
elif target_type == "reply":
# 回复到当前聊天,带发送者信息
return await self._send_reply_with_context(target_id_or_content, sender, current_group)
elif target_type in ["group", "user"]:
# 发送到指定目标
if not content:
return False, "指定群聊或用户时需要提供消息内容"
return await self._send_to_target(target_type, target_id_or_content, content, sender)
else:
return False, f"不支持的目标类型: {target_type}"
except Exception as e:
logger.error(f"{self.log_prefix} 执行上下文感知发送命令时出错: {e}")
return False, f"命令执行出错: {str(e)}"
def _check_permission(self, user_id: str) -> bool:
"""检查用户权限"""
return user_id in self.ADMIN_USERS
async def _send_to_current_chat(self, content: str, sender, current_group) -> Tuple[bool, str]:
"""发送到当前聊天"""
if not content:
return False, "消息内容不能为空"
# 构建带发送者信息的消息
timestamp = time.strftime("%H:%M:%S", time.localtime())
if current_group:
# 群聊
formatted_content = f"[管理员转发 {timestamp}] {sender.user_nickname}({sender.user_id}): {content}"
success = await self.send_text_to_group(
text=formatted_content, group_id=current_group.group_id, platform="qq"
)
target_desc = f"当前群聊 {current_group.group_name}({current_group.group_id})"
else:
# 私聊
formatted_content = f"[管理员消息 {timestamp}]: {content}"
success = await self.send_text_to_user(text=formatted_content, user_id=sender.user_id, platform="qq")
target_desc = "当前私聊"
if success:
return True, f"✅ 消息已发送到{target_desc}"
else:
return False, f"❌ 发送到{target_desc}失败"
async def _send_reply_with_context(self, content: str, sender, current_group) -> Tuple[bool, str]:
"""发送回复,带完整上下文信息"""
if not content:
return False, "回复内容不能为空"
# 获取当前时间和环境信息
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 构建上下文信息
context_info = [
f"📢 管理员回复 [{timestamp}]",
f"👤 发送者: {sender.user_nickname}({sender.user_id})",
]
if current_group:
context_info.append(f"👥 当前群聊: {current_group.group_name}({current_group.group_id})")
target_desc = f"群聊 {current_group.group_name}"
else:
context_info.append("💬 当前环境: 私聊")
target_desc = "私聊"
context_info.extend([f"📝 回复内容: {content}", "" * 30])
formatted_content = "\n".join(context_info)
# 发送消息
if current_group:
success = await self.send_text_to_group(
text=formatted_content, group_id=current_group.group_id, platform="qq"
)
else:
success = await self.send_text_to_user(text=formatted_content, user_id=sender.user_id, platform="qq")
if success:
return True, f"✅ 带上下文的回复已发送到{target_desc}"
else:
return False, f"❌ 发送上下文回复到{target_desc}失败"
async def _send_to_target(self, target_type: str, target_id: str, content: str, sender) -> Tuple[bool, str]:
"""发送到指定目标,带发送者追踪信息"""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 构建带追踪信息的消息
tracking_info = f"[管理转发 {timestamp}] 来自 {sender.user_nickname}({sender.user_id})"
formatted_content = f"{tracking_info}\n{content}"
if target_type == "group":
success = await self.send_text_to_group(text=formatted_content, group_id=target_id, platform="qq")
target_desc = f"群聊 {target_id}"
else: # user
success = await self.send_text_to_user(text=formatted_content, user_id=target_id, platform="qq")
target_desc = f"用户 {target_id}"
if success:
return True, f"✅ 带追踪信息的消息已发送到{target_desc}"
else:
return False, f"❌ 发送到{target_desc}失败"
@register_command
class MessageContextCommand(BaseCommand):
"""消息上下文命令,展示如何获取和利用上下文信息"""
command_name = "context"
command_description = "显示当前消息的完整上下文信息"
command_pattern = r"^/context$"
command_help = "使用方法: /context - 显示当前环境的上下文信息"
command_examples = ["/context"]
enable_command = True
async def execute(self) -> Tuple[bool, Optional[str]]:
"""显示上下文信息"""
try:
message = self.message
user = message.message_info.user_info
group = message.message_info.group_info
# 构建上下文信息
context_lines = [
"🌐 当前上下文信息",
"=" * 30,
"",
"⏰ 时间信息:",
f" 消息时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(message.message_info.time))}",
f" 时间戳: {message.message_info.time}",
"",
"👤 发送者:",
f" 用户ID: {user.user_id}",
f" 昵称: {user.user_nickname}",
f" 群名片: {user.user_cardname or ''}",
f" 平台: {user.platform}",
]
if group:
context_lines.extend(
[
"",
"👥 群聊环境:",
f" 群ID: {group.group_id}",
f" 群名: {group.group_name or '未知'}",
f" 平台: {group.platform}",
]
)
else:
context_lines.extend(
[
"",
"💬 私聊环境",
]
)
# 添加聊天流信息
if hasattr(message, "chat_stream") and message.chat_stream:
chat_stream = message.chat_stream
context_lines.extend(
[
"",
"🔄 聊天流:",
f" 流ID: {chat_stream.stream_id}",
]
)
# 添加消息内容信息
context_lines.extend(
[
"",
"📝 消息内容:",
f" 原始内容: {message.processed_plain_text}",
f" 消息长度: {len(message.processed_plain_text)} 字符",
f" 消息ID: {message.message_info.message_id}",
]
)
return True, "\n".join(context_lines)
except Exception as e:
logger.error(f"{self.log_prefix} 获取上下文信息时出错: {e}")
return False, f"获取上下文失败: {str(e)}"

View File

@@ -0,0 +1,187 @@
# 综合示例插件
## 概述
这是一个展示新插件系统完整功能的综合示例插件,整合了所有旧示例插件的功能,并使用新的架构重写。
## 功能特性
### 🎯 Action组件
#### SmartGreetingAction - 智能问候
- **触发方式**: 关键词触发 (你好、hello、hi、嗨等)
- **支持模式**: 所有聊天模式
- **功能**: 智能问候支持LLM个性化生成
- **配置**: 可自定义问候模板、启用表情、LLM生成
### 📝 Command组件
#### 1. ComprehensiveHelpCommand - 综合帮助系统
```
/help [命令名]
```
- **功能**: 显示所有命令帮助或特定命令详情
- **拦截**: ✅ 拦截消息处理
- **示例**: `/help`, `/help send`
#### 2. MessageSendCommand - 消息发送
```
/send <group|user> <ID> <消息内容>
```
- **功能**: 向指定群聊或私聊发送消息
- **拦截**: ✅ 拦截消息处理
- **示例**: `/send group 123456 大家好`
#### 3. SystemStatusCommand - 系统状态查询
```
/status [类型]
```
- **功能**: 查询系统、插件、内存等状态
- **拦截**: ✅ 拦截消息处理
- **示例**: `/status`, `/status 插件`
#### 4. EchoCommand - 回声命令
```
/echo <消息内容>
```
- **功能**: 重复用户输入的消息
- **拦截**: ✅ 拦截消息处理
- **示例**: `/echo Hello World`
#### 5. MessageInfoCommand - 消息信息查询
```
/info
```
- **功能**: 显示当前消息的详细信息
- **拦截**: ✅ 拦截消息处理
- **示例**: `/info`
#### 6. CustomPrefixCommand - 自定义前缀
```
/prefix <前缀> <内容>
```
- **功能**: 为消息添加自定义前缀
- **拦截**: ✅ 拦截消息处理
- **示例**: `/prefix [公告] 系统维护`
#### 7. LogMonitorCommand - 日志监控
```
/log [级别]
```
- **功能**: 记录消息到日志但不拦截后续处理
- **拦截**: ❌ 不拦截,继续处理消息
- **示例**: `/log`, `/log debug`
## 🔧 拦截控制演示
此插件完美演示了新插件系统的**拦截控制功能**
### 拦截型命令 (intercept_message = True)
- `/help` - 显示帮助后停止处理
- `/send` - 发送消息后停止处理
- `/status` - 查询状态后停止处理
- `/echo` - 回声后停止处理
- `/info` - 显示信息后停止处理
- `/prefix` - 添加前缀后停止处理
### 非拦截型命令 (intercept_message = False)
- `/log` - 记录日志但继续处理,可能触发其他功能
## ⚙️ 配置说明
插件支持通过 `config.toml` 进行详细配置:
### 组件控制
```toml
[components]
enable_greeting = true # 启用智能问候
enable_help = true # 启用帮助系统
enable_send = true # 启用消息发送
# ... 其他组件开关
```
### 功能配置
```toml
[greeting]
template = "你好,{username}" # 问候模板
enable_emoji = true # 启用表情
enable_llm = false # 启用LLM生成
[send]
max_message_length = 500 # 最大消息长度
[echo]
max_length = 200 # 回声最大长度
enable_formatting = true # 启用格式化
```
## 🚀 使用示例
### 智能问候
```
用户: 你好
机器人: 你好朋友欢迎使用MaiBot综合插件系统😊
```
### 帮助查询
```
用户: /help
机器人: [显示完整命令帮助列表]
用户: /help send
机器人: [显示send命令的详细帮助]
```
### 消息发送
```
用户: /send group 123456 大家好!
机器人: ✅ 消息已成功发送到 群聊 123456
```
### 日志监控(不拦截)
```
用户: /log info 这是一条测试消息
[日志记录但消息继续处理,可能触发智能问候等其他功能]
```
## 📁 文件结构
```
src/plugins/built_in/example_comprehensive/
├── plugin.py # 主插件文件
├── config.toml # 配置文件
└── README.md # 说明文档
```
## 🔄 架构升级
此插件展示了从旧插件系统到新插件系统的完整升级:
### 旧系统特征
- 使用 `@register_command` 装饰器
- 继承旧的 `BaseCommand`
- 硬编码的消息处理逻辑
- 有限的配置支持
### 新系统特征
- 使用统一的组件注册机制
- 新的 `BaseAction``BaseCommand` 基类
- **拦截控制功能** - 灵活的消息处理流程
- 强大的配置驱动架构
- 统一的API接口
- 完整的错误处理和日志
## 💡 开发指南
此插件可作为开发新插件的完整参考:
1. **Action开发**: 参考 `SmartGreetingAction`
2. **Command开发**: 参考各种Command实现
3. **拦截控制**: 根据需要设置 `intercept_message`
4. **配置使用**: 通过 `self.api.get_config()` 读取配置
5. **错误处理**: 完整的异常捕获和用户反馈
6. **日志记录**: 结构化的日志输出
## 🎉 总结
这个综合示例插件完美展示了新插件系统的强大功能,特别是**拦截控制机制**,让开发者可以精确控制消息处理流程,实现更灵活的插件交互模式。

View File

@@ -0,0 +1,53 @@
# 综合示例插件配置文件
[plugin]
name = "example_comprehensive"
version = "2.0.0"
enabled = true
description = "展示新插件系统完整功能的综合示例插件"
# 组件启用控制
[components]
enable_greeting = true
enable_help = true
enable_send = true
enable_echo = true
enable_info = true
# 智能问候配置
[greeting]
template = "你好,{username}欢迎使用MaiBot综合插件系统"
enable_emoji = true
enable_llm = false # 是否使用LLM生成个性化问候
# 消息发送配置
[send]
max_message_length = 500
enable_length_check = true
default_platform = "qq"
# 回声命令配置
[echo]
max_length = 200
enable_formatting = true
# 消息信息配置
[info]
show_detailed_info = true
include_stream_info = true
max_content_preview = 100
# 帮助系统配置
[help]
show_extended_help = true
include_action_info = true
include_config_info = true
# 骰子命令配置
[dice]
enable_dice = true
# 日志配置
[logging]
level = "INFO"
prefix = "[ExampleComprehensive]"

View File

@@ -0,0 +1,486 @@
"""
综合示例插件
将旧的示例插件功能重写为新插件系统架构,展示完整的插件开发模式。
包含功能:
- 智能问候Action
- 帮助系统Command
- 消息发送Command
- 状态查询Command
- 回声Command
- 自定义前缀Command
- 消息信息查询Command
- 高级消息发送Command
演示新插件系统的完整功能:
- Action和Command组件的定义
- 拦截控制功能
- 配置驱动的行为
- API的多种使用方式
- 日志和错误处理
"""
from typing import List, Tuple, Type, Optional, Dict, Any
import re
import time
import random
# 导入新插件系统
from src.plugin_system.base.base_plugin import BasePlugin
from src.plugin_system.base.base_plugin import register_plugin
from src.plugin_system.base.base_action import BaseAction
from src.plugin_system.base.base_command import BaseCommand
from src.plugin_system.base.component_types import ComponentInfo, ActionActivationType, ChatMode
from src.common.logger_manager import get_logger
logger = get_logger("example_comprehensive")
# ===== Action组件 =====
class SmartGreetingAction(BaseAction):
"""智能问候Action - 基于关键词触发的问候系统"""
# 激活设置
focus_activation_type = ActionActivationType.KEYWORD
normal_activation_type = ActionActivationType.KEYWORD
activation_keywords = ["你好", "hello", "hi", "", "问候", "早上好", "晚上好"]
keyword_case_sensitive = False
mode_enable = ChatMode.ALL
parallel_action = False
# Action参数定义
action_parameters = {
"username": "要问候的用户名(可选)"
}
# Action使用场景
action_require = [
"用户发送包含问候词汇的消息",
"检测到新用户加入时",
"响应友好交流需求"
]
# ===== Command组件 =====
class ComprehensiveHelpCommand(BaseCommand):
"""综合帮助系统 - 显示所有可用命令和Action"""
command_pattern = r"^/help(?:\s+(?P<command>\w+))?$"
command_help = "显示所有命令帮助或特定命令详情,用法:/help [命令名]"
command_examples = ["/help", "/help send", "/help status"]
intercept_message = True # 拦截消息,不继续处理
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行帮助命令"""
try:
command_name = self.matched_groups.get("command")
if command_name:
# 显示特定命令帮助
return await self._show_specific_help(command_name)
else:
# 显示所有命令概览
return await self._show_all_commands()
except Exception as e:
logger.error(f"{self.log_prefix} 帮助命令执行失败: {e}")
await self.send_reply(f"❌ 帮助系统错误: {str(e)}")
return False, str(e)
async def _show_specific_help(self, command_name: str) -> Tuple[bool, str]:
"""显示特定命令的详细帮助"""
# 这里可以扩展为动态获取所有注册的Command信息
help_info = {
"help": {
"description": "显示帮助信息",
"usage": "/help [命令名]",
"examples": ["/help", "/help send"]
},
"send": {
"description": "发送消息到指定目标",
"usage": "/send <group|user> <ID> <消息内容>",
"examples": ["/send group 123456 你好", "/send user 789456 私聊"]
},
"status": {
"description": "查询系统状态",
"usage": "/status [类型]",
"examples": ["/status", "/status 系统", "/status 插件"]
}
}
info = help_info.get(command_name.lower())
if not info:
response = f"❌ 未找到命令: {command_name}\n使用 /help 查看所有可用命令"
else:
response = f"""
📖 命令帮助: {command_name}
📝 描述: {info['description']}
⚙️ 用法: {info['usage']}
💡 示例:
{chr(10).join(f"{example}" for example in info['examples'])}
""".strip()
await self.send_reply(response)
return True, response
async def _show_all_commands(self) -> Tuple[bool, str]:
"""显示所有可用命令"""
help_text = """
🤖 综合示例插件 - 命令帮助
📝 可用命令:
• /help [命令] - 显示帮助信息
• /send <目标类型> <ID> <消息> - 发送消息
• /status [类型] - 查询系统状态
• /echo <消息> - 回声重复消息
• /info - 查询当前消息信息
• /prefix <前缀> <内容> - 自定义前缀消息
🎯 智能功能:
• 智能问候 - 关键词触发自动问候
• 状态监控 - 实时系统状态查询
• 消息转发 - 跨群聊/私聊消息发送
⚙️ 拦截控制:
• 部分命令拦截消息处理(如 /help
• 部分命令允许继续处理(如 /log
💡 使用 /help <命令名> 获取特定命令的详细说明
""".strip()
await self.send_reply(help_text)
return True, help_text
class MessageSendCommand(BaseCommand):
"""消息发送Command - 向指定群聊或私聊发送消息"""
command_pattern = r"^/send\s+(?P<target_type>group|user)\s+(?P<target_id>\d+)\s+(?P<content>.+)$"
command_help = "向指定群聊或私聊发送消息,用法:/send <group|user> <ID> <消息内容>"
command_examples = [
"/send group 123456789 大家好!",
"/send user 987654321 私聊消息",
"/send group 555666777 这是来自插件的消息"
]
intercept_message = True # 拦截消息处理
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行消息发送"""
try:
target_type = self.matched_groups.get("target_type")
target_id = self.matched_groups.get("target_id")
content = self.matched_groups.get("content")
if not all([target_type, target_id, content]):
await self.send_reply("❌ 命令参数不完整,请检查格式")
return False, "参数不完整"
# 长度限制检查
max_length = self.api.get_config("send.max_message_length", 500)
if len(content) > max_length:
await self.send_reply(f"❌ 消息过长,最大长度: {max_length} 字符")
return False, "消息过长"
logger.info(f"{self.log_prefix} 发送消息: {target_type}:{target_id} -> {content[:50]}...")
# 根据目标类型发送消息
if target_type == "group":
success = await self.api.send_text_to_group(
text=content,
group_id=target_id,
platform="qq"
)
target_desc = f"群聊 {target_id}"
elif target_type == "user":
success = await self.api.send_text_to_user(
text=content,
user_id=target_id,
platform="qq"
)
target_desc = f"用户 {target_id}"
else:
await self.send_reply(f"❌ 不支持的目标类型: {target_type}")
return False, f"不支持的目标类型: {target_type}"
# 返回结果
if success:
response = f"✅ 消息已成功发送到 {target_desc}"
await self.send_reply(response)
return True, response
else:
response = f"❌ 消息发送失败,目标 {target_desc} 可能不存在"
await self.send_reply(response)
return False, response
except Exception as e:
logger.error(f"{self.log_prefix} 消息发送失败: {e}")
error_msg = f"❌ 发送失败: {str(e)}"
await self.send_reply(error_msg)
return False, str(e)
class DiceCommand(BaseCommand):
"""骰子命令,使用!前缀而不是/前缀"""
command_pattern = r"^[!](?:dice|骰子)(?:\s+(?P<count>\d+))?$" # 匹配 !dice 或 !骰子,可选参数为骰子数量
command_help = "使用方法: !dice [数量] 或 !骰子 [数量] - 掷骰子默认掷1个"
command_examples = ["!dice", "!骰子", "!dice 3", "!骰子 5"]
intercept_message = True # 拦截消息处理
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行骰子命令
Returns:
Tuple[bool, Optional[str]]: (是否执行成功, 回复消息)
"""
try:
# 获取骰子数量默认为1
count_str = self.matched_groups.get("count")
# 确保count_str不为None
if count_str is None:
count = 1 # 默认值
else:
try:
count = int(count_str)
if count <= 0:
response = "❌ 骰子数量必须大于0"
await self.send_reply(response)
return False, response
if count > 10: # 限制最大数量
response = "❌ 一次最多只能掷10个骰子"
await self.send_reply(response)
return False, response
except ValueError:
response = "❌ 骰子数量必须是整数"
await self.send_reply(response)
return False, response
# 生成随机数
results = [random.randint(1, 6) for _ in range(count)]
# 构建回复消息
if count == 1:
message = f"🎲 掷出了 {results[0]}"
else:
dice_results = ", ".join(map(str, results))
total = sum(results)
message = f"🎲 掷出了 {count} 个骰子: [{dice_results}],总点数: {total}"
await self.send_reply(message)
logger.info(f"{self.log_prefix} 执行骰子命令: {message}")
return True, message
except Exception as e:
error_msg = f"❌ 执行命令时出错: {str(e)}"
await self.send_reply(error_msg)
logger.error(f"{self.log_prefix} 执行骰子命令时出错: {e}")
return False, error_msg
class EchoCommand(BaseCommand):
"""回声Command - 重复用户输入的消息"""
command_pattern = r"^/echo\s+(?P<message>.+)$"
command_help = "重复你的消息内容,用法:/echo <消息内容>"
command_examples = ["/echo Hello World", "/echo 你好世界", "/echo 测试回声"]
intercept_message = True # 拦截消息处理
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行回声命令"""
try:
message = self.matched_groups.get("message", "")
if not message:
response = "❌ 请提供要重复的消息!用法:/echo <消息内容>"
await self.send_reply(response)
return False, response
# 检查消息长度限制
max_length = self.api.get_config("echo.max_length", 200)
if len(message) > max_length:
response = f"❌ 消息过长,最大长度: {max_length} 字符"
await self.send_reply(response)
return False, response
# 格式化回声消息
enable_formatting = self.api.get_config("echo.enable_formatting", True)
if enable_formatting:
response = f"🔊 回声: {message}"
else:
response = message
await self.send_reply(response)
logger.info(f"{self.log_prefix} 回声消息: {message}")
return True, response
except Exception as e:
logger.error(f"{self.log_prefix} 回声命令失败: {e}")
error_msg = f"❌ 回声失败: {str(e)}"
await self.send_reply(error_msg)
return False, str(e)
class MessageInfoCommand(BaseCommand):
"""消息信息Command - 显示当前消息的详细信息"""
command_pattern = r"^/info$"
command_help = "显示当前消息的详细信息"
command_examples = ["/info"]
intercept_message = True # 拦截消息处理
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行消息信息查询"""
try:
message = self.message
# 收集消息信息
user_info = message.message_info.user_info
group_info = message.message_info.group_info
info_parts = [
"📋 消息信息详情",
"",
f"👤 用户信息:",
f" • ID: {user_info.user_id}",
f" • 昵称: {user_info.user_nickname}",
f" • 群名片: {getattr(user_info, 'user_cardname', '')}",
f" • 平台: {message.message_info.platform}",
"",
f"💬 消息信息:",
f" • 消息ID: {message.message_info.message_id}",
f" • 时间戳: {message.message_info.time}",
f" • 原始内容: {message.processed_plain_text[:100]}{'...' if len(message.processed_plain_text) > 100 else ''}",
f" • 是否表情: {'' if getattr(message, 'is_emoji', False) else ''}",
]
# 群聊信息
if group_info:
info_parts.extend([
"",
f"👥 群聊信息:",
f" • 群ID: {group_info.group_id}",
f" • 群名: {getattr(group_info, 'group_name', '未知')}",
f" • 聊天类型: 群聊"
])
else:
info_parts.extend([
"",
f"💭 聊天类型: 私聊"
])
# 流信息
if hasattr(message, 'chat_stream') and message.chat_stream:
stream = message.chat_stream
info_parts.extend([
"",
f"🌊 聊天流信息:",
f" • 流ID: {stream.stream_id}",
f" • 创建时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(stream.create_time))}",
f" • 最后活跃: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(stream.last_active_time))}"
])
response = "\n".join(info_parts)
await self.send_reply(response)
logger.info(f"{self.log_prefix} 显示消息信息: {user_info.user_id}")
return True, response
except Exception as e:
logger.error(f"{self.log_prefix} 消息信息查询失败: {e}")
error_msg = f"❌ 信息查询失败: {str(e)}"
await self.send_reply(error_msg)
return False, str(e)
@register_plugin
class ExampleComprehensivePlugin(BasePlugin):
"""综合示例插件
整合了旧示例插件的所有功能,展示新插件系统的完整能力:
- 多种Action和Command组件
- 拦截控制功能演示
- 配置驱动的行为
- 完整的错误处理
- 日志记录和监控
"""
# 插件基本信息
plugin_name = "example_comprehensive"
plugin_description = "综合示例插件,展示新插件系统的完整功能"
plugin_version = "2.0.0"
plugin_author = "MaiBot开发团队"
enable_plugin = True
config_file_name = "config.toml"
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
"""返回插件包含的组件列表"""
# 从配置获取组件启用状态
enable_greeting = self.get_config("components.enable_greeting", True)
enable_help = self.get_config("components.enable_help", True)
enable_send = self.get_config("components.enable_send", True)
enable_echo = self.get_config("components.enable_echo", True)
enable_info = self.get_config("components.enable_info", True)
enable_dice = self.get_config("components.enable_dice", True)
components = []
# 添加Action组件
if enable_greeting:
components.append((
SmartGreetingAction.get_action_info(
name="smart_greeting",
description="智能问候系统,基于关键词触发"
),
SmartGreetingAction
))
# 添加Command组件
if enable_help:
components.append((
ComprehensiveHelpCommand.get_command_info(
name="comprehensive_help",
description="综合帮助系统,显示所有命令信息"
),
ComprehensiveHelpCommand
))
if enable_send:
components.append((
MessageSendCommand.get_command_info(
name="message_send",
description="消息发送命令,支持群聊和私聊"
),
MessageSendCommand
))
if enable_echo:
components.append((
EchoCommand.get_command_info(
name="echo",
description="回声命令,重复用户输入"
),
EchoCommand
))
if enable_info:
components.append((
MessageInfoCommand.get_command_info(
name="message_info",
description="消息信息查询,显示详细信息"
),
MessageInfoCommand
))
if enable_dice:
components.append((
DiceCommand.get_command_info(
name="dice",
description="骰子命令,掷骰子"
),
DiceCommand
))
return components

View File

@@ -1,30 +0,0 @@
# 完整示例插件配置文件
[plugin]
name = "simple_plugin"
version = "1.1.0"
enabled = true
description = "展示新插件系统完整功能的示例插件"
[hello_action]
greeting_message = "你好,{username}欢迎使用MaiBot新插件系统"
enable_emoji = true
enable_llm_greeting = false # 是否使用LLM生成个性化问候
default_username = "朋友"
[status_command]
show_detailed_info = true
allowed_types = ["系统", "插件", "数据库", "内存", "网络"]
default_type = "系统"
[echo_command]
max_message_length = 500
enable_formatting = true
[help_command]
show_extended_help = true
include_config_info = true
[logging]
level = "INFO"
prefix = "[SimplePlugin]"

View File

@@ -1,195 +0,0 @@
"""
完整示例插件
演示新插件系统的完整功能:
- 使用简化的导入接口
- 展示Action和Command组件的定义
- 展示插件配置的使用
- 提供实用的示例功能
- 演示API的多种使用方式
"""
from typing import List, Tuple, Type, Optional
# 使用简化的导入接口
from src.plugin_system import (
BasePlugin,
register_plugin,
BaseAction,
BaseCommand,
ComponentInfo,
ActionActivationType,
ChatMode,
)
from src.common.logger_manager import get_logger
logger = get_logger("simple_plugin")
class HelloAction(BaseAction):
"""智能问候Action组件"""
# ✅ 现在可以直接在类中定义激活条件!
focus_activation_type = ActionActivationType.KEYWORD
normal_activation_type = ActionActivationType.KEYWORD
activation_keywords = ["你好", "hello", "问候", "hi", ""]
keyword_case_sensitive = False
mode_enable = ChatMode.ALL
parallel_action = False
async def execute(self) -> Tuple[bool, str]:
"""执行问候动作"""
username = self.action_data.get("username", "朋友")
# 使用默认配置值(避免创建新插件实例)
greeting_template = "你好,{username}"
enable_emoji = True
enable_llm = False
# 如果启用LLM生成个性化问候
if enable_llm:
try:
# 演示使用LLM API生成个性化问候
models = self.api.get_available_models()
if models:
first_model = list(models.values())[0]
prompt = f"为用户名叫{username}的朋友生成一句温暖的个性化问候语不超过30字"
success, response, _, _ = await self.api.generate_with_model(
prompt=prompt, model_config=first_model
)
if success:
logger.info(f"{self.log_prefix} 使用LLM生成问候: {response}")
return True, response
except Exception as e:
logger.warning(f"{self.log_prefix} LLM生成问候失败使用默认模板: {e}")
# 构建基础问候消息
response = greeting_template.format(username=username)
if enable_emoji:
response += " 😊"
# 演示存储Action执行记录到数据库
await self.api.store_action_info(
action_build_into_prompt=False, action_prompt_display=f"问候了用户: {username}", action_done=True
)
logger.info(f"{self.log_prefix} 执行问候动作: {username}")
return True, response
class EchoCommand(BaseCommand):
"""回声命令 - 重复用户输入"""
# ✅ 现在可以直接在类中定义命令模式!
command_pattern = r"^/echo\s+(?P<message>.+)$"
command_help = "重复消息,用法:/echo <消息内容>"
command_examples = ["/echo Hello World", "/echo 你好世界"]
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行回声命令"""
# 获取匹配的参数
message = self.matched_groups.get("message", "")
if not message:
response = "请提供要重复的消息!用法:/echo <消息内容>"
else:
response = f"🔊 {message}"
# 发送回复
await self.send_reply(response)
logger.info(f"{self.log_prefix} 执行回声命令: {message}")
return True, response
class StatusCommand(BaseCommand):
"""状态查询Command组件"""
# ✅ 直接定义命令模式
command_pattern = r"^/status\s*(?P<type>\w+)?$"
command_help = "查询系统状态,用法:/status [类型]"
command_examples = ["/status", "/status 系统", "/status 插件"]
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行状态查询命令"""
# 获取匹配的参数
query_type = self.matched_groups.get("type", "系统")
# 使用默认配置值(避免创建新插件实例)
show_detailed = True
allowed_types = ["系统", "插件"]
if query_type not in allowed_types:
response = f"不支持的查询类型: {query_type}\n支持的类型: {', '.join(allowed_types)}"
elif show_detailed:
response = f"📊 {query_type}状态详情:\n✅ 运行正常\n🔧 版本: 1.0.0\n⚡ 性能: 良好"
else:
response = f"{query_type}状态:正常"
# 发送回复
await self.send_reply(response)
logger.info(f"{self.log_prefix} 执行状态查询: {query_type}")
return True, response
class HelpCommand(BaseCommand):
"""帮助命令 - 显示插件功能"""
# ✅ 直接定义命令模式
command_pattern = r"^/help$"
command_help = "显示插件帮助信息"
command_examples = ["/help"]
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行帮助命令"""
help_text = """
🤖 简单示例插件帮助
📝 可用命令:
• /echo <消息> - 重复你的消息
• /status [类型] - 查询系统状态
• /help - 显示此帮助信息
🎯 智能功能:
• 自动问候 - 当消息包含"你好""hello"等关键词时触发
⚙️ 配置:
本插件支持通过config.toml文件进行个性化配置
💡 这是新插件系统的完整示例展示了Action和Command的结合使用。
""".strip()
await self.send_reply(help_text)
logger.info(f"{self.log_prefix} 显示帮助信息")
return True, "已显示帮助信息"
@register_plugin
class SimplePlugin(BasePlugin):
"""完整示例插件
包含多个Action和Command组件展示插件系统的完整功能
"""
# 插件基本信息
plugin_name = "simple_plugin"
plugin_description = "完整的示例插件,展示新插件系统的各种功能"
plugin_version = "1.1.0"
plugin_author = "MaiBot开发团队"
enable_plugin = True
config_file_name = "config.toml" # 配置文件
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
"""返回插件包含的组件列表"""
# ✅ 现在可以直接从类属性生成组件信息!
return [
(HelloAction.get_action_info("hello_action", "智能问候动作,支持自定义消息和表情"), HelloAction),
(EchoCommand.get_command_info("echo_command", "回声命令,重复用户输入的消息"), EchoCommand),
(StatusCommand.get_command_info("status_command", "状态查询命令,支持多种查询类型"), StatusCommand),
(HelpCommand.get_command_info("help_command", "帮助命令,显示插件功能说明"), HelpCommand),
]

View File

@@ -1,22 +0,0 @@
"""禁言插件包
这是一个群聊管理插件,提供智能禁言功能。
功能特性:
- 智能LLM判定根据聊天内容智能判断是否需要禁言
- 灵活的时长管理:支持自定义禁言时长限制
- 模板化消息:支持自定义禁言提示消息
- 参数验证:完整的输入参数验证和错误处理
- 配置文件支持:所有设置可通过配置文件调整
使用场景:
- 用户发送违规内容时自动判定禁言
- 用户主动要求被禁言时执行操作
- 管理员通过聊天指令触发禁言动作
配置文件src/plugins/mute_plugin/actions/mute_action_config.toml
"""
"""
这是一个测试插件
"""

View File

@@ -1,3 +0,0 @@
"""测试插件动作模块"""
from . import mute_action # noqa

View File

@@ -1,263 +0,0 @@
from src.common.logger_manager import get_logger
from src.chat.actions.plugin_action import PluginAction, register_action, ActionActivationType
from src.chat.actions.base_action import ChatMode
from typing import Tuple
logger = get_logger("mute_action")
@register_action
class MuteAction(PluginAction):
"""群聊禁言动作处理类"""
action_name = "mute_action"
action_description = "在特定情境下,对某人采取禁言,让他不能说话"
action_parameters = {
"target": "禁言对象,必填,输入你要禁言的对象的名字",
"duration": "禁言时长,必填,输入你要禁言的时长(秒),单位为秒,必须为数字",
"reason": "禁言理由,可选",
}
action_require = [
"当有人违反了公序良俗的内容",
"当有人刷屏时使用",
"当有人发了擦边,或者色情内容时使用",
"当有人要求禁言自己时使用",
"如果某人已经被禁言了,就不要再次禁言了,除非你想追加时间!!",
]
enable_plugin = False # 启用插件
associated_types = ["command", "text"]
action_config_file_name = "mute_action_config.toml"
# 激活类型设置
focus_activation_type = ActionActivationType.LLM_JUDGE # Focus模式使用LLM判定确保谨慎
normal_activation_type = ActionActivationType.KEYWORD # Normal模式使用关键词激活快速响应
# 关键词设置用于Normal模式
activation_keywords = ["禁言", "mute", "ban", "silence"]
keyword_case_sensitive = False
# LLM判定提示词用于Focus模式
llm_judge_prompt = """
判定是否需要使用禁言动作的严格条件:
必须使用禁言的情况:
1. 用户发送明显违规内容(色情、暴力、政治敏感等)
2. 恶意刷屏或垃圾信息轰炸
3. 用户主动明确要求被禁言("禁言我"等)
4. 严重违反群规的行为
5. 恶意攻击他人或群组管理
绝对不要使用的情况:
1. 正常聊天和讨论,即使话题敏感
2. 情绪化表达但无恶意
3. 开玩笑或调侃,除非过分
4. 单纯的意见分歧或争论
5. 轻微的不当言论(应优先提醒)
6. 用户只是提到"禁言"词汇但非要求
注意:禁言是严厉措施,只在明确违规或用户主动要求时使用。
宁可保守也不要误判,保护用户的发言权利。
"""
# Random激活概率备用
random_activation_probability = 0.05 # 设置很低的概率作为兜底
# 模式启用设置 - 禁言功能在所有模式下都可用
mode_enable = ChatMode.ALL
# 并行执行设置 - 禁言动作可以与回复并行执行,不覆盖回复内容
parallel_action = False
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 生成配置文件(如果不存在)
self._generate_config_if_needed()
def _generate_config_if_needed(self):
"""生成配置文件(如果不存在)"""
import os
# 获取动作文件所在目录
current_dir = os.path.dirname(os.path.abspath(__file__))
config_path = os.path.join(current_dir, "mute_action_config.toml")
if not os.path.exists(config_path):
config_content = """\
# 禁言动作配置文件
# 默认禁言时长限制(秒)
min_duration = 60 # 最短禁言时长
max_duration = 2592000 # 最长禁言时长30天
default_duration = 300 # 默认禁言时长5分钟
# 禁言消息模板
templates = [
"好的,禁言 {target} {duration},理由:{reason}",
"收到,对 {target} 执行禁言 {duration},因为{reason}",
"明白了,禁言 {target} {duration},原因是{reason}"
]
# 错误消息模板
error_messages = [
"没有指定禁言对象呢~",
"没有指定禁言时长呢~",
"禁言时长必须是正数哦~",
"禁言时长必须是数字哦~",
"找不到 {target} 这个人呢~",
"查找用户信息时出现问题~"
]
# 是否启用时长美化显示
enable_duration_formatting = true
# 是否记录禁言历史
log_mute_history = true
"""
try:
with open(config_path, "w", encoding="utf-8") as f:
f.write(config_content)
logger.info(f"已生成禁言动作配置文件: {config_path}")
except Exception as e:
logger.error(f"生成配置文件失败: {e}")
def _get_duration_limits(self) -> tuple[int, int, int]:
"""获取时长限制配置"""
min_dur = self.config.get("min_duration", 60)
max_dur = self.config.get("max_duration", 2592000)
default_dur = self.config.get("default_duration", 300)
return min_dur, max_dur, default_dur
def _get_template_message(self, target: str, duration_str: str, reason: str) -> str:
"""获取模板化的禁言消息"""
templates = self.config.get("templates", ["好的,禁言 {target} {duration},理由:{reason}"])
import random
template = random.choice(templates)
return template.format(target=target, duration=duration_str, reason=reason)
async def process(self) -> Tuple[bool, str]:
"""处理群聊禁言动作"""
logger.info(f"{self.log_prefix} 执行禁言动作: {self.reasoning}")
# 获取参数
target = self.action_data.get("target")
duration = self.action_data.get("duration")
reason = self.action_data.get("reason", "违反群规")
# 参数验证
if not target:
error_msg = "禁言目标不能为空"
logger.error(f"{self.log_prefix} {error_msg}")
await self.send_message_by_expressor("没有指定禁言对象呢~")
return False, error_msg
if not duration:
error_msg = "禁言时长不能为空"
logger.error(f"{self.log_prefix} {error_msg}")
await self.send_message_by_expressor("没有指定禁言时长呢~")
return False, error_msg
# 获取时长限制配置
min_duration, max_duration, default_duration = self._get_duration_limits()
# 验证时长格式并转换
try:
duration_int = int(duration)
if duration_int <= 0:
error_msg = "禁言时长必须大于0"
logger.error(f"{self.log_prefix} {error_msg}")
error_templates = self.config.get("error_messages", ["禁言时长必须是正数哦~"])
await self.send_message_by_expressor(
error_templates[2] if len(error_templates) > 2 else "禁言时长必须是正数哦~"
)
return False, error_msg
# 限制禁言时长范围
if duration_int < min_duration:
duration_int = min_duration
logger.info(f"{self.log_prefix} 禁言时长过短,调整为{min_duration}")
elif duration_int > max_duration:
duration_int = max_duration
logger.info(f"{self.log_prefix} 禁言时长过长,调整为{max_duration}")
except (ValueError, TypeError):
error_msg = f"禁言时长格式无效: {duration}"
logger.error(f"{self.log_prefix} {error_msg}")
error_templates = self.config.get("error_messages", ["禁言时长必须是数字哦~"])
await self.send_message_by_expressor(
error_templates[3] if len(error_templates) > 3 else "禁言时长必须是数字哦~"
)
return False, error_msg
# 获取用户ID
try:
platform, user_id = await self.get_user_id_by_person_name(target)
except Exception as e:
error_msg = f"查找用户ID时出错: {e}"
logger.error(f"{self.log_prefix} {error_msg}")
await self.send_message_by_expressor("查找用户信息时出现问题~")
return False, error_msg
if not user_id:
error_msg = f"未找到用户 {target} 的ID"
await self.send_message_by_expressor(f"找不到 {target} 这个人呢~")
logger.error(f"{self.log_prefix} {error_msg}")
return False, error_msg
# 发送表达情绪的消息
enable_formatting = self.config.get("enable_duration_formatting", True)
time_str = self._format_duration(duration_int) if enable_formatting else f"{duration_int}"
# 使用模板化消息
message = self._get_template_message(target, time_str, reason)
await self.send_message_by_expressor(message)
try:
duration_str = str(duration_int)
# 发送群聊禁言命令,按照新格式
await self.send_message(
type="command",
data={"name": "GROUP_BAN", "args": {"qq_id": str(user_id), "duration": duration_str}},
display_message=f"尝试禁言了 {target} {time_str}",
)
await self.store_action_info(
action_build_into_prompt=False,
action_prompt_display=f"你尝试禁言了 {target} {time_str},理由:{reason}",
)
logger.info(f"{self.log_prefix} 成功发送禁言命令,用户 {target}({user_id}),时长 {duration_int}")
return True, f"成功禁言 {target},时长 {time_str}"
except Exception as e:
logger.error(f"{self.log_prefix} 执行禁言动作时出错: {e}")
await self.send_message_by_expressor(f"执行禁言动作时出错: {e}")
return False, f"执行禁言动作时出错: {e}"
def _format_duration(self, seconds: int) -> str:
"""将秒数格式化为可读的时间字符串"""
if seconds < 60:
return f"{seconds}"
elif seconds < 3600:
minutes = seconds // 60
remaining_seconds = seconds % 60
if remaining_seconds > 0:
return f"{minutes}{remaining_seconds}"
else:
return f"{minutes}分钟"
elif seconds < 86400:
hours = seconds // 3600
remaining_minutes = (seconds % 3600) // 60
if remaining_minutes > 0:
return f"{hours}小时{remaining_minutes}分钟"
else:
return f"{hours}小时"
else:
days = seconds // 86400
remaining_hours = (seconds % 86400) // 3600
if remaining_hours > 0:
return f"{days}{remaining_hours}小时"
else:
return f"{days}"

View File

@@ -1,29 +0,0 @@
# 禁言动作配置文件
# 默认禁言时长限制(秒)
min_duration = 60 # 最短禁言时长
max_duration = 2592000 # 最长禁言时长30天
default_duration = 300 # 默认禁言时长5分钟
# 禁言消息模板
templates = [
"好的,禁言 {target} {duration},理由:{reason}",
"收到,对 {target} 执行禁言 {duration},因为{reason}",
"明白了,禁言 {target} {duration},原因是{reason}"
]
# 错误消息模板
error_messages = [
"没有指定禁言对象呢~",
"没有指定禁言时长呢~",
"禁言时长必须是正数哦~",
"禁言时长必须是数字哦~",
"找不到 {target} 这个人呢~",
"查找用户信息时出现问题~"
]
# 是否启用时长美化显示
enable_duration_formatting = true
# 是否记录禁言历史
log_mute_history = true