refactor(chat): 移除亲和力流模块并将其重构为插件架构

BREAKING CHANGE: 原有的亲和力流相关模块(src/chat/affinity_flow/)已被完全移除,相关功能已重构为插件形式。需要更新配置文件和相关依赖。

- 删除 src/chat/affinity_flow/ 目录下的所有文件
- 将 AFC 管理器功能移至 chatter 插件中实现
- 更新相关导入路径和引用
- 重构关系追踪器和兴趣评分系统的初始化逻辑
- 调整聊天管理器和消息管理器以适应新的插件架构
This commit is contained in:
Windpicker-owo
2025-09-23 13:14:38 +08:00
parent ddcae01612
commit a218b932fb
29 changed files with 511 additions and 1068 deletions

View File

@@ -18,7 +18,7 @@ from src.plugin_system.apis import generator_api, database_api, send_api, messag
logger = get_logger("action_manager")
class ActionManager:
class ChatterActionManager:
"""
动作管理器,用于管理各种类型的动作
@@ -34,7 +34,7 @@ class ActionManager:
# 初始化时将默认动作加载到使用中的动作
self._using_actions = component_registry.get_default_actions()
self.log_prefix: str = "ActionManager"
self.log_prefix: str = "ChatterActionManager"
# === 执行Action方法 ===
@@ -449,7 +449,7 @@ class ActionManager:
data = "".join(map(str, data))
reply_text += data
# 如果是主动思考且内容为沉默,则不发送
# 如果是主动思考且内容为"沉默",则不发送
if is_proactive_thinking and data.strip() == "沉默":
logger.info(f"{self.log_prefix} 主动思考决定保持沉默,不发送消息")
continue
@@ -474,4 +474,4 @@ class ActionManager:
typing=True,
)
return reply_text
return reply_text

View File

@@ -8,7 +8,7 @@ from src.common.logger import get_logger
from src.config.config import global_config, model_config
from src.llm_models.utils_model import LLMRequest
from src.chat.message_receive.chat_stream import get_chat_manager, ChatMessageContext
from src.chat.planner_actions.action_manager import ActionManager
from src.chat.planner_actions.action_manager import ChatterActionManager
from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat, build_readable_messages
from src.plugin_system.base.component_types import ActionInfo, ActionActivationType
from src.plugin_system.core.global_announcement_manager import global_announcement_manager
@@ -27,7 +27,7 @@ class ActionModifier:
支持并行判定和智能缓存优化。
"""
def __init__(self, action_manager: ActionManager, chat_id: str):
def __init__(self, action_manager: ChatterActionManager, chat_id: str):
"""初始化动作处理器"""
self.chat_id = chat_id
self.chat_stream: ChatStream = get_chat_manager().get_stream(self.chat_id) # type: ignore

View File

