refactor(planner): 重构动作规划器为模块化结构

将原有的 `ActionPlanner` 类拆分为三个独立的模块:`PlanGenerator`、`PlanFilter` 和 `PlanExecutor`。`ActionPlanner` 现在作为协调器,按顺序调用这三个模块,使规划流程更加清晰和模块化。

- **PlanGenerator**: 负责根据聊天模式和上下文生成初始规划。
- **PlanFilter**: 负责审查和筛选由生成器产生的动作。
- **PlanExecutor**: 负责执行最终确定的动作。

此重构简化了 `cycle_processor` 中的调用逻辑,并为未来的功能扩展(如更复杂的过滤规则)提供了更好的基础。同时,引入了新的 `Plan` 数据模型来统一规划过程中的数据传递。
This commit is contained in:
minecraft1024a
2025-09-13 11:48:04 +08:00
committed by Windpicker-owo
parent c6634b5e99
commit 60e22f66e7
8 changed files with 677 additions and 922 deletions

View File

@@ -1,695 +1,57 @@
import orjson
import time
import traceback
import random
from typing import Dict, Any, Optional, Tuple, List, TYPE_CHECKING
from rich.traceback import install
from datetime import datetime
from json_repair import repair_json
"""
主规划器入口,负责协调 PlanGenerator, PlanFilter, 和 PlanExecutor。
"""
from dataclasses import asdict
from typing import Dict, List, Optional, Tuple
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config, model_config
from src.common.logger import get_logger
from src.chat.utils.prompt import Prompt, global_prompt_manager
from src.chat.utils.chat_message_builder import (
build_readable_actions,
get_actions_by_timestamp_with_chat,
build_readable_messages_with_id,
get_raw_msg_before_timestamp_with_chat,
)
from src.chat.utils.utils import get_chat_type_and_target_info
from src.chat.planner_actions.action_manager import ActionManager
from src.chat.message_receive.chat_stream import get_chat_manager
from src.plugin_system.base.component_types import (
ActionInfo,
ChatMode,
ComponentType,
)
from src.plugin_system.core.component_registry import component_registry
from src.schedule.schedule_manager import schedule_manager
from src.mood.mood_manager import mood_manager
from src.chat.memory_system.Hippocampus import hippocampus_manager
from src.chat.planner_actions.plan_executor import PlanExecutor
from src.chat.planner_actions.plan_filter import PlanFilter
from src.chat.planner_actions.plan_generator import PlanGenerator
from src.common.data_models.info_data_model import ActionPlannerInfo
from src.common.logger import get_logger
from src.plugin_system.base.component_types import ChatMode
if TYPE_CHECKING:
pass
# 导入提示词模块以确保其被初始化
from . import planner_prompts
logger = get_logger("planner")
install(extra_lines=3)
def init_prompt():
Prompt(
"""
{schedule_block}
{mood_block}
{time_block}
{identity_block}
{users_in_chat}
{custom_prompt_block}
{chat_context_description},以下是具体的聊天内容。
{chat_content_block}
{moderation_prompt}
**任务: 构建一个完整的响应**
你的任务是根据当前的聊天内容,构建一个完整的、人性化的响应。一个完整的响应由两部分组成:
1. **主要动作**: 这是响应的核心,通常是 `reply`(文本回复)。
2. **辅助动作 (可选)**: 这是为了增强表达效果的附加动作,例如 `emoji`(发送表情包)或 `poke_user`(戳一戳)。
**决策流程:**
1. 首先,决定是否要进行 `reply`。
2. 然后,评估当前的对话气氛和用户情绪,判断是否需要一个**辅助动作**来让你的回应更生动、更符合你的性格。
3. 如果需要,选择一个最合适的辅助动作与 `reply` 组合。
4. 如果用户明确要求了某个动作,请务必优先满足。
**可用动作:**
{actions_before_now_block}
{no_action_block}
动作reply
动作描述:参与聊天回复,发送文本进行表达
- 你想要闲聊或者随便附和
- {mentioned_bonus}
- 如果你刚刚进行了回复,不要对同一个话题重复回应
- 不要回复自己发送的消息
{{
"action": "reply",
"target_message_id": "触发action的消息id",
"reason": "回复的原因"
}}
{action_options_text}
**输出格式:**
你必须以严格的 JSON 格式输出返回一个包含所有选定动作的JSON列表。如果没有任何合适的动作返回一个空列表[]。
**单动作示例 (仅回复):**
[
{{
"action": "reply",
"target_message_id": "m123",
"reason": "回答用户的问题"
}}
]
**组合动作示例 (回复 + 表情包):**
[
{{
"action": "reply",
"target_message_id": "m123",
"reason": "回答用户的问题"
}},
{{
"action": "emoji",
"target_message_id": "m123",
"reason": "用一个可爱的表情来缓和气氛"
}}
]
不要输出markdown格式```json等内容直接输出且仅包含 JSON 列表内容:
""",
"planner_prompt",
)
Prompt(
"""
# 主动思考决策
## 你的内部状态
{time_block}
{identity_block}
{schedule_block}
{mood_block}
## 长期记忆摘要
{long_term_memory_block}
## 最近的聊天内容
{chat_content_block}
## 最近的动作历史
{actions_before_now_block}
## 任务
你现在要决定是否主动说些什么。就像一个真实的人一样,有时候会突然想起之前聊到的话题,或者对朋友的近况感到好奇,想主动询问或关心一下。
请基于聊天内容,用你的判断力来决定是否要主动发言。不要按照固定规则,而是像人类一样自然地思考:
- 是否想起了什么之前提到的事情,想问问后来怎么样了?
- 是否注意到朋友提到了什么值得关心的事情?
- 是否有什么话题突然想到,觉得现在聊聊很合适?
- 或者觉得现在保持沉默更好?
## 可用动作
动作proactive_reply
动作描述:主动发起对话,可以是关心朋友、询问近况、延续之前的话题,或分享想法。
- 当你突然想起之前的话题,想询问进展时
- 当你想关心朋友的情况时
- 当你有什么想法想分享时
- 当你觉得现在是个合适的聊天时机时
{{
"action": "proactive_reply",
"reason": "你决定主动发言的具体原因",
"topic": "你想说的内容主题(简洁描述)"
}}
动作do_nothing
动作描述:保持沉默,不主动发起对话。
- 当你觉得现在不是合适的时机时
- 当最近已经说得够多了时
- 当对话氛围不适合插入时
{{
"action": "do_nothing",
"reason": "决定保持沉默的原因"
}}
你必须从上面列出的可用action中选择一个。要像真人一样自然地思考和决策。
请以严格的 JSON 格式输出,且仅包含 JSON 内容:
""",
"proactive_planner_prompt",
)
Prompt(
"""
动作:{action_name}
动作描述:{action_description}
{action_require}
{{
"action": "{action_name}",
"target_message_id": "触发action的消息id",
"reason": "触发action的原因"{action_parameters}
}}
""",
"action_prompt",
)
class ActionPlanner:
"""
协调器,按顺序调用 Generator -> Filter -> Executor。
"""
def __init__(self, chat_id: str, action_manager: ActionManager):
self.chat_id = chat_id
self.log_prefix = f"[{get_chat_manager().get_stream_name(chat_id) or chat_id}]"
self.action_manager = action_manager
# LLM规划器配置
# --- 大脑 ---
self.planner_llm = LLMRequest(
model_set=model_config.model_task_config.planner, request_type="planner"
)
self.last_obs_time_mark = 0.0
async def _get_long_term_memory_context(self) -> str:
"""
获取长期记忆上下文
"""
try:
# 1. 生成时间相关的关键词
now = datetime.now()
keywords = ["今天", "日程", "计划"]
if 5 <= now.hour < 12:
keywords.append("早上")
elif 12 <= now.hour < 18:
keywords.append("中午")
else:
keywords.append("晚上")
# TODO: 添加与聊天对象相关的关键词
# 2. 调用 hippocampus_manager 检索记忆
retrieved_memories = await hippocampus_manager.get_memory_from_topic(
valid_keywords=keywords, max_memory_num=5, max_memory_length=1
)
if not retrieved_memories:
return "最近没有什么特别的记忆。"
# 3. 格式化记忆
memory_statements = []
for topic, memory_item in retrieved_memories:
memory_statements.append(f"关于'{topic}', 你记得'{memory_item}'")
return " ".join(memory_statements)
except Exception as e:
logger.error(f"获取长期记忆时出错: {e}")
return "回忆时出现了一些问题。"
async def _build_action_options(
self,
current_available_actions: Dict[str, ActionInfo],
mode: ChatMode,
target_prompt: str = "",
) -> str:
"""
构建动作选项
"""
action_options_block = ""
for action_name, action_info in current_available_actions.items():
# TODO: 增加一个字段来判断action是否支持在PROACTIVE模式下使用
param_text = ""
if action_info.action_parameters:
param_text = "\n" + "\n".join(
f' "{p_name}":"{p_desc}"' for p_name, p_desc in action_info.action_parameters.items()
)
require_text = "\n".join(f"- {req}" for req in action_info.action_require)
using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt")
action_options_block += using_action_prompt.format(
action_name=action_name,
action_description=action_info.description,
action_parameters=param_text,
action_require=require_text,
)
return action_options_block
def find_message_by_id(self, message_id: str, message_id_list: list) -> Optional[Dict[str, Any]]:
# sourcery skip: use-next
"""
根据message_id从message_id_list中查找对应的原始消息
Args:
message_id: 要查找的消息ID
message_id_list: 消息ID列表格式为[{'id': str, 'message': dict}, ...]
Returns:
找到的原始消息字典如果未找到则返回None
"""
# 检测message_id 是否为纯数字
if message_id.isdigit():
message_id = f"m{message_id}"
for item in message_id_list:
if item.get("id") == message_id:
return item.get("message")
return None
def get_latest_message(self, message_id_list: list) -> Optional[Dict[str, Any]]:
"""
获取消息列表中的最新消息
Args:
message_id_list: 消息ID列表格式为[{'id': str, 'message': dict}, ...]
Returns:
最新的消息字典如果列表为空则返回None
"""
if not message_id_list:
return None
# 假设消息列表是按时间顺序排列的,最后一个是最新的
return message_id_list[-1].get("message")
async def _parse_single_action(
self,
action_json: dict,
message_id_list: list, # 使用 planner.py 的 list of dict
current_available_actions: list, # 使用 planner.py 的 list of tuple
) -> List[Dict[str, Any]]:
"""
[注释] 解析单个LLM返回的action JSON并将其转换为标准化的字典。
"""
parsed_actions = []
try:
action = action_json.get("action", "no_action")
reasoning = action_json.get("reason", "未提供原因")
action_data = {k: v for k, v in action_json.items() if k not in ["action", "reason"]}
target_message = None
if action not in ["no_action", "no_reply", "do_nothing", "proactive_reply"]:
if target_message_id := action_json.get("target_message_id"):
target_message = self.find_message_by_id(target_message_id, message_id_list)
if target_message is None:
logger.warning(f"{self.log_prefix}无法找到target_message_id '{target_message_id}'")
target_message = self.get_latest_message(message_id_list)
else:
logger.warning(f"{self.log_prefix}动作'{action}'缺少target_message_id")
available_action_names = [name for name, _ in current_available_actions]
if action not in ["no_action", "no_reply", "reply", "do_nothing", "proactive_reply"] and action not in available_action_names:
logger.warning(
f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{action}' (可用: {available_action_names}),将强制使用 'no_action'"
)
reasoning = f"LLM 返回了当前不可用的动作 '{action}' (可用: {available_action_names})。原始理由: {reasoning}"
action = "no_action"
# 将列表转换为字典格式以供将来使用
available_actions_dict = dict(current_available_actions)
parsed_actions.append(
{
"action_type": action,
"reasoning": reasoning,
"action_data": action_data,
"action_message": target_message,
"available_actions": available_actions_dict,
}
)
# 如果是at_user动作且只有user_name尝试转换为user_id
if action == "at_user" and "user_name" in action_data and "user_id" not in action_data:
user_name = action_data["user_name"]
from src.person_info.person_info import get_person_info_manager
user_info = await get_person_info_manager().get_person_info_by_name(user_name)
if user_info and user_info.get("user_id"):
action_data["user_id"] = user_info["user_id"]
logger.info(f"成功将用户名 '{user_name}' 解析为 user_id '{user_info['user_id']}'")
else:
logger.warning(f"无法将用户名 '{user_name}' 解析为 user_id")
except Exception as e:
logger.error(f"{self.log_prefix}解析单个action时出错: {e}")
parsed_actions.append(
{
"action_type": "no_action",
"reasoning": f"解析action时出错: {e}",
"action_data": {},
"action_message": None,
"available_actions": dict(current_available_actions),
}
)
return parsed_actions
def _filter_no_actions(self, action_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
[注释] 从一个action字典列表中过滤掉所有的 'no_action'
如果过滤后列表为空, 则返回一个空的列表, 或者根据需要返回一个默认的no_action字典。
"""
non_no_actions = [a for a in action_list if a.get("action_type") not in ["no_action", "no_reply"]]
if non_no_actions:
return non_no_actions
# 如果都是 no_action则返回一个包含第一个 no_action 的列表,以保留 reason
return action_list[:1] if action_list else []
self.generator = PlanGenerator(chat_id)
self.filter = PlanFilter()
self.executor = PlanExecutor(action_manager)
async def plan(
self,
mode: ChatMode = ChatMode.FOCUS,
loop_start_time: float = 0.0,
available_actions: Optional[Dict[str, ActionInfo]] = None,
pseudo_message: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
self, mode: ChatMode = ChatMode.FOCUS
) -> Tuple[List[Dict], Optional[Dict]]:
"""
[注释] "大脑"规划器
统一决策是否进行聊天回复(reply)以及执行哪些actions。
执行完整的规划流程
"""
# --- 1. 准备上下文信息 ---
is_group_chat, chat_target_info, current_available_actions = self.get_necessary_info()
if available_actions is None:
available_actions = current_available_actions
# 1. 生成初始 Plan
initial_plan = await self.generator.generate(mode)
# --- 2. 大脑统一决策 ---
final_actions: List[Dict[str, Any]] = []
try:
prompt, used_message_id_list = await self.build_planner_prompt(
is_group_chat=is_group_chat,
chat_target_info=chat_target_info,
current_available_actions=available_actions,
mode=mode,
)
llm_content, _ = await self.planner_llm.generate_response_async(prompt=prompt)
# 2. 筛选 Plan
filtered_plan = await self.filter.filter(initial_plan)
if llm_content:
parsed_json = orjson.loads(repair_json(llm_content))
# 确保处理的是列表
if isinstance(parsed_json, dict):
parsed_json = [parsed_json]
# 3. 执行 Plan
await self.executor.execute(filtered_plan)
if isinstance(parsed_json, list):
for item in parsed_json:
if isinstance(item, dict):
final_actions.extend(await self._parse_single_action(item, used_message_id_list, list(available_actions.items())))
# 如果是私聊且开启了强制回复并且没有任何回复性action则强制添加reply
if not is_group_chat and global_config.chat.force_reply_private:
has_reply_action = any(a.get("action_type") == "reply" for a in final_actions)
if not has_reply_action:
final_actions.append({
"action_type": "reply",
"reasoning": "私聊强制回复",
"action_data": {},
"action_message": self.get_latest_message(used_message_id_list),
"available_actions": available_actions,
})
logger.info(f"{self.log_prefix}私聊强制回复已触发,添加 'reply' 动作")
logger.info(f"{self.log_prefix}大脑决策: {[a.get('action_type') for a in final_actions]}")
except Exception as e:
logger.error(f"{self.log_prefix}大脑处理过程中发生意外错误: {e}\n{traceback.format_exc()}")
final_actions.append({"action_type": "no_action", "reasoning": f"大脑处理错误: {e}"})
# --- 3. 后处理 ---
final_actions = self._filter_no_actions(final_actions)
# === 概率模式后处理:根据配置决定是否强制添加 emoji 动作 ===
if global_config.emoji.emoji_activate_type == 'random':
has_reply_action = any(a.get("action_type") == "reply" for a in final_actions)
if has_reply_action:
# 检查此动作是否已被选择
is_already_chosen = any(a.get("action_type") == 'emoji' for a in final_actions)
if not is_already_chosen:
if random.random() < global_config.emoji.emoji_chance:
logger.info(f"{self.log_prefix}根据概率 '{global_config.emoji.emoji_chance}' 添加 emoji 动作")
final_actions.append({
"action_type": 'emoji',
"reasoning": f"根据概率 {global_config.emoji.emoji_chance} 自动添加",
"action_data": {},
"action_message": self.get_latest_message(used_message_id_list),
"available_actions": available_actions,
})
if not final_actions:
final_actions = [
{
"action_type": "no_action",
"reasoning": "规划器选择不执行动作",
"action_data": {}, "action_message": None, "available_actions": available_actions
}
]
final_target_message = next((act.get("action_message") for act in final_actions if act.get("action_message")), None)
# 记录每个动作的原因
for action_info in final_actions:
action_type = action_info.get("action_type", "N/A")
reasoning = action_info.get("reasoning", "")
logger.info(f"{self.log_prefix}决策: [{action_type}],原因: {reasoning}")
actions_str = ", ".join([a.get('action_type', 'N/A') for a in final_actions])
logger.info(f"{self.log_prefix}最终执行动作 ({len(final_actions)}): [{actions_str}]")
# 4. 返回结果 (与旧版 planner 的返回值保持兼容)
final_actions = filtered_plan.decided_actions or []
final_target_message = next(
(act.action_message for act in final_actions if act.action_message), None
)
return final_actions, final_target_message
final_actions_dict = [asdict(act) for act in final_actions]
final_target_message_dict = asdict(final_target_message) if final_target_message else None
async def build_planner_prompt(
self,
is_group_chat: bool,
chat_target_info: Optional[dict],
current_available_actions: Dict[str, ActionInfo],
mode: ChatMode = ChatMode.FOCUS,
refresh_time: bool = False, # 添加缺失的参数
) -> tuple[str, list]:
"""构建 Planner LLM 的提示词 (获取模板并填充数据)"""
try:
# --- 通用信息获取 ---
time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
bot_name = global_config.bot.nickname
bot_nickname = (
f",也有人叫你{','.join(global_config.bot.alias_names)}" if global_config.bot.alias_names else ""
)
bot_core_personality = global_config.personality.personality_core
identity_block = f"你的名字是{bot_name}{bot_nickname},你{bot_core_personality}"
schedule_block = ""
if global_config.planning_system.schedule_enable:
if current_activity := schedule_manager.get_current_activity():
schedule_block = f"你当前正在:{current_activity},但注意它与群聊的聊天无关。"
mood_block = ""
if global_config.mood.enable_mood:
chat_mood = mood_manager.get_mood_by_chat_id(self.chat_id)
mood_block = f"你现在的心情是:{chat_mood.mood_state}"
# --- 根据模式构建不同的Prompt ---
if mode == ChatMode.PROACTIVE:
long_term_memory_block = await self._get_long_term_memory_context()
# 获取最近的聊天记录用于主动思考决策
message_list_short = get_raw_msg_before_timestamp_with_chat(
chat_id=self.chat_id,
timestamp=time.time(),
limit=int(global_config.chat.max_context_size * 0.2), # 主动思考时只看少量最近消息
)
chat_content_block, message_id_list = build_readable_messages_with_id(
messages=message_list_short,
timestamp_mode="normal",
truncate=False,
show_actions=False,
)
prompt_template = await global_prompt_manager.get_prompt_async("proactive_planner_prompt")
actions_before_now = get_actions_by_timestamp_with_chat(
chat_id=self.chat_id,
timestamp_start=time.time() - 3600,
timestamp_end=time.time(),
limit=5,
)
actions_before_now_block = build_readable_actions(actions=actions_before_now)
actions_before_now_block = f"你刚刚选择并执行过的action是\n{actions_before_now_block}"
prompt = prompt_template.format(
time_block=time_block,
identity_block=identity_block,
schedule_block=schedule_block,
mood_block=mood_block,
long_term_memory_block=long_term_memory_block,
chat_content_block=chat_content_block or "最近没有聊天内容。",
actions_before_now_block=actions_before_now_block,
)
return prompt, message_id_list
# --- FOCUS 和 NORMAL 模式的逻辑 ---
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
chat_id=self.chat_id,
timestamp=time.time(),
limit=int(global_config.chat.max_context_size * 0.6),
)
chat_content_block, message_id_list = build_readable_messages_with_id(
messages=message_list_before_now,
timestamp_mode="normal",
read_mark=self.last_obs_time_mark,
truncate=True,
show_actions=True,
)
actions_before_now = get_actions_by_timestamp_with_chat(
chat_id=self.chat_id,
timestamp_start=time.time() - 3600,
timestamp_end=time.time(),
limit=5,
)
actions_before_now_block = build_readable_actions(actions=actions_before_now)
actions_before_now_block = f"你刚刚选择并执行过的action是\n{actions_before_now_block}"
if refresh_time:
self.last_obs_time_mark = time.time()
mentioned_bonus = ""
if global_config.chat.mentioned_bot_inevitable_reply:
mentioned_bonus = "\n- 有人提到你"
if global_config.chat.at_bot_inevitable_reply:
mentioned_bonus = "\n- 有人提到你或者at你"
if mode == ChatMode.FOCUS:
no_action_block = """
动作no_action
动作描述:不选择任何动作
{{
"action": "no_action",
"reason":"不动作的原因"
}}
动作no_reply
动作描述:不进行回复,等待合适的回复时机
- 当你刚刚发送了消息没有人回复时选择no_reply
- 当你一次发送了太多消息为了避免打扰聊天节奏选择no_reply
{{
"action": "no_reply",
"reason":"不回复的原因"
}}
"""
else: # NORMAL Mode
no_action_block = """重要说明:
- 'reply' 表示只进行普通聊天回复,不执行任何额外动作
- 其他action表示在普通回复的基础上执行相应的额外动作
{{
"action": "reply",
"target_message_id":"触发action的消息id",
"reason":"回复的原因"
}}"""
chat_context_description = "你现在正在一个群聊中"
chat_target_name = None
if not is_group_chat and chat_target_info:
chat_target_name = (
chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or "对方"
)
chat_context_description = f"你正在和 {chat_target_name} 私聊"
action_options_block = await self._build_action_options(current_available_actions, mode)
moderation_prompt_block = "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。"
custom_prompt_block = ""
if global_config.custom_prompt.planner_custom_prompt_content:
custom_prompt_block = global_config.custom_prompt.planner_custom_prompt_content
from src.person_info.person_info import get_person_info_manager
users_in_chat_str = ""
if is_group_chat and chat_target_info and chat_target_info.get("group_id"):
user_list = await get_person_info_manager().get_specific_value_list("person_name", lambda x: x is not None)
if user_list:
users_in_chat_str = "当前聊天中的用户列表(用于@\n" + "\n".join([f"- {name} (ID: {pid})" for pid, name in user_list.items()]) + "\n"
planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt")
prompt = planner_prompt_template.format(
schedule_block=schedule_block,
mood_block=mood_block,
time_block=time_block,
chat_context_description=chat_context_description,
chat_content_block=chat_content_block,
actions_before_now_block=actions_before_now_block,
mentioned_bonus=mentioned_bonus,
no_action_block=no_action_block,
mentioned_bonus=mentioned_bonus,
action_options_text=action_options_block,
moderation_prompt=moderation_prompt_block,
identity_block=identity_block,
custom_prompt_block=custom_prompt_block,
bot_name=bot_name,
users_in_chat=users_in_chat_str
)
return prompt, message_id_list
except Exception as e:
logger.error(f"构建 Planner 提示词时出错: {e}")
logger.error(traceback.format_exc())
return "构建 Planner Prompt 时出错", []
def get_necessary_info(self) -> Tuple[bool, Optional[dict], Dict[str, ActionInfo]]:
"""
获取 Planner 需要的必要信息
"""
is_group_chat = True
is_group_chat, chat_target_info = get_chat_type_and_target_info(self.chat_id)
logger.debug(f"{self.log_prefix}获取到聊天信息 - 群聊: {is_group_chat}, 目标信息: {chat_target_info}")
current_available_actions_dict = self.action_manager.get_using_actions()
# 获取完整的动作信息
all_registered_actions: Dict[str, ActionInfo] = component_registry.get_components_by_type( # type: ignore
ComponentType.ACTION
)
current_available_actions = {}
for action_name in current_available_actions_dict:
if action_name in all_registered_actions:
current_available_actions[action_name] = all_registered_actions[action_name]
else:
logger.warning(f"{self.log_prefix}使用中的动作 {action_name} 未在已注册动作中找到")
# 将no_reply作为系统级特殊动作添加到可用动作中
# no_reply虽然是系统级决策但需要让规划器认为它是可用的
no_reply_info = ActionInfo(
name="no_reply",
component_type=ComponentType.ACTION,
description="系统级动作:选择不回复消息的决策",
action_parameters={},
activation_keywords=[],
plugin_name="SYSTEM",
enabled=True, # 始终启用
parallel_action=False,
)
current_available_actions["no_reply"] = no_reply_info
return is_group_chat, chat_target_info, current_available_actions
init_prompt()
return final_actions_dict, final_target_message_dict