# 工具API 工具API模块提供了各种辅助功能,包括文件操作、时间处理、唯一ID生成等常用工具函数。 ## 导入方式 ```python from src.plugin_system.apis import utils_api ``` ## 主要功能 ### 1. 文件操作 #### `get_plugin_path(caller_frame=None) -> str` 获取调用者插件的路径 **参数:** - `caller_frame`:调用者的栈帧,默认为None(自动获取) **返回:** - `str`:插件目录的绝对路径 **示例:** ```python plugin_path = utils_api.get_plugin_path() print(f"插件路径: {plugin_path}") ``` #### `read_json_file(file_path: str, default: Any = None) -> Any` 读取JSON文件 **参数:** - `file_path`:文件路径,可以是相对于插件目录的路径 - `default`:如果文件不存在或读取失败时返回的默认值 **返回:** - `Any`:JSON数据或默认值 **示例:** ```python # 读取插件配置文件 config = utils_api.read_json_file("config.json", {}) settings = utils_api.read_json_file("data/settings.json", {"enabled": True}) ``` #### `write_json_file(file_path: str, data: Any, indent: int = 2) -> bool` 写入JSON文件 **参数:** - `file_path`:文件路径,可以是相对于插件目录的路径 - `data`:要写入的数据 - `indent`:JSON缩进 **返回:** - `bool`:是否写入成功 **示例:** ```python data = {"name": "test", "value": 123} success = utils_api.write_json_file("output.json", data) ``` ### 2. 时间相关 #### `get_timestamp() -> int` 获取当前时间戳 **返回:** - `int`:当前时间戳(秒) #### `format_time(timestamp: Optional[int] = None, format_str: str = "%Y-%m-%d %H:%M:%S") -> str` 格式化时间 **参数:** - `timestamp`:时间戳,如果为None则使用当前时间 - `format_str`:时间格式字符串 **返回:** - `str`:格式化后的时间字符串 #### `parse_time(time_str: str, format_str: str = "%Y-%m-%d %H:%M:%S") -> int` 解析时间字符串为时间戳 **参数:** - `time_str`:时间字符串 - `format_str`:时间格式字符串 **返回:** - `int`:时间戳(秒) ### 3. 其他工具 #### `generate_unique_id() -> str` 生成唯一ID **返回:** - `str`:唯一ID ## 使用示例 ### 1. 插件数据管理 ```python from src.plugin_system.apis import utils_api class DataPlugin(BasePlugin): def __init__(self): self.plugin_path = utils_api.get_plugin_path() self.data_file = "plugin_data.json" self.load_data() def load_data(self): """加载插件数据""" default_data = { "users": {}, "settings": {"enabled": True}, "stats": {"message_count": 0} } self.data = utils_api.read_json_file(self.data_file, default_data) def save_data(self): """保存插件数据""" return utils_api.write_json_file(self.data_file, self.data) async def handle_action(self, action_data, chat_stream): # 更新统计信息 self.data["stats"]["message_count"] += 1 self.data["stats"]["last_update"] = utils_api.get_timestamp() # 保存数据 if self.save_data(): return {"success": True, "message": "数据已保存"} else: return {"success": False, "message": "数据保存失败"} ``` ### 2. 日志记录系统 ```python class PluginLogger: def __init__(self, plugin_name: str): self.plugin_name = plugin_name self.log_file = f"{plugin_name}_log.json" self.logs = utils_api.read_json_file(self.log_file, []) def log_event(self, event_type: str, message: str, data: dict = None): """记录事件""" log_entry = { "id": utils_api.generate_unique_id(), "timestamp": utils_api.get_timestamp(), "formatted_time": utils_api.format_time(), "event_type": event_type, "message": message, "data": data or {} } self.logs.append(log_entry) # 保持最新的100条记录 if len(self.logs) > 100: self.logs = self.logs[-100:] # 保存到文件 utils_api.write_json_file(self.log_file, self.logs) def get_logs_by_type(self, event_type: str) -> list: """获取指定类型的日志""" return [log for log in self.logs if log["event_type"] == event_type] def get_recent_logs(self, count: int = 10) -> list: """获取最近的日志""" return self.logs[-count:] # 使用示例 logger = PluginLogger("my_plugin") logger.log_event("user_action", "用户发送了消息", {"user_id": "123", "message": "hello"}) ``` ### 3. 配置管理系统 ```python class ConfigManager: def __init__(self, config_file: str = "plugin_config.json"): self.config_file = config_file self.default_config = { "enabled": True, "debug": False, "max_users": 100, "response_delay": 1.0, "features": { "auto_reply": True, "logging": True } } self.config = self.load_config() def load_config(self) -> dict: """加载配置""" return utils_api.read_json_file(self.config_file, self.default_config) def save_config(self) -> bool: """保存配置""" return utils_api.write_json_file(self.config_file, self.config, indent=4) def get(self, key: str, default=None): """获取配置值,支持嵌套访问""" keys = key.split('.') value = self.config for k in keys: if isinstance(value, dict) and k in value: value = value[k] else: return default return value def set(self, key: str, value): """设置配置值,支持嵌套设置""" keys = key.split('.') config = self.config for k in keys[:-1]: if k not in config: config[k] = {} config = config[k] config[keys[-1]] = value def update_config(self, updates: dict): """批量更新配置""" def deep_update(base, updates): for key, value in updates.items(): if isinstance(value, dict) and key in base and isinstance(base[key], dict): deep_update(base[key], value) else: base[key] = value deep_update(self.config, updates) # 使用示例 config = ConfigManager() print(f"调试模式: {config.get('debug', False)}") print(f"自动回复: {config.get('features.auto_reply', True)}") config.set('features.new_feature', True) config.save_config() ``` ### 4. 缓存系统 ```python class PluginCache: def __init__(self, cache_file: str = "plugin_cache.json", ttl: int = 3600): self.cache_file = cache_file self.ttl = ttl # 缓存过期时间(秒) self.cache = self.load_cache() def load_cache(self) -> dict: """加载缓存""" return utils_api.read_json_file(self.cache_file, {}) def save_cache(self): """保存缓存""" return utils_api.write_json_file(self.cache_file, self.cache) def get(self, key: str): """获取缓存值""" if key not in self.cache: return None item = self.cache[key] current_time = utils_api.get_timestamp() # 检查是否过期 if current_time - item["timestamp"] > self.ttl: del self.cache[key] return None return item["value"] def set(self, key: str, value): """设置缓存值""" self.cache[key] = { "value": value, "timestamp": utils_api.get_timestamp() } self.save_cache() def clear_expired(self): """清理过期缓存""" current_time = utils_api.get_timestamp() expired_keys = [] for key, item in self.cache.items(): if current_time - item["timestamp"] > self.ttl: expired_keys.append(key) for key in expired_keys: del self.cache[key] if expired_keys: self.save_cache() return len(expired_keys) # 使用示例 cache = PluginCache(ttl=1800) # 30分钟过期 cache.set("user_data_123", {"name": "张三", "score": 100}) user_data = cache.get("user_data_123") ``` ### 5. 时间处理工具 ```python class TimeHelper: @staticmethod def get_time_info(): """获取当前时间的详细信息""" timestamp = utils_api.get_timestamp() return { "timestamp": timestamp, "datetime": utils_api.format_time(timestamp), "date": utils_api.format_time(timestamp, "%Y-%m-%d"), "time": utils_api.format_time(timestamp, "%H:%M:%S"), "year": utils_api.format_time(timestamp, "%Y"), "month": utils_api.format_time(timestamp, "%m"), "day": utils_api.format_time(timestamp, "%d"), "weekday": utils_api.format_time(timestamp, "%A") } @staticmethod def time_ago(timestamp: int) -> str: """计算时间差""" current = utils_api.get_timestamp() diff = current - timestamp if diff < 60: return f"{diff}秒前" elif diff < 3600: return f"{diff // 60}分钟前" elif diff < 86400: return f"{diff // 3600}小时前" else: return f"{diff // 86400}天前" @staticmethod def parse_duration(duration_str: str) -> int: """解析时间段字符串,返回秒数""" import re pattern = r'(\d+)([smhd])' matches = re.findall(pattern, duration_str.lower()) total_seconds = 0 for value, unit in matches: value = int(value) if unit == 's': total_seconds += value elif unit == 'm': total_seconds += value * 60 elif unit == 'h': total_seconds += value * 3600 elif unit == 'd': total_seconds += value * 86400 return total_seconds # 使用示例 time_info = TimeHelper.get_time_info() print(f"当前时间: {time_info['datetime']}") last_seen = 1699000000 print(f"最后见面: {TimeHelper.time_ago(last_seen)}") duration = TimeHelper.parse_duration("1h30m") # 1小时30分钟 = 5400秒 ``` ## 最佳实践 ### 1. 错误处理 ```python def safe_file_operation(file_path: str, data: dict): """安全的文件操作""" try: success = utils_api.write_json_file(file_path, data) if not success: logger.warning(f"文件写入失败: {file_path}") return success except Exception as e: logger.error(f"文件操作出错: {e}") return False ``` ### 2. 路径处理 ```python import os def get_data_path(filename: str) -> str: """获取数据文件的完整路径""" plugin_path = utils_api.get_plugin_path() data_dir = os.path.join(plugin_path, "data") # 确保数据目录存在 os.makedirs(data_dir, exist_ok=True) return os.path.join(data_dir, filename) ``` ### 3. 定期清理 ```python async def cleanup_old_files(): """清理旧文件""" plugin_path = utils_api.get_plugin_path() current_time = utils_api.get_timestamp() for filename in os.listdir(plugin_path): if filename.endswith('.tmp'): file_path = os.path.join(plugin_path, filename) file_time = os.path.getmtime(file_path) # 删除超过24小时的临时文件 if current_time - file_time > 86400: os.remove(file_path) ``` ## 注意事项 1. **相对路径**:文件路径支持相对于插件目录的路径 2. **自动创建目录**:写入文件时会自动创建必要的目录 3. **错误处理**:所有函数都有错误处理,失败时返回默认值 4. **编码格式**:文件读写使用UTF-8编码 5. **时间格式**:时间戳使用秒为单位 6. **JSON格式**:JSON文件使用可读性好的缩进格式