diff --git a/src/api/memory_visualizer_router.py b/src/api/memory_visualizer_router.py index e1137d684..2e4e1d482 100644 --- a/src/api/memory_visualizer_router.py +++ b/src/api/memory_visualizer_router.py @@ -218,7 +218,7 @@ async def get_full_graph(): data = _format_graph_data_from_manager(memory_manager) else: # 如果内存管理器不可用,则从文件加载 - data = load_graph_data_from_file() + data = await load_graph_data_from_file() return JSONResponse(content={"success": True, "data": data}) except Exception as e: @@ -247,7 +247,7 @@ async def get_graph_summary(): "current_file": "memory_manager (实时数据)", }}) else: - data = load_graph_data_from_file() + data = await load_graph_data_from_file() return JSONResponse(content={"success": True, "data": { "stats": data.get("stats", {}), "current_file": data.get("current_file", ""), @@ -273,7 +273,7 @@ async def get_paginated_graph( if memory_manager and memory_manager._initialized: full_data = _format_graph_data_from_manager(memory_manager) else: - full_data = load_graph_data_from_file() + full_data = await load_graph_data_from_file() nodes = full_data.get("nodes", []) edges = full_data.get("edges", []) @@ -349,7 +349,7 @@ async def get_clustered_graph( if memory_manager and memory_manager._initialized: full_data = _format_graph_data_from_manager(memory_manager) else: - full_data = load_graph_data_from_file() + full_data = await load_graph_data_from_file() nodes = full_data.get("nodes", []) edges = full_data.get("edges", []) @@ -514,7 +514,7 @@ async def select_file(request: Request): graph_data_cache = None current_data_file = file_to_load - graph_data = load_graph_data_from_file(file_to_load) + graph_data = await load_graph_data_from_file(file_to_load) return JSONResponse( content={ @@ -532,7 +532,7 @@ async def reload_data(): """重新加载数据""" global graph_data_cache graph_data_cache = None - data = load_graph_data_from_file() + data = await load_graph_data_from_file() return JSONResponse(content={"success": True, "message": "数据已重新加载", "stats": data.get("stats", {})}) @@ -560,7 +560,7 @@ async def search_memories(q: str, limit: int = 50): ) else: # 从文件加载的数据中搜索 (降级方案) - data = load_graph_data_from_file() + data = await load_graph_data_from_file() for memory in data.get("memories", []): if q.lower() in memory.get("text", "").lower(): results.append(memory) @@ -582,7 +582,7 @@ async def search_memories(q: str, limit: int = 50): async def get_statistics(): """获取统计信息""" try: - data = load_graph_data_from_file() + data = await load_graph_data_from_file() node_types = {} memory_types = {} diff --git a/src/chat/message_receive/chat_stream.py b/src/chat/message_receive/chat_stream.py index f021ddebe..e16930ffe 100644 --- a/src/chat/message_receive/chat_stream.py +++ b/src/chat/message_receive/chat_stream.py @@ -51,7 +51,7 @@ class ChatStream: context=StreamContext( stream_id=stream_id, chat_type=ChatType.GROUP if group_info else ChatType.PRIVATE, - chat_mode=ChatMode.NORMAL, + chat_mode=ChatMode.FOCUS, ), ) @@ -523,7 +523,7 @@ class ChatManager: context=StreamContext( stream_id=stream_id, chat_type=ChatType.GROUP if stream.group_info else ChatType.PRIVATE, - chat_mode=ChatMode.NORMAL, + chat_mode=ChatMode.FOCUS, ), ) else: @@ -777,7 +777,7 @@ class ChatManager: context=StreamContext( stream_id=stream.stream_id, chat_type=ChatType.GROUP if stream.group_info else ChatType.PRIVATE, - chat_mode=ChatMode.NORMAL, + chat_mode=ChatMode.FOCUS, ), ) else: diff --git a/src/common/data_models/info_data_model.py b/src/common/data_models/info_data_model.py index d53921c4b..7737fc80c 100644 --- a/src/common/data_models/info_data_model.py +++ b/src/common/data_models/info_data_model.py @@ -27,8 +27,6 @@ class ActionPlannerInfo(BaseDataModel): action_data: dict | None = None action_message: Optional["DatabaseMessages"] = None available_actions: dict[str, "ActionInfo"] | None = None - should_quote_reply: bool | None = None # 是否应该引用回复原消息,None表示由系统自动决定 - @dataclass class InterestScore(BaseDataModel): diff --git a/src/common/data_models/message_manager_data_model.py b/src/common/data_models/message_manager_data_model.py index 8afbbde57..73e7e5d82 100644 --- a/src/common/data_models/message_manager_data_model.py +++ b/src/common/data_models/message_manager_data_model.py @@ -43,7 +43,7 @@ class StreamContext(BaseDataModel): stream_id: str chat_type: ChatType = ChatType.PRIVATE # 聊天类型,默认为私聊 - chat_mode: ChatMode = ChatMode.NORMAL # 聊天模式,默认为普通模式 + chat_mode: ChatMode = ChatMode.FOCUS # 聊天模式,默认为专注模式 unread_messages: list["DatabaseMessages"] = field(default_factory=list) history_messages: list["DatabaseMessages"] = field(default_factory=list) last_check_time: float = field(default_factory=time.time) diff --git a/src/memory_graph/manager.py b/src/memory_graph/manager.py index 32fe76f20..ac43ff954 100644 --- a/src/memory_graph/manager.py +++ b/src/memory_graph/manager.py @@ -327,8 +327,8 @@ class MemoryManager: memory.updated_at = datetime.now() - # 保存更新 - await self.persistence.save_graph_store(self.graph_store) + # 异步保存更新(不阻塞当前操作) + asyncio.create_task(self._async_save_graph_store("更新记忆")) logger.info(f"记忆更新成功: {memory_id}") return True @@ -363,8 +363,8 @@ class MemoryManager: # 从图存储删除记忆 self.graph_store.remove_memory(memory_id) - # 保存更新 - await self.persistence.save_graph_store(self.graph_store) + # 异步保存更新(不阻塞当前操作) + asyncio.create_task(self._async_save_graph_store("删除记忆")) logger.info(f"记忆删除成功: {memory_id}") return True @@ -595,8 +595,8 @@ class MemoryManager: for related_id in related_memories[:max_related]: await self.activate_memory(related_id, propagation_strength) - # 保存更新 - await self.persistence.save_graph_store(self.graph_store) + # 异步保存更新(不阻塞当前操作) + asyncio.create_task(self._async_save_graph_store("激活记忆")) logger.debug(f"记忆已激活: {memory_id} (level={new_activation:.3f})") return True @@ -663,9 +663,9 @@ class MemoryManager: "strength": strength }) - # 批量保存到数据库 + # 批量保存到数据库(异步执行) if activation_updates: - await self.persistence.save_graph_store(self.graph_store) + asyncio.create_task(self._async_save_graph_store("批量激活更新")) # 激活传播(异步执行,不阻塞主流程) asyncio.create_task(self._batch_propagate_activation(memories_to_activate, base_strength)) @@ -752,8 +752,8 @@ class MemoryManager: base_strength: 基础激活强度 """ try: - # 批量保存到数据库 - await self.persistence.save_graph_store(self.graph_store) + # 批量保存到数据库(异步执行) + asyncio.create_task(self._async_save_graph_store("后台激活更新")) # 简化的激活传播(仅在强度足够时执行) if base_strength > 0.08: # 提高传播阈值,减少传播频率 @@ -952,8 +952,8 @@ class MemoryManager: else: logger.debug(f"记忆已删除: {memory_id} (删除了 {deleted_vectors} 个向量)") - # 4. 保存更新 - await self.persistence.save_graph_store(self.graph_store) + # 4. 异步保存更新(不阻塞当前操作) + asyncio.create_task(self._async_save_graph_store("删除相关记忆")) return True else: logger.error(f"从图存储删除记忆失败: {memory_id}") @@ -1375,9 +1375,9 @@ class MemoryManager: except Exception as e: logger.warning(f"删除记忆 {memory.id[:8]} 失败: {e}") - # 批量保存(一次性写入,减少I/O) - await self.persistence.save_graph_store(self.graph_store) - logger.info("💾 去重保存完成") + # 批量保存(一次性写入,减少I/O,异步执行) + asyncio.create_task(self._async_save_graph_store("记忆去重")) + logger.info("💾 去重保存任务已启动") # ===== 步骤4: 向量检索关联记忆 + LLM分析关系 ===== # 过滤掉已删除的记忆 @@ -1486,9 +1486,9 @@ class MemoryManager: except Exception as e: logger.warning(f"添加边到图失败: {e}") - # 批量保存更新 - await self.persistence.save_graph_store(self.graph_store) - logger.info("💾 关联边保存完成") + # 批量保存更新(异步执行) + asyncio.create_task(self._async_save_graph_store("记忆关联边")) + logger.info("💾 关联边保存任务已启动") logger.info(f"✅ 记忆整理完成: {result}") @@ -1636,10 +1636,10 @@ class MemoryManager: logger.warning(f"建立关联失败: {e}") continue - # 保存更新后的图数据 + # 异步保存更新后的图数据 if result["linked_count"] > 0: - await self.persistence.save_graph_store(self.graph_store) - logger.info(f"已保存 {result['linked_count']} 条自动关联边") + asyncio.create_task(self._async_save_graph_store("自动关联")) + logger.info(f"已启动保存任务: {result['linked_count']} 条自动关联边") logger.info(f"自动关联完成: {result}") return result @@ -2335,3 +2335,28 @@ class MemoryManager: finally: self._maintenance_running = False logger.debug("维护循环已清理完毕") + + async def _async_save_graph_store(self, operation_name: str = "未知操作") -> None: + """ + 异步保存图存储到磁盘 + + 此方法设计为在后台任务中执行,包含错误处理 + + Args: + operation_name: 操作名称,用于日志记录 + """ + try: + # 确保图存储存在且已初始化 + if self.graph_store is None: + logger.warning(f"图存储未初始化,跳过异步保存: {operation_name}") + return + + if self.persistence is None: + logger.warning(f"持久化管理器未初始化,跳过异步保存: {operation_name}") + return + + await self.persistence.save_graph_store(self.graph_store) + logger.debug(f"异步保存图数据成功: {operation_name}") + except Exception as e: + logger.error(f"异步保存图数据失败 ({operation_name}): {e}", exc_info=True) + # 可以考虑添加重试机制或者通知机制 diff --git a/src/memory_graph/storage/persistence.py b/src/memory_graph/storage/persistence.py index 23b6a0735..70e745099 100644 --- a/src/memory_graph/storage/persistence.py +++ b/src/memory_graph/storage/persistence.py @@ -56,6 +56,7 @@ class PersistenceManager: self.auto_save_interval = auto_save_interval self._auto_save_task: asyncio.Task | None = None self._running = False + self._file_lock = asyncio.Lock() # 文件操作锁 logger.info(f"初始化持久化管理器: data_dir={data_dir}") @@ -66,34 +67,53 @@ class PersistenceManager: Args: graph_store: 图存储对象 """ - try: - # 转换为字典 - data = graph_store.to_dict() + async with self._file_lock: # 使用文件锁防止并发访问 + try: + # 转换为字典 + data = graph_store.to_dict() - # 添加元数据 - data["metadata"] = { - "version": "0.1.0", - "saved_at": datetime.now().isoformat(), - "statistics": graph_store.get_statistics(), - } + # 添加元数据 + data["metadata"] = { + "version": "0.1.0", + "saved_at": datetime.now().isoformat(), + "statistics": graph_store.get_statistics(), + } - # 使用 orjson 序列化(更快) - json_data = orjson.dumps( - data, - option=orjson.OPT_INDENT_2 | orjson.OPT_SERIALIZE_NUMPY, - ) + # 使用 orjson 序列化(更快) + json_data = orjson.dumps( + data, + option=orjson.OPT_INDENT_2 | orjson.OPT_SERIALIZE_NUMPY, + ) - # 原子写入(先写临时文件,再重命名) - temp_file = self.graph_file.with_suffix(".tmp") - async with aiofiles.open(temp_file, "wb") as f: - await f.write(json_data) - temp_file.replace(self.graph_file) + # 原子写入(先写临时文件,再重命名) + temp_file = self.graph_file.with_suffix(".tmp") + async with aiofiles.open(temp_file, "wb") as f: + await f.write(json_data) - logger.info(f"图数据已保存: {self.graph_file}, 大小: {len(json_data) / 1024:.2f} KB") + # 在Windows上,确保目标文件没有被占用 + if self.graph_file.exists(): + import os + try: + os.unlink(self.graph_file) + except OSError: + # 如果无法删除,等待一小段时间再重试 + await asyncio.sleep(0.1) + try: + os.unlink(self.graph_file) + except OSError: + # 如果还是失败,使用备用策略 + backup_file = self.graph_file.with_suffix(".bak") + if backup_file.exists(): + os.unlink(backup_file) + self.graph_file.rename(backup_file) - except Exception as e: - logger.error(f"保存图数据失败: {e}", exc_info=True) - raise + temp_file.replace(self.graph_file) + + logger.info(f"图数据已保存: {self.graph_file}, 大小: {len(json_data) / 1024:.2f} KB") + + except Exception as e: + logger.error(f"保存图数据失败: {e}", exc_info=True) + raise async def load_graph_store(self) -> GraphStore | None: """ @@ -106,26 +126,36 @@ class PersistenceManager: logger.info("图数据文件不存在,返回空图") return None - try: - # 读取文件 - async with aiofiles.open(self.graph_file, "rb") as f: - json_data = await f.read() - data = orjson.loads(json_data) + async with self._file_lock: # 使用文件锁防止并发访问 + try: + # 读取文件,添加重试机制处理可能的文件锁定 + max_retries = 3 + for attempt in range(max_retries): + try: + async with aiofiles.open(self.graph_file, "rb") as f: + json_data = await f.read() + data = orjson.loads(json_data) + break + except OSError as e: + if attempt == max_retries - 1: + raise + logger.warning(f"读取图数据文件失败 (尝试 {attempt + 1}/{max_retries}): {e}") + await asyncio.sleep(0.1 * (attempt + 1)) - # 检查版本(未来可能需要数据迁移) - version = data.get("metadata", {}).get("version", "unknown") - logger.info(f"加载图数据: version={version}") + # 检查版本(未来可能需要数据迁移) + version = data.get("metadata", {}).get("version", "unknown") + logger.info(f"加载图数据: version={version}") - # 恢复图存储 - graph_store = GraphStore.from_dict(data) + # 恢复图存储 + graph_store = GraphStore.from_dict(data) - logger.info(f"图数据加载完成: {graph_store.get_statistics()}") - return graph_store + logger.info(f"图数据加载完成: {graph_store.get_statistics()}") + return graph_store - except Exception as e: - logger.error(f"加载图数据失败: {e}", exc_info=True) - # 尝试加载备份 - return await self._load_from_backup() + except Exception as e: + logger.error(f"加载图数据失败: {e}", exc_info=True) + # 尝试加载备份 + return await self._load_from_backup() async def save_staged_memories(self, staged_memories: list[StagedMemory]) -> None: """ @@ -134,28 +164,47 @@ class PersistenceManager: Args: staged_memories: 临时记忆列表 """ - try: - data = { - "metadata": { - "version": "0.1.0", - "saved_at": datetime.now().isoformat(), - "count": len(staged_memories), - }, - "staged_memories": [sm.to_dict() for sm in staged_memories], - } + async with self._file_lock: # 使用文件锁防止并发访问 + try: + data = { + "metadata": { + "version": "0.1.0", + "saved_at": datetime.now().isoformat(), + "count": len(staged_memories), + }, + "staged_memories": [sm.to_dict() for sm in staged_memories], + } - json_data = orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_SERIALIZE_NUMPY) + json_data = orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_SERIALIZE_NUMPY) - temp_file = self.staged_file.with_suffix(".tmp") - async with aiofiles.open(temp_file, "wb") as f: - await f.write(json_data) - temp_file.replace(self.staged_file) + temp_file = self.staged_file.with_suffix(".tmp") + async with aiofiles.open(temp_file, "wb") as f: + await f.write(json_data) - logger.info(f"临时记忆已保存: {len(staged_memories)} 条") + # 在Windows上,确保目标文件没有被占用 + if self.staged_file.exists(): + import os + try: + os.unlink(self.staged_file) + except OSError: + # 如果无法删除,等待一小段时间再重试 + await asyncio.sleep(0.1) + try: + os.unlink(self.staged_file) + except OSError: + # 如果还是失败,使用备用策略 + backup_file = self.staged_file.with_suffix(".bak") + if backup_file.exists(): + os.unlink(backup_file) + self.staged_file.rename(backup_file) - except Exception as e: - logger.error(f"保存临时记忆失败: {e}", exc_info=True) - raise + temp_file.replace(self.staged_file) + + logger.info(f"临时记忆已保存: {len(staged_memories)} 条") + + except Exception as e: + logger.error(f"保存临时记忆失败: {e}", exc_info=True) + raise async def load_staged_memories(self) -> list[StagedMemory]: """ @@ -168,19 +217,30 @@ class PersistenceManager: logger.info("临时记忆文件不存在,返回空列表") return [] - try: - async with aiofiles.open(self.staged_file, "rb") as f: - json_data = await f.read() - data = orjson.loads(json_data) + async with self._file_lock: # 使用文件锁防止并发访问 + try: + # 读取文件,添加重试机制处理可能的文件锁定 + max_retries = 3 + for attempt in range(max_retries): + try: + async with aiofiles.open(self.staged_file, "rb") as f: + json_data = await f.read() + data = orjson.loads(json_data) + break + except OSError as e: + if attempt == max_retries - 1: + raise + logger.warning(f"读取临时记忆文件失败 (尝试 {attempt + 1}/{max_retries}): {e}") + await asyncio.sleep(0.1 * (attempt + 1)) - staged_memories = [StagedMemory.from_dict(sm) for sm in data.get("staged_memories", [])] + staged_memories = [StagedMemory.from_dict(sm) for sm in data.get("staged_memories", [])] - logger.info(f"临时记忆加载完成: {len(staged_memories)} 条") - return staged_memories + logger.info(f"临时记忆加载完成: {len(staged_memories)} 条") + return staged_memories - except Exception as e: - logger.error(f"加载临时记忆失败: {e}", exc_info=True) - return [] + except Exception as e: + logger.error(f"加载临时记忆失败: {e}", exc_info=True) + return [] async def create_backup(self) -> Path | None: """ @@ -225,9 +285,19 @@ class PersistenceManager: latest_backup = backup_files[0] logger.warning(f"尝试从备份恢复: {latest_backup}") - async with aiofiles.open(latest_backup, "rb") as f: - json_data = await f.read() - data = orjson.loads(json_data) + # 读取备份文件,添加重试机制 + max_retries = 3 + for attempt in range(max_retries): + try: + async with aiofiles.open(latest_backup, "rb") as f: + json_data = await f.read() + data = orjson.loads(json_data) + break + except OSError as e: + if attempt == max_retries - 1: + raise + logger.warning(f"读取备份文件失败 (尝试 {attempt + 1}/{max_retries}): {e}") + await asyncio.sleep(0.1 * (attempt + 1)) graph_store = GraphStore.from_dict(data) logger.info(f"从备份恢复成功: {graph_store.get_statistics()}") diff --git a/src/memory_graph/tools/memory_tools.py b/src/memory_graph/tools/memory_tools.py index 20a9022f3..88e77b34b 100644 --- a/src/memory_graph/tools/memory_tools.py +++ b/src/memory_graph/tools/memory_tools.py @@ -4,6 +4,7 @@ LLM 工具接口:定义记忆系统的工具 schema 和执行逻辑 from __future__ import annotations +import asyncio from typing import Any from src.common.logger import get_logger @@ -376,8 +377,8 @@ class MemoryTools: # 3. 添加到存储(暂存状态) await self._add_memory_to_stores(memory) - # 4. 保存到磁盘 - await self.persistence_manager.save_graph_store(self.graph_store) + # 4. 异步保存到磁盘(不阻塞当前操作) + asyncio.create_task(self._async_save_graph_store()) logger.info(f"记忆创建成功: {memory.id}") @@ -456,8 +457,8 @@ class MemoryTools: **edge.metadata ) - # 5. 保存 - await self.persistence_manager.save_graph_store(self.graph_store) + # 5. 异步保存(不阻塞当前操作) + asyncio.create_task(self._async_save_graph_store()) logger.info(f"记忆关联成功: {source_memory.id} -> {target_memory.id}") @@ -918,7 +919,6 @@ class MemoryTools: # 获取上下文信息 chat_history = context.get("chat_history", "") if context else "" - sender = context.get("sender", "") if context else "" # 处理聊天历史,提取最近5条左右的对话 recent_chat = "" @@ -937,6 +937,7 @@ class MemoryTools: **最近聊天记录(最近10条):** {recent_chat if recent_chat else '无聊天历史'} +**目标消息:** {query} --- ## 第一步:分析查询意图与记忆类型 @@ -1313,3 +1314,24 @@ class MemoryTools: MemoryTools.get_link_memories_schema(), MemoryTools.get_search_memories_schema(), ] + + async def _async_save_graph_store(self) -> None: + """ + 异步保存图存储到磁盘 + + 此方法设计为在后台任务中执行,包含错误处理 + """ + try: + # 确保组件已初始化 + if self.graph_store is None: + logger.warning("图存储未初始化,跳过异步保存") + return + + if self.persistence_manager is None: + logger.warning("持久化管理器未初始化,跳过异步保存") + return + + await self.persistence_manager.save_graph_store(self.graph_store) + logger.debug("异步保存图数据成功") + except Exception as e: + logger.error(f"异步保存图数据失败: {e}", exc_info=True) diff --git a/src/plugins/built_in/affinity_flow_chatter/planner/plan_executor.py b/src/plugins/built_in/affinity_flow_chatter/planner/plan_executor.py index 3120917ee..cc03c0656 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner/plan_executor.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner/plan_executor.py @@ -275,10 +275,6 @@ class ChatterPlanExecutor: # 构建回复动作参数 action_data = action_info.action_data or {} - # 如果action_info中有should_quote_reply且action_data中没有,则添加到action_data中 - if action_info.should_quote_reply is not None and "should_quote_reply" not in action_data: - action_data["should_quote_reply"] = action_info.should_quote_reply - action_params = { "chat_id": plan.chat_id, "target_message": action_info.action_message, @@ -287,11 +283,6 @@ class ChatterPlanExecutor: "clear_unread_messages": clear_unread, } - logger.debug( - f"📬 [PlanExecutor] 准备调用 ActionManager,target_message: {action_info.action_message}, " - f"should_quote_reply: {action_info.should_quote_reply}" - ) - # 通过动作管理器执行回复 execution_result = await self.action_manager.execute_action( action_name=action_info.action_type, **action_params diff --git a/src/plugins/built_in/affinity_flow_chatter/planner/plan_filter.py b/src/plugins/built_in/affinity_flow_chatter/planner/plan_filter.py index efabbf4a5..9c1cb7f80 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner/plan_filter.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner/plan_filter.py @@ -9,10 +9,7 @@ from datetime import datetime from typing import Any import orjson -from json_repair import repair_json -# 旧的Hippocampus系统已被移除,现在使用增强记忆系统 -# from src.chat.memory_system.enhanced_memory_manager import enhanced_memory_manager from src.chat.utils.chat_message_builder import ( build_readable_messages_with_id, ) @@ -22,7 +19,8 @@ from src.common.logger import get_logger from src.config.config import global_config, model_config from src.llm_models.utils_model import LLMRequest from src.mood.mood_manager import mood_manager -from src.plugin_system.base.component_types import ActionInfo, ChatMode, ChatType +from json_repair import repair_json +from src.plugin_system.base.component_types import ActionInfo, ChatType from src.schedule.schedule_manager import schedule_manager logger = get_logger("plan_filter") @@ -47,10 +45,12 @@ class ChatterPlanFilter: """ self.chat_id = chat_id self.available_actions = available_actions - self.planner_llm = LLMRequest(model_set=model_config.model_task_config.planner, request_type="planner") + self.planner_llm = LLMRequest( + model_set=model_config.model_task_config.planner, request_type="planner" + ) self.last_obs_time_mark = 0.0 - async def filter(self, reply_not_available: bool, plan: Plan) -> Plan: + async def filter(self, plan: Plan) -> Plan: """ 执行筛选逻辑,并填充 Plan 对象的 decided_actions 字段。 """ @@ -58,55 +58,28 @@ class ChatterPlanFilter: prompt, used_message_id_list = await self._build_prompt(plan) plan.llm_prompt = prompt if global_config.debug.show_prompt: - logger.info(f"规划器原始提示词:{prompt}") # 叫你不要改你耳朵聋吗😡😡😡😡😡 + logger.info( + f"规划器原始提示词:{prompt}" + ) # 叫你不要改你耳朵聋吗😡😡😡😡😡 - llm_content, _ = await self.planner_llm.generate_response_async(prompt=prompt) + llm_content, _ = await self.planner_llm.generate_response_async( + prompt=prompt + ) if llm_content: if global_config.debug.show_prompt: logger.debug(f"LLM规划器原始响应:{llm_content}") - try: - parsed_json = orjson.loads(repair_json(llm_content)) - except orjson.JSONDecodeError: - parsed_json = { - "thinking": "", - "actions": {"action_type": "no_action", "reason": "返回内容无法解析为JSON"}, - } - if "reply" in plan.available_actions and reply_not_available: - # 如果reply动作不可用,但llm返回的仍然有reply,则改为no_reply - if isinstance(parsed_json, dict): - actions_obj = parsed_json.get("actions", {}) - # actions 可能是字典或列表 - if isinstance(actions_obj, dict) and actions_obj.get("action_type", "") == "reply": - parsed_json["actions"]["action_type"] = "no_reply" - elif isinstance(actions_obj, list): - for action_item in actions_obj: - if isinstance(action_item, dict) and action_item.get("action_type", "") == "reply": - action_item["action_type"] = "no_reply" - if "reason" in action_item: - action_item["reason"] += " (但由于兴趣度不足,reply动作不可用,已改为no_reply)" - elif isinstance(parsed_json, list): - for item in parsed_json: - if isinstance(item, dict): - actions_obj = item.get("actions", {}) - if isinstance(actions_obj, dict) and actions_obj.get("action_type", "") == "reply": - item["actions"]["action_type"] = "no_reply" - elif isinstance(actions_obj, list): - for action_item in actions_obj: - if isinstance(action_item, dict) and action_item.get("action_type", "") == "reply": - action_item["action_type"] = "no_reply" - if "reason" in action_item: - action_item["reason"] += " (但由于兴趣度不足,reply动作不可用,已改为no_reply)" + # 尝试修复JSON格式 + repaired_content = repair_json(llm_content) + parsed_json = orjson.loads(repaired_content) + # 确保parsed_json是列表格式 if isinstance(parsed_json, dict): parsed_json = [parsed_json] if isinstance(parsed_json, list): final_actions = [] - reply_action_added = False - # 定义回复类动作的集合,方便扩展 - reply_action_types = {"reply", "proactive_reply"} for item in parsed_json: if not isinstance(item, dict): @@ -114,7 +87,7 @@ class ChatterPlanFilter: # 预解析 action_type 来进行判断 thinking = item.get("thinking", "未提供思考过程") - actions_obj = item.get("actions", {}) + actions_obj = item.get("actions", []) # 记录决策历史 if ( @@ -135,41 +108,49 @@ class ChatterPlanFilter: ] if thinking != "未提供思考过程" and action_types_to_log: - await self._add_decision_to_history(plan, thinking, ", ".join(action_types_to_log)) + await self._add_decision_to_history( + plan, thinking, ", ".join(action_types_to_log) + ) - # 处理actions字段可能是字典或列表的情况 - if isinstance(actions_obj, dict): - action_type = actions_obj.get("action_type", "no_action") - elif isinstance(actions_obj, list) and actions_obj: - # 如果是列表,取第一个元素的action_type - first_action = actions_obj[0] - if isinstance(first_action, dict): - action_type = first_action.get("action_type", "no_action") + # 严格按照新格式处理actions列表 + if isinstance(actions_obj, list) and actions_obj: + if len(actions_obj) == 0: + plan.decided_actions = [ + ActionPlannerInfo( + action_type="no_action", reasoning="未提供动作" + ) + ] else: - action_type = "no_action" - else: - action_type = "no_action" - - if action_type in reply_action_types: - if not reply_action_added: - final_actions.extend(await self._parse_single_action(item, used_message_id_list, plan)) - reply_action_added = True - else: - # 非回复类动作直接添加 - final_actions.extend(await self._parse_single_action(item, used_message_id_list, plan)) + # 处理每个动作 + for single_action in actions_obj: + if isinstance(single_action, dict): + final_actions.append( + await self._parse_single_action( + single_action, + used_message_id_list, + plan, + ) + ) if thinking and thinking != "未提供思考过程": - logger.info(f"\n{SAKURA_PINK}思考: {thinking}{RESET_COLOR}\n") - plan.decided_actions = self._filter_no_actions(final_actions) + logger.info( + f"\n{SAKURA_PINK}思考: {thinking}{RESET_COLOR}\n" + ) + + plan.decided_actions = final_actions except Exception as e: logger.error(f"筛选 Plan 时出错: {e}\n{traceback.format_exc()}") - plan.decided_actions = [ActionPlannerInfo(action_type="no_action", reasoning=f"筛选时出错: {e}")] + plan.decided_actions = [ + ActionPlannerInfo(action_type="no_action", reasoning=f"筛选时出错: {e}") + ] # 在返回最终计划前,打印将要执行的动作 if plan.decided_actions: action_types = [action.action_type for action in plan.decided_actions] - logger.info(f"选择动作: [{SKY_BLUE}{', '.join(action_types) if action_types else '无'}{RESET_COLOR}]") + logger.info( + f"选择动作: [{SKY_BLUE}{', '.join(action_types) if action_types else '无'}{RESET_COLOR}]" + ) return plan @@ -195,7 +176,9 @@ class ChatterPlanFilter: context.decision_history.append(new_record) # 获取历史长度限制 - max_history_length = getattr(global_config.chat, "decision_history_length", 3) + max_history_length = getattr( + global_config.chat, "decision_history_length", 3 + ) # 如果历史记录超过长度,则移除最旧的记录 if len(context.decision_history) > max_history_length: @@ -208,7 +191,10 @@ class ChatterPlanFilter: async def _build_decision_history_block(self, plan: Plan) -> str: """构建决策历史块""" - if not hasattr(global_config.chat, "enable_decision_history") or not global_config.chat.enable_decision_history: + if ( + not hasattr(global_config.chat, "enable_decision_history") + or not global_config.chat.enable_decision_history + ): return "" try: from src.chat.message_receive.chat_stream import get_chat_manager @@ -223,8 +209,10 @@ class ChatterPlanFilter: return "" history_records = [] - for i, record in enumerate(context.decision_history): - history_records.append(f"- 思考: {record.thought}\n - 动作: {record.action}") + for record in context.decision_history: + history_records.append( + f"- 思考: {record.thought}\n - 动作: {record.action}" + ) history_str = "\n".join(history_records) return f"{history_str}" @@ -240,10 +228,14 @@ class ChatterPlanFilter: time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" bot_name = global_config.bot.nickname bot_nickname = ( - f",也有人叫你{','.join(global_config.bot.alias_names)}" if global_config.bot.alias_names else "" + f",也有人叫你{','.join(global_config.bot.alias_names)}" + if global_config.bot.alias_names + else "" ) bot_core_personality = global_config.personality.personality_core - identity_block = f"你的名字是{bot_name}{bot_nickname},你{bot_core_personality}:" + identity_block = ( + f"你的名字是{bot_name}{bot_nickname},你{bot_core_personality}:" + ) schedule_block = "" if global_config.planning_system.schedule_enable: @@ -261,8 +253,8 @@ class ChatterPlanFilter: decision_history_block = await self._build_decision_history_block(plan) # 构建已读/未读历史消息 - read_history_block, unread_history_block, message_id_list = await self._build_read_unread_history_blocks( - plan + read_history_block, unread_history_block, message_id_list = ( + await self._build_read_unread_history_blocks(plan) ) actions_before_now_block = "" @@ -275,81 +267,72 @@ class ChatterPlanFilter: if global_config.chat.at_bot_inevitable_reply: mentioned_bonus = "\n- 有人提到你,或者at你" - if plan.mode == ChatMode.FOCUS: - no_action_block = """ -动作:no_action -动作描述:不选择任何动作 -{{ - "action": "no_action", - "reason":"不动作的原因" -}} - -动作:no_reply -动作描述:不进行回复,等待合适的回复时机 -- 当你刚刚发送了消息,没有人回复时,选择no_reply -- 当你一次发送了太多消息,为了避免打扰聊天节奏,选择no_reply -- 在认为对方话没有讲完的时候选择这个 -{{ - "action": "no_reply", - "reason":"不回复的原因" -}} -""" - else: # normal Mode - no_action_block = """重要说明: -- 'reply' 表示只进行普通聊天回复,不执行任何额外动作 -- 其他action表示在普通回复的基础上,执行相应的额外动作 -{{ - "action": "reply", - "target_message_id":"触发action的消息id", - "reason":"回复的原因" -}}""" + # 移除no_reply/no_action提示词,如果actions是空列表则自动设置为no_action + no_action_block = "" is_group_chat = plan.chat_type == ChatType.GROUP chat_context_description = "你现在正在一个群聊中" if not is_group_chat and plan.target_info: - chat_target_name = plan.target_info.person_name or plan.target_info.user_nickname or "对方" + chat_target_name = ( + plan.target_info.person_name + or plan.target_info.user_nickname + or "对方" + ) chat_context_description = f"你正在和 {chat_target_name} 私聊" - action_options_block = await self._build_action_options(plan.available_actions) + action_options_block = await self._build_action_options( + plan.available_actions + ) moderation_prompt_block = "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。" custom_prompt_block = "" if global_config.custom_prompt.planner_custom_prompt_content: - custom_prompt_block = global_config.custom_prompt.planner_custom_prompt_content + custom_prompt_block = ( + global_config.custom_prompt.planner_custom_prompt_content + ) users_in_chat_str = "" # TODO: Re-implement user list fetching if needed - planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt") - prompt = planner_prompt_template.format( - schedule_block=schedule_block, - mood_block=mood_block, - time_block=time_block, - chat_context_description=chat_context_description, - decision_history_block=decision_history_block, - read_history_block=read_history_block, - unread_history_block=unread_history_block, - actions_before_now_block=actions_before_now_block, - mentioned_bonus=mentioned_bonus, - no_action_block=no_action_block, - action_options_text=action_options_block, - moderation_prompt=moderation_prompt_block, - identity_block=identity_block, - custom_prompt_block=custom_prompt_block, - bot_name=bot_name, - users_in_chat=users_in_chat_str, + planner_prompt_template = await global_prompt_manager.get_prompt_async( + "planner_prompt" ) + + # Prepare format parameters + format_params = { + "schedule_block": schedule_block, + "mood_block": mood_block, + "time_block": time_block, + "chat_context_description": chat_context_description, + "decision_history_block": decision_history_block, + "read_history_block": read_history_block, + "unread_history_block": unread_history_block, + "actions_before_now_block": actions_before_now_block, + "mentioned_bonus": mentioned_bonus, + "no_action_block": no_action_block, + "action_options_text": action_options_block, + "moderation_prompt": moderation_prompt_block, + "identity_block": identity_block, + "custom_prompt_block": custom_prompt_block, + "bot_name": bot_name, + "users_in_chat": users_in_chat_str, + } + prompt = planner_prompt_template.format(**format_params) return prompt, message_id_list except Exception as e: logger.error(f"构建 Planner 提示词时出错: {e}") logger.error(traceback.format_exc()) return "构建 Planner Prompt 时出错", [] - async def _build_read_unread_history_blocks(self, plan: Plan) -> tuple[str, str, list]: + async def _build_read_unread_history_blocks( + self, plan: Plan + ) -> tuple[str, str, list]: """构建已读/未读历史消息块""" try: # 从message_manager获取真实的已读/未读消息 - from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat + from src.chat.utils.chat_message_builder import ( + get_raw_msg_before_timestamp_with_chat, + ) from src.chat.utils.utils import assign_message_ids # 获取聊天流的上下文 @@ -364,7 +347,9 @@ class ChatterPlanFilter: stream_context = chat_stream.context_manager # 获取真正的已读和未读消息 - read_messages = stream_context.context.history_messages # 已读消息存储在history_messages中 + read_messages = ( + stream_context.context.history_messages + ) # 已读消息存储在history_messages中 if not read_messages: from src.common.data_models.database_data_model import DatabaseMessages @@ -375,13 +360,15 @@ class ChatterPlanFilter: limit=global_config.chat.max_context_size, ) # 将字典转换为DatabaseMessages对象 - read_messages = [DatabaseMessages(**msg_dict) for msg_dict in fallback_messages_dicts] + read_messages = [ + DatabaseMessages(**msg_dict) for msg_dict in fallback_messages_dicts + ] unread_messages = stream_context.get_unread_messages() # 获取未读消息 # 构建已读历史消息块 if read_messages: - read_content, read_ids = await build_readable_messages_with_id( + read_content, _ = await build_readable_messages_with_id( messages=[msg.flatten() for msg in read_messages[-50:]], # 限制数量 timestamp_mode="normal_no_YMD", truncate=False, @@ -398,7 +385,9 @@ class ChatterPlanFilter: flattened_unread = [msg.flatten() for msg in unread_messages] # 尝试获取兴趣度评分(返回以真实 message_id 为键的字典) - interest_scores = await self._get_interest_scores_for_messages(flattened_unread) + interest_scores = await self._get_interest_scores_for_messages( + flattened_unread + ) # 为未读消息分配短 id(保持与 build_readable_messages_with_id 的一致结构) message_id_list = assign_message_ids(flattened_unread) @@ -411,16 +400,22 @@ class ChatterPlanFilter: if not real_msg_id: continue # 如果消息没有ID,则跳过 - msg_time = time.strftime("%H:%M:%S", time.localtime(msg.get("time", time.time()))) + msg_time = time.strftime( + "%H:%M:%S", time.localtime(msg.get("time", time.time())) + ) user_nickname = msg.get("user_nickname", "未知用户") msg_content = msg.get("processed_plain_text", "") # 获取兴趣度信息并显示在提示词中 interest_score = interest_scores.get(real_msg_id, 0.0) - interest_text = f" [兴趣度: {interest_score:.3f}]" if interest_score > 0 else "" + interest_text = ( + f" [兴趣度: {interest_score:.3f}]" if interest_score > 0 else "" + ) # 在未读消息中显示兴趣度,让planner优先选择兴趣度高的消息 - unread_lines.append(f"<{synthetic_id}> {msg_time} {user_nickname}: {msg_content}{interest_text}") + unread_lines.append( + f"<{synthetic_id}> {msg_time} {user_nickname}: {msg_content}{interest_text}" + ) unread_history_block = "\n".join(unread_lines) else: @@ -432,7 +427,9 @@ class ChatterPlanFilter: logger.error(f"构建已读/未读历史消息块时出错: {e}") return "构建已读历史消息时出错", "构建未读历史消息时出错", [] - async def _get_interest_scores_for_messages(self, messages: list[dict]) -> dict[str, float]: + async def _get_interest_scores_for_messages( + self, messages: list[dict] + ) -> dict[str, float]: """为消息获取兴趣度评分""" interest_scores = {} @@ -447,7 +444,9 @@ class ChatterPlanFilter: # 构建兴趣度字典 interest_scores[msg_dict.get("message_id", "")] = interest_score - logger.debug(f"使用消息预计算兴趣值: {interest_score:.3f}, should_reply: {should_reply}") + logger.debug( + f"使用消息预计算兴趣值: {interest_score:.3f}, should_reply: {should_reply}" + ) except Exception as e: logger.warning(f"获取消息预计算兴趣值失败: {e}") @@ -461,116 +460,58 @@ class ChatterPlanFilter: async def _parse_single_action( self, action_json: dict, message_id_list: list, plan: Plan - ) -> list[ActionPlannerInfo]: - parsed_actions = [] + ) -> ActionPlannerInfo: try: - # 从新的actions结构中获取动作信息 - actions_obj = action_json.get("actions", {}) + action: str = action_json.get("action_type", "no_action") + reasoning: str = action_json.get("reasoning", "") + action_data: dict = action_json.get("action_data", {}) - # 处理actions字段可能是字典或列表的情况 - actions_to_process = [] - if isinstance(actions_obj, dict): - actions_to_process.append(actions_obj) - elif isinstance(actions_obj, list): - actions_to_process.extend(actions_obj) + # 严格按照标准格式,如果没有action_data则使用空对象 + if not action_data: + action_data = {} - if not actions_to_process: - actions_to_process.append({"action_type": "no_action", "reason": "actions格式错误"}) + target_message_obj = None + if "target_message_id" in action_data: + # 处理 target_message_id,支持多种格式 + original_target_id = action_data.get("target_message_id") - for single_action_obj in actions_to_process: - if not isinstance(single_action_obj, dict): - continue + if original_target_id: + # 记录原始ID用于调试 + logger.debug(f"[{action}] 尝试查找目标消息: {original_target_id}") - action = single_action_obj.get("action_type", "no_action") - reasoning = single_action_obj.get("reasoning", "未提供原因") # 兼容旧的reason字段 - action_data = single_action_obj.get("action_data", {}) + # 使用统一的查找函数 + target_message_dict = self._find_message_by_id( + original_target_id, message_id_list + ) - # 为了向后兼容,如果action_data不存在,则从顶层字段获取 - if not action_data: - action_data = { - k: v - for k, v in single_action_obj.items() - if k not in ["action_type", "reason", "reasoning", "thinking"] - } + if not target_message_dict: + logger.warning( + f"[{action}] 未找到目标消息: {original_target_id}" + ) + # 统一使用最新消息作为兜底 + target_message_dict = self._get_latest_message(message_id_list) + if target_message_dict: + logger.info( + f"[{action}] 使用最新消息作为目标: {target_message_dict.get('message_id')}" + ) + else: + # 如果LLM没有指定target_message_id,统一使用最新消息 + target_message_dict = self._get_latest_message(message_id_list) - # 保留原始的thinking字段(如果有) - thinking = action_json.get("thinking", "") - if thinking and thinking != "未提供思考过程": - action_data["thinking"] = thinking - - # 根据动作定义过滤多余参数 - action_data = self._filter_action_parameters(action, action_data, plan) - - target_message_obj = None - if action not in ["no_action", "no_reply", "do_nothing", "proactive_reply"]: - original_target_id = action_data.get("target_message_id") - - if original_target_id: - # 记录原始ID用于调试 - logger.debug(f"[{action}] 尝试查找目标消息: {original_target_id}") - - # 使用增强的查找函数 - target_message_dict = self._find_message_by_id(original_target_id, message_id_list) - - if not target_message_dict: - logger.warning(f"[{action}] 未找到目标消息: {original_target_id}") - - # 根据动作类型采用不同的恢复策略 - if action == "reply": - # reply动作必须有目标消息,使用最新消息作为兜底 - target_message_dict = self._get_latest_message(message_id_list) - if target_message_dict: - logger.info( - f"[{action}] 使用最新消息作为目标: {target_message_dict.get('message_id')}" - ) - else: - logger.error(f"[{action}] 无法找到任何目标消息,降级为no_action") - action = "no_action" - reasoning = f"无法找到目标消息进行回复。原始理由: {reasoning}" - - elif action in ["poke_user", "set_emoji_like"]: - # 这些动作可以尝试其他策略 - target_message_dict = self._find_poke_notice( - message_id_list - ) or self._get_latest_message(message_id_list) - if target_message_dict: - logger.info( - f"[{action}] 使用替代消息作为目标: {target_message_dict.get('message_id')}" - ) - - else: - # 其他动作使用最新消息或跳过 - target_message_dict = self._get_latest_message(message_id_list) - if target_message_dict: - logger.info( - f"[{action}] 使用最新消息作为目标: {target_message_dict.get('message_id')}" - ) - else: - # 如果LLM没有指定target_message_id,进行特殊处理 - if action == "poke_user": - # 对于poke_user,尝试找到触发它的那条戳一戳消息 - target_message_dict = self._find_poke_notice(message_id_list) - if not target_message_dict: - # 如果找不到,再使用最新消息作为兜底 - target_message_dict = self._get_latest_message(message_id_list) - else: - # 其他动作,默认选择最新的一条消息 - target_message_dict = self._get_latest_message(message_id_list) - - if target_message_dict: - target_message_obj = target_message_dict - # 替换action_data中的临时ID为真实ID - if "target_message_id" in action_data: - real_message_id = target_message_dict.get("message_id") or target_message_dict.get("id") - if real_message_id: - action_data["target_message_id"] = real_message_id - logger.debug(f"[{action}] 更新目标消息ID: {original_target_id} -> {real_message_id}") - else: - logger.warning(f"[{action}] 最终未找到任何可用的目标消息") - if action == "reply": - # reply动作如果没有目标消息,降级为no_action - action = "no_action" - reasoning = f"无法找到目标消息进行回复。原始理由: {reasoning}" + if target_message_dict: + target_message_obj = target_message_dict + # 更新 action_data 中的 target_message_id 为真实 ID + real_message_id = target_message_dict.get( + "message_id" + ) or target_message_dict.get("id") + if real_message_id: + action_data["target_message_id"] = real_message_id + logger.debug( + f"[{action}] 更新目标消息ID: {original_target_id} -> {real_message_id}" + ) + else: + # 严格按照标准格式,找不到目标消息则记录错误但不降级 + logger.error(f"[{action}] 最终未找到任何可用的目标消息") # 转换为 DatabaseMessages 对象 from src.common.data_models.database_data_model import DatabaseMessages @@ -578,238 +519,96 @@ class ChatterPlanFilter: action_message_obj = None if target_message_obj: # 确保字典中有 message_id 字段 - if "message_id" not in target_message_obj and "id" in target_message_obj: + if ( + "message_id" not in target_message_obj + and "id" in target_message_obj + ): target_message_obj["message_id"] = target_message_obj["id"] try: # 使用 ** 解包字典传入构造函数 action_message_obj = DatabaseMessages(**target_message_obj) - logger.debug(f"[{action}] 成功转换目标消息为 DatabaseMessages 对象: {action_message_obj.message_id}") - except Exception as e: - logger.warning(f"[{action}] 无法将目标消息转换为 DatabaseMessages 对象: {e}", exc_info=True) - # 如果转换失败,对于必需目标消息的动作降级为 no_action - if action == "reply": - action = "no_action" - reasoning = f"目标消息转换失败: {e}。原始理由: {reasoning}" - else: - # 如果找不到目标消息,对于reply动作来说这是必需的,应该记录警告 - if action == "reply": - logger.warning( - f"reply动作找不到目标消息,target_message_id: {action_data.get('target_message_id')}" + logger.debug( + f"[{action}] 成功转换目标消息为 DatabaseMessages 对象: {action_message_obj.message_id}" + ) + except Exception as e: + logger.error( + f"[{action}] 无法将目标消息转换为 DatabaseMessages 对象: {e}", + exc_info=True, + ) + else: + # 严格按照标准格式,找不到目标消息则记录错误 + if action != "no_action": + logger.error( + f"[{action}] 找不到目标消息,target_message_id: {action_data.get('target_message_id')}" ) - # 将reply动作改为no_action,避免后续执行时出错 - action = "no_action" - reasoning = f"找不到目标消息进行回复。原始理由: {reasoning}" - - if ( - action not in ["no_action", "no_reply", "reply", "do_nothing", "proactive_reply"] - and action not in plan.available_actions - ): - reasoning = f"LLM 返回了当前不可用的动作 '{action}'。原始理由: {reasoning}" - action = "no_action" # 从action_data中提取should_quote_reply参数 - should_quote_reply = action_data.get("should_quote_reply", None) - # 将should_quote_reply转换为布尔值(如果是字符串的话) - if isinstance(should_quote_reply, str): - should_quote_reply = should_quote_reply.lower() in ["true", "1", "yes"] - elif not isinstance(should_quote_reply, bool): + should_quote_reply = action_data.get("should_quote_reply") + # 严格按照标准格式,只接受布尔值 + if not isinstance(should_quote_reply, bool): should_quote_reply = None - parsed_actions.append( - ActionPlannerInfo( - action_type=action, - reasoning=reasoning, - action_data=action_data, - action_message=action_message_obj, # 使用转换后的 DatabaseMessages 对象 - available_actions=plan.available_actions, - should_quote_reply=should_quote_reply, # 传递should_quote_reply参数 - ) + return ActionPlannerInfo( + action_type=action, + reasoning=reasoning, + action_data=action_data, + action_message=action_message_obj, # 使用转换后的 DatabaseMessages 对象 + available_actions=plan.available_actions, + ) + else: + return ActionPlannerInfo( + action_type=action, + reasoning=reasoning, + action_data=action_data, ) except Exception as e: logger.error(f"解析单个action时出错: {e}") - parsed_actions.append( - ActionPlannerInfo( - action_type="no_action", - reasoning=f"解析action时出错: {e}", - ) - ) - return parsed_actions - - def _filter_action_parameters(self, action_name: str, action_data: dict, plan: Plan) -> dict: - """根据动作定义过滤多余的参数 - - Args: - action_name: 动作名称 - action_data: LLM返回的动作参数 - plan: Plan对象,用于获取可用动作信息 - - Returns: - 过滤后的参数字典 - """ - # 获取该动作的定义 - action_info = plan.available_actions.get(action_name) - if not action_info: - logger.debug(f"动作 {action_name} 不在可用动作列表中,保留所有参数") - return action_data - - # 获取该动作定义的合法参数 - defined_params = set(action_info.action_parameters.keys()) - - # 合法参数集合 - valid_params = defined_params - - # 过滤参数 - filtered_data = {} - removed_params = [] - - for key, value in action_data.items(): - if key in valid_params: - filtered_data[key] = value - else: - removed_params.append(key) - - # 记录被移除的参数 - if removed_params: - logger.info( - f"🧹 [参数过滤] 动作 '{action_name}' 移除了多余参数: {removed_params}. " - f"合法参数: {sorted(valid_params)}" + return ActionPlannerInfo( + action_type="no_action", + reasoning=f"解析action时出错: {e}", ) - return filtered_data - - def _filter_no_actions(self, action_list: list[ActionPlannerInfo]) -> list[ActionPlannerInfo]: - non_no_actions = [a for a in action_list if a.action_type not in ["no_action", "no_reply"]] - if non_no_actions: - return non_no_actions - return action_list[:1] if action_list else [] - - async def _get_long_term_memory_context(self) -> str: - try: - now = datetime.now() - keywords = ["今天", "日程", "计划"] - if 5 <= now.hour < 12: - keywords.append("早上") - elif 12 <= now.hour < 18: - keywords.append("中午") - else: - keywords.append("晚上") - - # 使用记忆图系统检索记忆 - try: - from src.memory_graph.manager_singleton import get_memory_manager - - memory_manager = get_memory_manager() - if not memory_manager: - return "记忆系统未初始化。" - - # 将关键词转换为查询字符串 - query = " ".join(keywords) - enhanced_memories = await memory_manager.search_memories( - query=query, - top_k=5, # AFC 场景使用较少记忆,避免干扰规划 - use_multi_query=False, # 直接使用关键词查询 - ) - - if not enhanced_memories: - return "最近没有什么特别的记忆。" - - # 转换格式以兼容现有代码 - retrieved_memories = [] - for memory in enhanced_memories: - # 从记忆图的节点中提取内容 - content_parts = [] - for node in memory.nodes: - if node.content: - content_parts.append(node.content) - content = " ".join(content_parts) if content_parts else "无内容" - memory_type = memory.memory_type.value - retrieved_memories.append((memory_type, content)) - - memory_statements = [ - f"关于'{topic}', 你记得'{memory_item}'。" for topic, memory_item in retrieved_memories - ] - - except Exception as e: - logger.warning(f"增强记忆系统检索失败,使用默认回复: {e}") - return "最近没有什么特别的记忆。" - return " ".join(memory_statements) - except Exception as e: - logger.error(f"获取长期记忆时出错: {e}") - return "回忆时出现了一些问题。" - - async def _build_action_options(self, current_available_actions: dict[str, ActionInfo]) -> str: + async def _build_action_options( + self, current_available_actions: dict[str, ActionInfo] + ) -> str: action_options_block = "" for action_name, action_info in current_available_actions.items(): - # 构建参数的JSON示例 - params_json_list = [] - if action_info.action_parameters: - for p_name, p_desc in action_info.action_parameters.items(): - # 为参数描述添加一个通用示例值 - if action_name == "set_emoji_like" and p_name == "emoji": - # 特殊处理set_emoji_like的emoji参数 - from src.plugins.built_in.social_toolkit_plugin.qq_emoji_list import qq_face - - emoji_options = [] - for name in qq_face.values(): - match = re.search(r"\[表情:(.+?)\]", name) - if match: - emoji_options.append(match.group(1)) - example_value = f"<从'{', '.join(emoji_options[:10])}...'中选择一个>" - else: - example_value = f"<{p_desc}>" - params_json_list.append(f' "{p_name}": "{example_value}"') - # 基础动作信息 action_description = action_info.description action_require = "\n".join(f"- {req}" for req in action_info.action_require) - # 构建完整的JSON使用范例 - json_example_lines = [ - " {", - f' "action_type": "{action_name}"', - ] - # 将参数列表合并到JSON示例中 + # 构建参数的JSON示例 + params_json_list = [] + + # 构建完整的action_data JSON示例 + action_data_lines = ["{"] if params_json_list: - # 移除最后一行的逗号 - json_example_lines.extend([line.rstrip(",") for line in params_json_list]) + action_data_lines.extend( + [line.rstrip(",") for line in params_json_list] + ) + action_data_lines.append(" }") + action_data_json = "\n".join(action_data_lines) - json_example_lines.append(' "reason": "<执行该动作的详细原因>"') - json_example_lines.append(" }") + # 使用新的action格式,避免双重花括号 + action_options_block += f"""动作: {action_name} +动作描述: {action_description} +动作使用场景: +{action_require} - # 使用逗号连接内部元素,除了最后一个 - json_parts = [] - for i, line in enumerate(json_example_lines): - # "{" 和 "}" 不需要逗号 - if line.strip() in ["{", "}"]: - json_parts.append(line) - continue +你应该像这样使用它: + {{ + "action_type": "{action_name}", + "reasoning": "<执行该动作的详细原因>", + "action_data": {action_data_json} + }} - # 检查是否是最后一个需要逗号的元素 - is_last_item = True - for next_line in json_example_lines[i + 1 :]: - if next_line.strip() not in ["}"]: - is_last_item = False - break - - if not is_last_item: - json_parts.append(f"{line},") - else: - json_parts.append(line) - - json_example = "\n".join(json_parts) - - # 使用新的、更详细的action_prompt模板 - using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt_with_example") - action_options_block += using_action_prompt.format( - action_name=action_name, - action_description=action_description, - action_require=action_require, - json_example=json_example, - ) +""" return action_options_block - def _find_message_by_id(self, message_id: str, message_id_list: list) -> dict[str, Any] | None: + def _find_message_by_id( + self, message_id: str, message_id_list: list + ) -> dict[str, Any] | None: """ 增强的消息查找函数,支持多种格式和模糊匹配 兼容大模型可能返回的各种格式变体 @@ -893,10 +692,16 @@ class ChatterPlanFilter: # 检查消息对象中的ID message_obj = item.get("message") if isinstance(message_obj, dict): - orig_mid = message_obj.get("message_id") or message_obj.get("id") - orig_number = re.sub(r"[^0-9]", "", str(orig_mid)) if orig_mid else "" + orig_mid = message_obj.get("message_id") or message_obj.get( + "id" + ) + orig_number = ( + re.sub(r"[^0-9]", "", str(orig_mid)) if orig_mid else "" + ) if orig_number == number_part: - logger.debug(f"模糊匹配成功(消息对象): {candidate} -> {orig_mid}") + logger.debug( + f"模糊匹配成功(消息对象): {candidate} -> {orig_mid}" + ) return message_obj # 5. 兜底策略:返回最新消息 @@ -905,10 +710,14 @@ class ChatterPlanFilter: if isinstance(latest_item, dict): latest_message = latest_item.get("message") if isinstance(latest_message, dict): - logger.warning(f"未找到精确匹配的消息ID {original_id},使用最新消息作为兜底") + logger.warning( + f"未找到精确匹配的消息ID {original_id},使用最新消息作为兜底" + ) return latest_message elif latest_message is not None: - logger.warning(f"未找到精确匹配的消息ID {original_id},使用最新消息作为兜底") + logger.warning( + f"未找到精确匹配的消息ID {original_id},使用最新消息作为兜底" + ) return latest_message logger.warning(f"未找到任何匹配的消息: {original_id} (候选: {candidate_ids})") @@ -918,15 +727,3 @@ class ChatterPlanFilter: if not message_id_list: return None return message_id_list[-1].get("message") - - def _find_poke_notice(self, message_id_list: list) -> dict[str, Any] | None: - """在消息列表中寻找戳一戳的通知消息""" - for item in reversed(message_id_list): - message = item.get("message") - if ( - isinstance(message, dict) - and message.get("type") == "notice" - and "戳" in message.get("processed_plain_text", "") - ): - return message - return None diff --git a/src/plugins/built_in/affinity_flow_chatter/planner/planner.py b/src/plugins/built_in/affinity_flow_chatter/planner/planner.py index 63b9f740a..a0f1229e8 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner/planner.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner/planner.py @@ -199,7 +199,7 @@ class ChatterActionPlanner: # 6. 筛选 Plan available_actions = list(initial_plan.available_actions.keys()) plan_filter = ChatterPlanFilter(self.chat_id, available_actions) - filtered_plan = await plan_filter.filter(reply_not_available, initial_plan) + filtered_plan = await plan_filter.filter(initial_plan) # 7. 检查是否正在处理相同的目标消息,防止重复回复 target_message_id = None diff --git a/src/plugins/built_in/affinity_flow_chatter/planner/planner_prompts.py b/src/plugins/built_in/affinity_flow_chatter/planner/planner_prompts.py index 7dbdd07d6..1b953d43f 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner/planner_prompts.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner/planner_prompts.py @@ -72,23 +72,26 @@ def init_prompts(): {action_options_text} ## 输出格式(只输出 JSON,不要多余文本或代码块) +最终输出必须是一个包含 thinking 和 actions 字段的 JSON 对象,其中 actions 必须是一个列表。 + 示例(单动作): ```json {{ "thinking": "在这里写下你的思绪流...", "actions": [ {{ - "action_type": "respond", - "reasoning": "选择该动作的理由", + "action_type": "reply", + "reasoning": "选择该动作的详细理由", "action_data": {{ - "content": "你的回复内容", + "target_message_id": "m124", + "content": "回复内容" }} }} ] }} ``` -示例(多重回复,并行 - 需要区分回复对象时才引用): +示例(多重动作,并行): ```json {{ "thinking": "在这里写下你的思绪流...", @@ -97,14 +100,14 @@ def init_prompts(): "action_type": "reply", "reasoning": "理由A - 这个消息较早且需要明确回复对象", "action_data": {{ - "target_message_id": "m124", - "content": "对A的回复", - "should_quote_reply": true + "target_message_id": "m124", + "content": "对A的回复", + "should_quote_reply": false }} }}, {{ - "action_type": "reply", - "reasoning": "理由B - 这是对最新消息的自然接续", + "action_type": "emoji", + "reasoning": "理由B", "action_data": {{ "target_message_id": "m125", "content": "对B的回复", @@ -116,16 +119,11 @@ def init_prompts(): ``` # 强制规则 -- 需要目标消息的动作(reply/poke_user/set_emoji_like 等),必须提供准确的 target_message_id(来自未读历史里的 标签)。 -- 当动作需要额外参数时,必须在 action_data 中补全。 -- 私聊场景只允许使用 reply;群聊可选用辅助动作。 -- 如果没有合适的目标或无需动作,请输出: -```json -{{ - "thinking": "说明为什么不需要动作/不需要回复", - "actions": [] -}} -``` +- 每个动作块必须包含 action_type、reasoning 和 action_data 三个字段 +- actions 必须是一个列表,即使是单个动作也要放在列表中 +- 如果动作不需要任何参数,则 action_data 为空对象 {{}} +- 需要目标消息的动作,target_message_id 提取统一使用一套流程,没有任何区别对待 +- 如果没有合适的目标或无需动作,请返回空的 actions 列表: "actions": [] {no_action_block} """,