diff --git a/src/chat/express/expression_selector.py b/src/chat/express/expression_selector.py index 18fe26360..3b3f8869e 100644 --- a/src/chat/express/expression_selector.py +++ b/src/chat/express/expression_selector.py @@ -270,7 +270,7 @@ class ExpressionSelector: # 根据配置选择模式 mode = global_config.expression.mode - logger.debug(f"[ExpressionSelector] 使用模式: {mode}") + logger.debug(f"使用表达选择模式: {mode}") if mode == "exp_model": return await self._select_expressions_model_only( @@ -298,7 +298,7 @@ class ExpressionSelector: min_num: int = 5, ) -> list[dict[str, Any]]: """经典模式:随机抽样 + LLM评估""" - logger.debug("[Classic模式] 使用LLM评估表达方式") + logger.debug("使用LLM评估表达方式") return await self.select_suitable_expressions_llm( chat_id=chat_id, chat_info=chat_info, @@ -316,7 +316,7 @@ class ExpressionSelector: min_num: int = 5, ) -> list[dict[str, Any]]: """模型预测模式:先提取情境,再使用StyleLearner预测表达风格""" - logger.debug("[Exp_model模式] 使用情境提取 + StyleLearner预测表达方式") + logger.debug("使用情境提取 + StyleLearner预测表达方式") # 检查是否允许在此聊天流中使用表达 if not self.can_use_expression_for_chat(chat_id): @@ -331,7 +331,7 @@ class ExpressionSelector: ) if not situations: - logger.warning("无法提取聊天情境,回退到经典模式") + logger.debug("无法提取聊天情境,回退到经典模式") return await self._select_expressions_classic( chat_id=chat_id, chat_info=chat_info, @@ -340,27 +340,27 @@ class ExpressionSelector: min_num=min_num ) - logger.info(f"[Exp_model模式] 步骤1完成 - 提取到 {len(situations)} 个情境: {situations}") + logger.debug(f"提取到 {len(situations)} 个情境") # 步骤2: 使用 StyleLearner 为每个情境预测合适的表达方式 learner = style_learner_manager.get_learner(chat_id) all_predicted_styles = {} for i, situation in enumerate(situations, 1): - logger.debug(f"[Exp_model模式] 步骤2.{i} - 为情境预测风格: {situation}") + logger.debug(f"为情境 {i} 预测风格: {situation}") best_style, scores = learner.predict_style(situation, top_k=max_num) if best_style and scores: - logger.debug(f" 预测结果: best={best_style}, scores数量={len(scores)}") + logger.debug(f"预测最佳风格: {best_style}") # 合并分数(取最高分) for style, score in scores.items(): if style not in all_predicted_styles or score > all_predicted_styles[style]: all_predicted_styles[style] = score else: - logger.debug(" 该情境未返回预测结果") + logger.debug("该情境未返回预测结果") if not all_predicted_styles: - logger.warning("[Exp_model模式] StyleLearner未返回预测结果(可能模型未训练),回退到经典模式") + logger.debug("StyleLearner未返回预测结果,回退到经典模式") return await self._select_expressions_classic( chat_id=chat_id, chat_info=chat_info, @@ -372,10 +372,10 @@ class ExpressionSelector: # 将分数字典转换为列表格式 [(style, score), ...] predicted_styles = sorted(all_predicted_styles.items(), key=lambda x: x[1], reverse=True) - logger.info(f"[Exp_model模式] 步骤2完成 - 预测到 {len(predicted_styles)} 个风格, Top3: {predicted_styles[:3]}") + logger.debug(f"预测到 {len(predicted_styles)} 个风格") # 步骤3: 根据预测的风格从数据库获取表达方式 - logger.debug("[Exp_model模式] 步骤3 - 从数据库查询表达方式") + logger.debug("从数据库查询表达方式") expressions = await self.get_model_predicted_expressions( chat_id=chat_id, predicted_styles=predicted_styles, @@ -383,7 +383,7 @@ class ExpressionSelector: ) if not expressions: - logger.warning("[Exp_model模式] 未找到匹配预测风格的表达方式,回退到经典模式") + logger.debug("未找到匹配预测风格的表达方式,回退到经典模式") return await self._select_expressions_classic( chat_id=chat_id, chat_info=chat_info, @@ -392,7 +392,7 @@ class ExpressionSelector: min_num=min_num ) - logger.info(f"[Exp_model模式] 成功! 返回 {len(expressions)} 个表达方式") + logger.debug(f"返回 {len(expressions)} 个表达方式") return expressions async def get_model_predicted_expressions( @@ -417,11 +417,11 @@ class ExpressionSelector: # 提取风格名称(前3个最佳匹配) style_names = [style for style, _ in predicted_styles[:min(3, len(predicted_styles))]] - logger.debug(f"预测最佳风格: {style_names[0] if style_names else 'None'}, Top3分数: {predicted_styles[:3]}") + logger.debug(f"预测最佳风格: {style_names[0] if style_names else 'None'}") # 🔥 使用 get_related_chat_ids 获取所有相关的 chat_id(支持共享表达方式) related_chat_ids = self.get_related_chat_ids(chat_id) - logger.info(f"查询相关的chat_ids ({len(related_chat_ids)}个): {related_chat_ids}") + logger.debug(f"查询相关的chat_ids: {len(related_chat_ids)}个") async with get_db_session() as session: # 🔍 先检查数据库中实际有哪些 chat_id 的数据 @@ -431,7 +431,7 @@ class ExpressionSelector: .distinct() ) db_chat_ids = list(db_chat_ids_result.scalars()) - logger.info(f"数据库中有表达方式的chat_ids ({len(db_chat_ids)}个): {db_chat_ids}") + logger.debug(f"数据库中有表达方式的chat_ids: {len(db_chat_ids)}个") # 获取所有相关 chat_id 的表达方式(用于模糊匹配) all_expressions_result = await session.execute( @@ -441,11 +441,11 @@ class ExpressionSelector: ) all_expressions = list(all_expressions_result.scalars()) - logger.info(f"配置的相关chat_id的表达方式数量: {len(all_expressions)}") + logger.debug(f"配置的相关chat_id的表达方式数量: {len(all_expressions)}") # 🔥 智能回退:如果相关 chat_id 没有数据,尝试查询所有 chat_id if not all_expressions: - logger.info("相关chat_id没有数据,尝试从所有chat_id查询") + logger.debug("相关chat_id没有数据,尝试从所有chat_id查询") all_expressions_result = await session.execute( select(Expression) .where(Expression.type == "style") @@ -501,12 +501,7 @@ class ExpressionSelector: expressions_objs = [e[0] for e in matched_expressions[:max_num]] # 显示最佳匹配的详细信息 - top_matches = [f"{e[3]}->{e[0].style}({e[1]:.2f})" for e in matched_expressions[:3]] - logger.info( - f"模糊匹配成功: 找到 {len(expressions_objs)} 个表达方式\n" - f" 相似度范围: {matched_expressions[0][1]:.2f} ~ {matched_expressions[min(len(matched_expressions)-1, max_num-1)][1]:.2f}\n" - f" Top3匹配: {top_matches}" - ) + logger.debug(f"模糊匹配成功: 找到 {len(expressions_objs)} 个表达方式") # 转换为字典格式 expressions = [ diff --git a/src/chat/memory_system/memory_system.py b/src/chat/memory_system/memory_system.py index 44953b675..c1fd44557 100644 --- a/src/chat/memory_system/memory_system.py +++ b/src/chat/memory_system/memory_system.py @@ -138,7 +138,7 @@ class MemorySystem: self.config = config or MemorySystemConfig.from_global_config() self.llm_model = llm_model self.status = MemorySystemStatus.INITIALIZING - logger.info(f"MemorySystem __init__ called, id: {id(self)}") + logger.debug(f"MemorySystem __init__ called, id: {id(self)}") # 核心组件(简化版) self.memory_builder: MemoryBuilder | None = None @@ -167,11 +167,11 @@ class MemorySystem: # 海马体采样器 self.hippocampus_sampler = None - logger.info("MemorySystem 初始化开始") + logger.debug("MemorySystem 初始化开始") async def initialize(self): """异步初始化记忆系统""" - logger.info(f"MemorySystem initialize started, id: {id(self)}") + logger.debug(f"MemorySystem initialize started, id: {id(self)}") try: # 初始化LLM模型 fallback_task = getattr(self.llm_model, "model_for_task", None) if self.llm_model else None @@ -226,13 +226,13 @@ class MemorySystem: try: try: self.unified_storage = VectorMemoryStorage(storage_config) - logger.info("✅ Vector DB存储系统初始化成功") + logger.debug("Vector DB存储系统初始化成功") except Exception as storage_error: - logger.error(f"❌ Vector DB存储系统初始化失败: {storage_error}", exc_info=True) + logger.error(f"Vector DB存储系统初始化失败: {storage_error}", exc_info=True) self.unified_storage = None # 确保在失败时为None raise except Exception as storage_error: - logger.error(f"❌ Vector DB存储系统初始化失败: {storage_error}", exc_info=True) + logger.error(f"Vector DB存储系统初始化失败: {storage_error}", exc_info=True) raise # 初始化遗忘引擎 @@ -281,7 +281,7 @@ class MemorySystem: from .hippocampus_sampler import initialize_hippocampus_sampler self.hippocampus_sampler = await initialize_hippocampus_sampler(self) - logger.info("✅ 海马体采样器初始化成功") + logger.debug("海马体采样器初始化成功") except Exception as e: logger.warning(f"海马体采样器初始化失败: {e}") self.hippocampus_sampler = None @@ -289,7 +289,7 @@ class MemorySystem: # 统一存储已经自动加载数据,无需额外加载 self.status = MemorySystemStatus.READY - logger.info(f"MemorySystem initialize finished, id: {id(self)}") + logger.debug(f"MemorySystem initialize finished, id: {id(self)}") except Exception as e: self.status = MemorySystemStatus.ERROR logger.error(f"❌ 记忆系统初始化失败: {e}", exc_info=True) @@ -394,7 +394,7 @@ class MemorySystem: value_score = await self._assess_information_value(conversation_text, normalized_context) if value_score < self.config.memory_value_threshold: - logger.info(f"信息价值评分 {value_score:.2f} 低于阈值,跳过记忆构建") + logger.debug(f"信息价值评分 {value_score:.2f} 低于阈值,跳过记忆构建") self.status = original_status return [] else: @@ -446,7 +446,7 @@ class MemorySystem: build_time = time.time() - start_time logger.info( - f"✅ 生成 {len(fused_chunks)} 条记忆,成功入库 {stored_count} 条,耗时 {build_time:.2f}秒", + f"生成 {len(fused_chunks)} 条记忆,入库 {stored_count} 条,耗时 {build_time:.2f}秒", ) self.status = original_status @@ -473,16 +473,16 @@ class MemorySystem: def _log_memory_preview(self, memories: list[MemoryChunk]) -> None: """在控制台输出记忆预览,便于人工检查""" if not memories: - logger.info("📝 本次未生成新的记忆") + logger.debug("本次未生成新的记忆") return - logger.info(f"📝 本次生成的记忆预览 ({len(memories)} 条):") + logger.debug(f"本次生成的记忆预览 ({len(memories)} 条):") for idx, memory in enumerate(memories, start=1): text = memory.text_content or "" if len(text) > 120: text = text[:117] + "..." - logger.info( + logger.debug( f" {idx}) 类型={memory.memory_type.value} 重要性={memory.metadata.importance.name} " f"置信度={memory.metadata.confidence.name} | 内容={text}" ) @@ -800,7 +800,7 @@ class MemorySystem: metadata_filters=metadata_filters, # JSON元数据索引过滤 ) - logger.info(f"[阶段二] 向量搜索完成: 返回 {len(search_results)} 条候选") + logger.debug(f"[阶段二] 向量搜索完成: 返回 {len(search_results)} 条候选") # === 阶段三:综合重排 === scored_memories = [] @@ -874,7 +874,7 @@ class MemorySystem: if instant_memories: # 将瞬时记忆放在列表最前面 final_memories = instant_memories + final_memories - logger.info(f"融合了 {len(instant_memories)} 条瞬时记忆") + logger.debug(f"融合了 {len(instant_memories)} 条瞬时记忆") except Exception as e: logger.warning(f"检索瞬时记忆失败: {e}", exc_info=True) @@ -884,9 +884,9 @@ class MemorySystem: retrieval_time = time.time() - start_time - # 详细日志 - 打印检索到的有效记忆的完整内容 - if scored_memories: - logger.debug("🧠 检索到的有效记忆内容详情:") + # 详细日志 - 只在debug模式打印检索到的完整内容 + if scored_memories and logger.level <= 10: # DEBUG level + logger.debug("检索到的有效记忆内容详情:") for i, (mem, score, details) in enumerate(scored_memories[:effective_limit], 1): try: # 获取记忆的完整内容 @@ -909,7 +909,7 @@ class MemorySystem: created_time_str = datetime.datetime.fromtimestamp(created_time).strftime("%Y-%m-%d %H:%M:%S") if created_time else "unknown" # 打印记忆详细信息 - logger.debug(f" 📝 记忆 #{i}") + logger.debug(f" 记忆 #{i}") logger.debug(f" 类型: {memory_type} | 重要性: {importance} | 置信度: {confidence}") logger.debug(f" 创建时间: {created_time_str}") logger.debug(f" 综合得分: {details['final']:.3f} (向量:{details['vector']:.3f}, 时效:{details['recency']:.3f}, 重要性:{details['importance']:.3f}, 频率:{details['frequency']:.3f})") @@ -935,13 +935,7 @@ class MemorySystem: continue logger.info( - "✅ 三阶段记忆检索完成" - f" | user={resolved_user_id}" - f" | 粗筛={len(search_results)}" - f" | 精筛={len(scored_memories)}" - f" | 返回={len(final_memories)}" - f" | duration={retrieval_time:.3f}s" - f" | query='{optimized_query[:60]}...'" + f"记忆检索完成: 返回 {len(final_memories)} 条 | 耗时 {retrieval_time:.2f}s" ) self.last_retrieval_time = time.time() @@ -1431,9 +1425,9 @@ class MemorySystem: reasoning = result.get("reasoning", "") key_factors = result.get("key_factors", []) - logger.info(f"信息价值评估: {value_score:.2f}, 理由: {reasoning}") + logger.debug(f"信息价值评估: {value_score:.2f}, 理由: {reasoning}") if key_factors: - logger.info(f"关键因素: {', '.join(key_factors)}") + logger.debug(f"关键因素: {', '.join(key_factors)}") return max(0.0, min(1.0, value_score)) diff --git a/src/chat/message_manager/context_manager.py b/src/chat/message_manager/context_manager.py index bd74925c7..f32cdc177 100644 --- a/src/chat/message_manager/context_manager.py +++ b/src/chat/message_manager/context_manager.py @@ -39,7 +39,7 @@ class SingleStreamContextManager: # 标记是否已初始化历史消息 self._history_initialized = False - logger.info(f"[新建] 单流上下文管理器初始化: {stream_id} (id={id(self)})") + logger.debug(f"单流上下文管理器初始化: {stream_id}") # 异步初始化历史消息(不阻塞构造函数) asyncio.create_task(self._initialize_history_from_db()) @@ -237,7 +237,7 @@ class SingleStreamContextManager: else: setattr(self.context, attr, time.time()) await self._update_stream_energy() - logger.info(f"清空单流上下文: {self.stream_id}") + logger.debug(f"清空单流上下文: {self.stream_id}") return True except Exception as e: logger.error(f"清空单流上下文失败 {self.stream_id}: {e}", exc_info=True) @@ -303,15 +303,15 @@ class SingleStreamContextManager: async def _initialize_history_from_db(self): """从数据库初始化历史消息到context中""" if self._history_initialized: - logger.info(f"历史消息已初始化,跳过: {self.stream_id}") + logger.debug(f"历史消息已初始化,跳过: {self.stream_id}") return # 立即设置标志,防止并发重复加载 - logger.info(f"设置历史初始化标志: {self.stream_id}") + logger.debug(f"设置历史初始化标志: {self.stream_id}") self._history_initialized = True try: - logger.info(f"开始从数据库加载历史消息: {self.stream_id}") + logger.debug(f"开始从数据库加载历史消息: {self.stream_id}") from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat @@ -339,7 +339,7 @@ class SingleStreamContextManager: logger.warning(f"转换历史消息失败 (message_id={msg_dict.get('message_id', 'unknown')}): {e}") continue - logger.info(f"成功从数据库加载 {len(self.context.history_messages)} 条历史消息到内存: {self.stream_id}") + logger.debug(f"成功从数据库加载 {len(self.context.history_messages)} 条历史消息到内存: {self.stream_id}") else: logger.debug(f"没有历史消息需要加载: {self.stream_id}") diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index b1def5260..3dda41061 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -118,12 +118,12 @@ class StreamLoopManager: # 如果是强制启动且任务仍在运行,先取消旧任务 if force and context.stream_loop_task and not context.stream_loop_task.done(): - logger.info(f"强制启动模式:先取消现有流循环任务: {stream_id}") + logger.debug(f"强制启动模式:先取消现有流循环任务: {stream_id}") old_task = context.stream_loop_task old_task.cancel() try: await asyncio.wait_for(old_task, timeout=2.0) - logger.info(f"旧流循环任务已结束: {stream_id}") + logger.debug(f"旧流循环任务已结束: {stream_id}") except (asyncio.TimeoutError, asyncio.CancelledError): logger.debug(f"旧流循环任务已取消或超时: {stream_id}") except Exception as e: @@ -140,7 +140,7 @@ class StreamLoopManager: self.stats["active_streams"] += 1 self.stats["total_loops"] += 1 - logger.info(f"启动流循环任务: {stream_id}") + logger.debug(f"启动流循环任务: {stream_id}") return True except Exception as e: @@ -183,7 +183,7 @@ class StreamLoopManager: # 清空 StreamContext 中的任务记录 context.stream_loop_task = None - logger.info(f"停止流循环: {stream_id}") + logger.debug(f"停止流循环: {stream_id}") return True async def _stream_loop_worker(self, stream_id: str) -> None: @@ -192,7 +192,7 @@ class StreamLoopManager: Args: stream_id: 流ID """ - logger.info(f"流循环工作器启动: {stream_id}") + logger.debug(f"流循环工作器启动: {stream_id}") try: while self.is_running: @@ -243,7 +243,7 @@ class StreamLoopManager: await asyncio.sleep(interval) except asyncio.CancelledError: - logger.info(f"流循环被取消: {stream_id}") + logger.debug(f"流循环被取消: {stream_id}") break except Exception as e: logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True) @@ -263,7 +263,7 @@ class StreamLoopManager: # 清理间隔记录 self._last_intervals.pop(stream_id, None) - logger.info(f"流循环结束: {stream_id}") + logger.debug(f"流循环结束: {stream_id}") async def _get_stream_context(self, stream_id: str) -> Any | None: """获取流上下文 @@ -333,7 +333,7 @@ class StreamLoopManager: # 在处理开始前,先刷新缓存到未读消息 cached_messages = await self._flush_cached_messages_to_unread(stream_id) if cached_messages: - logger.info(f"处理开始前刷新缓存消息: stream={stream_id}, 数量={len(cached_messages)}") + logger.debug(f"处理开始前刷新缓存消息: stream={stream_id}, 数量={len(cached_messages)}") # 设置触发用户ID,以实现回复保护 last_message = context.get_last_message() @@ -357,7 +357,7 @@ class StreamLoopManager: # 处理成功后,再次刷新缓存中可能的新消息 additional_messages = await self._flush_cached_messages_to_unread(stream_id) if additional_messages: - logger.info(f"处理完成后刷新新消息: stream={stream_id}, 数量={len(additional_messages)}") + logger.debug(f"处理完成后刷新新消息: stream={stream_id}, 数量={len(additional_messages)}") process_time = time.time() - start_time logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)") @@ -367,7 +367,7 @@ class StreamLoopManager: return success except asyncio.CancelledError: - logger.info(f"流处理被取消: {stream_id}") + logger.debug(f"流处理被取消: {stream_id}") # 取消所有子任务 for child_task in child_tasks: if not child_task.done(): @@ -552,7 +552,7 @@ class StreamLoopManager: chatter_manager: chatter管理器实例 """ self.chatter_manager = chatter_manager - logger.info(f"设置chatter管理器: {chatter_manager.__class__.__name__}") + logger.debug(f"设置chatter管理器: {chatter_manager.__class__.__name__}") async def _should_force_dispatch_for_stream(self, stream_id: str) -> bool: if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0: @@ -652,7 +652,7 @@ class StreamLoopManager: Args: stream_id: 流ID """ - logger.info(f"强制分发流处理: {stream_id}") + logger.debug(f"强制分发流处理: {stream_id}") try: # 获取流上下文 @@ -663,7 +663,7 @@ class StreamLoopManager: # 检查是否有现有的 stream_loop_task if context.stream_loop_task and not context.stream_loop_task.done(): - logger.info(f"发现现有流循环 {stream_id},将先取消再重新创建") + logger.debug(f"发现现有流循环 {stream_id},将先取消再重新创建") existing_task = context.stream_loop_task existing_task.cancel() # 创建异步任务来等待取消完成,并添加异常处理 diff --git a/src/chat/message_manager/global_notice_manager.py b/src/chat/message_manager/global_notice_manager.py index fb6c50736..7f382835f 100644 --- a/src/chat/message_manager/global_notice_manager.py +++ b/src/chat/message_manager/global_notice_manager.py @@ -74,7 +74,7 @@ class GlobalNoticeManager: "last_cleanup_time": 0, } - logger.info("全局Notice管理器初始化完成") + logger.debug("全局Notice管理器初始化完成") def add_notice( self, @@ -135,7 +135,7 @@ class GlobalNoticeManager: # 定期清理过期消息 self._cleanup_expired_notices() - logger.info(f"✅ Notice已添加: id={message.message_id}, type={self._get_notice_type(message)}, scope={scope.value}, target={target_stream_id}, storage_key={storage_key}, ttl={ttl}s") + logger.debug(f"Notice已添加: id={message.message_id}, type={self._get_notice_type(message)}, scope={scope.value}") return True except Exception as e: @@ -282,7 +282,8 @@ class GlobalNoticeManager: for key in keys_to_remove: del self._notices[key] - logger.info(f"清理notice消息: {removed_count} 条") + if removed_count > 0: + logger.debug(f"清理notice消息: {removed_count} 条") return removed_count except Exception as e: diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index dd8017ef5..5b92e6ea3 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -72,13 +72,13 @@ class MessageManager: logger.error(f"启动批量数据库写入器失败: {e}") # 启动消息缓存系统(内置) - logger.info("📦 消息缓存系统已启动") + logger.debug("消息缓存系统已启动") # 启动流循环管理器并设置chatter_manager await stream_loop_manager.start() stream_loop_manager.set_chatter_manager(self.chatter_manager) - logger.info("🚀 消息管理器已启动 | 流循环管理器已启动") + logger.info("消息管理器已启动") async def stop(self): """停止消息管理器""" @@ -92,19 +92,19 @@ class MessageManager: from src.chat.message_manager.batch_database_writer import shutdown_batch_writer await shutdown_batch_writer() - logger.info("📦 批量数据库写入器已停止") + logger.debug("批量数据库写入器已停止") except Exception as e: logger.error(f"停止批量数据库写入器失败: {e}") # 停止消息缓存系统(内置) self.message_caches.clear() self.stream_processing_status.clear() - logger.info("📦 消息缓存系统已停止") + logger.debug("消息缓存系统已停止") # 停止流循环管理器 await stream_loop_manager.stop() - logger.info("🛑 消息管理器已停止 | 流循环管理器已停止") + logger.info("消息管理器已停止") async def add_message(self, stream_id: str, message: DatabaseMessages): """添加消息到指定聊天流""" @@ -113,15 +113,15 @@ class MessageManager: # 检查是否为notice消息 if self._is_notice_message(message): # Notice消息处理 - 添加到全局管理器 - logger.info(f"📢 检测到notice消息: notice_type={getattr(message, 'notice_type', None)}") + logger.debug(f"检测到notice消息: notice_type={getattr(message, 'notice_type', None)}") await self._handle_notice_message(stream_id, message) # 根据配置决定是否继续处理(触发聊天流程) if not global_config.notice.enable_notice_trigger_chat: - logger.info(f"根据配置,流 {stream_id} 的Notice消息将被忽略,不触发聊天流程。") + logger.debug(f"Notice消息将被忽略,不触发聊天流程: {stream_id}") return # 停止处理,不进入未读消息队列 else: - logger.info(f"根据配置,流 {stream_id} 的Notice消息将触发聊天流程。") + logger.debug(f"Notice消息将触发聊天流程: {stream_id}") # 继续执行,将消息添加到未读队列 # 普通消息处理 @@ -201,7 +201,7 @@ class MessageManager: if hasattr(context, "processing_task") and context.processing_task and not context.processing_task.done(): context.processing_task.cancel() - logger.info(f"停用聊天流: {stream_id}") + logger.debug(f"停用聊天流: {stream_id}") except Exception as e: logger.error(f"停用聊天流 {stream_id} 时发生错误: {e}") @@ -218,7 +218,7 @@ class MessageManager: context = chat_stream.context_manager.context context.is_active = True - logger.info(f"激活聊天流: {stream_id}") + logger.debug(f"激活聊天流: {stream_id}") except Exception as e: logger.error(f"激活聊天流 {stream_id} 时发生错误: {e}") @@ -354,8 +354,7 @@ class MessageManager: # 取消 stream_loop_task,子任务会通过 try-catch 自动取消 try: stream_loop_task.cancel() - logger.info(f"已发送取消信号到流循环任务: {chat_stream.stream_id}") - + # 等待任务真正结束(设置超时避免死锁) try: await asyncio.wait_for(stream_loop_task, timeout=2.0) @@ -401,21 +400,21 @@ class MessageManager: # 确保有未读消息需要处理 unread_messages = context.get_unread_messages() if not unread_messages: - logger.debug(f"💭 聊天流 {stream_id} 没有未读消息,跳过重新处理") + logger.debug(f"聊天流 {stream_id} 没有未读消息,跳过重新处理") return - logger.info(f"💬 准备重新处理 {len(unread_messages)} 条未读消息: {stream_id}") + logger.debug(f"准备重新处理 {len(unread_messages)} 条未读消息: {stream_id}") # 重新创建 stream_loop 任务 success = await stream_loop_manager.start_stream_loop(stream_id, force=True) if success: - logger.info(f"✅ 成功重新创建流循环任务: {stream_id}") + logger.debug(f"成功重新创建流循环任务: {stream_id}") else: - logger.warning(f"⚠️ 重新创建流循环任务失败: {stream_id}") + logger.warning(f"重新创建流循环任务失败: {stream_id}") except Exception as e: - logger.error(f"🚨 触发重新处理时出错: {e}") + logger.error(f"触发重新处理时出错: {e}") async def clear_all_unread_messages(self, stream_id: str): """清除指定上下文中的所有未读消息,在消息处理完成后调用""" diff --git a/src/chat/message_receive/uni_message_sender.py b/src/chat/message_receive/uni_message_sender.py index 6e4afe04f..265150b21 100644 --- a/src/chat/message_receive/uni_message_sender.py +++ b/src/chat/message_receive/uni_message_sender.py @@ -30,25 +30,12 @@ async def send_message(message: MessageSending, show_log=True) -> bool: from src.plugin_system.core.event_manager import event_manager if message.chat_stream: - logger.info(f"[发送完成] 准备触发 AFTER_SEND 事件,stream_id={message.chat_stream.stream_id}") - - # 使用 asyncio.create_task 来异步触发事件,避免阻塞 - async def trigger_event_async(): - try: - logger.info("[事件触发] 开始异步触发 AFTER_SEND 事件") - await event_manager.trigger_event( - EventType.AFTER_SEND, - permission_group="SYSTEM", - stream_id=message.chat_stream.stream_id, - message=message, - ) - logger.info("[事件触发] AFTER_SEND 事件触发完成") - except Exception as e: - logger.error(f"[事件触发] 异步触发事件失败: {e}", exc_info=True) - - # 创建异步任务,不等待完成 - asyncio.create_task(trigger_event_async()) # noqa: RUF006 - logger.info("[发送完成] AFTER_SEND 事件已提交到异步任务") + await event_manager.trigger_event( + EventType.AFTER_SEND, + permission_group="SYSTEM", + stream_id=message.chat_stream.stream_id, + message=message, + ) except Exception as event_error: logger.error(f"触发 AFTER_SEND 事件时出错: {event_error}", exc_info=True) diff --git a/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_scheduler.py b/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_scheduler.py index 47ed467cd..e5171c721 100644 --- a/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_scheduler.py +++ b/src/plugins/built_in/affinity_flow_chatter/proactive_thinking_scheduler.py @@ -215,10 +215,10 @@ class ProactiveThinkingScheduler: # 计算并获取最新的 focus_energy logger.debug("[调度器] 找到聊天流,开始计算 focus_energy") focus_energy = await chat_stream.calculate_focus_energy() - logger.info(f"[调度器] 聊天流 {stream_id} 的 focus_energy: {focus_energy:.3f}") + logger.debug(f"[调度器] 聊天流 {stream_id} 的 focus_energy: {focus_energy:.3f}") return focus_energy else: - logger.warning(f"[调度器] ⚠️ 未找到聊天流 {stream_id},使用默认 focus_energy=0.5") + logger.debug(f"[调度器] 未找到聊天流 {stream_id},使用默认 focus_energy=0.5") return 0.5 except Exception as e: @@ -277,8 +277,8 @@ class ProactiveThinkingScheduler: # 计算下次触发时间 next_run_time = datetime.now() + timedelta(seconds=interval_seconds) - logger.info( - f"✅ 聊天流 {stream_id} 主动思考任务已创建 | " + logger.debug( + f"主动思考任务已创建: {stream_id} | " f"Focus: {focus_energy:.3f} | " f"间隔: {interval_seconds / 60:.1f}分钟 | " f"下次: {next_run_time.strftime('%H:%M:%S')}" @@ -313,7 +313,7 @@ class ProactiveThinkingScheduler: if success: self._paused_streams.add(stream_id) - logger.info(f"⏸️ 暂停主动思考 {stream_id},原因: {reason}") + logger.debug(f"暂停主动思考: {stream_id},原因: {reason}") return success @@ -341,7 +341,7 @@ class ProactiveThinkingScheduler: if success: self._paused_streams.discard(stream_id) - logger.info(f"▶️ 恢复主动思考 {stream_id}") + logger.debug(f"恢复主动思考: {stream_id}") return success diff --git a/src/schedule/unified_scheduler.py b/src/schedule/unified_scheduler.py index ac2b93c6a..034154168 100644 --- a/src/schedule/unified_scheduler.py +++ b/src/schedule/unified_scheduler.py @@ -104,7 +104,7 @@ class UnifiedScheduler: logger.debug(f"[调度器] 事件 '{event_name}' 没有对应的调度任务") return - logger.info(f"[调度器] 事件 '{event_name}' 触发,共有 {len(event_tasks)} 个调度任务") + logger.debug(f"[调度器] 事件 '{event_name}' 触发,共有 {len(event_tasks)} 个调度任务") tasks_to_remove = [] @@ -154,7 +154,7 @@ class UnifiedScheduler: from src.plugin_system.core.event_manager import event_manager event_manager.register_scheduler_callback(self._handle_event_trigger) - logger.info("调度器已注册到 event_manager") + logger.debug("调度器已注册到 event_manager") except ImportError: logger.warning("无法导入 event_manager,事件触发功能将不可用") @@ -178,23 +178,23 @@ class UnifiedScheduler: from src.plugin_system.core.event_manager import event_manager event_manager.unregister_scheduler_callback() - logger.info("调度器回调已从 event_manager 注销") + logger.debug("调度器回调已从 event_manager 注销") except ImportError: pass - logger.info(f"统一调度器已停止,共有 {len(self._tasks)} 个任务被清理") + logger.info(f"统一调度器已停止") self._tasks.clear() self._event_subscriptions.clear() async def _check_loop(self): """主循环:每秒检查一次所有任务""" - logger.info("调度器检查循环已启动") + logger.debug("调度器检查循环已启动") while self._running: try: await asyncio.sleep(1) await self._check_and_trigger_tasks() except asyncio.CancelledError: - logger.info("调度器检查循环被取消") + logger.debug("调度器检查循环被取消") break except Exception as e: logger.error(f"调度器检查循环发生错误: {e}", exc_info=True) @@ -238,7 +238,7 @@ class UnifiedScheduler: # 如果不是循环任务,标记为删除 if not task.is_recurring: tasks_to_remove.append(task.schedule_id) - logger.info(f"[调度器] 一次性任务 {task.task_name} 已完成,将被移除") + logger.debug(f"[调度器] 一次性任务 {task.task_name} 已完成,将被移除") except Exception as e: logger.error(f"[调度器] 执行任务 {task.task_name} 时发生错误: {e}", exc_info=True) @@ -306,14 +306,14 @@ class UnifiedScheduler: async def _execute_callback(self, task: ScheduleTask): """执行任务回调函数""" try: - logger.info(f"触发任务: {task.task_name} (ID: {task.schedule_id[:8]}...)") + logger.debug(f"触发任务: {task.task_name}") if asyncio.iscoroutinefunction(task.callback): await task.callback(*task.callback_args, **task.callback_kwargs) else: task.callback(*task.callback_args, **task.callback_kwargs) - logger.info(f"任务 {task.task_name} 执行成功 (第 {task.trigger_count + 1} 次)") + logger.debug(f"任务 {task.task_name} 执行完成") except Exception as e: logger.error(f"执行任务 {task.task_name} 的回调函数时出错: {e}", exc_info=True) @@ -371,7 +371,7 @@ class UnifiedScheduler: self._event_subscriptions.add(event_name) logger.debug(f"开始追踪事件: {event_name}") - logger.info(f"创建调度任务: {task}") + logger.debug(f"创建调度任务: {task.task_name}") return schedule_id async def remove_schedule(self, schedule_id: str) -> bool: @@ -383,7 +383,7 @@ class UnifiedScheduler: task = self._tasks[schedule_id] await self._remove_task_internal(schedule_id) - logger.info(f"移除调度任务: {task.task_name} (ID: {schedule_id[:8]}...)") + logger.debug(f"移除调度任务: {task.task_name}") return True async def trigger_schedule(self, schedule_id: str) -> bool: @@ -416,7 +416,7 @@ class UnifiedScheduler: return False task.is_active = False - logger.info(f"暂停任务: {task.task_name} (ID: {schedule_id[:8]}...)") + logger.debug(f"暂停任务: {task.task_name}") return True async def resume_schedule(self, schedule_id: str) -> bool: @@ -428,7 +428,7 @@ class UnifiedScheduler: return False task.is_active = True - logger.info(f"恢复任务: {task.task_name} (ID: {schedule_id[:8]}...)") + logger.debug(f"恢复任务: {task.task_name}") return True async def get_task_info(self, schedule_id: str) -> dict[str, Any] | None: