refactor(chat): 移除亲和力流模块并将其重构为插件架构

BREAKING CHANGE: 原有的亲和力流相关模块(src/chat/affinity_flow/)已被完全移除,相关功能已重构为插件形式。需要更新配置文件和相关依赖。

- 删除 src/chat/affinity_flow/ 目录下的所有文件
- 将 AFC 管理器功能移至 chatter 插件中实现
- 更新相关导入路径和引用
- 重构关系追踪器和兴趣评分系统的初始化逻辑
- 调整聊天管理器和消息管理器以适应新的插件架构
This commit is contained in:
Windpicker-owo
2025-09-23 13:14:38 +08:00
parent db29ebfeae
commit c9b20aa61a
27 changed files with 511 additions and 761 deletions

View File

@@ -1,10 +0,0 @@
"""
亲和力流模块初始化文件
提供全局的AFC管理器实例
"""
# Avoid importing submodules at package import time to prevent circular imports.
# Consumers should import specific submodules directly, for example:
# from src.chat.affinity_flow.afc_manager import afc_manager
__all__ = ["afc_manager", "AFCManager", "AffinityFlowChatter"]

View File

@@ -1,131 +0,0 @@
"""
亲和力聊天处理流管理器
管理不同聊天流的亲和力聊天处理流,统一获取新消息并分发到对应的亲和力聊天处理流
"""
import time
import traceback
from typing import Dict, Optional, List
from src.chat.planner_actions.action_manager import ActionManager
from src.chat.planner_actions.planner import ActionPlanner
from src.chat.affinity_flow.chatter import AffinityFlowChatter
from src.common.data_models.message_manager_data_model import StreamContext
from src.common.logger import get_logger
logger = get_logger("afc_manager")
class AFCManager:
"""亲和力聊天处理流管理器"""
def __init__(self):
self.affinity_flow_chatters: Dict[str, "AffinityFlowChatter"] = {}
"""所有聊天流的亲和力聊天处理流stream_id -> affinity_flow_chatter"""
# 动作管理器
self.action_manager = ActionManager()
# 管理器统计
self.manager_stats = {
"total_messages_processed": 0,
"total_plans_created": 0,
"total_actions_executed": 0,
"active_chatters": 0,
"last_activity_time": time.time(),
}
def get_or_create_chatter(self, stream_id: str) -> "AffinityFlowChatter":
"""获取或创建聊天流处理器"""
if stream_id not in self.affinity_flow_chatters:
# 创建增强版规划器
planner = ActionPlanner(stream_id, self.action_manager)
chatter = AffinityFlowChatter(stream_id=stream_id, planner=planner, action_manager=self.action_manager)
self.affinity_flow_chatters[stream_id] = chatter
logger.info(f"创建新的亲和力聊天处理器: {stream_id}")
return self.affinity_flow_chatters[stream_id]
async def process_stream_context(self, stream_id: str, context: StreamContext) -> Dict[str, any]:
"""处理StreamContext对象"""
try:
# 获取或创建聊天处理器
chatter = self.get_or_create_chatter(stream_id)
# 处理StreamContext
result = await chatter.process_stream_context(context)
# 更新统计
self.manager_stats["total_messages_processed"] += 1
self.manager_stats["total_actions_executed"] += result.get("executed_count", 0)
self.manager_stats["last_activity_time"] = time.time()
return result
except Exception as e:
logger.error(f"处理StreamContext时出错: {e}\n{traceback.format_exc()}")
return {
"success": False,
"error_message": str(e),
"executed_count": 0,
}
def get_chatter_stats(self, stream_id: str) -> Optional[Dict[str, any]]:
"""获取聊天处理器统计"""
if stream_id in self.affinity_flow_chatters:
return self.affinity_flow_chatters[stream_id].get_stats()
return None
def get_manager_stats(self) -> Dict[str, any]:
"""获取管理器统计"""
stats = self.manager_stats.copy()
stats["active_chatters"] = len(self.affinity_flow_chatters)
return stats
def cleanup_inactive_chatters(self, max_inactive_minutes: int = 60):
"""清理不活跃的聊天处理器"""
current_time = time.time()
max_inactive_seconds = max_inactive_minutes * 60
inactive_streams = []
for stream_id, chatter in self.affinity_flow_chatters.items():
if current_time - chatter.last_activity_time > max_inactive_seconds:
inactive_streams.append(stream_id)
for stream_id in inactive_streams:
del self.affinity_flow_chatters[stream_id]
logger.info(f"清理不活跃聊天处理器: {stream_id}")
def get_planner_stats(self, stream_id: str) -> Optional[Dict[str, any]]:
"""获取规划器统计"""
if stream_id in self.affinity_flow_chatters:
return self.affinity_flow_chatters[stream_id].get_planner_stats()
return None
def get_interest_scoring_stats(self, stream_id: str) -> Optional[Dict[str, any]]:
"""获取兴趣度评分统计"""
if stream_id in self.affinity_flow_chatters:
return self.affinity_flow_chatters[stream_id].get_interest_scoring_stats()
return None
def get_relationship_stats(self, stream_id: str) -> Optional[Dict[str, any]]:
"""获取用户关系统计"""
if stream_id in self.affinity_flow_chatters:
return self.affinity_flow_chatters[stream_id].get_relationship_stats()
return None
def get_user_relationship(self, stream_id: str, user_id: str) -> float:
"""获取用户关系分"""
if stream_id in self.affinity_flow_chatters:
return self.affinity_flow_chatters[stream_id].get_user_relationship(user_id)
return 0.3 # 默认新用户关系分
def update_interest_keywords(self, stream_id: str, new_keywords: dict):
"""更新兴趣关键词"""
if stream_id in self.affinity_flow_chatters:
self.affinity_flow_chatters[stream_id].update_interest_keywords(new_keywords)
logger.info(f"已更新聊天流 {stream_id} 的兴趣关键词: {list(new_keywords.keys())}")
afc_manager = AFCManager()

View File

@@ -1,206 +0,0 @@
"""
亲和力聊天处理器
单个聊天流的处理器,负责处理特定聊天流的完整交互流程
"""
import time
import traceback
from datetime import datetime
from typing import Dict
from src.chat.planner_actions.action_manager import ActionManager
from src.chat.planner_actions.planner import ActionPlanner
from src.common.data_models.message_manager_data_model import StreamContext
from src.plugin_system.base.base_chatter import BaseChatter
from src.plugin_system.base.component_types import ChatMode
from src.common.logger import get_logger
logger = get_logger("affinity_chatter")
class AffinityFlowChatter(BaseChatter):
"""单个亲和力聊天处理器"""
def __init__(self, stream_id: str, planner: ActionPlanner, action_manager: ActionManager):
"""
初始化亲和力聊天处理器
Args:
stream_id: 聊天流ID
planner: 动作规划器
action_manager: 动作管理器
"""
self.stream_id = stream_id
self.planner = planner
self.action_manager = action_manager
# 处理器统计
self.stats = {
"messages_processed": 0,
"plans_created": 0,
"actions_executed": 0,
"successful_executions": 0,
"failed_executions": 0,
}
self.last_activity_time = time.time()
async def execute(self, context: StreamContext) -> dict:
"""
处理StreamContext对象
Args:
context: StreamContext对象包含聊天流的所有消息信息
Returns:
处理结果字典
"""
try:
unread_messages = context.get_unread_messages()
# 使用增强版规划器处理消息
actions, target_message = await self.planner.plan(mode=ChatMode.FOCUS, context=context)
self.stats["plans_created"] += 1
# 执行动作(如果规划器返回了动作)
execution_result = {"executed_count": len(actions) if actions else 0}
if actions:
logger.debug(f"聊天流 {self.stream_id} 生成了 {len(actions)} 个动作")
# 更新统计
self.stats["messages_processed"] += 1
self.stats["actions_executed"] += execution_result.get("executed_count", 0)
self.stats["successful_executions"] += 1
self.last_activity_time = time.time()
result = {
"success": True,
"stream_id": self.stream_id,
"plan_created": True,
"actions_count": len(actions) if actions else 0,
"has_target_message": target_message is not None,
"unread_messages_processed": len(unread_messages),
**execution_result,
}
logger.info(
f"聊天流 {self.stream_id} StreamContext处理成功: 动作数={result['actions_count']}, 未读消息={result['unread_messages_processed']}"
)
return result
except Exception as e:
logger.error(f"亲和力聊天处理器 {self.stream_id} 处理StreamContext时出错: {e}\n{traceback.format_exc()}")
self.stats["failed_executions"] += 1
self.last_activity_time = time.time()
return {
"success": False,
"stream_id": self.stream_id,
"error_message": str(e),
"executed_count": 0,
}
def get_stats(self) -> Dict[str, any]:
"""
获取处理器统计信息
Returns:
统计信息字典
"""
return self.stats.copy()
def get_planner_stats(self) -> Dict[str, any]:
"""
获取规划器统计信息
Returns:
规划器统计信息字典
"""
return self.planner.get_planner_stats()
def get_interest_scoring_stats(self) -> Dict[str, any]:
"""
获取兴趣度评分统计信息
Returns:
兴趣度评分统计信息字典
"""
return self.planner.get_interest_scoring_stats()
def get_relationship_stats(self) -> Dict[str, any]:
"""
获取用户关系统计信息
Returns:
用户关系统计信息字典
"""
return self.planner.get_relationship_stats()
def get_user_relationship(self, user_id: str) -> float:
"""
获取用户关系分
Args:
user_id: 用户ID
Returns:
用户关系分 (0.0-1.0)
"""
return self.planner.get_user_relationship(user_id)
def update_interest_keywords(self, new_keywords: dict):
"""
更新兴趣关键词
Args:
new_keywords: 新的兴趣关键词字典
"""
self.planner.update_interest_keywords(new_keywords)
logger.info(f"聊天流 {self.stream_id} 已更新兴趣关键词: {list(new_keywords.keys())}")
def reset_stats(self):
"""重置统计信息"""
self.stats = {
"messages_processed": 0,
"plans_created": 0,
"actions_executed": 0,
"successful_executions": 0,
"failed_executions": 0,
}
def is_active(self, max_inactive_minutes: int = 60) -> bool:
"""
检查处理器是否活跃
Args:
max_inactive_minutes: 最大不活跃分钟数
Returns:
是否活跃
"""
current_time = time.time()
max_inactive_seconds = max_inactive_minutes * 60
return (current_time - self.last_activity_time) < max_inactive_seconds
def get_activity_time(self) -> float:
"""
获取最后活动时间
Returns:
最后活动时间戳
"""
return self.last_activity_time
def __str__(self) -> str:
"""字符串表示"""
return f"AffinityFlowChatter(stream_id={self.stream_id}, messages={self.stats['messages_processed']})"
def __repr__(self) -> str:
"""详细字符串表示"""
return (
f"AffinityFlowChatter(stream_id={self.stream_id}, "
f"messages_processed={self.stats['messages_processed']}, "
f"plans_created={self.stats['plans_created']}, "
f"last_activity={datetime.fromtimestamp(self.last_activity_time)})"
)

View File

