初步开始重写聊天系统

This commit is contained in:
Windpicker-owo
2025-09-15 13:11:37 +08:00
parent 11bd2ffc53
commit c52b4daf1a
8 changed files with 1277 additions and 65 deletions

View File

@@ -0,0 +1,138 @@
"""
亲和力聊天处理流管理器
管理不同聊天流的亲和力聊天处理流,统一获取新消息并分发到对应的亲和力聊天处理流
"""
import time
import traceback
from typing import Dict, Optional, List
from src.chat.planner_actions.action_manager import ActionManager
from src.chat.planner_actions.planner import ActionPlanner
from src.chat.affinity_flow.chatter import AffinityFlowChatter
from src.common.logger import get_logger
logger = get_logger("afc_manager")
class AFCManager:
"""亲和力聊天处理流管理器"""
def __init__(self):
self.affinity_flow_chatters: Dict[str, "AffinityFlowChatter"] = {}
'''所有聊天流的亲和力聊天处理流stream_id -> affinity_flow_chatter'''
# 动作管理器
self.action_manager = ActionManager()
# 管理器统计
self.manager_stats = {
"total_messages_processed": 0,
"total_plans_created": 0,
"total_actions_executed": 0,
"active_chatters": 0,
"last_activity_time": time.time(),
}
def get_or_create_chatter(self, stream_id: str) -> "AffinityFlowChatter":
"""获取或创建聊天流处理器"""
if stream_id not in self.affinity_flow_chatters:
# 创建增强版规划器
planner = ActionPlanner(stream_id, self.action_manager)
chatter = AffinityFlowChatter(
stream_id=stream_id,
planner=planner,
action_manager=self.action_manager
)
self.affinity_flow_chatters[stream_id] = chatter
logger.info(f"创建新的亲和力聊天处理器: {stream_id}")
return self.affinity_flow_chatters[stream_id]
async def process_message(self, stream_id: str, message_data: dict) -> Dict[str, any]:
"""处理消息"""
try:
# 获取或创建聊天处理器
chatter = self.get_or_create_chatter(stream_id)
# 处理消息
result = await chatter.process_message(message_data)
# 更新统计
self.manager_stats["total_messages_processed"] += 1
self.manager_stats["total_actions_executed"] += result.get("executed_count", 0)
self.manager_stats["last_activity_time"] = time.time()
return result
except Exception as e:
logger.error(f"处理消息时出错: {e}\n{traceback.format_exc()}")
return {
"success": False,
"error_message": str(e),
"executed_count": 0,
}
async def process_messages_batch(self, stream_id: str, messages_data: List[dict]) -> List[Dict[str, any]]:
"""批量处理消息"""
results = []
for message_data in messages_data:
result = await self.process_message(stream_id, message_data)
results.append(result)
return results
def get_chatter_stats(self, stream_id: str) -> Optional[Dict[str, any]]:
"""获取聊天处理器统计"""
if stream_id in self.affinity_flow_chatters:
return self.affinity_flow_chatters[stream_id].get_stats()
return None
def get_manager_stats(self) -> Dict[str, any]:
"""获取管理器统计"""
stats = self.manager_stats.copy()
stats["active_chatters"] = len(self.affinity_flow_chatters)
return stats
def cleanup_inactive_chatters(self, max_inactive_minutes: int = 60):
"""清理不活跃的聊天处理器"""
current_time = time.time()
max_inactive_seconds = max_inactive_minutes * 60
inactive_streams = []
for stream_id, chatter in self.affinity_flow_chatters.items():
if current_time - chatter.last_activity_time > max_inactive_seconds:
inactive_streams.append(stream_id)
for stream_id in inactive_streams:
del self.affinity_flow_chatters[stream_id]
logger.info(f"清理不活跃聊天处理器: {stream_id}")
def get_planner_stats(self, stream_id: str) -> Optional[Dict[str, any]]:
"""获取规划器统计"""
if stream_id in self.affinity_flow_chatters:
return self.affinity_flow_chatters[stream_id].get_planner_stats()
return None
def get_interest_scoring_stats(self, stream_id: str) -> Optional[Dict[str, any]]:
"""获取兴趣度评分统计"""
if stream_id in self.affinity_flow_chatters:
return self.affinity_flow_chatters[stream_id].get_interest_scoring_stats()
return None
def get_relationship_stats(self, stream_id: str) -> Optional[Dict[str, any]]:
"""获取用户关系统计"""
if stream_id in self.affinity_flow_chatters:
return self.affinity_flow_chatters[stream_id].get_relationship_stats()
return None
def get_user_relationship(self, stream_id: str, user_id: str) -> float:
"""获取用户关系分"""
if stream_id in self.affinity_flow_chatters:
return self.affinity_flow_chatters[stream_id].get_user_relationship(user_id)
return 0.3 # 默认新用户关系分
def update_interest_keywords(self, stream_id: str, new_keywords: dict):
"""更新兴趣关键词"""
if stream_id in self.affinity_flow_chatters:
self.affinity_flow_chatters[stream_id].update_interest_keywords(new_keywords)
logger.info(f"已更新聊天流 {stream_id} 的兴趣关键词: {list(new_keywords.keys())}")

View File

@@ -0,0 +1,197 @@
"""
亲和力聊天处理器
单个聊天流的处理器,负责处理特定聊天流的完整交互流程
"""
import time
import traceback
from datetime import datetime
from typing import Dict
from src.chat.planner_actions.action_manager import ActionManager
from src.chat.planner_actions.planner import ActionPlanner
from src.plugin_system.base.component_types import ChatMode
from src.common.logger import get_logger
logger = get_logger("affinity_chatter")
class AffinityFlowChatter:
"""单个亲和力聊天处理器"""
def __init__(self, stream_id: str, planner: ActionPlanner, action_manager: ActionManager):
"""
初始化亲和力聊天处理器
Args:
stream_id: 聊天流ID
planner: 动作规划器
action_manager: 动作管理器
"""
self.stream_id = stream_id
self.planner = planner
self.action_manager = action_manager
# 处理器统计
self.stats = {
"messages_processed": 0,
"plans_created": 0,
"actions_executed": 0,
"successful_executions": 0,
"failed_executions": 0,
}
self.last_activity_time = time.time()
async def process_message(self, message_data: dict) -> Dict[str, any]:
"""
处理单个消息
Args:
message_data: 消息数据字典
Returns:
处理结果字典
"""
try:
# 使用增强版规划器处理消息
actions, target_message = await self.planner.plan(mode=ChatMode.FOCUS, use_enhanced=True)
self.stats["plans_created"] += 1
# 执行动作(如果规划器返回了动作)
execution_result = {"executed_count": len(actions) if actions else 0}
if actions:
# 这里可以添加额外的动作执行逻辑
logger.debug(f"聊天流 {self.stream_id} 生成了 {len(actions)} 个动作")
# 更新统计
self.stats["messages_processed"] += 1
self.stats["actions_executed"] += execution_result.get("executed_count", 0)
self.stats["successful_executions"] += 1 # 假设成功
self.last_activity_time = time.time()
result = {
"success": True,
"stream_id": self.stream_id,
"plan_created": True,
"actions_count": len(actions) if actions else 0,
"has_target_message": target_message is not None,
**execution_result,
}
logger.info(f"聊天流 {self.stream_id} 消息处理成功: 动作数={result['actions_count']}")
return result
except Exception as e:
logger.error(f"亲和力聊天处理器 {self.stream_id} 处理消息时出错: {e}\n{traceback.format_exc()}")
self.stats["failed_executions"] += 1
self.last_activity_time = time.time()
return {
"success": False,
"stream_id": self.stream_id,
"error_message": str(e),
"executed_count": 0,
}
def get_stats(self) -> Dict[str, any]:
"""
获取处理器统计信息
Returns:
统计信息字典
"""
return self.stats.copy()
def get_planner_stats(self) -> Dict[str, any]:
"""
获取规划器统计信息
Returns:
规划器统计信息字典
"""
return self.planner.get_planner_stats()
def get_interest_scoring_stats(self) -> Dict[str, any]:
"""
获取兴趣度评分统计信息
Returns:
兴趣度评分统计信息字典
"""
return self.planner.get_interest_scoring_stats()
def get_relationship_stats(self) -> Dict[str, any]:
"""
获取用户关系统计信息
Returns:
用户关系统计信息字典
"""
return self.planner.get_relationship_stats()
def get_user_relationship(self, user_id: str) -> float:
"""
获取用户关系分
Args:
user_id: 用户ID
Returns:
用户关系分 (0.0-1.0)
"""
return self.planner.get_user_relationship(user_id)
def update_interest_keywords(self, new_keywords: dict):
"""
更新兴趣关键词
Args:
new_keywords: 新的兴趣关键词字典
"""
self.planner.update_interest_keywords(new_keywords)
logger.info(f"聊天流 {self.stream_id} 已更新兴趣关键词: {list(new_keywords.keys())}")
def reset_stats(self):
"""重置统计信息"""
self.stats = {
"messages_processed": 0,
"plans_created": 0,
"actions_executed": 0,
"successful_executions": 0,
"failed_executions": 0,
}
def is_active(self, max_inactive_minutes: int = 60) -> bool:
"""
检查处理器是否活跃
Args:
max_inactive_minutes: 最大不活跃分钟数
Returns:
是否活跃
"""
current_time = time.time()
max_inactive_seconds = max_inactive_minutes * 60
return (current_time - self.last_activity_time) < max_inactive_seconds
def get_activity_time(self) -> float:
"""
获取最后活动时间
Returns:
最后活动时间戳
"""
return self.last_activity_time
def __str__(self) -> str:
"""字符串表示"""
return f"AffinityFlowChatter(stream_id={self.stream_id}, messages={self.stats['messages_processed']})"
def __repr__(self) -> str:
"""详细字符串表示"""
return (f"AffinityFlowChatter(stream_id={self.stream_id}, "
f"messages_processed={self.stats['messages_processed']}, "
f"plans_created={self.stats['plans_created']}, "
f"last_activity={datetime.fromtimestamp(self.last_activity_time)})")

View File

