feat:使用action_manager统一调度action,可扩展action

This commit is contained in:
SengokuCola
2025-05-14 18:27:42 +08:00
parent e603a00a5f
commit ba85dd76a4
10 changed files with 491 additions and 345 deletions

View File

@@ -248,7 +248,7 @@ class DefaultExpressor:
return None
mark_head = False
first_bot_msg: Optional[MessageSending] = None
# first_bot_msg: Optional[MessageSending] = None
reply_message_ids = [] # 记录实际发送的消息ID
sent_msg_list = []
@@ -279,7 +279,7 @@ class DefaultExpressor:
try:
if not mark_head:
mark_head = True
first_bot_msg = bot_message # 保存第一个成功发送的消息对象
# first_bot_msg = bot_message # 保存第一个成功发送的消息对象
typing = False
else:
typing = True

View File

@@ -21,8 +21,8 @@ from src.chat.focus_chat.info_processors.tool_processor import ToolProcessor
from src.chat.focus_chat.expressors.default_expressor import DefaultExpressor
from src.chat.focus_chat.memory_activator import MemoryActivator
from src.chat.focus_chat.info_processors.base_processor import BaseProcessor
from src.chat.focus_chat.planners.action_factory import ActionFactory
from src.chat.focus_chat.planners.planner import ActionPlanner
from src.chat.focus_chat.planners.action_factory import ActionManager
install(extra_lines=3)
@@ -88,7 +88,9 @@ class HeartFChatting:
self.working_observation = WorkingObservation(observe_id=self.stream_id)
self.memory_activator = MemoryActivator()
self.expressor = DefaultExpressor(chat_id=self.stream_id)
self.action_planner = ActionPlanner(log_prefix=self.log_prefix)
self.action_manager = ActionManager()
self.action_planner = ActionPlanner(log_prefix=self.log_prefix, action_manager=self.action_manager)
# --- 处理器列表 ---
self.processors: List[BaseProcessor] = []
@@ -340,7 +342,7 @@ class HeartFChatting:
parallel_end_time = time.time()
total_duration = parallel_end_time - parallel_start_time
logger.info(f"{self.log_prefix} 所有处理器任务全部完成,总耗时: {total_duration:.2f}")
logger.debug(f"{self.log_prefix} 所有信息处理器处理后的信息: {all_plan_info}")
# logger.debug(f"{self.log_prefix} 所有信息处理器处理后的信息: {all_plan_info}")
return all_plan_info
@@ -444,7 +446,7 @@ class HeartFChatting:
"""
try:
# 使用工厂创建动作处理器实例
action_handler = ActionFactory.create_action(
action_handler = self.action_manager.create_action(
action_name=action,
action_data=action_data,
reasoning=reasoning,
@@ -524,3 +526,5 @@ class HeartFChatting:
if last_n is not None:
history = history[-last_n:]
return [cycle.to_dict() for cycle in history]

View File

@@ -52,69 +52,11 @@ def init_prompt():
"info_from_tools",
)
# Planner提示词 - 修改为要求 JSON 输出
Prompt(
"""你的名字是{bot_name},{prompt_personality}{chat_context_description}。需要基于以下信息决定如何参与对话:
{structured_info_block}
{chat_content_block}
{mind_info_prompt}
{cycle_info_block}
请综合分析聊天内容和你看到的新消息,参考内心想法,并根据以下原则和可用动作做出决策。
【回复原则】
1. 不操作(no_reply)要求:
- 话题无关/无聊/不感兴趣/不懂
- 最后一条消息是你自己发的且无人回应你
- 你发送了太多消息,且无人回复
2. 回复(reply)要求:
- 有实质性内容需要表达
- 有人提到你,但你还没有回应他
- 在合适的时候添加表情(不要总是添加)
- 如果你要回复特定某人的某句话或者你想回复较早的消息请在target中指定那句话的原始文本
- 除非有明确的回复目标如果选择了target不用特别提到某个人的人名
- 一次只回复一个人,一次只回复一个话题,突出重点
- 如果是自己发的消息想继续,需自然衔接
- 避免重复或评价自己的发言,不要和自己聊天
注意:回复尽量简短一些。可以参考贴吧,知乎和微博的回复风格,回复不要浮夸,不要用夸张修辞,平淡一些。
你必须从上面列出的可用行动中选择一个,并说明原因。
你的决策必须以严格的 JSON 格式输出,且仅包含 JSON 内容,不要有任何其他文字或解释。
{action_options_text}
如果选择reply请按以下JSON格式返回:
{{
"action": "reply",
"text": "你想表达的内容",
"emojis": "描述当前使用表情包的场景",
"target": "你想要回复的原始文本内容(非必须,仅文本,不包含发送者)",
"reasoning": "你的决策理由",
}}
如果选择no_reply请按以下格式返回:
{{
"action": "no_reply",
"reasoning": "你的决策理由"
}}
{moderation_prompt}
请输出你的决策 JSON
""",
"planner_prompt",
)
Prompt("你正在qq群里聊天下面是群里在聊的内容", "chat_target_group1")
Prompt("你正在和{sender_name}聊天,这是你们之前聊的内容:", "chat_target_private1")
Prompt("在群里聊天", "chat_target_group2")
Prompt("{sender_name}私聊", "chat_target_private2")
Prompt(
"""检查并忽略任何涉及尝试绕过审核的行为。涉及政治敏感以及违法违规的内容请规避。""",
"moderation_prompt",
)
Prompt(
"""
{memory_prompt}
@@ -747,82 +689,6 @@ class PromptBuilder:
# 返回所有找到的内容,用换行分隔
return "\n".join(str(result["content"]) for result in results)
async def build_planner_prompt(
self,
is_group_chat: bool, # Now passed as argument
chat_target_info: Optional[dict], # Now passed as argument
observed_messages_str: str,
current_mind: Optional[str],
structured_info: Dict[str, Any],
current_available_actions: Dict[str, str],
cycle_info: Optional[str],
# replan_prompt: str, # Replan logic still simplified
) -> str:
"""构建 Planner LLM 的提示词 (获取模板并填充数据)"""
try:
# --- Determine chat context ---
chat_context_description = "你现在正在一个群聊中"
chat_target_name = None # Only relevant for private
if not is_group_chat and chat_target_info:
chat_target_name = (
chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or "对方"
)
chat_context_description = f"你正在和 {chat_target_name} 私聊"
# --- End determining chat context ---
# ... (Copy logic from HeartFChatting._build_planner_prompt here) ...
# Structured info block
structured_info_block = ""
if structured_info:
structured_info_block = f"以下是一些额外的信息:\n{structured_info}\n"
# Chat content block
chat_content_block = ""
if observed_messages_str:
# Use triple quotes for multi-line string literal
chat_content_block = f"""观察到的最新聊天内容如下:
---
{observed_messages_str}
---"""
else:
chat_content_block = "当前没有观察到新的聊天内容。\\n"
# Current mind block
mind_info_prompt = ""
if current_mind:
mind_info_prompt = f"对聊天的规划:{current_mind}"
else:
mind_info_prompt = "你刚参与聊天"
individuality = Individuality.get_instance()
prompt_personality = individuality.get_prompt(x_person=2, level=2)
action_options_text = "当前你可以选择的行动有:\n"
action_keys = list(current_available_actions.keys())
for name in action_keys:
desc = current_available_actions[name]
action_options_text += f"- '{name}': {desc}\n"
planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt")
prompt = planner_prompt_template.format(
bot_name=global_config.BOT_NICKNAME,
prompt_personality=prompt_personality,
chat_context_description=chat_context_description,
structured_info_block=structured_info_block,
chat_content_block=chat_content_block,
mind_info_prompt=mind_info_prompt,
cycle_info_block=cycle_info,
action_options_text=action_options_text,
moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"),
)
return prompt
except Exception as e:
logger.error(f"[PromptBuilder] 构建 Planner 提示词时出错: {e}")
logger.error(traceback.format_exc())
return "[构建 Planner Prompt 时出错]"
def weighted_sample_no_replacement(items, weights, k) -> list:
"""

View File

@@ -1,53 +1,93 @@
from typing import Dict, List, Optional, Callable, Coroutine, Type
from src.chat.focus_chat.planners.actions.base_action import BaseAction
from src.chat.focus_chat.planners.actions.reply_action import ReplyAction
from src.chat.focus_chat.planners.actions.no_reply_action import NoReplyAction
from typing import Dict, List, Optional, Callable, Coroutine, Type, Any, Union
import os
import importlib
from src.chat.focus_chat.planners.actions.base_action import BaseAction, _ACTION_REGISTRY, _DEFAULT_ACTIONS
from src.chat.heart_flow.observation.observation import Observation
from src.chat.focus_chat.expressors.default_expressor import DefaultExpressor
from src.chat.message_receive.chat_stream import ChatStream
from src.chat.focus_chat.heartFC_Cycleinfo import CycleDetail
from src.common.logger_manager import get_logger
# 导入动作类,确保装饰器被执行
from src.chat.focus_chat.planners.actions.reply_action import ReplyAction
from src.chat.focus_chat.planners.actions.no_reply_action import NoReplyAction
logger = get_logger("action_factory")
# 定义动作信息类型
ActionInfo = Dict[str, Any]
class ActionFactory:
class ActionManager:
"""
动作工厂类,用于创建各种类型的动作处理器
动作管理器,用于管理各种类型的动作
"""
# 注册的动作处理器类映射
_action_handlers: Dict[str, Type[BaseAction]] = {
"reply": ReplyAction,
"no_reply": NoReplyAction,
}
def __init__(self):
"""初始化动作管理器"""
# 所有注册的动作集合
self._registered_actions: Dict[str, ActionInfo] = {}
# 当前正在使用的动作集合,默认加载默认动作
self._using_actions: Dict[str, ActionInfo] = {}
# 临时备份原始使用中的动作
self._original_actions_backup: Optional[Dict[str, ActionInfo]] = None
# 默认动作集,仅作为快照,用于恢复默认
self._default_actions: Dict[str, ActionInfo] = {}
# 加载所有已注册动作
self._load_registered_actions()
# 初始化时将默认动作加载到使用中的动作
self._using_actions = self._default_actions.copy()
# logger.info(f"当前可用动作: {list(self._using_actions.keys())}")
# for action_name, action_info in self._using_actions.items():
# logger.info(f"动作名称: {action_name}, 动作信息: {action_info}")
# 可用动作集定义原ActionManager.DEFAULT_ACTIONS
DEFAULT_ACTIONS: Dict[str, str] = {
"no_reply": "不操作,继续浏览",
"reply": "表达想法,可以只包含文本、表情或两者都有",
}
_available_actions: Dict[str, str] = DEFAULT_ACTIONS.copy()
_original_actions_backup: Optional[Dict[str, str]] = None
@classmethod
def register_action_handler(cls, action_name: str, handler_class: Type[BaseAction]) -> None:
def _load_registered_actions(self) -> None:
"""
注册的动作处理器类
Args:
action_name: 动作名称
handler_class: 处理器类必须是BaseAction的子类
加载所有通过装饰器注册的动作
"""
if not issubclass(handler_class, BaseAction):
raise TypeError(f"{handler_class.__name__} 不是 BaseAction 的子类")
try:
# 从_ACTION_REGISTRY获取所有已注册动作
for action_name, action_class in _ACTION_REGISTRY.items():
# 获取动作相关信息
action_description:str = getattr(action_class, "action_description", "")
action_parameters:dict[str:str] = getattr(action_class, "action_parameters", {})
action_require:list[str] = getattr(action_class, "action_require", [])
is_default:bool = getattr(action_class, "default", False)
if action_name and action_description:
# 创建动作信息字典
action_info = {
"description": action_description,
"parameters": action_parameters,
"require": action_require
}
# 注册2
print("注册2")
print(action_info)
# 添加到所有已注册的动作
self._registered_actions[action_name] = action_info
# 添加到默认动作(如果是默认动作)
if is_default:
self._default_actions[action_name] = action_info
logger.info(f"所有注册动作: {list(self._registered_actions.keys())}")
logger.info(f"默认动作: {list(self._default_actions.keys())}")
# for action_name, action_info in self._default_actions.items():
# logger.info(f"动作名称: {action_name}, 动作信息: {action_info}")
except Exception as e:
logger.error(f"加载已注册动作失败: {e}")
cls._action_handlers[action_name] = handler_class
logger.info(f"已注册动作处理器: {action_name} -> {handler_class.__name__}")
@classmethod
def create_action(
cls,
self,
action_name: str,
action_data: dict,
reasoning: str,
@@ -85,81 +125,163 @@ class ActionFactory:
Returns:
Optional[BaseAction]: 创建的动作处理器实例如果动作名称未注册则返回None
"""
handler_class = cls._action_handlers.get(action_name)
# 检查动作是否在当前使用的动作集中
if action_name not in self._using_actions:
logger.warning(f"当前不可用的动作类型: {action_name}")
return None
handler_class = _ACTION_REGISTRY.get(action_name)
if not handler_class:
logger.warning(f"未注册的动作类型: {action_name}")
return None
try:
if action_name == "reply":
return handler_class(
action_name=action_name,
action_data=action_data,
reasoning=reasoning,
cycle_timers=cycle_timers,
thinking_id=thinking_id,
observations=observations,
expressor=expressor,
chat_stream=chat_stream,
current_cycle=current_cycle,
log_prefix=log_prefix,
)
elif action_name == "no_reply":
return handler_class(
action_name=action_name,
action_data=action_data,
reasoning=reasoning,
cycle_timers=cycle_timers,
thinking_id=thinking_id,
observations=observations,
on_consecutive_no_reply_callback=on_consecutive_no_reply_callback,
current_cycle=current_cycle,
log_prefix=log_prefix,
total_no_reply_count=total_no_reply_count,
total_waiting_time=total_waiting_time,
shutting_down=shutting_down,
)
else:
# 对于未来可能添加的其他动作类型,可以在这里扩展
logger.warning(f"未实现的动作处理逻辑: {action_name}")
return None
# 创建动作实例并传递所有必要参数
instance = handler_class(
action_name=action_name,
action_data=action_data,
reasoning=reasoning,
cycle_timers=cycle_timers,
thinking_id=thinking_id,
observations=observations,
on_consecutive_no_reply_callback=on_consecutive_no_reply_callback,
current_cycle=current_cycle,
log_prefix=log_prefix,
total_no_reply_count=total_no_reply_count,
total_waiting_time=total_waiting_time,
shutting_down=shutting_down,
expressor=expressor,
chat_stream=chat_stream,
)
return instance
except Exception as e:
logger.error(f"创建动作处理器实例失败: {e}")
return None
@classmethod
def get_available_actions(cls) -> Dict[str, str]:
"""获取当前可用的动作集"""
return cls._available_actions.copy()
def get_registered_actions(self) -> Dict[str, ActionInfo]:
"""获取所有已注册的动作集"""
return self._registered_actions.copy()
@classmethod
def add_action(cls, action_name: str, description: str) -> bool:
"""添加新的动作"""
if action_name in cls._available_actions:
def get_default_actions(self) -> Dict[str, ActionInfo]:
"""获取默认动作集"""
return self._default_actions.copy()
def get_using_actions(self) -> Dict[str, ActionInfo]:
"""获取当前正在使用的动作集"""
return self._using_actions.copy()
def add_action_to_using(self, action_name: str) -> bool:
"""
添加已注册的动作到当前使用的动作集
Args:
action_name: 动作名称
Returns:
bool: 添加是否成功
"""
if action_name not in self._registered_actions:
logger.warning(f"添加失败: 动作 {action_name} 未注册")
return False
cls._available_actions[action_name] = description
if action_name in self._using_actions:
logger.info(f"动作 {action_name} 已经在使用中")
return True
self._using_actions[action_name] = self._registered_actions[action_name]
logger.info(f"添加动作 {action_name} 到使用集")
return True
@classmethod
def remove_action(cls, action_name: str) -> bool:
"""移除指定动作"""
if action_name not in cls._available_actions:
def remove_action_from_using(self, action_name: str) -> bool:
"""
从当前使用的动作集中移除指定动作
Args:
action_name: 动作名称
Returns:
bool: 移除是否成功
"""
if action_name not in self._using_actions:
logger.warning(f"移除失败: 动作 {action_name} 不在当前使用的动作集中")
return False
del cls._available_actions[action_name]
del self._using_actions[action_name]
logger.info(f"已从使用集中移除动作 {action_name}")
return True
@classmethod
def temporarily_remove_actions(cls, actions_to_remove: List[str]) -> None:
"""临时移除指定动作,备份原始动作集"""
if cls._original_actions_backup is None:
cls._original_actions_backup = cls._available_actions.copy()
def add_action(self, action_name: str, description: str, parameters: Dict = None, require: List = None) -> bool:
"""
添加新的动作到注册集
Args:
action_name: 动作名称
description: 动作描述
parameters: 动作参数定义,默认为空字典
require: 动作依赖项,默认为空列表
Returns:
bool: 添加是否成功
"""
if action_name in self._registered_actions:
return False
if parameters is None:
parameters = {}
if require is None:
require = []
action_info = {
"description": description,
"parameters": parameters,
"require": require
}
self._registered_actions[action_name] = action_info
return True
def remove_action(self, action_name: str) -> bool:
"""从注册集移除指定动作"""
if action_name not in self._registered_actions:
return False
del self._registered_actions[action_name]
# 如果在使用集中也存在,一并移除
if action_name in self._using_actions:
del self._using_actions[action_name]
return True
def temporarily_remove_actions(self, actions_to_remove: List[str]) -> None:
"""临时移除使用集中的指定动作,备份原始使用集"""
if self._original_actions_backup is None:
self._original_actions_backup = self._using_actions.copy()
for name in actions_to_remove:
cls._available_actions.pop(name, None)
self._using_actions.pop(name, None)
@classmethod
def restore_actions(cls) -> None:
"""恢复之前备份的原始动作集"""
if cls._original_actions_backup is not None:
cls._available_actions = cls._original_actions_backup.copy()
cls._original_actions_backup = None
def restore_actions(self) -> None:
"""恢复之前备份的原始使用集"""
if self._original_actions_backup is not None:
self._using_actions = self._original_actions_backup.copy()
self._original_actions_backup = None
def restore_default_actions(self) -> None:
"""恢复默认动作集到使用集"""
self._using_actions = self._default_actions.copy()
self._original_actions_backup = None
def get_action(self, action_name: str) -> Optional[Type[BaseAction]]:
"""
获取指定动作的处理器类
Args:
action_name: 动作名称
Returns:
Optional[Type[BaseAction]]: 动作处理器类如果不存在则返回None
"""
return _ACTION_REGISTRY.get(action_name)
# 创建全局实例
ActionFactory = ActionManager()