@@ -1,368 +0,0 @@
"""
兴趣度评分系统
基于多维度评分机制,包括兴趣匹配度、用户关系分、提及度和时间因子
现在使用embedding计算智能兴趣匹配
"""
import traceback
from typing import Dict, List, Any
from src.common.data_models.database_data_model import DatabaseMessages
from src.common.data_models.info_data_model import InterestScore
from src.chat.interest_system import bot_interest_manager
from src.common.logger import get_logger
from src.config.config import global_config
logger = get_logger("interest_scoring")
class InterestScoringSystem:
"""兴趣度评分系统"""
def __init__(self):
# 智能兴趣匹配配置
self.use_smart_matching = True
# 从配置加载评分权重
affinity_config = global_config.affinity_flow
self.score_weights = {
"interest_match": affinity_config.keyword_match_weight, # 兴趣匹配度权重
"relationship": affinity_config.relationship_weight, # 关系分权重
"mentioned": affinity_config.mention_bot_weight, # 是否提及bot权重
}
# 评分阈值
self.reply_threshold = affinity_config.reply_action_interest_threshold # 回复动作兴趣阈值
self.mention_threshold = affinity_config.mention_bot_adjustment_threshold # 提及bot后的调整阈值
# 连续不回复概率提升
self.no_reply_count = 0
self.max_no_reply_count = affinity_config.max_no_reply_count
self.probability_boost_per_no_reply = (
affinity_config.no_reply_threshold_adjustment / affinity_config.max_no_reply_count
) # 每次不回复增加的概率
# 用户关系数据
self.user_relationships: Dict[str, float] = {} # user_id -> relationship_score
async def calculate_interest_scores(
self, messages: List[DatabaseMessages], bot_nickname: str
) -> List[InterestScore]:
"""计算消息的兴趣度评分"""
user_messages = [msg for msg in messages if str(msg.user_info.user_id) != str(global_config.bot.qq_account)]
if not user_messages:
return []
logger.info(f"正在为 {len(user_messages)} 条用户消息计算兴趣度...")
scores = []
for i, msg in enumerate(user_messages, 1):
logger.debug(f"[{i}/{len(user_messages)}] 处理消息 ID: {msg.message_id}")
score = await self._calculate_single_message_score(msg, bot_nickname)
scores.append(score)
logger.info(f"{len(scores)} 条消息生成了兴趣度评分。")
return scores
async def _calculate_single_message_score(self, message: DatabaseMessages, bot_nickname: str) -> InterestScore:
"""计算单条消息的兴趣度评分"""
message_preview = f"\033[96m{message.processed_plain_text[:30].replace('\n', ' ')}...\033[0m"
logger.info(f"计算消息 {message.message_id} 的分数 | 内容: {message_preview}")
keywords = self._extract_keywords_from_database(message)
interest_match_score = await self._calculate_interest_match_score(message.processed_plain_text, keywords)
relationship_score = self._calculate_relationship_score(message.user_info.user_id)
mentioned_score = self._calculate_mentioned_score(message, bot_nickname)
total_score = (
interest_match_score * self.score_weights["interest_match"]
+ relationship_score * self.score_weights["relationship"]
+ mentioned_score * self.score_weights["mentioned"]
)
details = {
"interest_match": f"兴趣匹配: {interest_match_score:.3f}",
"relationship": f"关系: {relationship_score:.3f}",
"mentioned": f"提及: {mentioned_score:.3f}",
}
logger.info(
f"消息 {message.message_id} 得分: {total_score:.3f} "
f"(匹配: {interest_match_score:.2f}, 关系: {relationship_score:.2f}, 提及: {mentioned_score:.2f})"
)
return InterestScore(
message_id=message.message_id,
total_score=total_score,
interest_match_score=interest_match_score,
relationship_score=relationship_score,
mentioned_score=mentioned_score,
details=details,
)
async def _calculate_interest_match_score(self, content: str, keywords: List[str] = None) -> float:
"""计算兴趣匹配度 - 使用智能embedding匹配"""
if not content:
return 0.0
# 使用智能匹配embedding
if self.use_smart_matching and bot_interest_manager.is_initialized:
return await self._calculate_smart_interest_match(content, keywords)
else:
# 智能匹配未初始化,返回默认分数
logger.warning("智能兴趣匹配系统未初始化,返回默认分数")
return 0.3
async def _calculate_smart_interest_match(self, content: str, keywords: List[str] = None) -> float:
"""使用embedding计算智能兴趣匹配"""
try:
logger.debug("🧠 开始智能兴趣匹配计算...")
# 如果没有传入关键词,则提取
if not keywords:
logger.debug("🔍 从内容中提取关键词...")
keywords = self._extract_keywords_from_content(content)
logger.debug(f"🏷️ 提取到 {len(keywords)} 个关键词")
# 使用机器人兴趣管理器计算匹配度
logger.debug("🤖 调用机器人兴趣管理器计算匹配度...")
match_result = await bot_interest_manager.calculate_interest_match(content, keywords)
if match_result:
logger.debug("✅ 智能兴趣匹配成功:")
logger.debug(f" 📊 总分: {match_result.overall_score:.3f}")
logger.debug(f" 🏷️ 匹配标签: {match_result.matched_tags}")
logger.debug(f" 🎯 最佳标签: {match_result.top_tag}")
logger.debug(f" 📈 置信度: {match_result.confidence:.3f}")
logger.debug(f" 🔢 匹配详情: {match_result.match_scores}")
# 返回匹配分数,考虑置信度和匹配标签数量
affinity_config = global_config.affinity_flow
match_count_bonus = min(
len(match_result.matched_tags) * affinity_config.match_count_bonus, affinity_config.max_match_bonus
)
final_score = match_result.overall_score * 1.15 * match_result.confidence + match_count_bonus
logger.debug(
f"⚖️ 最终分数计算: 总分({match_result.overall_score:.3f}) × 1.3 × 置信度({match_result.confidence:.3f}) + 标签数量奖励({match_count_bonus:.3f}) = {final_score:.3f}"
)
return final_score
else:
logger.warning("⚠️ 智能兴趣匹配未返回结果")
return 0.0
except Exception as e:
logger.error(f"❌ 智能兴趣匹配计算失败: {e}")
logger.debug("🔍 错误详情:")
logger.debug(f" 💬 内容长度: {len(content)} 字符")
logger.debug(f" 🏷️ 关键词数量: {len(keywords) if keywords else 0}")
return 0.0
def _extract_keywords_from_database(self, message: DatabaseMessages) -> List[str]:
"""从数据库消息中提取关键词"""
keywords = []
# 尝试从 key_words 字段提取存储的是JSON字符串
if message.key_words:
try:
import orjson
keywords = orjson.loads(message.key_words)
if not isinstance(keywords, list):
keywords = []
except (orjson.JSONDecodeError, TypeError):
keywords = []
# 如果没有 keywords尝试从 key_words_lite 提取
if not keywords and message.key_words_lite:
try:
import orjson
keywords = orjson.loads(message.key_words_lite)
if not isinstance(keywords, list):
keywords = []
except (orjson.JSONDecodeError, TypeError):
keywords = []
# 如果还是没有,从消息内容中提取(降级方案)
if not keywords:
keywords = self._extract_keywords_from_content(message.processed_plain_text)
return keywords[:15] # 返回前15个关键词
def _extract_keywords_from_content(self, content: str) -> List[str]:
"""从内容中提取关键词(降级方案)"""
import re
# 清理文本
content = re.sub(r"[^\w\s\u4e00-\u9fff]", " ", content) # 保留中文、英文、数字
words = content.split()
# 过滤和关键词提取
keywords = []
for word in words:
word = word.strip()
if (
len(word) >= 2 # 至少2个字符
and word.isalnum() # 字母数字
and not word.isdigit()
): # 不是纯数字
keywords.append(word.lower())
# 去重并限制数量
unique_keywords = list(set(keywords))
return unique_keywords[:10] # 返回前10个唯一关键词
def _calculate_relationship_score(self, user_id: str) -> float:
"""计算关系分 - 从数据库获取关系分"""
# 优先使用内存中的关系分
if user_id in self.user_relationships:
relationship_value = self.user_relationships[user_id]
return min(relationship_value, 1.0)
# 如果内存中没有,尝试从关系追踪器获取
if hasattr(self, "relationship_tracker") and self.relationship_tracker:
try:
relationship_score = self.relationship_tracker.get_user_relationship_score(user_id)
# 同时更新内存缓存
self.user_relationships[user_id] = relationship_score
return relationship_score
except Exception as e:
logger.warning(f"从关系追踪器获取关系分失败: {e}")
else:
# 尝试从全局关系追踪器获取
try:
from src.chat.affinity_flow.relationship_integration import get_relationship_tracker
global_tracker = get_relationship_tracker()
if global_tracker:
relationship_score = global_tracker.get_user_relationship_score(user_id)
# 同时更新内存缓存
self.user_relationships[user_id] = relationship_score
return relationship_score
except Exception as e:
logger.warning(f"从全局关系追踪器获取关系分失败: {e}")
# 默认新用户的基础分
return global_config.affinity_flow.base_relationship_score
def _calculate_mentioned_score(self, msg: DatabaseMessages, bot_nickname: str) -> float:
"""计算提及分数"""
if not msg.processed_plain_text:
return 0.0
# 检查是否被提及
bot_aliases = [bot_nickname] + global_config.bot.alias_names
is_mentioned = msg.is_mentioned or any(alias in msg.processed_plain_text for alias in bot_aliases if alias)
# 如果被提及或是私聊都视为提及了bot
if is_mentioned or not hasattr(msg, "chat_info_group_id"):
return global_config.affinity_flow.mention_bot_interest_score
return 0.0
def should_reply(self, score: InterestScore, message: "DatabaseMessages") -> bool:
"""判断是否应该回复"""
message_preview = f"\033[96m{(message.processed_plain_text or 'N/A')[:50].replace('\n', ' ')}\033[0m"
logger.info(f"评估消息 {score.message_id} (得分: {score.total_score:.3f}) | 内容: '{message_preview}...'")
base_threshold = self.reply_threshold
# 如果被提及,降低阈值
if score.mentioned_score >= global_config.affinity_flow.mention_bot_adjustment_threshold:
base_threshold = self.mention_threshold
logger.debug(f"机器人被提及, 使用较低阈值: {base_threshold:.3f}")
# 计算连续不回复的概率提升
probability_boost = min(self.no_reply_count * self.probability_boost_per_no_reply, 0.8)
effective_threshold = base_threshold - probability_boost
logger.debug(
f"基础阈值: {base_threshold:.3f}, 不回复提升: {probability_boost:.3f}, 有效阈值: {effective_threshold:.3f}"
)
# 做出决策
should_reply = score.total_score >= effective_threshold
decision = "✅ 回复" if should_reply else "❌ 不回复"
logger.info(f"回复决策: {decision} (分数: {score.total_score:.3f} {' >=' if should_reply else ' <'} 阈值: {effective_threshold:.3f})")
return should_reply, score.total_score
def record_reply_action(self, did_reply: bool):
"""记录回复动作"""
old_count = self.no_reply_count
if did_reply:
self.no_reply_count = max(0, self.no_reply_count - global_config.affinity_flow.reply_cooldown_reduction)
action = "回复"
else:
self.no_reply_count += 1
action = "不回复"
# 限制最大计数
self.no_reply_count = min(self.no_reply_count, self.max_no_reply_count)
logger.info(f"记录动作: {action} | 连续不回复次数: {old_count} -> {self.no_reply_count}")
logger.debug(f"📋 最大限制: {self.max_no_reply_count}")
def update_user_relationship(self, user_id: str, relationship_change: float):
"""更新用户关系"""
old_score = self.user_relationships.get(
user_id, global_config.affinity_flow.base_relationship_score
) # 默认新用户分数
new_score = max(0.0, min(1.0, old_score + relationship_change))
self.user_relationships[user_id] = new_score
change_direction = "📈" if relationship_change > 0 else "📉" if relationship_change < 0 else ""
logger.info(f"{change_direction} 更新用户关系: {user_id}")
logger.info(f"💝 关系分: {old_score:.3f}{new_score:.3f} (变化: {relationship_change:+.3f})")
logger.debug(f"👥 当前追踪用户数: {len(self.user_relationships)}")
def get_user_relationship(self, user_id: str) -> float:
"""获取用户关系分"""
return self.user_relationships.get(user_id, 0.3)
def get_scoring_stats(self) -> Dict:
"""获取评分系统统计"""
return {
"no_reply_count": self.no_reply_count,
"max_no_reply_count": self.max_no_reply_count,
"reply_threshold": self.reply_threshold,
"mention_threshold": self.mention_threshold,
"user_relationships": len(self.user_relationships),
}
def reset_stats(self):
"""重置统计信息"""
self.no_reply_count = 0
logger.info("重置兴趣度评分系统统计")
async def initialize_smart_interests(self, personality_description: str, personality_id: str = "default"):
"""初始化智能兴趣系统"""
try:
logger.info("开始初始化智能兴趣系统...")
logger.info(f"人设ID: {personality_id}, 描述长度: {len(personality_description)}")
await bot_interest_manager.initialize(personality_description, personality_id)
logger.info("智能兴趣系统初始化完成。")
# 显示初始化后的统计信息
stats = bot_interest_manager.get_interest_stats()
logger.info(
f"兴趣系统统计: 总标签={stats.get('total_tags', 0)}, "
f"缓存大小={stats.get('cache_size', 0)}, "
f"模型='{stats.get('embedding_model', '未知')}'"
)
except Exception as e:
logger.error(f"初始化智能兴趣系统失败: {e}")
traceback.print_exc()
def get_matching_config(self) -> Dict[str, Any]:
"""获取匹配配置信息"""
return {
"use_smart_matching": self.use_smart_matching,
"smart_system_initialized": bot_interest_manager.is_initialized,
"smart_system_stats": bot_interest_manager.get_interest_stats()
if bot_interest_manager.is_initialized
else None,
}
# 创建全局兴趣评分系统实例
interest_scoring_system = InterestScoringSystem()