@@ -0,0 +1,245 @@
"""
兴趣度评分系统
基于多维度评分机制,包括兴趣匹配度、用户关系分、提及度和时间因子
"""
from datetime import datetime
from typing import Dict, List
from src.common.data_models.database_data_model import DatabaseMessages
from src.common.data_models.info_data_model import InterestScore
from src.common.logger import get_logger
from src.config.config import global_config
logger = get_logger("interest_scoring")
class InterestScoringSystem:
"""兴趣度评分系统"""
def __init__(self):
self.interest_keywords = {
"游戏": ["游戏", "原神", "米哈游", "抽卡", "角色", "装备", "任务", "副本", "PVP", "LOL", "王者荣耀", "吃鸡"],
"动漫": ["动漫", "二次元", "新番", "番剧", "漫画", "角色", "声优", "OP", "ED"],
"音乐": ["音乐", "歌曲", "歌手", "专辑", "演唱会", "乐器", "作词", "作曲"],
"电影": ["电影", "电视剧", "综艺", "演员", "导演", "剧情", "影评", "票房"],
"科技": ["科技", "AI", "人工智能", "编程", "Python", "代码", "软件", "硬件", "手机"],
"生活": ["生活", "日常", "美食", "旅行", "天气", "工作", "学习", "健身"],
"情感": ["情感", "心情", "感情", "恋爱", "友情", "家人", "开心", "难过", "生气"],
}
# 评分权重
self.score_weights = {
"interest_match": 0.4, # 兴趣匹配度权重
"relationship": 0.3, # 关系分权重
"mentioned": 0.2, # 是否提及bot权重
"time_factor": 0.1, # 时间因子权重
}
# 评分阈值
self.reply_threshold = 0.6 # 默认回复阈值
self.mention_threshold = 0.3 # 提及阈值
# 连续不回复概率提升
self.no_reply_count = 0
self.max_no_reply_count = 5
self.probability_boost_per_no_reply = 0.15 # 每次不回复增加15%概率
# 用户关系数据
self.user_relationships: Dict[str, float] = {} # user_id -> relationship_score
def calculate_interest_scores(self, messages: List[DatabaseMessages], bot_nickname: str) -> List[InterestScore]:
"""计算消息的兴趣度评分"""
scores = []
user_messages = [msg for msg in messages if msg.role == "user"]
for msg in user_messages:
score = self._calculate_single_message_score(msg, bot_nickname)
scores.append(score)
return scores
def _calculate_single_message_score(self, message: DatabaseMessages, bot_nickname: str) -> InterestScore:
"""计算单条消息的兴趣度评分"""
# 1. 计算兴趣匹配度
interest_match_score = self._calculate_interest_match_score(message.content)
# 2. 计算关系分
relationship_score = self._calculate_relationship_score(message.user_id)
# 3. 计算提及分数
mentioned_score = self._calculate_mentioned_score(message.content, bot_nickname)
# 4. 计算时间因子
time_factor_score = self._calculate_time_factor_score(message.timestamp)
# 5. 计算总分
total_score = (
interest_match_score * self.score_weights["interest_match"] +
relationship_score * self.score_weights["relationship"] +
mentioned_score * self.score_weights["mentioned"] +
time_factor_score * self.score_weights["time_factor"]
)
details = {
"interest_match": f"兴趣匹配度: {interest_match_score:.2f}",
"relationship": f"关系分: {relationship_score:.2f}",
"mentioned": f"提及分数: {mentioned_score:.2f}",
"time_factor": f"时间因子: {time_factor_score:.2f}",
}
return InterestScore(
message_id=message.message_id,
total_score=total_score,
interest_match_score=interest_match_score,
relationship_score=relationship_score,
mentioned_score=mentioned_score,
time_factor_score=time_factor_score,
details=details
)
def _calculate_interest_match_score(self, content: str) -> float:
"""计算兴趣匹配度"""
if not content:
return 0.0
content_lower = content.lower()
max_score = 0.0
for _category, keywords in self.interest_keywords.items():
category_score = 0.0
matched_keywords = []
for keyword in keywords:
if keyword.lower() in content_lower:
category_score += 0.1
matched_keywords.append(keyword)
# 如果匹配到多个关键词,增加额外分数
if len(matched_keywords) > 1:
category_score += (len(matched_keywords) - 1) * 0.05
# 限制每个类别的最高分
category_score = min(category_score, 0.8)
max_score = max(max_score, category_score)
return min(max_score, 1.0)
def _calculate_relationship_score(self, user_id: str) -> float:
"""计算关系分"""
if user_id in self.user_relationships:
relationship_value = self.user_relationships[user_id]
return min(relationship_value, 1.0)
return 0.3 # 默认新用户的基础分
def _calculate_mentioned_score(self, content: str, bot_nickname: str) -> float:
"""计算提及分数"""
if not content:
return 0.0
content_lower = content.lower()
bot_name_lower = bot_nickname.lower()
if bot_name_lower in content_lower:
return 1.0
# 检查是否被@提及
if "@" in content and any(alias.lower() in content_lower for alias in global_config.bot.alias_names or []):
return 1.0
return 0.0
def _calculate_time_factor_score(self, timestamp: float) -> float:
"""计算时间因子分数"""
message_time = datetime.fromtimestamp(timestamp)
current_time = datetime.now()
time_diff_hours = (current_time - message_time).total_seconds() / 3600
# 24小时内消息时间因子为1.0,之后逐渐衰减
if time_diff_hours <= 24:
return 1.0
elif time_diff_hours <= 72: # 3天内
return 0.8
elif time_diff_hours <= 168: # 7天内
return 0.6
else:
return 0.3
def should_reply(self, score: InterestScore) -> bool:
"""判断是否应该回复"""
base_threshold = self.reply_threshold
# 如果被提及,降低阈值
if score.mentioned_score >= 1.0:
base_threshold = self.mention_threshold
# 计算连续不回复的概率提升
probability_boost = min(self.no_reply_count * self.probability_boost_per_no_reply, 0.8)
effective_threshold = base_threshold - probability_boost
logger.debug(f"评分决策: 总分={score.total_score:.2f}, 有效阈值={effective_threshold:.2f}, 连续不回复次数={self.no_reply_count}")
return score.total_score >= effective_threshold
def record_reply_action(self, did_reply: bool):
"""记录回复动作"""
if did_reply:
self.no_reply_count = max(0, self.no_reply_count - 1)
else:
self.no_reply_count += 1
# 限制最大计数
self.no_reply_count = min(self.no_reply_count, self.max_no_reply_count)
logger.debug(f"回复动作记录: {did_reply}, 当前连续不回复次数: {self.no_reply_count}")
def update_user_relationship(self, user_id: str, relationship_change: float):
"""更新用户关系"""
if user_id in self.user_relationships:
self.user_relationships[user_id] = max(0.0, min(1.0, self.user_relationships[user_id] + relationship_change))
else:
self.user_relationships[user_id] = max(0.0, min(1.0, relationship_change))
logger.debug(f"更新用户关系: {user_id} -> {self.user_relationships[user_id]:.2f}")
def get_user_relationship(self, user_id: str) -> float:
"""获取用户关系分"""
return self.user_relationships.get(user_id, 0.3)
def get_scoring_stats(self) -> Dict:
"""获取评分系统统计"""
return {
"no_reply_count": self.no_reply_count,
"max_no_reply_count": self.max_no_reply_count,
"reply_threshold": self.reply_threshold,
"mention_threshold": self.mention_threshold,
"user_relationships": len(self.user_relationships),
"interest_categories": len(self.interest_keywords),
}
def add_interest_category(self, category: str, keywords: List[str]):
"""添加新的兴趣类别"""
self.interest_keywords[category] = keywords
logger.info(f"添加新的兴趣类别: {category}, 关键词数量: {len(keywords)}")
def remove_interest_category(self, category: str):
"""移除兴趣类别"""
if category in self.interest_keywords:
del self.interest_keywords[category]
logger.info(f"移除兴趣类别: {category}")
def update_interest_keywords(self, category: str, keywords: List[str]):
"""更新兴趣类别的关键词"""
if category in self.interest_keywords:
self.interest_keywords[category] = keywords
logger.info(f"更新兴趣类别 {category} 的关键词: {len(keywords)}")
else:
self.add_interest_category(category, keywords)
def get_interest_keywords(self) -> Dict[str, List[str]]:
"""获取所有兴趣关键词"""
return self.interest_keywords.copy()
def reset_stats(self):
"""重置统计信息"""
self.no_reply_count = 0
logger.info("重置兴趣度评分系统统计")

