Merge pull request #1038 from A0000Xz/dev

给base_action添加了一个能指定发送类型的消息发送方法
This commit is contained in:
SengokuCola
2025-06-12 20:47:34 +08:00
committed by GitHub

View File

@@ -1,402 +1,436 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Tuple from typing import Tuple
from src.common.logger import get_logger from src.common.logger import get_logger
from src.plugin_system.apis.plugin_api import PluginAPI from src.plugin_system.apis.plugin_api import PluginAPI
from src.plugin_system.base.component_types import ActionActivationType, ChatMode, ActionInfo, ComponentType from src.plugin_system.base.component_types import ActionActivationType, ChatMode, ActionInfo, ComponentType
logger = get_logger("base_action") logger = get_logger("base_action")
class BaseAction(ABC): class BaseAction(ABC):
"""Action组件基类 """Action组件基类
Action是插件的一种组件类型用于处理聊天中的动作逻辑 Action是插件的一种组件类型用于处理聊天中的动作逻辑
子类可以通过类属性定义激活条件,这些会在实例化时转换为实例属性: 子类可以通过类属性定义激活条件,这些会在实例化时转换为实例属性:
- focus_activation_type: 专注模式激活类型 - focus_activation_type: 专注模式激活类型
- normal_activation_type: 普通模式激活类型 - normal_activation_type: 普通模式激活类型
- activation_keywords: 激活关键词列表 - activation_keywords: 激活关键词列表
- keyword_case_sensitive: 关键词是否区分大小写 - keyword_case_sensitive: 关键词是否区分大小写
- mode_enable: 启用的聊天模式 - mode_enable: 启用的聊天模式
- parallel_action: 是否允许并行执行 - parallel_action: 是否允许并行执行
- random_activation_probability: 随机激活概率 - random_activation_probability: 随机激活概率
- llm_judge_prompt: LLM判断提示词 - llm_judge_prompt: LLM判断提示词
""" """
def __init__( def __init__(
self, self,
action_data: dict, action_data: dict,
reasoning: str, reasoning: str,
cycle_timers: dict, cycle_timers: dict,
thinking_id: str, thinking_id: str,
observations: list = None, observations: list = None,
expressor=None, expressor=None,
replyer=None, replyer=None,
chat_stream=None, chat_stream=None,
log_prefix: str = "", log_prefix: str = "",
shutting_down: bool = False, shutting_down: bool = False,
plugin_config: dict = None, plugin_config: dict = None,
**kwargs, **kwargs,
): ):
"""初始化Action组件 """初始化Action组件
Args: Args:
action_data: 动作数据 action_data: 动作数据
reasoning: 执行该动作的理由 reasoning: 执行该动作的理由
cycle_timers: 计时器字典 cycle_timers: 计时器字典
thinking_id: 思考ID thinking_id: 思考ID
observations: 观察列表 observations: 观察列表
expressor: 表达器对象 expressor: 表达器对象
replyer: 回复器对象 replyer: 回复器对象
chat_stream: 聊天流对象 chat_stream: 聊天流对象
log_prefix: 日志前缀 log_prefix: 日志前缀
shutting_down: 是否正在关闭 shutting_down: 是否正在关闭
plugin_config: 插件配置字典 plugin_config: 插件配置字典
**kwargs: 其他参数 **kwargs: 其他参数
""" """
self.action_data = action_data self.action_data = action_data
self.reasoning = reasoning self.reasoning = reasoning
self.cycle_timers = cycle_timers self.cycle_timers = cycle_timers
self.thinking_id = thinking_id self.thinking_id = thinking_id
self.log_prefix = log_prefix self.log_prefix = log_prefix
self.shutting_down = shutting_down self.shutting_down = shutting_down
# 设置动作基本信息实例属性(兼容旧系统) # 设置动作基本信息实例属性(兼容旧系统)
self.action_name: str = getattr(self, "action_name", self.__class__.__name__.lower().replace("action", "")) self.action_name: str = getattr(self, "action_name", self.__class__.__name__.lower().replace("action", ""))
self.action_description: str = getattr(self, "action_description", self.__doc__ or "Action组件") self.action_description: str = getattr(self, "action_description", self.__doc__ or "Action组件")
self.action_parameters: dict = getattr(self.__class__, "action_parameters", {}).copy() self.action_parameters: dict = getattr(self.__class__, "action_parameters", {}).copy()
self.action_require: list[str] = getattr(self.__class__, "action_require", []).copy() self.action_require: list[str] = getattr(self.__class__, "action_require", []).copy()
# 设置激活类型实例属性(从类属性复制,提供默认值) # 设置激活类型实例属性(从类属性复制,提供默认值)
self.focus_activation_type: str = self._get_activation_type_value("focus_activation_type", "never") self.focus_activation_type: str = self._get_activation_type_value("focus_activation_type", "never")
self.normal_activation_type: str = self._get_activation_type_value("normal_activation_type", "never") self.normal_activation_type: str = self._get_activation_type_value("normal_activation_type", "never")
self.random_activation_probability: float = getattr(self.__class__, "random_activation_probability", 0.0) self.random_activation_probability: float = getattr(self.__class__, "random_activation_probability", 0.0)
self.llm_judge_prompt: str = getattr(self.__class__, "llm_judge_prompt", "") self.llm_judge_prompt: str = getattr(self.__class__, "llm_judge_prompt", "")
self.activation_keywords: list[str] = getattr(self.__class__, "activation_keywords", []).copy() self.activation_keywords: list[str] = getattr(self.__class__, "activation_keywords", []).copy()
self.keyword_case_sensitive: bool = getattr(self.__class__, "keyword_case_sensitive", False) self.keyword_case_sensitive: bool = getattr(self.__class__, "keyword_case_sensitive", False)
self.mode_enable: str = self._get_mode_value("mode_enable", "all") self.mode_enable: str = self._get_mode_value("mode_enable", "all")
self.parallel_action: bool = getattr(self.__class__, "parallel_action", True) self.parallel_action: bool = getattr(self.__class__, "parallel_action", True)
self.associated_types: list[str] = getattr(self.__class__, "associated_types", []).copy() self.associated_types: list[str] = getattr(self.__class__, "associated_types", []).copy()
self.enable_plugin: bool = True # 默认启用 self.enable_plugin: bool = True # 默认启用
# 创建API实例传递所有服务对象 # 创建API实例传递所有服务对象
self.api = PluginAPI( self.api = PluginAPI(
chat_stream=chat_stream or kwargs.get("chat_stream"), chat_stream=chat_stream or kwargs.get("chat_stream"),
expressor=expressor or kwargs.get("expressor"), expressor=expressor or kwargs.get("expressor"),
replyer=replyer or kwargs.get("replyer"), replyer=replyer or kwargs.get("replyer"),
observations=observations or kwargs.get("observations", []), observations=observations or kwargs.get("observations", []),
log_prefix=log_prefix, log_prefix=log_prefix,
plugin_config=plugin_config or kwargs.get("plugin_config"), plugin_config=plugin_config or kwargs.get("plugin_config"),
) )
# 设置API的action上下文 # 设置API的action上下文
self.api.set_action_context(thinking_id=thinking_id, shutting_down=shutting_down) self.api.set_action_context(thinking_id=thinking_id, shutting_down=shutting_down)
logger.debug(f"{self.log_prefix} Action组件初始化完成") logger.debug(f"{self.log_prefix} Action组件初始化完成")
def _get_activation_type_value(self, attr_name: str, default: str) -> str: def _get_activation_type_value(self, attr_name: str, default: str) -> str:
"""获取激活类型的字符串值""" """获取激活类型的字符串值"""
attr = getattr(self.__class__, attr_name, None) attr = getattr(self.__class__, attr_name, None)
if attr is None: if attr is None:
return default return default
if hasattr(attr, "value"): if hasattr(attr, "value"):
return attr.value return attr.value
return str(attr) return str(attr)
def _get_mode_value(self, attr_name: str, default: str) -> str: def _get_mode_value(self, attr_name: str, default: str) -> str:
"""获取模式的字符串值""" """获取模式的字符串值"""
attr = getattr(self.__class__, attr_name, None) attr = getattr(self.__class__, attr_name, None)
if attr is None: if attr is None:
return default return default
if hasattr(attr, "value"): if hasattr(attr, "value"):
return attr.value return attr.value
return str(attr) return str(attr)
async def send_reply(self, content: str) -> bool: async def send_reply(self, content: str) -> bool:
"""发送回复消息 """发送回复消息
Args: Args:
content: 回复内容 content: 回复内容
Returns: Returns:
bool: 是否发送成功 bool: 是否发送成功
""" """
chat_stream = self.api.get_service("chat_stream") chat_stream = self.api.get_service("chat_stream")
if not chat_stream: if not chat_stream:
logger.error(f"{self.log_prefix} 没有可用的聊天流发送回复") logger.error(f"{self.log_prefix} 没有可用的聊天流发送回复")
return False return False
if chat_stream.group_info: if chat_stream.group_info:
# 群聊 # 群聊
return await self.api.send_text_to_group( return await self.api.send_text_to_group(
text=content, group_id=str(chat_stream.group_info.group_id), platform=chat_stream.platform text=content, group_id=str(chat_stream.group_info.group_id), platform=chat_stream.platform
) )
else: else:
# 私聊 # 私聊
return await self.api.send_text_to_user( return await self.api.send_text_to_user(
text=content, user_id=str(chat_stream.user_info.user_id), platform=chat_stream.platform text=content, user_id=str(chat_stream.user_info.user_id), platform=chat_stream.platform
) )
async def send_command(self, command_name: str, args: dict = None, display_message: str = None) -> bool: async def send_type_reply(self, type: str, text: str) -> bool:
"""发送命令消息 """发送回复消息
使用和send_reply相同的方式通过MessageAPI发送命令 Args:
text: 回复内容
Args:
command_name: 命令名称 Returns:
args: 命令参数 bool: 是否发送成功
display_message: 显示消息 """
chat_stream = self.api.get_service("chat_stream")
Returns: if not chat_stream:
bool: 是否发送成功 logger.error(f"{self.log_prefix} 没有可用的聊天流发送回复")
""" return False
try:
# 构造命令数据 if chat_stream.group_info:
command_data = {"name": command_name, "args": args or {}} # 群聊
return await self.api.send_message_to_target(
# 使用send_message_to_target方法发送命令 message_type=type,
chat_stream = self.api.get_service("chat_stream") content=text,
if not chat_stream: platform=chat_stream.platform,
logger.error(f"{self.log_prefix} 没有可用的聊天流发送命令") target_id=str(chat_stream.group_info.group_id),
return False is_group=True
)
if chat_stream.group_info: else:
# #
success = await self.api.send_message_to_target( return await self.api.send_message_to_target(
message_type="command", message_type=type,
content=command_data, content=text,
platform=chat_stream.platform, platform=chat_stream.platform,
target_id=str(chat_stream.group_info.group_id), target_id=str(chat_stream.user_info.user_id),
is_group=True, is_group=False
display_message=display_message or f"执行命令: {command_name}",
) )
else:
# 私聊 async def send_command(self, command_name: str, args: dict = None, display_message: str = None) -> bool:
success = await self.api.send_message_to_target( """发送命令消息
message_type="command",
content=command_data, 使用和send_reply相同的方式通过MessageAPI发送命令
platform=chat_stream.platform,
target_id=str(chat_stream.user_info.user_id), Args:
is_group=False, command_name: 命令名称
display_message=display_message or f"执行命令: {command_name}", args: 命令参数
) display_message: 显示消息
if success: Returns:
logger.info(f"{self.log_prefix} 成功发送命令: {command_name}") bool: 是否发送成功
else: """
logger.error(f"{self.log_prefix} 发送命令失败: {command_name}") try:
# 构造命令数据
return success command_data = {"name": command_name, "args": args or {}}
except Exception as e: # 使用send_message_to_target方法发送命令
logger.error(f"{self.log_prefix} 发送命令时出错: {e}") chat_stream = self.api.get_service("chat_stream")
return False if not chat_stream:
logger.error(f"{self.log_prefix} 没有可用的聊天流发送命令")
async def send_message_by_expressor(self, text: str, target: str = "") -> bool: return False
"""通过expressor发送文本消息的Action专用方法
if chat_stream.group_info:
Args: # 群聊
text: 要发送的消息文本 success = await self.api.send_message_to_target(
target: 目标消息(可选) message_type="command",
content=command_data,
Returns: platform=chat_stream.platform,
bool: 是否发送成功 target_id=str(chat_stream.group_info.group_id),
""" is_group=True,
try: display_message=display_message or f"执行命令: {command_name}",
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation )
from src.chat.focus_chat.hfc_utils import create_empty_anchor_message else:
# 私聊
# 获取服务 success = await self.api.send_message_to_target(
expressor = self.api.get_service("expressor") message_type="command",
chat_stream = self.api.get_service("chat_stream") content=command_data,
observations = self.api.get_service("observations") or [] platform=chat_stream.platform,
target_id=str(chat_stream.user_info.user_id),
if not expressor or not chat_stream: is_group=False,
logger.error(f"{self.log_prefix} 无法通过expressor发送消息缺少必要的服务") display_message=display_message or f"执行命令: {command_name}",
return False )
# 构造动作数据 if success:
reply_data = {"text": text, "target": target, "emojis": []} logger.info(f"{self.log_prefix} 成功发送命令: {command_name}")
else:
# 查找 ChattingObservation 实例 logger.error(f"{self.log_prefix} 发送命令失败: {command_name}")
chatting_observation = None
for obs in observations: return success
if isinstance(obs, ChattingObservation):
chatting_observation = obs except Exception as e:
break logger.error(f"{self.log_prefix} 发送命令时出错: {e}")
return False
if not chatting_observation:
logger.warning(f"{self.log_prefix} 未找到 ChattingObservation 实例,创建占位符") async def send_message_by_expressor(self, text: str, target: str = "") -> bool:
anchor_message = await create_empty_anchor_message( """通过expressor发送文本消息的Action专用方法
chat_stream.platform, chat_stream.group_info, chat_stream
) Args:
else: text: 要发送的消息文本
anchor_message = chatting_observation.search_message_by_text(target) target: 目标消息(可选)
if not anchor_message:
logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符") Returns:
anchor_message = await create_empty_anchor_message( bool: 是否发送成功
chat_stream.platform, chat_stream.group_info, chat_stream """
) try:
else: from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
anchor_message.update_chat_stream(chat_stream) from src.chat.focus_chat.hfc_utils import create_empty_anchor_message
# 使用Action上下文信息发送消息 # 获取服务
success, _ = await expressor.deal_reply( expressor = self.api.get_service("expressor")
cycle_timers=self.cycle_timers, chat_stream = self.api.get_service("chat_stream")
action_data=reply_data, observations = self.api.get_service("observations") or []
anchor_message=anchor_message,
reasoning=self.reasoning, if not expressor or not chat_stream:
thinking_id=self.thinking_id, logger.error(f"{self.log_prefix} 无法通过expressor发送消息缺少必要的服务")
) return False
if success: # 构造动作数据
logger.info(f"{self.log_prefix} 成功通过expressor发送消息") reply_data = {"text": text, "target": target, "emojis": []}
else:
logger.error(f"{self.log_prefix} 通过expressor发送消息失败") # 查找 ChattingObservation 实例
chatting_observation = None
return success for obs in observations:
if isinstance(obs, ChattingObservation):
except Exception as e: chatting_observation = obs
logger.error(f"{self.log_prefix} 通过expressor发送消息时出错: {e}") break
return False
if not chatting_observation:
async def send_message_by_replyer(self, target: str = "", extra_info_block: str = None) -> bool: logger.warning(f"{self.log_prefix} 未找到 ChattingObservation 实例,创建占位符")
"""通过replyer发送消息的Action专用方法 anchor_message = await create_empty_anchor_message(
chat_stream.platform, chat_stream.group_info, chat_stream
Args: )
target: 目标消息(可选) else:
extra_info_block: 额外信息块(可选) anchor_message = chatting_observation.search_message_by_text(target)
if not anchor_message:
Returns: logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符")
bool: 是否发送成功 anchor_message = await create_empty_anchor_message(
""" chat_stream.platform, chat_stream.group_info, chat_stream
try: )
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation else:
from src.chat.focus_chat.hfc_utils import create_empty_anchor_message anchor_message.update_chat_stream(chat_stream)
# 获取服务 # 使用Action上下文信息发送消息
replyer = self.api.get_service("replyer") success, _ = await expressor.deal_reply(
chat_stream = self.api.get_service("chat_stream") cycle_timers=self.cycle_timers,
observations = self.api.get_service("observations") or [] action_data=reply_data,
anchor_message=anchor_message,
if not replyer or not chat_stream: reasoning=self.reasoning,
logger.error(f"{self.log_prefix} 无法通过replyer发送消息缺少必要的服务") thinking_id=self.thinking_id,
return False )
# 构造动作数据 if success:
reply_data = {"target": target, "extra_info_block": extra_info_block} logger.info(f"{self.log_prefix} 成功通过expressor发送消息")
else:
# 查找 ChattingObservation 实例 logger.error(f"{self.log_prefix} 通过expressor发送消息失败")
chatting_observation = None
for obs in observations: return success
if isinstance(obs, ChattingObservation):
chatting_observation = obs except Exception as e:
break logger.error(f"{self.log_prefix} 通过expressor发送消息时出错: {e}")
return False
if not chatting_observation:
logger.warning(f"{self.log_prefix} 未找到 ChattingObservation 实例,创建占位符") async def send_message_by_replyer(self, target: str = "", extra_info_block: str = None) -> bool:
anchor_message = await create_empty_anchor_message( """通过replyer发送消息的Action专用方法
chat_stream.platform, chat_stream.group_info, chat_stream
) Args:
else: target: 目标消息(可选)
anchor_message = chatting_observation.search_message_by_text(target) extra_info_block: 额外信息块(可选)
if not anchor_message:
logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符") Returns:
anchor_message = await create_empty_anchor_message( bool: 是否发送成功
chat_stream.platform, chat_stream.group_info, chat_stream """
) try:
else: from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
anchor_message.update_chat_stream(chat_stream) from src.chat.focus_chat.hfc_utils import create_empty_anchor_message
# 使用Action上下文信息发送消息 # 获取服务
success, _ = await replyer.deal_reply( replyer = self.api.get_service("replyer")
cycle_timers=self.cycle_timers, chat_stream = self.api.get_service("chat_stream")
action_data=reply_data, observations = self.api.get_service("observations") or []
anchor_message=anchor_message,
reasoning=self.reasoning, if not replyer or not chat_stream:
thinking_id=self.thinking_id, logger.error(f"{self.log_prefix} 无法通过replyer发送消息缺少必要的服务")
) return False
if success: # 构造动作数据
logger.info(f"{self.log_prefix} 成功通过replyer发送消息") reply_data = {"target": target, "extra_info_block": extra_info_block}
else:
logger.error(f"{self.log_prefix} 通过replyer发送消息失败") # 查找 ChattingObservation 实例
chatting_observation = None
return success for obs in observations:
if isinstance(obs, ChattingObservation):
except Exception as e: chatting_observation = obs
logger.error(f"{self.log_prefix} 通过replyer发送消息时出错: {e}") break
return False
if not chatting_observation:
@classmethod logger.warning(f"{self.log_prefix} 未找到 ChattingObservation 实例,创建占位符")
def get_action_info(cls, name: str = None, description: str = None) -> "ActionInfo": anchor_message = await create_empty_anchor_message(
"""从类属性生成ActionInfo chat_stream.platform, chat_stream.group_info, chat_stream
)
Args: else:
name: Action名称如果不提供则使用类名 anchor_message = chatting_observation.search_message_by_text(target)
description: Action描述如果不提供则使用类文档字符串 if not anchor_message:
logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符")
Returns: anchor_message = await create_empty_anchor_message(
ActionInfo: 生成的Action信息对象 chat_stream.platform, chat_stream.group_info, chat_stream
""" )
else:
# 优先使用类属性,然后自动生成 anchor_message.update_chat_stream(chat_stream)
if name is None:
name = getattr(cls, "action_name", cls.__name__.lower().replace("action", "")) # 使用Action上下文信息发送消息
if description is None: success, _ = await replyer.deal_reply(
description = getattr(cls, "action_description", None) cycle_timers=self.cycle_timers,
if description is None: action_data=reply_data,
description = cls.__doc__ or f"{cls.__name__} Action组件" anchor_message=anchor_message,
description = description.strip().split("\n")[0] # 取第一行作为描述 reasoning=self.reasoning,
thinking_id=self.thinking_id,
# 安全获取激活类型值 )
def get_enum_value(attr_name, default):
attr = getattr(cls, attr_name, None) if success:
if attr is None: logger.info(f"{self.log_prefix} 成功通过replyer发送消息")
# 如果没有定义,返回默认的枚举值 else:
return getattr(ActionActivationType, default.upper(), ActionActivationType.NEVER) logger.error(f"{self.log_prefix} 通过replyer发送消息失败")
return attr
return success
def get_mode_value(attr_name, default):
attr = getattr(cls, attr_name, None) except Exception as e:
if attr is None: logger.error(f"{self.log_prefix} 通过replyer发送消息时出错: {e}")
return getattr(ChatMode, default.upper(), ChatMode.ALL) return False
return attr
@classmethod
return ActionInfo( def get_action_info(cls, name: str = None, description: str = None) -> "ActionInfo":
name=name, """从类属性生成ActionInfo
component_type=ComponentType.ACTION,
description=description, Args:
focus_activation_type=get_enum_value("focus_activation_type", "never"), name: Action名称如果不提供则使用类名
normal_activation_type=get_enum_value("normal_activation_type", "never"), description: Action描述如果不提供则使用类文档字符串
activation_keywords=getattr(cls, "activation_keywords", []).copy(),
keyword_case_sensitive=getattr(cls, "keyword_case_sensitive", False), Returns:
mode_enable=get_mode_value("mode_enable", "all"), ActionInfo: 生成的Action信息对象
parallel_action=getattr(cls, "parallel_action", True), """
random_activation_probability=getattr(cls, "random_activation_probability", 0.0),
llm_judge_prompt=getattr(cls, "llm_judge_prompt", ""), # 优先使用类属性,然后自动生成
# 使用正确的字段名 if name is None:
action_parameters=getattr(cls, "action_parameters", {}).copy(), name = getattr(cls, "action_name", cls.__name__.lower().replace("action", ""))
action_require=getattr(cls, "action_require", []).copy(), if description is None:
associated_types=getattr(cls, "associated_types", []).copy(), description = getattr(cls, "action_description", None)
) if description is None:
description = cls.__doc__ or f"{cls.__name__} Action组件"
@abstractmethod description = description.strip().split("\n")[0] # 取第一行作为描述
async def execute(self) -> Tuple[bool, str]:
"""执行Action的抽象方法子类必须实现 # 安全获取激活类型值
def get_enum_value(attr_name, default):
Returns: attr = getattr(cls, attr_name, None)
Tuple[bool, str]: (是否执行成功, 回复文本) if attr is None:
""" # 如果没有定义,返回默认的枚举值
pass return getattr(ActionActivationType, default.upper(), ActionActivationType.NEVER)
return attr
async def handle_action(self) -> Tuple[bool, str]:
"""兼容旧系统的handle_action接口委托给execute方法 def get_mode_value(attr_name, default):
attr = getattr(cls, attr_name, None)
为了保持向后兼容性旧系统的代码可能会调用handle_action方法。 if attr is None:
此方法将调用委托给新的execute方法。 return getattr(ChatMode, default.upper(), ChatMode.ALL)
return attr
Returns:
Tuple[bool, str]: (是否执行成功, 回复文本) return ActionInfo(
""" name=name,
return await self.execute() component_type=ComponentType.ACTION,
description=description,
focus_activation_type=get_enum_value("focus_activation_type", "never"),
normal_activation_type=get_enum_value("normal_activation_type", "never"),
activation_keywords=getattr(cls, "activation_keywords", []).copy(),
keyword_case_sensitive=getattr(cls, "keyword_case_sensitive", False),
mode_enable=get_mode_value("mode_enable", "all"),
parallel_action=getattr(cls, "parallel_action", True),
random_activation_probability=getattr(cls, "random_activation_probability", 0.0),
llm_judge_prompt=getattr(cls, "llm_judge_prompt", ""),
# 使用正确的字段名
action_parameters=getattr(cls, "action_parameters", {}).copy(),
action_require=getattr(cls, "action_require", []).copy(),
associated_types=getattr(cls, "associated_types", []).copy(),
)
@abstractmethod
async def execute(self) -> Tuple[bool, str]:
"""执行Action的抽象方法子类必须实现
Returns:
Tuple[bool, str]: (是否执行成功, 回复文本)
"""
pass
async def handle_action(self) -> Tuple[bool, str]:
"""兼容旧系统的handle_action接口委托给execute方法
为了保持向后兼容性旧系统的代码可能会调用handle_action方法。
此方法将调用委托给新的execute方法。
Returns:
Tuple[bool, str]: (是否执行成功, 回复文本)
"""
return await self.execute()