feat(maizone): 引入持久化回复跟踪以避免重复回复

引入 `ReplyTrackerService` 来持久化跟踪已回复的评论,确保即使在程序重启后也不会对同一评论进行重复回复。

主要变更:
- 新增 `ReplyTrackerService`,用于记录和管理对特定说说下评论的回复状态。
- 在 `QZoneService` 中重构评论回复逻辑,利用 `ReplyTrackerService` 来判断评论是否已被回复。
- 增加逻辑以验证和清理无效的回复记录,例如当用户手动删除了机器人的回复后,程序能够识别并清除相应的记录,从而可以重新进行回复。
- 将 `ReplyTrackerService` 注册为全局服务,以便在插件内部共享。
This commit is contained in:
tt-P607
2025-08-28 21:02:33 +08:00
parent 4b6542b8e0
commit 3d958b9e05
3 changed files with 262 additions and 22 deletions

View File

@@ -24,6 +24,7 @@ from .services.qzone_service import QZoneService
from .services.scheduler_service import SchedulerService
from .services.monitor_service import MonitorService
from .services.cookie_service import CookieService
from .services.reply_tracker_service import ReplyTrackerService
from .services.manager import register_service
logger = get_logger("MaiZone.Plugin")
@@ -92,11 +93,13 @@ class MaiZoneRefactoredPlugin(BasePlugin):
content_service = ContentService(self.get_config)
image_service = ImageService(self.get_config)
cookie_service = CookieService(self.get_config)
reply_tracker_service = ReplyTrackerService()
qzone_service = QZoneService(self.get_config, content_service, image_service, cookie_service)
scheduler_service = SchedulerService(self.get_config, qzone_service)
monitor_service = MonitorService(self.get_config, qzone_service)
register_service("qzone", qzone_service)
register_service("reply_tracker", reply_tracker_service)
register_service("get_config", self.get_config)
asyncio.create_task(scheduler_service.start())

View File

