diff --git a/src/chat/message_receive/uni_message_sender.py b/src/chat/message_receive/uni_message_sender.py index 94bf6e4c0..3a1204f23 100644 --- a/src/chat/message_receive/uni_message_sender.py +++ b/src/chat/message_receive/uni_message_sender.py @@ -30,14 +30,27 @@ async def send_message(message: MessageSending, show_log=True) -> bool: from src.plugin_system.base.component_types import EventType if message.chat_stream: - await event_manager.trigger_event( - EventType.AFTER_SEND, - permission_group="SYSTEM", - stream_id=message.chat_stream.stream_id, - message=message, - ) + logger.info(f"[发送完成] 准备触发 AFTER_SEND 事件,stream_id={message.chat_stream.stream_id}") + + # 使用 asyncio.create_task 来异步触发事件,避免阻塞 + async def trigger_event_async(): + try: + logger.info(f"[事件触发] 开始异步触发 AFTER_SEND 事件") + await event_manager.trigger_event( + EventType.AFTER_SEND, + permission_group="SYSTEM", + stream_id=message.chat_stream.stream_id, + message=message, + ) + logger.info(f"[事件触发] AFTER_SEND 事件触发完成") + except Exception as e: + logger.error(f"[事件触发] 异步触发事件失败: {e}", exc_info=True) + + # 创建异步任务,不等待完成 + asyncio.create_task(trigger_event_async()) + logger.info(f"[发送完成] AFTER_SEND 事件已提交到异步任务") except Exception as event_error: - logger.error(f"触发 AFTER_SEND 事件时出错: {event_error}") + logger.error(f"触发 AFTER_SEND 事件时出错: {event_error}", exc_info=True) return True diff --git a/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_event.py b/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_event.py index 029bcc22f..c5fcc6856 100644 --- a/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_event.py +++ b/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_event.py @@ -34,6 +34,8 @@ class ProactiveThinkingReplyHandler(BaseEventHandler): Returns: HandlerResult: 处理结果 """ + logger.info("[事件] ProactiveThinkingReplyHandler 开始执行") + if not kwargs: return HandlerResult(success=True, continue_process=True, message=None) @@ -42,6 +44,8 @@ class ProactiveThinkingReplyHandler(BaseEventHandler): logger.warning("Reply事件缺少stream_id参数") return HandlerResult(success=True, continue_process=True, message=None) + logger.info(f"[事件] 收到 AFTER_SEND 事件,stream_id={stream_id}") + try: from src.config.config import global_config @@ -56,7 +60,9 @@ class ProactiveThinkingReplyHandler(BaseEventHandler): logger.info(f"检测到reply事件,聊天流 {stream_id} 之前因抛出话题而暂停,现在恢复") # 重置定时任务(这会自动清除暂停标记并创建新任务) + logger.debug(f"[事件] 准备调用 schedule_proactive_thinking") success = await proactive_thinking_scheduler.schedule_proactive_thinking(stream_id) + logger.debug(f"[事件] schedule_proactive_thinking 调用完成,success={success}") if success: if was_paused: diff --git a/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_executor.py b/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_executor.py index d014f47b0..e367f73ee 100644 --- a/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_executor.py +++ b/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_executor.py @@ -330,13 +330,13 @@ class ProactiveThinkingPlanner: 【最近的聊天记录】 {context['recent_chat_history']} {expression_habits} -请生成一条简短的消息,用于活跃气氛或刷存在感。要求: +请生成一条简短的消息,用于水群。要求: 1. 非常简短(5-15字) 2. 轻松随意,不要有明确的话题或问题 3. 可以是:问候、表达心情、随口一句话 4. 符合你的人设和当前聊天风格 5. 如果有表达方式参考,在合适时自然使用 - +6. 合理参考历史记录 直接输出消息内容,不要解释:""" else: # throw_topic @@ -438,7 +438,7 @@ async def execute_proactive_thinking(stream_id: str): config = global_config.proactive_thinking - logger.info(f"开始为聊天流 {stream_id} 执行主动思考") + logger.info(f"[主动思考] 开始为聊天流 {stream_id} 执行主动思考") try: # 0. 前置检查 @@ -453,22 +453,26 @@ async def execute_proactive_thinking(stream_id: str): return # 1. 搜集信息 + logger.info(f"[主动思考] 步骤1:搜集上下文信息") context = await _planner.gather_context(stream_id) if not context: - logger.warning(f"无法搜集聊天流 {stream_id} 的上下文,跳过本次主动思考") + logger.warning(f"[主动思考] 无法搜集聊天流 {stream_id} 的上下文,跳过本次主动思考") return - + logger.info(f"[主动思考] 上下文搜集完成") + # 检查兴趣分数阈值 interest_score = context.get('interest_score', 0.5) if not proactive_thinking_scheduler._check_interest_score_threshold(interest_score): - logger.info(f"聊天流 {stream_id} 兴趣分数不在阈值范围内") + logger.info(f"[主动思考] 聊天流 {stream_id} 兴趣分数不在阈值范围内") return # 2. 进行决策 + logger.info(f"[主动思考] 步骤2:LLM决策") decision = await _planner.make_decision(context) if not decision: - logger.warning(f"决策失败,跳过本次主动思考") + logger.warning(f"[主动思考] 决策失败,跳过本次主动思考") return + logger.info(f"[主动思考] 决策完成") action = decision.get("action", "do_nothing") reasoning = decision.get("reasoning", "无") @@ -483,13 +487,18 @@ async def execute_proactive_thinking(stream_id: str): return elif action == "simple_bubble": - logger.info(f"决策:简单冒个泡。理由:{reasoning}") + logger.info(f"[主动思考] 决策:简单冒个泡。理由:{reasoning}") # 生成简单的消息 + logger.info(f"[主动思考] 步骤3:生成冒泡回复") reply = await _planner.generate_reply(context, "simple_bubble") if reply: - await send_api.text_to_stream(stream_id=stream_id, text=reply) - logger.info(f"已发送冒泡消息到 {stream_id}") + logger.info(f"[主动思考] 步骤4:发送消息") + await send_api.text_to_stream( + stream_id=stream_id, + text=reply, + ) + logger.info(f"[主动思考] 已发送冒泡消息到 {stream_id}") # 增加每日计数 proactive_thinking_scheduler._increment_daily_count(stream_id) @@ -497,23 +506,30 @@ async def execute_proactive_thinking(stream_id: str): # 更新统计 if config.enable_statistics: _update_statistics(stream_id, action) - + + logger.info(f"[主动思考] simple_bubble 执行完成") # 冒泡后可以继续主动思考,不需要暂停 elif action == "throw_topic": topic = decision.get("topic", "") - logger.info(f"决策:抛出话题。理由:{reasoning},话题:{topic}") + logger.info(f"[主动思考] 决策:抛出话题。理由:{reasoning},话题:{topic}") if not topic: - logger.warning("选择了抛出话题但未提供话题内容,降级为冒泡") + logger.warning("[主动思考] 选择了抛出话题但未提供话题内容,降级为冒泡") + logger.info(f"[主动思考] 步骤3:生成降级冒泡回复") reply = await _planner.generate_reply(context, "simple_bubble") else: # 生成基于话题的消息 + logger.info(f"[主动思考] 步骤3:生成话题回复") reply = await _planner.generate_reply(context, "throw_topic", topic) if reply: - await send_api.text_to_stream(stream_id=stream_id, text=reply) - logger.info(f"已发送话题消息到 {stream_id}") + logger.info(f"[主动思考] 步骤4:发送消息") + await send_api.text_to_stream( + stream_id=stream_id, + text=reply, + ) + logger.info(f"[主动思考] 已发送话题消息到 {stream_id}") # 增加每日计数 proactive_thinking_scheduler._increment_daily_count(stream_id) @@ -524,15 +540,13 @@ async def execute_proactive_thinking(stream_id: str): # 抛出话题后暂停主动思考(如果配置了冷却时间) if config.topic_throw_cooldown > 0: + logger.info(f"[主动思考] 步骤5:暂停任务") await proactive_thinking_scheduler.pause_proactive_thinking(stream_id, reason="已抛出话题") - - # 设置定时恢复(在reply_reset_enabled关闭时使用) - if not config.reply_reset_enabled: - import asyncio - async def resume_after_cooldown(): - await asyncio.sleep(config.topic_throw_cooldown) - await proactive_thinking_scheduler.schedule_proactive_thinking(stream_id) - asyncio.create_task(resume_after_cooldown()) + logger.info(f"[主动思考] 已暂停聊天流 {stream_id} 的主动思考,等待用户回复") + + logger.info(f"[主动思考] throw_topic 执行完成") + + logger.info(f"[主动思考] 聊天流 {stream_id} 的主动思考执行完成") except Exception as e: - logger.error(f"执行主动思考失败: {e}", exc_info=True) + logger.error(f"[主动思考] 执行主动思考失败: {e}", exc_info=True) diff --git a/src/schedule/unified_scheduler.py b/src/schedule/unified_scheduler.py index a4004b277..0a2fc859b 100644 --- a/src/schedule/unified_scheduler.py +++ b/src/schedule/unified_scheduler.py @@ -85,8 +85,11 @@ class UnifiedScheduler: """处理来自 event_manager 的事件通知 此方法由 event_manager 在触发事件时直接调用 + + 注意:此方法不能在持有 self._lock 的情况下调用, + 否则会导致死锁(因为回调可能再次触发事件) """ - # 获取订阅该事件的所有任务 + # 获取订阅该事件的所有任务(快速复制,减少锁持有时间) async with self._lock: event_tasks = [ task @@ -97,15 +100,18 @@ class UnifiedScheduler: ] if not event_tasks: - logger.debug(f"事件 '{event_name}' 没有对应的调度任务") + logger.debug(f"[调度器] 事件 '{event_name}' 没有对应的调度任务") return - logger.info(f"事件 '{event_name}' 触发,共有 {len(event_tasks)} 个调度任务") + logger.info(f"[调度器] 事件 '{event_name}' 触发,共有 {len(event_tasks)} 个调度任务") tasks_to_remove = [] + # 在锁外执行回调,避免死锁 for task in event_tasks: try: + logger.debug(f"[调度器] 执行事件任务: {task.task_name}") + # 执行回调,传入事件参数 if event_params: if asyncio.iscoroutinefunction(task.callback): @@ -121,14 +127,17 @@ class UnifiedScheduler: # 如果不是循环任务,标记为删除 if not task.is_recurring: tasks_to_remove.append(task.schedule_id) + + logger.debug(f"[调度器] 事件任务 {task.task_name} 执行完成") except Exception as e: - logger.error(f"执行事件 '{event_name}' 的任务 {task.task_name} 时出错: {e}", exc_info=True) + logger.error(f"[调度器] 执行事件 '{event_name}' 的任务 {task.task_name} 时出错: {e}", exc_info=True) # 移除已完成的一次性任务 - async with self._lock: - for schedule_id in tasks_to_remove: - await self._remove_task_internal(schedule_id) + if tasks_to_remove: + async with self._lock: + for schedule_id in tasks_to_remove: + await self._remove_task_internal(schedule_id) async def start(self): """启动调度器""" @@ -190,37 +199,54 @@ class UnifiedScheduler: logger.error(f"调度器检查循环发生错误: {e}", exc_info=True) async def _check_and_trigger_tasks(self): - """检查并触发到期任务""" + """检查并触发到期任务 + + 注意:为了避免死锁,回调执行必须在锁外进行 + """ + current_time = datetime.now() + + # 第一阶段:在锁内快速收集需要触发的任务 async with self._lock: - tasks_to_remove = [] - current_time = datetime.now() - + tasks_to_trigger = [] + for schedule_id, task in list(self._tasks.items()): if not task.is_active: continue try: should_trigger = await self._should_trigger_task(task, current_time) - if should_trigger: - # 执行回调 - await self._execute_callback(task) - - # 更新任务状态 - task.last_triggered_at = current_time - task.trigger_count += 1 - - # 如果不是循环任务,标记为删除 - if not task.is_recurring: - tasks_to_remove.append(schedule_id) - logger.info(f"一次性任务 {task.task_name} 已完成,将被移除") - + tasks_to_trigger.append(task) except Exception as e: logger.error(f"检查任务 {task.task_name} 时发生错误: {e}", exc_info=True) + + # 第二阶段:在锁外执行回调(避免死锁) + tasks_to_remove = [] + + for task in tasks_to_trigger: + try: + logger.debug(f"[调度器] 触发定时任务: {task.task_name}") + + # 执行回调 + await self._execute_callback(task) - # 移除已完成的一次性任务 - for schedule_id in tasks_to_remove: - await self._remove_task_internal(schedule_id) + # 更新任务状态 + task.last_triggered_at = current_time + task.trigger_count += 1 + + # 如果不是循环任务,标记为删除 + if not task.is_recurring: + tasks_to_remove.append(task.schedule_id) + logger.info(f"[调度器] 一次性任务 {task.task_name} 已完成,将被移除") + + except Exception as e: + logger.error(f"[调度器] 执行任务 {task.task_name} 时发生错误: {e}", exc_info=True) + + # 第三阶段:在锁内移除已完成的任务 + if tasks_to_remove: + async with self._lock: + for schedule_id in tasks_to_remove: + await self._remove_task_internal(schedule_id) async def _should_trigger_task(self, task: ScheduleTask, current_time: datetime) -> bool: """判断任务是否应该触发"""