Refactor anti-injector process result handling

Introduced a ProcessResult enum to standardize anti-injector message processing outcomes. Updated anti_injector.py to return ProcessResult values instead of booleans, and refactored bot.py to handle these results with improved logging and clearer control flow. This change improves code clarity and maintainability for anti-prompt injection logic.
This commit is contained in:
雅诺狐
2025-08-18 17:51:44 +08:00
parent f3958ef484
commit f61710b0ce
3 changed files with 37 additions and 27 deletions

View File

@@ -19,7 +19,7 @@ import datetime
from src.common.logger import get_logger
from src.config.config import global_config
from src.chat.message_receive.message import MessageRecv
from .config import DetectionResult
from .config import DetectionResult, ProcessResult
from .detector import PromptInjectionDetector
from .shield import MessageShield
@@ -38,9 +38,6 @@ class AntiPromptInjector:
self.detector = PromptInjectionDetector()
self.shield = MessageShield()
logger.info(f"反注入系统已初始化 - 模式: {self.config.process_mode}, "
f"规则检测: {self.config.enabled_rules}, LLM检测: {self.config.enabled_LLM}")
async def _get_or_create_stats(self):
"""获取或创建统计记录"""
try:
@@ -95,15 +92,15 @@ class AntiPromptInjector:
except Exception as e:
logger.error(f"更新统计数据失败: {e}")
async def process_message(self, message: MessageRecv) -> Tuple[bool, Optional[str], Optional[str]]:
async def process_message(self, message: MessageRecv) -> Tuple[ProcessResult, Optional[str], Optional[str]]:
"""处理消息并返回结果
Args:
message: 接收到的消息对象
Returns:
Tuple[bool, Optional[str], Optional[str]]:
- 是否允许继续处理消息
Tuple[ProcessResult, Optional[str], Optional[str]]:
- 处理结果状态枚举
- 处理后的消息内容(如果有修改)
- 处理结果说明
"""
@@ -115,7 +112,7 @@ class AntiPromptInjector:
# 1. 检查系统是否启用
if not self.config.enabled:
return True, None, "反注入系统未启用"
return ProcessResult.ALLOWED, None, "反注入系统未启用"
# 2. 检查用户是否被封禁
if self.config.auto_ban_enabled:
@@ -123,12 +120,12 @@ class AntiPromptInjector:
platform = message.message_info.platform
ban_result = await self._check_user_ban(user_id, platform)
if ban_result is not None:
return ban_result
return ProcessResult.BLOCKED_BAN, None, ban_result[2]
# 3. 用户白名单检测
whitelist_result = self._check_whitelist(message)
if whitelist_result is not None:
return whitelist_result
return ProcessResult.ALLOWED, None, whitelist_result[2]
# 4. 内容检测
detection_result = await self.detector.detect(message.processed_plain_text)
@@ -147,7 +144,7 @@ class AntiPromptInjector:
if self.config.process_mode == "strict":
# 严格模式:直接拒绝
await self._update_stats(blocked_messages=1)
return False, None, f"检测到提示词注入攻击,消息已拒绝 (置信度: {detection_result.confidence:.2f})"
return ProcessResult.BLOCKED_INJECTION, None, f"检测到提示词注入攻击,消息已拒绝 (置信度: {detection_result.confidence:.2f})"
elif self.config.process_mode == "lenient":
# 宽松模式:加盾处理
@@ -162,20 +159,20 @@ class AntiPromptInjector:
summary = self.shield.create_safety_summary(detection_result.confidence, detection_result.matched_patterns)
return True, shielded_content, f"检测到可疑内容已加盾处理: {summary}"
return ProcessResult.SHIELDED, shielded_content, f"检测到可疑内容已加盾处理: {summary}"
else:
# 置信度不高,允许通过
return True, None, "检测到轻微可疑内容,已允许通过"
return ProcessResult.ALLOWED, None, "检测到轻微可疑内容,已允许通过"
# 6. 正常消息
return True, None, "消息检查通过"
return ProcessResult.ALLOWED, None, "消息检查通过"
except Exception as e:
logger.error(f"反注入处理异常: {e}", exc_info=True)
await self._update_stats(error_count=1)
# 异常情况下直接阻止消息
return False, None, f"反注入系统异常,消息已阻止: {str(e)}"
return ProcessResult.BLOCKED_INJECTION, None, f"反注入系统异常,消息已阻止: {str(e)}"
finally:
# 更新处理时间统计

View File

@@ -9,6 +9,15 @@
import time
from typing import List, Optional
from dataclasses import dataclass, field
from enum import Enum
class ProcessResult(Enum):
"""处理结果枚举"""
ALLOWED = "allowed" # 允许通过
BLOCKED_INJECTION = "blocked_injection" # 被阻止-注入攻击
BLOCKED_BAN = "blocked_ban" # 被阻止-用户封禁
SHIELDED = "shielded" # 已加盾处理
@dataclass