diff --git a/src/chat/affinity_flow/interest_scoring.py b/src/chat/affinity_flow/interest_scoring.py index 6e44ccd21..037806a34 100644 --- a/src/chat/affinity_flow/interest_scoring.py +++ b/src/chat/affinity_flow/interest_scoring.py @@ -1,12 +1,14 @@ """ 兴趣度评分系统 基于多维度评分机制,包括兴趣匹配度、用户关系分、提及度和时间因子 +现在使用embedding计算智能兴趣匹配 """ -from datetime import datetime -from typing import Dict, List +import traceback +from typing import Dict, List, Any from src.common.data_models.database_data_model import DatabaseMessages from src.common.data_models.info_data_model import InterestScore +from src.chat.interest_system import bot_interest_manager from src.common.logger import get_logger from src.config.config import global_config @@ -17,15 +19,8 @@ class InterestScoringSystem: """兴趣度评分系统""" def __init__(self): - self.interest_keywords = { - "游戏": ["游戏", "原神", "米哈游", "抽卡", "角色", "装备", "任务", "副本", "PVP", "LOL", "王者荣耀", "吃鸡"], - "动漫": ["动漫", "二次元", "新番", "番剧", "漫画", "角色", "声优", "OP", "ED"], - "音乐": ["音乐", "歌曲", "歌手", "专辑", "演唱会", "乐器", "作词", "作曲"], - "电影": ["电影", "电视剧", "综艺", "演员", "导演", "剧情", "影评", "票房"], - "科技": ["科技", "AI", "人工智能", "编程", "Python", "代码", "软件", "硬件", "手机"], - "生活": ["生活", "日常", "美食", "旅行", "天气", "工作", "学习", "健身"], - "情感": ["情感", "心情", "感情", "恋爱", "友情", "家人", "开心", "难过", "生气"], - } + # 智能兴趣匹配配置 + self.use_smart_matching = True # 评分权重 self.score_weights = { @@ -46,30 +41,51 @@ class InterestScoringSystem: # 用户关系数据 self.user_relationships: Dict[str, float] = {} # user_id -> relationship_score - def calculate_interest_scores(self, messages: List[DatabaseMessages], bot_nickname: str) -> List[InterestScore]: + async def calculate_interest_scores(self, messages: List[DatabaseMessages], bot_nickname: str) -> List[InterestScore]: """计算消息的兴趣度评分""" - scores = [] + logger.info("🚀 开始计算消息兴趣度评分...") + logger.info(f"📨 收到 {len(messages)} 条消息") + # 通过 user_id 判断是否是用户消息(非机器人发送的消息) user_messages = [msg for msg in messages if str(msg.user_info.user_id) != str(global_config.bot.qq_account)] + logger.info(f"👤 过滤出 {len(user_messages)} 条用户消息") - for msg in user_messages: - score = self._calculate_single_message_score(msg, bot_nickname) + scores = [] + for i, msg in enumerate(user_messages, 1): + logger.info(f"📋 [{i}/{len(user_messages)}] 处理消息 ID: {msg.message_id}") + score = await self._calculate_single_message_score(msg, bot_nickname) scores.append(score) + logger.info(f"✅ 兴趣度评分计算完成,生成 {len(scores)} 个评分") return scores - def _calculate_single_message_score(self, message: DatabaseMessages, bot_nickname: str) -> InterestScore: + async def _calculate_single_message_score(self, message: DatabaseMessages, bot_nickname: str) -> InterestScore: """计算单条消息的兴趣度评分""" - # 1. 计算兴趣匹配度 - interest_match_score = self._calculate_interest_match_score(message.processed_plain_text) + logger.info(f"🎯 计算消息 {message.message_id} 的兴趣度评分...") + logger.debug(f"📝 消息长度: {len(message.processed_plain_text)} 字符") + + # 提取关键词(从数据库的反序列化字段) + logger.debug("🔍 提取关键词...") + keywords = self._extract_keywords_from_database(message) + logger.debug(f"🏷️ 提取到 {len(keywords)} 个关键词") + + # 1. 计算兴趣匹配度(现在是异步的) + logger.debug("🧠 计算兴趣匹配度...") + interest_match_score = await self._calculate_interest_match_score(message.processed_plain_text, keywords) + logger.debug(f"📊 兴趣匹配度: {interest_match_score:.3f}") # 2. 计算关系分 + logger.debug("🤝 计算关系分...") relationship_score = self._calculate_relationship_score(message.user_info.user_id) + logger.debug(f"💝 关系分: {relationship_score:.3f}") # 3. 计算提及分数 + logger.debug("📢 计算提及分数...") mentioned_score = self._calculate_mentioned_score(message, bot_nickname) + logger.debug(f"📣 提及分数: {mentioned_score:.3f}") - # 5. 计算总分 + # 4. 计算总分 + logger.debug("🧮 计算加权总分...") total_score = ( interest_match_score * self.score_weights["interest_match"] + relationship_score * self.score_weights["relationship"] + @@ -77,11 +93,15 @@ class InterestScoringSystem: ) details = { - "interest_match": f"兴趣匹配度: {interest_match_score:.2f}", - "relationship": f"关系分: {relationship_score:.2f}", - "mentioned": f"提及分数: {mentioned_score:.2f}", + "interest_match": f"兴趣匹配度: {interest_match_score:.3f}", + "relationship": f"关系分: {relationship_score:.3f}", + "mentioned": f"提及分数: {mentioned_score:.3f}", } + logger.info(f"📈 消息 {message.message_id} 最终评分: {total_score:.3f}") + logger.debug(f"⚖️ 评分权重: {self.score_weights}") + logger.debug(f"📋 评分详情: {details}") + return InterestScore( message_id=message.message_id, total_score=total_score, @@ -91,32 +111,107 @@ class InterestScoringSystem: details=details ) - def _calculate_interest_match_score(self, content: str) -> float: - """计算兴趣匹配度""" + async def _calculate_interest_match_score(self, content: str, keywords: List[str] = None) -> float: + """计算兴趣匹配度 - 使用智能embedding匹配""" if not content: return 0.0 - content_lower = content.lower() - max_score = 0.0 + # 使用智能匹配(embedding) + if self.use_smart_matching and bot_interest_manager.is_initialized: + return await self._calculate_smart_interest_match(content, keywords) + else: + # 智能匹配未初始化,返回默认分数 + logger.warning("智能兴趣匹配系统未初始化,返回默认分数") + return 0.3 - for _category, keywords in self.interest_keywords.items(): - category_score = 0.0 - matched_keywords = [] + async def _calculate_smart_interest_match(self, content: str, keywords: List[str] = None) -> float: + """使用embedding计算智能兴趣匹配""" + try: + logger.debug("🧠 开始智能兴趣匹配计算...") - for keyword in keywords: - if keyword.lower() in content_lower: - category_score += 0.1 - matched_keywords.append(keyword) + # 如果没有传入关键词,则提取 + if not keywords: + logger.debug("🔍 从内容中提取关键词...") + keywords = self._extract_keywords_from_content(content) + logger.debug(f"🏷️ 提取到 {len(keywords)} 个关键词") - # 如果匹配到多个关键词,增加额外分数 - if len(matched_keywords) > 1: - category_score += (len(matched_keywords) - 1) * 0.05 + # 使用机器人兴趣管理器计算匹配度 + logger.debug("🤖 调用机器人兴趣管理器计算匹配度...") + match_result = await bot_interest_manager.calculate_interest_match(content, keywords) - # 限制每个类别的最高分 - category_score = min(category_score, 0.8) - max_score = max(max_score, category_score) + if match_result: + logger.debug("✅ 智能兴趣匹配成功:") + logger.debug(f" 📊 总分: {match_result.overall_score:.3f}") + logger.debug(f" 🏷️ 匹配标签: {match_result.matched_tags}") + logger.debug(f" 🎯 最佳标签: {match_result.top_tag}") + logger.debug(f" 📈 置信度: {match_result.confidence:.3f}") + logger.debug(f" 🔢 匹配详情: {match_result.match_scores}") - return min(max_score, 1.0) + # 返回匹配分数,考虑置信度 + final_score = match_result.overall_score * match_result.confidence + logger.debug(f"⚖️ 最终分数(总分×置信度): {final_score:.3f}") + return final_score + else: + logger.warning("⚠️ 智能兴趣匹配未返回结果") + return 0.0 + + except Exception as e: + logger.error(f"❌ 智能兴趣匹配计算失败: {e}") + logger.debug("🔍 错误详情:") + logger.debug(f" 💬 内容长度: {len(content)} 字符") + logger.debug(f" 🏷️ 关键词数量: {len(keywords) if keywords else 0}") + return 0.0 + + def _extract_keywords_from_database(self, message: DatabaseMessages) -> List[str]: + """从数据库消息中提取关键词""" + keywords = [] + + # 尝试从 key_words 字段提取(存储的是JSON字符串) + if message.key_words: + try: + import orjson + keywords = orjson.loads(message.key_words) + if not isinstance(keywords, list): + keywords = [] + except (orjson.JSONDecodeError, TypeError): + keywords = [] + + # 如果没有 keywords,尝试从 key_words_lite 提取 + if not keywords and message.key_words_lite: + try: + import orjson + keywords = orjson.loads(message.key_words_lite) + if not isinstance(keywords, list): + keywords = [] + except (orjson.JSONDecodeError, TypeError): + keywords = [] + + # 如果还是没有,从消息内容中提取(降级方案) + if not keywords: + keywords = self._extract_keywords_from_content(message.processed_plain_text) + + return keywords[:15] # 返回前15个关键词 + + def _extract_keywords_from_content(self, content: str) -> List[str]: + """从内容中提取关键词(降级方案)""" + import re + + # 清理文本 + content = re.sub(r'[^\w\s\u4e00-\u9fff]', ' ', content) # 保留中文、英文、数字 + words = content.split() + + # 过滤和关键词提取 + keywords = [] + for word in words: + word = word.strip() + if (len(word) >= 2 and # 至少2个字符 + word.isalnum() and # 字母数字 + not word.isdigit()): # 不是纯数字 + keywords.append(word.lower()) + + # 去重并限制数量 + unique_keywords = list(set(keywords)) + return unique_keywords[:10] # 返回前10个唯一关键词 def _calculate_relationship_score(self, user_id: str) -> float: """计算关系分""" @@ -137,40 +232,69 @@ class InterestScoringSystem: def should_reply(self, score: InterestScore) -> bool: """判断是否应该回复""" + logger.info("🤔 评估是否应该回复...") + logger.debug("📊 评分详情:") + logger.debug(f" 📝 消息ID: {score.message_id}") + logger.debug(f" 💯 总分: {score.total_score:.3f}") + logger.debug(f" 🧠 兴趣匹配: {score.interest_match_score:.3f}") + logger.debug(f" 🤝 关系分: {score.relationship_score:.3f}") + logger.debug(f" 📢 提及分: {score.mentioned_score:.3f}") + base_threshold = self.reply_threshold + logger.debug(f"📋 基础阈值: {base_threshold:.3f}") # 如果被提及,降低阈值 if score.mentioned_score >= 1.0: base_threshold = self.mention_threshold + logger.debug(f"📣 消息提及了机器人,使用降低阈值: {base_threshold:.3f}") # 计算连续不回复的概率提升 probability_boost = min(self.no_reply_count * self.probability_boost_per_no_reply, 0.8) effective_threshold = base_threshold - probability_boost - logger.debug(f"评分决策: 总分={score.total_score:.2f}, 有效阈值={effective_threshold:.2f}, 连续不回复次数={self.no_reply_count}") + logger.debug("📈 连续不回复统计:") + logger.debug(f" 🚫 不回复次数: {self.no_reply_count}") + logger.debug(f" 📈 概率提升: {probability_boost:.3f}") + logger.debug(f" 🎯 有效阈值: {effective_threshold:.3f}") - return score.total_score >= effective_threshold + # 做出决策 + should_reply = score.total_score >= effective_threshold + decision = "✅ 应该回复" if should_reply else "❌ 不回复" + + logger.info(f"🎯 回复决策: {decision}") + logger.info(f"📊 决策依据: {score.total_score:.3f} {'>=' if should_reply else '<'} {effective_threshold:.3f}") + + return should_reply def record_reply_action(self, did_reply: bool): """记录回复动作""" + old_count = self.no_reply_count + if did_reply: self.no_reply_count = max(0, self.no_reply_count - 1) + action = "✅ 回复了消息" else: self.no_reply_count += 1 + action = "❌ 选择不回复" # 限制最大计数 self.no_reply_count = min(self.no_reply_count, self.max_no_reply_count) - logger.debug(f"回复动作记录: {did_reply}, 当前连续不回复次数: {self.no_reply_count}") + logger.info(f"📊 记录回复动作: {action}") + logger.info(f"📈 连续不回复次数: {old_count} → {self.no_reply_count}") + logger.debug(f"📋 最大限制: {self.max_no_reply_count} 次") def update_user_relationship(self, user_id: str, relationship_change: float): """更新用户关系""" - if user_id in self.user_relationships: - self.user_relationships[user_id] = max(0.0, min(1.0, self.user_relationships[user_id] + relationship_change)) - else: - self.user_relationships[user_id] = max(0.0, min(1.0, relationship_change)) + old_score = self.user_relationships.get(user_id, 0.3) # 默认新用户分数 + new_score = max(0.0, min(1.0, old_score + relationship_change)) - logger.debug(f"更新用户关系: {user_id} -> {self.user_relationships[user_id]:.2f}") + self.user_relationships[user_id] = new_score + + change_direction = "📈" if relationship_change > 0 else "📉" if relationship_change < 0 else "➖" + logger.info(f"{change_direction} 更新用户关系: {user_id}") + logger.info(f"💝 关系分: {old_score:.3f} → {new_score:.3f} (变化: {relationship_change:+.3f})") + logger.debug(f"👥 当前追踪用户数: {len(self.user_relationships)}") def get_user_relationship(self, user_id: str) -> float: """获取用户关系分""" @@ -184,33 +308,44 @@ class InterestScoringSystem: "reply_threshold": self.reply_threshold, "mention_threshold": self.mention_threshold, "user_relationships": len(self.user_relationships), - "interest_categories": len(self.interest_keywords), } - def add_interest_category(self, category: str, keywords: List[str]): - """添加新的兴趣类别""" - self.interest_keywords[category] = keywords - logger.info(f"添加新的兴趣类别: {category}, 关键词数量: {len(keywords)}") - - def remove_interest_category(self, category: str): - """移除兴趣类别""" - if category in self.interest_keywords: - del self.interest_keywords[category] - logger.info(f"移除兴趣类别: {category}") - - def update_interest_keywords(self, category: str, keywords: List[str]): - """更新兴趣类别的关键词""" - if category in self.interest_keywords: - self.interest_keywords[category] = keywords - logger.info(f"更新兴趣类别 {category} 的关键词: {len(keywords)}") - else: - self.add_interest_category(category, keywords) - - def get_interest_keywords(self) -> Dict[str, List[str]]: - """获取所有兴趣关键词""" - return self.interest_keywords.copy() def reset_stats(self): """重置统计信息""" self.no_reply_count = 0 - logger.info("重置兴趣度评分系统统计") \ No newline at end of file + logger.info("重置兴趣度评分系统统计") + + async def initialize_smart_interests(self, personality_description: str, personality_id: str = "default"): + """初始化智能兴趣系统""" + try: + logger.info("🚀 开始初始化智能兴趣系统...") + logger.info(f"📋 人设ID: {personality_id}") + logger.info(f"📝 人设描述长度: {len(personality_description)} 字符") + + await bot_interest_manager.initialize(personality_description, personality_id) + logger.info("✅ 智能兴趣系统初始化完成") + + # 显示初始化后的统计信息 + stats = bot_interest_manager.get_interest_stats() + logger.info("📊 兴趣系统统计:") + logger.info(f" 🏷️ 总标签数: {stats.get('total_tags', 0)}") + logger.info(f" 💾 缓存大小: {stats.get('cache_size', 0)}") + logger.info(f" 🧠 模型: {stats.get('embedding_model', '未知')}") + + except Exception as e: + logger.error(f"❌ 初始化智能兴趣系统失败: {e}") + logger.error("🔍 错误详情:") + traceback.print_exc() + + def get_matching_config(self) -> Dict[str, Any]: + """获取匹配配置信息""" + return { + "use_smart_matching": self.use_smart_matching, + "smart_system_initialized": bot_interest_manager.is_initialized, + "smart_system_stats": bot_interest_manager.get_interest_stats() if bot_interest_manager.is_initialized else None + } + + +# 创建全局兴趣评分系统实例 +interest_scoring_system = InterestScoringSystem() \ No newline at end of file diff --git a/src/chat/interest_system/__init__.py b/src/chat/interest_system/__init__.py new file mode 100644 index 000000000..3fe14e7bf --- /dev/null +++ b/src/chat/interest_system/__init__.py @@ -0,0 +1,17 @@ +""" +机器人兴趣标签系统 +基于人设生成兴趣标签,使用embedding计算匹配度 +""" + +from .bot_interest_manager import BotInterestManager, bot_interest_manager +from src.common.data_models.bot_interest_data_model import ( + BotInterestTag, BotPersonalityInterests, InterestMatchResult +) + +__all__ = [ + "BotInterestManager", + "bot_interest_manager", + "BotInterestTag", + "BotPersonalityInterests", + "InterestMatchResult" +] \ No newline at end of file diff --git a/src/chat/interest_system/bot_interest_manager.py b/src/chat/interest_system/bot_interest_manager.py new file mode 100644 index 000000000..520bf9033 --- /dev/null +++ b/src/chat/interest_system/bot_interest_manager.py @@ -0,0 +1,666 @@ +""" +机器人兴趣标签管理系统 +基于人设生成兴趣标签,并使用embedding计算匹配度 +""" +import orjson +import traceback +from typing import List, Dict, Optional, Any +from datetime import datetime +import numpy as np + +from src.common.logger import get_logger +from src.common.data_models.bot_interest_data_model import ( + BotPersonalityInterests, BotInterestTag, InterestMatchResult +) + +logger = get_logger("bot_interest_manager") + + +class BotInterestManager: + """机器人兴趣标签管理器""" + + def __init__(self): + self.current_interests: Optional[BotPersonalityInterests] = None + self.embedding_cache: Dict[str, List[float]] = {} # embedding缓存 + self._initialized = False + + # Embedding客户端配置 + self.embedding_request = None + self.embedding_config = None + self.embedding_dimension = 1024 # 默认BGE-M3 embedding维度 + + @property + def is_initialized(self) -> bool: + """检查兴趣系统是否已初始化""" + return self._initialized + + async def initialize(self, personality_description: str, personality_id: str = "default"): + """初始化兴趣标签系统""" + try: + logger.info("=" * 60) + logger.info("🚀 开始初始化机器人兴趣标签系统") + logger.info(f"📋 人设ID: {personality_id}") + logger.info(f"📝 人设描述长度: {len(personality_description)} 字符") + logger.info("=" * 60) + + # 初始化embedding模型 + logger.info("🧠 正在初始化embedding模型...") + await self._initialize_embedding_model() + + # 检查embedding客户端是否成功初始化 + if not self.embedding_request: + raise RuntimeError("❌ Embedding客户端初始化失败,无法继续") + + # 生成或加载兴趣标签 + logger.info("🎯 正在生成或加载兴趣标签...") + await self._load_or_generate_interests(personality_description, personality_id) + + self._initialized = True + + # 检查是否成功获取兴趣标签 + if self.current_interests and len(self.current_interests.get_active_tags()) > 0: + active_tags_count = len(self.current_interests.get_active_tags()) + logger.info("=" * 60) + logger.info("✅ 机器人兴趣标签系统初始化完成!") + logger.info(f"📊 活跃兴趣标签数量: {active_tags_count}") + logger.info(f"💾 Embedding缓存大小: {len(self.embedding_cache)}") + logger.info("=" * 60) + else: + raise RuntimeError("❌ 未能成功生成或加载兴趣标签") + + except Exception as e: + logger.error("=" * 60) + logger.error(f"❌ 初始化机器人兴趣标签系统失败: {e}") + logger.error("=" * 60) + traceback.print_exc() + raise # 重新抛出异常,不允许降级初始化 + + async def _initialize_embedding_model(self): + """初始化embedding模型""" + logger.info("🔧 正在配置embedding客户端...") + + # 使用项目配置的embedding模型 + from src.config.config import model_config + from src.llm_models.utils_model import LLMRequest + + logger.debug("✅ 成功导入embedding相关模块") + + # 检查embedding配置是否存在 + if not hasattr(model_config.model_task_config, 'embedding'): + raise RuntimeError("❌ 未找到embedding模型配置") + + logger.info("📋 找到embedding模型配置") + self.embedding_config = model_config.model_task_config.embedding + self.embedding_dimension = 1024 # BGE-M3的维度 + logger.info(f"📐 使用模型维度: {self.embedding_dimension}") + + # 创建LLMRequest实例用于embedding + self.embedding_request = LLMRequest(model_set=self.embedding_config, request_type="interest_embedding") + logger.info("✅ Embedding请求客户端初始化成功") + logger.info(f"🔗 客户端类型: {type(self.embedding_request).__name__}") + + # 获取第一个embedding模型的ModelInfo + if hasattr(self.embedding_config, 'model_list') and self.embedding_config.model_list: + first_model_name = self.embedding_config.model_list[0] + logger.info(f"🎯 使用embedding模型: {first_model_name}") + else: + logger.warning("⚠️ 未找到embedding模型列表") + + logger.info("✅ Embedding模型初始化完成") + + async def _load_or_generate_interests(self, personality_description: str, personality_id: str): + """加载或生成兴趣标签""" + logger.info(f"📚 正在为 '{personality_id}' 加载或生成兴趣标签...") + + # 首先尝试从数据库加载 + logger.info("💾 尝试从数据库加载现有兴趣标签...") + loaded_interests = await self._load_interests_from_database(personality_id) + + if loaded_interests: + self.current_interests = loaded_interests + active_count = len(loaded_interests.get_active_tags()) + logger.info(f"✅ 成功从数据库加载 {active_count} 个兴趣标签") + logger.info(f"📅 最后更新时间: {loaded_interests.last_updated}") + logger.info(f"🔄 版本号: {loaded_interests.version}") + else: + # 生成新的兴趣标签 + logger.info("🆕 数据库中未找到兴趣标签,开始生成新的...") + logger.info("🤖 正在调用LLM生成个性化兴趣标签...") + generated_interests = await self._generate_interests_from_personality(personality_description, personality_id) + + if generated_interests: + self.current_interests = generated_interests + active_count = len(generated_interests.get_active_tags()) + logger.info(f"✅ 成功生成 {active_count} 个兴趣标签") + + # 保存到数据库 + logger.info("💾 正在保存兴趣标签到数据库...") + await self._save_interests_to_database(generated_interests) + else: + raise RuntimeError("❌ 兴趣标签生成失败") + + async def _generate_interests_from_personality(self, personality_description: str, personality_id: str) -> Optional[BotPersonalityInterests]: + """根据人设生成兴趣标签""" + try: + logger.info("🎨 开始根据人设生成兴趣标签...") + logger.info(f"📝 人设长度: {len(personality_description)} 字符") + + # 检查embedding客户端是否可用 + if not hasattr(self, 'embedding_request'): + raise RuntimeError("❌ Embedding客户端未初始化,无法生成兴趣标签") + + # 构建提示词 + logger.info("📝 构建LLM提示词...") + prompt = f""" +基于以下机器人人设描述,生成一套合适的兴趣标签: + +人设描述: +{personality_description} + +请生成一系列兴趣关键词标签,要求: +1. 标签应该符合人设特点和性格 +2. 每个标签都有权重(0.1-1.0),表示对该兴趣的喜好程度 +3. 生成15-25个不等的标签 +4. 标签应该是具体的关键词,而不是抽象概念 + +请以JSON格式返回,格式如下: +{{ + "interests": [ + {{"name": "标签名", "weight": 0.8}}, + {{"name": "标签名", "weight": 0.6}}, + {{"name": "标签名", "weight": 0.9}} + ] +}} + +注意: +- 权重范围0.1-1.0,权重越高表示越感兴趣 +- 标签要具体,如"编程"、"游戏"、"旅行"等 +- 根据人设生成个性化的标签 +""" + + # 调用LLM生成兴趣标签 + logger.info("🤖 正在调用LLM生成兴趣标签...") + response = await self._call_llm_for_interest_generation(prompt) + + if not response: + raise RuntimeError("❌ LLM未返回有效响应") + + logger.info("✅ LLM响应成功,开始解析兴趣标签...") + interests_data = orjson.loads(response) + + bot_interests = BotPersonalityInterests( + personality_id=personality_id, + personality_description=personality_description + ) + + # 解析生成的兴趣标签 + interests_list = interests_data.get("interests", []) + logger.info(f"📋 解析到 {len(interests_list)} 个兴趣标签") + + for i, tag_data in enumerate(interests_list): + tag_name = tag_data.get("name", f"标签_{i}") + weight = tag_data.get("weight", 0.5) + + tag = BotInterestTag( + tag_name=tag_name, + weight=weight + ) + bot_interests.interest_tags.append(tag) + + logger.debug(f" 🏷️ {tag_name} (权重: {weight:.2f})") + + # 为所有标签生成embedding + logger.info("🧠 开始为兴趣标签生成embedding向量...") + await self._generate_embeddings_for_tags(bot_interests) + + logger.info("✅ 兴趣标签生成完成") + return bot_interests + + except orjson.JSONDecodeError as e: + logger.error(f"❌ 解析LLM响应JSON失败: {e}") + raise + except Exception as e: + logger.error(f"❌ 根据人设生成兴趣标签失败: {e}") + traceback.print_exc() + raise + + + async def _call_llm_for_interest_generation(self, prompt: str) -> Optional[str]: + """调用LLM生成兴趣标签""" + try: + logger.info("🔧 配置LLM客户端...") + + # 使用llm_api来处理请求 + from src.plugin_system.apis import llm_api + from src.config.config import model_config + + # 构建完整的提示词,明确要求只返回纯JSON + full_prompt = f"""你是一个专业的机器人人设分析师,擅长根据人设描述生成合适的兴趣标签。 + +{prompt} + +请确保返回格式为有效的JSON,不要包含任何额外的文本、解释或代码块标记。只返回JSON对象本身。""" + + # 使用replyer模型配置 + replyer_config = model_config.model_task_config.replyer + + # 调用LLM API + logger.info("🚀 正在通过LLM API发送请求...") + success, response, reasoning_content, model_name = await llm_api.generate_with_model( + prompt=full_prompt, + model_config=replyer_config, + request_type="interest_generation", + temperature=0.7, + max_tokens=2000 + ) + + if success and response: + logger.info(f"✅ LLM响应成功,模型: {model_name}, 响应长度: {len(response)} 字符") + logger.debug(f"📄 LLM响应内容: {response[:200]}..." if len(response) > 200 else f"📄 LLM响应内容: {response}") + if reasoning_content: + logger.debug(f"🧠 推理内容: {reasoning_content[:100]}...") + + # 清理响应内容,移除可能的代码块标记 + cleaned_response = self._clean_llm_response(response) + return cleaned_response + else: + logger.warning("⚠️ LLM返回空响应或调用失败") + return None + + except Exception as e: + logger.error(f"❌ 调用LLM生成兴趣标签失败: {e}") + logger.error("🔍 错误详情:") + traceback.print_exc() + return None + + def _clean_llm_response(self, response: str) -> str: + """清理LLM响应,移除代码块标记和其他非JSON内容""" + import re + + # 移除 ```json 和 ``` 标记 + cleaned = re.sub(r'```json\s*', '', response) + cleaned = re.sub(r'\s*```', '', cleaned) + + # 移除可能的多余空格和换行 + cleaned = cleaned.strip() + + # 尝试提取JSON对象(如果响应中有其他文本) + json_match = re.search(r'\{.*\}', cleaned, re.DOTALL) + if json_match: + cleaned = json_match.group(0) + + logger.debug(f"🧹 清理后的响应: {cleaned[:200]}..." if len(cleaned) > 200 else f"🧹 清理后的响应: {cleaned}") + return cleaned + + async def _generate_embeddings_for_tags(self, interests: BotPersonalityInterests): + """为所有兴趣标签生成embedding""" + if not hasattr(self, 'embedding_request'): + raise RuntimeError("❌ Embedding客户端未初始化,无法生成embedding") + + total_tags = len(interests.interest_tags) + logger.info(f"🧠 开始为 {total_tags} 个兴趣标签生成embedding向量...") + + cached_count = 0 + generated_count = 0 + failed_count = 0 + + for i, tag in enumerate(interests.interest_tags, 1): + if tag.tag_name in self.embedding_cache: + # 使用缓存的embedding + tag.embedding = self.embedding_cache[tag.tag_name] + cached_count += 1 + logger.debug(f" [{i}/{total_tags}] 🏷️ '{tag.tag_name}' - 使用缓存") + else: + # 生成新的embedding + embedding_text = tag.tag_name + + logger.debug(f" [{i}/{total_tags}] 🔄 正在为 '{tag.tag_name}' 生成embedding...") + embedding = await self._get_embedding(embedding_text) + + if embedding: + tag.embedding = embedding + self.embedding_cache[tag.tag_name] = embedding + generated_count += 1 + logger.debug(f" ✅ '{tag.tag_name}' embedding生成成功") + else: + failed_count += 1 + logger.warning(f" ❌ '{tag.tag_name}' embedding生成失败") + + if failed_count > 0: + raise RuntimeError(f"❌ 有 {failed_count} 个兴趣标签embedding生成失败") + + interests.last_updated = datetime.now() + logger.info("=" * 50) + logger.info("✅ Embedding生成完成!") + logger.info(f"📊 总标签数: {total_tags}") + logger.info(f"💾 缓存命中: {cached_count}") + logger.info(f"🆕 新生成: {generated_count}") + logger.info(f"❌ 失败: {failed_count}") + logger.info(f"🗃️ 总缓存大小: {len(self.embedding_cache)}") + logger.info("=" * 50) + + async def _get_embedding(self, text: str) -> List[float]: + """获取文本的embedding向量""" + if not hasattr(self, 'embedding_request'): + raise RuntimeError("❌ Embedding请求客户端未初始化") + + # 检查缓存 + if text in self.embedding_cache: + logger.debug(f"💾 使用缓存的embedding: '{text[:30]}...'") + return self.embedding_cache[text] + + # 使用LLMRequest获取embedding + logger.debug(f"🔄 正在获取embedding: '{text[:30]}...'") + embedding, model_name = await self.embedding_request.get_embedding(text) + + if embedding and len(embedding) > 0: + self.embedding_cache[text] = embedding + logger.debug(f"✅ Embedding获取成功,维度: {len(embedding)}, 模型: {model_name}") + return embedding + else: + raise RuntimeError(f"❌ 返回的embedding为空: {embedding}") + + async def _generate_message_embedding(self, message_text: str, keywords: List[str]) -> List[float]: + """为消息生成embedding向量""" + # 组合消息文本和关键词作为embedding输入 + if keywords: + combined_text = f"{message_text} {' '.join(keywords)}" + else: + combined_text = message_text + + logger.debug(f"🔄 正在为消息生成embedding,输入长度: {len(combined_text)}") + + # 生成embedding + embedding = await self._get_embedding(combined_text) + logger.debug(f"✅ 消息embedding生成成功,维度: {len(embedding)}") + return embedding + + async def _calculate_similarity_scores(self, result: InterestMatchResult, message_embedding: List[float], keywords: List[str]): + """计算消息与兴趣标签的相似度分数""" + try: + if not self.current_interests: + return + + active_tags = self.current_interests.get_active_tags() + if not active_tags: + return + + logger.debug(f"🔍 开始计算与 {len(active_tags)} 个兴趣标签的相似度") + + for tag in active_tags: + if tag.embedding: + # 计算余弦相似度 + similarity = self._calculate_cosine_similarity(message_embedding, tag.embedding) + weighted_score = similarity * tag.weight + + # 设置相似度阈值为0.3 + if similarity > 0.3: + result.add_match(tag.tag_name, weighted_score, keywords) + logger.debug(f" 🏷️ '{tag.tag_name}': 相似度={similarity:.3f}, 权重={tag.weight:.2f}, 加权分数={weighted_score:.3f}") + + except Exception as e: + logger.error(f"❌ 计算相似度分数失败: {e}") + + async def calculate_interest_match(self, message_text: str, keywords: List[str] = None) -> InterestMatchResult: + """计算消息与机器人兴趣的匹配度""" + if not self.current_interests or not self._initialized: + raise RuntimeError("❌ 兴趣标签系统未初始化") + + logger.info("🎯 开始计算兴趣匹配度...") + logger.debug(f"💬 消息长度: {len(message_text)} 字符") + if keywords: + logger.debug(f"🏷️ 关键词数量: {len(keywords)}") + + message_id = f"msg_{datetime.now().timestamp()}" + result = InterestMatchResult(message_id=message_id) + + # 获取活跃的兴趣标签 + active_tags = self.current_interests.get_active_tags() + if not active_tags: + raise RuntimeError("❌ 没有活跃的兴趣标签") + + logger.info(f"📊 有 {len(active_tags)} 个活跃兴趣标签参与匹配") + + # 生成消息的embedding + logger.debug("🔄 正在生成消息embedding...") + message_embedding = await self._get_embedding(message_text) + logger.debug(f"✅ 消息embedding生成成功,维度: {len(message_embedding)}") + + # 计算与每个兴趣标签的相似度 + match_count = 0 + high_similarity_count = 0 + similarity_threshold = 0.3 + + logger.debug(f"🔍 使用相似度阈值: {similarity_threshold}") + + for tag in active_tags: + if tag.embedding: + similarity = self._calculate_cosine_similarity(message_embedding, tag.embedding) + weighted_score = similarity * tag.weight + + if similarity > similarity_threshold: + match_count += 1 + result.add_match(tag.tag_name, weighted_score, [tag.tag_name]) + + if similarity > 0.7: + high_similarity_count += 1 + + logger.debug(f" 🏷️ '{tag.tag_name}': 相似度={similarity:.3f}, 权重={tag.weight:.2f}, 加权分数={weighted_score:.3f}") + + logger.info(f"📈 匹配统计: {match_count}/{len(active_tags)} 个标签超过阈值") + logger.info(f"🔥 高相似度匹配(>0.7): {high_similarity_count} 个") + + # 计算总体分数 + result.calculate_overall_score() + + # 确定最佳匹配标签 + if result.matched_tags: + top_tag_name = max(result.match_scores.items(), key=lambda x: x[1])[0] + result.top_tag = top_tag_name + logger.info(f"🏆 最佳匹配标签: '{top_tag_name}' (分数: {result.match_scores[top_tag_name]:.3f})") + + logger.info(f"📊 最终结果: 总分={result.overall_score:.3f}, 置信度={result.confidence:.3f}, 匹配标签数={len(result.matched_tags)}") + return result + + + def _calculate_cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float: + """计算余弦相似度""" + try: + vec1 = np.array(vec1) + vec2 = np.array(vec2) + + dot_product = np.dot(vec1, vec2) + norm1 = np.linalg.norm(vec1) + norm2 = np.linalg.norm(vec2) + + if norm1 == 0 or norm2 == 0: + return 0.0 + + return dot_product / (norm1 * norm2) + + except Exception as e: + logger.error(f"计算余弦相似度失败: {e}") + return 0.0 + + async def _load_interests_from_database(self, personality_id: str) -> Optional[BotPersonalityInterests]: + """从数据库加载兴趣标签""" + try: + logger.info(f"💾 正在从数据库加载兴趣标签,personality_id: {personality_id}") + + # 导入SQLAlchemy相关模块 + from src.common.database.sqlalchemy_models import BotPersonalityInterests as DBBotPersonalityInterests + from src.common.database.sqlalchemy_database_api import get_db_session + import orjson + + with get_db_session() as session: + # 查询最新的兴趣标签配置 + db_interests = session.query(DBBotPersonalityInterests).filter( + DBBotPersonalityInterests.personality_id == personality_id + ).order_by( + DBBotPersonalityInterests.version.desc(), + DBBotPersonalityInterests.last_updated.desc() + ).first() + + if db_interests: + logger.info(f"✅ 找到数据库中的兴趣标签配置,版本: {db_interests.version}") + logger.debug(f"📅 最后更新时间: {db_interests.last_updated}") + logger.debug(f"🧠 使用的embedding模型: {db_interests.embedding_model}") + + # 解析JSON格式的兴趣标签 + try: + tags_data = orjson.loads(db_interests.interest_tags) + logger.debug(f"🏷️ 解析到 {len(tags_data)} 个兴趣标签") + + # 创建BotPersonalityInterests对象 + interests = BotPersonalityInterests( + personality_id=db_interests.personality_id, + personality_description=db_interests.personality_description, + embedding_model=db_interests.embedding_model, + version=db_interests.version, + last_updated=db_interests.last_updated + ) + + # 解析兴趣标签 + for tag_data in tags_data: + tag = BotInterestTag( + tag_name=tag_data.get("tag_name", ""), + weight=tag_data.get("weight", 0.5), + created_at=datetime.fromisoformat(tag_data.get("created_at", datetime.now().isoformat())), + updated_at=datetime.fromisoformat(tag_data.get("updated_at", datetime.now().isoformat())), + is_active=tag_data.get("is_active", True), + embedding=tag_data.get("embedding") + ) + interests.interest_tags.append(tag) + + logger.info(f"✅ 成功从数据库加载 {len(interests.interest_tags)} 个兴趣标签") + return interests + + except (orjson.JSONDecodeError, Exception) as e: + logger.error(f"❌ 解析兴趣标签JSON失败: {e}") + logger.debug(f"🔍 原始JSON数据: {db_interests.interest_tags[:200]}...") + return None + else: + logger.info(f"ℹ️ 数据库中未找到personality_id为 '{personality_id}' 的兴趣标签配置") + return None + + except Exception as e: + logger.error(f"❌ 从数据库加载兴趣标签失败: {e}") + logger.error("🔍 错误详情:") + traceback.print_exc() + return None + + async def _save_interests_to_database(self, interests: BotPersonalityInterests): + """保存兴趣标签到数据库""" + try: + logger.info("💾 正在保存兴趣标签到数据库...") + logger.info(f"📋 personality_id: {interests.personality_id}") + logger.info(f"🏷️ 兴趣标签数量: {len(interests.interest_tags)}") + logger.info(f"🔄 版本: {interests.version}") + + # 导入SQLAlchemy相关模块 + from src.common.database.sqlalchemy_models import BotPersonalityInterests as DBBotPersonalityInterests + from src.common.database.sqlalchemy_database_api import get_db_session + import orjson + + # 将兴趣标签转换为JSON格式 + tags_data = [] + for tag in interests.interest_tags: + tag_dict = { + "tag_name": tag.tag_name, + "weight": tag.weight, + "created_at": tag.created_at.isoformat(), + "updated_at": tag.updated_at.isoformat(), + "is_active": tag.is_active, + "embedding": tag.embedding + } + tags_data.append(tag_dict) + + # 序列化为JSON + json_data = orjson.dumps(tags_data) + + with get_db_session() as session: + # 检查是否已存在相同personality_id的记录 + existing_record = session.query(DBBotPersonalityInterests).filter( + DBBotPersonalityInterests.personality_id == interests.personality_id + ).first() + + if existing_record: + # 更新现有记录 + logger.info("🔄 更新现有的兴趣标签配置") + existing_record.interest_tags = json_data + existing_record.personality_description = interests.personality_description + existing_record.embedding_model = interests.embedding_model + existing_record.version = interests.version + existing_record.last_updated = interests.last_updated + + logger.info(f"✅ 成功更新兴趣标签配置,版本: {interests.version}") + + else: + # 创建新记录 + logger.info("🆕 创建新的兴趣标签配置") + new_record = DBBotPersonalityInterests( + personality_id=interests.personality_id, + personality_description=interests.personality_description, + interest_tags=json_data, + embedding_model=interests.embedding_model, + version=interests.version, + last_updated=interests.last_updated + ) + session.add(new_record) + logger.info(f"✅ 成功创建兴趣标签配置,版本: {interests.version}") + + logger.info("✅ 兴趣标签已成功保存到数据库") + + except Exception as e: + logger.error(f"❌ 保存兴趣标签到数据库失败: {e}") + logger.error("🔍 错误详情:") + traceback.print_exc() + + def get_current_interests(self) -> Optional[BotPersonalityInterests]: + """获取当前的兴趣标签配置""" + return self.current_interests + + def get_interest_stats(self) -> Dict[str, Any]: + """获取兴趣系统统计信息""" + if not self.current_interests: + return {"initialized": False} + + active_tags = self.current_interests.get_active_tags() + + return { + "initialized": self._initialized, + "total_tags": len(active_tags), + "embedding_model": self.current_interests.embedding_model, + "last_updated": self.current_interests.last_updated.isoformat(), + "cache_size": len(self.embedding_cache) + } + + async def update_interest_tags(self, new_personality_description: str = None): + """更新兴趣标签""" + try: + if not self.current_interests: + logger.warning("没有当前的兴趣标签配置,无法更新") + return + + if new_personality_description: + self.current_interests.personality_description = new_personality_description + + # 重新生成兴趣标签 + new_interests = await self._generate_interests_from_personality( + self.current_interests.personality_description, + self.current_interests.personality_id + ) + + if new_interests: + new_interests.version = self.current_interests.version + 1 + self.current_interests = new_interests + await self._save_interests_to_database(new_interests) + logger.info(f"兴趣标签已更新,版本: {new_interests.version}") + + except Exception as e: + logger.error(f"更新兴趣标签失败: {e}") + traceback.print_exc() + + +# 创建全局实例(重新创建以包含新的属性) +bot_interest_manager = BotInterestManager() \ No newline at end of file diff --git a/src/chat/planner_actions/plan_executor.py b/src/chat/planner_actions/plan_executor.py index 871d4e885..d8ee746c2 100644 --- a/src/chat/planner_actions/plan_executor.py +++ b/src/chat/planner_actions/plan_executor.py @@ -6,6 +6,7 @@ import asyncio import time from typing import Dict, List +from src.config.config import global_config from src.chat.planner_actions.action_manager import ActionManager from src.common.data_models.info_data_model import Plan, ActionPlannerInfo from src.common.logger import get_logger @@ -122,6 +123,16 @@ class PlanExecutor: try: logger.info(f"执行回复动作: {action_info.action_type}, 原因: {action_info.reasoning}") + if action_info.action_message.get("user_id","") == str(global_config.bot.qq_account): + logger.warning("尝试回复自己,跳过此动作以防止死循环。") + return { + "action_type": action_info.action_type, + "success": False, + "error_message": "尝试回复自己,跳过此动作以防止死循环。", + "execution_time": 0, + "reasoning": action_info.reasoning, + "reply_content": "", + } # 构建回复动作参数 action_params = { "chat_id": plan.chat_id, diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py index 8bdd21464..85269a756 100644 --- a/src/chat/planner_actions/planner.py +++ b/src/chat/planner_actions/planner.py @@ -97,7 +97,7 @@ class ActionPlanner: # 2. 兴趣度评分 - 只对未读消息进行评分 if unread_messages: bot_nickname = global_config.bot.nickname - interest_scores = self.interest_scoring.calculate_interest_scores( + interest_scores = await self.interest_scoring.calculate_interest_scores( unread_messages, bot_nickname ) @@ -175,33 +175,14 @@ class ActionPlanner: return final_actions_dict, final_target_message_dict - def _build_return_result(self, plan: Plan) -> Tuple[List[Dict], Optional[Dict]]: - """构建返回结果""" - final_actions = plan.decided_actions or [] - final_target_message = next( - (act.action_message for act in final_actions if act.action_message), None - ) - - final_actions_dict = [asdict(act) for act in final_actions] - - if final_target_message: - if hasattr(final_target_message, '__dataclass_fields__'): - final_target_message_dict = asdict(final_target_message) - else: - final_target_message_dict = final_target_message - else: - final_target_message_dict = None - - return final_actions_dict, final_target_message_dict - def get_user_relationship(self, user_id: str) -> float: """获取用户关系分""" return self.interest_scoring.get_user_relationship(user_id) def update_interest_keywords(self, new_keywords: Dict[str, List[str]]): - """更新兴趣关键词""" - self.interest_scoring.interest_keywords.update(new_keywords) - logger.info(f"已更新兴趣关键词: {list(new_keywords.keys())}") + """更新兴趣关键词(已弃用,仅保留用于兼容性)""" + logger.info("传统关键词匹配已移除,此方法仅保留用于兼容性") + # 此方法已弃用,因为现在完全使用embedding匹配 def get_planner_stats(self) -> Dict[str, any]: """获取规划器统计""" @@ -226,5 +207,4 @@ class ActionPlanner: } -# 全局兴趣度评分系统实例 -interest_scoring_system = InterestScoringSystem() \ No newline at end of file +# 全局兴趣度评分系统实例 - 在 individuality 模块中创建 \ No newline at end of file diff --git a/src/common/data_models/bot_interest_data_model.py b/src/common/data_models/bot_interest_data_model.py new file mode 100644 index 000000000..e0f86237f --- /dev/null +++ b/src/common/data_models/bot_interest_data_model.py @@ -0,0 +1,132 @@ +""" +机器人兴趣标签数据模型 +定义机器人的兴趣标签和相关的embedding数据结构 +""" +from dataclasses import dataclass, field +from typing import List, Dict, Optional, Any +from datetime import datetime + +from . import BaseDataModel + + +@dataclass +class BotInterestTag(BaseDataModel): + """机器人兴趣标签""" + tag_name: str + weight: float = 1.0 # 权重,表示对这个兴趣的喜好程度 (0.0-1.0) + embedding: Optional[List[float]] = None # 标签的embedding向量 + created_at: datetime = field(default_factory=datetime.now) + updated_at: datetime = field(default_factory=datetime.now) + is_active: bool = True + + def to_dict(self) -> Dict[str, Any]: + """转换为字典格式""" + return { + "tag_name": self.tag_name, + "weight": self.weight, + "embedding": self.embedding, + "created_at": self.created_at.isoformat(), + "updated_at": self.updated_at.isoformat(), + "is_active": self.is_active + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "BotInterestTag": + """从字典创建对象""" + return cls( + tag_name=data["tag_name"], + weight=data.get("weight", 1.0), + embedding=data.get("embedding"), + created_at=datetime.fromisoformat(data["created_at"]) if data.get("created_at") else datetime.now(), + updated_at=datetime.fromisoformat(data["updated_at"]) if data.get("updated_at") else datetime.now(), + is_active=data.get("is_active", True) + ) + + +@dataclass +class BotPersonalityInterests(BaseDataModel): + """机器人人格化兴趣配置""" + personality_id: str + personality_description: str # 人设描述文本 + interest_tags: List[BotInterestTag] = field(default_factory=list) + embedding_model: str = "text-embedding-ada-002" # 使用的embedding模型 + last_updated: datetime = field(default_factory=datetime.now) + version: int = 1 # 版本号,用于追踪更新 + + def get_active_tags(self) -> List[BotInterestTag]: + """获取活跃的兴趣标签""" + return [tag for tag in self.interest_tags if tag.is_active] + + + def to_dict(self) -> Dict[str, Any]: + """转换为字典格式""" + return { + "personality_id": self.personality_id, + "personality_description": self.personality_description, + "interest_tags": [tag.to_dict() for tag in self.interest_tags], + "embedding_model": self.embedding_model, + "last_updated": self.last_updated.isoformat(), + "version": self.version + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "BotPersonalityInterests": + """从字典创建对象""" + return cls( + personality_id=data["personality_id"], + personality_description=data["personality_description"], + interest_tags=[BotInterestTag.from_dict(tag_data) for tag_data in data.get("interest_tags", [])], + embedding_model=data.get("embedding_model", "text-embedding-ada-002"), + last_updated=datetime.fromisoformat(data["last_updated"]) if data.get("last_updated") else datetime.now(), + version=data.get("version", 1) + ) + + +@dataclass +class InterestMatchResult(BaseDataModel): + """兴趣匹配结果""" + message_id: str + matched_tags: List[str] = field(default_factory=list) + match_scores: Dict[str, float] = field(default_factory=dict) # tag_name -> score + overall_score: float = 0.0 + top_tag: Optional[str] = None + confidence: float = 0.0 # 匹配置信度 (0.0-1.0) + matched_keywords: List[str] = field(default_factory=list) + + def add_match(self, tag_name: str, score: float, keywords: List[str] = None): + """添加匹配结果""" + self.matched_tags.append(tag_name) + self.match_scores[tag_name] = score + if keywords: + self.matched_keywords.extend(keywords) + + def calculate_overall_score(self): + """计算总体匹配分数""" + if not self.match_scores: + self.overall_score = 0.0 + self.top_tag = None + return + + # 使用加权平均计算总体分数 + total_weight = len(self.match_scores) + if total_weight > 0: + self.overall_score = sum(self.match_scores.values()) / total_weight + # 设置最佳匹配标签 + self.top_tag = max(self.match_scores.items(), key=lambda x: x[1])[0] + else: + self.overall_score = 0.0 + self.top_tag = None + + # 计算置信度(基于匹配标签数量和分数分布) + if len(self.match_scores) > 0: + avg_score = self.overall_score + score_variance = sum((score - avg_score) ** 2 for score in self.match_scores.values()) / len(self.match_scores) + # 分数越集中,置信度越高 + self.confidence = max(0.0, 1.0 - score_variance) + else: + self.confidence = 0.0 + + def get_top_matches(self, top_n: int = 3) -> List[tuple]: + """获取前N个最佳匹配""" + sorted_matches = sorted(self.match_scores.items(), key=lambda x: x[1], reverse=True) + return sorted_matches[:top_n] \ No newline at end of file diff --git a/src/common/database/sqlalchemy_models.py b/src/common/database/sqlalchemy_models.py index 464b38e9f..8b7109522 100644 --- a/src/common/database/sqlalchemy_models.py +++ b/src/common/database/sqlalchemy_models.py @@ -298,6 +298,26 @@ class PersonInfo(Base): ) +class BotPersonalityInterests(Base): + """机器人人格兴趣标签模型""" + + __tablename__ = "bot_personality_interests" + + id = Column(Integer, primary_key=True, autoincrement=True) + personality_id = Column(get_string_field(100), nullable=False, index=True) + personality_description = Column(Text, nullable=False) + interest_tags = Column(Text, nullable=False) # JSON格式存储的兴趣标签列表 + embedding_model = Column(get_string_field(100), nullable=False, default="text-embedding-ada-002") + version = Column(Integer, nullable=False, default=1) + last_updated = Column(DateTime, nullable=False, default=datetime.datetime.now, index=True) + + __table_args__ = ( + Index("idx_botpersonality_personality_id", "personality_id"), + Index("idx_botpersonality_version", "version"), + Index("idx_botpersonality_last_updated", "last_updated"), + ) + + class Memory(Base): """记忆模型""" diff --git a/src/individuality/individuality.py b/src/individuality/individuality.py index 39aef9b3b..09bd3ad00 100644 --- a/src/individuality/individuality.py +++ b/src/individuality/individuality.py @@ -64,6 +64,9 @@ class Individuality: else: logger.error("人设构建失败") + # 初始化智能兴趣系统 + await self._initialize_smart_interest_system(personality_result, identity_result) + # 如果任何一个发生变化,都需要清空数据库中的info_list(因为这影响整体人设) if personality_changed or identity_changed: logger.info("将清空数据库中原有的关键词缓存") @@ -75,6 +78,22 @@ class Individuality: } await person_info_manager.update_one_field(self.bot_person_id, "info_list", [], data=update_data) + async def _initialize_smart_interest_system(self, personality_result: str, identity_result: str): + """初始化智能兴趣系统""" + # 组合完整的人设描述 + full_personality = f"{personality_result},{identity_result}" + + # 获取全局兴趣评分系统实例 + from src.chat.affinity_flow.interest_scoring import interest_scoring_system + + # 初始化智能兴趣系统 + await interest_scoring_system.initialize_smart_interests( + personality_description=full_personality, + personality_id=self.bot_person_id + ) + + logger.info("智能兴趣系统初始化完成") + async def get_personality_block(self) -> str: bot_name = global_config.bot.nickname if global_config.bot.alias_names: diff --git a/消息处理流程.md b/消息处理流程.md new file mode 100644 index 000000000..db78ba7c1 --- /dev/null +++ b/消息处理流程.md @@ -0,0 +1,235 @@ +# 从消息接收到执行Action的完整流程图 + +## 整体流程概览 + +```mermaid +flowchart TD + A[原始消息数据] --> B[消息接收层
src/chat/message_receive/bot.py] + B --> C[消息解析
src/chat/message_receive/message.py] + C --> D[会话管理
src/chat/message_receive/chat_stream.py] + D --> E[亲和力流分发
src/chat/affinity_flow/afc_manager.py] + E --> F[聊天处理器
src/chat/affinity_flow/chatter.py] + F --> G[智能规划决策
三层架构] + G --> H[动作执行管理
src/chat/planner_actions/action_manager.py] + H --> I[最终执行
src/chat/planner_actions/plan_executor.py] + I --> J[Action执行结果] +``` + +## 详细分阶段流程图 + +### 1. 消息接收与预处理阶段 + +```mermaid +flowchart TD + A[原始消息数据] --> B[message_process入口] + B --> C{消息切片重组} + C -- 完整消息 --> D[平台类型判断] + C -- 切片消息 --> E[等待更多切片] + + D --> F{S4U平台?} + F -- 是 --> G[S4U特殊处理] + F -- 否 --> H[创建MessageRecv对象] + + H --> I[过滤检查
违禁词/正则] + I --> J[命令处理系统] + + J --> K{PlusCommand?} + K -- 是 --> L[执行PlusCommand] + K -- 否 --> M[执行BaseCommand] + + L --> N[事件触发] + M --> N + + N --> O[模板处理] + O --> P[预处理完成] +``` + +### 2. 消息解析阶段 + +```mermaid +flowchart TD + A[预处理完成消息] --> B[MessageRecv.process] + B --> C{消息类型判断} + + C -- 文本 --> D[直接提取文本] + C -- 图片 --> E[图片识别处理] + C -- 表情 --> F[表情包描述] + C -- 语音 --> G[语音转文本] + C -- 视频 --> H[视频内容分析] + C -- AT消息 --> I[提取用户信息] + C -- 其他 --> J[通用处理] + + D --> K[生成纯文本] + E --> K + F --> K + G --> K + H --> K + I --> K + J --> K + + K --> L[消息解析完成] +``` + +### 3. 会话管理阶段 + +```mermaid +flowchart TD + A[解析后消息] --> B[ChatManager.register_message] + B --> C[生成stream_id
platform+user+group] + + C --> D{会话是否存在?} + D -- 内存中存在 --> E[获取现有会话] + D -- 内存中不存在 --> F[数据库查询] + + F --> G{数据库存在?} + G -- 是 --> H[从数据库加载] + G -- 否 --> I[创建新会话] + + H --> J[更新会话信息] + I --> J + + J --> K[设置消息上下文] + K --> L[会话管理完成] +``` + +### 4. 智能规划决策阶段(三层架构) + +```mermaid +flowchart TD + A[会话管理完成] --> B[规划器入口 ActionPlanner] + + B --> C[PlanGenerator生成初始Plan] + C --> D[兴趣度评分系统] + + D --> E[提取未读消息] + E --> F[计算多维评分] + F --> G[兴趣匹配度] + F --> H[用户关系分] + F --> I[提及度评分] + + G --> J[加权总分计算] + H --> J + I --> J + + J --> K{是否回复?} + K -- 是 --> L[保留reply动作] + K -- 否 --> M[移除reply动作] + + L --> N[PlanFilter筛选] + M --> N + + N --> O[LLM决策最终动作] + O --> P[规划决策完成] +``` + +### 5. 动作执行阶段 + +```mermaid +flowchart TD + A[规划决策完成] --> B[ActionManager执行] + + B --> C{动作类型判断} + C -- no_action --> D[记录不动作] + C -- no_reply --> E[记录不回复] + C -- reply --> F[生成回复内容] + C -- 其他动作 --> G[执行具体动作] + + D --> H[执行完成] + E --> H + F --> I[发送回复消息] + G --> J[动作处理器执行] + + I --> H + J --> H + + H --> K[PlanExecutor最终执行] + K --> L[用户关系追踪] + L --> M[执行统计记录] + M --> N[动作执行完成] +``` + +## 完整端到端流程 + +```mermaid +flowchart LR + A[消息接收] --> B[消息解析] + B --> C[会话管理] + C --> D[消息分发] + D --> E[聊天处理] + E --> F[兴趣度评分] + F --> G[规划生成] + G --> H[LLM筛选] + H --> I[动作管理] + I --> J[最终执行] + J --> K[结果返回] + + subgraph 智能决策层 + F + G + H + end + + subgraph 执行层 + I + J + K + end + + style 智能决策层 fill:#e1f5fe + style 执行层 fill:#f3e5f5 +``` + +## 关键组件交互关系 + +```mermaid +flowchart TD + Bot[Bot.message_process] --> Message[MessageRecv] + Message --> ChatManager[ChatManager] + ChatManager --> AFCManager[AFCManager] + AFCManager --> Chatter[AffinityFlowChatter] + + Chatter --> Planner[ActionPlanner] + Planner --> Generator[PlanGenerator] + Planner --> Scorer[InterestScoringSystem] + Planner --> Filter[PlanFilter] + + Filter --> ActionManager[ActionManager] + ActionManager --> Executor[PlanExecutor] + + Executor --> Result[执行结果] + + %% 数据流 + Message -.-> |消息数据| Chatter + Scorer -.-> |兴趣评分| Filter + Generator -.-> |初始Plan| Filter + Filter -.-> |最终Plan| Executor +``` + +## 异常处理流程 + +```mermaid +flowchart TD + A[开始处理] --> B[正常流程] + B --> C[处理完成] + + B --> D{发生异常?} + D -- 是 --> E[异常捕获] + D -- 否 --> C + + E --> F[日志记录错误] + F --> G[错误类型判断] + + G -- 消息解析失败 --> H[返回解析错误] + G -- 会话不存在 --> I[创建新会话重试] + G -- LLM决策失败 --> J[使用默认动作] + G -- 动作执行失败 --> K[动作回退机制] + G -- 其他错误 --> L[返回通用错误] + + H --> M[异常处理完成] + I --> B + J --> M + K --> M + L --> M +``` + +这个流程图详细展示了从消息接收到执行action的完整流程,包括各个阶段的处理逻辑、组件交互关系以及异常处理机制。整个系统采用了模块化设计,具有清晰的职责分离和良好的可扩展性。 \ No newline at end of file