View File

@@ -0,0 +1,228 @@
"""
用户关系追踪器
负责追踪用户交互历史并通过LLM分析更新用户关系分
"""
import time
from typing import Dict, List, Optional
from src.common.logger import get_logger
from src.config.config import model_config
from src.llm_models.utils_model import LLMRequest
logger = get_logger("relationship_tracker")
class UserRelationshipTracker:
"""用户关系追踪器"""
def __init__(self, interest_scoring_system=None):
self.tracking_users: Dict[str, Dict] = {} # user_id -> interaction_data
self.max_tracking_users = 3
self.update_interval_minutes = 30
self.last_update_time = time.time()
self.relationship_history: List[Dict] = []
self.interest_scoring_system = interest_scoring_system
# 关系更新LLM
try:
self.relationship_llm = LLMRequest(
model_set=model_config.model_task_config.relationship_tracker,
request_type="relationship_tracker"
)
except AttributeError:
# 如果relationship_tracker配置不存在尝试其他可用的模型配置
available_models = [attr for attr in dir(model_config.model_task_config)
if not attr.startswith('_') and attr != 'model_dump']
if available_models:
# 使用第一个可用的模型配置
fallback_model = available_models[0]
logger.warning(f"relationship_tracker model configuration not found, using fallback: {fallback_model}")
self.relationship_llm = LLMRequest(
model_set=getattr(model_config.model_task_config, fallback_model),
request_type="relationship_tracker"
)
else:
# 如果没有任何模型配置创建一个简单的LLMRequest
logger.warning("No model configurations found, creating basic LLMRequest")
self.relationship_llm = LLMRequest(
model_set="gpt-3.5-turbo", # 默认模型
request_type="relationship_tracker"
)
def set_interest_scoring_system(self, interest_scoring_system):
"""设置兴趣度评分系统引用"""
self.interest_scoring_system = interest_scoring_system
def add_interaction(self, user_id: str, user_name: str, user_message: str, bot_reply: str, reply_timestamp: float):
"""添加用户交互记录"""
if len(self.tracking_users) >= self.max_tracking_users:
# 移除最旧的记录
oldest_user = min(self.tracking_users.keys(),
key=lambda k: self.tracking_users[k].get("reply_timestamp", 0))
del self.tracking_users[oldest_user]
# 获取当前关系分
current_relationship_score = 0.3 # 默认值
if self.interest_scoring_system:
current_relationship_score = self.interest_scoring_system.get_user_relationship(user_id)
self.tracking_users[user_id] = {
"user_id": user_id,
"user_name": user_name,
"user_message": user_message,
"bot_reply": bot_reply,
"reply_timestamp": reply_timestamp,
"current_relationship_score": current_relationship_score
}
logger.debug(f"添加用户交互追踪: {user_id}")
async def check_and_update_relationships(self) -> List[Dict]:
"""检查并更新用户关系"""
current_time = time.time()
if current_time - self.last_update_time < self.update_interval_minutes * 60:
return []
updates = []
for user_id, interaction in list(self.tracking_users.items()):
if current_time - interaction["reply_timestamp"] > 60 * 5: # 5分钟
update = await self._update_user_relationship(interaction)
if update:
updates.append(update)
del self.tracking_users[user_id]
self.last_update_time = current_time
return updates
async def _update_user_relationship(self, interaction: Dict) -> Optional[Dict]:
"""更新单个用户的关系"""
try:
prompt = f"""
分析以下用户交互,更新用户关系:
用户ID: {interaction['user_id']}
用户名: {interaction['user_name']}
用户消息: {interaction['user_message']}
Bot回复: {interaction['bot_reply']}
当前关系分: {interaction['current_relationship_score']}
请以JSON格式返回更新结果
{{
"new_relationship_score": 0.0~1.0的数值,
"reasoning": "更新理由",
"interaction_summary": "交互总结"
}}
"""
llm_response, _ = await self.relationship_llm.generate_response_async(prompt=prompt)
if llm_response:
import json
response_data = json.loads(llm_response)
new_score = max(0.0, min(1.0, float(response_data.get("new_relationship_score", 0.3))))
if self.interest_scoring_system:
self.interest_scoring_system.update_user_relationship(
interaction['user_id'],
new_score - interaction['current_relationship_score']
)
return {
"user_id": interaction['user_id'],
"new_relationship_score": new_score,
"reasoning": response_data.get("reasoning", ""),
"interaction_summary": response_data.get("interaction_summary", "")
}
except Exception as e:
logger.error(f"更新用户关系时出错: {e}")
return None
def get_tracking_users(self) -> Dict[str, Dict]:
"""获取正在追踪的用户"""
return self.tracking_users.copy()
def get_user_interaction(self, user_id: str) -> Optional[Dict]:
"""获取特定用户的交互记录"""
return self.tracking_users.get(user_id)
def remove_user_tracking(self, user_id: str):
"""移除用户追踪"""
if user_id in self.tracking_users:
del self.tracking_users[user_id]
logger.debug(f"移除用户追踪: {user_id}")
def clear_all_tracking(self):
"""清空所有追踪"""
self.tracking_users.clear()
logger.info("清空所有用户追踪")
def get_relationship_history(self) -> List[Dict]:
"""获取关系历史记录"""
return self.relationship_history.copy()
def add_to_history(self, relationship_update: Dict):
"""添加到关系历史"""
self.relationship_history.append({
**relationship_update,
"update_time": time.time()
})
# 限制历史记录数量
if len(self.relationship_history) > 100:
self.relationship_history = self.relationship_history[-100:]
def get_tracker_stats(self) -> Dict:
"""获取追踪器统计"""
return {
"tracking_users": len(self.tracking_users),
"max_tracking_users": self.max_tracking_users,
"update_interval_minutes": self.update_interval_minutes,
"relationship_history": len(self.relationship_history),
"last_update_time": self.last_update_time,
}
def update_config(self, max_tracking_users: int = None, update_interval_minutes: int = None):
"""更新配置"""
if max_tracking_users is not None:
self.max_tracking_users = max_tracking_users
logger.info(f"更新最大追踪用户数: {max_tracking_users}")
if update_interval_minutes is not None:
self.update_interval_minutes = update_interval_minutes
logger.info(f"更新关系更新间隔: {update_interval_minutes} 分钟")
def force_update_relationship(self, user_id: str, new_score: float, reasoning: str = ""):
"""强制更新用户关系分"""
if user_id in self.tracking_users:
current_score = self.tracking_users[user_id]["current_relationship_score"]
if self.interest_scoring_system:
self.interest_scoring_system.update_user_relationship(
user_id,
new_score - current_score
)
update_info = {
"user_id": user_id,
"new_relationship_score": new_score,
"reasoning": reasoning or "手动更新",
"interaction_summary": "手动更新关系分"
}
self.add_to_history(update_info)
logger.info(f"强制更新用户关系: {user_id} -> {new_score:.2f}")
def get_user_summary(self, user_id: str) -> Dict:
"""获取用户交互总结"""
if user_id not in self.tracking_users:
return {}
interaction = self.tracking_users[user_id]
return {
"user_id": user_id,
"user_name": interaction["user_name"],
"current_relationship_score": interaction["current_relationship_score"],
"interaction_count": 1, # 简化版本,每次追踪只记录一次交互
"last_interaction": interaction["reply_timestamp"],
"recent_message": interaction["user_message"][:100] + "..." if len(interaction["user_message"]) > 100 else interaction["user_message"]
}

