diff --git a/src/plugins/built_in/maizone_refactored/services/qzone_service.py b/src/plugins/built_in/maizone_refactored/services/qzone_service.py index 2022461f7..cfee3787c 100644 --- a/src/plugins/built_in/maizone_refactored/services/qzone_service.py +++ b/src/plugins/built_in/maizone_refactored/services/qzone_service.py @@ -59,6 +59,8 @@ class QZoneService: self.cookie_service = cookie_service # 如果没有提供 reply_tracker 实例,则创建一个新的 self.reply_tracker = reply_tracker if reply_tracker is not None else ReplyTrackerService() + # 用于防止并发回复/评论的内存锁 + self.processing_comments = set() # --- Public Methods (High-Level Business Logic) --- @@ -159,7 +161,7 @@ class QZoneService: if self.get_config("monitor.enable_auto_reply", False): try: # 传入新参数,表明正在检查自己的说说 - own_feeds = await api_client["list_feeds"](qq_account, 5, is_monitoring_own_feeds=True) + own_feeds = await api_client["list_feeds"](qq_account, 5) if own_feeds: logger.info(f"获取到自己 {len(own_feeds)} 条说说,检查评论...") for feed in own_feeds: @@ -263,37 +265,37 @@ class QZoneService: return # 直接检查评论是否已回复,不做验证清理 - comments_to_reply = [] + comments_to_process = [] for comment in user_comments: comment_tid = comment.get("comment_tid") if not comment_tid: continue - # 检查是否已经在持久化记录中标记为已回复 - if not self.reply_tracker.has_replied(fid, comment_tid): - # 记录日志以便追踪 - logger.debug( - f"发现新评论需要回复 - 说说ID: {fid}, 评论ID: {comment_tid}, " - f"评论人: {comment.get('nickname', '')}, 内容: {comment.get('content', '')}" - ) - comments_to_reply.append(comment) + comment_key = f"{fid}_{comment_tid}" + # 检查持久化记录和内存锁 + if not self.reply_tracker.has_replied(fid, comment_tid) and comment_key not in self.processing_comments: + logger.debug(f"锁定待回复评论: {comment_key}") + self.processing_comments.add(comment_key) + comments_to_process.append(comment) - if not comments_to_reply: - logger.debug(f"说说 {fid} 下的所有评论都已回复过或无需回复") + if not comments_to_process: + logger.debug(f"说说 {fid} 下的所有评论都已回复过或正在处理中") return - logger.info(f"发现自己说说下的 {len(comments_to_reply)} 条新评论,准备回复...") - for comment in comments_to_reply: + logger.info(f"发现自己说说下的 {len(comments_to_process)} 条新评论,准备回复...") + for comment in comments_to_process: comment_tid = comment.get("comment_tid") + comment_key = f"{fid}_{comment_tid}" nickname = comment.get("nickname", "") comment_content = comment.get("content", "") try: - reply_content = await self.content_service.generate_comment_reply(content, comment_content, nickname) + reply_content = await self.content_service.generate_comment_reply( + content, comment_content, nickname + ) if reply_content: success = await api_client["reply"](fid, qq_account, nickname, reply_content, comment_tid) if success: - # 标记为已回复 self.reply_tracker.mark_as_replied(fid, comment_tid) logger.info(f"成功回复'{nickname}'的评论: '{reply_content}'") else: @@ -303,6 +305,11 @@ class QZoneService: logger.warning(f"生成回复内容失败,跳过回复'{nickname}'的评论") except Exception as e: logger.error(f"回复'{nickname}'的评论时发生异常: {e}", exc_info=True) + finally: + # 无论成功与否,都解除锁定 + logger.debug(f"解锁评论: {comment_key}") + if comment_key in self.processing_comments: + self.processing_comments.remove(comment_key) async def _validate_and_cleanup_reply_records(self, fid: str, my_replies: List[Dict]): """验证并清理已删除的回复记录""" @@ -335,11 +342,34 @@ class QZoneService: rt_con = feed.get("rt_con", "") images = feed.get("images", []) - if random.random() <= self.get_config("read.comment_possibility", 0.3): - comment_text = await self.content_service.generate_comment(content, target_name, rt_con, images) - if comment_text: - await api_client["comment"](target_qq, fid, comment_text) + # --- 处理评论 --- + comment_key = f"{fid}_main_comment" + should_comment = random.random() <= self.get_config("read.comment_possibility", 0.3) + if ( + should_comment + and not self.reply_tracker.has_replied(fid, "main_comment") + and comment_key not in self.processing_comments + ): + logger.debug(f"锁定待评论说说: {comment_key}") + self.processing_comments.add(comment_key) + try: + comment_text = await self.content_service.generate_comment(content, target_name, rt_con, images) + if comment_text: + success = await api_client["comment"](target_qq, fid, comment_text) + if success: + self.reply_tracker.mark_as_replied(fid, "main_comment") + logger.info(f"成功评论'{target_name}'的说说: '{comment_text}'") + else: + logger.error(f"评论'{target_name}'的说说失败") + except Exception as e: + logger.error(f"评论'{target_name}'的说说时发生异常: {e}", exc_info=True) + finally: + logger.debug(f"解锁说说: {comment_key}") + if comment_key in self.processing_comments: + self.processing_comments.remove(comment_key) + + # --- 处理点赞 (逻辑不变) --- if random.random() <= self.get_config("read.like_possibility", 1.0): await api_client["like"](target_qq, fid) @@ -714,9 +744,10 @@ class QZoneService: logger.error(f"上传图片 {index + 1} 异常: {e}", exc_info=True) return None - async def _list_feeds(t_qq: str, num: int, is_monitoring_own_feeds: bool = False) -> List[Dict]: - """获取指定用户说说列表""" + async def _list_feeds(t_qq: str, num: int) -> List[Dict]: + """获取指定用户说说列表 (统一接口)""" try: + # 统一使用 format=json 获取完整评论 params = { "g_tk": gtk, "uin": t_qq, @@ -724,59 +755,74 @@ class QZoneService: "sort": 0, "pos": 0, "num": num, - "replynum": 100, - "callback": "_preloadCallback", + "replynum": 999, # 尽量获取更多 "code_version": 1, - "format": "jsonp", + "format": "json", # 关键:使用JSON格式 "need_comment": 1, } res_text = await _request("GET", self.LIST_URL, params=params) - json_str = res_text[len("_preloadCallback(") : -2] - json_data = orjson.loads(json_str) + json_data = orjson.loads(res_text) if json_data.get("code") != 0: + logger.warning(f"获取说说列表API返回错误: code={json_data.get('code')}, message={json_data.get('message')}") return [] feeds_list = [] my_name = json_data.get("logininfo", {}).get("name", "") - for msg in json_data.get("msglist", []): - # 只有在处理好友说说时,才检查是否已评论并跳过 - commentlist = msg.get("commentlist") - # 只有在处理好友说说时,才检查是否已评论并跳过 - if not is_monitoring_own_feeds: + for msg in json_data.get("msglist", []): + # 当读取的是好友动态时,检查是否已评论过,如果是则跳过 + is_friend_feed = str(t_qq) != str(uin) + if is_friend_feed: + commentlist_for_check = msg.get("commentlist") is_commented = False - if isinstance(commentlist, list): + if isinstance(commentlist_for_check, list): is_commented = any( - c.get("name") == my_name for c in commentlist if isinstance(c, dict) + c.get("name") == my_name for c in commentlist_for_check if isinstance(c, dict) ) if is_commented: continue # --- 安全地处理图片列表 --- images = [] - pictotal = msg.get("pictotal") - if isinstance(pictotal, list): - images = [ - pic["url1"] for pic in pictotal if isinstance(pic, dict) and "url1" in pic - ] + if "pic" in msg and isinstance(msg["pic"], list): + images = [pic.get("url1", "") for pic in msg["pic"] if pic.get("url1")] + elif "pictotal" in msg and isinstance(msg["pictotal"], list): + images = [pic.get("url1", "") for pic in msg["pictotal"] if pic.get("url1")] - # --- 安全地处理评论列表 --- + # --- 解析完整评论列表 (包括二级评论) --- comments = [] + commentlist = msg.get("commentlist") if isinstance(commentlist, list): for c in commentlist: - # 确保评论条目也是字典 - if isinstance(c, dict): - comments.append( - { - "qq_account": c.get("uin"), - "nickname": c.get("name"), - "content": c.get("content"), - "comment_tid": c.get("tid"), - "parent_tid": c.get("parent_tid"), - } - ) - + if not isinstance(c, dict): + continue + + # 添加主评论 + comments.append( + { + "qq_account": c.get("uin"), + "nickname": c.get("name"), + "content": c.get("content"), + "comment_tid": c.get("tid"), + "parent_tid": None, # 主评论没有父ID + } + ) + # 检查并添加二级评论 (回复) + if "list_3" in c and isinstance(c["list_3"], list): + for reply in c["list_3"]: + if not isinstance(reply, dict): + continue + comments.append( + { + "qq_account": reply.get("uin"), + "nickname": reply.get("name"), + "content": reply.get("content"), + "comment_tid": reply.get("tid"), + "parent_tid": c.get("tid"), # 父ID是主评论的ID + } + ) + feeds_list.append( { "tid": msg.get("tid", ""), @@ -791,6 +837,8 @@ class QZoneService: "comments": comments, } ) + + logger.info(f"成功获取到 {len(feeds_list)} 条说说 from {t_qq} (使用统一JSON接口)") return feeds_list except Exception as e: logger.error(f"获取说说列表失败: {e}", exc_info=True)