From c73ffdee9b71eebb68f9cbfcbe906571f33129a4 Mon Sep 17 00:00:00 2001 From: minecraft1024a Date: Thu, 2 Oct 2025 16:58:37 +0800 Subject: [PATCH] =?UTF-8?q?refactor(proactive=5Fthinker):=20=E9=87=8D?= =?UTF-8?q?=E6=9E=84=E6=89=A7=E8=A1=8C=E5=99=A8=E4=BB=A5=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=86=B3=E7=AD=96-=E8=A7=84=E5=88=92-=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 重构 `ProactiveThinkerExecutor`,引入更智能的主动对话机制。旧的实现较为简单,直接根据不同场景生成固定类型的问候语。新的实现将主动对话过程分为三个阶段:信息收集、决策、规划与执行。 - **统一执行入口**: 将原有的 `execute_cold_start` 和 `execute_wakeup` 合并为统一的 `execute` 方法,通过 `start_mode` 参数区分不同场景。 - **信息收集**: 增加 `_gather_context` 方法,全面收集构建提示词所需的信息,包括用户关系、日程安排、人设、最近聊天记录等。 - **决策模块**: 新增 `_make_decision` 方法,利用 LLM 判断是否应该发起对话以及聊什么话题,避免在不合适的时机打扰用户。 - **规划与执行**: `_build_plan_prompt` 方法根据决策结果(话题)和上下文,生成最终的对话内容,使对话更具情境感和个性化。 - **事件调用更新**: 在 `proacive_thinker_event.py` 中更新了对新版执行器 `execute` 方法的调用方式。 --- .../proacive_thinker_event.py | 6 +- .../proactive_thinker_executor.py | 266 ++++++++++++++---- 2 files changed, 220 insertions(+), 52 deletions(-) diff --git a/src/plugins/built_in/proactive_thinker/proacive_thinker_event.py b/src/plugins/built_in/proactive_thinker/proacive_thinker_event.py index dd348630e..55492c7f9 100644 --- a/src/plugins/built_in/proactive_thinker/proacive_thinker_event.py +++ b/src/plugins/built_in/proactive_thinker/proacive_thinker_event.py @@ -72,9 +72,9 @@ class ColdStartTask(AsyncTask): # 【关键步骤】主动创建聊天流。 # 创建后,该用户就进入了机器人的“好友列表”,后续将由 ProactiveThinkingTask 接管 - await self.chat_manager.get_or_create_stream(platform, user_info) + stream = await self.chat_manager.get_or_create_stream(platform, user_info) - await self.executor.execute_cold_start(user_info) + await self.executor.execute(stream_id=stream.stream_id, start_mode="cold_start") logger.info(f"【冷启动】已为新用户 {chat_id} (昵称: {user_nickname}) 创建聊天流并发送问候。") except ValueError: @@ -177,7 +177,7 @@ class ProactiveThinkingTask(AsyncTask): if time_since_last_active > next_interval: logger.info(f"【日常唤醒】聊天流 {stream.stream_id} 已冷却 {time_since_last_active:.2f} 秒,触发主动对话。") - await self.executor.execute_wakeup(stream.stream_id) + await self.executor.execute(stream_id=stream.stream_id, start_mode="wake_up") # 【关键步骤】在触发后,立刻更新活跃时间并保存。 # 这可以防止在同一个检查周期内,对同一个目标因为意外的延迟而发送多条消息。 diff --git a/src/plugins/built_in/proactive_thinker/proactive_thinker_executor.py b/src/plugins/built_in/proactive_thinker/proactive_thinker_executor.py index f6c79d9e2..297c005e2 100644 --- a/src/plugins/built_in/proactive_thinker/proactive_thinker_executor.py +++ b/src/plugins/built_in/proactive_thinker/proactive_thinker_executor.py @@ -1,77 +1,245 @@ -from typing import Optional +import orjson +from typing import Optional, Dict, Any +from datetime import datetime -from maim_message import UserInfo - -from src.chat.memory_system.memory_manager import MemoryManager from src.common.logger import get_logger -from src.plugin_system.apis import chat_api, person_api, schedule_api, send_api, llm_api +from src.plugin_system.apis import chat_api, person_api, schedule_api, send_api, llm_api, message_api +from src.config.config import global_config, model_config +from src.person_info.person_info import get_person_info_manager logger = get_logger(__name__) class ProactiveThinkerExecutor: """ - 主动思考执行器,负责生成并发送主动消息。 + 主动思考执行器 V2 + - 统一执行入口 + - 引入决策模块,判断是否及如何发起对话 + - 结合人设、日程、关系信息生成更具情境的对话 """ def __init__(self): - self.memory_manager = MemoryManager() + # 可以在此初始化所需模块,例如LLM请求器等 + pass - async def _generate_prompt(self, stream_id: str) -> Optional[str]: + async def execute(self, stream_id: str, start_mode: str = "wake_up"): """ - 根据聊天流信息,生成包含记忆、日程和个人信息的提示词。 + 统一执行入口 + Args: + stream_id: 聊天流ID + start_mode: 启动模式, 'cold_start' 或 'wake_up' """ - # 1. 获取用户信息 - stream = chat_api.get_stream_by_stream_id(stream_id) + logger.info(f"开始为聊天流 {stream_id} 执行主动思考,模式: {start_mode}") + + # 1. 信息收集 + context = await self._gather_context(stream_id) + if not context: + return + + # 2. 决策阶段 + decision_result = await self._make_decision(context, start_mode) + if not decision_result or not decision_result.get("should_reply"): + reason = decision_result.get("reason", "未提供") if decision_result else "决策过程返回None" + logger.info(f"决策结果为:不回复。原因: {reason}") + return + + # 3. 规划与执行阶段 + topic = decision_result.get("topic", "打个招呼") + logger.info(f"决策结果为:回复。话题: {topic}") + + plan_prompt = self._build_plan_prompt(context, start_mode, topic) + + is_success, response, _, _ = await llm_api.generate_with_model(prompt=plan_prompt, model_config=model_config.model_task_config.utils) + + if is_success: + stream = self._get_stream_from_id(stream_id) + if stream: + await send_api.text_to_stream(stream_id=stream.stream_id, text=response) + else: + logger.warning(f"无法发送消息,因为找不到 stream_id 为 {stream_id} 的聊天流") + + def _get_stream_from_id(self, stream_id: str): + """根据stream_id解析并获取stream对象""" + try: + platform, chat_id, stream_type = stream_id.split(":") + if stream_type == "private": + return chat_api.ChatManager.get_private_stream_by_user_id(platform, chat_id) + elif stream_type == "group": + return chat_api.ChatManager.get_group_stream_by_group_id(platform, chat_id) + except Exception as e: + logger.error(f"解析 stream_id ({stream_id}) 或获取 stream 失败: {e}") + return None + + async def _gather_context(self, stream_id: str) -> Optional[Dict[str, Any]]: + """ + 收集构建提示词所需的所有上下文信息 + """ + stream = self._get_stream_from_id(stream_id) if not stream: logger.warning(f"无法找到 stream_id 为 {stream_id} 的聊天流") return None user_info = stream.user_info + if not user_info or not user_info.platform or not user_info.user_id: + logger.warning(f"Stream {stream_id} 的 user_info 不完整") + return None + person_id = person_api.get_person_id(user_info.platform, int(user_info.user_id)) + person_info_manager = get_person_info_manager() - # 2. 获取记忆 - memories = await self.memory_manager.get_memories(person_id) - memory_context = "\n".join([f"- {m.content}" for m in memories]) + # 获取日程 + schedules = await schedule_api.ScheduleAPI.get_today_schedule() + schedule_context = "\n".join([f"- {s['title']} ({s['start_time']}-{s['end_time']})" for s in schedules]) if schedules else "今天没有日程安排。" - # 3. 获取日程 - schedules = await schedule_api.get_today_schedule(person_id) - schedule_context = "\n".join([f"- {s.title} ({s.start_time}-{s.end_time})" for s in schedules]) + # 获取关系信息 + short_impression = await person_info_manager.get_value(person_id, "short_impression") or "无" + impression = await person_info_manager.get_value(person_id, "impression") or "无" + attitude = await person_info_manager.get_value(person_id, "attitude") or 50 + + # 获取最近聊天记录 + recent_messages = await message_api.get_recent_messages(stream_id, limit=10) + recent_chat_history = await message_api.build_readable_messages_to_str(recent_messages) if recent_messages else "无" + + return { + "person_id": person_id, + "user_info": user_info, + "schedule_context": schedule_context, + "recent_chat_history": recent_chat_history, + "relationship": { + "short_impression": short_impression, + "impression": impression, + "attitude": attitude + }, + "persona": { + "core": global_config.personality.personality_core, + "side": global_config.personality.personality_side, + "identity": global_config.personality.identity, + }, + "current_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + } + + async def _make_decision(self, context: Dict[str, Any], start_mode: str) -> Optional[Dict[str, Any]]: + """ + 决策模块:判断是否应该主动发起对话,以及聊什么话题 + """ + persona = context['persona'] + user_info = context['user_info'] + relationship = context['relationship'] - # 4. 构建提示词 prompt = f""" - # Context - ## Memory - {memory_context} +# 角色 +你的名字是{global_config.bot.nickname},你的人设如下: +- 核心人设: {persona['core']} +- 侧面人设: {persona['side']} +- 身份: {persona['identity']} - ## Schedule - {schedule_context} +# 任务 +现在是 {context['current_time']},你需要根据当前的情境,决定是否要主动向用户 '{user_info.user_nickname}' 发起对话。 - # Task - You are a proactive assistant. Based on the user's memory and schedule, initiate a conversation. +# 情境分析 +1. **启动模式**: {start_mode} ({'初次见面/很久未见' if start_mode == 'cold_start' else '日常唤醒'}) +2. **你的日程**: +{context['schedule_context']} +3. **你和Ta的关系**: + - 简短印象: {relationship['short_impression']} + - 详细印象: {relationship['impression']} + - 好感度: {relationship['attitude']}/100 +4. **最近的聊天摘要**: +{context['recent_chat_history']} + +# 决策指令 +请综合以上所有信息,做出决策。你的决策需要以JSON格式输出,包含以下字段: +- `should_reply`: bool, 是否应该发起对话。 +- `topic`: str, 如果 `should_reply` 为 true,你打算聊什么话题?(例如:问候一下今天的日程、关心一下昨天的某件事、分享一个你自己的趣事等) +- `reason`: str, 做出此决策的简要理由。 + +--- +示例1 (应该回复): +{{ + "should_reply": true, + "topic": "提醒Ta今天下午有'项目会议'的日程", + "reason": "现在是上午,Ta下午有个重要会议,我觉得应该主动提醒一下,这会显得我很贴心。" +}} + +示例2 (不应回复): +{{ + "should_reply": false, + "topic": null, + "reason": "虽然我们的关系不错,但现在是深夜,而且Ta今天的日程都已经完成了,我没有合适的理由去打扰Ta。" +}} +--- + +请输出你的决策: +""" + + is_success, response, _, _ = await llm_api.generate_with_model(prompt=prompt, model_config=model_config.model_task_config.utils) + + if not is_success: + return {"should_reply": False, "reason": "决策模型生成失败"} + + try: + # 假设LLM返回JSON格式的决策结果 + decision = orjson.loads(response) + return decision + except orjson.JSONDecodeError: + logger.error(f"决策LLM返回的JSON格式无效: {response}") + return {"should_reply": False, "reason": "决策模型返回格式错误"} + + def _build_plan_prompt(self, context: Dict[str, Any], start_mode: str, topic: str) -> str: """ + 根据启动模式和决策话题,构建最终的规划提示词 + """ + persona = context['persona'] + user_info = context['user_info'] + relationship = context['relationship'] + + if start_mode == "cold_start": + prompt = f""" +# 角色 +你的名字是{global_config.bot.nickname},你的人设如下: +- 核心人设: {persona['core']} +- 侧面人设: {persona['side']} +- 身份: {persona['identity']} + +# 任务 +你需要主动向一个新朋友 '{user_info.user_nickname}' 发起对话。这是你们的第一次交流,或者很久没聊了。 + +# 情境分析 +- **你和Ta的关系**: + - 简短印象: {relationship['short_impression']} + - 详细印象: {relationship['impression']} + - 好感度: {relationship['attitude']}/100 + +# 对话指引 +- 你的目标是“破冰”,让对话自然地开始。 +- 你应该围绕这个话题展开: {topic} +- 你的语气应该符合你的人设,友好且真诚。 +- 直接输出你要说的第一句话,不要包含任何额外的前缀或解释。 +""" + else: # wake_up + prompt = f""" +# 角色 +你的名字是{global_config.bot.nickname},你的人设如下: +- 核心人设: {persona['core']} +- 侧面人设: {persona['side']} +- 身份: {persona['identity']} + +# 任务 +现在是 {context['current_time']},你需要主动向你的朋友 '{user_info.user_nickname}' 发起对话。 + +# 情境分析 +1. **你的日程**: +{context['schedule_context']} +2. **你和Ta的关系**: + - 详细印象: {relationship['impression']} + - 好感度: {relationship['attitude']}/100 +3. **最近的聊天摘要**: +{context['recent_chat_history']} + +# 对话指引 +- 你决定和Ta聊聊关于“{topic}”的话题。 +- 请结合以上所有情境信息,自然地开启对话。 +- 你的语气应该符合你的人设以及你对Ta的好感度。 +- 直接输出你要说的第一句话,不要包含任何额外的前缀或解释。 +""" return prompt - - async def execute_cold_start(self, user_info: UserInfo): - """ - 为新用户执行“破冰”操作。 - """ - logger.info(f"为新用户 {user_info.user_id} 执行“破冰”操作") - prompt = f"You are a proactive assistant. Initiate a conversation with a new friend named {user_info.user_nickname}." - - response = await llm_api.generate(prompt) - await send_api.send_message(user_info.platform, user_info.user_id, response) - - async def execute_wakeup(self, stream_id: str): - """ - 为已冷却的聊天执行“唤醒”操作。 - """ - logger.info(f"为聊天流 {stream_id} 执行“唤醒”操作") - prompt = await self._generate_prompt(stream_id) - if not prompt: - return - - response = await llm_api.generate(prompt) - - stream = chat_api.get_stream_by_stream_id(stream_id) - await send_api.send_message(stream.user_info.platform, stream.user_info.user_id, response)