View File

@@ -1,8 +1,13 @@
"""
PlanExecutor: 接收 Plan 对象并执行其中的所有动作。
集成用户关系追踪机制,自动记录交互并更新关系。
"""
import asyncio
import time
from typing import Dict, List
from src.chat.planner_actions.action_manager import ActionManager
from src.common.data_models.info_data_model import Plan
from src.common.data_models.info_data_model import Plan, ActionPlannerInfo
from src.common.logger import get_logger
logger = get_logger("plan_executor")
@@ -10,48 +15,286 @@ logger = get_logger("plan_executor")
class PlanExecutor:
"""
负责接收一个 Plan 对象,并执行其中最终确定的所有动作
增强版PlanExecutor集成用户关系追踪机制
这个类是规划流程的最后一步,将规划结果转化为实际的动作执行。
Attributes:
action_manager (ActionManager): 用于实际执行各种动作的管理器实例。
功能:
1. 执行Plan中的所有动作
2. 自动记录用户交互并添加到关系追踪
3. 分类执行回复动作和其他动作
4. 提供完整的执行统计和监控
"""
def __init__(self, action_manager: ActionManager):
"""
初始化 PlanExecutor。
初始化增强版PlanExecutor。
Args:
action_manager (ActionManager): 一个 ActionManager 实例,用于执行动作
action_manager (ActionManager): 用于实际执行各种动作的管理器实例
"""
self.action_manager = action_manager
async def execute(self, plan: Plan):
# 执行统计
self.execution_stats = {
"total_executed": 0,
"successful_executions": 0,
"failed_executions": 0,
"reply_executions": 0,
"other_action_executions": 0,
"execution_times": [],
}
# 用户关系追踪引用
self.relationship_tracker = None
def set_relationship_tracker(self, relationship_tracker):
"""设置关系追踪器"""
self.relationship_tracker = relationship_tracker
async def execute(self, plan: Plan) -> Dict[str, any]:
"""
遍历并执行Plan对象中`decided_actions`列表里的所有动作。
如果动作类型为 "no_action",则会记录原因并跳过。
否则,它将调用 ActionManager 来执行相应的动作。
Args:
plan (Plan): 包含待执行动作列表的Plan对象。
Returns:
Dict[str, any]: 执行结果统计信息
"""
if not plan.decided_actions:
logger.info("没有需要执行的动作。")
return {"executed_count": 0, "results": []}
execution_results = []
reply_actions = []
other_actions = []
# 分类动作:回复动作和其他动作
for action_info in plan.decided_actions:
if action_info.action_type in ["reply", "proactive_reply"]:
reply_actions.append(action_info)
else:
other_actions.append(action_info)
# 执行回复动作(优先执行)
if reply_actions:
reply_result = await self._execute_reply_actions(reply_actions, plan)
execution_results.extend(reply_result["results"])
self.execution_stats["reply_executions"] += len(reply_actions)
# 并行执行其他动作
if other_actions:
other_result = await self._execute_other_actions(other_actions, plan)
execution_results.extend(other_result["results"])
self.execution_stats["other_action_executions"] += len(other_actions)
# 更新总体统计
self.execution_stats["total_executed"] += len(plan.decided_actions)
successful_count = sum(1 for r in execution_results if r["success"])
self.execution_stats["successful_executions"] += successful_count
self.execution_stats["failed_executions"] += len(execution_results) - successful_count
logger.info(f"动作执行完成: 总数={len(plan.decided_actions)}, 成功={successful_count}, 失败={len(execution_results) - successful_count}")
return {
"executed_count": len(plan.decided_actions),
"successful_count": successful_count,
"failed_count": len(execution_results) - successful_count,
"results": execution_results,
}
async def _execute_reply_actions(self, reply_actions: List[ActionPlannerInfo], plan: Plan) -> Dict[str, any]:
"""执行回复动作"""
results = []
for action_info in reply_actions:
result = await self._execute_single_reply_action(action_info, plan)
results.append(result)
return {"results": results}
async def _execute_single_reply_action(self, action_info: ActionPlannerInfo, plan: Plan) -> Dict[str, any]:
"""执行单个回复动作"""
start_time = time.time()
success = False
error_message = ""
reply_content = ""
try:
logger.info(f"执行回复动作: {action_info.action_type}, 原因: {action_info.reasoning}")
# 构建回复动作参数
action_params = {
"chat_id": plan.chat_id,
"target_message": action_info.action_message,
"reasoning": action_info.reasoning,
"action_data": action_info.action_data or {},
}
# 通过动作管理器执行回复
reply_content = await self.action_manager.execute_action(
action_name=action_info.action_type,
**action_params
)
success = True
logger.info(f"回复动作执行成功: {action_info.action_type}")
except Exception as e:
error_message = str(e)
logger.error(f"执行回复动作失败: {action_info.action_type}, 错误: {error_message}")
# 记录用户关系追踪
if success and action_info.action_message:
await self._track_user_interaction(action_info, plan, reply_content)
execution_time = time.time() - start_time
self.execution_stats["execution_times"].append(execution_time)
return {
"action_type": action_info.action_type,
"success": success,
"error_message": error_message,
"execution_time": execution_time,
"reasoning": action_info.reasoning,
"reply_content": reply_content[:200] + "..." if len(reply_content) > 200 else reply_content,
}
async def _execute_other_actions(self, other_actions: List[ActionPlannerInfo], plan: Plan) -> Dict[str, any]:
"""执行其他动作"""
results = []
# 并行执行其他动作
tasks = []
for action_info in other_actions:
task = self._execute_single_other_action(action_info, plan)
tasks.append(task)
if tasks:
executed_results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(executed_results):
if isinstance(result, Exception):
logger.error(f"执行动作 {other_actions[i].action_type} 时发生异常: {result}")
results.append({
"action_type": other_actions[i].action_type,
"success": False,
"error_message": str(result),
"execution_time": 0,
"reasoning": other_actions[i].reasoning,
})
else:
results.append(result)
return {"results": results}
async def _execute_single_other_action(self, action_info: ActionPlannerInfo, plan: Plan) -> Dict[str, any]:
"""执行单个其他动作"""
start_time = time.time()
success = False
error_message = ""
try:
logger.info(f"执行其他动作: {action_info.action_type}, 原因: {action_info.reasoning}")
# 构建动作参数
action_params = {
"chat_id": plan.chat_id,
"target_message": action_info.action_message,
"reasoning": action_info.reasoning,
"action_data": action_info.action_data or {},
}
# 通过动作管理器执行动作
await self.action_manager.execute_action(
action_name=action_info.action_type,
**action_params
)
success = True
logger.info(f"其他动作执行成功: {action_info.action_type}")
except Exception as e:
error_message = str(e)
logger.error(f"执行其他动作失败: {action_info.action_type}, 错误: {error_message}")
execution_time = time.time() - start_time
self.execution_stats["execution_times"].append(execution_time)
return {
"action_type": action_info.action_type,
"success": success,
"error_message": error_message,
"execution_time": execution_time,
"reasoning": action_info.reasoning,
}
async def _track_user_interaction(self, action_info: ActionPlannerInfo, plan: Plan, reply_content: str):
"""追踪用户交互"""
try:
if not action_info.action_message:
return
for action_info in plan.decided_actions:
if action_info.action_type == "no_action":
logger.info(f"规划器决策不执行动作,原因: {action_info.reasoning}")
continue
# 获取用户信息
user_id = action_info.action_message.user_id
user_name = action_info.action_message.user_nickname or user_id
user_message = action_info.action_message.content
# TODO: 对接 ActionManager 的执行方法
# 这是一个示例调用,需要根据 ActionManager 的最终实现进行调整
logger.info(f"执行动作: {action_info.action_type}, 原因: {action_info.reasoning}")
# await self.action_manager.execute_action(
# action_name=action_info.action_type,
# action_data=action_info.action_data,
# reasoning=action_info.reasoning,
# action_message=action_info.action_message,
# )
# 如果有设置关系追踪器,添加交互记录
if self.relationship_tracker:
self.relationship_tracker.add_interaction(
user_id=user_id,
user_name=user_name,
user_message=user_message,
bot_reply=reply_content,
reply_timestamp=time.time()
)
logger.debug(f"已添加用户交互追踪: {user_id}")
except Exception as e:
logger.error(f"追踪用户交互时出错: {e}")
def get_execution_stats(self) -> Dict[str, any]:
"""获取执行统计信息"""
stats = self.execution_stats.copy()
# 计算平均执行时间
if stats["execution_times"]:
avg_time = sum(stats["execution_times"]) / len(stats["execution_times"])
stats["average_execution_time"] = avg_time
stats["max_execution_time"] = max(stats["execution_times"])
stats["min_execution_time"] = min(stats["execution_times"])
else:
stats["average_execution_time"] = 0
stats["max_execution_time"] = 0
stats["min_execution_time"] = 0
# 移除执行时间列表以避免返回过大数据
stats.pop("execution_times", None)
return stats
def reset_stats(self):
"""重置统计信息"""
self.execution_stats = {
"total_executed": 0,
"successful_executions": 0,
"failed_executions": 0,
"reply_executions": 0,
"other_action_executions": 0,
"execution_times": [],
}
def get_recent_performance(self, limit: int = 10) -> List[Dict[str, any]]:
"""获取最近的执行性能"""
recent_times = self.execution_stats["execution_times"][-limit:]
if not recent_times:
return []
return [
{
"execution_index": i + 1,
"execution_time": time_val,
"timestamp": time.time() - (len(recent_times) - i) * 60, # 估算时间戳
}
for i, time_val in enumerate(recent_times)
]