View File

@@ -1,68 +0,0 @@
"""
回复后关系追踪集成初始化脚本
此脚本用于设置回复后关系追踪系统的全局变量和初始化连接
确保各组件能正确协同工作
"""
from src.chat.affinity_flow.relationship_tracker import UserRelationshipTracker
from src.chat.affinity_flow.interest_scoring import interest_scoring_system
from src.common.logger import get_logger
logger = get_logger("relationship_integration")
# 全局关系追踪器实例
relationship_tracker = None
def initialize_relationship_tracking():
"""初始化关系追踪系统"""
global relationship_tracker
try:
logger.info("🚀 初始化回复后关系追踪系统...")
# 创建关系追踪器实例
relationship_tracker = UserRelationshipTracker(interest_scoring_system=interest_scoring_system)
# 设置兴趣度评分系统的关系追踪器引用
interest_scoring_system.relationship_tracker = relationship_tracker
logger.info("✅ 回复后关系追踪系统初始化完成")
logger.info("📋 系统功能:")
logger.info(" 🔄 自动回复后关系追踪")
logger.info(" 💾 数据库持久化存储")
logger.info(" 🧠 LLM智能关系分析")
logger.info(" ⏰ 5分钟追踪间隔")
logger.info(" 🎯 兴趣度评分集成")
return relationship_tracker
except Exception as e:
logger.error(f"❌ 关系追踪系统初始化失败: {e}")
logger.debug("错误详情:", exc_info=True)
return None
def get_relationship_tracker():
"""获取全局关系追踪器实例"""
global relationship_tracker
return relationship_tracker
def setup_plan_executor_relationship_tracker(plan_executor):
"""为PlanExecutor设置关系追踪器"""
global relationship_tracker
if relationship_tracker and plan_executor:
plan_executor.set_relationship_tracker(relationship_tracker)
logger.info("✅ PlanExecutor关系追踪器设置完成")
return True
logger.warning("⚠️ 无法设置PlanExecutor关系追踪器")
return False
# 自动初始化
if __name__ == "__main__":
initialize_relationship_tracking()

View File

