From 502d0b7c5981fabc14dccdf71378dc6f1640afe4 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Tue, 23 Sep 2025 14:13:41 +0800 Subject: [PATCH] =?UTF-8?q?refactor(plugins):=20=E5=B0=86=E4=BA=B2?= =?UTF-8?q?=E5=92=8C=E5=8A=9B=E6=B5=81=E6=A8=A1=E5=9D=97=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E4=B8=BA=E6=8F=92=E4=BB=B6=E6=9E=B6=E6=9E=84=E5=B9=B6=E7=A7=BB?= =?UTF-8?q?=E9=99=A4=E6=97=A7=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 本次重构将亲和力流(Affinity Flow)模块从核心聊天系统迁移到插件架构中,主要变更包括: - 删除 src/chat/planner_actions/ 目录下的 plan_executor.py、plan_filter.py 和 planner.py - 更新插件系统组件类型,将 FOCUS 聊天模式改为 GROUP 和 PRIVATE 模式 - 调整亲和力流插件中的模式引用,确保与新的聊天模式枚举保持一致 - 统一亲和力流模块的导入路径,使其完全作为插件运行 BREAKING CHANGE: 移除原有的 FOCUS 聊天模式,改为 GROUP 和 PRIVATE 模式,需要更新相关配置和代码引用 --- src/chat/planner_actions/plan_executor.py | 363 ------------ src/chat/planner_actions/plan_filter.py | 519 ------------------ src/chat/planner_actions/planner.py | 244 -------- src/plugin_system/base/component_types.py | 3 +- .../affinity_flow_chatter/affinity_chatter.py | 2 +- .../affinity_flow_chatter/plan_filter.py | 4 +- .../built_in/affinity_flow_chatter/planner.py | 4 +- 7 files changed, 7 insertions(+), 1132 deletions(-) delete mode 100644 src/chat/planner_actions/plan_executor.py delete mode 100644 src/chat/planner_actions/plan_filter.py delete mode 100644 src/chat/planner_actions/planner.py diff --git a/src/chat/planner_actions/plan_executor.py b/src/chat/planner_actions/plan_executor.py deleted file mode 100644 index 9b551d75d..000000000 --- a/src/chat/planner_actions/plan_executor.py +++ /dev/null @@ -1,363 +0,0 @@ -""" -PlanExecutor: 接收 Plan 对象并执行其中的所有动作。 -集成用户关系追踪机制,自动记录交互并更新关系。 -""" - -import asyncio -import re -import time -from typing import Dict, List - -from src.config.config import global_config -from src.chat.planner_actions.action_manager import ChatterActionManager -from src.common.data_models.info_data_model import Plan, ActionPlannerInfo -from src.common.logger import get_logger - -logger = get_logger("plan_executor") - - -class PlanExecutor: - """ - 增强版PlanExecutor,集成用户关系追踪机制。 - - 功能: - 1. 执行Plan中的所有动作 - 2. 自动记录用户交互并添加到关系追踪 - 3. 分类执行回复动作和其他动作 - 4. 提供完整的执行统计和监控 - """ - - def __init__(self, action_manager: ChatterActionManager): - """ - 初始化增强版PlanExecutor。 - - Args: - action_manager (ChatterActionManager): 用于实际执行各种动作的管理器实例。 - """ - self.action_manager = action_manager - - # 执行统计 - self.execution_stats = { - "total_executed": 0, - "successful_executions": 0, - "failed_executions": 0, - "reply_executions": 0, - "other_action_executions": 0, - "execution_times": [], - } - - # 用户关系追踪引用 - self.relationship_tracker = None - - def set_relationship_tracker(self, relationship_tracker): - """设置关系追踪器""" - self.relationship_tracker = relationship_tracker - - async def execute(self, plan: Plan) -> Dict[str, any]: - """ - 遍历并执行Plan对象中`decided_actions`列表里的所有动作。 - - Args: - plan (Plan): 包含待执行动作列表的Plan对象。 - - Returns: - Dict[str, any]: 执行结果统计信息 - """ - if not plan.decided_actions: - logger.info("没有需要执行的动作。") - return {"executed_count": 0, "results": []} - - execution_results = [] - reply_actions = [] - other_actions = [] - - # 分类动作:回复动作和其他动作 - for action_info in plan.decided_actions: - if action_info.action_type in ["reply", "proactive_reply"]: - reply_actions.append(action_info) - else: - other_actions.append(action_info) - - # 执行回复动作(优先执行) - if reply_actions: - reply_result = await self._execute_reply_actions(reply_actions, plan) - execution_results.extend(reply_result["results"]) - self.execution_stats["reply_executions"] += len(reply_actions) - - # 将其他动作放入后台任务执行,避免阻塞主流程 - if other_actions: - asyncio.create_task(self._execute_other_actions(other_actions, plan)) - logger.info(f"已将 {len(other_actions)} 个其他动作放入后台任务执行。") - # 注意:后台任务的结果不会立即计入本次返回的统计数据 - - # 更新总体统计 - self.execution_stats["total_executed"] += len(plan.decided_actions) - successful_count = sum(1 for r in execution_results if r["success"]) - self.execution_stats["successful_executions"] += successful_count - self.execution_stats["failed_executions"] += len(execution_results) - successful_count - - logger.info( - f"规划执行完成: 总数={len(plan.decided_actions)}, 成功={successful_count}, 失败={len(execution_results) - successful_count}" - ) - - return { - "executed_count": len(plan.decided_actions), - "successful_count": successful_count, - "failed_count": len(execution_results) - successful_count, - "results": execution_results, - } - - async def _execute_reply_actions(self, reply_actions: List[ActionPlannerInfo], plan: Plan) -> Dict[str, any]: - """执行回复动作""" - results = [] - - for action_info in reply_actions: - result = await self._execute_single_reply_action(action_info, plan) - results.append(result) - - return {"results": results} - - async def _execute_single_reply_action(self, action_info: ActionPlannerInfo, plan: Plan) -> Dict[str, any]: - """执行单个回复动作""" - start_time = time.time() - success = False - error_message = "" - reply_content = "" - - try: - logger.info(f"执行回复动作: {action_info.action_type} (原因: {action_info.reasoning})") - - # 获取用户ID - 兼容对象和字典 - if hasattr(action_info.action_message, "user_info"): - user_id = action_info.action_message.user_info.user_id - else: - user_id = action_info.action_message.get("user_info", {}).get("user_id") - - if user_id == str(global_config.bot.qq_account): - logger.warning("尝试回复自己,跳过此动作以防止死循环。") - return { - "action_type": action_info.action_type, - "success": False, - "error_message": "尝试回复自己,跳过此动作以防止死循环。", - "execution_time": 0, - "reasoning": action_info.reasoning, - "reply_content": "", - } - # 构建回复动作参数 - action_params = { - "chat_id": plan.chat_id, - "target_message": action_info.action_message, - "reasoning": action_info.reasoning, - "action_data": action_info.action_data or {}, - } - - # 通过动作管理器执行回复 - reply_content = await self.action_manager.execute_action( - action_name=action_info.action_type, **action_params - ) - - success = True - logger.info(f"回复动作 '{action_info.action_type}' 执行成功。") - - except Exception as e: - error_message = str(e) - logger.error(f"执行回复动作失败: {action_info.action_type}, 错误: {error_message}") - - # 记录用户关系追踪 - if success and action_info.action_message: - await self._track_user_interaction(action_info, plan, reply_content) - - execution_time = time.time() - start_time - self.execution_stats["execution_times"].append(execution_time) - - return { - "action_type": action_info.action_type, - "success": success, - "error_message": error_message, - "execution_time": execution_time, - "reasoning": action_info.reasoning, - "reply_content": reply_content[:200] + "..." if len(reply_content) > 200 else reply_content, - } - - async def _execute_other_actions(self, other_actions: List[ActionPlannerInfo], plan: Plan) -> Dict[str, any]: - """执行其他动作""" - results = [] - - # 并行执行其他动作 - tasks = [] - for action_info in other_actions: - task = self._execute_single_other_action(action_info, plan) - tasks.append(task) - - if tasks: - executed_results = await asyncio.gather(*tasks, return_exceptions=True) - for i, result in enumerate(executed_results): - if isinstance(result, Exception): - logger.error(f"执行动作 {other_actions[i].action_type} 时发生异常: {result}") - results.append( - { - "action_type": other_actions[i].action_type, - "success": False, - "error_message": str(result), - "execution_time": 0, - "reasoning": other_actions[i].reasoning, - } - ) - else: - results.append(result) - - return {"results": results} - - async def _execute_single_other_action(self, action_info: ActionPlannerInfo, plan: Plan) -> Dict[str, any]: - """执行单个其他动作""" - start_time = time.time() - success = False - error_message = "" - - try: - logger.info(f"执行其他动作: {action_info.action_type} (原因: {action_info.reasoning})") - - action_data = action_info.action_data or {} - - # 针对 poke_user 动作,特殊处理 - if action_info.action_type == "poke_user": - target_message = action_info.action_message - if target_message: - # 优先直接获取 user_id,这才是最可靠的信息 - user_id = target_message.get("user_id") - if user_id: - action_data["user_id"] = user_id - logger.info(f"检测到戳一戳动作,目标用户ID: {user_id}") - else: - # 如果没有 user_id,再尝试用 user_nickname 作为备用方案 - user_name = target_message.get("user_nickname") - if user_name: - action_data["user_name"] = user_name - logger.info(f"检测到戳一戳动作,目标用户: {user_name}") - else: - logger.warning("无法从戳一戳消息中获取用户ID或昵称。") - - # 传递原始消息ID以支持引用 - action_data["target_message_id"] = target_message.get("message_id") - - # 构建动作参数 - action_params = { - "chat_id": plan.chat_id, - "target_message": action_info.action_message, - "reasoning": action_info.reasoning, - "action_data": action_data, - } - - # 通过动作管理器执行动作 - await self.action_manager.execute_action(action_name=action_info.action_type, **action_params) - - success = True - logger.info(f"其他动作 '{action_info.action_type}' 执行成功。") - - except Exception as e: - error_message = str(e) - logger.error(f"执行其他动作失败: {action_info.action_type}, 错误: {error_message}") - - execution_time = time.time() - start_time - self.execution_stats["execution_times"].append(execution_time) - - return { - "action_type": action_info.action_type, - "success": success, - "error_message": error_message, - "execution_time": execution_time, - "reasoning": action_info.reasoning, - } - - async def _track_user_interaction(self, action_info: ActionPlannerInfo, plan: Plan, reply_content: str): - """追踪用户交互 - 集成回复后关系追踪""" - try: - if not action_info.action_message: - return - - # 获取用户信息 - 处理对象和字典两种情况 - if hasattr(action_info.action_message, "user_info"): - # 对象情况 - user_info = action_info.action_message.user_info - user_id = user_info.user_id - user_name = user_info.user_nickname or user_id - user_message = action_info.action_message.content - else: - # 字典情况 - user_info = action_info.action_message.get("user_info", {}) - user_id = user_info.get("user_id") - user_name = user_info.get("user_nickname") or user_id - user_message = action_info.action_message.get("content", "") - - if not user_id: - logger.debug("跳过追踪:缺少用户ID") - return - - # 如果有设置关系追踪器,执行回复后关系追踪 - if self.relationship_tracker: - # 记录基础交互信息(保持向后兼容) - self.relationship_tracker.add_interaction( - user_id=user_id, - user_name=user_name, - user_message=user_message, - bot_reply=reply_content, - reply_timestamp=time.time(), - ) - - # 执行新的回复后关系追踪 - await self.relationship_tracker.track_reply_relationship( - user_id=user_id, user_name=user_name, bot_reply_content=reply_content, reply_timestamp=time.time() - ) - - logger.debug(f"已执行用户交互追踪: {user_id}") - - except Exception as e: - logger.error(f"追踪用户交互时出错: {e}") - logger.debug(f"action_message类型: {type(action_info.action_message)}") - logger.debug(f"action_message内容: {action_info.action_message}") - - def get_execution_stats(self) -> Dict[str, any]: - """获取执行统计信息""" - stats = self.execution_stats.copy() - - # 计算平均执行时间 - if stats["execution_times"]: - avg_time = sum(stats["execution_times"]) / len(stats["execution_times"]) - stats["average_execution_time"] = avg_time - stats["max_execution_time"] = max(stats["execution_times"]) - stats["min_execution_time"] = min(stats["execution_times"]) - else: - stats["average_execution_time"] = 0 - stats["max_execution_time"] = 0 - stats["min_execution_time"] = 0 - - # 移除执行时间列表以避免返回过大数据 - stats.pop("execution_times", None) - - return stats - - def reset_stats(self): - """重置统计信息""" - self.execution_stats = { - "total_executed": 0, - "successful_executions": 0, - "failed_executions": 0, - "reply_executions": 0, - "other_action_executions": 0, - "execution_times": [], - } - - def get_recent_performance(self, limit: int = 10) -> List[Dict[str, any]]: - """获取最近的执行性能""" - recent_times = self.execution_stats["execution_times"][-limit:] - if not recent_times: - return [] - - return [ - { - "execution_index": i + 1, - "execution_time": time_val, - "timestamp": time.time() - (len(recent_times) - i) * 60, # 估算时间戳 - } - for i, time_val in enumerate(recent_times) - ] diff --git a/src/chat/planner_actions/plan_filter.py b/src/chat/planner_actions/plan_filter.py deleted file mode 100644 index 3dab354a6..000000000 --- a/src/chat/planner_actions/plan_filter.py +++ /dev/null @@ -1,519 +0,0 @@ -""" -PlanFilter: 接收 Plan 对象,根据不同模式的逻辑进行筛选,决定最终要执行的动作。 -""" - -import orjson -import time -import traceback -from datetime import datetime -from typing import Any, Dict, List, Optional - -from json_repair import repair_json - -from src.chat.memory_system.Hippocampus import hippocampus_manager -from src.chat.utils.chat_message_builder import ( - build_readable_actions, - build_readable_messages_with_id, - get_actions_by_timestamp_with_chat, -) -from src.chat.utils.prompt import global_prompt_manager -from src.common.data_models.info_data_model import ActionPlannerInfo, Plan -from src.common.logger import get_logger -from src.config.config import global_config, model_config -from src.llm_models.utils_model import LLMRequest -from src.mood.mood_manager import mood_manager -from src.plugin_system.base.component_types import ActionInfo, ChatMode -from src.schedule.schedule_manager import schedule_manager - -logger = get_logger("plan_filter") - - -class PlanFilter: - """ - 根据 Plan 中的模式和信息,筛选并决定最终的动作。 - """ - - def __init__(self): - self.planner_llm = LLMRequest(model_set=model_config.model_task_config.planner, request_type="planner") - self.last_obs_time_mark = 0.0 - - async def filter(self, reply_not_available: bool, plan: Plan) -> Plan: - """ - 执行筛选逻辑,并填充 Plan 对象的 decided_actions 字段。 - """ - logger.debug(f"墨墨在这里加了日志 -> filter 入口 plan: {plan}") - try: - prompt, used_message_id_list = await self._build_prompt(plan) - plan.llm_prompt = prompt - logger.debug(f"墨墨在这里加了日志 -> LLM prompt: {prompt}") - - llm_content, _ = await self.planner_llm.generate_response_async(prompt=prompt) - - if llm_content: - logger.debug(f"墨墨在这里加了日志 -> LLM a原始返回: {llm_content}") - try: - parsed_json = orjson.loads(repair_json(llm_content)) - except orjson.JSONDecodeError: - parsed_json = {"action": "no_action", "reason": "返回内容无法解析为JSON"} - logger.debug(f"墨墨在这里加了日志 -> 解析后的 JSON: {parsed_json}") - - if "reply" in plan.available_actions and reply_not_available: - # 如果reply动作不可用,但llm返回的仍然有reply,则改为no_reply - if isinstance(parsed_json, dict) and parsed_json.get("action") == "reply": - parsed_json["action"] = "no_reply" - elif isinstance(parsed_json, list): - for item in parsed_json: - if isinstance(item, dict) and item.get("action") == "reply": - item["action"] = "no_reply" - item["reason"] += " (但由于兴趣度不足,reply动作不可用,已改为no_reply)" - - if isinstance(parsed_json, dict): - parsed_json = [parsed_json] - - if isinstance(parsed_json, list): - final_actions = [] - reply_action_added = False - # 定义回复类动作的集合,方便扩展 - reply_action_types = {"reply", "proactive_reply"} - - for item in parsed_json: - if not isinstance(item, dict): - continue - - # 预解析 action_type 来进行判断 - action_type = item.get("action", "no_action") - - if action_type in reply_action_types: - if not reply_action_added: - final_actions.extend(await self._parse_single_action(item, used_message_id_list, plan)) - reply_action_added = True - else: - # 非回复类动作直接添加 - final_actions.extend(await self._parse_single_action(item, used_message_id_list, plan)) - - plan.decided_actions = self._filter_no_actions(final_actions) - - except Exception as e: - logger.error(f"筛选 Plan 时出错: {e}\n{traceback.format_exc()}") - plan.decided_actions = [ActionPlannerInfo(action_type="no_action", reasoning=f"筛选时出错: {e}")] - - logger.debug(f"墨墨在这里加了日志 -> filter 出口 decided_actions: {plan.decided_actions}") - return plan - - async def _build_prompt(self, plan: Plan) -> tuple[str, list]: - """ - 根据 Plan 对象构建提示词。 - """ - try: - time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" - bot_name = global_config.bot.nickname - bot_nickname = ( - f",也有人叫你{','.join(global_config.bot.alias_names)}" if global_config.bot.alias_names else "" - ) - bot_core_personality = global_config.personality.personality_core - identity_block = f"你的名字是{bot_name}{bot_nickname},你{bot_core_personality}:" - - schedule_block = "" - if global_config.planning_system.schedule_enable: - if current_activity := schedule_manager.get_current_activity(): - schedule_block = f"你当前正在:{current_activity},但注意它与群聊的聊天无关。" - - mood_block = "" - if global_config.mood.enable_mood: - chat_mood = mood_manager.get_mood_by_chat_id(plan.chat_id) - mood_block = f"你现在的心情是:{chat_mood.mood_state}" - - if plan.mode == ChatMode.PROACTIVE: - long_term_memory_block = await self._get_long_term_memory_context() - - chat_content_block, message_id_list = build_readable_messages_with_id( - messages=[msg.flatten() for msg in plan.chat_history], - timestamp_mode="normal", - truncate=False, - show_actions=False, - ) - - prompt_template = await global_prompt_manager.get_prompt_async("proactive_planner_prompt") - actions_before_now = get_actions_by_timestamp_with_chat( - chat_id=plan.chat_id, - timestamp_start=time.time() - 3600, - timestamp_end=time.time(), - limit=5, - ) - actions_before_now_block = build_readable_actions(actions=actions_before_now) - actions_before_now_block = f"你刚刚选择并执行过的action是:\n{actions_before_now_block}" - - prompt = prompt_template.format( - time_block=time_block, - identity_block=identity_block, - schedule_block=schedule_block, - mood_block=mood_block, - long_term_memory_block=long_term_memory_block, - chat_content_block=chat_content_block or "最近没有聊天内容。", - actions_before_now_block=actions_before_now_block, - ) - return prompt, message_id_list - - # 构建已读/未读历史消息 - read_history_block, unread_history_block, message_id_list = await self._build_read_unread_history_blocks( - plan - ) - - # 为了兼容性,保留原有的chat_content_block - chat_content_block, _ = build_readable_messages_with_id( - messages=[msg.flatten() for msg in plan.chat_history], - timestamp_mode="normal", - read_mark=self.last_obs_time_mark, - truncate=True, - show_actions=True, - ) - - actions_before_now = get_actions_by_timestamp_with_chat( - chat_id=plan.chat_id, - timestamp_start=time.time() - 3600, - timestamp_end=time.time(), - limit=5, - ) - - actions_before_now_block = build_readable_actions(actions=actions_before_now) - actions_before_now_block = f"你刚刚选择并执行过的action是:\n{actions_before_now_block}" - - self.last_obs_time_mark = time.time() - - mentioned_bonus = "" - if global_config.chat.mentioned_bot_inevitable_reply: - mentioned_bonus = "\n- 有人提到你" - if global_config.chat.at_bot_inevitable_reply: - mentioned_bonus = "\n- 有人提到你,或者at你" - - if plan.mode == ChatMode.FOCUS: - no_action_block = """ -动作:no_action -动作描述:不选择任何动作 -{{ - "action": "no_action", - "reason":"不动作的原因" -}} - -动作:no_reply -动作描述:不进行回复,等待合适的回复时机 -- 当你刚刚发送了消息,没有人回复时,选择no_reply -- 当你一次发送了太多消息,为了避免打扰聊天节奏,选择no_reply -{{ - "action": "no_reply", - "reason":"不回复的原因" -}} -""" - else: # NORMAL Mode - no_action_block = """重要说明: -- 'reply' 表示只进行普通聊天回复,不执行任何额外动作 -- 其他action表示在普通回复的基础上,执行相应的额外动作 -{{ - "action": "reply", - "target_message_id":"触发action的消息id", - "reason":"回复的原因" -}}""" - - is_group_chat = plan.target_info.platform == "group" if plan.target_info else True - chat_context_description = "你现在正在一个群聊中" - if not is_group_chat and plan.target_info: - chat_target_name = plan.target_info.person_name or plan.target_info.user_nickname or "对方" - chat_context_description = f"你正在和 {chat_target_name} 私聊" - - action_options_block = await self._build_action_options(plan.available_actions) - - moderation_prompt_block = "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。" - - custom_prompt_block = "" - if global_config.custom_prompt.planner_custom_prompt_content: - custom_prompt_block = global_config.custom_prompt.planner_custom_prompt_content - - users_in_chat_str = "" # TODO: Re-implement user list fetching if needed - - planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt") - prompt = planner_prompt_template.format( - schedule_block=schedule_block, - mood_block=mood_block, - time_block=time_block, - chat_context_description=chat_context_description, - read_history_block=read_history_block, - unread_history_block=unread_history_block, - actions_before_now_block=actions_before_now_block, - mentioned_bonus=mentioned_bonus, - no_action_block=no_action_block, - action_options_text=action_options_block, - moderation_prompt=moderation_prompt_block, - identity_block=identity_block, - custom_prompt_block=custom_prompt_block, - bot_name=bot_name, - users_in_chat=users_in_chat_str, - ) - return prompt, message_id_list - except Exception as e: - logger.error(f"构建 Planner 提示词时出错: {e}") - logger.error(traceback.format_exc()) - return "构建 Planner Prompt 时出错", [] - - async def _build_read_unread_history_blocks(self, plan: Plan) -> tuple[str, str, list]: - """构建已读/未读历史消息块""" - try: - # 从message_manager获取真实的已读/未读消息 - from src.chat.message_manager.message_manager import message_manager - from src.chat.utils.utils import assign_message_ids - - # 获取聊天流的上下文 - stream_context = message_manager.stream_contexts.get(plan.chat_id) - - # 获取真正的已读和未读消息 - read_messages = stream_context.history_messages # 已读消息存储在history_messages中 - unread_messages = stream_context.get_unread_messages() # 获取未读消息 - - # 构建已读历史消息块 - if read_messages: - read_content, read_ids = build_readable_messages_with_id( - messages=[msg.flatten() for msg in read_messages[-50:]], # 限制数量 - timestamp_mode="normal_no_YMD", - truncate=False, - show_actions=False, - ) - read_history_block = f"{read_content}" - else: - read_history_block = "暂无已读历史消息" - - # 构建未读历史消息块(包含兴趣度) - if unread_messages: - # 扁平化未读消息用于计算兴趣度和格式化 - flattened_unread = [msg.flatten() for msg in unread_messages] - - # 尝试获取兴趣度评分(返回以真实 message_id 为键的字典) - interest_scores = await self._get_interest_scores_for_messages(flattened_unread) - - # 为未读消息分配短 id(保持与 build_readable_messages_with_id 的一致结构) - message_id_list = assign_message_ids(flattened_unread) - - unread_lines = [] - for idx, msg in enumerate(flattened_unread): - mapped = message_id_list[idx] - synthetic_id = mapped.get("id") - original_msg_id = msg.get("message_id") or msg.get("id") - msg_time = time.strftime("%H:%M:%S", time.localtime(msg.get("time", time.time()))) - msg_content = msg.get("processed_plain_text", "") - - # 添加兴趣度信息 - interest_score = interest_scores.get(original_msg_id, 0.0) - interest_text = f" [兴趣度: {interest_score:.3f}]" if interest_score > 0 else "" - - # 在未读行中显示合成id,方便 planner 返回时使用 - unread_lines.append(f"{msg_time} {synthetic_id}: {msg_content}{interest_text}") - - unread_history_block = "\n".join(unread_lines) - else: - unread_history_block = "暂无未读历史消息" - - return read_history_block, unread_history_block, message_id_list - - except Exception as e: - logger.error(f"构建已读/未读历史消息块时出错: {e}") - return "构建已读历史消息时出错", "构建未读历史消息时出错", [] - - async def _get_interest_scores_for_messages(self, messages: List[dict]) -> dict[str, float]: - """为消息获取兴趣度评分""" - interest_scores = {} - - try: - from src.plugins.built_in.affinity_flow_chatter.interest_scoring import chatter_interest_scoring_system - from src.common.data_models.database_data_model import DatabaseMessages - - # 转换消息格式 - db_messages = [] - for msg_dict in messages: - try: - db_msg = DatabaseMessages( - message_id=msg_dict.get("message_id", ""), - time=msg_dict.get("time", time.time()), - chat_id=msg_dict.get("chat_id", ""), - processed_plain_text=msg_dict.get("processed_plain_text", ""), - user_id=msg_dict.get("user_id", ""), - user_nickname=msg_dict.get("user_nickname", ""), - user_platform=msg_dict.get("platform", "qq"), - chat_info_group_id=msg_dict.get("group_id", ""), - chat_info_group_name=msg_dict.get("group_name", ""), - chat_info_group_platform=msg_dict.get("platform", "qq"), - ) - db_messages.append(db_msg) - except Exception as e: - logger.warning(f"转换消息格式失败: {e}") - continue - - # 计算兴趣度评分 - if db_messages: - bot_nickname = global_config.bot.nickname or "麦麦" - scores = await chatter_interest_scoring_system.calculate_interest_scores(db_messages, bot_nickname) - - # 构建兴趣度字典 - for score in scores: - interest_scores[score.message_id] = score.total_score - - except Exception as e: - logger.warning(f"获取兴趣度评分失败: {e}") - - return interest_scores - - async def _parse_single_action( - self, action_json: dict, message_id_list: list, plan: Plan - ) -> List[ActionPlannerInfo]: - parsed_actions = [] - try: - action = action_json.get("action", "no_action") - reasoning = action_json.get("reason", "未提供原因") - action_data = {k: v for k, v in action_json.items() if k not in ["action", "reason"]} - - target_message_obj = None - if action not in ["no_action", "no_reply", "do_nothing", "proactive_reply"]: - if target_message_id := action_json.get("target_message_id"): - target_message_dict = self._find_message_by_id(target_message_id, message_id_list) - else: - # 如果LLM没有指定target_message_id,我们就默认选择最新的一条消息 - target_message_dict = self._get_latest_message(message_id_list) - - if target_message_dict: - # 直接使用字典作为action_message,避免DatabaseMessages对象创建失败 - target_message_obj = target_message_dict - # 替换action_data中的临时ID为真实ID - if "target_message_id" in action_data: - real_message_id = target_message_dict.get("message_id") or target_message_dict.get("id") - if real_message_id: - action_data["target_message_id"] = real_message_id - else: - # 如果找不到目标消息,对于reply动作来说这是必需的,应该记录警告 - if action == "reply": - logger.warning( - f"reply动作找不到目标消息,target_message_id: {action_json.get('target_message_id')}" - ) - # 将reply动作改为no_action,避免后续执行时出错 - action = "no_action" - reasoning = f"找不到目标消息进行回复。原始理由: {reasoning}" - - available_action_names = list(plan.available_actions.keys()) - if ( - action not in ["no_action", "no_reply", "reply", "do_nothing", "proactive_reply"] - and action not in available_action_names - ): - reasoning = f"LLM 返回了当前不可用的动作 '{action}'。原始理由: {reasoning}" - action = "no_action" - - parsed_actions.append( - ActionPlannerInfo( - action_type=action, - reasoning=reasoning, - action_data=action_data, - action_message=target_message_obj, - available_actions=plan.available_actions, - ) - ) - except Exception as e: - logger.error(f"解析单个action时出错: {e}") - parsed_actions.append( - ActionPlannerInfo( - action_type="no_action", - reasoning=f"解析action时出错: {e}", - ) - ) - return parsed_actions - - def _filter_no_actions(self, action_list: List[ActionPlannerInfo]) -> List[ActionPlannerInfo]: - non_no_actions = [a for a in action_list if a.action_type not in ["no_action", "no_reply"]] - if non_no_actions: - return non_no_actions - return action_list[:1] if action_list else [] - - async def _get_long_term_memory_context(self) -> str: - try: - now = datetime.now() - keywords = ["今天", "日程", "计划"] - if 5 <= now.hour < 12: - keywords.append("早上") - elif 12 <= now.hour < 18: - keywords.append("中午") - else: - keywords.append("晚上") - - retrieved_memories = await hippocampus_manager.get_memory_from_topic( - valid_keywords=keywords, max_memory_num=5, max_memory_length=1 - ) - - if not retrieved_memories: - return "最近没有什么特别的记忆。" - - memory_statements = [f"关于'{topic}', 你记得'{memory_item}'。" for topic, memory_item in retrieved_memories] - return " ".join(memory_statements) - except Exception as e: - logger.error(f"获取长期记忆时出错: {e}") - return "回忆时出现了一些问题。" - - async def _build_action_options(self, current_available_actions: Dict[str, ActionInfo]) -> str: - action_options_block = "" - for action_name, action_info in current_available_actions.items(): - param_text = "" - if action_info.action_parameters: - param_text = "\n" + "\n".join( - f' "{p_name}":"{p_desc}"' for p_name, p_desc in action_info.action_parameters.items() - ) - require_text = "\n".join(f"- {req}" for req in action_info.action_require) - using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt") - action_options_block += using_action_prompt.format( - action_name=action_name, - action_description=action_info.description, - action_parameters=param_text, - action_require=require_text, - ) - return action_options_block - - def _find_message_by_id(self, message_id: str, message_id_list: list) -> Optional[Dict[str, Any]]: - # 兼容多种 message_id 格式:数字、m123、buffered-xxxx - # 如果是纯数字,补上 m 前缀以兼容旧格式 - candidate_ids = {message_id} - if message_id.isdigit(): - candidate_ids.add(f"m{message_id}") - - # 如果是 m 开头且后面是数字,尝试去掉 m 前缀的数字形式 - if message_id.startswith("m") and message_id[1:].isdigit(): - candidate_ids.add(message_id[1:]) - - # 逐项匹配 message_id_list(每项可能为 {'id':..., 'message':...}) - for item in message_id_list: - # 支持 message_id_list 中直接是字符串/ID 的情形 - if isinstance(item, str): - if item in candidate_ids: - # 没有 message 对象,返回None - return None - continue - - if not isinstance(item, dict): - continue - - item_id = item.get("id") - # 直接匹配分配的短 id - if item_id and item_id in candidate_ids: - return item.get("message") - - # 有时 message 存储里会有原始的 message_id 字段(如 buffered-xxxx) - message_obj = item.get("message") - if isinstance(message_obj, dict): - orig_mid = message_obj.get("message_id") or message_obj.get("id") - if orig_mid and orig_mid in candidate_ids: - return message_obj - - # 作为兜底,尝试在 message_id_list 中找到 message.message_id 匹配 - for item in message_id_list: - if isinstance(item, dict) and isinstance(item.get("message"), dict): - mid = item["message"].get("message_id") or item["message"].get("id") - if mid == message_id: - return item["message"] - - return None - - def _get_latest_message(self, message_id_list: list) -> Optional[Dict[str, Any]]: - if not message_id_list: - return None - return message_id_list[-1].get("message") diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py deleted file mode 100644 index 65e25a1c0..000000000 --- a/src/chat/planner_actions/planner.py +++ /dev/null @@ -1,244 +0,0 @@ -""" -主规划器入口,负责协调 PlanGenerator, PlanFilter, 和 PlanExecutor。 -集成兴趣度评分系统和用户关系追踪机制,实现智能化的聊天决策。 -""" - -from dataclasses import asdict -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple - -from src.plugin_system.base.component_types import ChatMode -from src.plugins.built_in.affinity_flow_chatter.plan_executor import ChatterPlanExecutor -from src.plugins.built_in.affinity_flow_chatter.plan_filter import ChatterPlanFilter -from src.plugins.built_in.affinity_flow_chatter.plan_generator import ChatterPlanGenerator -from src.plugins.built_in.affinity_flow_chatter.interest_scoring import ChatterInterestScoringSystem -from src.plugins.built_in.affinity_flow_chatter.relationship_tracker import ChatterRelationshipTracker - - -from src.common.logger import get_logger -from src.config.config import global_config - -if TYPE_CHECKING: - from src.chat.planner_actions.action_manager import ActionManager - from src.common.data_models.message_manager_data_model import StreamContext - from src.common.data_models.info_data_model import Plan - -# 导入提示词模块以确保其被初始化 -from src.plugins.built_in.affinity_flow_chatter import planner_prompts # noqa - -logger = get_logger("planner") - - -class ActionPlanner: - """ - 增强版ActionPlanner,集成兴趣度评分和用户关系追踪机制。 - - 核心功能: - 1. 兴趣度评分系统:根据兴趣匹配度、关系分、提及度、时间因子对消息评分 - 2. 用户关系追踪:自动追踪用户交互并更新关系分 - 3. 智能回复决策:基于兴趣度阈值和连续不回复概率的智能决策 - 4. 完整的规划流程:生成→筛选→执行的完整三阶段流程 - """ - - def __init__(self, chat_id: str, action_manager: "ActionManager"): - """ - 初始化增强版ActionPlanner。 - - Args: - chat_id (str): 当前聊天的 ID。 - action_manager (ActionManager): 一个 ActionManager 实例。 - """ - self.chat_id = chat_id - self.action_manager = action_manager - self.generator = ChatterPlanGenerator(chat_id) - self.filter = ChatterPlanFilter() - self.executor = ChatterPlanExecutor(action_manager) - - # 初始化兴趣度评分系统 - self.interest_scoring = ChatterInterestScoringSystem() - - # 创建新的关系追踪器 - self.relationship_tracker = ChatterRelationshipTracker(self.interest_scoring) - logger.info("创建新的关系追踪器实例") - - # 设置执行器的关系追踪器 - self.executor.set_relationship_tracker(self.relationship_tracker) - - # 规划器统计 - self.planner_stats = { - "total_plans": 0, - "successful_plans": 0, - "failed_plans": 0, - "replies_generated": 0, - "other_actions_executed": 0, - } - - async def plan( - self, mode: ChatMode = ChatMode.FOCUS, context: "StreamContext" = None - ) -> Tuple[List[Dict], Optional[Dict]]: - """ - 执行完整的增强版规划流程。 - - Args: - mode (ChatMode): 当前的聊天模式,默认为 FOCUS。 - context (StreamContext): 包含聊天流消息的上下文对象。 - - Returns: - Tuple[List[Dict], Optional[Dict]]: 一个元组,包含: - - final_actions_dict (List[Dict]): 最终确定的动作列表(字典格式)。 - - final_target_message_dict (Optional[Dict]): 最终的目标消息(字典格式)。 - """ - try: - self.planner_stats["total_plans"] += 1 - - return await self._enhanced_plan_flow(mode, context) - - except Exception as e: - logger.error(f"规划流程出错: {e}") - self.planner_stats["failed_plans"] += 1 - return [], None - - async def _enhanced_plan_flow(self, mode: ChatMode, context: "StreamContext") -> Tuple[List[Dict], Optional[Dict]]: - """执行增强版规划流程""" - try: - # 1. 生成初始 Plan - initial_plan = await self.generator.generate(mode) - - unread_messages = context.get_unread_messages() if context else [] - # 2. 兴趣度评分 - 只对未读消息进行评分 - if unread_messages: - bot_nickname = global_config.bot.nickname - interest_scores = await self.interest_scoring.calculate_interest_scores(unread_messages, bot_nickname) - - # 3. 根据兴趣度调整可用动作 - if interest_scores: - latest_score = max(interest_scores, key=lambda s: s.total_score) - latest_message = next( - (msg for msg in unread_messages if msg.message_id == latest_score.message_id), None - ) - should_reply, score = self.interest_scoring.should_reply(latest_score, latest_message) - - reply_not_available = False - if not should_reply and "reply" in initial_plan.available_actions: - logger.info(f"兴趣度不足 ({latest_score.total_score:.2f}),移除'回复'动作。") - reply_not_available = True - - # base_threshold = self.interest_scoring.reply_threshold - # 检查兴趣度是否达到非回复动作阈值 - non_reply_action_interest_threshold = global_config.affinity_flow.non_reply_action_interest_threshold - if score < non_reply_action_interest_threshold: - logger.info( - f"兴趣度 {score:.3f} 低于非回复动作阈值 {non_reply_action_interest_threshold:.3f},不执行任何动作。" - ) - # 直接返回 no_action - from src.common.data_models.info_data_model import ActionPlannerInfo - - no_action = ActionPlannerInfo( - action_type="no_action", - reasoning=f"兴趣度评分 {score:.3f} 未达阈值 {non_reply_action_interest_threshold:.3f}", - action_data={}, - action_message=None, - ) - filtered_plan = initial_plan - filtered_plan.decided_actions = [no_action] - else: - # 4. 筛选 Plan - filtered_plan = await self.filter.filter(reply_not_available, initial_plan) - - # 检查filtered_plan是否有reply动作,以便记录reply action - has_reply_action = False - for decision in filtered_plan.decided_actions: - if decision.action_type == "reply": - has_reply_action = True - self.interest_scoring.record_reply_action(has_reply_action) - - # 5. 使用 PlanExecutor 执行 Plan - execution_result = await self.executor.execute(filtered_plan) - - # 6. 根据执行结果更新统计信息 - self._update_stats_from_execution_result(execution_result) - - # 7. 检查关系更新 - await self.relationship_tracker.check_and_update_relationships() - - # 8. 返回结果 - return self._build_return_result(filtered_plan) - - except Exception as e: - logger.error(f"增强版规划流程出错: {e}") - self.planner_stats["failed_plans"] += 1 - return [], None - - def _update_stats_from_execution_result(self, execution_result: Dict[str, any]): - """根据执行结果更新规划器统计""" - if not execution_result: - return - - successful_count = execution_result.get("successful_count", 0) - - # 更新成功执行计数 - self.planner_stats["successful_plans"] += successful_count - - # 统计回复动作和其他动作 - reply_count = 0 - other_count = 0 - - for result in execution_result.get("results", []): - action_type = result.get("action_type", "") - if action_type in ["reply", "proactive_reply"]: - reply_count += 1 - else: - other_count += 1 - - self.planner_stats["replies_generated"] += reply_count - self.planner_stats["other_actions_executed"] += other_count - - def _build_return_result(self, plan: "Plan") -> Tuple[List[Dict], Optional[Dict]]: - """构建返回结果""" - final_actions = plan.decided_actions or [] - final_target_message = next((act.action_message for act in final_actions if act.action_message), None) - - final_actions_dict = [asdict(act) for act in final_actions] - - if final_target_message: - if hasattr(final_target_message, "__dataclass_fields__"): - final_target_message_dict = asdict(final_target_message) - else: - final_target_message_dict = final_target_message - else: - final_target_message_dict = None - - return final_actions_dict, final_target_message_dict - - def get_user_relationship(self, user_id: str) -> float: - """获取用户关系分""" - return self.interest_scoring.get_user_relationship(user_id) - - def update_interest_keywords(self, new_keywords: Dict[str, List[str]]): - """更新兴趣关键词(已弃用,仅保留用于兼容性)""" - logger.info("传统关键词匹配已移除,此方法仅保留用于兼容性") - # 此方法已弃用,因为现在完全使用embedding匹配 - - def get_planner_stats(self) -> Dict[str, any]: - """获取规划器统计""" - return self.planner_stats.copy() - - def get_interest_scoring_stats(self) -> Dict[str, any]: - """获取兴趣度评分统计""" - return { - "no_reply_count": self.interest_scoring.no_reply_count, - "max_no_reply_count": self.interest_scoring.max_no_reply_count, - "reply_threshold": self.interest_scoring.reply_threshold, - "mention_threshold": self.interest_scoring.mention_threshold, - "user_relationships": len(self.interest_scoring.user_relationships), - } - - def get_relationship_stats(self) -> Dict[str, any]: - """获取用户关系统计""" - return { - "tracking_users": len(self.relationship_tracker.tracking_users), - "relationship_history": len(self.relationship_tracker.relationship_history), - "max_tracking_users": self.relationship_tracker.max_tracking_users, - } - - -# 全局兴趣度评分系统实例 - 在 individuality 模块中创建 diff --git a/src/plugin_system/base/component_types.py b/src/plugin_system/base/component_types.py index 98870044e..4d8454da9 100644 --- a/src/plugin_system/base/component_types.py +++ b/src/plugin_system/base/component_types.py @@ -41,7 +41,8 @@ class ActionActivationType(Enum): class ChatMode(Enum): """聊天模式枚举""" - FOCUS = "focus" # Focus聊天模式 + GROUP = "group" # 群聊模式 + PRIVATE = "private" # 私聊模式 NORMAL = "normal" # Normal聊天模式 PROACTIVE = "proactive" # 主动思考模式 PRIORITY = "priority" # 优先级聊天模式 diff --git a/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py b/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py index 36a0e5b03..45575a033 100644 --- a/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py +++ b/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py @@ -59,7 +59,7 @@ class AffinityChatter(BaseChatter): unread_messages = context.get_unread_messages() # 使用增强版规划器处理消息 - actions, target_message = await self.planner.plan(mode=ChatMode.FOCUS, context=context) + actions, target_message = await self.planner.plan(mode=ChatMode.GROUP, context=context) self.stats["plans_created"] += 1 # 执行动作(如果规划器返回了动作) diff --git a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py index ac43110a1..75cd7b4fe 100644 --- a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py +++ b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py @@ -186,7 +186,7 @@ class ChatterPlanFilter: if global_config.chat.at_bot_inevitable_reply: mentioned_bonus = "\n- 有人提到你,或者at你" - if plan.mode == ChatMode.FOCUS: + if plan.mode == ChatMode.GROUP: no_action_block = """ 动作:no_action 动作描述:不选择任何动作 @@ -204,7 +204,7 @@ class ChatterPlanFilter: "reason":"不回复的原因" }} """ - else: # NORMAL Mode + else: # PRIVATE Mode no_action_block = """重要说明: - 'reply' 表示只进行普通聊天回复,不执行任何额外动作 - 其他action表示在普通回复的基础上,执行相应的额外动作 diff --git a/src/plugins/built_in/affinity_flow_chatter/planner.py b/src/plugins/built_in/affinity_flow_chatter/planner.py index 79ec3e514..c1bd7ab9b 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner.py @@ -73,13 +73,13 @@ class ChatterActionPlanner: } async def plan( - self, mode: ChatMode = ChatMode.FOCUS, context: "StreamContext" = None + self, mode: ChatMode = ChatMode.GROUP, context: "StreamContext" = None ) -> Tuple[List[Dict], Optional[Dict]]: """ 执行完整的增强版规划流程。 Args: - mode (ChatMode): 当前的聊天模式,默认为 FOCUS。 + mode (ChatMode): 当前的聊天模式,默认为 GROUP。 context (StreamContext): 包含聊天流消息的上下文对象。 Returns: