This commit is contained in:
tcmofashi
2025-06-02 21:09:53 +08:00
31 changed files with 1714 additions and 384 deletions

View File

@@ -1,6 +1,7 @@
from typing import Dict, List, Optional, Type, Any
from src.chat.focus_chat.planners.actions.base_action import BaseAction, _ACTION_REGISTRY
from src.chat.heart_flow.observation.observation import Observation
from src.chat.focus_chat.replyer.default_replyer import DefaultReplyer
from src.chat.focus_chat.expressors.default_expressor import DefaultExpressor
from src.chat.message_receive.chat_stream import ChatStream
from src.common.logger_manager import get_logger
@@ -135,6 +136,7 @@ class ActionManager:
thinking_id: str,
observations: List[Observation],
expressor: DefaultExpressor,
replyer: DefaultReplyer,
chat_stream: ChatStream,
log_prefix: str,
shutting_down: bool = False,
@@ -150,6 +152,7 @@ class ActionManager:
thinking_id: 思考ID
observations: 观察列表
expressor: 表达器
replyer: 回复器
chat_stream: 聊天流
log_prefix: 日志前缀
shutting_down: 是否正在关闭
@@ -176,6 +179,7 @@ class ActionManager:
thinking_id=thinking_id,
observations=observations,
expressor=expressor,
replyer=replyer,
chat_stream=chat_stream,
log_prefix=log_prefix,
shutting_down=shutting_down,

View File

@@ -2,5 +2,6 @@
from . import reply_action # noqa
from . import no_reply_action # noqa
from . import exit_focus_chat_action # noqa
from . import emoji_action # noqa
# 在此处添加更多动作模块导入

View File

@@ -0,0 +1,135 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from src.common.logger_manager import get_logger
from src.chat.focus_chat.planners.actions.base_action import BaseAction, register_action
from typing import Tuple, List
from src.chat.heart_flow.observation.observation import Observation
from src.chat.focus_chat.replyer.default_replyer import DefaultReplyer
from src.chat.message_receive.chat_stream import ChatStream
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.chat.focus_chat.hfc_utils import create_empty_anchor_message
logger = get_logger("action_taken")
@register_action
class EmojiAction(BaseAction):
"""表情动作处理类
处理构建和发送消息表情的动作。
"""
action_name: str = "emoji"
action_description: str = "当你想发送一个表情辅助你的回复表达"
action_parameters: dict[str:str] = {
"description": "文字描述你想要发送的表情",
}
action_require: list[str] = [
"你想要发送一个表情",
"表达情绪时可以选择使用",
"一般在你回复之后可以选择性使用"
]
associated_types: list[str] = ["emoji"]
default = True
def __init__(
self,
action_data: dict,
reasoning: str,
cycle_timers: dict,
thinking_id: str,
observations: List[Observation],
chat_stream: ChatStream,
log_prefix: str,
replyer: DefaultReplyer,
**kwargs,
):
"""初始化回复动作处理器
Args:
action_name: 动作名称
action_data: 动作数据,包含 message, emojis, target 等
reasoning: 执行该动作的理由
cycle_timers: 计时器字典
thinking_id: 思考ID
observations: 观察列表
replyer: 回复器
chat_stream: 聊天流
log_prefix: 日志前缀
"""
super().__init__(action_data, reasoning, cycle_timers, thinking_id)
self.observations = observations
self.replyer = replyer
self.chat_stream = chat_stream
self.log_prefix = log_prefix
async def handle_action(self) -> Tuple[bool, str]:
"""
处理回复动作
Returns:
Tuple[bool, str]: (是否执行成功, 回复文本)
"""
# 注意: 此处可能会使用不同的expressor实现根据任务类型切换不同的回复策略
return await self._handle_reply(
reasoning=self.reasoning,
reply_data=self.action_data,
cycle_timers=self.cycle_timers,
thinking_id=self.thinking_id,
)
async def _handle_reply(
self, reasoning: str, reply_data: dict, cycle_timers: dict, thinking_id: str
) -> tuple[bool, str]:
"""
处理统一的回复动作 - 可包含文本和表情,顺序任意
reply_data格式:
{
"description": "描述你想要发送的表情"
}
"""
logger.info(f"{self.log_prefix} 决定发送表情")
# 从聊天观察获取锚定消息
# chatting_observation: ChattingObservation = next(
# obs for obs in self.observations if isinstance(obs, ChattingObservation)
# )
# if reply_data.get("target"):
# anchor_message = chatting_observation.search_message_by_text(reply_data["target"])
# else:
# anchor_message = None
# 如果没有找到锚点消息,创建一个占位符
# if not anchor_message:
# logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符")
# anchor_message = await create_empty_anchor_message(
# self.chat_stream.platform, self.chat_stream.group_info, self.chat_stream
# )
# else:
# anchor_message.update_chat_stream(self.chat_stream)
logger.info(f"{self.log_prefix} 为了表情包创建占位符")
anchor_message = await create_empty_anchor_message(
self.chat_stream.platform, self.chat_stream.group_info, self.chat_stream
)
success, reply_set = await self.replyer.deal_emoji(
cycle_timers=cycle_timers,
action_data=reply_data,
anchor_message=anchor_message,
# reasoning=reasoning,
thinking_id=thinking_id,
)
reply_text = ""
for reply in reply_set:
type = reply[0]
data = reply[1]
if type == "text":
reply_text += data
elif type == "emoji":
reply_text += data
return success, reply_text

View File

@@ -22,12 +22,11 @@ class NoReplyAction(BaseAction):
"""
action_name = "no_reply"
action_description = "不回复"
action_description = "暂时不回复消息"
action_parameters = {}
action_require = [
"话题无关/无聊/不感兴趣/不懂",
"聊天记录中最新一条消息是你自己发的且无人回应你",
"你连续发送了太多消息,且无人回复",
"想要休息一下",
]
default = True

View File

@@ -0,0 +1,134 @@
import asyncio
import traceback
from src.common.logger_manager import get_logger
from src.chat.utils.timer_calculator import Timer
from src.chat.focus_chat.planners.actions.base_action import BaseAction, register_action
from typing import Tuple, List
from src.chat.heart_flow.observation.observation import Observation
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.chat.focus_chat.hfc_utils import parse_thinking_id_to_timestamp
logger = get_logger("action_taken")
# 常量定义
WAITING_TIME_THRESHOLD = 1200 # 等待新消息时间阈值,单位秒
@register_action
class NoReplyAction(BaseAction):
"""不回复动作处理类
处理决定不回复的动作。
"""
action_name = "no_reply"
action_description = "不回复"
action_parameters = {}
action_require = [
"话题无关/无聊/不感兴趣/不懂",
"聊天记录中最新一条消息是你自己发的且无人回应你",
"你连续发送了太多消息,且无人回复",
]
default = True
def __init__(
self,
action_data: dict,
reasoning: str,
cycle_timers: dict,
thinking_id: str,
observations: List[Observation],
log_prefix: str,
shutting_down: bool = False,
**kwargs,
):
"""初始化不回复动作处理器
Args:
action_name: 动作名称
action_data: 动作数据
reasoning: 执行该动作的理由
cycle_timers: 计时器字典
thinking_id: 思考ID
observations: 观察列表
log_prefix: 日志前缀
shutting_down: 是否正在关闭
"""
super().__init__(action_data, reasoning, cycle_timers, thinking_id)
self.observations = observations
self.log_prefix = log_prefix
self._shutting_down = shutting_down
async def handle_action(self) -> Tuple[bool, str]:
"""
处理不回复的情况
工作流程:
1. 等待新消息、超时或关闭信号
2. 根据等待结果更新连续不回复计数
3. 如果达到阈值,触发回调
Returns:
Tuple[bool, str]: (是否执行成功, 空字符串)
"""
logger.info(f"{self.log_prefix} 决定不回复: {self.reasoning}")
observation = self.observations[0] if self.observations else None
try:
with Timer("等待新消息", self.cycle_timers):
# 等待新消息、超时或关闭信号,并获取结果
await self._wait_for_new_message(observation, self.thinking_id, self.log_prefix)
return True, "" # 不回复动作没有回复文本
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} 处理 'no_reply' 时等待被中断 (CancelledError)")
raise
except Exception as e: # 捕获调用管理器或其他地方可能发生的错误
logger.error(f"{self.log_prefix} 处理 'no_reply' 时发生错误: {e}")
logger.error(traceback.format_exc())
return False, ""
async def _wait_for_new_message(self, observation: ChattingObservation, thinking_id: str, log_prefix: str) -> bool:
"""
等待新消息 或 检测到关闭信号
参数:
observation: 观察实例
thinking_id: 思考ID
log_prefix: 日志前缀
返回:
bool: 是否检测到新消息 (如果因关闭信号退出则返回 False)
"""
wait_start_time = asyncio.get_event_loop().time()
while True:
# --- 在每次循环开始时检查关闭标志 ---
if self._shutting_down:
logger.info(f"{log_prefix} 等待新消息时检测到关闭信号,中断等待。")
return False # 表示因为关闭而退出
# -----------------------------------
thinking_id_timestamp = parse_thinking_id_to_timestamp(thinking_id)
# 检查新消息
if await observation.has_new_messages_since(thinking_id_timestamp):
logger.info(f"{log_prefix} 检测到新消息")
return True
# 检查超时 (放在检查新消息和关闭之后)
if asyncio.get_event_loop().time() - wait_start_time > WAITING_TIME_THRESHOLD:
logger.warning(f"{log_prefix} 等待新消息超时({WAITING_TIME_THRESHOLD}秒)")
return False
try:
# 短暂休眠,让其他任务有机会运行,并能更快响应取消或关闭
await asyncio.sleep(0.5) # 缩短休眠时间
except asyncio.CancelledError:
# 如果在休眠时被取消,再次检查关闭标志
# 如果是正常关闭,则不需要警告
if not self._shutting_down:
logger.warning(f"{log_prefix} _wait_for_new_message 的休眠被意外取消")
# 无论如何,重新抛出异常,让上层处理
raise

View File

@@ -45,6 +45,8 @@ class PluginAction(BaseAction):
self._services["expressor"] = kwargs["expressor"]
if "chat_stream" in kwargs:
self._services["chat_stream"] = kwargs["chat_stream"]
if "replyer" in kwargs:
self._services["replyer"] = kwargs["replyer"]
self.log_prefix = kwargs.get("log_prefix", "")
self._load_plugin_config() # 初始化时加载插件配置

View File

@@ -4,11 +4,10 @@ from src.common.logger_manager import get_logger
from src.chat.focus_chat.planners.actions.base_action import BaseAction, register_action
from typing import Tuple, List
from src.chat.heart_flow.observation.observation import Observation
from src.chat.focus_chat.expressors.default_expressor import DefaultExpressor
from src.chat.focus_chat.replyer.default_replyer import DefaultReplyer
from src.chat.message_receive.chat_stream import ChatStream
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.chat.focus_chat.hfc_utils import create_empty_anchor_message
from src.config.config import global_config
logger = get_logger("action_taken")
@@ -21,21 +20,13 @@ class ReplyAction(BaseAction):
"""
action_name: str = "reply"
action_description: str = "表达想法,可以只包含文本、表情或两者都有"
action_description: str = "当你想要参与回复或者聊天"
action_parameters: dict[str:str] = {
"text": "你想要表达的内容(可选)",
"emojis": "描述当前使用表情包的场景,一段话描述(可选)",
"target": "你想要回复的原始文本内容(非必须,仅文本,不包含发送者)(可选)",
"target": "如果你要明确回复特定某人的某句话请在target参数中中指定那句话的原始文本非必须仅文本不包含发送者)(可选)",
}
action_require: list[str] = [
"有实质性内容需要表达",
"有人提到你,但你还没有回应他",
"在合适的时候添加表情(不要总是添加),表情描述要详细,描述当前场景,一段话描述",
"如果你有明确的,要回复特定某人的某句话或者你想回复较早的消息请在target中指定那句话的原始文本",
"一次只回复一个人,一次只回复一个话题,突出重点",
"如果是自己发的消息想继续,需自然衔接",
"避免重复或评价自己的发言,不要和自己聊天",
f"注意你的回复要求:{global_config.expression.expression_style}",
"你想要闲聊或者随便附和",
"有人提到你",
]
associated_types: list[str] = ["text", "emoji"]
@@ -49,9 +40,9 @@ class ReplyAction(BaseAction):
cycle_timers: dict,
thinking_id: str,
observations: List[Observation],
expressor: DefaultExpressor,
chat_stream: ChatStream,
log_prefix: str,
replyer: DefaultReplyer,
**kwargs,
):
"""初始化回复动作处理器
@@ -63,13 +54,13 @@ class ReplyAction(BaseAction):
cycle_timers: 计时器字典
thinking_id: 思考ID
observations: 观察列表
expressor: 表达
replyer: 回复
chat_stream: 聊天流
log_prefix: 日志前缀
"""
super().__init__(action_data, reasoning, cycle_timers, thinking_id)
self.observations = observations
self.expressor = expressor
self.replyer = replyer
self.chat_stream = chat_stream
self.log_prefix = log_prefix
@@ -121,7 +112,7 @@ class ReplyAction(BaseAction):
else:
anchor_message.update_chat_stream(self.chat_stream)
success, reply_set = await self.expressor.deal_reply(
success, reply_set = await self.replyer.deal_reply(
cycle_timers=cycle_timers,
action_data=reply_data,
anchor_message=anchor_message,

View File

@@ -0,0 +1,141 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from src.common.logger_manager import get_logger
from src.chat.focus_chat.planners.actions.base_action import BaseAction, register_action
from typing import Tuple, List
from src.chat.heart_flow.observation.observation import Observation
from chat.focus_chat.replyer.default_expressor import DefaultExpressor
from src.chat.message_receive.chat_stream import ChatStream
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.chat.focus_chat.hfc_utils import create_empty_anchor_message
from src.config.config import global_config
logger = get_logger("action_taken")
@register_action
class ReplyAction(BaseAction):
"""回复动作处理类
处理构建和发送消息回复的动作。
"""
action_name: str = "reply"
action_description: str = "表达想法,可以只包含文本、表情或两者都有"
action_parameters: dict[str:str] = {
"text": "你想要表达的内容(可选)",
"emojis": "描述当前使用表情包的场景,一段话描述(可选)",
"target": "你想要回复的原始文本内容(非必须,仅文本,不包含发送者)(可选)",
}
action_require: list[str] = [
"有实质性内容需要表达",
"有人提到你,但你还没有回应他",
"在合适的时候添加表情(不要总是添加),表情描述要详细,描述当前场景,一段话描述",
"如果你有明确的,要回复特定某人的某句话或者你想回复较早的消息请在target中指定那句话的原始文本",
"一次只回复一个人,一次只回复一个话题,突出重点",
"如果是自己发的消息想继续,需自然衔接",
"避免重复或评价自己的发言,不要和自己聊天",
f"注意你的回复要求:{global_config.expression.expression_style}",
]
associated_types: list[str] = ["text", "emoji"]
default = True
def __init__(
self,
action_data: dict,
reasoning: str,
cycle_timers: dict,
thinking_id: str,
observations: List[Observation],
expressor: DefaultExpressor,
chat_stream: ChatStream,
log_prefix: str,
**kwargs,
):
"""初始化回复动作处理器
Args:
action_name: 动作名称
action_data: 动作数据,包含 message, emojis, target 等
reasoning: 执行该动作的理由
cycle_timers: 计时器字典
thinking_id: 思考ID
observations: 观察列表
expressor: 表达器
chat_stream: 聊天流
log_prefix: 日志前缀
"""
super().__init__(action_data, reasoning, cycle_timers, thinking_id)
self.observations = observations
self.expressor = expressor
self.chat_stream = chat_stream
self.log_prefix = log_prefix
async def handle_action(self) -> Tuple[bool, str]:
"""
处理回复动作
Returns:
Tuple[bool, str]: (是否执行成功, 回复文本)
"""
# 注意: 此处可能会使用不同的expressor实现根据任务类型切换不同的回复策略
return await self._handle_reply(
reasoning=self.reasoning,
reply_data=self.action_data,
cycle_timers=self.cycle_timers,
thinking_id=self.thinking_id,
)
async def _handle_reply(
self, reasoning: str, reply_data: dict, cycle_timers: dict, thinking_id: str
) -> tuple[bool, str]:
"""
处理统一的回复动作 - 可包含文本和表情,顺序任意
reply_data格式:
{
"text": "你好啊" # 文本内容列表(可选)
"target": "锚定消息", # 锚定消息的文本内容
"emojis": "微笑" # 表情关键词列表(可选)
}
"""
logger.info(f"{self.log_prefix} 决定回复: {self.reasoning}")
# 从聊天观察获取锚定消息
chatting_observation: ChattingObservation = next(
obs for obs in self.observations if isinstance(obs, ChattingObservation)
)
if reply_data.get("target"):
anchor_message = chatting_observation.search_message_by_text(reply_data["target"])
else:
anchor_message = None
# 如果没有找到锚点消息,创建一个占位符
if not anchor_message:
logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符")
anchor_message = await create_empty_anchor_message(
self.chat_stream.platform, self.chat_stream.group_info, self.chat_stream
)
else:
anchor_message.update_chat_stream(self.chat_stream)
success, reply_set = await self.expressor.deal_reply(
cycle_timers=cycle_timers,
action_data=reply_data,
anchor_message=anchor_message,
reasoning=reasoning,
thinking_id=thinking_id,
)
reply_text = ""
for reply in reply_set:
type = reply[0]
data = reply[1]
if type == "text":
reply_text += data
elif type == "emoji":
reply_text += data
return success, reply_text

View File

@@ -0,0 +1,26 @@
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from src.chat.focus_chat.planners.action_manager import ActionManager
from src.chat.focus_chat.info.info_base import InfoBase
class BasePlanner(ABC):
"""规划器基类"""
def __init__(self, log_prefix: str, action_manager: ActionManager):
self.log_prefix = log_prefix
self.action_manager = action_manager
@abstractmethod
async def plan(self, all_plan_info: List[InfoBase], running_memorys: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
规划下一步行动
Args:
all_plan_info: 所有计划信息
running_memorys: 回忆信息
Returns:
Dict[str, Any]: 规划结果
"""
pass

View File

@@ -16,6 +16,7 @@ from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.individuality.individuality import individuality
from src.chat.focus_chat.planners.action_manager import ActionManager
from json_repair import repair_json
from src.chat.focus_chat.planners.base_planner import BasePlanner
logger = get_logger("planner")
@@ -73,9 +74,9 @@ action_name: {action_name}
)
class ActionPlanner:
class ActionPlanner(BasePlanner):
def __init__(self, log_prefix: str, action_manager: ActionManager):
self.log_prefix = log_prefix
super().__init__(log_prefix, action_manager)
# LLM规划器配置
self.planner_llm = LLMRequest(
model=global_config.model.focus_planner,
@@ -83,8 +84,6 @@ class ActionPlanner:
request_type="focus.planner", # 用于动作规划
)
self.action_manager = action_manager
async def plan(self, all_plan_info: List[InfoBase], running_memorys: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
规划器 (Planner): 使用LLM根据上下文决定做出什么动作
@@ -117,6 +116,7 @@ class ActionPlanner:
cycle_info = ""
structured_info = ""
extra_info = []
current_mind = ""
observed_messages = []
observed_messages_str = ""
chat_type = "group"

View File

@@ -0,0 +1,53 @@
from typing import Dict, Type
from src.chat.focus_chat.planners.base_planner import BasePlanner
from src.chat.focus_chat.planners.planner_complex import ActionPlanner as ComplexActionPlanner
from src.chat.focus_chat.planners.planner_simple import ActionPlanner as SimpleActionPlanner
from src.chat.focus_chat.planners.action_manager import ActionManager
from src.config.config import global_config
from src.common.logger_manager import get_logger
logger = get_logger("planner_factory")
class PlannerFactory:
"""规划器工厂类,用于创建不同类型的规划器实例"""
# 注册所有可用的规划器类型
_planner_types: Dict[str, Type[BasePlanner]] = {
"complex": ComplexActionPlanner,
"simple": SimpleActionPlanner,
}
@classmethod
def register_planner(cls, name: str, planner_class: Type[BasePlanner]) -> None:
"""
注册新的规划器类型
Args:
name: 规划器类型名称
planner_class: 规划器类
"""
cls._planner_types[name] = planner_class
logger.info(f"注册新的规划器类型: {name}")
@classmethod
def create_planner(cls, log_prefix: str, action_manager: ActionManager) -> BasePlanner:
"""
创建规划器实例
Args:
log_prefix: 日志前缀
action_manager: 动作管理器实例
Returns:
BasePlanner: 规划器实例
"""
planner_type = global_config.focus_chat.planner_type
if planner_type not in cls._planner_types:
logger.warning(f"{log_prefix} 未知的规划器类型: {planner_type},使用默认规划器")
planner_type = "complex"
planner_class = cls._planner_types[planner_type]
logger.info(f"{log_prefix} 使用{planner_type}规划器")
return planner_class(log_prefix=log_prefix, action_manager=action_manager)

View File

@@ -0,0 +1,384 @@
import json # <--- 确保导入 json
import traceback
from typing import List, Dict, Any, Optional
from rich.traceback import install
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config
from src.chat.focus_chat.info.info_base import InfoBase
from src.chat.focus_chat.info.obs_info import ObsInfo
from src.chat.focus_chat.info.cycle_info import CycleInfo
from src.chat.focus_chat.info.mind_info import MindInfo
from src.chat.focus_chat.info.action_info import ActionInfo
from src.chat.focus_chat.info.structured_info import StructuredInfo
from src.chat.focus_chat.info.self_info import SelfInfo
from src.common.logger_manager import get_logger
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.individuality.individuality import individuality
from src.chat.focus_chat.planners.action_manager import ActionManager
from json_repair import repair_json
from src.chat.focus_chat.planners.base_planner import BasePlanner
from datetime import datetime
logger = get_logger("planner")
install(extra_lines=3)
def init_prompt():
Prompt(
"""
你的自我认知是:
{self_info_block}
{extra_info_block}
{memory_str}
{time_block}
你是群内的一员,你现在正在参与群内的闲聊,以下是群内的聊天内容:
{chat_content_block}
{mind_info_block}
{cycle_info_block}
注意除了下面动作选项之外你在群聊里不能做其他任何事情这是你能力的边界现在请你选择合适的action:
{moderation_prompt}
{action_options_text}
以严格的 JSON 格式输出,且仅包含 JSON 内容,不要有任何其他文字或解释。
请你以下面格式输出:
{{
"action": "action_name"
"参数": "参数的值"(可能有多个参数),
}}
请输出你提取的JSON不要有任何其他文字或解释
""",
"simple_planner_prompt",
)
Prompt(
"""
动作名称:{action_name}
描述:{action_description}
{action_parameters}
使用该动作的场景:
{action_require}""",
"action_prompt",
)
class ActionPlanner(BasePlanner):
def __init__(self, log_prefix: str, action_manager: ActionManager):
super().__init__(log_prefix, action_manager)
# LLM规划器配置
self.planner_llm = LLMRequest(
model=global_config.model.focus_planner,
max_tokens=1000,
request_type="focus.planner", # 用于动作规划
)
self.utils_llm = LLMRequest(
model=global_config.model.utils_small,
max_tokens=1000,
request_type="focus.planner", # 用于动作规划
)
async def plan(self, all_plan_info: List[InfoBase], running_memorys: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
规划器 (Planner): 使用LLM根据上下文决定做出什么动作。
参数:
all_plan_info: 所有计划信息
running_memorys: 回忆信息
"""
action = "no_reply" # 默认动作
reasoning = "规划器初始化默认"
action_data = {}
try:
# 获取观察信息
extra_info: list[str] = []
# 设置默认值
nickname_str = ""
for nicknames in global_config.bot.alias_names:
nickname_str += f"{nicknames},"
name_block = f"你的名字是{global_config.bot.nickname},你的昵称有{nickname_str},有人也会用这些昵称称呼你。"
personality_block = individuality.get_personality_prompt(x_person=2, level=2)
identity_block = individuality.get_identity_prompt(x_person=2, level=2)
self_info = name_block + personality_block + identity_block
current_mind = "你思考了很久,没有想清晰要做什么"
cycle_info = ""
structured_info = ""
extra_info = []
observed_messages = []
observed_messages_str = ""
chat_type = "group"
is_group_chat = True
for info in all_plan_info:
if isinstance(info, ObsInfo):
observed_messages = info.get_talking_message()
observed_messages_str = info.get_talking_message_str_truncate()
chat_type = info.get_chat_type()
is_group_chat = chat_type == "group"
elif isinstance(info, MindInfo):
current_mind = info.get_current_mind()
elif isinstance(info, CycleInfo):
cycle_info = info.get_observe_info()
elif isinstance(info, SelfInfo):
self_info = info.get_processed_info()
elif isinstance(info, StructuredInfo):
structured_info = info.get_processed_info()
# print(f"structured_info: {structured_info}")
# elif not isinstance(info, ActionInfo): # 跳过已处理的ActionInfo
# extra_info.append(info.get_processed_info())
# 获取当前可用的动作
current_available_actions = self.action_manager.get_using_actions()
# 如果没有可用动作或只有no_reply动作直接返回no_reply
if not current_available_actions or (
len(current_available_actions) == 1 and "no_reply" in current_available_actions
):
action = "no_reply"
reasoning = "没有可用的动作" if not current_available_actions else "只有no_reply动作可用跳过规划"
logger.info(f"{self.log_prefix}{reasoning}")
self.action_manager.restore_actions()
logger.debug(
f"{self.log_prefix}沉默后恢复到默认动作集, 当前可用: {list(self.action_manager.get_using_actions().keys())}"
)
return {
"action_result": {"action_type": action, "action_data": action_data, "reasoning": reasoning},
"current_mind": current_mind,
"observed_messages": observed_messages,
}
# --- 构建提示词 (调用修改后的 PromptBuilder 方法) ---
prompt = await self.build_planner_prompt(
self_info_block=self_info,
is_group_chat=is_group_chat, # <-- Pass HFC state
chat_target_info=None,
observed_messages_str=observed_messages_str, # <-- Pass local variable
current_mind=current_mind, # <-- Pass argument
structured_info=structured_info, # <-- Pass SubMind info
current_available_actions=current_available_actions, # <-- Pass determined actions
cycle_info=cycle_info, # <-- Pass cycle info
extra_info=extra_info,
running_memorys=running_memorys,
)
# --- 调用 LLM (普通文本生成) ---
llm_content = None
try:
prompt = f"{prompt}"
llm_content, (reasoning_content, _) = await self.planner_llm.generate_response_async(prompt=prompt)
logger.debug(
f"{self.log_prefix}规划器Prompt:\n{prompt}\n\n决策动作:{action},\n动作信息: '{action_data}'\n理由: {reasoning}"
)
logger.debug(f"{self.log_prefix}LLM 原始响应: {llm_content}")
logger.debug(f"{self.log_prefix}LLM 原始理由响应: {reasoning_content}")
except Exception as req_e:
logger.error(f"{self.log_prefix}LLM 请求执行失败: {req_e}")
reasoning = f"LLM 请求失败,你的模型出现问题: {req_e}"
action = "no_reply"
if llm_content:
try:
fixed_json_string = repair_json(llm_content)
if isinstance(fixed_json_string, str):
try:
parsed_json = json.loads(fixed_json_string)
except json.JSONDecodeError as decode_error:
logger.error(f"JSON解析错误: {str(decode_error)}")
parsed_json = {}
else:
# 如果repair_json直接返回了字典对象直接使用
parsed_json = fixed_json_string
# 提取决策,提供默认值
extracted_action = parsed_json.get("action", "no_reply")
# extracted_reasoning = parsed_json.get("reasoning", "LLM未提供理由")
extracted_reasoning = ""
# 将所有其他属性添加到action_data
action_data = {}
for key, value in parsed_json.items():
if key not in ["action", "reasoning"]:
action_data[key] = value
action_data["identity"] = self_info
# 对于reply动作不需要额外处理因为相关字段已经在上面的循环中添加到action_data
if extracted_action not in current_available_actions:
logger.warning(
f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{extracted_action}' (可用: {list(current_available_actions.keys())}),将强制使用 'no_reply'"
)
action = "no_reply"
reasoning = f"LLM 返回了当前不可用的动作 '{extracted_action}' (可用: {list(current_available_actions.keys())})。原始理由: {extracted_reasoning}"
else:
# 动作有效且可用
action = extracted_action
reasoning = extracted_reasoning
except Exception as json_e:
logger.warning(
f"{self.log_prefix}解析LLM响应JSON失败 {json_e}. LLM原始输出: '{llm_content}'"
)
traceback.print_exc()
reasoning = f"解析LLM响应JSON失败: {json_e}. 将使用默认动作 'no_reply'."
action = "no_reply"
except Exception as outer_e:
logger.error(f"{self.log_prefix}Planner 处理过程中发生意外错误,规划失败,将执行 no_reply: {outer_e}")
traceback.print_exc()
action = "no_reply"
reasoning = f"Planner 内部处理错误: {outer_e}"
# logger.debug(
# f"{self.log_prefix}规划器Prompt:\n{prompt}\n\n决策动作:{action},\n动作信息: '{action_data}'\n理由: {reasoning}"
# )
# 恢复到默认动作集
self.action_manager.restore_actions()
logger.debug(
f"{self.log_prefix}规划后恢复到默认动作集, 当前可用: {list(self.action_manager.get_using_actions().keys())}"
)
action_result = {"action_type": action, "action_data": action_data, "reasoning": reasoning}
plan_result = {
"action_result": action_result,
"current_mind": current_mind,
"observed_messages": observed_messages,
"action_prompt": prompt,
}
return plan_result
async def build_planner_prompt(
self,
self_info_block: str,
is_group_chat: bool, # Now passed as argument
chat_target_info: Optional[dict], # Now passed as argument
observed_messages_str: str,
current_mind: Optional[str],
structured_info: Optional[str],
current_available_actions: Dict[str, ActionInfo],
cycle_info: Optional[str],
extra_info: list[str],
running_memorys: List[Dict[str, Any]],
) -> str:
"""构建 Planner LLM 的提示词 (获取模板并填充数据)"""
try:
memory_str = ""
if global_config.focus_chat.parallel_processing:
memory_str = ""
if running_memorys:
memory_str = "以下是当前在聊天中,你回忆起的记忆:\n"
for running_memory in running_memorys:
memory_str += f"{running_memory['topic']}: {running_memory['content']}\n"
chat_context_description = "你现在正在一个群聊中"
chat_target_name = None # Only relevant for private
if not is_group_chat and chat_target_info:
chat_target_name = (
chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or "对方"
)
chat_context_description = f"你正在和 {chat_target_name} 私聊"
chat_content_block = ""
if observed_messages_str:
chat_content_block = f"聊天记录:\n{observed_messages_str}"
else:
chat_content_block = "你还未开始聊天"
mind_info_block = ""
if current_mind:
mind_info_block = f"对聊天的规划:{current_mind}"
else:
mind_info_block = "你刚参与聊天"
personality_block = individuality.get_prompt(x_person=2, level=2)
action_options_block = ""
for using_actions_name, using_actions_info in current_available_actions.items():
# print(using_actions_name)
# print(using_actions_info)
# print(using_actions_info["parameters"])
# print(using_actions_info["require"])
# print(using_actions_info["description"])
using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt")
param_text = ""
for param_name, param_description in using_actions_info["parameters"].items():
param_text += f" {param_name}: {param_description}\n"
require_text = ""
for require_item in using_actions_info["require"]:
require_text += f" - {require_item}\n"
if param_text:
param_text = f"参数:\n{param_text}"
else:
param_text = "无需参数"
using_action_prompt = using_action_prompt.format(
action_name=using_actions_name,
action_description=using_actions_info["description"],
action_parameters=param_text,
action_require=require_text,
)
action_options_block += using_action_prompt
extra_info_block = "\n".join(extra_info)
extra_info_block += f"\n{structured_info}"
if extra_info or structured_info:
extra_info_block = f"以下是一些额外的信息,现在请你阅读以下内容,进行决策\n{extra_info_block}\n以上是一些额外的信息,现在请你阅读以下内容,进行决策"
else:
extra_info_block = ""
# moderation_prompt_block = "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。"
moderation_prompt_block = ""
# 获取当前时间
time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
planner_prompt_template = await global_prompt_manager.get_prompt_async("simple_planner_prompt")
prompt = planner_prompt_template.format(
self_info_block=self_info_block,
memory_str=memory_str,
time_block=time_block,
# bot_name=global_config.bot.nickname,
prompt_personality=personality_block,
chat_context_description=chat_context_description,
chat_content_block=chat_content_block,
mind_info_block=mind_info_block,
cycle_info_block=cycle_info,
action_options_text=action_options_block,
# action_available_block=action_available_block,
extra_info_block=extra_info_block,
moderation_prompt=moderation_prompt_block,
)
return prompt
except Exception as e:
logger.error(f"构建 Planner 提示词时出错: {e}")
logger.error(traceback.format_exc())
return "构建 Planner Prompt 时出错"
init_prompt()