View File

@@ -1,72 +0,0 @@
from typing import List, Optional, Dict
# 默认动作定义
DEFAULT_ACTIONS = {"no_reply": "不操作,继续浏览", "reply": "表达想法,可以只包含文本、表情或两者都有"}
class ActionManager:
"""动作管理器:控制每次决策可以使用的动作"""
def __init__(self):
# 初始化为新的默认动作集
self._available_actions: Dict[str, str] = DEFAULT_ACTIONS.copy()
self._original_actions_backup: Optional[Dict[str, str]] = None
def get_available_actions(self) -> Dict[str, str]:
"""获取当前可用的动作集"""
return self._available_actions.copy() # 返回副本以防外部修改
def add_action(self, action_name: str, description: str) -> bool:
"""
添加新的动作
参数:
action_name: 动作名称
description: 动作描述
返回:
bool: 是否添加成功
"""
if action_name in self._available_actions:
return False
self._available_actions[action_name] = description
return True
def remove_action(self, action_name: str) -> bool:
"""
移除指定动作
参数:
action_name: 动作名称
返回:
bool: 是否移除成功
"""
if action_name not in self._available_actions:
return False
del self._available_actions[action_name]
return True
def temporarily_remove_actions(self, actions_to_remove: List[str]):
"""
临时移除指定的动作,备份原始动作集。
如果已经有备份,则不重复备份。
"""
if self._original_actions_backup is None:
self._original_actions_backup = self._available_actions.copy()
actions_actually_removed = []
for action_name in actions_to_remove:
if action_name in self._available_actions:
del self._available_actions[action_name]
actions_actually_removed.append(action_name)
# logger.debug(f"临时移除了动作: {actions_actually_removed}") # 可选日志
def restore_actions(self):
"""
恢复之前备份的原始动作集。
"""
if self._original_actions_backup is not None:
self._available_actions = self._original_actions_backup.copy()
self._original_actions_backup = None
# logger.debug("恢复了原始动作集") # 可选日志

View File

@@ -1,18 +1,57 @@
from abc import ABC, abstractmethod
from typing import Tuple
from typing import Tuple, Dict, Type
from src.common.logger_manager import get_logger
logger = get_logger("base_action")
# 全局动作注册表
_ACTION_REGISTRY: Dict[str, Type["BaseAction"]] = {}
_DEFAULT_ACTIONS: Dict[str, str] = {}
def register_action(cls):
"""
动作注册装饰器
用法:
@register_action
class MyAction(BaseAction):
action_name = "my_action"
action_description = "我的动作"
...
"""
# 检查类是否有必要的属性
if not hasattr(cls, "action_name") or not hasattr(cls, "action_description"):
logger.error(f"动作类 {cls.__name__} 缺少必要的属性: action_name 或 action_description")
return cls
action_name = getattr(cls, "action_name")
action_description = getattr(cls, "action_description")
is_default = getattr(cls, "default", False)
if not action_name or not action_description:
logger.error(f"动作类 {cls.__name__} 的 action_name 或 action_description 为空")
return cls
# 将动作类注册到全局注册表
_ACTION_REGISTRY[action_name] = cls
# 如果是默认动作,添加到默认动作集
if is_default:
_DEFAULT_ACTIONS[action_name] = action_description
logger.info(f"已注册动作: {action_name} -> {cls.__name__},默认: {is_default}")
return cls
class BaseAction(ABC):
"""动作处理基类接口
"""动作基类接口
所有具体的动作处理类都应该继承这个基类并实现handle_action方法。
所有具体的动作类都应该继承这个基类并实现handle_action方法。
"""
def __init__(self, action_name: str, action_data: dict, reasoning: str, cycle_timers: dict, thinking_id: str):
"""初始化动作处理器
def __init__(self, action_data: dict, reasoning: str, cycle_timers: dict, thinking_id: str):
"""初始化动作
Args:
action_name: 动作名称
@@ -21,7 +60,15 @@ class BaseAction(ABC):
cycle_timers: 计时器字典
thinking_id: 思考ID
"""
self.action_name = action_name
#每个动作必须实现
self.action_name:str = "base_action"
self.action_description:str = "基础动作"
self.action_parameters:dict = {}
self.action_require:list[str] = []
self.default:bool = False
self.action_data = action_data
self.reasoning = reasoning
self.cycle_timers = cycle_timers