View File

@@ -2,7 +2,7 @@
PlanGenerator: 负责搜集和汇总所有决策所需的信息,生成一个未经筛选的“原始计划” (Plan)。
"""
import time
from typing import Dict, Optional, Tuple
from typing import Dict
from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat
from src.chat.utils.utils import get_chat_type_and_target_info

View File

@@ -1,6 +1,8 @@
"""
主规划器入口,负责协调 PlanGenerator, PlanFilter, 和 PlanExecutor。
集成兴趣度评分系统和用户关系追踪机制,实现智能化的聊天决策。
"""
import time
from dataclasses import asdict
from typing import Dict, List, Optional, Tuple
@@ -8,36 +10,34 @@ from src.chat.planner_actions.action_manager import ActionManager
from src.chat.planner_actions.plan_executor import PlanExecutor
from src.chat.planner_actions.plan_filter import PlanFilter
from src.chat.planner_actions.plan_generator import PlanGenerator
from src.common.data_models.info_data_model import ActionPlannerInfo
from src.chat.affinity_flow.interest_scoring import InterestScoringSystem
from src.chat.affinity_flow.relationship_tracker import UserRelationshipTracker
from src.common.data_models.info_data_model import Plan
from src.common.logger import get_logger
from src.config.config import global_config
from src.plugin_system.base.component_types import ChatMode
# 导入提示词模块以确保其被初始化
from . import planner_prompts
logger = get_logger("planner")
class ActionPlanner:
"""
ActionPlanner 是规划系统的核心协调器
增强版ActionPlanner,集成兴趣度评分和用户关系追踪机制
它负责整合规划流程的三个主要阶段
1. **生成 (Generate)**: 使用 PlanGenerator 创建一个初始的行动计划。
2. **筛选 (Filter)**: 使用 PlanFilter 对生成的计划进行审查和优化。
3. **执行 (Execute)**: 使用 PlanExecutor 执行最终确定的行动。
Attributes:
chat_id (str): 当前聊天的唯一标识符。
action_manager (ActionManager): 用于执行具体动作的管理器。
generator (PlanGenerator): 负责生成初始计划。
filter (PlanFilter): 负责筛选和优化计划。
executor (PlanExecutor): 负责执行最终计划。
核心功能
1. 兴趣度评分系统:根据兴趣匹配度、关系分、提及度、时间因子对消息评分
2. 用户关系追踪:自动追踪用户交互并更新关系分
3. 智能回复决策:基于兴趣度阈值和连续不回复概率的智能决策
4. 完整的规划流程:生成→筛选→执行的完整三阶段流程
"""
def __init__(self, chat_id: str, action_manager: ActionManager):
"""
初始化 ActionPlanner。
初始化增强版ActionPlanner。
Args:
chat_id (str): 当前聊天的 ID。
@@ -49,48 +49,197 @@ class ActionPlanner:
self.filter = PlanFilter()
self.executor = PlanExecutor(action_manager)
async def plan(
self, mode: ChatMode = ChatMode.FOCUS
) -> Tuple[List[Dict], Optional[Dict]]:
"""
执行从生成到执行的完整规划流程。
# 初始化兴趣度评分系统
self.interest_scoring = InterestScoringSystem()
这个方法按顺序协调生成、筛选和执行三个阶段。
# 初始化用户关系追踪器
self.relationship_tracker = UserRelationshipTracker(self.interest_scoring)
# 设置执行器的关系追踪器
self.executor.set_relationship_tracker(self.relationship_tracker)
# 规划器统计
self.planner_stats = {
"total_plans": 0,
"successful_plans": 0,
"failed_plans": 0,
"replies_generated": 0,
"other_actions_executed": 0,
}
async def plan(self, mode: ChatMode = ChatMode.FOCUS, use_enhanced: bool = True) -> Tuple[List[Dict], Optional[Dict]]:
"""
执行完整的增强版规划流程。
Args:
mode (ChatMode): 当前的聊天模式,默认为 FOCUS。
use_enhanced (bool): 是否使用增强功能,默认为 True。
Returns:
Tuple[List[Dict], Optional[Dict]]: 一个元组,包含:
- final_actions_dict (List[Dict]): 最终确定的动作列表(字典格式)。
- final_target_message_dict (Optional[Dict]): 最终的目标消息(字典格式),如果没有则为 None
这与旧版 planner 的返回值保持兼容。
- final_target_message_dict (Optional[Dict]): 最终的目标消息(字典格式)。
"""
try:
self.planner_stats["total_plans"] += 1
if use_enhanced:
return await self._enhanced_plan_flow(mode)
else:
return await self._standard_plan_flow(mode)
except Exception as e:
logger.error(f"规划流程出错: {e}")
self.planner_stats["failed_plans"] += 1
return [], None
async def _enhanced_plan_flow(self, mode: ChatMode) -> Tuple[List[Dict], Optional[Dict]]:
"""执行增强版规划流程"""
try:
# 1. 生成初始 Plan
initial_plan = await self.generator.generate(mode)
# 2. 兴趣度评分
if initial_plan.chat_history:
bot_nickname = global_config.bot.nickname
interest_scores = self.interest_scoring.calculate_interest_scores(
initial_plan.chat_history, bot_nickname
)
# 3. 根据兴趣度调整可用动作
if interest_scores:
latest_score = max(interest_scores, key=lambda s: s.total_score)
should_reply = self.interest_scoring.should_reply(latest_score)
if not should_reply and "reply" in initial_plan.available_actions:
logger.info(f"消息兴趣度不足({latest_score.total_score:.2f})移除reply动作")
del initial_plan.available_actions["reply"]
self.interest_scoring.record_reply_action(False)
else:
self.interest_scoring.record_reply_action(True)
# 4. 筛选 Plan
filtered_plan = await self.filter.filter(initial_plan)
# 5. 执行 Plan
await self._execute_plan_with_tracking(filtered_plan)
# 6. 检查关系更新
await self.relationship_tracker.check_and_update_relationships()
# 7. 返回结果
return self._build_return_result(filtered_plan)
except Exception as e:
logger.error(f"增强版规划流程出错: {e}")
self.planner_stats["failed_plans"] += 1
return [], None
async def _standard_plan_flow(self, mode: ChatMode) -> Tuple[List[Dict], Optional[Dict]]:
"""执行标准规划流程"""
try:
# 1. 生成初始 Plan
initial_plan = await self.generator.generate(mode)
# 2. 筛选 Plan
filtered_plan = await self.filter.filter(initial_plan)
# 3. 执行 Plan(临时引爆因为它暂时还跑不了)
#await self.executor.execute(filtered_plan)
# 3. 执行 Plan
await self._execute_plan_with_tracking(filtered_plan)
# 4. 返回结果 (与旧版 planner 的返回值保持兼容)
final_actions = filtered_plan.decided_actions or []
# 4. 返回结果
return self._build_return_result(filtered_plan)
except Exception as e:
logger.error(f"标准规划流程出错: {e}")
self.planner_stats["failed_plans"] += 1
return [], None
async def _execute_plan_with_tracking(self, plan: Plan):
"""执行Plan并追踪用户关系"""
if not plan.decided_actions:
return
for action_info in plan.decided_actions:
if action_info.action_type in ["reply", "proactive_reply"] and action_info.action_message:
# 记录用户交互
self.relationship_tracker.add_interaction(
user_id=action_info.action_message.user_id,
user_name=action_info.action_message.user_nickname or action_info.action_message.user_id,
user_message=action_info.action_message.content,
bot_reply="Bot回复内容", # 这里需要实际的回复内容
reply_timestamp=time.time()
)
# 执行动作
try:
await self.action_manager.execute_action(
action_name=action_info.action_type,
chat_id=self.chat_id,
target_message=action_info.action_message,
reasoning=action_info.reasoning,
action_data=action_info.action_data or {},
)
self.planner_stats["successful_plans"] += 1
if action_info.action_type in ["reply", "proactive_reply"]:
self.planner_stats["replies_generated"] += 1
else:
self.planner_stats["other_actions_executed"] += 1
except Exception as e:
logger.error(f"执行动作失败: {action_info.action_type}, 错误: {e}")
def _build_return_result(self, plan: Plan) -> Tuple[List[Dict], Optional[Dict]]:
"""构建返回结果"""
final_actions = plan.decided_actions or []
final_target_message = next(
(act.action_message for act in final_actions if act.action_message), None
)
final_actions_dict = [asdict(act) for act in final_actions]
# action_message现在可能是字典而不是dataclass实例需要特殊处理
if final_target_message:
if hasattr(final_target_message, '__dataclass_fields__'):
# 如果是dataclass实例使用asdict转换
final_target_message_dict = asdict(final_target_message)
else:
# 如果已经是字典,直接使用
final_target_message_dict = final_target_message
else:
final_target_message_dict = None
return final_actions_dict, final_target_message_dict
def get_user_relationship(self, user_id: str) -> float:
"""获取用户关系分"""
return self.interest_scoring.get_user_relationship(user_id)
def update_interest_keywords(self, new_keywords: Dict[str, List[str]]):
"""更新兴趣关键词"""
self.interest_scoring.interest_keywords.update(new_keywords)
logger.info(f"已更新兴趣关键词: {list(new_keywords.keys())}")
def get_planner_stats(self) -> Dict[str, any]:
"""获取规划器统计"""
return self.planner_stats.copy()
def get_interest_scoring_stats(self) -> Dict[str, any]:
"""获取兴趣度评分统计"""
return {
"no_reply_count": self.interest_scoring.no_reply_count,
"max_no_reply_count": self.interest_scoring.max_no_reply_count,
"reply_threshold": self.interest_scoring.reply_threshold,
"mention_threshold": self.interest_scoring.mention_threshold,
"user_relationships": len(self.interest_scoring.user_relationships),
}
def get_relationship_stats(self) -> Dict[str, any]:
"""获取用户关系统计"""
return {
"tracking_users": len(self.relationship_tracker.tracking_users),
"relationship_history": len(self.relationship_tracker.relationship_history),
"max_tracking_users": self.relationship_tracker.max_tracking_users,
}
# 全局兴趣度评分系统实例
interest_scoring_system = InterestScoringSystem()

View File

@@ -25,6 +25,18 @@ class ActionPlannerInfo(BaseDataModel):
available_actions: Optional[Dict[str, "ActionInfo"]] = None
@dataclass
class InterestScore(BaseDataModel):
"""兴趣度评分结果"""
message_id: str
total_score: float
interest_match_score: float
relationship_score: float
mentioned_score: float
time_factor_score: float
details: Dict[str, str]
@dataclass
class Plan(BaseDataModel):
"""