refactor(core): 提升类型安全性并添加配置空值检查

此提交在核心模块中引入了多项改进,以增强类型安全性和健壮性,主要通过为类型提示添加 `cast` 并在访问 `global_config` 属性前进行空值检查实现。

主要改动包括:
- **类型安全**:在 `message_handler.py`、`unified_manager.py` 和 `napcat_adapter` 的消息处理器中使用 `typing.cast` 来解决类型不一致问题并提高静态分析的准确性。
- **配置空值检查**:在 `message_handler.py` 和 `message_processor.py` 中添加对 `global_config` 及其嵌套属性的检查,以防止在应用启动或配置加载过程中出现 `NoneType` 错误。
- **内存管理提示**:优化了 `unified_manager.py` 中内存判断器的提示,使其在获取长期记忆时更加保守,从而提升简单交互的性能。
- **Napcat 适配器**:新增了视频处理的配置选项以及回复行为。同时改进了消息解析逻辑的鲁棒性。- **消息处理器**:重构了 `_process_message_segments` 及相关函数,移除了未使用的 `message_info` 参数,从而简化了函数签名。
This commit is contained in:
tt-P607
2025-11-28 10:15:53 +08:00
parent 883e391010
commit abfcf56941
6 changed files with 119 additions and 94 deletions

View File

@@ -43,6 +43,7 @@ from src.config.config import global_config
from src.mood.mood_manager import mood_manager
from src.plugin_system.base import BaseCommand, EventType
from src.plugin_system.core import component_registry, event_manager, global_announcement_manager
from typing import cast
if TYPE_CHECKING:
from src.chat.message_receive.chat_stream import ChatStream
@@ -55,23 +56,25 @@ PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
def _check_ban_words(text: str, chat: "ChatStream", userinfo) -> bool:
"""检查消息是否包含过滤词"""
for word in global_config.message_receive.ban_words:
if word in text:
chat_name = chat.group_info.group_name if chat.group_info else "私聊"
logger.info(f"[{chat_name}]{userinfo.user_nickname}:{text}")
logger.info(f"[过滤词识别]消息中含有{word}filtered")
return True
if global_config and global_config.message_receive:
for word in global_config.message_receive.ban_words:
if word in text:
chat_name = chat.group_info.group_name if chat.group_info else "私聊"
logger.info(f"[{chat_name}]{userinfo.user_nickname}:{text}")
logger.info(f"[过滤词识别]消息中含有{word}filtered")
return True
return False
def _check_ban_regex(text: str, chat: "ChatStream", userinfo) -> bool:
"""检查消息是否匹配过滤正则表达式"""
for pattern in global_config.message_receive.ban_msgs_regex:
if re.search(pattern, text):
chat_name = chat.group_info.group_name if chat.group_info else "私聊"
logger.info(f"[{chat_name}]{userinfo.user_nickname}:{text}")
logger.info(f"[正则表达式过滤]消息匹配到{pattern}filtered")
return True
if global_config and global_config.message_receive:
for pattern in global_config.message_receive.ban_msgs_regex:
if re.search(pattern, text):
chat_name = chat.group_info.group_name if chat.group_info else "私聊"
logger.info(f"[{chat_name}]{userinfo.user_nickname}:{text}")
logger.info(f"[正则表达式过滤]消息匹配到{pattern}filtered")
return True
return False
@@ -281,7 +284,7 @@ class MessageHandler:
chat = await get_chat_manager().get_or_create_stream(
platform=platform,
user_info=DatabaseUserInfo.from_dict(user_info) if user_info else None, # type: ignore
group_info=DatabaseGroupInfo.from_dict(group_info) if group_info else None,
group_info=DatabaseGroupInfo.from_dict(cast(dict, group_info)) if group_info else None,
)
# 将消息信封转换为 DatabaseMessages
@@ -431,7 +434,7 @@ class MessageHandler:
chat = await get_chat_manager().get_or_create_stream(
platform=platform,
user_info=DatabaseUserInfo.from_dict(user_info) if user_info else None, # type: ignore
group_info=DatabaseGroupInfo.from_dict(group_info) if group_info else None,
group_info=DatabaseGroupInfo.from_dict(cast(dict, group_info)) if group_info else None,
)
# 将消息信封转换为 DatabaseMessages
@@ -535,7 +538,9 @@ class MessageHandler:
text = message.processed_plain_text or ""
# 获取配置的命令前缀
prefixes = global_config.command.command_prefixes
prefixes = []
if global_config and global_config.command:
prefixes = global_config.command.command_prefixes
# 检查是否以任何前缀开头
matched_prefix = None
@@ -707,7 +712,7 @@ class MessageHandler:
# 检查是否需要处理消息
should_process_in_manager = True
if group_info and str(group_info.group_id) in global_config.message_receive.mute_group_list:
if group_info and global_config and global_config.message_receive and str(group_info.group_id) in global_config.message_receive.mute_group_list:
is_image_or_emoji = message.is_picid or message.is_emoji
if not message.is_mentioned and not is_image_or_emoji:
logger.debug(
@@ -731,7 +736,7 @@ class MessageHandler:
# 情绪系统更新
try:
if global_config.mood.enable_mood:
if global_config and global_config.mood and global_config.mood.enable_mood:
interest_rate = message.interest_value or 0.0
logger.debug(f"开始更新情绪状态,兴趣度: {interest_rate:.2f}")

View File

@@ -56,7 +56,7 @@ async def process_message_from_dict(message_dict: MessageEnvelope, stream_id: st
}
# 异步处理消息段,生成纯文本
processed_plain_text = await _process_message_segments(message_segment, processing_state, message_info)
processed_plain_text = await _process_message_segments(message_segment, processing_state)
# 解析 notice 信息
is_notify = False
@@ -155,15 +155,13 @@ async def process_message_from_dict(message_dict: MessageEnvelope, stream_id: st
async def _process_message_segments(
segment: SegPayload | list[SegPayload],
state: dict,
message_info: MessageInfoPayload
state: dict
) -> str:
"""递归处理消息段,转换为文字描述
Args:
segment: 要处理的消息段TypedDict 或列表)
state: 处理状态字典(用于记录消息类型标记)
message_info: 消息基础信息TypedDict 格式)
Returns:
str: 处理后的文本
@@ -172,7 +170,7 @@ async def _process_message_segments(
if isinstance(segment, list):
segments_text = []
for seg in segment:
processed = await _process_message_segments(seg, state, message_info)
processed = await _process_message_segments(seg, state)
if processed:
segments_text.append(processed)
return " ".join(segments_text)
@@ -186,28 +184,26 @@ async def _process_message_segments(
if seg_type == "seglist" and isinstance(seg_data, list):
segments_text = []
for sub_seg in seg_data:
processed = await _process_message_segments(sub_seg, state, message_info)
processed = await _process_message_segments(sub_seg, state)
if processed:
segments_text.append(processed)
return " ".join(segments_text)
# 处理其他类型
return await _process_single_segment(segment, state, message_info)
return await _process_single_segment(segment, state)
return ""
async def _process_single_segment(
segment: SegPayload,
state: dict,
message_info: MessageInfoPayload
state: dict
) -> str:
"""处理单个消息段
Args:
segment: 消息段TypedDict 格式)
state: 处理状态字典
message_info: 消息基础信息TypedDict 格式)
Returns:
str: 处理后的文本
@@ -234,7 +230,6 @@ async def _process_single_segment(
return f"@{seg_data}" if isinstance(seg_data, str) else "@未知用户"
elif seg_type == "image":
# 如果是base64图片数据
if isinstance(seg_data, str):
state["has_picid"] = True
state["is_picid"] = True
@@ -247,27 +242,17 @@ async def _process_single_segment(
state["has_emoji"] = True
state["is_emoji"] = True
if isinstance(seg_data, str):
return await get_image_manager().get_emoji_description(seg_data)
image_manager = get_image_manager()
return await image_manager.get_emoji_description(seg_data)
return "[发了一个表情包,网卡了加载不出来]"
elif seg_type == "voice":
state["is_voice"] = True
# 检查消息是否由机器人自己发送
user_info = message_info.get("user_info", {})
user_id_str = str(user_info.get("user_id", ""))
if user_id_str == str(global_config.bot.qq_account):
logger.info(f"检测到机器人自身发送的语音消息 (User ID: {user_id_str}),尝试从缓存获取文本。")
if isinstance(seg_data, str):
cached_text = consume_self_voice_text(seg_data)
if cached_text:
logger.info(f"成功从缓存中获取语音文本: '{cached_text[:70]}...'")
return f"[语音:{cached_text}]"
else:
logger.warning("机器人自身语音消息缓存未命中,将回退到标准语音识别。")
# 标准语音识别流程
# 检查是否是自己发送的语音
if isinstance(seg_data, str):
cached_text = consume_self_voice_text(seg_data)
if cached_text:
return f"[语音:{cached_text}]"
return await get_voice_text(seg_data)
return "[发了一段语音,网卡了加载不出来]"
@@ -299,7 +284,7 @@ async def _process_single_segment(
logger.warning("⚠️ Rust视频处理模块不可用跳过视频分析")
return "[视频]"
if global_config.video_analysis.enable:
if global_config and global_config.video_analysis and global_config.video_analysis.enable:
logger.info("已启用视频识别,开始识别")
if isinstance(seg_data, dict):
try:
@@ -317,8 +302,9 @@ async def _process_single_segment(
# 使用video analyzer分析视频
video_analyzer = get_video_analyzer()
prompt = global_config.video_analysis.batch_analysis_prompt if global_config and global_config.video_analysis else ""
result = await video_analyzer.analyze_video_from_bytes(
video_bytes, filename, prompt=global_config.video_analysis.batch_analysis_prompt
video_bytes, filename, prompt=prompt
)
logger.info(f"视频分析结果: {result}")