diff --git a/src/chat/focus_chat/heartFC_chat.py b/src/chat/focus_chat/heartFC_chat.py index 4509826b3..d212579df 100644 --- a/src/chat/focus_chat/heartFC_chat.py +++ b/src/chat/focus_chat/heartFC_chat.py @@ -165,7 +165,7 @@ class HeartFChatting: if not global_config.focus_chat_processor.working_memory_processor: logger.debug(f"{self.log_prefix} 工作记忆处理器已禁用,跳过注册观察器 {name}") continue - + # 根据参数名使用正确的参数 kwargs = {param_name: self.stream_id} observation = observation_class(**kwargs) diff --git a/src/plugin_system/apis/config_api.py b/src/plugin_system/apis/config_api.py index a3dc64eb2..06f64a9da 100644 --- a/src/plugin_system/apis/config_api.py +++ b/src/plugin_system/apis/config_api.py @@ -29,29 +29,29 @@ class ConfigAPI: def get_config(self, key: str, default: Any = None) -> Any: """ 从插件配置中获取值,支持嵌套键访问 - + Args: key: 配置键名,支持嵌套访问如 "section.subsection.key" default: 如果配置不存在时返回的默认值 - + Returns: Any: 配置值或默认值 """ # 获取插件配置 - plugin_config = getattr(self, '_plugin_config', {}) + plugin_config = getattr(self, "_plugin_config", {}) if not plugin_config: return default - + # 支持嵌套键访问 - keys = key.split('.') + keys = key.split(".") current = plugin_config - + for k in keys: if isinstance(current, dict) and k in current: current = current[k] else: return default - + return current async def get_user_id_by_person_name(self, person_name: str) -> tuple[str, str]: diff --git a/src/plugin_system/apis/message_api.py b/src/plugin_system/apis/message_api.py index c3984618c..6b6dbdbe3 100644 --- a/src/plugin_system/apis/message_api.py +++ b/src/plugin_system/apis/message_api.py @@ -157,8 +157,6 @@ class MessageAPI: message_type="text", content=text, platform=platform, target_id=user_id, is_group=False ) - - def get_chat_type(self) -> str: """获取当前聊天类型 diff --git a/src/plugin_system/apis/plugin_api.py b/src/plugin_system/apis/plugin_api.py index 44d990c78..d38999065 100644 --- a/src/plugin_system/apis/plugin_api.py +++ b/src/plugin_system/apis/plugin_api.py @@ -33,7 +33,13 @@ class PluginAPI(MessageAPI, LLMAPI, DatabaseAPI, ConfigAPI, UtilsAPI, StreamAPI, """ def __init__( - self, chat_stream=None, expressor=None, replyer=None, observations=None, log_prefix: str = "[PluginAPI]", plugin_config: dict = None + self, + chat_stream=None, + expressor=None, + replyer=None, + observations=None, + log_prefix: str = "[PluginAPI]", + plugin_config: dict = None, ): """ 初始化插件API @@ -109,7 +115,12 @@ class PluginAPI(MessageAPI, LLMAPI, DatabaseAPI, ConfigAPI, UtilsAPI, StreamAPI, # 便捷的工厂函数 def create_plugin_api( - chat_stream=None, expressor=None, replyer=None, observations=None, log_prefix: str = "[Plugin]", plugin_config: dict = None + chat_stream=None, + expressor=None, + replyer=None, + observations=None, + log_prefix: str = "[Plugin]", + plugin_config: dict = None, ) -> PluginAPI: """ 创建插件API实例的便捷函数 @@ -126,7 +137,12 @@ def create_plugin_api( PluginAPI: 配置好的插件API实例 """ return PluginAPI( - chat_stream=chat_stream, expressor=expressor, replyer=replyer, observations=observations, log_prefix=log_prefix, plugin_config=plugin_config + chat_stream=chat_stream, + expressor=expressor, + replyer=replyer, + observations=observations, + log_prefix=log_prefix, + plugin_config=plugin_config, ) diff --git a/src/plugin_system/base/base_action.py b/src/plugin_system/base/base_action.py index 120ab7873..b9899f9a4 100644 --- a/src/plugin_system/base/base_action.py +++ b/src/plugin_system/base/base_action.py @@ -121,53 +121,45 @@ class BaseAction(ABC): Returns: bool: 是否发送成功 """ - chat_stream = self.api.get_service('chat_stream') + chat_stream = self.api.get_service("chat_stream") if not chat_stream: logger.error(f"{self.log_prefix} 没有可用的聊天流发送回复") return False - + if chat_stream.group_info: # 群聊 return await self.api.send_text_to_group( - text=content, - group_id=str(chat_stream.group_info.group_id), - platform=chat_stream.platform + text=content, group_id=str(chat_stream.group_info.group_id), platform=chat_stream.platform ) else: # 私聊 return await self.api.send_text_to_user( - text=content, - user_id=str(chat_stream.user_info.user_id), - platform=chat_stream.platform + text=content, user_id=str(chat_stream.user_info.user_id), platform=chat_stream.platform ) async def send_command(self, command_name: str, args: dict = None, display_message: str = None) -> bool: """发送命令消息 - + 使用和send_reply相同的方式通过MessageAPI发送命令 - + Args: command_name: 命令名称 args: 命令参数 display_message: 显示消息 - + Returns: bool: 是否发送成功 """ try: # 构造命令数据 - command_data = { - "name": command_name, - "args": args or {} - } - + command_data = {"name": command_name, "args": args or {}} + # 使用send_message_to_target方法发送命令 - chat_stream = self.api.get_service('chat_stream') + chat_stream = self.api.get_service("chat_stream") if not chat_stream: logger.error(f"{self.log_prefix} 没有可用的聊天流发送命令") return False - - + if chat_stream.group_info: # 群聊 success = await self.api.send_message_to_target( @@ -176,7 +168,7 @@ class BaseAction(ABC): platform=chat_stream.platform, target_id=str(chat_stream.group_info.group_id), is_group=True, - display_message=display_message or f"执行命令: {command_name}" + display_message=display_message or f"执行命令: {command_name}", ) else: # 私聊 @@ -186,16 +178,16 @@ class BaseAction(ABC): platform=chat_stream.platform, target_id=str(chat_stream.user_info.user_id), is_group=False, - display_message=display_message or f"执行命令: {command_name}" + display_message=display_message or f"执行命令: {command_name}", ) - + if success: logger.info(f"{self.log_prefix} 成功发送命令: {command_name}") else: logger.error(f"{self.log_prefix} 发送命令失败: {command_name}") - + return success - + except Exception as e: logger.error(f"{self.log_prefix} 发送命令时出错: {e}") return False @@ -213,7 +205,7 @@ class BaseAction(ABC): try: from src.chat.heart_flow.observation.chatting_observation import ChattingObservation from src.chat.focus_chat.hfc_utils import create_empty_anchor_message - + # 获取服务 expressor = self.api.get_service("expressor") chat_stream = self.api.get_service("chat_stream") @@ -281,7 +273,7 @@ class BaseAction(ABC): try: from src.chat.heart_flow.observation.chatting_observation import ChattingObservation from src.chat.focus_chat.hfc_utils import create_empty_anchor_message - + # 获取服务 replyer = self.api.get_service("replyer") chat_stream = self.api.get_service("chat_stream") diff --git a/src/plugin_system/base/base_command.py b/src/plugin_system/base/base_command.py index 42dd5328a..9596c5d38 100644 --- a/src/plugin_system/base/base_command.py +++ b/src/plugin_system/base/base_command.py @@ -82,28 +82,25 @@ class BaseCommand(ABC): async def send_command(self, command_name: str, args: dict = None, display_message: str = None) -> bool: """发送命令消息 - + 使用和send_reply相同的方式通过MessageAPI发送命令 - + Args: command_name: 命令名称 args: 命令参数 display_message: 显示消息 - + Returns: bool: 是否发送成功 """ try: # 构造命令数据 - command_data = { - "name": command_name, - "args": args or {} - } - + command_data = {"name": command_name, "args": args or {}} + # 使用send_message_to_target方法发送命令 chat_stream = self.message.chat_stream command_content = command_data - + if chat_stream.group_info: # 群聊 success = await self.api.send_message_to_target( @@ -112,7 +109,7 @@ class BaseCommand(ABC): platform=chat_stream.platform, target_id=str(chat_stream.group_info.group_id), is_group=True, - display_message=display_message or f"执行命令: {command_name}" + display_message=display_message or f"执行命令: {command_name}", ) else: # 私聊 @@ -122,16 +119,16 @@ class BaseCommand(ABC): platform=chat_stream.platform, target_id=str(chat_stream.user_info.user_id), is_group=False, - display_message=display_message or f"执行命令: {command_name}" + display_message=display_message or f"执行命令: {command_name}", ) - + if success: logger.info(f"{self.log_prefix} 成功发送命令: {command_name}") else: logger.error(f"{self.log_prefix} 发送命令失败: {command_name}") - + return success - + except Exception as e: logger.error(f"{self.log_prefix} 发送命令时出错: {e}") return False diff --git a/src/plugin_system/base/base_plugin.py b/src/plugin_system/base/base_plugin.py index 99c4335bb..9b000dbd0 100644 --- a/src/plugin_system/base/base_plugin.py +++ b/src/plugin_system/base/base_plugin.py @@ -177,15 +177,15 @@ class BasePlugin(ABC): Any: 配置值或默认值 """ # 支持嵌套键访问 - keys = key.split('.') + keys = key.split(".") current = self.config - + for k in keys: if isinstance(current, dict) and k in current: current = current[k] else: return default - + return current diff --git a/src/plugin_system/core/component_registry.py b/src/plugin_system/core/component_registry.py index 84eb54f13..d3283f5f2 100644 --- a/src/plugin_system/core/component_registry.py +++ b/src/plugin_system/core/component_registry.py @@ -164,7 +164,12 @@ class ComponentRegistry: if command_name: command_info = self.get_command_info(command_name) if command_info and command_info.enabled: - return command_class, match.groupdict(), command_info.intercept_message, command_info.plugin_name + return ( + command_class, + match.groupdict(), + command_info.intercept_message, + command_info.plugin_name, + ) return None # === 插件管理方法 === @@ -207,15 +212,16 @@ class ComponentRegistry: def get_plugin_config(self, plugin_name: str) -> Optional[dict]: """获取插件配置 - + Args: plugin_name: 插件名称 - + Returns: Optional[dict]: 插件配置字典或None """ # 从插件管理器获取插件实例的配置 from src.plugin_system.core.plugin_manager import plugin_manager + plugin_instance = plugin_manager.get_plugin_instance(plugin_name) return plugin_instance.config if plugin_instance else None diff --git a/src/plugin_system/core/plugin_manager.py b/src/plugin_system/core/plugin_manager.py index 34d86131e..ea2d0c51d 100644 --- a/src/plugin_system/core/plugin_manager.py +++ b/src/plugin_system/core/plugin_manager.py @@ -291,10 +291,10 @@ class PluginManager: def get_plugin_instance(self, plugin_name: str) -> Optional["BasePlugin"]: """获取插件实例 - + Args: plugin_name: 插件名称 - + Returns: Optional[BasePlugin]: 插件实例或None """ diff --git a/src/plugins/built_in/core_actions/plugin.py b/src/plugins/built_in/core_actions/plugin.py index c70b4b0b1..c7b9f483f 100644 --- a/src/plugins/built_in/core_actions/plugin.py +++ b/src/plugins/built_in/core_actions/plugin.py @@ -342,62 +342,48 @@ class CoreActionsPlugin(BasePlugin): return [ # 回复动作 - (ReplyAction.get_action_info( - name="reply", - description="参与聊天回复,处理文本和表情的发送" - ), ReplyAction), - + (ReplyAction.get_action_info(name="reply", description="参与聊天回复,处理文本和表情的发送"), ReplyAction), # 不回复动作 - (NoReplyAction.get_action_info( - name="no_reply", - description="暂时不回复消息,等待新消息或超时" - ), NoReplyAction), - + ( + NoReplyAction.get_action_info(name="no_reply", description="暂时不回复消息,等待新消息或超时"), + NoReplyAction, + ), # 表情动作 - (EmojiAction.get_action_info( - name="emoji", - description="发送表情包辅助表达情绪" - ), EmojiAction), - + (EmojiAction.get_action_info(name="emoji", description="发送表情包辅助表达情绪"), EmojiAction), # 退出专注聊天动作 - (ExitFocusChatAction.get_action_info( - name="exit_focus_chat", - description="退出专注聊天,从专注模式切换到普通模式" - ), ExitFocusChatAction), - + ( + ExitFocusChatAction.get_action_info( + name="exit_focus_chat", description="退出专注聊天,从专注模式切换到普通模式" + ), + ExitFocusChatAction, + ), # 示例Command - Ping命令 - (PingCommand.get_command_info( - name="ping", - description="测试机器人响应,拦截后续处理" - ), PingCommand), - + (PingCommand.get_command_info(name="ping", description="测试机器人响应,拦截后续处理"), PingCommand), # 示例Command - Log命令 - (LogCommand.get_command_info( - name="log", - description="记录消息到日志,不拦截后续处理" - ), LogCommand) + (LogCommand.get_command_info(name="log", description="记录消息到日志,不拦截后续处理"), LogCommand), ] # ===== 示例Command组件 ===== + class PingCommand(BaseCommand): """Ping命令 - 测试响应,拦截消息处理""" - + command_pattern = r"^/ping(\s+(?P.+))?$" command_help = "测试机器人响应 - 拦截后续处理" command_examples = ["/ping", "/ping 测试消息"] intercept_message = True # 拦截消息,不继续处理 - + async def execute(self) -> Tuple[bool, Optional[str]]: """执行ping命令""" try: message = self.matched_groups.get("message", "") reply_text = f"🏓 Pong! {message}" if message else "🏓 Pong!" - + await self.send_reply(reply_text) return True, f"发送ping响应: {reply_text}" - + except Exception as e: logger.error(f"Ping命令执行失败: {e}") return False, f"执行失败: {str(e)}" @@ -405,34 +391,34 @@ class PingCommand(BaseCommand): class LogCommand(BaseCommand): """日志命令 - 记录消息但不拦截后续处理""" - + command_pattern = r"^/log(\s+(?Pdebug|info|warn|error))?$" command_help = "记录当前消息到日志 - 不拦截后续处理" command_examples = ["/log", "/log info", "/log debug"] intercept_message = False # 不拦截消息,继续后续处理 - + async def execute(self) -> Tuple[bool, Optional[str]]: """执行日志命令""" try: level = self.matched_groups.get("level", "info") user_nickname = self.message.message_info.user_info.user_nickname content = self.message.processed_plain_text - + log_message = f"[{level.upper()}] 用户 {user_nickname}: {content}" - + # 根据级别记录日志 if level == "debug": logger.debug(log_message) elif level == "warn": - logger.warning(log_message) + logger.warning(log_message) elif level == "error": logger.error(log_message) else: logger.info(log_message) - + # 不发送回复,让消息继续处理 return True, f"已记录到{level}级别日志" - + except Exception as e: logger.error(f"Log命令执行失败: {e}") return False, f"执行失败: {str(e)}" diff --git a/src/plugins/built_in/doubao_pic_plugin/plugin.py b/src/plugins/built_in/doubao_pic_plugin/plugin.py index 575e2b5ec..1fc2a39d3 100644 --- a/src/plugins/built_in/doubao_pic_plugin/plugin.py +++ b/src/plugins/built_in/doubao_pic_plugin/plugin.py @@ -35,21 +35,24 @@ logger = get_logger("doubao_pic_plugin") # ===== Action组件 ===== + class DoubaoImageGenerationAction(BaseAction): """豆包图片生成Action - 根据描述使用火山引擎API生成图片""" - + # Action基本信息 action_name = "doubao_image_generation" - action_description = "可以根据特定的描述,生成并发送一张图片,如果没提供描述,就根据聊天内容生成,你可以立刻画好,不用等待" - + action_description = ( + "可以根据特定的描述,生成并发送一张图片,如果没提供描述,就根据聊天内容生成,你可以立刻画好,不用等待" + ) + # 激活设置 focus_activation_type = ActionActivationType.LLM_JUDGE # Focus模式使用LLM判定,精确理解需求 - normal_activation_type = ActionActivationType.KEYWORD # Normal模式使用关键词激活,快速响应 - + normal_activation_type = ActionActivationType.KEYWORD # Normal模式使用关键词激活,快速响应 + # 关键词设置(用于Normal模式) activation_keywords = ["画", "绘制", "生成图片", "画图", "draw", "paint", "图片生成"] keyword_case_sensitive = False - + # LLM判定提示词(用于Focus模式) llm_judge_prompt = """ 判定是否需要使用图片生成动作的条件: @@ -71,23 +74,23 @@ class DoubaoImageGenerationAction(BaseAction): 4. 技术讨论中提到绘图概念但无生成需求 5. 用户明确表示不需要图片时 """ - + mode_enable = ChatMode.ALL parallel_action = True - + # Action参数定义 action_parameters = { "description": "图片描述,输入你想要生成并发送的图片的描述,必填", "size": "图片尺寸,例如 '1024x1024' (可选, 默认从配置或 '1024x1024')", } - + # Action使用场景 action_require = [ "当有人让你画东西时使用,你可以立刻画好,不用等待", "当有人要求你生成并发送一张图片时使用", "当有人让你画一张图时使用", ] - + # 简单的请求缓存,避免短时间内重复请求 _request_cache = {} _cache_max_size = 10 @@ -95,7 +98,7 @@ class DoubaoImageGenerationAction(BaseAction): async def execute(self) -> Tuple[bool, Optional[str]]: """执行图片生成动作""" logger.info(f"{self.log_prefix} 执行豆包图片生成动作") - + # 配置验证 http_base_url = self.api.get_config("base_url") http_api_key = self.api.get_config("volcano_generate_api_key") @@ -141,7 +144,7 @@ class DoubaoImageGenerationAction(BaseAction): cached_result = self._request_cache[cache_key] logger.info(f"{self.log_prefix} 使用缓存的图片结果") await self.send_reply("我之前画过类似的图片,用之前的结果~") - + # 直接发送缓存的结果 send_success = await self._send_image(cached_result) if send_success: @@ -195,7 +198,7 @@ class DoubaoImageGenerationAction(BaseAction): # 缓存成功的结果 self._request_cache[cache_key] = base64_image_string self._cleanup_cache() - + await self.send_message_by_expressor("图片已发送!") return True, "图片已发送" else: @@ -243,11 +246,11 @@ class DoubaoImageGenerationAction(BaseAction): """发送图片""" try: # 使用聊天流信息确定发送目标 - chat_stream = self.api.get_service('chat_stream') + chat_stream = self.api.get_service("chat_stream") if not chat_stream: logger.error(f"{self.log_prefix} 没有可用的聊天流发送图片") return False - + if chat_stream.group_info: # 群聊 return await self.api.send_message_to_target( @@ -256,7 +259,7 @@ class DoubaoImageGenerationAction(BaseAction): platform=chat_stream.platform, target_id=str(chat_stream.group_info.group_id), is_group=True, - display_message="发送生成的图片" + display_message="发送生成的图片", ) else: # 私聊 @@ -266,7 +269,7 @@ class DoubaoImageGenerationAction(BaseAction): platform=chat_stream.platform, target_id=str(chat_stream.user_info.user_id), is_group=False, - display_message="发送生成的图片" + display_message="发送生成的图片", ) except Exception as e: logger.error(f"{self.log_prefix} 发送图片时出错: {e}") @@ -281,14 +284,14 @@ class DoubaoImageGenerationAction(BaseAction): def _cleanup_cache(cls): """清理缓存,保持大小在限制内""" if len(cls._request_cache) > cls._cache_max_size: - keys_to_remove = list(cls._request_cache.keys())[:-cls._cache_max_size//2] + keys_to_remove = list(cls._request_cache.keys())[: -cls._cache_max_size // 2] for key in keys_to_remove: del cls._request_cache[key] def _validate_image_size(self, image_size: str) -> bool: """验证图片尺寸格式""" try: - width, height = map(int, image_size.split('x')) + width, height = map(int, image_size.split("x")) return 100 <= width <= 10000 and 100 <= height <= 10000 except (ValueError, TypeError): return False @@ -380,14 +383,15 @@ class DoubaoImageGenerationAction(BaseAction): # ===== 插件主类 ===== + @register_plugin class DoubaoImagePlugin(BasePlugin): """豆包图片生成插件 - + 基于火山引擎豆包模型的AI图片生成插件: - 图片生成Action:根据描述使用火山引擎API生成图片 """ - + # 插件基本信息 plugin_name = "doubao_pic_plugin" plugin_description = "基于火山引擎豆包模型的AI图片生成插件" @@ -395,20 +399,17 @@ class DoubaoImagePlugin(BasePlugin): plugin_author = "MaiBot开发团队" enable_plugin = True config_file_name = "config.toml" - + def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]: """返回插件包含的组件列表""" - + # 从配置获取组件启用状态 enable_image_generation = self.get_config("components.enable_image_generation", True) - + components = [] - + # 添加图片生成Action if enable_image_generation: - components.append(( - DoubaoImageGenerationAction.get_action_info(), - DoubaoImageGenerationAction - )) - - return components \ No newline at end of file + components.append((DoubaoImageGenerationAction.get_action_info(), DoubaoImageGenerationAction)) + + return components diff --git a/src/plugins/built_in/mute_plugin/plugin.py b/src/plugins/built_in/mute_plugin/plugin.py index 227017b54..4e83624f4 100644 --- a/src/plugins/built_in/mute_plugin/plugin.py +++ b/src/plugins/built_in/mute_plugin/plugin.py @@ -31,21 +31,22 @@ logger = get_logger("mute_plugin") # ===== Action组件 ===== + class MuteAction(BaseAction): """智能禁言Action - 基于LLM智能判断是否需要禁言""" - + # Action基本信息 action_name = "mute" action_description = "智能禁言系统,基于LLM判断是否需要禁言" - + # 激活设置 focus_activation_type = ActionActivationType.LLM_JUDGE # Focus模式使用LLM判定,确保谨慎 normal_activation_type = ActionActivationType.KEYWORD # Normal模式使用关键词激活,快速响应 - + # 关键词设置(用于Normal模式) activation_keywords = ["禁言", "mute", "ban", "silence"] keyword_case_sensitive = False - + # LLM判定提示词(用于Focus模式) llm_judge_prompt = """ 判定是否需要使用禁言动作的严格条件: @@ -63,52 +64,52 @@ class MuteAction(BaseAction): 4. 单纯的意见分歧或争论 """ - + mode_enable = ChatMode.ALL parallel_action = False - + # Action参数定义 action_parameters = { "target": "禁言对象,必填,输入你要禁言的对象的名字", "duration": "禁言时长,必填,输入你要禁言的时长(秒),单位为秒,必须为数字", - "reason": "禁言理由,可选" + "reason": "禁言理由,可选", } - + # Action使用场景 action_require = [ "当有人违反了公序良俗的内容", "当有人刷屏时使用", "当有人发了擦边,或者色情内容时使用", "当有人要求禁言自己时使用", - "如果某人已经被禁言了,就不要再次禁言了,除非你想追加时间!!" + "如果某人已经被禁言了,就不要再次禁言了,除非你想追加时间!!", ] async def execute(self) -> Tuple[bool, Optional[str]]: """执行智能禁言判定""" logger.info(f"{self.log_prefix} 执行智能禁言动作") - + # 获取参数 target = self.action_data.get("target") duration = self.action_data.get("duration") reason = self.action_data.get("reason", "违反群规") - + # 参数验证 if not target: error_msg = "禁言目标不能为空" logger.error(f"{self.log_prefix} {error_msg}") await self.send_reply("没有指定禁言对象呢~") return False, error_msg - + if not duration: error_msg = "禁言时长不能为空" logger.error(f"{self.log_prefix} {error_msg}") await self.send_reply("没有指定禁言时长呢~") return False, error_msg - + # 获取时长限制配置 min_duration = self.api.get_config("mute.min_duration", 60) max_duration = self.api.get_config("mute.max_duration", 2592000) - + # 验证时长格式并转换 try: duration_int = int(duration) @@ -117,7 +118,7 @@ class MuteAction(BaseAction): logger.error(f"{self.log_prefix} {error_msg}") await self.send_reply("禁言时长必须是正数哦~") return False, error_msg - + # 限制禁言时长范围 if duration_int < min_duration: duration_int = min_duration @@ -125,13 +126,13 @@ class MuteAction(BaseAction): elif duration_int > max_duration: duration_int = max_duration logger.info(f"{self.log_prefix} 禁言时长过长,调整为{max_duration}秒") - + except (ValueError, TypeError): error_msg = f"禁言时长格式无效: {duration}" logger.error(f"{self.log_prefix} {error_msg}") await self.send_reply("禁言时长必须是数字哦~") return False, error_msg - + # 获取用户ID try: platform, user_id = await self.api.get_user_id_by_person_name(target) @@ -140,29 +141,29 @@ class MuteAction(BaseAction): logger.error(f"{self.log_prefix} {error_msg}") await self.send_reply("查找用户信息时出现问题~") return False, error_msg - + if not user_id: error_msg = f"未找到用户 {target} 的ID" await self.send_reply(f"找不到 {target} 这个人呢~") logger.error(f"{self.log_prefix} {error_msg}") return False, error_msg - + # 格式化时长显示 enable_formatting = self.api.get_config("mute.enable_duration_formatting", True) time_str = self._format_duration(duration_int) if enable_formatting else f"{duration_int}秒" - + # 获取模板化消息 message = self._get_template_message(target, time_str, reason) # await self.send_reply(message) await self.send_message_by_expressor(message) - + # 发送群聊禁言命令 success = await self.send_command( command_name="GROUP_BAN", args={"qq_id": str(user_id), "duration": str(duration_int)}, - display_message=f"禁言了 {target} {time_str}" + display_message=f"禁言了 {target} {time_str}", ) - + if success: logger.info(f"{self.log_prefix} 成功发送禁言命令,用户 {target}({user_id}),时长 {duration_int} 秒") return True, f"成功禁言 {target},时长 {time_str}" @@ -174,12 +175,15 @@ class MuteAction(BaseAction): def _get_template_message(self, target: str, duration_str: str, reason: str) -> str: """获取模板化的禁言消息""" - templates = self.api.get_config("mute.templates", [ - "好的,禁言 {target} {duration},理由:{reason}", - "收到,对 {target} 执行禁言 {duration},因为{reason}", - "明白了,禁言 {target} {duration},原因是{reason}" - ]) - + templates = self.api.get_config( + "mute.templates", + [ + "好的,禁言 {target} {duration},理由:{reason}", + "收到,对 {target} 执行禁言 {duration},因为{reason}", + "明白了,禁言 {target} {duration},原因是{reason}", + ], + ) + template = random.choice(templates) return template.format(target=target, duration=duration_str, reason=reason) @@ -212,44 +216,41 @@ class MuteAction(BaseAction): # ===== Command组件 ===== + class MuteCommand(BaseCommand): """禁言命令 - 手动执行禁言操作""" - + # Command基本信息 command_name = "mute_command" command_description = "禁言命令,手动执行禁言操作" - + command_pattern = r"^/mute\s+(?P\S+)\s+(?P\d+)(?:\s+(?P.+))?$" command_help = "禁言指定用户,用法:/mute <用户名> <时长(秒)> [理由]" - command_examples = [ - "/mute 用户名 300", - "/mute 张三 600 刷屏", - "/mute @某人 1800 违规内容" - ] + command_examples = ["/mute 用户名 300", "/mute 张三 600 刷屏", "/mute @某人 1800 违规内容"] intercept_message = True # 拦截消息处理 - + async def execute(self) -> Tuple[bool, Optional[str]]: """执行禁言命令""" try: target = self.matched_groups.get("target") duration = self.matched_groups.get("duration") reason = self.matched_groups.get("reason", "管理员操作") - + if not all([target, duration]): await self.send_reply("❌ 命令参数不完整,请检查格式") return False, "参数不完整" - + # 获取时长限制配置 min_duration = self.api.get_config("mute.min_duration", 60) max_duration = self.api.get_config("mute.max_duration", 2592000) - + # 验证时长 try: duration_int = int(duration) if duration_int <= 0: await self.send_reply("❌ 禁言时长必须大于0") return False, "时长无效" - + # 限制禁言时长范围 if duration_int < min_duration: duration_int = min_duration @@ -257,11 +258,11 @@ class MuteCommand(BaseCommand): elif duration_int > max_duration: duration_int = max_duration await self.send_reply(f"⚠️ 禁言时长过长,调整为{max_duration}秒") - + except ValueError: await self.send_reply("❌ 禁言时长必须是数字") return False, "时长格式错误" - + # 获取用户ID try: platform, user_id = await self.api.get_user_id_by_person_name(target) @@ -269,35 +270,35 @@ class MuteCommand(BaseCommand): logger.error(f"{self.log_prefix} 查找用户ID时出错: {e}") await self.send_reply("❌ 查找用户信息时出现问题") return False, str(e) - + if not user_id: await self.send_reply(f"❌ 找不到用户: {target}") return False, "用户不存在" - + # 格式化时长显示 enable_formatting = self.api.get_config("mute.enable_duration_formatting", True) time_str = self._format_duration(duration_int) if enable_formatting else f"{duration_int}秒" - + logger.info(f"{self.log_prefix} 执行禁言命令: {target}({user_id}) -> {time_str}") - + # 发送群聊禁言命令 success = await self.send_command( command_name="GROUP_BAN", args={"qq_id": str(user_id), "duration": str(duration_int)}, - display_message=f"禁言了 {target} {time_str}" + display_message=f"禁言了 {target} {time_str}", ) - + if success: # 获取并发送模板化消息 message = self._get_template_message(target, time_str, reason) await self.send_reply(message) - + logger.info(f"{self.log_prefix} 成功禁言 {target}({user_id}),时长 {duration_int} 秒") return True, f"成功禁言 {target},时长 {time_str}" else: await self.send_reply("❌ 发送禁言命令失败") return False, "发送禁言命令失败" - + except Exception as e: logger.error(f"{self.log_prefix} 禁言命令执行失败: {e}") await self.send_reply(f"❌ 禁言命令错误: {str(e)}") @@ -305,12 +306,15 @@ class MuteCommand(BaseCommand): def _get_template_message(self, target: str, duration_str: str, reason: str) -> str: """获取模板化的禁言消息""" - templates = self.api.get_config("mute.templates", [ - "✅ 已禁言 {target} {duration},理由:{reason}", - "🔇 对 {target} 执行禁言 {duration},因为{reason}", - "⛔ 禁言 {target} {duration},原因:{reason}" - ]) - + templates = self.api.get_config( + "mute.templates", + [ + "✅ 已禁言 {target} {duration},理由:{reason}", + "🔇 对 {target} 执行禁言 {duration},因为{reason}", + "⛔ 禁言 {target} {duration},原因:{reason}", + ], + ) + template = random.choice(templates) return template.format(target=target, duration=duration_str, reason=reason) @@ -343,15 +347,16 @@ class MuteCommand(BaseCommand): # ===== 插件主类 ===== + @register_plugin class MutePlugin(BasePlugin): """禁言插件 - + 提供智能禁言功能: - 智能禁言Action:基于LLM判断是否需要禁言 - 禁言命令Command:手动执行禁言操作 """ - + # 插件基本信息 plugin_name = "mute_plugin" plugin_description = "群聊禁言管理插件,提供智能禁言功能" @@ -359,28 +364,22 @@ class MutePlugin(BasePlugin): plugin_author = "MaiBot开发团队" enable_plugin = True config_file_name = "config.toml" - + def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]: """返回插件包含的组件列表""" - + # 从配置获取组件启用状态 enable_smart_mute = self.get_config("components.enable_smart_mute", True) enable_mute_command = self.get_config("components.enable_mute_command", True) - + components = [] - + # 添加智能禁言Action if enable_smart_mute: - components.append(( - MuteAction.get_action_info(), - MuteAction - )) - + components.append((MuteAction.get_action_info(), MuteAction)) + # 添加禁言命令Command if enable_mute_command: - components.append(( - MuteCommand.get_command_info(), - MuteCommand - )) - - return components \ No newline at end of file + components.append((MuteCommand.get_command_info(), MuteCommand)) + + return components diff --git a/src/plugins/examples/example_plugin/plugin.py b/src/plugins/examples/example_plugin/plugin.py index a0a45bb2c..ffd0cbd91 100644 --- a/src/plugins/examples/example_plugin/plugin.py +++ b/src/plugins/examples/example_plugin/plugin.py @@ -6,7 +6,7 @@ 包含功能: - 智能问候Action - 帮助系统Command -- 消息发送Command +- 消息发送Command - 状态查询Command - 回声Command - 自定义前缀Command @@ -24,6 +24,7 @@ from typing import List, Tuple, Type, Optional import time import random + # 导入新插件系统 from src.plugin_system.base.base_plugin import BasePlugin from src.plugin_system.base.base_plugin import register_plugin @@ -37,6 +38,7 @@ logger = get_logger("example_comprehensive") # ===== Action组件 ===== + class SmartGreetingAction(BaseAction): """智能问候Action - 基于关键词触发的问候系统""" @@ -49,66 +51,57 @@ class SmartGreetingAction(BaseAction): parallel_action = False # Action参数定义 - action_parameters = { - "username": "要问候的用户名(可选)" - } + action_parameters = {"username": "要问候的用户名(可选)"} # Action使用场景 - action_require = [ - "用户发送包含问候词汇的消息", - "检测到新用户加入时", - "响应友好交流需求" - ] + action_require = ["用户发送包含问候词汇的消息", "检测到新用户加入时", "响应友好交流需求"] # ===== Command组件 ===== + class ComprehensiveHelpCommand(BaseCommand): """综合帮助系统 - 显示所有可用命令和Action""" - + command_pattern = r"^/help(?:\s+(?P\w+))?$" command_help = "显示所有命令帮助或特定命令详情,用法:/help [命令名]" command_examples = ["/help", "/help send", "/help status"] intercept_message = True # 拦截消息,不继续处理 - + async def execute(self) -> Tuple[bool, Optional[str]]: """执行帮助命令""" try: command_name = self.matched_groups.get("command") - + if command_name: # 显示特定命令帮助 return await self._show_specific_help(command_name) else: # 显示所有命令概览 return await self._show_all_commands() - + except Exception as e: logger.error(f"{self.log_prefix} 帮助命令执行失败: {e}") await self.send_reply(f"❌ 帮助系统错误: {str(e)}") return False, str(e) - + async def _show_specific_help(self, command_name: str) -> Tuple[bool, str]: """显示特定命令的详细帮助""" # 这里可以扩展为动态获取所有注册的Command信息 help_info = { - "help": { - "description": "显示帮助信息", - "usage": "/help [命令名]", - "examples": ["/help", "/help send"] - }, + "help": {"description": "显示帮助信息", "usage": "/help [命令名]", "examples": ["/help", "/help send"]}, "send": { "description": "发送消息到指定目标", "usage": "/send <消息内容>", - "examples": ["/send group 123456 你好", "/send user 789456 私聊"] + "examples": ["/send group 123456 你好", "/send user 789456 私聊"], }, "status": { "description": "查询系统状态", "usage": "/status [类型]", - "examples": ["/status", "/status 系统", "/status 插件"] - } + "examples": ["/status", "/status 系统", "/status 插件"], + }, } - + info = help_info.get(command_name.lower()) if not info: response = f"❌ 未找到命令: {command_name}\n使用 /help 查看所有可用命令" @@ -116,15 +109,15 @@ class ComprehensiveHelpCommand(BaseCommand): response = f""" 📖 命令帮助: {command_name} -📝 描述: {info['description']} -⚙️ 用法: {info['usage']} +📝 描述: {info["description"]} +⚙️ 用法: {info["usage"]} 💡 示例: -{chr(10).join(f" • {example}" for example in info['examples'])} +{chr(10).join(f" • {example}" for example in info["examples"])} """.strip() - + await self.send_reply(response) return True, response - + async def _show_all_commands(self) -> Tuple[bool, str]: """显示所有可用命令""" help_text = """ @@ -149,61 +142,53 @@ class ComprehensiveHelpCommand(BaseCommand): 💡 使用 /help <命令名> 获取特定命令的详细说明 """.strip() - + await self.send_reply(help_text) return True, help_text class MessageSendCommand(BaseCommand): """消息发送Command - 向指定群聊或私聊发送消息""" - + command_pattern = r"^/send\s+(?Pgroup|user)\s+(?P\d+)\s+(?P.+)$" command_help = "向指定群聊或私聊发送消息,用法:/send <消息内容>" command_examples = [ "/send group 123456789 大家好!", "/send user 987654321 私聊消息", - "/send group 555666777 这是来自插件的消息" + "/send group 555666777 这是来自插件的消息", ] intercept_message = True # 拦截消息处理 - + async def execute(self) -> Tuple[bool, Optional[str]]: """执行消息发送""" try: target_type = self.matched_groups.get("target_type") target_id = self.matched_groups.get("target_id") content = self.matched_groups.get("content") - + if not all([target_type, target_id, content]): await self.send_reply("❌ 命令参数不完整,请检查格式") return False, "参数不完整" - + # 长度限制检查 max_length = self.api.get_config("send.max_message_length", 500) if len(content) > max_length: await self.send_reply(f"❌ 消息过长,最大长度: {max_length} 字符") return False, "消息过长" - + logger.info(f"{self.log_prefix} 发送消息: {target_type}:{target_id} -> {content[:50]}...") - + # 根据目标类型发送消息 if target_type == "group": - success = await self.api.send_text_to_group( - text=content, - group_id=target_id, - platform="qq" - ) + success = await self.api.send_text_to_group(text=content, group_id=target_id, platform="qq") target_desc = f"群聊 {target_id}" elif target_type == "user": - success = await self.api.send_text_to_user( - text=content, - user_id=target_id, - platform="qq" - ) + success = await self.api.send_text_to_user(text=content, user_id=target_id, platform="qq") target_desc = f"用户 {target_id}" else: await self.send_reply(f"❌ 不支持的目标类型: {target_type}") return False, f"不支持的目标类型: {target_type}" - + # 返回结果 if success: response = f"✅ 消息已成功发送到 {target_desc}" @@ -213,13 +198,13 @@ class MessageSendCommand(BaseCommand): response = f"❌ 消息发送失败,目标 {target_desc} 可能不存在" await self.send_reply(response) return False, response - + except Exception as e: logger.error(f"{self.log_prefix} 消息发送失败: {e}") error_msg = f"❌ 发送失败: {str(e)}" await self.send_reply(error_msg) return False, str(e) - + class DiceCommand(BaseCommand): """骰子命令,使用!前缀而不是/前缀""" @@ -282,40 +267,40 @@ class DiceCommand(BaseCommand): class EchoCommand(BaseCommand): """回声Command - 重复用户输入的消息""" - + command_pattern = r"^/echo\s+(?P.+)$" command_help = "重复你的消息内容,用法:/echo <消息内容>" command_examples = ["/echo Hello World", "/echo 你好世界", "/echo 测试回声"] intercept_message = True # 拦截消息处理 - + async def execute(self) -> Tuple[bool, Optional[str]]: """执行回声命令""" try: message = self.matched_groups.get("message", "") - + if not message: response = "❌ 请提供要重复的消息!用法:/echo <消息内容>" await self.send_reply(response) return False, response - + # 检查消息长度限制 max_length = self.api.get_config("echo.max_length", 200) if len(message) > max_length: response = f"❌ 消息过长,最大长度: {max_length} 字符" await self.send_reply(response) return False, response - + # 格式化回声消息 enable_formatting = self.api.get_config("echo.enable_formatting", True) if enable_formatting: response = f"🔊 回声: {message}" else: response = message - + await self.send_reply(response) logger.info(f"{self.log_prefix} 回声消息: {message}") return True, response - + except Exception as e: logger.error(f"{self.log_prefix} 回声命令失败: {e}") error_msg = f"❌ 回声失败: {str(e)}" @@ -325,21 +310,21 @@ class EchoCommand(BaseCommand): class MessageInfoCommand(BaseCommand): """消息信息Command - 显示当前消息的详细信息""" - + command_pattern = r"^/info$" command_help = "显示当前消息的详细信息" command_examples = ["/info"] intercept_message = True # 拦截消息处理 - + async def execute(self) -> Tuple[bool, Optional[str]]: """执行消息信息查询""" try: message = self.message - + # 收集消息信息 user_info = message.message_info.user_info group_info = message.message_info.group_info - + info_parts = [ "📋 消息信息详情", "", @@ -355,38 +340,39 @@ class MessageInfoCommand(BaseCommand): f" • 原始内容: {message.processed_plain_text[:100]}{'...' if len(message.processed_plain_text) > 100 else ''}", f" • 是否表情: {'是' if getattr(message, 'is_emoji', False) else '否'}", ] - + # 群聊信息 if group_info: - info_parts.extend([ - "", - "👥 群聊信息:", - f" • 群ID: {group_info.group_id}", - f" • 群名: {getattr(group_info, 'group_name', '未知')}", - " • 聊天类型: 群聊" - ]) + info_parts.extend( + [ + "", + "👥 群聊信息:", + f" • 群ID: {group_info.group_id}", + f" • 群名: {getattr(group_info, 'group_name', '未知')}", + " • 聊天类型: 群聊", + ] + ) else: - info_parts.extend([ - "", - "💭 聊天类型: 私聊" - ]) - + info_parts.extend(["", "💭 聊天类型: 私聊"]) + # 流信息 - if hasattr(message, 'chat_stream') and message.chat_stream: + if hasattr(message, "chat_stream") and message.chat_stream: stream = message.chat_stream - info_parts.extend([ - "", - "🌊 聊天流信息:", - f" • 流ID: {stream.stream_id}", - f" • 创建时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(stream.create_time))}", - f" • 最后活跃: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(stream.last_active_time))}" - ]) - + info_parts.extend( + [ + "", + "🌊 聊天流信息:", + f" • 流ID: {stream.stream_id}", + f" • 创建时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(stream.create_time))}", + f" • 最后活跃: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(stream.last_active_time))}", + ] + ) + response = "\n".join(info_parts) await self.send_reply(response) logger.info(f"{self.log_prefix} 显示消息信息: {user_info.user_id}") return True, response - + except Exception as e: logger.error(f"{self.log_prefix} 消息信息查询失败: {e}") error_msg = f"❌ 信息查询失败: {str(e)}" @@ -397,7 +383,7 @@ class MessageInfoCommand(BaseCommand): @register_plugin class ExampleComprehensivePlugin(BasePlugin): """综合示例插件 - + 整合了旧示例插件的所有功能,展示新插件系统的完整能力: - 多种Action和Command组件 - 拦截控制功能演示 @@ -405,7 +391,7 @@ class ExampleComprehensivePlugin(BasePlugin): - 完整的错误处理 - 日志记录和监控 """ - + # 插件基本信息 plugin_name = "example_plugin" plugin_description = "综合示例插件,展示新插件系统的完整功能" @@ -413,10 +399,10 @@ class ExampleComprehensivePlugin(BasePlugin): plugin_author = "MaiBot开发团队" enable_plugin = True config_file_name = "config.toml" - + def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]: """返回插件包含的组件列表""" - + # 从配置获取组件启用状态 enable_greeting = self.get_config("components.enable_greeting", True) enable_help = self.get_config("components.enable_help", True) @@ -425,61 +411,53 @@ class ExampleComprehensivePlugin(BasePlugin): enable_info = self.get_config("components.enable_info", True) enable_dice = self.get_config("components.enable_dice", True) components = [] - + # 添加Action组件 if enable_greeting: - components.append(( - SmartGreetingAction.get_action_info( - name="smart_greeting", - description="智能问候系统,基于关键词触发" - ), - SmartGreetingAction - )) - + components.append( + ( + SmartGreetingAction.get_action_info( + name="smart_greeting", description="智能问候系统,基于关键词触发" + ), + SmartGreetingAction, + ) + ) + # 添加Command组件 if enable_help: - components.append(( - ComprehensiveHelpCommand.get_command_info( - name="comprehensive_help", - description="综合帮助系统,显示所有命令信息" - ), - ComprehensiveHelpCommand - )) - + components.append( + ( + ComprehensiveHelpCommand.get_command_info( + name="comprehensive_help", description="综合帮助系统,显示所有命令信息" + ), + ComprehensiveHelpCommand, + ) + ) + if enable_send: - components.append(( - MessageSendCommand.get_command_info( - name="message_send", - description="消息发送命令,支持群聊和私聊" - ), - MessageSendCommand - )) - + components.append( + ( + MessageSendCommand.get_command_info( + name="message_send", description="消息发送命令,支持群聊和私聊" + ), + MessageSendCommand, + ) + ) + if enable_echo: - components.append(( - EchoCommand.get_command_info( - name="echo", - description="回声命令,重复用户输入" - ), - EchoCommand - )) - + components.append( + (EchoCommand.get_command_info(name="echo", description="回声命令,重复用户输入"), EchoCommand) + ) + if enable_info: - components.append(( - MessageInfoCommand.get_command_info( - name="message_info", - description="消息信息查询,显示详细信息" - ), - MessageInfoCommand - )) - + components.append( + ( + MessageInfoCommand.get_command_info(name="message_info", description="消息信息查询,显示详细信息"), + MessageInfoCommand, + ) + ) + if enable_dice: - components.append(( - DiceCommand.get_command_info( - name="dice", - description="骰子命令,掷骰子" - ), - DiceCommand - )) - - return components \ No newline at end of file + components.append((DiceCommand.get_command_info(name="dice", description="骰子命令,掷骰子"), DiceCommand)) + + return components