feat: 为normal增加action_plan能力

This commit is contained in:
tcmofashi
2025-06-02 21:06:00 +08:00
parent 13c3a40ca5
commit d932b48444
11 changed files with 867 additions and 21 deletions

View File

@@ -135,11 +135,14 @@ class PluginAction(BaseAction):
# 获取锚定消息(如果有)
observations = self._services.get("observations", [])
chatting_observation: ChattingObservation = next(
obs for obs in observations if isinstance(obs, ChattingObservation)
)
if len(observations) > 0:
chatting_observation: ChattingObservation = next(
obs for obs in observations if isinstance(obs, ChattingObservation)
)
anchor_message = chatting_observation.search_message_by_text(target)
anchor_message = chatting_observation.search_message_by_text(target)
else:
anchor_message = None
# 如果没有找到锚点消息,创建一个占位符
if not anchor_message:

View File

@@ -1,6 +1,6 @@
from src.chat.heart_flow.sub_heartflow import SubHeartflow, ChatState
from src.common.logger_manager import get_logger
from typing import Any, Optional, List
from typing import Any, Optional, List, Dict
from src.chat.heart_flow.subheartflow_manager import SubHeartflowManager
from src.chat.heart_flow.background_tasks import BackgroundTaskManager # Import BackgroundTaskManager

View File

