refactor(proactive_thinker): 重构执行器以实现决策-规划-执行流程

重构 `ProactiveThinkerExecutor`,引入更智能的主动对话机制。旧的实现较为简单,直接根据不同场景生成固定类型的问候语。新的实现将主动对话过程分为三个阶段:信息收集、决策、规划与执行。

- **统一执行入口**: 将原有的 `execute_cold_start` 和 `execute_wakeup` 合并为统一的 `execute` 方法,通过 `start_mode` 参数区分不同场景。
- **信息收集**: 增加 `_gather_context` 方法,全面收集构建提示词所需的信息,包括用户关系、日程安排、人设、最近聊天记录等。
- **决策模块**: 新增 `_make_decision` 方法,利用 LLM 判断是否应该发起对话以及聊什么话题,避免在不合适的时机打扰用户。
- **规划与执行**: `_build_plan_prompt` 方法根据决策结果(话题)和上下文,生成最终的对话内容,使对话更具情境感和个性化。
- **事件调用更新**: 在 `proacive_thinker_event.py` 中更新了对新版执行器 `execute` 方法的调用方式。
This commit is contained in:
minecraft1024a
2025-10-02 16:58:37 +08:00
parent cabbdcc90a
commit c73ffdee9b
2 changed files with 220 additions and 52 deletions

View File

@@ -72,9 +72,9 @@ class ColdStartTask(AsyncTask):
# 【关键步骤】主动创建聊天流。 # 【关键步骤】主动创建聊天流。
# 创建后,该用户就进入了机器人的“好友列表”,后续将由 ProactiveThinkingTask 接管 # 创建后,该用户就进入了机器人的“好友列表”,后续将由 ProactiveThinkingTask 接管
await self.chat_manager.get_or_create_stream(platform, user_info) stream = await self.chat_manager.get_or_create_stream(platform, user_info)
await self.executor.execute_cold_start(user_info) await self.executor.execute(stream_id=stream.stream_id, start_mode="cold_start")
logger.info(f"【冷启动】已为新用户 {chat_id} (昵称: {user_nickname}) 创建聊天流并发送问候。") logger.info(f"【冷启动】已为新用户 {chat_id} (昵称: {user_nickname}) 创建聊天流并发送问候。")
except ValueError: except ValueError:
@@ -177,7 +177,7 @@ class ProactiveThinkingTask(AsyncTask):
if time_since_last_active > next_interval: if time_since_last_active > next_interval:
logger.info(f"【日常唤醒】聊天流 {stream.stream_id} 已冷却 {time_since_last_active:.2f} 秒,触发主动对话。") logger.info(f"【日常唤醒】聊天流 {stream.stream_id} 已冷却 {time_since_last_active:.2f} 秒,触发主动对话。")
await self.executor.execute_wakeup(stream.stream_id) await self.executor.execute(stream_id=stream.stream_id, start_mode="wake_up")
# 【关键步骤】在触发后,立刻更新活跃时间并保存。 # 【关键步骤】在触发后,立刻更新活跃时间并保存。
# 这可以防止在同一个检查周期内,对同一个目标因为意外的延迟而发送多条消息。 # 这可以防止在同一个检查周期内,对同一个目标因为意外的延迟而发送多条消息。

View File

