From 659a8e0d7826b9a77279d3dfd466cbf934367af0 Mon Sep 17 00:00:00 2001 From: tt-P607 <68868379+tt-P607@users.noreply.github.com> Date: Tue, 2 Dec 2025 14:40:58 +0800 Subject: [PATCH] =?UTF-8?q?refactor(api,=20chat):=20=E6=94=B9=E8=BF=9B?= =?UTF-8?q?=E5=BC=82=E6=AD=A5=E5=A4=84=E7=90=86=E5=B9=B6=E8=A7=A3=E5=86=B3?= =?UTF-8?q?=E5=B9=B6=E5=8F=91=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 内存可视化器 API 端点之前在异步路由中执行同步的阻塞操作(文件 I/O、数据处理)。在处理大型图文件时,这可能导致服务器冻结。现在,这些任务已被移至 ThreadPoolExecutor,从而使 API 非阻塞并显著提高响应速度。 在聊天消息管理器中,竞争条件可能导致消息处理重叠或中断后数据流停滞。此提交引入了: - 并发锁(`is_chatter_processing`)以防止流循环同时运行多个 chatter 实例。 - 故障保护机制,在中断时重置处理状态,确保数据流能够恢复并正确继续。 --- scripts/memory_cleaner.py | 680 ++++++++++++++++++ src/api/memory_visualizer_router.py | 263 ++++--- .../message_manager/distribution_manager.py | 20 +- src/chat/message_manager/message_manager.py | 12 +- 4 files changed, 860 insertions(+), 115 deletions(-) create mode 100644 scripts/memory_cleaner.py diff --git a/scripts/memory_cleaner.py b/scripts/memory_cleaner.py new file mode 100644 index 000000000..de388b207 --- /dev/null +++ b/scripts/memory_cleaner.py @@ -0,0 +1,680 @@ +""" +记忆清理脚本 + +功能: +1. 遍历所有长期记忆 +2. 使用 LLM 评估每条记忆的价值 +3. 删除无效/低价值记忆 +4. 合并/精简相似记忆 + +使用方式: +cd Bot +python scripts/memory_cleaner.py [--dry-run] [--batch-size 10] +""" + +import argparse +import asyncio +import json +import sys +from datetime import datetime +from pathlib import Path + +# 添加项目根目录到 Python 路径 +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from src.config.config import model_config +from src.llm_models.utils_model import LLMRequest +from src.config.config import global_config + + +# ==================== 配置 ==================== + +# LLM 评估提示词 +EVALUATION_PROMPT = """你是一个非常严格的记忆价值评估专家。你的任务是大幅清理低质量记忆,只保留真正有价值的信息。 + +## 核心原则:宁缺毋滥! +- 默认态度是 DELETE(删除) +- 只有非常明确、有具体信息的记忆才能保留 +- 有任何疑虑就删除 + +## 必须删除的记忆(直接 delete): + +1. **无意义内容**: + - 单字/短语回复:"?"、"1"、"好"、"哦"、"啊"、"嗯"、"哈哈"、"呜呜" + - 表情包、颜文字、emoji 刷屏 + - "某人发了图片/表情/语音"等无实质内容 + - 乱码、无法理解的内容 + +2. **模糊/缺乏上下文的信息**: + - "用户说了什么" 但没有具体内容 + - "某人和某人聊天" 但不知道聊什么 + - 泛泛的描述如"用户很开心"但不知道原因 + - 指代不明的内容("那个"、"这个"、"它") + +3. **水群/无营养聊天**: + - 群内的日常寒暄、问好 + - 闲聊、灌水、抖机灵 + - 无实际信息的互动 + - 复读、玩梗、接龙 + - 讨论与用户个人无关的话题 + +4. **临时/过时信息**: + - 游戏状态、在线状态 + - 已过期的活动、事件 + - 天气、时间等即时信息 + - "刚才"、"现在"等时效性表述 + +5. **重复/冗余**: + - 相同内容的多条记录 + - 可以合并的相似信息 + +6. **AI自身的记忆**: + - AI说了什么话 + - AI的回复内容 + - AI的想法/计划 + +## 可以保留的记忆(必须同时满足): + +1. **有明确的主体**:知道是谁(具体的用户名/昵称/ID) +2. **有具体的内容**:知道具体说了什么、做了什么、是什么 +3. 
**有长期价值**:这个信息在一个月后仍然有参考意义 + +**保留示例**: +- "用户张三说他是程序员,在杭州工作" ✅ +- "李四说他喜欢打篮球,每周三都会去" ✅ +- "小明说他女朋友叫小红,在一起2年了" ✅ +- "用户A的生日是3月15日" ✅ + +**删除示例**: +- "用户发了个表情" ❌ +- "群里在聊天" ❌ +- "某人说了什么" ❌ +- "今天天气很好" ❌ +- "哈哈哈太好笑了" ❌ + +## 待评估记忆 + +{memories} + +## 输出要求 + +严格按以下 JSON 格式输出,不要有任何其他内容: + +```json +{{ + "evaluations": [ + {{ + "memory_id": "记忆的ID(从上面复制)", + "action": "delete", + "reason": "删除原因" + }}, + {{ + "memory_id": "另一个ID", + "action": "keep", + "reason": "保留原因" + }} + ] +}} +``` + +action 只能是: +- "delete": 删除(应该是大多数) +- "keep": 保留(只有高价值记忆) +- "summarize": 精简(很少用,只有内容过长但有价值时) + +如果 action 是 summarize,需要加 "new_content": "精简后的内容" + +直接输出 JSON,不要任何解释。""" + + +class MemoryCleaner: + """记忆清理器""" + + def __init__(self, dry_run: bool = True, batch_size: int = 10, concurrency: int = 5): + """ + 初始化清理器 + + Args: + dry_run: 是否为模拟运行(不实际修改数据) + batch_size: 每批处理的记忆数量 + concurrency: 并发请求数 + """ + self.dry_run = dry_run + self.batch_size = batch_size + self.concurrency = concurrency + self.data_dir = project_root / "data" / "memory_graph" + self.memory_file = self.data_dir / "memory_graph.json" + self.backup_dir = self.data_dir / "backups" + + # 并发控制 + self.semaphore: asyncio.Semaphore | None = None + + # 统计信息 + self.stats = { + "total": 0, + "kept": 0, + "deleted": 0, + "summarized": 0, + "errors": 0, + "deleted_nodes": 0, + "deleted_edges": 0, + } + + # 日志文件 + self.log_file = self.data_dir / f"cleanup_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + self.cleanup_log = [] + + def load_memories(self) -> dict: + """加载记忆数据""" + print(f"📂 加载记忆文件: {self.memory_file}") + + if not self.memory_file.exists(): + raise FileNotFoundError(f"记忆文件不存在: {self.memory_file}") + + with open(self.memory_file, "r", encoding="utf-8") as f: + data = json.load(f) + + return data + + def extract_memory_text(self, memory_dict: dict) -> str: + """从记忆字典中提取可读文本""" + parts = [] + + # 提取基本信息 + memory_id = memory_dict.get("id", "unknown") + parts.append(f"ID: {memory_id}") + + # 提取节点内容 + nodes = memory_dict.get("nodes", []) + for node in nodes: + node_type = node.get("node_type", "") + content = node.get("content", "") + if content: + parts.append(f"[{node_type}] {content}") + + # 提取边关系 + edges = memory_dict.get("edges", []) + for edge in edges: + relation = edge.get("relation", "") + if relation: + parts.append(f"关系: {relation}") + + # 提取元数据 + metadata = memory_dict.get("metadata", {}) + if metadata: + if "context" in metadata: + parts.append(f"上下文: {metadata['context']}") + if "emotion" in metadata: + parts.append(f"情感: {metadata['emotion']}") + + # 提取重要性和状态 + importance = memory_dict.get("importance", 0) + status = memory_dict.get("status", "unknown") + created_at = memory_dict.get("created_at", "unknown") + + parts.append(f"重要性: {importance}, 状态: {status}, 创建时间: {created_at}") + + return "\n".join(parts) + + async def evaluate_batch(self, memories: list[dict], batch_id: int = 0) -> tuple[int, list[dict]]: + """ + 使用 LLM 评估一批记忆(带并发控制) + + Args: + memories: 记忆字典列表 + batch_id: 批次编号 + + Returns: + (批次ID, 评估结果列表) + """ + async with self.semaphore: + # 构建记忆文本 + memory_texts = [] + for i, mem in enumerate(memories): + text = self.extract_memory_text(mem) + memory_texts.append(f"=== 记忆 {i+1} ===\n{text}") + + combined_text = "\n\n".join(memory_texts) + prompt = EVALUATION_PROMPT.format(memories=combined_text) + + try: + # 使用 LLMRequest 调用模型 + if model_config is None: + raise RuntimeError("model_config 未初始化,请确保已加载配置") + task_config = model_config.model_task_config.utils + llm = LLMRequest(task_config, request_type="memory_cleanup") 
+ response_text, (reasoning, model_name, _) = await llm.generate_response_async( + prompt=prompt, + temperature=0.2, + max_tokens=4000, + ) + + print(f" ✅ 批次 {batch_id} 完成 (模型: {model_name})") + + # 解析 JSON 响应 + response_text = response_text.strip() + + # 尝试提取 JSON + if "```json" in response_text: + json_start = response_text.find("```json") + 7 + json_end = response_text.find("```", json_start) + response_text = response_text[json_start:json_end].strip() + elif "```" in response_text: + json_start = response_text.find("```") + 3 + json_end = response_text.find("```", json_start) + response_text = response_text[json_start:json_end].strip() + + result = json.loads(response_text) + evaluations = result.get("evaluations", []) + + # 为评估结果添加实际的 memory_id + for j, eval_result in enumerate(evaluations): + if j < len(memories): + eval_result["memory_id"] = memories[j].get("id", f"unknown_{batch_id}_{j}") + + return (batch_id, evaluations) + + except json.JSONDecodeError as e: + print(f" ❌ 批次 {batch_id} JSON 解析失败: {e}") + return (batch_id, []) + except Exception as e: + print(f" ❌ 批次 {batch_id} LLM 调用失败: {e}") + return (batch_id, []) + + async def initialize(self): + """初始化(创建信号量)""" + self.semaphore = asyncio.Semaphore(self.concurrency) + print(f"🔧 初始化完成 (并发数: {self.concurrency})") + + def create_backup(self, data: dict): + """创建数据备份""" + self.backup_dir.mkdir(parents=True, exist_ok=True) + backup_file = self.backup_dir / f"memory_graph_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + + print(f"💾 创建备份: {backup_file}") + with open(backup_file, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + return backup_file + + def apply_changes(self, data: dict, evaluations: list[dict]) -> dict: + """ + 应用评估结果到数据 + + Args: + data: 原始数据 + evaluations: 评估结果列表 + + Returns: + 修改后的数据 + """ + # 创建评估结果索引 + eval_map = {e["memory_id"]: e for e in evaluations if "memory_id" in e} + + # 需要删除的记忆 ID + to_delete = set() + # 需要更新的记忆 + to_update = {} + + for eval_result in evaluations: + memory_id = eval_result.get("memory_id") + action = eval_result.get("action") + + if action == "delete": + to_delete.add(memory_id) + self.stats["deleted"] += 1 + self.cleanup_log.append({ + "memory_id": memory_id, + "action": "delete", + "reason": eval_result.get("reason", ""), + "timestamp": datetime.now().isoformat() + }) + elif action == "summarize": + to_update[memory_id] = eval_result.get("new_content") + self.stats["summarized"] += 1 + self.cleanup_log.append({ + "memory_id": memory_id, + "action": "summarize", + "reason": eval_result.get("reason", ""), + "new_content": eval_result.get("new_content"), + "timestamp": datetime.now().isoformat() + }) + else: + self.stats["kept"] += 1 + + if self.dry_run: + print("🔍 [DRY RUN] 不实际修改数据") + return data + + # 实际修改数据 + # 1. 删除记忆 + memories = data.get("memories", {}) + for mem_id in to_delete: + if mem_id in memories: + del memories[mem_id] + + # 2. 更新记忆内容 + for mem_id, new_content in to_update.items(): + if mem_id in memories: + # 更新主题节点的内容 + memory = memories[mem_id] + for node in memory.get("nodes", []): + if node.get("node_type") in ["主题", "topic", "TOPIC"]: + node["content"] = new_content + break + + # 3. 
清理孤立节点和边 + data = self.cleanup_orphaned_nodes_and_edges(data) + + return data + + def cleanup_orphaned_nodes_and_edges(self, data: dict) -> dict: + """ + 清理孤立的节点和边 + + 孤立节点:其 metadata.memory_ids 中的所有记忆都已被删除 + 孤立边:其 source 或 target 节点已被删除 + """ + print("\n🔗 清理孤立节点和边...") + + # 获取当前所有有效的记忆 ID + valid_memory_ids = set(data.get("memories", {}).keys()) + print(f" 有效记忆数: {len(valid_memory_ids)}") + + # 清理节点 + nodes = data.get("nodes", []) + original_node_count = len(nodes) + + valid_nodes = [] + valid_node_ids = set() + + for node in nodes: + node_id = node.get("id") + metadata = node.get("metadata", {}) + memory_ids = metadata.get("memory_ids", []) + + # 检查节点关联的记忆是否还存在 + if memory_ids: + # 过滤掉已删除的记忆 ID + remaining_memory_ids = [mid for mid in memory_ids if mid in valid_memory_ids] + + if remaining_memory_ids: + # 更新 metadata 中的 memory_ids + metadata["memory_ids"] = remaining_memory_ids + valid_nodes.append(node) + valid_node_ids.add(node_id) + # 如果没有剩余的有效记忆 ID,节点被丢弃 + else: + # 没有 memory_ids 的节点(可能是其他方式创建的),检查是否被某个记忆引用 + # 保守处理:保留这些节点 + valid_nodes.append(node) + valid_node_ids.add(node_id) + + deleted_nodes = original_node_count - len(valid_nodes) + data["nodes"] = valid_nodes + print(f" ✅ 节点: {original_node_count} → {len(valid_nodes)} (删除 {deleted_nodes})") + + # 清理边 + edges = data.get("edges", []) + original_edge_count = len(edges) + + valid_edges = [] + for edge in edges: + source = edge.get("source") + target = edge.get("target") + + # 只保留两端节点都存在的边 + if source in valid_node_ids and target in valid_node_ids: + valid_edges.append(edge) + + deleted_edges = original_edge_count - len(valid_edges) + data["edges"] = valid_edges + print(f" ✅ 边: {original_edge_count} → {len(valid_edges)} (删除 {deleted_edges})") + + # 更新统计 + self.stats["deleted_nodes"] = deleted_nodes + self.stats["deleted_edges"] = deleted_edges + + return data + + def save_data(self, data: dict): + """保存修改后的数据""" + if self.dry_run: + print("🔍 [DRY RUN] 跳过保存") + return + + print(f"💾 保存数据到: {self.memory_file}") + with open(self.memory_file, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + def save_log(self): + """保存清理日志""" + print(f"📝 保存清理日志到: {self.log_file}") + with open(self.log_file, "w", encoding="utf-8") as f: + json.dump({ + "stats": self.stats, + "dry_run": self.dry_run, + "timestamp": datetime.now().isoformat(), + "log": self.cleanup_log + }, f, ensure_ascii=False, indent=2) + + async def run(self): + """运行清理流程""" + print("=" * 60) + print("🧹 记忆清理脚本 (高并发版)") + print("=" * 60) + print(f"模式: {'模拟运行 (DRY RUN)' if self.dry_run else '实际执行'}") + print(f"批次大小: {self.batch_size}") + print(f"并发数: {self.concurrency}") + print("=" * 60) + + # 初始化 + await self.initialize() + + # 加载数据 + data = self.load_memories() + + # 获取所有记忆 + memories = data.get("memories", {}) + memory_list = list(memories.values()) + self.stats["total"] = len(memory_list) + + print(f"📊 总记忆数: {self.stats['total']}") + + if not memory_list: + print("⚠️ 没有记忆需要处理") + return + + # 创建备份 + if not self.dry_run: + self.create_backup(data) + + # 分批 + batches = [] + for i in range(0, len(memory_list), self.batch_size): + batch = memory_list[i:i + self.batch_size] + batches.append(batch) + + total_batches = len(batches) + print(f"📦 共 {total_batches} 个批次,开始并发处理...\n") + + # 并发处理所有批次 + start_time = datetime.now() + tasks = [ + self.evaluate_batch(batch, batch_id=idx) + for idx, batch in enumerate(batches) + ] + + # 使用 asyncio.gather 并发执行 + results = await asyncio.gather(*tasks, return_exceptions=True) + + end_time = datetime.now() + elapsed = 
(end_time - start_time).total_seconds() + + # 收集所有评估结果 + all_evaluations = [] + success_count = 0 + error_count = 0 + + for result in results: + if isinstance(result, Exception): + print(f" ❌ 批次异常: {result}") + error_count += 1 + elif isinstance(result, tuple): + batch_id, evaluations = result + if evaluations: + all_evaluations.extend(evaluations) + success_count += 1 + else: + error_count += 1 + + print(f"\n⏱️ 并发处理完成,耗时 {elapsed:.1f} 秒") + print(f" 成功批次: {success_count}/{total_batches}, 失败: {error_count}") + + # 统计评估结果 + delete_count = sum(1 for e in all_evaluations if e.get("action") == "delete") + keep_count = sum(1 for e in all_evaluations if e.get("action") == "keep") + summarize_count = sum(1 for e in all_evaluations if e.get("action") == "summarize") + + print(f" 📊 评估结果: 保留 {keep_count}, 删除 {delete_count}, 精简 {summarize_count}") + + # 应用更改 + print("\n" + "=" * 60) + print("📊 应用更改...") + data = self.apply_changes(data, all_evaluations) + + # 保存数据 + self.save_data(data) + + # 保存日志 + self.save_log() + + # 打印统计 + print("\n" + "=" * 60) + print("📊 清理统计") + print("=" * 60) + print(f"总记忆数: {self.stats['total']}") + print(f"保留: {self.stats['kept']}") + print(f"删除: {self.stats['deleted']}") + print(f"精简: {self.stats['summarized']}") + print(f"删除节点: {self.stats['deleted_nodes']}") + print(f"删除边: {self.stats['deleted_edges']}") + print(f"错误: {self.stats['errors']}") + print(f"处理速度: {self.stats['total'] / elapsed:.1f} 条/秒") + print("=" * 60) + + if self.dry_run: + print("\n⚠️ 这是模拟运行,实际数据未被修改") + print("如要实际执行,请移除 --dry-run 参数") + + async def run_cleanup_only(self): + """只清理孤立节点和边,不重新评估记忆""" + print("=" * 60) + print("🔗 孤立节点/边清理模式") + print("=" * 60) + print(f"模式: {'模拟运行 (DRY RUN)' if self.dry_run else '实际执行'}") + print("=" * 60) + + # 加载数据 + data = self.load_memories() + + # 统计原始数据 + memories = data.get("memories", {}) + nodes = data.get("nodes", []) + edges = data.get("edges", []) + + print(f"📊 当前状态: {len(memories)} 条记忆, {len(nodes)} 个节点, {len(edges)} 条边") + + if not self.dry_run: + self.create_backup(data) + + # 清理孤立节点和边 + if self.dry_run: + # 模拟运行:统计但不修改 + valid_memory_ids = set(memories.keys()) + + # 统计要删除的节点 + nodes_to_keep = 0 + for node in nodes: + metadata = node.get("metadata", {}) + memory_ids = metadata.get("memory_ids", []) + if memory_ids: + remaining = [mid for mid in memory_ids if mid in valid_memory_ids] + if remaining: + nodes_to_keep += 1 + else: + nodes_to_keep += 1 + + nodes_to_delete = len(nodes) - nodes_to_keep + + # 统计要删除的边(需要先确定哪些节点会被保留) + valid_node_ids = set() + for node in nodes: + metadata = node.get("metadata", {}) + memory_ids = metadata.get("memory_ids", []) + if memory_ids: + remaining = [mid for mid in memory_ids if mid in valid_memory_ids] + if remaining: + valid_node_ids.add(node.get("id")) + else: + valid_node_ids.add(node.get("id")) + + edges_to_keep = sum(1 for e in edges if e.get("source") in valid_node_ids and e.get("target") in valid_node_ids) + edges_to_delete = len(edges) - edges_to_keep + + print(f"\n🔍 [DRY RUN] 预计清理:") + print(f" 节点: {len(nodes)} → {nodes_to_keep} (删除 {nodes_to_delete})") + print(f" 边: {len(edges)} → {edges_to_keep} (删除 {edges_to_delete})") + print("\n⚠️ 这是模拟运行,实际数据未被修改") + print("如要实际执行,请移除 --dry-run 参数") + else: + data = self.cleanup_orphaned_nodes_and_edges(data) + self.save_data(data) + + print(f"\n✅ 清理完成!") + print(f" 删除节点: {self.stats['deleted_nodes']}") + print(f" 删除边: {self.stats['deleted_edges']}") + + +async def main(): + parser = argparse.ArgumentParser(description="记忆清理脚本 (高并发版)") + parser.add_argument( + "--dry-run", + 
action="store_true", + help="模拟运行,不实际修改数据" + ) + parser.add_argument( + "--batch-size", + type=int, + default=10, + help="每批处理的记忆数量 (默认: 10)" + ) + parser.add_argument( + "--concurrency", + type=int, + default=10, + help="并发请求数 (默认: 10)" + ) + parser.add_argument( + "--cleanup-only", + action="store_true", + help="只清理孤立节点和边,不重新评估记忆" + ) + + args = parser.parse_args() + + cleaner = MemoryCleaner( + dry_run=args.dry_run, + batch_size=args.batch_size, + concurrency=args.concurrency, + ) + + if args.cleanup_only: + await cleaner.run_cleanup_only() + else: + await cleaner.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/api/memory_visualizer_router.py b/src/api/memory_visualizer_router.py index 2a197651e..86658e7e0 100644 --- a/src/api/memory_visualizer_router.py +++ b/src/api/memory_visualizer_router.py @@ -4,7 +4,9 @@ 提供 Web API 用于可视化记忆图数据 """ +import asyncio from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor from datetime import datetime from pathlib import Path from typing import Any @@ -23,6 +25,9 @@ data_dir = project_root / "data" / "memory_graph" graph_data_cache = None current_data_file = None +# 线程池用于异步文件读取 +_executor = ThreadPoolExecutor(max_workers=2) + # FastAPI 路由 router = APIRouter() @@ -64,7 +69,7 @@ def find_available_data_files() -> list[Path]: async def load_graph_data_from_file(file_path: Path | None = None) -> dict[str, Any]: - """从磁盘加载图数据""" + """从磁盘加载图数据(异步,不阻塞主线程)""" global graph_data_cache, current_data_file if file_path and file_path != current_data_file: @@ -86,65 +91,81 @@ async def load_graph_data_from_file(file_path: Path | None = None) -> dict[str, if not graph_file.exists(): return {"error": f"文件不存在: {graph_file}", "nodes": [], "edges": [], "stats": {}} - with open(graph_file, encoding="utf-8") as f: - data = orjson.loads(f.read()) + # 在线程池中异步读取文件,避免阻塞主事件循环 + loop = asyncio.get_event_loop() + data = await loop.run_in_executor(_executor, _sync_load_json_file, graph_file) nodes = data.get("nodes", []) edges = data.get("edges", []) metadata = data.get("metadata", {}) - nodes_dict = { - node["id"]: { - **node, - "label": node.get("content", ""), - "group": node.get("node_type", ""), - "title": f"{node.get('node_type', '')}: {node.get('content', '')}", - } - for node in nodes - if node.get("id") - } - - edges_list = [] - seen_edge_ids = set() - for edge in edges: - edge_id = edge.get("id") - if edge_id and edge_id not in seen_edge_ids: - edges_list.append( - { - **edge, - "from": edge.get("source", edge.get("source_id")), - "to": edge.get("target", edge.get("target_id")), - "label": edge.get("relation", ""), - "arrows": "to", - } - ) - seen_edge_ids.add(edge_id) - - stats = metadata.get("statistics", {}) - total_memories = stats.get("total_memories", 0) - - graph_data_cache = { - "nodes": list(nodes_dict.values()), - "edges": edges_list, - "memories": [], - "stats": { - "total_nodes": len(nodes_dict), - "total_edges": len(edges_list), - "total_memories": total_memories, - }, - "current_file": str(graph_file), - "file_size": graph_file.stat().st_size, - "file_modified": datetime.fromtimestamp(graph_file.stat().st_mtime).isoformat(), - } + # 在线程池中处理数据转换 + processed = await loop.run_in_executor( + _executor, _process_graph_data, nodes, edges, metadata, graph_file + ) + + graph_data_cache = processed return graph_data_cache except Exception as e: import traceback - traceback.print_exc() raise HTTPException(status_code=500, detail=f"加载图数据失败: {e}") +def _sync_load_json_file(file_path: Path) -> dict: + """同步加载 
JSON 文件(在线程池中执行)""" + with open(file_path, encoding="utf-8") as f: + return orjson.loads(f.read()) + + +def _process_graph_data(nodes: list, edges: list, metadata: dict, graph_file: Path) -> dict: + """处理图数据(在线程池中执行)""" + nodes_dict = { + node["id"]: { + **node, + "label": node.get("content", ""), + "group": node.get("node_type", ""), + "title": f"{node.get('node_type', '')}: {node.get('content', '')}", + } + for node in nodes + if node.get("id") + } + + edges_list = [] + seen_edge_ids = set() + for edge in edges: + edge_id = edge.get("id") + if edge_id and edge_id not in seen_edge_ids: + edges_list.append( + { + **edge, + "from": edge.get("source", edge.get("source_id")), + "to": edge.get("target", edge.get("target_id")), + "label": edge.get("relation", ""), + "arrows": "to", + } + ) + seen_edge_ids.add(edge_id) + + stats = metadata.get("statistics", {}) + total_memories = stats.get("total_memories", 0) + + return { + "nodes": list(nodes_dict.values()), + "edges": edges_list, + "memories": [], + "stats": { + "total_nodes": len(nodes_dict), + "total_edges": len(edges_list), + "total_memories": total_memories, + }, + "current_file": str(graph_file), + "file_size": graph_file.stat().st_size, + "file_modified": datetime.fromtimestamp(graph_file.stat().st_mtime).isoformat(), + } + + @router.get("/", response_class=HTMLResponse) async def index(request: Request): """主页面""" @@ -152,7 +173,7 @@ async def index(request: Request): def _format_graph_data_from_manager(memory_manager) -> dict[str, Any]: - """从 MemoryManager 提取并格式化图数据""" + """从 MemoryManager 提取并格式化图数据(同步版本,需在线程池中调用)""" if not memory_manager.graph_store: return {"nodes": [], "edges": [], "memories": [], "stats": {}} @@ -216,7 +237,9 @@ async def get_full_graph(): data = {} if memory_manager and memory_manager._initialized: - data = _format_graph_data_from_manager(memory_manager) + # 在线程池中执行,避免阻塞主事件循环 + loop = asyncio.get_event_loop() + data = await loop.run_in_executor(_executor, _format_graph_data_from_manager, memory_manager) else: # 如果内存管理器不可用,则从文件加载 data = await load_graph_data_from_file() @@ -270,71 +293,93 @@ async def get_paginated_graph( memory_manager = get_memory_manager() - # 获取完整数据 + # 获取完整数据(已经是异步的) if memory_manager and memory_manager._initialized: - full_data = _format_graph_data_from_manager(memory_manager) + loop = asyncio.get_event_loop() + full_data = await loop.run_in_executor(_executor, _format_graph_data_from_manager, memory_manager) else: full_data = await load_graph_data_from_file() - nodes = full_data.get("nodes", []) - edges = full_data.get("edges", []) + # 在线程池中处理分页逻辑 + loop = asyncio.get_event_loop() + result = await loop.run_in_executor( + _executor, + _process_pagination, + full_data, page, page_size, min_importance, node_types + ) - # 过滤节点类型 - if node_types: - allowed_types = set(node_types.split(",")) - nodes = [n for n in nodes if n.get("group") in allowed_types] - - # 按重要性排序(如果有importance字段) - nodes_with_importance = [] - for node in nodes: - # 计算节点重要性(连接的边数) - edge_count = sum(1 for e in edges if e.get("from") == node["id"] or e.get("to") == node["id"]) - importance = edge_count / max(len(edges), 1) - if importance >= min_importance: - node["importance"] = importance - nodes_with_importance.append(node) - - # 按重要性降序排序 - nodes_with_importance.sort(key=lambda x: x.get("importance", 0), reverse=True) - - # 分页 - total_nodes = len(nodes_with_importance) - total_pages = (total_nodes + page_size - 1) // page_size - start_idx = (page - 1) * page_size - end_idx = min(start_idx + page_size, total_nodes) - - 
paginated_nodes = nodes_with_importance[start_idx:end_idx] - node_ids = set(n["id"] for n in paginated_nodes) - - # 只保留连接分页节点的边 - paginated_edges = [ - e for e in edges - if e.get("from") in node_ids and e.get("to") in node_ids - ] - - return JSONResponse(content={"success": True, "data": { - "nodes": paginated_nodes, - "edges": paginated_edges, - "pagination": { - "page": page, - "page_size": page_size, - "total_nodes": total_nodes, - "total_pages": total_pages, - "has_next": page < total_pages, - "has_prev": page > 1, - }, - "stats": { - "total_nodes": total_nodes, - "total_edges": len(paginated_edges), - "total_memories": full_data.get("stats", {}).get("total_memories", 0), - }, - }}) + return JSONResponse(content={"success": True, "data": result}) except Exception as e: import traceback traceback.print_exc() return JSONResponse(content={"success": False, "error": str(e)}, status_code=500) +def _process_pagination(full_data: dict, page: int, page_size: int, min_importance: float, node_types: str | None) -> dict: + """处理分页逻辑(在线程池中执行)""" + nodes = full_data.get("nodes", []) + edges = full_data.get("edges", []) + + # 过滤节点类型 + if node_types: + allowed_types = set(node_types.split(",")) + nodes = [n for n in nodes if n.get("group") in allowed_types] + + # 构建边的索引以加速查找 + edge_count_map = {} + for e in edges: + from_id = e.get("from") + to_id = e.get("to") + edge_count_map[from_id] = edge_count_map.get(from_id, 0) + 1 + edge_count_map[to_id] = edge_count_map.get(to_id, 0) + 1 + + # 按重要性排序 + nodes_with_importance = [] + total_edges = max(len(edges), 1) + for node in nodes: + edge_count = edge_count_map.get(node["id"], 0) + importance = edge_count / total_edges + if importance >= min_importance: + node["importance"] = importance + nodes_with_importance.append(node) + + # 按重要性降序排序 + nodes_with_importance.sort(key=lambda x: x.get("importance", 0), reverse=True) + + # 分页 + total_nodes = len(nodes_with_importance) + total_pages = (total_nodes + page_size - 1) // page_size + start_idx = (page - 1) * page_size + end_idx = min(start_idx + page_size, total_nodes) + + paginated_nodes = nodes_with_importance[start_idx:end_idx] + node_ids = set(n["id"] for n in paginated_nodes) + + # 只保留连接分页节点的边 + paginated_edges = [ + e for e in edges + if e.get("from") in node_ids and e.get("to") in node_ids + ] + + return { + "nodes": paginated_nodes, + "edges": paginated_edges, + "pagination": { + "page": page, + "page_size": page_size, + "total_nodes": total_nodes, + "total_pages": total_pages, + "has_next": page < total_pages, + "has_prev": page > 1, + }, + "stats": { + "total_nodes": total_nodes, + "total_edges": len(paginated_edges), + "total_memories": full_data.get("stats", {}).get("total_memories", 0), + }, + } + + @router.get("/api/graph/clustered") async def get_clustered_graph( max_nodes: int = Query(300, ge=50, le=1000, description="最大节点数"), @@ -346,9 +391,10 @@ async def get_clustered_graph( memory_manager = get_memory_manager() - # 获取完整数据 + # 获取完整数据(异步) if memory_manager and memory_manager._initialized: - full_data = _format_graph_data_from_manager(memory_manager) + loop = asyncio.get_event_loop() + full_data = await loop.run_in_executor(_executor, _format_graph_data_from_manager, memory_manager) else: full_data = await load_graph_data_from_file() @@ -364,8 +410,11 @@ async def get_clustered_graph( "clustered": False, }}) - # 执行聚类 - clustered_data = _cluster_graph_data(nodes, edges, max_nodes, cluster_threshold) + # 在线程池中执行聚类 + loop = asyncio.get_event_loop() + clustered_data = await loop.run_in_executor( + 
_executor, _cluster_graph_data, nodes, edges, max_nodes, cluster_threshold + ) return JSONResponse(content={"success": True, "data": { **clustered_data, diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index a7bd24f69..e3b3c4070 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -318,6 +318,15 @@ class StreamLoopManager: has_messages = force_dispatch or await self._has_messages_to_process(context) if has_messages: + # 🔒 并发保护:如果 Chatter 正在处理中,跳过本轮 + # 这可能发生在:1) 打断后重启循环 2) 处理时间超过轮询间隔 + if context.is_chatter_processing: + logger.debug(f"🔒 [流工作器] stream={stream_id[:8]}, Chatter正在处理中,跳过本轮") + # 不打印"开始处理"日志,直接进入下一轮等待 + # 使用较短的等待时间,等待当前处理完成 + await asyncio.sleep(1.0) + continue + if force_dispatch: logger.info(f"⚡ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 未读消息 {unread_count} 条,触发强制分发") else: @@ -477,10 +486,11 @@ class StreamLoopManager: logger.warning(f"Chatter管理器未设置: {stream_id}") return False - # 🔒 防止并发处理:如果已经在处理中,直接返回 + # 🔒 二次并发保护(防御性检查) + # 正常情况下不应该触发,如果触发说明有竞态条件 if context.is_chatter_processing: - logger.debug(f"🔒 [并发保护] stream={stream_id[:8]}, Chatter 正在处理中,跳过本次处理请求") - return True # 返回 True,这是正常的保护机制,不是失败 + logger.warning(f"🔒 [并发保护] stream={stream_id[:8]}, Chatter正在处理中(二次检查触发,可能存在竞态)") + return False # 设置处理状态为正在处理 self._set_stream_processing_status(stream_id, True) @@ -720,8 +730,8 @@ class StreamLoopManager: chat_manager = get_chat_manager() chat_stream = await chat_manager.get_stream(stream_id) if chat_stream and not chat_stream.group_info: - # 私聊:有消息时几乎立即响应,空转时稍微等待 - min_interval = 0.1 if has_messages else 3.0 + # 私聊:有消息时快速响应,空转时稍微等待 + min_interval = 0.5 if has_messages else 5.0 logger.debug(f"流 {stream_id} 私聊模式,使用最小间隔: {min_interval:.2f}s") return min_interval except Exception as e: diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index 211a2f8fc..657c59067 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -370,12 +370,18 @@ class MessageManager: logger.info(f"🚀 打断后重新创建流循环任务: {stream_id}") - # 等待一小段时间确保当前消息已经添加到未读消息中 - await asyncio.sleep(0.1) - # 获取当前的stream context context = chat_stream.context + # 🔒 重要:确保 is_chatter_processing 被重置 + # 被取消的任务的 finally 块可能还没执行完,这里强制重置 + if context.is_chatter_processing: + logger.debug(f"打断后强制重置 is_chatter_processing: {stream_id}") + context.is_chatter_processing = False + + # 等待一小段时间确保当前消息已经添加到未读消息中 + await asyncio.sleep(0.1) + # 确保有未读消息需要处理 unread_messages = context.get_unread_messages() if not unread_messages:
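
The distribution_manager and message_manager hunks above revolve around one pattern: a per-stream `is_chatter_processing` flag that is checked before each chatter run, cleared in a `finally` block, and force-reset when the loop task is recreated after an interruption, because the cancelled task's `finally` may not have executed yet. Below is a minimal, self-contained sketch of that pattern, assuming illustrative names (`StreamContext`, `stream_worker`, `run_chatter_once`, `restart_after_interrupt`) rather than the project's real classes or APIs:

```python
import asyncio


class StreamContext:
    """Per-stream state; only the two fields this sketch needs (illustrative, not the real class)."""

    def __init__(self) -> None:
        self.is_chatter_processing = False
        self.unread = 0


async def run_chatter_once(ctx: StreamContext) -> None:
    # Defensive second check, analogous to the guard the patch adds before dispatch.
    if ctx.is_chatter_processing:
        return
    ctx.is_chatter_processing = True
    try:
        await asyncio.sleep(2.0)  # stand-in for the real chatter processing
        ctx.unread = 0
    finally:
        # Runs on normal exit *and* on cancellation, so the flag is not leaked.
        ctx.is_chatter_processing = False


async def stream_worker(ctx: StreamContext) -> None:
    while True:
        if ctx.unread and ctx.is_chatter_processing:
            # A previous (possibly cancelled) run has not released the flag yet:
            # skip this round instead of starting an overlapping chatter run.
            await asyncio.sleep(1.0)
            continue
        if ctx.unread:
            await run_chatter_once(ctx)
        await asyncio.sleep(0.5)


async def restart_after_interrupt(ctx: StreamContext, old_task: asyncio.Task) -> asyncio.Task:
    old_task.cancel()
    # cancel() only *requests* cancellation; the old task's finally block may
    # not have run yet, so force-reset the flag before starting the new loop.
    if ctx.is_chatter_processing:
        ctx.is_chatter_processing = False
    await asyncio.sleep(0.1)  # give the interrupting message time to land in the unread queue
    return asyncio.create_task(stream_worker(ctx))


async def main() -> None:
    ctx = StreamContext()
    ctx.unread = 3
    task = asyncio.create_task(stream_worker(ctx))
    await asyncio.sleep(0.6)                          # chatter is now mid-run
    task = await restart_after_interrupt(ctx, task)   # simulate an interruption
    await asyncio.sleep(3.0)                          # the new loop finishes the work
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


if __name__ == "__main__":
    asyncio.run(main())
```

Because everything runs on a single event loop, a plain boolean is enough to serialise runs; the explicit force-reset is only needed because cancellation is delivered asynchronously, which is exactly the failure mode the patch's commit message describes.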