@@ -1,363 +0,0 @@
"""
PlanExecutor: 接收 Plan 对象并执行其中的所有动作。
集成用户关系追踪机制,自动记录交互并更新关系。
"""
import asyncio
import re
import time
from typing import Dict, List
from src.config.config import global_config
from src.chat.planner_actions.action_manager import ActionManager
from src.common.data_models.info_data_model import Plan, ActionPlannerInfo
from src.common.logger import get_logger
logger = get_logger("plan_executor")
class PlanExecutor:
"""
增强版PlanExecutor集成用户关系追踪机制。
功能:
1. 执行Plan中的所有动作
2. 自动记录用户交互并添加到关系追踪
3. 分类执行回复动作和其他动作
4. 提供完整的执行统计和监控
"""
def __init__(self, action_manager: ActionManager):
"""
初始化增强版PlanExecutor。
Args:
action_manager (ActionManager): 用于实际执行各种动作的管理器实例。
"""
self.action_manager = action_manager
# 执行统计
self.execution_stats = {
"total_executed": 0,
"successful_executions": 0,
"failed_executions": 0,
"reply_executions": 0,
"other_action_executions": 0,
"execution_times": [],
}
# 用户关系追踪引用
self.relationship_tracker = None
def set_relationship_tracker(self, relationship_tracker):
"""设置关系追踪器"""
self.relationship_tracker = relationship_tracker
async def execute(self, plan: Plan) -> Dict[str, any]:
"""
遍历并执行Plan对象中`decided_actions`列表里的所有动作。
Args:
plan (Plan): 包含待执行动作列表的Plan对象。
Returns:
Dict[str, any]: 执行结果统计信息
"""
if not plan.decided_actions:
logger.info("没有需要执行的动作。")
return {"executed_count": 0, "results": []}
execution_results = []
reply_actions = []
other_actions = []
# 分类动作:回复动作和其他动作
for action_info in plan.decided_actions:
if action_info.action_type in ["reply", "proactive_reply"]:
reply_actions.append(action_info)
else:
other_actions.append(action_info)
# 执行回复动作(优先执行)
if reply_actions:
reply_result = await self._execute_reply_actions(reply_actions, plan)
execution_results.extend(reply_result["results"])
self.execution_stats["reply_executions"] += len(reply_actions)
# 将其他动作放入后台任务执行,避免阻塞主流程
if other_actions:
asyncio.create_task(self._execute_other_actions(other_actions, plan))
logger.info(f"已将 {len(other_actions)} 个其他动作放入后台任务执行。")
# 注意:后台任务的结果不会立即计入本次返回的统计数据
# 更新总体统计
self.execution_stats["total_executed"] += len(plan.decided_actions)
successful_count = sum(1 for r in execution_results if r["success"])
self.execution_stats["successful_executions"] += successful_count
self.execution_stats["failed_executions"] += len(execution_results) - successful_count
logger.info(
f"规划执行完成: 总数={len(plan.decided_actions)}, 成功={successful_count}, 失败={len(execution_results) - successful_count}"
)
return {
"executed_count": len(plan.decided_actions),
"successful_count": successful_count,
"failed_count": len(execution_results) - successful_count,
"results": execution_results,
}
async def _execute_reply_actions(self, reply_actions: List[ActionPlannerInfo], plan: Plan) -> Dict[str, any]:
"""执行回复动作"""
results = []
for action_info in reply_actions:
result = await self._execute_single_reply_action(action_info, plan)
results.append(result)
return {"results": results}
async def _execute_single_reply_action(self, action_info: ActionPlannerInfo, plan: Plan) -> Dict[str, any]:
"""执行单个回复动作"""
start_time = time.time()
success = False
error_message = ""
reply_content = ""
try:
logger.info(f"执行回复动作: {action_info.action_type} (原因: {action_info.reasoning})")
# 获取用户ID - 兼容对象和字典
if hasattr(action_info.action_message, "user_info"):
user_id = action_info.action_message.user_info.user_id
else:
user_id = action_info.action_message.get("user_info", {}).get("user_id")
if user_id == str(global_config.bot.qq_account):
logger.warning("尝试回复自己,跳过此动作以防止死循环。")
return {
"action_type": action_info.action_type,
"success": False,
"error_message": "尝试回复自己,跳过此动作以防止死循环。",
"execution_time": 0,
"reasoning": action_info.reasoning,
"reply_content": "",
}
# 构建回复动作参数
action_params = {
"chat_id": plan.chat_id,
"target_message": action_info.action_message,
"reasoning": action_info.reasoning,
"action_data": action_info.action_data or {},
}
# 通过动作管理器执行回复
reply_content = await self.action_manager.execute_action(
action_name=action_info.action_type, **action_params
)
success = True
logger.info(f"回复动作 '{action_info.action_type}' 执行成功。")
except Exception as e:
error_message = str(e)
logger.error(f"执行回复动作失败: {action_info.action_type}, 错误: {error_message}")
# 记录用户关系追踪
if success and action_info.action_message:
await self._track_user_interaction(action_info, plan, reply_content)
execution_time = time.time() - start_time
self.execution_stats["execution_times"].append(execution_time)
return {
"action_type": action_info.action_type,
"success": success,
"error_message": error_message,
"execution_time": execution_time,
"reasoning": action_info.reasoning,
"reply_content": reply_content[:200] + "..." if len(reply_content) > 200 else reply_content,
}
async def _execute_other_actions(self, other_actions: List[ActionPlannerInfo], plan: Plan) -> Dict[str, any]:
"""执行其他动作"""
results = []
# 并行执行其他动作
tasks = []
for action_info in other_actions:
task = self._execute_single_other_action(action_info, plan)
tasks.append(task)
if tasks:
executed_results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(executed_results):
if isinstance(result, Exception):
logger.error(f"执行动作 {other_actions[i].action_type} 时发生异常: {result}")
results.append(
{
"action_type": other_actions[i].action_type,
"success": False,
"error_message": str(result),
"execution_time": 0,
"reasoning": other_actions[i].reasoning,
}
)
else:
results.append(result)
return {"results": results}
async def _execute_single_other_action(self, action_info: ActionPlannerInfo, plan: Plan) -> Dict[str, any]:
"""执行单个其他动作"""
start_time = time.time()
success = False
error_message = ""
try:
logger.info(f"执行其他动作: {action_info.action_type} (原因: {action_info.reasoning})")
action_data = action_info.action_data or {}
# 针对 poke_user 动作,特殊处理
if action_info.action_type == "poke_user":
target_message = action_info.action_message
if target_message:
# 优先直接获取 user_id这才是最可靠的信息
user_id = target_message.get("user_id")
if user_id:
action_data["user_id"] = user_id
logger.info(f"检测到戳一戳动作目标用户ID: {user_id}")
else:
# 如果没有 user_id再尝试用 user_nickname 作为备用方案
user_name = target_message.get("user_nickname")
if user_name:
action_data["user_name"] = user_name
logger.info(f"检测到戳一戳动作,目标用户: {user_name}")
else:
logger.warning("无法从戳一戳消息中获取用户ID或昵称。")
# 传递原始消息ID以支持引用
action_data["target_message_id"] = target_message.get("message_id")
# 构建动作参数
action_params = {
"chat_id": plan.chat_id,
"target_message": action_info.action_message,
"reasoning": action_info.reasoning,
"action_data": action_data,
}
# 通过动作管理器执行动作
await self.action_manager.execute_action(action_name=action_info.action_type, **action_params)
success = True
logger.info(f"其他动作 '{action_info.action_type}' 执行成功。")
except Exception as e:
error_message = str(e)
logger.error(f"执行其他动作失败: {action_info.action_type}, 错误: {error_message}")
execution_time = time.time() - start_time
self.execution_stats["execution_times"].append(execution_time)
return {
"action_type": action_info.action_type,
"success": success,
"error_message": error_message,
"execution_time": execution_time,
"reasoning": action_info.reasoning,
}
async def _track_user_interaction(self, action_info: ActionPlannerInfo, plan: Plan, reply_content: str):
"""追踪用户交互 - 集成回复后关系追踪"""
try:
if not action_info.action_message:
return
# 获取用户信息 - 处理对象和字典两种情况
if hasattr(action_info.action_message, "user_info"):
# 对象情况
user_info = action_info.action_message.user_info
user_id = user_info.user_id
user_name = user_info.user_nickname or user_id
user_message = action_info.action_message.content
else:
# 字典情况
user_info = action_info.action_message.get("user_info", {})
user_id = user_info.get("user_id")
user_name = user_info.get("user_nickname") or user_id
user_message = action_info.action_message.get("content", "")
if not user_id:
logger.debug("跳过追踪缺少用户ID")
return
# 如果有设置关系追踪器,执行回复后关系追踪
if self.relationship_tracker:
# 记录基础交互信息(保持向后兼容)
self.relationship_tracker.add_interaction(
user_id=user_id,
user_name=user_name,
user_message=user_message,
bot_reply=reply_content,
reply_timestamp=time.time(),
)
# 执行新的回复后关系追踪
await self.relationship_tracker.track_reply_relationship(
user_id=user_id, user_name=user_name, bot_reply_content=reply_content, reply_timestamp=time.time()
)
logger.debug(f"已执行用户交互追踪: {user_id}")
except Exception as e:
logger.error(f"追踪用户交互时出错: {e}")
logger.debug(f"action_message类型: {type(action_info.action_message)}")
logger.debug(f"action_message内容: {action_info.action_message}")
def get_execution_stats(self) -> Dict[str, any]:
"""获取执行统计信息"""
stats = self.execution_stats.copy()
# 计算平均执行时间
if stats["execution_times"]:
avg_time = sum(stats["execution_times"]) / len(stats["execution_times"])
stats["average_execution_time"] = avg_time
stats["max_execution_time"] = max(stats["execution_times"])
stats["min_execution_time"] = min(stats["execution_times"])
else:
stats["average_execution_time"] = 0
stats["max_execution_time"] = 0
stats["min_execution_time"] = 0
# 移除执行时间列表以避免返回过大数据
stats.pop("execution_times", None)
return stats
def reset_stats(self):
"""重置统计信息"""
self.execution_stats = {
"total_executed": 0,
"successful_executions": 0,
"failed_executions": 0,
"reply_executions": 0,
"other_action_executions": 0,
"execution_times": [],
}
def get_recent_performance(self, limit: int = 10) -> List[Dict[str, any]]:
"""获取最近的执行性能"""
recent_times = self.execution_stats["execution_times"][-limit:]
if not recent_times:
return []
return [
{
"execution_index": i + 1,
"execution_time": time_val,
"timestamp": time.time() - (len(recent_times) - i) * 60, # 估算时间戳
}
for i, time_val in enumerate(recent_times)
]