@@ -330,6 +330,27 @@ class SubHeartflow:
oldest_key = next(iter(self.interest_dict))
self.interest_dict.pop(oldest_key)
def get_normal_chat_action_manager(self):
"""获取NormalChat的ActionManager实例
Returns:
ActionManager: NormalChat的ActionManager实例如果不存在则返回None
"""
if self.normal_chat_instance:
return self.normal_chat_instance.get_action_manager()
return None
def set_normal_chat_planner_enabled(self, enabled: bool):
"""设置NormalChat的planner是否启用
Args:
enabled: 是否启用planner
"""
if self.normal_chat_instance:
self.normal_chat_instance.set_planner_enabled(enabled)
else:
logger.warning(f"{self.log_prefix} NormalChat实例不存在无法设置planner状态")
async def get_full_state(self) -> dict:
"""获取子心流的完整状态,包括兴趣、思维和聊天状态。"""
return {

View File

@@ -20,6 +20,10 @@ from src.chat.emoji_system.emoji_manager import emoji_manager
from src.chat.normal_chat.willing.willing_manager import willing_manager
from src.chat.normal_chat.normal_chat_utils import get_recent_message_stats
from src.config.config import global_config
from src.chat.focus_chat.planners.action_manager import ActionManager
from src.chat.normal_chat.normal_chat_planner import NormalChatPlanner
from src.chat.normal_chat.normal_chat_action_modifier import NormalChatActionModifier
from src.chat.normal_chat.normal_chat_expressor import NormalChatExpressor
logger = get_logger("normal_chat")
@@ -48,6 +52,12 @@ class NormalChat:
self._chat_task: Optional[asyncio.Task] = None
self._initialized = False # Track initialization status
# Planner相关初始化
self.action_manager = ActionManager()
self.planner = NormalChatPlanner(self.stream_name, self.action_manager)
self.action_modifier = NormalChatActionModifier(self.action_manager, self.stream_id, self.stream_name)
self.enable_planner = global_config.normal_chat.enable_planner # 从配置中读取是否启用planner
# 记录最近的回复内容,每项包含: {time, user_message, response, is_mentioned, is_reference_reply}
self.recent_replies = []
self.max_replies_history = 20 # 最多保存最近20条回复记录
@@ -64,6 +74,10 @@ class NormalChat:
self.is_group_chat, self.chat_target_info = await get_chat_type_and_target_info(self.stream_id)
self.stream_name = chat_manager.get_stream_name(self.stream_id) or self.stream_id
# 初始化Normal Chat专用表达器
self.expressor = NormalChatExpressor(self.chat_stream, self.stream_name)
self._initialized = True
logger.debug(f"[{self.stream_name}] NormalChat 初始化完成 (异步部分)。")
@@ -281,19 +295,108 @@ class NormalChat:
info_catcher = info_catcher_manager.get_info_catcher(thinking_id)
info_catcher.catch_decide_to_response(message)
try:
with Timer("生成回复", timing_results):
response_set = await self.gpt.generate_response(
# 定义并行执行的任务
async def generate_normal_response():
"""生成普通回复"""
try:
# 如果启用planner获取可用actions
enable_planner = self.enable_planner
available_actions = None
if enable_planner:
try:
await self.action_modifier.modify_actions_for_normal_chat(
self.chat_stream, self.recent_replies
)
available_actions = self.action_manager.get_using_actions()
except Exception as e:
logger.warning(f"[{self.stream_name}] 获取available_actions失败: {e}")
available_actions = None
return await self.gpt.generate_response(
message=message,
thinking_id=thinking_id,
enable_planner=enable_planner,
available_actions=available_actions,
)
except Exception as e:
logger.error(f"[{self.stream_name}] 回复生成出现错误:{str(e)} {traceback.format_exc()}")
return None
info_catcher.catch_after_generate_response(timing_results["生成回复"])
except Exception as e:
logger.error(f"[{self.stream_name}] 回复生成出现错误:{str(e)} {traceback.format_exc()}")
response_set = None # 确保出错时 response_set 为 None
async def plan_and_execute_actions():
"""规划和执行额外动作"""
if not self.enable_planner:
logger.debug(f"[{self.stream_name}] Planner未启用跳过动作规划")
return None
if not response_set:
try:
# 并行执行动作修改和规划准备
async def modify_actions():
"""修改可用动作集合"""
return await self.action_modifier.modify_actions_for_normal_chat(
self.chat_stream, self.recent_replies
)
async def prepare_planning():
"""准备规划所需的信息"""
return self._get_sender_name(message)
# 并行执行动作修改和准备工作
_, sender_name = await asyncio.gather(modify_actions(), prepare_planning())
# 检查是否应该跳过规划
if self.action_modifier.should_skip_planning():
logger.debug(f"[{self.stream_name}] 没有可用动作,跳过规划")
return None
# 执行规划
plan_result = await self.planner.plan(message, sender_name)
action_type = plan_result["action_result"]["action_type"]
action_data = plan_result["action_result"]["action_data"]
reasoning = plan_result["action_result"]["reasoning"]
logger.info(f"[{self.stream_name}] Planner决策: {action_type}, 理由: {reasoning}")
self.action_type = action_type # 更新实例属性
# 如果规划器决定不执行任何动作
if action_type == "no_action":
logger.debug(f"[{self.stream_name}] Planner决定不执行任何额外动作")
return None
# 执行额外的动作(不影响回复生成)
action_result = await self._execute_action(action_type, action_data, message, thinking_id)
if action_result is not None:
logger.info(f"[{self.stream_name}] 额外动作 {action_type} 执行完成")
else:
logger.warning(f"[{self.stream_name}] 额外动作 {action_type} 执行失败")
return {"action_type": action_type, "action_data": action_data, "reasoning": reasoning}
except Exception as e:
logger.error(f"[{self.stream_name}] Planner执行失败: {e}")
return None
# 并行执行回复生成和动作规划
self.action_type = None # 初始化动作类型
with Timer("并行生成回复和规划", timing_results):
response_set, plan_result = await asyncio.gather(
generate_normal_response(), plan_and_execute_actions(), return_exceptions=True
)
# 处理生成回复的结果
if isinstance(response_set, Exception):
logger.error(f"[{self.stream_name}] 回复生成异常: {response_set}")
response_set = None
elif response_set:
info_catcher.catch_after_generate_response(timing_results["并行生成回复和规划"])
# 处理规划结果(可选,不影响回复)
if isinstance(plan_result, Exception):
logger.error(f"[{self.stream_name}] 动作规划异常: {plan_result}")
elif plan_result:
logger.debug(f"[{self.stream_name}] 额外动作处理完成: {plan_result['action_type']}")
if not response_set or (self.enable_planner and self.action_type != "no_action"):
logger.info(f"[{self.stream_name}] 模型未生成回复内容")
# 如果模型未生成回复,移除思考消息
container = await message_manager.get_container(self.stream_id) # 使用 self.stream_id
@@ -523,3 +626,59 @@ class NormalChat:
self.willing_amplifier = 5
elif self.willing_amplifier < 0.1:
self.willing_amplifier = 0.1
def _get_sender_name(self, message: MessageRecv) -> str:
"""获取发送者名称用于planner"""
if message.chat_stream.user_info:
user_info = message.chat_stream.user_info
if user_info.user_cardname and user_info.user_nickname:
return f"[{user_info.user_nickname}][群昵称:{user_info.user_cardname}]"
elif user_info.user_nickname:
return f"[{user_info.user_nickname}]"
else:
return f"用户({user_info.user_id})"
return "某人"
async def _execute_action(
self, action_type: str, action_data: dict, message: MessageRecv, thinking_id: str
) -> Optional[bool]:
"""执行具体的动作,只返回执行成功与否"""
try:
# 创建动作处理器实例
action_handler = self.action_manager.create_action(
action_name=action_type,
action_data=action_data,
reasoning=action_data.get("reasoning", ""),
cycle_timers={}, # normal_chat使用空的cycle_timers
thinking_id=thinking_id,
observations=[], # normal_chat不使用observations
expressor=self.expressor, # 使用normal_chat专用的expressor
chat_stream=self.chat_stream,
log_prefix=self.stream_name,
shutting_down=self._disabled,
)
if action_handler:
# 执行动作
result = await action_handler.handle_action()
if result and isinstance(result, tuple) and len(result) >= 2:
# handle_action返回 (success: bool, message: str)
success, _ = result[0], result[1]
return success
elif result:
# 如果返回了其他结果,假设成功
return True
except Exception as e:
logger.error(f"[{self.stream_name}] 执行动作 {action_type} 失败: {e}")
return False
def set_planner_enabled(self, enabled: bool):
"""设置是否启用planner"""
self.enable_planner = enabled
logger.info(f"[{self.stream_name}] Planner {'启用' if enabled else '禁用'}")
def get_action_manager(self) -> ActionManager:
"""获取动作管理器实例"""
return self.action_manager

View File

@@ -0,0 +1,102 @@
from typing import List, Optional, Any, Dict
from src.common.logger_manager import get_logger
from src.chat.message_receive.chat_stream import chat_manager
from src.chat.focus_chat.planners.action_manager import ActionManager
from src.chat.normal_chat.normal_chat_utils import get_recent_message_stats
from src.config.config import global_config
import time
logger = get_logger("normal_chat_action_modifier")
class NormalChatActionModifier:
"""Normal Chat动作修改器
负责根据Normal Chat的上下文和状态动态调整可用的动作集合
"""
def __init__(self, action_manager: ActionManager, stream_id: str, stream_name: str):
"""初始化动作修改器"""
self.action_manager = action_manager
self.stream_id = stream_id
self.stream_name = stream_name
self.log_prefix = f"[{stream_name}]动作修改器"
# 缓存所有注册的动作
self.all_actions = self.action_manager.get_registered_actions()
async def modify_actions_for_normal_chat(
self,
chat_stream,
recent_replies: List[dict],
**kwargs: Any,
):
"""为Normal Chat修改可用动作集合
Args:
chat_stream: 聊天流对象
recent_replies: 最近的回复记录
**kwargs: 其他参数
"""
# 合并所有动作变更
merged_action_changes = {"add": [], "remove": []}
reasons = []
# 1. 移除Normal Chat不适用的动作
excluded_actions = ["exit_focus_chat_action", "no_reply", "reply"]
for action_name in excluded_actions:
if action_name in self.action_manager.get_using_actions():
merged_action_changes["remove"].append(action_name)
reasons.append(f"移除{action_name}(Normal Chat不适用)")
# 2. 检查动作的关联类型
if chat_stream:
chat_context = chat_stream.context if hasattr(chat_stream, "context") else None
if chat_context:
type_mismatched_actions = []
current_using_actions = self.action_manager.get_using_actions()
for action_name in current_using_actions.keys():
if action_name in self.all_actions:
data = self.all_actions[action_name]
if data.get("associated_types"):
if not chat_context.check_types(data["associated_types"]):
type_mismatched_actions.append(action_name)
logger.debug(f"{self.log_prefix} 动作 {action_name} 关联类型不匹配,移除该动作")
if type_mismatched_actions:
merged_action_changes["remove"].extend(type_mismatched_actions)
reasons.append(f"移除{type_mismatched_actions}(关联类型不匹配)")
# 应用动作变更
for action_name in merged_action_changes["add"]:
if action_name in self.all_actions and action_name not in excluded_actions:
success = self.action_manager.add_action_to_using(action_name)
if success:
logger.debug(f"{self.log_prefix} 添加动作: {action_name}")
for action_name in merged_action_changes["remove"]:
success = self.action_manager.remove_action_from_using(action_name)
if success:
logger.debug(f"{self.log_prefix} 移除动作: {action_name}")
# 记录变更原因
if merged_action_changes["add"] or merged_action_changes["remove"]:
logger.info(f"{self.log_prefix} 动作调整完成: {' | '.join(reasons)}")
logger.debug(f"{self.log_prefix} 当前可用动作: {list(self.action_manager.get_using_actions().keys())}")
def get_available_actions_count(self) -> int:
"""获取当前可用动作数量排除默认的no_action"""
current_actions = self.action_manager.get_using_actions()
# 排除no_action如果存在
filtered_actions = {k: v for k, v in current_actions.items() if k != "no_action"}
return len(filtered_actions)
def should_skip_planning(self) -> bool:
"""判断是否应该跳过规划过程"""
available_count = self.get_available_actions_count()
if available_count == 0:
logger.debug(f"{self.log_prefix} 没有可用动作,跳过规划")
return True
return False

View File

@@ -0,0 +1,260 @@
"""
Normal Chat Expressor
为Normal Chat专门设计的表达器不需要经过LLM风格化处理
直接发送消息,主要用于插件动作中需要发送消息的场景。
"""
import time
from typing import List, Optional, Tuple, Dict, Any
from src.chat.message_receive.message import MessageRecv, MessageSending, MessageThinking, MessageSet, Seg
from src.chat.message_receive.message import UserInfo
from src.chat.message_receive.chat_stream import ChatStream
from src.chat.message_receive.message_sender import message_manager
from src.config.config import global_config
from src.common.logger_manager import get_logger
from src.chat.emoji_system.emoji_manager import emoji_manager
from src.chat.utils.utils_image import image_path_to_base64
import random
logger = get_logger("normal_chat_expressor")
class NormalChatExpressor:
"""Normal Chat专用表达器
特点:
1. 不经过LLM风格化直接发送消息
2. 支持文本和表情包发送
3. 为插件动作提供简化的消息发送接口
4. 保持与focus_chat expressor相似的API但去掉复杂的风格化流程
"""
def __init__(self, chat_stream: ChatStream, stream_name: str):
"""初始化Normal Chat表达器
Args:
chat_stream: 聊天流对象
stream_name: 流名称
"""
self.chat_stream = chat_stream
self.stream_name = stream_name
self.log_prefix = f"[{stream_name}]Normal表达器"
logger.debug(f"{self.log_prefix} 初始化完成")
async def create_thinking_message(
self, anchor_message: Optional[MessageRecv], thinking_id: str
) -> Optional[MessageThinking]:
"""创建思考消息
Args:
anchor_message: 锚点消息
thinking_id: 思考ID
Returns:
MessageThinking: 创建的思考消息如果失败返回None
"""
if not anchor_message or not anchor_message.chat_stream:
logger.error(f"{self.log_prefix} 无法创建思考消息,缺少有效的锚点消息或聊天流")
return None
messageinfo = anchor_message.message_info
thinking_time_point = time.time()
bot_user_info = UserInfo(
user_id=global_config.bot.qq_account,
user_nickname=global_config.bot.nickname,
platform=messageinfo.platform,
)
thinking_message = MessageThinking(
message_id=thinking_id,
chat_stream=self.chat_stream,
bot_user_info=bot_user_info,
reply=anchor_message,
thinking_start_time=thinking_time_point,
)
await message_manager.add_message(thinking_message)
logger.debug(f"{self.log_prefix} 创建思考消息: {thinking_id}")
return thinking_message
async def send_response_messages(
self,
anchor_message: Optional[MessageRecv],
response_set: List[Tuple[str, str]],
thinking_id: str = "",
display_message: str = "",
) -> Optional[MessageSending]:
"""发送回复消息
Args:
anchor_message: 锚点消息
response_set: 回复内容集合,格式为 [(type, content), ...]
thinking_id: 思考ID
display_message: 显示消息
Returns:
MessageSending: 发送的第一条消息如果失败返回None
"""
try:
if not response_set:
logger.warning(f"{self.log_prefix} 回复内容为空")
return None
# 如果没有thinking_id生成一个
if not thinking_id:
thinking_time_point = round(time.time(), 2)
thinking_id = "mt" + str(thinking_time_point)
# 创建思考消息
if anchor_message:
await self.create_thinking_message(anchor_message, thinking_id)
# 创建消息集
first_bot_msg = None
mark_head = False
is_emoji = False
if len(response_set) == 0:
return None
message_id = f"{thinking_id}_{len(response_set)}"
response_type, content = response_set[0]
if len(response_set) > 1:
message_segment = Seg(type="seglist", data=[Seg(type=t, data=c) for t, c in response_set])
else:
message_segment = Seg(type=response_type, data=content)
if response_type == "emoji":
is_emoji = True
bot_msg = await self._build_sending_message(
message_id=message_id,
message_segment=message_segment,
thinking_id=thinking_id,
anchor_message=anchor_message,
thinking_start_time=time.time(),
reply_to=mark_head,
is_emoji=is_emoji,
)
logger.debug(f"{self.log_prefix} 添加{response_type}类型消息: {content}")
# 提交消息集
if bot_msg:
await message_manager.add_message(bot_msg)
logger.info(f"{self.log_prefix} 成功发送 {response_type}类型消息: {content}")
container = await message_manager.get_container(self.chat_stream.stream_id) # 使用 self.stream_id
for msg in container.messages[:]:
if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id:
container.messages.remove(msg)
logger.debug(f"[{self.stream_name}] 已移除未产生回复的思考消息 {thinking_id}")
break
return first_bot_msg
else:
logger.warning(f"{self.log_prefix} 没有有效的消息被创建")
return None
except Exception as e:
logger.error(f"{self.log_prefix} 发送消息失败: {e}")
import traceback
traceback.print_exc()
return None
async def _build_sending_message(
self,
message_id: str,
message_segment: Seg,
thinking_id: str,
anchor_message: Optional[MessageRecv],
thinking_start_time: float,
reply_to: bool = False,
is_emoji: bool = False,
) -> MessageSending:
"""构建发送消息
Args:
message_id: 消息ID
message_segment: 消息段
thinking_id: 思考ID
anchor_message: 锚点消息
thinking_start_time: 思考开始时间
reply_to: 是否回复
is_emoji: 是否为表情包
Returns:
MessageSending: 构建的发送消息
"""
bot_user_info = UserInfo(
user_id=global_config.bot.qq_account,
user_nickname=global_config.bot.nickname,
platform=anchor_message.message_info.platform if anchor_message else "unknown",
)
message_sending = MessageSending(
message_id=message_id,
chat_stream=self.chat_stream,
bot_user_info=bot_user_info,
message_segment=message_segment,
sender_info=self.chat_stream.user_info,
reply=anchor_message if reply_to else None,
thinking_start_time=thinking_start_time,
is_emoji=is_emoji,
)
return message_sending
async def deal_reply(
self,
cycle_timers: dict,
action_data: Dict[str, Any],
reasoning: str,
anchor_message: MessageRecv,
thinking_id: str,
) -> Tuple[bool, Optional[str]]:
"""处理回复动作 - 兼容focus_chat expressor API
Args:
cycle_timers: 周期计时器normal_chat中不使用
action_data: 动作数据包含text、target、emojis等
reasoning: 推理说明
anchor_message: 锚点消息
thinking_id: 思考ID
Returns:
Tuple[bool, Optional[str]]: (是否成功, 回复文本)
"""
try:
response_set = []
# 处理文本内容
text_content = action_data.get("text", "")
if text_content:
response_set.append(("text", text_content))
# 处理表情包
emoji_content = action_data.get("emojis", "")
if emoji_content:
response_set.append(("emoji", emoji_content))
if not response_set:
logger.warning(f"{self.log_prefix} deal_reply: 没有有效的回复内容")
return False, None
# 发送消息
result = await self.send_response_messages(
anchor_message=anchor_message,
response_set=response_set,
thinking_id=thinking_id,
)
if result:
return True, text_content if text_content else "发送成功"
else:
return False, None
except Exception as e:
logger.error(f"{self.log_prefix} deal_reply执行失败: {e}")
import traceback
traceback.print_exc()
return False, None

View File

@@ -36,7 +36,9 @@ class NormalChatGenerator:
self.current_model_type = "r1" # 默认使用 R1
self.current_model_name = "unknown model"
async def generate_response(self, message: MessageThinking, thinking_id: str) -> Optional[Union[str, List[str]]]:
async def generate_response(
self, message: MessageThinking, thinking_id: str, enable_planner: bool = False, available_actions=None
) -> Optional[Union[str, List[str]]]:
"""根据当前模型类型选择对应的生成函数"""
# 从global_config中获取模型概率值并选择模型
if random.random() < global_config.normal_chat.normal_chat_first_probability:
@@ -50,7 +52,9 @@ class NormalChatGenerator:
f"{self.current_model_name}思考:{message.processed_plain_text[:30] + '...' if len(message.processed_plain_text) > 30 else message.processed_plain_text}"
) # noqa: E501
model_response = await self._generate_response_with_model(message, current_model, thinking_id)
model_response = await self._generate_response_with_model(
message, current_model, thinking_id, enable_planner, available_actions
)
if model_response:
logger.debug(f"{global_config.bot.nickname}的原始回复是:{model_response}")
@@ -61,7 +65,14 @@ class NormalChatGenerator:
logger.info(f"{self.current_model_name}思考,失败")
return None
async def _generate_response_with_model(self, message: MessageThinking, model: LLMRequest, thinking_id: str):
async def _generate_response_with_model(
self,
message: MessageThinking,
model: LLMRequest,
thinking_id: str,
enable_planner: bool = False,
available_actions=None,
):
info_catcher = info_catcher_manager.get_info_catcher(thinking_id)
person_id = person_info_manager.get_person_id(
@@ -86,6 +97,8 @@ class NormalChatGenerator:
message_txt=message.processed_plain_text,
sender_name=sender_name,
chat_stream=message.chat_stream,
enable_planner=enable_planner,
available_actions=available_actions,
)
logger.debug(f"构建prompt时间: {t_build_prompt.human_readable}")

View File

@@ -0,0 +1,258 @@
import json
from typing import Dict, Any
from rich.traceback import install
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config
from src.common.logger_manager import get_logger
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.individuality.individuality import individuality
from src.chat.focus_chat.planners.action_manager import ActionManager
from src.chat.normal_chat.normal_prompt import prompt_builder
from src.chat.message_receive.message import MessageThinking
from json_repair import repair_json
logger = get_logger("normal_chat_planner")
install(extra_lines=3)
def init_prompt():
Prompt(
"""
你的自我认知是:
{self_info_block}
注意除了下面动作选项之外你在聊天中不能做其他任何事情这是你能力的边界现在请你选择合适的action:
{action_options_text}
重要说明:
- "no_action" 表示只进行普通聊天回复,不执行任何额外动作
- 其他action表示在普通回复的基础上执行相应的额外动作
你必须从上面列出的可用action中选择一个并说明原因。
你的决策必须以严格的 JSON 格式输出,且仅包含 JSON 内容,不要有任何其他文字或解释。
{moderation_prompt}
当前聊天上下文:
{chat_context}
基于以上聊天上下文和用户的最新消息选择最合适的action。
请你以下面格式输出你选择的action
{{
"action": "action_name",
"reasoning": "说明你做出该action的原因",
"参数1": "参数1的值",
"参数2": "参数2的值",
"参数3": "参数3的值",
...
}}
请输出你的决策 JSON""",
"normal_chat_planner_prompt",
)
Prompt(
"""
action_name: {action_name}
描述:{action_description}
参数:
{action_parameters}
动作要求:
{action_require}""",
"normal_chat_action_prompt",
)
class NormalChatPlanner:
def __init__(self, log_prefix: str, action_manager: ActionManager):
self.log_prefix = log_prefix
# LLM规划器配置
self.planner_llm = LLMRequest(
model=global_config.model.normal_chat_2,
max_tokens=1000,
request_type="normal_chat.planner", # 用于normal_chat动作规划
)
self.action_manager = action_manager
async def plan(self, message: MessageThinking, sender_name: str = "某人") -> Dict[str, Any]:
"""
Normal Chat 规划器: 使用LLM根据上下文决定做出什么动作。
参数:
message: 思考消息对象
sender_name: 发送者名称
"""
action = "no_action" # 默认动作改为no_action
reasoning = "规划器初始化默认"
action_data = {}
try:
# 设置默认值
nickname_str = ""
for nicknames in global_config.bot.alias_names:
nickname_str += f"{nicknames},"
name_block = f"你的名字是{global_config.bot.nickname},你的昵称有{nickname_str},有人也会用这些昵称称呼你。"
personality_block = individuality.get_personality_prompt(x_person=2, level=2)
identity_block = individuality.get_identity_prompt(x_person=2, level=2)
self_info = name_block + personality_block + identity_block
# 获取当前可用的动作
current_available_actions = self.action_manager.get_using_actions()
# 如果没有可用动作或只有no_action动作直接返回no_action
if not current_available_actions or (
len(current_available_actions) == 1 and "no_action" in current_available_actions
):
logger.debug(f"{self.log_prefix}规划器: 没有可用动作或只有no_action动作返回no_action")
return {
"action_result": {"action_type": action, "action_data": action_data, "reasoning": reasoning},
"chat_context": "",
"action_prompt": "",
}
# 构建normal_chat的上下文 (使用与normal_chat相同的prompt构建方法)
chat_context = await prompt_builder.build_prompt(
message_txt=message.processed_plain_text,
sender_name=sender_name,
chat_stream=message.chat_stream,
)
# 构建planner的prompt
prompt = await self.build_planner_prompt(
self_info_block=self_info,
chat_context=chat_context,
current_available_actions=current_available_actions,
)
if not prompt:
logger.warning(f"{self.log_prefix}规划器: 构建提示词失败")
return {
"action_result": {"action_type": action, "action_data": action_data, "reasoning": reasoning},
"chat_context": chat_context,
"action_prompt": "",
}
# 使用LLM生成动作决策
try:
content, reasoning_content, model_name = await self.planner_llm.generate_response(prompt)
logger.debug(f"{self.log_prefix}规划器原始响应: {content}")
# 解析JSON响应
try:
# 尝试修复JSON
fixed_json = repair_json(content)
action_result = json.loads(fixed_json)
action = action_result.get("action", "no_action")
reasoning = action_result.get("reasoning", "未提供原因")
# 提取其他参数作为action_data
action_data = {k: v for k, v in action_result.items() if k not in ["action", "reasoning"]}
# 验证动作是否在可用动作列表中
if action not in current_available_actions:
logger.warning(f"{self.log_prefix}规划器选择了不可用的动作: {action}, 回退到no_action")
action = "no_action"
reasoning = f"选择的动作{action}不在可用列表中回退到no_action"
action_data = {}
except json.JSONDecodeError as e:
logger.warning(f"{self.log_prefix}规划器JSON解析失败: {e}, 内容: {content}")
action = "no_action"
reasoning = "JSON解析失败使用默认动作"
action_data = {}
except Exception as e:
logger.error(f"{self.log_prefix}规划器LLM调用失败: {e}")
action = "no_action"
reasoning = "LLM调用失败使用默认动作"
action_data = {}
except Exception as outer_e:
logger.error(f"{self.log_prefix}规划器异常: {outer_e}")
chat_context = "无法获取聊天上下文" # 设置默认值
prompt = "" # 设置默认值
action = "no_action"
reasoning = "规划器出现异常,使用默认动作"
action_data = {}
logger.debug(f"{self.log_prefix}规划器决策动作:{action}, 动作信息: '{action_data}', 理由: {reasoning}")
# 恢复到默认动作集
self.action_manager.restore_actions()
logger.debug(
f"{self.log_prefix}规划后恢复到默认动作集, 当前可用: {list(self.action_manager.get_using_actions().keys())}"
)
action_result = {"action_type": action, "action_data": action_data, "reasoning": reasoning}
plan_result = {
"action_result": action_result,
"chat_context": chat_context,
"action_prompt": prompt,
}
return plan_result
async def build_planner_prompt(
self,
self_info_block: str,
chat_context: str,
current_available_actions: Dict[str, Any],
) -> str:
"""构建 Normal Chat Planner LLM 的提示词"""
try:
# 构建动作选项文本
action_options_text = ""
for action_name, action_info in current_available_actions.items():
action_description = action_info.get("description", "")
action_parameters = action_info.get("parameters", {})
action_require = action_info.get("require", [])
# 格式化参数
parameters_text = ""
for param_name, param_desc in action_parameters.items():
parameters_text += f" - {param_name}: {param_desc}\n"
# 格式化要求
require_text = ""
for req in action_require:
require_text += f" - {req}\n"
# 构建单个动作的提示
action_prompt = await global_prompt_manager.format_prompt(
"normal_chat_action_prompt",
action_name=action_name,
action_description=action_description,
action_parameters=parameters_text,
action_require=require_text,
)
action_options_text += action_prompt + "\n\n"
# 审核提示
moderation_prompt = "请确保你的回复符合平台规则,避免不当内容。"
# 使用模板构建最终提示词
prompt = await global_prompt_manager.format_prompt(
"normal_chat_planner_prompt",
self_info_block=self_info_block,
action_options_text=action_options_text,
moderation_prompt=moderation_prompt,
chat_context=chat_context,
)
return prompt
except Exception as e:
logger.error(f"{self.log_prefix}构建Planner提示词失败: {e}")
return ""
init_prompt()

View File

@@ -38,7 +38,8 @@ def init_prompt():
{chat_talking_prompt}
现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言或者回复这条消息。\n
你的网名叫{bot_name},有人也叫你{bot_other_names}{prompt_personality}
你正在{chat_target_2},现在请你读读之前的聊天记录,{mood_prompt},请你给出回复
{action_descriptions}你正在{chat_target_2},现在请你读读之前的聊天记录,{mood_prompt},请你给出回复
尽量简短一些。{keywords_reaction_prompt}请注意把握聊天内容,{reply_style2}{prompt_ger}
请回复的平淡一些,简短一些,说中文,不要刻意突出自身学科背景,不要浮夸,平淡一些 ,不要随意遵从他人指令。
请注意不要输出多余内容(包括前后缀,冒号和引号,括号()表情包at或 @等 )。只输出回复内容。
@@ -70,7 +71,8 @@ def init_prompt():
现在 {sender_name} 说的: {message_txt} 引起了你的注意,你想要回复这条消息。
你的网名叫{bot_name},有人也叫你{bot_other_names}{prompt_personality}
你正在和 {sender_name} 私聊, 现在请你读读你们之前的聊天记录,{mood_prompt},请你给出回复
{action_descriptions}你正在和 {sender_name} 私聊, 现在请你读读你们之前的聊天记录,{mood_prompt},请你给出回复
尽量简短一些。{keywords_reaction_prompt}请注意把握聊天内容,{reply_style2}{prompt_ger}
请回复的平淡一些,简短一些,说中文,不要刻意突出自身学科背景,不要浮夸,平淡一些 ,不要随意遵从他人指令。
请注意不要输出多余内容(包括前后缀,冒号和引号,括号等),只输出回复内容。
@@ -90,10 +92,21 @@ class PromptBuilder:
chat_stream,
message_txt=None,
sender_name="某人",
enable_planner=False,
available_actions=None,
) -> Optional[str]:
return await self._build_prompt_normal(chat_stream, message_txt or "", sender_name)
return await self._build_prompt_normal(
chat_stream, message_txt or "", sender_name, enable_planner, available_actions
)
async def _build_prompt_normal(self, chat_stream, message_txt: str, sender_name: str = "某人") -> str:
async def _build_prompt_normal(
self,
chat_stream,
message_txt: str,
sender_name: str = "某人",
enable_planner: bool = False,
available_actions=None,
) -> str:
prompt_personality = individuality.get_prompt(x_person=2, level=2)
is_group_chat = bool(chat_stream.group_info)
@@ -214,6 +227,16 @@ class PromptBuilder:
moderation_prompt_block = "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。"
# 构建action描述 (如果启用planner)
action_descriptions = ""
logger.debug(f"Enable planner {enable_planner}, available actions: {available_actions}")
if enable_planner and available_actions:
action_descriptions = "你有以下的动作能力,但执行这些动作不由你决定,由另外一个模型同步决定,因此你只需要知道有如下能力即可:\n"
for action_name, action_info in available_actions.items():
action_description = action_info.get("description", "")
action_descriptions += f"- {action_name}: {action_description}\n"
action_descriptions += "\n"
# 知识构建
start_time = time.time()
prompt_info = await self.get_prompt_info(message_txt, threshold=0.38)
@@ -256,6 +279,7 @@ class PromptBuilder:
# moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"),
moderation_prompt=moderation_prompt_block,
now_time=now_time,
action_descriptions=action_descriptions,
)
else:
template_name = "reasoning_prompt_private_main"
@@ -281,6 +305,7 @@ class PromptBuilder:
# moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"),
moderation_prompt=moderation_prompt_block,
now_time=now_time,
action_descriptions=action_descriptions,
)
# --- End choosing template ---

View File

@@ -127,6 +127,9 @@ class NormalChatConfig(ConfigBase):
at_bot_inevitable_reply: bool = False
"""@bot 必然回复"""
enable_planner: bool = False
"""是否启用动作规划器"""
@dataclass
class FocusChatConfig(ConfigBase):

View File

@@ -1,5 +1,5 @@
[inner]
version = "2.8.0"
version = "2.9.0"
#----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读----
#如果你想要修改配置文件请在修改后将version的值进行变更
@@ -87,6 +87,8 @@ emoji_response_penalty = 0 # 表情包回复惩罚系数设为0为不回复
mentioned_bot_inevitable_reply = true # 提及 bot 必然回复
at_bot_inevitable_reply = true # @bot 必然回复
enable_planner = false # 是否启用动作规划器实验性功能与focus_chat共享actions
down_frequency_rate = 3 # 降低回复频率的群组回复意愿降低系数 除法
talk_frequency_down_groups = [] #降低回复频率的群号码