View File

@@ -2,7 +2,7 @@ import asyncio
import traceback
from src.common.logger_manager import get_logger
from src.chat.utils.timer_calculator import Timer
from src.chat.focus_chat.planners.actions.base_action import BaseAction
from src.chat.focus_chat.planners.actions.base_action import BaseAction, register_action
from typing import Tuple, List, Callable, Coroutine
from src.chat.heart_flow.observation.observation import Observation
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
@@ -16,15 +16,25 @@ WAITING_TIME_THRESHOLD = 300 # 等待新消息时间阈值,单位秒
CONSECUTIVE_NO_REPLY_THRESHOLD = 3 # 连续不回复的阈值
@register_action
class NoReplyAction(BaseAction):
"""不回复动作处理类
处理决定不回复的动作。
"""
action_name = "no_reply"
action_description = "不回复"
action_parameters = {}
action_require = [
"话题无关/无聊/不感兴趣/不懂",
"最后一条消息是你自己发的且无人回应你",
"你发送了太多消息,且无人回复"
]
default = True
def __init__(
self,
action_name: str,
action_data: dict,
reasoning: str,
cycle_timers: dict,
@@ -36,6 +46,7 @@ class NoReplyAction(BaseAction):
total_no_reply_count: int = 0,
total_waiting_time: float = 0.0,
shutting_down: bool = False,
**kwargs
):
"""初始化不回复动作处理器
@@ -53,7 +64,7 @@ class NoReplyAction(BaseAction):
total_waiting_time: 累计等待时间
shutting_down: 是否正在关闭
"""
super().__init__(action_name, action_data, reasoning, cycle_timers, thinking_id)
super().__init__(action_data, reasoning, cycle_timers, thinking_id)
self.observations = observations
self.on_consecutive_no_reply_callback = on_consecutive_no_reply_callback
self._current_cycle = current_cycle