View File

@@ -1,519 +0,0 @@
"""
PlanFilter: 接收 Plan 对象,根据不同模式的逻辑进行筛选,决定最终要执行的动作。
"""
import orjson
import time
import traceback
from datetime import datetime
from typing import Any, Dict, List, Optional
from json_repair import repair_json
from src.chat.memory_system.Hippocampus import hippocampus_manager
from src.chat.utils.chat_message_builder import (
build_readable_actions,
build_readable_messages_with_id,
get_actions_by_timestamp_with_chat,
)
from src.chat.utils.prompt import global_prompt_manager
from src.common.data_models.info_data_model import ActionPlannerInfo, Plan
from src.common.logger import get_logger
from src.config.config import global_config, model_config
from src.llm_models.utils_model import LLMRequest
from src.mood.mood_manager import mood_manager
from src.plugin_system.base.component_types import ActionInfo, ChatMode
from src.schedule.schedule_manager import schedule_manager
logger = get_logger("plan_filter")
class PlanFilter:
"""
根据 Plan 中的模式和信息,筛选并决定最终的动作。
"""
def __init__(self):
self.planner_llm = LLMRequest(model_set=model_config.model_task_config.planner, request_type="planner")
self.last_obs_time_mark = 0.0
async def filter(self, reply_not_available: bool, plan: Plan) -> Plan:
"""
执行筛选逻辑,并填充 Plan 对象的 decided_actions 字段。
"""
logger.debug(f"墨墨在这里加了日志 -> filter 入口 plan: {plan}")
try:
prompt, used_message_id_list = await self._build_prompt(plan)
plan.llm_prompt = prompt
logger.debug(f"墨墨在这里加了日志 -> LLM prompt: {prompt}")
llm_content, _ = await self.planner_llm.generate_response_async(prompt=prompt)
if llm_content:
logger.debug(f"墨墨在这里加了日志 -> LLM a原始返回: {llm_content}")
try:
parsed_json = orjson.loads(repair_json(llm_content))
except orjson.JSONDecodeError:
parsed_json = {"action": "no_action", "reason": "返回内容无法解析为JSON"}
logger.debug(f"墨墨在这里加了日志 -> 解析后的 JSON: {parsed_json}")
if "reply" in plan.available_actions and reply_not_available:
# 如果reply动作不可用但llm返回的仍然有reply则改为no_reply
if isinstance(parsed_json, dict) and parsed_json.get("action") == "reply":
parsed_json["action"] = "no_reply"
elif isinstance(parsed_json, list):
for item in parsed_json:
if isinstance(item, dict) and item.get("action") == "reply":
item["action"] = "no_reply"
item["reason"] += " (但由于兴趣度不足reply动作不可用已改为no_reply)"
if isinstance(parsed_json, dict):
parsed_json = [parsed_json]
if isinstance(parsed_json, list):
final_actions = []
reply_action_added = False
# 定义回复类动作的集合,方便扩展
reply_action_types = {"reply", "proactive_reply"}
for item in parsed_json:
if not isinstance(item, dict):
continue
# 预解析 action_type 来进行判断
action_type = item.get("action", "no_action")
if action_type in reply_action_types:
if not reply_action_added:
final_actions.extend(await self._parse_single_action(item, used_message_id_list, plan))
reply_action_added = True
else:
# 非回复类动作直接添加
final_actions.extend(await self._parse_single_action(item, used_message_id_list, plan))
plan.decided_actions = self._filter_no_actions(final_actions)
except Exception as e:
logger.error(f"筛选 Plan 时出错: {e}\n{traceback.format_exc()}")
plan.decided_actions = [ActionPlannerInfo(action_type="no_action", reasoning=f"筛选时出错: {e}")]
logger.debug(f"墨墨在这里加了日志 -> filter 出口 decided_actions: {plan.decided_actions}")
return plan
async def _build_prompt(self, plan: Plan) -> tuple[str, list]:
"""
根据 Plan 对象构建提示词。
"""
try:
time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
bot_name = global_config.bot.nickname
bot_nickname = (
f",也有人叫你{','.join(global_config.bot.alias_names)}" if global_config.bot.alias_names else ""
)
bot_core_personality = global_config.personality.personality_core
identity_block = f"你的名字是{bot_name}{bot_nickname},你{bot_core_personality}"
schedule_block = ""
if global_config.planning_system.schedule_enable:
if current_activity := schedule_manager.get_current_activity():
schedule_block = f"你当前正在:{current_activity},但注意它与群聊的聊天无关。"
mood_block = ""
if global_config.mood.enable_mood:
chat_mood = mood_manager.get_mood_by_chat_id(plan.chat_id)
mood_block = f"你现在的心情是:{chat_mood.mood_state}"
if plan.mode == ChatMode.PROACTIVE:
long_term_memory_block = await self._get_long_term_memory_context()
chat_content_block, message_id_list = build_readable_messages_with_id(
messages=[msg.flatten() for msg in plan.chat_history],
timestamp_mode="normal",
truncate=False,
show_actions=False,
)
prompt_template = await global_prompt_manager.get_prompt_async("proactive_planner_prompt")
actions_before_now = get_actions_by_timestamp_with_chat(
chat_id=plan.chat_id,
timestamp_start=time.time() - 3600,
timestamp_end=time.time(),
limit=5,
)
actions_before_now_block = build_readable_actions(actions=actions_before_now)
actions_before_now_block = f"你刚刚选择并执行过的action是\n{actions_before_now_block}"
prompt = prompt_template.format(
time_block=time_block,
identity_block=identity_block,
schedule_block=schedule_block,
mood_block=mood_block,
long_term_memory_block=long_term_memory_block,
chat_content_block=chat_content_block or "最近没有聊天内容。",
actions_before_now_block=actions_before_now_block,
)
return prompt, message_id_list
# 构建已读/未读历史消息
read_history_block, unread_history_block, message_id_list = await self._build_read_unread_history_blocks(
plan
)
# 为了兼容性保留原有的chat_content_block
chat_content_block, _ = build_readable_messages_with_id(
messages=[msg.flatten() for msg in plan.chat_history],
timestamp_mode="normal",
read_mark=self.last_obs_time_mark,
truncate=True,
show_actions=True,
)
actions_before_now = get_actions_by_timestamp_with_chat(
chat_id=plan.chat_id,
timestamp_start=time.time() - 3600,
timestamp_end=time.time(),
limit=5,
)
actions_before_now_block = build_readable_actions(actions=actions_before_now)
actions_before_now_block = f"你刚刚选择并执行过的action是\n{actions_before_now_block}"
self.last_obs_time_mark = time.time()
mentioned_bonus = ""
if global_config.chat.mentioned_bot_inevitable_reply:
mentioned_bonus = "\n- 有人提到你"
if global_config.chat.at_bot_inevitable_reply:
mentioned_bonus = "\n- 有人提到你或者at你"
if plan.mode == ChatMode.FOCUS:
no_action_block = """
动作no_action
动作描述:不选择任何动作
{{
"action": "no_action",
"reason":"不动作的原因"
}}
动作no_reply
动作描述:不进行回复,等待合适的回复时机
- 当你刚刚发送了消息没有人回复时选择no_reply
- 当你一次发送了太多消息为了避免打扰聊天节奏选择no_reply
{{
"action": "no_reply",
"reason":"不回复的原因"
}}
"""
else: # NORMAL Mode
no_action_block = """重要说明:
- 'reply' 表示只进行普通聊天回复,不执行任何额外动作
- 其他action表示在普通回复的基础上执行相应的额外动作
{{
"action": "reply",
"target_message_id":"触发action的消息id",
"reason":"回复的原因"
}}"""
is_group_chat = plan.target_info.platform == "group" if plan.target_info else True
chat_context_description = "你现在正在一个群聊中"
if not is_group_chat and plan.target_info:
chat_target_name = plan.target_info.person_name or plan.target_info.user_nickname or "对方"
chat_context_description = f"你正在和 {chat_target_name} 私聊"
action_options_block = await self._build_action_options(plan.available_actions)
moderation_prompt_block = "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。"
custom_prompt_block = ""
if global_config.custom_prompt.planner_custom_prompt_content:
custom_prompt_block = global_config.custom_prompt.planner_custom_prompt_content
users_in_chat_str = "" # TODO: Re-implement user list fetching if needed
planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt")
prompt = planner_prompt_template.format(
schedule_block=schedule_block,
mood_block=mood_block,
time_block=time_block,
chat_context_description=chat_context_description,
read_history_block=read_history_block,
unread_history_block=unread_history_block,
actions_before_now_block=actions_before_now_block,
mentioned_bonus=mentioned_bonus,
no_action_block=no_action_block,
action_options_text=action_options_block,
moderation_prompt=moderation_prompt_block,
identity_block=identity_block,
custom_prompt_block=custom_prompt_block,
bot_name=bot_name,
users_in_chat=users_in_chat_str,
)
return prompt, message_id_list
except Exception as e:
logger.error(f"构建 Planner 提示词时出错: {e}")
logger.error(traceback.format_exc())
return "构建 Planner Prompt 时出错", []
async def _build_read_unread_history_blocks(self, plan: Plan) -> tuple[str, str, list]:
"""构建已读/未读历史消息块"""
try:
# 从message_manager获取真实的已读/未读消息
from src.chat.message_manager.message_manager import message_manager
from src.chat.utils.utils import assign_message_ids
# 获取聊天流的上下文
stream_context = message_manager.stream_contexts.get(plan.chat_id)
# 获取真正的已读和未读消息
read_messages = stream_context.history_messages # 已读消息存储在history_messages中
unread_messages = stream_context.get_unread_messages() # 获取未读消息
# 构建已读历史消息块
if read_messages:
read_content, read_ids = build_readable_messages_with_id(
messages=[msg.flatten() for msg in read_messages[-50:]], # 限制数量
timestamp_mode="normal_no_YMD",
truncate=False,
show_actions=False,
)
read_history_block = f"{read_content}"
else:
read_history_block = "暂无已读历史消息"
# 构建未读历史消息块(包含兴趣度)
if unread_messages:
# 扁平化未读消息用于计算兴趣度和格式化
flattened_unread = [msg.flatten() for msg in unread_messages]
# 尝试获取兴趣度评分(返回以真实 message_id 为键的字典)
interest_scores = await self._get_interest_scores_for_messages(flattened_unread)
# 为未读消息分配短 id保持与 build_readable_messages_with_id 的一致结构)
message_id_list = assign_message_ids(flattened_unread)
unread_lines = []
for idx, msg in enumerate(flattened_unread):
mapped = message_id_list[idx]
synthetic_id = mapped.get("id")
original_msg_id = msg.get("message_id") or msg.get("id")
msg_time = time.strftime("%H:%M:%S", time.localtime(msg.get("time", time.time())))
msg_content = msg.get("processed_plain_text", "")
# 添加兴趣度信息
interest_score = interest_scores.get(original_msg_id, 0.0)
interest_text = f" [兴趣度: {interest_score:.3f}]" if interest_score > 0 else ""
# 在未读行中显示合成id方便 planner 返回时使用
unread_lines.append(f"{msg_time} {synthetic_id}: {msg_content}{interest_text}")
unread_history_block = "\n".join(unread_lines)
else:
unread_history_block = "暂无未读历史消息"
return read_history_block, unread_history_block, message_id_list
except Exception as e:
logger.error(f"构建已读/未读历史消息块时出错: {e}")
return "构建已读历史消息时出错", "构建未读历史消息时出错", []
async def _get_interest_scores_for_messages(self, messages: List[dict]) -> dict[str, float]:
"""为消息获取兴趣度评分"""
interest_scores = {}
try:
from src.chat.affinity_flow.interest_scoring import interest_scoring_system
from src.common.data_models.database_data_model import DatabaseMessages
# 转换消息格式
db_messages = []
for msg_dict in messages:
try:
db_msg = DatabaseMessages(
message_id=msg_dict.get("message_id", ""),
time=msg_dict.get("time", time.time()),
chat_id=msg_dict.get("chat_id", ""),
processed_plain_text=msg_dict.get("processed_plain_text", ""),
user_id=msg_dict.get("user_id", ""),
user_nickname=msg_dict.get("user_nickname", ""),
user_platform=msg_dict.get("platform", "qq"),
chat_info_group_id=msg_dict.get("group_id", ""),
chat_info_group_name=msg_dict.get("group_name", ""),
chat_info_group_platform=msg_dict.get("platform", "qq"),
)
db_messages.append(db_msg)
except Exception as e:
logger.warning(f"转换消息格式失败: {e}")
continue
# 计算兴趣度评分
if db_messages:
bot_nickname = global_config.bot.nickname or "麦麦"
scores = await interest_scoring_system.calculate_interest_scores(db_messages, bot_nickname)
# 构建兴趣度字典
for score in scores:
interest_scores[score.message_id] = score.total_score
except Exception as e:
logger.warning(f"获取兴趣度评分失败: {e}")
return interest_scores
async def _parse_single_action(
self, action_json: dict, message_id_list: list, plan: Plan
) -> List[ActionPlannerInfo]:
parsed_actions = []
try:
action = action_json.get("action", "no_action")
reasoning = action_json.get("reason", "未提供原因")
action_data = {k: v for k, v in action_json.items() if k not in ["action", "reason"]}
target_message_obj = None
if action not in ["no_action", "no_reply", "do_nothing", "proactive_reply"]:
if target_message_id := action_json.get("target_message_id"):
target_message_dict = self._find_message_by_id(target_message_id, message_id_list)
else:
# 如果LLM没有指定target_message_id我们就默认选择最新的一条消息
target_message_dict = self._get_latest_message(message_id_list)
if target_message_dict:
# 直接使用字典作为action_message避免DatabaseMessages对象创建失败
target_message_obj = target_message_dict
# 替换action_data中的临时ID为真实ID
if "target_message_id" in action_data:
real_message_id = target_message_dict.get("message_id") or target_message_dict.get("id")
if real_message_id:
action_data["target_message_id"] = real_message_id
else:
# 如果找不到目标消息对于reply动作来说这是必需的应该记录警告
if action == "reply":
logger.warning(
f"reply动作找不到目标消息target_message_id: {action_json.get('target_message_id')}"
)
# 将reply动作改为no_action避免后续执行时出错
action = "no_action"
reasoning = f"找不到目标消息进行回复。原始理由: {reasoning}"
available_action_names = list(plan.available_actions.keys())
if (
action not in ["no_action", "no_reply", "reply", "do_nothing", "proactive_reply"]
and action not in available_action_names
):
reasoning = f"LLM 返回了当前不可用的动作 '{action}'。原始理由: {reasoning}"
action = "no_action"
parsed_actions.append(
ActionPlannerInfo(
action_type=action,
reasoning=reasoning,
action_data=action_data,
action_message=target_message_obj,
available_actions=plan.available_actions,
)
)
except Exception as e:
logger.error(f"解析单个action时出错: {e}")
parsed_actions.append(
ActionPlannerInfo(
action_type="no_action",
reasoning=f"解析action时出错: {e}",
)
)
return parsed_actions
def _filter_no_actions(self, action_list: List[ActionPlannerInfo]) -> List[ActionPlannerInfo]:
non_no_actions = [a for a in action_list if a.action_type not in ["no_action", "no_reply"]]
if non_no_actions:
return non_no_actions
return action_list[:1] if action_list else []
async def _get_long_term_memory_context(self) -> str:
try:
now = datetime.now()
keywords = ["今天", "日程", "计划"]
if 5 <= now.hour < 12:
keywords.append("早上")
elif 12 <= now.hour < 18:
keywords.append("中午")
else:
keywords.append("晚上")
retrieved_memories = await hippocampus_manager.get_memory_from_topic(
valid_keywords=keywords, max_memory_num=5, max_memory_length=1
)
if not retrieved_memories:
return "最近没有什么特别的记忆。"
memory_statements = [f"关于'{topic}', 你记得'{memory_item}'" for topic, memory_item in retrieved_memories]
return " ".join(memory_statements)
except Exception as e:
logger.error(f"获取长期记忆时出错: {e}")
return "回忆时出现了一些问题。"
async def _build_action_options(self, current_available_actions: Dict[str, ActionInfo]) -> str:
action_options_block = ""
for action_name, action_info in current_available_actions.items():
param_text = ""
if action_info.action_parameters:
param_text = "\n" + "\n".join(
f' "{p_name}":"{p_desc}"' for p_name, p_desc in action_info.action_parameters.items()
)
require_text = "\n".join(f"- {req}" for req in action_info.action_require)
using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt")
action_options_block += using_action_prompt.format(
action_name=action_name,
action_description=action_info.description,
action_parameters=param_text,
action_require=require_text,
)
return action_options_block
def _find_message_by_id(self, message_id: str, message_id_list: list) -> Optional[Dict[str, Any]]:
# 兼容多种 message_id 格式数字、m123、buffered-xxxx
# 如果是纯数字,补上 m 前缀以兼容旧格式
candidate_ids = {message_id}
if message_id.isdigit():
candidate_ids.add(f"m{message_id}")
# 如果是 m 开头且后面是数字,尝试去掉 m 前缀的数字形式
if message_id.startswith("m") and message_id[1:].isdigit():
candidate_ids.add(message_id[1:])
# 逐项匹配 message_id_list每项可能为 {'id':..., 'message':...}
for item in message_id_list:
# 支持 message_id_list 中直接是字符串/ID 的情形
if isinstance(item, str):
if item in candidate_ids:
# 没有 message 对象返回None
return None
continue
if not isinstance(item, dict):
continue
item_id = item.get("id")
# 直接匹配分配的短 id
if item_id and item_id in candidate_ids:
return item.get("message")
# 有时 message 存储里会有原始的 message_id 字段(如 buffered-xxxx
message_obj = item.get("message")
if isinstance(message_obj, dict):
orig_mid = message_obj.get("message_id") or message_obj.get("id")
if orig_mid and orig_mid in candidate_ids:
return message_obj
# 作为兜底,尝试在 message_id_list 中找到 message.message_id 匹配
for item in message_id_list:
if isinstance(item, dict) and isinstance(item.get("message"), dict):
mid = item["message"].get("message_id") or item["message"].get("id")
if mid == message_id:
return item["message"]
return None
def _get_latest_message(self, message_id_list: list) -> Optional[Dict[str, Any]]:
if not message_id_list:
return None
return message_id_list[-1].get("message")

