feat(scorer): 添加概率输出对齐功能,支持二分类和三分类模型

This commit is contained in:
Windpicker-owo
2025-12-13 17:29:13 +08:00
parent b7e8f04f17
commit 21ccb6f0cd
3 changed files with 136 additions and 32 deletions

View File

@@ -171,12 +171,12 @@ class SemanticInterestModel:
# 确保类别顺序为 [-1, 0, 1]
classes = self.clf.classes_
if not np.array_equal(classes, [-1, 0, 1]):
# 需要重新排序
sorted_proba = np.zeros_like(proba)
# 需要重排/补齐(即使是二分类,也保证输出 3 列)
sorted_proba = np.zeros((proba.shape[0], 3), dtype=proba.dtype)
for i, cls in enumerate([-1, 0, 1]):
idx = np.where(classes == cls)[0]
if len(idx) > 0:
sorted_proba[:, i] = proba[:, idx[0]]
sorted_proba[:, i] = proba[:, int(idx[0])]
return sorted_proba
return proba

View File

@@ -101,6 +101,11 @@ class FastScorer:
# 偏置项: bias_pos - bias_neg
self.bias: float = 0.0
# 输出变换: interest = output_bias + output_scale * sigmoid(z)
# 用于兼容二分类(缺少中立/负类)等情况
self.output_bias: float = 0.0
self.output_scale: float = 1.0
# 元信息
self.meta: dict[str, Any] = {}
self.is_loaded = False
@@ -156,19 +161,64 @@ class FastScorer:
idf = tfidf.idf_ # numpy array, shape (n_features,)
# 获取 LR 权重
# clf.coef_ shape: (n_classes, n_features) 对于多分类
# classes_ 顺序应该是 [-1, 0, 1]
coef = clf.coef_ # shape (3, n_features)
intercept = clf.intercept_ # shape (3,)
classes = clf.classes_
# - 多分类: coef_.shape == (n_classes, n_features)
# - 二分类: coef_.shape == (1, n_features),对应 classes_[1] 的 logit
coef = np.asarray(clf.coef_)
intercept = np.asarray(clf.intercept_)
classes = np.asarray(clf.classes_)
# 找到 -1 和 1 的索引
idx_neg = np.where(classes == -1)[0][0]
idx_pos = np.where(classes == 1)[0][0]
# 默认输出变换
self.output_bias = 0.0
self.output_scale = 1.0
# 计算 z_interest = z_pos - z_neg 的权重
w_interest = coef[idx_pos] - coef[idx_neg] # shape (n_features,)
b_interest = intercept[idx_pos] - intercept[idx_neg]
extraction_mode = "unknown"
b_interest: float
if len(classes) == 2 and coef.shape[0] == 1:
# 二分类: sigmoid(w·x + b) == P(classes_[1])
w_interest = coef[0]
b_interest = float(intercept[0]) if intercept.size else 0.0
extraction_mode = "binary"
# 兼容兴趣分定义: interest = P(1) + 0.5*P(0)
# 二分类下缺失的类别概率视为 0 或 (1-P(pos)),可化简为线性变换
class_set = {int(c) for c in classes.tolist()}
pos_label = int(classes[1])
if class_set == {-1, 1} and pos_label == 1:
# interest = P(1)
self.output_bias, self.output_scale = 0.0, 1.0
elif class_set == {0, 1} and pos_label == 1:
# P(0) = 1 - P(1) => interest = P(1) + 0.5*(1-P(1)) = 0.5 + 0.5*P(1)
self.output_bias, self.output_scale = 0.5, 0.5
elif class_set == {-1, 0} and pos_label == 0:
# interest = 0.5*P(0)
self.output_bias, self.output_scale = 0.0, 0.5
else:
logger.warning(f"[FastScorer] 非标准二分类标签 {classes.tolist()},将直接使用 sigmoid(logit)")
else:
# 多分类/非标准:尽量构造一个可用的 z
if coef.ndim != 2 or coef.shape[0] != len(classes):
raise ValueError(
f"不支持的模型权重形状: coef={coef.shape}, classes={classes.tolist()}"
)
if (-1 in classes) and (1 in classes):
# 对三分类:使用 z_pos - z_neg 近似兴趣 logit,忽略中立
idx_neg = int(np.where(classes == -1)[0][0])
idx_pos = int(np.where(classes == 1)[0][0])
w_interest = coef[idx_pos] - coef[idx_neg]
b_interest = float(intercept[idx_pos] - intercept[idx_neg])
extraction_mode = "multiclass_diff"
elif 1 in classes:
# 退化:仅使用 class=1 的 logit,仍然输出 sigmoid(logit)
idx_pos = int(np.where(classes == 1)[0][0])
w_interest = coef[idx_pos]
b_interest = float(intercept[idx_pos])
extraction_mode = "multiclass_pos_only"
logger.warning(f"[FastScorer] 模型缺少 -1 类别: {classes.tolist()},将仅使用 class=1 logit")
else:
raise ValueError(f"模型缺少 class=1无法构建兴趣评分: classes={classes.tolist()}")
# 融合: combined_weight = w_interest * idf
combined_weights = w_interest * idf
@@ -200,6 +250,10 @@ class FastScorer:
"top_k_weights": self.config.top_k_weights,
"bias": self.bias,
"ngram_range": self.config.ngram_range,
"classes": classes.tolist(),
"extraction_mode": extraction_mode,
"output_bias": self.output_bias,
"output_scale": self.output_scale,
}
logger.info(
@@ -272,6 +326,9 @@ class FastScorer:
except OverflowError:
interest = 0.0 if z < 0 else 1.0
interest = self.output_bias + self.output_scale * interest
interest = max(0.0, min(1.0, interest))
# 统计
self.total_scores += 1
self.total_time += time.time() - start_time

View File

@@ -82,6 +82,45 @@ class SemanticInterestScorer:
self.total_scores = 0
self.total_time = 0.0
def _get_underlying_clf(self):
    """Return the classifier object behind ``self.model``.

    Returns:
        ``None`` when ``self.model`` is ``None``; the model's ``clf``
        attribute when the model exposes one; otherwise the model itself.
    """
    wrapped = self.model
    if wrapped is not None:
        # A model without a ``clf`` attribute is treated as the classifier.
        return getattr(wrapped, "clf", wrapped)
    return None
def _proba_to_three(self, proba_row) -> tuple[float, float, float]:
    """Align one ``predict_proba`` row to ``(P(-1), P(0), P(1))``.

    Resolution order:
      1. If the underlying classifier has ``classes_`` and its length
         equals the row length, each probability is assigned to its class
         label; labels missing from ``classes_`` get ``0.0``.
      2. Otherwise a 3-element row is taken to be ordered ``[-1, 0, 1]``
         already (e.g. a wrapper that emits three fixed columns while
         ``classes_`` still lists two labels).
      3. Otherwise a 2-element row is read as ``(P(-1), P(1))`` and a
         1-element row as ``P(0)`` -- conservative fallbacks that avoid
         raising.

    Args:
        proba_row: Any iterable of probabilities (numpy row, list, ...).

    Returns:
        A ``(p_neg, p_neu, p_pos)`` tuple of Python floats.

    Raises:
        ValueError: If the row length matches none of the cases above.
    """
    # Materialise once so that len() and indexing work for any iterable.
    proba_row = list(proba_row)
    size = len(proba_row)

    labels = getattr(self._get_underlying_clf(), "classes_", None)
    if labels is not None and len(labels) == size:
        by_label = {}
        for label, prob in zip(labels, proba_row):
            by_label[int(label)] = float(prob)
        return by_label.get(-1, 0.0), by_label.get(0, 0.0), by_label.get(1, 0.0)

    if size == 3:
        neg, neu, pos = proba_row
        return float(neg), float(neu), float(pos)

    if size == 2:
        neg, pos = proba_row
        return float(neg), 0.0, float(pos)

    if size == 1:
        return 0.0, float(proba_row[0]), 0.0

    raise ValueError(f"不支持的 proba 形状: len={len(proba_row)}")
def load(self):
"""同步加载模型(阻塞)"""
if not self.model_path.exists():
@@ -105,6 +144,7 @@ class SemanticInterestScorer:
ngram_range=self.vectorizer.get_config().get("ngram_range", (2, 3)),
weight_prune_threshold=1e-4,
)
try:
self._fast_scorer = FastScorer.from_sklearn_model(
self.vectorizer, self.model, config
)
@@ -112,6 +152,9 @@ class SemanticInterestScorer:
f"[FastScorer] 已启用,词表从 {self.vectorizer.get_vocabulary_size()} "
f"剪枝到 {len(self._fast_scorer.token_weights)}"
)
except Exception as e:
self._fast_scorer = None
logger.warning(f"[FastScorer] 初始化失败,将回退到 sklearn 评分路径: {e}")
self.is_loaded = True
load_time = time.time() - start_time
@@ -154,6 +197,7 @@ class SemanticInterestScorer:
ngram_range=self.vectorizer.get_config().get("ngram_range", (2, 3)),
weight_prune_threshold=1e-4,
)
try:
self._fast_scorer = FastScorer.from_sklearn_model(
self.vectorizer, self.model, config
)
@@ -161,6 +205,9 @@ class SemanticInterestScorer:
f"[FastScorer] 已启用,词表从 {self.vectorizer.get_vocabulary_size()} "
f"剪枝到 {len(self._fast_scorer.token_weights)}"
)
except Exception as e:
self._fast_scorer = None
logger.warning(f"[FastScorer] 初始化失败,将回退到 sklearn 评分路径: {e}")
self.is_loaded = True
load_time = time.time() - start_time
@@ -218,8 +265,7 @@ class SemanticInterestScorer:
# 预测概率
proba = self.model.predict_proba(X)[0]
# proba 顺序为 [-1, 0, 1]
p_neg, p_neu, p_pos = proba
p_neg, p_neu, p_pos = self._proba_to_three(proba)
# 兴趣分计算策略:
# interest = P(1) + 0.5 * P(0)
@@ -297,7 +343,8 @@ class SemanticInterestScorer:
# 计算兴趣分
interests = []
for p_neg, p_neu, p_pos in proba:
for row in proba:
_, p_neu, p_pos = self._proba_to_three(row)
interest = float(p_pos + 0.5 * p_neu)
interest = max(0.0, min(1.0, interest))
interests.append(interest)
@@ -390,7 +437,7 @@ class SemanticInterestScorer:
proba = self.model.predict_proba(X)[0]
pred_label = self.model.predict(X)[0]
p_neg, p_neu, p_pos = proba
p_neg, p_neu, p_pos = self._proba_to_three(proba)
interest = float(p_pos + 0.5 * p_neu)
return {