@@ -1,683 +0,0 @@
"""
用户关系追踪器
负责追踪用户交互历史并通过LLM分析更新用户关系分
支持数据库持久化存储和回复后自动关系更新
"""
import time
from typing import Dict, List, Optional
from src.common.logger import get_logger
from src.config.config import model_config, global_config
from src.llm_models.utils_model import LLMRequest
from src.common.database.sqlalchemy_database_api import get_db_session
from src.common.database.sqlalchemy_models import UserRelationships, Messages
from sqlalchemy import select, desc
from src.common.data_models.database_data_model import DatabaseMessages
logger = get_logger("relationship_tracker")
class UserRelationshipTracker:
"""用户关系追踪器"""
def __init__(self, interest_scoring_system=None):
self.tracking_users: Dict[str, Dict] = {} # user_id -> interaction_data
self.max_tracking_users = 3
self.update_interval_minutes = 30
self.last_update_time = time.time()
self.relationship_history: List[Dict] = []
self.interest_scoring_system = interest_scoring_system
# 数据库访问 - 使用SQLAlchemy
pass
# 用户关系缓存 (user_id -> {"relationship_text": str, "relationship_score": float, "last_tracked": float})
self.user_relationship_cache: Dict[str, Dict] = {}
self.cache_expiry_hours = 1 # 缓存过期时间(小时)
# 关系更新LLM
try:
self.relationship_llm = LLMRequest(
model_set=model_config.model_task_config.relationship_tracker, request_type="relationship_tracker"
)
except AttributeError:
# 如果relationship_tracker配置不存在尝试其他可用的模型配置
available_models = [
attr
for attr in dir(model_config.model_task_config)
if not attr.startswith("_") and attr != "model_dump"
]
if available_models:
# 使用第一个可用的模型配置
fallback_model = available_models[0]
logger.warning(f"relationship_tracker model configuration not found, using fallback: {fallback_model}")
self.relationship_llm = LLMRequest(
model_set=getattr(model_config.model_task_config, fallback_model),
request_type="relationship_tracker",
)
else:
# 如果没有任何模型配置创建一个简单的LLMRequest
logger.warning("No model configurations found, creating basic LLMRequest")
self.relationship_llm = LLMRequest(
model_set="gpt-3.5-turbo", # 默认模型
request_type="relationship_tracker",
)
def set_interest_scoring_system(self, interest_scoring_system):
"""设置兴趣度评分系统引用"""
self.interest_scoring_system = interest_scoring_system
def add_interaction(self, user_id: str, user_name: str, user_message: str, bot_reply: str, reply_timestamp: float):
"""添加用户交互记录"""
if len(self.tracking_users) >= self.max_tracking_users:
# 移除最旧的记录
oldest_user = min(
self.tracking_users.keys(), key=lambda k: self.tracking_users[k].get("reply_timestamp", 0)
)
del self.tracking_users[oldest_user]
# 获取当前关系分
current_relationship_score = global_config.affinity_flow.base_relationship_score # 默认值
if self.interest_scoring_system:
current_relationship_score = self.interest_scoring_system.get_user_relationship(user_id)
self.tracking_users[user_id] = {
"user_id": user_id,
"user_name": user_name,
"user_message": user_message,
"bot_reply": bot_reply,
"reply_timestamp": reply_timestamp,
"current_relationship_score": current_relationship_score,
}
logger.debug(f"添加用户交互追踪: {user_id}")
async def check_and_update_relationships(self) -> List[Dict]:
"""检查并更新用户关系"""
current_time = time.time()
if current_time - self.last_update_time < self.update_interval_minutes * 60:
return []
updates = []
for user_id, interaction in list(self.tracking_users.items()):
if current_time - interaction["reply_timestamp"] > 60 * 5: # 5分钟
update = await self._update_user_relationship(interaction)
if update:
updates.append(update)
del self.tracking_users[user_id]
self.last_update_time = current_time
return updates
async def _update_user_relationship(self, interaction: Dict) -> Optional[Dict]:
"""更新单个用户的关系"""
try:
# 获取bot人设信息
from src.individuality.individuality import Individuality
individuality = Individuality()
bot_personality = await individuality.get_personality_block()
prompt = f"""
你现在是一个有着特定性格和身份的AI助手。你的人设是{bot_personality}
请以你独特的性格视角,严格按现实逻辑分析以下用户交互,更新用户关系:
用户ID: {interaction["user_id"]}
用户名: {interaction["user_name"]}
用户消息: {interaction["user_message"]}
你的回复: {interaction["bot_reply"]}
当前关系分: {interaction["current_relationship_score"]}
【重要】关系分数档次定义:
- 0.0-0.2:陌生人/初次认识 - 仅礼貌性交流
- 0.2-0.4:普通网友 - 有基本互动但不熟悉
- 0.4-0.6:熟悉网友 - 经常交流,有一定了解
- 0.6-0.8:朋友 - 可以分享心情,互相关心
- 0.8-1.0:好朋友/知己 - 深度信任,亲密无间
【严格要求】:
1. 加分必须符合现实关系发展逻辑 - 不能因为对方态度好就盲目加分到不符合当前关系档次的分数
2. 关系提升需要足够的互动积累和时间验证
3. 即使是朋友关系单次互动加分通常不超过0.05-0.1
4. 关系描述要详细具体,包括:
- 用户性格特点观察
- 印象深刻的互动记忆
- 你们关系的具体状态描述
根据你的人设性格,思考:
1. 以你的性格,你会如何看待这次互动?
2. 用户的行为是否符合你性格的喜好?
3. 这次互动是否真的让你们的关系提升了一个档次?为什么?
4. 有什么特别值得记住的互动细节?
请以JSON格式返回更新结果
{{
"new_relationship_score": 0.0~1.0的数值(必须符合现实逻辑),
"reasoning": "从你的性格角度说明更新理由,重点说明是否符合现实关系发展逻辑",
"interaction_summary": "基于你性格的交互总结,包含印象深刻的互动记忆"
}}
"""
llm_response, _ = await self.relationship_llm.generate_response_async(prompt=prompt)
if llm_response:
import json
try:
# 清理LLM响应移除可能的格式标记
cleaned_response = self._clean_llm_json_response(llm_response)
response_data = json.loads(cleaned_response)
new_score = max(
0.0,
min(
1.0,
float(
response_data.get(
"new_relationship_score", global_config.affinity_flow.base_relationship_score
)
),
),
)
if self.interest_scoring_system:
self.interest_scoring_system.update_user_relationship(
interaction["user_id"], new_score - interaction["current_relationship_score"]
)
return {
"user_id": interaction["user_id"],
"new_relationship_score": new_score,
"reasoning": response_data.get("reasoning", ""),
"interaction_summary": response_data.get("interaction_summary", ""),
}
except json.JSONDecodeError as e:
logger.error(f"LLM响应JSON解析失败: {e}")
logger.debug(f"LLM原始响应: {llm_response}")
except Exception as e:
logger.error(f"处理关系更新数据失败: {e}")
except Exception as e:
logger.error(f"更新用户关系时出错: {e}")
return None
def get_tracking_users(self) -> Dict[str, Dict]:
"""获取正在追踪的用户"""
return self.tracking_users.copy()
def get_user_interaction(self, user_id: str) -> Optional[Dict]:
"""获取特定用户的交互记录"""
return self.tracking_users.get(user_id)
def remove_user_tracking(self, user_id: str):
"""移除用户追踪"""
if user_id in self.tracking_users:
del self.tracking_users[user_id]
logger.debug(f"移除用户追踪: {user_id}")
def clear_all_tracking(self):
"""清空所有追踪"""
self.tracking_users.clear()
logger.info("清空所有用户追踪")
def get_relationship_history(self) -> List[Dict]:
"""获取关系历史记录"""
return self.relationship_history.copy()
def add_to_history(self, relationship_update: Dict):
"""添加到关系历史"""
self.relationship_history.append({**relationship_update, "update_time": time.time()})
# 限制历史记录数量
if len(self.relationship_history) > 100:
self.relationship_history = self.relationship_history[-100:]
def get_tracker_stats(self) -> Dict:
"""获取追踪器统计"""
return {
"tracking_users": len(self.tracking_users),
"max_tracking_users": self.max_tracking_users,
"update_interval_minutes": self.update_interval_minutes,
"relationship_history": len(self.relationship_history),
"last_update_time": self.last_update_time,
}
def update_config(self, max_tracking_users: int = None, update_interval_minutes: int = None):
"""更新配置"""
if max_tracking_users is not None:
self.max_tracking_users = max_tracking_users
logger.info(f"更新最大追踪用户数: {max_tracking_users}")
if update_interval_minutes is not None:
self.update_interval_minutes = update_interval_minutes
logger.info(f"更新关系更新间隔: {update_interval_minutes} 分钟")
def force_update_relationship(self, user_id: str, new_score: float, reasoning: str = ""):
"""强制更新用户关系分"""
if user_id in self.tracking_users:
current_score = self.tracking_users[user_id]["current_relationship_score"]
if self.interest_scoring_system:
self.interest_scoring_system.update_user_relationship(user_id, new_score - current_score)
update_info = {
"user_id": user_id,
"new_relationship_score": new_score,
"reasoning": reasoning or "手动更新",
"interaction_summary": "手动更新关系分",
}
self.add_to_history(update_info)
logger.info(f"强制更新用户关系: {user_id} -> {new_score:.2f}")
def get_user_summary(self, user_id: str) -> Dict:
"""获取用户交互总结"""
if user_id not in self.tracking_users:
return {}
interaction = self.tracking_users[user_id]
return {
"user_id": user_id,
"user_name": interaction["user_name"],
"current_relationship_score": interaction["current_relationship_score"],
"interaction_count": 1, # 简化版本,每次追踪只记录一次交互
"last_interaction": interaction["reply_timestamp"],
"recent_message": interaction["user_message"][:100] + "..."
if len(interaction["user_message"]) > 100
else interaction["user_message"],
}
# ===== 数据库支持方法 =====
def get_user_relationship_score(self, user_id: str) -> float:
"""获取用户关系分"""
# 先检查缓存
if user_id in self.user_relationship_cache:
cache_data = self.user_relationship_cache[user_id]
# 检查缓存是否过期
cache_time = cache_data.get("last_tracked", 0)
if time.time() - cache_time < self.cache_expiry_hours * 3600:
return cache_data.get("relationship_score", global_config.affinity_flow.base_relationship_score)
# 缓存过期或不存在,从数据库获取
relationship_data = self._get_user_relationship_from_db(user_id)
if relationship_data:
# 更新缓存
self.user_relationship_cache[user_id] = {
"relationship_text": relationship_data.get("relationship_text", ""),
"relationship_score": relationship_data.get(
"relationship_score", global_config.affinity_flow.base_relationship_score
),
"last_tracked": time.time(),
}
return relationship_data.get("relationship_score", global_config.affinity_flow.base_relationship_score)
# 数据库中也没有,返回默认值
return global_config.affinity_flow.base_relationship_score
def _get_user_relationship_from_db(self, user_id: str) -> Optional[Dict]:
"""从数据库获取用户关系数据"""
try:
with get_db_session() as session:
# 查询用户关系表
stmt = select(UserRelationships).where(UserRelationships.user_id == user_id)
result = session.execute(stmt).scalar_one_or_none()
if result:
return {
"relationship_text": result.relationship_text or "",
"relationship_score": float(result.relationship_score)
if result.relationship_score is not None
else 0.3,
"last_updated": result.last_updated,
}
except Exception as e:
logger.error(f"从数据库获取用户关系失败: {e}")
return None
def _update_user_relationship_in_db(self, user_id: str, relationship_text: str, relationship_score: float):
"""更新数据库中的用户关系"""
try:
current_time = time.time()
with get_db_session() as session:
# 检查是否已存在关系记录
existing = session.execute(
select(UserRelationships).where(UserRelationships.user_id == user_id)
).scalar_one_or_none()
if existing:
# 更新现有记录
existing.relationship_text = relationship_text
existing.relationship_score = relationship_score
existing.last_updated = current_time
existing.user_name = existing.user_name or user_id # 更新用户名如果为空
else:
# 插入新记录
new_relationship = UserRelationships(
user_id=user_id,
user_name=user_id,
relationship_text=relationship_text,
relationship_score=relationship_score,
last_updated=current_time,
)
session.add(new_relationship)
session.commit()
logger.info(f"已更新数据库中用户关系: {user_id} -> 分数: {relationship_score:.3f}")
except Exception as e:
logger.error(f"更新数据库用户关系失败: {e}")
# ===== 回复后关系追踪方法 =====
async def track_reply_relationship(
self, user_id: str, user_name: str, bot_reply_content: str, reply_timestamp: float
):
"""回复后关系追踪 - 主要入口点"""
try:
logger.info(f"🔄 开始回复后关系追踪: {user_id}")
# 检查上次追踪时间
last_tracked_time = self._get_last_tracked_time(user_id)
time_diff = reply_timestamp - last_tracked_time
if time_diff < 5 * 60: # 5分钟内不重复追踪
logger.debug(f"用户 {user_id} 距离上次追踪时间不足5分钟跳过")
return
# 获取上次bot回复该用户的消息
last_bot_reply = await self._get_last_bot_reply_to_user(user_id)
if not last_bot_reply:
logger.debug(f"未找到上次回复用户 {user_id} 的记录")
return
# 获取用户后续的反应消息
user_reactions = await self._get_user_reactions_after_reply(user_id, last_bot_reply.time)
# 获取当前关系数据
current_relationship = self._get_user_relationship_from_db(user_id)
current_score = (
current_relationship.get("relationship_score", global_config.affinity_flow.base_relationship_score)
if current_relationship
else global_config.affinity_flow.base_relationship_score
)
current_text = current_relationship.get("relationship_text", "新用户") if current_relationship else "新用户"
# 使用LLM分析并更新关系
await self._analyze_and_update_relationship(
user_id, user_name, last_bot_reply, user_reactions, current_text, current_score, bot_reply_content
)
except Exception as e:
logger.error(f"回复后关系追踪失败: {e}")
logger.debug("错误详情:", exc_info=True)
def _get_last_tracked_time(self, user_id: str) -> float:
"""获取上次追踪时间"""
# 先检查缓存
if user_id in self.user_relationship_cache:
return self.user_relationship_cache[user_id].get("last_tracked", 0)
# 从数据库获取
relationship_data = self._get_user_relationship_from_db(user_id)
if relationship_data:
return relationship_data.get("last_updated", 0)
return 0
async def _get_last_bot_reply_to_user(self, user_id: str) -> Optional[DatabaseMessages]:
"""获取上次bot回复该用户的消息"""
try:
with get_db_session() as session:
# 查询bot回复给该用户的最新消息
stmt = (
select(Messages)
.where(Messages.user_id == user_id)
.where(Messages.reply_to.isnot(None))
.order_by(desc(Messages.time))
.limit(1)
)
result = session.execute(stmt).scalar_one_or_none()
if result:
# 将SQLAlchemy模型转换为DatabaseMessages对象
return self._sqlalchemy_to_database_messages(result)
except Exception as e:
logger.error(f"获取上次回复消息失败: {e}")
return None
async def _get_user_reactions_after_reply(self, user_id: str, reply_time: float) -> List[DatabaseMessages]:
"""获取用户在bot回复后的反应消息"""
try:
with get_db_session() as session:
# 查询用户在回复时间之后的5分钟内的消息
end_time = reply_time + 5 * 60 # 5分钟
stmt = (
select(Messages)
.where(Messages.user_id == user_id)
.where(Messages.time > reply_time)
.where(Messages.time <= end_time)
.order_by(Messages.time)
)
results = session.execute(stmt).scalars().all()
if results:
return [self._sqlalchemy_to_database_messages(result) for result in results]
except Exception as e:
logger.error(f"获取用户反应消息失败: {e}")
return []
def _sqlalchemy_to_database_messages(self, sqlalchemy_message) -> DatabaseMessages:
"""将SQLAlchemy消息模型转换为DatabaseMessages对象"""
try:
return DatabaseMessages(
message_id=sqlalchemy_message.message_id or "",
time=float(sqlalchemy_message.time) if sqlalchemy_message.time is not None else 0.0,
chat_id=sqlalchemy_message.chat_id or "",
reply_to=sqlalchemy_message.reply_to,
processed_plain_text=sqlalchemy_message.processed_plain_text or "",
user_id=sqlalchemy_message.user_id or "",
user_nickname=sqlalchemy_message.user_nickname or "",
user_platform=sqlalchemy_message.user_platform or "",
)
except Exception as e:
logger.error(f"SQLAlchemy消息转换失败: {e}")
# 返回一个基本的消息对象
return DatabaseMessages(
message_id="",
time=0.0,
chat_id="",
processed_plain_text="",
user_id="",
user_nickname="",
user_platform="",
)
async def _analyze_and_update_relationship(
self,
user_id: str,
user_name: str,
last_bot_reply: DatabaseMessages,
user_reactions: List[DatabaseMessages],
current_text: str,
current_score: float,
current_reply: str,
):
"""使用LLM分析并更新用户关系"""
try:
# 构建分析提示
user_reactions_text = "\n".join([f"- {msg.processed_plain_text}" for msg in user_reactions])
# 获取bot人设信息
from src.individuality.individuality import Individuality
individuality = Individuality()
bot_personality = await individuality.get_personality_block()
prompt = f"""
你现在是一个有着特定性格和身份的AI助手。你的人设是{bot_personality}
请以你独特的性格视角,严格按现实逻辑分析以下用户交互,更新用户关系印象和分数:
用户信息:
- 用户ID: {user_id}
- 用户名: {user_name}
你上次的回复: {last_bot_reply.processed_plain_text}
用户反应消息:
{user_reactions_text}
你当前的回复: {current_reply}
当前关系印象: {current_text}
当前关系分数: {current_score:.3f}
【重要】关系分数档次定义:
- 0.0-0.2:陌生人/初次认识 - 仅礼貌性交流
- 0.2-0.4:普通网友 - 有基本互动但不熟悉
- 0.4-0.6:熟悉网友 - 经常交流,有一定了解
- 0.6-0.8:朋友 - 可以分享心情,互相关心
- 0.8-1.0:好朋友/知己 - 深度信任,亲密无间
【严格要求】:
1. 加分必须符合现实关系发展逻辑 - 不能因为用户反应好就盲目加分
2. 关系提升需要足够的互动积累和时间验证单次互动加分通常不超过0.05-0.1
3. 必须考虑当前关系档次不能跳跃式提升比如从0.3直接到0.7
4. 关系印象描述要详细具体100-200字包括
- 用户性格特点和交流风格观察
- 印象深刻的互动记忆和对话片段
- 你们关系的具体状态描述和发展阶段
- 根据你的性格,你对用户的真实感受
性格视角深度分析:
1. 以你的性格特点,用户这次的反应给你什么感受?
2. 用户的情绪和行为符合你性格的喜好吗?具体哪些方面?
3. 从现实角度看,这次互动是否足以让关系提升到下一个档次?为什么?
4. 有什么特别值得记住的互动细节或对话内容?
5. 基于你们的互动历史,用户给你留下了哪些深刻印象?
请以JSON格式返回更新结果:
{{
"relationship_text": "详细的关系印象描述(100-200字),包含用户性格观察、印象深刻记忆、关系状态描述",
"relationship_score": 0.0~1.0的新分数(必须严格符合现实逻辑),
"analysis_reasoning": "从你性格角度的深度分析,重点说明分数调整的现实合理性",
"interaction_quality": "high/medium/low"
}}
"""
# 调用LLM进行分析
llm_response, _ = await self.relationship_llm.generate_response_async(prompt=prompt)
if llm_response:
import json
try:
# 清理LLM响应移除可能的格式标记
cleaned_response = self._clean_llm_json_response(llm_response)
response_data = json.loads(cleaned_response)
new_text = response_data.get("relationship_text", current_text)
new_score = max(0.0, min(1.0, float(response_data.get("relationship_score", current_score))))
reasoning = response_data.get("analysis_reasoning", "")
quality = response_data.get("interaction_quality", "medium")
# 更新数据库
self._update_user_relationship_in_db(user_id, new_text, new_score)
# 更新缓存
self.user_relationship_cache[user_id] = {
"relationship_text": new_text,
"relationship_score": new_score,
"last_tracked": time.time(),
}
# 如果有兴趣度评分系统,也更新内存中的关系分
if self.interest_scoring_system:
self.interest_scoring_system.update_user_relationship(user_id, new_score - current_score)
# 记录分析历史
analysis_record = {
"user_id": user_id,
"timestamp": time.time(),
"old_score": current_score,
"new_score": new_score,
"old_text": current_text,
"new_text": new_text,
"reasoning": reasoning,
"quality": quality,
"user_reactions_count": len(user_reactions),
}
self.relationship_history.append(analysis_record)
# 限制历史记录数量
if len(self.relationship_history) > 100:
self.relationship_history = self.relationship_history[-100:]
logger.info(f"✅ 关系分析完成: {user_id}")
logger.info(f" 📝 印象: '{current_text}' -> '{new_text}'")
logger.info(f" 💝 分数: {current_score:.3f} -> {new_score:.3f}")
logger.info(f" 🎯 质量: {quality}")
except json.JSONDecodeError as e:
logger.error(f"LLM响应JSON解析失败: {e}")
logger.debug(f"LLM原始响应: {llm_response}")
else:
logger.warning("LLM未返回有效响应")
except Exception as e:
logger.error(f"关系分析失败: {e}")
logger.debug("错误详情:", exc_info=True)
def _clean_llm_json_response(self, response: str) -> str:
"""
清理LLM响应移除可能的JSON格式标记
Args:
response: LLM原始响应
Returns:
清理后的JSON字符串
"""
try:
import re
# 移除常见的JSON格式标记
cleaned = response.strip()
# 移除 ```json 或 ``` 等标记
cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned, flags=re.MULTILINE | re.IGNORECASE)
cleaned = re.sub(r"\s*```$", "", cleaned, flags=re.MULTILINE)
# 移除可能的Markdown代码块标记
cleaned = re.sub(r"^`|`$", "", cleaned, flags=re.MULTILINE)
# 尝试找到JSON对象的开始和结束
json_start = cleaned.find("{")
json_end = cleaned.rfind("}")
if json_start != -1 and json_end != -1 and json_end > json_start:
# 提取JSON部分
cleaned = cleaned[json_start : json_end + 1]
# 移除多余的空白字符
cleaned = cleaned.strip()
logger.debug(f"LLM响应清理: 原始长度={len(response)}, 清理后长度={len(cleaned)}")
if cleaned != response:
logger.debug(f"清理前: {response[:200]}...")
logger.debug(f"清理后: {cleaned[:200]}...")
return cleaned
except Exception as e:
logger.warning(f"清理LLM响应失败: {e}")
return response # 清理失败时返回原始响应

