feat(affinity-flow): 重构兴趣度评分系统为智能embedding匹配

- 移除传统关键词匹配方式,改用embedding计算智能兴趣匹配度
- 添加异步方法支持机器人兴趣管理器的智能匹配计算
- 增加详细的日志记录和错误处理机制
- 添加数据库关键词提取和降级处理逻辑
- 集成智能兴趣系统初始化到人设构建流程
- 防止回复自身消息的死循环保护机制

BREAKING CHANGE: 兴趣匹配评分机制完全重构,从基于关键词的硬编码匹配改为基于embedding的智能匹配,需要重新初始化兴趣系统
This commit is contained in:
Windpicker-owo
2025-09-16 22:55:38 +08:00
committed by Windpicker-owo
parent 2739848451
commit dcdef633e0
9 changed files with 1322 additions and 97 deletions

View File

@@ -1,12 +1,14 @@
"""
兴趣度评分系统
基于多维度评分机制,包括兴趣匹配度、用户关系分、提及度和时间因子
现在使用embedding计算智能兴趣匹配
"""
from datetime import datetime
from typing import Dict, List
import traceback
from typing import Dict, List, Any
from src.common.data_models.database_data_model import DatabaseMessages
from src.common.data_models.info_data_model import InterestScore
from src.chat.interest_system import bot_interest_manager
from src.common.logger import get_logger
from src.config.config import global_config
@@ -17,15 +19,8 @@ class InterestScoringSystem:
"""兴趣度评分系统"""
def __init__(self):
self.interest_keywords = {
"游戏": ["游戏", "原神", "米哈游", "抽卡", "角色", "装备", "任务", "副本", "PVP", "LOL", "王者荣耀", "吃鸡"],
"动漫": ["动漫", "二次元", "新番", "番剧", "漫画", "角色", "声优", "OP", "ED"],
"音乐": ["音乐", "歌曲", "歌手", "专辑", "演唱会", "乐器", "作词", "作曲"],
"电影": ["电影", "电视剧", "综艺", "演员", "导演", "剧情", "影评", "票房"],
"科技": ["科技", "AI", "人工智能", "编程", "Python", "代码", "软件", "硬件", "手机"],
"生活": ["生活", "日常", "美食", "旅行", "天气", "工作", "学习", "健身"],
"情感": ["情感", "心情", "感情", "恋爱", "友情", "家人", "开心", "难过", "生气"],
}
# 智能兴趣匹配配置
self.use_smart_matching = True
# 评分权重
self.score_weights = {
@@ -46,30 +41,51 @@ class InterestScoringSystem:
# 用户关系数据
self.user_relationships: Dict[str, float] = {} # user_id -> relationship_score
def calculate_interest_scores(self, messages: List[DatabaseMessages], bot_nickname: str) -> List[InterestScore]:
async def calculate_interest_scores(self, messages: List[DatabaseMessages], bot_nickname: str) -> List[InterestScore]:
"""计算消息的兴趣度评分"""
scores = []
logger.info("🚀 开始计算消息兴趣度评分...")
logger.info(f"📨 收到 {len(messages)} 条消息")
# 通过 user_id 判断是否是用户消息(非机器人发送的消息)
user_messages = [msg for msg in messages if str(msg.user_info.user_id) != str(global_config.bot.qq_account)]
logger.info(f"👤 过滤出 {len(user_messages)} 条用户消息")
for msg in user_messages:
score = self._calculate_single_message_score(msg, bot_nickname)
scores = []
for i, msg in enumerate(user_messages, 1):
logger.info(f"📋 [{i}/{len(user_messages)}] 处理消息 ID: {msg.message_id}")
score = await self._calculate_single_message_score(msg, bot_nickname)
scores.append(score)
logger.info(f"✅ 兴趣度评分计算完成,生成 {len(scores)} 个评分")
return scores
def _calculate_single_message_score(self, message: DatabaseMessages, bot_nickname: str) -> InterestScore:
async def _calculate_single_message_score(self, message: DatabaseMessages, bot_nickname: str) -> InterestScore:
"""计算单条消息的兴趣度评分"""
# 1. 计算兴趣匹配度
interest_match_score = self._calculate_interest_match_score(message.processed_plain_text)
logger.info(f"🎯 计算消息 {message.message_id} 的兴趣度评分...")
logger.debug(f"📝 消息长度: {len(message.processed_plain_text)} 字符")
# 提取关键词(从数据库的反序列化字段)
logger.debug("🔍 提取关键词...")
keywords = self._extract_keywords_from_database(message)
logger.debug(f"🏷️ 提取到 {len(keywords)} 个关键词")
# 1. 计算兴趣匹配度(现在是异步的)
logger.debug("🧠 计算兴趣匹配度...")
interest_match_score = await self._calculate_interest_match_score(message.processed_plain_text, keywords)
logger.debug(f"📊 兴趣匹配度: {interest_match_score:.3f}")
# 2. 计算关系分
logger.debug("🤝 计算关系分...")
relationship_score = self._calculate_relationship_score(message.user_info.user_id)
logger.debug(f"💝 关系分: {relationship_score:.3f}")
# 3. 计算提及分数
logger.debug("📢 计算提及分数...")
mentioned_score = self._calculate_mentioned_score(message, bot_nickname)
logger.debug(f"📣 提及分数: {mentioned_score:.3f}")
# 5. 计算总分
# 4. 计算总分
logger.debug("🧮 计算加权总分...")
total_score = (
interest_match_score * self.score_weights["interest_match"] +
relationship_score * self.score_weights["relationship"] +
@@ -77,11 +93,15 @@ class InterestScoringSystem:
)
details = {
"interest_match": f"兴趣匹配度: {interest_match_score:.2f}",
"relationship": f"关系分: {relationship_score:.2f}",
"mentioned": f"提及分数: {mentioned_score:.2f}",
"interest_match": f"兴趣匹配度: {interest_match_score:.3f}",
"relationship": f"关系分: {relationship_score:.3f}",
"mentioned": f"提及分数: {mentioned_score:.3f}",
}
logger.info(f"📈 消息 {message.message_id} 最终评分: {total_score:.3f}")
logger.debug(f"⚖️ 评分权重: {self.score_weights}")
logger.debug(f"📋 评分详情: {details}")
return InterestScore(
message_id=message.message_id,
total_score=total_score,
@@ -91,32 +111,107 @@ class InterestScoringSystem:
details=details
)
def _calculate_interest_match_score(self, content: str) -> float:
"""计算兴趣匹配度"""
async def _calculate_interest_match_score(self, content: str, keywords: List[str] = None) -> float:
"""计算兴趣匹配度 - 使用智能embedding匹配"""
if not content:
return 0.0
content_lower = content.lower()
max_score = 0.0
# 使用智能匹配embedding
if self.use_smart_matching and bot_interest_manager.is_initialized:
return await self._calculate_smart_interest_match(content, keywords)
else:
# 智能匹配未初始化,返回默认分数
logger.warning("智能兴趣匹配系统未初始化,返回默认分数")
return 0.3
for _category, keywords in self.interest_keywords.items():
category_score = 0.0
matched_keywords = []
async def _calculate_smart_interest_match(self, content: str, keywords: List[str] = None) -> float:
"""使用embedding计算智能兴趣匹配"""
try:
logger.debug("🧠 开始智能兴趣匹配计算...")
for keyword in keywords:
if keyword.lower() in content_lower:
category_score += 0.1
matched_keywords.append(keyword)
# 如果没有传入关键词,则提取
if not keywords:
logger.debug("🔍 从内容中提取关键词...")
keywords = self._extract_keywords_from_content(content)
logger.debug(f"🏷️ 提取到 {len(keywords)} 个关键词")
# 如果匹配到多个关键词,增加额外分数
if len(matched_keywords) > 1:
category_score += (len(matched_keywords) - 1) * 0.05
# 使用机器人兴趣管理器计算匹配度
logger.debug("🤖 调用机器人兴趣管理器计算匹配度...")
match_result = await bot_interest_manager.calculate_interest_match(content, keywords)
# 限制每个类别的最高分
category_score = min(category_score, 0.8)
max_score = max(max_score, category_score)
if match_result:
logger.debug("✅ 智能兴趣匹配成功:")
logger.debug(f" 📊 总分: {match_result.overall_score:.3f}")
logger.debug(f" 🏷️ 匹配标签: {match_result.matched_tags}")
logger.debug(f" 🎯 最佳标签: {match_result.top_tag}")
logger.debug(f" 📈 置信度: {match_result.confidence:.3f}")
logger.debug(f" 🔢 匹配详情: {match_result.match_scores}")
return min(max_score, 1.0)
# 返回匹配分数,考虑置信度
final_score = match_result.overall_score * match_result.confidence
logger.debug(f"⚖️ 最终分数(总分×置信度): {final_score:.3f}")
return final_score
else:
logger.warning("⚠️ 智能兴趣匹配未返回结果")
return 0.0
except Exception as e:
logger.error(f"❌ 智能兴趣匹配计算失败: {e}")
logger.debug("🔍 错误详情:")
logger.debug(f" 💬 内容长度: {len(content)} 字符")
logger.debug(f" 🏷️ 关键词数量: {len(keywords) if keywords else 0}")
return 0.0
def _extract_keywords_from_database(self, message: DatabaseMessages) -> List[str]:
"""从数据库消息中提取关键词"""
keywords = []
# 尝试从 key_words 字段提取存储的是JSON字符串
if message.key_words:
try:
import orjson
keywords = orjson.loads(message.key_words)
if not isinstance(keywords, list):
keywords = []
except (orjson.JSONDecodeError, TypeError):
keywords = []
# 如果没有 keywords尝试从 key_words_lite 提取
if not keywords and message.key_words_lite:
try:
import orjson
keywords = orjson.loads(message.key_words_lite)
if not isinstance(keywords, list):
keywords = []
except (orjson.JSONDecodeError, TypeError):
keywords = []
# 如果还是没有,从消息内容中提取(降级方案)
if not keywords:
keywords = self._extract_keywords_from_content(message.processed_plain_text)
return keywords[:15] # 返回前15个关键词
def _extract_keywords_from_content(self, content: str) -> List[str]:
"""从内容中提取关键词(降级方案)"""
import re
# 清理文本
content = re.sub(r'[^\w\s\u4e00-\u9fff]', ' ', content) # 保留中文、英文、数字
words = content.split()
# 过滤和关键词提取
keywords = []
for word in words:
word = word.strip()
if (len(word) >= 2 and # 至少2个字符
word.isalnum() and # 字母数字
not word.isdigit()): # 不是纯数字
keywords.append(word.lower())
# 去重并限制数量
unique_keywords = list(set(keywords))
return unique_keywords[:10] # 返回前10个唯一关键词
def _calculate_relationship_score(self, user_id: str) -> float:
"""计算关系分"""
@@ -137,40 +232,69 @@ class InterestScoringSystem:
def should_reply(self, score: InterestScore) -> bool:
"""判断是否应该回复"""
logger.info("🤔 评估是否应该回复...")
logger.debug("📊 评分详情:")
logger.debug(f" 📝 消息ID: {score.message_id}")
logger.debug(f" 💯 总分: {score.total_score:.3f}")
logger.debug(f" 🧠 兴趣匹配: {score.interest_match_score:.3f}")
logger.debug(f" 🤝 关系分: {score.relationship_score:.3f}")
logger.debug(f" 📢 提及分: {score.mentioned_score:.3f}")
base_threshold = self.reply_threshold
logger.debug(f"📋 基础阈值: {base_threshold:.3f}")
# 如果被提及,降低阈值
if score.mentioned_score >= 1.0:
base_threshold = self.mention_threshold
logger.debug(f"📣 消息提及了机器人,使用降低阈值: {base_threshold:.3f}")
# 计算连续不回复的概率提升
probability_boost = min(self.no_reply_count * self.probability_boost_per_no_reply, 0.8)
effective_threshold = base_threshold - probability_boost
logger.debug(f"评分决策: 总分={score.total_score:.2f}, 有效阈值={effective_threshold:.2f}, 连续不回复次数={self.no_reply_count}")
logger.debug("📈 连续不回复统计:")
logger.debug(f" 🚫 不回复次数: {self.no_reply_count}")
logger.debug(f" 📈 概率提升: {probability_boost:.3f}")
logger.debug(f" 🎯 有效阈值: {effective_threshold:.3f}")
return score.total_score >= effective_threshold
# 做出决策
should_reply = score.total_score >= effective_threshold
decision = "✅ 应该回复" if should_reply else "❌ 不回复"
logger.info(f"🎯 回复决策: {decision}")
logger.info(f"📊 决策依据: {score.total_score:.3f} {'>=' if should_reply else '<'} {effective_threshold:.3f}")
return should_reply
def record_reply_action(self, did_reply: bool):
"""记录回复动作"""
old_count = self.no_reply_count
if did_reply:
self.no_reply_count = max(0, self.no_reply_count - 1)
action = "✅ 回复了消息"
else:
self.no_reply_count += 1
action = "❌ 选择不回复"
# 限制最大计数
self.no_reply_count = min(self.no_reply_count, self.max_no_reply_count)
logger.debug(f"回复动作记录: {did_reply}, 当前连续不回复次数: {self.no_reply_count}")
logger.info(f"📊 记录回复动作: {action}")
logger.info(f"📈 连续不回复次数: {old_count}{self.no_reply_count}")
logger.debug(f"📋 最大限制: {self.max_no_reply_count}")
def update_user_relationship(self, user_id: str, relationship_change: float):
"""更新用户关系"""
if user_id in self.user_relationships:
self.user_relationships[user_id] = max(0.0, min(1.0, self.user_relationships[user_id] + relationship_change))
else:
self.user_relationships[user_id] = max(0.0, min(1.0, relationship_change))
old_score = self.user_relationships.get(user_id, 0.3) # 默认新用户分数
new_score = max(0.0, min(1.0, old_score + relationship_change))
logger.debug(f"更新用户关系: {user_id} -> {self.user_relationships[user_id]:.2f}")
self.user_relationships[user_id] = new_score
change_direction = "📈" if relationship_change > 0 else "📉" if relationship_change < 0 else ""
logger.info(f"{change_direction} 更新用户关系: {user_id}")
logger.info(f"💝 关系分: {old_score:.3f}{new_score:.3f} (变化: {relationship_change:+.3f})")
logger.debug(f"👥 当前追踪用户数: {len(self.user_relationships)}")
def get_user_relationship(self, user_id: str) -> float:
"""获取用户关系分"""
@@ -184,33 +308,44 @@ class InterestScoringSystem:
"reply_threshold": self.reply_threshold,
"mention_threshold": self.mention_threshold,
"user_relationships": len(self.user_relationships),
"interest_categories": len(self.interest_keywords),
}
def add_interest_category(self, category: str, keywords: List[str]):
"""添加新的兴趣类别"""
self.interest_keywords[category] = keywords
logger.info(f"添加新的兴趣类别: {category}, 关键词数量: {len(keywords)}")
def remove_interest_category(self, category: str):
"""移除兴趣类别"""
if category in self.interest_keywords:
del self.interest_keywords[category]
logger.info(f"移除兴趣类别: {category}")
def update_interest_keywords(self, category: str, keywords: List[str]):
"""更新兴趣类别的关键词"""
if category in self.interest_keywords:
self.interest_keywords[category] = keywords
logger.info(f"更新兴趣类别 {category} 的关键词: {len(keywords)}")
else:
self.add_interest_category(category, keywords)
def get_interest_keywords(self) -> Dict[str, List[str]]:
"""获取所有兴趣关键词"""
return self.interest_keywords.copy()
def reset_stats(self):
"""重置统计信息"""
self.no_reply_count = 0
logger.info("重置兴趣度评分系统统计")
async def initialize_smart_interests(self, personality_description: str, personality_id: str = "default"):
"""初始化智能兴趣系统"""
try:
logger.info("🚀 开始初始化智能兴趣系统...")
logger.info(f"📋 人设ID: {personality_id}")
logger.info(f"📝 人设描述长度: {len(personality_description)} 字符")
await bot_interest_manager.initialize(personality_description, personality_id)
logger.info("✅ 智能兴趣系统初始化完成")
# 显示初始化后的统计信息
stats = bot_interest_manager.get_interest_stats()
logger.info("📊 兴趣系统统计:")
logger.info(f" 🏷️ 总标签数: {stats.get('total_tags', 0)}")
logger.info(f" 💾 缓存大小: {stats.get('cache_size', 0)}")
logger.info(f" 🧠 模型: {stats.get('embedding_model', '未知')}")
except Exception as e:
logger.error(f"❌ 初始化智能兴趣系统失败: {e}")
logger.error("🔍 错误详情:")
traceback.print_exc()
def get_matching_config(self) -> Dict[str, Any]:
"""获取匹配配置信息"""
return {
"use_smart_matching": self.use_smart_matching,
"smart_system_initialized": bot_interest_manager.is_initialized,
"smart_system_stats": bot_interest_manager.get_interest_stats() if bot_interest_manager.is_initialized else None
}
# 创建全局兴趣评分系统实例
interest_scoring_system = InterestScoringSystem()