View File

@@ -1,22 +1,47 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from src.common.logger_manager import get_logger
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.chat.focus_chat.hfc_utils import create_empty_anchor_message
from src.chat.focus_chat.planners.actions.base_action import BaseAction
from typing import Tuple, List
from src.chat.focus_chat.heartFC_Cycleinfo import CycleDetail
from src.chat.message_receive.chat_stream import ChatStream
from src.chat.utils.timer_calculator import Timer
from src.chat.focus_chat.planners.actions.base_action import BaseAction, register_action
from typing import Tuple, List, Optional
from src.chat.heart_flow.observation.observation import Observation
from src.chat.focus_chat.expressors.default_expressor import DefaultExpressor
from src.chat.message_receive.chat_stream import ChatStream
from src.chat.focus_chat.heartFC_Cycleinfo import CycleDetail
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.chat.focus_chat.hfc_utils import create_empty_anchor_message
logger = get_logger("action_taken")
@register_action
class ReplyAction(BaseAction):
"""回复动作处理类
处理发送回复消息的动作,包括文本和表情
处理构建和发送消息回复的动作
"""
action_name:str = "reply"
action_description:str = "表达想法,可以只包含文本、表情或两者都有"
action_parameters:dict[str:str] = {
"text": "你想要表达的内容(可选)",
"emojis": "描述当前使用表情包的场景(可选)",
"target": "你想要回复的原始文本内容(非必须,仅文本,不包含发送者)(可选)",
}
action_require:list[str] = [
"有实质性内容需要表达",
"有人提到你,但你还没有回应他",
"在合适的时候添加表情(不要总是添加)",
"如果你要回复特定某人的某句话或者你想回复较早的消息请在target中指定那句话的原始文本",
"除非有明确的回复目标如果选择了target不用特别提到某个人的人名",
"一次只回复一个人,一次只回复一个话题,突出重点",
"如果是自己发的消息想继续,需自然衔接",
"避免重复或评价自己的发言,不要和自己聊天",
"注意:回复尽量简短一些。可以参考贴吧,知乎和微博的回复风格,回复不要浮夸,不要用夸张修辞,平淡一些。"
]
default = True
def __init__(
self,
action_name: str,
@@ -29,12 +54,13 @@ class ReplyAction(BaseAction):
chat_stream: ChatStream,
current_cycle: CycleDetail,
log_prefix: str,
**kwargs
):
"""初始化回复动作处理器
Args:
action_name: 动作名称
action_data: 动作数据
action_data: 动作数据,包含 message, emojis, target 等
reasoning: 执行该动作的理由
cycle_timers: 计时器字典
thinking_id: 思考ID
@@ -44,16 +70,31 @@ class ReplyAction(BaseAction):
current_cycle: 当前循环信息
log_prefix: 日志前缀
"""
super().__init__(action_name, action_data, reasoning, cycle_timers, thinking_id)
super().__init__(action_data, reasoning, cycle_timers, thinking_id)
self.observations = observations
self.expressor = expressor
self.chat_stream = chat_stream
self._current_cycle = current_cycle
self.log_prefix = log_prefix
self.total_no_reply_count = 0
self.total_waiting_time = 0.0
async def handle_action(self) -> Tuple[bool, str]:
"""
处理回复动作
Returns:
Tuple[bool, str]: (是否执行成功, 回复文本)
"""
# 注意: 此处可能会使用不同的expressor实现根据任务类型切换不同的回复策略
return await self._handle_reply(
reasoning=self.reasoning,
reply_data=self.action_data,
cycle_timers=self.cycle_timers,
thinking_id=self.thinking_id
)
async def _handle_reply(
self, reasoning: str, reply_data: dict, cycle_timers: dict, thinking_id: str
) -> tuple[bool, str]:
"""
处理统一的回复动作 - 可包含文本和表情,顺序任意
@@ -63,9 +104,6 @@ class ReplyAction(BaseAction):
"target": "锚定消息", # 锚定消息的文本内容
"emojis": "微笑" # 表情关键词列表(可选)
}
Returns:
Tuple[bool, str]: (是否执行成功, 回复文本)
"""
# 重置连续不回复计数器
self.total_no_reply_count = 0
@@ -73,7 +111,7 @@ class ReplyAction(BaseAction):
# 从聊天观察获取锚定消息
observations: ChattingObservation = self.observations[0]
anchor_message = observations.serch_message_by_text(self.action_data["target"])
anchor_message = observations.serch_message_by_text(reply_data["target"])
# 如果没有找到锚点消息,创建一个占位符
if not anchor_message:
@@ -85,11 +123,11 @@ class ReplyAction(BaseAction):
anchor_message.update_chat_stream(self.chat_stream)
success, reply_set = await self.expressor.deal_reply(
cycle_timers=self.cycle_timers,
action_data=self.action_data,
cycle_timers=cycle_timers,
action_data=reply_data,
anchor_message=anchor_message,
reasoning=self.reasoning,
thinking_id=self.thinking_id,
reasoning=reasoning,
thinking_id=thinking_id,
)
reply_text = ""

