diff --git a/src/chat/affinity_flow/afc_manager.py b/src/chat/affinity_flow/afc_manager.py index 38c50816e..b67ae6939 100644 --- a/src/chat/affinity_flow/afc_manager.py +++ b/src/chat/affinity_flow/afc_manager.py @@ -49,14 +49,14 @@ class AFCManager: return self.affinity_flow_chatters[stream_id] - async def process_message(self, stream_id: str, message_data: dict) -> Dict[str, any]: - """处理消息""" + async def process_stream_context(self, stream_id: str, context) -> Dict[str, any]: + """处理StreamContext对象""" try: # 获取或创建聊天处理器 chatter = self.get_or_create_chatter(stream_id) - # 处理消息 - result = await chatter.process_message(message_data) + # 处理StreamContext + result = await chatter.process_stream_context(context) # 更新统计 self.manager_stats["total_messages_processed"] += 1 @@ -66,20 +66,13 @@ class AFCManager: return result except Exception as e: - logger.error(f"处理消息时出错: {e}\n{traceback.format_exc()}") + logger.error(f"处理StreamContext时出错: {e}\n{traceback.format_exc()}") return { "success": False, "error_message": str(e), "executed_count": 0, } - async def process_messages_batch(self, stream_id: str, messages_data: List[dict]) -> List[Dict[str, any]]: - """批量处理消息""" - results = [] - for message_data in messages_data: - result = await self.process_message(stream_id, message_data) - results.append(result) - return results def get_chatter_stats(self, stream_id: str) -> Optional[Dict[str, any]]: """获取聊天处理器统计""" diff --git a/src/chat/affinity_flow/chatter.py b/src/chat/affinity_flow/chatter.py index 8514350b2..a613a143e 100644 --- a/src/chat/affinity_flow/chatter.py +++ b/src/chat/affinity_flow/chatter.py @@ -42,28 +42,31 @@ class AffinityFlowChatter: } self.last_activity_time = time.time() - async def process_message(self, message_data: dict) -> Dict[str, any]: + async def process_stream_context(self, context) -> Dict[str, any]: """ - 处理单个消息 + 处理StreamContext对象 Args: - message_data: 消息数据字典,包含: - - message_info: 消息基本信息 - - processed_plain_text: 处理后的纯文本 - - context_messages: 上下文消息(历史+未读) - - unread_messages: 未读消息列表 + context: StreamContext对象,包含聊天流的所有消息信息 Returns: 处理结果字典 """ try: - # 提取未读消息用于兴趣度计算 - unread_messages = message_data.get("unread_messages", []) + # 获取未读消息和历史消息 + unread_messages = context.get_unread_messages() + history_messages = context.get_history_messages() - # 使用增强版规划器处理消息,传递未读消息用于兴趣度计算 + # 准备消息数据 + message_data = { + "unread_messages": unread_messages, + "history_messages": history_messages + } + + # 使用增强版规划器处理消息 actions, target_message = await self.planner.plan( mode=ChatMode.FOCUS, - unread_messages=unread_messages + message_data=message_data ) self.stats["plans_created"] += 1 @@ -76,7 +79,7 @@ class AffinityFlowChatter: # 更新统计 self.stats["messages_processed"] += 1 self.stats["actions_executed"] += execution_result.get("executed_count", 0) - self.stats["successful_executions"] += 1 # TODO:假设成功 + self.stats["successful_executions"] += 1 self.last_activity_time = time.time() result = { @@ -89,12 +92,12 @@ class AffinityFlowChatter: **execution_result, } - logger.info(f"聊天流 {self.stream_id} 消息处理成功: 动作数={result['actions_count']}, 未读消息={result['unread_messages_processed']}") + logger.info(f"聊天流 {self.stream_id} StreamContext处理成功: 动作数={result['actions_count']}, 未读消息={result['unread_messages_processed']}") return result except Exception as e: - logger.error(f"亲和力聊天处理器 {self.stream_id} 处理消息时出错: {e}\n{traceback.format_exc()}") + logger.error(f"亲和力聊天处理器 {self.stream_id} 处理StreamContext时出错: {e}\n{traceback.format_exc()}") self.stats["failed_executions"] += 1 self.last_activity_time = time.time() diff --git a/src/chat/affinity_flow/interest_scoring.py b/src/chat/affinity_flow/interest_scoring.py index fd05bb22a..9a35bfaaa 100644 --- a/src/chat/affinity_flow/interest_scoring.py +++ b/src/chat/affinity_flow/interest_scoring.py @@ -30,13 +30,13 @@ class InterestScoringSystem: } # 评分阈值 - self.reply_threshold = 0.55 # 默认回复阈值 + self.reply_threshold = 0.56 # 默认回复阈值 self.mention_threshold = 0.3 # 提及阈值 # 连续不回复概率提升 self.no_reply_count = 0 - self.max_no_reply_count = 15 - self.probability_boost_per_no_reply = 0.01 # 每次不回复增加15%概率 + self.max_no_reply_count = 20 + self.probability_boost_per_no_reply = 0.02 # 每次不回复增加5%概率 # 用户关系数据 self.user_relationships: Dict[str, float] = {} # user_id -> relationship_score @@ -149,7 +149,7 @@ class InterestScoringSystem: # 返回匹配分数,考虑置信度和匹配标签数量 match_count_bonus = min(len(match_result.matched_tags) * 0.05, 0.3) # 每多匹配一个标签+0.05,最高+0.3 - final_score = match_result.overall_score * 1.3 * match_result.confidence + match_count_bonus + final_score = match_result.overall_score * 1.15 * match_result.confidence + match_count_bonus logger.debug(f"⚖️ 最终分数计算: 总分({match_result.overall_score:.3f}) × 1.3 × 置信度({match_result.confidence:.3f}) + 标签数量奖励({match_count_bonus:.3f}) = {final_score:.3f}") return final_score else: @@ -227,7 +227,7 @@ class InterestScoringSystem: return 0.0 if msg.is_mentioned or (bot_nickname and bot_nickname in msg.processed_plain_text): - return 3.0 + return 1.0 return 0.0 @@ -273,7 +273,7 @@ class InterestScoringSystem: old_count = self.no_reply_count if did_reply: - self.no_reply_count = max(0, self.no_reply_count - 1) + self.no_reply_count = max(0, self.no_reply_count - 3) action = "✅ reply动作可用" else: self.no_reply_count += 1 diff --git a/src/chat/interest_system/bot_interest_manager.py b/src/chat/interest_system/bot_interest_manager.py index 4b2ea8a70..1d654efc9 100644 --- a/src/chat/interest_system/bot_interest_manager.py +++ b/src/chat/interest_system/bot_interest_manager.py @@ -433,9 +433,9 @@ class BotInterestManager: low_similarity_count = 0 # 分级相似度阈值 - high_threshold = 0.5 - medium_threshold = 0.3 - low_threshold = 0.15 + high_threshold = 0.55 + medium_threshold = 0.47 + low_threshold = 0.3 logger.debug(f"🔍 使用分级相似度阈值: 高={high_threshold}, 中={medium_threshold}, 低={low_threshold}") @@ -449,7 +449,7 @@ class BotInterestManager: # 根据相似度等级应用不同的加成 if similarity > high_threshold: # 高相似度:强加成 - enhanced_score = weighted_score * 1.5 + enhanced_score = weighted_score * 1.8 match_count += 1 high_similarity_count += 1 result.add_match(tag.tag_name, enhanced_score, [tag.tag_name]) @@ -457,7 +457,7 @@ class BotInterestManager: elif similarity > medium_threshold: # 中相似度:中等加成 - enhanced_score = weighted_score * 1.2 + enhanced_score = weighted_score * 1.4 match_count += 1 medium_similarity_count += 1 result.add_match(tag.tag_name, enhanced_score, [tag.tag_name]) @@ -465,7 +465,7 @@ class BotInterestManager: elif similarity > low_threshold: # 低相似度:轻微加成 - enhanced_score = weighted_score * 1.05 + enhanced_score = weighted_score * 1.15 match_count += 1 low_similarity_count += 1 result.add_match(tag.tag_name, enhanced_score, [tag.tag_name]) diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index 4d073ac10..503d92c03 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -5,13 +5,16 @@ import asyncio import time import traceback -from typing import Dict, Optional, Any +from typing import Dict, Optional, Any, TYPE_CHECKING from src.common.logger import get_logger from src.common.data_models.database_data_model import DatabaseMessages from src.common.data_models.message_manager_data_model import StreamContext, MessageManagerStats, StreamStats from src.chat.affinity_flow.afc_manager import afc_manager +if TYPE_CHECKING: + from src.common.data_models.message_manager_data_model import StreamContext + logger = get_logger("message_manager") @@ -120,44 +123,21 @@ class MessageManager: logger.debug(f"开始处理聊天流 {stream_id} 的 {len(unread_messages)} 条未读消息") - # 获取上下文消息 - context_messages = context.get_context_messages() - - # 批量处理消息 - messages_data = [] - for msg in unread_messages: - message_data = { - "message_info": { - "platform": msg.user_info.platform, - "user_info": { - "user_id": msg.user_info.user_id, - "user_nickname": msg.user_info.user_nickname, - "user_cardname": msg.user_info.user_cardname, - "platform": msg.user_info.platform - }, - "group_info": { - "group_id": msg.group_info.group_id, - "group_name": msg.group_info.group_name, - "group_platform": msg.group_info.group_platform - } if msg.group_info else None - }, - "processed_plain_text": msg.processed_plain_text, - "context_messages": [ctx_msg.flatten() for ctx_msg in context_messages], - "unread_messages": unread_messages # 传递原始对象而不是字典 - } - messages_data.append(message_data) - - # 发送到AFC处理器 - if messages_data: - results = await afc_manager.process_messages_batch(stream_id, messages_data) - - # 处理结果,标记消息为已读 - for i, result in enumerate(results): - if result.get("success", False): - msg_id = unread_messages[i].message_id - context.mark_message_as_read(msg_id) - self.stats.total_processed_messages += 1 - logger.debug(f"消息 {msg_id} 处理完成,标记为已读") + # 直接使用StreamContext对象进行处理 + if unread_messages: + try: + # 发送到AFC处理器,传递StreamContext对象 + results = await afc_manager.process_stream_context(stream_id, context) + + # 处理结果,标记消息为已读 + if results.get("success", False): + self._clear_all_unread_messages(context) + + except Exception as e: + # 发生异常时,清除所有未读消息,防止意外关闭等导致消息一直未读 + logger.error(f"处理聊天流 {stream_id} 时发生异常,将清除所有未读消息: {e}") + self._clear_all_unread_messages(context) + raise logger.debug(f"聊天流 {stream_id} 消息处理完成") @@ -227,6 +207,23 @@ class MessageManager: del self.stream_contexts[stream_id] logger.info(f"清理不活跃聊天流: {stream_id}") + def _clear_all_unread_messages(self, context: StreamContext): + """清除指定上下文中的所有未读消息,防止意外情况导致消息一直未读""" + unread_messages = context.get_unread_messages() + if not unread_messages: + return + + logger.warning(f"正在清除 {len(unread_messages)} 条未读消息") + + # 将所有未读消息标记为已读并移动到历史记录 + for msg in unread_messages[:]: # 使用切片复制避免迭代时修改列表 + try: + context.mark_message_as_read(msg.message_id) + self.stats.total_processed_messages += 1 + logger.info(f"强制清除消息 {msg.message_id},标记为已读") + except Exception as e: + logger.error(f"清除消息 {msg.message_id} 时出错: {e}") + # 创建全局消息管理器实例 message_manager = MessageManager() \ No newline at end of file diff --git a/src/chat/planner_actions/plan_filter.py b/src/chat/planner_actions/plan_filter.py index ee1f8e843..2c0802116 100644 --- a/src/chat/planner_actions/plan_filter.py +++ b/src/chat/planner_actions/plan_filter.py @@ -1,6 +1,7 @@ """ PlanFilter: 接收 Plan 对象,根据不同模式的逻辑进行筛选,决定最终要执行的动作。 """ + import orjson import time import traceback @@ -33,9 +34,7 @@ class PlanFilter: """ def __init__(self): - self.planner_llm = LLMRequest( - model_set=model_config.model_task_config.planner, request_type="planner" - ) + self.planner_llm = LLMRequest(model_set=model_config.model_task_config.planner, request_type="planner") self.last_obs_time_mark = 0.0 async def filter(self, reply_not_available: bool, plan: Plan) -> Plan: @@ -55,9 +54,9 @@ class PlanFilter: try: parsed_json = orjson.loads(repair_json(llm_content)) except orjson.JSONDecodeError: - prased_json = {"action": "no_action", "reason": "返回内容无法解析为JSON"} + parsed_json = {"action": "no_action", "reason": "返回内容无法解析为JSON"} logger.debug(f"墨墨在这里加了日志 -> 解析后的 JSON: {parsed_json}") - + if "reply" in plan.available_actions and reply_not_available: # 如果reply动作不可用,但llm返回的仍然有reply,则改为no_reply if isinstance(parsed_json, dict) and parsed_json.get("action") == "reply": @@ -86,28 +85,18 @@ class PlanFilter: if action_type in reply_action_types: if not reply_action_added: - final_actions.extend( - await self._parse_single_action( - item, used_message_id_list, plan - ) - ) + final_actions.extend(await self._parse_single_action(item, used_message_id_list, plan)) reply_action_added = True else: # 非回复类动作直接添加 - final_actions.extend( - await self._parse_single_action( - item, used_message_id_list, plan - ) - ) - + final_actions.extend(await self._parse_single_action(item, used_message_id_list, plan)) + plan.decided_actions = self._filter_no_actions(final_actions) except Exception as e: logger.error(f"筛选 Plan 时出错: {e}\n{traceback.format_exc()}") - plan.decided_actions = [ - ActionPlannerInfo(action_type="no_action", reasoning=f"筛选时出错: {e}") - ] - + plan.decided_actions = [ActionPlannerInfo(action_type="no_action", reasoning=f"筛选时出错: {e}")] + logger.debug(f"墨墨在这里加了日志 -> filter 出口 decided_actions: {plan.decided_actions}") return plan @@ -136,7 +125,7 @@ class PlanFilter: if plan.mode == ChatMode.PROACTIVE: long_term_memory_block = await self._get_long_term_memory_context() - + chat_content_block, message_id_list = build_readable_messages_with_id( messages=[msg.flatten() for msg in plan.chat_history], timestamp_mode="normal", @@ -165,7 +154,13 @@ class PlanFilter: ) return prompt, message_id_list - chat_content_block, message_id_list = build_readable_messages_with_id( + # 构建已读/未读历史消息 + read_history_block, unread_history_block, message_id_list = await self._build_read_unread_history_blocks( + plan + ) + + # 为了兼容性,保留原有的chat_content_block + chat_content_block, _ = build_readable_messages_with_id( messages=[msg.flatten() for msg in plan.chat_history], timestamp_mode="normal", read_mark=self.last_obs_time_mark, @@ -232,8 +227,8 @@ class PlanFilter: custom_prompt_block = "" if global_config.custom_prompt.planner_custom_prompt_content: custom_prompt_block = global_config.custom_prompt.planner_custom_prompt_content - - users_in_chat_str = "" # TODO: Re-implement user list fetching if needed + + users_in_chat_str = "" # TODO: Re-implement user list fetching if needed planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt") prompt = planner_prompt_template.format( @@ -241,7 +236,8 @@ class PlanFilter: mood_block=mood_block, time_block=time_block, chat_context_description=chat_context_description, - chat_content_block=chat_content_block, + read_history_block=read_history_block, + unread_history_block=unread_history_block, actions_before_now_block=actions_before_now_block, mentioned_bonus=mentioned_bonus, no_action_block=no_action_block, @@ -250,7 +246,7 @@ class PlanFilter: identity_block=identity_block, custom_prompt_block=custom_prompt_block, bot_name=bot_name, - users_in_chat=users_in_chat_str + users_in_chat=users_in_chat_str, ) return prompt, message_id_list except Exception as e: @@ -258,6 +254,114 @@ class PlanFilter: logger.error(traceback.format_exc()) return "构建 Planner Prompt 时出错", [] + async def _build_read_unread_history_blocks(self, plan: Plan) -> tuple[str, str, list]: + """构建已读/未读历史消息块""" + try: + # 从message_manager获取真实的已读/未读消息 + from src.chat.message_manager.message_manager import message_manager + from src.chat.utils.utils import assign_message_ids + + # 获取聊天流的上下文 + stream_context = message_manager.stream_contexts.get(plan.chat_id) + if not stream_context: + # 如果没有找到对应的上下文,使用兼容性处理 + return await self._fallback_build_history_blocks(plan) + + # 获取真正的已读和未读消息 + read_messages = stream_context.history_messages # 已读消息存储在history_messages中 + unread_messages = stream_context.get_unread_messages() # 获取未读消息 + + # 构建已读历史消息块 + if read_messages: + read_content, read_ids = build_readable_messages_with_id( + messages=[msg.flatten() for msg in read_messages[-50:]], # 限制数量 + timestamp_mode="normal_no_YMD", + truncate=False, + show_actions=False, + ) + read_history_block = f"{read_content}" + else: + read_history_block = "暂无已读历史消息" + + # 构建未读历史消息块(包含兴趣度) + if unread_messages: + # 扁平化未读消息用于计算兴趣度和格式化 + flattened_unread = [msg.flatten() for msg in unread_messages] + + # 尝试获取兴趣度评分(返回以真实 message_id 为键的字典) + interest_scores = await self._get_interest_scores_for_messages(flattened_unread) + + # 为未读消息分配短 id(保持与 build_readable_messages_with_id 的一致结构) + message_id_list = assign_message_ids(flattened_unread) + + unread_lines = [] + for idx, msg in enumerate(flattened_unread): + mapped = message_id_list[idx] + synthetic_id = mapped.get("id") + original_msg_id = msg.get("message_id") or msg.get("id") + msg_time = time.strftime("%H:%M:%S", time.localtime(msg.get("time", time.time()))) + msg_content = msg.get("processed_plain_text", "") + + # 添加兴趣度信息 + interest_score = interest_scores.get(original_msg_id, 0.0) + interest_text = f" [兴趣度: {interest_score:.3f}]" if interest_score > 0 else "" + + # 在未读行中显示合成id,方便 planner 返回时使用 + unread_lines.append(f"{msg_time} {synthetic_id}: {msg_content}{interest_text}") + + unread_history_block = "\n".join(unread_lines) + else: + unread_history_block = "暂无未读历史消息" + + return read_history_block, unread_history_block, message_id_list + + except Exception as e: + logger.error(f"构建已读/未读历史消息块时出错: {e}") + return "构建已读历史消息时出错", "构建未读历史消息时出错", [] + + async def _get_interest_scores_for_messages(self, messages: List[dict]) -> dict[str, float]: + """为消息获取兴趣度评分""" + interest_scores = {} + + try: + from src.chat.affinity_flow.interest_scoring import interest_scoring_system + from src.common.data_models.database_data_model import DatabaseMessages + + # 转换消息格式 + db_messages = [] + for msg_dict in messages: + try: + db_msg = DatabaseMessages( + message_id=msg_dict.get("message_id", ""), + time=msg_dict.get("time", time.time()), + chat_id=msg_dict.get("chat_id", ""), + processed_plain_text=msg_dict.get("processed_plain_text", ""), + user_id=msg_dict.get("user_id", ""), + user_nickname=msg_dict.get("user_nickname", ""), + user_platform=msg_dict.get("platform", "qq"), + chat_info_group_id=msg_dict.get("group_id", ""), + chat_info_group_name=msg_dict.get("group_name", ""), + chat_info_group_platform=msg_dict.get("platform", "qq"), + ) + db_messages.append(db_msg) + except Exception as e: + logger.warning(f"转换消息格式失败: {e}") + continue + + # 计算兴趣度评分 + if db_messages: + bot_nickname = global_config.bot.nickname or "麦麦" + scores = await interest_scoring_system.calculate_interest_scores(db_messages, bot_nickname) + + # 构建兴趣度字典 + for score in scores: + interest_scores[score.message_id] = score.total_score + + except Exception as e: + logger.warning(f"获取兴趣度评分失败: {e}") + + return interest_scores + async def _parse_single_action( self, action_json: dict, message_id_list: list, plan: Plan ) -> List[ActionPlannerInfo]: @@ -281,13 +385,18 @@ class PlanFilter: else: # 如果找不到目标消息,对于reply动作来说这是必需的,应该记录警告 if action == "reply": - logger.warning(f"reply动作找不到目标消息,target_message_id: {action_json.get('target_message_id')}") + logger.warning( + f"reply动作找不到目标消息,target_message_id: {action_json.get('target_message_id')}" + ) # 将reply动作改为no_action,避免后续执行时出错 action = "no_action" reasoning = f"找不到目标消息进行回复。原始理由: {reasoning}" available_action_names = list(plan.available_actions.keys()) - if action not in ["no_action", "no_reply", "reply", "do_nothing", "proactive_reply"] and action not in available_action_names: + if ( + action not in ["no_action", "no_reply", "reply", "do_nothing", "proactive_reply"] + and action not in available_action_names + ): reasoning = f"LLM 返回了当前不可用的动作 '{action}'。原始理由: {reasoning}" action = "no_action" @@ -310,9 +419,7 @@ class PlanFilter: ) return parsed_actions - def _filter_no_actions( - self, action_list: List[ActionPlannerInfo] - ) -> List[ActionPlannerInfo]: + def _filter_no_actions(self, action_list: List[ActionPlannerInfo]) -> List[ActionPlannerInfo]: non_no_actions = [a for a in action_list if a.action_type not in ["no_action", "no_reply"]] if non_no_actions: return non_no_actions @@ -361,11 +468,47 @@ class PlanFilter: return action_options_block def _find_message_by_id(self, message_id: str, message_id_list: list) -> Optional[Dict[str, Any]]: + # 兼容多种 message_id 格式:数字、m123、buffered-xxxx + # 如果是纯数字,补上 m 前缀以兼容旧格式 + candidate_ids = {message_id} if message_id.isdigit(): - message_id = f"m{message_id}" + candidate_ids.add(f"m{message_id}") + + # 如果是 m 开头且后面是数字,尝试去掉 m 前缀的数字形式 + if message_id.startswith("m") and message_id[1:].isdigit(): + candidate_ids.add(message_id[1:]) + + # 逐项匹配 message_id_list(每项可能为 {'id':..., 'message':...}) for item in message_id_list: - if item.get("id") == message_id: + # 支持 message_id_list 中直接是字符串/ID 的情形 + if isinstance(item, str): + if item in candidate_ids: + # 没有 message 对象,返回None + return None + continue + + if not isinstance(item, dict): + continue + + item_id = item.get("id") + # 直接匹配分配的短 id + if item_id and item_id in candidate_ids: return item.get("message") + + # 有时 message 存储里会有原始的 message_id 字段(如 buffered-xxxx) + message_obj = item.get("message") + if isinstance(message_obj, dict): + orig_mid = message_obj.get("message_id") or message_obj.get("id") + if orig_mid and orig_mid in candidate_ids: + return message_obj + + # 作为兜底,尝试在 message_id_list 中找到 message.message_id 匹配 + for item in message_id_list: + if isinstance(item, dict) and isinstance(item.get("message"), dict): + mid = item["message"].get("message_id") or item["message"].get("id") + if mid == message_id: + return item["message"] + return None def _get_latest_message(self, message_id_list: list) -> Optional[Dict[str, Any]]: diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py index 1ae92b8c2..e4e8b216a 100644 --- a/src/chat/planner_actions/planner.py +++ b/src/chat/planner_actions/planner.py @@ -64,13 +64,15 @@ class ActionPlanner: "other_actions_executed": 0, } - async def plan(self, mode: ChatMode = ChatMode.FOCUS, unread_messages: List[Dict] = None) -> Tuple[List[Dict], Optional[Dict]]: + async def plan(self, mode: ChatMode = ChatMode.FOCUS, message_data: dict = None) -> Tuple[List[Dict], Optional[Dict]]: """ 执行完整的增强版规划流程。 Args: mode (ChatMode): 当前的聊天模式,默认为 FOCUS。 - unread_messages (List[Dict]): 未读消息列表,用于兴趣度计算。 + message_data (dict): 消息数据字典,包含: + - unread_messages: 未读消息列表 + - history_messages: 历史消息列表(可选) Returns: Tuple[List[Dict], Optional[Dict]]: 一个元组,包含: @@ -78,6 +80,8 @@ class ActionPlanner: - final_target_message_dict (Optional[Dict]): 最终的目标消息(字典格式)。 """ try: + # 提取未读消息用于兴趣度计算 + unread_messages = message_data.get("unread_messages", []) if message_data else [] self.planner_stats["total_plans"] += 1 return await self._enhanced_plan_flow(mode, unread_messages or []) @@ -118,12 +122,13 @@ class ActionPlanner: logger.info(f"❌ 兴趣度不足阈值的80%: {score:.3f} < {threshold_requirement:.3f},直接返回no_action") logger.info(f"📊 最低要求: 阈值({base_threshold:.3f}) × 0.8 = {threshold_requirement:.3f}") # 直接返回 no_action - no_action = { - "action_type": "no_action", - "reason": f"兴趣度评分 {score:.3f} 未达阈值80% {threshold_requirement:.3f}", - "action_data": {}, - "action_message": None, - } + from src.common.data_models.info_data_model import ActionPlannerInfo + no_action = ActionPlannerInfo( + action_type="no_action", + reasoning=f"兴趣度评分 {score:.3f} 未达阈值80% {threshold_requirement:.3f}", + action_data={}, + action_message=None, + ) filtered_plan = initial_plan filtered_plan.decided_actions = [no_action] else: diff --git a/src/chat/planner_actions/planner_prompts.py b/src/chat/planner_actions/planner_prompts.py index 29ef4b916..7e509c8c8 100644 --- a/src/chat/planner_actions/planner_prompts.py +++ b/src/chat/planner_actions/planner_prompts.py @@ -4,6 +4,7 @@ 通过将提示词与代码逻辑分离,可以更方便地对模型的行为进行迭代和优化, 而无需修改核心代码。 """ + from src.chat.utils.prompt import Prompt @@ -25,7 +26,12 @@ def init_prompts(): {users_in_chat} {custom_prompt_block} {chat_context_description},以下是具体的聊天内容。 -{chat_content_block} + +## 📜 已读历史消息(仅供参考) +{read_history_block} + +## 📬 未读历史消息(动作执行对象) +{unread_history_block} {moderation_prompt} @@ -35,10 +41,13 @@ def init_prompts(): 2. **辅助动作 (可选)**: 这是为了增强表达效果的附加动作,例如 `emoji`(发送表情包)或 `poke_user`(戳一戳)。 **决策流程:** -1. 首先,决定是否要进行 `reply`(如果有)。 -2. 然后,评估当前的对话气氛和用户情绪,判断是否需要一个**辅助动作**来让你的回应更生动、更符合你的性格。 -3. 如果需要,选择一个最合适的辅助动作与 `reply`(如果有) 组合。 -4. 如果用户明确要求了某个动作,请务必优先满足。 +1. **重要:已读历史消息仅作为当前聊天情景的参考,帮助你理解对话上下文。** +2. **重要:所有动作的执行对象只能是未读历史消息中的消息,不能对已读消息执行动作。** +3. 在未读历史消息中,优先对兴趣值高的消息做出动作(兴趣值标注在消息末尾)。 +4. 首先,决定是否要对未读消息进行 `reply`(如果有)。 +5. 然后,评估当前的对话气氛和用户情绪,判断是否需要一个**辅助动作**来让你的回应更生动、更符合你的性格。 +6. 如果需要,选择一个最合适的辅助动作与 `reply`(如果有) 组合。 +7. 如果用户明确要求了某个动作,请务必优先满足。 **如果可选动作中没有reply,请不要使用** @@ -77,7 +86,9 @@ def init_prompts(): ] **重要规则:** -当 `reply` 和 `emoji` 动作同时被选择时,`emoji` 动作的 `reason` 字段必须包含 `reply` 动作最终生成的回复文本内容。你需要将 `` 占位符替换为 `reply` 动作的 `reason` 字段内容,以确保表情包的选择与回复文本高度相关。 +1. 当 `reply` 和 `emoji` 动作同时被选择时,`emoji` 动作的 `reason` 字段必须包含 `reply` 动作最终生成的回复文本内容。你需要将 `` 占位符替换为 `reply` 动作的 `reason` 字段内容,以确保表情包的选择与回复文本高度相关。 +2. **动作执行限制:所有动作的target_message_id必须是未读历史消息中的消息ID(消息ID格式:m123)。** +3. **兴趣度优先:在多个未读消息中,优先选择兴趣值高的消息进行回复。** 不要输出markdown格式```json等内容,直接输出且仅包含 JSON 列表内容: """, @@ -161,4 +172,4 @@ def init_prompts(): # 在模块加载时自动调用,完成提示词的注册。 -init_prompts() \ No newline at end of file +init_prompts() diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index d2a8eb850..03818a1d8 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -83,13 +83,13 @@ def init_prompt(): - {schedule_block} ## 历史记录 -### 当前群聊中的所有人的聊天记录: -{background_dialogue_prompt} +### 📜 已读历史消息(仅供参考) +{read_history_prompt} {cross_context_block} -### 当前群聊中正在与你对话的聊天记录 -{core_dialogue_prompt} +### 📬 未读历史消息(动作执行对象) +{unread_history_prompt} ## 表达方式 - *你需要参考你的回复风格:* @@ -119,6 +119,11 @@ def init_prompt(): ## 规则 {safety_guidelines_block} +**重要提醒:** +- **已读历史消息仅作为当前聊天情景的参考** +- **动作执行对象只能是未读历史消息中的消息** +- **请优先对兴趣值高的消息做出回复**(兴趣度标注在未读消息末尾) + 在回应之前,首先分析消息的针对性: 1. **直接针对你**:@你、回复你、明确询问你 → 必须回应 2. **间接相关**:涉及你感兴趣的话题但未直接问你 → 谨慎参与 @@ -657,78 +662,195 @@ class DefaultReplyer: duration = end_time - start_time return name, result, duration - def build_s4u_chat_history_prompts( - self, message_list_before_now: List[Dict[str, Any]], target_user_id: str, sender: str + async def build_s4u_chat_history_prompts( + self, message_list_before_now: List[Dict[str, Any]], target_user_id: str, sender: str, chat_id: str ) -> Tuple[str, str]: """ - 构建 s4u 风格的分离对话 prompt + 构建 s4u 风格的已读/未读历史消息 prompt Args: message_list_before_now: 历史消息列表 target_user_id: 目标用户ID(当前对话对象) + sender: 发送者名称 + chat_id: 聊天ID Returns: - Tuple[str, str]: (核心对话prompt, 背景对话prompt) + Tuple[str, str]: (已读历史消息prompt, 未读历史消息prompt) """ - core_dialogue_list = [] + try: + # 从message_manager获取真实的已读/未读消息 + from src.chat.message_manager.message_manager import message_manager + + # 获取聊天流的上下文 + stream_context = message_manager.stream_contexts.get(chat_id) + if stream_context: + # 使用真正的已读和未读消息 + read_messages = stream_context.history_messages # 已读消息 + unread_messages = stream_context.get_unread_messages() # 未读消息 + + # 构建已读历史消息 prompt + read_history_prompt = "" + if read_messages: + read_content = build_readable_messages( + [msg.flatten() for msg in read_messages[-50:]], # 限制数量 + replace_bot_name=True, + timestamp_mode="normal_no_YMD", + truncate=True, + ) + read_history_prompt = f"这是已读历史消息,仅作为当前聊天情景的参考:\n{read_content}" + else: + read_history_prompt = "暂无已读历史消息" + + # 构建未读历史消息 prompt(包含兴趣度) + unread_history_prompt = "" + if unread_messages: + # 尝试获取兴趣度评分 + interest_scores = await self._get_interest_scores_for_messages([msg.flatten() for msg in unread_messages]) + + unread_lines = [] + for msg in unread_messages: + msg_id = msg.message_id + msg_time = time.strftime('%H:%M:%S', time.localtime(msg.time)) + msg_content = msg.processed_plain_text + + # 添加兴趣度信息 + interest_score = interest_scores.get(msg_id, 0.0) + interest_text = f" [兴趣度: {interest_score:.3f}]" if interest_score > 0 else "" + + unread_lines.append(f"{msg_time}: {msg_content}{interest_text}") + + unread_history_prompt_str = "\n".join(unread_lines) + unread_history_prompt = f"这是未读历史消息,包含兴趣度评分,请优先对兴趣值高的消息做出动作:\n{unread_history_prompt_str}" + else: + unread_history_prompt = "暂无未读历史消息" + + return read_history_prompt, unread_history_prompt + else: + # 回退到传统方法 + return await self._fallback_build_chat_history_prompts(message_list_before_now, target_user_id, sender) + + except Exception as e: + logger.warning(f"获取已读/未读历史消息失败,使用回退方法: {e}") + return await self._fallback_build_chat_history_prompts(message_list_before_now, target_user_id, sender) + + async def _fallback_build_chat_history_prompts( + self, message_list_before_now: List[Dict[str, Any]], target_user_id: str, sender: str + ) -> Tuple[str, str]: + """ + 回退的已读/未读历史消息构建方法 + """ + # 通过is_read字段分离已读和未读消息 + read_messages = [] + unread_messages = [] bot_id = str(global_config.bot.qq_account) - # 过滤消息:分离bot和目标用户的对话 vs 其他用户的对话 for msg_dict in message_list_before_now: try: msg_user_id = str(msg_dict.get("user_id")) - reply_to = msg_dict.get("reply_to", "") - _platform, reply_to_user_id = self._parse_reply_target(reply_to) - if (msg_user_id == bot_id and reply_to_user_id == target_user_id) or msg_user_id == target_user_id: - # bot 和目标用户的对话 - core_dialogue_list.append(msg_dict) + if msg_dict.get("is_read", False): + read_messages.append(msg_dict) + else: + unread_messages.append(msg_dict) except Exception as e: logger.error(f"处理消息记录时出错: {msg_dict}, 错误: {e}") - # 构建背景对话 prompt - all_dialogue_prompt = "" - if message_list_before_now: - latest_25_msgs = message_list_before_now[-int(global_config.chat.max_context_size) :] - all_dialogue_prompt_str = build_readable_messages( - latest_25_msgs, + # 如果没有is_read字段,使用原有的逻辑 + if not read_messages and not unread_messages: + # 使用原有的核心对话逻辑 + core_dialogue_list = [] + for msg_dict in message_list_before_now: + try: + msg_user_id = str(msg_dict.get("user_id")) + reply_to = msg_dict.get("reply_to", "") + _platform, reply_to_user_id = self._parse_reply_target(reply_to) + if (msg_user_id == bot_id and reply_to_user_id == target_user_id) or msg_user_id == target_user_id: + core_dialogue_list.append(msg_dict) + except Exception as e: + logger.error(f"处理消息记录时出错: {msg_dict}, 错误: {e}") + + read_messages = [msg for msg in message_list_before_now if msg not in core_dialogue_list] + unread_messages = core_dialogue_list + + # 构建已读历史消息 prompt + read_history_prompt = "" + if read_messages: + read_content = build_readable_messages( + read_messages[-50:], replace_bot_name=True, - timestamp_mode="normal", + timestamp_mode="normal_no_YMD", truncate=True, ) - all_dialogue_prompt = f"所有用户的发言:\n{all_dialogue_prompt_str}" + read_history_prompt = f"这是已读历史消息,仅作为当前聊天情景的参考:\n{read_content}" + else: + read_history_prompt = "暂无已读历史消息" - # 构建核心对话 prompt - core_dialogue_prompt = "" - if core_dialogue_list: - # 检查最新五条消息中是否包含bot自己说的消息 - latest_5_messages = core_dialogue_list[-5:] if len(core_dialogue_list) >= 5 else core_dialogue_list - has_bot_message = any(str(msg.get("user_id")) == bot_id for msg in latest_5_messages) + # 构建未读历史消息 prompt + unread_history_prompt = "" + if unread_messages: + # 尝试获取兴趣度评分 + interest_scores = await self._get_interest_scores_for_messages(unread_messages) - # logger.info(f"最新五条消息:{latest_5_messages}") - # logger.info(f"最新五条消息中是否包含bot自己说的消息:{has_bot_message}") + unread_lines = [] + for msg in unread_messages: + msg_id = msg.get("message_id", "") + msg_time = time.strftime('%H:%M:%S', time.localtime(msg.get("time", time.time()))) + msg_content = msg.get("processed_plain_text", "") - # 如果最新五条消息中不包含bot的消息,则返回空字符串 - if not has_bot_message: - core_dialogue_prompt = "" - else: - core_dialogue_list = core_dialogue_list[-int(global_config.chat.max_context_size * 2) :] # 限制消息数量 + # 添加兴趣度信息 + interest_score = interest_scores.get(msg_id, 0.0) + interest_text = f" [兴趣度: {interest_score:.3f}]" if interest_score > 0 else "" - core_dialogue_prompt_str = build_readable_messages( - core_dialogue_list, - replace_bot_name=True, - merge_messages=False, - timestamp_mode="normal_no_YMD", - read_mark=0.0, - truncate=True, - show_actions=True, - ) - core_dialogue_prompt = f"""-------------------------------- -这是你和{sender}的对话,你们正在交流中: -{core_dialogue_prompt_str} --------------------------------- -""" + unread_lines.append(f"{msg_time}: {msg_content}{interest_text}") - return core_dialogue_prompt, all_dialogue_prompt + unread_history_prompt_str = "\n".join(unread_lines) + unread_history_prompt = f"这是未读历史消息,包含兴趣度评分,请优先对兴趣值高的消息做出动作:\n{unread_history_prompt_str}" + else: + unread_history_prompt = "暂无未读历史消息" + + return read_history_prompt, unread_history_prompt + + async def _get_interest_scores_for_messages(self, messages: List[dict]) -> dict[str, float]: + """为消息获取兴趣度评分""" + interest_scores = {} + + try: + from src.chat.affinity_flow.interest_scoring import interest_scoring_system + from src.common.data_models.database_data_model import DatabaseMessages + + # 转换消息格式 + db_messages = [] + for msg_dict in messages: + try: + db_msg = DatabaseMessages( + message_id=msg_dict.get("message_id", ""), + time=msg_dict.get("time", time.time()), + chat_id=msg_dict.get("chat_id", ""), + processed_plain_text=msg_dict.get("processed_plain_text", ""), + user_id=msg_dict.get("user_id", ""), + user_nickname=msg_dict.get("user_nickname", ""), + user_platform=msg_dict.get("platform", "qq"), + chat_info_group_id=msg_dict.get("group_id", ""), + chat_info_group_name=msg_dict.get("group_name", ""), + chat_info_group_platform=msg_dict.get("platform", "qq"), + ) + db_messages.append(db_msg) + except Exception as e: + logger.warning(f"转换消息格式失败: {e}") + continue + + # 计算兴趣度评分 + if db_messages: + bot_nickname = global_config.bot.nickname or "麦麦" + scores = await interest_scoring_system.calculate_interest_scores(db_messages, bot_nickname) + + # 构建兴趣度字典 + for score in scores: + interest_scores[score.message_id] = score.total_score + + except Exception as e: + logger.warning(f"获取兴趣度评分失败: {e}") + + return interest_scores def build_mai_think_context( self, diff --git a/src/chat/utils/prompt.py b/src/chat/utils/prompt.py index 217a2071b..fa73f9538 100644 --- a/src/chat/utils/prompt.py +++ b/src/chat/utils/prompt.py @@ -441,15 +441,16 @@ class Prompt: """构建S4U模式的聊天上下文""" if not self.parameters.message_list_before_now_long: return - - core_dialogue, background_dialogue = await self._build_s4u_chat_history_prompts( + + read_history_prompt, unread_history_prompt = await self._build_s4u_chat_history_prompts( self.parameters.message_list_before_now_long, self.parameters.target_user_info.get("user_id") if self.parameters.target_user_info else "", - self.parameters.sender + self.parameters.sender, + self.parameters.chat_id ) - - context_data["core_dialogue_prompt"] = core_dialogue - context_data["background_dialogue_prompt"] = background_dialogue + + context_data["read_history_prompt"] = read_history_prompt + context_data["unread_history_prompt"] = unread_history_prompt async def _build_normal_chat_context(self, context_data: Dict[str, Any]) -> None: """构建normal模式的聊天上下文""" @@ -460,62 +461,22 @@ class Prompt: {self.parameters.chat_talking_prompt_short}""" async def _build_s4u_chat_history_prompts( - self, message_list_before_now: List[Dict[str, Any]], target_user_id: str, sender: str + self, message_list_before_now: List[Dict[str, Any]], target_user_id: str, sender: str, chat_id: str ) -> Tuple[str, str]: - """构建S4U风格的分离对话prompt""" - # 实现逻辑与原有SmartPromptBuilder相同 - core_dialogue_list = [] - bot_id = str(global_config.bot.qq_account) - - for msg_dict in message_list_before_now: - try: - msg_user_id = str(msg_dict.get("user_id")) - reply_to = msg_dict.get("reply_to", "") - platform, reply_to_user_id = Prompt.parse_reply_target(reply_to) - if (msg_user_id == bot_id and reply_to_user_id == target_user_id) or msg_user_id == target_user_id: - core_dialogue_list.append(msg_dict) - except Exception as e: - logger.error(f"处理消息记录时出错: {msg_dict}, 错误: {e}") - - # 构建背景对话 prompt - all_dialogue_prompt = "" - if message_list_before_now: - latest_25_msgs = message_list_before_now[-int(global_config.chat.max_context_size) :] - all_dialogue_prompt_str = build_readable_messages( - latest_25_msgs, - replace_bot_name=True, - timestamp_mode="normal", - truncate=True, + """构建S4U风格的已读/未读历史消息prompt""" + try: + # 动态导入default_generator以避免循环导入 + from src.plugin_system.apis.generator_api import get_replyer + + # 创建临时生成器实例来使用其方法 + temp_generator = get_replyer(None, chat_id, request_type="prompt_building") + return await temp_generator.build_s4u_chat_history_prompts( + message_list_before_now, target_user_id, sender, chat_id ) - all_dialogue_prompt = f"所有用户的发言:\n{all_dialogue_prompt_str}" - - # 构建核心对话 prompt - core_dialogue_prompt = "" - if core_dialogue_list: - latest_5_messages = core_dialogue_list[-5:] if len(core_dialogue_list) >= 5 else core_dialogue_list - has_bot_message = any(str(msg.get("user_id")) == bot_id for msg in latest_5_messages) - - if not has_bot_message: - core_dialogue_prompt = "" - else: - core_dialogue_list = core_dialogue_list[-int(global_config.chat.max_context_size * 2) :] - - core_dialogue_prompt_str = build_readable_messages( - core_dialogue_list, - replace_bot_name=True, - merge_messages=False, - timestamp_mode="normal_no_YMD", - read_mark=0.0, - truncate=True, - show_actions=True, - ) - core_dialogue_prompt = f"""-------------------------------- -这是你和{sender}的对话,你们正在交流中: -{core_dialogue_prompt_str} --------------------------------- -""" - - return core_dialogue_prompt, all_dialogue_prompt + except Exception as e: + logger.error(f"构建S4U历史消息prompt失败: {e}") + + async def _build_expression_habits(self) -> Dict[str, Any]: """构建表达习惯""" @@ -759,9 +720,9 @@ class Prompt: "action_descriptions": self.parameters.action_descriptions or context_data.get("action_descriptions", ""), "sender_name": self.parameters.sender or "未知用户", "mood_state": self.parameters.mood_prompt or context_data.get("mood_state", ""), - "background_dialogue_prompt": context_data.get("background_dialogue_prompt", ""), + "read_history_prompt": context_data.get("read_history_prompt", ""), + "unread_history_prompt": context_data.get("unread_history_prompt", ""), "time_block": context_data.get("time_block", ""), - "core_dialogue_prompt": context_data.get("core_dialogue_prompt", ""), "reply_target_block": context_data.get("reply_target_block", ""), "reply_style": global_config.personality.reply_style, "keywords_reaction_prompt": self.parameters.keywords_reaction_prompt or context_data.get("keywords_reaction_prompt", ""), diff --git a/src/common/data_models/message_data_model.py b/src/common/data_models/message_data_model.py deleted file mode 100644 index 8e0b77862..000000000 --- a/src/common/data_models/message_data_model.py +++ /dev/null @@ -1,36 +0,0 @@ -from typing import Optional, TYPE_CHECKING -from dataclasses import dataclass, field - -from . import BaseDataModel - -if TYPE_CHECKING: - from .database_data_model import DatabaseMessages - - -@dataclass -class MessageAndActionModel(BaseDataModel): - chat_id: str = field(default_factory=str) - time: float = field(default_factory=float) - user_id: str = field(default_factory=str) - user_platform: str = field(default_factory=str) - user_nickname: str = field(default_factory=str) - user_cardname: Optional[str] = None - processed_plain_text: Optional[str] = None - display_message: Optional[str] = None - chat_info_platform: str = field(default_factory=str) - is_action_record: bool = field(default=False) - action_name: Optional[str] = None - - @classmethod - def from_DatabaseMessages(cls, message: "DatabaseMessages"): - return cls( - chat_id=message.chat_id, - time=message.time, - user_id=message.user_info.user_id, - user_platform=message.user_info.platform, - user_nickname=message.user_info.user_nickname, - user_cardname=message.user_info.user_cardname, - processed_plain_text=message.processed_plain_text, - display_message=message.display_message, - chat_info_platform=message.chat_info.platform, - ) diff --git a/src/common/data_models/message_manager_data_model.py b/src/common/data_models/message_manager_data_model.py index 82919d3b6..a54cb826b 100644 --- a/src/common/data_models/message_manager_data_model.py +++ b/src/common/data_models/message_manager_data_model.py @@ -49,11 +49,11 @@ class StreamContext(BaseDataModel): self.unread_messages.remove(msg) break - def get_context_messages(self, limit: int = 20) -> List["DatabaseMessages"]: - """获取上下文消息(历史消息+未读消息)""" + def get_history_messages(self, limit: int = 20) -> List["DatabaseMessages"]: + """获取历史消息""" # 优先返回最近的历史消息和所有未读消息 recent_history = self.history_messages[-limit:] if len(self.history_messages) > limit else self.history_messages - return recent_history + self.unread_messages + return recent_history @dataclass