创建了新的反注入
This commit is contained in:
96
src/chat/security/interfaces.py
Normal file
96
src/chat/security/interfaces.py
Normal file
@@ -0,0 +1,96 @@
|
||||
"""
|
||||
安全检测接口定义
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class SecurityLevel(Enum):
|
||||
"""安全级别"""
|
||||
|
||||
SAFE = "safe" # 安全
|
||||
LOW_RISK = "low_risk" # 低风险
|
||||
MEDIUM_RISK = "medium_risk" # 中等风险
|
||||
HIGH_RISK = "high_risk" # 高风险
|
||||
CRITICAL = "critical" # 严重风险
|
||||
|
||||
|
||||
class SecurityAction(Enum):
|
||||
"""安全处理动作"""
|
||||
|
||||
ALLOW = "allow" # 允许通过
|
||||
MONITOR = "monitor" # 监控但允许
|
||||
SHIELD = "shield" # 加盾处理
|
||||
BLOCK = "block" # 阻止
|
||||
COUNTER = "counter" # 反击
|
||||
|
||||
|
||||
@dataclass
|
||||
class SecurityCheckResult:
|
||||
"""安全检测结果"""
|
||||
|
||||
is_safe: bool = True # 是否安全
|
||||
level: SecurityLevel = SecurityLevel.SAFE # 风险级别
|
||||
confidence: float = 0.0 # 置信度 (0.0-1.0)
|
||||
action: SecurityAction = SecurityAction.ALLOW # 建议动作
|
||||
reason: str = "" # 检测原因
|
||||
details: dict = field(default_factory=dict) # 详细信息
|
||||
matched_patterns: list[str] = field(default_factory=list) # 匹配的模式
|
||||
checker_name: str = "" # 检测器名称
|
||||
processing_time: float = 0.0 # 处理时间(秒)
|
||||
|
||||
def __post_init__(self):
|
||||
"""结果后处理"""
|
||||
# 根据风险级别自动设置 is_safe
|
||||
if self.level in [SecurityLevel.HIGH_RISK, SecurityLevel.CRITICAL]:
|
||||
self.is_safe = False
|
||||
|
||||
|
||||
class SecurityChecker(ABC):
|
||||
"""安全检测器基类"""
|
||||
|
||||
def __init__(self, name: str, priority: int = 50):
|
||||
"""初始化检测器
|
||||
|
||||
Args:
|
||||
name: 检测器名称
|
||||
priority: 优先级 (0-100,数值越大优先级越高)
|
||||
"""
|
||||
self.name = name
|
||||
self.priority = priority
|
||||
self.enabled = True
|
||||
|
||||
@abstractmethod
|
||||
async def check(self, message: str, context: dict | None = None) -> SecurityCheckResult:
|
||||
"""执行安全检测
|
||||
|
||||
Args:
|
||||
message: 待检测的消息内容
|
||||
context: 上下文信息(可选),包含用户信息、聊天信息等
|
||||
|
||||
Returns:
|
||||
SecurityCheckResult: 检测结果
|
||||
"""
|
||||
pass
|
||||
|
||||
def enable(self):
|
||||
"""启用检测器"""
|
||||
self.enabled = True
|
||||
|
||||
def disable(self):
|
||||
"""禁用检测器"""
|
||||
self.enabled = False
|
||||
|
||||
async def pre_check(self, message: str, context: dict | None = None) -> bool:
|
||||
"""预检查,快速判断是否需要执行完整检查
|
||||
|
||||
Args:
|
||||
message: 待检测的消息内容
|
||||
context: 上下文信息
|
||||
|
||||
Returns:
|
||||
bool: True表示需要完整检查,False表示可以跳过
|
||||
"""
|
||||
return True # 默认总是执行完整检查
|
||||
Reference in New Issue
Block a user