View File

@@ -1,132 +0,0 @@
"""
PlanGenerator: 负责搜集和汇总所有决策所需的信息,生成一个未经筛选的“原始计划” (Plan)。
"""
import time
from typing import Dict
from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat
from src.chat.utils.utils import get_chat_type_and_target_info
from src.common.data_models.database_data_model import DatabaseMessages
from src.common.data_models.info_data_model import Plan, TargetPersonInfo
from src.config.config import global_config
from src.plugin_system.base.component_types import ActionActivationType, ActionInfo, ChatMode, ChatType, ComponentType
from src.plugin_system.core.component_registry import component_registry
class PlanGenerator:
"""
PlanGenerator 负责在规划流程的初始阶段收集所有必要信息。
它会汇总以下信息来构建一个“原始”的 Plan 对象,该对象后续会由 PlanFilter 进行筛选:
- 当前聊天信息 (ID, 目标用户)
- 当前可用的动作列表
- 最近的聊天历史记录
Attributes:
chat_id (str): 当前聊天的唯一标识符。
action_manager (ActionManager): 用于获取可用动作列表的管理器。
"""
def __init__(self, chat_id: str):
"""
初始化 PlanGenerator。
Args:
chat_id (str): 当前聊天的 ID。
"""
from src.chat.planner_actions.action_manager import ActionManager
self.chat_id = chat_id
# 注意ActionManager 可能需要根据实际情况初始化
self.action_manager = ActionManager()
async def generate(self, mode: ChatMode) -> Plan:
"""
收集所有信息,生成并返回一个初始的 Plan 对象。
这个 Plan 对象包含了决策所需的所有上下文信息。
Args:
mode (ChatMode): 当前的聊天模式。
Returns:
Plan: 一个填充了初始上下文信息的 Plan 对象。
"""
_is_group_chat, chat_target_info_dict = get_chat_type_and_target_info(self.chat_id)
target_info = None
if chat_target_info_dict:
target_info = TargetPersonInfo(**chat_target_info_dict)
available_actions = self._get_available_actions()
chat_history_raw = get_raw_msg_before_timestamp_with_chat(
chat_id=self.chat_id,
timestamp=time.time(),
limit=int(global_config.chat.max_context_size),
)
chat_history = [DatabaseMessages(**msg) for msg in chat_history_raw]
plan = Plan(
chat_id=self.chat_id,
mode=mode,
available_actions=available_actions,
chat_history=chat_history,
target_info=target_info,
)
return plan
def _get_available_actions(self) -> Dict[str, "ActionInfo"]:
"""
从 ActionManager 和组件注册表中获取当前所有可用的动作。
它会合并已注册的动作和系统级动作(如 "no_reply"
并以字典形式返回。
Returns:
Dict[str, "ActionInfo"]: 一个字典,键是动作名称,值是 ActionInfo 对象。
"""
current_available_actions_dict = self.action_manager.get_using_actions()
all_registered_actions: Dict[str, ActionInfo] = component_registry.get_components_by_type( # type: ignore
ComponentType.ACTION
)
current_available_actions = {}
for action_name in current_available_actions_dict:
if action_name in all_registered_actions:
current_available_actions[action_name] = all_registered_actions[action_name]
reply_info = ActionInfo(
name="reply",
component_type=ComponentType.ACTION,
description="系统级动作:选择回复消息的决策",
action_parameters={"content": "回复的文本内容", "reply_to_message_id": "要回复的消息ID"},
action_require=[
"你想要闲聊或者随便附和",
"当用户提到你或艾特你时",
"当需要回答用户的问题时",
"当你想参与对话时",
"当用户分享有趣的内容时",
],
activation_type=ActionActivationType.ALWAYS,
activation_keywords=[],
associated_types=["text", "reply"],
plugin_name="SYSTEM",
enabled=True,
parallel_action=False,
mode_enable=ChatMode.ALL,
chat_type_allow=ChatType.ALL,
)
no_reply_info = ActionInfo(
name="no_reply",
component_type=ComponentType.ACTION,
description="系统级动作:选择不回复消息的决策",
action_parameters={},
activation_keywords=[],
plugin_name="SYSTEM",
enabled=True,
parallel_action=False,
)
current_available_actions["no_reply"] = no_reply_info
current_available_actions["reply"] = reply_info
return current_available_actions