View File

@@ -2,15 +2,15 @@ from typing import Dict, List, Optional, Any
import time
from src.plugin_system.base.base_chatter import BaseChatter
from src.common.data_models.message_manager_data_model import StreamContext
from src.chat.planner_actions.planner import ActionPlanner
from src.chat.planner_actions.action_manager import ActionManager
from src.plugins.built_in.chatter.planner import ChatterActionPlanner as ActionPlanner
from src.chat.planner_actions.action_manager import ChatterActionManager
from src.plugin_system.base.component_types import ChatType, ComponentType
from src.common.logger import get_logger
logger = get_logger("chatter_manager")
class ChatterManager:
def __init__(self, action_manager: ActionManager):
def __init__(self, action_manager: ChatterActionManager):
self.action_manager = action_manager
self.chatter_classes: Dict[ChatType, List[type]] = {}
self.instances: Dict[str, BaseChatter] = {}

View File

@@ -21,7 +21,7 @@ from datetime import datetime
from typing import Dict, Optional
from src.common.logger import get_logger
from src.chat.affinity_flow.afc_manager import afc_manager
# AFC manager has been moved to chatter plugin
# TODO: 需要重新实现主动思考和睡眠管理功能
from .analyzer import chat_frequency_analyzer
@@ -61,8 +61,9 @@ class FrequencyBasedTrigger:
# continue
# 2. 获取所有已知的聊天ID
# 亲和力流系统中聊天ID直接从管理器获取
all_chat_ids = list(afc_manager.affinity_flow_chatters.keys())
# 注意AFC管理器已移至chatter插件此功能暂时禁用
# all_chat_ids = list(afc_manager.affinity_flow_chatters.keys())
all_chat_ids = [] # 暂时禁用此功能
if not all_chat_ids:
continue
@@ -77,26 +78,10 @@ class FrequencyBasedTrigger:
# 4. 检查当前是否是该用户的高峰聊天时间
if chat_frequency_analyzer.is_in_peak_time(chat_id, now):
# 5. 检查用户当前是否已有活跃的处理任务
# 亲和力流系统不直接提供循环状态,通过检查最后活动时间来判断是否忙碌
chatter = afc_manager.get_or_create_chatter(chat_id)
if not chatter:
logger.warning(f"无法为 {chat_id} 获取或创建亲和力聊天处理器。")
continue
# 检查是否在活跃状态最近1分钟内有活动
current_time = time.time()
if current_time - chatter.get_activity_time() < 60:
logger.debug(f"用户 {chat_id} 的亲和力处理器正忙,本次不触发。")
continue
logger.info(f"检测到用户 {chat_id} 处于聊天高峰期,且处理器空闲,准备触发主动思考。")
# 6. TODO: 亲和力流系统的主动思考机制需要另行实现
# 目前先记录日志,等待后续实现
logger.info(f"用户 {chat_id} 处于高峰期,但亲和力流的主动思考功能暂未实现")
# 7. 更新触发时间,进入冷却
self._last_triggered[chat_id] = time.time()
# 注意AFC管理器已移至chatter插件此功能暂时禁用
# chatter = afc_manager.get_or_create_chatter(chat_id)
logger.info(f"检测到用户 {chat_id} 处于聊天高峰期但AFC功能已移至chatter插件")
continue
except asyncio.CancelledError:
logger.info("频率触发器任务被取消。")

View File

@@ -12,7 +12,7 @@ from src.common.logger import get_logger
from src.common.data_models.database_data_model import DatabaseMessages
from src.common.data_models.message_manager_data_model import StreamContext, MessageManagerStats, StreamStats
from src.chat.chatter_manager import ChatterManager
from src.chat.planner_actions.action_manager import ActionManager
from src.chat.planner_actions.action_manager import ChatterActionManager
if TYPE_CHECKING:
from src.common.data_models.message_manager_data_model import StreamContext
@@ -33,7 +33,7 @@ class MessageManager:
self.stats = MessageManagerStats()
# 初始化chatter manager
self.action_manager = ActionManager()
self.action_manager = ChatterActionManager()
self.chatter_manager = ChatterManager(self.action_manager)
async def start(self):

View File

@@ -18,7 +18,7 @@ from src.plugin_system.apis import generator_api, database_api, send_api, messag
logger = get_logger("action_manager")
class ActionManager:
class ChatterActionManager:
"""
动作管理器,用于管理各种类型的动作
@@ -34,7 +34,7 @@ class ActionManager:
# 初始化时将默认动作加载到使用中的动作
self._using_actions = component_registry.get_default_actions()
self.log_prefix: str = "ActionManager"
self.log_prefix: str = "ChatterActionManager"
# === 执行Action方法 ===
@@ -449,7 +449,7 @@ class ActionManager:
data = "".join(map(str, data))
reply_text += data
# 如果是主动思考且内容为沉默,则不发送
# 如果是主动思考且内容为"沉默",则不发送
if is_proactive_thinking and data.strip() == "沉默":
logger.info(f"{self.log_prefix} 主动思考决定保持沉默,不发送消息")
continue
@@ -474,4 +474,4 @@ class ActionManager:
typing=True,
)
return reply_text
return reply_text

View File

@@ -8,7 +8,7 @@ from src.common.logger import get_logger
from src.config.config import global_config, model_config
from src.llm_models.utils_model import LLMRequest
from src.chat.message_receive.chat_stream import get_chat_manager, ChatMessageContext
from src.chat.planner_actions.action_manager import ActionManager
from src.chat.planner_actions.action_manager import ChatterActionManager
from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat, build_readable_messages
from src.plugin_system.base.component_types import ActionInfo, ActionActivationType
from src.plugin_system.core.global_announcement_manager import global_announcement_manager
@@ -27,7 +27,7 @@ class ActionModifier:
支持并行判定和智能缓存优化。
"""
def __init__(self, action_manager: ActionManager, chat_id: str):
def __init__(self, action_manager: ChatterActionManager, chat_id: str):
"""初始化动作处理器"""
self.chat_id = chat_id
self.chat_stream: ChatStream = get_chat_manager().get_stream(self.chat_id) # type: ignore

View File