View File

@@ -0,0 +1,17 @@
"""
机器人兴趣标签系统
基于人设生成兴趣标签使用embedding计算匹配度
"""
from .bot_interest_manager import BotInterestManager, bot_interest_manager
from src.common.data_models.bot_interest_data_model import (
BotInterestTag, BotPersonalityInterests, InterestMatchResult
)
__all__ = [
"BotInterestManager",
"bot_interest_manager",
"BotInterestTag",
"BotPersonalityInterests",
"InterestMatchResult"
]

View File

@@ -0,0 +1,666 @@
"""
机器人兴趣标签管理系统
基于人设生成兴趣标签并使用embedding计算匹配度
"""
import orjson
import traceback
from typing import List, Dict, Optional, Any
from datetime import datetime
import numpy as np
from src.common.logger import get_logger
from src.common.data_models.bot_interest_data_model import (
BotPersonalityInterests, BotInterestTag, InterestMatchResult
)
logger = get_logger("bot_interest_manager")
class BotInterestManager:
"""机器人兴趣标签管理器"""
def __init__(self):
self.current_interests: Optional[BotPersonalityInterests] = None
self.embedding_cache: Dict[str, List[float]] = {} # embedding缓存
self._initialized = False
# Embedding客户端配置
self.embedding_request = None
self.embedding_config = None
self.embedding_dimension = 1024 # 默认BGE-M3 embedding维度
@property
def is_initialized(self) -> bool:
"""检查兴趣系统是否已初始化"""
return self._initialized
async def initialize(self, personality_description: str, personality_id: str = "default"):
"""初始化兴趣标签系统"""
try:
logger.info("=" * 60)
logger.info("🚀 开始初始化机器人兴趣标签系统")
logger.info(f"📋 人设ID: {personality_id}")
logger.info(f"📝 人设描述长度: {len(personality_description)} 字符")
logger.info("=" * 60)
# 初始化embedding模型
logger.info("🧠 正在初始化embedding模型...")
await self._initialize_embedding_model()
# 检查embedding客户端是否成功初始化
if not self.embedding_request:
raise RuntimeError("❌ Embedding客户端初始化失败无法继续")
# 生成或加载兴趣标签
logger.info("🎯 正在生成或加载兴趣标签...")
await self._load_or_generate_interests(personality_description, personality_id)
self._initialized = True
# 检查是否成功获取兴趣标签
if self.current_interests and len(self.current_interests.get_active_tags()) > 0:
active_tags_count = len(self.current_interests.get_active_tags())
logger.info("=" * 60)
logger.info("✅ 机器人兴趣标签系统初始化完成!")
logger.info(f"📊 活跃兴趣标签数量: {active_tags_count}")
logger.info(f"💾 Embedding缓存大小: {len(self.embedding_cache)}")
logger.info("=" * 60)
else:
raise RuntimeError("❌ 未能成功生成或加载兴趣标签")
except Exception as e:
logger.error("=" * 60)
logger.error(f"❌ 初始化机器人兴趣标签系统失败: {e}")
logger.error("=" * 60)
traceback.print_exc()
raise # 重新抛出异常,不允许降级初始化
async def _initialize_embedding_model(self):
"""初始化embedding模型"""
logger.info("🔧 正在配置embedding客户端...")
# 使用项目配置的embedding模型
from src.config.config import model_config
from src.llm_models.utils_model import LLMRequest
logger.debug("✅ 成功导入embedding相关模块")
# 检查embedding配置是否存在
if not hasattr(model_config.model_task_config, 'embedding'):
raise RuntimeError("❌ 未找到embedding模型配置")
logger.info("📋 找到embedding模型配置")
self.embedding_config = model_config.model_task_config.embedding
self.embedding_dimension = 1024 # BGE-M3的维度
logger.info(f"📐 使用模型维度: {self.embedding_dimension}")
# 创建LLMRequest实例用于embedding
self.embedding_request = LLMRequest(model_set=self.embedding_config, request_type="interest_embedding")
logger.info("✅ Embedding请求客户端初始化成功")
logger.info(f"🔗 客户端类型: {type(self.embedding_request).__name__}")
# 获取第一个embedding模型的ModelInfo
if hasattr(self.embedding_config, 'model_list') and self.embedding_config.model_list:
first_model_name = self.embedding_config.model_list[0]
logger.info(f"🎯 使用embedding模型: {first_model_name}")
else:
logger.warning("⚠️ 未找到embedding模型列表")
logger.info("✅ Embedding模型初始化完成")
async def _load_or_generate_interests(self, personality_description: str, personality_id: str):
"""加载或生成兴趣标签"""
logger.info(f"📚 正在为 '{personality_id}' 加载或生成兴趣标签...")
# 首先尝试从数据库加载
logger.info("💾 尝试从数据库加载现有兴趣标签...")
loaded_interests = await self._load_interests_from_database(personality_id)
if loaded_interests:
self.current_interests = loaded_interests
active_count = len(loaded_interests.get_active_tags())
logger.info(f"✅ 成功从数据库加载 {active_count} 个兴趣标签")
logger.info(f"📅 最后更新时间: {loaded_interests.last_updated}")
logger.info(f"🔄 版本号: {loaded_interests.version}")
else:
# 生成新的兴趣标签
logger.info("🆕 数据库中未找到兴趣标签,开始生成新的...")
logger.info("🤖 正在调用LLM生成个性化兴趣标签...")
generated_interests = await self._generate_interests_from_personality(personality_description, personality_id)
if generated_interests:
self.current_interests = generated_interests
active_count = len(generated_interests.get_active_tags())
logger.info(f"✅ 成功生成 {active_count} 个兴趣标签")
# 保存到数据库
logger.info("💾 正在保存兴趣标签到数据库...")
await self._save_interests_to_database(generated_interests)
else:
raise RuntimeError("❌ 兴趣标签生成失败")
async def _generate_interests_from_personality(self, personality_description: str, personality_id: str) -> Optional[BotPersonalityInterests]:
"""根据人设生成兴趣标签"""
try:
logger.info("🎨 开始根据人设生成兴趣标签...")
logger.info(f"📝 人设长度: {len(personality_description)} 字符")
# 检查embedding客户端是否可用
if not hasattr(self, 'embedding_request'):
raise RuntimeError("❌ Embedding客户端未初始化无法生成兴趣标签")
# 构建提示词
logger.info("📝 构建LLM提示词...")
prompt = f"""
基于以下机器人人设描述,生成一套合适的兴趣标签:
人设描述:
{personality_description}
请生成一系列兴趣关键词标签,要求:
1. 标签应该符合人设特点和性格
2. 每个标签都有权重0.1-1.0),表示对该兴趣的喜好程度
3. 生成15-25个不等的标签
4. 标签应该是具体的关键词,而不是抽象概念
请以JSON格式返回格式如下
{{
"interests": [
{{"name": "标签名", "weight": 0.8}},
{{"name": "标签名", "weight": 0.6}},
{{"name": "标签名", "weight": 0.9}}
]
}}
注意:
- 权重范围0.1-1.0,权重越高表示越感兴趣
- 标签要具体,如"编程""游戏""旅行"
- 根据人设生成个性化的标签
"""
# 调用LLM生成兴趣标签
logger.info("🤖 正在调用LLM生成兴趣标签...")
response = await self._call_llm_for_interest_generation(prompt)
if not response:
raise RuntimeError("❌ LLM未返回有效响应")
logger.info("✅ LLM响应成功开始解析兴趣标签...")
interests_data = orjson.loads(response)
bot_interests = BotPersonalityInterests(
personality_id=personality_id,
personality_description=personality_description
)
# 解析生成的兴趣标签
interests_list = interests_data.get("interests", [])
logger.info(f"📋 解析到 {len(interests_list)} 个兴趣标签")
for i, tag_data in enumerate(interests_list):
tag_name = tag_data.get("name", f"标签_{i}")
weight = tag_data.get("weight", 0.5)
tag = BotInterestTag(
tag_name=tag_name,
weight=weight
)
bot_interests.interest_tags.append(tag)
logger.debug(f" 🏷️ {tag_name} (权重: {weight:.2f})")
# 为所有标签生成embedding
logger.info("🧠 开始为兴趣标签生成embedding向量...")
await self._generate_embeddings_for_tags(bot_interests)
logger.info("✅ 兴趣标签生成完成")
return bot_interests
except orjson.JSONDecodeError as e:
logger.error(f"❌ 解析LLM响应JSON失败: {e}")
raise
except Exception as e:
logger.error(f"❌ 根据人设生成兴趣标签失败: {e}")
traceback.print_exc()
raise
async def _call_llm_for_interest_generation(self, prompt: str) -> Optional[str]:
"""调用LLM生成兴趣标签"""
try:
logger.info("🔧 配置LLM客户端...")
# 使用llm_api来处理请求
from src.plugin_system.apis import llm_api
from src.config.config import model_config
# 构建完整的提示词明确要求只返回纯JSON
full_prompt = f"""你是一个专业的机器人人设分析师,擅长根据人设描述生成合适的兴趣标签。
{prompt}
请确保返回格式为有效的JSON不要包含任何额外的文本、解释或代码块标记。只返回JSON对象本身。"""
# 使用replyer模型配置
replyer_config = model_config.model_task_config.replyer
# 调用LLM API
logger.info("🚀 正在通过LLM API发送请求...")
success, response, reasoning_content, model_name = await llm_api.generate_with_model(
prompt=full_prompt,
model_config=replyer_config,
request_type="interest_generation",
temperature=0.7,
max_tokens=2000
)
if success and response:
logger.info(f"✅ LLM响应成功模型: {model_name}, 响应长度: {len(response)} 字符")
logger.debug(f"📄 LLM响应内容: {response[:200]}..." if len(response) > 200 else f"📄 LLM响应内容: {response}")
if reasoning_content:
logger.debug(f"🧠 推理内容: {reasoning_content[:100]}...")
# 清理响应内容,移除可能的代码块标记
cleaned_response = self._clean_llm_response(response)
return cleaned_response
else:
logger.warning("⚠️ LLM返回空响应或调用失败")
return None
except Exception as e:
logger.error(f"❌ 调用LLM生成兴趣标签失败: {e}")
logger.error("🔍 错误详情:")
traceback.print_exc()
return None
def _clean_llm_response(self, response: str) -> str:
"""清理LLM响应移除代码块标记和其他非JSON内容"""
import re
# 移除 ```json 和 ``` 标记
cleaned = re.sub(r'```json\s*', '', response)
cleaned = re.sub(r'\s*```', '', cleaned)
# 移除可能的多余空格和换行
cleaned = cleaned.strip()
# 尝试提取JSON对象如果响应中有其他文本
json_match = re.search(r'\{.*\}', cleaned, re.DOTALL)
if json_match:
cleaned = json_match.group(0)
logger.debug(f"🧹 清理后的响应: {cleaned[:200]}..." if len(cleaned) > 200 else f"🧹 清理后的响应: {cleaned}")
return cleaned
async def _generate_embeddings_for_tags(self, interests: BotPersonalityInterests):
"""为所有兴趣标签生成embedding"""
if not hasattr(self, 'embedding_request'):
raise RuntimeError("❌ Embedding客户端未初始化无法生成embedding")
total_tags = len(interests.interest_tags)
logger.info(f"🧠 开始为 {total_tags} 个兴趣标签生成embedding向量...")
cached_count = 0
generated_count = 0
failed_count = 0
for i, tag in enumerate(interests.interest_tags, 1):
if tag.tag_name in self.embedding_cache:
# 使用缓存的embedding
tag.embedding = self.embedding_cache[tag.tag_name]
cached_count += 1
logger.debug(f" [{i}/{total_tags}] 🏷️ '{tag.tag_name}' - 使用缓存")
else:
# 生成新的embedding
embedding_text = tag.tag_name
logger.debug(f" [{i}/{total_tags}] 🔄 正在为 '{tag.tag_name}' 生成embedding...")
embedding = await self._get_embedding(embedding_text)
if embedding:
tag.embedding = embedding
self.embedding_cache[tag.tag_name] = embedding
generated_count += 1
logger.debug(f"'{tag.tag_name}' embedding生成成功")
else:
failed_count += 1
logger.warning(f"'{tag.tag_name}' embedding生成失败")
if failed_count > 0:
raise RuntimeError(f"❌ 有 {failed_count} 个兴趣标签embedding生成失败")
interests.last_updated = datetime.now()
logger.info("=" * 50)
logger.info("✅ Embedding生成完成!")
logger.info(f"📊 总标签数: {total_tags}")
logger.info(f"💾 缓存命中: {cached_count}")
logger.info(f"🆕 新生成: {generated_count}")
logger.info(f"❌ 失败: {failed_count}")
logger.info(f"🗃️ 总缓存大小: {len(self.embedding_cache)}")
logger.info("=" * 50)
async def _get_embedding(self, text: str) -> List[float]:
"""获取文本的embedding向量"""
if not hasattr(self, 'embedding_request'):
raise RuntimeError("❌ Embedding请求客户端未初始化")
# 检查缓存
if text in self.embedding_cache:
logger.debug(f"💾 使用缓存的embedding: '{text[:30]}...'")
return self.embedding_cache[text]
# 使用LLMRequest获取embedding
logger.debug(f"🔄 正在获取embedding: '{text[:30]}...'")
embedding, model_name = await self.embedding_request.get_embedding(text)
if embedding and len(embedding) > 0:
self.embedding_cache[text] = embedding
logger.debug(f"✅ Embedding获取成功维度: {len(embedding)}, 模型: {model_name}")
return embedding
else:
raise RuntimeError(f"❌ 返回的embedding为空: {embedding}")
async def _generate_message_embedding(self, message_text: str, keywords: List[str]) -> List[float]:
"""为消息生成embedding向量"""
# 组合消息文本和关键词作为embedding输入
if keywords:
combined_text = f"{message_text} {' '.join(keywords)}"
else:
combined_text = message_text
logger.debug(f"🔄 正在为消息生成embedding输入长度: {len(combined_text)}")
# 生成embedding
embedding = await self._get_embedding(combined_text)
logger.debug(f"✅ 消息embedding生成成功维度: {len(embedding)}")
return embedding
async def _calculate_similarity_scores(self, result: InterestMatchResult, message_embedding: List[float], keywords: List[str]):
"""计算消息与兴趣标签的相似度分数"""
try:
if not self.current_interests:
return
active_tags = self.current_interests.get_active_tags()
if not active_tags:
return
logger.debug(f"🔍 开始计算与 {len(active_tags)} 个兴趣标签的相似度")
for tag in active_tags:
if tag.embedding:
# 计算余弦相似度
similarity = self._calculate_cosine_similarity(message_embedding, tag.embedding)
weighted_score = similarity * tag.weight
# 设置相似度阈值为0.3
if similarity > 0.3:
result.add_match(tag.tag_name, weighted_score, keywords)
logger.debug(f" 🏷️ '{tag.tag_name}': 相似度={similarity:.3f}, 权重={tag.weight:.2f}, 加权分数={weighted_score:.3f}")
except Exception as e:
logger.error(f"❌ 计算相似度分数失败: {e}")
async def calculate_interest_match(self, message_text: str, keywords: List[str] = None) -> InterestMatchResult:
"""计算消息与机器人兴趣的匹配度"""
if not self.current_interests or not self._initialized:
raise RuntimeError("❌ 兴趣标签系统未初始化")
logger.info("🎯 开始计算兴趣匹配度...")
logger.debug(f"💬 消息长度: {len(message_text)} 字符")
if keywords:
logger.debug(f"🏷️ 关键词数量: {len(keywords)}")
message_id = f"msg_{datetime.now().timestamp()}"
result = InterestMatchResult(message_id=message_id)
# 获取活跃的兴趣标签
active_tags = self.current_interests.get_active_tags()
if not active_tags:
raise RuntimeError("❌ 没有活跃的兴趣标签")
logger.info(f"📊 有 {len(active_tags)} 个活跃兴趣标签参与匹配")
# 生成消息的embedding
logger.debug("🔄 正在生成消息embedding...")
message_embedding = await self._get_embedding(message_text)
logger.debug(f"✅ 消息embedding生成成功维度: {len(message_embedding)}")
# 计算与每个兴趣标签的相似度
match_count = 0
high_similarity_count = 0
similarity_threshold = 0.3
logger.debug(f"🔍 使用相似度阈值: {similarity_threshold}")
for tag in active_tags:
if tag.embedding:
similarity = self._calculate_cosine_similarity(message_embedding, tag.embedding)
weighted_score = similarity * tag.weight
if similarity > similarity_threshold:
match_count += 1
result.add_match(tag.tag_name, weighted_score, [tag.tag_name])
if similarity > 0.7:
high_similarity_count += 1
logger.debug(f" 🏷️ '{tag.tag_name}': 相似度={similarity:.3f}, 权重={tag.weight:.2f}, 加权分数={weighted_score:.3f}")
logger.info(f"📈 匹配统计: {match_count}/{len(active_tags)} 个标签超过阈值")
logger.info(f"🔥 高相似度匹配(>0.7): {high_similarity_count}")
# 计算总体分数
result.calculate_overall_score()
# 确定最佳匹配标签
if result.matched_tags:
top_tag_name = max(result.match_scores.items(), key=lambda x: x[1])[0]
result.top_tag = top_tag_name
logger.info(f"🏆 最佳匹配标签: '{top_tag_name}' (分数: {result.match_scores[top_tag_name]:.3f})")
logger.info(f"📊 最终结果: 总分={result.overall_score:.3f}, 置信度={result.confidence:.3f}, 匹配标签数={len(result.matched_tags)}")
return result
def _calculate_cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
"""计算余弦相似度"""
try:
vec1 = np.array(vec1)
vec2 = np.array(vec2)
dot_product = np.dot(vec1, vec2)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
if norm1 == 0 or norm2 == 0:
return 0.0
return dot_product / (norm1 * norm2)
except Exception as e:
logger.error(f"计算余弦相似度失败: {e}")
return 0.0
async def _load_interests_from_database(self, personality_id: str) -> Optional[BotPersonalityInterests]:
"""从数据库加载兴趣标签"""
try:
logger.info(f"💾 正在从数据库加载兴趣标签personality_id: {personality_id}")
# 导入SQLAlchemy相关模块
from src.common.database.sqlalchemy_models import BotPersonalityInterests as DBBotPersonalityInterests
from src.common.database.sqlalchemy_database_api import get_db_session
import orjson
with get_db_session() as session:
# 查询最新的兴趣标签配置
db_interests = session.query(DBBotPersonalityInterests).filter(
DBBotPersonalityInterests.personality_id == personality_id
).order_by(
DBBotPersonalityInterests.version.desc(),
DBBotPersonalityInterests.last_updated.desc()
).first()
if db_interests:
logger.info(f"✅ 找到数据库中的兴趣标签配置,版本: {db_interests.version}")
logger.debug(f"📅 最后更新时间: {db_interests.last_updated}")
logger.debug(f"🧠 使用的embedding模型: {db_interests.embedding_model}")
# 解析JSON格式的兴趣标签
try:
tags_data = orjson.loads(db_interests.interest_tags)
logger.debug(f"🏷️ 解析到 {len(tags_data)} 个兴趣标签")
# 创建BotPersonalityInterests对象
interests = BotPersonalityInterests(
personality_id=db_interests.personality_id,
personality_description=db_interests.personality_description,
embedding_model=db_interests.embedding_model,
version=db_interests.version,
last_updated=db_interests.last_updated
)
# 解析兴趣标签
for tag_data in tags_data:
tag = BotInterestTag(
tag_name=tag_data.get("tag_name", ""),
weight=tag_data.get("weight", 0.5),
created_at=datetime.fromisoformat(tag_data.get("created_at", datetime.now().isoformat())),
updated_at=datetime.fromisoformat(tag_data.get("updated_at", datetime.now().isoformat())),
is_active=tag_data.get("is_active", True),
embedding=tag_data.get("embedding")
)
interests.interest_tags.append(tag)
logger.info(f"✅ 成功从数据库加载 {len(interests.interest_tags)} 个兴趣标签")
return interests
except (orjson.JSONDecodeError, Exception) as e:
logger.error(f"❌ 解析兴趣标签JSON失败: {e}")
logger.debug(f"🔍 原始JSON数据: {db_interests.interest_tags[:200]}...")
return None
else:
logger.info(f" 数据库中未找到personality_id为 '{personality_id}' 的兴趣标签配置")
return None
except Exception as e:
logger.error(f"❌ 从数据库加载兴趣标签失败: {e}")
logger.error("🔍 错误详情:")
traceback.print_exc()
return None
async def _save_interests_to_database(self, interests: BotPersonalityInterests):
"""保存兴趣标签到数据库"""
try:
logger.info("💾 正在保存兴趣标签到数据库...")
logger.info(f"📋 personality_id: {interests.personality_id}")
logger.info(f"🏷️ 兴趣标签数量: {len(interests.interest_tags)}")
logger.info(f"🔄 版本: {interests.version}")
# 导入SQLAlchemy相关模块
from src.common.database.sqlalchemy_models import BotPersonalityInterests as DBBotPersonalityInterests
from src.common.database.sqlalchemy_database_api import get_db_session
import orjson
# 将兴趣标签转换为JSON格式
tags_data = []
for tag in interests.interest_tags:
tag_dict = {
"tag_name": tag.tag_name,
"weight": tag.weight,
"created_at": tag.created_at.isoformat(),
"updated_at": tag.updated_at.isoformat(),
"is_active": tag.is_active,
"embedding": tag.embedding
}
tags_data.append(tag_dict)
# 序列化为JSON
json_data = orjson.dumps(tags_data)
with get_db_session() as session:
# 检查是否已存在相同personality_id的记录
existing_record = session.query(DBBotPersonalityInterests).filter(
DBBotPersonalityInterests.personality_id == interests.personality_id
).first()
if existing_record:
# 更新现有记录
logger.info("🔄 更新现有的兴趣标签配置")
existing_record.interest_tags = json_data
existing_record.personality_description = interests.personality_description
existing_record.embedding_model = interests.embedding_model
existing_record.version = interests.version
existing_record.last_updated = interests.last_updated
logger.info(f"✅ 成功更新兴趣标签配置,版本: {interests.version}")
else:
# 创建新记录
logger.info("🆕 创建新的兴趣标签配置")
new_record = DBBotPersonalityInterests(
personality_id=interests.personality_id,
personality_description=interests.personality_description,
interest_tags=json_data,
embedding_model=interests.embedding_model,
version=interests.version,
last_updated=interests.last_updated
)
session.add(new_record)
logger.info(f"✅ 成功创建兴趣标签配置,版本: {interests.version}")
logger.info("✅ 兴趣标签已成功保存到数据库")
except Exception as e:
logger.error(f"❌ 保存兴趣标签到数据库失败: {e}")
logger.error("🔍 错误详情:")
traceback.print_exc()
def get_current_interests(self) -> Optional[BotPersonalityInterests]:
"""获取当前的兴趣标签配置"""
return self.current_interests
def get_interest_stats(self) -> Dict[str, Any]:
"""获取兴趣系统统计信息"""
if not self.current_interests:
return {"initialized": False}
active_tags = self.current_interests.get_active_tags()
return {
"initialized": self._initialized,
"total_tags": len(active_tags),
"embedding_model": self.current_interests.embedding_model,
"last_updated": self.current_interests.last_updated.isoformat(),
"cache_size": len(self.embedding_cache)
}
async def update_interest_tags(self, new_personality_description: str = None):
"""更新兴趣标签"""
try:
if not self.current_interests:
logger.warning("没有当前的兴趣标签配置,无法更新")
return
if new_personality_description:
self.current_interests.personality_description = new_personality_description
# 重新生成兴趣标签
new_interests = await self._generate_interests_from_personality(
self.current_interests.personality_description,
self.current_interests.personality_id
)
if new_interests:
new_interests.version = self.current_interests.version + 1
self.current_interests = new_interests
await self._save_interests_to_database(new_interests)
logger.info(f"兴趣标签已更新,版本: {new_interests.version}")
except Exception as e:
logger.error(f"更新兴趣标签失败: {e}")
traceback.print_exc()
# 创建全局实例(重新创建以包含新的属性)
bot_interest_manager = BotInterestManager()