View File

@@ -1,260 +0,0 @@
"""
主规划器入口,负责协调 PlanGenerator, PlanFilter, 和 PlanExecutor。
集成兴趣度评分系统和用户关系追踪机制,实现智能化的聊天决策。
"""
from dataclasses import asdict
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from src.plugin_system.base.component_types import ChatMode
from src.chat.planner_actions.plan_executor import PlanExecutor
from src.chat.planner_actions.plan_filter import PlanFilter
from src.chat.planner_actions.plan_generator import PlanGenerator
from src.chat.affinity_flow.interest_scoring import InterestScoringSystem
from src.chat.affinity_flow.relationship_tracker import UserRelationshipTracker
from src.common.logger import get_logger
from src.config.config import global_config
if TYPE_CHECKING:
from src.chat.planner_actions.action_manager import ActionManager
from src.common.data_models.message_manager_data_model import StreamContext
from src.common.data_models.info_data_model import Plan
# 导入提示词模块以确保其被初始化
from src.chat.planner_actions import planner_prompts # noqa
logger = get_logger("planner")
class ActionPlanner:
"""
增强版ActionPlanner集成兴趣度评分和用户关系追踪机制。
核心功能:
1. 兴趣度评分系统:根据兴趣匹配度、关系分、提及度、时间因子对消息评分
2. 用户关系追踪:自动追踪用户交互并更新关系分
3. 智能回复决策:基于兴趣度阈值和连续不回复概率的智能决策
4. 完整的规划流程:生成→筛选→执行的完整三阶段流程
"""
def __init__(self, chat_id: str, action_manager: "ActionManager"):
"""
初始化增强版ActionPlanner。
Args:
chat_id (str): 当前聊天的 ID。
action_manager (ActionManager): 一个 ActionManager 实例。
"""
self.chat_id = chat_id
self.action_manager = action_manager
self.generator = PlanGenerator(chat_id)
self.filter = PlanFilter()
self.executor = PlanExecutor(action_manager)
# 初始化兴趣度评分系统
self.interest_scoring = InterestScoringSystem()
# 尝试获取全局关系追踪器,如果没有则创建新的
try:
from src.chat.affinity_flow.relationship_integration import get_relationship_tracker
global_relationship_tracker = get_relationship_tracker()
if global_relationship_tracker:
# 使用全局关系追踪器
self.relationship_tracker = global_relationship_tracker
# 设置兴趣度评分系统的关系追踪器引用
self.interest_scoring.relationship_tracker = self.relationship_tracker
logger.info("使用全局关系追踪器")
else:
# 创建新的关系追踪器
self.relationship_tracker = UserRelationshipTracker(self.interest_scoring)
logger.info("创建新的关系追踪器实例")
except Exception as e:
logger.warning(f"获取全局关系追踪器失败: {e}")
# 创建新的关系追踪器
self.relationship_tracker = UserRelationshipTracker(self.interest_scoring)
# 设置执行器的关系追踪器
self.executor.set_relationship_tracker(self.relationship_tracker)
# 规划器统计
self.planner_stats = {
"total_plans": 0,
"successful_plans": 0,
"failed_plans": 0,
"replies_generated": 0,
"other_actions_executed": 0,
}
async def plan(
self, mode: ChatMode = ChatMode.FOCUS, context: "StreamContext" = None
) -> Tuple[List[Dict], Optional[Dict]]:
"""
执行完整的增强版规划流程。
Args:
mode (ChatMode): 当前的聊天模式,默认为 FOCUS。
context (StreamContext): 包含聊天流消息的上下文对象。
Returns:
Tuple[List[Dict], Optional[Dict]]: 一个元组,包含:
- final_actions_dict (List[Dict]): 最终确定的动作列表(字典格式)。
- final_target_message_dict (Optional[Dict]): 最终的目标消息(字典格式)。
"""
try:
self.planner_stats["total_plans"] += 1
return await self._enhanced_plan_flow(mode, context)
except Exception as e:
logger.error(f"规划流程出错: {e}")
self.planner_stats["failed_plans"] += 1
return [], None
async def _enhanced_plan_flow(self, mode: ChatMode, context: "StreamContext") -> Tuple[List[Dict], Optional[Dict]]:
"""执行增强版规划流程"""
try:
# 1. 生成初始 Plan
initial_plan = await self.generator.generate(mode)
unread_messages = context.get_unread_messages() if context else []
# 2. 兴趣度评分 - 只对未读消息进行评分
if unread_messages:
bot_nickname = global_config.bot.nickname
interest_scores = await self.interest_scoring.calculate_interest_scores(unread_messages, bot_nickname)
# 3. 根据兴趣度调整可用动作
if interest_scores:
latest_score = max(interest_scores, key=lambda s: s.total_score)
latest_message = next(
(msg for msg in unread_messages if msg.message_id == latest_score.message_id), None
)
should_reply, score = self.interest_scoring.should_reply(latest_score, latest_message)
reply_not_available = False
if not should_reply and "reply" in initial_plan.available_actions:
logger.info(f"兴趣度不足 ({latest_score.total_score:.2f}),移除'回复'动作。")
reply_not_available = True
# base_threshold = self.interest_scoring.reply_threshold
# 检查兴趣度是否达到非回复动作阈值
non_reply_action_interest_threshold = global_config.affinity_flow.non_reply_action_interest_threshold
if score < non_reply_action_interest_threshold:
logger.info(
f"兴趣度 {score:.3f} 低于非回复动作阈值 {non_reply_action_interest_threshold:.3f},不执行任何动作。"
)
# 直接返回 no_action
from src.common.data_models.info_data_model import ActionPlannerInfo
no_action = ActionPlannerInfo(
action_type="no_action",
reasoning=f"兴趣度评分 {score:.3f} 未达阈值 {non_reply_action_interest_threshold:.3f}",
action_data={},
action_message=None,
)
filtered_plan = initial_plan
filtered_plan.decided_actions = [no_action]
else:
# 4. 筛选 Plan
filtered_plan = await self.filter.filter(reply_not_available, initial_plan)
# 检查filtered_plan是否有reply动作以便记录reply action
has_reply_action = False
for decision in filtered_plan.decided_actions:
if decision.action_type == "reply":
has_reply_action = True
self.interest_scoring.record_reply_action(has_reply_action)
# 5. 使用 PlanExecutor 执行 Plan
execution_result = await self.executor.execute(filtered_plan)
# 6. 根据执行结果更新统计信息
self._update_stats_from_execution_result(execution_result)
# 7. 检查关系更新
await self.relationship_tracker.check_and_update_relationships()
# 8. 返回结果
return self._build_return_result(filtered_plan)
except Exception as e:
logger.error(f"增强版规划流程出错: {e}")
self.planner_stats["failed_plans"] += 1
return [], None
def _update_stats_from_execution_result(self, execution_result: Dict[str, any]):
"""根据执行结果更新规划器统计"""
if not execution_result:
return
successful_count = execution_result.get("successful_count", 0)
# 更新成功执行计数
self.planner_stats["successful_plans"] += successful_count
# 统计回复动作和其他动作
reply_count = 0
other_count = 0
for result in execution_result.get("results", []):
action_type = result.get("action_type", "")
if action_type in ["reply", "proactive_reply"]:
reply_count += 1
else:
other_count += 1
self.planner_stats["replies_generated"] += reply_count
self.planner_stats["other_actions_executed"] += other_count
def _build_return_result(self, plan: "Plan") -> Tuple[List[Dict], Optional[Dict]]:
"""构建返回结果"""
final_actions = plan.decided_actions or []
final_target_message = next((act.action_message for act in final_actions if act.action_message), None)
final_actions_dict = [asdict(act) for act in final_actions]
if final_target_message:
if hasattr(final_target_message, "__dataclass_fields__"):
final_target_message_dict = asdict(final_target_message)
else:
final_target_message_dict = final_target_message
else:
final_target_message_dict = None
return final_actions_dict, final_target_message_dict
def get_user_relationship(self, user_id: str) -> float:
"""获取用户关系分"""
return self.interest_scoring.get_user_relationship(user_id)
def update_interest_keywords(self, new_keywords: Dict[str, List[str]]):
"""更新兴趣关键词(已弃用,仅保留用于兼容性)"""
logger.info("传统关键词匹配已移除,此方法仅保留用于兼容性")
# 此方法已弃用因为现在完全使用embedding匹配
def get_planner_stats(self) -> Dict[str, any]:
"""获取规划器统计"""
return self.planner_stats.copy()
def get_interest_scoring_stats(self) -> Dict[str, any]:
"""获取兴趣度评分统计"""
return {
"no_reply_count": self.interest_scoring.no_reply_count,
"max_no_reply_count": self.interest_scoring.max_no_reply_count,
"reply_threshold": self.interest_scoring.reply_threshold,
"mention_threshold": self.interest_scoring.mention_threshold,
"user_relationships": len(self.interest_scoring.user_relationships),
}
def get_relationship_stats(self) -> Dict[str, any]:
"""获取用户关系统计"""
return {
"tracking_users": len(self.relationship_tracker.tracking_users),
"relationship_history": len(self.relationship_tracker.relationship_history),
"max_tracking_users": self.relationship_tracker.max_tracking_users,
}
# 全局兴趣度评分系统实例 - 在 individuality 模块中创建