@@ -1,363 +0,0 @@
"""
PlanExecutor: 接收 Plan 对象并执行其中的所有动作。
集成用户关系追踪机制,自动记录交互并更新关系。
"""
import asyncio
import re
import time
from typing import Dict, List
from src.config.config import global_config
from src.chat.planner_actions.action_manager import ActionManager
from src.common.data_models.info_data_model import Plan, ActionPlannerInfo
from src.common.logger import get_logger
logger = get_logger("plan_executor")
class PlanExecutor:
"""
增强版PlanExecutor集成用户关系追踪机制。
功能:
1. 执行Plan中的所有动作
2. 自动记录用户交互并添加到关系追踪
3. 分类执行回复动作和其他动作
4. 提供完整的执行统计和监控
"""
def __init__(self, action_manager: ActionManager):
"""
初始化增强版PlanExecutor。
Args:
action_manager (ActionManager): 用于实际执行各种动作的管理器实例。
"""
self.action_manager = action_manager
# 执行统计
self.execution_stats = {
"total_executed": 0,
"successful_executions": 0,
"failed_executions": 0,
"reply_executions": 0,
"other_action_executions": 0,
"execution_times": [],
}
# 用户关系追踪引用
self.relationship_tracker = None
def set_relationship_tracker(self, relationship_tracker):
"""设置关系追踪器"""
self.relationship_tracker = relationship_tracker
async def execute(self, plan: Plan) -> Dict[str, any]:
"""
遍历并执行Plan对象中`decided_actions`列表里的所有动作。
Args:
plan (Plan): 包含待执行动作列表的Plan对象。
Returns:
Dict[str, any]: 执行结果统计信息
"""
if not plan.decided_actions:
logger.info("没有需要执行的动作。")
return {"executed_count": 0, "results": []}
execution_results = []
reply_actions = []
other_actions = []
# 分类动作:回复动作和其他动作
for action_info in plan.decided_actions:
if action_info.action_type in ["reply", "proactive_reply"]:
reply_actions.append(action_info)
else:
other_actions.append(action_info)
# 执行回复动作(优先执行)
if reply_actions:
reply_result = await self._execute_reply_actions(reply_actions, plan)
execution_results.extend(reply_result["results"])
self.execution_stats["reply_executions"] += len(reply_actions)
# 将其他动作放入后台任务执行,避免阻塞主流程
if other_actions:
asyncio.create_task(self._execute_other_actions(other_actions, plan))
logger.info(f"已将 {len(other_actions)} 个其他动作放入后台任务执行。")
# 注意:后台任务的结果不会立即计入本次返回的统计数据
# 更新总体统计
self.execution_stats["total_executed"] += len(plan.decided_actions)
successful_count = sum(1 for r in execution_results if r["success"])
self.execution_stats["successful_executions"] += successful_count
self.execution_stats["failed_executions"] += len(execution_results) - successful_count
logger.info(
f"规划执行完成: 总数={len(plan.decided_actions)}, 成功={successful_count}, 失败={len(execution_results) - successful_count}"
)
return {
"executed_count": len(plan.decided_actions),
"successful_count": successful_count,
"failed_count": len(execution_results) - successful_count,
"results": execution_results,
}
async def _execute_reply_actions(self, reply_actions: List[ActionPlannerInfo], plan: Plan) -> Dict[str, any]:
"""执行回复动作"""
results = []
for action_info in reply_actions:
result = await self._execute_single_reply_action(action_info, plan)
results.append(result)
return {"results": results}
async def _execute_single_reply_action(self, action_info: ActionPlannerInfo, plan: Plan) -> Dict[str, any]:
"""执行单个回复动作"""
start_time = time.time()
success = False
error_message = ""
reply_content = ""
try:
logger.info(f"执行回复动作: {action_info.action_type} (原因: {action_info.reasoning})")
# 获取用户ID - 兼容对象和字典
if hasattr(action_info.action_message, "user_info"):
user_id = action_info.action_message.user_info.user_id
else:
user_id = action_info.action_message.get("user_info", {}).get("user_id")
if user_id == str(global_config.bot.qq_account):
logger.warning("尝试回复自己,跳过此动作以防止死循环。")
return {
"action_type": action_info.action_type,
"success": False,
"error_message": "尝试回复自己,跳过此动作以防止死循环。",
"execution_time": 0,
"reasoning": action_info.reasoning,
"reply_content": "",
}
# 构建回复动作参数
action_params = {
"chat_id": plan.chat_id,
"target_message": action_info.action_message,
"reasoning": action_info.reasoning,
"action_data": action_info.action_data or {},
}
# 通过动作管理器执行回复
reply_content = await self.action_manager.execute_action(
action_name=action_info.action_type, **action_params
)
success = True
logger.info(f"回复动作 '{action_info.action_type}' 执行成功。")
except Exception as e:
error_message = str(e)
logger.error(f"执行回复动作失败: {action_info.action_type}, 错误: {error_message}")
# 记录用户关系追踪
if success and action_info.action_message:
await self._track_user_interaction(action_info, plan, reply_content)
execution_time = time.time() - start_time
self.execution_stats["execution_times"].append(execution_time)
return {
"action_type": action_info.action_type,
"success": success,
"error_message": error_message,
"execution_time": execution_time,
"reasoning": action_info.reasoning,
"reply_content": reply_content[:200] + "..." if len(reply_content) > 200 else reply_content,
}
async def _execute_other_actions(self, other_actions: List[ActionPlannerInfo], plan: Plan) -> Dict[str, any]:
"""执行其他动作"""
results = []
# 并行执行其他动作
tasks = []
for action_info in other_actions:
task = self._execute_single_other_action(action_info, plan)
tasks.append(task)
if tasks:
executed_results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(executed_results):
if isinstance(result, Exception):
logger.error(f"执行动作 {other_actions[i].action_type} 时发生异常: {result}")
results.append(
{
"action_type": other_actions[i].action_type,
"success": False,
"error_message": str(result),
"execution_time": 0,
"reasoning": other_actions[i].reasoning,
}
)
else:
results.append(result)
return {"results": results}
async def _execute_single_other_action(self, action_info: ActionPlannerInfo, plan: Plan) -> Dict[str, any]:
"""执行单个其他动作"""
start_time = time.time()
success = False
error_message = ""
try:
logger.info(f"执行其他动作: {action_info.action_type} (原因: {action_info.reasoning})")
action_data = action_info.action_data or {}
# 针对 poke_user 动作,特殊处理
if action_info.action_type == "poke_user":
target_message = action_info.action_message
if target_message:
# 优先直接获取 user_id这才是最可靠的信息
user_id = target_message.get("user_id")
if user_id:
action_data["user_id"] = user_id
logger.info(f"检测到戳一戳动作目标用户ID: {user_id}")
else:
# 如果没有 user_id再尝试用 user_nickname 作为备用方案
user_name = target_message.get("user_nickname")
if user_name:
action_data["user_name"] = user_name
logger.info(f"检测到戳一戳动作,目标用户: {user_name}")
else:
logger.warning("无法从戳一戳消息中获取用户ID或昵称。")
# 传递原始消息ID以支持引用
action_data["target_message_id"] = target_message.get("message_id")
# 构建动作参数
action_params = {
"chat_id": plan.chat_id,
"target_message": action_info.action_message,
"reasoning": action_info.reasoning,
"action_data": action_data,
}
# 通过动作管理器执行动作
await self.action_manager.execute_action(action_name=action_info.action_type, **action_params)
success = True
logger.info(f"其他动作 '{action_info.action_type}' 执行成功。")
except Exception as e:
error_message = str(e)
logger.error(f"执行其他动作失败: {action_info.action_type}, 错误: {error_message}")
execution_time = time.time() - start_time
self.execution_stats["execution_times"].append(execution_time)
return {
"action_type": action_info.action_type,
"success": success,
"error_message": error_message,
"execution_time": execution_time,
"reasoning": action_info.reasoning,
}
async def _track_user_interaction(self, action_info: ActionPlannerInfo, plan: Plan, reply_content: str):
"""追踪用户交互 - 集成回复后关系追踪"""
try:
if not action_info.action_message:
return
# 获取用户信息 - 处理对象和字典两种情况
if hasattr(action_info.action_message, "user_info"):
# 对象情况
user_info = action_info.action_message.user_info
user_id = user_info.user_id
user_name = user_info.user_nickname or user_id
user_message = action_info.action_message.content
else:
# 字典情况
user_info = action_info.action_message.get("user_info", {})
user_id = user_info.get("user_id")
user_name = user_info.get("user_nickname") or user_id
user_message = action_info.action_message.get("content", "")
if not user_id:
logger.debug("跳过追踪缺少用户ID")
return
# 如果有设置关系追踪器,执行回复后关系追踪
if self.relationship_tracker:
# 记录基础交互信息(保持向后兼容)
self.relationship_tracker.add_interaction(
user_id=user_id,
user_name=user_name,
user_message=user_message,
bot_reply=reply_content,
reply_timestamp=time.time(),
)
# 执行新的回复后关系追踪
await self.relationship_tracker.track_reply_relationship(
user_id=user_id, user_name=user_name, bot_reply_content=reply_content, reply_timestamp=time.time()
)
logger.debug(f"已执行用户交互追踪: {user_id}")
except Exception as e:
logger.error(f"追踪用户交互时出错: {e}")
logger.debug(f"action_message类型: {type(action_info.action_message)}")
logger.debug(f"action_message内容: {action_info.action_message}")
def get_execution_stats(self) -> Dict[str, any]:
"""获取执行统计信息"""
stats = self.execution_stats.copy()
# 计算平均执行时间
if stats["execution_times"]:
avg_time = sum(stats["execution_times"]) / len(stats["execution_times"])
stats["average_execution_time"] = avg_time
stats["max_execution_time"] = max(stats["execution_times"])
stats["min_execution_time"] = min(stats["execution_times"])
else:
stats["average_execution_time"] = 0
stats["max_execution_time"] = 0
stats["min_execution_time"] = 0
# 移除执行时间列表以避免返回过大数据
stats.pop("execution_times", None)
return stats
def reset_stats(self):
"""重置统计信息"""
self.execution_stats = {
"total_executed": 0,
"successful_executions": 0,
"failed_executions": 0,
"reply_executions": 0,
"other_action_executions": 0,
"execution_times": [],
}
def get_recent_performance(self, limit: int = 10) -> List[Dict[str, any]]:
"""获取最近的执行性能"""
recent_times = self.execution_stats["execution_times"][-limit:]
if not recent_times:
return []
return [
{
"execution_index": i + 1,
"execution_time": time_val,
"timestamp": time.time() - (len(recent_times) - i) * 60, # 估算时间戳
}
for i, time_val in enumerate(recent_times)
]

View File