@@ -1,77 +1,245 @@
from typing import Optional import orjson
from typing import Optional, Dict, Any
from datetime import datetime
from maim_message import UserInfo
from src.chat.memory_system.memory_manager import MemoryManager
from src.common.logger import get_logger from src.common.logger import get_logger
from src.plugin_system.apis import chat_api, person_api, schedule_api, send_api, llm_api from src.plugin_system.apis import chat_api, person_api, schedule_api, send_api, llm_api, message_api
from src.config.config import global_config, model_config
from src.person_info.person_info import get_person_info_manager
logger = get_logger(__name__) logger = get_logger(__name__)
class ProactiveThinkerExecutor: class ProactiveThinkerExecutor:
""" """
主动思考执行器,负责生成并发送主动消息。 主动思考执行器 V2
- 统一执行入口
- 引入决策模块,判断是否及如何发起对话
- 结合人设、日程、关系信息生成更具情境的对话
""" """
def __init__(self): def __init__(self):
self.memory_manager = MemoryManager() # 可以在此初始化所需模块例如LLM请求器等
pass
async def _generate_prompt(self, stream_id: str) -> Optional[str]: async def execute(self, stream_id: str, start_mode: str = "wake_up"):
""" """
根据聊天流信息,生成包含记忆、日程和个人信息的提示词。 统一执行入口
Args:
stream_id: 聊天流ID
start_mode: 启动模式, 'cold_start''wake_up'
""" """
# 1. 获取用户信息 logger.info(f"开始为聊天流 {stream_id} 执行主动思考,模式: {start_mode}")
stream = chat_api.get_stream_by_stream_id(stream_id)
# 1. 信息收集
context = await self._gather_context(stream_id)
if not context:
return
# 2. 决策阶段
decision_result = await self._make_decision(context, start_mode)
if not decision_result or not decision_result.get("should_reply"):
reason = decision_result.get("reason", "未提供") if decision_result else "决策过程返回None"
logger.info(f"决策结果为:不回复。原因: {reason}")
return
# 3. 规划与执行阶段
topic = decision_result.get("topic", "打个招呼")
logger.info(f"决策结果为:回复。话题: {topic}")
plan_prompt = self._build_plan_prompt(context, start_mode, topic)
is_success, response, _, _ = await llm_api.generate_with_model(prompt=plan_prompt, model_config=model_config.model_task_config.utils)
if is_success:
stream = self._get_stream_from_id(stream_id)
if stream:
await send_api.text_to_stream(stream_id=stream.stream_id, text=response)
else:
logger.warning(f"无法发送消息,因为找不到 stream_id 为 {stream_id} 的聊天流")
def _get_stream_from_id(self, stream_id: str):
"""根据stream_id解析并获取stream对象"""
try:
platform, chat_id, stream_type = stream_id.split(":")
if stream_type == "private":
return chat_api.ChatManager.get_private_stream_by_user_id(platform, chat_id)
elif stream_type == "group":
return chat_api.ChatManager.get_group_stream_by_group_id(platform, chat_id)
except Exception as e:
logger.error(f"解析 stream_id ({stream_id}) 或获取 stream 失败: {e}")
return None
async def _gather_context(self, stream_id: str) -> Optional[Dict[str, Any]]:
"""
收集构建提示词所需的所有上下文信息
"""
stream = self._get_stream_from_id(stream_id)
if not stream: if not stream:
logger.warning(f"无法找到 stream_id 为 {stream_id} 的聊天流") logger.warning(f"无法找到 stream_id 为 {stream_id} 的聊天流")
return None return None
user_info = stream.user_info user_info = stream.user_info
if not user_info or not user_info.platform or not user_info.user_id:
logger.warning(f"Stream {stream_id} 的 user_info 不完整")
return None
person_id = person_api.get_person_id(user_info.platform, int(user_info.user_id)) person_id = person_api.get_person_id(user_info.platform, int(user_info.user_id))
person_info_manager = get_person_info_manager()
# 2. 获取记忆 # 获取日程
memories = await self.memory_manager.get_memories(person_id) schedules = await schedule_api.ScheduleAPI.get_today_schedule()
memory_context = "\n".join([f"- {m.content}" for m in memories]) schedule_context = "\n".join([f"- {s['title']} ({s['start_time']}-{s['end_time']})" for s in schedules]) if schedules else "今天没有日程安排。"
# 3. 获取日程 # 获取关系信息
schedules = await schedule_api.get_today_schedule(person_id) short_impression = await person_info_manager.get_value(person_id, "short_impression") or ""
schedule_context = "\n".join([f"- {s.title} ({s.start_time}-{s.end_time})" for s in schedules]) impression = await person_info_manager.get_value(person_id, "impression") or ""
attitude = await person_info_manager.get_value(person_id, "attitude") or 50
# 获取最近聊天记录
recent_messages = await message_api.get_recent_messages(stream_id, limit=10)
recent_chat_history = await message_api.build_readable_messages_to_str(recent_messages) if recent_messages else ""
return {
"person_id": person_id,
"user_info": user_info,
"schedule_context": schedule_context,
"recent_chat_history": recent_chat_history,
"relationship": {
"short_impression": short_impression,
"impression": impression,
"attitude": attitude
},
"persona": {
"core": global_config.personality.personality_core,
"side": global_config.personality.personality_side,
"identity": global_config.personality.identity,
},
"current_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
async def _make_decision(self, context: Dict[str, Any], start_mode: str) -> Optional[Dict[str, Any]]:
"""
决策模块:判断是否应该主动发起对话,以及聊什么话题
"""
persona = context['persona']
user_info = context['user_info']
relationship = context['relationship']
# 4. 构建提示词
prompt = f""" prompt = f"""
# Context # 角色
## Memory 你的名字是{global_config.bot.nickname},你的人设如下:
{memory_context} - 核心人设: {persona['core']}
- 侧面人设: {persona['side']}
- 身份: {persona['identity']}
## Schedule # 任务
{schedule_context} 现在是 {context['current_time']},你需要根据当前的情境,决定是否要主动向用户 '{user_info.user_nickname}' 发起对话。
# Task # 情境分析
You are a proactive assistant. Based on the user's memory and schedule, initiate a conversation. 1. **启动模式**: {start_mode} ({'初次见面/很久未见' if start_mode == 'cold_start' else '日常唤醒'})
2. **你的日程**:
{context['schedule_context']}
3. **你和Ta的关系**:
- 简短印象: {relationship['short_impression']}
- 详细印象: {relationship['impression']}
- 好感度: {relationship['attitude']}/100
4. **最近的聊天摘要**:
{context['recent_chat_history']}
# 决策指令
请综合以上所有信息做出决策。你的决策需要以JSON格式输出包含以下字段
- `should_reply`: bool, 是否应该发起对话。
- `topic`: str, 如果 `should_reply` 为 true你打算聊什么话题(例如:问候一下今天的日程、关心一下昨天的某件事、分享一个你自己的趣事等)
- `reason`: str, 做出此决策的简要理由。
---
示例1 (应该回复):
{{
"should_reply": true,
"topic": "提醒Ta今天下午有'项目会议'的日程",
"reason": "现在是上午Ta下午有个重要会议我觉得应该主动提醒一下这会显得我很贴心。"
}}
示例2 (不应回复):
{{
"should_reply": false,
"topic": null,
"reason": "虽然我们的关系不错但现在是深夜而且Ta今天的日程都已经完成了我没有合适的理由去打扰Ta。"
}}
---
请输出你的决策:
"""
is_success, response, _, _ = await llm_api.generate_with_model(prompt=prompt, model_config=model_config.model_task_config.utils)
if not is_success:
return {"should_reply": False, "reason": "决策模型生成失败"}
try:
# 假设LLM返回JSON格式的决策结果
decision = orjson.loads(response)
return decision
except orjson.JSONDecodeError:
logger.error(f"决策LLM返回的JSON格式无效: {response}")
return {"should_reply": False, "reason": "决策模型返回格式错误"}
def _build_plan_prompt(self, context: Dict[str, Any], start_mode: str, topic: str) -> str:
""" """
根据启动模式和决策话题,构建最终的规划提示词
"""
persona = context['persona']
user_info = context['user_info']
relationship = context['relationship']
if start_mode == "cold_start":
prompt = f"""
# 角色
你的名字是{global_config.bot.nickname},你的人设如下:
- 核心人设: {persona['core']}
- 侧面人设: {persona['side']}
- 身份: {persona['identity']}
# 任务
你需要主动向一个新朋友 '{user_info.user_nickname}' 发起对话。这是你们的第一次交流,或者很久没聊了。
# 情境分析
- **你和Ta的关系**:
- 简短印象: {relationship['short_impression']}
- 详细印象: {relationship['impression']}
- 好感度: {relationship['attitude']}/100
# 对话指引
- 你的目标是“破冰”,让对话自然地开始。
- 你应该围绕这个话题展开: {topic}
- 你的语气应该符合你的人设,友好且真诚。
- 直接输出你要说的第一句话,不要包含任何额外的前缀或解释。
"""
else: # wake_up
prompt = f"""
# 角色
你的名字是{global_config.bot.nickname},你的人设如下:
- 核心人设: {persona['core']}
- 侧面人设: {persona['side']}
- 身份: {persona['identity']}
# 任务
现在是 {context['current_time']},你需要主动向你的朋友 '{user_info.user_nickname}' 发起对话。
# 情境分析
1. **你的日程**:
{context['schedule_context']}
2. **你和Ta的关系**:
- 详细印象: {relationship['impression']}
- 好感度: {relationship['attitude']}/100
3. **最近的聊天摘要**:
{context['recent_chat_history']}
# 对话指引
- 你决定和Ta聊聊关于“{topic}”的话题。
- 请结合以上所有情境信息,自然地开启对话。
- 你的语气应该符合你的人设以及你对Ta的好感度。
- 直接输出你要说的第一句话,不要包含任何额外的前缀或解释。
"""
return prompt return prompt
async def execute_cold_start(self, user_info: UserInfo):
"""
为新用户执行“破冰”操作。
"""
logger.info(f"为新用户 {user_info.user_id} 执行“破冰”操作")
prompt = f"You are a proactive assistant. Initiate a conversation with a new friend named {user_info.user_nickname}."
response = await llm_api.generate(prompt)
await send_api.send_message(user_info.platform, user_info.user_id, response)
async def execute_wakeup(self, stream_id: str):
"""
为已冷却的聊天执行“唤醒”操作。
"""
logger.info(f"为聊天流 {stream_id} 执行“唤醒”操作")
prompt = await self._generate_prompt(stream_id)
if not prompt:
return
response = await llm_api.generate(prompt)
stream = chat_api.get_stream_by_stream_id(stream_id)
await send_api.send_message(stream.user_info.platform, stream.user_info.user_id, response)