diff --git a/src/plugin_system/base/base_plugin.py b/src/plugin_system/base/base_plugin.py index ca80f9ce0..5c7edd23b 100644 --- a/src/plugin_system/base/base_plugin.py +++ b/src/plugin_system/base/base_plugin.py @@ -306,10 +306,10 @@ class BasePlugin(ABC): """备份配置文件""" import shutil import datetime - + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") backup_path = f"{config_file_path}.backup_{timestamp}" - + try: shutil.copy2(config_file_path, backup_path) logger.info(f"{self.log_prefix} 配置文件已备份到: {backup_path}") @@ -320,26 +320,31 @@ class BasePlugin(ABC): def _migrate_config_values(self, old_config: Dict[str, Any], new_config: Dict[str, Any]) -> Dict[str, Any]: """将旧配置值迁移到新配置结构中 - + Args: old_config: 旧配置数据 new_config: 基于新schema生成的默认配置 - + Returns: Dict[str, Any]: 迁移后的配置 """ - def migrate_section(old_section: Dict[str, Any], new_section: Dict[str, Any], section_name: str) -> Dict[str, Any]: + + def migrate_section( + old_section: Dict[str, Any], new_section: Dict[str, Any], section_name: str + ) -> Dict[str, Any]: """迁移单个配置节""" result = new_section.copy() - + for key, value in old_section.items(): if key in new_section: # 特殊处理:config_version字段总是使用新版本 if section_name == "plugin" and key == "config_version": # 保持新的版本号,不迁移旧值 - logger.debug(f"{self.log_prefix} 更新配置版本: {section_name}.{key} = {result[key]} (旧值: {value})") + logger.debug( + f"{self.log_prefix} 更新配置版本: {section_name}.{key} = {result[key]} (旧值: {value})" + ) continue - + # 键存在于新配置中,复制值 if isinstance(value, dict) and isinstance(new_section[key], dict): # 递归处理嵌套字典 @@ -350,26 +355,32 @@ class BasePlugin(ABC): else: # 键在新配置中不存在,记录警告 logger.warning(f"{self.log_prefix} 配置项 {section_name}.{key} 在新版本中已被移除") - + return result migrated_config = {} - + # 迁移每个配置节 for section_name, new_section_data in new_config.items(): - if section_name in old_config and isinstance(old_config[section_name], dict) and isinstance(new_section_data, dict): - migrated_config[section_name] = migrate_section(old_config[section_name], new_section_data, section_name) + if ( + section_name in old_config + and isinstance(old_config[section_name], dict) + and isinstance(new_section_data, dict) + ): + migrated_config[section_name] = migrate_section( + old_config[section_name], new_section_data, section_name + ) else: # 新增的节或类型不匹配,使用默认值 migrated_config[section_name] = new_section_data if section_name in old_config: logger.warning(f"{self.log_prefix} 配置节 {section_name} 结构已改变,使用默认值") - + # 检查旧配置中是否有新配置没有的节 for section_name in old_config.keys(): if section_name not in migrated_config: logger.warning(f"{self.log_prefix} 配置节 {section_name} 在新版本中已被移除") - + return migrated_config def _generate_config_from_schema(self) -> Dict[str, Any]: @@ -378,19 +389,19 @@ class BasePlugin(ABC): return {} config_data = {} - + # 遍历每个配置节 for section, fields in self.config_schema.items(): if isinstance(fields, dict): section_data = {} - + # 遍历节内的字段 for field_name, field in fields.items(): if isinstance(field, ConfigField): section_data[field_name] = field.default - + config_data[section] = section_data - + return config_data def _save_config_to_file(self, config_data: Dict[str, Any], config_file_path: str): @@ -402,7 +413,7 @@ class BasePlugin(ABC): toml_str = f"# {self.plugin_name} - 配置文件\n" plugin_description = self.get_manifest_info("description", "插件配置文件") toml_str += f"# {plugin_description}\n" - + # 获取当前期望的配置版本 expected_version = self._get_expected_config_version() toml_str += f"# 配置版本: {expected_version}\n\n" @@ -418,7 +429,7 @@ class BasePlugin(ABC): # 遍历节内的字段 if isinstance(fields, dict) and section in config_data: section_data = config_data[section] - + for field_name, field in fields.items(): if isinstance(field, ConfigField): # 添加字段描述 @@ -502,31 +513,33 @@ class BasePlugin(ABC): # 加载现有配置 with open(config_file_path, "r", encoding="utf-8") as f: existing_config = toml.load(f) or {} - + # 检查配置版本 current_version = self._get_current_config_version(existing_config) - + # 如果配置文件没有版本信息,跳过版本检查 if current_version == "0.0.0": logger.debug(f"{self.log_prefix} 配置文件无版本信息,跳过版本检查") self.config = existing_config else: expected_version = self._get_expected_config_version() - + if current_version != expected_version: - logger.info(f"{self.log_prefix} 检测到配置版本需要更新: 当前=v{current_version}, 期望=v{expected_version}") - + logger.info( + f"{self.log_prefix} 检测到配置版本需要更新: 当前=v{current_version}, 期望=v{expected_version}" + ) + # 生成新的默认配置结构 new_config_structure = self._generate_config_from_schema() - + # 迁移旧配置值到新结构 migrated_config = self._migrate_config_values(existing_config, new_config_structure) - + # 保存迁移后的配置 self._save_config_to_file(migrated_config, config_file_path) - + logger.info(f"{self.log_prefix} 配置文件已从 v{current_version} 更新到 v{expected_version}") - + self.config = migrated_config else: logger.debug(f"{self.log_prefix} 配置版本匹配 (v{current_version}),直接加载") diff --git a/src/plugins/built_in/mute_plugin/plugin.py b/src/plugins/built_in/mute_plugin/plugin.py index 72e5637f1..71e2f29c5 100644 --- a/src/plugins/built_in/mute_plugin/plugin.py +++ b/src/plugins/built_in/mute_plugin/plugin.py @@ -93,31 +93,31 @@ class MuteAction(BaseAction): def _check_group_permission(self) -> Tuple[bool, Optional[str]]: """检查当前群是否有禁言动作权限 - + Returns: Tuple[bool, Optional[str]]: (是否有权限, 错误信息) """ # 如果不是群聊,直接返回False if not self.is_group: return False, "禁言动作只能在群聊中使用" - + # 获取权限配置 allowed_groups = self.get_config("permissions.allowed_groups", []) - + # 如果配置为空,表示不启用权限控制 if not allowed_groups: logger.info(f"{self.log_prefix} 群组权限未配置,允许所有群使用禁言动作") return True, None - + # 检查当前群是否在允许列表中 current_group_key = f"{self.platform}:{self.group_id}" for allowed_group in allowed_groups: if allowed_group == current_group_key: logger.info(f"{self.log_prefix} 群组 {current_group_key} 有禁言动作权限") return True, None - + logger.warning(f"{self.log_prefix} 群组 {current_group_key} 没有禁言动作权限") - return False, f"当前群组没有使用禁言动作的权限" + return False, "当前群组没有使用禁言动作的权限" async def execute(self) -> Tuple[bool, Optional[str]]: """执行智能禁言判定""" @@ -276,7 +276,7 @@ class MuteCommand(BaseCommand): def _check_user_permission(self) -> Tuple[bool, Optional[str]]: """检查当前用户是否有禁言命令权限 - + Returns: Tuple[bool, Optional[str]]: (是否有权限, 错误信息) """ @@ -284,27 +284,27 @@ class MuteCommand(BaseCommand): chat_stream = self.message.chat_stream if not chat_stream: return False, "无法获取聊天流信息" - + current_platform = chat_stream.platform current_user_id = str(chat_stream.user_info.user_id) - + # 获取权限配置 allowed_users = self.get_config("permissions.allowed_users", []) - + # 如果配置为空,表示不启用权限控制 if not allowed_users: logger.info(f"{self.log_prefix} 用户权限未配置,允许所有用户使用禁言命令") return True, None - + # 检查当前用户是否在允许列表中 current_user_key = f"{current_platform}:{current_user_id}" for allowed_user in allowed_users: if allowed_user == current_user_key: logger.info(f"{self.log_prefix} 用户 {current_user_key} 有禁言命令权限") return True, None - + logger.warning(f"{self.log_prefix} 用户 {current_user_key} 没有禁言命令权限") - return False, f"你没有使用禁言命令的权限" + return False, "你没有使用禁言命令的权限" async def execute(self) -> Tuple[bool, Optional[str]]: """执行禁言命令"""