@@ -1,522 +0,0 @@
"""
PlanFilter: 接收 Plan 对象,根据不同模式的逻辑进行筛选,决定最终要执行的动作。
"""
import orjson
import time
import traceback
from datetime import datetime
from typing import Any, Dict, List, Optional
from json_repair import repair_json
from src.chat.memory_system.Hippocampus import hippocampus_manager
from src.chat.utils.chat_message_builder import (
build_readable_actions,
build_readable_messages_with_id,
get_actions_by_timestamp_with_chat,
)
from src.chat.utils.prompt import global_prompt_manager
from src.common.data_models.info_data_model import ActionPlannerInfo, Plan
from src.common.logger import get_logger
from src.config.config import global_config, model_config
from src.llm_models.utils_model import LLMRequest
from src.mood.mood_manager import mood_manager
from src.plugin_system.base.component_types import ActionInfo, ChatMode
from src.schedule.schedule_manager import schedule_manager
logger = get_logger("plan_filter")
class PlanFilter:
"""
根据 Plan 中的模式和信息,筛选并决定最终的动作。
"""
def __init__(self):
self.planner_llm = LLMRequest(model_set=model_config.model_task_config.planner, request_type="planner")
self.last_obs_time_mark = 0.0
async def filter(self, reply_not_available: bool, plan: Plan) -> Plan:
"""
执行筛选逻辑,并填充 Plan 对象的 decided_actions 字段。
"""
logger.debug(f"墨墨在这里加了日志 -> filter 入口 plan: {plan}")
try:
prompt, used_message_id_list = await self._build_prompt(plan)
plan.llm_prompt = prompt
logger.info(f"规划器原始提示词: {prompt}")
llm_content, _ = await self.planner_llm.generate_response_async(prompt=prompt)
if llm_content:
logger.debug(f"墨墨在这里加了日志 -> LLM a原始返回: {llm_content}")
try:
parsed_json = orjson.loads(repair_json(llm_content))
except orjson.JSONDecodeError:
parsed_json = {"action": "no_action", "reason": "返回内容无法解析为JSON"}
logger.debug(f"墨墨在这里加了日志 -> 解析后的 JSON: {parsed_json}")
if "reply" in plan.available_actions and reply_not_available:
# 如果reply动作不可用但llm返回的仍然有reply则改为no_reply
if isinstance(parsed_json, dict) and parsed_json.get("action") == "reply":
parsed_json["action"] = "no_reply"
elif isinstance(parsed_json, list):
for item in parsed_json:
if isinstance(item, dict) and item.get("action") == "reply":
item["action"] = "no_reply"
item["reason"] += " (但由于兴趣度不足reply动作不可用已改为no_reply)"
if isinstance(parsed_json, dict):
parsed_json = [parsed_json]
if isinstance(parsed_json, list):
final_actions = []
reply_action_added = False
# 定义回复类动作的集合,方便扩展
reply_action_types = {"reply", "proactive_reply"}
for item in parsed_json:
if not isinstance(item, dict):
continue
# 预解析 action_type 来进行判断
action_type = item.get("action", "no_action")
if action_type in reply_action_types:
if not reply_action_added:
final_actions.extend(await self._parse_single_action(item, used_message_id_list, plan))
reply_action_added = True
else:
# 非回复类动作直接添加
final_actions.extend(await self._parse_single_action(item, used_message_id_list, plan))
plan.decided_actions = self._filter_no_actions(final_actions)
except Exception as e:
logger.error(f"筛选 Plan 时出错: {e}\n{traceback.format_exc()}")
plan.decided_actions = [ActionPlannerInfo(action_type="no_action", reasoning=f"筛选时出错: {e}")]
logger.debug(f"墨墨在这里加了日志 -> filter 出口 decided_actions: {plan.decided_actions}")
return plan
async def _build_prompt(self, plan: Plan) -> tuple[str, list]:
"""
根据 Plan 对象构建提示词。
"""
try:
time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
bot_name = global_config.bot.nickname
bot_nickname = (
f",也有人叫你{','.join(global_config.bot.alias_names)}" if global_config.bot.alias_names else ""
)
bot_core_personality = global_config.personality.personality_core
identity_block = f"你的名字是{bot_name}{bot_nickname},你{bot_core_personality}"
schedule_block = ""
if global_config.planning_system.schedule_enable:
if current_activity := schedule_manager.get_current_activity():
schedule_block = f"你当前正在:{current_activity},但注意它与群聊的聊天无关。"
mood_block = ""
if global_config.mood.enable_mood:
chat_mood = mood_manager.get_mood_by_chat_id(plan.chat_id)
mood_block = f"你现在的心情是:{chat_mood.mood_state}"
if plan.mode == ChatMode.PROACTIVE:
long_term_memory_block = await self._get_long_term_memory_context()
chat_content_block, message_id_list = build_readable_messages_with_id(
messages=[msg.flatten() for msg in plan.chat_history],
timestamp_mode="normal",
truncate=False,
show_actions=False,
)
prompt_template = await global_prompt_manager.get_prompt_async("proactive_planner_prompt")
actions_before_now = await get_actions_by_timestamp_with_chat(
chat_id=plan.chat_id,
timestamp_start=time.time() - 3600,
timestamp_end=time.time(),
limit=5,
)
actions_before_now_block = build_readable_actions(actions=actions_before_now)
actions_before_now_block = f"你刚刚选择并执行过的action是\n{actions_before_now_block}"
prompt = prompt_template.format(
time_block=time_block,
identity_block=identity_block,
schedule_block=schedule_block,
mood_block=mood_block,
long_term_memory_block=long_term_memory_block,
chat_content_block=chat_content_block or "最近没有聊天内容。",
actions_before_now_block=actions_before_now_block,
)
return prompt, message_id_list
# 构建已读/未读历史消息
read_history_block, unread_history_block, message_id_list = await self._build_read_unread_history_blocks(
plan
)
# 为了兼容性保留原有的chat_content_block
chat_content_block, _ = build_readable_messages_with_id(
messages=[msg.flatten() for msg in plan.chat_history],
timestamp_mode="normal",
read_mark=self.last_obs_time_mark,
truncate=True,
show_actions=True,
)
actions_before_now = await get_actions_by_timestamp_with_chat(
chat_id=plan.chat_id,
timestamp_start=time.time() - 3600,
timestamp_end=time.time(),
limit=5,
)
actions_before_now_block = build_readable_actions(actions=actions_before_now)
actions_before_now_block = f"你刚刚选择并执行过的action是\n{actions_before_now_block}"
self.last_obs_time_mark = time.time()
mentioned_bonus = ""
if global_config.chat.mentioned_bot_inevitable_reply:
mentioned_bonus = "\n- 有人提到你"
if global_config.chat.at_bot_inevitable_reply:
mentioned_bonus = "\n- 有人提到你或者at你"
if plan.mode == ChatMode.FOCUS:
no_action_block = """
动作no_action
动作描述:不选择任何动作
{{
"action": "no_action",
"reason":"不动作的原因"
}}
动作no_reply
动作描述:不进行回复,等待合适的回复时机
- 当你刚刚发送了消息没有人回复时选择no_reply
- 当你一次发送了太多消息为了避免打扰聊天节奏选择no_reply
{{
"action": "no_reply",
"reason":"不回复的原因"
}}
"""
else: # NORMAL Mode
no_action_block = """重要说明:
- 'reply' 表示只进行普通聊天回复,不执行任何额外动作
- 其他action表示在普通回复的基础上执行相应的额外动作
{{
"action": "reply",
"target_message_id":"触发action的消息id",
"reason":"回复的原因"
}}"""
is_group_chat = plan.target_info.platform == "group" if plan.target_info else True
chat_context_description = "你现在正在一个群聊中"
if not is_group_chat and plan.target_info:
chat_target_name = plan.target_info.person_name or plan.target_info.user_nickname or "对方"
chat_context_description = f"你正在和 {chat_target_name} 私聊"
action_options_block = await self._build_action_options(plan.available_actions)
moderation_prompt_block = "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。"
custom_prompt_block = ""
if global_config.custom_prompt.planner_custom_prompt_content:
custom_prompt_block = global_config.custom_prompt.planner_custom_prompt_content
users_in_chat_str = "" # TODO: Re-implement user list fetching if needed
planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt")
prompt = planner_prompt_template.format(
schedule_block=schedule_block,
mood_block=mood_block,
time_block=time_block,
chat_context_description=chat_context_description,
read_history_block=read_history_block,
unread_history_block=unread_history_block,
actions_before_now_block=actions_before_now_block,
mentioned_bonus=mentioned_bonus,
no_action_block=no_action_block,
action_options_text=action_options_block,
moderation_prompt=moderation_prompt_block,
identity_block=identity_block,
custom_prompt_block=custom_prompt_block,
bot_name=bot_name,
users_in_chat=users_in_chat_str,
)
return prompt, message_id_list
except Exception as e:
logger.error(f"构建 Planner 提示词时出错: {e}")
logger.error(traceback.format_exc())
return "构建 Planner Prompt 时出错", []
async def _build_read_unread_history_blocks(self, plan: Plan) -> tuple[str, str, list]:
"""构建已读/未读历史消息块"""
try:
# 从message_manager获取真实的已读/未读消息
from src.chat.message_manager.message_manager import message_manager
from src.chat.utils.utils import assign_message_ids
# 获取聊天流的上下文
stream_context = message_manager.stream_contexts.get(plan.chat_id)
# 获取真正的已读和未读消息
read_messages = stream_context.history_messages # 已读消息存储在history_messages中
unread_messages = stream_context.get_unread_messages() # 获取未读消息
# 构建已读历史消息块
if read_messages:
read_content, read_ids = build_readable_messages_with_id(
messages=[msg.flatten() for msg in read_messages[-50:]], # 限制数量
timestamp_mode="normal_no_YMD",
truncate=False,
show_actions=False,
)
read_history_block = f"{read_content}"
else:
read_history_block = "暂无已读历史消息"
# 构建未读历史消息块(包含兴趣度)
if unread_messages:
# 扁平化未读消息用于计算兴趣度和格式化
flattened_unread = [msg.flatten() for msg in unread_messages]
# 尝试获取兴趣度评分(返回以真实 message_id 为键的字典)
interest_scores = await self._get_interest_scores_for_messages(flattened_unread)
# 为未读消息分配短 id保持与 build_readable_messages_with_id 的一致结构)
message_id_list = assign_message_ids(flattened_unread)
unread_lines = []
for idx, msg in enumerate(flattened_unread):
mapped = message_id_list[idx]
synthetic_id = mapped.get("id")
original_msg_id = msg.get("message_id") or msg.get("id")
msg_time = time.strftime("%H:%M:%S", time.localtime(msg.get("time", time.time())))
msg_content = msg.get("processed_plain_text", "")
# 添加兴趣度信息
interest_score = interest_scores.get(original_msg_id, 0.0)
interest_text = f" [兴趣度: {interest_score:.3f}]" if interest_score > 0 else ""
# 在未读行中显示合成id方便 planner 返回时使用
unread_lines.append(f"{msg_time} {synthetic_id}: {msg_content}{interest_text}")
unread_history_block = "\n".join(unread_lines)
else:
unread_history_block = "暂无未读历史消息"
return read_history_block, unread_history_block, message_id_list
except Exception as e:
logger.error(f"构建已读/未读历史消息块时出错: {e}")
return "构建已读历史消息时出错", "构建未读历史消息时出错", []
async def _get_interest_scores_for_messages(self, messages: List[dict]) -> dict[str, float]:
"""为消息获取兴趣度评分"""
interest_scores = {}
try:
from src.chat.affinity_flow.interest_scoring import interest_scoring_system
from src.common.data_models.database_data_model import DatabaseMessages
# 转换消息格式
db_messages = []
for msg_dict in messages:
try:
db_msg = DatabaseMessages(
message_id=msg_dict.get("message_id", ""),
time=msg_dict.get("time", time.time()),
chat_id=msg_dict.get("chat_id", ""),
processed_plain_text=msg_dict.get("processed_plain_text", ""),
user_id=msg_dict.get("user_id", ""),
user_nickname=msg_dict.get("user_nickname", ""),
user_platform=msg_dict.get("platform", "qq"),
chat_info_group_id=msg_dict.get("group_id", ""),
chat_info_group_name=msg_dict.get("group_name", ""),
chat_info_group_platform=msg_dict.get("platform", "qq"),
)
db_messages.append(db_msg)
except Exception as e:
logger.warning(f"转换消息格式失败: {e}")
continue
# 计算兴趣度评分
if db_messages:
bot_nickname = global_config.bot.nickname or "麦麦"
scores = await interest_scoring_system.calculate_interest_scores(db_messages, bot_nickname)
# 构建兴趣度字典
for score in scores:
interest_scores[score.message_id] = score.total_score
except Exception as e:
logger.warning(f"获取兴趣度评分失败: {e}")
return interest_scores
async def _parse_single_action(
self, action_json: dict, message_id_list: list, plan: Plan
) -> List[ActionPlannerInfo]:
parsed_actions = []
try:
action = action_json.get("action", "no_action")
reasoning = action_json.get("reason", "未提供原因")
action_data = {k: v for k, v in action_json.items() if k not in ["action", "reason"]}
target_message_obj = None
if action not in ["no_action", "no_reply", "do_nothing", "proactive_reply"]:
if target_message_id := action_json.get("target_message_id"):
target_message_dict = self._find_message_by_id(target_message_id, message_id_list)
else:
# 如果LLM没有指定target_message_id我们就默认选择最新的一条消息
target_message_dict = self._get_latest_message(message_id_list)
if target_message_dict:
# 直接使用字典作为action_message避免DatabaseMessages对象创建失败
target_message_obj = target_message_dict
# 替换action_data中的临时ID为真实ID
if "target_message_id" in action_data:
real_message_id = target_message_dict.get("message_id") or target_message_dict.get("id")
if real_message_id:
action_data["target_message_id"] = real_message_id
else:
# 如果找不到目标消息对于reply动作来说这是必需的应该记录警告
if action == "reply":
logger.warning(
f"reply动作找不到目标消息target_message_id: {action_json.get('target_message_id')}"
)
# 将reply动作改为no_action避免后续执行时出错
action = "no_action"
reasoning = f"找不到目标消息进行回复。原始理由: {reasoning}"
available_action_names = list(plan.available_actions.keys())
if (
action not in ["no_action", "no_reply", "reply", "do_nothing", "proactive_reply"]
and action not in available_action_names
):
reasoning = f"LLM 返回了当前不可用的动作 '{action}'。原始理由: {reasoning}"
action = "no_action"
parsed_actions.append(
ActionPlannerInfo(
action_type=action,
reasoning=reasoning,
action_data=action_data,
action_message=target_message_obj,
available_actions=plan.available_actions,
)
)
except Exception as e:
logger.error(f"解析单个action时出错: {e}")
parsed_actions.append(
ActionPlannerInfo(
action_type="no_action",
reasoning=f"解析action时出错: {e}",
)
)
return parsed_actions
def _filter_no_actions(self, action_list: List[ActionPlannerInfo]) -> List[ActionPlannerInfo]:
non_no_actions = [a for a in action_list if a.action_type not in ["no_action", "no_reply"]]
if non_no_actions:
return non_no_actions
return action_list[:1] if action_list else []
@staticmethod
async def _get_long_term_memory_context() -> str:
try:
now = datetime.now()
keywords = ["今天", "日程", "计划"]
if 5 <= now.hour < 12:
keywords.append("早上")
elif 12 <= now.hour < 18:
keywords.append("中午")
else:
keywords.append("晚上")
retrieved_memories = await hippocampus_manager.get_memory_from_topic(
valid_keywords=keywords, max_memory_num=5, max_memory_length=1
)
if not retrieved_memories:
return "最近没有什么特别的记忆。"
memory_statements = [f"关于'{topic}', 你记得'{memory_item}'" for topic, memory_item in retrieved_memories]
return " ".join(memory_statements)
except Exception as e:
logger.error(f"获取长期记忆时出错: {e}")
return "回忆时出现了一些问题。"
@staticmethod
async def _build_action_options(current_available_actions: Dict[str, ActionInfo]) -> str:
action_options_block = ""
for action_name, action_info in current_available_actions.items():
param_text = ""
if action_info.action_parameters:
param_text = "\n" + "\n".join(
f' "{p_name}":"{p_desc}"' for p_name, p_desc in action_info.action_parameters.items()
)
require_text = "\n".join(f"- {req}" for req in action_info.action_require)
using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt")
action_options_block += using_action_prompt.format(
action_name=action_name,
action_description=action_info.description,
action_parameters=param_text,
action_require=require_text,
)
return action_options_block
def _find_message_by_id(self, message_id: str, message_id_list: list) -> Optional[Dict[str, Any]]:
# 兼容多种 message_id 格式数字、m123、buffered-xxxx
# 如果是纯数字,补上 m 前缀以兼容旧格式
candidate_ids = {message_id}
if message_id.isdigit():
candidate_ids.add(f"m{message_id}")
# 如果是 m 开头且后面是数字,尝试去掉 m 前缀的数字形式
if message_id.startswith("m") and message_id[1:].isdigit():
candidate_ids.add(message_id[1:])
# 逐项匹配 message_id_list每项可能为 {'id':..., 'message':...}
for item in message_id_list:
# 支持 message_id_list 中直接是字符串/ID 的情形
if isinstance(item, str):
if item in candidate_ids:
# 没有 message 对象返回None
return None
continue
if not isinstance(item, dict):
continue
item_id = item.get("id")
# 直接匹配分配的短 id
if item_id and item_id in candidate_ids:
return item.get("message")
# 有时 message 存储里会有原始的 message_id 字段(如 buffered-xxxx
message_obj = item.get("message")
if isinstance(message_obj, dict):
orig_mid = message_obj.get("message_id") or message_obj.get("id")
if orig_mid and orig_mid in candidate_ids:
return message_obj
# 作为兜底,尝试在 message_id_list 中找到 message.message_id 匹配
for item in message_id_list:
if isinstance(item, dict) and isinstance(item.get("message"), dict):
mid = item["message"].get("message_id") or item["message"].get("id")
if mid == message_id:
return item["message"]
return None
@staticmethod
def _get_latest_message(message_id_list: list) -> Optional[Dict[str, Any]]:
if not message_id_list:
return None
return message_id_list[-1].get("message")