View File

@@ -1,175 +0,0 @@
"""
本文件集中管理所有与规划器Planner相关的提示词Prompt模板。
通过将提示词与代码逻辑分离,可以更方便地对模型的行为进行迭代和优化,
而无需修改核心代码。
"""
from src.chat.utils.prompt import Prompt
def init_prompts():
"""
初始化并向 Prompt 注册系统注册所有规划器相关的提示词。
这个函数会在模块加载时自动调用,确保所有提示词在系统启动时都已准备就绪。
"""
# 核心规划器提示词,用于在接收到新消息时决定如何回应。
# 它构建了一个复杂的上下文,包括历史记录、可用动作、角色设定等,
# 并要求模型以 JSON 格式输出一个或多个动作组合。
Prompt(
"""
{mood_block}
{time_block}
{identity_block}
{users_in_chat}
{custom_prompt_block}
{chat_context_description},以下是具体的聊天内容。
## 📜 已读历史消息(仅供参考)
{read_history_block}
## 📬 未读历史消息(动作执行对象)
{unread_history_block}
{moderation_prompt}
**任务: 构建一个完整的响应**
你的任务是根据当前的聊天内容,构建一个完整的、人性化的响应。一个完整的响应由两部分组成:
1. **主要动作**: 这是响应的核心,通常是 `reply`(如果有)。
2. **辅助动作 (可选)**: 这是为了增强表达效果的附加动作,例如 `emoji`(发送表情包)或 `poke_user`(戳一戳)。
**决策流程:**
1. **重要:已读历史消息仅作为当前聊天情景的参考,帮助你理解对话上下文。**
2. **重要:所有动作的执行对象只能是未读历史消息中的消息,不能对已读消息执行动作。**
3. 在未读历史消息中,优先对兴趣值高的消息做出动作(兴趣值标注在消息末尾)。
4. 首先,决定是否要对未读消息进行 `reply`(如果有)。
5. 然后,评估当前的对话气氛和用户情绪,判断是否需要一个**辅助动作**来让你的回应更生动、更符合你的性格。
6. 如果需要,选择一个最合适的辅助动作与 `reply`(如果有) 组合。
7. 如果用户明确要求了某个动作,请务必优先满足。
**如果可选动作中没有reply请不要使用**
**可用动作:**
{actions_before_now_block}
{no_action_block}
{action_options_text}
**输出格式:**
你必须以严格的 JSON 格式输出返回一个包含所有选定动作的JSON列表。如果没有任何合适的动作返回一个空列表[]。
**单动作示例 (仅回复):**
[
{{
"action": "reply",
"target_message_id": "m123",
"reason": "回答用户的问题"
}}
]
**组合动作示例 (回复 + 表情包):**
[
{{
"action": "reply",
"target_message_id": "m123",
"reason": "回答用户的问题"
}},
{{
"action": "emoji",
"target_message_id": "m123",
"reason": "根据我将要回复的文本内容,选择一个最匹配的表情包来增强表达效果。回复的文本是:<TEXT>"
}}
]
**重要规则:**
1. 当 `reply` 和 `emoji` 动作同时被选择时,`emoji` 动作的 `reason` 字段必须包含 `reply` 动作最终生成的回复文本内容。你需要将 `<TEXT>` 占位符替换为 `reply` 动作的 `reason` 字段内容,以确保表情包的选择与回复文本高度相关。
2. **动作执行限制所有动作的target_message_id必须是未读历史消息中的消息ID(消息ID格式:m123)。**
3. **兴趣度优先:在多个未读消息中,优先选择兴趣值高的消息进行回复。**
不要输出markdown格式```json等内容直接输出且仅包含 JSON 列表内容:
""",
"planner_prompt",
)
# 主动思考规划器提示词,用于在没有新消息时决定是否要主动发起对话。
# 它模拟了人类的自发性思考,允许模型根据长期记忆和最近的对话来决定是否开启新话题。
Prompt(
"""
# 主动思考决策
## 你的内部状态
{time_block}
{identity_block}
{mood_block}
## 长期记忆摘要
{long_term_memory_block}
## 最近的聊天内容
{chat_content_block}
## 最近的动作历史
{actions_before_now_block}
## 任务
你现在要决定是否主动说些什么。就像一个真实的人一样,有时候会突然想起之前聊到的话题,或者对朋友的近况感到好奇,想主动询问或关心一下。
**重要提示**:你的日程安排仅供你个人参考,不应作为主动聊天话题的主要来源。请更多地从聊天内容和朋友的动态中寻找灵感。
请基于聊天内容,用你的判断力来决定是否要主动发言。不要按照固定规则,而是像人类一样自然地思考:
- 是否想起了什么之前提到的事情,想问问后来怎么样了?
- 是否注意到朋友提到了什么值得关心的事情?
- 是否有什么话题突然想到,觉得现在聊聊很合适?
- 或者觉得现在保持沉默更好?
## 可用动作
动作proactive_reply
动作描述:主动发起对话,可以是关心朋友、询问近况、延续之前的话题,或分享想法。
- 当你突然想起之前的话题,想询问进展时
- 当你想关心朋友的情况时
- 当你有什么想法想分享时
- 当你觉得现在是个合适的聊天时机时
{{
"action": "proactive_reply",
"reason": "你决定主动发言的具体原因",
"topic": "你想说的内容主题(简洁描述)"
}}
动作do_nothing
动作描述:保持沉默,不主动发起对话。
- 当你觉得现在不是合适的时机时
- 当最近已经说得够多了时
- 当对话氛围不适合插入时
{{
"action": "do_nothing",
"reason": "决定保持沉默的原因"
}}
你必须从上面列出的可用action中选择一个。要像真人一样自然地思考和决策。
请以严格的 JSON 格式输出,且仅包含 JSON 内容:
""",
"proactive_planner_prompt",
)
# 单个动作的格式化提示词模板。
# 用于将每个可用动作的信息格式化后,插入到主提示词的 {action_options_text} 占位符中。
Prompt(
"""
动作:{action_name}
动作描述:{action_description}
{action_require}
{{
"action": "{action_name}",
"target_message_id": "触发action的消息id",
"reason": "触发action的原因"{action_parameters}
}}
""",
"action_prompt",
)
# 在模块加载时自动调用,完成提示词的注册。
init_prompts()