From ba85dd76a4b03d857ef7dcc29893cf4a9471f794 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Wed, 14 May 2025 18:27:42 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E4=BD=BF=E7=94=A8action=5Fmanager?= =?UTF-8?q?=E7=BB=9F=E4=B8=80=E8=B0=83=E5=BA=A6action=EF=BC=8C=E5=8F=AF?= =?UTF-8?q?=E6=89=A9=E5=B1=95action?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../expressors/default_expressor.py | 4 +- src/chat/focus_chat/heartFC_chat.py | 12 +- .../focus_chat/heartflow_prompt_builder.py | 134 -------- .../focus_chat/planners/action_factory.py | 306 ++++++++++++------ .../focus_chat/planners/action_manager.py | 72 ----- .../planners/actions/base_action.py | 59 +++- .../planners/actions/no_reply_action.py | 17 +- .../planners/actions/reply_action.py | 76 +++-- src/chat/focus_chat/planners/planner.py | 152 ++++++++- src/chat/utils/chat_message_builder.py | 4 +- 10 files changed, 491 insertions(+), 345 deletions(-) delete mode 100644 src/chat/focus_chat/planners/action_manager.py diff --git a/src/chat/focus_chat/expressors/default_expressor.py b/src/chat/focus_chat/expressors/default_expressor.py index 13aae4a57..37c50c0dc 100644 --- a/src/chat/focus_chat/expressors/default_expressor.py +++ b/src/chat/focus_chat/expressors/default_expressor.py @@ -248,7 +248,7 @@ class DefaultExpressor: return None mark_head = False - first_bot_msg: Optional[MessageSending] = None + # first_bot_msg: Optional[MessageSending] = None reply_message_ids = [] # 记录实际发送的消息ID sent_msg_list = [] @@ -279,7 +279,7 @@ class DefaultExpressor: try: if not mark_head: mark_head = True - first_bot_msg = bot_message # 保存第一个成功发送的消息对象 + # first_bot_msg = bot_message # 保存第一个成功发送的消息对象 typing = False else: typing = True diff --git a/src/chat/focus_chat/heartFC_chat.py b/src/chat/focus_chat/heartFC_chat.py index e0dac89b9..4a28652d1 100644 --- a/src/chat/focus_chat/heartFC_chat.py +++ b/src/chat/focus_chat/heartFC_chat.py @@ -21,8 +21,8 @@ from src.chat.focus_chat.info_processors.tool_processor import ToolProcessor from src.chat.focus_chat.expressors.default_expressor import DefaultExpressor from src.chat.focus_chat.memory_activator import MemoryActivator from src.chat.focus_chat.info_processors.base_processor import BaseProcessor -from src.chat.focus_chat.planners.action_factory import ActionFactory from src.chat.focus_chat.planners.planner import ActionPlanner +from src.chat.focus_chat.planners.action_factory import ActionManager install(extra_lines=3) @@ -88,7 +88,9 @@ class HeartFChatting: self.working_observation = WorkingObservation(observe_id=self.stream_id) self.memory_activator = MemoryActivator() self.expressor = DefaultExpressor(chat_id=self.stream_id) - self.action_planner = ActionPlanner(log_prefix=self.log_prefix) + self.action_manager = ActionManager() + self.action_planner = ActionPlanner(log_prefix=self.log_prefix, action_manager=self.action_manager) + # --- 处理器列表 --- self.processors: List[BaseProcessor] = [] @@ -340,7 +342,7 @@ class HeartFChatting: parallel_end_time = time.time() total_duration = parallel_end_time - parallel_start_time logger.info(f"{self.log_prefix} 所有处理器任务全部完成,总耗时: {total_duration:.2f}秒") - logger.debug(f"{self.log_prefix} 所有信息处理器处理后的信息: {all_plan_info}") + # logger.debug(f"{self.log_prefix} 所有信息处理器处理后的信息: {all_plan_info}") return all_plan_info @@ -444,7 +446,7 @@ class HeartFChatting: """ try: # 使用工厂创建动作处理器实例 - action_handler = ActionFactory.create_action( + action_handler = self.action_manager.create_action( action_name=action, action_data=action_data, reasoning=reasoning, @@ -524,3 +526,5 @@ class HeartFChatting: if last_n is not None: history = history[-last_n:] return [cycle.to_dict() for cycle in history] + + diff --git a/src/chat/focus_chat/heartflow_prompt_builder.py b/src/chat/focus_chat/heartflow_prompt_builder.py index 3086c2716..55fb79b46 100644 --- a/src/chat/focus_chat/heartflow_prompt_builder.py +++ b/src/chat/focus_chat/heartflow_prompt_builder.py @@ -52,69 +52,11 @@ def init_prompt(): "info_from_tools", ) - # Planner提示词 - 修改为要求 JSON 输出 - Prompt( - """你的名字是{bot_name},{prompt_personality},{chat_context_description}。需要基于以下信息决定如何参与对话: -{structured_info_block} -{chat_content_block} -{mind_info_prompt} -{cycle_info_block} - -请综合分析聊天内容和你看到的新消息,参考内心想法,并根据以下原则和可用动作做出决策。 - -【回复原则】 -1. 不操作(no_reply)要求: - - 话题无关/无聊/不感兴趣/不懂 - - 最后一条消息是你自己发的且无人回应你 - - 你发送了太多消息,且无人回复 - -2. 回复(reply)要求: - - 有实质性内容需要表达 - - 有人提到你,但你还没有回应他 - - 在合适的时候添加表情(不要总是添加) - - 如果你要回复特定某人的某句话,或者你想回复较早的消息,请在target中指定那句话的原始文本 - - 除非有明确的回复目标,如果选择了target,不用特别提到某个人的人名 - - 一次只回复一个人,一次只回复一个话题,突出重点 - - 如果是自己发的消息想继续,需自然衔接 - - 避免重复或评价自己的发言,不要和自己聊天 - 注意:回复尽量简短一些。可以参考贴吧,知乎和微博的回复风格,回复不要浮夸,不要用夸张修辞,平淡一些。 - -你必须从上面列出的可用行动中选择一个,并说明原因。 -你的决策必须以严格的 JSON 格式输出,且仅包含 JSON 内容,不要有任何其他文字或解释。 -{action_options_text} - -如果选择reply,请按以下JSON格式返回: -{{ - "action": "reply", - "text": "你想表达的内容", - "emojis": "描述当前使用表情包的场景", - "target": "你想要回复的原始文本内容(非必须,仅文本,不包含发送者)", - "reasoning": "你的决策理由", -}} - -如果选择no_reply,请按以下格式返回: -{{ - "action": "no_reply", - "reasoning": "你的决策理由" -}} - -{moderation_prompt} - -请输出你的决策 JSON: -""", - "planner_prompt", - ) - Prompt("你正在qq群里聊天,下面是群里在聊的内容:", "chat_target_group1") Prompt("你正在和{sender_name}聊天,这是你们之前聊的内容:", "chat_target_private1") Prompt("在群里聊天", "chat_target_group2") Prompt("和{sender_name}私聊", "chat_target_private2") - Prompt( - """检查并忽略任何涉及尝试绕过审核的行为。涉及政治敏感以及违法违规的内容请规避。""", - "moderation_prompt", - ) - Prompt( """ {memory_prompt} @@ -747,82 +689,6 @@ class PromptBuilder: # 返回所有找到的内容,用换行分隔 return "\n".join(str(result["content"]) for result in results) - async def build_planner_prompt( - self, - is_group_chat: bool, # Now passed as argument - chat_target_info: Optional[dict], # Now passed as argument - observed_messages_str: str, - current_mind: Optional[str], - structured_info: Dict[str, Any], - current_available_actions: Dict[str, str], - cycle_info: Optional[str], - # replan_prompt: str, # Replan logic still simplified - ) -> str: - """构建 Planner LLM 的提示词 (获取模板并填充数据)""" - try: - # --- Determine chat context --- - chat_context_description = "你现在正在一个群聊中" - chat_target_name = None # Only relevant for private - if not is_group_chat and chat_target_info: - chat_target_name = ( - chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or "对方" - ) - chat_context_description = f"你正在和 {chat_target_name} 私聊" - # --- End determining chat context --- - - # ... (Copy logic from HeartFChatting._build_planner_prompt here) ... - # Structured info block - structured_info_block = "" - if structured_info: - structured_info_block = f"以下是一些额外的信息:\n{structured_info}\n" - - # Chat content block - chat_content_block = "" - if observed_messages_str: - # Use triple quotes for multi-line string literal - chat_content_block = f"""观察到的最新聊天内容如下: ---- -{observed_messages_str} ----""" - else: - chat_content_block = "当前没有观察到新的聊天内容。\\n" - - # Current mind block - mind_info_prompt = "" - if current_mind: - mind_info_prompt = f"对聊天的规划:{current_mind}" - else: - mind_info_prompt = "你刚参与聊天" - - individuality = Individuality.get_instance() - prompt_personality = individuality.get_prompt(x_person=2, level=2) - - action_options_text = "当前你可以选择的行动有:\n" - action_keys = list(current_available_actions.keys()) - for name in action_keys: - desc = current_available_actions[name] - action_options_text += f"- '{name}': {desc}\n" - - planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt") - - prompt = planner_prompt_template.format( - bot_name=global_config.BOT_NICKNAME, - prompt_personality=prompt_personality, - chat_context_description=chat_context_description, - structured_info_block=structured_info_block, - chat_content_block=chat_content_block, - mind_info_prompt=mind_info_prompt, - cycle_info_block=cycle_info, - action_options_text=action_options_text, - moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"), - ) - return prompt - - except Exception as e: - logger.error(f"[PromptBuilder] 构建 Planner 提示词时出错: {e}") - logger.error(traceback.format_exc()) - return "[构建 Planner Prompt 时出错]" - def weighted_sample_no_replacement(items, weights, k) -> list: """ diff --git a/src/chat/focus_chat/planners/action_factory.py b/src/chat/focus_chat/planners/action_factory.py index 274206de0..257156a25 100644 --- a/src/chat/focus_chat/planners/action_factory.py +++ b/src/chat/focus_chat/planners/action_factory.py @@ -1,53 +1,93 @@ -from typing import Dict, List, Optional, Callable, Coroutine, Type -from src.chat.focus_chat.planners.actions.base_action import BaseAction -from src.chat.focus_chat.planners.actions.reply_action import ReplyAction -from src.chat.focus_chat.planners.actions.no_reply_action import NoReplyAction +from typing import Dict, List, Optional, Callable, Coroutine, Type, Any, Union +import os +import importlib +from src.chat.focus_chat.planners.actions.base_action import BaseAction, _ACTION_REGISTRY, _DEFAULT_ACTIONS from src.chat.heart_flow.observation.observation import Observation from src.chat.focus_chat.expressors.default_expressor import DefaultExpressor from src.chat.message_receive.chat_stream import ChatStream from src.chat.focus_chat.heartFC_Cycleinfo import CycleDetail from src.common.logger_manager import get_logger +# 导入动作类,确保装饰器被执行 +from src.chat.focus_chat.planners.actions.reply_action import ReplyAction +from src.chat.focus_chat.planners.actions.no_reply_action import NoReplyAction + logger = get_logger("action_factory") +# 定义动作信息类型 +ActionInfo = Dict[str, Any] -class ActionFactory: + +class ActionManager: """ - 动作工厂类,用于创建各种类型的动作处理器 + 动作管理器,用于管理各种类型的动作 """ - # 注册的动作处理器类映射 - _action_handlers: Dict[str, Type[BaseAction]] = { - "reply": ReplyAction, - "no_reply": NoReplyAction, - } + def __init__(self): + """初始化动作管理器""" + # 所有注册的动作集合 + self._registered_actions: Dict[str, ActionInfo] = {} + # 当前正在使用的动作集合,默认加载默认动作 + self._using_actions: Dict[str, ActionInfo] = {} + # 临时备份原始使用中的动作 + self._original_actions_backup: Optional[Dict[str, ActionInfo]] = None + + # 默认动作集,仅作为快照,用于恢复默认 + self._default_actions: Dict[str, ActionInfo] = {} + + # 加载所有已注册动作 + self._load_registered_actions() + + # 初始化时将默认动作加载到使用中的动作 + self._using_actions = self._default_actions.copy() + + # logger.info(f"当前可用动作: {list(self._using_actions.keys())}") + # for action_name, action_info in self._using_actions.items(): + # logger.info(f"动作名称: {action_name}, 动作信息: {action_info}") + - # 可用动作集定义(原ActionManager.DEFAULT_ACTIONS) - DEFAULT_ACTIONS: Dict[str, str] = { - "no_reply": "不操作,继续浏览", - "reply": "表达想法,可以只包含文本、表情或两者都有", - } - _available_actions: Dict[str, str] = DEFAULT_ACTIONS.copy() - _original_actions_backup: Optional[Dict[str, str]] = None - - @classmethod - def register_action_handler(cls, action_name: str, handler_class: Type[BaseAction]) -> None: + def _load_registered_actions(self) -> None: """ - 注册新的动作处理器类 - - Args: - action_name: 动作名称 - handler_class: 处理器类,必须是BaseAction的子类 + 加载所有通过装饰器注册的动作 """ - if not issubclass(handler_class, BaseAction): - raise TypeError(f"{handler_class.__name__} 不是 BaseAction 的子类") + try: + # 从_ACTION_REGISTRY获取所有已注册动作 + for action_name, action_class in _ACTION_REGISTRY.items(): + # 获取动作相关信息 + action_description:str = getattr(action_class, "action_description", "") + action_parameters:dict[str:str] = getattr(action_class, "action_parameters", {}) + action_require:list[str] = getattr(action_class, "action_require", []) + is_default:bool = getattr(action_class, "default", False) + + if action_name and action_description: + # 创建动作信息字典 + action_info = { + "description": action_description, + "parameters": action_parameters, + "require": action_require + } + + # 注册2 + print("注册2") + print(action_info) + + # 添加到所有已注册的动作 + self._registered_actions[action_name] = action_info + + # 添加到默认动作(如果是默认动作) + if is_default: + self._default_actions[action_name] = action_info + + logger.info(f"所有注册动作: {list(self._registered_actions.keys())}") + logger.info(f"默认动作: {list(self._default_actions.keys())}") + # for action_name, action_info in self._default_actions.items(): + # logger.info(f"动作名称: {action_name}, 动作信息: {action_info}") + + except Exception as e: + logger.error(f"加载已注册动作失败: {e}") - cls._action_handlers[action_name] = handler_class - logger.info(f"已注册动作处理器: {action_name} -> {handler_class.__name__}") - - @classmethod def create_action( - cls, + self, action_name: str, action_data: dict, reasoning: str, @@ -85,81 +125,163 @@ class ActionFactory: Returns: Optional[BaseAction]: 创建的动作处理器实例,如果动作名称未注册则返回None """ - handler_class = cls._action_handlers.get(action_name) + # 检查动作是否在当前使用的动作集中 + if action_name not in self._using_actions: + logger.warning(f"当前不可用的动作类型: {action_name}") + return None + + handler_class = _ACTION_REGISTRY.get(action_name) if not handler_class: logger.warning(f"未注册的动作类型: {action_name}") return None try: - if action_name == "reply": - return handler_class( - action_name=action_name, - action_data=action_data, - reasoning=reasoning, - cycle_timers=cycle_timers, - thinking_id=thinking_id, - observations=observations, - expressor=expressor, - chat_stream=chat_stream, - current_cycle=current_cycle, - log_prefix=log_prefix, - ) - elif action_name == "no_reply": - return handler_class( - action_name=action_name, - action_data=action_data, - reasoning=reasoning, - cycle_timers=cycle_timers, - thinking_id=thinking_id, - observations=observations, - on_consecutive_no_reply_callback=on_consecutive_no_reply_callback, - current_cycle=current_cycle, - log_prefix=log_prefix, - total_no_reply_count=total_no_reply_count, - total_waiting_time=total_waiting_time, - shutting_down=shutting_down, - ) - else: - # 对于未来可能添加的其他动作类型,可以在这里扩展 - logger.warning(f"未实现的动作处理逻辑: {action_name}") - return None + # 创建动作实例并传递所有必要参数 + instance = handler_class( + action_name=action_name, + action_data=action_data, + reasoning=reasoning, + cycle_timers=cycle_timers, + thinking_id=thinking_id, + observations=observations, + on_consecutive_no_reply_callback=on_consecutive_no_reply_callback, + current_cycle=current_cycle, + log_prefix=log_prefix, + total_no_reply_count=total_no_reply_count, + total_waiting_time=total_waiting_time, + shutting_down=shutting_down, + expressor=expressor, + chat_stream=chat_stream, + ) + + return instance except Exception as e: logger.error(f"创建动作处理器实例失败: {e}") return None - @classmethod - def get_available_actions(cls) -> Dict[str, str]: - """获取当前可用的动作集""" - return cls._available_actions.copy() + def get_registered_actions(self) -> Dict[str, ActionInfo]: + """获取所有已注册的动作集""" + return self._registered_actions.copy() - @classmethod - def add_action(cls, action_name: str, description: str) -> bool: - """添加新的动作""" - if action_name in cls._available_actions: + def get_default_actions(self) -> Dict[str, ActionInfo]: + """获取默认动作集""" + return self._default_actions.copy() + + def get_using_actions(self) -> Dict[str, ActionInfo]: + """获取当前正在使用的动作集""" + return self._using_actions.copy() + + def add_action_to_using(self, action_name: str) -> bool: + """ + 添加已注册的动作到当前使用的动作集 + + Args: + action_name: 动作名称 + + Returns: + bool: 添加是否成功 + """ + if action_name not in self._registered_actions: + logger.warning(f"添加失败: 动作 {action_name} 未注册") return False - cls._available_actions[action_name] = description + + if action_name in self._using_actions: + logger.info(f"动作 {action_name} 已经在使用中") + return True + + self._using_actions[action_name] = self._registered_actions[action_name] + logger.info(f"添加动作 {action_name} 到使用集") return True - @classmethod - def remove_action(cls, action_name: str) -> bool: - """移除指定动作""" - if action_name not in cls._available_actions: + def remove_action_from_using(self, action_name: str) -> bool: + """ + 从当前使用的动作集中移除指定动作 + + Args: + action_name: 动作名称 + + Returns: + bool: 移除是否成功 + """ + if action_name not in self._using_actions: + logger.warning(f"移除失败: 动作 {action_name} 不在当前使用的动作集中") return False - del cls._available_actions[action_name] + + del self._using_actions[action_name] + logger.info(f"已从使用集中移除动作 {action_name}") return True - @classmethod - def temporarily_remove_actions(cls, actions_to_remove: List[str]) -> None: - """临时移除指定动作,备份原始动作集""" - if cls._original_actions_backup is None: - cls._original_actions_backup = cls._available_actions.copy() + def add_action(self, action_name: str, description: str, parameters: Dict = None, require: List = None) -> bool: + """ + 添加新的动作到注册集 + + Args: + action_name: 动作名称 + description: 动作描述 + parameters: 动作参数定义,默认为空字典 + require: 动作依赖项,默认为空列表 + + Returns: + bool: 添加是否成功 + """ + if action_name in self._registered_actions: + return False + + if parameters is None: + parameters = {} + if require is None: + require = [] + + action_info = { + "description": description, + "parameters": parameters, + "require": require + } + + self._registered_actions[action_name] = action_info + return True + + def remove_action(self, action_name: str) -> bool: + """从注册集移除指定动作""" + if action_name not in self._registered_actions: + return False + del self._registered_actions[action_name] + # 如果在使用集中也存在,一并移除 + if action_name in self._using_actions: + del self._using_actions[action_name] + return True + + def temporarily_remove_actions(self, actions_to_remove: List[str]) -> None: + """临时移除使用集中的指定动作,备份原始使用集""" + if self._original_actions_backup is None: + self._original_actions_backup = self._using_actions.copy() for name in actions_to_remove: - cls._available_actions.pop(name, None) + self._using_actions.pop(name, None) - @classmethod - def restore_actions(cls) -> None: - """恢复之前备份的原始动作集""" - if cls._original_actions_backup is not None: - cls._available_actions = cls._original_actions_backup.copy() - cls._original_actions_backup = None + def restore_actions(self) -> None: + """恢复之前备份的原始使用集""" + if self._original_actions_backup is not None: + self._using_actions = self._original_actions_backup.copy() + self._original_actions_backup = None + + def restore_default_actions(self) -> None: + """恢复默认动作集到使用集""" + self._using_actions = self._default_actions.copy() + self._original_actions_backup = None + + def get_action(self, action_name: str) -> Optional[Type[BaseAction]]: + """ + 获取指定动作的处理器类 + + Args: + action_name: 动作名称 + + Returns: + Optional[Type[BaseAction]]: 动作处理器类,如果不存在则返回None + """ + return _ACTION_REGISTRY.get(action_name) + + +# 创建全局实例 +ActionFactory = ActionManager() diff --git a/src/chat/focus_chat/planners/action_manager.py b/src/chat/focus_chat/planners/action_manager.py deleted file mode 100644 index 782c60973..000000000 --- a/src/chat/focus_chat/planners/action_manager.py +++ /dev/null @@ -1,72 +0,0 @@ -from typing import List, Optional, Dict - -# 默认动作定义 -DEFAULT_ACTIONS = {"no_reply": "不操作,继续浏览", "reply": "表达想法,可以只包含文本、表情或两者都有"} - - -class ActionManager: - """动作管理器:控制每次决策可以使用的动作""" - - def __init__(self): - # 初始化为新的默认动作集 - self._available_actions: Dict[str, str] = DEFAULT_ACTIONS.copy() - self._original_actions_backup: Optional[Dict[str, str]] = None - - def get_available_actions(self) -> Dict[str, str]: - """获取当前可用的动作集""" - return self._available_actions.copy() # 返回副本以防外部修改 - - def add_action(self, action_name: str, description: str) -> bool: - """ - 添加新的动作 - - 参数: - action_name: 动作名称 - description: 动作描述 - - 返回: - bool: 是否添加成功 - """ - if action_name in self._available_actions: - return False - self._available_actions[action_name] = description - return True - - def remove_action(self, action_name: str) -> bool: - """ - 移除指定动作 - - 参数: - action_name: 动作名称 - - 返回: - bool: 是否移除成功 - """ - if action_name not in self._available_actions: - return False - del self._available_actions[action_name] - return True - - def temporarily_remove_actions(self, actions_to_remove: List[str]): - """ - 临时移除指定的动作,备份原始动作集。 - 如果已经有备份,则不重复备份。 - """ - if self._original_actions_backup is None: - self._original_actions_backup = self._available_actions.copy() - - actions_actually_removed = [] - for action_name in actions_to_remove: - if action_name in self._available_actions: - del self._available_actions[action_name] - actions_actually_removed.append(action_name) - # logger.debug(f"临时移除了动作: {actions_actually_removed}") # 可选日志 - - def restore_actions(self): - """ - 恢复之前备份的原始动作集。 - """ - if self._original_actions_backup is not None: - self._available_actions = self._original_actions_backup.copy() - self._original_actions_backup = None - # logger.debug("恢复了原始动作集") # 可选日志 diff --git a/src/chat/focus_chat/planners/actions/base_action.py b/src/chat/focus_chat/planners/actions/base_action.py index 43d12eff9..7c77c300c 100644 --- a/src/chat/focus_chat/planners/actions/base_action.py +++ b/src/chat/focus_chat/planners/actions/base_action.py @@ -1,18 +1,57 @@ from abc import ABC, abstractmethod -from typing import Tuple +from typing import Tuple, Dict, Type from src.common.logger_manager import get_logger logger = get_logger("base_action") +# 全局动作注册表 +_ACTION_REGISTRY: Dict[str, Type["BaseAction"]] = {} +_DEFAULT_ACTIONS: Dict[str, str] = {} + + +def register_action(cls): + """ + 动作注册装饰器 + + 用法: + @register_action + class MyAction(BaseAction): + action_name = "my_action" + action_description = "我的动作" + ... + """ + # 检查类是否有必要的属性 + if not hasattr(cls, "action_name") or not hasattr(cls, "action_description"): + logger.error(f"动作类 {cls.__name__} 缺少必要的属性: action_name 或 action_description") + return cls + + action_name = getattr(cls, "action_name") + action_description = getattr(cls, "action_description") + is_default = getattr(cls, "default", False) + + if not action_name or not action_description: + logger.error(f"动作类 {cls.__name__} 的 action_name 或 action_description 为空") + return cls + + # 将动作类注册到全局注册表 + _ACTION_REGISTRY[action_name] = cls + + # 如果是默认动作,添加到默认动作集 + if is_default: + _DEFAULT_ACTIONS[action_name] = action_description + + logger.info(f"已注册动作: {action_name} -> {cls.__name__},默认: {is_default}") + return cls + class BaseAction(ABC): - """动作处理基类接口 + """动作基类接口 - 所有具体的动作处理类都应该继承这个基类,并实现handle_action方法。 + 所有具体的动作类都应该继承这个基类,并实现handle_action方法。 """ - def __init__(self, action_name: str, action_data: dict, reasoning: str, cycle_timers: dict, thinking_id: str): - """初始化动作处理器 + def __init__(self, action_data: dict, reasoning: str, cycle_timers: dict, thinking_id: str): + """初始化动作 Args: action_name: 动作名称 @@ -21,7 +60,15 @@ class BaseAction(ABC): cycle_timers: 计时器字典 thinking_id: 思考ID """ - self.action_name = action_name + #每个动作必须实现 + self.action_name:str = "base_action" + self.action_description:str = "基础动作" + self.action_parameters:dict = {} + self.action_require:list[str] = [] + + self.default:bool = False + + self.action_data = action_data self.reasoning = reasoning self.cycle_timers = cycle_timers diff --git a/src/chat/focus_chat/planners/actions/no_reply_action.py b/src/chat/focus_chat/planners/actions/no_reply_action.py index 01167bcb4..a29812c7a 100644 --- a/src/chat/focus_chat/planners/actions/no_reply_action.py +++ b/src/chat/focus_chat/planners/actions/no_reply_action.py @@ -2,7 +2,7 @@ import asyncio import traceback from src.common.logger_manager import get_logger from src.chat.utils.timer_calculator import Timer -from src.chat.focus_chat.planners.actions.base_action import BaseAction +from src.chat.focus_chat.planners.actions.base_action import BaseAction, register_action from typing import Tuple, List, Callable, Coroutine from src.chat.heart_flow.observation.observation import Observation from src.chat.heart_flow.observation.chatting_observation import ChattingObservation @@ -16,15 +16,25 @@ WAITING_TIME_THRESHOLD = 300 # 等待新消息时间阈值,单位秒 CONSECUTIVE_NO_REPLY_THRESHOLD = 3 # 连续不回复的阈值 +@register_action class NoReplyAction(BaseAction): """不回复动作处理类 处理决定不回复的动作。 """ + action_name = "no_reply" + action_description = "不回复" + action_parameters = {} + action_require = [ + "话题无关/无聊/不感兴趣/不懂", + "最后一条消息是你自己发的且无人回应你", + "你发送了太多消息,且无人回复" + ] + default = True + def __init__( self, - action_name: str, action_data: dict, reasoning: str, cycle_timers: dict, @@ -36,6 +46,7 @@ class NoReplyAction(BaseAction): total_no_reply_count: int = 0, total_waiting_time: float = 0.0, shutting_down: bool = False, + **kwargs ): """初始化不回复动作处理器 @@ -53,7 +64,7 @@ class NoReplyAction(BaseAction): total_waiting_time: 累计等待时间 shutting_down: 是否正在关闭 """ - super().__init__(action_name, action_data, reasoning, cycle_timers, thinking_id) + super().__init__(action_data, reasoning, cycle_timers, thinking_id) self.observations = observations self.on_consecutive_no_reply_callback = on_consecutive_no_reply_callback self._current_cycle = current_cycle diff --git a/src/chat/focus_chat/planners/actions/reply_action.py b/src/chat/focus_chat/planners/actions/reply_action.py index 3198847df..7b2e88fa0 100644 --- a/src/chat/focus_chat/planners/actions/reply_action.py +++ b/src/chat/focus_chat/planners/actions/reply_action.py @@ -1,22 +1,47 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + from src.common.logger_manager import get_logger -from src.chat.heart_flow.observation.chatting_observation import ChattingObservation -from src.chat.focus_chat.hfc_utils import create_empty_anchor_message -from src.chat.focus_chat.planners.actions.base_action import BaseAction -from typing import Tuple, List -from src.chat.focus_chat.heartFC_Cycleinfo import CycleDetail -from src.chat.message_receive.chat_stream import ChatStream +from src.chat.utils.timer_calculator import Timer +from src.chat.focus_chat.planners.actions.base_action import BaseAction, register_action +from typing import Tuple, List, Optional from src.chat.heart_flow.observation.observation import Observation from src.chat.focus_chat.expressors.default_expressor import DefaultExpressor +from src.chat.message_receive.chat_stream import ChatStream +from src.chat.focus_chat.heartFC_Cycleinfo import CycleDetail +from src.chat.heart_flow.observation.chatting_observation import ChattingObservation +from src.chat.focus_chat.hfc_utils import create_empty_anchor_message logger = get_logger("action_taken") +@register_action class ReplyAction(BaseAction): """回复动作处理类 - 处理发送回复消息的动作,包括文本和表情。 + 处理构建和发送消息回复的动作。 """ + action_name:str = "reply" + action_description:str = "表达想法,可以只包含文本、表情或两者都有" + action_parameters:dict[str:str] = { + "text": "你想要表达的内容(可选)", + "emojis": "描述当前使用表情包的场景(可选)", + "target": "你想要回复的原始文本内容(非必须,仅文本,不包含发送者)(可选)", + } + action_require:list[str] = [ + "有实质性内容需要表达", + "有人提到你,但你还没有回应他", + "在合适的时候添加表情(不要总是添加)", + "如果你要回复特定某人的某句话,或者你想回复较早的消息,请在target中指定那句话的原始文本", + "除非有明确的回复目标,如果选择了target,不用特别提到某个人的人名", + "一次只回复一个人,一次只回复一个话题,突出重点", + "如果是自己发的消息想继续,需自然衔接", + "避免重复或评价自己的发言,不要和自己聊天", + "注意:回复尽量简短一些。可以参考贴吧,知乎和微博的回复风格,回复不要浮夸,不要用夸张修辞,平淡一些。" + ] + default = True + def __init__( self, action_name: str, @@ -29,12 +54,13 @@ class ReplyAction(BaseAction): chat_stream: ChatStream, current_cycle: CycleDetail, log_prefix: str, + **kwargs ): """初始化回复动作处理器 Args: action_name: 动作名称 - action_data: 动作数据 + action_data: 动作数据,包含 message, emojis, target 等 reasoning: 执行该动作的理由 cycle_timers: 计时器字典 thinking_id: 思考ID @@ -44,16 +70,31 @@ class ReplyAction(BaseAction): current_cycle: 当前循环信息 log_prefix: 日志前缀 """ - super().__init__(action_name, action_data, reasoning, cycle_timers, thinking_id) + super().__init__(action_data, reasoning, cycle_timers, thinking_id) self.observations = observations self.expressor = expressor self.chat_stream = chat_stream self._current_cycle = current_cycle self.log_prefix = log_prefix - self.total_no_reply_count = 0 - self.total_waiting_time = 0.0 async def handle_action(self) -> Tuple[bool, str]: + """ + 处理回复动作 + + Returns: + Tuple[bool, str]: (是否执行成功, 回复文本) + """ + # 注意: 此处可能会使用不同的expressor实现根据任务类型切换不同的回复策略 + return await self._handle_reply( + reasoning=self.reasoning, + reply_data=self.action_data, + cycle_timers=self.cycle_timers, + thinking_id=self.thinking_id + ) + + async def _handle_reply( + self, reasoning: str, reply_data: dict, cycle_timers: dict, thinking_id: str + ) -> tuple[bool, str]: """ 处理统一的回复动作 - 可包含文本和表情,顺序任意 @@ -63,9 +104,6 @@ class ReplyAction(BaseAction): "target": "锚定消息", # 锚定消息的文本内容 "emojis": "微笑" # 表情关键词列表(可选) } - - Returns: - Tuple[bool, str]: (是否执行成功, 回复文本) """ # 重置连续不回复计数器 self.total_no_reply_count = 0 @@ -73,7 +111,7 @@ class ReplyAction(BaseAction): # 从聊天观察获取锚定消息 observations: ChattingObservation = self.observations[0] - anchor_message = observations.serch_message_by_text(self.action_data["target"]) + anchor_message = observations.serch_message_by_text(reply_data["target"]) # 如果没有找到锚点消息,创建一个占位符 if not anchor_message: @@ -85,11 +123,11 @@ class ReplyAction(BaseAction): anchor_message.update_chat_stream(self.chat_stream) success, reply_set = await self.expressor.deal_reply( - cycle_timers=self.cycle_timers, - action_data=self.action_data, + cycle_timers=cycle_timers, + action_data=reply_data, anchor_message=anchor_message, - reasoning=self.reasoning, - thinking_id=self.thinking_id, + reasoning=reasoning, + thinking_id=thinking_id, ) reply_text = "" diff --git a/src/chat/focus_chat/planners/planner.py b/src/chat/focus_chat/planners/planner.py index 3303afaac..bb87e1da7 100644 --- a/src/chat/focus_chat/planners/planner.py +++ b/src/chat/focus_chat/planners/planner.py @@ -1,6 +1,6 @@ import json # <--- 确保导入 json import traceback -from typing import List, Dict, Any +from typing import List, Dict, Any, Optional from rich.traceback import install from src.chat.models.utils_model import LLMRequest from src.config.config import global_config @@ -10,16 +10,57 @@ from src.chat.focus_chat.info.obs_info import ObsInfo from src.chat.focus_chat.info.cycle_info import CycleInfo from src.chat.focus_chat.info.mind_info import MindInfo from src.chat.focus_chat.info.structured_info import StructuredInfo -from src.chat.focus_chat.planners.action_factory import ActionFactory from src.common.logger_manager import get_logger - +from src.chat.utils.prompt_builder import Prompt, global_prompt_manager +from src.individuality.individuality import Individuality +from src.chat.focus_chat.planners.action_factory import ActionManager +from src.chat.focus_chat.planners.action_factory import ActionInfo logger = get_logger("planner") install(extra_lines=3) +def init_prompt(): + Prompt( + """你的名字是{bot_name},{prompt_personality},{chat_context_description}。需要基于以下信息决定如何参与对话: +{chat_content_block} +{mind_info_block} +{cycle_info_block} + +请综合分析聊天内容和你看到的新消息,参考聊天规划,选择合适的action: + +{action_options_text} + +你必须从上面列出的可用action中选择一个,并说明原因。 +你的决策必须以严格的 JSON 格式输出,且仅包含 JSON 内容,不要有任何其他文字或解释。 + +请你以下面格式输出你选择的action: +{{ + "action": "action_name", + "reasoning": "你的决策理由", + "参数1": "参数1的值", + "参数2": "参数2的值", + "参数3": "参数3的值", + ... +}} + +请输出你的决策 JSON:""", +"planner_prompt",) + + Prompt( + """ +action_name: {action_name} + 描述:{action_description} + 参数: + {action_parameters} + 动作要求: + {action_require} + """, + "action_prompt", + ) + class ActionPlanner: - def __init__(self, log_prefix: str): + def __init__(self, log_prefix: str, action_manager: ActionManager): self.log_prefix = log_prefix # LLM规划器配置 self.planner_llm = LLMRequest( @@ -27,6 +68,8 @@ class ActionPlanner: max_tokens=1000, request_type="action_planning", # 用于动作规划 ) + + self.action_manager = action_manager async def plan(self, all_plan_info: List[InfoBase], cycle_timers: dict) -> Dict[str, Any]: """ @@ -62,16 +105,15 @@ class ActionPlanner: logger.debug(f"{self.log_prefix} 结构化信息: {info}") structured_info = info.get_data() - # 获取我们将传递给 prompt 构建器和用于验证的当前可用动作 - current_available_actions = ActionFactory.get_available_actions() - + current_available_actions = self.action_manager.get_using_actions() + # --- 构建提示词 (调用修改后的 PromptBuilder 方法) --- - prompt = await prompt_builder.build_planner_prompt( + prompt = await self.build_planner_prompt( is_group_chat=is_group_chat, # <-- Pass HFC state chat_target_info=None, observed_messages_str=observed_messages_str, # <-- Pass local variable current_mind=current_mind, # <-- Pass argument - structured_info=structured_info, # <-- Pass SubMind info + # structured_info=structured_info, # <-- Pass SubMind info current_available_actions=current_available_actions, # <-- Pass determined actions cycle_info=cycle_info, # <-- Pass cycle info ) @@ -139,9 +181,9 @@ class ActionPlanner: ) # 恢复原始动作集 - ActionFactory.restore_actions() + self.action_manager.restore_actions() logger.debug( - f"{self.log_prefix}恢复了原始动作集, 当前可用: {list(ActionFactory.get_available_actions().keys())}" + f"{self.log_prefix}恢复了原始动作集, 当前可用: {list(self.action_manager.get_using_actions().keys())}" ) action_result = {"action_type": action, "action_data": action_data, "reasoning": reasoning} @@ -154,3 +196,91 @@ class ActionPlanner: # 返回结果字典 return plan_result + + + async def build_planner_prompt( + self, + is_group_chat: bool, # Now passed as argument + chat_target_info: Optional[dict], # Now passed as argument + observed_messages_str: str, + current_mind: Optional[str], + current_available_actions: Dict[str, ActionInfo], + cycle_info: Optional[str], + ) -> str: + """构建 Planner LLM 的提示词 (获取模板并填充数据)""" + try: + # --- Determine chat context --- + chat_context_description = "你现在正在一个群聊中" + chat_target_name = None # Only relevant for private + if not is_group_chat and chat_target_info: + chat_target_name = ( + chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or "对方" + ) + chat_context_description = f"你正在和 {chat_target_name} 私聊" + + + chat_content_block = "" + if observed_messages_str: + chat_content_block = f"聊天记录:\n{observed_messages_str}" + else: + chat_content_block = "你还未开始聊天" + + mind_info_block = "" + if current_mind: + mind_info_block = f"对聊天的规划:{current_mind}" + else: + mind_info_block = "你刚参与聊天" + + individuality = Individuality.get_instance() + personality_block = individuality.get_prompt(x_person=2, level=2) + + + action_options_block = "" + for using_actions_name, using_actions_info in current_available_actions.items(): + # print(using_actions_name) + # print(using_actions_info) + # print(using_actions_info["parameters"]) + # print(using_actions_info["require"]) + # print(using_actions_info["description"]) + + using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt") + + param_text = "" + for param_name, param_description in using_actions_info["parameters"].items(): + param_text += f"{param_name}: {param_description}\n" + + require_text = "" + for require_item in using_actions_info["require"]: + require_text += f"- {require_item}\n" + + using_action_prompt = using_action_prompt.format( + action_name=using_actions_name, + action_description=using_actions_info["description"], + action_parameters=param_text, + action_require=require_text, + ) + + action_options_block += using_action_prompt + + + + + planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt") + prompt = planner_prompt_template.format( + bot_name=global_config.BOT_NICKNAME, + prompt_personality=personality_block, + chat_context_description=chat_context_description, + chat_content_block=chat_content_block, + mind_info_block=mind_info_block, + cycle_info_block=cycle_info, + action_options_text=action_options_block, + ) + return prompt + + except Exception as e: + logger.error(f"构建 Planner 提示词时出错: {e}") + logger.error(traceback.format_exc()) + return "构建 Planner Prompt 时出错" + + +init_prompt() diff --git a/src/chat/utils/chat_message_builder.py b/src/chat/utils/chat_message_builder.py index a740d7192..15b1e4fc6 100644 --- a/src/chat/utils/chat_message_builder.py +++ b/src/chat/utils/chat_message_builder.py @@ -452,7 +452,7 @@ async def build_anonymous_messages(messages: List[Dict[str, Any]]) -> str: reply_pattern = r"回复<([^:<>]+):([^:<>]+)>" def reply_replacer(match): - aaa = match.group(1) + # aaa = match.group(1) bbb = match.group(2) anon_reply = get_anon_name(platform, bbb) return f"回复 {anon_reply}" @@ -463,7 +463,7 @@ async def build_anonymous_messages(messages: List[Dict[str, Any]]) -> str: at_pattern = r"@<([^:<>]+):([^:<>]+)>" def at_replacer(match): - aaa = match.group(1) + # aaa = match.group(1) bbb = match.group(2) anon_at = get_anon_name(platform, bbb) return f"@{anon_at}"