refc:重构插件api,补全文档,合并expressor和replyer,分离reply和sender,新log浏览器

This commit is contained in:
SengokuCola
2025-06-19 20:20:34 +08:00
parent 7e05ede846
commit ab28b94e33
63 changed files with 5285 additions and 8316 deletions

View File

@@ -1,126 +1,165 @@
"""工具类API模块
提供了各种辅助功能
使用方式:
from src.plugin_system.apis import utils_api
plugin_path = utils_api.get_plugin_path()
data = utils_api.read_json_file("data.json")
timestamp = utils_api.get_timestamp()
"""
import os
import json
import time
import inspect
import datetime
import uuid
from typing import Any, Optional
from src.common.logger import get_logger
logger = get_logger("utils_api")
class UtilsAPI:
"""工具类API模块
# =============================================================================
# 文件操作API函数
# =============================================================================
提供了各种辅助功能
def get_plugin_path(caller_frame=None) -> str:
"""获取调用者插件的路径
Args:
caller_frame: 调用者的栈帧默认为None自动获取
Returns:
str: 插件目录的绝对路径
"""
try:
if caller_frame is None:
caller_frame = inspect.currentframe().f_back
def get_plugin_path(self) -> str:
"""获取当前插件的路径
Returns:
str: 插件目录的绝对路径
"""
import inspect
plugin_module_path = inspect.getfile(self.__class__)
plugin_module_path = inspect.getfile(caller_frame)
plugin_dir = os.path.dirname(plugin_module_path)
return plugin_dir
except Exception as e:
logger.error(f"[UtilsAPI] 获取插件路径失败: {e}")
return ""
def read_json_file(self, file_path: str, default: Any = None) -> Any:
"""读取JSON文件
Args:
file_path: 文件路径,可以是相对于插件目录的路径
default: 如果文件不存在或读取失败时返回的默认值
def read_json_file(file_path: str, default: Any = None) -> Any:
"""读取JSON文件
Returns:
Any: JSON数据或默认值
"""
try:
# 如果是相对路径,则相对于插件目录
if not os.path.isabs(file_path):
file_path = os.path.join(self.get_plugin_path(), file_path)
Args:
file_path: 文件路径,可以是相对于插件目录的路径
default: 如果文件不存在或读取失败时返回的默认值
if not os.path.exists(file_path):
logger.warning(f"{self.log_prefix} 文件不存在: {file_path}")
return default
Returns:
Any: JSON数据或默认值
"""
try:
# 如果是相对路径,则相对于调用者的插件目录
if not os.path.isabs(file_path):
caller_frame = inspect.currentframe().f_back
plugin_dir = get_plugin_path(caller_frame)
file_path = os.path.join(plugin_dir, file_path)
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
logger.error(f"{self.log_prefix} 读取JSON文件出错: {e}")
if not os.path.exists(file_path):
logger.warning(f"[UtilsAPI] 文件不存在: {file_path}")
return default
def write_json_file(self, file_path: str, data: Any, indent: int = 2) -> bool:
"""写入JSON文件
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
logger.error(f"[UtilsAPI] 读取JSON文件出错: {e}")
return default
Args:
file_path: 文件路径,可以是相对于插件目录的路径
data: 要写入的数据
indent: JSON缩进
Returns:
bool: 是否写入成功
"""
try:
# 如果是相对路径,则相对于插件目录
if not os.path.isabs(file_path):
file_path = os.path.join(self.get_plugin_path(), file_path)
def write_json_file(file_path: str, data: Any, indent: int = 2) -> bool:
"""写入JSON文件
# 确保目录存在
os.makedirs(os.path.dirname(file_path), exist_ok=True)
Args:
file_path: 文件路径,可以是相对于插件目录的路径
data: 要写入的数据
indent: JSON缩进
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=indent)
return True
except Exception as e:
logger.error(f"{self.log_prefix} 写入JSON文件出错: {e}")
return False
Returns:
bool: 是否写入成功
"""
try:
# 如果是相对路径,则相对于调用者的插件目录
if not os.path.isabs(file_path):
caller_frame = inspect.currentframe().f_back
plugin_dir = get_plugin_path(caller_frame)
file_path = os.path.join(plugin_dir, file_path)
def get_timestamp(self) -> int:
"""获取当前时间戳
# 确保目录存在
os.makedirs(os.path.dirname(file_path), exist_ok=True)
Returns:
int: 当前时间戳(秒)
"""
return int(time.time())
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=indent)
return True
except Exception as e:
logger.error(f"[UtilsAPI] 写入JSON文件出错: {e}")
return False
def format_time(self, timestamp: Optional[int] = None, format_str: str = "%Y-%m-%d %H:%M:%S") -> str:
"""格式化时间
Args:
timestamp: 时间戳如果为None则使用当前时间
format_str: 时间格式字符串
# =============================================================================
# 时间相关API函数
# =============================================================================
Returns:
str: 格式化后的时间字符串
"""
import datetime
def get_timestamp() -> int:
"""获取当前时间戳
Returns:
int: 当前时间戳(秒)
"""
return int(time.time())
def format_time(timestamp: Optional[int] = None, format_str: str = "%Y-%m-%d %H:%M:%S") -> str:
"""格式化时间
Args:
timestamp: 时间戳如果为None则使用当前时间
format_str: 时间格式字符串
Returns:
str: 格式化后的时间字符串
"""
try:
if timestamp is None:
timestamp = time.time()
return datetime.datetime.fromtimestamp(timestamp).strftime(format_str)
except Exception as e:
logger.error(f"[UtilsAPI] 格式化时间失败: {e}")
return ""
def parse_time(self, time_str: str, format_str: str = "%Y-%m-%d %H:%M:%S") -> int:
"""解析时间字符串为时间戳
Args:
time_str: 时间字符串
format_str: 时间格式字符串
def parse_time(time_str: str, format_str: str = "%Y-%m-%d %H:%M:%S") -> int:
"""解析时间字符串为时间戳
Returns:
int: 时间戳(秒)
"""
import datetime
Args:
time_str: 时间字符串
format_str: 时间格式字符串
Returns:
int: 时间戳(秒)
"""
try:
dt = datetime.datetime.strptime(time_str, format_str)
return int(dt.timestamp())
except Exception as e:
logger.error(f"[UtilsAPI] 解析时间失败: {e}")
return 0
def generate_unique_id(self) -> str:
"""生成唯一ID
Returns:
str: 唯一ID
"""
import uuid
# =============================================================================
# 其他工具函数
# =============================================================================
return str(uuid.uuid4())
def generate_unique_id() -> str:
"""生成唯一ID
Returns:
str: 唯一ID
"""
return str(uuid.uuid4())