@@ -27,6 +27,7 @@ from src.chat.utils.chat_message_builder import (
from .content_service import ContentService
from .image_service import ImageService
from .cookie_service import CookieService
from .reply_tracker_service import ReplyTrackerService
logger = get_logger("MaiZone.QZoneService")
@@ -55,6 +56,7 @@ class QZoneService:
self.content_service = content_service
self.image_service = image_service
self.cookie_service = cookie_service
self.reply_tracker = ReplyTrackerService()
# --- Public Methods (High-Level Business Logic) ---
@@ -249,43 +251,83 @@ class QZoneService:
content = feed.get("content", "")
fid = feed.get("tid", "")
if not comments:
return
# 筛选出未被自己回复过的评论
if not comments:
if not comments or not fid:
return
# 1. 将评论分为用户评论和自己的回复
user_comments = [c for c in comments if str(c.get('qq_account')) != str(qq_account)]
my_replies = [c for c in comments if str(c.get('qq_account')) == str(qq_account)]
if not user_comments:
return
# 2. 获取所有已经被我回复过的评论的ID
replied_comment_ids = {reply.get('parent_tid') for reply in my_replies if reply.get('parent_tid')}
# 2. 验证已记录的回复是否仍然存在,清理已删除的回复记录
await self._validate_and_cleanup_reply_records(fid, my_replies)
# 3. 找出所有尚未被回复过的用户评论
comments_to_reply = [
comment for comment in user_comments
if comment.get('comment_tid') not in replied_comment_ids
]
# 3. 使用验证后的持久化记录来筛选未回复的评论
comments_to_reply = []
for comment in user_comments:
comment_tid = comment.get('comment_tid')
if not comment_tid:
continue
# 检查是否已经在持久化记录中标记为已回复
if not self.reply_tracker.has_replied(fid, comment_tid):
comments_to_reply.append(comment)
if not comments_to_reply:
logger.debug(f"说说 {fid} 下的所有评论都已回复过")
return
logger.info(f"发现自己说说下的 {len(comments_to_reply)} 条新评论,准备回复...")
for comment in comments_to_reply:
reply_content = await self.content_service.generate_comment_reply(
content, comment.get("content", ""), comment.get("nickname", "")
)
if reply_content:
success = await api_client["reply"](
fid, qq_account, comment.get("nickname", ""), reply_content, comment.get("comment_tid")
comment_tid = comment.get("comment_tid")
nickname = comment.get("nickname", "")
comment_content = comment.get("content", "")
try:
reply_content = await self.content_service.generate_comment_reply(
content, comment_content, nickname
)
if success:
logger.info(f"成功回复'{comment.get('nickname', '')}'的评论: '{reply_content}'")
if reply_content:
success = await api_client["reply"](
fid, qq_account, nickname, reply_content, comment_tid
)
if success:
# 标记为已回复
self.reply_tracker.mark_as_replied(fid, comment_tid)
logger.info(f"成功回复'{nickname}'的评论: '{reply_content}'")
else:
logger.error(f"回复'{nickname}'的评论失败")
await asyncio.sleep(random.uniform(10, 20))
else:
logger.error(f"回复'{comment.get('nickname', '')}'的评论失败")
await asyncio.sleep(random.uniform(10, 20))
logger.warning(f"生成回复内容失败,跳过回复'{nickname}'的评论")
except Exception as e:
logger.error(f"回复'{nickname}'的评论时发生异常: {e}", exc_info=True)
async def _validate_and_cleanup_reply_records(self, fid: str, my_replies: List[Dict]):
"""验证并清理已删除的回复记录"""
# 获取当前记录中该说说的所有已回复评论ID
recorded_replied_comments = self.reply_tracker.get_replied_comments(fid)
if not recorded_replied_comments:
return
# 从API返回的我的回复中提取parent_tid即被回复的评论ID
current_replied_comments = set()
for reply in my_replies:
parent_tid = reply.get('parent_tid')
if parent_tid:
current_replied_comments.add(parent_tid)
# 找出记录中有但实际已不存在的回复
deleted_replies = recorded_replied_comments - current_replied_comments
if deleted_replies:
logger.info(f"检测到 {len(deleted_replies)} 个回复已被删除,清理记录...")
for comment_tid in deleted_replies:
self.reply_tracker.remove_reply_record(fid, comment_tid)
logger.debug(f"已清理删除的回复记录: feed_id={fid}, comment_id={comment_tid}")
async def _process_single_feed(self, feed: Dict, api_client: Dict, target_qq: str, target_name: str):
"""处理单条说说,决定是否评论和点赞"""

View File

@@ -0,0 +1,195 @@
# -*- coding: utf-8 -*-
"""
评论回复跟踪服务
负责记录和管理已回复过的评论ID避免重复回复
"""
import json
import time
from pathlib import Path
from typing import Set, Dict, Any
from src.common.logger import get_logger
logger = get_logger("MaiZone.ReplyTrackerService")
class ReplyTrackerService:
"""
评论回复跟踪服务
使用本地JSON文件持久化存储已回复的评论ID
"""
def __init__(self):
# 数据存储路径
self.data_dir = Path(__file__).resolve().parent.parent / "data"
self.data_dir.mkdir(exist_ok=True)
self.reply_record_file = self.data_dir / "replied_comments.json"
# 内存中的已回复评论记录
# 格式: {feed_id: {comment_id: timestamp, ...}, ...}
self.replied_comments: Dict[str, Dict[str, float]] = {}
# 数据清理配置
self.max_record_days = 30 # 保留30天的记录
# 加载已有数据
self._load_data()
def _load_data(self):
"""从文件加载已回复评论数据"""
try:
if self.reply_record_file.exists():
with open(self.reply_record_file, 'r', encoding='utf-8') as f:
data = json.load(f)
self.replied_comments = data
logger.info(f"已加载 {len(self.replied_comments)} 条说说的回复记录")
else:
logger.info("未找到回复记录文件,将创建新的记录")
except Exception as e:
logger.error(f"加载回复记录失败: {e}")
self.replied_comments = {}
def _save_data(self):
"""保存已回复评论数据到文件"""
try:
# 清理过期数据
self._cleanup_old_records()
with open(self.reply_record_file, 'w', encoding='utf-8') as f:
json.dump(self.replied_comments, f, ensure_ascii=False, indent=2)
logger.debug("回复记录已保存")
except Exception as e:
logger.error(f"保存回复记录失败: {e}")
def _cleanup_old_records(self):
"""清理超过保留期限的记录"""
current_time = time.time()
cutoff_time = current_time - (self.max_record_days * 24 * 60 * 60)
feeds_to_remove = []
total_removed = 0
for feed_id, comments in self.replied_comments.items():
comments_to_remove = []
for comment_id, timestamp in comments.items():
if timestamp < cutoff_time:
comments_to_remove.append(comment_id)
# 移除过期的评论记录
for comment_id in comments_to_remove:
del comments[comment_id]
total_removed += 1
# 如果该说说下没有任何记录了,标记删除整个说说记录
if not comments:
feeds_to_remove.append(feed_id)
# 移除空的说说记录
for feed_id in feeds_to_remove:
del self.replied_comments[feed_id]
if total_removed > 0:
logger.info(f"清理了 {total_removed} 条过期的回复记录")
def has_replied(self, feed_id: str, comment_id: str) -> bool:
"""
检查是否已经回复过指定的评论
Args:
feed_id: 说说ID
comment_id: 评论ID
Returns:
bool: 如果已回复过返回True否则返回False
"""
if not feed_id or not comment_id:
return False
return (feed_id in self.replied_comments and
comment_id in self.replied_comments[feed_id])
def mark_as_replied(self, feed_id: str, comment_id: str):
"""
标记指定评论为已回复
Args:
feed_id: 说说ID
comment_id: 评论ID
"""
if not feed_id or not comment_id:
logger.warning("feed_id 或 comment_id 为空,无法标记为已回复")
return
current_time = time.time()
if feed_id not in self.replied_comments:
self.replied_comments[feed_id] = {}
self.replied_comments[feed_id][comment_id] = current_time
# 保存到文件
self._save_data()
logger.info(f"已标记评论为已回复: feed_id={feed_id}, comment_id={comment_id}")
def get_replied_comments(self, feed_id: str) -> Set[str]:
"""
获取指定说说下所有已回复的评论ID
Args:
feed_id: 说说ID
Returns:
Set[str]: 已回复的评论ID集合
"""
if feed_id in self.replied_comments:
return set(self.replied_comments[feed_id].keys())
return set()
def get_stats(self) -> Dict[str, Any]:
"""
获取回复记录统计信息
Returns:
Dict: 包含统计信息的字典
"""
total_feeds = len(self.replied_comments)
total_replies = sum(len(comments) for comments in self.replied_comments.values())
return {
"total_feeds_with_replies": total_feeds,
"total_replied_comments": total_replies,
"data_file": str(self.reply_record_file),
"max_record_days": self.max_record_days
}
def remove_reply_record(self, feed_id: str, comment_id: str):
"""
移除指定评论的回复记录
Args:
feed_id: 说说ID
comment_id: 评论ID
"""
if feed_id in self.replied_comments and comment_id in self.replied_comments[feed_id]:
del self.replied_comments[feed_id][comment_id]
# 如果该说说下没有任何回复记录了,删除整个说说记录
if not self.replied_comments[feed_id]:
del self.replied_comments[feed_id]
self._save_data()
logger.debug(f"已移除回复记录: feed_id={feed_id}, comment_id={comment_id}")
def remove_feed_records(self, feed_id: str):
"""
移除指定说说的所有回复记录
Args:
feed_id: 说说ID
"""
if feed_id in self.replied_comments:
del self.replied_comments[feed_id]
self._save_data()
logger.info(f"已移除说说 {feed_id} 的所有回复记录")