View File

@@ -6,6 +6,7 @@ import asyncio
import time
from typing import Dict, List
from src.config.config import global_config
from src.chat.planner_actions.action_manager import ActionManager
from src.common.data_models.info_data_model import Plan, ActionPlannerInfo
from src.common.logger import get_logger
@@ -122,6 +123,16 @@ class PlanExecutor:
try:
logger.info(f"执行回复动作: {action_info.action_type}, 原因: {action_info.reasoning}")
if action_info.action_message.get("user_id","") == str(global_config.bot.qq_account):
logger.warning("尝试回复自己,跳过此动作以防止死循环。")
return {
"action_type": action_info.action_type,
"success": False,
"error_message": "尝试回复自己,跳过此动作以防止死循环。",
"execution_time": 0,
"reasoning": action_info.reasoning,
"reply_content": "",
}
# 构建回复动作参数
action_params = {
"chat_id": plan.chat_id,

View File

@@ -97,7 +97,7 @@ class ActionPlanner:
# 2. 兴趣度评分 - 只对未读消息进行评分
if unread_messages:
bot_nickname = global_config.bot.nickname
interest_scores = self.interest_scoring.calculate_interest_scores(
interest_scores = await self.interest_scoring.calculate_interest_scores(
unread_messages, bot_nickname
)
@@ -175,33 +175,14 @@ class ActionPlanner:
return final_actions_dict, final_target_message_dict
def _build_return_result(self, plan: Plan) -> Tuple[List[Dict], Optional[Dict]]:
"""构建返回结果"""
final_actions = plan.decided_actions or []
final_target_message = next(
(act.action_message for act in final_actions if act.action_message), None
)
final_actions_dict = [asdict(act) for act in final_actions]
if final_target_message:
if hasattr(final_target_message, '__dataclass_fields__'):
final_target_message_dict = asdict(final_target_message)
else:
final_target_message_dict = final_target_message
else:
final_target_message_dict = None
return final_actions_dict, final_target_message_dict
def get_user_relationship(self, user_id: str) -> float:
"""获取用户关系分"""
return self.interest_scoring.get_user_relationship(user_id)
def update_interest_keywords(self, new_keywords: Dict[str, List[str]]):
"""更新兴趣关键词"""
self.interest_scoring.interest_keywords.update(new_keywords)
logger.info(f"已更新兴趣关键词: {list(new_keywords.keys())}")
"""更新兴趣关键词(已弃用,仅保留用于兼容性)"""
logger.info("传统关键词匹配已移除,此方法仅保留用于兼容性")
# 此方法已弃用因为现在完全使用embedding匹配
def get_planner_stats(self) -> Dict[str, any]:
"""获取规划器统计"""
@@ -226,5 +207,4 @@ class ActionPlanner:
}
# 全局兴趣度评分系统实例
interest_scoring_system = InterestScoringSystem()
# 全局兴趣度评分系统实例 - 在 individuality 模块中创建

View File

@@ -0,0 +1,132 @@
"""
机器人兴趣标签数据模型
定义机器人的兴趣标签和相关的embedding数据结构
"""
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
from datetime import datetime
from . import BaseDataModel
@dataclass
class BotInterestTag(BaseDataModel):
"""机器人兴趣标签"""
tag_name: str
weight: float = 1.0 # 权重,表示对这个兴趣的喜好程度 (0.0-1.0)
embedding: Optional[List[float]] = None # 标签的embedding向量
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
is_active: bool = True
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return {
"tag_name": self.tag_name,
"weight": self.weight,
"embedding": self.embedding,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"is_active": self.is_active
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "BotInterestTag":
"""从字典创建对象"""
return cls(
tag_name=data["tag_name"],
weight=data.get("weight", 1.0),
embedding=data.get("embedding"),
created_at=datetime.fromisoformat(data["created_at"]) if data.get("created_at") else datetime.now(),
updated_at=datetime.fromisoformat(data["updated_at"]) if data.get("updated_at") else datetime.now(),
is_active=data.get("is_active", True)
)
@dataclass
class BotPersonalityInterests(BaseDataModel):
"""机器人人格化兴趣配置"""
personality_id: str
personality_description: str # 人设描述文本
interest_tags: List[BotInterestTag] = field(default_factory=list)
embedding_model: str = "text-embedding-ada-002" # 使用的embedding模型
last_updated: datetime = field(default_factory=datetime.now)
version: int = 1 # 版本号,用于追踪更新
def get_active_tags(self) -> List[BotInterestTag]:
"""获取活跃的兴趣标签"""
return [tag for tag in self.interest_tags if tag.is_active]
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return {
"personality_id": self.personality_id,
"personality_description": self.personality_description,
"interest_tags": [tag.to_dict() for tag in self.interest_tags],
"embedding_model": self.embedding_model,
"last_updated": self.last_updated.isoformat(),
"version": self.version
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "BotPersonalityInterests":
"""从字典创建对象"""
return cls(
personality_id=data["personality_id"],
personality_description=data["personality_description"],
interest_tags=[BotInterestTag.from_dict(tag_data) for tag_data in data.get("interest_tags", [])],
embedding_model=data.get("embedding_model", "text-embedding-ada-002"),
last_updated=datetime.fromisoformat(data["last_updated"]) if data.get("last_updated") else datetime.now(),
version=data.get("version", 1)
)
@dataclass
class InterestMatchResult(BaseDataModel):
"""兴趣匹配结果"""
message_id: str
matched_tags: List[str] = field(default_factory=list)
match_scores: Dict[str, float] = field(default_factory=dict) # tag_name -> score
overall_score: float = 0.0
top_tag: Optional[str] = None
confidence: float = 0.0 # 匹配置信度 (0.0-1.0)
matched_keywords: List[str] = field(default_factory=list)
def add_match(self, tag_name: str, score: float, keywords: List[str] = None):
"""添加匹配结果"""
self.matched_tags.append(tag_name)
self.match_scores[tag_name] = score
if keywords:
self.matched_keywords.extend(keywords)
def calculate_overall_score(self):
"""计算总体匹配分数"""
if not self.match_scores:
self.overall_score = 0.0
self.top_tag = None
return
# 使用加权平均计算总体分数
total_weight = len(self.match_scores)
if total_weight > 0:
self.overall_score = sum(self.match_scores.values()) / total_weight
# 设置最佳匹配标签
self.top_tag = max(self.match_scores.items(), key=lambda x: x[1])[0]
else:
self.overall_score = 0.0
self.top_tag = None
# 计算置信度(基于匹配标签数量和分数分布)
if len(self.match_scores) > 0:
avg_score = self.overall_score
score_variance = sum((score - avg_score) ** 2 for score in self.match_scores.values()) / len(self.match_scores)
# 分数越集中,置信度越高
self.confidence = max(0.0, 1.0 - score_variance)
else:
self.confidence = 0.0
def get_top_matches(self, top_n: int = 3) -> List[tuple]:
"""获取前N个最佳匹配"""
sorted_matches = sorted(self.match_scores.items(), key=lambda x: x[1], reverse=True)
return sorted_matches[:top_n]

View File

@@ -300,6 +300,26 @@ class PersonInfo(Base):
)
class BotPersonalityInterests(Base):
"""机器人人格兴趣标签模型"""
__tablename__ = "bot_personality_interests"
id = Column(Integer, primary_key=True, autoincrement=True)
personality_id = Column(get_string_field(100), nullable=False, index=True)
personality_description = Column(Text, nullable=False)
interest_tags = Column(Text, nullable=False) # JSON格式存储的兴趣标签列表
embedding_model = Column(get_string_field(100), nullable=False, default="text-embedding-ada-002")
version = Column(Integer, nullable=False, default=1)
last_updated = Column(DateTime, nullable=False, default=datetime.datetime.now, index=True)
__table_args__ = (
Index("idx_botpersonality_personality_id", "personality_id"),
Index("idx_botpersonality_version", "version"),
Index("idx_botpersonality_last_updated", "last_updated"),
)
class Memory(Base):
"""记忆模型"""

View File

@@ -60,6 +60,35 @@ class Individuality:
else:
logger.error("人设构建失败")
# 初始化智能兴趣系统
await self._initialize_smart_interest_system(personality_result, identity_result)
# 如果任何一个发生变化都需要清空数据库中的info_list因为这影响整体人设
if personality_changed or identity_changed:
logger.info("将清空数据库中原有的关键词缓存")
update_data = {
"platform": "system",
"user_id": "bot_id",
"person_name": self.name,
"nickname": self.name,
}
await person_info_manager.update_one_field(self.bot_person_id, "info_list", [], data=update_data)
async def _initialize_smart_interest_system(self, personality_result: str, identity_result: str):
"""初始化智能兴趣系统"""
# 组合完整的人设描述
full_personality = f"{personality_result}{identity_result}"
# 获取全局兴趣评分系统实例
from src.chat.affinity_flow.interest_scoring import interest_scoring_system
# 初始化智能兴趣系统
await interest_scoring_system.initialize_smart_interests(
personality_description=full_personality,
personality_id=self.bot_person_id
)
logger.info("智能兴趣系统初始化完成")
async def get_personality_block(self) -> str:
bot_name = global_config.bot.nickname

235
消息处理流程.md Normal file
View File

@@ -0,0 +1,235 @@
# 从消息接收到执行Action的完整流程图
## 整体流程概览
```mermaid
flowchart TD
A[原始消息数据] --> B[消息接收层<br>src/chat/message_receive/bot.py]
B --> C[消息解析<br>src/chat/message_receive/message.py]
C --> D[会话管理<br>src/chat/message_receive/chat_stream.py]
D --> E[亲和力流分发<br>src/chat/affinity_flow/afc_manager.py]
E --> F[聊天处理器<br>src/chat/affinity_flow/chatter.py]
F --> G[智能规划决策<br>三层架构]
G --> H[动作执行管理<br>src/chat/planner_actions/action_manager.py]
H --> I[最终执行<br>src/chat/planner_actions/plan_executor.py]
I --> J[Action执行结果]
```
## 详细分阶段流程图
### 1. 消息接收与预处理阶段
```mermaid
flowchart TD
A[原始消息数据] --> B[message_process入口]
B --> C{消息切片重组}
C -- 完整消息 --> D[平台类型判断]
C -- 切片消息 --> E[等待更多切片]
D --> F{S4U平台?}
F -- 是 --> G[S4U特殊处理]
F -- 否 --> H[创建MessageRecv对象]
H --> I[过滤检查<br>违禁词/正则]
I --> J[命令处理系统]
J --> K{PlusCommand?}
K -- 是 --> L[执行PlusCommand]
K -- 否 --> M[执行BaseCommand]
L --> N[事件触发]
M --> N
N --> O[模板处理]
O --> P[预处理完成]
```
### 2. 消息解析阶段
```mermaid
flowchart TD
A[预处理完成消息] --> B[MessageRecv.process]
B --> C{消息类型判断}
C -- 文本 --> D[直接提取文本]
C -- 图片 --> E[图片识别处理]
C -- 表情 --> F[表情包描述]
C -- 语音 --> G[语音转文本]
C -- 视频 --> H[视频内容分析]
C -- AT消息 --> I[提取用户信息]
C -- 其他 --> J[通用处理]
D --> K[生成纯文本]
E --> K
F --> K
G --> K
H --> K
I --> K
J --> K
K --> L[消息解析完成]
```
### 3. 会话管理阶段
```mermaid
flowchart TD
A[解析后消息] --> B[ChatManager.register_message]
B --> C[生成stream_id<br>platform+user+group]
C --> D{会话是否存在?}
D -- 内存中存在 --> E[获取现有会话]
D -- 内存中不存在 --> F[数据库查询]
F --> G{数据库存在?}
G -- 是 --> H[从数据库加载]
G -- 否 --> I[创建新会话]
H --> J[更新会话信息]
I --> J
J --> K[设置消息上下文]
K --> L[会话管理完成]
```
### 4. 智能规划决策阶段(三层架构)
```mermaid
flowchart TD
A[会话管理完成] --> B[规划器入口 ActionPlanner]
B --> C[PlanGenerator生成初始Plan]
C --> D[兴趣度评分系统]
D --> E[提取未读消息]
E --> F[计算多维评分]
F --> G[兴趣匹配度]
F --> H[用户关系分]
F --> I[提及度评分]
G --> J[加权总分计算]
H --> J
I --> J
J --> K{是否回复?}
K -- 是 --> L[保留reply动作]
K -- 否 --> M[移除reply动作]
L --> N[PlanFilter筛选]
M --> N
N --> O[LLM决策最终动作]
O --> P[规划决策完成]
```
### 5. 动作执行阶段
```mermaid
flowchart TD
A[规划决策完成] --> B[ActionManager执行]
B --> C{动作类型判断}
C -- no_action --> D[记录不动作]
C -- no_reply --> E[记录不回复]
C -- reply --> F[生成回复内容]
C -- 其他动作 --> G[执行具体动作]
D --> H[执行完成]
E --> H
F --> I[发送回复消息]
G --> J[动作处理器执行]
I --> H
J --> H
H --> K[PlanExecutor最终执行]
K --> L[用户关系追踪]
L --> M[执行统计记录]
M --> N[动作执行完成]
```
## 完整端到端流程
```mermaid
flowchart LR
A[消息接收] --> B[消息解析]
B --> C[会话管理]
C --> D[消息分发]
D --> E[聊天处理]
E --> F[兴趣度评分]
F --> G[规划生成]
G --> H[LLM筛选]
H --> I[动作管理]
I --> J[最终执行]
J --> K[结果返回]
subgraph 智能决策层
F
G
H
end
subgraph 执行层
I
J
K
end
style 智能决策层 fill:#e1f5fe
style 执行层 fill:#f3e5f5
```
## 关键组件交互关系
```mermaid
flowchart TD
Bot[Bot.message_process] --> Message[MessageRecv]
Message --> ChatManager[ChatManager]
ChatManager --> AFCManager[AFCManager]
AFCManager --> Chatter[AffinityFlowChatter]
Chatter --> Planner[ActionPlanner]
Planner --> Generator[PlanGenerator]
Planner --> Scorer[InterestScoringSystem]
Planner --> Filter[PlanFilter]
Filter --> ActionManager[ActionManager]
ActionManager --> Executor[PlanExecutor]
Executor --> Result[执行结果]
%% 数据流
Message -.-> |消息数据| Chatter
Scorer -.-> |兴趣评分| Filter
Generator -.-> |初始Plan| Filter
Filter -.-> |最终Plan| Executor
```
## 异常处理流程
```mermaid
flowchart TD
A[开始处理] --> B[正常流程]
B --> C[处理完成]
B --> D{发生异常?}
D -- 是 --> E[异常捕获]
D -- 否 --> C
E --> F[日志记录错误]
F --> G[错误类型判断]
G -- 消息解析失败 --> H[返回解析错误]
G -- 会话不存在 --> I[创建新会话重试]
G -- LLM决策失败 --> J[使用默认动作]
G -- 动作执行失败 --> K[动作回退机制]
G -- 其他错误 --> L[返回通用错误]
H --> M[异常处理完成]
I --> B
J --> M
K --> M
L --> M
```
这个流程图详细展示了从消息接收到执行action的完整流程包括各个阶段的处理逻辑、组件交互关系以及异常处理机制。整个系统采用了模块化设计具有清晰的职责分离和良好的可扩展性。