refactor(logging): 将多个info日志级别的记录更改为debug级别,以减少日志输出

This commit is contained in:
Windpicker-owo
2025-11-01 00:44:50 +08:00
parent 49f376dc1c
commit 69ee822ef7
9 changed files with 105 additions and 129 deletions

View File

@@ -270,7 +270,7 @@ class ExpressionSelector:
# 根据配置选择模式
mode = global_config.expression.mode
logger.debug(f"[ExpressionSelector] 使用模式: {mode}")
logger.debug(f"使用表达选择模式: {mode}")
if mode == "exp_model":
return await self._select_expressions_model_only(
@@ -298,7 +298,7 @@ class ExpressionSelector:
min_num: int = 5,
) -> list[dict[str, Any]]:
"""经典模式:随机抽样 + LLM评估"""
logger.debug("[Classic模式] 使用LLM评估表达方式")
logger.debug("使用LLM评估表达方式")
return await self.select_suitable_expressions_llm(
chat_id=chat_id,
chat_info=chat_info,
@@ -316,7 +316,7 @@ class ExpressionSelector:
min_num: int = 5,
) -> list[dict[str, Any]]:
"""模型预测模式先提取情境再使用StyleLearner预测表达风格"""
logger.debug("[Exp_model模式] 使用情境提取 + StyleLearner预测表达方式")
logger.debug("使用情境提取 + StyleLearner预测表达方式")
# 检查是否允许在此聊天流中使用表达
if not self.can_use_expression_for_chat(chat_id):
@@ -331,7 +331,7 @@ class ExpressionSelector:
)
if not situations:
logger.warning("无法提取聊天情境,回退到经典模式")
logger.debug("无法提取聊天情境,回退到经典模式")
return await self._select_expressions_classic(
chat_id=chat_id,
chat_info=chat_info,
@@ -340,27 +340,27 @@ class ExpressionSelector:
min_num=min_num
)
logger.info(f"[Exp_model模式] 步骤1完成 - 提取到 {len(situations)} 个情境: {situations}")
logger.debug(f"提取到 {len(situations)} 个情境")
# 步骤2: 使用 StyleLearner 为每个情境预测合适的表达方式
learner = style_learner_manager.get_learner(chat_id)
all_predicted_styles = {}
for i, situation in enumerate(situations, 1):
logger.debug(f"[Exp_model模式] 步骤2.{i} - 为情境预测风格: {situation}")
logger.debug(f"为情境 {i} 预测风格: {situation}")
best_style, scores = learner.predict_style(situation, top_k=max_num)
if best_style and scores:
logger.debug(f" 预测结果: best={best_style}, scores数量={len(scores)}")
logger.debug(f"预测最佳风格: {best_style}")
# 合并分数(取最高分)
for style, score in scores.items():
if style not in all_predicted_styles or score > all_predicted_styles[style]:
all_predicted_styles[style] = score
else:
logger.debug(" 该情境未返回预测结果")
logger.debug("该情境未返回预测结果")
if not all_predicted_styles:
logger.warning("[Exp_model模式] StyleLearner未返回预测结果(可能模型未训练),回退到经典模式")
logger.debug("StyleLearner未返回预测结果回退到经典模式")
return await self._select_expressions_classic(
chat_id=chat_id,
chat_info=chat_info,
@@ -372,10 +372,10 @@ class ExpressionSelector:
# 将分数字典转换为列表格式 [(style, score), ...]
predicted_styles = sorted(all_predicted_styles.items(), key=lambda x: x[1], reverse=True)
logger.info(f"[Exp_model模式] 步骤2完成 - 预测到 {len(predicted_styles)} 个风格, Top3: {predicted_styles[:3]}")
logger.debug(f"预测到 {len(predicted_styles)} 个风格")
# 步骤3: 根据预测的风格从数据库获取表达方式
logger.debug("[Exp_model模式] 步骤3 - 从数据库查询表达方式")
logger.debug("从数据库查询表达方式")
expressions = await self.get_model_predicted_expressions(
chat_id=chat_id,
predicted_styles=predicted_styles,
@@ -383,7 +383,7 @@ class ExpressionSelector:
)
if not expressions:
logger.warning("[Exp_model模式] 未找到匹配预测风格的表达方式,回退到经典模式")
logger.debug("未找到匹配预测风格的表达方式,回退到经典模式")
return await self._select_expressions_classic(
chat_id=chat_id,
chat_info=chat_info,
@@ -392,7 +392,7 @@ class ExpressionSelector:
min_num=min_num
)
logger.info(f"[Exp_model模式] 成功! 返回 {len(expressions)} 个表达方式")
logger.debug(f"返回 {len(expressions)} 个表达方式")
return expressions
async def get_model_predicted_expressions(
@@ -417,11 +417,11 @@ class ExpressionSelector:
# 提取风格名称前3个最佳匹配
style_names = [style for style, _ in predicted_styles[:min(3, len(predicted_styles))]]
logger.debug(f"预测最佳风格: {style_names[0] if style_names else 'None'}, Top3分数: {predicted_styles[:3]}")
logger.debug(f"预测最佳风格: {style_names[0] if style_names else 'None'}")
# 🔥 使用 get_related_chat_ids 获取所有相关的 chat_id支持共享表达方式
related_chat_ids = self.get_related_chat_ids(chat_id)
logger.info(f"查询相关的chat_ids ({len(related_chat_ids)}): {related_chat_ids}")
logger.debug(f"查询相关的chat_ids: {len(related_chat_ids)}")
async with get_db_session() as session:
# 🔍 先检查数据库中实际有哪些 chat_id 的数据
@@ -431,7 +431,7 @@ class ExpressionSelector:
.distinct()
)
db_chat_ids = list(db_chat_ids_result.scalars())
logger.info(f"数据库中有表达方式的chat_ids ({len(db_chat_ids)}): {db_chat_ids}")
logger.debug(f"数据库中有表达方式的chat_ids: {len(db_chat_ids)}")
# 获取所有相关 chat_id 的表达方式(用于模糊匹配)
all_expressions_result = await session.execute(
@@ -441,11 +441,11 @@ class ExpressionSelector:
)
all_expressions = list(all_expressions_result.scalars())
logger.info(f"配置的相关chat_id的表达方式数量: {len(all_expressions)}")
logger.debug(f"配置的相关chat_id的表达方式数量: {len(all_expressions)}")
# 🔥 智能回退:如果相关 chat_id 没有数据,尝试查询所有 chat_id
if not all_expressions:
logger.info("相关chat_id没有数据尝试从所有chat_id查询")
logger.debug("相关chat_id没有数据尝试从所有chat_id查询")
all_expressions_result = await session.execute(
select(Expression)
.where(Expression.type == "style")
@@ -501,12 +501,7 @@ class ExpressionSelector:
expressions_objs = [e[0] for e in matched_expressions[:max_num]]
# 显示最佳匹配的详细信息
top_matches = [f"{e[3]}->{e[0].style}({e[1]:.2f})" for e in matched_expressions[:3]]
logger.info(
f"模糊匹配成功: 找到 {len(expressions_objs)} 个表达方式\n"
f" 相似度范围: {matched_expressions[0][1]:.2f} ~ {matched_expressions[min(len(matched_expressions)-1, max_num-1)][1]:.2f}\n"
f" Top3匹配: {top_matches}"
)
logger.debug(f"模糊匹配成功: 找到 {len(expressions_objs)} 个表达方式")
# 转换为字典格式
expressions = [

View File

@@ -138,7 +138,7 @@ class MemorySystem:
self.config = config or MemorySystemConfig.from_global_config()
self.llm_model = llm_model
self.status = MemorySystemStatus.INITIALIZING
logger.info(f"MemorySystem __init__ called, id: {id(self)}")
logger.debug(f"MemorySystem __init__ called, id: {id(self)}")
# 核心组件(简化版)
self.memory_builder: MemoryBuilder | None = None
@@ -167,11 +167,11 @@ class MemorySystem:
# 海马体采样器
self.hippocampus_sampler = None
logger.info("MemorySystem 初始化开始")
logger.debug("MemorySystem 初始化开始")
async def initialize(self):
"""异步初始化记忆系统"""
logger.info(f"MemorySystem initialize started, id: {id(self)}")
logger.debug(f"MemorySystem initialize started, id: {id(self)}")
try:
# 初始化LLM模型
fallback_task = getattr(self.llm_model, "model_for_task", None) if self.llm_model else None
@@ -226,13 +226,13 @@ class MemorySystem:
try:
try:
self.unified_storage = VectorMemoryStorage(storage_config)
logger.info("Vector DB存储系统初始化成功")
logger.debug("Vector DB存储系统初始化成功")
except Exception as storage_error:
logger.error(f"Vector DB存储系统初始化失败: {storage_error}", exc_info=True)
logger.error(f"Vector DB存储系统初始化失败: {storage_error}", exc_info=True)
self.unified_storage = None # 确保在失败时为None
raise
except Exception as storage_error:
logger.error(f"Vector DB存储系统初始化失败: {storage_error}", exc_info=True)
logger.error(f"Vector DB存储系统初始化失败: {storage_error}", exc_info=True)
raise
# 初始化遗忘引擎
@@ -281,7 +281,7 @@ class MemorySystem:
from .hippocampus_sampler import initialize_hippocampus_sampler
self.hippocampus_sampler = await initialize_hippocampus_sampler(self)
logger.info("海马体采样器初始化成功")
logger.debug("海马体采样器初始化成功")
except Exception as e:
logger.warning(f"海马体采样器初始化失败: {e}")
self.hippocampus_sampler = None
@@ -289,7 +289,7 @@ class MemorySystem:
# 统一存储已经自动加载数据,无需额外加载
self.status = MemorySystemStatus.READY
logger.info(f"MemorySystem initialize finished, id: {id(self)}")
logger.debug(f"MemorySystem initialize finished, id: {id(self)}")
except Exception as e:
self.status = MemorySystemStatus.ERROR
logger.error(f"❌ 记忆系统初始化失败: {e}", exc_info=True)
@@ -394,7 +394,7 @@ class MemorySystem:
value_score = await self._assess_information_value(conversation_text, normalized_context)
if value_score < self.config.memory_value_threshold:
logger.info(f"信息价值评分 {value_score:.2f} 低于阈值,跳过记忆构建")
logger.debug(f"信息价值评分 {value_score:.2f} 低于阈值,跳过记忆构建")
self.status = original_status
return []
else:
@@ -446,7 +446,7 @@ class MemorySystem:
build_time = time.time() - start_time
logger.info(
f"生成 {len(fused_chunks)} 条记忆,成功入库 {stored_count} 条,耗时 {build_time:.2f}",
f"生成 {len(fused_chunks)} 条记忆,入库 {stored_count} 条,耗时 {build_time:.2f}",
)
self.status = original_status
@@ -473,16 +473,16 @@ class MemorySystem:
def _log_memory_preview(self, memories: list[MemoryChunk]) -> None:
"""在控制台输出记忆预览,便于人工检查"""
if not memories:
logger.info("📝 本次未生成新的记忆")
logger.debug("本次未生成新的记忆")
return
logger.info(f"📝 本次生成的记忆预览 ({len(memories)} 条):")
logger.debug(f"本次生成的记忆预览 ({len(memories)} 条):")
for idx, memory in enumerate(memories, start=1):
text = memory.text_content or ""
if len(text) > 120:
text = text[:117] + "..."
logger.info(
logger.debug(
f" {idx}) 类型={memory.memory_type.value} 重要性={memory.metadata.importance.name} "
f"置信度={memory.metadata.confidence.name} | 内容={text}"
)
@@ -800,7 +800,7 @@ class MemorySystem:
metadata_filters=metadata_filters, # JSON元数据索引过滤
)
logger.info(f"[阶段二] 向量搜索完成: 返回 {len(search_results)} 条候选")
logger.debug(f"[阶段二] 向量搜索完成: 返回 {len(search_results)} 条候选")
# === 阶段三:综合重排 ===
scored_memories = []
@@ -874,7 +874,7 @@ class MemorySystem:
if instant_memories:
# 将瞬时记忆放在列表最前面
final_memories = instant_memories + final_memories
logger.info(f"融合了 {len(instant_memories)} 条瞬时记忆")
logger.debug(f"融合了 {len(instant_memories)} 条瞬时记忆")
except Exception as e:
logger.warning(f"检索瞬时记忆失败: {e}", exc_info=True)
@@ -884,9 +884,9 @@ class MemorySystem:
retrieval_time = time.time() - start_time
# 详细日志 - 打印检索到的有效记忆的完整内容
if scored_memories:
logger.debug("🧠 检索到的有效记忆内容详情:")
# 详细日志 - 只在debug模式打印检索到的完整内容
if scored_memories and logger.level <= 10: # DEBUG level
logger.debug("检索到的有效记忆内容详情:")
for i, (mem, score, details) in enumerate(scored_memories[:effective_limit], 1):
try:
# 获取记忆的完整内容
@@ -909,7 +909,7 @@ class MemorySystem:
created_time_str = datetime.datetime.fromtimestamp(created_time).strftime("%Y-%m-%d %H:%M:%S") if created_time else "unknown"
# 打印记忆详细信息
logger.debug(f" 📝 记忆 #{i}")
logger.debug(f" 记忆 #{i}")
logger.debug(f" 类型: {memory_type} | 重要性: {importance} | 置信度: {confidence}")
logger.debug(f" 创建时间: {created_time_str}")
logger.debug(f" 综合得分: {details['final']:.3f} (向量:{details['vector']:.3f}, 时效:{details['recency']:.3f}, 重要性:{details['importance']:.3f}, 频率:{details['frequency']:.3f})")
@@ -935,13 +935,7 @@ class MemorySystem:
continue
logger.info(
"✅ 三阶段记忆检索完成"
f" | user={resolved_user_id}"
f" | 粗筛={len(search_results)}"
f" | 精筛={len(scored_memories)}"
f" | 返回={len(final_memories)}"
f" | duration={retrieval_time:.3f}s"
f" | query='{optimized_query[:60]}...'"
f"记忆检索完成: 返回 {len(final_memories)} 条 | 耗时 {retrieval_time:.2f}s"
)
self.last_retrieval_time = time.time()
@@ -1429,9 +1423,9 @@ class MemorySystem:
reasoning = result.get("reasoning", "")
key_factors = result.get("key_factors", [])
logger.info(f"信息价值评估: {value_score:.2f}, 理由: {reasoning}")
logger.debug(f"信息价值评估: {value_score:.2f}, 理由: {reasoning}")
if key_factors:
logger.info(f"关键因素: {', '.join(key_factors)}")
logger.debug(f"关键因素: {', '.join(key_factors)}")
return max(0.0, min(1.0, value_score))

View File

@@ -39,7 +39,7 @@ class SingleStreamContextManager:
# 标记是否已初始化历史消息
self._history_initialized = False
logger.info(f"[新建] 单流上下文管理器初始化: {stream_id} (id={id(self)})")
logger.debug(f"单流上下文管理器初始化: {stream_id}")
# 异步初始化历史消息(不阻塞构造函数)
asyncio.create_task(self._initialize_history_from_db())
@@ -237,7 +237,7 @@ class SingleStreamContextManager:
else:
setattr(self.context, attr, time.time())
await self._update_stream_energy()
logger.info(f"清空单流上下文: {self.stream_id}")
logger.debug(f"清空单流上下文: {self.stream_id}")
return True
except Exception as e:
logger.error(f"清空单流上下文失败 {self.stream_id}: {e}", exc_info=True)
@@ -303,15 +303,15 @@ class SingleStreamContextManager:
async def _initialize_history_from_db(self):
"""从数据库初始化历史消息到context中"""
if self._history_initialized:
logger.info(f"历史消息已初始化,跳过: {self.stream_id}")
logger.debug(f"历史消息已初始化,跳过: {self.stream_id}")
return
# 立即设置标志,防止并发重复加载
logger.info(f"设置历史初始化标志: {self.stream_id}")
logger.debug(f"设置历史初始化标志: {self.stream_id}")
self._history_initialized = True
try:
logger.info(f"开始从数据库加载历史消息: {self.stream_id}")
logger.debug(f"开始从数据库加载历史消息: {self.stream_id}")
from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat
@@ -339,7 +339,7 @@ class SingleStreamContextManager:
logger.warning(f"转换历史消息失败 (message_id={msg_dict.get('message_id', 'unknown')}): {e}")
continue
logger.info(f"成功从数据库加载 {len(self.context.history_messages)} 条历史消息到内存: {self.stream_id}")
logger.debug(f"成功从数据库加载 {len(self.context.history_messages)} 条历史消息到内存: {self.stream_id}")
else:
logger.debug(f"没有历史消息需要加载: {self.stream_id}")

View File

@@ -118,12 +118,12 @@ class StreamLoopManager:
# 如果是强制启动且任务仍在运行,先取消旧任务
if force and context.stream_loop_task and not context.stream_loop_task.done():
logger.info(f"强制启动模式:先取消现有流循环任务: {stream_id}")
logger.debug(f"强制启动模式:先取消现有流循环任务: {stream_id}")
old_task = context.stream_loop_task
old_task.cancel()
try:
await asyncio.wait_for(old_task, timeout=2.0)
logger.info(f"旧流循环任务已结束: {stream_id}")
logger.debug(f"旧流循环任务已结束: {stream_id}")
except (asyncio.TimeoutError, asyncio.CancelledError):
logger.debug(f"旧流循环任务已取消或超时: {stream_id}")
except Exception as e:
@@ -140,7 +140,7 @@ class StreamLoopManager:
self.stats["active_streams"] += 1
self.stats["total_loops"] += 1
logger.info(f"启动流循环任务: {stream_id}")
logger.debug(f"启动流循环任务: {stream_id}")
return True
except Exception as e:
@@ -183,7 +183,7 @@ class StreamLoopManager:
# 清空 StreamContext 中的任务记录
context.stream_loop_task = None
logger.info(f"停止流循环: {stream_id}")
logger.debug(f"停止流循环: {stream_id}")
return True
async def _stream_loop_worker(self, stream_id: str) -> None:
@@ -192,7 +192,7 @@ class StreamLoopManager:
Args:
stream_id: 流ID
"""
logger.info(f"流循环工作器启动: {stream_id}")
logger.debug(f"流循环工作器启动: {stream_id}")
try:
while self.is_running:
@@ -243,7 +243,7 @@ class StreamLoopManager:
await asyncio.sleep(interval)
except asyncio.CancelledError:
logger.info(f"流循环被取消: {stream_id}")
logger.debug(f"流循环被取消: {stream_id}")
break
except Exception as e:
logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True)
@@ -263,7 +263,7 @@ class StreamLoopManager:
# 清理间隔记录
self._last_intervals.pop(stream_id, None)
logger.info(f"流循环结束: {stream_id}")
logger.debug(f"流循环结束: {stream_id}")
async def _get_stream_context(self, stream_id: str) -> Any | None:
"""获取流上下文
@@ -333,7 +333,7 @@ class StreamLoopManager:
# 在处理开始前,先刷新缓存到未读消息
cached_messages = await self._flush_cached_messages_to_unread(stream_id)
if cached_messages:
logger.info(f"处理开始前刷新缓存消息: stream={stream_id}, 数量={len(cached_messages)}")
logger.debug(f"处理开始前刷新缓存消息: stream={stream_id}, 数量={len(cached_messages)}")
# 设置触发用户ID以实现回复保护
last_message = context.get_last_message()
@@ -357,7 +357,7 @@ class StreamLoopManager:
# 处理成功后,再次刷新缓存中可能的新消息
additional_messages = await self._flush_cached_messages_to_unread(stream_id)
if additional_messages:
logger.info(f"处理完成后刷新新消息: stream={stream_id}, 数量={len(additional_messages)}")
logger.debug(f"处理完成后刷新新消息: stream={stream_id}, 数量={len(additional_messages)}")
process_time = time.time() - start_time
logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)")
@@ -367,7 +367,7 @@ class StreamLoopManager:
return success
except asyncio.CancelledError:
logger.info(f"流处理被取消: {stream_id}")
logger.debug(f"流处理被取消: {stream_id}")
# 取消所有子任务
for child_task in child_tasks:
if not child_task.done():
@@ -552,7 +552,7 @@ class StreamLoopManager:
chatter_manager: chatter管理器实例
"""
self.chatter_manager = chatter_manager
logger.info(f"设置chatter管理器: {chatter_manager.__class__.__name__}")
logger.debug(f"设置chatter管理器: {chatter_manager.__class__.__name__}")
async def _should_force_dispatch_for_stream(self, stream_id: str) -> bool:
if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0:
@@ -652,7 +652,7 @@ class StreamLoopManager:
Args:
stream_id: 流ID
"""
logger.info(f"强制分发流处理: {stream_id}")
logger.debug(f"强制分发流处理: {stream_id}")
try:
# 获取流上下文
@@ -663,7 +663,7 @@ class StreamLoopManager:
# 检查是否有现有的 stream_loop_task
if context.stream_loop_task and not context.stream_loop_task.done():
logger.info(f"发现现有流循环 {stream_id},将先取消再重新创建")
logger.debug(f"发现现有流循环 {stream_id},将先取消再重新创建")
existing_task = context.stream_loop_task
existing_task.cancel()
# 创建异步任务来等待取消完成,并添加异常处理

View File

@@ -74,7 +74,7 @@ class GlobalNoticeManager:
"last_cleanup_time": 0,
}
logger.info("全局Notice管理器初始化完成")
logger.debug("全局Notice管理器初始化完成")
def add_notice(
self,
@@ -135,7 +135,7 @@ class GlobalNoticeManager:
# 定期清理过期消息
self._cleanup_expired_notices()
logger.info(f"Notice已添加: id={message.message_id}, type={self._get_notice_type(message)}, scope={scope.value}, target={target_stream_id}, storage_key={storage_key}, ttl={ttl}s")
logger.debug(f"Notice已添加: id={message.message_id}, type={self._get_notice_type(message)}, scope={scope.value}")
return True
except Exception as e:
@@ -282,7 +282,8 @@ class GlobalNoticeManager:
for key in keys_to_remove:
del self._notices[key]
logger.info(f"清理notice消息: {removed_count}")
if removed_count > 0:
logger.debug(f"清理notice消息: {removed_count}")
return removed_count
except Exception as e:

View File

@@ -72,13 +72,13 @@ class MessageManager:
logger.error(f"启动批量数据库写入器失败: {e}")
# 启动消息缓存系统(内置)
logger.info("📦 消息缓存系统已启动")
logger.debug("消息缓存系统已启动")
# 启动流循环管理器并设置chatter_manager
await stream_loop_manager.start()
stream_loop_manager.set_chatter_manager(self.chatter_manager)
logger.info("🚀 消息管理器已启动 | 流循环管理器已启动")
logger.info("消息管理器已启动")
async def stop(self):
"""停止消息管理器"""
@@ -92,19 +92,19 @@ class MessageManager:
from src.chat.message_manager.batch_database_writer import shutdown_batch_writer
await shutdown_batch_writer()
logger.info("📦 批量数据库写入器已停止")
logger.debug("批量数据库写入器已停止")
except Exception as e:
logger.error(f"停止批量数据库写入器失败: {e}")
# 停止消息缓存系统(内置)
self.message_caches.clear()
self.stream_processing_status.clear()
logger.info("📦 消息缓存系统已停止")
logger.debug("消息缓存系统已停止")
# 停止流循环管理器
await stream_loop_manager.stop()
logger.info("🛑 消息管理器已停止 | 流循环管理器已停止")
logger.info("消息管理器已停止")
async def add_message(self, stream_id: str, message: DatabaseMessages):
"""添加消息到指定聊天流"""
@@ -113,15 +113,15 @@ class MessageManager:
# 检查是否为notice消息
if self._is_notice_message(message):
# Notice消息处理 - 添加到全局管理器
logger.info(f"📢 检测到notice消息: notice_type={getattr(message, 'notice_type', None)}")
logger.debug(f"检测到notice消息: notice_type={getattr(message, 'notice_type', None)}")
await self._handle_notice_message(stream_id, message)
# 根据配置决定是否继续处理(触发聊天流程)
if not global_config.notice.enable_notice_trigger_chat:
logger.info(f"根据配置,流 {stream_id} 的Notice消息将被忽略不触发聊天流程。")
logger.debug(f"Notice消息将被忽略不触发聊天流程: {stream_id}")
return # 停止处理,不进入未读消息队列
else:
logger.info(f"根据配置,流 {stream_id} 的Notice消息将触发聊天流程。")
logger.debug(f"Notice消息将触发聊天流程: {stream_id}")
# 继续执行,将消息添加到未读队列
# 普通消息处理
@@ -201,7 +201,7 @@ class MessageManager:
if hasattr(context, "processing_task") and context.processing_task and not context.processing_task.done():
context.processing_task.cancel()
logger.info(f"停用聊天流: {stream_id}")
logger.debug(f"停用聊天流: {stream_id}")
except Exception as e:
logger.error(f"停用聊天流 {stream_id} 时发生错误: {e}")
@@ -218,7 +218,7 @@ class MessageManager:
context = chat_stream.context_manager.context
context.is_active = True
logger.info(f"激活聊天流: {stream_id}")
logger.debug(f"激活聊天流: {stream_id}")
except Exception as e:
logger.error(f"激活聊天流 {stream_id} 时发生错误: {e}")
@@ -354,8 +354,7 @@ class MessageManager:
# 取消 stream_loop_task子任务会通过 try-catch 自动取消
try:
stream_loop_task.cancel()
logger.info(f"已发送取消信号到流循环任务: {chat_stream.stream_id}")
# 等待任务真正结束(设置超时避免死锁)
try:
await asyncio.wait_for(stream_loop_task, timeout=2.0)
@@ -401,21 +400,21 @@ class MessageManager:
# 确保有未读消息需要处理
unread_messages = context.get_unread_messages()
if not unread_messages:
logger.debug(f"💭 聊天流 {stream_id} 没有未读消息,跳过重新处理")
logger.debug(f"聊天流 {stream_id} 没有未读消息,跳过重新处理")
return
logger.info(f"💬 准备重新处理 {len(unread_messages)} 条未读消息: {stream_id}")
logger.debug(f"准备重新处理 {len(unread_messages)} 条未读消息: {stream_id}")
# 重新创建 stream_loop 任务
success = await stream_loop_manager.start_stream_loop(stream_id, force=True)
if success:
logger.info(f"成功重新创建流循环任务: {stream_id}")
logger.debug(f"成功重新创建流循环任务: {stream_id}")
else:
logger.warning(f"⚠️ 重新创建流循环任务失败: {stream_id}")
logger.warning(f"重新创建流循环任务失败: {stream_id}")
except Exception as e:
logger.error(f"🚨 触发重新处理时出错: {e}")
logger.error(f"触发重新处理时出错: {e}")
async def clear_all_unread_messages(self, stream_id: str):
"""清除指定上下文中的所有未读消息,在消息处理完成后调用"""

View File

@@ -30,25 +30,12 @@ async def send_message(message: MessageSending, show_log=True) -> bool:
from src.plugin_system.core.event_manager import event_manager
if message.chat_stream:
logger.info(f"[发送完成] 准备触发 AFTER_SEND 事件stream_id={message.chat_stream.stream_id}")
# 使用 asyncio.create_task 来异步触发事件,避免阻塞
async def trigger_event_async():
try:
logger.info("[事件触发] 开始异步触发 AFTER_SEND 事件")
await event_manager.trigger_event(
EventType.AFTER_SEND,
permission_group="SYSTEM",
stream_id=message.chat_stream.stream_id,
message=message,
)
logger.info("[事件触发] AFTER_SEND 事件触发完成")
except Exception as e:
logger.error(f"[事件触发] 异步触发事件失败: {e}", exc_info=True)
# 创建异步任务,不等待完成
asyncio.create_task(trigger_event_async()) # noqa: RUF006
logger.info("[发送完成] AFTER_SEND 事件已提交到异步任务")
await event_manager.trigger_event(
EventType.AFTER_SEND,
permission_group="SYSTEM",
stream_id=message.chat_stream.stream_id,
message=message,
)
except Exception as event_error:
logger.error(f"触发 AFTER_SEND 事件时出错: {event_error}", exc_info=True)

View File

@@ -215,10 +215,10 @@ class ProactiveThinkingScheduler:
# 计算并获取最新的 focus_energy
logger.debug("[调度器] 找到聊天流,开始计算 focus_energy")
focus_energy = await chat_stream.calculate_focus_energy()
logger.info(f"[调度器] 聊天流 {stream_id} 的 focus_energy: {focus_energy:.3f}")
logger.debug(f"[调度器] 聊天流 {stream_id} 的 focus_energy: {focus_energy:.3f}")
return focus_energy
else:
logger.warning(f"[调度器] ⚠️ 未找到聊天流 {stream_id},使用默认 focus_energy=0.5")
logger.debug(f"[调度器] 未找到聊天流 {stream_id},使用默认 focus_energy=0.5")
return 0.5
except Exception as e:
@@ -277,8 +277,8 @@ class ProactiveThinkingScheduler:
# 计算下次触发时间
next_run_time = datetime.now() + timedelta(seconds=interval_seconds)
logger.info(
f"✅ 聊天流 {stream_id} 主动思考任务已创建 | "
logger.debug(
f"主动思考任务已创建: {stream_id} | "
f"Focus: {focus_energy:.3f} | "
f"间隔: {interval_seconds / 60:.1f}分钟 | "
f"下次: {next_run_time.strftime('%H:%M:%S')}"
@@ -313,7 +313,7 @@ class ProactiveThinkingScheduler:
if success:
self._paused_streams.add(stream_id)
logger.info(f"⏸️ 暂停主动思考 {stream_id},原因: {reason}")
logger.debug(f"暂停主动思考: {stream_id},原因: {reason}")
return success
@@ -341,7 +341,7 @@ class ProactiveThinkingScheduler:
if success:
self._paused_streams.discard(stream_id)
logger.info(f"▶️ 恢复主动思考 {stream_id}")
logger.debug(f"恢复主动思考: {stream_id}")
return success

View File

@@ -104,7 +104,7 @@ class UnifiedScheduler:
logger.debug(f"[调度器] 事件 '{event_name}' 没有对应的调度任务")
return
logger.info(f"[调度器] 事件 '{event_name}' 触发,共有 {len(event_tasks)} 个调度任务")
logger.debug(f"[调度器] 事件 '{event_name}' 触发,共有 {len(event_tasks)} 个调度任务")
tasks_to_remove = []
@@ -154,7 +154,7 @@ class UnifiedScheduler:
from src.plugin_system.core.event_manager import event_manager
event_manager.register_scheduler_callback(self._handle_event_trigger)
logger.info("调度器已注册到 event_manager")
logger.debug("调度器已注册到 event_manager")
except ImportError:
logger.warning("无法导入 event_manager事件触发功能将不可用")
@@ -178,23 +178,23 @@ class UnifiedScheduler:
from src.plugin_system.core.event_manager import event_manager
event_manager.unregister_scheduler_callback()
logger.info("调度器回调已从 event_manager 注销")
logger.debug("调度器回调已从 event_manager 注销")
except ImportError:
pass
logger.info(f"统一调度器已停止,共有 {len(self._tasks)} 个任务被清理")
logger.info(f"统一调度器已停止")
self._tasks.clear()
self._event_subscriptions.clear()
async def _check_loop(self):
"""主循环:每秒检查一次所有任务"""
logger.info("调度器检查循环已启动")
logger.debug("调度器检查循环已启动")
while self._running:
try:
await asyncio.sleep(1)
await self._check_and_trigger_tasks()
except asyncio.CancelledError:
logger.info("调度器检查循环被取消")
logger.debug("调度器检查循环被取消")
break
except Exception as e:
logger.error(f"调度器检查循环发生错误: {e}", exc_info=True)
@@ -238,7 +238,7 @@ class UnifiedScheduler:
# 如果不是循环任务,标记为删除
if not task.is_recurring:
tasks_to_remove.append(task.schedule_id)
logger.info(f"[调度器] 一次性任务 {task.task_name} 已完成,将被移除")
logger.debug(f"[调度器] 一次性任务 {task.task_name} 已完成,将被移除")
except Exception as e:
logger.error(f"[调度器] 执行任务 {task.task_name} 时发生错误: {e}", exc_info=True)
@@ -306,14 +306,14 @@ class UnifiedScheduler:
async def _execute_callback(self, task: ScheduleTask):
"""执行任务回调函数"""
try:
logger.info(f"触发任务: {task.task_name} (ID: {task.schedule_id[:8]}...)")
logger.debug(f"触发任务: {task.task_name}")
if asyncio.iscoroutinefunction(task.callback):
await task.callback(*task.callback_args, **task.callback_kwargs)
else:
task.callback(*task.callback_args, **task.callback_kwargs)
logger.info(f"任务 {task.task_name} 执行成功 (第 {task.trigger_count + 1} 次)")
logger.debug(f"任务 {task.task_name} 执行")
except Exception as e:
logger.error(f"执行任务 {task.task_name} 的回调函数时出错: {e}", exc_info=True)
@@ -371,7 +371,7 @@ class UnifiedScheduler:
self._event_subscriptions.add(event_name)
logger.debug(f"开始追踪事件: {event_name}")
logger.info(f"创建调度任务: {task}")
logger.debug(f"创建调度任务: {task.task_name}")
return schedule_id
async def remove_schedule(self, schedule_id: str) -> bool:
@@ -383,7 +383,7 @@ class UnifiedScheduler:
task = self._tasks[schedule_id]
await self._remove_task_internal(schedule_id)
logger.info(f"移除调度任务: {task.task_name} (ID: {schedule_id[:8]}...)")
logger.debug(f"移除调度任务: {task.task_name}")
return True
async def trigger_schedule(self, schedule_id: str) -> bool:
@@ -416,7 +416,7 @@ class UnifiedScheduler:
return False
task.is_active = False
logger.info(f"暂停任务: {task.task_name} (ID: {schedule_id[:8]}...)")
logger.debug(f"暂停任务: {task.task_name}")
return True
async def resume_schedule(self, schedule_id: str) -> bool:
@@ -428,7 +428,7 @@ class UnifiedScheduler:
return False
task.is_active = True
logger.info(f"恢复任务: {task.task_name} (ID: {schedule_id[:8]}...)")
logger.debug(f"恢复任务: {task.task_name}")
return True
async def get_task_info(self, schedule_id: str) -> dict[str, Any] | None: