perf(methods): 通过移除不必要的 self 参数优化方法签名

在包括 chat、plugin_system、schedule 和 mais4u 在内的多个模块中,消除冗余的实例引用。此次改动将无需访问实例状态的实用函数转换为静态方法,从而提升了内存效率,并使方法依赖关系更加清晰。
This commit is contained in:
雅诺狐
2025-09-20 10:55:06 +08:00
parent 0cc4f5bb27
commit 898208f425
111 changed files with 643 additions and 467 deletions

View File

@@ -215,6 +215,10 @@ class PromptManager:
result = prompt.format(**kwargs)
return result
@property
def context(self):
return self._context
# 全局单例
global_prompt_manager = PromptManager()
@@ -256,7 +260,7 @@ class Prompt:
self._processed_template = self._process_escaped_braces(template)
# 自动注册
if should_register and not global_prompt_manager._context._current_context:
if should_register and not global_prompt_manager.context._current_context:
global_prompt_manager.register(self)
@staticmethod
@@ -459,8 +463,9 @@ class Prompt:
context_data["chat_info"] = f"""群里的聊天内容:
{self.parameters.chat_talking_prompt_short}"""
@staticmethod
async def _build_s4u_chat_history_prompts(
self, message_list_before_now: List[Dict[str, Any]], target_user_id: str, sender: str
message_list_before_now: List[Dict[str, Any]], target_user_id: str, sender: str
) -> Tuple[str, str]:
"""构建S4U风格的分离对话prompt"""
# 实现逻辑与原有SmartPromptBuilder相同
@@ -537,14 +542,10 @@ class Prompt:
)
# 创建表情选择器
expression_selector = ExpressionSelector(self.parameters.chat_id)
expression_selector = ExpressionSelector()
# 选择合适的表情
selected_expressions = await expression_selector.select_suitable_expressions_llm(
chat_history=chat_history,
current_message=self.parameters.target,
emotional_tone="neutral",
topic_type="general"
)
# 构建表达习惯块
@@ -991,7 +992,7 @@ async def create_prompt_async(
) -> Prompt:
"""异步创建Prompt实例"""
prompt = create_prompt(template, name, parameters, **kwargs)
if global_prompt_manager._context._current_context:
await global_prompt_manager._context.register_async(prompt)
if global_prompt_manager.context._current_context:
await global_prompt_manager.context.register_async(prompt)
return prompt

View File

@@ -763,7 +763,8 @@ class StatisticOutputTask(AsyncTask):
output.append("")
return "\n".join(output)
def _get_chat_display_name_from_id(self, chat_id: str) -> str:
@staticmethod
def _get_chat_display_name_from_id(chat_id: str) -> str:
"""从chat_id获取显示名称"""
try:
# 首先尝试从chat_stream获取真实群组名称
@@ -1109,7 +1110,8 @@ class StatisticOutputTask(AsyncTask):
return chart_data
def _collect_interval_data(self, now: datetime, hours: int, interval_minutes: int) -> dict:
@staticmethod
def _collect_interval_data(now: datetime, hours: int, interval_minutes: int) -> dict:
"""收集指定时间范围内每个间隔的数据"""
# 生成时间点
start_time = now - timedelta(hours=hours)
@@ -1199,7 +1201,8 @@ class StatisticOutputTask(AsyncTask):
"message_by_chat": message_by_chat,
}
def _generate_chart_tab(self, chart_data: dict) -> str:
@staticmethod
def _generate_chart_tab(chart_data: dict) -> str:
# sourcery skip: extract-duplicate-method, move-assign-in-block
"""生成图表选项卡HTML内容"""
@@ -1563,13 +1566,13 @@ class AsyncStatisticOutputTask(AsyncTask):
return StatisticOutputTask._generate_chart_data(self, stat) # type: ignore
def _collect_interval_data(self, now: datetime, hours: int, interval_minutes: int) -> dict:
return StatisticOutputTask._collect_interval_data(self, now, hours, interval_minutes) # type: ignore
return StatisticOutputTask._collect_interval_data(now, hours, interval_minutes) # type: ignore
def _generate_chart_tab(self, chart_data: dict) -> str:
return StatisticOutputTask._generate_chart_tab(self, chart_data) # type: ignore
return StatisticOutputTask._generate_chart_tab(chart_data) # type: ignore
def _get_chat_display_name_from_id(self, chat_id: str) -> str:
return StatisticOutputTask._get_chat_display_name_from_id(self, chat_id) # type: ignore
return StatisticOutputTask._get_chat_display_name_from_id(chat_id) # type: ignore
def _convert_defaultdict_to_dict(self, data):
return StatisticOutputTask._convert_defaultdict_to_dict(self, data) # type: ignore

View File

@@ -7,7 +7,7 @@ import numpy as np
from collections import Counter
from maim_message import UserInfo
from typing import Optional, Tuple, Dict, List, Any
from typing import Optional, Tuple, Dict, List, Any, Coroutine
from src.common.logger import get_logger
from src.common.message_repository import find_messages, count_messages
@@ -540,7 +540,8 @@ def get_western_ratio(paragraph):
return western_count / len(alnum_chars)
def count_messages_between(start_time: float, end_time: float, stream_id: str) -> tuple[int, int]:
def count_messages_between(start_time: float, end_time: float, stream_id: str) -> tuple[int, int] | tuple[
Coroutine[Any, Any, int], int]:
"""计算两个时间点之间的消息数量和文本总长度
Args:

View File

@@ -134,7 +134,8 @@ class ImageManager:
except Exception as e:
logger.error(f"保存描述到数据库失败 (SQLAlchemy): {str(e)}")
async def get_emoji_tag(self, image_base64: str) -> str:
@staticmethod
async def get_emoji_tag(image_base64: str) -> str:
from src.chat.emoji_system.emoji_manager import get_emoji_manager
emoji_manager = get_emoji_manager()

View File

@@ -167,7 +167,8 @@ class VideoAnalyzer:
# 获取Rust模块系统信息
self._log_system_info()
def _log_system_info(self):
@staticmethod
def _log_system_info():
"""记录系统信息"""
if not RUST_VIDEO_AVAILABLE:
logger.info("⚠️ Rust模块不可用跳过系统信息获取")
@@ -196,13 +197,15 @@ class VideoAnalyzer:
except Exception as e:
logger.warning(f"获取系统信息失败: {e}")
def _calculate_video_hash(self, video_data: bytes) -> str:
@staticmethod
def _calculate_video_hash(video_data: bytes) -> str:
"""计算视频文件的hash值"""
hash_obj = hashlib.sha256()
hash_obj.update(video_data)
return hash_obj.hexdigest()
def _check_video_exists(self, video_hash: str) -> Optional[Videos]:
@staticmethod
def _check_video_exists(video_hash: str) -> Optional[Videos]:
"""检查视频是否已经分析过"""
try:
with get_db_session() as session:
@@ -213,8 +216,9 @@ class VideoAnalyzer:
logger.warning(f"检查视频是否存在时出错: {e}")
return None
@staticmethod
def _store_video_result(
self, video_hash: str, description: str, metadata: Optional[Dict] = None
video_hash: str, description: str, metadata: Optional[Dict] = None
) -> Optional[Videos]:
"""存储视频分析结果到数据库"""
# 检查描述是否为错误信息,如果是则不保存
@@ -619,7 +623,7 @@ class VideoAnalyzer:
if self.disabled:
error_msg = "❌ 视频分析功能已禁用:没有可用的视频处理实现"
logger.warning(error_msg)
return (False, error_msg)
return False, error_msg
try:
logger.info(f"开始分析视频: {os.path.basename(video_path)}")
@@ -628,7 +632,7 @@ class VideoAnalyzer:
frames = await self.extract_frames(video_path)
if not frames:
error_msg = "❌ 无法从视频中提取有效帧"
return (False, error_msg)
return False, error_msg
# 根据模式选择分析方法
if self.analysis_mode == "auto":
@@ -645,12 +649,12 @@ class VideoAnalyzer:
result = await self.analyze_frames_sequential(frames, user_question)
logger.info("✅ 视频分析完成")
return (True, result)
return True, result
except Exception as e:
error_msg = f"❌ 视频分析失败: {str(e)}"
logger.error(error_msg)
return (False, error_msg)
return False, error_msg
async def analyze_video_from_bytes(
self, video_bytes: bytes, filename: str = None, user_question: str = None, prompt: str = None
@@ -783,7 +787,8 @@ class VideoAnalyzer:
return {"summary": error_msg}
def is_supported_video(self, file_path: str) -> bool:
@staticmethod
def is_supported_video(file_path: str) -> bool:
"""检查是否为支持的视频格式"""
supported_formats = {".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".m4v", ".3gp", ".webm"}
return Path(file_path).suffix.lower() in supported_formats
@@ -818,7 +823,8 @@ class VideoAnalyzer:
logger.error(f"获取处理能力信息失败: {e}")
return {"error": str(e), "available": False}
def _get_recommended_settings(self, cpu_features: Dict[str, bool]) -> Dict[str, any]:
@staticmethod
def _get_recommended_settings(cpu_features: Dict[str, bool]) -> Dict[str, any]:
"""根据CPU特性推荐最佳设置"""
settings = {
"use_simd": any(cpu_features.values()),

View File

@@ -13,7 +13,7 @@ import base64
import numpy as np
from PIL import Image
from pathlib import Path
from typing import List, Tuple, Optional
from typing import List, Tuple, Optional, Any
import io
from concurrent.futures import ThreadPoolExecutor
@@ -31,7 +31,7 @@ def _extract_frames_worker(
max_image_size: int,
frame_extraction_mode: str,
frame_interval_seconds: Optional[float],
) -> List[Tuple[str, float]]:
) -> list[Any] | list[tuple[str, str]]:
"""线程池中提取视频帧的工作函数"""
frames = []
try:
@@ -568,7 +568,8 @@ class LegacyVideoAnalyzer:
logger.error(error_msg)
return error_msg
def is_supported_video(self, file_path: str) -> bool:
@staticmethod
def is_supported_video(file_path: str) -> bool:
"""检查是否为支持的视频格式"""
supported_formats = {".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".m4v", ".3gp", ".webm"}
return Path(file_path).suffix.lower() in supported_formats