docs(maizone): 为回复跟踪服务添加详细文档和注释
为 ReplyTrackerService 类及其所有方法添加了全面的文档字符串(docstrings)和内联注释。 此次更新旨在提高代码的可读性和可维护性,详细阐明了以下方面: - 服务的核心职责和初始化流程。 - 从旧文件系统到新存储API的一次性数据迁移逻辑。 - 各个公共和私有方法的具体功能、参数及作用。
This commit is contained in:
committed by
Windpicker-owo
parent
64ba38312e
commit
7475f87826
@@ -13,30 +13,43 @@ import orjson
|
||||
from src.common.logger import get_logger
|
||||
from src.plugin_system.apis.storage_api import get_local_storage
|
||||
|
||||
# 初始化日志记录器
|
||||
logger = get_logger("MaiZone.ReplyTrackerService")
|
||||
|
||||
|
||||
class ReplyTrackerService:
|
||||
"""
|
||||
评论回复跟踪服务
|
||||
使用插件存储API持久化存储已回复的评论ID
|
||||
|
||||
本服务负责持久化存储已回复的评论ID,以防止对同一评论的重复回复。
|
||||
它利用了插件系统的 `storage_api` 来实现统一和安全的数据管理。
|
||||
在初始化时,它还会自动处理从旧版文件存储到新版API的数据迁移。
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# 使用新的存储API
|
||||
"""
|
||||
初始化回复跟踪服务。
|
||||
|
||||
- 获取专用的插件存储实例。
|
||||
- 设置数据清理的配置。
|
||||
- 执行一次性数据迁移(如果需要)。
|
||||
- 从存储中加载已有的回复记录。
|
||||
"""
|
||||
# 使用插件存储API,获取一个名为 "maizone_reply_tracker" 的专属存储空间
|
||||
self.storage = get_local_storage("maizone_reply_tracker")
|
||||
|
||||
# 内存中的已回复评论记录
|
||||
# 格式: {feed_id: {comment_id: timestamp, ...}, ...}
|
||||
# 在内存中维护已回复的评论记录,以提高访问速度
|
||||
# 数据结构为: {feed_id: {comment_id: timestamp, ...}, ...}
|
||||
self.replied_comments: dict[str, dict[str, float]] = {}
|
||||
|
||||
# 数据清理配置
|
||||
self.max_record_days = 30 # 保留30天的记录
|
||||
# 配置记录的最大保留天数,过期将被清理
|
||||
self.max_record_days = 30
|
||||
|
||||
# --- 一次性数据迁移 ---
|
||||
# --- 核心初始化流程 ---
|
||||
# 步骤1: 检查并执行从旧文件到新存储API的一次性数据迁移
|
||||
self._perform_one_time_migration()
|
||||
|
||||
# 从新存储加载数据
|
||||
# 步骤2: 从新的存储API中加载数据来初始化服务状态
|
||||
initial_data = self.storage.get("data", {})
|
||||
if self._validate_data(initial_data):
|
||||
self.replied_comments = initial_data
|
||||
@@ -45,64 +58,96 @@ class ReplyTrackerService:
|
||||
f"总计 {sum(len(comments) for comments in self.replied_comments.values())} 条评论"
|
||||
)
|
||||
else:
|
||||
# 如果数据格式校验失败,则初始化为空字典以保证服务的稳定性
|
||||
logger.error("从存储API加载的数据格式无效,将创建新的记录")
|
||||
self.replied_comments = {}
|
||||
|
||||
logger.debug(f"ReplyTrackerService initialized with data file: {self.storage.file_path}")
|
||||
logger.debug(f"ReplyTrackerService 初始化完成,使用数据文件: {self.storage.file_path}")
|
||||
|
||||
def _perform_one_time_migration(self):
|
||||
"""
|
||||
执行一次性数据迁移,从旧的JSON文件到新的存储API。
|
||||
执行一次性数据迁移。
|
||||
|
||||
该函数会检查是否存在旧的 `replied_comments.json` 文件。
|
||||
如果存在,它会读取数据,验证其格式,将其写入新的存储API,
|
||||
然后将旧文件重命名为备份文件,以完成迁移。
|
||||
这是一个安全操作,旨在平滑过渡。
|
||||
"""
|
||||
# 定义旧数据文件的路径
|
||||
old_data_file = Path(__file__).resolve().parent.parent / "data" / "replied_comments.json"
|
||||
|
||||
# 仅当旧文件存在时才执行迁移
|
||||
if old_data_file.exists():
|
||||
logger.info(f"检测到旧的数据文件 '{old_data_file}',开始执行一次性迁移...")
|
||||
try:
|
||||
# 读取旧文件内容
|
||||
with open(old_data_file, "rb") as f:
|
||||
file_content = f.read()
|
||||
# 如果文件为空,直接删除,无需迁移
|
||||
if not file_content.strip():
|
||||
logger.warning("旧数据文件为空,无需迁移。")
|
||||
os.remove(old_data_file)
|
||||
logger.info(f"空的旧数据文件 '{old_data_file}' 已被删除。")
|
||||
return
|
||||
|
||||
# 解析JSON数据
|
||||
old_data = orjson.loads(file_content)
|
||||
|
||||
# 验证数据格式是否正确
|
||||
if self._validate_data(old_data):
|
||||
# 将数据写入新存储
|
||||
# 验证通过,将数据写入新的存储API
|
||||
self.storage.set("data", old_data)
|
||||
# 立即强制保存以确保迁移完成
|
||||
# 立即强制保存,确保迁移数据落盘
|
||||
self.storage._save_data()
|
||||
logger.info("旧数据已成功迁移到新的存储API。")
|
||||
# 备份旧文件而不是删除
|
||||
|
||||
# 将旧文件重命名为备份文件,而不是直接删除,以防万一
|
||||
backup_file = old_data_file.with_suffix(f".json.bak.migrated.{int(time.time())}")
|
||||
old_data_file.rename(backup_file)
|
||||
logger.info(f"旧数据文件已成功迁移并备份为: {backup_file}")
|
||||
else:
|
||||
# 如果数据格式无效,迁移中止,并备份损坏的文件
|
||||
logger.error("旧数据文件格式无效,迁移中止。")
|
||||
backup_file = old_data_file.with_suffix(f".json.bak.invalid.{int(time.time())}")
|
||||
old_data_file.rename(backup_file)
|
||||
logger.warning(f"已将无效的旧数据文件备份为: {backup_file}")
|
||||
|
||||
except Exception as e:
|
||||
# 捕获迁移过程中可能出现的任何异常
|
||||
logger.error(f"迁移旧数据文件时发生错误: {e}", exc_info=True)
|
||||
|
||||
def _validate_data(self, data: Any) -> bool:
|
||||
"""验证加载的数据格式是否正确"""
|
||||
"""
|
||||
验证加载的数据格式是否正确。
|
||||
|
||||
Args:
|
||||
data (Any): 待验证的数据。
|
||||
|
||||
Returns:
|
||||
bool: 如果数据格式符合预期则返回 True,否则返回 False。
|
||||
"""
|
||||
# 顶级结构必须是字典
|
||||
if not isinstance(data, dict):
|
||||
logger.error("加载的数据不是字典格式")
|
||||
return False
|
||||
|
||||
# 遍历每个说说(feed)的记录
|
||||
for feed_id, comments in data.items():
|
||||
# 说说ID必须是字符串
|
||||
if not isinstance(feed_id, str):
|
||||
logger.error(f"无效的说说ID格式: {feed_id}")
|
||||
return False
|
||||
# 评论记录必须是字典
|
||||
if not isinstance(comments, dict):
|
||||
logger.error(f"说说 {feed_id} 的评论数据不是字典格式")
|
||||
return False
|
||||
# 遍历每条评论
|
||||
for comment_id, timestamp in comments.items():
|
||||
# 评论ID必须是字符串或整数
|
||||
if not isinstance(comment_id, str | int):
|
||||
logger.error(f"无效的评论ID格式: {comment_id}")
|
||||
return False
|
||||
# 时间戳必须是整数或浮点数
|
||||
if not isinstance(timestamp, int | float):
|
||||
logger.error(f"无效的时间戳格式: {timestamp}")
|
||||
return False
|
||||
@@ -111,36 +156,47 @@ class ReplyTrackerService:
|
||||
def _persist_data(self):
|
||||
"""
|
||||
清理、验证并持久化数据到存储API。
|
||||
|
||||
这是一个核心的内部方法,用于将内存中的 `self.replied_comments` 数据
|
||||
通过 `storage_api` 保存到磁盘。它封装了清理和验证的逻辑。
|
||||
"""
|
||||
try:
|
||||
# 第一步:清理内存中的过期记录
|
||||
self._cleanup_old_records()
|
||||
|
||||
# 第二步:验证当前数据格式是否有效,防止坏数据写入
|
||||
if not self._validate_data(self.replied_comments):
|
||||
logger.error("当前内存中的数据格式无效,取消保存")
|
||||
return
|
||||
|
||||
# 第三步:调用存储API的set方法,将数据暂存。API会处理后续的延迟写入
|
||||
self.storage.set("data", self.replied_comments)
|
||||
logger.debug(f"回复记录已暂存,将由存储API在后台保存")
|
||||
logger.debug("回复记录已暂存,将由存储API在后台保存")
|
||||
except Exception as e:
|
||||
logger.error(f"持久化回复记录失败: {e}", exc_info=True)
|
||||
|
||||
def _cleanup_old_records(self):
|
||||
"""清理超过保留期限的记录"""
|
||||
"""
|
||||
清理内存中超过保留期限的回复记录。
|
||||
"""
|
||||
current_time = time.time()
|
||||
# 计算N天前的时间戳,作为清理的阈值
|
||||
cutoff_time = current_time - (self.max_record_days * 24 * 60 * 60)
|
||||
total_removed = 0
|
||||
|
||||
# 找出所有评论都已过期的说说记录
|
||||
feeds_to_remove = [
|
||||
feed_id
|
||||
for feed_id, comments in self.replied_comments.items()
|
||||
if not any(timestamp >= cutoff_time for timestamp in comments.values())
|
||||
]
|
||||
|
||||
# 先移除整个过期的说说
|
||||
# 先整体移除这些完全过期的说说记录,效率更高
|
||||
for feed_id in feeds_to_remove:
|
||||
total_removed += len(self.replied_comments[feed_id])
|
||||
del self.replied_comments[feed_id]
|
||||
|
||||
# 再清理部分过期的评论
|
||||
# 然后遍历剩余的说说,清理其中部分过期的评论记录
|
||||
for feed_id, comments in self.replied_comments.items():
|
||||
comments_to_remove = [comment_id for comment_id, timestamp in comments.items() if timestamp < cutoff_time]
|
||||
for comment_id in comments_to_remove:
|
||||
@@ -151,52 +207,104 @@ class ReplyTrackerService:
|
||||
logger.info(f"清理了 {total_removed} 条超过{self.max_record_days}天的过期回复记录")
|
||||
|
||||
def has_replied(self, feed_id: str, comment_id: str | int) -> bool:
|
||||
"""检查是否已经回复过指定的评论"""
|
||||
"""
|
||||
检查是否已经回复过指定的评论。
|
||||
|
||||
Args:
|
||||
feed_id (str): 说说ID。
|
||||
comment_id (str | int): 评论ID。
|
||||
|
||||
Returns:
|
||||
bool: 如果已回复过返回True,否则返回False。
|
||||
"""
|
||||
if not feed_id or comment_id is None:
|
||||
return False
|
||||
# 将评论ID统一转为字符串进行比较
|
||||
comment_id_str = str(comment_id)
|
||||
return feed_id in self.replied_comments and comment_id_str in self.replied_comments[feed_id]
|
||||
|
||||
def mark_as_replied(self, feed_id: str, comment_id: str | int):
|
||||
"""标记指定评论为已回复"""
|
||||
"""
|
||||
标记指定评论为已回复,并触发数据持久化。
|
||||
|
||||
Args:
|
||||
feed_id (str): 说说ID。
|
||||
comment_id (str | int): 评论ID。
|
||||
"""
|
||||
if not feed_id or comment_id is None:
|
||||
logger.warning("feed_id 或 comment_id 为空,无法标记为已回复")
|
||||
return
|
||||
|
||||
# 将评论ID统一转为字符串作为键
|
||||
comment_id_str = str(comment_id)
|
||||
# 如果是该说说下的第一条回复,则初始化内层字典
|
||||
if feed_id not in self.replied_comments:
|
||||
self.replied_comments[feed_id] = {}
|
||||
# 记录回复时间
|
||||
self.replied_comments[feed_id][comment_id_str] = time.time()
|
||||
|
||||
# 调用持久化方法保存数据
|
||||
self._persist_data()
|
||||
logger.info(f"已标记评论为已回复: feed_id={feed_id}, comment_id={comment_id}")
|
||||
|
||||
def get_replied_comments(self, feed_id: str) -> set[str]:
|
||||
"""获取指定说说下所有已回复的评论ID"""
|
||||
"""
|
||||
获取指定说说下所有已回复的评论ID集合。
|
||||
|
||||
Args:
|
||||
feed_id (str): 说说ID。
|
||||
|
||||
Returns:
|
||||
set[str]: 已回复的评论ID集合。
|
||||
"""
|
||||
# 使用 .get() 避免当 feed_id 不存在时发生KeyError
|
||||
return {str(cid) for cid in self.replied_comments.get(feed_id, {}).keys()}
|
||||
|
||||
def get_stats(self) -> dict[str, Any]:
|
||||
"""获取回复记录统计信息"""
|
||||
"""
|
||||
获取回复记录的统计信息。
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: 包含统计信息的字典。
|
||||
"""
|
||||
total_feeds = len(self.replied_comments)
|
||||
total_replies = sum(len(comments) for comments in self.replied_comments.values())
|
||||
return {
|
||||
"total_feeds_with_replies": total_feeds,
|
||||
"total_replied_comments": total_replies,
|
||||
# 从存储实例获取准确的数据文件路径
|
||||
"data_file": str(self.storage.file_path),
|
||||
"max_record_days": self.max_record_days,
|
||||
}
|
||||
|
||||
def remove_reply_record(self, feed_id: str, comment_id: str):
|
||||
"""移除指定评论的回复记录"""
|
||||
"""
|
||||
移除指定评论的回复记录。
|
||||
|
||||
Args:
|
||||
feed_id (str): 说说ID。
|
||||
comment_id (str): 评论ID。
|
||||
"""
|
||||
# 确保记录存在再执行删除
|
||||
if feed_id in self.replied_comments and comment_id in self.replied_comments[feed_id]:
|
||||
del self.replied_comments[feed_id][comment_id]
|
||||
# 如果该说说下已无任何回复记录,则清理掉整个条目
|
||||
if not self.replied_comments[feed_id]:
|
||||
del self.replied_comments[feed_id]
|
||||
# 调用持久化方法保存更改
|
||||
self._persist_data()
|
||||
logger.debug(f"已移除回复记录: feed_id={feed_id}, comment_id={comment_id}")
|
||||
|
||||
def remove_feed_records(self, feed_id: str):
|
||||
"""移除指定说说的所有回复记录"""
|
||||
"""
|
||||
移除指定说说的所有回复记录。
|
||||
|
||||
Args:
|
||||
feed_id (str): 说说ID。
|
||||
"""
|
||||
# 确保记录存在再执行删除
|
||||
if feed_id in self.replied_comments:
|
||||
del self.replied_comments[feed_id]
|
||||
# 调用持久化方法保存更改
|
||||
self._persist_data()
|
||||
logger.info(f"已移除说说 {feed_id} 的所有回复记录")
|
||||
|
||||
Reference in New Issue
Block a user