docs(maizone): 为回复跟踪服务添加详细文档和注释

为 ReplyTrackerService 类及其所有方法添加了全面的文档字符串(docstrings)和内联注释。

此次更新旨在提高代码的可读性和可维护性,详细阐明了以下方面:
- 服务的核心职责和初始化流程。
- 从旧文件系统到新存储API的一次性数据迁移逻辑。
- 各个公共和私有方法的具体功能、参数及作用。
This commit is contained in:
minecraft1024a
2025-11-15 17:05:55 +08:00
parent 79ff981776
commit bd45899dce

View File

@@ -13,30 +13,43 @@ import orjson
from src.common.logger import get_logger from src.common.logger import get_logger
from src.plugin_system.apis.storage_api import get_local_storage from src.plugin_system.apis.storage_api import get_local_storage
# 初始化日志记录器
logger = get_logger("MaiZone.ReplyTrackerService") logger = get_logger("MaiZone.ReplyTrackerService")
class ReplyTrackerService: class ReplyTrackerService:
""" """
评论回复跟踪服务 评论回复跟踪服务
使用插件存储API持久化存储已回复的评论ID
本服务负责持久化存储已回复的评论ID以防止对同一评论的重复回复。
它利用了插件系统的 `storage_api` 来实现统一和安全的数据管理。
在初始化时它还会自动处理从旧版文件存储到新版API的数据迁移。
""" """
def __init__(self): def __init__(self):
# 使用新的存储API """
初始化回复跟踪服务。
- 获取专用的插件存储实例。
- 设置数据清理的配置。
- 执行一次性数据迁移(如果需要)。
- 从存储中加载已有的回复记录。
"""
# 使用插件存储API获取一个名为 "maizone_reply_tracker" 的专属存储空间
self.storage = get_local_storage("maizone_reply_tracker") self.storage = get_local_storage("maizone_reply_tracker")
# 内存中已回复评论记录 # 内存中维护已回复评论记录,以提高访问速度
# 格式: {feed_id: {comment_id: timestamp, ...}, ...} # 数据结构为: {feed_id: {comment_id: timestamp, ...}, ...}
self.replied_comments: dict[str, dict[str, float]] = {} self.replied_comments: dict[str, dict[str, float]] = {}
# 数据清理配置 # 配置记录的最大保留天数,过期将被清理
self.max_record_days = 30 # 保留30天的记录 self.max_record_days = 30
# --- 一次性数据迁移 --- # --- 核心初始化流程 ---
# 步骤1: 检查并执行从旧文件到新存储API的一次性数据迁移
self._perform_one_time_migration() self._perform_one_time_migration()
# 从新存储加载数据 # 步骤2: 从新存储API中加载数据来初始化服务状态
initial_data = self.storage.get("data", {}) initial_data = self.storage.get("data", {})
if self._validate_data(initial_data): if self._validate_data(initial_data):
self.replied_comments = initial_data self.replied_comments = initial_data
@@ -45,64 +58,96 @@ class ReplyTrackerService:
f"总计 {sum(len(comments) for comments in self.replied_comments.values())} 条评论" f"总计 {sum(len(comments) for comments in self.replied_comments.values())} 条评论"
) )
else: else:
# 如果数据格式校验失败,则初始化为空字典以保证服务的稳定性
logger.error("从存储API加载的数据格式无效将创建新的记录") logger.error("从存储API加载的数据格式无效将创建新的记录")
self.replied_comments = {} self.replied_comments = {}
logger.debug(f"ReplyTrackerService initialized with data file: {self.storage.file_path}") logger.debug(f"ReplyTrackerService 初始化完成,使用数据文件: {self.storage.file_path}")
def _perform_one_time_migration(self): def _perform_one_time_migration(self):
""" """
执行一次性数据迁移从旧的JSON文件到新的存储API 执行一次性数据迁移。
该函数会检查是否存在旧的 `replied_comments.json` 文件。
如果存在它会读取数据验证其格式将其写入新的存储API
然后将旧文件重命名为备份文件,以完成迁移。
这是一个安全操作,旨在平滑过渡。
""" """
# 定义旧数据文件的路径
old_data_file = Path(__file__).resolve().parent.parent / "data" / "replied_comments.json" old_data_file = Path(__file__).resolve().parent.parent / "data" / "replied_comments.json"
# 仅当旧文件存在时才执行迁移
if old_data_file.exists(): if old_data_file.exists():
logger.info(f"检测到旧的数据文件 '{old_data_file}',开始执行一次性迁移...") logger.info(f"检测到旧的数据文件 '{old_data_file}',开始执行一次性迁移...")
try: try:
# 读取旧文件内容
with open(old_data_file, "rb") as f: with open(old_data_file, "rb") as f:
file_content = f.read() file_content = f.read()
# 如果文件为空,直接删除,无需迁移
if not file_content.strip(): if not file_content.strip():
logger.warning("旧数据文件为空,无需迁移。") logger.warning("旧数据文件为空,无需迁移。")
os.remove(old_data_file) os.remove(old_data_file)
logger.info(f"空的旧数据文件 '{old_data_file}' 已被删除。") logger.info(f"空的旧数据文件 '{old_data_file}' 已被删除。")
return return
# 解析JSON数据
old_data = orjson.loads(file_content) old_data = orjson.loads(file_content)
# 验证数据格式是否正确
if self._validate_data(old_data): if self._validate_data(old_data):
# 将数据写入新存储 # 验证通过,将数据写入新存储API
self.storage.set("data", old_data) self.storage.set("data", old_data)
# 立即强制保存确保迁移完成 # 立即强制保存确保迁移数据落盘
self.storage._save_data() self.storage._save_data()
logger.info("旧数据已成功迁移到新的存储API。") logger.info("旧数据已成功迁移到新的存储API。")
# 备份旧文件而不是删除
# 将旧文件重命名为备份文件,而不是直接删除,以防万一
backup_file = old_data_file.with_suffix(f".json.bak.migrated.{int(time.time())}") backup_file = old_data_file.with_suffix(f".json.bak.migrated.{int(time.time())}")
old_data_file.rename(backup_file) old_data_file.rename(backup_file)
logger.info(f"旧数据文件已成功迁移并备份为: {backup_file}") logger.info(f"旧数据文件已成功迁移并备份为: {backup_file}")
else: else:
# 如果数据格式无效,迁移中止,并备份损坏的文件
logger.error("旧数据文件格式无效,迁移中止。") logger.error("旧数据文件格式无效,迁移中止。")
backup_file = old_data_file.with_suffix(f".json.bak.invalid.{int(time.time())}") backup_file = old_data_file.with_suffix(f".json.bak.invalid.{int(time.time())}")
old_data_file.rename(backup_file) old_data_file.rename(backup_file)
logger.warning(f"已将无效的旧数据文件备份为: {backup_file}") logger.warning(f"已将无效的旧数据文件备份为: {backup_file}")
except Exception as e: except Exception as e:
# 捕获迁移过程中可能出现的任何异常
logger.error(f"迁移旧数据文件时发生错误: {e}", exc_info=True) logger.error(f"迁移旧数据文件时发生错误: {e}", exc_info=True)
def _validate_data(self, data: Any) -> bool: def _validate_data(self, data: Any) -> bool:
"""验证加载的数据格式是否正确""" """
验证加载的数据格式是否正确。
Args:
data (Any): 待验证的数据。
Returns:
bool: 如果数据格式符合预期则返回 True否则返回 False。
"""
# 顶级结构必须是字典
if not isinstance(data, dict): if not isinstance(data, dict):
logger.error("加载的数据不是字典格式") logger.error("加载的数据不是字典格式")
return False return False
# 遍历每个说说(feed)的记录
for feed_id, comments in data.items(): for feed_id, comments in data.items():
# 说说ID必须是字符串
if not isinstance(feed_id, str): if not isinstance(feed_id, str):
logger.error(f"无效的说说ID格式: {feed_id}") logger.error(f"无效的说说ID格式: {feed_id}")
return False return False
# 评论记录必须是字典
if not isinstance(comments, dict): if not isinstance(comments, dict):
logger.error(f"说说 {feed_id} 的评论数据不是字典格式") logger.error(f"说说 {feed_id} 的评论数据不是字典格式")
return False return False
# 遍历每条评论
for comment_id, timestamp in comments.items(): for comment_id, timestamp in comments.items():
# 评论ID必须是字符串或整数
if not isinstance(comment_id, str | int): if not isinstance(comment_id, str | int):
logger.error(f"无效的评论ID格式: {comment_id}") logger.error(f"无效的评论ID格式: {comment_id}")
return False return False
# 时间戳必须是整数或浮点数
if not isinstance(timestamp, int | float): if not isinstance(timestamp, int | float):
logger.error(f"无效的时间戳格式: {timestamp}") logger.error(f"无效的时间戳格式: {timestamp}")
return False return False
@@ -111,36 +156,47 @@ class ReplyTrackerService:
def _persist_data(self): def _persist_data(self):
""" """
清理、验证并持久化数据到存储API。 清理、验证并持久化数据到存储API。
这是一个核心的内部方法,用于将内存中的 `self.replied_comments` 数据
通过 `storage_api` 保存到磁盘。它封装了清理和验证的逻辑。
""" """
try: try:
# 第一步:清理内存中的过期记录
self._cleanup_old_records() self._cleanup_old_records()
# 第二步:验证当前数据格式是否有效,防止坏数据写入
if not self._validate_data(self.replied_comments): if not self._validate_data(self.replied_comments):
logger.error("当前内存中的数据格式无效,取消保存") logger.error("当前内存中的数据格式无效,取消保存")
return return
# 第三步调用存储API的set方法将数据暂存。API会处理后续的延迟写入
self.storage.set("data", self.replied_comments) self.storage.set("data", self.replied_comments)
logger.debug(f"回复记录已暂存将由存储API在后台保存") logger.debug("回复记录已暂存将由存储API在后台保存")
except Exception as e: except Exception as e:
logger.error(f"持久化回复记录失败: {e}", exc_info=True) logger.error(f"持久化回复记录失败: {e}", exc_info=True)
def _cleanup_old_records(self): def _cleanup_old_records(self):
"""清理超过保留期限的记录""" """
清理内存中超过保留期限的回复记录。
"""
current_time = time.time() current_time = time.time()
# 计算N天前的时间戳作为清理的阈值
cutoff_time = current_time - (self.max_record_days * 24 * 60 * 60) cutoff_time = current_time - (self.max_record_days * 24 * 60 * 60)
total_removed = 0 total_removed = 0
# 找出所有评论都已过期的说说记录
feeds_to_remove = [ feeds_to_remove = [
feed_id feed_id
for feed_id, comments in self.replied_comments.items() for feed_id, comments in self.replied_comments.items()
if not any(timestamp >= cutoff_time for timestamp in comments.values()) if not any(timestamp >= cutoff_time for timestamp in comments.values())
] ]
# 先移除整个过期的说说 # 先整体移除这些完全过期的说说记录,效率更高
for feed_id in feeds_to_remove: for feed_id in feeds_to_remove:
total_removed += len(self.replied_comments[feed_id]) total_removed += len(self.replied_comments[feed_id])
del self.replied_comments[feed_id] del self.replied_comments[feed_id]
# 再清理部分过期的评论 # 然后遍历剩余的说说,清理其中部分过期的评论记录
for feed_id, comments in self.replied_comments.items(): for feed_id, comments in self.replied_comments.items():
comments_to_remove = [comment_id for comment_id, timestamp in comments.items() if timestamp < cutoff_time] comments_to_remove = [comment_id for comment_id, timestamp in comments.items() if timestamp < cutoff_time]
for comment_id in comments_to_remove: for comment_id in comments_to_remove:
@@ -151,52 +207,104 @@ class ReplyTrackerService:
logger.info(f"清理了 {total_removed} 条超过{self.max_record_days}天的过期回复记录") logger.info(f"清理了 {total_removed} 条超过{self.max_record_days}天的过期回复记录")
def has_replied(self, feed_id: str, comment_id: str | int) -> bool: def has_replied(self, feed_id: str, comment_id: str | int) -> bool:
"""检查是否已经回复过指定的评论""" """
检查是否已经回复过指定的评论。
Args:
feed_id (str): 说说ID。
comment_id (str | int): 评论ID。
Returns:
bool: 如果已回复过返回True否则返回False。
"""
if not feed_id or comment_id is None: if not feed_id or comment_id is None:
return False return False
# 将评论ID统一转为字符串进行比较
comment_id_str = str(comment_id) comment_id_str = str(comment_id)
return feed_id in self.replied_comments and comment_id_str in self.replied_comments[feed_id] return feed_id in self.replied_comments and comment_id_str in self.replied_comments[feed_id]
def mark_as_replied(self, feed_id: str, comment_id: str | int): def mark_as_replied(self, feed_id: str, comment_id: str | int):
"""标记指定评论为已回复""" """
标记指定评论为已回复,并触发数据持久化。
Args:
feed_id (str): 说说ID。
comment_id (str | int): 评论ID。
"""
if not feed_id or comment_id is None: if not feed_id or comment_id is None:
logger.warning("feed_id 或 comment_id 为空,无法标记为已回复") logger.warning("feed_id 或 comment_id 为空,无法标记为已回复")
return return
# 将评论ID统一转为字符串作为键
comment_id_str = str(comment_id) comment_id_str = str(comment_id)
# 如果是该说说下的第一条回复,则初始化内层字典
if feed_id not in self.replied_comments: if feed_id not in self.replied_comments:
self.replied_comments[feed_id] = {} self.replied_comments[feed_id] = {}
# 记录回复时间
self.replied_comments[feed_id][comment_id_str] = time.time() self.replied_comments[feed_id][comment_id_str] = time.time()
# 调用持久化方法保存数据
self._persist_data() self._persist_data()
logger.info(f"已标记评论为已回复: feed_id={feed_id}, comment_id={comment_id}") logger.info(f"已标记评论为已回复: feed_id={feed_id}, comment_id={comment_id}")
def get_replied_comments(self, feed_id: str) -> set[str]: def get_replied_comments(self, feed_id: str) -> set[str]:
"""获取指定说说下所有已回复的评论ID""" """
获取指定说说下所有已回复的评论ID集合。
Args:
feed_id (str): 说说ID。
Returns:
set[str]: 已回复的评论ID集合。
"""
# 使用 .get() 避免当 feed_id 不存在时发生KeyError
return {str(cid) for cid in self.replied_comments.get(feed_id, {}).keys()} return {str(cid) for cid in self.replied_comments.get(feed_id, {}).keys()}
def get_stats(self) -> dict[str, Any]: def get_stats(self) -> dict[str, Any]:
"""获取回复记录统计信息""" """
获取回复记录的统计信息。
Returns:
dict[str, Any]: 包含统计信息的字典。
"""
total_feeds = len(self.replied_comments) total_feeds = len(self.replied_comments)
total_replies = sum(len(comments) for comments in self.replied_comments.values()) total_replies = sum(len(comments) for comments in self.replied_comments.values())
return { return {
"total_feeds_with_replies": total_feeds, "total_feeds_with_replies": total_feeds,
"total_replied_comments": total_replies, "total_replied_comments": total_replies,
# 从存储实例获取准确的数据文件路径
"data_file": str(self.storage.file_path), "data_file": str(self.storage.file_path),
"max_record_days": self.max_record_days, "max_record_days": self.max_record_days,
} }
def remove_reply_record(self, feed_id: str, comment_id: str): def remove_reply_record(self, feed_id: str, comment_id: str):
"""移除指定评论的回复记录""" """
移除指定评论的回复记录。
Args:
feed_id (str): 说说ID。
comment_id (str): 评论ID。
"""
# 确保记录存在再执行删除
if feed_id in self.replied_comments and comment_id in self.replied_comments[feed_id]: if feed_id in self.replied_comments and comment_id in self.replied_comments[feed_id]:
del self.replied_comments[feed_id][comment_id] del self.replied_comments[feed_id][comment_id]
# 如果该说说下已无任何回复记录,则清理掉整个条目
if not self.replied_comments[feed_id]: if not self.replied_comments[feed_id]:
del self.replied_comments[feed_id] del self.replied_comments[feed_id]
# 调用持久化方法保存更改
self._persist_data() self._persist_data()
logger.debug(f"已移除回复记录: feed_id={feed_id}, comment_id={comment_id}") logger.debug(f"已移除回复记录: feed_id={feed_id}, comment_id={comment_id}")
def remove_feed_records(self, feed_id: str): def remove_feed_records(self, feed_id: str):
"""移除指定说说的所有回复记录""" """
移除指定说说的所有回复记录。
Args:
feed_id (str): 说说ID。
"""
# 确保记录存在再执行删除
if feed_id in self.replied_comments: if feed_id in self.replied_comments:
del self.replied_comments[feed_id] del self.replied_comments[feed_id]
# 调用持久化方法保存更改
self._persist_data() self._persist_data()
logger.info(f"已移除说说 {feed_id} 的所有回复记录") logger.info(f"已移除说说 {feed_id} 的所有回复记录")