From 8aa5bed97dde1720c58bb1576ba162e35b424c44 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Tue, 23 Sep 2025 13:38:55 +0800 Subject: [PATCH] =?UTF-8?q?refactor(plugins):=20=E7=BB=9F=E4=B8=80?= =?UTF-8?q?=E4=BA=B2=E5=92=8C=E5=8A=9B=E6=B5=81=E6=A8=A1=E5=9D=97=E5=AF=BC?= =?UTF-8?q?=E5=85=A5=E8=B7=AF=E5=BE=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将 chatter 子模块重命名为 affinity_flow_chatter 后,更新相关导入路径以保持一致性: - individuality.py 中的兴趣评分系统导入路径 - plan_filter.py 中的兴趣评分系统导入路径 这些更改确保模块引用与新的目录结构保持一致,避免导入错误。 --- src/chat/planner_actions/plan_executor.py | 363 ++++++++++++ src/chat/planner_actions/plan_filter.py | 519 ++++++++++++++++++ src/chat/planner_actions/planner.py | 244 ++++++++ src/individuality/individuality.py | 2 +- .../affinity_flow_chatter/plan_filter.py | 2 +- 5 files changed, 1128 insertions(+), 2 deletions(-) create mode 100644 src/chat/planner_actions/plan_executor.py create mode 100644 src/chat/planner_actions/plan_filter.py create mode 100644 src/chat/planner_actions/planner.py diff --git a/src/chat/planner_actions/plan_executor.py b/src/chat/planner_actions/plan_executor.py new file mode 100644 index 000000000..9b551d75d --- /dev/null +++ b/src/chat/planner_actions/plan_executor.py @@ -0,0 +1,363 @@ +""" +PlanExecutor: 接收 Plan 对象并执行其中的所有动作。 +集成用户关系追踪机制,自动记录交互并更新关系。 +""" + +import asyncio +import re +import time +from typing import Dict, List + +from src.config.config import global_config +from src.chat.planner_actions.action_manager import ChatterActionManager +from src.common.data_models.info_data_model import Plan, ActionPlannerInfo +from src.common.logger import get_logger + +logger = get_logger("plan_executor") + + +class PlanExecutor: + """ + 增强版PlanExecutor,集成用户关系追踪机制。 + + 功能: + 1. 执行Plan中的所有动作 + 2. 自动记录用户交互并添加到关系追踪 + 3. 分类执行回复动作和其他动作 + 4. 提供完整的执行统计和监控 + """ + + def __init__(self, action_manager: ChatterActionManager): + """ + 初始化增强版PlanExecutor。 + + Args: + action_manager (ChatterActionManager): 用于实际执行各种动作的管理器实例。 + """ + self.action_manager = action_manager + + # 执行统计 + self.execution_stats = { + "total_executed": 0, + "successful_executions": 0, + "failed_executions": 0, + "reply_executions": 0, + "other_action_executions": 0, + "execution_times": [], + } + + # 用户关系追踪引用 + self.relationship_tracker = None + + def set_relationship_tracker(self, relationship_tracker): + """设置关系追踪器""" + self.relationship_tracker = relationship_tracker + + async def execute(self, plan: Plan) -> Dict[str, any]: + """ + 遍历并执行Plan对象中`decided_actions`列表里的所有动作。 + + Args: + plan (Plan): 包含待执行动作列表的Plan对象。 + + Returns: + Dict[str, any]: 执行结果统计信息 + """ + if not plan.decided_actions: + logger.info("没有需要执行的动作。") + return {"executed_count": 0, "results": []} + + execution_results = [] + reply_actions = [] + other_actions = [] + + # 分类动作:回复动作和其他动作 + for action_info in plan.decided_actions: + if action_info.action_type in ["reply", "proactive_reply"]: + reply_actions.append(action_info) + else: + other_actions.append(action_info) + + # 执行回复动作(优先执行) + if reply_actions: + reply_result = await self._execute_reply_actions(reply_actions, plan) + execution_results.extend(reply_result["results"]) + self.execution_stats["reply_executions"] += len(reply_actions) + + # 将其他动作放入后台任务执行,避免阻塞主流程 + if other_actions: + asyncio.create_task(self._execute_other_actions(other_actions, plan)) + logger.info(f"已将 {len(other_actions)} 个其他动作放入后台任务执行。") + # 注意:后台任务的结果不会立即计入本次返回的统计数据 + + # 更新总体统计 + self.execution_stats["total_executed"] += len(plan.decided_actions) + successful_count = sum(1 for r in execution_results if r["success"]) + self.execution_stats["successful_executions"] += successful_count + self.execution_stats["failed_executions"] += len(execution_results) - successful_count + + logger.info( + f"规划执行完成: 总数={len(plan.decided_actions)}, 成功={successful_count}, 失败={len(execution_results) - successful_count}" + ) + + return { + "executed_count": len(plan.decided_actions), + "successful_count": successful_count, + "failed_count": len(execution_results) - successful_count, + "results": execution_results, + } + + async def _execute_reply_actions(self, reply_actions: List[ActionPlannerInfo], plan: Plan) -> Dict[str, any]: + """执行回复动作""" + results = [] + + for action_info in reply_actions: + result = await self._execute_single_reply_action(action_info, plan) + results.append(result) + + return {"results": results} + + async def _execute_single_reply_action(self, action_info: ActionPlannerInfo, plan: Plan) -> Dict[str, any]: + """执行单个回复动作""" + start_time = time.time() + success = False + error_message = "" + reply_content = "" + + try: + logger.info(f"执行回复动作: {action_info.action_type} (原因: {action_info.reasoning})") + + # 获取用户ID - 兼容对象和字典 + if hasattr(action_info.action_message, "user_info"): + user_id = action_info.action_message.user_info.user_id + else: + user_id = action_info.action_message.get("user_info", {}).get("user_id") + + if user_id == str(global_config.bot.qq_account): + logger.warning("尝试回复自己,跳过此动作以防止死循环。") + return { + "action_type": action_info.action_type, + "success": False, + "error_message": "尝试回复自己,跳过此动作以防止死循环。", + "execution_time": 0, + "reasoning": action_info.reasoning, + "reply_content": "", + } + # 构建回复动作参数 + action_params = { + "chat_id": plan.chat_id, + "target_message": action_info.action_message, + "reasoning": action_info.reasoning, + "action_data": action_info.action_data or {}, + } + + # 通过动作管理器执行回复 + reply_content = await self.action_manager.execute_action( + action_name=action_info.action_type, **action_params + ) + + success = True + logger.info(f"回复动作 '{action_info.action_type}' 执行成功。") + + except Exception as e: + error_message = str(e) + logger.error(f"执行回复动作失败: {action_info.action_type}, 错误: {error_message}") + + # 记录用户关系追踪 + if success and action_info.action_message: + await self._track_user_interaction(action_info, plan, reply_content) + + execution_time = time.time() - start_time + self.execution_stats["execution_times"].append(execution_time) + + return { + "action_type": action_info.action_type, + "success": success, + "error_message": error_message, + "execution_time": execution_time, + "reasoning": action_info.reasoning, + "reply_content": reply_content[:200] + "..." if len(reply_content) > 200 else reply_content, + } + + async def _execute_other_actions(self, other_actions: List[ActionPlannerInfo], plan: Plan) -> Dict[str, any]: + """执行其他动作""" + results = [] + + # 并行执行其他动作 + tasks = [] + for action_info in other_actions: + task = self._execute_single_other_action(action_info, plan) + tasks.append(task) + + if tasks: + executed_results = await asyncio.gather(*tasks, return_exceptions=True) + for i, result in enumerate(executed_results): + if isinstance(result, Exception): + logger.error(f"执行动作 {other_actions[i].action_type} 时发生异常: {result}") + results.append( + { + "action_type": other_actions[i].action_type, + "success": False, + "error_message": str(result), + "execution_time": 0, + "reasoning": other_actions[i].reasoning, + } + ) + else: + results.append(result) + + return {"results": results} + + async def _execute_single_other_action(self, action_info: ActionPlannerInfo, plan: Plan) -> Dict[str, any]: + """执行单个其他动作""" + start_time = time.time() + success = False + error_message = "" + + try: + logger.info(f"执行其他动作: {action_info.action_type} (原因: {action_info.reasoning})") + + action_data = action_info.action_data or {} + + # 针对 poke_user 动作,特殊处理 + if action_info.action_type == "poke_user": + target_message = action_info.action_message + if target_message: + # 优先直接获取 user_id,这才是最可靠的信息 + user_id = target_message.get("user_id") + if user_id: + action_data["user_id"] = user_id + logger.info(f"检测到戳一戳动作,目标用户ID: {user_id}") + else: + # 如果没有 user_id,再尝试用 user_nickname 作为备用方案 + user_name = target_message.get("user_nickname") + if user_name: + action_data["user_name"] = user_name + logger.info(f"检测到戳一戳动作,目标用户: {user_name}") + else: + logger.warning("无法从戳一戳消息中获取用户ID或昵称。") + + # 传递原始消息ID以支持引用 + action_data["target_message_id"] = target_message.get("message_id") + + # 构建动作参数 + action_params = { + "chat_id": plan.chat_id, + "target_message": action_info.action_message, + "reasoning": action_info.reasoning, + "action_data": action_data, + } + + # 通过动作管理器执行动作 + await self.action_manager.execute_action(action_name=action_info.action_type, **action_params) + + success = True + logger.info(f"其他动作 '{action_info.action_type}' 执行成功。") + + except Exception as e: + error_message = str(e) + logger.error(f"执行其他动作失败: {action_info.action_type}, 错误: {error_message}") + + execution_time = time.time() - start_time + self.execution_stats["execution_times"].append(execution_time) + + return { + "action_type": action_info.action_type, + "success": success, + "error_message": error_message, + "execution_time": execution_time, + "reasoning": action_info.reasoning, + } + + async def _track_user_interaction(self, action_info: ActionPlannerInfo, plan: Plan, reply_content: str): + """追踪用户交互 - 集成回复后关系追踪""" + try: + if not action_info.action_message: + return + + # 获取用户信息 - 处理对象和字典两种情况 + if hasattr(action_info.action_message, "user_info"): + # 对象情况 + user_info = action_info.action_message.user_info + user_id = user_info.user_id + user_name = user_info.user_nickname or user_id + user_message = action_info.action_message.content + else: + # 字典情况 + user_info = action_info.action_message.get("user_info", {}) + user_id = user_info.get("user_id") + user_name = user_info.get("user_nickname") or user_id + user_message = action_info.action_message.get("content", "") + + if not user_id: + logger.debug("跳过追踪:缺少用户ID") + return + + # 如果有设置关系追踪器,执行回复后关系追踪 + if self.relationship_tracker: + # 记录基础交互信息(保持向后兼容) + self.relationship_tracker.add_interaction( + user_id=user_id, + user_name=user_name, + user_message=user_message, + bot_reply=reply_content, + reply_timestamp=time.time(), + ) + + # 执行新的回复后关系追踪 + await self.relationship_tracker.track_reply_relationship( + user_id=user_id, user_name=user_name, bot_reply_content=reply_content, reply_timestamp=time.time() + ) + + logger.debug(f"已执行用户交互追踪: {user_id}") + + except Exception as e: + logger.error(f"追踪用户交互时出错: {e}") + logger.debug(f"action_message类型: {type(action_info.action_message)}") + logger.debug(f"action_message内容: {action_info.action_message}") + + def get_execution_stats(self) -> Dict[str, any]: + """获取执行统计信息""" + stats = self.execution_stats.copy() + + # 计算平均执行时间 + if stats["execution_times"]: + avg_time = sum(stats["execution_times"]) / len(stats["execution_times"]) + stats["average_execution_time"] = avg_time + stats["max_execution_time"] = max(stats["execution_times"]) + stats["min_execution_time"] = min(stats["execution_times"]) + else: + stats["average_execution_time"] = 0 + stats["max_execution_time"] = 0 + stats["min_execution_time"] = 0 + + # 移除执行时间列表以避免返回过大数据 + stats.pop("execution_times", None) + + return stats + + def reset_stats(self): + """重置统计信息""" + self.execution_stats = { + "total_executed": 0, + "successful_executions": 0, + "failed_executions": 0, + "reply_executions": 0, + "other_action_executions": 0, + "execution_times": [], + } + + def get_recent_performance(self, limit: int = 10) -> List[Dict[str, any]]: + """获取最近的执行性能""" + recent_times = self.execution_stats["execution_times"][-limit:] + if not recent_times: + return [] + + return [ + { + "execution_index": i + 1, + "execution_time": time_val, + "timestamp": time.time() - (len(recent_times) - i) * 60, # 估算时间戳 + } + for i, time_val in enumerate(recent_times) + ] diff --git a/src/chat/planner_actions/plan_filter.py b/src/chat/planner_actions/plan_filter.py new file mode 100644 index 000000000..3dab354a6 --- /dev/null +++ b/src/chat/planner_actions/plan_filter.py @@ -0,0 +1,519 @@ +""" +PlanFilter: 接收 Plan 对象,根据不同模式的逻辑进行筛选,决定最终要执行的动作。 +""" + +import orjson +import time +import traceback +from datetime import datetime +from typing import Any, Dict, List, Optional + +from json_repair import repair_json + +from src.chat.memory_system.Hippocampus import hippocampus_manager +from src.chat.utils.chat_message_builder import ( + build_readable_actions, + build_readable_messages_with_id, + get_actions_by_timestamp_with_chat, +) +from src.chat.utils.prompt import global_prompt_manager +from src.common.data_models.info_data_model import ActionPlannerInfo, Plan +from src.common.logger import get_logger +from src.config.config import global_config, model_config +from src.llm_models.utils_model import LLMRequest +from src.mood.mood_manager import mood_manager +from src.plugin_system.base.component_types import ActionInfo, ChatMode +from src.schedule.schedule_manager import schedule_manager + +logger = get_logger("plan_filter") + + +class PlanFilter: + """ + 根据 Plan 中的模式和信息,筛选并决定最终的动作。 + """ + + def __init__(self): + self.planner_llm = LLMRequest(model_set=model_config.model_task_config.planner, request_type="planner") + self.last_obs_time_mark = 0.0 + + async def filter(self, reply_not_available: bool, plan: Plan) -> Plan: + """ + 执行筛选逻辑,并填充 Plan 对象的 decided_actions 字段。 + """ + logger.debug(f"墨墨在这里加了日志 -> filter 入口 plan: {plan}") + try: + prompt, used_message_id_list = await self._build_prompt(plan) + plan.llm_prompt = prompt + logger.debug(f"墨墨在这里加了日志 -> LLM prompt: {prompt}") + + llm_content, _ = await self.planner_llm.generate_response_async(prompt=prompt) + + if llm_content: + logger.debug(f"墨墨在这里加了日志 -> LLM a原始返回: {llm_content}") + try: + parsed_json = orjson.loads(repair_json(llm_content)) + except orjson.JSONDecodeError: + parsed_json = {"action": "no_action", "reason": "返回内容无法解析为JSON"} + logger.debug(f"墨墨在这里加了日志 -> 解析后的 JSON: {parsed_json}") + + if "reply" in plan.available_actions and reply_not_available: + # 如果reply动作不可用,但llm返回的仍然有reply,则改为no_reply + if isinstance(parsed_json, dict) and parsed_json.get("action") == "reply": + parsed_json["action"] = "no_reply" + elif isinstance(parsed_json, list): + for item in parsed_json: + if isinstance(item, dict) and item.get("action") == "reply": + item["action"] = "no_reply" + item["reason"] += " (但由于兴趣度不足,reply动作不可用,已改为no_reply)" + + if isinstance(parsed_json, dict): + parsed_json = [parsed_json] + + if isinstance(parsed_json, list): + final_actions = [] + reply_action_added = False + # 定义回复类动作的集合,方便扩展 + reply_action_types = {"reply", "proactive_reply"} + + for item in parsed_json: + if not isinstance(item, dict): + continue + + # 预解析 action_type 来进行判断 + action_type = item.get("action", "no_action") + + if action_type in reply_action_types: + if not reply_action_added: + final_actions.extend(await self._parse_single_action(item, used_message_id_list, plan)) + reply_action_added = True + else: + # 非回复类动作直接添加 + final_actions.extend(await self._parse_single_action(item, used_message_id_list, plan)) + + plan.decided_actions = self._filter_no_actions(final_actions) + + except Exception as e: + logger.error(f"筛选 Plan 时出错: {e}\n{traceback.format_exc()}") + plan.decided_actions = [ActionPlannerInfo(action_type="no_action", reasoning=f"筛选时出错: {e}")] + + logger.debug(f"墨墨在这里加了日志 -> filter 出口 decided_actions: {plan.decided_actions}") + return plan + + async def _build_prompt(self, plan: Plan) -> tuple[str, list]: + """ + 根据 Plan 对象构建提示词。 + """ + try: + time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + bot_name = global_config.bot.nickname + bot_nickname = ( + f",也有人叫你{','.join(global_config.bot.alias_names)}" if global_config.bot.alias_names else "" + ) + bot_core_personality = global_config.personality.personality_core + identity_block = f"你的名字是{bot_name}{bot_nickname},你{bot_core_personality}:" + + schedule_block = "" + if global_config.planning_system.schedule_enable: + if current_activity := schedule_manager.get_current_activity(): + schedule_block = f"你当前正在:{current_activity},但注意它与群聊的聊天无关。" + + mood_block = "" + if global_config.mood.enable_mood: + chat_mood = mood_manager.get_mood_by_chat_id(plan.chat_id) + mood_block = f"你现在的心情是:{chat_mood.mood_state}" + + if plan.mode == ChatMode.PROACTIVE: + long_term_memory_block = await self._get_long_term_memory_context() + + chat_content_block, message_id_list = build_readable_messages_with_id( + messages=[msg.flatten() for msg in plan.chat_history], + timestamp_mode="normal", + truncate=False, + show_actions=False, + ) + + prompt_template = await global_prompt_manager.get_prompt_async("proactive_planner_prompt") + actions_before_now = get_actions_by_timestamp_with_chat( + chat_id=plan.chat_id, + timestamp_start=time.time() - 3600, + timestamp_end=time.time(), + limit=5, + ) + actions_before_now_block = build_readable_actions(actions=actions_before_now) + actions_before_now_block = f"你刚刚选择并执行过的action是:\n{actions_before_now_block}" + + prompt = prompt_template.format( + time_block=time_block, + identity_block=identity_block, + schedule_block=schedule_block, + mood_block=mood_block, + long_term_memory_block=long_term_memory_block, + chat_content_block=chat_content_block or "最近没有聊天内容。", + actions_before_now_block=actions_before_now_block, + ) + return prompt, message_id_list + + # 构建已读/未读历史消息 + read_history_block, unread_history_block, message_id_list = await self._build_read_unread_history_blocks( + plan + ) + + # 为了兼容性,保留原有的chat_content_block + chat_content_block, _ = build_readable_messages_with_id( + messages=[msg.flatten() for msg in plan.chat_history], + timestamp_mode="normal", + read_mark=self.last_obs_time_mark, + truncate=True, + show_actions=True, + ) + + actions_before_now = get_actions_by_timestamp_with_chat( + chat_id=plan.chat_id, + timestamp_start=time.time() - 3600, + timestamp_end=time.time(), + limit=5, + ) + + actions_before_now_block = build_readable_actions(actions=actions_before_now) + actions_before_now_block = f"你刚刚选择并执行过的action是:\n{actions_before_now_block}" + + self.last_obs_time_mark = time.time() + + mentioned_bonus = "" + if global_config.chat.mentioned_bot_inevitable_reply: + mentioned_bonus = "\n- 有人提到你" + if global_config.chat.at_bot_inevitable_reply: + mentioned_bonus = "\n- 有人提到你,或者at你" + + if plan.mode == ChatMode.FOCUS: + no_action_block = """ +动作:no_action +动作描述:不选择任何动作 +{{ + "action": "no_action", + "reason":"不动作的原因" +}} + +动作:no_reply +动作描述:不进行回复,等待合适的回复时机 +- 当你刚刚发送了消息,没有人回复时,选择no_reply +- 当你一次发送了太多消息,为了避免打扰聊天节奏,选择no_reply +{{ + "action": "no_reply", + "reason":"不回复的原因" +}} +""" + else: # NORMAL Mode + no_action_block = """重要说明: +- 'reply' 表示只进行普通聊天回复,不执行任何额外动作 +- 其他action表示在普通回复的基础上,执行相应的额外动作 +{{ + "action": "reply", + "target_message_id":"触发action的消息id", + "reason":"回复的原因" +}}""" + + is_group_chat = plan.target_info.platform == "group" if plan.target_info else True + chat_context_description = "你现在正在一个群聊中" + if not is_group_chat and plan.target_info: + chat_target_name = plan.target_info.person_name or plan.target_info.user_nickname or "对方" + chat_context_description = f"你正在和 {chat_target_name} 私聊" + + action_options_block = await self._build_action_options(plan.available_actions) + + moderation_prompt_block = "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。" + + custom_prompt_block = "" + if global_config.custom_prompt.planner_custom_prompt_content: + custom_prompt_block = global_config.custom_prompt.planner_custom_prompt_content + + users_in_chat_str = "" # TODO: Re-implement user list fetching if needed + + planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt") + prompt = planner_prompt_template.format( + schedule_block=schedule_block, + mood_block=mood_block, + time_block=time_block, + chat_context_description=chat_context_description, + read_history_block=read_history_block, + unread_history_block=unread_history_block, + actions_before_now_block=actions_before_now_block, + mentioned_bonus=mentioned_bonus, + no_action_block=no_action_block, + action_options_text=action_options_block, + moderation_prompt=moderation_prompt_block, + identity_block=identity_block, + custom_prompt_block=custom_prompt_block, + bot_name=bot_name, + users_in_chat=users_in_chat_str, + ) + return prompt, message_id_list + except Exception as e: + logger.error(f"构建 Planner 提示词时出错: {e}") + logger.error(traceback.format_exc()) + return "构建 Planner Prompt 时出错", [] + + async def _build_read_unread_history_blocks(self, plan: Plan) -> tuple[str, str, list]: + """构建已读/未读历史消息块""" + try: + # 从message_manager获取真实的已读/未读消息 + from src.chat.message_manager.message_manager import message_manager + from src.chat.utils.utils import assign_message_ids + + # 获取聊天流的上下文 + stream_context = message_manager.stream_contexts.get(plan.chat_id) + + # 获取真正的已读和未读消息 + read_messages = stream_context.history_messages # 已读消息存储在history_messages中 + unread_messages = stream_context.get_unread_messages() # 获取未读消息 + + # 构建已读历史消息块 + if read_messages: + read_content, read_ids = build_readable_messages_with_id( + messages=[msg.flatten() for msg in read_messages[-50:]], # 限制数量 + timestamp_mode="normal_no_YMD", + truncate=False, + show_actions=False, + ) + read_history_block = f"{read_content}" + else: + read_history_block = "暂无已读历史消息" + + # 构建未读历史消息块(包含兴趣度) + if unread_messages: + # 扁平化未读消息用于计算兴趣度和格式化 + flattened_unread = [msg.flatten() for msg in unread_messages] + + # 尝试获取兴趣度评分(返回以真实 message_id 为键的字典) + interest_scores = await self._get_interest_scores_for_messages(flattened_unread) + + # 为未读消息分配短 id(保持与 build_readable_messages_with_id 的一致结构) + message_id_list = assign_message_ids(flattened_unread) + + unread_lines = [] + for idx, msg in enumerate(flattened_unread): + mapped = message_id_list[idx] + synthetic_id = mapped.get("id") + original_msg_id = msg.get("message_id") or msg.get("id") + msg_time = time.strftime("%H:%M:%S", time.localtime(msg.get("time", time.time()))) + msg_content = msg.get("processed_plain_text", "") + + # 添加兴趣度信息 + interest_score = interest_scores.get(original_msg_id, 0.0) + interest_text = f" [兴趣度: {interest_score:.3f}]" if interest_score > 0 else "" + + # 在未读行中显示合成id,方便 planner 返回时使用 + unread_lines.append(f"{msg_time} {synthetic_id}: {msg_content}{interest_text}") + + unread_history_block = "\n".join(unread_lines) + else: + unread_history_block = "暂无未读历史消息" + + return read_history_block, unread_history_block, message_id_list + + except Exception as e: + logger.error(f"构建已读/未读历史消息块时出错: {e}") + return "构建已读历史消息时出错", "构建未读历史消息时出错", [] + + async def _get_interest_scores_for_messages(self, messages: List[dict]) -> dict[str, float]: + """为消息获取兴趣度评分""" + interest_scores = {} + + try: + from src.plugins.built_in.affinity_flow_chatter.interest_scoring import chatter_interest_scoring_system + from src.common.data_models.database_data_model import DatabaseMessages + + # 转换消息格式 + db_messages = [] + for msg_dict in messages: + try: + db_msg = DatabaseMessages( + message_id=msg_dict.get("message_id", ""), + time=msg_dict.get("time", time.time()), + chat_id=msg_dict.get("chat_id", ""), + processed_plain_text=msg_dict.get("processed_plain_text", ""), + user_id=msg_dict.get("user_id", ""), + user_nickname=msg_dict.get("user_nickname", ""), + user_platform=msg_dict.get("platform", "qq"), + chat_info_group_id=msg_dict.get("group_id", ""), + chat_info_group_name=msg_dict.get("group_name", ""), + chat_info_group_platform=msg_dict.get("platform", "qq"), + ) + db_messages.append(db_msg) + except Exception as e: + logger.warning(f"转换消息格式失败: {e}") + continue + + # 计算兴趣度评分 + if db_messages: + bot_nickname = global_config.bot.nickname or "麦麦" + scores = await chatter_interest_scoring_system.calculate_interest_scores(db_messages, bot_nickname) + + # 构建兴趣度字典 + for score in scores: + interest_scores[score.message_id] = score.total_score + + except Exception as e: + logger.warning(f"获取兴趣度评分失败: {e}") + + return interest_scores + + async def _parse_single_action( + self, action_json: dict, message_id_list: list, plan: Plan + ) -> List[ActionPlannerInfo]: + parsed_actions = [] + try: + action = action_json.get("action", "no_action") + reasoning = action_json.get("reason", "未提供原因") + action_data = {k: v for k, v in action_json.items() if k not in ["action", "reason"]} + + target_message_obj = None + if action not in ["no_action", "no_reply", "do_nothing", "proactive_reply"]: + if target_message_id := action_json.get("target_message_id"): + target_message_dict = self._find_message_by_id(target_message_id, message_id_list) + else: + # 如果LLM没有指定target_message_id,我们就默认选择最新的一条消息 + target_message_dict = self._get_latest_message(message_id_list) + + if target_message_dict: + # 直接使用字典作为action_message,避免DatabaseMessages对象创建失败 + target_message_obj = target_message_dict + # 替换action_data中的临时ID为真实ID + if "target_message_id" in action_data: + real_message_id = target_message_dict.get("message_id") or target_message_dict.get("id") + if real_message_id: + action_data["target_message_id"] = real_message_id + else: + # 如果找不到目标消息,对于reply动作来说这是必需的,应该记录警告 + if action == "reply": + logger.warning( + f"reply动作找不到目标消息,target_message_id: {action_json.get('target_message_id')}" + ) + # 将reply动作改为no_action,避免后续执行时出错 + action = "no_action" + reasoning = f"找不到目标消息进行回复。原始理由: {reasoning}" + + available_action_names = list(plan.available_actions.keys()) + if ( + action not in ["no_action", "no_reply", "reply", "do_nothing", "proactive_reply"] + and action not in available_action_names + ): + reasoning = f"LLM 返回了当前不可用的动作 '{action}'。原始理由: {reasoning}" + action = "no_action" + + parsed_actions.append( + ActionPlannerInfo( + action_type=action, + reasoning=reasoning, + action_data=action_data, + action_message=target_message_obj, + available_actions=plan.available_actions, + ) + ) + except Exception as e: + logger.error(f"解析单个action时出错: {e}") + parsed_actions.append( + ActionPlannerInfo( + action_type="no_action", + reasoning=f"解析action时出错: {e}", + ) + ) + return parsed_actions + + def _filter_no_actions(self, action_list: List[ActionPlannerInfo]) -> List[ActionPlannerInfo]: + non_no_actions = [a for a in action_list if a.action_type not in ["no_action", "no_reply"]] + if non_no_actions: + return non_no_actions + return action_list[:1] if action_list else [] + + async def _get_long_term_memory_context(self) -> str: + try: + now = datetime.now() + keywords = ["今天", "日程", "计划"] + if 5 <= now.hour < 12: + keywords.append("早上") + elif 12 <= now.hour < 18: + keywords.append("中午") + else: + keywords.append("晚上") + + retrieved_memories = await hippocampus_manager.get_memory_from_topic( + valid_keywords=keywords, max_memory_num=5, max_memory_length=1 + ) + + if not retrieved_memories: + return "最近没有什么特别的记忆。" + + memory_statements = [f"关于'{topic}', 你记得'{memory_item}'。" for topic, memory_item in retrieved_memories] + return " ".join(memory_statements) + except Exception as e: + logger.error(f"获取长期记忆时出错: {e}") + return "回忆时出现了一些问题。" + + async def _build_action_options(self, current_available_actions: Dict[str, ActionInfo]) -> str: + action_options_block = "" + for action_name, action_info in current_available_actions.items(): + param_text = "" + if action_info.action_parameters: + param_text = "\n" + "\n".join( + f' "{p_name}":"{p_desc}"' for p_name, p_desc in action_info.action_parameters.items() + ) + require_text = "\n".join(f"- {req}" for req in action_info.action_require) + using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt") + action_options_block += using_action_prompt.format( + action_name=action_name, + action_description=action_info.description, + action_parameters=param_text, + action_require=require_text, + ) + return action_options_block + + def _find_message_by_id(self, message_id: str, message_id_list: list) -> Optional[Dict[str, Any]]: + # 兼容多种 message_id 格式:数字、m123、buffered-xxxx + # 如果是纯数字,补上 m 前缀以兼容旧格式 + candidate_ids = {message_id} + if message_id.isdigit(): + candidate_ids.add(f"m{message_id}") + + # 如果是 m 开头且后面是数字,尝试去掉 m 前缀的数字形式 + if message_id.startswith("m") and message_id[1:].isdigit(): + candidate_ids.add(message_id[1:]) + + # 逐项匹配 message_id_list(每项可能为 {'id':..., 'message':...}) + for item in message_id_list: + # 支持 message_id_list 中直接是字符串/ID 的情形 + if isinstance(item, str): + if item in candidate_ids: + # 没有 message 对象,返回None + return None + continue + + if not isinstance(item, dict): + continue + + item_id = item.get("id") + # 直接匹配分配的短 id + if item_id and item_id in candidate_ids: + return item.get("message") + + # 有时 message 存储里会有原始的 message_id 字段(如 buffered-xxxx) + message_obj = item.get("message") + if isinstance(message_obj, dict): + orig_mid = message_obj.get("message_id") or message_obj.get("id") + if orig_mid and orig_mid in candidate_ids: + return message_obj + + # 作为兜底,尝试在 message_id_list 中找到 message.message_id 匹配 + for item in message_id_list: + if isinstance(item, dict) and isinstance(item.get("message"), dict): + mid = item["message"].get("message_id") or item["message"].get("id") + if mid == message_id: + return item["message"] + + return None + + def _get_latest_message(self, message_id_list: list) -> Optional[Dict[str, Any]]: + if not message_id_list: + return None + return message_id_list[-1].get("message") diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py new file mode 100644 index 000000000..65e25a1c0 --- /dev/null +++ b/src/chat/planner_actions/planner.py @@ -0,0 +1,244 @@ +""" +主规划器入口,负责协调 PlanGenerator, PlanFilter, 和 PlanExecutor。 +集成兴趣度评分系统和用户关系追踪机制,实现智能化的聊天决策。 +""" + +from dataclasses import asdict +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple + +from src.plugin_system.base.component_types import ChatMode +from src.plugins.built_in.affinity_flow_chatter.plan_executor import ChatterPlanExecutor +from src.plugins.built_in.affinity_flow_chatter.plan_filter import ChatterPlanFilter +from src.plugins.built_in.affinity_flow_chatter.plan_generator import ChatterPlanGenerator +from src.plugins.built_in.affinity_flow_chatter.interest_scoring import ChatterInterestScoringSystem +from src.plugins.built_in.affinity_flow_chatter.relationship_tracker import ChatterRelationshipTracker + + +from src.common.logger import get_logger +from src.config.config import global_config + +if TYPE_CHECKING: + from src.chat.planner_actions.action_manager import ActionManager + from src.common.data_models.message_manager_data_model import StreamContext + from src.common.data_models.info_data_model import Plan + +# 导入提示词模块以确保其被初始化 +from src.plugins.built_in.affinity_flow_chatter import planner_prompts # noqa + +logger = get_logger("planner") + + +class ActionPlanner: + """ + 增强版ActionPlanner,集成兴趣度评分和用户关系追踪机制。 + + 核心功能: + 1. 兴趣度评分系统:根据兴趣匹配度、关系分、提及度、时间因子对消息评分 + 2. 用户关系追踪:自动追踪用户交互并更新关系分 + 3. 智能回复决策:基于兴趣度阈值和连续不回复概率的智能决策 + 4. 完整的规划流程:生成→筛选→执行的完整三阶段流程 + """ + + def __init__(self, chat_id: str, action_manager: "ActionManager"): + """ + 初始化增强版ActionPlanner。 + + Args: + chat_id (str): 当前聊天的 ID。 + action_manager (ActionManager): 一个 ActionManager 实例。 + """ + self.chat_id = chat_id + self.action_manager = action_manager + self.generator = ChatterPlanGenerator(chat_id) + self.filter = ChatterPlanFilter() + self.executor = ChatterPlanExecutor(action_manager) + + # 初始化兴趣度评分系统 + self.interest_scoring = ChatterInterestScoringSystem() + + # 创建新的关系追踪器 + self.relationship_tracker = ChatterRelationshipTracker(self.interest_scoring) + logger.info("创建新的关系追踪器实例") + + # 设置执行器的关系追踪器 + self.executor.set_relationship_tracker(self.relationship_tracker) + + # 规划器统计 + self.planner_stats = { + "total_plans": 0, + "successful_plans": 0, + "failed_plans": 0, + "replies_generated": 0, + "other_actions_executed": 0, + } + + async def plan( + self, mode: ChatMode = ChatMode.FOCUS, context: "StreamContext" = None + ) -> Tuple[List[Dict], Optional[Dict]]: + """ + 执行完整的增强版规划流程。 + + Args: + mode (ChatMode): 当前的聊天模式,默认为 FOCUS。 + context (StreamContext): 包含聊天流消息的上下文对象。 + + Returns: + Tuple[List[Dict], Optional[Dict]]: 一个元组,包含: + - final_actions_dict (List[Dict]): 最终确定的动作列表(字典格式)。 + - final_target_message_dict (Optional[Dict]): 最终的目标消息(字典格式)。 + """ + try: + self.planner_stats["total_plans"] += 1 + + return await self._enhanced_plan_flow(mode, context) + + except Exception as e: + logger.error(f"规划流程出错: {e}") + self.planner_stats["failed_plans"] += 1 + return [], None + + async def _enhanced_plan_flow(self, mode: ChatMode, context: "StreamContext") -> Tuple[List[Dict], Optional[Dict]]: + """执行增强版规划流程""" + try: + # 1. 生成初始 Plan + initial_plan = await self.generator.generate(mode) + + unread_messages = context.get_unread_messages() if context else [] + # 2. 兴趣度评分 - 只对未读消息进行评分 + if unread_messages: + bot_nickname = global_config.bot.nickname + interest_scores = await self.interest_scoring.calculate_interest_scores(unread_messages, bot_nickname) + + # 3. 根据兴趣度调整可用动作 + if interest_scores: + latest_score = max(interest_scores, key=lambda s: s.total_score) + latest_message = next( + (msg for msg in unread_messages if msg.message_id == latest_score.message_id), None + ) + should_reply, score = self.interest_scoring.should_reply(latest_score, latest_message) + + reply_not_available = False + if not should_reply and "reply" in initial_plan.available_actions: + logger.info(f"兴趣度不足 ({latest_score.total_score:.2f}),移除'回复'动作。") + reply_not_available = True + + # base_threshold = self.interest_scoring.reply_threshold + # 检查兴趣度是否达到非回复动作阈值 + non_reply_action_interest_threshold = global_config.affinity_flow.non_reply_action_interest_threshold + if score < non_reply_action_interest_threshold: + logger.info( + f"兴趣度 {score:.3f} 低于非回复动作阈值 {non_reply_action_interest_threshold:.3f},不执行任何动作。" + ) + # 直接返回 no_action + from src.common.data_models.info_data_model import ActionPlannerInfo + + no_action = ActionPlannerInfo( + action_type="no_action", + reasoning=f"兴趣度评分 {score:.3f} 未达阈值 {non_reply_action_interest_threshold:.3f}", + action_data={}, + action_message=None, + ) + filtered_plan = initial_plan + filtered_plan.decided_actions = [no_action] + else: + # 4. 筛选 Plan + filtered_plan = await self.filter.filter(reply_not_available, initial_plan) + + # 检查filtered_plan是否有reply动作,以便记录reply action + has_reply_action = False + for decision in filtered_plan.decided_actions: + if decision.action_type == "reply": + has_reply_action = True + self.interest_scoring.record_reply_action(has_reply_action) + + # 5. 使用 PlanExecutor 执行 Plan + execution_result = await self.executor.execute(filtered_plan) + + # 6. 根据执行结果更新统计信息 + self._update_stats_from_execution_result(execution_result) + + # 7. 检查关系更新 + await self.relationship_tracker.check_and_update_relationships() + + # 8. 返回结果 + return self._build_return_result(filtered_plan) + + except Exception as e: + logger.error(f"增强版规划流程出错: {e}") + self.planner_stats["failed_plans"] += 1 + return [], None + + def _update_stats_from_execution_result(self, execution_result: Dict[str, any]): + """根据执行结果更新规划器统计""" + if not execution_result: + return + + successful_count = execution_result.get("successful_count", 0) + + # 更新成功执行计数 + self.planner_stats["successful_plans"] += successful_count + + # 统计回复动作和其他动作 + reply_count = 0 + other_count = 0 + + for result in execution_result.get("results", []): + action_type = result.get("action_type", "") + if action_type in ["reply", "proactive_reply"]: + reply_count += 1 + else: + other_count += 1 + + self.planner_stats["replies_generated"] += reply_count + self.planner_stats["other_actions_executed"] += other_count + + def _build_return_result(self, plan: "Plan") -> Tuple[List[Dict], Optional[Dict]]: + """构建返回结果""" + final_actions = plan.decided_actions or [] + final_target_message = next((act.action_message for act in final_actions if act.action_message), None) + + final_actions_dict = [asdict(act) for act in final_actions] + + if final_target_message: + if hasattr(final_target_message, "__dataclass_fields__"): + final_target_message_dict = asdict(final_target_message) + else: + final_target_message_dict = final_target_message + else: + final_target_message_dict = None + + return final_actions_dict, final_target_message_dict + + def get_user_relationship(self, user_id: str) -> float: + """获取用户关系分""" + return self.interest_scoring.get_user_relationship(user_id) + + def update_interest_keywords(self, new_keywords: Dict[str, List[str]]): + """更新兴趣关键词(已弃用,仅保留用于兼容性)""" + logger.info("传统关键词匹配已移除,此方法仅保留用于兼容性") + # 此方法已弃用,因为现在完全使用embedding匹配 + + def get_planner_stats(self) -> Dict[str, any]: + """获取规划器统计""" + return self.planner_stats.copy() + + def get_interest_scoring_stats(self) -> Dict[str, any]: + """获取兴趣度评分统计""" + return { + "no_reply_count": self.interest_scoring.no_reply_count, + "max_no_reply_count": self.interest_scoring.max_no_reply_count, + "reply_threshold": self.interest_scoring.reply_threshold, + "mention_threshold": self.interest_scoring.mention_threshold, + "user_relationships": len(self.interest_scoring.user_relationships), + } + + def get_relationship_stats(self) -> Dict[str, any]: + """获取用户关系统计""" + return { + "tracking_users": len(self.relationship_tracker.tracking_users), + "relationship_history": len(self.relationship_tracker.relationship_history), + "max_tracking_users": self.relationship_tracker.max_tracking_users, + } + + +# 全局兴趣度评分系统实例 - 在 individuality 模块中创建 diff --git a/src/individuality/individuality.py b/src/individuality/individuality.py index a840e9e6d..a1520e4d6 100644 --- a/src/individuality/individuality.py +++ b/src/individuality/individuality.py @@ -80,7 +80,7 @@ class Individuality: full_personality = f"{personality_result},{identity_result}" # 获取全局兴趣评分系统实例 - from src.plugins.built_in.chatter.interest_scoring import chatter_interest_scoring_system as interest_scoring_system + from src.plugins.built_in.affinity_flow_chatter.interest_scoring import chatter_interest_scoring_system as interest_scoring_system # 初始化智能兴趣系统 await interest_scoring_system.initialize_smart_interests( diff --git a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py index 5729f53b6..ac43110a1 100644 --- a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py +++ b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py @@ -321,7 +321,7 @@ class ChatterPlanFilter: interest_scores = {} try: - from src.plugins.built_in.chatter.interest_scoring import chatter_interest_scoring_system as interest_scoring_system + from src.plugins.built_in.affinity_flow_chatter.interest_scoring import chatter_interest_scoring_system as interest_scoring_system from src.common.data_models.database_data_model import DatabaseMessages # 转换消息格式