View File

@@ -1,260 +0,0 @@
"""
主规划器入口,负责协调 PlanGenerator, PlanFilter, 和 PlanExecutor。
集成兴趣度评分系统和用户关系追踪机制,实现智能化的聊天决策。
"""
from dataclasses import asdict
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from src.plugin_system.base.component_types import ChatMode
from src.chat.planner_actions.plan_executor import PlanExecutor
from src.chat.planner_actions.plan_filter import PlanFilter
from src.chat.planner_actions.plan_generator import PlanGenerator
from src.chat.affinity_flow.interest_scoring import InterestScoringSystem
from src.chat.affinity_flow.relationship_tracker import UserRelationshipTracker
from src.common.logger import get_logger
from src.config.config import global_config
if TYPE_CHECKING:
from src.chat.planner_actions.action_manager import ActionManager
from src.common.data_models.message_manager_data_model import StreamContext
from src.common.data_models.info_data_model import Plan
# 导入提示词模块以确保其被初始化
from src.chat.planner_actions import planner_prompts # noqa
logger = get_logger("planner")
class ActionPlanner:
"""
增强版ActionPlanner集成兴趣度评分和用户关系追踪机制。
核心功能:
1. 兴趣度评分系统:根据兴趣匹配度、关系分、提及度、时间因子对消息评分
2. 用户关系追踪:自动追踪用户交互并更新关系分
3. 智能回复决策:基于兴趣度阈值和连续不回复概率的智能决策
4. 完整的规划流程:生成→筛选→执行的完整三阶段流程
"""
def __init__(self, chat_id: str, action_manager: "ActionManager"):
"""
初始化增强版ActionPlanner。
Args:
chat_id (str): 当前聊天的 ID。
action_manager (ActionManager): 一个 ActionManager 实例。
"""
self.chat_id = chat_id
self.action_manager = action_manager
self.generator = PlanGenerator(chat_id)
self.filter = PlanFilter()
self.executor = PlanExecutor(action_manager)
# 初始化兴趣度评分系统
self.interest_scoring = InterestScoringSystem()
# 尝试获取全局关系追踪器,如果没有则创建新的
try:
from src.chat.affinity_flow.relationship_integration import get_relationship_tracker
global_relationship_tracker = get_relationship_tracker()
if global_relationship_tracker:
# 使用全局关系追踪器
self.relationship_tracker = global_relationship_tracker
# 设置兴趣度评分系统的关系追踪器引用
self.interest_scoring.relationship_tracker = self.relationship_tracker
logger.info("使用全局关系追踪器")
else:
# 创建新的关系追踪器
self.relationship_tracker = UserRelationshipTracker(self.interest_scoring)
logger.info("创建新的关系追踪器实例")
except Exception as e:
logger.warning(f"获取全局关系追踪器失败: {e}")
# 创建新的关系追踪器
self.relationship_tracker = UserRelationshipTracker(self.interest_scoring)
# 设置执行器的关系追踪器
self.executor.set_relationship_tracker(self.relationship_tracker)
# 规划器统计
self.planner_stats = {
"total_plans": 0,
"successful_plans": 0,
"failed_plans": 0,
"replies_generated": 0,
"other_actions_executed": 0,
}
async def plan(
self, mode: ChatMode = ChatMode.FOCUS, context: "StreamContext" = None
) -> Tuple[List[Dict], Optional[Dict]]:
"""
执行完整的增强版规划流程。
Args:
mode (ChatMode): 当前的聊天模式,默认为 FOCUS。
context (StreamContext): 包含聊天流消息的上下文对象。
Returns:
Tuple[List[Dict], Optional[Dict]]: 一个元组,包含:
- final_actions_dict (List[Dict]): 最终确定的动作列表(字典格式)。
- final_target_message_dict (Optional[Dict]): 最终的目标消息(字典格式)。
"""
try:
self.planner_stats["total_plans"] += 1
return await self._enhanced_plan_flow(mode, context)
except Exception as e:
logger.error(f"规划流程出错: {e}")
self.planner_stats["failed_plans"] += 1
return [], None
async def _enhanced_plan_flow(self, mode: ChatMode, context: "StreamContext") -> Tuple[List[Dict], Optional[Dict]]:
"""执行增强版规划流程"""
try:
# 1. 生成初始 Plan
initial_plan = await self.generator.generate(mode)
unread_messages = context.get_unread_messages() if context else []
# 2. 兴趣度评分 - 只对未读消息进行评分
if unread_messages:
bot_nickname = global_config.bot.nickname
interest_scores = await self.interest_scoring.calculate_interest_scores(unread_messages, bot_nickname)
# 3. 根据兴趣度调整可用动作
if interest_scores:
latest_score = max(interest_scores, key=lambda s: s.total_score)
latest_message = next(
(msg for msg in unread_messages if msg.message_id == latest_score.message_id), None
)
should_reply, score = self.interest_scoring.should_reply(latest_score, latest_message)
reply_not_available = False
if not should_reply and "reply" in initial_plan.available_actions:
logger.info(f"兴趣度不足 ({latest_score.total_score:.2f}),移除'回复'动作。")
reply_not_available = True
# base_threshold = self.interest_scoring.reply_threshold
# 检查兴趣度是否达到非回复动作阈值
non_reply_action_interest_threshold = global_config.affinity_flow.non_reply_action_interest_threshold
if score < non_reply_action_interest_threshold:
logger.info(
f"兴趣度 {score:.3f} 低于非回复动作阈值 {non_reply_action_interest_threshold:.3f},不执行任何动作。"
)
# 直接返回 no_action
from src.common.data_models.info_data_model import ActionPlannerInfo
no_action = ActionPlannerInfo(
action_type="no_action",
reasoning=f"兴趣度评分 {score:.3f} 未达阈值 {non_reply_action_interest_threshold:.3f}",
action_data={},
action_message=None,
)
filtered_plan = initial_plan
filtered_plan.decided_actions = [no_action]
else:
# 4. 筛选 Plan
filtered_plan = await self.filter.filter(reply_not_available, initial_plan)
# 检查filtered_plan是否有reply动作以便记录reply action
has_reply_action = False
for decision in filtered_plan.decided_actions:
if decision.action_type == "reply":
has_reply_action = True
self.interest_scoring.record_reply_action(has_reply_action)
# 5. 使用 PlanExecutor 执行 Plan
execution_result = await self.executor.execute(filtered_plan)
# 6. 根据执行结果更新统计信息
self._update_stats_from_execution_result(execution_result)
# 7. 检查关系更新
await self.relationship_tracker.check_and_update_relationships()
# 8. 返回结果
return self._build_return_result(filtered_plan)
except Exception as e:
logger.error(f"增强版规划流程出错: {e}")
self.planner_stats["failed_plans"] += 1
return [], None
def _update_stats_from_execution_result(self, execution_result: Dict[str, any]):
"""根据执行结果更新规划器统计"""
if not execution_result:
return
successful_count = execution_result.get("successful_count", 0)
# 更新成功执行计数
self.planner_stats["successful_plans"] += successful_count
# 统计回复动作和其他动作
reply_count = 0
other_count = 0
for result in execution_result.get("results", []):
action_type = result.get("action_type", "")
if action_type in ["reply", "proactive_reply"]:
reply_count += 1
else:
other_count += 1
self.planner_stats["replies_generated"] += reply_count
self.planner_stats["other_actions_executed"] += other_count
def _build_return_result(self, plan: "Plan") -> Tuple[List[Dict], Optional[Dict]]:
"""构建返回结果"""
final_actions = plan.decided_actions or []
final_target_message = next((act.action_message for act in final_actions if act.action_message), None)
final_actions_dict = [asdict(act) for act in final_actions]
if final_target_message:
if hasattr(final_target_message, "__dataclass_fields__"):
final_target_message_dict = asdict(final_target_message)
else:
final_target_message_dict = final_target_message
else:
final_target_message_dict = None
return final_actions_dict, final_target_message_dict
def get_user_relationship(self, user_id: str) -> float:
"""获取用户关系分"""
return self.interest_scoring.get_user_relationship(user_id)
def update_interest_keywords(self, new_keywords: Dict[str, List[str]]):
"""更新兴趣关键词(已弃用,仅保留用于兼容性)"""
logger.info("传统关键词匹配已移除,此方法仅保留用于兼容性")
# 此方法已弃用因为现在完全使用embedding匹配
def get_planner_stats(self) -> Dict[str, any]:
"""获取规划器统计"""
return self.planner_stats.copy()
def get_interest_scoring_stats(self) -> Dict[str, any]:
"""获取兴趣度评分统计"""
return {
"no_reply_count": self.interest_scoring.no_reply_count,
"max_no_reply_count": self.interest_scoring.max_no_reply_count,
"reply_threshold": self.interest_scoring.reply_threshold,
"mention_threshold": self.interest_scoring.mention_threshold,
"user_relationships": len(self.interest_scoring.user_relationships),
}
def get_relationship_stats(self) -> Dict[str, any]:
"""获取用户关系统计"""
return {
"tracking_users": len(self.relationship_tracker.tracking_users),
"relationship_history": len(self.relationship_tracker.relationship_history),
"max_tracking_users": self.relationship_tracker.max_tracking_users,
}
# 全局兴趣度评分系统实例 - 在 individuality 模块中创建

View File

@@ -897,7 +897,7 @@ class DefaultReplyer:
interest_scores = {}
try:
from src.chat.affinity_flow.interest_scoring import interest_scoring_system
from src.plugins.built_in.chatter.interest_scoring import chatter_interest_scoring_system as interest_scoring_system
from src.common.data_models.database_data_model import DatabaseMessages
# 转换消息格式
@@ -1635,9 +1635,11 @@ class DefaultReplyer:
# 使用AFC关系追踪器获取关系信息
try:
from src.chat.affinity_flow.relationship_integration import get_relationship_tracker
from src.plugins.built_in.chatter.relationship_tracker import ChatterRelationshipTracker
relationship_tracker = get_relationship_tracker()
# 创建关系追踪器实例
from src.plugins.built_in.chatter.interest_scoring import chatter_interest_scoring_system
relationship_tracker = ChatterRelationshipTracker(chatter_interest_scoring_system)
if relationship_tracker:
# 获取用户信息以获取真实的user_id
user_info = await person_info_manager.get_values(person_id, ["user_id", "platform"])