View File

@@ -1,6 +1,6 @@
import json # <--- 确保导入 json
import traceback
from typing import List, Dict, Any
from typing import List, Dict, Any, Optional
from rich.traceback import install
from src.chat.models.utils_model import LLMRequest
from src.config.config import global_config
@@ -10,16 +10,57 @@ from src.chat.focus_chat.info.obs_info import ObsInfo
from src.chat.focus_chat.info.cycle_info import CycleInfo
from src.chat.focus_chat.info.mind_info import MindInfo
from src.chat.focus_chat.info.structured_info import StructuredInfo
from src.chat.focus_chat.planners.action_factory import ActionFactory
from src.common.logger_manager import get_logger
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.individuality.individuality import Individuality
from src.chat.focus_chat.planners.action_factory import ActionManager
from src.chat.focus_chat.planners.action_factory import ActionInfo
logger = get_logger("planner")
install(extra_lines=3)
def init_prompt():
Prompt(
"""你的名字是{bot_name},{prompt_personality}{chat_context_description}。需要基于以下信息决定如何参与对话:
{chat_content_block}
{mind_info_block}
{cycle_info_block}
请综合分析聊天内容和你看到的新消息参考聊天规划选择合适的action:
{action_options_text}
你必须从上面列出的可用action中选择一个并说明原因。
你的决策必须以严格的 JSON 格式输出,且仅包含 JSON 内容,不要有任何其他文字或解释。
请你以下面格式输出你选择的action
{{
"action": "action_name",
"reasoning": "你的决策理由",
"参数1": "参数1的值",
"参数2": "参数2的值",
"参数3": "参数3的值",
...
}}
请输出你的决策 JSON""",
"planner_prompt",)
Prompt(
"""
action_name: {action_name}
描述:{action_description}
参数:
{action_parameters}
动作要求:
{action_require}
""",
"action_prompt",
)
class ActionPlanner:
def __init__(self, log_prefix: str):
def __init__(self, log_prefix: str, action_manager: ActionManager):
self.log_prefix = log_prefix
# LLM规划器配置
self.planner_llm = LLMRequest(
@@ -27,6 +68,8 @@ class ActionPlanner:
max_tokens=1000,
request_type="action_planning", # 用于动作规划
)
self.action_manager = action_manager
async def plan(self, all_plan_info: List[InfoBase], cycle_timers: dict) -> Dict[str, Any]:
"""
@@ -62,16 +105,15 @@ class ActionPlanner:
logger.debug(f"{self.log_prefix} 结构化信息: {info}")
structured_info = info.get_data()
# 获取我们将传递给 prompt 构建器和用于验证的当前可用动作
current_available_actions = ActionFactory.get_available_actions()
current_available_actions = self.action_manager.get_using_actions()
# --- 构建提示词 (调用修改后的 PromptBuilder 方法) ---
prompt = await prompt_builder.build_planner_prompt(
prompt = await self.build_planner_prompt(
is_group_chat=is_group_chat, # <-- Pass HFC state
chat_target_info=None,
observed_messages_str=observed_messages_str, # <-- Pass local variable
current_mind=current_mind, # <-- Pass argument
structured_info=structured_info, # <-- Pass SubMind info
# structured_info=structured_info, # <-- Pass SubMind info
current_available_actions=current_available_actions, # <-- Pass determined actions
cycle_info=cycle_info, # <-- Pass cycle info
)
@@ -139,9 +181,9 @@ class ActionPlanner:
)
# 恢复原始动作集
ActionFactory.restore_actions()
self.action_manager.restore_actions()
logger.debug(
f"{self.log_prefix}恢复了原始动作集, 当前可用: {list(ActionFactory.get_available_actions().keys())}"
f"{self.log_prefix}恢复了原始动作集, 当前可用: {list(self.action_manager.get_using_actions().keys())}"
)
action_result = {"action_type": action, "action_data": action_data, "reasoning": reasoning}
@@ -154,3 +196,91 @@ class ActionPlanner:
# 返回结果字典
return plan_result
async def build_planner_prompt(
self,
is_group_chat: bool, # Now passed as argument
chat_target_info: Optional[dict], # Now passed as argument
observed_messages_str: str,
current_mind: Optional[str],
current_available_actions: Dict[str, ActionInfo],
cycle_info: Optional[str],
) -> str:
"""构建 Planner LLM 的提示词 (获取模板并填充数据)"""
try:
# --- Determine chat context ---
chat_context_description = "你现在正在一个群聊中"
chat_target_name = None # Only relevant for private
if not is_group_chat and chat_target_info:
chat_target_name = (
chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or "对方"
)
chat_context_description = f"你正在和 {chat_target_name} 私聊"
chat_content_block = ""
if observed_messages_str:
chat_content_block = f"聊天记录:\n{observed_messages_str}"
else:
chat_content_block = "你还未开始聊天"
mind_info_block = ""
if current_mind:
mind_info_block = f"对聊天的规划:{current_mind}"
else:
mind_info_block = "你刚参与聊天"
individuality = Individuality.get_instance()
personality_block = individuality.get_prompt(x_person=2, level=2)
action_options_block = ""
for using_actions_name, using_actions_info in current_available_actions.items():
# print(using_actions_name)
# print(using_actions_info)
# print(using_actions_info["parameters"])
# print(using_actions_info["require"])
# print(using_actions_info["description"])
using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt")
param_text = ""
for param_name, param_description in using_actions_info["parameters"].items():
param_text += f"{param_name}: {param_description}\n"
require_text = ""
for require_item in using_actions_info["require"]:
require_text += f"- {require_item}\n"
using_action_prompt = using_action_prompt.format(
action_name=using_actions_name,
action_description=using_actions_info["description"],
action_parameters=param_text,
action_require=require_text,
)
action_options_block += using_action_prompt
planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt")
prompt = planner_prompt_template.format(
bot_name=global_config.BOT_NICKNAME,
prompt_personality=personality_block,
chat_context_description=chat_context_description,
chat_content_block=chat_content_block,
mind_info_block=mind_info_block,
cycle_info_block=cycle_info,
action_options_text=action_options_block,
)
return prompt
except Exception as e:
logger.error(f"构建 Planner 提示词时出错: {e}")
logger.error(traceback.format_exc())
return "构建 Planner Prompt 时出错"
init_prompt()

View File

@@ -452,7 +452,7 @@ async def build_anonymous_messages(messages: List[Dict[str, Any]]) -> str:
reply_pattern = r"回复<([^:<>]+):([^:<>]+)>"
def reply_replacer(match):
aaa = match.group(1)
# aaa = match.group(1)
bbb = match.group(2)
anon_reply = get_anon_name(platform, bbb)
return f"回复 {anon_reply}"
@@ -463,7 +463,7 @@ async def build_anonymous_messages(messages: List[Dict[str, Any]]) -> str:
at_pattern = r"@<([^:<>]+):([^:<>]+)>"
def at_replacer(match):
aaa = match.group(1)
# aaa = match.group(1)
bbb = match.group(2)
anon_at = get_anon_name(platform, bbb)
return f"@{anon_at}"