From e5c5e5c0368b0e7abafc55a90940be3516b18ff7 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Mon, 10 Nov 2025 14:23:10 +0800 Subject: [PATCH] =?UTF-8?q?feat(stream):=20=E6=B7=BB=E5=8A=A0=E6=B5=81?= =?UTF-8?q?=E5=BE=AA=E7=8E=AF=E5=90=AF=E5=8A=A8=E9=94=81=E4=BB=A5=E9=98=B2?= =?UTF-8?q?=E6=AD=A2=E5=B9=B6=E5=8F=91=E5=90=AF=E5=8A=A8=E5=90=8C=E4=B8=80?= =?UTF-8?q?=E6=B5=81=E7=9A=84=E5=A4=9A=E4=B8=AA=E5=BE=AA=E7=8E=AF=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../message_manager/distribution_manager.py | 85 +++--- .../proactive/proactive_thinking_executor.py | 262 ++++++++++-------- 2 files changed, 193 insertions(+), 154 deletions(-) diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index a504f49d8..17e1cc861 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -48,6 +48,9 @@ class StreamLoopManager: # 每个流的上一次间隔值(用于日志去重) self._last_intervals: dict[str, float] = {} + # 流循环启动锁:防止并发启动同一个流的多个循环任务 + self._stream_start_locks: dict[str, asyncio.Lock] = {} + logger.info(f"流循环管理器初始化完成 (最大并发流数: {self.max_concurrent_streams})") async def start(self) -> None: @@ -105,47 +108,55 @@ class StreamLoopManager: Returns: bool: 是否成功启动 """ - # 获取流上下文 - context = await self._get_stream_context(stream_id) - if not context: - logger.warning(f"无法获取流上下文: {stream_id}") - return False + # 获取或创建该流的启动锁 + if stream_id not in self._stream_start_locks: + self._stream_start_locks[stream_id] = asyncio.Lock() + + lock = self._stream_start_locks[stream_id] + + # 使用锁防止并发启动同一个流的多个循环任务 + async with lock: + # 获取流上下文 + context = await self._get_stream_context(stream_id) + if not context: + logger.warning(f"无法获取流上下文: {stream_id}") + return False - # 快速路径:如果流已存在且不是强制启动,无需处理 - if not force and context.stream_loop_task and not context.stream_loop_task.done(): - logger.debug(f"流 {stream_id} 循环已在运行") - return True + # 快速路径:如果流已存在且不是强制启动,无需处理 + if not force and context.stream_loop_task and not context.stream_loop_task.done(): + logger.debug(f"流 {stream_id} 循环已在运行") + return True - # 如果是强制启动且任务仍在运行,先取消旧任务 - if force and context.stream_loop_task and not context.stream_loop_task.done(): - logger.debug(f"强制启动模式:先取消现有流循环任务: {stream_id}") - old_task = context.stream_loop_task - old_task.cancel() + # 如果是强制启动且任务仍在运行,先取消旧任务 + if force and context.stream_loop_task and not context.stream_loop_task.done(): + logger.debug(f"强制启动模式:先取消现有流循环任务: {stream_id}") + old_task = context.stream_loop_task + old_task.cancel() + try: + await asyncio.wait_for(old_task, timeout=2.0) + logger.debug(f"旧流循环任务已结束: {stream_id}") + except (asyncio.TimeoutError, asyncio.CancelledError): + logger.debug(f"旧流循环任务已取消或超时: {stream_id}") + except Exception as e: + logger.warning(f"等待旧任务结束时出错: {e}") + + # 创建流循环任务 try: - await asyncio.wait_for(old_task, timeout=2.0) - logger.debug(f"旧流循环任务已结束: {stream_id}") - except (asyncio.TimeoutError, asyncio.CancelledError): - logger.debug(f"旧流循环任务已取消或超时: {stream_id}") + loop_task = asyncio.create_task(self._stream_loop_worker(stream_id), name=f"stream_loop_{stream_id}") + + # 将任务记录到 StreamContext 中 + context.stream_loop_task = loop_task + + # 更新统计信息 + self.stats["active_streams"] += 1 + self.stats["total_loops"] += 1 + + logger.debug(f"启动流循环任务: {stream_id}") + return True + except Exception as e: - logger.warning(f"等待旧任务结束时出错: {e}") - - # 创建流循环任务 - try: - loop_task = asyncio.create_task(self._stream_loop_worker(stream_id), name=f"stream_loop_{stream_id}") - - # 将任务记录到 StreamContext 中 - context.stream_loop_task = loop_task - - # 更新统计信息 - self.stats["active_streams"] += 1 - self.stats["total_loops"] += 1 - - logger.debug(f"启动流循环任务: {stream_id}") - return True - - except Exception as e: - logger.error(f"启动流循环任务失败 {stream_id}: {e}") - return False + logger.error(f"启动流循环任务失败 {stream_id}: {e}") + return False async def stop_stream_loop(self, stream_id: str) -> bool: """停止指定流的循环任务 diff --git a/src/plugins/built_in/affinity_flow_chatter/proactive/proactive_thinking_executor.py b/src/plugins/built_in/affinity_flow_chatter/proactive/proactive_thinking_executor.py index 20e1ce468..0499c31f5 100644 --- a/src/plugins/built_in/affinity_flow_chatter/proactive/proactive_thinking_executor.py +++ b/src/plugins/built_in/affinity_flow_chatter/proactive/proactive_thinking_executor.py @@ -3,6 +3,7 @@ 当定时任务触发时,负责搜集信息、调用LLM决策、并根据决策生成回复 """ +import asyncio from datetime import datetime from typing import Any, Literal @@ -484,6 +485,9 @@ _planner = ProactiveThinkingPlanner() # 统计数据 _statistics: dict[str, dict[str, Any]] = {} +# 全局执行锁字典:防止同一聊天流的主动思考被并发执行 +_execution_locks: dict[str, asyncio.Lock] = {} + def _update_statistics(stream_id: str, action: str): """更新统计数据 @@ -538,144 +542,168 @@ async def execute_proactive_thinking(stream_id: str): logger.debug(f"主动思考功能已关闭,跳过执行 {stream_id}") return - logger.debug(f"🤔 开始主动思考 {stream_id}") + # 获取或创建该聊天流的执行锁 + if stream_id not in _execution_locks: + _execution_locks[stream_id] = asyncio.Lock() + + lock = _execution_locks[stream_id] + + # 尝试获取锁,如果已被占用则跳过本次执行(防止重复) + if lock.locked(): + logger.warning(f"⚠️ 主动思考跳过:聊天流 {stream_id} 已有正在执行的主动思考任务") + return + + async with lock: + logger.debug(f"🤔 开始主动思考 {stream_id}") - try: - # 0. 前置检查 - # 0.1 检查白名单/黑名单 - # 从 stream_id 获取 stream_config 字符串进行验证 try: - from src.chat.message_receive.chat_stream import get_chat_manager - chat_manager = get_chat_manager() - chat_stream = await chat_manager.get_stream(stream_id) - - if chat_stream: - # 使用 ChatStream 的 get_raw_id() 方法获取配置字符串 - stream_config = chat_stream.get_raw_id() - - # 执行白名单/黑名单检查 - if not proactive_thinking_scheduler._check_whitelist_blacklist(stream_config): - logger.debug(f"聊天流 {stream_id} ({stream_config}) 未通过白名单/黑名单检查,跳过主动思考") + # 0. 前置检查 + # 0.0 检查聊天流是否正在处理消息(双重保护) + try: + from src.chat.message_receive.chat_stream import get_chat_manager + chat_manager = get_chat_manager() + chat_stream = await chat_manager.get_stream(stream_id) + + if chat_stream and chat_stream.context_manager.context.is_chatter_processing: + logger.warning(f"⚠️ 主动思考跳过:聊天流 {stream_id} 的 chatter 正在处理消息") return - else: - logger.warning(f"无法获取聊天流 {stream_id} 的信息,跳过白名单检查") - except Exception as e: - logger.warning(f"白名单检查时出错: {e},继续执行") + except Exception as e: + logger.warning(f"检查 chatter 处理状态时出错: {e},继续执行") + + # 0.1 检查白名单/黑名单 + # 从 stream_id 获取 stream_config 字符串进行验证 + try: + from src.chat.message_receive.chat_stream import get_chat_manager + chat_manager = get_chat_manager() + chat_stream = await chat_manager.get_stream(stream_id) - # 0.2 检查安静时段 - if proactive_thinking_scheduler._is_in_quiet_hours(): - logger.debug("安静时段,跳过") - return + if chat_stream: + # 使用 ChatStream 的 get_raw_id() 方法获取配置字符串 + stream_config = chat_stream.get_raw_id() - # 0.3 检查每日限制 - if not proactive_thinking_scheduler._check_daily_limit(stream_id): - logger.debug("今日发言达上限") - return + # 执行白名单/黑名单检查 + if not proactive_thinking_scheduler._check_whitelist_blacklist(stream_config): + logger.debug(f"聊天流 {stream_id} ({stream_config}) 未通过白名单/黑名单检查,跳过主动思考") + return + else: + logger.warning(f"无法获取聊天流 {stream_id} 的信息,跳过白名单检查") + except Exception as e: + logger.warning(f"白名单检查时出错: {e},继续执行") - # 1. 搜集信息 - logger.debug("步骤1: 搜集上下文") - context = await _planner.gather_context(stream_id) - if not context: - logger.warning("无法搜集上下文,跳过") - return + # 0.2 检查安静时段 + if proactive_thinking_scheduler._is_in_quiet_hours(): + logger.debug("安静时段,跳过") + return - # 检查兴趣分数阈值 - interest_score = context.get("interest_score", 0.5) - if not proactive_thinking_scheduler._check_interest_score_threshold(interest_score): - logger.debug("兴趣分数不在阈值范围内") - return + # 0.3 检查每日限制 + if not proactive_thinking_scheduler._check_daily_limit(stream_id): + logger.debug("今日发言达上限") + return - # 2. 进行决策 - logger.debug("步骤2: LLM决策") - decision = await _planner.make_decision(context) - if not decision: - logger.warning("决策失败,跳过") - return + # 1. 搜集信息 + logger.debug("步骤1: 搜集上下文") + context = await _planner.gather_context(stream_id) + if not context: + logger.warning("无法搜集上下文,跳过") + return - action = decision.get("action", "do_nothing") - reasoning = decision.get("reasoning", "无") + # 检查兴趣分数阈值 + interest_score = context.get("interest_score", 0.5) + if not proactive_thinking_scheduler._check_interest_score_threshold(interest_score): + logger.debug("兴趣分数不在阈值范围内") + return - # 记录决策日志 - if config.log_decisions: - logger.debug(f"决策: action={action}, reasoning={reasoning}") + # 2. 进行决策 + logger.debug("步骤2: LLM决策") + decision = await _planner.make_decision(context) + if not decision: + logger.warning("决策失败,跳过") + return - # 3. 根据决策执行相应动作 - if action == "do_nothing": - logger.debug(f"决策:什么都不做。理由:{reasoning}") - proactive_thinking_scheduler.record_decision(stream_id, action, reasoning, None) - return + action = decision.get("action", "do_nothing") + reasoning = decision.get("reasoning", "无") - elif action == "simple_bubble": - logger.info(f"💬 决策:冒个泡。理由:{reasoning}") + # 记录决策日志 + if config.log_decisions: + logger.debug(f"决策: action={action}, reasoning={reasoning}") - proactive_thinking_scheduler.record_decision(stream_id, action, reasoning, None) + # 3. 根据决策执行相应动作 + if action == "do_nothing": + logger.debug(f"决策:什么都不做。理由:{reasoning}") + proactive_thinking_scheduler.record_decision(stream_id, action, reasoning, None) + return - # 生成简单的消息 - logger.debug("步骤3: 生成冒泡回复") - reply = await _planner.generate_reply(context, "simple_bubble") - if reply: - await send_api.text_to_stream( - stream_id=stream_id, - text=reply, - ) - logger.info("✅ 已发送冒泡消息") + elif action == "simple_bubble": + logger.info(f"💬 决策:冒个泡。理由:{reasoning}") - # 增加每日计数 - proactive_thinking_scheduler._increment_daily_count(stream_id) + proactive_thinking_scheduler.record_decision(stream_id, action, reasoning, None) - # 更新统计 - if config.enable_statistics: - _update_statistics(stream_id, action) - - # 冒泡后暂停主动思考,等待用户回复 - # 使用与 topic_throw 相同的冷却时间配置 - if config.topic_throw_cooldown > 0: - logger.info("[主动思考] 步骤5:暂停任务") - await proactive_thinking_scheduler.pause_proactive_thinking(stream_id, reason="已冒泡") - logger.info(f"[主动思考] 已暂停聊天流 {stream_id} 的主动思考,等待用户回复") - - logger.info("[主动思考] simple_bubble 执行完成") - - elif action == "throw_topic": - topic = decision.get("topic", "") - logger.info(f"[主动思考] 决策:抛出话题。理由:{reasoning},话题:{topic}") - - # 记录决策 - proactive_thinking_scheduler.record_decision(stream_id, action, reasoning, topic) - - if not topic: - logger.warning("[主动思考] 选择了抛出话题但未提供话题内容,降级为冒泡") - logger.info("[主动思考] 步骤3:生成降级冒泡回复") + # 生成简单的消息 + logger.debug("步骤3: 生成冒泡回复") reply = await _planner.generate_reply(context, "simple_bubble") - else: - # 生成基于话题的消息 - logger.info("[主动思考] 步骤3:生成话题回复") - reply = await _planner.generate_reply(context, "throw_topic", topic) + if reply: + await send_api.text_to_stream( + stream_id=stream_id, + text=reply, + ) + logger.info("✅ 已发送冒泡消息") - if reply: - logger.info("[主动思考] 步骤4:发送消息") - await send_api.text_to_stream( - stream_id=stream_id, - text=reply, - ) - logger.info(f"[主动思考] 已发送话题消息到 {stream_id}") + # 增加每日计数 + proactive_thinking_scheduler._increment_daily_count(stream_id) - # 增加每日计数 - proactive_thinking_scheduler._increment_daily_count(stream_id) + # 更新统计 + if config.enable_statistics: + _update_statistics(stream_id, action) - # 更新统计 - if config.enable_statistics: - _update_statistics(stream_id, action) + # 冒泡后暂停主动思考,等待用户回复 + # 使用与 topic_throw 相同的冷却时间配置 + if config.topic_throw_cooldown > 0: + logger.info("[主动思考] 步骤5:暂停任务") + await proactive_thinking_scheduler.pause_proactive_thinking(stream_id, reason="已冒泡") + logger.info(f"[主动思考] 已暂停聊天流 {stream_id} 的主动思考,等待用户回复") - # 抛出话题后暂停主动思考(如果配置了冷却时间) - if config.topic_throw_cooldown > 0: - logger.info("[主动思考] 步骤5:暂停任务") - await proactive_thinking_scheduler.pause_proactive_thinking(stream_id, reason="已抛出话题") - logger.info(f"[主动思考] 已暂停聊天流 {stream_id} 的主动思考,等待用户回复") + logger.info("[主动思考] simple_bubble 执行完成") - logger.info("[主动思考] throw_topic 执行完成") + elif action == "throw_topic": + topic = decision.get("topic", "") + logger.info(f"[主动思考] 决策:抛出话题。理由:{reasoning},话题:{topic}") - logger.info(f"[主动思考] 聊天流 {stream_id} 的主动思考执行完成") + # 记录决策 + proactive_thinking_scheduler.record_decision(stream_id, action, reasoning, topic) - except Exception as e: - logger.error(f"[主动思考] 执行主动思考失败: {e}", exc_info=True) + if not topic: + logger.warning("[主动思考] 选择了抛出话题但未提供话题内容,降级为冒泡") + logger.info("[主动思考] 步骤3:生成降级冒泡回复") + reply = await _planner.generate_reply(context, "simple_bubble") + else: + # 生成基于话题的消息 + logger.info("[主动思考] 步骤3:生成话题回复") + reply = await _planner.generate_reply(context, "throw_topic", topic) + + if reply: + logger.info("[主动思考] 步骤4:发送消息") + await send_api.text_to_stream( + stream_id=stream_id, + text=reply, + ) + logger.info(f"[主动思考] 已发送话题消息到 {stream_id}") + + # 增加每日计数 + proactive_thinking_scheduler._increment_daily_count(stream_id) + + # 更新统计 + if config.enable_statistics: + _update_statistics(stream_id, action) + + # 抛出话题后暂停主动思考(如果配置了冷却时间) + if config.topic_throw_cooldown > 0: + logger.info("[主动思考] 步骤5:暂停任务") + await proactive_thinking_scheduler.pause_proactive_thinking(stream_id, reason="已抛出话题") + logger.info(f"[主动思考] 已暂停聊天流 {stream_id} 的主动思考,等待用户回复") + + logger.info("[主动思考] throw_topic 执行完成") + + logger.info(f"[主动思考] 聊天流 {stream_id} 的主动思考执行完成") + + except Exception as e: + logger.error(f"[主动思考] 执行主动思考失败: {e}", exc_info=True)