🤖 自动格式化代码 [skip ci]

This commit is contained in:
github-actions[bot]
2025-06-10 16:20:05 +00:00
parent 8fb3662c03
commit 0cb595218e
13 changed files with 316 additions and 343 deletions

View File

@@ -29,29 +29,29 @@ class ConfigAPI:
def get_config(self, key: str, default: Any = None) -> Any:
"""
从插件配置中获取值,支持嵌套键访问
Args:
key: 配置键名,支持嵌套访问如 "section.subsection.key"
default: 如果配置不存在时返回的默认值
Returns:
Any: 配置值或默认值
"""
# 获取插件配置
plugin_config = getattr(self, '_plugin_config', {})
plugin_config = getattr(self, "_plugin_config", {})
if not plugin_config:
return default
# 支持嵌套键访问
keys = key.split('.')
keys = key.split(".")
current = plugin_config
for k in keys:
if isinstance(current, dict) and k in current:
current = current[k]
else:
return default
return current
async def get_user_id_by_person_name(self, person_name: str) -> tuple[str, str]:

View File

@@ -157,8 +157,6 @@ class MessageAPI:
message_type="text", content=text, platform=platform, target_id=user_id, is_group=False
)
def get_chat_type(self) -> str:
"""获取当前聊天类型

View File

@@ -33,7 +33,13 @@ class PluginAPI(MessageAPI, LLMAPI, DatabaseAPI, ConfigAPI, UtilsAPI, StreamAPI,
"""
def __init__(
self, chat_stream=None, expressor=None, replyer=None, observations=None, log_prefix: str = "[PluginAPI]", plugin_config: dict = None
self,
chat_stream=None,
expressor=None,
replyer=None,
observations=None,
log_prefix: str = "[PluginAPI]",
plugin_config: dict = None,
):
"""
初始化插件API
@@ -109,7 +115,12 @@ class PluginAPI(MessageAPI, LLMAPI, DatabaseAPI, ConfigAPI, UtilsAPI, StreamAPI,
# 便捷的工厂函数
def create_plugin_api(
chat_stream=None, expressor=None, replyer=None, observations=None, log_prefix: str = "[Plugin]", plugin_config: dict = None
chat_stream=None,
expressor=None,
replyer=None,
observations=None,
log_prefix: str = "[Plugin]",
plugin_config: dict = None,
) -> PluginAPI:
"""
创建插件API实例的便捷函数
@@ -126,7 +137,12 @@ def create_plugin_api(
PluginAPI: 配置好的插件API实例
"""
return PluginAPI(
chat_stream=chat_stream, expressor=expressor, replyer=replyer, observations=observations, log_prefix=log_prefix, plugin_config=plugin_config
chat_stream=chat_stream,
expressor=expressor,
replyer=replyer,
observations=observations,
log_prefix=log_prefix,
plugin_config=plugin_config,
)

View File

@@ -121,53 +121,45 @@ class BaseAction(ABC):
Returns:
bool: 是否发送成功
"""
chat_stream = self.api.get_service('chat_stream')
chat_stream = self.api.get_service("chat_stream")
if not chat_stream:
logger.error(f"{self.log_prefix} 没有可用的聊天流发送回复")
return False
if chat_stream.group_info:
# 群聊
return await self.api.send_text_to_group(
text=content,
group_id=str(chat_stream.group_info.group_id),
platform=chat_stream.platform
text=content, group_id=str(chat_stream.group_info.group_id), platform=chat_stream.platform
)
else:
# 私聊
return await self.api.send_text_to_user(
text=content,
user_id=str(chat_stream.user_info.user_id),
platform=chat_stream.platform
text=content, user_id=str(chat_stream.user_info.user_id), platform=chat_stream.platform
)
async def send_command(self, command_name: str, args: dict = None, display_message: str = None) -> bool:
"""发送命令消息
使用和send_reply相同的方式通过MessageAPI发送命令
Args:
command_name: 命令名称
args: 命令参数
display_message: 显示消息
Returns:
bool: 是否发送成功
"""
try:
# 构造命令数据
command_data = {
"name": command_name,
"args": args or {}
}
command_data = {"name": command_name, "args": args or {}}
# 使用send_message_to_target方法发送命令
chat_stream = self.api.get_service('chat_stream')
chat_stream = self.api.get_service("chat_stream")
if not chat_stream:
logger.error(f"{self.log_prefix} 没有可用的聊天流发送命令")
return False
if chat_stream.group_info:
# 群聊
success = await self.api.send_message_to_target(
@@ -176,7 +168,7 @@ class BaseAction(ABC):
platform=chat_stream.platform,
target_id=str(chat_stream.group_info.group_id),
is_group=True,
display_message=display_message or f"执行命令: {command_name}"
display_message=display_message or f"执行命令: {command_name}",
)
else:
# 私聊
@@ -186,16 +178,16 @@ class BaseAction(ABC):
platform=chat_stream.platform,
target_id=str(chat_stream.user_info.user_id),
is_group=False,
display_message=display_message or f"执行命令: {command_name}"
display_message=display_message or f"执行命令: {command_name}",
)
if success:
logger.info(f"{self.log_prefix} 成功发送命令: {command_name}")
else:
logger.error(f"{self.log_prefix} 发送命令失败: {command_name}")
return success
except Exception as e:
logger.error(f"{self.log_prefix} 发送命令时出错: {e}")
return False
@@ -213,7 +205,7 @@ class BaseAction(ABC):
try:
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.chat.focus_chat.hfc_utils import create_empty_anchor_message
# 获取服务
expressor = self.api.get_service("expressor")
chat_stream = self.api.get_service("chat_stream")
@@ -281,7 +273,7 @@ class BaseAction(ABC):
try:
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.chat.focus_chat.hfc_utils import create_empty_anchor_message
# 获取服务
replyer = self.api.get_service("replyer")
chat_stream = self.api.get_service("chat_stream")

View File

@@ -82,28 +82,25 @@ class BaseCommand(ABC):
async def send_command(self, command_name: str, args: dict = None, display_message: str = None) -> bool:
"""发送命令消息
使用和send_reply相同的方式通过MessageAPI发送命令
Args:
command_name: 命令名称
args: 命令参数
display_message: 显示消息
Returns:
bool: 是否发送成功
"""
try:
# 构造命令数据
command_data = {
"name": command_name,
"args": args or {}
}
command_data = {"name": command_name, "args": args or {}}
# 使用send_message_to_target方法发送命令
chat_stream = self.message.chat_stream
command_content = command_data
if chat_stream.group_info:
# 群聊
success = await self.api.send_message_to_target(
@@ -112,7 +109,7 @@ class BaseCommand(ABC):
platform=chat_stream.platform,
target_id=str(chat_stream.group_info.group_id),
is_group=True,
display_message=display_message or f"执行命令: {command_name}"
display_message=display_message or f"执行命令: {command_name}",
)
else:
# 私聊
@@ -122,16 +119,16 @@ class BaseCommand(ABC):
platform=chat_stream.platform,
target_id=str(chat_stream.user_info.user_id),
is_group=False,
display_message=display_message or f"执行命令: {command_name}"
display_message=display_message or f"执行命令: {command_name}",
)
if success:
logger.info(f"{self.log_prefix} 成功发送命令: {command_name}")
else:
logger.error(f"{self.log_prefix} 发送命令失败: {command_name}")
return success
except Exception as e:
logger.error(f"{self.log_prefix} 发送命令时出错: {e}")
return False

View File

@@ -177,15 +177,15 @@ class BasePlugin(ABC):
Any: 配置值或默认值
"""
# 支持嵌套键访问
keys = key.split('.')
keys = key.split(".")
current = self.config
for k in keys:
if isinstance(current, dict) and k in current:
current = current[k]
else:
return default
return current

View File

@@ -164,7 +164,12 @@ class ComponentRegistry:
if command_name:
command_info = self.get_command_info(command_name)
if command_info and command_info.enabled:
return command_class, match.groupdict(), command_info.intercept_message, command_info.plugin_name
return (
command_class,
match.groupdict(),
command_info.intercept_message,
command_info.plugin_name,
)
return None
# === 插件管理方法 ===
@@ -207,15 +212,16 @@ class ComponentRegistry:
def get_plugin_config(self, plugin_name: str) -> Optional[dict]:
"""获取插件配置
Args:
plugin_name: 插件名称
Returns:
Optional[dict]: 插件配置字典或None
"""
# 从插件管理器获取插件实例的配置
from src.plugin_system.core.plugin_manager import plugin_manager
plugin_instance = plugin_manager.get_plugin_instance(plugin_name)
return plugin_instance.config if plugin_instance else None

View File

@@ -291,10 +291,10 @@ class PluginManager:
def get_plugin_instance(self, plugin_name: str) -> Optional["BasePlugin"]:
"""获取插件实例
Args:
plugin_name: 插件名称
Returns:
Optional[BasePlugin]: 插件实例或None
"""