perf(methods): 通过移除不必要的 self 参数优化方法签名

在包括 chat、plugin_system、schedule 和 mais4u 在内的多个模块中,消除冗余的实例引用。此次改动将无需访问实例状态的实用函数转换为静态方法,从而提升了内存效率,并使方法依赖关系更加清晰。
This commit is contained in:
雅诺狐
2025-09-20 10:55:06 +08:00
committed by Windpicker-owo
parent aba4f1a947
commit 93542cadef
111 changed files with 705 additions and 465 deletions

View File

@@ -8,7 +8,7 @@
readable_text = message_api.build_readable_messages(messages)
"""
from typing import List, Dict, Any, Tuple, Optional
from typing import List, Dict, Any, Tuple, Optional, Coroutine
from src.config.config import global_config
import time
from src.chat.utils.chat_message_builder import (
@@ -36,7 +36,7 @@ from src.chat.utils.chat_message_builder import (
def get_messages_by_time(
start_time: float, end_time: float, limit: int = 0, limit_mode: str = "latest", filter_mai: bool = False
) -> List[Dict[str, Any]]:
) -> Coroutine[Any, Any, list[dict[str, Any]]]:
"""
获取指定时间范围内的消息
@@ -155,7 +155,7 @@ def get_messages_by_time_in_chat_for_users(
person_ids: List[str],
limit: int = 0,
limit_mode: str = "latest",
) -> List[Dict[str, Any]]:
) -> Coroutine[Any, Any, list[dict[str, Any]]]:
"""
获取指定聊天中指定用户在指定时间范围内的消息
@@ -186,7 +186,7 @@ def get_messages_by_time_in_chat_for_users(
def get_random_chat_messages(
start_time: float, end_time: float, limit: int = 0, limit_mode: str = "latest", filter_mai: bool = False
) -> List[Dict[str, Any]]:
) -> Coroutine[Any, Any, list[dict[str, Any]]]:
"""
随机选择一个聊天,返回该聊天在指定时间范围内的消息
@@ -214,7 +214,7 @@ def get_random_chat_messages(
def get_messages_by_time_for_users(
start_time: float, end_time: float, person_ids: List[str], limit: int = 0, limit_mode: str = "latest"
) -> List[Dict[str, Any]]:
) -> Coroutine[Any, Any, list[dict[str, Any]]]:
"""
获取指定用户在所有聊天中指定时间范围内的消息
@@ -238,7 +238,8 @@ def get_messages_by_time_for_users(
return get_raw_msg_by_timestamp_with_users(start_time, end_time, person_ids, limit, limit_mode)
def get_messages_before_time(timestamp: float, limit: int = 0, filter_mai: bool = False) -> List[Dict[str, Any]]:
def get_messages_before_time(timestamp: float, limit: int = 0, filter_mai: bool = False) -> Coroutine[
Any, Any, list[dict[str, Any]]]:
"""
获取指定时间戳之前的消息
@@ -264,7 +265,7 @@ def get_messages_before_time(timestamp: float, limit: int = 0, filter_mai: bool
def get_messages_before_time_in_chat(
chat_id: str, timestamp: float, limit: int = 0, filter_mai: bool = False
) -> List[Dict[str, Any]]:
) -> Coroutine[Any, Any, list[dict[str, Any]]]:
"""
获取指定聊天中指定时间戳之前的消息
@@ -293,7 +294,8 @@ def get_messages_before_time_in_chat(
return get_raw_msg_before_timestamp_with_chat(chat_id, timestamp, limit)
def get_messages_before_time_for_users(timestamp: float, person_ids: List[str], limit: int = 0) -> List[Dict[str, Any]]:
def get_messages_before_time_for_users(timestamp: float, person_ids: List[str], limit: int = 0) -> Coroutine[
Any, Any, list[dict[str, Any]]]:
"""
获取指定用户在指定时间戳之前的消息
@@ -317,7 +319,7 @@ def get_messages_before_time_for_users(timestamp: float, person_ids: List[str],
def get_recent_messages(
chat_id: str, hours: float = 24.0, limit: int = 100, limit_mode: str = "latest", filter_mai: bool = False
) -> List[Dict[str, Any]]:
) -> Coroutine[Any, Any, list[dict[str, Any]]]:
"""
获取指定聊天中最近一段时间的消息
@@ -354,7 +356,8 @@ def get_recent_messages(
# =============================================================================
def count_new_messages(chat_id: str, start_time: float = 0.0, end_time: Optional[float] = None) -> int:
def count_new_messages(chat_id: str, start_time: float = 0.0, end_time: Optional[float] = None) -> Coroutine[
Any, Any, int]:
"""
计算指定聊天中从开始时间到结束时间的新消息数量
@@ -378,7 +381,8 @@ def count_new_messages(chat_id: str, start_time: float = 0.0, end_time: Optional
return num_new_messages_since(chat_id, start_time, end_time)
def count_new_messages_for_users(chat_id: str, start_time: float, end_time: float, person_ids: List[str]) -> int:
def count_new_messages_for_users(chat_id: str, start_time: float, end_time: float, person_ids: List[str]) -> Coroutine[
Any, Any, int]:
"""
计算指定聊天中指定用户从开始时间到结束时间的新消息数量
@@ -416,7 +420,7 @@ def build_readable_messages_to_str(
read_mark: float = 0.0,
truncate: bool = False,
show_actions: bool = False,
) -> str:
) -> Coroutine[Any, Any, str]:
"""
将消息列表构建成可读的字符串

View File

@@ -44,7 +44,7 @@ class UserInfo:
def to_tuple(self) -> tuple[str, str]:
"""转换为元组格式"""
return (self.platform, self.user_id)
return self.platform, self.user_id
class IPermissionManager(ABC):

View File

@@ -118,10 +118,10 @@ async def wait_adapter_response(request_id: str, timeout: float = 30.0) -> dict:
response = await asyncio.wait_for(future, timeout=timeout)
return response
except asyncio.TimeoutError:
_adapter_response_pool.pop(request_id, None)
await _adapter_response_pool.pop(request_id, None)
return {"status": "error", "message": "timeout"}
except Exception as e:
_adapter_response_pool.pop(request_id, None)
await _adapter_response_pool.pop(request_id, None)
return {"status": "error", "message": str(e)}

View File

@@ -1,5 +1,6 @@
import asyncio
from typing import List, Dict, Any, Optional
from src.common.logger import get_logger
logger = get_logger("base_event")
@@ -90,8 +91,6 @@ class BaseEvent:
self.allowed_subscribers = allowed_subscribers # 记录事件处理器名
self.allowed_triggers = allowed_triggers # 记录插件名
from src.plugin_system.base.base_events_handler import BaseEventHandler
self.subscribers: List["BaseEventHandler"] = [] # 订阅该事件的事件处理器列表
self.event_handle_lock = asyncio.Lock()
@@ -150,7 +149,8 @@ class BaseEvent:
return HandlerResultsCollection(processed_results)
async def _execute_subscriber(self, subscriber, params: dict) -> HandlerResult:
@staticmethod
async def _execute_subscriber(subscriber, params: dict) -> HandlerResult:
"""执行单个订阅者处理器"""
try:
return await subscriber.execute(params)

View File

@@ -277,7 +277,8 @@ class PluginBase(ABC):
return config_version_field.default
return "1.0.0"
def _get_current_config_version(self, config: Dict[str, Any]) -> str:
@staticmethod
def _get_current_config_version(config: Dict[str, Any]) -> str:
"""从配置文件中获取当前版本号"""
if "plugin" in config and "config_version" in config["plugin"]:
return str(config["plugin"]["config_version"])

View File

@@ -149,7 +149,7 @@ class PlusCommand(ABC):
Returns:
bool: 如果匹配返回True
"""
return not self.args.is_empty() or self._is_exact_command_call()
return not self.args.is_empty or self._is_exact_command_call()
def _is_exact_command_call(self) -> bool:
"""检查是否是精确的命令调用(无参数)"""

View File

@@ -31,6 +31,7 @@ class ComponentRegistry:
def __init__(self):
# 命名空间式组件名构成法 f"{component_type}.{component_name}"
self._plus_command_registry: Dict[str, Type[PlusCommand]] = {}
self._components: Dict[str, ComponentInfo] = {}
"""组件注册表 命名空间式组件名 -> 组件信息"""
self._components_by_type: Dict[ComponentType, Dict[str, ComponentInfo]] = {types: {} for types in ComponentType}
@@ -618,7 +619,7 @@ class ComponentRegistry:
def get_plus_command_registry(self) -> Dict[str, Type[PlusCommand]]:
"""获取PlusCommand注册表"""
if not hasattr(self, "_plus_command_registry"):
self._plus_command_registry: Dict[str, Type[PlusCommand]] = {}
pass
return self._plus_command_registry.copy()
def get_registered_plus_command_info(self, command_name: str) -> Optional[PlusCommandInfo]:
@@ -667,7 +668,8 @@ class ComponentRegistry:
plugin_info = self.get_plugin_info(plugin_name)
return plugin_info.components if plugin_info else []
def get_plugin_config(self, plugin_name: str) -> dict:
@staticmethod
def get_plugin_config(plugin_name: str) -> dict:
"""获取插件配置
Args:

View File

@@ -7,6 +7,7 @@ from typing import Dict, Type, List, Optional, Any, Union
from threading import Lock
from src.common.logger import get_logger
from src.plugin_system import BaseEventHandler
from src.plugin_system.base.base_event import BaseEvent, HandlerResultsCollection
from src.plugin_system.base.base_events_handler import BaseEventHandler
from src.plugin_system.base.component_types import EventType
@@ -198,7 +199,7 @@ class EventManager:
"""
return self._event_handlers.get(handler_name)
def get_all_event_handlers(self) -> Dict[str, BaseEventHandler]:
def get_all_event_handlers(self) -> dict[str, type[BaseEventHandler]]:
"""获取所有已注册的事件处理器
Returns:

View File

@@ -290,7 +290,8 @@ class PluginHotReloadManager:
logger.error(f"❌ 重载插件 {plugin_name} 时发生错误: {e}", exc_info=True)
return False
def _resolve_plugin_name(self, folder_name: str) -> str:
@staticmethod
def _resolve_plugin_name(folder_name: str) -> str:
"""
将文件夹名称解析为实际的插件名称
通过检查插件管理器中的路径映射来找到对应的插件名
@@ -349,7 +350,8 @@ class PluginHotReloadManager:
# 出错时尝试简单重载
return self._reload_plugin(plugin_name)
def _force_clear_plugin_modules(self, plugin_name: str):
@staticmethod
def _force_clear_plugin_modules(plugin_name: str):
"""强制清理插件相关的模块缓存"""
# 找到所有相关的模块名
@@ -366,7 +368,8 @@ class PluginHotReloadManager:
logger.debug(f"🗑️ 清理模块缓存: {module_name}")
del sys.modules[module_name]
def _force_reimport_plugin(self, plugin_name: str):
@staticmethod
def _force_reimport_plugin(plugin_name: str):
"""强制重新导入插件(委托给插件管理器)"""
try:
# 使用插件管理器的重载功能
@@ -377,7 +380,8 @@ class PluginHotReloadManager:
logger.error(f"❌ 强制重新导入插件 {plugin_name} 时发生错误: {e}", exc_info=True)
return False
def _unload_plugin(self, plugin_name: str):
@staticmethod
def _unload_plugin(plugin_name: str):
"""卸载指定插件"""
try:
logger.info(f"🗑️ 开始卸载插件: {plugin_name}")
@@ -490,7 +494,8 @@ class PluginHotReloadManager:
"debounce_delay": self.file_handlers[0].debounce_delay if self.file_handlers else 0,
}
def clear_all_caches(self):
@staticmethod
def clear_all_caches():
"""清理所有Python模块缓存"""
try:
logger.info("🧹 开始清理所有Python模块缓存...")

View File

@@ -67,7 +67,8 @@ class PluginManager:
except Exception as e:
logger.error(f"同步插件 '{plugin_name}' 配置时发生未知错误: {e}")
def _copy_default_config_to_central(self, plugin_name: str, plugin_config_file: Path, central_config_dir: Path):
@staticmethod
def _copy_default_config_to_central(plugin_name: str, plugin_config_file: Path, central_config_dir: Path):
"""
如果中央配置不存在,则将插件的默认 config.toml 复制到中央目录。
"""
@@ -96,7 +97,8 @@ class PluginManager:
shutil.copy2(central_file, target_plugin_file)
logger.info(f"已将中央配置 '{central_file.name}' 同步到插件 '{plugin_name}'")
def _is_file_content_identical(self, file1: Path, file2: Path) -> bool:
@staticmethod
def _is_file_content_identical(file1: Path, file2: Path) -> bool:
"""
通过比较 MD5 哈希值检查两个文件的内容是否相同。
"""
@@ -403,7 +405,8 @@ class PluginManager:
# == 兼容性检查 ==
def _check_plugin_version_compatibility(self, plugin_name: str, manifest_data: Dict[str, Any]) -> Tuple[bool, str]:
@staticmethod
def _check_plugin_version_compatibility(plugin_name: str, manifest_data: Dict[str, Any]) -> Tuple[bool, str]:
"""检查插件版本兼容性
Args:
@@ -557,7 +560,8 @@ class PluginManager:
else:
logger.warning("😕 没有成功加载任何插件")
def _show_plugin_components(self, plugin_name: str) -> None:
@staticmethod
def _show_plugin_components(plugin_name: str) -> None:
if plugin_info := component_registry.get_plugin_info(plugin_name):
component_types = {}
for comp in plugin_info.components:
@@ -673,7 +677,8 @@ class PluginManager:
"""
return self.reload_plugin(plugin_name)
def clear_all_plugin_caches(self):
@staticmethod
def clear_all_plugin_caches():
"""清理所有插件相关的模块缓存(简化版)"""
try:
logger.info("🧹 清理模块缓存...")

View File

@@ -162,7 +162,8 @@ class DependencyManager:
return False, all_errors
def _normalize_dependencies(self, dependencies: Any) -> List[PythonDependency]:
@staticmethod
def _normalize_dependencies(dependencies: Any) -> List[PythonDependency]:
"""将依赖列表标准化为PythonDependency对象"""
normalized = []
@@ -191,7 +192,8 @@ class DependencyManager:
return normalized
def _check_single_dependency(self, dep: PythonDependency) -> bool:
@staticmethod
def _check_single_dependency(dep: PythonDependency) -> bool:
"""检查单个依赖是否满足要求"""
def _try_check(import_name: str) -> bool:

View File

@@ -82,10 +82,10 @@ class VersionComparator:
normalized = VersionComparator.normalize_version(version)
try:
parts = normalized.split(".")
return (int(parts[0]), int(parts[1]), int(parts[2]))
return int(parts[0]), int(parts[1]), int(parts[2])
except (ValueError, IndexError):
logger.warning(f"无法解析版本号: {version},使用默认版本 0.0.0")
return (0, 0, 0)
return 0, 0, 0
@staticmethod
def compare_versions(version1: str, version2: str) -> int:

View File

@@ -58,7 +58,7 @@ def require_permission(permission_node: str, deny_message: Optional[str] = None)
if chat_stream is None:
logger.error(f"权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
return
return None
# 检查权限
has_permission = permission_api.check_permission(
@@ -72,7 +72,7 @@ def require_permission(permission_node: str, deny_message: Optional[str] = None)
# 对于PlusCommand的execute方法需要返回适当的元组
if func.__name__ == "execute" and hasattr(args[0], "send_text"):
return False, "权限不足", True
return
return None
# 权限检查通过,执行原函数
return await func(*args, **kwargs)
@@ -90,7 +90,7 @@ def require_permission(permission_node: str, deny_message: Optional[str] = None)
if chat_stream is None:
logger.error(f"权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
return
return None
# 检查权限
has_permission = permission_api.check_permission(
@@ -101,7 +101,7 @@ def require_permission(permission_node: str, deny_message: Optional[str] = None)
logger.warning(
f"用户 {chat_stream.platform}:{chat_stream.user_info.user_id} 没有权限 {permission_node}"
)
return
return None
# 权限检查通过,执行原函数
return func(*args, **kwargs)
@@ -156,7 +156,7 @@ def require_master(deny_message: Optional[str] = None):
if chat_stream is None:
logger.error(f"Master权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
return
return None
# 检查是否为Master用户
is_master = permission_api.is_master(chat_stream.platform, chat_stream.user_info.user_id)
@@ -166,7 +166,7 @@ def require_master(deny_message: Optional[str] = None):
await text_to_stream(message, chat_stream.stream_id)
if func.__name__ == "execute" and hasattr(args[0], "send_text"):
return False, "需要Master权限", True
return
return None
# 权限检查通过,执行原函数
return await func(*args, **kwargs)
@@ -184,14 +184,14 @@ def require_master(deny_message: Optional[str] = None):
if chat_stream is None:
logger.error(f"Master权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
return
return None
# 检查是否为Master用户
is_master = permission_api.is_master(chat_stream.platform, chat_stream.user_info.user_id)
if not is_master:
logger.warning(f"用户 {chat_stream.platform}:{chat_stream.user_info.user_id} 不是Master用户")
return
return None
# 权限检查通过,执行原函数
return func(*args, **kwargs)