diff --git a/src/chat/focus_chat/planners/action_manager.py b/src/chat/focus_chat/planners/action_manager.py index 988b3112b..71e256d13 100644 --- a/src/chat/focus_chat/planners/action_manager.py +++ b/src/chat/focus_chat/planners/action_manager.py @@ -366,6 +366,12 @@ class ActionManager: logger.error(f"未找到插件Action组件类: {action_name}") return None + # 获取插件配置 + component_info = component_registry.get_component_info(action_name) + plugin_config = None + if component_info and component_info.plugin_name: + plugin_config = component_registry.get_plugin_config(component_info.plugin_name) + # 创建插件Action实例 plugin_action_instance = component_class( action_data=action_data, @@ -377,6 +383,7 @@ class ActionManager: replyer=replyer, observations=observations, log_prefix=log_prefix, + plugin_config=plugin_config, ) # 创建兼容性包装器 diff --git a/src/chat/focus_chat/replyer/default_replyer.py b/src/chat/focus_chat/replyer/default_replyer.py index a591a26c5..b69573fd8 100644 --- a/src/chat/focus_chat/replyer/default_replyer.py +++ b/src/chat/focus_chat/replyer/default_replyer.py @@ -335,7 +335,7 @@ class DefaultReplyer: chat_talking_prompt = build_readable_messages( message_list_before_now, replace_bot_name=True, - merge_messages=True, + merge_messages=False, timestamp_mode="normal_no_YMD", read_mark=0.0, truncate=True, diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index 7a73889da..67b956b04 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -59,10 +59,13 @@ class ChatBot: # 使用新的组件注册中心查找命令 command_result = component_registry.find_command_by_text(text) if command_result: - command_class, matched_groups = command_result + command_class, matched_groups, intercept_message, plugin_name = command_result + + # 获取插件配置 + plugin_config = component_registry.get_plugin_config(plugin_name) # 创建命令实例 - command_instance = command_class(message) + command_instance = command_class(message, plugin_config) command_instance.set_matched_groups(matched_groups) try: @@ -71,11 +74,12 @@ class ChatBot: # 记录命令执行结果 if success: - logger.info(f"命令执行成功: {command_class.__name__}") + logger.info(f"命令执行成功: {command_class.__name__} (拦截: {intercept_message})") else: logger.warning(f"命令执行失败: {command_class.__name__} - {response}") - return True, response, False # 找到命令,不继续处理 + # 根据命令的拦截设置决定是否继续处理消息 + return True, response, not intercept_message # 找到命令,根据intercept_message决定是否继续 except Exception as e: logger.error(f"执行命令时出错: {command_class.__name__} - {e}") @@ -88,7 +92,8 @@ class ChatBot: except Exception as send_error: logger.error(f"发送错误消息失败: {send_error}") - return True, str(e), False # 命令出错,不继续处理 + # 命令出错时,根据命令的拦截设置决定是否继续处理消息 + return True, str(e), not intercept_message # 没有找到命令,继续处理消息 return False, None, True diff --git a/src/main.py b/src/main.py index f537928e2..4be15f24e 100644 --- a/src/main.py +++ b/src/main.py @@ -20,9 +20,6 @@ from rich.traceback import install from .chat.focus_chat.expressors.exprssion_learner import expression_learner from .api.main import start_api_server -# 导入actions模块,确保装饰器被执行 -import src.chat.actions.default_actions # noqa - # 导入新的插件管理器 from src.plugin_system.core.plugin_manager import plugin_manager @@ -82,8 +79,8 @@ class MainSystem: logger.success("API服务器启动成功") # 加载所有actions,包括默认的和插件的 - self._load_all_actions() - logger.success("动作系统加载成功") + plugin_count, component_count = plugin_manager.load_all_plugins() + logger.success(f"插件系统加载成功: {plugin_count} 个插件,{component_count} 个组件") # 初始化表情管理器 emoji_manager.initialize() @@ -138,18 +135,6 @@ class MainSystem: logger.error(f"启动大脑和外部世界失败: {e}") raise - def _load_all_actions(self): - """加载所有actions和commands,使用新的插件系统""" - try: - # 使用新的插件管理器加载所有插件 - plugin_count, component_count = plugin_manager.load_all_plugins() - - logger.success(f"插件系统加载成功: {plugin_count} 个插件,{component_count} 个组件") - - except Exception as e: - logger.error(f"加载插件失败: {e}") - logger.error(traceback.format_exc()) - async def schedule_tasks(self): """调度定时任务""" while True: diff --git a/src/plugin_system/apis/config_api.py b/src/plugin_system/apis/config_api.py index 0ca617bb4..a3dc64eb2 100644 --- a/src/plugin_system/apis/config_api.py +++ b/src/plugin_system/apis/config_api.py @@ -26,6 +26,34 @@ class ConfigAPI: """ return global_config.get(key, default) + def get_config(self, key: str, default: Any = None) -> Any: + """ + 从插件配置中获取值,支持嵌套键访问 + + Args: + key: 配置键名,支持嵌套访问如 "section.subsection.key" + default: 如果配置不存在时返回的默认值 + + Returns: + Any: 配置值或默认值 + """ + # 获取插件配置 + plugin_config = getattr(self, '_plugin_config', {}) + if not plugin_config: + return default + + # 支持嵌套键访问 + keys = key.split('.') + current = plugin_config + + for k in keys: + if isinstance(current, dict) and k in current: + current = current[k] + else: + return default + + return current + async def get_user_id_by_person_name(self, person_name: str) -> tuple[str, str]: """根据用户名获取用户ID diff --git a/src/plugin_system/apis/message_api.py b/src/plugin_system/apis/message_api.py index e7a9e6c73..3e8a629fa 100644 --- a/src/plugin_system/apis/message_api.py +++ b/src/plugin_system/apis/message_api.py @@ -160,201 +160,7 @@ class MessageAPI: message_type="text", content=text, platform=platform, target_id=user_id, is_group=False ) - async def send_message(self, type: str, data: str, target: Optional[str] = "", display_message: str = "") -> bool: - """发送消息的简化方法 - Args: - type: 消息类型,如"text"、"image"等 - data: 消息内容 - target: 目标消息(可选) - display_message: 显示的消息内容(可选) - - Returns: - bool: 是否发送成功 - """ - try: - # 安全获取服务和日志前缀 - services = getattr(self, "_services", {}) - log_prefix = getattr(self, "log_prefix", "[MessageAPI]") - - expressor: DefaultExpressor = services.get("expressor") - chat_stream: ChatStream = services.get("chat_stream") - - if not expressor or not chat_stream: - logger.error(f"{log_prefix} 无法发送消息:缺少必要的内部服务") - return False - - # 获取锚定消息(如果有) - observations = services.get("observations", []) - - if len(observations) > 0: - chatting_observation: ChattingObservation = next( - (obs for obs in observations if isinstance(obs, ChattingObservation)), None - ) - - if chatting_observation: - anchor_message = chatting_observation.search_message_by_text(target) - else: - anchor_message = None - else: - anchor_message = None - - # 如果没有找到锚点消息,创建一个占位符 - if not anchor_message: - logger.info(f"{log_prefix} 未找到锚点消息,创建占位符") - anchor_message = await create_empty_anchor_message( - chat_stream.platform, chat_stream.group_info, chat_stream - ) - else: - anchor_message.update_chat_stream(chat_stream) - - response_set = [ - (type, data), - ] - - # 调用内部方法发送消息 - success = await expressor.send_response_messages( - anchor_message=anchor_message, - response_set=response_set, - display_message=display_message, - ) - - return success - except Exception as e: - log_prefix = getattr(self, "log_prefix", "[MessageAPI]") - logger.error(f"{log_prefix} 发送消息时出错: {e}") - traceback.print_exc() - return False - - async def send_message_by_expressor(self, text: str, target: Optional[str] = None) -> bool: - """通过expressor发送文本消息的简化方法 - - Args: - text: 要发送的消息文本 - target: 目标消息(可选) - - Returns: - bool: 是否发送成功 - """ - # 安全获取服务和日志前缀 - services = getattr(self, "_services", {}) - log_prefix = getattr(self, "log_prefix", "[MessageAPI]") - - expressor: DefaultExpressor = services.get("expressor") - chat_stream: ChatStream = services.get("chat_stream") - - if not expressor or not chat_stream: - logger.error(f"{log_prefix} 无法发送消息:缺少必要的内部服务") - return False - - # 构造简化的动作数据 - reply_data = {"text": text, "target": target or "", "emojis": []} - - # 获取锚定消息(如果有) - observations = services.get("observations", []) - - # 查找 ChattingObservation 实例 - chatting_observation = None - for obs in observations: - if isinstance(obs, ChattingObservation): - chatting_observation = obs - break - - if not chatting_observation: - logger.warning(f"{log_prefix} 未找到 ChattingObservation 实例,创建占位符") - anchor_message = await create_empty_anchor_message( - chat_stream.platform, chat_stream.group_info, chat_stream - ) - else: - anchor_message = chatting_observation.search_message_by_text(reply_data["target"]) - if not anchor_message: - logger.info(f"{log_prefix} 未找到锚点消息,创建占位符") - anchor_message = await create_empty_anchor_message( - chat_stream.platform, chat_stream.group_info, chat_stream - ) - else: - anchor_message.update_chat_stream(chat_stream) - - # 调用内部方法发送消息 - cycle_timers = getattr(self, "cycle_timers", {}) - reasoning = getattr(self, "reasoning", "插件生成") - thinking_id = getattr(self, "thinking_id", "plugin_thinking") - - success, _ = await expressor.deal_reply( - cycle_timers=cycle_timers, - action_data=reply_data, - anchor_message=anchor_message, - reasoning=reasoning, - thinking_id=thinking_id, - ) - - return success - - async def send_message_by_replyer( - self, target: Optional[str] = None, extra_info_block: Optional[str] = None - ) -> bool: - """通过replyer发送消息的简化方法 - - Args: - target: 目标消息(可选) - extra_info_block: 额外信息块(可选) - - Returns: - bool: 是否发送成功 - """ - # 安全获取服务和日志前缀 - services = getattr(self, "_services", {}) - log_prefix = getattr(self, "log_prefix", "[MessageAPI]") - - replyer: DefaultReplyer = services.get("replyer") - chat_stream: ChatStream = services.get("chat_stream") - - if not replyer or not chat_stream: - logger.error(f"{log_prefix} 无法发送消息:缺少必要的内部服务") - return False - - # 构造简化的动作数据 - reply_data = {"target": target or "", "extra_info_block": extra_info_block} - - # 获取锚定消息(如果有) - observations = services.get("observations", []) - - # 查找 ChattingObservation 实例 - chatting_observation = None - for obs in observations: - if isinstance(obs, ChattingObservation): - chatting_observation = obs - break - - if not chatting_observation: - logger.warning(f"{log_prefix} 未找到 ChattingObservation 实例,创建占位符") - anchor_message = await create_empty_anchor_message( - chat_stream.platform, chat_stream.group_info, chat_stream - ) - else: - anchor_message = chatting_observation.search_message_by_text(reply_data["target"]) - if not anchor_message: - logger.info(f"{log_prefix} 未找到锚点消息,创建占位符") - anchor_message = await create_empty_anchor_message( - chat_stream.platform, chat_stream.group_info, chat_stream - ) - else: - anchor_message.update_chat_stream(chat_stream) - - # 调用内部方法发送消息 - cycle_timers = getattr(self, "cycle_timers", {}) - reasoning = getattr(self, "reasoning", "插件生成") - thinking_id = getattr(self, "thinking_id", "plugin_thinking") - - success, _ = await replyer.deal_reply( - cycle_timers=cycle_timers, - action_data=reply_data, - anchor_message=anchor_message, - reasoning=reasoning, - thinking_id=thinking_id, - ) - - return success def get_chat_type(self) -> str: """获取当前聊天类型 diff --git a/src/plugin_system/apis/plugin_api.py b/src/plugin_system/apis/plugin_api.py index 0931a0055..44d990c78 100644 --- a/src/plugin_system/apis/plugin_api.py +++ b/src/plugin_system/apis/plugin_api.py @@ -33,7 +33,7 @@ class PluginAPI(MessageAPI, LLMAPI, DatabaseAPI, ConfigAPI, UtilsAPI, StreamAPI, """ def __init__( - self, chat_stream=None, expressor=None, replyer=None, observations=None, log_prefix: str = "[PluginAPI]" + self, chat_stream=None, expressor=None, replyer=None, observations=None, log_prefix: str = "[PluginAPI]", plugin_config: dict = None ): """ 初始化插件API @@ -44,6 +44,7 @@ class PluginAPI(MessageAPI, LLMAPI, DatabaseAPI, ConfigAPI, UtilsAPI, StreamAPI, replyer: 回复器对象 observations: 观察列表 log_prefix: 日志前缀 + plugin_config: 插件配置字典 """ # 存储依赖对象 self._services = { @@ -61,6 +62,9 @@ class PluginAPI(MessageAPI, LLMAPI, DatabaseAPI, ConfigAPI, UtilsAPI, StreamAPI, # 调用所有父类的初始化 super().__init__() + # 存储插件配置 + self._plugin_config = plugin_config or {} + logger.debug(f"{self.log_prefix} PluginAPI 初始化完成") def set_chat_stream(self, chat_stream): @@ -105,7 +109,7 @@ class PluginAPI(MessageAPI, LLMAPI, DatabaseAPI, ConfigAPI, UtilsAPI, StreamAPI, # 便捷的工厂函数 def create_plugin_api( - chat_stream=None, expressor=None, replyer=None, observations=None, log_prefix: str = "[Plugin]" + chat_stream=None, expressor=None, replyer=None, observations=None, log_prefix: str = "[Plugin]", plugin_config: dict = None ) -> PluginAPI: """ 创建插件API实例的便捷函数 @@ -116,12 +120,13 @@ def create_plugin_api( replyer: 回复器对象 observations: 观察列表 log_prefix: 日志前缀 + plugin_config: 插件配置字典 Returns: PluginAPI: 配置好的插件API实例 """ return PluginAPI( - chat_stream=chat_stream, expressor=expressor, replyer=replyer, observations=observations, log_prefix=log_prefix + chat_stream=chat_stream, expressor=expressor, replyer=replyer, observations=observations, log_prefix=log_prefix, plugin_config=plugin_config ) diff --git a/src/plugin_system/base/base_action.py b/src/plugin_system/base/base_action.py index c8ce005eb..b5d841728 100644 --- a/src/plugin_system/base/base_action.py +++ b/src/plugin_system/base/base_action.py @@ -35,6 +35,7 @@ class BaseAction(ABC): chat_stream=None, log_prefix: str = "", shutting_down: bool = False, + plugin_config: dict = None, **kwargs, ): """初始化Action组件 @@ -50,6 +51,7 @@ class BaseAction(ABC): chat_stream: 聊天流对象 log_prefix: 日志前缀 shutting_down: 是否正在关闭 + plugin_config: 插件配置字典 **kwargs: 其他参数 """ self.action_data = action_data @@ -84,6 +86,7 @@ class BaseAction(ABC): replyer=replyer or kwargs.get("replyer"), observations=observations or kwargs.get("observations", []), log_prefix=log_prefix, + plugin_config=plugin_config or kwargs.get("plugin_config"), ) # 设置API的action上下文 @@ -118,7 +121,221 @@ class BaseAction(ABC): Returns: bool: 是否发送成功 """ - return await self.api.send_message("text", content) + chat_stream = self.api.get_service('chat_stream') + if not chat_stream: + logger.error(f"{self.log_prefix} 没有可用的聊天流发送回复") + return False + + if chat_stream.group_info: + # 群聊 + return await self.api.send_text_to_group( + text=content, + group_id=str(chat_stream.group_info.group_id), + platform=chat_stream.platform + ) + else: + # 私聊 + return await self.api.send_text_to_user( + text=content, + user_id=str(chat_stream.user_info.user_id), + platform=chat_stream.platform + ) + + async def send_command(self, command_name: str, args: dict = None, display_message: str = None) -> bool: + """发送命令消息 + + 使用和send_reply相同的方式通过MessageAPI发送命令 + + Args: + command_name: 命令名称 + args: 命令参数 + display_message: 显示消息 + + Returns: + bool: 是否发送成功 + """ + try: + # 构造命令数据 + command_data = { + "name": command_name, + "args": args or {} + } + + # 使用send_message_to_target方法发送命令 + chat_stream = self.api.get_service('chat_stream') + if not chat_stream: + logger.error(f"{self.log_prefix} 没有可用的聊天流发送命令") + return False + + command_content = str(command_data) + + if chat_stream.group_info: + # 群聊 + success = await self.api.send_message_to_target( + message_type="command", + content=command_content, + platform=chat_stream.platform, + target_id=str(chat_stream.group_info.group_id), + is_group=True, + display_message=display_message or f"执行命令: {command_name}" + ) + else: + # 私聊 + success = await self.api.send_message_to_target( + message_type="command", + content=command_content, + platform=chat_stream.platform, + target_id=str(chat_stream.user_info.user_id), + is_group=False, + display_message=display_message or f"执行命令: {command_name}" + ) + + if success: + logger.info(f"{self.log_prefix} 成功发送命令: {command_name}") + else: + logger.error(f"{self.log_prefix} 发送命令失败: {command_name}") + + return success + + except Exception as e: + logger.error(f"{self.log_prefix} 发送命令时出错: {e}") + return False + + async def send_message_by_expressor(self, text: str, target: str = "") -> bool: + """通过expressor发送文本消息的Action专用方法 + + Args: + text: 要发送的消息文本 + target: 目标消息(可选) + + Returns: + bool: 是否发送成功 + """ + try: + from src.chat.heart_flow.observation.chatting_observation import ChattingObservation + from src.chat.message_receive.message import create_empty_anchor_message + + # 获取服务 + expressor = self.api.get_service("expressor") + chat_stream = self.api.get_service("chat_stream") + observations = self.api.get_service("observations") or [] + + if not expressor or not chat_stream: + logger.error(f"{self.log_prefix} 无法通过expressor发送消息:缺少必要的服务") + return False + + # 构造动作数据 + reply_data = {"text": text, "target": target, "emojis": []} + + # 查找 ChattingObservation 实例 + chatting_observation = None + for obs in observations: + if isinstance(obs, ChattingObservation): + chatting_observation = obs + break + + if not chatting_observation: + logger.warning(f"{self.log_prefix} 未找到 ChattingObservation 实例,创建占位符") + anchor_message = await create_empty_anchor_message( + chat_stream.platform, chat_stream.group_info, chat_stream + ) + else: + anchor_message = chatting_observation.search_message_by_text(target) + if not anchor_message: + logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符") + anchor_message = await create_empty_anchor_message( + chat_stream.platform, chat_stream.group_info, chat_stream + ) + else: + anchor_message.update_chat_stream(chat_stream) + + # 使用Action上下文信息发送消息 + success, _ = await expressor.deal_reply( + cycle_timers=self.cycle_timers, + action_data=reply_data, + anchor_message=anchor_message, + reasoning=self.reasoning, + thinking_id=self.thinking_id, + ) + + if success: + logger.info(f"{self.log_prefix} 成功通过expressor发送消息") + else: + logger.error(f"{self.log_prefix} 通过expressor发送消息失败") + + return success + + except Exception as e: + logger.error(f"{self.log_prefix} 通过expressor发送消息时出错: {e}") + return False + + async def send_message_by_replyer(self, target: str = "", extra_info_block: str = None) -> bool: + """通过replyer发送消息的Action专用方法 + + Args: + target: 目标消息(可选) + extra_info_block: 额外信息块(可选) + + Returns: + bool: 是否发送成功 + """ + try: + from src.chat.heart_flow.observation.chatting_observation import ChattingObservation + from src.chat.message_receive.message import create_empty_anchor_message + + # 获取服务 + replyer = self.api.get_service("replyer") + chat_stream = self.api.get_service("chat_stream") + observations = self.api.get_service("observations") or [] + + if not replyer or not chat_stream: + logger.error(f"{self.log_prefix} 无法通过replyer发送消息:缺少必要的服务") + return False + + # 构造动作数据 + reply_data = {"target": target, "extra_info_block": extra_info_block} + + # 查找 ChattingObservation 实例 + chatting_observation = None + for obs in observations: + if isinstance(obs, ChattingObservation): + chatting_observation = obs + break + + if not chatting_observation: + logger.warning(f"{self.log_prefix} 未找到 ChattingObservation 实例,创建占位符") + anchor_message = await create_empty_anchor_message( + chat_stream.platform, chat_stream.group_info, chat_stream + ) + else: + anchor_message = chatting_observation.search_message_by_text(target) + if not anchor_message: + logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符") + anchor_message = await create_empty_anchor_message( + chat_stream.platform, chat_stream.group_info, chat_stream + ) + else: + anchor_message.update_chat_stream(chat_stream) + + # 使用Action上下文信息发送消息 + success, _ = await replyer.deal_reply( + cycle_timers=self.cycle_timers, + action_data=reply_data, + anchor_message=anchor_message, + reasoning=self.reasoning, + thinking_id=self.thinking_id, + ) + + if success: + logger.info(f"{self.log_prefix} 成功通过replyer发送消息") + else: + logger.error(f"{self.log_prefix} 通过replyer发送消息失败") + + return success + + except Exception as e: + logger.error(f"{self.log_prefix} 通过replyer发送消息时出错: {e}") + return False @classmethod def get_action_info(cls, name: str = None, description: str = None) -> "ActionInfo": @@ -132,12 +349,14 @@ class BaseAction(ABC): ActionInfo: 生成的Action信息对象 """ - # 自动生成名称和描述 + # 优先使用类属性,然后自动生成 if name is None: - name = cls.__name__.lower().replace("action", "") + name = getattr(cls, "action_name", cls.__name__.lower().replace("action", "")) if description is None: - description = cls.__doc__ or f"{cls.__name__} Action组件" - description = description.strip().split("\n")[0] # 取第一行作为描述 + description = getattr(cls, "action_description", None) + if description is None: + description = cls.__doc__ or f"{cls.__name__} Action组件" + description = description.strip().split("\n")[0] # 取第一行作为描述 # 安全获取激活类型值 def get_enum_value(attr_name, default): diff --git a/src/plugin_system/base/base_command.py b/src/plugin_system/base/base_command.py index 0a58dbd02..251b8ce8b 100644 --- a/src/plugin_system/base/base_command.py +++ b/src/plugin_system/base/base_command.py @@ -17,24 +17,27 @@ class BaseCommand(ABC): - command_pattern: 命令匹配的正则表达式 - command_help: 命令帮助信息 - command_examples: 命令使用示例列表 + - intercept_message: 是否拦截消息处理(默认True拦截,False继续传递) """ # 默认命令设置(子类可以覆盖) command_pattern: str = "" command_help: str = "" command_examples: List[str] = [] + intercept_message: bool = True # 默认拦截消息,不继续处理 - def __init__(self, message: MessageRecv): + def __init__(self, message: MessageRecv, plugin_config: dict = None): """初始化Command组件 Args: message: 接收到的消息对象 + plugin_config: 插件配置字典 """ self.message = message self.matched_groups: Dict[str, str] = {} # 存储正则表达式匹配的命名组 # 创建API实例 - self.api = PluginAPI(chat_stream=message.chat_stream, log_prefix="[Command]") + self.api = PluginAPI(chat_stream=message.chat_stream, log_prefix="[Command]", plugin_config=plugin_config) self.log_prefix = "[Command]" @@ -77,6 +80,62 @@ class BaseCommand(ABC): text=content, user_id=str(chat_stream.user_info.user_id), platform=chat_stream.platform ) + async def send_command(self, command_name: str, args: dict = None, display_message: str = None) -> bool: + """发送命令消息 + + 使用和send_reply相同的方式通过MessageAPI发送命令 + + Args: + command_name: 命令名称 + args: 命令参数 + display_message: 显示消息 + + Returns: + bool: 是否发送成功 + """ + try: + # 构造命令数据 + command_data = { + "name": command_name, + "args": args or {} + } + + # 使用send_message_to_target方法发送命令 + chat_stream = self.message.chat_stream + command_content = str(command_data) + + if chat_stream.group_info: + # 群聊 + success = await self.api.send_message_to_target( + message_type="command", + content=command_content, + platform=chat_stream.platform, + target_id=str(chat_stream.group_info.group_id), + is_group=True, + display_message=display_message or f"执行命令: {command_name}" + ) + else: + # 私聊 + success = await self.api.send_message_to_target( + message_type="command", + content=command_content, + platform=chat_stream.platform, + target_id=str(chat_stream.user_info.user_id), + is_group=False, + display_message=display_message or f"执行命令: {command_name}" + ) + + if success: + logger.info(f"{self.log_prefix} 成功发送命令: {command_name}") + else: + logger.error(f"{self.log_prefix} 发送命令失败: {command_name}") + + return success + + except Exception as e: + logger.error(f"{self.log_prefix} 发送命令时出错: {e}") + return False + @classmethod def get_command_info(cls, name: str = None, description: str = None) -> "CommandInfo": """从类属性生成CommandInfo @@ -89,12 +148,14 @@ class BaseCommand(ABC): CommandInfo: 生成的Command信息对象 """ - # 自动生成名称和描述 + # 优先使用类属性,然后自动生成 if name is None: - name = cls.__name__.lower().replace("command", "") + name = getattr(cls, "command_name", cls.__name__.lower().replace("command", "")) if description is None: - description = cls.__doc__ or f"{cls.__name__} Command组件" - description = description.strip().split("\n")[0] # 取第一行作为描述 + description = getattr(cls, "command_description", None) + if description is None: + description = cls.__doc__ or f"{cls.__name__} Command组件" + description = description.strip().split("\n")[0] # 取第一行作为描述 return CommandInfo( name=name, @@ -103,4 +164,5 @@ class BaseCommand(ABC): command_pattern=cls.command_pattern, command_help=cls.command_help, command_examples=cls.command_examples.copy() if cls.command_examples else [], + intercept_message=cls.intercept_message, ) diff --git a/src/plugin_system/base/component_types.py b/src/plugin_system/base/component_types.py index ee9173928..7ee48c7f6 100644 --- a/src/plugin_system/base/component_types.py +++ b/src/plugin_system/base/component_types.py @@ -86,6 +86,7 @@ class CommandInfo(ComponentInfo): command_pattern: str = "" # 命令匹配模式(正则表达式) command_help: str = "" # 命令帮助信息 command_examples: List[str] = None # 命令使用示例 + intercept_message: bool = True # 是否拦截消息处理(默认拦截) def __post_init__(self): super().__post_init__() diff --git a/src/plugin_system/core/component_registry.py b/src/plugin_system/core/component_registry.py index b0d2446ae..84eb54f13 100644 --- a/src/plugin_system/core/component_registry.py +++ b/src/plugin_system/core/component_registry.py @@ -141,14 +141,14 @@ class ComponentRegistry: info = self.get_component_info(command_name) return info if isinstance(info, CommandInfo) else None - def find_command_by_text(self, text: str) -> Optional[tuple[Type, dict]]: + def find_command_by_text(self, text: str) -> Optional[tuple[Type, dict, bool, str]]: """根据文本查找匹配的命令 Args: text: 输入文本 Returns: - Optional[tuple[Type, dict]]: (命令类, 匹配的命名组) 或 None + Optional[tuple[Type, dict, bool, str]]: (命令类, 匹配的命名组, 是否拦截消息, 插件名) 或 None """ for pattern, command_class in self._command_patterns.items(): match = pattern.match(text) @@ -164,7 +164,7 @@ class ComponentRegistry: if command_name: command_info = self.get_command_info(command_name) if command_info and command_info.enabled: - return command_class, match.groupdict() + return command_class, match.groupdict(), command_info.intercept_message, command_info.plugin_name return None # === 插件管理方法 === @@ -205,6 +205,20 @@ class ComponentRegistry: plugin_info = self.get_plugin_info(plugin_name) return plugin_info.components if plugin_info else [] + def get_plugin_config(self, plugin_name: str) -> Optional[dict]: + """获取插件配置 + + Args: + plugin_name: 插件名称 + + Returns: + Optional[dict]: 插件配置字典或None + """ + # 从插件管理器获取插件实例的配置 + from src.plugin_system.core.plugin_manager import plugin_manager + plugin_instance = plugin_manager.get_plugin_instance(plugin_name) + return plugin_instance.config if plugin_instance else None + # === 状态管理方法 === def enable_component(self, component_name: str) -> bool: diff --git a/src/plugin_system/core/plugin_manager.py b/src/plugin_system/core/plugin_manager.py index ea6c4977c..c2acdf527 100644 --- a/src/plugin_system/core/plugin_manager.py +++ b/src/plugin_system/core/plugin_manager.py @@ -2,6 +2,7 @@ from typing import Dict, List, Optional, Any, TYPE_CHECKING import os import importlib import importlib.util +import inspect from pathlib import Path if TYPE_CHECKING: @@ -72,9 +73,10 @@ class PluginManager: if plugin_dir: self.plugin_paths[plugin_name] = plugin_dir - if instantiate_and_register_plugin(plugin_class, plugin_dir): + plugin_instance = plugin_class(plugin_dir=plugin_dir) + if plugin_instance.register_plugin(): total_registered += 1 - self.loaded_plugins[plugin_name] = plugin_class + self.loaded_plugins[plugin_name] = plugin_instance # 📊 显示插件详细信息 plugin_info = component_registry.get_plugin_info(plugin_name) @@ -288,6 +290,17 @@ class PluginManager: return True return False + def get_plugin_instance(self, plugin_name: str) -> Optional["BasePlugin"]: + """获取插件实例 + + Args: + plugin_name: 插件名称 + + Returns: + Optional[BasePlugin]: 插件实例或None + """ + return self.loaded_plugins.get(plugin_name) + def get_plugin_stats(self) -> Dict[str, Any]: """获取插件统计信息""" all_plugins = component_registry.get_all_plugins() diff --git a/src/plugins/README.md b/src/plugins/README.md deleted file mode 100644 index e1880b087..000000000 --- a/src/plugins/README.md +++ /dev/null @@ -1,206 +0,0 @@ -# MaiBot 插件系统架构 - -## 概述 - -MaiBot 插件系统采用组件化设计,支持插件包含多种组件类型: -- **Action组件**:处理聊天中的动作逻辑 -- **Command组件**:处理命令请求 -- **未来扩展**:Scheduler(定时任务)、Listener(事件监听)等 - -## 目录结构 - -``` -src/plugins/ -├── core/ # 插件核心管理 -│ ├── plugin_manager.py # 插件管理器 -│ ├── plugin_loader.py # 插件加载器(预留) -│ └── component_registry.py # 组件注册中心 -├── apis/ # API模块 -│ ├── plugin_api.py # 统一API聚合 -│ ├── message_api.py # 消息API -│ ├── llm_api.py # LLM API -│ ├── database_api.py # 数据库API -│ ├── config_api.py # 配置API -│ ├── utils_api.py # 工具API -│ ├── stream_api.py # 流API -│ └── hearflow_api.py # 心流API -├── base/ # 基础类 -│ ├── base_plugin.py # 插件基类 -│ ├── base_action.py # Action组件基类 -│ ├── base_command.py # Command组件基类 -│ └── component_types.py # 组件类型定义 -├── built_in/ # 内置组件 -│ ├── actions/ # 内置Action -│ └── commands/ # 内置Command -└── examples/ # 示例插件 - └── simple_plugin/ # 简单插件示例 - ├── plugin.py - └── config.toml -``` - -## 核心特性 - -### 1. 组件化设计 -- 插件可以包含多种组件类型 -- 每种组件有明确的职责和接口 -- 支持组件的独立启用/禁用 - -### 2. 统一的API访问 -- 所有插件组件通过 `PluginAPI` 访问系统功能 -- 包含消息发送、数据库操作、LLM调用等 -- 提供统一的错误处理和日志记录 - -### 3. 灵活的配置系统 -- 支持 TOML 格式的配置文件 -- 插件可以读取自定义配置 -- 支持全局配置和插件特定配置 - -### 4. 统一的注册管理 -- 组件注册中心管理所有组件 -- 支持组件的动态启用/禁用 -- 提供丰富的查询和统计接口 - -## 插件开发指南 - -### 创建基本插件 - -```python -from src.plugins.base.base_plugin import BasePlugin, register_plugin -from src.plugins.base.base_action import BaseAction -from src.plugins.base.component_types import ActionInfo, ActionActivationType - -class MyAction(BaseAction): - async def execute(self) -> tuple[bool, str]: - # 使用API发送消息 - response = "Hello from my plugin!" - return True, response - -@register_plugin -class MyPlugin(BasePlugin): - plugin_name = "my_plugin" - plugin_description = "我的第一个插件" - - def get_plugin_components(self): - action_info = ActionInfo( - name="my_action", - description="我的动作", - activation_keywords=["hello"] - ) - return [(action_info, MyAction)] -``` - -### 创建命令组件 - -```python -from src.plugins.base.base_command import BaseCommand -from src.plugins.base.component_types import CommandInfo - -class MyCommand(BaseCommand): - async def execute(self) -> tuple[bool, str]: - # 获取命令参数 - param = self.matched_groups.get("param", "") - - # 发送回复 - await self.send_reply(f"收到参数: {param}") - return True, f"处理完成: {param}" - -# 在插件中注册 -def get_plugin_components(self): - command_info = CommandInfo( - name="my_command", - description="我的命令", - command_pattern=r"^/mycmd\s+(?P\w+)$", - command_help="用法:/mycmd <参数>" - ) - return [(command_info, MyCommand)] -``` - -### 使用配置文件 - -```toml -# config.toml -[plugin] -name = "my_plugin" -enabled = true - -[my_settings] -max_items = 10 -default_message = "Hello World" -``` - -```python -class MyPlugin(BasePlugin): - config_file_name = "config.toml" - - def get_plugin_components(self): - # 读取配置 - max_items = self.get_config("my_settings.max_items", 5) - message = self.get_config("my_settings.default_message", "Hi") - - # 使用配置创建组件... -``` - -## API使用示例 - -### 消息操作 -```python -# 发送文本消息 -await self.api.send_text_to_group(chat_stream, "Hello!") - -# 发送图片 -await self.api.send_image_to_group(chat_stream, image_path) -``` - -### 数据库操作 -```python -# 查询数据 -data = await self.api.db_get("table_name", "key") - -# 保存数据 -await self.api.db_set("table_name", "key", "value") -``` - -### LLM调用 -```python -# 生成文本 -response = await self.api.llm_text_request("你好,请介绍一下自己") - -# 生成图片 -image_url = await self.api.llm_image_request("一只可爱的猫咪") -``` - -## 内置组件迁移 - -现有的内置Action和Command将迁移到新架构: - -### Action迁移 -- `reply_action.py` → `src/plugins/built_in/actions/reply_action.py` -- `emoji_action.py` → `src/plugins/built_in/actions/emoji_action.py` -- `no_reply_action.py` → `src/plugins/built_in/actions/no_reply_action.py` - -### Command迁移 -- 现有命令系统将封装为内置Command组件 -- 保持现有的命令模式和功能 - -## 兼容性 - -新插件系统保持与现有系统的兼容性: -- 现有的Action和Command继续工作 -- 提供兼容层和适配器 -- 逐步迁移到新架构 - -## 扩展性 - -系统设计支持未来扩展: -- 新的组件类型(Scheduler、Listener等) -- 插件间依赖和通信 -- 插件热重载 -- 插件市场和分发 - -## 最佳实践 - -1. **单一职责**:每个组件专注于特定功能 -2. **配置驱动**:通过配置文件控制行为 -3. **错误处理**:妥善处理异常情况 -4. **日志记录**:记录关键操作和错误 -5. **测试覆盖**:为插件编写单元测试 \ No newline at end of file diff --git a/src/plugins/built_in/core_actions/plugin.py b/src/plugins/built_in/core_actions/plugin.py index 9e7110e9b..c70b4b0b1 100644 --- a/src/plugins/built_in/core_actions/plugin.py +++ b/src/plugins/built_in/core_actions/plugin.py @@ -10,6 +10,7 @@ from typing import List, Tuple, Type, Optional # 导入新插件系统 from src.plugin_system import BasePlugin, register_plugin, BaseAction, ComponentInfo, ActionActivationType, ChatMode +from src.plugin_system.base.base_command import BaseCommand # 导入依赖的系统组件 from src.common.logger_manager import get_logger @@ -341,19 +342,97 @@ class CoreActionsPlugin(BasePlugin): return [ # 回复动作 - (ReplyAction.get_action_info(name="reply", description="参与聊天回复,处理文本和表情的发送"), ReplyAction), + (ReplyAction.get_action_info( + name="reply", + description="参与聊天回复,处理文本和表情的发送" + ), ReplyAction), + # 不回复动作 - ( - NoReplyAction.get_action_info(name="no_reply", description="暂时不回复消息,等待新消息或超时"), - NoReplyAction, - ), + (NoReplyAction.get_action_info( + name="no_reply", + description="暂时不回复消息,等待新消息或超时" + ), NoReplyAction), + # 表情动作 - (EmojiAction.get_action_info(name="emoji", description="发送表情包辅助表达情绪"), EmojiAction), + (EmojiAction.get_action_info( + name="emoji", + description="发送表情包辅助表达情绪" + ), EmojiAction), + # 退出专注聊天动作 - ( - ExitFocusChatAction.get_action_info( - name="exit_focus_chat", description="退出专注聊天,从专注模式切换到普通模式" - ), - ExitFocusChatAction, - ), + (ExitFocusChatAction.get_action_info( + name="exit_focus_chat", + description="退出专注聊天,从专注模式切换到普通模式" + ), ExitFocusChatAction), + + # 示例Command - Ping命令 + (PingCommand.get_command_info( + name="ping", + description="测试机器人响应,拦截后续处理" + ), PingCommand), + + # 示例Command - Log命令 + (LogCommand.get_command_info( + name="log", + description="记录消息到日志,不拦截后续处理" + ), LogCommand) ] + + +# ===== 示例Command组件 ===== + +class PingCommand(BaseCommand): + """Ping命令 - 测试响应,拦截消息处理""" + + command_pattern = r"^/ping(\s+(?P.+))?$" + command_help = "测试机器人响应 - 拦截后续处理" + command_examples = ["/ping", "/ping 测试消息"] + intercept_message = True # 拦截消息,不继续处理 + + async def execute(self) -> Tuple[bool, Optional[str]]: + """执行ping命令""" + try: + message = self.matched_groups.get("message", "") + reply_text = f"🏓 Pong! {message}" if message else "🏓 Pong!" + + await self.send_reply(reply_text) + return True, f"发送ping响应: {reply_text}" + + except Exception as e: + logger.error(f"Ping命令执行失败: {e}") + return False, f"执行失败: {str(e)}" + + +class LogCommand(BaseCommand): + """日志命令 - 记录消息但不拦截后续处理""" + + command_pattern = r"^/log(\s+(?Pdebug|info|warn|error))?$" + command_help = "记录当前消息到日志 - 不拦截后续处理" + command_examples = ["/log", "/log info", "/log debug"] + intercept_message = False # 不拦截消息,继续后续处理 + + async def execute(self) -> Tuple[bool, Optional[str]]: + """执行日志命令""" + try: + level = self.matched_groups.get("level", "info") + user_nickname = self.message.message_info.user_info.user_nickname + content = self.message.processed_plain_text + + log_message = f"[{level.upper()}] 用户 {user_nickname}: {content}" + + # 根据级别记录日志 + if level == "debug": + logger.debug(log_message) + elif level == "warn": + logger.warning(log_message) + elif level == "error": + logger.error(log_message) + else: + logger.info(log_message) + + # 不发送回复,让消息继续处理 + return True, f"已记录到{level}级别日志" + + except Exception as e: + logger.error(f"Log命令执行失败: {e}") + return False, f"执行失败: {str(e)}" diff --git a/src/plugins/doubao_pic/__init__.py b/src/plugins/built_in/doubao_pic/__init__.py similarity index 100% rename from src/plugins/doubao_pic/__init__.py rename to src/plugins/built_in/doubao_pic/__init__.py diff --git a/src/plugins/doubao_pic/actions/__init__.py b/src/plugins/built_in/doubao_pic/actions/__init__.py similarity index 100% rename from src/plugins/doubao_pic/actions/__init__.py rename to src/plugins/built_in/doubao_pic/actions/__init__.py diff --git a/src/plugins/doubao_pic/actions/generate_pic_config.py b/src/plugins/built_in/doubao_pic/actions/generate_pic_config.py similarity index 100% rename from src/plugins/doubao_pic/actions/generate_pic_config.py rename to src/plugins/built_in/doubao_pic/actions/generate_pic_config.py diff --git a/src/plugins/doubao_pic/actions/pic_action.py b/src/plugins/built_in/doubao_pic/actions/pic_action.py similarity index 100% rename from src/plugins/doubao_pic/actions/pic_action.py rename to src/plugins/built_in/doubao_pic/actions/pic_action.py diff --git a/src/plugins/doubao_pic/actions/pic_action_config.toml b/src/plugins/built_in/doubao_pic/actions/pic_action_config.toml similarity index 100% rename from src/plugins/doubao_pic/actions/pic_action_config.toml rename to src/plugins/built_in/doubao_pic/actions/pic_action_config.toml diff --git a/src/plugins/doubao_pic/actions/pic_action_config.toml.backup b/src/plugins/built_in/doubao_pic/actions/pic_action_config.toml.backup similarity index 100% rename from src/plugins/doubao_pic/actions/pic_action_config.toml.backup rename to src/plugins/built_in/doubao_pic/actions/pic_action_config.toml.backup diff --git a/src/plugins/built_in/mute_plugin/config.toml b/src/plugins/built_in/mute_plugin/config.toml new file mode 100644 index 000000000..4e8ccf440 --- /dev/null +++ b/src/plugins/built_in/mute_plugin/config.toml @@ -0,0 +1,74 @@ +# 禁言插件配置文件 + +[plugin] +name = "mute_plugin" +version = "2.0.0" +enabled = true +description = "群聊禁言管理插件,提供智能禁言功能" + +# 组件启用控制 +[components] +enable_smart_mute = true # 启用智能禁言Action +enable_mute_command = true # 启用禁言命令Command + +# 禁言配置 +[mute] +# 时长限制(秒) +min_duration = 60 # 最短禁言时长 +max_duration = 2592000 # 最长禁言时长(30天) +default_duration = 300 # 默认禁言时长(5分钟) + +# 是否启用时长美化显示 +enable_duration_formatting = true + +# 是否记录禁言历史 +log_mute_history = true + +# 禁言消息模板 +templates = [ + "好的,禁言 {target} {duration},理由:{reason}", + "收到,对 {target} 执行禁言 {duration},因为{reason}", + "明白了,禁言 {target} {duration},原因是{reason}", + "✅ 已禁言 {target} {duration},理由:{reason}", + "🔇 对 {target} 执行禁言 {duration},因为{reason}", + "⛔ 禁言 {target} {duration},原因:{reason}" +] + +# 错误消息模板 +error_messages = [ + "没有指定禁言对象呢~", + "没有指定禁言时长呢~", + "禁言时长必须是正数哦~", + "禁言时长必须是数字哦~", + "找不到 {target} 这个人呢~", + "查找用户信息时出现问题~" +] + +# 智能禁言Action配置 +[smart_mute] +# LLM判定严格模式 +strict_mode = true + +# 关键词激活设置 +keyword_sensitivity = "normal" # low, normal, high + +# 并行执行设置 +allow_parallel = false + +# 禁言命令配置 +[mute_command] +# 是否需要管理员权限 +require_admin = true + +# 最大批量禁言数量 +max_batch_size = 5 + +# 命令冷却时间(秒) +cooldown_seconds = 3 + +# 日志配置 +[logging] +level = "INFO" +prefix = "[MutePlugin]" +include_user_info = true +include_duration_info = true \ No newline at end of file diff --git a/src/plugins/built_in/mute_plugin/plugin.py b/src/plugins/built_in/mute_plugin/plugin.py new file mode 100644 index 000000000..8ced269be --- /dev/null +++ b/src/plugins/built_in/mute_plugin/plugin.py @@ -0,0 +1,385 @@ +""" +禁言插件 + +提供智能禁言功能的群聊管理插件。 + +功能特性: +- 智能LLM判定:根据聊天内容智能判断是否需要禁言 +- 灵活的时长管理:支持自定义禁言时长限制 +- 模板化消息:支持自定义禁言提示消息 +- 参数验证:完整的输入参数验证和错误处理 +- 配置文件支持:所有设置可通过配置文件调整 + +包含组件: +- 智能禁言Action - 基于LLM判断是否需要禁言 +- 禁言命令Command - 手动执行禁言操作 +""" + +from typing import List, Tuple, Type, Optional, Dict, Any +import random + +# 导入新插件系统 +from src.plugin_system.base.base_plugin import BasePlugin +from src.plugin_system.base.base_plugin import register_plugin +from src.plugin_system.base.base_action import BaseAction +from src.plugin_system.base.base_command import BaseCommand +from src.plugin_system.base.component_types import ComponentInfo, ActionActivationType, ChatMode +from src.common.logger_manager import get_logger + +logger = get_logger("mute_plugin") + + +# ===== Action组件 ===== + +class MuteAction(BaseAction): + """智能禁言Action - 基于LLM智能判断是否需要禁言""" + + # Action基本信息 + action_name = "mute" + action_description = "智能禁言系统,基于LLM判断是否需要禁言" + + # 激活设置 + focus_activation_type = ActionActivationType.LLM_JUDGE # Focus模式使用LLM判定,确保谨慎 + normal_activation_type = ActionActivationType.KEYWORD # Normal模式使用关键词激活,快速响应 + + # 关键词设置(用于Normal模式) + activation_keywords = ["禁言", "mute", "ban", "silence"] + keyword_case_sensitive = False + + # LLM判定提示词(用于Focus模式) + llm_judge_prompt = """ +判定是否需要使用禁言动作的严格条件: + +使用禁言的情况: +1. 用户发送明显违规内容(色情、暴力、政治敏感等) +2. 恶意刷屏或垃圾信息轰炸 +3. 用户主动明确要求被禁言("禁言我"等) +4. 严重违反群规的行为 +5. 恶意攻击他人或群组管理 + +绝对不要使用的情况: +2. 情绪化表达但无恶意 +3. 开玩笑或调侃,除非过分 +4. 单纯的意见分歧或争论 + +""" + + mode_enable = ChatMode.ALL + parallel_action = False + + # Action参数定义 + action_parameters = { + "target": "禁言对象,必填,输入你要禁言的对象的名字", + "duration": "禁言时长,必填,输入你要禁言的时长(秒),单位为秒,必须为数字", + "reason": "禁言理由,可选" + } + + # Action使用场景 + action_require = [ + "当有人违反了公序良俗的内容", + "当有人刷屏时使用", + "当有人发了擦边,或者色情内容时使用", + "当有人要求禁言自己时使用", + "如果某人已经被禁言了,就不要再次禁言了,除非你想追加时间!!" + ] + + async def execute(self) -> Tuple[bool, Optional[str]]: + """执行智能禁言判定""" + logger.info(f"{self.log_prefix} 执行智能禁言动作") + + # 获取参数 + target = self.action_data.get("target") + duration = self.action_data.get("duration") + reason = self.action_data.get("reason", "违反群规") + + # 参数验证 + if not target: + error_msg = "禁言目标不能为空" + logger.error(f"{self.log_prefix} {error_msg}") + await self.send_reply("没有指定禁言对象呢~") + return False, error_msg + + if not duration: + error_msg = "禁言时长不能为空" + logger.error(f"{self.log_prefix} {error_msg}") + await self.send_reply("没有指定禁言时长呢~") + return False, error_msg + + # 获取时长限制配置 + min_duration = self.api.get_config("mute.min_duration", 60) + max_duration = self.api.get_config("mute.max_duration", 2592000) + + # 验证时长格式并转换 + try: + duration_int = int(duration) + if duration_int <= 0: + error_msg = "禁言时长必须大于0" + logger.error(f"{self.log_prefix} {error_msg}") + await self.send_reply("禁言时长必须是正数哦~") + return False, error_msg + + # 限制禁言时长范围 + if duration_int < min_duration: + duration_int = min_duration + logger.info(f"{self.log_prefix} 禁言时长过短,调整为{min_duration}秒") + elif duration_int > max_duration: + duration_int = max_duration + logger.info(f"{self.log_prefix} 禁言时长过长,调整为{max_duration}秒") + + except (ValueError, TypeError): + error_msg = f"禁言时长格式无效: {duration}" + logger.error(f"{self.log_prefix} {error_msg}") + await self.send_reply("禁言时长必须是数字哦~") + return False, error_msg + + # 获取用户ID + try: + platform, user_id = await self.api.get_user_id_by_person_name(target) + except Exception as e: + error_msg = f"查找用户ID时出错: {e}" + logger.error(f"{self.log_prefix} {error_msg}") + await self.send_reply("查找用户信息时出现问题~") + return False, error_msg + + if not user_id: + error_msg = f"未找到用户 {target} 的ID" + await self.send_reply(f"找不到 {target} 这个人呢~") + logger.error(f"{self.log_prefix} {error_msg}") + return False, error_msg + + # 格式化时长显示 + enable_formatting = self.api.get_config("mute.enable_duration_formatting", True) + time_str = self._format_duration(duration_int) if enable_formatting else f"{duration_int}秒" + + # 获取模板化消息 + message = self._get_template_message(target, time_str, reason) + await self.send_reply(message) + + # 发送群聊禁言命令 + success = await self.send_command( + command_name="GROUP_BAN", + args={"qq_id": str(user_id), "duration": str(duration_int)}, + display_message=f"禁言了 {target} {time_str}" + ) + + if success: + logger.info(f"{self.log_prefix} 成功发送禁言命令,用户 {target}({user_id}),时长 {duration_int} 秒") + return True, f"成功禁言 {target},时长 {time_str}" + else: + error_msg = "发送禁言命令失败" + logger.error(f"{self.log_prefix} {error_msg}") + await self.send_reply(f"执行禁言动作失败") + return False, error_msg + + def _get_template_message(self, target: str, duration_str: str, reason: str) -> str: + """获取模板化的禁言消息""" + templates = self.api.get_config("mute.templates", [ + "好的,禁言 {target} {duration},理由:{reason}", + "收到,对 {target} 执行禁言 {duration},因为{reason}", + "明白了,禁言 {target} {duration},原因是{reason}" + ]) + + template = random.choice(templates) + return template.format(target=target, duration=duration_str, reason=reason) + + def _format_duration(self, seconds: int) -> str: + """将秒数格式化为可读的时间字符串""" + if seconds < 60: + return f"{seconds}秒" + elif seconds < 3600: + minutes = seconds // 60 + remaining_seconds = seconds % 60 + if remaining_seconds > 0: + return f"{minutes}分{remaining_seconds}秒" + else: + return f"{minutes}分钟" + elif seconds < 86400: + hours = seconds // 3600 + remaining_minutes = (seconds % 3600) // 60 + if remaining_minutes > 0: + return f"{hours}小时{remaining_minutes}分钟" + else: + return f"{hours}小时" + else: + days = seconds // 86400 + remaining_hours = (seconds % 86400) // 3600 + if remaining_hours > 0: + return f"{days}天{remaining_hours}小时" + else: + return f"{days}天" + + +# ===== Command组件 ===== + +class MuteCommand(BaseCommand): + """禁言命令 - 手动执行禁言操作""" + + # Command基本信息 + command_name = "mute_command" + command_description = "禁言命令,手动执行禁言操作" + + command_pattern = r"^/mute\s+(?P\S+)\s+(?P\d+)(?:\s+(?P.+))?$" + command_help = "禁言指定用户,用法:/mute <用户名> <时长(秒)> [理由]" + command_examples = [ + "/mute 用户名 300", + "/mute 张三 600 刷屏", + "/mute @某人 1800 违规内容" + ] + intercept_message = True # 拦截消息处理 + + async def execute(self) -> Tuple[bool, Optional[str]]: + """执行禁言命令""" + try: + target = self.matched_groups.get("target") + duration = self.matched_groups.get("duration") + reason = self.matched_groups.get("reason", "管理员操作") + + if not all([target, duration]): + await self.send_reply("❌ 命令参数不完整,请检查格式") + return False, "参数不完整" + + # 获取时长限制配置 + min_duration = self.api.get_config("mute.min_duration", 60) + max_duration = self.api.get_config("mute.max_duration", 2592000) + + # 验证时长 + try: + duration_int = int(duration) + if duration_int <= 0: + await self.send_reply("❌ 禁言时长必须大于0") + return False, "时长无效" + + # 限制禁言时长范围 + if duration_int < min_duration: + duration_int = min_duration + await self.send_reply(f"⚠️ 禁言时长过短,调整为{min_duration}秒") + elif duration_int > max_duration: + duration_int = max_duration + await self.send_reply(f"⚠️ 禁言时长过长,调整为{max_duration}秒") + + except ValueError: + await self.send_reply("❌ 禁言时长必须是数字") + return False, "时长格式错误" + + # 获取用户ID + try: + platform, user_id = await self.api.get_user_id_by_person_name(target) + except Exception as e: + logger.error(f"{self.log_prefix} 查找用户ID时出错: {e}") + await self.send_reply("❌ 查找用户信息时出现问题") + return False, str(e) + + if not user_id: + await self.send_reply(f"❌ 找不到用户: {target}") + return False, "用户不存在" + + # 格式化时长显示 + enable_formatting = self.api.get_config("mute.enable_duration_formatting", True) + time_str = self._format_duration(duration_int) if enable_formatting else f"{duration_int}秒" + + logger.info(f"{self.log_prefix} 执行禁言命令: {target}({user_id}) -> {time_str}") + + # 发送群聊禁言命令 + success = await self.send_command( + command_name="GROUP_BAN", + args={"qq_id": str(user_id), "duration": str(duration_int)}, + display_message=f"禁言了 {target} {time_str}" + ) + + if success: + # 获取并发送模板化消息 + message = self._get_template_message(target, time_str, reason) + await self.send_reply(message) + + logger.info(f"{self.log_prefix} 成功禁言 {target}({user_id}),时长 {duration_int} 秒") + return True, f"成功禁言 {target},时长 {time_str}" + else: + await self.send_reply("❌ 发送禁言命令失败") + return False, "发送禁言命令失败" + + except Exception as e: + logger.error(f"{self.log_prefix} 禁言命令执行失败: {e}") + await self.send_reply(f"❌ 禁言命令错误: {str(e)}") + return False, str(e) + + def _get_template_message(self, target: str, duration_str: str, reason: str) -> str: + """获取模板化的禁言消息""" + templates = self.api.get_config("mute.templates", [ + "✅ 已禁言 {target} {duration},理由:{reason}", + "🔇 对 {target} 执行禁言 {duration},因为{reason}", + "⛔ 禁言 {target} {duration},原因:{reason}" + ]) + + template = random.choice(templates) + return template.format(target=target, duration=duration_str, reason=reason) + + def _format_duration(self, seconds: int) -> str: + """将秒数格式化为可读的时间字符串""" + if seconds < 60: + return f"{seconds}秒" + elif seconds < 3600: + minutes = seconds // 60 + remaining_seconds = seconds % 60 + if remaining_seconds > 0: + return f"{minutes}分{remaining_seconds}秒" + else: + return f"{minutes}分钟" + elif seconds < 86400: + hours = seconds // 3600 + remaining_minutes = (seconds % 3600) // 60 + if remaining_minutes > 0: + return f"{hours}小时{remaining_minutes}分钟" + else: + return f"{hours}小时" + else: + days = seconds // 86400 + remaining_hours = (seconds % 86400) // 3600 + if remaining_hours > 0: + return f"{days}天{remaining_hours}小时" + else: + return f"{days}天" + + +# ===== 插件主类 ===== + +@register_plugin +class MutePlugin(BasePlugin): + """禁言插件 + + 提供智能禁言功能: + - 智能禁言Action:基于LLM判断是否需要禁言 + - 禁言命令Command:手动执行禁言操作 + """ + + # 插件基本信息 + plugin_name = "mute_plugin" + plugin_description = "群聊禁言管理插件,提供智能禁言功能" + plugin_version = "2.0.0" + plugin_author = "MaiBot开发团队" + enable_plugin = True + config_file_name = "config.toml" + + def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]: + """返回插件包含的组件列表""" + + # 从配置获取组件启用状态 + enable_smart_mute = self.get_config("components.enable_smart_mute", True) + enable_mute_command = self.get_config("components.enable_mute_command", True) + + components = [] + + # 添加智能禁言Action + if enable_smart_mute: + components.append(( + MuteAction.get_action_info(), + MuteAction + )) + + # 添加禁言命令Command + if enable_mute_command: + components.append(( + MuteCommand.get_command_info(), + MuteCommand + )) + + return components \ No newline at end of file diff --git a/src/plugins/tts_plgin/__init__.py b/src/plugins/built_in/tts_plgin/__init__.py similarity index 100% rename from src/plugins/tts_plgin/__init__.py rename to src/plugins/built_in/tts_plgin/__init__.py diff --git a/src/plugins/tts_plgin/actions/__init__.py b/src/plugins/built_in/tts_plgin/actions/__init__.py similarity index 100% rename from src/plugins/tts_plgin/actions/__init__.py rename to src/plugins/built_in/tts_plgin/actions/__init__.py diff --git a/src/plugins/tts_plgin/actions/tts_action.py b/src/plugins/built_in/tts_plgin/actions/tts_action.py similarity index 100% rename from src/plugins/tts_plgin/actions/tts_action.py rename to src/plugins/built_in/tts_plgin/actions/tts_action.py diff --git a/src/plugins/vtb_action/__init__.py b/src/plugins/built_in/vtb_action/__init__.py similarity index 100% rename from src/plugins/vtb_action/__init__.py rename to src/plugins/built_in/vtb_action/__init__.py diff --git a/src/plugins/vtb_action/actions/__init__.py b/src/plugins/built_in/vtb_action/actions/__init__.py similarity index 100% rename from src/plugins/vtb_action/actions/__init__.py rename to src/plugins/built_in/vtb_action/actions/__init__.py diff --git a/src/plugins/vtb_action/actions/vtb_action.py b/src/plugins/built_in/vtb_action/actions/vtb_action.py similarity index 100% rename from src/plugins/vtb_action/actions/vtb_action.py rename to src/plugins/built_in/vtb_action/actions/vtb_action.py diff --git a/src/plugins/example_command_plugin/__init__.py b/src/plugins/example_command_plugin/__init__.py deleted file mode 100644 index 482b8c27c..000000000 --- a/src/plugins/example_command_plugin/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -"""示例命令插件包 - -这是一个演示如何使用命令系统的示例插件。 - -功能特性: -- 提供简单的命令示例 -- 演示命令参数提取 -- 展示命令帮助信息 - -使用场景: -- 用户输入特定格式的命令时触发 -- 通过命令前缀(如/)快速执行特定功能 -- 提供快速响应的交互方式 -""" diff --git a/src/plugins/example_command_plugin/commands/__init__.py b/src/plugins/example_command_plugin/commands/__init__.py deleted file mode 100644 index 9fb74a8c3..000000000 --- a/src/plugins/example_command_plugin/commands/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -"""示例命令包 - -包含示例命令的实现 -""" diff --git a/src/plugins/example_command_plugin/commands/custom_prefix_command.py b/src/plugins/example_command_plugin/commands/custom_prefix_command.py deleted file mode 100644 index 5297dd9ad..000000000 --- a/src/plugins/example_command_plugin/commands/custom_prefix_command.py +++ /dev/null @@ -1,59 +0,0 @@ -from src.common.logger_manager import get_logger -from src.chat.command.command_handler import BaseCommand, register_command -from typing import Tuple, Optional -import random - -logger = get_logger("custom_prefix_command") - - -@register_command -class DiceCommand(BaseCommand): - """骰子命令,使用!前缀而不是/前缀""" - - command_name = "dice" - command_description = "骰子命令,随机生成1-6的数字" - command_pattern = r"^[!!](?:dice|骰子)(?:\s+(?P\d+))?$" # 匹配 !dice 或 !骰子,可选参数为骰子数量 - command_help = "使用方法: !dice [数量] 或 !骰子 [数量] - 掷骰子,默认掷1个" - command_examples = ["!dice", "!骰子", "!dice 3", "!骰子 5"] - enable_command = True - - async def execute(self) -> Tuple[bool, Optional[str]]: - """执行骰子命令 - - Returns: - Tuple[bool, Optional[str]]: (是否执行成功, 回复消息) - """ - try: - # 获取骰子数量,默认为1 - count_str = self.matched_groups.get("count") - - # 确保count_str不为None - if count_str is None: - count = 1 # 默认值 - else: - try: - count = int(count_str) - if count <= 0: - return False, "骰子数量必须大于0" - if count > 10: # 限制最大数量 - return False, "一次最多只能掷10个骰子" - except ValueError: - return False, "骰子数量必须是整数" - - # 生成随机数 - results = [random.randint(1, 6) for _ in range(count)] - - # 构建回复消息 - if count == 1: - message = f"🎲 掷出了 {results[0]} 点" - else: - dice_results = ", ".join(map(str, results)) - total = sum(results) - message = f"🎲 掷出了 {count} 个骰子: [{dice_results}],总点数: {total}" - - logger.info(f"{self.log_prefix} 执行骰子命令: {message}") - return True, message - - except Exception as e: - logger.error(f"{self.log_prefix} 执行骰子命令时出错: {e}") - return False, f"执行命令时出错: {str(e)}" diff --git a/src/plugins/example_command_plugin/commands/help_command.py b/src/plugins/example_command_plugin/commands/help_command.py deleted file mode 100644 index 020f48300..000000000 --- a/src/plugins/example_command_plugin/commands/help_command.py +++ /dev/null @@ -1,111 +0,0 @@ -from src.common.logger_manager import get_logger -from src.chat.command.command_handler import BaseCommand, register_command, _COMMAND_REGISTRY -from typing import Tuple, Optional - -logger = get_logger("help_command") - - -@register_command -class HelpCommand(BaseCommand): - """帮助命令,显示所有可用命令的帮助信息""" - - command_name = "help" - command_description = "显示所有可用命令的帮助信息" - command_pattern = r"^/help(?:\s+(?P\w+))?$" # 匹配 /help 或 /help 命令名 - command_help = "使用方法: /help [命令名] - 显示所有命令或特定命令的帮助信息" - command_examples = ["/help", "/help echo"] - enable_command = True - - async def execute(self) -> Tuple[bool, Optional[str]]: - """执行帮助命令 - - Returns: - Tuple[bool, Optional[str]]: (是否执行成功, 回复消息) - """ - try: - # 获取匹配到的命令名(如果有) - command_name = self.matched_groups.get("command") - - # 如果指定了命令名,显示该命令的详细帮助 - if command_name: - logger.info(f"{self.log_prefix} 查询命令帮助: {command_name}") - return self._show_command_help(command_name) - - # 否则,显示所有命令的简要帮助 - logger.info(f"{self.log_prefix} 查询所有命令帮助") - return self._show_all_commands() - - except Exception as e: - logger.error(f"{self.log_prefix} 执行帮助命令时出错: {e}") - return False, f"执行命令时出错: {str(e)}" - - def _show_command_help(self, command_name: str) -> Tuple[bool, str]: - """显示特定命令的详细帮助信息 - - Args: - command_name: 命令名称 - - Returns: - Tuple[bool, str]: (是否执行成功, 回复消息) - """ - # 查找命令 - command_cls = _COMMAND_REGISTRY.get(command_name) - - if not command_cls: - return False, f"未找到命令: {command_name}" - - # 获取命令信息 - description = getattr(command_cls, "command_description", "无描述") - help_text = getattr(command_cls, "command_help", "无帮助信息") - examples = getattr(command_cls, "command_examples", []) - - # 构建帮助信息 - help_info = [f"【命令】: {command_name}", f"【描述】: {description}", f"【用法】: {help_text}"] - - # 添加示例 - if examples: - help_info.append("【示例】:") - for example in examples: - help_info.append(f" {example}") - - return True, "\n".join(help_info) - - def _show_all_commands(self) -> Tuple[bool, str]: - """显示所有可用命令的简要帮助信息 - - Returns: - Tuple[bool, str]: (是否执行成功, 回复消息) - """ - # 获取所有已启用的命令 - enabled_commands = { - name: cls for name, cls in _COMMAND_REGISTRY.items() if getattr(cls, "enable_command", True) - } - - if not enabled_commands: - return True, "当前没有可用的命令" - - # 构建命令列表 - command_list = ["可用命令列表:"] - for name, cls in sorted(enabled_commands.items()): - description = getattr(cls, "command_description", "无描述") - # 获取命令前缀示例 - examples = getattr(cls, "command_examples", []) - prefix = "" - if examples and len(examples) > 0: - # 从第一个示例中提取前缀 - example = examples[0] - # 找到第一个空格前的内容作为前缀 - space_pos = example.find(" ") - if space_pos > 0: - prefix = example[:space_pos] - else: - prefix = example - else: - # 默认使用/name作为前缀 - prefix = f"/{name}" - - command_list.append(f"{prefix} - {description}") - - command_list.append("\n使用 /help <命令名> 获取特定命令的详细帮助") - - return True, "\n".join(command_list) diff --git a/src/plugins/example_command_plugin/commands/message_info_command.py b/src/plugins/example_command_plugin/commands/message_info_command.py deleted file mode 100644 index dd05c2a71..000000000 --- a/src/plugins/example_command_plugin/commands/message_info_command.py +++ /dev/null @@ -1,311 +0,0 @@ -from src.common.logger_manager import get_logger -from src.chat.command.command_handler import BaseCommand, register_command -from typing import Tuple, Optional - -logger = get_logger("message_info_command") - - -@register_command -class MessageInfoCommand(BaseCommand): - """消息信息查看命令,展示发送命令的原始消息和相关信息""" - - command_name = "msginfo" - command_description = "查看发送命令的原始消息信息" - command_pattern = r"^/msginfo(?:\s+(?Pfull|simple))?$" - command_help = "使用方法: /msginfo [full|simple] - 查看当前消息的详细信息" - command_examples = ["/msginfo", "/msginfo full", "/msginfo simple"] - enable_command = True - - async def execute(self) -> Tuple[bool, Optional[str]]: - """执行消息信息查看命令""" - try: - detail_level = self.matched_groups.get("detail", "simple") - - logger.info(f"{self.log_prefix} 查看消息信息,详细级别: {detail_level}") - - if detail_level == "full": - info_text = self._get_full_message_info() - else: - info_text = self._get_simple_message_info() - - return True, info_text - - except Exception as e: - logger.error(f"{self.log_prefix} 获取消息信息时出错: {e}") - return False, f"获取消息信息失败: {str(e)}" - - def _get_simple_message_info(self) -> str: - """获取简化的消息信息""" - message = self.message - - # 基础信息 - info_lines = [ - "📨 消息信息概览", - f"🆔 消息ID: {message.message_info.message_id}", - f"⏰ 时间: {message.message_info.time}", - f"🌐 平台: {message.message_info.platform}", - ] - - # 发送者信息 - user = message.message_info.user_info - info_lines.extend( - [ - "", - "👤 发送者信息:", - f" 用户ID: {user.user_id}", - f" 昵称: {user.user_nickname}", - f" 群名片: {user.user_cardname or '无'}", - ] - ) - - # 群聊信息(如果是群聊) - if message.message_info.group_info: - group = message.message_info.group_info - info_lines.extend( - [ - "", - "👥 群聊信息:", - f" 群ID: {group.group_id}", - f" 群名: {group.group_name or '未知'}", - ] - ) - else: - info_lines.extend( - [ - "", - "💬 消息类型: 私聊消息", - ] - ) - - # 消息内容 - info_lines.extend( - [ - "", - "📝 消息内容:", - f" 原始文本: {message.processed_plain_text}", - f" 是否表情: {'是' if getattr(message, 'is_emoji', False) else '否'}", - ] - ) - - # 聊天流信息 - if hasattr(message, "chat_stream") and message.chat_stream: - chat_stream = message.chat_stream - info_lines.extend( - [ - "", - "🔄 聊天流信息:", - f" 流ID: {chat_stream.stream_id}", - ] - ) - - return "\n".join(info_lines) - - def _get_full_message_info(self) -> str: - """获取完整的消息信息(包含技术细节)""" - message = self.message - - info_lines = [ - "📨 完整消息信息", - "=" * 40, - ] - - # 消息基础信息 - info_lines.extend( - [ - "", - "🔍 基础消息信息:", - f" 消息ID: {message.message_info.message_id}", - f" 时间戳: {message.message_info.time}", - f" 平台: {message.message_info.platform}", - f" 处理后文本: {message.processed_plain_text}", - f" 详细文本: {message.detailed_plain_text[:100]}{'...' if len(message.detailed_plain_text) > 100 else ''}", - ] - ) - - # 用户详细信息 - user = message.message_info.user_info - info_lines.extend( - [ - "", - "👤 发送者详细信息:", - f" 用户ID: {user.user_id}", - f" 昵称: {user.user_nickname}", - f" 群名片: {user.user_cardname or '无'}", - f" 平台: {user.platform}", - ] - ) - - # 群聊详细信息 - if message.message_info.group_info: - group = message.message_info.group_info - info_lines.extend( - [ - "", - "👥 群聊详细信息:", - f" 群ID: {group.group_id}", - f" 群名: {group.group_name or '未知'}", - f" 平台: {group.platform}", - ] - ) - else: - info_lines.append("\n💬 消息类型: 私聊消息") - - # 消息段信息 - if message.message_segment: - info_lines.extend( - [ - "", - "📦 消息段信息:", - f" 类型: {message.message_segment.type}", - f" 数据类型: {type(message.message_segment.data).__name__}", - f" 数据预览: {str(message.message_segment.data)[:200]}{'...' if len(str(message.message_segment.data)) > 200 else ''}", - ] - ) - - # 聊天流详细信息 - if hasattr(message, "chat_stream") and message.chat_stream: - chat_stream = message.chat_stream - info_lines.extend( - [ - "", - "🔄 聊天流详细信息:", - f" 流ID: {chat_stream.stream_id}", - f" 平台: {chat_stream.platform}", - f" 用户信息: {chat_stream.user_info.user_nickname} ({chat_stream.user_info.user_id})", - f" 群信息: {getattr(chat_stream.group_info, 'group_name', '私聊') if chat_stream.group_info else '私聊'}", - ] - ) - - # 回复信息 - if hasattr(message, "reply") and message.reply: - info_lines.extend( - [ - "", - "↩️ 回复信息:", - f" 回复消息ID: {message.reply.message_info.message_id}", - f" 回复内容: {message.reply.processed_plain_text[:100]}{'...' if len(message.reply.processed_plain_text) > 100 else ''}", - ] - ) - - # 原始消息数据(如果存在) - if hasattr(message, "raw_message") and message.raw_message: - info_lines.extend( - [ - "", - "🗂️ 原始消息数据:", - f" 数据类型: {type(message.raw_message).__name__}", - f" 数据大小: {len(str(message.raw_message))} 字符", - ] - ) - - return "\n".join(info_lines) - - -@register_command -class SenderInfoCommand(BaseCommand): - """发送者信息命令,快速查看发送者信息""" - - command_name = "whoami" - command_description = "查看发送命令的用户信息" - command_pattern = r"^/whoami$" - command_help = "使用方法: /whoami - 查看你的用户信息" - command_examples = ["/whoami"] - enable_command = True - - async def execute(self) -> Tuple[bool, Optional[str]]: - """执行发送者信息查看命令""" - try: - user = self.message.message_info.user_info - group = self.message.message_info.group_info - - info_lines = [ - "👤 你的身份信息", - f"🆔 用户ID: {user.user_id}", - f"📝 昵称: {user.user_nickname}", - f"🏷️ 群名片: {user.user_cardname or '无'}", - f"🌐 平台: {user.platform}", - ] - - if group: - info_lines.extend( - [ - "", - "👥 当前群聊:", - f"🆔 群ID: {group.group_id}", - f"📝 群名: {group.group_name or '未知'}", - ] - ) - else: - info_lines.append("\n💬 当前在私聊中") - - return True, "\n".join(info_lines) - - except Exception as e: - logger.error(f"{self.log_prefix} 获取发送者信息时出错: {e}") - return False, f"获取发送者信息失败: {str(e)}" - - -@register_command -class ChatStreamInfoCommand(BaseCommand): - """聊天流信息命令""" - - command_name = "streaminfo" - command_description = "查看当前聊天流的详细信息" - command_pattern = r"^/streaminfo$" - command_help = "使用方法: /streaminfo - 查看当前聊天流信息" - command_examples = ["/streaminfo"] - enable_command = True - - async def execute(self) -> Tuple[bool, Optional[str]]: - """执行聊天流信息查看命令""" - try: - if not hasattr(self.message, "chat_stream") or not self.message.chat_stream: - return False, "无法获取聊天流信息" - - chat_stream = self.message.chat_stream - - info_lines = [ - "🔄 聊天流信息", - f"🆔 流ID: {chat_stream.stream_id}", - f"🌐 平台: {chat_stream.platform}", - ] - - # 用户信息 - if chat_stream.user_info: - info_lines.extend( - [ - "", - "👤 关联用户:", - f" ID: {chat_stream.user_info.user_id}", - f" 昵称: {chat_stream.user_info.user_nickname}", - ] - ) - - # 群信息 - if chat_stream.group_info: - info_lines.extend( - [ - "", - "👥 关联群聊:", - f" 群ID: {chat_stream.group_info.group_id}", - f" 群名: {chat_stream.group_info.group_name or '未知'}", - ] - ) - else: - info_lines.append("\n💬 类型: 私聊流") - - # 最近消息统计 - if hasattr(chat_stream, "last_messages"): - msg_count = len(chat_stream.last_messages) - info_lines.extend( - [ - "", - f"📈 消息统计: 记录了 {msg_count} 条最近消息", - ] - ) - - return True, "\n".join(info_lines) - - except Exception as e: - logger.error(f"{self.log_prefix} 获取聊天流信息时出错: {e}") - return False, f"获取聊天流信息失败: {str(e)}" diff --git a/src/plugins/example_command_plugin/commands/send_msg_commad.py b/src/plugins/example_command_plugin/commands/send_msg_commad.py deleted file mode 100644 index 0b4176467..000000000 --- a/src/plugins/example_command_plugin/commands/send_msg_commad.py +++ /dev/null @@ -1,117 +0,0 @@ -from src.common.logger_manager import get_logger -from src.chat.command.command_handler import BaseCommand, register_command -from src.chat.actions.plugin_api.message_api import MessageAPI -from typing import Tuple, Optional - -logger = get_logger("send_msg_command") - - -@register_command -class SendMessageCommand(BaseCommand, MessageAPI): - """发送消息命令,可以向指定群聊或私聊发送消息""" - - command_name = "send" - command_description = "向指定群聊或私聊发送消息" - command_pattern = r"^/send\s+(?Pgroup|user)\s+(?P\d+)\s+(?P.+)$" - command_help = "使用方法: /send <消息内容> - 发送消息到指定群聊或用户" - command_examples = ["/send group 123456789 大家好!", "/send user 987654321 私聊消息"] - enable_command = True - - def __init__(self, message): - super().__init__(message) - # 初始化MessageAPI需要的服务(虽然这里不会用到,但保持一致性) - self._services = {} - self.log_prefix = f"[Command:{self.command_name}]" - - async def execute(self) -> Tuple[bool, Optional[str]]: - """执行发送消息命令 - - Returns: - Tuple[bool, Optional[str]]: (是否执行成功, 回复消息) - """ - try: - # 获取匹配到的参数 - target_type = self.matched_groups.get("target_type") # group 或 user - target_id = self.matched_groups.get("target_id") # 群ID或用户ID - content = self.matched_groups.get("content") # 消息内容 - - if not all([target_type, target_id, content]): - return False, "命令参数不完整,请检查格式" - - logger.info(f"{self.log_prefix} 执行发送消息命令: {target_type}:{target_id} -> {content[:50]}...") - - # 根据目标类型调用不同的发送方法 - if target_type == "group": - success = await self._send_to_group(target_id, content) - target_desc = f"群聊 {target_id}" - elif target_type == "user": - success = await self._send_to_user(target_id, content) - target_desc = f"用户 {target_id}" - else: - return False, f"不支持的目标类型: {target_type},只支持 group 或 user" - - # 返回执行结果 - if success: - return True, f"✅ 消息已成功发送到 {target_desc}" - else: - return False, f"❌ 消息发送失败,可能是目标 {target_desc} 不存在或没有权限" - - except Exception as e: - logger.error(f"{self.log_prefix} 执行发送消息命令时出错: {e}") - return False, f"命令执行出错: {str(e)}" - - async def _send_to_group(self, group_id: str, content: str) -> bool: - """发送消息到群聊 - - Args: - group_id: 群聊ID - content: 消息内容 - - Returns: - bool: 是否发送成功 - """ - try: - success = await self.send_text_to_group( - text=content, - group_id=group_id, - platform="qq", # 默认使用QQ平台 - ) - - if success: - logger.info(f"{self.log_prefix} 成功发送消息到群聊 {group_id}") - else: - logger.warning(f"{self.log_prefix} 发送消息到群聊 {group_id} 失败") - - return success - - except Exception as e: - logger.error(f"{self.log_prefix} 发送群聊消息时出错: {e}") - return False - - async def _send_to_user(self, user_id: str, content: str) -> bool: - """发送消息到私聊 - - Args: - user_id: 用户ID - content: 消息内容 - - Returns: - bool: 是否发送成功 - """ - try: - success = await self.send_text_to_user( - text=content, - user_id=user_id, - platform="qq", # 默认使用QQ平台 - ) - - if success: - logger.info(f"{self.log_prefix} 成功发送消息到用户 {user_id}") - else: - logger.warning(f"{self.log_prefix} 发送消息到用户 {user_id} 失败") - - return success - - except Exception as e: - logger.error(f"{self.log_prefix} 发送私聊消息时出错: {e}") - return False diff --git a/src/plugins/example_command_plugin/commands/send_msg_enhanced.py b/src/plugins/example_command_plugin/commands/send_msg_enhanced.py deleted file mode 100644 index bd46da916..000000000 --- a/src/plugins/example_command_plugin/commands/send_msg_enhanced.py +++ /dev/null @@ -1,149 +0,0 @@ -from src.common.logger_manager import get_logger -from src.chat.command.command_handler import BaseCommand, register_command -from src.chat.actions.plugin_api.message_api import MessageAPI -from typing import Tuple, Optional - -logger = get_logger("send_msg_enhanced") - - -@register_command -class SendMessageEnhancedCommand(BaseCommand, MessageAPI): - """增强版发送消息命令,支持多种消息类型和平台""" - - command_name = "sendfull" - command_description = "增强版消息发送命令,支持多种类型和平台" - command_pattern = r"^/sendfull\s+(?Ptext|image|emoji)\s+(?Pgroup|user)\s+(?P\d+)(?:\s+(?P\w+))?\s+(?P.+)$" - command_help = "使用方法: /sendfull <消息类型> <目标类型> [平台] <内容>" - command_examples = [ - "/sendfull text group 123456789 qq 大家好!这是文本消息", - "/sendfull image user 987654321 https://example.com/image.jpg", - "/sendfull emoji group 123456789 😄", - "/sendfull text user 987654321 qq 私聊消息", - ] - enable_command = True - - def __init__(self, message): - super().__init__(message) - self._services = {} - self.log_prefix = f"[Command:{self.command_name}]" - - async def execute(self) -> Tuple[bool, Optional[str]]: - """执行增强版发送消息命令""" - try: - # 获取匹配参数 - msg_type = self.matched_groups.get("msg_type") # 消息类型: text/image/emoji - target_type = self.matched_groups.get("target_type") # 目标类型: group/user - target_id = self.matched_groups.get("target_id") # 目标ID - platform = self.matched_groups.get("platform") or "qq" # 平台,默认qq - content = self.matched_groups.get("content") # 内容 - - if not all([msg_type, target_type, target_id, content]): - return False, "命令参数不完整,请检查格式" - - # 验证消息类型 - valid_types = ["text", "image", "emoji"] - if msg_type not in valid_types: - return False, f"不支持的消息类型: {msg_type},支持的类型: {', '.join(valid_types)}" - - # 验证目标类型 - if target_type not in ["group", "user"]: - return False, "目标类型只能是 group 或 user" - - logger.info(f"{self.log_prefix} 执行发送命令: {msg_type} -> {target_type}:{target_id} (平台:{platform})") - - # 根据消息类型和目标类型发送消息 - is_group = target_type == "group" - success = await self.send_message_to_target( - message_type=msg_type, content=content, platform=platform, target_id=target_id, is_group=is_group - ) - - # 构建结果消息 - target_desc = f"{'群聊' if is_group else '用户'} {target_id} (平台: {platform})" - msg_type_desc = {"text": "文本", "image": "图片", "emoji": "表情"}.get(msg_type, msg_type) - - if success: - return True, f"✅ {msg_type_desc}消息已成功发送到 {target_desc}" - else: - return False, f"❌ {msg_type_desc}消息发送失败,可能是目标 {target_desc} 不存在或没有权限" - - except Exception as e: - logger.error(f"{self.log_prefix} 执行增强发送命令时出错: {e}") - return False, f"命令执行出错: {str(e)}" - - -@register_command -class SendQuickCommand(BaseCommand, MessageAPI): - """快速发送文本消息命令""" - - command_name = "msg" - command_description = "快速发送文本消息到群聊" - command_pattern = r"^/msg\s+(?P\d+)\s+(?P.+)$" - command_help = "使用方法: /msg <群ID> <消息内容> - 快速发送文本到指定群聊" - command_examples = ["/msg 123456789 大家好!", "/msg 987654321 这是一条快速消息"] - enable_command = True - - def __init__(self, message): - super().__init__(message) - self._services = {} - self.log_prefix = f"[Command:{self.command_name}]" - - async def execute(self) -> Tuple[bool, Optional[str]]: - """执行快速发送消息命令""" - try: - group_id = self.matched_groups.get("group_id") - content = self.matched_groups.get("content") - - if not all([group_id, content]): - return False, "命令参数不完整" - - logger.info(f"{self.log_prefix} 快速发送到群 {group_id}: {content[:50]}...") - - success = await self.send_text_to_group(text=content, group_id=group_id, platform="qq") - - if success: - return True, f"✅ 消息已发送到群 {group_id}" - else: - return False, f"❌ 发送到群 {group_id} 失败" - - except Exception as e: - logger.error(f"{self.log_prefix} 快速发送命令出错: {e}") - return False, f"发送失败: {str(e)}" - - -@register_command -class SendPrivateCommand(BaseCommand, MessageAPI): - """发送私聊消息命令""" - - command_name = "pm" - command_description = "发送私聊消息到指定用户" - command_pattern = r"^/pm\s+(?P\d+)\s+(?P.+)$" - command_help = "使用方法: /pm <用户ID> <消息内容> - 发送私聊消息" - command_examples = ["/pm 123456789 你好!", "/pm 987654321 这是私聊消息"] - enable_command = True - - def __init__(self, message): - super().__init__(message) - self._services = {} - self.log_prefix = f"[Command:{self.command_name}]" - - async def execute(self) -> Tuple[bool, Optional[str]]: - """执行私聊发送命令""" - try: - user_id = self.matched_groups.get("user_id") - content = self.matched_groups.get("content") - - if not all([user_id, content]): - return False, "命令参数不完整" - - logger.info(f"{self.log_prefix} 发送私聊到用户 {user_id}: {content[:50]}...") - - success = await self.send_text_to_user(text=content, user_id=user_id, platform="qq") - - if success: - return True, f"✅ 私聊消息已发送到用户 {user_id}" - else: - return False, f"❌ 发送私聊到用户 {user_id} 失败" - - except Exception as e: - logger.error(f"{self.log_prefix} 私聊发送命令出错: {e}") - return False, f"私聊发送失败: {str(e)}" diff --git a/src/plugins/example_command_plugin/commands/send_msg_with_context.py b/src/plugins/example_command_plugin/commands/send_msg_with_context.py deleted file mode 100644 index 82031b60e..000000000 --- a/src/plugins/example_command_plugin/commands/send_msg_with_context.py +++ /dev/null @@ -1,240 +0,0 @@ -from src.common.logger_manager import get_logger -from src.chat.command.command_handler import BaseCommand, register_command -from src.chat.actions.plugin_api.message_api import MessageAPI -from typing import Tuple, Optional -import time - -logger = get_logger("send_msg_with_context") - - -@register_command -class ContextAwareSendCommand(BaseCommand, MessageAPI): - """上下文感知的发送消息命令,展示如何利用原始消息信息""" - - command_name = "csend" - command_description = "带上下文感知的发送消息命令" - command_pattern = ( - r"^/csend\s+(?Pgroup|user|here|reply)\s+(?P.*?)(?:\s+(?P.*))?$" - ) - command_help = "使用方法: /csend <参数> [内容]" - command_examples = [ - "/csend group 123456789 大家好!", - "/csend user 987654321 私聊消息", - "/csend here 在当前聊天发送", - "/csend reply 回复当前群/私聊", - ] - enable_command = True - - # 管理员用户ID列表(示例) - ADMIN_USERS = ["123456789", "987654321"] # 可以从配置文件读取 - - def __init__(self, message): - super().__init__(message) - self._services = {} - self.log_prefix = f"[Command:{self.command_name}]" - - async def execute(self) -> Tuple[bool, Optional[str]]: - """执行上下文感知的发送命令""" - try: - # 获取命令发送者信息 - sender = self.message.message_info.user_info - current_group = self.message.message_info.group_info - - # 权限检查 - if not self._check_permission(sender.user_id): - return False, f"❌ 权限不足,只有管理员可以使用此命令\n你的ID: {sender.user_id}" - - # 解析命令参数 - target_type = self.matched_groups.get("target_type") - target_id_or_content = self.matched_groups.get("target_id_or_content", "") - content = self.matched_groups.get("content", "") - - # 根据目标类型处理不同情况 - if target_type == "here": - # 发送到当前聊天 - return await self._send_to_current_chat(target_id_or_content, sender, current_group) - - elif target_type == "reply": - # 回复到当前聊天,带发送者信息 - return await self._send_reply_with_context(target_id_or_content, sender, current_group) - - elif target_type in ["group", "user"]: - # 发送到指定目标 - if not content: - return False, "指定群聊或用户时需要提供消息内容" - return await self._send_to_target(target_type, target_id_or_content, content, sender) - - else: - return False, f"不支持的目标类型: {target_type}" - - except Exception as e: - logger.error(f"{self.log_prefix} 执行上下文感知发送命令时出错: {e}") - return False, f"命令执行出错: {str(e)}" - - def _check_permission(self, user_id: str) -> bool: - """检查用户权限""" - return user_id in self.ADMIN_USERS - - async def _send_to_current_chat(self, content: str, sender, current_group) -> Tuple[bool, str]: - """发送到当前聊天""" - if not content: - return False, "消息内容不能为空" - - # 构建带发送者信息的消息 - timestamp = time.strftime("%H:%M:%S", time.localtime()) - if current_group: - # 群聊 - formatted_content = f"[管理员转发 {timestamp}] {sender.user_nickname}({sender.user_id}): {content}" - success = await self.send_text_to_group( - text=formatted_content, group_id=current_group.group_id, platform="qq" - ) - target_desc = f"当前群聊 {current_group.group_name}({current_group.group_id})" - else: - # 私聊 - formatted_content = f"[管理员消息 {timestamp}]: {content}" - success = await self.send_text_to_user(text=formatted_content, user_id=sender.user_id, platform="qq") - target_desc = "当前私聊" - - if success: - return True, f"✅ 消息已发送到{target_desc}" - else: - return False, f"❌ 发送到{target_desc}失败" - - async def _send_reply_with_context(self, content: str, sender, current_group) -> Tuple[bool, str]: - """发送回复,带完整上下文信息""" - if not content: - return False, "回复内容不能为空" - - # 获取当前时间和环境信息 - timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - - # 构建上下文信息 - context_info = [ - f"📢 管理员回复 [{timestamp}]", - f"👤 发送者: {sender.user_nickname}({sender.user_id})", - ] - - if current_group: - context_info.append(f"👥 当前群聊: {current_group.group_name}({current_group.group_id})") - target_desc = f"群聊 {current_group.group_name}" - else: - context_info.append("💬 当前环境: 私聊") - target_desc = "私聊" - - context_info.extend([f"📝 回复内容: {content}", "─" * 30]) - - formatted_content = "\n".join(context_info) - - # 发送消息 - if current_group: - success = await self.send_text_to_group( - text=formatted_content, group_id=current_group.group_id, platform="qq" - ) - else: - success = await self.send_text_to_user(text=formatted_content, user_id=sender.user_id, platform="qq") - - if success: - return True, f"✅ 带上下文的回复已发送到{target_desc}" - else: - return False, f"❌ 发送上下文回复到{target_desc}失败" - - async def _send_to_target(self, target_type: str, target_id: str, content: str, sender) -> Tuple[bool, str]: - """发送到指定目标,带发送者追踪信息""" - timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - - # 构建带追踪信息的消息 - tracking_info = f"[管理转发 {timestamp}] 来自 {sender.user_nickname}({sender.user_id})" - formatted_content = f"{tracking_info}\n{content}" - - if target_type == "group": - success = await self.send_text_to_group(text=formatted_content, group_id=target_id, platform="qq") - target_desc = f"群聊 {target_id}" - else: # user - success = await self.send_text_to_user(text=formatted_content, user_id=target_id, platform="qq") - target_desc = f"用户 {target_id}" - - if success: - return True, f"✅ 带追踪信息的消息已发送到{target_desc}" - else: - return False, f"❌ 发送到{target_desc}失败" - - -@register_command -class MessageContextCommand(BaseCommand): - """消息上下文命令,展示如何获取和利用上下文信息""" - - command_name = "context" - command_description = "显示当前消息的完整上下文信息" - command_pattern = r"^/context$" - command_help = "使用方法: /context - 显示当前环境的上下文信息" - command_examples = ["/context"] - enable_command = True - - async def execute(self) -> Tuple[bool, Optional[str]]: - """显示上下文信息""" - try: - message = self.message - user = message.message_info.user_info - group = message.message_info.group_info - - # 构建上下文信息 - context_lines = [ - "🌐 当前上下文信息", - "=" * 30, - "", - "⏰ 时间信息:", - f" 消息时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(message.message_info.time))}", - f" 时间戳: {message.message_info.time}", - "", - "👤 发送者:", - f" 用户ID: {user.user_id}", - f" 昵称: {user.user_nickname}", - f" 群名片: {user.user_cardname or '无'}", - f" 平台: {user.platform}", - ] - - if group: - context_lines.extend( - [ - "", - "👥 群聊环境:", - f" 群ID: {group.group_id}", - f" 群名: {group.group_name or '未知'}", - f" 平台: {group.platform}", - ] - ) - else: - context_lines.extend( - [ - "", - "💬 私聊环境", - ] - ) - - # 添加聊天流信息 - if hasattr(message, "chat_stream") and message.chat_stream: - chat_stream = message.chat_stream - context_lines.extend( - [ - "", - "🔄 聊天流:", - f" 流ID: {chat_stream.stream_id}", - ] - ) - - # 添加消息内容信息 - context_lines.extend( - [ - "", - "📝 消息内容:", - f" 原始内容: {message.processed_plain_text}", - f" 消息长度: {len(message.processed_plain_text)} 字符", - f" 消息ID: {message.message_info.message_id}", - ] - ) - - return True, "\n".join(context_lines) - - except Exception as e: - logger.error(f"{self.log_prefix} 获取上下文信息时出错: {e}") - return False, f"获取上下文失败: {str(e)}" diff --git a/src/plugins/examples/example_plugin/README.md b/src/plugins/examples/example_plugin/README.md new file mode 100644 index 000000000..c1129df7a --- /dev/null +++ b/src/plugins/examples/example_plugin/README.md @@ -0,0 +1,187 @@ +# 综合示例插件 + +## 概述 + +这是一个展示新插件系统完整功能的综合示例插件,整合了所有旧示例插件的功能,并使用新的架构重写。 + +## 功能特性 + +### 🎯 Action组件 + +#### SmartGreetingAction - 智能问候 +- **触发方式**: 关键词触发 (你好、hello、hi、嗨等) +- **支持模式**: 所有聊天模式 +- **功能**: 智能问候,支持LLM个性化生成 +- **配置**: 可自定义问候模板、启用表情、LLM生成 + +### 📝 Command组件 + +#### 1. ComprehensiveHelpCommand - 综合帮助系统 +``` +/help [命令名] +``` +- **功能**: 显示所有命令帮助或特定命令详情 +- **拦截**: ✅ 拦截消息处理 +- **示例**: `/help`, `/help send` + +#### 2. MessageSendCommand - 消息发送 +``` +/send <消息内容> +``` +- **功能**: 向指定群聊或私聊发送消息 +- **拦截**: ✅ 拦截消息处理 +- **示例**: `/send group 123456 大家好` + +#### 3. SystemStatusCommand - 系统状态查询 +``` +/status [类型] +``` +- **功能**: 查询系统、插件、内存等状态 +- **拦截**: ✅ 拦截消息处理 +- **示例**: `/status`, `/status 插件` + +#### 4. EchoCommand - 回声命令 +``` +/echo <消息内容> +``` +- **功能**: 重复用户输入的消息 +- **拦截**: ✅ 拦截消息处理 +- **示例**: `/echo Hello World` + +#### 5. MessageInfoCommand - 消息信息查询 +``` +/info +``` +- **功能**: 显示当前消息的详细信息 +- **拦截**: ✅ 拦截消息处理 +- **示例**: `/info` + +#### 6. CustomPrefixCommand - 自定义前缀 +``` +/prefix <前缀> <内容> +``` +- **功能**: 为消息添加自定义前缀 +- **拦截**: ✅ 拦截消息处理 +- **示例**: `/prefix [公告] 系统维护` + +#### 7. LogMonitorCommand - 日志监控 +``` +/log [级别] +``` +- **功能**: 记录消息到日志但不拦截后续处理 +- **拦截**: ❌ 不拦截,继续处理消息 +- **示例**: `/log`, `/log debug` + +## 🔧 拦截控制演示 + +此插件完美演示了新插件系统的**拦截控制功能**: + +### 拦截型命令 (intercept_message = True) +- `/help` - 显示帮助后停止处理 +- `/send` - 发送消息后停止处理 +- `/status` - 查询状态后停止处理 +- `/echo` - 回声后停止处理 +- `/info` - 显示信息后停止处理 +- `/prefix` - 添加前缀后停止处理 + +### 非拦截型命令 (intercept_message = False) +- `/log` - 记录日志但继续处理,可能触发其他功能 + +## ⚙️ 配置说明 + +插件支持通过 `config.toml` 进行详细配置: + +### 组件控制 +```toml +[components] +enable_greeting = true # 启用智能问候 +enable_help = true # 启用帮助系统 +enable_send = true # 启用消息发送 +# ... 其他组件开关 +``` + +### 功能配置 +```toml +[greeting] +template = "你好,{username}!" # 问候模板 +enable_emoji = true # 启用表情 +enable_llm = false # 启用LLM生成 + +[send] +max_message_length = 500 # 最大消息长度 + +[echo] +max_length = 200 # 回声最大长度 +enable_formatting = true # 启用格式化 +``` + +## 🚀 使用示例 + +### 智能问候 +``` +用户: 你好 +机器人: 你好,朋友!欢迎使用MaiBot综合插件系统!😊 +``` + +### 帮助查询 +``` +用户: /help +机器人: [显示完整命令帮助列表] + +用户: /help send +机器人: [显示send命令的详细帮助] +``` + +### 消息发送 +``` +用户: /send group 123456 大家好! +机器人: ✅ 消息已成功发送到 群聊 123456 +``` + +### 日志监控(不拦截) +``` +用户: /log info 这是一条测试消息 +[日志记录但消息继续处理,可能触发智能问候等其他功能] +``` + +## 📁 文件结构 + +``` +src/plugins/built_in/example_comprehensive/ +├── plugin.py # 主插件文件 +├── config.toml # 配置文件 +└── README.md # 说明文档 +``` + +## 🔄 架构升级 + +此插件展示了从旧插件系统到新插件系统的完整升级: + +### 旧系统特征 +- 使用 `@register_command` 装饰器 +- 继承旧的 `BaseCommand` +- 硬编码的消息处理逻辑 +- 有限的配置支持 + +### 新系统特征 +- 使用统一的组件注册机制 +- 新的 `BaseAction` 和 `BaseCommand` 基类 +- **拦截控制功能** - 灵活的消息处理流程 +- 强大的配置驱动架构 +- 统一的API接口 +- 完整的错误处理和日志 + +## 💡 开发指南 + +此插件可作为开发新插件的完整参考: + +1. **Action开发**: 参考 `SmartGreetingAction` +2. **Command开发**: 参考各种Command实现 +3. **拦截控制**: 根据需要设置 `intercept_message` +4. **配置使用**: 通过 `self.api.get_config()` 读取配置 +5. **错误处理**: 完整的异常捕获和用户反馈 +6. **日志记录**: 结构化的日志输出 + +## 🎉 总结 + +这个综合示例插件完美展示了新插件系统的强大功能,特别是**拦截控制机制**,让开发者可以精确控制消息处理流程,实现更灵活的插件交互模式。 \ No newline at end of file diff --git a/src/plugins/examples/example_plugin/config.toml b/src/plugins/examples/example_plugin/config.toml new file mode 100644 index 000000000..981b480e1 --- /dev/null +++ b/src/plugins/examples/example_plugin/config.toml @@ -0,0 +1,53 @@ +# 综合示例插件配置文件 + +[plugin] +name = "example_comprehensive" +version = "2.0.0" +enabled = true +description = "展示新插件系统完整功能的综合示例插件" + +# 组件启用控制 +[components] +enable_greeting = true +enable_help = true +enable_send = true +enable_echo = true +enable_info = true + +# 智能问候配置 +[greeting] +template = "你好,{username}!欢迎使用MaiBot综合插件系统!" +enable_emoji = true +enable_llm = false # 是否使用LLM生成个性化问候 + +# 消息发送配置 +[send] +max_message_length = 500 +enable_length_check = true +default_platform = "qq" + +# 回声命令配置 +[echo] +max_length = 200 +enable_formatting = true + +# 消息信息配置 +[info] +show_detailed_info = true +include_stream_info = true +max_content_preview = 100 + +# 帮助系统配置 +[help] +show_extended_help = true +include_action_info = true +include_config_info = true + +# 骰子命令配置 +[dice] +enable_dice = true + +# 日志配置 +[logging] +level = "INFO" +prefix = "[ExampleComprehensive]" \ No newline at end of file diff --git a/src/plugins/examples/example_plugin/plugin.py b/src/plugins/examples/example_plugin/plugin.py new file mode 100644 index 000000000..8a91e7c82 --- /dev/null +++ b/src/plugins/examples/example_plugin/plugin.py @@ -0,0 +1,486 @@ +""" +综合示例插件 + +将旧的示例插件功能重写为新插件系统架构,展示完整的插件开发模式。 + +包含功能: +- 智能问候Action +- 帮助系统Command +- 消息发送Command +- 状态查询Command +- 回声Command +- 自定义前缀Command +- 消息信息查询Command +- 高级消息发送Command + +演示新插件系统的完整功能: +- Action和Command组件的定义 +- 拦截控制功能 +- 配置驱动的行为 +- API的多种使用方式 +- 日志和错误处理 +""" + +from typing import List, Tuple, Type, Optional, Dict, Any +import re +import time +import random +# 导入新插件系统 +from src.plugin_system.base.base_plugin import BasePlugin +from src.plugin_system.base.base_plugin import register_plugin +from src.plugin_system.base.base_action import BaseAction +from src.plugin_system.base.base_command import BaseCommand +from src.plugin_system.base.component_types import ComponentInfo, ActionActivationType, ChatMode +from src.common.logger_manager import get_logger + +logger = get_logger("example_comprehensive") + + +# ===== Action组件 ===== + +class SmartGreetingAction(BaseAction): + """智能问候Action - 基于关键词触发的问候系统""" + + # 激活设置 + focus_activation_type = ActionActivationType.KEYWORD + normal_activation_type = ActionActivationType.KEYWORD + activation_keywords = ["你好", "hello", "hi", "嗨", "问候", "早上好", "晚上好"] + keyword_case_sensitive = False + mode_enable = ChatMode.ALL + parallel_action = False + + # Action参数定义 + action_parameters = { + "username": "要问候的用户名(可选)" + } + + # Action使用场景 + action_require = [ + "用户发送包含问候词汇的消息", + "检测到新用户加入时", + "响应友好交流需求" + ] + + +# ===== Command组件 ===== + +class ComprehensiveHelpCommand(BaseCommand): + """综合帮助系统 - 显示所有可用命令和Action""" + + command_pattern = r"^/help(?:\s+(?P\w+))?$" + command_help = "显示所有命令帮助或特定命令详情,用法:/help [命令名]" + command_examples = ["/help", "/help send", "/help status"] + intercept_message = True # 拦截消息,不继续处理 + + async def execute(self) -> Tuple[bool, Optional[str]]: + """执行帮助命令""" + try: + command_name = self.matched_groups.get("command") + + if command_name: + # 显示特定命令帮助 + return await self._show_specific_help(command_name) + else: + # 显示所有命令概览 + return await self._show_all_commands() + + except Exception as e: + logger.error(f"{self.log_prefix} 帮助命令执行失败: {e}") + await self.send_reply(f"❌ 帮助系统错误: {str(e)}") + return False, str(e) + + async def _show_specific_help(self, command_name: str) -> Tuple[bool, str]: + """显示特定命令的详细帮助""" + # 这里可以扩展为动态获取所有注册的Command信息 + help_info = { + "help": { + "description": "显示帮助信息", + "usage": "/help [命令名]", + "examples": ["/help", "/help send"] + }, + "send": { + "description": "发送消息到指定目标", + "usage": "/send <消息内容>", + "examples": ["/send group 123456 你好", "/send user 789456 私聊"] + }, + "status": { + "description": "查询系统状态", + "usage": "/status [类型]", + "examples": ["/status", "/status 系统", "/status 插件"] + } + } + + info = help_info.get(command_name.lower()) + if not info: + response = f"❌ 未找到命令: {command_name}\n使用 /help 查看所有可用命令" + else: + response = f""" +📖 命令帮助: {command_name} + +📝 描述: {info['description']} +⚙️ 用法: {info['usage']} +💡 示例: +{chr(10).join(f" • {example}" for example in info['examples'])} + """.strip() + + await self.send_reply(response) + return True, response + + async def _show_all_commands(self) -> Tuple[bool, str]: + """显示所有可用命令""" + help_text = """ +🤖 综合示例插件 - 命令帮助 + +📝 可用命令: +• /help [命令] - 显示帮助信息 +• /send <目标类型> <消息> - 发送消息 +• /status [类型] - 查询系统状态 +• /echo <消息> - 回声重复消息 +• /info - 查询当前消息信息 +• /prefix <前缀> <内容> - 自定义前缀消息 + +🎯 智能功能: +• 智能问候 - 关键词触发自动问候 +• 状态监控 - 实时系统状态查询 +• 消息转发 - 跨群聊/私聊消息发送 + +⚙️ 拦截控制: +• 部分命令拦截消息处理(如 /help) +• 部分命令允许继续处理(如 /log) + +💡 使用 /help <命令名> 获取特定命令的详细说明 + """.strip() + + await self.send_reply(help_text) + return True, help_text + + +class MessageSendCommand(BaseCommand): + """消息发送Command - 向指定群聊或私聊发送消息""" + + command_pattern = r"^/send\s+(?Pgroup|user)\s+(?P\d+)\s+(?P.+)$" + command_help = "向指定群聊或私聊发送消息,用法:/send <消息内容>" + command_examples = [ + "/send group 123456789 大家好!", + "/send user 987654321 私聊消息", + "/send group 555666777 这是来自插件的消息" + ] + intercept_message = True # 拦截消息处理 + + async def execute(self) -> Tuple[bool, Optional[str]]: + """执行消息发送""" + try: + target_type = self.matched_groups.get("target_type") + target_id = self.matched_groups.get("target_id") + content = self.matched_groups.get("content") + + if not all([target_type, target_id, content]): + await self.send_reply("❌ 命令参数不完整,请检查格式") + return False, "参数不完整" + + # 长度限制检查 + max_length = self.api.get_config("send.max_message_length", 500) + if len(content) > max_length: + await self.send_reply(f"❌ 消息过长,最大长度: {max_length} 字符") + return False, "消息过长" + + logger.info(f"{self.log_prefix} 发送消息: {target_type}:{target_id} -> {content[:50]}...") + + # 根据目标类型发送消息 + if target_type == "group": + success = await self.api.send_text_to_group( + text=content, + group_id=target_id, + platform="qq" + ) + target_desc = f"群聊 {target_id}" + elif target_type == "user": + success = await self.api.send_text_to_user( + text=content, + user_id=target_id, + platform="qq" + ) + target_desc = f"用户 {target_id}" + else: + await self.send_reply(f"❌ 不支持的目标类型: {target_type}") + return False, f"不支持的目标类型: {target_type}" + + # 返回结果 + if success: + response = f"✅ 消息已成功发送到 {target_desc}" + await self.send_reply(response) + return True, response + else: + response = f"❌ 消息发送失败,目标 {target_desc} 可能不存在" + await self.send_reply(response) + return False, response + + except Exception as e: + logger.error(f"{self.log_prefix} 消息发送失败: {e}") + error_msg = f"❌ 发送失败: {str(e)}" + await self.send_reply(error_msg) + return False, str(e) + + +class DiceCommand(BaseCommand): + """骰子命令,使用!前缀而不是/前缀""" + + command_pattern = r"^[!!](?:dice|骰子)(?:\s+(?P\d+))?$" # 匹配 !dice 或 !骰子,可选参数为骰子数量 + command_help = "使用方法: !dice [数量] 或 !骰子 [数量] - 掷骰子,默认掷1个" + command_examples = ["!dice", "!骰子", "!dice 3", "!骰子 5"] + intercept_message = True # 拦截消息处理 + + async def execute(self) -> Tuple[bool, Optional[str]]: + """执行骰子命令 + + Returns: + Tuple[bool, Optional[str]]: (是否执行成功, 回复消息) + """ + try: + # 获取骰子数量,默认为1 + count_str = self.matched_groups.get("count") + + # 确保count_str不为None + if count_str is None: + count = 1 # 默认值 + else: + try: + count = int(count_str) + if count <= 0: + response = "❌ 骰子数量必须大于0" + await self.send_reply(response) + return False, response + if count > 10: # 限制最大数量 + response = "❌ 一次最多只能掷10个骰子" + await self.send_reply(response) + return False, response + except ValueError: + response = "❌ 骰子数量必须是整数" + await self.send_reply(response) + return False, response + + # 生成随机数 + results = [random.randint(1, 6) for _ in range(count)] + + # 构建回复消息 + if count == 1: + message = f"🎲 掷出了 {results[0]} 点" + else: + dice_results = ", ".join(map(str, results)) + total = sum(results) + message = f"🎲 掷出了 {count} 个骰子: [{dice_results}],总点数: {total}" + + await self.send_reply(message) + logger.info(f"{self.log_prefix} 执行骰子命令: {message}") + return True, message + + except Exception as e: + error_msg = f"❌ 执行命令时出错: {str(e)}" + await self.send_reply(error_msg) + logger.error(f"{self.log_prefix} 执行骰子命令时出错: {e}") + return False, error_msg + + +class EchoCommand(BaseCommand): + """回声Command - 重复用户输入的消息""" + + command_pattern = r"^/echo\s+(?P.+)$" + command_help = "重复你的消息内容,用法:/echo <消息内容>" + command_examples = ["/echo Hello World", "/echo 你好世界", "/echo 测试回声"] + intercept_message = True # 拦截消息处理 + + async def execute(self) -> Tuple[bool, Optional[str]]: + """执行回声命令""" + try: + message = self.matched_groups.get("message", "") + + if not message: + response = "❌ 请提供要重复的消息!用法:/echo <消息内容>" + await self.send_reply(response) + return False, response + + # 检查消息长度限制 + max_length = self.api.get_config("echo.max_length", 200) + if len(message) > max_length: + response = f"❌ 消息过长,最大长度: {max_length} 字符" + await self.send_reply(response) + return False, response + + # 格式化回声消息 + enable_formatting = self.api.get_config("echo.enable_formatting", True) + if enable_formatting: + response = f"🔊 回声: {message}" + else: + response = message + + await self.send_reply(response) + logger.info(f"{self.log_prefix} 回声消息: {message}") + return True, response + + except Exception as e: + logger.error(f"{self.log_prefix} 回声命令失败: {e}") + error_msg = f"❌ 回声失败: {str(e)}" + await self.send_reply(error_msg) + return False, str(e) + + +class MessageInfoCommand(BaseCommand): + """消息信息Command - 显示当前消息的详细信息""" + + command_pattern = r"^/info$" + command_help = "显示当前消息的详细信息" + command_examples = ["/info"] + intercept_message = True # 拦截消息处理 + + async def execute(self) -> Tuple[bool, Optional[str]]: + """执行消息信息查询""" + try: + message = self.message + + # 收集消息信息 + user_info = message.message_info.user_info + group_info = message.message_info.group_info + + info_parts = [ + "📋 消息信息详情", + "", + f"👤 用户信息:", + f" • ID: {user_info.user_id}", + f" • 昵称: {user_info.user_nickname}", + f" • 群名片: {getattr(user_info, 'user_cardname', '无')}", + f" • 平台: {message.message_info.platform}", + "", + f"💬 消息信息:", + f" • 消息ID: {message.message_info.message_id}", + f" • 时间戳: {message.message_info.time}", + f" • 原始内容: {message.processed_plain_text[:100]}{'...' if len(message.processed_plain_text) > 100 else ''}", + f" • 是否表情: {'是' if getattr(message, 'is_emoji', False) else '否'}", + ] + + # 群聊信息 + if group_info: + info_parts.extend([ + "", + f"👥 群聊信息:", + f" • 群ID: {group_info.group_id}", + f" • 群名: {getattr(group_info, 'group_name', '未知')}", + f" • 聊天类型: 群聊" + ]) + else: + info_parts.extend([ + "", + f"💭 聊天类型: 私聊" + ]) + + # 流信息 + if hasattr(message, 'chat_stream') and message.chat_stream: + stream = message.chat_stream + info_parts.extend([ + "", + f"🌊 聊天流信息:", + f" • 流ID: {stream.stream_id}", + f" • 创建时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(stream.create_time))}", + f" • 最后活跃: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(stream.last_active_time))}" + ]) + + response = "\n".join(info_parts) + await self.send_reply(response) + logger.info(f"{self.log_prefix} 显示消息信息: {user_info.user_id}") + return True, response + + except Exception as e: + logger.error(f"{self.log_prefix} 消息信息查询失败: {e}") + error_msg = f"❌ 信息查询失败: {str(e)}" + await self.send_reply(error_msg) + return False, str(e) + + +@register_plugin +class ExampleComprehensivePlugin(BasePlugin): + """综合示例插件 + + 整合了旧示例插件的所有功能,展示新插件系统的完整能力: + - 多种Action和Command组件 + - 拦截控制功能演示 + - 配置驱动的行为 + - 完整的错误处理 + - 日志记录和监控 + """ + + # 插件基本信息 + plugin_name = "example_comprehensive" + plugin_description = "综合示例插件,展示新插件系统的完整功能" + plugin_version = "2.0.0" + plugin_author = "MaiBot开发团队" + enable_plugin = True + config_file_name = "config.toml" + + def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]: + """返回插件包含的组件列表""" + + # 从配置获取组件启用状态 + enable_greeting = self.get_config("components.enable_greeting", True) + enable_help = self.get_config("components.enable_help", True) + enable_send = self.get_config("components.enable_send", True) + enable_echo = self.get_config("components.enable_echo", True) + enable_info = self.get_config("components.enable_info", True) + enable_dice = self.get_config("components.enable_dice", True) + components = [] + + # 添加Action组件 + if enable_greeting: + components.append(( + SmartGreetingAction.get_action_info( + name="smart_greeting", + description="智能问候系统,基于关键词触发" + ), + SmartGreetingAction + )) + + # 添加Command组件 + if enable_help: + components.append(( + ComprehensiveHelpCommand.get_command_info( + name="comprehensive_help", + description="综合帮助系统,显示所有命令信息" + ), + ComprehensiveHelpCommand + )) + + if enable_send: + components.append(( + MessageSendCommand.get_command_info( + name="message_send", + description="消息发送命令,支持群聊和私聊" + ), + MessageSendCommand + )) + + if enable_echo: + components.append(( + EchoCommand.get_command_info( + name="echo", + description="回声命令,重复用户输入" + ), + EchoCommand + )) + + if enable_info: + components.append(( + MessageInfoCommand.get_command_info( + name="message_info", + description="消息信息查询,显示详细信息" + ), + MessageInfoCommand + )) + + if enable_dice: + components.append(( + DiceCommand.get_command_info( + name="dice", + description="骰子命令,掷骰子" + ), + DiceCommand + )) + + return components \ No newline at end of file diff --git a/src/plugins/examples/simple_plugin/config.toml b/src/plugins/examples/simple_plugin/config.toml deleted file mode 100644 index 7529453f7..000000000 --- a/src/plugins/examples/simple_plugin/config.toml +++ /dev/null @@ -1,30 +0,0 @@ -# 完整示例插件配置文件 - -[plugin] -name = "simple_plugin" -version = "1.1.0" -enabled = true -description = "展示新插件系统完整功能的示例插件" - -[hello_action] -greeting_message = "你好,{username}!欢迎使用MaiBot新插件系统!" -enable_emoji = true -enable_llm_greeting = false # 是否使用LLM生成个性化问候 -default_username = "朋友" - -[status_command] -show_detailed_info = true -allowed_types = ["系统", "插件", "数据库", "内存", "网络"] -default_type = "系统" - -[echo_command] -max_message_length = 500 -enable_formatting = true - -[help_command] -show_extended_help = true -include_config_info = true - -[logging] -level = "INFO" -prefix = "[SimplePlugin]" \ No newline at end of file diff --git a/src/plugins/examples/simple_plugin/plugin.py b/src/plugins/examples/simple_plugin/plugin.py deleted file mode 100644 index 71b2fb7c8..000000000 --- a/src/plugins/examples/simple_plugin/plugin.py +++ /dev/null @@ -1,195 +0,0 @@ -""" -完整示例插件 - -演示新插件系统的完整功能: -- 使用简化的导入接口 -- 展示Action和Command组件的定义 -- 展示插件配置的使用 -- 提供实用的示例功能 -- 演示API的多种使用方式 -""" - -from typing import List, Tuple, Type, Optional - -# 使用简化的导入接口 -from src.plugin_system import ( - BasePlugin, - register_plugin, - BaseAction, - BaseCommand, - ComponentInfo, - ActionActivationType, - ChatMode, -) -from src.common.logger_manager import get_logger - -logger = get_logger("simple_plugin") - - -class HelloAction(BaseAction): - """智能问候Action组件""" - - # ✅ 现在可以直接在类中定义激活条件! - focus_activation_type = ActionActivationType.KEYWORD - normal_activation_type = ActionActivationType.KEYWORD - activation_keywords = ["你好", "hello", "问候", "hi", "嗨"] - keyword_case_sensitive = False - mode_enable = ChatMode.ALL - parallel_action = False - - async def execute(self) -> Tuple[bool, str]: - """执行问候动作""" - username = self.action_data.get("username", "朋友") - - # 使用默认配置值(避免创建新插件实例) - greeting_template = "你好,{username}!" - enable_emoji = True - enable_llm = False - - # 如果启用LLM生成个性化问候 - if enable_llm: - try: - # 演示:使用LLM API生成个性化问候 - models = self.api.get_available_models() - if models: - first_model = list(models.values())[0] - prompt = f"为用户名叫{username}的朋友生成一句温暖的个性化问候语,不超过30字:" - - success, response, _, _ = await self.api.generate_with_model( - prompt=prompt, model_config=first_model - ) - - if success: - logger.info(f"{self.log_prefix} 使用LLM生成问候: {response}") - return True, response - except Exception as e: - logger.warning(f"{self.log_prefix} LLM生成问候失败,使用默认模板: {e}") - - # 构建基础问候消息 - response = greeting_template.format(username=username) - if enable_emoji: - response += " 😊" - - # 演示:存储Action执行记录到数据库 - await self.api.store_action_info( - action_build_into_prompt=False, action_prompt_display=f"问候了用户: {username}", action_done=True - ) - - logger.info(f"{self.log_prefix} 执行问候动作: {username}") - return True, response - - -class EchoCommand(BaseCommand): - """回声命令 - 重复用户输入""" - - # ✅ 现在可以直接在类中定义命令模式! - command_pattern = r"^/echo\s+(?P.+)$" - command_help = "重复消息,用法:/echo <消息内容>" - command_examples = ["/echo Hello World", "/echo 你好世界"] - - async def execute(self) -> Tuple[bool, Optional[str]]: - """执行回声命令""" - # 获取匹配的参数 - message = self.matched_groups.get("message", "") - - if not message: - response = "请提供要重复的消息!用法:/echo <消息内容>" - else: - response = f"🔊 {message}" - - # 发送回复 - await self.send_reply(response) - - logger.info(f"{self.log_prefix} 执行回声命令: {message}") - return True, response - - -class StatusCommand(BaseCommand): - """状态查询Command组件""" - - # ✅ 直接定义命令模式 - command_pattern = r"^/status\s*(?P\w+)?$" - command_help = "查询系统状态,用法:/status [类型]" - command_examples = ["/status", "/status 系统", "/status 插件"] - - async def execute(self) -> Tuple[bool, Optional[str]]: - """执行状态查询命令""" - # 获取匹配的参数 - query_type = self.matched_groups.get("type", "系统") - - # 使用默认配置值(避免创建新插件实例) - show_detailed = True - allowed_types = ["系统", "插件"] - - if query_type not in allowed_types: - response = f"不支持的查询类型: {query_type}\n支持的类型: {', '.join(allowed_types)}" - elif show_detailed: - response = f"📊 {query_type}状态详情:\n✅ 运行正常\n🔧 版本: 1.0.0\n⚡ 性能: 良好" - else: - response = f"✅ {query_type}状态:正常" - - # 发送回复 - await self.send_reply(response) - - logger.info(f"{self.log_prefix} 执行状态查询: {query_type}") - return True, response - - -class HelpCommand(BaseCommand): - """帮助命令 - 显示插件功能""" - - # ✅ 直接定义命令模式 - command_pattern = r"^/help$" - command_help = "显示插件帮助信息" - command_examples = ["/help"] - - async def execute(self) -> Tuple[bool, Optional[str]]: - """执行帮助命令""" - help_text = """ -🤖 简单示例插件帮助 - -📝 可用命令: -• /echo <消息> - 重复你的消息 -• /status [类型] - 查询系统状态 -• /help - 显示此帮助信息 - -🎯 智能功能: -• 自动问候 - 当消息包含"你好"、"hello"等关键词时触发 - -⚙️ 配置: -本插件支持通过config.toml文件进行个性化配置 - -💡 这是新插件系统的完整示例,展示了Action和Command的结合使用。 - """.strip() - - await self.send_reply(help_text) - - logger.info(f"{self.log_prefix} 显示帮助信息") - return True, "已显示帮助信息" - - -@register_plugin -class SimplePlugin(BasePlugin): - """完整示例插件 - - 包含多个Action和Command组件,展示插件系统的完整功能 - """ - - # 插件基本信息 - plugin_name = "simple_plugin" - plugin_description = "完整的示例插件,展示新插件系统的各种功能" - plugin_version = "1.1.0" - plugin_author = "MaiBot开发团队" - enable_plugin = True - config_file_name = "config.toml" # 配置文件 - - def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]: - """返回插件包含的组件列表""" - - # ✅ 现在可以直接从类属性生成组件信息! - return [ - (HelloAction.get_action_info("hello_action", "智能问候动作,支持自定义消息和表情"), HelloAction), - (EchoCommand.get_command_info("echo_command", "回声命令,重复用户输入的消息"), EchoCommand), - (StatusCommand.get_command_info("status_command", "状态查询命令,支持多种查询类型"), StatusCommand), - (HelpCommand.get_command_info("help_command", "帮助命令,显示插件功能说明"), HelpCommand), - ] diff --git a/src/plugins/mute_plugin/__init__.py b/src/plugins/mute_plugin/__init__.py deleted file mode 100644 index 02aaf3b87..000000000 --- a/src/plugins/mute_plugin/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -"""禁言插件包 - -这是一个群聊管理插件,提供智能禁言功能。 - -功能特性: -- 智能LLM判定:根据聊天内容智能判断是否需要禁言 -- 灵活的时长管理:支持自定义禁言时长限制 -- 模板化消息:支持自定义禁言提示消息 -- 参数验证:完整的输入参数验证和错误处理 -- 配置文件支持:所有设置可通过配置文件调整 - -使用场景: -- 用户发送违规内容时自动判定禁言 -- 用户主动要求被禁言时执行操作 -- 管理员通过聊天指令触发禁言动作 - -配置文件:src/plugins/mute_plugin/actions/mute_action_config.toml -""" - -""" -这是一个测试插件 -""" diff --git a/src/plugins/mute_plugin/actions/__init__.py b/src/plugins/mute_plugin/actions/__init__.py deleted file mode 100644 index e44fd983c..000000000 --- a/src/plugins/mute_plugin/actions/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -"""测试插件动作模块""" - -from . import mute_action # noqa diff --git a/src/plugins/mute_plugin/actions/mute_action.py b/src/plugins/mute_plugin/actions/mute_action.py deleted file mode 100644 index a50e18ed0..000000000 --- a/src/plugins/mute_plugin/actions/mute_action.py +++ /dev/null @@ -1,263 +0,0 @@ -from src.common.logger_manager import get_logger -from src.chat.actions.plugin_action import PluginAction, register_action, ActionActivationType -from src.chat.actions.base_action import ChatMode -from typing import Tuple - -logger = get_logger("mute_action") - - -@register_action -class MuteAction(PluginAction): - """群聊禁言动作处理类""" - - action_name = "mute_action" - action_description = "在特定情境下,对某人采取禁言,让他不能说话" - action_parameters = { - "target": "禁言对象,必填,输入你要禁言的对象的名字", - "duration": "禁言时长,必填,输入你要禁言的时长(秒),单位为秒,必须为数字", - "reason": "禁言理由,可选", - } - action_require = [ - "当有人违反了公序良俗的内容", - "当有人刷屏时使用", - "当有人发了擦边,或者色情内容时使用", - "当有人要求禁言自己时使用", - "如果某人已经被禁言了,就不要再次禁言了,除非你想追加时间!!", - ] - enable_plugin = False # 启用插件 - associated_types = ["command", "text"] - action_config_file_name = "mute_action_config.toml" - - # 激活类型设置 - focus_activation_type = ActionActivationType.LLM_JUDGE # Focus模式使用LLM判定,确保谨慎 - normal_activation_type = ActionActivationType.KEYWORD # Normal模式使用关键词激活,快速响应 - - # 关键词设置(用于Normal模式) - activation_keywords = ["禁言", "mute", "ban", "silence"] - keyword_case_sensitive = False - - # LLM判定提示词(用于Focus模式) - llm_judge_prompt = """ -判定是否需要使用禁言动作的严格条件: - -必须使用禁言的情况: -1. 用户发送明显违规内容(色情、暴力、政治敏感等) -2. 恶意刷屏或垃圾信息轰炸 -3. 用户主动明确要求被禁言("禁言我"等) -4. 严重违反群规的行为 -5. 恶意攻击他人或群组管理 - -绝对不要使用的情况: -1. 正常聊天和讨论,即使话题敏感 -2. 情绪化表达但无恶意 -3. 开玩笑或调侃,除非过分 -4. 单纯的意见分歧或争论 -5. 轻微的不当言论(应优先提醒) -6. 用户只是提到"禁言"词汇但非要求 - -注意:禁言是严厉措施,只在明确违规或用户主动要求时使用。 -宁可保守也不要误判,保护用户的发言权利。 -""" - - # Random激活概率(备用) - random_activation_probability = 0.05 # 设置很低的概率作为兜底 - - # 模式启用设置 - 禁言功能在所有模式下都可用 - mode_enable = ChatMode.ALL - - # 并行执行设置 - 禁言动作可以与回复并行执行,不覆盖回复内容 - parallel_action = False - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - # 生成配置文件(如果不存在) - self._generate_config_if_needed() - - def _generate_config_if_needed(self): - """生成配置文件(如果不存在)""" - import os - - # 获取动作文件所在目录 - current_dir = os.path.dirname(os.path.abspath(__file__)) - config_path = os.path.join(current_dir, "mute_action_config.toml") - - if not os.path.exists(config_path): - config_content = """\ -# 禁言动作配置文件 - -# 默认禁言时长限制(秒) -min_duration = 60 # 最短禁言时长 -max_duration = 2592000 # 最长禁言时长(30天) -default_duration = 300 # 默认禁言时长(5分钟) - -# 禁言消息模板 -templates = [ - "好的,禁言 {target} {duration},理由:{reason}", - "收到,对 {target} 执行禁言 {duration},因为{reason}", - "明白了,禁言 {target} {duration},原因是{reason}" -] - -# 错误消息模板 -error_messages = [ - "没有指定禁言对象呢~", - "没有指定禁言时长呢~", - "禁言时长必须是正数哦~", - "禁言时长必须是数字哦~", - "找不到 {target} 这个人呢~", - "查找用户信息时出现问题~" -] - -# 是否启用时长美化显示 -enable_duration_formatting = true - -# 是否记录禁言历史 -log_mute_history = true -""" - try: - with open(config_path, "w", encoding="utf-8") as f: - f.write(config_content) - logger.info(f"已生成禁言动作配置文件: {config_path}") - except Exception as e: - logger.error(f"生成配置文件失败: {e}") - - def _get_duration_limits(self) -> tuple[int, int, int]: - """获取时长限制配置""" - min_dur = self.config.get("min_duration", 60) - max_dur = self.config.get("max_duration", 2592000) - default_dur = self.config.get("default_duration", 300) - return min_dur, max_dur, default_dur - - def _get_template_message(self, target: str, duration_str: str, reason: str) -> str: - """获取模板化的禁言消息""" - templates = self.config.get("templates", ["好的,禁言 {target} {duration},理由:{reason}"]) - - import random - - template = random.choice(templates) - return template.format(target=target, duration=duration_str, reason=reason) - - async def process(self) -> Tuple[bool, str]: - """处理群聊禁言动作""" - logger.info(f"{self.log_prefix} 执行禁言动作: {self.reasoning}") - - # 获取参数 - target = self.action_data.get("target") - duration = self.action_data.get("duration") - reason = self.action_data.get("reason", "违反群规") - - # 参数验证 - if not target: - error_msg = "禁言目标不能为空" - logger.error(f"{self.log_prefix} {error_msg}") - await self.send_message_by_expressor("没有指定禁言对象呢~") - return False, error_msg - - if not duration: - error_msg = "禁言时长不能为空" - logger.error(f"{self.log_prefix} {error_msg}") - await self.send_message_by_expressor("没有指定禁言时长呢~") - return False, error_msg - - # 获取时长限制配置 - min_duration, max_duration, default_duration = self._get_duration_limits() - - # 验证时长格式并转换 - try: - duration_int = int(duration) - if duration_int <= 0: - error_msg = "禁言时长必须大于0" - logger.error(f"{self.log_prefix} {error_msg}") - error_templates = self.config.get("error_messages", ["禁言时长必须是正数哦~"]) - await self.send_message_by_expressor( - error_templates[2] if len(error_templates) > 2 else "禁言时长必须是正数哦~" - ) - return False, error_msg - - # 限制禁言时长范围 - if duration_int < min_duration: - duration_int = min_duration - logger.info(f"{self.log_prefix} 禁言时长过短,调整为{min_duration}秒") - elif duration_int > max_duration: - duration_int = max_duration - logger.info(f"{self.log_prefix} 禁言时长过长,调整为{max_duration}秒") - - except (ValueError, TypeError): - error_msg = f"禁言时长格式无效: {duration}" - logger.error(f"{self.log_prefix} {error_msg}") - error_templates = self.config.get("error_messages", ["禁言时长必须是数字哦~"]) - await self.send_message_by_expressor( - error_templates[3] if len(error_templates) > 3 else "禁言时长必须是数字哦~" - ) - return False, error_msg - - # 获取用户ID - try: - platform, user_id = await self.get_user_id_by_person_name(target) - except Exception as e: - error_msg = f"查找用户ID时出错: {e}" - logger.error(f"{self.log_prefix} {error_msg}") - await self.send_message_by_expressor("查找用户信息时出现问题~") - return False, error_msg - - if not user_id: - error_msg = f"未找到用户 {target} 的ID" - await self.send_message_by_expressor(f"找不到 {target} 这个人呢~") - logger.error(f"{self.log_prefix} {error_msg}") - return False, error_msg - - # 发送表达情绪的消息 - enable_formatting = self.config.get("enable_duration_formatting", True) - time_str = self._format_duration(duration_int) if enable_formatting else f"{duration_int}秒" - - # 使用模板化消息 - message = self._get_template_message(target, time_str, reason) - await self.send_message_by_expressor(message) - - try: - duration_str = str(duration_int) - - # 发送群聊禁言命令,按照新格式 - await self.send_message( - type="command", - data={"name": "GROUP_BAN", "args": {"qq_id": str(user_id), "duration": duration_str}}, - display_message=f"尝试禁言了 {target} {time_str}", - ) - - await self.store_action_info( - action_build_into_prompt=False, - action_prompt_display=f"你尝试禁言了 {target} {time_str},理由:{reason}", - ) - - logger.info(f"{self.log_prefix} 成功发送禁言命令,用户 {target}({user_id}),时长 {duration_int} 秒") - return True, f"成功禁言 {target},时长 {time_str}" - - except Exception as e: - logger.error(f"{self.log_prefix} 执行禁言动作时出错: {e}") - await self.send_message_by_expressor(f"执行禁言动作时出错: {e}") - return False, f"执行禁言动作时出错: {e}" - - def _format_duration(self, seconds: int) -> str: - """将秒数格式化为可读的时间字符串""" - if seconds < 60: - return f"{seconds}秒" - elif seconds < 3600: - minutes = seconds // 60 - remaining_seconds = seconds % 60 - if remaining_seconds > 0: - return f"{minutes}分{remaining_seconds}秒" - else: - return f"{minutes}分钟" - elif seconds < 86400: - hours = seconds // 3600 - remaining_minutes = (seconds % 3600) // 60 - if remaining_minutes > 0: - return f"{hours}小时{remaining_minutes}分钟" - else: - return f"{hours}小时" - else: - days = seconds // 86400 - remaining_hours = (seconds % 86400) // 3600 - if remaining_hours > 0: - return f"{days}天{remaining_hours}小时" - else: - return f"{days}天" diff --git a/src/plugins/mute_plugin/actions/mute_action_config.toml b/src/plugins/mute_plugin/actions/mute_action_config.toml deleted file mode 100644 index 0dceae50c..000000000 --- a/src/plugins/mute_plugin/actions/mute_action_config.toml +++ /dev/null @@ -1,29 +0,0 @@ -# 禁言动作配置文件 - -# 默认禁言时长限制(秒) -min_duration = 60 # 最短禁言时长 -max_duration = 2592000 # 最长禁言时长(30天) -default_duration = 300 # 默认禁言时长(5分钟) - -# 禁言消息模板 -templates = [ - "好的,禁言 {target} {duration},理由:{reason}", - "收到,对 {target} 执行禁言 {duration},因为{reason}", - "明白了,禁言 {target} {duration},原因是{reason}" -] - -# 错误消息模板 -error_messages = [ - "没有指定禁言对象呢~", - "没有指定禁言时长呢~", - "禁言时长必须是正数哦~", - "禁言时长必须是数字哦~", - "找不到 {target} 这个人呢~", - "查找用户信息时出现问题~" -] - -# 是否启用时长美化显示 -enable_duration_formatting = true - -# 是否记录禁言历史 -log_mute_history = true